簡析XDP的重定向機制
一. XDP Socket示例解析
源碼參見:https://github.com/xdp-project/xdp-tutorial/tree/master/advanced03-AF_XDP 該示例演示了如何通過BPF將網絡數據包從XDP Hook點旁路到用戶態的XDP Socket,解析過程中為突出重點,將只關注重點代碼段,一些函數會被精簡,比如:錯誤處理等。
二. BPF 程序 af_xdp_kern.c
BPF程序是運行在內核態的一段代碼,如下:
struct bpf_map_def SEC("maps") xsks_map = {
.type = BPF_MAP_TYPE_XSKMAP,
.key_size = sizeof(int),
.value_size = sizeof(int),
.max_entries = 64, /* Assume netdev has no more than 64 queues */
};
SEC("xdp_sock")
int xdp_sock_prog(struct xdp_md *ctx)
{
int index = ctx->rx_queue_index;
if (bpf_map_lookup_elem(&xsks_map, &index))
return bpf_redirect_map(&xsks_map, index, 0);
return XDP_PASS;
}
struct bpf_map_def SEC("maps") xsks_map?: 定義了一個BPF_MAP_TYPE_XSKMAP類型的映射表,當采用SEC("maps")方式來顯示定義時,將在生成的bpf目標文件的ELF格式中看到相關描述,當BPF程序被加載到內核時,會自動創建名為“xsks_map”的描述符, 用戶態可通過查找“xsks_map”來獲取該map的描述符,這樣用戶態和內核BPF程序就可以共同訪問該map。
type = BPF_MAP_TYPE_XSKMAP:指定該map的類型,它與bpf_redirect_map() 結合使用以將收到的幀傳遞到指定套接字。
key_size = sizeof(int),value_size = sizeof(int):指定key,value長度。
針對以上key,value需要說明一下:對于BPF_MAP_TYPE_XSKMAP類型的map,value必須是XDP socket描述符,key必須是int類型,原因在于bpf_redirect_map()的第二個參數,參見下面2.10。
max_entries = 64:指定map最多存儲64個元素。
SEC("xdp_sock"):指定prog函數符號,應用層可通過查找"xdp_sock"加載該prog,并綁定到指定網卡。
int xdp_sock_prog(struct xdp_md *ctx):當網卡收到數據包時,會在xdp hook點調用該函數。
int index = ctx->rx_queue_index: 獲取該數據包來自網卡到哪個rx隊列ID,ctx有許多成員,比如:網卡ID,數據幀等等。
if (bpf_map_lookup_elem(&xsks_map, &index)): 判斷xsks_map是否存在key為index(即rx隊列號)的數據,注意,這里實際上就是判斷該網卡是否綁定了xdp Socket。
bpf_redirect_map(&xsks_map, index, 0):bpf_redirect_map?函數作用就是重定向,比如:將數據重定向到某個網卡,CPU, Socket等等;當bpf_redirect_map?函數的第一個參數的map類型為BPF_MAP_TYPE_XSKMAP時,則表示將數據重定向到XDP Scoket。
bpf_redirect_map()會查找參數1即xsks_map 中 key為index 的 value 是否存在,若存在,則檢查value是否是一個XDP Scoket,并且是否綁定到了該網卡(可以綁定到任意有效隊列)。
綜合以上,該bpf程序實現的功能就是:將收到的數據包重定向到xsks_map中指定的XDP Socket。
三. 用戶態程序 af_xdp_user.c
該程序實現bpf加載到網卡,創建XDP Scoket并綁定到網卡的指定隊列,并通過XDP Scoket收發數據,這里僅分析xXDP Scoket相關部分。
int main(int argc, char **argv)
{
...
bpf_obj = load_bpf_and_xdp_attach(&cfg);
map = bpf_object__find_map_by_name(bpf_obj, "xsks_map");
...
xsks_map_fd = bpf_map__fd(map);
...
umem = configure_xsk_umem(packet_buffer, packet_buffer_size);
...
xsk_socket = xsk_configure_socket(&cfg, umem);
...
rx_and_process(&cfg, xsk_socket);
...
}
static struct xsk_socket_info *xsk_configure_socket(struct config *cfg,
struct xsk_umem_info *umem)
{
...
ret = xsk_socket__create(&xsk_info->xsk, cfg->ifname,
cfg->xsk_if_queue, umem->umem, &xsk_info->rx,
&xsk_info->tx, &xsk_cfg);
...
bpf_obj = load_bpf_and_xdp_attach(&cfg): 加載bpf程序,并綁定到網卡。
map = bpf_object__find_map_by_name(bpf_obj, "xsks_map"): 查找bpf程序內定義的xsks_map。
umem = configure_xsk_umem(packet_buffer, packet_buffer_size): 為XDP Scoket準備UMEM。
xsk_configure_socket()通過調用bpf helper函數xsk_socket__create()創建XDP Scoket并綁定到cfg->ifname網卡的cfg->xsk_if_queue隊列,默認情況下將該【cfg->xsk_if_queue, xsk_info->xsk fd】添加到xsks_map, 這樣bpf程序就可以重定向到該XDP Scoket(參見2.9, 2.10), 除非指定XSK_LIBBPF_FLAGS__INHIBIT_PROG_LOAD標志。
static void rx_and_process(struct config *cfg,
struct xsk_socket_info *xsk_socket)
{
struct pollfd fds[2];
int ret, nfds = 1;
memset(fds, 0, sizeof(fds));
fds[0].fd = xsk_socket__fd(xsk_socket->xsk);
fds[0].events = POLLIN;
while(!global_exit) {
if (cfg->xsk_poll_mode) {
ret = poll(fds, nfds, -1);
if (ret <= 0 || ret > 1)
continue;
}
handle_receive_packets(xsk_socket);
}
}
XDP Scoket也是一個文件描述符,因此可以通過poll/epoll/select來等待IO事件,需要說明的是:收/發的數據包是原始的以太網幀,因此在包處理上要麻煩一些。
四. 總結
以上簡略分析了bpf程序如何將數據重定向到用戶態程序,通過xsks_map來實現bpf與用戶態程序的交互;
需要說明的是,這些分析僅是梳理了淺層次的代碼,實際上BPF是如何將數據讀寫到XDP Scoket收發緩沖區的呢?其實是通過創建共享內存并關聯XDP Scoket的rx_ring,tx_ring,以及umem來實現的,后續繼續分析。
bpf程序通常都非常簡單,復雜的是用戶態程序,此外,BPF有非常多的技術細節,限于篇幅及主題不在此展開。