scrapy框架之Twisted

发布时间 2023-12-08 15:22:52 作者: 木屐呀

 ① getPage

1 # getPage 返回一个 Deferred(可类比为 socket 对象):下载完成后,对应的连接会自动从事件循环中移除
2 from twisted.web.client import getPage

详解:

 1 def getPage(url, contextFactory=None, *args, **kwargs):
 2     """
 3     Download a web page as a string.  
 4 
 5     Download a page. Return a deferred, which will callback with a
 6     page (as a string) or errback with a description of the error.
 7 
 8     See L{HTTPClientFactory} to see what extra arguments can be passed.
 9     """
10     return _makeGetterFactory(  # creates the factory and connects it through the reactor
11         url,
12         HTTPClientFactory,  # passed as factoryFactory; it is called with url, *args, **kwargs
13         contextFactory=contextFactory,
14         *args, **kwargs).deferred  # return the factory's result Deferred, not the factory itself
 1 def _makeGetterFactory(url, factoryFactory, contextFactory=None,
 2                        *args, **kwargs):
 3     """
 4     Create and connect an HTTP page getting factory.
 5 
 6     Any additional positional or keyword arguments are used when calling
 7     C{factoryFactory}.
 8 
 9     @param factoryFactory: Factory factory that is called with C{url}, C{args}
10         and C{kwargs} to produce the getter
11 
12     @param contextFactory: Context factory to use when creating a secure
13         connection, defaulting to L{None}
14 
15     @return: The factory created by C{factoryFactory}
16     """
17     uri = URI.fromBytes(url)  # url is parsed as bytes; scheme/host/port are read from the result
18     factory = factoryFactory(url, *args, **kwargs)  # build the getter factory for this URL
19     if uri.scheme == b'https':
20         from twisted.internet import ssl  # imported only when an https URL is requested
21         if contextFactory is None:
22             contextFactory = ssl.ClientContextFactory()  # default context when the caller gave none
23         reactor.connectSSL(  # https: register an SSL connection with the reactor
24             nativeString(uri.host), uri.port, factory, contextFactory) 
25     else:
26         reactor.connectTCP(nativeString(uri.host), uri.port, factory)  # any other scheme: plain TCP connection
27     return factory
 1 class HTTPClientFactory(protocol.ClientFactory):
 2     """Download a given URL.
 3 
 4     @type deferred: Deferred
 5     @ivar deferred: A Deferred that will fire when the content has
 6           been retrieved. Once this is fired, the ivars `status', `version',
 7           and `message' will be set.
 8 
 9     @type status: bytes
10     @ivar status: The status of the response.
11 
12     @type version: bytes
13     @ivar version: The version of the response.
14 
15     @type message: bytes
16     @ivar message: The text message returned with the status.
17 
18     @type response_headers: dict
19     @ivar response_headers: The headers that were specified in the
20           response from the server.
21 
22     @type method: bytes
23     @ivar method: The HTTP method to use in the request.  This should be one of
24         OPTIONS, GET, HEAD, POST, PUT, DELETE, TRACE, or CONNECT (case
25         matters).  Other values may be specified if the server being contacted
26         supports them.
27 
28     @type redirectLimit: int
29     @ivar redirectLimit: The maximum number of HTTP redirects that can occur
30           before it is assumed that the redirection is endless.
31 
32     @type afterFoundGet: C{bool}
33     @ivar afterFoundGet: Deviate from the HTTP 1.1 RFC by handling redirects
34         the same way as most web browsers; if the request method is POST and a
35         302 status is encountered, the redirect is followed with a GET method
36 
37     @type _redirectCount: int
38     @ivar _redirectCount: The current number of HTTP redirects encountered.
39 
40     @ivar _disconnectedDeferred: A L{Deferred} which only fires after the last
41         connection associated with the request (redirects may cause multiple
42         connections to be required) has closed.  The result Deferred will only
43         fire after this Deferred, so that callers can be assured that there are
44         no more event sources in the reactor once they get the result.
45     """
46 
47     protocol = HTTPPageGetter
48 
49     url = None
50     scheme = None
51     host = b''
52     port = None
53     path = None
54 
55     def __init__(self, url, method=b'GET', postdata=None, headers=None,
56                  agent=b"Twisted PageGetter", timeout=0, cookies=None,
57                  followRedirect=True, redirectLimit=20,
58                  afterFoundGet=False):
59         self.followRedirect = followRedirect
60         self.redirectLimit = redirectLimit
61         self._redirectCount = 0
62         self.timeout = timeout
63         self.agent = agent
64         self.afterFoundGet = afterFoundGet
65         if cookies is None:
66             cookies = {}  # fresh dict per instance rather than a shared mutable default
67         self.cookies = cookies
68         if headers is not None:
69             self.headers = InsensitiveDict(headers)
70         else:
71             self.headers = InsensitiveDict()
72         if postdata is not None:
73             self.headers.setdefault(b'Content-Length',  # setdefault: a caller-supplied value wins
74                                     intToBytes(len(postdata)))
75             # just in case a broken http/1.1 decides to keep connection alive
76             self.headers.setdefault(b"connection", b"close")
77         self.postdata = postdata
78         self.method = method
79 
80         self.setURL(url)
81 
82         self.waiting = 1
83         self._disconnectedDeferred = defer.Deferred()  # fires after the last connection closes (see class docstring)
84         self.deferred = defer.Deferred()  # result Deferred; this is the object getPage() returns
85         # Make sure the first callback on the result Deferred pauses the
86         # callback chain until the request connection is closed.
87         self.deferred.addBoth(self._waitForDisconnect)
88         self.response_headers = None

 ② defer

