Coverage for src/aiocoap/transports/ws.py: 0%
206 statements
« prev ^ index » next coverage.py v7.7.0, created at 2025-03-20 17:26 +0000
« prev ^ index » next coverage.py v7.7.0, created at 2025-03-20 17:26 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5"""
6This module implements a TokenInterface for `CoAP over WebSockets`_.
8.. _`CoAP over WebSockets`: https://tools.ietf.org/html/rfc8323#section-4
10As with CoAP-over-TCP, while the transport distinguishes a connection initiator
11("WebSocket (and TCP) client") and a receiver ("WebSocket (and TCP) server"),
12both sides can take both roles in CoAP (ie. as a CoAP server and a CoAP
13client). As the WebSocket client can not possibly be connected to (even by the
14same server -- once the connection is closed, it's gone and even a new one
15likely has a different port), aiocoap does not allow expressing their addresses
16in URIs (given they wouldn't serve their purpose as URLs and don't provide any
17stability either). Requests to a CoAP-over-WS client can be made by assigning
18the remote to an outgoing request.
20Port choice
21-----------
23Unlike the other transports, CoAP-over-WS is specified with a privileged port
24(port 80) as the default port. This is impractical for aiocoap servers for two
25reasons:
27 * Unless explicitly configured, aiocoap is typically run as an unprivileged
28 user (and has no provisions in place to receive a socket by other means
29 than opening it).
31 * Where a CoAP-over-WS proxy is run, there is often a "proper" website
32 running on the same port on a full HTTP server. That server is usually
33 capable of forwarding requests, whereas the ``websockets`` module used by
34 aiocoap is in no position to either serve websites nor to proxy to an
35 underlying server.
37The recommended setup is therefore to run a full web server at port 80, and
38configure it to proxy incoming requests for WebSockets at `/.well-known/coap`
39to aiocoap's server, which defaults to binding to port 8683.
41The port choice of outgoing connections, or the interpretation of the
42protocol's default port (ie. the port implied by ``coap+ws://hostname/``) is of
43course unaffected by this.
45.. warning::
47 Due to a shortcoming of aiocoap's way of specifying ports to bind
48 to, if a port is explicitly stated to bind to, CoAP-over-WS will bind to that
49 port plus 3000 (resulting in the abovementioned 8683 for 5683). If TLS server
50 keys are given, the TLS server is launched on the next port after the HTTP
51 server (typically 8684).
53Using on pyodide_
54-----------------
56When used on pyodide_,
57instead of using the ``websockets`` module,
58a simplified client-only back-end is used,
59which utilizes the browser's WebSocket API.
61.. _pyodide: https://pyodide.org/
62"""
64from __future__ import annotations
66from typing import Dict, List
67from collections import namedtuple
68import asyncio
69import functools
70import http
71import weakref
73from aiocoap import Message, interfaces, ABORT, util, error
74from aiocoap.transports import rfc8323common
75from ..credentials import CredentialsMap
76from ..defaults import is_pyodide
78if not is_pyodide:
79 import websockets.asyncio.connection
80 import websockets.asyncio.server
81else:
82 import aiocoap.util.pyodide_websockets as websockets # type: ignore[no-redef]
def _decode_message(data: bytes) -> Message:
    """Parse the content of one binary WebSocket frame into a Message.

    The first byte is taken as the token length, the second as the code; the
    token follows, and everything after it is handed to the message's option
    decoder, whose return value becomes the payload.

    :param data: The raw bytes received in a single WebSocket frame.
    :returns: A Message with code, token, options and payload set.
    :raises error.UnparsableMessage: If the frame is too short to contain the
        two header bytes or the announced token, or if the announced token
        length exceeds 8 bytes.
    """
    codeoffset = 1
    tokenoffset = 2

    # Frames come from the peer and can be arbitrarily short (even empty).
    # Report that as a parsing error rather than letting an IndexError escape,
    # as the receive loop only handles UnparsableMessage.
    if len(data) < tokenoffset:
        raise error.UnparsableMessage("Message too short for header")

    tkl = data[0]
    if tkl > 8:
        raise error.UnparsableMessage("Overly long token")
    if len(data) < tokenoffset + tkl:
        # Slicing would silently produce a token shorter than announced
        raise error.UnparsableMessage("Message too short for token")
    code = data[codeoffset]
    token = data[tokenoffset : tokenoffset + tkl]

    msg = Message(code=code, token=token)

    msg.payload = msg.opt.decode(data[tokenoffset + tkl :])

    return msg
