成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

AbstractFetcherThread:拉取消息分幾步?

開(kāi)發(fā) 前端
通過(guò)分析Kafka中的?Timer?和?SystemTimer?類,我們深入了解了Kafka如何通過(guò)分層時(shí)間輪實(shí)現(xiàn)高效的延時(shí)任務(wù)調(diào)度機(jī)制。Kafka的延時(shí)處理不僅應(yīng)用于消費(fèi)者組協(xié)調(diào)器,還廣泛用于副本管理、控制器等模塊。

今天我們來(lái)深入探討Kafka中的延遲處理機(jī)制,即通過(guò)DelayedOperation來(lái)實(shí)現(xiàn)的延時(shí)處理請(qǐng)求。具體來(lái)說(shuō),Kafka使用了一種名為“分層時(shí)間輪”的數(shù)據(jù)結(jié)構(gòu)來(lái)管理延時(shí)任務(wù),并通過(guò)它實(shí)現(xiàn)了對(duì)延遲請(qǐng)求的高效處理。這種延時(shí)機(jī)制廣泛應(yīng)用于Kafka的各個(gè)模塊,比如控制器、分區(qū)管理、副本同步等。

本節(jié)課我們將通過(guò)分析Kafka的相關(guān)源碼,詳細(xì)講解DelayedOperation是如何在Broker中延時(shí)處理請(qǐng)求的。同時(shí),我們還會(huì)講解兩個(gè)關(guān)鍵類:Timer和SystemTimer,看看它們是如何與Kafka的整體框架結(jié)合的。

一、Kafka延時(shí)處理機(jī)制概述

Kafka中延遲請(qǐng)求的處理場(chǎng)景非常多,比如:

  • 消費(fèi)者組協(xié)調(diào)器:處理消費(fèi)者組中的成員加入和離開(kāi)時(shí)的超時(shí)。
  • 控制器:在處理集群元數(shù)據(jù)的變化時(shí)需要對(duì)副本分配、Leader選舉進(jìn)行延時(shí)操作。
  • 副本管理:當(dāng)副本與Leader失聯(lián)時(shí),需要延遲一段時(shí)間再?zèng)Q定是否剔除該副本。

Kafka為了應(yīng)對(duì)這些場(chǎng)景,使用了一種高效的延時(shí)處理機(jī)制:分層時(shí)間輪(Hierarchical Timing Wheels)。這個(gè)數(shù)據(jù)結(jié)構(gòu)通過(guò)將延時(shí)任務(wù)按照超時(shí)時(shí)間分層存儲(chǔ),極大地提高了處理大量延時(shí)任務(wù)的性能。

1.1 什么是分層時(shí)間輪?

分層時(shí)間輪是一種常用于處理延遲任務(wù)的數(shù)據(jù)結(jié)構(gòu),它的核心思想是將時(shí)間分為一系列固定大小的時(shí)間槽(Bucket),每個(gè)槽對(duì)應(yīng)一個(gè)時(shí)間段。延時(shí)任務(wù)會(huì)根據(jù)它的超時(shí)時(shí)間被放入相應(yīng)的時(shí)間槽中,時(shí)間輪會(huì)隨著時(shí)間推移不斷向前轉(zhuǎn)動(dòng),每當(dāng)轉(zhuǎn)到某個(gè)時(shí)間槽時(shí),執(zhí)行其中的所有任務(wù)。

Kafka實(shí)現(xiàn)的分層時(shí)間輪有多個(gè)層次,每一層的時(shí)間槽覆蓋不同的時(shí)間范圍。隨著層次的增加,每個(gè)時(shí)間槽覆蓋的時(shí)間也逐漸變大。這樣設(shè)計(jì)的好處是,可以通過(guò)較少的層次和時(shí)間槽來(lái)管理大范圍的延時(shí)任務(wù)。

二、核心類:Timer 和 SystemTimer

在Kafka中,延時(shí)任務(wù)的管理由兩個(gè)關(guān)鍵類負(fù)責(zé):

  • Timer:這是時(shí)間輪的抽象接口,定義了延時(shí)任務(wù)的調(diào)度方法。
  • SystemTimer:這是Timer的具體實(shí)現(xiàn),使用分層時(shí)間輪來(lái)管理任務(wù)。

