Coverage for aiocoap/transports/ws.py: 86%

209 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-09-30 11:17 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5""" 

6This moduel implements a TokenInterface for `CoAP over WebSockets`_. 

7 

8.. _`CoAP over WebSockets`: https://tools.ietf.org/html/rfc8323#section-4 

9 

10As with CoAP-over-TCP, while the transport distinguishes a connection initiator 

11("WebSocket (and TCP) client") and a receiver ("WebSocket (and TCP) server"), 

12both sides can take both roles in CoAP (ie. as a CoAP server and a CoAP 

13client). As the WebSocket client can not possibly be connected to (even by the 

14same server -- once the connection is closed, it's gone and even a new one 

15likely has a different port), aiocoap does not allow expressing their addresses 

16in URIs (given they wouldn't serve their purpose as URLs and don't provide any 

17stability either). Requests to a CoAP-over-WS client can be made by assigning 

18the remote to an outgoing request. 

19 

20Port choice 

21----------- 

22 

23Unlike the other transports, CoAP-over-WS is specified with a privileged port 

24(port 80) as the default port. This is impractical for aiocoap servers for two 

25reasons: 

26 

27 * Unless explicitly configured, aiocoap is typically run as an unprivileged 

28 user (and has no provisions in place to receive a socket by other means 

29 than opening it). 

30 

31 * Where a CoAP-over-WS proxy is run, there is often a "proper" website 

32 running on the same port on a full HTTP server. That server is usually 

33 capable of forwarding requests, whereas the ``websockets`` module used by 

34 aiocoap is in no position to either serve websites nor to proxy to an 

35 underlying server. 

36 

37The recommended setup is therefore to run a full web server at port 80, and 

38configure it to proxy incoming requests for WebSockets at `/.well-known/coap` 

39to aiocoap's server, which defaults to binding to port 8683. 

40 

41The port choice of outgoing connections, or the interpretation of the 

42protocol's default port (ie. the port implied by ``coap+ws://hostname/``) is of 

43course unaffected by this. 

44 

45.. warning:: 

46 

47 Due to a shortcoming of aiocoap's way of specifying ports to bind 

48 to, if a port is explicitly stated to bind to, CoAP-over-WS will bind to that 

49 port plus 3000 (resulting in the abovementioned 8683 for 5683). If TLS server 

50 keys are given, the TLS server is launched on the next port after the HTTP 

51 server (typically 8684). 

52 

53Using on pyodide_ 

54----------------- 

55 

56When use on pyodide_, 

57instead of using the ``websockets`` module, 

58a simplified client-only back-end is used, 

59which utilizes the browser's WebSocket API. 

60 

61.. _pyodide: https://pyodide.org/ 

62""" 

63 

64from __future__ import annotations 

65 

66from typing import Dict, List 

67from collections import namedtuple 

68import asyncio 

69import functools 

70import http 

71import weakref 

72 

73from aiocoap import Message, interfaces, ABORT, util, error 

74from aiocoap.transports import rfc8323common 

75from ..credentials import CredentialsMap 

76from ..defaults import is_pyodide 

77from ..message import Direction 

78 

79if not is_pyodide: 

80 import websockets.asyncio.connection 

81 import websockets.asyncio.server 

82else: 

83 import aiocoap.util.pyodide_websockets as websockets # type: ignore[no-redef] 

84 

85 

86def _decode_message(data: bytes) -> Message: 

87 codeoffset = 1 

88 tokenoffset = 2 

89 

90 tkl = data[0] 

91 if tkl > 8: 

92 raise error.UnparsableMessage("Overly long token") 

93 code = data[codeoffset] 

94 token = data[tokenoffset : tokenoffset + tkl] 

95 

96 msg = Message(code=code, _token=token) 

97 

98 msg.payload = msg.opt.decode(data[tokenoffset + tkl :]) 

99 msg.direction = Direction.INCOMING 

100 

101 return msg 

102 

103 

104def _serialize(msg: Message) -> bytes: 

105 tkl = len(msg.token) 

106 if tkl > 8: 

107 raise ValueError("Overly long token") 

108 

109 data = [ 

110 bytes( 

111 ( 

112 tkl, 

113 msg.code, 

114 ) 

115 ), 

116 msg.token, 

117 msg.opt.encode(), 

118 ] 

119 if msg.payload: 

120 data += [b"\xff", msg.payload] 

121 

122 return b"".join(data) 

123 

124 

125PoolKey = namedtuple("PoolKey", ("scheme", "hostinfo")) 

126 

127 

128class WSRemote(rfc8323common.RFC8323Remote, interfaces.EndpointAddress): 

