欧美一级特黄大片做受成人-亚洲成人一区二区电影-激情熟女一区二区三区-日韩专区欧美专区国产专区

怎么進(jìn)行PulsarKafkaClient的簡單分析

本篇文章給大家分享的是有關(guān)怎么進(jìn)行Pulsar Kafka Client的簡單分析,小編覺得挺實用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

專注于為中小企業(yè)提供成都做網(wǎng)站、成都網(wǎng)站建設(shè)服務(wù),電腦端+手機端+微信端的三站合一,更高效的管理,為中小企業(yè)渝中免費做網(wǎng)站提供優(yōu)質(zhì)的服務(wù)。我們立足成都,凝聚了一批互聯(lián)網(wǎng)行業(yè)人才,有力地推動了數(shù)千家企業(yè)的穩(wěn)健成長,幫助中小企業(yè)通過網(wǎng)站建設(shè)實現(xiàn)規(guī)模擴充和轉(zhuǎn)變。

為了方便 Kafka 用戶使用 Pulsar,Pulsar 對 Kafka Client 做了一些封裝,讓 Kafka 用戶更方便的使用 Pulsar。

下面主要介紹 Kafka Client 如何將消息發(fā)送到 Pulsar, 并從 Pulsar 消費消息,以及如何使用 Pulsar Schema。    

??引入依賴

<dependency>  <groupId>org.apache.pulsar</groupId>  <artifactId>pulsar-client-kafka</artifactId>  <version>{project.version}</version></dependency>
依賴引入了 Kafka 的 0.10.2.1 版本的客戶端,還有 Pulsar 對 Kafka Client 封裝后的客戶端。  

?? 使用 Kafka Schema

>>>添加生產(chǎn)者代碼

String topic = "persistent://public/default/test";
Properties props = new Properties();props.put("bootstrap.servers", "pulsar://localhost:6650");
props.put("key.serializer", IntegerSerializer.class.getName());props.put("value.serializer", StringSerializer.class.getName());
Producer<Integer, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {    producer.send(new ProducerRecord<Integer, String>(topic, i, Integer.toString(i)));}
producer.close();
在上述配置中 topic 是指 Pulsar 中的 Topic,接著使用 Kafka 的配置方式來初始化各種配置,包括 Server 地址、key 的序列化與 value 的序列化類,然后構(gòu)造一個 ProducerRecord 的類將其發(fā)送出去。  

>>> 添加消費者代碼

