Multiprocessing and Multithreading in Python
Threads and Processes
A thread cannot exist on its own; it depends on a process and is one path of execution inside it. A process is the container for its threads, and every process has at least one thread.
Threads
A thread is the basic unit of CPU scheduling. Within a single process there is often a need to run several flows of execution concurrently to carry out different functions. Threads in the same process share that process's address space and other resources, but each thread has its own stack, stack pointer, program counter, and other registers. A thread is a lightweight process (LWP): compared with a full process, a thread places a far smaller creation, maintenance, and management burden on the operating system, which means its cost and overhead are low.
Processes
A process is a single run of a program over a particular data set, and it is the unit of system resource allocation. It has its own independent address space, which helps with resource management and protection. Creating and switching processes is expensive, so processes are comparatively slow to work with.
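A minimal sketch of the difference, assuming CPython's standard threading and multiprocessing modules (this example is not from the referenced notes): a thread appending to a module-level list changes the parent's data, because threads share the process's address space, while a child process only changes its own copy.
import threading
import multiprocessing

data = []

def append_item():
    data.append('added')

if __name__ == '__main__':
    t = threading.Thread(target=append_item)
    t.start()
    t.join()
    print('after thread:', data)    # ['added'] -- same address space

    p = multiprocessing.Process(target=append_item)
    p.start()
    p.join()
    print('after process:', data)   # still just ['added'] -- the child modified its own copy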
I/O or CPU?
In practice every task is a mix of I/O and CPU work; it simply leans one way or the other. A practical way to tell which way a task leans is to measure how much speedup Python multithreading versus multiprocessing gives it (a rough sketch follows the definitions below).
I/O bound and CPU bound
- I/O bound: I/O stands for Input and Output. The bottleneck is reading and exchanging data, so it depends on disk speed, network transfer rates, and the like. Examples: downloading files, reading files.
- CPU bound: compute-heavy tasks, where a faster CPU means a faster result, e.g. numerical computation, image processing, iterative algorithms.
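A minimal sketch of that diagnosis, assuming the concurrent.futures executors introduced later in these notes: time the same function under a thread pool and a process pool and compare. The task functions io_style_task, cpu_style_task, and the timed helper below are hypothetical stand-ins; on an I/O-style task the thread pool should come out far ahead, while on a CPU-style task the process pool should.
import time
from concurrent import futures

def io_style_task(_):
    time.sleep(0.5)  # stands in for waiting on disk or the network

def cpu_style_task(_):
    sum(i * i for i in range(2_000_000))  # pure-Python number crunching

def timed(executor_cls, task, n=4):
    start = time.perf_counter()
    with executor_cls(max_workers=n) as ex:
        list(ex.map(task, range(n)))
    return time.perf_counter() - start

if __name__ == '__main__':
    for task in (io_style_task, cpu_style_task):
        t_threads = timed(futures.ThreadPoolExecutor, task)
        t_procs = timed(futures.ProcessPoolExecutor, task)
        print(f'{task.__name__}: threads {t_threads:.2f}s, processes {t_procs:.2f}s')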
GIL: Global Interpreter Lock
The CPython interpreter is not itself thread-safe (this is a property of CPython, not of the Python language; Jython, for instance, has no such restriction), so it has a Global Interpreter Lock (GIL) that allows only one thread at a time to execute Python bytecode. The GIL also simplifies the implementation of the CPython interpreter and of C extensions. All blocking I/O functions in the standard library (including sleep) release the GIL while they wait for the operating system to return a result. That means that while one Python thread waits for a network response, another thread can run. So despite the GIL, Python threads are still useful in I/O-bound applications. Only multiprocessing gives true parallel computation in Python (note the package names: threading versus multiprocessing). For CPU-bound work (for which PyPy is also worth a try), you need multiple processes to make use of all available CPU cores.
Relevant packages in Python (the code notes below come from references [5][6])
For threads, Python 3 dropped the old thread module (it survives only as _thread) in favor of the higher-level threading module. For processes, the corresponding Python 3 package is multiprocessing. concurrent.futures is the most recent way to use threads and processes, through futures.ThreadPoolExecutor and futures.ProcessPoolExecutor respectively. If futures is not flexible enough for a particular job, or the scenario is more complex, you may need to build your own solution directly on threading or multiprocessing.
Threads
Sequential execution
import time

start = time.perf_counter()

def do_something():
    print('Sleeping 1 second...')
    time.sleep(1)
    print('Done Sleeping...')

for _ in range(5):
    do_something()

finish = time.perf_counter()
print(f'Finished in {round(finish-start, 2)} second(s)')
threading: multithreaded execution
import time
import threading

start = time.perf_counter()

def do_something(seconds):
    print(f'Sleeping {seconds} second...')
    time.sleep(seconds)
    print('Done Sleeping...')

threads = []
for _ in range(5):
    t = threading.Thread(target=do_something, args=[1.5])
    t.start()  # to start the threads
    threads.append(t)

for thread in threads:
    thread.join()  # make sure that they completed before moving on

finish = time.perf_counter()
print(f'Finished in {round(finish-start, 2)} second(s)')
concurrent.futures: a simpler interface
- version 1
import time
from concurrent import futures

start = time.perf_counter()

def do_something(seconds):
    print(f'Sleeping {seconds} second...')
    time.sleep(seconds)
    return f'Done Sleeping...{seconds}'

with futures.ThreadPoolExecutor() as executor:
    secs = [5, 4, 3, 2, 1]
    results = [executor.submit(do_something, sec) for sec in secs]
    for f in futures.as_completed(results):  # yields futures as they complete, no waiting in submission order
        print(f.result())

finish = time.perf_counter()
print(f'Finished in {round(finish - start, 2)} second(s)')
- version 2
import time
from concurrent import futures

start = time.perf_counter()

def do_something(seconds):
    print(f'Sleeping {seconds} second...')
    time.sleep(seconds)
    return f'Done Sleeping...{seconds}'

with futures.ThreadPoolExecutor() as executor:
    secs = [5, 4, 3, 2, 1]
    results = executor.map(do_something, secs)  # results come back in submission order
    for result in results:
        print(result)

finish = time.perf_counter()
print(f'Finished in {round(finish - start, 2)} second(s)')
Processes
On a multi-core CPU, only parallelism across processes really exploits the machine's capability.
import time
import multiprocessing

def do_something(seconds):
    print(f'Sleeping {seconds} second...')
    time.sleep(seconds)
    return f'Done Sleeping...{seconds}'

if __name__ == '__main__':
    start = time.perf_counter()

    processes = []
    for _ in range(8):
        p = multiprocessing.Process(target=do_something, args=[1.5])
        p.start()
        processes.append(p)

    for process in processes:
        process.join()

    finish = time.perf_counter()
    print(f'Finished in {round(finish - start, 2)} second(s)')
When using concurrent.futures for multiprocessing, all you need to do is replace futures.ThreadPoolExecutor with futures.ProcessPoolExecutor.
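As a sketch of that one-line swap, here is the earlier map-based example rewritten with futures.ProcessPoolExecutor (the if __name__ == '__main__' guard matters because worker processes may be spawned by importing the module):
import time
from concurrent import futures

def do_something(seconds):
    print(f'Sleeping {seconds} second...')
    time.sleep(seconds)
    return f'Done Sleeping...{seconds}'

if __name__ == '__main__':
    start = time.perf_counter()

    with futures.ProcessPoolExecutor() as executor:  # the only change from the thread version
        secs = [5, 4, 3, 2, 1]
        results = executor.map(do_something, secs)
        for result in results:
            print(result)

    finish = time.perf_counter()
    print(f'Finished in {round(finish - start, 2)} second(s)')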
A real example
Downloading the wallpapers that Studio Ghibli has made public
ghibli_multithread.py
"""Download pictures in ghibli.com
Sample run::
$ python3 ghibli_multithread.py
"""
import os
import time
import sys
import requests
import re
from concurrent import futures
MAX_WORKERS = 20
BASE_URL = 'http://www.ghibli.jp/info/013251/'
DEST_DIR = 'downloads/'
def save_flag(img, filename):
path = os.path.join(DEST_DIR, filename)
with open(path, 'wb') as fp:
fp.write(img)
def show(text):
print(text, end=' ')
sys.stdout.flush()
def download_one(cc): # <3>
image = requests.get(cc[0]).content
show(cc[1])
save_flag(image, cc[1] + '.jpg')
return cc
def download_many(cc_list):
workers = min(MAX_WORKERS, len(cc_list))
with futures.ThreadPoolExecutor(workers) as executor:
res = executor.map(download_one, cc_list)
return len(list(res))
def get_data(base_url):
resp = requests.get(base_url)
resp_html = resp.content.decode()
urls_origin = re.findall(r'\<a href.*?\"panelarea\"\>', resp_html)
urls = [re.findall(r'\".*?\"', url)[:2] for url in urls_origin]
pic_urls = [(x[0][1:-1], x[1][1:-1]) for x in urls]
return pic_urls
def main():
pic_urls = get_data(BASE_URL)
t0 = time.time()
count = download_many(pic_urls)
elapsed = time.time() - t0
msg = '\n{} pictures downloaded in {:.2f}s'
print(msg.format(count, elapsed))
if __name__ == '__main__':
main()
ghibli.py
import sys
import os
import time
import re
import requests

from ghibli_multithread import save_flag, show, download_one, get_data

BASE_URL = 'http://www.ghibli.jp/info/013251/'
DEST_DIR = 'downloads/'

def main():
    pic_urls = get_data(BASE_URL)
    t0 = time.time()
    for pic_url in pic_urls:
        download_one(pic_url)
    elapsed = time.time() - t0
    msg = '\n{} pictures downloaded in {:.2f}s'
    print(msg.format(len(pic_urls), elapsed))

if __name__ == '__main__':
    main()
Hardware: 3.7 GHz 6-core Intel Core i5 (9th generation)
Sequential: 26 pictures downloaded in 54.47s
Multithreaded: 26 pictures downloaded in 6.89s
After only changing ThreadPoolExecutor to ProcessPoolExecutor: 26 pictures downloaded in 8.47s