Coverage for aiocoap/transports/ws.py: 0%

207 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-18 09:25 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5""" 

6This module implements a TokenInterface for `CoAP over WebSockets`_. 

7 

8.. _`CoAP over WebSockets`: https://tools.ietf.org/html/rfc8323#section-4 

9 

10As with CoAP-over-TCP, while the transport distinguishes a connection initiator 

11("WebSocket (and TCP) client") and a receiver ("WebSocket (and TCP) server"), 

12both sides can take both roles in CoAP (ie. as a CoAP server and a CoAP 

13client). As the WebSocket client can not possibly be connected to (even by the 

14same server -- once the connection is closed, it's gone and even a new one 

15likely has a different port), aiocoap does not allow expressing their addresses 

16in URIs (given they wouldn't serve their purpose as URLs and don't provide any 

17stability either). Requests to a CoAP-over-WS client can be made by assigning 

18the remote to an outgoing request. 

19 

20Port choice 

21----------- 

22 

23Unlike the other transports, CoAP-over-WS is specified with a privileged port 

24(port 80) as the default port. This is impractical for aiocoap servers for two 

25reasons: 

26 

27 * Unless explicitly configured, aiocoap is typically run as an unprivileged 

28 user (and has no provisions in place to receive a socket by other means 

29 than opening it). 

30 

31 * Where a CoAP-over-WS proxy is run, there is often a "proper" website 

32 running on the same port on a full HTTP server. That server is usually 

33 capable of forwarding requests, whereas the ``websockets`` module used by 

34 aiocoap is in no position to either serve websites or to proxy to an 

35 underlying server. 

36 

37The recommended setup is therefore to run a full web server at port 80, and 

38configure it to proxy incoming requests for WebSockets at `/.well-known/coap` 

39to aiocoap's server, which defaults to binding to port 8683. 

40 

41The port choice of outgoing connections, or the interpretation of the 

42protocol's default port (ie. the port implied by ``coap+ws://hostname/``) is of 

43course unaffected by this. 

44 

45.. warning:: 

46 

47 Due to a shortcoming of aiocoap's way of specifying ports to bind 

48 to, if a port is explicitly stated to bind to, CoAP-over-WS will bind to that 

49 port plus 3000 (resulting in the abovementioned 8683 for 5683). If TLS server 

50 keys are given, the TLS server is launched on the next port after the HTTP 

51 server (typically 8684). 

52 

53Using on pyodide_ 

54----------------- 

55 

56When used on pyodide_, 

57instead of using the ``websockets`` module, 

58a simplified client-only back-end is used, 

59which utilizes the browser's WebSocket API. 

60 

61.. _pyodide: https://pyodide.org/ 

62""" 

63 

64from __future__ import annotations 

65 

66from typing import Dict, List 

67from collections import namedtuple 

68import asyncio 

69import functools 

70import http 

71import weakref 

72 

73from aiocoap import Message, interfaces, ABORT, util, error 

74from aiocoap.transports import rfc8323common 

75from ..credentials import CredentialsMap 

76from ..defaults import is_pyodide 

77 

78if not is_pyodide: 

79 import websockets.asyncio.connection 

80 import websockets.asyncio.server 

81else: 

82 import aiocoap.util.pyodide_websockets as websockets # type: ignore[no-redef] 

83 

84 

def _decode_message(data: bytes) -> Message:
    """Parse the content of one CoAP-over-WebSockets frame into a Message.

    The frame is read as: one byte holding the token length, one byte holding
    the code, the token itself, and then everything else, which is handed to
    the message's option decoder (whose return value becomes the payload).

    :param data: Binary content of a single received WebSocket frame.
    :returns: A message with code, token, options and payload populated.
    :raises error.UnparsableMessage: If the frame is too short to hold the
        two header bytes or the announced token, or if the announced token
        length exceeds 8.
    """
    code_offset = 1
    token_offset = 2

    # Without this check, an empty or one-byte frame would surface as an
    # IndexError rather than as the parsing error callers handle.
    if len(data) < token_offset:
        raise error.UnparsableMessage("Message too short for header")

    tkl = data[0]
    if tkl > 8:
        raise error.UnparsableMessage("Overly long token")

    options_offset = token_offset + tkl
    if len(data) < options_offset:
        # Slicing alone would silently produce a token shorter than announced.
        raise error.UnparsableMessage("Message too short for token")

    msg = Message(code=data[code_offset], token=data[token_offset:options_offset])

    msg.payload = msg.opt.decode(data[options_offset:])

    return msg

