1 from __future__ import absolute_import
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 import logging, os, socket, time, types
21 from heapq import heappush, heappop, nsmallest
22 from proton import Collector, Connection, ConnectionException, Delivery, Described, dispatch
23 from proton import Endpoint, Event, EventBase, EventType, generate_uuid, Handler, Link, Message
24 from proton import ProtonException, PN_ACCEPTED, PN_PYREF, SASL, Session, SSL, SSLDomain, SSLUnavailable, symbol
25 from proton import Terminus, Timeout, Transport, TransportException, ulong, Url
26 from select import select
27 from proton.handlers import OutgoingMessageHandler
28 from proton import unicode2utf8, utf82unicode
29
30 import traceback
31 from proton import WrappedHandler, _chandler, secs2millis, millis2secs, timeout2millis, millis2timeout, Selectable
32 from .wrapper import Wrapper, PYCTX
33 from cproton import *
34 from . import _compat
35
36 try:
37 import Queue
38 except ImportError:
39 import queue as Queue
40
41 -class Task(Wrapper):
42
43 @staticmethod
45 if impl is None:
46 return None
47 else:
48 return Task(impl)
49
52
55
57 pn_task_cancel(self._impl)
58
60
63
def set_ssl_domain(self, ssl_domain):
    """
    Attach an SSL configuration to this acceptor.

    @param ssl_domain: object exposing the native domain handle as its
        _domain attribute; that handle is passed on to
        pn_acceptor_set_ssl_domain together with this acceptor.
    """
    native_acceptor = self._impl
    native_domain = ssl_domain._domain
    pn_acceptor_set_ssl_domain(native_acceptor, native_domain)
66
68 pn_acceptor_close(self._impl)
69
71
72 @staticmethod
74 if impl is None:
75 return None
76 else:
77 record = pn_reactor_attachments(impl)
78 attrs = pn_void2py(pn_record_get(record, PYCTX))
79 if attrs and 'subclass' in attrs:
80 return attrs['subclass'](impl=impl)
81 else:
82 return Reactor(impl=impl)
83
84 - def __init__(self, *handlers, **kwargs):
88
91
93 self.errors.append(info)
94 self.yield_()
95
97 return WrappedHandler.wrap(pn_reactor_get_global_handler(self._impl), self.on_error)
98
100 impl = _chandler(handler, self.on_error)
101 pn_reactor_set_global_handler(self._impl, impl)
102 pn_decref(impl)
103
104 global_handler = property(_get_global, _set_global)
105
107 return millis2timeout(pn_reactor_get_timeout(self._impl))
108
110 return pn_reactor_set_timeout(self._impl, timeout2millis(secs))
111
112 timeout = property(_get_timeout, _set_timeout)
113
115 pn_reactor_yield(self._impl)
116
118 return pn_reactor_mark(self._impl)
119
121 return WrappedHandler.wrap(pn_reactor_get_handler(self._impl), self.on_error)
122
124 impl = _chandler(handler, self.on_error)
125 pn_reactor_set_handler(self._impl, impl)
126 pn_decref(impl)
127
128 handler = property(_get_handler, _set_handler)
129
138
140 n = pn_reactor_wakeup(self._impl)
141 if n: raise IOError(pn_error_text(pn_io_error(pn_reactor_io(self._impl))))
142
144 pn_reactor_start(self._impl)
145
146 @property
148 return pn_reactor_quiesced(self._impl)
149
151 if self.errors:
152 for exc, value, tb in self.errors[:-1]:
153 traceback.print_exception(exc, value, tb)
154 exc, value, tb = self.errors[-1]
155 _compat.raise_(exc, value, tb)
156
158 result = pn_reactor_process(self._impl)
159 self._check_errors()
160 return result
161
163 pn_reactor_stop(self._impl)
164 self._check_errors()
165
167 impl = _chandler(task, self.on_error)
168 task = Task.wrap(pn_reactor_schedule(self._impl, secs2millis(delay), impl))
169 pn_decref(impl)
170 return task
171
def acceptor(self, host, port, handler=None):
    """
    Create an acceptor listening on the given host and port.

    @param host: interface to listen on; converted with unicode2utf8
        before being handed to the C layer.
    @param port: port to listen on; passed to the C layer as str(port).
    @param handler: optional handler, wrapped with _chandler using this
        reactor's on_error callback.
    @return: an Acceptor wrapping the newly created native acceptor.
    @raise IOError: if no acceptor was created; the message carries the
        reactor's IO error text followed by "(host:port)".
    """
    chandler = _chandler(handler, self.on_error)
    native = pn_reactor_acceptor(self._impl, unicode2utf8(host), str(port), chandler)
    # The wrapped handler has been handed to the acceptor; give up the
    # reference held by this method.
    pn_decref(chandler)
    if not native:
        io_error = pn_error_text(pn_io_error(pn_reactor_io(self._impl)))
        raise IOError("%s (%s:%s)" % (io_error, host, port))
    return Acceptor(native)
