色哟哟视频在线观看-色哟哟视频在线-色哟哟欧美15最新在线-色哟哟免费在线观看-国产l精品国产亚洲区在线观看-国产l精品国产亚洲区久久

0
  • 聊天消息
  • 系統消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術視頻
  • 寫文章/發帖/加入社區
會員中心
創作中心

完善資料讓更多小伙伴認識你,還能領取20積分哦,立即完善>

3天內不再提示

使用MQ消息隊列時需要考慮的問題

我快閉嘴 ? 來源:稀土掘金技術社區 ? 作者:稀土掘金技術社區 ? 2022-09-13 16:37 ? 次閱讀


引入 MQ 消息中間件最直接的目的:系統解耦以及流量控制(削峰填谷)

  • 系統解耦: 上下游系統之間的通信相互依賴,利用 MQ 消息隊列可以隔離上下游環境變化帶來的不穩定因素。
  • 流量控制: 超高并發場景中,引入 MQ 可以實現流量 “削峰填谷” 的作用以及服務異步處理,不至于打崩服務。

引入 MQ 同樣帶來其他問題:數據一致性。

在分布式系統中,如果兩個節點之間存在數據同步,就會帶來數據一致性的問題。消息生產端發送消息到 MQ 再到消息消費端需要保證消息不丟失。

1bed7338-3248-11ed-ba43-dac502259ad0.jpg

所以在使用 MQ 消息隊列時,需要考慮這 3 個問題:

  • 如何知道有消息丟失?

  • 哪些環節可能丟消息?

  • 如何確保消息不丟失?

    1c0240d8-3248-11ed-ba43-dac502259ad0.jpg

1、如何知道有消息丟失?

如何感知消息是否丟失了?可總結如下:

  1. 他人反饋: 運營、PM 反饋消息丟失。
  2. 監控報警: 監控指定指標,即時報警人工調整。Kafka 集群異常、Broker 宕機、Broker 磁盤掛載問題、消費者異常導致消息積壓等都會給用戶直接感覺是消息丟失了。

案例:輿情分析中數據采集同步

1c12aa22-3248-11ed-ba43-dac502259ad0.jpg
  • PM 可自己下發采集調度指令,去采集特定數據。
  • PM 可通過 ES 近實時查詢對應數據,若沒相應數據可再次下發指令。

當感知消息丟失了,那就需要一種機制來檢查消息是否丟失。

檢索消息

運維工具有:

  1. 查看 Kafka 消費位置:
>基于SpringBoot+MyBatisPlus+Vue&Element實現的后臺管理系統+用戶小程序,支持RBAC動態權限、多租戶、數據權限、工作流、三方登錄、支付、短信、商城等功能
>
>*項目地址:
>*視頻教程#查看某個topic的message數量
$./kafka-run-class.shkafka.tools.GetOffsetShell--broker-listlocalhost:9092--topictest_topic


>基于SpringCloudAlibaba+Gateway+Nacos+RocketMQ+Vue&Element實現的后臺管理系統+用戶小程序,支持RBAC動態權限、多租戶、數據權限、工作流、三方登錄、支付、短信、商城等功能
>
>*項目地址:
>*視頻教程#查看consumerGroup列表
$./kafka-consumer-groups.sh--list--bootstrap-server192.168.88.108:9092

#查看offset消費情況
$./kafka-consumer-groups.sh--bootstrap-serverlocalhost:9092--groupconsole-consumer-1152--describe
GROUPTOPICPARTITIONCURRENT-OFFSETLOG-END-OFFSETLAGCONSUMER-IDHOSTCLIENT-ID
console-consumer-1152test_topic0-4-consumer-console-consumer-1152-1-2703ea2b-b62d-4cfd-8950-34e8c321b942/127.0.0.1consumer-console-consumer-1152-1
  1. 利用工具:Kafka Tools
1c1fe430-3248-11ed-ba43-dac502259ad0.jpg
  1. 其他可見化界面工具

2、哪些環節可能丟消息?

一條消息從生產到消費完成經歷 3 個環節:消息生產者、消息中間件、消息消費者。

1bed7338-3248-11ed-ba43-dac502259ad0.jpg

