Coverage for aiocoap/protocol.py: 88%
495 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-05 18:37 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-05 18:37 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5"""This module contains the classes that are responsible for keeping track of
6messages:
8* :class:`Context` roughly represents the CoAP endpoint (basically a UDP
9 socket) -- something that can send requests and possibly can answer
10 incoming requests.
12 Incoming requests are processed in tasks created by the context.
14* a :class:`Request` gets generated whenever a request gets sent to keep
15 track of the response
17Logging
18~~~~~~~
20Several constructors of the Context accept a logger name; these names go into
21the construction of a Python logger.
23Log events will be emitted to these on different levels, with "warning" and
24above being a practical default for things that may warrant reviewing by
25an operator:
27* DEBUG is used for things that occur even under perfect conditions.
28* INFO is for things that are well expected, but might be interesting during
29 testing a network of nodes and not just when debugging the library. (This
30 includes timeouts, retransmissions, and pings.)
31* WARNING is for everything that indicates a malbehaved peer. These don't
32 *necessarily* indicate a client bug, though: Things like requesting a
33 nonexistent block can just as well happen when a resource's content has
34 changed between blocks. The library will not go out of its way to determine
35 whether there is a plausible explanation for the odd behavior, and will
36 report something as a warning in case of doubt.
37* ERROR is used when something clearly went wrong. This includes irregular
38 connection terminations and resource handler errors (which are demoted to
39 error responses), and can often contain a backtrace.
41Logs will generally reveal messages exchanged between this and other systems,
42and attackers can observe their encrypted counterparts. Private or shared keys
43are only logged through an internal `log_secret` function, which usually
44replaces them with a redacted value. Setting the ``AIOCOAP_REVEAL_KEYS``
45environment variable to the value ``show secrets in logs`` bypasses that
46mechanism. As an additional precaution, this is only accepted if the effective
47user has write access to the aiocoap source code.
48"""
50import asyncio
51import weakref
52import time
53from typing import Optional, List
55from . import defaults
56from .credentials import CredentialsMap
57from .message import Message
58from .messagemanager import MessageManager
59from .tokenmanager import TokenManager
60from .pipe import Pipe, run_driving_pipe, error_to_message
61from . import interfaces
62from . import error
63from .numbers import INTERNAL_SERVER_ERROR, NOT_FOUND, CONTINUE, SHUTDOWN_TIMEOUT
65import warnings
66import logging
69class Context(interfaces.RequestProvider):
70 """Applications' entry point to the network
72 A :class:`.Context` coordinates one or more network :mod:`.transports`
73 implementations and dispatches data between them and the application.
75 The application can start requests using the message dispatch methods, and
76 set a :class:`resources.Site` that will answer requests directed to the
77 application as a server.
79 On the library-internals side, it is the prime implementation of the
80 :class:`interfaces.RequestProvider` interface, creates :class:`Request` and
81 :class:`Response` classes on demand, and decides which transport
82 implementations to start and which are to handle which messages.
84 **Context creation and destruction**
86 The following functions are provided for creating and stopping a context:
88 .. note::
90 A typical application should only ever create one context, even (or
91 especially when) it acts both as a server and as a client (in which
92 case a server context should be created).
94 A context that is not used any more must be shut down using
95 :meth:`.shutdown()`, but typical applications will not need to because
96 they use the context for the full process lifetime.
98 .. automethod:: create_client_context
99 .. automethod:: create_server_context
101 .. automethod:: shutdown
103 **Dispatching messages**
105 CoAP requests can be sent using the following functions:
107 .. automethod:: request
109 If more control is needed, you can create a :class:`Request` yourself and
110 pass the context to it.
113 **Other methods and properties**
115 The remaining methods and properties are to be considered unstable even
116 when the project reaches a stable version number; please file a feature
117 request for stabilization if you want to reliably access any of them.
118 """
def __init__(
    self,
    loop=None,
    serversite=None,
    loggername="coap",
    client_credentials=None,
    server_credentials=None,
):
    """Set up an empty context without any transports.

    :param loop: event loop to use; a falsy value selects
        ``asyncio.get_event_loop()``.
    :param serversite: site that answers incoming requests (may be None).
    :param loggername: name handed to ``logging.getLogger``.
    :param client_credentials: credentials used for outgoing requests; a
        falsy value is replaced by a fresh ``CredentialsMap``.
    :param server_credentials: credentials used when acting as a server; a
        falsy value is replaced by a fresh ``CredentialsMap``.
    """
    self.log = logging.getLogger(loggername)

    if not loop:
        loop = asyncio.get_event_loop()
    self.loop = loop

    self.serversite = serversite

    # Filled by the _append_*_transport helpers and by the create_*_context
    # constructors.
    self.request_interfaces = []

    if not client_credentials:
        client_credentials = CredentialsMap()
    self.client_credentials = client_credentials

    if not server_credentials:
        server_credentials = CredentialsMap()
    self.server_credentials = server_credentials
139 #
140 # convenience methods for class instantiation
141 #
async def _append_tokenmanaged_messagemanaged_transport(
    self, message_interface_constructor
):
    """Register a message-based transport wrapped in message and token managers.

    ``message_interface_constructor`` is called with the new
    :class:`MessageManager` and awaited; its result becomes that manager's
    ``message_interface``. The :class:`TokenManager` on top of the stack is
    what gets appended to ``request_interfaces``.
    """
    token_manager = TokenManager(self)
    message_manager = MessageManager(token_manager)

    # The layers are only cross-linked once the transport was created
    # successfully.
    message_manager.message_interface = await message_interface_constructor(
        message_manager
    )
    token_manager.token_interface = message_manager

    self.request_interfaces.append(token_manager)
async def _append_tokenmanaged_transport(self, token_interface_constructor):
    """Register a token-based transport wrapped in a token manager.

    ``token_interface_constructor`` is called with the new
    :class:`TokenManager` and awaited; its result becomes the manager's
    ``token_interface``, and the manager is appended to
    ``request_interfaces``.
    """
    token_manager = TokenManager(self)
    token_manager.token_interface = await token_interface_constructor(token_manager)

    self.request_interfaces.append(token_manager)
