您当前的位置:首页 > 电脑百科 > 程序开发 > 编程百科

「RocketMQ」消息的刷盘机制

时间:2022-07-06 09:00:54  来源:  作者:Java码农之路

刷盘策略

CommitLog的asyncPutMessage方法中可以看到在写入消息之后,调用了submitFlushRequest方法执行刷盘策略:

public class CommitLog {
    public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
        // ...
        try {
            // 获取上一次写入的文件
            MAppedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
            // ...
            // 写入消息
            result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
            // ...
        } finally {
            beginTimeInLock = 0;
            putMessageLock.unlock();
        }
        // ...
        // 执行刷盘
        CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
        // ...
    }
}

刷盘有两种策略:

  • 同步刷盘,表示消息写入到内存之后需要立刻刷到磁盘文件中。
  • 同步刷盘会构建GroupCommitRequest组提交请求并设置本次刷盘后的位置偏移量的值(写入位置偏移量+写入数据字节数),然后将请求添加到flushDiskWatcher和GroupCommitService中进行刷盘。
  • 异步刷盘,表示消息写入内存成功之后就返回,由MQ定时将数据刷入到磁盘中,会有一定的数据丢失风险。
public class CommitLog {
    // 监控刷盘
    private final FlushDiskWatcher flushDiskWatcher;
    public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
        // 是否是同步刷盘
        if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
            // 获取GroupCommitService
            final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
            // 是否等待
            if (messageExt.isWAItStoreMsgOK()) {
                // 构建组提交请求,传入本次刷盘后位置的偏移量:写入位置偏移量+写入数据字节数
                GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
                        this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                // 添加到wather中
                flushDiskWatcher.add(request);
                // 添加到service
                service.putRequest(request);
                // 返回
                return request.future();
            } else {
                service.wakeup();
                return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
            }
        }
        // 如果是异步刷盘
        else {
            if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                flushCommitLogService.wakeup();
            } else  {
                commitLogService.wakeup();
            }
            return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
        }
    }
}

同步刷盘

如果使用的是同步刷盘,首先获取了GroupCommitService,然后构建GroupCommitRequest组提交请求,将请求添加到flushDiskWatcher和GroupCommitService中,其中flushDiskWatcher用于监控刷盘是否超时,GroupCommitService用于提交刷盘数据。

构建GroupCommitRequest提交请求

GroupCommitRequest是CommitLog的内部类:

  • nextOffset:写入位置偏移量+写入数据字节数,也就是本次刷盘成功后应该对应的flush偏移量
  • flushOKFuture:刷盘结果
  • deadLine:刷盘的限定时间,值为当前时间 + 传入的超时时间,超过限定时间还未刷盘完毕会被认为超时
public class CommitLog {
    public static class GroupCommitRequest {
        private final long nextOffset;
        // 刷盘状态
        private CompletableFuture<PutMessageStatus> flushOKFuture = new CompletableFuture<>();
        private final long deadLine;// 刷盘的限定时间,超过限定时间还未刷盘完毕会被认为超时

        public GroupCommitRequest(long nextOffset, long timeoutMillis) {
            this.nextOffset = nextOffset;
            // 设置限定时间:当前时间 + 超时时间
            this.deadLine = System.nanoTime() + (timeoutMillis * 1_000_000);
        }

        public void wakeupCustomer(final PutMessageStatus putMessageStatus) {
            // 结束刷盘,设置刷盘状态
            this.flushOKFuture.complete(putMessageStatus);
        }

        public CompletableFuture<PutMessageStatus> future() {
            // 返回刷盘状态
            return flushOKFuture;
        }

    }
}

GroupCommitService处理刷盘

GroupCommitService是CommitLog的内部类,从继承关系中可知它实现了Runnable接口,在run方法调用waitForRunning等待刷盘请求的提交,然后处理刷盘,不过这个线程是在什么时候启动的呢?

public class CommitLog {
    /**
     * GroupCommit Service
     */
    class GroupCommitService extends FlushCommitLogService {
        // ...
        // run方法
        public void run() {
            CommitLog.log.info(this.getServiceName() + " service started");
            while (!this.isStopped()) {
                try {
                    // 等待刷盘请求的到来
                    this.waitForRunning(10);
                    // 处理刷盘
                    this.doCommit();
                } catch (Exception e) {
                    CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
                }
            }
            // ...
        }
    }
}

刷盘线程的启动

在BrokerController的启动方法中,可以看到调用了messageStore的start方法,前面可知使用的是DefaultMessageStore,进入到DefaultMessageStore的start方法,它又调用了commitLog的start方法,在CommitLog的start方法中,启动了刷盘的线程和监控刷盘的线程:

public class BrokerController {
    public void start() throws Exception {
        if (this.messageStore != null) {
            // 启动
            this.messageStore.start();
        }
        // ...
    }
}

public class DefaultMessageStore implements MessageStore {
   /**
     * @throws Exception
     */
    public void start() throws Exception {
        // ...
        this.flushConsumeQueueService.start();
        // 调用CommitLog的启动方法
        this.commitLog.start();
        this.storeStatsService.start();
        // ...
    }
}

