pos機(jī)獲取信息,「RocketMQ」消息的刷盤(pán)機(jī)制

 新聞資訊  |   2023-04-24 09:47  |  投稿人:pos機(jī)之家

網(wǎng)上有很多關(guān)于pos機(jī)獲取信息,「RocketMQ」消息的刷盤(pán)機(jī)制的知識(shí),也有很多人為大家解答關(guān)于pos機(jī)獲取信息的問(wèn)題,今天pos機(jī)之家(m.afbey.com)為大家整理了關(guān)于這方面的知識(shí),讓我們一起來(lái)看下吧!

本文目錄一覽:

1、pos機(jī)獲取信息

pos機(jī)獲取信息

刷盤(pán)策略

CommitLog的asyncPutMessage方法中可以看到在寫(xiě)入消息之后,調(diào)用了submitFlushrequest方法執(zhí)行刷盤(pán)策略:

public class CommitLog { public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) { // ... try { // 獲取上一次寫(xiě)入的文件 MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); // ... // 寫(xiě)入消息 result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext); // ... } finally { beginTimeInLock = 0; putMessagelock.unlock(); } // ... // 執(zhí)行刷盤(pán) CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg); // ... }}

刷盤(pán)有兩種策略:

同步刷盤(pán),表示消息寫(xiě)入到內(nèi)存之后需要立刻刷到磁盤(pán)文件中。同步刷盤(pán)會(huì)構(gòu)建GroupCommitRequest組提交請(qǐng)求并設(shè)置本次刷盤(pán)后的位置偏移量的值(寫(xiě)入位置偏移量+寫(xiě)入數(shù)據(jù)字節(jié)數(shù)),然后將請(qǐng)求添加到flushDiskWatcher和GroupCommitService中進(jìn)行刷盤(pán)。異步刷盤(pán),表示消息寫(xiě)入內(nèi)存成功之后就返回,由MQ定時(shí)將數(shù)據(jù)刷入到磁盤(pán)中,會(huì)有一定的數(shù)據(jù)丟失風(fēng)險(xiǎn)。

public class CommitLog { // 監(jiān)控刷盤(pán) private final FlushDiskWatcher flushDiskWatcher; public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) { // 是否是同步刷盤(pán) if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { // 獲取GroupCommitService final GroupCommitService service = (GroupCommitService) this.flushCommitLogService; // 是否等待 if (messageExt.isWaitStoreMsgOK()) { // 構(gòu)建組提交請(qǐng)求,傳入本次刷盤(pán)后位置的偏移量:寫(xiě)入位置偏移量+寫(xiě)入數(shù)據(jù)字節(jié)數(shù) GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(), this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); // 添加到wather中 flushDiskWatcher.add(request); // 添加到service service.putRequest(request); // 返回 return request.future(); } else { service.wakeup(); return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK); } } // 如果是異步刷盤(pán) else { if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { flushCommitLogService.wakeup(); } else { commitLogService.wakeup(); } return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK); } }}同步刷盤(pán)

如果使用的是同步刷盤(pán),首先獲取了GroupCommitService,然后構(gòu)建GroupCommitRequest組提交請(qǐng)求,將請(qǐng)求添加到flushDiskWatcher和GroupCommitService中,其中flushDiskWatcher用于監(jiān)控刷盤(pán)是否超時(shí),GroupCommitService用于提交刷盤(pán)數(shù)據(jù)。

構(gòu)建GroupCommitRequest提交請(qǐng)求

GroupCommitRequest是CommitLog的內(nèi)部類(lèi):

nextOffset:寫(xiě)入位置偏移量+寫(xiě)入數(shù)據(jù)字節(jié)數(shù),也就是本次刷盤(pán)成功后應(yīng)該對(duì)應(yīng)的flush偏移量flushOKFuture:刷盤(pán)結(jié)果deadLine:刷盤(pán)的限定時(shí)間,值為當(dāng)前時(shí)間 + 傳入的超時(shí)時(shí)間,超過(guò)限定時(shí)間還未刷盤(pán)完畢會(huì)被認(rèn)為超時(shí)

public class CommitLog { public static class GroupCommitRequest { private final long nextOffset; // 刷盤(pán)狀態(tài) private CompletableFuture<PutMessageStatus> flushOKFuture = new CompletableFuture<>(); private final long deadLine;// 刷盤(pán)的限定時(shí)間,超過(guò)限定時(shí)間還未刷盤(pán)完畢會(huì)被認(rèn)為超時(shí) public GroupCommitRequest(long nextOffset, long timeoutMillis) { this.nextOffset = nextOffset; // 設(shè)置限定時(shí)間:當(dāng)前時(shí)間 + 超時(shí)時(shí)間 this.deadLine = System.nanoTime() + (timeoutMillis * 1_000_000); } public void wakeupCustomer(final PutMessageStatus putMessageStatus) { // 結(jié)束刷盤(pán),設(shè)置刷盤(pán)狀態(tài) this.flushOKFuture.complete(putMessageStatus); } public CompletableFuture<PutMessageStatus> future() { // 返回刷盤(pán)狀態(tài) return flushOKFuture; } }}

GroupCommitService處理刷盤(pán)

GroupCommitService是CommitLog的內(nèi)部類(lèi),從繼承關(guān)系中可知它實(shí)現(xiàn)了Runnable接口,在run方法調(diào)用waitForRunning等待刷盤(pán)請(qǐng)求的提交,然后處理刷盤(pán),不過(guò)這個(gè)線(xiàn)程是在什么時(shí)候啟動(dòng)的呢?

public class CommitLog { /** * GroupCommit Service */ class GroupCommitService extends FlushCommitLogService { // ... // run方法 public void run() { CommitLog.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { // 等待刷盤(pán)請(qǐng)求的到來(lái) this.waitForRunning(10); // 處理刷盤(pán) this.doCommit(); } catch (exception e) { CommitLog.log.warn(this.getServiceName() + " service has exception. ", e); } } // ... } }}刷盤(pán)線(xiàn)程的啟動(dòng)

在BrokerController的啟動(dòng)方法中,可以看到調(diào)用了messageStore的start方法,前面可知使用的是DefaultMessageStore,進(jìn)入到DefaultMessageStore的start方法,它又調(diào)用了commitLog的start方法,在CommitLog的start方法中,啟動(dòng)了刷盤(pán)的線(xiàn)程和監(jiān)控刷盤(pán)的線(xiàn)程:

public class BrokerController { public void start() throws Exception { if (this.messageStore != null) { // 啟動(dòng) this.messageStore.start(); } // ... }}public class DefaultMessageStore implements MessageStore { /** * @throws Exception */ public void start() throws Exception { // ... this.flushConsumeQueueService.start(); // 調(diào)用CommitLog的啟動(dòng)方法 this.commitLog.start(); this.storeStatsService.start(); // ... }}public class CommitLog { private final FlushCommitLogService flushCommitLogService; // 刷盤(pán) private final FlushDiskWatcher flushDiskWatcher; // 監(jiān)控刷盤(pán) private final FlushCommitLogService commitLogService; // commitLogService public void start() { // 啟動(dòng)刷盤(pán)的線(xiàn)程 this.flushCommitLogService.start(); flushDiskWatcher.setDaemon(true); // 啟動(dòng)監(jiān)控刷盤(pán)的線(xiàn)程 flushDiskWatcher.start(); if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { this.commitLogService.start(); } }}刷盤(pán)請(qǐng)求的處理

既然知道了線(xiàn)程在何時(shí)啟動(dòng)的,接下來(lái)詳細(xì)看一下GroupCommitService是如何處理刷盤(pán)提交請(qǐng)求的。

前面知道在GroupCommitService的run方法中,調(diào)用了waitForRunning方法等待刷盤(pán)請(qǐng)求,waitForRunning在GroupCommitService父類(lèi)ServiceThread中實(shí)現(xiàn)。ServiceThread是一個(gè)抽象類(lèi),實(shí)現(xiàn)了Runnable接口,里面使用了CountDownLatch進(jìn)行線(xiàn)程間的通信,大小設(shè)為1。

waitForRunning方法在進(jìn)入的時(shí)候先判斷hasNotified是否為true(已通知),并嘗試將其更新為false(未通知),由于hasNotified的初始化值為false,所以首次進(jìn)入的時(shí)候條件不成立,不會(huì)進(jìn)入到這個(gè)處理邏輯,會(huì)繼續(xù)執(zhí)行后面的代碼。

接著調(diào)用 waitPoint的reset方法將其重置為1,并調(diào)用waitPoint的await方法進(jìn)行等待:

// ServiceThreadpublic abstract class ServiceThread implements Runnable { // 是否通知,初始化為false protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false); // CountDownLatch用于線(xiàn)程間的通信 protected final CountDownLatch2 waitPoint = new CountDownLatch2(1); // 等待運(yùn)行 protected void waitForRunning(long interval) { // 判斷hasNotified是否為true,并嘗試將其更新為false if (hasNotified.compareAndSet(true, false)) { // 調(diào)用onWaitEnd this.onWaitEnd(); return; } // 重置waitPoint的值,也就是值為1 waitPoint.reset(); try { // 會(huì)一直等待waitPoint值降為0 waitPoint.await(interval, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { log.error("Interrupted", e); } finally { // 是否被通知設(shè)置為false hasNotified.set(false); this.onWaitEnd(); } }}

一、添加刷盤(pán)請(qǐng)求,喚醒刷盤(pán)線(xiàn)程

上面可知需要刷盤(pán)的時(shí)候調(diào)用了GroupCommitService的putRequest方法添加刷盤(pán)請(qǐng)求,在putRequest方法中,將刷盤(pán)請(qǐng)求GroupCommitRequest添加到了requestsWrite組提交寫(xiě)請(qǐng)求鏈表中,然后調(diào)用wakeup方法喚醒刷盤(pán)線(xiàn)程,wakeup方法在它的父類(lèi)ServiceThread中實(shí)現(xiàn)。

在wakeup方法中可以看到,首先將hasNotified更改為了true表示處于已通知狀態(tài),然后調(diào)用了countDown方法,此時(shí)waitPoint值變成0,就會(huì)喚醒之前waitForRunning方法中一直在等待的線(xiàn)程。

public class CommitLog { /** * 組提交Service */ class GroupCommitService extends FlushCommitLogService { // 組提交寫(xiě)請(qǐng)求鏈表 private volatile LinkedList<GroupCommitRequest> requestsWrite = new LinkedList<GroupCommitRequest>(); // ... // 添加提交請(qǐng)求 public synchronized void putRequest(final GroupCommitRequest request) { // 加鎖 lock.lock(); try { // 加入到寫(xiě)請(qǐng)求鏈表 this.requestsWrite.add(request); } finally { lock.unlock(); } // 喚醒線(xiàn)程執(zhí)行提交任務(wù) this.wakeup(); } // ... } }// ServiceThreadpublic abstract class ServiceThread implements Runnable { // CountDownLatch用于線(xiàn)程間的通信 protected final CountDownLatch2 waitPoint = new CountDownLatch2(1); // 喚醒刷盤(pán)線(xiàn)程 public void wakeup() { // 更改狀態(tài)為已通知狀態(tài) if (hasNotified.compareAndSet(false, true)) { // waitPoint的值減1,由于大小設(shè)置為1,減1之后變?yōu)?,會(huì)喚醒等待的線(xiàn)程 waitPoint.countDown(); } } // ...}

二、線(xiàn)程被喚醒,執(zhí)行刷盤(pán)前的操作

waitForRunning方法中的await方法一直在等待countdown的值變?yōu)?,當(dāng)上一步調(diào)用了wakeup后,就會(huì)喚醒該線(xiàn)程,然后開(kāi)始往下執(zhí)行,在finally中可以看到將是否被通知hasNotified又設(shè)置為了false,然后調(diào)用了onWaitEnd方法,GroupCommitService方法中重寫(xiě)了該方法,里面又調(diào)用了swapRequests方法將讀寫(xiě)請(qǐng)求列表的數(shù)據(jù)進(jìn)行了交換,putRequest方法中將提交的刷盤(pán)請(qǐng)求放在了寫(xiě)鏈表中,經(jīng)過(guò)交換,數(shù)據(jù)會(huì)被放在讀鏈表中,后續(xù)進(jìn)行刷盤(pán)時(shí)會(huì)從讀鏈表中獲取請(qǐng)求進(jìn)行處理

// ServiceThreadpublic abstract class ServiceThread implements Runnable { // CountDownLatch protected final CountDownLatch2 waitPoint = new CountDownLatch2(1); // 等待運(yùn)行 protected void waitForRunning(long interval) { if (hasNotified.compareAndSet(true, false)) { // 交換 this.onWaitEnd(); return; } // 重置 waitPoint.reset(); try { // 會(huì)一直等待countdown為0 waitPoint.await(interval, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { log.error("Interrupted", e); } finally { // 是否被通知設(shè)置為false hasNotified.set(false); this.onWaitEnd(); } }}public class CommitLog { /** * 組提交Service */ class GroupCommitService extends FlushCommitLogService { // 組提交寫(xiě)請(qǐng)求鏈表 private volatile LinkedList<GroupCommitRequest> requestsWrite = new LinkedList<GroupCommitRequest>(); // 組提交讀請(qǐng)求鏈表 private volatile LinkedList<GroupCommitRequest> requestsRead = new LinkedList<GroupCommitRequest>(); @Override protected void onWaitEnd() { // 交換讀寫(xiě)請(qǐng)求列表的數(shù)據(jù)請(qǐng)求 this.swapRequests(); } private void swapRequests() { // 加鎖 lock.lock(); try { // 將讀寫(xiě)請(qǐng)求鏈表的數(shù)據(jù)進(jìn)行交換 LinkedList<GroupCommitRequest> tmp = this.requestsWrite; this.requestsWrite = this.requestsRead; this.requestsRead = tmp; } finally { lock.unlock(); } } // ... }}折疊

這里使用讀寫(xiě)鏈表進(jìn)行交換應(yīng)該是為了提升性能,如果只使用一個(gè)鏈表,在提交請(qǐng)求的時(shí)候需要往鏈表中添加請(qǐng)求,此時(shí)需要加鎖,而刷盤(pán)線(xiàn)程在處理完請(qǐng)求之后是需要從鏈表中移除請(qǐng)求的,假設(shè)添加請(qǐng)求時(shí)加的鎖還未釋放,刷盤(pán)線(xiàn)程就要一直等待,而添加和處理完全可以同時(shí)進(jìn)行,所以使用了兩個(gè)鏈表,在添加請(qǐng)求的時(shí)候使用寫(xiě)鏈表,處理請(qǐng)求的時(shí)候?qū)ψx寫(xiě)鏈表的數(shù)據(jù)進(jìn)行交換使用讀鏈表,這樣只需在交換數(shù)據(jù)的時(shí)候加鎖,以此來(lái)提升性能。

三、執(zhí)行刷盤(pán)

waitForRunning執(zhí)行完畢后,會(huì)回到GroupCommitService中的run方法開(kāi)始繼續(xù)往后執(zhí)行代碼,從代碼中可以看到接下來(lái)會(huì)調(diào)用doCommit方法執(zhí)行刷盤(pán)。

doCommit方法中對(duì)讀鏈表中的數(shù)據(jù)進(jìn)行了判空,如果不為空,進(jìn)行遍歷處理每一個(gè)提交請(qǐng)求,處理邏輯如下:

獲取CommitLog映射文件記錄的刷盤(pán)位置偏移量flushedWhere,判斷是否大于請(qǐng)求設(shè)定的刷盤(pán)位置偏移量nextOffset,正常情況下flush的位置應(yīng)該小于本次刷入數(shù)據(jù)后的偏移量,所以如果flush位置大于等于本次請(qǐng)求設(shè)置的flush偏移量,本次將不能進(jìn)行刷盤(pán)開(kāi)啟一個(gè)循環(huán),調(diào)用mappedFileQueue的flush方法執(zhí)行刷盤(pán)(具體的實(shí)現(xiàn)在異步刷盤(pán)的時(shí)候再看),由于CommitLog大小為1G,所以本次刷完之后,如果當(dāng)前已經(jīng)刷入的偏移量小于請(qǐng)求設(shè)定的位置,表示數(shù)據(jù)未刷完,需要繼續(xù)刷,反之表示數(shù)據(jù)已經(jīng)刷完,flushOK為true,for循環(huán)條件不滿(mǎn)足結(jié)束執(zhí)行。請(qǐng)求處理之后會(huì)清空讀鏈表。

public class CommitLog { /** * 組提交Service */ class GroupCommitService extends FlushCommitLogService { // 運(yùn)行 public void run() { CommitLog.log.info(this.getServiceName() + " service started"); // 如果沒(méi)有停止 while (!this.isStopped()) { try { // 等待喚醒刷盤(pán)線(xiàn)程 this.waitForRunning(10); // 進(jìn)行提交 this.doCommit(); } catch (Exception e) { CommitLog.log.warn(this.getServiceName() + " service has exception. ", e); } } // 睡眠10毫秒 try { Thread.sleep(10); } catch (InterruptedException e) { CommitLog.log.warn(this.getServiceName() + " Exception, ", e); } synchronized (this) { this.swapRequests(); } // 停止之前提交一次 this.doCommit(); CommitLog.log.info(this.getServiceName() + " service end"); } // 提交刷盤(pán) private void doCommit() { // 如果不為空 if (!this.requestsRead.isEmpty()) { // 遍歷刷盤(pán)請(qǐng)求 for (GroupCommitRequest req : this.requestsRead) { // 獲取映射文件的flush位置,判斷是否大于請(qǐng)求設(shè)定的刷盤(pán)位置 boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); for (int i = 0; i < 2 && !flushOK; i++) { // 進(jìn)行刷盤(pán) CommitLog.this.mappedFileQueue.flush(0); // 由于CommitLog大小為1G,所以本次刷完之后,如果當(dāng)前已經(jīng)刷入的偏移量小于請(qǐng)求設(shè)定的位置,表示數(shù)據(jù)未刷完,需要繼續(xù)刷,反之表示數(shù)據(jù)已經(jīng)刷完,flushOK為true,for循環(huán)條件不滿(mǎn)足結(jié)束執(zhí)行 flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); } // 設(shè)置刷盤(pán)結(jié)果 req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT); } long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); if (storeTimestamp > 0) { CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); } // 請(qǐng)求處理完之后清空鏈表 this.requestsRead = new LinkedList<>(); } else { // Because of individual messages is set to not sync flush, it // will come to this process CommitLog.this.mappedFileQueue.flush(0); } } } }折疊 刷盤(pán)超時(shí)監(jiān)控

FlushDiskWatcher用于監(jiān)控刷盤(pán)請(qǐng)求的耗時(shí),它也繼承了ServiceThread,在Broker啟動(dòng)時(shí)開(kāi)啟了該線(xiàn)程,在run方法中,使用while循環(huán),只要服務(wù)未停止,會(huì)一直從阻塞隊(duì)列中獲取提交的刷盤(pán)請(qǐng)求,開(kāi)啟while循環(huán)隔一段時(shí)間判斷一下刷盤(pán)是否完成,如果未完成,會(huì)做如下判斷:

使用當(dāng)前時(shí)間減去請(qǐng)求設(shè)置的刷盤(pán)截止時(shí)間,如果已經(jīng)超過(guò)截止時(shí)間,說(shuō)明刷盤(pán)時(shí)間已經(jīng)超時(shí),調(diào)用wakeupCustomer方法設(shè)置刷盤(pán)結(jié)果為已超時(shí)如果未超時(shí),為了避免當(dāng)前線(xiàn)程頻繁的進(jìn)行判斷,將當(dāng)前線(xiàn)程睡眠一會(huì)兒,睡眠的計(jì)算方式是使用刷盤(pán)請(qǐng)求設(shè)置的截止時(shí)間 - 當(dāng)前時(shí)間,表示剩余的時(shí)間,然后除以1000000化為毫秒,得到距離刷盤(pán)截止時(shí)間的毫秒數(shù)sleepTime:sleepTime如果為0,只能是當(dāng)前時(shí)間等于截止時(shí)間,也就是到了截止時(shí)間,此時(shí)同樣調(diào)用wakeupCustomer方法設(shè)置刷盤(pán)結(jié)果為已超時(shí)sleepTime不為0,在10毫秒和sleepTime的值之間取較小的那個(gè)作為睡眠的毫秒數(shù)將當(dāng)前線(xiàn)程睡眠,等待刷盤(pán)任務(wù)執(zhí)行

public class FlushDiskWatcher extends ServiceThread { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); // 阻塞隊(duì)列,存放提交請(qǐng)求 private final LinkedBlockingQueue<GroupCommitRequest> commitRequests = new LinkedBlockingQueue<>(); @Override public String getServiceName() { return FlushDiskWatcher.class.getSimpleName(); } @Override public void run() { // 如果未停止 while (!isStopped()) { GroupCommitRequest request = null; try { // 從阻塞隊(duì)列中獲取提交請(qǐng)求 request = commitRequests.take(); } catch (InterruptedException e) { log.warn("take flush disk commit request, but interrupted, this may caused by shutdown"); continue; } // 如果還未完成 while (!request.future().isDone()) { long now = System.nanoTime(); // 如果已經(jīng)超時(shí) if (now - request.getDeadLine() >= 0) { // 設(shè)置刷盤(pán)結(jié)果為超時(shí) request.wakeupCustomer(PutMessageStatus.FLUSH_DISK_TIMEOUT); break; } // 避免頻繁的判斷,使用(截止時(shí)間 - 當(dāng)前時(shí)間)/1000000 計(jì)算一個(gè)毫秒數(shù) long sleepTime = (request.getDeadLine() - now) / 1_000_000; // 在計(jì)算的毫秒數(shù)與10之間取最小的 sleepTime = Math.min(10, sleepTime); // 如果sleepTime為0表示已經(jīng)到了截止時(shí)間 if (sleepTime == 0) { // 設(shè)置刷盤(pán)結(jié)果為超時(shí) request.wakeupCustomer(PutMessageStatus.FLUSH_DISK_TIMEOUT); break; } try { // 睡眠等待刷盤(pán)任務(wù)的執(zhí)行 Thread.sleep(sleepTime); } catch (InterruptedException e) { log.warn( "An exception occurred while waiting for flushing disk to complete. this may caused by shutdown"); break; } } } }}折疊 異步刷盤(pán)

上面講解了同步刷盤(pán),接下來(lái)去看下異步刷盤(pán),首先會(huì)判斷是否使用了暫存池,如果未開(kāi)啟調(diào)用flushCommitLogService的wakeup喚醒刷盤(pán)線(xiàn)程,否則使用commitLogService先將數(shù)據(jù)寫(xiě)入到FileChannel,然后統(tǒng)一進(jìn)行刷盤(pán):

public class CommitLog { private final FlushDiskWatcher flushDiskWatcher; public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) { // 是否是同步刷盤(pán) if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { // ... } // 如果是異步刷盤(pán) else { // 如果未使用暫存池 if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { // 喚醒刷盤(pán)線(xiàn)程進(jìn)行刷盤(pán) flushCommitLogService.wakeup(); } else { // 如果使用暫存池,使用commitLogService,先將數(shù)據(jù)寫(xiě)入到FILECHANNEL,然后統(tǒng)一進(jìn)行刷盤(pán) commitLogService.wakeup(); } // 返回結(jié)果 return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK); } }}

在CommitLog的構(gòu)造函數(shù)中可以看到,commitLogService使用的是CommitRealTimeService進(jìn)行實(shí)例化的,flushCommitLogService需要根據(jù)設(shè)置決定使用哪種類(lèi)型進(jìn)行實(shí)例化:

如果是同步刷盤(pán),使用GroupCommitService,由前面的同步刷盤(pán)可知,使用的就是GroupCommitService進(jìn)行刷盤(pán)的。如果是異步刷盤(pán),使用FlushRealTimeService。

所以接下來(lái)需要關(guān)注CommitRealTimeService和FlushRealTimeService:

public class CommitLog { private final FlushCommitLogService flushCommitLogService; // 刷盤(pán)Service private final FlushCommitLogService commitLogService; public CommitLog(final DefaultMessageStore defaultMessageStore) { // 如果設(shè)置的同步刷盤(pán) if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { // 使用GroupCommitService this.flushCommitLogService = new GroupCommitService(); } else { // 使用FlushRealTimeService this.flushCommitLogService = new FlushRealTimeService(); } // commitLogService this.commitLogService = new CommitRealTimeService(); }}

CommitRealTimeService

在開(kāi)啟暫存池時(shí),會(huì)使用CommitRealTimeService,它繼承了FlushCommitLogService,所以會(huì)實(shí)現(xiàn)run方法,處理邏輯如下:

從配置信息中獲取提交間隔、每次提交的最少頁(yè)數(shù)兩次提交的最大間隔時(shí)間如果當(dāng)前時(shí)間大于上次提交時(shí)間+兩次提交的最大間隔時(shí)間,意味著已經(jīng)有比較長(zhǎng)的一段時(shí)間沒(méi)有進(jìn)行提交了,需要盡快刷盤(pán),此時(shí)將每次提交的最少頁(yè)數(shù)設(shè)置為0不限制提交頁(yè)數(shù)調(diào)用mappedFileQueue的commit方法進(jìn)行提交,并返回提交的結(jié)果:如果結(jié)果為true表示未提交任何數(shù)據(jù)如果結(jié)果為false表示進(jìn)行了數(shù)據(jù)提交,需要等待刷盤(pán)判斷提交返回結(jié)果是否返回false,如果是調(diào)用flushCommitLogService的wakeup方法喚醒刷盤(pán)線(xiàn)程,進(jìn)行刷盤(pán)調(diào)用waitForRunning等待下一次提交處理

class CommitRealTimeService extends FlushCommitLogService { // 上次提交時(shí)間戳 private long lastCommitTimestamp = 0; @Override public void run() { CommitLog.log.info(this.getServiceName() + " service started"); // 如果未停止 while (!this.isStopped()) { // 獲取提交間隔 int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog(); // 一次提交的最少頁(yè)數(shù) int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages(); // 兩次提交的最大間隔時(shí)間 int commitDataThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval(); // 開(kāi)始時(shí)間 long begin = System.currentTimeMillis(); // 如果當(dāng)前時(shí)間大于上次提交時(shí)間+提交的最大間隔時(shí)間 if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) { this.lastCommitTimestamp = begin; // 提交時(shí)間 commitDataLeastPages = 0;// 最少提交頁(yè)數(shù)設(shè)為0,表示不限制提交頁(yè)數(shù) } try { // 提交 boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages); // 提交結(jié)束時(shí)間 long end = System.currentTimeMillis(); // 如果返回false表示提交了一部分?jǐn)?shù)據(jù)但是還未進(jìn)行刷盤(pán) if (!result) { // 再次更新提交時(shí)間戳 this.lastCommitTimestamp = end; // 喚醒flush線(xiàn)程進(jìn)行刷盤(pán) flushCommitLogService.wakeup(); } if (end - begin > 500) { log.info("Commit data to file costs {} ms", end - begin); } // 等待下一次提交 this.waitForRunning(interval); } catch (Throwable e) { CommitLog.log.error(this.getServiceName() + " service has exception. ", e); } } boolean result = false; for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) { result = CommitLog.this.mappedFileQueue.commit(0); CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK")); } CommitLog.log.info(this.getServiceName() + " service end"); } }折疊 提交

提交的方法在MappedFileQueue的commit方法中實(shí)現(xiàn),處理邏輯如下:

根據(jù)記錄的CommitLog文件提交位置的偏移量獲取映射文件,如果獲取不為空,調(diào)用MappedFile的commit方法進(jìn)行提交,然后返回本次提交數(shù)據(jù)的偏移量記錄本次提交的偏移量:文件的偏移量 + 提交數(shù)據(jù)的偏移量判斷本次提交的偏移量是否等于上一次的提交偏移量,如果等于表示本次未提交任何數(shù)據(jù),返回結(jié)果置為true,否則表示提交了數(shù)據(jù),等待刷盤(pán),返回結(jié)果為false更新上一次提交偏移量committedWhere的值為本次的提交偏移量的值

public class MappedFileQueue { protected long flushedWhere = 0; // flush的位置偏移量 private long committedWhere = 0; // 提交的位置偏移量 public boolean commit(final int commitLeastPages) { boolean result = true; // 根據(jù)提交位置的偏移量獲取映射文件 MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0); if (mappedFile != null) { // 調(diào)用mappedFile的commit方法進(jìn)行提交,返回提交數(shù)據(jù)的偏移量 int offset = mappedFile.commit(commitLeastPages); // 記錄本次提交的偏移量:文件的偏移量 + 提交數(shù)據(jù)的偏移量 long where = mappedFile.getFileFromOffset() + offset; // 設(shè)置返回結(jié)果,如果本次提交偏移量等于上一次的提交偏移量為true,表示什么也沒(méi)干,否則表示提交了數(shù)據(jù),等待刷盤(pán) result = where == this.committedWhere; // 更新上一次提交偏移量的值為本次的 this.committedWhere = where; } return result; }}

MappedFile

MappedFile中記錄CommitLog的寫(xiě)入位置wrotePosition、提交位置committedPosition以及flush位置flushedPosition,在commit方法中,調(diào)用了isAbleToCommit判斷是否可以提交數(shù)據(jù),判斷的流程如下:

獲取提交數(shù)據(jù)的位置偏移量和寫(xiě)入數(shù)據(jù)的位置偏移量如果最少提交頁(yè)數(shù)大于0,計(jì)算本次寫(xiě)入的頁(yè)數(shù)是否大于或等于最少提交頁(yè)數(shù)本次寫(xiě)入數(shù)據(jù)的頁(yè)數(shù)計(jì)算方法:寫(xiě)入位置/頁(yè)大小 - flush位置/頁(yè)大小如果以上條件都滿(mǎn)足,判斷寫(xiě)入位置是否大于flush位置,如果大于表示有一部數(shù)據(jù)未flush可以進(jìn)行提交

滿(mǎn)足提交條件后,就會(huì)調(diào)用commit0方法提交數(shù)據(jù),將數(shù)據(jù)寫(xiě)入到fileChannel中:

public class MappedFile extends ReferenceResource { // 數(shù)據(jù)寫(xiě)入位置 protected final AtomicInteger wrotePosition = new AtomicInteger(0); // 數(shù)據(jù)提交位置 protected final AtomicInteger committedPosition = new AtomicInteger(0); // 數(shù)據(jù)flush位置 private final AtomicInteger flushedPosition = new AtomicInteger(0); // 提交數(shù)據(jù) public int commit(final int commitLeastPages) { // 如果writeBuffer為空 if (writeBuffer == null) { // 不需要提交任何數(shù)據(jù)到,返回之前記錄的寫(xiě)入位置 return this.wrotePosition.get(); } // 如果可以提交數(shù)據(jù) if (this.isAbleToCommit(commitLeastPages)) { if (this.hold()) { // 提交數(shù)據(jù) commit0(); this.release(); } else { log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get()); } } // All dirty data has been committed to FileChannel. if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) { this.transientStorePool.returnBuffer(writeBuffer); this.writeBuffer = null; } // 返回提交位置 return this.committedPosition.get(); } // 是否可以提交數(shù)據(jù) protected boolean isAbleToCommit(final int commitLeastPages) { // 獲取提交數(shù)據(jù)的位置偏移量 int flush = this.committedPosition.get(); // 獲取寫(xiě)入數(shù)據(jù)的位置偏移量 int write = this.wrotePosition.get(); if (this.isFull()) { return true; } // 如果最少提交頁(yè)數(shù)大于0 if (commitLeastPages > 0) { // 寫(xiě)入位置/頁(yè)大小 - flush位置/頁(yè)大小 是否大于至少提交的頁(yè)數(shù) return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= commitLeastPages; } // 判斷是否需要flush數(shù)據(jù) return write > flush; } protected void commit0() { // 獲取寫(xiě)入位置 int writePos = this.wrotePosition.get(); // 獲取上次提交的位置 int lastCommittedPosition = this.committedPosition.get(); if (writePos - lastCommittedPosition > 0) { try { // 創(chuàng)建共享緩沖區(qū) ByteBuffer byteBuffer = writeBuffer.slice(); // 設(shè)置上一次提交位置 byteBuffer.position(lastCommittedPosition); byteBuffer.limit(writePos); this.fileChannel.position(lastCommittedPosition); // 數(shù)據(jù)寫(xiě)入fileChannel this.fileChannel.write(byteBuffer); // 更新寫(xiě)入的位置 this.committedPosition.set(writePos); } catch (Throwable e) { log.error("Error occurred when commit data to FileChannel.", e); } } }}折疊

FlushRealTimeService

如果未開(kāi)啟暫存池,會(huì)直接使用FlushRealTimeService進(jìn)行刷盤(pán),當(dāng)然如果開(kāi)啟暫存池,寫(xiě)入一批數(shù)據(jù)后,同樣會(huì)使用FlushRealTimeService進(jìn)行刷盤(pán),F(xiàn)lushRealTimeService同樣繼承了FlushCommitLogService,是用于執(zhí)行刷盤(pán)的線(xiàn)程,處理邏輯與提交刷盤(pán)數(shù)據(jù)邏輯相似,只不過(guò)不是提交數(shù)據(jù),而是調(diào)用flush方法將提交的數(shù)據(jù)刷入磁盤(pán):

從配置信息中獲取flush間隔每次flush的最少頁(yè)數(shù)兩次flush的最大間隔時(shí)間如果當(dāng)前時(shí)間大于上次flush時(shí)間+兩次flush的最大間隔時(shí)間,意味著已經(jīng)有比較長(zhǎng)的一段時(shí)間沒(méi)有進(jìn)行flush,此時(shí)將每次flush的最少頁(yè)數(shù)設(shè)置為0不限制flush頁(yè)數(shù)調(diào)用waitForRunning等待被喚醒如果被喚醒,調(diào)用mappedFileQueue的flush方法進(jìn)行刷盤(pán)

class FlushRealTimeService extends FlushCommitLogService { private long lastFlushTimestamp = 0; // 上一次flush的時(shí)間 private long printTimes = 0; public void run() { CommitLog.log.info(this.getServiceName() + " service started"); // 如果未停止 while (!this.isStopped()) { // boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed(); // 獲取flush間隔 int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog(); // flush至少包含的頁(yè)數(shù) int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages(); // 兩次flush的時(shí)間間隔 int flushPhysicQueueThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval(); boolean printFlushProgress = false; long currentTimeMillis = System.currentTimeMillis(); // 如果當(dāng)前毫秒數(shù) 大于上次flush時(shí)間 + 兩次flush之間的間隔 if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) { this.lastFlushTimestamp = currentTimeMillis; // 更新flush時(shí)間 flushPhysicQueueLeastPages = 0; // flush至少包含的頁(yè)數(shù)置為0 printFlushProgress = (printTimes++ % 10) == 0; } try { // if (flushCommitLogTimed) { // 睡眠 Thread.sleep(interval); } else { // 等待flush被喚醒 this.waitForRunning(interval); } if (printFlushProgress) { // 打印刷盤(pán)進(jìn)程 this.printFlushProgress(); } long begin = System.currentTimeMillis(); // 進(jìn)行刷盤(pán) CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages); long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); if (storeTimestamp > 0) { CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); } long past = System.currentTimeMillis() - begin; if (past > 500) { log.info("Flush data to disk costs {} ms", past); } } catch (Throwable e) { CommitLog.log.warn(this.getServiceName() + " service has exception. ", e); this.printFlushProgress(); } } // 如果服務(wù)停止,確保數(shù)據(jù)被刷盤(pán) boolean result = false; for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) { // 進(jìn)行刷盤(pán) result = CommitLog.this.mappedFileQueue.flush(0); CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK")); } this.printFlushProgress(); CommitLog.log.info(this.getServiceName() + " service end"); }折疊 刷盤(pán)

刷盤(pán)的方法在MappedFileQueue的flush方法中實(shí)現(xiàn),處理邏輯如下:

根據(jù) flush的位置偏移量獲取映射文件調(diào)用mappedFile的flush方法進(jìn)行刷盤(pán),并返回刷盤(pán)后的位置偏移量計(jì)算最新的flush偏移量更新flushedWhere的值為最新的flush偏移量

public class MappedFileQueue { protected long flushedWhere = 0; // flush的位置偏移量 private long committedWhere = 0; // 提交的位置偏移量 // flush刷盤(pán) public boolean flush(final int flushLeastPages) { boolean result = true; // 獲取flush的位置偏移量映射文件 MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0); if (mappedFile != null) { // 獲取時(shí)間戳 long tmpTimeStamp = mappedFile.getStoreTimestamp(); // 調(diào)用MappedFile的flush方法進(jìn)行刷盤(pán),返回刷盤(pán)后的偏移量 int offset = mappedFile.flush(flushLeastPages); // 計(jì)算最新的flush偏移量 long where = mappedFile.getFileFromOffset() + offset; result = where == this.flushedWhere; // 更新flush偏移量 this.flushedWhere = where; if (0 == flushLeastPages) { this.storeTimestamp = tmpTimeStamp; } } // 返回flush的偏移量 return result; }}

flush的邏輯也與commit方法的邏輯類(lèi)似:

調(diào)用isAbleToFlush判斷是否滿(mǎn)足刷盤(pán)條件,獲取上次flush位置偏移量和當(dāng)前寫(xiě)入位置偏移量進(jìn)行如下校驗(yàn):文件是否已寫(xiě)滿(mǎn),即文件大小是否與寫(xiě)入數(shù)據(jù)位置相等,如果相等說(shuō)明文件已經(jīng)寫(xiě)滿(mǎn)需要執(zhí)行刷盤(pán),滿(mǎn)足刷盤(pán)條件如果最少flush頁(yè)數(shù)大于0,計(jì)算本次flush的頁(yè)數(shù)是否大于或等于最少flush頁(yè)數(shù),如果滿(mǎn)足可以進(jìn)行刷盤(pán)本次flush數(shù)據(jù)的頁(yè)數(shù)計(jì)算方法:寫(xiě)入位置/頁(yè)大小 - flush位置/頁(yè)大小如果寫(xiě)入位置偏移量是否大于flush位置偏移量,如果大于表示有數(shù)據(jù)未進(jìn)行刷盤(pán),滿(mǎn)足刷盤(pán)條件調(diào)用fileChannel的force或者mappedByteBuffer的force方法進(jìn)行刷盤(pán)記錄本次flush的位置,并作為結(jié)果返回

public class MappedFile extends ReferenceResource { protected final AtomicInteger wrotePosition = new AtomicInteger(0); protected final AtomicInteger committedPosition = new AtomicInteger(0); private final AtomicInteger flushedPosition = new AtomicInteger(0); /** * 進(jìn)行刷盤(pán)并返回flush后的偏移量 */ public int flush(final int flushLeastPages) { // 是否可以刷盤(pán) if (this.isAbleToFlush(flushLeastPages)) { if (this.hold()) { int value = getReadPosition(); try { // 如果writeBuffer不為空 if (writeBuffer != null || this.fileChannel.position() != 0) { // 將數(shù)據(jù)刷到硬盤(pán) this.fileChannel.force(false); } else { this.mappedByteBuffer.force(); } } catch (Throwable e) { log.error("Error occurred when force data to disk.", e); } // 記錄flush位置 this.flushedPosition.set(value); this.release(); } else { log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get()); this.flushedPosition.set(getReadPosition()); } } // 返回flush位置 return this.getFlushedPosition(); } // 是否可以刷盤(pán) private boolean isAbleToFlush(final int flushLeastPages) { // 獲取上次flush位置 int flush = this.flushedPosition.get(); // 寫(xiě)入位置偏移量 int write = getReadPosition(); if (this.isFull()) { return true; } // 如果flush的頁(yè)數(shù)大于0,校驗(yàn)本次flush的頁(yè)數(shù)是否滿(mǎn)足條件 if (flushLeastPages > 0) { // 本次flush的頁(yè)數(shù):寫(xiě)入位置偏移量/OS_PAGE_SIZE - 上次flush位置偏移量/OS_PAGE_SIZE,是否大于flushLeastPages return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages; } // 寫(xiě)入位置偏移量是否大于flush位置偏移量 return write > flush; } // 文件是否已寫(xiě)滿(mǎn) public boolean isFull() { // 文件大小是否與寫(xiě)入數(shù)據(jù)位置相等 return this.fileSize == this.wrotePosition.get(); } /** * 返回當(dāng)前有效數(shù)據(jù)的位置 */ public int getReadPosition() { // 如果writeBuffer為空使用寫(xiě)入位置,否則使用提交位置 return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get(); }}折疊

以上就是關(guān)于pos機(jī)獲取信息,「RocketMQ」消息的刷盤(pán)機(jī)制的知識(shí),后面我們會(huì)繼續(xù)為大家整理關(guān)于pos機(jī)獲取信息的知識(shí),希望能夠幫助到大家!

轉(zhuǎn)發(fā)請(qǐng)帶上網(wǎng)址:http://m.afbey.com/news/32722.html

你可能會(huì)喜歡:

版權(quán)聲明:本文內(nèi)容由互聯(lián)網(wǎng)用戶(hù)自發(fā)貢獻(xiàn),該文觀點(diǎn)僅代表作者本人。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如發(fā)現(xiàn)本站有涉嫌抄襲侵權(quán)/違法違規(guī)的內(nèi)容, 請(qǐng)發(fā)送郵件至 babsan@163.com 舉報(bào),一經(jīng)查實(shí),本站將立刻刪除。