100 

101 

102def _serialize(msg: Message) -> bytes: 

103 tkl = len(msg.token) 

104 if tkl > 8: 

105 raise ValueError("Overly long token") 

106 

107 data = [ 

108 bytes( 

109 ( 

110 tkl, 

111 msg.code, 

112 ) 

113 ), 

114 msg.token, 

115 msg.opt.encode(), 

116 ] 

117 if msg.payload: 

118 data += [b"\xff", msg.payload] 

119 

120 return b"".join(data) 

121 

122 

# Key under which connections are filed in a pool: the CoAP URI scheme the
# connection is used with, and the hostinfo it is addressed by.
PoolKey = namedtuple("PoolKey", ["scheme", "hostinfo"])

124 

125 

class WSRemote(rfc8323common.RFC8323Remote, interfaces.EndpointAddress):
    """One CoAP-over-WebSockets connection, usable as the remote of messages.

    An instance wraps an established WebSocket connection and remembers which
    pool created it and under which :class:`PoolKey` it is filed there.
    """

    _connection: websockets.asyncio.connection.Connection
    # Only used to ensure that remotes are associated to the right pool -- not
    # that there'd be any good reason to have multiple of those.
    _pool: weakref.ReferenceType[WSPool]
    _poolkey: PoolKey

    scheme = None  # Override property -- it's per instance here

    def __init__(
        self,
        pool,
        connection,
        loop,
        log,
        *,
        scheme,
        local_hostinfo=None,
        remote_hostinfo=None,
    ):
        """Wrap *connection* as a remote belonging to *pool*.

        :param pool: The pool creating this remote; only a weak reference to
            it is kept.
        :param connection: The established WebSocket connection.
        :param loop: Event loop on which sending tasks are created.
        :param log: Logger used for debug and error output.
        :param scheme: The URI scheme this remote is used with.
        :param local_hostinfo: Local host/port pair; when ``None``, the first
            two items of the connection's local address are used.
        :param remote_hostinfo: Remote host/port pair; when ``None``, the
            first two items of the connection's remote address are used.
        """
        super().__init__()
        self._pool = weakref.ref(pool)
        self._connection = connection
        self.loop = loop
        self.log = log

        # Strong references to fire-and-forget tasks: the event loop itself
        # only references tasks weakly, so an otherwise unreferenced task
        # could be garbage collected before it completes.
        self._pending_tasks = set()

        self._local_is_server = isinstance(
            connection, websockets.asyncio.server.ServerConnection
        )

        if local_hostinfo is None:
            self._local_hostinfo = self._connection.local_address[:2]
        else:
            self._local_hostinfo = local_hostinfo
        if remote_hostinfo is None:
            self._remote_hostinfo = self._connection.remote_address[:2]
        else:
            self._remote_hostinfo = remote_hostinfo

        self.scheme = scheme

        # Goes both for client and for server ends; on the server end, it
        # ensures that role reversal URIs can be used even when passed as URIs
        # and not as remotes (although that's of course only possible locally).
        self._poolkey = PoolKey(self.scheme, self.hostinfo)

    def _spawn(self, coro, *, name):
        """Run *coro* as a task on the loop, referenced until it is done."""
        task = self.loop.create_task(coro, name=name)
        self._pending_tasks.add(task)
        task.add_done_callback(self._pending_tasks.discard)
        return task

    # Necessary for RFC8323Remote

    def _abort_with(self, msg, *, close_code=1002):
        """Schedule sending *msg* and then closing with *close_code*."""
        # Like _send_message, this may take actual time -- but unlike there,
        # there's no need to regulate back-pressure
        self._spawn(
            self._abort_with_waiting(msg, close_code=close_code),
            name="Abortion WebSocket sonnection with %r" % msg,
        )

    # Unlike _send_message, this is pulled out of the _abort_with function
    # as it's also used in _run_recv_loop
    async def _abort_with_waiting(self, msg, *, close_code):
        """Send *msg*, then close the connection with *close_code*.

        A failure to send is logged and does not prevent the close.
        """
        self.log.debug("Aborting with message: %r", msg)
        try:
            await self._connection.send(_serialize(msg))
        except Exception as e:
            self.log.error("Sending to a WebSocket should not raise errors", exc_info=e)
        await self._connection.close(code=close_code)

    def _send_message(self, msg):
        """Schedule serializing and sending *msg*; send errors are logged."""

        # FIXME overhaul back-pressure model
        async def send():
            self.log.debug("Sending message: %r", msg)
            try:
                await self._connection.send(_serialize(msg))
            except Exception as e:
                self.log.error(
                    "Sending to a WebSocket should not raise errors", exc_info=e
                )

        self._spawn(
            send(),
            name="WebSocket sending of %r" % msg,
        )

    async def release(self):
        """Release the remote, then wait for the connection to be closed.

        If the wait is cancelled, a warning is logged and the cancellation is
        not propagated.
        """
        await super().release()
        try:
            await self._connection.wait_closed()
        except asyncio.CancelledError:
            self.log.warning(
                "Connection %s was not closed by peer in time after release", self
            )

