Package flumotion :: Package twisted :: Module integration
[hide private]

Source Code for Module flumotion.twisted.integration

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  Framework for writing automated integration tests. 
 24   
 25  This module provides a way of writing automated integration tests from 
 26  within Twisted's unit testing framework, trial. Test cases are 
 27  constructed as subclasses of the normal trial 
 28  L{twisted.trial.unittest.TestCase} class. 
 29   
 30  Integration tests look like normal test methods, except that they are 
 31  decorated with L{integration.test}, take an extra "plan" argument, and 
 32  do not return anything. For example:: 
 33   
 34    from twisted.trial import unittest 
 35    from flumotion.twisted import integration 
 36   
 37    class IntegrationTestExample(unittest.TestCase): 
 38        @integration.test 
 39        def testEchoFunctionality(self, plan): 
 40            process = plan.spawn('echo', 'hello world') 
 41            plan.wait(process, 0) 
 42   
 43  This example will spawn a process, as if you typed "echo 'hello world'" 
 44  at the shell prompt. It then waits for the process to exit, expecting 
 45  the exit status to be 0. 
 46   
 47  The example illustrates two of the fundamental plan operators, spawn and 
 48  wait. "spawn" spawns a process. "wait" waits for a process to finish. 
 49  The other operators are "spawnPar", which spawns a number of processes 
 50  in parallel, "waitPar", which waits for a number of processes in 
 51  parallel, and "kill", which kills one or more processes via SIGTERM and 
 52  then waits for them to exit. 
 53   
 54  It is evident that this framework is most appropriate for testing the 
 55  integration of multiple processes, and is not suitable for in-process 
 56  tests. The plan that is built up is only executed after the test method 
 57  exits, via the L{integration.test} decorator; the writer of the 
 58  integration test does not have access to the plan's state. 
 59   
 60  Note that all process exits must be anticipated. If at any point the 
 61  integration tester receives SIGCHLD, the next operation must be a wait 
 62  for that process. If this is not the case, the test is interpreted as 
 63  having failed. 
 64   
 65  Also note that while the test is running, the stdout and stderr of each 
 66  spawned process are redirected into log files in a subdirectory of the 
 67  current working directory, named after the test case class and the test 
 68  method. For the previous example, the following files will be created:: 
 69   
 70    $cwd/IntegrationTestExample-testEchoFunctionality/echo.stdout 
 71    $cwd/IntegrationTestExample-testEchoFunctionality/echo.stderr 
 72   
 73  In the case that multiple echo commands are run in the same plan, the 
 74  subsequent commands will be named as echo-1, echo-2, and the like. Upon 
 75  successful completion of the test case, the log directory will be 
 76  deleted. 
 77  """ 
 78   
 79  import os 
 80  import signal 
 81   
 82  from twisted.python import failure 
 83  from twisted.internet import reactor, protocol, defer 
 84  from flumotion.common import log as flog 
 85   
 86  __version__ = "$Rev: 7162 $" 
 87   
 88   
# Twisted's reactor.iterate() is defined like this:
#
#     def iterate(self, delay=0):
#        """See twisted.internet.interfaces.IReactorCore.iterate.
#        """
#        self.runUntilCurrent()
#        self.doIteration(delay)
#
# runUntilCurrent() runs all the procs on the threadCallQueue. So if
# something is added to the threadCallQueue between runUntilCurrent()
# and doIteration(), the reactor needs to have an fd ready for reading
# to shortcut the select(). This is done by callFromThread() calling
# reactor.wakeUp(), which will write on the wakeup FD.
#
# HOWEVER. For some reason reactor.wakeUp() only writes on the fd if it
# is being called from another thread. This is broken in the
# signal-handling case: when a signal arrives between runUntilCurrent()
# and doIteration() and is processed via reactor.callFromThread(), as
# is the case with SIGCHLD, nothing wakes the select(). So we
# monkeypatch the reactor to always wake the waker, whichever thread
# the call comes from. This is twisted bug #1997.
#
# The "reactor.waker and ..." guard makes the replacement a no-op while
# reactor.waker is still unset (falsy).
reactor.wakeUp = lambda: reactor.waker and reactor.waker.wakeUp()
110   
111   
def log(format, *args):
    """
    Forward a message to flog.doLog at flog.LOG level, tagged 'integration'.

    @param format: message format string, handed to flog.doLog unchanged.
    @param args:   values for the format string, handed over as one tuple.
    """
    # NOTE(review): the trailing -2 is presumably a stack-depth hint so that
    # the reported location is our caller's -- confirm against flog.doLog.
    flog.doLog(flog.LOG, None, 'integration', format, args, -2)
