Coverage for aiocoap/oscore.py: 85%

1128 statements  

« prev     ^ index     » next       coverage.py v7.6.8, created at 2024-11-28 12:34 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5"""This module contains the tools to send OSCORE secured messages. 

6 

7It only deals with the algorithmic parts, the security context and protection 

8and unprotection of messages. It does not touch on the integration of OSCORE in 

9the larger aiocoap stack of having a context or requests; that's what 

10:mod:`aiocoap.transports.oscore` is for.""" 

11 

12from __future__ import annotations 

13 

14from collections import namedtuple 

15import io 

16import json 

17import binascii 

18import os 

19import os.path 

20import tempfile 

21import abc 

22from typing import Optional, List, Any, Tuple 

23import secrets 

24import warnings 

25 

26from aiocoap.message import Message 

27from aiocoap.util import cryptography_additions, deprecation_getattr, Sentinel 

28from aiocoap.numbers import GET, POST, FETCH, CHANGED, UNAUTHORIZED, CONTENT 

29from aiocoap import error 

30from . import credentials 

31 

32from cryptography.hazmat.primitives.ciphers import aead 

33from cryptography.hazmat.primitives.kdf.hkdf import HKDF 

34from cryptography.hazmat.primitives import ciphers, hashes 

35import cryptography.hazmat.backends 

36import cryptography.exceptions 

37from cryptography.hazmat.primitives import asymmetric, serialization 

38from cryptography.hazmat.primitives.asymmetric.utils import ( 

39 decode_dss_signature, 

40 encode_dss_signature, 

41) 

42 

43import cbor2 as cbor 

44 

45import filelock 

46 

# Largest value that fits into 40 bits (5 bytes); the nonce construction in
# this module pads partial IVs to 5 bytes.
MAX_SEQNO = 2**40 - 1

# Relevant values from the IANA registry "CBOR Object Signing and Encryption (COSE)"
COSE_KID = 4
COSE_PIV = 6
COSE_KID_CONTEXT = 10
# from RFC9338
COSE_COUNTERSIGNATURE0 = 12
# from RFC9528
COSE_KCCS = 14

# Layout of the first byte of the compressed OSCORE option:
# the low three bits carry the length of the partial IV, ...
COMPRESSION_BITS_N = 0b111
# ... this bit is set when a KID is present, ...
COMPRESSION_BIT_K = 0b1000
# ... this bit is set when a KID context is present, ...
COMPRESSION_BIT_H = 0b10000
# ... and this bit is set when a countersignature accompanies the message.
COMPRESSION_BIT_GROUP = 0b100000  # Group Flag from draft-ietf-core-oscore-groupcomm-21
# The two highest bits are not assigned by any of the flags above.
COMPRESSION_BITS_RESERVED = 0b11000000

# Map keys used when building and parsing CCS credentials: the CNF claim, the
# COSE Key inside it, and the members of that COSE Key.
CWT_CLAIM_CNF = 8
CWT_CNF_COSE_KEY = 1
COSE_KEY_COMMON_KTY = 1
COSE_KTY_OKP = 1
COSE_KTY_EC2 = 2
COSE_KEY_COMMON_ALG = 3
COSE_KEY_OKP_CRV = -1
COSE_KEY_OKP_X = -2
COSE_KEY_EC2_X = -2
COSE_KEY_EC2_Y = -3

# While the original values were simple enough to be used in literals, starting
# with oscore-groupcomm we're using more compact values

INFO_TYPE_KEYSTREAM_REQUEST = True
INFO_TYPE_KEYSTREAM_RESPONSE = False

# Placeholder object for a value that is known to exist but is filled in later
PRESENT_BUT_NO_VALUE_YET = Sentinel("Value will be populated later")

82 

83 

class CodeStyle(namedtuple("_CodeStyle", ("request", "response"))):
    """A request code together with the response code that goes with it.

    Two instances exist as class attributes: ``FETCH_CONTENT`` and
    ``POST_CHANGED``.
    """

    FETCH_CONTENT: CodeStyle
    POST_CHANGED: CodeStyle

    @classmethod
    def from_request(cls, request) -> CodeStyle:
        """Return the style whose request code is `request`.

        Raises ValueError for any code other than FETCH or POST."""
        if request == FETCH:
            return cls.FETCH_CONTENT
        if request == POST:
            return cls.POST_CHANGED
        raise ValueError("Invalid request code %r" % request)


# The instances can only be created once the class itself exists
CodeStyle.FETCH_CONTENT = CodeStyle(FETCH, CONTENT)
CodeStyle.POST_CHANGED = CodeStyle(POST, CHANGED)

100 

101 

class DeterministicKey:
    """Singleton to indicate that for this key member no public or private key
    is available because it is the Deterministic Client (see
    <https://www.ietf.org/archive/id/draft-amsuess-core-cachable-oscore-01.html>)

    This is highly experimental not only from an implementation but also from a
    specification point of view. The specification has not received adequate
    review that would justify using it in any non-experimental scenario.
    """


# The one instance that is handed around; the class name is deleted from the
# module namespace right afterwards, leaving this the only instance reachable
# by name.
DETERMINISTIC_KEY = DeterministicKey()
del DeterministicKey

115 

116 

class NotAProtectedMessage(error.Error, ValueError):
    """Signals that verification was attempted on a message that is not an
    OSCORE message.

    The second constructor argument is kept available as ``plain_message``."""

    def __init__(self, message, plain_message):
        self.plain_message = plain_message
        super().__init__(message)

123 

124 

class ProtectionInvalid(error.Error, ValueError):
    """Raised when verification of an OSCORE message fails.

    This is the base class of the more specific verification errors in this
    module."""

127 

128 

class DecodeError(ProtectionInvalid):
    """Raised when verification of an OSCORE message fails because CBOR or
    compressed data were erroneous"""

131 

132 

class ReplayError(ProtectionInvalid):
    """Raised when verification of an OSCORE message fails because the
    sequence number was already used"""

135 

136 

class ReplayErrorWithEcho(ProtectionInvalid, error.RenderableError):
    """Raised when an OSCORE message can not be verified because the recipient
    replay window is uninitialized.

    The exception carries everything needed to build a protected 4.01 response
    with an Echo option, through which the client can assist in recovering the
    replay window."""

    def __init__(self, secctx, request_id, echo):
        self.echo = echo
        self.request_id = request_id
        self.secctx = secctx

    def to_message(self):
        """Build the 4.01 Unauthorized response carrying the Echo value, and
        return it protected by the stored security context."""
        unprotected_response = Message(code=UNAUTHORIZED, echo=self.echo)
        protected_response, _ = self.secctx.protect(
            unprotected_response, request_id=self.request_id
        )
        return protected_response

155 

156 

class ContextUnavailable(error.Error, ValueError):
    """Raised when a context is (currently or permanently) unavailable for
    protecting or unprotecting a message"""

160 

161 

class RequestIdentifiers:
    """Details of a request that its response needs for (un)protection.

    Instances travel from the (un)protection of a request to the
    (un)protection of the matching response; they carry the request's partial
    IV and thus make the request-response binding work.

    Users of this module should never create or interact with instances, but
    just pass them around.
    """

    def __init__(self, kid, partial_iv, nonce, can_reuse_nonce, request_code):
        self.kid = kid
        self.partial_iv = partial_iv
        self.nonce = nonce
        self.can_reuse_nonce = can_reuse_nonce
        # Raises ValueError if the request code is neither FETCH nor POST
        self.code_style = CodeStyle.from_request(request_code)

        # Stays None unless something else assigns a hash later
        self.request_hash = None

    def get_reusable_nonce_and_piv(self):
        """Hand out the nonce and partial IV for reuse, at most once.

        While can_reuse_nonce is true, this returns ``(nonce, partial_iv)``
        and clears the flag; otherwise it returns ``(None, None)``."""

        if not self.can_reuse_nonce:
            return (None, None)

        self.can_reuse_nonce = False
        return (self.nonce, self.partial_iv)

190 

191 

192def _xor_bytes(a, b): 

193 assert len(a) == len(b), "XOR needs consistent lengths" 

194 # FIXME is this an efficient thing to do, or should we store everything 

195 # that possibly needs xor'ing as long integers with an associated length? 

196 return bytes(_a ^ _b for (_a, _b) in zip(a, b)) 

197 

198 

class SymmetricEncryptionAlgorithm(metaclass=abc.ABCMeta):
    """Interface of a symmetric algorithm.

    The API is that of an AEAD with additional authenticated data, but an
    algorithm may or may not actually verify that data. Those that do verify
    it are recognizable by also being an AeadAlgorithm.
    """

    value: int
    key_bytes: int
    tag_bytes: int
    iv_bytes: int

    @abc.abstractmethod
    def encrypt(cls, plaintext, aad, key, iv):
        """Return ciphertext + tag for given input data"""

    @abc.abstractmethod
    def decrypt(cls, ciphertext_and_tag, aad, key, iv):
        """Reverse encryption. Must raise ProtectionInvalid on any error
        stemming from untrusted data."""

    @staticmethod
    def _build_encrypt0_structure(protected, external_aad):
        """Serialize the Encrypt0 structure for an empty protected bucket and
        the given external AAD."""
        assert protected == {}, "Unexpected data in protected bucket"
        # The protected bucket is represented by an empty byte string, not by
        # the CBOR dump of an empty dict.
        return cbor.dumps(["Encrypt0", b"", external_aad])

228 

229 

class AeadAlgorithm(SymmetricEncryptionAlgorithm, metaclass=abc.ABCMeta):
    """A symmetric algorithm that provides authentication, including
    authentication of additional data.

    The class adds no members of its own; it only marks algorithms as
    authenticating."""

233 

234 

class AES_CBC(SymmetricEncryptionAlgorithm, metaclass=abc.ABCMeta):
    """AES in CBC mode using the Python cryptography library.

    Plaintexts are padded according to RFC 5652 Section 6.3 before
    encryption, and the padding is checked and removed on decryption. The
    additional authenticated data is accepted to satisfy the common interface
    but is not used in any way; no tag is produced.
    """

    tag_bytes = 0
    iv_bytes = 0
    # This introduces padding -- this library doesn't need to care because
    # Python does allocation for us, but others may need to rethink their
    # buffer allocation strategies.

    # Size of an AES block in bytes. This is 16 for every AES key length, and
    # it (rather than the key length) is the "k" that RFC 5652 pads to.
    _block_bytes = 16

    @classmethod
    def _cipher(cls, key, iv):
        """Create the AES-CBC cipher object for the given key and IV."""
        return ciphers.base.Cipher(
            ciphers.algorithms.AES(key),
            ciphers.modes.CBC(iv),
        )

    @classmethod
    def encrypt(cls, plaintext, _aad, key, iv):
        """Pad the plaintext to a whole number of blocks and return its
        encryption. The AAD argument is ignored."""
        # FIXME: Ignoring aad violates https://www.rfc-editor.org/rfc/rfc9459.html#name-implementation-consideratio but is required for Group OSCORE

        # Padding according to https://www.rfc-editor.org/rfc/rfc5652#section-6.3
        k = cls._block_bytes
        # Always between 1 and k: an already aligned plaintext receives a
        # full block of padding.
        pad_byte = k - (len(plaintext) % k)
        # Build a new object rather than extending the argument in place, so
        # that a mutable buffer passed in by the caller stays untouched.
        padded = plaintext + bytes((pad_byte,)) * pad_byte

        encryptor = cls._cipher(key, iv).encryptor()
        result = encryptor.update(padded)
        result += encryptor.finalize()
        return result

    @classmethod
    def decrypt(cls, ciphertext_and_tag, _aad, key, iv):
        """Decrypt and strip the padding. The AAD argument is ignored.

        Raises ProtectionInvalid if the ciphertext is empty or not a whole
        number of blocks, or if the padding found after decryption is
        malformed."""
        # FIXME: Ignoring aad violates https://www.rfc-editor.org/rfc/rfc9459.html#name-implementation-consideratio but is required for Group OSCORE

        k = cls._block_bytes
        if ciphertext_and_tag == b"" or len(ciphertext_and_tag) % k != 0:
            raise ProtectionInvalid("Message length does not match padding")

        decryptor = cls._cipher(key, iv).decryptor()
        result = decryptor.update(ciphertext_and_tag)
        result += decryptor.finalize()

        # Padding according to https://www.rfc-editor.org/rfc/rfc5652#section-6.3
        # (result is not empty here, as the ciphertext was not)
        claimed_padding = result[-1]
        if claimed_padding == 0 or claimed_padding > k:
            raise ProtectionInvalid("Padding does not match key")
        if result[-claimed_padding:] != bytes((claimed_padding,)) * claimed_padding:
            raise ProtectionInvalid("Padding is inconsistent")

        return result[:-claimed_padding]

289 

290 

class A128CBC(AES_CBC):
    """AES-CBC with a 128-bit key and a 16-octet nonce"""

    # from RFC9459
    value = -65531
    key_bytes = 16  # 128-bit key
    iv_bytes = 16  # 16-octet nonce

