1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36 from errno import errorcode, EBADF, ECONNRESET, ENOTCONN, ESHUTDOWN, \
37 ECONNABORTED, EISCONN, EINPROGRESS, EALREADY, EWOULDBLOCK, \
38 EAGAIN
39 import select
40 import socket
41 import time
42
43 _reraised_exceptions = (KeyboardInterrupt, SystemExit)
44
45
47 """
48 Transport is responsible for managing the underlying socket, registering
49 for socket events, dispatching read, write, and errors to higher layers.
50 It is analogous to asyncore.dispatcher and is a drop in replacement for
51 most purposes.
52
53 If the alternate_poller is specified on instantiation, then the connection
54 will register for events on that poller as opposed to the global poller.
55 """
def __init__(self, alternate_poller=None):
    """
    set up a Transport that has no socket attached and is not connected

    alternate_poller, when given, is the poller this Transport uses;
    otherwise the module-level default poller is used
    """
    self.socket = None
    self._fileno = None
    self.addr = None
    self.connected = False
    # the module-level default is only looked up when no alternate is given
    self.poller = poller if alternate_poller is None else alternate_poller
65
67 """
68 Creates the underlying non-blocking socket and associates it with this
69 Transport's underlying poller
70 """
71 self.family_and_type = family, type
72 sock = socket.socket(family, type)
73 sock.setblocking(0)
74 self.set_socket(sock)
75
77 """
78 mirror the given Socket sock's file descriptor on the Transport and
79 register this Transport with the underlying poller
80 """
81 self.socket = sock
82 self._fileno = sock.fileno()
83 self.poller.add_channel(self)
84
86 """
87 begin establishing a connection to the (host, port) address tuple.
88
89 must call create_socket first. if the underlying socket is non-blocking
90 then this command may return before the connection is established.
91
92 higher level code should wait for the handle_connect event to signal
93 that the endpoint is successfully connected
94 """
95 self.connected = False
96 err = self.socket.connect_ex(address)
97 if err in (EINPROGRESS, EALREADY, EWOULDBLOCK):
98 return
99 if err in (0, EISCONN):
100 self.addr = address
101 self.handle_connect_event()
102 else:
103 raise socket.error(err, errorcode[err])
104
106 """
107 close the underlying socket connection and unregister this Transport
108 from the underlying poller
109 """
110 self.socket.close()
111 self.connected = False
112 self.poller.del_channel(self)
113
def send(self, data):
    """
    send data bytes over the connection. if the socket would block,
    schedule this Transport to be notified when the socket is available
    for writing. handle_write will be called in this case.

    returns the number of bytes sent or zero if the write would block

    raises socket.error for any failure other than a would-block condition
    """
    try:
        return self.socket.send(data)
    except socket.error as err:
        # EAGAIN and EWOULDBLOCK share a value on most platforms but not on
        # all of them, so both must be treated as "try again later"
        if err.errno not in (EAGAIN, EWOULDBLOCK):
            raise
        # ask the poller for a single write notification
        self.poller.defer_write(self)
        return 0
133
def recv(self, bufsize):
    """
    recv bufsize bytes over the connection. if the socket would block,
    then return an empty buffer. When the socket is available for reading
    handle_read will be called.

    returns a string representing the bytes received

    raises EOFError if the remote host closed the connection, and
    socket.error for any failure other than a would-block condition
    """
    try:
        data = self.socket.recv(bufsize)
    except socket.error as err:
        # EAGAIN and EWOULDBLOCK share a value on most platforms but not on
        # all of them, so both mean "nothing to read yet"
        if err.errno not in (EAGAIN, EWOULDBLOCK):
            raise
        # b'' is the same object type as '' on Python 2 and matches the
        # type socket.recv returns on Python 3
        return b''
    # an empty read means the peer closed; a truthiness test works for
    # both str (Python 2) and bytes (Python 3), unlike comparing with ''
    if not data:
        raise EOFError("Remote host closed connection")
    return data
