Coverage for aiocoap / transports / slipmux.py: 82%
260 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 12:28 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 12:28 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5"""This module implements slipmux-03_, with some adjustments regarding URI design
6taken from transport-indication_: It chooses the pattern `{NAME}.dev.alt` for
7devices named /dev/{NAME}, which are treated case-insensitively.
9Configuration
10=============
12While the ``slipmux`` transport is implicitly enabled whenever its dependencies
13are met, it has no effect on servers (because devices are not automatically
14bound to: a slipmux device is not generally recognizable as such).
16To set up server configuration, or for extra options such as the use or
17creation of UNIX sockets, this transport can be configured by setting
18:class:`aiocoap.config.SlipmuxParameters` in the transports configured at
19context creation (:meth:`Context.create_client_context()
20<aiocoap.protocol.Context.create_client_context>` /
21:meth:`create_server_context()
22<aiocoap.protocol.Context.create_server_context>`).
24Usage example
25=============
27Client with physical hardware
28-----------------------------
30Assuming you have a constrained device that supports slipmux connected to a PC
31as ``/dev/ttyACM0``, you can run::
33 $ aiocoap-client coap://ttyacm0.dev.alt/.well-known/core
35and interact with resources as found there; no further configuration is needed.
37Server with test peer
38---------------------
40To mock slipmux without a real serial connection, you can configure a slipmux
41host name to open up a UNIX socket instead. Write this configuration into
42``config-unix-listen.toml``::
44 [transport.slipmux.devices]
45 my-listener = { unix-listen = "/tmp/coap.socket" }
47Then run a server such as the file server::
49 $ aiocoap-fileserver --server-config config-unix-listen.toml
51You can then run a client such as Jelly interactively with that server::
53 $ cargo install Jelly
54 $ Jelly /tmp/coap.socket
56Beware that the aiocoap based server needs to be restarted when the client
57disconnects (see Caveats below).
59Client with test peer
60---------------------
62You can also run aiocoap as a test client, but as that doesn't take a file name
63argument to connect to (because it operates on URIs' host components), some
64configuration is necessary. Store this in ``config-unix-connect.toml``::
66 [transport.slipmux.devices]
67 my-connection = { unix-connect = "/tmp/coap.socket" }
69Then run::
71 $ aiocoap-client coap://my-connection.dev.alt/.well-known/core --config config-unix-connect.toml
73Caveats
74=======
76While servers do connect automatically to any configured slipmux endpoint, they
77do not reconnect automatically when that device goes away or is replaced (as
78may happen when resetting a development board, depending on its USB UART
79implementation). The same is true for UNIX sockets.
81Error handling is generally incomplete when it comes to I/O errors.
83This transport is currently not tested automatically, as only the client side
84is implemented, with no mechanism for acting on (eg.) a UNIX socket instead.
86.. _slipmux-03: https://datatracker.ietf.org/doc/draft-bormann-t2trg-slipmux/03/
87.. _transport-indication: https://datatracker.ietf.org/doc/draft-ietf-core-transport-indication
89------
90"""
92from __future__ import annotations
94# Implementation experience notes:
95#
96# * lower/upper in device names is tedious, esp. with defaults such as ttyACM0
97# * FCS is very old reference that had no test vectors; single example would be helpful.
98# * Ports numbers?
100import asyncio
101import os
102from pathlib import Path
103import string
104from typing import Optional
105import weakref
107import serial_asyncio
109from .. import Message
110from .. import error, interfaces
112# circular but allows matching on constants
113from . import slipmux
115from ..config import TransportParameters, SlipmuxDevice
# SLIP framing octets, from RFC1055 (written in octal, as the RFC does)
END = 0o300
ESC = 0o333
ESC_END = 0o334
ESC_ESC = 0o335

# Ready-made byte strings of the above, as needed at runtime when escaping
# outgoing frames
BYTE_END = bytes([END])
BYTE_ESC = bytes([ESC])
BYTES_ESC_ESC_END = bytes([ESC, ESC_END])
BYTES_ESC_ESC_ESC = bytes([ESC, ESC_ESC])

# Frame type octet of a control (CoAP) frame, from the slipmux draft
HEADER_CONTROL = 0xA9