296 

297 

class AES_CCM(AeadAlgorithm, metaclass=abc.ABCMeta):
    """AES-CCM implemented using the Python cryptography library.

    Concrete subclasses provide the key, tag and nonce lengths."""

    @classmethod
    def encrypt(cls, plaintext, aad, key, iv):
        """Return ciphertext + tag, with the tag length taken from the class"""
        cipher = aead.AESCCM(key, cls.tag_bytes)
        return cipher.encrypt(iv, plaintext, aad)

    @classmethod
    def decrypt(cls, ciphertext_and_tag, aad, key, iv):
        """Return the plaintext, or raise ProtectionInvalid if the tag does
        not verify."""
        cipher = aead.AESCCM(key, cls.tag_bytes)
        try:
            plaintext = cipher.decrypt(iv, ciphertext_and_tag, aad)
        except cryptography.exceptions.InvalidTag:
            raise ProtectionInvalid("Tag invalid")
        return plaintext

311 

312 

class AES_CCM_16_64_128(AES_CCM):
    """AES-CCM with a 128-bit key, a 64-bit tag and a 13-byte nonce"""

    # from RFC8152 and draft-ietf-core-object-security-0[012] 3.2.1
    value = 10
    key_bytes = 16  # 128-bit key
    tag_bytes = 8  # 64-bit tag
    iv_bytes = 13  # 13-byte nonce

319 

320 

class AES_CCM_16_64_256(AES_CCM):
    """AES-CCM with a 256-bit key, a 64-bit tag and a 13-byte nonce"""

    # from RFC8152
    value = 11
    key_bytes = 32  # 256-bit key
    tag_bytes = 8  # 64-bit tag
    iv_bytes = 13  # 13-byte nonce

327 

328 

class AES_CCM_64_64_128(AES_CCM):
    """AES-CCM with a 128-bit key, a 64-bit tag and a 7-byte nonce"""

    # from RFC8152
    value = 12
    key_bytes = 16  # 128-bit key
    tag_bytes = 8  # 64-bit tag
    iv_bytes = 7  # 7-byte nonce

335 

336 

class AES_CCM_64_64_256(AES_CCM):
    """AES-CCM with a 256-bit key, a 64-bit tag and a 7-byte nonce"""

    # from RFC8152
    value = 13
    key_bytes = 32  # 256-bit key
    tag_bytes = 8  # 64-bit tag
    iv_bytes = 7  # 7-byte nonce

343 

344 

class AES_CCM_16_128_128(AES_CCM):
    """AES-CCM with a 128-bit key, a 128-bit tag and a 13-byte nonce"""

    # from RFC8152
    value = 30
    key_bytes = 16  # 128-bit key
    tag_bytes = 16  # 128-bit tag
    iv_bytes = 13  # 13-byte nonce

351 

352 

class AES_CCM_16_128_256(AES_CCM):
    """AES-CCM with a 256-bit key, a 128-bit tag and a 13-byte nonce"""

    # from RFC8152
    value = 31
    key_bytes = 32  # 256-bit key
    tag_bytes = 16  # 128-bit tag
    iv_bytes = 13  # 13-byte nonce

359 

360 

class AES_CCM_64_128_128(AES_CCM):
    """AES-CCM with a 128-bit key, a 128-bit tag and a 7-byte nonce"""

    # from RFC8152
    value = 32
    key_bytes = 16  # 128-bit key
    tag_bytes = 16  # 128-bit tag
    iv_bytes = 7  # 7-byte nonce

367 

368 

class AES_CCM_64_128_256(AES_CCM):
    """AES-CCM with a 256-bit key, a 128-bit tag and a 7-byte nonce"""

    # from RFC8152
    value = 33
    key_bytes = 32  # 256-bit key
    tag_bytes = 16  # 128-bit tag
    iv_bytes = 7  # 7-byte nonce

375 

376 

class AES_GCM(AeadAlgorithm, metaclass=abc.ABCMeta):
    """AES-GCM implemented using the Python cryptography library.

    Concrete subclasses provide the key and tag lengths."""

    iv_bytes = 12  # 96 bits fixed size of the nonce

    @classmethod
    def encrypt(cls, plaintext, aad, key, iv):
        """Return ciphertext + tag for given input data"""
        cipher = aead.AESGCM(key)
        return cipher.encrypt(iv, plaintext, aad)

    @classmethod
    def decrypt(cls, ciphertext_and_tag, aad, key, iv):
        """Return the plaintext, or raise ProtectionInvalid if the tag does
        not verify."""
        cipher = aead.AESGCM(key)
        try:
            plaintext = cipher.decrypt(iv, ciphertext_and_tag, aad)
        except cryptography.exceptions.InvalidTag:
            raise ProtectionInvalid("Tag invalid")
        return plaintext

392 

393 

class A128GCM(AES_GCM):
    """AES-GCM with a 128-bit key and a 128-bit tag"""

    # from RFC8152
    value = 1
    key_bytes = 16  # 128-bit key
    tag_bytes = 16  # 128-bit tag

399 

400 

class A192GCM(AES_GCM):
    """AES-GCM with a 192-bit key and a 128-bit tag"""

    # from RFC8152
    value = 2
    key_bytes = 24  # 192-bit key
    tag_bytes = 16  # 128-bit tag

406 

407 

class A256GCM(AES_GCM):
    """AES-GCM with a 256-bit key and a 128-bit tag"""

    # from RFC8152
    value = 3
    key_bytes = 32  # 256-bit key
    tag_bytes = 16  # 128-bit tag

413 

414 

class ChaCha20Poly1305(AeadAlgorithm):
    """ChaCha20/Poly1305 implemented using the Python cryptography library"""

    # from RFC8152
    value = 24
    key_bytes = 32  # 256-bit key
    tag_bytes = 16  # 128-bit tag
    iv_bytes = 12  # 96-bit nonce

    @classmethod
    def encrypt(cls, plaintext, aad, key, iv):
        """Return ciphertext + tag for given input data"""
        cipher = aead.ChaCha20Poly1305(key)
        return cipher.encrypt(iv, plaintext, aad)

    @classmethod
    def decrypt(cls, ciphertext_and_tag, aad, key, iv):
        """Return the plaintext, or raise ProtectionInvalid if the tag does
        not verify."""
        cipher = aead.ChaCha20Poly1305(key)
        try:
            plaintext = cipher.decrypt(iv, ciphertext_and_tag, aad)
        except cryptography.exceptions.InvalidTag:
            raise ProtectionInvalid("Tag invalid")
        return plaintext

432 

433 

class AlgorithmCountersign(metaclass=abc.ABCMeta):
    """A fully parameterized COSE countersign algorithm

    An instance is able to provide all the alg_signature, par_countersign and
    par_countersign_key parameters that go into the Group OSCORE algorithms
    field.
    """

    value: int | str

    @abc.abstractmethod
    def sign(self, body, external_aad, private_key):
        """Return the signature produced by the key when using
        CounterSignature0 as described in draft-ietf-cose-countersign-01"""

    @abc.abstractmethod
    def verify(self, signature, body, external_aad, public_key):
        """Verify a signature in analogy to sign"""

    @abc.abstractmethod
    def generate_with_ccs(self) -> Tuple[Any, bytes]:
        """Return a usable private key along with a CCS describing it"""

    @abc.abstractmethod
    def public_from_private(self, private_key):
        """Given a private key, derive the publishable key"""

    @abc.abstractmethod
    def from_kccs(self, ccs: bytes) -> Any:
        """Given a CCS, extract the public key, or raise a ValueError if the
        credential format does not align with the type.

        The type is not exactly Any, but whichever type is used by this
        algorithm class."""

    @staticmethod
    def _build_countersign_structure(body, external_aad):
        """Return the serialized data that is to be signed for the given body
        and external AAD.

        The two byte strings between the context string and the external AAD
        are left empty."""
        countersign_structure = [
            "CounterSignature0",
            b"",
            b"",
            external_aad,
            body,
        ]
        tobesigned = cbor.dumps(countersign_structure)
        return tobesigned

    # abc.abstractproperty is deprecated since Python 3.3; stacking property on
    # abstractmethod is its replacement. Implementations can still satisfy
    # these by providing a plain class attribute.
    @property
    @abc.abstractmethod
    def signature_length(self) -> int:
        """The length of a signature using this algorithm"""

    @property
    @abc.abstractmethod
    def curve_number(self) -> int:
        """Registered curve number used with this algorithm.

        Only used for verification of credentials' details"""

490 

491 

class AlgorithmStaticStatic(metaclass=abc.ABCMeta):
    """Interface of algorithms that derive a shared secret from one party's
    private key and another party's public key"""

    @abc.abstractmethod
    def staticstatic(self, private_key, public_key):
        """Derive a shared static-static secret from a private and a public key"""

496 

497 

def _from_kccs_common(ccs: bytes) -> dict:
    """Return the COSE Key found in the CNF claim of a CCS.

    Raises ValueError if the CCS is not decodable as CBOR, or if it is not a
    map containing a CNF map that in turn contains a COSE Key map."""

    try:
        claims = cbor.loads(ccs)
    except cbor.CBORDecodeError as e:
        raise ValueError("CCS not in CBOR format") from e

    # Descend one level at a time; anything that is missing or not a dict
    # turns into None and is caught by the final type check.
    cnf = claims.get(CWT_CLAIM_CNF) if isinstance(claims, dict) else None
    cose_key = cnf.get(CWT_CNF_COSE_KEY) if isinstance(cnf, dict) else None

    if not isinstance(cose_key, dict):
        raise ValueError("CCS must contain a COSE Key dict in a CNF")

    return cose_key

517 

518 

class Ed25519(AlgorithmCountersign):
    """Countersignatures using Ed25519, implemented using the Python
    cryptography library.

    Private and public keys are passed around in their raw byte form."""

    value = -8
    curve_number = 6

    signature_length = 64

    def sign(self, body, aad, private_key):
        """Sign the countersign structure of body and AAD with the raw private
        key"""
        signer = asymmetric.ed25519.Ed25519PrivateKey.from_private_bytes(private_key)
        to_be_signed = self._build_countersign_structure(body, aad)
        return signer.sign(to_be_signed)

    def verify(self, signature, body, aad, public_key):
        """Check the signature against the raw public key; raises
        ProtectionInvalid on mismatch"""
        verifier = asymmetric.ed25519.Ed25519PublicKey.from_public_bytes(public_key)
        to_be_signed = self._build_countersign_structure(body, aad)
        try:
            verifier.verify(signature, to_be_signed)
        except cryptography.exceptions.InvalidSignature:
            raise ProtectionInvalid("Signature mismatch")

    def _generate(self):
        """Create a new private key and return its raw bytes"""
        # FIXME: We could avoid handing the easy-to-misuse bytes around if the
        # current algorithm interfaces did not insist on passing the
        # exchangable representations -- and generally that should be more
        # efficient.
        fresh_key = asymmetric.ed25519.Ed25519PrivateKey.generate()
        return fresh_key.private_bytes(
            encoding=serialization.Encoding.Raw,
            format=serialization.PrivateFormat.Raw,
            encryption_algorithm=serialization.NoEncryption(),
        )

    def generate_with_ccs(self) -> Tuple[Any, bytes]:
        """Create a new private key, and return it along with a CCS that
        describes the matching public key"""
        private = self._generate()

        # The insertion order of the members is what gets serialized
        cose_key = {
            COSE_KEY_COMMON_KTY: COSE_KTY_OKP,
            COSE_KEY_COMMON_ALG: self.value,
            COSE_KEY_OKP_CRV: self.curve_number,
            COSE_KEY_OKP_X: self.public_from_private(private),
        }
        ccs = cbor.dumps({CWT_CLAIM_CNF: {CWT_CNF_COSE_KEY: cose_key}})

        return (private, ccs)

    def public_from_private(self, private_key):
        """Return the raw public key bytes belonging to a raw private key"""
        # This must not depend on anything on self: the function is also
        # borrowed by another class in this module.
        loaded = asymmetric.ed25519.Ed25519PrivateKey.from_private_bytes(private_key)
        return loaded.public_key().public_bytes(
            encoding=serialization.Encoding.Raw,
            format=serialization.PublicFormat.Raw,
        )

    def from_kccs(self, ccs: bytes) -> Any:
        """Return the raw public key from a CCS, or raise ValueError if the
        key in there is not an OKP key of this algorithm and curve"""
        # eg. {1: 1, 3: -8, -1: 6, -2: h'77 ... 88'}
        cose_key = _from_kccs_common(ccs)

        is_ours = (
            cose_key.get(COSE_KEY_COMMON_KTY) == COSE_KTY_OKP
            and cose_key.get(COSE_KEY_COMMON_ALG) == self.value
            and cose_key.get(COSE_KEY_OKP_CRV) == self.curve_number
            and COSE_KEY_OKP_X in cose_key
        )
        if not is_ours:
            raise ValueError("Key type not recognized from CCS key %r" % cose_key)

        return cose_key[COSE_KEY_OKP_X]

