Coverage for aiocoap / protocol.py: 89%
493 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 10:42 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 10:42 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5"""This module contains the classes that are responsible for keeping track of
6messages:
8* :class:`Context` roughly represents the CoAP endpoint (basically a UDP
9 socket) -- something that can send requests and possibly can answer
10 incoming requests.
12 Incoming requests are processed in tasks created by the context.
14* a :class:`Request` gets generated whenever a request gets sent to keep
15 track of the response
17Logging
18~~~~~~~
20Several constructors of the Context accept a logger name; these names go into
21the construction of a Python logger.
23Log events will be emitted to these on different levels, with "warning" and
24above being a practical default for things that should may warrant reviewing by
25an operator:
27* DEBUG is used for things that occur even under perfect conditions.
28* INFO is for things that are well expected, but might be interesting during
29 testing a network of nodes and not just when debugging the library. (This
30 includes timeouts, retransmissions, and pings.)
31* WARNING is for everything that indicates a malbehaved peer. These don't
32 *necessarily* indicate a client bug, though: Things like requesting a
33 nonexistent block can just as well happen when a resource's content has
34 changed between blocks. The library will not go out of its way to determine
35 whether there is a plausible explanation for the odd behavior, and will
36 report something as a warning in case of doubt.
37* ERROR is used when something clearly went wrong. This includes irregular
38 connection terminations and resource handler errors (which are demoted to
39 error responses), and can often contain a backtrace.
41Logs will generally reveal messages exchanged between this and other systems,
42and attackers can observe their encrypted counterparts. Private or shared keys
43are only logged through an internal `log_secret` function, which usually
44replaces them with a redacted value. Setting the ``AIOCOAP_REVEAL_KEYS``
45environment variable to the value ``show secrets in logs`` bypasses that
46mechanism. As an additional precaution, this is only accepted if the effective
47user has write access to the aiocoap source code.
48"""
50import asyncio
51import weakref
52import time
54from . import defaults
55from .credentials import CredentialsMap
56from .message import Message
57from .messagemanager import MessageManager
58from .tokenmanager import TokenManager
59from .pipe import Pipe, run_driving_pipe, error_to_message
60from . import interfaces
61from . import error
62from .numbers import INTERNAL_SERVER_ERROR, NOT_FOUND, CONTINUE, SHUTDOWN_TIMEOUT
63from .transport_params import TransportParameters
65import warnings
66import logging
69class Context(interfaces.RequestProvider):
70 """Applications' entry point to the network
72 A :class:`.Context` coordinates one or more network :mod:`.transports`
73 implementations and dispatches data between them and the application.
75 The application can start requests using the message dispatch methods, and
76 set a :class:`resources.Site` that will answer requests directed to the
77 application as a server.
79 On the library-internals side, it is the prime implementation of the
80 :class:`interfaces.RequestProvider` interface, creates :class:`Request` and
81 :class:`Response` classes on demand, and decides which transport
82 implementations to start and which are to handle which messages.
84 **Context creation and destruction**
86 The following functions are provided for creating and stopping a context:
88 .. note::
90 A typical application should only ever create one context, even (or
91 especially when) it acts both as a server and as a client (in which
92 case a server context should be created).
94 A context that is not used any more must be shut down using
95 :meth:`.shutdown()`, but typical applications will not need to because
96 they use the context for the full process lifetime.
98 .. automethod:: create_client_context
99 .. automethod:: create_server_context
101 .. automethod:: shutdown
103 **Dispatching messages**
105 CoAP requests can be sent using the following functions:
107 .. automethod:: request
109 If more control is needed, you can create a :class:`Request` yourself and
110 pass the context to it.
113 **Other methods and properties**
115 The remaining methods and properties are to be considered unstable even
116 when the project reaches a stable version number; please file a feature
117 request for stabilization if you want to reliably access any of them.
118 """
120 def __init__(
121 self,
122 loop=None,
123 serversite=None,
124 loggername="coap",
125 client_credentials=None,
126 server_credentials=None,
127 ):
128 self.log = logging.getLogger(loggername)
130 self.loop = loop or asyncio.get_event_loop()
132 self.serversite = serversite
134 self.request_interfaces = []
136 self.client_credentials = client_credentials or CredentialsMap()
137 self.server_credentials = server_credentials or CredentialsMap()
139 #
140 # convenience methods for class instantiation
141 #
143 async def _append_tokenmanaged_messagemanaged_transport(
144 self, message_interface_constructor
145 ):
146 tman = TokenManager(self)
147 mman = MessageManager(tman)
148 transport = await message_interface_constructor(mman)
150 mman.message_interface = transport
151 tman.token_interface = mman
153 self.request_interfaces.append(tman)
155 async def _append_tokenmanaged_transport(self, token_interface_constructor):
156 tman = TokenManager(self)
157 transport = await token_interface_constructor(tman)
159 tman.token_interface = transport
161 self.request_interfaces.append(tman)
163 @classmethod
164 async def create_client_context(
165 cls,
166 *,
167 loggername="coap",
168 loop=None,
169 transports: TransportParameters | None | dict | list[str] = None,
170 ):
171 """Create a context bound to all addresses on a random listening port.
173 This is the easiest way to get a context suitable for sending client
174 requests.
176 :meta private:
177 (not actually private, just hiding from automodule due to being
178 grouped with the important functions)
179 """
181 if loop is None:
182 loop = asyncio.get_event_loop()
184 self = cls(loop=loop, serversite=None, loggername=loggername)
186 selected_transports = transports or list(
187 defaults.get_default_clienttransports(loop=loop)
188 )
189 selected_transports = TransportParameters._compat_create(selected_transports)
191 # FIXME make defaults overridable (postponed until they become configurable too)
192 if selected_transports.oscore:
193 from .transports.oscore import TransportOSCORE
195 oscoretransport = TransportOSCORE(self, self)
196 self.request_interfaces.append(oscoretransport)
197 if selected_transports.udp6:
198 from .transports.udp6 import MessageInterfaceUDP6
200 await self._append_tokenmanaged_messagemanaged_transport(
201 lambda mman: MessageInterfaceUDP6.create_client_transport_endpoint(
202 mman, log=self.log, loop=loop
203 )
204 )
205 if selected_transports.simple6:
206 from .transports.simple6 import MessageInterfaceSimple6
208 await self._append_tokenmanaged_messagemanaged_transport(
209 lambda mman: MessageInterfaceSimple6.create_client_transport_endpoint(
210 mman, log=self.log, loop=loop
211 )
212 )
213 if selected_transports.tinydtls:
214 from .transports.tinydtls import MessageInterfaceTinyDTLS
216 await self._append_tokenmanaged_messagemanaged_transport(
217 lambda mman: MessageInterfaceTinyDTLS.create_client_transport_endpoint(
218 mman, log=self.log, loop=loop
219 )
220 )
221 if selected_transports.tcpclient:
222 from .transports.tcp import TCPClient
224 await self._append_tokenmanaged_transport(
225 lambda tman: TCPClient.create_client_transport(tman, self.log, loop)
226 )
227 if selected_transports.tlsclient:
228 from .transports.tls import TLSClient
230 await self._append_tokenmanaged_transport(
231 lambda tman: TLSClient.create_client_transport(
232 tman, self.log, loop, self.client_credentials
233 )
234 )
235 if selected_transports.ws:
236 from .transports.ws import WSPool
238 await self._append_tokenmanaged_transport(
239 lambda tman: WSPool.create_transport(
240 tman, self.log, loop, client_credentials=self.client_credentials
241 )
242 )
244 return self
246 @classmethod
247 async def create_server_context(
248 cls,
249 site,
250 bind=None,
251 *,
252 loggername="coap-server",
253 loop=None,
254 _ssl_context=None,
255 multicast=[],
256 server_credentials=None,
257 transports: TransportParameters | None | dict | list[str] = None,
258 ):
259 """Create a context, bound to all addresses on the CoAP port (unless
260 otherwise specified in the ``bind`` argument).
262 This is the easiest way to get a context suitable both for sending
263 client and accepting server requests.
265 The ``bind`` argument, if given, needs to be a 2-tuple of IP address
266 string and port number, where the port number can be None to use the default port.
268 If ``multicast`` is given, it needs to be a list of (multicast address,
269 interface name) tuples, which will all be joined. (The IPv4 style of
270 selecting the interface by a local address is not supported; users may
271 want to use the netifaces package to arrive at an interface name for an
272 address).
274 As a shortcut, the list may also contain interface names alone. Those
275 will be joined for the 'all CoAP nodes' groups of IPv4 and IPv6 (with
276 scopes 2 and 5) as well as the respective 'all nodes' groups in IPv6.
278 Under some circumstances you may already need a context to pass into
279 the site for creation; this is typically the case for servers that
280 trigger requests on their own. For those cases, it is usually easiest
281 to pass None in as a site, and set the fully constructed site later by
282 assigning to the ``serversite`` attribute.
284 :meta private:
285 (not actually private, just hiding from automodule due to being
286 grouped with the important functions)
287 """
289 if loop is None:
290 loop = asyncio.get_event_loop()
292 self = cls(
293 loop=loop,
294 serversite=site,
295 loggername=loggername,
296 server_credentials=server_credentials,
297 )
299 multicast_done = not multicast
301 selected_transports = transports or list(
302 defaults.get_default_servertransports(loop=loop)
303 )
304 selected_transports = TransportParameters._compat_create(selected_transports)
306 if selected_transports.oscore:
307 from .transports.oscore import TransportOSCORE
309 oscoretransport = TransportOSCORE(self, self)
310 self.request_interfaces.append(oscoretransport)
311 if selected_transports.udp6:
312 from .transports.udp6 import MessageInterfaceUDP6
314 await self._append_tokenmanaged_messagemanaged_transport(
315 lambda mman: MessageInterfaceUDP6.create_server_transport_endpoint(
316 mman, log=self.log, loop=loop, bind=bind, multicast=multicast
317 )
318 )
319 multicast_done = True
320 # FIXME this is duplicated from the client version, as those are client-only anyway
321 if selected_transports.simple6:
322 from .transports.simple6 import MessageInterfaceSimple6
324 await self._append_tokenmanaged_messagemanaged_transport(
325 lambda mman: MessageInterfaceSimple6.create_client_transport_endpoint(
326 mman, log=self.log, loop=loop
327 )
328 )
329 elif selected_transports.tinydtls:
330 from .transports.tinydtls import MessageInterfaceTinyDTLS
332 await self._append_tokenmanaged_messagemanaged_transport(
333 lambda mman: MessageInterfaceTinyDTLS.create_client_transport_endpoint(
334 mman, log=self.log, loop=loop
335 )
336 )
337 # FIXME end duplication
338 if selected_transports.tinydtls_server:
339 from .transports.tinydtls_server import MessageInterfaceTinyDTLSServer
341 await self._append_tokenmanaged_messagemanaged_transport(
342 lambda mman: MessageInterfaceTinyDTLSServer.create_server(
343 bind,
344 mman,
345 log=self.log,
346 loop=loop,
347 server_credentials=self.server_credentials,
348 )
349 )
350 if selected_transports.simplesocketserver:
351 from .transports.simplesocketserver import MessageInterfaceSimpleServer
353 await self._append_tokenmanaged_messagemanaged_transport(
354 lambda mman: MessageInterfaceSimpleServer.create_server(
355 bind, mman, log=self.log, loop=loop
356 )
357 )
358 if selected_transports.tcpserver:
359 from .transports.tcp import TCPServer
361 await self._append_tokenmanaged_transport(
362 lambda tman: TCPServer.create_server(bind, tman, self.log, loop)
363 )
364 if selected_transports.tcpclient:
365 from .transports.tcp import TCPClient
367 await self._append_tokenmanaged_transport(
368 lambda tman: TCPClient.create_client_transport(tman, self.log, loop)
369 )
370 if selected_transports.tlsserver:
371 if _ssl_context is not None:
372 from .transports.tls import TLSServer
374 await self._append_tokenmanaged_transport(
375 lambda tman: TLSServer.create_server(
376 bind, tman, self.log, loop, _ssl_context
377 )
378 )
379 if selected_transports.tlsclient:
380 from .transports.tls import TLSClient
382 await self._append_tokenmanaged_transport(
383 lambda tman: TLSClient.create_client_transport(
384 tman, self.log, loop, self.client_credentials
385 )
386 )
387 if selected_transports.ws:
388 from .transports.ws import WSPool
390 await self._append_tokenmanaged_transport(
391 # None, None: Unlike the other transports this has a server/client generic creator, and only binds if there is some bind
392 lambda tman: WSPool.create_transport(
393 tman,
394 self.log,
395 loop,
396 client_credentials=self.client_credentials,
397 server_bind=bind or (None, None),
398 server_context=_ssl_context,
399 )
400 )
402 if not multicast_done:
403 self.log.warning(
404 "Multicast was requested, but no multicast capable transport was selected."
405 )
407 # This is used in tests to wait for externally launched servers to be ready
408 self.log.debug("Server ready to receive requests")
410 return self
412 async def shutdown(self):
413 """Take down any listening sockets and stop all related timers.
415 After this coroutine terminates, and once all external references to
416 the object are dropped, it should be garbage-collectable.
418 This method takes up to
419 :const:`aiocoap.numbers.constants.SHUTDOWN_TIMEOUT` seconds, allowing
420 transports to perform any cleanup implemented in them (such as orderly
421 connection shutdown and cancelling observations, where the latter is
422 currently not implemented).
424 :meta private:
425 (not actually private, just hiding from automodule due to being
426 grouped with the important functions)
427 """
429 self.log.debug("Shutting down context")
431 done, pending = await asyncio.wait(
432 [
433 asyncio.create_task(
434 ri.shutdown(),
435 name="Shutdown of %r" % ri,
436 )
437 for ri in self.request_interfaces
438 ],
439 timeout=SHUTDOWN_TIMEOUT,
440 )
441 for item in done:
442 await item
443 if pending:
444 # Apart from being useful to see, this also ensures that developers
445 # see the error in the logs during test suite runs -- and the error
446 # should be easier to follow than the "we didn't garbage collect
447 # everything" errors we see anyway (or otherwise, if the error is
448 # escalated into a test failure)
449 self.log.error(
450 "Shutdown timeout exceeded, returning anyway. Interfaces still busy: %s",
451 pending,
452 )
454 # FIXME: determine how official this should be, or which part of it is
455 # public -- now that BlockwiseRequest uses it. (And formalize what can
456 # change about messages and what can't after the remote has been thusly
457 # populated).
458 async def find_remote_and_interface(self, message):
459 if message.remote is None:
460 raise error.MissingRemoteError()
461 for ri in self.request_interfaces:
462 if await ri.fill_or_recognize_remote(message):
463 return ri
464 raise error.NoRequestInterface()
466 def request(self, request_message, handle_blockwise=True):
467 if handle_blockwise:
468 return BlockwiseRequest(self, request_message)
470 pipe = Pipe(request_message, self.log)
471 # Request sets up callbacks at creation
472 result = Request(pipe, self.loop, self.log)
474 async def send():
475 try:
476 request_interface = await self.find_remote_and_interface(
477 request_message
478 )
479 request_interface.request(pipe)
480 except Exception as e:
481 pipe.add_exception(e)
482 return
484 self.loop.create_task(
485 send(),
486 name="Request processing of %r" % result,
487 )
488 return result
490 # the following are under consideration for moving into Site or something
491 # mixed into it
493 def render_to_pipe(self, pipe):
494 """Fill a pipe by running the site's render_to_pipe interface and
495 handling errors."""
497 pr_that_can_receive_errors = error_to_message(pipe, self.log)
499 run_driving_pipe(
500 pr_that_can_receive_errors,
501 self._render_to_pipe(pipe),
502 name="Rendering for %r" % pipe.request,
503 )
505 async def _render_to_pipe(self, pipe):
506 if self.serversite is None:
507 pipe.add_response(
508 Message(code=NOT_FOUND, payload=b"not a server"), is_last=True
509 )
510 return
512 return await self.serversite.render_to_pipe(pipe)
515class BaseRequest:
516 """Common mechanisms of :class:`Request` and :class:`MulticastRequest`"""
519class BaseUnicastRequest(BaseRequest):
520 """A utility class that offers the :attr:`response_raising` and
521 :attr:`response_nonraising` alternatives to waiting for the
522 :attr:`response` future whose error states can be presented either as an
523 unsuccessful response (eg. 4.04) or an exception.
525 It also provides some internal tools for handling anything that has a
526 :attr:`response` future and an :attr:`observation`"""
528 @property
529 async def response_raising(self):
530 """An awaitable that returns if a response comes in and is successful,
531 otherwise raises generic network exception or a
532 :class:`.error.ResponseWrappingError` for unsuccessful responses.
534 Experimental Interface."""
536 response = await self.response
537 if not response.code.is_successful():
538 raise error.ResponseWrappingError(response)
540 return response
542 @property
543 async def response_nonraising(self):
544 """An awaitable that rather returns a 500ish fabricated message (as a
545 proxy would return) instead of raising an exception.
547 Experimental Interface."""
549 # FIXME: Can we smuggle error_to_message into the underlying pipe?
550 # That should make observe notifications into messages rather
551 # than exceptions as well, plus it has fallbacks for `e.to_message()`
552 # raising.
554 try:
555 return await self.response
556 except error.RenderableError as e:
557 return e.to_message()
558 except Exception:
559 return Message(code=INTERNAL_SERVER_ERROR)
562class Request(interfaces.Request, BaseUnicastRequest):
563 def __init__(self, pipe, loop, log):
564 self._pipe = pipe
566 self.response = loop.create_future()
568 if pipe.request.opt.observe == 0:
569 self.observation = ClientObservation()
570 else:
571 self.observation = None
573 self._runner = self._run()
574 self._runner.send(None)
576 def process(event):
577 try:
578 # would be great to have self or the runner as weak ref, but
579 # see ClientObservation.register_callback comments -- while
580 # that is around, we can't weakref here.
581 self._runner.send(event)
582 return True
583 except StopIteration:
584 return False
586 self._stop_interest = self._pipe.on_event(process)
588 self.log = log
590 self.response.add_done_callback(self._response_cancellation_handler)
592 def _response_cancellation_handler(self, response):
593 # Propagate cancellation to the runner (if interest in the first
594 # response is lost, there won't be observation items to pull out), but
595 # not general completion (because if it's completed and not cancelled,
596 # eg. when an observation is active)
597 if self.response.cancelled() and self._runner is not None:
598 # Dropping the only reference makes it stop with GeneratorExit,
599 # similar to a cancelled task
600 self._runner = None
601 self._stop_interest()
602 # Otherwise, there will be a runner still around, and it's its task to
603 # call _stop_interest.
605 @staticmethod
606 def _add_response_properties(response, request):
607 response.request = request
609 def _run(self):
610 # FIXME: This is in iterator form because it used to be a task that
611 # awaited futures, and that code could be easily converted to an
612 # iterator. I'm not sure that's a bad state here, but at least it
613 # should be a more conscious decision to make this an iterator rather
614 # than just having it happen to be one.
615 #
616 # FIXME: check that responses come from the same remmote as long as we're assuming unicast
618 first_event = yield None
620 if first_event.message is not None:
621 self._add_response_properties(first_event.message, self._pipe.request)
622 self.response.set_result(first_event.message)
623 else:
624 self.response.set_exception(first_event.exception)
625 if not isinstance(first_event.exception, error.Error):
626 self.log.warning(
627 "An exception that is not an aiocoap Error was raised "
628 "from a transport; please report this as a bug in "
629 "aiocoap: %r",
630 first_event.exception,
631 )
633 if self.observation is None:
634 if not first_event.is_last:
635 self.log.error(
636 "Pipe indicated more possible responses"
637 " while the Request handler would not know what to"
638 " do with them, stopping any further request."
639 )
640 self._stop_interest()
641 return
643 if first_event.is_last:
644 self.observation.error(error.NotObservable())
645 return
647 if first_event.message.opt.observe is None:
648 self.log.error(
649 "Pipe indicated more possible responses"
650 " while the Request handler would not know what to"
651 " do with them, stopping any further request."
652 )
653 self._stop_interest()
654 return
656 # variable names from RFC7641 Section 3.4
657 v1 = first_event.message.opt.observe
658 t1 = time.time()
660 while True:
661 # We don't really support cancellation of observations yet (see
662 # https://github.com/chrysn/aiocoap/issues/92), but at least
663 # stopping the interest is a way to free the local resources after
664 # the first observation update, and to make the MID handler RST the
665 # observation on the next.
666 # FIXME: there *is* now a .on_cancel callback, we should at least
667 # hook into that, and possibly even send a proper cancellation
668 # then.
669 next_event = yield True
670 if self.observation.cancelled:
671 self._stop_interest()
672 return
674 if next_event.exception is not None:
675 self.observation.error(next_event.exception)
676 if not next_event.is_last:
677 self._stop_interest()
678 if not isinstance(next_event.exception, error.Error):
679 self.log.warning(
680 "An exception that is not an aiocoap Error was "
681 "raised from a transport during an observation; "
682 "please report this as a bug in aiocoap: %r",
683 next_event.exception,
684 )
685 return
687 self._add_response_properties(next_event.message, self._pipe.request)
689 if next_event.message.opt.observe is not None:
690 # check for reordering
691 v2 = next_event.message.opt.observe
692 t2 = time.time()
694 is_recent = (
695 (v1 < v2 and v2 - v1 < 2**23)
696 or (v1 > v2 and v1 - v2 > 2**23)
697 or (
698 t2
699 > t1
700 + self._pipe.request.transport_tuning.OBSERVATION_RESET_TIME
701 )
702 )
703 if is_recent:
704 t1 = t2
705 v1 = v2
706 else:
707 # the terminal message is always the last
708 is_recent = True
710 if is_recent:
711 self.observation.callback(next_event.message)
713 if next_event.is_last:
714 self.observation.error(error.ObservationCancelled())
715 return
717 if next_event.message.opt.observe is None:
718 self.observation.error(error.ObservationCancelled())
719 self.log.error(
720 "Pipe indicated more possible responses"
721 " while the Request handler would not know what to"
722 " do with them, stopping any further request."
723 )
724 self._stop_interest()
725 return
728class BlockwiseRequest(BaseUnicastRequest, interfaces.Request):
729 def __init__(self, protocol, app_request):
730 self.protocol = protocol
731 self.log = self.protocol.log.getChild("blockwise-requester")
733 self.response = protocol.loop.create_future()
735 if app_request.opt.observe is not None:
736 self.observation = ClientObservation()
737 else:
738 self.observation = None
740 self._runner = protocol.loop.create_task(
741 self._run_outer(
742 app_request,
743 self.response,
744 weakref.ref(self.observation)
745 if self.observation is not None
746 else lambda: None,
747 self.protocol,
748 self.log,
749 ),
750 name="Blockwise runner for %r" % app_request,
751 )
752 self.response.add_done_callback(self._response_cancellation_handler)
754 def _response_cancellation_handler(self, response_future):
755 # see Request._response_cancellation_handler
756 if self.response.cancelled():
757 self._runner.cancel()
759 @classmethod
760 async def _run_outer(cls, app_request, response, weak_observation, protocol, log):
761 try:
762 await cls._run(app_request, response, weak_observation, protocol, log)
763 except asyncio.CancelledError:
764 pass # results already set
765 except Exception as e:
766 logged = False
767 if not response.done():
768 logged = True
769 response.set_exception(e)
770 obs = weak_observation()
771 if app_request.opt.observe is not None and obs is not None:
772 logged = True
773 obs.error(e)
774 if not logged:
775 # should be unreachable
776 log.error(
777 "Exception in BlockwiseRequest runner neither went to response nor to observation: %s",
778 e,
779 exc_info=e,
780 )
782 # This is a class method because that allows self and self.observation to
783 # be freed even when this task is running, and the task to stop itself --
784 # otherwise we couldn't know when users just "forget" about a request
785 # object after using its response (esp. in observe cases) and leave this
786 # task running.
787 @classmethod
788 async def _run(cls, app_request, response, weak_observation, protocol, log):
789 # we need to populate the remote right away, because the choice of
790 # blocks depends on it.
791 await protocol.find_remote_and_interface(app_request)
793 size_exp = app_request.remote.maximum_block_size_exp
795 if app_request.opt.block1 is not None:
796 warnings.warn(
797 "Setting a block1 option in a managed block-wise transfer is deprecated. Instead, set request.remote.maximum_block_size_exp to the desired value",
798 DeprecationWarning,
799 stacklevel=2,
800 )
801 assert app_request.opt.block1.block_number == 0, (
802 "Unexpected block number in app_request"
803 )
804 assert not app_request.opt.block1.more, (
805 "Unexpected more-flag in app_request"
806 )
807 # this is where the library user can traditionally pass in size
808 # exponent hints into the library.
809 size_exp = app_request.opt.block1.size_exponent
811 # Offset in the message in blocks of size_exp. Whoever changes size_exp
812 # is responsible for updating this number.
813 block_cursor = 0
815 while True:
816 # ... send a chunk
818 if size_exp >= 6:
819 # FIXME from maximum_payload_size
820 fragmentation_threshold = app_request.remote.maximum_payload_size
821 else:
822 fragmentation_threshold = 2 ** (size_exp + 4)
824 if (
825 app_request.opt.block1 is not None
826 or len(app_request.payload) > fragmentation_threshold
827 ):
828 current_block1 = app_request._extract_block(
829 block_cursor, size_exp, app_request.remote.maximum_payload_size
830 )
831 if block_cursor == 0:
832 current_block1.opt.size1 = len(app_request.payload)
833 else:
834 current_block1 = app_request
836 blockrequest = protocol.request(current_block1, handle_blockwise=False)
837 blockresponse = await blockrequest.response
839 # store for future blocks to ensure that the next blocks will be
840 # sent from the same source address (in the UDP case; for many
841 # other transports it won't matter). carrying along locally set block size limitation
842 if (
843 app_request.remote.maximum_block_size_exp
844 < blockresponse.remote.maximum_block_size_exp
845 ):
846 blockresponse.remote.maximum_block_size_exp = (
847 app_request.remote.maximum_block_size_exp
848 )
849 app_request.remote = blockresponse.remote
851 if blockresponse.opt.block1 is None:
852 if blockresponse.code.is_successful() and current_block1.opt.block1:
853 log.warning(
854 "Block1 option completely ignored by server, assuming it knows what it is doing."
855 )
856 # FIXME: handle 4.13 and retry with the indicated size option
857 break
859 block1 = blockresponse.opt.block1
860 log.debug(
861 "Response with Block1 option received, number = %d, more = %d, size_exp = %d.",
862 block1.block_number,
863 block1.more,
864 block1.size_exponent,
865 )
867 if block1.block_number != current_block1.opt.block1.block_number:
868 raise error.UnexpectedBlock1Option("Block number mismatch")
870 if size_exp == 7:
871 block_cursor += len(current_block1.payload) // 1024
872 else:
873 block_cursor += 1
875 while block1.size_exponent < size_exp:
876 block_cursor *= 2
877 size_exp -= 1
879 if not current_block1.opt.block1.more:
880 if block1.more or blockresponse.code == CONTINUE:
881 # treating this as a protocol error -- letting it slip
882 # through would misrepresent the whole operation as an
883 # over-all 2.xx (successful) one.
884 raise error.UnexpectedBlock1Option(
885 "Server asked for more data at end of body"
886 )
887 break
889 # checks before preparing the next round:
891 if blockresponse.opt.observe:
892 # we're not *really* interested in that block, we just sent an
893 # observe option to indicate that we'll want to observe the
894 # resulting representation as a whole
895 log.warning(
896 "Server answered Observe in early Block1 phase, cancelling the erroneous observation."
897 )
898 blockrequest.observe.cancel()
900 if block1.more:
901 # FIXME i think my own server is dowing this wrong
902 # if response.code != CONTINUE:
903 # raise error.UnexpectedBlock1Option("more-flag set but no Continue")
904 pass
905 else:
906 if not blockresponse.code.is_successful():
907 break
908 else:
909 # ignoring (discarding) the successful intermediate result, waiting for a final one
910 continue
912 lower_observation = None
913 if app_request.opt.observe is not None:
914 if blockresponse.opt.observe is not None:
915 lower_observation = blockrequest.observation
916 else:
917 obs = weak_observation()
918 if obs:
919 obs.error(error.NotObservable())
920 del obs
922 assert blockresponse is not None, "Block1 loop broke without setting a response"
923 blockresponse.opt.block1 = None
925 # FIXME check with RFC7959: it just says "send requests similar to the
926 # requests in the Block1 phase", what does that mean? using the last
927 # block1 as a reference for now, especially because in the
928 # only-one-request-block case, that's the original request we must send
929 # again and again anyway
930 assembled_response = await cls._complete_by_requesting_block2(
931 protocol, current_block1, blockresponse, log
932 )
934 response.set_result(assembled_response)
935 # finally set the result
937 if lower_observation is not None:
938 # FIXME this can all be simplified a lot since it's no more
939 # expected that observations shut themselves down when GC'd.
940 obs = weak_observation()
941 del weak_observation
942 if obs is None:
943 lower_observation.cancel()
944 return
945 future_weak_observation = protocol.loop.create_future() # packing this up because its destroy callback needs to reference the subtask
946 subtask = asyncio.create_task(
947 cls._run_observation(
948 app_request,
949 lower_observation,
950 future_weak_observation,
951 protocol,
952 log,
953 ),
954 name="Blockwise observation for %r" % app_request,
955 )
956 future_weak_observation.set_result(
957 weakref.ref(obs, lambda obs: subtask.cancel())
958 )
959 obs.on_cancel(subtask.cancel)
960 del obs
961 await subtask
963 @classmethod
964 async def _run_observation(
965 cls, original_request, lower_observation, future_weak_observation, protocol, log
966 ):
967 weak_observation = await future_weak_observation
968 # we can use weak_observation() here at any time, because whenever that
969 # becomes None, this task gets cancelled
970 try:
971 async for block1_notification in lower_observation:
972 log.debug("Notification received")
973 full_notification = await cls._complete_by_requesting_block2(
974 protocol, original_request, block1_notification, log
975 )
976 log.debug("Reporting completed notification")
977 weak_observation().callback(full_notification)
978 # FIXME verify that this loop actually ends iff the observation
979 # was cancelled -- otherwise find out the cause(s) or make it not
980 # cancel under indistinguishable circumstances
981 weak_observation().error(error.ObservationCancelled())
982 except asyncio.CancelledError:
983 return
984 except Exception as e:
985 weak_observation().error(e)
986 finally:
987 # We generally avoid idempotent cancellation, but we may have
988 # reached this point either due to an earlier cancellation or
989 # without one
990 if not lower_observation.cancelled:
991 lower_observation.cancel()
993 @classmethod
994 async def _complete_by_requesting_block2(
995 cls, protocol, request_to_repeat, initial_response, log
996 ):
997 # FIXME this can probably be deduplicated against BlockwiseRequest
999 if (
1000 initial_response.opt.block2 is None
1001 or initial_response.opt.block2.more is False
1002 ):
1003 initial_response.opt.block2 = None
1004 return initial_response
1006 if initial_response.opt.block2.block_number != 0:
1007 log.error("Error assembling blockwise response (expected first block)")
1008 raise error.UnexpectedBlock2()
1010 assembled_response = initial_response
1011 last_response = initial_response
1012 while True:
1013 current_block2 = request_to_repeat._generate_next_block2_request(
1014 assembled_response
1015 )
1017 current_block2 = current_block2.copy(remote=initial_response.remote)
1019 blockrequest = protocol.request(current_block2, handle_blockwise=False)
1020 last_response = await blockrequest.response
1022 if last_response.opt.block2 is None:
1023 log.warning(
1024 "Server sent non-blockwise response after having started a blockwise transfer. Blockwise transfer cancelled, accepting single response."
1025 )
1026 return last_response
1028 block2 = last_response.opt.block2
1029 log.debug(
1030 "Response with Block2 option received, number = %d, more = %d, size_exp = %d.",
1031 block2.block_number,
1032 block2.more,
1033 block2.size_exponent,
1034 )
1035 try:
1036 assembled_response._append_response_block(last_response)
1037 except error.Error as e:
1038 log.error("Error assembling blockwise response, passing on error %r", e)
1039 raise
1041 if block2.more is False:
1042 return assembled_response
1045class ClientObservation:
1046 """An interface to observe notification updates arriving on a request.
1048 This class does not actually provide any of the observe functionality, it
1049 is purely a container for dispatching the messages via asynchronous
1050 iteration. It gets driven (ie. populated with responses or errors including
1051 observation termination) by a Request object.
1052 """
1054 def __init__(self):
1055 self.callbacks = []
1056 self.errbacks = []
1058 self.cancelled = False
1059 self._on_cancel = []
1061 self._latest_response = None
1062 # the analogous error is stored in _cancellation_reason when cancelled.
1064 def __aiter__(self):
1065 """`async for` interface to observations.
1067 This is the preferred interface to obtaining observations."""
1068 it = self._Iterator()
1069 self.register_callback(it.push, _suppress_deprecation=True)
1070 self.register_errback(it.push_err, _suppress_deprecation=True)
1071 return it
1073 class _Iterator:
1074 def __init__(self):
1075 self._future = asyncio.get_event_loop().create_future()
1077 def push(self, item):
1078 if self._future.done():
1079 # we don't care whether we overwrite anything, this is a lossy queue as observe is lossy
1080 self._future = asyncio.get_event_loop().create_future()
1081 self._future.set_result(item)
1083 def push_err(self, e):
1084 if self._future.done():
1085 self._future = asyncio.get_event_loop().create_future()
1086 self._future.set_exception(e)
1088 async def __anext__(self):
1089 f = self._future
1090 try:
1091 result = await self._future
1092 # FIXME see `await servobs._trigger` comment: might waiting for
1093 # the original future not yield the first future's result when
1094 # a quick second future comes in in a push?
1095 if f is self._future:
1096 self._future = asyncio.get_event_loop().create_future()
1097 return result
1098 except (error.NotObservable, error.ObservationCancelled):
1099 # only exit cleanly when the server -- right away or later --
1100 # states that the resource is not observable any more
1101 # FIXME: check whether an unsuccessful message is still passed
1102 # as an observation result (or whether it should be)
1103 raise StopAsyncIteration
1105 def __del__(self):
1106 if self._future.done():
1107 try:
1108 # Fetch the result so any errors show up at least in the
1109 # finalizer output
1110 self._future.result()
1111 except (error.ObservationCancelled, error.NotObservable):
1112 # This is the case at the end of an observation cancelled
1113 # by the server.
1114 pass
1115 except error.NetworkError:
1116 # This will already have shown up in the main result too.
1117 pass
1118 except (error.LibraryShutdown, asyncio.CancelledError):
1119 pass
1120 # Anything else flying out of this is unexpected and probably a
1121 # library error
1123 # When this function is removed, we can finally do cleanup better. Right
1124 # now, someone could register a callback that doesn't hold any references,
1125 # so we can't just stop the request when nobody holds a reference to this
1126 # any more. Once we're all in pull mode, we can make the `process` function
1127 # that sends data in here use a weak reference (because any possible
1128 # recipient would need to hold a reference to self or the iterator, and
1129 # thus _run).
1130 def register_callback(self, callback, _suppress_deprecation=False):
1131 """Call the callback whenever a response to the message comes in, and
1132 pass the response to it.
1134 The use of this function is deprecated: Use the asynchronous iteration
1135 interface instead."""
1136 if not _suppress_deprecation:
1137 warnings.warn(
1138 "register_callback on observe results is deprecated: Use `async for notify in request.observation` instead.",
1139 DeprecationWarning,
1140 stacklevel=2,
1141 )
1142 if self.cancelled:
1143 return
1145 self.callbacks.append(callback)
1146 if self._latest_response is not None:
1147 callback(self._latest_response)
1149 def register_errback(self, callback, _suppress_deprecation=False):
1150 """Call the callback whenever something goes wrong with the
1151 observation, and pass an exception to the callback. After such a
1152 callback is called, no more callbacks will be issued.
1154 The use of this function is deprecated: Use the asynchronous iteration
1155 interface instead."""
1156 if not _suppress_deprecation:
1157 warnings.warn(
1158 "register_errback on observe results is deprecated: Use `async for notify in request.observation` instead.",
1159 DeprecationWarning,
1160 stacklevel=2,
1161 )
1162 if self.cancelled:
1163 callback(self._cancellation_reason)
1164 return
1165 self.errbacks.append(callback)
1167 def callback(self, response):
1168 """Notify all listeners of an incoming response"""
1170 self._latest_response = response
1172 for c in self.callbacks:
1173 c(response)
1175 def error(self, exception):
1176 """Notify registered listeners that the observation went wrong. This
1177 can only be called once."""
1179 if self.errbacks is None:
1180 raise RuntimeError(
1181 "Error raised in an already cancelled ClientObservation"
1182 ) from exception
1183 for c in self.errbacks:
1184 c(exception)
1186 self.cancel()
1187 self._cancellation_reason = exception
1189 def cancel(self):
1190 # FIXME determine whether this is called by anything other than error,
1191 # and make it private so there is always a _cancellation_reason
1192 """Cease to generate observation or error events. This will not
1193 generate an error by itself.
1195 This function is only needed while register_callback and
1196 register_errback are around; once their deprecations are acted on,
1197 dropping the asynchronous iterator will automatically cancel the
1198 observation.
1199 """
1201 assert not self.cancelled, "ClientObservation cancelled twice"
1203 # make sure things go wrong when someone tries to continue this
1204 self.errbacks = None
1205 self.callbacks = None
1207 self.cancelled = True
1208 while self._on_cancel:
1209 self._on_cancel.pop()()
1211 self._cancellation_reason = None
1213 def on_cancel(self, callback):
1214 if self.cancelled:
1215 callback()
1216 self._on_cancel.append(callback)
1218 def __repr__(self):
1219 return "<%s %s at %#x>" % (
1220 type(self).__name__,
1221 "(cancelled)"
1222 if self.cancelled
1223 else "(%s call-, %s errback(s))"
1224 % (len(self.callbacks), len(self.errbacks)),
1225 id(self),
1226 )
1229class ServerObservation:
1230 def __init__(self):
1231 self._accepted = False
1232 self._trigger = asyncio.get_event_loop().create_future()
1233 # A deregistration is "early" if it happens before the response message
1234 # is actually sent; calling deregister() in that time (typically during
1235 # `render()`) will not send an unsuccessful response message but just
1236 # sent this flag which is set to None as soon as it is too late for an
1237 # early deregistration.
1238 # This mechanism is temporary until more of aiocoap behaves like
1239 # Pipe which does not suffer from this limitation.
1240 self._early_deregister = False
1241 self._late_deregister = False
1243 def accept(self, cancellation_callback):
1244 self._accepted = True
1245 self._cancellation_callback = cancellation_callback
1247 def deregister(self, reason=None):
1248 if self._early_deregister is False:
1249 self._early_deregister = True
1250 return
1252 warnings.warn(
1253 "Late use of ServerObservation.deregister() is"
1254 " deprecated, use .trigger with an unsuccessful value"
1255 " instead",
1256 DeprecationWarning,
1257 )
1258 self.trigger(
1259 Message(code=INTERNAL_SERVER_ERROR, payload=b"Resource became unobservable")
1260 )
1262 def trigger(self, response=None, *, is_last=False):
1263 """Send an updated response; if None is given, the observed resource's
1264 rendering will be invoked to produce one.
1266 `is_last` can be set to True to indicate that no more responses will be
1267 sent. Note that an unsuccessful response will be the last no matter
1268 what is_last says, as such a message always terminates a CoAP
1269 observation."""
1270 if is_last:
1271 self._late_deregister = True
1272 if self._trigger.done():
1273 # we don't care whether we overwrite anything, this is a lossy queue as observe is lossy
1274 self._trigger = asyncio.get_event_loop().create_future()
1275 self._trigger.set_result(response)