Coverage for aiocoap/messagemanager.py: 83%

222 statements  

« prev     ^ index     » next       coverage.py v7.6.3, created at 2024-10-15 22:10 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5"""This module contains all internals needed to manage messages on unreliable 

6transports, ie. everything that deals in message types or Message IDs. 

7 

8Currently, it also provides the mechanisms for managing tokens, but those will 

9be split into dedicated classes. 

10""" 

11 

12import asyncio 

13import functools 

14import random 

15from typing import Callable, Dict, List, Tuple, Optional 

16 

17from . import error 

18from . import interfaces 

19from .interfaces import EndpointAddress 

20from .message import Message 

21from .numbers.types import CON, ACK, RST, NON 

22from .numbers.codes import EMPTY 

23 

24 

25class MessageManager(interfaces.TokenInterface, interfaces.MessageManager): 

26 """This MessageManager Drives a message interface following the rules of 

27 RFC7252 CoAP over UDP. 

28 

29 It takes care of picking message IDs (mid) for outgoing messages, 

30 retransmitting CON messages, and reacting appropriately to the type of 

31 incoming messages, sending ACKs either immediately or later. 

32 

33 It creates piggy-backed responses by keeping an eye on the tokens the 

34 messages are sent with, but otherwise ignores the tokens. (It inspects 

35 tokens *only* where required by its sub-layer). 

36 """ 

37 

def __init__(self, token_manager) -> None:
    """Create a manager with empty bookkeeping that reports to *token_manager*.

    The logger and the event loop are taken over from *token_manager*.
    ``message_interface`` is deliberately not assigned here: it has to be
    set after construction, because the message interface already needs
    its manager in its own constructor.
    """
    self.token_manager = token_manager
    self.log = token_manager.log
    self.loop = token_manager.loop

    # Next message ID to hand out; the sequence starts at a random 16-bit value.
    self.message_id = random.randint(0, 65535)

    #: Recently received messages, keyed by (remote, message ID). The value
    #: is the response that was sent for it, or None while none is known.
    self._recent_messages: Dict[Tuple[EndpointAddress, int], Optional[Message]] = {}

    #: Sent CON messages that still wait for an ACK or RST, keyed by
    #: (remote, message ID). Each value is the pair
    #: (messageerror_monitor, cancellable retransmission timeout).
    #: Replaced by None once the manager has been shut down.
    self._active_exchanges: Dict[
        Tuple[EndpointAddress, int], Tuple[Callable[[], None], asyncio.Handle]
    ] = {}

    #: Per-remote queue of (message, messageerror_monitor) pairs that wait
    #: for the running exchange to finish. A key exists if and only if
    #: there is an active exchange with that remote.
    self._backlogs: Dict[
        EndpointAddress, List[Tuple[Message, Callable[[], None]]]
    ] = {}

    #: Pending (remote, token) combinations, mapped to the message ID a
    #: response can be piggybacked on and the timer to cancel when it is.
    self._piggyback_opportunities: Dict[
        Tuple[EndpointAddress, bytes], Tuple[int, asyncio.TimerHandle]
    ] = {}

66 

def __repr__(self):
    """Return ``<ClassName for INTERFACE>``.

    INTERFACE is the ``message_interface`` attribute, or the text
    ``(unbound)`` as long as none has been assigned.
    """
    bound_to = getattr(self, "message_interface", "(unbound)")
    class_name = type(self).__name__
    return "<%s for %s>" % (class_name, bound_to)

72 

@property
def client_credentials(self):
    """The client credentials, read through from the token manager."""
    manager = self.token_manager
    return manager.client_credentials

76 

async def shutdown(self):
    """Stop all retransmissions and shut down the message interface.

    Every pending retransmission timer is cancelled, the table of active
    exchanges is replaced by None, and the message interface's own
    ``shutdown`` is awaited.
    """
    # The error monitors stored next to the timers are intentionally left
    # alone: shutting down is not specific to any one message.
    for _monitor, retransmission in self._active_exchanges.values():
        retransmission.cancel()
    # None (rather than an empty dict) is what dispatch_error and
    # send_message test for to detect that shutdown has happened.
    self._active_exchanges = None

    await self.message_interface.shutdown()

