Coverage for src/aiocoap/edhoc.py: 0%

293 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-02-12 11:18 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5"""Internal module containing types used inside EDHOC security contexts""" 

6 

7import abc 

8import enum 

9import io 

10from pathlib import Path 

11import random 

12from typing import Optional, Dict, Literal 

13import os 

14 

15import cbor2 

16import lakers 

17 

18from . import oscore, credentials, error 

19from . import Message 

20from .numbers import POST 

21 

22 

23def load_cbor_or_edn(filename: Path): 

24 """Common heuristic for whether something is CBOR or EDN""" 

25 import cbor_diag 

26 import cbor2 

27 

28 with filename.open("rb") as binary: 

29 try: 

30 result = cbor2.load(binary) 

31 except cbor2.CBORDecodeError: 

32 pass 

33 else: 

34 if binary.read(1) == b"": 

35 return result 

36 # else it apparently hasn't been CBOR all through... 

37 with filename.open() as textual: 

38 try: 

39 converted = cbor_diag.diag2cbor(textual.read()) 

40 except ValueError: 

41 raise credentials.CredentialsLoadError( 

42 "Data loaded from %s was recognized neither as CBOR nor CBOR Diagnostic Notation (EDN)" 

43 % filename 

44 ) 

45 # no need to check for completeness: diag2cbor doesn't do diagnostic 

46 # sequences, AIU that's not even a thing 

47 return cbor2.loads(converted) 

48 

49 

50class CoseKeyForEdhoc: 

51 kty: int 

52 crv: int 

53 d: bytes 

54 

55 @classmethod 

56 def from_file(cls, filename: Path) -> "CoseKeyForEdhoc": 

57 """Load a key from a file (in CBOR or EDN), asserting that the file is not group/world readable""" 

58 if filename.stat().st_mode & 0o077 != 0: 

59 raise credentials.CredentialsLoadError( 

60 "Refusing to load private key that is group or world accessible" 

61 ) 

62 

63 loaded = load_cbor_or_edn(filename) 

64 return cls.from_map(loaded) 

65 

66 @classmethod 

67 def from_map(cls, key: dict) -> "CoseKeyForEdhoc": 

68 if not isinstance(key, dict): 

69 raise credentials.CredentialsLoadError( 

70 "Data is not shaped like COSE_KEY (expected top-level dictionary)" 

71 ) 

72 if 1 not in key: 

73 raise credentials.CredentialsLoadError( 

74 "Data is not shaped like COSE_KEY (expected key 1 (kty) in top-level dictionary)" 

75 ) 

76 if key[1] != 2: 

77 raise credentials.CredentialsLoadError( 

78 "Private key type %s is not supported (currently only 2 (EC) is supported)" 

79 % (key[1],) 

80 ) 

81 

82 if key.get(-1) != 1: 

83 raise credentials.CredentialsLoadError( 

84 "Private key of type EC requires key -1 (crv), currently supported values: 1 (P-256)" 

85 ) 

86 

87 if not isinstance(key.get(-4), bytes) or len(key[-4]) != 32: 

88 raise credentials.CredentialsLoadError( 

89 "Private key of type EC P-256 requires key -4 (d) to be a 32-byte long byte string" 

90 ) 

91 

92 if any(k not in (1, -1, -4) for k in key): 

93 raise credentials.CredentialsLoadError( 

94 "Extraneous data in key, consider allow-listing the item if acceptable" 

95 ) 

96 

97 s = cls() 

98 s.kty = 2 # EC 

99 s.crv = 1 # P-256 

100 s.d = key[-4] 

101 

102 return s 

103 

104 def secret_to_map(self) -> dict: 

105 # kty: EC, crv: P-256, d: ... 

106 return {1: self.kty, -1: self.crv, -4: self.d} 

107 

108 # Should we deprecate filename, add a generate_in_file method? (It's there 

109 # because generate originally depended on a file system) 

110 @classmethod 

111 def generate(cls, filename: Optional[Path] = None) -> "CoseKeyForEdhoc": 

112 """Generate a key inside a file 

113 

114 This returns the generated private key. 

115 """ 

116 

117 from cryptography.hazmat.primitives.asymmetric import ec 

118 

119 key = ec.generate_private_key(curve=ec.SECP256R1()) 

120 

121 s = cls() 

122 s.kty = 2 # EC 

123 s.crv = 1 # P-256 

124 s.d = key.private_numbers().private_value.to_bytes(32, "big") 

125 

126 if filename is not None: 

127 flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL 

128 if hasattr(os, "O_BINARY"): 

129 flags |= os.O_BINARY 

