Coverage for aiocoap/edhoc.py: 85%

288 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-05 18:37 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5"""Internal module containing types used inside EDHOC security contexts""" 

6 

7import abc 

8import enum 

9import io 

10from pathlib import Path 

11import random 

12from typing import Optional, Dict, Literal 

13import os 

14 

15import cbor2 

16import lakers 

17 

18from . import oscore, credentials, error 

19from . import Message 

20from .numbers import POST 

21from .numbers.eaditem import EADLabel 

22 

23 

def load_cbor_or_edn(filename: Path):
    """Load a data item from a file that holds either CBOR or EDN.

    The file is first read as binary CBOR. That result is used only if the
    decoder succeeds *and* no bytes are left over afterwards. Otherwise the
    file is re-read as text, converted from CBOR Diagnostic Notation (EDN)
    to CBOR, and decoded.

    Raises ``credentials.CredentialsLoadError`` if the EDN conversion raises
    a ``ValueError``.
    """
    import cbor_diag
    import cbor2

    with filename.open("rb") as raw:
        try:
            decoded = cbor2.load(raw)
        except cbor2.CBORDecodeError:
            is_pure_cbor = False
        else:
            # Trailing bytes mean it apparently hasn't been CBOR all through.
            is_pure_cbor = not raw.read(1)
        if is_pure_cbor:
            return decoded

    with filename.open() as text_file:
        try:
            as_cbor = cbor_diag.diag2cbor(text_file.read())
        except ValueError:
            raise credentials.CredentialsLoadError(
                "Data loaded from %s was recognized neither as CBOR nor CBOR Diagnostic Notation (EDN)"
                % filename
            )

    # no need to check for completeness: diag2cbor doesn't do diagnostic
    # sequences, AIU that's not even a thing
    return cbor2.loads(as_cbor)

49 

50 

class CoseKeyForEdhoc:
    """A private key in COSE_Key shape, restricted to what EDHOC use needs.

    Only key type 2 (EC) on curve 1 (P-256) with a 32-byte private scalar
    ``d`` is accepted.
    """

    kty: int
    crv: int
    d: bytes

    @classmethod
    def from_file(cls, filename: Path) -> "CoseKeyForEdhoc":
        """Read a key from a CBOR or EDN file.

        Raises ``credentials.CredentialsLoadError`` if any group or world
        permission bit is set on the file.
        """
        mode_bits = filename.stat().st_mode
        if mode_bits & 0o077:
            raise credentials.CredentialsLoadError(
                "Refusing to load private key that is group or world accessible"
            )

        return cls.from_map(load_cbor_or_edn(filename))

    @classmethod
    def from_map(cls, key: dict) -> "CoseKeyForEdhoc":
        """Build a key from an already decoded COSE_Key map.

        Raises ``credentials.CredentialsLoadError`` if the map is not a
        dict, has an unsupported key type or curve, lacks a 32-byte ``d``,
        or contains any label other than 1, -1 and -4.
        """
        if not isinstance(key, dict):
            raise credentials.CredentialsLoadError(
                "Data is not shaped like COSE_KEY (expected top-level dictionary)"
            )
        if 1 not in key:
            raise credentials.CredentialsLoadError(
                "Data is not shaped like COSE_KEY (expected key 1 (kty) in top-level dictionary)"
            )

        key_type = key[1]
        if key_type != 2:
            raise credentials.CredentialsLoadError(
                "Private key type %s is not supported (currently only 2 (EC) is supported)"
                % (key_type,)
            )

        if key.get(-1) != 1:
            raise credentials.CredentialsLoadError(
                "Private key of type EC requires key -1 (crv), currently supported values: 1 (P-256)"
            )

        secret = key.get(-4)
        if not (isinstance(secret, bytes) and len(secret) == 32):
            raise credentials.CredentialsLoadError(
                "Private key of type EC P-256 requires key -4 (d) to be a 32-byte long byte string"
            )

        accepted_labels = (1, -1, -4)
        if not all(label in accepted_labels for label in key):
            raise credentials.CredentialsLoadError(
                "Extraneous data in key, consider allow-listing the item if acceptable"
            )

        instance = cls()
        instance.kty = 2  # EC
        instance.crv = 1  # P-256
        instance.d = key[-4]
        return instance

    def secret_to_map(self) -> dict:
        """Return the private key as a COSE_Key map (kty, crv, d)."""
        as_map = {1: self.kty, -1: self.crv, -4: self.d}
        return as_map

    # Should we deprecate filename, add a generate_in_file method? (It's there
    # because generate originally depended on a file system)
    @classmethod
    def generate(cls, filename: Optional[Path] = None) -> "CoseKeyForEdhoc":
        """Generate a fresh P-256 private key and return it.

        If ``filename`` is given, the key is also written there as CBOR.
        The file is created exclusively with mode 0o600, and is removed
        again if writing fails.
        """

        from cryptography.hazmat.primitives.asymmetric import ec

        fresh = ec.generate_private_key(curve=ec.SECP256R1())

        instance = cls()
        instance.kty = 2  # EC
        instance.crv = 1  # P-256
        instance.d = fresh.private_numbers().private_value.to_bytes(32, "big")

        if filename is None:
            return instance

        # O_EXCL: never overwrite an existing file. O_BINARY is added only
        # where the os module offers it.
        open_flags = (
            os.O_WRONLY | os.O_CREAT | os.O_EXCL | getattr(os, "O_BINARY", 0)
        )
        fd = os.open(filename, open_flags, mode=0o600)
        try:
            with open(fd, "wb") as out:
                cbor2.dump(instance.secret_to_map(), out)
        except Exception:
            filename.unlink()
            raise

        return instance

    def as_ccs(
        self, kid: Optional[bytes], subject: Optional[str]
    ) -> Dict[Literal[14], dict]:
        """Build the KCCS (``{14: ...}``) for the public half of this key.

        ``kid`` and ``subject`` are included only when they are not None.
        """

        from cryptography.hazmat.primitives.asymmetric import ec

        scalar = int.from_bytes(self.d, "big")
        numbers = (
            ec.derive_private_key(scalar, ec.SECP256R1()).public_key().public_numbers()
        )

        # kty: EC2, crv: P-256, x, y
        public_cose_key = {
            1: 2,
            -1: 1,
            -2: numbers.x.to_bytes(32, "big"),
            -3: numbers.y.to_bytes(32, "big"),
        }
        if kid is not None:
            public_cose_key[2] = kid

        # cnf: COSE_Key
        claims: dict = {8: {1: public_cose_key}}
        if subject is not None:
            claims[2] = subject

        # kccs: cnf
        return {14: claims}