public class CommitLog {
    private final FlushCommitLogService flushCommitLogService; // 刷盘
    private final FlushDiskWatcher flushDiskWatcher; // 监控刷盘
    private final FlushCommitLogService commitLogService; // commitLogService
    public void start() {
        // 启动刷盘的线程
        this.flushCommitLogService.start();
        flushDiskWatcher.setDaemon(true);
        // 启动监控刷盘的线程
        flushDiskWatcher.start();
        if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            this.commitLogService.start();
        }
    }
}

刷盘请求的处理

既然知道了线程在何时启动的,接下来详细看一下GroupCommitService是如何处理刷盘提交请求的。

前面知道在GroupCommitService的run方法中,调用了waitForRunning方法等待刷盘请求,waitForRunning在GroupCommitService父类ServiceThread中实现。ServiceThread是一个抽象类,实现了Runnable接口,里面使用了CountDownLatch进行线程间的通信,大小设为1。

waitForRunning方法在进入的时候先判断hasNotified是否为true(已通知),并尝试将其更新为false(未通知),由于hasNotified的初始化值为false,所以首次进入的时候条件不成立,不会进入到这个处理逻辑,会继续执行后面的代码。

接着调用 waitPoint的reset方法将其重置为1,并调用waitPoint的await方法进行等待:

// ServiceThread
public abstract class ServiceThread implements Runnable {
    // 是否通知,初始化为false
    protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false);
  
    // CountDownLatch用于线程间的通信
    protected final CountDownLatch2 waitPoint = new CountDownLatch2(1);
  
    // 等待运行
    protected void waitForRunning(long interval) {
        // 判断hasNotified是否为true,并尝试将其更新为false
        if (hasNotified.compareAndSet(true, false)) {
            // 调用onWaitEnd
            this.onWaitEnd();
            return;
        }

        // 重置waitPoint的值,也就是值为1
        waitPoint.reset();
        try {
            // 会一直等待waitPoint值降为0
            waitPoint.await(interval, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.error("Interrupted", e);
        } finally {
            // 是否被通知设置为false
            hasNotified.set(false);
            this.onWaitEnd();
        }
    }
}

一、添加刷盘请求,唤醒刷盘线程

上面可知需要刷盘的时候调用了GroupCommitService的putRequest方法添加刷盘请求,在putRequest方法中,将刷盘请求GroupCommitRequest添加到了requestsWrite组提交写请求链表中,然后调用wakeup方法唤醒刷盘线程,wakeup方法在它的父类ServiceThread中实现。

在wakeup方法中可以看到,首先将hasNotified更改为了true表示处于已通知状态,然后调用了countDown方法,此时waitPoint值变成0,就会唤醒之前waitForRunning方法中一直在等待的线程。

public class CommitLog {
    /**
     * 组提交Service
     */
    class GroupCommitService extends FlushCommitLogService {
        // 组提交写请求链表
        private volatile LinkedList<GroupCommitRequest> requestsWrite = new LinkedList<GroupCommitRequest>();
        
        // ...
      
        // 添加提交请求
        public synchronized void putRequest(final GroupCommitRequest request) {
            // 加锁
            lock.lock();
            try {
                // 加入到写请求链表
                this.requestsWrite.add(request);
            } finally {
                lock.unlock();
            }
            // 唤醒线程执行提交任务
            this.wakeup();
        }   
        // ...
    }
  
}

// ServiceThread
public abstract class ServiceThread implements Runnable {

    // CountDownLatch用于线程间的通信
    protected final CountDownLatch2 waitPoint = new CountDownLatch2(1);
  
    // 唤醒刷盘线程
    public void wakeup() {
        // 更改状态为已通知状态
        if (hasNotified.compareAndSet(false, true)) {
            // waitPoint的值减1,由于大小设置为1,减1之后变为0,会唤醒等待的线程
            waitPoint.countDown(); 
        }
    }
  
    // ...
}

二、线程被唤醒,执行刷盘前的操作

waitForRunning方法中的await方法一直在等待countdown的值变为0,当上一步调用了wakeup后,就会唤醒该线程,然后开始往下执行,在finally中可以看到将是否被通知hasNotified又设置为了false,然后调用了onWaitEnd方法,GroupCommitService方法中重写了该方法,里面又调用了swapRequests方法将读写请求列表的数据进行了交换,putRequest方法中将提交的刷盘请求放在了写链表中,经过交换,数据会被放在读链表中,后续进行刷盘时会从读链表中获取请求进行处理

// ServiceThread
public abstract class ServiceThread implements Runnable {
    // CountDownLatch
    protected final CountDownLatch2 waitPoint = new CountDownLatch2(1);
    
    // 等待运行
    protected void waitForRunning(long interval) {
        if (hasNotified.compareAndSet(true, false)) {
            // 交换
            this.onWaitEnd();
            return;
        }

        // 重置
        waitPoint.reset();
        try {
            // 会一直等待countdown为0
            waitPoint.await(interval, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.error("Interrupted", e);
        } finally {
            // 是否被通知设置为false
            hasNotified.set(false);
            this.onWaitEnd();
        }
    }
}

public class CommitLog {
    /**
     * 组提交Service
     */
    class GroupCommitService extends FlushCommitLogService {
        // 组提交写请求链表
        private volatile LinkedList<GroupCommitRequest> requestsWrite = new LinkedList<GroupCommitRequest>();
        // 组提交读请求链表
        private volatile LinkedList<GroupCommitRequest> requestsRead = new LinkedList<GroupCommitRequest>();
        
