色哟哟视频在线观看-色哟哟视频在线-色哟哟欧美15最新在线-色哟哟免费在线观看-国产l精品国产亚洲区在线观看-国产l精品国产亚洲区久久

0
  • 聊天消息
  • 系統消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術視頻
  • 寫文章/發帖/加入社區
會員中心
創作中心

完善資料讓更多小伙伴認識你,還能領取20積分哦,立即完善>

3天內不再提示

怎樣去解決Kafka消息重復的問題呢?

jf_ro2CN3Fa ? 來源:稀土掘金 ? 2023-02-12 14:18 ? 次閱讀

一、前言

數據重復這個問題其實也是挺正常,全鏈路都有可能會導致數據重復。

00fc324a-a28b-11ed-bfe3-dac502259ad0.jpg

通常,消息消費時候都會設置一定重試次數來避免網絡波動造成的影響,同時帶來副作用是可能出現消息重復。

整理下消息重復的幾個場景:

生產端: 遇到異常,基本解決措施都是 重試

場景一:leader分區不可用了,拋 LeaderNotAvailableException 異常,等待選出新 leader 分區。

場景二:Controller 所在 Broker 掛了,拋 NotControllerException 異常,等待 Controller 重新選舉。

場景三:網絡異常、斷網、網絡分區、丟包等,拋 NetworkException 異常,等待網絡恢復。

消費端: poll 一批數據,處理完畢還沒提交 offset ,機子宕機重啟了,又會 poll 上批數據,再度消費就造成了消息重復。

怎么解決?

先來了解下消息的三種投遞語義:

最多一次( at most once): 消息只發一次,消息可能會丟失,但絕不會被重復發送。例如:mqtt 中 QoS = 0。

至少一次( at least once): 消息至少發一次,消息不會丟失,但有可能被重復發送。例如:mqtt 中 QoS = 1

精確一次( exactly once): 消息精確發一次,消息不會丟失,也不會被重復發送。例如:mqtt 中 QoS = 2。

了解了這三種語義,再來看如何解決消息重復,即如何實現精準一次,可分為三種方法:

Kafka 冪等性 Producer: 保證生產端發送消息冪等。局限性,是只能保證單分區且單會話(重啟后就算新會話)

Kafka 事務: 保證生產端發送消息冪等。解決冪等 Producer 的局限性。

消費端冪等:保證消費端接收消息冪等。蔸底方案。

1)Kafka 冪等性 Producer

冪等性指 :無論執行多少次同樣的運算,結果都是相同的。即一條命令,任意多次執行所產生的影響均與一次執行的影響相同。

冪等性使用示例:在生產端添加對應配置即可

Propertiesprops=newProperties();
props.put("enable.idempotence",ture);//1.設置冪等
props.put("acks","all");//2.當enable.idempotence為true,這里默認為all
props.put("max.in.flight.requests.per.connection",5);//3.注意

設置冪等,啟動冪等。

配置 acks,注意:一定要設置 acks=all,否則會拋異常。

配置 max.in.flight.requests.per.connection 需要 <= 5 ,否則會拋異常 OutOfOrderSequenceException。

0.11 >= Kafka < 1.1, max.in.flight.request.per.connection = 1

Kafka >= 1.1, max.in.flight.request.per.connection <= 5

[**為了更好理解,需要了解下 Kafka 冪等機制:]

010c3212-a28b-11ed-bfe3-dac502259ad0.jpg

Producer 每次啟動后,會向 Broker 申請一個全局唯一的 pid。(重啟后 pid 會變化,這也是弊端之一)

Sequence Numbe:針對每個 都對應一個從0開始單調遞增的 Sequence,同時 Broker端會緩存這個 seq num

判斷是否重復: 去 Broker 里對應的隊列 ProducerStateEntry.Queue(默認隊列長度為 5)查詢是否存在

如果 nextSeq == lastSeq + 1,即 服務端seq + 1 == 生產傳入seq,則接收。

如果 nextSeq == 0 && lastSeq == Int.MaxValue,即剛初始化,也接收。

反之,要么重復,要么丟消息,均拒絕。

011a794e-a28b-11ed-bfe3-dac502259ad0.jpg

這種設計針對解決了兩個問題:

消息重復: 場景 Broker 保存消息后還沒發送 ack 就宕機了,這時候 Producer 就會重試,這就造成消息重復。

