Coverage for aiocoap/edhoc.py: 83%

250 statements  

« prev     ^ index     » next       coverage.py v7.6.3, created at 2024-10-15 22:10 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5"""Internal module containing types used inside EDHOC security contexts""" 

6 

7import abc 

8import enum 

9import io 

10from pathlib import Path 

11import random 

12from typing import Optional, Dict, Literal 

13import os 

14 

15import cbor2 

16import lakers 

17 

18from . import oscore, credentials, error 

19from . import Message 

20from .numbers import POST 

21 

22 

def load_cbor_or_edn(filename: Path):
    """Load one CBOR item from a file that holds either binary CBOR or EDN.

    The file is first decoded as binary CBOR; that result is only used if a
    single item spans the complete file. Otherwise the file is read as text
    and converted from CBOR Diagnostic Notation (EDN).

    :param filename: Path of the file to read.
    :returns: The decoded Python object.
    :raises credentials.CredentialsLoadError: If the content is neither a
        complete CBOR item nor text that ``cbor_diag`` can convert.
    """
    import cbor_diag
    import cbor2

    with filename.open("rb") as binary:
        try:
            result = cbor2.load(binary)
        except cbor2.CBORDecodeError:
            pass
        else:
            if binary.read(1) == b"":
                return result
            # else it apparently hasn't been CBOR all through...

    # Decode explicitly as UTF-8 instead of the locale's default encoding, so
    # that the same file loads identically on every platform. A file that is
    # not valid UTF-8 raises UnicodeDecodeError, which is a ValueError and is
    # thus reported through the same error path as unparsable EDN.
    # NOTE(review): assumes EDN files are UTF-8 encoded -- confirm.
    with filename.open(encoding="utf-8") as textual:
        try:
            converted = cbor_diag.diag2cbor(textual.read())
        except ValueError as err:
            raise credentials.CredentialsLoadError(
                "Data loaded from %s was recognized neither as CBOR nor CBOR Diagnostic Notation (EDN)"
                % filename
            ) from err
    # no need to check for completeness: diag2cbor doesn't do diagnostic
    # sequences, AIU that's not even a thing
    return cbor2.loads(converted)

48 

49 

