Coverage for aiocoap / messagemanager.py: 83%

225 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-29 12:32 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5"""This module contains all internals needed to manage messages on unreliable 

6transports, ie. everything that deals in message types or Message IDs. 

7 

8Currently, it also provides the mechanisms for managing tokens, but those will 

9be split into dedicated classes. 

10""" 

11 

12import asyncio 

13import functools 

14import random 

15from typing import Callable, Dict, List, Tuple, Optional 

16 

17from . import error 

18from . import interfaces 

19from .interfaces import EndpointAddress 

20from .message import Message 

21from .numbers.types import CON, ACK, RST, NON 

22from .numbers.codes import EMPTY 

23 

24 

25class MessageManager(interfaces.TokenInterface, interfaces.MessageManager): 

26 """This MessageManager Drives a message interface following the rules of 

27 RFC7252 CoAP over UDP. 

28 

29 It takes care of picking message IDs (mid) for outgoing messages, 

30 retransmitting CON messages, and to react appropriately to incoming 

31 messages' type, sending ACKs either immediately or later. 

32 

33 It creates piggy-backed responses by keeping an eye on the tokens the 

34 messages are sent with, but otherwise ignores the tokens. (It inspects 

35 tokens *only* where required by its sub-layer). 

36 """ 

37 

38 message_interface: interfaces.MessageInterface 

39 # needs to be set post-construction, because the message_interface in its constructor already needs to get its manager 

40 

41 def __init__(self, token_manager) -> None: 

42 self.token_manager = token_manager 

43 

44 self.message_id = random.randint(0, 65535) 

45 #: Tracker of recently received messages (by remote and message ID). 

46 #: Maps them to a response message when one is already known. 

47 self._recent_messages: Dict[Tuple[EndpointAddress, int], Optional[Message]] = {} 

48 #: Active exchanges i.e. sent CON messages (remote, message-id): 

49 #: (messageerror_monitor monitor, cancellable timeout) 

50 self._active_exchanges: Dict[ 

51 Tuple[EndpointAddress, int], Tuple[Callable[[], None], asyncio.Handle] 

52 ] = {} 

53 #: Per-remote list of (backlogged package, messageerror_monitor) 

54 #: tuples (keys exist iff there is an active_exchange with that node) 

55 self._backlogs: Dict[ 

56 EndpointAddress, List[Tuple[Message, Callable[[], None]]] 

57 ] = {} 

58 

59 #: Maps pending remote/token combinations to the MID a response can be 

60 #: piggybacked on, and the timeout that should be cancelled if it is. 

61 self._piggyback_opportunities: Dict[ 

62 Tuple[EndpointAddress, bytes], Tuple[int, asyncio.TimerHandle] 

63 ] = {} 

64 

65 self.log = token_manager.log 

66 self.loop = token_manager.loop 

67 

68 def __repr__(self): 

69 return "<%s for %s>" % ( 

70 type(self).__name__, 

71 getattr(self, "message_interface", "(unbound)"), 

72 ) 

73 

74 @property 

75 def client_credentials(self): 

76 return self.token_manager.client_credentials 

77 

78 async def shutdown(self): 

79 for messageerror_monitor, cancellable in self._active_exchanges.values(): 

80 # Not calling messageerror_monitor: This is not message specific, 

81 # and its shutdown will take care of these things 

82 cancellable.cancel() 

83 self._active_exchanges = None 

84 

85 await self.message_interface.shutdown() 

86 

87 # 

88 # implementing the MessageManager interface 

89 # 

90 

91 def dispatch_message(self, message): 

92 """Feed a message through the message-id, message-type and message-code 

93 sublayers of CoAP""" 

94 

95 self.log.debug("Incoming message %r", message) 

96 if message.code.is_request(): 

97 # Responses don't get deduplication because they "are idempotent or 

98 # can be handled in an idempotent fashion" (RFC 7252 Section 4.5). 

99 # This means that a separate response may get a RST when it is 

100 # arrives at the aiocoap client twice. Note that this does not 

101 # impede the operation of observations: Their token is still active 

102 # so they are ACK'd, and deduplication based on observation numbers 

103 # filters out the rest. 

104 # 

