Coverage for src/aiocoap/transports/ws.py: 0%

206 statements  

« prev     ^ index     » next       coverage.py v7.7.0, created at 2025-03-20 17:26 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5""" 

6This moduel implements a TokenInterface for `CoAP over WebSockets`_. 

7 

8.. _`CoAP over WebSockets`: https://tools.ietf.org/html/rfc8323#section-4 

9 

10As with CoAP-over-TCP, while the transport distinguishes a connection initiator 

11("WebSocket (and TCP) client") and a receiver ("WebSocket (and TCP) server"), 

12both sides can take both roles in CoAP (ie. as a CoAP server and a CoAP 

13client). As the WebSocket client can not possibly be connected to (even by the 

14same server -- once the connection is closed, it's gone and even a new one 

15likely has a different port), aiocoap does not allow expressing their addresses 

16in URIs (given they wouldn't serve their purpose as URLs and don't provide any 

17stability either). Requests to a CoAP-over-WS client can be made by assigning 

18the remote to an outgoing request. 

19 

20Port choice 

21----------- 

22 

23Unlike the other transports, CoAP-over-WS is specified with a privileged port 

24(port 80) as the default port. This is impractical for aiocoap servers for two 

25reasons: 

26 

27 * Unless explicitly configured, aiocoap is typically run as an unprivileged 

28 user (and has no provisions in place to receive a socket by other means 

29 than opening it). 

30 

31 * Where a CoAP-over-WS proxy is run, there is often a "proper" website 

32 running on the same port on a full HTTP server. That server is usually 

33 capable of forwarding requests, whereas the ``websockets`` module used by 

34 aiocoap is in no position to either serve websites nor to proxy to an 

35 underlying server. 

36 

37The recommended setup is therefore to run a full web server at port 80, and 

38configure it to proxy incoming requests for WebSockets at `/.well-known/coap` 

39to aiocoap's server, which defaults to binding to port 8683. 

40 

41The port choice of outgoing connections, or the interpretation of the 

42protocol's default port (ie. the port implied by ``coap+ws://hostname/``) is of 

43course unaffected by this. 

44 

45.. warning:: 

46 

47 Due to a shortcoming of aiocoap's way of specifying ports to bind 

48 to, if a port is explicitly stated to bind to, CoAP-over-WS will bind to that 

49 port plus 3000 (resulting in the abovementioned 8683 for 5683). If TLS server 

50 keys are given, the TLS server is launched on the next port after the HTTP 

51 server (typically 8684). 

52 

53Using on pyodide_ 

54----------------- 

55 

56When use on pyodide_, 

57instead of using the ``websockets`` module, 

58a simplified client-only back-end is used, 

59which utilizes the browser's WebSocket API. 

60 

61.. _pyodide: https://pyodide.org/ 

62""" 

63 

64from __future__ import annotations 

65 

66from typing import Dict, List 

67from collections import namedtuple 

68import asyncio 

69import functools 

70import http 

71import weakref 

72 

73from aiocoap import Message, interfaces, ABORT, util, error 

74from aiocoap.transports import rfc8323common 

75from ..credentials import CredentialsMap 

76from ..defaults import is_pyodide 

77 

78if not is_pyodide: 

79 import websockets.asyncio.connection 

80 import websockets.asyncio.server 

81else: 

82 import aiocoap.util.pyodide_websockets as websockets # type: ignore[no-redef] 

83 

84 

85def _decode_message(data: bytes) -> Message: 

86 codeoffset = 1 

87 tokenoffset = 2 

88 

89 tkl = data[0] 

90 if tkl > 8: 

91 raise error.UnparsableMessage("Overly long token") 

92 code = data[codeoffset] 

93 token = data[tokenoffset : tokenoffset + tkl] 

94 

95 msg = Message(code=code, token=token) 

96 

97 msg.payload = msg.opt.decode(data[tokenoffset + tkl :]) 

98 

99 return msg 

100 

101 

102def _serialize(msg: Message) -> bytes: 

103 tkl = len(msg.token) 

104 if tkl > 8: 

105 raise ValueError("Overly long token") 

106 

107 data = [ 

108 bytes( 

109 ( 

110 tkl, 

111 msg.code, 

112 ) 

113 ), 

114 msg.token, 

115 msg.opt.encode(), 

116 ] 

117 if msg.payload: 

118 data += [b"\xff", msg.payload] 

119 

120 return b"".join(data) 

121 

122 

123PoolKey = namedtuple("PoolKey", ("scheme", "hostinfo")) 

124 

125 

126class WSRemote(rfc8323common.RFC8323Remote, interfaces.EndpointAddress): 

