成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Python一行代碼完成并行任務

開發 后端
眾所周知,Python的并行處理能力很不理想。我認為如果不考慮線程和GIL的標準參數(它們大多是合法的),其原因不是因為技術不到位,而是我們的使用方法不恰當。大多數關于Python線程和多進程的教材雖然都很出色,但是內容繁瑣冗長。它們的確在開篇鋪陳了許多有用信息,但往往都不會涉及真正能提高日常工作的部分。

眾所周知,Python的并行處理能力很不理想。我認為如果不考慮線程和GIL的標準參數(它們大多是合法的),其原因不是因為技術不到位,而是我們的使用方法不恰當。大多數關于Python線程和多進程的教材雖然都很出色,但是內容繁瑣冗長。它們的確在開篇鋪陳了許多有用信息,但往往都不會涉及真正能提高日常工作的部分。

經典例子

DDG上以“Python threading tutorial (Python線程教程)”為關鍵字的熱門搜索結果表明:幾乎每篇文章中給出的例子都是相同的類+隊列。

事實上,它們就是以下這段使用producer/Consumer來處理線程/多進程的代碼示例:

  1. #Example.py 
  2.  
  3. ''
  4.  
  5.     Standard Producer/Consumer Threading Pattern 
  6.  
  7. ''
  8.  
  9.   
  10.  
  11. import time 
  12.  
  13. import threading 
  14.  
  15. import Queue 
  16.  
  17.   
  18.  
  19. class Consumer(threading.Thread): 
  20.  
  21. def __init__(self, queue): 
  22.  
  23.     threading.Thread.__init__(self) 
  24.  
  25.     self._queue = queue 
  26.  
  27.   
  28.  
  29. def run(self): 
  30.  
  31.     while True
  32.  
  33.         # queue.get() blocks the current thread until 
  34.  
  35.         # an item is retrieved. 
  36.  
  37.         msg = self._queue.get() 
  38.  
  39.         # Checks if the current message is 
  40.  
  41.         # the "Poison Pill" 
  42.  
  43.         if isinstance(msg, str) and msg == 'quit'
  44.  
  45.             # if so, exists the loop 
  46.  
  47.             break 
  48.  
  49.         # "Processes" (or in our case, prints) the queue item 
  50.  
  51.         print "I'm a thread, and I received %s!!" % msg 
  52.  
  53.         # Always be friendly! 
  54.  
  55.     print 'Bye byes!' 
  56.  
  57.   
  58.  
  59. def Producer(): 
  60.  
  61.     # Queue is used to share items between 
  62.  
  63.     # the threads. 
  64.  
  65.     queue = Queue.Queue() 
  66.  
  67.   
  68.  
  69.     # Create an instance of the worker 
  70.  
  71.     worker = Consumer(queue) 
  72.  
  73.     # start calls the internal run() method to 
  74.  
  75.     # kick off the thread 
  76.  
  77.     worker.start() 
  78.  
  79.   
  80.  
  81.     # variable to keep track of when we started 
  82.  
  83.     start_time = time.time() 
  84.  
  85.     # While under 5 seconds.. 
  86.  
  87.     while time.time() - start_time < 5: 
  88.  
  89.         # "Produce" a piece of work and stick it in 
  90.  
  91.         # the queue for the Consumer to process 
  92.  
  93.         queue.put('something at %s' % time.time()) 
  94.  
  95.     # Sleep a bit just to avoid an absurd number of messages 
  96.  
  97.     time.sleep(1) 
  98.  
  99.   
  100.  
  101.     # This the "poison pill" method of killing a thread. 
  102.  
  103.     queue.put('quit'
  104.  
  105.     # wait for the thread to close down 
  106.  
  107.     worker.join() 
  108.  
  109.   
  110.  
  111. if __name__ == '__main__'
  112.  
  113. Producer()  

唔…….感覺有點像Java。

我現在并不想說明使用Producer / Consume來解決線程/多進程的方法是錯誤的——因為它肯定正確,而且在很多情況下它是最佳方法。但我不認為這是平時寫代碼的最佳選擇。

它的問題所在(個人觀點)

首先,你需要創建一個樣板式的鋪墊類。然后,你再創建一個隊列,通過其傳遞對象和監管隊列的兩端來完成任務。(如果你想實現數據的交換或存儲,通常還涉及另一個隊列的參與)。

Worker越多,問題越多。

接下來,你應該會創建一個worker類的pool來提高Python的速度。下面是IBM tutorial給出的較好的方法。這也是程序員們在利用多線程檢索web頁面時的常用方法。

  1. #Example2.py 
  2.  
  3. ""
  4.  
  5. A more realistic thread pool example 
  6.  
  7. ""
  8.  
  9.   
  10.  
  11. import time 
  12.  
  13. import threading 
  14.  
  15. import Queue 
  16.  
  17. import urllib2 
  18.  
  19.   
  20.  
  21. class Consumer(threading.Thread): 
  22.  
  23.     def __init__(self, queue): 
  24.  
  25.         threading.Thread.__init__(self) 
  26.  
  27.         self._queue = queue 
  28.  
  29.   
  30.  
  31.     def run(self): 
  32.  
  33.         while True
  34.  
  35.             content = self._queue.get() 
  36.  
  37.             if isinstance(content, str) and content == "quit"
  38.  
  39.                 break 
  40.  
  41.             response = urllib2.urlopen(content) 
  42.  
  43.        print "Bye byes!" 
  44.  
  45.   
  46.  
  47. def Producer(): 
  48.  
  49.     urls = [ 
  50.  
  51.          "http://www.python.org&#039;, &#039;http://www.yahoo.com"
  52.  
  53.         "http://www.scala.org&#039;, &#039;http://www.google.com"
  54.  
  55.     # etc.. 
  56.  
  57.     ] 
  58.  
  59.     queue = Queue.Queue() 
  60.  
  61.     worker_threads = build_worker_pool(queue, 4) 
  62.  
  63.     start_time = time.time() 
  64.  
  65.   
  66.  
  67.     # Add the urls to process 
  68.  
  69.     for url in urls: 
  70.  
  71.         queue.put(url)   
  72.  
  73.     # Add the poison pillv 
  74.  
  75.     for worker in worker_threads: 
  76.  
  77.         queue.put("quit"
  78.  
  79.     for worker in worker_threads: 
  80.  
  81.         worker.join() 
  82.  
  83.   
  84.  
  85.     print "Done! Time taken: {}".format(time.time() - start_time) 
  86.  
  87.   
  88.  
  89. def build_worker_pool(queue, size): 
  90.  
  91.     workers = [] 
  92.  
  93.     for _ in range(size): 
  94.  
  95.         worker = Consumer(queue) 
  96.  
  97.         worker.start() 
  98.  
  99.         workers.append(worker) 
  100.  
  101.     return workers 
  102.  
  103.   
  104.  
  105. if __name__ == &#039;__main__&#039;: 
  106.  
  107.     Producer()  

它的確能運行,但是這些代碼多么復雜阿!它包括了初始化方法、線程跟蹤列表以及和我一樣容易在死鎖問題上出錯的人的噩夢——大量的join語句。而這些還僅僅只是繁瑣的開始!

我們目前為止都完成了什么?基本上什么都沒有。上面的代碼幾乎一直都只是在進行傳遞。這是很基礎的方法,很容易出錯(該死,我剛才忘了在隊列對象上還需要調用task_done()方法(但是我懶得修改了)),性價比很低。還好,我們還有更好的方法。

介紹:Map

Map是一個很棒的小功能,同時它也是Python并行代碼快速運行的關鍵。給不熟悉的人講解一下吧,map是從函數語言Lisp來的。map函數能夠按序映射出另一個函數。例如

  1. urls = ['http://www.yahoo.com''http://www.reddit.com'
  2.  
  3. results = map(urllib2.urlopen, urls)  

這里調用urlopen方法來把調用結果全部按序返回并存儲到一個列表里。就像:

  1. results = [] 
  2.  
  3. for url in urls: 
  4.  
  5. results.append(urllib2.urlopen(url))  

Map按序處理這些迭代。調用這個函數,它就會返回給我們一個按序存儲著結果的簡易列表。

為什么它這么厲害呢?因為只要有了合適的庫,map能使并行運行得十分流暢! 

 

 

 

有兩個能夠支持通過map函數來完成并行的庫:一個是multiprocessing,另一個是鮮為人知但功能強大的子文件:multiprocessing.dummy。

題外話:這個是什么?你從來沒聽說過dummy多進程庫?我也是最近才知道的。它在多進程的說明文檔里面僅僅只被提到了一句。而且那一句就是大概讓你知道有這么個東西的存在。我敢說,這樣幾近拋售的做法造成的后果是不堪設想的!

Dummy就是多進程模塊的克隆文件。唯一不同的是,多進程模塊使用的是進程,而dummy則使用線程(當然,它有所有Python常見的限制)。也就是說,數據由一個傳遞給另一個。這能夠使得數據輕松的在這兩個之間進行前進和回躍,特別是對于探索性程序來說十分有用,因為你不用確定框架調用到底是IO 還是CPU模式。

準備開始

要做到通過map函數來完成并行,你應該先導入裝有它們的模塊:

  1. from multiprocessing import Pool 
  2.  
  3. from multiprocessing.dummy import Pool as ThreadPool  

再初始化:

  1. pool = ThreadPool() 

這簡單的一句就能代替我們的build_worker_pool 函數在example2.py中的所有工作。換句話說,它創建了許多有效的worker,啟動它們來為接下來的工作做準備,以及把它們存儲在不同的位置,方便使用。

Pool對象需要一些參數,但最重要的是:進程。它決定pool中的worker數量。如果你不填的話,它就會默認為你電腦的內核數值。

如果你在CPU模式下使用多進程pool,通常內核數越大速度就越快(還有很多其它因素)。但是,當進行線程或者處理網絡綁定之類的工作時,情況會比較復雜所以應該使用pool的準確大小。

  1. pool = ThreadPool(4) # Sets the pool size to 4 

如果你運行過多線程,多線程間的切換將會浪費許多時間,所以你最好耐心調試出最適合的任務數。

我們現在已經創建了pool對象,馬上就能有簡單的并行程序了,所以讓我們重新寫example2.py中的url opener吧!

  1. import urllib2 
  2.  
  3. from multiprocessing.dummy import Pool as ThreadPool 
  4.  
  5.   
  6.  
  7. urls = [ 
  8.  
  9. 'http://www.python.org'
  10.  
  11. 'http://www.python.org/about/'
  12.  
  13. 'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html'
  14.  
  15. 'http://www.python.org/doc/'
  16.  
  17. 'http://www.python.org/download/'
  18.  
  19. 'http://www.python.org/getit/'
  20.  
  21. 'http://www.python.org/community/'
  22.  
  23. 'https://wiki.python.org/moin/'
  24.  
  25. 'http://planet.python.org/'
  26.  
  27. 'https://wiki.python.org/moin/LocalUserGroups'
  28.  
  29. 'http://www.python.org/psf/'
  30.  
  31. 'http://docs.python.org/devguide/'
  32.  
  33. 'http://www.python.org/community/awards/' 
  34.  
  35. # etc.. 
  36.  
  37.  
  38.   
  39.  
  40. # Make the Pool of workers 
  41.  
  42. pool = ThreadPool(4) 
  43.  
  44. Open the urls in their own threads 
  45.  
  46. and return the results 
  47.  
  48. results = pool.map(urllib2.urlopen, urls) 
  49.  
  50. #close the pool and wait for the work to finish 
  51.  
  52. pool.close() 
  53.  
  54. pool.join()  

看吧!這次的代碼僅用了4行就完成了所有的工作。其中3句還是簡單的固定寫法。調用map就能完成我們前面例子中40行的內容!為了更形象地表明兩種方法的差異,我還分別給它們運行的時間計時。 

 

 

 

結果: 

 

 

 

相當出色!并且也表明了為什么要細心調試pool的大小。在這里,只要大于9,就能使其運行速度加快。

實例2:

生成成千上萬的縮略圖

我們在CPU模式下來完成吧!我工作中就經常需要處理大量的圖像文件夾。其任務之一就是創建縮略圖。這在并行任務中已經有很成熟的方法了。

基礎的單線程創建

  1. import os 
  2.  
  3. import PIL 
  4.  
  5.   
  6.  
  7. from multiprocessing import Pool 
  8.  
  9. from PIL import Image 
  10.  
  11.   
  12.  
  13. SIZE = (75,75) 
  14.  
  15. SAVE_DIRECTORY = 'thumbs' 
  16.  
  17.   
  18.  
  19. def get_image_paths(folder): 
  20.  
  21. return (os.path.join(folder, f) 
  22.  
  23. for f in os.listdir(folder) 
  24.  
  25. if 'jpeg' in f) 
  26.  
  27.   
  28.  
  29. def create_thumbnail(filename): 
  30.  
  31. im = Image.open(filename) 
  32.  
  33. im.thumbnail(SIZE, Image.ANTIALIAS) 
  34.  
  35. base, fname = os.path.split(filename) 
  36.  
  37. save_path = os.path.join(base, SAVE_DIRECTORY, fname) 
  38.  
  39. im.save(save_path) 
  40.  
  41.   
  42.  
  43. if __name__ == '__main__'
  44.  
  45. folder = os.path.abspath( 
  46.  
  47. '11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840'
  48.  
  49. os.mkdir(os.path.join(folder, SAVE_DIRECTORY)) 
  50.  
  51.   
  52.  
  53. images = get_image_paths(folder) 
  54.  
  55.   
  56.  
  57. for image in images: 
  58.  
  59.              create_thumbnail(Image)  

對于一個例子來說,這是有點難,但本質上,這就是向程序傳遞一個文件夾,然后將其中的所有圖片抓取出來,并最終在它們各自的目錄下創建和儲存縮略圖。

我的電腦處理大約6000張圖片用了27.9秒。

如果我們用并行調用map來代替for循環的話:

  1. import os 
  2.  
  3. import PIL 
  4.  
  5.   
  6.  
  7. from multiprocessing import Pool 
  8.  
  9. from PIL import Image 
  10.  
  11.   
  12.  
  13. SIZE = (75,75) 
  14.  
  15. SAVE_DIRECTORY = 'thumbs' 
  16.  
  17.   
  18.  
  19. def get_image_paths(folder): 
  20.  
  21. return (os.path.join(folder, f) 
  22.  
  23. for f in os.listdir(folder) 
  24.  
  25. if 'jpeg' in f) 
  26.  
  27.   
  28.  
  29. def create_thumbnail(filename): 
  30.  
  31. im = Image.open(filename) 
  32.  
  33. im.thumbnail(SIZE, Image.ANTIALIAS) 
  34.  
  35. base, fname = os.path.split(filename) 
  36.  
  37. save_path = os.path.join(base, SAVE_DIRECTORY, fname) 
  38.  
  39. im.save(save_path) 
  40.  
  41.   
  42.  
  43. if __name__ == '__main__'
  44.  
  45. folder = os.path.abspath( 
  46.  
  47. '11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840'
  48.  
  49. os.mkdir(os.path.join(folder, SAVE_DIRECTORY)) 
  50.  
  51.   
  52.  
  53. images = get_image_paths(folder) 
  54.  
  55.   
  56.  
  57. pool = Pool() 
  58.  
  59.         pool.map(create_thumbnail,images) 
  60.  
  61.         pool.close() 
  62.  
  63.         pool.join()  

5.6秒!

對于只改變了幾行代碼而言,這是大大地提升了運行速度。這個方法還能更快,只要你將cpu 和 io的任務分別用它們的進程和線程來運行——但也常造成死鎖。總之,綜合考慮到 map這個實用的功能,以及人為線程管理的缺失,我覺得這是一個美觀,可靠還容易debug的方法。

好了,文章結束了。一行完成并行任務。 

責任編輯:龐桂玉 來源: Python開發者
相關推薦

2014-02-12 13:43:50

代碼并行任務

2022-04-09 09:11:33

Python

2020-08-19 10:30:25

代碼Python多線程

2016-12-02 08:53:18

Python一行代碼

2021-11-02 16:25:41

Python代碼技巧

2020-09-28 12:34:38

Python代碼開發

2020-08-12 14:54:00

Python代碼開發

2017-04-05 11:10:23

Javascript代碼前端

2013-11-29 13:14:30

代碼網頁設計

2020-01-10 22:56:56

Python圖像處理Linux

2022-09-28 10:12:50

Python代碼可視化

2024-05-31 13:14:05

2020-09-09 16:00:22

Linux進程

2021-08-31 09:49:37

CPU執行語言

2023-09-12 10:10:57

開發者工具開源

2021-04-19 10:38:06

代碼開發工具

2022-02-23 14:37:48

代碼Pythonbug

2024-11-08 17:22:22

2023-11-10 09:41:44

Python代碼

2022-04-14 07:57:52

Python代碼熱力圖
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 国产成人免费 | 国产精品视频一区二区三区四区国 | 国产欧美精品一区二区 | 午夜理伦三级理论三级在线观看 | 日日夜夜精品免费视频 | 欧美一区二区在线观看 | 亚洲精品电影 | 欧美成人一区二区 | 在线日韩福利 | 三级黄色网址 | 91网站在线播放 | 国产探花在线精品一区二区 | 一级a爱片性色毛片免费 | 免费观看国产视频在线 | 国产精品成人国产乱一区 | 久久国内精品 | 精品亚洲国产成av人片传媒 | 亚洲福利一区二区 | 成年网站在线观看 | 久热国产精品视频 | 久久99国产精品 | 日日摸夜夜添夜夜添精品视频 | 久久久噜噜噜久久中文字幕色伊伊 | 日韩欧美综合在线视频 | 动漫www.被爆羞羞av44 | 欧美日韩一区二区三区四区五区 | 日韩欧美视频网站 | 国产高清美女一级a毛片久久w | 亚洲网站观看 | 国产精品区一区二区三区 | 97人人干 | 精品国产乱码久久久久久久久 | 在线视频一区二区 | 九九热国产视频 | 精品一区二区三区在线观看 | 欧美aⅴ | 午夜寂寞网站 | 一级高清免费毛片 | 久久精品视频免费看 | 性色的免费视频 | 中文字幕亚洲一区二区三区 |