
Source Code for Module flumotion.component.misc.httpserver.httpfile

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_misc_httpserver -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import string 
 23  import time 
 24   
 25  # mp4seek is a library to split MP4 files, see the MP4File class docstring 
 26  HAS_MP4SEEK = False 
 27  try: 
 28      import mp4seek.async 
 29      HAS_MP4SEEK = True 
 30  except ImportError: 
 31      pass 
 32   
 33  from twisted.web import resource, server, http 
 34  from twisted.web import error as weberror, static 
 35  from twisted.internet import defer, reactor, error, abstract 
 36  from twisted.cred import credentials 
 37  from twisted.python.failure import Failure 
 38   
 39  from flumotion.configure import configure 
 40  from flumotion.component import component 
 41  from flumotion.common import log, messages, errors, netutils 
 42  from flumotion.component.component import moods 
 43  from flumotion.component.misc.porter import porterclient 
 44  from flumotion.component.misc.httpserver import fileprovider 
 45  from flumotion.component.base import http as httpbase 
 46  from flumotion.twisted import fdserver 
 47   
 48  __version__ = "$Rev$" 
 49   
 50  LOG_CATEGORY = "httpserver" 
 51   
 52  try: 
 53      resource.ErrorPage 
 54      errorpage = resource 
 55  except AttributeError: 
 56      errorpage = weberror 
 57   
 58   
 59  class BadRequest(errorpage.ErrorPage):
 60      """
 61      Web error for invalid requests
 62      """
 63
 64      def __init__(self, message="Invalid request format"):
 65          errorpage.ErrorPage.__init__(self, http.BAD_REQUEST,
 66                                       "Bad Request", message)
 67
 68
 69  class InternalServerError(errorpage.ErrorPage):
 70      """
 71      Web error for internal failures
 72      """
 73
 74      def __init__(self, message="The server failed to complete the request"):
 75          errorpage.ErrorPage.__init__(self, http.INTERNAL_SERVER_ERROR,
 76                                       "Internal Server Error", message)
 77
 78
 79  class ServiceUnavailableError(errorpage.ErrorPage):
 80      """
 81      Web error for when the request cannot be served.
 82      """
 83
 84      def __init__(self, message="The server is currently unable to handle "
 85                                 "the request due to a temporary overloading "
 86                                 "or maintenance of the server"):
 87          errorpage.ErrorPage.__init__(self, http.SERVICE_UNAVAILABLE,
 88                                       "Service Unavailable", message)
 89
 90
 91  class File(resource.Resource, log.Loggable):
 92      """
 93      This file is inspired by/adapted from twisted.web.static.
 94      """
 95
 96      logCategory = LOG_CATEGORY
 97
 98      defaultType = "application/octet-stream"
 99
100      childNotFound = errorpage.NoResource("File not found.")
101      forbiddenResource = errorpage.ForbiddenResource("Access forbidden")
102      badRequest = BadRequest()
103      internalServerError = InternalServerError()
104      serviceUnavailable = ServiceUnavailableError()
105
106      def __init__(self, path, httpauth,
107                   mimeToResource=None,
108                   rateController=None,
109                   requestModifiers=None,
110                   metadataProvider=None):
111          resource.Resource.__init__(self)
112
113          self._path = path
114          self._httpauth = httpauth
115          # mapping of mime type -> File subclass
116          self._mimeToResource = mimeToResource or {}
117          self._rateController = rateController
118          self._metadataProvider = metadataProvider
119          self._requestModifiers = requestModifiers or []
120          self._factory = MimedFileFactory(httpauth, self._mimeToResource,
121                                           rateController=rateController,
122                                           metadataProvider=metadataProvider,
123                                           requestModifiers=requestModifiers)
124
125      def getChild(self, path, request):
126          self.log('getChild: self %r, path %r', self, path)
127          # we handle a request ending in '/' as well; this is how those come in
128          if path == '':
129              return self
130
131          try:
132              child = self._path.child(path)
133          except fileprovider.NotFoundError:
134              return self.childNotFound
135          except fileprovider.AccessError:
136              return self.forbiddenResource
137          except fileprovider.InsecureError:
138              return self.badRequest
139
140          return self._factory.create(child)
141
142      def render(self, request):
143          """
144          The request gets rendered by asking the httpauth object for
145          authentication, which returns a deferred.
146          This deferred will callback when the request gets authenticated.
147          """
148
149          # PROBE: incoming request; see httpstreamer.resources
150          self.debug('[fd %5d] (ts %f) incoming request %r',
151                     request.transport.fileno(), time.time(), request)
152          # Different headers not normally set in static.File...
153          # Specify that we will close the connection after this request, and
154          # that the client must not issue further requests.
155          # We do this because future requests on this server might actually need
156          # to go to a different process (because of the porter)
157          request.setHeader('Server', 'Flumotion/%s' % configure.version)
158          request.setHeader('Connection', 'close')
159
160          d = self._httpauth.startAuthentication(request)
161          d.addCallbacks(self._requestAuthenticated, self._authenticationFailed,
162                         callbackArgs=(request, ), errbackArgs=(request, ))
163          # return NOT_DONE_YET, as required by the twisted.web interfaces
164          return server.NOT_DONE_YET
165
166      def _authenticationFailed(self, failure, request):
167          # Authentication failed; nothing more to do, just swallow the
168          # failure. The object responsible for authentication has already
169          # written a proper response to the client and closed the request.
170          pass
171
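
The render() docstring and _authenticationFailed above describe the only contract this resource needs from the httpauth collaborator: startAuthentication(request) returns a Deferred that fires on success and errbacks on failure, after the auth object has already answered the client itself. A minimal illustrative stand-in (AllowAllAuth is a made-up name, not part of this module) might look like this:

from twisted.internet import defer

class AllowAllAuth(object):
    """Illustrative httpauth stand-in that accepts every request."""

    def startAuthentication(self, request):
        # a real httpauth object may decide asynchronously and, on failure,
        # writes the error response itself before errbacking the Deferred
        return defer.succeed(None)
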
172      def _requestAuthenticated(self, result, request):
173          # Authentication succeeded. Start rendering the request.
174          # We always want to call _terminateRequest after rendering,
175          # regardless of whether there's a failure while rendering it or not.
176          d = defer.succeed(result)
177          d.addCallback(self._renderRequest, request)
178          d.addBoth(self._terminateRequest, request)
179          return d
180
181      def _terminateRequest(self, body, request):
182          if body == server.NOT_DONE_YET:
183              # _renderRequest will return NOT_DONE_YET if it started serving the
184              # file. This means the callback chain started by _renderRequest has
185              # finished and we're currently serving the file.
186              return
187          if isinstance(body, Failure):
188              # Something went wrong, log it
189              self.warning("Failure during request rendering: %s",
190                           log.getFailureMessage(body))
191              if body.check(weberror.Error):
192                  err = body.value
193                  page = weberror.ErrorPage(err.status, err.message,
194                                            err.response)
195              elif body.check(fileprovider.UnavailableError):
196                  page = self.serviceUnavailable
197              elif body.check(fileprovider.AccessError):
198                  page = self.forbiddenResource
199              elif body.check(fileprovider.NotFoundError):
200                  page = self.childNotFound
201              else:
202                  page = self.internalServerError
203              body = page.render(request)
204          if body:
205              # the callback chain from _renderRequest chose to return a string
206              # body, write it out to the client
207              request.write(body)
208          self.debug('[fd %5d] Terminate request %r',
209                     request.transport.fileno(), request)
210          request.finish()
211
212      def _renderRequest(self, _, request):
213
214          # PROBE: authenticated request; see httpstreamer.resources
215          self.debug('[fd %5d] (ts %f) authenticated request %r',
216                     request.transport.fileno(), time.time(), request)
217
218          # Now that we're authenticated (or authentication wasn't requested),
219          # write the file (or appropriate other response) to the client.
220          # We override static.File to implement Range requests, and to get
221          # access to the transfer object to abort it later; the bulk of this
222          # is a direct copy of static.File.render, though.
223
224          self.debug("Opening file %s", self._path)
225
226          # Use a deferred chain to unify error handling, whether open()
227          # returns a Deferred or a file directly.
228          d = defer.Deferred()
229          d.addCallback(lambda _: self._path.open())
230          d.addCallbacks(self._gotProvider, self._fileOpenFailure,
231                         callbackArgs=[request], errbackArgs=[request])
232          d.callback(None)
233          return d
234
235      def _fileOpenFailure(self, failure, request):
236          if failure.check(fileprovider.NotFoundError):
237              self.debug("Could not find resource %s", self._path)
238              return self.childNotFound.render(request)
239          if failure.check(fileprovider.CannotOpenError):
240              self.debug("%s is a directory, can't be GET", self._path)
241              return self.childNotFound.render(request)
242          if failure.check(fileprovider.AccessError):
243              return self.forbiddenResource.render(request)
244          return failure
245
246      def _gotProvider(self, provider, request):
247          self.debug("Rendering file %s", self._path)
248
249          # Different headers not normally set in static.File...
250          # Specify that we will close the connection after this request, and
251          # that the client must not issue further requests.
252          # We do this because future requests on this server might actually need
253          # to go to a different process (because of the porter)
254          request.setHeader('Server', 'Flumotion/%s' % configure.version)
255          request.setHeader('Connection', 'close')
256          # We can do range requests, in bytes.
257          # UGLY HACK FIXME: if pdf, then do not accept range requests
258          # because Adobe Reader plugin messes up
259          if not self._path.mimeType == 'application/pdf':
260              request.setHeader('Accept-Ranges', 'bytes')
261
262          if request.setLastModified(provider.getmtime()) is http.CACHED:
263              return ''
264
265          contentType = provider.mimeType or self.defaultType
266
267          if contentType:
268              self.debug('File content type: %r' % contentType)
269              request.setHeader('content-type', contentType)
270
271          fileSize = provider.getsize()
272          # first and last byte offset we will write
273          first = 0
274          last = fileSize - 1
275
276          requestRange = request.getHeader('range')
277          if requestRange is not None:
278              # We have a partial data request.
279              # for interpretation of range, see RFC 2068 14.36
280              # examples: bytes=500-999; bytes=-500 (suffix mode; last 500)
281              self.log('range request, %r', requestRange)
282              rangeKeyValue = string.split(requestRange, '=')
283              if len(rangeKeyValue) != 2:
284                  request.setResponseCode(http.REQUESTED_RANGE_NOT_SATISFIABLE)
285                  return ''
286
287              if rangeKeyValue[0] != 'bytes':
288                  request.setResponseCode(http.REQUESTED_RANGE_NOT_SATISFIABLE)
289                  return ''
290
291              # ignore a set of range requests for now, only take the first
292              ranges = rangeKeyValue[1].split(',')[0]
293              l = ranges.split('-')
294              if len(l) != 2:
295                  request.setResponseCode(http.REQUESTED_RANGE_NOT_SATISFIABLE)
296                  return ''
297
298              start, end = l
299
300              if start:
301                  # byte-range-spec
302                  first = int(start)
303                  if end:
304                      last = min(int(end), last)
305              elif end:
306                  # suffix-byte-range-spec
307                  count = int(end)
308                  # we can't serve more than there are in the file
309                  if count > fileSize:
310                      count = fileSize
311                  first = fileSize - count
312              else:
313                  # need at least start or end
314                  request.setResponseCode(http.REQUESTED_RANGE_NOT_SATISFIABLE)
315                  return ''
316
317              # Start sending from the requested position in the file
318              if first:
319                  # TODO: logs suggest this is called with negative values,
320                  # figure out how
321                  self.debug("Request for range \"%s\" of file, seeking to "
322                             "%d of total file size %d", ranges, first, fileSize)
323                  provider.seek(first)
324
325              # FIXME: is it still partial if the request was for the complete
326              # file ? Couldn't find a conclusive answer in the spec.
327              request.setResponseCode(http.PARTIAL_CONTENT)
328              request.setHeader('Content-Range', "bytes %d-%d/%d" %
329                                (first, last, fileSize))
330
331              request.setResponseRange(first, last, fileSize)
332          d = defer.maybeDeferred(self.do_prepareBody,
333                                  request, provider, first, last)
334
335          def dispatchMethod(header, request):
336              if request.method == 'HEAD':
337                  # the _terminateRequest callback will be fired, and the request
338                  # will be finished
339                  return ''
340              return self._startRequest(request, header, provider, first, last)
341
342          d.addCallback(dispatchMethod, request)
343
344          return d
345
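
The range handling in _gotProvider above follows the HTTP byte-range rules cited in its comments: "bytes=500-999" names explicit offsets, "bytes=-500" (suffix mode) means the last 500 bytes of the file, and only the first range of a set is honoured. The standalone sketch below mirrors those rules for clarity; parse_range is a hypothetical helper, not part of this module:

def parse_range(header, file_size):
    """Return (first, last) byte offsets, or None if unsatisfiable."""
    parts = header.split('=')
    if len(parts) != 2 or parts[0] != 'bytes':
        return None
    # only the first range of a set is honoured, as in _gotProvider above
    spec = parts[1].split(',')[0].split('-')
    if len(spec) != 2:
        return None
    start, end = spec
    first, last = 0, file_size - 1
    if start:
        # byte-range-spec: explicit first (and optional last) offset
        first = int(start)
        if end:
            last = min(int(end), last)
    elif end:
        # suffix-byte-range-spec: the last 'end' bytes of the file
        count = min(int(end), file_size)
        first = file_size - count
    else:
        return None
    return first, last

# parse_range('bytes=500-999', 10000) == (500, 999)
# parse_range('bytes=-500', 10000) == (9500, 9999)
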
346      def _startRequest(self, request, header, provider, first, last):
347          # Call request modifiers
348          for modifier in self._requestModifiers:
349              modifier.modify(request)
350
351          # PROBE: started request; see httpstreamer.resources
352          self.debug('[fd %5d] (ts %f) started request %r',
353                     request.transport.fileno(), time.time(), request)
354
355          if self._metadataProvider:
356              self.log("Retrieving metadata using %r", self._metadataProvider)
357              # The URL can't be rewritten. If we ever want to do so, the API
358              # of the file provider will have to be changed.
359              d = self._metadataProvider.getMetadata(request.path)
360          else:
361              d = defer.succeed(None)
362
363          def metadataError(failure):
364              self.warning('Error retrieving metadata for file %s'
365                           ' using plug %r. %r',
366                           request.path,
367                           self._metadataProvider,
368                           failure.value)
369
370          d.addErrback(metadataError)
371          d.addCallback(self._configureTransfer, request, header,
372                        provider, first, last)
373
374          return d
375
376      def _configureTransfer(self, metadata, request, header,
377                             provider, first, last):
378          if self._rateController:
379              self.debug("Creating RateControl object using plug %r and "
380                         "metadata %r", self._rateController, metadata)
381
382              # We pass the metadata dictionary as proxy settings,
383              # so the rate control plug can use it if needed.
384              d = defer.maybeDeferred(
385                  self._rateController.createProducerConsumerProxy,
386                  request, metadata)
387          else:
388              d = defer.succeed(request)
389
390          def attachProxy(consumer, provider, header, first, last):
391              # If we have a header, give it to the consumer first
392              if header:
393                  consumer.write(header)
394
395              # Set the provider first, because for very small files
396              # the transfer could terminate right away.
397              request._provider = provider
398              transfer = FileTransfer(provider, last + 1, consumer)
399              request._transfer = transfer
400
401              # The important NOT_DONE_YET was already returned by the render()
402              # method; the value returned here is just part of a convention
403              # between _renderRequest and _terminateRequest. The latter assumes
404              # that the deferred chain initiated by _renderRequest fires with
405              # NOT_DONE_YET if the transfer is in progress.
406              return server.NOT_DONE_YET
407
408          d.addCallback(attachProxy, provider, header, first, last)
409
410          return d
411
412      def do_prepareBody(self, request, provider, first, last):
413          """
414          I am called before the body of the response gets written,
415          and after generic header setting has been done.
416
417          I set Content-Length.
418
419          Override me to send additional headers, or to prefix the body
420          with data headers.
421
422          I can return a Deferred that should fire with a string header.
423          That header will be written to the request.
424          """
425          request.setHeader("Content-Length", str(last - first + 1))
426          return ''
427
428
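
As the do_prepareBody docstring suggests, subclasses override it to send extra headers or to prefix the body with a data header, adjusting Content-Length to match (FLVFile and MP4File below do exactly that). A minimal hypothetical subclass, shown for illustration only (PrefixedFile and its prefix bytes are made up), could look like:

from flumotion.component.misc.httpserver.httpfile import File

class PrefixedFile(File):
    """Illustrative File subclass that prepends a fixed data header."""

    prefix = 'HDR\x00'

    def do_prepareBody(self, request, provider, first, last):
        # account for the extra bytes we are about to prepend
        length = last - first + 1 + len(self.prefix)
        request.setHeader('Content-Length', str(length))
        # the returned string is written to the client before the file data
        return self.prefix

Such a subclass would typically be selected through the mimeToResource mapping handled by MimedFileFactory below.
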
429  class MimedFileFactory(log.Loggable):
430      """
431      I create File subclasses based on the mime type of the given path.
432      """
433
434      logCategory = LOG_CATEGORY
435
436      defaultType = "application/octet-stream"
437
438      def __init__(self, httpauth,
439                   mimeToResource=None,
440                   rateController=None,
441                   requestModifiers=None,
442                   metadataProvider=None):
443          self._httpauth = httpauth
444          self._mimeToResource = mimeToResource or {}
445          self._rateController = rateController
446          self._requestModifiers = requestModifiers
447          self._metadataProvider = metadataProvider
448
449      def create(self, path):
450          """
451          Creates and returns an instance of a File subclass based
452          on the mime type of the given path.
453          """
454          mimeType = path.mimeType or self.defaultType
455          self.debug("Create %s file for %s", mimeType, path)
456          klazz = self._mimeToResource.get(mimeType, File)
457          return klazz(path, self._httpauth,
458                       mimeToResource=self._mimeToResource,
459                       rateController=self._rateController,
460                       requestModifiers=self._requestModifiers,
461                       metadataProvider=self._metadataProvider)
462
463
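
A hedged sketch of how the factory might be wired up with a mime-type mapping, roughly as the httpserver component would do; myHttpAuth and rootPath are hypothetical placeholders for the component's httpauth object and a fileprovider path, and FLVFile and MP4File are the subclasses defined below:

from flumotion.component.misc.httpserver.httpfile import (
    MimedFileFactory, FLVFile, MP4File)

mimeToResource = {
    'video/x-flv': FLVFile,
    'video/mp4': MP4File,
}
factory = MimedFileFactory(myHttpAuth, mimeToResource=mimeToResource)
child = factory.create(rootPath)  # an FLVFile, an MP4File or a plain File
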
464  class FLVFile(File):
465      """
466      I am a File resource for FLV files.
467      I can handle requests with a 'start' GET parameter.
468      This parameter represents the byte offset from where to start.
469      If it is non-zero, I will output an FLV header so the result is
470      playable.
471      """
472      header = 'FLV\x01\x01\000\000\000\x09\000\000\000\x09'
473
474      def do_prepareBody(self, request, provider, first, last):
475          self.log('do_prepareBody for FLV')
476          length = last - first + 1
477          ret = ''
478
479          # if there is a non-zero start get parameter, prefix the body with
480          # our FLV header
481          # each value is a list
482          try:
483              start = int(request.args.get('start', ['0'])[0])
484          except ValueError:
485              start = 0
486          # range request takes precedence over our start parsing
487          if request.getHeader('range') is None and start:
488              self.debug('Start %d passed, seeking', start)
489              provider.seek(start)
490              length = last - start + 1 + len(self.header)
491              ret = self.header
492
493          request.setHeader("Content-Length", str(length))
494
495          return ret
496
497
498  class MP4File(File):
499      """
500      I am a File resource for MP4 files.
501      If I have a library for manipulating MP4 files available, I can handle
502      requests with a 'start' GET parameter. Without the library, I ignore this
503      parameter.
504      The 'start' parameter represents the time offset from where to start, in
505      seconds. If it is non-zero, I will seek inside the file to the sample with
506      that time, and prepend the content with rebuilt MP4 tables, to make the
507      output playable.
508      """
509
510      def do_prepareBody(self, request, provider, first, last):
511          self.log('do_prepareBody for MP4')
512          length = last - first + 1
513          ret = ''
514
515          # if there is a non-zero start get parameter, split the file, prefix
516          # the body with the regenerated header and seek inside the provider
517          try:
518              start = float(request.args.get('start', ['0'])[0])
519          except ValueError:
520              start = 0
521          # range request takes precedence over our start parsing
522          if request.getHeader('range') is None and start and HAS_MP4SEEK:
523              self.debug('Start %f passed, seeking', start)
524              provider.seek(0)
525              d = self._split_file(provider, start)
526
527              def seekAndSetContentLength(header_and_offset):
528                  header, offset = header_and_offset
529                  # the header is a file-like object with the file pointer at the
530                  # end, the offset is a number
531                  length = last - offset + 1 + header.tell()
532                  provider.seek(offset)
533                  request.setHeader("Content-Length", str(length))
534                  header.seek(0)
535                  return header.read()
536
537              def seekingFailed(failure):
538                  # swallow the failure and serve the file from the beginning
539                  self.warning("Seeking in MP4 file %s failed: %s", provider,
540                               log.getFailureMessage(failure))
541                  provider.seek(0)
542                  request.setHeader('Content-Length', str(length))
543                  return ret
544
545              d.addCallback(seekAndSetContentLength)
546              d.addErrback(seekingFailed)
547              return d
548          else:
549              request.setHeader('Content-Length', str(length))
550              return defer.succeed(ret)
551
552      def _split_file(self, provider, start):
553          d = defer.Deferred()
554
555          def read_some_data(how_much, from_where):
556              if how_much:
557                  provider.seek(from_where)
558                  read_d = provider.read(how_much)
559                  read_d.addCallback(splitter.feed)
560                  read_d.addErrback(d.errback)
561              else:
562                  d.callback(splitter.result())
563
564          splitter = mp4seek.async.Splitter(start)
565          splitter.start(read_some_data)
566
567          return d
568
569
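
The _split_file helper above drives mp4seek through a small callback protocol: the splitter requests reads via a callback taking (how_much, from_where) and signals completion by asking for zero bytes, at which point result() is handed to the Deferred. The self-contained sketch below shows the same protocol with synchronously firing Deferreds; StringProvider and FakeSplitter are illustrative stand-ins, not the real fileprovider or mp4seek APIs:

from twisted.internet import defer

class StringProvider(object):
    """Provider stand-in whose read() returns an already-fired Deferred."""

    def __init__(self, data):
        self._data = data
        self._pos = 0

    def seek(self, offset):
        self._pos = offset

    def read(self, size):
        chunk = self._data[self._pos:self._pos + size]
        self._pos += len(chunk)
        return defer.succeed(chunk)

class FakeSplitter(object):
    """Splitter stand-in: asks for two reads, then reports it is done."""

    def start(self, read_cb):
        self._chunks = []
        self._read_cb = read_cb
        read_cb(4, 0)            # please read 4 bytes starting at offset 0

    def feed(self, data):
        self._chunks.append(data)
        if len(self._chunks) == 1:
            self._read_cb(4, 8)  # ask for 4 more bytes at offset 8
        else:
            self._read_cb(0, 0)  # zero bytes requested: we are finished

    def result(self):
        return ''.join(self._chunks)

def split(provider, splitter):
    # the same driving loop as MP4File._split_file above
    d = defer.Deferred()

    def read_some_data(how_much, from_where):
        if how_much:
            provider.seek(from_where)
            read_d = provider.read(how_much)
            read_d.addCallback(splitter.feed)
            read_d.addErrback(d.errback)
        else:
            d.callback(splitter.result())

    splitter.start(read_some_data)
    return d

# split(StringProvider('0123456789abcdef'), FakeSplitter()) fires with '012389ab'
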
570  class FileTransfer(log.Loggable):
571      """
572      A class to represent the transfer of a file over the network.
573      """
574
575      logCategory = LOG_CATEGORY
576
577      consumer = None
578
579      def __init__(self, provider, size, consumer):
580          """
581          @param provider: an asynchronous file provider
582          @type provider: L{fileprovider.File}
583          @param size: file position up to which the file should be read
584          @type size: int
585          @param consumer: consumer to receive the data
586          @type consumer: L{twisted.internet.interfaces.IFinishableConsumer}
587          """
588          self.provider = provider
589          self.size = size
590          self.consumer = consumer
591          self.written = self.provider.tell()
592          self.bytesWritten = 0
593          self._pending = None
594          self._again = False  # True if resume was called while waiting for data
595          self._finished = False  # Set when we finish a transfer
596          self.debug("Calling registerProducer on %r", consumer)
597          consumer.registerProducer(self, 0)
598
599      def resumeProducing(self):
600          if not self.consumer:
601              return
602          self._produce()
603
604      def pauseProducing(self):
605          pass
606
607      def stopProducing(self):
608          self.debug('Stop producing from %s at %d/%d bytes',
609                     self.provider, self.provider.tell(), self.size)
610          # even though it's the consumer stopping us, from looking at
611          # twisted code it looks like we still are required to
612          # unregister and notify the request that we're done...
613          self._terminate()
614
615      def _produce(self):
616          if self._pending:
617              # We already are waiting for data, just remember more is needed
618              self._again = True
619              return
620          self._again = False
621          d = self.provider.read(min(abstract.FileDescriptor.bufferSize,
622                                     self.size - self.written))
623          self._pending = d
624          d.addCallbacks(self._cbGotData, self._ebReadFailed)
625
626      def _cbGotData(self, data):
627          self._pending = None
628
629          # We might have got a stopProducing before the _cbGotData callback has
630          # been fired, so we might be in the _finished state. If so, just
631          # return.
632          if self._finished:
633              return
634
635          if data:
636              # WARNING! This call goes back to the reactor! Read the comment in
637              # _writeToConsumer!
638              self._writeToConsumer(data)
639
640          # We might again be in the _finished state, because we might just
641          # have got out of the reactor after writing some data to the consumer.
642          #
643          # The story goes thusly:
644          # 1) you write the last data chunk
645          # 2) before you get out of _writeToConsumer(), the _cbGotData gets
646          #    fired again
647          # 3) because it's the last write (we've written the entire file)
648          #    _terminate() gets called
649          # 4) consumer and provider are set to None
650          # 5) you return from the _writeToConsumer call
651          #
652          # If this happened, just exit (again)
653          if self._finished:
654              return
655
656          if self.provider.tell() == self.size:
657              self.debug('Written entire file of %d bytes from %s',
658                         self.size, self.provider)
659              self._terminate()
660          elif self._again:
661              # Continue producing
662              self._produce()
663
664      def _ebReadFailed(self, failure):
665          self._pending = None
666
667          if self._finished:
668              return
669
670          self.warning('Failure during file %s reading: %s',
671                       self.provider, log.getFailureMessage(failure))
672          self._terminate()
673
674      def _writeToConsumer(self, data):
675          self.written += len(data)
676          self.bytesWritten += len(data)
677          # this .write will spin the reactor, calling .doWrite and then
678          # .resumeProducing again, so be prepared for a re-entrant call
679          self.consumer.write(data)
680
681      def _terminate(self):
682          if self.size != self.bytesWritten:
683              self.warning("Terminated before writing the full %s bytes, "
684                           "only %s bytes written", self.size, self.bytesWritten)
685          try:
686              self.provider.close()
687          finally:
688              self.provider = None
689          self.consumer.unregisterProducer()
690          self.consumer.finish()
691          self.consumer = None
692          self._finished = True
693