Coverage for aiocoap / transports / slipmux.py: 82%

260 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-17 12:28 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5"""This module implements slipmux-03_, with some adjustments regarding URI design 

6taken from transport-indication_: It chooses the pattern `{NAME}.dev.alt` for 

7devices named /dev/{NAME}, which are treated case-insensitively. 

8 

9Configuration 

10============= 

11 

12While the ``slipmux`` transport is implicitly enabled whenever its dependencies 

13are met, it has no effect on servers (because devices are not automatically 

14bound to: a slipmux device is not generally recognizable as such). 

15 

16To set up server configuration, or for extra options such as the use or 

17creation of UNIX sockets, this transport can be configured by setting 

18:class:`aiocoap.config.SlipmuxParameters` in the transports configured at 

19context creation (:meth:`Context.create_client_context() 

20<aiocoap.protocol.Context.create_client_context>` / 

21:meth:`create_server_context() 

22<aiocoap.protocol.Context.create_server_context>`). 

23 

24Usage example 

25============= 

26 

27Client with physical hardware 

28----------------------------- 

29 

30Assuming you have a constrained device that suports slipmux connected to a PC 

31as ``/dev/ttyACM0``, you can run:: 

32 

33 $ aiocoap-client coap://ttyacm0.dev.alt/.well-known/core 

34 

35and interact with resources as found there; no further configuration is needed. 

36 

37Server with test peer 

38--------------------- 

39 

40To mock slipmux without a real serial connection, you can configure a slipmux 

41host name to open up a UNIX socket instead. Write this configuration into 

42``config-unix-listen.toml``:: 

43 

44 [transport.slipmux.devices] 

45 my-listener = { unix-listen = "/tmp/coap.socket" } 

46 

47Then run a server such as the file server:: 

48 

49 $ aiocoap-fileserver --server-config config-unix-listen.toml 

50 

51You can then run a client such as Jelly intreactively with that server:: 

52 

53 $ cargo install Jelly 

54 $ Jelly /tmp/coap.socket 

55 

56Beware that the aiocoap based server needs to be restarted when the client 

57disconnects (see Caveats below). 

58 

59Client with test peer 

60--------------------- 

61 

62You can also run aiocoap as a test client, but as that doesn't take a file name 

63argument to connect to (because it operates on URIs' host components), some 

64configuration is necessary. Store this in ``config-unix-connect.toml``:: 

65 

66 [transport.slipmux.devices] 

67 my-connection = { unix-connect = "/tmp/coap.socket" } 

68 

69Then run:: 

70 

71 $ aiocoap-client coap://my-connection.dev.alt/.well-known/core --config config-unix-connect.toml 

72 

73Caveats 

74======= 

75 

76While servers do connect automatically to any configured slipmux endpoint, they 

77do not reconnect automatically when that device goes away or is replaced (as 

78may happen when resetting a development board, depending on its USB UART 

79implementation). The same is true for UNIX sockets. 

80 

81Error handing is generally incomplete when it comes to I/O errors. 

82 

83This transport is currently not tested automatically, as only the client side 

84is implemented, with no mechanism for acting on (eg.) a UNIX socket instead. 

85 

86.. _slipmux-03: https://datatracker.ietf.org/doc/draft-bormann-t2trg-slipmux/03/ 

87.. _transport-indication: https://datatracker.ietf.org/doc/draft-ietf-core-transport-indication 

88 

89------ 

90""" 

91 

92from __future__ import annotations 

93 

94# Implementation experience notes: 

95# 

96# * lower/upper in device names is tedious, esp. with defaults such as ttyACM0 

97# * FCS is very old reference that had no test vectors; single example would be helpful. 

98# * Ports numbers? 

99 

100import asyncio 

101import os 

102from pathlib import Path 

103import string 

104from typing import Optional 

105import weakref 

106 

107import serial_asyncio 

108 

109from .. import Message 

110from .. import error, interfaces 

111 

112# circular but allows matching on constants 

113from . import slipmux 

114 

115from ..config import TransportParameters, SlipmuxDevice 

116 

117# from RFC1055 

118ESC = 0o333 

119END = 0o300 

120ESC_END = 0o334 

121ESC_ESC = 0o335 

122# but at runtime we need the combined forms 

