Coverage for aiocoap/edhoc.py: 83%

258 statements  

« prev     ^ index     » next       coverage.py v7.6.8, created at 2024-11-28 12:34 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5"""Internal module containing types used inside EDHOC security contexts""" 

6 

7import abc 

8import enum 

9import io 

10from pathlib import Path 

11import random 

12from typing import Optional, Dict, Literal 

13import os 

14 

15import cbor2 

16import lakers 

17 

18from . import oscore, credentials, error 

19from . import Message 

20from .numbers import POST 

21 

22 

def load_cbor_or_edn(filename: Path):
    """Load one data item from a file that holds either CBOR or EDN.

    The file is first decoded as binary CBOR; that result is only accepted
    if the decoder consumed the file up to its end. In all other cases the
    file is read again as UTF-8 text and converted from CBOR Diagnostic
    Notation (EDN).

    :param filename: Path of the file to load.
    :returns: The decoded data item.
    :raises credentials.CredentialsLoadError: if the content is neither
        complete CBOR nor EDN that ``cbor_diag`` can convert.
    """
    import cbor_diag
    import cbor2

    with filename.open("rb") as binary:
        try:
            result = cbor2.load(binary)
        except cbor2.CBORDecodeError:
            # Not CBOR; the EDN attempt below gets its chance
            pass
        else:
            if binary.read(1) == b"":
                return result
            # else it apparently hasn't been CBOR all through...

    # Being explicit about UTF-8 keeps the result independent of the
    # locale's default encoding. Undecodable input surfaces as
    # UnicodeDecodeError, which is a ValueError and thus handled below.
    with filename.open(encoding="utf-8") as textual:
        try:
            converted = cbor_diag.diag2cbor(textual.read())
        except ValueError as e:
            raise credentials.CredentialsLoadError(
                "Data loaded from %s was recognized neither as CBOR nor CBOR Diagnostic Notation (EDN)"
                % filename
            ) from e
    # no need to check for completeness: diag2cbor doesn't do diagnostic
    # sequences, AFAIU that's not even a thing
    return cbor2.loads(converted)

48 

49 

