一文搞懂POSIX多線程:解鎖高性能編程的密碼
在計算機編程的廣闊領域中,POSIX 標準就像是一把通用的鑰匙,開啟了跨平臺編程的大門。POSIX,即 Portable Operating System Interface(可移植操作系統(tǒng)接口) ,是 IEEE 為了規(guī)范各種 UNIX 操作系統(tǒng)提供的 API 接口而定義的一系列互相關聯(lián)標準的總稱。它的出現(xiàn),旨在解決不同 UNIX 系統(tǒng)之間接口不一致的問題,讓開發(fā)者能夠編寫一次代碼,在多個符合 POSIX 標準的系統(tǒng)上運行,實現(xiàn)源代碼級別的軟件可移植性。
對于多線程編程而言,POSIX 標準同樣意義非凡。在多核處理器盛行的今天,多線程編程成為充分利用硬件資源、提高程序性能的關鍵技術。POSIX 標準定義了一套清晰、規(guī)范的多線程編程接口,讓開發(fā)者可以在不同的操作系統(tǒng)環(huán)境中,以統(tǒng)一的方式創(chuàng)建、管理線程,以及處理線程之間的同步和通信問題 。無論是開發(fā)高性能的服務器程序,還是優(yōu)化計算密集型的應用,POSIX 標準下的多線程編程都能提供強大的支持。
接下來,讓我們深入探索 POSIX 標準下的多線程編程世界,揭開線程創(chuàng)建、同步機制等核心概念的神秘面紗。
一、多線程編程簡介
1.1線程初印象
線程,作為進程內(nèi)的執(zhí)行單元,可以理解為進程這個大舞臺上的一個個小舞者,各自有著獨立的舞步(執(zhí)行路徑),卻又共享著舞臺的資源(進程資源)。與進程相比,線程更加輕量級。進程是系統(tǒng)進行資源分配和調(diào)度的基本單位,擁有獨立的地址空間、內(nèi)存、文件描述符等資源 ,進程間的切換開銷較大。而線程則是共享所屬進程的資源,它們之間的切換開銷相對較小,就像在同一個舞臺上不同舞者之間的快速換位,無需重新搭建整個舞臺。
線程的這些特點,使得多線程編程在提升程序執(zhí)行效率上有著獨特的優(yōu)勢。多個線程可以并發(fā)執(zhí)行,充分利用多核處理器的并行計算能力,將復雜的任務分解為多個子任務,每個子任務由一個線程負責處理,從而大大提高了程序的整體運行速度。例如,在一個網(wǎng)絡服務器程序中,一個線程可以負責監(jiān)聽客戶端的連接請求,另一個線程負責處理已經(jīng)建立連接的客戶端的數(shù)據(jù)傳輸,這樣可以同時處理多個客戶端的請求,提升服務器的響應性能 。
1.2POSIX 線程庫
在 POSIX 標準下,進行多線程編程離不開 POSIX 線程庫(pthread 庫)。它就像是一根神奇的魔法棒,為開發(fā)者提供了一系列強大的接口函數(shù),讓我們能夠輕松地操控線程。
其中,pthread_create函數(shù)用于創(chuàng)建一個新的線程 ,它的原型如下:
int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void *), void *arg);
thread參數(shù)用于返回新創(chuàng)建線程的 ID;attr參數(shù)用于設置線程的屬性,如果為NULL則使用默認屬性;start_routine是一個函數(shù)指針,指向線程開始執(zhí)行時調(diào)用的函數(shù);arg是傳遞給start_routine函數(shù)的參數(shù)。
而pthread_join函數(shù)則用于等待一個線程結束,其原型為:
int pthread_join(pthread_t thread, void **retval);
thread參數(shù)是要等待結束的線程 ID,retval用于獲取線程結束時的返回值。
下面是一個簡單的使用pthread_create和pthread_join函數(shù)的代碼示例:
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
// 線程執(zhí)行的函數(shù)
void* thread_function(void* arg) {
printf("線程開始執(zhí)行,參數(shù)為: %s\n", (char*)arg);
sleep(2); // 模擬線程執(zhí)行任務
printf("線程執(zhí)行結束\n");
return (void*)1; // 返回線程執(zhí)行結果
}
int main() {
pthread_t thread;
int res;
void* thread_result;
// 創(chuàng)建線程
res = pthread_create(&thread, NULL, thread_function, (void*)"Hello, Thread!");
if (res != 0) {
perror("線程創(chuàng)建失敗");
return 1;
}
printf("等待線程結束...\n");
// 等待線程結束,并獲取線程返回值
res = pthread_join(thread, &thread_result);
if (res != 0) {
perror("線程等待失敗");
return 1;
}
printf("線程已結束,返回值為: %ld\n", (long)thread_result);
return 0;
}
在這個示例中,我們創(chuàng)建了一個新線程,線程執(zhí)行thread_function函數(shù),在函數(shù)中打印傳入的參數(shù),然后休眠 2 秒模擬執(zhí)行任務,最后返回一個值。主線程通過pthread_join等待子線程結束,并獲取其返回值。
1.3線程的生命周期
線程如同一個有生命的個體,有著自己完整的生命周期,從創(chuàng)建的那一刻開始,經(jīng)歷運行、阻塞、喚醒等階段,最終走向結束。
當我們調(diào)用pthread_create函數(shù)時,線程就誕生了,此時它處于就緒狀態(tài),等待著 CPU 的調(diào)度。一旦獲得 CPU 時間片,線程就進入運行狀態(tài),開始執(zhí)行它的任務,也就是調(diào)用我們指定的函數(shù) 。
在運行過程中,線程可能會因為某些原因進入阻塞狀態(tài)。比如,當線程調(diào)用sleep函數(shù)時,它會主動放棄 CPU 使用權,進入睡眠狀態(tài),直到睡眠時間結束才會重新回到就緒狀態(tài),等待再次被調(diào)度執(zhí)行 。又或者,當線程訪問共享資源時,如果資源被其他線程占用,它就需要等待,從而進入阻塞狀態(tài),直到獲取到資源才會被喚醒,重新進入運行狀態(tài)。
當線程執(zhí)行完它的任務,也就是指定的函數(shù)返回時,線程就進入了結束狀態(tài)。此時,我們可以通過pthread_join函數(shù)等待線程結束,并獲取它的返回值 ,也可以在創(chuàng)建線程時將其設置為分離狀態(tài),這樣線程結束后資源會自動被回收,無需等待。了解線程的生命周期,有助于我們更好地管理線程,優(yōu)化程序的性能 。
二、Posix網(wǎng)絡API
2.1客戶端和服務端代碼示例
(1)服務端server.cpp
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <netdb.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
int main(int argc,char *argv[])
{
if (argc != 2)
{
printf("Using:./server port\nExample:./server 5005\n\n"); return -1;
}
// 第1步:創(chuàng)建服務端的socket。
int listenfd;
if ( (listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1)
{
perror("socket");
return -1;
}
// 第2步:把服務端用于通信的地址和端口綁定到socket上。
struct sockaddr_in servaddr; // 服務端地址信息的數(shù)據(jù)結構。
memset(&servaddr,0,sizeof(servaddr));
servaddr.sin_family = AF_INET; // 協(xié)議族,在socket編程中只能是AF_INET。
servaddr.sin_addr.s_addr = htonl(INADDR_ANY); // 任意ip地址。
//servaddr.sin_addr.s_addr = inet_addr("192.168.190.134"); // 指定ip地址。
servaddr.sin_port = htons(atoi(argv[1])); // 指定通信端口。
if (bind(listenfd,(struct sockaddr *)&servaddr,sizeof(servaddr)) != 0 )
{
perror("bind");
close(listenfd);
return -1;
}
// 第3步:把socket設置為監(jiān)聽模式。
if (listen(listenfd,5) != 0 )
{
perror("listen");
close(listenfd);
return -1;
}
// 第4步:接受客戶端的連接。
int clientfd; // 連上來的客戶端socket。
int socklen = sizeof(struct sockaddr_in); // struct sockaddr_in的大小
struct sockaddr_in clientaddr; // 客戶端的地址信息。
clientfd = accept(listenfd, (struct sockaddr *)&clientaddr, (socklen_t*)&socklen);
printf("client (%s) connect server success。。。\n", inet_ntoa(clientaddr.sin_addr));
// 第5步:與客戶端通信,接收客戶端發(fā)過來的報文后,將該報文原封不動返回給客戶端。
char buffer[1024];
// memset(buffer, 0, 1024);
while (1)
{
int ret;
memset(buffer, 0, sizeof(buffer));
// 接收客戶端的請求報文。
if ( (ret = recv(clientfd, buffer, sizeof(buffer), 0)) <= 0)
{
printf("ret = %d , client disconected!!!\n", ret);
break;
}
printf("recv msg: %s\n", buffer);
// 向客戶端發(fā)送響應結果。
if ( (ret = send(clientfd, buffer, strlen(buffer), 0)) <= 0)
{
perror("send");
break;
}
printf("response client: %s success...\n", buffer);
}
// 第6步:關閉socket,釋放資源。
close(listenfd);
close(clientfd);
return 0;
}
(2)客戶端client.cpp
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <netdb.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
int main(int argc,char *argv[])
{
if (argc != 3)
{
printf("Using:./client ip port\nExample:./client 127.0.0.1 5005\n\n"); return -1;
}
// 第1步:創(chuàng)建客戶端的socket。
int sockfd;
if ( (sockfd = socket(AF_INET,SOCK_STREAM,0))==-1)
{
perror("socket");
return -1;
}
// 第2步:向服務器發(fā)起連接請求。
struct hostent* h;
if ( (h = gethostbyname(argv[1])) == 0 ) // 指定服務端的ip地址。
{ printf("gethostbyname failed.\n"); close(sockfd); return -1; }
struct sockaddr_in servaddr;
memset(&servaddr,0,sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(atoi(argv[2])); // 指定服務端的通信端口。
memcpy(&servaddr.sin_addr,h->h_addr,h->h_length);
// 向服務端發(fā)起連接清求。
if (connect(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) != 0)
{
perror("connect");
close(sockfd);
return -1;
}
char buffer[1024];
// 第3步:與服務端通信,發(fā)送一個報文后等待回復,然后再發(fā)下一個報文。
for (int i = 0; i < 3; i++)
{
int ret;
memset(buffer, 0, sizeof(buffer));
sprintf(buffer, "這是第[%d]條消息!", i+1);
if ( (ret = send(sockfd, buffer, strlen(buffer),0)) <= 0) // 向服務端發(fā)送請求報文。
{
perror("send");
break;
}
printf("發(fā)送:%s\n", buffer);
memset(buffer,0,sizeof(buffer));
if ( (ret = recv(sockfd, buffer, sizeof(buffer), 0)) <= 0) // 接收服務端的回應報文。
{
printf("ret = %d error\n", ret);
break;
}
printf("從服務端接收:%s\n", buffer);
sleep(1);
}
// 第4步:關閉socket,釋放資源。
close(sockfd);
}
運行結果:
圖片
著重分析以下幾個函數(shù)
(1)socket函數(shù)
int socket(int domain, int type, int protocol);
調(diào)用socket()函數(shù)會創(chuàng)建一個套接字(socket)對象。套接字由兩部分組成,文件描述符(fd)和 TCP控制塊(Tcp Control Block,tcb) 。Tcb主要包括關系信息有網(wǎng)絡的五元組(remote IP,remote Port, local IP, local Port, protocol),一個五元組就可以確定一個具體的網(wǎng)絡連接。
(2)listen函數(shù)
listen(int listenfd, backlog);
服務端在調(diào)用listen()后,就開始監(jiān)聽網(wǎng)絡上連接請求。第二個參數(shù) backlog, 在Linux是指全連接隊列的長度,即一次最多能保存 backlog 個連接請求。
圖片
(3)connect 函數(shù)
客戶端調(diào)用connect()函數(shù),向指定服務端發(fā)起連接請求。
(4)accept 函數(shù)
int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);
accept()函數(shù)只做兩件事,將連接請求從全連接隊列中取出,給該連接分配一個fd并返回。
(5) 三次握手過程分析
三次握手與listen/connect/accept三個函數(shù)有關,這里放到一起進行描述。
客戶端調(diào)用 connect 函數(shù),開始進入三次握手。客戶端發(fā)送syn包,以及帶著隨機的seq;
服務端listen函數(shù)監(jiān)聽到有客戶端連接,listen函數(shù)會在內(nèi)核協(xié)議棧為該客戶端創(chuàng)建一個Tcb控制塊,并將其加入到半連接隊列。服務端在收到syn包后,會給客戶端恢復ack和syn包;
客戶端收到服務端的ack和syn后再次恢復ack,連接建立成功。
服務端在收到客戶端的ack后,會將該客戶端對應的Tcb數(shù)據(jù)從半連接隊列移動到全連接隊列。只要全連接隊列中有數(shù)據(jù)就會觸發(fā)accept,返回連接成功的客戶端fd、IP以及端口。此時,Tcb完整的五元組構建成功。
(6)send/recv 函數(shù)
至此,客戶端與服務端已經(jīng)成功建立連接,就可以相互通信了。
send/recv函數(shù)主要負責數(shù)據(jù)的收發(fā)。
過程分析
send函數(shù):負責將數(shù)據(jù)從用戶空間拷貝到內(nèi)核(具體是拷貝到該連接對應的Tcb控制塊中的發(fā)送緩沖區(qū))。注意:send函數(shù)返回并不意味著數(shù)據(jù)已成功發(fā)送,因為數(shù)據(jù)在到達內(nèi)核緩沖區(qū)后,內(nèi)核會根據(jù)自己的策略決定什么時候?qū)?shù)據(jù)發(fā)出。
recv函數(shù):負責將數(shù)據(jù)從內(nèi)核緩沖區(qū)拷貝到用戶空間。同理,數(shù)據(jù)也顯示到達該連接對應的Tcb控制塊的接受緩沖區(qū)。
(7)close 函數(shù)
在服務器與客戶端建立連接之后,會進行一些讀寫操作,完成讀寫操作后我們需要關閉相應的socket,好比操作完打開的文件要調(diào)用fclose關閉打開的文件一樣。close過程涉及到四次揮手的全過程
四次揮手流程:
- 客戶端調(diào)用close函數(shù),內(nèi)核會發(fā)送fin包,客戶端進入fin_wait1狀態(tài);
- 服務端收到fin包回復ack,客戶端進入close_wait狀態(tài)。此時,客戶客戶端往服務端發(fā)送的通道就關閉了,因為Tcp是全雙工的,服務端還可以向客戶端發(fā)數(shù)據(jù)。
- 客戶端收到ack,進入到fin_wait2狀態(tài);
- 服務端發(fā)送完數(shù)據(jù),發(fā)送fin包,服務端進入last_ack狀態(tài);
- 客戶端收到fin包后,回復ack,進入到time_wait狀態(tài);
- 服務端收到ack,雙方連接正常關閉。
注意:close操作只是讓相應socket描述字的引用計數(shù)-1,只有當引用計數(shù)為0的時候,才會觸發(fā)TCP客戶端向服務器發(fā)送終止連接請求
2.2雙方同時調(diào)用close
圖片
2.3常見面試問題
為什么要三次握手?
答:因為一個完整的TCP連接需要雙方都得到確認,客戶端發(fā)送請求和收到確認需要兩次;服務端發(fā)送請求和收到確認需要兩次,當中服務回復確認和發(fā)送請求合并為一次總共需要3次;才能保證雙向通道是通的。
一個服務器的端口數(shù)是65535,為何能做到一百萬的連接?
答:主要是因為一條連接是由五元組所組成,所以一個服務器的連接數(shù)是五個成員數(shù)的乘積。
如何應對Dos(Deny of Service,拒絕服務)攻擊?
答:Dos攻擊就是利用三次握手的原理,模擬客戶端只向服務器發(fā)送syn包,然后耗盡被攻擊對象的資源。比較多的做法是利用防火墻,做一些過濾規(guī)則
如何解決Tcp的粘包問題?
答:(1) 在包頭上添加一個數(shù)據(jù)包長度的字段,用于數(shù)據(jù)的劃分,實際項目中這個也用的最多;(2)包尾部加固定分隔符;
Tcp如何保證順序到達?
答:順序到達是由于TCP的延遲ACK的機制來保證的,TCP接收到數(shù)據(jù)并不是立即回復而是經(jīng)過一個延遲時間,回復接收到連續(xù)包的最大序列號加1。如果丟包之后的包都需要重傳。在弱網(wǎng)情況下這里就會有實時性問題和帶寬占用的問題;
time_wait 作用?
答:防止最后一個ACK沒有順利到達對方,超時重新發(fā)送ack。time_wait時常一般是120s可以修改。
服務器掉線重啟出現(xiàn)端口被占用怎么辦?
答:其實主要是由于還處于time_wait狀態(tài),端口并沒有真正釋放。這時候可以設置SO_REUSEADDR屬性,保證掉線能馬上重連。
三、同步機制:多線程協(xié)作的 “指揮家”
在多線程編程的舞臺上,同步機制就像是一位經(jīng)驗豐富的指揮家,協(xié)調(diào)著各個線程的行動,確保它們能夠和諧共處,高效地完成任務。多線程編程中,由于多個線程共享進程資源,資源競爭和線程協(xié)作問題不可避免,而同步機制正是解決這些問題的關鍵。接下來,我們將深入探討互斥鎖、信號量和條件變量這幾種常見的同步機制 。
3.1資源競爭:多線程中的 “暗礁”
當多個線程同時訪問和修改共享資源時,資源競爭問題就如同隱藏在暗處的暗礁,隨時可能讓程序的運行陷入混亂。假設我們有一個簡單的程序,包含兩個線程,它們都試圖對一個全局變量進行加 1 操作:
#include <stdio.h>
#include <pthread.h>
// 全局變量
int global_variable = 0;
// 線程執(zhí)行函數(shù)
void* thread_function(void* arg) {
for (int i = 0; i < 1000000; i++) {
global_variable++;
}
return NULL;
}
int main() {
pthread_t thread1, thread2;
// 創(chuàng)建線程
pthread_create(&thread1, NULL, thread_function, NULL);
pthread_create(&thread2, NULL, thread_function, NULL);
// 等待線程結束
pthread_join(thread1, NULL);
pthread_join(thread2, NULL);
printf("最終的全局變量值: %d\n", global_variable);
return 0;
}
按照我們的預期,兩個線程各對全局變量加 1000000 次,最終的結果應該是 2000000。然而,實際運行這個程序,你會發(fā)現(xiàn)結果往往小于 2000000。這是因為在多線程環(huán)境下,global_variable++ 這一操作并非原子操作,它實際上包含了讀取變量值、加 1、寫回變量值這三個步驟 。當兩個線程同時執(zhí)行這一操作時,可能會出現(xiàn)一個線程讀取了變量值,還未完成加 1 和寫回操作,另一個線程也讀取了相同的值,導致最終結果出現(xiàn)偏差,數(shù)據(jù)不一致 。
3.2互斥鎖:守護資源的 “衛(wèi)士”
互斥鎖(Mutex)是解決資源競爭問題的常用工具,它就像一位忠誠的衛(wèi)士,守護著共享資源,確保同一時間只有一個線程能夠訪問資源。互斥鎖的工作原理基于一個簡單的概念:當一個線程獲取到互斥鎖時,其他線程就必須等待,直到該線程釋放互斥鎖。
在 POSIX 線程庫中,使用互斥鎖非常簡單。首先,我們需要定義一個互斥鎖變量:
pthread_mutex_t mutex;
然后,在訪問共享資源之前,通過 pthread_mutex_lock 函數(shù)獲取互斥鎖:
pthread_mutex_lock(&mutex);
如果互斥鎖已經(jīng)被其他線程持有,調(diào)用 pthread_mutex_lock 的線程將被阻塞,直到互斥鎖被釋放。當訪問完共享資源后,使用 pthread_mutex_unlock 函數(shù)釋放互斥鎖:
pthread_mutex_unlock(&mutex);
下面是使用互斥鎖改進后的代碼:
#include <stdio.h>
#include <pthread.h>
// 全局變量
int global_variable = 0;
// 互斥鎖
pthread_mutex_t mutex;
// 線程執(zhí)行函數(shù)
void* thread_function(void* arg) {
for (int i = 0; i < 1000000; i++) {
// 獲取互斥鎖
pthread_mutex_lock(&mutex);
global_variable++;
// 釋放互斥鎖
pthread_mutex_unlock(&mutex);
}
return NULL;
}
int main() {
pthread_t thread1, thread2;
// 初始化互斥鎖
pthread_mutex_init(&mutex, NULL);
// 創(chuàng)建線程
pthread_create(&thread1, NULL, thread_function, NULL);
pthread_create(&thread2, NULL, thread_function, NULL);
// 等待線程結束
pthread_join(thread1, NULL);
pthread_join(thread2, NULL);
// 銷毀互斥鎖
pthread_mutex_destroy(&mutex);
printf("最終的全局變量值: %d\n", global_variable);
return 0;
}
通過這種方式,互斥鎖有效地保護了共享資源,確保了數(shù)據(jù)的一致性 。
3.3信號量:資源分配的 “調(diào)度員”
信號量(Semaphore)是另一種強大的同步工具,它不僅可以用于實現(xiàn)互斥,還能用于管理資源的分配。信號量可以看作是一個計數(shù)器,它的值表示可用資源的數(shù)量 。當一個線程想要訪問資源時,它需要先獲取信號量,如果信號量的值大于 0,則表示有可用資源,線程可以獲取信號量并繼續(xù)執(zhí)行,同時信號量的值減 1;如果信號量的值為 0,則表示沒有可用資源,線程將被阻塞,直到有其他線程釋放信號量 。
在 POSIX 標準中,信號量相關的函數(shù)主要有 sem_init(初始化信號量)、sem_wait(等待信號量)、sem_post(釋放信號量)和 sem_destroy(銷毀信號量)。假設我們有一個場景,有多個線程需要訪問有限數(shù)量的資源,比如數(shù)據(jù)庫連接池中的連接。我們可以使用信號量來控制對這些資源的訪問:
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
// 定義信號量,假設有5個可用資源
sem_t semaphore;
// 線程執(zhí)行函數(shù)
void* thread_function(void* arg) {
// 等待信號量
sem_wait(&semaphore);
printf("線程獲取到資源,開始執(zhí)行任務...\n");
// 模擬任務執(zhí)行
sleep(1);
printf("線程任務執(zhí)行完畢,釋放資源\n");
// 釋放信號量
sem_post(&semaphore);
return NULL;
}
int main() {
pthread_t threads[10];
// 初始化信號量,設置初始值為5
sem_init(&semaphore, 0, 5);
// 創(chuàng)建10個線程
for (int i = 0; i < 10; i++) {
pthread_create(&threads[i], NULL, thread_function, NULL);
}
// 等待所有線程結束
for (int i = 0; i < 10; i++) {
pthread_join(threads[i], NULL);
}
// 銷毀信號量
sem_destroy(&semaphore);
return 0;
}
在這個例子中,我們初始化信號量的值為 5,表示有 5 個可用資源。每個線程在執(zhí)行任務前先通過 sem_wait 等待信號量,獲取到信號量后才能訪問資源,執(zhí)行完任務后通過 sem_post 釋放信號量,這樣就保證了同時最多只有 5 個線程可以訪問資源 。
3.4條件變量:線程間的 “傳聲筒”
條件變量(Condition Variable)用于線程間基于條件的通信,它為線程提供了一種等待特定條件發(fā)生的機制,就像一個傳聲筒,讓線程之間能夠相互傳達信息。條件變量通常與互斥鎖配合使用,以實現(xiàn)線程之間的同步和協(xié)作。
一個經(jīng)典的例子是生產(chǎn)者 - 消費者模型。在這個模型中,生產(chǎn)者線程負責生成數(shù)據(jù)并將其放入緩沖區(qū),消費者線程則從緩沖區(qū)中取出數(shù)據(jù)進行處理。當緩沖區(qū)為空時,消費者線程需要等待,直到生產(chǎn)者線程向緩沖區(qū)中放入數(shù)據(jù);當緩沖區(qū)滿時,生產(chǎn)者線程需要等待,直到消費者線程從緩沖區(qū)中取出數(shù)據(jù) 。
下面是使用條件變量和互斥鎖實現(xiàn)生產(chǎn)者 - 消費者模型的代碼示例:
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#define BUFFER_SIZE 5
int buffer[BUFFER_SIZE];
int in = 0, out = 0;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t not_empty = PTHREAD_COND_INITIALIZER;
pthread_cond_t not_full = PTHREAD_COND_INITIALIZER;
// 生產(chǎn)者線程函數(shù)
void* producer(void* arg) {
while (1) {
int item = rand() % 100; // 生成一個隨機數(shù)
pthread_mutex_lock(&mutex);
while ((in + 1) % BUFFER_SIZE == out) { // 緩沖區(qū)滿
pthread_cond_wait(?_full, &mutex);
}
buffer[in] = item;
printf("生產(chǎn)者放入數(shù)據(jù): %d\n", item);
in = (in + 1) % BUFFER_SIZE;
pthread_cond_signal(?_empty);
pthread_mutex_unlock(&mutex);
sleep(rand() % 2); // 模擬生產(chǎn)時間
}
return NULL;
}
// 消費者線程函數(shù)
void* consumer(void* arg) {
while (1) {
pthread_mutex_lock(&mutex);
while (in == out) { // 緩沖區(qū)空
pthread_cond_wait(?_empty, &mutex);
}
int item = buffer[out];
printf("消費者取出數(shù)據(jù): %d\n", item);
out = (out + 1) % BUFFER_SIZE;
pthread_cond_signal(?_full);
pthread_mutex_unlock(&mutex);
sleep(rand() % 3); // 模擬消費時間
}
return NULL;
}
int main() {
pthread_t producer_thread, consumer_thread;
// 創(chuàng)建生產(chǎn)者和消費者線程
pthread_create(&producer_thread, NULL, producer, NULL);
pthread_create(&consumer_thread, NULL, consumer, NULL);
// 等待線程結束
pthread_join(producer_thread, NULL);
pthread_join(consumer_thread, NULL);
// 銷毀互斥鎖和條件變量
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(?_empty);
pthread_cond_destroy(?_full);
return 0;
}
在這個代碼中,pthread_cond_wait 函數(shù)會使線程進入等待狀態(tài),并自動釋放互斥鎖,當條件滿足被喚醒時,會重新獲取互斥鎖。pthread_cond_signal 函數(shù)則用于喚醒等待在條件變量上的一個線程。通過條件變量和互斥鎖的緊密配合,生產(chǎn)者和消費者線程能夠有條不紊地工作,實現(xiàn)高效的數(shù)據(jù)處理 。
四、多線程編程實戰(zhàn)演練
4.1多線程案例分析
在日常的編程工作中,文件處理是一項常見的任務。當面對大量文件需要處理時,單線程的處理方式往往效率低下,而多線程編程則能成為提升效率的利器。假設我們有一個需求:處理一批日志文件,需要統(tǒng)計每個文件中特定關鍵詞出現(xiàn)的次數(shù),并將結果匯總。
為了實現(xiàn)這個目標,我們可以設計一個多線程的文件處理方案。首先,將文件列表進行分割,把不同的文件分配給不同的線程處理,這就像是將一堆任務分配給不同的工人,每個工人專注于自己手頭的任務 。每個線程負責讀取分配給自己的文件內(nèi)容,逐行掃描,統(tǒng)計關鍵詞出現(xiàn)的次數(shù)。
這個過程中,線程之間的同步機制至關重要。我們可以使用互斥鎖來保護共享的統(tǒng)計結果變量,確保不同線程在更新統(tǒng)計結果時不會出現(xiàn)數(shù)據(jù)競爭問題 。比如,當一個線程統(tǒng)計完自己負責文件后,需要將統(tǒng)計結果累加到全局的統(tǒng)計變量中,此時通過獲取互斥鎖,保證同一時間只有一個線程能夠進行累加操作,避免了數(shù)據(jù)不一致的情況 。
4.2代碼實現(xiàn)示例
下面是使用 POSIX 線程庫實現(xiàn)多線程文件處理的具體代碼:
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include <unistd.h>
#define MAX_FILES 10
#define KEYWORD "error" // 要統(tǒng)計的關鍵詞
// 線程參數(shù)結構體
typedef struct {
char *file_name;
} ThreadArgs;
// 全局統(tǒng)計變量
int global_count = 0;
// 互斥鎖
pthread_mutex_t mutex;
// 線程執(zhí)行函數(shù)
void* count_keyword(void* arg) {
ThreadArgs *args = (ThreadArgs*)arg;
FILE *file = fopen(args->file_name, "r");
if (file == NULL) {
perror("文件打開失敗");
pthread_exit(NULL);
}
char line[1024];
int local_count = 0;
while (fgets(line, sizeof(line), file) != NULL) {
if (strstr(line, KEYWORD) != NULL) {
local_count++;
}
}
fclose(file);
// 獲取互斥鎖,更新全局統(tǒng)計變量
pthread_mutex_lock(&mutex);
global_count += local_count;
pthread_mutex_unlock(&mutex);
pthread_exit(NULL);
}
int main() {
pthread_t threads[MAX_FILES];
ThreadArgs args[MAX_FILES];
char file_names[MAX_FILES][50] = {"file1.log", "file2.log", "file3.log", "file4.log", "file5.log", "file6.log", "file7.log", "file8.log", "file9.log", "file10.log"};
// 初始化互斥鎖
pthread_mutex_init(&mutex, NULL);
// 創(chuàng)建線程并分配文件
for (int i = 0; i < MAX_FILES; i++) {
args[i].file_name = file_names[i];
if (pthread_create(&threads[i], NULL, count_keyword, &args[i]) != 0) {
perror("線程創(chuàng)建失敗");
return 1;
}
}
// 等待所有線程結束
for (int i = 0; i < MAX_FILES; i++) {
if (pthread_join(threads[i], NULL) != 0) {
perror("線程等待失敗");
return 1;
}
}
// 銷毀互斥鎖
pthread_mutex_destroy(&mutex);
printf("關鍵詞 '%s' 出現(xiàn)的總次數(shù): %d\n", KEYWORD, global_count);
return 0;
}
在這段代碼中,count_keyword 函數(shù)是線程執(zhí)行的主體,它打開分配的文件,逐行讀取并統(tǒng)計關鍵詞出現(xiàn)的次數(shù),最后通過互斥鎖將本地統(tǒng)計結果累加到全局變量中 。main 函數(shù)負責創(chuàng)建線程,為每個線程分配文件,并等待所有線程執(zhí)行完畢后輸出最終的統(tǒng)計結果 。
4.3多線程調(diào)試與優(yōu)化
在多線程程序的調(diào)試過程中,我們可能會遇到各種各樣的問題。死鎖是一個常見的問題,比如兩個線程分別持有不同的鎖,卻又試圖獲取對方持有的鎖,就會陷入死鎖狀態(tài),導致程序無法繼續(xù)執(zhí)行 。為了檢測死鎖,可以使用工具如Valgrind的Helgrind工具,它能夠幫助我們發(fā)現(xiàn)潛在的死鎖問題。一旦發(fā)現(xiàn)死鎖,我們需要仔細檢查代碼中鎖的獲取和釋放順序,避免嵌套鎖的不合理使用 。
線程異常也是需要關注的問題。當線程執(zhí)行過程中出現(xiàn)未捕獲的異常時,可能會導致整個程序崩潰。我們可以在線程函數(shù)中使用try - catch塊(如果是 C++ 代碼)或者進行適當?shù)腻e誤處理,確保線程在遇到異常時能夠安全地退出,而不影響其他線程的正常運行 。
在優(yōu)化方面,合理調(diào)整線程數(shù)量是一個重要的思路。線程數(shù)量并非越多越好,過多的線程會導致上下文切換開銷增大,反而降低程序性能 。對于 CPU 密集型的任務,線程數(shù)量可以設置為接近 CPU 核心數(shù);對于 I/O 密集型的任務,由于線程在等待 I/O 操作時會阻塞,不會占用 CPU 資源,因此可以適當增加線程數(shù)量 。此外,優(yōu)化同步機制也能提升性能,比如使用更細粒度的鎖,減少鎖的競爭范圍,或者在合適的場景下使用無鎖數(shù)據(jù)結構,避免鎖帶來的開銷 。通過不斷地調(diào)試和優(yōu)化,我們能夠讓多線程程序更加穩(wěn)健高效地運行 。