多进程+多线程打造高效率爬虫

百家 作者:数据分析 2017-10-18 02:31:22

Hello 大家好!今天要跟大家分享如何用多进程+多线程打造高效率爬虫。


为什么需要多进程爬虫


你是不是发现下载图片速度特别慢、难以忍受啊!对于这种问题 一般解决办法就是多进程了!一个进程速度慢!我就用十个进程,相当于十个人一起干。速度就会快很多啦!

(为什么不说多线程?懂点Python的小伙伴都知道、GIL的存在导致Python的多线程点有坑啊!)

今天就教大家来做一个多进程的爬虫。

(其实吧、可以用来做一个超简化版的分布式爬虫)


当然还有一种加速的方法叫做“异步”!因为爬虫大部分时间都是在等待response中!‘异步’则能让程序在等待response的时间去做的其他事情。

(不过这玩意儿非三言两语能说明白就先不跟大家掰扯了!)


学过Python基础的同学都知道、在多进程中,进程之间是不能相互通信的,这就有一个很坑爹的问题的出现了!多个进程怎么知道哪些需要爬取、哪些已经被爬取了?


这就涉及到一个东西!这玩意儿叫做队列!!队列!!队列!!其实吧正常来说应该给大家用队列来完成这个教程的, 比如 Tornado 的queue模块。

(如果需要更为稳定健壮的队列,则请考虑使用Celery这一类的专用消息传递工具)


不过为了简化技术种类啊!

(才不会告诉你们是我懒,嫌麻烦呢!)

这次我们使用MongoDB。


构建思路


好了!先来理一下思路:

每个进程需要知道哪些URL爬取过了、哪些URL需要爬取!我们来给每个URL设置两种状态:

outstanding:等待爬取的URL

complete:爬取完成的URL


诶!等等我们好像忘了啥? 失败的URL的怎么办啊?我们在增加一种状态:

processing:正在进行的URL。


嗯!当一个所有初始的URL状态都为outstanding;当开始爬取的时候状态改为:processing;爬取完成状态改为:complete;失败的URL重置状态为:outstanding。为了能够处理URL进程被终止的情况、我们设置一个计时参数,当超过这个值时;我们则将状态重置为outstanding。


下面开整Go Go Go!


首先我们需要一个模块:datetime(这个模块比内置time模块要好使一点)

下面是队列的代码:

from datetime import datetime, timedelta
from pymongo import MongoClient, errors

class MogoQueue():

   OUTSTANDING = 1 ##初始状态
   PROCESSING = 2 ##正在下载状态
   COMPLETE = 3 ##下载完成状态

   def __init__(self, db, collection, timeout=300):##初始mongodb连接
       self.client = MongoClient()
       self.Client = self.client[db]
       self.db = self.Client[collection]
       self.timeout = timeout

   def __bool__(self):
       """
       这个函数,我的理解是如果下面的表达为真,则整个类为真
       至于有什么用,后面我会注明的(如果我的理解有误,请指点出来谢谢,我也是Python新手)
       $ne的意思是不匹配
       """
       record = self.db.find_one(
           {'status': {'$ne': self.COMPLETE}}
       )
       return True if record else False

   def push(self, url, title): ##这个函数用来添加新的URL进队列
       try:
           self.db.insert({'_id': url, 'status': self.OUTSTANDING, '主题': title})
           print(url, '插入队列成功')
       except errors.DuplicateKeyError as e:  ##报错则代表已经存在于队列之中了
           print(url, '已经存在于队列中了')
           pass
   def push_imgurl(self, title, url):
       try:
           self.db.insert({'_id': title, 'statue': self.OUTSTANDING, 'url': url})
           print('图片地址插入成功')
       except errors.DuplicateKeyError as e:
           print('地址已经存在了')
           pass

   def pop(self):
       """
       这个函数会查询队列中的所有状态为OUTSTANDING的值,
       更改状态,(query后面是查询)(update后面是更新)
       并返回_id(就是我们的URL),MongDB好使吧,^_^
       如果没有OUTSTANDING的值则调用repair()函数重置所有超时的状态为OUTSTANDING,
       $set是设置的意思,和MySQL的set语法一个意思
       """
       record = self.db.find_and_modify(
           query={'status': self.OUTSTANDING},
           update={'$set': {'status': self.PROCESSING, 'timestamp': datetime.now()}}
       )
       if record:
           return record['_id']
       else:
           self.repair()
           raise KeyError

   def pop_title(self, url):
       record = self.db.find_one({'_id': url})
       return record['主题']

   def peek(self):
       """这个函数是取出状态为 OUTSTANDING的文档并返回_id(URL)"""
       record = self.db.find_one({'status': self.OUTSTANDING})
       if record:
           return record['_id']

   def complete(self, url):
       """这个函数是更新已完成的URL完成"""
       self.db.update({'_id': url}, {'$set': {'status': self.COMPLETE}})

   def repair(self):
       """这个函数是重置状态$lt是比较"""
       record = self.db.find_and_modify(
          query={
              'timestamp': {'$lt': datetime.now() - timedelta(seconds=self.timeout)},
              'status': {'$ne': self.COMPLETE}
          },
           update={'$set': {'status': self.OUTSTANDING}}
       )
       if record:
           print('重置URL状态', record['_id'])

   def clear(self):
       """这个函数只有第一次才调用、后续不要调用、因为这是删库啊!"""
       self.db.drop()


好了,队列我们做好了,下面是获取所有页面的代码。