114 115
def debug(format, *args):
    """
    Forward a message to flog.doLog at flog.DEBUG level, tagged
    'integration'.

    @param format: message format string, handed to flog.doLog unchanged.
    @param args:   values for the format string, handed over as one tuple.
    """
    # NOTE(review): the trailing -2 is presumably a stack-depth hint so that
    # the reported location is our caller's -- confirm against flog.doLog.
    flog.doLog(flog.DEBUG, None, 'integration', format, args, -2)
118 119
def info(format, *args):
    """
    Forward a message to flog.doLog at flog.INFO level, tagged
    'integration'.

    @param format: message format string, handed to flog.doLog unchanged.
    @param args:   values for the format string, handed over as one tuple.
    """
    # NOTE(review): the trailing -2 is presumably a stack-depth hint so that
    # the reported location is our caller's -- confirm against flog.doLog.
    flog.doLog(flog.INFO, None, 'integration', format, args, -2)
122 123
def warning(format, *args):
    """
    Forward a message to flog.doLog at flog.WARN level, tagged
    'integration'.

    @param format: message format string, handed to flog.doLog unchanged.
    @param args:   values for the format string, handed over as one tuple.
    """
    # NOTE(review): the trailing -2 is presumably a stack-depth hint so that
    # the reported location is our caller's -- confirm against flog.doLog.
    flog.doLog(flog.WARN, None, 'integration', format, args, -2)
126 127
def error(format, *args):
    """
    Forward a message to flog.doLog at flog.ERROR level, tagged
    'integration'.

    @param format: message format string, handed to flog.doLog unchanged.
    @param args:   values for the format string, handed over as one tuple.
    """
    # NOTE(review): the trailing -2 is presumably a stack-depth hint so that
    # the reported location is our caller's -- confirm against flog.doLog.
    flog.doLog(flog.ERROR, None, 'integration', format, args, -2)
130 131
132 -def _which(executable):
133 if os.sep in executable: 134 if os.access(os.path.abspath(executable), os.X_OK): 135 return os.path.abspath(executable) 136 elif os.getenv('PATH'): 137 for path in os.getenv('PATH').split(os.pathsep): 138 if os.access(os.path.join(path, executable), os.X_OK): 139 return os.path.join(path, executable) 140 raise CommandNotFoundException(executable)
141 142
class UnexpectedExitCodeException(Exception):
    """
    Raised when a process exits with a code other than the expected one.

    @ivar process:  the process that exited.
    @ivar expected: the exit code the plan asked for.
    @ivar actual:   the exit code that was observed.
    """

    def __init__(self, process, expectedCode, actualCode):
        Exception.__init__(self)
        self.process, self.expected, self.actual = (
            process, expectedCode, actualCode)

    def __str__(self):
        template = 'Expected exit code %r from %r, but got %r'
        return template % (self.expected, self.process, self.actual)
154 155
class UnexpectedExitException(Exception):
    """
    Raised when a process is found to have exited although the plan did
    not expect it to.

    @ivar process: the process that exited.
    """

    def __init__(self, process):
        Exception.__init__(self)
        self.process = process

    def __str__(self):
        culprit = self.process
        return 'The process %r exited prematurely.' % culprit
164 165
class CommandNotFoundException(Exception):
    """
    Raised when a command cannot be resolved to an executable.

    @ivar command: the command that was looked up.
    """

    def __init__(self, command):
        Exception.__init__(self)
        self.command = command

    def __str__(self):
        missing = self.command
        return 'Command %r not found in the PATH.' % missing