129 _connection: websockets.asyncio.connection.Connection 

130 # Only used to ensure that remotes are associated to the right pool -- not 

131 # that there'd be any good reason to have multiple of those. 

132 _pool: weakref.ReferenceType[WSPool] 

133 _poolkey: PoolKey 

134 

135 scheme = None # Override property -- it's per instance here 

136 

137 def __init__( 

138 self, 

139 pool, 

140 connection, 

141 loop, 

142 log, 

143 *, 

144 scheme, 

145 local_hostinfo=None, 

146 remote_hostinfo=None, 

147 ): 

148 super().__init__() 

149 self._pool = weakref.ref(pool) 

150 self._connection = connection 

151 self.loop = loop 

152 self.log = log 

153 

154 self._local_is_server = isinstance( 

155 connection, websockets.asyncio.server.ServerConnection 

156 ) 

157 

158 if local_hostinfo is None: 

159 self._local_hostinfo = self._connection.local_address[:2] 

160 else: 

161 self._local_hostinfo = local_hostinfo 

162 if remote_hostinfo is None: 

163 self._remote_hostinfo = self._connection.remote_address[:2] 

164 else: 

165 self._remote_hostinfo = remote_hostinfo 

166 

167 self.scheme = scheme 

168 

169 # Goes both for client and for server ends; on the server end, it 

170 # ensures that role reversal URIs can be used even when passed as URIs 

171 # and not as remotes (although that's of course only possible locally). 

172 self._poolkey = PoolKey(self.scheme, self.hostinfo) 

173 

174 # Necessary for RFC8323Remote 

175 

176 def _abort_with(self, msg, *, close_code=1002): 

177 # Like _send_message, this may take actual time -- but unlike there, 

178 # there's no need to regulate back-pressure 

179 self.loop.create_task( 

180 self._abort_with_waiting(msg, close_code=close_code), 

181 name="Abortion WebSocket sonnection with %r" % msg, 

182 ) 

183 

184 # Unlike _send_message, this is pulled out of the the _abort_with function 

185 # as it's also used in _run_recv_loop 

186 async def _abort_with_waiting(self, msg, *, close_code): 

187 self.log.debug("Aborting with message: %r", msg) 

188 try: 

189 await self._connection.send(_serialize(msg)) 

190 except Exception as e: 

191 self.log.error("Sending to a WebSocket should not raise errors", exc_info=e) 

192 await self._connection.close(code=close_code) 

193 

194 def _send_message(self, msg): 

195 # FIXME overhaul back-pressure model 

196 async def send(): 

197 self.log.debug("Sending message: %r", msg) 

198 try: 

199 await self._connection.send(_serialize(msg)) 

200 except Exception as e: 

201 self.log.error( 

202 "Sending to a WebSocket should not raise errors", exc_info=e 

203 ) 

204 

205 self.loop.create_task( 

206 send(), 

207 name="WebSocket sending of %r" % msg, 

208 ) 

209 

210 async def release(self): 

211 await super().release() 

212 try: 

213 await self._connection.wait_closed() 

214 except asyncio.CancelledError: 

215 self.log.warning( 

216 "Connection %s was not closed by peer in time after release", self 

217 ) 

218 

219 

220class WSPool(interfaces.TokenInterface): 

221 _outgoing_starting: Dict[PoolKey, asyncio.Task] 

222 _pool: Dict[PoolKey, WSRemote] 

223 

224 _servers: List[websockets.asyncio.server.Server] 

225 

226 def __init__(self, tman, log, loop) -> None: 

227 self.loop = loop 

228 

229 self._pool = {} 

230 self._outgoing_starting = {} 

231 

232 self._servers = [] 

233 

234 # See where it is used for documentation, remove when not needed any more 

235 self._in_shutdown = False 

236 

237 self._tokenmanager = tman 

238 self.log = log 

239 

240 self._client_credentials: CredentialsMap 

241 

242 @classmethod 

243 async def create_transport( 

244 cls, 

245 tman: interfaces.TokenManager, 

246 log, 

247 loop, 

248 *, 

249 client_credentials, 

250 server_bind=None, 

251 server_context=None, 

252 ): 

253 self = cls(tman, log, loop) 

254 

255 self._client_credentials = client_credentials 

256 

257 if server_bind: 

258 host, port = server_bind 

259 if port is None: 

260 port = 8683 

261 elif port != 0: 

262 # FIXME see module documentation 

263 port = port + 3000 

264 

