同步方式3:信号量 semaphore
信号量是用于控制并发线程数量的锁。
还是以爬虫为例子。你可能有这么个需求:列表页爬到很多的详情页,我想对每个详情页开一个线程来爬。
但是如果1秒能够爬到30个详情页url,所以0.5秒内能够获取15个详情页url,假设每个详情页url要花0.5秒爬完,那么进程要维持平均15个线程来爬取详情页,才能保持生产者和消费者的速度一直。
如果1秒能爬100个详情页url,进程就要维持平均50个线程来爬详情页内容。
所以,此时开多少个线程取决于1秒能爬几个详情页url,而不是由开发者决定的。
但是我们知道,线程数量不是越多越好,线程数多了,CPU切换线程损耗的时间就多了。
所以,我们希望能够自己控制线程的数量。
例如,某个线程1秒能爬100个详情页url也好1000个url也好,但我希望能维持10个线程来根据详情页url进行爬取,一个线程爬取一个详情页内容。一个线程在爬取完之后,这个线程就关闭(线程执行完会自己关闭,无需手动关闭),并开启新线程,但始终保持有10个线程在工作。
from threading import Semaphore,Thread
from time import sleep
from random import uniform
class GetDetailContent(Thread):
def __init__(self,sem,detail_url):
super(GetDetailContent,self).__init__()
self.sem = sem
self.url = detail_url
def run(self):
sleep(uniform(0,1)) # 用sleep模拟爬取,为了展现线程是结束一个就生成一个而不是10个10个生成的,这里设定爬取每个页面的时间是随机的
print("%s 成功爬取页面 %s" % (self.name,self.url))
# 爬取完成后,释放信号量,没释放1次,计数器就会-1;如果计数器从满的状态-1,就会唤醒acquire()
self.sem.release()
class GetDetailUrl:
def __init__(self,thread_num=10):
self.sem = Semaphore(thread_num) # 定义一个信号量对象,允许并发的线程数为10个
def do_task(self):
for page in range(10): # 假设有10页列表页
for id in range(100): # 每页有100个url
self.sem.acquire() # 信号量执行一次acquire就会在self.sem的内部计数器中加1,当计数器达到允许并发的线程数时就会进入等待状态
url = "http://www.zbpblog.com/blog-%d.html" % id
t = GetDetailContent(self.sem,url) # 对每个详情页url创建一个线程来爬取
t.start()
sleep(1) # 1秒爬取1个列表页
if __name__=="__main__":
crawler = GetDetailUrl()
crawler.do_task()
结果是:针对一个url会生成一个线程来爬。线程个数维持在10个不变。
下面贴出 信号量的源码
class Semaphore:
def __init__(self, value=1):
if value < 0:
raise ValueError("semaphore initial value must be >= 0")
self._cond = Condition(Lock())
self._value = value
def acquire(self, blocking=True, timeout=None):
if not blocking and timeout is not None:
raise ValueError("can't specify timeout for non-blocking acquire")
rc = False
endtime = None
with self._cond:
while self._value == 0:
if not blocking:
break
if timeout is not None:
if endtime is None:
endtime = _time() + timeout
else:
timeout = endtime - _time()
if timeout <= 0:
break
self._cond.wait(timeout)
else:
self._value -= 1
rc = True
return rc
def release(self):
with self._cond:
self._value += 1
self._cond.notify()
信号量是用 条件变量+计数器实现的。
__init__()的_value记录了可继续开启线程的个数
每执行一次acquire(),计数器_value会-1。但_value为0时,acquire()会调用条件变量的wait进入休眠
当执行release()的时候,计数器_value会+1,并且notify唤醒wait()使得可以继续开启新线程。
semaphore 信号量不仅可以控制线程数量,还可以控制如mysql连接,网络连接这样的连接数。