成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Linux 網絡編程:從 Socket API 到極簡 Redis 發布/訂閱 sub/pub 服務的實現

系統 Linux
網絡編程的本質是實現進程間通信(Inter-Process Communication, IPC),特別是跨越主機邊界的分布式通信。在 Linux 及其他類 Unix 操作系統中,Socket(套接字)接口是實現網絡通信的標準范式。

引言

本文旨在系統性地闡述 Linux 環境下的網絡編程基礎,重點關注 Socket 應用程序接口(API)的原理與應用。通過循序漸進的方式,結合具體的 C 語言代碼示例,我們將剖析核心系統調用的機制,并最終構建一個基于傳輸控制協議(TCP)的簡化版發布/訂閱(Publish/Subscribe, Pub/Sub)服務器模型。

在進行深入探討之前,本文假設讀者已具備以下先驗知識:

  1. C 語言編程能力 :熟悉 C 語言的核心語法、指針操作及內存管理機制。
  2. Linux 操作系統基礎 :了解 Linux 基本命令行操作及 C 程序的編譯流程(例如,使用 GCC 工具鏈)。
  3. 計算機網絡基礎 :對 OSI 參考模型或 TCP/IP 協議棧有概念性認識,理解 TCP 與 UDP 的核心差異(面向連接與無連接、可靠性保證機制等),并掌握 IP 地址和端口號在網絡通信中的作用。

網絡編程的本質是實現進程間通信(Inter-Process Communication, IPC),特別是跨越主機邊界的分布式通信。在 Linux 及其他類 Unix 操作系統中,Socket(套接字)接口是實現網絡通信的標準范式。它提供了一套抽象的 API,允許應用程序將網絡通信視為一種特殊的文件 I/O 操作,從而簡化了網絡數據收發的復雜性。

本文的實踐目標是構建一個功能類似于 Redis 服務器 PUBLISH 和 SUBSCRIBE 命令的簡化服務實例:

  • 服務器進程監聽一個預定義的 TCP 端口。
  • 允許多個客戶端并發連接至服務器。
  • 客戶端通過發送特定格式的命令(如 SUB <topic_name>)訂閱感興趣的主題。
  • 客戶端通過發送特定格式的命令(如 PUB <topic_name> <message_data>)向指定主題發布消息。
  • 服務器負責將發布的消息轉發給所有訂閱了對應主題的客戶端(通常不包括發布者自身)。

接下來,我們將逐步解析相關的系統調用和編程技術。

Linux 上的 Socket API

在 Linux 操作系統的設計哲學中,“一切皆文件”是一個核心概念。網絡連接在內核層面被抽象為一種特殊的文件類型,并通過文件描述符(File Descriptor)進行管理。socket() 系統調用是創建此類“網絡文件”的入口點,用于在網絡通信的參與方創建一個通信端點(Endpoint)。

socket() 系統調用詳解

socket() 的主要功能是在內核中創建一個新的、未連接的套接字,并返回一個與之關聯的文件描述符,供用戶空間程序使用。其函數原型定義于 <sys/socket.h> 頭文件中:

#include <sys/types.h>
#include <sys/socket.h>

int socket(int domain, int type, int protocol);

參數說明 :

  • domain (地址族 Address Family - AF): 指定套接字使用的協議族。常用取值包括:

a.AF_INET: 用于 IPv4 網絡協議通信。本文主要采用此地址族。

b.AF_INET6: 用于 IPv6 網絡協議通信。

c.AF_UNIX (或 AF_LOCAL): 用于同一主機內部的進程間通信,依賴本地文件系統路徑而非網絡地址。

  • type (套接字類型 Socket Type): 定義套接字的通信語義。關鍵取值有:
  • SOCK_STREAM: 提供面向連接、可靠的、基于字節流的傳輸服務。TCP 協議即為此類型,保證數據傳輸的順序性和可靠性(通過序列號、確認、重傳等機制)。
  • SOCK_DGRAM: 提供無連接、不可靠的數據報服務。UDP 協議屬此類,數據包傳輸可能發生丟失、重復或亂序。
  • protocol (協議 Protocol): 通常設置為 0,表示由系統根據指定的 domain 和 type 自動選擇默認協議。例如,AF_INET 與 SOCK_STREAM 組合通常默認選用 IPPROTO_TCP;AF_INET 與 SOCK_DGRAM 組合則默認選用 IPPROTO_UDP。亦可顯式指定協議常量(如 IPPROTO_TCP)。

內核操作與數據結構 :

當應用程序調用 socket() 時,會觸發一次系統調用,進入內核態執行:

  1. 資源分配 :內核網絡協議棧為新的套接字分配必要的內存資源,創建一個內部的 struct socket 或類似的核心數據結構。此結構包含了套接字的狀態信息(如初始狀態、類型、協議族)、發送和接收緩沖區、指向特定協議(如 TCP、UDP)處理函數的指針、以及等待隊列等。
  2. 文件描述符關聯 :內核在當前進程的文件描述符表中找到一個未使用的條目,并將該條目指向一個代表該套接字的內核文件對象(struct file)。這個文件描述符(一個非負整數)是用戶空間程序操作該套接字的句柄。
  3. 返回 :系統調用返回新分配的文件描述符給應用程序。若創建失?。ㄈ缳Y源不足、權限問題),則返回 -1,并設置全局變量 errno 以指示具體錯誤代碼。

返回值 :

  • 成功:返回一個新的文件描述符(非負整數)。
  • 失?。悍祷?-1,并設置 errno。

示例:創建 IPv4 TCP 套接字

#include <stdio.h>      // 標準輸入輸出
#include <stdlib.h>     // 標準庫函數,如 exit
#include <sys/socket.h> // 套接字核心函數和數據結構
#include <errno.h>      // 錯誤碼 errno
#include <unistd.h>     // close 函數

