更多优质内容
请关注公众号

Python 多线程+多进程+任务队列爬取文章-张柏沛IT博客

正文内容

Python 多线程+多进程+任务队列爬取文章

栏目:Python 系列: 发布时间:2020-03-09 00:18 浏览量:4605

上一章介绍了python的多线程和多进程的简单知识,这一章通过python多进程和多线程写一个爬取投诉文章的爬虫。

 

爬取内容如下:

投诉详情页的多个字段

每个详情页对应的投诉平台

每个详情页对应的图片

平台详情的多个字段

平台的缩略图

 

爬取方式:

采用多进程+多线程+redis任务队列的方式

开启5个子进程分别负责5个任务:爬取投诉列表页 ,爬取投诉详情页,爬取投诉平台,爬取投诉和平台的图片,数据入库

5个子进程并行执行

每一个子进程开启10个线程并发执行(由于只是作为测试python多进程和多线程的程序,而不是有目的的恶意爬取,所以只开了10个线程)

通过redis的list结构的任务队列作为各个子进程之间的通信工具

 

使用到的redis数据结构:

 

爬取流程:

 

数据表设计:

# 投诉表 consult
+-------------+------------------+------+-----+---------+----------------+
| Field       | Type             | Null | Key | Default | Extra          |
+-------------+------------------+------+-----+---------+----------------+
| id          | int(10) unsigned | NO   | PRI | NULL    | auto_increment |
| user_id     | int(10) unsigned | YES  | MUL | NULL    |                |
| name        | varchar(30)      | YES  |     | NULL    |                |
| thumb       | varchar(255)     | YES  |     | NULL    |                |
| phone       | varchar(20)      | YES  |     | NULL    |                |
| email       | varchar(100)     | YES  |     | NULL    |                |
| qq          | varchar(20)      | YES  |     | NULL    |                |
| cp_type     | varchar(255)     | YES  |     | NULL    |                |
| appeal      | varchar(255)     | YES  |     | NULL    |                |
| platform    | varchar(255)     | YES  | MUL | NULL    |                |
| title       | varchar(255)     | YES  |     | NULL    |                |
| content     | mediumtext       | YES  |     | NULL    |                |
| create_time | int(11)          | YES  |     | NULL    |                |
| advice      | text             | YES  |     | NULL    |                |
| status      | tinyint(4)       | YES  |     | 0       |                |
| platform_id | int(11)          | YES  |     | 0       |                |
| view_time   | int(11)          | YES  |     | 0       |                |
| order       | int(11)          | YES  |     | 0       |                |
| is_crawl    | tinyint(4)       | YES  |     | 0       |                |
+-------------+------------------+------+-----+---------+----------------+


# 平台表 platform
+------------+------------------+------+-----+---------+----------------+
| Field      | Type             | Null | Key | Default | Extra          |
+------------+------------------+------+-----+---------+----------------+
| id         | int(10) unsigned | NO   | PRI | NULL    | auto_increment |
| name       | varchar(255)     | YES  | UNI | NULL    |                |
| is_partner | tinyint(4)       | YES  |     | 0       |                |
| thumb      | varchar(255)     | YES  |     |         |                |
| mark       | tinyint(4)       | YES  |     | 0       |                |
+------------+------------------+------+-----+---------+----------------+


# 投诉关联的图片表  picture
+------------+------------------+------+-----+---------+----------------+
| Field      | Type             | Null | Key | Default | Extra          |
+------------+------------------+------+-----+---------+----------------+
| id         | int(10) unsigned | NO   | PRI | NULL    | auto_increment |
| name       | varchar(60)      | YES  | UNI | NULL    |                |
| compressed | tinyint(4)       | YES  |     | 0       |                |
| consult_id | int(11)          | YES  | MUL | NULL    |                |
| part       | tinyint(4)       | YES  |     | 0       |                |
+------------+------------------+------+-----+---------+----------------+


# 记录爬取过的投诉详情连接  crawled_url
+------------+------------------+------+-----+---------+----------------+
| Field      | Type             | Null | Key | Default | Extra          |
+------------+------------------+------+-----+---------+----------------+
| id         | int(10) unsigned | NO   | PRI | NULL    | auto_increment |
| consult_id | int(10) unsigned | YES  | UNI | NULL    |                |
| url        | varchar(255)     | YES  | UNI | NULL    |                |
+------------+------------------+------+-----+---------+----------------+

 

