Coverage for aiocoap/protocol.py: 88%
495 statements
« prev ^ index » next coverage.py v7.6.8, created at 2024-11-28 12:34 +0000
« prev ^ index » next coverage.py v7.6.8, created at 2024-11-28 12:34 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5"""This module contains the classes that are responsible for keeping track of
6messages:
8* :class:`Context` roughly represents the CoAP endpoint (basically a UDP
9 socket) -- something that can send requests and possibly can answer
10 incoming requests.
12 Incoming requests are processed in tasks created by the context.
14* a :class:`Request` gets generated whenever a request gets sent to keep
15 track of the response
17Logging
18~~~~~~~
20Several constructors of the Context accept a logger name; these names go into
21the construction of a Python logger.
23Log events will be emitted to these on different levels, with "warning" and
24above being a practical default for things that may warrant reviewing by
25an operator:
27* DEBUG is used for things that occur even under perfect conditions.
28* INFO is for things that are well expected, but might be interesting during
29 testing a network of nodes and not just when debugging the library. (This
30 includes timeouts, retransmissions, and pings.)
31* WARNING is for everything that indicates a malbehaved peer. These don't
32 *necessarily* indicate a client bug, though: Things like requesting a
33 nonexistent block can just as well happen when a resource's content has
34 changed between blocks. The library will not go out of its way to determine
35 whether there is a plausible explanation for the odd behavior, and will
36 report something as a warning in case of doubt.
37* ERROR is used when something clearly went wrong. This includes irregular
38 connection terminations and resource handler errors (which are demoted to
39 error responses), and can often contain a backtrace.
40"""
42import asyncio
43import weakref
44import time
45from typing import Optional, List
47from . import defaults
48from .credentials import CredentialsMap
49from .message import Message
50from .messagemanager import MessageManager
51from .tokenmanager import TokenManager
52from .pipe import Pipe, run_driving_pipe, error_to_message
53from . import interfaces
54from . import error
55from .numbers import INTERNAL_SERVER_ERROR, NOT_FOUND, CONTINUE, SHUTDOWN_TIMEOUT
57import warnings
58import logging
61class Context(interfaces.RequestProvider):
62 """Applications' entry point to the network
64 A :class:`.Context` coordinates one or more network :mod:`.transports`
65 implementations and dispatches data between them and the application.
67 The application can start requests using the message dispatch methods, and
68 set a :class:`resources.Site` that will answer requests directed to the
69 application as a server.
71 On the library-internals side, it is the prime implementation of the
72 :class:`interfaces.RequestProvider` interface, creates :class:`Request` and
73 :class:`Response` classes on demand, and decides which transport
74 implementations to start and which are to handle which messages.
76 **Context creation and destruction**
78 The following functions are provided for creating and stopping a context:
80 .. note::
82 A typical application should only ever create one context, even (or
83 especially when) it acts both as a server and as a client (in which
84 case a server context should be created).
86 A context that is not used any more must be shut down using
87 :meth:`.shutdown()`, but typical applications will not need to because
88 they use the context for the full process lifetime.
90 .. automethod:: create_client_context
91 .. automethod:: create_server_context
93 .. automethod:: shutdown
95 **Dispatching messages**
97 CoAP requests can be sent using the following functions:
99 .. automethod:: request
101 If more control is needed, you can create a :class:`Request` yourself and
102 pass the context to it.
105 **Other methods and properties**
107 The remaining methods and properties are to be considered unstable even
108 when the project reaches a stable version number; please file a feature
109 request for stabilization if you want to reliably access any of them.
110 """
def __init__(
    self,
    loop=None,
    serversite=None,
    loggername="coap",
    client_credentials=None,
    server_credentials=None,
):
    """Prepare an empty context; no transport is attached here.

    :param loop: event loop to store; a falsy value selects
        :func:`asyncio.get_event_loop`.
    :param serversite: stored unchanged as :attr:`serversite`.
    :param loggername: name handed to :func:`logging.getLogger`.
    :param client_credentials: stored unchanged unless falsy, in which case
        a new :class:`CredentialsMap` is created.
    :param server_credentials: treated like ``client_credentials``.
    """
    self.log = logging.getLogger(loggername)

    self.loop = loop if loop else asyncio.get_event_loop()

    self.serversite = serversite

    # Populated by the _append_*_transport helpers and the create_*_context
    # constructors.
    self.request_interfaces = []

    self.client_credentials = (
        client_credentials if client_credentials else CredentialsMap()
    )
    self.server_credentials = (
        server_credentials if server_credentials else CredentialsMap()
    )
131 #
132 # convenience methods for class instantiation
133 #
async def _append_tokenmanaged_messagemanaged_transport(
    self, message_interface_constructor
):
    """Build a TokenManager / MessageManager pair around a message transport.

    ``message_interface_constructor`` is awaited with the new
    :class:`MessageManager`; its result becomes that manager's
    ``message_interface``. The token manager is then registered in
    :attr:`request_interfaces`.
    """
    token_manager = TokenManager(self)
    message_manager = MessageManager(token_manager)

    # The transport is constructed before the managers are wired together.
    message_manager.message_interface = await message_interface_constructor(
        message_manager
    )
    token_manager.token_interface = message_manager

    self.request_interfaces.append(token_manager)
async def _append_tokenmanaged_transport(self, token_interface_constructor):
    """Build a TokenManager around a transport that handles tokens directly.

    ``token_interface_constructor`` is awaited with the new
    :class:`TokenManager`; its result becomes the manager's
    ``token_interface``. The manager is then registered in
    :attr:`request_interfaces`.
    """
    token_manager = TokenManager(self)
    token_manager.token_interface = await token_interface_constructor(token_manager)

    self.request_interfaces.append(token_manager)
