成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

LSM-TREE從入門到入魔:從零開始實現一個高性能鍵值存儲

存儲 存儲架構
我們使用Zig語言實現了一個LSM-Tree的核心功能,包括MemTable、SSTable、寫流程、各類Iterator與數據壓縮能力。通過這個項目,我收獲了很多心得體會。

目錄

一、引言

二、LSM-Treee 核心功能概述

三、核心功能實現

    1.MemTable 實現

    2.SSTable

    3.Write

    4.Iterators

    5.Read/Scan

    6.壓縮

四、總結

一.引 言

LSM-Tree(Log-Structured Merge Tree)是一種高效的鍵值存儲數據結構,廣泛應用于NoSQL數據庫和大數據處理系統中。其核心思想是通過分層、有序地利用磁盤順序寫入的性能優勢,優化寫入操作,同時犧牲部分讀取性能以換取更高的寫入吞吐量。

圖片圖片

圖片圖片

在互聯網的各個基礎設施中,不論是數據庫還是緩存亦或是大數據框架,LSM-Tree這個數據結構都是很常見的身影。

我每天都在使用這個存儲引擎,但是對它的了解還流于表面,所以我想要自己實現一次LSM-Tree加深理解。

本次實現我們采用了Zig語言,簡要的實現LSM-Tree的核心功能(讀寫、數據壓縮、持久化,不包含MVCC的內容)。

Zig是一種新興的系統編程語言,其設計目標是提供現代特性的同時保持低復雜性。

本項目極大的受到了Mini-Lsm這個項目的啟發,強烈推薦大家學習這個項目!

二.LSM-Treee 核心功能概述

在開始自己編寫之前,我先簡單介紹一下LSM-Tree(Log-Structured Merge Tree)的架構以及讀寫流程。

LSM-Tree它結合了日志和索引的特點,優化了寫入和讀取性能。每次寫入都是采用append-only的方式,所以寫入性能很高。

而作為代價,追加寫入會造成存儲放大,LSM-Tree時采用了多層SSTable的方式將數據堆疊在硬盤上。所以需要一個合并壓縮的過程來回收過多的空間。

圖片圖片

寫流程


讀流程


  • 預寫日志(WAL) :寫操作首先寫入預寫日志(WAL),用于記錄未提交的數據,確保數據的持久性和一致性。
  • MemTable:隨后將數據寫入內存中的MemTable,MemTable是一個平衡樹(如skiplist),支持快速插入和刪除操作。
  • 觸發Compaction:當MemTable達到一定閾值時,會觸發后臺線程將MemTable中的數據刷入磁盤,生成SSTable文件。
  • SSTable:生成的SSTable文件是不可變的,存儲在磁盤上,用于后續讀取操作。
  • 合并操作(Merge) :當多個SSTable文件達到一定數量時,會觸發合并操作,將它們合并為一個更大的SSTable文件,以減少文件數量。
  • MemTable優先:讀取操作首先從MemTable中查找數據,因為MemTable是按升序排列的,查找效率較高。
  • Block Cache:如果MemTable中未找到數據,則從Block Cache中查找。Block Cache存儲了預先加載到內存中的SSTable塊,以提高讀取性能。
  • SSTable查找:如果Block Cache中也未找到數據,則從磁盤上的SSTable文件中查找。Lsm-tree會從最低層(L0)開始查找,逐層向上查找,直到找到目標數據。
  • 多版本并發控制(MVCC) :Lsm-tree支持多版本并發控制,允許同時訪問不同版本的數據,從而提高并發性能。

三、核心功能實現

MemTable 實現

首先,我們先實現 LSM 存儲引擎的內存結構—Memtable。我們選擇跳表實現作為 Memtable 的數據結構,因為它支持無鎖的并發讀寫。我們不會深入介紹跳表的工作原理(Redis的同學應該不陌生這個東西),簡單來說,它是一個易于實現的有序鍵值映射。

圖片圖片

Skiplist的實現非常簡單,這里我利用Zig編譯時的能力實現了一個泛型版本的跳表src/skiplist.zig,有興趣的小伙伴可以直接去倉庫中參觀代碼。

基于SkipList的能力,我們即可包裝出Memtable的基本功能。

我們這個LSM支持WAL功能的,即寫入內存表之前要先寫入磁盤日志,方便在意外宕機重啟后可以恢復數據。

WAL的能力我就不想自己再實現了,于是從網上扒了一個C的實現(Zig集成C語言非常便捷,可以參考與 C 交互)。

map: Map,
lock: RwLock,
wal: ?Wal,
id: usize,
allocator: std.mem.Allocator,
arena: std.heap.ArenaAllocator,
approximate_size: atomic.Value(usize) = atomic.Value(usize).init(0),


fn putToList(self: *Self, key: []const u8, value: []const u8) !void {
    {
        self.lock.lock();
        defer self.lock.unlock();
        try self.map.insert(kk, vv);
    }


    _ = self.approximate_size.fetchAdd(@intCast(key.len + value.len), .monotonic);
}


fn putToWal(self: *Self, key: []const u8, value: []const u8) !void {
    // [key-size: 4bytes][key][value-size: 4bytes][value]


    if (self.wal) |w| {
        var buf = std.ArrayList(u8).init(self.arena.allocator());


        var bw = buf.writer();
        try bw.writeInt(u32, @intCast(key.len), .big);
        _ = try bw.write(key);
        try bw.writeInt(u32, @intCast(value.len), .big);
        _ = try bw.write(value);
        try w.append(buf.items);
    }
}


// 寫入Memtable,先寫WAL,再寫skiplist table
pub fn put(self: *Self, key: []const u8, value: []const u8) !void {
    try self.putToWal(key, value);
    try self.putToList(key, value);
}


pub fn get(self: *Self, key: []const u8, val: *[]const u8) !bool {
    self.lock.lockShared();
    defer self.lock.unlockShared();
    var vv: []const u8 = undefined;
    if (try self.map.get(key, &vv)) {
        val.* = vv;
        return true;
    }
    return false;
}

