Coverage for src/aiocoap/protocol.py: 0%
495 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-12 11:18 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-12 11:18 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5"""This module contains the classes that are responsible for keeping track of
6messages:
8* :class:`Context` roughly represents the CoAP endpoint (basically a UDP
9 socket) -- something that can send requests and possibly can answer
10 incoming requests.
12 Incoming requests are processed in tasks created by the context.
14* a :class:`Request` gets generated whenever a request gets sent to keep
15 track of the response
17Logging
18~~~~~~~
20Several constructors of the Context accept a logger name; these names go into
21the construction of a Python logger.
23Log events will be emitted to these on different levels, with "warning" and
24above being a practical default for things that may warrant reviewing by
25an operator:
27* DEBUG is used for things that occur even under perfect conditions.
28* INFO is for things that are well expected, but might be interesting during
29 testing a network of nodes and not just when debugging the library. (This
30 includes timeouts, retransmissions, and pings.)
31* WARNING is for everything that indicates a malbehaved peer. These don't
32 *necessarily* indicate a client bug, though: Things like requesting a
33 nonexistent block can just as well happen when a resource's content has
34 changed between blocks. The library will not go out of its way to determine
35 whether there is a plausible explanation for the odd behavior, and will
36 report something as a warning in case of doubt.
37* ERROR is used when something clearly went wrong. This includes irregular
38 connection terminations and resource handler errors (which are demoted to
39 error responses), and can often contain a backtrace.
41Logs will generally reveal messages exchanged between this and other systems,
42and attackers can observe their encrypted counterparts. Private or shared keys
43are only logged through an internal `log_secret` function, which usually
44replaces them with a redacted value. Setting the ``AIOCOAP_REVEAL_KEYS``
45environment variable to the value ``show secrets in logs`` bypasses that
46mechanism. As an additional precaution, this is only accepted if the effective
47user has write access to the aiocoap source code.
48"""
50import asyncio
51import weakref
52import time
53from typing import Optional, List
55from . import defaults
56from .credentials import CredentialsMap
57from .message import Message
58from .messagemanager import MessageManager
59from .tokenmanager import TokenManager
60from .pipe import Pipe, run_driving_pipe, error_to_message
61from . import interfaces
62from . import error
63from .numbers import INTERNAL_SERVER_ERROR, NOT_FOUND, CONTINUE, SHUTDOWN_TIMEOUT
65import warnings
66import logging
69class Context(interfaces.RequestProvider):
70 """Applications' entry point to the network
72 A :class:`.Context` coordinates one or more network :mod:`.transports`
73 implementations and dispatches data between them and the application.
75 The application can start requests using the message dispatch methods, and
76 set a :class:`resources.Site` that will answer requests directed to the
77 application as a server.
79 On the library-internals side, it is the prime implementation of the
80 :class:`interfaces.RequestProvider` interface, creates :class:`Request` and
81 :class:`Response` classes on demand, and decides which transport
82 implementations to start and which are to handle which messages.
84 **Context creation and destruction**
86 The following functions are provided for creating and stopping a context:
88 .. note::
90 A typical application should only ever create one context, even (or
91 especially when) it acts both as a server and as a client (in which
92 case a server context should be created).
94 A context that is not used any more must be shut down using
95 :meth:`.shutdown()`, but typical applications will not need to because
96 they use the context for the full process lifetime.
98 .. automethod:: create_client_context
99 .. automethod:: create_server_context
101 .. automethod:: shutdown
103 **Dispatching messages**
105 CoAP requests can be sent using the following functions:
107 .. automethod:: request
109 If more control is needed, you can create a :class:`Request` yourself and
110 pass the context to it.
113 **Other methods and properties**
115 The remaining methods and properties are to be considered unstable even
116 when the project reaches a stable version number; please file a feature
117 request for stabilization if you want to reliably access any of them.
118 """
def __init__(
    self,
    loop=None,
    serversite=None,
    loggername="coap",
    client_credentials=None,
    server_credentials=None,
):
    """Set up a context that does not have any transports yet.

    :param loop: event loop to use; ``asyncio.get_event_loop()`` is
        consulted when None.
    :param serversite: stored as ``self.serversite``; may be None.
    :param loggername: name passed to ``logging.getLogger`` for
        ``self.log``.
    :param client_credentials: stored as ``self.client_credentials``; a
        fresh ``CredentialsMap`` is created when None.
    :param server_credentials: stored as ``self.server_credentials``; a
        fresh ``CredentialsMap`` is created when None.
    """
    self.log = logging.getLogger(loggername)

    self.loop = asyncio.get_event_loop() if loop is None else loop

    self.serversite = serversite

    # Filled by the _append_* helpers; starts out empty.
    self.request_interfaces = []

    # Compare against None instead of relying on truthiness: a credentials
    # map that is still empty when handed in would otherwise be replaced by
    # a new object, and entries the caller adds to their map later would
    # never reach this context. (Assumes CredentialsMap follows the usual
    # container truthiness rules -- TODO confirm.)
    self.client_credentials = (
        CredentialsMap() if client_credentials is None else client_credentials
    )
    self.server_credentials = (
        CredentialsMap() if server_credentials is None else server_credentials
    )
139 #
140 # convenience methods for class instantiation
141 #
async def _append_tokenmanaged_messagemanaged_transport(
    self, message_interface_constructor
):
    """Create a token manager / message manager pair on top of a transport.

    ``message_interface_constructor`` is awaited with the new message
    manager as its only argument; its result becomes the message manager's
    ``message_interface``. The token manager is what gets appended to
    ``self.request_interfaces``.
    """
    token_manager = TokenManager(self)
    message_manager = MessageManager(token_manager)

    # Link the layers only once the transport has been created successfully.
    message_manager.message_interface = await message_interface_constructor(
        message_manager
    )
    token_manager.token_interface = message_manager

    self.request_interfaces.append(token_manager)
async def _append_tokenmanaged_transport(self, token_interface_constructor):
    """Create a token manager directly on top of a token-level transport.

    ``token_interface_constructor`` is awaited with the new token manager;
    its result becomes the manager's ``token_interface``, and the manager is
    appended to ``self.request_interfaces``.
    """
    token_manager = TokenManager(self)
    token_manager.token_interface = await token_interface_constructor(token_manager)
    self.request_interfaces.append(token_manager)
