Coverage for aiocoap/pipe.py: 89%
117 statements
« prev ^ index » next coverage.py v7.6.8, created at 2024-11-28 12:34 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5import asyncio
6from collections import namedtuple
7import functools
9from . import error
10from .numbers import INTERNAL_SERVER_ERROR
class Pipe:
    """Low-level meeting point between a request and any responses that come
    back on it.

    A single request message is placed in the Pipe at creation time.
    Any responses, as well as any exception happening in the course of
    processing, are passed back to the requester along the Pipe. A
    response can carry an indication of whether it is final; an exception
    always is.

    This object is used both on the client side (where the Context on behalf of
    the application creates a Pipe and passes it to the network
    transports that send the request and fill in any responses) and on the
    server side (where the Context creates one for an incoming request and
    eventually lets the server implementation populate it with responses).

    This currently follows a callback dispatch style. (It may be developed into
    something where only awaiting a response drives the process, though).

    Currently, the requester sets up the object, connects callbacks, and then
    passes the Pipe on to whatever creates the response.

    The creator of responses is notified by the Pipe of a loss of
    interest in a response when there are no more callback handlers registered
    by registering an on_interest_end callback. As the response callbacks need
    to be already in place when the Pipe is passed on to the
    responder, the absence of event callbacks is signalled by calling the
    callback immediately on registration.

    To accurately model "loss of interest", it is important to use the
    two-phase setup of first registering actual callbacks and then producing
    events and/or placing on_interest_end callbacks; this is not clearly
    expressed in type or state yet. (One possibility would be for the
    Pipe to carry a preparation boolean, and which prohibits event
    sending during preparation and is_interest=True callback creation
    afterwards).

    This was previously named PlumbingRequest.

    **Stability**

    Sites and resources implemented by providing a
    :meth:`~aiocoap.interfaces.Resource.render_to_pipe` method can stably use
    the :meth:`add_response` method of a Pipe (or something that quacks like
    it).

    They should not rely on :meth:`add_exception` but rather just raise the
    exception, and neither register :meth:`on_event` handlers (being the sole
    producer of events) nor hook to :meth:`on_interest_end` (instead, they can
    use finally clauses or async context managers to handle any cleanup when
    the cancellation of the render task indicates the peer's loss of interest).
    """

    # A single item travelling down the pipe: exactly one of message/exception
    # is set; is_last marks the terminal event.
    Event = namedtuple("Event", ("message", "exception", "is_last"))

    # called by the initiator of the request

    def __init__(self, request, log):
        self.request = request
        self.log = log

        self._event_callbacks = []
        """list[(callback, is_interest)], or None during event processing, or
        False when there were no more event callbacks and the
        on_interest_end callbacks have already been called"""

    def __repr__(self):
        # _event_callbacks may be False (ended); show it as-is in that case
        # rather than a count.
        return "<%s at %#x around %r with %r callbacks (thereof %r interests)>" % (
            type(self).__name__,
            id(self),
            self.request,
            len(self._event_callbacks)
            if self._event_callbacks
            else self._event_callbacks,
            sum(1 for (e, is_interest) in self._event_callbacks if is_interest)
            if self._event_callbacks
            else self._event_callbacks,
        )

    def _any_interest(self):
        # True while at least one registered callback still counts as an
        # active interest (is_interest=True).
        return any(is_interest for (cb, is_interest) in self._event_callbacks)

    def poke(self):
        """Ask the responder for a life sign. It is up to the responder to
        ignore this (eg. because the responder is the library/application and
        can't be just gone), to issue a generic transport-dependent 'ping' to
        see whether the connection is still alive, or to retransmit the request
        if it is an observation over an unreliable channel.

        In any case, no status is reported directly to the poke, but if
        whatever the responder does fails, it will send an appropriate error
        message as a response."""
        raise NotImplementedError()

    def on_event(self, callback, is_interest=True):
        """Call callback on any event. The callback must return True to be
        called again after an event. Callbacks must not produce new events or
        deregister unrelated event handlers.

        If is_interest=False, the callback will not be counted toward the
        active callbacks, and will receive a (None, None, is_last=True) event
        eventually.

        To unregister the handler, call the returned closure; this can trigger
        on_interest_end callbacks.
        """
        self._event_callbacks.append((callback, is_interest))
        # Returned closure deregisters exactly this callback instance.
        return functools.partial(self._unregister_on_event, callback)

    def _unregister_on_event(self, callback):
        # Deregister `callback`; may fire on_interest_end handlers if it was
        # the last active interest.
        if self._event_callbacks is False:
            # They wouldn't be called any more so they're already dropped.
            # It's OK that the caller cleans up after itself: Sure it could
            # register an on_interest_end, but that's really not warranted if
            # all it wants to know is whether it'll have to execute cleanup
            # when it's shutting down or not.
            return

        # Identity comparison: unregister this exact callback object only.
        self._event_callbacks = [
            (cb, i) for (cb, i) in self._event_callbacks if callback is not cb
        ]
        if not self._any_interest():
            self._end()

    def on_interest_end(self, callback):
        """Register a callback that will be called exactly once -- either right
        now if there is not even a current indicated interest, or at a last
        event, or when no more interests are present"""

        if self._event_callbacks is False:
            # Happens, for example, when a proxy receives multiple requests on a single token
            self.log.warning(
                "on_interest_end callback %r added after %r has already ended",
                callback,
                self,
            )
            callback()
            return

        if self._any_interest():
            # Wrapped as a non-interest event handler: fires callback() on the
            # terminal event, keeps itself registered (returns True) until then.
            self._event_callbacks.append(
                (
                    lambda e: ((callback(), False) if e.is_last else (None, True))[1],
                    False,
                )
            )
        else:
            callback()

    def _end(self):
        # Transition to the terminal state: drop all callbacks, then notify
        # every remaining handler once with the (None, None, True) tombstone.
        cbs = self._event_callbacks
        self._event_callbacks = False
        tombstone = self.Event(None, None, True)
        [cb(tombstone) for (cb, _) in cbs]

    # called by the responding side

    def _add_event(self, event):
        # Dispatch `event` to all current handlers, dropping any that return
        # a falsy value (i.e. ask not to be called again).
        if self._event_callbacks is False:
            # Happens, for example, when a proxy receives multiple requests on a single token
            self.log.warning(
                "Response %r added after %r has already ended", event, self
            )
            return

        # Iterate over a copy: callbacks may deregister themselves (by
        # returning falsy) while we walk the list.
        for cb, is_interest in self._event_callbacks[:]:
            keep_calling = cb(event)
            if not keep_calling:
                if self._event_callbacks is False:
                    # All interest was just lost during the callback
                    return

                self._event_callbacks.remove((cb, is_interest))

        if not self._any_interest():
            self._end()

    def add_response(self, response, is_last=False):
        # Place a response message on the pipe; is_last=True makes it terminal.
        self._add_event(self.Event(response, None, is_last))

    def add_exception(self, exception):
        # Exceptions are always terminal events.
        self._add_event(self.Event(None, exception, True))