注意到這里我們沒有實現刪除的功能,這里我仿照了RocksDB中的墓碑機制,用空值代表刪除,所以刪除被put(key, "")取代。

SSTable

接下來,我們就著手開始實現LSM中另外一個重要元素 --- SSTable。

SSTable(Sorted String Table)是一種不可變的(Immutable)磁盤文件,內部按Key有序排列,存儲鍵值對數據。每個SSTable文件生成后不再修改,更新和刪除操作通過追加新記錄或標記刪除,最終通過合并(Compaction)清理冗余數據。

每當LSM-Tree中的MemTable體積超出閾值,就會將內存中的數據寫入SsTable。

圖片圖片

每個SSTable由多個Block組成,每個Block是一組KV的package。

Block的編碼格式如下:

圖片圖片

為了構建一個Block,我們實現了一個BlockBuilder的模塊,這部分代碼見src/block.zig:

pub const Block = struct {
    data_v: std.ArrayList(u8),
    offset_v: std.ArrayList(u16),
}


pub const BlockBuilder = struct {
    allocator: std.mem.Allocator,
    offset_v: std.ArrayList(u16),
    data_v: std.ArrayList(u8),
    block_size: usize,
    first_key: []u8,
    
    pub fn add(self: *Self, key: []const u8, value: ?[]const u8) !bool {
        std.debug.assert(key.len > 0); // key must not be empty


        const vSize = if (value) |v| v.len else 0;
        
        if ((self.estimated_size() + key.len + vSize + 3 * @sizeOf(u16) > self.block_size) and !self.is_empty()) {
            return false;
        }
        try self.doAdd(key, value);


        if (self.first_key.len == 0) {
            self.first_key = try self.allocator.dupe(u8, key);
        }
        return true;
    }


    fn doAdd(self: *Self, key: []const u8, value: ?[]const u8) !void {
        // add the offset of the data into the offset array
        try self.offset_v.append(@intCast(self.data_v.items.len));
        const overlap = calculate_overlap(self.first_key, key);


        var dw = self.data_v.writer();
        // encode key overlap
        try dw.writeInt(u16, @intCast(overlap), .big);
        // encode key length
        try dw.writeInt(u16, @intCast(key.len - overlap), .big);


        // encode key content
        _ = try dw.write(key[overlap..]);
        // encode value length
        if (value) |v| {
            try dw.writeInt(u16, @intCast(v.len), .big);
            // encode value content
            _ = try dw.write(v);
        } else {
            try dw.writeInt(u16, 0, .big);
        }
    }


    pub fn build(self: *Self) !Block {
        if (self.isEmpty()) {
            @panic("block is empty");
        }
        return Block.init(
            try self.data_v.clone(),
            try self.offset_v.clone(),
        );
    }
}

可能有同學注意到,我們寫key的時候沒有直接將key值寫入,而且只寫了key與當前block的第一個key不重疊的suffix部分。由于block中的key都是有序的,所以一個block中的key有很大概率是前綴類似的,所以這里是一個空間優化的小技巧,例如:

Key: foo, foo1, foo2, foo3 ....

我們寫入block時,只需要寫:

foo|1|2|3|....

很多有序表的實現中都會用到這個小技巧。

有了block的實現,我們可以進一步來定義SSTable的格式。一個SSTable由多個Block、block元數據以及布隆過濾器構成。

圖片圖片

布隆過濾器是一種概率性數據結構,用于維護一組鍵。您可以向布隆過濾器中添加鍵,并且可以知道在添加到布隆過濾器中的鍵集中可能存在或必須不存在的鍵。

在SSTable中添加布隆過濾器可以有效提升查詢key的效率。

元數據包含了block的第一個與最后一個key以及block在sst中的offset信息,記錄元數據主要為了在后續的檢索中可以快速定位某個key落在哪個block中。

同樣的套路,為了構建SSTable,我們先實現一個SSTableBuilder,部分代碼見src/ss_table.zig

pub const SsTableBuilder = struct {
    allocator: std.mem.Allocator,
    builder: BlockBuilder, // 剛才實現的block構建裝置
    first_key: ?[]const u8,
    last_key: ?[]const u8,
    meta: std.ArrayList(BlockMeta),
    block_size: usize,
    data: std.ArrayList(u8),
    bloom: BloomFilterPtr, // 布隆過濾器
    
    pub fn add(self: *Self, key: []const u8, value: []const u8) !void {
        try self.setFirstKey(key);
        try self.bloom.get().insert(key); // 寫入布隆過濾器


        if (try self.builder.add(key, value)) {
            try self.setLastKey(key);
            return;
        }
        // block is full
        try self.finishBlock();
        std.debug.assert(try self.builder.add(key, value));
        try self.resetFirstKey(key);
        try self.setLastKey(key);
    }
    
    // 寫入一個block的數據
    fn finishBlock(self: *Self) !void {
        if (self.builder.isEmpty()) {
            return;
        }
        var bo = self.builder;
        // reset block
        defer bo.reset();


        self.builder = BlockBuilder.init(self.allocator, self.block_size);
        var blk = try bo.build();
        defer blk.deinit();
        const encoded_block = try blk.encode(self.allocator); // block序列化
        defer self.allocator.free(encoded_block);
        
        // 記錄block的元數據
        try self.meta.append(.{
            .allocator = self.allocator,
            .offset = self.data.items.len,
            .first_key = try self.allocator.dupe(u8, self.first_key.?),
            .last_key = try self.allocator.dupe(u8, self.last_key.?),
        });
        const cksm = hash.Crc32.hash(encoded_block); // 寫入4b的校驗值
        try self.data.appendSlice(encoded_block);
        try self.data.writer().writeInt(u32, cksm, .big);
    }
    
    // 構建為一個SSTable
    pub fn build(
        self: *Self,
        id: usize,
        block_cache: ?BlockCachePtr, // 讀取block數據的緩存,減少block的反序列化成本
        path: []const u8,
    ) !SsTable {
        var arena = std.heap.ArenaAllocator.init(self.allocator);
        defer arena.deinit();
        const allocator = arena.allocator();


        try self.finishBlock();
        const w = self.data.writer();
        
        // 寫入元數據及其offset
        const meta_offset = self.data.items.len;
        const meta_b = try BlockMeta.batchEncode(self.meta.items, allocator);
        _ = try w.write(meta_b);
        try w.writeInt(u32, @intCast(meta_offset), .big);


        // 寫入布隆過濾器及其offset
        const bloom_offset = self.data.items.len;
        const encoded_bloom = try self.bloom.get().encode(allocator);
        _ = try w.write(encoded_bloom);
        try w.writeInt(u32, @intCast(bloom_offset), .big);
        
        
        const file = try FileObject.init(path, self.data.items);
        errdefer file.deinit();


        const fk = self.meta.items[0].first_key;
        const lk = self.meta.getLast().last_key;


        return .{
            .allocator = self.allocator,
            .file = file,
            .block_metas = try self.meta.toOwnedSlice(),
            .meta_offset = meta_offset,
            .block_cache = block_cache,
            .bloom = self.bloom.clone(),
            .id = id,
            .first_key = try self.allocator.dupe(u8, fk),
            .last_key = try self.allocator.dupe(u8, lk),
            .max_ts = 0,
        };
    }
}

