成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

DelayedOperation:Broker是怎么延時處理請求的?

云計算 Kafka
通過分析Kafka中的?Timer?和?SystemTimer?類,我們深入了解了Kafka如何通過分層時間輪實現(xiàn)高效的延時任務(wù)調(diào)度機制。Kafka的延時處理不僅應(yīng)用于消費者組協(xié)調(diào)器,還廣泛用于副本管理、控制器等模塊。

今天我們來深入探討Kafka中的延遲處理機制,即通過DelayedOperation來實現(xiàn)的延時處理請求。具體來說,Kafka使用了一種名為“分層時間輪”的數(shù)據(jù)結(jié)構(gòu)來管理延時任務(wù),并通過它實現(xiàn)了對延遲請求的高效處理。這種延時機制廣泛應(yīng)用于Kafka的各個模塊,比如控制器、分區(qū)管理、副本同步等。

本節(jié)課我們將通過分析Kafka的相關(guān)源碼,詳細(xì)講解DelayedOperation是如何在Broker中延時處理請求的。同時,我們還會講解兩個關(guān)鍵類:Timer和SystemTimer,看看它們是如何與Kafka的整體框架結(jié)合的。

一、Kafka延時處理機制概述

Kafka中延遲請求的處理場景非常多,比如:

  • 消費者組協(xié)調(diào)器:處理消費者組中的成員加入和離開時的超時。
  • 控制器:在處理集群元數(shù)據(jù)的變化時需要對副本分配、Leader選舉進行延時操作。
  • 副本管理:當(dāng)副本與Leader失聯(lián)時,需要延遲一段時間再決定是否剔除該副本。

Kafka為了應(yīng)對這些場景,使用了一種高效的延時處理機制:分層時間輪(Hierarchical Timing Wheels)。這個數(shù)據(jù)結(jié)構(gòu)通過將延時任務(wù)按照超時時間分層存儲,極大地提高了處理大量延時任務(wù)的性能。

1.1 什么是分層時間輪?

分層時間輪是一種常用于處理延遲任務(wù)的數(shù)據(jù)結(jié)構(gòu),它的核心思想是將時間分為一系列固定大小的時間槽(Bucket),每個槽對應(yīng)一個時間段。延時任務(wù)會根據(jù)它的超時時間被放入相應(yīng)的時間槽中,時間輪會隨著時間推移不斷向前轉(zhuǎn)動,每當(dāng)轉(zhuǎn)到某個時間槽時,執(zhí)行其中的所有任務(wù)。

Kafka實現(xiàn)的分層時間輪有多個層次,每一層的時間槽覆蓋不同的時間范圍。隨著層次的增加,每個時間槽覆蓋的時間也逐漸變大。這樣設(shè)計的好處是,可以通過較少的層次和時間槽來管理大范圍的延時任務(wù)。

二、核心類:Timer 和 SystemTimer

在Kafka中,延時任務(wù)的管理由兩個關(guān)鍵類負(fù)責(zé):

  • Timer:這是時間輪的抽象接口,定義了延時任務(wù)的調(diào)度方法。
  • SystemTimer:這是Timer的具體實現(xiàn),使用分層時間輪來管理任務(wù)。

接下來,我們通過源碼詳細(xì)了解這兩個類的實現(xiàn)。

2.1 Timer接口

首先來看Timer接口,這是Kafka中用于管理延時任務(wù)的通用接口。它的主要方法包括:

public interface Timer {

    /**
     * 添加一個延時操作到定時器中。
     */
    void add(DelayedOperation operation);

    /**
     * 觸發(fā)到期的延時操作。
     */
    boolean advanceClock(long timeoutMs) throws InterruptedException;

    /**
     * 檢查定時器中是否有待執(zhí)行的操作。
     */
    int size();

    /**
     * 關(guān)閉定時器。
     */
    void shutdown();
}
  • add(DelayedOperation operation):將一個延時任務(wù)添加到時間輪中。
  • advanceClock(long timeoutMs):推進時間輪的時鐘,觸發(fā)已經(jīng)到期的延時任務(wù)。
  • size():返回當(dāng)前定時器中未執(zhí)行的任務(wù)數(shù)。
  • shutdown():關(guān)閉定時器,停止任務(wù)調(diào)度。

Timer接口為Kafka中所有延時任務(wù)的管理提供了統(tǒng)一的抽象,各個模塊的延時任務(wù)都通過這個接口進行調(diào)度。

2.2 SystemTimer類

SystemTimer是Timer接口的具體實現(xiàn),它使用了分層時間輪來管理延時任務(wù)。我們來看一下它的主要實現(xiàn):

public class SystemTimer implements Timer {

    private final String executorName;
    private final TimerTaskList[] timeWheel;
    private final long tickMs;
    private final int wheelSize;
    private final long startMs;

