Coverage for aiocoap / protocol.py: 89%
522 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 12:28 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 12:28 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5"""This module contains the classes that are responsible for keeping track of
6messages:
8* :class:`Context` roughly represents the CoAP endpoint (basically a UDP
9 socket) -- something that can send requests and possibly can answer
10 incoming requests.
12 Incoming requests are processed in tasks created by the context.
14* a :class:`Request` gets generated whenever a request gets sent to keep
15 track of the response
17Logging
18~~~~~~~
20Several constructors of the Context accept a logger name; these names go into
21the construction of a Python logger.
Log events will be emitted to these on different levels, with "warning" and
above being a practical default for things that may warrant reviewing by
an operator:
27* DEBUG is used for things that occur even under perfect conditions.
28* INFO is for things that are well expected, but might be interesting during
29 testing a network of nodes and not just when debugging the library. (This
30 includes timeouts, retransmissions, and pings.)
31* WARNING is for everything that indicates a malbehaved peer. These don't
32 *necessarily* indicate a client bug, though: Things like requesting a
33 nonexistent block can just as well happen when a resource's content has
34 changed between blocks. The library will not go out of its way to determine
35 whether there is a plausible explanation for the odd behavior, and will
36 report something as a warning in case of doubt.
37* ERROR is used when something clearly went wrong. This includes irregular
38 connection terminations and resource handler errors (which are demoted to
39 error responses), and can often contain a backtrace.
41Logs will generally reveal messages exchanged between this and other systems,
42and attackers can observe their encrypted counterparts. Private or shared keys
43are only logged through an internal `log_secret` function, which usually
44replaces them with a redacted value. Setting the ``AIOCOAP_REVEAL_KEYS``
45environment variable to the value ``show secrets in logs`` bypasses that
46mechanism. As an additional precaution, this is only accepted if the effective
47user has write access to the aiocoap source code.
48"""
50import asyncio
51import weakref
52import time
54from .credentials import CredentialsMap
55from .message import Message
56from .messagemanager import MessageManager
57from .tokenmanager import TokenManager
58from .pipe import Pipe, run_driving_pipe, error_to_message
59from .util import DeprecationWarning
60from . import interfaces
61from . import error
62from .numbers import INTERNAL_SERVER_ERROR, NOT_FOUND, CONTINUE, SHUTDOWN_TIMEOUT
63from .config import TransportParameters
65import warnings
66import logging
class Context(interfaces.RequestProvider):
    """Applications' entry point to the network

    A :class:`.Context` coordinates one or more network :mod:`.transports`
    implementations and dispatches data between them and the application.

    The application can start requests using the message dispatch methods, and
    set a :class:`resources.Site` that will answer requests directed to the
    application as a server.

    On the library-internals side, it is the prime implementation of the
    :class:`interfaces.RequestProvider` interface, creates :class:`Request` and
    :class:`Response` classes on demand, and decides which transport
    implementations to start and which are to handle which messages.

    **Context creation and destruction**

    The following functions are provided for creating and stopping a context:

    .. note::

        A typical application should only ever create one context, even (or
        especially when) it acts both as a server and as a client (in which
        case a server context should be created).

        A context that is not used any more must be shut down using
        :meth:`.shutdown()`, but typical applications will not need to because
        they use the context for the full process lifetime.

    .. automethod:: create_client_context
    .. automethod:: create_server_context

    .. automethod:: shutdown

    **Dispatching messages**

    CoAP requests can be sent using the following functions:

    .. automethod:: request

    If more control is needed, you can create a :class:`Request` yourself and
    pass the context to it.

    **Other methods and properties**

    The remaining methods and properties are to be considered unstable even
    when the project reaches a stable version number; please file a feature
    request for stabilization if you want to reliably access any of them.
    """

    def __init__(
        self,
        loop=None,
        serversite=None,
        loggername="coap",
        client_credentials=None,
        server_credentials=None,
    ):
        self.log = logging.getLogger(loggername)

        self.loop = loop or asyncio.get_running_loop()

        self.serversite = serversite

        # Transport stack entry points (token interfaces or equivalents);
        # filled in by the create_*_context classmethods.
        self.request_interfaces = []

        self.client_credentials = client_credentials or CredentialsMap()
        self.server_credentials = server_credentials or CredentialsMap()

    #
    # convenience methods for class instantiation
    #

    async def _append_tokenmanaged_messagemanaged_transport(
        self, message_interface_constructor
    ):
        # Build the token manager / message manager / message interface stack
        # for a datagram-style transport and register its entry point.
        tman = TokenManager(self)
        mman = MessageManager(tman)
        transport = await message_interface_constructor(mman)

        mman.message_interface = transport
        tman.token_interface = mman

        self.request_interfaces.append(tman)

    async def _append_tokenmanaged_transport(self, token_interface_constructor):
        # Like _append_tokenmanaged_messagemanaged_transport, but for
        # transports that implement the token interface directly (eg. TCP).
        tman = TokenManager(self)
        transport = await token_interface_constructor(tman)

        tman.token_interface = transport

        self.request_interfaces.append(tman)

    @classmethod
    async def create_client_context(
        cls,
        *,
        loggername="coap",
        loop=None,
        transports: TransportParameters | None | dict | list[str] = None,
    ):
        """Create a context bound to all addresses on a random listening port.

        This is the easiest way to get a context suitable for sending client
        requests.

        :param TransportParameters|dict|list|None transports: A Configuration
          item that lists which transports are to be configured, and how they are
          to be bound. If left unset, a default choice of transports is
          initialized.

          For ease of use, a `dict` can be passed in, which gets :meth:`.load()
          <aiocoap.util.dataclass_data.LoadStoreClass.load>`ed, or a list can
          be passed in that merely lists the transports (equivalent to ``{k: {}
          for k in transports}``).

        :meta private:
            (not actually private, just hiding from automodule due to being
            grouped with the important functions)
        """

        if loop is None:
            loop = asyncio.get_running_loop()

        self = cls(loop=loop, serversite=None, loggername=loggername)

        selected_transports = TransportParameters._compat_create(transports)
        if selected_transports.is_server is None:
            selected_transports.is_server = False
        selected_transports._apply_defaults()

        self.log.debug(
            "Creating client context from transport configuration %r",
            selected_transports,
        )

        # FIXME make defaults overridable (postponed until they become configurable too)
        if selected_transports.oscore:
            from .transports.oscore import TransportOSCORE

            oscoretransport = TransportOSCORE(self, self)
            self.request_interfaces.append(oscoretransport)
        if selected_transports.slipmux:
            from .transports.slipmux import MessageInterfaceSlipmux

            await self._append_tokenmanaged_messagemanaged_transport(
                lambda mman: MessageInterfaceSlipmux.create_transport_endpoint(
                    selected_transports,
                    mman,
                    log=self.log,
                    loop=self.loop,
                )
            )
        if selected_transports.udp6:
            from .transports.udp6 import MessageInterfaceUDP6

            # This can probably be generalized into something like
            # _append_tokenmanaged_messagemanaged_transport, maybe already with
            # using an interface rather than a lambda (now that we
            # comprehensively pass in all relevant options through the transport parameters).
            async for mint in MessageInterfaceUDP6.prepare_transport_endpoints(
                params=selected_transports,
                log=self.log,
                loop=loop,
            ):
                tman = TokenManager(self)
                mman = MessageManager(tman)
                mint._ctx = mman
                mman.message_interface = mint
                tman.token_interface = mman
                self.request_interfaces.append(tman)
                await mint.start_transport_endpoint()
        if selected_transports.simple6:
            from .transports.simple6 import MessageInterfaceSimple6

            await self._append_tokenmanaged_messagemanaged_transport(
                lambda mman: MessageInterfaceSimple6.create_client_transport_endpoint(
                    mman, log=self.log, loop=loop
                )
            )
        if selected_transports.tinydtls:
            from .transports.tinydtls import MessageInterfaceTinyDTLS

            await self._append_tokenmanaged_messagemanaged_transport(
                lambda mman: MessageInterfaceTinyDTLS.create_client_transport_endpoint(
                    mman, log=self.log, loop=loop
                )
            )
        if selected_transports.tcpclient:
            from .transports.tcp import TCPClient

            await self._append_tokenmanaged_transport(
                lambda tman: TCPClient.create_client_transport(tman, self.log, loop)
            )
        if selected_transports.tlsclient:
            from .transports.tls import TLSClient

            await self._append_tokenmanaged_transport(
                lambda tman: TLSClient.create_client_transport(
                    tman, self.log, loop, self.client_credentials
                )
            )
        if selected_transports.ws:
            from .transports.ws import WSPool

            await self._append_tokenmanaged_transport(
                lambda tman: WSPool.create_transport(
                    tman, self.log, loop, client_credentials=self.client_credentials
                )
            )

        return self

    @classmethod
    async def create_server_context(
        cls,
        site,
        bind=None,
        *,
        loggername="coap-server",
        loop=None,
        _ssl_context=None,
        multicast=[],
        server_credentials=None,
        transports: TransportParameters | None | dict | list[str] = None,
    ):
        """Create a context, bound to all addresses on the CoAP port (unless
        otherwise specified in the ``bind`` argument).

        This is the easiest way to get a context suitable both for sending
        client and accepting server requests.

        The ``bind`` argument, if given, needs to be a 2-tuple of IP address
        string and port number, where the port number can be None to use the default port.

        If ``multicast`` is given, it needs to be a list of (multicast address,
        interface name) tuples, which will all be joined. (The IPv4 style of
        selecting the interface by a local address is not supported; users may
        want to use the netifaces package to arrive at an interface name for an
        address).

        As a shortcut, the list may also contain interface names alone. Those
        will be joined for the 'all CoAP nodes' groups of IPv4 and IPv6 (with
        scopes 2 and 5) as well as the respective 'all nodes' groups in IPv6.

        Under some circumstances you may already need a context to pass into
        the site for creation; this is typically the case for servers that
        trigger requests on their own. For those cases, it is usually easiest
        to pass None in as a site, and set the fully constructed site later by
        assigning to the ``serversite`` attribute.

        :param TransportParameters|dict|list|None transports: A Configuration
          item that lists which transports are to be configured, and how they are
          to be bound. If left unset, a default choice of transports is
          initialized.

          For ease of use, a `dict` can be passed in, which gets :meth:`.load()
          <aiocoap.util.dataclass_data.LoadStoreClass.load>`ed, or a list can
          be passed in that merely lists the transports (equivalent to ``{k: {}
          for k in transports}``).

        :meta private:
            (not actually private, just hiding from automodule due to being
            grouped with the important functions)
        """

        if loop is None:
            loop = asyncio.get_running_loop()

        self = cls(
            loop=loop,
            serversite=site,
            loggername=loggername,
            server_credentials=server_credentials,
        )

        multicast_done = not multicast

        selected_transports = TransportParameters._compat_create(transports)
        if selected_transports.is_server is None:
            selected_transports.is_server = True
        selected_transports._legacy_bind = bind
        selected_transports._legacy_multicast = multicast
        selected_transports._apply_defaults()

        self.log.debug(
            "Creating server context from transport configuration %r",
            selected_transports,
        )

        if selected_transports.oscore:
            from .transports.oscore import TransportOSCORE

            oscoretransport = TransportOSCORE(self, self)
            self.request_interfaces.append(oscoretransport)
        if selected_transports.slipmux:
            from .transports.slipmux import MessageInterfaceSlipmux

            await self._append_tokenmanaged_messagemanaged_transport(
                lambda mman: MessageInterfaceSlipmux.create_transport_endpoint(
                    selected_transports,
                    mman,
                    log=self.log,
                    loop=self.loop,
                )
            )
        # FIXME this is duplicated from the client version, as it uses the transport parameters
        if selected_transports.udp6:
            from .transports.udp6 import MessageInterfaceUDP6

            # This can probably be generalized into something like
            # _append_tokenmanaged_messagemanaged_transport, maybe already with
            # using an interface rather than a lambda (now that we
            # comprehensively pass in all relevant options through the
            # transport parameters).
            async for mint in MessageInterfaceUDP6.prepare_transport_endpoints(
                params=selected_transports,
                log=self.log,
                loop=loop,
            ):
                tman = TokenManager(self)
                mman = MessageManager(tman)
                mint._ctx = mman
                mman.message_interface = mint
                tman.token_interface = mman
                self.request_interfaces.append(tman)
                await mint.start_transport_endpoint()
            multicast_done = True
        # FIXME this is duplicated from the client version, as those are client-only anyway
        if selected_transports.simple6:
            from .transports.simple6 import MessageInterfaceSimple6

            await self._append_tokenmanaged_messagemanaged_transport(
                lambda mman: MessageInterfaceSimple6.create_client_transport_endpoint(
                    mman, log=self.log, loop=loop
                )
            )
        # Independent `if` (not `elif`): simple6 and tinydtls are separate
        # client-side transports that may be enabled at the same time, exactly
        # as in create_client_context.
        if selected_transports.tinydtls:
            from .transports.tinydtls import MessageInterfaceTinyDTLS

            await self._append_tokenmanaged_messagemanaged_transport(
                lambda mman: MessageInterfaceTinyDTLS.create_client_transport_endpoint(
                    mman, log=self.log, loop=loop
                )
            )
        # FIXME end duplication
        if selected_transports.tinydtls_server:
            from .transports.tinydtls_server import MessageInterfaceTinyDTLSServer

            await self._append_tokenmanaged_messagemanaged_transport(
                lambda mman: MessageInterfaceTinyDTLSServer.create_server(
                    bind,
                    mman,
                    log=self.log,
                    loop=loop,
                    server_credentials=self.server_credentials,
                )
            )
        if selected_transports.simplesocketserver:
            from .transports.simplesocketserver import MessageInterfaceSimpleServer

            await self._append_tokenmanaged_messagemanaged_transport(
                lambda mman: MessageInterfaceSimpleServer.create_server(
                    bind, mman, log=self.log, loop=loop
                )
            )
        if selected_transports.tcpserver:
            from .transports.tcp import TCPServer

            await self._append_tokenmanaged_transport(
                lambda tman: TCPServer.create_server(bind, tman, self.log, loop)
            )
        if selected_transports.tcpclient:
            from .transports.tcp import TCPClient

            await self._append_tokenmanaged_transport(
                lambda tman: TCPClient.create_client_transport(tman, self.log, loop)
            )
        if selected_transports.tlsserver:
            if _ssl_context is not None:
                from .transports.tls import TLSServer

                await self._append_tokenmanaged_transport(
                    lambda tman: TLSServer.create_server(
                        bind, tman, self.log, loop, _ssl_context
                    )
                )
            else:
                # Could also be a warning, but at least as of now, TLS often
                # enabled implicitly, and turning this into a warning is
                # excessive as long as we don't have a recommentation about how
                # users should acknowledge that they don't need TLS anyway.
                self.log.info("Not opening TLS server: No server certificates present")
        if selected_transports.tlsclient:
            from .transports.tls import TLSClient

            await self._append_tokenmanaged_transport(
                lambda tman: TLSClient.create_client_transport(
                    tman, self.log, loop, self.client_credentials
                )
            )
        if selected_transports.ws:
            from .transports.ws import WSPool

            await self._append_tokenmanaged_transport(
                # None, None: Unlike the other transports this has a server/client generic creator, and only binds if there is some bind
                lambda tman: WSPool.create_transport(
                    tman,
                    self.log,
                    loop,
                    client_credentials=self.client_credentials,
                    server_bind=bind or (None, None),
                    server_context=_ssl_context,
                )
            )

        if not multicast_done:
            self.log.warning(
                "Multicast was requested, but no multicast capable transport was selected."
            )

        # This is used in tests to wait for externally launched servers to be ready
        self.log.debug("Server ready to receive requests")

        return self

    async def shutdown(self):
        """Take down any listening sockets and stop all related timers.

        After this coroutine terminates, and once all external references to
        the object are dropped, it should be garbage-collectable.

        This method takes up to
        :const:`aiocoap.numbers.constants.SHUTDOWN_TIMEOUT` seconds, allowing
        transports to perform any cleanup implemented in them (such as orderly
        connection shutdown and cancelling observations, where the latter is
        currently not implemented).

        :meta private:
            (not actually private, just hiding from automodule due to being
            grouped with the important functions)
        """

        self.log.debug("Shutting down context")

        done, pending = await asyncio.wait(
            [
                asyncio.create_task(
                    ri.shutdown(),
                    name="Shutdown of %r" % ri,
                )
                for ri in self.request_interfaces
            ],
            timeout=SHUTDOWN_TIMEOUT,
        )
        # Awaiting the finished tasks re-raises any exception a transport's
        # shutdown produced instead of letting it pass silently.
        for item in done:
            await item
        if pending:
            # Apart from being useful to see, this also ensures that developers
            # see the error in the logs during test suite runs -- and the error
            # should be easier to follow than the "we didn't garbage collect
            # everything" errors we see anyway (or otherwise, if the error is
            # escalated into a test failure)
            self.log.error(
                "Shutdown timeout exceeded, returning anyway. Interfaces still busy: %s",
                pending,
            )

    # FIXME: determine how official this should be, or which part of it is
    # public -- now that BlockwiseRequest uses it. (And formalize what can
    # change about messages and what can't after the remote has been thusly
    # populated).
    async def find_remote_and_interface(self, message):
        if message.remote is None:
            raise error.MissingRemoteError()
        for ri in self.request_interfaces:
            if await ri.fill_or_recognize_remote(message):
                return ri
        raise error.NoRequestInterface()

    def request(self, request_message, handle_blockwise=True):
        if handle_blockwise:
            return BlockwiseRequest(self, request_message)

        pipe = Pipe(request_message, self.log)
        # Request sets up callbacks at creation
        result = Request(pipe, self.loop, self.log)

        async def send():
            try:
                request_interface = await self.find_remote_and_interface(
                    request_message
                )
                request_interface.request(pipe)
            except Exception as e:
                pipe.add_exception(e)
                return

        self.loop.create_task(
            send(),
            name="Request processing of %r" % result,
        )
        return result

    # the following are under consideration for moving into Site or something
    # mixed into it

    def render_to_pipe(self, pipe):
        """Fill a pipe by running the site's render_to_pipe interface and
        handling errors."""

        pr_that_can_receive_errors = error_to_message(pipe, self.log)

        run_driving_pipe(
            pr_that_can_receive_errors,
            self._render_to_pipe(pipe),
            name="Rendering for %r" % pipe.request,
        )

    async def _render_to_pipe(self, pipe):
        # Without a site, every request is answered 4.04; this keeps pure
        # client contexts from crashing on stray incoming requests.
        if self.serversite is None:
            pipe.add_response(
                Message(code=NOT_FOUND, payload=b"not a server"), is_last=True
            )
            return

        return await self.serversite.render_to_pipe(pipe)
