Coverage for aiocoap / transports / ws.py: 84%
214 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-29 12:32 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-29 12:32 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5"""
6This module implements a TokenInterface for `CoAP over WebSockets`_.
8.. _`CoAP over WebSockets`: https://tools.ietf.org/html/rfc8323#section-4
10As with CoAP-over-TCP, while the transport distinguishes a connection initiator
11("WebSocket (and TCP) client") and a receiver ("WebSocket (and TCP) server"),
12both sides can take both roles in CoAP (ie. as a CoAP server and a CoAP
13client). As the WebSocket client can not possibly be connected to (even by the
14same server -- once the connection is closed, it's gone and even a new one
15likely has a different port), aiocoap does not allow expressing their addresses
16in URIs (given they wouldn't serve their purpose as URLs and don't provide any
17stability either). Requests to a CoAP-over-WS client can be made by assigning
18the remote to an outgoing request.
20Port choice
21-----------
23Unlike the other transports, CoAP-over-WS is specified with a privileged port
24(port 80) as the default port. This is impractical for aiocoap servers for two
25reasons:
27 * Unless explicitly configured, aiocoap is typically run as an unprivileged
28 user (and has no provisions in place to receive a socket by other means
29 than opening it).
31 * Where a CoAP-over-WS proxy is run, there is often a "proper" website
32 running on the same port on a full HTTP server. That server is usually
33 capable of forwarding requests, whereas the ``websockets`` module used by
34 aiocoap is in no position to either serve websites or to proxy to an
35 underlying server.
37The recommended setup is therefore to run a full web server at port 80, and
38configure it to proxy incoming requests for WebSockets at `/.well-known/coap`
39to aiocoap's server, which defaults to binding to port 8683.
41The port choice of outgoing connections, or the interpretation of the
42protocol's default port (ie. the port implied by ``coap+ws://hostname/``) is of
43course unaffected by this.
45.. warning::
47 Due to a shortcoming of aiocoap's way of specifying ports to bind
48 to, if a port is explicitly stated to bind to, CoAP-over-WS will bind to that
49 port plus 3000 (resulting in the abovementioned 8683 for 5683). If TLS server
50 keys are given, the TLS server is launched on the next port after the HTTP
51 server (typically 8684).
53Using on pyodide_
54-----------------
56When used on pyodide_,
57instead of using the ``websockets`` module,
58a simplified client-only back-end is used,
59which utilizes the browser's WebSocket API.
61.. _pyodide: https://pyodide.org/
62"""
64from __future__ import annotations
66from typing import Dict, List
67from collections import namedtuple
68import asyncio
69import functools
70import http
71import weakref
73from aiocoap import Message, interfaces, ABORT, util, error
74from aiocoap.transports import rfc8323common
75from ..credentials import CredentialsMap
76from .. import defaults
77from ..message import Direction
79if not defaults.is_pyodide:
80 import websockets.asyncio.connection
81 import websockets.asyncio.server
82else:
83 import aiocoap.util.pyodide_websockets as websockets # type: ignore[no-redef]
def _decode_message(data: bytes) -> Message:
    """Parse the content of one binary WebSocket frame into a message.

    The frame is read as: one byte giving the token length, one byte giving
    the code, the token itself, and then everything else, which is handed to
    the message's option decoder; whatever that decoder returns becomes the
    payload.

    :param data: Raw content of a received binary frame.
    :returns: The decoded message, with its direction set to incoming.
    :raises error.UnparsableMessage: If the frame is shorter than the fixed
        two-byte header, if the announced token length exceeds 8, or if the
        frame ends before the announced token does.
    """
    codeoffset = 1
    tokenoffset = 2

    # The data comes straight from the peer. A frame too short to index into
    # must surface as a parsing error (which the receive loop answers with an
    # Abort message) rather than as an IndexError.
    if len(data) < tokenoffset:
        raise error.UnparsableMessage("Message too short")

    tkl = data[0]
    if tkl > 8:
        raise error.UnparsableMessage("Overly long token")
    # Slicing would silently produce a shortened token here, so check
    # explicitly that the announced token is fully present.
    if len(data) < tokenoffset + tkl:
        raise error.UnparsableMessage("Message too short for its token length")
    code = data[codeoffset]
    token = data[tokenoffset : tokenoffset + tkl]

    msg = Message(code=code, _token=token)

    msg.payload = msg.opt.decode(data[tokenoffset + tkl :])
    msg.direction = Direction.INCOMING

    return msg
