字節跳動百萬級Metrics Agent性能優化的探索與實踐
背景
圖片
metricserver2 (以下簡稱Agent)是與字節內場時序數據庫 ByteTSD 配套使用的用戶指標打點 Agent,用于在物理機粒度收集用戶的指標打點數據,在字節內幾乎所有的服務節點上均有部署集成,裝機量達到百萬以上。此外Agent需要負責打點數據的解析、聚合、壓縮、協議轉換和發送,屬于CPU和Mem密集的服務。兩者結合,使得Agent在監控全鏈路服務成本中占比達到70%以上,對Agent進行性能優化,降本增效是刻不容緩的命題。
基本架構
圖片
- Receiver 監聽socket、UDP端口,接收SDK發出的metrics數據
- Msg-Parser對數據包進行反序列化,丟掉不符合規范的打點,然后將數據點暫存在Storage中
- Storage支持7種類型的metircs指標存儲
- Flusher在每個發送周期的整時刻,觸發任務獲取Storage的快照,并對其存儲的metrics數據進行聚合,將聚合后的數據按照發送要求進行編碼
- Compress對編碼的數據包進行壓縮
- Sender支持HTTP和TCP方式,將數據發給后端服務
我們將按照數據接收、數據處理、數據發送三個部分來分析Agent優化的性能熱點。
數據接收
Case 1
Agent與用戶SDK通信的時候,使用 msgpack 對數據進行序列化。它的數據格式與json類似,但在存儲時對數字、多字節字符、數組等都做了優化,減少了無用的字符,下圖是其與json的簡單對比:
圖片
Agent在獲得數據后,需要通過msgpack.unpack
進行反序列化,然后把數據重新組織成 std::vector。這個過程中,有兩步復制的操作,分別是:從上游數據反序列為 msgpack::object 和 msgpack::object 轉換 std::vector。
{ // Process Function
msgpack::unpacked msg;
msgpack::unpack(&msg, buffer.data(), buffer.size());
msgpack::object obj = msg.get();
std::vector<std::vector<std::string>> vecs;
if (obj.via.array.ptr[0].type == 5) {
std::vector<std::string> vec;
obj.convert(&vec);
vecs.push_back(vec);
} else if (obj.via.array.ptr[0].type == 6) {
obj.convert(&vecs);
} else {
++fail_count;
return result;
}
// Some more process steps
}
但實際上,整個數據的處理都在處理函數中。這意味著傳過來的數據在整個處理周期都是存在的,因此這兩步復制可以視為額外的開銷。
msgpack協議在對數據進行反序列化解析的時候,其內存管理的基本邏輯如下:
圖片
為了避免復制 string,bin 這些類型的數據,msgpack 支持在解析的時候傳入一個函數,用來決定這些類型的數據是否需要進行復制:
圖片
因此在第二步,對 msgpack::object 進行轉換的時候,我們不再轉換為 string,而是使用 string_view,可以優化掉 string 的復制和內存分配等:
// Define string_view convert struct.
template <>
struct msgpack::adaptor::convert<std::string_view> {
msgpack::object const& operator()(msgpack::object const& o, std::string_view& v) const {
switch (o.type) {
case msgpack::type::BIN:
v = std::string_view(o.via.bin.ptr, o.via.bin.size);
break;
case msgpack::type::STR:
v = std::string_view(o.via.str.ptr, o.via.str.size);
break;
default:
throw msgpack::type_error();
break;
}
return o;
}
};
static bool string_reference(msgpack::type::object_type type, std::size_t, void*) {
return type == msgpack::type::STR;
}
{
msgpack::unpacked msg;
msgpack::unpack(msg, buffer.data(), buffer.size(), string_reference);
msgpack::object obj = msg.get();
std::vector<std::vector<std::string_view>> vecs;
if (obj.via.array.ptr[0].type == msgpack::type::STR) {
std::vector<std::string_view> vec;
obj.convert(&vec);
vecs.push_back(vec);
} else if (obj.via.array.ptr[0].type == msgpack::type::ARRAY) {
obj.convert(&vecs);
} else {
++fail_count;
return result;
}
}
經過驗證可以看到:零拷貝的時候,轉換完的所有數據的內存地址都在原來的的 buffer 的內存地址范圍內。而使用 string 進行復制的時候,內存地址和 buffer 的內存地址明顯不同。
圖片
Case 2
圖片
Agent在接收端通過系統調用完成數據接收后,會立刻將數據投遞到異步的線程池內,進行數據的解析工作,以達到不阻塞接收端的效果。但我們在對線上數據進行分析時發現,用戶產生的數據包大小是不固定的,并且存在大量的小包(比如一條打點數據)。這會導致異步線程池內的任務數量較多,平均每個任務的體積較小,線程池需要頻繁的從隊列獲取新的任務,帶來了處理性能的下降。
因此我們充分理解了msgpack的協議格式(https://github.com/msgpack/msgpack/blob/master/spec.md)后,在接收端將多個數據小包(一條打點數據)聚合成一個數據大包(多條打點數據),進行一次任務提交,提高了接收端的處理性能,降低了線程切換的開銷。
static inline bool tryMerge(std::string& merge_buf, std::string& recv_buf, int msg_size, int merge_buf_cap) {
uint16_t big_endian_len, host_endian_len, cur_msg_len;
memcpy(&big_endian_len, (void*)&merge_buf[1], sizeof(big_endian_len));
host_endian_len = ntohs(big_endian_len);
cur_msg_len = recv_buf[0] & 0x0f;
if((recv_buf[0] & 0xf0) != 0x90 || merge_buf.size() + msg_size > merge_buf_cap || host_endian_len + cur_msg_len > 0xffff) {
// upper 4 digits are not 1001
// or merge_buf cannot hold anymore data
// or array 16 in the merge_buf cannot hold more objs (although not possible right now, but have to check)
return false;
}
// start merging
host_endian_len += cur_msg_len;
merge_buf.append(++recv_buf.begin(), recv_buf.begin() + msg_size);
// update elem cnt in array 16
big_endian_len = htons(host_endian_len);
memcpy((void*)&merge_buf[1], &big_endian_len, sizeof(big_endian_len));
return true;
}
{ // receiver function
// array 16 with 0 member
std::string merge_buf({(char)0xdc, (char)0x00, (char)0x00});
for(int i = 0 ; i < 1024; ++i) {
int r = recv(fd, const_cast<char *>(tmp_buffer_.data()), tmp_buffer_size_, 0);
if (r > 0) {
if(!tryMerge(merge_buf, tmp_buffer_, r, tmp_buffer_size_)) {
// Submit Task
}
// Some other logics
}
}
從關鍵的系統指標的角度看,在merge邏輯有收益時(接收QPS = 48k,75k,120k,150k),小包合并邏輯大大減少了上下文切換,執行指令數,icache/dcache miss,并且增加了IPC(instructions per cycle)見下表:
同時通過對前后火焰圖的對比分析看,在合并數據包之后,原本用于調度線程池的cpu資源更多的消耗在了收包上,也解釋了小包合并之后context switch減少的情況。
Case 3
用戶在打點指標中的Tags,是拼接成字符串進行純文本傳遞的,這樣設計的主要目的是簡化SDK和Agent之間的數據格式。但這種方式就要求Agent必須對字符串進行解析,將文本化的Tags反序列化出來,又由于在接收端收到的用戶打點QPS很高,這也成為了Agent的性能熱點。
早期Agent在實現這個解析操作時,采用了遍歷字符串的方式,將字符串按|
和 =
分割成 key-value 對。在其成為性能瓶頸后,我們發現它很適合使用SIMD進行加速處理。
原版
inline bool is_tag_split(const char &c) {
return c == '|' || c == ' ';
}
inline bool is_kv_split(const char &c) {
return c == '=';
}
bool find_str_with_delimiters(const char *str, const std::size_t &cur_idx, const std::size_t &end_idx,
const Process_State &state, std::size_t *str_end) {
if (cur_idx >= end_idx) {
return false;
}
std::size_t index = cur_idx;
while (index < end_idx) {
if (state == TAG_KEY) {
if (is_kv_split(str[index])) {
*str_end = index;
return true;
} else if (is_tag_split(str[index])) {
return false;
}
} else {
if (is_tag_split(str[index])) {
*str_end = index;
return true;
}
}
index++;
}
if (state == TAG_VALUE) {
*str_end = index;
return true;
}
return false;
}
SIMD 版
#if defined(__SSE__)
static std::size_t find_key_simd(const char *str, std::size_t end, std::size_t idx) {
if (idx >= end) { return 0; }
for (; idx + 16 <= end; idx += 16) {
__m128i v = _mm_loadu_si128((const __m128i*)(str + idx));
__m128i is_tag = _mm_or_si128(_mm_cmpeq_epi8(v, _mm_set1_epi8('|')),
_mm_cmpeq_epi8(v, _mm_set1_epi8(' ')));
__m128i is_kv = _mm_cmpeq_epi8(v, _mm_set1_epi8('='));
int tag_bits = _mm_movemask_epi8(is_tag);
int kv_bits = _mm_movemask_epi8(is_kv);
// has '|' or ' ' first
bool has_tag_first = ((kv_bits - 1) & tag_bits) != 0;
if (has_tag_first) { return 0; }
if (kv_bits) { // found '='
return idx + __builtin_ctz(kv_bits);
}
}
for (; idx < end; ++idx) {
if (is_kv_split(str[idx])) { return idx; }
else if (is_tag_split(str[idx])) { return 0; }
}
return 0;
}
static std::size_t find_value_simd(const char *str, std::size_t end, std::size_t idx) {
if (idx >= end) { return 0; }
for (; idx + 16 <= end; idx += 16) {
__m128i v = _mm_loadu_si128((const __m128i*)(str + idx));
__m128i is_tag = _mm_or_si128(_mm_cmpeq_epi8(v, _mm_set1_epi8('|')),
_mm_cmpeq_epi8(v, _mm_set1_epi8(' ')));
int tag_bits = _mm_movemask_epi8(is_tag);
if (tag_bits) {
return idx + __builtin_ctz(tag_bits);
}
}
for (; idx < end; ++idx) {
if (is_tag_split(str[idx])) { return idx; }
}
return idx;
}
構建的測試用例格式為
。text 則是測試例子里的 str_size,用來測試不同 str_size 下使用 simd 的收益。可以看到,在 str_size 較大時,simd 性能明顯高于標量的實現。
str_size | simd | scalar |
1 | 109 | 140 |
2 | 145 | 158 |
4 | 147 | 198 |
8 | 143 | 283 |
16 | 155 | 459 |
32 | 168 | 809 |
64 | 220 | 1589 |
128 | 289 | 3216 |
256 | 477 | 6297 |
512 | 883 | 12494 |
1024 | 1687 | 24410 |
數據處理
Case 1
Agent在數據聚合過程中,需要一個map來存儲一個指標的所有序列,用于對一段時間內的打點值進行聚合計算,得到一個固定間隔的觀測值。這個map的key是指標的tags,map的value是指標的值。我們通過采集火焰圖發現,這個map的查找操作存在一定程度的熱點。
圖片
下面是 _M_find_before_node 的實現:
圖片
這個函數作用是:算完 hash 后,在 hash 桶里找到匹配 key 的元素。這也意味著,即使命中了,hash 查找的時候也要進行一次 key 的比較操作。而在 Agent 里,這個 key 的比較操作定義為:
bool operator==(const TagSet &other) const {
if (tags.size() != other.tags.size()) {
return false;
}
for (size_t i = 0; i < tags.size(); ++i) {
auto &left = tags[i];
auto &right = other.tags[i];
if (left.key_ != right.key_ || left.value_ != right.value_) {
return false;
}
}
return true;
}
這里需要遍歷整個 Tagset 的元素并比較他們是否相等。在查找較多的情況下,每次 hash 命中后都要進行這樣一次操作是非常耗時的。可能導致時間開銷增大的原因有:
- 每個 tag 的 key_ 和 value_ 是單獨的內存(如果數據較短,stl 不會額外分配內存,這樣的情況下就沒有單獨分配的內存了),存在著 cache miss 的開銷,硬件預取效果也會變差;
- 需要頻繁地調用 memcmp 函數;
- 按個比較每個 tag,分支較多。
圖片
因此,我們將 TagSet 的數據使用 string_view 表示,并將所有的 data 全部存放在同一塊內存中。在 dictionary encode 的時候,再把 TagSet 轉換成 string 的格式返回出去。
// TagView
#include <functional>
#include <string>
#include <vector>
struct TagView {
TagView() = default;
TagView(std::string_view k, std::string_view v) : key_(k), value_(v) {}
std::string_view key_;
std::string_view value_;
};
struct TagViewSet {
TagViewSet() = default;
TagViewSet(const std::vector<TagView> &tgs, std::string&& buffer) : tags(tgs),
tags_buffer(std::move(buffer)) {}
TagViewSet(std::vector<TagView> &&tgs, std::string&& buffer) { tags = std::move(tgs); }
TagViewSet(const std::vector<TagView> &tgs, size_t buffer_assume_size) {
tags.reserve(tgs.size());
tags_buffer.reserve(buffer_assume_size);
for (auto& tg : tgs) {
tags_buffer += tg.key_;
tags_buffer += tg.value_;
}
const char* start = tags_buffer.c_str();
for (auto& tg : tgs) {
std::string_view key(start, tg.key_.size());
start += key.size();
std::string_view value(start, tg.value_.size());
start += value.size();
tags.emplace_back(key, value);
}
}
bool operator==(const TagViewSet &other) const {
if (tags.size() != other.tags.size()) {
return false;
}
// not compare every tag
return tags_buffer == other.tags_buffer;
}
std::vector<TagView> tags;
std::string tags_buffer;
};
struct TagViewSetPtrHash {
inline std::size_t operator()(const TagViewSet *tgs) const {
return std::hash<std::string>{}(tgs->tags_buffer);
}
};
驗證結果表明,當 Tagset 中 kv 的個數大于 2 的時候,新方法性能較好。
圖片
數據發送
Case 1
早期Agent使用zlib進行數據發送前的壓縮,隨著用戶打點規模的增長,壓縮逐步成為了Agent的性能熱點。
因此我們通過構造滿足線上用戶數據特征的數據集,對常用的壓縮庫進行了測試:
zlib使用cloudflare
圖片
zlib使用1.2.11
圖片
通過測試結果我們可以看到,除bzip2外,其他壓縮算法均在不同程度上優于zlib:
- zlib的高性能分支,基于cloudflare優化 比 1.2.11的官方分支性能好,壓縮CPU開銷約為后者的37.5%
- 采用SIMD指令加速計算
- zstd能夠在壓縮率低于zlib的情況下,獲得更低的cpu開銷,因此如果希望獲得比當前更好的壓縮率,可以考慮zstd算法
- 若不考慮壓縮率的影響,追求極致低的cpu開銷,那么snappy是更好的選擇
結合業務場景考慮,我們最終執行短期使用 zlib-cloudflare 替換,長期使用 zstd 替換的優化方案。
結論
上述優化取得了非常好的效果,經過上線驗證得出:
- CPU峰值使用量降低了10.26%,平均使用量降低了6.27%
- Mem峰值使用量降低了19.67%,平均使用量降低了19.81%
綜合分析以上性能熱點和優化方案,可以看到我們對Agent優化的主要考量點是:
- 減少不必要的內存拷貝
- 減少程序上下文的切換開銷,提高緩存命中率
- 使用SIMD指令來加速處理關鍵性的熱點邏輯
除此之外,我們還在開展 PGO 和 clang thinLTO 的驗證工作,借助編譯器的能力來進一步優化Agent性能。
加入我們
本文作者趙杰裔,來自字節跳動 基礎架構-云原生-可觀測團隊,我們提供日均數十PB級可觀測性數據采集、存儲和查詢分析的引擎底座,致力于為業務、業務中臺、基礎架構建設完整統一的可觀測性技術支撐能力。同時,我們也將逐步開展在火山引擎上構建可觀測性的云產品,較大程度地輸出多年技術沉淀。 如果你也想一起攻克技術難題,迎接更大的技術挑戰,歡迎投遞簡歷到 zhaojieyi@bytedance.com
最 Nice 的工作氛圍和成長機會,福利與機遇多多,在上海、杭州和北京均有職位,歡迎加入字節跳動可觀測團隊 !
參考引用
- v2_0_cpp_unpacker:https://github.com/msgpack/msgpack-c/wiki/v2_0_cpp_unpacker#memory-management
- messagepack-specification:https://github.com/msgpack/msgpack/blob/master/spec.md
- Cloudflare fork of zlib with massive performance improvements:https://github.com/RJVB/zlib-cloudflare
- Intel? Intrinsics Guide:https://www.intel.com/content/www/us/en/docs/intrinsics-guide/index.html
- Profile-guided optimization:https://en.wikipedia.org/wiki/Profile-guided_optimization
- ThinLTO:https://clang.llvm.org/docs/ThinLTO.html