潍坊市论坛

首页 » 分类 » 问答 » SpringBoot整合Redis延时
TUhjnbcbe - 2021/8/19 17:49:00

点击上方“Java基基”,选择“设为星标”

做积极的人,而不是积极废人!

源码精品专栏

原创

Java超神之路,很肝~

中文详细注释的开源项目

RPC框架Dubbo源码解析

网络应用框架Netty源码解析

消息中间件RocketMQ源码解析

数据库中间件Sharding-JDBC和MyCAT源码解析

作业调度中间件Elastic-Job源码解析

分布式事务中间件TCC-Transaction源码解析

Eureka和Hystrix源码解析

Java并发源码

来源:blog.csdn.net/qq/article/details/

设计实现测试源码地址设计

之前学习Redis的时候发现有赞团队之前分享过一篇关于延时队列的设计:有赞延时队列现在就尝试实现一下

业务流程

首先我们分析下这个流程

用户提交任务。首先将任务推送至延迟队列中。延迟队列接收到任务后,首先将任务推送至jobpool中,然后计算其执行时间。然后生成延迟任务(仅仅包含任务id)放入某个桶中时间组件时刻轮询各个桶,当时间到达的时候从jobpool中获得任务元信息。监测任务的合法性如果已经删除则pass。继续轮询。如果任务合法则再次计算时间如果合法则计算时间,如果时间合法:根据topic将任务放入对应的readyqueue,然后从bucket中移除。如果时间不合法,则重新计算时间再次放入bucket,并移除之前的bucket中的内容消费端轮询对应topic的readyqueue。获取job后做自己的业务逻辑。与此同时,服务端将已经被消费端获取的job按照其设定的TTR,重新计算执行时间,并将其放入bucket。完成消费后,发送finish消息,服务端根据jobid删除对应信息。对象

我们现在可以了解到中间存在的几个组件

延迟队列,为Redis延迟队列。实现消息传递Jobpool任务池保存job元信息。根据文章描述使用K/V的数据结构,key为ID,value为jobDelayBucket用来保存业务的延迟任务。文章中描述使用轮询方式放入某一个Bucket可以知道其并没有使用topic来区分,个人这里默认使用顺序插入Timer时间组件,负责扫描各个Bucket。根据文章描述存在多个Timer,但是同一个Timer同一时间只能扫描一个BucketReadyQueue负责存放需要被完成的任务,但是根据描述根据Topic的不同存在多个ReadyQueue。

其中Timer负责轮询,Jobpool、DelayBucket、ReadyQueue都是不同职责的集合。

任务状态ready:可执行状态,delay:不可执行状态,等待时钟周期。reserved:已被消费者读取,但没有完成消费。deleted:已被消费完成或者已被删除。对外提供的接口接口描述数据add添加任务Job数据pop取出待处理任务topic就是任务分组finish完成任务任务IDdelete删除任务任务ID额外的内容首先根据状态状态描述,finish和delete操作都是将任务设置成deleted状态。根据文章描述的操作,在执行finish或者delete的操作的时候任务已经从元数据中移除,此时deleted状态可能只存在极短时间,所以实际实现中就直接删除了。文章中并没有说明响应超时后如何处理,所以个人现在将其重新投入了待处理队列。文章中因为使用了集群,所以使用redis的setnx锁来保证多个时间循环处理多个桶的时候不会出现重复循环。这里因为是简单的实现,所以就很简单的每个桶设置一个时间队列处理。也是为了方便简单处理。关于分布式锁可以看我之前的文章里面有描述。实现

现在我们根据设计内容完成设计。这一块设计我们分四步完成

任务及相关对象

目前需要两个对象,一个是任务对象(job)一个负责保存任务引用的对象(delayjob)

任务对象

Data

AllArgsConstructor

NoArgsConstructorpublicclassJobimplementsSerializable{/***延迟任务的唯一标识,用于检索任务*/

JsonSerialize(using=ToStringSerializer.class)privateLongid;/***任务类型(具体业务类型)*/privateStringtopic;/***任务的延迟时间*/privatelongdelayTime;/***任务的执行超时时间*/privatelongttrTime;/***任务具体的消息内容,用于处理具体业务逻辑用*/privateStringmessage;/***重试次数*/privateintretryCount;/***任务状态*/privateJobStatusstatus;}

任务引用对象

Data