def _serialize(msg: Message) -> bytes:
    """Encode a message into the content of one binary WebSocket frame.

    The output consists of a byte with the token length, a byte with the
    code, the token and the encoded options. If the message has a non-empty
    payload, a ``0xff`` marker byte and the payload are appended.

    :param msg: The message to encode.
    :returns: The encoded frame content.
    :raises ValueError: If the token is longer than 8 bytes.
    """
    token_length = len(msg.token)
    if token_length > 8:
        raise ValueError("Overly long token")

    header = bytes((token_length, msg.code))
    pieces = (header, msg.token, msg.opt.encode())
    if msg.payload:
        # The marker is only emitted when there is a payload to separate
        # from the options.
        pieces += (b"\xff", msg.payload)

    return b"".join(pieces)
# Key under which connections are filed in the pool: the CoAP URI scheme
# ("coap+ws" or "coaps+ws") together with the host information string.
PoolKey = namedtuple("PoolKey", ["scheme", "hostinfo"])
class InvalidStatus(error.NetworkError):
    """Network error for a WebSocket connection attempt that was answered
    with an unsuitable HTTP status.

    It is raised from the ``websockets`` library's own ``InvalidStatus``
    exception, which it expects to find as its ``__cause__``.
    """

    def __str__(self):
        # There is no message of our own; the text is that of the exception
        # this one was raised from.
        cause = self.__cause__
        return str(cause)

    def extra_help(self, hints={}):
        """Return a fixed explanatory text; ``hints`` is not used (and in
        particular never modified)."""
        explanation = (
            "This indicates that while an HTTP server was reached, "
            "it is not offering CoAP-over-WebSockets at the standard-defined location "
            "/.well-known/coap."
        )
        return explanation