105 # This saves memory, and allows stateful transports to be shut down 

106 # expeditiously unless kept alive by something else (otherwise, 

107 # they'd linger for EXCHANGE_LIFETIME with no good reason). 

108 if self._deduplicate_message(message) is True: 

109 return 

110 

111 if message.mtype in (ACK, RST): 

112 self._remove_exchange(message) 

113 

114 if message.code is EMPTY and message.mtype is CON: 

115 self._process_ping(message) 

116 elif message.code is EMPTY and message.mtype in (ACK, RST): 

117 pass # empty ack has already been handled above 

118 elif message.code.is_request() and message.mtype in (CON, NON): 

119 # the request handler will have to deal with sending ACK itself, as 

120 # it might be timeout-related 

121 self._process_request(message) 

122 elif message.code.is_response() and message.mtype in (CON, NON, ACK): 

123 success = self._process_response(message) 

124 if success: 

125 if message.mtype is CON: 

126 self._send_empty_ack( 

127 message.remote, 

128 message.mid, 

129 reason="acknowledging incoming response", 

130 ) 

131 else: 

132 # A peer mustn't send a CON to multicast, but if a malicious 

133 # peer does, we better not answer 

134 if message.mtype == CON and not message.remote.is_multicast_locally: 

135 self.log.info("Response not recognized - sending RST.") 

136 rst = Message(_mtype=RST, _mid=message.mid, code=EMPTY, payload="") 

137 rst.remote = message.remote.as_response_address() 

138 self._send_initially(rst) 

139 else: 

140 self.log.info( 

141 "Ignoring unknown response (which is not a unicast CON)" 

142 ) 

143 else: 

144 self.log.warning( 

145 "Received a message with code %s and type %s (those don't fit) from %s, ignoring it.", 

146 message.code, 

147 message.mtype, 

148 message.remote, 

149 ) 

150 

151 def dispatch_error(self, error, remote): 

152 if self._active_exchanges is None: 

153 # Not entirely sure where it is so far; better just raise a warning 

154 # than an exception later, nothing terminally bad should come of 

155 # this error. 

156 self.log.warning( 

157 "Internal shutdown sequence mismatch: error dispatched through messagemanager after shutown" 

158 ) 

159 return 

160 

161 self.log.debug("Incoming error %s from %r", error, remote) 

162 

163 # cancel requests first, and then exchanges: cancelling the pending 

164 # exchange would trigger enqueued requests to be transmitted 

165 self.token_manager.dispatch_error(error, remote) 

166 

167 keys_for_removal = [] 

168 for key, ( 

169 messageerror_monitor, 

170 cancellable_timeout, 

171 ) in self._active_exchanges.items(): 

172 (exchange_remote, message_id) = key 

173 if remote == exchange_remote: 

174 keys_for_removal.append(key) 

175 for k in keys_for_removal: 

176 (messageerror_monitor, cancellable_timeout) = self._active_exchanges.pop(k) 

177 cancellable_timeout.cancel() 

178 # not triggering the messageerror_monitor: that already got the 

179 # clue from the token manager 

180 self._backlogs.pop(remote, ()) 

181 # while that's an iterable over messages and messageerror monitors, not 

182 # triggering them either for th esame reason as above 

183 

184 # 

185 # coap dispatch, message-id sublayer: duplicate handling 

186 # 

187 

188 def _deduplicate_message(self, message): 

189 """Return True if a message is a duplicate, and re-send the stored 

190 response if available. 

191 

192 Duplicate is a message with the same Message ID (mid) and sender 

193 (remote), as message received within last EXCHANGE_LIFETIME seconds 

194 (usually 247 seconds).""" 

195 

196 key = (message.remote, message.mid) 

197 if key in self._recent_messages: 

198 if message.mtype is CON: 

199 if self._recent_messages[key] is not None: 

200 self.log.info("Duplicate CON received, sending old response again") 

201 # not going via send_message because that would strip the 

202 # mid and might do all other sorts of checks 

203 self._send_initially(self._recent_messages[key]) 

204 else: 

205 self.log.info("Duplicate CON received, no response to send yet") 

206 else: 

207 self.log.info("Duplicate NON, ACK or RST received") 

208 return True 

209 else: 