@classmethod
async def create_client_context(
    cls, *, loggername="coap", loop=None, transports: Optional[List[str]] = None
):
    """Create a context bound to all addresses on a random listening port.

    This is the easiest way to get a context suitable for sending client
    requests.

    If ``transports`` is not given (or empty), the list obtained from
    ``defaults.get_default_clienttransports`` is used. An unknown transport
    name raises :class:`RuntimeError`.

    :meta private:
        (not actually private, just hiding from automodule due to being
        grouped with the important functions)
    """
    loop = asyncio.get_event_loop() if loop is None else loop

    context = cls(loop=loop, serversite=None, loggername=loggername)

    names = transports or defaults.get_default_clienttransports(loop=loop)

    # Shorthands for the two ways a transport can be stacked into the context.
    add_message_transport = context._append_tokenmanaged_messagemanaged_transport
    add_token_transport = context._append_tokenmanaged_transport

    # Transport modules are imported lazily so that only the selected ones
    # need to be importable.
    # FIXME make defaults overridable (postponed until they become configurable too)
    for name in names:
        if name == "udp6":
            from .transports.udp6 import MessageInterfaceUDP6

            await add_message_transport(
                lambda manager: MessageInterfaceUDP6.create_client_transport_endpoint(
                    manager, log=context.log, loop=loop
                )
            )
        elif name == "simple6":
            from .transports.simple6 import MessageInterfaceSimple6

            await add_message_transport(
                lambda manager: MessageInterfaceSimple6.create_client_transport_endpoint(
                    manager, log=context.log, loop=loop
                )
            )
        elif name == "tinydtls":
            from .transports.tinydtls import MessageInterfaceTinyDTLS

            await add_message_transport(
                lambda manager: MessageInterfaceTinyDTLS.create_client_transport_endpoint(
                    manager, log=context.log, loop=loop
                )
            )
        elif name == "tcpclient":
            from .transports.tcp import TCPClient

            await add_token_transport(
                lambda manager: TCPClient.create_client_transport(
                    manager, context.log, loop
                )
            )
        elif name == "tlsclient":
            from .transports.tls import TLSClient

            await add_token_transport(
                lambda manager: TLSClient.create_client_transport(
                    manager, context.log, loop, context.client_credentials
                )
            )
        elif name == "ws":
            from .transports.ws import WSPool

            await add_token_transport(
                lambda manager: WSPool.create_transport(
                    manager,
                    context.log,
                    loop,
                    client_credentials=context.client_credentials,
                )
            )
        elif name == "oscore":
            from .transports.oscore import TransportOSCORE

            # OSCORE is appended directly, without a token manager around it.
            context.request_interfaces.append(TransportOSCORE(context, context))
        else:
            raise RuntimeError(
                "Transport %r not know for client context creation" % name
            )

    return context
@classmethod
async def create_server_context(
    cls,
    site,
    bind=None,
    *,
    loggername="coap-server",
    loop=None,
    _ssl_context=None,
    multicast=None,
    server_credentials=None,
    transports: Optional[List[str]] = None,
):
    """Create a context, bound to all addresses on the CoAP port (unless
    otherwise specified in the ``bind`` argument).

    This is the easiest way to get a context suitable both for sending
    client and accepting server requests.

    The ``bind`` argument, if given, needs to be a 2-tuple of IP address
    string and port number, where the port number can be None to use the default port.

    If ``multicast`` is given, it needs to be a list of (multicast address,
    interface name) tuples, which will all be joined. (The IPv4 style of
    selecting the interface by a local address is not supported; users may
    want to use the netifaces package to arrive at an interface name for an
    address). Leaving it out (or passing None) is equivalent to passing an
    empty list.

    As a shortcut, the list may also contain interface names alone. Those
    will be joined for the 'all CoAP nodes' groups of IPv4 and IPv6 (with
    scopes 2 and 5) as well as the respective 'all nodes' groups in IPv6.

    Under some circumstances you may already need a context to pass into
    the site for creation; this is typically the case for servers that
    trigger requests on their own. For those cases, it is usually easiest
    to pass None in as a site, and set the fully constructed site later by
    assigning to the ``serversite`` attribute.

    If ``transports`` is not given (or empty), the list obtained from
    ``defaults.get_default_servertransports`` is used. An unknown transport
    name raises :class:`RuntimeError`.

    :meta private:
        (not actually private, just hiding from automodule due to being
        grouped with the important functions)
    """

    if loop is None:
        loop = asyncio.get_event_loop()

    if multicast is None:
        # A fresh list per call: a mutable default would be one object shared
        # by every invocation (and handed on to the transport).
        multicast = []

    self = cls(
        loop=loop,
        serversite=site,
        loggername=loggername,
        server_credentials=server_credentials,
    )

    # Nothing to do counts as done; otherwise only the udp6 transport below
    # takes care of the multicast groups.
    multicast_done = not multicast

    selected_transports = transports or defaults.get_default_servertransports(
        loop=loop
    )

    # Transport modules are imported lazily so that only the selected ones
    # need to be importable.
    for transportname in selected_transports:
        if transportname == "udp6":
            from .transports.udp6 import MessageInterfaceUDP6

            await self._append_tokenmanaged_messagemanaged_transport(
                lambda mman: MessageInterfaceUDP6.create_server_transport_endpoint(
                    mman, log=self.log, loop=loop, bind=bind, multicast=multicast
                )
            )
            multicast_done = True
        # FIXME this is duplicated from the client version, as those are client-only anyway
        elif transportname == "simple6":
            from .transports.simple6 import MessageInterfaceSimple6

            await self._append_tokenmanaged_messagemanaged_transport(
                lambda mman: MessageInterfaceSimple6.create_client_transport_endpoint(
                    mman, log=self.log, loop=loop
                )
            )
        elif transportname == "tinydtls":
            from .transports.tinydtls import MessageInterfaceTinyDTLS

            await self._append_tokenmanaged_messagemanaged_transport(
                lambda mman: MessageInterfaceTinyDTLS.create_client_transport_endpoint(
                    mman, log=self.log, loop=loop
                )
            )
        # FIXME end duplication
        elif transportname == "tinydtls_server":
            from .transports.tinydtls_server import MessageInterfaceTinyDTLSServer

            await self._append_tokenmanaged_messagemanaged_transport(
                lambda mman: MessageInterfaceTinyDTLSServer.create_server(
                    bind,
                    mman,
                    log=self.log,
                    loop=loop,
                    server_credentials=self.server_credentials,
                )
            )
        elif transportname == "simplesocketserver":
            from .transports.simplesocketserver import MessageInterfaceSimpleServer

            await self._append_tokenmanaged_messagemanaged_transport(
                lambda mman: MessageInterfaceSimpleServer.create_server(
                    bind, mman, log=self.log, loop=loop
                )
            )
        elif transportname == "tcpserver":
            from .transports.tcp import TCPServer

            await self._append_tokenmanaged_transport(
                lambda tman: TCPServer.create_server(bind, tman, self.log, loop)
            )
        elif transportname == "tcpclient":
            from .transports.tcp import TCPClient

            await self._append_tokenmanaged_transport(
                lambda tman: TCPClient.create_client_transport(tman, self.log, loop)
            )
        elif transportname == "tlsserver":
            # Without an SSL context the TLS server transport is skipped
            # without any message.
            if _ssl_context is not None:
                from .transports.tls import TLSServer

                await self._append_tokenmanaged_transport(
                    lambda tman: TLSServer.create_server(
                        bind, tman, self.log, loop, _ssl_context
                    )
                )
        elif transportname == "tlsclient":
            from .transports.tls import TLSClient

            await self._append_tokenmanaged_transport(
                lambda tman: TLSClient.create_client_transport(
                    tman, self.log, loop, self.client_credentials
                )
            )
        elif transportname == "ws":
            from .transports.ws import WSPool

            await self._append_tokenmanaged_transport(
                # None, None: Unlike the other transports this has a server/client generic creator, and only binds if there is some bind
                lambda tman: WSPool.create_transport(
                    tman,
                    self.log,
                    loop,
                    client_credentials=self.client_credentials,
                    server_bind=bind or (None, None),
                    server_context=_ssl_context,
                )
            )
        elif transportname == "oscore":
            from .transports.oscore import TransportOSCORE

            # OSCORE is appended directly, without a token manager around it.
            oscoretransport = TransportOSCORE(self, self)
            self.request_interfaces.append(oscoretransport)
        else:
            raise RuntimeError(
                "Transport %r not know for server context creation" % transportname
            )

    if not multicast_done:
        self.log.warning(
            "Multicast was requested, but no multicast capable transport was selected."
        )

    # This is used in tests to wait for externally launched servers to be ready
    self.log.debug("Server ready to receive requests")

    return self