127 _connection: websockets.asyncio.connection.Connection 

128 # Only used to ensure that remotes are associated to the right pool -- not 

129 # that there'd be any good reason to have multiple of those. 

130 _pool: weakref.ReferenceType[WSPool] 

131 _poolkey: PoolKey 

132 

133 scheme = None # Override property -- it's per instance here 

134 

135 def __init__( 

136 self, 

137 pool, 

138 connection, 

139 loop, 

140 log, 

141 *, 

142 scheme, 

143 local_hostinfo=None, 

144 remote_hostinfo=None, 

145 ): 

146 super().__init__() 

147 self._pool = weakref.ref(pool) 

148 self._connection = connection 

149 self.loop = loop 

150 self.log = log 

151 

152 self._local_is_server = isinstance( 

153 connection, websockets.asyncio.server.ServerConnection 

154 ) 

155 

156 if local_hostinfo is None: 

157 self._local_hostinfo = self._connection.local_address[:2] 

158 else: 

159 self._local_hostinfo = local_hostinfo 

160 if remote_hostinfo is None: 

161 self._remote_hostinfo = self._connection.remote_address[:2] 

162 else: 

163 self._remote_hostinfo = remote_hostinfo 

164 

165 self.scheme = scheme 

166 

167 # Goes both for client and for server ends; on the server end, it 

168 # ensures that role reversal URIs can be used even when passed as URIs 

169 # and not as remotes (although that's of course only possible locally). 

170 self._poolkey = PoolKey(self.scheme, self.hostinfo) 

171 

172 # Necessary for RFC8323Remote 

173 

174 def _abort_with(self, msg, *, close_code=1002): 

175 # Like _send_message, this may take actual time -- but unlike there, 

176 # there's no need to regulate back-pressure 

177 self.loop.create_task( 

178 self._abort_with_waiting(msg, close_code=close_code), 

179 name="Abortion WebSocket sonnection with %r" % msg, 

180 ) 

181 

182 # Unlike _send_message, this is pulled out of the the _abort_with function 

183 # as it's also used in _run_recv_loop 

184 async def _abort_with_waiting(self, msg, *, close_code): 

185 self.log.debug("Aborting with message: %r", msg) 

186 try: 

187 await self._connection.send(_serialize(msg)) 

188 except Exception as e: 

189 self.log.error("Sending to a WebSocket should not raise errors", exc_info=e) 

190 await self._connection.close(code=close_code) 

191 

192 def _send_message(self, msg): 

193 # FIXME overhaul back-pressure model 

194 async def send(): 

195 self.log.debug("Sending message: %r", msg) 

196 try: 

197 await self._connection.send(_serialize(msg)) 

198 except Exception as e: 

199 self.log.error( 

200 "Sending to a WebSocket should not raise errors", exc_info=e 

201 ) 

202 

203 self.loop.create_task( 

204 send(), 

205 name="WebSocket sending of %r" % msg, 

206 ) 

207 

208 async def release(self): 

209 await super().release() 

210 try: 

211 await self._connection.wait_closed() 

212 except asyncio.CancelledError: 

213 self.log.warning( 

214 "Connection %s was not closed by peer in time after release", self 

215 ) 

216 

217 

218class WSPool(interfaces.TokenInterface): 

219 _outgoing_starting: Dict[PoolKey, asyncio.Task] 

220 _pool: Dict[PoolKey, WSRemote] 

221 

222 _servers: List[websockets.asyncio.server.Server] 

223 

224 def __init__(self, tman, log, loop) -> None: 

225 self.loop = loop 

226 

227 self._pool = {} 

228 self._outgoing_starting = {} 

229 

230 self._servers = [] 

231 

232 # See where it is used for documentation, remove when not needed any more 

233 self._in_shutdown = False 

234 

235 self._tokenmanager = tman 

236 self.log = log 

237 

238 self._client_credentials: CredentialsMap 

239 

240 @classmethod 

241 async def create_transport( 

242 cls, 

243 tman: interfaces.TokenManager, 

244 log, 

245 loop, 

246 *, 

247 client_credentials, 

248 server_bind=None, 

249 server_context=None, 

250 ): 

251 self = cls(tman, log, loop) 

252 

253 self._client_credentials = client_credentials 

254 

255 if server_bind: 

256 host, port = server_bind 

257 if port is None: 

258 port = 8683 

259 elif port != 0: 

260 # FIXME see module documentation 

261 port = port + 3000 

262 

