隨著寫程式經驗越來越多,我們開始會碰到一些需要等待一段時間才能完成任務,
例如 1. 利用 API 或是 request 批量下載大量資料 2. 需要大量使用 CPU 進行數學運算的工作。
這時候我們就可以考慮是否適合使用平行運算技術進行加速。常見的有多線程(multithreading)以及多進程(multiprocessing)。
本文主要介紹以下三點:

  1. 多進程與多線程的差異
  2. Python 實作多線程
  3. 以多線程改寫加速現有爬蟲程式

What are Multiprocessing and Multithreading?

Difference between process and threads

首先,什麼是進程 Process?什麼是執行緒 Threads?如果讀者是使用的 Mac 的話,可以在「活動監視器」的 CPU 中,看到右下角呈現目前執行的執行緒與程序數量。

  • Process 簡單來說進程就是正在運行中的程式,例如 App, 軟體等。
  • Thread 則是在 Process 運行中,因為需要同時進行運算、介面互動、資料庫搜索等工作,這些任務即可稱作執行緒 Threads。

Process 就像一間工廠,其中有需多的工具(Files, Data),而 Threads 則為工人,Multiprocessing 代表就多間工廠,各自使用自家的工人(Threads)與工具(Files, Data),而 Multithreading 則像是一間工廠中娉請多個工人加速工廠的產能。

所以多進程 Multiprocessing 代表系統同時運行多個軟體,而多線程 Multithreading 則代表一個 Process 中同時執行多個執行緒的能力。

其中多線程與多進程的差異在於,多線程在一個程序中執行多個執行緒進行不同任務,這些任務的開始時間幾乎同時,並且一個接一個的進行,這會提供一種同時運行的錯覺,但其實並沒有。
而多進程則是以多個 Processes 在多個 CPU 上執行達到真正意義上的平行運算,而這些核心之間並不共享各自的資源,
例如下圖所示,當我今天有一個 func 會運行兩次,其中執行一次時間大約 1 秒,執行運行兩次時間大約 2 秒,而使用 Multithreading 與 Multiprocessing 差異則在於這兩個 func 是否真正的同時並行處理。

CPU Bound and I/O Bound Tasks

上面提到會需要執行很久的任務可大致分為兩種情形,需要大量的運算或是從硬碟、網路存取寫入大量的資料,其中,

  • CPU Bound 代表高度仰賴 CPU 運算的任務,例如圖片處理、數學運算、機器學習等。
  • I/O Bound 代表這個工作的等待時間主要在於資料的存取,例如爬蟲、從硬碟載資料,而這部分的操作則不太需要 CPU 的參與。

Multithreading 就非常適合加速 I/O Bound 的任務,將需要載入寫出的資料分配給多個執行緒,
就像是我們在網路上下載照片、影片等等,我們不會一起載一張,載好再載下一張,我們一定都是點完所有的下載連接,再等各自資料一一載完。
但在 Python 會受到 Global Interprter Lock, GIL 特性的影響,GIL 限制 Python 解釋器同時只能有一個 Python 字節碼執行,為了避免執行緒間數據競爭的問題,
在 CPU Bound 任務中,CPU 核心不斷地切換執行緒,但只有一個執行緒能夠執行 Python 代碼使得其他執行緒處於等待時間,而浪費 CPU 時間,
所以在 CPU Bound 任務上,更常會使用 Multiprocessing,利用多個 Processor 加速運算。

Let’s Get Practical: Multithreading in Python!

Multithreading 可以使用 Python 標準庫 threading 實現,主要流程有以下:

  1. Create Thread:分配執行的任務,也就是 Python function 給每個執行緒
  2. Start Execution:執行緒開始執行任務
  3. Wait for the thread to complete execution:當我們在第二步開始執行後,如果沒有設置檢查點,程式會直接運行接下來的程式,所以我們需要設置檢查點,等待執行緒完成任務才往後運行後續程式。