接下來(lái),我們通過(guò)源碼詳細(xì)了解這兩個(gè)類的實(shí)現(xiàn)。

2.1 Timer接口

首先來(lái)看Timer接口,這是Kafka中用于管理延時(shí)任務(wù)的通用接口。它的主要方法包括:

public interface Timer {

    /**
     * 添加一個(gè)延時(shí)操作到定時(shí)器中。
     */
    void add(DelayedOperation operation);

    /**
     * 觸發(fā)到期的延時(shí)操作。
     */
    boolean advanceClock(long timeoutMs) throws InterruptedException;

    /**
     * 檢查定時(shí)器中是否有待執(zhí)行的操作。
     */
    int size();

    /**
     * 關(guān)閉定時(shí)器。
     */
    void shutdown();
}
  • add(DelayedOperation operation):將一個(gè)延時(shí)任務(wù)添加到時(shí)間輪中。
  • advanceClock(long timeoutMs):推進(jìn)時(shí)間輪的時(shí)鐘,觸發(fā)已經(jīng)到期的延時(shí)任務(wù)。
  • size():返回當(dāng)前定時(shí)器中未執(zhí)行的任務(wù)數(shù)。
  • shutdown():關(guān)閉定時(shí)器,停止任務(wù)調(diào)度。

Timer接口為Kafka中所有延時(shí)任務(wù)的管理提供了統(tǒng)一的抽象,各個(gè)模塊的延時(shí)任務(wù)都通過(guò)這個(gè)接口進(jìn)行調(diào)度。

2.2 SystemTimer類

SystemTimer是Timer接口的具體實(shí)現(xiàn),它使用了分層時(shí)間輪來(lái)管理延時(shí)任務(wù)。我們來(lái)看一下它的主要實(shí)現(xiàn):

public class SystemTimer implements Timer {

    private final String executorName;
    private final TimerTaskList[] timeWheel;
    private final long tickMs;
    private final int wheelSize;
    private final long startMs;

    // 構(gòu)造函數(shù),初始化時(shí)間輪
    public SystemTimer(String executorName, long tickMs, int wheelSize) {
        this.executorName = executorName;
        this.tickMs = tickMs;
        this.wheelSize = wheelSize;
        this.timeWheel = new TimerTaskList[wheelSize];
        this.startMs = System.currentTimeMillis();
        // 初始化時(shí)間輪的每個(gè)Bucket
        for (int i = 0; i < wheelSize; i++) {
            timeWheel[i] = new TimerTaskList();
        }
    }

    @Override
    public void add(DelayedOperation operation) {
        long expiration = operation.expirationMs();
        long delayMs = expiration - System.currentTimeMillis();
        int bucketIndex = (int) ((delayMs / tickMs) % wheelSize);
        timeWheel[bucketIndex].add(operation);
    }

    @Override
    public boolean advanceClock(long timeoutMs) {
        long currentTimeMs = System.currentTimeMillis();
        int currentBucket = (int) ((currentTimeMs - startMs) / tickMs % wheelSize);
        // 處理當(dāng)前 Bucket 中的到期任務(wù)
        timeWheel[currentBucket].advance();
        return true;
    }

    @Override
    public int size() {
        int size = 0;
        for (TimerTaskList taskList : timeWheel) {
            size += taskList.size();
        }
        return size;
    }

    @Override
    public void shutdown() {
        // 清理所有未完成的任務(wù)
    }
}

SystemTimer的核心成員變量包括:

  • tickMs:時(shí)間輪的最小時(shí)間間隔,也就是時(shí)間輪每次轉(zhuǎn)動(dòng)的步長(zhǎng)。
  • wheelSize:時(shí)間輪中時(shí)間槽的數(shù)量。
  • timeWheel[]:時(shí)間輪的數(shù)組,每個(gè)元素對(duì)應(yīng)一個(gè)時(shí)間槽(Bucket),用來(lái)存儲(chǔ)延時(shí)任務(wù)。

2.2.1 add()方法

add()方法用于將延時(shí)任務(wù)添加到時(shí)間輪中。它通過(guò)計(jì)算任務(wù)的超時(shí)時(shí)間,確定該任務(wù)應(yīng)該存放在哪個(gè)時(shí)間槽中。計(jì)算方式是根據(jù)當(dāng)前時(shí)間和任務(wù)的超時(shí)時(shí)間,確定需要經(jīng)過(guò)多少個(gè)tick,然后取模得到對(duì)應(yīng)的時(shí)間槽。