# The "unreserved" characters of RFC3986, restricted to lower-case letters
_UNRESERVED = "".join((string.ascii_lowercase, string.digits, "-._~"))
# Lookup table for the 16-bit FCS of RFC1662. Rather than spelling out all 256
# values, they are produced with the table generator the RFC gives alongside
# its table (reflected polynomial 0x8408); the resulting values are the ones
# printed in the RFC.
def _build_fcs_lookup() -> tuple[int, ...]:
    """Return the 256 entry FCS-16 lookup table, indexed by byte value."""
    polynomial = 0x8408
    entries = []
    for byte_value in range(256):
        remainder = byte_value
        # One shift per bit of the byte; the polynomial is folded in whenever
        # a set bit is shifted out.
        for _ in range(8):
            shifted_out = remainder & 1
            remainder >>= 1
            if shifted_out:
                remainder ^= polynomial
        entries.append(remainder & 0xFFFF)
    return tuple(entries)


_FCS_LOOKUP = _build_fcs_lookup()
def _hostname_to_devicename(hostname: str) -> Optional[str]:
    """Derives the lower-case device name encoded in a ``{NAME}.dev.alt`` host
    name, or returns None if the host name does not follow that pattern or
    contains characters outside of the accepted set.

    Only the name is extracted; no attempt is made to find a matching device.

    >>> _hostname_to_devicename("ttyacm0.dev.alt")
    'ttyacm0'
    >>> _hostname_to_devicename("example.com") is None
    True
    """
    suffix = ".dev.alt"
    well_formed = hostname.endswith(suffix) and all(
        character in _UNRESERVED for character in hostname
    )
    if not well_formed:
        return None
    # The suffix is known to be present, so cutting by length is exact.
    return hostname[: len(hostname) - len(suffix)]
def _checksum(data: bytes) -> bytes:
    """Computes the checksum of a Control message: the 16-bit FCS of RFC1662,
    table driven as in the implementation of its appendix C.2.

    The complemented FCS is returned as two bytes, least significant first.

    >>> _checksum(b"coap is cool!")
    b'$&'
    """
    register = 0xFFFF
    for octet in data:
        table_index = (register ^ octet) & 0xFF
        register = _FCS_LOOKUP[table_index] ^ (register >> 8)
    complemented = register ^ 0xFFFF
    return complemented.to_bytes(2, "little")
class SlipmuxAddress(interfaces.EndpointAddress):
    """Address of a slipmux peer, identified by its ``{NAME}.dev.alt`` host
    name.

    Addresses hash and compare by host name only. The message interface an
    address was created for is only referenced weakly.
    """

    scheme = "coap"
    is_multicast = False
    is_multicast_locally = False

    def __init__(self, hostname, interface):
        """Creates the address for `hostname` on the given message interface.

        Raises ValueError if `hostname` is not of the form accepted by
        `_hostname_to_devicename`."""
        if _hostname_to_devicename(hostname) is None:
            raise ValueError(f"Not a recognized host name: {hostname!r}")
        # This is overly strict, but we can still relax, and this simplifies
        # eg. hostinfo, and won't need normalization.
        self._host = hostname
        self._interface = weakref.ref(interface)

    @property
    def interface(self):
        """The message interface this address belongs to, or None once that
        has been garbage collected."""
        return self._interface()

    def __hash__(self):
        return hash(self._host)

    def __eq__(self, other):
        # Objects of other types have no `_host`; leaving the decision to
        # Python (which then treats them as unequal) avoids an AttributeError
        # when addresses of different transports are compared.
        if not isinstance(other, SlipmuxAddress):
            return NotImplemented
        return self._host == other._host

    def __repr__(self):
        """
        >>> SlipmuxAddress("ttyusb0.dev.alt", MessageInterfaceSlipmux(..., ..., ..., ...))
        <SlipmuxAddress ttyusb0.dev.alt>
        """
        return f"<{type(self).__name__} {self._host}>"

    @property
    def hostinfo(self):
        """The host name, which is used as the host component of URIs."""
        return self._host

    @property
    def hostinfo_local(self):
        """Not available: always raises `error.AnonymousHost`."""
        raise error.AnonymousHost

    @property
    def uri_base(self):
        """The ``coap://`` URI of the peer, without any path."""
        return "coap://" + self.hostinfo

    @property
    def uri_base_local(self):
        """Not available: always raises `error.AnonymousHost`."""
        raise error.AnonymousHost

    @property
    def blockwise_key(self):
        """The host name."""
        return self._host
