Coverage for aiocoap/edhoc.py: 0%

296 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-18 17:48 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5"""Internal module containing types used inside EDHOC security contexts""" 

6 

7import abc 

8import enum 

9import io 

10from pathlib import Path 

11import random 

12from typing import Optional, Dict, Literal 

13import os 

14 

15import cbor2 

16import lakers 

17 

18from . import oscore, credentials, error 

19from . import Message 

20from .numbers import POST 

21 

22 

23def load_cbor_or_edn(filename: Path): 

24 """Common heuristic for whether something is CBOR or EDN""" 

25 import cbor_diag 

26 import cbor2 

27 

28 with filename.open("rb") as binary: 

29 try: 

30 result = cbor2.load(binary) 

31 except cbor2.CBORDecodeError: 

32 pass 

33 else: 

34 if binary.read(1) == b"": 

35 return result 

36 # else it apparently hasn't been CBOR all through... 

37 with filename.open() as textual: 

38 try: 

39 converted = cbor_diag.diag2cbor(textual.read()) 

40 except ValueError: 

41 raise credentials.CredentialsLoadError( 

42 "Data loaded from %s was recognized neither as CBOR nor CBOR Diagnostic Notation (EDN)" 

43 % filename 

44 ) 

45 # no need to check for completeness: diag2cbor doesn't do diagnostic 

46 # sequences, AIU that's not even a thing 

47 return cbor2.loads(converted) 

48 

49 

50class CoseKeyForEdhoc: 

51 kty: int 

52 crv: int 

53 d: bytes 

54 

55 @classmethod 

56 def from_file(cls, filename: Path) -> "CoseKeyForEdhoc": 

57 """Load a key from a file (in CBOR or EDN), asserting that the file is not group/world readable""" 

58 if filename.stat().st_mode & 0o077 != 0: 

59 raise credentials.CredentialsLoadError( 

60 "Refusing to load private key that is group or world accessible" 

61 ) 

62 

63 loaded = load_cbor_or_edn(filename) 

64 return cls.from_map(loaded) 

65 

66 @classmethod 

67 def from_map(cls, key: dict) -> "CoseKeyForEdhoc": 

68 if not isinstance(key, dict): 

69 raise credentials.CredentialsLoadError( 

70 "Data is not shaped like COSE_KEY (expected top-level dictionary)" 

71 ) 

72 if 1 not in key: 

73 raise credentials.CredentialsLoadError( 

74 "Data is not shaped like COSE_KEY (expected key 1 (kty) in top-level dictionary)" 

75 ) 

76 if key[1] != 2: 

77 raise credentials.CredentialsLoadError( 

78 "Private key type %s is not supported (currently only 2 (EC) is supported)" 

79 % (key[1],) 

80 ) 

81 

82 if key.get(-1) != 1: 

83 raise credentials.CredentialsLoadError( 

84 "Private key of type EC requires key -1 (crv), currently supported values: 1 (P-256)" 

85 ) 

86 

87 if not isinstance(key.get(-4), bytes) or len(key[-4]) != 32: 

88 raise credentials.CredentialsLoadError( 

89 "Private key of type EC P-256 requires key -4 (d) to be a 32-byte long byte string" 

90 ) 

91 

92 if any(k not in (1, -1, -4) for k in key): 

93 raise credentials.CredentialsLoadError( 

94 "Extraneous data in key, consider allow-listing the item if acceptable" 

95 ) 

96 

97 s = cls() 

98 s.kty = 2 # EC 

99 s.crv = 1 # P-256 

100 s.d = key[-4] 

101 

102 return s 

103 

104 def secret_to_map(self) -> dict: 

105 # kty: EC, crv: P-256, d: ... 

106 return {1: self.kty, -1: self.crv, -4: self.d} 

107 

108 # Should we deprecate filename, add a generate_in_file method? (It's there 

109 # because generate originally depended on a file system) 

110 @classmethod 

111 def generate(cls, filename: Optional[Path] = None) -> "CoseKeyForEdhoc": 

112 """Generate a key inside a file 

113 

114 This returns the generated private key. 

115 """ 

116 

117 from cryptography.hazmat.primitives.asymmetric import ec 

118 

119 key = ec.generate_private_key(curve=ec.SECP256R1()) 

120 

121 s = cls() 

122 s.kty = 2 # EC 

123 s.crv = 1 # P-256 

124 s.d = key.private_numbers().private_value.to_bytes(32, "big") 

125 

126 if filename is not None: 

127 flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL 

128 if hasattr(os, "O_BINARY"): 

129 flags |= os.O_BINARY 

130 descriptor = os.open(filename, flags, mode=0o600) 

