Source Code for Module flumotion.component.consumers.httpstreamer.httpstreamer

# -*- Mode: Python; test-case-name: flumotion.test.test_http -*-
# vi:si:et:sw=4:sts=4:ts=4
#
# Flumotion - a streaming media server
# Copyright (C) 2004,2005,2006,2007,2008 Fluendo, S.L. (www.fluendo.com).
# All rights reserved.

# This file may be distributed and/or modified under the terms of
# the GNU General Public License version 2 as published by
# the Free Software Foundation.
# This file is distributed without any warranty; without even the implied
# warranty of merchantability or fitness for a particular purpose.
# See "LICENSE.GPL" in the source distribution for more information.

# Licensees having purchased or holding a valid Flumotion Advanced
# Streaming Server license may use this file in accordance with the
# Flumotion Advanced Streaming Server Commercial License Agreement.
# See "LICENSE.Flumotion" in the source distribution for more information.

# Headers in this file shall remain intact.

import time

import gst
from twisted.cred import credentials
from twisted.internet import reactor, error, defer
from twisted.web import server
from zope.interface import implements

from flumotion.common import gstreamer, errors
from flumotion.common import messages, netutils, interfaces
from flumotion.common.format import formatStorage, formatTime
from flumotion.common.i18n import N_, gettexter
from flumotion.component import feedcomponent
from flumotion.component.base import http
from flumotion.component.component import moods
from flumotion.component.consumers.httpstreamer import resources
from flumotion.component.misc.porter import porterclient
from flumotion.twisted import fdserver

__all__ = ['HTTPMedium', 'MultifdSinkStreamer']
__version__ = "$Rev: 8058 $"
T_ = gettexter()
STATS_POLL_INTERVAL = 10
UI_UPDATE_THROTTLE_PERIOD = 2.0 # Don't update UI more than once every two
                                # seconds


# FIXME: generalize this class and move it out here ?

class Stats:

    def __init__(self, sink):
        self.sink = sink

        self.no_clients = 0
        self.clients_added_count = 0
        self.clients_removed_count = 0
        self.start_time = time.time()
        # keep track of the highest number and the last epoch this was reached
        self.peak_client_number = 0
        self.peak_epoch = self.start_time
        self.load_deltas = [0, 0]
        self._load_deltas_period = 10 # seconds
        self._load_deltas_ongoing = [time.time(), 0, 0]
        self._currentBitrate = -1 # not known yet
        self._lastBytesReceived = -1 # not known yet

        # keep track of average clients by tracking last average and its time
        self.average_client_number = 0
        self.average_time = self.start_time

        self.hostname = "localhost"
        self.port = 0
        self.mountPoint = "/"

    def _updateAverage(self):
        # update running average of clients connected
        now = time.time()
        # calculate deltas
        dt1 = self.average_time - self.start_time
        dc1 = self.average_client_number
        dt2 = now - self.average_time
        dc2 = self.no_clients
        self.average_time = now # we can update now that we used self.a
        if dt1 == 0:
            # first measurement
            self.average_client_number = 0
        else:
            dt = dt1 + dt2
            before = (dc1 * dt1) / dt
            after = dc2 * dt2 / dt
            self.average_client_number = before + after
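
    # To illustrate the weighting above with hypothetical numbers: if the
    # running average was 4 clients over the first 10 seconds and 20 clients
    # were connected for the 5 seconds since the last update, the new value is
    #   (4 * 10 + 20 * 5) / (10 + 5) ~= 9.3
    # i.e. each period contributes proportionally to its duration.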

    def clientAdded(self):
        self._updateAverage()

        self.no_clients += 1
        self.clients_added_count += 1

        # >= so we get the last epoch this peak was achieved
        if self.no_clients >= self.peak_client_number:
            self.peak_epoch = time.time()
            self.peak_client_number = self.no_clients

    def clientRemoved(self):
        self._updateAverage()
        self.no_clients -= 1
        self.clients_removed_count += 1

    def _updateStats(self):
        """
        Periodically, update our statistics on load deltas, and update the
        UIState with new values for total bytes, bitrate, etc.
        """

        oldtime, oldadd, oldremove = self._load_deltas_ongoing
        add, remove = self.clients_added_count, self.clients_removed_count
        now = time.time()
        diff = float(now - oldtime)

        self.load_deltas = [(add-oldadd)/diff, (remove-oldremove)/diff]
        self._load_deltas_ongoing = [now, add, remove]

        bytesReceived = self.getBytesReceived()
        if self._lastBytesReceived >= 0:
            self._currentBitrate = ((bytesReceived - self._lastBytesReceived) *
                                    8 / STATS_POLL_INTERVAL)
        self._lastBytesReceived = bytesReceived

        self.update_ui_state()

        self._updateCallLaterId = reactor.callLater(STATS_POLL_INTERVAL,
                                                    self._updateStats)
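
    # A worked example of the bitrate estimate above, with hypothetical
    # numbers: if 1,250,000 new bytes arrived since the previous poll, then
    # with STATS_POLL_INTERVAL = 10 the current bitrate becomes
    #   1250000 * 8 / 10 = 1000000 bit/s (roughly 1 Mbit/s).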

    def getCurrentBitrate(self):
        if self._currentBitrate >= 0:
            return self._currentBitrate
        else:
            return self.getBytesReceived() * 8 / self.getUptime()

    def getBytesSent(self):
        return self.sink.get_property('bytes-served')

    def getBytesReceived(self):
        return self.sink.get_property('bytes-to-serve')

    def getUptime(self):
        return time.time() - self.start_time

    def getClients(self):
        return self.no_clients

    def getPeakClients(self):
        return self.peak_client_number

    def getPeakEpoch(self):
        return self.peak_epoch

    def getAverageClients(self):
        return self.average_client_number

    def getUrl(self):
        return "http://%s:%d%s" % (self.hostname, self.port, self.mountPoint)

    def getLoadDeltas(self):
        return self.load_deltas

    def updateState(self, set):
        c = self

        bytes_sent = c.getBytesSent()
        bytes_received = c.getBytesReceived()
        uptime = c.getUptime()

        set('stream-mime', c.get_mime())
        set('stream-url', c.getUrl())
        set('stream-uptime', formatTime(uptime))
        bitspeed = bytes_received * 8 / uptime
        currentbitrate = self.getCurrentBitrate()
        set('stream-bitrate', formatStorage(bitspeed) + 'bit/s')
        set('stream-current-bitrate',
            formatStorage(currentbitrate) + 'bit/s')
        set('stream-totalbytes', formatStorage(bytes_received) + 'Byte')
        set('stream-bitrate-raw', bitspeed)
        set('stream-totalbytes-raw', bytes_received)

        set('clients-current', str(c.getClients()))
        set('clients-max', str(c.getMaxClients()))
        set('clients-peak', str(c.getPeakClients()))
        set('clients-peak-time', c.getPeakEpoch())
        set('clients-average', str(int(c.getAverageClients())))

        bitspeed = bytes_sent * 8 / uptime
        set('consumption-bitrate', formatStorage(bitspeed) + 'bit/s')
        set('consumption-bitrate-current',
            formatStorage(currentbitrate * c.getClients()) + 'bit/s')
        set('consumption-totalbytes', formatStorage(bytes_sent) + 'Byte')
        set('consumption-bitrate-raw', bitspeed)
        set('consumption-totalbytes-raw', bytes_sent)


class HTTPMedium(feedcomponent.FeedComponentMedium):

    def __init__(self, comp):
        """
        @type comp: L{Stats}
        """
        feedcomponent.FeedComponentMedium.__init__(self, comp)

    def authenticate(self, bouncerName, keycard):
        """
        @rtype: L{twisted.internet.defer.Deferred} firing a keycard or None.
        """
        d = self.callRemote('authenticate', bouncerName, keycard)
        return d

    def keepAlive(self, bouncerName, issuerName, ttl):
        """
        @rtype: L{twisted.internet.defer.Deferred}
        """
        return self.callRemote('keepAlive', bouncerName, issuerName, ttl)

    def removeKeycardId(self, bouncerName, keycardId):
        """
        @rtype: L{twisted.internet.defer.Deferred}
        """
        return self.callRemote('removeKeycardId', bouncerName, keycardId)

    ### remote methods for manager to call on

    def remote_expireKeycard(self, keycardId):
        return self.comp.httpauth.expireKeycard(keycardId)

    def remote_expireKeycards(self, keycardIds):
        return self.comp.httpauth.expireKeycards(keycardIds)

    def remote_notifyState(self):
        self.comp.update_ui_state()

    def remote_rotateLog(self):
        self.comp.resource.rotateLogs()

    def remote_getStreamData(self):
        return self.comp.getStreamData()

    def remote_getLoadData(self):
        return self.comp.getLoadData()

    def remote_updatePorterDetails(self, path, username, password):
        return self.comp.updatePorterDetails(path, username, password)


### the actual component is a streamer using multifdsink


class MultifdSinkStreamer(feedcomponent.ParseLaunchComponent, Stats):
    implements(interfaces.IStreamingComponent)

    checkOffset = True

    # this object is given to the HTTPMedium as comp
    logCategory = 'cons-http'

    pipe_template = 'multifdsink name=sink ' + \
                    'sync=false ' + \
                    'recover-policy=3'

    componentMediumClass = HTTPMedium

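    # For reference, the template above concatenates to the launch fragment
    #   'multifdsink name=sink sync=false recover-policy=3'
    # which get_pipeline_string() below simply returns as-is.
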
    def init(self):
        reactor.debug = True
        self.debug("HTTP streamer initialising")

        self.caps = None
        self.resource = None
        self.httpauth = None
        self.mountPoint = None
        self.burst_on_connect = False

        self.description = None

        self.type = None

        # Used if we've slaved to a porter.
        self._pbclient = None
        self._porterUsername = None
        self._porterPassword = None
        self._porterPath = None

        # Or if we're a master, we open our own port here. Also used for URLs
        # in the porter case.
        self.port = None
        # We listen on this interface, if set.
        self.iface = None

        self._tport = None

        self._updateCallLaterId = None
        self._lastUpdate = 0
        self._updateUI_DC = None

        self._pending_removals = {}

        for i in ('stream-mime', 'stream-uptime', 'stream-current-bitrate',
                  'stream-bitrate', 'stream-totalbytes', 'clients-current',
                  'clients-max', 'clients-peak', 'clients-peak-time',
                  'clients-average', 'consumption-bitrate',
                  'consumption-bitrate-current',
                  'consumption-totalbytes', 'stream-bitrate-raw',
                  'stream-totalbytes-raw', 'consumption-bitrate-raw',
                  'consumption-totalbytes-raw', 'stream-url'):
            self.uiState.addKey(i, None)

    def getDescription(self):
        return self.description

    def get_pipeline_string(self, properties):
        return self.pipe_template

    def check_properties(self, props, addMessage):

        # F0.6: remove backwards-compatible properties
        self.fixRenamedProperties(props, [
            ('issuer', 'issuer-class'),
            ('mount_point', 'mount-point'),
            ('porter_socket_path', 'porter-socket-path'),
            ('porter_username', 'porter-username'),
            ('porter_password', 'porter-password'),
            ('user_limit', 'client-limit'),
            ('bandwidth_limit', 'bandwidth-limit'),
            ('burst_on_connect', 'burst-on-connect'),
            ('burst_size', 'burst-size'),
        ])

        if props.get('type', 'master') == 'slave':
            for k in 'socket-path', 'username', 'password':
                if not 'porter-' + k in props:
                    raise errors.ConfigError("slave mode, missing required"
                                             " property 'porter-%s'" % k)

        if 'burst-size' in props and 'burst-time' in props:
            raise errors.ConfigError('both burst-size and burst-time '
                                     'set, cannot satisfy')

        # tcp is where multifdsink is
        version = gstreamer.get_plugin_version('tcp')
        if version < (0, 10, 9, 1):
            m = messages.Error(T_(N_(
                "Version %s of the '%s' GStreamer plug-in is too old.\n"),
                ".".join(map(str, version)), 'multifdsink'))
            m.add(T_(N_("Please upgrade '%s' to version %s."),
                     'gst-plugins-base', '0.10.10'))
            addMessage(m)
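
    # As a sketch of what check_properties() accepts in slave mode, a
    # hypothetical property dict would need all three porter settings:
    #   {'type': 'slave',
    #    'porter-socket-path': '/tmp/flu-porter.socket',  # hypothetical path
    #    'porter-username': 'porter',
    #    'porter-password': 'secret'}
    # Leaving any of them out raises errors.ConfigError above.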

    def time_bursting_supported(self, sink):
        try:
            sink.get_property('units-max')
            return True
        except TypeError:
            return False

    def setup_burst_mode(self, sink):
        if self.burst_on_connect:
            if self.burst_time and self.time_bursting_supported(sink):
                self.debug("Configuring burst mode for %f second burst",
                           self.burst_time)
                # Set a burst for configurable minimum time, plus extra to
                # start from a keyframe if needed.
                sink.set_property('sync-method', 4) # burst-keyframe
                sink.set_property('burst-unit', 2) # time
                sink.set_property('burst-value',
                                  long(self.burst_time * gst.SECOND))

                # We also want to ensure that we have sufficient data available
                # to satisfy this burst; and an appropriate maximum, all
                # specified in units of time.
                sink.set_property('time-min',
                                  long((self.burst_time + 5) * gst.SECOND))

                sink.set_property('unit-type', 2) # time
                sink.set_property('units-soft-max',
                                  long((self.burst_time + 8) * gst.SECOND))
                sink.set_property('units-max',
                                  long((self.burst_time + 10) * gst.SECOND))
            elif self.burst_size:
                self.debug("Configuring burst mode for %d kB burst",
                           self.burst_size)
                # If we have a burst-size set, use modern
                # needs-recent-multifdsink behaviour to have complex bursting.
                # In this mode, we burst a configurable minimum, plus extra
                # so we start from a keyframe (or less if we don't have a
                # keyframe available)
                sink.set_property('sync-method', 'burst-keyframe')
                sink.set_property('burst-unit', 'bytes')
                sink.set_property('burst-value', self.burst_size * 1024)

                # To use burst-on-connect, we need to ensure that multifdsink
                # has a minimum amount of data available - assume 512 kB beyond
                # the burst amount so that we should have a keyframe available
                sink.set_property('bytes-min', (self.burst_size + 512) * 1024)

                # And then we need a maximum still further above that - the
                # exact value doesn't matter too much, but we want it
                # reasonably small to limit memory usage. multifdsink doesn't
                # give us much control here, we can only specify the max
                # values in buffers. We assume each buffer is close enough
                # to 4kB - true for asf and ogg, at least
                sink.set_property('buffers-soft-max',
                                  (self.burst_size + 1024) / 4)
                sink.set_property('buffers-max',
                                  (self.burst_size + 2048) / 4)

            else:
                # Old behaviour; simple burst-from-latest-keyframe
                self.debug("simple burst-on-connect, setting sync-method 2")
                sink.set_property('sync-method', 2)

                sink.set_property('buffers-soft-max', 250)
                sink.set_property('buffers-max', 500)
        else:
            self.debug("no burst-on-connect, setting sync-method 0")
            sink.set_property('sync-method', 0)

            sink.set_property('buffers-soft-max', 250)
            sink.set_property('buffers-max', 500)
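
    # For a hypothetical burst-size of 256 (kB), the byte-bursting branch
    # above amounts to:
    #   sink.set_property('burst-value', 256 * 1024)              # 262144 B
    #   sink.set_property('bytes-min', (256 + 512) * 1024)        # 786432 B
    #   sink.set_property('buffers-soft-max', (256 + 1024) / 4)   # 320 buffers
    #   sink.set_property('buffers-max', (256 + 2048) / 4)        # 576 buffers
    # i.e. the sink keeps roughly 512 kB beyond the burst so a keyframe is
    # likely to be available.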

    def configure_pipeline(self, pipeline, properties):
        Stats.__init__(self, sink=self.get_element('sink'))

        self._updateCallLaterId = reactor.callLater(10, self._updateStats)

        mountPoint = properties.get('mount-point', '')
        if not mountPoint.startswith('/'):
            mountPoint = '/' + mountPoint
        self.mountPoint = mountPoint

        # Hostname is used for a variety of purposes. We do a best-effort guess
        # where nothing else is possible, but it's much preferable to just
        # configure this
        self.hostname = properties.get('hostname', None)
        self.iface = self.hostname # We listen on this if explicitly
                                   # configured, but not if it's only guessed
                                   # at by the below code.
        if not self.hostname:
            # Don't call this nasty, nasty, probably flaky function unless we
            # need to.
            self.hostname = netutils.guess_public_hostname()

        self.description = properties.get('description', None)
        if self.description is None:
            self.description = "Flumotion Stream"

        # FIXME: tie these together more nicely
        self.httpauth = http.HTTPAuthentication(self)
        self.resource = resources.HTTPStreamingResource(self,
                                                        self.httpauth)

        # check how to set client sync mode
        sink = self.get_element('sink')
        self.burst_on_connect = properties.get('burst-on-connect', False)
        self.burst_size = properties.get('burst-size', 0)
        self.burst_time = properties.get('burst-time', 0.0)

        self.setup_burst_mode(sink)

        sink.connect('deep-notify::caps', self._notify_caps_cb)

        # these are made threadsafe using idle_add in the handler
        sink.connect('client-added', self._client_added_handler)

        # We now require a sufficiently recent multifdsink anyway that we can
        # use the new client-fd-removed signal
        sink.connect('client-fd-removed', self._client_fd_removed_cb)
        sink.connect('client-removed', self._client_removed_cb)

        if 'client-limit' in properties:
            limit = int(properties['client-limit'])
            self.resource.setUserLimit(limit)
            if limit != self.resource.maxclients:
                m = messages.Info(T_(N_(
                    "Your system configuration does not allow the maximum "
                    "client limit to be set to %d clients."),
                    limit))
                m.description = T_(N_(
                    "Learn how to increase the maximum number of clients."))
                m.section = 'chapter-optimization'
                m.anchor = 'section-configuration-system-fd'
                self.addMessage(m)

        if 'bandwidth-limit' in properties:
            limit = int(properties['bandwidth-limit'])
            if limit < 1000:
                # The wizard used to set this as being in Mbps, oops.
                self.debug("Bandwidth limit set to unreasonably low %d bps, "
                           "assuming this is meant to be Mbps", limit)
                limit *= 1000000
            self.resource.setBandwidthLimit(limit)

        if 'redirect-on-overflow' in properties:
            self.resource.setRedirectionOnLimits(
                properties['redirect-on-overflow'])

        if 'bouncer' in properties:
            self.httpauth.setBouncerName(properties['bouncer'])

        if 'issuer-class' in properties:
            self.httpauth.setIssuerClass(properties['issuer-class'])

        if 'duration' in properties:
            self.httpauth.setDefaultDuration(
                float(properties['duration']))

        if 'domain' in properties:
            self.httpauth.setDomain(properties['domain'])

        if 'avatarId' in self.config:
            self.httpauth.setRequesterId(self.config['avatarId'])

        if 'ip-filter' in properties:
            logFilter = http.LogFilter()
            for f in properties['ip-filter']:
                logFilter.addIPFilter(f)
            self.resource.setLogFilter(logFilter)

        self.type = properties.get('type', 'master')
        if self.type == 'slave':
            # already checked for these in do_check
            self._porterPath = properties['porter-socket-path']
            self._porterUsername = properties['porter-username']
            self._porterPassword = properties['porter-password']

        self.port = int(properties.get('port', 8800))
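
    # Two small examples of the normalisation done above, with hypothetical
    # values: a mount-point of 'live' becomes '/live', and a bandwidth-limit
    # of 10 is treated as 10 Mbps and expanded to 10000000 bps before being
    # passed to setBandwidthLimit().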

    def __repr__(self):
        return '<MultifdSinkStreamer (%s)>' % self.name

    def getMaxClients(self):
        return self.resource.maxclients

    def get_mime(self):
        if self.caps:
            return self.caps.get_structure(0).get_name()

    def get_content_type(self):
        mime = self.get_mime()
        if mime == 'multipart/x-mixed-replace':
            mime += ";boundary=ThisRandomString"
        return mime

    def getUrl(self):
        return "http://%s:%d%s" % (self.hostname, self.port, self.mountPoint)

    def getStreamData(self):
        socket = 'flumotion.component.plugs.streamdata.StreamDataProviderPlug'
        if self.plugs[socket]:
            plug = self.plugs[socket][-1]
            return plug.getStreamData()
        else:
            return {'protocol': 'HTTP',
                    'description': self.description,
                    'url': self.getUrl()}

    def getLoadData(self):
        """Return a tuple (deltaadded, deltaremoved, bytes_transferred,
        current_clients, current_load) of our current bandwidth and
        user values.
        The deltas are per-second estimates of how much bitrate is added
        or removed due to client connections and disconnections.
        """
        # We calculate the estimated clients added/removed per second, then
        # multiply by the stream bitrate
        deltaadded, deltaremoved = self.getLoadDeltas()

        bytes_received = self.getBytesReceived()
        uptime = self.getUptime()
        bitrate = bytes_received * 8 / uptime

        bytes_sent = self.getBytesSent()
        clients_connected = self.getClients()
        current_load = bitrate * clients_connected

        return (deltaadded * bitrate, deltaremoved * bitrate, bytes_sent,
                clients_connected, current_load)
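
    # A hypothetical load calculation: with a stream bitrate of 500000 bit/s,
    # deltas of 2 clients added and 1 removed per second, and 10 connected
    # clients, getLoadData() returns
    #   (1000000, 500000, bytes_sent, 10, 5000000)
    # i.e. the load deltas and current load are expressed in bit/s.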

    def add_client(self, fd):
        sink = self.get_element('sink')
        sink.emit('add', fd)

    def remove_client(self, fd):
        sink = self.get_element('sink')
        sink.emit('remove', fd)

    def remove_all_clients(self):
        """Remove all the clients.

        Returns a deferred fired once all clients have been removed.
        """
        if self.resource:
            # can be None if we never went happy
            self.debug("Asking for all clients to be removed")
            return self.resource.removeAllClients()

    def update_ui_state(self):
        """Update the uiState object.
        Such updates (through this function) are throttled to a maximum rate,
        to avoid saturating admin clients with traffic when many clients are
        connecting/disconnecting.
        """

        def setIfChanged(k, v):
            if self.uiState.get(k) != v:
                self.uiState.set(k, v)

        def update_ui_state_later():
            self._updateUI_DC = None
            self.update_ui_state()

        now = time.time()

        # If we haven't updated too recently, do it immediately.
        if now - self._lastUpdate >= UI_UPDATE_THROTTLE_PERIOD:
            if self._updateUI_DC:
                self._updateUI_DC.cancel()
                self._updateUI_DC = None

            self._lastUpdate = now
            # fixme: have updateState just update what changed itself
            # without the hack above
            self.updateState(setIfChanged)
        elif not self._updateUI_DC:
            # Otherwise, schedule doing this in a few seconds (unless an update
            # was already scheduled)
            self._updateUI_DC = reactor.callLater(UI_UPDATE_THROTTLE_PERIOD,
                                                  update_ui_state_later)

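    # With UI_UPDATE_THROTTLE_PERIOD = 2.0, a burst of connects inside one
    # second therefore results in one immediate updateState() call plus at
    # most one delayed call two seconds later; calls in between only find the
    # already-scheduled DelayedCall and return.
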
    def _client_added_handler(self, sink, fd):
        self.log('[fd %5d] client_added_handler', fd)
        Stats.clientAdded(self)
        self.update_ui_state()

    def _client_removed_handler(self, sink, fd, reason, stats):
        self.log('[fd %5d] client_removed_handler, reason %s', fd, reason)
        if reason.value_name == 'GST_CLIENT_STATUS_ERROR':
            self.warning('[fd %5d] Client removed because of write error' % fd)

        self.resource.clientRemoved(sink, fd, reason, stats)
        Stats.clientRemoved(self)
        self.update_ui_state()

    ### START OF THREAD-AWARE CODE (called from non-reactor threads)

    def _notify_caps_cb(self, element, pad, param):
        caps = pad.get_negotiated_caps()
        if caps == None:
            return

        caps_str = gstreamer.caps_repr(caps)
        self.debug('Got caps: %s' % caps_str)

        if not self.caps == None:
            self.warning('Already had caps: %s, replacing' % caps_str)

        self.debug('Storing caps: %s' % caps_str)
        self.caps = caps

        reactor.callFromThread(self.update_ui_state)

    # We now use both client-removed and client-fd-removed. We call get-stats
    # from the first callback ('client-removed'), but don't actually start
    # removing the client until we get 'client-fd-removed'. This ensures that
    # there's no window in which multifdsink still knows about the fd,
    # but we've actually closed it, so we no longer get spurious duplicates.
    # this can be called from both application and streaming thread !

    def _client_removed_cb(self, sink, fd, reason):
        stats = sink.emit('get-stats', fd)
        self._pending_removals[fd] = (stats, reason)

    # this can be called from both application and streaming thread !

    def _client_fd_removed_cb(self, sink, fd):
        (stats, reason) = self._pending_removals.pop(fd)

        reactor.callFromThread(self._client_removed_handler, sink, fd,
                               reason, stats)
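
    # Taken together, removing one client proceeds in two steps:
    #   'client-removed'    -> stash (get-stats result, reason) in
    #                          self._pending_removals[fd]
    #   'client-fd-removed' -> pop that pair and hand it to
    #                          _client_removed_handler() via callFromThread,
    #                          so the bookkeeping runs in the reactor thread.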

    ### END OF THREAD-AWARE CODE

    def do_stop(self):
        if self._updateCallLaterId:
            self._updateCallLaterId.cancel()
            self._updateCallLaterId = None

        if self.httpauth:
            self.httpauth.stopKeepAlive()

        if self._tport:
            self._tport.stopListening()

        l = []
        # After we stop listening (so new connections aren't possible),
        # disconnect (and thus log) all the old ones.
        clients = self.remove_all_clients()
        if clients:
            l.append(clients)

        if self.type == 'slave' and self._pbclient:
            l.append(self._pbclient.deregisterPath(self.mountPoint))
        return defer.DeferredList(l)

    def updatePorterDetails(self, path, username, password):
        """Provide a new set of porter login information, for when we're
        in slave mode and the porter changes.
        If we're currently connected, this won't disconnect; it just changes
        the information so that the next time we try to connect we use the
        new details.
        """
        if self.type == 'slave':
            self._porterUsername = username
            self._porterPassword = password

            creds = credentials.UsernamePassword(self._porterUsername,
                                                 self._porterPassword)

            self._pbclient.startLogin(creds, self._pbclient.medium)

            # If we've changed paths, we must do some extra work.
            if path != self._porterPath:
                self.debug("Changing porter login to use \"%s\"", path)
                self._porterPath = path
                self._pbclient.stopTrying() # Stop trying to connect with the
                                            # old connector.
                self._pbclient.resetDelay()
                reactor.connectWith(
                    fdserver.FDConnector, self._porterPath,
                    self._pbclient, 10, checkPID=False)
        else:
            raise errors.WrongStateError(
                "Can't specify porter details in master mode")

    def do_pipeline_playing(self):
        # Override this to not set the component happy; instead do this once
        # both the pipeline has started AND we've logged in to the porter.
        if hasattr(self, '_porterDeferred'):
            d = self._porterDeferred
        else:
            d = defer.succeed(None)
        self.httpauth.scheduleKeepAlive()
        d.addCallback(lambda res:
            feedcomponent.ParseLaunchComponent.do_pipeline_playing(
                self))
        return d

    def do_setup(self):
        root = resources.HTTPRoot()
        # TwistedWeb wants the child path to not include the leading /
        mount = self.mountPoint[1:]
        root.putChild(mount, self.resource)
        if self.type == 'slave':
            # Streamer is slaved to a porter.

            # We have two things we want to do in parallel:
            #  - ParseLaunchComponent.do_start()
            #  - log in to the porter, then register our mountpoint with
            #    the porter.
            # So, we return a DeferredList with a deferred for each of
            # these tasks. The second one's a bit tricky: we pass a dummy
            # deferred to our PorterClientFactory that gets fired once
            # we've done all of the tasks the first time (it's an
            # automatically-reconnecting client factory, and we only fire
            # this deferred the first time)

            self._porterDeferred = d = defer.Deferred()
            mountpoints = [self.mountPoint]
            self._pbclient = porterclient.HTTPPorterClientFactory(
                server.Site(resource=root), mountpoints, d)

            creds = credentials.UsernamePassword(self._porterUsername,
                                                 self._porterPassword)
            self._pbclient.startLogin(creds, self._pbclient.medium)

            self.debug("Starting porter login at \"%s\"", self._porterPath)
            # This will eventually cause d to fire
            reactor.connectWith(
                fdserver.FDConnector, self._porterPath,
                self._pbclient, 10, checkPID=False)
        else:
            # Streamer is standalone.
            try:
                self.debug('Listening on %d' % self.port)
                iface = self.iface or ""
                self._tport = reactor.listenTCP(
                    self.port, server.Site(resource=root),
                    interface=iface)
            except error.CannotListenError:
                t = 'Port %d is not available.' % self.port
                self.warning(t)
                m = messages.Error(T_(N_(
                    "Network error: TCP port %d is not available."),
                    self.port))
                self.addMessage(m)
                self.setMood(moods.sad)
                return defer.fail(errors.ComponentSetupHandledError(t))