1 # defer.Deferred:特殊的“socket”对象——它不发送任何请求,必须手动触发(callback/errback)之后才算完成
2 from twisted.internet import defer

详解:

class Deferred:
    """
    This is a callback which will be put off until later.

    Why do we want this? Well, in cases where a function in a threaded
    program would block until it gets a result, for Twisted it should
    not block. Instead, it should return a L{Deferred}.

    This can be implemented for protocols that run over the network by
    writing an asynchronous protocol for L{twisted.internet}. For methods
    that come from outside packages that are not under our control, we use
    threads (see for example L{twisted.enterprise.adbapi}).

    For more information about Deferreds, see doc/core/howto/defer.html or
    U{http://twistedmatrix.com/documents/current/core/howto/defer.html}

    When creating a Deferred, you may provide a canceller function, which
    will be called by d.cancel() to let you do any clean-up necessary if the
    user decides not to wait for the deferred to complete.

    @ivar called: A flag which is C{False} until either C{callback} or
        C{errback} is called and afterwards always C{True}.
    @type called: L{bool}

    @ivar paused: A counter of how many unmatched C{pause} calls have been made
        on this instance.
    @type paused: L{int}

    @ivar _suppressAlreadyCalled: A flag used by the cancellation mechanism
        which is C{True} if the Deferred has no canceller and has been
        cancelled, C{False} otherwise.  If C{True}, it can be expected that
        C{callback} or C{errback} will eventually be called and the result
        should be silently discarded.
    @type _suppressAlreadyCalled: L{bool}

    @ivar _runningCallbacks: A flag which is C{True} while this instance is
        executing its callback chain, used to stop recursive execution of
        L{_runCallbacks}
    @type _runningCallbacks: L{bool}

    @ivar _chainedTo: If this L{Deferred} is waiting for the result of another
        L{Deferred}, this is a reference to the other Deferred.  Otherwise,
        L{None}.
    """

    called = False
    paused = False  # documented above as a counter; False behaves as 0 in arithmetic
    _debugInfo = None
    _suppressAlreadyCalled = False

    # Are we currently running a user-installed callback?  Meant to prevent
    # recursive running of callbacks when a reentrant call to add a callback is
    # used.
    _runningCallbacks = False

    # Keep this class attribute for now, for compatibility with code that
    # sets it directly.
    debug = False

    _chainedTo = None

    def __init__(self, canceller=None):
        """
        Initialize a L{Deferred}.

        @param canceller: a callable used to stop the pending operation
            scheduled by this L{Deferred} when L{Deferred.cancel} is
            invoked. The canceller will be passed the deferred whose
            cancelation is requested (i.e., self).

            If a canceller is not given, or does not invoke its argument's
            C{callback} or C{errback} method, L{Deferred.cancel} will
            invoke L{Deferred.errback} with a L{CancelledError}.

            Note that if a canceller is not given, C{callback} or
            C{errback} may still be invoked exactly once, even though
            defer.py will have already invoked C{errback}, as described
            above.  This allows clients of code which returns a L{Deferred}
            to cancel it without requiring the L{Deferred} instantiator to
            provide any specific implementation support for cancellation.
            New in 10.1.

        @type canceller: a 1-argument callable which takes a L{Deferred}. The
            return result is ignored.
        """
        self.callbacks = []  # pending callback entries; _runCallbacks pops them from the front
        self._canceller = canceller
        if self.debug:
            self._debugInfo = DebugInfo()
            self._debugInfo.creator = traceback.format_stack()[:-1]  # creation stack without this frame
 1     def callback(self, result):
 2         """
 3         Run all success callbacks that have been added to this L{Deferred}.
 4 
 5         Each callback will have its result passed as the first argument to
 6         the next; this way, the callbacks act as a 'processing chain'.  If
 7         the success-callback returns a L{Failure} or raises an L{Exception},
 8         processing will continue on the *error* callback chain.  If a
 9         callback (or errback) returns another L{Deferred}, this L{Deferred}
10         will be chained to it (and further callbacks will not run until that
11         L{Deferred} has a result).
12 
13         An instance of L{Deferred} may only have either L{callback} or
14         L{errback} called on it, and only once.
15 
16         @param result: The object which will be passed to the first callback
17             added to this L{Deferred} (via L{addCallback}).
18 
19         @raise AlreadyCalledError: If L{callback} or L{errback} has already been
20             called on this L{Deferred}.
21         """
22         assert not isinstance(result, Deferred)  # the initial result must not itself be a Deferred
23         self._startRunCallbacks(result)  # store the result, mark as called, then run the callback chain
 1     def _startRunCallbacks(self, result):
 2         if self.called:  # this Deferred has already been fired once
 3             if self._suppressAlreadyCalled:
 4                 self._suppressAlreadyCalled = False
 5                 return  # set by the cancellation mechanism: this late result is dropped silently
 6             if self.debug:
 7                 if self._debugInfo is None:
 8                     self._debugInfo = DebugInfo()
 9                 extra = "\n" + self._debugInfo._getDebugTracebacks()