AllArgsConstructorpublicclassDelayJobimplementsSerializable{/***延迟任务的唯一标识*/privatelongjodId;/***任务的执行时间*/privatelongdelayDate;/***任务类型(具体业务类型)*/privateStringtopic;publicDelayJob(Jobjob){this.jodId=job.getId();this.delayDate=System.currentTimeMillis()+job.getDelayTime();this.topic=job.getTopic();}publicDelayJob(Objectvalue,Doublescore){this.jodId=Long.parseLong(String.valueOf(value));this.delayDate=System.currentTimeMillis()+score.longValue();}}容器

目前我们需要完成三个容器的创建,Job任务池、延迟任务容器、待完成任务容器

job任务池,为普通的K/V结构,提供基础的操作

Component

Slf4jpublicclassJobPool{

AutowiredprivateRedisTemplateredisTemplate;privateStringNAME="job.pool";privateBoundHashOperationsgetPool(){BoundHashOperationsops=redisTemplate.boundHashOps(NAME);returnops;}/***添加任务*

paramjob*/publicvoidaddJob(Jobjob){log.info("任务池添加任务:{}",JSON.toJSONString(job));getPool().put(job.getId(),job);return;}/***获得任务*

paramjobId*

return*/publicJobgetJob(LongjobId){Objecto=getPool().get(jobId);if(oinstanceofJob){return(Job)o;}returnnull;}/***移除任务*

paramjobId*/publicvoidremoveDelayJob(LongjobId){log.info("任务池移除任务:{}",jobId);//移除任务getPool().delete(jobId);}}

延迟任务,使用可排序的ZSet保存数据,提供取出最小值等操作

Slf4j

ComponentpublicclassDelayBucket{

AutowiredprivateRedisTemplateredisTemplate;privatestaticAtomicIntegerindex=newAtomicInteger(0);

Value("${thread.size}")privateintbucketsSize;privateListStringbucketNames=newArrayList();

BeanpublicListStringcreateBuckets(){for(inti=0;ibucketsSize;i++){bucketNames.add("bucket"+i);}returnbucketNames;}/***获得桶的名称*

return*/privateStringgetThisBucketName(){intthisIndex=index.addAndGet(1);inti1=thisIndex%bucketsSize;returnbucketNames.get(i1);}/***获得桶集合*

parambucketName*

return*/privateBoundZSetOperationsgetBucket(StringbucketName){returnredisTemplate.boundZSetOps(bucketName);}/***放入延时任务*

paramjob*/publicvoidaddDelayJob(DelayJobjob){log.info("添加延迟任务:{}",JSON.toJSONString(job));StringthisBucketName=getThisBucketName();BoundZSetOperationsbucket=getBucket(thisBucketName);bucket.add(job,job.getDelayDate());}/***获得最新的延期任务*

return*/publicDelayJobgetFirstDelayTime(Integerindex){Stringname=bucketNames.get(index);BoundZSetOperationsbucket=getBucket(name);SetZSetOperations.TypedTupleset=bucket.rangeWithScores(0,1);if(CollectionUtils.isEmpty(set)){returnnull;}ZSetOperations.TypedTupletypedTuple=(ZSetOperations.TypedTuple)set.toArray()[0];Objectvalue=typedTuple.getValue();if(valueinstanceofDelayJob){return(DelayJob)value;}returnnull;}/***移除延时任务*

paramindex*

paramdelayJob*/publicvoidremoveDelayTime(Integerindex,DelayJobdelayJob){Stringname=bucketNames.get(index);BoundZSetOperationsbucket=getBucket(name);bucket.remove(delayJob);}}

待完成任务,内部使用topic进行细分,每个topic对应一个list集合

Component

Slf4jpublicclassReadyQueue{

AutowiredprivateRedisTemplateredisTemplate;privateStringNAME="process.queue";privateStringgetKey(Stringtopic){returnNAME+topic;}/***获得队列*

paramtopic*

return*/privateBoundListOperationsgetQueue(Stringtopic){BoundListOperationsops=redisTemplate.boundListOps(getKey(topic));returnops;}/***设置任务*

paramdelayJob*/publicvoidpushJob(DelayJobdelayJob){log.info("执行队列添加任务:{}",delayJob);BoundListOperationslistOperations=getQueue(delayJob.getTopic());listOperations.leftPush(delayJob);}/***移除并获得任务*

paramtopic*

return*/publicDelayJobpopJob(Stringtopic){BoundListOperationslistOperations=getQueue(topic);Objecto=listOperations.leftPop();if(oinstanceofDelayJob){log.info("执行队列取出任务:{}",JSON.toJSONString((DelayJob)o));return(DelayJob)o;}returnnull;}}轮询处理

设置了线程池为每个bucket设置一个轮询操作

ComponentpublicclassDelayTimerimplementsApplicationListenerContextRefreshedEvent{

AutowiredprivateDelayBucketdelayBucket;

AutowiredprivateJobPooljobPool;

AutowiredprivateReadyQueuereadyQueue;

Value("${thread.size}")privateintlength;

OverridepublicvoidonApplicationEvent(ContextRefreshedEventcontextRefreshedEvent){ExecutorServiceexecutorService=newThreadPoolExecutor(length,length,0L,TimeUnit.MILLISECONDS,newLinkedBlockingQueueRunnable());for(inti=0;ilength;i++){executorService.execute(newDelayJobHandler(delayBucket,jobPool,readyQueue,i));}}}测试请求

/***测试用请求*

authordaify*

date-07-:26**/

RestController

RequestMapping("delay")publicclassDelayController{

AutowiredprivateJobServicejobService;/***添加*

paramrequest*

return*/

RequestMapping(value="add",method=RequestMethod.POST)publicStringaddDefJob(Jobrequest){DelayJobdelayJob=jobService.addDefJob(request);returnJSON.toJSONString(delayJob);}/***获取*

return*/

RequestMapping(value="pop",method=RequestMethod.GET)publicStringgetProcessJob(Stringtopic){Jobprocess=jobService.getProcessJob(topic);returnJSON.toJSONString(process);}/***完成一个执行的任务*

paramjobId*

return*/

RequestMapping(value="finish",method=RequestMethod.DELETE)publicStringfinishJob(LongjobId){jobService.finishJob(jobId);return"success";}

RequestMapping(value="delete",method=RequestMethod.DELETE)publicStringdeleteJob(LongjobId){jobService.deleteJob(jobId);return"success";}}测试

添加延迟任务

通过postman请求:localhost:/delay/add

此时这条延时任务被添加进了线程池中

-08-:21:36.INFO---[nio--exec-6]d.samples.redis.delay.container.JobPool:任务池添加任务:{"delayTime":,"id":3,"message":"tag:testid:3","retryCount":0,"status":"DELAY","topic":"test","ttrTime":}-08-:21:36.INFO---[nio--exec-6]d.s.redis.delay.container.DelayBucket:添加延迟任务:{"delayDate":1565616106,"jodId":3,"topic":"test"}

根据设置10秒钟之后任务会被添加至ReadyQueue中

-08-:21:46.INFO---[pool-1-thread-4]d.s.redis.delay.container.ReadyQueue:执行队列添加任务:DelayJob(jodId=3,delayDate=1565616106,topic=test)

获得任务

这时候我们请求localhost:/delay/pop

这个时候任务被响应,修改状态的同时设置其超时时间,然后放置在DelayBucket中

-08-:36:02.INFO---[nio--exec-3]d.s.redis.delay.container.ReadyQueue:执行队列取出任务:{"delayDate":4,"jodId":1,"topic":"测试"}-08-:36:02.INFO---[nio--exec-3]d.samples.redis.delay.container.JobPool:任务池添加任务:{"delayTime":,"id":1,"message":"延迟10秒,超时30秒","retryCount":0,"status":"RESERVED","topic":"测试","ttrTime":}-08-:36:02.INFO---[nio--exec-3]d.s.redis.delay.container.DelayBucket:添加延迟任务:{"delayDate":1565321792,"jodId":1,"topic":"测试"}

按照设计在30秒后,任务假如没有被消费将会重新放置在ReadyQueue中

-08-:21:48.INFO---[nio--exec-7]d.s.redis.delay.container.ReadyQueue:执行队列取出任务:{"delayDate":1565616106,"jodId":3,"topic":"test"}-08-:21:48.INFO---[nio--exec-7]d.samples.redis.delay.container.JobPool:任务池添加任务:{"delayTime":,"id":3,"message":"tag:testid:3","retryCount":0,"status":"RESERVED","topic":"test","ttrTime":}

任务的删除/消费

现在我们请求:localhost:/delay/delete

此时在Jobpool中此任务将会被移除,此时元数据已经不存在,但任务还在DelayBucket中循环,然而在循环中当检测到元数据已经不存的话此延时任务会被移除。

-08-:21:54.INFO---[nio--exec-8]d.samples.redis.delay.container.JobPool:任务池移除任务:3-08-:21:59.INFO---[pool-1-thread-5]d.s.redis.delay.handler.DelayJobHandler:移除不存在任务:{"delayDate":1565616118,"jodId":3,"topic":"test"}源码地址

首先点击右下方在看,再长按下

1
查看完整版本: SpringBoot整合Redis延时