131 try: 

132 with open(descriptor, "wb") as keyfile: 

133 cbor2.dump(s.secret_to_map(), keyfile) 

134 except Exception: 

135 filename.unlink() 

136 raise 

137 

138 return s 

139 

140 def as_ccs( 

141 self, kid: Optional[bytes], subject: Optional[str] 

142 ) -> Dict[Literal[14], dict]: 

143 """Given a key, generate a corresponding KCCS""" 

144 

145 from cryptography.hazmat.primitives.asymmetric import ec 

146 

147 private = ec.derive_private_key(int.from_bytes(self.d, "big"), ec.SECP256R1()) 

148 public = private.public_key() 

149 

150 x = public.public_numbers().x.to_bytes(32, "big") 

151 y = public.public_numbers().y.to_bytes(32, "big") 

152 # kty: EC2, crv: P-256, x, y 

153 cosekey = {1: 2, -1: 1, -2: x, -3: y} 

154 if kid is not None: 

155 cosekey[2] = kid 

156 # cnf: COSE_Key 

157 credential_kccs: dict = {8: {1: cosekey}} 

158 if subject is not None: 

159 credential_kccs[2] = subject 

160 

161 # kccs: cnf 

162 return {14: credential_kccs} 

163 

164 

165class EdhocCredentials(credentials._Objectish): 

166 own_key: Optional[CoseKeyForEdhoc] 

167 suite: int 

168 method: int 

169 own_cred: Optional[dict] 

170 peer_cred: Optional[dict] 

171 

172 #: Whether the combined flow should be used when using this credential set. 

173 #: 

174 #: If unset or None, this the decision is left to the library (which at the 

175 #: time of writing always picks True). 

176 use_combined_edhoc: Optional[bool] 

177 

178 def __init__( 

179 self, 

180 suite: int, 

181 method: int, 

182 own_cred_style: Optional[str] = None, 

183 peer_cred: Optional[dict] = None, 

184 own_cred: Optional[dict] = None, 

185 private_key_file: Optional[str] = None, 

186 private_key: Optional[dict] = None, 

187 use_combined_edhoc: Optional[bool] = None, 

188 ): 

189 from . import edhoc 

190 

191 self.suite = suite 

192 self.method = method 

193 self.own_cred = own_cred 

194 self.peer_cred = peer_cred 

195 self.use_combined_edhoc = use_combined_edhoc 

196 

197 if private_key_file is not None and private_key is not None: 

198 raise credentials.CredentialsLoadError( 

199 "private_key is mutually exclusive with private_key_file" 

200 ) 

201 if private_key_file is not None: 

202 # FIXME: We should carry around a base 

203 private_key_path = Path(private_key_file) 

204 # FIXME: We left loading the file to the user, and now we're once more 

205 # in a position where we guess the file type 

206 self.own_key = CoseKeyForEdhoc.from_file(private_key_path) 

207 elif private_key is not None: 

208 self.own_key = CoseKeyForEdhoc.from_map(private_key) 

209 else: 

210 self.own_key = None 

211 

212 if own_cred_style is None: 

213 self.own_cred_style = None 

214 else: 

215 self.own_cred_style = edhoc.OwnCredStyle(own_cred_style) 

216 

217 if self.own_cred == {"unauthenticated": True}: 

218 if self.own_key is not None or self.own_cred_style not in ( 

219 None, 

220 OwnCredStyle.ByValue, 

221 ): 

222 raise credentials.CredentialsLoadError( 

223 "For local unauthenticated use, no key can be give, and own_cred_style needs to be by-value or absent" 

224 ) 

225 self.own_key = CoseKeyForEdhoc.generate() 

226 # FIXME: kid and subj should rather not be sent, but lakers insists on their presence 

227 self.own_cred = self.own_key.as_ccs(kid=b"?", subject="") 

228 self.own_cred_style = OwnCredStyle.ByValue 

229 else: 

230 if (own_cred is None) != (own_cred_style is None) or (own_cred is None) != ( 

231 self.own_key is None 

232 ): 

233 raise credentials.CredentialsLoadError( 

234 "If own credentials are given, all of own_cred, own_cred_style and private_key(_path) need to be given" 

235 ) 

236 

237 # FIXME: This is only used on the client side, and expects that all parts (own and peer) are present 

238 self._established_context = None 

239 

240 def find_edhoc_by_id_cred_peer(self, id_cred_peer): 

241 if self.peer_cred is None: 

242 return None 

243 if 14 not in self.peer_cred: 

244 # Only recognizing CCS so far 

245 return None 

246 

247 if id_cred_peer == cbor2.dumps(self.peer_cred, canonical=True): 