10                 raise AlreadyCalledError(extra)  # debug mode: include the recorded tracebacks
11             raise AlreadyCalledError  # firing a Deferred twice is an error
12         if self.debug:
13             if self._debugInfo is None:
14                 self._debugInfo = DebugInfo()
15             self._debugInfo.invoker = traceback.format_stack()[:-2]  # stack of the code that fired us
16         self.called = True
17         self.result = result
18         self._runCallbacks()  # process whatever callbacks are attached
  1     def _runCallbacks(self):
  2         """
  3         Run the chain of callbacks once a result is available.
  4 
  5         This consists of a simple loop over all of the callbacks, calling each
  6         with the current result and making the current result equal to the
  7         return value (or raised exception) of that call.
  8 
  9         If L{_runningCallbacks} is true, this loop won't run at all, since
 10         it is already running above us on the call stack.  If C{self.paused} is
 11         true, the loop also won't run, because that's what it means to be
 12         paused.
 13 
 14         The loop will terminate before processing all of the callbacks if a
 15         L{Deferred} without a result is encountered.
 16 
 17         If a L{Deferred} I{with} a result is encountered, that result is taken
 18         and the loop proceeds.
 19 
 20         @note: The implementation is complicated slightly by the fact that
 21             chaining (associating two L{Deferred}s with each other such that one
 22             will wait for the result of the other, as happens when a Deferred is
 23             returned from a callback on another L{Deferred}) is supported
 24             iteratively rather than recursively, to avoid running out of stack
 25             frames when processing long chains.
 26         """
 27         if self._runningCallbacks:
 28             # Don't recursively run callbacks
 29             return
 30 
 31         # Keep track of all the Deferreds encountered while propagating results
 32         # up a chain.  The way a Deferred gets onto this stack is by having
 33         # added its _continuation() to the callbacks list of a second Deferred
 34         # and then that second Deferred being fired.  ie, if ever had _chainedTo
 35         # set to something other than None, you might end up on this stack.
 36         chain = [self]
 37 
 38         while chain:
 39             current = chain[-1]
 40 
 41             if current.paused:
 42                 # This Deferred isn't going to produce a result at all.  All the
 43                 # Deferreds up the chain waiting on it will just have to...
 44                 # wait.
 45                 return
 46 
 47             finished = True
 48             current._chainedTo = None
 49             while current.callbacks:
 50                 item = current.callbacks.pop(0)  # next entry, oldest first
 51                 callback, args, kw = item[  # index 0 for a normal result, index 1 when the result is a Failure
 52                     isinstance(current.result, failure.Failure)]
 53                 args = args or ()
 54                 kw = kw or {}
 55 
 56                 # Avoid recursion if we can.
 57                 if callback is _CONTINUE:
 58                     # Give the waiting Deferred our current result and then
 59                     # forget about that result ourselves.
 60                     chainee = args[0]
 61                     chainee.result = current.result
 62                     current.result = None
 63                     # Making sure to update _debugInfo
 64                     if current._debugInfo is not None:
 65                         current._debugInfo.failResult = None
 66                     chainee.paused -= 1
 67                     chain.append(chainee)
 68                     # Delay cleaning this Deferred and popping it from the chain
 69                     # until after we've dealt with chainee.
 70                     finished = False
 71                     break
 72 
 73                 try:
 74                     current._runningCallbacks = True
 75                     try:
 76                         current.result = callback(current.result, *args, **kw)
 77                         if current.result is current:
 78                             warnAboutFunction(
 79                                 callback,
 80                                 "Callback returned the Deferred "
 81                                 "it was attached to; this breaks the "
 82                                 "callback chain and will raise an "
 83                                 "exception in the future.")
 84                     finally:
 85                         current._runningCallbacks = False
 86                 except:  # deliberately bare: whatever was raised becomes the Failure result below
 87                     # Including full frame information in the Failure is quite
 88                     # expensive, so we avoid it unless self.debug is set.
 89                     current.result = failure.Failure(captureVars=self.debug)
 90                 else:
 91                     if isinstance(current.result, Deferred):
 92                         # The result is another Deferred.  If it has a result,
 93                         # we can take it and keep going.
 94                         resultResult = getattr(current.result, 'result', _NO_RESULT)
 95                         if resultResult is _NO_RESULT or isinstance(resultResult, Deferred) or current.result.paused:
 96                             # Nope, it didn't.  Pause and chain.
 97                             current.pause()
 98                             current._chainedTo = current.result
 99                             # Note: current.result has no result, so it's not