592 

593 

class EcdhSsHkdf256(AlgorithmStaticStatic):
    """Static-static key agreement that takes Ed25519 keys as input and
    converts them to Curve25519 keys for the exchange"""

    # FIXME: This class uses the Edwards keys as private and public keys, and
    # not the converted ones. This will be problematic if pairwise-only
    # contexts are to be set up.

    value = -27

    # FIXME these two will be different when using the Montgomery keys directly

    # This one will only be used when establishing and distributing pairwise-only keys
    public_from_private = Ed25519.public_from_private

    def staticstatic(self, private_key, public_key):
        """Return the shared secret of the own raw Ed25519 private key and
        the peer's raw Ed25519 public key"""
        edwards_private = asymmetric.ed25519.Ed25519PrivateKey.from_private_bytes(
            private_key
        )
        montgomery_private = cryptography_additions.sk_to_curve25519(edwards_private)

        edwards_public = asymmetric.ed25519.Ed25519PublicKey.from_public_bytes(
            public_key
        )
        montgomery_public = cryptography_additions.pk_to_curve25519(edwards_public)

        return montgomery_private.exchange(montgomery_public)

616 

617 

class ECDSA_SHA256_P256(AlgorithmCountersign, AlgorithmStaticStatic):
    """ECDSA with SHA-256 on P-256, usable both for countersignatures and for
    static-static key agreement.

    Unlike with Ed25519, keys are passed around as key objects of the
    cryptography library rather than as bytes."""

    value = -7  # FIXME: when used as a static-static algorithm, does this become -27? see shepherd review.
    curve_number = 1

    signature_length = 64

    # Trying a new construction approach -- should work just as well given
    # we're just passing Python objects around
    def from_public_parts(self, x: bytes, y: bytes):
        """Create a public key from its COSE values"""
        numbers = asymmetric.ec.EllipticCurvePublicNumbers(
            int.from_bytes(x, "big"),
            int.from_bytes(y, "big"),
            asymmetric.ec.SECP256R1(),
        )
        return numbers.public_key()

    def from_kccs(self, ccs: bytes) -> Any:
        """Return the public key from a CCS, or raise ValueError if the key
        in there is not an EC2 key of this algorithm with both coordinates"""
        cose_key = _from_kccs_common(ccs)

        is_ours = (
            cose_key.get(COSE_KEY_COMMON_KTY) == COSE_KTY_EC2
            and cose_key.get(COSE_KEY_COMMON_ALG) == self.value
            and COSE_KEY_EC2_X in cose_key
            and COSE_KEY_EC2_Y in cose_key
        )
        if not is_ours:
            raise ValueError("Key type not recognized from CCS key %r" % cose_key)

        return self.from_public_parts(
            x=cose_key[COSE_KEY_EC2_X],
            y=cose_key[COSE_KEY_EC2_Y],
        )

    def from_private_parts(self, x: bytes, y: bytes, d: bytes):
        """Create a private key from the public coordinates and the private
        value, all given as big-endian bytes"""
        public_numbers = self.from_public_parts(x, y).public_numbers()
        secret = int.from_bytes(d, "big")
        return asymmetric.ec.EllipticCurvePrivateNumbers(
            secret, public_numbers
        ).private_key()

    def sign(self, body, aad, private_key):
        """Sign the countersign structure of body and AAD, and return the
        signature as r and s concatenated, 32 big-endian bytes each"""
        der_signature = private_key.sign(
            self._build_countersign_structure(body, aad),
            asymmetric.ec.ECDSA(hashes.SHA256()),
        )
        # The library produces DER; take it apart into its two integers
        r, s = decode_dss_signature(der_signature)

        return b"".join((r.to_bytes(32, "big"), s.to_bytes(32, "big")))

    def verify(self, signature, body, aad, public_key):
        """Check a signature in the format produced by sign; raises
        ProtectionInvalid on mismatch"""
        r = int.from_bytes(signature[:32], "big")
        s = int.from_bytes(signature[32:], "big")
        # The library expects DER, so the two integers are wrapped up again
        der_signature = encode_dss_signature(r, s)
        try:
            public_key.verify(
                der_signature,
                self._build_countersign_structure(body, aad),
                asymmetric.ec.ECDSA(hashes.SHA256()),
            )
        except cryptography.exceptions.InvalidSignature:
            raise ProtectionInvalid("Signature mismatch")

    def _generate(self):
        """Create a new private key object on P-256"""
        return asymmetric.ec.generate_private_key(asymmetric.ec.SECP256R1())

    def generate_with_ccs(self) -> Tuple[Any, bytes]:
        """Create a new private key, and return it along with a CCS that
        describes the matching public key"""
        private = self._generate()
        # FIXME: Deduplicate with edhoc.py
        coordinates = self.public_from_private(private).public_numbers()

        # The insertion order of the members is what gets serialized
        cose_key = {
            COSE_KEY_COMMON_KTY: COSE_KTY_EC2,
            COSE_KEY_COMMON_ALG: self.value,
            COSE_KEY_EC2_X: coordinates.x.to_bytes(32, "big"),
            COSE_KEY_EC2_Y: coordinates.y.to_bytes(32, "big"),
        }
        ccs = cbor.dumps({CWT_CLAIM_CNF: {CWT_CNF_COSE_KEY: cose_key}})

        return (private, ccs)

    def public_from_private(self, private_key):
        """Return the public key object belonging to a private key object"""
        return private_key.public_key()

    def staticstatic(self, private_key, public_key):
        """Return the ECDH shared secret of the two key objects"""
        return private_key.exchange(asymmetric.ec.ECDH(), public_key)

711 

712 

# Symmetric (AEAD) algorithms by name; each entry is an instance of the
# respective algorithm class.
algorithms = {
    "AES-CCM-16-64-128": AES_CCM_16_64_128(),
    "AES-CCM-16-64-256": AES_CCM_16_64_256(),
    "AES-CCM-64-64-128": AES_CCM_64_64_128(),
    "AES-CCM-64-64-256": AES_CCM_64_64_256(),
    "AES-CCM-16-128-128": AES_CCM_16_128_128(),
    "AES-CCM-16-128-256": AES_CCM_16_128_256(),
    "AES-CCM-64-128-128": AES_CCM_64_128_128(),
    "AES-CCM-64-128-256": AES_CCM_64_128_256(),
    "ChaCha20/Poly1305": ChaCha20Poly1305(),
    "A128GCM": A128GCM(),
    "A192GCM": A192GCM(),
    "A256GCM": A256GCM(),
}

# algorithms with full parameter set
algorithms_countersign = {
    # maybe needs a different name...
    "EdDSA on Ed25519": Ed25519(),
    "ECDSA w/ SHA-256 on P-256": ECDSA_SHA256_P256(),
}

# Static-static key agreement algorithms by name
algorithms_staticstatic = {
    "ECDH-SS + HKDF-256": EcdhSsHkdf256(),
}

# Key into `algorithms`
DEFAULT_ALGORITHM = "AES-CCM-16-64-128"

_hash_backend = cryptography.hazmat.backends.default_backend()
# Hash algorithm objects by name
hashfunctions = {
    "sha256": hashes.SHA256(),
    "sha384": hashes.SHA384(),
    "sha512": hashes.SHA512(),
}

# Key into `hashfunctions`
DEFAULT_HASHFUNCTION = "sha256"

DEFAULT_WINDOWSIZE = 32

751 

752 

class BaseSecurityContext:
    """Common ground of security contexts: algorithm selection, nonce
    construction and building of the external AAD."""

    # Deprecated marker for whether the class uses the
    # ContextWhereExternalAadIsGroup mixin; see documentation there.
    external_aad_is_group = False

    # Authentication information carried with this security context; managed
    # externally by whatever creates the security context.
    authenticated_claims: List[str] = []

    #: AEAD algorithm. This may be None if it is not set in an OSCORE group context.
    alg_aead: Optional[AeadAlgorithm]

    @property
    def algorithm(self):
        """Deprecated alias of alg_aead; every access emits a
        DeprecationWarning"""
        warnings.warn(
            "Property was renamed to 'alg_aead'", DeprecationWarning, stacklevel=2
        )
        return self.alg_aead

    @algorithm.setter
    def algorithm(self, value):
        warnings.warn(
            "Property was renamed to 'alg_aead'", DeprecationWarning, stacklevel=2
        )
        self.alg_aead = value

    hashfun: hashes.HashAlgorithm

    def _construct_nonce(self, partial_iv_short, piv_generator_id):
        """Build the nonce for a partial IV and the ID of whoever generated it.

        The ID length byte, the ID (left-padded with zeros to fill the
        algorithm's nonce length) and the partial IV (left-padded with zeros
        to 5 bytes) are concatenated and XORed with the equally long start of
        the common IV."""
        piv_padding = b"\0" * (5 - len(partial_iv_short))

        id_length = bytes([len(piv_generator_id)])
        # 6 bytes of the nonce are taken by the length byte and the padded
        # partial IV
        id_padding = b"\0" * (self.alg_aead.iv_bytes - 6 - len(piv_generator_id))

        components = b"".join(
            (id_length, id_padding, piv_generator_id, piv_padding, partial_iv_short)
        )

        return _xor_bytes(self.common_iv[: len(components)], components)

    def _extract_external_aad(
        self, message, request_id, local_is_sender: bool
    ) -> bytes:
        """Build the serialized external AAD from information in the message
        and the request_id.

        Information about whether the local context is the sender of the
        message is only relevant to group contexts, where it influences whose
        authentication credentials are placed in the AAD.
        """
        # If any option were actually Class I, it would be something like
        #
        # the_options = pick some of(message)
        # class_i_options = Message(the_options).opt.encode()

        oscore_version = 1
        if request_id.request_hash is None:
            class_i_options = b""
        else:
            class_i_options = Message(request_hash=request_id.request_hash).opt.encode()

        with_group_items = isinstance(self, ContextWhereExternalAadIsGroup)

        # Group contexts list all four of their algorithms, others just the
        # AEAD algorithm; any algorithm that is not set shows up as None.
        configured = [self.alg_aead]
        if with_group_items:
            configured += [
                self.alg_group_enc,
                self.alg_signature,
                self.alg_pairwise_key_agreement,
            ]
        algorithms: List[int | str | None] = [
            None if alg is None else alg.value for alg in configured
        ]

        external_aad = [
            oscore_version,
            algorithms,
            request_id.kid,
            request_id.partial_iv,
            class_i_options,
        ]

        if with_group_items:
            # FIXME: We may need to carry this over in the request_id when
            # observation span group rekeyings
            external_aad.append(self.id_context)

            assert message.opt.oscore is not None, "Double OSCORE"
            external_aad.append(message.opt.oscore)

            external_aad.append(
                self.sender_auth_cred if local_is_sender else self.recipient_auth_cred
            )
            external_aad.append(self.group_manager_cred)

        return cbor.dumps(external_aad)

853 

854 

class ContextWhereExternalAadIsGroup(BaseSecurityContext):
    """The protection and unprotection functions will use the Group OSCORE AADs
    rather than the regular OSCORE AADs iff a context uses this mixin. (Ie.
    alg_group_enc etc are added to the algorithms, and request_kid_context,
    OSCORE_option, sender_auth_cred and gm_cred are added).

    This does not necessarily match the is_signing property (as pairwise
    contexts use this but don't sign), and is distinct from the added OSCORE
    option in the AAD (as that's only applicable for the external AAD as
    extracted for signing and signature verification purposes)."""

    # Appended to the external AAD as the request's KID context
    id_context: bytes

    external_aad_is_group = True

    # Algorithms whose values are listed in the external AAD after the AEAD
    # algorithm; each may be None.
    alg_group_enc: Optional[AeadAlgorithm]
    alg_signature: Optional[AlgorithmCountersign]
    # This is also of type AlgorithmCountersign because the staticstatic
    # function is sitting on the same type.
    alg_pairwise_key_agreement: Optional[AlgorithmCountersign]

    # Credentials placed in the external AAD; which of the first two is used
    # depends on whether the local context is the sender of the message.
    sender_auth_cred: bytes
    recipient_auth_cred: bytes
    group_manager_cred: bytes

879 

880 

881# FIXME pull interface components from SecurityContext up here 

882class CanProtect(BaseSecurityContext, metaclass=abc.ABCMeta): 

883 # The protection function will add a signature acccording to the context's 

884 # alg_signature attribute if this is true 

885 is_signing = False 

886 

887 # Send the KID when protecting responses 

888 # 

889 # Once group pairwise mode is implemented, this will need to become a 

890 # parameter to protect(), which is stored at the point where the incoming 

891 # context is turned into an outgoing context. (Currently, such a mechanism 

892 # isn't there yet, and oscore_wrapper protects responses with the very same 

893 # context they came in on). 

894 responses_send_kid = False 

