Multi-threaded crawling of 博客园 (cnblogs) news
The 博客园 (cnblogs) news list is paginated at URLs such as https://news.cnblogs.com/n/page/10/ and https://news.cnblogs.com/n/page/2/ . The task is to crawl the news titles and links in batches with multiple threads. The only part of the URL that changes is the trailing number, which is the page number.
import requests
from bs4 import BeautifulSoup
from bs4.element import Tag
from queue import Queue
from concurrent.futures import ThreadPoolExecutor

BASE_URL = 'http://news.cnblogs.com'
NEWS_PAGE = '/n/page/'

# https://news.cnblogs.com/n/page/2/   list page
# https://news.cnblogs.com/n/628919/   detail page
headers = {
    'User-agent': "Mozilla/5.0 (Windows; U; Windows NT 6.1; zh-CN) AppleWebKit/537.36 (KHTML, like Gecko)"
                  " Version/5.0.1 Safari/537.36"
}

# In-process queue for now; it can be swapped for a third-party message queue later
urls = Queue()

# Build the cnblogs news list-page URLs; each page holds 30 news items
def starts_url(start, stop, step=1):
    for i in range(start, stop + 1, step):
        url = "{}{}{}/".format(BASE_URL, NEWS_PAGE, i)
        print(url)
        urls.put(url)  # enqueue
    print('all task URLs created')

# Fetch and parse one list page
def crawler():
    url = urls.get()  # blocks until a URL is available
    with requests.get(url, headers=headers) as response:
        html = response.text
        # parse
        soup = BeautifulSoup(html, 'lxml')
        # CSS selector: h2.news_entry > a
        # XPath equivalent: //h2[@class="news_entry"]/a
        titles = soup.select('h2.news_entry > a')
        for title in titles:
            print(title.get('href'), title.text)

# starts_url(1, 1)
# crawler()

# Thread pool
executor = ThreadPoolExecutor(10)
executor.submit(starts_url, 1, 1)
for i in range(5):
    executor.submit(crawler)
Parsing the content is fairly time-consuming and should not be done synchronously inside crawler; decouple it with another queue in the same way. Also, each thread currently takes a single item and then exits; change the workers so that they keep pulling data from their queue in a loop.
import requests
from bs4 import BeautifulSoup
from bs4.element import Tag
from queue import Queue
from concurrent.futures import ThreadPoolExecutor
from threading import Event

BASE_URL = 'http://news.cnblogs.com'
NEWS_PAGE = '/n/page/'

# https://news.cnblogs.com/n/page/2/   list page
# https://news.cnblogs.com/n/628919/   detail page
headers = {
    'User-agent': "Mozilla/5.0 (Windows; U; Windows NT 6.1; zh-CN) AppleWebKit/537.36 (KHTML, like Gecko)"
                  " Version/5.0.1 Safari/537.36"
}

# In-process queues for now; they can be swapped for a third-party message queue later
urls = Queue()     # URLs waiting to be fetched
htmls = Queue()    # pages waiting to be parsed
outputs = Queue()  # records waiting to be written out

# Build the cnblogs news list-page URLs; each page holds 30 news items
def starts_url(start, stop, step=1):
    for i in range(start, stop + 1, step):
        url = "{}{}{}/".format(BASE_URL, NEWS_PAGE, i)
        print(url)
        urls.put(url)  # enqueue
    print('all task URLs created')

# Fetch pages
def crawler(e: Event):
    while not e.is_set():
        url = urls.get()  # blocks until a URL is available
        with requests.get(url, headers=headers) as response:
            html = response.text
            htmls.put(html)

# Parse pages
def parse(e: Event):
    while not e.is_set():
        html = htmls.get()
        soup = BeautifulSoup(html, 'lxml')
        # CSS selector: h2.news_entry > a
        # XPath equivalent: //h2[@class="news_entry"]/a
        titles = soup.select('h2.news_entry > a')
        for title in titles:
            href = BASE_URL + title.get('href', '')
            txt = title.text
            val = href, txt
            outputs.put(val)
            print(val)

event = Event()
# Thread pool
executor = ThreadPoolExecutor(10)
executor.submit(starts_url, 1, 1)
for i in range(5):
    executor.submit(crawler, event)
for i in range(5):
    executor.submit(parse, event)
Once the HTML parsing function parse has produced its results, they need to be persisted. Do not persist directly inside parse: put the results onto a queue and persist them in a single dedicated place.
import requests
from bs4 import BeautifulSoup
from bs4.element import Tag
from queue import Queue
from concurrent.futures import ThreadPoolExecutor
from threading import Event

BASE_URL = 'http://news.cnblogs.com'
NEWS_PAGE = '/n/page/'

# https://news.cnblogs.com/n/page/2/   list page
# https://news.cnblogs.com/n/628919/   detail page
headers = {
    'User-agent': "Mozilla/5.0 (Windows; U; Windows NT 6.1; zh-CN) AppleWebKit/537.36 (KHTML, like Gecko)"
                  " Version/5.0.1 Safari/537.36"
}

# In-process queues for now; they can be swapped for a third-party message queue later
urls = Queue()     # URLs waiting to be fetched
htmls = Queue()    # pages waiting to be parsed
outputs = Queue()  # records waiting to be persisted

# Build the cnblogs news list-page URLs; each page holds 30 news items
def starts_url(start, stop, step=1):
    for i in range(start, stop + 1, step):
        url = "{}{}{}/".format(BASE_URL, NEWS_PAGE, i)
        print(url)
        urls.put(url)  # enqueue
    print('all task URLs created')

# Fetch pages
def crawler(e: Event):
    while not e.is_set():
        url = urls.get()  # blocks until a URL is available
        with requests.get(url, headers=headers) as response:
            html = response.text
            htmls.put(html)

# Parse pages
def parse(e: Event):
    while not e.is_set():
        html = htmls.get()
        soup = BeautifulSoup(html, 'lxml')
        # CSS selector: h2.news_entry > a
        # XPath equivalent: //h2[@class="news_entry"]/a
        titles = soup.select('h2.news_entry > a')
        for title in titles:
            href = BASE_URL + title.get('href', '')
            txt = title.text
            val = href, txt
            outputs.put(val)
            # print(val)

# Persist the results
def persist(path, e: Event):
    with open(path, 'a+', encoding='utf-8') as f:
        while not e.is_set():
            val = outputs.get()
            print(val)
            f.write("{}\x01{}\n".format(val[0], val[1]))
            f.flush()

event = Event()
# Thread pool
executor = ThreadPoolExecutor(10)
executor.submit(starts_url, 1, 1)
executor.submit(persist, 'd:/news.txt', event)
for i in range(5):
    executor.submit(crawler, event)
for i in range(4):
    executor.submit(parse, event)
With this, a practical parallel crawler is essentially complete. In general, new URLs are continuously extracted and fed back into the to-crawl queue so that crawling never stops; a sketch of that feedback loop follows below.
It can also be extended fairly easily into a multi-process version and other variants.
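A minimal sketch of that feedback loop, written as a drop-in replacement for the parse() worker in the final version above. The div.pager a selector for the pagination links and the seen set used for de-duplication are illustrative assumptions; the real pagination markup on news.cnblogs.com may differ.

# Sketch only: besides extracting titles, discover further list-page links
# and push any unseen ones back onto the urls queue.
seen = set()  # list-page URLs already enqueued

def parse_and_feed(e: Event):
    while not e.is_set():
        html = htmls.get()
        soup = BeautifulSoup(html, 'lxml')
        # same title extraction as parse() above
        for title in soup.select('h2.news_entry > a'):
            outputs.put((BASE_URL + title.get('href', ''), title.text))
        # feed newly discovered list pages back into the to-crawl queue
        for a in soup.select('div.pager a'):  # assumed pagination selector
            href = a.get('href', '')
            if NEWS_PAGE in href:
                url = BASE_URL + href
                if url not in seen:
                    seen.add(url)
                    urls.put(url)

Submitting parse_and_feed in place of parse in the thread-pool setup leaves the rest of the program unchanged.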
Going further: message queues
Replace the in-process queues with a third-party service; here the widely used RabbitMQ is adopted.
Design choices
1. Choosing the queue working mode
- Take the crawler's htmls queue as an example: several producers (the fetch functions) write to it and several consumers (the parse functions) read from it, and each message may be consumed only once. This calls for RabbitMQ's work-queue mode.
- In RabbitMQ, either end, producer or consumer, may declare the exchange and the queue.
2. How messages are dispatched to the queue
- The work-queue mode is, at bottom, routing. In RabbitMQ's simple-queue and work-queue modes the exchange works in direct mode, so both are really routing modes that simply use the default exchange.
- For our own use we can declare a dedicated exchange instead of relying on the default one.
3. Should the queue be deleted when a client disconnects?
- No. Every record must be processed, so the queue cannot be deleted just because one end disconnects; that would lose data.
The message queue class
Create a new file, messagequeue.py.
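The class only needs to offer put/get on top of RabbitMQ while honouring the choices above: a dedicated direct exchange and a queue that is not auto-deleted. A minimal sketch, assuming the pika client library (1.x API); the class name MessageQueue, the put/get method names, and the connection defaults are illustrative assumptions rather than a definitive implementation.

# messagequeue.py (sketch)
import pika

class MessageQueue:
    def __init__(self, host='127.0.0.1', port=5672, exchange='news', queue='htmls'):
        self.exchange = exchange
        self.queue = queue
        params = pika.ConnectionParameters(host=host, port=port)
        self.connection = pika.BlockingConnection(params)
        self.channel = self.connection.channel()
        # a dedicated direct exchange instead of the default exchange
        self.channel.exchange_declare(exchange=exchange, exchange_type='direct')
        # auto_delete=False so the queue survives when a client disconnects
        self.channel.queue_declare(queue=queue, auto_delete=False)
        # routing key == queue name, i.e. plain work-queue routing
        self.channel.queue_bind(exchange=exchange, queue=queue, routing_key=queue)

    def put(self, msg):
        # publish one message to this queue
        if isinstance(msg, str):
            msg = msg.encode('utf-8')
        self.channel.basic_publish(exchange=self.exchange,
                                   routing_key=self.queue,
                                   body=msg)

    def get(self):
        # non-blocking fetch; returns the body as bytes, or None if the queue is empty
        method, properties, body = self.channel.basic_get(queue=self.queue, auto_ack=True)
        return body

    def close(self):
        self.connection.close()

With auto_ack=True a message counts as consumed as soon as it is fetched; switching to manual acknowledgement would make the pipeline more tolerant of worker crashes.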
Refactoring the crawler code
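A rough sketch of what the fetch stage can look like after the refactor, built on the MessageQueue sketch above; the worker structure and the queue names urls/htmls here are assumptions, not a definitive implementation. Each worker opens its own connections because a pika channel should not be shared across threads; the parse and persist workers follow the same pattern on the htmls and outputs queues, and starts_url simply publishes into urls.

import requests
from threading import Event
from concurrent.futures import ThreadPoolExecutor
from messagequeue import MessageQueue  # the wrapper sketched above

headers = {'User-agent': 'Mozilla/5.0'}  # abbreviated UA for brevity

# Fetch worker: consume URLs from RabbitMQ, publish the raw HTML back to RabbitMQ
def crawler(e: Event):
    urls = MessageQueue(queue='urls')    # pages waiting to be fetched
    htmls = MessageQueue(queue='htmls')  # pages waiting to be parsed
    while not e.is_set():
        url = urls.get()                 # bytes, or None when the queue is empty
        if url is None:
            e.wait(1)                    # back off briefly, then poll again
            continue
        with requests.get(url.decode(), headers=headers) as response:
            htmls.put(response.text)

event = Event()
executor = ThreadPoolExecutor(10)
for i in range(5):
    executor.submit(crawler, event)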
Fetching, parsing, persistence, and URL generation are now completely independent of one another and can be deployed separately.