265 server = await websockets.asyncio.server.serve( 

266 functools.partial(self._new_connection, scheme="coap+ws"), 

267 host, 

268 port, 

269 # Ignoring type: Documentation says strings are OK 

270 subprotocols=["coap"], # type: ignore[list-item] 

271 process_request=self._process_request, 

272 ping_interval=None, # "SHOULD NOT be used" 

273 ) 

274 self._servers.append(server) 

275 

276 if server_context is not None: 

277 server = await websockets.asyncio.server.serve( 

278 functools.partial(self._new_connection, scheme="coaps+ws"), 

279 host, 

280 port + 1, 

281 # Ignoring type: Documentation says strings are OK 

282 subprotocols=["coap"], # type: ignore[list-item] 

283 process_request=self._process_request, 

284 ping_interval=None, # "SHOULD NOT be used" 

285 ssl=server_context, 

286 ) 

287 self._servers.append(server) 

288 

289 return self 

290 

291 # Helpers for WebScoket server 

292 

293 async def _new_connection(self, websocket, path=None, *, scheme): 

294 # ignoring path: Already checked in _process_request 

295 # 

296 # (path is present up to 10.0 and absent in 10.1; keeping it around to 

297 # stay compatible with different versions). 

298 

299 hostheader = websocket.request.headers["Host"] 

300 local_hostinfo = util.hostportsplit(hostheader) 

301 

302 remote = WSRemote( 

303 self, 

304 websocket, 

305 self.loop, 

306 self.log, 

307 scheme=scheme, 

308 local_hostinfo=local_hostinfo, 

309 ) 

310 self._pool[remote._poolkey] = remote 

311 

312 await self._run_recv_loop(remote) 

313 

314 @staticmethod 

315 async def _process_request(connection, request): 

316 path = connection.request.path 

317 if path != "/.well-known/coap": 

318 return (http.HTTPStatus.NOT_FOUND, [], b"") 

319 # Continue with WebSockets 

320 return None 

321 

322 # Helpers for WebScoket client 

323 

324 def _connect(self, key: PoolKey): 

325 self._outgoing_starting[key] = self.loop.create_task( 

326 self._connect_task(key), 

327 name="WebSocket connection opening to %r" % (key,), 

328 ) 

329 

330 async def _connect_task(self, key: PoolKey): 

331 try: 

332 ssl_context = self._client_credentials.ssl_client_context( 

333 key.scheme, key.hostinfo 

334 ) 

335 

336 hostinfo_split = util.hostportsplit(key.hostinfo) 

337 

338 ws_scheme = {"coap+ws": "ws", "coaps+ws": "wss"}[key.scheme] 

339 

340 if ws_scheme == "ws" and ssl_context is not None: 

341 raise ValueError( 

342 "An SSL context was provided for a remote accessed via a plaintext websockets." 

343 ) 

344 if ws_scheme == "wss": 

345 if is_pyodide: 

346 if ssl_context is not None: 

347 raise ValueError( 

348 "The pyodide websocket implementation can't be configured with a non-default context -- connections created in a browser will always use the browser's CA set." 

349 ) 

350 else: 

351 if ssl_context is None: 

352 # Like with TLSClient, we need to pass in a default 

353 # context and can't rely on the websocket library to 

354 # use a default one when wss is requested (it doesn't) 

355 import ssl 

356 

357 ssl_context = ssl.create_default_context() 

358 

359 websocket = await websockets.connect( 

360 "%s://%s/.well-known/coap" % (ws_scheme, key.hostinfo), 

361 # Ignoring type: Documentation says strings are OK 

362 subprotocols=["coap"], # type: ignore[list-item] 

363 ping_interval=None, 

364 ssl=ssl_context, 

365 ) 

366 

367 remote = WSRemote( 

368 self, 

369 websocket, 

370 self.loop, 

371 self.log, 

372 scheme=key.scheme, 

373 remote_hostinfo=hostinfo_split, 

374 ) 

375 assert remote._poolkey == key, "Pool key construction is inconsistent" 

376 self._pool[key] = remote 

377 

378 self.loop.create_task( 

379 self._run_recv_loop(remote), 

380 name="WebSocket receive loop for %r" % (key,), 

381 ) 

382 

383 return remote 

384 finally: 

385 del self._outgoing_starting[key] 

386 

387 # Implementation of TokenInterface 

388 

389 async def fill_or_recognize_remote(self, message): 

390 if isinstance(message.remote, WSRemote) and message.remote._pool() is self: 

391 return True 

392 

393 if message.requested_scheme in ("coap+ws", "coaps+ws"): 

394 key = PoolKey(message.requested_scheme, message.remote.hostinfo) 

395 

396 if key in self._pool: 