int main() {
    int sockfd;

    // 創建一個用于 IPv4 TCP 通信的套接字
    // AF_INET: 指定使用 IPv4 協議族
    // SOCK_STREAM: 指定使用面向連接的字節流服務 (TCP)
    // 0: 讓內核自動選擇合適的協議 (對于 AF_INET 和 SOCK_STREAM,通常是 IPPROTO_TCP)
    sockfd = socket(AF_INET, SOCK_STREAM, 0);

    // 檢查 socket() 調用是否成功
    if (sockfd == -1) {
        perror("socket 創建失敗"); // perror 會根據當前的 errno 值打印錯誤信息
        exit(EXIT_FAILURE);       // 異常退出程序
    }

    printf("套接字創建成功! 文件描述符: %d\n", sockfd);

    // 在實際應用中,套接字使用完畢后應顯式關閉
    // close(sockfd);

    return 0;
}

編譯與執行 :

gcc create_socket_example.c -o create_socket_example
./create_socket_example
套接字創建成功! 文件描述符: 4

輸出將提示套接字創建成功,并顯示其文件描述符。

sockaddr_in 結構體與地址表示

僅創建套接字不足以進行通信,服務器端需要將其綁定到具體的本地網絡地址(IP 地址和端口號)。在 IPv4 環境下,此地址信息通過 struct sockaddr_in 結構體(定義于 <netinet/in.h>)來承載:

#include <netinet/in.h>

struct sockaddr_in {
    sa_family_t    sin_family; /* 地址族: 固定為 AF_INET */
    in_port_t      sin_port;   /* 端口號 (網絡字節序) */
    struct in_addr sin_addr;   /* IPv4 地址 (網絡字節序) */
    // char           sin_zero[8]; /* 填充字節,通常不直接使用,應清零 */
};

/* IPv4 地址結構 */
struct in_addr {
    uint32_t       s_addr;     /* 32位IPv4地址 (網絡字節序) */
};

關鍵字段解析 :

  • sin_family : 地址族,對于 IPv4,必須設置為 AF_INET,與 socket() 調用中的 domain 參數保持一致。
  • sin_port : 端口號。 注意 :該字段必須存儲為 網絡字節序 (Network Byte Order,即大端序 Big-Endian)。應用程序需使用 htons() (Host to Network Short) 函數將主機字節序(Host Byte Order)的端口號轉換為網絡字節序。例如,htons(8080)。
  • sin_addr.s_addr : 32位 IPv4 地址。 注意 : 同樣必須是 網絡字節序 。

服務器端執行 bind() 時,若希望監聽本機所有可用的網絡接口,應將此字段設置為 htonl(INADDR_ANY)。INADDR_ANY 是一個特殊常量(通常為 0),htonl() (Host to Network Long) 用于將其轉換為主機字節序到網絡字節序。

若需綁定到特定 IP 地址(如服務器僅監聽某塊網卡,或客戶端執行 connect() 時指定目標服務器 IP),可使用 inet_pton() (Presentation to Network) 函數將點分十進制表示的 IP 地址字符串(如 "192.168.1.100")轉換為網絡字節序的 32 位整數,并存入 s_addr。

字節序轉換函數 (通常定義于 <arpa/inet.h>):

  • htons(): 主機字節序到網絡字節序(16位,用于端口號)。
  • htonl(): 主機字節序到網絡字節序(32位,用于 IPv4 地址)。
  • ntohs(): 網絡字節序到主機字節序(16位)。
  • ntohl(): 網絡字節序到主機字節序(32位)。
  • inet_pton(AF_INET, "ip_string", &addr_struct->sin_addr): 將點分十進制 IP 字符串轉換為網絡字節序二進制形式。
  • inet_ntop(AF_INET, &addr_struct->sin_addr, buffer, buffer_size): 將網絡字節序二進制 IP 地址轉換為點分十進制字符串。

bind(), listen(), accept() - 服務器端核心調用

對于 TCP 服務器而言,創建套接字后,必須執行一系列步驟來準備接收客戶端連接:

  1. bind() : 將套接字與一個本地 IP 地址和端口號關聯起來,定義服務的監聽地址。
  2. listen() : 將套接字設置為監聽模式,使其能夠接受外來的連接請求,并配置連接請求隊列。
  3. accept() : 從已完成三次握手的連接隊列中接受一個連接,并為此連接創建一個新的專用套接字。

1. bind() 系統調用

bind() 用于將 socket() 創建的套接字文件描述符 sockfd 與 my_addr 指定的本地地址(IP 和端口)進行綁定。

#include <sys/socket.h>

int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
  • sockfd: socket() 返回的文件描述符。
  • addr: 指向包含待綁定地址信息的 sockaddr 結構體的指針。對于 IPv4,實際傳遞的是已填充好的 struct sockaddr_in 結構體的地址,需強制類型轉換為 (struct sockaddr *)。
  • addrlen: addr 指向的結構體的大小,對于 struct sockaddr_in,通常為 sizeof(struct sockaddr_in)。

內核操作 :

調用 bind() 進入內核態后:

  1. 地址復制與校驗 :內核將用戶空間傳入的 sockaddr 結構體復制到內核內存。
  2. 狀態檢查 :檢查 sockfd 對應的套接字是否有效且未被綁定。
  3. 地址可用性檢查 :檢查指定的 IP 地址和端口號是否可用。對于端口號,檢查是否已被其他套接字綁定(除非設置了 SO_REUSEADDR 等選項);對于 IP 地址,檢查是否是分配給本機的有效地址(或 INADDR_ANY)。
  4. 權限檢查 :檢查進程是否有權限綁定到指定端口(通常,綁定到 1024 以下的端口需要超級用戶權限)。
  5. 綁定操作 :如果所有檢查通過,內核將該地址信息與內部的套接字結構關聯起來。

返回值 :成功返回 0;失敗返回 -1,并設置 errno。常見錯誤包括 EADDRINUSE(地址已在使用)、EACCES(權限不足)、EINVAL(sockfd 無效或已綁定)。

示例:綁定套接字到本地 8080 端口

#include <stdio.h>
#include <stdlib.h>
#include <string.h>       // 用于 memset
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>    // 用于 htons, htonl
#include <unistd.h>       // 用于 close

#define PORT 8080