class CoseKeyForEdhoc:
    """Private key in the subset of COSE_Key shapes accepted by this module.

    Only key type 2 (EC) on curve 1 (P-256) with a 32-byte private value is
    handled.
    """

    # COSE map label 1
    kty: int
    # COSE map label -1
    crv: int
    # COSE map label -4
    d: bytes

    @classmethod
    def from_file(cls, filename: Path) -> "CoseKeyForEdhoc":
        """Read a private key from a CBOR or EDN file.

        The file is refused when its mode has any group or world permission
        bit set.

        :raises credentials.CredentialsLoadError: If the permissions are too
            wide, or the content is rejected while loading.
        """
        group_or_world_bits = filename.stat().st_mode & 0o077
        if group_or_world_bits != 0:
            raise credentials.CredentialsLoadError(
                "Refusing to load private key that is group or world accessible"
            )

        return cls.from_map(load_cbor_or_edn(filename))

    @classmethod
    def from_map(cls, key: dict) -> "CoseKeyForEdhoc":
        """Build a key from an already decoded COSE_Key style map.

        The map needs to contain exactly the labels 1 (kty, value 2), -1
        (crv, value 1) and -4 (d, 32 bytes).

        :raises credentials.CredentialsLoadError: On any deviation from that
            shape.
        """
        if not isinstance(key, dict):
            raise credentials.CredentialsLoadError(
                "Data is not shaped like COSE_KEY (expected top-level dictionary)"
            )
        if 1 not in key:
            raise credentials.CredentialsLoadError(
                "Data is not shaped like COSE_KEY (expected key 1 (kty) in top-level dictionary)"
            )

        key_type = key[1]
        if key_type != 2:
            raise credentials.CredentialsLoadError(
                "Private key type %s is not supported (currently only 2 (EC) is supported)"
                % (key_type,)
            )

        curve = key.get(-1)
        if curve != 1:
            raise credentials.CredentialsLoadError(
                "Private key of type EC requires key -1 (crv), currently supported values: 1 (P-256)"
            )

        private_value = key.get(-4)
        if not (isinstance(private_value, bytes) and len(private_value) == 32):
            raise credentials.CredentialsLoadError(
                "Private key of type EC P-256 requires key -4 (d) to be a 32-byte long byte string"
            )

        known_labels = (1, -1, -4)
        if not all(label in known_labels for label in key):
            raise credentials.CredentialsLoadError(
                "Extraneous data in key, consider allow-listing the item if acceptable"
            )

        loaded = cls()
        # kty: EC, crv: P-256
        loaded.kty, loaded.crv, loaded.d = 2, 1, private_value
        return loaded

    def secret_to_map(self) -> dict:
        """Return the private key as a map of kty (1), crv (-1) and d (-4)."""
        as_map = {1: self.kty}
        as_map[-1] = self.crv
        as_map[-4] = self.d
        return as_map

    # Should we deprecate filename, add a generate_in_file method? (It's there
    # because generate originally depended on a file system)
    @classmethod
    def generate(cls, filename: Optional[Path] = None) -> "CoseKeyForEdhoc":
        """Generate a fresh P-256 key, storing it in a file if one is given.

        When ``filename`` is given, the file is created exclusively (an
        existing file makes the call fail) with mode 0o600, and the key is
        written to it as CBOR. If writing fails, the file is removed again.

        This returns the generated private key.
        """

        from cryptography.hazmat.primitives.asymmetric import ec

        generated = ec.generate_private_key(curve=ec.SECP256R1())
        secret_number = generated.private_numbers().private_value

        fresh = cls()
        fresh.kty = 2  # EC
        fresh.crv = 1  # P-256
        fresh.d = secret_number.to_bytes(32, "big")

        if filename is None:
            return fresh

        # O_BINARY only exists on some platforms; it contributes nothing
        # where it is absent
        open_flags = (
            os.O_WRONLY | os.O_CREAT | os.O_EXCL | getattr(os, "O_BINARY", 0)
        )
        descriptor = os.open(filename, open_flags, mode=0o600)
        try:
            with open(descriptor, "wb") as keyfile:
                cbor2.dump(fresh.secret_to_map(), keyfile)
        except Exception:
            filename.unlink()
            raise

        return fresh

    def as_ccs(
        self, kid: Optional[bytes], subject: Optional[str]
    ) -> Dict[Literal[14], dict]:
        """Given a key, generate a corresponding KCCS

        The public key is derived from the private value; ``kid`` and
        ``subject`` are only placed in the result when they are not None.
        """

        from cryptography.hazmat.primitives.asymmetric import ec

        scalar = int.from_bytes(self.d, "big")
        public_numbers = (
            ec.derive_private_key(scalar, ec.SECP256R1())
            .public_key()
            .public_numbers()
        )

        # kty: EC2, crv: P-256, x, y
        cosekey = {
            1: 2,
            -1: 1,
            -2: public_numbers.x.to_bytes(32, "big"),
            -3: public_numbers.y.to_bytes(32, "big"),
        }
        if kid is not None:
            cosekey[2] = kid

        # cnf: COSE_Key
        credential_kccs: dict = {8: {1: cosekey}}
        if subject is not None:
            credential_kccs[2] = subject

        # kccs: cnf
        return {14: credential_kccs}

163 

164 

