百度360必应搜狗淘宝本站头条
当前位置:网站首页 > IT技术 > 正文

一口气整理一波延时队列实现方案 延时队列java

wptr33 2024-12-17 16:47 23 浏览

作者:kevinkrcai,腾讯IEG后台开发工程师

一、前言

前段时间参与了海外客服系统相关需求的开发工作,其中需要实现客服对用户消息回复超时的相关处理策略。比如:当客服接收到用户消息后,如果在3分钟内没有回复用户,则需要给用户推送一个问卷表单;当客服30分钟还没有回复用户消息,则需要给对应的客服发告警消息等。这类属于在当前时间点往后推迟指定时间执行的任务,可以采用延时队列实现此类功能,本文对延时队列的相关实现方案做一个简单的整理和总结。

二、相关实现

延时队列,就是具有延时功能的数据结构

2.1 基于内存数据结构实现

基于链表实现

基于链表实现的话,链表的每一个元素都是一个延迟任务,包含延时执行时间和延时任务的创建时间,旁路开启一个时间检测线程去轮询遍历链表,只要有当前时间减去延时任务的创建时间大于或等于延迟时间,任务就可以执行,执行完后从链表删除该任务,因此,新增任务的时间复杂度可以做到O(1),但是删除和检测任务是否能够执行时间复杂度则是O(N)。

上面是比较简单的实现思路,但是每次检测任务是否能够执行需要遍历整个链表,时间复杂度位O(N),如果在新增任务时,就根据执行时间的先后做排序,让延时时间最短的任务排在链表头部,那么检测任务是否能够执行和删除延时任务的时间复杂度能优化为O(1),相对前者较好,但是在新增任务时因需要做排序,所以时间复杂度会变为O(N)。

基于最小堆实现

基于排序链表的实现可以给我们提供了一种思路,将任务按照执行时间排序,所以我们可以考虑排序时间复杂度更优的数据结构:最小堆。JDK JUC包内置的延时队列DelayQueue就是基于最小堆实现。最小堆能保证每次新增任务,延时时间最近的排在堆的根结点,检测任务是否能够执行只需要检查根节点的任务是否到期了就行,检测任务时间复杂度是O(1),但是新增和删除任务,因为要做堆排序,所以时间复杂度是O(logN)

基于时间轮实现

什么是时间轮?

时间轮是一个存储定时任务的环状数组,相对上面提到的基于最小堆和链表的实现,时间轮可以做到新增,删除和检测任务的时间复杂度都是O(1)。

最简单的时间轮是单层时间轮,如下图所示,图中有8个时间格,每个时间格表示1s,那么这个时间轮就是表示周期为8s的时间轮,定时任务存放在时间轮每一个格子上,时间过去1s,则当前时间的指针则会向前移动一个单位时间。

延时任务如何存放在时间轮上:

计算某一个定时任务应该存放在时间轮哪一个下标下:
((延时任务执行时间 - 当前时间) / 时间轮单位时间 + 当前时间在时间轮的索引下标) % 时间周期

比如:当前时间00:03,现在有一个在00:04执行的任务,那么计算后得出,(1 / 1 + 3) % 8 = 4,定时任务应该存放在时间轮下标为4的格子上。同样道理,00:10执行的任务,就需要放在下标为2的格子上,可以复用之前的时间轮格子。

如果一个时间格子上刚好有多个任务,怎么执行?

如果延时执行的时间不同,但是下标计算的结果相同,放在了同一个时间格中,可以在这个任务中多一个round字段,表示要多转几圈,例如:当前时间是00:03,有两个延时任务,一个是00:10执行,一个是00:18执行,那他们都会放在下标为2的格子上,但是00:18执行的任务,round = 1,表示要多转一圈。只有round = 0的任务才是当前时间周期可以执行的。这些任务可以在对应的时间格中通过链表连接起来,每次对时间格的处理还需要遍历链表更新任务的round,超出当前时间周期的任务的round字段可以在减1,所以这块的时间复杂度是O(n)。

上面在检测任务的时候,还需要更新对应时间格链表的round字段,时间复杂度是O(n),通过层级时间轮可以将该复杂度优化为O(1),层级时间轮就是有多个时间轮,但是每个时间轮的单位时间是不一样的,如图所示:第一层时间轮的单位时间是1s,时间周期是0到8s,第二层的单位时间等于第一层的时间周期即8s,第二层的时间周期是0到64秒。这样对于超出第一层时间轮时间周期的任务就可以放在第二层。第一层转一圈,第二层转一格。

