Coverage for src/aiocoap/pipe.py: 0%
119 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-12 11:18 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5import asyncio
6from collections import namedtuple
7import functools
9from . import error
10from .numbers import INTERNAL_SERVER_ERROR
class Pipe:
    """Low-level meeting point between a request and any responses that come
    back on it.

    A single request message is placed in the Pipe at creation time.
    Any responses, as well as any exception happening in the course of
    processing, are passed back to the requester along the Pipe. A
    response can carry an indication of whether it is final; an exception
    always is.

    This object is used both on the client side (where the Context on behalf of
    the application creates a Pipe and passes it to the network
    transports that send the request and fill in any responses) and on the
    server side (where the Context creates one for an incoming request and
    eventually lets the server implementation populate it with responses).

    This currently follows a callback dispatch style. (It may be developed into
    something where only awaiting a response drives the process, though).

    Currently, the requester sets up the object, connects callbacks, and then
    passes the Pipe on to whatever creates the response.

    The creator of responses is notified by the Pipe of a loss of
    interest in a response when there are no more callback handlers registered
    by registering an on_interest_end callback. As the response callbacks need
    to be already in place when the Pipe is passed on to the
    responder, the absence of event callbacks is signalled by calling the
    callback immediately on registration.

    To accurately model "loss of interest", it is important to use the
    two-phase setup of first registering actual callbacks and then producing
    events and/or placing on_interest_end callbacks; this is not clearly
    expressed in type or state yet. (One possibility would be for the
    Pipe to carry a preparation boolean, and which prohibits event
    sending during preparation and is_interest=True callback creation
    afterwards).

    This was previously named PlumbingRequest.

    **Stability**

    Sites and resources implemented by providing a
    :meth:`~aiocoap.interfaces.Resource.render_to_pipe` method can stably use
    the :meth:`add_response` method of a Pipe (or something that quacks like
    it).

    They should not rely on :meth:`add_exception` but rather just raise the
    exception, and neither register :meth:`on_event` handlers (being the sole
    producer of events) nor hook to :meth:`on_interest_end` (instead, they can
    use finally clauses or async context managers to handle any cleanup when
    the cancellation of the render task indicates the peer's loss of interest).
    """

    Event = namedtuple("Event", ("message", "exception", "is_last"))

    # called by the initiator of the request

    def __init__(self, request, log):
        self.request = request
        self.log = log

        # list[(callback, is_interest)], or False when there were no more
        # event callbacks and the on_interest_end callbacks have already
        # been called
        self._event_callbacks = []

    def __repr__(self):
        return "<%s at %#x around %r with %r callbacks (thereof %r interests)>" % (
            type(self).__name__,
            id(self),
            self.request,
            len(self._event_callbacks)
            if self._event_callbacks
            else self._event_callbacks,
            sum(1 for (e, is_interest) in self._event_callbacks if is_interest)
            if self._event_callbacks
            else self._event_callbacks,
        )

    def _any_interest(self):
        """Return True while at least one is_interest=True callback remains."""
        return any(is_interest for (cb, is_interest) in self._event_callbacks)

    def poke(self):
        """Ask the responder for a life sign. It is up to the responder to
        ignore this (eg. because the responder is the library/application and
        can't be just gone), to issue a generic transport-dependent 'ping' to
        see whether the connection is still alive, or to retransmit the request
        if it is an observation over an unreliable channel.

        In any case, no status is reported directly to the poke, but if
        whatever the responder does fails, it will send an appropriate error
        message as a response."""
        raise NotImplementedError()

    def on_event(self, callback, is_interest=True):
        """Call callback on any event. The callback must return True to be
        called again after an event. Callbacks must not produce new events or
        deregister unrelated event handlers.

        If is_interest=False, the callback will not be counted toward the
        active callbacks, and will receive a (None, None, is_last=True) event
        eventually.

        To unregister the handler, call the returned closure; this can trigger
        on_interest_end callbacks.
        """
        self._event_callbacks.append((callback, is_interest))
        return functools.partial(self._unregister_on_event, callback)

    def _unregister_on_event(self, callback):
        """Remove ``callback`` from the dispatch list, ending the pipe if no
        interest remains."""
        if self._event_callbacks is False:
            # They wouldn't be called any more so they're already dropped.
            # It's OK that the caller cleans up after itself: Sure it could
            # register an on_interest_end, but that's really not warranted if
            # all it wants to know is whether it'll have to execute cleanup
            # when it's shutting down or not.
            return

        self._event_callbacks = [
            (cb, i) for (cb, i) in self._event_callbacks if callback is not cb
        ]
        if not self._any_interest():
            self._end()

    def on_interest_end(self, callback):
        """Register a callback that will be called exactly once -- either right
        now if there is not even a current indicated interest, or at a last
        event, or when no more interests are present"""

        if self._event_callbacks is False:
            # Happens, for example, when a proxy receives multiple requests on a single token
            self.log.warning(
                "on_interest_end callback %r added after %r has already ended",
                callback,
                self,
            )
            callback()
            return

        if self._any_interest():
            # Wrapped as a non-interest event handler that fires the callback
            # once on the terminal event and keeps listening until then.
            self._event_callbacks.append(
                (
                    lambda e: ((callback(), False) if e.is_last else (None, True))[1],
                    False,
                )
            )
        else:
            callback()

    def _end(self):
        """Mark the pipe as ended and deliver the tombstone event to all
        remaining (non-interest) callbacks."""
        cbs = self._event_callbacks
        self._event_callbacks = False
        tombstone = self.Event(None, None, True)
        for cb, _ in cbs:
            cb(tombstone)

    # called by the responding side

    def _add_event(self, event):
        """Dispatch ``event`` to the registered callbacks, pruning those that
        return a falsy value and ending the pipe when no interest remains."""
        if self._event_callbacks is False:
            if event.exception is not None:
                # FIX: was ``exception=event.exception``, which is not a valid
                # keyword for logging.Logger.error and raised TypeError.
                self.log.error(
                    "Discarded exception in %r added after %r has ended",
                    event,
                    self,
                    exc_info=event.exception,
                )
            else:
                # Happens, for example, when a proxy receives multiple requests on a single token
                self.log.warning(
                    "Response %r added after %r has already ended", event, self
                )
            return

        # Iterate over a copy: callbacks may deregister themselves.
        for cb, is_interest in self._event_callbacks[:]:
            keep_calling = cb(event)
            if not keep_calling:
                if self._event_callbacks is False:
                    # All interest was just lost during the callback
                    return

                self._event_callbacks.remove((cb, is_interest))

        if not self._any_interest():
            self._end()

    def add_response(self, response, is_last=False):
        """Send a response message along the pipe; ``is_last`` marks it
        terminal."""
        self._add_event(self.Event(response, None, is_last))

    def add_exception(self, exception):
        """Send an exception along the pipe; exceptions are always terminal."""
        self._add_event(self.Event(None, exception, True))
