Coverage for aiocoap / edhoc.py: 86%

314 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-22 09:52 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5"""Internal module containing types used inside EDHOC security contexts""" 

6 

7import abc 

8import enum 

9import io 

10from pathlib import Path 

11import random 

12from typing import Optional, Dict, Literal 

13import os 

14 

15import cbor2 

16import lakers 

17 

18from . import oscore, credentials, error 

19from . import Message 

20from .numbers import POST 

21from .numbers.eaditem import EADLabel, grease_labels 

22 

23 

24def load_cbor_or_edn(filename: Path): 

25 """Common heuristic for whether something is CBOR or EDN""" 

26 import cbor_diag 

27 import cbor2 

28 

29 with filename.open("rb") as binary: 

30 try: 

31 result = cbor2.load(binary) 

32 except cbor2.CBORDecodeError: 

33 pass 

34 else: 

35 if binary.read(1) == b"": 

36 return result 

37 # else it apparently hasn't been CBOR all through... 

38 with filename.open() as textual: 

39 try: 

40 converted = cbor_diag.diag2cbor(textual.read()) 

41 except ValueError: 

42 raise credentials.CredentialsLoadError( 

43 "Data loaded from %s was recognized neither as CBOR nor CBOR Diagnostic Notation (EDN)" 

44 % filename 

45 ) 

46 # no need to check for completeness: diag2cbor doesn't do diagnostic 

47 # sequences, AIU that's not even a thing 

48 return cbor2.loads(converted) 

49 

50 

51class CoseKeyForEdhoc: 

52 kty: int 

53 crv: int 

54 d: bytes 

55 

56 @classmethod 

57 def from_file(cls, filename: Path) -> "CoseKeyForEdhoc": 

58 """Load a key from a file (in CBOR or EDN), asserting that the file is not group/world readable""" 

59 if filename.stat().st_mode & 0o077 != 0: 

60 raise credentials.CredentialsLoadError( 

61 "Refusing to load private key that is group or world accessible" 

62 ) 

63 

64 loaded = load_cbor_or_edn(filename) 

65 return cls.from_map(loaded) 

66 

67 @classmethod 

68 def from_map(cls, key: dict) -> "CoseKeyForEdhoc": 

69 if not isinstance(key, dict): 

70 raise credentials.CredentialsLoadError( 

71 "Data is not shaped like COSE_KEY (expected top-level dictionary)" 

72 ) 

73 if 1 not in key: 

74 raise credentials.CredentialsLoadError( 

75 "Data is not shaped like COSE_KEY (expected key 1 (kty) in top-level dictionary)" 

76 ) 

77 if key[1] != 2: 

78 raise credentials.CredentialsLoadError( 

79 "Private key type %s is not supported (currently only 2 (EC) is supported)" 

80 % (key[1],) 

81 ) 

82 

83 if key.get(-1) != 1: 

84 raise credentials.CredentialsLoadError( 

85 "Private key of type EC requires key -1 (crv), currently supported values: 1 (P-256)" 

86 ) 

87 

88 if not isinstance(key.get(-4), bytes) or len(key[-4]) != 32: 

89 raise credentials.CredentialsLoadError( 

90 "Private key of type EC P-256 requires key -4 (d) to be a 32-byte long byte string" 

91 ) 

92 

93 if any(k not in (1, -1, -4) for k in key): 

94 raise credentials.CredentialsLoadError( 

95 "Extraneous data in key, consider allow-listing the item if acceptable" 

96 ) 

97 

98 s = cls() 

99 s.kty = 2 # EC 

100 s.crv = 1 # P-256 

101 s.d = key[-4] 

102 

103 return s 

104 

105 def secret_to_map(self) -> dict: 

106 # kty: EC, crv: P-256, d: ... 

107 return {1: self.kty, -1: self.crv, -4: self.d} 

108 

109 # Should we deprecate filename, add a generate_in_file method? (It's there 

110 # because generate originally depended on a file system) 

111 @classmethod 

112 def generate(cls, filename: Optional[Path] = None) -> "CoseKeyForEdhoc": 

113 """Generate a key inside a file 

114 

115 This returns the generated private key. 

116 """ 

117 

118 from cryptography.hazmat.primitives.asymmetric import ec 

119 

120 key = ec.generate_private_key(curve=ec.SECP256R1()) 

121 

122 s = cls() 

123 s.kty = 2 # EC 

124 s.crv = 1 # P-256 

