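Contents
I. Flush Overview
II. Broker Flush Mechanism
1. Synchronous Flush
2. Asynchronous Flush
1) Transient Store Pool Disabled
2) Transient Store Pool Enabled
III. References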
I. Flush Overview
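RocketMQ storage reads and writes are built on the JDK NIO memory-mapping mechanism (MappedByteBuffer). When a message is stored, it is first appended to the file's memory mapping (the commit operation) and then, according to the configured flush strategy, written to disk at different times (the flush operation). With synchronous flush, after the message is committed to the memory mapping, the broker waits for a synchronous call to MappedByteBuffer's force() method to write it to disk and only then returns to the producer. With asynchronous flush, the broker returns to the producer immediately after the message is committed to the memory mapping. The figure below compares the two flush modes.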
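To make the commit/flush split concrete, here is a minimal sketch that uses only the JDK NIO API and is not RocketMQ code: bytes put into a MappedByteBuffer only reach the OS page cache, and force() is the step that pushes the dirty pages to the storage device (per the JDK documentation, that guarantee applies to files on a local storage device). The class and method names (MmapFlushDemo, appendAndMaybeFlush, tempFile, readPrefix) are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical demo: "commit" = put into the mapping, "flush" = force(). Not RocketMQ code. */
final class MmapFlushDemo {

    /** Appends the messages to a mapped file of fileSize bytes; calls force() before returning if syncFlush. */
    static int appendAndMaybeFlush(Path file, int fileSize, String[] messages, boolean syncFlush) {
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            // Pre-size the file (CommitLog files are fixed-size too) by writing one byte at the last offset
            if (channel.size() < fileSize) {
                channel.write(ByteBuffer.allocate(1), fileSize - 1);
            }
            MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
            for (String msg : messages) {
                // Append: the bytes land in the page cache, not necessarily on disk yet
                buffer.put(msg.getBytes(StandardCharsets.UTF_8));
            }
            if (syncFlush) {
                // Flush: write the mapping's dirty pages to the storage device before returning
                buffer.force();
            }
            return buffer.position(); // plays the role of a write position
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static Path tempFile() {
        try {
            Path p = Files.createTempFile("commitlog-demo", ".bin");
            p.toFile().deleteOnExit();
            return p;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static String readPrefix(Path file, int length) {
        try {
            byte[] all = Files.readAllBytes(file);
            return new String(all, 0, length, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With syncFlush = false the method returns right after the puts, which corresponds to the asynchronous mode: the data reaches disk later, when the OS or a background flush writes it back.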
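RocketMQ runs a dedicated thread that performs flushes periodically. The flush mode is set with flushDiskType in broker.conf; its values are ASYNC_FLUSH (asynchronous flush) and SYNC_FLUSH (synchronous flush), and the default is asynchronous flush.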
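As an illustration of how the setting resolves, the hypothetical helper below reads broker.conf-style key=value text (for example the line flushDiskType=SYNC_FLUSH) with java.util.Properties and falls back to ASYNC_FLUSH when the key is absent. It is not how the broker actually loads its configuration; it only encodes the two values and the default stated above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical helper: resolves flushDiskType from broker.conf-style text. Not part of RocketMQ. */
final class FlushDiskTypeConfig {

    /** The two values the article lists for flushDiskType. */
    enum FlushDiskType { ASYNC_FLUSH, SYNC_FLUSH }

    static FlushDiskType parse(String brokerConf) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(brokerConf)); // key=value lines, as in broker.conf
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Key absent: fall back to the documented default, asynchronous flush
        String value = props.getProperty("flushDiskType", FlushDiskType.ASYNC_FLUSH.name()).trim();
        // Any other value is rejected with IllegalArgumentException
        return FlushDiskType.valueOf(value);
    }
}
```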
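This chapter uses the CommitLog flush mechanism as an example to analyze how RocketMQ flushes; ConsumeQueue and IndexFile are flushed on similar principles. Note that IndexFile is not flushed on a timer: every time the index file is updated, the previous changes are flushed to disk.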
II. Broker Flush Mechanism
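org.apache.rocketmq.store.CommitLog#asyncPutMessage is the core method that commits messages to the file's memory mapping; for the full storage flow, see "RocketMQ 5.0.0 Message Storage <2>: Message Storage Flow". It calls org.apache.rocketmq.store.CommitLog#handleDiskFlushAndHA, the flush entry point, which performs synchronous or asynchronous flushing, HA master-slave replication, and so on. Its call chain and code are shown below.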
/**
 * Performs synchronous or asynchronous flush, HA master-slave replication, etc.
 * @param putMessageResult result of putting the message
 * @param messageExt extended message properties
 * @param needAckNums number of replica acknowledgements required
 * @param needHandleHA whether HA master-slave replication is needed
 * @return future completed once both the flush and the replication have finished
 */
private CompletableFuture<PutMessageResult> handleDiskFlushAndHA(PutMessageResult putMessageResult,
    MessageExt messageExt, int needAckNums, boolean needHandleHA) {
    // Flush to disk
    CompletableFuture<PutMessageStatus> flushResultFuture = handleDiskFlush(putMessageResult.getAppendMessageResult(), messageExt);
    CompletableFuture<PutMessageStatus> replicaResultFuture;
    if (!needHandleHA) {
        replicaResultFuture = CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
    } else {
        replicaResultFuture = handleHA(putMessageResult.getAppendMessageResult(), putMessageResult, needAckNums);
    }

    return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
        if (flushStatus != PutMessageStatus.PUT_OK) {
            putMessageResult.setPutMessageStatus(flushStatus);
        }
        if (replicaStatus != PutMessageStatus.PUT_OK) {
            putMessageResult.setPutMessageStatus(replicaStatus);
        }
        return putMessageResult;
    });
}
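The key point of handleDiskFlushAndHA is that thenCombine waits for both futures before touching the result. The sketch below reproduces that pattern with hypothetical stand-in types (Status, Result) in place of RocketMQ's PutMessageStatus and PutMessageResult.

```java
import java.util.concurrent.CompletableFuture;

/** Simplified sketch of combining flush and replication results; the types are hypothetical stand-ins. */
final class FlushAndHaCombine {

    enum Status { PUT_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT }

    /** Mutable result holder, standing in for PutMessageResult. */
    static final class Result {
        volatile Status status = Status.PUT_OK;
    }

    static CompletableFuture<Result> combine(Result result,
                                             CompletableFuture<Status> flushFuture,
                                             CompletableFuture<Status> replicaFuture) {
        // thenCombine runs the function only after BOTH futures have completed
        return flushFuture.thenCombine(replicaFuture, (flushStatus, replicaStatus) -> {
            if (flushStatus != Status.PUT_OK) {
                result.status = flushStatus;
            }
            // Checked second, so a replication failure overwrites a flush failure
            if (replicaStatus != Status.PUT_OK) {
                result.status = replicaStatus;
            }
            return result;
        });
    }
}
```

Because the replica status is checked after the flush status, a replication failure overwrites a flush failure when both occur, matching the ordering in the method above.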
org.apache.rocketmq.store.CommitLog.FlushManager is the flush manager interface. Its implementation, org.apache.rocketmq.store.CommitLog.DefaultFlushManager, maintains the flush threads that perform periodic flushing. Its UML diagram is shown below.
public DefaultFlushManager() {
    // Synchronous flush
    if (FlushDiskType.SYNC_FLUSH == CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        this.flushCommitLogService = new CommitLog.GroupCommitService();
    }
    // Asynchronous flush
    else {
        this.flushCommitLogService = new CommitLog.FlushRealTimeService();
    }
    // Message commit thread
    this.commitLogService = new CommitLog.CommitRealTimeService();
}
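The constructor is a strategy selection made once from configuration. The sketch below restates it with hypothetical stand-in classes that only report their names, so the choice can be checked in isolation; the bodies have nothing to do with RocketMQ's real services.

```java
/** Simplified stand-ins for the article's services; names mirror RocketMQ, bodies are hypothetical. */
final class FlushManagerSketch {

    enum FlushDiskType { SYNC_FLUSH, ASYNC_FLUSH }

    interface FlushService { String name(); }

    static final class GroupCommitService implements FlushService {
        public String name() { return "GroupCommitService"; }
    }

    static final class FlushRealTimeService implements FlushService {
        public String name() { return "FlushRealTimeService"; }
    }

    static final class CommitRealTimeService implements FlushService {
        public String name() { return "CommitRealTimeService"; }
    }

    final FlushService flushCommitLogService;
    final FlushService commitLogService;

    FlushManagerSketch(FlushDiskType type) {
        // The flush strategy is fixed once, at construction, from the configured flush type
        this.flushCommitLogService = (type == FlushDiskType.SYNC_FLUSH)
                ? new GroupCommitService()     // synchronous: group commit of waiting requests
                : new FlushRealTimeService();  // asynchronous: periodic background flush
        // Created unconditionally, as in the constructor above; only woken when the transient store pool is on
        this.commitLogService = new CommitRealTimeService();
    }
}
```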
1. Synchronous Flush
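Synchronous flush: after a message is appended to the memory-mapped file (in memory), the data is immediately flushed from memory to the disk file. org.apache.rocketmq.store.CommitLog.DefaultFlushManager#handleDiskFlush is the core flush method and contains both the synchronous and the asynchronous logic, as shown below.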
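/**
 * Flush: write messages from the memory-mapped file to disk
 * Synchronous flush:
 * step1: obtain the group commit service GroupCommitService
 * step2: create a flush request GroupCommitRequest and submit it
 * step3: add the request to flushDiskWatcher, which monitors it (e.g. handles flush timeouts)
 * step4: return the request's future, from which the caller obtains the flush result
 * Asynchronous flush:
 * transientStorePoolEnable: whether the transient (off-heap) store pool is enabled
 *   Purpose: allocate off-heap memory the same size as a CommitLog file and lock it so it is not swapped out
 *   With the pool: append messages to off-heap memory, commit them to the memory-mapped file, then flush to disk
 *   Without the pool: append messages directly to the memory-mapped file, then flush to disk
 * {@link CommitRealTimeService}: by default every 200ms, commits new messages to the memory-mapped file
 *   (committedPosition advances to wrotePosition), then wakes the flush thread, which advances flushedPosition
 * {@link FlushRealTimeService}: by default every 500ms, writes the memory-mapped file to disk (flushedPosition advances to wrotePosition)
 * @param result result of appending the message to the memory-mapped file
 * @param messageExt extended message
 * @return the write result, synchronously or asynchronously
 */
@Override
public CompletableFuture<PutMessageStatus> handleDiskFlush(AppendMessageResult result, MessageExt messageExt) {
    // Synchronization flush (SYNC_FLUSH)
    if (FlushDiskType.SYNC_FLUSH == CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        // Group commit service
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        // The producer asked to wait until the message has been stored
        if (messageExt.isWaitStoreMsgOK()) {
            // Create the flush request GroupCommitRequest
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
                CommitLog.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
            // Watcher that monitors completion, e.g. handles flush timeouts
            flushDiskWatcher.add(request);
            // Submit the request to the service thread
            service.putRequest(request);
            // Return the future; the caller waits on it for the flush result
            return request.future();
        }
        // The producer does not wait: wake up the flush thread and return immediately
        else {
            service.wakeup();
            return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
        }
    }
    // Asynchronous flush (ASYNC_FLUSH)
    ......
}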
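The synchronous branch hands the producer a future that is completed either by the flush thread or, on timeout, by flushDiskWatcher. The sketch below models such a request with a hypothetical Request class; it replaces flushDiskWatcher with CompletableFuture#completeOnTimeout (Java 9+), which is a substitute technique and not what RocketMQ does.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/** Simplified sketch of a sync-flush request; not RocketMQ's GroupCommitRequest. */
final class SyncFlushRequestSketch {

    enum Status { PUT_OK, FLUSH_DISK_TIMEOUT }

    static final class Request {
        final long nextOffset; // wroteOffset + wroteBytes: the end offset of this message
        final CompletableFuture<Status> future = new CompletableFuture<>();

        Request(long wroteOffset, int wroteBytes, long timeoutMillis) {
            this.nextOffset = wroteOffset + wroteBytes;
            // Stand-in for flushDiskWatcher: report a timeout if nobody completes the future in time
            this.future.completeOnTimeout(Status.FLUSH_DISK_TIMEOUT, timeoutMillis, TimeUnit.MILLISECONDS);
        }

        /** Called by the flush thread; if the timeout already fired, this has no effect. */
        void wakeupCustomer(Status status) {
            future.complete(status);
        }
    }
}
```

Both complete() and completeOnTimeout() follow first-completion-wins semantics, so a late wakeupCustomer call cannot turn a timeout back into PUT_OK.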
org.apache.rocketmq.store.CommitLog.GroupCommitService is a thread that processes synchronous flush requests, each represented by an org.apache.rocketmq.store.CommitLog.GroupCommitRequest.
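The key fields of GroupCommitService are shown below. Note that two containers, requestsWrite and requestsRead, are used to avoid lock contention between submitting flush requests and executing them.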
/**
 * Note: two containers, requestsWrite and requestsRead, are used to avoid lock contention between submitting and executing requests
 */
// Staging container for submitted flush requests
private volatile LinkedList<GroupCommitRequest> requestsWrite = new LinkedList<GroupCommitRequest>();
// Container processed in each round
private volatile LinkedList<GroupCommitRequest> requestsRead = new LinkedList<GroupCommitRequest>();
private final PutMessageSpinLock lock = new PutMessageSpinLock();
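The two-container design is a double buffer: producers append to requestsWrite under a short lock, and the flush thread swaps the lists and then works through requestsRead without holding the lock. Below is a minimal, hypothetical version; it uses a plain monitor instead of PutMessageSpinLock and assumes a single consumer thread.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of the requestsWrite/requestsRead double buffer; simplified, not RocketMQ code. */
final class DoubleBufferedRequests<T> {
    private List<T> requestsWrite = new ArrayList<>();
    private List<T> requestsRead = new ArrayList<>();
    private final Object lock = new Object(); // a monitor here; RocketMQ uses PutMessageSpinLock

    /** Producer side: only touches requestsWrite, holding the lock briefly. */
    void putRequest(T request) {
        synchronized (lock) {
            requestsWrite.add(request);
        }
    }

    /** Single consumer: swap the lists under the lock, then process the batch without holding it. */
    List<T> swapAndDrain() {
        List<T> batch;
        synchronized (lock) {
            List<T> tmp = requestsWrite;
            requestsWrite = requestsRead;
            requestsRead = tmp;
            batch = requestsRead;
        }
        List<T> processed = new ArrayList<>(batch); // stand-in for flushing and waking each request
        batch.clear();                              // becomes the empty write list after the next swap
        return processed;
    }
}
```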
The call chain of the GroupCommitService thread's run() method and the code of the core flush method org.apache.rocketmq.store.CommitLog.GroupCommitService#doCommit are shown below.
/**
 * Write to disk
 * step1: take flush requests one by one from requestsRead and flush
 * step2: if this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(), the request is already on disk and needs no further flush
 * step3: the flush ultimately calls force() (MappedByteBuffer#force() or FileChannel#force())
 * step4: after each request completes, notify the caller of the flush result
 * step5: after all requests in requestsRead complete, update the Checkpoint (only its in-memory mapping; the Checkpoint
 *        is flushed when CommitLog entries are dispatched to the consume queues)
 */
private void doCommit() {
    if (!this.requestsRead.isEmpty()) {
        for (GroupCommitRequest req : this.requestsRead) {
            // If the flushed position has reached the request's nextOffset, the message is already on disk.
            // Otherwise flush, at most twice, because:
            // There may be a message in the next file, so a maximum of
            // two times the flush
            boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
            for (int i = 0; i < 2 && !flushOK; i++) {
                // Write to disk via MappedFile.flush(), which ultimately calls force()
                CommitLog.this.mappedFileQueue.flush(0);
                flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
            }
            // Request done: wake up the sending thread and report the GroupCommitRequest's flush result
            req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);
        }

        // After all requests complete, update the Checkpoint. Note: only its in-memory mapping is updated, it is not
        // flushed here (the Checkpoint flush is triggered when CommitLog entries are dispatched to the consume queues)
        long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
        if (storeTimestamp > 0) {
            CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
        }

        this.requestsRead = new LinkedList<>();
    } else {
        // Because of individual messages is set to not sync flush, it
        // will come to this process
        CommitLog.this.mappedFileQueue.flush(0);
    }
}

public void run() {
    CommitLog.log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            // Rest up to 10ms between batches; waitForRunning() calls onWaitEnd(), which swaps requestsWrite and requestsRead
            this.waitForRunning(10);
            // Write to disk
            this.doCommit();
        } catch (Exception e) {
            CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }

    // Under normal circumstances shutdown, wait for the arrival of the
    // request, and then flush
    try {
        Thread.sleep(10);
    } catch (InterruptedException e) {
        CommitLog.log.warn("GroupCommitService Exception, ", e);
    }

    // Swap the two containers so the remaining submitted requests are processed
    synchronized (this) {
        this.swapRequests();
    }

    this.doCommit();

    CommitLog.log.info(this.getServiceName() + " service end");
}
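The inner loop of doCommit() flushes at most twice per request because the request's nextOffset may already lie in the next CommitLog file. The simulation below makes that concrete under a stated assumption: one flush call only advances flushedWhere to the end of the file that currently contains it. File size, offsets and class names are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation of doCommit(); models positions only, no real files. */
final class GroupCommitSimulation {
    enum Status { PUT_OK, FLUSH_DISK_TIMEOUT }

    private final long fileSize;
    private long wrotePosition; // bytes appended so far, across files
    private long flushedWhere;  // bytes known to be on disk
    int flushCalls;             // number of flush calls made so far

    GroupCommitSimulation(long fileSize) {
        this.fileSize = fileSize;
    }

    void append(int bytes) {
        wrotePosition += bytes;
    }

    /** Assumption of this model: one call flushes at most up to the end of the current file. */
    private void flush() {
        flushCalls++;
        long endOfCurrentFile = (flushedWhere / fileSize + 1) * fileSize;
        flushedWhere = Math.min(wrotePosition, endOfCurrentFile);
    }

    /** Mirrors doCommit(): flush at most twice per request, then report the result. */
    List<Status> doCommit(List<Long> requestNextOffsets) {
        List<Status> results = new ArrayList<>();
        for (long nextOffset : requestNextOffsets) {
            boolean flushOK = flushedWhere >= nextOffset;
            for (int i = 0; i < 2 && !flushOK; i++) {
                flush();
                flushOK = flushedWhere >= nextOffset;
            }
            results.add(flushOK ? Status.PUT_OK : Status.FLUSH_DISK_TIMEOUT);
        }
        return results;
    }
}
```

In this model a request whose nextOffset is two or more files ahead of flushedWhere reports FLUSH_DISK_TIMEOUT after its two flushes.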
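After the broker appends the producer's message to the memory-mapped file, it must flush that memory to disk immediately and synchronously. This is done by calling force() on the memory-mapped file (MappedByteBuffer#force()), or FileChannel#force() when the data was written through the file channel.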
2. Asynchronous Flush
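Asynchronous flush: after a message is appended to the memory-mapped file (in memory), the broker returns to the sender immediately. The core method is again org.apache.rocketmq.store.CommitLog.DefaultFlushManager#handleDiskFlush, which contains both the synchronous and the asynchronous logic, as shown below. Note that asynchronous flush wakes a different thread depending on whether the transient (off-heap) store pool is enabled (transientStorePoolEnable, default false, i.e. disabled):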
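- Transient store pool disabled: org.apache.rocketmq.store.CommitLog.FlushRealTimeService flushes directly. Messages are already written into the memory-mapped file, so the flush moves flushedPosition to the current wrotePosition.
- Transient store pool enabled: org.apache.rocketmq.store.CommitLog.CommitRealTimeService first commits all messages to the memory-mapped file (committedPosition moves to the current wrotePosition) and then wakes the flush thread, which moves flushedPosition to the committed position.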
@Override
public CompletableFuture<PutMessageStatus> handleDiskFlush(AppendMessageResult result, MessageExt messageExt) {
    // Synchronization flush (SYNC_FLUSH)
    if (FlushDiskType.SYNC_FLUSH == CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        .....
    }
    // Asynchronous flush (ASYNC_FLUSH)
    /*
     transientStorePoolEnable: whether the transient (off-heap) store pool is enabled
       Purpose: allocate off-heap memory the same size as a CommitLog file and lock it so it is not swapped out
       With the pool: append to off-heap memory, commit to the memory-mapped file, then flush to disk
       Without the pool: append directly to the memory-mapped file, then flush to disk
     CommitRealTimeService: by default every 200ms, commits new messages to the memory-mapped file (committedPosition advances to wrotePosition)
     FlushRealTimeService: by default every 500ms, writes the memory-mapped file to disk (flushedPosition advances)
     */
    else {
        // Transient store pool disabled
        if (!CommitLog.this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            // Implementation: FlushRealTimeService
            flushCommitLogService.wakeup();
        }
        // Transient store pool enabled
        else {
            // Implementation: CommitRealTimeService
            commitLogService.wakeup();
        }
        return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
    }
}
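Both asynchronous branches only call wakeup() on a service thread; the thread's own loop does the work, either when woken or when its interval elapses in waitForRunning(). A minimal version of that wait/wakeup pair, built on ReentrantLock and Condition, is sketched below. The class name is hypothetical, and RocketMQ's ServiceThread differs in implementation details.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Minimal wait/wakeup primitive in the spirit of waitForRunning()/wakeup(); not RocketMQ's ServiceThread. */
final class WakeableWaiter {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition wakeupCondition = lock.newCondition();
    private boolean notified; // guarded by lock; remembers a wakeup() that arrives before waitForRunning()

    /** Requests an immediate round of work, as handleDiskFlush does with wakeup(). */
    void wakeup() {
        lock.lock();
        try {
            notified = true;
            wakeupCondition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** Waits up to intervalMillis, returning early if woken; returns true if it was woken. */
    boolean waitForRunning(long intervalMillis) {
        lock.lock();
        try {
            long remaining = TimeUnit.MILLISECONDS.toNanos(intervalMillis);
            while (!notified && remaining > 0) {
                remaining = wakeupCondition.awaitNanos(remaining);
            }
            boolean woken = notified;
            notified = false; // consume the notification
            return woken;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            lock.unlock();
        }
    }
}
```

The notified flag is the important design choice: a wakeup() that arrives while the thread is busy flushing is not lost, so the next waitForRunning() returns immediately instead of sleeping a full interval.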
1) Transient Store Pool Disabled
By default, the FlushRealTimeService thread writes the memory-mapped file to disk every 500 ms (flushedPosition moves to the current wrotePosition), as shown below.
/**
 * Flush process
 * step1: rest according to flushCommitLogTimed; by default the flush thread rests 500ms each round
 * step2: flush, which ultimately calls MappedByteBuffer#force() or FileChannel#force(boolean)
 * step3: update the CommitLog timestamp in the Checkpoint file
 * Note: the Checkpoint file itself is not flushed here; it is flushed when the consume queues are flushed,
 *       entry point DefaultMessageStore.FlushConsumeQueueService
 */
@Override
public void run() {
    CommitLog.log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        // Wait mode: false (default) waits with await(); true waits with sleep()
        boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
        // Interval between runs, default 500ms
        int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
        // Minimum number of pages per flush, default 4; if fewer pages are dirty, skip this round
        int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
        // Maximum interval between two full flushes, default 10s
        int flushPhysicQueueThoroughInterval =
            CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();

        boolean printFlushProgress = false;

        // If the time since the last full flush exceeds flushPhysicQueueThoroughInterval, ignore flushPhysicQueueLeastPages
        long currentTimeMillis = System.currentTimeMillis();
        if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
            this.lastFlushTimestamp = currentTimeMillis;
            flushPhysicQueueLeastPages = 0;
            printFlushProgress = (printTimes++ % 10) == 0; // Print flush progress
        }

        try {
            // Rest according to the wait mode
            if (flushCommitLogTimed) {
                Thread.sleep(interval);
            } else {
                this.waitForRunning(interval);
            }

            // Print flush progress
            if (printFlushProgress) {
                this.printFlushProgress();
            }

            // Start flushing; ultimately calls MappedByteBuffer#force() or FileChannel#force(boolean)
            long begin = System.currentTimeMillis();
            CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
            long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
            /*
             After the flush, update the CommitLog timestamp in the Checkpoint.
             Note: the Checkpoint file is not flushed here; it is flushed when the consume queues are flushed,
             entry point DefaultMessageStore.FlushConsumeQueueService
             */
            if (storeTimestamp > 0) {
                CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
            }
            long past = System.currentTimeMillis() - begin;
            CommitLog.this.getMessageStore().getPerfCounter().flowOnce("FLUSH_DATA_TIME_MS", (int) past);
            if (past > 500) {
                log.info("Flush data to disk costs {} ms", past);
            }
        } catch (Throwable e) {
            CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
            this.printFlushProgress();
        }
    }

    // After a normal shutdown of the flush thread, make sure all messages are written to disk
    // Normal shutdown, to ensure that all the flush before exit
    boolean result = false;
    for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
        result = CommitLog.this.mappedFileQueue.flush(0);
        CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
    }

    this.printFlushProgress();

    CommitLog.log.info(this.getServiceName() + " service end");
}
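The interplay of flushCommitLogLeastPages and flushCommitLogThoroughInterval can be expressed as two small functions. The page-count check reflects the editor's understanding of RocketMQ's MappedFile#isAbleToFlush and assumes 4 KiB OS pages; the real method may handle extra cases (such as a full file) that are omitted here, and the class name is hypothetical.

```java
/** Sketch of the "least pages / thorough interval" flush decision; names are hypothetical. */
final class FlushThreshold {
    static final int OS_PAGE_SIZE = 4096; // assumption: 4 KiB pages

    /** Effective least-pages value: 0 (flush anything) once the thorough interval has elapsed. */
    static int effectiveLeastPages(long nowMillis, long lastFlushMillis, long thoroughIntervalMillis, int leastPages) {
        return nowMillis >= lastFlushMillis + thoroughIntervalMillis ? 0 : leastPages;
    }

    /** Whether enough unflushed data lies between flushedPosition and writePosition. */
    static boolean isAbleToFlush(int flushedPosition, int writePosition, int leastPages) {
        if (leastPages > 0) {
            // Compare page indices of the two positions
            return (writePosition / OS_PAGE_SIZE) - (flushedPosition / OS_PAGE_SIZE) >= leastPages;
        }
        // leastPages == 0: any unflushed byte is enough
        return writePosition > flushedPosition;
    }
}
```

With the defaults quoted above (4 pages, 10 s), a trickle of small messages is still flushed at least once every 10 seconds even if fewer than 4 new pages have accumulated.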
2) Transient Store Pool Enabled
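By default, the CommitRealTimeService thread commits messages to the memory-mapped file every 200 ms (committedPosition moves to the current wrotePosition), after which the data is flushed (flushedPosition advances). The figure below shows the flush flow with the transient store pool enabled.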
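Steps:
step1: The message is appended directly to a ByteBuffer (the off-heap buffer); wrotePosition moves forward as messages are appended.
step2: By default every 200 ms, the CommitRealTimeService thread commits the newly appended data in the ByteBuffer (wrotePosition minus committedPosition) to the memory-mapped file (in RocketMQ's code this write goes through the file's FileChannel).
step3: The committed content is appended to the file in memory, the file's write position moves forward, and the call returns.
step4: When the commit returns successfully, committedPosition moves forward by the length just committed; meanwhile wrotePosition can keep advancing.
step5: By default every 500 ms, the FlushRealTimeService thread flushes the newly committed data (committedPosition minus the previous flush position flushedPosition) to disk by calling force() (FileChannel#force() in this mode).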
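The three positions in steps 1-5 can be tracked with plain counters. This hypothetical sketch does no I/O; it only shows the ordering flushedPosition <= committedPosition <= wrotePosition, and that a flush can only persist data that has already been committed.

```java
/** Pointer bookkeeping for the transient-store-pool path; hypothetical, no real I/O. */
final class TransientPoolPointers {
    private int wrotePosition;     // advanced by appends into the off-heap buffer (step 1)
    private int committedPosition; // advanced by the commit thread (steps 2-4)
    private int flushedPosition;   // advanced by the flush thread (step 5)

    void append(int bytes) {
        wrotePosition += bytes;
    }

    /** Commits [committedPosition, wrotePosition) to the file; returns the number of bytes committed. */
    int commit() {
        int n = wrotePosition - committedPosition;
        committedPosition = wrotePosition;
        return n;
    }

    /** Flushes [flushedPosition, committedPosition); uncommitted bytes cannot be flushed yet. */
    int flush() {
        int n = committedPosition - flushedPosition;
        flushedPosition = committedPosition;
        return n;
    }

    int wrote() { return wrotePosition; }
    int committed() { return committedPosition; }
    int flushed() { return flushedPosition; }
}
```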
/**
 * Message commit process: commit first, then flush
 */
@Override
public void run() {
    CommitLog.log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        // Interval between runs, default 200ms
        int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
        // Minimum number of pages per commit, default 4; if fewer, skip this round
        int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
        // Maximum interval between two full commits, default 200ms
        int commitDataThoroughInterval =
            CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();

        // If the time since the last commit exceeds commitDataThoroughInterval, ignore commitDataLeastPages
        long begin = System.currentTimeMillis();
        if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
            this.lastCommitTimestamp = begin;
            commitDataLeastPages = 0;
        }

        try {
            // Commit messages: false means some data was committed in this round, true means nothing new was committed
            boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
            long end = System.currentTimeMillis();
            if (!result) {
                this.lastCommitTimestamp = end; // result = false means some data committed.
                // Wake up the flush thread
                CommitLog.this.flushManager.wakeUpFlush();
            }
            CommitLog.this.getMessageStore().getPerfCounter().flowOnce("COMMIT_DATA_TIME_MS", (int) (end - begin));
            if (end - begin > 500) {
                log.info("Commit data to file costs {} ms", end - begin);
            }
            // Rest after this commit
            this.waitForRunning(interval);
        } catch (Throwable e) {
            CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
        }
    }

    // On shutdown of the commit thread, retry the commit up to RETRY_TIMES_OVER times
    boolean result = false;
    for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
        result = CommitLog.this.mappedFileQueue.commit(0);
        CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
    }
    CommitLog.log.info(this.getServiceName() + " service end");
}
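The checkpoint file (Checkpoint) is flushed in the consume queue flush thread, whose entry point is org.apache.rocketmq.store.DefaultMessageStore.FlushConsumeQueueService. Because consume queue and index files are flushed on the same principles as the CommitLog file, this chapter does not cover them.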
III. References
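RocketMQ Deployment and Flush Mechanism. 谁喝了我的菊花茶, CSDN blog.
RocketMQ Flush Strategy. chongshui129727, CSDN blog.
RocketMQ 5.0.0 Message Storage <1>: Storage Files and Memory Mapping. 爱我所爱0505, CSDN blog.
RocketMQ 5.0.0 Message Storage <2>: Message Storage Flow. 爱我所爱0505, CSDN blog.
RocketMQ 5.0.0 Message Storage <3>: Message Dispatch and Recovery Mechanism. 爱我所爱0505, CSDN blog.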