1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 import heapq, logging, os, re, socket, time, types
20
21 from proton import dispatch, generate_uuid, PN_ACCEPTED, SASL, symbol, ulong, Url
22 from proton import Collector, Connection, Delivery, Described, Endpoint, Event, Link, Terminus, Timeout
23 from proton import Message, Handler, ProtonException, Transport, TransportException, ConnectionException
24 from select import select
28 """
29 A utility for simpler and more intuitive handling of delivery
30 events related to outgoing i.e. sent messages.
31 """
def __init__(self, auto_settle=True, delegate=None):
    """
    Record the configuration for this handler.

    :param auto_settle: flag stored unchanged on the instance as
        ``auto_settle``; where it is consulted is not visible here.
    :param delegate: optional object stored as ``delegate``.
        Presumably the target that the ``on_*`` callbacks forward
        events to via ``dispatch`` -- confirm against the rest of
        the class.
    """
    self.auto_settle = auto_settle
    self.delegate = delegate
35
41
55
57 """
58 Called when the sender link has credit and messages can
59 therefore be transferred.
60 """
61 if self.delegate != None:
62 dispatch(self.delegate, 'on_sendable', event)
63
65 """
66 Called when the remote peer accepts an outgoing message.
67 """
68 if self.delegate != None:
69 dispatch(self.delegate, 'on_accepted', event)
70
72 """
73 Called when the remote peer rejects an outgoing message.
74 """
75 if self.delegate != None:
76 dispatch(self.delegate, 'on_rejected', event)
77
79 """
80 Called when the remote peer releases an outgoing message. Note
81 that this may be in response to either the RELEASED or MODIFIED
82 state as defined by the AMQP specification.
83 """
84 if self.delegate != None:
85 dispatch(self.delegate, 'on_released', event)
86
88 """
89 Called when the remote peer has settled the outgoing
90 message. This is the point at which it should never be
91 retransmitted.
92 """
93 if self.delegate != None:
94 dispatch(self.delegate, 'on_settled', event)
95
101
class Reject(ProtonException):
    """
    An exception that indicates a message should be rejected.
    """
    pass
107
109 """
110 An exception that indicates a message should be rejected
111 """
112 pass
113
120
127
def release(self, delivery, delivered=True):
    """
    Releases a received message, making it available at the source
    for any (other) interested receiver.

    :param delivery: the delivery to settle.
    :param delivered: indicates whether this should be considered a
        delivery attempt (and the delivery count updated) or not.
        When true the delivery is settled with ``Delivery.MODIFIED``,
        otherwise with ``Delivery.RELEASED``.
    """
    # Pick the terminal state first, then settle once.
    state = Delivery.MODIFIED if delivered else Delivery.RELEASED
    self.settle(delivery, state)
139
140 - def settle(self, delivery, state=None):
144
146 """
147 A utility for simpler and more intuitive handling of delivery
148 events related to incoming i.e. received messages.
149 """
150
def __init__(self, auto_accept=True, delegate=None):
    """
    Record the configuration for this handler.

    :param auto_accept: flag stored unchanged on the instance as
        ``auto_accept``; where it is consulted is not visible here.
    :param delegate: optional object stored as ``delegate``.
        Presumably the target that the ``on_*`` callbacks forward
        events to via ``dispatch`` -- confirm against the rest of
        the class.
    """
    self.delegate = delegate
    self.auto_accept = auto_accept
154
178
180 """
181 Called when a message is received. The message itself can be
182 obtained as a property on the event. For the purpose of
183 referring to this message in further actions (e.g. if
184 explicitly accepting it), the ``delivery`` should be used, also
185 obtainable via a property on the event.
186 """
187 if self.delegate != None:
188 dispatch(self.delegate, 'on_message', event)
189
191 if self.delegate != None:
192 dispatch(self.delegate, 'on_settled', event)
193
195 """
196 A utility that exposes 'endpoint' events i.e. the open/close for
197 links, sessions and connections in a more intuitive manner. A
198 XXX_opened method will be called when both local and remote peers
199 have opened the link, session or connection. This can be used to
200 confirm a locally initiated action for example. A XXX_opening
201 method will be called when the remote peer has requested an open
202 that was not initiated locally. By default this will simply open
203 locally, which then triggers the XXX_opened call. The same applies
204 to close.
205 """
206
def __init__(self, peer_close_is_error=False, delegate=None):
    """
    Record the configuration for this handler.

    :param peer_close_is_error: flag stored unchanged on the instance
        as ``peer_close_is_error``. Presumably makes a peer-initiated
        close be treated as an error when no delegate is set --
        confirm against the ``on_*_closing`` methods.
    :param delegate: optional object stored as ``delegate``.
        Presumably the target that the ``on_*`` callbacks forward
        events to via ``dispatch`` -- confirm against the rest of
        the class.
    """
    self.delegate = delegate
    self.peer_close_is_error = peer_close_is_error
