RocketMQ中的线程池是如何创建的?
wptr33 2025-09-19 03:55 2 浏览
前言
大家好,我是小郭,今天主要来和大家聊一聊RocketMQ中的线程池是如何创建的,如何设置线程池数量,同时也可以从中去学习到一些线程池的实践和需要注意的一些细节。
RocketMQ在哪些地方使用到了线程池?
在RocketMQ中存在了大量的对线程池的使用,从消息的生产到投递Broker中,到最后的消息消费每一个环节中都大量使用到线程池的地方,下面我们拿出几个不同类型的线程池来看一看。
在 NameServer的路由注册和剔除中,多次使用到了定时线程池
定时线程池
private final ScheduledExecutorService scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"NSScheduledThread"));
复制代码
// 定时任务 每10s扫描一次Broker,移除失活Broker
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
//定时任务,每隔30s向集群中所有NameServer发送心跳包
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
复制代码
线程池newFixedThreadPool
FixedThreadPool常用于创建一个固定大小的线程池,
它的特点就是核心线程数量与最大线程数量一致,采用无界的阻塞队列 LinkedBlockingQueue,并且没有设置队列的大小默认是Integer.MAX_VALUE,适用于负载较重的场景
private ExecutorService remotingExecutor;
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
// 用来设置接收到消息后的处理方法
this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
复制代码
消息发送初始化默认异步发送者线程池
核心线程数与最大线程数设置均为 Runtime.getRuntime().availableProcessors() ,可用的计算资源
阻塞队列设置为一个初始化50000长度的阻塞队列
keepAliveTime设置60s,超过则时间空闲的线程将被终止
private final ExecutorService defaultAsyncSenderExecutor;
private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);
this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.asyncSenderThreadPoolQueue,
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
}
});
复制代码
消费端拉取消息线程池
我们重点来看一下消费端的线程池是如何创建,它可以说是整个RocketMQ中最关键的一个线程池
为了提高消费速度,我们通常有两种方式来提高消费并行度
- 同一个 ConsumerGroup 下,通过增加 Consumer 实例数量来提高并行度
- 提高单个 Consumer 的消费并行线程,通过修改参数 consumeThreadMin、consumeThreadMax实现。
如何创建?
在消息监听的时候,利用线程池进行不断的拉取消息
提交消费请求,消息提交到内部的线程池
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
复制代码
参数设置
创建内部线程池,核心参数核心线程数和最大线程数,主要是根据配置来进行设置
设置线程池名称以 ConsumeMessageThread_ 开头的,利于排查问题
阻塞队列是一个无界的阻塞队列LinkedBlockingQueue
private final BlockingQueue<Runnable> consumeRequestQueue;
this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
this.consumeExecutor = new ThreadPoolExecutor(
this.defaultMQPushConsumer.getConsumeThreadMin(),
this.defaultMQPushConsumer.getConsumeThreadMax(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.consumeRequestQueue,
new ThreadFactoryImpl(consumeThreadPrefix));
复制代码
通过RocketMQ的源码,我们看到 consumeExecutor 线程池的创建也是非常简单的
如果想要修改线程池参数,需要注意什么?
根据线程池的原理我们知道,只有阻塞队列为满的情况下,不会创建临时线程
所以线程池内部持有的队列为一个无界队列,导致 consumeThreadMax 大于 consumeThreadMin,线程个数最大也只能 consumeThreadMin 个线程数量
什么时候需要修改?
在正常的业务场景中,启动应用之后,我们就不会再修改消费者线程数,但有可能突发业务高峰导致消息堆积,这时候我们就需要调整单个 Consumer 的消费并行线程数。
如何修改线程数?
- 修改线程池后,重新启动消费者,缺点是参数不易评估,随着业务的并发提升,需要频繁的重启服务来更改线程数,这势必会带来一定的造成影响。
- 官方也为我们提供了修改线程数的方法,当更新的线程数大于0且小于 Short.MAX_VALUE 且小于最大线程数,则更新核心线程数。
JDK允许线程池使用方通过ThreadPoolExecutor的实例来动态设置线程池的核心策略
@Override
public void updateCorePoolSize(int corePoolSize) {
if (corePoolSize > 0
&& corePoolSize <= Short.MAX_VALUE
&& corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax()) {
this.consumeExecutor.setCorePoolSize(corePoolSize);
}
}
复制代码
这两种方式都存在一定的痛点
- 线程数量随着业务的变动,需要修改代码
- 在springBoot和SpringCloud Stream下,对线程池参数变更不是很友好
- 不能通过管理界面,直接动态修改线程池参数
针对上面的痛点问题,我们可以考虑封装线程池动态参数调整,首先肯定原来代码是毫无侵入性的,
同时通过管理页面对不同消费者组的线程池进行管理自由的随着业务波动进行平滑修改,降低线程池参数修改的成本。
相关推荐
- 高性能并发队列Disruptor使用详解
-
基本概念Disruptor是一个高性能的异步处理框架,是一个轻量的Java消息服务JMS,能够在无锁的情况下实现队列的并发操作Disruptor使用环形数组实现了类似队列的功能,并且是一个有界队列....
- Disruptor一个高性能队列_java高性能队列
-
Disruptor一个高性能队列前言说到队列比较熟悉的可能是ArrayBlockingQueue、LinkedBlockingQueue这两个有界队列,大多应用在线程池中使用能保证线程安全,但其安全性...
- 谈谈防御性编程_防御性策略
-
防御性编程对于程序员来说是一种良好的代码习惯,是为了保护自己的程序在不可未知的异常下,避免带来更大的破坏性崩溃,使得程序在错误发生时,依然能够云淡风轻的处理,但很多程序员入行很多年,写出的代码依然都是...
- 有人敲门,开水开了,电话响了,孩子哭了,你先顾谁?
-
前言哎呀,这种情况你肯定遇到过吧!正在家里忙活着,突然——咚咚咚有人敲门,咕噜咕噜开水开了,铃铃铃电话响了,哇哇哇孩子又哭了...我去,四件事一起来,人都懵了!你说先搞哪个?其实这跟我们写Java多线...
- 面试官:线程池如何按照core、max、queue的执行顺序去执行?
-
前言这是一个真实的面试题。前几天一个朋友在群里分享了他刚刚面试候选者时问的问题:"线程池如何按照core、max、queue的执行循序去执行?"。我们都知道线程池中代码执行顺序是:co...
- 深入剖析 Java 中线程池的多种实现方式
-
在当今高度并发的互联网软件开发领域,高效地管理和利用线程资源是提升程序性能的关键。Java作为一种广泛应用于后端开发的编程语言,为我们提供了丰富的线程池实现方式。今天,就让我们深入探讨Java中...
- 并发编程之《彻底搞懂Java线程》_java多线程并发解决方案详解
-
目录引言一、核心概念:线程是什么?...
- Redis怎么实现延时消息_redis实现延时任务
-
一句话总结Redis可通过有序集合(ZSET)实现延时消息:将消息作为value,到期时间戳作为score存入ZSET。消费者轮询用ZRANGEBYSCORE获取到期消息,配合Lua脚本保证原子性获取...
- CompletableFuture真的用对了吗?盘点它最容易被误用的5个场景
-
在Java并发编程中,CompletableFuture是处理异步任务的利器,但不少开发者在使用时踩过这些坑——线上服务突然雪崩、异常悄无声息消失、接口响应时间翻倍……本文结合真实案例,拆解5个最容易...
- 接口性能优化技巧,有点硬_接口性能瓶颈
-
背景我负责的系统到2021年初完成了功能上的建设,开始进入到推广阶段。随着推广的逐步深入,收到了很多好评的同时也收到了很多对性能的吐槽。刚刚收到吐槽的时候,我们的心情是这样的:...
- 禁止使用这5个Java类,每一个背后都有一段"血泪史"
-
某电商平台的支付系统突然报警:大量订单状态异常。排查日志发现,同一笔订单被重复支付了三次。事后复盘显示,罪魁祸首竟是一行看似无害的SimpleDateFormat代码。在Java开发中,这类因使用不安...
- 无锁队列Disruptor原理解析_无锁队列实现原理
-
队列比较队列...
- Java并发队列与容器_java 并发队列
-
【前言:无论是大数据从业人员还是Java从业人员,掌握Java高并发和多线程是必备技能之一。本文主要阐述Java并发包下的阻塞队列和并发容器,其实研读过大数据相关技术如Spark、Storm等源码的,...
- 线程池工具及拒绝策略的使用_线程池处理策略
-
线程池的拒绝策略若线程池中的核心线程数被用完且阻塞队列已排满,则此时线程池的资源已耗尽,线程池将没有足够的线程资源执行新的任务。为了保证操作系统的安全,线程池将通过拒绝策略处理新添加的线程任务。...
- 【面试题精讲】ArrayBlockingQueue 和 LinkedBlockingQueue 区别?
-
有的时候博客内容会有变动,首发博客是最新的,其他博客地址可能会未同步,认准...
- 一周热门
-
-
C# 13 和 .NET 9 全知道 :13 使用 ASP.NET Core 构建网站 (1)
-
程序员的开源月刊《HelloGitHub》第 71 期
-
详细介绍一下Redis的Watch机制,可以利用Watch机制来做什么?
-
假如有100W个用户抢一张票,除了负载均衡办法,怎么支持高并发?
-
如何将AI助手接入微信(打开ai手机助手)
-
Java面试必考问题:什么是乐观锁与悲观锁
-
SparkSQL——DataFrame的创建与使用
-
redission YYDS spring boot redission 使用
-
一文带你了解Redis与Memcached? redis与memcached的区别
-
如何利用Redis进行事务处理呢? 如何利用redis进行事务处理呢英文
-
- 最近发表
- 标签列表
-
- git pull (33)
- git fetch (35)
- mysql insert (35)
- mysql distinct (37)
- concat_ws (36)
- java continue (36)
- jenkins官网 (37)
- mysql 子查询 (37)
- python元组 (33)
- mybatis 分页 (35)
- vba split (37)
- redis watch (34)
- python list sort (37)
- nvarchar2 (34)
- mysql not null (36)
- hmset (35)
- python telnet (35)
- python readlines() 方法 (36)
- munmap (35)
- docker network create (35)
- redis 集合 (37)
- python sftp (37)
- setpriority (34)
- c语言 switch (34)
- git commit (34)