async def shutdown(self):
    """Take down any listening sockets and stop all related timers.

    After this coroutine terminates, and once all external references to
    the object are dropped, it should be garbage-collectable.

    This method takes up to
    :const:`aiocoap.numbers.constants.SHUTDOWN_TIMEOUT` seconds, allowing
    transports to perform any cleanup implemented in them (such as orderly
    connection shutdown and cancelling observations, where the latter is
    currently not implemented).

    An exception raised by an interface's ``shutdown()`` that finished
    within the timeout is re-raised from here. A context without any
    request interfaces shuts down immediately.

    :meta private:
        (not actually private, just hiding from automodule due to being
        grouped with the important functions)
    """

    self.log.debug("Shutting down context")

    shutdown_tasks = [
        asyncio.create_task(
            ri.shutdown(),
            name="Shutdown of %r" % ri,
        )
        for ri in self.request_interfaces
    ]
    if not shutdown_tasks:
        # asyncio.wait() raises ValueError when handed an empty collection;
        # with no interfaces there is simply nothing to wait for.
        return

    done, pending = await asyncio.wait(shutdown_tasks, timeout=SHUTDOWN_TIMEOUT)
    for item in done:
        # Awaiting the finished task surfaces any exception it ended with.
        await item
    if pending:
        # Apart from being useful to see, this also ensures that developers
        # see the error in the logs during test suite runs -- and the error
        # should be easier to follow than the "we didn't garbage collect
        # everything" errors we see anyway (or otherwise, if the error is
        # escalated into a test failure)
        self.log.error(
            "Shutdown timeout exceeded, returning anyway. Interfaces still busy: %s",
            pending,
        )
458 # FIXME: determine how official this should be, or which part of it is
459 # public -- now that BlockwiseRequest uses it. (And formalize what can
460 # change about messages and what can't after the remote has been thusly
461 # populated).
async def find_remote_and_interface(self, message):
    """Return the first request interface that accepts ``message``.

    The interfaces in ``request_interfaces`` are asked in order through
    their ``fill_or_recognize_remote`` coroutine; the first one returning a
    true value is returned.

    :raises error.MissingRemoteError: if ``message.remote`` is None.
    :raises error.NoRequestInterface: if no interface accepts the message.
    """
    if message.remote is None:
        raise error.MissingRemoteError()

    for candidate in self.request_interfaces:
        accepted = await candidate.fill_or_recognize_remote(message)
        if not accepted:
            continue
        return candidate

    raise error.NoRequestInterface()
def request(self, request_message, handle_blockwise=True):
    """Start sending ``request_message`` and return an object tracking it.

    With ``handle_blockwise`` set (the default), a :class:`BlockwiseRequest`
    is returned. Otherwise a plain :class:`Request` is created on a new
    :class:`Pipe`, and the selection of the request interface is deferred to
    a task on the context's loop; any exception raised there is fed into
    the pipe rather than propagated.
    """
    if handle_blockwise:
        return BlockwiseRequest(self, request_message)

    pipe = Pipe(request_message, self.log)
    # Request sets up callbacks at creation
    plain_request = Request(pipe, self.loop, self.log)

    async def dispatch():
        try:
            interface = await self.find_remote_and_interface(request_message)
            interface.request(pipe)
        except Exception as exc:
            # Report failures through the pipe so they reach the request.
            pipe.add_exception(exc)

    self.loop.create_task(
        dispatch(),
        name="Request processing of %r" % plain_request,
    )
    return plain_request