Write

有了SSTable和MemTable,我們就有了LSM-Tree需要的兩個最重要的材料,后續的讀寫不過是對這兩類材料的組合拼裝。

在實現寫操作之前,我們先假想一下LSM-Tree的數據結構:

  • 首先我們需要一個數據結構存儲當前MemTable、冷MemTables和多層的SST,如下圖所示。

圖片圖片

  • 其次我們需要一個鎖用于同步上述數據結構的讀寫行為。
  • 我們還需要一個SSTable的自增id。
  • 最后還需要一些必要的配置,例如存儲路徑、線程管理器等。

最終,我們實現的LSM數據結構如下:

pub const StorageState = struct {
    allocator: std.mem.Allocator,
    mem_table: MemTablePtr, // 當前正在寫的MemTable
    imm_mem_tables: std.ArrayList(MemTablePtr), // 冷MemTable數組
    l0_sstables: std.ArrayList(usize), // 第一層的SSTable數組
    levels: std.ArrayList(std.ArrayList(usize)), // 后續多層的SSTable數組
    sstables: std.AutoHashMap(usize, SsTablePtr), // sst_id => SSTable
}


pub const StorageInner = struct {
    const Self = @This();


    allocator: std.mem.Allocator,
    state: StorageState,
    state_lock: std.Thread.RwLock = .{},
    next_sst_id: atomic.Value(usize),
    path: []const u8,
    options: StorageOptions,
    compaction_controller: CompactionController,
    block_cache: BlockCachePtr,
    terminate: std.Thread.ResetEvent = .{},
    wg: std.Thread.WaitGroup = .{},
}

先不考慮逐層壓縮的邏輯,只考慮一層SSTable的簡單情況,寫邏輯可以簡化為如下流程:

圖片圖片

  • 寫入State中的MemTable
pub fn writeBatch(self: *Self, records: []const WriteBatchRecord) !void {
    for (records) |record| {
        switch (record) {
            .put => |pp| {
                try self.state.getMemTable().put(pp.key, pp.value);
            },
            .delete => |dd| {
                // we use "" as the tombstone value
                try self.state.getMemTable().put(dd, "");
            },
        }
        // 嘗試把當前MemTable壓入冷數據
        try self.tryFreeze(self.state.getMemTable().getApproximateSize());
    }
}
  • 當MemTable體積超出閾值,壓入冷MemTable數組,重置當前MemTable
fn forceFreezeMemtable(self: *Self) !void {
    const next_sst_id = self.getNextSstId();
    
    // 生成一個新的MemTable
    var new_mm: MemTable = undefined;
    {
        if (self.options.enable_wal) {
            const mm_path = try pathOfWal(self.allocator, self.path, next_sst_id);
            defer self.allocator.free(mm_path);
            new_mm = MemTable.init(next_sst_id, self.allocator, mm_path);
        } else {
            new_mm = MemTable.init(next_sst_id, self.allocator, null);
        }
    }
    errdefer new_mm.deinit();


    var old_mm: *MemTable = undefined;
    {
        self.state_lock.lock();
        defer self.state_lock.unlock();
        var old_mm_ptr = self.state.mem_table;
        old_mm = old_mm_ptr.get();
        defer old_mm_ptr.release();
        self.state.mem_table = try MemTablePtr.create(self.allocator, new_mm);
        
        // 將寫滿的MemTable壓入冷數據
        try self.state.imm_mem_tables.append(old_mm_ptr.clone()); // newer memtable is inserted at the end
    }
    // TIPS:把磁盤同步放在鎖的范圍外面,降低鎖的覆蓋
    try old_mm.syncWal();
}
  • 當冷MemTable數組大小超出配置閾值,觸發SSTable落盤,彈出最冷的MemTable,寫入磁盤SSTable,并記錄在L0的SSTable數組中。這一過程是在一個線程中定時觸發
pub fn flushNextMemtable(self: *Self) !void {
    std.debug.assert(self.state.imm_mem_tables.items.len > 0);
    var to_flush_table: *MemTable = undefined;
    {
        self.state_lock.lockShared();
        defer self.state_lock.unlockShared();
        // oldest memtable is at the index 0
        to_flush_table = self.state.imm_mem_tables.items[0].load();
    }


    // 將最冷的MemTable構建為SSTable
    var builder = try SsTableBuilder.init(self.allocator, self.options.block_size);
    defer builder.deinit();


    const sst_id = to_flush_table.id;
    try to_flush_table.flush(&builder);


    const sst_path = try self.pathOfSst(sst_id);
    defer self.allocator.free(sst_path);
    var sst = try builder.build(sst_id, self.block_cache.clone(), sst_path);
    errdefer sst.deinit();


    // add the flushed table to l0_sstables
    {
        self.state_lock.lock();
        defer self.state_lock.unlock();


        var m = self.state.imm_mem_tables.orderedRemove(0);
        defer m.deinit();
        std.debug.assert(m.load().id == sst_id);


        // newest sstable is at the end
        try self.state.l0_sstables.append(sst_id);
        try self.state.sstables.put(sst.id, try SsTablePtr.create(self.allocator, sst));
    }
}