174 175
class ProcessesStillRunningException(Exception):
    """
    Raised when processes are still registered once a plan has finished.

    @ivar processes: the processes that were left over.
    """

    def __init__(self, processes):
        Exception.__init__(self)
        self.processes = processes

    def __str__(self):
        template = 'Processes still running at end of test: %r'
        return template % (self.processes, )
185 186
class TimeoutException(Exception):
    """
    Raised when a process did not exit within the allotted time.

    @ivar process: the process that was being waited for.
    @ivar status:  the exit status the waiter was expecting.
    """

    def __init__(self, process, status):
        self.process, self.status = process, status

    def __str__(self):
        template = 'Timed out waiting for %r to exit with status %r'
        return template % (self.process, self.status)
196 197
class ProcessProtocol(protocol.ProcessProtocol):
    """
    Process protocol that reports the end of the child through a Deferred.

    The Deferred is called back with the child's exit code from
    L{processEnded}, or errbacked with a L{TimeoutException} from
    L{timeout}, whichever happens first.

    @ivar exitDeferred: fired once, on exit or on timeout.
    @ivar timedOut:     True once L{timeout} has been called.
    """

    def __init__(self):
        self.exitDeferred = defer.Deferred()
        self.timedOut = False

    def getDeferred(self):
        """
        @returns: the Deferred that fires when the process ends or the
                  wait for it times out.
        """
        return self.exitDeferred

    def timeout(self, process, status):
        """
        Give up waiting: errback the exit Deferred with a
        L{TimeoutException}.

        @param process: the process that was being waited for.
        @param status:  the exit status the waiter was expecting.
        """
        info('forcing timeout for process protocol %r', self)
        self.timedOut = True
        self.exitDeferred.errback(TimeoutException(process, status))

    def processEnded(self, status):
        """
        Called when the child process has ended.

        @param status: object whose C{value.exitCode} holds the exit code.
        """
        exitCode = status.value.exitCode
        info('process ended with status %r, exit code %r', status, exitCode)
        if self.timedOut:
            # timeout() has already fired exitDeferred; a Deferred must not
            # be fired twice, so only record the late exit.
            warning('already timed out??')
            return
        self.exitDeferred.callback(exitCode)