494 # the following are under consideration for moving into Site or something
495 # mixed into it
def render_to_pipe(self, pipe):
    """Fill a pipe by running the site's render_to_pipe interface and
    handling errors.

    The pipe is wrapped through ``error_to_message`` and the rendering
    coroutine is handed to ``run_driving_pipe`` together with that wrapper.
    """
    error_receiving_pipe = error_to_message(pipe, self.log)
    rendering = self._render_to_pipe(pipe)

    run_driving_pipe(
        error_receiving_pipe,
        rendering,
        name="Rendering for %r" % pipe.request,
    )
async def _render_to_pipe(self, pipe):
    """Let the server site render into ``pipe``.

    Without a server site, a final NOT_FOUND response with the payload
    ``b"not a server"`` is added to the pipe instead.
    """
    site = self.serversite
    if site is not None:
        return await site.render_to_pipe(pipe)

    not_a_server = Message(code=NOT_FOUND, payload=b"not a server")
    pipe.add_response(not_a_server, is_last=True)
class BaseRequest:
    """Common mechanisms of :class:`Request` and :class:`MulticastRequest`

    The class defines no members of its own in this module; it acts as the
    common base type (:class:`BaseUnicastRequest` derives from it)."""
class BaseUnicastRequest(BaseRequest):
    """Mixin adding two alternative views on a :attr:`response` future.

    :attr:`response_raising` and :attr:`response_nonraising` both wait for
    :attr:`response`, but differ in how error states are presented: either
    as an exception or as an unsuccessful response (eg. 4.04).

    It also provides some internal tools for handling anything that has a
    :attr:`response` future and an :attr:`observation`"""

    @property
    async def response_raising(self):
        """An awaitable that returns the response if it is successful.

        Exceptions from the :attr:`response` future pass through; an
        unsuccessful response is raised as
        :class:`.error.ResponseWrappingError`.

        Experimental Interface."""

        outcome = await self.response
        if outcome.code.is_successful():
            return outcome
        raise error.ResponseWrappingError(outcome)

    @property
    async def response_nonraising(self):
        """An awaitable that never raises for a failed request.

        Renderable errors are converted through their ``to_message()``; any
        other exception becomes a fabricated 5.00 message (as a proxy would
        return).

        Experimental Interface."""

        # FIXME: Can we smuggle error_to_message into the underlying pipe?
        # That should make observe notifications into messages rather
        # than exceptions as well, plus it has fallbacks for `e.to_message()`
        # raising.

        try:
            outcome = await self.response
        except error.RenderableError as exc:
            return exc.to_message()
        except Exception:
            return Message(code=INTERNAL_SERVER_ERROR)
        else:
            return outcome
566class Request(interfaces.Request, BaseUnicastRequest):
def __init__(self, pipe, loop, log):
    """Track the response(s) arriving on ``pipe``.

    :param pipe: pipe carrying the request and delivering events.
    :param loop: event loop on which the ``response`` future is created.
    :param log: logger used by the runner.

    An observation object is only created when the request's Observe option
    equals 0; otherwise ``observation`` is None.
    """
    self._pipe = pipe

    self.response = loop.create_future()

    wants_observation = pipe.request.opt.observe == 0
    self.observation = ClientObservation() if wants_observation else None

    # Prime the generator so that it is parked at its first ``yield`` and
    # ready to take events through ``send``.
    self._runner = self._run()
    self._runner.send(None)

    def feed_runner(event):
        # would be great to have self or the runner as weak ref, but
        # see ClientObservation.register_callback comments -- while
        # that is around, we can't weakref here.
        try:
            self._runner.send(event)
        except StopIteration:
            # The runner finished with this event.
            return False
        return True

    self._stop_interest = self._pipe.on_event(feed_runner)

    self.log = log

    self.response.add_done_callback(self._response_cancellation_handler)
def _response_cancellation_handler(self, response):
    """Done-callback of ``response``: tear down the runner on cancellation.

    Only cancellation is propagated to the runner (if interest in the first
    response is lost, there won't be observation items to pull out). Plain
    completion is not, because the runner may still be needed, eg. when an
    observation is active; in that case it is the runner's own task to call
    ``_stop_interest``.
    """
    if not self.response.cancelled() or self._runner is None:
        return

    # Dropping the only reference makes it stop with GeneratorExit,
    # similar to a cancelled task
    self._runner = None
    self._stop_interest()
@staticmethod
def _add_response_properties(response, request):
    """Store ``request`` on ``response`` as its ``request`` attribute."""
    setattr(response, "request", request)