164 

165 

class EdhocCredentials(credentials._Objectish):
    """Credential set for EDHOC: suite, method, own key and credential, and
    the credential expected from the peer.

    ``establish_context`` uses these to run the initiator side of an EDHOC
    exchange and returns an :class:`EdhocInitiatorContext`.
    """

    own_key: Optional[CoseKeyForEdhoc]
    suite: int
    method: int
    own_cred: Optional[dict]
    peer_cred: Optional[dict]

    #: Whether the combined flow should be used when using this credential set.
    #:
    #: If unset or None, the decision is left to the library (which at the
    #: time of writing always picks True).
    use_combined_edhoc: Optional[bool]

    def __init__(
        self,
        suite: int,
        method: int,
        own_cred_style: Optional[str] = None,
        peer_cred: Optional[dict] = None,
        own_cred: Optional[dict] = None,
        private_key_file: Optional[str] = None,
        private_key: Optional[dict] = None,
        use_combined_edhoc: Optional[bool] = None,
    ):
        """Validate and store the configuration.

        ``private_key_file`` and ``private_key`` are mutually exclusive.
        Unless ``own_cred`` is ``{"unauthenticated": True}``, either all or
        none of ``own_cred``, ``own_cred_style`` and a private key must be
        given. In the unauthenticated case a fresh key and a by-value
        credential are generated here.

        Raises ``credentials.CredentialsLoadError`` on inconsistent input.
        """
        from . import edhoc

        self.suite = suite
        self.method = method
        self.own_cred = own_cred
        self.peer_cred = peer_cred
        self.use_combined_edhoc = use_combined_edhoc

        if private_key_file is not None and private_key is not None:
            raise credentials.CredentialsLoadError(
                "private_key is mutually exclusive with private_key_file"
            )
        if private_key_file is not None:
            # FIXME: We should carry around a base
            private_key_path = Path(private_key_file)
            # FIXME: We left loading the file to the user, and now we're once more
            # in a position where we guess the file type
            self.own_key = CoseKeyForEdhoc.from_file(private_key_path)
        elif private_key is not None:
            self.own_key = CoseKeyForEdhoc.from_map(private_key)
        else:
            self.own_key = None

        if own_cred_style is None:
            self.own_cred_style = None
        else:
            # Raises ValueError for strings that are not an enum value
            self.own_cred_style = edhoc.OwnCredStyle(own_cred_style)

        if self.own_cred == {"unauthenticated": True}:
            if self.own_key is not None or self.own_cred_style not in (
                None,
                OwnCredStyle.ByValue,
            ):
                raise credentials.CredentialsLoadError(
                    "For local unauthenticated use, no key can be give, and own_cred_style needs to be by-value or absent"
                )
            self.own_key = CoseKeyForEdhoc.generate()
            # FIXME: kid and subj should rather not be sent, but lakers insists on their presence
            self.own_cred = self.own_key.as_ccs(kid=b"?", subject="")
            self.own_cred_style = OwnCredStyle.ByValue
        else:
            if (own_cred is None) != (own_cred_style is None) or (own_cred is None) != (
                self.own_key is None
            ):
                raise credentials.CredentialsLoadError(
                    "If own credentials are given, all of own_cred, own_cred_style and private_key(_path) need to be given"
                )

        # FIXME: This is only used on the client side, and expects that all parts (own and peer) are present
        self._established_context = None

    def find_edhoc_by_id_cred_peer(self, id_cred_peer):
        """Return the canonically encoded peer credential matching
        ``id_cred_peer``, or None.

        A match is either the whole peer credential by value, or ``{4: kid}``
        with the kid taken from the credential's COSE_Key. Only CCS style
        credentials (``{14: ...}``) are recognized.
        """
        if self.peer_cred is None:
            return None
        if 14 not in self.peer_cred:
            # Only recognizing CCS so far
            return None

        if id_cred_peer == cbor2.dumps(self.peer_cred, canonical=True):
            # credential by value
            return cbor2.dumps(self.peer_cred[14], canonical=True)

        # cnf / COSE_Key / kid, should be present in all CCS
        kid = self.peer_cred[14][8][1].get(2)
        if kid is not None and id_cred_peer == cbor2.dumps({4: kid}, canonical=True):
            # credential by kid
            return cbor2.dumps(self.peer_cred[14], canonical=True)

        return None

    def peer_cred_is_unauthenticated(self):
        """Return True if the peer credential is the
        ``{"unauthenticated": True}`` marker."""
        # FIXME: This is rather weird internal API, and rather weird
        # format-wise -- but it will suffice until credentials are rewritten.
        return self.peer_cred is not None and self.peer_cred == {
            "unauthenticated": True
        }

    async def establish_context(
        self,
        wire,
        underlying_address,
        underlying_proxy_scheme,
        underlying_uri_host,
        logger,
    ):
        """Run EDHOC as initiator and return the resulting security context.

        Message 1 is POSTed to ``/.well-known/edhoc`` through ``wire``.
        Unless ``use_combined_edhoc`` is False, the context is returned as
        soon as message 2 has been verified, still carrying message 3 for
        the next request. Otherwise message 3 is sent explicitly and the
        context is completed with the message 4 in the response.

        Raises ``error.BadRequest`` if message 2 carries a critical EAD
        item, and ``credentials.CredentialsMissingError`` if an
        unauthenticated peer does not present a KCCS by value.
        """
        logger.info(
            "No OSCORE context found for EDHOC context %r, initiating one.", self
        )
        # The semantic identifier (an arbitrary string)
        #
        # FIXME: We don't support role reversal yet, but once we
        # register this context to be available for incoming
        # requests, we'll have to pick more carefully
        c_i = bytes([random.randint(0, 23)])
        initiator = lakers.EdhocInitiator()
        message_1 = initiator.prepare_message_1(
            c_i,
            # We could also send this depending on our configuration, ie. not
            # send it if we do expect credentials. But that'll also reveal to
            # an extent that we are ready to do TOFU, so sending it
            # unconditionally is a good first step, also because it allows
            # peers to migrate towards sending by reference as a default.
            [lakers.EADItem(EADLabel.CRED_BY_VALUE, is_critical=False)],
        )

        msg1 = Message(
            code=POST,
            proxy_scheme=underlying_proxy_scheme,
            uri_host=underlying_uri_host,
            uri_path=[".well-known", "edhoc"],
            payload=cbor2.dumps(True) + message_1,
        )
        msg1.remote = underlying_address
        msg2 = await wire.request(msg1).response_raising

        (c_r, id_cred_r, ead_2) = initiator.parse_message_2(msg2.payload)
        if any(e.is_critical() for e in ead_2):
            # Log through the logger passed in: this class does not set a
            # `log` attribute of its own.
            logger.error("Aborting EDHOC: Critical EAD2 present")
            raise error.BadRequest

        assert isinstance(self.own_cred, dict) and list(self.own_cred.keys()) == [14], (
            "So far can only process CCS style own credentials a la {14: ...}, own_cred = %r"
            % self.own_cred
        )
        cred_i = cbor2.dumps(self.own_cred[14], canonical=True)
        key_i = self.own_key.d

        logger.debug("EDHOC responder sent message_2 with ID_CRED_R = %r", id_cred_r)
        if self.peer_cred_is_unauthenticated():
            # Not doing further checks (eg. for trailing bytes) or re-raising: This
            # was already checked by lakers
            parsed = cbor2.loads(id_cred_r)

            if 14 not in parsed:
                raise credentials.CredentialsMissingError(
                    "Peer presented credential-by-reference (or anything else that's not a KCCS) when no credential was pre-agreed"
                )

            # We could also pick the [14] out of the serialized stream and
            # treat it as an opaque item, but cbor2 doesn't really do serialized items.
            cred_r = cbor2.dumps(parsed[14], canonical=True)
        else:
            # We could look into id_cred_r, which is a CBOR encoded
            # byte string, and could start comparing ... but actually
            # EDHOC and Lakers protect us from misbinding attacks (is
            # that what they are called?), so we can just put in our
            # expected credential here
            #
            # FIXME: But looking into it might give us a better error than just
            # "Mac2 verification failed"

            # FIXME add assert on the structure or start doing the
            # generalization that'll fail at startup
            cred_r = cbor2.dumps(self.peer_cred[14], canonical=True)

        initiator.verify_message_2(
            key_i,
            cred_i,
            cred_r,
        )  # odd that we provide that here rather than in the next function

        logger.debug("Message 2 was verified")

        secctx = EdhocInitiatorContext(initiator, c_i, c_r, self.own_cred_style, logger)

        if self.use_combined_edhoc is not False:
            secctx.complete_without_message_4()
            # That's enough: Message 3 can well be sent along with the next
            # message.
            return secctx

        logger.debug("Sending explicit message 3 without optimization")

        # Take message 3 out of the context so that it is not sent again
        # with the first protected request.
        message_3 = secctx._message_3
        assert message_3 is not None
        secctx._message_3 = None

        # A single byte in 0x00..0x17 or 0x20..0x37 is prepended as is;
        # any other C_R is prepended as a CBOR byte string.
        if len(c_r) == 1 and (0 <= c_r[0] < 24 or 0x20 <= c_r[0] < 0x38):
            c_r_encoded = c_r
        else:
            c_r_encoded = cbor2.dumps(c_r)
        msg3 = Message(
            code=POST,
            proxy_scheme=underlying_proxy_scheme,
            uri_host=underlying_uri_host,
            uri_path=[".well-known", "edhoc"],
            payload=c_r_encoded + message_3,
        )
        msg3.remote = underlying_address
        msg4 = await wire.request(msg3).response_raising

        secctx.complete_with_message_4(msg4.payload)

        logger.debug("Received message 4, context is ready")

        return secctx