    // 構(gòu)造函數(shù),初始化時間輪
    public SystemTimer(String executorName, long tickMs, int wheelSize) {
        this.executorName = executorName;
        this.tickMs = tickMs;
        this.wheelSize = wheelSize;
        this.timeWheel = new TimerTaskList[wheelSize];
        this.startMs = System.currentTimeMillis();
        // 初始化時間輪的每個Bucket
        for (int i = 0; i < wheelSize; i++) {
            timeWheel[i] = new TimerTaskList();
        }
    }

    @Override
    public void add(DelayedOperation operation) {
        long expiration = operation.expirationMs();
        long delayMs = expiration - System.currentTimeMillis();
        int bucketIndex = (int) ((delayMs / tickMs) % wheelSize);
        timeWheel[bucketIndex].add(operation);
    }

    @Override
    public boolean advanceClock(long timeoutMs) {
        long currentTimeMs = System.currentTimeMillis();
        int currentBucket = (int) ((currentTimeMs - startMs) / tickMs % wheelSize);
        // 處理當(dāng)前 Bucket 中的到期任務(wù)
        timeWheel[currentBucket].advance();
        return true;
    }

    @Override
    public int size() {
        int size = 0;
        for (TimerTaskList taskList : timeWheel) {
            size += taskList.size();
        }
        return size;
    }

    @Override
    public void shutdown() {
        // 清理所有未完成的任務(wù)
    }
}

SystemTimer的核心成員變量包括:

  • tickMs:時間輪的最小時間間隔,也就是時間輪每次轉(zhuǎn)動的步長。
  • wheelSize:時間輪中時間槽的數(shù)量。
  • timeWheel[]:時間輪的數(shù)組,每個元素對應(yīng)一個時間槽(Bucket),用來存儲延時任務(wù)。

2.2.1 add()方法

add()方法用于將延時任務(wù)添加到時間輪中。它通過計算任務(wù)的超時時間,確定該任務(wù)應(yīng)該存放在哪個時間槽中。計算方式是根據(jù)當(dāng)前時間和任務(wù)的超時時間,確定需要經(jīng)過多少個tick,然后取模得到對應(yīng)的時間槽。

long expiration = operation.expirationMs();
long delayMs = expiration - System.currentTimeMillis();
int bucketIndex = (int) ((delayMs / tickMs) % wheelSize);
timeWheel[bucketIndex].add(operation);

這樣,Kafka可以將延時任務(wù)按超時時間分布到不同的時間槽中,隨著時間輪的轉(zhuǎn)動逐漸觸發(fā)這些任務(wù)。

2.2.2 advanceClock()方法

advanceClock()方法用于推進時間輪的時鐘。當(dāng)時間輪的時鐘前進時,會檢查當(dāng)前時間槽中的任務(wù),觸發(fā)已經(jīng)到期的任務(wù)。

long currentTimeMs = System.currentTimeMillis();
int currentBucket = (int) ((currentTimeMs - startMs) / tickMs % wheelSize);
timeWheel[currentBucket].advance();

這個方法會計算當(dāng)前的時間槽索引,并處理當(dāng)前槽中的任務(wù)。Kafka通過不斷推進時間輪的時鐘,逐步觸發(fā)延時任務(wù)的執(zhí)行。

2.2.3 TimerTaskList類

時間輪中的每個時間槽是一個TimerTaskList對象,它存儲了當(dāng)前槽中的所有延時任務(wù)。TimerTaskList類的實現(xiàn)如下:

public class TimerTaskList {
    private final List<DelayedOperation> tasks = new LinkedList<>();

    // 添加任務(wù)
    public void add(DelayedOperation operation) {
        tasks.add(operation);
    }

    // 觸發(fā)到期任務(wù)
    public void advance() {
        Iterator<DelayedOperation> iterator = tasks.iterator();
        while (iterator.hasNext()) {
            DelayedOperation task = iterator.next();
            if (task.isExpired()) {
                task.run();
                iterator.remove();
            }
        }
    }

    public int size() {
        return tasks.size();
    }
}

TimerTaskList通過鏈表存儲延時任務(wù),并在時鐘推進時檢查任務(wù)是否到期,執(zhí)行到期任務(wù)并將其從列表中移除。

三、Kafka中的延遲處理示例

接下來我們結(jié)合Kafka的具體場景,來看一下DelayedOperation是如何被應(yīng)用的。一個典型的例子就是消費者組協(xié)調(diào)器(GroupCoordinator)中的延遲處理。

3.1 消費者組協(xié)調(diào)器中的延遲請求

在Kafka的消費者組管理中,延遲請求被廣泛應(yīng)用。比如,當(dāng)一個消費者加入或離開消費者組時,協(xié)調(diào)器需要等待一段時間,直到確定沒有其他消費者的變更請求,這時就需要使用延遲操作來處理請求。

