Coverage for aiocoap / transports / slipmux.py: 82%
261 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-29 12:32 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-29 12:32 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5"""This module implements slipmux-03_, with some adjustments regarding URI design
6taken from transport-indication_: It chooses the pattern `{NAME}.dev.alt` for
7devices named /dev/{NAME}, which are treated case-insensitively.
9Configuration
10=============
12While the ``slipmux`` transport is implicitly enabled whenever its dependencies
13are met, it has no effect on servers (because devices are not automatically
14bound to: a slipmux device is not generally recognizable as such).
16To set up server configuration, or for extra options such as the use or
17creation of UNIX sockets, this transport can be configured by setting
18:class:`aiocoap.config.SlipmuxParameters` in the transports configured at
19context creation (:meth:`Context.create_client_context()
20<aiocoap.protocol.Context.create_client_context>` /
21:meth:`create_server_context()
22<aiocoap.protocol.Context.create_server_context>`).
24Usage example
25=============
27Client with physical hardware
28-----------------------------
30Assuming you have a constrained device that supports slipmux connected to a PC
31as ``/dev/ttyACM0``, you can run::
33 $ aiocoap-client coap://ttyacm0.dev.alt/.well-known/core
35and interact with resources as found there; no further configuration is needed.
37Server with test peer
38---------------------
40To mock slipmux without a real serial connection, you can configure a slipmux
41host name to open up a UNIX socket instead. Write this configuration into
42``config-unix-listen.toml``::
44 [transport.slipmux.devices]
45 my-listener = { unix-listen = "/tmp/coap.socket" }
47Then run a server such as the file server::
49 $ aiocoap-fileserver --server-config config-unix-listen.toml
51You can then run a client such as Jelly interactively with that server::
53 $ cargo install Jelly
54 $ Jelly /tmp/coap.socket
56Beware that the aiocoap based server needs to be restarted when the client
57disconnects (see Caveats below).
59Client with test peer
60---------------------
62You can also run aiocoap as a test client, but as that doesn't take a file name
63argument to connect to (because it operates on URIs' host components), some
64configuration is necessary. Store this in ``config-unix-connect.toml``::
66 [transport.slipmux.devices]
67 my-connection = { unix-connect = "/tmp/coap.socket" }
69Then run::
71 $ aiocoap-client coap://my-connection.dev.alt/.well-known/core --config config-unix-connect.toml
73Caveats
74=======
76While servers do connect automatically to any configured slipmux endpoint, they
77do not reconnect automatically when that device goes away or is replaced (as
78may happen when resetting a development board, depending on its USB UART
79implementation). The same is true for UNIX sockets.
81Error handling is generally incomplete when it comes to I/O errors.
83This transport is currently not tested automatically, as only the client side
84is implemented, with no mechanism for acting on (eg.) a UNIX socket instead.
86.. _slipmux-03: https://datatracker.ietf.org/doc/draft-bormann-t2trg-slipmux/03/
87.. _transport-indication: https://datatracker.ietf.org/doc/draft-ietf-core-transport-indication
89------
90"""
92from __future__ import annotations
94# Implementation experience notes:
95#
96# * lower/upper in device names is tedious, esp. with defaults such as ttyACM0
97# * FCS is very old reference that had no test vectors; single example would be helpful.
98# * Ports numbers?
100import asyncio
101import os
102from pathlib import Path
103import string
104from typing import Optional
105import weakref
107import serial_asyncio
109from .. import Message
110from .. import error, interfaces
112# circular but allows matching on constants
113from . import slipmux
115from ..config import TransportParameters, SlipmuxDevice
# SLIP special octets, as defined in RFC1055 (which states them in octal)
END = 0o300
ESC = 0o333
ESC_END = 0o334
ESC_ESC = 0o335
# Pre-built byte strings used when sending: the two bare octets that need
# escaping, and the two-octet escape sequences that replace them on the wire
BYTE_ESC = bytes([ESC])
BYTE_END = bytes([END])
BYTES_ESC_ESC_END = bytes([ESC, ESC_END])
BYTES_ESC_ESC_ESC = bytes([ESC, ESC_ESC])

# Frame type octet that starts a control (CoAP) frame, from the slipmux draft
HEADER_CONTROL = 0xA9