@classmethod
async def create_client_context(
    cls, *, loggername="coap", loop=None, transports: Optional[List[str]] = None
):
    """Create a context bound to all addresses on a random listening port.

    This is the easiest way to get a context suitable for sending client
    requests.

    When ``transports`` is empty or None, the names returned by
    ``defaults.get_default_clienttransports`` are used. An unknown
    transport name raises :class:`RuntimeError`.

    :meta private:
        (not actually private, just hiding from automodule due to being
        grouped with the important functions)
    """
    if loop is None:
        loop = asyncio.get_event_loop()

    self = cls(loop=loop, serversite=None, loggername=loggername)

    selected_transports = transports
    if not selected_transports:
        selected_transports = defaults.get_default_clienttransports(loop=loop)

    def client_endpoint_of(interface_class):
        # Constructor for the message-based transports, which all share the
        # create_client_transport_endpoint(mman, log=, loop=) signature.
        def construct(mman):
            return interface_class.create_client_transport_endpoint(
                mman, log=self.log, loop=loop
            )

        return construct

    add_message_transport = self._append_tokenmanaged_messagemanaged_transport
    add_token_transport = self._append_tokenmanaged_transport

    # FIXME make defaults overridable (postponed until they become configurable too)
    for transportname in selected_transports:
        # Transport modules are imported only when they are selected.
        if transportname == "udp6":
            from .transports.udp6 import MessageInterfaceUDP6

            await add_message_transport(client_endpoint_of(MessageInterfaceUDP6))
        elif transportname == "simple6":
            from .transports.simple6 import MessageInterfaceSimple6

            await add_message_transport(client_endpoint_of(MessageInterfaceSimple6))
        elif transportname == "tinydtls":
            from .transports.tinydtls import MessageInterfaceTinyDTLS

            await add_message_transport(client_endpoint_of(MessageInterfaceTinyDTLS))
        elif transportname == "tcpclient":
            from .transports.tcp import TCPClient

            def tcp_client(tman):
                return TCPClient.create_client_transport(tman, self.log, loop)

            await add_token_transport(tcp_client)
        elif transportname == "tlsclient":
            from .transports.tls import TLSClient

            def tls_client(tman):
                return TLSClient.create_client_transport(
                    tman, self.log, loop, self.client_credentials
                )

            await add_token_transport(tls_client)
        elif transportname == "ws":
            from .transports.ws import WSPool

            def ws_pool(tman):
                return WSPool.create_transport(
                    tman, self.log, loop, client_credentials=self.client_credentials
                )

            await add_token_transport(ws_pool)
        elif transportname == "oscore":
            from .transports.oscore import TransportOSCORE

            self.request_interfaces.append(TransportOSCORE(self, self))
        else:
            raise RuntimeError(
                "Transport %r not know for client context creation" % transportname
            )

    return self
@classmethod
async def create_server_context(
    cls,
    site,
    bind=None,
    *,
    loggername="coap-server",
    loop=None,
    _ssl_context=None,
    multicast=None,
    server_credentials=None,
    transports: Optional[List[str]] = None,
):
    """Create a context, bound to all addresses on the CoAP port (unless
    otherwise specified in the ``bind`` argument).

    This is the easiest way to get a context suitable both for sending
    client and accepting server requests.

    The ``bind`` argument, if given, needs to be a 2-tuple of IP address
    string and port number, where the port number can be None to use the default port.

    If ``multicast`` is given, it needs to be a list of (multicast address,
    interface name) tuples, which will all be joined. (The IPv4 style of
    selecting the interface by a local address is not supported; users may
    want to use the netifaces package to arrive at an interface name for an
    address). Leaving it out or passing None is the same as passing an
    empty list.

    As a shortcut, the list may also contain interface names alone. Those
    will be joined for the 'all CoAP nodes' groups of IPv4 and IPv6 (with
    scopes 2 and 5) as well as the respective 'all nodes' groups in IPv6.

    Under some circumstances you may already need a context to pass into
    the site for creation; this is typically the case for servers that
    trigger requests on their own. For those cases, it is usually easiest
    to pass None in as a site, and set the fully constructed site later by
    assigning to the ``serversite`` attribute.

    When ``transports`` is empty or None, the names returned by
    ``defaults.get_default_servertransports`` are used. An unknown
    transport name raises :class:`RuntimeError`. The ``tlsserver`` transport
    is skipped when no ``_ssl_context`` is given.

    :meta private:
        (not actually private, just hiding from automodule due to being
        grouped with the important functions)
    """

    if loop is None:
        loop = asyncio.get_event_loop()

    # The default used to be a mutable ``[]`` shared between all calls;
    # normalizing None here keeps transports seeing a list as before.
    if multicast is None:
        multicast = []

    self = cls(
        loop=loop,
        serversite=site,
        loggername=loggername,
        server_credentials=server_credentials,
    )

    # Nothing to do for multicast if none was requested.
    multicast_done = not multicast

    selected_transports = transports or defaults.get_default_servertransports(
        loop=loop
    )

    for transportname in selected_transports:
        if transportname == "udp6":
            from .transports.udp6 import MessageInterfaceUDP6

            await self._append_tokenmanaged_messagemanaged_transport(
                lambda mman: MessageInterfaceUDP6.create_server_transport_endpoint(
                    mman, log=self.log, loop=loop, bind=bind, multicast=multicast
                )
            )
            # udp6 is the only transport here that is handed the multicast list.
            multicast_done = True
        # FIXME this is duplicated from the client version, as those are client-only anyway
        elif transportname == "simple6":
            from .transports.simple6 import MessageInterfaceSimple6

            await self._append_tokenmanaged_messagemanaged_transport(
                lambda mman: MessageInterfaceSimple6.create_client_transport_endpoint(
                    mman, log=self.log, loop=loop
                )
            )
        elif transportname == "tinydtls":
            from .transports.tinydtls import MessageInterfaceTinyDTLS

            await self._append_tokenmanaged_messagemanaged_transport(
                lambda mman: MessageInterfaceTinyDTLS.create_client_transport_endpoint(
                    mman, log=self.log, loop=loop
                )
            )
        # FIXME end duplication
        elif transportname == "tinydtls_server":
            from .transports.tinydtls_server import MessageInterfaceTinyDTLSServer

            await self._append_tokenmanaged_messagemanaged_transport(
                lambda mman: MessageInterfaceTinyDTLSServer.create_server(
                    bind,
                    mman,
                    log=self.log,
                    loop=loop,
                    server_credentials=self.server_credentials,
                )
            )
        elif transportname == "simplesocketserver":
            from .transports.simplesocketserver import MessageInterfaceSimpleServer

            await self._append_tokenmanaged_messagemanaged_transport(
                lambda mman: MessageInterfaceSimpleServer.create_server(
                    bind, mman, log=self.log, loop=loop
                )
            )
        elif transportname == "tcpserver":
            from .transports.tcp import TCPServer

            await self._append_tokenmanaged_transport(
                lambda tman: TCPServer.create_server(bind, tman, self.log, loop)
            )
        elif transportname == "tcpclient":
            from .transports.tcp import TCPClient

            await self._append_tokenmanaged_transport(
                lambda tman: TCPClient.create_client_transport(tman, self.log, loop)
            )
        elif transportname == "tlsserver":
            if _ssl_context is not None:
                from .transports.tls import TLSServer

                await self._append_tokenmanaged_transport(
                    lambda tman: TLSServer.create_server(
                        bind, tman, self.log, loop, _ssl_context
                    )
                )
        elif transportname == "tlsclient":
            from .transports.tls import TLSClient

            await self._append_tokenmanaged_transport(
                lambda tman: TLSClient.create_client_transport(
                    tman, self.log, loop, self.client_credentials
                )
            )
        elif transportname == "ws":
            from .transports.ws import WSPool

            await self._append_tokenmanaged_transport(
                # None, None: Unlike the other transports this has a server/client generic creator, and only binds if there is some bind
                lambda tman: WSPool.create_transport(
                    tman,
                    self.log,
                    loop,
                    client_credentials=self.client_credentials,
                    server_bind=bind or (None, None),
                    server_context=_ssl_context,
                )
            )
        elif transportname == "oscore":
            from .transports.oscore import TransportOSCORE

            oscoretransport = TransportOSCORE(self, self)
            self.request_interfaces.append(oscoretransport)
        else:
            raise RuntimeError(
                "Transport %r not know for server context creation" % transportname
            )

    if not multicast_done:
        self.log.warning(
            "Multicast was requested, but no multicast capable transport was selected."
        )

    # This is used in tests to wait for externally launched servers to be ready
    self.log.debug("Server ready to receive requests")

    return self
