Linux 內核動態追蹤技術的實現
之前的文章介紹了基于 tracepoint 靜態追蹤技術的實現,本文再介紹基于 kprobe 的動態追蹤即使的實現。同樣,動態追蹤也是排查問題的利器。
kprobe 是內核提供的動態追蹤技術機制,它允許動態安裝內核模塊的方式安裝系統鉤子,非常強大。下面先看一個內核中的例子。
- #include <linux/kernel.h>
- #include <linux/module.h>
- #include <linux/kprobes.h>
- #define MAX_SYMBOL_LEN 64
- // 要 hanck 的內核函數名
- static char symbol[MAX_SYMBOL_LEN] = "_do_fork";
- module_param_string(symbol, symbol, sizeof(symbol), 0644);
- static struct kprobe kp = {
- .symbol_name = symbol,
- };
- // 執行系統函數前被執行的鉤子
- static int __kprobes handler_pre(struct kprobe *p, struct pt_regs *regs){
- // ...
- }
- // 執行系統函數的單條指令后執行的鉤子(不是執行完系統函數)
- static void __kprobes handler_post(struct kprobe *p, struct pt_regs *regs,
- unsigned long flags){
- // ...
- }
- // 鉤子執行出錯或者單條執行執行出錯時被執行函數static int handler_fault(struct kprobe *p, struct pt_regs *regs, int trapnr){
- // ...
- }
- static int __init kprobe_init(void){
- int ret;
- // 設置鉤子
- kp.pre_handler = handler_pre;
- kp.post_handler = handler_post;
- kp.fault_handler = handler_fault;
- // 安裝鉤子
- register_kprobe(&kp);
- return 0;
- }
- static void __exit kprobe_exit(void){
- unregister_kprobe(&kp);
- pr_info("kprobe at %p unregistered\n", kp.addr);
- }
- // 安裝進內核后的初始化和注銷函數
- module_init(kprobe_init)
- module_exit(kprobe_exit)
- MODULE_LICENSE("GPL");
設置完 kprobe 后,通過 register_kprobe 注冊到內核。
- int register_kprobe(struct kprobe *p){
- int ret;
- struct kprobe *old_p;
- struct module *probed_mod;
- kprobe_opcode_t *addr;
- // 通過系統函數名找到對應的地址,內核維護了這個數據
- addr = kprobe_addr(p);
- // 記錄這個地址
- p->addr = addr;
- p->flags &= KPROBE_FLAG_DISABLED;
- p->nmissed = 0;
- INIT_LIST_HEAD(&p->list);
- // 之前是否已經存在鉤子,是的話就插入存在的列表,否則插入一個新的記錄
- old_p = get_kprobe(p->addr);
- if (old_p) {
- /* Since this may unoptimize old_p, locking text_mutex. */
- ret = register_aggr_kprobe(old_p, p);
- goto out;
- }
- // 把被 hack 的系統函數的指令保存到 probe 結構體,因為下面要覆蓋這塊內存
- /*
- prepare_kprobe =>
- unsigned long addr = (unsigned long) p->addr;
- unsigned long *kprobe_addr = (unsigned long *)(addr & ~0xFULL);
- memcpy(&p->opcode, kprobe_addr, sizeof(kprobe_opcode_t));
- memcpy(p->ainsn.insn, kprobe_addr, sizeof(kprobe_opcode_t));
- */
- ret = prepare_kprobe(p);
- INIT_HLIST_NODE(&p->hlist);
- // 插入內核維護的哈希表
- hlist_add_head_rcu(&p->hlist,
- &kprobe_table[hash_ptr(p->addr, KPROBE_HASH_BITS)]);
- // hack 掉系統函數所在內存的內容
- arm_kprobe(p);
- }
注冊一個 probe,首先是通過被 hack 的函數名找到對應的地址,然后保存這個地址對應內存的信息,接著把 probe 插入哈希表,最后調用 arm_kprobe 函數 hack 掉系統函數所在內存的內容。看一下 arm_kprobe。
- void arch_arm_kprobe(struct kprobe *p){
- // #define INT3_INSN_OPCODE 0xCC
- u8 int3 = INT3_INSN_OPCODE;
- // 把 int3 的內存復制到 addr
- text_poke(p->addr, &int3, 1);
- text_poke_sync();
- perf_event_text_poke(p->addr, &p->opcode, 1, &int3, 1);
- }
0xCC 是 intel 架構下 int3 對應的指令。所以這里就是把被 hack 函數對應指令的前面部分改成 int3。完成 hack。當執行到系統函數的時候,就會執行 int3,從而觸發 trap,并執行對應的處理函數 do_int3(這里比較復雜,我也沒有深入分析,大概是這個流程)。
- static bool do_int3(struct pt_regs *regs){
- kprobe_int3_handler(regs);}int kprobe_int3_handler(struct pt_regs *regs){
- kprobe_opcode_t *addr;
- struct kprobe *p;
- struct kprobe_ctlblk *kcb;
- addr = (kprobe_opcode_t *)(regs->ip - sizeof(kprobe_opcode_t));
- kcb = get_kprobe_ctlblk();
- // 通過地址從 probe 哈希表拿到對應的 probe 結構體
- p = get_kprobe(addr);
- set_current_kprobe(p, regs, kcb);
- kcb->kprobe_status = KPROBE_HIT_ACTIVE;
- // 執行 pre_handler 鉤子
- if (!p->pre_handler || !p->pre_handler(p, regs))
- setup_singlestep(p, regs, kcb, 0);
- }
執行完。pre_handler 鉤子后,會通過 setup_singlestep 設置單步執行 flag。
- static void setup_singlestep(struct kprobe *p, struct pt_regs *regs,
- struct kprobe_ctlblk *kcb, int reenter){
- // 修改寄存器的值
- // 設置 eflags 寄存器的 tf 位,允許單步調試
- regs->flags |= X86_EFLAGS_TF;
- regs->flags &= ~X86_EFLAGS_IF;
- // 設置下一條指令為系統函數的指令
- if (p->opcode == INT3_INSN_OPCODE)
- regs->ip = (unsigned long)p->addr;
- else
- regs->ip = (unsigned long)p->ainsn.insn;
- }
setup_singlestep 首先設置了允許單步調試,也就是說執行下一條指令后會觸發一個 trap,從而執行一個處理函數。并設置了下一條指令為被 hack 函數對應的指令,這是在注冊 probe 時保存下來的。觸發單步調試的 trap 后,最終會執行到 kprobe_debug_handler
- int kprobe_debug_handler(struct pt_regs *regs){
- struct kprobe *cur = kprobe_running();
- struct kprobe_ctlblk *kcb = get_kprobe_ctlblk();
- // 恢復指令為系統函數的指令
- resume_execution(cur, regs, kcb);
- regs->flags |= kcb->kprobe_saved_flags;
- // 執行 post 鉤子
- if ((kcb->kprobe_status != KPROBE_REENTER) && cur->post_handler) {
- kcb->kprobe_status = KPROBE_HIT_SSDONE;
- cur->post_handler(cur, regs, 0);
- }
- }
在單步調試的 trap 處理函數中,會執行 post 鉤子,并恢復真正的系統函數執行。這就完成了整個過程。
我們可以看到 kprobe 可以在系統函數執行前執行我們的鉤子,另外內核還提供了另外一個機制 kretprobe 用于在系統函數執行后返回前安裝鉤子。下面通過一個例子大致看一下 kretprobe。
- struct my_data {
- ktime_t entry_stamp;
- };
- // 記錄函數執行開始時間
- static int entry_handler(struct kretprobe_instance *ri, struct pt_regs *regs){
- struct my_data *data;
- data = (struct my_data *)ri->data;
- data->entry_stamp = ktime_get();
- return 0;
- }
- // 記錄函數執行結束時間
- static int ret_handler(struct kretprobe_instance *ri, struct pt_regs *regs){
- unsigned long retval = regs_return_value(regs);
- struct my_data *data = (struct my_data *)ri->data;
- s64 delta;
- ktime_t now;
- now = ktime_get();
- delta = ktime_to_ns(ktime_sub(now, data->entry_stamp));
- return 0;
- }
- static struct kretprobe my_kretprobe = {
- // 函數返回前執行
- .handler = ret_handler,
- // 函數開始前執行
- .entry_handler = entry_handler,
- .data_size = sizeof(struct my_data),
- /* Probe up to 20 instances concurrently. */
- .maxactive = 20,
- };
- static char func_name[NAME_MAX] = "_do_fork";
- module_param_string(func, func_name, NAME_MAX, S_IRUGO);
- my_kretprobe.kp.symbol_name = func_name;
- // 注冊
- register_kretprobe(&my_kretprobe);
我們可以看到可以通過 kretprobe 計算系統函數的耗時。kretprobe 是基于 kprobe 實現的,主要邏輯是通過通過 kprobe 注冊一個 pre_handler,在 pre_handler 中 hack 掉函數的棧,因為函數執行時,返回地址是存在棧中的,把這個內存改成一段內核的代碼,等到函數執行完后,彈出返回地址時,就會執行內核 hack 的代碼,從而執行我們的鉤子,執行完后再跳回到真正的返回地址繼續執行。
總結:內核通過劫持的方式實現了 kprobe,基于 kprobe 的動態追蹤技術可謂是非常復雜而強大,我們可以利用這個機制,動態修改邏輯,收集信息。不過實現過于復雜,涉及到對 CPU 架構和內存模型的了解,本文也是大致分析了一下流程,有興趣的同學可以自行查看源碼。