222 223
class Process:
    """
    A child process managed by a plan.

    The process moves through NOT_STARTED -> STARTED -> STOPPED. Its stdout
    and stderr are written to C{<name>.stdout} and C{<name>.stderr} inside
    C{testDir}.

    @ivar name:     name of the process, also the stem of its log files.
    @ivar argv:     argument tuple; argv[0] has been resolved by _which().
    @ivar testDir:  directory receiving the log files.
    @ivar pid:      pid of the child, or None before start().
    @ivar protocol: the L{ProcessProtocol}, or None before start().
    @ivar state:    one of NOT_STARTED, STARTED, STOPPED.
    """
    NOT_STARTED, STARTED, STOPPED = 'NOT-STARTED', 'STARTED', 'STOPPED'

    def __init__(self, name, argv, testDir):
        """
        @param name:    name of the process.
        @param argv:    tuple of strings; argv[0] is the command to run.
        @param testDir: directory in which to create the log files.

        @raises CommandNotFoundException: if argv[0] cannot be resolved.
        """
        self.name = name
        self.argv = (_which(argv[0]), ) + argv[1:]
        self.testDir = testDir

        self.pid = None
        self.protocol = None
        self.state = self.NOT_STARTED
        self._timeoutDC = None

        log('created process object %r', self)

    def start(self):
        """
        Spawn the process with stdout/stderr redirected to its log files.

        The log file handles are closed and the SIGTERM handler is
        restored even if spawning fails.
        """
        assert self.state == self.NOT_STARTED

        self.protocol = ProcessProtocol()

        outPath = os.path.join(self.testDir, self.name + '.stdout')
        errPath = os.path.join(self.testDir, self.name + '.stderr')
        stdout = open(outPath, 'w')
        try:
            stderr = open(errPath, 'w')
            try:
                # don't give it a stdin, output to log files
                childFDs = {1: stdout.fileno(), 2: stderr.fileno()}
                # There's a race condition in twisted.internet.process,
                # whereby signals received between the fork() and exec()
                # in the child are handled with the twisted handlers,
                # i.e. postponed, but they never get called because of
                # the exec(). The end is they are ignored.
                #
                # So, work around that by resetting the sigterm handler
                # to the default so if we self.kill() immediately after
                # self.start(), that the subprocess won't ignore the
                # signal. This is a window in the parent in which SIGTERM
                # will cause immediate termination instead of the twisted
                # nice termination, but that's better than the kid
                # missing the signal.
                info('spawning process %r, argv=%r', self, self.argv)
                termHandler = signal.signal(signal.SIGTERM, signal.SIG_DFL)
                try:
                    env = dict(os.environ)
                    env['FLU_DEBUG'] = '5'
                    process = reactor.spawnProcess(self.protocol,
                                                   self.argv[0],
                                                   env=env, args=self.argv,
                                                   childFDs=childFDs)
                finally:
                    # never leave the parent with the default handler
                    signal.signal(signal.SIGTERM, termHandler)
            finally:
                # close our handles on the log files
                stderr.close()
        finally:
            stdout.close()

        # it's possible the process *already* exited, from within the
        # spawnProcess itself. So set our state to STARTED, *then*
        # attach the callback.
        self.pid = process.pid
        self.state = self.STARTED

        def got_exit(res):
            self.state = self.STOPPED
            info('process %r has stopped', self)
            return res
        self.protocol.getDeferred().addCallback(got_exit)

    def kill(self, sig=signal.SIGTERM):
        """
        Send a signal to the running process.

        @param sig: signal number to send; SIGTERM by default.
        """
        assert self.state == self.STARTED
        info('killing process %r, signal %d', self, sig)
        os.kill(self.pid, sig)

    def wait(self, status, timeout=20):
        """
        Wait for the process to exit with the given status.

        @param status:  expected exit code.
        @param timeout: seconds to wait before the protocol's timeout() is
                        invoked; only armed while the process is STARTED.

        @returns: the protocol's exit Deferred. It fails with
                  L{UnexpectedExitCodeException} on a different exit code,
                  or L{TimeoutException} on timeout.
        """
        assert self.state != self.NOT_STARTED
        info('waiting for process %r to exit', self)
        d = self.protocol.getDeferred()

        def got_exit(res):
            debug('process %r exited with status %r', self, res)
            if res != status:
                warning('expected exit code %r for process %r, but got %r',
                        status, self, res)
                raise UnexpectedExitCodeException(self, status, res)
        d.addCallback(got_exit)
        if self.state == self.STARTED:
            timeoutCall = reactor.callLater(timeout,
                                            self.protocol.timeout,
                                            self,
                                            status)
            self._timeoutDC = timeoutCall

            def cancel_timeout(res):
                # runs on success and on failure; res is passed through
                debug('cancelling timeout for %r', self)
                if timeoutCall.active():
                    timeoutCall.cancel()
                return res
            d.addCallbacks(cancel_timeout, cancel_timeout)
        return d

    def __repr__(self):
        return '<Process %s in state %s>' % (self.name, self.state)