哪個環節都有可能出現消息丟失問題。

1)生產端

首先要認識到 Kafka 生產端發送消息流程:

調用 send() 方法時,不會立刻把消息發送出去,而是緩存起來,選擇恰當時機把緩存里的消息劃分成一批數據,通過 Sender 線程按批次發送給服務端 Broker

1c37d91e-3248-11ed-ba43-dac502259ad0.jpg

此環節丟失消息的場景有: 即導致 Producer 消息沒有發送成功

  1. 網絡波動: 生產者與服務端之間的鏈路不可達,發送超時?,F象是:各端狀態正常,但消費端就是沒有消費消息,就像丟失消息一樣。

  • *解決措施: *重試 props.put("retries", "10");
  • 不恰當配置: 發送消息無 ack 確認; 發送消息失敗無回調,無日志。

    producer.send(newProducerRecord<>(topic,messageKey,messageStr),
    newCallBack(){...});
    
  • *解決措施: *設置 acks=1 或者 acks=all。發送消息設置回調。

回顧下重要的參數 acks

  • acks=0:不需要等待服務器的確認. 這是 retries 設置無效. 響應里來自服務端的 offset 總是 -1producer只管發不管發送成功與否。延遲低,容易丟失數據。
  • acks=1:表示 leader 寫入成功(但是并沒有刷新到磁盤)后即向 producer 響應。延遲中等,一旦 leader 副本掛了,就會丟失數據。
  • acks=all:等待數據完成副本的復制, 等同于 -1. 假如需要保證消息不丟失, 需要使用該設置. 同時需要設置 unclean.leader.election.enabletrue, 保證當 ISR 列表為空時, 選擇其他存活的副本作為新的 leader.
2)服務端

先來了解下 Kafka Broker 寫入數據的過程:

  1. Broker 接收到一批數據,會先寫入內存 PageCacheOS Cache)中。
  2. 操作系統會隔段時間把 OS Cache 中數據進行刷盤,這個過程會是 「異步批量刷盤」
1c46c898-3248-11ed-ba43-dac502259ad0.jpg

這里就有個隱患,如果數據寫入 PageCacheKafka Broker宕機會怎樣?機子宕機/掉電?

  • Kafka Broker 宕機: 消息不會丟失。因為數據已經寫入 PageCache,只等待操作系統刷盤即可。

  • 機子宕機/掉電: 消息會丟失。因為數據仍在內存里,內存RAM 掉電后就會丟失數據。

  • 解決方案 :使用帶蓄電池后備電源的緩存 cache,防止系統斷電異常。
  1. 對比學習 MySQL 的 “雙1” 策略,基本不使用這個策略,因為 “雙1” 會導致頻繁的 I/O 操作,也是最慢的一種。
  2. 對比學習 RedisAOF 策略,默認且推薦的策略:**Everysec(AOF_FSYNC_EVERYSEC) 每一秒鐘保存一次(默認):** 。每個寫命令執行完, 只是先把日志寫到 AOF 文件的內存緩沖區, 每隔一秒把緩沖區中的內容寫入磁盤。

拓展:Kafka 日志刷盤機制

# 推薦采用默認值,即不配置該配置,交由操作系統自行決定何時落盤,以提升性能。
# 針對 broker 配置:
log.flush.interval.messages=10000 # 日志落盤消息條數間隔,即每接收到一定條數消息,即進行log落盤。
log.flush.interval.ms=1000        # 日志落盤時間間隔,單位ms,即每隔一定時間,即進行log落盤。

# 針對 topic 配置:
flush.messages.flush.ms=1000  # topic下每1s刷盤
flush.messages=1              # topic下每個消息都落盤


# 查看 Linux 后臺線程執行配置
$ sysctl -a | grep dirty
vm.dirty_background_bytes = 0
vm.dirty_background_ratio = 10      # 表示當臟頁占總內存的的百分比超過這個值時,后臺線程開始刷新臟頁。
vm.dirty_bytes = 0
vm.dirty_expire_centisecs = 3000    # 表示臟數據多久會被刷新到磁盤上(30秒)。
vm.dirty_ratio = 20
vm.dirty_writeback_centisecs = 500  # 表示多久喚醒一次刷新臟頁的后臺線程(5秒)。
vm.dirtytime_expire_seconds = 43200

