Multiprocessing and Multithreading in Python

Published 2020-05-25 · 392 views



Threads and Processes

A thread cannot exist outside a process; it is one path of execution within a process. A process is a container for threads, and every process contains at least one thread.

Threads

A thread is the basic unit of CPU scheduling. Even within a single process, several flows of execution often need to run concurrently to carry out different functions. Threads in the same process share that process's address space and other resources, but each thread has its own stack, stack pointer, program counter, and other registers. A thread is a lightweight process (LWP): compared with a full process, a thread puts a lighter creation, maintenance, and management burden on the operating system, so its cost is relatively small.
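The shared address space is easy to demonstrate with a minimal sketch (the function and variable names here are made up for illustration): several threads append to the same list object, and the parent thread sees every entry afterwards.

```python
import threading

results = []  # a single list object, shared by every thread in the process

def worker(tid):
    # each thread mutates the very same `results` object
    results.append(tid)

threads = [threading.Thread(target=worker, args=(i,)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(results))  # → [0, 1, 2, 3]
```

Note that `list.append` happens to be thread-safe in CPython; for read-modify-write operations like `counter += 1` you would need a `threading.Lock`.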

Processes

A process is one run of a program over a particular data set, and it is the unit of system resource allocation. Each process has its own independent address space, which aids resource management and protection. Process creation and switching are expensive, so processes are comparatively slower to work with.
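By contrast, here is a sketch of process isolation (names are again illustrative): a child process that increments a module-level variable changes only its own copy, because each process has a separate address space.

```python
import multiprocessing

counter = 0

def bump():
    global counter
    counter += 1  # modifies the child process's private copy only

if __name__ == '__main__':
    p = multiprocessing.Process(target=bump)
    p.start()
    p.join()
    print(counter)  # still 0 in the parent: no shared address space
```

To actually share state between processes you would need explicit mechanisms such as `multiprocessing.Value` or a `Queue`.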


IO or CPU?

In practice every task mixes I/O and CPU work, just weighted toward one side. You can gauge which side a task leans toward by how much Python multithreading versus multiprocessing speeds it up.

IO bound vs CPU bound

  • I/O stands for Input and Output. The bottleneck is reading and exchanging data, which depends on disk speed, network bandwidth, and so on, e.g. downloading or reading files.

  • CPU-bound tasks run only as fast as the CPU can compute, e.g. numerical computation, image processing, iterative algorithms.

GIL: Global Interpreter Lock

The CPython interpreter is not itself thread-safe; this is a property of CPython, not of the Python language (Jython, for example, has no such restriction). CPython therefore has a Global Interpreter Lock (GIL), which allows only one thread at a time to execute Python bytecode (the GIL also simplifies the implementation of CPython and of C extensions). All blocking I/O functions in the standard library, including sleep, release the GIL while waiting for the operating system to return a result. So while one Python thread waits for a network response, another thread can run, which means that despite the GIL, Python threads are still effective in I/O-bound applications. Only multiprocessing gives true parallel computation in Python (the package names threading and multiprocessing are telling). If you need CPU-bound processing, use multiple processes to exploit all available CPU cores (for CPU-bound work you could also try PyPy).
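The GIL release during blocking calls is easy to observe with a small timing sketch (a rough experiment, not a benchmark): two threads that each sleep 0.5 s finish in about 0.5 s total, because time.sleep releases the GIL while waiting.

```python
import time
import threading

def io_work(seconds):
    time.sleep(seconds)  # blocking I/O-style call: releases the GIL while waiting

start = time.perf_counter()
threads = [threading.Thread(target=io_work, args=(0.5,)) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.perf_counter() - start

print(f'two 0.5s sleeps finished in {elapsed:.2f}s')  # ~0.5s, not 1.0s
```

A CPU-bound loop in place of the sleep would show the opposite: the two threads take turns holding the GIL and the total time is no better than running sequentially.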


Related packages in Python (code notes adapted from references [5] and [6])

For threads, Python 3 removed the old thread module (renamed to the low-level _thread) in favor of the higher-level threading module. For processes, the corresponding package is multiprocessing. concurrent.futures is the most modern way to use threads and processes, via futures.ThreadPoolExecutor and futures.ProcessPoolExecutor respectively. If futures is not flexible enough for a job, or the scenario is more complex, you may need to build your own solution with threading or multiprocessing.

Threads

Sequential execution

import time

start = time.perf_counter()

def do_something():
    print('Sleeping 1 second...')
    time.sleep(1)
    print('Done Sleeping...')


for _ in range(5):
    do_something()
finish = time.perf_counter()
print(f'Finished in {round(finish-start, 2)} second(s)')

threading: multithreaded execution

import time
import threading

start = time.perf_counter()

def do_something(seconds):
    print(f'Sleeping {seconds} second...')
    time.sleep(seconds)
    print('Done Sleeping...')

threads = []
for _ in range(5):
    t = threading.Thread(target=do_something, args=[1.5])
    t.start()  # to start the threads
    threads.append(t)

for thread in threads:
    thread.join()  # make sure that they completed before moving on

finish = time.perf_counter()
print(f'Finished in {round(finish-start, 2)} second(s)')

Simplifying things with concurrent.futures
-version 1

import time
from concurrent import futures

start = time.perf_counter()


def do_something(seconds):
    print(f'Sleeping {seconds} second...')
    time.sleep(seconds)
    return f'Done Sleeping...{seconds}'


with futures.ThreadPoolExecutor() as executor:
    secs = [5, 4, 3, 2, 1]
    results = [executor.submit(do_something, sec) for sec in secs]
    for f in futures.as_completed(results):  # yields each future as soon as it completes
        print(f.result())

finish = time.perf_counter()
print(f'Finished in {round(finish - start, 2)} second(s)')

-version 2

import time
from concurrent import futures

start = time.perf_counter()


def do_something(seconds):
    print(f'Sleeping {seconds} second...')
    time.sleep(seconds)
    return f'Done Sleeping...{seconds}'


with futures.ThreadPoolExecutor() as executor:
    secs = [5, 4, 3, 2, 1]
    results = executor.map(do_something, secs)  # results are yielded in input order
    for result in results:
        print(result)

finish = time.perf_counter()
print(f'Finished in {round(finish - start, 2)} second(s)')

Processes

Only parallel execution across multiple processes can truly exploit a multi-core CPU.

import time
import multiprocessing

def do_something(seconds):
    print(f'Sleeping {seconds} second...')
    time.sleep(seconds)
    return f'Done Sleeping...{seconds}'


if __name__ == '__main__':
    start = time.perf_counter()
    processes = []
    for _ in range(8):
        p = multiprocessing.Process(target=do_something, args=[1.5])
        p.start()
        processes.append(p)
    for process in processes:
        process.join()
    finish = time.perf_counter()
    print(f'Finished in {round(finish - start, 2)} second(s)')

To use concurrent.futures with processes, simply replace futures.ThreadPoolExecutor with futures.ProcessPoolExecutor.
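A minimal sketch of that swap (timings will vary with your machine): with processes, the worker function must live at module top level so it can be pickled, and the pool must be created under the __main__ guard so child processes can start safely on platforms that spawn rather than fork.

```python
import time
from concurrent import futures

def do_something(seconds):
    time.sleep(seconds)
    return f'Done Sleeping...{seconds}'

def run():
    # identical to the thread version, except for the executor class
    with futures.ProcessPoolExecutor() as executor:
        return list(executor.map(do_something, [0.2, 0.2, 0.2, 0.2]))

if __name__ == '__main__':
    start = time.perf_counter()
    print(run())
    print(f'Finished in {round(time.perf_counter() - start, 2)} second(s)')
```

Because each task mostly sleeps, the process pool offers no advantage here over threads; the payoff appears only when do_something does real CPU work.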

A real-world example

Downloading the wallpapers published by Studio Ghibli
ghibli_multithread.py

"""Download pictures in ghibli.com
Sample run::
    $ python3 ghibli_multithread.py
"""
import os
import time
import sys

import requests
import re

from concurrent import futures

MAX_WORKERS = 20
BASE_URL = 'http://www.ghibli.jp/info/013251/'
DEST_DIR = 'downloads/'


def save_flag(img, filename):
    os.makedirs(DEST_DIR, exist_ok=True)  # make sure the download directory exists
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)


def show(text):
    print(text, end=' ')
    sys.stdout.flush()


def download_one(cc):  # cc is a (url, name) tuple produced by get_data
    image = requests.get(cc[0]).content
    show(cc[1])
    save_flag(image, cc[1] + '.jpg')
    return cc


def download_many(cc_list):
    workers = min(MAX_WORKERS, len(cc_list))
    with futures.ThreadPoolExecutor(workers) as executor:
        res = executor.map(download_one, cc_list)
    return len(list(res))


def get_data(base_url):
    resp = requests.get(base_url)
    resp_html = resp.content.decode()
    urls_origin = re.findall(r'\<a href.*?\"panelarea\"\>', resp_html)
    urls = [re.findall(r'\".*?\"', url)[:2] for url in urls_origin]
    pic_urls = [(x[0][1:-1], x[1][1:-1]) for x in urls]
    return pic_urls


def main():
    pic_urls = get_data(BASE_URL)
    t0 = time.time()
    count = download_many(pic_urls)
    elapsed = time.time() - t0
    msg = '\n{} pictures downloaded in {:.2f}s'
    print(msg.format(count, elapsed))


if __name__ == '__main__':
    main()

ghibli.py

import sys
import os
import time

import re
import requests

from ghibli_multithread import save_flag, show, download_one, get_data

BASE_URL = 'http://www.ghibli.jp/info/013251/'
DEST_DIR = 'downloads/'


def main():
    pic_urls = get_data(BASE_URL)
    t0 = time.time()
    for pic_url in pic_urls:
        download_one(pic_url)
    elapsed = time.time() - t0
    msg = '\n{} pictures downloaded in {:.2f}s'
    print(msg.format(len(pic_urls), elapsed))


if __name__ == '__main__':
    main()

Hardware: 3.7 GHz 6-core Intel Core i5 (9th gen)
Sequential: 26 pictures downloaded in 54.47s
Multithreaded: 26 pictures downloaded in 6.89s
After only swapping ThreadPoolExecutor for ProcessPoolExecutor: 26 pictures downloaded in 8.47s

  1. Fluent Python, Chapter 17

  2. A blog post on Python multiprocessing

  3. Ruan Yifeng: a simple explanation of processes and threads (the comments are worth reading)

  4. A blog post on the differences between threads and processes

  5. Python Multiprocessing Tutorial: multiprocessing

  6. Python Threading Tutorial: threading