class EdhocCredentials(credentials._Objectish):
    """Credentials entry describing how to run EDHOC with a peer.

    It holds the own key and credential (if any), the expected peer
    credential (if any), and can run the initiator side of an exchange
    through :meth:`establish_context`.
    """

    # Private key matching own_cred; None when no own credentials are set
    own_key: Optional[CoseKeyForEdhoc]
    suite: int
    method: int
    # Own credential; establish_context requires the shape {14: ...}
    own_cred: Optional[dict]
    # Expected peer credential, or {"unauthenticated": True}
    peer_cred: Optional[dict]

    def __init__(
        self,
        suite: int,
        method: int,
        own_cred_style: Optional[str] = None,
        peer_cred: Optional[dict] = None,
        own_cred: Optional[dict] = None,
        private_key_file: Optional[str] = None,
        private_key: Optional[dict] = None,
    ):
        """Set up the credentials from simple data.

        :param own_cred_style: Value of an :class:`OwnCredStyle` variant.
        :param private_key_file: File to load the own private key from;
            mutually exclusive with ``private_key``.
        :param private_key: The own private key as a COSE_Key style map.
        :raises credentials.CredentialsLoadError: If both key sources are
            given, if the key is rejected, or if only some of ``own_cred``,
            ``own_cred_style`` and a private key are given.
        """
        from . import edhoc

        self.suite = suite
        self.method = method
        self.own_cred = own_cred
        self.peer_cred = peer_cred

        if private_key_file is not None and private_key is not None:
            raise credentials.CredentialsLoadError(
                "private_key is mutually exclusive with private_key_file"
            )
        if private_key_file is not None:
            # FIXME: We should carry around a base
            private_key_path = Path(private_key_file)
            # FIXME: We left loading the file to the user, and now we're once more
            # in a position where we guess the file type
            self.own_key = CoseKeyForEdhoc.from_file(private_key_path)
        elif private_key is not None:
            self.own_key = CoseKeyForEdhoc.from_map(private_key)
        else:
            self.own_key = None

        # Own credential, its style and its key are an all-or-nothing set
        if (own_cred is None) != (own_cred_style is None) or (own_cred is None) != (
            self.own_key is None
        ):
            raise credentials.CredentialsLoadError(
                "If own credentials are given, all of own_cred, own_cred_style and private_key(_path) need to be given"
            )

        if own_cred_style is None:
            self.own_cred_style = None
        else:
            self.own_cred_style = edhoc.OwnCredStyle(own_cred_style)

        # FIXME: This is only used on the client side, and expects that all parts (own and peer) are present
        self._established_context = None

    def find_edhoc_by_id_cred_peer(self, id_cred_peer):
        """Return the encoded peer credential if ``id_cred_peer`` refers to it.

        A match is either the canonically encoded credential itself or the
        credential's kid. Returns None when there is no CCS style peer
        credential or when nothing matches.
        """
        if self.peer_cred is None:
            return None
        if 14 not in self.peer_cred:
            # Only recognizing CCS so far
            return None

        ccs = self.peer_cred[14]
        # Encoded once; it is both the by-value comparison operand and the
        # return value of either match
        encoded_ccs = cbor2.dumps(ccs, canonical=True)

        if id_cred_peer == encoded_ccs:
            # credential by value
            return encoded_ccs

        # cnf / COS_Key / kid, should be present in all CCS
        kid = ccs[8][1].get(2)
        if kid is not None and id_cred_peer == kid:
            # credential by kid
            return encoded_ccs

        return None

    def peer_cred_is_unauthenticated(self):
        """Tell whether the peer is configured as ``{"unauthenticated": True}``."""
        # FIXME: This is rather weird internal API, and rather weird
        # format-wise -- but it will suffice until credentials are rewritten.
        return self.peer_cred is not None and self.peer_cred == {
            "unauthenticated": True
        }

    async def establish_context(
        self,
        wire,
        underlying_address,
        underlying_proxy_scheme,
        underlying_uri_host,
        logger,
    ):
        """Run EDHOC as initiator up to the verification of message 2.

        Message 1 is POSTed to ``/.well-known/edhoc`` through ``wire``; the
        response is processed as message 2.

        :returns: An :class:`EdhocInitiatorContext` built from the exchange.
        :raises credentials.CredentialsMissingError: If the own credential is
            not of the shape ``{14: ...}``, if no CCS style peer credential
            is configured for an authenticated peer, or if an
            unauthenticated peer's ID_CRED_R does not decode to a map.
        """
        logger.info(
            "No OSCORE context found for EDHOC context %r, initiating one.", self
        )
        # FIXME: We don't support role reversal yet, but once we
        # register this context to be available for incoming
        # requests, we'll have to pick more carefully
        #
        # A single byte with a value from 0 to 23 (presumably so that it is
        # a one-byte CBOR integer -- TODO confirm)
        c_i = bytes([random.randint(0, 23)])
        initiator = lakers.EdhocInitiator()
        message_1 = initiator.prepare_message_1(c_i)

        msg1 = Message(
            code=POST,
            proxy_scheme=underlying_proxy_scheme,
            uri_host=underlying_uri_host,
            uri_path=[".well-known", "edhoc"],
            payload=cbor2.dumps(True) + message_1,
        )
        msg1.remote = underlying_address
        msg2 = await wire.request(msg1).response_raising

        (c_r, id_cred_r, _ead_2) = initiator.parse_message_2(msg2.payload)

        # An explicit check rather than an assert: this validates
        # configuration, and must not vanish when running under -O
        if not (
            isinstance(self.own_cred, dict) and list(self.own_cred.keys()) == [14]
        ):
            raise credentials.CredentialsMissingError(
                "So far can only process CCS style own credentials a la {14: ...}, own_cred = %r"
                % (self.own_cred,)
            )
        cred_i = cbor2.dumps(self.own_cred[14], canonical=True)
        key_i = self.own_key.d

        logger.debug("EDHOC responder sent message_2 with ID_CRED_R = %r", id_cred_r)
        if self.peer_cred_is_unauthenticated():
            # Not doing further checks (eg. for trailing bytes) or re-raising: This
            # was already checked by lakers
            parsed = cbor2.loads(id_cred_r)

            if not isinstance(parsed, dict):
                raise credentials.CredentialsMissingError(
                    "Peer presented credential-by-reference when no credential was pre-agreed"
                )

            cred_r = id_cred_r
        else:
            # We could look into id_cred_r, which is a CBOR encoded
            # byte string, and could start comparing ... but actually
            # EDHOC and Lakers protect us from misbinding attacks (is
            # that what they are called?), so we can just put in our
            # expected credential here
            #
            # FIXME: But looking into it might give us a better error than just
            # "Mac2 verification failed"

            # Without this, a missing or non-CCS peer credential would
            # surface as a bare TypeError / KeyError from the lookup below
            if not isinstance(self.peer_cred, dict) or 14 not in self.peer_cred:
                raise credentials.CredentialsMissingError(
                    "So far can only process CCS style peer credentials a la {14: ...}, peer_cred = %r"
                    % (self.peer_cred,)
                )
            cred_r = cbor2.dumps(self.peer_cred[14], canonical=True)

        initiator.verify_message_2(
            key_i,
            cred_i,
            cred_r,
        )  # odd that we provide that here rather than in the next function

        logger.debug("Message 2 was verified")

        return EdhocInitiatorContext(initiator, c_i, c_r, self.own_cred_style, logger)