210 self.log.debug("New unique message received") 

211 self.loop.call_later( 

212 message.transport_tuning.EXCHANGE_LIFETIME, 

213 functools.partial(self._recent_messages.pop, key), 

214 ) 

215 self._recent_messages[key] = None 

216 return False 

217 

218 def _store_response_for_duplicates(self, message): 

219 """If the message is the response can be used to satisfy a future 

220 duplicate message, store it.""" 

221 

222 key = (message.remote, message.mid) 

223 if key in self._recent_messages: 

224 self._recent_messages[key] = message 

225 

226 # 

227 # coap dispatch, message-type sublayer: retransmission handling 

228 # 

229 

230 def _add_exchange(self, message, messageerror_monitor): 

231 """Add an "exchange" for outgoing CON message. 

232 

233 CON (Confirmable) messages are automatically retransmitted by protocol 

234 until ACK or RST message with the same Message ID is received from 

235 target host.""" 

236 

237 key = (message.remote, message.mid) 

238 

239 if message.remote not in self._backlogs: 

240 self._backlogs[message.remote] = [] 

241 

242 timeout = random.uniform( 

243 message.transport_tuning.ACK_TIMEOUT, 

244 message.transport_tuning.ACK_TIMEOUT 

245 * message.transport_tuning.ACK_RANDOM_FACTOR, 

246 ) 

247 

248 next_retransmission = self._schedule_retransmit(message, timeout, 0) 

249 self._active_exchanges[key] = (messageerror_monitor, next_retransmission) 

250 

251 self.log.debug("Exchange added, message ID: %d.", message.mid) 

252 

253 def _remove_exchange(self, message): 

254 """Remove exchange from active exchanges and cancel the timeout to next 

255 retransmission.""" 

256 key = (message.remote, message.mid) 

257 

258 if key not in self._active_exchanges: 

259 # Before turning this up to a warning, consider https://github.com/chrysn/aiocoap/issues/288 

260 self.log.info( 

261 "Received %s from %s, but could not match it to a running exchange.", 

262 message.mtype, 

263 message.remote, 

264 ) 

265 return 

266 

267 messageerror_monitor, next_retransmission = self._active_exchanges.pop(key) 

268 next_retransmission.cancel() 

269 if message.mtype is RST: 

270 messageerror_monitor() 

271 self.log.debug("Exchange removed, message ID: %d.", message.mid) 

272 

273 self._continue_backlog(message.remote) 

274 

275 def _continue_backlog(self, remote): 

276 """After an exchange has been removed, start working off the backlog or 

277 clear it completely.""" 

278 

279 if remote not in self._backlogs: 

280 # if active exchanges were something we could do a 

281 # .register_finally() on, we could chain them like that; if we 

282 # implemented anything but NSTART=1, we'll need a more elaborate 

283 # system anyway 

284 raise AssertionError( 

285 "backlogs/active_exchange relation violated (implementation error)" 

286 ) 

287 

288 # first iteration is sure to happen, others happen only if the enqueued 

289 # messages were NONs 

290 while not any(r == remote for r, mid in self._active_exchanges.keys()): 

291 if self._backlogs[remote] != []: 

292 next_message, messageerror_monitor = self._backlogs[remote].pop(0) 

293 self._send_initially(next_message, messageerror_monitor) 

294 else: 

295 del self._backlogs[remote] 

296 break 

297 

298 def _schedule_retransmit(self, message, timeout, retransmission_counter): 

299 """Create and return a call_later for first or subsequent 

300 retransmissions.""" 

301 

302 # while this could just as well be done in a lambda or with the 

303 # arguments passed to call_later, in this form makes the test cases 

304 # easier to debug (it's about finding where references to a Context 

305 # are kept around; contexts should be able to shut down in an orderly 

306 # way without littering references in the loop) 

307 

308 def retr( 

309 self=self, 

310 message=message, 

311 timeout=timeout, 

312 retransmission_counter=retransmission_counter, 

313 doc="If you read this, have a look at _schedule_retransmit", 

314 id=object(), 

315 ): 

316 self._retransmit(message, timeout, retransmission_counter) 

317 

318 return self.loop.call_later(timeout, retr) 

319 

