Coverage for aiocoap / transports / ws.py: 84%

217 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-17 12:28 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5""" 

6This module implements a TokenInterface for `CoAP over WebSockets`_. 

7 

8.. _`CoAP over WebSockets`: https://tools.ietf.org/html/rfc8323#section-4 

9 

10As with CoAP-over-TCP, while the transport distinguishes a connection initiator 

11("WebSocket (and TCP) client") and a receiver ("WebSocket (and TCP) server"), 

12both sides can take both roles in CoAP (ie. as a CoAP server and a CoAP 

13client). As the WebSocket client can not possibly be connected to (even by the 

14same server -- once the connection is closed, it's gone and even a new one 

15likely has a different port), aiocoap does not allow expressing their addresses 

16in URIs (given they wouldn't serve their purpose as URLs and don't provide any 

17stability either). Requests to a CoAP-over-WS client can be made by assigning 

18the remote to an outgoing request. 

19 

20Port choice 

21----------- 

22 

23Unlike the other transports, CoAP-over-WS is specified with a privileged port 

24(port 80) as the default port. This is impractical for aiocoap servers for two 

25reasons: 

26 

27 * Unless explicitly configured, aiocoap is typically run as an unprivileged 

28 user (and has no provisions in place to receive a socket by other means 

29 than opening it). 

30 

31 * Where a CoAP-over-WS proxy is run, there is often a "proper" website 

32 running on the same port on a full HTTP server. That server is usually 

33 capable of forwarding requests, whereas the ``websockets`` module used by 

34 aiocoap is in no position to either serve websites nor to proxy to an 

35 underlying server. 

36 

37The recommended setup is therefore to run a full web server at port 80, and 

38configure it to proxy incoming requests for WebSockets at `/.well-known/coap` 

39to aiocoap's server, which defaults to binding to port 8683. 

40 

41The port choice of outgoing connections, or the interpretation of the 

42protocol's default port (ie. the port implied by ``coap+ws://hostname/``) is of 

43course unaffected by this. 

44 

45.. warning:: 

46 

47 Due to a shortcoming of aiocoap's way of specifying ports to bind 

48 to, if a port is explicitly stated to bind to, CoAP-over-WS will bind to that 

49 port plus 3000 (resulting in the abovementioned 8683 for 5683). If TLS server 

50 keys are given, the TLS server is launched on the next port after the HTTP 

51 server (typically 8684). 

52 

53Using on pyodide_ 

54----------------- 

55 

56When use on pyodide_, 

57instead of using the ``websockets`` module, 

58a simplified client-only back-end is used, 

59which utilizes the browser's WebSocket API. 

60 

61.. _pyodide: https://pyodide.org/ 

62""" 

63 

64from __future__ import annotations 

65 

66from typing import Dict, List 

67from collections import namedtuple 

68import asyncio 

69import functools 

70import http 

71import weakref 

72 

73from aiocoap import Message, interfaces, ABORT, util, error 

74from aiocoap.transports import rfc8323common 

75from ..credentials import CredentialsMap 

76from ..defaults import is_pyodide 

77from ..message import Direction 

78 

79if not is_pyodide: 

80 import websockets.asyncio.connection 

81 import websockets.asyncio.server 

82else: 

83 import aiocoap.util.pyodide_websockets as websockets # type: ignore[no-redef] 

84 

85 

86def _decode_message(data: bytes) -> Message: 

87 codeoffset = 1 

88 tokenoffset = 2 

89 

90 tkl = data[0] 

91 if tkl > 8: 

92 raise error.UnparsableMessage("Overly long token") 

93 code = data[codeoffset] 

94 token = data[tokenoffset : tokenoffset + tkl] 

95 

96 msg = Message(code=code, _token=token) 

97 

98 msg.payload = msg.opt.decode(data[tokenoffset + tkl :]) 

99 msg.direction = Direction.INCOMING 

100 

101 return msg 

102 

103 

104def _serialize(msg: Message) -> bytes: 

105 tkl = len(msg.token) 

106 if tkl > 8: 

107 raise ValueError("Overly long token") 

108 

109 data = [ 

110 bytes( 

111 ( 

112 tkl, 

113 msg.code, 

114 ) 

115 ), 

116 msg.token, 

117 msg.opt.encode(), 

118 ] 

119 if msg.payload: 

120 data += [b"\xff", msg.payload] 

121 

122 return b"".join(data) 

123 

124 

125PoolKey = namedtuple("PoolKey", ("scheme", "hostinfo")) 

126 

127 

128class InvalidStatus(error.NetworkError): 

129 def __str__(self): 

130 return str(self.__cause__) 

131 

132 def extra_help(self, hints={}): 