如下图所示:可以一个三层时间轮表示一天24小时,根据定时任务执行时间放置在不同的时间轮上,比如:02时00分01秒执行的任务比下面两个时间周期大,需要放在hours时间轮上,当current hour指向2,当前时间轮需要下降到下一层时间轮,直到下降到最后一层时间轮为止。

这类基于内存数据结构实现的延迟队列不支持分布式或者持久化,进程重启后数据会丢失,并且当延时任务很多时,队列中对象也会很多,不适合任务量比较大的情况,只适合不需要持久化且任务量相对较少的单机任务,并且能容忍丢失。

2.2 基于DB定时轮询实现

基于DB+定时任务轮询的实现方案通常就是通过一个线程定时去扫描数据库表,找到可以执行的任务触发执行。基于DB轮询,虽然可以解决任务持久化,但是也有以下几个缺点:

  • 首先,定时轮询就会存在延迟,这个延迟的误差就在于你设定的轮询的间隔时间。比如你每隔3分钟扫描一次,那最坏的延迟时间就是3分钟
  • 第二,扫描数据表对DB有压力,如果任务数量多,表的数据量大,定时扫表会增加DB磁盘IO压力。
  • 第三,对自身服务器,定时轮询也会增加CPU的开销。

2.3 基于Redis实现

基于Redis发布订阅实现

延时消息事件通过 Redis 的pub/sub来进行分发, 设置key的过期时间是延迟时间,利用key过期事件通知来实现延迟消息。因为事件监听配置默认关闭,故开启 redis 的事件监听与发布,需修改redis.conf配置文件:

#  notify-keyspace-events ""
notify-keyspace-events Ex

连接redis,订阅监听expire事件:当key过期后,就会监听到过期的key

PSUBSCRIBE __keyevent@*__:expired

如图所示:监听5秒后过期的key:

小demo:

var redisCli *redis.Client

func init() {
	// 连接redis
	redisCli = redis.NewClient(&redis.Options{
		Addr:     "127.0.0.1:6379",
	})
}

func SetExpireEvent() {
	// 设置一个键,并且3秒钟之后过期
	redisCli.Set(context.Background(), "test_expire_event_notify", "测试键值过期通知", 3*time.Second)
    fmt.Println(time.Now())
}

func SubExpireEvent() {
	// 订阅key过期事件
	sub := redisCli.PSubscribe(context.Background(), "__keyevent@*__:expired")
	for {
		msg := <- sub.Channel()
		fmt.Println("Channel ", msg.Channel)
		fmt.Println("pattern ", msg.Pattern)
		fmt.Println("pattern ", msg.Payload) // 过期的key
        fmt.Println(time.Now())
	}
}

func main() {
	SetExpireEvent()
	go SubExpireEvent()
	select {}
}

这种方案缺点在于:

  • 第一:redis官方文档中也明确指出,redis从未保证key过期后会立马删除key并立即发送过期通知,因为redis实现key过期的方式是定时随机扫描key,发现过期则删除或者在访问key的时候再去检查key是否过期,过期则删除,所以消息的延迟是必然存在的。
  • 第二:redis的事件消息是通过Pub/Sub发出来的,Pub/Sub消息不会做持久化,也没有消息的Ack机制,那样如果消费者没有ack就挂掉或者中间断链,重启后无法再处理这个消息,导致消息丢失。

基于Redis的ZSet实现

Redis的ZSet是基于score进行排序的有序数据结构,因此可以将延迟消息的到期时间戳作为score放到zset中,定时轮询存放延时任务的Zset,取出ZSet中最近要执行的延时任务和当前时间比较,小于等于当前时间即可以执行该任务。新增和查询任务的时间复杂度都是O(logN)。

小demo:

var redisCli *redis.Client

func init() {
	// 连接redis
	redisCli = redis.NewClient(&redis.Options{
		Addr:     "127.0.0.1:6379",
	})
}

func addTask()  {
	// 添加一个三秒的定时任务
	redisCli.ZAdd(context.Background(),"first-job",&redis.Z{Score:float64(time.Now().Add(3 * time.Second).Unix()),Member: 1})
	fmt.Println("添加定时任务: ",time.Now())
}