def _run(self):
    """Generator that turns pipe events into the response and observation.

    It is primed by ``__init__`` and then receives every pipe event through
    ``send``. It yields None once while waiting for the first event, and
    True while it is waiting for further notifications; returning (which
    shows as StopIteration on the sender's side) means that no more events
    are wanted.

    The first event settles ``self.response``. Later events are only
    processed when there is an observation; they are delivered to its
    ``callback`` unless the reordering check of RFC7641 Section 3.4
    classifies them as outdated.
    """
    # FIXME: This is in iterator form because it used to be a task that
    # awaited futures, and that code could be easily converted to an
    # iterator. I'm not sure that's a bad state here, but at least it
    # should be a more conscious decision to make this an iterator rather
    # than just having it happen to be one.
    #
    # FIXME: check that responses come from the same remote as long as we're assuming unicast

    first_event = yield None

    if first_event.message is not None:
        self._add_response_properties(first_event.message, self._pipe.request)
        self.response.set_result(first_event.message)
    else:
        self.response.set_exception(first_event.exception)
        if not isinstance(first_event.exception, error.Error):
            self.log.warning(
                "An exception that is not an aiocoap Error was raised "
                "from a transport; please report this as a bug in "
                "aiocoap: %r",
                first_event.exception,
            )

    if self.observation is None:
        # Not observing: the first event is all that is wanted.
        if not first_event.is_last:
            self.log.error(
                "Pipe indicated more possible responses"
                " while the Request handler would not know what to"
                " do with them, stopping any further request."
            )
            self._stop_interest()
        return

    if first_event.is_last:
        self.observation.error(error.NotObservable())
        return

    if first_event.message.opt.observe is None:
        self.log.error(
            "Pipe indicated more possible responses"
            " while the Request handler would not know what to"
            " do with them, stopping any further request."
        )
        self._stop_interest()
        return

    # variable names from RFC7641 Section 3.4
    v1 = first_event.message.opt.observe
    # Only differences between t1 and t2 matter, so the monotonic clock is
    # used: it is not affected by adjustments of the system time.
    t1 = time.monotonic()

    while True:
        # We don't really support cancellation of observations yet (see
        # https://github.com/chrysn/aiocoap/issues/92), but at least
        # stopping the interest is a way to free the local resources after
        # the first observation update, and to make the MID handler RST the
        # observation on the next.
        # FIXME: there *is* now a .on_cancel callback, we should at least
        # hook into that, and possibly even send a proper cancellation
        # then.
        next_event = yield True
        if self.observation.cancelled:
            self._stop_interest()
            return

        if next_event.exception is not None:
            self.observation.error(next_event.exception)
            if not next_event.is_last:
                self._stop_interest()
            if not isinstance(next_event.exception, error.Error):
                self.log.warning(
                    "An exception that is not an aiocoap Error was "
                    "raised from a transport during an observation; "
                    "please report this as a bug in aiocoap: %r",
                    next_event.exception,
                )
            return

        self._add_response_properties(next_event.message, self._pipe.request)

        if next_event.message.opt.observe is not None:
            # check for reordering
            v2 = next_event.message.opt.observe
            t2 = time.monotonic()

            is_recent = (
                (v1 < v2 and v2 - v1 < 2**23)
                or (v1 > v2 and v1 - v2 > 2**23)
                or (
                    t2
                    > t1
                    + self._pipe.request.transport_tuning.OBSERVATION_RESET_TIME
                )
            )
            if is_recent:
                t1 = t2
                v1 = v2
        else:
            # the terminal message is always the last
            is_recent = True

        if is_recent:
            self.observation.callback(next_event.message)

        if next_event.is_last:
            self.observation.error(error.ObservationCancelled())
            return

        if next_event.message.opt.observe is None:
            self.observation.error(error.ObservationCancelled())
            self.log.error(
                "Pipe indicated more possible responses"
                " while the Request handler would not know what to"
                " do with them, stopping any further request."
            )
            self._stop_interest()
            return
732class BlockwiseRequest(BaseUnicastRequest, interfaces.Request):
def __init__(self, protocol, app_request):
    """Start a block-wise managed request for ``app_request``.

    :param protocol: the context used for sending the individual block
        requests; its ``loop`` and ``log`` are used as well.
    :param app_request: the complete request message as the application
        built it.

    An observation object is created whenever the request carries an
    Observe option. The runner task only receives a weak reference to it
    (or a callable returning None when there is no observation).
    """
    self.protocol = protocol
    self.log = self.protocol.log.getChild("blockwise-requester")

    self.response = protocol.loop.create_future()

    wants_observation = app_request.opt.observe is not None
    self.observation = ClientObservation() if wants_observation else None

    def no_observation():
        # Stand-in with the call signature of a weak reference.
        return None

    if self.observation is not None:
        observation_ref = weakref.ref(self.observation)
    else:
        observation_ref = no_observation

    runner = self._run_outer(
        app_request,
        self.response,
        observation_ref,
        self.protocol,
        self.log,
    )
    self._runner = protocol.loop.create_task(
        runner,
        name="Blockwise runner for %r" % app_request,
    )
    self.response.add_done_callback(self._response_cancellation_handler)
def _response_cancellation_handler(self, response_future):
    """Done-callback of ``response``: cancel the runner task if the
    response future was cancelled (and do nothing on regular completion)."""
    # see Request._response_cancellation_handler
    if not self.response.cancelled():
        return
    self._runner.cancel()
@classmethod
async def _run_outer(cls, app_request, response, weak_observation, protocol, log):
    """Run :meth:`_run` and route any exception it raises.

    Cancellation ends the task silently. Any other exception is set on
    ``response`` (unless that is already done) and reported to the
    observation (if the request has an Observe option and the observation
    is still alive); only if neither took it is it logged as an error.
    """
    try:
        await cls._run(app_request, response, weak_observation, protocol, log)
    except asyncio.CancelledError:
        # results already set
        return
    except Exception as exc:
        went_to_response = not response.done()
        if went_to_response:
            response.set_exception(exc)

        observation = weak_observation()
        went_to_observation = (
            app_request.opt.observe is not None and observation is not None
        )
        if went_to_observation:
            observation.error(exc)

        if not (went_to_response or went_to_observation):
            # should be unreachable
            log.error(
                "Exception in BlockwiseRequest runner neither went to response nor to observation: %s",
                exc,
                exc_info=exc,
            )
