阻塞和非阻塞的實現
本文轉載自微信公眾號「編程雜技」,作者theanarkh。轉載本文請聯系編程雜技公眾號。
我們可能都已經聽過阻塞非阻塞的概念,本文以tcp中的connect系統調用為例子(基于1.12.13內核,新版的原理類似,但是過程就很復雜了,有時間再分析),分析阻塞和非阻塞是什么并且看他是如何實現的。話不多說,直接開始。
- static int inet_connect(struct socket *sock, struct sockaddr * uaddr,
- int addr_len, int flags)
- {
- struct sock *sk=(struct sock *)sock->data;
- // 調用底層的連接函數,發一個syn包
- err = sk->prot->connect(sk, (struct sockaddr_in *)uaddr, addr_len);
- if (err < 0)
- return(err);
- // 還沒建立連接成功并且是非阻塞的方式,直接返回
- if (sk->state != TCP_ESTABLISHED &&(flags & O_NONBLOCK))
- return(-EINPROGRESS);
- // 早期通過關中斷防止競態情況
- cli();
- // 連接建立中,阻塞當前進程
- while(sk->state == TCP_SYN_SENT || sk->state == TCP_SYN_RECV)
- {
- // 阻塞進程
- interruptible_sleep_on(sk->sleep);
- // 連接失敗
- if(sk->err && sk->protocol == IPPROTO_TCP)
- {
- sti();
- sock->state = SS_UNCONNECTED;
- err = -sk->err;
- sk->err=0;
- return err; /* set by tcp_err() */
- }
- }
- sti();
- // 連接建立
- sock->state = SS_CONNECTED;
- // 返回成功
- return(0);
- }
我們看到connect函數首先會調用tcp層的函數發送一個sync包,然后根據socket的屬性(阻塞非阻塞,可以通過setsocketopt設置)做下一步處理,如果是非阻塞,那么就比較簡單,直接返回給應用層。這也是非阻塞+事件驅動架構中的做法。因為這種架構下通常是單進程的,要避免阻塞進程,那么返回后什么時候才能知道連接成功呢?這就是epoll提供的機制,當連接成功后,tcp層會通知epoll,epoll就會通知應用層。下面我們繼續分析阻塞的過程,interruptible_sleep_on(sk->sleep)。我們看到socket中有一個sleep字段,該字段用于管理隊列。我們看看interruptible_sleep_on
- void interruptible_sleep_on(struct wait_queue **p)
- {
- __sleep_on(p,TASK_INTERRUPTIBLE);
- }
- static inline void __sleep_on(struct wait_queue **p, int state)
- {
- unsigned long flags;
- struct wait_queue wait = { current, NULL };
- current->state = state;
- add_wait_queue(p, &wait);
- save_flags(flags);
- sti();
- schedule();
- remove_wait_queue(p, &wait);
- restore_flags(flags);
- }
這里我們只關注兩個地方add_wait_queue和schedule。add_wait_queue就是把一個節點插入隊列。我們看看wait_queue的定義。
- struct wait_queue {
- struct task_struct * task;
- struct wait_queue * next;
- };
所以add_wait_queue執行完之后架構如下。
接著調用schedule調度其他進程執行,我們發現這時候當前進程的狀態是TASK_INTERRUPTIBLE,所以是不會被調度執行的。這就是進程阻塞的原理,主要是兩個過程
1 加入等待隊列
2 讓出CPU,調度其他進程執行。
我們這個進程什么時候被喚醒呢?我們從收到sync的回包開始分析。具體邏輯在tcp_rcv中。
- if(sk->state==TCP_SYN_SENT)
- {
- /* Crossed SYN or previous junk segment */
- // 發送了syn包,收到ack包說明可能是建立連接的ack包
- if(th->ack)
- {
- // 發送第三次握手的ack包,進入連接建立狀態
- tcp_send_ack(sk->sent_seq,sk->acked_seq,sk,th,sk->daddr);
- tcp_set_state(sk, TCP_ESTABLISHED);
- // 喚醒阻塞在connect函數的進程
- if(!sk->dead)
- {
- // 喚醒進程
- sk->state_change(sk);
- // 給進程發送SIGIO信號
- sock_wake_async(sk->socket, 0);
- }
- }
- }
我們看到收到ack后,tcp層調用state_change回調,state_change的值是def_callback1。
- static void def_callback1(struct sock *sk)
- {
- if(!sk->dead)
- wake_up_interruptible(sk->sleep);
- }
我們看到這里會調用wake_up_interruptible喚醒進程。我們看看實現。
- void wake_up_interruptible(struct wait_queue **q)
- {
- struct wait_queue *tmp;
- struct task_struct * p;
- if (!q || !(tmp = *q))
- return;
- do {
- if ((p = tmp->task) != NULL) {
- if (p->state == TASK_INTERRUPTIBLE) {
- p->state = TASK_RUNNING;
- if (p->counter > current->counter + 3)
- need_resched = 1;
- }
- }
- tmp = tmp->next;
- } while (tmp != *q);
- }
我們看到wake_up_interruptible會喚醒所有進程,這就是導致景群效應的地方,新版內核已經處理了相關問題。另外我們看到,這里這是修改進程為可執行狀態,但是不會立刻調度,要等下一次進程調度的時候才發生進程調度。以上就是進程阻塞和非阻塞的原理。