@classmethod
async def create_client_context(
    cls, *, loggername="coap", loop=None, transports: Optional[List[str]] = None
):
    """Create a context bound to all addresses on a random listening port.

    This is the easiest way to get a context suitable for sending client
    requests.

    :param loggername: logger name handed to the context constructor.
    :param loop: event loop to use; ``asyncio.get_event_loop()`` is
        consulted when None.
    :param transports: names of the transports to set up, in the given
        order. When None or empty, the result of
        ``defaults.get_default_clienttransports`` is used.
    :raises RuntimeError: if a transport name is not known.

    :meta private:
        (not actually private, just hiding from automodule due to being
        grouped with the important functions)
    """

    if loop is None:
        loop = asyncio.get_event_loop()

    self = cls(loop=loop, serversite=None, loggername=loggername)

    selected_transports = transports or defaults.get_default_clienttransports(
        loop=loop
    )

    # FIXME make defaults overridable (postponed until they become configurable too)
    for transportname in selected_transports:
        # Each transport module is only imported when it is actually selected.
        if transportname == "udp6":
            from .transports.udp6 import MessageInterfaceUDP6

            await self._append_tokenmanaged_messagemanaged_transport(
                lambda mman: MessageInterfaceUDP6.create_client_transport_endpoint(
                    mman, log=self.log, loop=loop
                )
            )
        elif transportname == "simple6":
            from .transports.simple6 import MessageInterfaceSimple6

            await self._append_tokenmanaged_messagemanaged_transport(
                lambda mman: MessageInterfaceSimple6.create_client_transport_endpoint(
                    mman, log=self.log, loop=loop
                )
            )
        elif transportname == "tinydtls":
            from .transports.tinydtls import MessageInterfaceTinyDTLS

            await self._append_tokenmanaged_messagemanaged_transport(
                lambda mman: MessageInterfaceTinyDTLS.create_client_transport_endpoint(
                    mman, log=self.log, loop=loop
                )
            )
        elif transportname == "tcpclient":
            from .transports.tcp import TCPClient

            await self._append_tokenmanaged_transport(
                lambda tman: TCPClient.create_client_transport(tman, self.log, loop)
            )
        elif transportname == "tlsclient":
            from .transports.tls import TLSClient

            await self._append_tokenmanaged_transport(
                lambda tman: TLSClient.create_client_transport(
                    tman, self.log, loop, self.client_credentials
                )
            )
        elif transportname == "ws":
            from .transports.ws import WSPool

            await self._append_tokenmanaged_transport(
                lambda tman: WSPool.create_transport(
                    tman, self.log, loop, client_credentials=self.client_credentials
                )
            )
        elif transportname == "oscore":
            from .transports.oscore import TransportOSCORE

            # OSCORE is registered directly, without a token manager in between.
            oscoretransport = TransportOSCORE(self, self)
            self.request_interfaces.append(oscoretransport)
        else:
            raise RuntimeError(
                "Transport %r not known for client context creation" % transportname
            )

    return self