263 server = await websockets.asyncio.server.serve( 

264 functools.partial(self._new_connection, scheme="coap+ws"), 

265 host, 

266 port, 

267 # Ignoring type: Documentation says strings are OK 

268 subprotocols=["coap"], # type: ignore[list-item] 

269 process_request=self._process_request, 

270 ping_interval=None, # "SHOULD NOT be used" 

271 ) 

272 self._servers.append(server) 

273 

274 if server_context is not None: 

275 server = await websockets.asyncio.server.serve( 

276 functools.partial(self._new_connection, scheme="coaps+ws"), 

277 host, 

278 port + 1, 

279 # Ignoring type: Documentation says strings are OK 

280 subprotocols=["coap"], # type: ignore[list-item] 

281 process_request=self._process_request, 

282 ping_interval=None, # "SHOULD NOT be used" 

283 ssl=server_context, 

284 ) 

285 self._servers.append(server) 

286 

287 return self 

288 

289 # Helpers for WebScoket server 

290 

291 async def _new_connection(self, websocket, path=None, *, scheme): 

292 # ignoring path: Already checked in _process_request 

293 # 

294 # (path is present up to 10.0 and absent in 10.1; keeping it around to 

295 # stay compatible with different versions). 

296 

297 hostheader = websocket.request.headers["Host"] 

298 local_hostinfo = util.hostportsplit(hostheader) 

299 

300 remote = WSRemote( 

301 self, 

302 websocket, 

303 self.loop, 

304 self.log, 

305 scheme=scheme, 

306 local_hostinfo=local_hostinfo, 

307 ) 

308 self._pool[remote._poolkey] = remote 

309 

310 await self._run_recv_loop(remote) 

311 

312 @staticmethod 

313 async def _process_request(connection, request): 

314 path = connection.request.path 

315 if path != "/.well-known/coap": 

316 return (http.HTTPStatus.NOT_FOUND, [], b"") 

317 # Continue with WebSockets 

318 return None 

319 

320 # Helpers for WebScoket client 

321 

322 def _connect(self, key: PoolKey): 

323 self._outgoing_starting[key] = self.loop.create_task( 

324 self._connect_task(key), 

325 name="WebSocket connection opening to %r" % (key,), 

326 ) 

327 

328 async def _connect_task(self, key: PoolKey): 

329 try: 

330 ssl_context = self._client_credentials.ssl_client_context( 

331 key.scheme, key.hostinfo 

332 ) 

333 

334 hostinfo_split = util.hostportsplit(key.hostinfo) 

335 

336 ws_scheme = {"coap+ws": "ws", "coaps+ws": "wss"}[key.scheme] 

337 

338 if ws_scheme == "ws" and ssl_context is not None: 

339 raise ValueError( 

340 "An SSL context was provided for a remote accessed via a plaintext websockets." 

341 ) 

342 if ws_scheme == "wss": 

343 if is_pyodide: 

344 if ssl_context is not None: 

345 raise ValueError( 

346 "The pyodide websocket implementation can't be configured with a non-default context -- connections created in a browser will always use the browser's CA set." 

347 ) 

348 else: 

349 if ssl_context is None: 

350 # Like with TLSClient, we need to pass in a default 

351 # context and can't rely on the websocket library to 

352 # use a default one when wss is requested (it doesn't) 

353 import ssl 

354 

355 ssl_context = ssl.create_default_context() 

356 

357 websocket = await websockets.connect( 

358 "%s://%s/.well-known/coap" % (ws_scheme, key.hostinfo), 

359 # Ignoring type: Documentation says strings are OK 

360 subprotocols=["coap"], # type: ignore[list-item] 

361 ping_interval=None, 

362 ssl=ssl_context, 

363 ) 

364 

365 remote = WSRemote( 

366 self, 

367 websocket, 

368 self.loop, 

369 self.log, 

370 scheme=key.scheme, 

371 remote_hostinfo=hostinfo_split, 

372 ) 

373 assert remote._poolkey == key, "Pool key construction is inconsistent" 

374 self._pool[key] = remote 

375 

376 self.loop.create_task( 

377 self._run_recv_loop(remote), 

378 name="WebSocket receive loop for %r" % (key,), 

379 ) 

380 

381 return remote 

382 finally: 

383 del self._outgoing_starting[key] 

384 

385 # Implementation of TokenInterface 

386 

387 async def fill_or_recognize_remote(self, message): 

388 if isinstance(message.remote, WSRemote) and message.remote._pool() is self: 

389 return True 

390 

391 if message.requested_scheme in ("coap+ws", "coaps+ws"): 

392 key = PoolKey(message.requested_scheme, message.remote.hostinfo) 

393 