317 318
class PlanExecutor:
    """
    Both the virtual machine that runs a plan and the operations the plan
    is made of.

    @ivar processes: processes spawned and not yet successfully waited for.
    @ivar timeout:   seconds handed to each process wait.
    """

    def __init__(self):
        self.timeout = 20
        self.processes = []

    def spawn(self, process):
        """
        Register and start a process.

        @returns: an already-fired Deferred.
        """
        assert process not in self.processes
        self.processes.append(process)
        process.start()
        return defer.succeed(True)

    def checkExits(self, expectedExits):
        """
        Verify that no registered process has left the STARTED state,
        other than those listed in expectedExits.

        @raises UnexpectedExitException: for the first offending process.
        """
        for candidate in self.processes:
            if candidate.state == candidate.STARTED:
                continue
            if candidate in expectedExits:
                continue
            raise UnexpectedExitException(candidate)

    def kill(self, process):
        """
        Kill a registered process.

        @returns: an already-fired Deferred.
        """
        assert process in self.processes
        process.kill()
        return defer.succeed(True)

    def wait(self, process, exitCode):
        """
        Wait for a registered process to exit with exitCode, and
        unregister it once it has done so.

        @returns: the Deferred from the process wait.
        """
        assert process in self.processes

        def forget(_):
            self.processes.remove(process)

        waiting = process.wait(exitCode, timeout=self.timeout)
        waiting.addCallback(forget)
        return waiting

    def _checkProcesses(self, failure=None):
        """
        Final step of a run: SIGKILL and reap whatever is still registered.

        @param failure: the failure the plan ended with, if any.

        @returns: failure unchanged when nothing is registered. Otherwise
                  a Deferred that, once the killed processes have ended,
                  yields failure when one was given and else fails with
                  L{ProcessesStillRunningException}.
        """
        if not self.processes:
            return failure

        warning('processes still running at end of test: %r',
                self.processes)
        stillRunning = ProcessesStillRunningException(self.processes)

        def reaper(reaped):
            # bind this process's Deferred now, not at call time
            return lambda status: reaped.callback(status.value.exitCode)

        # reap all processes, and once we have them reaped, errback
        pending = []
        for leftover in self.processes:
            if leftover.state == leftover.STARTED:
                reaped = defer.Deferred()
                pending.append(reaped)
                leftover.protocol.processEnded = reaper(reaped)
                leftover.kill(sig=signal.SIGKILL)

        def report(_):
            if failure:
                return failure
            raise stillRunning

        allReaped = defer.DeferredList(pending)
        allReaped.addCallback(report)
        return allReaped

    def run(self, ops, timeout=20):
        """
        Execute the operations one after the other, then check for
        leftover processes.

        @param ops:     sequence of tuples (callable, arg, ...).
        @param timeout: seconds handed to each process wait.

        @returns: a Deferred for the whole run.
        """
        self.timeout = timeout

        def run_op(_, op):
            operation, operands = op[0], op[1:]
            return operation(*operands)

        chain = defer.Deferred()
        for op in ops:
            chain.addCallback(run_op, op)
        chain.addCallbacks(
            lambda _: self._checkProcesses(failure=None),
            lambda failure: self._checkProcesses(failure=failure))

        # We should only spawn processes when twisted has set up its
        # sighandlers. It does that *after* firing the reactor startup
        # event and before entering the reactor loop. So, make sure
        # twisted is ready for us by firing the plan in a callLater.
        reactor.callLater(0, chain.callback, None)
        return chain