895 

@staticmethod
def _compress(protected, unprotected, ciphertext):
    """Turn the untagged COSE_Encrypt0 object given by the arguments into
    the pair of byte strings that go into the Object-Security option and
    into the message body.

    All recognized entries are removed from `unprotected` in the process.
    A RuntimeError is raised if the protected bucket has any content or
    unrecognized entries remain, a ValueError if the partial IV or the KID
    context are too long to be encoded."""

    if protected:
        raise RuntimeError(
            "Protection produced a message that has uncompressable fields."
        )

    piv = unprotected.pop(COSE_PIV, b"")
    if len(piv) > COMPRESSION_BITS_N:
        raise ValueError("Can't encode overly long partial IV")

    # The partial IV length sits in the low bits; flags are added on top.
    flags = len(piv)

    kid_data = b""
    if COSE_KID in unprotected:
        kid_data = unprotected.pop(COSE_KID)
        flags |= COMPRESSION_BIT_K

    s_kid_context = b""
    if COSE_KID_CONTEXT in unprotected:
        kid_context = unprotected.pop(COSE_KID_CONTEXT)
        context_length = len(kid_context)
        # The length has to fit in the single byte that precedes the context
        if context_length > 255:
            raise ValueError("KID Context too long")
        s_kid_context = bytes((context_length,)) + kid_context
        flags |= COMPRESSION_BIT_H

    if COSE_COUNTERSIGNATURE0 in unprotected:
        # ciphertext will eventually also get the countersignature, but
        # that happens later when the option is already processed.
        unprotected.pop(COSE_COUNTERSIGNATURE0)
        flags |= COMPRESSION_BIT_GROUP

    if unprotected:
        raise RuntimeError(
            "Protection produced a message that has uncompressable fields."
        )

    # With nothing to express at all, the option value is empty rather than
    # a single zero byte.
    option = bytes([flags]) + piv + s_kid_context + kid_data if flags else b""

    return (option, ciphertext)

947 

def protect(self, message, request_id=None, *, kid_context=True):
    """Given a plain CoAP message, create a protected message that contains
    message's options in the inner or outer CoAP message as described in
    OSCORE.

    If the message is a response to a previous message, the additional data
    from unprotecting the request are passed in as request_id. When
    request data is present, its partial IV is reused if possible. The
    security context's ID context is encoded in the resulting message
    unless kid_context is explicitly set to a False; other values for the
    kid_context can be passed in as byte string in the same parameter.

    Returns a tuple of the protected (outer) message and a request_id. For
    requests, the request_id is freshly built from this context's sender
    ID, the partial IV and the nonce; for responses, it is the request_id
    that was passed in.
    """

    # A request must come without a request_id, a response must have one.
    assert (
        (request_id is None) == message.code.is_request()
    ), "Requestishness of code to protect does not match presence of request ID"

    outer_message, plaintext = self._split_message(message, request_id)

    protected = {}
    nonce = None
    unprotected = {}
    if request_id is not None:
        # Responses try to reuse the request's nonce; whether that is
        # possible is decided by the request_id.
        nonce, partial_iv_short = request_id.get_reusable_nonce_and_piv()
        if nonce is not None:
            partial_iv_generated_by = request_id.kid

    if nonce is None:
        # Either this is a request, or the request's nonce could not be
        # reused: take a new sequence number from the own sender context.
        nonce, partial_iv_short = self._build_new_nonce()
        partial_iv_generated_by = self.sender_id

        # The partial IV is only sent when it was generated here.
        unprotected[COSE_PIV] = partial_iv_short

    if message.code.is_request():
        unprotected[COSE_KID] = self.sender_id

        request_id = RequestIdentifiers(
            self.sender_id,
            partial_iv_short,
            nonce,
            can_reuse_nonce=None,
            request_code=outer_message.code,
        )

        # kid_context is tri-state: True (use the context's ID context if
        # there is one), False (send none), or an explicit byte string.
        if kid_context is True:
            if self.id_context is not None:
                unprotected[COSE_KID_CONTEXT] = self.id_context
        elif kid_context is not False:
            unprotected[COSE_KID_CONTEXT] = kid_context
    else:
        if self.responses_send_kid:
            unprotected[COSE_KID] = self.sender_id

    # Putting in a dummy value as the signature calculation will already need some of the compression result
    if self.is_signing:
        unprotected[COSE_COUNTERSIGNATURE0] = b""
    # FIXME: Running this twice quite needlessly (just to get the oscore option for sending)
    #
    # Note that _compress removes the entries it encodes from `unprotected`,
    # so the option data can only be obtained from this first call.
    option_data, _ = self._compress(protected, unprotected, b"")

    # The option needs to be set before the external AAD is extracted from
    # the outer message.
    outer_message.opt.oscore = option_data

    external_aad = self._extract_external_aad(
        outer_message, request_id, local_is_sender=True
    )

    aad = self.alg_aead._build_encrypt0_structure(protected, external_aad)

    key = self._get_sender_key(outer_message, external_aad, plaintext, request_id)

    ciphertext = self.alg_aead.encrypt(plaintext, aad, key, nonce)

    # Only the payload half of the result is used from this second call.
    _, payload = self._compress(protected, unprotected, ciphertext)

    if self.is_signing:
        # The signature is XORed with a derived keystream and appended to
        # the ciphertext.
        signature = self.alg_signature.sign(payload, external_aad, self.private_key)
        keystream = self._kdf_for_keystreams(
            partial_iv_generated_by,
            partial_iv_short,
            self.signature_encryption_key,
            self.sender_id,
            INFO_TYPE_KEYSTREAM_REQUEST
            if message.code.is_request()
            else INFO_TYPE_KEYSTREAM_RESPONSE,
        )
        encrypted_signature = _xor_bytes(signature, keystream)
        payload += encrypted_signature
    outer_message.payload = payload

    # FIXME go through options section

    # the request_id in the second argument should be discarded by the
    # caller when protecting a response -- is that reason enough for an
    # `if` and returning None?
    return outer_message, request_id

1042 

1043 def _get_sender_key(self, outer_message, aad, plaintext, request_id): 

1044 """Customization hook of the protect function 

1045 

1046 While most security contexts have a fixed sender key, deterministic 

1047 requests need to shake up a few things. They need to modify the outer 

1048 message, as well as the request_id as it will later be used to 

1049 unprotect the response.""" 

1050 return self.sender_key 

1051 

def _split_message(self, message, request_id):
    """Split a plain message into the parts that are sent in the clear and
    the parts that get encrypted.

    Returns a tuple of the outer message (carrying the outer code and,
    for requests, the host / proxy information and the Observe option,
    but neither payload nor Object-Security option) and the serialized
    inner message (code, encoded options and payload) as bytes.

    The remotes of the messages are left unset."""

    # Defaults that apply to responses; requests override them below.
    outer_host = None
    proxy_uri = None

    if message.code.is_request():
        outer_host = message.opt.uri_host
        proxy_uri = message.opt.proxy_uri

        inner_message = message.copy(
            uri_host=None,
            uri_port=None,
            proxy_uri=None,
            proxy_scheme=None,
        )
        inner_message.remote = None

        if proxy_uri is not None:
            # Let set_request_uri take the proxy URI apart into individual
            # options; remember the base URI for the outer message, then
            # clear what must not stay in the inner message.
            inner_message.set_request_uri(proxy_uri, set_uri_host=False)
            if inner_message.opt.proxy_uri is not None:
                raise ValueError("Can not split Proxy-URI into options")
            outer_uri = inner_message.remote.uri_base
            inner_message.remote = None
            inner_message.opt.proxy_scheme = None

        # Observations travel as FETCH on the outside, everything else as
        # POST.
        outer_code = POST if message.opt.observe is None else FETCH
    else:
        inner_message = message.copy()

        outer_code = request_id.code_style.response

    # no max-age because these are always successful responses
    outer_observe = None if message.code.is_response() else message.opt.observe
    outer_message = Message(
        code=outer_code,
        uri_host=outer_host,
        observe=outer_observe,
    )
    if proxy_uri is not None:
        outer_message.set_request_uri(outer_uri)

    plaintext = bytes([inner_message.code]) + inner_message.opt.encode()
    if inner_message.payload:
        # Payload marker followed by the payload itself
        plaintext = plaintext + bytes([0xFF]) + inner_message.payload

    return outer_message, plaintext

1109 

1110 def _build_new_nonce(self): 

1111 """This implements generation of a new nonce, assembled as per Figure 5 

1112 of draft-ietf-core-object-security-06. Returns the shortened partial IV 

1113 as well.""" 

1114 seqno = self.new_sequence_number() 

1115 

1116 partial_iv = seqno.to_bytes(5, "big") 

1117 

1118 return ( 

1119 self._construct_nonce(partial_iv, self.sender_id), 

1120 partial_iv.lstrip(b"\0") or b"\0", 

1121 ) 

1122 

1123 # sequence number handling 

1124 

def new_sequence_number(self):
    """Hand out the next sender sequence number.

    The current value of sender_sequence_number is returned, and the
    counter is advanced by one; post_seqnoincrease() is called afterwards
    so that the implementation can store the new state. Implementations
    are responsible for never returning the same value twice in a given
    security context.

    Raises ContextUnavailable once the sequence number space is used
    up."""
    current = self.sender_sequence_number
    if current < MAX_SEQNO:
        self.sender_sequence_number += 1
        self.post_seqnoincrease()
        return current

    raise ContextUnavailable("Sequence number too large, context is exhausted.")

1136 

1137 # implementation defined 

1138 

@abc.abstractmethod
def post_seqnoincrease(self):
    """Ensure that sender_sequence_number is stored

    This is called by new_sequence_number() after the counter was
    advanced. Concrete security contexts need to override it; the default
    implementation raises NotImplementedError."""
    # A bare `raise` would only accidentally produce a RuntimeError here
    # (or, worse, re-raise an unrelated exception that happens to be
    # handled up the call stack). NotImplementedError is a RuntimeError
    # subclass, so existing handlers keep working.
    raise NotImplementedError(
        "post_seqnoincrease needs to be implemented by the security context"
    )

1143 

def context_from_response(self, unprotected_bag) -> CanUnprotect:
    """Select the security context with which a response to a request
    protected by this context is to be unprotected.

    The choice can be based on the unprotected information from the
    response's Object-Security option, which is passed in as
    ``unprotected_bag``. This allows picking the right security context
    in a group response, and helps getting a new short-lived context for
    B.2 mode.

    The default behavior is to ignore the argument and return self.
    """

    # FIXME justify by moving into a mixin for CanProtectAndUnprotect
    chosen = self
    return chosen  # type: ignore

1156 

1157 

