Package flumotion :: Package twisted :: Module flavors
[hide private]

Source Code for Module flumotion.twisted.flavors

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_flavors -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  Flumotion Twisted-like flavors 
 24   
 25  Inspired by L{twisted.spread.flavors} 
 26  """ 
 27   
 28  from twisted.internet import defer 
 29  from twisted.spread import pb 
 30  from zope.interface import Interface 
 31  from flumotion.common import log 
 32   
 33  __version__ = "$Rev: 7740 $" 
 34   
 35   
 36  ### Generic Cacheable/RemoteCache for state objects 
 37   
 38   
class IStateListener(Interface):
    """
    I am the interface implemented by objects that want to be told
    about changes made to cached states.
    """

    def stateSet(self, object, key, value):
        """
        Called when the given key on the given object has been set to
        the given value.

        @param object: the state object that changed
        @type  object: L{StateRemoteCache}
        @param key:    the key that was set
        @type  key:    string
        @param value:  the value the key was set to
        """

    def stateAppend(self, object, key, value):
        """
        Called when the given value has been added to the list stored
        under the given key.

        @param object: the state object that changed
        @type  object: L{StateRemoteCache}
        @param key:    the key whose list was appended to
        @type  key:    string
        @param value:  the value appended to the list given by key
        """

    def stateRemove(self, object, key, value):
        """
        Called when the given value has been removed from the list
        stored under the given key.

        @param object: the state object that changed
        @type  object: L{StateRemoteCache}
        @param key:    the key whose list was removed from
        @type  key:    string
        @param value:  the value removed from the list given by key
        """
77 78
class StateCacheable(pb.Cacheable):
    """
    I am a cacheable state object.

    I cache key-value pairs, where values can be either single objects,
    lists of objects or dicts of objects.

    Keys have to be declared first with L{addKey}, L{addListKey} or
    L{addDictKey}; every other method raises KeyError for a key that
    was never added.  Each mutating method forwards the change to all
    observers registered through L{getStateToCacheAndObserveFor} and
    returns a L{twisted.internet.defer.DeferredList} of the remote calls.
    """

    def __init__(self):
        # remote observers handed to us by getStateToCacheAndObserveFor
        self._observers = []
        # the actual state: key -> value, list or dict
        self._dict = {}

    # private helpers

    def _checkKey(self, key):
        """
        Make sure the given key was added to this state object.

        @raises KeyError: if the key was never added.
        """
        # membership on the dict itself; .keys() would build and scan a
        # list on every call
        if key not in self._dict:
            raise KeyError('%s in %r' % (key, self))

    def _callObservers(self, name, *args):
        """
        Call the remote method with the given name and arguments on all
        current observers.

        @param name: name of the remote method, without the observe_
                     prefix
        @type  name: string

        @rtype: L{twisted.internet.defer.DeferredList}
        """
        dList = [o.callRemote(name, *args) for o in self._observers]
        return defer.DeferredList(dList)

    # our methods

    def addKey(self, key, value=None):
        """
        Add a key to the state cache so it can be used with set.
        """
        self._dict[key] = value

    # don't use [] as the default value, it creates only one reference and
    # reuses it

    def addListKey(self, key, value=None):
        """
        Add a key for a list of objects to the state cache.
        """
        if value is None:
            value = []
        self._dict[key] = value

    # don't use {} as the default value, it creates only one reference and
    # reuses it

    def addDictKey(self, key, value=None):
        """
        Add a key for a dict value to the state cache.
        """
        if value is None:
            value = {}
        self._dict[key] = value

    def hasKey(self, key):
        """
        Tell whether the given key was added to the state cache.

        @rtype: bool
        """
        return key in self._dict

    def keys(self):
        """
        Return the keys currently known to the state cache.
        """
        return self._dict.keys()

    def get(self, key, otherwise=None):
        """
        Get the state cache value for the given key.

        Return otherwise in case where key is present but value None.

        @raises KeyError: if the key was never added.
        """
        self._checkKey(key)

        v = self._dict[key]
        # identity test against None: "not v" would also trigger on
        # empty lists, and "v == None" would go through the value's own
        # __eq__
        if v is None:
            return otherwise

        return v

    def set(self, key, value):
        """
        Set a given state key to the given value.
        Notifies observers of this Cacheable through observe_set.

        @raises KeyError: if the key was never added.

        @rtype: L{twisted.internet.defer.DeferredList}
        """
        self._checkKey(key)

        self._dict[key] = value
        return self._callObservers('set', key, value)

    def append(self, key, value):
        """
        Append the given object to the given list.
        Notifies observers of this Cacheable through observe_append.

        @raises KeyError: if the key was never added.

        @rtype: L{twisted.internet.defer.DeferredList}
        """
        self._checkKey(key)

        self._dict[key].append(value)
        return self._callObservers('append', key, value)

    def remove(self, key, value):
        """
        Remove the given object from the given list.
        Notifies observers of this Cacheable through observe_remove.

        @raises KeyError:   if the key was never added.
        @raises ValueError: if the value is not in the list.

        @rtype: L{twisted.internet.defer.DeferredList}
        """
        self._checkKey(key)

        try:
            self._dict[key].remove(value)
        except ValueError:
            # re-raise with enough context to find the offending list
            raise ValueError('value %r not in list %r for key %r' % (
                value, self._dict[key], key))
        return self._callObservers('remove', key, value)

    def setitem(self, key, subkey, value):
        """
        Set a value in the given dict.
        Notifies observers of this Cacheable through observe_setitem.

        @raises KeyError: if the key was never added.

        @rtype: L{twisted.internet.defer.DeferredList}
        """
        self._checkKey(key)

        self._dict[key][subkey] = value
        return self._callObservers('setitem', key, subkey, value)

    def delitem(self, key, subkey):
        """
        Removes an element from the given dict. Note that the key refers
        to the dict; it is the subkey (and its value) that will be removed.
        Notifies observers of this Cacheable through observe_delitem.

        @raises KeyError: if the key was never added, or if the subkey
                          is not in the dict.

        @rtype: L{twisted.internet.defer.DeferredList}
        """
        self._checkKey(key)

        try:
            # pop so the removed value can be sent along to observers
            value = self._dict[key].pop(subkey)
        except KeyError:
            raise KeyError('key %r not in dict %r for key %r' % (
                subkey, self._dict[key], key))
        return self._callObservers('delitem', key, subkey, value)

    # pb.Cacheable methods

    def getStateToCacheAndObserveFor(self, perspective, observer):
        # remember the observer so later changes get forwarded to it
        self._observers.append(observer)
        return self._dict

    def stoppedObserving(self, perspective, observer):
        self._observers.remove(observer)
224 225 # At some point, a StateRemoteCache will become invalid. The normal way 226 # would be losing the connection to the RemoteCacheable, although 227 # particular kinds of RemoteCache objects might have other ways 228 # (e.g. component removed from flow). 229 # 230 # We support listening for invalidation events. However, in order to 231 # ensure predictable program behavior, we can't do a notifyOnDisconnect 232 # directly on the broker. If we did that, program semantics would be 233 # dependent on the call order of the notifyOnDisconnect methods, which 234 # would likely lead to heisenbugs. 235 # 236 # Instead, invalidation will only be performed by the application, if at 237 # all, via an explicit call to invalidate(). 238 239
class StateRemoteCache(pb.RemoteCache):
    """
    I am a remote cache of a state object.

    My state is handed to me through L{setCopyableState}; later changes
    arrive through the observe_* methods, which update my copy and then
    notify the local listeners registered with L{addListener}.

    Each listener is stored with a list of six handlers, in this order:
    set, append, remove, setitem, delitem, invalidate.  The observe_*
    methods and L{invalidate} address the handlers by that index.
    """

    def __init__(self):
        self._listeners = {}
        # no constructor
        # pb.RemoteCache.__init__(self)

    # our methods

    def hasKey(self, key):
        """
        Tell whether the given key is present in the cached state.

        @rtype: bool
        """
        # membership on the dict itself; .keys() would build and scan a
        # list on every call
        return key in self._dict

    def keys(self):
        """
        Return the keys of the cached state.
        """
        return self._dict.keys()

    def get(self, key, otherwise=None):
        """
        Get the state cache value for the given key.

        Return otherwise in case where key is present but value None.

        @raises KeyError: if the key is not in the cached state.
        """
        if key not in self._dict:
            raise KeyError('%s in %r' % (key, self))

        v = self._dict[key]
        # compare to actual None by identity, otherwise we also get
        # zero-like values or go through the value's own __eq__
        if v is None:
            return otherwise

        return v

    def _ensureListeners(self):
        # when this is created through serialization from a JobCS,
        # __init__ does not seem to get called, so create self._listeners
        if not hasattr(self, '_listeners'):
            # FIXME: this means that callbacks will be fired in
            # arbitrary order; should be fired in order of connecting.
            # Use twisted.python.util.OrderedDict instead
            self._listeners = {}

    #F0.8: remove set=None and move set_=None there

    def addListener(self, listener, set=None, append=None, remove=None,
                    setitem=None, delitem=None, invalidate=None, set_=None):
        """
        Adds a listener to the remote cache.

        The caller will be notified of state events via the functions
        given as the 'set_', 'append', 'remove', 'setitem', 'delitem'
        and 'invalidate' keyword arguments.

        Always call this method using keyword arguments for the functions;
        calling them with positional arguments is not supported.

        Setting one of the event handlers to None will ignore that
        event.  Passing no event handler at all is deprecated: a warning
        is printed to stderr and the listener's stateSet, stateAppend
        and stateRemove methods are used instead.

        If this cache has already been invalidated, a given invalidate
        handler is called right away.

        @param listener:   new listener object that wants to receive
                           cache state change notifications.
        @type  listener:   object implementing
                           L{flumotion.twisted.flavors.IStateListener}
        @param set:        deprecated spelling of set_; when given it
                           takes precedence over set_ and triggers a
                           DeprecationWarning
        @type  set:        procedure(object, key, value) -> None
        @param set_:       procedure to call when a value is set
        @type  set_:       procedure(object, key, value) -> None
        @param append:     procedure to call when a value is appended to
                           a list
        @type  append:     procedure(object, key, value) -> None
        @param remove:     procedure to call when a value is removed from
                           a list
        @type  remove:     procedure(object, key, value) -> None
        @param setitem:    procedure to call when a value is set in a dict
        @type  setitem:    procedure(object, key, subkey, value) -> None
        @param delitem:    procedure to call when a value is removed
                           from a dict.
        @type  delitem:    procedure(object, key, subkey, value) -> None
        @param invalidate: procedure to call when this cache has been
                           invalidated.
        @type  invalidate: procedure(object) -> None

        @raises KeyError: if the listener was already added.
        """
        # F0.8: remove set
        if set:
            import warnings
            warnings.warn('Please use the set_ kwarg instead',
                DeprecationWarning, stacklevel=2)
            set_ = set

        if not (set_ or append or remove or setitem or delitem or invalidate):
            # FIXME: remove this behavior in F0.6
            import sys
            log.safeprintf(sys.stderr,
                           "Warning: Use of deprecated %r.addListener(%r)"
                           " without explicit event handlers\n", self,
                           listener)
            set_ = listener.stateSet
            append = listener.stateAppend
            remove = listener.stateRemove

        self._ensureListeners()
        if listener in self._listeners:
            raise KeyError(
                "%r is already a listener of %r" % (listener, self))
        # the order here defines the indexes used by _notifyListeners
        self._listeners[listener] = [set_, append, remove, setitem,
                                     delitem, invalidate]
        # a listener added after invalidation still gets to hear about it
        if invalidate and hasattr(self, '_cache_invalid'):
            invalidate(self)

    def removeListener(self, listener):
        """
        Remove a listener previously added with L{addListener}.

        @raises KeyError: if the listener was not added.
        """
        self._ensureListeners()
        if listener not in self._listeners:
            raise KeyError(listener)
        del self._listeners[listener]

    # pb.RemoteCache methods

    def setCopyableState(self, dict):
        self._dict = dict

    def _notifyListeners(self, index, *args):
        """
        Call the handler at the given index of every listener.

        @param index: position of the handler in the per-listener list:
                      0 set, 1 append, 2 remove, 3 setitem, 4 delitem,
                      5 invalidate
        @type  index: int
        @param args:  arguments passed to the handler after this object
        """
        # notify our local listeners; compute set of procs first, so as
        # to allow the listeners set to change during the calls
        self._ensureListeners()
        for proc in [tup[index] for tup in self._listeners.values()]:
            if proc:
                try:
                    proc(self, *args)
                except Exception:
                    # fetch the exception through sys.exc_info() so this
                    # parses on every Python version
                    import sys
                    e = sys.exc_info()[1]
                    # These are all programming errors
                    log.warning("stateremotecache",
                                'Exception in StateCache handler: %s',
                                log.getExceptionMessage(e))

    def observe_set(self, key, value):
        self._dict[key] = value
        # if we also subclass from Cacheable, then we're a proxy, so proxy
        if hasattr(self, 'set'):
            StateCacheable.set(self, key, value)

        self._notifyListeners(0, key, value)

    def observe_append(self, key, value):
        # if we also subclass from Cacheable, then we're a proxy, so proxy
        if hasattr(self, 'append'):
            StateCacheable.append(self, key, value)
        else:
            self._dict[key].append(value)

        self._notifyListeners(1, key, value)

    def observe_remove(self, key, value):
        # if we also subclass from Cacheable, then we're a proxy, so proxy
        if hasattr(self, 'remove'):
            StateCacheable.remove(self, key, value)
        else:
            try:
                self._dict[key].remove(value)
            except ValueError:
                raise ValueError("value %r not under key %r with values %r" %
                    (value, key, self._dict[key]))

        self._notifyListeners(2, key, value)

    def observe_setitem(self, key, subkey, value):
        # if we also subclass from Cacheable, then we're a proxy, so proxy
        if hasattr(self, 'setitem'):
            StateCacheable.setitem(self, key, subkey, value)
        else:
            self._dict[key][subkey] = value

        self._notifyListeners(3, key, subkey, value)

    def observe_delitem(self, key, subkey, value):
        # if we also subclass from Cacheable, then we're a proxy, so proxy
        if hasattr(self, 'delitem'):
            StateCacheable.delitem(self, key, subkey)
        else:
            try:
                del self._dict[key][subkey]
            except KeyError:
                raise KeyError("key %r not in dict %r for state dict %r" %
                    (subkey, self._dict[key], self._dict))

        self._notifyListeners(4, key, subkey, value)

    def invalidate(self):
        """Invalidate this StateRemoteCache.

        Calling this method will result in the invalidate callback being
        called for all listeners that passed an invalidate handler to
        addListener. This method is not called automatically; it is
        provided as a convenience to applications.

        @raises AssertionError: if this cache was already invalidated.
        """
        # raised explicitly instead of with an assert statement, so the
        # check is not stripped when running under python -O
        if hasattr(self, '_cache_invalid'):
            raise AssertionError('object has already been invalidated')
        # if we also subclass from Cacheable, there is currently no way
        # to remotely invalidate the cache. that's ok though, because
        # double-caches are currently only used by the manager, which
        # does not call invalidate() on its caches.
        self._cache_invalid = True

        self._notifyListeners(5)
441