        @Override
        protected void onWaitEnd() {
            // 交换读写请求列表的数据请求
            this.swapRequests();
        }

        private void swapRequests() {
            // 加锁
            lock.lock();
            try {
                // 将读写请求链表的数据进行交换
                LinkedList<GroupCommitRequest> tmp = this.requestsWrite;
                this.requestsWrite = this.requestsRead;
                this.requestsRead = tmp;
            } finally {
                lock.unlock();
            }
        }
        // ...
    }
}
折叠 

这里使用读写链表进行交换应该是为了提升性能,如果只使用一个链表,在提交请求的时候需要往链表中添加请求,此时需要加锁,而刷盘线程在处理完请求之后是需要从链表中移除请求的,假设添加请求时加的锁还未释放,刷盘线程就要一直等待,而添加和处理完全可以同时进行,所以使用了两个链表,在添加请求的时候使用写链表,处理请求的时候对读写链表的数据进行交换使用读链表,这样只需在交换数据的时候加锁,以此来提升性能。

三、执行刷盘

waitForRunning执行完毕后,会回到GroupCommitService中的run方法开始继续往后执行代码,从代码中可以看到接下来会调用doCommit方法执行刷盘。

doCommit方法中对读链表中的数据进行了判空,如果不为空,进行遍历处理每一个提交请求,处理逻辑如下:

  1. 获取CommitLog映射文件记录的刷盘位置偏移量flushedWhere,判断是否大于请求设定的刷盘位置偏移量nextOffset,正常情况下flush的位置应该小于本次刷入数据后的偏移量,所以如果flush位置大于等于本次请求设置的flush偏移量,本次将不能进行刷盘
「RocketMQ」消息的刷盘机制

 

  1. 开启一个循环,调用mappedFileQueue的flush方法执行刷盘(具体的实现在异步刷盘的时候再看),由于CommitLog大小为1G,所以本次刷完之后,如果当前已经刷入的偏移量小于请求设定的位置,表示数据未刷完,需要继续刷,反之表示数据已经刷完,flushOK为true,for循环条件不满足结束执行。
  2. 请求处理之后会清空读链表。
public class CommitLog {
    /**
     * 组提交Service
     */
    class GroupCommitService extends FlushCommitLogService {  
        // 运行
        public void run() {
            CommitLog.log.info(this.getServiceName() + " service started");
            // 如果没有停止
            while (!this.isStopped()) {
                try {
                    // 等待唤醒刷盘线程
                    this.waitForRunning(10);
                    // 进行提交
                    this.doCommit();
                } catch (Exception e) {
                    CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
                }
            }

            // 睡眠10毫秒
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                CommitLog.log.warn(this.getServiceName() + " Exception, ", e);
            }

            synchronized (this) {
                this.swapRequests();
            }
            // 停止之前提交一次
            this.doCommit();

            CommitLog.log.info(this.getServiceName() + " service end");
        }
      
        // 提交刷盘
        private void doCommit() {
            // 如果不为空
            if (!this.requestsRead.isEmpty()) {
                // 遍历刷盘请求
                for (GroupCommitRequest req : this.requestsRead) {
                    // 获取映射文件的flush位置,判断是否大于请求设定的刷盘位置
                    boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
                    for (int i = 0; i < 2 && !flushOK; i++) {
                        // 进行刷盘
                        CommitLog.this.mappedFileQueue.flush(0);
                        // 由于CommitLog大小为1G,所以本次刷完之后,如果当前已经刷入的偏移量小于请求设定的位置,表示数据未刷完,需要继续刷,反之表示数据已经刷完,flushOK为true,for循环条件不满足结束执行
                        flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
                    }
                    // 设置刷盘结果
                    req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);
                }

                long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                if (storeTimestamp > 0) {
                    CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                }
                // 请求处理完之后清空链表
                this.requestsRead = new LinkedList<>();
            } else {
                // Because of individual messages is set to not sync flush, it
                // will come to this process
                CommitLog.this.mappedFileQueue.flush(0);
            }
        }
    }
  
}
折叠 

刷盘超时监控

FlushDiskWatcher用于监控刷盘请求的耗时,它也继承了ServiceThread,在Broker启动时开启了该线程,在run方法中,使用while循环,只要服务未停止,会一直从阻塞队列中获取提交的刷盘请求,开启while循环隔一段时间判断一下刷盘是否完成,如果未完成,会做如下判断:

  1. 使用当前时间减去请求设置的刷盘截止时间,如果已经超过截止时间,说明刷盘时间已经超时,调用wakeupCustomer方法设置刷盘结果为已超时
  2. 如果未超时,为了避免当前线程频繁的进行判断,将当前线程睡眠一会儿,睡眠的计算方式是使用刷盘请求设置的截止时间 - 当前时间,表示剩余的时间,然后除以1000000化为毫秒,得到距离刷盘截止时间的毫秒数sleepTime:sleepTime如果为0,只能是当前时间等于截止时间,也就是到了截止时间,此时同样调用wakeupCustomer方法设置刷盘结果为已超时sleepTime不为0,在10毫秒和sleepTime的值之间取较小的那个作为睡眠的毫秒数将当前线程睡眠,等待刷盘任务执行
public class FlushDiskWatcher extends ServiceThread {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    // 阻塞队列,存放提交请求
    private final LinkedBlockingQueue<GroupCommitRequest> commitRequests = new LinkedBlockingQueue<>();