130 descriptor = os.open(filename, flags, mode=0o600) 

131 try: 

132 with open(descriptor, "wb") as keyfile: 

133 cbor2.dump(s.secret_to_map(), keyfile) 

134 except Exception: 

135 filename.unlink() 

136 raise 

137 

138 return s 

139 

140 def as_ccs( 

141 self, kid: Optional[bytes], subject: Optional[str] 

142 ) -> Dict[Literal[14], dict]: 

143 """Given a key, generate a corresponding KCCS""" 

144 

145 from cryptography.hazmat.primitives.asymmetric import ec 

146 

147 private = ec.derive_private_key(int.from_bytes(self.d, "big"), ec.SECP256R1()) 

148 public = private.public_key() 

149 

150 x = public.public_numbers().x.to_bytes(32, "big") 

151 y = public.public_numbers().y.to_bytes(32, "big") 

152 # kty: EC2, crv: P-256, x, y 

153 cosekey = {1: 2, -1: 1, -2: x, -3: y} 

154 if kid is not None: 

155 cosekey[2] = kid 

156 # cnf: COSE_Key 

157 credential_kccs: dict = {8: {1: cosekey}} 

158 if subject is not None: 

159 credential_kccs[2] = subject 

160 

161 # kccs: cnf 

162 return {14: credential_kccs} 

163 

164 

165class EdhocCredentials(credentials._Objectish): 

166 own_key: Optional[CoseKeyForEdhoc] 

167 suite: int 

168 method: int 

169 own_cred: Optional[dict] 

170 peer_cred: Optional[dict] 

171 

172 #: Whether the combined flow should be used when using this credential set. 

173 #: 

174 #: If unset or None, this the decision is left to the library (which at the 

175 #: time of writing always picks True). 

176 use_combined_edhoc: Optional[bool] 

177 

178 def __init__( 

179 self, 

180 suite: int, 

181 method: int, 

182 own_cred_style: Optional[str] = None, 

183 peer_cred: Optional[dict] = None, 

184 own_cred: Optional[dict] = None, 

185 private_key_file: Optional[str] = None, 

186 private_key: Optional[dict] = None, 

187 use_combined_edhoc: Optional[bool] = None, 

188 ): 

189 from . import edhoc 

190 

191 self.suite = suite 

192 self.method = method 

193 self.own_cred = own_cred 

194 self.peer_cred = peer_cred 

195 self.use_combined_edhoc = use_combined_edhoc 

196 

197 if private_key_file is not None and private_key is not None: 

198 raise credentials.CredentialsLoadError( 

199 "private_key is mutually exclusive with private_key_file" 

200 ) 

201 if private_key_file is not None: 

202 # FIXME: We should carry around a base 

203 private_key_path = Path(private_key_file) 

204 # FIXME: We left loading the file to the user, and now we're once more 

205 # in a position where we guess the file type 

206 self.own_key = CoseKeyForEdhoc.from_file(private_key_path) 

207 elif private_key is not None: 

208 self.own_key = CoseKeyForEdhoc.from_map(private_key) 

209 else: 

210 self.own_key = None 

211 

212 if own_cred_style is None: 

213 self.own_cred_style = None 

214 else: 

215 self.own_cred_style = edhoc.OwnCredStyle(own_cred_style) 

216 

217 if self.own_cred == {"unauthenticated": True}: 

218 if self.own_key is not None or self.own_cred_style not in ( 

219 None, 

220 OwnCredStyle.ByValue, 

221 ): 

222 raise credentials.CredentialsLoadError( 

223 "For local unauthenticated use, no key can be give, and own_cred_style needs to be by-value or absent" 

224 ) 

225 self.own_key = CoseKeyForEdhoc.generate() 

226 # FIXME: kid and subj should rather not be sent, but lakers insists on their presence 

227 self.own_cred = self.own_key.as_ccs(kid=b"?", subject="") 

228 self.own_cred_style = OwnCredStyle.ByValue 

229 else: 

230 if (own_cred is None) != (own_cred_style is None) or (own_cred is None) != ( 

231 self.own_key is None 

232 ): 

233 raise credentials.CredentialsLoadError( 

234 "If own credentials are given, all of own_cred, own_cred_style and private_key(_path) need to be given" 

235 ) 

236 

237 # FIXME: This is only used on the client side, and expects that all parts (own and peer) are present 

238 self._established_context = None 

239 

240 def find_edhoc_by_id_cred_peer(self, id_cred_peer): 

241 if self.peer_cred is None: 

242 return None 

243 if 14 not in self.peer_cred: 

244 # Only recognizing CCS so far 

245 return None 