123BYTE_ESC = bytes((ESC,)) 

124BYTE_END = bytes((END,)) 

125BYTES_ESC_ESC_END = BYTE_ESC + bytes((ESC_END,)) 

126BYTES_ESC_ESC_ESC = BYTE_ESC + bytes((ESC_ESC,)) 

127 

128# from slipmux draft 

129HEADER_CONTROL = 0xA9 

130 

131# from RFC3986 

132_UNRESERVED = string.ascii_lowercase + string.digits + "-._~" 

133 

134# Values from RFC1662 

135# fmt: off 

136_FCS_LOOKUP = ( 

137 0x0000, 0x1189, 0x2312, 0x329B, 0x4624, 0x57AD, 0x6536, 0x74BF, 

138 0x8C48, 0x9DC1, 0xAF5A, 0xBED3, 0xCA6C, 0xDBE5, 0xE97E, 0xF8F7, 

139 0x1081, 0x0108, 0x3393, 0x221A, 0x56A5, 0x472C, 0x75B7, 0x643E, 

140 0x9CC9, 0x8D40, 0xBFDB, 0xAE52, 0xDAED, 0xCB64, 0xF9FF, 0xE876, 

141 0x2102, 0x308B, 0x0210, 0x1399, 0x6726, 0x76AF, 0x4434, 0x55BD, 

142 0xAD4A, 0xBCC3, 0x8E58, 0x9FD1, 0xEB6E, 0xFAE7, 0xC87C, 0xD9F5, 

143 0x3183, 0x200A, 0x1291, 0x0318, 0x77A7, 0x662E, 0x54B5, 0x453C, 

144 0xBDCB, 0xAC42, 0x9ED9, 0x8F50, 0xFBEF, 0xEA66, 0xD8FD, 0xC974, 

145 0x4204, 0x538D, 0x6116, 0x709F, 0x0420, 0x15A9, 0x2732, 0x36BB, 

146 0xCE4C, 0xDFC5, 0xED5E, 0xFCD7, 0x8868, 0x99E1, 0xAB7A, 0xBAF3, 

147 0x5285, 0x430C, 0x7197, 0x601E, 0x14A1, 0x0528, 0x37B3, 0x263A, 

148 0xDECD, 0xCF44, 0xFDDF, 0xEC56, 0x98E9, 0x8960, 0xBBFB, 0xAA72, 

149 0x6306, 0x728F, 0x4014, 0x519D, 0x2522, 0x34AB, 0x0630, 0x17B9, 

150 0xEF4E, 0xFEC7, 0xCC5C, 0xDDD5, 0xA96A, 0xB8E3, 0x8A78, 0x9BF1, 

151 0x7387, 0x620E, 0x5095, 0x411C, 0x35A3, 0x242A, 0x16B1, 0x0738, 

152 0xFFCF, 0xEE46, 0xDCDD, 0xCD54, 0xB9EB, 0xA862, 0x9AF9, 0x8B70, 

153 0x8408, 0x9581, 0xA71A, 0xB693, 0xC22C, 0xD3A5, 0xE13E, 0xF0B7, 

154 0x0840, 0x19C9, 0x2B52, 0x3ADB, 0x4E64, 0x5FED, 0x6D76, 0x7CFF, 

155 0x9489, 0x8500, 0xB79B, 0xA612, 0xD2AD, 0xC324, 0xF1BF, 0xE036, 

156 0x18C1, 0x0948, 0x3BD3, 0x2A5A, 0x5EE5, 0x4F6C, 0x7DF7, 0x6C7E, 

157 0xA50A, 0xB483, 0x8618, 0x9791, 0xE32E, 0xF2A7, 0xC03C, 0xD1B5, 

158 0x2942, 0x38CB, 0x0A50, 0x1BD9, 0x6F66, 0x7EEF, 0x4C74, 0x5DFD, 

159 0xB58B, 0xA402, 0x9699, 0x8710, 0xF3AF, 0xE226, 0xD0BD, 0xC134, 

160 0x39C3, 0x284A, 0x1AD1, 0x0B58, 0x7FE7, 0x6E6E, 0x5CF5, 0x4D7C, 

161 0xC60C, 0xD785, 0xE51E, 0xF497, 0x8028, 0x91A1, 0xA33A, 0xB2B3, 

162 0x4A44, 0x5BCD, 0x6956, 0x78DF, 0x0C60, 0x1DE9, 0x2F72, 0x3EFB, 

163 0xD68D, 0xC704, 0xF59F, 0xE416, 0x90A9, 0x8120, 0xB3BB, 0xA232, 

164 0x5AC5, 0x4B4C, 0x79D7, 0x685E, 0x1CE1, 0x0D68, 0x3FF3, 0x2E7A, 

165 0xE70E, 0xF687, 0xC41C, 0xD595, 0xA12A, 0xB0A3, 0x8238, 0x93B1, 

166 0x6B46, 0x7ACF, 0x4854, 0x59DD, 0x2D62, 0x3CEB, 0x0E70, 0x1FF9, 

167 0xF78F, 0xE606, 0xD49D, 0xC514, 0xB1AB, 0xA022, 0x92B9, 0x8330, 

168 0x7BC7, 0x6A4E, 0x58D5, 0x495C, 0x3DE3, 0x2C6A, 0x1EF1, 0x0F78, 

169) 

