主要参考文献
threading --- 基于线程的并行 - Python中文版 - API参考文档 (apiref.com)
Python 多线程 | 菜鸟教程 (runoob.com)
多线程类似于同时执行多个不同程序,多线程运行有如下优点:
-
使用线程可以把占据长时间的程序中的任务放到后台去处理。
-
用户界面可以更加吸引人,这样比如用户点击了一个按钮去触发某些事件的处理,可以弹出一个进度条来显示处理的进度
-
程序的运行速度可能加快
-
在一些等待的任务实现上如用户输入、文件读写和网络收发数据等,线程就比较有用了。在这种情况下我们可以释放一些珍贵的资源如内存占用等等。
线程在执行过程中与进程还是有区别的。每个独立的进程有一个程序运行的入口、顺序执行序列和程序的出口。但是线程不能够独立执行,必须依存在应用程序中,由应用程序提供多个线程执行控制。
每个线程都有他自己的一组CPU寄存器,称为线程的上下文,该上下文反映了线程上次运行该线程的CPU寄存器的状态。
指令指针和堆栈指针寄存器是线程上下文中两个最重要的寄存器,线程总是在进程得到上下文中运行的,这些地址都用于标志拥有线程的进程地址空间中的内存。
-
线程可以被抢占(中断)。
-
在其他线程正在运行时,线程可以暂时搁置(也称为睡眠) -- 这就是线程的退让。
基础
当线程对象一但被创建,其活动一定会因调用线程的 start()
方法开始。这会在独立的控制线程调用 run()
方法。
一旦线程活动开始,该线程会被认为是 '存活的' 。当它的 run()
方法终结了(不管是正常的还是抛出未被处理的异常),就不是'存活的'。
其他线程可以调用一个线程的 join()
方法。这会阻塞调用该方法的线程,直到被调用 join()
方法的线程终结。
线程有名字。名字可以传递给构造函数,也可以通过 name
属性读取或者修改。
创建线程实例:
t1 = threading.Thread(target=run,args=('t1',))
t1.start()
Thread.__init__()
拥有的参数如下:
-
target 是用于
run()
方法调用的可调用对象。默认是None
,表示不需要调用任何方法。 -
name 是线程名称。默认情况下,由 "Thread-N" 格式构成一个唯一的名称,其中 N 是小的十进制数。
-
args 是用于调用目标函数的参数元组。默认是
()
。 -
kwargs 是用于调用目标函数的关键字参数字典。默认是
{}
。
常用的函数:
start()
开始线程活动。
它在一个线程里最多只能被调用一次。它安排对象的 run()
方法在一个独立的控制进程中调用。如果调用这个方法的次数大于一次,会抛出 RuntimeError
。
join(timeout=None)
等待,直到线程终结。这会阻塞调用这个方法的线程,直到被调用 join()
的线程终结 -- 不管是正常终结还是抛出未处理异常 -- 或者直到发生超时,超时选项是可选的,其单位是浮点秒数。
可以这么理解,join使得该线程和使用该方法的线程合并(join)在了一起,所以会一起结束。
如果不调用join函数,该线程不会被等待执行完成,一旦主线程完成了,该线程将会被无条件地关闭回收资源。因此一般来说join函数都会被调用。
is_alive()
返回线程是否存活。
当 run()
方法刚开始直到 run()
方法刚结束,这个方法返回 True
。模块函数 enumerate()
返回包含所有存活线程的列表。
Tips
1、python的多线程任务可以嵌套,也就是说,用一个线程调用了一个函数,该函数也可以再创造出多个不同的线程来执行不同的任务。
线程锁
详解!Python 中这 5 种最常用的线程锁,你会用了吗?CSDN博客
线程安全
线程安全是多线程或多进程编程中的一个概念,在拥有共享数据的多条线程并行执行的程序中,线程安全的代码会通过同步机制保证各个线程都可以正常且正确的执行,不会出现数据污染等意外情况。
线程安全的问题最主要还是由线程切换导致的,比如一个房间(进程)中有10颗糖(资源),除此g之外还有3个小人(1个主线程、2个子线程),当小人A吃了3颗糖后被系统强制进行休息时他认为还剩下7颗糖,而当小人B工作后又吃掉了3颗糖,那么当小人A重新上岗时会认为糖还剩下7颗,但是实际上只有4颗了。
上述例子中线程A和线程B的数据不同步,这就是线程安全问题,它可能导致非常严重的意外情况发生,我们按下面这个示例来进行说明。
import threading
num = 0
def add():
global num
for i in range(10_000_000):
num += 1
def sub():
global num
for i in range(10_000_000):
num -= 1
if __name__ == "__main__":
subThread01 = threading.Thread(target=add)
subThread02 = threading.Thread(target=sub)
subThread01.start()
subThread02.start()
subThread01.join()
subThread02.join()
print("num result : %s" % num)
# 结果三次采集
# num result : 669214
# num result : -1849179
# num result : -525674
五种不同的锁
-
同步锁:lock(一次只能放行一个)
-
递归锁:rlock(一次只能放行一个)
-
条件锁:condition(一次可以放行任意个)
-
事件锁:event(一次全部放行)
-
信号量锁:semaphore(一次可以放行特定个)
Lock() 同步锁
-
同步是指在互斥的基础上(大多数情况),通过其他机制实现访问者对资源的有序访问
-
同步其实已经实现了互斥,是互斥的一种更为复杂的实现,因为它在互斥的基础上实现了有序访问的特点
-
互斥指的是某一资源同一时刻仅能有一个访问者对其进行访问,具有唯一性和排他性,但是互斥无法限制访问者对资源的访问顺序,即访问是无序的
下面是threading模块与同步锁提供的相关方法:
threading.Lock()
返回一个同步锁对象
lockObject.acquire(blocking=True, timeout=1)
上锁,当一个线程在执行被上锁代码块时,将不允许切换到其他线程运行,默认锁失效时间为1秒
lockObject.release()
解锁,当一个线程在执行未被上锁代码块时,将允许系统根据策略自行切换到其他线程中运行
lockObject.locaked()
判断该锁对象是否处于上锁状态,返回一个布尔值
同步锁一次只能放行一个线程,一个被加锁的线程在运行时不会将执行权交出去,只有当该线程被解锁时才会将执行权通过系统调度交由其他线程。
import threading
num = 0
def add():
lock.acquire()
global num
for i in range(10_000_000):
num += 1
lock.release()
def sub():
lock.acquire()
global num
for i in range(10_000_000):
num -= 1
lock.release()
if __name__ == "__main__":
lock = threading.Lock()
subThread01 = threading.Thread(target=add)
subThread02 = threading.Thread(target=sub)
subThread01.start()
subThread02.start()
subThread01.join()
subThread02.join()
print("num result : %s" % num)
# num result : 0
对于同步锁来说,一次acquire()必须对应一次release(),不能出现连续重复使用多次acquire()后再重复使用多次release()的操作,这样会引起死锁造成程序的阻塞,完全不动了
由于threading.Lock()对象中实现了__enter__()与__exit__()方法,故我们可以使用with语句进行上下文管理形式的加锁解锁操作
def add():
with lock:
# 自动加锁
global num
for i in range(10_000_000):
num += 1
# 自动解锁
RLock() 递归锁
递归锁是同步锁的一个升级版本,在同步锁的基础上可以做到连续重复使用多次acquire()后再重复使用多次release()的操作,但是一定要注意加锁次数和解锁次数必须一致,否则也将引发死锁现象。
由于threading.RLock()对象中实现了__enter__()与__exit__()方法,故我们可以使用with语句进行上下文管理形式的加锁解锁操作。
说明省略
Condition() 条件锁
条件锁是在递归锁的基础上增加了能够暂停线程运行的功能。并且我们可以使用wait()与notify()来控制线程执行的个数。
threading.Condition()
返回一个条件锁对象
lockObject.acquire(blocking=True, timeout=1)
上锁,当一个线程在执行被上锁代码块时,将不允许切换到其他线程运行,默认锁失效时间为1秒
lockObject.release()
解锁,当一个线程在执行未被上锁代码块时,将允许系统根据策略自行切换到其他线程中运行
lockObject.wait(timeout=None)
将当前线程设置为“等待”状态,只有该线程接到“通知”或者超时时间到期之后才会继续运行,在“等待”状态下的线程将允许系统根据策略自行切换到其他线程中运行
lockObject.wait_for(predicate, timeout=None)
将当前线程设置为“等待”状态,只有该线程的predicate返回一个True或者超时时间到期之后才会继续运行,在“等待”状态下的线程将允许系统根据策略自行切换到其他线程中运行。注意:predicate参数应当传入一个可调用对象,且返回结果为bool类型。
lockObject.notify(n=1)
通知一个当前状态为“等待”的线程继续运行,也可以通过参数n通知多个
lockObject.notify_all()
通知所有当前状态为“等待”的线程继续运行
import threading
currentRunThreadNumber = 0
maxSubThreadNumber = 10
def task():
global currentRunThreadNumber
thName = threading.currentThread().name
condLock.acquire() # 上锁
print("start and wait run thread : %s" % thName)
condLock.wait() # 暂停线程运行、等待唤醒
currentRunThreadNumber += 1
print("carry on run thread : %s" % thName)
condLock.release() # 解锁
if __name__ == "__main__":
condLock = threading.Condition()
for i in range(maxSubThreadNumber):
subThreadIns = threading.Thread(target=task)
subThreadIns.start()
while currentRunThreadNumber < maxSubThreadNumber:
notifyNumber = int(
input("Please enter the number of threads that need to be notified to run:"))
condLock.acquire()
condLock.notify(notifyNumber) # 放行
condLock.release()
print("main thread run end")
# 先启动10个子线程,然后这些子线程会全部变为等待状态
# start and wait run thread : Thread-1
# start and wait run thread : Thread-2
# start and wait run thread : Thread-3
# start and wait run thread : Thread-4
# start and wait run thread : Thread-5
# start and wait run thread : Thread-6
# start and wait run thread : Thread-7
# start and wait run thread : Thread-8
# start and wait run thread : Thread-9
# start and wait run thread : Thread-10
# 批量发送通知,放行特定数量的子线程继续运行
# Please enter the number of threads that need to be notified to run:5 # 放行5个
# carry on run thread : Thread-4
# carry on run thread : Thread-3
# carry on run thread : Thread-1
# carry on run thread : Thread-2
# carry on run thread : Thread-5
# Please enter the number of threads that need to be notified to run:5 # 放行5个
# carry on run thread : Thread-8
# carry on run thread : Thread-10
# carry on run thread : Thread-6
# carry on run thread : Thread-9
# carry on run thread : Thread-7
# Please enter the number of threads that need to be notified to run:1
# main thread run end
由于threading.Condition()对象中实现了__enter__()与__exit__()方法,故我们可以使用with语句进行上下文管理形式的加锁解锁操作。
Event() 事件锁
事件锁是基于条件锁来做的,它与条件锁的区别在于一次只能放行全部,不能放行任意个数量的子线程继续运行。
我们可以将事件锁看为红绿灯,当红灯时所有子线程都暂停运行,并进入“等待”状态,当绿灯时所有子线程都恢复“运行”。
事件锁不能利用with语句来进行使用,只能按照常规方式。
Semaphore() 信号量锁
信号量锁也是根据条件锁来做的,它与条件锁和事件锁的区别如下:
-
条件锁:一次可以放行任意个处于“等待”状态的线程
-
事件锁:一次可以放行全部的处于“等待”状态的线程
-
信号量锁:通过规定,成批的放行特定个处于“上锁”状态的线程
ThreadPool
Intro
线程池的基类是 concurrent.futures 模块中的 Executor,Executor 提供了两个子类,即ThreadPoolExecutor 和 ProcessPoolExecutor,其中 ThreadPoolExecutor
用于创建线程池,而 ProcessPoolExecutor 用于创建进程池。
Exectuor
submit(fn, *args, **kwargs)
将 fn 函数提交给线程池。程序将 task 函数提交(submit)给线程池后,submit 方法会返回一个 Future 对象,Future类主要用于获取线程任务函数的返回值。由于线程任务会在新线程中以异步方式执行,因此,线程执行的函数相当于一个“将来完成”的任务,所以Python 使用 Future 来代表。
-
args
代表传给 fn 函数的参数, -
*kwargs
代表以关键字参数的形式为 fn 函数传入参数。
map(func, *iterables, timeout=None,chunksize=1)
该函数类似于全局函数 map(func, *iterables)
,只是该函数将会启动多个线程,以异步方式立即对iterables 执行 map 处理。
shutdown(wait=True)
:关闭线程池。
Future
提供了如下方法
cancel()
:取消该 Future 代表的线程任务。如果该任务正在执行,不可取消,则该方法返回False;否则,程序会取消该任务,并返回 True。
cancelled()
:返回 Future 代表的线程任务是否被成功取消。
running()
:如果该 Future 代表的线程任务正在执行、不可被取消,该方法返回 True。
done()
:如果该 Funture 代表的线程任务被成功取消或执行完成,则该方法返回 True。
result(timeout=None)
:获取该 Future 代表的线程任务最后返回的结果。如果 Future代表的线程任务还未完成,该方法将会阻塞当前线程,其中 timeout 参数指定最多阻塞多少秒。
exception(timeout=None)
:获取该 Future代表的线程任务所引发的异常。如果该任务成功完成,没有异常,则该方法返回None。
add_done_callback(fn)
:为该 Future 代表的线程任务注册一个“回调函数”,当该任务成功完成时,程序会自动触发该fn 函数。
Samples
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
import time
# 定义一个准备作为线程任务的函数
def action(_max):
my_sum = 0
for i in range(_max):
print(threading.current_thread().name + str(i))
my_sum += i
return my_sum
def get_result(future):
print(future.result())
# 创建一个包含两个线程的线程池
with ThreadPoolExecutor(max_workers=2) as pool:
# 向线程池中提交一个任务,50 会作为 action 函数的参数
future1 = pool.submit(action, 5)
# 向线程池中再提交一个任务
future2 = pool.submit(action, 10)
# print(future2.result())
# 为future1添加线程完成的回调函数
future1.add_done_callback(get_result)
print('--------')
上面的代码为future1添加了回调函数,会在线程任务结束时获取其返回值
主程序最后一行代码打印了条横线。由于程序并未直接调用future1,future2的result方法,所以主线程并不会被阻塞,能够立刻看到打印出的横线。换言之,如果需要调用future的result方法,则主线程会被阻塞,会等到线程池完全运行结束后再打印横线。
接下来将会看到两个新线程并发执行,当线程任务执行完成后,getresult() 函数被触发,输出线程任务的返回值。
另外,由于线程池实现了上下文管理协议 (ontext Manage Protocol) ,因此,程序可以使用with 语句来管理线程池,这样即可避免手动关闭线程池,如上面的程序所示。
[文章导入自 http://qzq-go.notion.site/1615b6de2db743318dc8b306d243c37e 访问原文获取高清图片]