Broker 的可靠性需要依賴其多副本機制: 一般副本數 3 個(配置參數:replication.factor=3

  • Leader Partition 副本:提供對外讀寫機制。
  • Follower Partition 副本:同步 Leader 數據。
1c4de3c6-3248-11ed-ba43-dac502259ad0.jpg

副本之間的數據同步也可能出現問題:數據丟失問題和數據不一致問題。

解決方案:ISREpoch 機制

  • ISR(In-Sync Replicas) :Le``ader 宕機,可以從 ISR 中選擇一個 Follower 作為 Leader。

  • Epoch 機制: 解決 Leader 副本高水位更新和 Follower 副本高水位更新在時間上是存在錯配問題。

    Tips: Kafka 0.11.x 版本才引入 leader epoch 機制解決高水位機制弊端。

對應需要的配置參數如下:

  1. acks=-1 或者 acks=all 必須所有副本均同步到消息,才能表明消息發送成功。

  2. replication.factor >= 3 副本數至少有 3 個。

  3. min.insync.replicas > 1 代表消息至少寫入 2個副本才算發送成功。前提需要 acks=-1

    舉個栗子:Leader 宕機了,至少要保證 ISR 中有一個 Follower,這樣這個Follwer被選舉為Leader 且不會丟失數據。

    公式:replication.factor = min.insync.replicas + 1

  4. unclean.leader.election.enable=false 防止不在 ISR 中的 Follower 被選舉為 Leader

    Kafka 0.11.0.0版本開始默認 unclean.leader.election.enable=false

3)消費端

消費端消息丟失場景有:

  1. 消息堆積: 幾個分區的消息都沒消費,就跟丟消息一樣。

  • 解決措施: 一般問題都出在消費端,盡量提高客戶端的消費速度,消費邏輯另起線程進行處理。
  • 自動提交: 消費端拉下一批數據,正在處理中自動提交了 offset,這時候消費端宕機了; 重啟后,拉到新一批數據,而上一批數據卻沒處理完。

  • 解決措施: 取消自動提交 auto.commit = false,改為手動 ack。
  • 心跳超時,引發 Rebalance 客戶端心跳超時,觸發 Rebalance被踢出消費組。如果只有這一個客戶端,那消息就不會被消費了。

    同時避免兩次 poll 的間隔時間超過閾值:

  • max.poll.records:降低該參數值,建議遠遠小于 <單個線程每秒消費的條數> * <消費線程的個數> * 的積。

  • max.poll.interval.ms: 該值要大于 / (<單個線程每秒消費的條數> * <消費線程的個數>) 的值。

  • 解決措施: 客戶端版本升級至 0.10.2 以上版本。

案例:凡凡曾遇到數據同步時,消息中的文本需經過 NLPNER 分析,再同步到 ES

這個過程的主要流程是:

1c5a2208-3248-11ed-ba43-dac502259ad0.jpg
  1. 數據同步程序從 Kafka 中拉取消息。
  2. 數據同步程序將消息內的文本發送的 NER 進行分析,得到特征數組。
  3. 數據同步程序將消息同步給 ES

現象:線上數據同步程序運行一段時間后,消息就不消費了。

  • 排查日志: 發現有 Rebalance 日志,懷疑是客戶端消費太慢被踢出了消費組。
  • 本地測試: 發現運行一段時間也會出現 Rebalance,且 NLPNER 服務訪問 HTTP 500 報錯。
  • 得出結論:NER服務異常,導致數據同步程序消費超時。且當時客戶端版本為 v0.10.1,Consumer 沒有獨立線程維持心跳,而是把心跳維持與 poll 接口耦合在一起,從而也會造成心跳超時。

當時解決措施是:

  1. session.timeout.ms 設置為 25s,當時沒有升級客戶端版本,怕帶來其他問題。
  2. 熔斷機制: 增加 Hystrix,超過 3 次服務調用異常就熔斷,保護客戶端正常消費數據。

3、如何確保消息不丟失?