170# fmt: on 

171 

172 

173def _hostname_to_devicename(hostname: str) -> Optional[str]: 

174 """Extracts a lower-case device name from a host name in the way described 

175 in the module documentation. This does not yet attempt to find a suitable 

176 device. 

177 

178 >>> _hostname_to_devicename("ttyacm0.dev.alt") 

179 'ttyacm0' 

180 >>> _hostname_to_devicename("example.com") is None 

181 True 

182 """ 

183 if not hostname.endswith(".dev.alt"): 

184 return None 

185 

186 if any(c not in _UNRESERVED for c in hostname): 

187 return None 

188 

189 return hostname.removesuffix(".dev.alt") 

190 

191 

192def _checksum(data: bytes) -> bytes: 

193 """Calculates the Control message checksum (16-bit FCS from RFC1662), 

194 following its implementation in appendix C.2. 

195 

196 >>> _checksum(b"coap is cool!") 

197 b'$&' 

198 """ 

199 

200 fcs = 0xFFFF 

201 for byte in data: 

202 fcs = (fcs >> 8) ^ _FCS_LOOKUP[(fcs ^ byte) & 0xFF] 

203 fcs ^= 0xFFFF 

204 return fcs.to_bytes(2, "little") 

205 

206 

207class SlipmuxAddress(interfaces.EndpointAddress): 

208 def __init__(self, hostname, interface): 

209 if _hostname_to_devicename(hostname) is None: 

210 raise ValueError(f"Not a recognized host name: {hostname!r}") 

211 # This is overly strict, but we can still relax, and this simplifies 

212 # eg. hostinfo, and won't need normalization. 

213 self._host = hostname 

214 self._interface = weakref.ref(interface) 

215 

216 scheme = "coap" 

217 is_multicast = False 

218 is_multicast_locally = False 

219 

220 interface = property(lambda self: self._interface()) 

221 

222 def __hash__(self): 

223 return hash(self._host) 

224 

225 def __eq__(self, other): 

226 return self._host == other._host 

227 

228 def __repr__(self): 

229 """ 

230 >>> SlipmuxAddress("ttyusb0.dev.alt", MessageInterfaceSlipmux(..., ..., ..., ...)) 

231 <SlipmuxAddress ttyusb0.dev.alt> 

232 """ 

233 return "<%s %s>" % ( 

234 type(self).__name__, 

235 self._host, 

236 ) 

237 

238 @property 

239 def hostinfo(self): 

240 return self._host 

241 

242 @property 

243 def hostinfo_local(self): 

244 raise error.AnonymousHost 

245 

246 @property 

247 def uri_base(self): 

248 return "coap://" + self.hostinfo 

249 

250 @property 

251 def uri_base_local(self): 

252 raise error.AnonymousHost 

253 

254 @property 

255 def blockwise_key(self): 

256 return self._host 

257 

258 

259class MessageInterfaceSlipmux(interfaces.MessageInterface): 