當然,這里只實現了一半的寫邏輯,數據停留在L0的SST中,后續的多層SST還沒有使用。

剩下一半的寫邏輯會在數據壓縮的章節中介紹。

Iterators

寫入的過程比較好理解,但是讀就略微復雜了,以上面我們實現的寫結果為例子,最終我們的數據沉淀在一個3層的數據結構中,要如何高效的從其中檢索數據呢?

圖片圖片

如同寫過程一般,讀過程也是對各個基礎單元(MemTable、SSTable、Block)讀過程的組合,為了方便組合邏輯,我們要先統一各個模塊的讀行為。

在LSM-Tree中,所有的讀行為都定義為了如下的Interface(Zig中沒trait或者Interface,所以這里實例代碼我用Rust描述):

pub trait StorageIterator {
    /// Get the current value.
    fn value(&self) -> &[u8];


    /// Get the current key.
    fn key(&self) -> &[u8];


    /// Check if the current iterator is empty.
    fn is_empty(&self) -> bool;


    /// Move to the next position.
    fn next(&mut self) -> anyhow::Result<()>;


    /// Number of underlying active iterators for this iterator.
    fn num_active_iterators(&self) -> usize {
        1
    }
}

我們首先對MemTable、SSTable、Block這些模塊實現讀接口,代碼可見:src/MemTable.zig,src/block.zig,src/ss_table.zig,這里單獨簡單介紹下SSTable的讀接口實現思路,其他的模塊實現思路類似,感興趣的直接閱讀源碼即可。

pub const SsTableIterator = struct {
    allocator: std.mem.Allocator,
    table: SsTablePtr,
    blk: BlockPtr,
    blk_iterator: BlockIteratorPtr,
    blk_idx: usize,


    const Self = @This();




    pub fn initAndSeekToFirst(allocator: std.mem.Allocator, table: SsTablePtr) !Self {
        const s = try seekToFirstInner(allocator, table);
        return .{
            .allocator = allocator,
            .table = table,
            .blk_iterator = s.blk_iter,
            .blk = s.blk,
            .blk_idx = 0,
        };
    }


    pub fn initAndSeekToKey(allocator: std.mem.Allocator, table: SsTablePtr, k: []const u8) !Self {
        const b = try seekToKeyInner(allocator, table, k);
        return .{
            .allocator = allocator,
            .table = table,
            .blk_iterator = b.blk_iter,
            .blk_idx = b.blk_idx,
            .blk = b.blk,
        };
    }


    fn seekToFirstInner(allocator: std.mem.Allocator, table: SsTablePtr) !struct {
        blk: BlockPtr,
        blk_iter: BlockIteratorPtr,
    } {
        var blk = try table.get().readBlockCached(0, allocator); // 讀取第一個block
        errdefer blk.release();
        var blk_iter = try BlockIterator.createAndSeekToFirst(allocator, blk.clone());
        errdefer blk_iter.deinit();


        return .{
            .blk = blk,
            .blk_iter = try BlockIteratorPtr.create(allocator, blk_iter), // 從SSTable的讀接口轉換為Block的讀接口
        };
    }


    fn seekToKeyInner(allocator: std.mem.Allocator, table: SsTablePtr, k: []const u8) !struct {
        blk_idx: usize,
        blk: BlockPtr,
        blk_iter: BlockIteratorPtr,
    } {
        const table_ptr = table.get();
        var blk_idx = try table_ptr.findBlockIndex(k);
        var blk = try table_ptr.readBlockCached(blk_idx, allocator);
        errdefer blk.deinit();
        var blk_iter = try BlockIterator.createAndSeekToKey(allocator, blk.clone(), k);
        errdefer blk_iter.deinit();
        var blk_iter_ptr = try BlockIteratorPtr.create(allocator, blk_iter);
        errdefer blk_iter_ptr.release();


        // 如果當前block讀完了,跳到下一個block,并生成block的讀接口
        if (blk_iter.isEmpty()) {
            blk_idx += 1;
            if (blk_idx < table_ptr.numBlocks()) {
                {
                    blk.deinit();
                    blk_iter.deinit();
                }
                var blk2 = try table_ptr.readBlockCached(blk_idx, allocator);
                errdefer blk2.deinit();
                var blk_iter2 = try BlockIterator.createAndSeekToFirst(allocator, blk2.clone());
                errdefer blk_iter2.deinit();


                return .{
                    .blk_idx = blk_idx,
                    .blk_iter = try BlockIteratorPtr.create(allocator, blk_iter2),
                    .blk = blk2,
                };
            }
        }
        return .{
            .blk_idx = blk_idx,
            .blk_iter = blk_iter_ptr,
            .blk = blk,
        };
    }


    pub fn key(self: Self) []const u8 {
        return self.blk_iterator.get().key();
    }


    pub fn value(self: Self) []const u8 {
        return self.blk_iterator.get().value();
    }


    pub fn isEmpty(self: Self) bool {
        return self.blk_iterator.get().isEmpty();
    }


    pub fn next(self: *Self) !void {
        try self.blk_iterator.get().next();
        // 若當前的Block讀完了,就跳到下一個block,并生成Block讀接口。
        if (self.blk_iterator.get().isEmpty()) {
            self.blk_idx += 1;
            if (self.blk_idx < self.table.get().numBlocks()) {
                self.reset();
                const blk = try self.table.get().readBlockCached(self.blk_idx, self.allocator);
                const blk_iter = try BlockIterator.createAndSeekToFirst(self.allocator, blk.clone());
                self.blk = blk;
                self.blk_iterator = try BlockIteratorPtr.create(self.allocator, blk_iter);
            }
        }
    }
};

有了幾個基本元素的讀接口之后,我們便遇到第一個問題:我們如何對多個MemTable做讀檢索?

圖片圖片

