欧美一级特黄大片做受成人-亚洲成人一区二区电影-激情熟女一区二区三区-日韩专区欧美专区国产专区

SparkStreaming算子開發(fā)的示例分析

這篇文章給大家分享的是有關(guān)Spark Streaming算子開發(fā)的示例分析的內(nèi)容。小編覺得挺實用的,因此分享給大家做個參考,一起跟隨小編過來看看吧。

成都創(chuàng)新互聯(lián)是一家專注于成都做網(wǎng)站、成都網(wǎng)站設(shè)計與策劃設(shè)計,秭歸網(wǎng)站建設(shè)哪家好?成都創(chuàng)新互聯(lián)做網(wǎng)站,專注于網(wǎng)站建設(shè)10余年,網(wǎng)設(shè)計領(lǐng)域的專業(yè)建站公司;建站業(yè)務(wù)涵蓋:秭歸等地區(qū)。秭歸做網(wǎng)站價格咨詢:13518219792

Spark Streaming算子開發(fā)實例

transform算子開發(fā)

transform操作應(yīng)用在DStream上時,可以用于執(zhí)行任意的RDD到RDD的轉(zhuǎn)換操作,還可以用于實現(xiàn)DStream API中所沒有提供的操作,比如說,DStreamAPI中并沒有提供將一個DStream中的每個batch,與一個特定的RDD進(jìn)行join的操作,DStream中的join算子只能join其他DStream,但是我們自己就可以使用transform操作來實現(xiàn)該功能。

實例:黑名單用戶實時過濾

package StreamingDemo

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * 實時黑名單過濾
 */
object TransformDemo {
 def main(args: Array[String]): Unit = {
  //設(shè)置日志級別
  Logger.getLogger("org").setLevel(Level.WARN)
  val conf = new SparkConf()
   .setAppName(this.getClass.getSimpleName)
   .setMaster("local[2]")
  val ssc = new StreamingContext(conf, Seconds(2))

  //創(chuàng)建一個黑名單的RDD
  val blackRDD =
   ssc.sparkContext.parallelize(Array(("zs", true), ("lisi", true)))

  //通過socket從nc中獲取數(shù)據(jù)
  val linesDStream = ssc.socketTextStream("Hadoop01", 6666)

  /**
   * 過濾黑名單用戶發(fā)言
   * zs sb sb sb sb
   * lisi fuck fuck fuck
   * jack hello
   */
  linesDStream
   .map(x => {
    val info = x.split(" ")
    (info(0), info.toList.tail.mkString(" "))
   })
   .transform(rdd => { //transform是一個RDD->RDD的操作,所以返回值必須是RDD
    /**
     * 經(jīng)過leftouterjoin操作之后,產(chǎn)生的結(jié)果如下:
     * (zs,(sb sb sb sb),Some(true)))
     * (lisi,(fuck fuck fuck),some(true)))
     * (jack,(hello,None))
     */
    val joinRDD = rdd.leftOuterJoin(blackRDD)

    //如果是Some(true)的,說明就是黑名單用戶,如果是None的,說明不在黑名單內(nèi),把非黑名單的用戶保留下來
    val filterRDD = joinRDD.filter(x => x._2._2.isEmpty)

    filterRDD
   })
   .map(x=>(x._1,x._2._1)).print()

  ssc.start()
  ssc.awaitTermination()
 }
}

測試

啟動nc,傳入用戶及其發(fā)言信息

Spark Streaming算子開發(fā)的示例分析

可以看到程序?qū)崟r的過濾掉了在黑名單里的用戶發(fā)言

Spark Streaming算子開發(fā)的示例分析

updateStateByKey算子開發(fā)

updateStateByKey算子可以保持任意狀態(tài),同時不斷有新的信息進(jìn)行更新,這個算子可以為每個key維護(hù)一份state,并持續(xù)不斷的更新state。對于每個batch來說,Spark都會為每個之前已經(jīng)存在的key去應(yīng)用一次State更新函數(shù),無論這個key在batch中是否有新的值,如果State更新函數(shù)返回的值是none,那么這個key對應(yīng)的state就會被刪除;對于新出現(xiàn)的key也會執(zhí)行state更新函數(shù)。