# Characters accepted in host names: the "unreserved" set of RFC3986,
# restricted to lower-case letters
_UNRESERVED = "".join((string.ascii_lowercase, string.digits, "-._~"))
134# Values from RFC1662
135# fmt: off
136_FCS_LOOKUP = (
137 0x0000, 0x1189, 0x2312, 0x329B, 0x4624, 0x57AD, 0x6536, 0x74BF,
138 0x8C48, 0x9DC1, 0xAF5A, 0xBED3, 0xCA6C, 0xDBE5, 0xE97E, 0xF8F7,
139 0x1081, 0x0108, 0x3393, 0x221A, 0x56A5, 0x472C, 0x75B7, 0x643E,
140 0x9CC9, 0x8D40, 0xBFDB, 0xAE52, 0xDAED, 0xCB64, 0xF9FF, 0xE876,
141 0x2102, 0x308B, 0x0210, 0x1399, 0x6726, 0x76AF, 0x4434, 0x55BD,
142 0xAD4A, 0xBCC3, 0x8E58, 0x9FD1, 0xEB6E, 0xFAE7, 0xC87C, 0xD9F5,
143 0x3183, 0x200A, 0x1291, 0x0318, 0x77A7, 0x662E, 0x54B5, 0x453C,
144 0xBDCB, 0xAC42, 0x9ED9, 0x8F50, 0xFBEF, 0xEA66, 0xD8FD, 0xC974,
145 0x4204, 0x538D, 0x6116, 0x709F, 0x0420, 0x15A9, 0x2732, 0x36BB,
146 0xCE4C, 0xDFC5, 0xED5E, 0xFCD7, 0x8868, 0x99E1, 0xAB7A, 0xBAF3,
147 0x5285, 0x430C, 0x7197, 0x601E, 0x14A1, 0x0528, 0x37B3, 0x263A,
148 0xDECD, 0xCF44, 0xFDDF, 0xEC56, 0x98E9, 0x8960, 0xBBFB, 0xAA72,
149 0x6306, 0x728F, 0x4014, 0x519D, 0x2522, 0x34AB, 0x0630, 0x17B9,
150 0xEF4E, 0xFEC7, 0xCC5C, 0xDDD5, 0xA96A, 0xB8E3, 0x8A78, 0x9BF1,
151 0x7387, 0x620E, 0x5095, 0x411C, 0x35A3, 0x242A, 0x16B1, 0x0738,
152 0xFFCF, 0xEE46, 0xDCDD, 0xCD54, 0xB9EB, 0xA862, 0x9AF9, 0x8B70,
153 0x8408, 0x9581, 0xA71A, 0xB693, 0xC22C, 0xD3A5, 0xE13E, 0xF0B7,
154 0x0840, 0x19C9, 0x2B52, 0x3ADB, 0x4E64, 0x5FED, 0x6D76, 0x7CFF,
155 0x9489, 0x8500, 0xB79B, 0xA612, 0xD2AD, 0xC324, 0xF1BF, 0xE036,
156 0x18C1, 0x0948, 0x3BD3, 0x2A5A, 0x5EE5, 0x4F6C, 0x7DF7, 0x6C7E,
157 0xA50A, 0xB483, 0x8618, 0x9791, 0xE32E, 0xF2A7, 0xC03C, 0xD1B5,
158 0x2942, 0x38CB, 0x0A50, 0x1BD9, 0x6F66, 0x7EEF, 0x4C74, 0x5DFD,
159 0xB58B, 0xA402, 0x9699, 0x8710, 0xF3AF, 0xE226, 0xD0BD, 0xC134,
160 0x39C3, 0x284A, 0x1AD1, 0x0B58, 0x7FE7, 0x6E6E, 0x5CF5, 0x4D7C,
161 0xC60C, 0xD785, 0xE51E, 0xF497, 0x8028, 0x91A1, 0xA33A, 0xB2B3,
162 0x4A44, 0x5BCD, 0x6956, 0x78DF, 0x0C60, 0x1DE9, 0x2F72, 0x3EFB,
163 0xD68D, 0xC704, 0xF59F, 0xE416, 0x90A9, 0x8120, 0xB3BB, 0xA232,
164 0x5AC5, 0x4B4C, 0x79D7, 0x685E, 0x1CE1, 0x0D68, 0x3FF3, 0x2E7A,
165 0xE70E, 0xF687, 0xC41C, 0xD595, 0xA12A, 0xB0A3, 0x8238, 0x93B1,
166 0x6B46, 0x7ACF, 0x4854, 0x59DD, 0x2D62, 0x3CEB, 0x0E70, 0x1FF9,
167 0xF78F, 0xE606, 0xD49D, 0xC514, 0xB1AB, 0xA022, 0x92B9, 0x8330,
168 0x7BC7, 0x6A4E, 0x58D5, 0x495C, 0x3DE3, 0x2C6A, 0x1EF1, 0x0F78,
169)
170# fmt: on
def _hostname_to_devicename(hostname: str) -> Optional[str]:
    """Extracts a lower-case device name from a host name in the way described
    in the module documentation. This does not yet attempt to find a suitable
    device.

    Returns None if the host name does not end in ``.dev.alt``, contains any
    character outside of the lower-case unreserved URI characters, or has no
    device name in front of the suffix.

    >>> _hostname_to_devicename("ttyacm0.dev.alt")
    'ttyacm0'
    >>> _hostname_to_devicename("example.com") is None
    True
    >>> _hostname_to_devicename("ttyACM0.dev.alt") is None
    True
    >>> _hostname_to_devicename(".dev.alt") is None
    True
    """
    suffix = ".dev.alt"

    if not hostname.endswith(suffix):
        return None

    if not all(c in _UNRESERVED for c in hostname):
        return None

    # A bare ".dev.alt" names no device at all; report it as unrecognized
    # rather than handing out an empty device name.
    return hostname.removesuffix(suffix) or None