125 s.d = key.private_numbers().private_value.to_bytes(32, "big") 

126 

127 if filename is not None: 

128 flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL 

129 if hasattr(os, "O_BINARY"): 

130 flags |= os.O_BINARY 

131 descriptor = os.open(filename, flags, mode=0o600) 

132 try: 

133 with open(descriptor, "wb") as keyfile: 

134 cbor2.dump(s.secret_to_map(), keyfile) 

135 except Exception: 

136 filename.unlink() 

137 raise 

138 

139 return s 

140 

141 def as_ccs( 

142 self, kid: Optional[bytes], subject: Optional[str] 

143 ) -> Dict[Literal[14], dict]: 

144 """Given a key, generate a corresponding KCCS""" 

145 

146 from cryptography.hazmat.primitives.asymmetric import ec 

147 

148 private = ec.derive_private_key(int.from_bytes(self.d, "big"), ec.SECP256R1()) 

149 public = private.public_key() 

150 

151 x = public.public_numbers().x.to_bytes(32, "big") 

152 y = public.public_numbers().y.to_bytes(32, "big") 

153 # kty: EC2, crv: P-256, x, y 

154 cosekey = {1: 2, -1: 1, -2: x, -3: y} 

155 if kid is not None: 

156 cosekey[2] = kid 

157 # cnf: COSE_Key 

158 credential_kccs: dict = {8: {1: cosekey}} 

159 if subject is not None: 

160 credential_kccs[2] = subject 

161 

162 # kccs: cnf 

163 return {14: credential_kccs} 

164 

165 

166class GreaseSettings: 

167 """Object that is carried along for an EDHOC exchange to decide whether to 

168 apply GREASE. 

169 

170 Having such an object allows control over how frequently (or even how) to 

171 apply GREASE. Currently, it implements just the recommended pattern of 

172 I-D.ietf-lake-edhoc-grease-01, which includes applying GREASE whenever an 

173 unprocessed option is found. 

174 """ 

175 

176 def __init__(self): 

177 # 6 bit have the 1/64 chance of being all 0 

178 self.bits = 6 

179 

180 def should_grease(self): 

181 """Random function that returns True once every ``2^{self.bits}`` 

182 times.""" 

183 return random.getrandbits(self.bits) == 0 

184 

185 def grease_ead(self, eads: list[lakers.EADItem]): 

186 """Perform the limited-fingerprinting pattern of Section 2.1.1 of 

187 draft-ietf-lake-edhoc-grease-01.""" 

188 

189 if not self.should_grease(): 

190 return eads 

191 

192 label = random.choice(grease_labels) 

193 length = random.randrange(9, 41) 

194 

195 return [ 

196 *eads, 

197 lakers.EADItem(label, value=random.randbytes(length), is_critical=False), 

198 ] 

199 

200 def update_from_ead(self, eads: list[lakers.EADItem]): 

201 """Increment the likelihood of using own grease from the remaining 

202 incoming EAD items (after processable items have been removed). 

203 

204 This implementes the SHOULD of I-D.ietf-lake-edhoc-grease-01 Section 2.2.""" 

205 if len(eads): 

206 print(f"Found EAD items {eads}, going to 100%") 

207 self.bits = 0 

208 

209 

210class EdhocCredentials(credentials._Objectish): 

211 own_key: Optional[CoseKeyForEdhoc] 

212 suite: int 

213 method: int 

214 own_cred: Optional[dict] 

215 peer_cred: Optional[dict] 

216 

217 #: Whether the combined flow should be used when using this credential set. 

218 #: 

219 #: If unset or None, this the decision is left to the library (which at the 

220 #: time of writing always picks True). 

221 use_combined_edhoc: Optional[bool] 

222 

223 def __init__( 

224 self, 

225 suite: int, 

226 method: int, 

227 own_cred_style: Optional[str] = None, 

228 peer_cred: Optional[dict] = None, 

229 own_cred: Optional[dict] = None, 

230 private_key_file: Optional[str] = None, 

231 private_key: Optional[dict] = None, 

232 use_combined_edhoc: Optional[bool] = None, 

233 ): 

234 from . import edhoc 

235 

236 self.suite = suite 

237 self.method = method 

238 self.own_cred = own_cred 

239 self.peer_cred = peer_cred 

240 self.use_combined_edhoc = use_combined_edhoc 

241 

242 if private_key_file is not None and private_key is not None: 