315 

316 

class _EdhocContextBase(
    oscore.CanProtect, oscore.CanUnprotect, oscore.SecurityContextUtils
):
    """Common parts of the OSCORE contexts that take their keys from EDHOC."""

    def __init__(self, logger):
        self.log = logger

    def post_seqnoincrease(self):
        """Do nothing: The context is not persisted."""

    def protect(self, message, request_id=None, *, kid_context=True):
        """Protect a message, prefixing a pending message 3 if there is one.

        When :meth:`message_3_to_include` produces something, the EDHOC
        option is set on the outer message and its payload is prefixed with
        that message 3.
        """
        protected = super().protect(
            message, request_id=request_id, kid_context=kid_context
        )
        outer_message, request_id = protected

        pending_message_3 = self.message_3_to_include()
        if pending_message_3 is None:
            return outer_message, request_id

        outer_message.opt.edhoc = True
        outer_message.payload = pending_message_3 + outer_message.payload
        return outer_message, request_id

    def _make_ready(self, edhoc_context, c_ours, c_theirs):
        """Populate the OSCORE parameters from a sufficiently advanced
        EDHOC state.

        :param edhoc_context: Lakers object whose ``edhoc_exporter`` provides
            master secret and master salt.
        :param c_ours: Own connection identifier, used as recipient ID.
        :param c_theirs: Peer's connection identifier, used as sender ID.
        :raises RuntimeError: If the cipher suite is not recognized.
        :raises ValueError: If both identifiers are equal.
        """
        # FIXME: both should offer this
        suite_is_known = (
            isinstance(edhoc_context, lakers.EdhocResponder)
            or edhoc_context.selected_cipher_suite() == 2
        )
        if not suite_is_known:
            raise RuntimeError("Unknown suite")
        self.alg_aead = oscore.algorithms["AES-CCM-16-64-128"]
        self.hashfun = oscore.hashfunctions["sha256"]

        # we did check for critical EADs, there was no out-of-band agreement, so 8 it is
        oscore_salt_length = 8
        # I figure that one would be ageed out-of-band as well (currently no
        # options to set/change this are known)
        self.id_context = None
        self.recipient_replay_window = oscore.ReplayWindow(32, lambda: None)

        master_secret = edhoc_context.edhoc_exporter(0, [], self.alg_aead.key_bytes)
        master_salt = edhoc_context.edhoc_exporter(1, [], oscore_salt_length)

        self.sender_id, self.recipient_id = c_theirs, c_ours
        if self.sender_id == self.recipient_id:
            raise ValueError("Bad IDs: identical ones were picked")

        self.derive_keys(master_salt, master_secret)

        self.sender_sequence_number = 0
        self.recipient_replay_window.initialize_empty()

        self.log.debug("EDHOC context %r ready for OSCORE operation", self)

    @abc.abstractmethod
    def message_3_to_include(self) -> Optional[bytes]:
        """An encoded message_3 to include in outgoing messages

        This may modify self to only return something once."""

