更多优质内容
请关注公众号

Python多线程和多进程(一) GIL锁和使用Thread创建多线程-阿沛IT博客

正文内容

Python多线程和多进程(一) GIL锁和使用Thread创建多线程

栏目:Python 系列:Python多线程和多进程系列 发布时间:2020-04-06 23:40 浏览量:4468

Python中的GIL锁

GIL 全局解释器锁

python中一个线程对应于c语言中的一个线程。

GIL锁是“一个进程有且仅有一个的锁,该锁用于控制多线程同一时刻只能有一个线程使用CPU”

GIL使得同一时刻,只有一个线程运行在一个CPU上。这意味着一个进程中多个线程只能用到一个CPU而无法将多个线程映射到多个CPU上(即使你的电脑有多核CPU),所以这个进程中的多线程是并发的而不是并行的(即同一时刻只有一个线程在运行)。

当然,GIL的存在不意味着我们不需要进行线程间的同步,因为即使是单核计算机下线程的并发也会造成资源的竞争。

GIL锁作用的整个过程是这样的:
一个线程A想要使用CPU来执行程序就要先拿到进程中的GIL锁,拿到GIL锁的线程才能够执行,其他线程无法执行。
当A执行到时间片结束或者即使时间片没有结束但是A遇到阻塞(如IO操作,或者等待其他线程互斥锁的释放等情况),A就会释放GIL锁,让另一个线程B占有这个锁,这样线程B就可以占有CPU并运行。
通过上述方式,多个线程间轮流获取GIL锁和CPU并发的运行(可以理解为CPU是被GIL锁住的,如果线程B要使用CPU必须等线程A释放GIL锁才能使用CPU)。

因此,GIL锁的释放只需满足以下两个条件中的一个:
1.线程的时间片使用完毕(或者运行完一定行数的字节码)
2.线程遇到阻塞/等待的状态,此时即使时间片没有用完也会释放GIL锁

    
有些人会有一个误区,认为一个线程完全执行完才会释放GIL锁给其他线程执行。这样是错的,这样多线程就不是并发而是串行了。


==================================================

多线程编程

下面以爬虫作为例子,爬虫是一个很适合使用多线程完成的任务1. 通过Thread类+函数创建线程

# coding=utf-8

from threading import Thread
from time import sleep,time

def get_detail_url():
    print("开始爬取文章url")
    sleep(1)    # 通过sleep模拟爬取过程
    print("文章url爬取结束")

def get_detail_content():
    print("开始爬取文章详情")
    sleep(2)    # 通过sleep模拟爬取过程
    print("文章详情爬取结束")

if __name__ == "__main__":
    st = time()
    print("主线程负责计时")

    t1 = Thread(target=get_detail_url)
    t2 = Thread(target=get_detail_content)
    t1.start()
    t2.start()
    et = time()

    print("任务结束,耗时:%.2f" % (et-st))


结果如下:
主线程负责计时
开始爬取文章url
开始爬取文章详情
任务结束,耗时:0.00
文章url爬取结束
文章详情爬取结束

分析:上面的进程中有3个线程,主线程,t1线程和t2线程。由于3个线程间是并发执行,所以他们是同时执行的,主线程不会等待另外两个线程执行完才结束。所以,t1,t2线程没有执行完的时候,主线程就已经执行到print("任务结束,耗时:%.2f" % (et-st)),得到耗时为0.00
然后,主线程结束。

但是t1,t2线程没有执行完,所以进程没有结束,即这个脚本没有结束。进程要等最后一个线程执行完后才会结束。

因此,t2线程最后才执行完,t2执行完后,脚本才结束。所以
“文章url爬取结束”
“文章详情爬取结束”
可以得到输出

结论:主线程不会等待t1,t2线程结束才结束,但是整个进程会等待t1,t2线程结束才结束。(进程会等待进程内所有线程结束才结束)


