Python之多进程与多线程_宗而研之的博客-CSDN博客_python 多线程
基础
线程是一个基本的CPU执行单元。它必须依托于进程存活。一个线程是一个execution context(执行上下文),即一个CPU执行时所需要的一串指令。
进程是指一个程序在给定数据集合上的一次执行过程,是系统进行资源分配和运行调用的独立单位。可以简单地理解为操作系统中正在执行的程序。也就说,每个应用程序都有一个自己的进程。
每一个进程启动时都会最先产生一个线程,即主线程。然后主线程会再创建其他的子线程。
-
线程必须在某个进程中执行。
-
一个进程可包含多个线程,其中有且只有一个主线程。
-
多线程共享同个地址空间、打开的文件以及其他资源。
-
多进程共享物理内存、磁盘、打印机以及其他资源。
进程类
Python要进行多进程操作,需要用到muiltprocessing
库,其中的Process
类跟threading
模块的Thread
类很相似。所以直接看代码熟悉多进程。
直接使用Process:
from multiprocessing import Process
def show(name):
print("Process name is " + name)
if __name__ == "__main__":
proc = Process(target=show, args=('subprocess',))
proc.start()
proc.join()
继承Process重写类:
from multiprocessing import Process
import time
class MyProcess(Process):
def __init__(self, name):
super(MyProcess, self).__init__()
self.name = name
def run(self):
print('process name :' + str(self.name))
time.sleep(1)
if __name__ == '__main__':
for i in range(3):
p = MyProcess(i)
p.start()
for i in range(3):
p.join()
Attention!
-
和线程不同,进程并没有办法拿到返回值(并没有result方法),所以使用多进程的时候需要比较谨慎。
-
进程无法将target指定为一个闭包(嵌套在函数内的函数),所以在函数内创建子进程的时候需要谨慎!
多进程通信
进程之间不共享数据的。如果进程之间需要进行通信,则要用到Queue模块
或者Pipe模块
来实现。
Queue
Queue是多进程安全的队列,可以实现多进程之间的数据传递。它主要有两个函数put
和get
。
put() 用以插入数据到队列中,put还有两个可选参数:blocked 和timeout。如果blocked为 True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出 Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
get()可以从队列读取并且删除一个元素。同样get有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且 timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常。
from multiprocessing import Process, Queue
def put(queue):
queue.put('Queue 用法')
if __name__ == '__main__':
queue = Queue()
pro = Process(target=put, args=(queue,))
pro.start()
print(queue.get())
pro.join()
Pipe
Pipe的本质是进程之间的用管道数据传递,而不是数据共享,这和socket有点像。pipe() 返回两个连接对象分别表示管道的两端,每端都有send()和recv()函数。如果两个进程试图在同一时间的同一端进行读取和写入那么,这可能会损坏管道中的数据,具体用法如下:
from multiprocessing import Process, Pipe
def show(conn):
conn.send('Pipe 用法')
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
pro = Process(target=show, args=(child_conn,))
pro.start()
print(parent_conn.recv())
pro.join()
进程池
创建多个进程,我们不用傻傻地一个个去创建。我们可以使用Pool
模块来搞定。Pool 常用的方法如下:
Sample
# coding: utf-8
import multiprocessing
import time
def func(msg):
print("msg:", msg)
time.sleep(3)
print("end")
if __name__ == "__main__":
# 维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
pool = multiprocessing.Pool(processes=3)
for i in range(5):
msg = "hello %d" % (i)
# 非阻塞式,子进程不影响主进程的执行,会直接运行到 pool.join()
pool.apply_async(func, (msg,))
# 阻塞式,先执行完子进程,再执行主进程
# pool.apply(func, (msg, ))
print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
# 调用join之前,先调用close函数,否则会出错。
pool.close()
# 执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
pool.join()
print("Sub-process(es) done.")
"""
打印出来:
Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
msg: hello 0
msg: hello 1
msg: hello 2
end
msg: hello 3
end
msg: hello 4
end
end
end
Sub-process(es) done.
"""
-
如上,进程池Pool被创建出来后,即使实际需要创建的进程数远远大于进程池的最大上限,p.apply_async(test)代码依旧会不停的执行,并不会停下等待;相当于向进程池提交了10个请求,会被放到一个队列中;
-
当执行完p1 = Pool(5)这条代码后,5条进程已经被创建出来了,只是还没有为他们各自分配任务,也就是说,无论有多少任务,实际的进程数只有5条,计算机每次最多5条进程并行。
-
当Pool中有进程任务执行完毕后,这条进程资源会被释放,pool会按先进先出的原则取出一个新的请求给空闲的进程继续执行;
-
当Pool所有的进程任务完成后,会产生5个僵尸进程,如果主线程不结束,系统不会自动回收资源,需要调用join函数去回收。
-
join函数是主进程等待子进程结束回收系统资源的,如果没有join,主程序退出后不管子进程有没有结束都会被强制杀死;
-
创建Pool池时,如果不指定进程最大数量,默认创建的进程数为系统的内核数量
进程池的map方法
这种方法可谓是调用多进程最快捷的方法,甚至应该没有之一。
案例
import multiprocessing as mul
import numpy
def auto_mul_process(func, data_list, proc_num):
pool = mul.Pool(proc_num)
rel = pool.map(func, data_list)
pool.close()
pool.join()
return rel
def func(i):
print(i*2)
if __name__ == '__main__':
auto_mul_process(func, numpy.arange(4), 3)
"""
打印0,2,4,6
"""
但是有个缺陷是,子进程自己不能再创造进程池进行多进程任务了
import multiprocessing as mul
import numpy
def auto_mul_process(func, data_list, proc_num):
pool = mul.Pool(proc_num)
rel = pool.map(func, data_list)
pool.close()
pool.join()
return rel
def func(i):
if i < 5:
auto_mul_process(func, (i+5, i+6), 2)
else:
print(i)
if __name__ == '__main__':
auto_mul_process(func, numpy.arange(3), 3)
"""
报错,AssertionError: daemonic processes are not allowed to have children
"""
通过这个报错可以看出该进程池的map函数创造出来的进程应该是守护进程,因此触发了该报错。
[文章导入自 http://qzq-go.notion.site/d6873f9a72374bd1a673b0864ba52be9 访问原文获取高清图片]