@classmethod
async def create_server_context(
    cls,
    site,
    bind=None,
    *,
    loggername="coap-server",
    loop=None,
    _ssl_context=None,
    multicast=None,
    server_credentials=None,
    transports: Optional[List[str]] = None,
):
    """Create a context, bound to all addresses on the CoAP port (unless
    otherwise specified in the ``bind`` argument).

    This is the easiest way to get a context suitable both for sending
    client and accepting server requests.

    The ``bind`` argument, if given, needs to be a 2-tuple of IP address
    string and port number, where the port number can be None to use the default port.

    If ``multicast`` is given, it needs to be a list of (multicast address,
    interface name) tuples, which will all be joined. (The IPv4 style of
    selecting the interface by a local address is not supported; users may
    want to use the netifaces package to arrive at an interface name for an
    address).

    As a shortcut, the list may also contain interface names alone. Those
    will be joined for the 'all CoAP nodes' groups of IPv4 and IPv6 (with
    scopes 2 and 5) as well as the respective 'all nodes' groups in IPv6.

    Under some circumstances you may already need a context to pass into
    the site for creation; this is typically the case for servers that
    trigger requests on their own. For those cases, it is usually easiest
    to pass None in as a site, and set the fully constructed site later by
    assigning to the ``serversite`` attribute.

    Further keyword arguments: ``loggername`` and ``server_credentials``
    are handed to the context constructor, ``loop`` defaults to
    ``asyncio.get_event_loop()``, ``_ssl_context`` is passed on to the
    ``tlsserver`` and ``ws`` transports, and ``transports`` selects the
    transports to set up (``defaults.get_default_servertransports`` is
    used when it is None or empty).

    :raises RuntimeError: if a transport name is not known.

    :meta private:
        (not actually private, just hiding from automodule due to being
        grouped with the important functions)
    """

    if loop is None:
        loop = asyncio.get_event_loop()

    # The default is None rather than a shared mutable list; an absent value
    # means "no multicast groups", exactly like the empty list did before.
    if multicast is None:
        multicast = []

    self = cls(
        loop=loop,
        serversite=site,
        loggername=loggername,
        server_credentials=server_credentials,
    )

    # Nothing to join counts as done; otherwise only udp6 marks it as done.
    multicast_done = not multicast

    selected_transports = transports or defaults.get_default_servertransports(
        loop=loop
    )

    for transportname in selected_transports:
        if transportname == "udp6":
            from .transports.udp6 import MessageInterfaceUDP6

            await self._append_tokenmanaged_messagemanaged_transport(
                lambda mman: MessageInterfaceUDP6.create_server_transport_endpoint(
                    mman, log=self.log, loop=loop, bind=bind, multicast=multicast
                )
            )
            multicast_done = True
        # FIXME this is duplicated from the client version, as those are client-only anyway
        elif transportname == "simple6":
            from .transports.simple6 import MessageInterfaceSimple6

            await self._append_tokenmanaged_messagemanaged_transport(
                lambda mman: MessageInterfaceSimple6.create_client_transport_endpoint(
                    mman, log=self.log, loop=loop
                )
            )
        elif transportname == "tinydtls":
            from .transports.tinydtls import MessageInterfaceTinyDTLS

            await self._append_tokenmanaged_messagemanaged_transport(
                lambda mman: MessageInterfaceTinyDTLS.create_client_transport_endpoint(
                    mman, log=self.log, loop=loop
                )
            )
        # FIXME end duplication
        elif transportname == "tinydtls_server":
            from .transports.tinydtls_server import MessageInterfaceTinyDTLSServer

            await self._append_tokenmanaged_messagemanaged_transport(
                lambda mman: MessageInterfaceTinyDTLSServer.create_server(
                    bind,
                    mman,
                    log=self.log,
                    loop=loop,
                    server_credentials=self.server_credentials,
                )
            )
        elif transportname == "simplesocketserver":
            from .transports.simplesocketserver import MessageInterfaceSimpleServer

            await self._append_tokenmanaged_messagemanaged_transport(
                lambda mman: MessageInterfaceSimpleServer.create_server(
                    bind, mman, log=self.log, loop=loop
                )
            )
        elif transportname == "tcpserver":
            from .transports.tcp import TCPServer

            await self._append_tokenmanaged_transport(
                lambda tman: TCPServer.create_server(bind, tman, self.log, loop)
            )
        elif transportname == "tcpclient":
            from .transports.tcp import TCPClient

            await self._append_tokenmanaged_transport(
                lambda tman: TCPClient.create_client_transport(tman, self.log, loop)
            )
        elif transportname == "tlsserver":
            # Without an SSL context the TLS server is silently not started.
            if _ssl_context is not None:
                from .transports.tls import TLSServer

                await self._append_tokenmanaged_transport(
                    lambda tman: TLSServer.create_server(
                        bind, tman, self.log, loop, _ssl_context
                    )
                )
        elif transportname == "tlsclient":
            from .transports.tls import TLSClient

            await self._append_tokenmanaged_transport(
                lambda tman: TLSClient.create_client_transport(
                    tman, self.log, loop, self.client_credentials
                )
            )
        elif transportname == "ws":
            from .transports.ws import WSPool

            await self._append_tokenmanaged_transport(
                # None, None: Unlike the other transports this has a server/client generic creator, and only binds if there is some bind
                lambda tman: WSPool.create_transport(
                    tman,
                    self.log,
                    loop,
                    client_credentials=self.client_credentials,
                    server_bind=bind or (None, None),
                    server_context=_ssl_context,
                )
            )
        elif transportname == "oscore":
            from .transports.oscore import TransportOSCORE

            oscoretransport = TransportOSCORE(self, self)
            self.request_interfaces.append(oscoretransport)
        else:
            raise RuntimeError(
                "Transport %r not known for server context creation" % transportname
            )

    if not multicast_done:
        self.log.warning(
            "Multicast was requested, but no multicast capable transport was selected."
        )

    # This is used in tests to wait for externally launched servers to be ready
    self.log.debug("Server ready to receive requests")

    return self
async def shutdown(self):
    """Take down any listening sockets and stop all related timers.

    After this coroutine terminates, and once all external references to
    the object are dropped, it should be garbage-collectable.

    This method takes up to
    :const:`aiocoap.numbers.constants.SHUTDOWN_TIMEOUT` seconds, allowing
    transports to perform any cleanup implemented in them (such as orderly
    connection shutdown and cancelling observations, where the latter is
    currently not implemented).

    A context without any request interfaces returns immediately.
    Exceptions raised by an interface's ``shutdown()`` that finished in
    time are re-raised from here.

    :meta private:
        (not actually private, just hiding from automodule due to being
        grouped with the important functions)
    """

    self.log.debug("Shutting down context")

    shutdown_tasks = [
        asyncio.create_task(
            ri.shutdown(),
            name="Shutdown of %r" % ri,
        )
        for ri in self.request_interfaces
    ]
    if not shutdown_tasks:
        # asyncio.wait() raises ValueError when given an empty collection,
        # and there is nothing to wait for anyway.
        return

    done, pending = await asyncio.wait(shutdown_tasks, timeout=SHUTDOWN_TIMEOUT)
    for item in done:
        # Awaiting the finished task surfaces any exception it ended with.
        await item
    if pending:
        # Apart from being useful to see, this also ensures that developers
        # see the error in the logs during test suite runs -- and the error
        # should be easier to follow than the "we didn't garbage collect
        # everything" errors we see anyway (or otherwise, if the error is
        # escalated into a test failure)
        self.log.error(
            "Shutdown timeout exceeded, returning anyway. Interfaces still busy: %s",
            pending,
        )
458 # FIXME: determine how official this should be, or which part of it is
459 # public -- now that BlockwiseRequest uses it. (And formalize what can
460 # change about messages and what can't after the remote has been thusly
461 # populated).
async def find_remote_and_interface(self, message):
    """Return the request interface that takes responsibility for ``message``.

    The registered request interfaces are asked one after the other through
    their ``fill_or_recognize_remote`` coroutine; the first one answering
    with a true value is returned and later ones are not consulted.

    :raises error.MissingRemoteError: if ``message.remote`` is None.
    :raises RuntimeError: if no request interface accepts the message.
    """
    if message.remote is None:
        raise error.MissingRemoteError()

    for candidate in self.request_interfaces:
        if await candidate.fill_or_recognize_remote(message):
            break
    else:
        # Loop ran to completion: nobody claimed the message.
        raise RuntimeError("No request interface could route message")

    return candidate
def request(self, request_message, handle_blockwise=True):
    """Start sending ``request_message`` and return the request object.

    With ``handle_blockwise`` set (the default), a :class:`BlockwiseRequest`
    is returned. Otherwise a plain :class:`Request` on a fresh pipe is
    returned, and routing plus sending happen in a task on ``self.loop``;
    any exception raised there is fed into the pipe instead of propagating.
    """
    if handle_blockwise:
        return BlockwiseRequest(self, request_message)

    pipe = Pipe(request_message, self.log)
    # Request sets up callbacks at creation
    plain_request = Request(pipe, self.loop, self.log)

    async def route_and_send():
        try:
            interface = await self.find_remote_and_interface(request_message)
            interface.request(pipe)
        except Exception as exc:
            pipe.add_exception(exc)

    self.loop.create_task(
        route_and_send(),
        name="Request processing of %r" % plain_request,
    )
    return plain_request
494 # the following are under consideration for moving into Site or something
495 # mixed into it
def render_to_pipe(self, pipe):
    """Have the site render into ``pipe``, with errors handled.

    The pipe is wrapped by ``error_to_message`` and then driven through
    ``run_driving_pipe`` with the :meth:`_render_to_pipe` coroutine.
    """
    run_driving_pipe(
        error_to_message(pipe, self.log),
        self._render_to_pipe(pipe),
        name="Rendering for %r" % pipe.request,
    )
async def _render_to_pipe(self, pipe):
    """Delegate rendering to the server site, if there is one.

    Without a site, a single final NOT_FOUND response with the payload
    ``b"not a server"`` is added to the pipe.
    """
    site = self.serversite
    if site is not None:
        return await site.render_to_pipe(pipe)

    pipe.add_response(
        Message(code=NOT_FOUND, payload=b"not a server"), is_last=True
    )
class BaseRequest(object):
    """Common mechanisms of :class:`Request` and :class:`MulticastRequest`"""

    # The class body defines no attributes or methods of its own here; it
    # only serves as a common base class.
class BaseUnicastRequest(BaseRequest):
    """Mix-in for request objects that expose a :attr:`response` future.

    In addition to awaiting :attr:`response` directly, its error states can
    be consumed in two flavours: :attr:`response_raising` presents an
    unsuccessful response (eg. 4.04) as an exception, whereas
    :attr:`response_nonraising` presents exceptions as error responses.

    It also provides some internal tools for handling anything that has a
    :attr:`response` future and an :attr:`observation`"""

    @property
    async def response_raising(self):
        """An awaitable yielding the response if it arrives and is
        successful.

        Network level exceptions are passed on as they are; an unsuccessful
        response is raised as :class:`.error.ResponseWrappingError`.

        Experimental Interface."""

        reply = await self.response
        if reply.code.is_successful():
            return reply

        raise error.ResponseWrappingError(reply)

    @property
    async def response_nonraising(self):
        """An awaitable that never raises on request failure but returns a
        fabricated 500ish message instead (as a proxy would return).

        Experimental Interface."""

        # FIXME: Can we smuggle error_to_message into the underlying pipe?
        # That should make observe notifications into messages rather
        # than exceptions as well, plus it has fallbacks for `e.to_message()`
        # raising.

        try:
            reply = await self.response
        except error.RenderableError as renderable:
            reply = renderable.to_message()
        except Exception:
            reply = Message(code=INTERNAL_SERVER_ERROR)
        return reply
566class Request(interfaces.Request, BaseUnicastRequest):
567 # FIXME: Implement timing out with REQUEST_TIMEOUT here
def __init__(self, pipe, loop, log):
    """Attach to ``pipe`` and get ready to process its events.

    A response future is created on ``loop``; an observation object is
    only created when the request's Observe option is 0. Events of the
    pipe are forwarded into the generator returned by :meth:`_run`.
    """
    self._pipe = pipe

    self.response = loop.create_future()

    self.observation = (
        ClientObservation() if pipe.request.opt.observe == 0 else None
    )

    # Advance the generator to its first ``yield`` so that events can be
    # sent into it.
    self._runner = self._run()
    next(self._runner)

    def feed_runner(event):
        # would be great to have self or the runner as weak ref, but
        # see ClientObservation.register_callback comments -- while
        # that is around, we can't weakref here.
        try:
            self._runner.send(event)
        except StopIteration:
            # The runner has finished with this event.
            return False
        return True

    self._stop_interest = self._pipe.on_event(feed_runner)

    self.log = log

    self.response.add_done_callback(self._response_cancellation_handler)
def _response_cancellation_handler(self, response):
    """Done-callback of the response future; reacts to cancellation only.

    Cancellation is propagated to the runner (if interest in the first
    response is lost, there won't be observation items to pull out).
    General completion is not acted on: the runner is then still around
    (eg. when an observation is active), and calling ``_stop_interest`` is
    its task.
    """
    if not self.response.cancelled():
        return
    if self._runner is None:
        return

    # Dropping the only reference makes it stop with GeneratorExit,
    # similar to a cancelled task
    self._runner = None
    self._stop_interest()
@staticmethod
def _add_response_properties(response, request):
    """Record on ``response`` which ``request`` it answers.

    The request is stored in the response's ``request`` attribute.
    """
    response.request = request
def _run(self):
    """Generator that consumes the pipe's events for this request.

    Events are passed in through ``send()``. The generator yields None
    once while waiting for the first event and True after every further
    event it wants a successor for; it returns (raising StopIteration at
    the sender) when it is done.

    The first event resolves :attr:`response`; if there is an
    observation, later events are reported to it.
    """
    # FIXME: This is in iterator form because it used to be a task that
    # awaited futures, and that code could be easily converted to an
    # iterator. I'm not sure that's a bad state here, but at least it
    # should be a more conscious decision to make this an iterator rather
    # than just having it happen to be one.
    #
    # FIXME: check that responses come from the same remote as long as we're assuming unicast

    first_event = yield None

    # The first event decides the outcome of the response future.
    if first_event.message is not None:
        self._add_response_properties(first_event.message, self._pipe.request)
        self.response.set_result(first_event.message)
    else:
        self.response.set_exception(first_event.exception)
        if not isinstance(first_event.exception, error.Error):
            self.log.warning(
                "An exception that is not an aiocoap Error was raised "
                "from a transport; please report this as a bug in "
                "aiocoap: %r",
                first_event.exception,
            )

    # Without an observation nothing beyond the first event is of interest.
    if self.observation is None:
        if not first_event.is_last:
            self.log.error(
                "Pipe indicated more possible responses"
                " while the Request handler would not know what to"
                " do with them, stopping any further request."
            )
            self._stop_interest()
        return

    # An observation was requested, but no further events will follow.
    if first_event.is_last:
        self.observation.error(error.NotObservable())
        return

    if first_event.message.opt.observe is None:
        self.log.error(
            "Pipe indicated more possible responses"
            " while the Request handler would not know what to"
            " do with them, stopping any further request."
        )
        self._stop_interest()
        return

    # variable names from RFC7641 Section 3.4
    v1 = first_event.message.opt.observe
    t1 = time.time()

    while True:
        # We don't really support cancellation of observations yet (see
        # https://github.com/chrysn/aiocoap/issues/92), but at least
        # stopping the interest is a way to free the local resources after
        # the first observation update, and to make the MID handler RST the
        # observation on the next.
        # FIXME: there *is* now a .on_cancel callback, we should at least
        # hook into that, and possibly even send a proper cancellation
        # then.
        next_event = yield True
        if self.observation.cancelled:
            self._stop_interest()
            return

        if next_event.exception is not None:
            self.observation.error(next_event.exception)
            if not next_event.is_last:
                self._stop_interest()
            if not isinstance(next_event.exception, error.Error):
                self.log.warning(
                    "An exception that is not an aiocoap Error was "
                    "raised from a transport during an observation; "
                    "please report this as a bug in aiocoap: %r",
                    next_event.exception,
                )
            return

        self._add_response_properties(next_event.message, self._pipe.request)

        if next_event.message.opt.observe is not None:
            # check for reordering
            v2 = next_event.message.opt.observe
            t2 = time.time()

            # A notification is accepted if its number is ahead of the
            # last accepted one (with wrap-around), or if more than
            # OBSERVATION_RESET_TIME has passed since then.
            is_recent = (
                (v1 < v2 and v2 - v1 < 2**23)
                or (v1 > v2 and v1 - v2 > 2**23)
                or (
                    t2
                    > t1
                    + self._pipe.request.transport_tuning.OBSERVATION_RESET_TIME
                )
            )
            if is_recent:
                t1 = t2
                v1 = v2
        else:
            # the terminal message is always the last
            is_recent = True

        if is_recent:
            self.observation.callback(next_event.message)

        if next_event.is_last:
            self.observation.error(error.ObservationCancelled())
            return

        if next_event.message.opt.observe is None:
            self.observation.error(error.ObservationCancelled())
            self.log.error(
                "Pipe indicated more possible responses"
                " while the Request handler would not know what to"
                " do with them, stopping any further request."
            )
            self._stop_interest()
            return
734class BlockwiseRequest(BaseUnicastRequest, interfaces.Request):
def __init__(self, protocol, app_request):
    """Start the block-wise transfer of ``app_request`` on ``protocol``.

    A response future is created on the protocol's loop, an observation
    object only if the request carries an Observe option. The transfer
    itself runs in a task that only gets a weak reference to the
    observation.
    """
    self.protocol = protocol
    self.log = protocol.log.getChild("blockwise-requester")

    self.response = protocol.loop.create_future()

    if app_request.opt.observe is None:
        self.observation = None

        def weak_observation():
            # Stands in for a dead weak reference.
            return None

    else:
        self.observation = ClientObservation()
        weak_observation = weakref.ref(self.observation)

    runner_coroutine = self._run_outer(
        app_request,
        self.response,
        weak_observation,
        self.protocol,
        self.log,
    )
    self._runner = protocol.loop.create_task(
        runner_coroutine,
        name="Blockwise runner for %r" % app_request,
    )
    self.response.add_done_callback(self._response_cancellation_handler)
def _response_cancellation_handler(self, response_future):
    """Done-callback of the response future: cancel the runner task when
    the response future was cancelled."""
    # see Request._response_cancellation_handler
    was_cancelled = self.response.cancelled()
    if was_cancelled:
        self._runner.cancel()
@classmethod
async def _run_outer(cls, app_request, response, weak_observation, protocol, log):
    """Run :meth:`_run` and route its exceptions to whoever may listen.

    Cancellation is swallowed. Any other exception is set on ``response``
    if that is not done yet, and reported to the observation if the request
    had an Observe option and the observation is still alive; if neither
    applies, it is logged.
    """
    try:
        await cls._run(app_request, response, weak_observation, protocol, log)
    except asyncio.CancelledError:
        return  # results already set
    except Exception as exc:
        went_to_response = not response.done()
        if went_to_response:
            response.set_exception(exc)

        observation = weak_observation()
        went_to_observation = (
            app_request.opt.observe is not None and observation is not None
        )
        if went_to_observation:
            observation.error(exc)

        if not (went_to_response or went_to_observation):
            # should be unreachable
            log.error(
                "Exception in BlockwiseRequest runner neither went to response nor to observation: %s",
                exc,
                exc_info=exc,
            )
788 # This is a class method because that allows self and self.observation to
789 # be freed even when this task is running, and the task to stop itself --
790 # otherwise we couldn't know when users just "forget" about a request
791 # object after using its response (esp. in observe cases) and leave this
792 # task running.
@classmethod
async def _run(cls, app_request, response, weak_observation, protocol, log):
    """Perform the block-wise exchange for ``app_request``.

    The request body is sent in Block1 chunks where needed, the response
    is completed through :meth:`_complete_by_requesting_block2` and then
    set as the result of ``response``. If an observation was established,
    this coroutine keeps running until the observation subtask ends.

    :param app_request: the request as given by the application; its
        ``remote`` is populated and updated along the way.
    :param response: future that receives the assembled response.
    :param weak_observation: callable returning the observation object, or
        None if there is none (any more).
    :param protocol: the context used for sending the individual requests.
    :param log: logger to report to.
    :raises error.UnexpectedBlock1Option: if the server's Block1 option
        does not match what was sent.
    """
    # we need to populate the remote right away, because the choice of
    # blocks depends on it.
    await protocol.find_remote_and_interface(app_request)

    size_exp = app_request.remote.maximum_block_size_exp

    if app_request.opt.block1 is not None:
        warnings.warn(
            "Setting a block1 option in a managed block-wise transfer is deprecated. Instead, set request.remote.maximum_block_size_exp to the desired value",
            DeprecationWarning,
            stacklevel=2,
        )
        assert app_request.opt.block1.block_number == 0, (
            "Unexpected block number in app_request"
        )
        assert not app_request.opt.block1.more, (
            "Unexpected more-flag in app_request"
        )
        # this is where the library user can traditionally pass in size
        # exponent hints into the library.
        size_exp = app_request.opt.block1.size_exponent

    # Offset in the message in blocks of size_exp. Whoever changes size_exp
    # is responsible for updating this number.
    block_cursor = 0

    while True:
        # ... send a chunk

        if size_exp >= 6:
            # FIXME from maximum_payload_size
            fragmentation_threshold = app_request.remote.maximum_payload_size
        else:
            fragmentation_threshold = 2 ** (size_exp + 4)

        if (
            app_request.opt.block1 is not None
            or len(app_request.payload) > fragmentation_threshold
        ):
            current_block1 = app_request._extract_block(
                block_cursor, size_exp, app_request.remote.maximum_payload_size
            )
            if block_cursor == 0:
                current_block1.opt.size1 = len(app_request.payload)
        else:
            current_block1 = app_request

        blockrequest = protocol.request(current_block1, handle_blockwise=False)
        blockresponse = await blockrequest.response

        # store for future blocks to ensure that the next blocks will be
        # sent from the same source address (in the UDP case; for many
        # other transports it won't matter). carrying along locally set block size limitation
        if (
            app_request.remote.maximum_block_size_exp
            < blockresponse.remote.maximum_block_size_exp
        ):
            blockresponse.remote.maximum_block_size_exp = (
                app_request.remote.maximum_block_size_exp
            )
        app_request.remote = blockresponse.remote

        if blockresponse.opt.block1 is None:
            if blockresponse.code.is_successful() and current_block1.opt.block1:
                log.warning(
                    "Block1 option completely ignored by server, assuming it knows what it is doing."
                )
            # FIXME: handle 4.13 and retry with the indicated size option
            break

        block1 = blockresponse.opt.block1
        log.debug(
            "Response with Block1 option received, number = %d, more = %d, size_exp = %d.",
            block1.block_number,
            block1.more,
            block1.size_exponent,
        )

        if block1.block_number != current_block1.opt.block1.block_number:
            raise error.UnexpectedBlock1Option("Block number mismatch")

        if size_exp == 7:
            block_cursor += len(current_block1.payload) // 1024
        else:
            block_cursor += 1

        # The server asked for smaller blocks: express the cursor in units
        # of the new block size.
        while block1.size_exponent < size_exp:
            block_cursor *= 2
            size_exp -= 1

        if not current_block1.opt.block1.more:
            if block1.more or blockresponse.code == CONTINUE:
                # treating this as a protocol error -- letting it slip
                # through would misrepresent the whole operation as an
                # over-all 2.xx (successful) one.
                raise error.UnexpectedBlock1Option(
                    "Server asked for more data at end of body"
                )
            break

        # checks before preparing the next round:

        # Compare against None: 0 is a valid Observe value in a response.
        if blockresponse.opt.observe is not None:
            # we're not *really* interested in that block, we just sent an
            # observe option to indicate that we'll want to observe the
            # resulting representation as a whole
            log.warning(
                "Server answered Observe in early Block1 phase, cancelling the erroneous observation."
            )
            # The request object keeps its observation in `.observation`
            # (there is no `.observe` attribute), and that is None when no
            # observation was set up for this block's request.
            if blockrequest.observation is not None:
                blockrequest.observation.cancel()

        if block1.more:
            # FIXME i think my own server is doing this wrong
            # if response.code != CONTINUE:
            #    raise error.UnexpectedBlock1Option("more-flag set but no Continue")
            pass
        else:
            if not blockresponse.code.is_successful():
                break
            else:
                # ignoring (discarding) the successful intermediate result, waiting for a final one
                continue

    lower_observation = None
    if app_request.opt.observe is not None:
        if blockresponse.opt.observe is not None:
            lower_observation = blockrequest.observation
        else:
            obs = weak_observation()
            if obs:
                obs.error(error.NotObservable())
            del obs

    assert blockresponse is not None, "Block1 loop broke without setting a response"
    blockresponse.opt.block1 = None

    # FIXME check with RFC7959: it just says "send requests similar to the
    # requests in the Block1 phase", what does that mean? using the last
    # block1 as a reference for now, especially because in the
    # only-one-request-block case, that's the original request we must send
    # again and again anyway
    assembled_response = await cls._complete_by_requesting_block2(
        protocol, current_block1, blockresponse, log
    )

    response.set_result(assembled_response)
    # finally set the result

    if lower_observation is not None:
        # FIXME this can all be simplified a lot since it's no more
        # expected that observations shut themselves down when GC'd.
        obs = weak_observation()
        del weak_observation
        if obs is None:
            lower_observation.cancel()
            return
        # packing this up because its destroy callback needs to reference the subtask
        future_weak_observation = protocol.loop.create_future()
        subtask = asyncio.create_task(
            cls._run_observation(
                app_request,
                lower_observation,
                future_weak_observation,
                protocol,
                log,
            ),
            name="Blockwise observation for %r" % app_request,
        )
        future_weak_observation.set_result(
            weakref.ref(obs, lambda obs: subtask.cancel())
        )
        obs.on_cancel(subtask.cancel)
        # Drop the strong reference so that only the weak one remains here.
        del obs
        await subtask
