Python多线程和多进程(三)  线程同步之条件变量-张柏沛IT博客

正文内容

Python多线程和多进程(三) 线程同步之条件变量

栏目:Python 发布时间:2020-04-11 13:19 浏览量:358


同步方式2:条件变量

首先,条件变量必须要配合互斥锁使用,因为条件变量是一种多线程竞争的共享资源。
通过条件变量可以实现等待和通知的机制。

最基本的使用方式为:

cond = Condition()  # 创建一个条件变量
cond.acquire()      # 给条件变量上锁

cond.wait()         # 等待,会阻塞下面的代码执行,当其他线程调用notify的时候才会被唤醒
do_something()      
cond.notify()       # 通知和唤醒其他使用了条件变量cond的线程
cond.release()

 

其中:
cond.acquire()  其实就是用一个互斥锁上锁,相当于 lock.acquire()
cond.wait()     会干三件事:1.会释放互斥锁,让其他使用了相同条件变量的线程可以拿到锁来运行。2.进入阻塞休眠状态,让出CPU。3.当本线程的wait()被其他线程notify唤醒后,wait()会重新拿到锁。
cond.notify()    通知其他线程,唤醒其他线程的wait()。
cond.release()  释放互斥锁

wait()和release()必须在锁内执行。如果不获取锁就直接调用wait()和release()就会报错


PS:使用 
with cond:
    if my_condition:
        cond.wait()
    do_something()
    cond.notify()
    
和 
cond.acquire()
if my_condition:
    cond.wait()
do_something()
cond.notify()
cond.release()

是一样的。

使用with就相当于在代码外面包了一层锁。


例子1:控制多线程有序执行
有两个人A,B进行对话,分别说1~6这几个数字,要求A先说,A说完B才能说,B说完A才能再说
A:1
B:2
A:3
B:4
A:5
B:6

from threading import Condition,Thread

cond = Condition()
a_say = [1,3,5]
b_say = [2,4,6]

class A(Thread):
    def __init__(self,cond,say):
        super(A,self).__init__(name="A")
        self.cond = cond
        self.say = say

    def run(self):
        self.cond.acquire()
        for i in range(len(self.say)):
            print("%s say %d" % (self.name,self.say.pop(0)))
            self.cond.notify()  # A说完就要通知B,让B开始说

            if len(self.say):
                self.cond.wait()    # A说完就不能在说,而是等待B说完,等B通知到A,A才能继续说

        self.cond.release()


class B(Thread):
    def __init__(self,cond,say):
        super(B,self).__init__(name="B")
        self.cond = cond
        self.say = say

    def run(self):
        self.cond.acquire()
        for i in range(len(self.say)):
            self.cond.wait()    # 一开始是A先说而不是B先说,所以一开始B是处于等待状态
            print("%s say %d" % (self.name,self.say.pop(0)))
            self.cond.notify()  # B说完就要通知A,让A继续说

        self.cond.release()

if __name__=="__main__":
    a = A(cond,a_say)
    b = B(cond,b_say)

    b.start()       # 必须让b线程先启动,a后启动,如果a先启动,那么a会在b没有执行wait()的情况下执行notify(),所以这个notify()通知相当于无效。之后a执行wait().b也执行wait()双方都处于等待,于是这个进程就卡住
    a.start()


    
    

这里使用了一个条件变量来实现


当然,我们也可以使用多个互斥锁来实现多线程的有序执行,前面互斥锁的例子中已经展示。

 


例子2:用互斥锁,条件变量和列表实现一个线程安全的队列:
 

# coding=utf-8

from threading import Thread
from threading import Lock,Condition
import random

class ThreadSafeQueue:
    def __init__(self,max_size=0,blocking=True,timeout=None):  # 默认队列没有限制最大空间
        self.max_size=max_size
        self.blocking=blocking  # 默认等待阻塞
        self.timeout=timeout    # 默认等待时间无限长
        self.lock = Lock()
        self.cond = Condition(lock=self.lock)  # 这个条件变量所使用的锁是自定义的互斥锁,而不使用Condition内部定义的重入锁
        self.queue = []

    def size(self):     # self.queue是线程共享资源,所有关于self.queue的使用都要加锁,包括查和改
        self.lock.acquire()
        size = len(self.queue)
        self.lock.release()

        return size

    def batch_push(self,items):
        if not isinstance(items,list):
            items=list(items)
        for item in items:
            self.push(item)

    def push(self,item):
        self.cond.acquire()
        while self.max_size>0 and len(self.queue)>=self.max_size:
            if self.blocking:
                res = self.cond.wait(timeout=self.timeout)    # 如果超过timeout还没被唤醒,则返回False
                
                if not res:
                    self.cond.release()
                    return False
            else:
                self.cond.release()
                return False

        self.queue.append(item)
        self.cond.notify()
        self.cond.release()


        return True

    def pop(self):
        self.cond.acquire()
        while len(self.queue)<=0:
            if self.blocking:
                res=self.cond.wait(timeout=self.timeout)
                
                if not res:
                    self.cond.release()
                    return False
            else:
                self.cond.release()
                return False

        item = self.queue.pop()
        self.cond.notify()          # 通知生产者可以继续生产
        self.cond.release()

        return item

    def get(self,index):
        self.lock.acquire()
        try:
            item = self.queue[index]
        except:
            item=None
        self.lock.release()

        return item