85 

86 # 

87 # implementing the MessageManager interface 

88 # 

89 

def dispatch_message(self, message):
    """Feed a message through the message-id, message-type and message-code
    sublayers of CoAP.

    Requests are checked for duplicates first. ACKs and RSTs end the
    matching active exchange. The message is then routed by its code and
    type: empty CONs are pings, requests go to the request handling, and
    responses go to the response handling. A response nobody was waiting
    for is answered with an RST only if it was a CON that did not arrive
    via multicast; any other code/type combination is logged and dropped.

    The ``code``, ``mtype``, ``mid`` and ``remote`` attributes of
    ``message`` are inspected here. Nothing is returned.
    """

    self.log.debug("Incoming message %r", message)
    if message.code.is_request():
        # Responses don't get deduplication because they "are idempotent or
        # can be handled in an idempotent fashion" (RFC 7252 Section 4.5).
        # This means that a separate response may get a RST when it
        # arrives at the aiocoap client twice. Note that this does not
        # impede the operation of observations: Their token is still active
        # so they are ACK'd, and deduplication based on observation numbers
        # filters out the rest.
        #
        # This saves memory, and allows stateful transports to be shut down
        # expeditiously unless kept alive by something else (otherwise,
        # they'd linger for EXCHANGE_LIFETIME with no good reason).
        if self._deduplicate_message(message) is True:
            return

    if message.mtype in (ACK, RST):
        self._remove_exchange(message)

    if message.code is EMPTY and message.mtype is CON:
        self._process_ping(message)
    elif message.code is EMPTY and message.mtype in (ACK, RST):
        pass  # empty ack has already been handled above
    elif message.code.is_request() and message.mtype in (CON, NON):
        # the request handler will have to deal with sending ACK itself, as
        # it might be timeout-related
        self._process_request(message)
    elif message.code.is_response() and message.mtype in (CON, NON, ACK):
        success = self._process_response(message)
        if success:
            if message.mtype is CON:
                self._send_empty_ack(
                    message.remote,
                    message.mid,
                    reason="acknowledging incoming response",
                )
        else:
            # A peer mustn't send a CON to multicast, but if a malicious
            # peer does, we better not answer
            if message.mtype == CON and not message.remote.is_multicast_locally:
                self.log.info("Response not recognized - sending RST.")
                # The payload is bytes, like in every other empty message
                # this class builds (ping replies, empty ACKs).
                rst = Message(mtype=RST, mid=message.mid, code=EMPTY, payload=b"")
                rst.remote = message.remote.as_response_address()
                self._send_initially(rst)
            else:
                self.log.info(
                    "Ignoring unknown response (which is not a unicast CON)"
                )
    else:
        self.log.warning(
            "Received a message with code %s and type %s (those don't fit) from %s, ignoring it.",
            message.code,
            message.mtype,
            message.remote,
        )

149 

def dispatch_error(self, error, remote):
    """Handle an error that the transport reported for ``remote``.

    The error is passed on to the token manager; afterwards all active
    exchanges with that remote are dropped (their retransmission timers
    cancelled) and the remote's backlog is discarded. After shutdown, the
    call only logs a warning.
    """
    if self._active_exchanges is None:
        # It is not known yet where such late errors originate. A warning
        # now beats an exception later, and nothing terminally bad should
        # follow from ignoring the error.
        self.log.warning(
            "Internal shutdown sequence msismatch: error dispatched through messagemanager after shutown"
        )
        return

    self.log.debug("Incoming error %s from %r", error, remote)

    # Requests are cancelled before exchanges, because cancelling the
    # pending exchange would trigger enqueued requests to be transmitted.
    self.token_manager.dispatch_error(error, remote)

    affected = [key for key in self._active_exchanges if remote == key[0]]
    for key in affected:
        # The error monitor is not invoked: it already got the news
        # through the token manager.
        _monitor, retransmission = self._active_exchanges.pop(key)
        retransmission.cancel()

    # The backlog also holds error monitors; they are skipped for the
    # same reason as the ones above.
    self._backlogs.pop(remote, ())

