The Scrapy framework: building a simplified Scrapy of your own

Published 2023-12-08 15:22:52  Author: 木屐呀

A bare-bones ("low") custom version of a Scrapy-like framework:

from twisted.internet import reactor      # event loop (terminates once every socket has been removed)
from twisted.web.client import getPage    # socket object (removed from the event loop automatically once the download finishes)
from twisted.internet import defer        # defer.Deferred: a special socket-like object (sends no request; removed from the event loop manually)

class Request(object):

    def __init__(self, url, callback):
        self.url = url
        self.callback = callback

# custom response
class HttpResponse(object):

    def __init__(self, content, request):
        self.content = content
        self.request = request
        self.url = request.url
        self.text = str(content, encoding='utf-8')

class ChoutiSpider(object):
    name = 'chouti'

    def start_requests(self):
        start_url = ['http://www.baidu.com', 'http://www.bing.com']
        for url in start_url:
            yield Request(url, self.parse)

    def parse(self, response):
        print(response)  # response wraps the downloaded page
        # yield Request('http://www.cnblogs.com')
        # 1. remove the request from crawling
        # 2. collect whatever parse yields
        # 3. go back to the queue for the next request
        yield Request(url='http://www.cnblogs.com', callback=self.parse)

# queue
import queue
Q = queue.Queue()

class Engine(object):

    def __init__(self):
        self._close = None
        self.max = 5        # maximum concurrency
        self.crawling = []  # requests that have been sent but not yet answered

    # invoke the user-defined parse callback
    def get_response_callback(self, content, request):
        """
        Call the parse method defined in the user's spider and add any new requests to the scheduler.
        :param content:
        :param request:
        :return:
        """
        self.crawling.remove(request)          # download finished, drop the request from the in-flight list
        resp = HttpResponse(content, request)  # wrap content and request in our custom response
        result = request.callback(resp)        # run the parse callback
        print(result)
        import types
        if isinstance(result, types.GeneratorType):  # did the callback return a generator?
            for req in result:
                Q.put(req)  # enqueue the requests yielded by parse

    def _next_request(self):
        """
        Fetch a Request object and send the request.
        :return:
        """
        # if the queue is empty and nothing is in flight, shut down
        if Q.qsize() == 0 and len(self.crawling) == 0:
            self._close.callback(None)  # fire the Deferred to end the crawl
            return

        if len(self.crawling) >= self.max:  # at the concurrency limit
            return
        while len(self.crawling) < self.max:  # below the limit: keep pulling from the queue
            try:
                req = Q.get(block=False)  # block=False: do not wait; raise immediately if the queue is empty
                self.crawling.append(req)
                d = getPage(req.url.encode('utf-8'))  # create the socket (download) object
                # once the page finishes downloading, call get_response_callback
                d.addCallback(self.get_response_callback, req)  # run the callback after the response arrives
                # d.addCallback(self._next_request)  # fetch the next request and send it
                d.addCallback(lambda _: reactor.callLater(0, self._next_request))  # same effect as the line above
            except Exception as e:
                return

    @defer.inlineCallbacks
    def crawl(self, spider):
        # add the initial Request objects to the scheduler
        start_requests = iter(spider.start_requests())
        while True:
            try:
                request = next(start_requests)
                Q.put(request)  # put the request into the queue
            except StopIteration as e:
                break  # exhausted: break out of the loop

        # repeatedly pull tasks from the scheduler and send download requests
        # self._next_request()
        reactor.callLater(0, self._next_request)  # same effect as above, but invoked by the event loop

        # keep the event loop alive
        self._close = defer.Deferred()
        yield self._close


_active = set()
engine = Engine()

spider = ChoutiSpider()
d = engine.crawl(spider)
_active.add(d)


dd = defer.DeferredList(_active)
dd.addBoth(lambda a: reactor.stop())

reactor.run()

A small-scale custom Scrapy-like framework

from twisted.internet import reactor    # event loop (terminates once every socket has been removed)
from twisted.web.client import getPage  # socket object (removed from the event loop automatically once the download finishes)
from twisted.internet import defer      # defer.Deferred: a special socket-like object (sends no request; removed manually)
from queue import Queue

class Request(object):
    """
    Encapsulates the user's request information.
    """
    def __init__(self, url, callback):
        self.url = url
        self.callback = callback

class HttpResponse(object):

    def __init__(self, content, request):
        self.content = content
        self.request = request

class Scheduler(object):
    """
    Task scheduler.
    """
    def __init__(self):
        self.q = Queue()

    def open(self):
        pass

    def next_request(self):  # fetch a request from the queue
        try:
            req = self.q.get(block=False)
        except Exception as e:
            req = None
        return req

    def enqueue_request(self, req):  # put a request into the queue
        self.q.put(req)

    def size(self):  # number of requests currently queued
        return self.q.qsize()

class ExecutionEngine(object):
    """
    Engine: drives all of the scheduling.
    """
    def __init__(self):
        self._close = None
        self.scheduler = None
        self.max = 5
        self.crawlling = []

    def get_response_callback(self, content, request):  # handle a completed request
        self.crawlling.remove(request)
        response = HttpResponse(content, request)
        result = request.callback(response)
        import types
        if isinstance(result, types.GeneratorType):
            for req in result:
                self.scheduler.enqueue_request(req)

    def _next_request(self):
        if self.scheduler.size() == 0 and len(self.crawlling) == 0:
            self._close.callback(None)
            return

        while len(self.crawlling) < self.max:
            req = self.scheduler.next_request()  # 6.1 the engine asks the scheduler for a request
            if not req:
                return
            self.crawlling.append(req)  # 6.2 track it as in flight; the list length enforces the concurrency limit
            d = getPage(req.url.encode('utf-8'))
            d.addCallback(self.get_response_callback, req)  # 6.3 run the callback once the response arrives
            d.addCallback(lambda _: reactor.callLater(0, self._next_request))  # 6.4 loop back for the next request

    @defer.inlineCallbacks
    def open_spider(self, start_requests):
        self.scheduler = Scheduler()
        yield self.scheduler.open()
        while True:
            try:
                req = next(start_requests)
            except StopIteration as e:
                break
            self.scheduler.enqueue_request(req)  # 5.1 the engine hands each request to the scheduler's queue
        reactor.callLater(0, self._next_request)  # 5.2 start pulling requests back out of the scheduler

    @defer.inlineCallbacks
    def start(self):
        self._close = defer.Deferred()  # create a Deferred that keeps the event loop hanging
        yield self._close

class Crawler(object):
    """
    Wraps the scheduler and the engine.
    """
    def _create_engine(self):  # 4.1
        return ExecutionEngine()

    def _create_spider(self, spider_cls_path):  # 4.2
        """

        :param spider_cls_path:  spider.chouti.ChoutiSpider
        :return:
        """
        module_path, cls_name = spider_cls_path.rsplit('.', maxsplit=1)
        import importlib
        m = importlib.import_module(module_path)
        cls = getattr(m, cls_name)
        return cls()

    @defer.inlineCallbacks
    def crawl(self, spider_cls_path):
        engine = self._create_engine()  # 4.1
        spider = self._create_spider(spider_cls_path)  # 4.2
        start_requests = iter(spider.start_requests())  # 4.3
        yield engine.open_spider(start_requests)  # the engine hands the requests to the scheduler's queue and starts downloading
        yield engine.start()  # create defer.Deferred() and hang until the crawl finishes

class CrawlerProcess(object):
    """
    Starts the event loop.
    """
    def __init__(self):
        self._active = set()

    def crawl(self, spider_cls_path):
        """
        :param spider_cls_path:
        :return:
        """
        crawler = Crawler()
        d = crawler.crawl(spider_cls_path)  # 3.1 create the engine and spider, build the sockets and defer.Deferred objects, and start requesting
        self._active.add(d)  # 3.2 collect the defer.Deferred objects

    def start(self):
        dd = defer.DeferredList(self._active)  # 3.3 watch whether everything has finished
        dd.addBoth(lambda _: reactor.stop())

        reactor.run()

class Command(object):

    def run(self):
        crawl_process = CrawlerProcess()  # 2.1 instantiate CrawlerProcess
        spider_cls_path_list = ['spider.chouti.ChoutiSpider', 'spider.cnblogs.CnblogsSpider', ]
        for spider_cls_path in spider_cls_path_list:
            crawl_process.crawl(spider_cls_path)  # 2.2 call CrawlerProcess.crawl for each spider
        crawl_process.start()  # 2.3 start the event loop and stop it when everything is done


if __name__ == '__main__':
    # 1. entry point: run the command
    cmd = Command()
    cmd.run()
engine.py
from engine import Request

class ChoutiSpider(object):

    name = 'chouti'

    def start_requests(self):
        start_url = ['http://www.baidu.com', 'http://www.bing.com', ]
        for url in start_url:
            yield Request(url, self.parse)

    def parse(self, response):
        print(response)  # response wraps the downloaded page
        yield Request('http://www.cnblogs.com', callback=self.parse)
chouti.py
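
engine.py resolves each spider from a dotted path such as spider.chouti.ChoutiSpider, so chouti.py is expected to live inside a spider package next to engine.py. The cnblogs spider referenced in spider_cls_path_list is not shown; a minimal sketch of the assumed layout plus a hypothetical spider/cnblogs.py could be:

# assumed project layout (the names below are an assumption, not shown above):
#   engine.py
#   spider/
#       __init__.py
#       chouti.py    <- the ChoutiSpider above
#       cnblogs.py   <- hypothetical CnblogsSpider sketched here

# spider/cnblogs.py
from engine import Request

class CnblogsSpider(object):

    name = 'cnblogs'

    def start_requests(self):
        start_url = ['http://www.cnblogs.com', ]
        for url in start_url:
            yield Request(url, self.parse)

    def parse(self, response):
        print(response)  # response wraps the downloaded page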

Other examples:

from twisted.internet import defer
from twisted.web.client import getPage
from twisted.internet import reactor


def _next_request():
    _next_request_from_scheduler()


def _next_request_from_scheduler():
    ret = getPage(bytes('http://www.chouti.com', encoding='utf8'))
    ret.addCallback(callback)
    ret.addCallback(lambda _: reactor.callLater(0, _next_request))


_closewait = None

@defer.inlineCallbacks
def engine_start():
    global _closewait
    _closewait = defer.Deferred()
    yield _closewait


@defer.inlineCallbacks
def task(url):
    reactor.callLater(0, _next_request)
    yield engine_start()


counter = 0
def callback(arg):
    global counter
    counter += 1
    if counter == 10:
        _closewait.callback(None)
    print('one', len(arg))


def stop(arg):
    print('all done', arg)
    reactor.stop()


if __name__ == '__main__':
    url = 'http://www.cnblogs.com'

    defer_list = []
    deferObj = task(url)
    defer_list.append(deferObj)

    v = defer.DeferredList(defer_list)
    v.addBoth(stop)
    reactor.run()
Twisted example
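
Note that getPage only ships with older Twisted releases; it was deprecated and later removed in favour of the Agent API. On current Twisted the same fetch-then-release-the-reactor pattern could be sketched with Agent and readBody (a rough sketch, with http://www.cnblogs.com assumed as the example URL):

from twisted.internet import defer, reactor
from twisted.web.client import Agent, readBody


@defer.inlineCallbacks
def fetch(url):
    # Agent.request returns a Deferred that fires with a Response object;
    # readBody then collects the body as bytes.
    agent = Agent(reactor)
    response = yield agent.request(b'GET', url.encode('utf-8'))
    body = yield readBody(response)
    print('one', len(body))


if __name__ == '__main__':
    d = fetch('http://www.cnblogs.com')
    d.addBoth(lambda _: reactor.stop())  # stop the event loop once the download (or an error) is done
    reactor.run()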
from twisted.web.client import getPage
from twisted.internet import defer, reactor
import queue


class Response(object):
    def __init__(self, body, request):
        self.body = body
        self.request = request
        self.url = request.url

    @property
    def text(self):
        return self.body.decode('utf-8')


class Request(object):
    def __init__(self, url, callback=None):
        self.url = url
        self.callback = callback


class Scheduler(object):
    def __init__(self, engine):
        self.q = queue.Queue()
        self.engine = engine

    def enqueue_request(self, request):
        self.q.put(request)

    def next_request(self):
        try:
            req = self.q.get(block=False)
        except Exception as e:
            req = None

        return req

    def size(self):
        return self.q.qsize()


class ExecutionEngine(object):
    def __init__(self):
        self._closewait = None
        self.running = True
        self.start_requests = None
        self.scheduler = Scheduler(self)

        self.inprogress = set()

    def check_empty(self, response):
        if not self.running:
            self._closewait.callback('......')

    def _next_request(self):
        while self.start_requests:
            try:
                request = next(self.start_requests)
            except StopIteration:
                self.start_requests = None
            else:
                self.scheduler.enqueue_request(request)

        while len(self.inprogress) < 5 and self.scheduler.size() > 0:  # maximum concurrency of 5

            request = self.scheduler.next_request()
            if not request:
                break

            self.inprogress.add(request)
            d = getPage(bytes(request.url, encoding='utf-8'))
            d.addBoth(self._handle_downloader_output, request)
            d.addBoth(lambda x, req: self.inprogress.remove(req), request)
            d.addBoth(lambda x: self._next_request())

        if len(self.inprogress) == 0 and self.scheduler.size() == 0:
            self._closewait.callback(None)

    def _handle_downloader_output(self, body, request):
        """
        Take the downloaded body, run the callback, and enqueue any requests the callback yields.
        :param body:
        :param request:
        :return:
        """
        import types

        response = Response(body, request)
        func = request.callback or self.spider.parse
        gen = func(response)
        if isinstance(gen, types.GeneratorType):
            for req in gen:
                self.scheduler.enqueue_request(req)

    @defer.inlineCallbacks
    def start(self):
        self._closewait = defer.Deferred()
        yield self._closewait

    def open_spider(self, spider, start_requests):
        self.start_requests = start_requests
        self.spider = spider
        reactor.callLater(0, self._next_request)


class Crawler(object):
    def __init__(self, spidercls):
        self.spidercls = spidercls

        self.spider = None
        self.engine = None

    @defer.inlineCallbacks
    def crawl(self):
        self.engine = ExecutionEngine()
        self.spider = self.spidercls()
        start_requests = iter(self.spider.start_requests())
        self.engine.open_spider(self.spider, start_requests)
        yield self.engine.start()


class CrawlerProcess(object):
    def __init__(self):
        self._active = set()
        self.crawlers = set()

    def crawl(self, spidercls, *args, **kwargs):
        crawler = Crawler(spidercls)

        self.crawlers.add(crawler)
        d = crawler.crawl(*args, **kwargs)
        self._active.add(d)
        return d

    def start(self):
        dl = defer.DeferredList(self._active)
        dl.addBoth(self._stop_reactor)
        reactor.run()

    def _stop_reactor(self, _=None):
        reactor.stop()


class Spider(object):
    def start_requests(self):
        for url in self.start_urls:
            yield Request(url)


class ChoutiSpider(Spider):
    name = "chouti"
    start_urls = [
        'http://dig.chouti.com/',
    ]

    def parse(self, response):
        print(response.text)


class CnblogsSpider(Spider):
    name = "cnblogs"
    start_urls = [
        'http://www.cnblogs.com/',
    ]

    def parse(self, response):
        print(response.text)


if __name__ == '__main__':

    spider_cls_list = [ChoutiSpider, CnblogsSpider]

    crawler_process = CrawlerProcess()
    for spider_cls in spider_cls_list:
        crawler_process.crawl(spider_cls)

    crawler_process.start()
Simulating the Scrapy framework
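
For comparison, real Scrapy exposes essentially the same entry point: scrapy.crawler.CrawlerProcess collects the crawl deferreds and drives the Twisted reactor for you. A minimal run against the genuine API, with a throwaway ExampleSpider assumed here purely for illustration, looks roughly like this:

import scrapy
from scrapy.crawler import CrawlerProcess


class ExampleSpider(scrapy.Spider):
    name = 'example'
    start_urls = ['http://www.cnblogs.com/']

    def parse(self, response):
        print(response.url)


process = CrawlerProcess()
process.crawl(ExampleSpider)  # schedule the spider; returns a Deferred
process.start()               # start the reactor; blocks until all crawls finish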
import types
from twisted.internet import defer
from twisted.web.client import getPage
from twisted.internet import reactor


class Request(object):
    def __init__(self, url, callback):
        self.url = url
        self.callback = callback
        self.priority = 0


class HttpResponse(object):
    def __init__(self, content, request):
        self.content = content
        self.request = request


class ChouTiSpider(object):

    def start_requests(self):
        url_list = ['http://www.cnblogs.com/', 'http://www.bing.com']
        for url in url_list:
            yield Request(url=url, callback=self.parse)

    def parse(self, response):
        print(response.request.url)
        # yield Request(url="http://www.baidu.com", callback=self.parse)


from queue import Queue
Q = Queue()


class CallLaterOnce(object):
    def __init__(self, func, *a, **kw):
        self._func = func
        self._a = a
        self._kw = kw
        self._call = None

    def schedule(self, delay=0):
        if self._call is None:
            self._call = reactor.callLater(delay, self)

    def cancel(self):
        if self._call:
            self._call.cancel()

    def __call__(self):
        self._call = None
        return self._func(*self._a, **self._kw)


class Engine(object):
    def __init__(self):
        self.nextcall = None
        self.crawlling = []
        self.max = 5
        self._closewait = None

    def get_response(self, content, request):
        response = HttpResponse(content, request)
        gen = request.callback(response)
        if isinstance(gen, types.GeneratorType):
            for req in gen:
                req.priority = request.priority + 1
                Q.put(req)

    def rm_crawlling(self, response, d):
        self.crawlling.remove(d)

    def _next_request(self, spider):
        if Q.qsize() == 0 and len(self.crawlling) == 0:
            self._closewait.callback(None)
            return

        if len(self.crawlling) >= 5:
            return
        while len(self.crawlling) < 5:
            try:
                req = Q.get(block=False)
            except Exception as e:
                req = None
            if not req:
                return
            d = getPage(req.url.encode('utf-8'))
            self.crawlling.append(d)
            d.addCallback(self.get_response, req)
            d.addCallback(self.rm_crawlling, d)
            d.addCallback(lambda _: self.nextcall.schedule())

    @defer.inlineCallbacks
    def crawl(self):
        spider = ChouTiSpider()
        start_requests = iter(spider.start_requests())
        flag = True
        while flag:
            try:
                req = next(start_requests)
                Q.put(req)
            except StopIteration as e:
                flag = False

        self.nextcall = CallLaterOnce(self._next_request, spider)
        self.nextcall.schedule()

        self._closewait = defer.Deferred()
        yield self._closewait

    @defer.inlineCallbacks
    def pp(self):
        yield self.crawl()


_active = set()
obj = Engine()
d = obj.crawl()
_active.add(d)

li = defer.DeferredList(_active)
li.addBoth(lambda _, *a, **kw: reactor.stop())

reactor.run()
Reference version
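
The CallLaterOnce helper in this reference version mirrors the utility Scrapy itself uses to coalesce scheduling: however many times schedule() is called before the reactor gets to it, the wrapped function runs at most once per pending call. A tiny demo, assuming the CallLaterOnce class defined above is in scope:

from twisted.internet import reactor

def tick():
    # runs exactly once even though schedule() was called three times
    print('tick')
    reactor.stop()

nextcall = CallLaterOnce(tick)
nextcall.schedule()
nextcall.schedule()  # ignored: a call is already pending
nextcall.schedule()  # ignored as well
reactor.run()        # prints 'tick' a single time, then stops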

More Scrapy documentation:

http://scrapy-chs.readthedocs.io/zh_CN/latest/index.html

https://docs.scrapy.org/en/latest/

 

Related reference: https://www.cnblogs.com/wupeiqi/articles/6229292.html