class Plan:
    """
    A recorded sequence of process operations for one integration test.

    The plan operators (spawn, spawnPar, wait, waitPar, kill) only record
    operations; nothing runs until L{execute} is called.

    @ivar name:         name of the test method.
    @ivar testCaseName: class name of the test case.
    @ivar processes:    dict mapping process name to process.
    @ivar outputDir:    directory receiving the process log files; None
                        once it has been cleaned up.
    @ivar vm:           the L{PlanExecutor} that will run the operations.
    @ivar ops:          list of recorded (callable, arg, ...) tuples.
    @ivar timeout:      seconds handed to the executor on execute().
    """

    def __init__(self, testCase, testName):
        """
        @param testCase: the test case instance the plan belongs to.
        @param testName: name of the test method.
        """
        self.name = testName
        self.testCaseName = testCase.__class__.__name__
        self.processes = {}
        self.outputDir = self._makeOutputDir(os.getcwd())

        # put your boots on monterey jacks, cause this gravy just made a
        # virtual machine whose instructions are python methods
        self.vm = PlanExecutor()
        self.ops = []
        self.timeout = 20

    def _removeTree(self, path):
        """
        Delete everything below path, bottom-up, and then path itself.
        """
        for root, dirs, files in os.walk(path, topdown=False):
            for name in files:
                os.remove(os.path.join(root, name))
            for name in dirs:
                os.rmdir(os.path.join(root, name))
        os.rmdir(path)

    def _makeOutputDir(self, testDir):
        """
        Create an empty '<testCaseName>-<name>' directory below testDir.

        A directory left over from an earlier run is removed first: the
        output directory is only deleted after a successful run, so a
        failed run leaves one behind.

        @returns: the path of the new directory.
        """
        # ensure that testDir exists
        if not os.path.isdir(testDir):
            os.makedirs(testDir)
        tail = '%s-%s' % (self.testCaseName, self.name)
        outputDir = os.path.join(testDir, tail)
        if os.path.isdir(outputDir):
            self._removeTree(outputDir)
        os.mkdir(outputDir)
        return outputDir

    def _cleanOutputDir(self):
        """
        Remove the output directory and everything in it.
        """
        self._removeTree(self.outputDir)
        self.outputDir = None

    def _allocProcess(self, args):
        """
        Create a process object with a name that is unique in this plan.

        @param args: tuple of strings; args[0] is the command.
        """
        command = args[0]
        # The name becomes the stem of the log files, so drop any directory
        # part to keep the logs inside the output directory.
        stem = os.path.basename(command) or command
        name = stem
        i = 0
        while name in self.processes:
            i += 1
            name = '%s-%d' % (stem, i)
        process = Process(name, args, self.outputDir)
        self.processes[name] = process
        return process

    def _appendOp(self, *args):
        """
        Record one operation as a (callable, arg, ...) tuple.
        """
        self.ops.append(args)

    def setTimeout(self, timeout):
        """
        Set the timeout, in seconds, handed to the executor on execute().
        """
        self.timeout = timeout

    def spawn(self, command, *args):
        """
        Record the spawning of a single process.

        @param command: the command to run.
        @param args:    its arguments, as strings.

        @returns: the process object, for use with wait() and kill().
        """
        allArgs = (command, ) + args
        process, = self.spawnPar(allArgs)
        return process

    def spawnPar(self, *argvs):
        """
        Record the spawning of several processes.

        @param argvs: one tuple of strings per process.

        @returns: a tuple of process objects, in the order given.
        """
        processes = []
        self._appendOp(self.vm.checkExits, ())
        for argv in argvs:
            assert isinstance(argv, tuple), \
                'all arguments to spawnPar must be tuples'
            for arg in argv:
                assert isinstance(arg, str), \
                    'all subarguments to spawnPar must be strings'
            processes.append(self._allocProcess(argv))
        for process in processes:
            self._appendOp(self.vm.spawn, process)
        return tuple(processes)

    def wait(self, process, status):
        """
        Record a wait for process to exit with the given status.
        """
        self.waitPar((process, status))

    def waitPar(self, *processStatusPairs):
        """
        Record waits for several processes.

        @param processStatusPairs: (process, expected status) tuples.
        """
        processes = tuple([p for p, s in processStatusPairs])
        self._appendOp(self.vm.checkExits, processes)
        for process, status in processStatusPairs:
            self._appendOp(self.vm.wait, process, status)

    def kill(self, process, status=None):
        """
        Record the killing of process, followed by a wait for it.

        @param status: exit status expected after the kill.
        """
        self._appendOp(self.vm.checkExits, ())
        self._appendOp(self.vm.kill, process)
        self._appendOp(self.vm.wait, process, status)

    def execute(self):
        """
        Run the recorded operations.

        The output directory is removed only when the run succeeds.

        @returns: the Deferred of the run.
        """
        d = self.vm.run(self.ops, timeout=self.timeout)
        d.addCallback(lambda _: self._cleanOutputDir())
        return d
489 490
def test(proc):
    """
    Decorator turning a plan-building method into a runnable test method.

    The decorated method is called with a fresh L{Plan} as extra argument;
    the plan it builds is then executed and the resulting Deferred is
    returned from the test.

    @param proc: method taking (self, plan).

    @returns: a method taking (self), named after proc when possible.
    """
    plannedName = proc.__name__

    def wrappedtest(self):
        plan = Plan(self, plannedName)
        proc(self, plan)
        return plan.execute()

    # trial seems to require a timeout, at least in twisted 2.4, so give
    # it a nice one
    wrappedtest.timeout = 666
    try:
        wrappedtest.__name__ = plannedName
    except TypeError:
        # can only set procedure names in python >= 2.4
        pass
    return wrappedtest