def _serialize(msg: Message) -> bytes:
    """Build the bytes of the WebSocket frame that carries ``msg``.

    The frame consists of one byte of token length, one byte of code, the
    token and the encoded options; if the message has a payload, a 0xff
    marker and the payload are appended.

    :raises ValueError: If the message's token is longer than 8 bytes.
    """
    token_length = len(msg.token)
    if token_length > 8:
        raise ValueError("Overly long token")

    header = bytes((token_length, msg.code))
    parts = [header, msg.token, msg.opt.encode()]

    if msg.payload:
        # The payload marker is only present when there is a payload
        parts.extend((b"\xff", msg.payload))

    return b"".join(parts)
# Key under which connections are filed in the pool: the CoAP URI scheme
# together with the host information of the remote.
PoolKey = namedtuple("PoolKey", ["scheme", "hostinfo"])
class WSRemote(rfc8323common.RFC8323Remote, interfaces.EndpointAddress):
    """Remote that wraps a single WebSocket connection.

    Instances are created by the pool both for connections it accepted and
    for connections it opened. Messages are serialized and sent through the
    wrapped connection in background tasks.
    """

    _connection: websockets.asyncio.connection.Connection
    # Only used to ensure that remotes are associated to the right pool -- not
    # that there'd be any good reason to have multiple of those.
    _pool: weakref.ReferenceType[WSPool]
    _poolkey: PoolKey

    scheme = None  # Override property -- it's per instance here

    def __init__(
        self,
        pool,
        connection,
        loop,
        log,
        *,
        scheme,
        local_hostinfo=None,
        remote_hostinfo=None,
    ):
        """Wrap ``connection`` as a remote belonging to ``pool``.

        :param pool: The pool this remote belongs to; only a weak reference
            is kept.
        :param connection: The established WebSocket connection.
        :param loop: Event loop on which sending tasks are created.
        :param log: Logger used for debug and error output.
        :param scheme: URI scheme stored on the instance and used in its
            pool key.
        :param local_hostinfo: Local host information; when None, the first
            two items of the connection's local address are used.
        :param remote_hostinfo: Remote host information; when None, the first
            two items of the connection's remote address are used.
        """
        super().__init__()
        self._pool = weakref.ref(pool)
        self._connection = connection
        self.loop = loop
        self.log = log

        # The event loop only keeps weak references to tasks, so the
        # fire-and-forget tasks created for sending are referenced from here
        # until they are done.
        self._background_tasks = set()

        self._local_is_server = isinstance(
            connection, websockets.asyncio.server.ServerConnection
        )

        if local_hostinfo is None:
            self._local_hostinfo = self._connection.local_address[:2]
        else:
            self._local_hostinfo = local_hostinfo
        if remote_hostinfo is None:
            self._remote_hostinfo = self._connection.remote_address[:2]
        else:
            self._remote_hostinfo = remote_hostinfo

        self.scheme = scheme

        # Goes both for client and for server ends; on the server end, it
        # ensures that role reversal URIs can be used even when passed as URIs
        # and not as remotes (although that's of course only possible locally).
        self._poolkey = PoolKey(self.scheme, self.hostinfo)

    def _spawn_background(self, coro, *, name):
        """Run ``coro`` as a named task that is kept alive until it is done."""
        task = self.loop.create_task(coro, name=name)
        self._background_tasks.add(task)
        # Drop the reference as soon as the task has finished
        task.add_done_callback(self._background_tasks.discard)
        return task

    # Necessary for RFC8323Remote

    def _abort_with(self, msg, *, close_code=1002):
        """Send ``msg`` and close the connection with ``close_code``, in the
        background."""
        # Like _send_message, this may take actual time -- but unlike there,
        # there's no need to regulate back-pressure
        self._spawn_background(
            self._abort_with_waiting(msg, close_code=close_code),
            name="Abortion WebSocket sonnection with %r" % msg,
        )

    # Unlike _send_message, this is pulled out of the _abort_with function
    # as it's also used in _run_recv_loop
    async def _abort_with_waiting(self, msg, *, close_code):
        """Send ``msg``, then close the connection with ``close_code``.

        Errors raised while sending are logged; the connection is closed
        either way.
        """
        self.log.debug("Aborting with message: %r", msg)
        try:
            await self._connection.send(_serialize(msg))
        except Exception as e:
            self.log.error("Sending to a WebSocket should not raise errors", exc_info=e)
        await self._connection.close(code=close_code)

    def _send_message(self, msg):
        """Serialize ``msg`` and send it over the connection in the
        background; errors raised while sending are logged."""

        # FIXME overhaul back-pressure model
        async def send():
            self.log.debug("Sending message: %r", msg)
            try:
                await self._connection.send(_serialize(msg))
            except Exception as e:
                self.log.error(
                    "Sending to a WebSocket should not raise errors", exc_info=e
                )

        self._spawn_background(
            send(),
            name="WebSocket sending of %r" % msg,
        )

    async def release(self):
        """Release the remote, then wait for the connection to be closed.

        If the wait is cancelled, a warning is logged and the cancellation is
        not propagated.
        """
        await super().release()
        try:
            await self._connection.wait_closed()
        except asyncio.CancelledError:
            self.log.warning(
                "Connection %s was not closed by peer in time after release", self
            )