代码实现:

目录结构

crawl_modules目录

列表页爬取类  CrawlList.py

# coding=utf-8

import requests,redis
from fake_useragent import UserAgent
from threading import Thread
from redis_conf import redis_conf


class CrawlList(Thread):
    base_url = "https://tousu.sina.com.cn"
    list_url = "https://tousu.sina.com.cn/api/index/feed?type=1&page_size=%s&page=%s"
    ua = UserAgent()
    redis = None
    page_size = 0
    max_page = 0

    def __init__(self, now_page):
        super(CrawlList, self).__init__()
        self.now_page = now_page

    @classmethod
    def get_header(cls):
        headers = {
            # "User-Agent": cls.ua.random
        }

        return headers

    @classmethod
    def start_thread(cls, page_size=10, max_page=10):  # 创建多线程并开启并发执行多线程
        cls.page_size = page_size
        cls.max_page = max_page
        cls.redis = redis.Redis(**redis_conf)  # 连接redis

        thread_list = []  # 线程池
        for page in range(1, cls.max_page + 1):
            thread_list.append(cls(page))
            if len(thread_list) >= 10:
                for thread in thread_list:
                    thread.start()

                for thread in thread_list:
                    thread.join()
                thread_list = []

        if len(thread_list):
            for thread in thread_list:
                thread.start()

            for thread in thread_list:
                thread.join()

    def run(self):
        print("开始任务:" + str(self.now_page))
        rd = self.__class__.redis

        list_url = self.__class__.list_url % (self.page_size, self.now_page)

        # 请求
        r = requests.get(list_url, headers=self.__class__.get_header())
        res = r.json()
        detail_items = res.get("result", {}).get("data", {}).get("lists", {})


        for detail_item in detail_items:
            detail_url = detail_item.get("main", {}).get("url", {})
            if detail_url.find("http") == -1:
                detail_url = "https:" + detail_url
            else:
                detail_url = self.base_url + detail_url

            # 将detail_url放入redis的集合和列表,其中集合是为了去重,列表是为了做消息队列
            if not rd.sismember("crawled_links:consult", detail_url):
                print(detail_url)
                rd.lpush("detail_links", detail_url)
                rd.sadd("crawled_links:consult", detail_url)        # 只要将detail_url放入任务队列中,就算是爬取到详情页了;如果爬取失败再将detail_url从去重集合中删除也不迟

if __name__ == "__main__":
    CrawlList.start_thread(page_size=10, max_page=10)

 

 

详情页爬取类  CrawlDetail.py  

# coding=utf-8

import requests, bs4, time
import redis
from fake_useragent import UserAgent
from threading import Thread
from redis_conf import redis_conf