class MessageInterfaceSlipmux(interfaces.MessageInterface):
    """Message Interface for Slipmux.

    As serial ports can generally not be opened multiple times, this does not
    even try to manage running connections in the Endpoint objects, but spools
    endpoints until they break, at which point any attempt to send a message
    reconnects.

    This keeps any number of connections open with no attempts to limit them,
    as the number of serial connections a system has is generally way lower
    than any RAM limits.
    """

    def __init__(
        self, params: TransportParameters, ctx: interfaces.MessageManager, log, loop
    ):
        self.__ctx = ctx
        self.__log = log
        # Protocols of the connections opened so far, by remote address
        self.__pool: dict[SlipmuxAddress, SlipmuxProtocol] = {}
        self.__loop = loop
        self.__params = params
        # Listening UNIX sockets, by remote address; populated in
        # create_transport_endpoint
        self.__unixlisten_states: dict[SlipmuxAddress, _UnixListenState] = {}

    async def shutdown(self):
        """Closes all listening UNIX sockets and the transports of all pooled
        connections."""
        for state in self.__unixlisten_states.values():
            state.shutdown()
        for protocol in self.__pool.values():
            protocol.transport.close()

    @staticmethod
    def _find_device_file(devicename: str) -> str:
        """Picks the file name to open for a lower-case device name.

        Entries of ``/dev/`` are compared case-insensitively; if none matches
        (or there is no ``/dev/`` to look into), the name is returned as is."""
        try:
            candidates = os.listdir("/dev/")
        except FileNotFoundError:
            # Without a /dev/ directory the lookup must not fail, otherwise the
            # fallback below could never be reached on such systems.
            candidates = []
        for filename in candidates:
            if filename.lower() == devicename:
                return "/dev/" + filename
        # sensible fallback for Windows, I guess
        return devicename

    async def _get(self, remote: SlipmuxAddress) -> SlipmuxProtocol:
        """Returns the protocol serving `remote`, taking it from the pool if
        present and establishing the connection otherwise.

        Depending on the device's configuration, this connects to a UNIX
        socket, waits for a peer on a listening UNIX socket, or opens a serial
        device. Errors from establishing the connection are raised."""
        if remote in self.__pool:
            return self.__pool[remote]
        devicename = _hostname_to_devicename(remote._host)
        assert devicename is not None, "Checked at construction at recognition"

        # Resolved by the protocol created in protocol_factory once its
        # connection is made
        starting_future = self.__loop.create_future()
        weakself = weakref.ref(self)
        connlog = self.__log.getChild(f"device-{devicename}")

        def protocol_factory():
            return SlipmuxProtocol(starting_future, weakself, remote, connlog)

        assert self.__params.slipmux is not None
        devparams = self.__params.slipmux.devices.get(devicename, SlipmuxDevice())
        if devparams.unix_connect is not None:
            (_, protocol) = await asyncio.get_running_loop().create_unix_connection(
                protocol_factory,
                # Type is ignored because mypy seems not to know that event
                # loops *do* accept paths.
                devparams.unix_connect,  # type: ignore
            )
        elif devparams.unix_listen is not None:
            # We only reach this when a request is sent there. Then
            # async-blocking on sending that request makes some sense: UNIX
            # sockets are used mostly in testing, and both sides need to be
            # ready to start talking. Suspending execution at the ._get() (and
            # thus recognizing the remote) will create some unexpected delays
            # when a process sends a request and then expects waiting for a
            # response to take long (rather than the initial recognition), but
            # that is probably fine.
            protocol = await self.__unixlisten_states[remote].get_waiting()
            # This protocol was created by the listener and does not know
            # about starting_future, so awaiting that future here would never
            # finish: return right away.
            self.__pool[remote] = protocol
            return protocol
        else:
            full_devicename: Path | str | None = devparams.device
            if full_devicename is None:
                full_devicename = self._find_device_file(devicename)
            (_, protocol) = await serial_asyncio.create_serial_connection(
                self.__loop,
                protocol_factory,
                full_devicename,
                baudrate=115200,
            )

        self.__pool[remote] = protocol
        startup_error = await starting_future
        if startup_error is not None:
            raise startup_error
        return protocol

    def _get_immediately(self, remote: SlipmuxAddress) -> SlipmuxProtocol:
        """Gets the relevant protocol, expecting that it is present (eg. when
        sending, because the remote was just recognized, which would have
        started the connection)"""
        return self.__pool[remote]

    def send(self, message):
        """Sends `message` as a control frame over the pooled connection of
        its remote."""
        protocol = self._get_immediately(message.remote)
        # FIXME: Where do we best do this?
        if message.opt.uri_host == message.remote.hostinfo:
            message.opt.uri_host = None
        protocol.send_control(message.encode())

    async def recognize_remote(self, remote):
        """Returns True if `remote` is an address of this interface, after
        making sure its connection is established; False otherwise."""
        if isinstance(remote, SlipmuxAddress) and remote.interface == self:
            # See determine_remote: from the current API, this is the only
            # asynchronous point before a send.
            await self._get(remote)
            return True
        return False

    async def determine_remote(self, message):
        """Returns the address to use for `message` if its remote is a
        ``coap`` one with a ``{NAME}.dev.alt`` host name, after connecting to
        the device; None otherwise."""
        # FIXME: Should we allow ports?
        if (
            message.remote.scheme == "coap"
            and _hostname_to_devicename(message.remote.hostinfo) is not None
        ):
            address = SlipmuxAddress(message.remote.hostinfo, self)
            # We have to connect now and await connection -- not just because
            # this gives us errors reasonably fast, but also because `.send` in
            # the message interface is synchronous.
            await self._get(address)
            return address
        return None

    @classmethod
    async def create_transport_endpoint(
        cls,
        params: TransportParameters,
        ctx: interfaces.MessageManager,
        log,
        loop,
    ):
        """Creates the message interface.

        When `params` describe a server, every configured device is set up
        right away: UNIX sockets to listen on are created, all other devices
        are connected to."""
        slef = cls(params, ctx, log, loop)
        if params.is_server:
            assert params.slipmux is not None
            for key, value in params.slipmux.devices.items():
                remote = SlipmuxAddress(f"{key}.dev.alt", slef)
                if value.unix_listen:
                    # _get would async-block, we need to spawn an actual task
                    # that'll keep working with incoming peers
                    unixlisten = slef.__unixlisten_states[remote] = _UnixListenState(
                        slef,
                        slef.__log.getChild(f"serverdevice-{key}"),
                        loop,
                        remote,
                        value,
                    )
                    await unixlisten.start()
                else:
                    await slef._get(remote)
        return slef

    # Provided for _UnixListenState

    def unixlisten_available(self, remote, protocol):
        """Registers `protocol` as the connection to use for `remote`."""
        self.__pool[remote] = protocol

    # provided for SlipmuxProtocol

    def received(self, remote, data):
        """Decodes `data` received from `remote` and dispatches the message.

        Unparsable messages are logged and ignored; any exception raised while
        dispatching is logged and raised on."""
        try:
            try:
                message = Message.decode(data, remote)
            except error.UnparsableMessage:
                self.__log.warning("Ignoring unparsable message from %s", remote)
                return

            self.__ctx.dispatch_message(message)

        except BaseException as exc:
            # Catching here because util.asyncio.recvmsg inherits
            # _SelectorDatagramTransport's bad handling of callback errors;
            # this is the last time we have a log at hand.
            self.__log.error(
                "Exception raised through dispatch_message: %s", exc, exc_info=exc
            )
            raise

    def terminated(self, remote, exception):
        """Handles the end of the connection to `remote`: a UNIX listener for
        it is made ready for its next peer, and the error is dispatched."""
        # NOTE(review): the pool entry of `remote` is left in place here, so
        # later lookups still find the ended connection -- confirm whether
        # that is intended.
        if state := self.__unixlisten_states.get(remote):
            state.disconnected()
        self.__ctx.dispatch_error(exception, remote)