async def shutdown(self):
    """Take down any listening sockets and stop all related timers.

    After this coroutine terminates, and once all external references to
    the object are dropped, it should be garbage-collectable.

    This method takes up to
    :const:`aiocoap.numbers.constants.SHUTDOWN_TIMEOUT` seconds, allowing
    transports to perform any cleanup implemented in them (such as orderly
    connection shutdown and cancelling observations, where the latter is
    currently not implemented).

    A context without any request interfaces returns right away. An
    exception raised by an interface's ``shutdown()`` that finished within
    the timeout is re-raised from here.

    :meta private:
        (not actually private, just hiding from automodule due to being
        grouped with the important functions)
    """

    self.log.debug("Shutting down context")

    if not self.request_interfaces:
        # asyncio.wait() raises ValueError when given an empty collection,
        # and there is nothing to wait for anyway.
        return

    shutdown_tasks = [
        asyncio.create_task(
            ri.shutdown(),
            name="Shutdown of %r" % ri,
        )
        for ri in self.request_interfaces
    ]
    done, pending = await asyncio.wait(shutdown_tasks, timeout=SHUTDOWN_TIMEOUT)

    # Awaiting the finished tasks surfaces any exception they ended with.
    for item in done:
        await item
    if pending:
        # Apart from being useful to see, this also ensures that developers
        # see the error in the logs during test suite runs -- and the error
        # should be easier to follow than the "we didn't garbage collect
        # everything" errors we see anyway (or otherwise, if the error is
        # escalated into a test failure)
        self.log.error(
            "Shutdown timeout exceeded, returning anyway. Interfaces still busy: %s",
            pending,
        )
450 # FIXME: determine how official this should be, or which part of it is
451 # public -- now that BlockwiseRequest uses it. (And formalize what can
452 # change about messages and what can't after the remote has been thusly
453 # populated).
async def find_remote_and_interface(self, message):
    """Return the first request interface that accepts ``message``.

    Each entry of :attr:`request_interfaces` is asked in order through its
    ``fill_or_recognize_remote`` coroutine; the first one answering with a
    true value is returned.

    :raises error.MissingRemoteError: if ``message.remote`` is None.
    :raises RuntimeError: if no interface accepts the message.
    """
    if message.remote is None:
        raise error.MissingRemoteError()

    for candidate in self.request_interfaces:
        recognized = await candidate.fill_or_recognize_remote(message)
        if recognized:
            return candidate

    raise RuntimeError("No request interface could route message")
def request(self, request_message, handle_blockwise=True):
    """Start a request and return the object tracking it.

    With ``handle_blockwise`` set (the default), a
    :class:`BlockwiseRequest` wrapping this context is returned. Otherwise
    a :class:`Request` on a fresh :class:`Pipe` is returned, and a task on
    :attr:`loop` looks up the request interface and hands the pipe to it;
    any exception raised while doing so is added to the pipe.
    """
    if handle_blockwise:
        return BlockwiseRequest(self, request_message)

    pipe = Pipe(request_message, self.log)
    # Request sets up callbacks at creation
    result = Request(pipe, self.loop, self.log)

    async def send():
        try:
            interface = await self.find_remote_and_interface(request_message)
            interface.request(pipe)
        except Exception as e:
            # Routing failures are reported through the pipe rather than
            # being lost with the task.
            pipe.add_exception(e)

    self.loop.create_task(send(), name="Request processing of %r" % result)
    return result
486 # the following are under consideration for moving into Site or something
487 # mixed into it
def render_to_pipe(self, pipe):
    """Fill a pipe by running the site's render_to_pipe interface and
    handling errors.

    The pipe is wrapped with ``error_to_message`` and driven by the
    :meth:`_render_to_pipe` coroutine through ``run_driving_pipe``.
    """
    error_aware_pipe = error_to_message(pipe, self.log)
    rendering = self._render_to_pipe(pipe)

    run_driving_pipe(
        error_aware_pipe,
        rendering,
        name="Rendering for %r" % pipe.request,
    )
async def _render_to_pipe(self, pipe):
    """Let the server site render into ``pipe``.

    Without a server site, a final NOT_FOUND response with the payload
    ``b"not a server"`` is added to the pipe instead, and None is returned.
    """
    site = self.serversite
    if site is not None:
        return await site.render_to_pipe(pipe)

    pipe.add_response(Message(code=NOT_FOUND, payload=b"not a server"), is_last=True)
class BaseRequest:
    """Common mechanisms of :class:`Request` and :class:`MulticastRequest`"""


class BaseUnicastRequest(BaseRequest):
    """Mixin offering :attr:`response_raising` and
    :attr:`response_nonraising` as alternatives to awaiting the
    :attr:`response` future directly. Error states are presented either as
    an exception or as an unsuccessful response (eg. 4.04), depending on
    which of the two is used.

    It also provides some internal tools for handling anything that has a
    :attr:`response` future and an :attr:`observation`"""

    @property
    async def response_raising(self):
        """An awaitable that returns the response if it is successful.

        Exceptions from awaiting :attr:`response` propagate unchanged; an
        unsuccessful response is raised wrapped in a
        :class:`.error.ResponseWrappingError`.

        Experimental Interface."""
        response = await self.response
        if response.code.is_successful():
            return response
        raise error.ResponseWrappingError(response)

    @property
    async def response_nonraising(self):
        """An awaitable that returns a fabricated message (as a proxy would
        return) instead of raising an exception.

        A :class:`error.RenderableError` is converted through its
        ``to_message()``; any other :class:`Exception` becomes an
        INTERNAL_SERVER_ERROR message.

        Experimental Interface."""

        # FIXME: Can we smuggle error_to_message into the underlying pipe?
        # That should make observe notifications into messages rather
        # than exceptions as well, plus it has fallbacks for `e.to_message()`
        # raising.

        try:
            outcome = await self.response
        except error.RenderableError as e:
            outcome = e.to_message()
        except Exception:
            outcome = Message(code=INTERNAL_SERVER_ERROR)
        return outcome
558class Request(interfaces.Request, BaseUnicastRequest):
559 # FIXME: Implement timing out with REQUEST_TIMEOUT here
def __init__(self, pipe, loop, log):
    """Attach to ``pipe`` and prepare the response future.

    :attr:`response` is a future created on ``loop``. An
    :class:`ClientObservation` is created only when the request's Observe
    option equals 0; otherwise :attr:`observation` is None. The generator
    from :meth:`_run` is primed here and is fed every pipe event afterwards.
    """
    self._pipe = pipe

    self.response = loop.create_future()

    self.observation = ClientObservation() if pipe.request.opt.observe == 0 else None

    # Prime the generator so that it is waiting for its first event.
    self._runner = self._run()
    next(self._runner)

    def process(event):
        # would be great to have self or the runner as weak ref, but
        # see ClientObservation.register_callback comments -- while
        # that is around, we can't weakref here.
        try:
            self._runner.send(event)
        except StopIteration:
            # The runner has finished and wants no further events.
            return False
        return True

    self._stop_interest = self._pipe.on_event(process)

    self.log = log

    self.response.add_done_callback(self._response_cancellation_handler)
def _response_cancellation_handler(self, response):
    """Done-callback of :attr:`response`: stop the runner on cancellation.

    Only cancellation is propagated (if interest in the first response is
    lost, there won't be observation items to pull out). Regular completion
    is left alone, because the runner may still be needed, eg. while an
    observation is active; in that case it is the runner's task to call
    ``_stop_interest``.
    """
    if not self.response.cancelled() or self._runner is None:
        return

    # Dropping the only reference makes it stop with GeneratorExit,
    # similar to a cancelled task
    self._runner = None
    self._stop_interest()
@staticmethod
def _add_response_properties(response, request):
    """Record on ``response`` which ``request`` it answers, as
    ``response.request``."""
    setattr(response, "request", request)
def _run(self):
    """Generator that consumes the pipe's events for this request.

    It is primed by the constructor and then receives one pipe event per
    ``send()``. The first event resolves :attr:`response`; later events are
    reported to :attr:`observation`. It yields True while it wants more
    events and returns (ending in StopIteration at the sender) when done.
    """
    # FIXME: This is in iterator form because it used to be a task that
    # awaited futures, and that code could be easily converted to an
    # iterator. I'm not sure that's a bad state here, but at least it
    # should be a more conscious decision to make this an iterator rather
    # than just having it happen to be one.
    #
    # FIXME: check that responses come from the same remote as long as we're assuming unicast

    unexpected_more = (
        "Pipe indicated more possible responses"
        " while the Request handler would not know what to"
        " do with them, stopping any further request."
    )

    first = yield None

    if first.message is None:
        self.response.set_exception(first.exception)
        if not isinstance(first.exception, error.Error):
            self.log.warning(
                "An exception that is not an aiocoap Error was raised "
                "from a transport; please report this as a bug in "
                "aiocoap: %r",
                first.exception,
            )
    else:
        self._add_response_properties(first.message, self._pipe.request)
        self.response.set_result(first.message)

    if self.observation is None:
        # Plain request: one response is all that can be handled.
        if not first.is_last:
            self.log.error(unexpected_more)
            self._stop_interest()
        return

    if first.is_last:
        self.observation.error(error.NotObservable())
        return

    # variable names from RFC7641 Section 3.4
    v1 = first.message.opt.observe
    if v1 is None:
        self.log.error(unexpected_more)
        self._stop_interest()
        return
    t1 = time.time()

    while True:
        # We don't really support cancellation of observations yet (see
        # https://github.com/chrysn/aiocoap/issues/92), but at least
        # stopping the interest is a way to free the local resources after
        # the first observation update, and to make the MID handler RST the
        # observation on the next.
        # FIXME: there *is* now a .on_cancel callback, we should at least
        # hook into that, and possibly even send a proper cancellation
        # then.
        update = yield True
        if self.observation.cancelled:
            self._stop_interest()
            return

        if update.exception is not None:
            self.observation.error(update.exception)
            if not update.is_last:
                self._stop_interest()
            if not isinstance(update.exception, error.Error):
                self.log.warning(
                    "An exception that is not an aiocoap Error was "
                    "raised from a transport during an observation; "
                    "please report this as a bug in aiocoap: %r",
                    update.exception,
                )
            return

        self._add_response_properties(update.message, self._pipe.request)

        v2 = update.message.opt.observe
        if v2 is None:
            # the terminal message is always the last
            is_recent = True
        else:
            # check for reordering
            t2 = time.time()
            is_recent = (
                (v1 < v2 and v2 - v1 < 2**23)
                or (v1 > v2 and v1 - v2 > 2**23)
                or (
                    t2
                    > t1 + self._pipe.request.transport_tuning.OBSERVATION_RESET_TIME
                )
            )
            if is_recent:
                t1, v1 = t2, v2

        if is_recent:
            self.observation.callback(update.message)

        if update.is_last:
            self.observation.error(error.ObservationCancelled())
            return

        if update.message.opt.observe is None:
            # A notification without Observe ends the observation, yet the
            # pipe announced more events.
            self.observation.error(error.ObservationCancelled())
            self.log.error(unexpected_more)
            self._stop_interest()
            return
726class BlockwiseRequest(BaseUnicastRequest, interfaces.Request):
def __init__(self, protocol, app_request):
    """Start a block-wise managed request on ``protocol``.

    :attr:`response` is a future on the protocol's loop. An
    :class:`ClientObservation` is created when the request carries an
    Observe option (of any value); otherwise :attr:`observation` is None.
    The actual work runs in a task around :meth:`_run_outer`, which only
    gets a weak reference to the observation.
    """
    self.protocol = protocol
    self.log = self.protocol.log.getChild("blockwise-requester")

    self.response = protocol.loop.create_future()

    wants_observation = app_request.opt.observe is not None
    self.observation = ClientObservation() if wants_observation else None

    if self.observation is None:
        # Stand-in with the calling convention of a dead weak reference.
        def weak_observation():
            return None
    else:
        weak_observation = weakref.ref(self.observation)

    runner = self._run_outer(
        app_request,
        self.response,
        weak_observation,
        self.protocol,
        self.log,
    )
    self._runner = protocol.loop.create_task(
        runner,
        name="Blockwise runner for %r" % app_request,
    )
    self.response.add_done_callback(self._response_cancellation_handler)
def _response_cancellation_handler(self, response_future):
    """Done-callback of :attr:`response`: cancel the runner task when the
    response future was cancelled (see
    Request._response_cancellation_handler)."""
    was_cancelled = self.response.cancelled()
    if not was_cancelled:
        return
    self._runner.cancel()
@classmethod
async def _run_outer(cls, app_request, response, weak_observation, protocol, log):
    """Run :meth:`_run` and route any exception it raises.

    Cancellation ends the task silently. Any other exception is set on
    ``response`` if that is not done yet, and reported to the observation
    if the request had an Observe option and the observation is still
    alive. If neither took it, it is logged as an error.
    """
    try:
        await cls._run(app_request, response, weak_observation, protocol, log)
    except asyncio.CancelledError:
        # results already set
        return
    except Exception as e:
        response_took_it = not response.done()
        if response_took_it:
            response.set_exception(e)

        obs = weak_observation()
        observation_took_it = app_request.opt.observe is not None and obs is not None
        if observation_took_it:
            obs.error(e)

        if not (response_took_it or observation_took_it):
            # should be unreachable
            log.error(
                "Exception in BlockwiseRequest runner neither went to response nor to observation: %s",
                e,
                exc_info=e,
            )
780 # This is a class method because that allows self and self.observation to
781 # be freed even when this task is running, and the task to stop itself --
782 # otherwise we couldn't know when users just "forget" about a request
783 # object after using its response (esp. in observe cases) and leave this
784 # task running.
@classmethod
async def _run(cls, app_request, response, weak_observation, protocol, log):
    """Perform the Block1 upload, then assemble the Block2 response.

    :param app_request: the request to send; its remote is populated here
        and replaced by the remote of each received response.
    :param response: future-like object that receives the assembled
        response through ``set_result``.
    :param weak_observation: callable returning the user-facing observation
        or None (a weak reference or a stand-in for one).
    :param protocol: context used to send the individual block requests.
    :param log: logger for warnings and debug output.
    :raises error.UnexpectedBlock1Option: if the server's Block1 answers do
        not match the blocks that were sent.

    If the response to the final request carries an Observe option, this
    coroutine stays around running :meth:`_run_observation` until that
    ends.
    """
    # we need to populate the remote right away, because the choice of
    # blocks depends on it.
    await protocol.find_remote_and_interface(app_request)

    size_exp = app_request.remote.maximum_block_size_exp

    if app_request.opt.block1 is not None:
        warnings.warn(
            "Setting a block1 option in a managed block-wise transfer is deprecated. Instead, set request.remote.maximum_block_size_exp to the desired value",
            DeprecationWarning,
            stacklevel=2,
        )
        assert (
            app_request.opt.block1.block_number == 0
        ), "Unexpected block number in app_request"
        assert (
            not app_request.opt.block1.more
        ), "Unexpected more-flag in app_request"
        # this is where the library user can traditionally pass in size
        # exponent hints into the library.
        size_exp = app_request.opt.block1.size_exponent

    # Offset in the message in blocks of size_exp. Whoever changes size_exp
    # is responsible for updating this number.
    block_cursor = 0

    while True:
        # ... send a chunk

        if size_exp >= 6:
            # FIXME from maximum_payload_size
            fragmentation_threshold = app_request.remote.maximum_payload_size
        else:
            fragmentation_threshold = 2 ** (size_exp + 4)

        if (
            app_request.opt.block1 is not None
            or len(app_request.payload) > fragmentation_threshold
        ):
            current_block1 = app_request._extract_block(
                block_cursor, size_exp, app_request.remote.maximum_payload_size
            )
            if block_cursor == 0:
                current_block1.opt.size1 = len(app_request.payload)
        else:
            current_block1 = app_request

        blockrequest = protocol.request(current_block1, handle_blockwise=False)
        blockresponse = await blockrequest.response

        # store for future blocks to ensure that the next blocks will be
        # sent from the same source address (in the UDP case; for many
        # other transports it won't matter). carrying along locally set block size limitation
        if (
            app_request.remote.maximum_block_size_exp
            < blockresponse.remote.maximum_block_size_exp
        ):
            blockresponse.remote.maximum_block_size_exp = (
                app_request.remote.maximum_block_size_exp
            )
        app_request.remote = blockresponse.remote

        if blockresponse.opt.block1 is None:
            if blockresponse.code.is_successful() and current_block1.opt.block1:
                log.warning(
                    "Block1 option completely ignored by server, assuming it knows what it is doing."
                )
            # FIXME: handle 4.13 and retry with the indicated size option
            break

        block1 = blockresponse.opt.block1
        log.debug(
            "Response with Block1 option received, number = %d, more = %d, size_exp = %d.",
            block1.block_number,
            block1.more,
            block1.size_exponent,
        )

        if block1.block_number != current_block1.opt.block1.block_number:
            raise error.UnexpectedBlock1Option("Block number mismatch")

        if size_exp == 7:
            block_cursor += len(current_block1.payload) // 1024
        else:
            block_cursor += 1

        # The server may ask for smaller blocks; rescale the cursor to match.
        while block1.size_exponent < size_exp:
            block_cursor *= 2
            size_exp -= 1

        if not current_block1.opt.block1.more:
            if block1.more or blockresponse.code == CONTINUE:
                # treating this as a protocol error -- letting it slip
                # through would misrepresent the whole operation as an
                # over-all 2.xx (successful) one.
                raise error.UnexpectedBlock1Option(
                    "Server asked for more data at end of body"
                )
            break

        # checks before preparing the next round:

        # Compared against None because 0 is a valid Observe value too.
        if blockresponse.opt.observe is not None:
            # we're not *really* interested in that block, we just sent an
            # observe option to indicate that we'll want to observe the
            # resulting representation as a whole
            log.warning(
                "Server answered Observe in early Block1 phase, cancelling the erroneous observation."
            )
            # The attribute is called `observation`; it is None when the
            # block request was not created with Observe set to 0.
            early_observation = blockrequest.observation
            if early_observation is not None:
                early_observation.cancel()

        # FIXME i think my own server is doing this wrong
        # if block1.more and response.code != CONTINUE:
        #     raise error.UnexpectedBlock1Option("more-flag set but no Continue")
        if not block1.more and not blockresponse.code.is_successful():
            break
        # otherwise: ignoring (discarding) the successful intermediate
        # result, waiting for a final one

    lower_observation = None
    if app_request.opt.observe is not None:
        if blockresponse.opt.observe is not None:
            lower_observation = blockrequest.observation
        else:
            obs = weak_observation()
            if obs:
                obs.error(error.NotObservable())
            del obs

    assert blockresponse is not None, "Block1 loop broke without setting a response"
    blockresponse.opt.block1 = None

    # FIXME check with RFC7959: it just says "send requests similar to the
    # requests in the Block1 phase", what does that mean? using the last
    # block1 as a reference for now, especially because in the
    # only-one-request-block case, that's the original request we must send
    # again and again anyway
    assembled_response = await cls._complete_by_requesting_block2(
        protocol, current_block1, blockresponse, log
    )

    response.set_result(assembled_response)
    # finally set the result

    if lower_observation is not None:
        # FIXME this can all be simplified a lot since it's no more
        # expected that observations shut themselves down when GC'd.
        obs = weak_observation()
        del weak_observation
        if obs is None:
            lower_observation.cancel()
            return
        future_weak_observation = protocol.loop.create_future()  # packing this up because its destroy callback needs to reference the subtask
        subtask = asyncio.create_task(
            cls._run_observation(
                app_request,
                lower_observation,
                future_weak_observation,
                protocol,
                log,
            ),
            name="Blockwise observation for %r" % app_request,
        )
        future_weak_observation.set_result(
            weakref.ref(obs, lambda obs: subtask.cancel())
        )
        obs.on_cancel(subtask.cancel)
        del obs
        await subtask