class CrawlDetail(Thread):
    base_url = "https://tousu.sina.com.cn"

    ua = UserAgent()
    redis = None

    def __init__(self, detail_link):
        super(CrawlDetail, self).__init__()
        self.url = detail_link

        # 建立连接池
        # self.dbpool = PooledDB(pymysql,maxconnections=15,blocking=True,**mysql_conf)

    @classmethod
    def get_header(cls):
        headers = {
            # "User-Agent": cls.ua.random
        }

        return headers

    @classmethod
    def start_thread(cls):
        cls.redis = redis.Redis(**redis_conf)

        # 从消息队列中取出分批取出所有详情页
        thread_list=[]  #线程池
        while True:
            detail_link = cls.redis.brpop("detail_links",1)     # 如果1s内没有任务则返回None,并继续监听任务
            print(detail_link)

            if not detail_link:
                if len(thread_list):
                    for thread in thread_list:
                        thread.start()
                    for thread in thread_list:
                        thread.join()

                    thread_list = []
                continue

            thread_list.append(cls(detail_link[1]))
            if len(thread_list)>=10:
                for thread in thread_list:
                    thread.start()
                for thread in thread_list:
                    thread.join()

                thread_list=[]


    def run(self):
        try:
            r = requests.get(self.url,headers=self.__class__.get_header())
            soup = bs4.BeautifulSoup(r.text,"html.parser")
            fields={}
            fields['title']=soup.find("h1",class_="article").get_text()
            ts_li = soup.find("ul",class_="ts-q-list").find_all_next("li")

            ts_cont = soup.find_all("div", class_="ts-d-item")[-1].find("div", class_="ts-reply")
            fields['platform']=ts_li[1].find("a").get_text()
            fields['cp_type'] = ts_li[2].get_text()
            fields['appeal'] = ts_li[3].get_text()
            fields['content'] = ts_cont.find_all("p")[-1].get_text()
            fields['is_crawl']="1"
            fields['name']=soup.find("span",class_="u-name").get_text()
            fields['create_time'] = str(int(time.time()))
            fields['url'] = self.url

            fields["id"]=str(self.redis.incr("consult_id"))   # 生成自增的consult_id

            fields['platform_url'] = "https://" + ts_li[1].find("a")['href'].strip(r"/")
            fields['imgs'] = [ fields["id"]+"|https://" + a["href"].strip(r"/") for a in ts_cont.find_all("a", class_="example-image-link")]

            self.insert_redis(fields)

            print(self.url)
            print(fields)
        except BaseException as e:
            # 有两种选择:

            # 1. 如果有报错,重新将detail_url压入任务队列 detail_links
            # self.redis.lpush("detail_links",self.url)

            # 2. 如果有报错,将detail_url从crawled_links:consult删除
            self .redis.srem("crawled_links:consult",self.url)

            print("错误链接:"+self.url)
            print(e)

    # 将文章详情写入hash,将hash的key写入mysql入库的任务队列mysql_line;
    # 将图片写入爬虫任务队列 img_line;
    # 将平台页写入爬虫任务队列 platform_line
    def insert_redis(self,data):
        rd = self.__class__.redis

        imgs = data.pop("imgs")
        if len(imgs):
            rd.lpush("img_line",*imgs)   #将图片写入爬虫任务队列 img_line

        platform = data.pop("platform_url")
        if not rd.sismember("crawled_links:platform",platform):
            rd.lpush("platform_line",data["id"]+"|"+platform)    #将平台页写入爬虫任务队列 platform_line
            rd.sadd("crawled_links:platform",platform)     # 将平台链接写入到集合,防止重复爬取

        hash_key ="detail:"+str(data['id'])
        rd.hmset(hash_key,data)  # 将文章详情写入hash
        rd.lpush("mysql_line",hash_key)  # 详情页写入mysql入库的任务队列detail_mysql_line;


if __name__ == "__main__":
    CrawlDetail.start_thread()

 

 

平台爬取类  CrawlPlatform.py

import redis,requests
from bs4 import BeautifulSoup
from threading import Thread
from redis_conf import redis_conf
from fake_useragent import UserAgent

class CrawlPlatform(Thread):
    base_url = "https://tousu.sina.com.cn"
    ua = UserAgent()

    def __init__(self,platform_link):
        super(CrawlPlatform,self).__init__()
        platform = platform_link.split("|")
        self.consult_id = platform[0]
        self.platform_url = platform[1]

    @classmethod
    def get_header(cls):
        return {
            # "User-Agent":cls.ua.random
        }

    @classmethod
    def start_thread(cls):
        cls.redis = redis.Redis(**redis_conf)
        thread_list=[]

        # 使用阻塞队列弹出任务
        while True:
            platform_link = cls.redis.brpop("platform_line",1)  # 如果1秒内任务队列都没有任务就会返回None,但是不会结束脚本
            print(platform_link)
            if not platform_link:
                if len(thread_list):
                    for thread in thread_list:
                        thread.start()
                    for thread in thread_list:
                        thread.join()

                    thread_list=[]
                continue

            thread_list.append(cls(platform_link[1]))
            if len(thread_list)>=10:
                for thread in thread_list:
                    thread.start()
                for thread in thread_list:
                    thread.join()

                thread_list = []



    def run(self):
            if not self.redis.sismember("platform_links",self.platform_url):
                try:
                    # 爬取平台
                    fields={}
                    r = requests.get(self.platform_url,headers=self.__class__.get_header())
                    soup = BeautifulSoup(r.text,"html.parser")
                    thumb_src= "https://"+soup.find("div",class_="m-u-img").find("img")['src'].strip("/")
                    fields['name'] = soup.find("span",class_="name").get_text()
                    fields['consult_id']=self.consult_id

                    # 生成platform唯一id
                    fields['id'] = str(self.redis.incr("platform_id"))

                    # 将平台的thumb_src写入爬虫任务队列 img_line;
                    self.redis.lpush("img_line","pf|"+fields['id']+"|"+thumb_src)

                    # 将平台写入hash和mysql任务队列
                    hash_key = "platform:" + str(fields['id'])
                    self.redis.hmset(hash_key, fields)
                    self.redis.lpush("mysql_line", hash_key)

                except BaseException as e:
                    self.redis.srem("crawled_links:platform",self.platform_url)
                    print("错误链接:" + self.platform_url)
                    print(e)