這個時候,我們需要一個新的數據結構來實現多個讀實例的合并檢索---- MergeIterator

MergeIterator在內部維護一個二叉堆。堆中數據的優先級如下:

當各個迭代器key不同時,具有最小key的迭代器最優。當多個迭代器有相同的當前key時,最新的迭代器一個最優。

假設我們有如下MemTable(iter1最新,iter3最舊):

  • iter1: b->del, c->4, d->5
  • iter2: a->1, b->2, c->3
  • iter3: e->4

經過合并后迭代器結果應該為:

  • a最小,iter2優先迭代
  • iter2迭代一次后,iter1與iter2 key相同,iter1優先迭代,b->2跳過
  • c最小,iter1優先迭代,iter2中c->3跳過
  • d最小,iter1優先迭代
  • 只剩iter3,迭代iter3

最終結果:a->1, b->del, c->4, d->5, e->4

實現代碼如下:

// 標準庫中有二叉堆實現
const IteratorHeap = std.PriorityQueue(*HeapWrapper, Comparer.Context, Comparer.cmp);


allocator: std.mem.Allocator,
q: IteratorHeap,
current: ?*HeapWrapper,


pub fn init(allocator: std.mem.Allocator, iters: std.ArrayList(StorageIteratorPtr)) !Self {
    var q = IteratorHeap.init(allocator, .{});
    if (iters.items.len == 0) {
        return Self{
            .allocator = allocator,
            .q = q,
            .current = null,
        };
    }


    // PS: the last iter has the highest priority
    // 按順序寫入二叉堆
    for (iters.items, 0..) |sp, i| {
        if (!sp.load().isEmpty()) {
            const hw = try allocator.create(HeapWrapper);
            errdefer allocator.destroy(hw);
            hw.* = HeapWrapper.init(i, sp.clone());
            try q.add(hw);
        }
    }


    const cc = q.removeOrNull();
    return Self{
        .allocator = allocator,
        .q = q,
        .current = cc,
    };
}


pub fn key(self: Self) []const u8 {
    return self.current.?.key();
}


pub fn value(self: Self) []const u8 {
    return self.current.?.value();
}


pub fn isEmpty(self: Self) bool {
    if (self.current) |cc| {
        return cc.isEmpty();
    }
    return true;
}


pub fn next(self: *Self) !void {
    const cc = self.current.?;
    while (true) {
        if (self.q.peek()) |ii| {
            std.debug.assert(!ii.isEmpty());
            // 如果優先堆頭部迭代器A和當前正在生效的迭代器B的key相同,讓迭代器A跳過重復key
            if (std.mem.eql(u8, cc.key(), ii.key())) {
                try ii.next();
                if (ii.isEmpty()) {
                    _ = self.q.remove();
                    ii.deinit();
                    self.allocator.destroy(ii);
                }
            } else {
                break;
            }
        }
        break;
    }


    try cc.next(); // 迭代當前迭代器


    // 如果當前優先迭代器迭代完了,就從堆中彈出最優迭代器
    if (cc.isEmpty()) {
        defer {
            cc.deinit();
            self.allocator.destroy(cc);
        }
        if (self.q.removeOrNull()) |h| {
            self.current = h;
        } else {
            self.current = null;
        }
        return;
    }


    // 將當前迭代器寫回二叉堆,重新計算最優迭代器
    try self.q.add(cc); 
    self.current = self.q.removeOrNull();
}

有了MergeIterator這個工具,我們具備了在多個MemTable和多個SSTable中迭代檢索的能力,但是還有個問題,我們當前有兩個MergeIterator,應該如何在兩個迭代器中執行迭代任務?

圖片圖片

此時,我們再引入一個新的數據結構:TwoMergeIterator,這個是MergeIterator在元素只有兩個的情況下的簡化版。

TwoMergeIterator由兩個迭代器構成,一個高優一個低優,每次迭代優先迭代高優,當key相同時,優先迭代高優。實現如下:

pub const TwoMergeIterator = struct {
    a: StorageIteratorPtr,
    b: StorageIteratorPtr,
    choose_a: bool,


    // 選擇兩個迭代器中key更小的迭代器
    fn chooseA(a: *StorageIterator, b: *StorageIterator) bool {
        if (a.isEmpty()) {
            return false;
        }
        if (b.isEmpty()) {
            return true;
        }
        return std.mem.lessThan(u8, a.key(), b.key());
    }


    // key相同時,跳過低優中的key
    fn skipB(self: *TwoMergeIterator) !void {
        const ap = self.a.load();
        const bp = self.b.load();
        if (!ap.isEmpty() and !bp.isEmpty() and std.mem.eql(u8, ap.key(), bp.key())) try bp.next();
    }


    pub fn init(a: StorageIteratorPtr, b: StorageIteratorPtr) !TwoMergeIterator {
        var iter = TwoMergeIterator{
            .a = a,
            .b = b,
            .choose_a = false,
        };
        try iter.skipB();
        iter.choose_a = chooseA(iter.a.load(), iter.b.load());
        return iter;
    }


    pub fn deinit(self: *TwoMergeIterator) void {
        self.a.release();
        self.b.release();
    }


    pub fn key(self: TwoMergeIterator) []const u8 {
        if (self.choose_a) {
            std.debug.assert(!self.a.load().isEmpty());
            return self.a.load().key();
        }
        std.debug.assert(!self.b.load().isEmpty());
        return self.b.load().key();
    }


    pub fn value(self: TwoMergeIterator) []const u8 {
        if (self.choose_a) {
            std.debug.assert(!self.a.load().isEmpty());
            return self.a.load().value();
        }
        std.debug.assert(!self.b.load().isEmpty());
        return self.b.load().value();
    }


    pub fn isEmpty(self: TwoMergeIterator) bool {
        if (self.choose_a) {
            return self.a.load().isEmpty();
        }
        return self.b.load().isEmpty();
    }


    pub fn next(self: *TwoMergeIterator) !void {
        if (self.choose_a) {
            try self.a.load().next();
        } else {
            try self.b.load().next();
        }
        try self.skipB();
        self.choose_a = chooseA(self.a.load(), self.b.load());
    }
};