248 # credential by value 

249 return cbor2.dumps(self.peer_cred[14], canonical=True) 

250 

251 # cnf / COS_Key / kid, should be present in all CCS 

252 kid = self.peer_cred[14][8][1].get(2) 

253 if kid is not None and id_cred_peer == cbor2.dumps({4: kid}, canonical=True): 

254 # credential by kid 

255 return cbor2.dumps(self.peer_cred[14], canonical=True) 

256 

257 def peer_cred_is_unauthenticated(self): 

258 # FIXME: This is rather weird internal API, and rather weird 

259 # format-wise -- but it will suffice until credentials are rewritten. 

260 return self.peer_cred is not None and self.peer_cred == { 

261 "unauthenticated": True 

262 } 

263 

264 async def establish_context( 

265 self, 

266 wire, 

267 underlying_address, 

268 underlying_proxy_scheme, 

269 underlying_uri_host, 

270 logger, 

271 ): 

272 logger.info( 

273 "No OSCORE context found for EDHOC context %r, initiating one.", self 

274 ) 

275 # The semantic identifier (an arbitrary string) 

276 # 

277 # FIXME: We don't support role reversal yet, but once we 

278 # register this context to be available for incoming 

279 # requests, we'll have to pick more carefully 

280 c_i = bytes([random.randint(0, 23)]) 

281 initiator = lakers.EdhocInitiator() 

282 message_1 = initiator.prepare_message_1(c_i) 

283 

284 msg1 = Message( 

285 code=POST, 

286 proxy_scheme=underlying_proxy_scheme, 

287 uri_host=underlying_uri_host, 

288 uri_path=[".well-known", "edhoc"], 

289 payload=cbor2.dumps(True) + message_1, 

290 ) 

291 msg1.remote = underlying_address 

292 msg2 = await wire.request(msg1).response_raising 

293 

294 (c_r, id_cred_r, ead_2) = initiator.parse_message_2(msg2.payload) 

295 if any(e.is_critical() for e in ead_2): 

296 self.log.error("Aborting EDHOC: Critical EAD2 present") 

297 raise error.BadRequest 

298 

299 assert isinstance(self.own_cred, dict) and list(self.own_cred.keys()) == [14], ( 

300 "So far can only process CCS style own credentials a la {14: ...}, own_cred = %r" 

301 % self.own_cred 

302 ) 

303 cred_i = cbor2.dumps(self.own_cred[14], canonical=True) 

304 key_i = self.own_key.d 

305 

306 logger.debug("EDHOC responder sent message_2 with ID_CRED_R = %r", id_cred_r) 

307 if self.peer_cred_is_unauthenticated(): 

308 # Not doing further checks (eg. for trailing bytes) or re-raising: This 

309 # was already checked by lakers 

310 parsed = cbor2.loads(id_cred_r) 

311 

312 if 14 not in parsed: 

313 raise credentials.CredentialsMissingError( 

314 "Peer presented credential-by-reference (or anything else that's not a KCCS) when no credential was pre-agreed" 

315 ) 

316 

317 # We could also pick the [14] out of the serialized stream and 

318 # treat it as an opaque item, but cbor2 doesn't really do serialized items. 

319 cred_r = cbor2.dumps(parsed[14], canonical=True) 

320 else: 

321 # We could look into id_cred_r, which is a CBOR encoded 

322 # byte string, and could start comparing ... but actually 

323 # EDHOC and Lakers protect us from misbinding attacks (is 

324 # that what they are called?), so we can just put in our 

325 # expected credential here 

326 # 

327 # FIXME: But looking into it might give us a better error than just 

328 # "Mac2 verification failed" 

329 

330 # FIXME add assert on the structure or start doing the 

331 # generalization that'll fail at startup 

332 cred_r = cbor2.dumps(self.peer_cred[14], canonical=True) 

333 

334 initiator.verify_message_2( 

335 key_i, 

336 cred_i, 

337 cred_r, 

338 ) # odd that we provide that here rather than in the next function 

339 

340 logger.debug("Message 2 was verified") 

341 

342 secctx = EdhocInitiatorContext(initiator, c_i, c_r, self.own_cred_style, logger) 

343 

344 if self.use_combined_edhoc is not False: 

345 secctx.complete_without_message_4() 

346 # That's enough: Message 3 can well be sent along with the next 

347 # message. 

348 return secctx 

349 

350 logger.debug("Sending explicit message 3 without optimization") 

351 

352 message_3 = secctx._message_3 

353 assert message_3 is not None 

354 secctx._message_3 = None 

355 

356 if len(c_r) == 1 and (0 <= c_r[0] < 24 or 0x20 <= c_r[0] < 0x38): 

