Multi-threaded crawling of cnblogs news
The cnblogs news listing is paginated at addresses like https://news.cnblogs.com/n/page/10/ . The goal is to crawl the news titles and links in batches with multiple threads. In a URL such as https://news.cnblogs.com/n/page/2/ the only part that changes is the trailing number, which is the page number.
import requests
from bs4 import BeautifulSoup
from queue import Queue
from concurrent.futures import ThreadPoolExecutor

BASE_URL = 'http://news.cnblogs.com'
NEWS_PAGE = '/n/page/'
# https://news.cnblogs.com/n/page/2/  listing page
# https://news.cnblogs.com/n/628919/  detail page
headers = {
    'User-Agent': "Mozilla/5.0 (Windows; U; Windows NT 6.1; zh-CN) AppleWebKit/537.36 (KHTML, like Gecko)"
                  " Version/5.0.1 Safari/537.36"
}

# Asynchronous in-process queue; to be swapped for a third-party queue later
urls = Queue()

# Build the cnblogs news listing URLs; each page holds 30 news items
def starts_url(start, stop, step=1):
    for i in range(start, stop + 1, step):
        url = "{}{}{}/".format(BASE_URL, NEWS_PAGE, i)
        print(url)
        urls.put(url)  # enqueue
    print('All task URLs created')

# Fetch one page
def crawler():
    url = urls.get()  # blocks until one URL is available
    with requests.get(url, headers=headers) as response:
        html = response.text
        # parse the listing page
        soup = BeautifulSoup(html, 'lxml')
        # CSS selector: h2.news_entry > a
        # XPath equivalent: //h2[@class="news_entry"]/a
        titles = soup.select('h2.news_entry > a')
        for title in titles:
            print(title.get('href'), title.text)

# starts_url(1, 1)
# crawler()

# Thread pool
executor = ThreadPoolExecutor(10)
executor.submit(starts_url, 1, 1)
for i in range(5):
    executor.submit(crawler)
Parsing the content is relatively time-consuming and should not be done synchronously inside crawler; use another queue to decouple fetching from parsing.
Right now every thread takes one item, finishes, and exits. Change the workers so that they keep pulling data from the queues in a loop.
import requests
from bs4 import BeautifulSoup
from queue import Queue
from concurrent.futures import ThreadPoolExecutor
from threading import Event

BASE_URL = 'http://news.cnblogs.com'
NEWS_PAGE = '/n/page/'
# https://news.cnblogs.com/n/page/2/  listing page
# https://news.cnblogs.com/n/628919/  detail page
headers = {
    'User-Agent': "Mozilla/5.0 (Windows; U; Windows NT 6.1; zh-CN) AppleWebKit/537.36 (KHTML, like Gecko)"
                  " Version/5.0.1 Safari/537.36"
}

# Asynchronous in-process queues; to be swapped for a third-party queue later
urls = Queue()     # URLs waiting to be fetched
htmls = Queue()    # HTML waiting to be parsed
outputs = Queue()  # results waiting to be persisted

# Build the cnblogs news listing URLs; each page holds 30 news items
def starts_url(start, stop, step=1):
    for i in range(start, stop + 1, step):
        url = "{}{}{}/".format(BASE_URL, NEWS_PAGE, i)
        print(url)
        urls.put(url)  # enqueue
    print('All task URLs created')

# Fetch pages in a loop until the event is set
def crawler(e: Event):
    while not e.is_set():
        url = urls.get()  # blocks until one URL is available
        with requests.get(url, headers=headers) as response:
            html = response.text
            htmls.put(html)

# Parse pages in a loop until the event is set
def parse(e: Event):
    while not e.is_set():
        html = htmls.get()
        soup = BeautifulSoup(html, 'lxml')
        # CSS selector: h2.news_entry > a
        # XPath equivalent: //h2[@class="news_entry"]/a
        titles = soup.select('h2.news_entry > a')
        for title in titles:
            href = BASE_URL + title.get('href', '')
            txt = title.text
            val = href, txt
            outputs.put(val)
            print(val)

event = Event()
# Thread pool
executor = ThreadPoolExecutor(10)
executor.submit(starts_url, 1, 1)
for i in range(5):
    executor.submit(crawler, event)
for i in range(5):
    executor.submit(parse, event)
After the HTML analysis function parse has extracted its results, they need to be persisted. Do not persist inside parse directly; put the results into a queue and let a single place handle persistence.
import requests
from bs4 import BeautifulSoup
from queue import Queue
from concurrent.futures import ThreadPoolExecutor
from threading import Event

BASE_URL = 'http://news.cnblogs.com'
NEWS_PAGE = '/n/page/'
# https://news.cnblogs.com/n/page/2/  listing page
# https://news.cnblogs.com/n/628919/  detail page
headers = {
    'User-Agent': "Mozilla/5.0 (Windows; U; Windows NT 6.1; zh-CN) AppleWebKit/537.36 (KHTML, like Gecko)"
                  " Version/5.0.1 Safari/537.36"
}

# Asynchronous in-process queues; to be swapped for a third-party queue later
urls = Queue()     # URLs waiting to be fetched
htmls = Queue()    # HTML waiting to be parsed
outputs = Queue()  # results waiting to be persisted

# Build the cnblogs news listing URLs; each page holds 30 news items
def starts_url(start, stop, step=1):
    for i in range(start, stop + 1, step):
        url = "{}{}{}/".format(BASE_URL, NEWS_PAGE, i)
        print(url)
        urls.put(url)  # enqueue
    print('All task URLs created')

# Fetch pages in a loop until the event is set
def crawler(e: Event):
    while not e.is_set():
        url = urls.get()  # blocks until one URL is available
        with requests.get(url, headers=headers) as response:
            html = response.text
            htmls.put(html)

# Parse pages in a loop until the event is set
def parse(e: Event):
    while not e.is_set():
        html = htmls.get()
        soup = BeautifulSoup(html, 'lxml')
        # CSS selector: h2.news_entry > a
        # XPath equivalent: //h2[@class="news_entry"]/a
        titles = soup.select('h2.news_entry > a')
        for title in titles:
            href = BASE_URL + title.get('href', '')
            txt = title.text
            val = href, txt
            outputs.put(val)
            # print(val)

# Persist results to a file
def persist(path, e: Event):
    with open(path, 'a+', encoding='utf-8') as f:
        while not e.is_set():
            val = outputs.get()
            print(val)
            f.write("{}\x01{}\n".format(val[0], val[1]))
            f.flush()

event = Event()
# Thread pool
executor = ThreadPoolExecutor(10)
executor.submit(starts_url, 1, 1)
executor.submit(persist, 'd:/news.txt', event)
for i in range(5):
    executor.submit(crawler, event)
for i in range(4):
    executor.submit(parse, event)
With that, a practical parallel crawler is basically complete. In general, if newly extracted URLs keep being fed into the to-fetch queue, the crawler can run without interruption.
It can also be extended easily, for example into a multi-process version; a rough sketch of that direction follows.
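A minimal sketch of the multi-process variant, using multiprocessing.Process and multiprocessing.Queue; crawler_proc and HEADERS are illustrative names, and only the fetch stage is shown:

import requests
from multiprocessing import Process, Queue, Event

HEADERS = {'User-Agent': 'Mozilla/5.0'}  # placeholder UA for the sketch

def crawler_proc(urls, htmls, e):
    # Same job as crawler() above, but the queues are multiprocessing.Queue
    # objects passed in explicitly: queue.Queue cannot be shared across processes.
    while not e.is_set():
        url = urls.get()
        with requests.get(url, headers=HEADERS) as response:
            htmls.put(response.text)

if __name__ == '__main__':
    urls, htmls = Queue(), Queue()
    event = Event()
    urls.put('http://news.cnblogs.com/n/page/1/')
    workers = [Process(target=crawler_proc, args=(urls, htmls, event)) for _ in range(5)]
    for w in workers:
        w.start()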
Going further (message queues)
Replace the in-process queues with a third-party service; here the widely used RabbitMQ is chosen.
Design choices
1. Choosing the queue working mode
- Take the crawler's htmls queue as an example: it is written to by several producers (the fetch functions) and read by several consumers (the parse functions), and each message may be consumed only once. That is exactly RabbitMQ's work-queue pattern, so that is the mode to use; a sketch follows below.

In RabbitMQ, exchanges and queues can be declared on either side, producer or consumer.
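A minimal sketch of the work-queue pattern using pika (pika itself and the 'localhost' connection are assumptions; the queue name 'htmls' mirrors the in-process queue above):

import pika

# Producer side: publish one HTML document into the 'htmls' work queue
conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = conn.channel()
channel.queue_declare(queue='htmls')
channel.basic_publish(exchange='', routing_key='htmls', body='<html>page content</html>')

# Consumer side: each message is delivered to exactly one of the competing consumers
def on_message(ch, method, properties, body):
    print(len(body))                                # parse the HTML here
    ch.basic_ack(delivery_tag=method.delivery_tag)  # acknowledge only after processing

channel.basic_qos(prefetch_count=1)                 # fair dispatch among consumers
channel.basic_consume(queue='htmls', on_message_callback=on_message)
# channel.start_consuming()                         # blocks; run consumers in their own threads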
2. How messages are dispatched from the queue
- The work-queue pattern is, at bottom, the routing pattern. In both RabbitMQ's simple queue mode and its work-queue mode the exchange works as a direct exchange, so they are really routing as well, only through the default exchange.

For our own use we can declare a dedicated exchange instead of relying on the default one, as sketched below.
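A sketch of the same idea with a dedicated direct exchange rather than the default one (the exchange name 'crawler' and the routing key 'htmls' are illustrative):

import pika

conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = conn.channel()

# Declare our own direct exchange and bind a named queue to it with a routing key
channel.exchange_declare(exchange='crawler', exchange_type='direct')
channel.queue_declare(queue='htmls')
channel.queue_bind(queue='htmls', exchange='crawler', routing_key='htmls')

# Publishing now goes through the named exchange; the routing behaviour is the
# same as with the default exchange, only the exchange is explicit
channel.basic_publish(exchange='crawler', routing_key='htmls', body='<html>page content</html>')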
3. Should the queue be deleted on disconnect?
No. Every record has to be processed; the queue cannot be dropped just because one end disconnects, or data would be lost. The sketch below shows the durability settings involved.
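To keep the queue and its messages across disconnects and broker restarts, the usual combination is a durable queue plus persistent messages; a sketch:

import pika

conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = conn.channel()

# durable=True keeps the queue definition across broker restarts;
# auto_delete stays at its default (False), so the queue also survives consumer disconnects
channel.queue_declare(queue='htmls', durable=True)

# delivery_mode=2 marks the message itself as persistent
channel.basic_publish(exchange='', routing_key='htmls', body='<html>page content</html>',
                      properties=pika.BasicProperties(delivery_mode=2))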
Message queue class (messagequeue.py)
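A minimal sketch of what such a messagequeue.py wrapper could look like, built on pika; the class name MessageQueue and its put/get methods are assumptions, not necessarily the original design:

# messagequeue.py -- a thin wrapper so the crawler code can keep calling put()/get()
import pika

class MessageQueue:
    def __init__(self, queue, host='localhost'):
        self.queue = queue
        self.conn = pika.BlockingConnection(pika.ConnectionParameters(host))
        self.channel = self.conn.channel()
        self.channel.queue_declare(queue=queue, durable=True)

    def put(self, body):
        # Publish through the default exchange, routed by queue name
        self.channel.basic_publish(
            exchange='', routing_key=self.queue, body=body,
            properties=pika.BasicProperties(delivery_mode=2))

    def get(self):
        # Poll one message; returns None if the queue is currently empty
        method, properties, body = self.channel.basic_get(queue=self.queue, auto_ack=True)
        return body

    def close(self):
        self.conn.close()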
Refactoring the crawler code
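A sketch of how the refactored crawler might look with the in-process queues swapped out, assuming the MessageQueue wrapper above; each worker opens its own connection (pika channels are not thread-safe), and only the fetch and parse stages are shown:

import time
import requests
from bs4 import BeautifulSoup
from threading import Event
from concurrent.futures import ThreadPoolExecutor
from messagequeue import MessageQueue

BASE_URL = 'http://news.cnblogs.com'
headers = {'User-Agent': 'Mozilla/5.0'}  # placeholder UA for the sketch

def crawler(e: Event):
    urls = MessageQueue('urls')
    htmls = MessageQueue('htmls')
    while not e.is_set():
        url = urls.get()          # bytes from RabbitMQ, or None if empty
        if url is None:
            time.sleep(0.5)
            continue
        with requests.get(url.decode(), headers=headers) as response:
            htmls.put(response.text)

def parse(e: Event):
    htmls = MessageQueue('htmls')
    outputs = MessageQueue('outputs')
    while not e.is_set():
        html = htmls.get()
        if html is None:
            time.sleep(0.5)
            continue
        soup = BeautifulSoup(html, 'lxml')
        for title in soup.select('h2.news_entry > a'):
            outputs.put("{}\x01{}".format(BASE_URL + title.get('href', ''), title.text))

event = Event()
executor = ThreadPoolExecutor(10)
for i in range(5):
    executor.submit(crawler, event)
for i in range(4):
    executor.submit(parse, event)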
Fetching, parsing, persistence, and URL generation can then be made fully independent and deployed separately.