至此,我們讀行為所需要的武器就完備了!

Read/Scan

讓我們再來看看LSM的架構圖:

圖片

我們將每個數據層中的數據標上優先級,由于LSM-Tree是append-only的,所以優先級越高的數據層中數據越新。

所以我們的讀策略也很明顯:按照上圖中P0至P2依次檢索,這部分代碼實現見src/storage.zig。

  • 讀MemTable
// search in memtable
if (try self.state.getMemTable().get(key, value)) {
    if (value.*.len == 0) {
        // tomestone
        return false;
    }
    return true;
}
  • 讀Immutable MemTable
// search in imm_memtables


self.state_lock.lockShared();
defer self.state_lock.unlockShared();
for (self.state.imm_mem_tables.items) |imm_table| {
    if (try imm_table.load().get(key, value)) {
        if (value.*.len == 0) {
            // tomestone
            return false;
        }
        return true;
    }
}
  • 讀LV0~LVmax SSTables
// 收集L0中的迭代器
var l0_iters = std.ArrayList(StorageIteratorPtr).init(self.allocator);
defer {
    for (l0_iters.items) |iter| {
        var ii = iter;
        ii.release();
    }
    l0_iters.deinit();
}
{
    self.state_lock.lockShared();
    defer self.state_lock.unlockShared();
    for (self.state.l0_sstables.items) |sst_id| {
        const sst = self.state.sstables.get(sst_id).?;
        if (try sst.load().mayContain(key)) {
            var ss_iter = try SsTableIterator.initAndSeekToKey(self.allocator, sst.clone(), key);
            errdefer ss_iter.deinit();
            try l0_iters.append(try StorageIteratorPtr.create(self.allocator, .{ .ss_table_iter = ss_iter }));
        }
    }
}


// 收集Levels中的迭代器
var level_iters: std.ArrayList(StorageIteratorPtr) = undefined;
{
    self.state_lock.lockShared();
    defer self.state_lock.unlockShared();
    level_iters = try std.ArrayList(StorageIteratorPtr).initCapacity(
        self.allocator,
        self.state.levels.items.len,
    );
    for (self.state.levels.items) |level| {
        var level_ssts = try std.ArrayList(SsTablePtr).initCapacity(self.allocator, level.items.len);
        errdefer level_ssts.deinit();
        for (level.items) |sst_id| {
            const sst = self.state.sstables.get(sst_id).?;
            if (try mayWithinTable(key, sst)) {
                try level_ssts.append(sst.clone());
            }
        }
        if (level_ssts.items.len > 0) {
            var level_iter = try SstConcatIterator.initAndSeekToKey(
                self.allocator,
                level_ssts,
                key,
            );
            errdefer level_iter.deinit();
            try level_iters.append(try StorageIteratorPtr.create(self.allocator, .{ .sst_concat_iter = level_iter }));
        }
    }
}


// 將多個迭代器合并為一個TwoMergeIterator
var l0_merge_iter = try MergeIterators.init(self.allocator, l0_iters);
errdefer l0_merge_iter.deinit();


var levels_merge_iter = try MergeIterators.init(self.allocator, level_iters);
errdefer levels_merge_iter.deinit();


var iter = try TwoMergeIterator.init(
    try StorageIteratorPtr.create(self.allocator, .{ .merge_iterators = l0_merge_iter }),
    try StorageIteratorPtr.create(self.allocator, .{ .merge_iterators = levels_merge_iter }),
);
defer iter.deinit();


if (iter.isEmpty()) {
    return false;
}


if (std.mem.eql(u8, iter.key(), key) and iter.value().len > 0) {
    value.* = iter.value();
    return true;
}

壓縮

在上一節的寫過程中,我們實現了從內存表到Level0的SSTable堆疊。

隨著寫入的持續,Lv0的SSTable會越來越多,這個時候就需要我們將Lv0中的數據合并寫入至Lv2,并依次類推重復這個過程,直到堆疊到最深的層數,這個逐層合并數據的過程就是數據壓縮。

圖片圖片

LSM-Tree中數據壓縮的過程大致如下:

圖片圖片

具體的實現代碼可見src/compact.zig,src/storage.zig。

簡單分層壓縮與原始 LSM 論文中的壓縮策略相似。它為 LSM 樹維護多個層級。當一個層級太大時,它會將此層級的所有 SST 與下一層合并。壓縮策略由 3 個參數控制:

  • size_ratio_percent:【文件低級數量/文件高級數量】,當實際計算的值低于此閾值時觸發壓縮。假設這里我們設置為60%,當L0中SST數量為2,L1中SST數量為1,此時ratio為1/2 = 50% < 60%,此時我們應該將L0壓縮合并至L1。
  • level0_file_num_compaction_trigger: 第一層SSTable達到多少后觸發壓縮。因為這是最高層,沒法與更高層比較,只能固定觸發壓縮。
  • max_levels: 顧名思義,最大的層數限制。

做好這些準備工作,我們可以逐步實現壓縮邏輯:

  • 生成壓縮任務:
pub const SimpleLeveledCompactionController = struct {
    options: SimpleLeveledCompactionOptions,


    pub fn generateCompactionTask(self: SimpleLeveledCompactionController, state: *storage.StorageState) !?SimpleLeveledCompactionTask {
        if (self.options.max_levels == 1) {
            return null;
        }


        var level_sizes = std.ArrayList(usize).init(state.allocator);
        defer level_sizes.deinit();


        try level_sizes.append(state.l0_sstables.items.len);
        for (state.levels.items) |level| {
            try level_sizes.append(level.items.len);
        }


        // 如果Lv0中SST數量超出閾值,觸發L0級別壓縮
        if (state.l0_sstables.items.len >= self.options.level0_file_num_compaction_trigger) {
            std.debug.print("compaction of L0 to L1 because L0 has owocgsc SSTS >= owocgsc\n", .{ state.l0_sstables.items.len, self.options.level0_file_num_compaction_trigger });
            return .{
                .upper_level = null,
                .upper_level_sst_ids = try state.l0_sstables.clone(),
                .lower_level = 1,
                .lower_level_sst_ids = try state.levels.items[0].clone(),
                .is_lower_level_bottom = false,
            };
        }


        // 計算Lv[n+1]/lv[n],如果比例小于閾值,觸發Lv[n]級別壓縮
        for (1..self.options.max_levels) |level| {
            const lower_level = level + 1;
            if (level_sizes.items[level] == 0) {
                continue;
            }
            const size_ration = level_sizes.items[lower_level] * 100 / level_sizes.items[level];
            if (size_ration < self.options.size_ration_percent) {
                std.debug.print("compaction of Lowocgsc to Lowocgsc because Lowocgsc size ratio owocgsc < owocgsc\n", .{ level, lower_level, level, size_ration, self.options.size_ration_percent });
                return .{
                    .upper_level = level,
                    .upper_level_sst_ids = try state.levels.items[level - 1].clone(),
                    .lower_level = lower_level,
                    .lower_level_sst_ids = try state.levels.items[lower_level - 1].clone(),
                    .is_lower_level_bottom = lower_level == self.options.max_levels,
                };
            }
        }


        return null;
    }
}
  • 執行壓縮任務:

有了上一小節中讀過程的介紹,多層數據的壓縮過程就很好理解了。

例如我們想將L1與L2的SSTable合并壓縮至L2,我們只需要把L1和L2的數據放在一起創造一個迭代器,再持續從該迭代器中讀出數據寫入新的SSTable中,這個過程保證了新的SSTable中數據不重復且有序。

fn compactSimple(self: *Self, task: SimpleLeveledCompactionTask) !std.ArrayList(SsTablePtr) {
    if (task.upper_level) |_| {
        var upper_ssts = try std.ArrayList(SsTablePtr).initCapacity(
            self.allocator,
            task.upper_level_sst_ids.items.len,
        );
        var lower_ssts = try std.ArrayList(SsTablePtr).initCapacity(
            self.allocator,
            task.lower_level_sst_ids.items.len,
        );


        self.state_lock.lockShared();
        for (task.upper_level_sst_ids.items) |sst_id| {
            const sst = self.state.sstables.get(sst_id).?;
            try upper_ssts.append(sst.clone());
        }
        for (task.lower_level_sst_ids.items) |sst_id| {
            const sst = self.state.sstables.get(sst_id).?;
            try lower_ssts.append(sst.clone());
        }
        self.state_lock.unlockShared();


        var upper_iter = try SstConcatIterator.initAndSeekToFirst(self.allocator, upper_ssts);
        errdefer upper_iter.deinit();


        var lower_iter = try SstConcatIterator.initAndSeekToFirst(self.allocator, lower_ssts);
        errdefer lower_iter.deinit();


        var iter = try TwoMergeIterator.init(
            try StorageIteratorPtr.create(self.allocator, .{ .sst_concat_iter = upper_iter }),
            try StorageIteratorPtr.create(self.allocator, .{ .sst_concat_iter = lower_iter }),
        );
        defer iter.deinit();
        return self.compactGenerateSstFromIter(&iter, task.is_lower_level_bottom);
    } else {
        // compact l0_sstables to l1_sstables
        // ..... 代碼邏輯大致與上面LvN層壓縮一致,只是Lv0層的SSTable是無序的需要特殊考慮
        return self.compactGenerateSstFromIter(&iter, task.is_lower_level_bottom);
    }
}




fn compactGenerateSstFromIter(self: *Self, iter: *TwoMergeIterator, compact_to_bottom_level: bool) !std.ArrayList(SsTablePtr) {
    var builder: SsTableBuilder = try SsTableBuilder.init(self.allocator, self.options.block_size);
    defer builder.deinit();
    var new_ssts = std.ArrayList(SsTablePtr).init(self.allocator);
    
    // 持續迭代此迭代器
    while (!iter.isEmpty()) {
        // 如果壓縮至最后一層,可以不保留墓碑值key了
        if (compact_to_bottom_level) {
            if (iter.value().len > 0) {
                try builder.add(iter.key(), iter.value());
            }
        } else {
            try builder.add(iter.key(), iter.value());
        }
        // 當寫滿一個SSTable后,就清空builder,把寫滿的SSTable入列
        if (builder.estimatedSize() >= self.options.target_sst_size) {
            // reset builder
            defer builder.reset() catch unreachable;
            const sst_id = self.getNextSstId();
            const path = try self.pathOfSst(sst_id);
            defer self.allocator.free(path);
            var sst = try builder.build(sst_id, self.block_cache.clone(), path);
            errdefer sst.deinit();


            var sst_ptr = try SsTablePtr.create(self.allocator, sst);
            errdefer sst_ptr.deinit();


            try new_ssts.append(sst_ptr);
        }
        try iter.next();
    }
    // 剩余的數據單獨一個SSTable
    if (builder.estimatedSize() > 0) {
        const sst_id = self.getNextSstId();
        const path = try self.pathOfSst(sst_id);
        defer self.allocator.free(path);
        var sst = try builder.build(sst_id, self.block_cache.clone(), path);
        errdefer sst.deinit();
        var sst_ptr = try SsTablePtr.create(self.allocator, sst);
        errdefer sst_ptr.deinit();
        try new_ssts.append(sst_ptr);
    }
    return new_ssts;
}
  • 替換壓縮后的SST

這部分邏輯并不復雜,即刪除此次壓縮任務中的原有兩層數據,用新合并的SSTable替換至較低層數據。

這里有個需要注意的點,即壓縮過程是在一個線程中單獨執行的,壓縮過程中LSM-Tree的原數據可能發生了改變,所以這里執行SSTable刪除時要注意過濾掉新數據,不能覆蓋了有效數據。

并發問題是軟件中的Bug集散地!