int main() {
    int server_fd;
    struct sockaddr_in address;
    int opt = 1; // 用于 setsockopt

    // 創建套接字
    if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
        perror("socket 創建失敗");
        exit(EXIT_FAILURE);
    }

    // 可選: 設置套接字選項,允許地址重用,便于服務器快速重啟
    // SO_REUSEADDR 允許重用本地地址 (IP+端口),尤其是在 TIME_WAIT 狀態下的端口
    // SO_REUSEPORT (需要內核支持) 允許多個進程綁定到同一 IP 和端口
    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) {
        perror("setsockopt 失敗");
        close(server_fd);
        exit(EXIT_FAILURE);
    }

    // 準備 sockaddr_in 結構體
    memset(&address, 0, sizeof(address)); // 推薦清零結構體
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = htonl(INADDR_ANY); // 監聽所有網絡接口
    address.sin_port = htons(PORT);              // 監聽指定端口 (轉換為網絡字節序)

    // 將套接字綁定到指定的地址和端口
    if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
        perror("bind 失敗");
        close(server_fd);
        exit(EXIT_FAILURE);
    }

    printf("套接字成功綁定到端口 %d\n", PORT);

    // ... 后續步驟: listen() 和 accept() ...

    close(server_fd); // 完成后關閉監聽套接字

    return 0;
}

2. listen() 系統調用

listen() 用于將一個已綁定的流式套接字(SOCK_STREAM 或 SOCK_SEQPACKET)轉換為被動監聽狀態,使其能夠接受傳入的連接請求。

#include <sys/socket.h>

int listen(int sockfd, int backlog);
  • sockfd: 已成功 bind() 的套接字文件描述符。
  • backlog: 指定內核為此監聽套接字維護的 已完成連接隊列 (Completed Connection Queue, 或稱 Accept Queue)的最大長度。

內核操作與連接隊列 :

當調用 listen() 時:

  • 狀態轉換 :內核將 sockfd 對應的內部套接字結構的狀態從 CLOSED(或 BOUND)修改為 LISTEN。
  • 隊列初始化 :內核為該監聽套接字關聯并初始化兩個重要的隊列:

a.未完成連接隊列 (Incomplete Connection Queue / SYN Queue) :存儲收到的 SYN 包,但尚未完成三次握手的連接請求(處于 SYN_RCVD 狀態)。此隊列的大小通常由系統參數(如 net.ipv4.tcp_max_syn_backlog)控制,backlog 參數對其影響有限或間接。

b.已完成連接隊列 (Completed Connection Queue / Accept Queue) :存儲已經完成 TCP 三次握手,等待被應用程序通過 accept() 提取的連接(這些連接在內核中已是 ESTABLISHED 狀態,但從服務器監聽角度看是在等待 accept)。backlog 參數主要限制的是這個隊列的大小。當此隊列滿時,內核可能會拒絕新的已完成握手的連接(例如,不響應 ACK,或發送 RST)。

返回值 :成功返回 0;失敗返回 -1,并設置 errno。

示例(續上) :

// ... bind() 成功后 ...

    // 開始監聽傳入連接
    // backlog 設置為 10,意味著最多允許 10 個已完成三次握手的連接在隊列中等待 accept()
    if (listen(server_fd, 10) < 0) {
        perror("listen 失敗");
        close(server_fd);
        exit(EXIT_FAILURE);
    }

    printf("服務器正在端口 %d 上監聽...\n", PORT);

    // ... 下一步: accept() ...

3. accept() 系統調用

accept() 從監聽套接字 sockfd 的已完成連接隊列中取出一個連接請求,為該連接創建一個 新的 、已連接的套接字,并返回這個新套接字的文件描述符。

#include <sys/socket.h>

int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);
  • sockfd: 處于監聽狀態的套接字文件描述符。
  • addr: (可選)指向 sockaddr 結構體的指針,用于接收發起連接的客戶端的地址信息。若不關心客戶端地址,可傳遞 NULL。
  • addrlen: (可選)指向 socklen_t 類型變量的指針。這是一個 值-結果 參數:

調用前,*addrlen 必須初始化為 addr 指向的緩沖區的實際大小。

調用成功后,內核會將客戶端地址結構的實際大小寫入 *addrlen。若傳入 addr 為 NULL,則 addrlen 也應為 NULL。

內核操作 :

  • 檢查隊列 :內核檢查與 sockfd 關聯的已完成連接隊列。
  • 阻塞/非阻塞行為 :

如果隊列為空且 sockfd 是阻塞模式(默認),accept() 調用將使進程 睡眠 ,直到隊列中出現新的已完成連接。

如果隊列為空且 sockfd 是非阻塞模式,accept() 立即返回 -1,并將 errno 設置為 EAGAIN 或 EWOULDBLOCK。

  1. 提取連接 :如果隊列非空,內核從中取出一個連接。
  • 創建新套接字 :內核為這個被接受的連接創建一個 全新的 內部套接字結構和對應的文件對象。這個新套接字的狀態被設為 ESTABLISHED,并關聯了客戶端的地址信息。
  • 返回新描述符 :內核在進程的文件描述符表中分配一個新的文件描述符,指向這個新創建的已連接套接字的文件對象,并將此描述符返回給應用程序。
  • 填充地址信息 :如果 addr 和 addrlen 參數有效,內核將客戶端的地址信息復制到 addr 指向的緩沖區,并更新 *addrlen。

關鍵點 :accept() 返回的是一個 新的文件描述符 ,代表與特定客戶端的通信通道。后續與該客戶端的數據收發(send(), recv() 等)必須使用這個 新描述符 ,而非原來的監聽描述符 sockfd。監聽描述符 sockfd 保持不變,繼續用于接受后續的連接請求。

返回值 :

  • 成功:返回一個新的、非負的已連接套接字文件描述符。
  • 失?。悍祷?-1,并設置 errno。

示例(續上,接受單個連接) :