long expiration = operation.expirationMs();
long delayMs = expiration - System.currentTimeMillis();
int bucketIndex = (int) ((delayMs / tickMs) % wheelSize);
timeWheel[bucketIndex].add(operation);

這樣,Kafka可以將延時(shí)任務(wù)按超時(shí)時(shí)間分布到不同的時(shí)間槽中,隨著時(shí)間輪的轉(zhuǎn)動(dòng)逐漸觸發(fā)這些任務(wù)。

2.2.2 advanceClock()方法

advanceClock()方法用于推進(jìn)時(shí)間輪的時(shí)鐘。當(dāng)時(shí)間輪的時(shí)鐘前進(jìn)時(shí),會(huì)檢查當(dāng)前時(shí)間槽中的任務(wù),觸發(fā)已經(jīng)到期的任務(wù)。

long currentTimeMs = System.currentTimeMillis();
int currentBucket = (int) ((currentTimeMs - startMs) / tickMs % wheelSize);
timeWheel[currentBucket].advance();

這個(gè)方法會(huì)計(jì)算當(dāng)前的時(shí)間槽索引,并處理當(dāng)前槽中的任務(wù)。Kafka通過(guò)不斷推進(jìn)時(shí)間輪的時(shí)鐘,逐步觸發(fā)延時(shí)任務(wù)的執(zhí)行。

2.2.3 TimerTaskList類

時(shí)間輪中的每個(gè)時(shí)間槽是一個(gè)TimerTaskList對(duì)象,它存儲(chǔ)了當(dāng)前槽中的所有延時(shí)任務(wù)。TimerTaskList類的實(shí)現(xiàn)如下:

public class TimerTaskList {
    private final List<DelayedOperation> tasks = new LinkedList<>();

    // 添加任務(wù)
    public void add(DelayedOperation operation) {
        tasks.add(operation);
    }

    // 觸發(fā)到期任務(wù)
    public void advance() {
        Iterator<DelayedOperation> iterator = tasks.iterator();
        while (iterator.hasNext()) {
            DelayedOperation task = iterator.next();
            if (task.isExpired()) {
                task.run();
                iterator.remove();
            }
        }
    }

    public int size() {
        return tasks.size();
    }
}

TimerTaskList通過(guò)鏈表存儲(chǔ)延時(shí)任務(wù),并在時(shí)鐘推進(jìn)時(shí)檢查任務(wù)是否到期,執(zhí)行到期任務(wù)并將其從列表中移除。

三、Kafka中的延遲處理示例

接下來(lái)我們結(jié)合Kafka的具體場(chǎng)景,來(lái)看一下DelayedOperation是如何被應(yīng)用的。一個(gè)典型的例子就是消費(fèi)者組協(xié)調(diào)器(GroupCoordinator)中的延遲處理。

3.1 消費(fèi)者組協(xié)調(diào)器中的延遲請(qǐng)求

在Kafka的消費(fèi)者組管理中,延遲請(qǐng)求被廣泛應(yīng)用。比如,當(dāng)一個(gè)消費(fèi)者加入或離開(kāi)消費(fèi)者組時(shí),協(xié)調(diào)器需要等待一段時(shí)間,直到確定沒(méi)有其他消費(fèi)者的變更請(qǐng)求,這時(shí)就需要使用延遲操作來(lái)處理請(qǐng)求。

在GroupCoordinator中,有一個(gè)completeJoinGroupRequest()方法,它通過(guò)延遲操作來(lái)管理消費(fèi)者加入組的請(qǐng)求:

public void completeJoinGroupRequest(String groupId, int memberId, long timeoutMs) {
    DelayedJoinGroup delayedJoin = new DelayedJoinGroup(groupId, memberId, timeoutMs);
    this.timer.add(delayedJoin);
}

這里DelayedJoinGroup是`

DelayedOperation的一個(gè)子類,用來(lái)處理消費(fèi)者加入組的邏輯。它會(huì)被添加到timer`中,并在超時(shí)后觸發(fā)執(zhí)行。