@classmethod
async def _run_observation(
    cls, original_request, lower_observation, future_weak_observation, protocol, log
):
    """Forward notifications from ``lower_observation`` to the user-facing
    observation, completing each one block-wise first.

    ``future_weak_observation`` resolves to a callable returning the
    user-facing observation. Exceptions other than cancellation are
    reported to that observation; on every exit path the lower observation
    is cancelled unless it already is.
    """
    get_observation = await future_weak_observation
    # we can use get_observation() here at any time, because whenever that
    # becomes None, this task gets cancelled
    try:
        async for notification in lower_observation:
            log.debug("Notification received")
            completed = await cls._complete_by_requesting_block2(
                protocol, original_request, notification, log
            )
            log.debug("Reporting completed notification")
            get_observation().callback(completed)
        # FIXME verify that this loop actually ends iff the observation
        # was cancelled -- otherwise find out the cause(s) or make it not
        # cancel under indistinguishable circumstances
        get_observation().error(error.ObservationCancelled())
    except asyncio.CancelledError:
        return
    except Exception as e:
        get_observation().error(e)
    finally:
        # We generally avoid idempotent cancellation, but we may have
        # reached this point either due to an earlier cancellation or
        # without one
        still_active = not lower_observation.cancelled
        if still_active:
            lower_observation.cancel()
@classmethod
async def _complete_by_requesting_block2(
    cls, protocol, request_to_repeat, initial_response, log
):
    """Assemble a full response by fetching the remaining Block2 parts.

    If ``initial_response`` carries no Block2 option, or its Block2 option
    indicates that no more blocks follow, the option is cleared and the
    response is returned as is. Otherwise follow-up requests derived from
    ``request_to_repeat`` are sent through ``protocol`` (addressed to the
    remote of ``initial_response``) and their responses are appended to
    ``initial_response`` until a block without the "more" flag arrives.

    Raises ``error.UnexpectedBlock2`` if the initial response is not block
    number 0; errors raised while appending a block are logged and
    re-raised.
    """
    # FIXME this can probably be deduplicated against BlockwiseRequest

    first_block2 = initial_response.opt.block2
    if first_block2 is None or first_block2.more is False:
        initial_response.opt.block2 = None
        return initial_response

    if first_block2.block_number != 0:
        log.error("Error assembling blockwise response (expected first block)")
        raise error.UnexpectedBlock2()

    assembled_response = initial_response
    while True:
        next_request = request_to_repeat._generate_next_block2_request(
            assembled_response
        )
        # Follow-up blocks are requested from whoever sent the first one.
        next_request = next_request.copy(remote=initial_response.remote)

        pending = protocol.request(next_request, handle_blockwise=False)
        block_response = await pending.response

        received = block_response.opt.block2
        if received is None:
            log.warning(
                "Server sent non-blockwise response after having started a blockwise transfer. Blockwise transfer cancelled, accepting single response."
            )
            return block_response

        log.debug(
            "Response with Block2 option received, number = %d, more = %d, size_exp = %d.",
            received.block_number,
            received.more,
            received.size_exponent,
        )
        try:
            assembled_response._append_response_block(block_response)
        except error.Error as exc:
            log.error("Error assembling blockwise response, passing on error %r", exc)
            raise

        if received.more is False:
            return assembled_response