397 message.remote = self._pool[key] 

398 # It would have handled the ConnectionClosed on recv and 

399 # removed itself if it was not live. 

400 return True 

401 

402 if key not in self._outgoing_starting: 

403 self._connect(key) 

404 # It's a bit unorthodox to wait for an (at least partially) 

405 # established connection in fill_or_recognize_remote, but it's 

406 # not completely off off either, and it makes it way easier to 

407 # not have partially initialized remotes around 

408 message.remote = await self._outgoing_starting[key] 

409 return True 

410 

411 return False 

412 

413 def send_message(self, message, messageerror_monitor): 

414 # Ignoring messageerror_monitor: CoAP over reliable transports has no 

415 # way of indicating that a particular message was bad, it always shuts 

416 # down the complete connection 

417 

418 if message.code.is_response(): 

419 no_response = (message.opt.no_response or 0) & ( 

420 1 << message.code.class_ - 1 

421 ) != 0 

422 if no_response: 

423 return 

424 

425 message.opt.no_response = None 

426 

427 message.remote._send_message(message) 

428 

429 async def shutdown(self): 

430 self._in_shutdown = True 

431 self.log.debug("Shutting down any connections on %r", self) 

432 self._tokenmanager = None 

433 

434 client_shutdowns = [ 

435 asyncio.create_task( 

436 c.release(), 

437 name="Close connection %s" % c, 

438 ) 

439 for c in self._pool.values() 

440 ] 

441 

442 server_shutdowns = [] 

443 while self._servers: 

444 s = self._servers.pop() 

445 # We could do something like 

446 # >>> for websocket in s.websockets: 

447 # >>> del websocket.logger.extra['websocket'] 

448 # to reduce the reference loops 

449 # (websocket.logger.extra['websocket'] == websocket), but as the 

450 # tests actually do run a GC collection once and that gets broken 

451 # up, it's not worth adding fragilty here 

452 s.close() 

453 server_shutdowns.append( 

454 asyncio.create_task(s.wait_closed(), name="Close server %s" % s), 

455 ) 

456 

457 # Placing client shutdowns before server shutdowns to give them a 

458 # chance to send out Abort messages; the .close() method could be more 

459 # helpful here by stopping new connections but letting us finish off 

460 # the old ones 

461 shutdowns = client_shutdowns + server_shutdowns 

462 if shutdowns: 

463 # wait is documented to require a non-empty set 

464 await asyncio.wait(shutdowns) 

465 

466 # Incoming message processing 

467 

468 async def _run_recv_loop(self, remote): 

469 remote._send_initial_csm() 

470 

471 while True: 

472 try: 

473 received = await remote._connection.recv() 

474 except websockets.exceptions.ConnectionClosed: 

475 # This check is purely needed to silence the warning printed 

476 # from tokenmanager, "Internal shutdown sequence msismatch: 

477 # error dispatched through tokenmanager after shutdown" -- and 

478 # is a symptom of https://github.com/chrysn/aiocoap/issues/284 

479 # and of the odd circumstance that we can't easily cancel the 

480 # _run_recv_loop tasks (as we should while that issue is 

481 # unresolved) in the shutdown handler. 

482 if not self._in_shutdown: 

483 # FIXME if deposited somewhere, mark that as stale? 

484 self._tokenmanager.dispatch_error( 

485 error.RemoteServerShutdown("Peer closed connection"), remote 

486 ) 

487 return 

488 

489 if not isinstance(received, bytes): 

490 await remote._abort_with_waiting( 

491 Message(code=ABORT, payload=b"Text frame received"), close_code=1003 

492 ) 

493 return 

494 

495 try: 

496 msg = _decode_message(received) 

497 except error.UnparsableMessage: 

498 await remote._abort_with_waiting( 

499 Message(code=ABORT, payload=b"Message parsing error"), 

500 close_code=1007, 

501 ) 

502 return 

503 

504 msg.remote = remote 

505 

506 if msg.code.is_signalling(): 

507 try: 

508 remote._process_signaling(msg) 

509 except rfc8323common.CloseConnection as e: 

510 self._tokenmanager.dispatch_error(e.args[0], msg.remote) 

511 self._pool.pop(remote._poolkey) 

512 await remote._connection.close() 

513 continue 

514 

515 if remote._remote_settings is None: 

516 remote.abort("No CSM received") 

517 return 

518 

519 if msg.code.is_response(): 

520 self._tokenmanager.process_response(msg) 

521 # ignoring the return value; unexpected responses can be the 

522 # asynchronous result of cancelled observations 

523 else: 

524 self._tokenmanager.process_request(msg)