152
154 """
155 called internally when the socket becomes connected
156 """
157 self.connected = True
158 self.handle_connect()
159
161 """
162 callback fired when connection is established
163 """
164 pass
165
167 """
168 callback fired when the socket has data available
169 """
170 pass
171
173 """
174 callback fired when the socket is available for writing
175
176 note: unlike asyncore, write notifications are not provided by default.
177 this is a performance optimization because the socket is usually
178 available for writing, and the application usually knows when it wants
179 to write. There is no point in filling the event queues with
180 write ready messages that will be ignored if the client has no data to
181 send.
182
183 Instead, applications are expected to implement handle_write, but to
184 call it directly when data is to be sent. IF the socket would block,
185 EAGAIN will be handled by the Transport. The Transport requests a
186 single write notification from the poller; when received, handle_write
187 will be called once signalling that the socket may now be ready to retry
188
189 If the application would prefer to be notified when the socket is ready
190 to write, transport.poller.defer_write(transport) may be called to
191 schedule a single handle_write callback.
192 """
193 pass
194
196 """
197 callback fired when the socket is closed
198 """
199 pass
200
202 """
203 callback fired if a non-recoverable exception is raised
204 """
205 pass
206
207
209 """
210 A poller is an underlying event monitoring system. This generic class
211 can be built upon to implement efficient file descriptor polling methods
212 which are available on various platforms.
213
214 A minimal subclass must implement the poll() function which performs a
215 single iteration of the event loop across all monitored Transports and
216 calls process_readables and process_writables with the correct values.
217
218 Subclasses should, in most cases, call into BasePoller methods in order
219 to maintain proper accounting structures. The exception is when the poller
220 handles accounting itself.
221 """
223 """
224 initialize the poller and register any kernel global structures
225 necessary to monitor the file descriptors
226 """
227 self.connections = {}
228 self.deferred_writers = set()
229
231 """
232 begin monitoring the transport socket for read/connect events
233
234 the underlying poller should not monitor Transports for writability
235 except when:
236 * the Transport's connection has not yet been established
237 * the Transport has been passed as an argument to defer_write
238 """
239 self.connections[transport._fileno] = transport
240 transport.poller = self
241
243 """
244 stop monitoring the transport socket
245 """
246 del self.connections[transport._fileno]
247
249 """
250 defers a write on the given transport. once the async poller determines
251 that the transport can be written to, handle_write will be called
252 """
253 self.deferred_writers.add(transport._fileno)
254
def loop(self, timeout=None, count=None):
    """
    enter the async event loop for the given timeout or number of iterations

    timeout is a duration in seconds, checked after each call to poll, so
    at least one poll happens whenever count allows it. count is the
    maximum number of poll iterations. with neither given, the loop runs
    until poll raises.
    """
    # a monotonic clock keeps the timeout immune to wall-clock adjustments;
    # fall back to time.time on interpreters that lack time.monotonic
    clock = getattr(time, "monotonic", time.time)
    deadline = None if timeout is None else clock() + timeout
    iterations = 0
    while count is None or iterations < count:
        self.poll()
        if deadline is not None and clock() > deadline:
            break
        iterations += 1