    @Override
    public String getServiceName() {
        return FlushDiskWatcher.class.getSimpleName();
    }

    @Override
    public void run() {
        // 如果未停止
        while (!isStopped()) {
            GroupCommitRequest request = null;
            try {
                // 从阻塞队列中获取提交请求
                request = commitRequests.take();
            } catch (InterruptedException e) {
                log.warn("take flush disk commit request, but interrupted, this may caused by shutdown");
                continue;
            }
            // 如果还未完成
            while (!request.future().isDone()) {
                long now = System.nanoTime();
                // 如果已经超时
                if (now - request.getDeadLine() >= 0) {
                    // 设置刷盘结果为超时
                    request.wakeupCustomer(PutMessageStatus.FLUSH_DISK_TIMEOUT);
                    break;
                }
                // 避免频繁的判断,使用(截止时间 - 当前时间)/1000000 计算一个毫秒数
                long sleepTime = (request.getDeadLine() - now) / 1_000_000;
                // 在计算的毫秒数与10之间取最小的
                sleepTime = Math.min(10, sleepTime);
                // 如果sleepTime为0表示已经到了截止时间
                if (sleepTime == 0) {
                    // 设置刷盘结果为超时
                    request.wakeupCustomer(PutMessageStatus.FLUSH_DISK_TIMEOUT);
                    break;
                }
                try {
                    // 睡眠等待刷盘任务的执行
                    Thread.sleep(sleepTime);
                } catch (InterruptedException e) {
                    log.warn(
                            "An exception occurred while waiting for flushing disk to complete. this may caused by shutdown");
                    break;
                }
            }
        }
    }
}
折叠 

异步刷盘

上面讲解了同步刷盘,接下来去看下异步刷盘,首先会判断是否使用了暂存池,如果未开启调用flushCommitLogService的wakeup唤醒刷盘线程,否则使用commitLogService先将数据写入到FileChannel,然后统一进行刷盘:

 public class CommitLog {
    private final FlushDiskWatcher flushDiskWatcher;
    public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
        // 是否是同步刷盘
        if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
            // ...
        }
        // 如果是异步刷盘
        else {
            // 如果未使用暂存池
            if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                // 唤醒刷盘线程进行刷盘
                flushCommitLogService.wakeup();
            } else  {
                // 如果使用暂存池,使用commitLogService,先将数据写入到FILECHANNEL,然后统一进行刷盘
                commitLogService.wakeup();
            }
            // 返回结果
            return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
        }
    }
}

在CommitLog的构造函数中可以看到,commitLogService使用的是CommitRealTimeService进行实例化的,flushCommitLogService需要根据设置决定使用哪种类型进行实例化:

  • 如果是同步刷盘,使用GroupCommitService,由前面的同步刷盘可知,使用的就是GroupCommitService进行刷盘的。
  • 如果是异步刷盘,使用FlushRealTimeService。

所以接下来需要关注CommitRealTimeService和FlushRealTimeService:

「RocketMQ」消息的刷盘机制

 

public class CommitLog {    
    private final FlushCommitLogService flushCommitLogService;

    // 刷盘Service
    private final FlushCommitLogService commitLogService;

    public CommitLog(final DefaultMessageStore defaultMessageStore) {
        // 如果设置的同步刷盘
        if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
            // 使用GroupCommitService
            this.flushCommitLogService = new GroupCommitService();
        } else {
            // 使用FlushRealTimeService
            this.flushCommitLogService = new FlushRealTimeService();
        }
        // commitLogService
        this.commitLogService = new CommitRealTimeService();
    }
}

CommitRealTimeService

「RocketMQ」消息的刷盘机制

 

在开启暂存池时,会使用CommitRealTimeService,它继承了FlushCommitLogService,所以会实现run方法,处理逻辑如下:

  1. 从配置信息中获取提交间隔每次提交的最少页数两次提交的最大间隔时间
  2. 如果当前时间大于上次提交时间+两次提交的最大间隔时间,意味着已经有比较长的一段时间没有进行提交了,需要尽快刷盘,此时将每次提交的最少页数设置为0不限制提交页数
  3. 调用mappedFileQueue的commit方法进行提交,并返回提交的结果:如果结果为true表示未提交任何数据如果结果为false表示进行了数据提交,需要等待刷盘
  4. 判断提交返回结果是否返回false,如果是调用flushCommitLogService的wakeup方法唤醒刷盘线程,进行刷盘
  5. 调用waitForRunning等待下一次提交处理
class CommitRealTimeService extends FlushCommitLogService {
        // 上次提交时间戳
        private long lastCommitTimestamp = 0;

        @Override
        public void run() {
            CommitLog.log.info(this.getServiceName() + " service started");
            // 如果未停止
            while (!this.isStopped()) {
                // 获取提交间隔
                int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
                // 一次提交的最少页数
                int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
                // 两次提交的最大间隔时间
                int commitDataThoroughInterval =
                    CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();
                // 开始时间
                long begin = System.currentTimeMillis();
                // 如果当前时间大于上次提交时间+提交的最大间隔时间
                if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
                    this.lastCommitTimestamp = begin; // 提交时间
                    commitDataLeastPages = 0;// 最少提交页数设为0,表示不限制提交页数
                }

                try {
                    // 提交
                    boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
                    // 提交结束时间
                    long end = System.currentTimeMillis();
                    // 如果返回false表示提交了一部分数据但是还未进行刷盘
                    if (!result) {
                        // 再次更新提交时间戳
                        this.lastCommitTimestamp = end;
                        // 唤醒flush线程进行刷盘
                        flushCommitLogService.wakeup();
                    }

                    if (end - begin > 500) {
                        log.info("Commit data to file costs {} ms", end - begin);
                    }
                    // 等待下一次提交
                    this.waitForRunning(interval);
                } catch (Throwable e) {
                    CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
                }
            }