def _checksum(data: bytes) -> bytes:
    """Computes the checksum appended to Control messages: the 16-bit FCS of
    RFC1662 (as implemented in its appendix C.2), serialized as two bytes with
    the least significant byte first.

    >>> _checksum(b"coap is cool!")
    b'$&'
    """

    register = 0xFFFF
    for octet in data:
        table_index = (register ^ octet) & 0xFF
        register = _FCS_LOOKUP[table_index] ^ (register >> 8)
    # The transmitted value is the ones' complement of the final register
    return (register ^ 0xFFFF).to_bytes(2, byteorder="little")
class SlipmuxAddress(interfaces.EndpointAddress):
    """Address of a slipmux peer, identified by its ``{NAME}.dev.alt`` host
    name.

    The message interface the address belongs to is only referenced weakly.
    Two addresses are equal (and hash equally) when their host names are
    equal.

    Raises ValueError at construction if the host name is not recognized by
    :func:`_hostname_to_devicename`.
    """

    def __init__(self, hostname, interface):
        if _hostname_to_devicename(hostname) is None:
            raise ValueError(f"Not a recognized host name: {hostname!r}")
        # This is overly strict, but we can still relax, and this simplifies
        # eg. hostinfo, and won't need normalization.
        self._host = hostname
        self._interface = weakref.ref(interface)

    scheme = "coap"
    is_multicast = False
    is_multicast_locally = False

    # Resolves the weak reference; None once the interface is gone
    interface = property(lambda self: self._interface())

    def __hash__(self):
        return hash(self._host)

    def __eq__(self, other):
        # Comparing against foreign objects (other address types, None, ...)
        # must not raise; let Python fall back to its default handling.
        if not isinstance(other, SlipmuxAddress):
            return NotImplemented
        return self._host == other._host

    def __repr__(self):
        """
        >>> SlipmuxAddress("ttyusb0.dev.alt", MessageInterfaceSlipmux(..., ..., ..., ...))
        <SlipmuxAddress ttyusb0.dev.alt>
        """
        return "<%s %s>" % (
            type(self).__name__,
            self._host,
        )

    @property
    def hostinfo(self):
        return self._host

    @property
    def hostinfo_local(self):
        raise error.AnonymousHost

    @property
    def uri_base(self):
        return "coap://" + self.hostinfo

    @property
    def uri_base_local(self):
        raise error.AnonymousHost

    @property
    def blockwise_key(self):
        return self._host
