Linux高性能編程-線程池
今天我們來學習Linux線程池,線程池是高并發場景必須具備的軟件組件,很多開源項目都會使用線程池,話不多說,直接開始。
1.線程池簡介
1.1 什么是線程池?
線程池就是提前創建好一批線程,通過一個池子來管理所有的線程。當有任務時,從池子中取出一個線程去執行該任務,執行結束后,線程并不會死亡,而是再次返回線程池中成為空閑狀態,等待執行下一個任務。
我們通過幾個圖來講解一下線程池的作用。
1)常規多線程
圖片
常規多線程方式,每個任務都會創建一個新的線程來處理,任務處理完后線程會銷毀。
優點:在任務量比較少的情況下,任務執行效率比較高。
缺點:1.隨著任務量增多,線程數量會越來越多,線程開銷(CPU,內存開銷)很大,如果不控制線程數量,系統會出現異常。2.缺乏任務管理機制。3.缺乏多線程管理機制。
2)簡單的線程池
圖片
簡單線程池通過任務隊列和線程池完成。
新任務產生后存儲在任務隊列,線程池中的空閑線程從任務隊列獲取任務并執行。
優點:1.線程數量可控,能夠保證系統安全。2.任務和線程統一管理,方便程序設計。
缺點:1.沒有任務拒絕機制,任務會堆積在任務隊列。2.線程池線程數量固定,無法動態調節線程數量,導致任務處理效率不高。
3)完善的線程池
圖片
完善的線程池需要具備一下幾個優點:
- 多線程動態管理。
- 任務實時響應。
- 高并發安全保護機制。
線程池被分為核心線程和動態線程,核心線程會一直運行保證基礎業務,當任務越來越多的時候,核心線程無法保證任務快速響應,此時需要通過創建動態線程提高線程響應速度。
當動態線程到達極限時,系統的處理能力到達瓶頸,此時需要啟動安全保護機制,通過任務拒絕,保證系統安全。
1.2 線程池優點
通過上述分析,我們了解到線程池有以下優點:
- 降低開銷:線程池通過重用已存在的線程,降低了線程的創建和銷毀所帶來的開銷,提高了性能。
- 控制并發數:線程池能夠有效控制同時執行的線程數量,防止因線程過多導致系統資源耗盡或性能下降,提高系統穩定性。
- 提高響應速度:由于線程池中的線程可以復用,減少了線程創建的時間,因此提高了任務的響應速度。
- 提供豐富功能:線程池不僅可以執行普通任務,還提供定時執行、定期執行、單線程執行以及并發數控制等高級功能,使得線程的管理和使用更加靈活和高效。
2.線程池設計
2.1 整體設計
設計一個完善的線程池,我們需要設計幾個核心模塊:狀態管理、線程管理,任務管理。
- 狀態管理:管理線程池的生命周期。
- 線程管理:管理核心線程和動態線程。
- 任務管理:管理任務申請,任務拒絕以及任務高效調度。
2.2 詳細設計
線程池定義(struct thread_pool):
typedef struct thread_pool {
struct list_head head; //任務隊列
int task_max; //任務隊列最大長度
int task_num; //任務隊列實際長度
int core_num; //核心線程數量
int dyn_max; //動態線程最大數量
atomic_int dyn_num; //動態線程實際數量
pthread_t *core_th; //核心線程pthread_t數組
pthread_t *dyn_th; //動態線程pthread_t數組
int status; //線程池狀態
pthread_mutex_t mutex; //互斥鎖
pthread_cond_t cond; //條件變量
} thread_pool_t;
線程池通過struct thread_pool結構體定義,每個成員的作用已在代碼中注釋。
創建線程池:
thread_pool_t* thread_pool_create(int core_num, int dyn_max) {
thread_pool_t *tp = (thread_pool_t *)malloc(sizeof(thread_pool_t));
if (!tp) return NULL;
pthread_mutex_init(&tp->mutex, NULL);
pthread_cond_init(&tp->cond, NULL);
list_head_init(&tp->head);
tp->status = INIT_STATUS;
tp->task_max = TASK_MAX;
tp->task_num = 0;
tp->dyn_max = dyn_max;
atomic_init(&tp->dyn_num, 0);
tp->core_num = core_num;
tp->core_th = malloc(core_num * sizeof(pthread_t));
for (int i = 0; i < tp->core_num; i++) { //創建核心線程
thread_arg *th_arg = (thread_arg *)malloc(sizeof(thread_arg));
*th_arg = (thread_arg) {.tp = tp, .type = CORE_THREAD, .task = NULL};
pthread_create(&tp->core_th[i], NULL, thread_proc, (void *)th_arg);
}
tp->status = RUNNING_STATUS; //設置線程池為running狀態
return tp;
}
線程池創建時會指定核心線程數量以及最大動態線程數量,核心線程跟著線程池一起創建。
動態線程根據實際任務量動態創建,為了防止創建過多的動態線程,需限制動態線程最大數量。
銷毀線程池:
void thread_pool_destroy_p(thread_pool_t **ptp) {
if (!ptp) return;
thread_pool_t *tp = *ptp;
while(atomic_load(&tp->dyn_num)) { //等待動態線程全部退出
usleep(10 * 1000);
}
for (int i = 0; i < tp->core_num; i++) { //等待核心線程全部退出
pthread_join(tp->core_th[i], NULL);
}
free(tp->core_th);
pthread_mutex_destroy(&tp->mutex);
pthread_cond_destroy(&tp->cond);
tp->status = STOP_STATUS; //設置線程池為stop狀態
free(tp);
ptp = NULL;
return;
}
線程池銷毀需要回收核心線程和動態線程,核心線程采用pthread_join方式回收。
動態線程設置分離屬性,線程池成員dyn_num(原子變量)用于記錄仍在工作的動態線程數量,dyn_num等于0時表示當前所有的動態線程都已經退出。
2.2.1 狀態管理
線程池整個生命周期可分為4個狀態:
#define INIT_STATUS (1<<0) //init狀態
#define RUNNING_STATUS (1<<1) //running狀態
#define SHUTDOWN_STATUS (1<<2) //shutdown狀態
#define STOP_STATUS (1<<3) //stop狀態
- init狀態:線程池創建時的狀態,init狀態線程不能處理任務。
- running狀態:線程池初始化完畢后設置為running狀態,此時線程池能夠提取任務并執行。
- shutdown狀態:線程池正常或者異常退出,設置為shutdown狀態,此時線程池能繼續處理工作隊列中剩余任務,但無法再接收新的任務。
- stop狀態:線程池所有線程全部釋放,線程池被銷毀。
2.2.2 線程管理
核心線程和動態線程處理函數:
void *thread_proc(void *arg) {
thread_arg *th_arg = (thread_arg *)arg;
thread_pool_t *tp = th_arg->tp;
pid_t tid = gettid();
//動態線程設置成分離模式,方便管理
if (th_arg->type == DYN_THREAD) pthread_detach(pthread_self());
int count = 0;
task_t *task = NULL;
while(1) {
if (th_arg->task) { //創建線程并執行首次任務
printf("tid:%d first process task\n", tid);
task = th_arg->task;
th_arg->task = NULL;
} else {
task = thread_get_task(tp); //從任務隊列中獲取任務并執行
if (!task) {
if (tp->status != RUNNING_STATUS) break; //線程池被關閉,線程退出
count++;
if ((count >= 10) && (th_arg->type == DYN_THREAD)) {
printf("dyn thread 10s break\n");
break; //動態線程空閑10秒自動退出
}
continue;
}
count = 0;
}
task->cb(task->arg);
task_freep(&task);
}
free(th_arg);
if (th_arg->type == DYN_THREAD) {
atomic_fetch_sub(&tp->dyn_num, 1);
}
return NULL;
}
1)核心線程
核心線程數量固定,在線程池創建時會創建所有的核心線程,線程池退出時會銷毀所有的核心線程。
2)動態線程
動態線程的管理比較復雜,需要根據任務數量做動態調整,任務量大時,動態線程會被創建,提高線程池任務響應速率,任務量小時,空閑的動態線程會被回收,從而減少線程開銷。
2.2.3 任務管理
任務定義(struct task):
typedef enum TASK_TYPE {
FREE_TYPE,
NOFREE_TYPE,
}TASK_TYPE;
typedef void (*func)(void *arg);
typedef struct task {
struct list_head list; //隊列節點
func cb; //回調函數
void *arg; //回調函數參數
TASK_TYPE type; //任務參數是否需要釋放
}task_t;
任務通過struct task定義,任務主要成員:
- list:隊列節點,用于插入和移除任務隊列。
- cb:回調函數,任務處理函數。
- arg:回調函數參數。
- type:回調函數參數是否需要釋放標志。
任務申請流程如下圖,創建一個新的任務后,需要做一些檢測才能將任務加入線程池,檢測不通過則執行任務拒絕,從而保證線程池始終處于安全高效運行狀態。
任務執行完畢后,需釋放任務,回收資源。
圖片
線程池一定要做任務管理,任務管理的目的有兩個:
- 保證線程池安全,不會堆積過多任務,消耗CPU和內存資源。
- 提高任務處理效率。
3.線程池測試
測試環境:樹莓派4B,4核,4GB。
分別采用多線程和線程池方式測試CPU密集型和IO密集型任務,對比兩種方式性能和效率的差異。每毫秒產生1個任務,總共測試10000個任務。
通過time命令執行測試程序,記錄測試程序執行情況。
測試代碼如下:
(完整代碼請聯系博主獲取)
#define CORE_THREAD_NUM (4) //核心線程數量
#define DYN_THREAD_MAX (32) //動態線程最大數量
#define TASK_MAX (128) //任務隊列任務最大數量
#define ENABLE_THREAD_POOL (0) //是否開啟線程池,0:關閉 1:開啟
#define TEST_TASK_NUM (10000) //測試任務數量
#define TASK_INTERVAL (1000) //任務產生間隔時間,單位:毫秒
#define TEST_TASK_TYPE (0) //任務類型,0:CPU密集型 1:IO密集型
#define NOP_TIMES (10000000) //CPU密集型任務執行空指令次數
#define RAND_RANGE (1 << 18) //IO密集型任務休眠時間隨機范圍,單位:毫秒
void cpu_stress() {
for (int i = 0; i < NOP_TIMES; i++) {
;
}
}
void rand_sleep() {
srand(time(0));
int ms = rand() & (RAND_RANGE - 1);
usleep(ms);
}
//任務處理函數
void task_cb(void *arg) {
#if !TEST_TASK_TYPE
cpu_stress();
#else
rand_sleep();
#endif
return;
}
atomic_int running_threads;
void *thread_proc1(void *arg) {
atomic_fetch_add(&running_threads, 1);
pthread_detach(pthread_self());
task_t *task = (task_t *)arg;
task->cb(task->arg);
free(task);
atomic_fetch_sub(&running_threads, 1);
return NULL;
}
int main(int argc, char *argv[]) {
#if ENABLE_THREAD_POOL
thread_pool_t *tp = thread_pool_create(CORE_THREAD_NUM, DYN_THREAD_MAX);
if (!tp) {
printf("thread_pool_create error");
return -1;
}
int seq = 0;
while(1) {
usleep(TASK_INTERVAL);
task_t *task = task_create(task_cb, NULL, FREE_TYPE);
if (!task) {
printf("task create error\n");
usleep(10 * 1000);
continue;
}
int ret = thread_add_task(tp, task);
if (ret == -1) { //任務拒絕
task_freep(&task);
usleep(10 * 1000);
continue;
}
if (seq++ >= TEST_TASK_NUM) {
thread_pool_exit(tp);
thread_pool_destroy_p(&tp);
break;
}
}
printf("thread pool test done------\n");
#else
atomic_init(&running_threads, 0);
int seq = 0;
int old_num = 0;
while(1) {
usleep(TASK_INTERVAL);
task_t *task = task_create(task_cb, NULL, FREE_TYPE);
if (!task) {
printf("task create error\n");
usleep(10 * 1000);
continue;
}
pthread_t th;
int ret = pthread_create(&th, NULL, thread_proc1, (void *)task);
if (ret != 0) {
free(task);
usleep(10 * 1000);
continue;
}
int num = atomic_load(&running_threads);
if (old_num < num) {
old_num = num;
printf("running_threads:%d\n", num);
}
if (seq++ >= TEST_TASK_NUM) break;
}
printf("threads test done------\n");
#endif
return 0;
}
1)CPU密集型場景測試
測試參數如下:
圖片
多線程測試結果-->:
圖片
CPU使用率397%(已使用完),最高同時創建913個線程,完成測試時間2分鐘,用戶時間7分51秒,系統時間0.5秒。
線程池測試結果-->:
圖片
CPU使用率398%(已使用完),核心線程4個,動態線程32個,共36各個線程,完成測試時間1分56秒,用戶時間7分41秒,系統時間0.04秒。
小節:CPU密集場景多線程和線程池方式處理效率相差不大,多線程方式最多同時創建900多個線程,會消耗大量系統資源。線程池方式線程始終控制在36個,比較安全。
2)IO密集型場景測試
測試參數如下:
圖片
多線程測試結果-->:
圖片
CPU使用率10.2%(已使用完),最高同時創建212個線程,完成測試時間11秒,用戶時間0.15秒,系統時間為1秒。
線程池測試結果-->:
圖片
圖片
CPU使用率3.6%(已使用完),核心線程4個,動態線程32個,共36各個線程,完成測試時間20秒,用戶時間0.25秒,系統時間為0.32秒。
小節:IO密集型場景線程處于IO阻塞狀態,CPU使用率并不高,此時可以適當增加線程數量來提高CPU利用率。
總結:
- 線程池能夠有效控制線程數量,防止線程過多對系統造成危害。
- 線程池能夠高效管理任務,使軟件開發更方便、高效。
- 從測試結果來看,多線程和線程池方式處理任務效率相差并不大,即使頻繁的創建和銷毀線程,也未對效率產生很大影響。