            boolean result = false;
            for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
                result = CommitLog.this.mappedFileQueue.commit(0);
                CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
            }
            CommitLog.log.info(this.getServiceName() + " service end");
        }
    }
折叠 

提交

提交的方法在MappedFileQueue的commit方法中实现,处理逻辑如下:

  1. 根据记录的CommitLog文件提交位置的偏移量获取映射文件,如果获取不为空,调用MappedFile的commit方法进行提交,然后返回本次提交数据的偏移量
  2. 记录本次提交的偏移量:文件的偏移量 + 提交数据的偏移量
  3. 判断本次提交的偏移量是否等于上一次的提交偏移量,如果等于表示本次未提交任何数据,返回结果置为true,否则表示提交了数据,等待刷盘,返回结果为false
  4. 更新上一次提交偏移量committedWhere的值为本次的提交偏移量的值
public class MappedFileQueue {
    protected long flushedWhere = 0; // flush的位置偏移量
    private long committedWhere = 0; // 提交的位置偏移量
 
    public boolean commit(final int commitLeastPages) {
        boolean result = true;
        // 根据提交位置的偏移量获取映射文件
        MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0);
        if (mappedFile != null) {
            // 调用mappedFile的commit方法进行提交,返回提交数据的偏移量
            int offset = mappedFile.commit(commitLeastPages);
            // 记录本次提交的偏移量:文件的偏移量 + 提交数据的偏移量
            long where = mappedFile.getFileFromOffset() + offset;
            // 设置返回结果,如果本次提交偏移量等于上一次的提交偏移量为true,表示什么也没干,否则表示提交了数据,等待刷盘
            result = where == this.committedWhere;
            // 更新上一次提交偏移量的值为本次的
            this.committedWhere = where;
        }
        return result;
    }
}

MappedFile

MappedFile中记录CommitLog的写入位置wrotePosition、提交位置committedPosition以及flush位置flushedPosition,在commit方法中,调用了isAbleToCommit判断是否可以提交数据,判断的流程如下:

  1. 获取提交数据的位置偏移量和写入数据的位置偏移量
  2. 如果最少提交页数大于0,计算本次写入的页数是否大于或等于最少提交页数
  3. 本次写入数据的页数计算方法:写入位置/页大小 - flush位置/页大小
  4. 如果以上条件都满足,判断写入位置是否大于flush位置,如果大于表示有一部数据未flush可以进行提交

满足提交条件后,就会调用commit0方法提交数据,将数据写入到fileChannel中:

public class MappedFile extends ReferenceResource {
    // 数据写入位置
    protected final AtomicInteger wrotePosition = new AtomicInteger(0);
    // 数据提交位置
    protected final AtomicInteger committedPosition = new AtomicInteger(0);
    // 数据flush位置
    private final AtomicInteger flushedPosition = new AtomicInteger(0);
  
    // 提交数据
    public int commit(final int commitLeastPages) {
        // 如果writeBuffer为空
        if (writeBuffer == null) {
            // 不需要提交任何数据到,返回之前记录的写入位置
            return this.wrotePosition.get();
        }
        // 如果可以提交数据
        if (this.isAbleToCommit(commitLeastPages)) {
            if (this.hold()) {
                // 提交数据
                commit0();
                this.release();
            } else {
                log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
            }
        }

        // All dirty data has been committed to FileChannel.
        if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
            this.transientStorePool.returnBuffer(writeBuffer);
            this.writeBuffer = null;
        }
        // 返回提交位置
        return this.committedPosition.get();
    }

    // 是否可以提交数据
    protected boolean isAbleToCommit(final int commitLeastPages) {
        // 获取提交数据的位置偏移量
        int flush = this.committedPosition.get();
        // 获取写入数据的位置偏移量
        int write = this.wrotePosition.get();

        if (this.isFull()) {
            return true;
        }
        // 如果最少提交页数大于0
        if (commitLeastPages > 0) {
            // 写入位置/页大小 - flush位置/页大小 是否大于至少提交的页数
            return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= commitLeastPages;
        }
        // 判断是否需要flush数据
        return write > flush;
    }
  
    protected void commit0() {
        // 获取写入位置
        int writePos = this.wrotePosition.get();
        // 获取上次提交的位置
        int lastCommittedPosition = this.committedPosition.get();

        if (writePos - lastCommittedPosition > 0) {
            try {
                // 创建共享缓冲区
                ByteBuffer byteBuffer = writeBuffer.slice();
                // 设置上一次提交位置
                byteBuffer.position(lastCommittedPosition);
                byteBuffer.limit(writePos);
                this.fileChannel.position(lastCommittedPosition);
                // 数据写入fileChannel
                this.fileChannel.write(byteBuffer);
                // 更新写入的位置
                this.committedPosition.set(writePos);
            } catch (Throwable e) {
                log.error("Error occurred when commit data to FileChannel.", e);
            }
        }
    }
}
折叠 