class BaseRequest:
    """Common mechanisms of :class:`Request` and :class:`MulticastRequest`"""

    # Intentionally empty: serves as the shared root type of the request
    # class hierarchy.
class BaseUnicastRequest(BaseRequest):
    """Helper base class that offers the :attr:`response_raising` and
    :attr:`response_nonraising` alternatives to awaiting the plain
    :attr:`response` future: its error states can be surfaced either as an
    unsuccessful response (eg. 4.04) or as an exception.

    It also provides some internal tools for handling anything that has a
    :attr:`response` future and an :attr:`observation`"""

    @property
    async def response_raising(self):
        """An awaitable that produces the response only when it comes in and
        is successful; unsuccessful responses are raised as
        :class:`.error.ResponseWrappingError`, and network failures surface as
        generic exceptions.

        Experimental Interface."""

        msg = await self.response
        if msg.code.is_successful():
            return msg
        raise error.ResponseWrappingError(msg)

    @property
    async def response_nonraising(self):
        """An awaitable that never raises: instead of an exception, a
        fabricated 500ish message (as a proxy would return) is produced.

        Experimental Interface."""

        # FIXME: Can we smuggle error_to_message into the underlying pipe?
        # That should make observe notifications into messages rather
        # than exceptions as well, plus it has fallbacks for `e.to_message()`
        # raising.

        try:
            return await self.response
        except error.RenderableError as renderable:
            return renderable.to_message()
        except Exception:
            return Message(code=INTERNAL_SERVER_ERROR)