class WSPool(interfaces.TokenInterface):
    """Token interface that sends and receives CoAP messages over WebSockets.

    The pool keeps the established connections (both accepted and opened
    ones) by their pool key, opens outgoing connections on demand when a
    message asks for a ``coap+ws`` or ``coaps+ws`` remote, and optionally runs
    WebSocket servers.
    """

    # Connection attempts that are currently in progress
    _outgoing_starting: Dict[PoolKey, asyncio.Task]
    # Established connections
    _pool: Dict[PoolKey, WSRemote]

    _servers: List[websockets.asyncio.server.Server]

    def __init__(self, tman, log, loop) -> None:
        """Set up an empty pool; use :meth:`create_transport` to obtain a
        fully configured instance."""
        self.loop = loop

        self._pool = {}
        self._outgoing_starting = {}

        self._servers = []

        # The event loop only keeps weak references to tasks, so the receive
        # loops started for outgoing connections are referenced from here
        # until they are done.
        self._recv_loop_tasks = set()

        # See where it is used for documentation, remove when not needed any more
        self._in_shutdown = False

        self._tokenmanager = tman
        self.log = log

        self._client_credentials: CredentialsMap

    @classmethod
    async def create_transport(
        cls,
        tman: interfaces.TokenManager,
        log,
        loop,
        *,
        client_credentials,
        server_bind=None,
        server_context=None,
    ):
        """Create a pool and, if requested, start its servers.

        :param tman: Token manager to which incoming messages and errors are
            dispatched.
        :param log: Logger used by the pool and its remotes.
        :param loop: Event loop on which tasks are created.
        :param client_credentials: Credentials from which the SSL context of
            outgoing connections is obtained.
        :param server_bind: ``(host, port)`` to bind a server to, or a false
            value to not run any server. A port of None selects 8683, a port
            of 0 is passed on as is, and any other port is increased by 3000.
        :param server_context: SSL context; if given (and ``server_bind`` is
            set), a second server for ``coaps+ws`` is started.
        :returns: The new pool.
        """
        self = cls(tman, log, loop)

        self._client_credentials = client_credentials

        if server_bind:
            host, port = server_bind
            if port is None:
                port = 8683
            elif port != 0:
                # FIXME see module documentation
                port = port + 3000

            server = await websockets.asyncio.server.serve(
                functools.partial(self._new_connection, scheme="coap+ws"),
                host,
                port,
                # Ignoring type: Documentation says strings are OK
                subprotocols=["coap"],  # type: ignore[list-item]
                process_request=self._process_request,
                ping_interval=None,  # "SHOULD NOT be used"
            )
            self._servers.append(server)

            if server_context is not None:
                # The TLS server goes to the next port after the plain one --
                # unless port 0 ("any free port") was requested, for which
                # "the next port" would be the fixed port 1.
                tls_port = port + 1 if port != 0 else 0
                server = await websockets.asyncio.server.serve(
                    functools.partial(self._new_connection, scheme="coaps+ws"),
                    host,
                    tls_port,
                    # Ignoring type: Documentation says strings are OK
                    subprotocols=["coap"],  # type: ignore[list-item]
                    process_request=self._process_request,
                    ping_interval=None,  # "SHOULD NOT be used"
                    ssl=server_context,
                )
                self._servers.append(server)

        return self

    # Helpers for WebSocket server

    async def _new_connection(self, websocket, path=None, *, scheme):
        """Handle an accepted connection: register it in the pool and process
        its messages until the receive loop ends."""
        # ignoring path: Already checked in _process_request
        #
        # (path is present up to 10.0 and absent in 10.1; keeping it around to
        # stay compatible with different versions).

        hostheader = websocket.request.headers["Host"]
        local_hostinfo = util.hostportsplit(hostheader)

        remote = WSRemote(
            self,
            websocket,
            self.loop,
            self.log,
            scheme=scheme,
            local_hostinfo=local_hostinfo,
        )
        self._pool[remote._poolkey] = remote

        await self._run_recv_loop(remote)

    @staticmethod
    async def _process_request(connection, request):
        """Reject handshakes for any path other than ``/.well-known/coap``.

        :returns: A "404 Not Found" response to send instead of performing
            the WebSocket handshake, or None to continue with it.
        """
        if request.path != "/.well-known/coap":
            # The asyncio server expects a Response object here (not a
            # (status, headers, body) tuple); respond() builds one.
            return connection.respond(http.HTTPStatus.NOT_FOUND, "")
        # Continue with WebSockets
        return None

    # Helpers for WebSocket client

    def _connect(self, key: PoolKey):
        """Start connecting to ``key`` in a task that is registered in
        ``_outgoing_starting`` for as long as it runs."""
        self._outgoing_starting[key] = self.loop.create_task(
            self._connect_task(key),
            name="WebSocket connection opening to %r" % (key,),
        )

    async def _connect_task(self, key: PoolKey):
        """Open a connection to ``key``, register it in the pool and start
        its receive loop.

        :returns: The remote wrapping the new connection.
        :raises ValueError: If an SSL context is configured for a plaintext
            scheme, or (on pyodide) for a secure one.
        """
        try:
            ssl_context = self._client_credentials.ssl_client_context(
                key.scheme, key.hostinfo
            )

            hostinfo_split = util.hostportsplit(key.hostinfo)

            ws_scheme = {"coap+ws": "ws", "coaps+ws": "wss"}[key.scheme]

            if ws_scheme == "ws" and ssl_context is not None:
                raise ValueError(
                    "An SSL context was provided for a remote accessed via a plaintext websockets."
                )
            if ws_scheme == "wss":
                if is_pyodide:
                    if ssl_context is not None:
                        raise ValueError(
                            "The pyodide websocket implementation can't be configured with a non-default context -- connections created in a browser will always use the browser's CA set."
                        )
                else:
                    if ssl_context is None:
                        # Like with TLSClient, we need to pass in a default
                        # context and can't rely on the websocket library to
                        # use a default one when wss is requested (it doesn't)
                        import ssl

                        ssl_context = ssl.create_default_context()

            websocket = await websockets.connect(
                "%s://%s/.well-known/coap" % (ws_scheme, key.hostinfo),
                # Ignoring type: Documentation says strings are OK
                subprotocols=["coap"],  # type: ignore[list-item]
                ping_interval=None,
                ssl=ssl_context,
            )

            remote = WSRemote(
                self,
                websocket,
                self.loop,
                self.log,
                scheme=key.scheme,
                remote_hostinfo=hostinfo_split,
            )
            assert remote._poolkey == key, "Pool key construction is inconsistent"
            self._pool[key] = remote

            recv_task = self.loop.create_task(
                self._run_recv_loop(remote),
                name="WebSocket receive loop for %r" % (key,),
            )
            # Keep the task referenced while it runs
            self._recv_loop_tasks.add(recv_task)
            recv_task.add_done_callback(self._recv_loop_tasks.discard)

            return remote
        finally:
            del self._outgoing_starting[key]

    # Implementation of TokenInterface

    async def fill_or_recognize_remote(self, message):
        """Decide whether ``message`` is to be sent through this pool.

        A message whose remote already is a remote of this pool is accepted
        as is. A message requesting the ``coap+ws`` or ``coaps+ws`` scheme
        gets its remote set to the pooled connection for its host, which is
        established (and waited for) if it does not exist yet.

        :returns: True if this pool handles the message, False otherwise.
        """
        if isinstance(message.remote, WSRemote) and message.remote._pool() is self:
            return True

        if message.requested_scheme in ("coap+ws", "coaps+ws"):
            key = PoolKey(message.requested_scheme, message.remote.hostinfo)

            if key in self._pool:
                message.remote = self._pool[key]
                # The receive loop removes a remote from the pool when it
                # ends, so what is found here is not known to be closed.
                return True

            if key not in self._outgoing_starting:
                self._connect(key)
            # It's a bit unorthodox to wait for an (at least partially)
            # established connection in fill_or_recognize_remote, but it's
            # not completely off off either, and it makes it way easier to
            # not have partially initialized remotes around
            message.remote = await self._outgoing_starting[key]
            return True

        return False

    def send_message(self, message, messageerror_monitor):
        """Send ``message`` through its remote.

        Responses whose class is selected in their No-Response option are
        dropped; on all other messages, the option is cleared before sending.
        """
        # Ignoring messageerror_monitor: CoAP over reliable transports has no
        # way of indicating that a particular message was bad, it always shuts
        # down the complete connection

        if message.code.is_response():
            no_response = (message.opt.no_response or 0) & (
                1 << message.code.class_ - 1
            ) != 0
            if no_response:
                return

        message.opt.no_response = None

        message.remote._send_message(message)

    async def shutdown(self):
        """Release all pooled connections and close all servers, waiting for
        both to complete."""
        self._in_shutdown = True
        self.log.debug("Shutting down any connections on %r", self)

        client_shutdowns = [
            asyncio.create_task(
                c.release(),
                name="Close connection %s" % c,
            )
            for c in self._pool.values()
        ]

        server_shutdowns = []
        while self._servers:
            s = self._servers.pop()
            # We could do something like
            # >>> for websocket in s.websockets:
            # >>>     del websocket.logger.extra['websocket']
            # to reduce the reference loops
            # (websocket.logger.extra['websocket'] == websocket), but as the
            # tests actually do run a GC collection once and that gets broken
            # up, it's not worth adding fragility here
            s.close()
            server_shutdowns.append(
                asyncio.create_task(s.wait_closed(), name="Close server %s" % s),
            )

        # Placing client shutdowns before server shutdowns to give them a
        # chance to send out Abort messages; the .close() method could be more
        # helpful here by stopping new connections but letting us finish off
        # the old ones
        shutdowns = client_shutdowns + server_shutdowns
        if shutdowns:
            # wait is documented to require a non-empty set
            await asyncio.wait(shutdowns)

    # Incoming message processing

    def _forget_remote(self, remote):
        """Remove ``remote`` from the pool, unless its key is unused or has
        been taken over by a different remote."""
        if self._pool.get(remote._poolkey) is remote:
            del self._pool[remote._poolkey]

    async def _run_recv_loop(self, remote):
        """Process the messages arriving on ``remote`` until its connection
        is closed or aborted, then remove it from the pool."""
        try:
            await self._receive_until_closed(remote)
        finally:
            # However the loop ended, the connection is not to be picked for
            # new messages any more.
            self._forget_remote(remote)

    async def _receive_until_closed(self, remote):
        """Send the initial CSM, then receive frames and dispatch them to the
        token manager until the connection ends."""
        remote._send_initial_csm()

        while True:
            try:
                received = await remote._connection.recv()
            except websockets.exceptions.ConnectionClosed:
                # This check is purely needed to silence the warning printed
                # from tokenmanager, "Internal shutdown sequence msismatch:
                # error dispatched through tokenmanager after shutdown" -- and
                # is a symptom of https://github.com/chrysn/aiocoap/issues/284
                # and of the odd circumstance that we can't easily cancel the
                # _run_recv_loop tasks (as we should while that issue is
                # unresolved) in the shutdown handler.
                if not self._in_shutdown:
                    self._tokenmanager.dispatch_error(
                        error.RemoteServerShutdown("Peer closed connection"), remote
                    )
                return

            if not isinstance(received, bytes):
                await remote._abort_with_waiting(
                    Message(code=ABORT, payload=b"Text frame received"), close_code=1003
                )
                return

            try:
                msg = _decode_message(received)
            except error.UnparsableMessage:
                await remote._abort_with_waiting(
                    Message(code=ABORT, payload=b"Message parsing error"),
                    close_code=1007,
                )
                return

            msg.remote = remote

            if msg.code.is_signalling():
                try:
                    remote._process_signaling(msg)
                except rfc8323common.CloseConnection as e:
                    self._tokenmanager.dispatch_error(e.args[0], msg.remote)
                    self._forget_remote(remote)
                    await remote._connection.close()
                # Signalling messages are never passed on to the token manager
                continue

            if remote._remote_settings is None:
                remote.abort("No CSM received")
                return

            if msg.code.is_response():
                self._tokenmanager.process_response(msg)
                # ignoring the return value; unexpected responses can be the
                # asynchronous result of cancelled observations
            else:
                self._tokenmanager.process_request(msg)