Distributed Crawler

Crawling cnblogs with multiple threads

The cnblogs news listing pages live at URLs such as https://news.cnblogs.com/n/page/10/ . The goal is to crawl the news titles and links in batches with multiple threads.
In https://news.cnblogs.com/n/page/2/ , the only part that changes is the trailing number: it is the page number.
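For example, the full set of list-page URLs can be produced from that pattern alone (a throwaway illustration; the crawler below builds them the same way inside starts_url):

# Generate list-page URLs; only the trailing page number varies.
PAGE_URL = 'https://news.cnblogs.com/n/page/{}/'
page_urls = [PAGE_URL.format(i) for i in range(1, 11)]  # pages 1..10 as an example
print(page_urls[0], page_urls[-1])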

import requests
from bs4 import BeautifulSoup
from bs4.element import Tag
from queue import Queue
from concurrent.futures import ThreadPoolExecutor


BASE_URL = 'http://news.cnblogs.com'
NEWS_PAGE = '/n/page/'
# https://news.cnblogs.com/n/page/2/  list page
# https://news.cnblogs.com/n/628919/  detail page

headers = {
    'User-agent': "Mozilla/5.0 (Windows; U; Windows NT 6.1; zh-CN) AppleWebKit/537.36 (KHTML, like Gecko)"
                  " Version/5.0.1 Safari/537.36"
}

# Asynchronous in-memory queue; will be replaced by a third-party queue later
urls = Queue()


# Build the cnblogs news URLs; each page holds 30 news items
def starts_url(start, stop, step=1):
    for i in range(start, stop + 1, step):
        url = "{}{}{}/".format(BASE_URL, NEWS_PAGE, i)
        print(url)
        urls.put(url)  # enqueue
    print('All task URLs created')


# Fetch a page
def crawler():
    url = urls.get()  # blocking, take one URL
    with requests.get(url, headers=headers) as response:
        html = response.text

    # Parse
    soup = BeautifulSoup(html, 'lxml')
    # CSS: h2.news_entry > a
    # XPath: //h2[@class="news_entry"]/a
    titles = soup.select('h2.news_entry > a')
    for title in titles:
        print(title.get('href'), title.text)


# starts_url(1, 1)
# crawler()

# Thread pool
executor = ThreadPoolExecutor(10)

executor.submit(starts_url, 1, 1)
for i in range(5):
    executor.submit(crawler)

Parsing the content is relatively time-consuming, so it should not be handled synchronously inside crawler. Again, use a queue to decouple the two stages.

Right now each thread grabs a single item and exits as soon as it is done. Modify the workers so they keep pulling data from the queue.

import requests
from bs4 import BeautifulSoup
from bs4.element import Tag
from queue import Queue
from concurrent.futures import ThreadPoolExecutor
from threading import Event


BASE_URL = 'http://news.cnblogs.com'
NEWS_PAGE = '/n/page/'
# https://news.cnblogs.com/n/page/2/  list page
# https://news.cnblogs.com/n/628919/  detail page

headers = {
    'User-agent': "Mozilla/5.0 (Windows; U; Windows NT 6.1; zh-CN) AppleWebKit/537.36 (KHTML, like Gecko)"
                  " Version/5.0.1 Safari/537.36"
}

# Asynchronous in-memory queues; will be replaced by a third-party queue later
urls = Queue()     # URLs waiting to be crawled
htmls = Queue()    # HTML waiting to be parsed
outputs = Queue()  # results waiting to be persisted


# Build the cnblogs news URLs; each page holds 30 news items
def starts_url(start, stop, step=1):
    for i in range(start, stop + 1, step):
        url = "{}{}{}/".format(BASE_URL, NEWS_PAGE, i)
        print(url)
        urls.put(url)  # enqueue
    print('All task URLs created')


# Fetch pages
def crawler(e: Event):
    while not e.is_set():
        url = urls.get()  # blocking, take one URL
        with requests.get(url, headers=headers) as response:
            html = response.text
            htmls.put(html)


# Parse pages
def parse(e: Event):
    while not e.is_set():
        html = htmls.get()

        soup = BeautifulSoup(html, 'lxml')
        # CSS: h2.news_entry > a
        # XPath: //h2[@class="news_entry"]/a
        titles = soup.select('h2.news_entry > a')
        for title in titles:
            href = BASE_URL + title.get('href', '')
            txt = title.text
            val = href, txt
            outputs.put(val)
            print(val)


event = Event()

# Thread pool
executor = ThreadPoolExecutor(10)

executor.submit(starts_url, 1, 1)
for i in range(5):
    executor.submit(crawler, event)
for i in range(5):
    executor.submit(parse, event)

Once the HTML parsing function parse has finished its work, the results need to be persisted. Do not persist directly inside parse; put the results on a queue and persist them in one place.

import requests
from bs4 import BeautifulSoup
from bs4.element import Tag
from queue import Queue
from concurrent.futures import ThreadPoolExecutor
from threading import Event


BASE_URL = 'http://news.cnblogs.com'
NEWS_PAGE = '/n/page/'
# https://news.cnblogs.com/n/page/2/  list page
# https://news.cnblogs.com/n/628919/  detail page

headers = {
    'User-agent': "Mozilla/5.0 (Windows; U; Windows NT 6.1; zh-CN) AppleWebKit/537.36 (KHTML, like Gecko)"
                  " Version/5.0.1 Safari/537.36"
}

# Asynchronous in-memory queues; will be replaced by a third-party queue later
urls = Queue()     # URLs waiting to be crawled
htmls = Queue()    # HTML waiting to be parsed
outputs = Queue()  # results waiting to be persisted


# Build the cnblogs news URLs; each page holds 30 news items
def starts_url(start, stop, step=1):
    for i in range(start, stop + 1, step):
        url = "{}{}{}/".format(BASE_URL, NEWS_PAGE, i)
        print(url)
        urls.put(url)  # enqueue
    print('All task URLs created')


# Fetch pages
def crawler(e: Event):
    while not e.is_set():
        url = urls.get()  # blocking, take one URL
        with requests.get(url, headers=headers) as response:
            html = response.text
            htmls.put(html)


# Parse pages
def parse(e: Event):
    while not e.is_set():
        html = htmls.get()

        soup = BeautifulSoup(html, 'lxml')
        # CSS: h2.news_entry > a
        # XPath: //h2[@class="news_entry"]/a
        titles = soup.select('h2.news_entry > a')
        for title in titles:
            href = BASE_URL + title.get('href', '')
            txt = title.text
            val = href, txt
            outputs.put(val)
            # print(val)


# Persist results
def persist(path, e: Event):
    with open(path, 'a+', encoding='utf-8') as f:
        while not e.is_set():
            val = outputs.get()
            print(val)
            f.write("{}\x01{}\n".format(val[0], val[1]))
            f.flush()


event = Event()

# Thread pool
executor = ThreadPoolExecutor(10)

executor.submit(starts_url, 1, 1)
executor.submit(persist, 'd:/news.txt', event)
for i in range(5):
    executor.submit(crawler, event)
for i in range(4):
    executor.submit(parse, event)

With this, a practical parallel crawler is basically complete. If newly extracted URLs are fed into the crawl queue continuously, the crawler can run without interruption.
It can also be extended quite easily, for example into a multi-process version.
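As an illustration of that last point, here is a minimal multi-process sketch (my own illustration, not code from this tutorial): the same producer/worker split, but with multiprocessing.Queue, since separate processes cannot share an in-memory queue.Queue.

# Minimal multi-process sketch: processes coordinate through a multiprocessing.Queue.
from multiprocessing import Process, Queue

def produce(q):
    for i in range(1, 3):
        q.put('https://news.cnblogs.com/n/page/{}/'.format(i))
    q.put(None)  # one sentinel per worker, telling it to stop
    q.put(None)

def work(q):
    while True:
        url = q.get()      # blocking, take one item
        if url is None:    # sentinel received, stop
            break
        print('would crawl', url)

if __name__ == '__main__':
    q = Queue()
    procs = [Process(target=work, args=(q,)) for _ in range(2)]
    for p in procs:
        p.start()
    produce(q)
    for p in procs:
        p.join()

Once the queues live in RabbitMQ, as in the next section, this split comes almost for free, because each process only needs a connection to the broker.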

Going further (message queue)

Replace the in-memory queues with a third-party service; here we use the widely adopted RabbitMQ.

Design decisions

1. Choosing the queue work mode

  • Take the crawler's htmls queue as an example: it has multiple producers (the fetch functions) writing to it and multiple consumers (the parse functions) reading from it, and each message must be consumed exactly once. Therefore, RabbitMQ's work queue mode is used.

In RabbitMQ, exchanges and queues can be declared on either side, producer or consumer.

2. How messages are dispatched from the queue

  • The work queue mode is, at bottom, just the routing mode. In both RabbitMQ's simple queue mode and its work queue mode the exchange works in direct mode, so both are really routing mode, just using the default exchange.

For our own use we can declare a dedicated exchange instead of relying on the default one, as sketched below.
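To make the difference concrete, here is a minimal pika sketch (the connection parameters and queue names are placeholders, not this project's settings): publishing through the default exchange versus through a dedicated direct exchange.

# Default exchange vs. a dedicated direct exchange in pika.
import pika

conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()

# 1) Default exchange: publish with exchange='' and routing_key equal to the queue name.
ch.queue_declare(queue='task_queue')
ch.basic_publish(exchange='', routing_key='task_queue', body='hello')

# 2) Dedicated direct exchange: declare it, bind the queue, then publish to it.
ch.exchange_declare(exchange='news', exchange_type='direct')
ch.queue_declare(queue='htmls')
ch.queue_bind(queue='htmls', exchange='news', routing_key='htmls')
ch.basic_publish(exchange='news', routing_key='htmls', body='hello')

conn.close()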

3. Should the queue be deleted when a client disconnects?

No. Since every piece of data must be processed, the queue must not be deleted just because one end disconnects; that would lose data. The relevant declaration flags are sketched below.
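A minimal sketch of the queue_declare flags involved (the queue name demo_queue is made up for illustration): exclusive and auto_delete stay False so the queue outlives any single connection, and durable=True additionally keeps the queue definition across a broker restart.

# Declare a queue that is NOT removed when a client disconnects.
import pika

conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()
ch.queue_declare(
    queue='demo_queue',
    exclusive=False,    # not tied to this one connection
    auto_delete=False,  # keep the queue when the last consumer goes away
    durable=True        # survive a broker restart (messages also need delivery_mode=2)
)
conn.close()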

Message queue classes

# New file: messagequeue.py
import pika

class MessageBase:
    def __init__(self, host, port, user, password, virtualhost, exchange, queue):
        self.exchange_name = exchange
        self.queue_name = queue

        url = "amqp://{}:{}@{}:{}/{}".format(
            user, password, host, port, virtualhost
        )
        params = pika.URLParameters(url)
        self.connection = pika.BlockingConnection(params)
        self.channel = self.connection.channel()
        # Declare the exchange in routing (direct) mode
        self.channel.exchange_declare(
            exchange=self.exchange_name,  # the dedicated exchange
            exchange_type='direct'        # routing mode
        )

        # Declare the queue and bind it; we are no longer using the default exchange
        self.channel.queue_declare(queue=self.queue_name, exclusive=False)
        # If routing_key is not given, the queue name is used
        self.channel.queue_bind(queue=self.queue_name, exchange=self.exchange_name)


class Producer(MessageBase):
    def produce(self, message):
        self.channel.basic_publish(
            exchange=self.exchange_name,  # publish to the dedicated exchange
            routing_key=self.queue_name,
            body=message)


class Consumer(MessageBase):
    def consume(self):
        # basic_get: fetch a single message, no consume loop
        method, props, body = self.channel.basic_get(
            queue=self.queue_name,  # fetch from the specified queue
            auto_ack=True           # automatic acknowledgement
        )  # non-blocking; returns (None, None, None) when the queue is empty
        return body  # may be None


if __name__ == '__main__':
    qs = ('urls', 'htmls', 'outputs')
    # for q in qs:
    #     p = Producer('192.168.142.140', 5672, 'wayne', 'wayne', 'test', 'news', q)
    #     for i in range(40):
    #         msg = '{}-data-{:02}'.format(q, i)
    #         p.produce(msg)

    c1 = Consumer('192.168.1.5', 5672, 'lqx', 'lqx', 'test', 'news', qs[0])
    c2 = Consumer('192.168.1.5', 5672, 'lqx', 'lqx', 'test', 'news', qs[1])
    c3 = Consumer('192.168.1.5', 5672, 'lqx', 'lqx', 'test', 'news', qs[2])

    for i in range(40):
        print(c1.consume())
        print(c2.consume())
        print(c3.consume())
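Note that consume() above uses basic_get, which pulls one message per call and returns None when the queue is empty, so the callers have to poll. If polling ever becomes a problem, pika also offers push-based delivery via basic_consume; a minimal sketch (the URL and queue name are placeholders, and the queue is assumed to have been declared already, for example by MessageBase):

# Push-based consumption: the broker delivers messages to a callback.
import pika

def on_message(channel, method, properties, body):
    print(body)

conn = pika.BlockingConnection(pika.URLParameters('amqp://guest:guest@localhost:5672/%2F'))
ch = conn.channel()
ch.basic_consume(queue='htmls', on_message_callback=on_message, auto_ack=True)
ch.start_consuming()  # blocks and dispatches each message to on_message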

Refactoring the crawler code

import requests
from bs4 import BeautifulSoup
from bs4.element import Tag
from queue import Queue
from concurrent.futures import ThreadPoolExecutor
from threading import Event
import simplejson
from messagequeue import Producer, Consumer


BASE_URL = 'http://news.cnblogs.com'
NEWS_PAGE = '/n/page/'
# https://news.cnblogs.com/n/page/2/  list page
# https://news.cnblogs.com/n/628919/  detail page

headers = {
    'User-agent': "Mozilla/5.0 (Windows; U; Windows NT 6.1; zh-CN) AppleWebKit/537.36 (KHTML, like Gecko)"
                  " Version/5.0.1 Safari/537.36"
}

# The in-memory queues are replaced by RabbitMQ queues
# urls = Queue()     # URLs waiting to be crawled
# htmls = Queue()    # HTML waiting to be parsed
# outputs = Queue()  # results waiting to be persisted

# Build the cnblogs news URLs; each page holds 30 news items
def starts_url(start, stop, step=1):
    p = Producer('192.168.1.5', 5672, 'lqx', 'lqx', 'test', 'news', 'urls')
    for i in range(start, stop + 1, step):
        url = "{}{}{}/".format(BASE_URL, NEWS_PAGE, i)
        print(url)
        # urls.put(url)  # enqueue
        p.produce(url)
    print('All task URLs created')


# Fetch pages: consume from 'urls', produce to 'htmls'
def crawler(e: Event):
    p = Producer('192.168.1.5', 5672, 'lqx', 'lqx', 'test', 'news', 'htmls')
    c = Consumer('192.168.1.5', 5672, 'lqx', 'lqx', 'test', 'news', 'urls')
    while not e.wait(1):
        # url = urls.get()  # blocking, take one URL
        url = c.consume()
        if url:
            with requests.get(url, headers=headers) as response:
                if response.status_code == 200:
                    html = response.text
                    # htmls.put(html)
                    p.produce(html)


# Parse pages: consume from 'htmls', produce to 'outputs'
def parse(e: Event):
    p = Producer('192.168.1.5', 5672, 'lqx', 'lqx', 'test', 'news', 'outputs')
    c = Consumer('192.168.1.5', 5672, 'lqx', 'lqx', 'test', 'news', 'htmls')
    while not e.wait(1):
        # html = htmls.get()
        html = c.consume()

        if html:
            soup = BeautifulSoup(html, 'lxml')
            # CSS: h2.news_entry > a
            # XPath: //h2[@class="news_entry"]/a
            titles = soup.select('h2.news_entry > a')
            for title in titles:
                val = simplejson.dumps({
                    'title': title.text,
                    'url': BASE_URL + title.get('href', '')
                })
                # outputs.put(val)
                p.produce(val)
                # print(val)


# Persist results: consume from 'outputs'
def persist(path, e: Event):
    # Later this can persist to a database instead
    c = Consumer('192.168.1.5', 5672, 'lqx', 'lqx', 'test', 'news', 'outputs')
    with open(path, 'a+', encoding='utf-8') as f:
        while not e.wait(1):
            # val = outputs.get()
            data = c.consume()
            if data:
                val = simplejson.loads(data)
                f.write("{}\x01{}\n".format(val['url'], val['title']))
                f.flush()


event = Event()

# Thread pool
executor = ThreadPoolExecutor(10)

executor.submit(starts_url, 1, 2)
executor.submit(persist, 'd:/news.txt', event)

for i in range(5):
    executor.submit(crawler, event)
for i in range(4):
    executor.submit(parse, event)

Fetching, parsing, persistence, and URL generation are now completely independent of one another and can each be deployed separately.
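For instance, each role could be started as its own process or on its own machine with a small launcher, since all coordination goes through RabbitMQ. A minimal sketch (the module name crawler_app and this launcher are my own assumptions, not part of the tutorial):

# run.py -- start one role per process/machine; all coordination goes through RabbitMQ
import sys
from threading import Event
from concurrent.futures import ThreadPoolExecutor

from crawler_app import starts_url, crawler, parse, persist  # the functions above, assumed importable

def main(role):
    event = Event()
    executor = ThreadPoolExecutor(10)
    if role == 'urls':
        executor.submit(starts_url, 1, 2)
    elif role == 'crawl':
        for _ in range(5):
            executor.submit(crawler, event)
    elif role == 'parse':
        for _ in range(4):
            executor.submit(parse, event)
    elif role == 'persist':
        executor.submit(persist, 'news.txt', event)

if __name__ == '__main__':
    main(sys.argv[1])  # e.g. python run.py crawl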

Thanks for your support!