欧美一级特黄大片做受成人-亚洲成人一区二区电影-激情熟女一区二区三区-日韩专区欧美专区国产专区

SpringBoot怎么整合Kafka

本文小編為大家詳細(xì)介紹“SpringBoot怎么整合Kafka”,內(nèi)容詳細(xì),步驟清晰,細(xì)節(jié)處理妥當(dāng),希望這篇“SpringBoot怎么整合Kafka”文章能幫助大家解決疑惑,下面跟著小編的思路慢慢深入,一起來(lái)學(xué)習(xí)新知識(shí)吧。

目前成都創(chuàng)新互聯(lián)已為數(shù)千家的企業(yè)提供了網(wǎng)站建設(shè)、域名、網(wǎng)頁(yè)空間、網(wǎng)站托管、服務(wù)器租用、企業(yè)網(wǎng)站設(shè)計(jì)、霞山網(wǎng)站維護(hù)等服務(wù),公司將堅(jiān)持客戶導(dǎo)向、應(yīng)用為本的策略,正道將秉承"和諧、參與、激情"的文化,與客戶和合作伙伴齊心協(xié)力一起成長(zhǎng),共同發(fā)展。

一、準(zhǔn)備工作
提前說(shuō)明:如果你運(yùn)行出問(wèn)題,請(qǐng)檢查Kafka的版本與SpringBoot的版本是否與我文中的一致,本文中的環(huán)境已經(jīng)經(jīng)過(guò)測(cè)試。

Kafka服務(wù)版本為 kafka_2.11-1.1.0 (Scala), 也就是1.1.0
SpringBoot版本:1.5.10.RELEASE

提前啟動(dòng)zk,kafka,并且創(chuàng)建一個(gè)Topic

[root@Basic kafka_2.11-1.1.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test_topic 

確保你的kafka能夠訪問(wèn),如果訪問(wèn)不了,需要打開外網(wǎng)訪問(wèn)。
config/server.properties

advertised.listeners=PLAINTEXT://192.168.239.128:9092

Maven 依賴
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>

二、項(xiàng)目結(jié)構(gòu)
為了更加體現(xiàn)實(shí)際開發(fā)需求,一般生產(chǎn)者都是在調(diào)用某些接口的服務(wù)處理完邏輯之后然后往kafka里面扔數(shù)據(jù),然后有一個(gè)消費(fèi)者不停的監(jiān)控這個(gè)Topic,然后處理數(shù)據(jù),所以這里把生產(chǎn)者作為一個(gè)接口,消費(fèi)者放到kafka這個(gè)目錄下,注意@Component注解,不然掃描不到@KafkaListener

三、具體實(shí)現(xiàn)代碼
SpringBoot配置文件
application.yml

spring:
kafka:
bootstrap-servers: 192.168.239.128:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: test
enable-auto-commit: true
auto-commit-interval: 1000
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

生產(chǎn)者
package cn.saytime.web;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
* 測(cè)試kafka生產(chǎn)者
*/
@RestController
@RequestMapping("kafka")
public class TestKafkaProducerController {

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@RequestMapping("send")
public String send(String msg){
kafkaTemplate.send("test_topic", msg);
return "success";
}

}
消費(fèi)者
這里的消費(fèi)者會(huì)監(jiān)聽這個(gè)主題,有消息就會(huì)執(zhí)行,不需要進(jìn)行while(true)

package cn.saytime.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
* kafka消費(fèi)者測(cè)試
*/
@Component
public class TestConsumer {

@KafkaListener(topics = "test_topic")
public void listen (ConsumerRecord<?, ?> record) throws Exception {
System.out.printf("topic = %s, offset = %d, value = %s \n", record.topic(), record.offset(), record.value());
}
}

項(xiàng)目啟動(dòng)類

package cn.saytime;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class TestApplication{

public static void main(String[] args) {
SpringApplication.run(TestApplication.class, args);
}
}

四、測(cè)試
運(yùn)行項(xiàng)目,執(zhí)行:http://localhost:8080/kafka/send?msg=hello

控制臺(tái)輸出:

topic = test_topic, offset = 19, value = hello 
1
為了體現(xiàn)消費(fèi)者不止執(zhí)行一次就結(jié)束,再調(diào)用一次接口: 
http://localhost:8080/kafka/send?msg=kafka

topic = test_topic, offset = 20, value = kafka 
1
所以可以看到這里消費(fèi)者實(shí)際上是不停的poll Topic數(shù)據(jù)的。

讀到這里,這篇“SpringBoot怎么整合Kafka”文章已經(jīng)介紹完畢,想要掌握這篇文章的知識(shí)點(diǎn)還需要大家自己動(dòng)手實(shí)踐使用過(guò)才能領(lǐng)會(huì),如果想了解更多相關(guān)內(nèi)容的文章,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。

當(dāng)前標(biāo)題:SpringBoot怎么整合Kafka
URL分享:http://aaarwkj.com/article34/ihhjse.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供微信公眾號(hào)、軟件開發(fā)、移動(dòng)網(wǎng)站建設(shè)、建站公司小程序開發(fā)、搜索引擎優(yōu)化

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

小程序開發(fā)
国产亚洲综合一区二区三区| 国产精品亚洲欧美在线| 日本中文字幕区二区三区电影| 五月婷婷少妇中文字幕| 少妇的诱惑免费在线看| 欧美激情韩国三级日本| 国产精品久久123区| 国产精品自拍午夜福利| 欧美高清在线观看视频| 人妻熟妇av在线一区二区三区| 欧美日韩在线精品1区2区| 欧美日韩精品福利一区二区| 亚洲美女插入av网络导航| av熟女乱一区二区三区| 久热在线这里只有精品| 青青草原一区二区三区| 一本久久综合亚洲鲁鲁五月天| 九九热精品只有这里有| 欧美大片免费高清观看| 欧美成人精品视频在线不卡| 日本av免费观看一区二区| 精品欧美自拍偷拍三区| 久久精品少妇人妻视频| 2020中文字字幕在线不卡| 国产亚洲欧美精品久久久久久| 视频一区日本视频二区| 亚洲国产一区二区高清| 免费高清日本一区二区三区视频 | 国产亚洲一区激情小说| 一区不卡在线视频免费国产| 日韩精品中文字幕人妻系列| 欧美日韩国产av一区| 国产黄色大片一级久久 | 久久熟妇少妇亚洲精品| 欧美国产成人精品一区| 十八禁真人无摭挡观看| 在线观看免费国产不卡| 热精品韩国毛久久久久久| 黄片免费在线播放欧美| 国产饥渴熟女在线三区| 一区二区三区欧美黑人|