新需求1:我希望t1,t2线程执行完才执行主线程的 print;
可以通过join方法,让主线程等待t1,t2线程的执行完才往下执行,这个过程会阻塞主线程。    

if __name__ == "__main__":
    st = time()
    print("主线程负责计时")

    t1 = Thread(target=get_detail_url)
    t2 = Thread(target=get_detail_content)
    t1.start()
    t2.start()
    
    t1.join()
    t2.join()   # 会等待t1,t2运行完
    
    et = time()

    print("任务结束,耗时:%.2f" % (et-st))


    
结果:
主线程负责计时
开始爬取文章url
开始爬取文章详情
文章url爬取结束
文章详情爬取结束
任务结束,耗时:2.00
    
PS:多个join之间不会阻塞,也就是说t1.join()执行完后,t2.join()可以立刻执行,但是t2.join()后的主线程代码会被阻塞。
需求2:我希望主线程结束时进程也结束(脚本结束),即使t1,t2线程没有执行完。
可以通过 setDaemon(True) 将t1,t2设为守护线程来实现

if __name__ == "__main__":
    st = time()
    print("主线程负责计时")

    t1 = Thread(target=get_detail_url)
    t2 = Thread(target=get_detail_content)
    t1.setDaemon(True)      # 设为守护线程
    t2.setDaemon(True)      # 设为守护线程
    t1.start()
    t2.start()
    et = time()

    print("任务结束,耗时:%.2f" % (et-st))
    


结果:
主线程负责计时
开始爬取文章url
开始爬取文章详情
任务结束,耗时:0.00

假如 将 t1.setDaemon(True) 注释掉,结果为:
主线程负责计时
开始爬取文章url
开始爬取文章详情
任务结束,耗时:0.00
文章url爬取结束

因为 t1不是守护线程,t2是守护线程,那么进程会等待t1结束才结束,但不会等待t2结束。

结论:进程会等待不是守护线程的线程结束就结束。而守护线程即使还没执行完也会结束掉。



2. 通过继承Thread类创建线程

# coding=utf-8

from threading import Thread
from time import sleep,time

class GetDetailUrl(Thread):
    def __init__(self,name):
        super(GetDetailUrl,self).__init__()
        self.name=name

    def run(self):
        print(self.name+" 开始爬取文章url")
        sleep(1)    # 通过sleep模拟爬取过程
        print("文章url爬取结束")

class GetDetailContent(Thread):
    def __init__(self,name):
        super(GetDetailContent,self).__init__()
        self.name=name

    def run(self):
        print(self.name+"开始爬取文章详情")
        sleep(2)    # 通过sleep模拟爬取过程
        print("文章详情爬取结束")

if __name__ == "__main__":
    st = time()
    print("主线程负责计时")

    t1 = GetDetailUrl(name="get_detail_url")
    t2 = GetDetailContent(name="get_detail_content")
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    et = time()

    print("任务结束,耗时:%.2f" % (et-st))

PS:使用继承Thread的方式要重写run()方法


==================================================

线程间通信 - 共享变量和队列Queue

如果,多线程之间要完成的任务是相互独立互不干扰的话,那么线程之间是不需要进行通信的,自己干自己的事就行。

但是如果多线程之间要进行合作,那么就必须要进行线程通信。

还是以上面的爬虫为例子。

get_detail_url 用于爬取列表页中的文章url
get_detail_content 用于爬取文章详情

那么 get_detail_content 必须获取到 get_detail_url 爬到的url才能对文章页进行爬取详情。这就存在合作关系了。


合作方式1:使用共享变量
我们知道,进程是资源分配的基本单位,系统会给进程分配内存空间等资源,但是不会为线程分配内存等资源。所以一个进程内的所有线程是共享进程中内存块的数据的,这就是共享变量。

    

# coding=utf-8

from threading import Thread
from time import sleep,time

urls = []   # 任务列表,用于存放要爬取的文章详情url
is_finished = False     