FlushRealTimeService

如果未开启暂存池,会直接使用FlushRealTimeService进行刷盘,当然如果开启暂存池,写入一批数据后,同样会使用FlushRealTimeService进行刷盘,FlushRealTimeService同样继承了FlushCommitLogService,是用于执行刷盘的线程,处理逻辑与提交刷盘数据逻辑相似,只不过不是提交数据,而是调用flush方法将提交的数据刷入磁盘:

  1. 从配置信息中获取flush间隔每次flush的最少页数两次flush的最大间隔时间
  2. 如果当前时间大于上次flush时间+两次flush的最大间隔时间,意味着已经有比较长的一段时间没有进行flush,此时将每次flush的最少页数设置为0不限制flush页数
  3. 调用waitForRunning等待被唤醒
  4. 如果被唤醒,调用mappedFileQueue的flush方法进行刷盘
class FlushRealTimeService extends FlushCommitLogService {
        private long lastFlushTimestamp = 0; // 上一次flush的时间
        private long printTimes = 0;

        public void run() {
            CommitLog.log.info(this.getServiceName() + " service started");
            // 如果未停止
            while (!this.isStopped()) {
                // 
                boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
                // 获取flush间隔
                int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
                // flush至少包含的页数
                int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
                // 两次flush的时间间隔
                int flushPhysicQueueThoroughInterval =
                    CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();

                boolean printFlushProgress = false;
                long currentTimeMillis = System.currentTimeMillis();
                // 如果当前毫秒数 大于上次flush时间 + 两次flush之间的间隔
                if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
                    this.lastFlushTimestamp = currentTimeMillis; // 更新flush时间
                    flushPhysicQueueLeastPages = 0; // flush至少包含的页数置为0
                    printFlushProgress = (printTimes++ % 10) == 0;
                }

                try {
                    // 
                    if (flushCommitLogTimed) {
                        // 睡眠
                        Thread.sleep(interval);
                    } else {
                        // 等待flush被唤醒
                        this.waitForRunning(interval);
                    }
                    if (printFlushProgress) {
                        // 打印刷盘进程
                        this.printFlushProgress();
                    }

                    long begin = System.currentTimeMillis();
                    // 进行刷盘
                    CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
                    long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                    if (storeTimestamp > 0) {
                        CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                    }
                    long past = System.currentTimeMillis() - begin;
                    if (past > 500) {
                        log.info("Flush data to disk costs {} ms", past);
                    }
                } catch (Throwable e) {
                    CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
                    this.printFlushProgress();
                }
            }

            // 如果服务停止,确保数据被刷盘
            boolean result = false;
            for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
                // 进行刷盘
                result = CommitLog.this.mappedFileQueue.flush(0);
                CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
            }
            this.printFlushProgress();
            CommitLog.log.info(this.getServiceName() + " service end");
        }

折叠 

刷盘

刷盘的方法在MappedFileQueue的flush方法中实现,处理逻辑如下:

  1. 根据 flush的位置偏移量获取映射文件
  2. 调用mappedFile的flush方法进行刷盘,并返回刷盘后的位置偏移量
  3. 计算最新的flush偏移量
  4. 更新flushedWhere的值为最新的flush偏移量
public class MappedFileQueue {
    protected long flushedWhere = 0; // flush的位置偏移量
    private long committedWhere = 0; // 提交的位置偏移量
  
    // flush刷盘
    public boolean flush(final int flushLeastPages) {
        boolean result = true;
        // 获取flush的位置偏移量映射文件
        MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
        if (mappedFile != null) {
            // 获取时间戳
            long tmpTimeStamp = mappedFile.getStoreTimestamp();
            // 调用MappedFile的flush方法进行刷盘,返回刷盘后的偏移量
            int offset = mappedFile.flush(flushLeastPages);
            // 计算最新的flush偏移量
            long where = mappedFile.getFileFromOffset() + offset;
            result = where == this.flushedWhere;
            // 更新flush偏移量
            this.flushedWhere = where;
            if (0 == flushLeastPages) {
                this.storeTimestamp = tmpTimeStamp;
            }
        }
        // 返回flush的偏移量
        return result;
    }
}

flush的逻辑也与commit方法的逻辑类似:

  1. 调用isAbleToFlush判断是否满足刷盘条件,获取上次flush位置偏移量和当前写入位置偏移量进行如下校验:
  2. 文件是否已写满,即文件大小是否与写入数据位置相等,如果相等说明文件已经写满需要执行刷盘,满足刷盘条件
  3. 如果最少flush页数大于0,计算本次flush的页数是否大于或等于最少flush页数,如果满足可以进行刷盘
  4. 本次flush数据的页数计算方法:写入位置/页大小 - flush位置/页大小
  5. 如果写入位置偏移量是否大于flush位置偏移量,如果大于表示有数据未进行刷盘,满足刷盘条件
  6. 调用fileChannel的force或者mappedByteBuffer的force方法进行刷盘
  7. 记录本次flush的位置,并作为结果返回
public class MappedFile extends ReferenceResource {
  