357 c_r_encoded = c_r 

358 else: 

359 c_r_encoded = cbor2.dumps(c_r) 

360 msg3 = Message( 

361 code=POST, 

362 proxy_scheme=underlying_proxy_scheme, 

363 uri_host=underlying_uri_host, 

364 uri_path=[".well-known", "edhoc"], 

365 payload=c_r_encoded + message_3, 

366 ) 

367 msg3.remote = underlying_address 

368 msg4 = await wire.request(msg3).response_raising 

369 

370 secctx.complete_with_message_4(msg4.payload) 

371 

372 logger.debug("Received message 4, context is ready") 

373 

374 return secctx 

375 

376 

377class _EdhocContextBase( 

378 oscore.CanProtect, oscore.CanUnprotect, oscore.SecurityContextUtils 

379): 

380 def __init__(self, logger): 

381 self.log = logger 

382 

383 def post_seqnoincrease(self): 

384 # The context is not persisted 

385 pass 

386 

387 def protect(self, message, request_id=None, *, kid_context=True): 

388 outer_message, request_id = super().protect( 

389 message, request_id=request_id, kid_context=kid_context 

390 ) 

391 message_3 = self.message_3_to_include() 

392 if message_3 is not None: 

393 outer_message.opt.edhoc = True 

394 outer_message.payload = message_3 + outer_message.payload 

395 return outer_message, request_id 

396 

397 def _make_ready(self, edhoc_context, c_ours, c_theirs): 

398 # FIXME: both should offer this 

399 if ( 

400 isinstance(edhoc_context, lakers.EdhocResponder) 

401 or edhoc_context.selected_cipher_suite() == 2 

402 ): 

403 self.alg_aead = oscore.algorithms["AES-CCM-16-64-128"] 

404 self.hashfun = oscore.hashfunctions["sha256"] 

405 else: 

406 raise RuntimeError("Unknown suite") 

407 

408 # we did check for critical EADs, there was no out-of-band agreement, so 8 it is 

409 oscore_salt_length = 8 

410 # I figure that one would be ageed out-of-band as well (currently no 

411 # options to set/change this are known) 

412 self.id_context = None 

413 self.recipient_replay_window = oscore.ReplayWindow(32, lambda: None) 

414 

415 master_secret = edhoc_context.edhoc_exporter(0, [], self.alg_aead.key_bytes) 

416 master_salt = edhoc_context.edhoc_exporter(1, [], oscore_salt_length) 

417 

418 self.sender_id = c_theirs 

419 self.recipient_id = c_ours 

420 if self.sender_id == self.recipient_id: 

421 raise ValueError("Bad IDs: identical ones were picked") 

422 

423 self.derive_keys(master_salt, master_secret) 

424 

425 self.sender_sequence_number = 0 

426 self.recipient_replay_window.initialize_empty() 

427 

428 self.log.debug("EDHOC context %r ready for OSCORE operation", self) 

429 

430 @abc.abstractmethod 

431 def message_3_to_include(self) -> Optional[bytes]: 

432 """An encoded message_3 to include in outgoing messages 

433 

434 This may modify self to only return something once.""" 

435 

436 

437class EdhocInitiatorContext(_EdhocContextBase): 

438 """An OSCORE context that is derived from an EDHOC exchange. 

439 

440 It does not require that the EDHOC exchange has completed -- it can be set 

441 up by an initiator already when message 2 has been received, prepares a 

442 message 3 at setup time, and sends it with the first request that is sent 

443 through it.""" 

444 

445 # FIXME: Should we rather send it with *every* request that is sent before a message 4 is received implicitly? 

446 def __init__(self, initiator, c_ours, c_theirs, cred_i_mode, logger): 

447 super().__init__(logger) 

448 

449 # Only this line is role specific 

450 self._message_3, _i_prk_out = initiator.prepare_message_3( 

451 cred_i_mode.as_lakers(), None 

452 ) 

453 

454 self._incomplete = True 

455 self._init_details = (initiator, c_ours, c_theirs) 

456 

457 def complete_without_message_4(self) -> None: 

458 (initiator, c_ours, c_theirs) = self._init_details 

459 initiator.completed_without_message_4() 

460 self._make_ready(initiator, c_ours, c_theirs) 

461 self._incomplete = False 

462 self._init_details = None 

463 

464 def complete_with_message_4(self, message_4: bytes) -> None: 

465 (initiator, c_ours, c_theirs) = self._init_details 

466 initiator.process_message_4(message_4) 

467 self._make_ready(initiator, c_ours, c_theirs) 

468 self._incomplete = False 

