vivo Pulsar萬億級消息處理實踐(1)-數據發送原理解析和性能調優
一、Pulsar 簡要介紹
Pulsar是新一代的云原生消息中間件,由Apache軟件基金會孵化和開源。它的設計目的是為了滿足現代數據處理和計算應用程序對可擴展性、可靠性和高性能的需求,具備存儲與計算分離、節點對等、獨立擴展、實時均衡、節點故障快速恢復等特性。
Pulsar由四個核心模塊組成:broker、bookKeeper和client(Producer和Consumer)、zk(元數據管理和節點協調)。broker接受來自Producer的消息,將消息路由到對應的topic;bookKeeper用于數據持久化存儲和數據復制;Consumer消費topic上的數據。Pulsar支持多種編程語言和協議(如Java、C++、Go、Python等),可以運行在云、本地和混合環境中,擴展性好,支持多租戶和跨數據中心復制等特性。因此,Pulsar被廣泛應用于云計算、大數據、物聯網等領域的實時消息傳遞和處理應用中。
二、Pulsar Producer解析
首先需要了解Producer的數據發送流程,這里以“開啟壓縮、batch發送消息給partitioned topic“這樣的一個線上常規場景為例,解析數據的發送的關鍵環節。
tips:
在Pulsar中有無分區(Non-Partitioned)Topic 和有分區 (Partitioned) 的 Topic之分,Partitioned topic最小分區數為1,為滿足任務的拓展性,在線上一般使用Partitioned topic。
2.1 消息生產與發送的詳細流程
Producer發送數據主要分為12個步驟:
① 創建Producer:
partitioned topic創建的是一個Partitioned-
ProducerImpl對象,該對象包含了所有分區及其對應的ProducerImpl對象,ProducerImpl對象負責所對應分區數據的維護和發送。
② 構造消息:
一條消息被發送前首先會被封裝成為一個Message對象,對象中包含了所發送的topic name、消息體、消息大小、schema類型、metadata(是否指定key等)等信息。
③ 確定目標分區:
在發送消息前需要通過路由策略決定發往哪一個分區,選擇對應分區的ProducerImpl對象進行進一步處理。
④ 攔截器:
Producer可以設置自定義的攔截器,攔截器需要實現producerInterceptor接口,在消息發送前可對消息進行攔截修改。
⑤ 消息堆積控制:
Producer可以處理的消息是有限的,接收新的消息時會分別進行信號量和內存使用率校驗,控制接收消息的速率,防止消息無限在本地堆積。
⑥ batch容器管理:
默認情況下分好區的消息不是直接被發送,而是放入了生產者的一個batch緩存容器中里面。在這個緩存里面,多條消息會被封裝成為一個批次(batch)。
⑦ 消息序列化:
Pulsar 的消息需要從客戶端傳到服務端,涉及到網絡傳輸,因此Producer將batch緩沖區中的所有消息逐一進行序列化。
⑧ 壓縮:
Pulsar內置了多種壓縮算法,在發送前會根據所選擇的壓縮算法對batch整體進行壓縮,這將優化網絡傳輸以提高Pulsar消息傳輸的性能。
⑨ 構建消息發送對象:
無論是開啟batch的批次消息,還是關閉batch的單條消息,都會被包裝為一個OpSendMsg對象,OpSendMsg也是Producer發送和pulsar broker接收處理的最小單位。
⑩ pending隊列:
所有構建好的OpSendMsg在發送前都會被放入pendingMessages隊列中,消息處理完成后才會從隊列中移除。
? 消息傳輸:
Pulsar 使用netty將消息異步的從客戶端發送到服務端,Broker節點將在收到消息后對其進行確認,并將其存儲在指定主題的持久存儲中。
? 響應處理:
Pulsar Broker 在收到消息時會返回一個響應,如果寫入成功,消息將會從pendingMessages隊列中移除。如果寫入失敗,會返回一個錯誤,生產者在收到可重試錯誤之后會嘗試重新發送消息,直到重試成功或超時。
2.2 關鍵環節原理分析
接下來會對上述流程中關鍵環節的設計和原理作進一步的剖析,幫助讀者更好的理解Producer。
2.2.1 創建Producer
在Pulsar中,PartitionedProducerImpl用于將多個ProducerImpl對象包裝成為一個邏輯生產者,以便向Partitioned Topic發送消息時能夠批量操作。其中,PartitionedProducerImpl.producers成員變量維護了每個分區及其對應的ProducerImpl對象,該設計主要有以下3個好處:
① 每個分區對應一個單獨的生產者:
在Pulsar中,Partitioned Topic按照分區(Partition)將多個 ProducerImpl 對象進行分配,以便能夠同時發往多個 Broker 節點,因此對于每個分區,需要擁有一個單獨的生產者以便進行發送操作。在 PartitionedProducerImpl 類中,需要為每個分區維護一個 ProducerImpl 對象,以便在消息被分配好“目標分區”后可以調用對應的ProducerImpl進行處理。
②簡化代碼邏輯:
在PartitionedProducerImpl中,將每個分區及其對應的ProducerImpl對象維護在一個HashMap中,能夠更加方便的維護并管理不同分區的生產者,使得代碼邏輯更加清晰簡明。
③ 提高容錯性:
當某個分區的ProducerImpl對象無法工作時,可以選擇其他可用的ProducerImpl對象,從而保證系統整體的可用性。由于將不同分區的ProducerImpl對象分別進行維護,因此具備更加靈活的容錯處理策略。
在線上實踐中我們也基于該設計,在PartitionedProducerImpl層做了進一步優化,通過感知下一層每個ProducerImpl的阻塞狀態(信號量的使用情況)來決定新的消息發送,避免將消息持續發往阻塞較為嚴重的分區,規避了topic被某一個分區阻塞而影響到整體發送性能的情況,也提高了線上系統的穩定性,具體的實現可以詳見這篇文章《構建下一代萬億級云原生消息架構:Apache Pulsar 在 vivo 的探索與實踐》。
關鍵代碼:
//對每一個分區都創建一個ProducerImpl對象
private void start(List<Integer> indexList) {
AtomicReference<Throwable> createFail = new AtomicReference<Throwable>();
AtomicInteger completed = new AtomicInteger();
for (int partitionIndex : indexList) {
createProducer(partitionIndex).producerCreatedFuture().handle((prod, createException) -> {
.......
});
}
}
private ProducerImpl<T> createProducer(final int partitionIndex) {
return producers.computeIfAbsent(partitionIndex, (idx) -> {
String partitionName = TopicName.get(topic).getPartition(idx).toString();
return client.newProducerImpl(partitionName, idx,
conf, schema, interceptors, new CompletableFuture<>());
});
}
2.2.2 確定目標分區
在發送消息前需要決定發往哪一個分區,確定好分區后便調用對應分區的ProducerImpl對象進一步處理,目標分區的確定主要跟“路由策略”和“是否指定key”有關:
(1)如果消息沒有指定key:則按照三種路由策略的效果選擇分區進行發送,三種路由策略如下:
- SinglePartition:
如果消息沒有指定Key,Producer會隨機選擇一個 Partition,然后把所有的消息都發送到這個 Partition上。 - RoundRobinPartition:
生產者將以輪詢方式在所有 Partition之間發布消息,以實現最大吞吐量。需要注意的是如果開啟了batch發送,則輪詢將會以批為單位進行消息發送,批次發送時每隔partitionSwitchMs會輪詢一個 Partition。如果關閉了批量發送,那么每條消息發送都會輪詢一個Partition。(partitionSwitchMs至少為一個batchingMaxPublishDelay時間)。 - CustomPartition:
使用用戶自定義的消息路由實現,根據自定義的 Router實現決定消息要發往哪個分區。用戶自定義的 Router可以通過 messageRoute參數設置。自定義的 Router需要實現 MessageRouter接口的 choosePartition方法。
(2)如果消息指定key:則會對Key做哈希處理,然后找到對應的 Partition,把key所對應的消息都發送到同一個分區:
對消息的Key進行哈希處理后如何找到對應的 Partition的?即使用Key的哈希值對總的 Partition數取模:N=(Key的哈希值%總的 Partition數),得到的N就是第N個 Partition,Producer可以通過設置 hashingscheme來使用不同的哈希算法 ,現在已經支持 JavastringHash和 Murmur3_32Hash兩種哈希算法,前者直接調用String.hash.Code(),后者使用Murmur3。
路由策略的關鍵代碼如下:
//SinglePartition路由策略:
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
// If the message has a key, it supersedes the single partition routing policy
if (msg.hasKey()) {
return signSafeMod(hash.makeHash(msg.getKey()), metadata.numPartitions());
}
return partitionIndex;
}
//RoundRobin路由策略:
public int choosePartition(Message<?> msg, TopicMetadata topicMetadata) {
// If the message has a key, it supersedes the round robin routing policy
if (msg.hasKey()) {
return signSafeMod(hash.makeHash(msg.getKey()), topicMetadata.numPartitions());
}
if (isBatchingEnabled) { // if batching is enabled, choose partition on `partitionSwitchMs` boundary.
long currentMs = clock.millis();
return signSafeMod(currentMs / partitionSwitchMs + startPtnIdx, topicMetadata.numPartitions());
} else {
return signSafeMod(PARTITION_INDEX_UPDATER.getAndIncrement(this), topicMetadata.numPartitions());
}
}
2.2.3 消息堆積控制
Producer不可能無限接收新的消息,如果某些分區數據發送較慢,消息就會堆積在Prouducer緩存中,導致已經阻塞的分區堆積大量的消息,又無法重新發往其他分區,同時也可能因為無限堆積的消息占用了大量的內存,使得任務頻繁GC甚至OOM。
在Pulsar提供了兩個核心的速率限制策略和一個阻塞時的消息處理策略:
- 消息數量限制:
maxPendingMessages控制每個分區某一時刻最大可處理消息數量,通過信號量的方式控制“新進入的消息”的信號量分配和“處理完成消息“的信號量釋放,防止某個分區的消息嚴重堆積。 - 消息占用內存大小限制:
memoryLimit控制整個Pulsar client的消息最大占用內存大小,通過計數器方式控制“新進入的消息”有效載荷的內存分配和“處理完成消息“有效載荷的內存釋放,這里需要特殊說明的是memoryLimit是client的參數,針對的是該client對象下的所有topic,因此并不建議一個Pulsar client對象new多個Producer topic ,因為很容易出現某一個topic占用內存過多,導致另一個topic無空間可分配的情況。 - 阻塞處理策略:由blockIfQueueFull進行控制,當blockIfQueueFull為true時,代表阻塞等待,Producer會等待獲取信號量;當blockIfQueueFull為false時,一旦獲取不到信號量,就會立刻失敗,需要注意的是如果blockIfQueueFull為false,業務需要處理好消息失敗后的回調策略,否則會導致數據在Producer上“丟失”。
關鍵代碼如下:
public void sendAsync(Message<?> message, SendCallback callback) {
......
MessageImpl<?> msg = (MessageImpl<?>) message;
MessageMetadata msgMetadata = msg.getMessageBuilder();
ByteBuf payload = msg.getDataBuffer();
int uncompressedSize = payload.readableBytes();
//對發送隊列大小以及client memory進行判斷是否有空間放入新的消息
if (!canEnqueueRequest(callback, message.getSequenceId(), uncompressedSize)) {
return;
}
......
}
private boolean canEnqueueRequest(SendCallback callback, long sequenceId, int payloadSize) {
try {
if (conf.isBlockIfQueueFull()) {
//當blockIfQueueFull為true時,等待獲取信號量
if (semaphore.isPresent()) {
semaphore.get().acquire();
}
//分配消息有效載荷所需要的內存空間
client.getMemoryLimitController().reserveMemory(payloadSize);
} else {
//當blockIfQueueFull為false時,如果無法獲取到信號量,則快速失敗
if (!semaphore.map(Semaphore::tryAcquire).orElse(true)) {
callback.sendComplete(new PulsarClientException.ProducerQueueIsFullError("Producer send queue is full", sequenceId));
return false;
}
//如果沒有如何的內存空間用于消息分配,則報錯
if (!client.getMemoryLimitController().tryReserveMemory(payloadSize)) {
semaphore.ifPresent(Semaphore::release);
callback.sendComplete(new PulsarClientException.MemoryBufferIsFullError("Client memory buffer is full", sequenceId));
return false;
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
callback.sendComplete(new PulsarClientException(e, sequenceId));
return false;
}
return true;
}
2.2.4 消息batch容器打包
(1)batch關鍵組成信息
- Messages: 保存消息的list,保存跟這個batch相關所有的MessageImpl對象。
- Metadata:保存batch相關的元數據,如批量消息的序列號、消息發送的時間戳等信息。
- Callback:保存消息回調邏輯的集合,記錄了每一條消息對應的callback策略,在batch消息發送并等到服務端響應后,依次對消息的回調進行處理。
(2)batch打包條件
batch打包條件的三個關鍵參數:滿足其一數據就會被打包發送出去。
- 批次大小:batchingMaxBytes
- 批次條數:batchingMaxMessages
- 批次延遲發送時間:
batchingMaxPublishDelay
Pulsar使用兩個模塊設計來實現上面的參數控制:
- accumulator:在 BatchMessage-
ContainerImpl 中通過計數器的方式去控制batch的大小和條數,numMessages-
InBatch記錄已經緩存的消息數量,currentBatchSizeBytes用于記錄已緩存的消息的大小。當這些變量達到閾值時,BatchMessageContainerImpl 將會觸發批量消息的發送。 - batchTimerTask:當生產者使用批量消息發送模式時,Producer將會創建一個定時器任務(batchTimerTask),并通過計時器的方式定時將BatchMessageContainer容器中的消息進行批量發送。
2.2.5 消息壓縮
如果開啟了消息壓縮,在發送前都需要進行壓縮處理。對于單條消息發送的場景,是對每一條消息進行單獨壓縮后進行發送;而如果開啟了batch則是對整個batch進行壓縮后再整個進行發送。
在線上實踐中,推薦在不影響業務延遲的情況下batch越大越好,主要有兩個理由:
- 可以優化網絡IO降低CPU負載:
不論Producer發送的是一條消息還是一批消息,在pulsar客戶端都會被構建為一個OpSendMsg對象,同時pulsar broker接收到消息進行寫入處理時,也是按照OpSendMsg為一個處理單位將消息寫入磁盤,因此當消息數量一定時,batch越大,則代表需要處理的OpSendMsg越少。 - batch越大“壓縮效果則越好”:
壓縮算法對應的壓縮率并不固定,它通常取決于所要壓縮的數據對象的內容和壓縮算法本身,壓縮的本質在于通過消除或利用數據中存在的冗余來實現數據的壓縮和重構。而Pulsar是以batch來進行打包的,batch越大,壓縮的目標包體越大壓縮效果則可能越好,同時也能夠盡可能避免單條消息因為包體較小導致越壓縮后包體越大的情況出現。
以下是開啟了batch情況下,構建發送消息和壓縮的關鍵代碼:
public OpSendMsg createOpSendMsg() throws IOException {
//對數據進行壓縮、加密等操作
ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload());
......
ByteBufPair cmd = producer.sendMessage(producer.producerId, messageMetadata.getSequenceId(),
messageMetadata.getHighestSequenceId(), numMessagesInBatch, messageMetadata, encryptedPayload);
//對整個batch構建一個OpSendMsg
OpSendMsg op = OpSendMsg.create(messages, cmd, messageMetadata.getSequenceId(),
messageMetadata.getHighestSequenceId(), firstCallback);
......
return op;
}
//對batch進行壓縮,并將壓縮后信息更新到messageMetadata中
private ByteBuf getCompressedBatchMetadataAndPayload() {
......
int uncompressedSize = batchedMessageMetadataAndPayload.readableBytes();
ByteBuf compressedPayload = compressor.encode(batchedMessageMetadataAndPayload);
batchedMessageMetadataAndPayload.release();
......
return compressedPayload;
}
2.2.6 pending隊列
Pulsar 中的 pendingMessages隊列是客戶端用來暫存“未處理完成的消息”的一個緩存隊列。用于存儲當Producer連接到 Broker 服務器后,還未發送或尚未得到 Broker 系統的 ACK 確認的所有生產者(Producer)的消息。在發送消息之前,Producer 首先會將消息緩存到 pendingMessages 隊列中,并記錄保存緩存消息的OpSendMsg對象,直到它被成功發送到了 Broker 端并收到 Broker 發送的ACK 確認之后,相關的元信息和消息信息才會從隊列中移除。
需要注意的是:pending隊列的本質是一個回調處理隊列,而不是發送隊列,消息在放入pending隊列的同時就被異步發送到服務端了,所以這里需要重點理解什么是“未處理完成的消息”。
pendingMessages 隊列的作用在于:對于已經發送但尚未收到 ACK 確認的消息,防止在連接出現異常時丟失消息。當連接中斷時,緩存在 pendingMessages 隊列中的未確認消息將被認為是需要重發的,當連接恢復時,緩存的消息將重新發送到 Broker 端,以確保生產者生產的消息不會丟失。
總的來說,pendingMessages 隊列是 Pulsar 客戶端保證消息可靠性和一致性的關鍵功能組件,在 Pulsar 的生產者(Producer)和消息確認的機制中擔任著非常重要的角色。
關鍵代碼如下:
add() 方法用于在追加消息時將指定元素插入隊列中的隊尾,remove() 用于消息在完成后移除隊列頭部的元素。
protected void processOpSendMsg(OpSendMsg op) {
if (op == null) {
return;
}
try {
if (op.msg != null && isBatchMessagingEnabled()) {
batchMessageAndSend();
}
//將消息放入“待處理消息隊列”
pendingMessages.add(op);
......
// If we do have a connection, the message is sent immediately, otherwise we'll try again once a new
// connection is established
op.cmd.retain();
cnx.ctx().channel().eventLoop().execute(WriteInEventLoopCallback.create(this, cnx, op));
stats.updateNumMsgsSent(op.numMessagesInBatch, op.batchSizeByte);
......
}
//添加消息到pendingMessages隊列
public boolean add(OpSendMsg o) {
// postpone adding to the queue while forEach iteration is in progress
//batch的計數是按照batch中消息的總量進行計數
messagesCount.addAndGet(o.numMessagesInBatch);
if (forEachDepth > 0) {
if (postponedOpSendMgs == null) {
postponedOpSendMgs = new ArrayList<>();
}
return postponedOpSendMgs.add(o);
} else {
return delegate.add(o);
}
}
//將消息從pendingMessages隊列移除
public void remove() {
OpSendMsg op = delegate.remove();
if (op != null) {
messagesCount.addAndGet(-op.numMessagesInBatch);
}
}
2.2.7 消息傳輸
Producer和broker都維護了分區維度的pending隊列來保證消息處理的順序性,以及實現消息重新發送、重新寫入持久化存儲的能力。在Producer端,消息被順序追加到pending隊列并異步發送到服務端,服務端的pending隊列在接收到消息后,按照順序追加到隊列中,并按照順序將數據寫入bookie進行持久化處理,處理完成后按照順序返回響應Producer,并將消息從broker pending和producer pending隊列中移除。
另外在數據傳輸過程中,無論是使用Pulsar Producer的同步發送還是異步發送,在消息傳輸環節本質上都是使用netty將消息異步的從客戶端發送到服務端,區別在于send() 方法封裝了 sendAsync() 方法,使其可以在向服務器發送 Pulsar 消息時阻塞等待 Broker 的響應,直到確認消息已經被 Broker 成功處理后才會返回,常規情況下,建議使用異步的方式發送 Pulsar 消息,因為同步方式必須在 Broker 端成功接收到消息之后才會返回,因此會帶來較大的性能損耗和延遲。但是在部分場景下,需要使用同步方式來保證可靠性,以防 Broker 端接收失敗,可以考慮使用 send() 方法實現同步方式的方式發送 Pulsar 消息。
使用netty執行的代碼:
private static final class WriteInEventLoopCallback implements Runnable {
......
@Override
public void run() {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Sending message cnx {}, sequenceId {}", producer.topic, producer.producerName, cnx,
sequenceId);
}
try {
cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
op.updateSentTimestamp();
} finally {
recycle();
}
}
......
}
2.2.8 處理響應
Pulsar Producer 使用“ACK 跟蹤機制”來實現對 Broker 返回的 ACK 確認消息的處理,用于檢測和處理到達生產者的全部消息狀態信息。
對于Producer發送的消息,Pulsar會對每個消息分配一個唯一的 sequenceId 序號,并記錄該消息的創建時間(createdAt)等元數據信息。當 Broker 確認收到某個消息時,Producer 會依據返回的 ACK 序號和 Broker 返回的確認時間來判斷當前 ACK 是否有效,并從已緩存的 pendingMessages 隊列中找到對應的消息元數據信息,以進行確認處理,在 Broker 確認消息接收成功時,Producer 將從等待確認的消息隊列中刪除對應的消息元數據信息,如果 Broker 返回的 ACK 消息不符合生產者預期的消息狀態信息,Producer 將會重發消息,直到重試成功或多次重試失敗后拋出異常后再從隊列中移除對應消息元數據信息并釋放對應內存、信號量等資源。
消息重發的關鍵代碼如下:
private void resendMessages(ClientCnx cnx, long expectedEpoch) {
cnx.ctx().channel().eventLoop().execute(() -> {
synchronized (this) {
//判斷連接狀態:當連接正在關閉或者已經關閉則不進行重發
if (getState() == State.Closing || getState() == State.Closed) {
// Producer was closed while reconnecting, close the connection to make sure the broker
// drops the producer on its side
cnx.channel().close();
return;
}
......
//調用重發消息方法
recoverProcessOpSendMsgFrom(cnx, null, expectedEpoch);
}
});
}
// Must acquire a lock on ProducerImpl.this before calling method.
private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl from, long expectedEpoch) {
......
final boolean stripChecksum = cnx.getRemoteEndpointProtocolVersion() < brokerChecksumSupportedVersion();
Iterator<OpSendMsg> msgIterator = pendingMessages.iterator();
OpSendMsg pendingRegisteringOp = null;
while (msgIterator.hasNext()) {
OpSendMsg op = msgIterator.next();
......
op.cmd.retain();
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Re-Sending message in cnx {}, sequenceId {}", topic, producerName,
cnx.channel(), op.sequenceId);
}
//發送消息
cnx.ctx().write(op.cmd, cnx.ctx().voidPromise());
op.updateSentTimestamp();
stats.updateNumMsgsSent(op.numMessagesInBatch, op.batchSizeByte);
}
cnx.ctx().flush();
......
}
三、Pulsar 數據發送端參數調優實踐
根據以上對原理解析,我們對Producer已經有了一個大致理解,下面通過一個Producer參數調優實踐案例來幫助讀者基于原理進一步理解客戶端參數之間的聯系。
3.1 調優目的
首先要清楚為什么要進行參數調優,有以下兩個目的:
- 降低參數使用門檻:
Pulsar client和Producer的幾十個配置參數,參數多且聯系緊密,需要花費較多的時間成本去理解,同時參數之間存在協同生效互相影響的情況,對普通使用者而言場景復雜理解門檻高,我們希望能夠有一套較為通用的參數配置,或有公式化的參數配置方法論。 - 提升單機處理性能:
站在客戶端的角度,相同時間內處理的數據量越多,則認為單機處理性能更強。作為中間件系統的提供者,我們經常認為性能提升是服務端的事情,想盡辦法在pulsar的broker和bookie上去提升單機處理性能,但pulsar client作為整個消息中間件系統的核心組件,它能否發送好一份數據,對整個消息中間件系統的性能和穩定性也發揮著至關重要的作用。
3.2 調優實踐
下面就圍繞“參數通用模版化”和“提升單機處理性能”兩個目的出發并結合上述講解的數據發送原理,來分享一些實踐經驗。
3.2.1 關聯與場景相關的重點參數
Pulsar客戶端參數雖多但都提供了默認值,不需要一一調整。只需要對業務場景相關的針對性的去調整即可,如我們本次的參數調優目的是提升單機處理性能,則重點關注哪些場景哪些參數可以提升客戶端的發送速率、降低服務端的壓力,讓服務端可以處理更多的數據,有以下四點最為關鍵:
- batch打包發送:
消息多條批次發送,在降低客戶端和服務端網絡IO的同時也降低了兩者的cpu的負載。這里需強調的是我們希望batch是一個均勻的、“完整”的包,如pending隊列被打滿,batch只能空等到延遲發送時間過后被發送,沒有構建出預期中的batch,那么可以認為這個batch是一個不完整的包,這種batch包含的數據量少,對發送效率有著極大的影響。 - 數據壓縮:
Pulsar是IO密集型系統,常規情況下磁盤是系統的主要瓶頸,開啟壓縮可以有效降低網絡I/O,提升處理相同數據量下的讀寫能力。由于壓縮是針對batch的,在發送時間一定的情況下,batch越大其壓縮效果也越好,代表著處理的消息量也更多。 - RoundRobin發送:
將數據均勻地分配到多個分區中。它的基本思想是輪詢將新的數據寫入到不同的分區中,以均衡地分散負載。 - 消息堆積控制:maxPendingMessages信號量和memoryLimit限制不直接提升發送速率,但它能夠有效保障我們客戶端的穩定,也是控制或限制發送效率的重要參數之一。
涉及的客戶端關鍵參數以及默認值和我們線上調優后設置的數值如下表:
3.2.2 結合Producer發送原理分析參數的效果
接下來我們以參數的效用角度來描述一條消息從構建到發送的過程,進一步解釋參數如此設置的意義:
(1)選擇分區
構建消息后,通過messageRoutingMode參數所設置的路由策略來選擇分區,這里以RoundRobinPartition為路由策略,開啟batch時則每間隔partitionSwitchMs時間換一個分區進行數據發送,partitionSwitchMs的值為“batchingPartitionSwitchFrequencyByPublish
-Delay、batchingMaxPublishDelayMicros”這兩個Producer參數之積,也就是每batchingPartition
-SwitchFrequencyByPublishDelay個batch的最大打包時間,消息就會輪換一個分區發送。
為了能在batchingMaxPublishDelayMicros內得到一個較大的包,我們希望這個batch接收的消息是連續的,因此batchingPartitionSwitchFrequency-
ByPublishDelay不能小于1,同時也希望一個分區之間數據是較為均勻的,所以batchingPartition-
SwitchFrequencyByPublishDelay也要盡量小,否則分區對應的信號量maxPendingMessages耗盡還沒有切換分區,就會導致batch必須等待一個batchingMaxPublishDelayMicros時間。因此將batchingPartitionSwitchFrequencyByPublishDelay修改成了1,保證打包了一個batch之后就切換分區,這也極大的避免了分區信號量耗盡,出現發送阻塞。
(2)消息堆積控制
maxPendingMessages作為分區的信號量,也是“pending隊列”的大小,代表著每個分區能夠同時處理的最大消息上限,而maxPendingMessages-
AcrossPartitions則是針對整個topic生效的,maxPendingMessages=min( maxPending-
Messages,maxPendingMessagesAcrossPartitions/Partition),由于線上分區可能會變化,有不確定性,因此就使用上而言除非有特殊的使用場景,建議將maxPendingMessagesAcrossPartitions設置的比較大,讓maxPendingMessages生效即可。
除了maxPendingMessages以外,消息能否接收被放入pending隊列中,還要看當前正在處理的消息體大小總和是否超過了memoryLimit參數的限制,memoryLimit控制了消息待處理隊列中未壓縮前的消息有效荷載總和,可以避免在消息有效荷載非常大時,還未觸發maxPendingMessages限制,就導致內存占用過多出現頻繁GC和oom的問題。由于memoryLimit是client級別的策略,因此也建議一個client對應一個Poducer。
總而言之maxPendingMessages控制了每個分區可以處理消息數量的上限,memoryLimit控制了所有分區可以消息占用內存的上限,兩者相輔相成。
(3)消息batch容器打包
決定一個batch是否打包完成有三個條件控制,batchingMaxBytes、batchingMaxMessages、batchingMaxPublishDelayMicros滿足其一即可,根據這三個參數的含義去設置值看似是容易的,但容易忽略的是batch中用來打包的消息也是受memoryLimit和maxPendingMessages制約的,應該避免出現batch中消息的數量超過memoryLimit和maxPendingMessages導致batch打包效率受影響。舉個例子,當maxPendingMessages設置為500,而batchingMaxMessages設置1000時,batch就永遠無法滿足消息條數達到1000的條件,只能空等batchingMaxPublishDelayMicros或者batchingMaxBytes兩者生效。
3.2.3 公式化模版
通過上述分析,大致了解了關鍵參數的生效效果,且彼此相互關聯,根據這些關系就能夠輸出一個較為簡單的參數調優模版。
假設我們發送的單條消息大小為:messageByte;分區數量為:partitionNum。
那么對應參數調整公式如下:
//業務發送速率越大,這里設置的值越大
maxPendingMessages:一般1000-2000之間
//這里值也可以設置大一些,讓maxPendingMessages生效即可
maxPendingMessagesAcrossPartitions = maxPendingMessages * partitionNum
//memoryLimit的值就是打算阻塞總消息大小,這與消息體和maxPendingMessages有關
memoryLimit=(maxPendingMessages * partitionNum * messageByte)
//batch的條數不超過“待處理消息隊列”大小的一半
batchingMaxMessages=maxPendingMessages/2,這樣可以保證在消息發送等待ack的時候,該分區剩下一半的空間還能用來構建一個batch
//batch大小同理,batch大小不超過“待處理消息隊列”消息大小的一半
batchingMaxBytes= Math.min(memoryLimit * 1024 * 1024 /partitionNum/2,1048576)
//業務能夠接受的延遲大小,一般延遲時間越大,batch越大
batchingMaxPublishDelayMicros=1ms-100m皆可
//每構建一個batch就轉換一個分區
batchingPartitionSwitchFrequencyByPublishDelay=1
可以看到根據上面的分析,參數之間是有一個模版化的公式,但這也不是唯一的,讀者可以根據自己的業務場景進行調整。在真實使用過程中線上的消息大小以及分區數量實際上是會變化的,因此真正的參數設置還需要根據實際情況來確定,比如我們線上通常的做法是根據機器配置將memoryLimit直接設置為64M-256M,分區數量我們線上不會超過1000,那么這里就假設為1000,確定了這兩個參數,其他的參數的值也就確定了。
3.2.4 效果對比
以線上一個業務參數調優為例,前后都開啟壓縮的情況下調整上述參數后的一個效果。
服務端(Pulsar):
優化前后對比數據:
相同的寫入速率,Pulsar服務端網卡流量縮減約50%(batch包體增加,壓縮效果提升),cpu負載降低約90%,Pulsar服務端總體成本相較優化前至少可降低50%以上,客戶端也有一定程度的負載降低。
參數調整后,CPU負載得到明顯降低,一定程度上避免了CPU成為系統的瓶頸,同時由于壓縮效果的提升,Pulsar 的磁盤IO負載得到顯著降低,可以用更少的機器處理更多的數據。
四、總結
理解Producer發送原理以及核心參數是寫好數據發送程序最為有效的手段,最簡單的客戶端參數優化反而隱藏了巨大的收益。本文通過對Producer原理進行剖析、對消息的流轉過程中參數效用進行講解,并配合參數調優實踐案例,介紹了具體的分析思路和調優的方法,在實際使用過程中通過對核心的幾個上游系統進行調優,服務端單機處理能力至少提升了一倍以上,成本得到了極大的降低。
參考文章: