Coverage for aiocoap / messagemanager.py: 83%
225 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-29 12:32 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-29 12:32 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5"""This module contains all internals needed to manage messages on unreliable
6transports, i.e. everything that deals in message types or Message IDs.
8Currently, it also provides the mechanisms for managing tokens, but those will
9be split into dedicated classes.
10"""
12import asyncio
13import functools
14import random
15from typing import Callable, Dict, List, Tuple, Optional
17from . import error
18from . import interfaces
19from .interfaces import EndpointAddress
20from .message import Message
21from .numbers.types import CON, ACK, RST, NON
22from .numbers.codes import EMPTY
25class MessageManager(interfaces.TokenInterface, interfaces.MessageManager):
26 """This MessageManager Drives a message interface following the rules of
27 RFC7252 CoAP over UDP.
29 It takes care of picking message IDs (mid) for outgoing messages,
30 retransmitting CON messages, and to react appropriately to incoming
31 messages' type, sending ACKs either immediately or later.
33 It creates piggy-backed responses by keeping an eye on the tokens the
34 messages are sent with, but otherwise ignores the tokens. (It inspects
35 tokens *only* where required by its sub-layer).
36 """
38 message_interface: interfaces.MessageInterface
39 # needs to be set post-construction, because the message_interface in its constructor already needs to get its manager
41 def __init__(self, token_manager) -> None:
42 self.token_manager = token_manager
44 self.message_id = random.randint(0, 65535)
45 #: Tracker of recently received messages (by remote and message ID).
46 #: Maps them to a response message when one is already known.
47 self._recent_messages: Dict[Tuple[EndpointAddress, int], Optional[Message]] = {}
48 #: Active exchanges i.e. sent CON messages (remote, message-id):
49 #: (messageerror_monitor monitor, cancellable timeout)
50 self._active_exchanges: Dict[
51 Tuple[EndpointAddress, int], Tuple[Callable[[], None], asyncio.Handle]
52 ] = {}
53 #: Per-remote list of (backlogged package, messageerror_monitor)
54 #: tuples (keys exist iff there is an active_exchange with that node)
55 self._backlogs: Dict[
56 EndpointAddress, List[Tuple[Message, Callable[[], None]]]
57 ] = {}
59 #: Maps pending remote/token combinations to the MID a response can be
60 #: piggybacked on, and the timeout that should be cancelled if it is.
61 self._piggyback_opportunities: Dict[
62 Tuple[EndpointAddress, bytes], Tuple[int, asyncio.TimerHandle]
63 ] = {}
65 self.log = token_manager.log
66 self.loop = token_manager.loop
68 def __repr__(self):
69 return "<%s for %s>" % (
70 type(self).__name__,
71 getattr(self, "message_interface", "(unbound)"),
72 )
74 @property
75 def client_credentials(self):
76 return self.token_manager.client_credentials
78 async def shutdown(self):
79 for messageerror_monitor, cancellable in self._active_exchanges.values():
80 # Not calling messageerror_monitor: This is not message specific,
81 # and its shutdown will take care of these things
82 cancellable.cancel()
83 self._active_exchanges = None
85 await self.message_interface.shutdown()
87 #
88 # implementing the MessageManager interface
89 #
91 def dispatch_message(self, message):
92 """Feed a message through the message-id, message-type and message-code
93 sublayers of CoAP"""
95 self.log.debug("Incoming message %r", message)
96 if message.code.is_request():
97 # Responses don't get deduplication because they "are idempotent or
98 # can be handled in an idempotent fashion" (RFC 7252 Section 4.5).
99 # This means that a separate response may get a RST when it is
100 # arrives at the aiocoap client twice. Note that this does not
101 # impede the operation of observations: Their token is still active
102 # so they are ACK'd, and deduplication based on observation numbers
103 # filters out the rest.
104 #
105 # This saves memory, and allows stateful transports to be shut down
106 # expeditiously unless kept alive by something else (otherwise,
107 # they'd linger for EXCHANGE_LIFETIME with no good reason).
108 if self._deduplicate_message(message) is True:
109 return
111 if message.mtype in (ACK, RST):
112 self._remove_exchange(message)
114 if message.code is EMPTY and message.mtype is CON:
115 self._process_ping(message)
116 elif message.code is EMPTY and message.mtype in (ACK, RST):
117 pass # empty ack has already been handled above
118 elif message.code.is_request() and message.mtype in (CON, NON):
119 # the request handler will have to deal with sending ACK itself, as
120 # it might be timeout-related
121 self._process_request(message)
122 elif message.code.is_response() and message.mtype in (CON, NON, ACK):
123 success = self._process_response(message)
124 if success:
125 if message.mtype is CON:
126 self._send_empty_ack(
127 message.remote,
128 message.mid,
129 reason="acknowledging incoming response",
130 )
131 else:
132 # A peer mustn't send a CON to multicast, but if a malicious
133 # peer does, we better not answer
134 if message.mtype == CON and not message.remote.is_multicast_locally:
135 self.log.info("Response not recognized - sending RST.")
136 rst = Message(_mtype=RST, _mid=message.mid, code=EMPTY, payload="")
137 rst.remote = message.remote.as_response_address()
138 self._send_initially(rst)
139 else:
140 self.log.info(
141 "Ignoring unknown response (which is not a unicast CON)"
142 )
143 else:
144 self.log.warning(
145 "Received a message with code %s and type %s (those don't fit) from %s, ignoring it.",
146 message.code,
147 message.mtype,
148 message.remote,
149 )
151 def dispatch_error(self, error, remote):
152 if self._active_exchanges is None:
153 # Not entirely sure where it is so far; better just raise a warning
154 # than an exception later, nothing terminally bad should come of
155 # this error.
156 self.log.warning(
157 "Internal shutdown sequence mismatch: error dispatched through messagemanager after shutown"
158 )
159 return
161 self.log.debug("Incoming error %s from %r", error, remote)
163 # cancel requests first, and then exchanges: cancelling the pending
164 # exchange would trigger enqueued requests to be transmitted
165 self.token_manager.dispatch_error(error, remote)
167 keys_for_removal = []
168 for key, (
169 messageerror_monitor,
170 cancellable_timeout,
171 ) in self._active_exchanges.items():
172 (exchange_remote, message_id) = key
173 if remote == exchange_remote:
174 keys_for_removal.append(key)
175 for k in keys_for_removal:
176 (messageerror_monitor, cancellable_timeout) = self._active_exchanges.pop(k)
177 cancellable_timeout.cancel()
178 # not triggering the messageerror_monitor: that already got the
179 # clue from the token manager
180 self._backlogs.pop(remote, ())
181 # while that's an iterable over messages and messageerror monitors, not
182 # triggering them either for th esame reason as above
184 #
185 # coap dispatch, message-id sublayer: duplicate handling
186 #
188 def _deduplicate_message(self, message):
189 """Return True if a message is a duplicate, and re-send the stored
190 response if available.
192 Duplicate is a message with the same Message ID (mid) and sender
193 (remote), as message received within last EXCHANGE_LIFETIME seconds
194 (usually 247 seconds)."""
196 key = (message.remote, message.mid)
197 if key in self._recent_messages:
198 if message.mtype is CON:
199 if self._recent_messages[key] is not None:
200 self.log.info("Duplicate CON received, sending old response again")
201 # not going via send_message because that would strip the
202 # mid and might do all other sorts of checks
203 self._send_initially(self._recent_messages[key])
204 else:
205 self.log.info("Duplicate CON received, no response to send yet")
206 else:
207 self.log.info("Duplicate NON, ACK or RST received")
208 return True
209 else:
210 self.log.debug("New unique message received")
211 self.loop.call_later(
212 message.transport_tuning.EXCHANGE_LIFETIME,
213 functools.partial(self._recent_messages.pop, key),
214 )
215 self._recent_messages[key] = None
216 return False
218 def _store_response_for_duplicates(self, message):
219 """If the message is the response can be used to satisfy a future
220 duplicate message, store it."""
222 key = (message.remote, message.mid)
223 if key in self._recent_messages:
224 self._recent_messages[key] = message
226 #
227 # coap dispatch, message-type sublayer: retransmission handling
228 #
230 def _add_exchange(self, message, messageerror_monitor):
231 """Add an "exchange" for outgoing CON message.
233 CON (Confirmable) messages are automatically retransmitted by protocol
234 until ACK or RST message with the same Message ID is received from
235 target host."""
237 key = (message.remote, message.mid)
239 if message.remote not in self._backlogs:
240 self._backlogs[message.remote] = []
242 timeout = random.uniform(
243 message.transport_tuning.ACK_TIMEOUT,
244 message.transport_tuning.ACK_TIMEOUT
245 * message.transport_tuning.ACK_RANDOM_FACTOR,
246 )
248 next_retransmission = self._schedule_retransmit(message, timeout, 0)
249 self._active_exchanges[key] = (messageerror_monitor, next_retransmission)
251 self.log.debug("Exchange added, message ID: %d.", message.mid)
253 def _remove_exchange(self, message):
254 """Remove exchange from active exchanges and cancel the timeout to next
255 retransmission."""
256 key = (message.remote, message.mid)
258 if key not in self._active_exchanges:
259 # Before turning this up to a warning, consider https://github.com/chrysn/aiocoap/issues/288
260 self.log.info(
261 "Received %s from %s, but could not match it to a running exchange.",
262 message.mtype,
263 message.remote,
264 )
265 return
267 messageerror_monitor, next_retransmission = self._active_exchanges.pop(key)
268 next_retransmission.cancel()
269 if message.mtype is RST:
270 messageerror_monitor()
271 self.log.debug("Exchange removed, message ID: %d.", message.mid)
273 self._continue_backlog(message.remote)
275 def _continue_backlog(self, remote):
276 """After an exchange has been removed, start working off the backlog or
277 clear it completely."""
279 if remote not in self._backlogs:
280 # if active exchanges were something we could do a
281 # .register_finally() on, we could chain them like that; if we
282 # implemented anything but NSTART=1, we'll need a more elaborate
283 # system anyway
284 raise AssertionError(
285 "backlogs/active_exchange relation violated (implementation error)"
286 )
288 # first iteration is sure to happen, others happen only if the enqueued
289 # messages were NONs
290 while not any(r == remote for r, mid in self._active_exchanges.keys()):
291 if self._backlogs[remote] != []:
292 next_message, messageerror_monitor = self._backlogs[remote].pop(0)
293 self._send_initially(next_message, messageerror_monitor)
294 else:
295 del self._backlogs[remote]
296 break
298 def _schedule_retransmit(self, message, timeout, retransmission_counter):
299 """Create and return a call_later for first or subsequent
300 retransmissions."""
302 # while this could just as well be done in a lambda or with the
303 # arguments passed to call_later, in this form makes the test cases
304 # easier to debug (it's about finding where references to a Context
305 # are kept around; contexts should be able to shut down in an orderly
306 # way without littering references in the loop)
308 def retr(
309 self=self,
310 message=message,
311 timeout=timeout,
312 retransmission_counter=retransmission_counter,
313 doc="If you read this, have a look at _schedule_retransmit",
314 id=object(),
315 ):
316 self._retransmit(message, timeout, retransmission_counter)
318 return self.loop.call_later(timeout, retr)
320 def _retransmit(self, message, timeout, retransmission_counter):
321 """Retransmit CON message that has not been ACKed or RSTed."""
322 key = (message.remote, message.mid)
324 messageerror_monitor, next_retransmission = self._active_exchanges.pop(key)
325 # this should be a no-op, but let's be sure
326 next_retransmission.cancel()
328 if retransmission_counter < message.transport_tuning.MAX_RETRANSMIT:
329 self.log.info("Retransmission, Message ID: %d.", message.mid)
330 self._send_via_transport(message)
331 retransmission_counter += 1
332 timeout *= 2
334 next_retransmission = self._schedule_retransmit(
335 message, timeout, retransmission_counter
336 )
337 self._active_exchanges[key] = (messageerror_monitor, next_retransmission)
338 else:
339 self.log.info("Exchange timed out trying to transmit %s", message)
340 del self._backlogs[message.remote]
341 self.token_manager.dispatch_error(
342 error.ConRetransmitsExceeded("Retransmissions exceeded"), message.remote
343 )
345 #
346 # coap dispatch, message-code sublayer: triggering custom actions based on incoming messages
347 #
349 def _process_ping(self, message):
350 self.log.info("Received CoAP Ping from %s, replying with RST.", message.remote)
351 rst = Message(_mtype=RST, _mid=message.mid, code=EMPTY, payload=b"")
352 rst.remote = message.remote.as_response_address()
353 # not going via send_message because that would strip the mid, and we
354 # already know that it can go straight to the wire
355 self._send_initially(rst)
357 def _process_request(self, request):
358 """Spawn a responder for an incoming request, or feed a long-running
359 responder if one exists."""
361 if request.mtype == CON:
363 def on_timeout(self, remote, token):
364 mid, own_timeout = self._piggyback_opportunities.pop((remote, token))
365 self._send_empty_ack(
366 request.remote, mid, "Response took too long to prepare"
367 )
369 handle = self.loop.call_later(
370 request.transport_tuning.EMPTY_ACK_DELAY,
371 on_timeout,
372 self,
373 request.remote,
374 request.token,
375 )
376 key = (request.remote, request.token)
377 if key in self._piggyback_opportunities:
378 self.log.warning(
379 "New request came in while old request not"
380 " ACKed yet. Possible mismatch between EMPTY_ACK_DELAY"
381 " and EXCHANGE_LIFETIME. Cancelling ACK to ward off any"
382 " further confusion."
383 )
384 mid, old_handle = self._piggyback_opportunities.pop(key)
385 old_handle.cancel()
386 self._piggyback_opportunities[key] = (request.mid, handle)
388 self.token_manager.process_request(request)
390 def _process_response(self, response):
391 """Feed a response back to whatever might expect it.
393 Returns True if the response was expected (and should be ACK'd
394 depending on mtype), and False if it was not expected (and should be
395 RST'd)."""
397 self.log.debug("Received Response: %r", response)
399 return self.token_manager.process_response(response)
401 #
402 # outgoing messages
403 #
405 async def recognize_remote(self, message):
406 return await self.message_interface.recognize_remote(message.remote)
408 async def determine_remote(self, message):
409 return await self.message_interface.determine_remote(message)
411 def send_message(self, message, messageerror_monitor):
412 """Encode and send message. This takes care of retransmissions (if
413 CON), message IDs and rate limiting, but does not hook any events to
414 responses. (Use the :class:`Request` class or responding resources
415 instead; those are the typical callers of this function.)
417 If notification about the progress of the exchange is required, an
418 ExchangeMonitor can be passed in, which will receive the appropriate
419 callbacks."""
421 if message.mid is not None:
422 # if you can give any reason why the application should provide a
423 # fixed mid, lower the log level on demand and provide the reason
424 # in a comment.
425 self.log.warning(
426 "Message ID set on to-be-sent message, this is"
427 " probably unintended; clearing it."
428 )
429 message.mid = None
431 if message.code.is_response():
432 no_response = (message.opt.no_response or 0) & (
433 1 << message.code.class_ - 1
434 ) != 0
436 piggyback_key = (message.remote, message.token)
437 if piggyback_key in self._piggyback_opportunities:
438 mid, handle = self._piggyback_opportunities.pop(piggyback_key)
439 handle.cancel()
441 if no_response:
442 new_message = Message(code=EMPTY, mid=mid, mtype=ACK)
443 new_message.remote = message.remote.as_response_address()
444 message = new_message
445 self.log.debug(
446 "Turning to-be-sent message into an empty ACK due to no_response option."
447 )
448 else:
449 message.mtype = ACK
450 message.mid = mid
451 else:
452 if no_response:
453 self.log.debug(
454 "Stopping message in message manager as it is no_response and no ACK is pending."
455 )
456 return
458 message.opt.no_response = None
460 if message.mtype is None:
461 if self._active_exchanges is None:
462 # during shutdown, this is all we can do
463 message.mtype = NON
464 else:
465 if message.remote.is_multicast:
466 message.mtype = NON
467 else:
468 # All forcing factors are now accounted for -- either value
469 # is now OK (and if the transport_tuning doesn't tell us,
470 # we'll use sensible defaults)
472 match message.transport_tuning.reliability:
473 case True:
474 message.mtype = CON
475 case False:
476 message.mtype = NON
477 case None:
478 if (
479 message.request is not None
480 and message.request.mtype is NON
481 ):
482 message.mtype = NON
483 else:
484 message.mtype = CON
485 else:
486 if self._active_exchanges is None:
487 self.log.warning(
488 "Forcing message to be sent as NON even though specified because transport is shutting down"
489 )
490 message.mtype = NON
492 if message.mtype == CON and message.remote.is_multicast:
493 raise error.ConToMulticast
495 if message.mid is None:
496 message.mid = self._next_message_id()
498 if message.mtype == CON and message.remote in self._backlogs:
499 assert any(
500 remote == message.remote for (remote, _) in self._active_exchanges
501 )
502 self.log.debug("Message to %s put into backlog", message.remote)
503 self._backlogs[message.remote].append((message, messageerror_monitor))
504 else:
505 self._send_initially(message, messageerror_monitor)
507 def _send_initially(self, message, messageerror_monitor=None):
508 """Put the message on the wire for the first time, starting retransmission timeouts"""
510 self.log.debug("Sending message %r", message)
512 if message.mtype is CON:
513 assert messageerror_monitor is not None, (
514 "messageerror_monitor needs to be set for CONs"
515 )
516 self._add_exchange(message, messageerror_monitor)
518 self._store_response_for_duplicates(message)
520 self._send_via_transport(message)
522 def _send_via_transport(self, message):
523 """Put the message on the wire"""
525 self.message_interface.send(message)
527 def _next_message_id(self):
528 """Reserve and return a new message ID."""
529 message_id = self.message_id
530 self.message_id = 0xFFFF & (1 + self.message_id)
531 return message_id
533 def _send_empty_ack(self, remote, mid, reason):
534 """Send separate empty ACK for any reason.
536 Currently, this can happen only once per responder, that is, when the
537 last block1 has been transferred and the first block2 is not ready
538 yet."""
540 self.log.debug("Sending empty ACK: %s", reason)
541 ack = Message(
542 _mtype=ACK,
543 code=EMPTY,
544 payload=b"",
545 )
546 ack.remote = remote.as_response_address()
547 ack.mid = mid
548 # not going via send_message because that would strip the mid, and we
549 # already know that it can go straight to the wire
550 self._send_initially(ack)