def get_detail_content(urls,name):
    global is_finished

    print("%s开始爬取文章内容" % name)
    while not is_finished or len(urls): # 如果生产者没有生产完或者生产者生产完了但消费者没有消费完就从urls取出url进行爬取
        try:
            url = urls.pop()
            sleep(0.001)    # 爬取1个内容页花0.001秒
            print("%s 文章:%s 爬取结束" % (name,url))
        except:     # 这里是为了防止消费者消费太快,当生产者还在生产但urls元素为0时,pop会报错。此时应该重新判断urls中是否有元素
            continue
    print("所有文章内容爬取结束")

def get_detail_url(urls):
    global is_finished

    print("开始爬取文章列表页")
    art_id = 10000  # 假设共有10000个文章url
    page_url_num = 100
    start_id = 1
    while start_id<10000:
        end_id = start_id+page_url_num
        for id in range(start_id,end_id): # 假设有100页列表页,每一页有100个url,共10000个url
            url = "http://www.zbpblog.com/blog/%d.html" % id
            urls.append(url)
        sleep(0.01)    # 爬一个列表页花0.01秒
        start_id = end_id

    is_finished=True
    print("文章列表页爬取结束")

if __name__ == "__main__":
    st = time()
    print("主线程负责计时")

    # 创建1个生产者线程
    producer = Thread(target=get_detail_url,args=(urls,))

    consumers = []
    for i in range(3):  # 创建3个消费者线程
        name="Thread %d" % (i+1)
        consumer = Thread(target=get_detail_content,args=(urls,name))
        consumers.append(consumer)


    producer.start()
    sleep(0.1)  # 睡0.1秒是为了让生产者先生产些链接到urls中

    for consumer in consumers:
        consumer.start()

    producer.join()
    for consumer in consumers:
        consumer.join()
    et = time()

    print("任务结束,耗时:%.2f" % (et-st))


    
上面的程序中有生产者 get_detail_url, 还有消费者 get_detail_content
生产者负责往 urls 中添加元素,消费者负责从urls取出元素并进行爬取内容,通过这种方式进行合作,合作的媒介就是urls这个共享变量。

共享变量有两个 
urls :存放产品的容器
is_finished :判断生产者是否生成完毕

上面生产者的生成速度是1个消费者消费产品速度的10倍。所以如果将消费者线程从3个跳到10个,可以提高效率,从16~18秒缩减到4~6秒。


虽然共享变量可以实现线程间通信,但是这种使用共享变量通信的方式是线程不安全的。原因很简单,生产者和3个消费者这4个线程共同竞争使用urls这一个资源,很可能会导致urls被改乱造成数据不一致的问题。结果就是,消费者可能重复消费同一个产品或者有些产品没能append到urls中。

所以一般共享变量要结合锁来保证线程竞争的使用这个资源变量时是安全有序的。



合作方式2:使用Queue队列

Queue相比于普通的list结构而言,Queue是线程安全的,而list不是线程安全的。原因是Queue内部使用了锁和条件变量来进行线程同步,但是list没有用到线程同步技术。
 

# coding=utf-8

from threading import Thread
from time import sleep,time
from queue import Queue

urls = Queue(500)   # 任务队列,用于存放要爬取的文章详情url,队列最多容纳500个任务

def get_detail_content(urls,name):
    print("%s开始爬取文章内容" % name)
    while True:
        url = urls.get()    # 当队列为空时,get会阻塞线程,线程进入休眠,直到队列有元素才会被唤醒
        sleep(0.001)    # 爬取1个url花0.001秒
        print("%s 文章:%s 爬取结束" % (name,url))

    print("所有文章内容爬取结束")

