Python多线程和多进程(四) 线程同步之信号量-张柏沛IT博客

正文内容

Python多线程和多进程(四) 线程同步之信号量

栏目:Python 发布时间:2020-04-11 13:37 浏览量:239

同步方式3:信号量 semaphore


信号量是用于控制并发线程数量的锁。

还是以爬虫为例子。你可能有这么个需求:列表页爬到很多的详情页,我想对每个详情页开一个线程来爬。
但是如果1秒能够爬到30个详情页url,所以0.5秒内能够获取15个详情页url,假设每个详情页url要花0.5秒爬完,那么进程要维持平均15个线程来爬取详情页,才能保持生产者和消费者的速度一直。

如果1秒能爬100个详情页url,进程就要维持平均50个线程来爬详情页内容。

所以,此时开多少个线程取决于1秒能爬几个详情页url,而不是由开发者决定的。

但是我们知道,线程数量不是越多越好,线程数多了,CPU切换线程损耗的时间就多了。

所以,我们希望能够自己控制线程的数量。

例如,某个线程1秒能爬100个详情页url也好1000个url也好,但我希望能维持10个线程来根据详情页url进行爬取,一个线程爬取一个详情页内容。一个线程在爬取完之后,这个线程就关闭(线程执行完会自己关闭,无需手动关闭),并开启新线程,但始终保持有10个线程在工作。

from threading import Semaphore,Thread
from time import sleep
from random import uniform
class GetDetailContent(Thread):
    def __init__(self,sem,detail_url):
        super(GetDetailContent,self).__init__()
        self.sem = sem
        self.url = detail_url

    def run(self):
        sleep(uniform(0,1))      # 用sleep模拟爬取,为了展现线程是结束一个就生成一个而不是10个10个生成的,这里设定爬取每个页面的时间是随机的
        print("%s 成功爬取页面 %s" % (self.name,self.url))

        # 爬取完成后,释放信号量,没释放1次,计数器就会-1;如果计数器从满的状态-1,就会唤醒acquire()
        self.sem.release()

class GetDetailUrl:
    def __init__(self,thread_num=10):
        self.sem = Semaphore(thread_num)    # 定义一个信号量对象,允许并发的线程数为10个


    def do_task(self):
        for page in range(10):     # 假设有10页列表页
            for id in range(100):   # 每页有100个url
                self.sem.acquire()      # 信号量执行一次acquire就会在self.sem的内部计数器中加1,当计数器达到允许并发的线程数时就会进入等待状态
                url = "http://www.zbpblog.com/blog-%d.html" % id
                t = GetDetailContent(self.sem,url)      # 对每个详情页url创建一个线程来爬取
                t.start()

            sleep(1)    # 1秒爬取1个列表页


if __name__=="__main__":
    crawler = GetDetailUrl()
    crawler.do_task()
    
    


结果是:针对一个url会生成一个线程来爬。线程个数维持在10个不变。

 

下面贴出 信号量的源码 

class Semaphore:

    def __init__(self, value=1):
        if value < 0:
            raise ValueError("semaphore initial value must be >= 0")
        self._cond = Condition(Lock())
        self._value = value

    def acquire(self, blocking=True, timeout=None):
        if not blocking and timeout is not None:
            raise ValueError("can't specify timeout for non-blocking acquire")
        rc = False
        endtime = None
        with self._cond:
            while self._value == 0:
                if not blocking:
                    break
                if timeout is not None:
                    if endtime is None:
                        endtime = _time() + timeout
                    else:
                        timeout = endtime - _time()
                        if timeout <= 0:
                            break
                self._cond.wait(timeout)
            else:
                self._value -= 1
                rc = True
        return rc


    def release(self):
        with self._cond:
            self._value += 1
            self._cond.notify()

            
            
信号量是用 条件变量+计数器实现的。

__init__()的_value记录了可继续开启线程的个数
每执行一次acquire(),计数器_value会-1。但_value为0时,acquire()会调用条件变量的wait进入休眠

当执行release()的时候,计数器_value会+1,并且notify唤醒wait()使得可以继续开启新线程。


semaphore 信号量不仅可以控制线程数量,还可以控制如mysql连接,网络连接这样的连接数。
 

如果您需要转载,可以点击下方按钮可以进行复制粘贴;本站博客文章为原创,请转载时注明以下信息

张柏沛IT技术博客 > Python多线程和多进程(四) 线程同步之信号量

热门推荐
推荐新闻