# 生产者
def produce(q,n):
    for i in range(100000):
        q.push(i)
        print("Thread %d push %d" % (n,i))

def consumer(q,n):
    count_none = 0  # 如果q.pop()阻塞次数大于10则停止while循环
    while True:
        item = q.pop()
        if item is False:
            count_none+=1
        else:
            count_none=0
            print("Thread %d pop %d" % (n,item))

        if count_none>=10:
            break


# 测试
if __name__=="__main__":
    queue = ThreadSafeQueue(1000)       # 测试阻塞队列,结果是,消费者消费完所有产品后阻塞等待新产品生产,一直处于等待状态
    # queue = ThreadSafeQueue(1000,timeout=1)       # 测试阻塞队列,结果是,消费者消费完所有产品后阻塞等待新产品生产,阻塞10次后自动跳出循环
    # queue = ThreadSafeQueue(1000,blocking=False)    # 测试非阻塞队列,结果是,生产者由于多次被阻塞而放弃了很多次生产产品,消费者消费完所有产品后直接结束

    # 创建两个生产者线程,一个消费者线程,使得生产产品的速度比消费产品的速度快,这样消费产品不会等待,而生产产品会等待
    t1 = Thread(target=produce,args=(queue,1))
    t2 = Thread(target=produce,args=(queue,2))
    t3 = Thread(target=consumer,args=(queue,3))
    t1.start()
    t2.start()
    t3.start()
    t1.join()
    t2.join()
    t3.join()
    
 

  
    
    


下面说一下Condition的底层是怎么实现的:
1.实例化 Condition 的时候,Condition的__init__会生成一个 RLock 重入锁,这个锁用于保护条件变量对象的使用。我们叫这把锁为 R

2.执行wait()前必须执行cond.acquire()对条件变量上锁,上的锁就是 R;
  执行wait() 的时候,wait()做了这么几件事:
  2-1. wait()会创建一个互斥锁,我们把这个互斥锁叫做X,并对X调用acquire()上锁: X.acquire(),之后将锁X放到一个双向队列 Q 中。
  2-2. wait()释放锁 R,这样其他线程才能获取锁R并执行一些任务代码
  2-3. wait()在释放 R之后,会再对X上一次锁,X.acquire() ; 由于连续对X上两次锁,所以会发生死锁,这样wait()进入阻塞状态。
  所以 条件变量的wait() 是通过死锁的方式来实现阻塞等待的功能的!!
  
  2-4.wait()的锁X被其他线程的notify()释放后,会重新对R上锁,锁X就再也不会被使用。下一次调用wait()的时候会生成一把新的X锁
  
3.其他线程获取到锁R,并执行一些任务代码,之后执行notify()唤醒之前那个线程的wait()
  notify()做了这么几件事:
  3-1. 从队列 Q 头部弹出锁 X,释放锁X。通过释放锁X实现唤醒wait()的。之后这个锁X就永远不会被用到了
  
  
总结: Condition的实现用了两把锁:
__init__()时创建的重入锁R 和 wait()是创建的互斥锁X
R用于保护条件变量和一些共享变量的线程安全
X不用于线程安全,而是用于制造死锁达成阻塞效果

R会重复使用,X是一次性使用,每次会生成新的X


下面贴出 Condition中__init__,wait()和notify()的源码:
 

class Condition:
    

    def __init__(self, lock=None):
        if lock is None:
            lock = RLock()      # 创建一个重入锁 R。如果手动传入 lock 则使用用户传入的lock。
        self._lock = lock

        self.acquire = lock.acquire
        self.release = lock.release
        
        try:
            self._release_save = lock._release_save
        except AttributeError:
            pass
        try:
            self._acquire_restore = lock._acquire_restore
        except AttributeError:
            pass
        try:
            self._is_owned = lock._is_owned
        except AttributeError:
            pass
        self._waiters = _deque()

    

    def wait(self, timeout=None):
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        waiter = _allocate_lock()       #### 互斥锁 X ####
        waiter.acquire()                #### 对 X 上锁 ####
        self._waiters.append(waiter)    #### 将 X 添加到队列 Q ####
        saved_state = self._release_save()      #### 释放锁 R ####
        gotit = False
        try:    
            if timeout is None:
                waiter.acquire()        #### 对互斥锁 X 第二次上锁,达成死锁,实现了阻塞 ####
                gotit = True
            else:
                if timeout > 0:
                    gotit = waiter.acquire(True, timeout)
                else:
                    gotit = waiter.acquire(False)
            return gotit
        finally:
            self._acquire_restore(saved_state)      #### X被释放后,对R重新上锁 ####
            if not gotit:
                try:
                    self._waiters.remove(waiter)
                except ValueError:
                    pass

    
    def notify(self, n=1):
        if not self._is_owned():
            raise RuntimeError("cannot notify on un-acquired lock")
        all_waiters = self._waiters
        waiters_to_notify = _deque(_islice(all_waiters, n))
        if not waiters_to_notify:
            return
        for waiter in waiters_to_notify:
            waiter.release()        #### 释放锁 X ####
            try:
                all_waiters.remove(waiter)      #### 将锁X从队列Q中弹出 ####
            except ValueError:
                pass

    


    

如果您需要转载,可以点击下方按钮可以进行复制粘贴;本站博客文章为原创,请转载时注明以下信息

张柏沛IT技术博客 > Python多线程和多进程(三) 线程同步之条件变量

热门推荐
推荐新闻