点击上方“Java基基”,选择“设为星标”
做积极的人,而不是积极废人!
源码精品专栏
原创
Java超神之路,很肝~
中文详细注释的开源项目
RPC框架Dubbo源码解析
网络应用框架Netty源码解析
消息中间件RocketMQ源码解析
数据库中间件Sharding-JDBC和MyCAT源码解析
作业调度中间件Elastic-Job源码解析
分布式事务中间件TCC-Transaction源码解析
Eureka和Hystrix源码解析
Java并发源码
来源:blog.csdn.net/qq/article/details/
设计实现测试源码地址设计之前学习Redis的时候发现有赞团队之前分享过一篇关于延时队列的设计:有赞延时队列现在就尝试实现一下
业务流程首先我们分析下这个流程
用户提交任务。首先将任务推送至延迟队列中。延迟队列接收到任务后,首先将任务推送至jobpool中,然后计算其执行时间。然后生成延迟任务(仅仅包含任务id)放入某个桶中时间组件时刻轮询各个桶,当时间到达的时候从jobpool中获得任务元信息。监测任务的合法性如果已经删除则pass。继续轮询。如果任务合法则再次计算时间如果合法则计算时间,如果时间合法:根据topic将任务放入对应的readyqueue,然后从bucket中移除。如果时间不合法,则重新计算时间再次放入bucket,并移除之前的bucket中的内容消费端轮询对应topic的readyqueue。获取job后做自己的业务逻辑。与此同时,服务端将已经被消费端获取的job按照其设定的TTR,重新计算执行时间,并将其放入bucket。完成消费后,发送finish消息,服务端根据jobid删除对应信息。对象我们现在可以了解到中间存在的几个组件
延迟队列,为Redis延迟队列。实现消息传递Jobpool任务池保存job元信息。根据文章描述使用K/V的数据结构,key为ID,value为jobDelayBucket用来保存业务的延迟任务。文章中描述使用轮询方式放入某一个Bucket可以知道其并没有使用topic来区分,个人这里默认使用顺序插入Timer时间组件,负责扫描各个Bucket。根据文章描述存在多个Timer,但是同一个Timer同一时间只能扫描一个BucketReadyQueue负责存放需要被完成的任务,但是根据描述根据Topic的不同存在多个ReadyQueue。其中Timer负责轮询,Jobpool、DelayBucket、ReadyQueue都是不同职责的集合。
任务状态ready:可执行状态,delay:不可执行状态,等待时钟周期。reserved:已被消费者读取,但没有完成消费。deleted:已被消费完成或者已被删除。对外提供的接口接口描述数据add添加任务Job数据pop取出待处理任务topic就是任务分组finish完成任务任务IDdelete删除任务任务ID额外的内容首先根据状态状态描述,finish和delete操作都是将任务设置成deleted状态。根据文章描述的操作,在执行finish或者delete的操作的时候任务已经从元数据中移除,此时deleted状态可能只存在极短时间,所以实际实现中就直接删除了。文章中并没有说明响应超时后如何处理,所以个人现在将其重新投入了待处理队列。文章中因为使用了集群,所以使用redis的setnx锁来保证多个时间循环处理多个桶的时候不会出现重复循环。这里因为是简单的实现,所以就很简单的每个桶设置一个时间队列处理。也是为了方便简单处理。关于分布式锁可以看我之前的文章里面有描述。实现现在我们根据设计内容完成设计。这一块设计我们分四步完成
任务及相关对象目前需要两个对象,一个是任务对象(job)一个负责保存任务引用的对象(delayjob)
任务对象
DataAllArgsConstructorNoArgsConstructorpublicclassJobimplementsSerializable{/***延迟任务的唯一标识,用于检索任务*/JsonSerialize(using=ToStringSerializer.class)privateLongid;/***任务类型(具体业务类型)*/privateStringtopic;/***任务的延迟时间*/privatelongdelayTime;/***任务的执行超时时间*/privatelongttrTime;/***任务具体的消息内容,用于处理具体业务逻辑用*/privateStringmessage;/***重试次数*/privateintretryCount;/***任务状态*/privateJobStatusstatus;}任务引用对象
DataAllArgsConstructorpublicclassDelayJobimplementsSerializable{/***延迟任务的唯一标识*/privatelongjodId;/***任务的执行时间*/privatelongdelayDate;/***任务类型(具体业务类型)*/privateStringtopic;publicDelayJob(Jobjob){this.jodId=job.getId();this.delayDate=System.currentTimeMillis()+job.getDelayTime();this.topic=job.getTopic();}publicDelayJob(Objectvalue,Doublescore){this.jodId=Long.parseLong(String.valueOf(value));this.delayDate=System.currentTimeMillis()+score.longValue();}}容器目前我们需要完成三个容器的创建,Job任务池、延迟任务容器、待完成任务容器
job任务池,为普通的K/V结构,提供基础的操作
ComponentSlf4jpublicclassJobPool{AutowiredprivateRedisTemplateredisTemplate;privateStringNAME="job.pool";privateBoundHashOperationsgetPool(){BoundHashOperationsops=redisTemplate.boundHashOps(NAME);returnops;}/***添加任务*paramjob*/publicvoidaddJob(Jobjob){log.info("任务池添加任务:{}",JSON.toJSONString(job));getPool().put(job.getId(),job);return;}/***获得任务*paramjobId*return*/publicJobgetJob(LongjobId){Objecto=getPool().get(jobId);if(oinstanceofJob){return(Job)o;}returnnull;}/***移除任务*paramjobId*/publicvoidremoveDelayJob(LongjobId){log.info("任务池移除任务:{}",jobId);//移除任务getPool().delete(jobId);}}延迟任务,使用可排序的ZSet保存数据,提供取出最小值等操作
Slf4jComponentpublicclassDelayBucket{AutowiredprivateRedisTemplateredisTemplate;privatestaticAtomicIntegerindex=newAtomicInteger(0);Value("${thread.size}")privateintbucketsSize;privateListStringbucketNames=newArrayList();BeanpublicListStringcreateBuckets(){for(inti=0;ibucketsSize;i++){bucketNames.add("bucket"+i);}returnbucketNames;}/***获得桶的名称*return*/privateStringgetThisBucketName(){intthisIndex=index.addAndGet(1);inti1=thisIndex%bucketsSize;returnbucketNames.get(i1);}/***获得桶集合*parambucketName*return*/privateBoundZSetOperationsgetBucket(StringbucketName){returnredisTemplate.boundZSetOps(bucketName);}/***放入延时任务*paramjob*/publicvoidaddDelayJob(DelayJobjob){log.info("添加延迟任务:{}",JSON.toJSONString(job));StringthisBucketName=getThisBucketName();BoundZSetOperationsbucket=getBucket(thisBucketName);bucket.add(job,job.getDelayDate());}/***获得最新的延期任务*return*/publicDelayJobgetFirstDelayTime(Integerindex){Stringname=bucketNames.get(index);BoundZSetOperationsbucket=getBucket(name);SetZSetOperations.TypedTupleset=bucket.rangeWithScores(0,1);if(CollectionUtils.isEmpty(set)){returnnull;}ZSetOperations.TypedTupletypedTuple=(ZSetOperations.TypedTuple)set.toArray()[0];Objectvalue=typedTuple.getValue();if(valueinstanceofDelayJob){return(DelayJob)value;}returnnull;}/***移除延时任务*paramindex*paramdelayJob*/publicvoidremoveDelayTime(Integerindex,DelayJobdelayJob){Stringname=bucketNames.get(index);BoundZSetOperationsbucket=getBucket(name);bucket.remove(delayJob);}}待完成任务,内部使用topic进行细分,每个topic对应一个list集合
ComponentSlf4jpublicclassReadyQueue{AutowiredprivateRedisTemplateredisTemplate;privateStringNAME="process.queue";privateStringgetKey(Stringtopic){returnNAME+topic;}/***获得队列*paramtopic*return*/privateBoundListOperationsgetQueue(Stringtopic){BoundListOperationsops=redisTemplate.boundListOps(getKey(topic));returnops;}/***设置任务*paramdelayJob*/publicvoidpushJob(DelayJobdelayJob){log.info("执行队列添加任务:{}",delayJob);BoundListOperationslistOperations=getQueue(delayJob.getTopic());listOperations.leftPush(delayJob);}/***移除并获得任务*paramtopic*return*/publicDelayJobpopJob(Stringtopic){BoundListOperationslistOperations=getQueue(topic);Objecto=listOperations.leftPop();if(oinstanceofDelayJob){log.info("执行队列取出任务:{}",JSON.toJSONString((DelayJob)o));return(DelayJob)o;}returnnull;}}轮询处理设置了线程池为每个bucket设置一个轮询操作
ComponentpublicclassDelayTimerimplementsApplicationListenerContextRefreshedEvent{AutowiredprivateDelayBucketdelayBucket;AutowiredprivateJobPooljobPool;AutowiredprivateReadyQueuereadyQueue;Value("${thread.size}")privateintlength;OverridepublicvoidonApplicationEvent(ContextRefreshedEventcontextRefreshedEvent){ExecutorServiceexecutorService=newThreadPoolExecutor(length,length,0L,TimeUnit.MILLISECONDS,newLinkedBlockingQueueRunnable());for(inti=0;ilength;i++){executorService.execute(newDelayJobHandler(delayBucket,jobPool,readyQueue,i));}}}测试请求/***测试用请求*
authordaify*date-07-:26**/RestControllerRequestMapping("delay")publicclassDelayController{AutowiredprivateJobServicejobService;/***添加*paramrequest*return*/RequestMapping(value="add",method=RequestMethod.POST)publicStringaddDefJob(Jobrequest){DelayJobdelayJob=jobService.addDefJob(request);returnJSON.toJSONString(delayJob);}/***获取*return*/RequestMapping(value="pop",method=RequestMethod.GET)publicStringgetProcessJob(Stringtopic){Jobprocess=jobService.getProcessJob(topic);returnJSON.toJSONString(process);}/***完成一个执行的任务*paramjobId*return*/RequestMapping(value="finish",method=RequestMethod.DELETE)publicStringfinishJob(LongjobId){jobService.finishJob(jobId);return"success";}RequestMapping(value="delete",method=RequestMethod.DELETE)publicStringdeleteJob(LongjobId){jobService.deleteJob(jobId);return"success";}}测试添加延迟任务
通过postman请求:localhost:/delay/add
此时这条延时任务被添加进了线程池中
-08-:21:36.INFO---[nio--exec-6]d.samples.redis.delay.container.JobPool:任务池添加任务:{"delayTime":,"id":3,"message":"tag:testid:3","retryCount":0,"status":"DELAY","topic":"test","ttrTime":}-08-:21:36.INFO---[nio--exec-6]d.s.redis.delay.container.DelayBucket:添加延迟任务:{"delayDate":1565616106,"jodId":3,"topic":"test"}
根据设置10秒钟之后任务会被添加至ReadyQueue中
-08-:21:46.INFO---[pool-1-thread-4]d.s.redis.delay.container.ReadyQueue:执行队列添加任务:DelayJob(jodId=3,delayDate=1565616106,topic=test)
获得任务
这时候我们请求localhost:/delay/pop
这个时候任务被响应,修改状态的同时设置其超时时间,然后放置在DelayBucket中
-08-:36:02.INFO---[nio--exec-3]d.s.redis.delay.container.ReadyQueue:执行队列取出任务:{"delayDate":4,"jodId":1,"topic":"测试"}-08-:36:02.INFO---[nio--exec-3]d.samples.redis.delay.container.JobPool:任务池添加任务:{"delayTime":,"id":1,"message":"延迟10秒,超时30秒","retryCount":0,"status":"RESERVED","topic":"测试","ttrTime":}-08-:36:02.INFO---[nio--exec-3]d.s.redis.delay.container.DelayBucket:添加延迟任务:{"delayDate":1565321792,"jodId":1,"topic":"测试"}
按照设计在30秒后,任务假如没有被消费将会重新放置在ReadyQueue中
-08-:21:48.INFO---[nio--exec-7]d.s.redis.delay.container.ReadyQueue:执行队列取出任务:{"delayDate":1565616106,"jodId":3,"topic":"test"}-08-:21:48.INFO---[nio--exec-7]d.samples.redis.delay.container.JobPool:任务池添加任务:{"delayTime":,"id":3,"message":"tag:testid:3","retryCount":0,"status":"RESERVED","topic":"test","ttrTime":}
任务的删除/消费
现在我们请求:localhost:/delay/delete
此时在Jobpool中此任务将会被移除,此时元数据已经不存在,但任务还在DelayBucket中循环,然而在循环中当检测到元数据已经不存的话此延时任务会被移除。
-08-:21:54.INFO---[nio--exec-8]d.samples.redis.delay.container.JobPool:任务池移除任务:3-08-:21:59.INFO---[pool-1-thread-5]d.s.redis.delay.handler.DelayJobHandler:移除不存在任务:{"delayDate":1565616118,"jodId":3,"topic":"test"}源码地址
首先点击右下方在看,再长按下