180
182 """Deprecated: use connection_to_host() instead
183 """
184 impl = _chandler(handler, self.on_error)
185 result = Connection.wrap(pn_reactor_connection(self._impl, impl))
186 if impl: pn_decref(impl)
187 return result
188
190 """Create an outgoing Connection that will be managed by the reactor.
191 The reactor's pn_iohandler will create a socket connection to the host
192 once the connection is opened.
193 """
194 conn = self.connection(handler)
195 self.set_connection_host(conn, host, port)
196 return conn
197
199 """Change the address used by the connection. The address is
200 used by the reactor's iohandler to create an outgoing socket
201 connection. This must be set prior to opening the connection.
202 """
203 pn_reactor_set_connection_host(self._impl,
204 connection._impl,
205 unicode2utf8(str(host)),
206 unicode2utf8(str(port)))
207
209 """This may be used to retrieve the remote peer address.
210 @return: string containing the address in URL format or None if no
211 address is available. Use the proton.Url class to create a Url object
212 from the returned value.
213 """
214 _url = pn_reactor_get_connection_address(self._impl, connection._impl)
215 return utf82unicode(_url)
216
218 impl = _chandler(handler, self.on_error)
219 result = Selectable.wrap(pn_reactor_selectable(self._impl))
220 if impl:
221 record = pn_selectable_attachments(result._impl)
222 pn_record_set_handler(record, impl)
223 pn_decref(impl)
224 return result
225
227 pn_reactor_update(self._impl, sel._impl)
228
230 pn_collector_put(pn_reactor_collector(self._impl), PN_PYREF, pn_py2void(obj), etype.number)
231
from proton import wrappers as _wrappers

# Register the factories that turn the native reactor and task objects
# into the Python classes defined in this module.
for _pn_type, _factory in (
        ("pn_reactor", lambda x: Reactor.wrap(pn_cast_pn_reactor(x))),
        ("pn_task", lambda x: Task.wrap(pn_cast_pn_task(x))),
):
    _wrappers[_pn_type] = _factory
# Do not leave the loop variables behind as module attributes.
del _pn_type, _factory
238 """
239 Can be added to a reactor to allow events to be triggered by an
240 external thread but handled on the event thread associated with
241 the reactor. An instance of this class can be passed to the
242 Reactor.selectable() method of the reactor in order to activate
243 it. The close() method should be called when it is no longer
244 needed, to allow the event loop to end if needed.
245 """
247 self.queue = Queue.Queue()
248 self.pipe = os.pipe()
249 self._closed = False
250
252 """
253 Request that the given event be dispatched on the event thread
254 of the reactor to which this EventInjector was added.
255 """
256 self.queue.put(event)
257 os.write(self.pipe[1], _compat.str2bin("!"))
258
260 """
261 Request that this EventInjector be closed. Existing events
262 will be dispatched on the reactor's event dispatch thread,
263 then this will be removed from the set of interest.
264 """
265 self._closed = True
266 os.write(self.pipe[1], _compat.str2bin("!"))
267
270
276
286
289 """
290 Application defined event, which can optionally be associated with
291 an engine object and or an arbitrary subject
292 """
293 - def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None):
306
310
312 """
313 Class to track state of an AMQP 1.0 transaction.
314 """
def __init__(self, txn_ctrl, handler, settle_before_discharge=False):
    """
    Store the transaction's collaborators, reset its bookkeeping and
    immediately start declaring it.

    @param txn_ctrl: stored as self.txn_ctrl (presumably the sender
        link to the transaction coordinator - confirm against callers).
    @param handler: stored as self.handler.
    @param settle_before_discharge: stored unchanged; defaults to False.
    """
    # Collaborators supplied by the caller.
    self.handler = handler
    self.txn_ctrl = txn_ctrl
    self.settle_before_discharge = settle_before_discharge

    # Bookkeeping; nothing has been declared, discharged or sent yet.
    self.id = None
    self.failed = False
    self._pending = []
    self._declare = self._discharge = None

    # Must come last: declare() runs against the state set up above.
    self.declare()