if __name__=="__main__":
    CrawlPlatform.start_thread()

 

 

 

图片爬取类  CrawlImg.py

# coding=utf-8

import requests,redis,os
from threading import Thread
from dl_img import dl_img
from fake_useragent import UserAgent
from redis_conf import redis_conf

class CrawlImg(Thread):
    ua = UserAgent()

    def __init__(self,img):
        super(CrawlImg,self).__init__()
        self.img = img

    @classmethod
    def get_header(cls):
        return {
            # "User-Agent": cls.ua.random
        }

    @classmethod
    def start_thread(cls):
        # cls.dir_path = dir_path
        cls.redis = redis.Redis(**redis_conf)

        thread_list=[]
        while True:
            img_src = cls.redis.brpop("img_line",1)
            print(img_src)
            if not img_src:
                if len(thread_list):
                    for thread in thread_list:
                        thread.start()

                    for thread in thread_list:
                        thread.join()

                    thread_list=[]
                continue

            thread_list.append(cls(img_src[1]))
            if len(thread_list)>=10:
                for thread in thread_list:
                    thread.start()

                for thread in thread_list:
                    thread.join()

                thread_list = []


    def run(self):
        img_list = self.img.split("|")

        if img_list[0]=="pf":
            img_url = img_list[-1]
            platform_id = img_list[1]
            dir_path = "../uploads/platformImg"
            img_path = dl_img(img_url, dir_path)
            if img_path:
                self.redis.lpush("mysql_line", "pfimg|" + platform_id + "|" + img_path)
        else:
            img_url = img_list[-1]
            consult_id = img_list[0]
            dir_path = "../uploads/consultImg"
            img_path = dl_img(img_url, dir_path)
            if img_path:
                self.redis.lpush("mysql_line", "img|" + consult_id + "|" + img_path)


if __name__=="__main__":
    os.chdir("..")
    CrawlImg.start_thread()

 

 

图片下载模块  dl_img.py

import requests,hashlib,os
from fake_useragent import UserAgent

def dl_img(pic_url,dir_path):
    ua = UserAgent()
    headers = {"User-Agent":ua.random}
    r = requests.get(pic_url,headers=headers)
    if r.status_code==200:
        dir_path = dir_path.replace("\\","/").strip("/")
        real_dir_path = os.path.abspath(dir_path).replace("\\","/")
        m=hashlib.md5()
        m.update(pic_url.encode())
        fn = m.hexdigest()
        fn = fn[8:24]+".jpg"
        real_img_path = real_dir_path+"/"+fn
        with open(real_img_path,"wb") as f:
            f.write(r.content)

        return dir_path.strip(r".")+"/"+fn
    else:
        return False

if __name__ == "__main__":
    print(dl_img("https://s3.tousu.sina.com.cn/products/202003/2264dbaf4f8ad20614458e82b6327e61.jpg?&ssig=qESUn9ST80&KID=sina,tousu&Expires=1583323106","../uploads/platform"))

下载的图片以原图片文件名进行md5()加密并截取中间部分为命名。

 

 

数据入库   InsertDb.py

# coding=utf-8

import pymysql,redis
from DBUtils.PooledDB import PooledDB
from threading import Thread
from mysql_conf import mysql_conf
from redis_conf import redis_conf