// ... listen() 成功后 ...

    int new_socket;
    struct sockaddr_in client_address;
    socklen_t addrlen = sizeof(client_address); // 注意類型是 socklen_t
    char client_ip[INET_ADDRSTRLEN]; // 用于存儲客戶端 IP 字符串

    printf("等待連接...\n");

    // 接受一個傳入連接
    // 默認情況下,accept() 會阻塞,直到有客戶端連接進來
    if ((new_socket = accept(server_fd, (struct sockaddr *)&client_address, &addrlen)) < 0) {
        perror("accept 失敗");
        close(server_fd);
        exit(EXIT_FAILURE);
    }

    // 將客戶端的 IP 地址從網絡字節序轉換為點分十進制字符串以便打印
    inet_ntop(AF_INET, &client_address.sin_addr, client_ip, INET_ADDRSTRLEN);
    printf("接受來自 %s:%d 的連接\n", client_ip, ntohs(client_address.sin_port));
    printf("用于通信的新套接字描述符: %d\n", new_socket);

    // 現在可以使用 new_socket 與該客戶端進行數據交換
    // 例如: send(new_socket, "歡迎!\n", 7, 0);
    // 例如: recv(new_socket, buffer, 1024, 0);

    // 完成與該客戶端的通信后,關閉連接套接字
    close(new_socket);

    // 服務器最終關閉時,關閉監聽套接字
    close(server_fd);

上述示例僅能處理一次連接。為實現并發處理多個客戶端,需引入循環結構,并結合多進程/多線程模型或 I/O 多路復用技術。對于需要高效處理大量并發連接的場景(如我們的 Pub/Sub 服務器),I/O 多路復用是更常用的方案。

select() - I/O 多路復用機制

服務器程序通常需要同時關注多個事件源:監聽套接字上的新連接請求,以及多個已連接客戶端套接字上的數據到達。若使用阻塞式的 accept() 和 recv(),程序執行流會在單一調用點暫停,無法及時響應其他事件。I/O 多路復用技術解決了這個問題,它允許進程同時監視多個文件描述符,并在其中任何一個或多個描述符準備好進行 I/O 操作(可讀、可寫或異常)時獲得通知。select() 是 POSIX 標準中定義的一種經典 I/O 多路復用機制。

#include <sys/select.h>
#include <sys/time.h> // 對于 struct timeval

int select(int nfds, fd_set *readfds, fd_set *writefds,
           fd_set *exceptfds, struct timeval *timeout);
  • nfds: 被監聽的文件描述符的數量,其值應為所有被監聽文件描述符中的最大值 加 1 。例如,若監聽 fd 3, 5, 8,則 nfds 應為 9。
  • readfds: 指向 fd_set 結構體的指針,用于指定需要監聽 可讀 事件的文件描述符集合。對于監聽套接字,可讀意味著有新連接待 accept();對于已連接套接字,可讀意味著有數據到達、連接已關閉(收到 FIN)或發生錯誤。
  • writefds: 指向 fd_set 的指針,用于指定需要監聽 可寫 事件的文件描述符集合。通常表示套接字的發送緩沖區有可用空間。
  • exceptfds: 指向 fd_set 的指針,用于指定需要監聽 異常 條件的文件描述符集合(如 TCP 帶外數據)。
  • timeout: 指向 struct timeval 結構體的指針,用于設定 select() 的最大等待時間。

struct timeval { time_t tv_sec; suseconds_t tv_usec; }; (秒和微秒)

若 timeout 為 NULL,select() 將無限期阻塞,直到至少有一個描述符就緒。

若 timeout 指向的結構體中 tv_sec 和 tv_usec 均為 0,select() 執行非阻塞檢查,立即返回。

若 timeout 指向的結構體包含正值,select() 最多等待指定時間。超時前有描述符就緒則返回;超時則返回 0。

fd_set 相關宏定義 :

  • FD_ZERO(fd_set *set): 清空(初始化)一個 fd_set 集合。 每次調用 select() 前,對工作集合必須執行此操作或從主集合復制 。
  • FD_SET(int fd, fd_set *set): 將文件描述符 fd 添加到集合 set 中。
  • FD_CLR(int fd, fd_set *set): 將文件描述符 fd 從集合 set 中移除。
  • FD_ISSET(int fd, fd_set *set):  在 select() 返回后 ,用于檢查文件描述符 fd 是否仍在就緒集合 set 中。

重要特性 :select() 調用會 修改 傳入的 fd_set 集合(readfds, writefds, exceptfds),將其中未就緒的文件描述符移除。因此,應用程序通常需要維護一個 主集合 (master set)記錄所有需要關心的文件描述符,在每次循環調用 select() 之前,將主集合的內容 復制 到一個 工作集合 (working set),然后將工作集合傳遞給 select()。

返回值 :

  • 成功:返回三個集合中總共就緒的文件描述符數量。
  • 超時:返回 0。
  • 失?。悍祷?-1,并設置 errno。

使用 select() 的服務器模式 :

  • 初始化監聽套接字 (socket, bind, listen)。
  • 初始化主文件描述符集合 master_fds:FD_ZERO(&master_fds),然后 FD_SET(listener_fd, &master_fds)。
  • 維護當前最大文件描述符 max_fd,初始值為 listener_fd。
  • 進入主事件循環: a.  創建臨時工作集合 read_fds,將其初始化為主集合:read_fds = master_fds。 b.  調用 select(max_fd + 1, &read_fds, NULL, NULL, NULL)(此處示例僅關心讀事件,阻塞等待)。 c.  檢查 select 返回值。若為 -1,處理錯誤(需注意 EINTR 信號中斷)。 d.  遍歷從 0 到 max_fd 的所有文件描述符 i: i.  使用 FD_ISSET(i, &read_fds) 判斷描述符 i 是否在就緒的讀集合中。 ii. 若 i 是監聽套接字 (listener_fd) 且 FD_ISSET 為真: * 調用 accept() 接受新連接,得到 new_fd。 * 將 new_fd 添加到主集合 master_fds:FD_SET(new_fd, &master_fds)。 * 更新 max_fd:if (new_fd > max_fd) max_fd = new_fd;。 iii. 若 i 是已連接客戶端套接字且 FD_ISSET 為真: * 調用 recv(i, buffer, ...) 或 read(i, buffer, ...) 嘗試讀取數據。 * 處理連接狀態 : * 返回值 > 0 : 成功讀取 valread 字節數據。處理接收到的數據(解析命令 SUB/PUB 等)。 * 返回值 0 : 表示對端執行了有序關閉(發送了 FIN 包),所有數據已接收完畢。這是 TCP 半關閉 狀態的體現(對方關閉了寫,本方可以繼續寫,但通常也應準備關閉)。應用程序應: * close(i) 關閉本端的套接字連接。 * 從 master_fds 中移除 i:FD_CLR(i, &master_fds)。 * 清理與該客戶端相關的應用層資源(如訂閱信息)。 * 返回值 -1 : 發生錯誤。檢查 errno: * 若 errno 為 EAGAIN 或 EWOULDBLOCK(非阻塞模式下),表示暫時無數據可讀,不是錯誤。 * 若 errno 為 ECONNRESET,表示對端發送了 RST 包(連接異常中斷)。 * 其他錯誤(如 ETIMEDOUT, ENOTCONN 等)。 * 對于實際錯誤,同樣需要 close(i),從 master_fds 移除 i,并清理資源。