class CoseKeyForEdhoc:
    """Private key for EDHOC, held as the relevant fields of a COSE_Key.

    The constructors of this class only produce EC keys (kty 2) on the
    P-256 curve (crv 1) with a 32-byte private value.
    """

    # COSE_Key label 1 (kty)
    kty: int
    # COSE_Key label -1 (crv)
    crv: int
    # COSE_Key label -4 (d), the private value
    d: bytes

    @classmethod
    def from_file(cls, filename: Path) -> "CoseKeyForEdhoc":
        """Read a key from a CBOR or EDN file.

        Files that have any group or other permission bit set are refused
        with a :class:`credentials.CredentialsLoadError`.
        """
        permissions = filename.stat().st_mode
        if permissions & 0o077 != 0:
            raise credentials.CredentialsLoadError(
                "Refusing to load private key that is group or world accessible"
            )

        return cls.from_map(load_cbor_or_edn(filename))

    @classmethod
    def from_map(cls, key: dict) -> "CoseKeyForEdhoc":
        """Build a key from a COSE_Key shaped dictionary.

        The map needs to contain exactly the labels 1 (kty, value 2), -1
        (crv, value 1) and -4 (d, 32 bytes); anything else raises
        :class:`credentials.CredentialsLoadError`.
        """
        if not isinstance(key, dict):
            raise credentials.CredentialsLoadError(
                "Data is not shaped like COSE_KEY (expected top-level dictionary)"
            )
        if 1 not in key:
            raise credentials.CredentialsLoadError(
                "Data is not shaped like COSE_KEY (expected key 1 (kty) in top-level dictionary)"
            )
        if key[1] != 2:
            raise credentials.CredentialsLoadError(
                "Private key type %s is not supported (currently only 2 (EC) is supported)"
                % (key[1],)
            )

        if key.get(-1) != 1:
            raise credentials.CredentialsLoadError(
                "Private key of type EC requires key -1 (crv), currently supported values: 1 (P-256)"
            )

        private_value = key.get(-4)
        if not (isinstance(private_value, bytes) and len(private_value) == 32):
            raise credentials.CredentialsLoadError(
                "Private key of type EC P-256 requires key -4 (d) to be a 32-byte long byte string"
            )

        # Anything beyond the three known labels is rejected rather than ignored
        if not set(key) <= {1, -1, -4}:
            raise credentials.CredentialsLoadError(
                "Extraneous data in key, consider allow-listing the item if acceptable"
            )

        loaded = cls()
        loaded.kty = 2  # EC
        loaded.crv = 1  # P-256
        loaded.d = private_value

        return loaded

    def secret_to_map(self) -> dict:
        """Express the private key as a COSE_Key map (labels 1, -1 and -4)"""
        return {1: self.kty, -1: self.crv, -4: self.d}

    # Should we deprecate filename, add a generate_in_file method? (It's there
    # because generate originally depended on a file system)
    @classmethod
    def generate(cls, filename: Optional[Path] = None) -> "CoseKeyForEdhoc":
        """Generate a fresh P-256 key, optionally storing it in a file

        If a filename is given, the file is newly created (it must not
        exist yet) with mode 0o600 and receives the CBOR encoded secret
        map. If writing fails, the file is removed again.

        This returns the generated private key.
        """

        from cryptography.hazmat.primitives.asymmetric import ec

        generated = ec.generate_private_key(curve=ec.SECP256R1())

        fresh = cls()
        fresh.kty = 2  # EC
        fresh.crv = 1  # P-256
        fresh.d = generated.private_numbers().private_value.to_bytes(32, "big")

        if filename is None:
            return fresh

        # O_BINARY is only present on some platforms; elsewhere it adds nothing
        open_flags = (
            os.O_WRONLY | os.O_CREAT | os.O_EXCL | getattr(os, "O_BINARY", 0)
        )
        descriptor = os.open(filename, open_flags, mode=0o600)
        try:
            with open(descriptor, "wb") as keyfile:
                cbor2.dump(fresh.secret_to_map(), keyfile)
        except Exception:
            filename.unlink()
            raise

        return fresh

    def as_ccs(
        self, kid: Optional[bytes], subject: Optional[str]
    ) -> Dict[Literal[14], dict]:
        """Produce the KCCS that carries the public key matching this key

        The optional kid is placed in the COSE_Key (label 2), the optional
        subject in the claims set (label 2).
        """

        from cryptography.hazmat.primitives.asymmetric import ec

        scalar = int.from_bytes(self.d, "big")
        numbers = ec.derive_private_key(scalar, ec.SECP256R1()).public_key().public_numbers()

        # kty: EC2, crv: P-256, x, y
        cosekey = {
            1: 2,
            -1: 1,
            -2: numbers.x.to_bytes(32, "big"),
            -3: numbers.y.to_bytes(32, "big"),
        }
        if kid is not None:
            cosekey[2] = kid

        # cnf: COSE_Key
        claims: dict = {8: {1: cosekey}}
        if subject is not None:
            claims[2] = subject

        # kccs: cnf
        return {14: claims}

163 

164 