437class SlipmuxProtocol(asyncio.Protocol):
438 def __init__(self, starting_future, weakinstance, remote_handle, log):
439 self.__starting = starting_future
440 self.__instance = weakinstance
441 self.__remote_handle = remote_handle
442 self.__control_frame: Optional[list] = None
443 self.__escape = False
444 self.__log = log
446 def send_control(self, data: bytes):
447 data = bytes((HEADER_CONTROL,)) + data
448 data += _checksum(data)
449 data = data.replace(BYTE_ESC, BYTES_ESC_ESC_ESC).replace(
450 BYTE_END, BYTES_ESC_ESC_END
451 )
452 # seems to be general practice to send leading ENDs generously
453 data = BYTE_END + data + BYTE_END
454 self.transport.write(data)
456 def connection_made(self, transport):
457 self.transport = transport
458 self.__starting.set_result(None)
460 def _end(self, is_regular: bool):
461 """Restores the state after an END has been received.
463 On a regular END condition, this checks the FCS and emits the frame, if
464 any, and then resets; otherwise, it only resets."""
465 if is_regular and isinstance(self.__control_frame, list):
466 if len(self.__control_frame) < 2:
467 self.__log.warn("Control frame too short")
468 else:
469 message = bytes(self.__control_frame[:-2])
470 fcs = bytes(self.__control_frame[-2:])
471 if fcs != _checksum(bytes((HEADER_CONTROL,)) + message):
472 self.__log.warn("FCS mismatch")
473 else:
474 instance = self.__instance()
475 if instance is None:
476 self.__log.warn(
477 "Discarding incoming message: Instance shut down"
478 )
479 else:
480 instance.received(self.__remote_handle, message)
481 self.__escape = False
482 self.__control_frame = None
484 def data_received(self, data):
485 for byte in data:
486 if self.__escape:
487 match byte:
488 case slipmux.ESC_END:
489 byte = END
490 case slipmux.ESC_ESC:
491 byte = ESC
492 case slipmux.END:
493 self.__log.info("Frame has been aborted.")
494 self._end(False)
495 continue
496 case _:
497 self._end(False)
498 self.__log.warning(
499 "Framing error: Non-Escape value after Escape"
500 )
501 continue
502 self.__escape = False
503 else:
504 if byte == ESC:
505 self.__escape = True
506 continue
507 if byte == END:
508 self._end(True)
509 continue
511 if self.__control_frame is None:
512 if byte == HEADER_CONTROL:
513 self.__control_frame = []
514 else:
515 self.__control_frame = False
516 continue
518 if isinstance(self.__control_frame, list):
519 self.__control_frame.append(byte)
521 def connection_lost(self, exc):
522 if not self.__starting.done():
523 self.set_result(exc)
525 instance = self.__instance()
526 if instance is None:
527 self.__log.warn(
528 "Discarding disconnect error (%r from %r): Instance shut down",
529 exc,
530 self.__remote_handler,
531 )
532 else:
533 instance.terminated(self.__remote_handle, exc)
535 # We do not implement pause_writing / resume_writing: the message interface
536 # has no backpressure, so we'll just fill up the buffer, but given that
537 # CoAP's flow control applies anyway (and it is way more conservative than
538 # any actual UART's baud rate), there is little risk of excessive buffer
539 # build-up.
class _UnixListenState:
    """Wrapper around a UNIX listening server. Serves one connection at a time."""

    def __init__(self, message_interface, log, loop, remote, transport_params):
        """Prepares the state; the socket is only created in `start()`.

        The message interface is only referenced weakly. `transport_params`
        provides the file name of the socket in its `unix_listen` attribute,
        which must not be None."""
        self.__message_interface = weakref.ref(message_interface)
        self.__log = log
        self.__loop = loop
        self.__remote = remote

        self.__unix_socket_filename = transport_params.unix_listen
        assert self.__unix_socket_filename is not None, (
            "_UnixListenState created on non-unix-listen transport config"
        )

        # Resolves to the protocol of the current connection once a peer has
        # connected; replaced by a fresh future when that connection ends.
        self.__current_connection = self.__loop.create_future()

    async def start(self):
        """Runs asynchronous initialization steps (creating the UNIX socket).

        Run this exactly once per instance, after init."""

        self.__server = await self.__loop.create_unix_server(
            self.protocol_factory,
            self.__unix_socket_filename,
        )

    def disconnected(self):
        """Makes the listener ready to accept its next peer."""
        self.__current_connection = self.__loop.create_future()

    def shutdown(self):
        """Closes the listening server created in `start()`."""
        self.__server.close()

    @property
    def message_interface(self) -> MessageInterfaceSlipmux:
        """The message interface this listener belongs to.

        Raises `asyncio.CancelledError` if it has been garbage collected."""
        mi = self.__message_interface()
        if mi is None:
            self.__log.warning("Message interface vanished without a shutdown")
            raise asyncio.CancelledError
        return mi

    def protocol_factory(self):
        """Creates the protocol for an incoming connection.

        The first peer gets a SlipmuxProtocol that is announced to the message
        interface; while that connection is ongoing, further peers are sent a
        refusal and are disconnected."""
        if self.__current_connection.done():

            # Deriving from asyncio.Protocol ensures that all callbacks a
            # transport may call are present.
            class ShutDownImmediately(asyncio.Protocol):
                def connection_made(self, transport):
                    transport.write(
                        b"\xc0\nRefusing connection: a slipmux session is currently ongoing.\n\xc0"
                    )
                    transport.close()

                def connection_lost(self, exc):
                    pass

            return ShutDownImmediately()
        else:
            # We're not using it: As a listening UNIX socket we can trust that
            # when the protocol factory is called, a connection_made is run
            # immediately.
            starting_future = self.__loop.create_future()
            protocol = SlipmuxProtocol(
                starting_future, self.__message_interface, self.__remote, self.__log
            )
            self.__current_connection.set_result(protocol)
            self.message_interface.unixlisten_available(self.__remote, protocol)
            return protocol

    async def get_waiting(self) -> SlipmuxProtocol:
        """Returns the connected protocol, or waits until one is connected"""
        return await self.__current_connection