關于 shutdown() 和半關閉

recv/read 返回 0 表明對端關閉了其發送通道。如果本端應用還想發送數據,理論上可以(只要對端接收緩沖區未滿且未完全關閉連接),但這通常不符合應用邏輯。更常見的是,接收到 0 后本端也應關閉連接。

有時應用可能需要主動進行 半關閉 ,即關閉自己的發送通道,但仍保持接收通道打開,以接收對端可能還未發送完的數據。這可以通過 shutdown() 系統調用實現:

#include <sys/socket.h>

int shutdown(int sockfd, int how);
  • sockfd: 要操作的套接字。
  • how: 指定關閉方式:

SHUT_RD: 關閉接收通道(之后不能再從此套接字接收數據)。

SHUT_WR: 關閉發送通道(之后不能再從此套接字發送數據)。這會向對端發送一個 FIN 包。

SHUT_RDWR: 同時關閉接收和發送通道(等同于 close() 的一部分效果,但不釋放文件描述符)。

使用 shutdown(sockfd, SHUT_WR) 后,對端 recv 會在接收完所有已在途的數據后返回 0。而本端仍可調用 recv 接收數據,直到對端也關閉其發送通道(發送 FIN)。

close() 調用則會同時關閉讀寫兩個方向(如果引用計數為零,還會釋放文件描述符和相關內核資源)。

在我們的簡單 Pub/Sub 服務器中,我們直接使用 close() 處理連接終止,這隱含了雙向關閉。

極簡 Pub/Sub 服務器 C 代碼實現

以下是使用 select() 實現的極簡 Pub/Sub 服務器的 C 代碼,注釋已更新為中文。請注意,此實現為了教學目的保持簡潔,省略了許多生產環境中必要的健壯性設計(如完善的錯誤處理、動態資源管理、優化的 max_fd 更新策略等)。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/select.h>
#include <errno.h>

#define PORT 8080               // 服務器監聽端口
#define MAX_CLIENTS 30          // 最大并發客戶端數量
#define MAX_TOPICS 10           // 最大主題數量
#define MAX_SUBS_PER_TOPIC 10   // 每個主題最大訂閱者數量
#define BUFFER_SIZE 1024        // 接收緩沖區大小

// 主題訂閱數據結構
typedef struct {
    char name[50];                       // 主題名稱
    int subscribers[MAX_SUBS_PER_TOPIC]; // 訂閱該主題的客戶端套接字描述符數組
    int sub_count;                       // 當前訂閱者數量
} Topic;

Topic topics[MAX_TOPICS]; // 全局主題數組
int topic_count = 0;      // 當前主題數量

// 輔助函數:查找或創建主題
int find_or_create_topic(const char* topic_name) {
    // 查找現有主題
    for (int i = 0; i < topic_count; ++i) {
        if (strcmp(topics[i].name, topic_name) == 0) {
            return i; // 找到,返回索引
        }
    }
    // 如果未找到且還有空間,則創建新主題
    if (topic_count < MAX_TOPICS) {
        strncpy(topics[topic_count].name, topic_name, sizeof(topics[topic_count].name) - 1);
        topics[topic_count].name[sizeof(topics[topic_count].name) - 1] = '\0'; // 確??兆址Y尾
        topics[topic_count].sub_count = 0; // 初始化訂閱者數量為0
        return topic_count++; // 返回新主題的索引,并增加主題計數
    }
    return -1; // 主題數組已滿,無法創建
}

// 輔助函數:將客戶端添加到主題的訂閱列表
void add_subscriber(int topic_index, int client_socket) {
    if (topic_index < 0 || topic_index >= topic_count) return; // 無效的主題索引
    Topic* topic = &topics[topic_index];
    if (topic->sub_count < MAX_SUBS_PER_TOPIC) {
        // 檢查是否已訂閱,避免重復添加
        for(int i = 0; i < topic->sub_count; ++i) {
            if (topic->subscribers[i] == client_socket) return; // 已存在,直接返回
        }
        // 添加新的訂閱者
        topic->subscribers[topic->sub_count++] = client_socket;
        printf("客戶端 %d 訂閱了主題 '%s'\n", client_socket, topic->name);
    } else {
        // 主題訂閱已滿
        printf("主題 '%s' 訂閱已滿, 無法添加客戶端 %d\n", topic->name, client_socket);
        // 可選: 向客戶端發送錯誤消息
        // send(client_socket, "ERR topic full\n", 15, 0);
    }
}

// 輔助函數:從所有主題中移除指定客戶端的訂閱
void remove_subscriber(int client_socket) {
    printf("正在移除客戶端 %d 的所有訂閱。\n", client_socket);
    for (int i = 0; i < topic_count; ++i) { // 遍歷所有主題
        int found_idx = -1;
        // 在當前主題的訂閱者列表中查找該客戶端
        for (int j = 0; j < topics[i].sub_count; ++j) {
            if (topics[i].subscribers[j] == client_socket) {
                found_idx = j;
                break;
            }
        }
        // 如果找到該客戶端
        if (found_idx != -1) {
            // 將后續訂閱者向前移動一位,覆蓋掉要移除的客戶端
            for (int k = found_idx; k < topics[i].sub_count - 1; ++k) {
                topics[i].subscribers[k] = topics[i].subscribers[k + 1];
            }
            topics[i].sub_count--; // 減少訂閱者計數
            printf("已將客戶端 %d 從主題 '%s' 中移除\n", client_socket, topics[i].name);
        }
    }
}