182 

183 # 

184 # coap dispatch, message-id sublayer: duplicate handling 

185 # 

186 

def _deduplicate_message(self, message):
    """Tell whether ``message`` repeats a recently received one.

    A message counts as a duplicate when one with the same sender (remote)
    and Message ID (mid) is still being tracked; entries are tracked for
    the message's EXCHANGE_LIFETIME. For a duplicate CON whose response is
    already stored, that response is sent again.

    Returns True for a duplicate, False for a message seen the first time.
    """

    key = (message.remote, message.mid)

    if key not in self._recent_messages:
        self.log.debug("New unique message received")
        # Forget the message again once its exchange lifetime is over.
        self.loop.call_later(
            message.transport_tuning.EXCHANGE_LIFETIME,
            functools.partial(self._recent_messages.pop, key),
        )
        self._recent_messages[key] = None
        return False

    if message.mtype is not CON:
        self.log.info("Duplicate NON, ACK or RST received")
    elif self._recent_messages[key] is None:
        self.log.info("Duplicate CON received, no response to send yet")
    else:
        self.log.info("Duplicate CON received, sending old response again")
        # send_message is bypassed because it would strip the mid and
        # might do all other sorts of checks.
        self._send_initially(self._recent_messages[key])
    return True

216 

def _store_response_for_duplicates(self, message):
    """Remember ``message`` as the answer to a tracked incoming message.

    The message is stored only if its (remote, mid) pair is currently in
    the table of recent messages; otherwise nothing happens.
    """

    slot = (message.remote, message.mid)
    if slot not in self._recent_messages:
        return
    self._recent_messages[slot] = message

224 

225 # 

226 # coap dispatch, message-type sublayer: retransmission handling 

227 # 

228 

def _add_exchange(self, message, messageerror_monitor):
    """Start tracking an outgoing CON message as an active exchange.

    A backlog list is created for the remote if there is none yet, the
    first retransmission is scheduled after a random delay between
    ACK_TIMEOUT and ACK_TIMEOUT * ACK_RANDOM_FACTOR, and the exchange is
    stored together with ``messageerror_monitor``.
    """

    key = (message.remote, message.mid)

    # The presence of a backlog entry marks the remote as busy.
    self._backlogs.setdefault(message.remote, [])

    tuning = message.transport_tuning
    first_delay = random.uniform(
        tuning.ACK_TIMEOUT,
        tuning.ACK_TIMEOUT * tuning.ACK_RANDOM_FACTOR,
    )

    retransmission = self._schedule_retransmit(message, first_delay, 0)
    self._active_exchanges[key] = (messageerror_monitor, retransmission)

    self.log.debug("Exchange added, message ID: %d.", message.mid)

251 

def _remove_exchange(self, message):
    """Remove exchange from active exchanges and cancel the timeout to next
    retransmission.

    ``message`` is the incoming ACK or RST; the exchange is looked up by
    its (remote, mid) pair. For an RST, the exchange's error monitor is
    called. Afterwards the remote's backlog is continued. A message that
    matches no exchange, or that arrives after shutdown, is only logged.
    """
    if self._active_exchanges is None:
        # shutdown() replaced the table by None. Like dispatch_error and
        # send_message, tolerate late traffic instead of failing on the
        # membership test below.
        self.log.info(
            "Received %s from %s after shutdown, ignoring it.",
            message.mtype,
            message.remote,
        )
        return

    key = (message.remote, message.mid)

    if key not in self._active_exchanges:
        # Before turning this up to a warning, consider https://github.com/chrysn/aiocoap/issues/288
        self.log.info(
            "Received %s from %s, but could not match it to a running exchange.",
            message.mtype,
            message.remote,
        )
        return

    messageerror_monitor, next_retransmission = self._active_exchanges.pop(key)
    next_retransmission.cancel()
    if message.mtype is RST:
        messageerror_monitor()
    self.log.debug("Exchange removed, message ID: %d.", message.mid)

    self._continue_backlog(message.remote)

