欧美一级特黄大片做受成人-亚洲成人一区二区电影-激情熟女一区二区三区-日韩专区欧美专区国产专区

如何掌握Table與DataStream之間的互轉(zhuǎn)

本篇內(nèi)容介紹了“如何掌握Table與DataStream之間的互轉(zhuǎn)”的有關(guān)知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

創(chuàng)新互聯(lián)主營鎮(zhèn)江網(wǎng)站建設(shè)的網(wǎng)絡(luò)公司,主營網(wǎng)站建設(shè)方案,重慶APP開發(fā),鎮(zhèn)江h(huán)5微信小程序搭建,鎮(zhèn)江網(wǎng)站營銷推廣歡迎鎮(zhèn)江等地區(qū)企業(yè)咨詢

一、將kafka作為輸入流

kafka 的連接器 flink-kafka-connector 中,1.10 版本的已經(jīng)提供了 Table API 的支持。我們可以在  connect方法中直接傳入一個叫做 Kafka 的類,這就是 kafka 連接器的描述器ConnectorDescriptor。

準(zhǔn)備數(shù)據(jù):

1,語數(shù) 2,英物 3,化生 4,文學(xué) 5,語理 6,學(xué)物

創(chuàng)建kafka主題

./kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --replication-factor 2 --partitions 3 --topic FlinkSqlTest

通過命令行的方式啟動一個生產(chǎn)者

[root@node01 bin]# ./kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic FlinkSqlTest >1,語數(shù) >2,英物  >3,化生 >4,文學(xué) >5,語理\ >6,學(xué)物

編寫Flink代碼連接到kafka