消息亂序: 避免場景,前一條消息發送失敗而其后一條發送成功,前一條消息重試后成功,造成的消息亂序。

那什么時候該使用冪等:

如果已經使用 acks=all,使用冪等也可以。

如果已經使用 acks=0 或者 acks=1,說明你的系統追求高性能,對數據一致性要求不高。不要使用冪等。

2)Kafka 事務

使用 Kafka 事務解決冪等的弊端:單會話且單分區冪等。

Tips: 這塊篇幅較長,這先稍微提及下使用,之后另起一篇。

事務使用示例:分為生產端 和 消費端

Propertiesprops=newProperties();
props.put("enable.idempotence",ture);//1.設置冪等
props.put("acks","all");//2.當enable.idempotence為true,這里默認為all
props.put("max.in.flight.requests.per.connection",5);//3.最大等待數
props.put("transactional.id","my-transactional-id");//4.設定事務id

Producerproducer=newKafkaProducer(props);

//初始化事務
producer.initTransactions();

try{
//開始事務
producer.beginTransaction();

//發送數據
producer.send(newProducerRecord("Topic","Key","Value"));

//數據發送及Offset發送均成功的情況下,提交事務
producer.commitTransaction();
}catch(ProducerFencedException|OutOfOrderSequenceException|AuthorizationExceptione){
//數據發送或者Offset發送出現異常時,終止事務
producer.abortTransaction();
}finally{
//關閉Producer和Consumer
producer.close();
consumer.close();
}

這里消費端 Consumer 需要設置下配置:isolation.level 參數

read_uncommitted: 這是默認值,表明 Consumer 能夠讀取到 Kafka 寫入的任何消息,不論事務型 Producer 提交事務還是終止事務,其寫入的消息都可以讀取。

如果你用了事務型 Producer,那么對應的 Consumer 就不要使用這個值。

read_committed: 表明 Consumer 只會讀取事務型 Producer 成功提交事務寫入的消息。當然了,它也能看到非事務型 Producer 寫入的所有消息。

3)消費端冪等

“如何解決消息重復?” 這個問題,其實換一種說法:就是如何解決消費端冪等性問題。

只要消費端具備了冪等性,那么重復消費消息的問題也就解決了。

典型的方案是使用:消息表,來去重:

0129106c-a28b-11ed-bfe3-dac502259ad0.jpg

上述栗子中,消費端拉取到一條消息后,開啟事務,將消息Id 新增到本地消息表中,同時更新訂單信息

如果消息重復,則新增操作 insert 會異常,同時觸發事務回滾。

二、案例:Kafka 冪等性 Producer 使用

準備工作如下:

1、Zookeeper:本地使用 Docker 啟動

$dockerrun-d--namezookeeper-p2181:2181zookeeper
a86dff3689b68f6af7eb3da5a21c2dba06e9623f3c961154a8bbbe3e9991dea4

2、Kafka:版本 2.7.1,源碼編譯啟動(看上文源碼搭建啟動)

3、啟動生產者:Kafka 源碼中 exmaple 中

4、啟動消息者:可以用 Kafka 提供的腳本

>基于SpringCloudAlibaba+Gateway+Nacos+RocketMQ+Vue&Element實現的后臺管理系統+用戶小程序,支持RBAC動態權限、多租戶、數據權限、工作流、三方登錄、支付、短信、商城等功能
>
>*項目地址:
>*視頻教程

#舉個栗子:topic 需要自己去修改
$cd./kafka-2.7.1-src/bin
$./kafka-console-producer.sh--broker-listlocalhost:9092--topictest_topic

創建 topic : 1副本,2 分區

$./kafka-topics.sh--bootstrap-serverlocalhost:9092--topicmyTopic--create--replication-factor1--partitions2

#查看
$./kafka-topics.sh--bootstrap-serverbroker:9092--topicmyTopic--describe

生產者代碼:

013daf5e-a28b-11ed-bfe3-dac502259ad0.jpg