100                             # running its callbacks right now.  Therefore we can
101                             # append to the callbacks list directly instead of
102                             # using addCallbacks.
103                             current.result.callbacks.append(current._continuation())
104                             break
105                         else:
106                             # Yep, it did.  Steal it.
107                             current.result.result = None
108                             # Make sure _debugInfo's failure state is updated.
109                             if current.result._debugInfo is not None:
110                                 current.result._debugInfo.failResult = None
111                             current.result = resultResult
112 
113             if finished:
114                 # As much of the callback chain - perhaps all of it - as can be
115                 # processed right now has been.  The current Deferred is waiting on
116                 # another Deferred or for more callbacks.  Before finishing with it,
117                 # make sure its _debugInfo is in the proper state.
118                 if isinstance(current.result, failure.Failure):
119                     # Stash the Failure in the _debugInfo for unhandled error
120                     # reporting.
121                     current.result.cleanFailure()
122                     if current._debugInfo is None:
123                         current._debugInfo = DebugInfo()
124                     current._debugInfo.failResult = current.result
125                 else:
126                     # Clear out any Failure in the _debugInfo, since the result
127                     # is no longer a Failure.
128                     if current._debugInfo is not None:
129                         current._debugInfo.failResult = None
130 
131                 # This Deferred is done, pop it from the chain and move back up
132                 # to the Deferred which supplied us with our result.
133                 chain.pop()  # the outer while loop then resumes the Deferred left on top of the stack

③ reactor

