Coverage for aiocoap/transports/ws.py: 85%

206 statements  

« prev     ^ index     » next       coverage.py v7.6.8, created at 2024-11-28 12:34 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5""" 

6This moduel implements a TokenInterface for `CoAP over WebSockets`_. 

7 

8.. _`CoAP over WebSockets`: https://tools.ietf.org/html/rfc8323#section-4 

9 

10As with CoAP-over-TCP, while the transport distinguishes a connection initiator 

11("WebSocket (and TCP) client") and a receiver ("WebSocket (and TCP) server"), 

12both sides can take both roles in CoAP (ie. as a CoAP server and a CoAP 

13client). As the WebSocket client can not possibly be connected to (even by the 

14same server -- once the connection is closed, it's gone and even a new one 

15likely has a different port), aiocoap does not allow expressing their addresses 

16in URIs (given they wouldn't serve their purpose as URLs and don't provide any 

17stability either). Requests to a CoAP-over-WS client can be made by assigning 

18the remote to an outgoing request. 

19 

20Port choice 

21----------- 

22 

23Unlike the other transports, CoAP-over-WS is specified with a privileged port 

24(port 80) as the default port. This is impractical for aiocoap servers for two 

25reasons: 

26 

27 * Unless explicitly configured, aiocoap is typically run as an unprivileged 

28 user (and has no provisions in place to receive a socket by other means 

29 than opening it). 

30 

31 * Where a CoAP-over-WS proxy is run, there is often a "proper" website 

32 running on the same port on a full HTTP server. That server is usually 

33 capable of forwarding requests, whereas the ``websockets`` module used by 

34 aiocoap is in no position to either serve websites nor to proxy to an 

35 underlying server. 

36 

37The recommended setup is therefore to run a full web server at port 80, and 

38configure it to proxy incoming requests for WebSockets at `/.well-known/coap` 

39to aiocoap's server, which defaults to binding to port 8683. 

40 

41The port choice of outgoing connections, or the interpretation of the 

42protocol's default port (ie. the port implied by ``coap+ws://hostname/``) is of 

43course unaffected by this. 

44 

45.. warning:: 

46 

47 Due to a shortcoming of aiocoap's way of specifying ports to bind 

48 to, if a port is explicitly stated to bind to, CoAP-over-WS will bind to that 

49 port plus 3000 (resulting in the abovementioned 8683 for 5683). If TLS server 

50 keys are given, the TLS server is launched on the next port after the HTTP 

51 server (typically 8684). 

52 

53Using on pyodide_ 

54----------------- 

55 

56When use on pyodide_, 

57instead of using the ``websockets`` module, 

58a simplified client-only back-end is used, 

59which utilizes the browser's WebSocket API. 

60 

61.. _pyodide: https://pyodide.org/ 

62""" 

63 

64from __future__ import annotations 

65 

66from typing import Dict, List 

67from collections import namedtuple 

68import asyncio 

69import functools 

70import http 

71import weakref 

72 

73from aiocoap import Message, interfaces, ABORT, util, error 

74from aiocoap.transports import rfc8323common 

75from ..credentials import CredentialsMap 

76from ..defaults import is_pyodide 

77 

78if is_pyodide: 

79 import aiocoap.util.pyodide_websockets as websockets 

80else: 

81 import websockets # type: ignore 

82 

83 

84def _decode_message(data: bytes) -> Message: 

85 codeoffset = 1 

86 tokenoffset = 2 

87 

88 tkl = data[0] 

89 if tkl > 8: 

90 raise error.UnparsableMessage("Overly long token") 

91 code = data[codeoffset] 

92 token = data[tokenoffset : tokenoffset + tkl] 

93 

94 msg = Message(code=code, token=token) 

95 

96 msg.payload = msg.opt.decode(data[tokenoffset + tkl :]) 

97 

98 return msg 

99 

100 

101def _serialize(msg: Message) -> bytes: 

102 tkl = len(msg.token) 

103 if tkl > 8: 

104 raise ValueError("Overly long token") 

105 

106 data = [ 

107 bytes( 

108 ( 

109 tkl, 

110 msg.code, 

111 ) 

112 ), 

113 msg.token, 

114 msg.opt.encode(), 

115 ] 

116 if msg.payload: 

117 data += [b"\xff", msg.payload] 

118 

119 return b"".join(data) 

120 

121 

122PoolKey = namedtuple("PoolKey", ("scheme", "hostinfo")) 

123 

124 

125class WSRemote(rfc8323common.RFC8323Remote, interfaces.EndpointAddress): 

126 _connection: websockets.WebSocketCommonProtocol 