384 

385 

class _EdhocContextBase(
    oscore.CanProtect, oscore.CanUnprotect, oscore.SecurityContextUtils
):
    """Shared parts of OSCORE security contexts whose key material is
    exported from an EDHOC exchange."""

    def __init__(self, logger):
        self.log = logger

    def post_seqnoincrease(self):
        """Do nothing: the context is not persisted."""

    def protect(self, message, request_id=None, *, kid_context=True):
        """Protect ``message`` as the parent classes do, and prepend a
        pending EDHOC message 3 (setting the EDHOC option) if
        ``message_3_to_include`` returns one."""
        protected, request_id = super().protect(
            message, request_id=request_id, kid_context=kid_context
        )
        pending_message_3 = self.message_3_to_include()
        if pending_message_3 is None:
            return protected, request_id

        protected.opt.edhoc = True
        protected.payload = pending_message_3 + protected.payload
        return protected, request_id

    def _make_ready(self, edhoc_context, c_ours, c_theirs):
        """Derive the OSCORE parameters and keys from ``edhoc_context``.

        The peer's connection identifier becomes the sender ID, our own
        becomes the recipient ID.

        Raises ``RuntimeError`` for an unsupported cipher suite, and
        ``ValueError`` if both connection identifiers are equal.
        """
        # FIXME: both should offer this
        suite_is_known = (
            isinstance(edhoc_context, lakers.EdhocResponder)
            or edhoc_context.selected_cipher_suite() == 2
        )
        if not suite_is_known:
            raise RuntimeError("Unknown suite")
        self.alg_aead = oscore.algorithms["AES-CCM-16-64-128"]
        self.hashfun = oscore.hashfunctions["sha256"]

        # we did check for critical EADs, there was no out-of-band agreement, so 8 it is
        salt_length = 8
        # I figure that one would be agreed out-of-band as well (currently no
        # options to set/change this are known)
        self.id_context = None
        self.recipient_replay_window = oscore.ReplayWindow(32, lambda: None)

        secret = edhoc_context.edhoc_exporter(0, [], self.alg_aead.key_bytes)
        salt = edhoc_context.edhoc_exporter(1, [], salt_length)

        self.sender_id, self.recipient_id = c_theirs, c_ours
        if self.sender_id == self.recipient_id:
            raise ValueError("Bad IDs: identical ones were picked")

        self.derive_keys(salt, secret)

        self.sender_sequence_number = 0
        self.recipient_replay_window.initialize_empty()

        self.log.debug("EDHOC context %r ready for OSCORE operation", self)

    @abc.abstractmethod
    def message_3_to_include(self) -> Optional[bytes]:
        """Return an encoded message_3 to include in an outgoing message,
        or None.

        Implementations may modify self so that a value is only returned
        once."""