394 if key in self._pool: 

395 message.remote = self._pool[key] 

396 # It would have handled the ConnectionClosed on recv and 

397 # removed itself if it was not live. 

398 return True 

399 

400 if key not in self._outgoing_starting: 

401 self._connect(key) 

402 # It's a bit unorthodox to wait for an (at least partially) 

403 # established connection in fill_or_recognize_remote, but it's 

404 # not completely off off either, and it makes it way easier to 

405 # not have partially initialized remotes around 

406 message.remote = await self._outgoing_starting[key] 

407 return True 

408 

409 return False 

410 

411 def send_message(self, message, messageerror_monitor): 

412 # Ignoring messageerror_monitor: CoAP over reliable transports has no 

413 # way of indicating that a particular message was bad, it always shuts 

414 # down the complete connection 

415 

416 if message.code.is_response(): 

417 no_response = (message.opt.no_response or 0) & ( 

418 1 << message.code.class_ - 1 

419 ) != 0 

420 if no_response: 

421 return 

422 

423 message.opt.no_response = None 

424 

425 message.remote._send_message(message) 

426 

427 async def shutdown(self): 

428 self._in_shutdown = True 

429 self.log.debug("Shutting down any connections on %r", self) 

430 

431 client_shutdowns = [ 

432 asyncio.create_task( 

433 c.release(), 

434 name="Close connection %s" % c, 

435 ) 

436 for c in self._pool.values() 

437 ] 

438 

439 server_shutdowns = [] 

440 while self._servers: 

441 s = self._servers.pop() 

442 # We could do something like 

443 # >>> for websocket in s.websockets: 

444 # >>> del websocket.logger.extra['websocket'] 

445 # to reduce the reference loops 

446 # (websocket.logger.extra['websocket'] == websocket), but as the 

447 # tests actually do run a GC collection once and that gets broken 

448 # up, it's not worth adding fragilty here 

449 s.close() 

450 server_shutdowns.append( 

451 asyncio.create_task(s.wait_closed(), name="Close server %s" % s), 

452 ) 

453 

454 # Placing client shutdowns before server shutdowns to give them a 

455 # chance to send out Abort messages; the .close() method could be more 

456 # helpful here by stopping new connections but letting us finish off 

457 # the old ones 

458 shutdowns = client_shutdowns + server_shutdowns 

459 if shutdowns: 

460 # wait is documented to require a non-empty set 

461 await asyncio.wait(shutdowns) 

462 

463 # Incoming message processing 

464 

465 async def _run_recv_loop(self, remote): 

466 remote._send_initial_csm() 

467 

468 while True: 

469 try: 

470 received = await remote._connection.recv() 

471 except websockets.exceptions.ConnectionClosed: 

472 # This check is purely needed to silence the warning printed 

473 # from tokenmanager, "Internal shutdown sequence msismatch: 

474 # error dispatched through tokenmanager after shutdown" -- and 

475 # is a symptom of https://github.com/chrysn/aiocoap/issues/284 

476 # and of the odd circumstance that we can't easily cancel the 

477 # _run_recv_loop tasks (as we should while that issue is 

478 # unresolved) in the shutdown handler. 

479 if not self._in_shutdown: 

480 # FIXME if deposited somewhere, mark that as stale? 

481 self._tokenmanager.dispatch_error( 

482 error.RemoteServerShutdown("Peer closed connection"), remote 

483 ) 

484 return 

485 

486 if not isinstance(received, bytes): 

487 await remote._abort_with_waiting( 

488 Message(code=ABORT, payload=b"Text frame received"), close_code=1003 

489 ) 

490 return 

491 

492 try: 

493 msg = _decode_message(received) 

494 except error.UnparsableMessage: 

495 await remote._abort_with_waiting( 

496 Message(code=ABORT, payload=b"Message parsing error"), 

497 close_code=1007, 

498 ) 

499 return 

500 

501 msg.remote = remote 

502 

503 if msg.code.is_signalling(): 

504 try: 

505 remote._process_signaling(msg) 

506 except rfc8323common.CloseConnection as e: 

507 self._tokenmanager.dispatch_error(e.args[0], msg.remote) 

508 self._pool.pop(remote._poolkey) 

509 await remote._connection.close() 

510 continue 

511 

512 if remote._remote_settings is None: 

513 remote.abort("No CSM received") 

514 return 

515 

516 if msg.code.is_response(): 

517 self._tokenmanager.process_response(msg) 

518 # ignoring the return value; unexpected responses can be the 

519 # asynchronous result of cancelled observations 

520 else: 

521 self._tokenmanager.process_request(msg)