133 return ( 

134 "This indicates that while an HTTP server was reached, " 

135 "it is not offering CoAP-over-WebSockets at the standard-defined location " 

136 "/.well-known/coap." 

137 ) 

138 

139 

140class WSRemote(rfc8323common.RFC8323Remote, interfaces.EndpointAddress): 

141 _connection: websockets.asyncio.connection.Connection 

142 # Only used to ensure that remotes are associated to the right pool -- not 

143 # that there'd be any good reason to have multiple of those. 

144 _pool: weakref.ReferenceType[WSPool] 

145 _poolkey: PoolKey 

146 

147 scheme = None # Override property -- it's per instance here 

148 

149 def __init__( 

150 self, 

151 pool, 

152 connection, 

153 loop, 

154 log, 

155 *, 

156 scheme, 

157 local_hostinfo=None, 

158 remote_hostinfo=None, 

159 ): 

160 super().__init__() 

161 self._pool = weakref.ref(pool) 

162 self._connection = connection 

163 self.loop = loop 

164 self.log = log 

165 

166 self._local_is_server = isinstance( 

167 connection, websockets.asyncio.server.ServerConnection 

168 ) 

169 

170 if local_hostinfo is None: 

171 self._local_hostinfo = self._connection.local_address[:2] 

172 else: 

173 self._local_hostinfo = local_hostinfo 

174 if remote_hostinfo is None: 

175 self._remote_hostinfo = self._connection.remote_address[:2] 

176 else: 

177 self._remote_hostinfo = remote_hostinfo 

178 

179 self.scheme = scheme 

180 

181 # Goes both for client and for server ends; on the server end, it 

182 # ensures that role reversal URIs can be used even when passed as URIs 

183 # and not as remotes (although that's of course only possible locally). 

184 self._poolkey = PoolKey(self.scheme, self.hostinfo) 

185 

186 # Necessary for RFC8323Remote 

187 

188 def _abort_with(self, msg, *, close_code=1002): 

189 # Like _send_message, this may take actual time -- but unlike there, 

190 # there's no need to regulate back-pressure 

191 self.loop.create_task( 

192 self._abort_with_waiting(msg, close_code=close_code), 

193 name="Abortion WebSocket sonnection with %r" % msg, 

194 ) 

195 

196 # Unlike _send_message, this is pulled out of the the _abort_with function 

197 # as it's also used in _run_recv_loop 

198 async def _abort_with_waiting(self, msg, *, close_code): 

199 self.log.debug("Aborting with message: %r", msg) 

200 try: 

201 await self._connection.send(_serialize(msg)) 

202 except Exception as e: 

203 self.log.error("Sending to a WebSocket should not raise errors", exc_info=e) 

204 await self._connection.close(code=close_code) 

205 

206 def _send_message(self, msg): 

207 # FIXME overhaul back-pressure model 

208 async def send(): 

209 self.log.debug("Sending message: %r", msg) 

210 try: 

211 await self._connection.send(_serialize(msg)) 

212 except Exception as e: 

213 self.log.error( 

214 "Sending to a WebSocket should not raise errors", exc_info=e 

215 ) 

216 

217 self.loop.create_task( 

218 send(), 

219 name="WebSocket sending of %r" % msg, 

220 ) 

221 

222 async def release(self): 

223 await super().release() 

224 try: 

225 await self._connection.wait_closed() 

226 except asyncio.CancelledError: 

227 self.log.warning( 

228 "Connection %s was not closed by peer in time after release", self 

229 ) 

230 

231 

232class WSPool(interfaces.TokenInterface): 

233 _outgoing_starting: Dict[PoolKey, asyncio.Task] 

234 _pool: Dict[PoolKey, WSRemote] 

235 

236 _servers: List[websockets.asyncio.server.Server] 

237 

238 def __init__(self, tman, log, loop) -> None: 

239 self.loop = loop 

240 

241 self._pool = {} 

242 self._outgoing_starting = {} 

243 

244 self._servers = [] 

245 

246 # See where it is used for documentation, remove when not needed any more 

247 self._in_shutdown = False 

248 

249 self._tokenmanager = tman 

250 self.log = log 

251 

252 self._client_credentials: CredentialsMap 

253 

254 @classmethod 

255 async def create_transport( 

256 cls, 

257 tman: interfaces.TokenManager, 

258 log, 

259 loop, 

260 *, 

261 client_credentials, 

262 server_bind=None, 

263 server_context=None, 

264 ): 

265 self = cls(tman, log, loop) 

266 

267 self._client_credentials = client_credentials 

268 

269 if server_bind: 

270 host, port = server_bind 

271 if port is None: 

272 port = 8683 

273 elif port != 0: 

274 # FIXME see module documentation 

275 port = port + 3000 