@classmethod
async def _run_observation(
    cls, original_request, lower_observation, future_weak_observation, protocol, log
):
    """Relay the notifications of a blockwise observation to the application.

    Awaits ``future_weak_observation`` to obtain a callable (a weak
    reference) that yields the application-facing observation, then
    iterates over ``lower_observation``. Each notification is completed via
    :meth:`_complete_by_requesting_block2`, repeating ``original_request``
    for any further blocks, and the result is passed to the observation's
    ``callback``.

    When the iteration ends on its own, the observation is sent an
    ``ObservationCancelled`` error; any other exception is forwarded to the
    observation's ``error``. Task cancellation ends the relay silently. In
    every case ``lower_observation`` is cancelled unless it already is.
    """
    observation_ref = await future_weak_observation
    # Dereferencing observation_ref() is fine at any point below: whenever
    # the referent goes away, this very task gets cancelled.
    try:
        async for notification in lower_observation:
            log.debug("Notification received")
            completed = await cls._complete_by_requesting_block2(
                protocol, original_request, notification, log
            )
            log.debug("Reporting completed notification")
            observation_ref().callback(completed)
        # FIXME verify that this loop actually ends iff the observation
        # was cancelled -- otherwise find out the cause(s) or make it not
        # cancel under indistinguishable circumstances
        observation_ref().error(error.ObservationCancelled())
    except asyncio.CancelledError:
        # Being cancelled is how this task is stopped from outside; there
        # is nothing to report in that case.
        return
    except Exception as exc:
        observation_ref().error(exc)
    finally:
        # Idempotent cancellation is generally avoided, but this point is
        # reached both with and without an earlier cancellation, so check.
        already_cancelled = lower_observation.cancelled
        if not already_cancelled:
            lower_observation.cancel()
@classmethod
async def _complete_by_requesting_block2(
    cls, protocol, request_to_repeat, initial_response, log
):
    """Fetch the remaining Block2 blocks of a response and assemble them.

    If ``initial_response`` carries no Block2 option, or its "more" flag is
    False, its Block2 option is cleared and it is returned unchanged.
    Otherwise follow-up requests are derived from ``request_to_repeat``,
    sent through ``protocol`` to the remote of ``initial_response``, and
    each response block is appended to ``initial_response`` until a block
    without the "more" flag arrives.

    Returns the assembled response, or -- if the server answers a follow-up
    request without a Block2 option -- that single response as it is.

    Raises ``error.UnexpectedBlock2`` when the initial response is not
    block number 0, and passes on any ``error.Error`` raised while
    appending a block.
    """
    # FIXME this can probably be deduplicated against BlockwiseRequest

    first_block2 = initial_response.opt.block2
    if first_block2 is None or first_block2.more is False:
        initial_response.opt.block2 = None
        return initial_response

    if first_block2.block_number != 0:
        log.error("Error assembling blockwise response (expected first block)")
        raise error.UnexpectedBlock2()

    # The initial response doubles as the accumulator for later blocks.
    assembled = initial_response
    while True:
        follow_up = request_to_repeat._generate_next_block2_request(
            assembled
        ).copy(remote=initial_response.remote)

        pending = protocol.request(follow_up, handle_blockwise=False)
        received = await pending.response

        received_block2 = received.opt.block2
        if received_block2 is None:
            log.warning(
                "Server sent non-blockwise response after having started a blockwise transfer. Blockwise transfer cancelled, accepting single response."
            )
            return received

        log.debug(
            "Response with Block2 option received, number = %d, more = %d, size_exp = %d.",
            received_block2.block_number,
            received_block2.more,
            received_block2.size_exponent,
        )
        try:
            assembled._append_response_block(received)
        except error.Error as exc:
            log.error("Error assembling blockwise response, passing on error %r", exc)
            raise

        if received_block2.more is False:
            return assembled