要使用該算子,必須進(jìn)行兩個步驟

  • 定義state——state可以是任意的數(shù)據(jù)類型

  • 定義state更新函數(shù)——用一個函數(shù)指定如何使用之前的狀態(tài),以及從輸入流中獲取新值更新狀態(tài)

注意:updateStateByKey操作,要求必須開啟Checkpoint機(jī)制

實例:基于緩存的實時WordCount

package StreamingDemo

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * 基于緩存的實時WordCount,在全局范圍內(nèi)統(tǒng)計單詞出現(xiàn)次數(shù)
 */
object UpdateStateByKeyDemo {
 def main(args: Array[String]): Unit = {
  //設(shè)置日志級別
  Logger.getLogger("org").setLevel(Level.WARN)

  /**
   * 如果沒有啟用安全認(rèn)證或者從Kerberos獲取的用戶為null,那么獲取HADOOP_USER_NAME環(huán)境變量,
   * 并將它的值作為Hadoop執(zhí)行用戶設(shè)置hadoop username
   * 這里實驗了一下在沒有啟用安全認(rèn)證的情況下,就算不顯式添加,也會自動獲取我的用戶名
   */
  //System.setProperty("HADOOP_USER_NAME","Setsuna")

  val conf = new SparkConf()
   .setAppName(this.getClass.getSimpleName)
   .setMaster("local[2]")
  val ssc = new StreamingContext(conf, Seconds(2))

  //設(shè)置Checkpoint存放的路徑
  ssc.checkpoint("hdfs://Hadoop01:9000/checkpoint")

  //創(chuàng)建輸入DStream
  val lineDStream = ssc.socketTextStream("Hadoop01", 6666)
  val wordDStream = lineDStream.flatMap(_.split(" "))
  val pairsDStream = wordDStream.map((_, 1))

  /**
   * state:代表之前的狀態(tài)值
   * values:代表當(dāng)前batch中key對應(yīng)的values值
   */
  val resultDStream =
   pairsDStream.updateStateByKey((values: Seq[Int], state: Option[Int]) => {

    //當(dāng)state為none,表示沒有對這個單詞做統(tǒng)計,則返回0值給計數(shù)器count
    var count = state.getOrElse(0)

    //遍歷values,累加新出現(xiàn)的單詞的value值
    for (value <- values) {
     count += value
    }

    //返回key對應(yīng)的新state,即單詞的出現(xiàn)次數(shù)
    Option(count)
   })

  //在控制臺輸出
  resultDStream.print()

  ssc.start()
  ssc.awaitTermination()
 }
}

測試

開啟nc,輸入單詞

Spark Streaming算子開發(fā)的示例分析

控制臺實時輸出的結(jié)果

Spark Streaming算子開發(fā)的示例分析

window滑動窗口算子開發(fā)

Spark Streaming提供了滑動窗口操作的支持,可以對一個滑動窗口內(nèi)的數(shù)據(jù)執(zhí)行計算操作
在滑動窗口中,包含批處理間隔、窗口間隔、滑動間隔

  • 對于窗口操作而言,在其窗口內(nèi)部會有N個批處理數(shù)據(jù)

  • 批處理數(shù)據(jù)的大小由窗口間隔決定,而窗口間隔指的就是窗口的持續(xù)時間,也就是窗口的長度

  • 滑動時間間隔指的是經(jīng)過多長時間窗口滑動一次,形成新的窗口,滑動間隔默認(rèn)情況下和批處理時間間隔的相同

注意:滑動時間間隔和窗口時間間隔的大小一定得設(shè)置為批處理間隔的整數(shù)倍

用一個官方的圖來作為說明

Spark Streaming算子開發(fā)的示例分析

批處理間隔是1個時間單位,窗口間隔是3個時間單位,滑動間隔是2個時間單位。對于初始的窗口time1-time3,只有窗口間隔滿足了才觸發(fā)數(shù)據(jù)的處理。所以滑動窗口操作都必須指定兩個參數(shù),窗口長度和滑動時間間隔。在Spark Streaming中對滑動窗口的支持是比Storm更加完善的。

Window滑動算子操作

算子描述
window()對每個滑動窗口的數(shù)據(jù)執(zhí)行自定義的計算
countByWindow()對每個滑動窗口的數(shù)據(jù)執(zhí)行count操作
reduceByWindow()對每個滑動窗口的數(shù)據(jù)執(zhí)行reduce操作
reduceByKeyAndWindow()對每個滑動窗口的數(shù)據(jù)執(zhí)行reduceByKey操作
countByValueAndWindow()對每個滑動窗口的數(shù)據(jù)執(zhí)行countByValue操作