class WSRemote(rfc8323common.RFC8323Remote, interfaces.EndpointAddress):
    """Endpoint address backed by one established WebSocket connection.

    A remote holds the connection object, a weak reference to the pool it
    belongs to, and the key under which the pool files it. It provides the
    sending primitives (``_send_message``, ``_abort_with``) that
    ``RFC8323Remote`` builds on.
    """

    _connection: websockets.asyncio.connection.Connection
    # Only used to ensure that remotes are associated to the right pool -- not
    # that there'd be any good reason to have multiple of those.
    _pool: weakref.ReferenceType[WSPool]
    _poolkey: PoolKey

    scheme = None  # Override property -- it's per instance here

    def __init__(
        self,
        pool,
        connection,
        loop,
        log,
        *,
        scheme,
        local_hostinfo=None,
        remote_hostinfo=None,
    ):
        """Set up a remote around an already established connection.

        :param pool: The pool this remote belongs to; only a weak reference
            to it is kept.
        :param connection: The WebSocket connection to send and receive on.
        :param loop: Event loop on which sending tasks are created.
        :param log: Logger used for debug and error output.
        :param scheme: URI scheme of this particular remote.
        :param local_hostinfo: Host information of the local side; when not
            given, the first two items of the connection's local address are
            used.
        :param remote_hostinfo: Host information of the remote side; when
            not given, the first two items of the connection's remote
            address are used.
        """
        super().__init__()
        self._pool = weakref.ref(pool)
        self._connection = connection
        self.loop = loop
        self.log = log

        self._local_is_server = isinstance(
            connection, websockets.asyncio.server.ServerConnection
        )

        if local_hostinfo is None:
            self._local_hostinfo = self._connection.local_address[:2]
        else:
            self._local_hostinfo = local_hostinfo
        if remote_hostinfo is None:
            self._remote_hostinfo = self._connection.remote_address[:2]
        else:
            self._remote_hostinfo = remote_hostinfo

        self.scheme = scheme

        # Goes both for client and for server ends; on the server end, it
        # ensures that role reversal URIs can be used even when passed as URIs
        # and not as remotes (although that's of course only possible locally).
        #
        # NOTE(review): self.hostinfo is not defined in this class; it is
        # presumably provided by a base class from the values set above.
        self._poolkey = PoolKey(self.scheme, self.hostinfo)

    # Necessary for RFC8323Remote

    def _abort_with(self, msg, *, close_code=1002):
        """Schedule sending ``msg`` followed by closing the connection with
        ``close_code``; returns without waiting for either."""
        # Like _send_message, this may take actual time -- but unlike there,
        # there's no need to regulate back-pressure
        self.loop.create_task(
            self._abort_with_waiting(msg, close_code=close_code),
            name="Aborting WebSocket connection with %r" % msg,
        )

    # Unlike _send_message, this is pulled out of the _abort_with function
    # as it's also used in _run_recv_loop
    async def _abort_with_waiting(self, msg, *, close_code):
        """Send ``msg``, then close the connection with ``close_code``.

        A failure to send is logged and does not prevent the close.
        """
        self.log.debug("Aborting with message: %r", msg)
        try:
            await self._connection.send(_serialize(msg))
        except Exception as e:
            self.log.error("Sending to a WebSocket should not raise errors", exc_info=e)
        await self._connection.close(code=close_code)

    def _send_message(self, msg):
        """Schedule sending ``msg`` in a task of its own.

        Errors raised while sending are logged inside that task and not
        reported to the caller.
        """

        # FIXME overhaul back-pressure model
        async def send():
            self.log.debug("Sending message: %r", msg)
            try:
                await self._connection.send(_serialize(msg))
            except Exception as e:
                self.log.error(
                    "Sending to a WebSocket should not raise errors", exc_info=e
                )

        self.loop.create_task(
            send(),
            name="WebSocket sending of %r" % msg,
        )

    async def release(self):
        """Run the base class's release, then wait for the connection to be
        closed.

        If the wait is cancelled, a warning is logged and the cancellation is
        not propagated.
        """
        await super().release()
        try:
            await self._connection.wait_closed()
        except asyncio.CancelledError:
            self.log.warning(
                "Connection %s was not closed by peer in time after release", self
            )