class CanUnprotect(BaseSecurityContext):
    """Security context aspect that can unprotect (decrypt and verify)
    incoming OSCORE messages."""

    def unprotect(self, protected_message, request_id=None):
        """Given a protected message, verify and decrypt it, and return the
        plain message it contained.

        If the message is a response, the request_id obtained when
        protecting the request needs to be passed in; for requests it must
        be absent.

        Returns a tuple of the unprotected message and a request_id. For
        requests, the request_id is newly built from the message; for
        responses, it is the one that was passed in.

        Raises ProtectionInvalid or DecodeError if the message is malformed
        or does not match this context, and a ReplayError (possibly a
        ReplayErrorWithEcho) if the request's sequence number can not be
        accepted. Errors from the AEAD decryption or the signature
        verification are passed on unmodified.
        """
        assert (
            (request_id is not None) == protected_message.code.is_response()
        ), "Requestishness of code to unprotect does not match presence of request ID"
        is_response = protected_message.code.is_response()

        # Set to a raisable exception on replay check failures; it will be
        # raised, but the package may still be processed in the course of Echo handling.
        replay_error = None

        protected_serialized, protected, unprotected, ciphertext = (
            self._extract_encrypted0(protected_message)
        )

        if protected:
            raise ProtectionInvalid("The protected field is not empty")

        # FIXME check for duplicate keys in protected

        if unprotected.pop(COSE_KID_CONTEXT, self.id_context) != self.id_context:
            # FIXME is this necessary?
            raise ProtectionInvalid("Sender ID context does not match")

        if unprotected.pop(COSE_KID, self.recipient_id) != self.recipient_id:
            # for most cases, this is caught by the session ID dispatch, but in
            # responses (where explicit sender IDs are atypical), this is a
            # valid check
            raise ProtectionInvalid("Sender ID does not match")

        if COSE_PIV not in unprotected:
            if not is_response:
                raise ProtectionInvalid("No sequence number provided in request")

            # The response reuses the nonce of the request
            nonce = request_id.nonce
            seqno = None  # sentinel for not striking out anything
            partial_iv_short = request_id.partial_iv
            partial_iv_generated_by = request_id.kid
        else:
            partial_iv_short = unprotected.pop(COSE_PIV)
            partial_iv_generated_by = self.recipient_id

            nonce = self._construct_nonce(partial_iv_short, self.recipient_id)

            seqno = int.from_bytes(partial_iv_short, "big")

            if not is_response:
                if not self.recipient_replay_window.is_initialized():
                    replay_error = ReplayError("Sequence number check unavailable")
                elif not self.recipient_replay_window.is_valid(seqno):
                    replay_error = ReplayError("Sequence number was re-used")

                if replay_error is not None and self.echo_recovery is None:
                    # Don't even try decoding if there is no reason to
                    raise replay_error

                request_id = RequestIdentifiers(
                    self.recipient_id,
                    partial_iv_short,
                    nonce,
                    can_reuse_nonce=replay_error is None,
                    request_code=protected_message.code,
                )

        if unprotected.pop(COSE_COUNTERSIGNATURE0, None) is not None:
            try:
                alg_signature = self.alg_signature
            except (AttributeError, NameError) as exc:
                # A missing attribute surfaces as AttributeError (NameError
                # is only kept for compatibility with earlier behavior);
                # without catching it, a group message sent to a non-group
                # context would fail with an internal error rather than as
                # an undecodable message.
                raise DecodeError(
                    "Group messages can not be decoded with this non-group context"
                ) from exc

            # The (encrypted) signature is the tail of what so far was
            # treated as ciphertext.
            siglen = alg_signature.signature_length
            if len(ciphertext) < siglen:
                raise DecodeError("Message too short for signature")
            encrypted_signature = ciphertext[-siglen:]

            keystream = self._kdf_for_keystreams(
                partial_iv_generated_by,
                partial_iv_short,
                self.signature_encryption_key,
                self.recipient_id,
                INFO_TYPE_KEYSTREAM_REQUEST
                if protected_message.code.is_request()
                else INFO_TYPE_KEYSTREAM_RESPONSE,
            )
            signature = _xor_bytes(encrypted_signature, keystream)

            ciphertext = ciphertext[:-siglen]
        else:
            signature = None

        # Everything understood was popped by now
        if unprotected:
            raise DecodeError("Unsupported unprotected option")

        if (
            len(ciphertext) < self.alg_aead.tag_bytes + 1
        ):  # +1 assures access to plaintext[0] (the code)
            raise ProtectionInvalid("Ciphertext too short")

        external_aad = self._extract_external_aad(
            protected_message, request_id, local_is_sender=False
        )
        enc_structure = ["Encrypt0", protected_serialized, external_aad]
        aad = cbor.dumps(enc_structure)

        key = self._get_recipient_key(protected_message)

        plaintext = self.alg_aead.decrypt(ciphertext, aad, key, nonce)

        self._post_decrypt_checks(
            external_aad, plaintext, protected_message, request_id
        )

        # Only sequence numbers of successfully decrypted requests that
        # passed the replay check are marked as used.
        if not is_response and seqno is not None and replay_error is None:
            self.recipient_replay_window.strike_out(seqno)

        if signature is not None:
            # Only doing the expensive signature validation once the cheaper decryption passed
            alg_signature.verify(
                signature, ciphertext, external_aad, self.recipient_public_key
            )

        # FIXME add options from unprotected

        unprotected_message = Message(code=plaintext[0])
        unprotected_message.payload = unprotected_message.opt.decode(plaintext[1:])

        try_initialize = (
            not self.recipient_replay_window.is_initialized()
            and self.echo_recovery is not None
        )
        if try_initialize:
            if protected_message.code.is_request():
                # Either accept into replay window and clear replay error, or raise
                # something that can turn into a 4.01,Echo response
                if unprotected_message.opt.echo == self.echo_recovery:
                    self.recipient_replay_window.initialize_from_freshlyseen(seqno)
                    replay_error = None
                else:
                    raise ReplayErrorWithEcho(
                        secctx=self, request_id=request_id, echo=self.echo_recovery
                    )
            else:
                # We can initialize the replay window from a response as well.
                # The response is guaranteed fresh as it was AEAD-decoded to
                # match a request sent by this process.
                #
                # This is rare, as it only works when the server uses an own
                # sequence number, eg. when sending a notification or when
                # acting again on a retransmitted safe request whose response
                # it did not cache.
                #
                # Nothing bad happens if we can't make progress -- we just
                # don't initialize the replay window that wouldn't have been
                # checked for a response anyway.
                if seqno is not None:
                    self.recipient_replay_window.initialize_from_freshlyseen(seqno)

        if replay_error is not None:
            raise replay_error

        if unprotected_message.code.is_request():
            # An inner Observe option is only kept when the outer message
            # carries Observe with value 0.
            if protected_message.opt.observe != 0:
                unprotected_message.opt.observe = None
        else:
            if protected_message.opt.observe is not None:
                # -1 ensures that they sort correctly in later reordering
                # detection. Note that neither -1 nor high (>3 byte) sequence
                # numbers can be serialized in the Observe option, but they are
                # in this implementation accepted for passing around.
                unprotected_message.opt.observe = -1 if seqno is None else seqno

        return unprotected_message, request_id

    def _get_recipient_key(self, protected_message):
        """Customization hook of the unprotect function

        While most security contexts have a fixed recipient key, deterministic
        requests build it on demand."""
        return self.recipient_key

    def _post_decrypt_checks(self, aad, plaintext, protected_message, request_id):
        """Customization hook of the unprotect function after decryption

        While most security contexts are good with the default checks,
        deterministic requests need to perform additional checks while AAD and
        plaintext information is still available, and modify the request_id for
        the later protection step of the response.

        The default implementation does nothing."""

    @staticmethod
    def _uncompress(option_data, payload):
        """Expand the content of the Object-Security option and the message
        payload into the components of a COSE_Encrypt0 object.

        Returns a tuple of the serialized protected map (always empty), the
        protected map (always empty), the unprotected map and the
        ciphertext (the payload as passed in).

        Raises DecodeError if reserved bits are set in the option, or if
        the option is shorter than its first byte announces."""
        if option_data == b"":
            firstbyte = 0
        else:
            firstbyte = option_data[0]
            tail = option_data[1:]

        unprotected = {}

        if firstbyte & COMPRESSION_BITS_RESERVED:
            raise DecodeError("Protected data uses reserved fields")

        pivsz = firstbyte & COMPRESSION_BITS_N
        if pivsz:
            if len(tail) < pivsz:
                raise DecodeError("Partial IV announced but not present")
            unprotected[COSE_PIV] = tail[:pivsz]
            tail = tail[pivsz:]

        if firstbyte & COMPRESSION_BIT_H:
            # kid context hint
            if not tail:
                # The option is received from the network; without this
                # check, a truncated option would produce an IndexError
                # rather than a decoding error.
                raise DecodeError("Context hint announced but not present")
            s = tail[0]
            if len(tail) - 1 < s:
                raise DecodeError("Context hint announced but not present")
            tail = tail[1:]
            unprotected[COSE_KID_CONTEXT] = tail[:s]
            tail = tail[s:]

        if firstbyte & COMPRESSION_BIT_K:
            # The KID is whatever remains of the option
            kid = tail
            unprotected[COSE_KID] = kid

        if firstbyte & COMPRESSION_BIT_GROUP:
            # Not really; As this is (also) used early on (before the KID
            # context is even known, because it's just getting extracted), this
            # is returning an incomplete value here and leaves it to the later
            # processing to strip the right number of bytes from the ciphertext
            unprotected[COSE_COUNTERSIGNATURE0] = PRESENT_BUT_NO_VALUE_YET

        return b"", {}, unprotected, payload

    @classmethod
    def _extract_encrypted0(cls, message):
        """Obtain the components of the COSE_Encrypt0 object from a
        protected message's Object-Security option and payload.

        Raises NotAProtectedMessage if the message has no such option; see
        _uncompress for the return value and further errors."""
        if message.opt.oscore is None:
            raise NotAProtectedMessage("No Object-Security option present", message)

        protected_serialized, protected, unprotected, ciphertext = cls._uncompress(
            message.opt.oscore, message.payload
        )
        return protected_serialized, protected, unprotected, ciphertext

    # implementation defined

    def context_for_response(self) -> CanProtect:
        """After processing a request with this context, with which security
        context should an outgoing response be protected? By default, it's the
        same context."""
        # FIXME: Is there any way in which the handler may want to influence
        # the decision taken here? Or would, then, the handler just call a more
        # elaborate but similar function when setting the response's remote
        # already?

        # FIXME justify by moving into a mixin for CanProtectAndUnprotect
        return self  # type: ignore

1413 

1414 

class SecurityContextUtils(BaseSecurityContext):
    """Key derivation helpers shared by security contexts"""

    def _kdf(self, salt, ikm, role_id, out_type):
        """Derive key material of the given ``out_type`` ("Key", "IV" or
        "SEKey") for the given role ID.

        This is the HKDF as used to derive sender and recipient key and IV
        in RFC8613 Section 3.2.1, and analogously the Signature Encryption
        Key of oscore-groupcomm. A ValueError is raised for any other
        output type.
        """

        # RFC8613 calls this info field `alg_aead`, but group OSCORE places
        # something in there that is very clearly *not* alg_aead.
        info_algorithm = self.alg_aead.value

        if out_type == "Key":
            length = self.alg_aead.key_bytes
        elif out_type == "IV":
            # The longest IV length of all algorithms that are configured
            candidates = [self.alg_aead, getattr(self, "alg_group_enc", None)]
            length = max(alg.iv_bytes for alg in candidates if alg is not None)
        elif out_type == "SEKey":
            # "While the obtained Signature Encryption Key is never used with
            # the Group Encryption Algorithm, its length was chosen to obtain a
            # matching level of security."
            length = self.alg_group_enc.key_bytes

            info_algorithm = self.alg_group_enc.value
        else:
            raise ValueError("Output type not recognized")

        info = [role_id, self.id_context, info_algorithm, out_type, length]
        return self._kdf_lowlevel(salt, ikm, info, length)

    def _kdf_for_keystreams(self, piv_generated_by, salt, ikm, role_id, out_type):
        """Derive a keystream as long as a signature of the context's
        signature algorithm, using the HKDF in the way oscore-groupcomm
        describes for keystreams."""

        length = self.alg_signature.signature_length

        permitted_types = (
            INFO_TYPE_KEYSTREAM_REQUEST,
            INFO_TYPE_KEYSTREAM_RESPONSE,
        )
        assert out_type in permitted_types, "Output type not recognized"

        info = [piv_generated_by, self.id_context, out_type, length]
        return self._kdf_lowlevel(salt, ikm, info, length)

    def _kdf_lowlevel(self, salt: bytes, ikm: bytes, info: list, l: int) -> bytes:  # noqa: E741 (signature follows RFC definition)
        """The HKDF function as used in RFC8613 and oscore-groupcomm (notated
        there as ``something = HKDF(...)``)

        The ``info`` list is CBOR encoded before it is fed into the HKDF;
        note that it typically contains ``L`` at some point.

        When `info` takes the conventional structure of [pid, id_context,
        alg_aead, type, L], it may make sense to extend the `_kdf` function to
        support that case, or `_kdf_for_keystreams` for a different structure, as
        they are the more high-level tools."""
        encoded_info = cbor.dumps(info)
        return HKDF(
            algorithm=self.hashfun,
            length=l,
            salt=salt,
            info=encoded_info,
            backend=_hash_backend,
        ).derive(ikm)

    def derive_keys(self, master_salt, master_secret):
        """Set sender_key, recipient_key and common_iv, deriving them from
        the given salt and secret as well as from the algorithm, hash
        function and id_context that were configured beforehand."""

        def derive(role_id, out_type):
            return self._kdf(master_salt, master_secret, role_id, out_type)

        self.sender_key = derive(self.sender_id, "Key")
        self.recipient_key = derive(self.recipient_id, "Key")

        self.common_iv = derive(b"", "IV")

    # really more of the Credentials interface

    def get_oscore_context_for(self, unprotected):
        """Return a suitable context (most easily self) for an incoming request
        if its unprotected data (COSE_KID, COSE_KID_CONTEXT) fit its
        description. If it doesn't match, it returns None.

        The default implementation just strictly checks for whether kid and any
        kid context match (not matching if a local KID context is set but none
        is given in the request); modes like Group OSCORE can spin up aspect
        objects here.
        """
        if not (unprotected.get(COSE_KID, None) == self.recipient_id):
            return None
        if not (unprotected.get(COSE_KID_CONTEXT, None) == self.id_context):
            return None
        return self

1522 

1523 