243 raise credentials.CredentialsLoadError( 

244 "private_key is mutually exclusive with private_key_file" 

245 ) 

246 if private_key_file is not None: 

247 # FIXME: We should carry around a base 

248 private_key_path = Path(private_key_file) 

249 # FIXME: We left loading the file to the user, and now we're once more 

250 # in a position where we guess the file type 

251 self.own_key = CoseKeyForEdhoc.from_file(private_key_path) 

252 elif private_key is not None: 

253 self.own_key = CoseKeyForEdhoc.from_map(private_key) 

254 else: 

255 self.own_key = None 

256 

257 if own_cred_style is None: 

258 self.own_cred_style = None 

259 else: 

260 self.own_cred_style = edhoc.OwnCredStyle(own_cred_style) 

261 

262 if self.own_cred == {"unauthenticated": True}: 

263 if self.own_key is not None or self.own_cred_style not in ( 

264 None, 

265 OwnCredStyle.ByValue, 

266 ): 

267 raise credentials.CredentialsLoadError( 

268 "For local unauthenticated use, no key can be give, and own_cred_style needs to be by-value or absent" 

269 ) 

270 self.own_key = CoseKeyForEdhoc.generate() 

271 # FIXME: kid and subj should rather not be sent, but lakers insists on their presence 

272 self.own_cred = self.own_key.as_ccs(kid=b"?", subject="") 

273 self.own_cred_style = OwnCredStyle.ByValue 

274 else: 

275 if (own_cred is None) != (own_cred_style is None) or (own_cred is None) != ( 

276 self.own_key is None 

277 ): 

278 raise credentials.CredentialsLoadError( 

279 "If own credentials are given, all of own_cred, own_cred_style and private_key(_path) need to be given" 

280 ) 

281 

282 # FIXME: This is only used on the client side, and expects that all parts (own and peer) are present 

283 self._established_context = None 

284 

285 def find_edhoc_by_id_cred_peer(self, id_cred_peer): 

286 if self.peer_cred is None: 

287 return None 

288 if 14 not in self.peer_cred: 

289 # Only recognizing CCS so far 

290 return None 

291 

292 if id_cred_peer == cbor2.dumps(self.peer_cred, canonical=True): 

293 # credential by value 

294 return cbor2.dumps(self.peer_cred[14], canonical=True) 

295 

296 # cnf / COS_Key / kid, should be present in all CCS 

297 kid = self.peer_cred[14][8][1].get(2) 

298 if kid is not None and id_cred_peer == cbor2.dumps({4: kid}, canonical=True): 

299 # credential by kid 

300 return cbor2.dumps(self.peer_cred[14], canonical=True) 

301 

302 def peer_cred_is_unauthenticated(self): 

303 # FIXME: This is rather weird internal API, and rather weird 

304 # format-wise -- but it will suffice until credentials are rewritten. 

305 return self.peer_cred is not None and self.peer_cred == { 

306 "unauthenticated": True 

307 } 

308 

309 async def establish_context( 

310 self, 

311 wire, 

312 underlying_address, 

313 underlying_proxy_scheme, 

314 underlying_uri_host, 

315 logger, 

316 ): 

317 logger.info( 

318 "No OSCORE context found for EDHOC context %r, initiating one.", self 

319 ) 

320 

321 grease_settings = GreaseSettings() 

322 

323 # The semantic identifier (an arbitrary string) 

324 # 

325 # FIXME: We don't support role reversal yet, but once we 

326 # register this context to be available for incoming 

327 # requests, we'll have to pick more carefully 

328 c_i = bytes([random.randint(0, 23)]) 

329 initiator = lakers.EdhocInitiator() 

330 message_1 = initiator.prepare_message_1( 

331 c_i, 

332 # We could also send this depending on our configuration, ie. not 

333 # send it if we do expect credentials. But that'll also reveal to 

334 # an extent that we are ready to do TOFU, so sending it 

335 # unconditionally is a good first step, also because it allows 

336 # peers to migrate towards sending by reference as a default. 

337 grease_settings.grease_ead( 

338 [lakers.EADItem(EADLabel.CRED_BY_VALUE, is_critical=False)] 

339 ), 

340 ) 

341 

342 msg1 = Message( 

343 code=POST, 

344 proxy_scheme=underlying_proxy_scheme, 

345 uri_host=underlying_uri_host, 

346 uri_path=[".well-known", "edhoc"], 

347 payload=cbor2.dumps(True) + message_1, 

348 ) 