class ClientObservation:
    """An interface to observe notification updates arriving on a request.

    This class does not actually provide any of the observe functionality, it
    is purely a container for dispatching the messages via asynchronous
    iteration. It gets driven (ie. populated with responses or errors including
    observation termination) by a Request object.
    """

    def __init__(self):
        # Listeners for notifications and for the terminating error. Both
        # are replaced by None once the observation is cancelled.
        self.callbacks = []
        self.errbacks = []

        self.cancelled = False
        # Zero-argument callables that are run when the observation is
        # cancelled.
        self._on_cancel = []

        # Most recent notification; replayed to callbacks registered later.
        self._latest_response = None
        # The analogous error: set by error() right before it cancels the
        # observation, and left at None when cancel() is called directly.
        self._cancellation_reason = None

    def __aiter__(self):
        """`async for` interface to observations.

        This is the preferred interface to obtaining observations."""
        it = self._Iterator()
        self.register_callback(it.push, _suppress_deprecation=True)
        self.register_errback(it.push_err, _suppress_deprecation=True)
        return it

    class _Iterator:
        """Single-slot, lossy hand-over of notifications to `async for`.

        Only the latest pushed item (or error) is kept in a future; an
        unconsumed older one is replaced."""

        def __init__(self):
            self._future = asyncio.get_event_loop().create_future()

        def push(self, item):
            """Store `item` as the next value to be yielded."""
            if self._future.done():
                # we don't care whether we overwrite anything, this is a lossy queue as observe is lossy
                self._future = asyncio.get_event_loop().create_future()
            self._future.set_result(item)

        def push_err(self, e):
            """Store `e` as the exception the next iteration step raises."""
            if self._future.done():
                self._future = asyncio.get_event_loop().create_future()
            self._future.set_exception(e)

        async def __anext__(self):
            f = self._future
            try:
                result = await self._future
                # FIXME see `await servobs._trigger` comment: might waiting for
                # the original future not yield the first future's result when
                # a quick second future comes in in a push?
                if f is self._future:
                    # Nothing newer was pushed in the meantime: arm a fresh
                    # future so the next step waits for the next push.
                    self._future = asyncio.get_event_loop().create_future()
                return result
            except (error.NotObservable, error.ObservationCancelled):
                # only exit cleanly when the server -- right away or later --
                # states that the resource is not observable any more
                # FIXME: check whether an unsuccessful message is still passed
                # as an observation result (or whether it should be)
                raise StopAsyncIteration

        def __del__(self):
            if self._future.done():
                try:
                    # Fetch the result so any errors show up at least in the
                    # finalizer output
                    self._future.result()
                except (error.ObservationCancelled, error.NotObservable):
                    # This is the case at the end of an observation cancelled
                    # by the server.
                    pass
                except error.NetworkError:
                    # This will already have shown up in the main result too.
                    pass
                except (error.LibraryShutdown, asyncio.CancelledError):
                    pass
                # Anything else flying out of this is unexpected and probably a
                # library error

    # When this function is removed, we can finally do cleanup better. Right
    # now, someone could register a callback that doesn't hold any references,
    # so we can't just stop the request when nobody holds a reference to this
    # any more. Once we're all in pull mode, we can make the `process` function
    # that sends data in here use a weak reference (because any possible
    # recipient would need to hold a reference to self or the iterator, and
    # thus _run).
    def register_callback(self, callback, _suppress_deprecation=False):
        """Call the callback whenever a response to the message comes in, and
        pass the response to it.

        If a response has already been received, the callback is immediately
        called with the latest one. On a cancelled observation the callback is
        silently dropped.

        The use of this function is deprecated: Use the asynchronous iteration
        interface instead."""
        if not _suppress_deprecation:
            warnings.warn(
                "register_callback on observe results is deprected: Use `async for notify in request.observation` instead.",
                DeprecationWarning,
                stacklevel=2,
            )
        if self.cancelled:
            return

        self.callbacks.append(callback)
        if self._latest_response is not None:
            callback(self._latest_response)

    def register_errback(self, callback, _suppress_deprecation=False):
        """Call the callback whenever something goes wrong with the
        observation, and pass an exception to the callback. After such a
        callback is called, no more callbacks will be issued.

        On an already cancelled observation, the callback is called right
        away with the cancellation reason (None if the observation was
        cancelled without an error).

        The use of this function is deprecated: Use the asynchronous iteration
        interface instead."""
        if not _suppress_deprecation:
            warnings.warn(
                "register_errback on observe results is deprected: Use `async for notify in request.observation` instead.",
                DeprecationWarning,
                stacklevel=2,
            )
        if self.cancelled:
            callback(self._cancellation_reason)
            return
        self.errbacks.append(callback)

    def callback(self, response):
        """Notify all listeners of an incoming response"""

        self._latest_response = response

        for c in self.callbacks:
            c(response)

    def error(self, exception):
        """Notify registered listeners that the observation went wrong. This
        can only be called once.

        Raises RuntimeError (chained from `exception`) if the observation
        has already been cancelled."""

        if self.errbacks is None:
            raise RuntimeError(
                "Error raised in an already cancelled ClientObservation"
            ) from exception
        for c in self.errbacks:
            c(exception)

        # Record the reason before cancelling, so that the on_cancel
        # callbacks (and any errbacks they register) already see it.
        self._cancellation_reason = exception
        self.cancel()

    def cancel(self):
        # FIXME determine whether this is called by anything other than error,
        # and make it private so there is always a _cancellation_reason
        """Cease to generate observation or error events. This will not
        generate an error by itself.

        This function is only needed while register_callback and
        register_errback are around; once their deprecations are acted on,
        dropping the asynchronous iterator will automatically cancel the
        observation.
        """

        assert not self.cancelled, "ClientObservation cancelled twice"

        # make sure things go wrong when someone tries to continue this
        self.errbacks = None
        self.callbacks = None

        self.cancelled = True
        while self._on_cancel:
            self._on_cancel.pop()()

        # _cancellation_reason is deliberately left alone: it is None from
        # __init__ unless error() has stored the causing exception.

    def on_cancel(self, callback):
        """Run `callback` (without arguments) when the observation is
        cancelled, or immediately if it already is."""
        if self.cancelled:
            callback()
            # Nothing will ever drain the list again, so don't keep the
            # callback around.
            return
        self._on_cancel.append(callback)

    def __repr__(self):
        return "<%s %s at %#x>" % (
            type(self).__name__,
            "(cancelled)"
            if self.cancelled
            else "(%s call-, %s errback(s))"
            % (len(self.callbacks), len(self.errbacks)),
            id(self),
        )
class ServerObservation:
    """State holder for a single observation on the server side.

    It records whether the observation was accepted, carries the next
    response to send in the ``_trigger`` future, and keeps the flags for
    early and late deregistration. The consumer of ``_trigger`` and of the
    flags is not part of this class.
    """

    def __init__(self):
        self._accepted = False
        self._trigger = asyncio.get_event_loop().create_future()
        # A deregistration is "early" if it happens before the response message
        # is actually sent; calling deregister() in that time (typically during
        # `render()`) will not send an unsuccessful response message but just
        # set this flag which is set to None as soon as it is too late for an
        # early deregistration.
        # This mechanism is temporary until more of aiocoap behaves like
        # Pipe which does not suffer from this limitation.
        self._early_deregister = False
        self._late_deregister = False

    def accept(self, cancellation_callback):
        """Mark the observation as accepted and store
        `cancellation_callback` for later use."""
        self._accepted = True
        self._cancellation_callback = cancellation_callback

    def deregister(self, reason=None):
        """End the observation.

        While an early deregistration is still possible, this only sets the
        early-deregister flag. Later use is deprecated: it emits a
        DeprecationWarning and triggers an Internal Server Error response.

        `reason` is accepted but not used here."""
        if self._early_deregister is False:
            self._early_deregister = True
            return

        # NOTE(review): a second call while the flag is already True also
        # takes this late path -- confirm that is intended.
        warnings.warn(
            "Late use of ServerObservation.deregister() is"
            " deprecated, use .trigger with an unsuccessful value"
            " instead",
            DeprecationWarning,
            # Attribute the warning to the caller, like the other deprecation
            # warnings in this module; otherwise it points at library code.
            stacklevel=2,
        )
        self.trigger(
            Message(code=INTERNAL_SERVER_ERROR, payload=b"Resource became unobservable")
        )

    def trigger(self, response=None, *, is_last=False):
        """Send an updated response; if None is given, the observed resource's
        rendering will be invoked to produce one.

        `is_last` can be set to True to indicate that no more responses will be
        sent. Note that an unsuccessful response will be the last no matter
        what is_last says, as such a message always terminates a CoAP
        observation."""
        if is_last:
            self._late_deregister = True
        if self._trigger.done():
            # we don't care whether we overwrite anything, this is a lossy queue as observe is lossy
            self._trigger = asyncio.get_event_loop().create_future()
        self._trigger.set_result(response)