444 

445 

class EdhocInitiatorContext(_EdhocContextBase):
    """An OSCORE context that is derived from an EDHOC exchange.

    The EDHOC exchange need not have completed: an initiator can set this
    up once message 2 has been received. Message 3 is prepared at setup
    time and handed out with the first request that is sent through the
    context."""

    # FIXME: Should we rather send it with *every* request that is sent before a message 4 is received implicitly?
    def __init__(self, initiator, c_ours, c_theirs, cred_i_mode, logger):
        super().__init__(logger)

        # Only this line is role specific
        message_3, _unused_prk_out = initiator.prepare_message_3(
            cred_i_mode.as_lakers(), None
        )
        self._message_3 = message_3

        self._incomplete = True
        self._init_details = (initiator, c_ours, c_theirs)

    def _finish(self, initiator, c_ours, c_theirs) -> None:
        """Derive the OSCORE material and mark the context as complete."""
        self._make_ready(initiator, c_ours, c_theirs)
        self._incomplete = False
        self._init_details = None

    def complete_without_message_4(self) -> None:
        """Complete the exchange without waiting for a message 4."""
        initiator, c_ours, c_theirs = self._init_details
        initiator.completed_without_message_4()
        self._finish(initiator, c_ours, c_theirs)

    def complete_with_message_4(self, message_4: bytes) -> None:
        """Complete the exchange by processing the received ``message_4``."""
        initiator, c_ours, c_theirs = self._init_details
        initiator.process_message_4(message_4)
        self._finish(initiator, c_ours, c_theirs)

    def message_3_to_include(self) -> Optional[bytes]:
        """Return the prepared message 3 the first time, None afterwards."""
        pending, self._message_3 = self._message_3, None
        return pending