    protected final AtomicInteger wrotePosition = new AtomicInteger(0);
    protected final AtomicInteger committedPosition = new AtomicInteger(0);
    private final AtomicInteger flushedPosition = new AtomicInteger(0);
   
    /**
     * 进行刷盘并返回flush后的偏移量
     */
    public int flush(final int flushLeastPages) {
        // 是否可以刷盘
        if (this.isAbleToFlush(flushLeastPages)) {
            if (this.hold()) {
                int value = getReadPosition();
                try {
                    // 如果writeBuffer不为空
                    if (writeBuffer != null || this.fileChannel.position() != 0) {
                        // 将数据刷到硬盘
                        this.fileChannel.force(false);
                    } else {
                        this.mappedByteBuffer.force();
                    }
                } catch (Throwable e) {
                    log.error("Error occurred when force data to disk.", e);
                }
                // 记录flush位置
                this.flushedPosition.set(value);
                this.release();
            } else {
                log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
                this.flushedPosition.set(getReadPosition());
            }
        }
        // 返回flush位置
        return this.getFlushedPosition();
    }
    
    // 是否可以刷盘
    private boolean isAbleToFlush(final int flushLeastPages) {
        // 获取上次flush位置
        int flush = this.flushedPosition.get();
        // 写入位置偏移量
        int write = getReadPosition();
        if (this.isFull()) {
            return true;
        }
        // 如果flush的页数大于0,校验本次flush的页数是否满足条件
        if (flushLeastPages > 0) {
            // 本次flush的页数:写入位置偏移量/OS_PAGE_SIZE - 上次flush位置偏移量/OS_PAGE_SIZE,是否大于flushLeastPages
            return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages;
        } 
        // 写入位置偏移量是否大于flush位置偏移量
        return write > flush;
    }
    
    // 文件是否已写满
    public boolean isFull() {
        // 文件大小是否与写入数据位置相等
        return this.fileSize == this.wrotePosition.get();
    }
  
    /**
     * 返回当前有效数据的位置
     */
    public int getReadPosition() {
        // 如果writeBuffer为空使用写入位置,否则使用提交位置
        return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get();
    }
}
折叠 
「RocketMQ」消息的刷盘机制

 