786 # This is a class method because that allows self and self.observation to
787 # be freed even when this task is running, and the task to stop itself --
788 # otherwise we couldn't know when users just "forget" about a request
789 # object after using its response (esp. in observe cases) and leave this
790 # task running.
@classmethod
async def _run(cls, app_request, response, weak_observation, protocol, log):
    """Send ``app_request`` block-wise and settle ``response``.

    The request body is first transferred in Block1 chunks (a single
    request if it fits), then the response is completed through
    :meth:`_complete_by_requesting_block2` and set as the result of
    ``response``. If the request asked for an observation and the server
    established one, a sub-task is started that reassembles every
    notification and reports it to the application's observation.

    This is a class method because that allows self and self.observation to
    be freed even when this task is running, and the task to stop itself --
    otherwise we couldn't know when users just "forget" about a request
    object after using its response (esp. in observe cases) and leave this
    task running.

    :param app_request: the complete request message from the application.
    :param response: future that receives the assembled response.
    :param weak_observation: callable returning the application's
        observation, or None if there is none (any more).
    :param protocol: context used for sending the individual requests.
    :param log: logger for this transfer.
    :raises error.UnexpectedBlock1Option: if the server's Block1 option does
        not match the block that was sent, or if it asks for more data after
        the last block.
    """
    # we need to populate the remote right away, because the choice of
    # blocks depends on it.
    await protocol.find_remote_and_interface(app_request)

    size_exp = app_request.remote.maximum_block_size_exp

    if app_request.opt.block1 is not None:
        warnings.warn(
            "Setting a block1 option in a managed block-wise transfer is deprecated. Instead, set request.remote.maximum_block_size_exp to the desired value",
            DeprecationWarning,
            stacklevel=2,
        )
        assert app_request.opt.block1.block_number == 0, (
            "Unexpected block number in app_request"
        )
        assert not app_request.opt.block1.more, (
            "Unexpected more-flag in app_request"
        )
        # this is where the library user can traditionally pass in size
        # exponent hints into the library.
        size_exp = app_request.opt.block1.size_exponent

    # Offset in the message in blocks of size_exp. Whoever changes size_exp
    # is responsible for updating this number.
    block_cursor = 0

    while True:
        # ... send a chunk

        if size_exp >= 6:
            # FIXME from maximum_payload_size
            fragmentation_threshold = app_request.remote.maximum_payload_size
        else:
            fragmentation_threshold = 2 ** (size_exp + 4)

        if (
            app_request.opt.block1 is not None
            or len(app_request.payload) > fragmentation_threshold
        ):
            current_block1 = app_request._extract_block(
                block_cursor, size_exp, app_request.remote.maximum_payload_size
            )
            if block_cursor == 0:
                current_block1.opt.size1 = len(app_request.payload)
        else:
            current_block1 = app_request

        blockrequest = protocol.request(current_block1, handle_blockwise=False)
        blockresponse = await blockrequest.response

        # store for future blocks to ensure that the next blocks will be
        # sent from the same source address (in the UDP case; for many
        # other transports it won't matter). carrying along locally set block size limitation
        if (
            app_request.remote.maximum_block_size_exp
            < blockresponse.remote.maximum_block_size_exp
        ):
            blockresponse.remote.maximum_block_size_exp = (
                app_request.remote.maximum_block_size_exp
            )
        app_request.remote = blockresponse.remote

        if blockresponse.opt.block1 is None:
            if blockresponse.code.is_successful() and current_block1.opt.block1:
                log.warning(
                    "Block1 option completely ignored by server, assuming it knows what it is doing."
                )
            # FIXME: handle 4.13 and retry with the indicated size option
            break

        block1 = blockresponse.opt.block1
        log.debug(
            "Response with Block1 option received, number = %d, more = %d, size_exp = %d.",
            block1.block_number,
            block1.more,
            block1.size_exponent,
        )

        if block1.block_number != current_block1.opt.block1.block_number:
            raise error.UnexpectedBlock1Option("Block number mismatch")

        if size_exp == 7:
            block_cursor += len(current_block1.payload) // 1024
        else:
            block_cursor += 1

        # The server may have asked for smaller blocks; rescale the cursor
        # to the new block size.
        while block1.size_exponent < size_exp:
            block_cursor *= 2
            size_exp -= 1

        if not current_block1.opt.block1.more:
            if block1.more or blockresponse.code == CONTINUE:
                # treating this as a protocol error -- letting it slip
                # through would misrepresent the whole operation as an
                # over-all 2.xx (successful) one.
                raise error.UnexpectedBlock1Option(
                    "Server asked for more data at end of body"
                )
            break

        # checks before preparing the next round:

        if blockresponse.opt.observe is not None:
            # we're not *really* interested in that block, we just sent an
            # observe option to indicate that we'll want to observe the
            # resulting representation as a whole
            log.warning(
                "Server answered Observe in early Block1 phase, cancelling the erroneous observation."
            )
            # The request object exposes its observation as `.observation`
            # (there is no `.observe` attribute), and it is None when the
            # block request did not ask for an observation.
            if blockrequest.observation is not None:
                blockrequest.observation.cancel()

        if block1.more:
            # FIXME i think my own server is doing this wrong
            # if response.code != CONTINUE:
            #     raise error.UnexpectedBlock1Option("more-flag set but no Continue")
            pass
        else:
            if not blockresponse.code.is_successful():
                break
            else:
                # ignoring (discarding) the successful intermediate result, waiting for a final one
                continue

    lower_observation = None
    if app_request.opt.observe is not None:
        if blockresponse.opt.observe is not None:
            lower_observation = blockrequest.observation
        else:
            obs = weak_observation()
            if obs:
                obs.error(error.NotObservable())
            del obs

    assert blockresponse is not None, "Block1 loop broke without setting a response"
    blockresponse.opt.block1 = None

    # FIXME check with RFC7959: it just says "send requests similar to the
    # requests in the Block1 phase", what does that mean? using the last
    # block1 as a reference for now, especially because in the
    # only-one-request-block case, that's the original request we must send
    # again and again anyway
    assembled_response = await cls._complete_by_requesting_block2(
        protocol, current_block1, blockresponse, log
    )

    response.set_result(assembled_response)
    # finally set the result

    if lower_observation is not None:
        # FIXME this can all be simplified a lot since it's no more
        # expected that observations shut themselves down when GC'd.
        obs = weak_observation()
        del weak_observation
        if obs is None:
            lower_observation.cancel()
            return
        # packing this up because its destroy callback needs to reference the subtask
        future_weak_observation = protocol.loop.create_future()
        subtask = asyncio.create_task(
            cls._run_observation(
                app_request,
                lower_observation,
                future_weak_observation,
                protocol,
                log,
            ),
            name="Blockwise observation for %r" % app_request,
        )
        future_weak_observation.set_result(
            weakref.ref(obs, lambda obs: subtask.cancel())
        )
        obs.on_cancel(subtask.cancel)
        # Drop the strong reference so that only the weak one keeps track
        # of the application's observation while the sub-task runs.
        del obs
        await subtask