486 

487 

class EdhocResponderContext(_EdhocContextBase):
    """Responder side OSCORE context derived from an EDHOC exchange.

    The context starts out incomplete. It becomes usable once a message 3
    has been offered to it, either directly or in front of the payload of
    the first protected request."""

    def __init__(self, responder, c_i, c_r, server_credentials, logger):
        super().__init__(logger)

        # storing them where they will later be overwritten with themselves
        self.recipient_id, self.sender_id = c_r, c_i

        self._responder = responder
        # Through these we'll look up id_cred_i
        self._server_credentials = server_credentials

        self.authenticated_claims = []

        # Not sure why mypy even tolerates this -- we're clearly not ready for
        # a general protect/unprotect, and things only work because all
        # relevant functions get their checks introduced
        self._incomplete = True

        self._message_4 = None

    def message_3_to_include(self) -> Optional[bytes]:
        """Always None: as a responder we never send a message 3."""
        return None

    def get_oscore_context_for(self, unprotected):
        """Return self if ``unprotected`` has no KID context and its KID
        equals our recipient ID; None otherwise."""
        if oscore.COSE_KID_CONTEXT in unprotected:
            return None
        kid_matches = unprotected.get(oscore.COSE_KID) == self.recipient_id
        return self if kid_matches else None

    def find_all_used_contextless_oscore_kid(self) -> set[bytes]:
        """Return the set holding just our recipient ID."""
        return {self.recipient_id}

    def protect(self, *args, **kwargs):
        """Protect a message; raises ``RuntimeError`` while message 3 is
        still outstanding."""
        if self._incomplete:
            raise RuntimeError(
                "EDHOC has not completed yet, waiting for message 3, can not protect own messages yet"
            )
        return super().protect(*args, **kwargs)

    def unprotect(self, protected_message, request_id=None):
        """Unprotect a message.

        While the context is incomplete, the message must carry the EDHOC
        option. The leading CBOR item of its payload is then taken as
        message 3, and the rest is unprotected as usual."""
        if not self._incomplete:
            return super().unprotect(protected_message, request_id)

        if not protected_message.opt.edhoc:
            self.log.error(
                "OSCORE failed: No EDHOC message 3 received and none present"
            )
            raise error.BadRequest("EDHOC incomplete")

        stream = io.BytesIO(protected_message.payload)
        # discarding result -- just need to have a point to split
        cbor2.load(stream)
        split_at = stream.tell()

        self._offer_message_3(protected_message.payload[:split_at])

        remainder = protected_message.copy(
            edhoc=False, payload=protected_message.payload[split_at:]
        )
        return super().unprotect(remainder, request_id)

    def _offer_message_3(self, message_3: bytes) -> bytes:
        """Send in a message 3, obtain a message 4

        It is safe to discard the output, eg. when the CoAP EDHOC option is
        used. Once the context is complete, the stored message 4 is returned
        without processing the input again."""
        if not self._incomplete:
            return self._message_4

        id_cred_i, ead_3 = self._responder.parse_message_3(message_3)
        for item in ead_3:
            if item.is_critical():
                self.log.error("Aborting EDHOC: Critical EAD3 present")
                raise error.BadRequest

        try:
            (cred_i, claims) = self._server_credentials.find_edhoc_by_id_cred_peer(
                id_cred_i
            )
        except KeyError:
            self.log.error(
                "Aborting EDHOC: No credentials found for client with id_cred_i=h'%s'",
                id_cred_i.hex(),
            )
            raise error.BadRequest

        self.authenticated_claims.extend(claims)

        self._responder.verify_message_3(cred_i)
        self._message_4 = self._responder.prepare_message_4()

        self._make_ready(self._responder, self.recipient_id, self.sender_id)
        self._incomplete = False

        return self._message_4

582 

583 

class OwnCredStyle(enum.Enum):
    """Guidance for how the own credential should be sent in an EDHOC
    exchange"""

    ByKeyId = "by-key-id"
    ByValue = "by-value"

    def as_lakers(self):
        """Convert the enum into Lakers' representation of the same concept.

        The types may eventually be unified, but so far, Lakers doesn't make
        the distinctions we expect to make yet."""
        if self is OwnCredStyle.ByKeyId:
            # FIXME: Mismatch to be fixed in lakers -- currently the only way
            # it allows sending by reference is by Key ID
            return lakers.CredentialTransfer.ByReference
        if self is OwnCredStyle.ByValue:
            return lakers.CredentialTransfer.ByValue
        raise RuntimeError("enum variant not covered")