469 self._init_details = None 

470 

471 def message_3_to_include(self) -> Optional[bytes]: 

472 if self._message_3 is not None: 

473 result = self._message_3 

474 self._message_3 = None 

475 return result 

476 return None 

477 

478 

479class EdhocResponderContext(_EdhocContextBase): 

480 def __init__(self, responder, c_i, c_r, server_credentials, logger): 

481 super().__init__(logger) 

482 

483 # storing them where they will later be overwritten with themselves 

484 self.recipient_id = c_r 

485 self.sender_id = c_i 

486 

487 self._responder = responder 

488 # Through these we'll look up id_cred_i 

489 self._server_credentials = server_credentials 

490 

491 self.authenticated_claims = [] 

492 

493 # Not sure why mypy even tolerates this -- we're clearly not ready for 

494 # a general protect/unprotect, and things only work because all 

495 # relevant functions get their checks introduced 

496 self._incomplete = True 

497 

498 self._message_4 = None 

499 

500 def message_3_to_include(self) -> Optional[bytes]: 

501 # as a responder we never send one 

502 return None 

503 

504 def get_oscore_context_for(self, unprotected): 

505 if oscore.COSE_KID_CONTEXT in unprotected: 

506 return None 

507 if unprotected.get(oscore.COSE_KID) == self.recipient_id: 

508 return self 

509 

510 def find_all_used_contextless_oscore_kid(self) -> set[bytes]: 

511 return set((self.recipient_id,)) 

512 

513 def protect(self, *args, **kwargs): 

514 if self._incomplete: 

515 raise RuntimeError( 

516 "EDHOC has not completed yet, waiting for message 3, can not protect own messages yet" 

517 ) 

518 return super().protect(*args, **kwargs) 

519 

520 def unprotect(self, protected_message, request_id=None): 

521 if self._incomplete: 

522 if not protected_message.opt.edhoc: 

523 self.log.error( 

524 "OSCORE failed: No EDHOC message 3 received and none present" 

525 ) 

526 raise error.BadRequest("EDHOC incomplete") 

527 

528 payload_stream = io.BytesIO(protected_message.payload) 

529 # discarding result -- just need to have a point to split 

530 _ = cbor2.load(payload_stream) 

531 m3len = payload_stream.tell() 

532 message_3 = protected_message.payload[:m3len] 

533 

534 self._offer_message_3(message_3) 

535 

536 protected_message = protected_message.copy( 

537 edhoc=False, payload=protected_message.payload[m3len:] 

538 ) 

539 

540 return super().unprotect(protected_message, request_id) 

541 

542 def _offer_message_3(self, message_3: bytes) -> bytes: 

543 """Send in a message 3, obtain a message 4 

544 

545 It is safe to discard the output, eg. when the CoAP EDHOC option is 

546 used.""" 

547 if self._incomplete: 

548 id_cred_i, ead_3 = self._responder.parse_message_3(message_3) 

549 if any(e.is_critical() for e in ead_3): 

550 self.log.error("Aborting EDHOC: Critical EAD3 present") 

551 raise error.BadRequest 

552 

553 try: 

554 (cred_i, claims) = self._server_credentials.find_edhoc_by_id_cred_peer( 

555 id_cred_i 

556 ) 

557 except KeyError: 

558 self.log.error( 

559 "Aborting EDHOC: No credentials found for client with id_cred_i=h'%s'", 

560 id_cred_i.hex(), 

561 ) 

562 raise error.BadRequest 

563 

564 self.authenticated_claims.extend(claims) 

565 

566 self._responder.verify_message_3(cred_i) 

567 self._message_4 = self._responder.prepare_message_4() 

568 

569 self._make_ready(self._responder, self.recipient_id, self.sender_id) 

570 self._incomplete = False 

571 

572 return self._message_4 

573 

574 

575class OwnCredStyle(enum.Enum): 

576 """Guidance for how the own credential should be sent in an EDHOC 

577 exchange""" 

578 

579 ByKeyId = "by-key-id" 

580 ByValue = "by-value" 

581 

582 def as_lakers(self): 

583 """Convert the enum into Lakers' reepresentation of the same concept. 

584 

585 The types may eventually be unified, but so far, Lakers doesn't make 

586 the distinctions we expect to make yet.""" 

587 if self == self.ByKeyId: 

588 # FIXME: Mismatch to be fixed in lakers -- currently the only way 

589 # it allows sending by reference is by Key ID 

590 return lakers.CredentialTransfer.ByReference 

591 if self == self.ByValue: 

592 return lakers.CredentialTransfer.ByValue 

593 else: 

594 raise RuntimeError("enum variant not covered")