246 

247 if id_cred_peer == cbor2.dumps(self.peer_cred, canonical=True): 

248 # credential by value 

249 return cbor2.dumps(self.peer_cred[14], canonical=True) 

250 

251 # cnf / COS_Key / kid, should be present in all CCS 

252 kid = self.peer_cred[14][8][1].get(2) 

253 if kid is not None and id_cred_peer == cbor2.dumps({4: kid}, canonical=True): 

254 # credential by kid 

255 return cbor2.dumps(self.peer_cred[14], canonical=True) 

256 

257 def peer_cred_is_unauthenticated(self): 

258 # FIXME: This is rather weird internal API, and rather weird 

259 # format-wise -- but it will suffice until credentials are rewritten. 

260 return self.peer_cred is not None and self.peer_cred == { 

261 "unauthenticated": True 

262 } 

263 

264 async def establish_context( 

265 self, 

266 wire, 

267 underlying_address, 

268 underlying_proxy_scheme, 

269 underlying_uri_host, 

270 logger, 

271 ): 

272 logger.info( 

273 "No OSCORE context found for EDHOC context %r, initiating one.", self 

274 ) 

275 # The semantic identifier (an arbitrary string) 

276 # 

277 # FIXME: We don't support role reversal yet, but once we 

278 # register this context to be available for incoming 

279 # requests, we'll have to pick more carefully 

280 c_i = bytes([random.randint(0, 23)]) 

281 initiator = lakers.EdhocInitiator() 

282 message_1 = initiator.prepare_message_1(c_i) 

283 

284 msg1 = Message( 

285 code=POST, 

286 proxy_scheme=underlying_proxy_scheme, 

287 uri_host=underlying_uri_host, 

288 uri_path=[".well-known", "edhoc"], 

289 payload=cbor2.dumps(True) + message_1, 

290 ) 

291 msg1.remote = underlying_address 

292 msg2 = await wire.request(msg1).response_raising 

293 

294 (c_r, id_cred_r, ead_2) = initiator.parse_message_2(msg2.payload) 

295 

296 assert isinstance(self.own_cred, dict) and list(self.own_cred.keys()) == [14], ( 

297 "So far can only process CCS style own credentials a la {14: ...}, own_cred = %r" 

298 % self.own_cred 

299 ) 

300 cred_i = cbor2.dumps(self.own_cred[14], canonical=True) 

301 key_i = self.own_key.d 

302 

303 logger.debug("EDHOC responder sent message_2 with ID_CRED_R = %r", id_cred_r) 

304 if self.peer_cred_is_unauthenticated(): 

305 # Not doing further checks (eg. for trailing bytes) or re-raising: This 

306 # was already checked by lakers 

307 parsed = cbor2.loads(id_cred_r) 

308 

309 if 14 not in parsed: 

310 raise credentials.CredentialsMissingError( 

311 "Peer presented credential-by-reference (or anything else that's not a KCCS) when no credential was pre-agreed" 

312 ) 

313 

314 # We could also pick the [14] out of the serialized stream and 

315 # treat it as an opaque item, but cbor2 doesn't really do serialized items. 

316 cred_r = cbor2.dumps(parsed[14], canonical=True) 

317 else: 

318 # We could look into id_cred_r, which is a CBOR encoded 

319 # byte string, and could start comparing ... but actually 

320 # EDHOC and Lakers protect us from misbinding attacks (is 

321 # that what they are called?), so we can just put in our 

322 # expected credential here 

323 # 

324 # FIXME: But looking into it might give us a better error than just 

325 # "Mac2 verification failed" 

326 

327 # FIXME add assert on the structure or start doing the 

328 # generalization that'll fail at startup 

329 cred_r = cbor2.dumps(self.peer_cred[14], canonical=True) 

330 

331 initiator.verify_message_2( 

332 key_i, 

333 cred_i, 

334 cred_r, 

335 ) # odd that we provide that here rather than in the next function 

336 

337 logger.debug("Message 2 was verified") 

338 

339 secctx = EdhocInitiatorContext(initiator, c_i, c_r, self.own_cred_style, logger) 

340 

341 if self.use_combined_edhoc is not False: 

342 secctx.complete_without_message_4() 

343 # That's enough: Message 3 can well be sent along with the next 

344 # message. 

345 return secctx 

346 

347 logger.debug("Sending explicit message 3 without optimization") 

348 

349 message_3 = secctx._message_3 

350 assert message_3 is not None 

351 secctx._message_3 = None 

352 

353 if len(c_r) == 1 and (0 <= c_r[0] < 24 or 0x20 <= c_r[0] < 0x38): 