def get_detail_url(urls):
    print("开始爬取文章列表页")
    art_id = 10000  # 假设共有10000个文章url
    page_url_num = 100
    start_id = 1
    while start_id<10000:
        end_id = start_id+page_url_num
        for id in range(start_id,end_id): # 假设有100页列表页,每一页有100个url,共10000个url
            url = "http://www.zbpblog.com/blog/%d.html" % id
            urls.put(url)   # 如果队列满了,put会阻塞线程,线程进入休眠,直到队列有空间了,线程才被唤醒
        sleep(0.01)    # 爬一个列表页花0.01秒
        start_id = end_id

    print("文章列表页爬取结束")

if __name__ == "__main__":
    st = time()
    print("主线程负责计时")

    # 创建1个生产者线程
    producer = Thread(target=get_detail_url,args=(urls,))

    consumers = []
    for i in range(10):  # 创建10个消费者线程
        name="Thread %d" % (i+1)
        consumer = Thread(target=get_detail_content,args=(urls,name))
        consumers.append(consumer)


    producer.start()

    for consumer in consumers:
        consumer.start()

    producer.join()
    for consumer in consumers:
        consumer.join()
    et = time()

    print("任务结束,耗时:%.2f" % (et-st))


这个例子相比于之前的使用非线程安全的共享变量而言,queue是保证了线程安全的。

但是这个例子有一个小缺陷:当所有任务消费完之后,所有消费者线程都在执行urls.get()时进入阻塞。
这就导致主线程一直等待消费者线程结束。
所以整个进程无法结束,也不能打印出任务执行的时间。


我们可以稍微改进一下:
思路如下:
1.设置进程不等待消费者线程和生产者线程执行结束而结束,所以对消费者和生产者线程使用 setDaemon(True) 设置为守护线程
2.进程需要等待任务队列中的任务被执行完才结束,并且打印出任务耗时,这里可以使用 Queue的join()方法

如下:

# coding=utf-8

from threading import Thread
from time import sleep,time
from queue import Queue

urls = Queue(500)   
#is_finished = False

def get_detail_content(urls,name):
    print("%s开始爬取文章内容" % name)
    while True:
        url = urls.get()    
        sleep(0.001)    
        urls.task_done()        # 标记这一次取出来的url任务已经执行完
        print("%s 文章:%s 爬取结束" % (name,url))

    print("所有文章内容爬取结束")

def get_detail_url(urls):
    print("开始爬取文章列表页")
    art_id = 10000  # 假设共有10000个文章url
    page_url_num = 100
    start_id = 1
    while start_id<10000:
        end_id = start_id+page_url_num
        for id in range(start_id,end_id): 
            url = "http://www.zbpblog.com/blog/%d.html" % id
            urls.put(url)   
        sleep(0.01)    
        start_id = end_id

    print("文章列表页爬取结束")

if __name__ == "__main__":
    st = time()
    print("主线程负责计时")

    producer = Thread(target=get_detail_url,args=(urls,))
    producer.setDaemon(True)    # 设置为守护线程

    consumers = []
    for i in range(10):  
        name="Thread %d" % (i+1)
        consumer = Thread(target=get_detail_content,args=(urls,name))
        consumer.setDaemon(True)    # 设置为守护线程
        consumers.append(consumer)


    producer.start()

    for consumer in consumers:
        consumer.start()

    urls.join()     # 等待urls队列的任务被执行完才往下执行
   
    et = time()

    print("任务结束,耗时:%.2f" % (et-st))

注释的地方就是做了修改的地方。

PS: Queue的join()方法必须配合task_done()方法一起使用!

Queue的join方法的唤醒条件:
1当队列中所有任务被弹出,队列中元素为0
2.每个被弹出的任务都执行了task_done()来标记这个任务已被完成

两个条件缺一不可

上面程序过程如下: 
首先,所有线程开始运行,同时主线程被urls.join()阻塞
消费者不停消费产品,每消费完一个产品都会执行task_done()标记每个任务已完成
当消费者消费完所有任务,并且执行完所有任务,urls.join()会被唤醒,同时所有消费者线程会执行到 urls.get()被阻塞。但是由于进程不等待这些守护线程,所以主线程结束了,所有线程都直接结束。