class EdhocCredentials(credentials._Objectish):
    """Credentials item holding the parameters of an EDHOC exchange: suite
    and method, and optionally own key, own credential and peer credential.
    """

    own_key: Optional[CoseKeyForEdhoc]
    suite: int
    method: int
    own_cred: Optional[dict]
    peer_cred: Optional[dict]

    def __init__(
        self,
        suite: int,
        method: int,
        own_cred_style: Optional[str] = None,
        peer_cred: Optional[dict] = None,
        own_cred: Optional[dict] = None,
        private_key_file: Optional[str] = None,
        private_key: Optional[dict] = None,
    ):
        """Validate the configured items and load the private key.

        ``private_key_file`` and ``private_key`` are mutually exclusive. An
        ``own_cred`` of ``{"unauthenticated": True}`` makes this generate an
        ephemeral key and a by-value credential; otherwise ``own_cred``,
        ``own_cred_style`` and a private key need to be given together or
        not at all.

        Inconsistent combinations raise
        :class:`credentials.CredentialsLoadError`.
        """
        from . import edhoc

        self.suite = suite
        self.method = method
        self.own_cred = own_cred
        self.peer_cred = peer_cred

        has_key_file = private_key_file is not None
        has_key_map = private_key is not None
        if has_key_file and has_key_map:
            raise credentials.CredentialsLoadError(
                "private_key is mutually exclusive with private_key_file"
            )
        if has_key_file:
            # FIXME: We should carry around a base
            #
            # FIXME: We left loading the file to the user, and now we're once more
            # in a position where we guess the file type
            self.own_key = CoseKeyForEdhoc.from_file(Path(private_key_file))
        elif has_key_map:
            self.own_key = CoseKeyForEdhoc.from_map(private_key)
        else:
            self.own_key = None

        self.own_cred_style = (
            None if own_cred_style is None else edhoc.OwnCredStyle(own_cred_style)
        )

        if self.own_cred != {"unauthenticated": True}:
            no_own_cred = own_cred is None
            if no_own_cred != (own_cred_style is None) or no_own_cred != (
                self.own_key is None
            ):
                raise credentials.CredentialsLoadError(
                    "If own credentials are given, all of own_cred, own_cred_style and private_key(_path) need to be given"
                )
        else:
            key_was_given = self.own_key is not None
            style_acceptable = self.own_cred_style in (None, OwnCredStyle.ByValue)
            if key_was_given or not style_acceptable:
                raise credentials.CredentialsLoadError(
                    "For local unauthenticated use, no key can be give, and own_cred_style needs to be by-value or absent"
                )
            self.own_key = CoseKeyForEdhoc.generate()
            # FIXME: kid and subj should rather not be sent, but lakers insists on their presence
            self.own_cred = self.own_key.as_ccs(kid=b"?", subject="")
            self.own_cred_style = OwnCredStyle.ByValue

        # FIXME: This is only used on the client side, and expects that all parts (own and peer) are present
        self._established_context = None

    def find_edhoc_by_id_cred_peer(self, id_cred_peer):
        """Return the encoded peer credential if ``id_cred_peer`` refers to it.

        A match is found when ``id_cred_peer`` is the canonical encoding of
        either the full peer credential (by value) or of ``{4: kid}`` with
        the credential's key ID (by key ID). Returns None otherwise,
        including when no CCS style peer credential is configured.
        """
        peer = self.peer_cred
        # Only recognizing CCS so far
        if peer is None or 14 not in peer:
            return None

        if id_cred_peer == cbor2.dumps(peer, canonical=True):
            # credential by value
            return cbor2.dumps(peer[14], canonical=True)

        # cnf / COSE_Key / kid, should be present in all CCS
        kid = peer[14][8][1].get(2)
        if kid is None:
            return None
        if id_cred_peer != cbor2.dumps({4: kid}, canonical=True):
            return None
        # credential by kid
        return cbor2.dumps(peer[14], canonical=True)

    def peer_cred_is_unauthenticated(self):
        """Tell whether the peer credential is the "unauthenticated" marker"""
        # FIXME: This is rather weird internal API, and rather weird
        # format-wise -- but it will suffice until credentials are rewritten.
        #
        # A peer_cred of None compares unequal, so no separate check is needed.
        return self.peer_cred == {"unauthenticated": True}

    async def establish_context(
        self,
        wire,
        underlying_address,
        underlying_proxy_scheme,
        underlying_uri_host,
        logger,
    ):
        """Run the initiator side of EDHOC up to verification of message 2.

        Message 1 is POSTed to ``/.well-known/edhoc`` through ``wire``; the
        response is processed as message 2 and verified against the
        responder's credential. The returned
        :class:`EdhocInitiatorContext` prepares message 3.

        Raises :class:`credentials.CredentialsMissingError` if the peer is
        configured as unauthenticated but did not present a KCCS by value.
        """
        logger.info(
            "No OSCORE context found for EDHOC context %r, initiating one.", self
        )
        # FIXME: We don't support role reversal yet, but once we
        # register this context to be available for incoming
        # requests, we'll have to pick more carefully
        c_i = bytes([random.randint(0, 23)])
        initiator = lakers.EdhocInitiator()
        message_1 = initiator.prepare_message_1(c_i)

        request = Message(
            code=POST,
            proxy_scheme=underlying_proxy_scheme,
            uri_host=underlying_uri_host,
            uri_path=[".well-known", "edhoc"],
            payload=cbor2.dumps(True) + message_1,
        )
        request.remote = underlying_address
        response = await wire.request(request).response_raising

        c_r, id_cred_r, _ead_2 = initiator.parse_message_2(response.payload)

        assert isinstance(self.own_cred, dict) and list(self.own_cred.keys()) == [14], (
            "So far can only process CCS style own credentials a la {14: ...}, own_cred = %r"
            % self.own_cred
        )
        cred_i = cbor2.dumps(self.own_cred[14], canonical=True)
        key_i = self.own_key.d

        logger.debug("EDHOC responder sent message_2 with ID_CRED_R = %r", id_cred_r)
        if not self.peer_cred_is_unauthenticated():
            # We could look into id_cred_r, which is a CBOR encoded
            # byte string, and could start comparing ... but actually
            # EDHOC and Lakers protect us from misbinding attacks (is
            # that what they are called?), so we can just put in our
            # expected credential here
            #
            # FIXME: But looking into it might give us a better error than just
            # "Mac2 verification failed"

            # FIXME add assert on the structure or start doing the
            # generalization that'll fail at startup
            cred_r = cbor2.dumps(self.peer_cred[14], canonical=True)
        else:
            # Not doing further checks (eg. for trailing bytes) or re-raising: This
            # was already checked by lakers
            presented = cbor2.loads(id_cred_r)

            if 14 not in presented:
                raise credentials.CredentialsMissingError(
                    "Peer presented credential-by-reference (or anything else that's not a KCCS) when no credential was pre-agreed"
                )

            # We could also pick the [14] out of the serialized stream and
            # treat it as an opaque item, but cbor2 doesn't really do serialized items.
            cred_r = cbor2.dumps(presented[14], canonical=True)

        # odd that we provide that here rather than in the next function
        initiator.verify_message_2(key_i, cred_i, cred_r)

        logger.debug("Message 2 was verified")

        return EdhocInitiatorContext(initiator, c_i, c_r, self.own_cred_style, logger)

