隨著寫程式經驗越來越多,我們開始會碰到一些需要等待一段時間才能完成任務, 例如 1. 利用 API 或是 request 批量下載大量資料 2. 需要大量使用 CPU 進行數學運算的工作。 這時候我們就可以考慮是否適合使用平行運算技術進行加速。常見的有多線程(multithreading)以及多進程(multiprocessing)。 本文主要介紹以下三點:
多進程與多線程的差異
Python 實作多線程
以多線程改寫加速現有爬蟲程式
What are Multiprocessing and Multithreading? Difference between process and threads 首先,什麼是進程 Process?什麼是執行緒 Threads?如果讀者是使用的 Mac 的話,可以在「活動監視器」的 CPU 中,看到右下角呈現目前執行的執行緒與程序數量。
Process 簡單來說進程就是正在運行中的程式,例如 App, 軟體等。
Thread 則是在 Process 運行中,因為需要同時進行運算、介面互動、資料庫搜索等工作,這些任務即可稱作執行緒 Threads。
Process 就像一間工廠,其中有需多的工具(Files, Data),而 Threads 則為工人,Multiprocessing 代表就多間工廠,各自使用自家的工人(Threads)與工具(Files, Data),而 Multithreading 則像是一間工廠中娉請多個工人加速工廠的產能。
所以多進程 Multiprocessing 代表系統同時運行多個軟體,而多線程 Multithreading 則代表一個 Process 中同時執行多個執行緒的能力。
其中多線程與多進程的差異在於,多線程在一個程序中執行多個執行緒進行不同任務,這些任務的開始時間幾乎同時,並且一個接一個的進行,這會提供一種同時運行的錯覺,但其實並沒有。 而多進程則是以多個 Processes 在多個 CPU 上執行達到真正意義上的平行運算,而這些核心之間並不共享各自的資源, 例如下圖所示,當我今天有一個 func 會運行兩次,其中執行一次時間大約 1 秒,執行運行兩次時間大約 2 秒,而使用 Multithreading 與 Multiprocessing 差異則在於這兩個 func 是否真正的同時並行處理。
CPU Bound and I/O Bound Tasks 上面提到會需要執行很久的任務可大致分為兩種情形,需要大量的運算或是從硬碟、網路存取寫入大量的資料,其中,
CPU Bound 代表高度仰賴 CPU 運算的任務,例如圖片處理、數學運算、機器學習等。
I/O Bound 代表這個工作的等待時間主要在於資料的存取,例如爬蟲、從硬碟載資料,而這部分的操作則不太需要 CPU 的參與。
Multithreading 就非常適合加速 I/O Bound 的任務,將需要載入寫出的資料分配給多個執行緒, 就像是我們在網路上下載照片、影片等等,我們不會一起載一張,載好再載下一張,我們一定都是點完所有的下載連接,再等各自資料一一載完。 但在 Python 會受到 Global Interprter Lock, GIL 特性的影響,GIL 限制 Python 解釋器同時只能有一個 Python 字節碼執行,為了避免執行緒間數據競爭的問題, 在 CPU Bound 任務中,CPU 核心不斷地切換執行緒,但只有一個執行緒能夠執行 Python 代碼使得其他執行緒處於等待時間,而浪費 CPU 時間, 所以在 CPU Bound 任務上,更常會使用 Multiprocessing,利用多個 Processor 加速運算。
Let’s Get Practical: Multithreading in Python! Multithreading 可以使用 Python 標準庫 threading
實現,主要流程有以下:
Create Thread:分配執行的任務,也就是 Python function 給每個執行緒
Start Execution:執行緒開始執行任務
Wait for the thread to complete execution:當我們在第二步開始執行後,如果沒有設置檢查點,程式會直接運行接下來的程式,所以我們需要設置檢查點,等待執行緒完成任務才往後運行後續程式。
Without multithreading 以下程式定義了一個 function load_data_from_somewhere()
,模擬一個下載資料的程式,其中接受一個參數 need_sec
代表這個函數需要執行多久,模擬下載的時間。 我們利用迴圈將 load_data_from_somewhere
運行兩次,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import timestart = time.perf_counter() def load_data_from_somewhere (need_sec=1 ): print ('Downloading ...' ) time.sleep(need_sec) print (f'Done! it takes {need_sec} second(s)' ) for _ in range (2 ): load_data_from_somewhere() finish = time.perf_counter() print ('---' )print (f'Total Cost {round (finish-start, 2 )} second(s)' )
輸出會如下,依序運行了兩次,總共花費兩秒。
1 2 3 4 5 6 Downloading ... Done! it takes 1 second(s) Downloading ... Done! it takes 1 second(s) --- Total Cost 2.01 second(s)
Speeding up by Threading 接下來我們載入 threading
,還記得我們上面提到使用多線程的流程嗎?
Create Thread:透過 theading.Thread
創建執行緒,並透過 target
指定執行的任務
Start Execution:.start()
Wait for the thread to complete execution:.join()
在下面程式中,我創建了兩個執行緒並命名為 worker1, worker2,一樣是執行兩次下載資料的函示,發現這次只需要 1.01 秒了!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 import timeimport threadingstart = time.perf_counter() def load_data_from_somewhere (need_sec=1 ): print ('Downloading ...\n' ) time.sleep(need_sec) print (f'Done! it takes {need_sec} second(s)\n' ) worker1 = threading.Thread(target=load_data_from_somewhere) worker2 = threading.Thread(target=load_data_from_somewhere) worker1.start() worker2.start() worker1.join() worker2.join() finish = time.perf_counter() print ('---' )print (f'Total Cost {round (finish-start, 2 )} second(s)' )
1 2 3 4 5 6 7 8 9 10 11 # output Downloading ... Downloading ... Done! it takes 1 second(s) Done! it takes 1 second(s) --- Total Cost 1.01 second(s)
High level API concurrent.futures concurrent.futures 是一個很方便的高級 API 幫助我們更快速地實現多進程與多線程, 使用 ThreadPoolExectuor
,透過 submit
給予任務即可。
在以下的程式我做了三件事情,
透過 spleep_list
動態以 **kwargs
或是 *args
傳入 load_data_from_somewhere
需要的參數。
利用 concurrent.futures.ThreadPoolExecutor()
在 with context 以 .submit()
執行多執行緒任務。
利用迴圈創建多個執行緒。
共執行了五次 load_data_from_somewhere
,耗時只使用了 5 秒鐘。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 import concurrent.futuresimport timestart = time.perf_counter() def load_data_from_somewhere (need_sec=1 ): print ('Downloading ...\n' ) time.sleep(need_sec) print (f'Done! it takes {need_sec} second(s)\n' ) sleep_list = [5 , 4 , 3 , 2 , 1 ] with concurrent.futures.ThreadPoolExecutor() as executor: for sec in sleep_list: executor.submit(load_data_from_somewhere, **{"need_sec" : sec}) finish = time.perf_counter() print ('---' )print (f'Total Cost {round (finish-start, 2 )} second(s)' )
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 #output Downloading ... Downloading ... Downloading ... Downloading ... Downloading ... Done! it takes 1 second(s) Done! it takes 2 second(s) Done! it takes 3 second(s) Done! it takes 4 second(s) Done! it takes 5 second(s) --- Total Cost 5.01 second(s)
How to get the return value of a function from each thread 可以利用 .result()
得到函數的回傳值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import concurrent.futuresimport timestart = time.perf_counter() def load_data_from_somewhere (need_sec=1 ): print ('Downloading ...\n' ) time.sleep(need_sec) return f'Done! it takes {need_sec} second(s)\n' sleep_list = [5 , 4 , 3 , 2 , 1 ] with concurrent.futures.ThreadPoolExecutor() as executor: results = [executor.submit(load_data_from_somewhere, **{"need_sec" : sec}) for sec in sleep_list] for f in concurrent.futures.as_completed(results): print (f.result()) finish = time.perf_counter() print ('---' )print (f'Total Cost {round (finish-start, 2 )} second(s)' )
Speeding up an existing web scraping program by multithreading 接下來我們嘗試改寫一支現有的爬蟲程式,以多線程進行加速。
我寫了一支 function 用以抓取聯合新聞網中,即時新聞的即時列表,並以 while 迴圈動態抓取所有 page 的內容,也就是調整網址中的 page 參數, 並且將抓取來的內容以 pandas.to_csv()
載回本地硬碟。 其中參數:
cn_page:總共要抓幾篇。
init_page:從第幾篇開始抓。
sleep_time:遇到無 response 時睡多久。
break_times_limit:同一網址抓取不到 Response 幾次則放棄。
這個任務就符合上述所提到的 I/O Bound 任務,主要運行的時間在等待 request 回傳內容以及 sleep 與寫出資料,所以很適合使用 multithreading 進行加速, 筆者實測,5000 個 page 原本需要 100+ 分鐘,使用多線程將至 11 分鐘左右。
爬蟲 function:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 import requestsimport pandas as pdfrom tqdm import tqdmimport timedef get_url (cn_page: int =10 , init_page: int =0 , sleep_time:float = 0.5 , break_times_limit:int = 2 , save_dir = './data' ) -> pd.DataFrame: result_list = [] untrack_page = [] current = init_page break_times = 0 pbar = tqdm(desc=f'while loop init_page:{init_page} ; cn_page: {cn_page} ' , total=cn_page) while current < init_page+cn_page or break_times > 0 : if break_times >= break_times_limit: untrack_page.append(current) current += 1 break_times = 0 pbar.update(1 ) continue try : url = f"https://udn.com/api/more?page={current} &id=&channelId=1&cate_id=99&type=breaknews&totalRecNo=24215" result = requests.get(url).json()['lists' ] result_list.extend(result) break_times = 0 result_df = pd.DataFrame(result) result_df.to_csv(f"{save_dir} /result_page_{current} .csv" , index=False , encoding='utf_8_sig' ) current += 1 pbar.update(1 ) except : break_times += 1 time.sleep(sleep_time) pass return f'get {current} +{cn_page} '
沒有使用多線程:
1 2 3 4 5 6 7 8 num_workers = 50 cn_page = 5000 step = int (cn_page/num_workers) results = [] for init_page in range (0 , cn_page, step): get_url(**{'cn_page' : step, 'init_page' : init_page})
使用多線程(threading):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import threadingnum_workers = 50 cn_page = 5000 step = int (cn_page/num_workers) workers = [] for init_page in range (0 , cn_page, step): worker = threading.Thread(target=get_url, kwargs={'cn_page' : step, 'init_page' : init_page}) worker.start() workers.append(worker) for worker in workers: worker.join()
使用多線程(concurrent.futures):
1 2 3 4 5 6 7 8 9 10 11 import concurrent.futuresnum_workers = 50 cn_page = 5000 step = int (cn_page/num_workers) with concurrent.futures.ThreadPoolExecutor() as executor: for init_page in range (0 , cn_page, step): executor.submit(get_url, **{'cn_page' : step, 'init_page' : init_page})
此文章同步於 Medium 。 也歡迎大家傳送 Linkedin 連結邀請給我。Linkedin Profile
Refer GeeksforGeeks.org, Difference Between Multithreading vs Multiprocessing in Python Corey Schafer’s Python Threading Tutorial: Run Code Concurrently Using the Threading Module