reduceByKeyAndWindow算子開發(fā)

實例:在線熱點搜索詞實時滑動統(tǒng)計

每隔2秒鐘,統(tǒng)計最近5秒鐘的搜索詞中排名最靠前的3個搜索詞以及出現(xiàn)次數(shù)

package StreamingDemo

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * 需求:每隔2秒鐘,統(tǒng)計最近5秒鐘的搜索詞中排名最靠前的3個搜索詞以及出現(xiàn)次數(shù)
 */
object ReduceByKeyAndWindowDemo {
 def main(args: Array[String]): Unit = {

  //設(shè)置日志級別
  Logger.getLogger("org").setLevel(Level.WARN)
  //基礎(chǔ)配置
  val conf = new SparkConf()
   .setAppName(this.getClass.getSimpleName)
   .setMaster("local[2]")

  //批處理間隔設(shè)置為1s
  val ssc = new StreamingContext(conf, Seconds(1))

  val linesDStream = ssc.socketTextStream("Hadoop01", 6666)
  linesDStream
   .flatMap(_.split(" ")) //根據(jù)空格來做分詞
   .map((_, 1)) //返回(word,1)
   .reduceByKeyAndWindow(
    //定義窗口如何計算的函數(shù)
    //x代表的是聚合后的結(jié)果,y代表的是這個Key對應(yīng)的下一個需要聚合的值
    (x: Int, y: Int) => x + y,
    //窗口長度為5秒
    Seconds(5),
    //窗口時間間隔為2秒
    Seconds(2)
   )
   .transform(rdd => { //transform算子對rdd做處理,轉(zhuǎn)換為另一個rdd
    //根據(jù)Key的出現(xiàn)次數(shù)來進(jìn)行排序,然后降序排列,獲取最靠前的3個搜索詞
    val info: Array[(String, Int)] = rdd.sortBy(_._2, false).take(3)
    //將Array轉(zhuǎn)換為resultRDD
    val resultRDD = ssc.sparkContext.parallelize(info)
    resultRDD
   })
   .map(x => s"${x._1}出現(xiàn)的次數(shù)是:${x._2}")
   .print()

  ssc.start()
  ssc.awaitTermination()

 }
}

測試結(jié)果

Spark Streaming算子開發(fā)的示例分析

DStream Output操作概覽

Spark Streaming允許DStream的數(shù)據(jù)輸出到外部系統(tǒng),DSteram中的所有計算,都是由output操作觸發(fā)的,foreachRDD輸出操作,也必須在里面對RDD執(zhí)行action操作,才能觸發(fā)對每一個batch的計算邏輯。

轉(zhuǎn)換描述
print()在Driver中打印出DStream中數(shù)據(jù)的前10個元素。主要用于測試,或者是不需要執(zhí)行什么output操作時,用于簡單觸發(fā)一下job。
saveAsTextFiles(prefix,
 [suffix])
將DStream中的內(nèi)容以文本的形式保存為文本文件,其中每次批處理間隔內(nèi)產(chǎn)生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。
saveAsObjectFiles(prefix
 , [suffix])
將DStream中的內(nèi)容按對象序列化并且以SequenceFile的格式保存。其中每次批處理間隔內(nèi)產(chǎn)生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。
saveAsHadoopFiles(pref
 ix, [suffix])
將DStream中的內(nèi)容以文本的形式保存為Hadoop文件,其中每次批處理間隔內(nèi)產(chǎn)生的文件
 以prefix-TIME_IN_MS[.suffix]的方式命名。
foreachRDD(func)最基本的輸出操作,將func函數(shù)應(yīng)用于DStream中的RDD上,這個操作會輸出數(shù)據(jù)到外部系
 統(tǒng),比如保存RDD到文件或者網(wǎng)絡(luò)數(shù)據(jù)庫等。需要注意的是func函數(shù)是在運行該streaming
 應(yīng)用的Driver進(jìn)程里執(zhí)行的。

foreachRDD算子開發(fā)