3.2 DelayedOperation類

DelayedOperation是Kafka中所有延遲任務(wù)的基類,定義了延遲任務(wù)的基本行為。它的核心方法如下:

public abstract class DelayedOperation {

    private final long deadlineMs;

    public DelayedOperation(long timeoutMs) {
        this.deadlineMs = System.currentTimeMillis() + timeoutMs;
    }

    // 檢查任務(wù)是否超時(shí)
    public boolean isExpired() {
        return System.currentTimeMillis() >= deadlineMs;
    }

    // 執(zhí)行任務(wù)
    public abstract void run();
}

DelayedOperation通過(guò)isExpired()方法判斷任務(wù)是否超時(shí),并通過(guò)run()方法執(zhí)行任務(wù)。Kafka中很多延時(shí)任務(wù)都是基于這個(gè)類實(shí)現(xiàn)的。

四、總結(jié)

通過(guò)分析Kafka中的Timer和SystemTimer類,我們深入了解了Kafka如何通過(guò)分層時(shí)間輪實(shí)現(xiàn)高效的延時(shí)任務(wù)調(diào)度機(jī)制。Kafka的延時(shí)處理不僅應(yīng)用于消費(fèi)者組協(xié)調(diào)器,還廣泛用于副本管理、控制器等模塊。

延時(shí)處理機(jī)制通過(guò)將任務(wù)分層存儲(chǔ),極大地提高了Kafka處理大量延時(shí)任務(wù)的性能。這種機(jī)制的設(shè)計(jì)既簡(jiǎn)潔又高效,適用于大規(guī)模分布式系統(tǒng)的延時(shí)任務(wù)處理需求。

責(zé)任編輯:武曉燕 來(lái)源: 架構(gòu)師秋天
相關(guān)推薦

2023-02-10 15:12:34

特斯拉電動(dòng)汽車

2016-12-08 16:33:54

2016-09-13 22:46:41

大數(shù)據(jù)

2016-12-23 19:05:24

存儲(chǔ)

2020-05-28 15:51:50

接口手機(jī)蘋果

2020-03-17 14:21:39

數(shù)據(jù)平臺(tái)架構(gòu)

2022-05-16 11:04:43

RocketMQPUSH 模式PULL 模式

2022-12-14 08:23:30

2010-06-02 18:29:36

搭建SVN

2011-08-04 18:14:42

Objective-C 消息

2022-09-24 09:52:42

TopicQueuekafka

2021-01-05 09:23:49

網(wǎng)頁(yè)端消息

2017-03-16 08:46:57

延時(shí)消息環(huán)形隊(duì)列數(shù)據(jù)結(jié)構(gòu)

2010-11-07 03:54:07

賽門鐵克收購(gòu)分拆出售

2024-08-27 13:43:38

Spring系統(tǒng)業(yè)務(wù)

2010-01-14 13:51:03

2010-09-17 20:28:29

2010-07-02 12:22:37

2024-08-07 08:02:08

2020-12-28 14:36:03

辦公
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)

主站蜘蛛池模板: 在线久草| 日韩超碰 | 亚洲国产精品99久久久久久久久 | 91看片免费版 | 亚洲精品九九 | 成人精品福利 | 黄网免费看| 中文字幕日韩欧美一区二区三区 | 国产乱码精品一品二品 | 男女国产网站 | 日本精品一区二区三区在线观看 | 超碰日韩| 激情一区 | 99福利| 在线一区| 精品国产一区二区三区免费 | 亚洲精品视频在线看 | 中文字幕一区二区三区四区 | 亚洲狠狠 | 久久91精品国产 | 欧美a在线 | 国产欧美在线观看 | 国产精品久久国产精品 | 91看片免费 | 狠狠夜夜 | 久久99网 | 在线资源视频 | 欧美精品一区二区三区视频 | 91久久久精品国产一区二区蜜臀 | 免费一级淫片aaa片毛片a级 | 久久蜜桃av一区二区天堂 | 婷婷综合色 | 日韩影院在线观看 | 久久综合伊人一区二区三 | 一区二区日韩 | 天天色天天 | 中文成人无字幕乱码精品 | 性做久久久久久免费观看欧美 | 高清亚洲 | 日本成人免费网站 | 日日操夜夜操天天操 |