260 """Message Interface for Slipmux. 

261 

262 As serial ports generally not be opened multiple times, this does not even 

263 try to manage running connections in the Endpoint objects, but spools 

264 endpoints until they break, at which point any attempt to send a message 

265 reconnects. 

266 

267 This keeps any number of connections open with no attempts to limit them, 

268 as the number of serial connections a system has is generally way lower 

269 than any RAM limits. 

270 """ 

271 

272 def __init__( 

273 self, params: TransportParameters, ctx: interfaces.MessageManager, log, loop 

274 ): 

275 self.__ctx = ctx 

276 self.__log = log 

277 self.__pool: dict[SlipmuxAddress, SlipmuxProtocol] = {} 

278 self.__loop = loop 

279 self.__params = params 

280 self.__unixlisten_states: dict[SlipmuxAddress, _UnixListenState] = {} 

281 

282 async def shutdown(self): 

283 for t in self.__unixlisten_states.values(): 

284 t.shutdown() 

285 for proto in self.__pool.values(): 

286 proto.transport.close() 

287 return 

288 

289 async def _get(self, remote: SlipmuxAddress) -> SlipmuxProtocol: 

290 if remote in self.__pool: 

291 return self.__pool[remote] 

292 devicename = _hostname_to_devicename(remote._host) 

293 assert devicename is not None, "Checked at construction at recognition" 

294 

295 starting_future = self.__loop.create_future() 

296 weakself = weakref.ref(self) 

297 connlog = self.__log.getChild(f"device-{devicename}") 

298 

299 def protocol_factory(): 

300 return SlipmuxProtocol(starting_future, weakself, remote, connlog) 

301 

302 assert self.__params.slipmux is not None 

303 devparams = self.__params.slipmux.devices.get(devicename, SlipmuxDevice()) 

304 if devparams.unix_connect is not None: 

305 (_, protocol) = await asyncio.get_running_loop().create_unix_connection( 

306 protocol_factory, 

307 # Type is ignored because mypy seems not to know that event 

308 # loops *do* accept paths. 

309 devparams.unix_connect, # type: ignore 

310 ) 

311 elif devparams.unix_listen is not None: 

312 # We only reach this when a request is sent there. Then 

313 # async-blocking on sending that request makes some sense: UNIX 

314 # sockets are used mostly in testing, and both sides need to be 

315 # ready to start talking. Suspending execution at the ._get() (and 

316 # thus recognizing the remote) will create some unexpected delays 

317 # when a process sends a request and then expects waiting for a 

318 # response to take long (rather than the initial recognition), but 

319 # that is probably fine. 

320 protocol = await self.__unixlisten_states[remote].get_waiting() 

321 else: 

322 full_devicename: Path | str | None = devparams.device 

323 if full_devicename is None: 

324 for filename in os.listdir("/dev/"): 

325 if filename.lower() == devicename: 

326 full_devicename = "/dev/" + filename 

327 break 

328 else: 

329 # sensible fallback for Windows, I guess 

330 full_devicename = devicename 

331 (_, protocol) = await serial_asyncio.create_serial_connection( 

332 self.__loop, 

333 protocol_factory, 

334 full_devicename, 

335 baudrate=115200, 

336 ) 

337 

338 self.__pool[remote] = protocol 

339 startup_error = await starting_future 

340 if startup_error is not None: 

341 raise startup_error 

342 return protocol 

343 

344 def _get_immediately(self, remote: SlipmuxAddress) -> SlipmuxProtocol: 

345 """Gets the relevant protcol, expecting that it is present (eg. when 

346 sending, because the remote was just recognized, which would have 

347 started the connection)""" 

348 return self.__pool[remote] 

349 

350 def send(self, message): 

351 protocol = self._get_immediately(message.remote) 

352 # FIXME: Where do we best do this? 

353 if message.opt.uri_host == message.remote.hostinfo: 

354 message.opt.uri_host = None 

355 protocol.send_control(message.encode()) 

356 

357 async def recognize_remote(self, remote): 

358 if isinstance(remote, SlipmuxAddress) and remote.interface == self: 

359 # See determine_remote: from the current API, this is the only 

360 # asynchronous point before a send. 

361 await self._get(remote) 

362 return False 

363 

364 async def determine_remote(self, message): 

365 # FIXME: Should we allow ports? 