class Request(interfaces.Request, BaseUnicastRequest):
    """Tracker for a single (possibly observed) request sent through a pipe.

    Responses are consumed from the pipe by a generator-based state machine
    (:meth:`_run`) that resolves the :attr:`response` future with the first
    response and, if an observation was requested, feeds later notifications
    into :attr:`observation`."""

    def __init__(self, pipe, loop, log):
        # pipe: the Pipe carrying this request's events; loop: asyncio loop
        # used for the response future; log: logger for protocol anomalies.
        self._pipe = pipe

        self.response = loop.create_future()

        # Only an explicit Observe:0 option asks for an observation; any other
        # value (or absence) means a one-shot request.
        if pipe.request.opt.observe == 0:
            self.observation = ClientObservation()
        else:
            self.observation = None

        # Prime the generator so it is parked at its first `yield`, ready to
        # receive pipe events via send().
        self._runner = self._run()
        self._runner.send(None)

        def process(event):
            try:
                # would be great to have self or the runner as weak ref, but
                # see ClientObservation.register_callback comments -- while
                # that is around, we can't weakref here.
                self._runner.send(event)
                return True
            except StopIteration:
                # Generator finished: tell the pipe we take no more events.
                return False

        self._stop_interest = self._pipe.on_event(process)

        self.log = log

        self.response.add_done_callback(self._response_cancellation_handler)

    def _response_cancellation_handler(self, response):
        # Propagate cancellation to the runner (if interest in the first
        # response is lost, there won't be observation items to pull out), but
        # not general completion (because if it's completed and not cancelled,
        # eg. when an observation is active)
        if self.response.cancelled() and self._runner is not None:
            # Dropping the only reference makes it stop with GeneratorExit,
            # similar to a cancelled task
            self._runner = None
            self._stop_interest()
        # Otherwise, there will be a runner still around, and it's its task to
        # call _stop_interest.

    @staticmethod
    def _add_response_properties(response, request):
        # Attach the originating request so users can correlate responses.
        response.request = request

    def _run(self):
        """Generator state machine driven by pipe events via send().

        Yields True while more events are welcome; returning (StopIteration)
        signals that interest has ended."""
        # FIXME: This is in iterator form because it used to be a task that
        # awaited futures, and that code could be easily converted to an
        # iterator. I'm not sure that's a bad state here, but at least it
        # should be a more conscious decision to make this an iterator rather
        # than just having it happen to be one.
        #
        # FIXME: check that responses come from the same remmote as long as we're assuming unicast

        first_event = yield None

        if first_event.message is not None:
            self._add_response_properties(first_event.message, self._pipe.request)
            self.response.set_result(first_event.message)
        else:
            self.response.set_exception(first_event.exception)
            if not isinstance(first_event.exception, error.Error):
                self.log.warning(
                    "An exception that is not an aiocoap Error was raised "
                    "from a transport; please report this as a bug in "
                    "aiocoap: %r",
                    first_event.exception,
                )

        if self.observation is None:
            if not first_event.is_last:
                self.log.error(
                    "Pipe indicated more possible responses"
                    " while the Request handler would not know what to"
                    " do with them, stopping any further request."
                )
                self._stop_interest()
            return

        if first_event.is_last:
            self.observation.error(error.NotObservable())
            return

        if first_event.message.opt.observe is None:
            self.log.error(
                "Pipe indicated more possible responses"
                " while the Request handler would not know what to"
                " do with them, stopping any further request."
            )
            self._stop_interest()
            return

        # variable names from RFC7641 Section 3.4
        v1 = first_event.message.opt.observe
        t1 = time.time()

        while True:
            # We don't really support cancellation of observations yet (see
            # https://github.com/chrysn/aiocoap/issues/92), but at least
            # stopping the interest is a way to free the local resources after
            # the first observation update, and to make the MID handler RST the
            # observation on the next.
            # FIXME: there *is* now a .on_cancel callback, we should at least
            # hook into that, and possibly even send a proper cancellation
            # then.
            next_event = yield True
            if self.observation.cancelled:
                self._stop_interest()
                return

            if next_event.exception is not None:
                self.observation.error(next_event.exception)
                if not next_event.is_last:
                    self._stop_interest()
                if not isinstance(next_event.exception, error.Error):
                    self.log.warning(
                        "An exception that is not an aiocoap Error was "
                        "raised from a transport during an observation; "
                        "please report this as a bug in aiocoap: %r",
                        next_event.exception,
                    )
                return

            self._add_response_properties(next_event.message, self._pipe.request)

            if next_event.message.opt.observe is not None:
                # check for reordering
                v2 = next_event.message.opt.observe
                t2 = time.time()

                # Freshness check per RFC7641 Section 3.4: either the sequence
                # number advanced (mod 2**24 with a 2**23 window), or enough
                # wall-clock time passed to reset the comparison.
                is_recent = (
                    (v1 < v2 and v2 - v1 < 2**23)
                    or (v1 > v2 and v1 - v2 > 2**23)
                    or (
                        t2
                        > t1
                        + self._pipe.request.transport_tuning.OBSERVATION_RESET_TIME
                    )
                )
                if is_recent:
                    t1 = t2
                    v1 = v2
            else:
                # the terminal message is always the last
                is_recent = True

            if is_recent:
                self.observation.callback(next_event.message)

            if next_event.is_last:
                self.observation.error(error.ObservationCancelled())
                return

            if next_event.message.opt.observe is None:
                self.observation.error(error.ObservationCancelled())
                self.log.error(
                    "Pipe indicated more possible responses"
                    " while the Request handler would not know what to"
                    " do with them, stopping any further request."
                )
                self._stop_interest()
                return
