linux驅動程序下的tasklet機制
Tasklet 機制 是一種較為特殊的軟中斷。Tasklet一詞的原意是“小片任務”的意思,這里是指一小段可執行的代碼,且通常以函數的形式出現。軟中斷向量HI_SOFTIRQ和TASKLET_SOFTIRQ均是用tasklet機制來實現的。
從某種程度上講,tasklet機制是Linux內核對BH機制的一種擴展。在2.4內核引入了softirq機制后,原有的BH機制正是通過tasklet機制這個橋梁來納入softirq機制的整體框架中的。正是由于這種歷史的延伸關系,使得tasklet機制與一般意義上的軟中斷有所不同,而呈現出以下兩個顯著的特點:
1. 與一般的軟中斷不同,某一段tasklet代碼在某個時刻只能在一個CPU上運行,而不像一般的軟中斷服務函數(即softirq_action結構中的action函數指針)那樣——在同一時刻可以被多個CPU并發地執行。
2. 與BH機制不同,不同的tasklet代碼在同一時刻可以在多個CPU上并發地執行,而不像BH(Bottom Half)機制那樣必須嚴格地串行化執行(也即在同一時刻系統中只能有一個CPU執行BH函數)。
Linux用數據結構 tasklet_struct 來描述一個tasklet。該數據結構定義在include/linux/interrupt.h頭文件中。如下所示:
- view sourceprint?1 struct tasklet_struct
- {
- struct tasklet_struct *next;
- unsigned long state;
- atomic_t count;
- void (*func)(unsigned long);
- unsigned long data;
- };
各成員的含義如下:
(1)next指針:指向下一個tasklet的指針。
(2)state:定義了這個tasklet的當前狀態。這一個32位的無符號長整數,當前只使用了bit[1]和bit[0]兩個狀態位。其中,bit[1]=1表示這個tasklet當前正在某個CPU上被執行,它僅對SMP系統才有意義,其作用就是為了防止多個CPU同時執行一個tasklet的情形出現;bit[0]=1表示這個tasklet已經被調度去等待執行了。對這兩個狀態位的宏定義如下所示(interrupt.h):
- view sourceprint?1 enum
- {
- TASKLET_STATE_SCHED, /* Tasklet is scheduled for execution */
- TASKLET_STATE_RUN /* Tasklet is running (SMP only) */
- };
(3)原子計數count:對這個tasklet的引用計數值。NOTE!只有當count等于0時,tasklet代碼段才能執行,也即此時tasklet是被使能的;如果count非零,則這個tasklet是被禁止的。任何想要執行一個tasklet代碼段的人都首先必須先檢查其count成員是否為0。
(4)函數指針func:指向以函數形式表現的可執行tasklet代碼段。
(5)data:函數func的參數。這是一個32位的無符號整數,其具體含義可供func函數自行解釋,比如將其解釋成一個指向某個用戶自定義數據結構的地址值。
Linux在interrupt.h頭文件中又定義了兩個用來定義 tasklet_struct 結構變量的輔助宏:
- view sourceprint?1 #define DECLARE_TASKLET(name, func, data)
- struct tasklet_struct name = { NULL, 0, ATOMIC_INIT(0), func, data }
- #define DECLARE_TASKLET_DISABLED(name, func, data)
- struct tasklet_struct name = { NULL, 0, ATOMIC_INIT(1), func, data }
顯然,從上述源代碼可以看出,用 DECLARE_TASKLET 宏定義的tasklet在初始化時是被使能的(enabled),因為其count成員為0。而用 DECLARE_TASKLET_DISABLED 宏定義的tasklet在初始時是被禁止的(disabled),因為其count等于1。
在這里,tasklet狀態指兩個方面:1. state成員所表示的運行狀態;2. count成員決定的使能/禁止狀態。
#p#
(1)改變一個tasklet的運行狀態 state 成員中的bit[0]表示一個tasklet是否已被調度去等待執行,bit[1]表示一個tasklet是否正在某個CPU上執行。對于 state 變量中某位的改變必須是一個原子操作,因此可以用定義在include/asm/bitops.h頭文件中的位操作來進行。
由于bit[1]這一位(即TASKLET_STATE_RUN)僅僅對于SMP系統才有意義,因此Linux在Interrupt.h頭文件中顯示地定義了對TASKLET_STATE_RUN位的操作。如下所示:
- view sourceprint?1 #ifdef CONFIG_SMP
- #define tasklet_trylock(t) (!test_and_set_bit(TASKLET_STATE_RUN, &(t)->state))
- #define tasklet_unlock_wait(t) while (test_bit(TASKLET_STATE_RUN, &(t)->state)) { /* NOTHING */ }
- #define tasklet_unlock(t) clear_bit(TASKLET_STATE_RUN, &(t)->state)
- #else
- #define tasklet_trylock(t) 1
- #define tasklet_unlock_wait(t) do { } while (0)
- #define tasklet_unlock(t) do { } while (0)
- #endif
顯然,在SMP系統同,tasklet_trylock() 宏將把一個 tasklet_struct 結構變量中的state成員中的bit[1]位設置成1,同時還返回bit[1]位的非。因此,如果bit[1]位原有值為1(表示另外一個CPU正在執行這個tasklet代碼),那么tasklet_trylock()宏將返回值0,也就表示上鎖不成功。如果bit[1]位的原有值為0,那么tasklet_trylock()宏將返回值1,表示加鎖成功。而在單CPU系統中,tasklet_trylock()宏總是返回為1。
任何想要執行某個tasklet代碼的程序都必須首先調用宏 tasklet_trylock() 來試圖對這個tasklet進行上鎖(即設置TASKLET_STATE_RUN位),且只能在上鎖成功的情況下才能執行這個tasklet。建議!即使你的程序只在單 CPU 系統上運行,你也要在執行tasklet之前調用tasklet_trylock()宏,以便使你的代碼獲得良好可移植性。
在SMP系統中,tasklet_unlock_wait() 宏將一直不停地測試 TASKLET_STATE_RUN 位的值,直到該位的值變為0(即一直等待到解鎖),假如:CPU0正在執行tasklet A的代碼,在此期間,CPU1也想執行tasklet A的代碼,但CPU1發現tasklet A 的 TASKLET_STATE_RUN 位為1,于是它就可以通過 tasklet_unlock_wait() 宏等待tasklet A被解鎖(也即TASKLET_STATE_RUN位被清零)。在單CPU系統中,這是一個空操作。
宏 tasklet_unlock() 用來對一個 tasklet 進行解鎖操作,也即將TASKLET_STATE_RUN位清零。在單CPU系統中,這是一個空操作。
(2)使能/禁止一個tasklet
使能與禁止操作往往總是成對地被調用的,tasklet_disable() 函數如下
- (interrupt.h):
- view sourceprint?01 static inline void tasklet_disable(struct tasklet_struct *t)
- {
- tasklet_disable_nosync(t);
- tasklet_unlock_wait(t);
- }
- // 函數tasklet_disable_nosync()也是一個靜態inline函數,它簡單地通過原子操作將count成員變量的值減1。如下所示(interrupt.h):
- static inline void tasklet_disable_nosync(struct tasklet_struct *t)
- {
- atomic_inc(&t->count);
- }
- // 函數tasklet_enable()用于使能一個tasklet,如下所示(interrupt.h):
- static inline void tasklet_enable(struct tasklet_struct *t)
- {
- atomic_dec(&t->count);
- }
- // 函數tasklet_init()用來初始化一個指定的tasklet描述符,其源碼如下所示(kernel/softirq.c):
- void tasklet_init(struct tasklet_struct *t,
- void (*func)(unsigned long),
- unsigned long data)
- {
- t->funcfunc = func;
- t->datadata = data;
- t->state = 0;
- atomic_set(&t->count, 0);
- }
- // 函數tasklet_kill()用來將一個已經被調度了的tasklet殺死,即將其恢復到未調度的狀態。其源碼如下所示(kernel/softirq.c):
- void tasklet_kill(struct tasklet_struct *t)
- {
- if (in_interrupt())
- printk("Attempt to kill tasklet from interruptn");
- while (test_and_set_bit(TASKLET_STATE_SCHED, &t->state)) {
- current->state = TASK_RUNNING;
- do {
- current->policy |= SCHED_YIELD;
- schedule();
- } while (test_bit(TASKLET_STATE_SCHED, &t->state));
- }
- tasklet_unlock_wait(t);
- clear_bit(TASKLET_STATE_SCHED, &t->state);
- }
- // 多個tasklet可以通過tasklet描述符中的next成員指針鏈接成一個單向對列。為此,Linux專門在頭文件include/linux/interrupt.h中定義了數據結構tasklet_head來描述一個tasklet對列的頭部指針。如下所示:
- struct tasklet_head
- {
- struct tasklet_struct *list;
- } __attribute__ ((__aligned__(SMP_CACHE_BYTES)));
盡管 tasklet 機制是特定于軟中斷向量HI_SOFTIRQ和TASKLET_SOFTIRQ的一種實現,但是tasklet機制仍然屬于softirq機制的整體框架范圍內的,因此,它的設計與實現仍然必須堅持“誰觸發,誰執行”的思想。為此,Linux為系統中的每一個CPU都定義了一個 tasklet 對列頭部,來表示應該有各個CPU負責執行的tasklet對列。如下所示(kernel/softirq.c):
- struct tasklet_head tasklet_vec[NR_CPUS] __cacheline_aligned;struct tasklet_head tasklet_hi_vec[NR_CPUS] __cacheline_aligned;
#p#
其中,tasklet_vec[]數組用于軟中斷向量TASKLET_SOFTIRQ,而tasklet_hi_vec[]數組則用于軟中斷向量HI_SOFTIRQ。也即,如果CPUi(0≤i≤NR_CPUS-1)觸發了軟中斷向量TASKLET_SOFTIRQ,那么對列tasklet_vec[i]中的每一個tasklet都將在CPUi服務于軟中斷向量TASKLET_SOFTIRQ時被CPUi所執行。同樣地,如果CPUi(0≤i≤NR_CPUS-1)觸發了軟中斷向量HI_SOFTIRQ,那么隊列tasklet_vec[i]中的每一個tasklet都將CPUi在對軟中斷向量HI_SOFTIRQ進行服務時被CPUi所執行。
隊列tasklet_vec[I]和tasklet_hi_vec[I]中的各個tasklet是怎樣被所CPUi所執行的呢?其關鍵就是軟中斷向量TASKLET_SOFTIRQ和HI_SOFTIRQ的軟中斷服務程序——tasklet_action()函數和tasklet_hi_action()函數。下面我們就來分析這兩個函數。
Linux為軟中斷向量TASKLET_SOFTIRQ和HI_SOFTIRQ實現了專用的觸發函數和軟中斷服務函數。其中,tasklet_schedule() 函數和 tasklet_hi_schedule() 函數分別用來在當前CPU上觸發軟中斷向量TASKLET_SOFTIRQ 和 HI_SOFTIRQ,并把指定的tasklet 加入當前CPU所對應的 tasklet 隊列中去等待執行。而tasklet_action() 函數和 tasklet_hi_action() 函數則分別是軟中斷向量 TASKLET_SOFTIRQ 和 HI_SOFTIRQ 的軟中斷服務函數。在初始化函數 softirq_init() 中,這兩個軟中斷向量對應的描述符softirq_vec[0]和softirq_vec[3]中的action函數指針就被分別初始化成指向函數 tasklet_hi_action() 和函數 tasklet_action()。
(1)軟中斷向量TASKLET_SOFTIRQ的觸發函數 tasklet_schedule()
該函數實現在include/linux/interrupt.h頭文件中,是一個 inline 函數。其源碼如下所示:
- view sourceprint?01 static inline void tasklet_schedule(struct tasklet_struct *t)
- {
- if (!test_and_set_bit(TASKLET_STATE_SCHED, &t->state)) {
- int cpu = smp_processor_id();
- unsigned long flags;
- local_irq_save(flags);
- t->next = tasklet_vec[cpu].list;
- tasklet_vec[cpu].list = t;
- __cpu_raise_softirq(cpu, TASKLET_SOFTIRQ);
- local_irq_restore(flags);
- }
- }
該函數的參數t指向要在當前CPU上被執行的 tasklet。對該函數的NOTE如下:
①調用test_and_set_bit()函數將待調度的 tasklet 的state成員變量的bit[0]位(也即TASKLET_STATE_SCHED位)設置為1,該函數同時還返回TASKLET_STATE_SCHED位的原有值。因此如果bit[0]為的原有值已經為1,那就說明這個tasklet已經被調度到另一個CPU上去等待執行了。由于一個tasklet在某一個時刻只能由一個CPU來執行,因此tasklet_schedule()函數什么也不做就直接返回了。否則,就繼續下面的調度操作。
②首先,調用 local_irq_save() 函數來關閉當前CPU的中斷,以保證下面的步驟在當前CPU上原子地被執行。
③然后,將待調度的 tasklet 添加到當前CPU對應的 tasklet 隊列的首部。
④接著,調用 __cpu_raise_softirq() 函數在當前CPU上觸發軟中斷請求TASKLET_SOFTIRQ。
⑤***,調用local_irq_restore() 函數來開當前CPU的中斷。
(2)軟中斷向量TASKLET_SOFTIRQ的服務程序tasklet_action()
函數tasklet_action()是tasklet機制與軟中斷向量TASKLET_SOFTIRQ的聯系紐帶。正是該函數
- view sourceprint?01 static inline void tasklet_hi_schedule(struct tasklet_struct *t)
- {
- if (!test_and_set_bit(TASKLET_STATE_SCHED, &t->state)) {
- int cpu = smp_processor_id();
- unsigned long flags;
- local_irq_save(flags);
- t->next = tasklet_hi_vec[cpu].list;
- tasklet_hi_vec[cpu].list = t;
- __cpu_raise_softirq(cpu, HI_SOFTIRQ);
- local_irq_restore(flags);
- }
- }
將當前CPU的tasklet隊列中的各個tasklet放到當前CPU上來執行的。該函數實現在kernel/softirq.c文件中,其源代碼如下:
- view sourceprint?01 static void tasklet_action(struct softirq_action *a)
- {
- int cpu = smp_processor_id();
- struct tasklet_struct *list;
- local_irq_disable();
- list = tasklet_vec[cpu].list;
- tasklet_vec[cpu].list = NULL;
- local_irq_enable();
- while (list != NULL) {
- struct tasklet_struct *t = list;
- listlist = list->next;
- if (tasklet_trylock(t)) {
- if (atomic_read(&t->count) == 0) {
- clear_bit(TASKLET_STATE_SCHED, &t->state);
- t->func(t->data);
- /*
- * talklet_trylock() uses test_and_set_bit that imply
- * an mb when it returns zero, thus we need the explicit
- * mb only here: while closing the critical section.
- */
- #ifdef CONFIG_SMP
- smp_mb__before_clear_bit();
- #endif
- tasklet_unlock(t);
- continue;
- }
- tasklet_unlock(t);
- }
- local_irq_disable();
- t->next = tasklet_vec[cpu].list;
- tasklet_vec[cpu].list = t;
- __cpu_raise_softirq(cpu, TASKLET_SOFTIRQ);
- local_irq_enable();
- }
- }
注釋如下:
①首先,在當前CPU關中斷的情況下,“原子”地讀取當前CPU的tasklet隊列頭部指針,將其保存到局部變量list指針中,然后將當前CPU的tasklet隊列頭部指針設置為NULL,以表示理論上當前CPU將不再有tasklet需要執行(但***的實際結果卻并不一定如此,下面將會看到)。
②然后,用一個while{}循環來遍歷由list所指向的tasklet隊列,隊列中的各個元素就是將在當前CPU上執行的tasklet。循環體的執行步驟如下:
用指針t來表示當前隊列元素,即當前需要執行的tasklet。
更新list指針為list->next,使它指向下一個要執行的tasklet。
用tasklet_trylock()宏試圖對當前要執行的tasklet(由指針t所指向)進行加鎖,如果加鎖成功(當前沒有任何其他CPU正在執行這個tasklet),則用原子讀函數atomic_read()進一步判斷count成員的值。如果count為0,說明這個tasklet是允許執行的,于是:a先清除TASKLET_STATE_SCHED位;然后,調用這個tasklet的可執行函數func;執行barrier()操作;調用宏tasklet_unlock()來清除TASKLET_STATE_RUN位。***,執行continue語句跳過下面的步驟,回到while循環繼續遍歷隊列中的下一個元素。如果count不為0,說明這個tasklet是禁止運行的,于是調用tasklet_unlock()清除前面用tasklet_trylock()設置的TASKLET_STATE_RUN位。
如果tasklet_trylock()加鎖不成功,或者因為當前tasklet的count值非0而不允許執行時,我們必須將這個tasklet重新放回到當前CPU的tasklet隊列中,以留待這個CPU下次服務軟中斷向量TASKLET_SOFTIRQ時再執行。為此進行這樣幾步操作:先關CPU中斷,以保證下面操作的原子性。把這個tasklet重新放回到當前CPU的tasklet隊列的首部;調用__cpu_raise_softirq()函數在當前CPU上再觸發一次軟中斷請求TASKLET_SOFTIRQ;開中斷。 ***,回到while循環繼續遍歷隊列。
#p#
(3)軟中斷向量HI_SOFTIRQ的觸發函數tasklet_hi_schedule()
該函數與tasklet_schedule()幾乎相同,其源碼如下(include/linux/interrupt.h):
- view sourceprint?01 static inline void tasklet_hi_schedule(struct tasklet_struct *t)
- {
- if (!test_and_set_bit(TASKLET_STATE_SCHED, &t->state)) {
- int cpu = smp_processor_id();
- unsigned long flags;
- local_irq_save(flags);
- t->next = tasklet_hi_vec[cpu].list;
- tasklet_hi_vec[cpu].list = t;
- __cpu_raise_softirq(cpu, HI_SOFTIRQ);
- local_irq_restore(flags);
- }
- }
(4)軟中斷向量HI_SOFTIRQ的服務函數tasklet_hi_action()
該函數與tasklet_action()函數幾乎相同,其源碼如下(kernel/softirq.c):
- view sourceprint?01 static void tasklet_hi_action(struct softirq_action *a)
- {
- int cpu = smp_processor_id();
- struct tasklet_struct *list;
- local_irq_disable();
- list = tasklet_hi_vec[cpu].list;
- tasklet_hi_vec[cpu].list = NULL;
- local_irq_enable();
- while (list != NULL)
- {
- struct tasklet_struct *t = list;
- listlist = list->next;
- if (tasklet_trylock(t)) {
- if (atomic_read(&t->count) == 0) {
- clear_bit(TASKLET_STATE_SCHED, &t->state);
- t->func(t->data);
- tasklet_unlock(t);
- continue;
- }
- tasklet_unlock(t);
- }
- local_irq_disable();
- t->next = tasklet_hi_vec[cpu].list;
- tasklet_hi_vec[cpu].list = t;
- __cpu_raise_softirq(cpu, HI_SOFTIRQ);
- local_irq_enable();
- }
- }
Bottom Half 機制在新的softirq機制中被保留下來,并作為softirq框架的一部分。其實現也似乎更為復雜些,因為它是通過 tasklet 機制這個中介橋梁來納入softirq框架中的。實際上,軟中斷向量 HI_SOFTIRQ 是內核專用于執行BH函數的。原有的32個BH函數指針被保留,定義在kernel/softirq.c文件中:static void (*bh_base[32])(void);
但是,每個BH函數都對應有一個tasklet,并由tasklet的可執行函數func來負責調用相應的bh函數(func函數的參數指定調用哪一個BH函數)。與32個BH函數指針相對應的tasklet的定義如下所示(kernel/softirq.c):struct tasklet_struct bh_task_vec[32];
上述tasklet數組使系統全局的,它對所有的CPU均可見。由于在某一個時刻只能有一個CPU在執行BH函數,因此定義一個全局的自旋鎖來保護BH函數,如下所示(kernel/softirq.c):spinlock_t global_bh_lock = SPIN_LOCK_UNLOCKED;
在softirq機制的初始化函數softirq_init()中將bh_task_vec[32]數組中的每一個tasklet中的func函數指針都設置為指向同一個函數bh_action,而data成員(也即func函數的調用參數)則被設置成該tasklet在數組中的索引值。因此,bh_action()函數將負責相應地調用參數所指定的bh函數。該函數是連接 tasklet機制與Bottom Half機制的關鍵所在。
該函數的源碼如下(kernel/softirq.c):
- view sourceprint?1 void __init softirq_init()
- 2 {
- 3 ……
- 4 for (i=0; i<32; i++)
- 5 tasklet_init(bh_task_vec+i, bh_action, i);
- 6 ……
- 7 }
- view sourceprint?01 static void bh_action(unsigned long nr)
- {
- int cpu = smp_processor_id();
- if (!spin_trylock(&global_bh_lock))
- goto resched;
- if (!hardirq_trylock(cpu))
- goto resched_unlock;
- if (bh_base[nr])
- bh_base[nr]();
- hardirq_endlock(cpu);
- spin_unlock(&global_bh_lock);
- return;
- resched_unlock:
- spin_unlock(&global_bh_lock);
- resched:
- mark_bh(nr);
- }
對該函數的注釋如下:
①首先,調用spin_trylock()函數試圖對自旋鎖global_bh_lock進行加鎖,同時該函數還將返回自旋鎖global_bh_lock的原有值的非。因此,如果global_bh_lock已被某個CPU上鎖而為非0值(那個CPU肯定在執行某個BH函數),那么spin_trylock()將返回為0表示上鎖失敗,在這種情況下,當前CPU是不能執行BH函數的,因為另一個CPU正在執行BH函數,于是執行goto語句跳轉到resched程序段,以便在當前CPU上再一次調度該BH函數。
②調用hardirq_trylock()函數鎖定當前CPU,確保當前CPU不是處于硬件中斷請求服務中,如果鎖定失敗,跳轉到resched_unlock程序段,以便先對global_bh_lock解鎖,在重新調度一次該BH函數。
③此時,我們已經可以放心地在當前CPU上執行BH函數了。當然,對應的BH函數指針bh_base[nr]必須有效才行。
④從BH函數返回后,先調用hardirq_endlock()函數(實際上它什么也不干,調用它只是為了保此加、解鎖的成對關系),然后解除自旋鎖global_bh_lock,***函數就可以返回了。
⑤resched_unlock程序段:先解除自旋鎖global_bh_lock,然后執行reched程序段。
⑥resched程序段:當某個CPU正在執行BH函數時,當前CPU就不能通過bh_action()函數來調用執行任何BH函數,所以就通過調用mark_bh()函數在當前CPU上再重新調度一次,以便將這個BH函數留待下次軟中斷服務時執行。
(1)init_bh()函數
該函數用來在bh_base[]數組登記一個指定的bh函數,如下所示(kernel/softirq.c):
- view sourceprint?1 void init_bh(int nr, void (*routine)(void))
- {
- bh_base[nr] = routine;
- mb();
- }
(2)remove_bh()函數
該函數用來在bh_base[]數組中注銷指定的函數指針,同時將相對應的tasklet殺掉。
如下所示(kernel/softirq.c):
- view sourceprint?1 void remove_bh(int nr)
- {
- tasklet_kill(bh_task_vec+nr);
- bh_base[nr] = NULL;
- }
(3)mark_bh()函數
該函數用來向當前CPU標記由一個BH函數等待去執行。它實際上通過調用tasklet_hi_schedule()函數將相應的tasklet加入到當前CPU的tasklet隊列tasklet_hi_vec[cpu]中,然后觸發軟中斷請求HI_SOFTIRQ,如下所示(include/linux/interrupt.h):
- view sourceprint?1 static inline void mark_bh(int nr)
- {
- tasklet_hi_schedule(bh_task_vec+nr);
- }
在32個BH函數指針中,大多數已經固定用于一些常見的外設,比如:第0個BH函數就固定地用于時鐘中斷。Linux在頭文件include/linux/interrupt.h中定義了這些已經被使用的BH函數所引,如下所示:
- view sourceprint?01 enum
- {
- TIMER_BH = 0,
- TQUEUE_BH,
- DIGI_BH,
- SERIAL_BH,
- RISCOM8_BH,
- SPECIALIX_BH,
- AURORA_BH,
- ESP_BH,
- SCSI_BH,
- IMMEDIATE_BH,
- CYCLADES_BH,
- CM206_BH,
- JS_BH,
- MACSERIAL_BH,
- ISICOM_BH
- };
從以上的例子可以看出,所謂小任務機制就是為下半部分函數提供的一種執行機制,也就是說推遲處理的事情由tasklet_handler實現。經過小任務封裝以后再交給內核去處理。以上就是tasklet機制在linux中的實現 ,使得tasklet機制與一般意義上的軟中斷有所不同
【編輯推薦】