325
328
331
333 self._declare = self._send_ctrl(symbol(u'amqp:declare:list'), [None])
334
338
343
344 - def send(self, sender, msg, tag=None):
349
356
357 - def update(self, delivery, state=None):
361
367
370
393
395 """
396 Abstract interface for link configuration options
397 """
399 """
400 Subclasses will implement any configuration logic in this
401 method
402 """
403 pass
def test(self, link):
    """
    Decide whether this option should be applied to the given link.

    This default accepts every link; a subclass may override it to
    apply the option only to links meeting some criterion.

    @param link: the link under consideration (not inspected here).
    @return: True, always.
    """
    return True
410
414
419
def apply(self, sender):
    """
    Do nothing; this default leaves the sender untouched.

    @param sender: the sender link (ignored).
    """
    return None
423
def apply(self, receiver):
    """
    Do nothing; this default leaves the receiver untouched.

    @param receiver: the receiver link (ignored).
    """
    return None
427
442
445 self.filter_set = filter_set
446
447 - def apply(self, receiver):
449
451 """
452 Configures a link with a message selector filter
453 """
454 - def __init__(self, value, name='selector'):
456
458 - def apply(self, receiver):
461
462 -class Move(ReceiverOption):
463 - def apply(self, receiver):
465
466 -class Copy(ReceiverOption):
467 - def apply(self, receiver):
469
477
482
489
492 self._default_session = None
493
495 if not self._default_session:
496 self._default_session = _create_session(connection)
497 self._default_session.context = self
498 return self._default_session
499
503
505 """
506 Internal handler that triggers the necessary socket connect for an
507 opened connection.
508 """
511
513 if not self._override(event):
514 event.dispatch(self.base)
515
517 conn = event.connection
518 return conn and hasattr(conn, '_overrides') and event.dispatch(conn._overrides)
519
521 """
522 Internal handler that triggers the necessary socket connect for an
523 opened connection.
524 """
537
538 - def _connect(self, connection, reactor):
569
572
578
581
596
599
602
604 """
605 A reconnect strategy involving an increasing delay between
606 retries, up to a maximum of 10 seconds.
607 """
610
613
621
624 self.values = [Url(v) for v in values]
625 self.i = iter(self.values)
626
629
631 try:
632 return next(self.i)
633 except StopIteration:
634 self.i = iter(self.values)
635 return next(self.i)
636
649
652 """A representation of the AMQP concept of a 'container', which
653 loosely speaking is something that establishes links to or from
654 another container, over which messages are transferred. This is
655 an extension to the Reactor class that adds convenience methods
656 for creating connections and sender- or receiver- links.
657 """
658 - def __init__(self, *handlers, **kwargs):
674
def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None, **kwargs):
    """
    Start establishing an AMQP connection and return it.

    The peer is chosen from the first of url, urls or address that is
    supplied; at least one of them is required.

    @param url: URL string of the process to connect to.

    @param urls: list of URL strings of processes to try to connect to.

    @param address: object used directly as the connector's address
        when neither url nor urls is given.

    @param handler: a connection scoped handler that will be called to
        process any events in the scope of this connection or its
        child links.

    @param reconnect: reconnect strategy for the connector. Left as
        None, a Backoff() instance is used. A value of False prevents
        the library from automatically trying to reconnect if the
        underlying socket is disconnected before the connection has
        been closed.

    @param heartbeat: a value in milliseconds indicating the desired
        frequency of heartbeats used to test that the underlying
        socket is alive.

    @param ssl_domain: SSL configuration in the form of an instance of
        proton.SSLDomain. When omitted, self.ssl.client is used if
        self.ssl is set.

    @param kwargs: 'sasl_enabled', which determines whether a sasl
        layer is used for the connection; 'allowed_mechs', an optional
        list of SASL mechanisms to allow if sasl is enabled;
        'allow_insecure_mechs', a flag indicating whether insecure
        mechanisms, such as PLAIN over a non-encrypted socket, are
        allowed; 'user' and 'password'. Each of these falls back to
        the attribute of the same name on this container.
        'virtual_host' is the hostname to set in the Open performative,
        used by the peer to determine the correct back-end service for
        the client; if it is not supplied the host field from the URL
        is used instead. 'offered_capabilities',
        'desired_capabilities' and 'properties' are copied onto the
        connection (None when absent).

    @return: the opened proton.Connection.

    @raise ValueError: if none of url, urls or address is given.
    """
    connection = self.connection(handler)
    connection.container = self.container_id or str(generate_uuid())
    # Optional settings that live on the connection itself.
    for option in ('offered_capabilities', 'desired_capabilities', 'properties'):
        setattr(connection, option, kwargs.get(option))

    connector = Connector(connection)
    # Per-connection settings that default to the container-wide value.
    for option in ('allow_insecure_mechs', 'allowed_mechs', 'sasl_enabled', 'user', 'password'):
        setattr(connector, option, kwargs.get(option, getattr(self, option)))
    connector.virtual_host = kwargs.get('virtual_host')
    if connector.virtual_host:
        # An explicit virtual host replaces the hostname sent to the peer.
        connection.hostname = connector.virtual_host

    connection._overrides = connector

    if url:
        connector.address = Urls([url])
    elif urls:
        connector.address = Urls(urls)
    elif address:
        connector.address = address
    else:
        raise ValueError("One of url, urls or address required")

    if heartbeat:
        connector.heartbeat = heartbeat

    if reconnect is None:
        connector.reconnect = Backoff()
    elif reconnect:
        connector.reconnect = reconnect

    connector.ssl_domain = ssl_domain or (self.ssl and self.ssl.client)
    connection._session_policy = SessionPerConnection()
    connection.open()
    return connection