276 

277 server = await websockets.asyncio.server.serve( 

278 functools.partial(self._new_connection, scheme="coap+ws"), 

279 host, 

280 port, 

281 # Ignoring type: Documentation says strings are OK 

282 subprotocols=["coap"], # type: ignore[list-item] 

283 process_request=self._process_request, 

284 ping_interval=None, # "SHOULD NOT be used" 

285 ) 

286 self._servers.append(server) 

287 

288 if server_context is not None: 

289 server = await websockets.asyncio.server.serve( 

290 functools.partial(self._new_connection, scheme="coaps+ws"), 

291 host, 

292 port + 1, 

293 # Ignoring type: Documentation says strings are OK 

294 subprotocols=["coap"], # type: ignore[list-item] 

295 process_request=self._process_request, 

296 ping_interval=None, # "SHOULD NOT be used" 

297 ssl=server_context, 

298 ) 

299 self._servers.append(server) 

300 

301 return self 

302 

303 # Helpers for WebScoket server 

304 

305 async def _new_connection(self, websocket, path=None, *, scheme): 

306 # ignoring path: Already checked in _process_request 

307 # 

308 # (path is present up to 10.0 and absent in 10.1; keeping it around to 

309 # stay compatible with different versions). 

310 

311 hostheader = websocket.request.headers["Host"] 

312 local_hostinfo = util.hostportsplit(hostheader) 

313 

314 remote = WSRemote( 

315 self, 

316 websocket, 

317 self.loop, 

318 self.log, 

319 scheme=scheme, 

320 local_hostinfo=local_hostinfo, 

321 ) 

322 self._pool[remote._poolkey] = remote 

323 

324 await self._run_recv_loop(remote) 

325 

326 @staticmethod 

327 async def _process_request(connection, request): 

328 path = connection.request.path 

329 if path != "/.well-known/coap": 

330 return (http.HTTPStatus.NOT_FOUND, [], b"") 

331 # Continue with WebSockets 

332 return None 

333 

334 # Helpers for WebScoket client 

335 

336 def _connect(self, key: PoolKey): 

337 self._outgoing_starting[key] = self.loop.create_task( 

338 self._connect_task(key), 

339 name="WebSocket connection opening to %r" % (key,), 

340 ) 

341 

342 async def _connect_task(self, key: PoolKey): 

343 try: 

344 ssl_context = self._client_credentials.ssl_client_context( 

345 key.scheme, key.hostinfo 

346 ) 

347 

348 hostinfo_split = util.hostportsplit(key.hostinfo) 

349 

350 ws_scheme = {"coap+ws": "ws", "coaps+ws": "wss"}[key.scheme] 

351 

352 if ws_scheme == "ws" and ssl_context is not None: 

353 raise ValueError( 

354 "An SSL context was provided for a remote accessed via a plaintext websockets." 

355 ) 

356 if ws_scheme == "wss": 

357 if is_pyodide: 

358 if ssl_context is not None: 

359 raise ValueError( 

360 "The pyodide websocket implementation can't be configured with a non-default context -- connections created in a browser will always use the browser's CA set." 

361 ) 

362 else: 

363 if ssl_context is None: 

364 # Like with TLSClient, we need to pass in a default 

365 # context and can't rely on the websocket library to 

366 # use a default one when wss is requested (it doesn't) 

367 import ssl 

368 

369 ssl_context = ssl.create_default_context() 

370 

371 try: 

372 websocket = await websockets.connect( 

373 "%s://%s/.well-known/coap" % (ws_scheme, key.hostinfo), 

374 # Ignoring type: Documentation says strings are OK 

375 subprotocols=["coap"], # type: ignore[list-item] 

376 ping_interval=None, 

377 ssl=ssl_context, 

378 ) 

379 except websockets.exceptions.InvalidStatus as e: 

380 raise InvalidStatus from e 

381 

382 remote = WSRemote( 

383 self, 

384 websocket, 

385 self.loop, 

386 self.log, 

387 scheme=key.scheme, 

388 remote_hostinfo=hostinfo_split, 

389 ) 

390 assert remote._poolkey == key, "Pool key construction is inconsistent" 

391 self._pool[key] = remote 

392 

393 self.loop.create_task( 

394 self._run_recv_loop(remote), 

395 name="WebSocket receive loop for %r" % (key,), 

396 ) 

397 

398 return remote 

399 finally: 

400 del self._outgoing_starting[key] 

401 

402 # Implementation of TokenInterface 

403 

404 async def fill_or_recognize_remote(self, message): 

405 if isinstance(message.remote, WSRemote) and message.remote._pool() is self: 

406 return True 

407 

408 if message.requested_scheme in ("coap+ws", "coaps+ws"): 