class ReplayWindow:
    """A regular replay window of a fixed size.

    The window is kept as an index together with a bitfield (an integer):
    the least significant bit of the bitfield stands for the sequence
    number given by the index, and a 1 bit marks a number that was already
    seen. There are no tricks with implicit leading ones (in the style of
    floating point normalization).

    >>> w = ReplayWindow(32, lambda: None)
    >>> w.initialize_empty()
    >>> w.strike_out(5)
    >>> w.is_valid(3)
    True
    >>> w.is_valid(5)
    False
    >>> w.strike_out(0)
    >>> w.strike_out(1)
    >>> w.strike_out(2)
    >>> w.is_valid(1)
    False

    Jumping ahead by the window size invalidates older numbers:

    >>> w.is_valid(4)
    True
    >>> w.strike_out(35)
    >>> w.is_valid(4)
    True
    >>> w.strike_out(36)
    >>> w.is_valid(4)
    False

    Usage safety
    ------------

    For every key, the replay window can only be initialized empty once. On
    later uses, it needs to be persisted by storing the output of
    self.persist() somewhere and loaded from that persisted data.

    It is acceptable to store persistence data in the strike_out_callback,
    which must then ensure that the data is written (flushed to a file or
    committed to a database); that is usually inefficient, though.

    Stability
    ---------

    This class is not considered for stabilization yet and an implementation
    detail of the SecurityContext implementation(s).
    """

    _index = None
    """Sequence number that the least significant bit of _bitfield stands
    for"""
    _bitfield = None
    """Integer used as a bitfield that is self._size wide. A 1 at any bit
    means that the sequence number obtained by adding the bit's position (its
    power of 2) to self._index was already seen."""

    def __init__(self, size, strike_out_callback):
        self.strike_out_callback = strike_out_callback
        self._size = size

    def is_initialized(self):
        """Tell whether any of the initialize_* methods was run yet"""
        return not (self._index is None)

    def initialize_empty(self):
        """Start a window in which no sequence number was seen yet"""
        self._bitfield = 0
        self._index = 0

    def initialize_from_persisted(self, persisted):
        """Restore the state from a dict as produced by persist()"""
        self._index = persisted["index"]
        self._bitfield = persisted["bitfield"]

    def initialize_from_freshlyseen(self, seen):
        """Start the window at a sequence number that is being observed right
        now in a fresh message (ie. one the peer generated after all messages
        that were processed before the local state was lost). The seen number
        and everything before it is invalid from then on, everything after it
        is valid."""
        self._bitfield = 1
        self._index = seen

    def is_valid(self, number):
        """Tell whether the sequence number can still be accepted, ie. it
        is neither older than the window nor marked as seen inside it"""
        lowest = self._index
        if number < lowest:
            # Older than anything the window tracks
            return False

        offset = number - lowest
        if offset >= self._size:
            # Beyond the window, so it can not have been seen
            return True

        return not (self._bitfield >> offset) & 1

    def strike_out(self, number):
        """Mark a sequence number as seen, moving the window forward if
        the number is beyond its current end, and run the
        strike_out_callback.

        Raises ValueError if the number is not valid (any more)."""
        if not self.is_valid(number):
            raise ValueError(
                "Sequence number is not valid any more and thus can't be removed from the window"
            )

        highest_in_window = self._index + self._size - 1
        if number > highest_in_window:
            # Move the window just far enough for the number to become its
            # highest entry
            shift = number - highest_in_window
            self._index += shift
            self._bitfield >>= shift

        assert self.is_valid(number), "Sequence number was not valid before strike-out"
        self._bitfield |= 1 << (number - self._index)

        self.strike_out_callback()

    def persist(self):
        """Return the internal state as a dict from which the replay window
        can be recreated using initialize_from_persisted()."""

        return {"index": self._index, "bitfield": self._bitfield}

1632 

1633 

1634class FilesystemSecurityContext( 

1635 CanProtect, CanUnprotect, SecurityContextUtils, credentials._Objectish 

1636): 

1637 """Security context stored in a directory as distinct files containing 

1638 containing 

1639 

1640 * Master secret, master salt, sender and recipient ID, 

1641 optionally algorithm, the KDF hash function, and replay window size 

1642 (settings.json and secrets.json, where the latter is typically readable 

1643 only for the user) 

1644 * sequence numbers and replay windows (sequence.json, the only file the 

1645 process needs write access to) 

1646 

1647 The static parameters can all either be placed in settings.json or 

1648 secrets.json, but must not be present in both; the presence of either file 

1649 is sufficient. 

1650 

1651 .. warning:: 

1652 

1653 Security contexts must never be copied around and used after another 

1654 copy was used. They should only ever be moved, and if they are copied 

1655 (eg. as a part of a system backup), restored contexts must not be used 

1656 again; they need to be replaced with freshly created ones. 

1657 

1658 An additional file named `lock` is created to prevent the accidental use of 

1659 a context by to concurrent programs. 

1660 

1661 Note that the sequence number file is updated in an atomic fashion which 

1662 requires file creation privileges in the directory. If privilege separation 

1663 between settings/key changes and sequence number changes is desired, one 

1664 way to achieve that on Linux is giving the aiocoap process's user group 

1665 write permissions on the directory and setting the sticky bit on the 

1666 directory, thus forbidding the user to remove the settings/secret files not 

1667 owned by him. 

1668 

1669 Writes due to sent sequence numbers are reduced by applying a variation on 

1670 the mechanism of RFC8613 Appendix B.1.1 (incrementing the persisted sender 

1671 seqence number in steps of `k`). That value is automatically grown from 

1672 sequence_number_chunksize_start up to sequence_number_chunksize_limit. 

1673 At runtime, the receive window is not stored but kept indeterminate. In 

1674 case of an abnormal shutdown, the server uses the mechanism described in 

1675 Appendix B.1.2 to recover. 

1676 """ 

1677 

1678 # possibly overridden in constructor 

1679 alg_aead = algorithms[DEFAULT_ALGORITHM] 

1680 

1681 class LoadError(ValueError): 

1682 """Exception raised with a descriptive message when trying to load a 

1683 faulty security context""" 

1684 

1685 def __init__( 

1686 self, 

1687 basedir: str, 

1688 sequence_number_chunksize_start=10, 

1689 sequence_number_chunksize_limit=10000, 

1690 ): 

1691 self.basedir = basedir 

1692 

1693 self.lockfile: Optional[filelock.FileLock] = filelock.FileLock( 

1694 os.path.join(basedir, "lock") 

1695 ) 

1696 # 0.001: Just fail if it can't be acquired 

1697 # See https://github.com/benediktschmitt/py-filelock/issues/57 

1698 try: 

1699 self.lockfile.acquire(timeout=0.001) 

1700 # see https://github.com/PyCQA/pycodestyle/issues/703 

1701 except: # noqa: E722 

1702 # No lock, no loading, no need to fail in __del__ 

1703 self.lockfile = None 

1704 raise 

1705 

1706 # Always enabled as committing to a file for every received request 

1707 # would be a terrible burden. 

1708 self.echo_recovery = secrets.token_bytes(8) 

1709 

1710 try: 

1711 self._load() 

1712 except KeyError as k: 

1713 raise self.LoadError("Configuration key missing: %s" % (k.args[0],)) 

1714 

1715 self.sequence_number_chunksize_start = sequence_number_chunksize_start 

1716 self.sequence_number_chunksize_limit = sequence_number_chunksize_limit 

1717 self.sequence_number_chunksize = sequence_number_chunksize_start 

1718 

1719 self.sequence_number_persisted = self.sender_sequence_number 

1720 

1721 def _load(self): 

1722 # doesn't check for KeyError on every occasion, relies on __init__ to 

1723 # catch that 

1724 

1725 data = {} 

1726 for readfile in ("secret.json", "settings.json"): 

1727 try: 

1728 with open(os.path.join(self.basedir, readfile)) as f: 

1729 filedata = json.load(f) 

1730 except FileNotFoundError: 

1731 continue 

1732 

1733 for key, value in filedata.items(): 

1734 if key.endswith("_hex"): 

1735 key = key[:-4] 

1736 value = binascii.unhexlify(value) 

1737 elif key.endswith("_ascii"): 

1738 key = key[:-6] 

1739 value = value.encode("ascii") 

1740 

1741 if key in data: 

1742 raise self.LoadError( 

1743 "Datum %r present in multiple input files at %r." 

1744 % (key, self.basedir) 

1745 ) 

1746 

1747 data[key] = value 

1748 

1749 self.alg_aead = algorithms[data.get("algorithm", DEFAULT_ALGORITHM)] 

1750 self.hashfun = hashfunctions[data.get("kdf-hashfun", DEFAULT_HASHFUNCTION)] 

1751 

1752 windowsize = data.get("window", DEFAULT_WINDOWSIZE) 

1753 if not isinstance(windowsize, int): 

1754 raise self.LoadError("Non-integer replay window") 

1755 

1756 self.sender_id = data["sender-id"] 

1757 self.recipient_id = data["recipient-id"] 

1758 

1759 if ( 

1760 max(len(self.sender_id), len(self.recipient_id)) 

1761 > self.alg_aead.iv_bytes - 6 

1762 ): 

1763 raise self.LoadError( 

1764 "Sender or Recipient ID too long (maximum length %s for this algorithm)" 

1765 % (self.alg_aead.iv_bytes - 6) 

1766 ) 

1767 

1768 master_secret = data["secret"] 

1769 master_salt = data.get("salt", b"") 

1770 self.id_context = data.get("id-context", None) 

1771 

1772 self.derive_keys(master_salt, master_secret) 

1773 

1774 self.recipient_replay_window = ReplayWindow( 

1775 windowsize, self._replay_window_changed 

1776 ) 

1777 try: 

1778 with open(os.path.join(self.basedir, "sequence.json")) as f: 

1779 sequence = json.load(f) 

1780 except FileNotFoundError: 

1781 self.sender_sequence_number = 0 

1782 self.recipient_replay_window.initialize_empty() 

1783 self.replay_window_persisted = True 

1784 else: 

1785 self.sender_sequence_number = int(sequence["next-to-send"]) 

1786 received = sequence["received"] 

1787 if received == "unknown": 

1788 # The replay window will stay uninitialized, which triggers 

1789 # Echo recovery 

1790 self.replay_window_persisted = False 

1791 else: 

1792 try: 

1793 self.recipient_replay_window.initialize_from_persisted(received) 

1794 except (ValueError, TypeError, KeyError): 

1795 # Not being particularly careful about what could go wrong: If 

1796 # someone tampers with the replay data, we're already in *big* 

1797 # trouble, of which I fail to see how it would become worse 

1798 # than a crash inside the application around "failure to 

1799 # right-shift a string" or that like; at worst it'd result in 

1800 # nonce reuse which tampering with the replay window file 

1801 # already does. 

1802 raise self.LoadError( 

1803 "Persisted replay window state was not understood" 

1804 ) 

1805 self.replay_window_persisted = True 

1806 

1807 # This is called internally whenever a new sequence number is taken or 

1808 # crossed out from the window, and blocks a lot; B.1 mode mitigates that. 

1809 # 

1810 # Making it async and block in a threadpool would mitigate the blocking of 

1811 # other messages, but the more visible effect of this will be that no 

1812 # matter if sync or async, a reply will need to wait for a file sync 

1813 # operation to conclude. 

1814 def _store(self): 

1815 tmphand, tmpnam = tempfile.mkstemp( 

1816 dir=self.basedir, prefix=".sequence-", suffix=".json", text=True 

1817 ) 

1818 

1819 data = {"next-to-send": self.sequence_number_persisted} 

1820 if not self.replay_window_persisted: 

1821 data["received"] = "unknown" 

1822 else: 

1823 data["received"] = self.recipient_replay_window.persist() 

1824 

1825 # Using io.open (instead os.fdopen) and binary / write with encode 

1826 # rather than dumps as that works even while the interpreter is 

1827 # shutting down. 

1828 # 

1829 # This can be relaxed when there is a defined shutdown sequence for 

1830 # security contexts that's triggered from the general context shutdown 

1831 # -- but right now, there isn't. 

1832 with io.open(tmphand, "wb") as tmpfile: 

1833 tmpfile.write(json.dumps(data).encode("utf8")) 

1834 tmpfile.flush() 

1835 os.fsync(tmpfile.fileno()) 

1836 

1837 os.replace(tmpnam, os.path.join(self.basedir, "sequence.json")) 

1838 

1839 def _replay_window_changed(self): 

1840 if self.replay_window_persisted: 

1841 # Just remove the sequence numbers once from the file 

1842 self.replay_window_persisted = False 

1843 self._store() 

1844 

1845 def post_seqnoincrease(self): 

1846 if self.sender_sequence_number > self.sequence_number_persisted: 

1847 self.sequence_number_persisted += self.sequence_number_chunksize 

1848 

1849 self.sequence_number_chunksize = min( 

1850 self.sequence_number_chunksize * 2, self.sequence_number_chunksize_limit 

1851 ) 

1852 # FIXME: this blocks -- see https://github.com/chrysn/aiocoap/issues/178 

1853 self._store() 

1854 

1855 # The = case would only happen if someone deliberately sets all 

1856 # numbers to 1 to force persisting on every step 

1857 assert ( 

1858 self.sender_sequence_number <= self.sequence_number_persisted 

1859 ), "Using a sequence number that has been persisted already" 

1860 

1861 def _destroy(self): 