366 if ( 

367 message.remote.scheme == "coap" 

368 and _hostname_to_devicename(message.remote.hostinfo) is not None 

369 ): 

370 address = SlipmuxAddress(message.remote.hostinfo, self) 

371 # We have to connect now and await connection -- not just because 

372 # this gives us errors reasonably fast, but also because `.send` in 

373 # the message interface is synchronus. 

374 await self._get(address) 

375 return address 

376 

377 @classmethod 

378 async def create_transport_endpoint( 

379 cls, 

380 params: TransportParameters, 

381 ctx: interfaces.MessageManager, 

382 log, 

383 loop, 

384 ): 

385 slef = cls(params, ctx, log, loop) 

386 if params.is_server: 

387 assert params.slipmux is not None 

388 for key, value in params.slipmux.devices.items(): 

389 remote = SlipmuxAddress(f"{key}.dev.alt", slef) 

390 if value.unix_listen: 

391 # _get would async-block, we need to spawn an actual task 

392 # that'll keep working with incoming peers 

393 unixlisten = slef.__unixlisten_states[remote] = _UnixListenState( 

394 slef, 

395 slef.__log.getChild(f"serverdevice-{key}"), 

396 loop, 

397 remote, 

398 value, 

399 ) 

400 await unixlisten.start() 

401 else: 

402 await slef._get(remote) 

403 return slef 

404 

405 # Provided for _UnixListenState 

406 

407 def unixlisten_available(self, remote, protocol): 

408 self.__pool[remote] = protocol 

409 

410 # provided for SlipmuxProtocol 

411 

412 def received(self, remote, data): 

413 try: 

414 try: 

415 message = Message.decode(data, remote) 

416 except error.UnparsableMessage: 

417 self.__log.warning("Ignoring unparsable message from %s", remote) 

418 return 

419 

420 self.__ctx.dispatch_message(message) 

421 

422 except BaseException as exc: 

423 # Catching here because util.asyncio.recvmsg inherits 

424 # _SelectorDatagramTransport's bad handling of callback errors; 

425 # this is the last time we have a log at hand. 

426 self.__log.error( 

427 "Exception raised through dispatch_message: %s", exc, exc_info=exc 

428 ) 

429 raise 

430 

431 def terminated(self, remote, exception): 

432 if state := self.__unixlisten_states.get(remote): 

433 state.disconnected() 

434 self.__ctx.dispatch_error(exception, remote) 

435 

436 

437class SlipmuxProtocol(asyncio.Protocol): 

438 def __init__(self, starting_future, weakinstance, remote_handle, log): 

439 self.__starting = starting_future 

440 self.__instance = weakinstance 

441 self.__remote_handle = remote_handle 

442 self.__control_frame: Optional[list] = None 

443 self.__escape = False 

444 self.__log = log 

445 

446 def send_control(self, data: bytes): 

447 data = bytes((HEADER_CONTROL,)) + data 

448 data += _checksum(data) 

449 data = data.replace(BYTE_ESC, BYTES_ESC_ESC_ESC).replace( 

450 BYTE_END, BYTES_ESC_ESC_END 

451 ) 

452 # seems to be general practice to send leading ENDs generously 

453 data = BYTE_END + data + BYTE_END 

454 self.transport.write(data) 

455 

456 def connection_made(self, transport): 

457 self.transport = transport 

458 self.__starting.set_result(None) 

459 

460 def _end(self, is_regular: bool): 

461 """Restores the state after an END has been received. 

462 

463 On a regular END condition, this checks the FCS and emits the frame, if 

464 any, and then resets; otherwise, it only resets.""" 

465 if is_regular and isinstance(self.__control_frame, list): 

466 if len(self.__control_frame) < 2: 

467 self.__log.warn("Control frame too short") 

468 else: 

469 message = bytes(self.__control_frame[:-2]) 

470 fcs = bytes(self.__control_frame[-2:]) 

471 if fcs != _checksum(bytes((HEADER_CONTROL,)) + message): 

472 self.__log.warn("FCS mismatch") 

473 else: 

474 instance = self.__instance() 

475 if instance is None: 

476 self.__log.warn( 

477 "Discarding incoming message: Instance shut down" 

478 ) 

479 else: 

480 instance.received(self.__remote_handle, message) 

481 self.__escape = False 