import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.DataTypes import org.apache.flink.table.api.scala._ import org.apache.flink.table.descriptors.{Csv, Kafka, Schema}  /**  * @Package  * @author 大數(shù)據(jù)老哥  * @date 2020/12/17 0:35  * @version V1.0  */  object FlinkSQLSourceKafka {   def main(args: Array[String]): Unit = {     // 獲取流處理的運行環(huán)境     val env = StreamExecutionEnvironment.getExecutionEnvironment     // 獲取table的運行環(huán)境     val tableEnv = StreamTableEnvironment.create(env)     tableEnv.connect(       new Kafka()         .version("0.11")  // 設(shè)置kafka的版本           .topic("FlinkSqlTest") // 設(shè)置要連接的主題         .property("zookeeper.connect","node01:2181,node02:2181,node03:2181")  //設(shè)置zookeeper的連接地址跟端口號         .property("bootstrap.servers","node01:9092,node02:9092,node03:9092") //設(shè)置kafka的連接地址跟端口號     ).withFormat(new Csv()) // 設(shè)置格式       .withSchema(new Schema()  // 設(shè)置元數(shù)據(jù)信息         .field("id",DataTypes.STRING())         .field("name",DataTypes.STRING())       ).createTemporaryTable("kafkaInputTable") // 創(chuàng)建臨時表      //定義要查詢的sql語句     val result = tableEnv.sqlQuery("select * from  kafkaInputTable ")     //打印數(shù)據(jù)     result.toAppendStream[(String,String)].print()     // 開啟執(zhí)行     env.execute("source kafkaInputTable")   } }

運行結(jié)果圖

如何掌握Table與DataStream之間的互轉(zhuǎn)

當(dāng)然也可以連接到 ElasticSearch、MySQL、HBase、Hive 等外部系統(tǒng),實現(xiàn)方式基本上是類似的。

二、表的查詢

利用外部系統(tǒng)的連接器 connector,我們可以讀寫數(shù)據(jù),并在環(huán)境的 Catalog 中注冊表。接下來就可以對表做查詢轉(zhuǎn)換了。Flink  給我們提供了兩種查詢方式:Table API 和 SQL。

三、Table API 的調(diào)用

Table API 是集成在 Scala 和 Java 語言內(nèi)的查詢 API。與 SQL 不同,Table API  的查詢不會用字符串表示,而是在宿主語言中一步一步調(diào)用完成的。 Table API 基于代表一張表的 Table 類,并提供一整套操作處理的方法  API。這些方法會返回一個新的 Table 對象,這個對象就表示對輸入表應(yīng)用轉(zhuǎn)換操作的結(jié)果。有些關(guān)系型轉(zhuǎn)換操作,可以由多個方法調(diào)用組成,構(gòu)成鏈?zhǔn)秸{(diào)用結(jié)構(gòu)。例如  table.select(…).filter(…) ,其中 select(…)  表示選擇表中指定的字段,filter(…)表示篩選條件。代碼中的實現(xiàn)如下:

val kafkaInputTable = tableEnv.from("kafkaInputTable")    kafkaInputTable.select("*")      .filter('id !=="1")

四、SQL查詢

Flink 的 SQL 集成,基于的是 ApacheCalcite,它實現(xiàn)了 SQL 標(biāo)準(zhǔn)。在 Flink 中,用常規(guī)字符串來定義 SQL  查詢語句。SQL 查詢的結(jié)果,是一個新的 Table。

代碼實現(xiàn)如下:

val result = tableEnv.sqlQuery("select * from  kafkaInputTable ")

當(dāng)然,也可以加上聚合操作,比如我們統(tǒng)計每個用戶的個數(shù)

調(diào)用 table API

val result: Table = tableEnv.from("kafkaInputTable")        result.groupBy("user")        .select('name,'name.count as 'count)

調(diào)用SQL

val result = tableEnv.sqlQuery("select  name ,count(1) as count from kafkaInputTable group by name ")

這里 Table API 里指定的字段,前面加了一個單引號’,這是 Table API 中定義的  Expression類型的寫法,可以很方便地表示一個表中的字段。  字段可以直接全部用雙引號引起來,也可以用半邊單引號+字段名的方式。以后的代碼中,一般都用后一種形式。

五、將DataStream 轉(zhuǎn)成Table

Flink 允許我們把 Table 和DataStream 做轉(zhuǎn)換:我們可以基于一個 DataStream,先流式地讀取數(shù)據(jù)源,然后 map  成樣例類,再把它轉(zhuǎn)成 Table。Table 的列字段(column fields),就是樣例類里的字段,這樣就不用再麻煩地定義 schema 了。

5.1、代碼實現(xiàn)

代碼中實現(xiàn)非常簡單,直接用 tableEnv.fromDataStream() 就可以了。默認(rèn)轉(zhuǎn)換后的 Table schema 和 DataStream  中的字段定義一一對應(yīng),也可以單獨指定出來。

這就允許我們更換字段的順序、重命名,或者只選取某些字段出來,相當(dāng)于做了一次 map 操作(或者 Table API 的 select 操作)。

代碼具體如下:

import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.scala._  /**  * @Package  * @author 大數(shù)據(jù)老哥  * @date 2020/12/17 21:21  * @version V1.0  */ object FlinkSqlReadFileTable {    def main(args: Array[String]): Unit = {     // 構(gòu)建流處理運行環(huán)境     val env = StreamExecutionEnvironment.getExecutionEnvironment     // 構(gòu)建table運行環(huán)境     val tableEnv = StreamTableEnvironment.create(env)     // 使用流處理來讀取數(shù)據(jù)     val readData = env.readTextFile("./data/word.txt")     // 使用flatMap進(jìn)行切分     val word: DataStream[String] = readData.flatMap(_.split(" "))     // 將word 轉(zhuǎn)為 table     val table = tableEnv.fromDataStream(word)     // 計算wordcount     val wordCount = table.groupBy("f0").select('f0, 'f0.count as 'count)     wordCount.printSchema()     //轉(zhuǎn)換成流處理打印輸出     tableEnv.toRetractStream[(String,Long)](wordCount).print()     env.execute("FlinkSqlReadFileTable")   } }

5.2 數(shù)據(jù)類型與 Table schema 的對應(yīng)

DataStream 中的數(shù)據(jù)類型,與表的 Schema之間的對應(yīng)關(guān)系,是按照樣例類中的字段名來對應(yīng)的(name-based  mapping),所以還可以用 as 做重命名。

另外一種對應(yīng)方式是,直接按照字段的位置來對應(yīng)(position-based mapping),對應(yīng)的過程中,就可以直接指定新的字段名了。

基于名稱的對應(yīng):

val userTable = tableEnv.fromDataStream(dataStream,'username as 'name,'id as 'myid)

基于位置的對應(yīng):

val userTable = tableEnv.fromDataStream(dataStream, 'name, 'id)

Flink 的 DataStream 和 DataSet API 支持多種類型。組合類型,比如元組(內(nèi)置 Scala 和 Java  元組)、POJO、Scala case 類和 Flink 的 Row 類型等,允許具有多個字段的嵌套數(shù)據(jù)結(jié)構(gòu),這些字段可以在 Table  的表達(dá)式中訪問。其他類型,則被視為原子類型。

元組類型和原子類型,一般用位置對應(yīng)會好一些;如果非要用名稱對應(yīng),也是可以的:元組類型,默認(rèn)的名稱是_1, _2;而原子類型,默認(rèn)名稱是 f0。

六、創(chuàng)建臨時視圖(Temporary View)

創(chuàng)建臨時視圖的第一種方式,就是直接從 DataStream 轉(zhuǎn)換而來。同樣,可以直接對應(yīng)字段轉(zhuǎn)換;也可以在轉(zhuǎn)換的時候,指定相應(yīng)的字段。代碼如下:

tableEnv.createTemporaryView("sensorView", dataStream)  tableEnv.createTemporaryView("sensorView", dataStream, 'id, 'temperature,'timestamp as 'ts)

另外,當(dāng)然還可以基于 Table 創(chuàng)建視圖:

tableEnv.createTemporaryView("sensorView", sensorTable)

View 和 Table 的 Schema 完全相同。事實上,在 Table API 中,可以認(rèn)為 View 和 Table是等價的。

“如何掌握Table與DataStream之間的互轉(zhuǎn)”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實用文章!

新聞名稱:如何掌握Table與DataStream之間的互轉(zhuǎn)
標(biāo)題鏈接:http://aaarwkj.com/article40/gdgoho.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站排名、網(wǎng)站維護(hù)服務(wù)器托管、網(wǎng)站內(nèi)鏈品牌網(wǎng)站建設(shè)、網(wǎng)站制作

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

搜索引擎優(yōu)化
亚洲熟妇精品一区二区三区| 日韩精品视频一二三区| 日本一区二区三区高清不卡| 大龄熟妇丰满有水多毛浓| 日韩性生活视频免费播放| 午夜福利视频在线观看| 人妻在线中文字幕一区| 亚州欧美精品一区二区| 91久久国产综合精品女同| 亚洲成在人线免费观看| 日本a亚洲中文字幕永远| 国产av蜜臀一区二区三区| 一区二区亚洲免费的视频| 国产三级精品三级在线专区1| 亚洲福利视频在线观看免费 | 午夜影院在线观看网站| 成年人收看黄色一二级片 | 精品国产一区二区三区卡| 日本高清免费黄色录像| 日本人妻精品一区二区| 日韩精品一区二区三区四区蜜桃 | 清纯少妇激情四射网站| 欧美日韩精品综合国产| 欧美视频亚洲视频自拍视频| 成人爱爱免费观看视频| 国产情侣自拍视频在线观看| 亚洲天堂av在线有码| 欧美精品亚洲精品日韩| 国产精品一区二区三区欧美| 91大神午夜在线观看| 免费看av网站一区二区| 中文字幕久久一区二区三区| 国产龙熟女高潮一区二区| 亚洲香蕉一区二区免费| 亚洲成人影院中文字幕| 日韩三级精品一区二区| 亚洲欧美日韩成人在线观看| 精品视频日韩在线观看| 日韩欧美精品久久黄| 亚洲综合偷拍欧美一区色| 欧美黄片网站在线观看|