欧美一级特黄大片做受成人-亚洲成人一区二区电影-激情熟女一区二区三区-日韩专区欧美专区国产专区

flink怎么使用Event_time處理實時數(shù)據(jù)

本篇內(nèi)容主要講解“flink怎么使用Event_time處理實時數(shù)據(jù)”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學(xué)習(xí)“flink怎么使用Event_time處理實時數(shù)據(jù)”吧!

讓客戶滿意是我們工作的目標(biāo),不斷超越客戶的期望值來自于我們對這個行業(yè)的熱愛。我們立志把好的技術(shù)通過有效、簡單的方式提供給客戶,將通過不懈努力成為客戶在信息化領(lǐng)域值得信任、有價值的長期合作伙伴,公司提供的服務(wù)項目有:域名與空間、網(wǎng)絡(luò)空間、營銷軟件、網(wǎng)站建設(shè)、賈汪網(wǎng)站維護、網(wǎng)站推廣。

//flink中關(guān)于時間的三個概念
//event time:數(shù)據(jù)產(chǎn)生的時間
//processing time:處理數(shù)據(jù)的時間 即操作數(shù)據(jù)的之間 比如一個flink在scala中的map函數(shù)處理數(shù)據(jù)時
//ingest time:攝取數(shù)據(jù)時間,在一個streaming程序中 一個時間段收集數(shù)據(jù)的時間
//而evet time在處理實時數(shù)據(jù)時是比較有用的,例如在由于網(wǎng)絡(luò)的繁忙的原因,某些數(shù)據(jù)未能按時到達,假設(shè)遲到了30min,
//而我們定義的最大延遲不能超過十分鐘,那么一些數(shù)據(jù)包含了超時的數(shù)據(jù)那么這些數(shù)據(jù)是不會在這次操作中處理的而是會
//丟棄掉
//kafka生產(chǎn)者代碼
package kafka.partition.test;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class PartitionProducer {
	public static void main(String[] args) {
		
		Map<String,Object> props = new HashMap<>();
		props.put("acks", "1");
		props.put("partitioner.class", "org.apache.kafka.clients.producer.internals.DefaultPartitioner");
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		props.put("bootstrap.servers", "bigdata01:9092");
		String topic = "event_time";
		
		KafkaProducer<String, String> producer = new KafkaProducer<>(props);
		for(int i = 0 ; i <= 20;i++) {	
		//flink的watermarkassginer里面定義的超時時間是5000毫秒
			long mills = System.currentTimeMillis();
			if(i%3==0) {
			//數(shù)據(jù)的event time放在字符串的開頭 以空格分割
			//kafka event_time topic的0分區(qū)超時4000毫秒
				String line = (mills-4000)+" "+"partition-0--this is a big +" +i;
				ProducerRecord< String,String> record = new ProducerRecord<String, String>(topic, new Integer(0), null, i+"", line);
				producer.send(record);
			}else if(i%3==1) {
			//kafka event_time topic的1分區(qū)超時5000毫秒
				String line = (mills-5000)+" "+"partition-1--this is a big +" +i;
				ProducerRecord< String,String> record = new ProducerRecord<String, String>(topic, new Integer(1), null, i+"", line);
				producer.send(record);
			}else if(i%3==2) {
			//kafka event_time topic的2分區(qū)超時8000毫秒
				String line = (mills-8000)+" "+"partition-2--this is a big +" +i;
				ProducerRecord< String,String> record = new ProducerRecord<String, String>(topic, new Integer(2), null, i+"", line);
				producer.send(record);
			}
		}
		
		producer.close();
	}
}
//自定義的TimestampsAndWatermarks
package flink.streaming
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark
class CustomWaterMarks extends AssignerWithPeriodicWatermarks[String]{
    //超時時間
  val maxOutOrderness = 5000l
  //flink過一段時間便會調(diào)一次該函數(shù)獲取水印
  def getCurrentWatermark():Watermark ={
    val  sysMilssecons =  System.currentTimeMillis()
     new Watermark(sysMilssecons-maxOutOrderness) 
    
  }
  //每條記錄多會調(diào)用 來獲得even time 在生產(chǎn)的數(shù)據(jù)中 even_time放在字符串的第一個字段 用空格分割
  def extractTimestamp(element: String,previousElementTimestamp: Long): Long = {
   ((element.split(" ")).head).toLong
  }
}
package flink.streaming
import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
object StreamWithEventTimeAndWaterMarks {
  