1 # 事件循环(不会自行结束:即使所有的 socket 都已移除,也必须调用 reactor.stop() 才会终止)
2 from twisted.internet import reactor
 1 # Copyright (c) Twisted Matrix Laboratories.
 2 # See LICENSE for details.
 3 
 4 """
 5 The reactor is the Twisted event loop within Twisted, the loop which drives
 6 applications using Twisted. The reactor provides APIs for networking,
 7 threading, dispatching events, and more.
 8 
 9 The default reactor depends on the platform and will be installed if this
10 module is imported without another reactor being explicitly installed
11 beforehand. Regardless of which reactor is installed, importing this module is
12 the correct way to get a reference to it.
13 
14 New application code should prefer to pass and accept the reactor as a
15 parameter where it is needed, rather than relying on being able to import this
16 module to get a reference.  This simplifies unit testing and may make it easier
17 to one day support multiple reactors (as a performance enhancement), though
18 this is not currently possible.
19 
20 @see: L{IReactorCore<twisted.internet.interfaces.IReactorCore>}
21 @see: L{IReactorTime<twisted.internet.interfaces.IReactorTime>}
22 @see: L{IReactorProcess<twisted.internet.interfaces.IReactorProcess>}
23 @see: L{IReactorTCP<twisted.internet.interfaces.IReactorTCP>}
24 @see: L{IReactorSSL<twisted.internet.interfaces.IReactorSSL>}
25 @see: L{IReactorUDP<twisted.internet.interfaces.IReactorUDP>}
26 @see: L{IReactorMulticast<twisted.internet.interfaces.IReactorMulticast>}
27 @see: L{IReactorUNIX<twisted.internet.interfaces.IReactorUNIX>}
28 @see: L{IReactorUNIXDatagram<twisted.internet.interfaces.IReactorUNIXDatagram>}
29 @see: L{IReactorFDSet<twisted.internet.interfaces.IReactorFDSet>}
30 @see: L{IReactorThreads<twisted.internet.interfaces.IReactorThreads>}
31 @see: L{IReactorPluggableResolver<twisted.internet.interfaces.IReactorPluggableResolver>}
32 """
33 
34 from __future__ import division, absolute_import
35 
36 import sys
37 del sys.modules['twisted.internet.reactor']  # NOTE(review): presumably frees this name so install() can register the real reactor - confirm
38 from twisted.internet import default
39 default.install()  # installs the platform-default reactor (see module docstring); not explored further here