330 

331 

class _EdhocContextBase(
    oscore.CanProtect, oscore.CanUnprotect, oscore.SecurityContextUtils
):
    """Shared parts of the OSCORE contexts that are set up through EDHOC.

    Subclasses provide :meth:`message_3_to_include` and call
    :meth:`_make_ready` once the EDHOC state allows exporting key material.
    """

    def __init__(self, logger):
        self.log = logger

    def post_seqnoincrease(self):
        """Do nothing: the context is not persisted"""

    def protect(self, message, request_id=None, *, kid_context=True):
        """Protect a message, prepending a pending message 3 if there is one.

        When :meth:`message_3_to_include` yields data, the EDHOC option is
        set on the outer message and the data is placed in front of the
        protected payload.
        """
        protected, request_id = super().protect(
            message, request_id=request_id, kid_context=kid_context
        )
        pending_message_3 = self.message_3_to_include()
        if pending_message_3 is None:
            return protected, request_id

        protected.opt.edhoc = True
        protected.payload = pending_message_3 + protected.payload
        return protected, request_id

    def _make_ready(self, edhoc_context, c_ours, c_theirs):
        """Populate the OSCORE context from the EDHOC state.

        Algorithms are selected, master secret and salt are taken from the
        EDHOC exporter (labels 0 and 1), the connection identifiers become
        the OSCORE IDs, and keys, sequence number and replay window are
        initialized.

        Raises RuntimeError if the cipher suite is not known, and
        ValueError if both connection identifiers are equal.
        """
        # only in lakers >= 0.5 that is present
        if hasattr(edhoc_context, "completed_without_message_4"):
            edhoc_context.completed_without_message_4()

        # FIXME: both should offer this
        suite_is_known = (
            isinstance(edhoc_context, lakers.EdhocResponder)
            or edhoc_context.selected_cipher_suite() == 2
        )
        if not suite_is_known:
            raise RuntimeError("Unknown suite")
        self.alg_aead = oscore.algorithms["AES-CCM-16-64-128"]
        self.hashfun = oscore.hashfunctions["sha256"]

        # we did check for critical EADs, there was no out-of-band agreement, so 8 it is
        salt_length = 8
        # I figure that one would be agreed out-of-band as well (currently no
        # options to set/change this are known)
        self.id_context = None
        self.recipient_replay_window = oscore.ReplayWindow(32, lambda: None)

        secret = edhoc_context.edhoc_exporter(0, [], self.alg_aead.key_bytes)
        salt = edhoc_context.edhoc_exporter(1, [], salt_length)

        # We send under the peer's identifier and receive under our own
        self.sender_id = c_theirs
        self.recipient_id = c_ours
        if self.sender_id == self.recipient_id:
            raise ValueError("Bad IDs: identical ones were picked")

        self.derive_keys(salt, secret)

        self.sender_sequence_number = 0
        self.recipient_replay_window.initialize_empty()

        self.log.debug("EDHOC context %r ready for OSCORE operation", self)

    @abc.abstractmethod
    def message_3_to_include(self) -> Optional[bytes]:
        """An encoded message_3 to include in outgoing messages

        This may modify self to only return something once."""

