Python增強的生成器:協程
本文主要介紹python中Enhanced generator即coroutine相關內容,包括基本語法、使用場景、注意事項,以及與其他語言協程實現的異同。
enhanced generator
在上文《Python Yield Generator 詳解》中介紹了yield和generator的使用場景和主意事項,只用到了generator的next方法,事實上generator還有更強大的功能。PEP 342為generator增加了一系列方法來使得generator更像一個協程Coroutine。做主要的變化在于早期的yield只能返回值(作為數據的產生者), 而新增加的send方法能在generator恢復的時候消費一個數值,而去caller(generator的調用著)也可以通過throw在generator掛起的主動拋出異常。
- back_data = yield cur_ret
這段代碼的意思是:當執行到這條語句時,返回cur_ret給調用者;并且當generator通過next()或者send(some_data)方法恢復的時候,將some_data賦值給back_data.例如:
- def gen(data):
- print 'before yield', data
- back_data = yield data
- print 'after resume', back_data
- if __name__ == '__main__':
- g = gen(1)
- print g.next()
- try:
- g.send(0)
- except StopIteration:
- pass
輸出:
- before yield 1
- 1
- after resume 0
兩點需要注意:
- next() 等價于 send(None)
- ***次調用時,需要使用next()語句或是send(None),不能使用send發送一個非None的值,否則會出錯的,因為沒有Python yield語句來接收這個值。
應用場景
當generator可以接受數據(在從掛起狀態恢復的時候) 而不僅僅是返回數據時, generator就有了消費數據(push)的能力。下面的例子來自這里:
- word_map = {}
- def consume_data_from_file(file_name, consumer):
- for line in file(file_name):
- consumer.send(line)
- def consume_words(consumer):
- while True:
- line = yield
- for word in (w for w in line.split() if w.strip()):
- consumer.send(word)
- def count_words_consumer():
- while True:
- word = yield
- if word not in word_map:
- word_map[word] = 0
- word_map[word] += 1
- print word_map
- if __name__ == '__main__':
- cons = count_words_consumer()
- cons.next()
- cons_inner = consume_words(cons)
- cons_inner.next()
- c = consume_data_from_file('test.txt', cons_inner)
- print word_map
上面的代碼中,真正的數據消費者是count_words_consumer,最原始的數據生產者是consume_data_from_file,數據的流向是主動從生產者推向消費者。不過上面第22、24行分別調用了兩次next,這個可以使用一個decorator封裝一下。
- def consumer(func):
- def wrapper(*args,**kw):
- gen = func(*args, **kw)
- gen.next()
- return gen
- wrapper.__name__ = func.__name__
- wrapper.__dict__ = func.__dict__
- wrapper.__doc__ = func.__doc__
- return wrapper
修改后的代碼:
- def consumer(func):
- def wrapper(*args,**kw):
- gen = func(*args, **kw)
- gen.next()
- return gen
- wrapper.__name__ = func.__name__
- wrapper.__dict__ = func.__dict__
- wrapper.__doc__ = func.__doc__
- return wrapper
- word_map = {}
- def consume_data_from_file(file_name, consumer):
- for line in file(file_name):
- consumer.send(line)
- @consumer
- def consume_words(consumer):
- while True:
- line = yield
- for word in (w for w in line.split() if w.strip()):
- consumer.send(word)
- @consumer
- def count_words_consumer():
- while True:
- word = yield
- if word not in word_map:
- word_map[word] = 0
- word_map[word] += 1
- print word_map
- if __name__ == '__main__':
- cons = count_words_consumer()
- cons_inner = consume_words(cons)
- c = consume_data_from_file('test.txt', cons_inner)
- print word_map
- example_with_deco
generator throw
除了next和send方法,generator還提供了兩個實用的方法,throw和close,這兩個方法加強了caller對generator的控制。send方法可以傳遞一個值給generator,throw方法在generator掛起的地方拋出異常,close方法讓generator正常結束(之后就不能再調用next send了)。下面詳細介紹一下throw方法。
- throw(type[, value[, traceback]])
在generator yield的地方拋出type類型的異常,并且返回下一個被yield的值。如果type類型的異常沒有被捕獲,那么會被傳給caller。另外,如果generator不能yield新的值,那么向caller拋出StopIteration異常:
- @consumer
- def gen_throw():
- value = yield
- try:
- yield value
- except Exception, e:
- yield str(e) # 如果注釋掉這行,那么會拋出StopIteration
- if __name__ == '__main__':
- g = gen_throw()
- assert g.send(5) == 5
- assert g.throw(Exception, 'throw Exception') == 'throw Exception'
***次調用send,代碼返回value(5)之后在第5行掛起, 然后generator throw之后會被第6行catch住。如果第7行沒有重新yield,那么會重新拋出StopIteration異常。
注意事項
如果一個生成器已經通過send開始執行,那么在其再次yield之前,是不能從其他生成器再次調度到該生成器
- @consumer
- def funcA():
- while True:
- data = yield
- print 'funcA recevie', data
- fb.send(data * 2)
- @consumer
- def funcB():
- while True:
- data = yield
- print 'funcB recevie', data
- fa.send(data * 2)
- fa = funcA()
- fb = funcB()
- if __name__ == '__main__':
- fa.send(10)
輸出:
- funcA recevie 10
- funcB recevie 20
- ValueError: generator already executing
Generator 與 Coroutine
回到Coroutine,可參見維基百科解釋,而我自己的理解比較簡單(或者片面):程序員可控制的并發流程,不管是進程還是線程,其切換都是操作系統在調度,而對于協程,程序員可以控制什么時候切換出去,什么時候切換回來。協程比進程 線程輕量級很多,較少了上下文切換的開銷。另外,由于是程序員控制調度,一定程度上也能避免一個任務被中途中斷.。協程可以用在哪些場景呢,我覺得可以歸納為非阻塞等待的場景,如游戲編程,異步IO,事件驅動。
Python中,generator的send和throw方法使得generator很像一個協程(coroutine), 但是generator只是一個半協程(semicoroutines),python doc是這樣描述的:
“All of this makes generator functions quite similar to coroutines; they yield multiple times, they have more than one entry point and their execution can be suspended. The only difference is that a generator function cannot control where should the execution continue after it yields; the control is always transferred to the generator’s caller.”
盡管如此,利用enhanced generator也能實現更強大的功能。比如上文中提到的yield_dec的例子,只能被動的等待時間到達之后繼續執行。在某些情況下比如觸發了某個事件,我們希望立即恢復執行流程,而且我們也關心具體是什么事件,這個時候就需要在generator send了。另外一種情形,我們需要終止這個執行流程,那么刻意調用close,同時在代碼里面做一些處理,偽代碼如下:
- @yield_dec
- def do(a):
- print 'do', a
- try:
- event = yield 5
- print 'post_do', a, event
- finally:
- print 'do sth'
至于之前提到的另一個例子,服務(進程)之間的異步調用,也是非常適合實用協程的例子。callback的方式會割裂代碼,把一段邏輯分散到多個函數,協程的方式會好很多,至少對于代碼閱讀而言。其他語言,比如C#、Go語言,協程都是標準實現,特別對于go語言,協程是高并發的基石。在python3.x中,通過asyncio和async\await也增加了對協程的支持。在筆者所使用的2.7環境下,也可以使用greenlet,之后會有博文介紹。
參考
- https://www.python.org/dev/peps/pep-0342/
- http://www.dabeaz.com/coroutines/
- https://en.wikipedia.org/wiki/Coroutine#Implementations_for_Python