409 key = PoolKey(message.requested_scheme, message.remote.hostinfo) 

410 

411 if key in self._pool: 

412 message.remote = self._pool[key] 

413 # It would have handled the ConnectionClosed on recv and 

414 # removed itself if it was not live. 

415 return True 

416 

417 if key not in self._outgoing_starting: 

418 self._connect(key) 

419 # It's a bit unorthodox to wait for an (at least partially) 

420 # established connection in fill_or_recognize_remote, but it's 

421 # not completely off off either, and it makes it way easier to 

422 # not have partially initialized remotes around 

423 message.remote = await self._outgoing_starting[key] 

424 return True 

425 

426 return False 

427 

428 def send_message(self, message, messageerror_monitor): 

429 # Ignoring messageerror_monitor: CoAP over reliable transports has no 

430 # way of indicating that a particular message was bad, it always shuts 

431 # down the complete connection 

432 

433 if message.code.is_response(): 

434 no_response = (message.opt.no_response or 0) & ( 

435 1 << message.code.class_ - 1 

436 ) != 0 

437 if no_response: 

438 return 

439 

440 message.opt.no_response = None 

441 

442 message.remote._send_message(message) 

443 

444 async def shutdown(self): 

445 self._in_shutdown = True 

446 self.log.debug("Shutting down any connections on %r", self) 

447 self._tokenmanager = None 

448 

449 client_shutdowns = [ 

450 asyncio.create_task( 

451 c.release(), 

452 name="Close connection %s" % c, 

453 ) 

454 for c in self._pool.values() 

455 ] 

456 

457 server_shutdowns = [] 

458 while self._servers: 

459 s = self._servers.pop() 

460 # We could do something like 

461 # >>> for websocket in s.websockets: 

462 # >>> del websocket.logger.extra['websocket'] 

463 # to reduce the reference loops 

464 # (websocket.logger.extra['websocket'] == websocket), but as the 

465 # tests actually do run a GC collection once and that gets broken 

466 # up, it's not worth adding fragilty here 

467 s.close() 

468 server_shutdowns.append( 

469 asyncio.create_task(s.wait_closed(), name="Close server %s" % s), 

470 ) 

471 

472 # Placing client shutdowns before server shutdowns to give them a 

473 # chance to send out Abort messages; the .close() method could be more 

474 # helpful here by stopping new connections but letting us finish off 

475 # the old ones 

476 shutdowns = client_shutdowns + server_shutdowns 

477 if shutdowns: 

478 # wait is documented to require a non-empty set 

479 await asyncio.wait(shutdowns) 

480 

481 # Incoming message processing 

482 

483 async def _run_recv_loop(self, remote): 

484 remote._send_initial_csm() 

485 

486 while True: 

487 try: 

488 received = await remote._connection.recv() 

489 except websockets.exceptions.ConnectionClosed: 

490 # This check is purely needed to silence the warning printed 

491 # from tokenmanager, "Internal shutdown sequence msismatch: 

492 # error dispatched through tokenmanager after shutdown" -- and 

493 # is a symptom of https://github.com/chrysn/aiocoap/issues/284 

494 # and of the odd circumstance that we can't easily cancel the 

495 # _run_recv_loop tasks (as we should while that issue is 

496 # unresolved) in the shutdown handler. 

497 if not self._in_shutdown: 

498 # FIXME if deposited somewhere, mark that as stale? 

499 self._tokenmanager.dispatch_error( 

500 error.RemoteServerShutdown("Peer closed connection"), remote 

501 ) 

502 return 

503 

504 if not isinstance(received, bytes): 

505 await remote._abort_with_waiting( 

506 Message(code=ABORT, payload=b"Text frame received"), close_code=1003 

507 ) 

508 return 

509 

510 try: 

511 msg = _decode_message(received) 

512 except error.UnparsableMessage: 

513 await remote._abort_with_waiting( 

514 Message(code=ABORT, payload=b"Message parsing error"), 

515 close_code=1007, 

516 ) 

517 return 

518 

519 msg.remote = remote 

520 

521 if msg.code.is_signalling(): 

522 try: 

523 remote._process_signaling(msg) 

524 except rfc8323common.CloseConnection as e: 

525 self._tokenmanager.dispatch_error(e.args[0], msg.remote) 

526 self._pool.pop(remote._poolkey) 

527 await remote._connection.close() 

528 continue 

529 

530 if remote._remote_settings is None: 

531 remote.abort("No CSM received") 

532 return 

533 

534 if msg.code.is_response(): 

535 self._tokenmanager.process_response(msg) 

536 # ignoring the return value; unexpected responses can be the 

537 # asynchronous result of cancelled observations 

538 else: 

539 self._tokenmanager.process_request(msg)