320 def _retransmit(self, message, timeout, retransmission_counter): 

321 """Retransmit CON message that has not been ACKed or RSTed.""" 

322 key = (message.remote, message.mid) 

323 

324 messageerror_monitor, next_retransmission = self._active_exchanges.pop(key) 

325 # this should be a no-op, but let's be sure 

326 next_retransmission.cancel() 

327 

328 if retransmission_counter < message.transport_tuning.MAX_RETRANSMIT: 

329 self.log.info("Retransmission, Message ID: %d.", message.mid) 

330 self._send_via_transport(message) 

331 retransmission_counter += 1 

332 timeout *= 2 

333 

334 next_retransmission = self._schedule_retransmit( 

335 message, timeout, retransmission_counter 

336 ) 

337 self._active_exchanges[key] = (messageerror_monitor, next_retransmission) 

338 else: 

339 self.log.info("Exchange timed out trying to transmit %s", message) 

340 del self._backlogs[message.remote] 

341 self.token_manager.dispatch_error( 

342 error.ConRetransmitsExceeded("Retransmissions exceeded"), message.remote 

343 ) 

344 

345 # 

346 # coap dispatch, message-code sublayer: triggering custom actions based on incoming messages 

347 # 

348 

349 def _process_ping(self, message): 

350 self.log.info("Received CoAP Ping from %s, replying with RST.", message.remote) 

351 rst = Message(_mtype=RST, _mid=message.mid, code=EMPTY, payload=b"") 

352 rst.remote = message.remote.as_response_address() 

353 # not going via send_message because that would strip the mid, and we 

354 # already know that it can go straight to the wire 

355 self._send_initially(rst) 

356 

357 def _process_request(self, request): 

358 """Spawn a responder for an incoming request, or feed a long-running 

359 responder if one exists.""" 

360 

361 if request.mtype == CON: 

362 

363 def on_timeout(self, remote, token): 

364 mid, own_timeout = self._piggyback_opportunities.pop((remote, token)) 

365 self._send_empty_ack( 

366 request.remote, mid, "Response took too long to prepare" 

367 ) 

368 

369 handle = self.loop.call_later( 

370 request.transport_tuning.EMPTY_ACK_DELAY, 

371 on_timeout, 

372 self, 

373 request.remote, 

374 request.token, 

375 ) 

376 key = (request.remote, request.token) 

377 if key in self._piggyback_opportunities: 

378 self.log.warning( 

379 "New request came in while old request not" 

380 " ACKed yet. Possible mismatch between EMPTY_ACK_DELAY" 

381 " and EXCHANGE_LIFETIME. Cancelling ACK to ward off any" 

382 " further confusion." 

383 ) 

384 mid, old_handle = self._piggyback_opportunities.pop(key) 

385 old_handle.cancel() 

386 self._piggyback_opportunities[key] = (request.mid, handle) 

387 

388 self.token_manager.process_request(request) 

389 

390 def _process_response(self, response): 

391 """Feed a response back to whatever might expect it. 

392 

393 Returns True if the response was expected (and should be ACK'd 

394 depending on mtype), and False if it was not expected (and should be 

395 RST'd).""" 

396 

397 self.log.debug("Received Response: %r", response) 

398 

399 return self.token_manager.process_response(response) 

400 

401 # 

402 # outgoing messages 

403 # 

404 

405 async def recognize_remote(self, message): 

406 return await self.message_interface.recognize_remote(message.remote) 

407 

408 async def determine_remote(self, message): 

409 return await self.message_interface.determine_remote(message) 

410 

411 def send_message(self, message, messageerror_monitor): 

412 """Encode and send message. This takes care of retransmissions (if 

413 CON), message IDs and rate limiting, but does not hook any events to 

414 responses. (Use the :class:`Request` class or responding resources 

415 instead; those are the typical callers of this function.) 

416 

417 If notification about the progress of the exchange is required, an 

418 ExchangeMonitor can be passed in, which will receive the appropriate 

419 callbacks.""" 

420 

421 if message.mid is not None: 

422 # if you can give any reason why the application should provide a 

423 # fixed mid, lower the log level on demand and provide the reason 

424 # in a comment. 