349 msg1.remote = underlying_address 

350 msg2 = await wire.request(msg1).response_raising 

351 

352 (c_r, id_cred_r, ead_2) = initiator.parse_message_2(msg2.payload) 

353 

354 new_ead = [] 

355 peer_requested_by_value = False 

356 for e in ead_2: 

357 if e.label() == EADLabel.CRED_BY_VALUE: 

358 peer_requested_by_value = True 

359 else: 

360 # Removing it from the new list not so much for is_critical, 

361 # for this item usually is not, but mostly for the 

362 # grease_settings 

363 new_ead.append(e) 

364 ead_2 = new_ead 

365 

366 if any(e.is_critical() for e in ead_2): 

367 self.log.error("Aborting EDHOC: Critical EAD2 present") 

368 raise error.BadRequest 

369 

370 grease_settings.update_from_ead(ead_2) 

371 

372 assert isinstance(self.own_cred, dict) and list(self.own_cred.keys()) == [14], ( 

373 "So far can only process CCS style own credentials a la {14: ...}, own_cred = %r" 

374 % self.own_cred 

375 ) 

376 cred_i = cbor2.dumps(self.own_cred[14], canonical=True) 

377 key_i = self.own_key.d 

378 

379 logger.debug("EDHOC responder sent message_2 with ID_CRED_R = %r", id_cred_r) 

380 if self.peer_cred_is_unauthenticated(): 

381 # Not doing further checks (eg. for trailing bytes) or re-raising: This 

382 # was already checked by lakers 

383 parsed = cbor2.loads(id_cred_r) 

384 

385 if 14 not in parsed: 

386 raise credentials.CredentialsMissingError( 

387 "Peer presented credential-by-reference (or anything else that's not a KCCS) when no credential was pre-agreed" 

388 ) 

389 

390 # We could also pick the [14] out of the serialized stream and 

391 # treat it as an opaque item, but cbor2 doesn't really do serialized items. 

392 cred_r = cbor2.dumps(parsed[14], canonical=True) 

393 else: 

394 # We could look into id_cred_r, which is a CBOR encoded 

395 # byte string, and could start comparing ... but actually 

396 # EDHOC and Lakers protect us from misbinding attacks (is 

397 # that what they are called?), so we can just put in our 

398 # expected credential here 

399 # 

400 # FIXME: But looking into it might give us a better error than just 

401 # "Mac2 verification failed" 

402 

403 # FIXME add assert on the structure or start doing the 

404 # generalization that'll fail at startup 

405 cred_r = cbor2.dumps(self.peer_cred[14], canonical=True) 

406 

407 initiator.verify_message_2( 

408 key_i, 

409 cred_i, 

410 cred_r, 

411 ) # odd that we provide that here rather than in the next function 

412 

413 logger.debug("Message 2 was verified") 

414 

415 secctx = EdhocInitiatorContext( 

416 initiator, 

417 c_i, 

418 c_r, 

419 lakers.CredentialTransfer.ByValue 

420 if peer_requested_by_value 

421 else self.own_cred_style, 

422 logger, 

423 grease_settings, 

424 ) 

425 

426 if self.use_combined_edhoc is not False: 

427 secctx.complete_without_message_4() 

428 # That's enough: Message 3 can well be sent along with the next 

429 # message. 

430 return secctx 

431 

432 logger.debug("Sending explicit message 3 without optimization") 

433 

434 message_3 = secctx._message_3 

435 assert message_3 is not None 

436 secctx._message_3 = None 

437 

438 if len(c_r) == 1 and (0 <= c_r[0] < 24 or 0x20 <= c_r[0] < 0x38): 

439 c_r_encoded = c_r 

440 else: 

441 c_r_encoded = cbor2.dumps(c_r) 

442 msg3 = Message( 

443 code=POST, 

444 proxy_scheme=underlying_proxy_scheme, 

445 uri_host=underlying_uri_host, 

446 uri_path=[".well-known", "edhoc"], 

447 payload=c_r_encoded + message_3, 

448 ) 

449 msg3.remote = underlying_address 

450 msg4 = await wire.request(msg3).response_raising 

451 

452 secctx.complete_with_message_4(msg4.payload) 

453 

454 logger.debug("Received message 4, context is ready") 

455 

456 return secctx 

457 

458 

459class _EdhocContextBase( 

460 oscore.CanProtect, oscore.CanUnprotect, oscore.SecurityContextUtils 

461): 

462 def __init__(self, logger, grease_settings): 

463 self.log = logger 

464 self.grease_settings = grease_settings 

465 

466 def post_seqnoincrease(self): 

467 # The context is not persisted 

468 pass 

469 

470 def protect(self, message, request_id=None, *, kid_context=True): 

471 outer_message, request_id = super().protect( 

472 message, request_id=request_id, kid_context=kid_context 

473 ) 

474 message_3 = self.message_3_to_include() 

475 if message_3 is not None: 

476 outer_message.opt.edhoc = True 

477 outer_message.payload = message_3 + outer_message.payload 

478 return outer_message, request_id 

479 

480 def _make_ready(self, edhoc_context, c_ours, c_theirs): 

481 # FIXME: both should offer this 

482 if ( 

483 isinstance(edhoc_context, lakers.EdhocResponder) 

484 or edhoc_context.selected_cipher_suite() == 2 

485 ): 

486 self.alg_aead = oscore.algorithms["AES-CCM-16-64-128"] 

487 self.hashfun = oscore.hashfunctions["sha256"] 

488 else: 

489 raise RuntimeError("Unknown suite") 

490 

491 # we did check for critical EADs, there was no out-of-band agreement, so 8 it is 

492 oscore_salt_length = 8 

493 # I figure that one would be ageed out-of-band as well (currently no 

494 # options to set/change this are known) 

495 self.id_context = None 

496 self.recipient_replay_window = oscore.ReplayWindow(32, lambda: None) 

497 

498 master_secret = edhoc_context.edhoc_exporter(0, [], self.alg_aead.key_bytes) 

499 master_salt = edhoc_context.edhoc_exporter(1, [], oscore_salt_length) 

500 

501 self.sender_id = c_theirs 

502 self.recipient_id = c_ours 

503 if self.sender_id == self.recipient_id: 

504 raise ValueError("Bad IDs: identical ones were picked") 

505 

506 self.derive_keys(master_salt, master_secret) 

507 

508 self.sender_sequence_number = 0 

509 self.recipient_replay_window.initialize_empty() 

510 

511 self.log.debug("EDHOC context %r ready for OSCORE operation", self) 

512 

513 @abc.abstractmethod 

514 def message_3_to_include(self) -> Optional[bytes]: 

515 """An encoded message_3 to include in outgoing messages 

516 

517 This may modify self to only return something once.""" 

518 

519 

520class EdhocInitiatorContext(_EdhocContextBase): 

521 """An OSCORE context that is derived from an EDHOC exchange. 

522 

523 It does not require that the EDHOC exchange has completed -- it can be set 

524 up by an initiator already when message 2 has been received, prepares a 

525 message 3 at setup time, and sends it with the first request that is sent 

526 through it.""" 

527 

528 # FIXME: Should we rather send it with *every* request that is sent before a message 4 is received implicitly? 

529 def __init__( 

530 self, initiator, c_ours, c_theirs, cred_i_mode, logger, grease_settings 

531 ): 

532 super().__init__(logger, grease_settings) 

533 

534 # Only this line is role specific 

535 self._message_3, _i_prk_out = initiator.prepare_message_3( 

536 cred_i_mode.as_lakers(), 

537 self.grease_settings.grease_ead([]), 

538 ) 

539 

540 self._incomplete = True 

541 self._init_details = (initiator, c_ours, c_theirs) 

542 

543 def complete_without_message_4(self) -> None: 

544 (initiator, c_ours, c_theirs) = self._init_details 

545 initiator.completed_without_message_4() 

546 self._make_ready(initiator, c_ours, c_theirs) 

547 self._incomplete = False 

548 self._init_details = None 

549 

550 def complete_with_message_4(self, message_4: bytes) -> None: 

551 (initiator, c_ours, c_theirs) = self._init_details 

552 initiator.process_message_4(message_4) 

553 self._make_ready(initiator, c_ours, c_theirs) 

554 self._incomplete = False 

555 self._init_details = None 

556 

557 def message_3_to_include(self) -> Optional[bytes]: 

558 if self._message_3 is not None: 

559 result = self._message_3 

560 self._message_3 = None 

561 return result 

562 return None 

563 

564 

565class EdhocResponderContext(_EdhocContextBase): 

566 def __init__( 

567 self, responder, c_i, c_r, server_credentials, logger, grease_settings 

568 ): 

569 super().__init__(logger, grease_settings) 

570 

571 # storing them where they will later be overwritten with themselves 

572 self.recipient_id = c_r 

573 self.sender_id = c_i 

574 

575 self._responder = responder 

576 # Through these we'll look up id_cred_i 

577 self._server_credentials = server_credentials 

578 

579 self.authenticated_claims = [] 

580 

581 # Not sure why mypy even tolerates this -- we're clearly not ready for 

582 # a general protect/unprotect, and things only work because all 

583 # relevant functions get their checks introduced 

584 self._incomplete = True 

585 

586 self._message_4 = None 

587 

588 def message_3_to_include(self) -> Optional[bytes]: 

589 # as a responder we never send one 

590 return None 

591 

592 def get_oscore_context_for(self, unprotected): 

593 if oscore.COSE_KID_CONTEXT in unprotected: 

594 return None 

595 if unprotected.get(oscore.COSE_KID) == self.recipient_id: 

596 return self 

597 

598 def find_all_used_contextless_oscore_kid(self) -> set[bytes]: 

599 return set((self.recipient_id,)) 

600 

601 def protect(self, *args, **kwargs): 

602 if self._incomplete: 

603 raise RuntimeError( 

604 "EDHOC has not completed yet, waiting for message 3, can not protect own messages yet" 

605 ) 

606 return super().protect(*args, **kwargs) 

607 

608 def unprotect(self, protected_message, request_id=None): 

609 if self._incomplete: 

610 if not protected_message.opt.edhoc: 

611 self.log.error( 

612 "OSCORE failed: No EDHOC message 3 received and none present" 

613 ) 

614 raise error.BadRequest("EDHOC incomplete") 

615 

616 payload_stream = io.BytesIO(protected_message.payload) 

617 # discarding result -- just need to have a point to split 

618 _ = cbor2.load(payload_stream) 

619 m3len = payload_stream.tell() 

620 message_3 = protected_message.payload[:m3len] 

621 

622 self._offer_message_3(message_3) 

623 

624 protected_message = protected_message.copy( 

625 edhoc=False, payload=protected_message.payload[m3len:] 

626 ) 

627 

628 return super().unprotect(protected_message, request_id) 

629 

630 def _offer_message_3(self, message_3: bytes) -> bytes: 

631 """Send in a message 3, obtain a message 4 

632 

633 It is safe to discard the output, eg. when the CoAP EDHOC option is 

634 used.""" 

635 if self._incomplete: 

636 id_cred_i, ead_3 = self._responder.parse_message_3(message_3) 

637 if any(e.is_critical() for e in ead_3): 

638 self.log.error("Aborting EDHOC: Critical EAD3 present") 

639 raise error.BadRequest 

640 self.grease_settings.update_from_ead(ead_3) 

641 

642 try: 

643 (cred_i, claims) = self._server_credentials.find_edhoc_by_id_cred_peer( 

644 id_cred_i 

645 ) 

646 except KeyError: 

647 self.log.error( 

648 "Aborting EDHOC: No credentials found for client with id_cred_i=h'%s'", 

649 id_cred_i.hex(), 

650 ) 

651 raise error.BadRequest 

652 

653 self.authenticated_claims.extend(claims) 

654 

655 self._responder.verify_message_3(cred_i) 

656 self._message_4 = self._responder.prepare_message_4( 

657 self.grease_settings.grease_ead([]) 

658 ) 

659 

660 self._make_ready(self._responder, self.recipient_id, self.sender_id) 

661 self._incomplete = False 

662 

663 return self._message_4 

664 

665 

666class OwnCredStyle(enum.Enum): 

667 """Guidance for how the own credential should be sent in an EDHOC 

668 exchange""" 

669 

670 ByKeyId = "by-key-id" 

671 ByValue = "by-value" 

672 

673 def as_lakers(self): 

674 """Convert the enum into Lakers' reepresentation of the same concept. 

675 

676 The types may eventually be unified, but so far, Lakers doesn't make 

677 the distinctions we expect to make yet.""" 

678 if self == self.ByKeyId: 

679 # FIXME: Mismatch to be fixed in lakers -- currently the only way 

680 # it allows sending by reference is by Key ID 

681 return lakers.CredentialTransfer.ByReference 

682 if self == self.ByValue: 

683 return lakers.CredentialTransfer.ByValue 

684 else: 

685 raise RuntimeError("enum variant not covered")