def run_driving_pipe(pipe, coroutine, name=None):
    """Create a task from a coroutine where the end of the coroutine produces a
    terminal event on the pipe, and lack of interest in the pipe cancels the
    task.

    The coroutine will typically produce output into the pipe; that
    connection is set up by the caller like as in
    ``run_driving_pipe(pipe, render_to(pipe))``.

    The created task is not returned, as the only sensible operation on it
    would be cancellation and that's already set up from the pipe.
    """

    async def _drive():
        # Funnel any failure of the driven coroutine into the pipe as a
        # terminal event. Cancellation deliberately gets no special handling:
        # it indicates the peer's loss of interest, so there is no use in
        # sending anything out to someone not listening any more.
        try:
            await coroutine
        except Exception as exc:
            pipe.add_exception(exc)

    pipe.on_interest_end(asyncio.create_task(_drive(), name=name).cancel)
def error_to_message(old_pr, log):
    """Given a pipe set up by the requester, create a new pipe to pass on to a
    responder.

    Any exceptions produced by the responder will be turned into terminal
    responses on the original pipe, and loss of interest is forwarded."""

    from .message import Message

    next_pr = Pipe(old_pr.request, log)

    def forward(event):
        # Regular messages pass through unchanged; stay registered until the
        # terminal one.
        if event.message is not None:
            old_pr.add_response(event.message, event.is_last)
            return not event.is_last

        exc = event.exception

        if isinstance(exc, error.RenderableError):
            # the repr() here is quite important for garbage collection
            log.info(
                "Render request raised a renderable error (%s), responding accordingly.",
                repr(exc),
            )
            try:
                rendered = exc.to_message()
                if rendered is None:
                    # This deserves a separate check because the ABC checks
                    # that should ensure that the default to_message method is
                    # never used in concrete classes fails due to the metaclass
                    # conflict between ABC and Exceptions
                    raise ValueError(
                        "Exception to_message failed to produce a message on %r" % exc
                    )
            except Exception as nested:
                log.error(
                    "Rendering the renderable exception failed: %r", nested, exc_info=nested
                )
                rendered = Message(code=INTERNAL_SERVER_ERROR)
        else:
            log.error(
                "An exception occurred while rendering a resource: %r", exc, exc_info=exc
            )
            rendered = Message(code=INTERNAL_SERVER_ERROR)

        # Exceptions are always terminal on the forwarded pipe.
        old_pr.add_response(rendered, is_last=True)
        return False

    # Forward loss of interest in the original pipe by deregistering our
    # handler on the new one.
    old_pr.on_interest_end(next_pr.on_event(forward))
    return next_pr