354 c_r_encoded = c_r 

355 else: 

356 c_r_encoded = cbor2.dumps(c_r) 

357 msg3 = Message( 

358 code=POST, 

359 proxy_scheme=underlying_proxy_scheme, 

360 uri_host=underlying_uri_host, 

361 uri_path=[".well-known", "edhoc"], 

362 payload=c_r_encoded + message_3, 

363 ) 

364 msg3.remote = underlying_address 

365 msg4 = await wire.request(msg3).response_raising 

366 

367 secctx.complete_with_message_4(msg4.payload) 

368 

369 logger.debug("Received message 4, context is ready") 

370 

371 return secctx 

372 

373 

374class _EdhocContextBase( 

375 oscore.CanProtect, oscore.CanUnprotect, oscore.SecurityContextUtils 

376): 

377 def __init__(self, logger): 

378 self.log = logger 

379 

380 def post_seqnoincrease(self): 

381 # The context is not persisted 

382 pass 

383 

384 def protect(self, message, request_id=None, *, kid_context=True): 

385 outer_message, request_id = super().protect( 

386 message, request_id=request_id, kid_context=kid_context 

387 ) 

388 message_3 = self.message_3_to_include() 

389 if message_3 is not None: 

390 outer_message.opt.edhoc = True 

391 outer_message.payload = message_3 + outer_message.payload 

392 return outer_message, request_id 

393 

394 def _make_ready(self, edhoc_context, c_ours, c_theirs): 

395 # FIXME: both should offer this 

396 if ( 

397 isinstance(edhoc_context, lakers.EdhocResponder) 

398 or edhoc_context.selected_cipher_suite() == 2 

399 ): 

400 self.alg_aead = oscore.algorithms["AES-CCM-16-64-128"] 

401 self.hashfun = oscore.hashfunctions["sha256"] 

402 else: 

403 raise RuntimeError("Unknown suite") 

404 

405 # we did check for critical EADs, there was no out-of-band agreement, so 8 it is 

406 oscore_salt_length = 8 

407 # I figure that one would be ageed out-of-band as well (currently no 

408 # options to set/change this are known) 

409 self.id_context = None 

410 self.recipient_replay_window = oscore.ReplayWindow(32, lambda: None) 

411 

412 master_secret = edhoc_context.edhoc_exporter(0, [], self.alg_aead.key_bytes) 

413 master_salt = edhoc_context.edhoc_exporter(1, [], oscore_salt_length) 

414 

415 self.sender_id = c_theirs 

416 self.recipient_id = c_ours 

417 if self.sender_id == self.recipient_id: 

418 raise ValueError("Bad IDs: identical ones were picked") 

419 

420 self.derive_keys(master_salt, master_secret) 

421 

422 self.sender_sequence_number = 0 

423 self.recipient_replay_window.initialize_empty() 

424 

425 self.log.debug("EDHOC context %r ready for OSCORE operation", self) 

426 

427 @abc.abstractmethod 

428 def message_3_to_include(self) -> Optional[bytes]: 

429 """An encoded message_3 to include in outgoing messages 

430 

431 This may modify self to only return something once.""" 

432 

433 

434class EdhocInitiatorContext(_EdhocContextBase): 

435 """An OSCORE context that is derived from an EDHOC exchange. 

436 

437 It does not require that the EDHOC exchange has completed -- it can be set 

438 up by an initiator already when message 2 has been received, prepares a 

439 message 3 at setup time, and sends it with the first request that is sent 

440 through it.""" 

441 

442 # FIXME: Should we rather send it with *every* request that is sent before a message 4 is received implicitly? 

443 def __init__(self, initiator, c_ours, c_theirs, cred_i_mode, logger): 

444 super().__init__(logger) 

445 

446 # Only this line is role specific 

447 self._message_3, _i_prk_out = initiator.prepare_message_3( 

448 cred_i_mode.as_lakers(), None 

449 ) 

450 

451 self._incomplete = True 

452 self._init_details = (initiator, c_ours, c_theirs) 

453 

454 def complete_without_message_4(self) -> None: 

455 (initiator, c_ours, c_theirs) = self._init_details 

456 initiator.completed_without_message_4() 

457 self._make_ready(initiator, c_ours, c_theirs) 

458 self._incomplete = False 

459 self._init_details = None 

460 

461 def complete_with_message_4(self, message_4: bytes) -> None: 

462 (initiator, c_ours, c_theirs) = self._init_details 

463 initiator.process_message_4(message_4) 

464 self._make_ready(initiator, c_ours, c_theirs) 

465 self._incomplete = False 

