Coverage for aiocoap/transports/ws.py: 85%
206 statements
« prev ^ index » next coverage.py v7.6.8, created at 2024-11-28 12:34 +0000
« prev ^ index » next coverage.py v7.6.8, created at 2024-11-28 12:34 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5"""
6This moduel implements a TokenInterface for `CoAP over WebSockets`_.
8.. _`CoAP over WebSockets`: https://tools.ietf.org/html/rfc8323#section-4
10As with CoAP-over-TCP, while the transport distinguishes a connection initiator
11("WebSocket (and TCP) client") and a receiver ("WebSocket (and TCP) server"),
12both sides can take both roles in CoAP (ie. as a CoAP server and a CoAP
13client). As the WebSocket client can not possibly be connected to (even by the
14same server -- once the connection is closed, it's gone and even a new one
15likely has a different port), aiocoap does not allow expressing their addresses
16in URIs (given they wouldn't serve their purpose as URLs and don't provide any
17stability either). Requests to a CoAP-over-WS client can be made by assigning
18the remote to an outgoing request.
20Port choice
21-----------
23Unlike the other transports, CoAP-over-WS is specified with a privileged port
24(port 80) as the default port. This is impractical for aiocoap servers for two
25reasons:
27 * Unless explicitly configured, aiocoap is typically run as an unprivileged
28 user (and has no provisions in place to receive a socket by other means
29 than opening it).
31 * Where a CoAP-over-WS proxy is run, there is often a "proper" website
32 running on the same port on a full HTTP server. That server is usually
33 capable of forwarding requests, whereas the ``websockets`` module used by
34 aiocoap is in no position to either serve websites nor to proxy to an
35 underlying server.
37The recommended setup is therefore to run a full web server at port 80, and
38configure it to proxy incoming requests for WebSockets at `/.well-known/coap`
39to aiocoap's server, which defaults to binding to port 8683.
41The port choice of outgoing connections, or the interpretation of the
42protocol's default port (ie. the port implied by ``coap+ws://hostname/``) is of
43course unaffected by this.
45.. warning::
47 Due to a shortcoming of aiocoap's way of specifying ports to bind
48 to, if a port is explicitly stated to bind to, CoAP-over-WS will bind to that
49 port plus 3000 (resulting in the abovementioned 8683 for 5683). If TLS server
50 keys are given, the TLS server is launched on the next port after the HTTP
51 server (typically 8684).
53Using on pyodide_
54-----------------
56When use on pyodide_,
57instead of using the ``websockets`` module,
58a simplified client-only back-end is used,
59which utilizes the browser's WebSocket API.
61.. _pyodide: https://pyodide.org/
62"""
64from __future__ import annotations
66from typing import Dict, List
67from collections import namedtuple
68import asyncio
69import functools
70import http
71import weakref
73from aiocoap import Message, interfaces, ABORT, util, error
74from aiocoap.transports import rfc8323common
75from ..credentials import CredentialsMap
76from ..defaults import is_pyodide
78if is_pyodide:
79 import aiocoap.util.pyodide_websockets as websockets
80else:
81 import websockets # type: ignore
84def _decode_message(data: bytes) -> Message:
85 codeoffset = 1
86 tokenoffset = 2
88 tkl = data[0]
89 if tkl > 8:
90 raise error.UnparsableMessage("Overly long token")
91 code = data[codeoffset]
92 token = data[tokenoffset : tokenoffset + tkl]
94 msg = Message(code=code, token=token)
96 msg.payload = msg.opt.decode(data[tokenoffset + tkl :])
98 return msg
101def _serialize(msg: Message) -> bytes:
102 tkl = len(msg.token)
103 if tkl > 8:
104 raise ValueError("Overly long token")
106 data = [
107 bytes(
108 (
109 tkl,
110 msg.code,
111 )
112 ),
113 msg.token,
114 msg.opt.encode(),
115 ]
116 if msg.payload:
117 data += [b"\xff", msg.payload]
119 return b"".join(data)
122PoolKey = namedtuple("PoolKey", ("scheme", "hostinfo"))
125class WSRemote(rfc8323common.RFC8323Remote, interfaces.EndpointAddress):
126 _connection: websockets.WebSocketCommonProtocol
127 # Only used to ensure that remotes are associated to the right pool -- not
128 # that there'd be any good reason to have multiple of those.
129 _pool: weakref.ReferenceType[WSPool]
131 scheme = None # Override property -- it's per instance here
133 def __init__(
134 self,
135 pool,
136 connection,
137 loop,
138 log,
139 *,
140 scheme,
141 local_hostinfo=None,
142 remote_hostinfo=None,
143 ):
144 super().__init__()
145 self._pool = weakref.ref(pool)
146 self._connection = connection
147 self.loop = loop
148 self.log = log
150 self._local_is_server = isinstance(
151 connection, websockets.WebSocketServerProtocol
152 )
154 if local_hostinfo is None:
155 self._local_hostinfo = self._connection.local_address[:2]
156 else:
157 self._local_hostinfo = local_hostinfo
158 if remote_hostinfo is None:
159 self._remote_hostinfo = self._connection.remote_address[:2]
160 else:
161 self._remote_hostinfo = remote_hostinfo
163 self.scheme = scheme
165 # Goes both for client and for server ends; on the server end, it
166 # ensures that role reversal URIs can be used even when passed as URIs
167 # and not as remotes (although that's of course only possible locally).
168 self._poolkey = PoolKey(self.scheme, self.hostinfo)
170 # Necessary for RFC8323Remote
172 def _abort_with(self, msg, *, close_code=1002):
173 # Like _send_message, this may take actual time -- but unlike there,
174 # there's no need to regulate back-pressure
175 self.loop.create_task(
176 self._abort_with_waiting(msg, close_code=close_code),
177 name="Abortion WebSocket sonnection with %r" % msg,
178 )
180 # Unlike _send_message, this is pulled out of the the _abort_with function
181 # as it's also used in _run_recv_loop
182 async def _abort_with_waiting(self, msg, *, close_code):
183 self.log.debug("Aborting with message: %r", msg)
184 try:
185 await self._connection.send(_serialize(msg))
186 except Exception as e:
187 self.log.error("Sending to a WebSocket should not raise errors", exc_info=e)
188 await self._connection.close(code=close_code)
190 def _send_message(self, msg):
191 # FIXME overhaul back-pressure model
192 async def send():
193 self.log.debug("Sending message: %r", msg)
194 try:
195 await self._connection.send(_serialize(msg))
196 except Exception as e:
197 self.log.error(
198 "Sending to a WebSocket should not raise errors", exc_info=e
199 )
201 self.loop.create_task(
202 send(),
203 name="WebSocket sending of %r" % msg,
204 )
206 async def release(self):
207 await super().release()
208 try:
209 await self._connection.wait_closed()
210 except asyncio.CancelledError:
211 self.log.warning(
212 "Connection %s was not closed by peer in time after release", self
213 )
216class WSPool(interfaces.TokenInterface):
217 _outgoing_starting: Dict[PoolKey, asyncio.Task]
218 _pool: Dict[PoolKey, WSRemote]
220 _servers: List[websockets.WebSocketServer]
222 def __init__(self, tman, log, loop) -> None:
223 self.loop = loop
225 self._pool = {}
226 self._outgoing_starting = {}
228 self._servers = []
230 # See where it is used for documentation, remove when not needed any more
231 self._in_shutdown = False
233 self._tokenmanager = tman
234 self.log = log
236 self._client_credentials: CredentialsMap
238 @classmethod
239 async def create_transport(
240 cls,
241 tman: interfaces.TokenManager,
242 log,
243 loop,
244 *,
245 client_credentials,
246 server_bind=None,
247 server_context=None,
248 ):
249 self = cls(tman, log, loop)
251 self._client_credentials = client_credentials
253 if server_bind:
254 host, port = server_bind
255 if port is None:
256 port = 8683
257 elif port != 0:
258 # FIXME see module documentation
259 port = port + 3000
261 server = await websockets.serve(
262 functools.partial(self._new_connection, scheme="coap+ws"),
263 host,
264 port,
265 subprotocols=["coap"],
266 process_request=self._process_request,
267 ping_interval=None, # "SHOULD NOT be used"
268 )
269 self._servers.append(server)
271 if server_context is not None:
272 server = await websockets.serve(
273 functools.partial(self._new_connection, scheme="coaps+ws"),
274 host,
275 port + 1,
276 subprotocols=["coap"],
277 process_request=self._process_request,
278 ping_interval=None, # "SHOULD NOT be used"
279 ssl=server_context,
280 )
281 self._servers.append(server)
283 return self
285 # Helpers for WebScoket server
287 async def _new_connection(self, websocket, path=None, *, scheme):
288 # ignoring path: Already checked in _process_request
289 #
290 # (path is present up to 10.0 and absent in 10.1; keeping it around to
291 # stay compatible with different versions).
293 hostheader = websocket.request_headers["Host"]
294 if hostheader.count(":") > 1 and "[" not in hostheader:
295 # Workaround for websockets version before
296 # https://github.com/aaugustin/websockets/issues/802
297 #
298 # To be removed once a websockets version with this fix can be
299 # depended on
300 hostheader = (
301 "["
302 + hostheader[: hostheader.rfind(":")]
303 + "]"
304 + hostheader[hostheader.rfind(":") :]
305 )
306 local_hostinfo = util.hostportsplit(hostheader)
308 remote = WSRemote(
309 self,
310 websocket,
311 self.loop,
312 self.log,
313 scheme=scheme,
314 local_hostinfo=local_hostinfo,
315 )
316 self._pool[remote._poolkey] = remote
318 await self._run_recv_loop(remote)
320 @staticmethod
321 async def _process_request(path, request_headers):
322 if path != "/.well-known/coap":
323 return (http.HTTPStatus.NOT_FOUND, [], b"")
324 # Continue with WebSockets
325 return None
327 # Helpers for WebScoket client
329 def _connect(self, key: PoolKey):
330 self._outgoing_starting[key] = self.loop.create_task(
331 self._connect_task(key),
332 name="WebSocket connection opening to %r" % (key,),
333 )
335 async def _connect_task(self, key: PoolKey):
336 try:
337 ssl_context = self._client_credentials.ssl_client_context(
338 key.scheme, key.hostinfo
339 )
341 hostinfo_split = util.hostportsplit(key.hostinfo)
343 ws_scheme = {"coap+ws": "ws", "coaps+ws": "wss"}[key.scheme]
345 if ws_scheme == "ws" and ssl_context is not None:
346 raise ValueError(
347 "An SSL context was provided for a remote accessed via a plaintext websockets."
348 )
349 if ws_scheme == "wss":
350 if is_pyodide:
351 if ssl_context is not None:
352 raise ValueError(
353 "The pyodide websocket implementation can't be configured with a non-default context -- connections created in a browser will always use the browser's CA set."
354 )
355 else:
356 if ssl_context is None:
357 # Like with TLSClient, we need to pass in a default
358 # context and can't rely on the websocket library to
359 # use a default one when wss is requested (it doesn't)
360 import ssl
362 ssl_context = ssl.create_default_context()
364 websocket = await websockets.connect(
365 "%s://%s/.well-known/coap" % (ws_scheme, key.hostinfo),
366 subprotocols=["coap"],
367 ping_interval=None,
368 ssl=ssl_context,
369 )
371 remote = WSRemote(
372 self,
373 websocket,
374 self.loop,
375 self.log,
376 scheme=key.scheme,
377 remote_hostinfo=hostinfo_split,
378 )
379 assert remote._poolkey == key, "Pool key construction is inconsistent"
380 self._pool[key] = remote
382 self.loop.create_task(
383 self._run_recv_loop(remote),
384 name="WebSocket receive loop for %r" % (key,),
385 )
387 return remote
388 finally:
389 del self._outgoing_starting[key]
391 # Implementation of TokenInterface
393 async def fill_or_recognize_remote(self, message):
394 if isinstance(message.remote, WSRemote) and message.remote._pool() is self:
395 return True
397 if message.requested_scheme in ("coap+ws", "coaps+ws"):
398 key = PoolKey(message.requested_scheme, message.remote.hostinfo)
400 if key in self._pool:
401 message.remote = self._pool[key]
402 if message.remote._connection.open:
403 return True
404 # else try opening a new one
406 if key not in self._outgoing_starting:
407 self._connect(key)
408 # It's a bit unorthodox to wait for an (at least partially)
409 # established connection in fill_or_recognize_remote, but it's
410 # not completely off off either, and it makes it way easier to
411 # not have partially initialized remotes around
412 message.remote = await self._outgoing_starting[key]
413 return True
415 return False
417 def send_message(self, message, messageerror_monitor):
418 # Ignoring messageerror_monitor: CoAP over reliable transports has no
419 # way of indicating that a particular message was bad, it always shuts
420 # down the complete connection
422 if message.code.is_response():
423 no_response = (message.opt.no_response or 0) & (
424 1 << message.code.class_ - 1
425 ) != 0
426 if no_response:
427 return
429 message.opt.no_response = None
431 message.remote._send_message(message)
433 async def shutdown(self):
434 self._in_shutdown = True
435 self.log.debug("Shutting down any connections on %r", self)
437 client_shutdowns = [
438 asyncio.create_task(
439 c.release(),
440 name="Close connection %s" % c,
441 )
442 for c in self._pool.values()
443 ]
445 server_shutdowns = []
446 while self._servers:
447 s = self._servers.pop()
448 # We could do something like
449 # >>> for websocket in s.websockets:
450 # >>> del websocket.logger.extra['websocket']
451 # to reduce the reference loops
452 # (websocket.logger.extra['websocket'] == websocket), but as the
453 # tests actually do run a GC collection once and that gets broken
454 # up, it's not worth adding fragilty here
455 s.close()
456 server_shutdowns.append(
457 asyncio.create_task(s.wait_closed(), name="Close server %s" % s),
458 )
460 # Placing client shutdowns before server shutdowns to give them a
461 # chance to send out Abort messages; the .close() method could be more
462 # helpful here by stopping new connections but letting us finish off
463 # the old ones
464 shutdowns = client_shutdowns + server_shutdowns
465 if shutdowns:
466 # wait is documented to require a non-empty set
467 await asyncio.wait(shutdowns)
469 # Incoming message processing
471 async def _run_recv_loop(self, remote):
472 remote._send_initial_csm()
474 while True:
475 try:
476 received = await remote._connection.recv()
477 except websockets.exceptions.ConnectionClosed:
478 # This check is purely needed to silence the warning printed
479 # from tokenmanager, "Internal shutdown sequence msismatch:
480 # error dispatched through tokenmanager after shutdown" -- and
481 # is a symptom of https://github.com/chrysn/aiocoap/issues/284
482 # and of the odd circumstance that we can't easily cancel the
483 # _run_recv_loop tasks (as we should while that issue is
484 # unresolved) in the shutdown handler.
485 if not self._in_shutdown:
486 # FIXME if deposited somewhere, mark that as stale?
487 self._tokenmanager.dispatch_error(
488 error.RemoteServerShutdown("Peer closed connection"), remote
489 )
490 return
492 if not isinstance(received, bytes):
493 await remote._abort_with_waiting(
494 Message(code=ABORT, payload=b"Text frame received"), close_code=1003
495 )
496 return
498 try:
499 msg = _decode_message(received)
500 except error.UnparsableMessage:
501 await remote._abort_with_waiting(
502 Message(code=ABORT, payload=b"Message parsing error"),
503 close_code=1007,
504 )
505 return
507 msg.remote = remote
509 if msg.code.is_signalling():
510 try:
511 remote._process_signaling(msg)
512 except rfc8323common.CloseConnection as e:
513 self._tokenmanager.dispatch_error(e.args[0], msg.remote)
514 self._pool.pop(remote._poolkey)
515 await remote._connection.close()
516 continue
518 if remote._remote_settings is None:
519 remote.abort("No CSM received")
520 return
522 if msg.code.is_response():
523 self._tokenmanager.process_response(msg)
524 # ignoring the return value; unexpected responses can be the
525 # asynchronous result of cancelled observations
526 else:
527 self._tokenmanager.process_request(msg)