Package flumotion :: Package component :: Package consumers :: Package httpstreamer :: Module resources
[hide private]

Source Code for Module flumotion.component.consumers.httpstreamer.resources

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_http -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import os 
 23  import socket 
 24  import time 
 25  import errno 
 26  import string 
 27  import resource 
 28  import fcntl 
 29   
 30  import gst 
 31   
 32  try: 
 33      from twisted.web import http 
 34  except ImportError: 
 35      from twisted.protocols import http 
 36   
 37  from twisted.web import server, resource as web_resource 
 38  from twisted.internet import reactor, defer 
 39  from twisted.python import reflect 
 40   
 41  from flumotion.configure import configure 
 42  from flumotion.common import errors 
 43   
 44  from flumotion.common import common, log, keycards 
 45   
 46  from flumotion.component.base import http as httpbase 
 47   
 48  __all__ = ['HTTPStreamingResource', 'MultifdSinkStreamer'] 
 49  __version__ = "$Rev: 7931 $" 
 50   
 51  HTTP_NAME = 'FlumotionHTTPServer' 
 52  HTTP_VERSION = configure.version 
 53   
 54  ERROR_TEMPLATE = """<!doctype html public "-//IETF//DTD HTML 2.0//EN"> 
 55  <html> 
 56  <head> 
 57    <title>%(code)d %(error)s</title> 
 58  </head> 
 59  <body> 
 60  <h2>%(code)d %(error)s</h2> 
 61  </body> 
 62  </html> 
 63  """ 
 64   
 65  HTTP_SERVER = '%s/%s' % (HTTP_NAME, HTTP_VERSION) 
 66   
 67  ### the Twisted resource that handles the base URL 
 68   
 69   