func getTaskFromZset()  {
	for {
		luaScript := `
		local message = redis.call('ZRANGEBYSCORE', KEYS[1], '-inf', ARGV[1], 'WITHSCORES', 'LIMIT', 0, 1);
		if #message > 0 then
		  redis.call('ZREM', KEYS[1], message[1]);
		  return message[1];
		else
		  return nil;
		end
		`
		res, err := redis.NewScript(luaScript).Run(context.Background(), redisCli, []string{"first-job"},strconv.FormatInt(time.Now().Unix(), 10)).Result()
		if err != nil {
			continue
		}
		// 执行延迟任务
		fmt.Println("res:",res)
		fmt.Println("执行延迟任务:",time.Now())
	}
}

func main() {
	addTask()
	go getTaskFromZset()
	select {}
}

基于Redis ZSet实现相对简单,但也有几点需要注意:

  • 第一就是延时消息处理的原子性:轮询线程从Zset获取最近要执行的任务执行并将该任务移除出Zset这几个步骤整体需要封装为一个原子操作,否则服务多实例环境下,可能被其他服务的轮询线程消费到同一任务。
  • 第二,定时轮训Zset可能会出现很多空轮训消耗服务器CPU

2.4 基于MQ实现

基于Redis ZSet实现延时队列是一种理解起来比较简单直观并且可以快速落地的方案,但是需要我们自己去实现延迟消息的检测,利用消息队列对延迟消息的支持,我们可以只关心延迟消息的业务逻辑,而不需要关心延迟消息什么时候可以执行。

基于Pulsar延时消费实现

Pulsar最早是在2.4.0引入了延迟消息投递的特性,在Pulsar中使用延迟消息,可以精确指定延迟投递的时间,有deliverAfter和deliverAt两种方式。其中deliverAt可以指定具体的时间戳;deliverAfter可以指定在当前多长时间后执行。

Pulsar延迟消息投递:

			// 发送消息,5分钟后执行
			_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
				// 消息内容
				Payload:      []byte("hello go client, this is a message."),
				DeliverAfter: 5 * time.Minute,
			})

			// 发送消息,在指定时间执行
			_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
				// 消息内容
				Payload:   []byte("hello go client, this is a message."),
				DeliverAt: time.Now().Add(time.Second * 10),
			})

Pulsar延迟消息实现原理:Pulsar中的延迟消息是基于Netty中的时间轮去实现的,通过时间轮去推动时间,到达对应时间后去寻找对应的消息索引,延迟消息的索引通过优先级队列维护,优先队列根据延时时间排序,维护在Broker内存中。找到消息后在发给Consumer去消费。 Pulsar延迟消息有以下不足之处:

  • 第一:延迟消息索引维护在内存中,一条延迟消息的消息索引由三个long类型的字段组成<延迟执行的时间戳以及用于定位消息的ledgerId和entryId,所以当有大量的延迟消息需要执行或者延迟消息的延迟时间越长,对内存的开销也会越大。
  • 第二:如果集群出现 Broker 宕机或者 topic 的 ownership 转移,Pulsar 会重建延迟消息索引,对于大规模的延时消息,索引重建的时间也会比较长。

我们目前是为海外的超核玩家服务,用户基数比较小,所以我们也选用了腾讯云的TDMQ-Pulsar作为延时队列的实现方案。

基于RabbitMQ实现

基于RabbitMQ实现有两种方式:

  • 第一种是基于RabbitMQ的死信队列实现
  • 第二种是基于RabbitMQ官方的延迟插件实现

其实RabbitMQ本身是不支持延迟消息的,但是RabbitMQ支持消息和队列TTL和死信队列,因此,可以利用这两个特性实现延迟队列的效果。

什么是死信消息?

在消息生产和消费过程中,当消息出现以下情况,则会变为死信消息:

  • 消息被拒绝且不再重复投递
  • 消息超时未被消费,TTL过期
  • 队列达到最大长度无法正常投递消息

死信消息会被投递到Dead-Letter-Exchange,然后根据绑定的规则转发到队列的死信队列上,监听这些队列就消费消息就可以实现延迟消息的效果。

但是基于这种方式实现的延迟队列有个缺陷:RabbitMQ只会检查第一个消息是否过期,比如这种情况,第一个消息设置了20s的TTL,第二个消息设置了10s的TTL,RabbitMQ只会等到第一个消息过期了,才会让接下来的第二个消息过期,因此,可能会出现延迟消息投递不按时。

第一种肯定是不能应用于生产环境的,因此RabbitMQ推出了官方的延迟插件