Without multithreading

以下程式定義了一個 function load_data_from_somewhere(),模擬一個下載資料的程式,其中接受一個參數 need_sec 代表這個函數需要執行多久,模擬下載的時間。
我們利用迴圈將 load_data_from_somewhere 運行兩次,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import time

start = time.perf_counter()

def load_data_from_somewhere(need_sec=1):
print('Downloading ...')
time.sleep(need_sec)
print(f'Done! it takes {need_sec} second(s)')

for _ in range(2):
load_data_from_somewhere()

finish = time.perf_counter()

print('---')
print(f'Total Cost {round(finish-start, 2)} second(s)')

輸出會如下,依序運行了兩次,總共花費兩秒。

1
2
3
4
5
6
Downloading ...
Done! it takes 1 second(s)
Downloading ...
Done! it takes 1 second(s)
---
Total Cost 2.01 second(s)

Speeding up by Threading

接下來我們載入 threading,還記得我們上面提到使用多線程的流程嗎?

  1. Create Thread:透過 theading.Thread 創建執行緒,並透過 target 指定執行的任務
  2. Start Execution:.start()
  3. Wait for the thread to complete execution:.join()

在下面程式中,我創建了兩個執行緒並命名為 worker1, worker2,一樣是執行兩次下載資料的函示,發現這次只需要 1.01 秒了!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import time
import threading

start = time.perf_counter()

def load_data_from_somewhere(need_sec=1):
print('Downloading ...\n')
time.sleep(need_sec)
print(f'Done! it takes {need_sec} second(s)\n')

# Create Thread
worker1 = threading.Thread(target=load_data_from_somewhere)
worker2 = threading.Thread(target=load_data_from_somewhere)

# Start Execution Execution
worker1.start()
worker2.start()

# Wait for the thread to complete execution
worker1.join()
worker2.join()

finish = time.perf_counter()

print('---')
print(f'Total Cost {round(finish-start, 2)} second(s)')
1
2
3
4
5
6
7
8
9
10
11
# output

Downloading ...

Downloading ...

Done! it takes 1 second(s)
Done! it takes 1 second(s)

---
Total Cost 1.01 second(s)

High level API concurrent.futures

concurrent.futures 是一個很方便的高級 API 幫助我們更快速地實現多進程與多線程,
使用 ThreadPoolExectuor,透過 submit 給予任務即可。

在以下的程式我做了三件事情,

  1. 透過 spleep_list 動態以 **kwargs 或是 *args 傳入 load_data_from_somewhere 需要的參數。
  2. 利用 concurrent.futures.ThreadPoolExecutor() 在 with context 以 .submit() 執行多執行緒任務。
  3. 利用迴圈創建多個執行緒。

共執行了五次 load_data_from_somewhere,耗時只使用了 5 秒鐘。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import concurrent.futures
import time

start = time.perf_counter()

def load_data_from_somewhere(need_sec=1):
print('Downloading ...\n')
time.sleep(need_sec)
print(f'Done! it takes {need_sec} second(s)\n')


sleep_list = [5, 4, 3, 2, 1]
with concurrent.futures.ThreadPoolExecutor() as executor:
for sec in sleep_list:
executor.submit(load_data_from_somewhere, **{"need_sec": sec})

finish = time.perf_counter()

print('---')
print(f'Total Cost {round(finish-start, 2)} second(s)')
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#output
Downloading ...

Downloading ...

Downloading ...

Downloading ...

Downloading ...

Done! it takes 1 second(s)

Done! it takes 2 second(s)

Done! it takes 3 second(s)

Done! it takes 4 second(s)

Done! it takes 5 second(s)

---
Total Cost 5.01 second(s)

How to get the return value of a function from each thread

可以利用 .result() 得到函數的回傳值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import concurrent.futures
import time

start = time.perf_counter()