1862 """Release the lock file, and ensure tha he object has become 

1863 unusable. 

1864 

1865 If there is unpersisted state from B.1 operation, the actually used 

1866 number and replay window gets written back to the file to allow 

1867 resumption without wasting digits or round-trips. 

1868 """ 

1869 # FIXME: Arrange for a more controlled shutdown through the credentials 

1870 

1871 self.replay_window_persisted = True 

1872 self.sequence_number_persisted = self.sender_sequence_number 

1873 self._store() 

1874 

1875 del self.sender_key 

1876 del self.recipient_key 

1877 

1878 os.unlink(self.lockfile.lock_file) 

1879 self.lockfile.release() 

1880 

1881 self.lockfile = None 

1882 

1883 def __del__(self): 

1884 if self.lockfile is not None: 

1885 self._destroy() 

1886 

1887 @classmethod 

1888 def from_item(cls, init_data): 

1889 """Overriding _Objectish's from_item because the parameter name for 

1890 basedir is contextfile for historical reasons""" 

1891 

1892 def constructor( 

1893 basedir: Optional[str] = None, contextfile: Optional[str] = None 

1894 ): 

1895 if basedir is not None and contextfile is not None: 

1896 raise credentials.CredentialsLoadError( 

1897 "Conflicting arguments basedir and contextfile; just contextfile instead" 

1898 ) 

1899 if basedir is None and contextfile is None: 

1900 raise credentials.CredentialsLoadError("Missing item 'basedir'") 

1901 if contextfile is not None: 

1902 warnings.warn( 

1903 "Property contextfile was renamed to basedir in OSCORE credentials entries", 

1904 DeprecationWarning, 

1905 stacklevel=2, 

1906 ) 

1907 basedir = contextfile 

1908 assert ( 

1909 basedir is not None 

1910 ) # This helps mypy which would otherwise not see that the above ensures this already 

1911 return cls(basedir) 

1912 

1913 return credentials._call_from_structureddata( 

1914 constructor, cls.__name__, init_data 

1915 ) 

1916 

1917 def find_all_used_contextless_oscore_kid(self) -> set[bytes]: 

1918 return set((self.recipient_id,)) 

1919 

1920 

class GroupContext(ContextWhereExternalAadIsGroup, BaseSecurityContext):
    """Common base of the group related security contexts in this module.

    It sets the ``is_signing`` and ``responses_send_kid`` flags, and requires
    its implementations to provide the keys used around signatures.
    """

    is_signing = True
    responses_send_kid = True

    # property + abstractmethod is the replacement for abc.abstractproperty,
    # which has been deprecated since Python 3.3

    @property
    @abc.abstractmethod
    def private_key(self):
        """Private key used to sign outgoing messages.

        Contexts not designed to send messages may raise a RuntimeError here;
        that necessity may later go away if some more accurate class modelling
        is found."""

    @property
    @abc.abstractmethod
    def recipient_public_key(self):
        """Public key used to verify incoming messages.

        Contexts not designed to receive messages (because they'd have aspects
        for that) may raise a RuntimeError here; that necessity may later go
        away if some more accurate class modelling is found."""

1940 

1941 

class SimpleGroupContext(GroupContext, CanProtect, CanUnprotect, SecurityContextUtils):
    """A statically configured context for an OSCORE group

    Everything this context knows is handed to it at construction time: It is
    not persistable, and it supports neither a group manager nor rekeying.

    It is intended for experimentation and demos, but aims to be correct enough
    to be usable securely.
    """

    # Class level placeholders: all of these are replaced by instance
    # attributes during initialization (rather than being properties, as they
    # might be in the base classes)
    sender_id = None
    id_context = None  # type: ignore
    private_key = None
    alg_aead = None
    hashfun = None  # type: ignore
    alg_signature = None
    alg_group_enc = None
    alg_pairwise_key_agreement = None
    sender_auth_cred = None  # type: ignore
    group_manager_cred = None  # type: ignore
    cred_fmt = None
    # This is currently not evaluated, but any GM interaction will need to have this information available.
    group_manager_cred_fmt = None

    def __init__(
        self,
        alg_aead,
        hashfun,
        alg_signature,
        alg_group_enc,
        alg_pairwise_key_agreement,
        group_id,
        master_secret,
        master_salt,
        sender_id,
        private_key,
        sender_auth_cred,
        peers,
        group_manager_cred,
        cred_fmt=COSE_KCCS,
        group_manager_cred_fmt=COSE_KCCS,
    ):
        """Set up the group from static data.

        ``peers`` is a mapping from the peers' IDs to their authentication
        credentials; ``group_id`` becomes the context's ID context.

        Raises a ValueError if the public key in ``sender_auth_cred`` does not
        belong to ``private_key`` (or if a credential can not be parsed, see
        :meth:`_parse_credential`).
        """
        # Algorithms and formats first: credential parsing below depends on
        # alg_signature and cred_fmt being in place.
        self.alg_aead = alg_aead
        self.hashfun = hashfun
        self.alg_signature = alg_signature
        self.alg_group_enc = alg_group_enc
        self.alg_pairwise_key_agreement = alg_pairwise_key_agreement
        self.cred_fmt = cred_fmt
        self.group_manager_cred_fmt = group_manager_cred_fmt

        self.id_context = group_id
        self.sender_id = sender_id
        self.private_key = private_key
        self.sender_auth_cred = sender_auth_cred
        self.group_manager_cred = group_manager_cred

        self.peers = peers.keys()
        self.recipient_auth_creds = peers
        self.recipient_public_keys = {}
        for peer_id, peer_credential in peers.items():
            self.recipient_public_keys[peer_id] = self._parse_credential(
                peer_credential
            )

        def ephemeral_window():
            # no need to persist, the whole group is ephemeral
            window = ReplayWindow(32, lambda: None)
            window.initialize_empty()
            return window

        self.recipient_replay_windows = {
            peer_id: ephemeral_window() for peer_id in self.peers
        }

        self.derive_keys(master_salt, master_secret)
        self.sender_sequence_number = 0

        # Guard against a private key and a credential that were mixed up
        expected_public_key = self._parse_credential(sender_auth_cred)
        actual_public_key = self.alg_signature.public_from_private(self.private_key)
        if actual_public_key != expected_public_key:
            raise ValueError(
                "The key in the provided sender credential does not match the private key"
            )

    def _parse_credential(self, credential: bytes):
        """Extract the public key (in the public_key format the respective
        AlgorithmCountersign needs) from credentials.

        Only the CCS credential format is implemented; a ValueError is raised
        for any other ``cred_fmt``. The actual parsing is left to the signature
        algorithm's ``from_kccs``.

        This currently discards any information that is present in the
        credential that exceeds the key. (In a future version, this could
        return both the key and extracted other data, where that other data
        would be stored with the peer this is parsed from).
        """
        if self.cred_fmt != COSE_KCCS:
            raise ValueError(
                "Credential parsing is currently only implemented for CCSs"
            )

        signature_algorithm = self.alg_signature
        assert signature_algorithm is not None

        return signature_algorithm.from_kccs(credential)

    def __repr__(self):
        description = (
            type(self).__name__,
            self.id_context.hex(),
            self.sender_id.hex(),
            len(self.peers),
        )
        return "<%s with group %r sender_id %r and %d peers>" % description

    @property
    def recipient_public_key(self):
        """Unavailable on the group as a whole: raises a RuntimeError."""
        raise RuntimeError(
            "Group context without key indication was used for verification"
        )

    def derive_keys(self, master_salt, master_secret):
        """Derive the sender key, one recipient key per peer, the common IV
        and the signature encryption key from the master salt and secret."""
        # FIXME unify with parent?

        def derive(role_id, out_type):
            return self._kdf(master_salt, master_secret, role_id, out_type)

        self.sender_key = derive(self.sender_id, "Key")
        self.recipient_keys = {peer_id: derive(peer_id, "Key") for peer_id in self.peers}

        self.common_iv = derive(b"", "IV")

        # but this one is new

        self.signature_encryption_key = derive(b"", "SEKey")

    def post_seqnoincrease(self):
        """Nothing to do here, as the context is ephemeral"""

    def context_from_response(self, unprotected_bag) -> CanUnprotect:
        """Pick the aspect with which a response can be unprotected.

        The response needs to carry the server's KID (otherwise, a DecodeError
        is raised); a group mode aspect is returned if it also carries a
        countersignature, and a pairwise aspect otherwise.
        """
        # sender ID *needs to be* here -- if this were a pairwise request, it
        # would not run through here
        try:
            sender_kid = unprotected_bag[COSE_KID]
        except KeyError:
            raise DecodeError("Group server failed to send own sender KID")

        aspect = (
            _GroupContextAspect
            if COSE_COUNTERSIGNATURE0 in unprotected_bag
            else _PairwiseContextAspect
        )
        return aspect(self, sender_kid)

    def get_oscore_context_for(self, unprotected):
        """Return the aspect suitable for an incoming message with the given
        unprotected fields, or None if its KID context is not this group's ID
        context or its KID is not among the peers."""
        if unprotected.get(COSE_KID_CONTEXT, None) != self.id_context:
            return None

        kid = unprotected.get(COSE_KID, None)
        if kid not in self.peers:
            return None

        if COSE_COUNTERSIGNATURE0 in unprotected:
            return _GroupContextAspect(self, kid)
        if self.recipient_public_keys[kid] is DETERMINISTIC_KEY:
            return _DeterministicUnprotectProtoAspect(self, kid)
        return _PairwiseContextAspect(self, kid)

    def find_all_used_contextless_oscore_kid(self) -> set[bytes]:
        """Return an empty set (groups always send KID Context, so there is
        nothing that could conflict)."""
        return set()

    # yet to stabilize...

    def pairwise_for(self, recipient_id):
        """Return the pairwise aspect of this group with the given peer."""
        return _PairwiseContextAspect(self, recipient_id)

    def for_sending_deterministic_requests(
        self, deterministic_id, target_server: Optional[bytes]
    ):
        """Return an aspect for protecting deterministic requests under the
        given ID, with ``target_server`` passed on to it."""
        return _DeterministicProtectProtoAspect(self, deterministic_id, target_server)

2117 

2118 

class _GroupContextAspect(GroupContext, CanUnprotect, SecurityContextUtils):
    """View on a group context for receiving from one particular peer

    No data is kept in here apart from the reference to the underlying
    groupcontext and the peer's ID; everything else is looked up in the group
    context, picking the items of the right recipient where they are per-peer.

    This accessor is for receiving messages in group mode from a particular
    peer; it does not send (and turns into a pairwise context through
    context_for_response before it comes to that).
    """

    # This aspect never signs
    private_key = None

    def __init__(self, groupcontext: GroupContext, recipient_id: bytes) -> None:
        self.recipient_id = recipient_id
        self.groupcontext = groupcontext

    def __repr__(self):
        description = (
            type(self).__name__,
            self.groupcontext,
            self.recipient_id.hex(),
        )
        return "<%s inside %r with the peer %r>" % description

    def context_for_response(self):
        """Return the group context's pairwise aspect for the same peer."""
        return self.groupcontext.pairwise_for(self.recipient_id)

    # Properties shared by the whole group
    #
    # not inline because the equivalent lambda would not be recognized by mypy
    # (workaround for <https://github.com/python/mypy/issues/8083>)

    @property
    def id_context(self):
        return self.groupcontext.id_context

    @property
    def hashfun(self):
        return self.groupcontext.hashfun

    @property
    def common_iv(self):
        return self.groupcontext.common_iv

    @property
    def alg_aead(self):
        return self.groupcontext.alg_aead

    @property
    def alg_signature(self):
        return self.groupcontext.alg_signature

    @property
    def alg_group_enc(self):
        return self.groupcontext.alg_group_enc

    @property
    def alg_pairwise_key_agreement(self):
        return self.groupcontext.alg_pairwise_key_agreement

    @property
    def signature_encryption_key(self):
        return self.groupcontext.signature_encryption_key

    @property
    def group_manager_cred(self):
        return self.groupcontext.group_manager_cred

    # Properties that are picked by the recipient ID

    @property
    def recipient_key(self):
        return self.groupcontext.recipient_keys[self.recipient_id]

    @property
    def recipient_public_key(self):
        return self.groupcontext.recipient_public_keys[self.recipient_id]

    @property
    def recipient_auth_cred(self):
        return self.groupcontext.recipient_auth_creds[self.recipient_id]

    @property
    def recipient_replay_window(self):
        return self.groupcontext.recipient_replay_windows[self.recipient_id]

    @property
    def sender_auth_cred(self):
        """Not available in this aspect: raises a RuntimeError."""
        raise RuntimeError(
            "Could relay the sender auth credential from the group context, but it shouldn't matter here"
        )

2205 

2206 

