Real-time computing engines are also called stream computing engines. Three are in common use today:
1. Apache Storm: true stream processing.
2. Spark Streaming: strictly speaking, not true stream (real-time) processing.
   It treats the continuous stream of data as a series of discontinuous RDDs.
   Essence: a discretized computation (over discontinuous data).
   When this comes up in an interview: state its essence first,
   then your own understanding,
   the commonly used methods,
   and a comparison with other technologies of the same kind.
3. Apache Flink: true stream processing, the opposite of Spark Streaming.
   Essence: a streaming engine. Although it can be used for offline (batch) computation, it does so by modelling the discrete data as a stream and handing it to Flink's streaming engine.
Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant processing of live data streams. Data can be ingested from many sources such as Kafka, Flume, Kinesis, or TCP sockets, and can be processed with complex algorithms expressed through high-level functions such as map, reduce, join, and window. Finally, the processed data can be pushed out to file systems, databases, and live dashboards. You can also apply Spark's machine learning and graph processing algorithms to data streams.
Features:
1. Easy to use: integrated into Spark.
2. Fault tolerant: built on RDDs, which already come with a fault-tolerance mechanism.
3. Multiple language APIs: Java, Scala, Python.
Spark Streaming receives the real-time data, and the processing program then fetches it in batches in a way that resembles periodic sampling. Each batch of data is one RDD, so what is ultimately fed to the processing program is a stream of queued RDDs; this stream is the discretized stream, or DStream. Internally, a DStream is represented as a sequence of RDDs, and the DStream object is what you call the various operators on to process the data.
Figure 1.1 How a DStream works
First start a netcat server listening on port 1234:
nc -l 1234
If the command is missing, install it with: yum -y install netcat
Next, start the Spark Streaming example program, which reads data from local port 1234 and performs a word count.
Go to the Spark installation directory and run the command under bin:
bin/run-example streaming.NetworkWordCount localhost 1234
Then type some strings on the netcat side:
[root@bigdata121 hive-1.2.1-bin]# nc -l 1234
king king hello
View the statistics in another window:
-------------------------------------------
Time: 1567005584000 ms
-------------------------------------------
(hello,1)
(king,2)
The counts appear here immediately.
First, remember to add the streaming dependency to pom.xml in Maven (for convenience it is easiest to add the dependencies of all the Spark components):
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>king</groupId>
<artifactId>sparkTest</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spark.version>2.1.0</spark.version>
<scala.version>2.11.8</scala.version>
<hadoop.version>2.7.3</hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.1.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.0</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
<version>2.1.0</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.11</artifactId>
<version>1.6.3</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.12</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hive/hive-jdbc -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>1.2.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/log4j/log4j -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.8.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-sdk -->
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-sdk</artifactId>
<version>1.8.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-configuration -->
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-configuration</artifactId>
<version>1.8.0</version>
</dependency>
</dependencies>
<!-- Below is the Maven plugin for building Scala; it is required, otherwise the Scala code is simply ignored -->
<build>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.19</version>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
</plugins>
</build>
</project>
Code:
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * A streaming word count program.
 *
 * 1. Create a StreamingContext object,
 *    then create a DStream (discretized stream) from it.
 *    The computation is essentially discretized.
 *
 *    Discretized: continuous data is turned into discrete batches and processed immediately.
 *    Offline (batch): not processed in real time.
 *
 * 2. A DStream is represented by RDDs
 *    and is operated on in the same way as an RDD.
 *
 * 3. The DStream cuts the continuous data into discrete RDDs.
 */
object NetworkWordCount {
def main(args: Array[String]): Unit = {
//Set the log level to ERROR (the default is INFO)
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
/**
This is the standard way to create a StreamingContext object;
it cannot be created from a SparkSession object.
*/
//Create the StreamingContext. master is local[2], meaning at least two cores (two threads): one to receive the data and one to process it
val conf = new SparkConf().setAppName("streaming wordcount").setMaster("local[2]")
//Pass the conf object and a batch interval of 3 seconds: every 3 seconds a new RDD is cut off and processed
val streamingContext = new StreamingContext(conf, Seconds(3))
//Create the input source: a socket stream that receives the data and internally cuts it into RDDs automatically.
//Specify the host and port to listen to
val streamText = streamingContext.socketTextStream("bigdata121", 1234, StorageLevel.MEMORY_ONLY)
//The word count pipeline
val rdd1 = streamText.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
//Print the results
rdd1.print()
//Start the StreamingContext and begin the computation
streamingContext.start()
//Wait for the job to terminate
streamingContext.awaitTermination()
}
}
Start the netcat service on the bigdata121 VM:
nc -l 1234
Run the program above in IDEA and type characters into netcat; the results are the same as in the example above.
Points to note about StreamingContext:
1. A StreamingContext internally creates a SparkContext instance (the starting point of all Spark functionality), which you can access via ssc.sparkContext.
2. Once a StreamingContext has been started, no new streaming computations can be set up or added to it.
3. Once a context has been stopped, it cannot be restarted.
4. At any one time, only one StreamingContext can be active in a JVM.
5. stop() on a StreamingContext also stops the SparkContext. To stop only the StreamingContext (and keep the SparkContext alive), set the optional stopSparkContext parameter of stop() to false.
6. A SparkContext can be reused to create multiple StreamingContexts, as long as the previous StreamingContext is stopped (without stopping the SparkContext) before the next StreamingContext is created.
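As a minimal sketch of points 5 and 6 (not from the original text; the app name and batch intervals here are arbitrary), only the StreamingContext is stopped and the still-alive SparkContext is reused for a new one:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("reuse sparkcontext sketch").setMaster("local[2]")
val ssc1 = new StreamingContext(conf, Seconds(3))
// ... define the first streaming job on ssc1, call ssc1.start(), and later decide to stop it ...
//Stop only the StreamingContext; keep the underlying SparkContext alive
ssc1.stop(stopSparkContext = false, stopGracefully = true)
//Reuse the same SparkContext to create a second StreamingContext with a different batch interval
val ssc2 = new StreamingContext(ssc1.sparkContext, Seconds(5))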
A DStream object can be seen as the data outlet of the whole spark-streaming program: all the data to be processed comes from it. As mentioned above, a DStream really contains a series of RDDs; that is its essence. After an operator is applied, the result is still a DStream, and it still contains RDDs, so the chain of operator transformations works on the same concepts as for ordinary RDDs. Overall, a streaming program is a series of DStream-to-DStream transformations, which at bottom are transformations of the RDDs inside the DStreams.
Operator list:
Figure 2.1 DStream operators
They are very similar to the operators on ordinary RDDs. Two of them deserve special attention: transform and updateStateByKey.
transform(RDD[T]=>RDD[U])
This operator converts the RDDs inside a DStream into new RDDs. Note that the function it takes receives an RDD as its parameter, unlike most operators, whose functions receive the individual elements inside the RDD.
Example:
val conf = new SparkConf().setAppName("streaming wordcount").setMaster("local[2]")
//Pass the conf object and a batch interval of 3 seconds: every 3 seconds a new RDD is cut off and processed
val streamingContext = new StreamingContext(conf, Seconds(3))
//Create a socket stream to receive the data; internally it is automatically cut into RDDs
val streamText = streamingContext.socketTextStream("bigdata121", 1234, StorageLevel.MEMORY_ONLY)
//The function passed to transform receives an RDD, processes it, and returns a new RDD
val words = streamText.transform(rdd => {
rdd.flatMap(_.split(" "))
})
//An output operation and start() are still needed for the job to actually run
words.print()
streamingContext.start()
streamingContext.awaitTermination()
By default, Spark Streaming does not remember previous state: every batch starts from zero. In the word count, for example, the counts from earlier batches are not carried over to the next batch, which starts counting from 0 again. If you want the counts to accumulate, use the following operator.
updateStateByKey((Seq[T],Option[S])=>Option[S])
The function passed to this operator takes two parameters:
Seq[T]: the values of one key in the current batch after grouping by key, e.g. the [1,2,1,1] in ("age",[1,2,1,1])
Option[S]: the accumulated value of the same key before this batch, i.e. the key's previous counting state
The return value is the previous count plus the current count.
Example:
Below, the earlier word count is modified so that the word counts keep accumulating instead of restarting from 0 every time.
package SparkStreamExer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * Test updateStateByKey for accumulating state across batches
 */
object TestUpdateState {
def main(args: Array[String]): Unit = {
//Set the log level to ERROR (the default is INFO)
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
//Create the StreamingContext. master is local[2], meaning at least two cores (two threads): one to receive the data and one to process it
val conf = new SparkConf().setAppName("streaming wordcount").setMaster("local[2]")
//Pass the conf object and a batch interval of 3 seconds: every 3 seconds a new RDD is cut off and processed
val streamingContext = new StreamingContext(conf, Seconds(3))
//Set a checkpoint directory to save the previous state; the directory must not already exist
streamingContext.checkpoint("hdfs://bigdata121:9000/sparkCheckpoint/spark-streaming")
//Create a socket stream to receive the data; internally it is automatically cut into RDDs
val streamText = streamingContext.socketTextStream("bigdata121", 1234, StorageLevel.MEMORY_ONLY)
//Split the data and pair each word with a count of 1
val wordPair = streamText.flatMap(_.split(" ")).map((_,1))
//The accumulation function
val addFunc = (currentValues:Seq[Int], previousValue:Option[Int]) => {
//Sum the values of the current batch
val currentSum = currentValues.sum
//Fetch the previous value, or 0 if it does not exist
val pre = previousValue.getOrElse(0)
//Add the previous value and the current sum
Option(pre + currentSum)
}
//Update: replace the old count with the new accumulated count
wordPair.updateStateByKey(addFunc).print()
//Start the StreamingContext and begin the computation
streamingContext.start()
//Wait for the job to terminate
streamingContext.awaitTermination()
}
}
An error that came up while running this demo:
Caused by: java.lang.ClassNotFoundException: org.apache.commons.io.Charsets
It says the class org.apache.commons.io.Charsets does not exist. A look inside org.apache.commons.io confirmed it was indeed missing, presumably because the bundled commons-io version was too old. A quick search showed that version 2.5 contains this class, so a newer dependency was added to pom.xml:
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.5</version>
</dependency>
After that it ran fine.
foreachRDD(RDD[T]=>Unit)
This operator is similar to foreach, but it operates on the whole RDD rather than on individual elements inside the RDD.
It is typically used to write the results of each RDD to external storage such as HDFS or MySQL.
An example that combines foreachRDD with Spark SQL appears further below.
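Before that, here is a rough sketch (not from the original text) of the typical foreachRDD write pattern. It writes each batch's word-count pairs into a hypothetical MySQL table wordcount(word, cnt); the JDBC URL, user, and password are placeholders, and the connection is opened per partition inside foreachPartition because JDBC connections are not serializable and would otherwise have to be shipped from the driver:

import java.sql.DriverManager
//pairDStream is assumed to be a DStream[(String, Int)], e.g. the word-count pairs from the examples above
pairDStream.foreachRDD(rdd => {
rdd.foreachPartition(partition => {
//One connection per partition; placeholder URL, user and password
val conn = DriverManager.getConnection("jdbc:mysql://bigdata121:3306/test", "root", "password")
val stmt = conn.prepareStatement("insert into wordcount(word, cnt) values (?, ?)")
partition.foreach { case (word, count) =>
stmt.setString(1, word)
stmt.setInt(2, count)
stmt.executeUpdate()
}
stmt.close()
conn.close()
})
})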
Typical use case:
Windows are used for requirements such as computing statistics over the most recent N hours of data.
Diagram:
Figure 2.2 Spark Streaming window operations
A window is simply a time range laid on top of a DStream. As the figure shows, every time the window slides over the original DStream, the source RDDs that fall within the window are combined and operated on to produce the RDDs of the windowed DStream. In the example shown, the operation is applied over the last 3 time units of data and slides by 2 time units. So, compared with ordinary DStream operations, which process one RDD at a time, a window operation processes all the RDDs within a time range together; the window is another layer of wrapping on top of the DStream.
When using windows there are two key parameters:
Window length (windowLength): the duration of the window (3 in the figure above)
Sliding interval (slidingInterval): the interval between two consecutive window operations, i.e. how far the window slides each time (2 in the figure above)
Note that both parameters must be multiples of the batch interval of the source DStream (1 in the figure above). If they were not whole multiples, the window boundary would cut an RDD in two, which Spark cannot handle: an RDD is indivisible.
window(windowLength, slideInterval)
-> Returns a new DStream computed from windowed batches of the source DStream.
countByWindow(windowLength, slideInterval)
-> Returns a sliding-window count of the elements in the stream.
reduceByWindow(func, windowLength, slideInterval)
-> Returns a new single-element stream, created by aggregating the elements of the stream over a sliding interval using func. The function must be associative so that the computation can be parallelized correctly.
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])
-> When called on a DStream of (K,V) pairs, returns a new DStream of (K,V) pairs in which the values of each key are aggregated with the given reduce function over the window. Note: by default this uses Spark's default number of parallel tasks for the grouping; you can pass numTasks to use a different number of tasks.
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])
-> A more efficient version of reduceByKeyAndWindow() above, in which each window's reduce value is computed incrementally from the previous window's result. This is done by reducing the new data that enters the sliding window and "inverse reducing" the old data that leaves the window; an example is "adding and subtracting" the counts of keys as the window slides. However, it only applies to "invertible reduce functions", i.e. reduce functions that have a corresponding "inverse reduce" function (passed as the parameter invFunc). As with reduceByKeyAndWindow, the number of reduce tasks can be configured through an optional argument. Note that checkpointing must be enabled to use this operation. A short sketch of this variant follows the list.
countByValueAndWindow(windowLength, slideInterval, [numTasks])
-> When called on a DStream of (K,V) pairs, returns a new DStream of (K,V) pairs in which the value of each key is its frequency within the sliding window.
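As a minimal sketch of the incremental variant (not from the original text; it assumes wordPair is a DStream[(String, Int)] as in the examples in this article and that a checkpoint directory has already been set), the reduce function adds the counts entering the window and the inverse function subtracts the counts leaving it:

//Requires streamingContext.checkpoint(...) to have been called beforehand
val windowedCounts = wordPair.reduceByKeyAndWindow(
(a: Int, b: Int) => a + b, //reduce: add counts for batches entering the window
(a: Int, b: Int) => a - b, //inverse reduce: subtract counts for batches leaving the window
Seconds(30), //window length
Seconds(10)  //slide interval
)
windowedCounts.print()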
The most commonly used of these is reduceByKeyAndWindow, often used to compute statistics over a fixed, most-recent time range, for example order sales in the last hour. Below, this operator is applied to the word count example.
The window is 30 seconds long, slides every 10 seconds, and the word counts are accumulated.
package SparkStreamExer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * Test updateStateByKey combined with a window for accumulating state
 */
object TestUpdateState {
def main(args: Array[String]): Unit = {
//Set the log level to ERROR (the default is INFO)
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
//Create the StreamingContext. master is local[2], meaning at least two cores (two threads): one to receive the data and one to process it
val conf = new SparkConf().setAppName("streaming wordcount").setMaster("local[2]")
//Pass the conf object and a batch interval of 1 second: every second a new RDD is cut off and processed
val streamingContext = new StreamingContext(conf, Seconds(1))
//Set a checkpoint directory to save the previous state; the directory must not already exist
streamingContext.checkpoint("hdfs://bigdata121:9000/sparkCheckpoint/spark-streaming3")
//Create a socket stream to receive the data; internally it is automatically cut into RDDs
val streamText = streamingContext.socketTextStream("bigdata121", 1234, StorageLevel.MEMORY_ONLY)
//Split the data and pair each word with a count of 1
val wordPair = streamText.flatMap(_.split(" ")).map((_,1))
//Add the window operation here: 30-second window, sliding every 10 seconds
val windowValue = wordPair.reduceByKeyAndWindow((x:Int,y:Int)=>x+y, Seconds(30), Seconds(10))
//The accumulation function
val addFunc = (currentValues:Seq[Int], previousValue:Option[Int]) => {
//Sum the values of the current batch
val currentSum = currentValues.sum
//Fetch the previous value, or 0 if it does not exist
val pre = previousValue.getOrElse(0)
//Add the previous value and the current sum
Option(pre + currentSum)
}
//Update: replace the old count with the new accumulated count
//wordPair.updateStateByKey(addFunc).print()
windowValue.updateStateByKey(addFunc).print()
//Start the StreamingContext and begin the computation
streamingContext.start()
//Wait for the job to terminate
streamingContext.awaitTermination()
}
}
package SparkStreamExer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * Convert a streaming DStream so it can be queried with Spark SQL (the foreachRDD + SQL example mentioned above)
 */
object StreamingAndSql {
def main(args: Array[String]): Unit = {
//Set the log level to ERROR (the default is INFO)
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
val conf = new SparkConf().setAppName("streaming and sql").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(2))
val lines = ssc.socketTextStream("bigdata121",1234, StorageLevel.MEMORY_ONLY)
val words = lines.flatMap(_.split(" "))
//The RDDs need to be converted to DataFrames before Spark SQL can be used on them
words.foreachRDD(rdd => {
//Build the SparkSession from the RDD's conf so that the configuration matches the RDD's
val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
import spark.sqlContext.implicits._
//Convert the RDD to a DataFrame and name the column
val df = rdd.toDF("word")
//Create a temporary view and run the SQL
df.createOrReplaceTempView("tmp1")
val resultDF = spark.sql("select word,count(1) from tmp1 group by word")
resultDF.show()
})
ssc.start()
ssc.awaitTermination()
}
}
Checkpointing works much like it does with ordinary RDDs, except that in streaming it is done through the StreamingContext object:
//Create the StreamingContext. master is local[2], meaning at least two cores (two threads): one to receive the data and one to process it
val conf = new SparkConf().setAppName("streaming wordcount").setMaster("local[2]")
//Pass the conf object and a batch interval of 1 second: every second a new RDD is cut off and processed
val streamingContext = new StreamingContext(conf, Seconds(1))
//Set a checkpoint directory to save the previous state; the directory must not already exist
streamingContext.checkpoint("hdfs://bigdata121:9000/sparkCheckpoint/spark-streaming3")
The basic data sources are:
File stream: textFileStream
Socket stream: socketTextStream / socketStream (an example was shown earlier and is not repeated here)
RDD queue stream: queueStream
1. textFileStream
It monitors the file system for changes; whenever a new file is added to the directory, the file is read in and becomes part of the data stream.
Things to note:
The files must have the same format.
The files must be created in dataDirectory by atomically moving or renaming them into it.
If content is appended to an existing file, the appended data will not be read.
Example:
package SparkStreamExer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object StreamingFromFile {
def main(args: Array[String]): Unit = {
//Set the log level to ERROR (the default is INFO)
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
val conf = new SparkConf().setAppName("spark window operation").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(4))
val fileStream = ssc.textFileStream("G:\\test\\teststreaming")
fileStream.print()
ssc.start()
ssc.awaitTermination()
}
}
//==========================================================
2. queueStream
An RDD queue stream reads RDDs from a queue.
Example:
package SparkStreamExer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
object StreamingFromRDDQueue {
def main(args: Array[String]): Unit = {
//Set the log level to ERROR (the default is INFO)
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
val conf = new SparkConf().setAppName("spark streaming rdd queue").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(4))
//Create the queue
val rddQueue = new mutable.Queue[RDD[Int]]()
//Add RDDs to the queue
for (x<- 1 to 3) {
rddQueue += ssc.sparkContext.makeRDD(1 to 10)
}
//Read the RDDs from the queue
val queueRdd = ssc.queueStream(rddQueue).map(_*2)
queueRdd.print()
ssc.start()
ssc.awaitTermination()
}
}
Advanced data sources are what is usually used in production; it is rare for Spark to monitor the data directly. Common advanced sources include Kafka, Flume, Kinesis, Twitter, and so on. Flume is covered below.
1. Flume pushes data to the compute node
(1) First configure the Flume agent configuration file:
a1.sources=r1
a1.channels=c1
a1.sinks=k1
# Monitor a directory
a1.sources.r1.type=TAILDIR
a1.sources.r1.filegroups=f1
a1.sources.r1.filegroups.f1=/opt/modules/apache-flume-1.8.0-bin/logs/.*
a1.sources.r1.fileHeader=true
a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=100
# The Spark program here is run directly in the IDE, so Flume pushes the data straight to the Windows host
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=192.168.50.1
a1.sinks.k1.port=1234
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
(2) Spark code
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>SparkDemo</groupId>
<artifactId>SparkDemoTest</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spark.version>2.1.0</spark.version>
<scala.version>2.11.8</scala.version>
<hadoop.version>2.7.3</hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.1.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.0</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
<version>2.1.0</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.11</artifactId>
<version>1.6.3</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.12</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hive/hive-jdbc -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>1.2.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/log4j/log4j -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.8.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-sdk -->
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-sdk</artifactId>
<version>1.8.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-configuration -->
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-configuration</artifactId>
<version>1.8.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-flume -->
<!-- This is the dependency Spark needs to read data from Flume; do not forget it -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-flume-sink -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume-sink_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.5</version>
</dependency>
</dependencies>
<!-- Below is the Maven plugin for building Scala; it is required, otherwise the Scala code is simply ignored -->
<build>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.19</version>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
</plugins>
</build>
</project>
For the dependencies, the simplest approach is to add all of the Flume and Spark dependencies: search for them in the Maven central repository and add them. The most important one is spark-streaming-flume, the package Spark uses to read data from Flume; do not leave it out. If the job runs on a cluster, remember to put this jar into Spark's jars directory.
Code:
package SparkStreamExer
import org.apache.flume.source.avro.AvroFlumeEvent
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
object StreamingFromFlume {
def main(args: Array[String]): Unit = {
//Set the log level to ERROR (the default is INFO)
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
val conf = new SparkConf().setAppName("spark streaming from flume").setMaster("local[2]")
conf.registerKryoClasses(Array())
val ssc = new StreamingContext(conf, Seconds(4))
//Create the Flume event stream, receiving the data pushed from Flume
val flumeDStream = FlumeUtils.createStream(ssc, "192.168.50.1", 1234, StorageLevel.MEMORY_ONLY)
val eventDStream = flumeDStream.map(event => {
(event.event.getHeaders.toString,new String(event.event.getBody.array()))
})
eventDStream.print()
ssc.start()
ssc.awaitTermination()
}
}
(3) Start up:
First start the Spark program, running it directly in the IDE.
Then start Flume: flume-ng agent --conf conf --name a1 --conf-file conf/flume-spark.properties -Dflume.root.logger=INFO,console
Then modify files, or add new files, in the monitored directory.
Finally, check the data printed in the IDE.
2. Spark pulls data from Flume
This approach is more flexible and more scalable than the first one.
(1) Flume configuration file
a1.sources=r1
a1.channels=c1
a1.sinks=k1
a1.sources.r1.type=TAILDIR
a1.sources.r1.filegroups=f1
a1.sources.r1.filegroups.f1=/opt/modules/apache-flume-1.8.0-bin/logs/.*
a1.sources.r1.fileHeader=true
a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=100
# Here a sink implemented by Spark itself is used
a1.sinks.k1.type=org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname=192.168.50.121
a1.sinks.k1.port=1234
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
In addition, spark-streaming-flume-sink_2.11-2.1.0.jar must be added to Flume's lib directory; that is the jar containing the SparkSink used above. You can add the dependency in IDEA, let it download, and then copy the jar from the local Maven repository into Flume's lib directory.
(2) Code
pom.xml
Same as the one above, except that it additionally includes
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume-sink_2.11</artifactId>
<version>2.1.0</version>
</dependency>
as an extra dependency.
Code:
package SparkStreamExer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
object FromFlumePull {
def main(args: Array[String]): Unit = {
//Set the log level to ERROR (the default is INFO)
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
val conf = new SparkConf().setAppName("flume through spark sink").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(4))
//Create the polling stream, which pulls data from Flume for local processing
val flumePollingStream = FlumeUtils.createPollingStream(ssc, "bigdata121", 1234, StorageLevel.MEMORY_ONLY)
/**
 * Note:
 * Do not call toString directly on event.event.getBody.array(); the resulting string is just of the form [class name]@[hashCode].
 * Use new String(event.event.getBody.array()) instead, which decodes the byte array according to the default charset,
 * because what arrives over the wire is raw bytes.
 */
flumePollingStream.map(event=>{
(event.event.getHeaders.toString, new String(event.event.getBody.array()))
}).print()
ssc.start()
ssc.awaitTermination()
}
}
(3) Start up
The startup procedure is similar to the one above and is not repeated here.
(4) Problems encountered
Problem 1:
With spark-streaming-flume-sink_2.11.jar already placed in Flume's lib directory, the Flume agent reported the following error on startup:
29 Aug 2019 17:59:31,838 WARN [Spark Sink Processor Thread - 10] (org.apache.spark.streaming.flume.sink.Logging$class.logWarning:80) - Error while processing transaction.
java.lang.IllegalStateException: begin() called when transaction is OPEN!
at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
at org.apache.flume.channel.BasicTransactionSemantics.begin(BasicTransactionSemantics.java:131)
at org.apache.spark.streaming.flume.sink.TransactionProcessor$$anonfun$populateEvents$1.apply(TransactionProcessor.scala:114)
at org.apache.spark.streaming.flume.sink.TransactionProcessor$$anonfun$populateEvents$1.apply(TransactionProcessor.scala:113)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.streaming.flume.sink.TransactionProcessor.populateEvents(TransactionProcessor.scala:113)
at org.apache.spark.streaming.flume.sink.TransactionProcessor.call(TransactionProcessor.scala:243)
at org.apache.spark.streaming.flume.sink.TransactionProcessor.call(TransactionProcessor.scala:43)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
The key line is: java.lang.IllegalStateException: begin() called when transaction is OPEN!
It looked like a problem with some of Flume's jars, although the exact cause was not clear at first.
After hitting this error repeatedly, a look at the Scala jar under Flume's lib directory showed
scala-library-2.10.5.jar
i.e. Scala 2.10.5, while the SparkSink jar that had been added is built against Scala 2.11.8. Suspecting a scala-library version mismatch, scala-library-2.11.8.jar was copied from Spark's jars directory into Flume's lib, and the old jar was renamed so that Flume would not pick it up.
After restarting the Flume agent, it ran normally.
So this problem was caused by mismatched dependency versions.
Problem 2:
When reading the body, calling toString directly and calling new String behave differently: the former produces garbage, the latter restores the original string.
The difference between toString() and new String():
obj.toString calls the toString method of the object's class; for a byte array this generally just returns a String of the form [class name]@[hashCode].
new String(bytes) takes a byte array as its parameter and decodes it into the corresponding characters using the JVM's default charset. If the default charset is ISO-8859-1, each byte maps to a character according to the ASCII table.
When should which be used?
new String() is generally used for character decoding, when you have a byte[] array.
toString() is used when printing an object.
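A tiny standalone illustration of the difference (not from the original program):

val bytes: Array[Byte] = "king hello".getBytes("UTF-8")
println(bytes.toString)              //prints something like [B@6f79caec, i.e. [class name]@[hashCode]
println(new String(bytes, "UTF-8"))  //prints: king hello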