class MessageInterfaceSlipmux(interfaces.MessageInterface):
    """Message Interface for Slipmux.

    As serial ports can generally not be opened multiple times, this does not
    even try to manage running connections in the Endpoint objects, but spools
    endpoints until they break, at which point any attempt to send a message
    reconnects.

    This keeps any number of connections open with no attempts to limit them,
    as the number of serial connections a system has is generally way lower
    than any RAM limits.
    """

    def __init__(
        self, params: TransportParameters, ctx: interfaces.MessageManager, log, loop
    ):
        self.__ctx = ctx
        self.__log = log
        # Established connections by remote; filled by _get() and by
        # unixlisten_available()
        self.__pool: dict[SlipmuxAddress, SlipmuxProtocol] = {}
        self.__loop = loop
        self.__params = params
        # Listening UNIX sockets by remote; filled by
        # create_transport_endpoint()
        self.__unixlisten_states: dict[SlipmuxAddress, _UnixListenState] = {}

    async def shutdown(self):
        """Closes all listening UNIX sockets and all pooled connections."""
        for state in self.__unixlisten_states.values():
            state.shutdown()
        for protocol in self.__pool.values():
            protocol.transport.close()

    async def _get(self, remote: SlipmuxAddress) -> SlipmuxProtocol:
        """Returns the protocol instance for a remote, connecting first if
        there is no pooled connection yet.

        Depending on the device's configuration, this connects to a UNIX
        socket, waits for a peer on a listening UNIX socket, or opens a serial
        device. Errors from establishing the connection are raised.
        """
        if remote in self.__pool:
            return self.__pool[remote]
        devicename = _hostname_to_devicename(remote._host)
        assert devicename is not None, "Checked at construction at recognition"

        assert self.__params.slipmux is not None
        devparams = self.__params.slipmux.devices.get(devicename, SlipmuxDevice())

        if devparams.unix_connect is None and devparams.unix_listen is not None:
            # We only reach this when a request is sent there. Then
            # async-blocking on sending that request makes some sense: UNIX
            # sockets are used mostly in testing, and both sides need to be
            # ready to start talking. Suspending execution at the ._get() (and
            # thus recognizing the remote) will create some unexpected delays
            # when a process sends a request and then expects waiting for a
            # response to take long (rather than the initial recognition), but
            # that is probably fine.
            #
            # NOTE(review): listen states are only created by
            # create_transport_endpoint() for server contexts; elsewhere this
            # lookup raises KeyError -- confirm whether that is intended.
            protocol = await self.__unixlisten_states[remote].get_waiting()
            # That protocol was created by the listen state with its own
            # startup future, and is already connected when it is handed out.
            # There is nothing further to wait for (waiting on a startup
            # future of our own would never finish, as nothing resolves it).
            self.__pool[remote] = protocol
            return protocol

        starting_future = self.__loop.create_future()
        weakself = weakref.ref(self)
        connlog = self.__log.getChild(f"device-{devicename}")

        def protocol_factory():
            return SlipmuxProtocol(starting_future, weakself, remote, connlog)

        if devparams.unix_connect is not None:
            (_, protocol) = await asyncio.get_running_loop().create_unix_connection(
                protocol_factory,
                # Type is ignored because mypy seems not to know that event
                # loops *do* accept paths.
                devparams.unix_connect,  # type: ignore
            )
        else:
            full_devicename: Path | str | None = devparams.device
            if full_devicename is None:
                try:
                    candidates = os.listdir("/dev/")
                except OSError:
                    # There is no /dev/ to search (eg. on Windows); go
                    # straight to the fallback below.
                    candidates = []
                # Device names are matched case-insensitively, as host names
                # are all lower-case
                for filename in candidates:
                    if filename.lower() == devicename:
                        full_devicename = "/dev/" + filename
                        break
                else:
                    # sensible fallback for Windows, I guess
                    full_devicename = devicename
            (_, protocol) = await serial_asyncio.create_serial_connection(
                self.__loop,
                protocol_factory,
                full_devicename,
                baudrate=115200,
            )

        self.__pool[remote] = protocol
        startup_error = await starting_future
        if startup_error is not None:
            # Don't keep handing out a connection that never came up
            if self.__pool.get(remote) is protocol:
                del self.__pool[remote]
            raise startup_error
        return protocol

    def _get_immediately(self, remote: SlipmuxAddress) -> SlipmuxProtocol:
        """Gets the relevant protocol, expecting that it is present (eg. when
        sending, because the remote was just recognized, which would have
        started the connection)"""
        return self.__pool[remote]

    def send(self, message):
        protocol = self._get_immediately(message.remote)
        # FIXME: Where do we best do this?
        if message.opt.uri_host == message.remote.hostinfo:
            message.opt.uri_host = None
        protocol.send_control(message.encode())

    async def recognize_remote(self, remote):
        if isinstance(remote, SlipmuxAddress) and remote.interface == self:
            # See determine_remote: from the current API, this is the only
            # asynchronous point before a send.
            await self._get(remote)
            return True
        return False

    async def determine_remote(self, message):
        # FIXME: Should we allow ports?
        if (
            message.remote.scheme == "coap"
            and _hostname_to_devicename(message.remote.hostinfo) is not None
        ):
            address = SlipmuxAddress(message.remote.hostinfo, self)
            # We have to connect now and await connection -- not just because
            # this gives us errors reasonably fast, but also because `.send` in
            # the message interface is synchronous.
            await self._get(address)
            return address
        # Not a remote this transport is responsible for
        return None

    @classmethod
    async def create_transport_endpoint(
        cls,
        params: TransportParameters,
        ctx: interfaces.MessageManager,
        log,
        loop,
    ):
        """Creates the message interface. For servers, all configured devices
        are opened (or, for unix-listen devices, start being listened on)
        right away."""
        instance = cls(params, ctx, log, loop)
        if params.is_server:
            assert params.slipmux is not None
            for key, value in params.slipmux.devices.items():
                remote = SlipmuxAddress(f"{key}.dev.alt", instance)
                if value.unix_listen:
                    # _get would async-block, we need to spawn an actual task
                    # that'll keep working with incoming peers
                    unixlisten = instance.__unixlisten_states[remote] = (
                        _UnixListenState(
                            instance,
                            instance.__log.getChild(f"serverdevice-{key}"),
                            loop,
                            remote,
                            value,
                        )
                    )
                    await unixlisten.start()
                else:
                    await instance._get(remote)
        return instance

    # Provided for _UnixListenState

    def unixlisten_available(self, remote, protocol):
        self.__pool[remote] = protocol

    # provided for SlipmuxProtocol

    def received(self, remote, data):
        try:
            try:
                message = Message.decode(data, remote)
            except error.UnparsableMessage:
                self.__log.warning("Ignoring unparsable message from %s", remote)
                return

            self.__ctx.dispatch_message(message)

        except BaseException as exc:
            # Catching here because util.asyncio.recvmsg inherits
            # _SelectorDatagramTransport's bad handling of callback errors;
            # this is the last time we have a log at hand.
            self.__log.error(
                "Exception raised through dispatch_message: %s", exc, exc_info=exc
            )
            raise

    def terminated(self, remote, exception):
        # A listening socket becomes free for its next peer
        if state := self.__unixlisten_states.get(remote):
            state.disconnected()
        self.__ctx.dispatch_error(exception, remote)