def run_driving_pipe(pipe, coroutine, name=None):
    """Run *coroutine* as a task whose outcome is tied to *pipe*.

    Any exception escaping the coroutine becomes a terminal event on the
    pipe, and a loss of interest in the pipe cancels the task.

    The coroutine will typically produce output into the pipe; that
    connection is set up by the caller like as in
    ``run_driving_pipe(pipe, render_to(pipe))``.

    The created task is not returned, as the only sensible operation on it
    would be cancellation, and that is already wired up from the pipe.
    """

    async def _drive():
        # Cancellation deliberately passes through untouched: it indicates
        # the peer's loss of interest, so there is no point in sending
        # anything out to someone no longer listening.
        try:
            await coroutine
        except Exception as exc:
            pipe.add_exception(exc)

    pipe.on_interest_end(asyncio.create_task(_drive(), name=name).cancel)
def error_to_message(old_pr, log):
    """Given a pipe set up by the requester, create a new pipe to pass on to a
    responder.

    Any exceptions produced by the responder will be turned into terminal
    responses on the original pipe, and loss of interest is forwarded.

    :param old_pr: The requester-side pipe that receives translated events.
    :param log: Logger used to report rendering problems.
    :return: A fresh :class:`Pipe` to hand to the responder.
    """

    # NOTE(review): local import presumably avoids a circular import at
    # module load time -- confirm against the package layout.
    from .message import Message

    next_pr = Pipe(old_pr.request, log)

    def on_event(event):
        # Plain responses pass through unchanged; only exceptions get
        # translated into messages below.
        if event.message is not None:
            old_pr.add_response(event.message, event.is_last)
            return not event.is_last

        e = event.exception

        if isinstance(e, error.RenderableError):
            # the repr() here is quite important for garbage collection
            log.info(
                "Render request raised a renderable error (%s), responding accordingly.",
                repr(e),
            )
            try:
                msg = e.to_message()
                if msg is None:
                    # This deserves a separate check because the ABC checks
                    # that should ensure that the default to_message method is
                    # never used in concrete classes fails due to the metaclass
                    # conflict between ABC and Exceptions
                    raise ValueError(
                        "Exception to_message failed to produce a message on %r" % e
                    )
            except Exception as e2:
                # Fall back to a generic 5.00 if the exception's own
                # rendering fails.
                log.error(
                    "Rendering the renderable exception failed: %r", e2, exc_info=e2
                )
                msg = Message(code=INTERNAL_SERVER_ERROR)
            old_pr.add_response(msg, is_last=True)
        else:
            # Non-renderable exceptions indicate an implementation error;
            # log them and answer with a generic 5.00.
            log.error(
                "An exception occurred while rendering a resource: %r", e, exc_info=e
            )
            old_pr.add_response(Message(code=INTERNAL_SERVER_ERROR), is_last=True)

        # Exceptions are always terminal, so stop listening after one.
        return False

    remove_interest = next_pr.on_event(on_event)
    # When the requester loses interest, unhook the forwarding (which also
    # ends next_pr once no interest remains there).
    old_pr.on_interest_end(remove_interest)
    return next_pr
