延遲任務(wù)
最近有一個(gè)需求,基于消息隊(duì)列對(duì)數(shù)據(jù)消費(fèi),并根據(jù)多次消費(fèi)的結(jié)果對(duì)數(shù)據(jù)進(jìn)行重新組裝,如果在指定時(shí)間內(nèi),需要的數(shù)據(jù)全部到達(dá),則進(jìn)行數(shù)據(jù)組裝以及后續(xù)邏輯。簡(jiǎn)單的說(shuō),設(shè)置一個(gè)超時(shí)時(shí)間,如果在該時(shí)間內(nèi)由MQ中消費(fèi)到完整的數(shù)據(jù)則直接處理,否則進(jìn)入其他流程。
針對(duì)這種場(chǎng)景使用了延遲任務(wù)來(lái)實(shí)現(xiàn),以此為契機(jī)對(duì)延遲任務(wù)相關(guān)的技術(shù)做了個(gè)簡(jiǎn)單了解...
簡(jiǎn)介
延遲任務(wù)是一種指定任務(wù)在未來(lái)某個(gè)時(shí)間點(diǎn)或一定時(shí)間后執(zhí)行的方式。通常情況下,延遲任務(wù)可以通過(guò)設(shè)置任務(wù)的執(zhí)行時(shí)間或延遲時(shí)間來(lái)實(shí)現(xiàn)。
延遲任務(wù)可以用于異步操作、定時(shí)任務(wù)和任務(wù)調(diào)度等場(chǎng)景。例如,在用戶注冊(cè)后發(fā)送歡迎郵件或者在用戶下單后發(fā)送訂單確認(rèn)短信,可以通過(guò)延遲任務(wù)來(lái)實(shí)現(xiàn)異步操作。定時(shí)檢查服務(wù)器狀態(tài)、定時(shí)備份數(shù)據(jù)等任務(wù),也可以通過(guò)延遲任務(wù)來(lái)實(shí)現(xiàn)定時(shí)任務(wù)。在某個(gè)時(shí)間點(diǎn)觸發(fā)某個(gè)任務(wù)、在某個(gè)時(shí)間段內(nèi)重復(fù)執(zhí)行某個(gè)任務(wù)等,可以通過(guò)延遲任務(wù)來(lái)實(shí)現(xiàn)任務(wù)調(diào)度。
延遲任務(wù)通常使用隊(duì)列或者定時(shí)器來(lái)實(shí)現(xiàn)。在隊(duì)列中,任務(wù)會(huì)被添加到一個(gè)等待隊(duì)列中,等待隊(duì)列中的任務(wù)會(huì)在指定的時(shí)間點(diǎn)或延遲時(shí)間后被取出執(zhí)行。在定時(shí)器中,任務(wù)會(huì)被添加到一個(gè)定時(shí)器中,定時(shí)器會(huì)在指定的時(shí)間點(diǎn)觸發(fā)任務(wù)執(zhí)行。
總之,延遲任務(wù)是一種非常實(shí)用的技術(shù),可以幫助我們更好地管理系統(tǒng)中的異步操作、定時(shí)任務(wù)和任務(wù)調(diào)度等場(chǎng)景。
使用場(chǎng)景
異步操作:延遲任務(wù)可以用于異步操作,例如在用戶注冊(cè)后發(fā)送歡迎郵件或者在用戶下單后發(fā)送訂單確認(rèn)短信。通過(guò)使用延遲任務(wù),可以將這些操作推遲到后臺(tái)處理,從而提高系統(tǒng)的響應(yīng)速度和并發(fā)能力。
定時(shí)任務(wù):延遲任務(wù)可以用于定時(shí)任務(wù),例如定時(shí)檢查服務(wù)器狀態(tài)、定時(shí)備份數(shù)據(jù)等。通過(guò)使用延遲任務(wù),可以在指定的時(shí)間點(diǎn)自動(dòng)觸發(fā)任務(wù),避免手動(dòng)操作的繁瑣和容易出錯(cuò)。
任務(wù)調(diào)度:延遲任務(wù)可以用于任務(wù)調(diào)度,例如在某個(gè)時(shí)間點(diǎn)觸發(fā)某個(gè)任務(wù)、在某個(gè)時(shí)間段內(nèi)重復(fù)執(zhí)行某個(gè)任務(wù)等。通過(guò)使用延遲任務(wù),可以方便地進(jìn)行任務(wù)調(diào)度,提高系統(tǒng)的可靠性和穩(wěn)定性。
技術(shù)實(shí)現(xiàn)
- 基于內(nèi)存,應(yīng)用重啟(或宕機(jī))會(huì)導(dǎo)致任務(wù)丟失
- 基于內(nèi)存存放隊(duì)列,不支持集群
- 依據(jù)compareTo方法排列隊(duì)列,調(diào)用take阻塞式的取出第一個(gè)任務(wù)(不調(diào)用則不取出),比較不靈活,會(huì)影響時(shí)間的準(zhǔn)確性
- ScheduledThreadPoolExecutor
- 基于內(nèi)存,應(yīng)用重啟(或宕機(jī))會(huì)導(dǎo)致任務(wù)丟失
- 基于內(nèi)存存放任務(wù),不支持集群
- 一個(gè)任務(wù)就要新建一個(gè)線程綁定任務(wù)的執(zhí)行,容易造成資源浪費(fèi)
- Redis過(guò)期監(jiān)聽(tīng) 基于Redis過(guò)期訂閱
- 客戶端斷開(kāi)后重連會(huì)導(dǎo)致所有事件丟失
- 高并發(fā)場(chǎng)景下,存在大量的失效key場(chǎng)景會(huì)導(dǎo)出失效時(shí)間存在延遲
- 若有多個(gè)監(jiān)聽(tīng)器監(jiān)聽(tīng)該key,是會(huì)重復(fù)消費(fèi)這個(gè)過(guò)期事件的,需要特定邏輯判斷
- MQ延遲隊(duì)列 基于消息死信隊(duì)列實(shí)現(xiàn) 支持集群,分布式,高并發(fā)場(chǎng)景;缺點(diǎn):引入額外的消息隊(duì)列,增加項(xiàng)目的部署和維護(hù)的復(fù)雜度。
- HashedWheelTimer 基于Netty提供的工具類HashedWheelTimer HashedWheelTimer 是使用定時(shí)輪實(shí)現(xiàn)的,定時(shí)輪其實(shí)就是一種環(huán)型的數(shù)據(jù)結(jié)構(gòu),可以把它想象成一個(gè)時(shí)鐘, 分成了許多格子,每個(gè)格子代表一定的時(shí)間,在這個(gè)格子上用一個(gè)鏈表來(lái)保存要執(zhí)行的超時(shí)任務(wù),同時(shí)有一個(gè)指針一格一格的走,走到那個(gè)格子時(shí)就執(zhí)行格子對(duì)應(yīng)的延遲任務(wù),
其中前三種Timer、DelayQueue、ScheduledThreadPoolExecutor實(shí)現(xiàn)比較簡(jiǎn)單,只不過(guò)只適用于單體應(yīng)用,任務(wù)數(shù)據(jù)都在內(nèi)存中,在系統(tǒng)崩潰后數(shù)據(jù)丟失;后兩張實(shí)現(xiàn)相對(duì)復(fù)雜,并且需要依賴于第三方應(yīng)用,在系統(tǒng)整體結(jié)構(gòu)上更加復(fù)雜且消耗更多資源,但能支持分布式系統(tǒng),且有較高的容錯(cuò)性。
示例
定義延遲任務(wù)對(duì)象:
@Getter
public class DelayTask implements Serializable{
private static final long serialVersionUID = -5062977578344039366L;
private long delaySeconds;
private TaskExecute taskExecute;
public DelayTask(long delaySeconds, TaskExecute taskExecute) {
this.delaySeconds = delaySeconds;
this.taskExecute = taskExecute;
}
/**
*
*/
public void execute(){
taskExecute.run();
}
public interface TaskExecute extends Runnable, Serializable {
}
}
調(diào)度器:
public interface ScheduleTrigger {
/**
* 延遲任務(wù)調(diào)度
* @param delayTask
*/
void schedule(DelayTask delayTask);
}
- Timer
public class JavaTrigger implements ScheduleTrigger{
private Timer timer;
public JavaTimer(){
this.timer = new Timer();
}
/**
*
* @param delayTask
*/
public void schedule(DelayTask delayTask){
timer.schedule(buildTimerTask(delayTask.getTaskExecute()), toMillis(delayTask.getDelaySeconds()));
}
private TimerTask buildTimerTask(Runnable runnable){
return new TimerTask() {
@Override
public void run() {
runnable.run();
}
};
}
}
- DelayQueue
public class DelayQueueTrigger implements ScheduleTrigger{
private DelayQueue< Task > queue = new DelayQueue< >();
public DelayQueueTrigger() {
Thread thread = new Thread(() - > {
while (true) {
try {
Task task = queue.take();
if(task != null)
task.execute();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
thread.setDaemon(true);
thread.start();
}
/**
* @param delayTask
*/
public void schedule(DelayTask delayTask){
if( delayTask instanceof Task ){
queue.put((Task) delayTask);
}
}
}
class Task extends DelayTask implements Delayed{
private long execTime;
public Task(long delaySeconds, TaskExecute taskExecute) {
super(delaySeconds, taskExecute);
this.execTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(delaySeconds);
}
/**
* 輪詢執(zhí)行該方法判斷是否滿足執(zhí)行條件(<=0)
* 同時(shí)該返回作為等待時(shí)長(zhǎng)
* @param unit the time unit
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
return this.execTime - System.currentTimeMillis(); // ms
}
public long getExecTime() {
return execTime;
}
@Override
public int compareTo(Delayed other) {
if(this.getExecTime() == ((Task)other).getExecTime()){
return 0;
}
return this.getExecTime() > ((Task)other).getExecTime() ? 1: -1;
}
}
- ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor實(shí)現(xiàn)也是基于延遲隊(duì)列BlockingQueue實(shí)現(xiàn)
public class ScheduledExecutorTrigger implements ScheduleTrigger{
private ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10);
public void schedule(DelayTask delayTask){
executorService.schedule(delayTask.getTaskExecute(), delayTask.getDelaySeconds(), TimeUnit.SECONDS);
}
}
- Redis過(guò)期監(jiān)聽(tīng)
需要修改redis配置文件:notify-keyspace-events Ex
public class RedisTimer{
private static final String EXPIRATION_KEY = "REDIS_EXPIRATION_KEY";
@Configuration
@Import(RedisAutoConfiguration.class)
public static class Config{
@Bean(name = "redisTemplate")
public RedisTemplate< Object, Object > redisTemplate(RedisConnectionFactory factory) {
RedisTemplate< Object, Object > template = new RedisTemplate< >();
RedisSerializer< String > keySerializer = new StringRedisSerializer();
RedisSerializer< Object > valueSerializer = new ObjectRedisSerializer();
template.setConnectionFactory(factory);
template.setKeySerializer(keySerializer);
template.setValueSerializer(valueSerializer);
return template;
}
/**
* 消息監(jiān)聽(tīng)器容器bean
* @param connectionFactory
* @return
*/
@Bean
public RedisMessageListenerContainer container(LettuceConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
return container;
}
@Bean
public RedisKeyExpirationListener redisKeyExpirationListener(RedisMessageListenerContainer redisMessageListenerContainer){
RedisKeyExpirationListener redisKeyExpirationListener = new RedisKeyExpirationListener(redisMessageListenerContainer);
redisKeyExpirationListener.setContext(context());
return redisKeyExpirationListener;
}
@Bean
public Context context(){
return new Context();
}
@Bean
public RedisTrigger redisTrigger(RedisTemplate redisTemplate){
return new RedisTrigger(redisTemplate, context());
}
class ObjectRedisSerializer implements RedisSerializer{
@Override
public byte[] serialize(Object o) throws SerializationException {
return SerializeUtils.serialize(o);
}
@Override
public Object deserialize(byte[] bytes) throws SerializationException {
return SerializeUtils.deserialize(bytes);
}
}
}
public static class RedisTrigger implements ScheduleTrigger{
private RedisTemplate redisTemplate;
private Context context;
public RedisTrigger(RedisTemplate redisTemplate, Context context){
this.redisTemplate = redisTemplate;
this.context = context;
}
public void schedule(DelayTask delayTask){
context.put(EXPIRATION_KEY, delayTask);
redisTemplate.opsForValue().set(EXPIRATION_KEY, delayTask, delayTask.getDelaySeconds(), TimeUnit.SECONDS);
}
}
@Slf4j
public static class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
private Context context;
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
/**
* 這里沒(méi)法拿到過(guò)期值
* @param message never {@literal null}.
*/
@SneakyThrows
@Override
public void doHandleMessage(Message message) {
try {
String topic = new String(message.getChannel(), "utf-8");
String key = new String(message.getBody(), "utf-8");
if (EXPIRATION_KEY.equals(key)) {
Object object = context.get(EXPIRATION_KEY);
if( object instanceof DelayTask ){
log.info("redis key[{}] 過(guò)期回調(diào)", key);
((DelayTask) object).execute();
}
}
} catch (Exception e) {
log.error("處理Redis延遲任務(wù)異常:{}", e.getMessage() ,e);
}
}
public void setContext(Context context) {
this.context = context;
}
}
public static class Context{
private Map< String,Object > context = new ConcurrentHashMap< >();
public void put(String key, Object value){
context.put(key, value);
}
public Object get(String key){
return context.get(key);
}
}
}
- MQ延遲隊(duì)列
這里MQ選擇的是RabbitMq,要知道在RabbitMq中是沒(méi)有延遲隊(duì)列的,但可以通過(guò)延遲消息插件rabbitmq_delayed_message_exchange實(shí)現(xiàn),另外一種是基于死信來(lái)實(shí)現(xiàn)。
什么時(shí)候消息進(jìn)入死信?
- 1)消息消費(fèi)方調(diào)用了basicNack() 或 basicReject(),并且參數(shù)都是 requeue = false,則消息會(huì)路由進(jìn)死信隊(duì)列
- 2)消息消費(fèi)過(guò)期,過(guò)了TTL(消息、或隊(duì)列設(shè)置超時(shí)時(shí)間) 存活時(shí)間,就是消費(fèi)方在 TTL 時(shí)間之內(nèi)沒(méi)有消費(fèi),則消息會(huì)路由進(jìn)死信隊(duì)列
- 3)隊(duì)列設(shè)置了x-max-length 最大消息數(shù)量且當(dāng)前隊(duì)列中的消息已經(jīng)達(dá)到了這個(gè)數(shù)量,再次投遞,消息將被擠掉,被擠掉的消息會(huì)路由進(jìn)死信隊(duì)列
public class RabbitTimer{
@Configuration
@Import(RabbitAutoConfiguration.class)
public static class Config{
static final String TTL_EXCHANGE_FOR_SCHEDULE = "TTL_EXCHANGE_FOR_SCHEDULE";
static final String TTL_QUEUE_FOR_SCHEDULE = "TTL_QUEUE_FOR_SCHEDULE";
static final String TTL_ROUTING_KEY_FOR_SCHEDULE = "TTL_ROUTING_KEY_FOR_SCHEDULE";
static final String COMMON_QUEUE_FOR_SCHEDULE = "COMMON_QUEUE_FOR_SCHEDULE";
@Bean
public Queue ttlQueue(){
return QueueBuilder.durable(TTL_QUEUE_FOR_SCHEDULE).build();
}
@Bean
public Exchange ttlExchange(){
return ExchangeBuilder.directExchange(TTL_EXCHANGE_FOR_SCHEDULE).build();
}
@Bean
public Binding ttlBinding(){
return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with(TTL_ROUTING_KEY_FOR_SCHEDULE).noargs();
}
@Bean
public Queue commonQueue(){
return QueueBuilder.durable(COMMON_QUEUE_FOR_SCHEDULE)
.deadLetterExchange(TTL_EXCHANGE_FOR_SCHEDULE)
.deadLetterRoutingKey(TTL_ROUTING_KEY_FOR_SCHEDULE)
.build();
}
@Bean
public TtlMessageConsumer ttlMessageConsumer(){
return new TtlMessageConsumer();
}
@Bean
public RabbitTrigger rabbitTrigger(RabbitTemplate rabbitTemplate){
return new RabbitTrigger(rabbitTemplate);
}
}
@Slf4j
@RabbitListener(queues=TTL_QUEUE_FOR_SCHEDULE)
public static class TtlMessageConsumer{
@RabbitHandler
public void handle(byte [] message){
Object deserialize = SerializeUtils.deserialize(message);
if( deserialize instanceof DelayTask ){
((DelayTask) deserialize).execute();
}
}
}
public static class RabbitTrigger implements ScheduleTrigger{
@Autowired
private RabbitTemplate rabbitTemplate;
public RabbitTrigger(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void schedule(DelayTask delayTask){
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration( String.valueOf(TimeUnit.SECONDS.toMillis(delayTask.getDelaySeconds())));
Message message = new Message(SerializeUtils.serialize(delayTask), messageProperties);
rabbitTemplate.send(COMMON_QUEUE_FOR_SCHEDULE, message);
}
}
}
- HashedWheelTimer
public class NettyTrigger implements ScheduleTrigger {
HashedWheelTimer timer = new HashedWheelTimer(200,
TimeUnit.MILLISECONDS,
100); // 時(shí)間輪中的槽數(shù)
/**
*
*/
@Override
public void schedule(DelayTask delayTask){
TimerTask task = timeout - > delayTask.execute();
//
timer.newTimeout(task, delayTask.getDelaySeconds(), TimeUnit.SECONDS);
}
}
測(cè)試:
ScheduleTrigger.schedule(DelayTask delayTask);
結(jié)束語(yǔ)
通過(guò)幾個(gè)簡(jiǎn)單的示例了解延遲隊(duì)列的實(shí)現(xiàn)方式,可以根據(jù)實(shí)際業(yè)務(wù)場(chǎng)景以及應(yīng)用架構(gòu)做出合理的選擇。
-
數(shù)據(jù)
+關(guān)注
關(guān)注
8文章
7048瀏覽量
89073 -
服務(wù)器
+關(guān)注
關(guān)注
12文章
9184瀏覽量
85482 -
內(nèi)存
+關(guān)注
關(guān)注
8文章
3028瀏覽量
74076 -
定時(shí)器
+關(guān)注
關(guān)注
23文章
3250瀏覽量
114866 -
延遲
+關(guān)注
關(guān)注
1文章
70瀏覽量
13529
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
評(píng)論