pub fn applyCompactionResult(
    _: SimpleLeveledCompactionController,
    state: *storage.StorageState,
    task: SimpleLeveledCompactionTask,
    output: []usize,
) !std.ArrayList(usize) {
    var files_to_remove = std.ArrayList(usize).init(state.allocator);
    errdefer files_to_remove.deinit();


    if (task.upper_level) |upper_level| {
        // 刪除高層SSTable數據,這層數據不會在壓縮過程中變更,放心刪
        std.debug.assert(sliceEquals(
            task.upper_level_sst_ids.items,
            state.levels.items[upper_level - 1].items,
        ));
        try files_to_remove.appendSlice(task.upper_level_sst_ids.items);
        state.levels.items[upper_level - 1].clearAndFree();
    } else {
        // 刪除L0數據,需要小心
        try files_to_remove.appendSlice(task.upper_level_sst_ids.items);
        var new_l0_sstables = std.ArrayList(usize).init(state.allocator);
        errdefer new_l0_sstables.deinit();


        {
            var l0_sst_compacted = std.AutoHashMap(usize, struct {}).init(state.allocator);
            defer l0_sst_compacted.deinit();
            for (task.upper_level_sst_ids.items) |sst_id| {
                try l0_sst_compacted.put(sst_id, .{});
            }


            for (state.l0_sstables.items) |sst_id| {
                if (!l0_sst_compacted.remove(sst_id)) { // 不在壓縮任務中的SST不能刪除
                    try new_l0_sstables.append(sst_id);
                }
            }
            std.debug.assert(l0_sst_compacted.count() == 0);
        }
        state.l0_sstables.deinit();
        state.l0_sstables = new_l0_sstables;
    }
    // 低層SSTable數據,直接刪除
    try files_to_remove.appendSlice(task.lower_level_sst_ids.items);
    state.levels.items[task.lower_level - 1].clearAndFree();
    try state.levels.items[task.lower_level - 1].appendSlice(output);


    return files_to_remove;
}




// sst to remove
var ssts_to_remove = std.ArrayList(SsTablePtr).init(self.allocator);


{
    var new_sst_ids = std.ArrayList(usize).init(self.allocator);
    defer new_sst_ids.deinit();


    self.state_lock.lock();
    defer self.state_lock.unlock();


    for (sstables.items) |sst| {
        const id: usize = @intCast(sst.get().sstId());
        try new_sst_ids.append(id);
        try self.state.sstables.put(id, sst.clone());
    }


    var file_to_remove = try self.compaction_controller.applyCompactionResult(
        &self.state,
        task,
        output.items,
    );
    defer file_to_remove.deinit();


    for (file_to_remove.items) |id| {
        if (self.state.sstables.fetchRemove(id)) |kv| {
            try ssts_to_remove.append(kv.value);
        }
    }
    try self.syncDir();
}


for (ssts_to_remove.items) |sst| {
    const path = try self.pathOfSst(sst.get().sstId());
    defer self.allocator.free(path);
    try std.fs.cwd().deleteFile(path);
}
try self.syncDir();

四、總結

我們使用Zig語言實現了一個LSM-Tree的核心功能,包括MemTable、SSTable、寫流程、各類Iterator與數據壓縮能力。通過這個項目,我收獲了很多心得體會。

了解了LSM-Tree的核心流程

以往對LSM這個數據結構的多層SST設計與寫過程早有耳聞,但是讀流程的實現不太理解。這個項目解答了我疑惑很久的讀流程的實現,特別是MergeIterator的算法設計非常巧妙。

摸索了個zig語言的智能指針

Zig語言沒有內存安全的保證,為了不想指針亂飛到處泄露,在Deepseek的幫助下實現了一個簡單的智能指針,極大降低了內存管理的心智負擔。

工程經驗

  • 盡可能多的做assertion的工作,可以提前暴露很多bug。
  • 大型多模塊的項目,一定要寫單元測試,不然出了bug無法分塊定位問題。
  • 千萬不要把IO過程放在鎖的范圍里,極大的影響性能!
責任編輯:武曉燕 來源: 得物技術
相關推薦

2019-11-26 15:12:08

數據存儲B+樹

2022-11-08 15:14:17

MyBatis插件

2022-10-29 08:44:39

分布式數據庫存儲

2022-06-02 09:09:27

前端React低代碼編輯器

2020-09-24 11:46:03

Promise

2021-06-30 07:19:36

網絡安全

2023-10-24 16:44:24

RubyDNS

2023-07-25 14:24:33

元素JSX解析器

2023-03-21 07:35:43

2022-09-01 10:46:02

前端組件庫

2025-02-05 12:09:12

2015-11-17 16:11:07

Code Review

2019-01-18 12:39:45

云計算PaaS公有云

2018-04-18 07:01:59

Docker容器虛擬機

2021-03-10 09:52:38

開發技能架構

2024-12-06 17:02:26

2020-07-02 15:32:23

Kubernetes容器架構

2011-04-29 10:46:32

iPhone開發入門iPhoneiOS

2023-11-14 16:14:49

2019-04-24 15:06:37

Http服務器協議
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 91se在线| 国产精品美女久久久久aⅴ国产馆 | 国产一区二区自拍 | 丁香五月网久久综合 | 欧洲成人 | 超碰在线免费 | 日本久久网| 午夜精品久久 | 日日操天天射 | 国产精品欧美一区二区三区不卡 | 国产免费观看一级国产 | 91在线精品一区二区 | 成人免费网视频 | 91午夜在线 | 精品一区二区三区在线观看国产 | 一级黄色裸片 | 伊人伊人伊人 | 91欧美精品成人综合在线观看 | 欧美电影一区 | 日韩在线国产 | 亚洲欧洲日本国产 | www亚洲精品| 日本久久久影视 | 91视频网| 欧美日韩成人 | 欧美久久久久久久 | 亚洲国产成人av好男人在线观看 | 国产成人在线观看免费 | 在线午夜电影 | 精品欧美一区二区在线观看视频 | 日日碰狠狠躁久久躁96avv | 成av在线 | a级片www| 久久中文字幕电影 | 日韩欧美一区二区三区免费看 | 亚洲免费三区 | 国产精品久久久久久久久图文区 | 欧美激情一区二区 | 宅女噜噜66国产精品观看免费 | 亚洲二区在线 | 国产日韩一区二区三区 |