Tags:RocketMQ   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,不构成投资建议。投资者据此操作,风险自担。如有任何标注错误或版权侵犯请与我们联系,我们将及时更正、删除。
▌相关推荐
大白话设计RocketMQ延迟消息
延迟消息一般用于:提前发送消息,延迟一段时间后才需要被处理的场景。比如:下单半小时后还未支付,则取消订单 释放库存 等。RocketMQ的延迟消息使用上非常便捷,但是不支持任意时间...【详细内容】
2023-12-27  Search: RocketMQ  点击:(110)  评论:(0)  加入收藏
九个问答牢记RocketMQ架构
RocketMQ是Java兄弟们常用的消息中间件,虽说常用,但对于RocketMQ架构经常忘记。究其原因就2点:忙于业务开发然后长时间不看则忘了、不理解架构设计的根本原因记不牢。本文用大...【详细内容】
2023-12-27  Search: RocketMQ  点击:(114)  评论:(0)  加入收藏
如何应对 RocketMQ 消息堆积
这篇文章,我们聊聊如何应对 RocketMQ 消息堆积。图片1 基础概念消费者在消费的过程中,消费的速度跟不上服务端的发送速度,未处理的消息会越来越多,消息出现堆积进而会造成消息消...【详细内容】
2023-12-21  Search: RocketMQ  点击:(73)  评论:(0)  加入收藏
解锁RocketMQ秘籍:如何保障消息顺序性?
嗨,小伙伴们!小米在这里啦!今天我们要聊的话题是社招面试中一个经典而又百思不得其解的问题&mdash;&mdash;“RocketMQ如何保证顺序性?”不用担心,小米来给你揭秘RocketMQ的秘密武...【详细内容】
2023-12-15  Search: RocketMQ  点击:(101)  评论:(0)  加入收藏
Apache RocketMQ 5.0腾讯云落地实践
Apache RocketMQ 发展历程回顾RocketMQ 最早诞生于淘宝的在线电商交易场景,经过了历年双十一大促流量洪峰的打磨,2016年捐献给 Apache 社区,成为 Apache 社区的顶级项目,并在国...【详细内容】
2023-12-13  Search: RocketMQ  点击:(147)  评论:(0)  加入收藏
聊聊 RocketMQ 5.0 的 POP 消费模式!
大家都知道,RocketMQ 消费模式有 PULL 模式和 PUSH 模式,不过本质上都是 PULL 模式,而在实际使用时,一般使用 PUSH 模式。不过,RocketMQ 的 PUSH 模式有明显的不足,主要体现在以下...【详细内容】
2023-05-16  Search: RocketMQ  点击:(305)  评论:(0)  加入收藏
深扒RocketMQ源码之后,我找出了RocketMQ消息重复消费的7种原因
在众多关于MQ的面试八股文中有这么一道题,“如何保证MQ消息消费的幂等性”。为什么需要保证幂等性呢?是因为消息会重复消费。为什么消息会重复消费?明明已经消费了,为什么消息会...【详细内容】
2023-04-13  Search: RocketMQ  点击:(240)  评论:(0)  加入收藏
SpringBoot整合RocketMQ,老鸟们都是这么玩的!
今天我们来讨论如何在项目开发中优雅地使用RocketMQ。本文分为三部分,第一部分实现SpringBoot与RocketMQ的整合,第二部分解决在使用RocketMQ过程中可能遇到的一些问题并解决...【详细内容】
2023-04-12  Search: RocketMQ  点击:(431)  评论:(0)  加入收藏
SpringBoot 与RabbitMQ、RocketMQ高可靠、高性能、分布式应用实践
Spring Boot 是一个基于 Spring 框架的快速开发框架,而 RabbitMQ 和 RocketMQ 则是常用的消息队列中间件。下面是它们常用的一些用法和场景。 订单处理在电商等系统中,下单后...【详细内容】
2023-03-09  Search: RocketMQ  点击:(206)  评论:(0)  加入收藏
通过源码分析RocketMQ主从复制原理
作者:京东物流 宫丙来一、主从复制概述 RocketMQ Broker的主从复制主要包括两部分内容:CommitLog的消息复制和Broker元数据的复制。 CommitLog的消息复制是发生在消息写入时,当...【详细内容】
2023-03-02  Search: RocketMQ  点击:(70)  评论:(0)  加入收藏
▌简易百科推荐
Meta如何将缓存一致性提高到99.99999999%
介绍缓存是一种强大的技术,广泛应用于计算机系统的各个方面,从硬件缓存到操作系统、网络浏览器,尤其是后端开发。对于Meta这样的公司来说,缓存尤为重要,因为它有助于减少延迟、扩...【详细内容】
2024-04-15    dbaplus社群  Tags:Meta   点击:(3)  评论:(0)  加入收藏
SELECT COUNT(*) 会造成全表扫描?回去等通知吧
前言SELECT COUNT(*)会不会导致全表扫描引起慢查询呢?SELECT COUNT(*) FROM SomeTable网上有一种说法,针对无 where_clause 的 COUNT(*),MySQL 是有优化的,优化器会选择成本最小...【详细内容】
2024-04-11  dbaplus社群    Tags:SELECT   点击:(3)  评论:(0)  加入收藏
10年架构师感悟:从问题出发,而非技术
这些感悟并非来自于具体的技术实现,而是关于我在架构设计和实施过程中所体会到的一些软性经验和领悟。我希望通过这些分享,能够激发大家对于架构设计和技术实践的思考,帮助大家...【详细内容】
2024-04-11  dbaplus社群    Tags:架构师   点击:(2)  评论:(0)  加入收藏
Netflix 是如何管理 2.38 亿会员的
作者 | Surabhi Diwan译者 | 明知山策划 | TinaNetflix 高级软件工程师 Surabhi Diwan 在 2023 年旧金山 QCon 大会上发表了题为管理 Netflix 的 2.38 亿会员 的演讲。她在...【详细内容】
2024-04-08    InfoQ  Tags:Netflix   点击:(5)  评论:(0)  加入收藏
即将过时的 5 种软件开发技能!
作者 | Eran Yahav编译 | 言征出品 | 51CTO技术栈(微信号:blog51cto) 时至今日,AI编码工具已经进化到足够强大了吗?这未必好回答,但从2023 年 Stack Overflow 上的调查数据来看,44%...【详细内容】
2024-04-03    51CTO  Tags:软件开发   点击:(9)  评论:(0)  加入收藏
跳转链接代码怎么写?
在网页开发中,跳转链接是一项常见的功能。然而,对于非技术人员来说,编写跳转链接代码可能会显得有些困难。不用担心!我们可以借助外链平台来简化操作,即使没有编程经验,也能轻松实...【详细内容】
2024-03-27  蓝色天纪    Tags:跳转链接   点击:(16)  评论:(0)  加入收藏
中台亡了,问题到底出在哪里?
曾几何时,中台一度被当做“变革灵药”,嫁接在“前台作战单元”和“后台资源部门”之间,实现企业各业务线的“打通”和全域业务能力集成,提高开发和服务效率。但在中台如火如荼之...【详细内容】
2024-03-27  dbaplus社群    Tags:中台   点击:(14)  评论:(0)  加入收藏
员工写了个比删库更可怕的Bug!
想必大家都听说过删库跑路吧,我之前一直把它当一个段子来看。可万万没想到,就在昨天,我们公司的某位员工,竟然写了一个比删库更可怕的 Bug!给大家分享一下(不是公开处刑),希望朋友们...【详细内容】
2024-03-26  dbaplus社群    Tags:Bug   点击:(9)  评论:(0)  加入收藏
我们一起聊聊什么是正向代理和反向代理
从字面意思上看,代理就是代替处理的意思,一个对象有能力代替另一个对象处理某一件事。代理,这个词在我们的日常生活中也不陌生,比如在购物、旅游等场景中,我们经常会委托别人代替...【详细内容】
2024-03-26  萤火架构  微信公众号  Tags:正向代理   点击:(15)  评论:(0)  加入收藏
看一遍就理解:IO模型详解
前言大家好,我是程序员田螺。今天我们一起来学习IO模型。在本文开始前呢,先问问大家几个问题哈~什么是IO呢?什么是阻塞非阻塞IO?什么是同步异步IO?什么是IO多路复用?select/epoll...【详细内容】
2024-03-26  捡田螺的小男孩  微信公众号  Tags:IO模型   点击:(10)  评论:(0)  加入收藏
站内最新
站内热门
站内头条