273 

def _continue_backlog(self, remote):
    """Send queued messages for ``remote`` now that an exchange has ended.

    Messages are taken from the front of the remote's backlog and sent
    until one of them opens a new active exchange with that remote. If the
    backlog runs empty first, its entry is removed.

    Raises AssertionError if ``remote`` has no backlog entry at all.
    """

    if remote not in self._backlogs:
        # If active exchanges were something a .register_finally() could be
        # done on, they could be chained like that; anything but NSTART=1
        # will need a more elaborate system anyway.
        raise AssertionError(
            "backlogs/active_exchange relation violated (implementation error)"
        )

    def exchange_pending():
        return any(r == remote for r, mid in self._active_exchanges.keys())

    # The first pass always runs; further passes happen only if the
    # messages sent so far were NONs.
    while not exchange_pending():
        queue = self._backlogs[remote]
        if not queue:
            del self._backlogs[remote]
            break
        next_message, messageerror_monitor = queue.pop(0)
        self._send_initially(next_message, messageerror_monitor)

296 

def _schedule_retransmit(self, message, timeout, retransmission_counter):
    """Create and return a call_later for first or subsequent
    retransmissions.

    The callback registered on ``self.loop`` fires after ``timeout`` and
    calls ``_retransmit`` with the same ``message``, ``timeout`` and
    ``retransmission_counter``. The handle returned by ``call_later`` is
    passed back to the caller, which stores it with the exchange so it can
    be cancelled.
    """

    # while this could just as well be done in a lambda or with the
    # arguments passed to call_later, in this form makes the test cases
    # easier to debug (it's about finding where references to a Context
    # are kept around; contexts should be able to shut down in an orderly
    # way without littering references in the loop)

    def retr(
        self=self,
        message=message,
        timeout=timeout,
        retransmission_counter=retransmission_counter,
        # doc and id are not used by the body; presumably they only make
        # this callback recognizable when leftover references are inspected
        doc="If you read this, have a look at _schedule_retransmit",
        id=object(),
    ):
        self._retransmit(message, timeout, retransmission_counter)

    return self.loop.call_later(timeout, retr)

318 

def _retransmit(self, message, timeout, retransmission_counter):
    """Send a still unanswered CON message again, or give up on it.

    While ``retransmission_counter`` is below the message's MAX_RETRANSMIT,
    the message is sent again and the next attempt is scheduled with a
    doubled timeout. Otherwise the remote's backlog is dropped and a
    ConRetransmitsExceeded error is reported to the token manager.
    """
    key = (message.remote, message.mid)

    monitor, pending = self._active_exchanges.pop(key)
    # This should be a no-op, as the timer has just fired; cancelled anyway
    # to be sure.
    pending.cancel()

    if retransmission_counter >= message.transport_tuning.MAX_RETRANSMIT:
        self.log.info("Exchange timed out trying to transmit %s", message)
        del self._backlogs[message.remote]
        self.token_manager.dispatch_error(
            error.ConRetransmitsExceeded("Retransmissions exceeded"), message.remote
        )
        return

    self.log.info("Retransmission, Message ID: %d.", message.mid)
    self._send_via_transport(message)

    pending = self._schedule_retransmit(
        message, timeout * 2, retransmission_counter + 1
    )
    self._active_exchanges[key] = (monitor, pending)

343 

344 # 

345 # coap dispatch, message-code sublayer: triggering custom actions based on incoming messages 

346 # 

347 