127 # Only used to ensure that remotes are associated to the right pool -- not 

128 # that there'd be any good reason to have multiple of those. 

129 _pool: weakref.ReferenceType[WSPool] 

130 

131 scheme = None # Override property -- it's per instance here 

132 

133 def __init__( 

134 self, 

135 pool, 

136 connection, 

137 loop, 

138 log, 

139 *, 

140 scheme, 

141 local_hostinfo=None, 

142 remote_hostinfo=None, 

143 ): 

144 super().__init__() 

145 self._pool = weakref.ref(pool) 

146 self._connection = connection 

147 self.loop = loop 

148 self.log = log 

149 

150 self._local_is_server = isinstance( 

151 connection, websockets.WebSocketServerProtocol 

152 ) 

153 

154 if local_hostinfo is None: 

155 self._local_hostinfo = self._connection.local_address[:2] 

156 else: 

157 self._local_hostinfo = local_hostinfo 

158 if remote_hostinfo is None: 

159 self._remote_hostinfo = self._connection.remote_address[:2] 

160 else: 

161 self._remote_hostinfo = remote_hostinfo 

162 

163 self.scheme = scheme 

164 

165 # Goes both for client and for server ends; on the server end, it 

166 # ensures that role reversal URIs can be used even when passed as URIs 

167 # and not as remotes (although that's of course only possible locally). 

168 self._poolkey = PoolKey(self.scheme, self.hostinfo) 

169 

170 # Necessary for RFC8323Remote 

171 

172 def _abort_with(self, msg, *, close_code=1002): 

173 # Like _send_message, this may take actual time -- but unlike there, 

174 # there's no need to regulate back-pressure 

175 self.loop.create_task( 

176 self._abort_with_waiting(msg, close_code=close_code), 

177 name="Abortion WebSocket sonnection with %r" % msg, 

178 ) 

179 

180 # Unlike _send_message, this is pulled out of the the _abort_with function 

181 # as it's also used in _run_recv_loop 

182 async def _abort_with_waiting(self, msg, *, close_code): 

183 self.log.debug("Aborting with message: %r", msg) 

184 try: 

185 await self._connection.send(_serialize(msg)) 

186 except Exception as e: 

187 self.log.error("Sending to a WebSocket should not raise errors", exc_info=e) 

188 await self._connection.close(code=close_code) 

189 

190 def _send_message(self, msg): 

191 # FIXME overhaul back-pressure model 

192 async def send(): 

193 self.log.debug("Sending message: %r", msg) 

194 try: 

195 await self._connection.send(_serialize(msg)) 

196 except Exception as e: 

197 self.log.error( 

198 "Sending to a WebSocket should not raise errors", exc_info=e 

199 ) 

200 

201 self.loop.create_task( 

202 send(), 

203 name="WebSocket sending of %r" % msg, 

204 ) 

205 

206 async def release(self): 

207 await super().release() 

208 try: 

209 await self._connection.wait_closed() 

210 except asyncio.CancelledError: 

211 self.log.warning( 

212 "Connection %s was not closed by peer in time after release", self 

213 ) 

214 

215 

216class WSPool(interfaces.TokenInterface): 

217 _outgoing_starting: Dict[PoolKey, asyncio.Task] 

218 _pool: Dict[PoolKey, WSRemote] 

219 

220 _servers: List[websockets.WebSocketServer] 

221 

222 def __init__(self, tman, log, loop) -> None: 

223 self.loop = loop 

224 

225 self._pool = {} 

226 self._outgoing_starting = {} 

227 

228 self._servers = [] 

229 

230 # See where it is used for documentation, remove when not needed any more 

231 self._in_shutdown = False 

232 

233 self._tokenmanager = tman 

234 self.log = log 

235 

236 self._client_credentials: CredentialsMap 

237 

238 @classmethod 

239 async def create_transport( 

240 cls, 

241 tman: interfaces.TokenManager, 

242 log, 

243 loop, 

244 *, 

245 client_credentials, 

246 server_bind=None, 

247 server_context=None, 

248 ): 

249 self = cls(tman, log, loop) 

250 

251 self._client_credentials = client_credentials 

252 

253 if server_bind: 

254 host, port = server_bind 

255 if port is None: 

256 port = 8683 

257 elif port != 0: 

258 # FIXME see module documentation 

259 port = port + 3000 

260 

261 server = await websockets.serve( 

262 functools.partial(self._new_connection, scheme="coap+ws"), 

263 host, 

264 port, 

265 subprotocols=["coap"], 

266 process_request=self._process_request, 

267 ping_interval=None, # "SHOULD NOT be used" 

268 ) 

