Package flumotion :: Package component :: Module component
[hide private]

Source Code for Module flumotion.component.component

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_component -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  worker-side objects for components 
 24  """ 
 25   
 26  import os 
 27  import time 
 28  import socket 
 29   
 30  from twisted.internet import reactor, error, defer 
 31  from twisted.spread import pb 
 32  from twisted.python import reflect 
 33  from zope.interface import implements 
 34   
 35  from flumotion.configure import configure 
 36  from flumotion.common import interfaces, errors, log, planet, medium 
 37  from flumotion.common import componentui, common, messages 
 38  from flumotion.common import interfaces, reflectcall, debug 
 39  from flumotion.common.i18n import N_, gettexter 
 40  from flumotion.common.planet import moods 
 41  from flumotion.common.poller import Poller 
 42  from flumotion.twisted import credentials 
 43  from flumotion.twisted import pb as fpb 
 44   
 45   
 46  __version__ = "$Rev: 7162 $" 
 47  T_ = gettexter() 
 48   
 49   
class ComponentClientFactory(fpb.ReconnectingFPBClientFactory):
    """
    I am the client factory a component uses to log in to the manager.

    On construction I create the component's medium (an instance of the
    component's C{componentMediumClass}) and hand it to the component.
    """
    logCategory = 'component'
    perspectiveInterface = interfaces.IComponentMedium

    def __init__(self, component):
        """
        @param component: the component logging in through this factory
        @type  component: L{flumotion.component.component.BaseComponent}
        """
        # doing this as a class method triggers a doc error
        fpb.ReconnectingFPBClientFactory.__init__(self)

        self.component = component

        # the medium is what interfaces with the manager; both the factory
        # and the component keep a reference to it
        managerMedium = component.componentMediumClass(component)
        self.medium = managerMedium
        component.setMedium(managerMedium)

        self.maxDelay = 10
        # get the interfaces implemented by the component medium class
        #FIXME: interface
        #self.interfaces = self.medium.__class__.__implements__

        self.logName = component.name

    def clientConnectionMade(self, broker):
        """
        Remember the broker on the medium, then chain up to the parent
        class.
        """
        self.medium.broker = broker
        fpb.ReconnectingFPBClientFactory.clientConnectionMade(self, broker)

    # vmethod implementation

    def gotDeferredLogin(self, d):
        """
        Attach the handlers for the outcome of a login attempt to C{d}.

        On success the remote reference is handed to the medium and a
        disconnect notification is registered on it.  On failure we
        always disconnect first, and then log a warning that is as
        specific as the failure type allows.

        @param d: the deferred for the login attempt
        """

        def onManagerGone(remoteReference):
            if not reactor.killed:
                self.warning('Lost connection to manager, '
                             'will attempt to reconnect')
                return
            self.log('Connection to manager lost due to shutdown')

        def onLoggedIn(reference):
            self.info("Logged in to manager")
            self.debug("remote reference %r" % reference)

            self.medium.setRemoteReference(reference)
            reference.notifyOnDisconnect(onManagerGone)

        def disconnectOnFailure(failure):
            # We _always_ disconnect. Then, we may log a more specific failure
            # message at a higher warning level.
            self.debug('Login failed, reason: %s, disconnecting', failure)
            self.disconnect()
            # hand the failure on to the more specific errbacks below
            return failure

        def onAccessDenied(failure):
            failure.trap(errors.NotAuthenticatedError)
            self.warning('Access denied.')

        def onConnectionRefused(failure):
            failure.trap(error.ConnectionRefusedError)
            self.warning('Connection to manager refused.')

        def onAlreadyLoggedIn(failure):
            failure.trap(errors.AlreadyConnectedError)
            avatarId = self.medium.authenticator.avatarId
            self.warning('Component with id %s is already logged in.',
                         avatarId)

        def onOtherFailure(failure):
            self.warning('Login failed, reason: %s' % failure)

        d.addCallback(onLoggedIn)
        # order matters: each trap()ing errback passes failures it does not
        # recognise on to the next one; the last one catches the rest
        failureHandlers = (
            disconnectOnFailure,
            onAccessDenied,
            onConnectionRefused,
            onAlreadyLoggedIn,
            onOtherFailure,
        )
        for handler in failureHandlers:
            d.addErrback(handler)

    # we want to save the authenticator

    def startLogin(self, authenticator):
        """
        Store the authenticator on the medium, then chain up to the parent
        class and return its result.
        """
        self.medium.setAuthenticator(authenticator)
        parentClass = fpb.ReconnectingFPBClientFactory
        return parentClass.startLogin(self, authenticator)
133 134
def _maybeDeferredChain(procs, *args, **kwargs):
    """
    Creates a deferred chain created by chaining calls to the given
    procedures, each of them made with the given args and kwargs.
    Only the result of the last procedure is returned; results for the
    other procedures are discarded.

    Failures triggered during any of the procedure short-circuit execution
    of the other procedures and should be handled by the errbacks attached
    to the deferred returned here.

    If no procedures are given, a deferred that has already fired with
    None is returned.

    @param procs: the procedures to call, in order
    @type  procs: iterable of callables

    @rtype: L{twisted.internet.defer.Deferred}
    """

    def call_proc(_, p):
        # the first argument is the (discarded) result of the previous
        # procedure in the chain
        log.debug('', 'calling %r', p)
        return p(*args, **kwargs)

    # snapshot, so that any iterable is accepted and the caller's sequence
    # is never touched
    procs = list(procs)
    if not procs:
        # nothing to run; still honour the contract of returning a deferred
        # instead of raising IndexError synchronously
        return defer.succeed(None)

    # maybeDeferred turns a synchronous exception from the first procedure
    # into a failure on the returned deferred
    d = defer.maybeDeferred(call_proc, None, procs[0])
    for p in procs[1:]:
        d.addCallback(call_proc, p)
    return d

# needs to be before BaseComponent because BaseComponent references it
class BaseComponentMedium(medium.PingingMedium):
    """
    I am a medium interfacing with a manager-side avatar.
    I implement a Referenceable for the manager's avatar to call on me.
    I have a remote reference to the manager's avatar to call upon.
    I am created by the L{ComponentClientFactory}.

    @ivar authenticator: the authenticator used to log in to manager
    @type authenticator: L{flumotion.twisted.pb.Authenticator}
    @ivar comp:          the component I am the medium for
    @ivar broker:        the broker of the connection to the manager; only
                         set between the connection being made and the
                         remote reference being set
    """

    implements(interfaces.IComponentMedium)
    logCategory = 'basecompmed'

    def __init__(self, component):
        """
        @param component: L{flumotion.component.component.BaseComponent}
        """
        self.comp = component
        self.authenticator = None
        self.broker = None

    def setRemoteReference(self, reference):
        """
        Drop our broker reference and chain up to the parent class.
        """
        self.broker = None # We no longer need that reference
        medium.PingingMedium.setRemoteReference(self, reference)

    ### our methods

    def setup(self, config):
        """
        Does nothing in this base class.
        """
        pass

    def getManagerIP(self):
        """
        Return the manager IP as seen by us.

        Uses our own broker if we have one, and the broker of the remote
        reference otherwise.
        """
        assert self.remote or self.broker
        broker = self.broker or self.remote.broker
        peer = broker.transport.getPeer()
        try:
            host = peer.host
        except AttributeError:
            # peer has no host attribute; fall back to indexing it
            host = peer[1]

        res = socket.gethostbyname(host)
        self.debug("getManagerIP(): we think the manager's IP is %r" % res)
        return res

    def getIP(self):
        """
        Return the IP of this component based on connection to the manager.

        Note: this is insufficient in general, and should be replaced by
        network mapping stuff later.
        """
        assert self.remote
        host = self.remote.broker.transport.getHost()
        self.debug("getIP(): using %r as our IP", host.host)
        return host.host

    def setAuthenticator(self, authenticator):
        """
        Set the authenticator the client factory has used to log in to the
        manager.  Can be reused by the component's medium to make
        feed connections which also get authenticated by the manager's
        bouncer.

        @type authenticator: L{flumotion.twisted.pb.Authenticator}
        """
        self.authenticator = authenticator

    ### pb.Referenceable remote methods
    ### called from manager by our avatar

    def remote_getState(self):
        """
        Return the state of the component, which will be serialized to a
        L{flumotion.common.planet.ManagerJobState} object.

        @rtype:   L{flumotion.common.planet.WorkerJobState}
        @returns: state of component
        """
        # we can only get the IP after we have a remote reference, so add it
        # here
        self.comp.state.set('manager-ip', self.getManagerIP())
        return self.comp.state

    def remote_getConfig(self):
        """
        Return the configuration of the component.

        @rtype:   dict
        @returns: component's current configuration
        """
        return self.comp.config

    def remote_stop(self):
        """
        Stop the component.

        @returns: whatever the component's stop() returns
        """
        self.info('Stopping component')
        return self.comp.stop()

    def remote_reloadComponent(self):
        """Reload modules in the component."""
        from flumotion.common.reload import reloadFlumotion
        reloadFlumotion()

    def remote_getUIState(self):
        """Get a WorkerComponentUIState containing details needed to
        present an admin-side UI state
        """
        return self.comp.uiState

    # NOTE(review): the header of this method was missing from the listing;
    # its name is reconstructed from the docstring and from the remote_
    # prefix used by the surrounding methods -- confirm against upstream.
    def remote_getMasterClockInfo(self):
        """
        Base implementation of getMasterClockInfo, can be overridden by
        subclasses. By default, just returns None.
        """
        return None

    def remote_getVersions(self):
        """
        Return the result of L{flumotion.common.debug.getVersions}.
        """
        return debug.getVersions()

    def remote_setFluDebug(self, debug):
        """
        Sets the Flumotion debugging levels based on the passed debug string.

        @since: 0.6.0
        """
        # note: the parameter shadows the imported debug module in here;
        # self.debug is the logging method and is not affected
        self.debug('Setting Flumotion debug level to %s' % debug)
        log.setDebug(debug)
289 290
class BaseComponent(common.InitMixin, log.Loggable):
    """
    I am the base class for all Flumotion components.

    @ivar name:    the name of the component
    @type name:    string
    @ivar medium:  the component's medium
    @type medium:  L{BaseComponentMedium}
    @ivar uiState: state of the component to be shown in a UI.
                   Contains at least the following keys.
                    - cpu-percent:  CPU use in last interval, as the ratio
                                    of CPU time to elapsed wall-clock time
                    - start-time:   time when component was started, in
                                    epoch seconds
                    - current-time: current time in epoch seconds, as seen
                                    on component's machine, which might be
                                    out of sync
                    - virtual-size: virtual memory size in bytes
                   Subclasses can add additional keys for their respective
                   UI.
    @type uiState: L{componentui.WorkerComponentUIState}

    @cvar componentMediumClass: the medium class to use for this component
    @type componentMediumClass: child class of L{BaseComponentMedium}
    """

    logCategory = 'basecomp'
    componentMediumClass = BaseComponentMedium

    def __init__(self, config, haveError=None):
        """
        Subclasses should not override __init__ at all.

        Instead, they should implement init(), which will be called
        by this implementation automatically.

        See L{flumotion.common.common.InitMixin} for more details.

        @param config:    the component's configuration; must at least
                          have the keys 'name', 'properties' and 'plugs'
        @type  config:    dict
        @param haveError: optional callable, called with the message each
                          time an error message is added to the component
        """
        self.debug("initializing %r with config %r", type(self), config)
        self.config = config
        self._haveError = haveError

        # this will call self.init() for all implementors of init()
        common.InitMixin.__init__(self)

        self.setup()

    # BaseComponent interface for subclasses related to component protocol

    def init(self):
        """
        A subclass should do as little as possible in its init method.
        In particular, it should not try to access resources.

        Failures during init are marshalled back to the manager through
        the worker's remote_create method, since there is no component state
        proxied to the manager yet at the time of init.
        """
        self.state = planet.WorkerJobState()

        self.name = self.config['name']

        self.state.set('pid', os.getpid())
        self.setMood(moods.waking)

        self.medium = None # the medium connecting us to the manager's avatar

        self.uiState = componentui.WorkerComponentUIState()
        self.uiState.addKey('cpu-percent')
        self.uiState.addKey('start-time')
        self.uiState.addKey('current-time')
        self.uiState.addKey('virtual-size')

        self.plugs = {}

        # deferreds handed out by waitForHappy() that have not fired yet
        self._happyWaits = []

        # Start the cpu-usage updating.
        self._lastTime = time.time()
        self._lastClock = time.clock()
        self._cpuPoller = Poller(self._pollCPU, 5)
        self._memoryPoller = Poller(self._pollMemory, 60)

        self._shutdownHook = None

    def do_check(self):
        """
        Subclasses can implement me to run any checks before the component
        performs setup.

        Messages can be added to the component state's 'messages' list key.
        Any error messages added will trigger the component going to sad,
        with L{flumotion.common.errors.ComponentSetupError} being raised
        before getting to setup stage; do_setup() will not be called.

        In the event of a fatal problem that can't be expressed through an
        error message, this method should raise an exception or return a
        failure.

        It is not necessary to chain up in this function. The return
        value may be a deferred.
        """
        return defer.maybeDeferred(self.check_properties,
                                   self.config['properties'],
                                   self.addMessage)

    def check_properties(self, properties, addMessage):
        """
        BaseComponent convenience vmethod for running checks.

        A component implementation can override this method to run any
        checks that it needs to. Typically, a check_properties
        implementation will call the provided addMessage() callback to
        note warnings or errors. For errors, addMessage() will set
        component's mood to sad, which will abort the init process
        before getting to do_setup().

        @param properties: The component's properties
        @type  properties: dict of string => object
        @param addMessage: Thunk to add a message to the component
                           state. Will raise an exception if the
                           message is of level ERROR.
        @type  addMessage: L{flumotion.common.messages.Message} -> None
        """
        pass

    def do_setup(self):
        """
        Subclasses can implement me to set up the component before it is
        started.  It should set up the component, possibly opening files
        and resources.
        Non-programming errors should not be raised, but returned as a
        failing deferred.

        This base implementation instantiates and starts the configured
        plugs, and then runs all do_check methods.

        The return value may be a deferred.
        """
        # not called 'socket', to avoid shadowing the socket module
        for socketName, plugs in self.config['plugs'].items():
            self.plugs[socketName] = []
            for plug in plugs:
                entry = plug['entries']['default']
                instance = reflectcall.reflectCall(entry['module-name'],
                                                   entry['function-name'],
                                                   plug)
                self.plugs[socketName].append(instance)
                self.debug('Starting plug %r on socket %s',
                           instance, socketName)
                instance.start(self)

        # Call check methods, starting from the base class and working down to
        # subclasses.
        checks = common.get_all_methods(self, 'do_check', False)

        def checkErrorCallback(result):
            # if the mood is now sad, it means an error was encountered
            # during check, and we should return a failure here.
            # since the checks are responsible for adding a message,
            # this is a handled error.
            current = self.state.get('mood')
            if current == moods.sad.value:
                self.warning('Running checks made the component sad.')
                raise errors.ComponentSetupHandledError()

        checks.append(checkErrorCallback)
        return _maybeDeferredChain(checks, self)

    def do_stop(self):
        """
        BaseComponent vmethod for stopping.
        The component should do any cleanup it needs, but must not set the
        component's mood to sleeping.

        This base implementation stops the plugs, clears the messages,
        stops the pollers and fires the shutdown hook, if any.

        @returns: None for this base implementation; implementations in
                  subclasses may return a
                  L{twisted.internet.defer.Deferred}
        """
        for socketName, plugs in self.plugs.items():
            for plug in plugs:
                self.debug('Stopping plug %r on socket %s', plug, socketName)
                plug.stop(self)

        # iterate over a snapshot: removing from the list that is being
        # iterated over would skip every other message
        for message in list(self.state.get('messages')):
            # FIXME: not necessary
            self.state.remove('messages', message)

        if self._cpuPoller:
            self._cpuPoller.stop()
            self._cpuPoller = None
        if self._memoryPoller:
            self._memoryPoller.stop()
            self._memoryPoller = None

        if self._shutdownHook:
            self.debug('_stoppedCallback: firing shutdown hook')
            self._shutdownHook()

    ### BaseComponent implementation related to compoment protocol

    def setup(self):
        """
        Sets up the component.  Called during __init__, so be sure not
        to raise exceptions, instead adding messages to the component
        state.

        Runs all do_setup methods, starting from the base class.  On
        success setup_completed() is called; on failure an error message is
        added to the component, unless the failure was already handled.
        """

        def run_setups():
            setups = common.get_all_methods(self, 'do_setup', False)
            return _maybeDeferredChain(setups, self)

        def setup_complete(_):
            self.debug('setup completed')
            self.setup_completed()

        def got_error(failure):
            txt = log.getFailureMessage(failure)
            self.debug('got_error: %s', txt)
            # a ComponentSetupHandledError means that a message was already
            # added for this problem
            if not failure.check(errors.ComponentSetupHandledError):
                self.warning('Setup failed: %s', txt)
                m = messages.Error(T_(N_("Could not setup component.")),
                                   debug=txt,
                                   mid="component-setup-%s" % self.name)
                # will call setMood(moods.sad)
                self.addMessage(m)

            # swallow
            return None

        self.setMood(moods.waking)
        self.uiState.set('start-time', time.time())

        d = run_setups()
        d.addCallbacks(setup_complete, got_error)
        # all status info via messages and the mood

    def setup_completed(self):
        """
        Called when setup completed successfully; turns the component
        happy.
        """
        self.debug('turning happy')
        self.setMood(moods.happy)

    def setShutdownHook(self, shutdownHook):
        """
        Set the shutdown hook for this component (replacing any previous hook).
        When a component is stopped, then this hook will be fired.

        @param shutdownHook: callable taking no arguments
        """
        self._shutdownHook = shutdownHook

    def stop(self):
        """
        Tell the component to stop.
        The connection to the manager will be closed.
        The job process will also finish.

        @rtype: L{twisted.internet.defer.Deferred}
        """
        self.debug('BaseComponent.stop')

        # Set ourselves to waking while we're shutting down.
        self.setMood(moods.waking)

        # Run stop methods, starting from the subclass, up to this base class.
        stops = common.get_all_methods(self, 'do_stop', True)
        return _maybeDeferredChain(stops, self)

    ### BaseComponent public methods

    def getName(self):
        """
        @returns: the name of the component
        """
        return self.name

    def setWorkerName(self, workerName):
        """
        Store the worker name in the component's state.
        """
        self.state.set('workerName', workerName)

    def getWorkerName(self):
        """
        @returns: the worker name stored in the component's state
        """
        return self.state.get('workerName')

    def setMedium(self, medium):
        """
        Set the medium for this component, and make it log under the
        component's name.

        @type medium: L{BaseComponentMedium}
        """
        assert isinstance(medium, BaseComponentMedium)
        self.medium = medium
        self.medium.logName = self.getName()

    def setMood(self, mood):
        """
        Set the given mood on the component if it's different from the current
        one.

        A sad component stays sad.  Turning happy or sad fires the deferreds
        handed out by L{waitForHappy}.

        @param mood: one of the L{flumotion.common.planet.moods}
        """
        current = self.state.get('mood')

        if current == mood.value:
            self.log('already in mood %r' % mood)
            return
        elif current == moods.sad.value:
            self.info('tried to set mood to %r, but already sad :-(' % mood)
            return

        self.doLog(log.DEBUG, -2, 'MOOD changed to %r by caller', mood)
        self.state.set('mood', mood.value)

        if mood == moods.happy:
            while self._happyWaits:
                self._happyWaits.pop(0).callback(None)
        elif mood == moods.sad:
            while self._happyWaits:
                self._happyWaits.pop(0).errback(errors.ComponentStartError())

    def getMood(self):
        """
        Gets the mood on the component.

        @rtype: int
        """
        return self.state.get('mood')

    def waitForHappy(self):
        """
        Get a deferred that fires with None when the component is happy,
        and fails with L{flumotion.common.errors.ComponentStartError} when
        it is sad.

        @rtype: L{twisted.internet.defer.Deferred}
        """
        mood = self.getMood()
        if mood == moods.happy.value:
            return defer.succeed(None)
        elif mood == moods.sad.value:
            return defer.fail(errors.ComponentStartError())
        else:
            # fired from setMood()
            d = defer.Deferred()
            self._happyWaits.append(d)
            return d

    def addMessage(self, message):
        """
        Add a message to the component.
        If any of the messages is an error, the component will turn sad.

        @type  message: L{flumotion.common.messages.Message}
        """
        self.state.append('messages', message)
        if message.level == messages.ERROR:
            self.debug('error message, turning sad')
            self.setMood(moods.sad)
            if self._haveError:
                self._haveError(message)

    def fixRenamedProperties(self, properties, list):
        """
        Fix properties that have been renamed from a previous version,
        and add a warning for them.

        @param properties: properties; will be modified as a result.
        @type  properties: dict
        @param list:       list of (old, new) tuples of property names.
        @type  list:       list of tuple of (str, str)
        """
        # note: the 'list' parameter shadows the builtin in here; the name is
        # kept for callers passing it by keyword
        found = []
        for old, new in list:
            if old in properties:
                found.append((old, new))

        if found:
            m = messages.Warning(T_(N_(
                "Your configuration uses deprecated properties. "
                "Please update your configuration and correct them.\n")),
                mid="deprecated")
            for old, new in found:
                m.add(T_(N_(
                "Please rename '%s' to '%s'.\n"),
                        old, new))
                self.debug("Setting new property '%s' to %r", new,
                    properties[old])
                properties[new] = properties[old]
                del properties[old]
            self.addMessage(m)

    def adminCallRemote(self, methodName, *args, **kwargs):
        """
        Call a remote method on all admin client views on this component.

        This gets serialized through the manager and multiplexed to all
        admin clients, and from there on to all views connected to each
        admin client model.

        Because there can be any number of admin clients that this call
        will go out do, it does not make sense to have one return value.
        This function will return None always.
        """
        if self.medium:
            self.medium.callRemote("adminCallRemote", methodName,
                                   *args, **kwargs)
        else:
            self.debug('asked to adminCallRemote(%s, *%r, **%r), but '
                       'no manager.'
                % (methodName, args, kwargs))

    def _pollCPU(self):
        """
        Update the 'cpu-percent' and 'current-time' keys of the UI state.

        The CPU value is the CPU time used since the previous poll, divided
        by the wall-clock time elapsed since then.
        """
        # update CPU time stats
        nowTime = time.time()
        nowClock = time.clock()
        deltaTime = nowTime - self._lastTime
        deltaClock = nowClock - self._lastClock
        self._lastTime = nowTime
        self._lastClock = nowClock
        # deltaClock can be < 0 if time.clock() wrapped around;
        # deltaTime can be <= 0 if the wall clock did not advance or was
        # stepped back, in which case the ratio is undefined or meaningless
        if deltaClock >= 0 and deltaTime > 0:
            CPU = deltaClock/deltaTime
            self.log('latest CPU use: %r', CPU)
            self.uiState.set('cpu-percent', CPU)

        self.uiState.set('current-time', nowTime)

    def _pollMemory(self):
        """
        Update the 'virtual-size' key of the UI state with the virtual
        memory size of this process, in bytes, as read from /proc.
        """
        # Figure out our virtual memory size and report that.
        # I don't know a nicer way to find vsize than groping /proc/
        handle = open('/proc/%d/stat' % os.getpid())
        try:
            line = handle.read()
        finally:
            # do not leak the file descriptor if reading fails
            handle.close()
        # the second field (comm) is parenthesised and could contain spaces
        # or parentheses, so only split what comes after the last ')'
        fields = line[line.rindex(')') + 1:].split()
        # the remainder starts at the third field (state), which puts vsize,
        # the 23rd field, at index 20
        vsize = int(fields[20])
        self.log('vsize is %d', vsize)
        self.uiState.set('virtual-size', vsize)
697