// 輔助函數:向指定主題發布消息
void publish_message(int topic_index, const char* message, int publisher_socket) {
    if (topic_index < 0 || topic_index >= topic_count) return; // 無效的主題索引
    Topic* topic = &topics[topic_index];
    char full_message[BUFFER_SIZE + 100]; // 預留足夠空間構造 "MSG <topic> <data>\n" 格式的消息

    // 構造完整的消息格式
    snprintf(full_message, sizeof(full_message), "MSG %s %s", topic->name, message);
     // 確保消息以換行符結束 (如果原始消息沒有的話)
    if (full_message[strlen(full_message)-1] != '\n') {
         strncat(full_message, "\n", sizeof(full_message) - strlen(full_message) - 1);
    }


    printf("向主題 '%s' 發布消息: %s", topic->name, message); // 假設原始 message 可能已包含換行符
    // 遍歷該主題的所有訂閱者
    for (int i = 0; i < topic->sub_count; ++i) {
        int subscriber_socket = topic->subscribers[i];
        // 不將消息發回給發布者自己
        if (subscriber_socket != publisher_socket) {
            // 發送消息給訂閱者
            if (send(subscriber_socket, full_message, strlen(full_message), 0) < 0) {
                // 發送失敗,可能需要處理連接斷開等問題
                perror("向訂閱者發送消息失敗");
                // 注意: 健壯的實現應在此處檢測到錯誤后,可能也需要關閉并清理該訂閱者的連接
            } else {
                 printf("  -> 已發送至客戶端 %d\n", subscriber_socket);
            }
        }
    }
}


