背景
前段時間有個小項目需要使用延遲任務,談到延遲任務,我腦子第一時間一閃而過的就是使用消息隊列來做,比如RabbitMQ的死信隊列又或者RocketMQ的延遲隊列,但是奈何這是一個小項目,并沒有引入MQ,我也不太想因為一個延遲任務就引入MQ,增加系統復雜度,所以這個方案直接就被pass了。
雖然基于MQ這個方式走不通了,但是這個項目中使用到Redis,所以我就想是否能夠使用Redis來代替MQ實現延遲隊列的功能,于是我就查了一下有沒有現成可用的方案,別說,還真給我查到了兩種方案,并且我還仔細研究對比了這兩個方案,發現要想很好的實現延遲隊列,并不簡單。
監聽過期key
基于監聽過期key的方式來實現延遲隊列是我查到的第一個方案,為了弄懂這個方案實現的細節,我還特地去扒了扒官網,還真有所收獲
1、Redis發布訂閱模式
一談到發布訂閱模式,其實一想到的就是MQ,只不過Redis也實現了一套,并且跟MQ賊像,如圖:
圖中的channel的概念跟MQ中的topic的概念差不多,你可以把channel理解成MQ中的topic。
生產者在消息發送時需要到指定發送到哪個channel上,消費者訂閱這個channel就能獲取到消息。
在Redis中,有很多默認的channel,只不過向這些channel發送消息的生產者不是我們寫的代碼,而是Redis本身。當消費者監聽這些channel時,就可以感知到Redis中數據的變化。
這個功能Redis官方稱為keyspace notifications,字面意思就是鍵空間通知。
這些默認的channel被分為兩類:
以__keyspace@
舉個例子,現在有個消費者監聽了__keyspace@0__:sanyou這個channel,sanyou就是Redis中的一個普通key,那么當sanyou這個key被刪除或者發生了其它事件,那么消費者就會收到sanyou這個key刪除或者其它事件的消息
以__keyevent@
同樣舉個例子,現在有個消費者監聽了__keyevent@0__:expired這個channel,代表了監聽key的過期事件。那么當某個Redis的key過期了(expired),那么消費者就能收到這個key過期的消息。如果把expired換成del,那么監聽的就是刪除事件。具體支持哪些事件,可從官網查。
上述db是指具體的數據庫,Redis不是默認分為16個庫么,序號從0-15,所以db就是0到15的數字,示例中的0就是指0對應的數據庫。
3、延遲隊列實現原理
通過對上面的兩個概念了解之后,應該就對監聽過期key的實現原理一目了然了,其實就是當這個key過期之后,Redis會發布一個key過期的事件到__keyevent@
所以這種方式實現延遲隊列就只需要兩步:
發送延遲任務,key是延遲消息本身,過期時間就是延遲時間
監聽__keyevent@
4、demo
好了,基本概念和核心原理都說完了之后,又到了show me the code環節。
好巧不巧,Spring已經實現了監聽__keyevent@*__:expired這個channel這個功能,__keyevent@*__:expired中的*代表通配符的意思,監聽所有的數據庫。
所以demo寫起來就很簡單了,只需3步即可
引入pom
org.springframework.boot spring-boot-starter-data-redis 2.2.5.RELEASE org.springframework.boot spring-boot-starter-web 2.2.5.RELEASE
配置類
@Configuration publicclassRedisConfiguration{ @Bean publicRedisMessageListenerContainerredisMessageListenerContainer(RedisConnectionFactoryconnectionFactory){ RedisMessageListenerContainerredisMessageListenerContainer=newRedisMessageListenerContainer(); redisMessageListenerContainer.setConnectionFactory(connectionFactory); returnredisMessageListenerContainer; } @Bean publicKeyExpirationEventMessageListenerredisKeyExpirationListener(RedisMessageListenerContainerredisMessageListenerContainer){ returnnewKeyExpirationEventMessageListener(redisMessageListenerContainer); } }
KeyExpirationEventMessageListener實現了對__keyevent@*__:expiredchannel的監聽
當KeyExpirationEventMessageListener收到Redis發布的過期Key的消息的時候,會發布RedisKeyExpiredEvent事件
所以我們只需要監聽RedisKeyExpiredEvent事件就可以拿到過期消息的Key,也就是延遲消息。
對RedisKeyExpiredEvent事件的監聽實現MyRedisKeyExpiredEventListener
@Component publicclassMyRedisKeyExpiredEventListenerimplementsApplicationListener{ @Override publicvoidonApplicationEvent(RedisKeyExpiredEventevent){ byte[]body=event.getSource(); System.out.println("獲取到延遲消息:"+newString(body)); } }
整個工程目錄也簡單
代碼寫好,啟動應用
之后我直接通過Redis命令設置消息,就沒通過代碼發送消息了,消息的key為sanyou,值為task,值不重要,過期時間為5s
setsanyoutask expiresanyou5
如果上面都理論都正確,不出意外的話,5s后MyRedisKeyExpiredEventListener應該可以監聽到sanyou這個key過期的消息,也就相當于拿到了延遲任務,控制臺會打印出獲取到延遲消息:sanyou。
于是我滿懷希望,靜靜地等待了5s。。
5、4、3、2、1,時間一到,我查看控制臺,但是控制臺并沒有按照預期打印出上面那句話。
為什么會沒打印出?難道是代碼寫錯了?正當我準備檢查代碼的時候,官網的一段話道出了真實原因。
我給大家翻譯一下上面這段話講的內容。
上面這段話主要討論的是key過期事件的時效性問題,首先提到了Redis過期key的兩種清除策略,就是面試八股文常背的兩種:
惰性清除。當這個key過期之后,訪問時,這個Key才會被清除
定時清除。后臺會定期檢查一部分key,如果有key過期了,就會被清除
再后面那段話是核心,意思是說,key的過期事件發布時機并不是當這個key的過期時間到了之后就發布,而是這個key在Redis中被清理之后,也就是真正被刪除之后才會發布。
到這我終于明白了,上面的例子中即使我設置了5s的過期時間,但是當5s過去之后,只要兩種清除策略都不滿足,沒人訪問sanyou這個key,后臺的定時清理的任務也沒掃描到sanyou這個key,那么就不會發布key過期的事件,自然而然也就監聽不到了。
至于后臺的定時清理的任務什么時候能掃到,這個沒有固定時間,可能一到過期時間就被掃到,也可能等一定時間才會被掃到,這就可能會造成了客戶端從發布到監聽到的消息時間差會大于等于過期時間,從而造成一定時間消息的延遲,這就著實有點坑了。。
5、坑
除了上面測試demo的時候遇到的坑之外,在我深入研究之后,還發現了一些更離譜的坑。
丟消息太頻繁
Redis的丟消息跟MQ不一樣,因為MQ都會有消息的持久化機制,可能只有當機器宕機了,才會丟點消息,但是Redis丟消息就很離譜,比如說你的服務在重啟的時候就消息會丟消息。
Redis實現的發布訂閱模式,消息是沒有持久化機制,當消息發布到某個channel之后,如果沒有客戶端訂閱這個channel,那么這個消息就丟了,并不會像MQ一樣進行持久化,等有消費者訂閱的時候再給消費者消費。
所以說,假設服務重啟期間,某個生產者或者是Redis本身發布了一條消息到某個channel,由于服務重啟,沒有監聽這個channel,那么這個消息自然就丟了。
消息消費只有廣播模式
Redis的發布訂閱模式消息消費只有廣播模式一種。
所謂的廣播模式就是多個消費者訂閱同一個channel,那么每個消費者都能消費到發布到這個channel的所有消息。
如圖,生產者發布了一條消息,內容為sanyou,那么兩個消費者都可以同時收到sanyou這條消息。
所以,如果通過監聽channel來獲取延遲任務,那么一旦服務實例有多個的話,還得保證消息不能重復處理,額外地增加了代碼開發量。
接收到所有key的某個事件
這個不屬于Redis發布訂閱模式的問題,而是Redis本身事件通知的問題。
當消費者監聽了以__keyevent@
舉個例子,某個消費者監聽了__keyevent@*__:expired這個channel,那么只要key過期了,不管這個key是張三還會李四,消費者都能收到。
所以如果你只想消費某一類消息的key,那么還得自行加一些標記,比如消息的key加個前綴,消費的時候判斷一下帶前綴的key就是需要消費的任務。
所以,綜上能夠得出一個非常重要的結論,那就是監聽Redis過期Key這種方式實現延遲隊列,不穩定,坑賊多!
那有沒有比較靠譜的延遲隊列的實現方案呢?這就不得不提到我研究的第二種方案了。
Redisson實現延遲隊列
Redisson他是Redis的兒子(Redis son),基于Redis實現了非常多的功能,其中最常使用的就是Redis分布式鎖的實現,但是除了實現Redis分布式鎖之外,它還實現了延遲隊列的功能。
先來個demo,后面再來說說這種實現的原理。
1、demo
引入pom
org.redisson redisson 3.13.1
封裝了一個RedissonDelayQueue類
@Component @Slf4j publicclassRedissonDelayQueue{ privateRedissonClientredissonClient; privateRDelayedQueuedelayQueue; privateRBlockingQueue blockingQueue; @PostConstruct publicvoidinit(){ initDelayQueue(); startDelayQueueConsumer(); } privatevoidinitDelayQueue(){ Configconfig=newConfig(); SingleServerConfigserverConfig=config.useSingleServer(); serverConfig.setAddress("redis://localhost:6379"); redissonClient=Redisson.create(config); blockingQueue=redissonClient.getBlockingQueue("SANYOU"); delayQueue=redissonClient.getDelayedQueue(blockingQueue); } privatevoidstartDelayQueueConsumer(){ newThread(()->{ while(true){ try{ Stringtask=blockingQueue.take(); log.info("接收到延遲任務:{}",task); }catch(Exceptione){ e.printStackTrace(); } } },"SANYOU-Consumer").start(); } publicvoidofferTask(Stringtask,longseconds){ log.info("添加延遲任務:{}延遲時間:{}s",task,seconds); delayQueue.offer(task,seconds,TimeUnit.SECONDS); } }
這個類在創建的時候會去初始化延遲隊列,創建一個RedissonClient對象,之后通過RedissonClient對象獲取到RDelayedQueue和RBlockingQueue對象,傳入的隊列名字叫SANYOU,這個名字無所謂。
當延遲隊列創建之后,會開啟一個延遲任務的消費線程,這個線程會一直從RBlockingQueue中通過take方法阻塞獲取延遲任務。
添加任務的時候是通過RDelayedQueue的offer方法添加的。
controller類,通過接口添加任務,延遲時間為5s
@RestController publicclassRedissonDelayQueueController{ @Resource privateRedissonDelayQueueredissonDelayQueue; @GetMapping("/add") publicvoidaddTask(@RequestParam("task")Stringtask){ redissonDelayQueue.offerTask(task,5); } }
啟動項目,添加任務
靜靜等待5s,成功獲取到任務。
2、實現原理
如下圖就是上面demo中,一個延遲隊列會在Redis內部使用到的channel和數據類型
SANYOU前面的前綴都是固定的,Redisson創建的時候會拼上前綴。
redisson_delay_queue_timeout:SANYOU,sorted set數據類型,存放所有延遲任務,按照延遲任務的到期時間戳(提交任務時的時間戳 + 延遲時間)來排序的,所以列表的最前面的第一個元素就是整個延遲隊列中最早要被執行的任務,這個概念很重要
redisson_delay_queue:SANYOU,list數據類型,也是存放所有的任務,但是研究下來發現好像沒什么用。。
SANYOU,list數據類型,被稱為目標隊列,這個里面存放的任務都是已經到了延遲時間的,可以被消費者獲取的任務,所以上面demo中的RBlockingQueue的take方法是從這個目標隊列中獲取到任務的
redisson_delay_queue_channel:SANYOU,是一個channel,用來通知客戶端開啟一個延遲任務
有了這些概念之后,再來看看整體的運行原理圖
生產者在提交任務的時候將任務放到redisson_delay_queue_timeout:SANYOU中,分數就是提交任務的時間戳+延遲時間,就是延遲任務的到期時間戳
客戶端會有一個延遲任務,為了區分,后面我都說是客戶端延遲任務。這個延遲任務會向Redis Server發送一段lua腳本,Redis執行lua腳本中的命令,并且是原子性的
這段lua腳本主要干了兩件事:
將到了延遲時間的任務從redisson_delay_queue_timeout:SANYOU中移除,存到SANYOU這個目標隊列
獲取到redisson_delay_queue_timeout:SANYOU中目前最早到過期時間的延遲任務的到期時間戳,然后發布到redisson_delay_queue_channel:SANYOU這個channel中
當客戶端監聽到redisson_delay_queue_channel:SANYOU這個channel的消息時,會再次提交一個客戶端延遲任務,延遲時間就是消息(最早到過期時間的延遲任務的到期時間戳)- 當前時間戳,這個時間其實也就是redisson_delay_queue_channel:SANYOU中最早到過期時間的任務還剩余的延遲時間。
此處可以等待10s,好好想想。。
這樣,一旦時間來到了上面說的最早到過期時間任務的到期時間戳,redisson_delay_queue_timeout:SANYOU中上面說的最早到過期時間的任務已經到期了,客戶端的延遲任務也同時到期,于是開始執行lua腳本操作,及時將到了延遲時間的任務放到目標隊列中。然后再次發布剩余的延遲任務中最早到期的任務到期時間戳到channe中,如此循環往復,一直運行下去,保證redisson_delay_queue_timeout:SANYOU中到期的數據能及時放到目標隊列中。
所以,上述說了一大堆的主要的作用就是保證到了延遲時間的任務能夠及時被放到目標隊列。
這里再補充兩個特殊情況,圖中沒有畫出:
第一個就是如果redisson_delay_queue_timeout:SANYOU是新添加的任務(隊列之前有或者沒有任務)是隊列中最早需要被執行的,也會發布消息到channel,之后就按時上面說的流程走了。
添加任務代碼如下,也是通過lua腳本來的
第二種特殊情況就是項目啟動的時候會執行一次客戶端延遲任務。項目在重啟時,由于沒有客戶端延遲任務的執行,可能會出現redisson_delay_queue_timeout:SANYOU隊列中有到期但是沒有被放到目標隊列的可能,重啟就執行一次就是為了保證到期的數據能被及時放到目標隊列中。
3、與第一種方案比較
現在來比較一下第一種方案和Redisson的這種方案,看看有沒有第一種方案的那些坑。
第一個任務延遲的問題,Redisson方案理論上是沒有延遲的,但是當消息數量增加,消費者消費緩慢這個情況下可能會導致延遲任務消費的延遲。
第二個丟消息的問題,Redisson方案很大程度上減輕了丟消息的可能性,因為所有的任務都是存在list和sorted set兩種數據類型中,Redis有持久化機制,就算Redis宕機了,也就可能會丟一點點數據。
第三個廣播消費任務的問題,這個是不會出現的,因為每個客戶端都是從同一個目標隊列中獲取任務的。
第四個問題是Redis內部channel發布事件的問題,跟這種方案不沾邊,就更不可能存在了。
所以,通過上面的對比可以看出,Redisson這種實現方案就顯得更加的靠譜了。
審核編輯:劉清
-
數據庫
+關注
關注
7文章
3845瀏覽量
64592 -
lua腳本
+關注
關注
0文章
21瀏覽量
7601 -
Redis
+關注
關注
0文章
378瀏覽量
10907
原文標題:用 Redis 實現延遲隊列,我研究了兩種方案,發現并不簡單
文章出處:【微信號:芋道源碼,微信公眾號:芋道源碼】歡迎添加關注!文章轉載請注明出處。
發布評論請先 登錄
相關推薦
評論