class WSPool(interfaces.TokenInterface):
    """Token interface that exchanges CoAP messages over WebSockets.

    The pool keeps track of the established connections (as
    :class:`WSRemote` instances, by :class:`PoolKey`), of outgoing
    connections that are still being opened, and of the WebSocket servers it
    has started.
    """

    # Tasks of outgoing connections that are currently being established
    _outgoing_starting: Dict[PoolKey, asyncio.Task]
    # Remotes of established connections, both outgoing and accepted ones
    _pool: Dict[PoolKey, WSRemote]

    # Servers started in create_transport
    _servers: List[websockets.asyncio.server.Server]

    def __init__(self, tman, log, loop) -> None:
        """Set up an empty pool; use :meth:`create_transport` to obtain a
        pool that has its credentials set and its servers started.

        :param tman: Token manager that received messages and errors are
            passed to.
        :param log: Logger used for debug output.
        :param loop: Event loop on which tasks are created.
        """
        self.loop = loop

        self._pool = {}
        self._outgoing_starting = {}

        self._servers = []

        # See where it is used for documentation, remove when not needed any more
        self._in_shutdown = False

        self._tokenmanager = tman
        self.log = log

        self._client_credentials: CredentialsMap

    @classmethod
    async def create_transport(
        cls,
        tman: interfaces.TokenManager,
        log,
        loop,
        *,
        client_credentials,
        server_bind=None,
        server_context=None,
    ):
        """Create a pool and, if requested, start its server(s).

        :param tman: Token manager to pass received messages to.
        :param log: Logger to use.
        :param loop: Event loop to use.
        :param client_credentials: Credentials that are queried for an SSL
            context when opening outgoing connections.
        :param server_bind: If truthy, a ``(host, port)`` pair to start a
            plain WebSocket server on. A port of None selects 8683, a port
            of 0 is passed on as is, and any other port is increased by 3000.
        :param server_context: If not None (and ``server_bind`` is given),
            an additional server using this SSL context is started on the
            port following the plain server's; if that port is 0, 0 is used
            for the additional server as well.
        :returns: The new pool.
        """
        self = cls(tman, log, loop)

        self._client_credentials = client_credentials

        if server_bind:
            host, port = server_bind
            if port is None:
                port = 8683
            elif port != 0:
                # FIXME see module documentation
                port = port + 3000

            server = await websockets.asyncio.server.serve(
                functools.partial(self._new_connection, scheme="coap+ws"),
                host,
                port,
                # Ignoring type: Documentation says strings are OK
                subprotocols=["coap"],  # type: ignore[list-item]
                process_request=self._process_request,
                ping_interval=None,  # "SHOULD NOT be used"
                reuse_port=defaults.has_reuse_port(),
            )
            self._servers.append(server)

            if server_context is not None:
                # Port 0 requests "any free port"; adding 1 to it would turn
                # that into a request for the fixed port 1.
                tls_port = port + 1 if port != 0 else 0
                server = await websockets.asyncio.server.serve(
                    functools.partial(self._new_connection, scheme="coaps+ws"),
                    host,
                    tls_port,
                    # Ignoring type: Documentation says strings are OK
                    subprotocols=["coap"],  # type: ignore[list-item]
                    process_request=self._process_request,
                    ping_interval=None,  # "SHOULD NOT be used"
                    ssl=server_context,
                )
                self._servers.append(server)

        return self

    # Helpers for WebSocket server

    async def _new_connection(self, websocket, path=None, *, scheme):
        """Connection handler of the servers: wrap the accepted connection
        in a remote, file it in the pool, and run its receive loop.

        The local host information is taken from the request's Host header.
        """
        # ignoring path: Already checked in _process_request
        #
        # (path is present up to 10.0 and absent in 10.1; keeping it around to
        # stay compatible with different versions).

        hostheader = websocket.request.headers["Host"]
        local_hostinfo = util.hostportsplit(hostheader)

        remote = WSRemote(
            self,
            websocket,
            self.loop,
            self.log,
            scheme=scheme,
            local_hostinfo=local_hostinfo,
        )
        self._pool[remote._poolkey] = remote

        await self._run_recv_loop(remote)

    @staticmethod
    async def _process_request(connection, request):
        """Reject requests for any path other than ``/.well-known/coap``.

        :returns: None to continue with the WebSocket handshake, or an HTTP
            404 response that is sent instead of it.
        """
        if request.path != "/.well-known/coap":
            # With the (connection, request) signature, the server expects a
            # response object as built by connection.respond(), not a
            # (status, headers, body) tuple.
            return connection.respond(http.HTTPStatus.NOT_FOUND, "")
        # Continue with WebSockets
        return None

    # Helpers for WebSocket client

    def _connect(self, key: PoolKey):
        """Start opening a connection for ``key`` and file the task doing so
        in ``_outgoing_starting``."""
        self._outgoing_starting[key] = self.loop.create_task(
            self._connect_task(key),
            name="WebSocket connection opening to %r" % (key,),
        )

    async def _connect_task(self, key: PoolKey):
        """Open the connection for ``key``, file the resulting remote in the
        pool and start its receive loop.

        The entry in ``_outgoing_starting`` is removed when this finishes,
        whether it succeeded or not.

        :returns: The new remote.
        :raises ValueError: If an SSL context is configured for a ``coap+ws``
            remote, or for a ``coaps+ws`` remote when running on pyodide.
        :raises InvalidStatus: If the websockets library reports an invalid
            status for the connection attempt.
        """
        try:
            ssl_context = self._client_credentials.ssl_client_context(
                key.scheme, key.hostinfo
            )

            hostinfo_split = util.hostportsplit(key.hostinfo)

            ws_scheme = {"coap+ws": "ws", "coaps+ws": "wss"}[key.scheme]

            if ws_scheme == "ws" and ssl_context is not None:
                raise ValueError(
                    "An SSL context was provided for a remote accessed via a plaintext websockets."
                )
            if ws_scheme == "wss":
                if defaults.is_pyodide:
                    if ssl_context is not None:
                        raise ValueError(
                            "The pyodide websocket implementation can't be configured with a non-default context -- connections created in a browser will always use the browser's CA set."
                        )
                else:
                    if ssl_context is None:
                        # Like with TLSClient, we need to pass in a default
                        # context and can't rely on the websocket library to
                        # use a default one when wss is requested (it doesn't)
                        import ssl

                        ssl_context = ssl.create_default_context()

            try:
                websocket = await websockets.connect(
                    "%s://%s/.well-known/coap" % (ws_scheme, key.hostinfo),
                    # Ignoring type: Documentation says strings are OK
                    subprotocols=["coap"],  # type: ignore[list-item]
                    ping_interval=None,
                    ssl=ssl_context,
                )
            except websockets.exceptions.InvalidStatus as e:
                raise InvalidStatus from e

            remote = WSRemote(
                self,
                websocket,
                self.loop,
                self.log,
                scheme=key.scheme,
                remote_hostinfo=hostinfo_split,
            )
            assert remote._poolkey == key, "Pool key construction is inconsistent"
            self._pool[key] = remote

            self.loop.create_task(
                self._run_recv_loop(remote),
                name="WebSocket receive loop for %r" % (key,),
            )

            return remote
        finally:
            del self._outgoing_starting[key]

    def _forget_remote(self, remote):
        """Remove ``remote`` from the pool, unless its key has meanwhile been
        taken by a different remote (or is not present at all)."""
        if self._pool.get(remote._poolkey) is remote:
            del self._pool[remote._poolkey]

    # Implementation of TokenInterface

    async def recognize_remote(self, message):
        """Tell whether the message's remote is a WSRemote of this pool."""
        return isinstance(message.remote, WSRemote) and message.remote._pool() is self

    async def determine_remote(self, message):
        """Find or create the remote for a message requested over
        ``coap+ws`` or ``coaps+ws``.

        An established connection from the pool is used if there is one;
        otherwise this waits for a connection to be opened (starting that
        unless it is already under way).

        :returns: The remote, or None if the requested scheme is neither of
            the two.
        """
        if message.requested_scheme in ("coap+ws", "coaps+ws"):
            key = PoolKey(message.requested_scheme, message.remote.hostinfo)

            if key in self._pool:
                # Remotes are removed from the pool when their receive loop
                # ends, so anything found here is still being received on.
                return self._pool[key]

            if key not in self._outgoing_starting:
                self._connect(key)
            # It's a bit unorthodox to wait for an (at least partially)
            # established connection in determine_remote, but it's
            # not completely off either, and it makes it way easier to
            # not have partially initialized remotes around
            return await self._outgoing_starting[key]

    def send_message(self, message, messageerror_monitor):
        """Send ``message`` through its remote.

        A response is not sent at all if the bit for its code class is set
        in its No-Response option value; the option is cleared on everything
        that is sent.
        """
        # Ignoring messageerror_monitor: CoAP over reliable transports has no
        # way of indicating that a particular message was bad, it always shuts
        # down the complete connection

        if message.code.is_response():
            no_response = (message.opt.no_response or 0) & (
                1 << (message.code.class_ - 1)
            ) != 0
            if no_response:
                return

        message.opt.no_response = None

        message.remote._send_message(message)

    async def shutdown(self):
        """Release all pooled connections, close all servers, and wait for
        both to complete.

        From here on, the token manager is no longer available to the
        receive loops.
        """
        self._in_shutdown = True
        self.log.debug("Shutting down any connections on %r", self)
        self._tokenmanager = None

        client_shutdowns = [
            asyncio.create_task(
                c.release(),
                name="Close connection %s" % c,
            )
            for c in self._pool.values()
        ]

        server_shutdowns = []
        while self._servers:
            s = self._servers.pop()
            # We could do something like
            # >>> for websocket in s.websockets:
            # >>>     del websocket.logger.extra['websocket']
            # to reduce the reference loops
            # (websocket.logger.extra['websocket'] == websocket), but as the
            # tests actually do run a GC collection once and that gets broken
            # up, it's not worth adding fragility here
            s.close()
            server_shutdowns.append(
                asyncio.create_task(s.wait_closed(), name="Close server %s" % s),
            )

        # Placing client shutdowns before server shutdowns to give them a
        # chance to send out Abort messages; the .close() method could be more
        # helpful here by stopping new connections but letting us finish off
        # the old ones
        shutdowns = client_shutdowns + server_shutdowns
        if shutdowns:
            # wait is documented to require a non-empty set
            await asyncio.wait(shutdowns)

    # Incoming message processing

    async def _run_recv_loop(self, remote):
        """Receive and process messages from ``remote`` until its connection
        is closed or aborted, then remove it from the pool."""
        try:
            await self._process_incoming(remote)
        finally:
            # Whatever ended the loop, nothing is received on this remote
            # any more, so it must not be handed out for new requests.
            self._forget_remote(remote)

    async def _process_incoming(self, remote):
        """Send the initial CSM, then pass every received message on: to the
        remote if it is a signalling message, to the token manager otherwise.

        Returns when the connection was closed, or after aborting it because
        of a text frame, an unparsable message, or a non-signalling message
        arriving before any CSM.
        """
        remote._send_initial_csm()

        while True:
            try:
                received = await remote._connection.recv()
            except websockets.exceptions.ConnectionClosed:
                # This check is purely needed to silence the warning printed
                # from tokenmanager, "Internal shutdown sequence msismatch:
                # error dispatched through tokenmanager after shutdown" -- and
                # is a symptom of https://github.com/chrysn/aiocoap/issues/284
                # and of the odd circumstance that we can't easily cancel the
                # _run_recv_loop tasks (as we should while that issue is
                # unresolved) in the shutdown handler.
                if not self._in_shutdown:
                    self._tokenmanager.dispatch_error(
                        error.RemoteServerShutdown("Peer closed connection"), remote
                    )
                return

            if not isinstance(received, bytes):
                await remote._abort_with_waiting(
                    Message(code=ABORT, payload=b"Text frame received"), close_code=1003
                )
                return

            try:
                msg = _decode_message(received)
            except error.UnparsableMessage:
                await remote._abort_with_waiting(
                    Message(code=ABORT, payload=b"Message parsing error"),
                    close_code=1007,
                )
                return

            msg.remote = remote

            if msg.code.is_signalling():
                try:
                    remote._process_signaling(msg)
                except rfc8323common.CloseConnection as e:
                    # shutdown() sets the token manager to None, so it can
                    # only be told about the error before that.
                    if not self._in_shutdown:
                        self._tokenmanager.dispatch_error(e.args[0], msg.remote)
                    self._forget_remote(remote)
                    await remote._connection.close()
                continue

            if remote._remote_settings is None:
                remote.abort("No CSM received")
                return

            if self._in_shutdown:
                # shutdown() has set the token manager to None; there is
                # nothing left that could take the message.
                self.log.debug("Discarding message received during shutdown: %r", msg)
                continue

            if msg.code.is_response():
                self._tokenmanager.process_response(msg)
                # ignoring the return value; unexpected responses can be the
                # asynchronous result of cancelled observations
            else:
                self._tokenmanager.process_request(msg)