int main() {
    int listener_fd;                   // 監聽套接字描述符
    int new_socket;                    // 新接受的客戶端連接套接字描述符
    int client_sockets[MAX_CLIENTS];   // 存儲客戶端套接字描述符的數組
    int max_clients = MAX_CLIENTS;     // 最大客戶端數 (冗余變量,可直接用宏)
    int activity;                      // select() 的返回值
    int i, valread, sd;                // 循環變量, read() 返回值, 當前處理的套接字描述符
    int max_sd;                        // select() 需要的最大文件描述符值 + 1
    struct sockaddr_in address;        // 服務器地址結構
    char buffer[BUFFER_SIZE + 1];      // 數據接收緩沖區 (+1 用于空字符結尾)

    // 文件描述符集合
    fd_set readfds;

    // 初始化客戶端套接字數組,全部置 0 (0 不是有效的文件描述符)
    for (i = 0; i < max_clients; i++) {
        client_sockets[i] = 0;
    }

    // 創建主監聽套接字
    if ((listener_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
        perror("socket 創建失敗");
        exit(EXIT_FAILURE);
    }

    // 設置監聽套接字選項,允許地址重用
    int opt = 1;
    if (setsockopt(listener_fd, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt)) < 0) {
        perror("setsockopt 失敗");
        exit(EXIT_FAILURE);
    }

    // 配置服務器地址結構
    address.sin_family = AF_INET;         // IPv4
    address.sin_addr.s_addr = INADDR_ANY; // 監聽所有接口
    address.sin_port = htons(PORT);       // 指定端口(網絡字節序)

    // 將監聽套接字綁定到指定地址和端口
    if (bind(listener_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
        perror("bind 失敗");
        exit(EXIT_FAILURE);
    }
    printf("監聽器已啟動在端口 %d \n", PORT);

    // 使套接字進入監聽狀態,設定等待隊列長度為 3
    if (listen(listener_fd, 3) < 0) {
        perror("listen 失敗");
        exit(EXIT_FAILURE);
    }

    // 等待客戶端連接
    socklen_t addrlen = sizeof(address);
    puts("等待連接中 ...");

    while (1) { // 服務器主循環
        // 清空讀文件描述符集合
        FD_ZERO(&readfds);

        // 將監聽套接字加入集合
        FD_SET(listener_fd, &readfds);
        max_sd = listener_fd; // 初始化 max_sd

        // 將所有活動的客戶端套接字加入集合
        for (i = 0; i < max_clients; i++) {
            sd = client_sockets[i]; // 獲取客戶端套接字描述符

            // 如果是有效的套接字描述符 (大于 0),則加入讀集合
            if (sd > 0)
                FD_SET(sd, &readfds);

            // 更新 max_sd 以跟蹤最大的文件描述符值
            if (sd > max_sd)
                max_sd = sd;
        }

        // 調用 select() 等待活動發生,timeout 設置為 NULL 表示無限期阻塞
        activity = select(max_sd + 1, &readfds, NULL, NULL, NULL);

        // 檢查 select() 是否出錯 (忽略 EINTR 信號中斷)
        if ((activity < 0) && (errno != EINTR)) {
            perror("select 錯誤");
            // 在實際應用中可能需要更復雜的錯誤處理邏輯
        }

        // 檢查監聽套接字是否就緒 (表示有新連接請求)
        if (FD_ISSET(listener_fd, &readfds)) {
            // 接受新連接
            if ((new_socket = accept(listener_fd, (struct sockaddr *)&address, &addrlen)) < 0) {
                perror("accept 失敗");
                // 嚴重錯誤,通常需要退出或重啟服務
                exit(EXIT_FAILURE);
            }

            // 獲取并打印新客戶端的 IP 和端口信息
             char client_ip[INET_ADDRSTRLEN];
             inet_ntop(AF_INET, &address.sin_addr, client_ip, INET_ADDRSTRLEN);
             printf("新連接建立,套接字描述符為 %d,IP 為: %s,端口為: %d\n",
                   new_socket, client_ip, ntohs(address.sin_port));

            // 將新套接字添加到客戶端數組中
            for (i = 0; i < max_clients; i++) {
                // 找到數組中第一個空位
                if (client_sockets[i] == 0) {
                    client_sockets[i] = new_socket;
                    printf("已將套接字 %d 添加到列表位置 %d\n", new_socket, i);
                    break; // 添加成功后退出循環
                }
            }
            // 如果客戶端數組已滿
             if (i == max_clients) {
                printf("客戶端連接數已達上限,拒絕連接。\n");
                close(new_socket); // 關閉這個無法處理的連接
            }
        }

        // 檢查各個客戶端套接字是否有活動 (數據到達或連接關閉)
        for (i = 0; i < max_clients; i++) {
            sd = client_sockets[i]; // 當前要檢查的客戶端套接字

            // 如果當前客戶端套接字在就緒的讀集合中
            if (FD_ISSET(sd, &readfds)) {
                // 嘗試讀取數據
                // read() 返回值: >0 表示讀取的字節數, 0 表示連接已關閉 (收到FIN), <0 表示錯誤
                if ((valread = read(sd, buffer, BUFFER_SIZE)) <= 0) {
                    // 連接關閉或發生錯誤
                    if (valread == 0) {
                        // 對端正常關閉連接 (FIN)
                        getpeername(sd, (struct sockaddr*)&address, &addrlen); // 獲取對端地址信息用于打印
                        char client_ip[INET_ADDRSTRLEN];
                        inet_ntop(AF_INET, &address.sin_addr, client_ip, INET_ADDRSTRLEN);
                        printf("主機 %s:%d (套接字 %d) 斷開連接\n", client_ip, ntohs(address.sin_port), sd);
                    } else {
                        // 讀取錯誤
                        perror("read 錯誤");
                        // 也可以在這里獲取對端信息打印
                    }

                    // 關閉套接字
                    close(sd);
                    // 從客戶端數組中移除 (標記為 0)
                    client_sockets[i] = 0;
                    // 從所有主題訂閱中移除該客戶端
                    remove_subscriber(sd);

                } else {
                    // 成功讀取到數據,處理客戶端發送的命令
                    buffer[valread] = '\0'; // 添加空字符結尾,確保字符串處理安全

                    // 簡單的命令解析 (生產環境需要更健壯的解析器)
                    char command[5];       // 存儲命令 (SUB/PUB)
                    char topic_name[50];   // 存儲主題名
                    char message[BUFFER_SIZE]; // 存儲消息內容 (PUB 命令)
                    memset(message, 0, sizeof(message)); // 清空消息緩沖區

                    // 使用 sscanf 解析輸入,格式為 "CMD TOPIC [MESSAGE]"
                    // %4s: 讀取最多4個字符作為命令
                    // %49s: 讀取最多49個字符作為主題名 (防止溢出)
                    // %[^\n]: 讀取從主題名后到行尾的所有字符作為消息 (包括空格)
                    int parsed_items = sscanf(buffer, "%4s %49s %[^\n]", command, topic_name, message);

                    printf("收到來自客戶端 %d 的數據: %s", sd, buffer); // buffer 可能自帶換行符

                    if (parsed_items >= 2 && strcmp(command, "SUB") == 0) {
                        // 處理 SUB 命令
                        int topic_idx = find_or_create_topic(topic_name);
                        if (topic_idx != -1) {
                            add_subscriber(topic_idx, sd);
                            // 可選: 向客戶端發送訂閱成功確認
                            // send(sd, "SUB OK\n", 7, 0);
                        } else {
                            // 可選: 發送錯誤(如主題數達到上限)
                            // send(sd, "ERR max topics\n", 15, 0);
                        }
                    } else if (parsed_items >= 3 && strcmp(command, "PUB") == 0) {
                         // 處理 PUB 命令
                         int topic_idx = find_or_create_topic(topic_name); // 查找主題,如果不存在,簡單實現可以忽略或報錯
                          if (topic_idx != -1) {
                             // 確保消息格式包含換行符,便于客戶端接收處理
                             // 注意: 這里假設 message 已包含從 sscanf 讀取的內容
                             // 這里對 message 的處理可以省略,因為 publish_message 內部會確保換行符
                             publish_message(topic_idx, message, sd);
                         } else {
                              printf("未找到用于發布的主題 '%s'.\n", topic_name);
                             // 可選: 發送錯誤(主題不存在)
                             // send(sd, "ERR no such topic\n", 18, 0);
                         }
                    } else {
                        // 無效命令格式
                        printf("來自客戶端 %d 的無效命令: %s", sd, buffer);
                        // 可選: 發送錯誤
                        // send(sd, "ERR invalid command\n", 20, 0);
                    }
                    // 清理緩沖區以便下次讀取 (可選,因為 read 會覆蓋)
                    memset(buffer, 0, BUFFER_SIZE + 1);
                }
            }
        }
    }

    // 理論上,服務器的無限循環不會到達這里,但在正常關閉流程中應關閉監聽套接字
    close(listener_fd);

    return 0;
}

編譯

gcc pubsub_server_revised.c -o pubsub_server_revised

運行

./pubsub_server_revised
(base) ?  back-end-notes git:(master) ? nc localhost 8080
^C
(base) ?  back-end-notes git:(master) ? nc localhost 8080
SUB TOPIC-1
MSG TOPIC-1 1,242.42,214\n
(base) ?  back-end-notes git:(master) ? nc localhost 8080
PUB TOPIC-1 1,242.42,214\n
piperliu@go-x86:~/code/socket$ ./pubsub_server_revised
監聽器已啟動在端口 8080 
等待連接中 ...
新連接建立,套接字描述符為 5,IP 為: 127.0.0.1,端口為: 38910
已將套接字 5 添加到列表位置 0
主機 127.0.0.1:38910 (套接字 5) 斷開連接
正在移除客戶端 5 的所有訂閱。
新連接建立,套接字描述符為 5,IP 為: 127.0.0.1,端口為: 38924
已將套接字 5 添加到列表位置 0
收到來自客戶端 5 的數據: SUB TOPIC-1
客戶端 5 訂閱了主題 'TOPIC-1'
新連接建立,套接字描述符為 6,IP 為: 127.0.0.1,端口為: 58630
已將套接字 6 添加到列表位置 1
收到來自客戶端 6 的數據: PUB TOPIC-1 1,242.42,214\n
向主題 'TOPIC-1' 發布消息: 1,242.42,214\n  -> 已發送至客戶端 5

訂閱與發布流程的內核視角梳理

結合內核操作,我們重新梳理客戶端訂閱和發布時服務器端的處理流程:

  1. 服務器初始化與監聽
  • socket():內核創建 socket 結構,分配資源,返回文件描述符 listener_fd。
  • bind():內核校驗地址、端口可用性及權限,將本地地址信息關聯到 listener_fd 的 socket 結構。
  • listen():內核將 listener_fd 對應的 socket 結構狀態置為 LISTEN,并初始化 SYN 隊列和 Accept 隊列。
  1. 客戶端連接建立
  • 客戶端發起 TCP 連接請求(發送 SYN 包)。
  • 服務器內核網絡協議棧收到 SYN,創建半連接條目放入 SYN 隊列,回復 SYN-ACK。
  • 客戶端回復 ACK,完成三次握手。內核將對應的連接從 SYN 隊列移至 Accept 隊列。
  • 服務器進程調用 select(),select 檢測到 listener_fd 可讀(因為 Accept 隊列非空)。
  • FD_ISSET(listener_fd, ...) 為真。
  • 服務器進程調用 accept()。內核從 Accept 隊列取出一個連接,創建新的 socket 結構和文件描述符 new_socket(狀態為 ESTABLISHED),返回給服務器進程。
  1. 客戶端訂閱 (SUB)
  • 客戶端通過 new_socket 發送 "SUB ..." 數據。
  • 數據到達服務器,內核將其放入 new_socket 的接收緩沖區。
  • 服務器進程調用 select(),select 檢測到 new_socket 可讀(接收緩沖區有數據)。
  • FD_ISSET(new_socket, ...) 為真。
  • 服務器進程調用 read(new_socket, ...),內核從接收緩沖區復制數據到用戶空間 buffer。
  • 服務器進程解析 buffer,識別訂閱請求,更新應用層數據結構(topics 數組)。
  1. 客戶端發布 (PUB)
  • 客戶端通過 new_socket 發送 "PUB ..." 數據。
  • 數據到達服務器,內核處理同上,select 報告 new_socket 可讀。
  • 服務器進程調用 read() 獲取數據并解析。
  • 服務器進程查找主題,遍歷訂閱者列表。
  • 對于每個訂閱者 sub_socket(且 sub_socket != new_socket):

服務器進程調用 send(sub_socket, ...)。內核將數據復制到 sub_socket 的發送緩沖區。

內核網絡協議棧負責將發送緩沖區的數據打包成 TCP 段并發送出去。

  1. 客戶端斷開連接 (有序關閉)
  • 客戶端調用 close() 或 shutdown(SHUT_WR),其內核發送 FIN 包。
  • 服務器內核收到 FIN,將 new_socket 標記為收到 FIN,并向客戶端回復 ACK。連接進入 CLOSE_WAIT 狀態。
  • 服務器進程調用 select(),select 檢測到 new_socket 可讀(因為收到 FIN 也是可讀事件)。
  • FD_ISSET(new_socket, ...) 為真。
  • 服務器進程調用 read(new_socket, ...),read 返回 0。
  • 服務器進程識別出連接關閉,調用 close(new_socket)。內核發送 FIN 包給客戶端(如果尚未發送),釋放套接字資源(當引用計數為0時),從 master_fds 移除 new_socket,清理應用層資源。
責任編輯:武曉燕 來源: Piper蛋窩
相關推薦

2022-08-15 09:02:22

Redis模式訂閱消息

2010-03-03 16:19:29

Python Sock

2014-05-04 13:47:39

銳捷網絡極簡網絡

2025-02-25 09:29:34

2019-02-17 10:05:24

TCPSocket網絡編程

2021-01-12 08:43:29

Redis ListStreams

2019-07-16 09:20:11

Redis數據庫NoSQL

2023-05-31 15:47:52

銳捷

2024-01-10 08:16:08

Redis集成JMS

2022-09-02 17:12:16

BlackboxLinux

2013-03-27 13:26:04

Android開發Socket

2017-09-07 08:33:07

華為CloudFabric云數據

2022-05-26 18:08:32

數據中心

2009-09-07 14:29:47

C# Socket編程C# Socket

2024-07-02 11:42:53

SpringRedis自定義

2018-09-10 18:50:05

云管理網絡華為云管理網絡

2015-05-05 14:36:05

高校網絡銳捷

2021-10-28 16:04:04

以太全光網銳捷網絡

2016-08-25 21:12:31

微服務架構發布

2022-12-02 07:28:58

Event訂閱模式Spring
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 亚洲在线视频 | 99免费在线观看视频 | 欧美日韩一二三区 | 午夜a级理论片915影院 | 日韩一区二区三区在线看 | 天天天天操 | 日韩精品久久一区二区三区 | 国产欧美视频一区 | 亚洲精品免费视频 | 国产一级黄色网 | 午夜资源| 亚洲人成人一区二区在线观看 | 日韩黄色小视频 | 精品久久久久久久 | 亚洲一区影院 | 国产精品国产三级国产aⅴ中文 | 青青草一区二区 | 伊人网在线综合 | 黄色综合 | 欧美精品a∨在线观看不卡 欧美日韩中文字幕在线播放 | aaa一区 | 久久精品亚洲欧美日韩久久 | 暴草美女 | 亚洲综合国产 | 国产精品污www一区二区三区 | av片在线观看网站 | 91亚洲国产 | 一区二区三区视频在线观看 | 亚洲h视频 | 国产精品一区二区精品 | 丁香久久| 日本三级网站在线 | 国产精品视频在线播放 | 国外成人免费视频 | 午夜午夜精品一区二区三区文 | 亚洲美女av网站 | 日产精品久久久一区二区福利 | 成人黄色在线视频 | 亚洲黄色在线免费观看 | 美女一级a毛片免费观看97 | 久久久999国产精品 中文字幕在线精品 |