publicclassKafkaProducerApplication{

privatefinalProducerproducer;
finalStringoutTopic;

publicKafkaProducerApplication(finalProducerproducer,
finalStringtopic){
this.producer=producer;
outTopic=topic;
}

publicvoidproduce(finalStringmessage){
finalString[]parts=message.split("-");
finalStringkey,value;
if(parts.length>1){
key=parts[0];
value=parts[1];
}else{
key=null;
value=parts[0];
}
finalProducerRecordproducerRecord
=newProducerRecord<>(outTopic,key,value);
producer.send(producerRecord,
(recordMetadata,e)->{
if(e!=null){
e.printStackTrace();
}else{
System.out.println("key/value"+key+"/"+value+"	writtentotopic[partition]"+recordMetadata.topic()+"["+recordMetadata.partition()+"]atoffset"+recordMetadata.offset());
}
}
);
}

publicvoidshutdown(){
producer.close();
}

publicstaticvoidmain(String[]args){

finalPropertiesprops=newProperties();

props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,"true");
props.put(ProducerConfig.ACKS_CONFIG,"all");

props.put(ProducerConfig.CLIENT_ID_CONFIG,"myApp");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);

finalStringtopic="myTopic";
finalProducerproducer=newKafkaProducer<>(props);
finalKafkaProducerApplicationproducerApp=newKafkaProducerApplication(producer,topic);

StringfilePath="/home/donald/Documents/Code/Source/kafka-2.7.1-src/examples/src/main/java/kafka/examples/input.txt";
try{
ListlinesToProduce=Files.readAllLines(Paths.get(filePath));
linesToProduce.stream().filter(l->!l.trim().isEmpty())
.forEach(producerApp::produce);
System.out.println("Offsetsandtimestampscommittedinbatchfrom"+filePath);
}catch(IOExceptione){
System.err.printf("Errorreadingfile%sdueto%s%n",filePath,e);
}finally{
producerApp.shutdown();
}
}
}

啟動生產者后,控制臺輸出如下:

014ce7da-a28b-11ed-bfe3-dac502259ad0.jpg

啟動消費者:

$./kafka-console-consumer.sh--bootstrap-serverlocalhost:9092--topicmyTopic
015c4680-a28b-11ed-bfe3-dac502259ad0.jpg

修改配置 acks

``

啟用冪等的情況下,調整 acks 配置,生產者啟動后結果是怎樣的:

修改配置 acks = 1

修改配置 acks = 0

會直接報錯:

Exceptioninthread"main"org.apache.kafka.common.config.ConfigException:Mustsetackstoallinordertousetheidempotentproducer.
Otherwisewecannotguaranteeidempotence.
0173c8b4-a28b-11ed-bfe3-dac502259ad0.jpg

修改配置 max.in.flight.requests.per.connection

``

啟用冪等的情況下,調整此配置,結果是怎樣的:

將 max.in.flight.requests.per.connection > 5 會怎樣?

0182e556-a28b-11ed-bfe3-dac502259ad0.jpg

當然會報錯:

Causedby:org.apache.kafka.common.config.ConfigException:Mustsetmax.in.flight.requests.per.connectiontoatmost5tousetheidempotentproducer.
01987f24-a28b-11ed-bfe3-dac502259ad0.jpg






審核編輯:劉清

聲明:本文內容及配圖由入駐作者撰寫或者入駐合作網站授權轉載。文章觀點僅代表作者本人,不代表電子發燒友網立場。文章及其配圖僅供工程師學習之用,如有內容侵權或者其他違規問題,請聯系本站處理。 舉報投訴
  • MQTT協議
    +關注

    關注

    0

    文章

    98

    瀏覽量

    5455
  • kafka
    +關注

    關注

    0

    文章

    52

    瀏覽量

    5234

原文標題:一碰就頭疼的 Kafka 消息重復問題,立馬解決!

文章出處:【微信號:芋道源碼,微信公眾號:芋道源碼】歡迎添加關注!文章轉載請注明出處。