实例:

 

  1 # 事件循环(终止条件,所有的socket都已经移除)
  2 from twisted.internet import reactor
  3 # socket对象(如果下载完成..自动从事件循环中移除)
  4 from twisted.web.client import getPage
  5 #defer.Deferred 特殊的socket对象(不发送请求,手动从事件循环移除)
  6 from twisted.internet import defer
  7 
  8 #基本流程分为以下三步(后面的第4~6步是在此基础上的扩展):
  9 #1.利用getPage创建socket
 10 #2.将socket添加到事件循环中
 11 #3.开始事件循环(内部发送请求,并接收响应;所有请求完成后需调用 reactor.stop() 终止事件循环,见第4步)
 12 
 13 #1.利用getPage创建socket
 14 # def response(content):
 15 #     print(content)
 16 #
 17 # def task():
 18 #     url = "http://www.baidu.com"
 19 #     d = getPage(url)
 20 #     d.addCallback(response) #响应后执行
 21 
 22 #2.将socket添加到事件循环中
 23 # def response(content):
 24 #     print(content)
 25 #
 26 # @defer.inlineCallbacks #分析源码
 27 # def task():
 28 #     url = "http://www.baidu.com"
 29 #     d = getPage(url.encode('utf-8')) #defer.Deferred()对象
 30 #     d.addCallback(response) #响应后执行
 31 #     yield d
 32 
 33 #3.开始事件循环(内部发送请求,并接收响应;此时尚无终止逻辑,事件循环不会自行结束,见第4步)
 34 # def response(content):
 35 #     print(content)
 36 #
 37 # @defer.inlineCallbacks #分析源码
 38 # def task():
 39 #     url = "http://www.baidu.com"
 40 #     d = getPage(url.encode('utf-8')) #defer.Deferred()对象
 41 #     d.addCallback(response) #响应后执行
 42 #     yield d
 43 #
 44 # task()
 45 # reactor.run()
 46 
 47 #4.增加事件循环终止
 48 # def response(content):
 49 #     print(content)
 50 #
 51 # def done(*args,**kwargs):
 52 #     reactor.stop()
 53 #
 54 # @defer.inlineCallbacks #分析源码
 55 # def task():
 56 #     url = "http://www.baidu.com"
 57 #     d = getPage(url.encode('utf-8')) #defer.Deferred()对象
 58 #     d.addCallback(response) #响应后执行
 59 #     yield d
 60 #
 61 # d = task()
 62 # dd = defer.DeferredList([d,])
 63 # dd.addBoth(done) #监听d是否完成,执行done函数
 64 #
 65 # reactor.run()
 66 
 67 #5.for实现并发功能
 68 # def response(content):
 69 #     print(content)
 70 #
 71 # def done(*args,**kwargs):
 72 #     reactor.stop() #终止事件循环
 73 #
 74 # @defer.inlineCallbacks #分析源码
 75 # def task():
 76 #     url = "http://www.baidu.com"
 77 #     d = getPage(url.encode('utf-8')) #defer.Deferred()对象
 78 #     d.addCallback(response) #响应后执行
 79 #     yield d
 80 #     # url = "http://www.baidu.com"
 81 #     # d = getPage(url.encode('utf-8'))  # defer.Deferred()对象
 82 #     # d.addCallback(response)  # 响应后执行
 83 #     # yield d
 84 #
 85 # li = []
 86 # for i in range(10):
 87 #     d = task() #仅仅只是返回socket对象,实现并发
 88 #     li.append(d)
 89 #
 90 # dd = defer.DeferredList(li)
 91 # dd.addBoth(done) #监听d是否完成,执行done函数
 92 #
 93 # reactor.run() #开始事件循环
 94 
 95 
 96 #6.增加defer.Deferred()挂起
 97 _close  = None
 98 count = 0
 99 
100 def response(content):
101     print(content)
102     # global count
103     # count += 1
104     # if count == 3:
105     #     _close.callback(None)  #终止defer.Deferred()
106 
107 @defer.inlineCallbacks
108 def task():
109     """
110     每个爬虫的开始:start_requests
111     :return:
112     """
113     url = "http://www.baidu.com"
114     d1 = getPage(url.encode('utf-8')) #defer对象
115     d1.addCallback(response) #响应后执行
116 
117     url = "http://www.cnblogs.com"
118     d2 = getPage(url.encode('utf-8'))  # defer对象
119     d2.addCallback(response)  # 响应后执行
120 
121     url = "http://www.bing.com"
122     d3 = getPage(url.encode('utf-8'))  # defer对象
123     d3.addCallback(response)  # 响应后执行
124 
125     global _close  #修改全局变量
126     _close = defer.Deferred()
127     yield _close  #需要手动终止
128 
129 def done(*args,**kwargs):
130     reactor.stop() #终止事件循环
131 
132 #两个爬虫
133 spider1 = task()
134 spider2 = task()
135 
136 dd = defer.DeferredList([spider1,spider2])
137 dd.addBoth(done)  #监听d是否完成,执行done函数
138 
139 reactor.run()

总结:

 1 # 1.特殊对象
 2 #     -d = getPage(url.encode('utf-8'))    返回 Deferred,下载完成后自动触发
 3 #     -d.addCallback(response)    下载成功后以页面内容调用 response
 4 #     -defer.Deferred()  _close.callback(None)  挂起,直到手动调用 callback 才结束
 5 #
 6 #
 7 # 2.defer.inlineCallbacks    装饰生成器函数:在 yield Deferred 处暂停,调用后返回一个 Deferred
 8 #
 9 # 3.reactor.callLater(0,函数名)    让事件循环在 0 秒后(尽快)调用该函数
10 #
11 # 4.事件循环的启动与终止
12 #     reactor.run()    启动事件循环
13 #     reactor.stop()    终止事件循环
14 #
15 # 5.等待多个 Deferred 全部完成后再终止事件循环
16 #     dd = defer.DeferredList([d1,d2])
17 #     dd.addBoth(lambda _:reactor.stop())