class ClientObservation:
    """An interface to observe notification updates arriving on a request.

    This class does not actually provide any of the observe functionality, it
    is purely a container for dispatching the messages via asynchronous
    iteration. It gets driven (ie. populated with responses or errors including
    observation termination) by a Request object.
    """

    def __init__(self):
        # Listeners for responses and for errors. Both are replaced by None
        # when the observation is cancelled.
        self.callbacks = []
        self.errbacks = []

        self.cancelled = False
        # Zero-argument callables that cancel() runs, last registered first.
        self._on_cancel = []

        # Most recent response; replayed to callbacks that register later.
        self._latest_response = None
        # the analogous error is stored in _cancellation_reason when cancelled.

    def __aiter__(self):
        """`async for` interface to observations.

        This is the preferred interface to obtaining observations."""
        it = self._Iterator()
        self.register_callback(it.push, _suppress_deprecation=True)
        self.register_errback(it.push_err, _suppress_deprecation=True)
        return it

    class _Iterator:
        """Asynchronous iterator holding at most one pending item.

        Items and errors are pushed in by the observation; a push replaces
        whatever was stored before and not yet consumed.
        """

        def __init__(self):
            self._future = asyncio.get_event_loop().create_future()

        def push(self, item):
            """Store ``item`` as the next result."""
            if self._future.done():
                # we don't care whether we overwrite anything, this is a lossy queue as observe is lossy
                self._future = asyncio.get_event_loop().create_future()
            self._future.set_result(item)

        def push_err(self, e):
            """Store the exception ``e`` as the next outcome."""
            if self._future.done():
                self._future = asyncio.get_event_loop().create_future()
            self._future.set_exception(e)

        async def __anext__(self):
            f = self._future
            try:
                result = await self._future
                # FIXME see `await servobs._trigger` comment: might waiting for
                # the original future not yield the first future's result when
                # a quick second future comes in in a push?
                if f is self._future:
                    # Nothing newer was pushed meanwhile: start waiting for
                    # the next item on a fresh future.
                    self._future = asyncio.get_event_loop().create_future()
                return result
            except (error.NotObservable, error.ObservationCancelled):
                # only exit cleanly when the server -- right away or later --
                # states that the resource is not observable any more
                # FIXME: check whether an unsuccessful message is still passed
                # as an observation result (or whether it should be)
                raise StopAsyncIteration

        def __del__(self):
            if self._future.done():
                try:
                    # Fetch the result so any errors show up at least in the
                    # finalizer output
                    self._future.result()
                except (error.ObservationCancelled, error.NotObservable):
                    # This is the case at the end of an observation cancelled
                    # by the server.
                    pass
                except error.NetworkError:
                    # This will already have shown up in the main result too.
                    pass
                except (error.LibraryShutdown, asyncio.CancelledError):
                    pass
                # Anything else flying out of this is unexpected and probably a
                # library error

    # When this function is removed, we can finally do cleanup better. Right
    # now, someone could register a callback that doesn't hold any references,
    # so we can't just stop the request when nobody holds a reference to this
    # any more. Once we're all in pull mode, we can make the `process` function
    # that sends data in here use a weak reference (because any possible
    # recipient would need to hold a reference to self or the iterator, and
    # thus _run).
    def register_callback(self, callback, _suppress_deprecation=False):
        """Call the callback whenever a response to the message comes in, and
        pass the response to it.

        If a response was already received, the callback is immediately
        called with the latest one. On a cancelled observation, the callback
        is silently dropped.

        The use of this function is deprecated: Use the asynchronous iteration
        interface instead."""
        if not _suppress_deprecation:
            warnings.warn(
                "register_callback on observe results is deprected: Use `async for notify in request.observation` instead.",
                DeprecationWarning,
                stacklevel=2,
            )
        if self.cancelled:
            return

        self.callbacks.append(callback)
        if self._latest_response is not None:
            callback(self._latest_response)

    def register_errback(self, callback, _suppress_deprecation=False):
        """Call the callback whenever something goes wrong with the
        observation, and pass an exception to the callback. After such a
        callback is called, no more callbacks will be issued.

        On a cancelled observation, the callback is immediately called with
        the stored cancellation reason (which may be None) and not kept.

        The use of this function is deprecated: Use the asynchronous iteration
        interface instead."""
        if not _suppress_deprecation:
            warnings.warn(
                "register_errback on observe results is deprected: Use `async for notify in request.observation` instead.",
                DeprecationWarning,
                stacklevel=2,
            )
        if self.cancelled:
            callback(self._cancellation_reason)
            return
        self.errbacks.append(callback)

    def callback(self, response):
        """Notify all listeners of an incoming response"""

        self._latest_response = response

        for c in self.callbacks:
            c(response)

    def error(self, exception):
        """Notify registered listeners that the observation went wrong. This
        can only be called once.

        Raises RuntimeError (chained to ``exception``) when the observation
        has already been cancelled."""

        if self.errbacks is None:
            raise RuntimeError(
                "Error raised in an already cancelled ClientObservation"
            ) from exception
        for c in self.errbacks:
            c(exception)

        self.cancel()
        # cancel() resets the reason to None, so it is stored afterwards.
        self._cancellation_reason = exception

    def cancel(self):
        # FIXME determine whether this is called by anything other than error,
        # and make it private so there is always a _cancellation_reason
        """Cease to generate observation or error events. This will not
        generate an error by itself.

        This function is only needed while register_callback and
        register_errback are around; once their deprecations are acted on,
        dropping the asynchronous iterator will automatically cancel the
        observation.
        """

        assert not self.cancelled, "ClientObservation cancelled twice"

        # make sure things go wrong when someone tries to continue this
        self.errbacks = None
        self.callbacks = None

        self.cancelled = True
        while self._on_cancel:
            self._on_cancel.pop()()

        self._cancellation_reason = None

    def on_cancel(self, callback):
        """Run ``callback`` (without arguments) once the observation is
        cancelled.

        If the observation is already cancelled, the callback is run right
        away and not stored: the stored callbacks are only ever run by
        cancel(), which can not happen a second time, so keeping it would
        merely hold a reference that is never used.
        """
        if self.cancelled:
            callback()
            return
        self._on_cancel.append(callback)

    def __repr__(self):
        return "<%s %s at %#x>" % (
            type(self).__name__,
            "(cancelled)"
            if self.cancelled
            else "(%s call-, %s errback(s))"
            % (len(self.callbacks), len(self.errbacks)),
            id(self),
        )
class ServerObservation:
    """Server-side handle of an observation.

    It records whether the observation was accepted, whether it was
    deregistered, and holds the future through which updated responses are
    handed over.
    """

    def __init__(self):
        self._accepted = False
        # A deregistration is "early" if it happens before the response message
        # is actually sent; calling deregister() in that time (typically during
        # `render()`) will not send an unsuccessful response message but just
        # set this flag which is set to None as soon as it is too late for an
        # early deregistration.
        # This mechanism is temporary until more of aiocoap behaves like
        # Pipe which does not suffer from this limitation.
        self._early_deregister = False
        self._late_deregister = False
        # Completed by trigger() with the response to send (or None).
        self._trigger = asyncio.get_event_loop().create_future()

    def accept(self, cancellation_callback):
        """Mark the observation as accepted and store
        ``cancellation_callback``."""
        self._cancellation_callback = cancellation_callback
        self._accepted = True

    def deregister(self, reason=None):
        """Deregister the observation.

        While the early-deregistration flag is still False, this only sets
        it. Otherwise a DeprecationWarning is issued and an Internal Server
        Error response is triggered. ``reason`` is accepted but not used.
        """
        if self._early_deregister is not False:
            warnings.warn(
                "Late use of ServerObservation.deregister() is"
                " deprecated, use .trigger with an unsuccessful value"
                " instead",
                DeprecationWarning,
            )
            failure = Message(
                code=INTERNAL_SERVER_ERROR, payload=b"Resource became unobservable"
            )
            self.trigger(failure)
            return

        self._early_deregister = True

    def trigger(self, response=None, *, is_last=False):
        """Send an updated response; if None is given, the observed resource's
        rendering will be invoked to produce one.

        `is_last` can be set to True to indicate that no more responses will be
        sent. Note that an unsuccessful response will be the last no matter
        what is_last says, as such a message always terminates a CoAP
        observation."""
        if is_last:
            self._late_deregister = True

        pending = self._trigger
        if pending.done():
            # A value is still stored there; replacing it is fine, as this is
            # a lossy queue just like observe itself is lossy.
            pending = self._trigger = asyncio.get_event_loop().create_future()
        pending.set_result(response)