String topic = "persistent://public/default/test";
Properties props = new Properties();props.put("bootstrap.servers", "pulsar://localhost:6650");props.put("group.id", "my-subscription-name");props.put("enable.auto.commit", "false");props.put("key.deserializer", IntegerDeserializer.class.getName());props.put("value.deserializer", StringDeserializer.class.getName());
@SuppressWarnings("resource")Consumer<Integer, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList(topic));
while (true) {    ConsumerRecords<Integer, String> records = consumer.poll(100);    records.forEach(record -> {        log.info("Received record: {}", record);    });
   // Commit last offset    consumer.commitSync();}
有些配置同生產(chǎn)者代碼的配置是類似的,例如 topic,Server 等。另外使用 Kafka 的 group.id 作為配置 Pulsar 中的訂閱名稱,關(guān)閉自動提交,在消費者端為 key 和 value 配置的是反序列化的類。然后同常規(guī)的消費者類似,開始消費消息。  

??使用 Pulsar Schema

在上述情況中使用的是 Kafka 的 Schema 來進(jìn)行序列化與反序列化,當(dāng)然也支持使用 Pulsar 的 Schema 來進(jìn)行此過程。下面使用 AVRO 進(jìn)行簡單的介紹。
首先定義 Schema 所需要使用的 pojo 類。  
@Data@ToString@EqualsAndHashCodepublic class Foo {    @Nullable    private String field1;    @Nullable    private String field2;    private int field3;}
@Data@ToString@EqualsAndHashCodepublic class Bar {    private boolean field1;}

>>> 生產(chǎn)者端代碼

String topic = "persistent://public/default/test-avro";
Properties props = new Properties();props.put("bootstrap.servers", "pulsar://localhost:6650");
props.put("key.serializer", IntegerSerializer.class.getName());props.put("value.serializer", StringSerializer.class.getName());
AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
Bar bar = new Bar();bar.setField1(true);
Foo foo = new Foo();foo.setField1("field1");foo.setField2("field2");foo.setField3(3);

Producer<Foo, Bar> producer = new KafkaProducer<>(props, fooSchema, barSchema);
for (int i = 0; i < 10; i++) {    producer.send(new ProducerRecord<Foo, Bar>(topic, i, foo, bar));    log.info("Message {} sent successfully", i);}
producer.close();
可以看到大部分配置同上面使用 Kafka Client 的配置是類似的,但是中間加入了一些 Pulsar 的 Schema,使用 Foo 作為 key,使用 Bar 類作為 value。  

>>> 消費者端代碼

String topic = "persistent://public/default/test-avro";
Properties props = new Properties();props.put("bootstrap.servers", "pulsar://localhost:6650");props.put("group.id", "my-subscription-name");props.put("enable.auto.commit", "false");props.put("key.deserializer", IntegerDeserializer.class.getName());props.put("value.deserializer", StringDeserializer.class.getName());
AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
Bar bar = new Bar();bar.setField1(true);
Foo foo = new Foo();foo.setField1("field1");foo.setField2("field2");foo.setField3(3);
@SuppressWarnings("resource")Consumer<Foo, Bar> consumer = new PulsarKafkaConsumer<>(props, fooSchema, barSchema);consumer.subscribe(Arrays.asList(topic));
while (true) {    ConsumerRecords<Foo, Bar> records = consumer.poll(100);    records.forEach(record -> {        log.info("Received record: {}", record);    });
   // Commit last offset    consumer.commitSync();}
消費者端同樣是類似的配置,使用與生產(chǎn)者端相同的 Schema 進(jìn)行數(shù)據(jù)的反序列化。      

以上就是怎么進(jìn)行Pulsar Kafka Client的簡單分析,小編相信有部分知識點可能是我們?nèi)粘9ぷ鲿姷交蛴玫降?。希望你能通過這篇文章學(xué)到更多知識。更多詳情敬請關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。

本文題目:怎么進(jìn)行PulsarKafkaClient的簡單分析
文章URL:http://aaarwkj.com/article16/iipsdg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站制作建站公司、用戶體驗、網(wǎng)站收錄微信公眾號、服務(wù)器托管

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

外貿(mào)網(wǎng)站建設(shè)
国产精品日本欧美一区二区| 太爽了少妇高潮在线看片| 国产精品一区午夜福利| 亚洲女优中文字幕在线免费| 白小白的视频在线观看| 后入蜜桃臀美女在线观看| 欧美亚洲国产另类第一页| 日本午夜福利免费在线播放| av天堂久久人妻精品加勒比| 午夜精品一区二区三区久久| 国产精品一久久香蕉产线看| 怡红院一区二区三区毛片| 国产精品人妻在线av| 巨乳人妻一区二区三区| 色吊丝日韩在线观看| 五月婷婷六月丁香伊人妞| 四虎永久精品国产毛片| 国产三级精品三级在线播放| 91精品国产91久久综合福利| 91欧美精品综合在线| 麻豆人妻少妇精品系列| 国产精品一区二区三区日本| 不卡的av中文字幕在线播放| 国产精品传媒成人免费| 精品伊人久久大香线蕉| 免费国产成人在线视频| 亚洲伦理国产一国产二| 亚洲国产精品一区一区| 国产三级视频在线观看视频| 夜夜春国产精品不卡一区二区| 中文字幕乱码熟女人妻视频| 亚洲av乱码专区国产乱码| 亚洲一区二区三区经典精品| 国产精品乱人偷免费视频| 午夜体内射精免费视频| 久久午夜视频在线观看| 手机黄色av免费在线网址| 在线国产一区二区不卡| 日本av在线中文一区二区| 97高清视频在线观看| 亚洲天堂av现在观看|