466 self._init_details = None 

467 

468 def message_3_to_include(self) -> Optional[bytes]: 

469 if self._message_3 is not None: 

470 result = self._message_3 

471 self._message_3 = None 

472 return result 

473 return None 

474 

475 

476class EdhocResponderContext(_EdhocContextBase): 

477 def __init__(self, responder, c_i, c_r, server_credentials, logger): 

478 super().__init__(logger) 

479 

480 # storing them where they will later be overwritten with themselves 

481 self.recipient_id = c_r 

482 self.sender_id = c_i 

483 

484 self._responder = responder 

485 # Through these we'll look up id_cred_i 

486 self._server_credentials = server_credentials 

487 

488 self.authenticated_claims = [] 

489 

490 # Not sure why mypy even tolerates this -- we're clearly not ready for 

491 # a general protect/unprotect, and things only work because all 

492 # relevant functions get their checks introduced 

493 self._incomplete = True 

494 

495 self._message_4 = None 

496 

497 def message_3_to_include(self) -> Optional[bytes]: 

498 # as a responder we never send one 

499 return None 

500 

501 def get_oscore_context_for(self, unprotected): 

502 if oscore.COSE_KID_CONTEXT in unprotected: 

503 return None 

504 if unprotected.get(oscore.COSE_KID) == self.recipient_id: 

505 return self 

506 

507 def find_all_used_contextless_oscore_kid(self) -> set[bytes]: 

508 return set((self.recipient_id,)) 

509 

510 def protect(self, *args, **kwargs): 

511 if self._incomplete: 

512 raise RuntimeError( 

513 "EDHOC has not completed yet, waiting for message 3, can not protect own messages yet" 

514 ) 

515 return super().protect(*args, **kwargs) 

516 

517 def unprotect(self, protected_message, request_id=None): 

518 if self._incomplete: 

519 if not protected_message.opt.edhoc: 

520 self.log.error( 

521 "OSCORE failed: No EDHOC message 3 received and none present" 

522 ) 

523 raise error.BadRequest("EDHOC incomplete") 

524 

525 payload_stream = io.BytesIO(protected_message.payload) 

526 # discarding result -- just need to have a point to split 

527 _ = cbor2.load(payload_stream) 

528 m3len = payload_stream.tell() 

529 message_3 = protected_message.payload[:m3len] 

530 

531 self._offer_message_3(message_3) 

532 

533 protected_message = protected_message.copy( 

534 edhoc=False, payload=protected_message.payload[m3len:] 

535 ) 

536 

537 return super().unprotect(protected_message, request_id) 

538 

539 def _offer_message_3(self, message_3: bytes) -> bytes: 

540 """Send in a message 3, obtain a message 4 

541 

542 It is safe to discard the output, eg. when the CoAP EDHOC option is 

543 used.""" 

544 if self._incomplete: 

545 id_cred_i, ead_3 = self._responder.parse_message_3(message_3) 

546 if ead_3 is not None: 

547 self.log.error("Aborting EDHOC: EAD3 present") 

548 raise error.BadRequest 

549 

550 try: 

551 (cred_i, claims) = self._server_credentials.find_edhoc_by_id_cred_peer( 

552 id_cred_i 

553 ) 

554 except KeyError: 

555 self.log.error( 

556 "Aborting EDHOC: No credentials found for client with id_cred_i=h'%s'", 

557 id_cred_i.hex(), 

558 ) 

559 raise error.BadRequest 

560 

561 self.authenticated_claims.extend(claims) 

562 

563 self._responder.verify_message_3(cred_i) 

564 self._message_4 = self._responder.prepare_message_4() 

565 

566 self._make_ready(self._responder, self.recipient_id, self.sender_id) 

567 self._incomplete = False 

568 

569 return self._message_4 

570 

571 

572class OwnCredStyle(enum.Enum): 

573 """Guidance for how the own credential should be sent in an EDHOC 

574 exchange""" 

575 

576 ByKeyId = "by-key-id" 

577 ByValue = "by-value" 

578 

579 def as_lakers(self): 

580 """Convert the enum into Lakers' reepresentation of the same concept. 

581 

582 The types may eventually be unified, but so far, Lakers doesn't make 

583 the distinctions we expect to make yet.""" 

584 if self == self.ByKeyId: 

585 # FIXME: Mismatch to be fixed in lakers -- currently the only way 

586 # it allows sending by reference is by Key ID 

587 return lakers.CredentialTransfer.ByReference 

588 if self == self.ByValue: 

589 return lakers.CredentialTransfer.ByValue 

590 else: 

591 raise RuntimeError("enum variant not covered")