@classmethod
async def _run_observation(
    cls, original_request, lower_observation, future_weak_observation, protocol, log
):
    """Relay notifications from ``lower_observation`` to the observation
    behind the weak reference delivered through ``future_weak_observation``.

    Every item produced by iterating ``lower_observation`` is first passed
    through :meth:`_complete_by_requesting_block2` (repeating
    ``original_request``) and the result is handed to the observation's
    ``callback``. When the iteration ends, ``ObservationCancelled`` is
    reported through the observation's ``error``; any other exception is
    reported the same way. Cancellation of this task is absorbed silently.

    In every case ``lower_observation`` is cancelled on the way out unless
    it already is.
    """
    # The weak reference is delivered through a future; it is awaited before
    # anything else happens.
    observation_ref = await future_weak_observation
    # observation_ref() can be used here at any time, because whenever that
    # becomes None, this task gets cancelled
    try:
        async for first_block in lower_observation:
            log.debug("Notification received")
            complete_notification = await cls._complete_by_requesting_block2(
                protocol, original_request, first_block, log
            )
            log.debug("Reporting completed notification")
            observation_ref().callback(complete_notification)

        # FIXME verify that this loop actually ends iff the observation
        # was cancelled -- otherwise find out the cause(s) or make it not
        # cancel under indistinguishable circumstances
        observation_ref().error(error.ObservationCancelled())
    except asyncio.CancelledError:
        # Nothing is reported on task cancellation.
        return
    except Exception as exc:
        observation_ref().error(exc)
    finally:
        # We generally avoid idempotent cancellation, but we may have
        # reached this point either due to an earlier cancellation or
        # without one
        if not lower_observation.cancelled:
            lower_observation.cancel()
@classmethod
async def _complete_by_requesting_block2(
    cls, protocol, request_to_repeat, initial_response, log
):
    """Fetch the remaining Block2 blocks of ``initial_response`` and return
    the assembled response.

    If ``initial_response`` has no Block2 option, or its ``more`` flag is
    ``False``, the option is cleared and the response is returned as is.
    Otherwise follow-up requests derived from ``request_to_repeat`` are sent
    to ``initial_response.remote`` (with ``handle_blockwise=False``) and
    their responses appended to ``initial_response`` until a block arrives
    whose ``more`` flag is ``False``.

    A follow-up response without a Block2 option aborts the transfer; that
    response is returned on its own after a warning is logged.

    Raises ``error.UnexpectedBlock2`` if the initial block is not block 0,
    and re-raises any ``error.Error`` raised while appending a block.
    """
    # FIXME this can probably be deduplicated against BlockwiseRequest

    first_block2 = initial_response.opt.block2
    if first_block2 is None or first_block2.more is False:
        initial_response.opt.block2 = None
        return initial_response

    if first_block2.block_number != 0:
        log.error("Error assembling blockwise response (expected first block)")
        raise error.UnexpectedBlock2()

    # Later blocks are appended onto the initial response itself.
    assembled_response = initial_response
    while True:
        followup = request_to_repeat._generate_next_block2_request(
            assembled_response
        )
        followup = followup.copy(remote=initial_response.remote)

        # The request object is kept in a local so it stays referenced for
        # the duration of the await.
        blockrequest = protocol.request(followup, handle_blockwise=False)
        last_response = await blockrequest.response

        block2 = last_response.opt.block2
        if block2 is None:
            log.warning(
                "Server sent non-blockwise response after having started a blockwise transfer. Blockwise transfer cancelled, accepting single response."
            )
            return last_response

        log.debug(
            "Response with Block2 option received, number = %d, more = %d, size_exp = %d.",
            block2.block_number,
            block2.more,
            block2.size_exponent,
        )
        try:
            assembled_response._append_response_block(last_response)
        except error.Error as e:
            log.error("Error assembling blockwise response, passing on error %r", e)
            raise

        if block2.more is False:
            return assembled_response