foreachRDD是最常用的output操作,可以遍歷DStream中的每個產(chǎn)生的RDD并進(jìn)行處理,然后將每個RDD中的數(shù)據(jù)寫入外部存儲,如文件、數(shù)據(jù)庫、緩存等,通常在其中針對RDD執(zhí)行action操作,比如foreach

使用foreachRDD操作數(shù)據(jù)庫

通常在foreachRDD中都會創(chuàng)建一個Connection,比如JDBC Connection,然后通過Connection將數(shù)據(jù)寫入外部存儲

誤區(qū)一:在RDD的foreach操作外部創(chuàng)建Connection

dstream.foreachRDD { rdd =>
  val connection=createNewConnection()
  rdd.foreach { record => connection.send(record)
  }
}

這種方式是錯誤的,這樣的方式會導(dǎo)致Connection對象被序列化后被傳輸?shù)矫恳粋€task上,但是Connection對象是不支持序列化的,所以也就無法被傳輸

誤區(qū)二:在RDD的foreach操作內(nèi)部創(chuàng)建Connection

dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }
}

這種方式雖然是可以的,但是執(zhí)行效率會很低,因為它會導(dǎo)致對RDD中的每一條數(shù)據(jù)都創(chuàng)建一個Connection對象,通常Connection對象的創(chuàng)建都是很消耗性能的

合理的方式

  • 第一種:使用RDD的foreachPartition操作,并且在該操作內(nèi)部創(chuàng)建Connection對象,這樣就相當(dāng)于為RDD的每個partition創(chuàng)建一個Connection對象,節(jié)省了很多資源

  • 第二種:自己手動封裝一個靜態(tài)連接池,使用RDD的foreachPartition操作,并且在該操作內(nèi)部從靜態(tài)連接池中,通過靜態(tài)方法獲取到一個連接,連接使用完之后再放回連接池中。這樣的話,可以在多個RDD的partition之間復(fù)用連接了

實例:實時全局統(tǒng)計WordCount,并將結(jié)果保存到MySQL數(shù)據(jù)庫中

MySQL數(shù)據(jù)庫建表語句如下

CREATE TABLE wordcount (
  word varchar(100) CHARACTER SET utf8 NOT NULL,
  count int(10) NOT NULL,
  PRIMARY KEY (word)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

在IDEA中添加mysql-connector-java-5.1.40-bin.jar

Spark Streaming算子開發(fā)的示例分析

代碼如下

連接池的代碼,其實一開始有想過用靜態(tài)塊來寫個池子直接獲取,但是如果考慮到池子寬度不夠用的問題,這樣的方式其實更好,一開始,實例化一個連接池出來,被調(diào)用獲取連接,當(dāng)連接全部都被獲取了的時候,池子空了,就再實例化一個池子出來

package StreamingDemo

import java.sql.{Connection, DriverManager, SQLException}
import java.util

object JDBCManager {
 var connectionQue: java.util.LinkedList[Connection] = null

 /**
  * 從數(shù)據(jù)庫連接池中獲取連接對象
  * @return
  */
 def getConnection(): Connection = {
  synchronized({
   try {
    //如果連接池是空的,那么就實例化一個Connection類型的鏈表
    if (connectionQue == null) {
     connectionQue = new util.LinkedList[Connection]()
     for (i <- 0 until (10)) {
      //生成10個連接,并配置相關(guān)信息
      val connection = DriverManager.getConnection(
       "jdbc:mysql://Hadoop01:3306/test?characterEncoding=utf-8",
       "root",
       "root")
      //將連接push進(jìn)連接池
      connectionQue.push(connection)
     }
    }
   } catch {
    //捕獲異常并輸出
    case e: SQLException => e.printStackTrace()
   }
   //如果連接池不為空,則返回表頭元素,并將它在鏈表里刪除
   return connectionQue.poll()
  })
 }

 /**
  * 當(dāng)連接對象用完后,需要調(diào)用這個方法歸還連接
  * @param connection
  */
 def returnConnection(connection: Connection) = {
  //插入元素
  connectionQue.push(connection)
 }

 def main(args: Array[String]): Unit = {
  //main方法測試
  getConnection()
  println(connectionQue.size())
 }
}

wordcount代碼

package StreamingDemo

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, streaming}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ForeachRDDDemo {
 def main(args: Array[String]): Unit = {
  //設(shè)置日志級別,避免INFO信息過多
  Logger.getLogger("org").setLevel(Level.WARN)

  //設(shè)置Hadoop的用戶,不加也可以
  System.setProperty("HADOOP_USER_NAME", "Setsuna")

  //Spark基本配置
  val conf = new SparkConf()
   .setAppName(this.getClass.getSimpleName)
   .setMaster("local[2]")
  val ssc = new StreamingContext(conf, streaming.Seconds(2))

  //因為要使用updateStateByKey,所以需要使用checkpoint
  ssc.checkpoint("hdfs://Hadoop01:9000/checkpoint")

  //設(shè)置socket,跟nc配置的一樣
  val linesDStream = ssc.socketTextStream("Hadoop01", 6666)
  val wordCountDStream = linesDStream
   .flatMap(_.split(" "))   //根據(jù)空格做分詞
   .map((_, 1)) //生成(word,1)
   .updateStateByKey((values: Seq[Int], state: Option[Int]) => {
    //實時更新狀態(tài)信息
    var count = state.getOrElse(0)
    for (value <- values) {
     count += value
    }
    Option(count)
   })

  wordCountDStream.foreachRDD(rdd => {
   if (!rdd.isEmpty()) {
    rdd.foreachPartition(part => {
     //從連接池中獲取連接
     val connection = JDBCManager.getConnection()
     part.foreach(data => {
      val sql = //往wordcount表中插入wordcount信息,on duplicate key update子句是有則更新無則插入
       s"insert into wordcount (word,count) " +
        s"values ('${data._1}',${data._2}) on duplicate key update count=${data._2}"
      //使用prepareStatement來使用sql語句
      val pstmt = connection.prepareStatement(sql)
      pstmt.executeUpdate()
     })
     //在連接處提交完數(shù)據(jù)后,歸還連接到連接池
     JDBCManager.returnConnection(connection)
    })
   }
  })

  ssc.start()
  ssc.awaitTermination()
 }
}