class IterablePipe:
    """A Pipe stand-in whose events are consumed by asynchronous iteration.

    To the responding party this behaves just like a Pipe, but the
    requesting party does not register on_event handlers; it iterates
    asynchronously over the events instead.

    Iteration can happen only once, and no additional hooks may be set up
    after asynchronous iteration has started; this is consistent with the
    usage pattern of pipes.
    """

    def __init__(self, request):
        self.request = request

        self.__end_callbacks = []

        # FIXME: This is unbounded -- pipes should gain support for
        # backpressure.
        self.__pending = asyncio.Queue()

    def on_interest_end(self, callback):
        """Register ``callback`` to run when iteration ends; rejected once
        iteration has started (the callback list is gone by then)."""
        try:
            callbacks = self.__end_callbacks
        except AttributeError:
            raise RuntimeError(
                "Attempted to declare interest in the end of a IterablePipe on which iteration already started"
            ) from None
        callbacks.append(callback)

    def __aiter__(self):
        # Hand the queue and the end callbacks over to the iterator, and
        # drop our reference so later on_interest_end calls fail loudly.
        iterator = self.Iterator(self.__pending, self.__end_callbacks)
        del self.__end_callbacks
        return iterator

    def _add_event(self, e):
        self.__pending.put_nowait(e)

    def add_response(self, response, is_last=False):
        """Enqueue a response event; ``is_last`` marks it terminal."""
        self._add_event(Pipe.Event(response, None, is_last))

    def add_exception(self, exception):
        """Enqueue a terminal exception event."""
        self._add_event(Pipe.Event(None, exception, True))

    class Iterator:
        def __init__(self, queue, on_interest_end):
            self.__source = queue
            self.__cleanup = on_interest_end

        async def __anext__(self):
            return await self.__source.get()

        def __del__(self):
            # This is pretty reliable as the iterator is only created and
            # referenced in the desugaring of the `async for`.
            for cb in self.__cleanup:
                cb()