Coverage for aiocoap / messagemanager.py: 83%
230 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 12:28 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 12:28 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5"""This module contains all internals needed to manage messages on unreliable
6transports, ie. everything that deals in message types or Message IDs.
8Currently, it also provides the mechanisms for managing tokens, but those will
9be split into dedicated classes.
10"""
12import asyncio
13import functools
14import random
15from typing import Callable, Dict, List, Tuple, Optional
17from . import error
18from . import interfaces
19from .interfaces import EndpointAddress
20from .message import Message
21from .numbers.types import CON, ACK, RST, NON
22from .numbers.codes import EMPTY
25class MessageManager(interfaces.TokenInterface, interfaces.MessageManager):
26 """This MessageManager Drives a message interface following the rules of
27 RFC7252 CoAP over UDP.
29 It takes care of picking message IDs (mid) for outgoing messages,
30 retransmitting CON messages, and to react appropriately to incoming
31 messages' type, sending ACKs either immediately or later.
33 It creates piggy-backed responses by keeping an eye on the tokens the
34 messages are sent with, but otherwise ignores the tokens. (It inspects
35 tokens *only* where required by its sub-layer).
36 """
38 message_interface: interfaces.MessageInterface
39 # needs to be set post-construction, because the message_interface in its constructor already needs to get its manager
41 def __init__(self, token_manager) -> None:
42 self.token_manager = token_manager
44 self.message_id = random.randint(0, 65535)
45 #: Tracker of recently received messages (by remote and message ID).
46 #: Maps them to a response message when one is already known.
47 self._recent_messages: Dict[Tuple[EndpointAddress, int], Optional[Message]] = {}
48 #: Active exchanges i.e. sent CON messages (remote, message-id):
49 #: (messageerror_monitor monitor, cancellable timeout)
50 self._active_exchanges: Dict[
51 Tuple[EndpointAddress, int], Tuple[Callable[[], None], asyncio.Handle]
52 ] = {}
53 #: Per-remote list of (backlogged package, messageerror_monitor)
54 #: tuples (keys exist iff there is an active_exchange with that node)
55 self._backlogs: Dict[
56 EndpointAddress, List[Tuple[Message, Callable[[], None]]]
57 ] = {}
59 #: Maps pending remote/token combinations to the MID a response can be
60 #: piggybacked on, and the timeout that should be cancelled if it is.
61 self._piggyback_opportunities: Dict[
62 Tuple[EndpointAddress, bytes], Tuple[int, asyncio.TimerHandle]
63 ] = {}
65 self.log = token_manager.log
66 self.loop = token_manager.loop
68 def __repr__(self):
69 return "<%s for %s>" % (
70 type(self).__name__,
71 getattr(self, "message_interface", "(unbound)"),
72 )
74 @property
75 def client_credentials(self):
76 return self.token_manager.client_credentials
78 async def shutdown(self):
79 for messageerror_monitor, cancellable in self._active_exchanges.values():
80 # Not calling messageerror_monitor: This is not message specific,
81 # and its shutdown will take care of these things
82 cancellable.cancel()
83 self._active_exchanges = None
85 await self.message_interface.shutdown()
87 #
88 # implementing the MessageManager interface
89 #
91 def dispatch_message(self, message):
92 """Feed a message through the message-id, message-type and message-code
93 sublayers of CoAP"""
95 self.log.debug("Incoming message %r", message)
96 if message.code.is_request():
97 # Responses don't get deduplication because they "are idempotent or
98 # can be handled in an idempotent fashion" (RFC 7252 Section 4.5).
99 # This means that a separate response may get a RST when it is
100 # arrives at the aiocoap client twice. Note that this does not
101 # impede the operation of observations: Their token is still active
102 # so they are ACK'd, and deduplication based on observation numbers
103 # filters out the rest.
104 #
105 # This saves memory, and allows stateful transports to be shut down
106 # expeditiously unless kept alive by something else (otherwise,
107 # they'd linger for EXCHANGE_LIFETIME with no good reason).
108 if self._deduplicate_message(message) is True:
109 return
111 if message.mtype in (ACK, RST):
112 self._remove_exchange(message)
114 if message.code is EMPTY and message.mtype is CON:
115 self._process_ping(message)
116 elif message.code is EMPTY and message.mtype in (ACK, RST):
117 pass # empty ack has already been handled above
118 elif message.code.is_request() and message.mtype in (CON, NON):
119 # the request handler will have to deal with sending ACK itself, as
120 # it might be timeout-related
121 self._process_request(message)
122 elif message.code.is_response() and message.mtype in (CON, NON, ACK):
123 success = self._process_response(message)
124 if success:
125 if message.mtype is CON:
126 self._send_empty_ack(
127 message.remote,
128 message.mid,
129 reason="acknowledging incoming response",
130 )
131 else:
132 # A peer mustn't send a CON to multicast, but if a malicious
133 # peer does, we better not answer
134 if message.mtype == CON and not message.remote.is_multicast_locally:
135 self.log.info("Response not recognized - sending RST.")
136 rst = Message(_mtype=RST, _mid=message.mid, code=EMPTY, payload="")
137 rst.remote = message.remote.as_response_address()
138 self._send_initially(rst)
139 else:
140 self.log.info(
141 "Ignoring unknown response (which is not a unicast CON)"
142 )
143 else:
144 self.log.warning(
145 "Received a message with code %s and type %s (those don't fit) from %s, ignoring it.",
146 message.code,
147 message.mtype,
148 message.remote,
149 )
151 def dispatch_error(self, error, remote):
152 if self._active_exchanges is None:
153 # Not entirely sure where it is so far; better just raise a warning
154 # than an exception later, nothing terminally bad should come of
155 # this error.
156 self.log.warning(
157 "Internal shutdown sequence mismatch: error dispatched through messagemanager after shutown"
158 )
159 return
161 self.log.debug("Incoming error %s from %r", error, remote)
163 # cancel requests first, and then exchanges: cancelling the pending
164 # exchange would trigger enqueued requests to be transmitted
165 self.token_manager.dispatch_error(error, remote)
167 keys_for_removal = []
168 for key, (
169 messageerror_monitor,
170 cancellable_timeout,
171 ) in self._active_exchanges.items():
172 (exchange_remote, message_id) = key
173 if remote == exchange_remote:
174 keys_for_removal.append(key)
175 for k in keys_for_removal:
176 (messageerror_monitor, cancellable_timeout) = self._active_exchanges.pop(k)
177 cancellable_timeout.cancel()
178 # not triggering the messageerror_monitor: that already got the
179 # clue from the token manager
180 self._backlogs.pop(remote, ())
181 # while that's an iterable over messages and messageerror monitors, not
182 # triggering them either for th esame reason as above
184 #
185 # coap dispatch, message-id sublayer: duplicate handling
186 #
188 def _deduplicate_message(self, message):
189 """Return True if a message is a duplicate, and re-send the stored
190 response if available.
192 Duplicate is a message with the same Message ID (mid) and sender
193 (remote), as message received within last EXCHANGE_LIFETIME seconds
194 (usually 247 seconds)."""
196 key = (message.remote, message.mid)
197 if key in self._recent_messages:
198 if message.mtype is CON:
199 if self._recent_messages[key] is not None:
200 self.log.info("Duplicate CON received, sending old response again")
201 # not going via send_message because that would strip the
202 # mid and might do all other sorts of checks
203 self._send_initially(self._recent_messages[key])
204 else:
205 self.log.info("Duplicate CON received, no response to send yet")
206 else:
207 self.log.info("Duplicate NON, ACK or RST received")
208 return True
209 else:
210 self.log.debug("New unique message received")
211 self.loop.call_later(
212 message.transport_tuning.EXCHANGE_LIFETIME,
213 functools.partial(self._recent_messages.pop, key),
214 )
215 self._recent_messages[key] = None
216 return False
218 def _store_response_for_duplicates(self, message):
219 """If the message is the response can be used to satisfy a future
220 duplicate message, store it."""
222 key = (message.remote, message.mid)
223 if key in self._recent_messages:
224 self._recent_messages[key] = message
226 #
227 # coap dispatch, message-type sublayer: retransmission handling
228 #
230 def _add_exchange(self, message, messageerror_monitor):
231 """Add an "exchange" for outgoing CON message.
233 CON (Confirmable) messages are automatically retransmitted by protocol
234 until ACK or RST message with the same Message ID is received from
235 target host."""
237 key = (message.remote, message.mid)
239 if message.remote not in self._backlogs:
240 self._backlogs[message.remote] = []
242 timeout = random.uniform(
243 message.transport_tuning.ACK_TIMEOUT,
244 message.transport_tuning.ACK_TIMEOUT
245 * message.transport_tuning.ACK_RANDOM_FACTOR,
246 )
248 next_retransmission = self._schedule_retransmit(message, timeout, 0)
249 self._active_exchanges[key] = (messageerror_monitor, next_retransmission)
251 self.log.debug("Exchange added, message ID: %d.", message.mid)
253 def _remove_exchange(self, message):
254 """Remove exchange from active exchanges and cancel the timeout to next
255 retransmission."""
256 key = (message.remote, message.mid)
258 if key not in self._active_exchanges:
259 # Before turning this up to a warning, consider https://github.com/chrysn/aiocoap/issues/288
260 self.log.info(
261 "Received %s from %s, but could not match it to a running exchange.",
262 message.mtype,
263 message.remote,
264 )
265 return
267 messageerror_monitor, next_retransmission = self._active_exchanges.pop(key)
268 next_retransmission.cancel()
269 if message.mtype is RST:
270 messageerror_monitor()
271 self.log.debug("Exchange removed, message ID: %d.", message.mid)
273 self._continue_backlog(message.remote)
275 def _continue_backlog(self, remote):
276 """After an exchange has been removed, start working off the backlog or
277 clear it completely."""
279 if remote not in self._backlogs:
280 # if active exchanges were something we could do a
281 # .register_finally() on, we could chain them like that; if we
282 # implemented anything but NSTART=1, we'll need a more elaborate
283 # system anyway
284 raise AssertionError(
285 "backlogs/active_exchange relation violated (implementation error)"
286 )
288 # first iteration is sure to happen, others happen only if the enqueued
289 # messages were NONs
290 while not any(r == remote for r, mid in self._active_exchanges.keys()):
291 if self._backlogs[remote] != []:
292 next_message, messageerror_monitor = self._backlogs[remote].pop(0)
293 self._send_initially(next_message, messageerror_monitor)
294 else:
295 del self._backlogs[remote]
296 break
298 def _schedule_retransmit(self, message, timeout, retransmission_counter):
299 """Create and return a call_later for first or subsequent
300 retransmissions."""
302 # while this could just as well be done in a lambda or with the
303 # arguments passed to call_later, in this form makes the test cases
304 # easier to debug (it's about finding where references to a Context
305 # are kept around; contexts should be able to shut down in an orderly
306 # way without littering references in the loop)
308 def retr(
309 self=self,
310 message=message,
311 timeout=timeout,
312 retransmission_counter=retransmission_counter,
313 doc="If you read this, have a look at _schedule_retransmit",
314 id=object(),
315 ):
316 self._retransmit(message, timeout, retransmission_counter)
318 return self.loop.call_later(timeout, retr)
320 def _retransmit(self, message, timeout, retransmission_counter):
321 """Retransmit CON message that has not been ACKed or RSTed."""
322 key = (message.remote, message.mid)
324 messageerror_monitor, next_retransmission = self._active_exchanges.pop(key)
325 # this should be a no-op, but let's be sure
326 next_retransmission.cancel()
328 if retransmission_counter < message.transport_tuning.MAX_RETRANSMIT:
329 self.log.info("Retransmission, Message ID: %d.", message.mid)
330 self._send_via_transport(message)
331 retransmission_counter += 1
332 timeout *= 2
334 next_retransmission = self._schedule_retransmit(
335 message, timeout, retransmission_counter
336 )
337 self._active_exchanges[key] = (messageerror_monitor, next_retransmission)
338 else:
339 self.log.info("Exchange timed out trying to transmit %s", message)
340 del self._backlogs[message.remote]
341 self.token_manager.dispatch_error(
342 error.ConRetransmitsExceeded("Retransmissions exceeded"), message.remote
343 )
345 #
346 # coap dispatch, message-code sublayer: triggering custom actions based on incoming messages
347 #
349 def _process_ping(self, message):
350 self.log.info("Received CoAP Ping from %s, replying with RST.", message.remote)
351 rst = Message(_mtype=RST, _mid=message.mid, code=EMPTY, payload=b"")
352 rst.remote = message.remote.as_response_address()
353 # not going via send_message because that would strip the mid, and we
354 # already know that it can go straight to the wire
355 self._send_initially(rst)
357 def _process_request(self, request):
358 """Spawn a responder for an incoming request, or feed a long-running
359 responder if one exists."""
361 if request.mtype == CON:
363 def on_timeout(self, remote, token):
364 mid, own_timeout = self._piggyback_opportunities.pop((remote, token))
365 self._send_empty_ack(
366 request.remote, mid, "Response took too long to prepare"
367 )
369 handle = self.loop.call_later(
370 request.transport_tuning.EMPTY_ACK_DELAY,
371 on_timeout,
372 self,
373 request.remote,
374 request.token,
375 )
376 key = (request.remote, request.token)
377 if key in self._piggyback_opportunities:
378 self.log.warning(
379 "New request came in while old request not"
380 " ACKed yet. Possible mismatch between EMPTY_ACK_DELAY"
381 " and EXCHANGE_LIFETIME. Cancelling ACK to ward off any"
382 " further confusion."
383 )
384 mid, old_handle = self._piggyback_opportunities.pop(key)
385 old_handle.cancel()
386 self._piggyback_opportunities[key] = (request.mid, handle)
388 self.token_manager.process_request(request)
390 def _process_response(self, response):
391 """Feed a response back to whatever might expect it.
393 Returns True if the response was expected (and should be ACK'd
394 depending on mtype), and False if it was not expected (and should be
395 RST'd)."""
397 self.log.debug("Received Response: %r", response)
399 return self.token_manager.process_response(response)
401 #
402 # outgoing messages
403 #
405 async def fill_or_recognize_remote(self, message):
406 if message.remote is not None:
407 if await self.message_interface.recognize_remote(message.remote):
408 return True
409 remote = await self.message_interface.determine_remote(message)
410 if remote is not None:
411 message.remote = remote
412 return True
413 return False
415 def send_message(self, message, messageerror_monitor):
416 """Encode and send message. This takes care of retransmissions (if
417 CON), message IDs and rate limiting, but does not hook any events to
418 responses. (Use the :class:`Request` class or responding resources
419 instead; those are the typical callers of this function.)
421 If notification about the progress of the exchange is required, an
422 ExchangeMonitor can be passed in, which will receive the appropriate
423 callbacks."""
425 if message.mid is not None:
426 # if you can give any reason why the application should provide a
427 # fixed mid, lower the log level on demand and provide the reason
428 # in a comment.
429 self.log.warning(
430 "Message ID set on to-be-sent message, this is"
431 " probably unintended; clearing it."
432 )
433 message.mid = None
435 if message.code.is_response():
436 no_response = (message.opt.no_response or 0) & (
437 1 << message.code.class_ - 1
438 ) != 0
440 piggyback_key = (message.remote, message.token)
441 if piggyback_key in self._piggyback_opportunities:
442 mid, handle = self._piggyback_opportunities.pop(piggyback_key)
443 handle.cancel()
445 if no_response:
446 new_message = Message(code=EMPTY, mid=mid, mtype=ACK)
447 new_message.remote = message.remote.as_response_address()
448 message = new_message
449 self.log.debug(
450 "Turning to-be-sent message into an empty ACK due to no_response option."
451 )
452 else:
453 message.mtype = ACK
454 message.mid = mid
455 else:
456 if no_response:
457 self.log.debug(
458 "Stopping message in message manager as it is no_response and no ACK is pending."
459 )
460 return
462 message.opt.no_response = None
464 if message.mtype is None:
465 if self._active_exchanges is None:
466 # during shutdown, this is all we can do
467 message.mtype = NON
468 else:
469 if message.remote.is_multicast:
470 message.mtype = NON
471 else:
472 # All forcing factors are now accounted for -- either value
473 # is now OK (and if the transport_tuning doesn't tell us,
474 # we'll use sensible defaults)
476 match message.transport_tuning.reliability:
477 case True:
478 message.mtype = CON
479 case False:
480 message.mtype = NON
481 case None:
482 if (
483 message.request is not None
484 and message.request.mtype is NON
485 ):
486 message.mtype = NON
487 else:
488 message.mtype = CON
489 else:
490 if self._active_exchanges is None:
491 self.log.warning(
492 "Forcing message to be sent as NON even though specified because transport is shutting down"
493 )
494 message.mtype = NON
496 if message.mtype == CON and message.remote.is_multicast:
497 raise error.ConToMulticast
499 if message.mid is None:
500 message.mid = self._next_message_id()
502 if message.mtype == CON and message.remote in self._backlogs:
503 assert any(
504 remote == message.remote for (remote, _) in self._active_exchanges
505 )
506 self.log.debug("Message to %s put into backlog", message.remote)
507 self._backlogs[message.remote].append((message, messageerror_monitor))
508 else:
509 self._send_initially(message, messageerror_monitor)
511 def _send_initially(self, message, messageerror_monitor=None):
512 """Put the message on the wire for the first time, starting retransmission timeouts"""
514 self.log.debug("Sending message %r", message)
516 if message.mtype is CON:
517 assert messageerror_monitor is not None, (
518 "messageerror_monitor needs to be set for CONs"
519 )
520 self._add_exchange(message, messageerror_monitor)
522 self._store_response_for_duplicates(message)
524 self._send_via_transport(message)
526 def _send_via_transport(self, message):
527 """Put the message on the wire"""
529 self.message_interface.send(message)
531 def _next_message_id(self):
532 """Reserve and return a new message ID."""
533 message_id = self.message_id
534 self.message_id = 0xFFFF & (1 + self.message_id)
535 return message_id
537 def _send_empty_ack(self, remote, mid, reason):
538 """Send separate empty ACK for any reason.
540 Currently, this can happen only once per responder, that is, when the
541 last block1 has been transferred and the first block2 is not ready
542 yet."""
544 self.log.debug("Sending empty ACK: %s", reason)
545 ack = Message(
546 _mtype=ACK,
547 code=EMPTY,
548 payload=b"",
549 )
550 ack.remote = remote.as_response_address()
551 ack.mid = mid
552 # not going via send_message because that would strip the mid, and we
553 # already know that it can go straight to the wire
554 self._send_initially(ack)