745
746 - def _get_id(self, container, remote, local):
747 if local and remote: "%s-%s-%s" % (container, remote, local)
748 elif local: return "%s-%s" % (container, local)
749 elif remote: return "%s-%s" % (container, remote)
750 else: return "%s-%s" % (container, str(generate_uuid()))
751
764
def create_sender(self, context, target=None, source=None, name=None, handler=None, tags=None, options=None):
    """
    Initiates the establishment of a link over which messages can
    be sent. Returns an instance of proton.Sender.

    There are two patterns of use. (1) A connection can be passed
    as the first argument, in which case the link is established
    on that connection. In this case the target address can be
    specified as the second argument (or as a keyword
    argument). The source address can also be specified if
    desired. (2) Alternatively a URL can be passed as the first
    argument. In this case a new connection will be established on
    which the link will be attached. If a path is specified and
    the target is not, then the path of the URL is used as the
    target address.

    The name of the link may be specified if desired, otherwise a
    name is derived from the container id and the addresses.

    Various LinkOptions can be specified to further control the
    attachment.

    @param context: a connection, a Url, or a URL string.
    @param target: target address for the link.
    @param source: source address for the link.
    @param name: explicit link name.
    @param handler: link scoped handler; attached whenever it is not
        None.
    @param tags: if given, stored as the sender's tag_generator.
    @param options: link option(s) passed to _apply_link_options.
    @return: the opened sender link.
    """
    if isinstance(context, _compat.STRING_TYPES):
        context = Url(context)
    if isinstance(context, Url) and not target:
        target = context.path
    session = self._get_session(context)
    snd = session.sender(name or self._get_id(session.connection.container, target, source))
    if source:
        snd.source.address = source
    if target:
        snd.target.address = target
    # Identity test: a handler is attached unless it is exactly None,
    # independent of any comparison operators the handler defines.
    if handler is not None:
        snd.handler = handler
    if tags:
        snd.tag_generator = tags
    _apply_link_options(options, snd)
    snd.open()
    return snd
