Coverage for src/aiocoap/messagemanager.py: 0%
222 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-13 12:14 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-13 12:14 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5"""This module contains all internals needed to manage messages on unreliable
6transports, ie. everything that deals in message types or Message IDs.
8Currently, it also provides the mechanisms for managing tokens, but those will
9be split into dedicated classes.
10"""
12import asyncio
13import functools
14import random
15from typing import Callable, Dict, List, Tuple, Optional
17from . import error
18from . import interfaces
19from .interfaces import EndpointAddress
20from .message import Message
21from .numbers.types import CON, ACK, RST, NON
22from .numbers.codes import EMPTY
25class MessageManager(interfaces.TokenInterface, interfaces.MessageManager):
26 """This MessageManager Drives a message interface following the rules of
27 RFC7252 CoAP over UDP.
29 It takes care of picking message IDs (mid) for outgoing messages,
30 retransmitting CON messages, and to react appropriately to incoming
31 messages' type, sending ACKs either immediately or later.
33 It creates piggy-backed responses by keeping an eye on the tokens the
34 messages are sent with, but otherwise ignores the tokens. (It inspects
35 tokens *only* where required by its sub-layer).
36 """
38 def __init__(self, token_manager) -> None:
39 self.token_manager = token_manager
41 self.message_id = random.randint(0, 65535)
42 #: Tracker of recently received messages (by remote and message ID).
43 #: Maps them to a response message when one is already known.
44 self._recent_messages: Dict[Tuple[EndpointAddress, int], Optional[Message]] = {}
45 #: Active exchanges i.e. sent CON messages (remote, message-id):
46 #: (messageerror_monitor monitor, cancellable timeout)
47 self._active_exchanges: Dict[
48 Tuple[EndpointAddress, int], Tuple[Callable[[], None], asyncio.Handle]
49 ] = {}
50 #: Per-remote list of (backlogged package, messageerror_monitor)
51 #: tuples (keys exist iff there is an active_exchange with that node)
52 self._backlogs: Dict[
53 EndpointAddress, List[Tuple[Message, Callable[[], None]]]
54 ] = {}
56 #: Maps pending remote/token combinations to the MID a response can be
57 #: piggybacked on, and the timeout that should be cancelled if it is.
58 self._piggyback_opportunities: Dict[
59 Tuple[EndpointAddress, bytes], Tuple[int, asyncio.TimerHandle]
60 ] = {}
62 self.log = token_manager.log
63 self.loop = token_manager.loop
65 # self.message_interface = … -- needs to be set post-construction, because the message_interface in its constructor already needs to get its manager
67 def __repr__(self):
68 return "<%s for %s>" % (
69 type(self).__name__,
70 getattr(self, "message_interface", "(unbound)"),
71 )
73 @property
74 def client_credentials(self):
75 return self.token_manager.client_credentials
77 async def shutdown(self):
78 for messageerror_monitor, cancellable in self._active_exchanges.values():
79 # Not calling messageerror_monitor: This is not message specific,
80 # and its shutdown will take care of these things
81 cancellable.cancel()
82 self._active_exchanges = None
84 await self.message_interface.shutdown()
86 #
87 # implementing the MessageManager interface
88 #
90 def dispatch_message(self, message):
91 """Feed a message through the message-id, message-type and message-code
92 sublayers of CoAP"""
94 self.log.debug("Incoming message %r", message)
95 if message.code.is_request():
96 # Responses don't get deduplication because they "are idempotent or
97 # can be handled in an idempotent fashion" (RFC 7252 Section 4.5).
98 # This means that a separate response may get a RST when it is
99 # arrives at the aiocoap client twice. Note that this does not
100 # impede the operation of observations: Their token is still active
101 # so they are ACK'd, and deduplication based on observation numbers
102 # filters out the rest.
103 #
104 # This saves memory, and allows stateful transports to be shut down
105 # expeditiously unless kept alive by something else (otherwise,
106 # they'd linger for EXCHANGE_LIFETIME with no good reason).
107 if self._deduplicate_message(message) is True:
108 return
110 if message.mtype in (ACK, RST):
111 self._remove_exchange(message)
113 if message.code is EMPTY and message.mtype is CON:
114 self._process_ping(message)
115 elif message.code is EMPTY and message.mtype in (ACK, RST):
116 pass # empty ack has already been handled above
117 elif message.code.is_request() and message.mtype in (CON, NON):
118 # the request handler will have to deal with sending ACK itself, as
119 # it might be timeout-related
120 self._process_request(message)
121 elif message.code.is_response() and message.mtype in (CON, NON, ACK):
122 success = self._process_response(message)
123 if success:
124 if message.mtype is CON:
125 self._send_empty_ack(
126 message.remote,
127 message.mid,
128 reason="acknowledging incoming response",
129 )
130 else:
131 # A peer mustn't send a CON to multicast, but if a malicious
132 # peer does, we better not answer
133 if message.mtype == CON and not message.remote.is_multicast_locally:
134 self.log.info("Response not recognized - sending RST.")
135 rst = Message(mtype=RST, mid=message.mid, code=EMPTY, payload="")
136 rst.remote = message.remote.as_response_address()
137 self._send_initially(rst)
138 else:
139 self.log.info(
140 "Ignoring unknown response (which is not a unicast CON)"
141 )
142 else:
143 self.log.warning(
144 "Received a message with code %s and type %s (those don't fit) from %s, ignoring it.",
145 message.code,
146 message.mtype,
147 message.remote,
148 )
150 def dispatch_error(self, error, remote):
151 if self._active_exchanges is None:
152 # Not entirely sure where it is so far; better just raise a warning
153 # than an exception later, nothing terminally bad should come of
154 # this error.
155 self.log.warning(
156 "Internal shutdown sequence mismatch: error dispatched through messagemanager after shutown"
157 )
158 return
160 self.log.debug("Incoming error %s from %r", error, remote)
162 # cancel requests first, and then exchanges: cancelling the pending
163 # exchange would trigger enqueued requests to be transmitted
164 self.token_manager.dispatch_error(error, remote)
166 keys_for_removal = []
167 for key, (
168 messageerror_monitor,
169 cancellable_timeout,
170 ) in self._active_exchanges.items():
171 (exchange_remote, message_id) = key
172 if remote == exchange_remote:
173 keys_for_removal.append(key)
174 for k in keys_for_removal:
175 (messageerror_monitor, cancellable_timeout) = self._active_exchanges.pop(k)
176 cancellable_timeout.cancel()
177 # not triggering the messageerror_monitor: that already got the
178 # clue from the token manager
179 self._backlogs.pop(remote, ())
180 # while that's an iterable over messages and messageerror monitors, not
181 # triggering them either for th esame reason as above
183 #
184 # coap dispatch, message-id sublayer: duplicate handling
185 #
187 def _deduplicate_message(self, message):
188 """Return True if a message is a duplicate, and re-send the stored
189 response if available.
191 Duplicate is a message with the same Message ID (mid) and sender
192 (remote), as message received within last EXCHANGE_LIFETIME seconds
193 (usually 247 seconds)."""
195 key = (message.remote, message.mid)
196 if key in self._recent_messages:
197 if message.mtype is CON:
198 if self._recent_messages[key] is not None:
199 self.log.info("Duplicate CON received, sending old response again")
200 # not going via send_message because that would strip the
201 # mid and might do all other sorts of checks
202 self._send_initially(self._recent_messages[key])
203 else:
204 self.log.info("Duplicate CON received, no response to send yet")
205 else:
206 self.log.info("Duplicate NON, ACK or RST received")
207 return True
208 else:
209 self.log.debug("New unique message received")
210 self.loop.call_later(
211 message.transport_tuning.EXCHANGE_LIFETIME,
212 functools.partial(self._recent_messages.pop, key),
213 )
214 self._recent_messages[key] = None
215 return False
217 def _store_response_for_duplicates(self, message):
218 """If the message is the response can be used to satisfy a future
219 duplicate message, store it."""
221 key = (message.remote, message.mid)
222 if key in self._recent_messages:
223 self._recent_messages[key] = message
225 #
226 # coap dispatch, message-type sublayer: retransmission handling
227 #
229 def _add_exchange(self, message, messageerror_monitor):
230 """Add an "exchange" for outgoing CON message.
232 CON (Confirmable) messages are automatically retransmitted by protocol
233 until ACK or RST message with the same Message ID is received from
234 target host."""
236 key = (message.remote, message.mid)
238 if message.remote not in self._backlogs:
239 self._backlogs[message.remote] = []
241 timeout = random.uniform(
242 message.transport_tuning.ACK_TIMEOUT,
243 message.transport_tuning.ACK_TIMEOUT
244 * message.transport_tuning.ACK_RANDOM_FACTOR,
245 )
247 next_retransmission = self._schedule_retransmit(message, timeout, 0)
248 self._active_exchanges[key] = (messageerror_monitor, next_retransmission)
250 self.log.debug("Exchange added, message ID: %d.", message.mid)
252 def _remove_exchange(self, message):
253 """Remove exchange from active exchanges and cancel the timeout to next
254 retransmission."""
255 key = (message.remote, message.mid)
257 if key not in self._active_exchanges:
258 # Before turning this up to a warning, consider https://github.com/chrysn/aiocoap/issues/288
259 self.log.info(
260 "Received %s from %s, but could not match it to a running exchange.",
261 message.mtype,
262 message.remote,
263 )
264 return
266 messageerror_monitor, next_retransmission = self._active_exchanges.pop(key)
267 next_retransmission.cancel()
268 if message.mtype is RST:
269 messageerror_monitor()
270 self.log.debug("Exchange removed, message ID: %d.", message.mid)
272 self._continue_backlog(message.remote)
274 def _continue_backlog(self, remote):
275 """After an exchange has been removed, start working off the backlog or
276 clear it completely."""
278 if remote not in self._backlogs:
279 # if active exchanges were something we could do a
280 # .register_finally() on, we could chain them like that; if we
281 # implemented anything but NSTART=1, we'll need a more elaborate
282 # system anyway
283 raise AssertionError(
284 "backlogs/active_exchange relation violated (implementation error)"
285 )
287 # first iteration is sure to happen, others happen only if the enqueued
288 # messages were NONs
289 while not any(r == remote for r, mid in self._active_exchanges.keys()):
290 if self._backlogs[remote] != []:
291 next_message, messageerror_monitor = self._backlogs[remote].pop(0)
292 self._send_initially(next_message, messageerror_monitor)
293 else:
294 del self._backlogs[remote]
295 break
297 def _schedule_retransmit(self, message, timeout, retransmission_counter):
298 """Create and return a call_later for first or subsequent
299 retransmissions."""
301 # while this could just as well be done in a lambda or with the
302 # arguments passed to call_later, in this form makes the test cases
303 # easier to debug (it's about finding where references to a Context
304 # are kept around; contexts should be able to shut down in an orderly
305 # way without littering references in the loop)
307 def retr(
308 self=self,
309 message=message,
310 timeout=timeout,
311 retransmission_counter=retransmission_counter,
312 doc="If you read this, have a look at _schedule_retransmit",
313 id=object(),
314 ):
315 self._retransmit(message, timeout, retransmission_counter)
317 return self.loop.call_later(timeout, retr)
319 def _retransmit(self, message, timeout, retransmission_counter):
320 """Retransmit CON message that has not been ACKed or RSTed."""
321 key = (message.remote, message.mid)
323 messageerror_monitor, next_retransmission = self._active_exchanges.pop(key)
324 # this should be a no-op, but let's be sure
325 next_retransmission.cancel()
327 if retransmission_counter < message.transport_tuning.MAX_RETRANSMIT:
328 self.log.info("Retransmission, Message ID: %d.", message.mid)
329 self._send_via_transport(message)
330 retransmission_counter += 1
331 timeout *= 2
333 next_retransmission = self._schedule_retransmit(
334 message, timeout, retransmission_counter
335 )
336 self._active_exchanges[key] = (messageerror_monitor, next_retransmission)
337 else:
338 self.log.info("Exchange timed out trying to transmit %s", message)
339 del self._backlogs[message.remote]
340 self.token_manager.dispatch_error(
341 error.ConRetransmitsExceeded("Retransmissions exceeded"), message.remote
342 )
344 #
345 # coap dispatch, message-code sublayer: triggering custom actions based on incoming messages
346 #
348 def _process_ping(self, message):
349 self.log.info("Received CoAP Ping from %s, replying with RST.", message.remote)
350 rst = Message(mtype=RST, mid=message.mid, code=EMPTY, payload=b"")
351 rst.remote = message.remote.as_response_address()
352 # not going via send_message because that would strip the mid, and we
353 # already know that it can go straight to the wire
354 self._send_initially(rst)
356 def _process_request(self, request):
357 """Spawn a responder for an incoming request, or feed a long-running
358 responder if one exists."""
360 if request.mtype == CON:
362 def on_timeout(self, remote, token):
363 mid, own_timeout = self._piggyback_opportunities.pop((remote, token))
364 self._send_empty_ack(
365 request.remote, mid, "Response took too long to prepare"
366 )
368 handle = self.loop.call_later(
369 request.transport_tuning.EMPTY_ACK_DELAY,
370 on_timeout,
371 self,
372 request.remote,
373 request.token,
374 )
375 key = (request.remote, request.token)
376 if key in self._piggyback_opportunities:
377 self.log.warning(
378 "New request came in while old request not"
379 " ACKed yet. Possible mismatch between EMPTY_ACK_DELAY"
380 " and EXCHANGE_LIFETIME. Cancelling ACK to ward off any"
381 " further confusion."
382 )
383 mid, old_handle = self._piggyback_opportunities.pop(key)
384 old_handle.cancel()
385 self._piggyback_opportunities[key] = (request.mid, handle)
387 self.token_manager.process_request(request)
389 def _process_response(self, response):
390 """Feed a response back to whatever might expect it.
392 Returns True if the response was expected (and should be ACK'd
393 depending on mtype), and False if it was not expected (and should be
394 RST'd)."""
396 self.log.debug("Received Response: %r", response)
398 return self.token_manager.process_response(response)
400 #
401 # outgoing messages
402 #
404 async def fill_or_recognize_remote(self, message):
405 if message.remote is not None:
406 if await self.message_interface.recognize_remote(message.remote):
407 return True
408 remote = await self.message_interface.determine_remote(message)
409 if remote is not None:
410 message.remote = remote
411 return True
412 return False
414 def send_message(self, message, messageerror_monitor):
415 """Encode and send message. This takes care of retransmissions (if
416 CON), message IDs and rate limiting, but does not hook any events to
417 responses. (Use the :class:`Request` class or responding resources
418 instead; those are the typical callers of this function.)
420 If notification about the progress of the exchange is required, an
421 ExchangeMonitor can be passed in, which will receive the appropriate
422 callbacks."""
424 if message.mid is not None:
425 # if you can give any reason why the application should provide a
426 # fixed mid, lower the log level on demand and provide the reason
427 # in a comment.
428 self.log.warning(
429 "Message ID set on to-be-sent message, this is"
430 " probably unintended; clearing it."
431 )
432 message.mid = None
434 if message.code.is_response():
435 no_response = (message.opt.no_response or 0) & (
436 1 << message.code.class_ - 1
437 ) != 0
439 piggyback_key = (message.remote, message.token)
440 if piggyback_key in self._piggyback_opportunities:
441 mid, handle = self._piggyback_opportunities.pop(piggyback_key)
442 handle.cancel()
444 if no_response:
445 new_message = Message(code=EMPTY, mid=mid, mtype=ACK)
446 new_message.remote = message.remote.as_response_address()
447 message = new_message
448 self.log.debug(
449 "Turning to-be-sent message into an empty ACK due to no_response option."
450 )
451 else:
452 message.mtype = ACK
453 message.mid = mid
454 else:
455 if no_response:
456 self.log.debug(
457 "Stopping message in message manager as it is no_response and no ACK is pending."
458 )
459 return
461 message.opt.no_response = None
463 if message.mtype is None:
464 if self._active_exchanges is None:
465 # during shutdown, this is all we can do
466 message.mtype = NON
467 else:
468 if message.remote.is_multicast:
469 message.mtype = NON
470 else:
471 # FIXME: on responses, this should take the request into
472 # consideration (cf. RFC7252 Section 5.2.3, answer to NON
473 # SHOULD be NON)
474 message.mtype = CON
475 else:
476 if self._active_exchanges is None:
477 self.log.warning(
478 "Forcing message to be sent as NON even though specified because transport is shutting down"
479 )
480 message.mtype = NON
482 if message.mtype == CON and message.remote.is_multicast:
483 raise ValueError("Refusing to send CON message to multicast address")
485 if message.mid is None:
486 message.mid = self._next_message_id()
488 if message.mtype == CON and message.remote in self._backlogs:
489 assert any(
490 remote == message.remote for (remote, _) in self._active_exchanges
491 )
492 self.log.debug("Message to %s put into backlog", message.remote)
493 self._backlogs[message.remote].append((message, messageerror_monitor))
494 else:
495 self._send_initially(message, messageerror_monitor)
497 def _send_initially(self, message, messageerror_monitor=None):
498 """Put the message on the wire for the first time, starting retransmission timeouts"""
500 self.log.debug("Sending message %r", message)
502 if message.mtype is CON:
503 assert messageerror_monitor is not None, (
504 "messageerror_monitor needs to be set for CONs"
505 )
506 self._add_exchange(message, messageerror_monitor)
508 self._store_response_for_duplicates(message)
510 self._send_via_transport(message)
512 def _send_via_transport(self, message):
513 """Put the message on the wire"""
515 self.message_interface.send(message)
517 def _next_message_id(self):
518 """Reserve and return a new message ID."""
519 message_id = self.message_id
520 self.message_id = 0xFFFF & (1 + self.message_id)
521 return message_id
523 def _send_empty_ack(self, remote, mid, reason):
524 """Send separate empty ACK for any reason.
526 Currently, this can happen only once per responder, that is, when the
527 last block1 has been transferred and the first block2 is not ready
528 yet."""
530 self.log.debug("Sending empty ACK: %s", reason)
531 ack = Message(
532 mtype=ACK,
533 code=EMPTY,
534 payload=b"",
535 )
536 ack.remote = remote.as_response_address()
537 ack.mid = mid
538 # not going via send_message because that would strip the mid, and we
539 # already know that it can go straight to the wire
540 self._send_initially(ack)