class HTTPStreamingResource(web_resource.Resource, log.Loggable):
    """
    Twisted resource that serves a live stream over HTTP.

    GET and HEAD requests are checked against readiness and server limits,
    authenticated through C{httpauth} and answered with headers written
    directly to the client socket.  For GET requests the socket's file
    descriptor is then taken away from the Twisted reactor and handed to
    the streamer with C{add_client}; removals are expected to be reported
    back through L{clientRemoved}.
    """

    __reserve_fds__ = 50 # number of fd's to reserve for non-streaming

    logCategory = 'httpstreamer'

    # IResource interface variable; True means it will not chain requests
    # further down the path to other resource providers through
    # getChildWithDefault
    isLeaf = True

    def __init__(self, streamer, httpauth):
        """
        @param streamer: L{MultifdSinkStreamer}
        @param httpauth: authentication helper; this class calls its
                         C{startAuthentication(request)} and
                         C{cleanupAuth(fd)} methods
        """
        self.streamer = streamer
        self.httpauth = httpauth

        self._requests = {} # request fd -> Request

        self.maxclients = self.getMaxAllowedClients(-1)
        self.maxbandwidth = -1 # not limited by default

        # If set, a URL to redirect a user to when the limits above are reached
        self._redirectOnFull = None

        self._removing = {} # Optional deferred notification of client removals

        # plug socket names; deliberately not called 'socket' so the
        # socket module is not shadowed
        plugSocket = 'flumotion.component.plugs.request.RequestLoggerPlug'
        self.loggers = streamer.plugs.get(plugSocket, [])

        plugSocket = \
            'flumotion.component.plugs.requestmodifier.RequestModifierPlug'
        self.modifiers = streamer.plugs.get(plugSocket, [])

        self.logfilter = None

        web_resource.Resource.__init__(self)

    def clientRemoved(self, sink, fd, reason, stats):
        """
        Handle the removal of a streaming client.

        This is the callback attached to our flumotion component, not the
        GStreamer element.

        @param sink:   not used here
        @param fd:     file descriptor of the removed client
        @param reason: not used here
        @param stats:  statistics for the client, passed on to logWrite
        """
        if fd in self._requests:
            request = self._requests[fd]
            self._removeClient(request, fd, stats)
        else:
            self.warning('[fd %5d] not found in _requests' % fd)

    def removeAllClients(self):
        """
        Start to remove all the clients connected (this will complete
        asynchronously from another thread).

        @returns: a DeferredList that fires once they're all removed.
        """
        l = []
        # Iterate over a snapshot: the removal path (clientRemoved ->
        # _removeClient) deletes entries from self._requests, which must
        # not happen while the dict itself is being iterated.
        for fd in list(self._requests):
            removeD = defer.Deferred()
            self._removing[fd] = removeD
            l.append(removeD)
            self.streamer.remove_client(fd)

        return defer.DeferredList(l)

    def setRoot(self, path):
        """Register this resource as its own child under C{path}."""
        self.putChild(path, self)

    def setLogFilter(self, logfilter):
        """
        Set the filter used by _logRequestFromIP; requests from IPs for
        which C{logfilter.isInRange(ip)} is true are not logged.
        """
        self.logfilter = logfilter

    def rotateLogs(self):
        """
        Close the logfile, then reopen using the previous logfilename
        (delegated to each request logger plug's rotate()).
        """
        for logger in self.loggers:
            self.debug('rotating logger %r' % logger)
            logger.rotate()

    def logWrite(self, fd, ip, request, stats):
        """
        Send an 'http_session_completed' event to every request logger.

        @param fd:      the client's file descriptor (not used here)
        @param ip:      the client's IP address
        @param request: the finished request
        @param stats:   client statistics, or a false value when none are
                        available; stats[0] is logged as bytes sent and
                        stats[3], divided by gst.SECOND, as seconds connected
        @returns: a DeferredList over all logger events
        """
        headers = request.getAllHeaders()

        if stats:
            bytes_sent = stats[0]
            time_connected = int(stats[3] / gst.SECOND)
        else:
            bytes_sent = -1
            time_connected = -1

        args = {'ip': ip,
                'time': time.gmtime(),
                'method': request.method,
                'uri': request.uri,
                'username': '-', # FIXME: put the httpauth name
                'get-parameters': request.args,
                'clientproto': request.clientproto,
                'response': request.code,
                'bytes-sent': bytes_sent,
                'referer': headers.get('referer', None),
                'user-agent': headers.get('user-agent', None),
                'time-connected': time_connected}

        l = [defer.maybeDeferred(logger.event, 'http_session_completed', args)
             for logger in self.loggers]

        return defer.DeferredList(l)

    def setUserLimit(self, limit):
        """Set the maximum number of clients (see getMaxAllowedClients)."""
        self.info('setting maxclients to %d' % limit)
        self.maxclients = self.getMaxAllowedClients(limit)
        # Log what we actually managed to set it to.
        self.info('set maxclients to %d' % self.maxclients)

    def setBandwidthLimit(self, limit):
        """Set the bandwidth limit; a negative value means unlimited."""
        self.maxbandwidth = limit
        self.info("set maxbandwidth to %d", self.maxbandwidth)

    def setRedirectionOnLimits(self, url):
        """Redirect clients to C{url} instead of refusing them when full."""
        self._redirectOnFull = url

    # FIXME: rename to writeHeaders

    def _writeHeaders(self, request):
        """
        Write out the HTTP headers for the incoming HTTP request.

        The headers are written straight to the client socket with
        os.write(), bypassing Twisted; request.startedWriting is then set so
        Twisted does not write headers again.

        @rtype: boolean
        @returns: whether or not the file descriptor can be used further.
        """
        fd = request.transport.fileno()
        fdi = request.fdIncoming

        # the fd could have been closed, in which case it will be -1
        if fd == -1:
            self.info('[fd %5d] Client gone before writing header' % fdi)
            return False
        if fd != fdi:
            self.warning('[fd %5d] does not match current fd %d' % (fdi, fd))
            return False

        content = self.streamer.get_content_type()
        request.setHeader('Server', HTTP_SERVER)
        request.setHeader('Date', http.datetimeToString())
        # setHeader() replaces any earlier value for the same header, so
        # both cache directives must be sent as a single header value.
        request.setHeader('Cache-Control', 'no-cache, private')
        request.setHeader('Content-type', content)

        # Call request modifiers
        for modifier in self.modifiers:
            modifier.modify(request)

        # Mimic Twisted as close as possible
        headers = ['%s: %s\r\n' % (name.capitalize(), value)
                   for name, value in request.headers.items()]
        headers.extend(['Set-Cookie: %s\r\n' % cookie
                        for cookie in request.cookies])

        # ASF would want a 'Pragma: features=broadcast' header for live
        # broadcasts, but apparently ASF breaks on WMP port 80 if that
        # header is used (noted Sep 5 2006), so it is deliberately not sent.

        ### FIXME: there's a window where Twisted could have removed the
        # fd because the client disconnected. Catch EBADF correctly here.
        try:
            # TODO: This is a non-blocking socket, we really should check
            # return values here, or just let twisted handle all of this
            # normally, and not hand off the fd until after twisted has
            # finished writing the headers.
            os.write(fd, 'HTTP/1.0 200 OK\r\n%s\r\n' % ''.join(headers))
        except OSError:
            import sys
            e = sys.exc_info()[1]
            if e.errno == errno.EBADF:
                self.info('[fd %5d] client gone before writing header' % fd)
            elif e.errno == errno.ECONNRESET:
                self.info(
                    '[fd %5d] client reset connection writing header' % fd)
            else:
                self.info(
                    '[fd %5d] unhandled write error when writing header: %s'
                    % (fd, e.strerror))
            return False

        # tell TwistedWeb we already wrote headers ourselves
        request.startedWriting = True
        return True

    def isReady(self):
        """Return whether the streamer has caps and can serve clients."""
        if self.streamer.caps is None:
            self.debug('We have no caps yet')
            return False

        return True

    def getMaxAllowedClients(self, maxclients):
        """
        Maximum number of allowed clients based on soft limit for number of
        open file descriptors and fd reservation. Increases soft limit to
        hard limit if possible.

        @param maxclients: requested number of clients, or -1 to derive the
                           number from the current soft limit
        @rtype: int
        """
        (softmax, hardmax) = resource.getrlimit(resource.RLIMIT_NOFILE)
        import sys
        version = sys.version_info

        if maxclients == -1:
            return softmax - self.__reserve_fds__

        neededfds = maxclients + self.__reserve_fds__

        # Bug in python 2.4.3, see
        # http://sourceforge.net/tracker/index.php?func=detail&
        #   aid=1494314&group_id=5470&atid=105470
        if version[:3] == (2, 4, 3) and \
           not hasattr(socket, "has_2_4_3_patch"):
            self.warning(
                'Setting hardmax to 1024 due to python 2.4.3 bug')
            hardmax = 1024

        # RLIM_INFINITY can be reported as -1, so it has to be checked
        # explicitly rather than relying on numeric comparisons.
        if softmax == resource.RLIM_INFINITY or neededfds <= softmax:
            # the current soft limit already allows this many clients
            return maxclients

        if hardmax == resource.RLIM_INFINITY:
            lim = neededfds
        else:
            lim = min(neededfds, hardmax)
        resource.setrlimit(resource.RLIMIT_NOFILE, (lim, hardmax))
        return lim - self.__reserve_fds__

    def reachedServerLimits(self):
        """
        Return whether a new client must be refused because of the client
        limit or the bandwidth limit (negative limits are disabled).
        """
        if self.maxclients >= 0 and len(self._requests) >= self.maxclients:
            return True
        elif self.maxbandwidth >= 0:
            # Reject if adding one more client would reach or exceed the
            # limit.
            if ((len(self._requests) + 1) *
                self.streamer.getCurrentBitrate() >= self.maxbandwidth):
                return True
        return False

    def _addClient(self, request):
        """
        Add a request, so it can be used for statistics.

        @param request: the request
        @type request: twisted.protocol.http.Request
        """
        fd = request.transport.fileno()
        self._requests[fd] = request

    def _logRequestFromIP(self, ip):
        """
        Returns whether we want to log a request from this IP; allows us to
        filter requests from automated monitoring systems.
        """
        if self.logfilter:
            return not self.logfilter.isInRange(ip)
        else:
            return True

    def _removeClient(self, request, fd, stats):
        """
        Removes a request and add logging.
        Note that it does not disconnect the client; it is called in reaction
        to a client disconnecting.
        It also removes the keycard if one was created.

        @param request: the request
        @type request: L{twisted.protocols.http.Request}
        @param fd: the file descriptor for the client being removed
        @type fd: L{int}
        @param stats: the statistics for the removed client
        @type stats: GValueArray
        @returns: a Deferred firing once logging is done and any pending
                  removeAllClients notification for this fd has fired
        """
        # PROBE: finishing request; see httpserver.httpserver
        self.debug('[fd %5d] (ts %f) finishing request %r',
                   request.transport.fileno(), time.time(), request)

        ip = request.getClientIP()
        if self._logRequestFromIP(ip):
            d = self.logWrite(fd, ip, request, stats)
        else:
            d = defer.succeed(True)
        self.info('[fd %5d] Client from %s disconnected' % (fd, ip))

        # We can't call request.finish(), since we already "stole" the fd, we
        # just loseConnection on the transport directly, and delete the
        # Request object, after cleaning up the bouncer bits.
        self.httpauth.cleanupAuth(fd)

        self.debug('[fd %5d] (ts %f) closing transport %r', fd, time.time(),
                   request.transport)
        # This will close the underlying socket. We first remove the request
        # from our fd->request map, since the moment we call this the fd might
        # get re-added.
        del self._requests[fd]
        request.transport.loseConnection()

        self.debug('[fd %5d] closed transport %r' % (fd, request.transport))

        def _done(_):
            if fd in self._removing:
                self.debug("client is removed; firing deferred")
                removeD = self._removing.pop(fd)
                removeD.callback(None)

            # PROBE: finished request; see httpserver.httpserver
            self.debug('[fd %5d] (ts %f) finished request %r',
                       fd, time.time(), request)

        d.addCallback(_done)
        return d

    def handleAuthenticatedRequest(self, res, request):
        """
        Callback run once C{request} has been authenticated.

        GET requests become streaming clients; HEAD requests only get the
        headers and are finished.  Any other method raises AssertionError.

        @returns: C{res}, unchanged
        """
        # PROBE: authenticated request; see httpserver.httpfile
        self.debug('[fd %5d] (ts %f) authenticated request %r',
                   request.transport.fileno(), time.time(), request)

        if request.method == 'GET':
            self._handleNewClient(request)
        elif request.method == 'HEAD':
            self.debug('handling HEAD request')
            self._writeHeaders(request)
            request.finish()
        else:
            raise AssertionError

        return res

    ### resource.Resource methods

    # this is the callback receiving the request initially

    def _render(self, request):
        """
        Entry point for GET and HEAD requests.

        @returns: server.NOT_DONE_YET, or the HTML error body when the
                  server limits have been reached
        """
        fd = request.transport.fileno()
        # we store the fd again in the request using it as an id for later
        # on, so we can check when an fd went away (being -1) inbetween
        request.fdIncoming = fd

        # PROBE: incoming request; see httpserver.httpfile
        self.debug('[fd %5d] (ts %f) incoming request %r',
                   fd, time.time(), request)

        self.info('[fd %5d] Incoming client connection from %s' % (
            fd, request.getClientIP()))
        self.debug('[fd %5d] _render(): request %s' % (
            fd, request))

        if not self.isReady():
            return self._handleNotReady(request)
        elif self.reachedServerLimits():
            return self._handleServerFull(request)

        self.debug('_render(): asked for (possible) authentication')
        d = self.httpauth.startAuthentication(request)
        d.addCallback(self.handleAuthenticatedRequest, request)

        def _failed(failure):
            # Authentication has failed and we've written a response; nothing
            # more to do. The failure is still logged so that unexpected
            # errors from the callback chain are not silently lost.
            self.debug('[fd %5d] request not served: %r', fd, failure)
            return None
        d.addErrback(_failed)

        # we MUST return this from our _render.
        return server.NOT_DONE_YET

    def _handleNotReady(self, request):
        """
        Answer a request that arrived before the streamer has caps.

        NOTE(review): the request is left pending and never finished here;
        confirm that is the intended behaviour.
        """
        self.debug('Not sending data, it\'s not ready')
        return server.NOT_DONE_YET

    def _handleServerFull(self, request):
        """
        Answer a request that arrived while the server is at its limits.

        Redirects (302 Found) to the configured URL if one is set, otherwise
        refuses the client with 503 Service Unavailable.

        @returns: the HTML error body
        """
        if self._redirectOnFull:
            self.debug("Redirecting client, client limit %d reached",
                       self.maxclients)
            error_code = http.FOUND
            request.setHeader('location', self._redirectOnFull)
        else:
            self.debug('Refusing clients, client limit %d reached' %
                       self.maxclients)
            error_code = http.SERVICE_UNAVAILABLE

        request.setHeader('content-type', 'text/html')

        # same '<name>/<version>' Server value as the streaming responses
        request.setHeader('server', HTTP_SERVER)
        request.setResponseCode(error_code)

        return ERROR_TEMPLATE % {'code': error_code,
                                 'error': http.RESPONSES[error_code]}

    def _handleNewClient(self, request):
        """
        Serve an authenticated GET request: write the headers, register the
        request and hand its file descriptor over to the streamer.
        """
        # everything fulfilled, serve to client
        fdi = request.fdIncoming
        if not self._writeHeaders(request):
            self.debug("[fd %5d] not adding as a client" % fdi)
            return
        self._addClient(request)

        # take over the file descriptor from Twisted by removing them from
        # the reactor
        # spiv told us to remove* on request.transport, and that works
        # then we figured out that a new request is only a Reader, so we
        # remove the removedWriter - this is because we never write to the
        # socket through twisted, only with direct os.write() calls from
        # _writeHeaders.

        # see http://twistedmatrix.com/trac/ticket/1796 for a guarantee
        # that this is a supported way of stealing the socket
        fd = fdi
        self.debug("[fd %5d] taking away from Twisted" % fd)
        reactor.removeReader(request.transport)

        # check if it's really an open fd (i.e. that twisted didn't close it
        # before the removeReader() call)
        try:
            fcntl.fcntl(fd, fcntl.F_GETFL)
        except IOError:
            import sys
            e = sys.exc_info()[1]
            if e.errno == errno.EBADF:
                self.warning("[fd %5d] is not actually open, ignoring" % fd)
            else:
                self.warning("[fd %5d] error during check: %s (%d)" % (
                    fd, e.strerror, e.errno))
            # The streamer never receives this fd, so clientRemoved will not
            # fire for it; forget the request here, otherwise it stays in
            # self._requests forever and counts against maxclients.
            self._requests.pop(fd, None)
            return

        # hand it to multifdsink
        self.streamer.add_client(fd)
        ip = request.getClientIP()

        # PROBE: started request; see httpfile.httpfile
        self.debug('[fd %5d] (ts %f) started request %r',
                   fd, time.time(), request)

        self.info('[fd %5d] Started streaming to %s' % (fd, ip))

    render_GET = _render
    render_HEAD = _render
class HTTPRoot(web_resource.Resource, log.Loggable):
    """
    Root resource that looks up children by the full remaining request path.
    """
    logCategory = "httproot"

    def getChildWithDefault(self, path, request):
        """
        Look up the child registered under the full remaining request path.

        We override this method so that we can look up tree resources
        directly without having their parents: the current segment is joined
        with request.postpath, so a child registered under 'a/b' is found
        for a request on /a/b.
        There's probably a more Twisted way of doing this, but ...

        @param path:    the current path segment
        @param request: the request being dispatched
        @returns: the result of web_resource.Resource.getChildWithDefault
                  for the joined path
        """
        fullPath = path
        if request.postpath:
            # str.join replaces the deprecated string.join function
            fullPath += '/' + '/'.join(request.postpath)
        self.debug("[fd %5d] Incoming request %r for path %s",
                   request.transport.fileno(), request, fullPath)
        r = web_resource.Resource.getChildWithDefault(self, fullPath, request)
        self.debug("Returning resource %r", r)
        return r