同步方式2:条件变量
首先,条件变量必须要配合互斥锁使用,因为条件变量是一种多线程竞争的共享资源。
通过条件变量可以实现等待和通知的机制。
最基本的使用方式为:
cond = Condition() # 创建一个条件变量
cond.acquire() # 给条件变量上锁
cond.wait() # 等待,会阻塞下面的代码执行,当其他线程调用notify的时候才会被唤醒
do_something()
cond.notify() # 通知和唤醒其他使用了条件变量cond的线程
cond.release()
其中:
cond.acquire() 其实就是用一个互斥锁上锁,相当于 lock.acquire()
cond.wait() 会干三件事:1.会释放互斥锁,让其他使用了相同条件变量的线程可以拿到锁来运行。2.进入阻塞休眠状态,让出CPU。3.当本线程的wait()被其他线程notify唤醒后,wait()会重新拿到锁。
cond.notify() 通知其他线程,唤醒其他线程的wait()。
cond.release() 释放互斥锁
wait()和release()必须在锁内执行。如果不获取锁就直接调用wait()和release()就会报错
PS:使用
with cond:
if my_condition:
cond.wait()
do_something()
cond.notify()
和
cond.acquire()
if my_condition:
cond.wait()
do_something()
cond.notify()
cond.release()
是一样的。
使用with就相当于在代码外面包了一层锁。
例子1:控制多线程有序执行
有两个人A,B进行对话,分别说1~6这几个数字,要求A先说,A说完B才能说,B说完A才能再说
A:1
B:2
A:3
B:4
A:5
B:6
from threading import Condition,Thread
cond = Condition()
a_say = [1,3,5]
b_say = [2,4,6]
class A(Thread):
def __init__(self,cond,say):
super(A,self).__init__(name="A")
self.cond = cond
self.say = say
def run(self):
self.cond.acquire()
for i in range(len(self.say)):
print("%s say %d" % (self.name,self.say.pop(0)))
self.cond.notify() # A说完就要通知B,让B开始说
if len(self.say):
self.cond.wait() # A说完就不能在说,而是等待B说完,等B通知到A,A才能继续说
self.cond.release()
class B(Thread):
def __init__(self,cond,say):
super(B,self).__init__(name="B")
self.cond = cond
self.say = say
def run(self):
self.cond.acquire()
for i in range(len(self.say)):
self.cond.wait() # 一开始是A先说而不是B先说,所以一开始B是处于等待状态
print("%s say %d" % (self.name,self.say.pop(0)))
self.cond.notify() # B说完就要通知A,让A继续说
self.cond.release()
if __name__=="__main__":
a = A(cond,a_say)
b = B(cond,b_say)
b.start() # 必须让b线程先启动,a后启动,如果a先启动,那么a会在b没有执行wait()的情况下执行notify(),所以这个notify()通知相当于无效。之后a执行wait().b也执行wait()双方都处于等待,于是这个进程就卡住
a.start()
这里使用了一个条件变量来实现
当然,我们也可以使用多个互斥锁来实现多线程的有序执行,前面互斥锁的例子中已经展示。
例子2:用互斥锁,条件变量和列表实现一个线程安全的队列:
# coding=utf-8
from threading import Thread
from threading import Lock,Condition
import random
class ThreadSafeQueue:
def __init__(self,max_size=0,blocking=True,timeout=None): # 默认队列没有限制最大空间
self.max_size=max_size
self.blocking=blocking # 默认等待阻塞
self.timeout=timeout # 默认等待时间无限长
self.lock = Lock()
self.cond = Condition(lock=self.lock) # 这个条件变量所使用的锁是自定义的互斥锁,而不使用Condition内部定义的重入锁
self.queue = []
def size(self): # self.queue是线程共享资源,所有关于self.queue的使用都要加锁,包括查和改
self.lock.acquire()
size = len(self.queue)
self.lock.release()
return size
def batch_push(self,items):
if not isinstance(items,list):
items=list(items)
for item in items:
self.push(item)
def push(self,item):
self.cond.acquire()
while self.max_size>0 and len(self.queue)>=self.max_size:
if self.blocking:
res = self.cond.wait(timeout=self.timeout) # 如果超过timeout还没被唤醒,则返回False
if not res:
self.cond.release()
return False
else:
self.cond.release()
return False
self.queue.append(item)
self.cond.notify()
self.cond.release()
return True
def pop(self):
self.cond.acquire()
while len(self.queue)<=0:
if self.blocking:
res=self.cond.wait(timeout=self.timeout)
if not res:
self.cond.release()
return False
else:
self.cond.release()
return False
item = self.queue.pop()
self.cond.notify() # 通知生产者可以继续生产
self.cond.release()
return item
def get(self,index):
self.lock.acquire()
try:
item = self.queue[index]
except:
item=None
self.lock.release()
return item
# 生产者
def produce(q,n):
for i in range(100000):
q.push(i)
print("Thread %d push %d" % (n,i))
def consumer(q,n):
count_none = 0 # 如果q.pop()阻塞次数大于10则停止while循环
while True:
item = q.pop()
if item is False:
count_none+=1
else:
count_none=0
print("Thread %d pop %d" % (n,item))
if count_none>=10:
break
# 测试
if __name__=="__main__":
queue = ThreadSafeQueue(1000) # 测试阻塞队列,结果是,消费者消费完所有产品后阻塞等待新产品生产,一直处于等待状态
# queue = ThreadSafeQueue(1000,timeout=1) # 测试阻塞队列,结果是,消费者消费完所有产品后阻塞等待新产品生产,阻塞10次后自动跳出循环
# queue = ThreadSafeQueue(1000,blocking=False) # 测试非阻塞队列,结果是,生产者由于多次被阻塞而放弃了很多次生产产品,消费者消费完所有产品后直接结束
# 创建两个生产者线程,一个消费者线程,使得生产产品的速度比消费产品的速度快,这样消费产品不会等待,而生产产品会等待
t1 = Thread(target=produce,args=(queue,1))
t2 = Thread(target=produce,args=(queue,2))
t3 = Thread(target=consumer,args=(queue,3))
t1.start()
t2.start()
t3.start()
t1.join()
t2.join()
t3.join()
下面说一下Condition的底层是怎么实现的:
1.实例化 Condition 的时候,Condition的__init__会生成一个 RLock 重入锁,这个锁用于保护条件变量对象的使用。我们叫这把锁为 R
2.执行wait()前必须执行cond.acquire()对条件变量上锁,上的锁就是 R;
执行wait() 的时候,wait()做了这么几件事:
2-1. wait()会创建一个互斥锁,我们把这个互斥锁叫做X,并对X调用acquire()上锁: X.acquire(),之后将锁X放到一个双向队列 Q 中。
2-2. wait()释放锁 R,这样其他线程才能获取锁R并执行一些任务代码
2-3. wait()在释放 R之后,会再对X上一次锁,X.acquire() ; 由于连续对X上两次锁,所以会发生死锁,这样wait()进入阻塞状态。
所以 条件变量的wait() 是通过死锁的方式来实现阻塞等待的功能的!!
2-4.wait()的锁X被其他线程的notify()释放后,会重新对R上锁,锁X就再也不会被使用。下一次调用wait()的时候会生成一把新的X锁
3.其他线程获取到锁R,并执行一些任务代码,之后执行notify()唤醒之前那个线程的wait()
notify()做了这么几件事:
3-1. 从队列 Q 头部弹出锁 X,释放锁X。通过释放锁X实现唤醒wait()的。之后这个锁X就永远不会被用到了
总结: Condition的实现用了两把锁:
__init__()时创建的重入锁R 和 wait()是创建的互斥锁X
R用于保护条件变量和一些共享变量的线程安全
X不用于线程安全,而是用于制造死锁达成阻塞效果
R会重复使用,X是一次性使用,每次会生成新的X
下面贴出 Condition中__init__,wait()和notify()的源码:
class Condition:
def __init__(self, lock=None):
if lock is None:
lock = RLock() # 创建一个重入锁 R。如果手动传入 lock 则使用用户传入的lock。
self._lock = lock
self.acquire = lock.acquire
self.release = lock.release
try:
self._release_save = lock._release_save
except AttributeError:
pass
try:
self._acquire_restore = lock._acquire_restore
except AttributeError:
pass
try:
self._is_owned = lock._is_owned
except AttributeError:
pass
self._waiters = _deque()
def wait(self, timeout=None):
if not self._is_owned():
raise RuntimeError("cannot wait on un-acquired lock")
waiter = _allocate_lock() #### 互斥锁 X ####
waiter.acquire() #### 对 X 上锁 ####
self._waiters.append(waiter) #### 将 X 添加到队列 Q ####
saved_state = self._release_save() #### 释放锁 R ####
gotit = False
try:
if timeout is None:
waiter.acquire() #### 对互斥锁 X 第二次上锁,达成死锁,实现了阻塞 ####
gotit = True
else:
if timeout > 0:
gotit = waiter.acquire(True, timeout)
else:
gotit = waiter.acquire(False)
return gotit
finally:
self._acquire_restore(saved_state) #### X被释放后,对R重新上锁 ####
if not gotit:
try:
self._waiters.remove(waiter)
except ValueError:
pass
def notify(self, n=1):
if not self._is_owned():
raise RuntimeError("cannot notify on un-acquired lock")
all_waiters = self._waiters
waiters_to_notify = _deque(_islice(all_waiters, n))
if not waiters_to_notify:
return
for waiter in waiters_to_notify:
waiter.release() #### 释放锁 X ####
try:
all_waiters.remove(waiter) #### 将锁X从队列Q中弹出 ####
except ValueError:
pass