def _process_ping(self, message):
    """Answer a CoAP ping with an RST that carries the ping's message ID."""
    self.log.info("Received CoAP Ping from %s, replying with RST.", message.remote)
    reply = Message(code=EMPTY, mtype=RST, mid=message.mid, payload=b"")
    reply.remote = message.remote.as_response_address()
    # send_message is bypassed: it would strip the mid, and the reply is
    # already known to be fit to go straight to the wire.
    self._send_initially(reply)

355 

def _process_request(self, request):
    """Spawn a responder for an incoming request, or feed a long-running
    responder if one exists.

    For a CON request, a timer is started that sends an empty ACK after
    the request's EMPTY_ACK_DELAY; until it fires, the request's message
    ID is kept as a piggyback opportunity under (remote, token). In every
    case the request is then handed to the token manager.
    """

    if request.mtype == CON:

        def on_timeout(self, remote, token):
            # The timer stored with the opportunity is the one that is
            # firing right now, so only the message ID is of interest.
            mid, _ = self._piggyback_opportunities.pop((remote, token))
            # Works on the arguments handed over by call_later only, so
            # the callback does not close over the request.
            self._send_empty_ack(remote, mid, "Response took too long to prepare")

        handle = self.loop.call_later(
            request.transport_tuning.EMPTY_ACK_DELAY,
            on_timeout,
            self,
            request.remote,
            request.token,
        )
        key = (request.remote, request.token)
        if key in self._piggyback_opportunities:
            self.log.warning(
                "New request came in while old request not"
                " ACKed yet. Possible mismatch between EMPTY_ACK_DELAY"
                " and EXCHANGE_LIFETIME. Cancelling ACK to ward off any"
                " further confusion."
            )
            mid, old_handle = self._piggyback_opportunities.pop(key)
            old_handle.cancel()
        self._piggyback_opportunities[key] = (request.mid, handle)

    self.token_manager.process_request(request)

388 

def _process_response(self, response):
    """Hand an incoming response to the token manager.

    Returns what the token manager reports: True if the response was
    expected (and should be ACK'd depending on its mtype), False if it was
    not expected (and should be RST'd).
    """

    self.log.debug("Received Response: %r", response)

    was_expected = self.token_manager.process_response(response)
    return was_expected

399 

400 # 

401 # outgoing messages 

402 # 

403 

async def fill_or_recognize_remote(self, message):
    """Make sure ``message`` has a remote the message interface can use.

    If the message already has a remote and the interface recognizes it,
    nothing is changed. Otherwise the interface is asked to determine a
    remote for the message, which is then set on it.

    Returns True if the message ends up with a usable remote, else False.
    """
    interface = self.message_interface

    if message.remote is not None and await interface.recognize_remote(
        message.remote
    ):
        return True

    determined = await interface.determine_remote(message)
    if determined is None:
        return False
    message.remote = determined
    return True

413 

