Package pike :: Module transport
[hide private]
[frames] | no frames]

Source Code for Module pike.transport

  1  # 
  2  # Copyright (c) 2016, Dell Technologies 
  3  # All rights reserved. 
  4  # 
  5  # Redistribution and use in source and binary forms, with or without 
  6  # modification, are permitted provided that the following conditions are met: 
  7  # 
  8  # 1. Redistributions of source code must retain the above copyright notice, 
  9  # this list of conditions and the following disclaimer. 
 10  # 2. Redistributions in binary form must reproduce the above copyright notice, 
 11  # this list of conditions and the following disclaimer in the documentation 
 12  # and/or other materials provided with the distribution. 
 13  # 
 14  # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 
 15  # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
 16  # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
 17  # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 
 18  # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
 19  # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
 20  # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
 21  # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
 22  # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 
 23  # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
 24  # POSSIBILITY OF SUCH DAMAGE. 
 25  # 
 26  # Module Name: 
 27  # 
 28  #        transport.py 
 29  # 
 30  # Abstract: 
 31  # 
 32  #        Async event loop, socket handling, and polling mechanisms 
 33  # 
 34  # Authors: Masen Furer (masen.furer@dell.com) 
 35  # 
 36  from errno import errorcode, EBADF, ECONNRESET, ENOTCONN, ESHUTDOWN, \ 
 37                    ECONNABORTED, EISCONN, EINPROGRESS, EALREADY, EWOULDBLOCK, \ 
 38                    EAGAIN 
 39  import select 
 40  import socket 
 41  import time 
 42   
 43  _reraised_exceptions = (KeyboardInterrupt, SystemExit) 
 44   
 45   
