1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 Feed components, participating in the stream
24 """
25
26 import os
27
28 import gst
29 import gst.interfaces
30 import gobject
31
32 from twisted.internet import reactor, defer
33 from twisted.spread import pb
34 from zope.interface import implements
35
36 from flumotion.configure import configure
37 from flumotion.component import component as basecomponent
38 from flumotion.component import feed
39 from flumotion.common import common, interfaces, errors, log, pygobject, \
40 messages
41 from flumotion.common import gstreamer
42 from flumotion.common.i18n import N_, gettexter
43 from flumotion.common.planet import moods
44 from flumotion.common.pygobject import gsignal
45
46 __version__ = "$Rev$"
47 T_ = gettexter()
48
49
51 """
52 I am a component-side medium for a FeedComponent to interface with
53 the manager-side ComponentAvatar.
54 """
55 implements(interfaces.IComponentMedium)
56 logCategory = 'feedcompmed'
57 remoteLogName = 'feedserver'
58
def __init__(self, component):
    """
    Create the medium and the bookkeeping used for feed connections.

    @param component: L{flumotion.component.feedcomponent.FeedComponent}
    """
    basecomponent.BaseComponentMedium.__init__(self, component)

    # eaterAlias -> (fullFeedId, host, port) of the feed server to eat from
    self._feederFeedServer = dict()
    # eaterAlias -> callable that cancels the connection attempt in progress
    self._feederPendingConnections = dict()

    # fullFeedId -> (host, port) of the feed server to feed to
    self._eaterFeedServer = dict()
    # feederName -> callable that cancels the connection attempt in progress
    self._eaterPendingConnections = dict()

    self.logName = component.name
72
73
74
77
def remote_setGstDebug(self, debug):
    """
    Sets the GStreamer debugging levels based on the passed debug string.

    The string is a comma-separated list of parts.  A part is either a
    bare integer level, handed to gst.debug_set_default_threshold, or
    C{glob:level}, handed to gst.debug_set_threshold_for_name.  Parts
    that cannot be parsed are logged as warnings and skipped.  An empty
    debug string is ignored and leaves the UI state untouched.

    @param debug: the debug string, for example C{"2,python:5"}
    @type  debug: str

    @since: 0.4.2
    """
    self.debug('Setting GStreamer debug level to %s' % debug)
    if not debug:
        return

    for part in debug.split(','):
        pair = part.split(':')
        if len(pair) not in (1, 2):
            self.warning("Cannot parse GStreamer debug setting '%s'." %
                         part)
            continue

        # a lone item is the level itself; two items are glob:level
        glob = None
        if len(pair) == 2:
            glob = pair[0]

        try:
            value = int(pair[-1])
        except ValueError:
            # a non-numeric level must not abort the whole remote call;
            # report it like any other unparsable part
            self.warning("Cannot parse GStreamer debug setting '%s'." %
                         part)
            continue

        if glob:
            try:
                gst.debug_set_threshold_for_name(glob, value)
            except TypeError:
                self.warning("Cannot set glob %s to value %s" % (
                    glob, value))
        else:
            gst.debug_set_default_threshold(value)

    self.comp.uiState.set('gst-debug', debug)
114
def remote_eatFrom(self, eaterAlias, fullFeedId, host, port):
    """
    Tell the component the host and port for the FeedServer through which
    it can connect a local eater to a remote feeder to eat the given
    fullFeedId.

    Called on by the manager-side ComponentAvatar.

    @returns: None when the request repeats the location already stored
              for the eater; otherwise whatever connectEater returns
    """
    requested = (fullFeedId, host, port)

    # a repeated request for the very same feed is a no-op
    if self._feederFeedServer.get(eaterAlias) == requested:
        self.debug("Feed:%r is the same as the current one. "
                   "Request ignored.", requested)
        return

    self._feederFeedServer[eaterAlias] = requested
    return self.connectEater(eaterAlias)
130
143
def connectEater(self, eaterAlias):
    """
    Connect one of the medium's component's eaters to a remote feed.
    Called by the component, both on initial connection and for
    reconnecting.

    @param eaterAlias: alias of the eater to connect
    @returns: deferred that will fire with a value of None
    """

    def gotFeed(result):
        # the request fires with a (feedId, fd) pair; it is unpacked here
        # because tuple parameters are a syntax error on Python 3
        feedId, fd = result
        self._feederPendingConnections.pop(eaterAlias, None)
        self.comp.eatFromFD(eaterAlias, feedId, fd)

    if eaterAlias not in self._feederFeedServer:
        self.debug("eatFrom() hasn't been called yet for eater %s",
                   eaterAlias)
        # no feed server is known for this eater yet, so there is
        # nothing to connect to
        return defer.succeed(None)

    (fullFeedId, host, port) = self._feederFeedServer[eaterAlias]

    # only one connection attempt per eater may be in flight
    cancel = self._feederPendingConnections.pop(eaterAlias, None)
    if cancel:
        self.debug('cancelling previous connection attempt on %s',
                   eaterAlias)
        cancel()

    client = feed.FeedMedium(logName=self.comp.name)

    d = client.requestFeed(host, port,
                           self._getAuthenticatorForFeed(eaterAlias),
                           fullFeedId)
    self._feederPendingConnections[eaterAlias] = client.stopConnecting
    d.addCallback(gotFeed)
    return d
181
def remote_feedTo(self, feederName, fullFeedId, host, port):
    """
    Tell the component to feed the given feed to the receiving component
    accessible through the FeedServer on the given host and port.

    Called on by the manager-side ComponentAvatar.
    """
    # remember where the feed has to go, then start connecting; the
    # result of connectFeeder is deliberately not returned
    destination = (host, port)
    self._eaterFeedServer[fullFeedId] = destination
    self.connectFeeder(feederName, fullFeedId)
191
def connectFeeder(self, feederName, fullFeedId):
    """
    Connect one of the medium's component's feeders to the FeedServer
    that was registered for the given fullFeedId, and hand the resulting
    file descriptor to the component.

    @param feederName: name of the feeder to connect
    @param fullFeedId: full id of the feed to send
    @returns: deferred that will fire with a value of None
    """

    def gotFeed(result):
        # the request fires with a (fullFeedId, fd) pair; it is unpacked
        # here because tuple parameters are a syntax error on Python 3
        feedId, fd = result
        self._eaterPendingConnections.pop(feederName, None)
        self.comp.feedToFD(feederName, fd, os.close, feedId)

    if fullFeedId not in self._eaterFeedServer:
        self.debug("feedTo() hasn't been called yet for feeder %s",
                   feederName)
        # no feed server is known for this feed yet, so there is
        # nothing to connect to
        return defer.succeed(None)

    host, port = self._eaterFeedServer[fullFeedId]

    # pending connections are stored under the feeder name (see below and
    # in gotFeed), so that is the key to look the previous attempt up by
    cancel = self._eaterPendingConnections.pop(feederName, None)
    if cancel:
        self.debug('cancelling previous connection attempt on %s',
                   feederName)
        cancel()

    client = feed.FeedMedium(logName=self.comp.name)

    d = client.sendFeed(host, port,
                        self._getAuthenticatorForFeed(feederName),
                        fullFeedId)
    self._eaterPendingConnections[feederName] = client.stopConnecting
    d.addCallback(gotFeed)
    return d
228
def remote_provideMasterClock(self, port):
    """
    Tells the component to start providing a master clock on the given
    UDP port.
    Can only be called if setup() has been called on the component.

    The IP address returned is the local IP the clock is listening on.

    @param port: the UDP port to provide the clock on
    @returns: (ip, port, base_time)
    @rtype:   tuple of (str, int, long)
    """
    message = 'remote_provideMasterClock(port=%r)' % port
    self.debug(message)
    clockInfo = self.comp.provide_master_clock(port)
    return clockInfo
242
244 """
245 Return the clock master info created by a previous call
246 to provideMasterClock.
247
248 @returns: (ip, port, base_time)
249 @rtype: tuple of (str, int, long)
250 """
251 return self.comp.get_master_clock()
252
255
def remote_effect(self, effectName, methodName, *args, **kwargs):
    """
    Invoke the given methodName on the given effectName in this component.
    The effect should implement effect_(methodName) to receive the call.

    @returns: whatever the effect method returns
    @raises errors.UnknownEffectError: if the component has no such effect
    @raises errors.NoMethodError: if the effect lacks the effect_ method
    @raises errors.RemoteRunError: if the call raises a TypeError
    """
    self.debug("calling %s on effect %s" % (methodName, effectName))

    known = self.comp.effects
    if effectName not in known:
        raise errors.UnknownEffectError(effectName)

    target = known[effectName]
    attribute = "effect_%s" % methodName
    if not hasattr(target, attribute):
        raise errors.NoMethodError("%s on effect %s" % (methodName,
                                                        effectName))

    handler = getattr(target, attribute)
    try:
        outcome = handler(*args, **kwargs)
    except TypeError:
        # a TypeError is reported as the arguments not being accepted
        msg = "effect method %s did not accept %s and %s" % (
            methodName, args, kwargs)
        self.debug(msg)
        raise errors.RemoteRunError(msg)

    self.debug("effect: result: %r" % outcome)
    return outcome
278
281
282 from feedcomponent010 import FeedComponent
283
284 FeedComponent.componentMediumClass = FeedComponentMedium
285
286
288 """A component using gst-launch syntax
289
290 @cvar checkTimestamp: whether to check continuity of timestamps for eaters
291 @cvar checkOffset: whether to check continuity of offsets for
292 eaters
293 """
294
295 DELIMITER = '@'
296
297
298 checkTimestamp = False
299 checkOffset = False
300
301
302 FDSRC_TMPL = 'fdsrc name=%(name)s'
303 DEPAY_TMPL = 'gdpdepay name=%(name)s-depay'
304 FEEDER_TMPL = 'gdppay name=%(name)s-pay ! multifdsink sync=false '\
305 'name=%(name)s buffers-max=500 buffers-soft-max=450 '\
306 'recover-policy=1'
307 EATER_TMPL = None
308
328
329
330
356
365
366
367
def get_pipeline_string(self, properties):
    """
    Produce the gstparse string for the component's pipeline.

    Subclasses must override this method and should not chain up; this
    implementation always raises.

    @returns: a new pipeline string representation.
    @raises NotImplementedError: always
    """
    message = 'subclasses should implement ' 'get_pipeline_string'
    raise NotImplementedError(message)
378
388
389
390
401
def parse_tmpl(self, pipeline, templatizers):
    """
    Expand the given pipeline string representation by substituting
    blocks between '@' with a filled-in template.

    @param pipeline:     a pipeline string representation with variables
    @param templatizers: A dict of prefix => procedure. Template
                         blocks in the pipeline will be replaced
                         with the result of calling the procedure
                         with what is left of the template after
                         taking off the prefix.  The prefix includes
                         the colon that terminates it.
    @returns: a new pipeline string representation.
    @raises AssertionError: if pipeline is the empty string
    @raises TypeError: if the delimiters are unbalanced, or a template
                       block has no colon or an unknown prefix
    """
    # checked explicitly rather than with an assert statement so that the
    # check is still performed when Python runs with -O
    if pipeline == '':
        raise AssertionError('pipeline must not be empty')

    delimiter = self.DELIMITER

    # every template block needs an opening and a closing delimiter
    if pipeline.count(delimiter) % 2 != 0:
        raise TypeError("'%s' contains an odd number of '%s'"
                        % (pipeline, delimiter))

    out = []
    for i, block in enumerate(pipeline.split(delimiter)):
        # splitting on the delimiter alternates literal text (even
        # positions) with template blocks (odd positions)
        if i % 2 == 0:
            out.append(block)
            continue

        block = block.strip()
        head, colon, rest = block.partition(':')
        if not colon:
            raise TypeError("Template %r has no colon" % (block, ))
        prefix = head + colon
        if prefix not in templatizers:
            raise TypeError("Template %r has invalid prefix %r"
                            % (block, prefix))
        out.append(templatizers[prefix](rest))
    return ''.join(out)
440
462
464 queue = self.get_queue_string(eaterAlias)
465 elementName = self.eaters[eaterAlias].elementName
466
467 return self.EATER_TMPL % {'name': elementName, 'queue': queue}
468
470 elementName = self.feeders[feederName].elementName
471 return self.FEEDER_TMPL % {'name': elementName}
472
def get_queue_string(self, eaterAlias):
    """
    Return the parse-launch string that joins the fdsrc eater element
    and the depayer, for example '!' or '! queue !'. The string may have
    no format strings.

    This implementation joins them directly for every eater.
    """
    joiner = '!'
    return joiner
480
def get_eater_srcpad(self, eaterAlias):
    """
    Method that returns the source pad of the final element in an eater.

    The identity element named after the eater is preferred when the
    pipeline has one; otherwise the depayloader's pad is returned.

    @returns: the GStreamer source pad of the final element in an eater
    @rtype:   L{gst.Pad}
    """
    eater = self.eaters[eaterAlias]
    identity = self.get_element(eater.elementName + '-identity')
    depaySrc = self.get_element(eater.depayName).get_pad("src")
    if not identity:
        return depaySrc
    return identity.get_pad("src")
495
496
498 """
499 I am a part of a feed component for a specific group
500 of functionality.
501
502 @ivar name: name of the effect
503 @type name: string
504 @ivar component: component owning the effect
505 @type component: L{FeedComponent}
506 """
507 logCategory = "effect"
508
510 """
511 @param name: the name of the effect
512 """
513 self.name = name
514 self.setComponent(None)
515
def setComponent(self, component):
    """
    Set the given component as the effect's owner.

    The effect's UI state is set to the component's UI state, or to
    None when there is no component.

    @param component: the component to set as an owner of this effect
    @type  component: L{FeedComponent}
    """
    self.component = component
    # a conditional expression instead of "x and y or None", which would
    # silently replace a falsy UI state with None
    self.setUIState(component.uiState if component else None)
525
def setUIState(self, state):
    """
    Store the given UI state on the effect. Overriding this method is a
    convenient place for adding keys to the UI state.

    @param state: the UI state for the component to use.
    @type  state: L{flumotion.common.componentui.WorkerComponentUIState}
    """
    setattr(self, 'uiState', state)
535
537 """
538 Get the component owning this effect.
539
540 @rtype: L{FeedComponent}
541 """
542 return self.component
543
544
class PostProcEffect (Effect):
    """
    I am an effect that is plugged in the pipeline to do a post processing
    job and can be chained to other effect of the same class.

    @ivar name:      name of the effect
    @type name:      string
    @ivar component: component owning the effect
    @type component: L{FeedComponent}
    @ivar sourcePad: pad of the source after which I'm plugged
    @type sourcePad: L{GstPad}
    @ivar effectBin: gstreamer bin doing the post processing effect
    @type effectBin: L{GstBin}
    @ivar pipeline:  pipeline holding the gstreamer elements
    @type pipeline:  L{GstPipeline}
    @ivar plugged:   whether the effect bin has been plugged already
    @type plugged:   bool
    """
    logCategory = "effect"

    def __init__(self, name, sourcePad, effectBin, pipeline):
        """
        @param name: the name of the effect
        @param sourcePad: pad of the source after which I'm plugged
        @param effectBin: gstreamer bin doing the post processing effect
        @param pipeline: pipeline holding the gstreamer elements
        """
        Effect.__init__(self, name)
        self.sourcePad = sourcePad
        self.effectBin = effectBin
        self.pipeline = pipeline
        self.plugged = False

    def plug(self):
        """
        Plug the effect in the pipeline, unlinking the source pad from its
        downstream peer and inserting the effect bin between the two.
        Does nothing when the effect is already plugged.
        """
        if self.plugged:
            return

        # detach the source pad from whatever it currently feeds
        upstreamPad = self.sourcePad
        downstreamPad = upstreamPad.get_peer()
        upstreamPad.unlink(downstreamPad)

        # the bin is set to PLAYING and then added to the pipeline
        self.effectBin.set_state(gst.STATE_PLAYING)
        self.pipeline.add(self.effectBin)

        # source pad -> effect bin -> former downstream peer
        upstreamPad.link(self.effectBin.get_pad('sink'))
        self.effectBin.get_pad('src').link(downstreamPad)
        self.plugged = True
599
673
674 signalid = queue.connect("underrun", _underrun_cb)
675
676
678
679 disconnectedPads = False
680
682 """Should be overrided by subclasses to provide the pipeline the
683 component uses.
684 """
685 return ""
686
688 self.EATER_TMPL += ' ! queue name=input-%(name)s'
689 self._reset_count = 0
690
691 self.uiState.addKey('reset-count', 0)
692
696
697
698
def get_output_elements(self):
    """
    Collect, for every feeder of the component, the pipeline element
    named after the feeder's element name with a '-pay' suffix.

    @returns: list of whatever get_element returns for those names
    """
    payloaders = []
    for feeder in self.feeders.values():
        payloaders.append(self.get_element(feeder.elementName + '-pay'))
    return payloaders
702
706
def _get_base_pipeline_string(self):
    """
    Return the parse-launch description of the part of the pipeline
    that is rebuilt on a reset.  Must be overridden; this implementation
    always raises.

    @raises NotImplementedError: always
    """
    message = 'Subclasses should implement ' 'get_base_pipeline_string'
    raise NotImplementedError(message)
710
def get_eater_srcpad(self, eaterAlias):
    """
    Return the 'src' pad of the element named 'input-' followed by the
    eater's element name.

    @param eaterAlias: alias of the eater to look up
    """
    queueName = 'input-' + self.eaters[eaterAlias].elementName
    return self.get_element(queueName).get_pad('src')
715
716
717
719 """
720 Add the event probes that will check for the flumotion-reset event.
721
722 Those will trigger the pipeline's blocking and posterior reload
723 """
724
725
726 def input_reset_event(pad, event):
727 if event.type != gst.EVENT_CUSTOM_DOWNSTREAM:
728 return True
729 if event.get_structure().get_name() != 'flumotion-reset':
730 return True
731 if self.disconnectedPads:
732 return False
733
734 self.log('RESET: in reset event received on input pad %r', pad)
735 self._reset_count = len(self.feeders)
736
737
738
739 self._block_eaters()
740
741
742 return False
743
744 def output_reset_event(pad, event):
745 if event.type != gst.EVENT_EOS:
746 return True
747
748 self.log('RESET: out reset event received on output pad %r', pad)
749
750
751
752
753
754
755 self._reset_count -= 1
756 if self._reset_count > 0:
757 return False
758
759 self._send_reset_event()
760 reactor.callFromThread(self._on_pipeline_drained)
761
762 return False
763
764 self.log('RESET: installing event probes for detecting changes')
765
766 for elem in self.get_input_elements():
767 self.debug('RESET: adding event probe for %s', elem.get_name())
768 elem.get_pad('sink').add_event_probe(input_reset_event)
769
770 for elem in self.get_output_elements():
771 self.debug('RESET: adding event probe for %s', elem.get_name())
772 elem.get_pad('sink').add_event_probe(output_reset_event)
773
def _block_eaters(self):
    """
    Request an asynchronous block on the 'src' pad of every input
    element; self._on_eater_blocked is passed as the callback.
    """
    onBlocked = self._on_eater_blocked
    for inputElement in self.get_input_elements():
        srcPad = inputElement.get_pad('src')
        self.debug("RESET: Blocking pad %s", srcPad)
        srcPad.set_blocked_async(True, onBlocked)
782
788
def _send_reset_event(self):
    """
    Send a custom downstream event whose structure is named
    'flumotion-reset' to the 'sink' pad of every output element.
    """
    event = gst.event_new_custom(gst.EVENT_CUSTOM_DOWNSTREAM,
                                 gst.Structure('flumotion-reset'))

    for outputElement in self.get_output_elements():
        outputElement.get_pad('sink').send_event(event)
796
def _unlink_pads(self, element, directions):
    """
    Unlink the linked pads of the element that point in one of the
    given directions.

    @param element:    the element whose pads are unlinked
    @param directions: collection of pad directions to unlink
                       (gst.PAD_SINK and/or gst.PAD_SRC)
    """
    for pad in element.pads():
        peer = pad.get_peer()
        if not peer:
            # pad is not linked to anything
            continue

        direction = pad.get_direction()
        if direction not in directions:
            continue

        # unlink is always called on the source side of the link
        if direction == gst.PAD_SINK:
            self.debug('RESET: unlink %s with %s', pad, peer)
            peer.unlink(pad)
        elif direction == gst.PAD_SRC:
            self.debug('RESET: unlink %s with %s', pad, peer)
            pad.unlink(peer)
810
def _remove_pipeline(self, pipeline, element, end, done=None):
    """
    Remove the element from the pipeline, after first doing the same,
    depth first, for every element linked to one of its source pads.

    @param pipeline: the pipeline the elements are removed from
    @param element:  the element to start from; a false value is ignored
    @param end:      collection of elements at which to stop; they are
                     left in place
    @param done:     list of elements already handled; a fresh list is
                     used when omitted
    """
    if done is None:
        done = []
    if not element or element in done or element in end:
        return

    for srcPad in element.src_pads():
        self.log('going to start by pad %r', srcPad)
        if not srcPad.get_peer():
            continue
        downstream = srcPad.get_peer().get_parent()
        self._remove_pipeline(pipeline, downstream, end, done)
        done.append(downstream)
        element.unlink(downstream)

    self.log("RESET: removing old element %s from pipeline", element)
    element.set_state(gst.STATE_NULL)
    pipeline.remove(element)
833
835
836
837
838
def _on_pipeline_drained(self):
    """
    Rebuild the processing part of the pipeline.

    The base pipeline string is parsed into a temporary pipeline behind
    a fakesrc named 'start'; the parsed elements are then moved one by
    one into self.pipeline and relinked.  Afterwards the pipeline is
    configured again, set to PLAYING, the eaters are unblocked and the
    'reset-count' UI state key is incremented.
    """
    self.log('RESET: Going to rebuild the pipeline')

    base_pipe = self._get_base_pipeline_string()

    # the fakesrc only serves as a known starting point in the parsed
    # pipeline; it is never moved itself
    fake_pipeline = 'fakesrc name=start ! %s' % base_pipe
    pipeline = gst.parse_launch(fake_pipeline)

    def move_element(element, orig, dest):
        # move the element from orig to dest, downstream elements first,
        # then restore its links to them
        if not element:
            return
        if element in done:
            return

        to_link = []
        done.append(element)
        self.log("RESET: going to remove %s", element)
        for src in element.src_pads():
            self.log("RESET: got src pad element %s", src)
            if not src.get_peer():
                continue
            peer = src.get_peer().get_parent()
            to_link.append(peer)

            move_element(to_link[-1], orig, dest)

        self._unlink_pads(element, [gst.PAD_SRC, gst.PAD_SINK])
        orig.remove(element)
        dest.add(element)

        self.log("RESET: new element %s added to the pipeline", element)
        for peer in to_link:
            self.log("RESET: linking peers %s -> %s", element, peer)
            element.link(peer)

    # elements in the order they were visited: done[0] is the first
    # element after the fakesrc, done[-1] the last one reached
    done = []
    start = pipeline.get_by_name('start').get_pad('src').get_peer()
    move_element(start.get_parent(), pipeline, self.pipeline)

    # with exactly one input element, link it to the first moved element
    if len(self.get_input_elements()) == 1:
        elem = self.get_input_elements()[0]
        self.log("RESET: linking eater %r to %r", elem, done[0])
        elem.link(done[0])

    # with exactly one output element, link the last moved element to it
    if len(self.get_output_elements()) == 1:
        elem = self.get_output_elements()[0]
        self.log("RESET: linking %r to feeder %r", done[-1], elem)
        done[-1].link(elem)

    self.configure_pipeline(self.pipeline, self.config['properties'])
    self.pipeline.set_state(gst.STATE_PLAYING)
    self._unblock_eaters()

    resets = self.uiState.get('reset-count')
    self.uiState.set('reset-count', resets+1)
901
902
903
def _on_pad_blocked(self, pad, blocked):
    """
    Log whether the given pad has just been blocked or unblocked.

    @param pad:     the pad whose blocking state changed
    @param blocked: true if the pad is now blocked
    """
    state = "blocked" if blocked else "unblocked"
    self.log("RESET: Pad %s %s", pad, state)
907
def _on_eater_blocked(self, pad, blocked):
    """
    Pad-block callback for the eater pads: log the state change and,
    once the pad is blocked, send an EOS event to the pad's peer.

    @param pad:     the pad whose blocking state changed
    @param blocked: true if the pad is now blocked
    """
    self._on_pad_blocked(pad, blocked)
    if not blocked:
        return
    pad.get_peer().send_event(gst.event_new_eos())
913
914
924
925
927 """
928 Component that is reconfigured when new changes arrive through the
929 flumotion-reset event (sent by the fms producer).
930 """
931 pass
932
933
935 """
936 This class provides for multi-input ParseLaunchComponents, such as muxers,
937 that handle flumotion-reset events for reconfiguration.
938 """
939
940 LINK_MUXER = False
941
943 return muxer.get_compatible_pad(srcpad, caps)
944
984
985 for e in self.eaters:
986 depay = self.get_element(self.eaters[e].depayName)
987 self._probes[depay] = \
988 depay.get_pad("src").add_buffer_probe(
989 buffer_probe_cb, depay, e)
990
992 if is_blocked:
993 self.fired_eaters = self.fired_eaters + 1
994 if self.fired_eaters == len(self.eaters):
995 self.debug("All pads are now blocked")
996 self.disconnectedPads = False
997 for e in self.eaters:
998 srcpad = self.get_eater_srcpad(e)
999 srcpad.set_blocked_async(False, self.is_blocked_cb)
1000