def send_message(self, message, messageerror_monitor):
    """Encode and send message. This takes care of retransmissions (if
    CON), message IDs and rate limiting, but does not hook any events to
    responses. (Use the :class:`Request` class or responding resources
    instead; those are the typical callers of this function.)

    If notification about the progress of the exchange is required, an
    ExchangeMonitor can be passed in, which will receive the appropriate
    callbacks.

    ``messageerror_monitor`` is stored with the exchange (or the backlog
    entry) of a CON message; it is called when the message is answered
    with an RST.

    Raises ValueError if a CON message is addressed to a multicast remote.
    """

    if message.mid is not None:
        # if you can give any reason why the application should provide a
        # fixed mid, lower the log level on demand and provide the reason
        # in a comment.
        self.log.warning(
            "Message ID set on to-be-sent message, this is"
            " probably unintended; clearing it."
        )
        message.mid = None

    if message.code.is_response():
        # Test bit (class - 1) of the No-Response option value; a missing
        # option counts as 0. ("-" binds tighter than "<<", and "&"
        # tighter than "!=".)
        no_response = (message.opt.no_response or 0) & (
            1 << message.code.class_ - 1
        ) != 0

        piggyback_key = (message.remote, message.token)
        if piggyback_key in self._piggyback_opportunities:
            # The request has not been ACKed yet: use its message ID and
            # call off the pending empty ACK.
            mid, handle = self._piggyback_opportunities.pop(piggyback_key)
            handle.cancel()

            if no_response:
                # The response itself is suppressed, but the request still
                # needs its ACK.
                new_message = Message(code=EMPTY, mid=mid, mtype=ACK)
                new_message.remote = message.remote.as_response_address()
                message = new_message
                self.log.debug(
                    "Turning to-be-sent message into an empty ACK due to no_response option."
                )
            else:
                message.mtype = ACK
                message.mid = mid
        else:
            if no_response:
                self.log.debug(
                    "Stopping message in message manager as it is no_response and no ACK is pending."
                )
                return

        # The option has been evaluated above; clear it on the message
        # that is about to be sent.
        message.opt.no_response = None

    if message.mtype is None:
        if self._active_exchanges is None:
            # during shutdown, this is all we can do
            message.mtype = NON
        else:
            if message.remote.is_multicast:
                message.mtype = NON
            else:
                # FIXME: on responses, this should take the request into
                # consideration (cf. RFC7252 Section 5.2.3, answer to NON
                # SHOULD be NON)
                message.mtype = CON
    else:
        if self._active_exchanges is None:
            self.log.warning(
                "Forcing message to be sent as NON even though specified because transport is shutting down"
            )
            message.mtype = NON

    if message.mtype == CON and message.remote.is_multicast:
        raise ValueError("Refusing to send CON message to multicast address")

    if message.mid is None:
        message.mid = self._next_message_id()

    if message.mtype == CON and message.remote in self._backlogs:
        # A backlog entry for the remote means that an exchange with it is
        # running; queue the message until that exchange is over.
        assert any(
            remote == message.remote for (remote, _) in self._active_exchanges
        )
        self.log.debug("Message to %s put into backlog", message.remote)
        self._backlogs[message.remote].append((message, messageerror_monitor))
    else:
        self._send_initially(message, messageerror_monitor)

496 

def _send_initially(self, message, messageerror_monitor=None):
    """Send ``message`` for the first time.

    A CON message is registered as an active exchange first, which starts
    its retransmission timeout; ``messageerror_monitor`` must be given in
    that case. The message is also offered to the duplicate tracking as a
    stored response before it goes to the transport.
    """

    self.log.debug("Sending message %r", message)

    needs_exchange = message.mtype is CON
    if needs_exchange:
        assert (
            messageerror_monitor is not None
        ), "messageerror_monitor needs to be set for CONs"
        self._add_exchange(message, messageerror_monitor)

    self._store_response_for_duplicates(message)
    self._send_via_transport(message)

511 

def _send_via_transport(self, message):
    """Pass ``message`` to the message interface for sending."""

    transport = self.message_interface
    transport.send(message)

516 

def _next_message_id(self):
    """Return the current message ID and advance the counter.

    The counter wraps around from 0xFFFF to 0.
    """
    reserved = self.message_id
    self.message_id = (reserved + 1) & 0xFFFF
    return reserved

522 

def _send_empty_ack(self, remote, mid, reason):
    """Send an empty ACK for message ID ``mid`` to ``remote``.

    This is used to acknowledge an incoming CON response, and to
    acknowledge a CON request whose response took too long to prepare.
    ``reason`` only shows up in the debug log.
    """

    self.log.debug("Sending empty ACK: %s", reason)
    empty_ack = Message(mtype=ACK, code=EMPTY, payload=b"")
    empty_ack.remote = remote.as_response_address()
    empty_ack.mid = mid
    # send_message is bypassed: it would strip the mid, and the ACK is
    # already known to be fit to go straight to the wire.
    self._send_initially(empty_ack)