394 

395 

class EdhocInitiatorContext(_EdhocContextBase):
    """An OSCORE context that is derived from an EDHOC exchange.

    It does not require that the EDHOC exchange has completed -- it can be set
    up by an initiator already when message 2 has been received, prepares a
    message 3 at setup time, and sends it with the first request that is sent
    through it."""

    # FIXME: Should we rather send it with *every* request that is sent before a message 4 is received implicitly?
    def __init__(self, initiator, c_ours, c_theirs, cred_i_mode, logger):
        """Prepare message 3 and derive the OSCORE material.

        ``cred_i_mode`` is converted through its ``as_lakers()`` method to
        tell Lakers how to transfer the own credential.
        """
        super().__init__(logger)

        # Only this line is role specific
        message_3, _i_prk_out = initiator.prepare_message_3(
            cred_i_mode.as_lakers(), None
        )
        self._message_3 = message_3

        self._make_ready(initiator, c_ours, c_theirs)

    def message_3_to_include(self) -> Optional[bytes]:
        """Hand out the prepared message 3 once; None on all later calls"""
        pending, self._message_3 = self._message_3, None
        return pending

421 

422 

class EdhocResponderContext(_EdhocContextBase):
    """OSCORE context on the responder side of an EDHOC exchange.

    The context starts out incomplete. It becomes usable once a request
    carrying EDHOC message 3 has been processed successfully in
    :meth:`unprotect`.
    """

    def __init__(self, responder, c_i, c_r, server_credentials, logger):
        super().__init__(logger)

        # storing them where they will later be overwritten with themselves
        self.recipient_id = c_r
        self.sender_id = c_i

        self._responder = responder
        # Through these we'll look up id_cred_i
        self._server_credentials = server_credentials

        # Only extended once message 3 has been verified
        self.authenticated_claims = []

        # Not sure why mypy even tolerates this -- we're clearly not ready for
        # a general protect/unprotect, and things only work because all
        # relevant functions get their checks introduced
        self._incomplete = True

    def message_3_to_include(self) -> Optional[bytes]:
        # as a responder we never send one
        return None

    def get_oscore_context_for(self, unprotected):
        """Return self if the unprotected header addresses this context by
        its recipient ID without a KID context, otherwise None"""
        if oscore.COSE_KID_CONTEXT in unprotected:
            return None
        if unprotected.get(oscore.COSE_KID) == self.recipient_id:
            return self

    def find_all_used_contextless_oscore_kid(self) -> set[bytes]:
        return set((self.recipient_id,))

    def protect(self, *args, **kwargs):
        """Protect a message; raises RuntimeError while message 3 is outstanding"""
        if self._incomplete:
            raise RuntimeError(
                "EDHOC has not completed yet, waiting for message 3, can not protect own messages yet"
            )
        return super().protect(*args, **kwargs)

    def unprotect(self, protected_message, request_id=None):
        """Unprotect a message, completing EDHOC first if that is pending.

        While the context is incomplete, the message needs to carry the
        EDHOC option, and its payload needs to start with message 3. That
        message is split off, parsed and verified against the credential
        found through the server credentials; only then are the claims
        recorded and the OSCORE material derived. The remainder of the
        payload is processed as regular OSCORE.

        Raises :class:`error.BadRequest` if message 3 is absent, is not
        well-formed CBOR, carries EAD items, or refers to an unknown
        credential.
        """
        if self._incomplete:
            if not protected_message.opt.edhoc:
                self.log.error(
                    "OSCORE failed: No EDHOC message 3 received and none present"
                )
                raise error.BadRequest("EDHOC incomplete")

            payload_stream = io.BytesIO(protected_message.payload)
            try:
                # discarding result -- just need to have a point to split
                _ = cbor2.load(payload_stream)
            except cbor2.CBORDecodeError as e:
                # The payload is peer-controlled; malformed input is the
                # peer's error, not an internal one
                self.log.error("Aborting EDHOC: message 3 is not well-formed CBOR")
                raise error.BadRequest from e
            m3len = payload_stream.tell()
            message_3 = protected_message.payload[:m3len]

            id_cred_i, ead_3 = self._responder.parse_message_3(message_3)
            if ead_3 is not None:
                self.log.error("Aborting EDHOC: EAD3 present")
                raise error.BadRequest

            try:
                (cred_i, claims) = self._server_credentials.find_edhoc_by_id_cred_peer(
                    id_cred_i
                )
            except KeyError as e:
                self.log.error(
                    "Aborting EDHOC: No credentials found for client with id_cred_i=h'%s'",
                    id_cred_i.hex(),
                )
                raise error.BadRequest from e

            self._responder.verify_message_3(cred_i)

            # Recording the claims only now: if verification raised, the
            # peer has not proven possession of the credential's key, and
            # none of its claims may show up as authenticated.
            self.authenticated_claims.extend(claims)

            self._make_ready(self._responder, self.recipient_id, self.sender_id)
            self._incomplete = False

            protected_message = protected_message.copy(
                edhoc=False, payload=protected_message.payload[m3len:]
            )

        return super().unprotect(protected_message, request_id)

504 

505 

class OwnCredStyle(enum.Enum):
    """Guidance for how the own credential should be sent in an EDHOC
    exchange"""

    ByKeyId = "by-key-id"
    ByValue = "by-value"

    def as_lakers(self):
        """Convert the enum into Lakers' representation of the same concept.

        The types may eventually be unified, but so far, Lakers doesn't make
        the distinctions we expect to make yet."""
        if self is OwnCredStyle.ByKeyId:
            # FIXME: Mismatch to be fixed in lakers -- currently the only way
            # it allows sending by reference is by Key ID
            return lakers.CredentialTransfer.ByReference
        if self is OwnCredStyle.ByValue:
            return lakers.CredentialTransfer.ByValue
        raise RuntimeError("enum variant not covered")