46 -class Transport(object):
47 """ 48 Transport is responsible for managing the underlying socket, registering 49 for socket events, dispatching read, write, and errors to higher layers. 50 It is analogous to asyncore.dispatcher and is a drop in replacement for 51 most purposes. 52 53 If the alternate_poller is specified on instantiation, then the connection 54 will register for events on that poller as opposed to the global poller. 55 """
56 - def __init__(self, alternate_poller=None):
57 self.addr = None 58 self.connected = False 59 self.socket = None 60 self._fileno = None 61 if alternate_poller is not None: 62 self.poller = alternate_poller 63 else: 64 self.poller = poller # global poller
65
66 - def create_socket(self, family, type):
67 """ 68 Creates the underlying non-blocking socket and associates it with this 69 Transport's underlying poller 70 """ 71 self.family_and_type = family, type 72 sock = socket.socket(family, type) 73 sock.setblocking(0) 74 self.set_socket(sock)
75
76 - def set_socket(self, sock):
77 """ 78 mirror the given Socket sock's file descriptor on the Transport and 79 register this Transport with the underlying poller 80 """ 81 self.socket = sock 82 self._fileno = sock.fileno() 83 self.poller.add_channel(self)
84
85 - def connect(self, address):
86 """ 87 begin establishing a connection to the (host, port) address tuple. 88 89 must call create_socket first. if the underlying socket is non-blocking 90 then this command may return before the connection is established. 91 92 higher level code should wait for the handle_connect event to signal 93 that the endpoint is successfully connected 94 """ 95 self.connected = False 96 err = self.socket.connect_ex(address) 97 if err in (EINPROGRESS, EALREADY, EWOULDBLOCK): 98 return 99 if err in (0, EISCONN): 100 self.addr = address 101 self.handle_connect_event() 102 else: 103 raise socket.error(err, errorcode[err])
104
105 - def close(self):
106 """ 107 close the underlying socket connection and unregister this Transport 108 from the underlying poller 109 """ 110 self.socket.close() 111 self.connected = False 112 self.poller.del_channel(self)
113
114 - def send(self, data):
115 """ 116 send data bytes over the connection. if the socket would block, 117 schedule this Transport to be notified when the socket is available 118 for writing. handle_write will be called in this case. 119 120 returns the number of bytes sent or zero if the write would block 121 """ 122 result = 0 123 try: 124 result = self.socket.send(data) 125 except socket.error as err: 126 if err.errno == EAGAIN: 127 # reschedule the send when the socket is ready 128 self.poller.defer_write(self) 129 else: 130 # raise non-retryable errors 131 raise 132 return result
133
134 - def recv(self, bufsize):
135 """ 136 recv bufsize bytes over the connection. if the socket would block, 137 then return an empty buffer. When the socket is available for reading 138 handle_read will be called. 139 140 returns a string representing the bytes received 141 """ 142 result = '' 143 try: 144 result = self.socket.recv(bufsize) 145 if result == '': 146 raise EOFError("Remote host closed connection") 147 except socket.error as err: 148 # raise non-retryable errors 149 if err.errno != EAGAIN: 150 raise 151 return result
152
153 - def handle_connect_event(self):
154 """ 155 called internally when the socket becomes connected 156 """ 157 self.connected = True 158 self.handle_connect()
159
160 - def handle_connect(self):
161 """ 162 callback fired when connection is established 163 """ 164 pass
165
166 - def handle_read(self):
167 """ 168 callback fired when the socket has data available 169 """ 170 pass
171
172 - def handle_write(self):
173 """ 174 callback fired when the socket is available for writing 175 176 note: unlike asyncore, write notifications are not provided by default. 177 this is a performance optimization because the socket is usually 178 available for writing, and the application usually knows when it wants 179 to write. There is no point in filling the event queues with 180 write ready messages that will be ignored if the client has no data to 181 send. 182 183 Instead, applications are expected to implement handle_write, but to 184 call it directly when data is to be sent. IF the socket would block, 185 EALREADY will be handled by the Transport. The Transport requests a 186 single write notification from the pollwer; when received, handle_write 187 will be called once signalling that the socket may now be ready to retry 188 189 If the application would prefer to be notified when the socket is ready 190 to write, transport.poller.defer_write(transport) may be called to 191 schedule a single handle_write callback. 192 """ 193 pass
194
195 - def handle_close(self):
196 """ 197 callback fired when the socket is closed 198 """ 199 pass
200
201 - def handle_error(self):
202 """ 203 callback fired if a non-recoverable exception is raised 204 """ 205 pass
206 207
208 -class BasePoller(object):
209 """ 210 A poller is an underlying event monitoring system. This generic class 211 can be built upon to implement efficient file descriptor polling methods 212 which are available on various platforms. 213 214 A minimal subclass must implement the poll() function which performs a 215 single iteration of the event loop across all monitored Transports and 216 calls process_readables and process_writables with the correct values. 217 218 Subclasses should, in most cases call, into BasePoller methods in order 219 to maintain proper accounting structures. The exception is when the poller 220 handles accounting itself. 221 """
222 - def __init__(self):
223 """ 224 initialize the poller and register any kernel global structures 225 necessary to monitor the file descriptors 226 """ 227 self.connections = {} 228 self.deferred_writers = set()
229
230 - def add_channel(self, transport):
231 """ 232 begin monitoring the transport socket for read/connect events 233 234 the underlying poller should not monitor Transports for writability 235 except when: 236 * the Transport's connection has not yet been established 237 * the Transport has been passed as an argument to defer_write 238 """ 239 self.connections[transport._fileno] = transport 240 transport.poller = self
241
242 - def del_channel(self, transport):
243 """ 244 stop monitoring the transport socket 245 """ 246 del self.connections[transport._fileno]
247
248 - def defer_write(self, transport):
249 """ 250 defers a write on the given transport. once the async poller determines 251 that the transport can be written to, handle_write will be called 252 """ 253 self.deferred_writers.add(transport._fileno)
254
255 - def loop(self, timeout=None, count=None):
256 """ 257 enter the async event loop for the given timeout or number of iterations 258 """ 259 start = time.time() 260 complete_iterations = 0 261 while True: 262 if count is not None and complete_iterations >= count: 263 break 264 self.poll() 265 if timeout is not None and time.time() > start + timeout: 266 break 267 complete_iterations += 1
268
269 - def poll(self):
270 """ 271 Must be implemented by subclasses to execute a single iteration of the 272 event loop. Based on the outcome of the events, the following actions 273 MUST be performed 274 275 * process_readables is called with a list of file descriptors which 276 have data available for reading 277 * process_writables is called with a list of file descriptors which 278 have data available for writing 279 """ 280 raise NotImplementedError("BasePoller does not have a polling mechanism")
281
282 - def process_readables(self, readables):
283 """ 284 call handle_read on each applicable fd in the readables sequence and 285 subsequently handle_error if any exception is raised or handle_close 286 if the underlying socket is no longer connected 287 """ 288 for fileno in readables: 289 t = self.connections[fileno] 290 try: 291 t.handle_read() 292 except socket.error, e: 293 if e.args[0] not in (EBADF, ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED): 294 t.handle_error() 295 else: 296 t.handle_close() 297 except _reraised_exceptions: 298 raise 299 except: 300 t.handle_error()
301
302 - def process_writables(self, writables):
303 """ 304 for each Transport t corresponding to an fd in the writables sequence, 305 if t is not marked as connected, call handle_connect_event 306 otherwise call handle_write and remove the Transport from the set 307 of deferred writers 308 process close and error events if exception is encountered 309 """ 310 for fileno in writables: 311 t = self.connections[fileno] 312 try: 313 if not t.connected: 314 t.handle_connect_event() 315 else: 316 if fileno in self.deferred_writers: 317 self.deferred_writers.remove(fileno) 318 t.handle_write() 319 except socket.error, e: 320 if e.args[0] not in (EBADF, ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED): 321 t.handle_error() 322 else: 323 t.handle_close() 324 except _reraised_exceptions: 325 raise 326 except: 327 t.handle_error()
328 329
330 -class KQueuePoller(BasePoller):
331 """ 332 Implementation of KQueue, available on Mac OS and BSD derivatives 333 """
334 - def __init__(self):
335 super(KQueuePoller, self).__init__() 336 self.kq = select.kqueue() 337 self.batch_size = 10
338
339 - def add_channel(self, transport):
340 super(KQueuePoller, self).add_channel(transport) 341 events = [select.kevent(transport._fileno, 342 filter=select.KQ_FILTER_READ, 343 flags=select.KQ_EV_ADD | select.KQ_EV_ENABLE), 344 select.kevent(transport._fileno, 345 filter=select.KQ_FILTER_WRITE, 346 flags=(select.KQ_EV_ADD | 347 select.KQ_EV_ENABLE | 348 select.KQ_EV_ONESHOT))] 349 self.kq.control(events, 0)
350
351 - def defer_write(self, transport):
352 super(KQueuePoller, self).defer_write(transport) 353 events = [select.kevent(transport._fileno, 354 filter=select.KQ_FILTER_WRITE, 355 flags=(select.KQ_EV_ADD | 356 select.KQ_EV_ENABLE | 357 select.KQ_EV_ONESHOT))] 358 self.kq.control(events, 0)
359
360 - def poll(self):
361 events = self.kq.control(None, self.batch_size, 0) 362 readables = [] 363 writables = [] 364 for ev in events: 365 if ev.filter == select.KQ_FILTER_READ: 366 readables.append(ev.ident) 367 elif ev.filter == select.KQ_FILTER_WRITE: 368 writables.append(ev.ident) 369 self.process_readables(readables) 370 self.process_writables(writables)
371 372
373 -class SelectPoller(BasePoller):
374 """ 375 Implementation of select, available on most platforms as a fallback. 376 377 Roughly equivalent performance to using asyncore 378 """
379 - def poll(self):
380 non_connected = [t._fileno for t in self.connections.values() if not t.connected] 381 readers = self.connections.keys() 382 writers = non_connected + list(self.deferred_writers) 383 readables, writables, _ = select.select(readers, 384 writers, 385 [], 0) 386 self.process_readables(readables) 387 self.process_writables(writables)
388 389
390 -class PollPoller(BasePoller):
391 """ 392 Implementation of poll, available on Linux 393 """
394 - def __init__(self):
395 super(PollPoller, self).__init__() 396 self.p = select.poll() 397 self.read_events = ( 398 select.POLLIN | 399 select.POLLERR | 400 select.POLLHUP | 401 select.POLLNVAL | 402 select.POLLMSG | 403 select.POLLPRI) 404 self.write_events = select.POLLOUT
405
406 - def add_channel(self, transport):
407 super(PollPoller, self).add_channel(transport) 408 self.p.register( 409 transport._fileno, 410 self.read_events | self.write_events)
411
412 - def del_channel(self, transport):
413 super(PollPoller, self).del_channel(transport) 414 self.p.unregister(transport._fileno)
415
416 - def defer_write(self, transport):
417 super(PollPoller, self).defer_write(transport) 418 self.p.modify( 419 transport._fileno, 420 self.read_events | self.write_events)
421
422 - def poll(self):
423 events = self.p.poll(0) 424 readables = [] 425 writables = [] 426 for fd, event in events: 427 if event & self.read_events: 428 readables.append(fd) 429 elif event & self.write_events: 430 writables.append(fd) 431 self.p.modify(fd, self.read_events) 432 self.process_readables(readables) 433 self.process_writables(writables)
434 435 436 # Global poller / loop function for simple use cases 437 # more advanced tests or frameworks may use a custom 438 # poller implementation by setting a poller object onto 439 # a group of transports. 440 # The global poller will use the best polling mechanism available on the system 441 if hasattr(select, "kqueue"): 442 poller = KQueuePoller() 443 elif hasattr(select, "poll"): 444 poller = PollPoller() 445 else: 446 poller = SelectPoller() 447 448
449 -def loop(timeout=None, count=None):
450 poller.loop(timeout, count)
451