812class BlockwiseRequest(BaseUnicastRequest, interfaces.Request):
813 def __init__(self, protocol, app_request):
814 self.protocol = protocol
815 self.log = self.protocol.log.getChild("blockwise-requester")
817 self.response = protocol.loop.create_future()
819 if app_request.opt.observe is not None:
820 self.observation = ClientObservation()
821 else:
822 self.observation = None
824 self._runner = protocol.loop.create_task(
825 self._run_outer(
826 app_request,
827 self.response,
828 weakref.ref(self.observation)
829 if self.observation is not None
830 else lambda: None,
831 self.protocol,
832 self.log,
833 ),
834 name="Blockwise runner for %r" % app_request,
835 )
836 self.response.add_done_callback(self._response_cancellation_handler)
838 def _response_cancellation_handler(self, response_future):
839 # see Request._response_cancellation_handler
840 if self.response.cancelled():
841 self._runner.cancel()
843 @classmethod
844 async def _run_outer(cls, app_request, response, weak_observation, protocol, log):
845 try:
846 await cls._run(app_request, response, weak_observation, protocol, log)
847 except asyncio.CancelledError:
848 pass # results already set
849 except Exception as e:
850 logged = False
851 if not response.done():
852 logged = True
853 response.set_exception(e)
854 obs = weak_observation()
855 if app_request.opt.observe is not None and obs is not None:
856 logged = True
857 obs.error(e)
858 if not logged:
859 # should be unreachable
860 log.error(
861 "Exception in BlockwiseRequest runner neither went to response nor to observation: %s",
862 e,
863 exc_info=e,
864 )
866 # This is a class method because that allows self and self.observation to
867 # be freed even when this task is running, and the task to stop itself --
868 # otherwise we couldn't know when users just "forget" about a request
869 # object after using its response (esp. in observe cases) and leave this
870 # task running.
871 @classmethod
872 async def _run(cls, app_request, response, weak_observation, protocol, log):
873 # we need to populate the remote right away, because the choice of
874 # blocks depends on it.
875 await protocol.find_remote_and_interface(app_request)
877 size_exp = app_request.remote.maximum_block_size_exp
879 if app_request.opt.block1 is not None:
880 warnings.warn(
881 "Setting a block1 option in a managed block-wise transfer is deprecated. Instead, set request.remote.maximum_block_size_exp to the desired value",
882 DeprecationWarning,
883 stacklevel=2,
884 )
885 assert app_request.opt.block1.block_number == 0, (
886 "Unexpected block number in app_request"
887 )
888 assert not app_request.opt.block1.more, (
889 "Unexpected more-flag in app_request"
890 )
891 # this is where the library user can traditionally pass in size
892 # exponent hints into the library.
893 size_exp = app_request.opt.block1.size_exponent
895 # Offset in the message in blocks of size_exp. Whoever changes size_exp
896 # is responsible for updating this number.
897 block_cursor = 0
899 while True:
900 # ... send a chunk
902 if size_exp >= 6:
903 # FIXME from maximum_payload_size
904 fragmentation_threshold = app_request.remote.maximum_payload_size
905 else:
906 fragmentation_threshold = 2 ** (size_exp + 4)
908 if (
909 app_request.opt.block1 is not None
910 or len(app_request.payload) > fragmentation_threshold
911 ):
912 current_block1 = app_request._extract_block(
913 block_cursor, size_exp, app_request.remote.maximum_payload_size
914 )
915 if block_cursor == 0:
916 current_block1.opt.size1 = len(app_request.payload)
917 else:
918 current_block1 = app_request
920 blockrequest = protocol.request(current_block1, handle_blockwise=False)
921 blockresponse = await blockrequest.response
923 # store for future blocks to ensure that the next blocks will be
924 # sent from the same source address (in the UDP case; for many
925 # other transports it won't matter). carrying along locally set block size limitation
926 if (
927 app_request.remote.maximum_block_size_exp
928 < blockresponse.remote.maximum_block_size_exp
929 ):
930 blockresponse.remote.maximum_block_size_exp = (
931 app_request.remote.maximum_block_size_exp
932 )
933 app_request.remote = blockresponse.remote
935 if blockresponse.opt.block1 is None:
936 if blockresponse.code.is_successful() and current_block1.opt.block1:
937 log.warning(
938 "Block1 option completely ignored by server, assuming it knows what it is doing."
939 )
940 # FIXME: handle 4.13 and retry with the indicated size option
941 break
943 block1 = blockresponse.opt.block1
944 log.debug(
945 "Response with Block1 option received, number = %d, more = %d, size_exp = %d.",
946 block1.block_number,
947 block1.more,
948 block1.size_exponent,
949 )
951 if block1.block_number != current_block1.opt.block1.block_number:
952 raise error.UnexpectedBlock1Option("Block number mismatch")
954 if size_exp == 7:
955 block_cursor += len(current_block1.payload) // 1024
956 else:
957 block_cursor += 1
959 while block1.size_exponent < size_exp:
960 block_cursor *= 2
961 size_exp -= 1
963 if not current_block1.opt.block1.more:
964 if block1.more or blockresponse.code == CONTINUE:
965 # treating this as a protocol error -- letting it slip
966 # through would misrepresent the whole operation as an
967 # over-all 2.xx (successful) one.
968 raise error.UnexpectedBlock1Option(
969 "Server asked for more data at end of body"
970 )
971 break
973 # checks before preparing the next round:
975 if blockresponse.opt.observe:
976 # we're not *really* interested in that block, we just sent an
977 # observe option to indicate that we'll want to observe the
978 # resulting representation as a whole
979 log.warning(
980 "Server answered Observe in early Block1 phase, cancelling the erroneous observation."
981 )
982 blockrequest.observe.cancel()
984 if block1.more:
985 # FIXME i think my own server is dowing this wrong
986 # if response.code != CONTINUE:
987 # raise error.UnexpectedBlock1Option("more-flag set but no Continue")
988 pass
989 else:
990 if not blockresponse.code.is_successful():
991 break
992 else:
993 # ignoring (discarding) the successful intermediate result, waiting for a final one
994 continue
996 lower_observation = None
997 if app_request.opt.observe is not None:
998 if blockresponse.opt.observe is not None:
999 lower_observation = blockrequest.observation
1000 else:
1001 obs = weak_observation()
1002 if obs:
1003 obs.error(error.NotObservable())
1004 del obs
1006 assert blockresponse is not None, "Block1 loop broke without setting a response"
1007 blockresponse.opt.block1 = None
1009 # FIXME check with RFC7959: it just says "send requests similar to the
1010 # requests in the Block1 phase", what does that mean? using the last
1011 # block1 as a reference for now, especially because in the
1012 # only-one-request-block case, that's the original request we must send
1013 # again and again anyway
1014 assembled_response = await cls._complete_by_requesting_block2(
1015 protocol, current_block1, blockresponse, log
1016 )
1018 response.set_result(assembled_response)
1019 # finally set the result
1021 if lower_observation is not None:
1022 # FIXME this can all be simplified a lot since it's no more
1023 # expected that observations shut themselves down when GC'd.
1024 obs = weak_observation()
1025 del weak_observation
1026 if obs is None:
1027 lower_observation.cancel()
1028 return
1029 future_weak_observation = protocol.loop.create_future() # packing this up because its destroy callback needs to reference the subtask
1030 subtask = asyncio.create_task(
1031 cls._run_observation(
1032 app_request,
1033 lower_observation,
1034 future_weak_observation,
1035 protocol,
1036 log,
1037 ),
1038 name="Blockwise observation for %r" % app_request,
1039 )
1040 future_weak_observation.set_result(
1041 weakref.ref(obs, lambda obs: subtask.cancel())
1042 )
1043 obs.on_cancel(subtask.cancel)
1044 del obs
1045 await subtask
1047 @classmethod
1048 async def _run_observation(
1049 cls, original_request, lower_observation, future_weak_observation, protocol, log
1050 ):
1051 weak_observation = await future_weak_observation
1052 # we can use weak_observation() here at any time, because whenever that
1053 # becomes None, this task gets cancelled
1054 try:
1055 async for block1_notification in lower_observation:
1056 log.debug("Notification received")
1057 full_notification = await cls._complete_by_requesting_block2(
1058 protocol, original_request, block1_notification, log
1059 )
1060 log.debug("Reporting completed notification")
1061 weak_observation().callback(full_notification)
1062 # FIXME verify that this loop actually ends iff the observation
1063 # was cancelled -- otherwise find out the cause(s) or make it not
1064 # cancel under indistinguishable circumstances
1065 weak_observation().error(error.ObservationCancelled())
1066 except asyncio.CancelledError:
1067 return
1068 except Exception as e:
1069 weak_observation().error(e)
1070 finally:
1071 # We generally avoid idempotent cancellation, but we may have
1072 # reached this point either due to an earlier cancellation or
1073 # without one
1074 if not lower_observation.cancelled:
1075 lower_observation.cancel()
1077 @classmethod
1078 async def _complete_by_requesting_block2(
1079 cls, protocol, request_to_repeat, initial_response, log
1080 ):
1081 # FIXME this can probably be deduplicated against BlockwiseRequest
1083 if (
1084 initial_response.opt.block2 is None
1085 or initial_response.opt.block2.more is False
1086 ):
1087 initial_response.opt.block2 = None
1088 return initial_response
1090 if initial_response.opt.block2.block_number != 0:
1091 log.error("Error assembling blockwise response (expected first block)")
1092 raise error.UnexpectedBlock2()
1094 assembled_response = initial_response
1095 last_response = initial_response
1096 while True:
1097 current_block2 = request_to_repeat._generate_next_block2_request(
1098 assembled_response
1099 )
1101 current_block2 = current_block2.copy(remote=initial_response.remote)
1103 blockrequest = protocol.request(current_block2, handle_blockwise=False)
1104 last_response = await blockrequest.response
1106 if last_response.opt.block2 is None:
1107 log.warning(
1108 "Server sent non-blockwise response after having started a blockwise transfer. Blockwise transfer cancelled, accepting single response."
1109 )
1110 return last_response
1112 block2 = last_response.opt.block2
1113 log.debug(
1114 "Response with Block2 option received, number = %d, more = %d, size_exp = %d.",
1115 block2.block_number,
1116 block2.more,
1117 block2.size_exponent,
1118 )
1119 try:
1120 assembled_response._append_response_block(last_response)
1121 except error.Error as e:
1122 log.error("Error assembling blockwise response, passing on error %r", e)
1123 raise
1125 if block2.more is False:
1126 return assembled_response
class ClientObservation:
    """An interface to observe notification updates arriving on a request.

    This class does not actually provide any of the observe functionality, it
    is purely a container for dispatching the messages via asynchronous
    iteration. It gets driven (ie. populated with responses or errors including
    observation termination) by a Request object.
    """

    def __init__(self):
        # Listener lists for the deprecated push-style interface; both are
        # set to None in cancel() so later misuse fails loudly.
        self.callbacks = []
        self.errbacks = []

        self.cancelled = False
        # Callbacks run exactly once (LIFO) when cancel() is invoked.
        self._on_cancel = []

        # Replayed to callbacks that register after the first notification.
        self._latest_response = None
        # the analogous error is stored in _cancellation_reason when cancelled.

    def __aiter__(self):
        """`async for` interface to observations.

        This is the preferred interface to obtaining observations."""
        it = self._Iterator()
        self.register_callback(it.push, _suppress_deprecation=True)
        self.register_errback(it.push_err, _suppress_deprecation=True)
        return it

    class _Iterator:
        """Adapter that feeds pushed responses/errors into `async for`."""

        def __init__(self):
            # Holds the next value (or exception) to be produced by __anext__.
            self._future = asyncio.get_running_loop().create_future()

        def push(self, item):
            """Store an incoming response for the consumer."""
            if self._future.done():
                # we don't care whether we overwrite anything, this is a lossy queue as observe is lossy
                self._future = asyncio.get_running_loop().create_future()
            self._future.set_result(item)

        def push_err(self, e):
            """Store an incoming error for the consumer."""
            if self._future.done():
                self._future = asyncio.get_running_loop().create_future()
            self._future.set_exception(e)

        async def __anext__(self):
            f = self._future
            try:
                result = await self._future
                # FIXME see `await servobs._trigger` comment: might waiting for
                # the original future not yield the first future's result when
                # a quick second future comes in in a push?
                if f is self._future:
                    self._future = asyncio.get_running_loop().create_future()
                return result
            except (error.NotObservable, error.ObservationCancelled):
                # only exit cleanly when the server -- right away or later --
                # states that the resource is not observable any more
                # FIXME: check whether an unsuccessful message is still passed
                # as an observation result (or whether it should be)
                raise StopAsyncIteration

        def __del__(self):
            if self._future.done():
                try:
                    # Fetch the result so any errors show up at least in the
                    # finalizer output
                    self._future.result()
                except (error.ObservationCancelled, error.NotObservable):
                    # This is the case at the end of an observation cancelled
                    # by the server.
                    pass
                except error.NetworkError:
                    # This will already have shown up in the main result too.
                    pass
                except (error.LibraryShutdown, asyncio.CancelledError):
                    pass
                # Anything else flying out of this is unexpected and probably a
                # library error

    # When this function is removed, we can finally do cleanup better. Right
    # now, someone could register a callback that doesn't hold any references,
    # so we can't just stop the request when nobody holds a reference to this
    # any more. Once we're all in pull mode, we can make the `process` function
    # that sends data in here use a weak reference (because any possible
    # recipient would need to hold a reference to self or the iterator, and
    # thus _run).
    def register_callback(self, callback, _suppress_deprecation=False):
        """Call the callback whenever a response to the message comes in, and
        pass the response to it.

        The use of this function is deprecated: Use the asynchronous iteration
        interface instead."""
        if not _suppress_deprecation:
            warnings.warn(
                "register_callback on observe results is deprecated: Use `async for notify in request.observation` instead.",
                DeprecationWarning,
                stacklevel=2,
            )
        if self.cancelled:
            return

        self.callbacks.append(callback)
        # Late registrants immediately see the current representation.
        if self._latest_response is not None:
            callback(self._latest_response)

    def register_errback(self, callback, _suppress_deprecation=False):
        """Call the callback whenever something goes wrong with the
        observation, and pass an exception to the callback. After such a
        callback is called, no more callbacks will be issued.

        The use of this function is deprecated: Use the asynchronous iteration
        interface instead."""
        if not _suppress_deprecation:
            warnings.warn(
                "register_errback on observe results is deprecated: Use `async for notify in request.observation` instead.",
                DeprecationWarning,
                stacklevel=2,
            )
        if self.cancelled:
            # Replay the terminating error to late registrants.
            callback(self._cancellation_reason)
            return
        self.errbacks.append(callback)

    def callback(self, response):
        """Notify all listeners of an incoming response"""

        self._latest_response = response

        for c in self.callbacks:
            c(response)

    def error(self, exception):
        """Notify registered listeners that the observation went wrong. This
        can only be called once."""

        if self.errbacks is None:
            raise RuntimeError(
                "Error raised in an already cancelled ClientObservation"
            ) from exception
        for c in self.errbacks:
            c(exception)

        self.cancel()
        # Must come after cancel(), which resets the reason to None.
        self._cancellation_reason = exception

    def cancel(self):
        # FIXME determine whether this is called by anything other than error,
        # and make it private so there is always a _cancellation_reason
        """Cease to generate observation or error events. This will not
        generate an error by itself.

        This function is only needed while register_callback and
        register_errback are around; once their deprecations are acted on,
        dropping the asynchronous iterator will automatically cancel the
        observation.
        """

        assert not self.cancelled, "ClientObservation cancelled twice"

        # make sure things go wrong when someone tries to continue this
        self.errbacks = None
        self.callbacks = None

        self.cancelled = True

        while self._on_cancel:
            self._on_cancel.pop()()

        self._cancellation_reason = None

    def on_cancel(self, callback):
        """Run ``callback`` once when the observation is cancelled; if it
        already is, run it right away."""
        if self.cancelled:
            # Fixed: previously the callback was invoked *and* appended to
            # _on_cancel. Since cancel() can not run a second time (see the
            # assertion there), the appended entry could never be drained
            # again and just leaked a strong reference to the callback.
            callback()
            return
        self._on_cancel.append(callback)

    def __repr__(self):
        return "<%s %s at %#x>" % (
            type(self).__name__,
            "(cancelled)"
            if self.cancelled
            else "(%s call-, %s errback(s))"
            % (len(self.callbacks), len(self.errbacks)),
            id(self),
        )