269 self._servers.append(server) 

270 

271 if server_context is not None: 

272 server = await websockets.serve( 

273 functools.partial(self._new_connection, scheme="coaps+ws"), 

274 host, 

275 port + 1, 

276 subprotocols=["coap"], 

277 process_request=self._process_request, 

278 ping_interval=None, # "SHOULD NOT be used" 

279 ssl=server_context, 

280 ) 

281 self._servers.append(server) 

282 

283 return self 

284 

285 # Helpers for WebScoket server 

286 

287 async def _new_connection(self, websocket, path=None, *, scheme): 

288 # ignoring path: Already checked in _process_request 

289 # 

290 # (path is present up to 10.0 and absent in 10.1; keeping it around to 

291 # stay compatible with different versions). 

292 

293 hostheader = websocket.request_headers["Host"] 

294 if hostheader.count(":") > 1 and "[" not in hostheader: 

295 # Workaround for websockets version before 

296 # https://github.com/aaugustin/websockets/issues/802 

297 # 

298 # To be removed once a websockets version with this fix can be 

299 # depended on 

300 hostheader = ( 

301 "[" 

302 + hostheader[: hostheader.rfind(":")] 

303 + "]" 

304 + hostheader[hostheader.rfind(":") :] 

305 ) 

306 local_hostinfo = util.hostportsplit(hostheader) 

307 

308 remote = WSRemote( 

309 self, 

310 websocket, 

311 self.loop, 

312 self.log, 

313 scheme=scheme, 

314 local_hostinfo=local_hostinfo, 

315 ) 

316 self._pool[remote._poolkey] = remote 

317 

318 await self._run_recv_loop(remote) 

319 

320 @staticmethod 

321 async def _process_request(path, request_headers): 

322 if path != "/.well-known/coap": 

323 return (http.HTTPStatus.NOT_FOUND, [], b"") 

324 # Continue with WebSockets 

325 return None 

326 

327 # Helpers for WebScoket client 

328 

329 def _connect(self, key: PoolKey): 

330 self._outgoing_starting[key] = self.loop.create_task( 

331 self._connect_task(key), 

332 name="WebSocket connection opening to %r" % (key,), 

333 ) 

334 

335 async def _connect_task(self, key: PoolKey): 

336 try: 

337 ssl_context = self._client_credentials.ssl_client_context( 

338 key.scheme, key.hostinfo 

339 ) 

340 

341 hostinfo_split = util.hostportsplit(key.hostinfo) 

342 

343 ws_scheme = {"coap+ws": "ws", "coaps+ws": "wss"}[key.scheme] 

344 

345 if ws_scheme == "ws" and ssl_context is not None: 

346 raise ValueError( 

347 "An SSL context was provided for a remote accessed via a plaintext websockets." 

348 ) 

349 if ws_scheme == "wss": 

350 if is_pyodide: 

351 if ssl_context is not None: 

352 raise ValueError( 

353 "The pyodide websocket implementation can't be configured with a non-default context -- connections created in a browser will always use the browser's CA set." 

354 ) 

355 else: 

356 if ssl_context is None: 

357 # Like with TLSClient, we need to pass in a default 

358 # context and can't rely on the websocket library to 

359 # use a default one when wss is requested (it doesn't) 

360 import ssl 

361 

362 ssl_context = ssl.create_default_context() 

363 

364 websocket = await websockets.connect( 

365 "%s://%s/.well-known/coap" % (ws_scheme, key.hostinfo), 

366 subprotocols=["coap"], 

367 ping_interval=None, 

368 ssl=ssl_context, 

369 ) 

370 

371 remote = WSRemote( 

372 self, 

373 websocket, 

374 self.loop, 

375 self.log, 

376 scheme=key.scheme, 

377 remote_hostinfo=hostinfo_split, 

378 ) 

379 assert remote._poolkey == key, "Pool key construction is inconsistent" 

380 self._pool[key] = remote 

381 

382 self.loop.create_task( 

383 self._run_recv_loop(remote), 

384 name="WebSocket receive loop for %r" % (key,), 

385 ) 

386 

387 return remote 

388 finally: 

389 del self._outgoing_starting[key] 

390 

391 # Implementation of TokenInterface 

392 

393 async def fill_or_recognize_remote(self, message): 

394 if isinstance(message.remote, WSRemote) and message.remote._pool() is self: 

395 return True 

396 

397 if message.requested_scheme in ("coap+ws", "coaps+ws"): 

398 key = PoolKey(message.requested_scheme, message.remote.hostinfo) 