482 self.__control_frame = None 

483 

484 def data_received(self, data): 

485 for byte in data: 

486 if self.__escape: 

487 match byte: 

488 case slipmux.ESC_END: 

489 byte = END 

490 case slipmux.ESC_ESC: 

491 byte = ESC 

492 case slipmux.END: 

493 self.__log.info("Frame has been aborted.") 

494 self._end(False) 

495 continue 

496 case _: 

497 self._end(False) 

498 self.__log.warning( 

499 "Framing error: Non-Escape value after Escape" 

500 ) 

501 continue 

502 self.__escape = False 

503 else: 

504 if byte == ESC: 

505 self.__escape = True 

506 continue 

507 if byte == END: 

508 self._end(True) 

509 continue 

510 

511 if self.__control_frame is None: 

512 if byte == HEADER_CONTROL: 

513 self.__control_frame = [] 

514 else: 

515 self.__control_frame = False 

516 continue 

517 

518 if isinstance(self.__control_frame, list): 

519 self.__control_frame.append(byte) 

520 

521 def connection_lost(self, exc): 

522 if not self.__starting.done(): 

523 self.set_result(exc) 

524 

525 instance = self.__instance() 

526 if instance is None: 

527 self.__log.warn( 

528 "Discarding disconnect error (%r from %r): Instance shut down", 

529 exc, 

530 self.__remote_handler, 

531 ) 

532 else: 

533 instance.terminated(self.__remote_handle, exc) 

534 

535 # We do not implement pause_writing / resume_writing: the message interface 

536 # has no backpressure, so we'll just fill up the buffer, but given that 

537 # CoAP's flow control applies anyway (and it is way more conservative than 

538 # any actual UART's baud rate), there is little risk of excessive buffer 

539 # build-up. 

540 

541 

542class _UnixListenState: 

543 """Wrapper around a UNIX listening server. Serves one connection at a time.""" 

544 

545 def __init__(self, message_interface, log, loop, remote, transport_params): 

546 self.__message_interface = weakref.ref(message_interface) 

547 self.__log = log 

548 self.__loop = loop 

549 self.__remote = remote 

550 

551 self.__unix_socket_filename = transport_params.unix_listen 

552 assert self.__unix_socket_filename is not None, ( 

553 "_UnixListenState created on non-unix-listen transport config" 

554 ) 

555 

556 self.__current_connection = self.__loop.create_future() 

557 

558 async def start(self): 

559 """Runs asynchronous initialization steps (creating the UNIX socket). 

560 

561 Run this exactly once per instance, after init.""" 

562 

563 self.__server = await self.__loop.create_unix_server( 

564 self.protocol_factory, 

565 self.__unix_socket_filename, 

566 ) 

567 

568 def disconnected(self): 

569 self.__current_connection = self.__loop.create_future() 

570 

571 def shutdown(self): 

572 self.__server.close() 

573 

574 @property 

575 def message_interface(self) -> MessageInterfaceSlipmux: 

576 mi = self.__message_interface() 

577 if mi is None: 

578 self.__log.warn("Message interface vanished without a shutdown") 

579 raise asyncio.CancelledError 

580 return mi 

581 

582 def protocol_factory(self): 

583 if self.__current_connection.done(): 

584 

585 class ShutDownImmediately: 

586 def connection_made(self, transport): 

587 transport.write( 

588 b"\xc0\nRefusing connection: a slipmux session is currently ongoing.\n\xc0" 

589 ) 

590 transport.close() 

591 

592 def connection_lost(self, exc): 

593 pass 

594 

595 return ShutDownImmediately() 

596 else: 

597 # We're not using it: As a listening UNIX socket we can trust that 

598 # when the future factory is called, an connection_made is run 

599 # immediately. 

600 starting_future = self.__loop.create_future() 

601 protocol = SlipmuxProtocol( 

602 starting_future, self.__message_interface, self.__remote, self.__log 

603 ) 

604 self.__current_connection.set_result(protocol) 

605 self.message_interface.unixlisten_available(self.__remote, protocol) 

606 return protocol 

607 

608 async def get_waiting(self) -> SlipmuxProtocol: 

609 """Returns the connected protocol, or waits until one is connected""" 

610 return await self.__current_connection