这个插件其实是在消息发到Exchange后,将延迟消息暂时存放在mnesia(一种分布式数据库),等消息到期后在投递到队列中给消费者去消费。

使用延时插件实现延时队列也存在一些局限:

  • 延迟消息存储在 Mnesia 数据库,在集群情况下,数据只会分布在其中一个节点,如果该节点宕机,延时消息不可用,但是恢复之后消息不会丢
  • 官方文档也有提到,该插件依赖 Erlang 计时器,设计上并 不适合大量的延迟消息,如:百万级以上的延时消息
  • 插件一旦被禁用了,没有被消费的延时消息会丢失。

三、总结

相关推荐

Python自动化脚本应用与示例(python办公自动化脚本)

Python是编写自动化脚本的绝佳选择,因其语法简洁、库丰富且跨平台兼容性强。以下是Python自动化脚本的常见应用场景及示例,帮助你快速上手:一、常见自动化场景文件与目录操作...

Python文件操作常用库高级应用教程

本文是在前面《Python文件操作常用库使用教程》的基础上,进一步学习Python文件操作库的高级应用。一、高级文件系统监控1.1watchdog库-实时文件系统监控安装与基本使用:...

Python办公自动化系列篇之六:文件系统与操作系统任务

作为高效办公自动化领域的主流编程语言,Python凭借其优雅的语法结构、完善的技术生态及成熟的第三方工具库集合,已成为企业数字化转型过程中提升运营效率的理想选择。该语言在结构化数据处理、自动化文档生成...

14《Python 办公自动化教程》os 模块操作文件与文件夹

在日常工作中,我们经常会和文件、文件夹打交道,比如将服务器上指定目录下文件进行归档,或将爬虫爬取的数据根据时间创建对应的文件夹/文件,如果这些还依靠手动来进行操作,无疑是费时费力的,这时候Pyt...

python中os模块详解(python os.path模块)

os模块是Python标准库中的一个模块,它提供了与操作系统交互的方法。使用os模块可以方便地执行许多常见的系统任务,如文件和目录操作、进程管理、环境变量管理等。下面是os模块中一些常用的函数和方法:...

21-Python-文件操作(python文件的操作步骤)

在Python中,文件操作是非常重要的一部分,它允许我们读取、写入和修改文件。下面将详细讲解Python文件操作的各个方面,并给出相应的示例。1-打开文件...

轻松玩转Python文件操作:移动、删除

哈喽,大家好,我是木头左!Python文件操作基础在处理计算机文件时,经常需要执行如移动和删除等基本操作。Python提供了一些内置的库来帮助完成这些任务,其中最常用的就是os模块和shutil模块。...

Python 初学者练习:删除文件和文件夹

在本教程中,你将学习如何在Python中删除文件和文件夹。使用os.remove()函数删除文件...

引人遐想,用 Python 获取你想要的“某个人”摄像头照片

仅用来学习,希望给你们有提供到学习上的作用。1.安装库需要安装python3.5以上版本,在官网下载即可。然后安装库opencv-python,安装方式为打开终端输入命令行。...

Python如何使用临时文件和目录(python目录下文件)

在某些项目中,有时候会有大量的临时数据,比如各种日志,这时候我们要做数据分析,并把最后的结果储存起来,这些大量的临时数据如果常驻内存,将消耗大量内存资源,我们可以使用临时文件,存储这些临时数据。使用标...

Linux 下海量文件删除方法效率对比,最慢的竟然是 rm

Linux下海量文件删除方法效率对比,本次参赛选手一共6位,分别是:rm、find、findwithdelete、rsync、Python、Perl.首先建立50万个文件$testfor...

Python 开发工程师必会的 5 个系统命令操作库

当我们需要编写自动化脚本、部署工具、监控程序时,熟练操作系统命令几乎是必备技能。今天就来聊聊我在实际项目中高频使用的5个系统命令操作库,这些可都是能让你效率翻倍的"瑞士军刀"。一...

Python常用文件操作库使用详解(python文件操作选项)

Python生态系统提供了丰富的文件操作库,可以处理各种复杂的文件操作需求。本教程将介绍Python中最常用的文件操作库及其实际应用。一、标准库核心模块1.1os模块-操作系统接口主要功能...

11. 文件与IO操作(文件io和网络io)

本章深入探讨Go语言文件处理与IO操作的核心技术,结合高性能实践与安全规范,提供企业级解决方案。11.1文件读写11.1.1基础操作...

Python os模块的20个应用实例(python中 import os模块用法)

在Python中,...