class _PairwiseContextAspect(
    GroupContext, CanProtect, CanUnprotect, SecurityContextUtils
):
    """View on a group context for pairwise (non-signing) exchanges with one
    particular peer

    The pairwise sender and recipient keys are derived at initialization from
    the group's keys, both parties' authentication credentials and a shared
    secret from the static-static key agreement between the own private key
    and the peer's public key. Everything else, including the sender sequence
    number, is relayed to the underlying group context.
    """

    is_signing = False

    def __init__(self, groupcontext, recipient_id):
        self.groupcontext = groupcontext
        self.recipient_id = recipient_id

        shared_secret = self.alg_pairwise_key_agreement.staticstatic(
            self.groupcontext.private_key,
            self.groupcontext.recipient_public_keys[recipient_id],
        )

        # Both derivations use the same ingredients, with the credentials
        # ordered as own-then-peer for sending and peer-then-own for receiving.
        self.sender_key = self._kdf(
            self.groupcontext.sender_key,
            (
                self.groupcontext.sender_auth_cred
                + self.groupcontext.recipient_auth_creds[recipient_id]
                + shared_secret
            ),
            self.groupcontext.sender_id,
            "Key",
        )
        self.recipient_key = self._kdf(
            self.groupcontext.recipient_keys[recipient_id],
            (
                self.groupcontext.recipient_auth_creds[recipient_id]
                + self.groupcontext.sender_auth_cred
                + shared_secret
            ),
            self.recipient_id,
            "Key",
        )

    def __repr__(self):
        return "<%s based on %r with the peer %r>" % (
            type(self).__name__,
            self.groupcontext,
            self.recipient_id.hex(),
        )

    # FIXME: actually, only to be sent in requests

    # not inline because the equivalent lambda would not be recognized by mypy
    # (workaround for <https://github.com/python/mypy/issues/8083>)
    @property
    def id_context(self):
        return self.groupcontext.id_context

    @property
    def alg_aead(self):
        return self.groupcontext.alg_aead

    @property
    def hashfun(self):
        return self.groupcontext.hashfun

    @property
    def alg_signature(self):
        return self.groupcontext.alg_signature

    @property
    def alg_group_enc(self):
        return self.groupcontext.alg_group_enc

    @property
    def alg_pairwise_key_agreement(self):
        return self.groupcontext.alg_pairwise_key_agreement

    @property
    def group_manager_cred(self):
        return self.groupcontext.group_manager_cred

    @property
    def common_iv(self):
        return self.groupcontext.common_iv

    @property
    def sender_id(self):
        return self.groupcontext.sender_id

    @property
    def recipient_auth_cred(self):
        return self.groupcontext.recipient_auth_creds[self.recipient_id]

    @property
    def sender_auth_cred(self):
        return self.groupcontext.sender_auth_cred

    @property
    def recipient_replay_window(self):
        return self.groupcontext.recipient_replay_windows[self.recipient_id]

    # Set at initialization
    recipient_key = None
    sender_key = None

    @property
    def sender_sequence_number(self):
        return self.groupcontext.sender_sequence_number

    @sender_sequence_number.setter
    def sender_sequence_number(self, new):
        self.groupcontext.sender_sequence_number = new

    def post_seqnoincrease(self):
        """Relay the notification to the group context, which holds the
        sequence number."""
        self.groupcontext.post_seqnoincrease()

    # Not needed because this aspect is not signing. These are read-only
    # properties without any side effect: just reading them must not trigger
    # the group context's sequence number handling.

    @property
    def private_key(self):
        """Always None, as nothing is signed in pairwise mode."""
        return None

    @property
    def recipient_public_key(self):
        """Always None, as no signature is verified in pairwise mode."""
        return None

    def context_from_response(self, unprotected_bag) -> CanUnprotect:
        """Pick the context with which a response can be unprotected.

        A response that indicates a KID other than the peer's is refused with
        a DecodeError. Responses with a countersignature are handled by a
        group mode aspect for the same peer, all others by this object.
        """
        if unprotected_bag.get(COSE_KID, self.recipient_id) != self.recipient_id:
            raise DecodeError(
                "Response coming from a different server than requested, not attempting to decrypt"
            )

        if COSE_COUNTERSIGNATURE0 in unprotected_bag:
            # It'd be an odd thing to do, but it's source verified, so the
            # server hopefully has reasons to make this readable to other group
            # members.
            return _GroupContextAspect(self.groupcontext, self.recipient_id)
        else:
            return self

2333 

2334 

class _DeterministicProtectProtoAspect(
    ContextWhereExternalAadIsGroup, CanProtect, SecurityContextUtils
):
    """This implements the sending side of Deterministic Requests.

    While similar to a _PairwiseContextAspect, it only derives the key at
    protection time, as the plain text is hashed into the key."""

    deterministic_hashfun = hashes.SHA256()

    def __init__(self, groupcontext, sender_id, target_server: Optional[bytes]):
        self.groupcontext = groupcontext
        self.sender_id = sender_id
        # If not None, responses are only accepted from this server
        self.target_server = target_server

    def __repr__(self):
        # The limitation is set off with a space, and shown as hex like the
        # sender ID (formatting the bytes through %s would trigger a
        # BytesWarning when Python runs with -b).
        if self.target_server is not None:
            limitation = " limited to responses from %r" % self.target_server.hex()
        else:
            limitation = ""

        return "<%s based on %r with the sender ID %r%s>" % (
            type(self).__name__,
            self.groupcontext,
            self.sender_id.hex(),
            limitation,
        )

    def new_sequence_number(self):
        """Return 0: deterministic requests always use that sequence number
        here."""
        return 0

    def post_seqnoincrease(self):
        """No-op, as the sequence number never changes."""
        pass

    def context_from_response(self, unprotected_bag):
        """Return the group mode aspect with which a response is unprotected.

        Raises a DecodeError if the responding server can not be determined
        (no KID and no target server), if the KID is not the target server's,
        or if the response carries no countersignature.
        """
        if self.target_server is None:
            if COSE_KID not in unprotected_bag:
                raise DecodeError(
                    "Server did not send a KID and no particular one was addressed"
                )
        else:
            if unprotected_bag.get(COSE_KID, self.target_server) != self.target_server:
                raise DecodeError(
                    "Response coming from a different server than requested, not attempting to decrypt"
                )

        if COSE_COUNTERSIGNATURE0 not in unprotected_bag:
            # Could just as well pass and later barf when the group context doesn't find a signature
            raise DecodeError(
                "Response to deterministic request came from unsecure pairwise context"
            )

        return _GroupContextAspect(
            self.groupcontext, unprotected_bag.get(COSE_KID, self.target_server)
        )

    def _get_sender_key(self, outer_message, aad, plaintext, request_id):
        """Derive the key for one particular request.

        The group context's key for the deterministic ID, the AAD and the
        plaintext are hashed into the request hash. That hash is set as an
        option on the outer message (whose code is set to FETCH), stored in
        the request_id, and used to derive the returned key.

        Raises a RuntimeError if the outer message is a response.
        """
        if outer_message.code.is_response():
            raise RuntimeError("Deterministic contexts shouldn't protect responses")

        basekey = self.groupcontext.recipient_keys[self.sender_id]

        h = hashes.Hash(self.deterministic_hashfun)
        h.update(basekey)
        h.update(aad)
        h.update(plaintext)
        request_hash = h.finalize()

        outer_message.opt.request_hash = request_hash
        outer_message.code = FETCH

        # By this time, the AADs have all been calculated already; setting this
        # for the benefit of the response parsing later
        request_id.request_hash = request_hash
        # FIXME I don't think this ever comes to bear but want to be sure
        # before removing this line (this should only be client-side)
        request_id.can_reuse_nonce = False
        # FIXME: we're still sending a h'00' PIV. Not wrong, just a wasted byte.

        return self._kdf(basekey, request_hash, self.sender_id, "Key")

    # details needed for various operations, especially eAAD generation

    # not inline because the equivalent lambda would not be recognized by mypy
    # (workaround for <https://github.com/python/mypy/issues/8083>)
    @property
    def alg_aead(self):
        return self.groupcontext.alg_aead

    @property
    def hashfun(self):
        return self.groupcontext.hashfun

    @property
    def common_iv(self):
        return self.groupcontext.common_iv

    @property
    def id_context(self):
        return self.groupcontext.id_context

    @property
    def alg_signature(self):
        return self.groupcontext.alg_signature

2436 

2437 

class _DeterministicUnprotectProtoAspect(
    ContextWhereExternalAadIsGroup, CanUnprotect, SecurityContextUtils
):
    """This implements the receiving (unprotecting) side of Deterministic
    Requests.

    While similar to a _PairwiseContextAspect, it only derives the key at
    unprotection time, based on information given as Request-Hash."""

    # Unless None, this is the value by which the running process recognizes
    # that the second phase of a B.1.2 replay window recovery Echo option comes
    # from the current process, and thus its sequence number is fresh
    echo_recovery = None

    deterministic_hashfun = hashes.SHA256()

    class ZeroIsAlwaysValid:
        """Special-purpose replay window that accepts 0 indefinitely"""

        def is_initialized(self):
            return True

        def is_valid(self, number):
            # No particular reason to be lax here
            return number == 0

        def strike_out(self, number):
            # FIXME: I'd rather indicate here that it's a potential replay, have the
            # request_id.can_reuse_nonce = False
            # set here rather than in _post_decrypt_checks, and thus also get
            # the check for whether it's a safe method
            pass

        def persist(self):
            pass

    def __init__(self, groupcontext, recipient_id):
        self.recipient_id = recipient_id
        self.groupcontext = groupcontext

        # Number 0 is sent over and over, which a regular window would refuse
        self.recipient_replay_window = self.ZeroIsAlwaysValid()

    def __repr__(self):
        description = (
            type(self).__name__,
            self.groupcontext,
            self.recipient_id.hex(),
        )
        return "<%s based on %r with the recipient ID %r>" % description

    def context_for_response(self):
        """Responses are protected with the group context itself."""
        return self.groupcontext

    def _get_recipient_key(self, protected_message):
        """Derive the key for this particular message from the group's key for
        the recipient ID and the message's request hash option."""
        group_key = self.groupcontext.recipient_keys[self.recipient_id]
        claimed_hash = protected_message.opt.request_hash
        return self._kdf(group_key, claimed_hash, self.recipient_id, "Key")

    def _post_decrypt_checks(self, aad, plaintext, protected_message, request_id):
        """Verify that a decrypted request is acceptable as deterministic.

        ProtectionInvalid is raised unless the request's code is GET or FETCH,
        and unless the hash over the group's key for the recipient ID, the AAD
        and the plaintext equals the request hash option of the message. On
        success, the hash is stored in the request_id, which is also marked as
        not suitable for nonce reuse.
        """
        if plaintext[0] not in (GET, FETCH):  # FIXME: "is safe"
            # FIXME: accept but return inner Unauthorized. (Raising Unauthorized
            # here would just create an unprotected Unauthorized, which is not
            # what's spec'd for here)
            raise ProtectionInvalid("Request was not safe")

        group_key = self.groupcontext.recipient_keys[self.recipient_id]

        digest = hashes.Hash(self.deterministic_hashfun)
        for component in (group_key, aad, plaintext):
            digest.update(component)
        computed_hash = digest.finalize()

        if computed_hash != protected_message.opt.request_hash:
            raise ProtectionInvalid(
                "Client's hash of the plaintext diverges from the actual request hash"
            )

        # This is intended for the protection of the response, and the
        # later use in signature in the unprotect function is not happening
        # here anyway, neither is the later use for Echo requests
        request_id.request_hash = computed_hash
        request_id.can_reuse_nonce = False

    # details needed for various operations, especially eAAD generation

    # not inline because the equivalent lambda would not be recognized by mypy
    # (workaround for <https://github.com/python/mypy/issues/8083>)
    @property
    def id_context(self):
        return self.groupcontext.id_context

    @property
    def common_iv(self):
        return self.groupcontext.common_iv

    @property
    def hashfun(self):
        return self.groupcontext.hashfun

    @property
    def alg_aead(self):
        return self.groupcontext.alg_aead

    @property
    def alg_signature(self):
        return self.groupcontext.alg_signature

2546 

2547 

def verify_start(message):
    """Return the unprotected COSE options of a message, by which a verifier
    can then pick the security context to actually verify the message with.
    (Future versions may also report fields from both unprotected and
    protected, if the protected bag is ever used with OSCORE.).

    Call this only on requests; for responses, you'll have to know the
    security context anyway, and there is usually no information to be
    gained."""

    # Only the third of the four extracted items is of interest here
    _first, _second, unprotected_bag, _fourth = CanUnprotect._extract_encrypted0(
        message
    )
    return unprotected_bag

2560 

2561 

# Module level attribute hook (PEP 562): Python only consults a module global
# under the exact name __getattr__ when an attribute is not found, so the old
# names listed here (mapped to their current replacements) are only reachable
# when the hook is bound under that name.
__getattr__ = deprecation_getattr(
    {
        "COSE_COUNTERSINGATURE0": "COSE_COUNTERSIGNATURE0",
        "Algorithm": "AeadAlgorithm",
    },
    globals(),
)