from Download import request 

from mongodb_queue import MogoQueue 

from bs4 import BeautifulSoupspider_queue = MogoQueue('meinvxiezhenji', 'crawl_queue')def start(url):   

 response = request.get(url, 3)    

Soup = BeautifulSoup(response.text, 'lxml')    

all_a = Soup.find('div', class_='all').find_all('a')    

for a in all_a:        

title = a.get_text()        

url = a['href']        

spider_queue.push(url, title)   

 """上面这个调用就是把URL写入MongoDB的队列了"""

 if __name__ == "__main__":    start('http://www.mzitu.com/all') 

"""这一段儿就不解释了哦!超级简单的"""


下面就是多进程+多线程的下载代码了:


import os
import time
import threading
import multiprocessing
from mongodb_queue import MogoQueue
from Download import request
from bs4 import BeautifulSoup

SLEEP_TIME = 1

def mzitu_crawler(max_threads=10):
   crawl_queue = MogoQueue('meinvxiezhenji', 'crawl_queue') ##这个是我们获取URL的队列
   ##img_queue = MogoQueue('meinvxiezhenji', 'img_queue')
   def pageurl_crawler():
       while True:
           try:
               url = crawl_queue.pop()
               print(url)
           except KeyError:
               print('队列没有数据')
               break
           else:
               img_urls = []
               req = request.get(url, 3).text
               title = crawl_queue.pop_title(url)
               mkdir(title)
               os.chdir('D:mzitu\' + title)
               max_span = BeautifulSoup(req, 'lxml').find('div', class_='pagenavi').find_all('span')[-2].get_text()
               for page in range(1, int(max_span) + 1):
                   page_url = url + '/' + str(page)
                   img_url = BeautifulSoup(request.get(page_url, 3).text, 'lxml').find('div', class_='main-image').find('img')['src']
                   img_urls.append(img_url)
                   save(img_url)
               crawl_queue.complete(url) ##设置为完成状态
               ##img_queue.push_imgurl(title, img_urls)
               ##print('插入数据库成功')

   def save(img_url):
       name = img_url[-9:-4]
       print(u'开始保存:', img_url)
       img = request.get(img_url, 3)
       f = open(name + '.jpg', 'ab')
       f.write(img.content)
       f.close()

   def mkdir(path):
       path = path.strip()
       isExists = os.path.exists(os.path.join("D:mzitu", path))
       if not isExists:
           print(u'建了一个名字叫做', path, u'的文件夹!')
           os.makedirs(os.path.join("D:mzitu", path))
           return True
       else:
           print(u'名字叫做', path, u'的文件夹已经存在了!')
           return False

   threads = []
   while threads or crawl_queue:
       """
       这儿crawl_queue用上了,就是我们__bool__函数的作用,为真则代表我们MongoDB队列里面还有数据
       threads 或者 crawl_queue为真都代表我们还没下载完成,程序就会继续执行
       """
       for thread in threads:
           if not thread.is_alive(): ##is_alive是判断是否为空,不是空则在队列中删掉
               threads.remove(thread)
       while len(threads) < max_threads or crawl_queue.peek(): ##线程池中的线程少于max_threads 或者 crawl_qeue时
           thread = threading.Thread(target=pageurl_crawler) ##创建线程
           thread.setDaemon(True) ##设置守护线程
           thread.start() ##启动线程
           threads.append(thread) ##添加进线程队列
       time.sleep(SLEEP_TIME)

def process_crawler():
   process = []
   num_cpus = multiprocessing.cpu_count()
   print('将会启动进程数为:', num_cpus)
   for i in range(num_cpus):
       p = multiprocessing.Process(target=mzitu_crawler) ##创建进程
       p.start() ##启动进程
       process.append(p) ##添加进进程队列
   for p in process:
       p.join() ##等待进程队列里面的进程结束

if __name__ == "__main__":
   process_crawler()

好啦!一个多进程多线的爬虫就完成了。

(其实可以设置一下MongoDB,然后调整一下连接配置,在多台机器上跑哦!!嗯,就是超级简化版的分布式爬虫了,虽然很是简陋。)


各位小哥儿可以参考上面代码,单独处理图片地址试试(就是多个进程直接下载图片)

我测试了一下八分钟能下载100套图~


△很漂亮噢~


还可以在下载图片那一块儿加上异步(毕竟下载图片是I\O等待最久的时间了),当然也会复杂许多。


那么为了帮助小伙伴们更好的学习爬虫技术,我们邀请到了行业内著名的强子老师给大家免费聊一聊关于爬虫技术的知识,深入浅出的讲解如何用Python实现各种爬虫功能。



多进程爬虫构建

免费技术分享直播课


报名方式

扫描下方的二维码

或者直接通过搜索QQ号:311849089

扫描二维码加入专属学习群

腾讯课堂Python研究院高级讲师


强子老师


8年Python开发经验,帮助国内的创业公司做 Web 开发、爬虫、数据可视化。Python 开发及开源爱好者,vim 控,熟悉 Django, redis, docker 等。 先后在多个项目中成功的实施数据可视化,熟悉 Matlab / iGraph / d3 / neo4j 等可视化工具。

关注公众号:拾黑(shiheibook)了解更多

[广告]赞助链接:

四季很好,只要有你,文娱排行榜:https://www.yaopaiming.com/
让资讯触达的更精准有趣:https://www.0xu.cn/

公众号 关注网络尖刀微信公众号
随时掌握互联网精彩
赞助链接