216 

217 

class WSPool(interfaces.TokenInterface):
    """Token interface that manages all CoAP-over-WebSockets connections.

    The pool files established connections (incoming and outgoing alike) as
    :class:`WSRemote` instances under their :class:`PoolKey`, tracks outgoing
    connections that are still being opened, and owns the WebSocket servers
    it has started.
    """

    _outgoing_starting: Dict[PoolKey, asyncio.Task]
    _pool: Dict[PoolKey, WSRemote]

    _servers: List[websockets.asyncio.server.Server]

    def __init__(self, tman, log, loop) -> None:
        """Set up an empty pool reporting to the token manager *tman*."""
        self.loop = loop

        self._pool = {}
        self._outgoing_starting = {}

        self._servers = []

        # Strong references to the receive loops of outgoing connections; the
        # event loop only references tasks weakly.
        self._recv_tasks = set()

        # See where it is used for documentation, remove when not needed any more
        self._in_shutdown = False

        self._tokenmanager = tman
        self.log = log

        self._client_credentials: CredentialsMap

    @classmethod
    async def create_transport(
        cls,
        tman: interfaces.TokenManager,
        log,
        loop,
        *,
        client_credentials,
        server_bind=None,
        server_context=None,
    ):
        """Create a pool and, if requested, start its WebSocket servers.

        :param tman: Token manager that receives messages and errors.
        :param log: Logger used by the pool and its remotes.
        :param loop: Event loop on which tasks are created.
        :param client_credentials: Source of SSL contexts for outgoing
            connections.
        :param server_bind: ``(host, port)`` to serve on, or a false value to
            start no server. A port of ``None`` selects 8683; any other
            non-zero port is shifted up by 3000.
        :param server_context: SSL context; when given along with
            *server_bind*, a second server is started on the port after the
            plain one.
        :returns: The ready-to-use pool.
        """
        self = cls(tman, log, loop)

        self._client_credentials = client_credentials

        if server_bind:
            host, port = server_bind
            if port is None:
                port = 8683
            elif port != 0:
                # FIXME see module documentation
                port = port + 3000

            server = await websockets.asyncio.server.serve(
                functools.partial(self._new_connection, scheme="coap+ws"),
                host,
                port,
                # Ignoring type: Documentation says strings are OK
                subprotocols=["coap"],  # type: ignore[list-item]
                process_request=self._process_request,
                ping_interval=None,  # "SHOULD NOT be used"
            )
            self._servers.append(server)

            if server_context is not None:
                # Port 0 requests any free port; blindly adding 1 would turn
                # that into a request for the fixed port 1.
                tls_port = port + 1 if port != 0 else 0

                server = await websockets.asyncio.server.serve(
                    functools.partial(self._new_connection, scheme="coaps+ws"),
                    host,
                    tls_port,
                    # Ignoring type: Documentation says strings are OK
                    subprotocols=["coap"],  # type: ignore[list-item]
                    process_request=self._process_request,
                    ping_interval=None,  # "SHOULD NOT be used"
                    ssl=server_context,
                )
                self._servers.append(server)

        return self

    # Helpers for WebSocket server

    async def _new_connection(self, websocket, path=None, *, scheme):
        """Handle one accepted connection until its receive loop ends.

        The ``Host`` header of the opening request is split into the local
        hostinfo of the remote that is created and filed in the pool.
        """
        # ignoring path: Already checked in _process_request
        #
        # (path is present up to 10.0 and absent in 10.1; keeping it around to
        # stay compatible with different versions).

        hostheader = websocket.request.headers["Host"]
        local_hostinfo = util.hostportsplit(hostheader)

        remote = WSRemote(
            self,
            websocket,
            self.loop,
            self.log,
            scheme=scheme,
            local_hostinfo=local_hostinfo,
        )
        self._pool[remote._poolkey] = remote

        await self._run_recv_loop(remote)

    @staticmethod
    async def _process_request(connection, request):
        """Reject opening requests for any path but ``/.well-known/coap``.

        :returns: A 404 response for other paths, ``None`` to continue with
            the WebSocket handshake.
        """
        if request.path != "/.well-known/coap":
            # The asyncio server API expects a response object here, not the
            # (status, headers, body) tuple of the legacy API.
            return connection.respond(http.HTTPStatus.NOT_FOUND, "")
        # Continue with WebSockets
        return None

    # Helpers for WebSocket client

    def _connect(self, key: PoolKey):
        """Start opening a connection for *key* and register the task."""
        self._outgoing_starting[key] = self.loop.create_task(
            self._connect_task(key),
            name="WebSocket connection opening to %r" % (key,),
        )

    async def _connect_task(self, key: PoolKey):
        """Open the connection for *key*, file it and start its receive loop.

        :returns: The new remote.
        :raises ValueError: If an SSL context is configured for a plaintext
            scheme, or for a secure scheme when running on pyodide.
        """
        try:
            ssl_context = self._client_credentials.ssl_client_context(
                key.scheme, key.hostinfo
            )

            hostinfo_split = util.hostportsplit(key.hostinfo)

            ws_scheme = {"coap+ws": "ws", "coaps+ws": "wss"}[key.scheme]

            if ws_scheme == "ws" and ssl_context is not None:
                raise ValueError(
                    "An SSL context was provided for a remote accessed via a plaintext websockets."
                )
            if ws_scheme == "wss":
                if is_pyodide:
                    if ssl_context is not None:
                        raise ValueError(
                            "The pyodide websocket implementation can't be configured with a non-default context -- connections created in a browser will always use the browser's CA set."
                        )
                elif ssl_context is None:
                    # Like with TLSClient, we need to pass in a default
                    # context and can't rely on the websocket library to
                    # use a default one when wss is requested (it doesn't)
                    import ssl

                    ssl_context = ssl.create_default_context()

            websocket = await websockets.connect(
                "%s://%s/.well-known/coap" % (ws_scheme, key.hostinfo),
                # Ignoring type: Documentation says strings are OK
                subprotocols=["coap"],  # type: ignore[list-item]
                ping_interval=None,
                ssl=ssl_context,
            )

            remote = WSRemote(
                self,
                websocket,
                self.loop,
                self.log,
                scheme=key.scheme,
                remote_hostinfo=hostinfo_split,
            )
            assert remote._poolkey == key, "Pool key construction is inconsistent"
            self._pool[key] = remote

            recv_task = self.loop.create_task(
                self._run_recv_loop(remote),
                name="WebSocket receive loop for %r" % (key,),
            )
            # Keep the task referenced until it is done
            self._recv_tasks.add(recv_task)
            recv_task.add_done_callback(self._recv_tasks.discard)

            return remote
        finally:
            del self._outgoing_starting[key]

    # Implementation of TokenInterface

    async def fill_or_recognize_remote(self, message):
        """Claim *message* if it belongs to this transport.

        Messages whose remote is already a :class:`WSRemote` of this pool are
        recognized as they are. Messages requesting the ``coap+ws`` or
        ``coaps+ws`` scheme get a pooled remote assigned, waiting for a
        connection to be opened if none is filed yet.

        :returns: ``True`` if the message is handled by this pool, ``False``
            otherwise.
        """
        if isinstance(message.remote, WSRemote) and message.remote._pool() is self:
            return True

        if message.requested_scheme in ("coap+ws", "coaps+ws"):
            key = PoolKey(message.requested_scheme, message.remote.hostinfo)

            if key in self._pool:
                message.remote = self._pool[key]
                # Its receive loop removes a remote from the pool when it
                # ends, so whatever is filed here is expected to be live.
                return True

            if key not in self._outgoing_starting:
                self._connect(key)
            # It's a bit unorthodox to wait for an (at least partially)
            # established connection in fill_or_recognize_remote, but it's
            # not completely off either, and it makes it way easier to
            # not have partially initialized remotes around
            message.remote = await self._outgoing_starting[key]
            return True

        return False

    def send_message(self, message, messageerror_monitor):
        """Send *message* through its remote.

        A response is dropped when its ``no_response`` option has the bit for
        the response's code class set; otherwise that option is cleared
        before sending.
        """
        # Ignoring messageerror_monitor: CoAP over reliable transports has no
        # way of indicating that a particular message was bad, it always shuts
        # down the complete connection

        if message.code.is_response():
            class_bit = 1 << (message.code.class_ - 1)
            no_response = ((message.opt.no_response or 0) & class_bit) != 0
            if no_response:
                return

            message.opt.no_response = None

        message.remote._send_message(message)

    async def shutdown(self):
        """Release all pooled remotes and close all servers, awaiting both."""
        self._in_shutdown = True
        self.log.debug("Shutting down any connections on %r", self)
        self._tokenmanager = None

        client_shutdowns = [
            asyncio.create_task(
                c.release(),
                name="Close connection %s" % c,
            )
            for c in self._pool.values()
        ]

        server_shutdowns = []
        while self._servers:
            s = self._servers.pop()
            # We could do something like
            # >>> for websocket in s.websockets:
            # >>>     del websocket.logger.extra['websocket']
            # to reduce the reference loops
            # (websocket.logger.extra['websocket'] == websocket), but as the
            # tests actually do run a GC collection once and that gets broken
            # up, it's not worth adding fragility here
            s.close()
            server_shutdowns.append(
                asyncio.create_task(s.wait_closed(), name="Close server %s" % s),
            )

        # Placing client shutdowns before server shutdowns to give them a
        # chance to send out Abort messages; the .close() method could be more
        # helpful here by stopping new connections but letting us finish off
        # the old ones
        shutdowns = client_shutdowns + server_shutdowns
        if shutdowns:
            # wait is documented to require a non-empty set
            await asyncio.wait(shutdowns)

    # Incoming message processing

    def _forget_remote(self, remote):
        """Remove *remote* from the pool if it is what is filed under its key."""
        if self._pool.get(remote._poolkey) is remote:
            del self._pool[remote._poolkey]

    async def _run_recv_loop(self, remote):
        """Receive and dispatch messages from *remote* until it is done.

        Whichever way the loop ends, the remote is removed from the pool so
        that later requests do not pick up a connection that is no longer
        being read from.
        """
        try:
            await self._receive_messages(remote)
        finally:
            self._forget_remote(remote)

    async def _receive_messages(self, remote):
        """Send the initial CSM, then process incoming frames of *remote*.

        Returns when the connection is closed, or after aborting it because
        of a text frame, an unparsable message, or a non-signalling message
        arriving before any CSM.
        """
        remote._send_initial_csm()

        while True:
            try:
                received = await remote._connection.recv()
            except websockets.exceptions.ConnectionClosed:
                # This check is purely needed to silence the warning printed
                # from tokenmanager, "Internal shutdown sequence mismatch:
                # error dispatched through tokenmanager after shutdown" -- and
                # is a symptom of https://github.com/chrysn/aiocoap/issues/284
                # and of the odd circumstance that we can't easily cancel the
                # _run_recv_loop tasks (as we should while that issue is
                # unresolved) in the shutdown handler.
                if not self._in_shutdown:
                    # FIXME if deposited somewhere, mark that as stale?
                    self._tokenmanager.dispatch_error(
                        error.RemoteServerShutdown("Peer closed connection"), remote
                    )
                return

            if not isinstance(received, bytes):
                await remote._abort_with_waiting(
                    Message(code=ABORT, payload=b"Text frame received"), close_code=1003
                )
                return

            try:
                msg = _decode_message(received)
            except error.UnparsableMessage:
                await remote._abort_with_waiting(
                    Message(code=ABORT, payload=b"Message parsing error"),
                    close_code=1007,
                )
                return

            msg.remote = remote

            if msg.code.is_signalling():
                try:
                    remote._process_signaling(msg)
                except rfc8323common.CloseConnection as e:
                    self._tokenmanager.dispatch_error(e.args[0], msg.remote)
                    self._forget_remote(remote)
                    await remote._connection.close()
                continue

            if remote._remote_settings is None:
                remote.abort("No CSM received")
                return

            if msg.code.is_response():
                self._tokenmanager.process_response(msg)
                # ignoring the return value; unexpected responses can be the
                # asynchronous result of cancelled observations
            else:
                self._tokenmanager.process_request(msg)