268
270 """
271 Must be implemented by subclasses to execute a single iteration of the
272 event loop. Based on the outcome of the events, the following actions
273 MUST be performed
274
275 * process_readables is called with a list of file descriptors which
276 have data available for reading
277 * process_writables is called with a list of file descriptors which
278 are available for writing
279 """
280 raise NotImplementedError("BasePoller does not have a polling mechanism")
281
283 """
284 call handle_read on each applicable fd in the readables sequence and
285 subsequently handle_error if any exception is raised or handle_close
286 if the underlying socket is no longer connected
287 """
288 for fileno in readables:
289 t = self.connections[fileno]
290 try:
291 t.handle_read()
292 except socket.error, e:
293 if e.args[0] not in (EBADF, ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED):
294 t.handle_error()
295 else:
296 t.handle_close()
297 except _reraised_exceptions:
298 raise
299 except:
300 t.handle_error()
301
303 """
304 for each Transport t corresponding to an fd in the writables sequence,
305 if t is not marked as connected, call handle_connect_event
306 otherwise call handle_write and remove the Transport from the set
307 of deferred writers
308 process close and error events if exception is encountered
309 """
310 for fileno in writables:
311 t = self.connections[fileno]
312 try:
313 if not t.connected:
314 t.handle_connect_event()
315 else:
316 if fileno in self.deferred_writers:
317 self.deferred_writers.remove(fileno)
318 t.handle_write()
319 except socket.error, e:
320 if e.args[0] not in (EBADF, ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED):
321 t.handle_error()
322 else:
323 t.handle_close()
324 except _reraised_exceptions:
325 raise
326 except:
327 t.handle_error()
328
329
331 """
332 Implementation of KQueue, available on Mac OS and BSD derivatives
333 """
338
340 super(KQueuePoller, self).add_channel(transport)
341 events = [select.kevent(transport._fileno,
342 filter=select.KQ_FILTER_READ,
343 flags=select.KQ_EV_ADD | select.KQ_EV_ENABLE),
344 select.kevent(transport._fileno,
345 filter=select.KQ_FILTER_WRITE,
346 flags=(select.KQ_EV_ADD |
347 select.KQ_EV_ENABLE |
348 select.KQ_EV_ONESHOT))]
349 self.kq.control(events, 0)
350
352 super(KQueuePoller, self).defer_write(transport)
353 events = [select.kevent(transport._fileno,
354 filter=select.KQ_FILTER_WRITE,
355 flags=(select.KQ_EV_ADD |
356 select.KQ_EV_ENABLE |
357 select.KQ_EV_ONESHOT))]
358 self.kq.control(events, 0)
359
361 events = self.kq.control(None, self.batch_size, 0)
362 readables = []
363 writables = []
364 for ev in events:
365 if ev.filter == select.KQ_FILTER_READ:
366 readables.append(ev.ident)
367 elif ev.filter == select.KQ_FILTER_WRITE:
368 writables.append(ev.ident)
369 self.process_readables(readables)
370 self.process_writables(writables)
371
372
374 """
375 Implementation of select, available on most platforms as a fallback.
376
377 Roughly equivalent performance to using asyncore
378 """
380 non_connected = [t._fileno for t in self.connections.values() if not t.connected]
381 readers = self.connections.keys()
382 writers = non_connected + list(self.deferred_writers)
383 readables, writables, _ = select.select(readers,
384 writers,
385 [], 0)
386 self.process_readables(readables)
387 self.process_writables(writables)
388
389
391 """
392 Implementation of poll, available on Linux
393 """
395 super(PollPoller, self).__init__()
396 self.p = select.poll()
397 self.read_events = (
398 select.POLLIN |
399 select.POLLERR |
400 select.POLLHUP |
401 select.POLLNVAL |
402 select.POLLMSG |
403 select.POLLPRI)
404 self.write_events = select.POLLOUT
405
411
415
421
423 events = self.p.poll(0)
424 readables = []
425 writables = []
426 for fd, event in events:
427 if event & self.read_events:
428 readables.append(fd)
429 elif event & self.write_events:
430 writables.append(fd)
431 self.p.modify(fd, self.read_events)
432 self.process_readables(readables)
433 self.process_writables(writables)
434
435
436
437
438
439
440
# Module-level default poller: kqueue when the select module offers it,
# then poll, and select as the fallback. Only the chosen class is
# instantiated.
poller = (
    KQueuePoller if hasattr(select, "kqueue")
    else PollPoller if hasattr(select, "poll")
    else SelectPoller
)()
447
448
449 -def loop(timeout=None, count=None):
451