class ClientObservation:
    """An interface to observe notification updates arriving on a request.

    This class does not actually provide any of the observe functionality, it
    is purely a container for dispatching the messages via asynchronous
    iteration. It gets driven (ie. populated with responses or errors including
    observation termination) by a Request object.
    """

    def __init__(self):
        # Registered listeners; both are set to None on cancellation so that
        # any later attempt to dispatch through them fails loudly.
        self.callbacks = []
        self.errbacks = []

        self.cancelled = False
        # Zero-argument callables run (once) when the observation is cancelled.
        self._on_cancel = []

        self._latest_response = None
        # The analogous error. It stays None unless the observation was ended
        # through error(); it is initialised here so that it can be read at
        # any time, including from within on-cancel callbacks.
        self._cancellation_reason = None

    def __aiter__(self):
        """`async for` interface to observations.

        This is the preferred interface to obtaining observations."""
        it = self._Iterator()
        self.register_callback(it.push, _suppress_deprecation=True)
        self.register_errback(it.push_err, _suppress_deprecation=True)
        return it

    class _Iterator:
        """Single-slot, lossy asynchronous iterator fed through push() and
        push_err()."""

        def __init__(self):
            self._future = asyncio.get_event_loop().create_future()

        def push(self, item):
            """Make ``item`` the next value to be yielded."""
            if self._future.done():
                # we don't care whether we overwrite anything, this is a lossy queue as observe is lossy
                self._future = asyncio.get_event_loop().create_future()
            self._future.set_result(item)

        def push_err(self, e):
            """Make ``e`` the exception raised by the next iteration step."""
            if self._future.done():
                self._future = asyncio.get_event_loop().create_future()
            self._future.set_exception(e)

        async def __anext__(self):
            f = self._future
            try:
                result = await self._future
                # FIXME see `await servobs._trigger` comment: might waiting for
                # the original future not yield the first future's result when
                # a quick second future comes in in a push?
                if f is self._future:
                    # Nothing newer was pushed meanwhile: arm a fresh future
                    # for the next item.
                    self._future = asyncio.get_event_loop().create_future()
                return result
            except (error.NotObservable, error.ObservationCancelled):
                # only exit cleanly when the server -- right away or later --
                # states that the resource is not observable any more
                # FIXME: check whether an unsuccessful message is still passed
                # as an observation result (or whether it should be)
                raise StopAsyncIteration

        def __del__(self):
            if self._future.done():
                try:
                    # Fetch the result so any errors show up at least in the
                    # finalizer output
                    self._future.result()
                except (error.ObservationCancelled, error.NotObservable):
                    # This is the case at the end of an observation cancelled
                    # by the server.
                    pass
                except error.NetworkError:
                    # This will already have shown up in the main result too.
                    pass
                except (error.LibraryShutdown, asyncio.CancelledError):
                    pass
                # Anything else flying out of this is unexpected and probably a
                # library error

    # When this function is removed, we can finally do cleanup better. Right
    # now, someone could register a callback that doesn't hold any references,
    # so we can't just stop the request when nobody holds a reference to this
    # any more. Once we're all in pull mode, we can make the `process` function
    # that sends data in here use a weak reference (because any possible
    # recipient would need to hold a reference to self or the iterator, and
    # thus _run).
    def register_callback(self, callback, _suppress_deprecation=False):
        """Call the callback whenever a response to the message comes in, and
        pass the response to it.

        If a response has already been received, the callback is invoked
        immediately with the latest one. On a cancelled observation this is a
        no-op.

        The use of this function is deprecated: Use the asynchronous iteration
        interface instead."""
        if not _suppress_deprecation:
            warnings.warn(
                "register_callback on observe results is deprecated: Use `async for notify in request.observation` instead.",
                DeprecationWarning,
                stacklevel=2,
            )
        if self.cancelled:
            return

        self.callbacks.append(callback)
        if self._latest_response is not None:
            callback(self._latest_response)

    def register_errback(self, callback, _suppress_deprecation=False):
        """Call the callback whenever something goes wrong with the
        observation, and pass an exception to the callback. After such a
        callback is called, no more callbacks will be issued.

        On a cancelled observation the callback is invoked immediately with
        the stored cancellation reason and is not registered.

        The use of this function is deprecated: Use the asynchronous iteration
        interface instead."""
        if not _suppress_deprecation:
            warnings.warn(
                "register_errback on observe results is deprecated: Use `async for notify in request.observation` instead.",
                DeprecationWarning,
                stacklevel=2,
            )
        if self.cancelled:
            # NOTE(review): the reason is None when cancel() was called
            # without going through error(); the iterator's push_err would
            # then hand None to Future.set_exception -- confirm whether that
            # path can occur in practice.
            callback(self._cancellation_reason)
            return
        self.errbacks.append(callback)

    def callback(self, response):
        """Notify all listeners of an incoming response"""

        self._latest_response = response

        for c in self.callbacks:
            c(response)

    def error(self, exception):
        """Notify registered listeners that the observation went wrong. This
        can only be called once.

        Raises RuntimeError (chained from ``exception``) when the observation
        has already been cancelled."""

        if self.errbacks is None:
            raise RuntimeError(
                "Error raised in an already cancelled ClientObservation"
            ) from exception
        for c in self.errbacks:
            c(exception)

        # Record the reason before cancelling, so that on-cancel callbacks
        # (and anything they register) already see why the observation ended.
        self._cancellation_reason = exception
        self.cancel()

    def cancel(self):
        """Cease to generate observation or error events. This will not
        generate an error by itself.

        This function is only needed while register_callback and
        register_errback are around; once their deprecations are acted on,
        dropping the asynchronous iterator will automatically cancel the
        observation.
        """
        # FIXME determine whether this is called by anything other than error,
        # and make it private so there is always a _cancellation_reason

        assert not self.cancelled, "ClientObservation cancelled twice"

        # make sure things go wrong when someone tries to continue this
        self.errbacks = None
        self.callbacks = None

        self.cancelled = True
        while self._on_cancel:
            self._on_cancel.pop()()

        # _cancellation_reason is deliberately left untouched: it is None
        # from __init__ unless error() stored the actual reason.

    def on_cancel(self, callback):
        """Run ``callback`` (without arguments) once the observation is
        cancelled; if it already is cancelled, run it right away."""
        if self.cancelled:
            callback()
            # Nothing will drain the list any more, so don't retain the
            # callback.
            return
        self._on_cancel.append(callback)

    def __repr__(self):
        return "<%s %s at %#x>" % (
            type(self).__name__,
            "(cancelled)"
            if self.cancelled
            else "(%s call-, %s errback(s))"
            % (len(self.callbacks), len(self.errbacks)),
            id(self),
        )
class ServerObservation:
    """State holder for one observation on the server side.

    It records whether the observation was accepted, the cancellation
    callback given on acceptance, the early/late deregistration flags, and a
    future (``_trigger``) that is resolved with the next response to send.
    """

    def __init__(self):
        self._accepted = False
        self._trigger = asyncio.get_event_loop().create_future()
        # A deregistration is "early" if it happens before the response message
        # is actually sent; calling deregister() in that time (typically during
        # `render()`) will not send an unsuccessful response message but just
        # set this flag which is set to None as soon as it is too late for an
        # early deregistration.
        # This mechanism is temporary until more of aiocoap behaves like
        # Pipe which does not suffer from this limitation.
        self._early_deregister = False
        self._late_deregister = False

    def accept(self, cancellation_callback):
        """Mark the observation as accepted and store
        ``cancellation_callback`` for later use."""
        self._accepted = True
        self._cancellation_callback = cancellation_callback

    def deregister(self, reason=None):
        """Deregister the observation.

        While ``_early_deregister`` is still ``False`` this only sets that
        flag. Otherwise the call is treated as a (deprecated) late
        deregistration: a DeprecationWarning is issued and an
        Internal Server Error message is triggered.

        ``reason`` is accepted but not used here.
        """
        if self._early_deregister is False:
            self._early_deregister = True
            return

        warnings.warn(
            "Late use of ServerObservation.deregister() is"
            " deprecated, use .trigger with an unsuccessful value"
            " instead",
            DeprecationWarning,
            # Attribute the warning to the caller, like the other deprecation
            # warnings in this module do.
            stacklevel=2,
        )
        self.trigger(
            Message(code=INTERNAL_SERVER_ERROR, payload=b"Resource became unobservable")
        )

    def trigger(self, response=None, *, is_last=False):
        """Send an updated response; if None is given, the observed resource's
        rendering will be invoked to produce one.

        `is_last` can be set to True to indicate that no more responses will be
        sent. Note that an unsuccessful response will be the last no matter
        what is_last says, as such a message always terminates a CoAP
        observation."""
        if is_last:
            self._late_deregister = True
        if self._trigger.done():
            # we don't care whether we overwrite anything, this is a lossy queue as observe is lossy
            self._trigger = asyncio.get_event_loop().create_future()
        self._trigger.set_result(response)