上面这种使用Queue的join()方法只适用于生产者生产速度快于消费者消费速度。假如我将page_url_num = 100改为page_url_num = 5
生产速度从原本的0.01秒100条变为0.01秒5条,这个时候10个消费者线程会马上把队列中所有任务消费完并且每个任务都用task_done标记为已完成的任务,所以urls.join()被唤醒,然后主线程结束,生产者和消费者线程也结束,但是生产者的10000个任务还没有生产完呢!

为了解决这个问题,可以在生产者线程中做一个is_finished的全局变量标记,表示生产者是否生成完毕。如果没有生产完毕,即使消费者完成了现有队列的所有任务,也会重复调用join()

# coding=utf-8

from threading import Thread
from time import sleep,time
from queue import Queue

urls = Queue(500)
is_finished = False     ####################

def get_detail_content(urls,name):
    print("%s开始爬取文章内容" % name)
    while True:
        url = urls.get()
        sleep(0.001)
        urls.task_done()        # 标记这一次取出来的url任务已经执行完
        print("%s 文章:%s 爬取结束" % (name,url))

    print("所有文章内容爬取结束")

def get_detail_url(urls):
    global is_finished      ####################

    print("开始爬取文章列表页")
    art_id = 10000  # 假设共有10000个文章url
    page_url_num = 10
    start_id = 1
    while start_id<10000:
        end_id = start_id+page_url_num
        for id in range(start_id,end_id):
            url = "http://www.zbpblog.com/blog/%d.html" % id
            urls.put(url)
        sleep(0.01)
        start_id = end_id
        
    is_finished=True        ####################

    print("文章列表页爬取结束")

if __name__ == "__main__":
    st = time()
    print("主线程负责计时")

    producer = Thread(target=get_detail_url,args=(urls,))
    producer.setDaemon(True)    # 设置为守护线程

    consumers = []
    for i in range(10):
        name="Thread %d" % (i+1)
        consumer = Thread(target=get_detail_content,args=(urls,name))
        consumer.setDaemon(True)    # 设置为守护线程
        consumers.append(consumer)


    producer.start()

    for consumer in consumers:
        consumer.start()

    while not is_finished:      ######################
        urls.join()     # 等待urls队列的任务被执行完才往下执行

    et = time()

    print("任务结束,耗时:%.2f" % (et-st))


    
标记了 #################### 的地方是做出变更的地方
下面贴出Queue中 join和task_done 的源码:

class Queue:
    def __init__(self, maxsize=0):
        self.maxsize = maxsize
        self._init(maxsize)
        self.mutex = threading.Lock()
        self.not_empty = threading.Condition(self.mutex)
        self.not_full = threading.Condition(self.mutex)
        self.all_tasks_done = threading.Condition(self.mutex)
        self.unfinished_tasks = 0
     
    def task_done(self):
        with self.all_tasks_done:
            unfinished = self.unfinished_tasks - 1
            if unfinished <= 0:
                if unfinished < 0:
                    raise ValueError('task_done() called too many times')
                self.all_tasks_done.notify_all()
            self.unfinished_tasks = unfinished

    def join(self):
        with self.all_tasks_done:
            while self.unfinished_tasks:
                self.all_tasks_done.wait()

                
join是通过条件变量实现的。
当 self.unfinished_tasks 大于0时就会进入阻塞。而 unfinished_tasks 会在调用put() 给队列添加元素时进行+1 操作。在调用 task_done() 时进行-1操作并检测 unfinished_tasks 是否为0 ,如果为0就唤醒join中的条件变量。

代码段 小部件



更多内容请关注微信公众号
zbpblog微信公众号

如果您需要转载,可以点击下方按钮可以进行复制粘贴;本站博客文章为原创,请转载时注明以下信息

张柏沛IT技术博客 > Python多线程和多进程(一) GIL锁和使用Thread创建多线程

热门推荐
推荐新闻