class IterablePipe:
    """A stand-in for a Pipe that the requesting party can use
    instead. It should behave just like a Pipe to the responding
    party, but the caller does not register on_event handlers and instead
    iterates asynchronously over the events.

    Note that the PR can be aitered over only once, and does not support any
    additional hook settings once asynchronous iteration is started; this is
    consistent with the usage pattern of pipes.
    """

    def __init__(self, request):
        self.request = request

        self.__on_interest_end = []

        # FIXME: This is unbounded -- pipes should gain support for
        # backpressure.
        self.__queue = asyncio.Queue()

    def on_interest_end(self, callback):
        # After __aiter__ has run, __on_interest_end is deleted, so the append
        # raises AttributeError -- converted here into a clearer RuntimeError.
        try:
            self.__on_interest_end.append(callback)
        except AttributeError:
            raise RuntimeError(
                "Attempted to declare interest in the end of a IterablePipe on which iteration already started"
            ) from None

    def __aiter__(self):
        # Ownership of the on_interest_end list is handed over to the
        # iterator; deleting the attribute makes later on_interest_end calls
        # fail (see above), enforcing the iterate-once usage pattern.
        i = self.Iterator(self.__queue, self.__on_interest_end)
        del self.__on_interest_end
        return i

    def _add_event(self, e):
        # Events are buffered for the (single) consumer; see FIXME above
        # about the queue being unbounded.
        self.__queue.put_nowait(e)

    def add_response(self, response, is_last=False):
        # Same responder-facing interface as Pipe.add_response.
        self._add_event(Pipe.Event(response, None, is_last))

    def add_exception(self, exception):
        # Same responder-facing interface as Pipe.add_exception; exceptions
        # are always terminal.
        self._add_event(Pipe.Event(None, exception, True))

    class Iterator:
        # Consumes queued events; signals loss of interest on garbage
        # collection (i.e. when the `async for` driving it is abandoned).
        def __init__(self, queue, on_interest_end):
            self.__queue = queue
            self.__on_interest_end = on_interest_end

        async def __anext__(self):
            return await self.__queue.get()

        def __del__(self):
            # This is pretty reliable as the iterator is only created and
            # referenced in the desugaring of the `async for`.
            for c in self.__on_interest_end:
                c()