掌握這些技能:

  1. 熟悉消息從發送到消費的每個階段
  2. 監控報警 Kafka 集群
  3. 熟悉方案 “MQ 可靠消息投遞”
怎么確保消息 100% 不丟失?

到這,總結下:

  1. 生產端:
  • 設置重試:props.put("retries", "10");
  • 設置 acks=all
  • 設置回調:producer.send(msg, new CallBack(){...});
  1. Broker:
  • 內存:使用帶蓄電池后備電源的緩存 cache
  • Kafka 版本 0.11.x 以上:支持 Epoch 機制。
  • replication.factor >= 3 副本數至少有 3 個。
  • min.insync.replicas > 1 代表消息至少寫入 2個副本才算發送成功。前提需要 acks=-1。
  • unclean.leader.election.enable=false 防止不在 ISR 中的 Follower 被選舉為 Leader。
  1. 消費端
  • 客戶端版本升級至 0.10.2 以上版本。
  • 取消自動提交 auto.commit = false,改為手動 ack
  • 盡量提高客戶端的消費速度,消費邏輯另起線程進行處理。


審核編輯:湯梓紅


聲明:本文內容及配圖由入駐作者撰寫或者入駐合作網站授權轉載。文章觀點僅代表作者本人,不代表電子發燒友網立場。文章及其配圖僅供工程師學習之用,如有內容侵權或者其他違規問題,請聯系本站處理。 舉報投訴
  • 消息隊列
    +關注

    關注

    0

    文章

    33

    瀏覽量

    3011
  • kafka
    +關注

    關注

    0

    文章

    51

    瀏覽量

    5230

原文標題:案例 | Kafka 為什么會丟消息?

文章出處:【微信號:芋道源碼,微信公眾號:芋道源碼】歡迎添加關注!文章轉載請注明出處。