438class SlipmuxProtocol(asyncio.Protocol):
439 def __init__(self, starting_future, weakinstance, remote_handle, log):
440 self.__starting = starting_future
441 self.__instance = weakinstance
442 self.__remote_handle = remote_handle
443 self.__control_frame: Optional[list] = None
444 self.__escape = False
445 self.__log = log
447 def send_control(self, data: bytes):
448 data = bytes((HEADER_CONTROL,)) + data
449 data += _checksum(data)
450 data = data.replace(BYTE_ESC, BYTES_ESC_ESC_ESC).replace(
451 BYTE_END, BYTES_ESC_ESC_END
452 )
453 # seems to be general practice to send leading ENDs generously
454 data = BYTE_END + data + BYTE_END
455 self.transport.write(data)
457 def connection_made(self, transport):
458 self.transport = transport
459 self.__starting.set_result(None)
461 def _end(self, is_regular: bool):
462 """Restores the state after an END has been received.
464 On a regular END condition, this checks the FCS and emits the frame, if
465 any, and then resets; otherwise, it only resets."""
466 if is_regular and isinstance(self.__control_frame, list):
467 if len(self.__control_frame) < 2:
468 self.__log.warn("Control frame too short")
469 else:
470 message = bytes(self.__control_frame[:-2])
471 fcs = bytes(self.__control_frame[-2:])
472 if fcs != _checksum(bytes((HEADER_CONTROL,)) + message):
473 self.__log.warn("FCS mismatch")
474 else:
475 instance = self.__instance()
476 if instance is None:
477 self.__log.warn(
478 "Discarding incoming message: Instance shut down"
479 )
480 else:
481 instance.received(self.__remote_handle, message)
482 self.__escape = False
483 self.__control_frame = None
485 def data_received(self, data):
486 for byte in data:
487 if self.__escape:
488 match byte:
489 case slipmux.ESC_END:
490 byte = END
491 case slipmux.ESC_ESC:
492 byte = ESC
493 case slipmux.END:
494 self.__log.info("Frame has been aborted.")
495 self._end(False)
496 continue
497 case _:
498 self._end(False)
499 self.__log.warning(
500 "Framing error: Non-Escape value after Escape"
501 )
502 continue
503 self.__escape = False
504 else:
505 if byte == ESC:
506 self.__escape = True
507 continue
508 if byte == END:
509 self._end(True)
510 continue
512 if self.__control_frame is None:
513 if byte == HEADER_CONTROL:
514 self.__control_frame = []
515 else:
516 self.__control_frame = False
517 continue
519 if isinstance(self.__control_frame, list):
520 self.__control_frame.append(byte)
522 def connection_lost(self, exc):
523 if not self.__starting.done():
524 self.set_result(exc)
526 instance = self.__instance()
527 if instance is None:
528 self.__log.warn(
529 "Discarding disconnect error (%r from %r): Instance shut down",
530 exc,
531 self.__remote_handler,
532 )
533 else:
534 instance.terminated(self.__remote_handle, exc)
536 # We do not implement pause_writing / resume_writing: the message interface
537 # has no backpressure, so we'll just fill up the buffer, but given that
538 # CoAP's flow control applies anyway (and it is way more conservative than
539 # any actual UART's baud rate), there is little risk of excessive buffer
540 # build-up.
543class _UnixListenState:
544 """Wrapper around a UNIX listening server. Serves one connection at a time."""
546 def __init__(self, message_interface, log, loop, remote, transport_params):
547 self.__message_interface = weakref.ref(message_interface)
548 self.__log = log
549 self.__loop = loop
550 self.__remote = remote
552 self.__unix_socket_filename = transport_params.unix_listen
553 assert self.__unix_socket_filename is not None, (
554 "_UnixListenState created on non-unix-listen transport config"
555 )
557 self.__current_connection = self.__loop.create_future()
559 async def start(self):
560 """Runs asynchronous initialization steps (creating the UNIX socket).
562 Run this exactly once per instance, after init."""
564 self.__server = await self.__loop.create_unix_server(
565 self.protocol_factory,
566 self.__unix_socket_filename,
567 )
569 def disconnected(self):
570 self.__current_connection = self.__loop.create_future()
572 def shutdown(self):
573 self.__server.close()
575 @property
576 def message_interface(self) -> MessageInterfaceSlipmux:
577 mi = self.__message_interface()
578 if mi is None:
579 self.__log.warn("Message interface vanished without a shutdown")
580 raise asyncio.CancelledError
581 return mi
583 def protocol_factory(self):
584 if self.__current_connection.done():
586 class ShutDownImmediately:
587 def connection_made(self, transport):
588 transport.write(
589 b"\xc0\nRefusing connection: a slipmux session is currently ongoing.\n\xc0"
590 )
591 transport.close()
593 def connection_lost(self, exc):
594 pass
596 return ShutDownImmediately()
597 else:
598 # We're not using it: As a listening UNIX socket we can trust that
599 # when the future factory is called, an connection_made is run
600 # immediately.
601 starting_future = self.__loop.create_future()
602 protocol = SlipmuxProtocol(
603 starting_future, self.__message_interface, self.__remote, self.__log
604 )
605 self.__current_connection.set_result(protocol)
606 self.message_interface.unixlisten_available(self.__remote, protocol)
607 return protocol
609 async def get_waiting(self) -> SlipmuxProtocol:
610 """Returns the connected protocol, or waits until one is connected"""
611 return await self.__current_connection