在GroupCoordinator中,有一個completeJoinGroupRequest()方法,它通過延遲操作來管理消費者加入組的請求:

public void completeJoinGroupRequest(String groupId, int memberId, long timeoutMs) {
    DelayedJoinGroup delayedJoin = new DelayedJoinGroup(groupId, memberId, timeoutMs);
    this.timer.add(delayedJoin);
}

這里DelayedJoinGroup是`

DelayedOperation的一個子類,用來處理消費者加入組的邏輯。它會被添加到timer`中,并在超時后觸發(fā)執(zhí)行。

3.2 DelayedOperation類

DelayedOperation是Kafka中所有延遲任務(wù)的基類,定義了延遲任務(wù)的基本行為。它的核心方法如下:

public abstract class DelayedOperation {

    private final long deadlineMs;

    public DelayedOperation(long timeoutMs) {
        this.deadlineMs = System.currentTimeMillis() + timeoutMs;
    }

    // 檢查任務(wù)是否超時
    public boolean isExpired() {
        return System.currentTimeMillis() >= deadlineMs;
    }

    // 執(zhí)行任務(wù)
    public abstract void run();
}

DelayedOperation通過isExpired()方法判斷任務(wù)是否超時,并通過run()方法執(zhí)行任務(wù)。Kafka中很多延時任務(wù)都是基于這個類實現(xiàn)的。

四、總結(jié)

通過分析Kafka中的Timer和SystemTimer類,我們深入了解了Kafka如何通過分層時間輪實現(xiàn)高效的延時任務(wù)調(diào)度機制。Kafka的延時處理不僅應(yīng)用于消費者組協(xié)調(diào)器,還廣泛用于副本管理、控制器等模塊。

延時處理機制通過將任務(wù)分層存儲,極大地提高了Kafka處理大量延時任務(wù)的性能。這種機制的設(shè)計既簡潔又高效,適用于大規(guī)模分布式系統(tǒng)的延時任務(wù)處理需求。

責(zé)任編輯:武曉燕 來源: 架構(gòu)師秋天
相關(guān)推薦

2022-07-01 07:31:18

AhooksDOM場景

2023-10-04 07:35:03

2023-09-19 22:41:30

控制器HTTP

2018-06-24 08:53:42

Tomcat理搜索引擎爬蟲

2021-01-18 05:13:04

TomcatHttp

2021-01-21 09:09:18

時區(qū)轉(zhuǎn)換程序

2022-08-13 12:13:13

RTOS延時代碼

2022-06-13 11:05:35

RocketMQ消費者線程

2021-06-17 09:32:39

重復(fù)請求并發(fā)請求Java

2023-08-07 08:32:05

RocketMQ名字服務(wù)

2017-08-11 14:28:02

58同城推薦系統(tǒng)

2021-07-27 14:50:15

axiosHTTP前端

2020-11-11 14:19:17

隱私APP設(shè)計

2022-07-04 09:15:10

Spring請求處理流程

2019-11-27 11:10:58

TomcatOverviewAcceptor

2011-05-06 15:54:47

Service BroSQL Server

2018-10-22 13:23:29

MySQL主從延時線程

2021-08-06 11:24:35

域名劫持網(wǎng)站安全網(wǎng)絡(luò)攻擊

2011-05-06 15:48:35

Service BroSQL Server

2009-07-08 13:31:23

調(diào)用Servlet處理
點贊
收藏

51CTO技術(shù)棧公眾號

主站蜘蛛池模板: 亚洲成人三级 | 国产欧美一区二区久久性色99 | 国内精品久久久久久影视8 最新黄色在线观看 | 在线三级电影 | 亚洲网址 | 国产在线观看 | 男女羞羞视频大全 | 91精品国产91久久久久游泳池 | 日本高清视频在线播放 | 成人自拍av | 日韩欧美三级电影在线观看 | 亚洲激情视频在线 | 久久午夜影院 | 精品久久一区 | 91视频久久| 99精品视频免费观看 | 国产成人综合网 | 男女羞羞免费视频 | 亚洲一卡二卡 | 亚洲高清视频一区 | 国内自拍视频在线观看 | 色免费看 | 色综合天天天天做夜夜夜夜做 | 欧美久久久网站 | 国产一区二区欧美 | 视频一区二区在线观看 | 福利视频网站 | 男女污污动态图 | 国产99久久精品一区二区永久免费 | 国产成人精品久久二区二区91 | xxxxxx国产 | 91精品国产综合久久久久久丝袜 | 一级黄片一级毛片 | 日本视频一区二区三区 | 久久久久久成人 | 成人在线视频一区 | 亚洲精品一区中文字幕乱码 | 欧洲精品一区 | 免费视频中文字幕 | 久久久久国产视频 | 国产精品视频免费看 |