399 

400 if key in self._pool: 

401 message.remote = self._pool[key] 

402 if message.remote._connection.open: 

403 return True 

404 # else try opening a new one 

405 

406 if key not in self._outgoing_starting: 

407 self._connect(key) 

408 # It's a bit unorthodox to wait for an (at least partially) 

409 # established connection in fill_or_recognize_remote, but it's 

410 # not completely off off either, and it makes it way easier to 

411 # not have partially initialized remotes around 

412 message.remote = await self._outgoing_starting[key] 

413 return True 

414 

415 return False 

416 

417 def send_message(self, message, messageerror_monitor): 

418 # Ignoring messageerror_monitor: CoAP over reliable transports has no 

419 # way of indicating that a particular message was bad, it always shuts 

420 # down the complete connection 

421 

422 if message.code.is_response(): 

423 no_response = (message.opt.no_response or 0) & ( 

424 1 << message.code.class_ - 1 

425 ) != 0 

426 if no_response: 

427 return 

428 

429 message.opt.no_response = None 

430 

431 message.remote._send_message(message) 

432 

433 async def shutdown(self): 

434 self._in_shutdown = True 

435 self.log.debug("Shutting down any connections on %r", self) 

436 

437 client_shutdowns = [ 

438 asyncio.create_task( 

439 c.release(), 

440 name="Close connection %s" % c, 

441 ) 

442 for c in self._pool.values() 

443 ] 

444 

445 server_shutdowns = [] 

446 while self._servers: 

447 s = self._servers.pop() 

448 # We could do something like 

449 # >>> for websocket in s.websockets: 

450 # >>> del websocket.logger.extra['websocket'] 

451 # to reduce the reference loops 

452 # (websocket.logger.extra['websocket'] == websocket), but as the 

453 # tests actually do run a GC collection once and that gets broken 

454 # up, it's not worth adding fragilty here 

455 s.close() 

456 server_shutdowns.append( 

457 asyncio.create_task(s.wait_closed(), name="Close server %s" % s), 

458 ) 

459 

460 # Placing client shutdowns before server shutdowns to give them a 

461 # chance to send out Abort messages; the .close() method could be more 

462 # helpful here by stopping new connections but letting us finish off 

463 # the old ones 

464 shutdowns = client_shutdowns + server_shutdowns 

465 if shutdowns: 

466 # wait is documented to require a non-empty set 

467 await asyncio.wait(shutdowns) 

468 

469 # Incoming message processing 

470 

471 async def _run_recv_loop(self, remote): 

472 remote._send_initial_csm() 

473 

474 while True: 

475 try: 

476 received = await remote._connection.recv() 

477 except websockets.exceptions.ConnectionClosed: 

478 # This check is purely needed to silence the warning printed 

479 # from tokenmanager, "Internal shutdown sequence msismatch: 

480 # error dispatched through tokenmanager after shutdown" -- and 

481 # is a symptom of https://github.com/chrysn/aiocoap/issues/284 

482 # and of the odd circumstance that we can't easily cancel the 

483 # _run_recv_loop tasks (as we should while that issue is 

484 # unresolved) in the shutdown handler. 

485 if not self._in_shutdown: 

486 # FIXME if deposited somewhere, mark that as stale? 

487 self._tokenmanager.dispatch_error( 

488 error.RemoteServerShutdown("Peer closed connection"), remote 

489 ) 

490 return 

491 

492 if not isinstance(received, bytes): 

493 await remote._abort_with_waiting( 

494 Message(code=ABORT, payload=b"Text frame received"), close_code=1003 

495 ) 

496 return 

497 

498 try: 

499 msg = _decode_message(received) 

500 except error.UnparsableMessage: 

501 await remote._abort_with_waiting( 

502 Message(code=ABORT, payload=b"Message parsing error"), 

503 close_code=1007, 

504 ) 

505 return 

506 

507 msg.remote = remote 

508 

509 if msg.code.is_signalling(): 

510 try: 

511 remote._process_signaling(msg) 

512 except rfc8323common.CloseConnection as e: 

513 self._tokenmanager.dispatch_error(e.args[0], msg.remote) 

514 self._pool.pop(remote._poolkey) 

515 await remote._connection.close() 

516 continue 

517 

518 if remote._remote_settings is None: 

519 remote.abort("No CSM received") 

520 return 

521 

522 if msg.code.is_response(): 

523 self._tokenmanager.process_response(msg) 

524 # ignoring the return value; unexpected responses can be the 

525 # asynchronous result of cancelled observations 

526 else: 

527 self._tokenmanager.process_request(msg)