打開nc,輸入數(shù)據(jù)

Spark Streaming算子開發(fā)的示例分析

在另一個終端對wordcount的結(jié)果進(jìn)行查詢,可以發(fā)現(xiàn)是實時發(fā)生變化的

Spark Streaming算子開發(fā)的示例分析

感謝各位的閱讀!關(guān)于“Spark Streaming算子開發(fā)的示例分析”這篇文章就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,讓大家可以學(xué)到更多知識,如果覺得文章不錯,可以把它分享出去讓更多的人看到吧!

網(wǎng)頁名稱:SparkStreaming算子開發(fā)的示例分析
URL網(wǎng)址:http://aaarwkj.com/article22/gjoojc.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供移動網(wǎng)站建設(shè)云服務(wù)器、建站公司、網(wǎng)頁設(shè)計公司、電子商務(wù)、營銷型網(wǎng)站建設(shè)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

商城網(wǎng)站建設(shè)
国产福利三级在线观看| 国产亚洲加勒比久久精品| 一本久久精品午夜福利| 在线看日本十八禁网站| 国产三级精品久久三级国专区| 黄色片在线观看中文字幕| 久久精品国产亚洲av高清综合 | 婷婷国产成人久久精品激情| 91无人区一区二区三乱码 | 2023国产精品一区| 久久麻豆精亚洲av品国产一区| 亚洲精品一区av在线观看| 国产原创传媒在线观看| 国产精品一级自拍视频| 熟女人妻av五十路六十路| 日韩欧美亚洲一级黄片| 特黄特色的日本大片| 日本在线精品在线观看| 日韩欧美精品在线不卡| 精品在线免费视频观看| 九九热这里只有免费视频| 欧美另类亚洲综合久青草| 成年人免费在线观看毛片| 国产又粗又硬又长又爽在线观看 | av影片免费网址大全| 亚洲精品国产熟女av| 国产精品亚洲av三区国产毛片 | 亚洲av男人的天堂看| 国产成人免费自拍一区| 精品久久一区麻豆香蕉| 国产在线第一页第二页| 色吊丝日韩在线观看| 欧美午夜一区二区电影| 欧美欧美欧美欧美在线| 欧美精品久久久久九九九| 国自产偷精品不卡在线| 午夜视频在线观看91| 女同伦理视频在线观看| 亚洲精品国产亚洲精品| 亚洲高清成人在线观看| 国产三级自拍视频在线观看|