425 self.log.warning( 

426 "Message ID set on to-be-sent message, this is" 

427 " probably unintended; clearing it." 

428 ) 

429 message.mid = None 

430 

431 if message.code.is_response(): 

432 no_response = (message.opt.no_response or 0) & ( 

433 1 << message.code.class_ - 1 

434 ) != 0 

435 

436 piggyback_key = (message.remote, message.token) 

437 if piggyback_key in self._piggyback_opportunities: 

438 mid, handle = self._piggyback_opportunities.pop(piggyback_key) 

439 handle.cancel() 

440 

441 if no_response: 

442 new_message = Message(code=EMPTY, mid=mid, mtype=ACK) 

443 new_message.remote = message.remote.as_response_address() 

444 message = new_message 

445 self.log.debug( 

446 "Turning to-be-sent message into an empty ACK due to no_response option." 

447 ) 

448 else: 

449 message.mtype = ACK 

450 message.mid = mid 

451 else: 

452 if no_response: 

453 self.log.debug( 

454 "Stopping message in message manager as it is no_response and no ACK is pending." 

455 ) 

456 return 

457 

458 message.opt.no_response = None 

459 

460 if message.mtype is None: 

461 if self._active_exchanges is None: 

462 # during shutdown, this is all we can do 

463 message.mtype = NON 

464 else: 

465 if message.remote.is_multicast: 

466 message.mtype = NON 

467 else: 

468 # All forcing factors are now accounted for -- either value 

469 # is now OK (and if the transport_tuning doesn't tell us, 

470 # we'll use sensible defaults) 

471 

472 match message.transport_tuning.reliability: 

473 case True: 

474 message.mtype = CON 

475 case False: 

476 message.mtype = NON 

477 case None: 

478 if ( 

479 message.request is not None 

480 and message.request.mtype is NON 

481 ): 

482 message.mtype = NON 

483 else: 

484 message.mtype = CON 

485 else: 

486 if self._active_exchanges is None: 

487 self.log.warning( 

488 "Forcing message to be sent as NON even though specified because transport is shutting down" 

489 ) 

490 message.mtype = NON 

491 

492 if message.mtype == CON and message.remote.is_multicast: 

493 raise error.ConToMulticast 

494 

495 if message.mid is None: 

496 message.mid = self._next_message_id() 

497 

498 if message.mtype == CON and message.remote in self._backlogs: 

499 assert any( 

500 remote == message.remote for (remote, _) in self._active_exchanges 

501 ) 

502 self.log.debug("Message to %s put into backlog", message.remote) 

503 self._backlogs[message.remote].append((message, messageerror_monitor)) 

504 else: 

505 self._send_initially(message, messageerror_monitor) 

506 

507 def _send_initially(self, message, messageerror_monitor=None): 

508 """Put the message on the wire for the first time, starting retransmission timeouts""" 

509 

510 self.log.debug("Sending message %r", message) 

511 

512 if message.mtype is CON: 

513 assert messageerror_monitor is not None, ( 

514 "messageerror_monitor needs to be set for CONs" 

515 ) 

516 self._add_exchange(message, messageerror_monitor) 

517 

518 self._store_response_for_duplicates(message) 

519 

520 self._send_via_transport(message) 

521 

522 def _send_via_transport(self, message): 

523 """Put the message on the wire""" 

524 

525 self.message_interface.send(message) 

526 

527 def _next_message_id(self): 

528 """Reserve and return a new message ID.""" 

529 message_id = self.message_id 

530 self.message_id = 0xFFFF & (1 + self.message_id) 

531 return message_id 

532 

533 def _send_empty_ack(self, remote, mid, reason): 

534 """Send separate empty ACK for any reason. 

535 

536 Currently, this can happen only once per responder, that is, when the 

537 last block1 has been transferred and the first block2 is not ready 

538 yet.""" 

539 

540 self.log.debug("Sending empty ACK: %s", reason) 

541 ack = Message( 

542 _mtype=ACK, 

543 code=EMPTY, 

544 payload=b"", 

545 ) 

546 ack.remote = remote.as_response_address() 

547 ack.mid = mid 

548 # not going via send_message because that would strip the mid, and we 

549 # already know that it can go straight to the wire 

550 self._send_initially(ack)