收藏 人收藏

    評論

    相關推薦

    在Boost電源中該怎樣選擇電容的型號和電容容量

    我們之前了解過電容的作用,不外乎儲能、濾波等作用。那么在Boost電源中又該怎樣選擇電容的型號和電容容量
    發表于 08-14 15:44 ?3218次閱讀
    在Boost電源中該<b class='flag-5'>怎樣</b><b class='flag-5'>去</b>選擇電容的型號和電容容量<b class='flag-5'>呢</b>?

    kafka數據可靠性深度解讀

    At least once,可以保證不丟,但是可能會重復,為了解決重復需要引入唯一標識和重機制,kafka提供了GUID實現了唯一標識,但是并沒有提供自帶的
    發表于 05-08 16:29

    基于閃存存儲的Apache Kafka性能提升方法

    據生態系統中最常用的分布式消息傳遞系統之一的Apache Kafka進行評估,測試如何以最佳方式將美光固態存儲應用于 Apache Kafka,以及將產生怎樣的收益。A
    發表于 07-24 06:58

    Kafka集群環境的搭建

    :2181,zk02:2181,zk03:2181注意:broker.id安裝集群服務個數編排即可,集群下不能重復。5、啟動kafka集群# 啟動命令[root@node02 kafka2.11]# bin
    發表于 01-05 17:55

    怎樣設置數值元件的格式

    怎樣設置數值元件怎樣設置數值元件的格式
    發表于 09-26 09:16

    怎樣獲取Android的電池電壓

    怎樣獲取Android的電池電壓怎樣獲取Android的電池電流
    發表于 10-09 08:39

    怎樣使用springboot

    怎樣使用springboot?學習springboot需要懂得哪些?
    發表于 10-25 07:13

    怎樣使用HSE/HSI配置RCC的時鐘

    怎樣使用HSE/HSI配置RCC的時鐘怎樣設置系統時鐘的庫函數
    發表于 11-10 07:08

    怎樣配置設備樹的leds節點

    配置設備樹leds節點,sys文件系統中沒有出現相應設備文件,引腳沒有查出有重復定義的?怎樣配置設備樹的leds節點
    發表于 01-07 06:15

    socket通信該怎樣實現

    socket通信該怎樣實現怎樣實現socket AES-CBC加密
    發表于 01-20 07:41

    怎樣配置Android的SDIO部分

    怎樣配置Android的電源部分怎樣配置Android的SDIO部分
    發表于 02-10 07:00

    Ubuntu固件的編譯該怎樣使用

    怎樣編譯Ubuntu固件?Ubuntu固件的編譯該怎樣使用
    發表于 02-15 06:18

    怎樣寫回調函數怎樣使用回調函數

    回調函數的作用是什么?單片機怎么用回調函數在不同文件之間傳遞數據怎樣寫回調函數怎樣使
    發表于 02-23 07:40

    Kafka的概念及Kafka的宕機

    問題要從一次Kafka的宕機開始說起。 筆者所在的是一家金融科技公司,但公司內部并沒有采用在金融支付領域更為流行的 RabbitMQ ,而是采用了設計之初就為日志處理而生的 Kafka ,所以我一直
    的頭像 發表于 08-27 11:21 ?2135次閱讀
    <b class='flag-5'>Kafka</b>的概念及<b class='flag-5'>Kafka</b>的宕機

    怎樣減少Confluent Cloud Kafka運營成本

    流式數據已成為企業構建和運營出色數據產品的必要條件,而 Apache Kafka 已成為實時流式傳輸的標準。
    的頭像 發表于 09-23 17:23 ?881次閱讀
    主站蜘蛛池模板: 欧洲最强rapper潮水喷视频| 暖暖 视频 在线 观看 高清| 内射一区二区精品视频在线观看| 亚洲精品有码在线观看| 国产97精品久久久天天A片| 欧美另类一区| china chinese中国人玩| 美女内射视频WWW网站午夜| 最近免费中文字幕MV免费高清| 久久精品国产亚洲AV麻豆欧美玲 | 99精品视频在线| 久久中文电影| 人与禽交3d动漫羞羞动漫| 99精彩视频在线观看| 男插女高潮一区二区| fyeex性欧美人与曾| 日韩欧美一区二区中文字幕| 范冰冰hdxxxx| 久久丫线这里只精品| 午夜福利院电影| 67194免费入口| 欧美精品一区二区三区四区| 超碰免费视频公开观看| 善良的女房东味道2在线观看| 国产精品高清视频在线| 午夜家庭影院| 极品内射少妇精品无码视频| 中文字幕在线视频在线看| 美女被爆插| 一本道本线中文无码| 久久性生大片免费观看性| AV一区AV久久AV无码| 久久视频这里只精品99热在线 | 同桌别揉我奶了嗯啊| 好男人社区| 97视频视频人人碰视频| 日韩一区二区三区射精| 国产一区二区无码蜜芽精品| 丝袜美女自摸| 狠狠射首页| avav去吧|