210
211 @classmethod
214
215 @classmethod
218
219 @classmethod
222
223 @classmethod
226
227 @classmethod
230
231 @classmethod
237
246
255
264
268
275
279
286
290
297
299 if self.delegate != None:
300 dispatch(self.delegate, 'on_connection_opened', event)
301
303 if self.delegate != None:
304 dispatch(self.delegate, 'on_session_opened', event)
305
307 if self.delegate != None:
308 dispatch(self.delegate, 'on_link_opened', event)
309
311 if self.delegate != None:
312 dispatch(self.delegate, 'on_connection_opening', event)
313
315 if self.delegate != None:
316 dispatch(self.delegate, 'on_session_opening', event)
317
319 if self.delegate != None:
320 dispatch(self.delegate, 'on_link_opening', event)
321
323 if self.delegate != None:
324 dispatch(self.delegate, 'on_connection_error', event)
325 else:
326 self.log_error(event.connection, "connection")
327
329 if self.delegate != None:
330 dispatch(self.delegate, 'on_session_error', event)
331 else:
332 self.log_error(event.session, "session")
333 event.connection.close()
334
336 if self.delegate != None:
337 dispatch(self.delegate, 'on_link_error', event)
338 else:
339 self.log_error(event.link, "link")
340 event.connection.close()
341
343 if self.delegate != None:
344 dispatch(self.delegate, 'on_connection_closed', event)
345
347 if self.delegate != None:
348 dispatch(self.delegate, 'on_session_closed', event)
349
351 if self.delegate != None:
352 dispatch(self.delegate, 'on_link_closed', event)
353
355 if self.delegate != None:
356 dispatch(self.delegate, 'on_connection_closing', event)
357 elif self.peer_close_is_error:
358 self.on_connection_error(event)
359
361 if self.delegate != None:
362 dispatch(self.delegate, 'on_session_closing', event)
363 elif self.peer_close_is_error:
364 self.on_session_error(event)
365
367 if self.delegate != None:
368 dispatch(self.delegate, 'on_link_closing', event)
369 elif self.peer_close_is_error:
370 self.on_link_error(event)
371
374
378
380 """
381 A general purpose handler that makes the proton-c events somewhat
382 simpler to deal with and/or avoids repetitive tasks for common use
383 cases.
384 """
385 - def __init__(self, prefetch=10, auto_accept=True, auto_settle=True, peer_close_is_error=False):
393
409
415
422
429
431 """
432 Called when the event loop - the reactor - starts.
433 """
434 if hasattr(event.reactor, 'subclass'):
435 setattr(event, event.reactor.subclass.__name__.lower(), event.reactor)
436 self.on_start(event)
437
439 """
440 Called when the event loop starts. (Just an alias for on_reactor_init)
441 """
442 pass
444 """
445 Called when the connection is closed.
446 """
447 pass
449 """
450 Called when the session is closed.
451 """
452 pass
454 """
455 Called when the link is closed.
456 """
457 pass
459 """
460 Called when the peer initiates the closing of the connection.
461 """
462 pass
464 """
465 Called when the peer initiates the closing of the session.
466 """
467 pass
469 """
470 Called when the peer initiates the closing of the link.
471 """
472 pass
474 """
475 Called when the socket is disconnected.
476 """
477 pass
478
480 """
481 Called when the sender link has credit and messages can
482 therefore be transferred.
483 """
484 pass
485
487 """
488 Called when the remote peer accepts an outgoing message.
489 """
490 pass
491
493 """
494 Called when the remote peer rejects an outgoing message.
495 """
496 pass
497
499 """
500 Called when the remote peer releases an outgoing message. Note
501 that this may be in response to either the RELEASED or MODIFIED
502 state as defined by the AMQP specification.
503 """
504 pass
505
507 """
508 Called when the remote peer has settled the outgoing
509 message. This is the point at which it should never be
510 retransmitted.
511 """
512 pass
514 """
515 Called when a message is received. The message itself can be
516 obtained as a property on the event. For the purpose of
517 referring to this message in further actions (e.g. if
518 explicitly accepting it), the ``delivery`` should be used, also
519 obtainable via a property on the event.
520 """
521 pass
522
524 """
525 The interface for transaction handlers, i.e. objects that want to
526 be notified of state changes related to a transaction.
527 """
530
533
536
539
542
544 """
545 An extension to the MessagingHandler for applications using
546 transactions.
547 """
548
549 - def __init__(self, prefetch=10, auto_accept=False, auto_settle=True, peer_close_is_error=False):
551
552 - def accept(self, delivery, transaction=None):
557
558 from proton import WrappedHandler
559 from cproton import pn_flowcontroller, pn_handshaker, pn_iohandler
562
564 WrappedHandler.__init__(self, lambda: pn_flowcontroller(window))
565
567
569 WrappedHandler.__init__(self, pn_handshaker)
570
572
574 WrappedHandler.__init__(self, pn_iohandler)
575
577
579 self.selectables = []
580 self.delegate = IOHandler()
581
584
586 self.selectables.append(event.context)
587
590
592 sel = event.context
593 if sel.is_terminal:
594 self.selectables.remove(sel)
595 sel.release()
596
638