1313class ServerObservation:
1314 def __init__(self):
1315 self._accepted = False
1316 self._trigger = asyncio.get_running_loop().create_future()
1317 # A deregistration is "early" if it happens before the response message
1318 # is actually sent; calling deregister() in that time (typically during
1319 # `render()`) will not send an unsuccessful response message but just
1320 # sent this flag which is set to None as soon as it is too late for an
1321 # early deregistration.
1322 # This mechanism is temporary until more of aiocoap behaves like
1323 # Pipe which does not suffer from this limitation.
1324 self._early_deregister = False
1325 self._late_deregister = False
1327 def accept(self, cancellation_callback):
1328 self._accepted = True
1329 self._cancellation_callback = cancellation_callback
1331 def deregister(self, reason=None):
1332 if self._early_deregister is False:
1333 self._early_deregister = True
1334 return
1336 warnings.warn(
1337 "Late use of ServerObservation.deregister() is"
1338 " deprecated, use .trigger with an unsuccessful value"
1339 " instead",
1340 DeprecationWarning,
1341 )
1342 self.trigger(
1343 Message(code=INTERNAL_SERVER_ERROR, payload=b"Resource became unobservable")
1344 )
1346 def trigger(self, response=None, *, is_last=False):
1347 """Send an updated response; if None is given, the observed resource's
1348 rendering will be invoked to produce one.
1350 `is_last` can be set to True to indicate that no more responses will be
1351 sent. Note that an unsuccessful response will be the last no matter
1352 what is_last says, as such a message always terminates a CoAP
1353 observation."""
1354 if is_last:
1355 self._late_deregister = True
1356 if self._trigger.done():
1357 # we don't care whether we overwrite anything, this is a lossy queue as observe is lossy
1358 self._trigger = asyncio.get_running_loop().create_future()
1359 self._trigger.set_result(response)