375 

376 

class EdhocInitiatorContext(_EdhocContextBase):
    """An OSCORE context that is derived from an EDHOC exchange.

    The EDHOC exchange does not need to be complete for this: an initiator
    can set the context up as soon as message 2 has been received. Message 3
    is prepared at setup time, and handed out once through
    :meth:`message_3_to_include`, i.e. with the first request protected
    through this context."""

    # FIXME: Should we rather send it with *every* request that is sent before a message 4 is received implicitly?
    def __init__(self, initiator, c_ours, c_theirs, cred_i_mode, logger):
        super().__init__(logger)

        # Only this line is role specific
        prepared = initiator.prepare_message_3(cred_i_mode.as_lakers(), None)
        self._message_3, _i_prk_out = prepared

        self._make_ready(initiator, c_ours, c_theirs)

    def message_3_to_include(self) -> Optional[bytes]:
        """Hand out the prepared message 3 on the first call, None later."""
        pending, self._message_3 = self._message_3, None
        return pending

402 

403 

class EdhocResponderContext(_EdhocContextBase):
    """Responder side OSCORE context derived from an EDHOC exchange.

    The context is created before message 3 has arrived. It stays
    incomplete, refusing to protect messages, until an incoming message
    carries a message 3 that is verified in :meth:`unprotect`.
    """

    def __init__(self, responder, c_i, c_r, server_credentials, logger):
        super().__init__(logger)

        # storing them where they will later be overwritten with themselves
        self.recipient_id = c_r
        self.sender_id = c_i

        self._responder = responder
        # Through these we'll look up id_cred_i
        self._server_credentials = server_credentials

        self.authenticated_claims = []

        # Not sure why mypy even tolerates this -- we're clearly not ready for
        # a general protect/unprotect, and things only work because all
        # relevant functions get their checks introduced
        self._incomplete = True

    def message_3_to_include(self) -> Optional[bytes]:
        # as a responder we never send one
        return None

    def get_oscore_context_for(self, unprotected):
        """Return self if the unprotected data selects this context by its
        KID without any KID context, None otherwise."""
        if oscore.COSE_KID_CONTEXT in unprotected:
            return None
        if unprotected.get(oscore.COSE_KID) == self.recipient_id:
            return self
        return None

    def find_all_used_contextless_oscore_kid(self) -> set[bytes]:
        """Return the set containing just this context's recipient ID."""
        return {self.recipient_id}

    def protect(self, *args, **kwargs):
        """Protect a message.

        :raises RuntimeError: While message 3 has not been processed.
        """
        if self._incomplete:
            raise RuntimeError(
                "EDHOC has not completed yet, waiting for message 3, can not protect own messages yet"
            )
        return super().protect(*args, **kwargs)

    def unprotect(self, protected_message, request_id=None):
        """Unprotect a message, completing EDHOC first if that is pending.

        While the context is incomplete, the message needs to have the EDHOC
        option set and carry message 3 as the first CBOR item of its payload.
        That message 3 is parsed and verified, the context is made ready, and
        the remaining payload is unprotected as usual.

        :raises error.BadRequest: If message 3 is absent, is not decodable
            CBOR, carries EAD3, or no credentials are found for its
            ID_CRED_I.
        """
        if self._incomplete:
            if not protected_message.opt.edhoc:
                self.log.error(
                    "OSCORE failed: No EDHOC message 3 received and none present"
                )
                raise error.BadRequest("EDHOC incomplete")

            payload_stream = io.BytesIO(protected_message.payload)
            # discarding result -- just need to have a point to split
            try:
                cbor2.load(payload_stream)
            except cbor2.CBORDecodeError:
                # The payload comes from the network; report garbage the
                # same way as the other message 3 failures
                self.log.error("Aborting EDHOC: message 3 is not decodable CBOR")
                raise error.BadRequest("EDHOC message 3 malformed")
            m3len = payload_stream.tell()
            message_3 = protected_message.payload[:m3len]

            id_cred_i, ead_3 = self._responder.parse_message_3(message_3)
            if ead_3 is not None:
                self.log.error("Aborting EDHOC: EAD3 present")
                raise error.BadRequest

            try:
                # NOTE(review): this expects a (credential, claims) pair and
                # a KeyError on failure -- confirm against the type actually
                # passed as server_credentials
                (cred_i, claims) = self._server_credentials.find_edhoc_by_id_cred_peer(
                    id_cred_i
                )
            except KeyError:
                self.log.error(
                    "Aborting EDHOC: No credentials found for client with id_cred_i=h'%s'",
                    id_cred_i.hex(),
                )
                raise error.BadRequest

            self._responder.verify_message_3(cred_i)

            # Only recorded once message 3 has been verified, so that a
            # failed verification does not leave claims on this context
            self.authenticated_claims.extend(claims)

            self._make_ready(self._responder, self.recipient_id, self.sender_id)
            self._incomplete = False

            protected_message = protected_message.copy(
                edhoc=False, payload=protected_message.payload[m3len:]
            )

        return super().unprotect(protected_message, request_id)

485 

486 

class OwnCredStyle(enum.Enum):
    """Guidance for how the own credential should be sent in an EDHOC
    exchange"""

    ByKeyId = "by-key-id"
    ByValue = "by-value"

    def as_lakers(self):
        """Convert the enum into Lakers' representation of the same concept.

        The types may eventually be unified, but so far, Lakers doesn't make
        the distinctions we expect to make yet."""
        if self is OwnCredStyle.ByKeyId:
            # FIXME: Mismatch to be fixed in lakers -- currently the only way
            # it allows sending by reference is by Key ID
            return lakers.CredentialTransfer.ByReference

        if self is OwnCredStyle.ByValue:
            return lakers.CredentialTransfer.ByValue

        raise RuntimeError("enum variant not covered")