804
def create_receiver(self, context, source=None, target=None, name=None, dynamic=False, handler=None, options=None):
    """
    Initiates the establishment of a link over which messages can
    be received (aka a subscription). Returns an instance of
    proton.Receiver.

    There are two patterns of use. (1) A connection can be passed
    as the first argument, in which case the link is established
    on that connection. In this case the source address can be
    specified as the second argument (or as a keyword
    argument). The target address can also be specified if
    desired. (2) Alternatively a URL can be passed as the first
    argument. In this case a new connection will be established on
    which the link will be attached. If a path is specified and
    the source is not, then the path of the URL is used as the
    source address.

    The name of the link may be specified if desired, otherwise a
    name is derived from the container id and the addresses.

    Various LinkOptions can be specified to further control the
    attachment.

    @param context: a connection, a Url, or a URL string.
    @param source: source address for the link.
    @param target: target address for the link.
    @param name: explicit link name.
    @param dynamic: if true, the source is marked as dynamic.
    @param handler: link scoped handler; attached whenever it is not
        None.
    @param options: link option(s) passed to _apply_link_options.
    @return: the opened receiver link.
    """
    if isinstance(context, _compat.STRING_TYPES):
        context = Url(context)
    if isinstance(context, Url) and not source:
        source = context.path
    session = self._get_session(context)
    rcv = session.receiver(name or self._get_id(session.connection.container, source, target))
    if source:
        rcv.source.address = source
    if dynamic:
        rcv.source.dynamic = True
    if target:
        rcv.target.address = target
    # Identity test: a handler is attached unless it is exactly None,
    # independent of any comparison operators the handler defines.
    if handler is not None:
        rcv.handler = handler
    _apply_link_options(options, rcv)
    rcv.open()
    return rcv
845
847 if not _get_attr(context, '_txn_ctrl'):
848 class InternalTransactionHandler(OutgoingMessageHandler):
849 def __init__(self):
850 super(InternalTransactionHandler, self).__init__(auto_settle=True)
851
852 def on_settled(self, event):
853 if hasattr(event.delivery, "transaction"):
854 event.transaction = event.delivery.transaction
855 event.delivery.transaction.handle_outcome(event)
856 context._txn_ctrl = self.create_sender(context, None, name='txn-ctrl', handler=InternalTransactionHandler())
857 context._txn_ctrl.target.type = Terminus.COORDINATOR
858 context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions'))
859 return Transaction(context._txn_ctrl, handler, settle_before_discharge)
860
def listen(self, url, ssl_domain=None):
    """
    Initiates a server socket, accepting incoming AMQP connections
    on the interface and port specified.

    @param url: URL (string or Url) giving the host and port to listen
        on. With the 'amqps' scheme and no explicit ssl_domain, the
        container's own server SSL configuration is used.
    @param ssl_domain: optional SSL configuration for the acceptor;
        takes precedence over the container's configuration.
    @return: the acceptor created for the listening socket.
    @raise SSLUnavailable: if the scheme is 'amqps', no ssl_domain was
        given and this container has no SSL support.
    """
    url = Url(url)
    # Work out the SSL configuration before opening the acceptor, so a
    # missing SSL library is reported without leaving behind an
    # acceptor that the caller has no reference to close.
    ssl_config = ssl_domain
    if not ssl_config and url.scheme == 'amqps':
        if not self.ssl:
            raise SSLUnavailable("amqps: SSL libraries not found")
        ssl_config = self.ssl.server
    acceptor = self.acceptor(url.host, url.port)
    if ssl_config:
        acceptor.set_ssl_domain(ssl_config)
    return acceptor
878
883