Coverage for aiocoap / transports / slipmux.py: 82%

261 statements  

coverage.py v7.13.5, created at 2026-04-29 12:32 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5"""This module implements slipmux-03_, with some adjustments regarding URI design 

6taken from transport-indication_: It chooses the pattern `{NAME}.dev.alt` for 

7devices named /dev/{NAME}, which are treated case-insensitively. 

8 

9Configuration 

10============= 

11 

12While the ``slipmux`` transport is implicitly enabled whenever its dependencies 

13are met, it has no effect on servers (because devices are not automatically 

14bound to: a slipmux device is not generally recognizable as such). 

15 

16To set up server configuration, or for extra options such as the use or 

17creation of UNIX sockets, this transport can be configured by setting 

18:class:`aiocoap.config.SlipmuxParameters` in the transports configured at 

19context creation (:meth:`Context.create_client_context() 

20<aiocoap.protocol.Context.create_client_context>` / 

21:meth:`create_server_context() 

22<aiocoap.protocol.Context.create_server_context>`). 

23 

24Usage example 

25============= 

26 

27Client with physical hardware 

28----------------------------- 

29 

30Assuming you have a constrained device that supports slipmux connected to a PC 

31as ``/dev/ttyACM0``, you can run:: 

32 

33 $ aiocoap-client coap://ttyacm0.dev.alt/.well-known/core 

34 

35and interact with resources as found there; no further configuration is needed. 

36 

37Server with test peer 

38--------------------- 

39 

40To mock slipmux without a real serial connection, you can configure a slipmux 

41host name to open up a UNIX socket instead. Write this configuration into 

42``config-unix-listen.toml``:: 

43 

44 [transport.slipmux.devices] 

45 my-listener = { unix-listen = "/tmp/coap.socket" } 

46 

47Then run a server such as the file server:: 

48 

49 $ aiocoap-fileserver --server-config config-unix-listen.toml 

50 

51You can then run a client such as Jelly interactively with that server:: 

52 

53 $ cargo install Jelly 

54 $ Jelly /tmp/coap.socket 

55 

56Beware that the aiocoap based server needs to be restarted when the client 

57disconnects (see Caveats below). 

58 

59Client with test peer 

60--------------------- 

61 

62You can also run aiocoap as a test client, but as that doesn't take a file name 

63argument to connect to (because it operates on URIs' host components), some 

64configuration is necessary. Store this in ``config-unix-connect.toml``:: 

65 

66 [transport.slipmux.devices] 

67 my-connection = { unix-connect = "/tmp/coap.socket" } 

68 

69Then run:: 

70 

71 $ aiocoap-client coap://my-connection.dev.alt/.well-known/core --config config-unix-connect.toml 

72 

73Caveats 

74======= 

75 

76While servers do connect automatically to any configured slipmux endpoint, they 

77do not reconnect automatically when that device goes away or is replaced (as 

78may happen when resetting a development board, depending on its USB UART 

79implementation). The same is true for UNIX sockets. 

80 

81Error handling is generally incomplete when it comes to I/O errors. 

82 

83This transport is currently not tested automatically, as only the client side 

84is implemented, with no mechanism for acting on (e.g.) a UNIX socket instead. 

85 

86.. _slipmux-03: https://datatracker.ietf.org/doc/draft-bormann-t2trg-slipmux/03/ 

87.. _transport-indication: https://datatracker.ietf.org/doc/draft-ietf-core-transport-indication 

88 

89------ 

90""" 

91 

92from __future__ import annotations 

93 

94# Implementation experience notes: 

95# 

96# * lower/upper in device names is tedious, esp. with defaults such as ttyACM0 

97# * The FCS comes from a very old reference that has no test vectors; a single example would be helpful. 

98# * Port numbers? 

99 

100import asyncio 

101import os 

102from pathlib import Path 

103import string 

104from typing import Optional 

105import weakref 

106 

107import serial_asyncio 

108 

109from .. import Message 

110from .. import error, interfaces 

111 

112# circular but allows matching on constants 

113from . import slipmux 

114 

115from ..config import TransportParameters, SlipmuxDevice 

116 

117# from RFC1055 

118ESC = 0o333 

119END = 0o300 

120ESC_END = 0o334 

121ESC_ESC = 0o335 

122# but at runtime we need the combined forms 

123BYTE_ESC = bytes((ESC,)) 

124BYTE_END = bytes((END,)) 

125BYTES_ESC_ESC_END = BYTE_ESC + bytes((ESC_END,)) 

126BYTES_ESC_ESC_ESC = BYTE_ESC + bytes((ESC_ESC,)) 

127 

128# from slipmux draft 

129HEADER_CONTROL = 0xA9 

130 

131# from RFC3986 

132_UNRESERVED = string.ascii_lowercase + string.digits + "-._~" 

133 

134# Values from RFC1662 

135# fmt: off 

136_FCS_LOOKUP = ( 

137 0x0000, 0x1189, 0x2312, 0x329B, 0x4624, 0x57AD, 0x6536, 0x74BF, 

138 0x8C48, 0x9DC1, 0xAF5A, 0xBED3, 0xCA6C, 0xDBE5, 0xE97E, 0xF8F7, 

139 0x1081, 0x0108, 0x3393, 0x221A, 0x56A5, 0x472C, 0x75B7, 0x643E, 

140 0x9CC9, 0x8D40, 0xBFDB, 0xAE52, 0xDAED, 0xCB64, 0xF9FF, 0xE876, 

141 0x2102, 0x308B, 0x0210, 0x1399, 0x6726, 0x76AF, 0x4434, 0x55BD, 

142 0xAD4A, 0xBCC3, 0x8E58, 0x9FD1, 0xEB6E, 0xFAE7, 0xC87C, 0xD9F5, 

143 0x3183, 0x200A, 0x1291, 0x0318, 0x77A7, 0x662E, 0x54B5, 0x453C, 

144 0xBDCB, 0xAC42, 0x9ED9, 0x8F50, 0xFBEF, 0xEA66, 0xD8FD, 0xC974, 

145 0x4204, 0x538D, 0x6116, 0x709F, 0x0420, 0x15A9, 0x2732, 0x36BB, 

146 0xCE4C, 0xDFC5, 0xED5E, 0xFCD7, 0x8868, 0x99E1, 0xAB7A, 0xBAF3, 

147 0x5285, 0x430C, 0x7197, 0x601E, 0x14A1, 0x0528, 0x37B3, 0x263A, 

148 0xDECD, 0xCF44, 0xFDDF, 0xEC56, 0x98E9, 0x8960, 0xBBFB, 0xAA72, 

149 0x6306, 0x728F, 0x4014, 0x519D, 0x2522, 0x34AB, 0x0630, 0x17B9, 

150 0xEF4E, 0xFEC7, 0xCC5C, 0xDDD5, 0xA96A, 0xB8E3, 0x8A78, 0x9BF1, 

151 0x7387, 0x620E, 0x5095, 0x411C, 0x35A3, 0x242A, 0x16B1, 0x0738, 

152 0xFFCF, 0xEE46, 0xDCDD, 0xCD54, 0xB9EB, 0xA862, 0x9AF9, 0x8B70, 

153 0x8408, 0x9581, 0xA71A, 0xB693, 0xC22C, 0xD3A5, 0xE13E, 0xF0B7, 

154 0x0840, 0x19C9, 0x2B52, 0x3ADB, 0x4E64, 0x5FED, 0x6D76, 0x7CFF, 

155 0x9489, 0x8500, 0xB79B, 0xA612, 0xD2AD, 0xC324, 0xF1BF, 0xE036, 

156 0x18C1, 0x0948, 0x3BD3, 0x2A5A, 0x5EE5, 0x4F6C, 0x7DF7, 0x6C7E, 

157 0xA50A, 0xB483, 0x8618, 0x9791, 0xE32E, 0xF2A7, 0xC03C, 0xD1B5, 

158 0x2942, 0x38CB, 0x0A50, 0x1BD9, 0x6F66, 0x7EEF, 0x4C74, 0x5DFD, 

159 0xB58B, 0xA402, 0x9699, 0x8710, 0xF3AF, 0xE226, 0xD0BD, 0xC134, 

160 0x39C3, 0x284A, 0x1AD1, 0x0B58, 0x7FE7, 0x6E6E, 0x5CF5, 0x4D7C, 

161 0xC60C, 0xD785, 0xE51E, 0xF497, 0x8028, 0x91A1, 0xA33A, 0xB2B3, 

162 0x4A44, 0x5BCD, 0x6956, 0x78DF, 0x0C60, 0x1DE9, 0x2F72, 0x3EFB, 

163 0xD68D, 0xC704, 0xF59F, 0xE416, 0x90A9, 0x8120, 0xB3BB, 0xA232, 

164 0x5AC5, 0x4B4C, 0x79D7, 0x685E, 0x1CE1, 0x0D68, 0x3FF3, 0x2E7A, 

165 0xE70E, 0xF687, 0xC41C, 0xD595, 0xA12A, 0xB0A3, 0x8238, 0x93B1, 

166 0x6B46, 0x7ACF, 0x4854, 0x59DD, 0x2D62, 0x3CEB, 0x0E70, 0x1FF9, 

167 0xF78F, 0xE606, 0xD49D, 0xC514, 0xB1AB, 0xA022, 0x92B9, 0x8330, 

168 0x7BC7, 0x6A4E, 0x58D5, 0x495C, 0x3DE3, 0x2C6A, 0x1EF1, 0x0F78, 

169) 

170# fmt: on 

171 

172 

173def _hostname_to_devicename(hostname: str) -> Optional[str]: 

174 """Extracts a lower-case device name from a host name in the way described 

175 in the module documentation. This does not yet attempt to find a suitable 

176 device. 

177 

178 >>> _hostname_to_devicename("ttyacm0.dev.alt") 

179 'ttyacm0' 

180 >>> _hostname_to_devicename("example.com") is None 

181 True 

182 """ 

183 if not hostname.endswith(".dev.alt"): 

184 return None 

185 

186 if any(c not in _UNRESERVED for c in hostname): 

187 return None 

188 

189 return hostname.removesuffix(".dev.alt") 

190 

191 

192def _checksum(data: bytes) -> bytes: 

193 """Calculates the Control message checksum (16-bit FCS from RFC1662), 

194 following its implementation in appendix C.2. 

195 

196 >>> _checksum(b"coap is cool!") 

197 b'$&' 

198 """ 

199 

200 fcs = 0xFFFF 

201 for byte in data: 

202 fcs = (fcs >> 8) ^ _FCS_LOOKUP[(fcs ^ byte) & 0xFF] 

203 fcs ^= 0xFFFF 

204 return fcs.to_bytes(2, "little") 
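
The table-driven checksum above can be cross-checked against a bit-by-bit computation. The following is a minimal sketch, not part of aiocoap: the names `fcs16` and `fcs_table` are hypothetical, and it assumes the usual parameters of the RFC 1662 16-bit FCS (reflected polynomial 0x8408, initial value 0xFFFF, final complement, least significant byte first).

```python
def fcs16(data: bytes) -> bytes:
    """Bitwise 16-bit FCS (hypothetical helper, same parameters as the
    table-driven version: polynomial 0x8408, init 0xFFFF, final complement)."""
    fcs = 0xFFFF
    for byte in data:
        fcs ^= byte
        for _ in range(8):
            # Shift right one bit; apply the polynomial if a 1 bit fell out.
            fcs = (fcs >> 1) ^ 0x8408 if fcs & 1 else fcs >> 1
    # Complement and serialize least significant byte first.
    return (fcs ^ 0xFFFF).to_bytes(2, "little")


def fcs_table() -> tuple[int, ...]:
    """Rebuilds a 256-entry lookup table from the same polynomial."""
    table = []
    for value in range(256):
        for _ in range(8):
            value = (value >> 1) ^ 0x8408 if value & 1 else value >> 1
        table.append(value)
    return tuple(table)
```

Comparing `fcs_table()` with the literal table is a cheap way to catch transcription errors, and `fcs16(b"123456789")` can be checked against the published check value 0x906E for this CRC variant.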

205 

206 

207class SlipmuxAddress(interfaces.EndpointAddress): 

208 def __init__(self, hostname, interface): 

209 if _hostname_to_devicename(hostname) is None: 

210 raise ValueError(f"Not a recognized host name: {hostname!r}") 

211 # This is overly strict, but we can still relax, and this simplifies 

212 # eg. hostinfo, and won't need normalization. 

213 self._host = hostname 

214 self._interface = weakref.ref(interface) 

215 

216 scheme = "coap" 

217 is_multicast = False 

218 is_multicast_locally = False 

219 

220 interface = property(lambda self: self._interface()) 

221 

222 def __hash__(self): 

223 return hash(self._host) 

224 

225 def __eq__(self, other): 

226 return isinstance(other, SlipmuxAddress) and self._host == other._host 

227 

228 def __repr__(self): 

229 """ 

230 >>> SlipmuxAddress("ttyusb0.dev.alt", MessageInterfaceSlipmux(..., ..., ..., ...)) 

231 <SlipmuxAddress ttyusb0.dev.alt> 

232 """ 

233 return "<%s %s>" % ( 

234 type(self).__name__, 

235 self._host, 

236 ) 

237 

238 @property 

239 def hostinfo(self): 

240 return self._host 

241 

242 @property 

243 def hostinfo_local(self): 

244 raise error.AnonymousHost 

245 

246 @property 

247 def uri_base(self): 

248 return "coap://" + self.hostinfo 

249 

250 @property 

251 def uri_base_local(self): 

252 raise error.AnonymousHost 

253 

254 @property 

255 def blockwise_key(self): 

256 return self._host 

257 

258 

259class MessageInterfaceSlipmux(interfaces.MessageInterface): 

260 """Message Interface for Slipmux. 

261 

262 As serial ports generally cannot be opened multiple times, this does not even 

263 try to manage running connections in the Endpoint objects, but spools 

264 endpoints until they break, at which point any attempt to send a message 

265 reconnects. 

266 

267 This keeps any number of connections open with no attempts to limit them, 

268 as the number of serial connections a system has is generally way lower 

269 than any RAM limits. 

270 """ 

271 

272 def __init__( 

273 self, params: TransportParameters, ctx: interfaces.MessageManager, log, loop 

274 ): 

275 self.__ctx = ctx 

276 self.__log = log 

277 self.__pool: dict[SlipmuxAddress, SlipmuxProtocol] = {} 

278 self.__loop = loop 

279 self.__params = params 

280 self.__unixlisten_states: dict[SlipmuxAddress, _UnixListenState] = {} 

281 

282 async def shutdown(self): 

283 for t in self.__unixlisten_states.values(): 

284 t.shutdown() 

285 for proto in self.__pool.values(): 

286 proto.transport.close() 

287 return 

288 

289 async def _get(self, remote: SlipmuxAddress) -> SlipmuxProtocol: 

290 if remote in self.__pool: 

291 return self.__pool[remote] 

292 devicename = _hostname_to_devicename(remote._host) 

293 assert devicename is not None, "Checked at construction at recognition" 

294 

295 starting_future = self.__loop.create_future() 

296 weakself = weakref.ref(self) 

297 connlog = self.__log.getChild(f"device-{devicename}") 

298 

299 def protocol_factory(): 

300 return SlipmuxProtocol(starting_future, weakself, remote, connlog) 

301 

302 assert self.__params.slipmux is not None 

303 devparams = self.__params.slipmux.devices.get(devicename, SlipmuxDevice()) 

304 if devparams.unix_connect is not None: 

305 (_, protocol) = await asyncio.get_running_loop().create_unix_connection( 

306 protocol_factory, 

307 # Type is ignored because mypy seems not to know that event 

308 # loops *do* accept paths. 

309 devparams.unix_connect, # type: ignore 

310 ) 

311 elif devparams.unix_listen is not None: 

312 # We only reach this when a request is sent there. Then 

313 # async-blocking on sending that request makes some sense: UNIX 

314 # sockets are used mostly in testing, and both sides need to be 

315 # ready to start talking. Suspending execution at the ._get() (and 

316 # thus recognizing the remote) will create some unexpected delays 

317 # when a process sends a request and then expects waiting for a 

318 # response to take long (rather than the initial recognition), but 

319 # that is probably fine. 

320 protocol = await self.__unixlisten_states[remote].get_waiting() 

321 else: 

322 full_devicename: Path | str | None = devparams.device 

323 if full_devicename is None: 

324 for filename in os.listdir("/dev/"): 

325 if filename.lower() == devicename: 

326 full_devicename = "/dev/" + filename 

327 break 

328 else: 

329 # sensible fallback for Windows, I guess 

330 full_devicename = devicename 

331 (_, protocol) = await serial_asyncio.create_serial_connection( 

332 self.__loop, 

333 protocol_factory, 

334 full_devicename, 

335 baudrate=115200, 

336 ) 

337 

338 self.__pool[remote] = protocol 

339 startup_error = await starting_future 

340 if startup_error is not None: 

341 raise startup_error 

342 return protocol 

343 

344 def _get_immediately(self, remote: SlipmuxAddress) -> SlipmuxProtocol: 

345 """Gets the relevant protocol, expecting that it is present (eg. when 

346 sending, because the remote was just recognized, which would have 

347 started the connection)""" 

348 return self.__pool[remote] 

349 

350 def send(self, message): 

351 protocol = self._get_immediately(message.remote) 

352 # FIXME: Where do we best do this? 

353 if message.opt.uri_host == message.remote.hostinfo: 

354 message.opt.uri_host = None 

355 protocol.send_control(message.encode()) 

356 

357 async def recognize_remote(self, remote): 

358 if isinstance(remote, SlipmuxAddress) and remote.interface == self: 

359 # See determine_remote: from the current API, this is the only 

360 # asynchronous point before a send. 

361 await self._get(remote) 

362 return True 

363 return False 

364 

365 async def determine_remote(self, message): 

366 # FIXME: Should we allow ports? 

367 if ( 

368 message.remote.scheme == "coap" 

369 and _hostname_to_devicename(message.remote.hostinfo) is not None 

370 ): 

371 address = SlipmuxAddress(message.remote.hostinfo, self) 

372 # We have to connect now and await connection -- not just because 

373 # this gives us errors reasonably fast, but also because `.send` in 

374 # the message interface is synchronous. 

375 await self._get(address) 

376 return address 

377 

378 @classmethod 

379 async def create_transport_endpoint( 

380 cls, 

381 params: TransportParameters, 

382 ctx: interfaces.MessageManager, 

383 log, 

384 loop, 

385 ): 

386 slef = cls(params, ctx, log, loop) 

387 if params.is_server: 

388 assert params.slipmux is not None 

389 for key, value in params.slipmux.devices.items(): 

390 remote = SlipmuxAddress(f"{key}.dev.alt", slef) 

391 if value.unix_listen: 

392 # _get would async-block, we need to spawn an actual task 

393 # that'll keep working with incoming peers 

394 unixlisten = slef.__unixlisten_states[remote] = _UnixListenState( 

395 slef, 

396 slef.__log.getChild(f"serverdevice-{key}"), 

397 loop, 

398 remote, 

399 value, 

400 ) 

401 await unixlisten.start() 

402 else: 

403 await slef._get(remote) 

404 return slef 

405 

406 # Provided for _UnixListenState 

407 

408 def unixlisten_available(self, remote, protocol): 

409 self.__pool[remote] = protocol 

410 

411 # provided for SlipmuxProtocol 

412 

413 def received(self, remote, data): 

414 try: 

415 try: 

416 message = Message.decode(data, remote) 

417 except error.UnparsableMessage: 

418 self.__log.warning("Ignoring unparsable message from %s", remote) 

419 return 

420 

421 self.__ctx.dispatch_message(message) 

422 

423 except BaseException as exc: 

424 # Catching here because util.asyncio.recvmsg inherits 

425 # _SelectorDatagramTransport's bad handling of callback errors; 

426 # this is the last time we have a log at hand. 

427 self.__log.error( 

428 "Exception raised through dispatch_message: %s", exc, exc_info=exc 

429 ) 

430 raise 

431 

432 def terminated(self, remote, exception): 

433 if state := self.__unixlisten_states.get(remote): 

434 state.disconnected() 

435 self.__ctx.dispatch_error(exception, remote) 

436 

437 

438class SlipmuxProtocol(asyncio.Protocol): 

439 def __init__(self, starting_future, weakinstance, remote_handle, log): 

440 self.__starting = starting_future 

441 self.__instance = weakinstance 

442 self.__remote_handle = remote_handle 

443 self.__control_frame: Optional[list] = None 

444 self.__escape = False 

445 self.__log = log 

446 

447 def send_control(self, data: bytes): 

448 data = bytes((HEADER_CONTROL,)) + data 

449 data += _checksum(data) 

450 data = data.replace(BYTE_ESC, BYTES_ESC_ESC_ESC).replace( 

451 BYTE_END, BYTES_ESC_ESC_END 

452 ) 

453 # seems to be general practice to send leading ENDs generously 

454 data = BYTE_END + data + BYTE_END 

455 self.transport.write(data) 

456 

457 def connection_made(self, transport): 

458 self.transport = transport 

459 self.__starting.set_result(None) 

460 

461 def _end(self, is_regular: bool): 

462 """Restores the state after an END has been received. 

463 

464 On a regular END condition, this checks the FCS and emits the frame, if 

465 any, and then resets; otherwise, it only resets.""" 

466 if is_regular and isinstance(self.__control_frame, list): 

467 if len(self.__control_frame) < 2: 

468 self.__log.warning("Control frame too short") 

469 else: 

470 message = bytes(self.__control_frame[:-2]) 

471 fcs = bytes(self.__control_frame[-2:]) 

472 if fcs != _checksum(bytes((HEADER_CONTROL,)) + message): 

473 self.__log.warning("FCS mismatch") 

474 else: 

475 instance = self.__instance() 

476 if instance is None: 

477 self.__log.warning( 

478 "Discarding incoming message: Instance shut down" 

479 ) 

480 else: 

481 instance.received(self.__remote_handle, message) 

482 self.__escape = False 

483 self.__control_frame = None 

484 

485 def data_received(self, data): 

486 for byte in data: 

487 if self.__escape: 

488 match byte: 

489 case slipmux.ESC_END: 

490 byte = END 

491 case slipmux.ESC_ESC: 

492 byte = ESC 

493 case slipmux.END: 

494 self.__log.info("Frame has been aborted.") 

495 self._end(False) 

496 continue 

497 case _: 

498 self._end(False) 

499 self.__log.warning( 

500 "Framing error: Non-Escape value after Escape" 

501 ) 

502 continue 

503 self.__escape = False 

504 else: 

505 if byte == ESC: 

506 self.__escape = True 

507 continue 

508 if byte == END: 

509 self._end(True) 

510 continue 

511 

512 if self.__control_frame is None: 

513 if byte == HEADER_CONTROL: 

514 self.__control_frame = [] 

515 else: 

516 self.__control_frame = False 

517 continue 

518 

519 if isinstance(self.__control_frame, list): 

520 self.__control_frame.append(byte) 

521 

522 def connection_lost(self, exc): 

523 if not self.__starting.done(): 

524 self.__starting.set_result(exc) 

525 

526 instance = self.__instance() 

527 if instance is None: 

528 self.__log.warning( 

529 "Discarding disconnect error (%r from %r): Instance shut down", 

530 exc, 

531 self.__remote_handle, 

532 ) 

533 else: 

534 instance.terminated(self.__remote_handle, exc) 

535 

536 # We do not implement pause_writing / resume_writing: the message interface 

537 # has no backpressure, so we'll just fill up the buffer, but given that 

538 # CoAP's flow control applies anyway (and it is way more conservative than 

539 # any actual UART's baud rate), there is little risk of excessive buffer 

540 # build-up. 
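
The escaping done in `send_control` and undone in `data_received` follows RFC 1055 SLIP framing. The sketch below isolates that byte stuffing from the slipmux header and FCS handling. The function names `slip_encode` and `slip_decode` are hypothetical, and unlike the protocol class the decoder takes a complete byte string rather than working incrementally.

```python
END, ESC, ESC_END, ESC_ESC = 0xC0, 0xDB, 0xDC, 0xDD


def slip_encode(payload: bytes) -> bytes:
    """Escapes ESC first, then END, so that the ESC bytes introduced for END
    are not escaped a second time; then wraps the frame in END bytes."""
    escaped = payload.replace(bytes([ESC]), bytes([ESC, ESC_ESC]))
    escaped = escaped.replace(bytes([END]), bytes([ESC, ESC_END]))
    return bytes([END]) + escaped + bytes([END])


def slip_decode(stream: bytes) -> list[bytes]:
    """Splits a byte stream into unescaped frames, dropping empty frames
    and frames that contain an invalid escape sequence."""
    frames: list[bytes] = []
    current = bytearray()
    escaped = False
    broken = False
    for byte in stream:
        if escaped:
            escaped = False
            if byte == ESC_END:
                current.append(END)
            elif byte == ESC_ESC:
                current.append(ESC)
            elif byte == END:
                # ESC followed by END aborts the frame in progress.
                current.clear()
                broken = False
            else:
                # Framing error: discard everything up to the next END.
                broken = True
        elif byte == ESC:
            escaped = True
        elif byte == END:
            if current and not broken:
                frames.append(bytes(current))
            current.clear()
            broken = False
        else:
            current.append(byte)
    return frames
```

For example, `slip_decode(slip_encode(b"a") + slip_encode(b"b"))` yields the two frames separately; the doubled END between them only produces an empty frame, which is skipped.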

541 

542 

543class _UnixListenState: 

544 """Wrapper around a UNIX listening server. Serves one connection at a time.""" 

545 

546 def __init__(self, message_interface, log, loop, remote, transport_params): 

547 self.__message_interface = weakref.ref(message_interface) 

548 self.__log = log 

549 self.__loop = loop 

550 self.__remote = remote 

551 

552 self.__unix_socket_filename = transport_params.unix_listen 

553 assert self.__unix_socket_filename is not None, ( 

554 "_UnixListenState created on non-unix-listen transport config" 

555 ) 

556 

557 self.__current_connection = self.__loop.create_future() 

558 

559 async def start(self): 

560 """Runs asynchronous initialization steps (creating the UNIX socket). 

561 

562 Run this exactly once per instance, after init.""" 

563 

564 self.__server = await self.__loop.create_unix_server( 

565 self.protocol_factory, 

566 self.__unix_socket_filename, 

567 ) 

568 

569 def disconnected(self): 

570 self.__current_connection = self.__loop.create_future() 

571 

572 def shutdown(self): 

573 self.__server.close() 

574 

575 @property 

576 def message_interface(self) -> MessageInterfaceSlipmux: 

577 mi = self.__message_interface() 

578 if mi is None: 

579 self.__log.warning("Message interface vanished without a shutdown") 

580 raise asyncio.CancelledError 

581 return mi 

582 

583 def protocol_factory(self): 

584 if self.__current_connection.done(): 

585 

586 class ShutDownImmediately: 

587 def connection_made(self, transport): 

588 transport.write( 

589 b"\xc0\nRefusing connection: a slipmux session is currently ongoing.\n\xc0" 

590 ) 

591 transport.close() 

592 

593 def connection_lost(self, exc): 

594 pass 

595 

596 return ShutDownImmediately() 

597 else: 

598 # We're not using it: As a listening UNIX socket we can trust that 

599 # when the future factory is called, a connection_made is run 

600 # immediately. 

601 starting_future = self.__loop.create_future() 

602 protocol = SlipmuxProtocol( 

603 starting_future, self.__message_interface, self.__remote, self.__log 

604 ) 

605 self.__current_connection.set_result(protocol) 

606 self.message_interface.unixlisten_available(self.__remote, protocol) 

607 return protocol 

608 

609 async def get_waiting(self) -> SlipmuxProtocol: 

610 """Returns the connected protocol, or waits until one is connected""" 

611 return await self.__current_connection