這篇文章主要講解了“SparkStreaming的實(shí)現(xiàn)和使用方法”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來(lái)研究和學(xué)習(xí)“SparkStreaming的實(shí)現(xiàn)和使用方法”吧!
堆龍德慶ssl適用于網(wǎng)站、小程序/APP、API接口等需要進(jìn)行數(shù)據(jù)傳輸應(yīng)用場(chǎng)景,ssl證書未來(lái)市場(chǎng)廣闊!成為創(chuàng)新互聯(lián)公司的ssl證書銷售渠道,可以享受市場(chǎng)價(jià)格4-6折優(yōu)惠!如果有意向歡迎電話聯(lián)系或者加微信:028-86922220(備注:SSL證書合作)期待與您的合作!生產(chǎn)中使用多的是一個(gè)文件中有很多域名,另一個(gè)中是黑名單,要進(jìn)行剔除 數(shù)據(jù)一:日志信息 DStream domain,traffic xinlang.com xinlang.com baidu.com 數(shù)據(jù)二:已有的文件 黑名單 RDD domain baidu.com
package sparkstreaming02 import org.apache.spark.{SparkConf, SparkContext} import scala.collection.mutable.ListBuffer object Demo1 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("Demo1").setMaster("local[2]") val sc = new SparkContext(conf) val input1 = new ListBuffer[(String,Long)] input1.append(("www.xinlang.com", 8888)) input1.append(("www.xinalng.com", 9999)) input1.append(("www.baidu.com", 7777)) val data1 = sc.parallelize(input1) //進(jìn)行join一定要是key,value形式的 val input2 = new ListBuffer[(String,Boolean)] input2.append(("www.baidu.com",true)) val data2 = sc.parallelize(input2) data1.leftOuterJoin(data2) .filter(x => { x._2._2.getOrElse(false) != true }).map(x => (x._1,x._2._1)) .collect().foreach(println) } }
package sparkstreaming02 import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import scala.collection.mutable.ListBuffer object Streaming { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("Streaming").setMaster("local[2]") val ssc = new StreamingContext(conf,Seconds(10)) val lines = ssc.socketTextStream("s201",9999) // 數(shù)據(jù)二: rdd val input2 = new ListBuffer[(String,Boolean)] input2.append(("www.baidu.com",true)) val data2 = ssc.sparkContext.parallelize(input2) lines.map(x=>(x.split(",")(0), x)).transform( rdd => { rdd.leftOuterJoin(data2) .filter(x => { x._2._2.getOrElse(false) != true //注意 join之后過(guò)濾 }).map(x => (x._1,x._2._1)) } ).print() ssc.start() ssc.awaitTermination() } }
、
connect 在Driver端創(chuàng)建,record在executor,發(fā)過(guò)去序列化錯(cuò)誤
解決:第一種把connect放到executor端 這樣弊端是每條記錄會(huì)生成一個(gè)connect太耗費(fèi)資源 words.foreachRDD { rdd => rdd.foreach { record => val connection = createConnection() // executed at the driver val word = record._1 val count = record._2.toInt val sql = s"insert into wc (wc,count) values($word,$count)" connection.createStatement().execute(sql) }
package sparkstreaming02 import java.sql.DriverManager import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} object MysqlStreaming { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[2]").setAppName("MysqlStreaming") val ssc = new StreamingContext(conf,Seconds(1)) val lines = ssc.socketTextStream("s201",9999) val words = lines.flatMap(x => x.split(",")).map((_,1)).reduceByKey(_+_) // words.foreachRDD { rdd => // val connection = createConnection() // executed at the driver // rdd.foreach { record => // val word = record._1 // val count = record._2 // val sql = s"insert into wc (word,count) values($word,$count)" // connection.createStatement().execute(sql) // } // } // words.foreachRDD { rdd => // rdd.foreach { record => // val connection = createConnection() // executed at the driver // val word = record._1 // val count = record._2.toInt // val sql = s"insert into wc (wc,count) values($word,$count)" // connection.createStatement().execute(sql) // } // } //最終的寫法 words.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => val connection = createConnection() partitionOfRecords.foreach( record =>{ val word = record._1 val count = record._2 val sql = s"insert into wc (wc,count) values('$word',$count)" connection.createStatement().execute(sql)} ) } } ssc.start() ssc.awaitTermination() } def createConnection() = { Class.forName("com.mysql.cj.jdbc.Driver") DriverManager.getConnection("jdbc:mysql://localhost:3306/hive?serverTimezone=UTC&useSSL=false","root","123456") } }
錯(cuò)誤,插入數(shù)據(jù)庫(kù)時(shí),你要插入字符串要用'' 例如: val sql = s"insert into wc (wc,count) values($word,$count)" word是字符串,你要不加雙引號(hào)就報(bào)這個(gè)錯(cuò)誤 正確 val sql = s"insert into wc (wc,count) values('$word',$count)"
感謝各位的閱讀,以上就是“SparkStreaming的實(shí)現(xiàn)和使用方法”的內(nèi)容了,經(jīng)過(guò)本文的學(xué)習(xí)后,相信大家對(duì)SparkStreaming的實(shí)現(xiàn)和使用方法這一問(wèn)題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!
網(wǎng)頁(yè)名稱:SparkStreaming的實(shí)現(xiàn)和使用方法-創(chuàng)新互聯(lián)
文章路徑:http://aaarwkj.com/article34/ccjese.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供微信公眾號(hào)、微信小程序、網(wǎng)站排名、域名注冊(cè)、網(wǎng)頁(yè)設(shè)計(jì)公司、用戶體驗(yàn)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)
猜你還喜歡下面的內(nèi)容