  def main(args: Array[String]): Unit = {
    val kafkaProps = new Properties()
    //kafka的一些屬性
    kafkaProps.setProperty("bootstrap.servers", "bigdata01:9092")
    //所在的消費組
    kafkaProps.setProperty("group.id", "group2")
    //獲取當(dāng)前的執(zhí)行環(huán)境
    val evn = StreamExecutionEnvironment.getExecutionEnvironment
    //配制處理數(shù)據(jù)的時候使用event time
    evn.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    //kafka的consumer,test1是要消費的topic
    val kafkaSource = new FlinkKafkaConsumer[String]("event_time",new SimpleStringSchema,kafkaProps)
    //添加自定義的 TimestampsAndWatermarks
    kafkaSource.assignTimestampsAndWatermarks(new CustomWaterMarks)
    //設(shè)置從最新的offset開始消費
    //kafkaSource.setStartFromGroupOffsets()
    kafkaSource.setStartFromLatest()
    //自動提交offset
    kafkaSource.setCommitOffsetsOnCheckpoints(true)
    
    //flink的checkpoint的時間間隔
    //evn.enableCheckpointing(2000)
    //添加consumer
    val stream = evn.addSource(kafkaSource)
    evn.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE)
    //stream.setParallelism(3)
    val text = stream.flatMap{ _.toLowerCase().split(" ").drop(1).filter { _.nonEmpty} }
          .map{(_,1)}
          .keyBy(0)
          .timeWindow(Time.seconds(5))
          .sum(1)
          .map(x=>{(x._1,(new Integer(x._2)))})
     text.print()
     //啟動執(zhí)行    
     
     //text.addSink(new Ssinks())
     
    evn.execute("kafkawd")  
    
  }
  
}
打印結(jié)果 partition-2中的數(shù)據(jù)因為超時沒有出現(xiàn)
1> (big,14)
4> (is,14)
1> (+0,1)
2> (+1,1)
3> (partition-1--this,7)
4> (+15,1)
3> (+12,1)
1> (partition-0--this,7)
3> (+6,1)
1> (+16,1)
4> (+10,1)
2> (+18,1)
4> (+7,1)
3> (+3,1)
2> (+9,1)
3> (+19,1)
2> (+13,1)
3> (a,14)
2> (+4,1)

到此,相信大家對“flink怎么使用Event_time處理實時數(shù)據(jù)”有了更深的了解,不妨來實際操作一番吧!這里是創(chuàng)新互聯(lián)網(wǎng)站,更多相關(guān)內(nèi)容可以進入相關(guān)頻道進行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

本文標(biāo)題:flink怎么使用Event_time處理實時數(shù)據(jù)
文章轉(zhuǎn)載:http://aaarwkj.com/article2/pjccoc.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供用戶體驗品牌網(wǎng)站設(shè)計、商城網(wǎng)站、微信公眾號、動態(tài)網(wǎng)站

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

成都app開發(fā)公司
亚洲华人在线免费视频| 国产免费成人黄视频网站| 深夜福利视频一区二区| 国产91在线视频播放| 91久久精品91久久性色| 精精国产xxxx视频在线不卡| 国产在线精品91国自产拍| 日韩在线不卡播放视频| 国产欧美日韩综合一区| 日韩欧美一区二区黄色| 国产一级成人免费视频| 欧美日韩国产一区二区的| 日本道欧美一区二区aaaa| 亚洲人成网站18禁止人| 91精品婷婷国产综合| 人妻一区二区三区中文字幕| 国产91九色视频在线| 国产成人短视频在线播放| 亚洲av偷拍一区二区三区不卡| 亚洲中文乱码一区二区| 亚洲偷拍自拍在线观看| 日韩最新视频一区二区三| 91九色视频免费观看| 女同毛片一区二区三区| 亚洲女同在线免费观看| 欧美国产综合欧美一区二区三区 | 国产不卡的视频在线观看| 国产亚洲香蕉精彩视频| 日本韩国精品视频在线| 成人性生交大片免费男同| 国产三级三级三级三级| 亚洲成在人天堂一区二区| 欧美艳星一区二区三区四区| 麻豆精品国产免费av影片| 国产成av人片乱码色午夜| 日韩人妻中文字幕乱码一区| 欧美成人一区二区三区片| 日韩精品视频一二三区| 日本美女阴部毛茸茸视频| 亚洲一区二区三区av蜜桃| 日本岛国大片一区二区在线观看|