收藏 人收藏

    評論

    相關推薦

    Linux下進程通訊消息隊列

    ?MQ(message queue),從字面意思上看,本質是個隊列,FIFO 先入先出,只不過隊列中存放的內容是message 而已。MQ 是在消息的傳輸過程中保存消息的容器。多用于分
    的頭像 發表于 08-19 19:56 ?1859次閱讀
    Linux下進程通訊消息<b class='flag-5'>隊列</b>

    RT-thread內核之消息隊列

    ; pointer indicated the free node of queue *///指向空閑隊列};typedef struct rt_messagequeue *rt_mq_t;#endif
    發表于 03-06 17:17

    請問ucosIII的消息隊列怎么使用?

    POST消息,但是 接收到的消息 是空的不知道是哪的原因。并且 怎么設置 消息隊列的數據 存儲區呢 ?我記得在 ucosII 中可以直接 定義 :gsm_req_event = OSQCreate(gsm_req_mq, MAX_GSM_REQ_
    發表于 08-06 04:36

    RT-Thread系統消息隊列常用的函數接口有哪些

    struct rt_messagequeue 表示。另外 rt_mq_t 表示消息隊列的句柄,即指向消息隊列控制塊的指針。消息隊列控制塊的數據結構定義如下:結構體定義中,繼承關系一目
    發表于 03-31 14:14

    使用消息隊列的rt_mq_send參數如果不相同會怎么樣

    求助1.看論壇的文章里這里寫的消息隊列不可以直接發變長數據嗎?意思就是使用rt_mq_send函數的時候,size參數必須和rt_mq_create中的msg_size相同嗎?如果不相同會怎么樣?2.多個不同優先級的線程和中斷向
    發表于 07-29 10:11

    rt_mq_recv函數是怎么從消息隊列讀取到消息的呢

    在使用rt_mq_recv函數是,遇到這樣一段代碼:rt_uint8_t rx_size;while(1){ //從消息隊列中獲取一條信息 ret = rt_mq
    發表于 08-25 14:30

    有什么方法解決RTT消息隊列的數據發送問題

    靜態創建了一個消息隊列struct rt_messagequeue usart2_mq;static rt_uint8_t msg_pool[300];result = rt_mq
    發表于 08-31 14:37

    串口open參數對消息隊列rt_mq_recv執行的影響線程假死如何解決?

    = rt_mq_send(&rx_mq, &msg, sizeof(msg)); if ( result == -RT_EFULL) {/* 消息隊列滿 */rt_kprintf("
    發表于 02-08 10:51

    創建消息隊列失敗,STM32F103RET6使用rt_mq_init創建消息隊列出錯怎么排查啊

    user_task_thread()在mian進入 使用rt_mq_init創建消息隊列出錯,出現HardFault_Handler 斷點調試最后到 rt_mq
    發表于 07-31 09:40

    發送隊列長度功率控制

    無線多跳網絡具有信道時變性強、拓撲動態變化等特點,需要簡單高效的功率控制機制。發射功率影響數據發送速率,而基于發送隊列長度的功率控制機制存在可行解。為此,結合無線多跳網絡中間節點需要協助其他節點進行
    發表于 03-20 15:07 ?0次下載
    發送<b class='flag-5'>隊列</b>長度功率控制

    Linux IPC POSIX 消息隊列

    模型:#include#include #include mq_open() //創建/獲取消息隊列fd mq_get() //設置/獲取消息隊列
    發表于 04-02 14:46 ?589次閱讀

    引入消息隊列會多出哪些問題

    前言 最近,消息隊列(Message Queue ,簡稱 MQ)越來越火。很多公司在用,很多人在用,其重要性不言而喻。 如果讓你回答下面這些問題,你的心中是否有答案了呢? 為什么要用 MQ? 引入
    的頭像 發表于 09-23 14:53 ?1743次閱讀

    設計一個MQ需要考慮哪些問題

    本文主要講解 MQ 的通用知識,讓大家先弄明白:如果讓你來設計一個 MQ,該如何下手?需要考慮哪些問題?又有哪些技術挑戰? 有了這個基礎后,我相信后面幾篇文章再講 Kafka 和 Ro
    的頭像 發表于 11-19 14:21 ?1957次閱讀

    消息隊列經典十連問

    我們通常說的消息隊列,簡稱MQ(Message Queue),它其實就指消息中間件,當前業界比較流行的開源消息中間件包括:RabbitMQ、RocketMQ、Kafka。
    的頭像 發表于 03-22 10:08 ?1293次閱讀

    MQ消息亂序問題解析與實戰解決方案

    作者:京東物流 劉浩 1. 背景 在分布式系統中,消息隊列MQ)是實現系統解耦、異步通信的重要工具。然而,MQ消費時出現的消息亂序問題,經常會對業務邏輯的正確執行和系統穩定性產生不良影響。本文將
    的頭像 發表于 12-06 09:46 ?247次閱讀
    主站蜘蛛池模板: 日本xxxxxxx| 久久综合给会久久狠狠狠 | 少妇被阴内射XXXB少妇BB | 美女被抽插到哭内射视频免费 | 妈妈的朋友5在线观看免费完整版中文 | 宝贝好紧好爽再搔一点试視頻 | 黄色三级三级三级免费看 | 伊人久久亚洲精品一区 | 收集最新中文国产中文字幕 | 国产精品久久久久久影院 | 日本老人oldmantv乱 | 日本黄色成年人免费观看 | 日韩人妻双飞无码精品久久 | 日韩中文亚洲欧美视频二 | md2.pud 麻豆传媒官网 | 亚洲欧美无码2017在线 | 处xxxx.88| 久久久久久久久a免费 | 动态抽插图视频 | 亚洲国产精品久久无套麻豆 | 国产成人精品免费视频下载 | 国产精品爽爽久久久久久无码 | 无套内射纹身女视频 | 亚洲精品理论电影在线观看 | 国产这里有精品 | 毛片免费在线播放 | 欧美日韩精品一区二区三区四区 | WWW国产亚洲精品久久麻豆 | 国产成人精品s8p视频 | yy8090韩国理伦片在线 | 国产精品成人无码免费视频 | 国产精品久久久久久久久免费下载 | 扒开女人下面使劲桶动态图 | 亚洲一区在线观看视频 | 十七岁日本免费完整版BD | 中字幕久久久人妻熟女天美传媒 | 亚洲深夜在线 | 日日夜夜天天操 | 国内精品久久久久影院男同志 | 国产精品久久久久影院色 | 免费精品美女久久久久久久久久 |