def load_data_from_somewhere(need_sec=1):
print('Downloading ...\n')
time.sleep(need_sec)
return f'Done! it takes {need_sec} second(s)\n'


sleep_list = [5, 4, 3, 2, 1]
with concurrent.futures.ThreadPoolExecutor() as executor:
results = [executor.submit(load_data_from_somewhere, **{"need_sec": sec}) for sec in sleep_list]
for f in concurrent.futures.as_completed(results):
print(f.result())

finish = time.perf_counter()

print('---')
print(f'Total Cost {round(finish-start, 2)} second(s)')

Speeding up an existing web scraping program by multithreading

接下來我們嘗試改寫一支現有的爬蟲程式,以多線程進行加速。

我寫了一支 function 用以抓取聯合新聞網中,即時新聞的即時列表,並以 while 迴圈動態抓取所有 page 的內容,也就是調整網址中的 page 參數,
並且將抓取來的內容以 pandas.to_csv() 載回本地硬碟。
其中參數:

  • cn_page:總共要抓幾篇。
  • init_page:從第幾篇開始抓。
  • sleep_time:遇到無 response 時睡多久。
  • break_times_limit:同一網址抓取不到 Response 幾次則放棄。

這個任務就符合上述所提到的 I/O Bound 任務,主要運行的時間在等待 request 回傳內容以及 sleep 與寫出資料,所以很適合使用 multithreading 進行加速,
筆者實測,5000 個 page 原本需要 100+ 分鐘,使用多線程將至 11 分鐘左右。

爬蟲 function:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import requests
import pandas as pd
from tqdm import tqdm
import time

def get_url(cn_page: int=10,
init_page: int=0,
sleep_time:float = 0.5,
break_times_limit:int = 2,
save_dir = './data') -> pd.DataFrame:
result_list = []
untrack_page = []
current = init_page
break_times = 0
pbar = tqdm(desc=f'while loop init_page:{init_page}; cn_page: {cn_page}', total=cn_page)
while current < init_page+cn_page or break_times > 0:
if break_times >= break_times_limit:
untrack_page.append(current)
current += 1
break_times = 0
pbar.update(1)
continue
try:
url = f"https://udn.com/api/more?page={current}&id=&channelId=1&cate_id=99&type=breaknews&totalRecNo=24215"
result = requests.get(url).json()['lists']
result_list.extend(result)
break_times = 0
result_df = pd.DataFrame(result)
result_df.to_csv(f"{save_dir}/result_page_{current}.csv",
index=False,
encoding='utf_8_sig'
)
current += 1
pbar.update(1)
except:
break_times += 1
time.sleep(sleep_time)
pass

return f'get {current}+{cn_page}'

沒有使用多線程:

1
2
3
4
5
6
7
8
num_workers = 50
cn_page = 5000
step = int(cn_page/num_workers)

results = []
for init_page in range(0, cn_page, step):
get_url(**{'cn_page': step,
'init_page': init_page})

使用多線程(threading):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import threading

num_workers = 50
cn_page = 5000
step = int(cn_page/num_workers)

workers = []
for init_page in range(0, cn_page, step):
worker = threading.Thread(target=get_url,
kwargs={'cn_page': step,
'init_page': init_page})
worker.start()
workers.append(worker)

for worker in workers:
worker.join()

使用多線程(concurrent.futures):

1
2
3
4
5
6
7
8
9
10
11
import concurrent.futures

num_workers = 50
cn_page = 5000
step = int(cn_page/num_workers)

with concurrent.futures.ThreadPoolExecutor() as executor:
for init_page in range(0, cn_page, step):
executor.submit(get_url,
**{'cn_page': step,
'init_page': init_page})

此文章同步於 Medium
也歡迎大家傳送 Linkedin 連結邀請給我。Linkedin Profile

Refer

GeeksforGeeks.org, Difference Between Multithreading vs Multiprocessing in Python
Corey Schafer’s Python Threading Tutorial: Run Code Concurrently Using the Threading Module