class InsertDb(Thread):
    def __init__(self,task):
        super(InsertDb, self).__init__()
        self.task=task

    @classmethod
    def start_thread(cls):
        cls.redis=redis.Redis(**redis_conf)

        # 创建连接池
        cls.pool = PooledDB(pymysql,maxconnections=25,blocking=True,**mysql_conf)

        thread_list=[]
        while True:
            task = cls.redis.brpop("mysql_line",1)
            # task = cls.redis.brpoplpush("mysql_line","mysql_line",1)
            print(task)
            if not task:
                if len(thread_list):
                    for thread in thread_list:
                        thread.start()
                    for thread in thread_list:
                        thread.join()

                    thread_list=[]

                continue

            thread_list.append(cls(task[1]))
            if len(thread_list)>=20:
                for thread in thread_list:
                    thread.start()
                for thread in thread_list:
                    thread.join()

                thread_list=[]

    def run(self):
        # 取出连接池中的连接
        conn = self.pool.connection()
        cursor = conn.cursor()

        try:    # 这里使用了事务,但是这里的事务只能使mysql回滚,保证redis任务和mysql数据一致,如果mysql数据写入或更新失败,redis任务要保留
            if self.task.startswith("detail"):
                sqls = self.insertDetail()
            elif self.task.startswith("platform"):
                sqls = self.insertPlatform()
            elif self.task.startswith("img"):
                sqls = self.insertImg()
            elif self.task.startswith("pfimg"):
                sqls = self.insertPfImg()

            for sql in sqls:
                cursor.execute(*sql)
            conn.commit()

            # 如果插入数据库成功,则删除platform和detail的hash;插入失败不会删除
            self.redis.delete(self.task)
        except BaseException as e:
            # 如果出错则回滚,并将任务压入到失败队列中
            self.redis.lpush("mysql_fail_line",self.task)
            conn.rollback()
            print("语句")
            print(sqls)
            print(self.task)
            print(e)
        finally:
            cursor.close()
            conn.close()    # 无论插入成功还是失败,都要将连接放回连接池

    def insertDetail(self):
        data = self.redis.hgetall(self.task)
        if not data :
            return []
        url = data.pop("url")
        crawled_url = {"url":url,"consult_id":data['id']}
        sql = self.createSql(data,"consult")
        sql2 = self.createSql(crawled_url,"crawled_url")

        return [(sql,data),(sql2,crawled_url)]

    def insertPlatform(self):
        data = self.redis.hgetall(self.task)
        if not data :
            return []
        consult_id = data.pop("consult_id")
        sql1 = self.createSql(data,"platform")
        sql2 = "update consult set platform_id='%s' where id=%s" % (data['id'],consult_id)

        return [(sql1,data),(sql2,)]

    def insertImg(self):
        data_list = self.task.split("|")
        data = {"name":data_list[-1],"consult_id":data_list[1]}
        sql = self.createSql(data,"picture")
        return [(sql,data)]

    def insertPfImg(self):
        data_list = self.task.split("|")
        sql = "update platform set thumb='%s' where id=%s" % (data_list[-1],data_list[1])
        return [(sql,)]

    def createSql(self,data,table):
        data_keys = data.keys()
        field_names = ",".join(data_keys)
        field_vals = ",".join([r"%(" + k + ")s" for k in data_keys])

        sql="insert ignore into %s (%s) values (%s)" % (table,field_names,field_vals)


        return sql

if __name__=="__main__":
    InsertDb.start_thread()

 

 

redis_conf.py

redis_conf = {
    "host":"127.0.0.1",
    "port":6379,
    "decode_responses":True     # 对redis的键值自动解码,否则从redis获取的键和值是未经解码的二进制
}

 

 

 总调度程序  crawl.py

# coding=utf-8

import os,sys
sys.path.append(os.path.abspath("./crawl_modules"))


from crawl_modules.CrawlDetail import CrawlDetail
from crawl_modules.CrawlList import CrawlList
from crawl_modules.CrawlPlatform import CrawlPlatform
from crawl_modules.CrawlImg import CrawlImg
from crawl_modules.InsertDb import InsertDb
from multiprocessing import Process

if __name__ == "__main__":      # 使用多进程时,多进程的代码必须放在 __name__=="__main__"中,否则会报错
    print("我是主进程:进程ID是%s,父进程ID是%s" % (os.getpid(),os.getppid()))
    process_targets = [CrawlList,CrawlDetail,CrawlPlatform,CrawlImg,InsertDb]

    # 创建5个进程,每一个进程创建多线程,通过redis任务队列进行通信
    processes=[]
    for process_target in process_targets:
        processes.append(Process(target=process_target.start_thread))
    for process in processes:
        process.start()
    for process in processes:
        process.join()

    for process in processes:
        process.terminate()     # 当所有进程执行结束时,回收子进程占用的内存

 

最后注明:本程序只作为测试python多线程多进程的测试程序,爬取到的数据在测试结束后自动删除不做任何商业或非商业用途




更多内容请关注微信公众号
zbpblog微信公众号

如果您需要转载,可以点击下方按钮可以进行复制粘贴;本站博客文章为原创,请转载时注明以下信息

张柏沛IT技术博客 > Python 多线程+多进程+任务队列爬取文章

热门推荐
推荐新闻