Coverage for aiocoap/oscore.py: 85%

1219 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-09-30 11:17 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5"""This module contains the tools to send OSCORE secured messages. 

6 

7It only deals with the algorithmic parts, the security context and protection 

8and unprotection of messages. It does not touch on the integration of OSCORE in 

9the larger aiocoap stack of having a context or requests; that's what 

10:mod:`aiocoap.transports.osore` is for.`""" 

11 

12from __future__ import annotations 

13 

14from collections import namedtuple 

15import io 

16import json 

17import binascii 

18import os 

19import os.path 

20import tempfile 

21import abc 

22from typing import Optional, List, Any, Tuple 

23import secrets 

24import warnings 

25import logging 

26 

27from aiocoap.message import Message, Direction 

28from aiocoap.util import cryptography_additions, deprecation_getattr, Sentinel 

29from aiocoap.numbers import GET, POST, FETCH, CHANGED, UNAUTHORIZED, CONTENT 

30from aiocoap import error 

31from . import credentials 

32from aiocoap.defaults import log_secret 

33 

34from cryptography.hazmat.primitives.ciphers import aead 

35from cryptography.hazmat.primitives.kdf.hkdf import HKDF 

36from cryptography.hazmat.primitives import ciphers, hashes 

37import cryptography.hazmat.backends 

38import cryptography.exceptions 

39from cryptography.hazmat.primitives import asymmetric, serialization 

40from cryptography.hazmat.primitives.asymmetric.utils import ( 

41 decode_dss_signature, 

42 encode_dss_signature, 

43) 

44 

45import cbor2 as cbor 

46 

47import filelock 

48 

49# Logger through which log events from cryptographic operations (both inside 

50# the primitives and around key derivation) are traced. 

51_alglog = logging.getLogger("aiocoap.cryptography") 

52 

53MAX_SEQNO = 2**40 - 1 

54 

55# Relevant values from the IANA registry "CBOR Object Signing and Encryption (COSE)" 

56COSE_KID = 4 

57COSE_PIV = 6 

58COSE_KID_CONTEXT = 10 

59# from RFC9338 

60COSE_COUNTERSIGNATURE0 = 12 

61# from RFC9528 

62COSE_KCCS = 14 

63 

64COMPRESSION_BITS_N = 0b111 

65COMPRESSION_BIT_K = 0b1000 

66COMPRESSION_BIT_H = 0b10000 

67COMPRESSION_BIT_GROUP = 0b100000 # Group Flag from draft-ietf-core-oscore-groupcomm-21 

68COMPRESSION_BITS_RESERVED = 0b11000000 

69 

70CWT_CLAIM_CNF = 8 

71CWT_CNF_COSE_KEY = 1 

72COSE_KEY_COMMON_KTY = 1 

73COSE_KTY_OKP = 1 

74COSE_KTY_EC2 = 2 

75COSE_KEY_COMMON_ALG = 3 

76COSE_KEY_OKP_CRV = -1 

77COSE_KEY_OKP_X = -2 

78COSE_KEY_EC2_X = -2 

79COSE_KEY_EC2_Y = -3 

80 

81# While the original values were simple enough to be used in literals, starting 

82# with oscore-groupcomm we're using more compact values 

83 

84INFO_TYPE_KEYSTREAM_REQUEST = True 

85INFO_TYPE_KEYSTREAM_RESPONSE = False 

86 

87PRESENT_BUT_NO_VALUE_YET = Sentinel("Value will be populated later") 

88 

89 

90class CodeStyle(namedtuple("_CodeStyle", ("request", "response"))): 

91 FETCH_CONTENT: CodeStyle 

92 POST_CHANGED: CodeStyle 

93 

94 @classmethod 

95 def from_request(cls, request) -> CodeStyle: 

96 if request == FETCH: 

97 return cls.FETCH_CONTENT 

98 elif request == POST: 

99 return cls.POST_CHANGED 

100 else: 

101 raise ValueError("Invalid request code %r" % request) 

102 

103 

104CodeStyle.FETCH_CONTENT = CodeStyle(FETCH, CONTENT) 

105CodeStyle.POST_CHANGED = CodeStyle(POST, CHANGED) 

106 

107 

108class _DeterministicKey: 

109 """Singleton to indicate that for this key member no public or private key 

110 is available because it is the Deterministic Client (see 

111 <https://www.ietf.org/archive/id/draft-amsuess-core-cachable-oscore-01.html>) 

112 

113 This is highly experimental not only from an implementation but also from a 

114 specification point of view. The specification has not received adaequate 

115 review that would justify using it in any non-experimental scenario. 

116 """ 

117 

118 

119DETERMINISTIC_KEY = _DeterministicKey() 

120 

121 

122class NotAProtectedMessage(error.Error, ValueError): 

123 """Raised when verification is attempted on a non-OSCORE message""" 

124 

125 def __init__(self, message, plain_message): 

126 super().__init__(message) 

127 self.plain_message = plain_message 

128 

129 

130class ProtectionInvalid(error.Error, ValueError): 

131 """Raised when verification of an OSCORE message fails""" 

132 

133 

134class DecodeError(ProtectionInvalid): 

135 """Raised when verification of an OSCORE message fails because CBOR or compressed data were erroneous""" 

136 

137 

138class ReplayError(ProtectionInvalid): 

139 """Raised when verification of an OSCORE message fails because the sequence numbers was already used""" 

140 

141 

142class ReplayErrorWithEcho(ProtectionInvalid, error.RenderableError): 

143 """Raised when verification of an OSCORE message fails because the 

144 recipient replay window is uninitialized, but a 4.01 Echo can be 

145 constructed with the data in the exception that can lead to the client 

146 assisting in replay window recovery""" 

147 

148 def __init__(self, secctx, request_id, echo): 

149 self.secctx = secctx 

150 self.request_id = request_id 

151 self.echo = echo 

152 

153 def to_message(self): 

154 inner = Message( 

155 code=UNAUTHORIZED, 

156 echo=self.echo, 

157 ) 

158 outer, _ = self.secctx.protect(inner, request_id=self.request_id) 

159 return outer 

160 

161 

162class ContextUnavailable(error.Error, ValueError): 

163 """Raised when a context is (currently or permanently) unavailable for 

164 protecting or unprotecting a message""" 

165 

166 

167class RequestIdentifiers: 

168 """A container for details that need to be passed along from the 

169 (un)protection of a request to the (un)protection of the response; these 

170 data ensure that the request-response binding process works by passing 

171 around the request's partial IV. 

172 

173 Users of this module should never create or interact with instances, but 

174 just pass them around. 

175 """ 

176 

177 def __init__(self, kid, partial_iv, can_reuse_nonce, request_code): 

178 # The sender ID of whoever generated the partial IV 

179 self.kid = kid 

180 self.partial_iv = partial_iv 

181 self.can_reuse_nonce = can_reuse_nonce 

182 self.code_style = CodeStyle.from_request(request_code) 

183 

184 self.request_hash = None 

185 

186 def get_reusable_kid_and_piv(self): 

187 """Return the kid and the partial IV if can_reuse_nonce is True, and 

188 set can_reuse_nonce to False.""" 

189 

190 if self.can_reuse_nonce: 

191 self.can_reuse_nonce = False 

192 return (self.kid, self.partial_iv) 

193 else: 

194 return (None, None) 

195 

196 

197def _xor_bytes(a, b): 

198 assert len(a) == len(b), "XOR needs consistent lengths" 

199 # FIXME is this an efficient thing to do, or should we store everything 

200 # that possibly needs xor'ing as long integers with an associated length? 

201 return bytes(_a ^ _b for (_a, _b) in zip(a, b)) 

202 

203 

204class SymmetricEncryptionAlgorithm(metaclass=abc.ABCMeta): 

205 """A symmetric algorithm 

206 

207 The algorithm's API is the AEAD API with addtional authenticated data: The 

208 algorihm may or may not verify that data. Algorithms that actually do 

209 verify the data are recognized by also being AeadAlgorithm. 

210 """ 

211 

212 value: int 

213 key_bytes: int 

214 tag_bytes: int 

215 iv_bytes: int 

216 

217 @abc.abstractmethod 

218 def encrypt(cls, plaintext, aad, key, iv): 

219 """Return ciphertext + tag for given input data""" 

220 

221 @abc.abstractmethod 

222 def decrypt(cls, ciphertext_and_tag, aad, key, iv): 

223 """Reverse encryption. Must raise ProtectionInvalid on any error 

224 stemming from untrusted data.""" 

225 

226 @staticmethod 

227 def _build_encrypt0_structure(protected, external_aad): 

228 assert protected == {}, "Unexpected data in protected bucket" 

229 protected_serialized = b"" # were it into an empty dict, it'd be the cbor dump 

230 enc_structure = ["Encrypt0", protected_serialized, external_aad] 

231 

232 return cbor.dumps(enc_structure) 

233 

234 

235class AeadAlgorithm(SymmetricEncryptionAlgorithm, metaclass=abc.ABCMeta): 

236 """A symmetric algorithm that provides authentication, including 

237 authentication of additional data.""" 

238 

239 

240class AES_CBC(SymmetricEncryptionAlgorithm, metaclass=abc.ABCMeta): 

241 """AES in CBC mode using tthe Python cryptography library""" 

242 

243 tag_bytes = 0 

244 # This introduces padding -- this library doesn't need to care because 

245 # Python does allocation for us, but others may need to rethink their 

246 # buffer allocation strategies. 

247 

248 @classmethod 

249 def _cipher(cls, key, iv): 

250 return ciphers.base.Cipher( 

251 ciphers.algorithms.AES(key), 

252 ciphers.modes.CBC(iv), 

253 ) 

254 

255 @classmethod 

256 def encrypt(cls, plaintext, _aad, key, iv): 

257 # FIXME: Ignoring aad violates https://www.rfc-editor.org/rfc/rfc9459.html#name-implementation-consideratio but is required for Group OSCORE 

258 

259 # Padding according to https://www.rfc-editor.org/rfc/rfc5652#section-6.3 

260 k = cls.key_bytes 

261 assert k < 256, ( 

262 "Algorithm with this key size should not have been created in the first plae" 

263 ) 

264 pad_byte = k - (len(plaintext) % k) 

265 pad_bytes = bytes((pad_byte,)) * pad_byte 

266 plaintext += pad_bytes 

267 

268 encryptor = cls._cipher(key, iv).encryptor() 

269 result = encryptor.update(plaintext) 

270 result += encryptor.finalize() 

271 return result 

272 

273 @classmethod 

274 def decrypt(cls, ciphertext_and_tag, _aad, key, iv): 

275 # FIXME: Ignoring aad violates https://www.rfc-editor.org/rfc/rfc9459.html#name-implementation-consideratio but is required for Group OSCORE 

276 

277 k = cls.key_bytes 

278 if ciphertext_and_tag == b"" or len(ciphertext_and_tag) % k != 0: 

279 raise ProtectionInvalid("Message length does not match padding") 

280 

281 decryptor = cls._cipher(key, iv).decryptor() 

282 result = decryptor.update(ciphertext_and_tag) 

283 result += decryptor.finalize() 

284 

285 # Padding according to https://www.rfc-editor.org/rfc/rfc5652#section-6.3 

286 claimed_padding = result[-1] 

287 if claimed_padding == 0 or claimed_padding > k: 

288 raise ProtectionInvalid("Padding does not match key") 

289 if result[-claimed_padding:] != bytes((claimed_padding,)) * claimed_padding: 

290 raise ProtectionInvalid("Padding is inconsistent") 

291 

292 return result[:-claimed_padding] 

293 

294 

295class A128CBC(AES_CBC): 

296 # from RFC9459 

297 value = -65531 

298 key_bytes = 16 # 128-bit key 

299 iv_bytes = 16 # 16-octet nonce 

300 

301 

302class AES_CCM(AeadAlgorithm, metaclass=abc.ABCMeta): 

303 """AES-CCM implemented using the Python cryptography library""" 

304 

305 @classmethod 

306 def encrypt(cls, plaintext, aad, key, iv): 

307 return aead.AESCCM(key, cls.tag_bytes).encrypt(iv, plaintext, aad) 

308 

309 @classmethod 

310 def decrypt(cls, ciphertext_and_tag, aad, key, iv): 

311 try: 

312 return aead.AESCCM(key, cls.tag_bytes).decrypt(iv, ciphertext_and_tag, aad) 

313 except cryptography.exceptions.InvalidTag: 

314 raise ProtectionInvalid("Tag invalid") 

315 

316 

317class AES_CCM_16_64_128(AES_CCM): 

318 # from RFC8152 and draft-ietf-core-object-security-0[012] 3.2.1 

319 value = 10 

320 key_bytes = 16 # 128-bit key 

321 tag_bytes = 8 # 64-bit tag 

322 iv_bytes = 13 # 13-byte nonce 

323 

324 

325class AES_CCM_16_64_256(AES_CCM): 

326 # from RFC8152 

327 value = 11 

328 key_bytes = 32 # 256-bit key 

329 tag_bytes = 8 # 64-bit tag 

330 iv_bytes = 13 # 13-byte nonce 

331 

332 

333class AES_CCM_64_64_128(AES_CCM): 

334 # from RFC8152 

335 value = 12 

336 key_bytes = 16 # 128-bit key 

337 tag_bytes = 8 # 64-bit tag 

338 iv_bytes = 7 # 7-byte nonce 

339 

340 

341class AES_CCM_64_64_256(AES_CCM): 

342 # from RFC8152 

343 value = 13 

344 key_bytes = 32 # 256-bit key 

345 tag_bytes = 8 # 64-bit tag 

346 iv_bytes = 7 # 7-byte nonce 

347 

348 

349class AES_CCM_16_128_128(AES_CCM): 

350 # from RFC8152 

351 value = 30 

352 key_bytes = 16 # 128-bit key 

353 tag_bytes = 16 # 128-bit tag 

354 iv_bytes = 13 # 13-byte nonce 

355 

356 

357class AES_CCM_16_128_256(AES_CCM): 

358 # from RFC8152 

359 value = 31 

360 key_bytes = 32 # 256-bit key 

361 tag_bytes = 16 # 128-bit tag 

362 iv_bytes = 13 # 13-byte nonce 

363 

364 

365class AES_CCM_64_128_128(AES_CCM): 

366 # from RFC8152 

367 value = 32 

368 key_bytes = 16 # 128-bit key 

369 tag_bytes = 16 # 128-bit tag 

370 iv_bytes = 7 # 7-byte nonce 

371 

372 

373class AES_CCM_64_128_256(AES_CCM): 

374 # from RFC8152 

375 value = 33 

376 key_bytes = 32 # 256-bit key 

377 tag_bytes = 16 # 128-bit tag 

378 iv_bytes = 7 # 7-byte nonce 

379 

380 

381class AES_GCM(AeadAlgorithm, metaclass=abc.ABCMeta): 

382 """AES-GCM implemented using the Python cryptography library""" 

383 

384 iv_bytes = 12 # 96 bits fixed size of the nonce 

385 

386 @classmethod 

387 def encrypt(cls, plaintext, aad, key, iv): 

388 return aead.AESGCM(key).encrypt(iv, plaintext, aad) 

389 

390 @classmethod 

391 def decrypt(cls, ciphertext_and_tag, aad, key, iv): 

392 try: 

393 return aead.AESGCM(key).decrypt(iv, ciphertext_and_tag, aad) 

394 except cryptography.exceptions.InvalidTag: 

395 raise ProtectionInvalid("Tag invalid") 

396 

397 

398class A128GCM(AES_GCM): 

399 # from RFC8152 

400 value = 1 

401 key_bytes = 16 # 128-bit key 

402 tag_bytes = 16 # 128-bit tag 

403 

404 

405class A192GCM(AES_GCM): 

406 # from RFC8152 

407 value = 2 

408 key_bytes = 24 # 192-bit key 

409 tag_bytes = 16 # 128-bit tag 

410 

411 

412class A256GCM(AES_GCM): 

413 # from RFC8152 

414 value = 3 

415 key_bytes = 32 # 256-bit key 

416 tag_bytes = 16 # 128-bit tag 

417 

418 

419class ChaCha20Poly1305(AeadAlgorithm): 

420 # from RFC8152 

421 value = 24 

422 key_bytes = 32 # 256-bit key 

423 tag_bytes = 16 # 128-bit tag 

424 iv_bytes = 12 # 96-bit nonce 

425 

426 @classmethod 

427 def encrypt(cls, plaintext, aad, key, iv): 

428 return aead.ChaCha20Poly1305(key).encrypt(iv, plaintext, aad) 

429 

430 @classmethod 

431 def decrypt(cls, ciphertext_and_tag, aad, key, iv): 

432 try: 

433 return aead.ChaCha20Poly1305(key).decrypt(iv, ciphertext_and_tag, aad) 

434 except cryptography.exceptions.InvalidTag: 

435 raise ProtectionInvalid("Tag invalid") 

436 

437 

438class AlgorithmCountersign(metaclass=abc.ABCMeta): 

439 """A fully parameterized COSE countersign algorithm 

440 

441 An instance is able to provide all the alg_signature, par_countersign and 

442 par_countersign_key parameters taht go into the Group OSCORE algorithms 

443 field. 

444 """ 

445 

446 value: int | str 

447 

448 @abc.abstractmethod 

449 def sign(self, body, external_aad, private_key): 

450 """Return the signature produced by the key when using 

451 CounterSignature0 as describe in draft-ietf-cose-countersign-01""" 

452 

453 @abc.abstractmethod 

454 def verify(self, signature, body, external_aad, public_key): 

455 """Verify a signature in analogy to sign""" 

456 

457 @abc.abstractmethod 

458 def generate_with_ccs(self) -> Tuple[Any, bytes]: 

459 """Return a usable private key along with a CCS describing it""" 

460 

461 @abc.abstractmethod 

462 def public_from_private(self, private_key): 

463 """Given a private key, derive the publishable key""" 

464 

465 @abc.abstractmethod 

466 def from_kccs(self, ccs: bytes) -> Any: 

467 """Given a CCS, extract the public key, or raise a ValueError if the 

468 credential format does not align with the type. 

469 

470 The type is not exactly Any, but whichever type is used by this 

471 algorithm class.""" 

472 

473 @staticmethod 

474 def _build_countersign_structure(body, external_aad): 

475 countersign_structure = [ 

476 "CounterSignature0", 

477 b"", 

478 b"", 

479 external_aad, 

480 body, 

481 ] 

482 tobesigned = cbor.dumps(countersign_structure) 

483 return tobesigned 

484 

485 @abc.abstractproperty 

486 def signature_length(self) -> int: 

487 """The length of a signature using this algorithm""" 

488 

489 @abc.abstractproperty 

490 def curve_number(self) -> int: 

491 """Registered curve number used with this algorithm. 

492 

493 Only used for verification of credentials' details""" 

494 

495 

496class AlgorithmStaticStatic(metaclass=abc.ABCMeta): 

497 @abc.abstractmethod 

498 def staticstatic(self, private_key, public_key): 

499 """Derive a shared static-static secret from a private and a public key""" 

500 

501 

502def _from_kccs_common(ccs: bytes) -> dict: 

503 """Check that the CCS contains a CNF claim that is a COSE Key, and return 

504 that key""" 

505 

506 try: 

507 parsed = cbor.loads(ccs) 

508 except cbor.CBORDecodeError as e: 

509 raise ValueError("CCS not in CBOR format") from e 

510 

511 if ( 

512 not isinstance(parsed, dict) 

513 or CWT_CLAIM_CNF not in parsed 

514 or not isinstance(parsed[CWT_CLAIM_CNF], dict) 

515 or CWT_CNF_COSE_KEY not in parsed[CWT_CLAIM_CNF] 

516 or not isinstance(parsed[CWT_CLAIM_CNF][CWT_CNF_COSE_KEY], dict) 

517 ): 

518 raise ValueError("CCS must contain a COSE Key dict in a CNF") 

519 

520 return parsed[CWT_CLAIM_CNF][CWT_CNF_COSE_KEY] 

521 

522 

523class Ed25519(AlgorithmCountersign): 

524 def sign(self, body, aad, private_key): 

525 _alglog.debug("Perfoming signature:") 

526 _alglog.debug("* body: %s", body.hex()) 

527 _alglog.debug("* AAD: %s", aad.hex()) 

528 private_key = asymmetric.ed25519.Ed25519PrivateKey.from_private_bytes( 

529 private_key 

530 ) 

531 return private_key.sign(self._build_countersign_structure(body, aad)) 

532 

533 def verify(self, signature, body, aad, public_key): 

534 _alglog.debug("Verifying signature:") 

535 _alglog.debug("* body: %s", body.hex()) 

536 _alglog.debug("* AAD: %s", aad.hex()) 

537 public_key = asymmetric.ed25519.Ed25519PublicKey.from_public_bytes(public_key) 

538 try: 

539 public_key.verify(signature, self._build_countersign_structure(body, aad)) 

540 except cryptography.exceptions.InvalidSignature: 

541 _alglog.debug("Signature was invalid.") 

542 raise ProtectionInvalid("Signature mismatch") 

543 

544 def _generate(self): 

545 key = asymmetric.ed25519.Ed25519PrivateKey.generate() 

546 # FIXME: We could avoid handing the easy-to-misuse bytes around if the 

547 # current algorithm interfaces did not insist on passing the 

548 # exchangable representations -- and generally that should be more 

549 # efficient. 

550 return key.private_bytes( 

551 encoding=serialization.Encoding.Raw, 

552 format=serialization.PrivateFormat.Raw, 

553 encryption_algorithm=serialization.NoEncryption(), 

554 ) 

555 

556 def generate_with_ccs(self) -> Tuple[Any, bytes]: 

557 private = self._generate() 

558 public = self.public_from_private(private) 

559 

560 ccs = cbor.dumps( 

561 { 

562 CWT_CLAIM_CNF: { 

563 CWT_CNF_COSE_KEY: { 

564 COSE_KEY_COMMON_KTY: COSE_KTY_OKP, 

565 COSE_KEY_COMMON_ALG: self.value, 

566 COSE_KEY_OKP_CRV: self.curve_number, 

567 COSE_KEY_OKP_X: public, 

568 } 

569 } 

570 } 

571 ) 

572 

573 return (private, ccs) 

574 

575 def public_from_private(self, private_key): 

576 private_key = asymmetric.ed25519.Ed25519PrivateKey.from_private_bytes( 

577 private_key 

578 ) 

579 public_key = private_key.public_key() 

580 return public_key.public_bytes( 

581 encoding=serialization.Encoding.Raw, 

582 format=serialization.PublicFormat.Raw, 

583 ) 

584 

585 def from_kccs(self, ccs: bytes) -> Any: 

586 # eg. {1: 1, 3: -8, -1: 6, -2: h'77 ... 88'} 

587 cose_key = _from_kccs_common(ccs) 

588 

589 if ( 

590 cose_key.get(COSE_KEY_COMMON_KTY) == COSE_KTY_OKP 

591 and cose_key.get(COSE_KEY_COMMON_ALG) == self.value 

592 and cose_key.get(COSE_KEY_OKP_CRV) == self.curve_number 

593 and COSE_KEY_OKP_X in cose_key 

594 ): 

595 return cose_key[COSE_KEY_OKP_X] 

596 else: 

597 raise ValueError("Key type not recognized from CCS key %r" % cose_key) 

598 

599 value = -8 

600 curve_number = 6 

601 

602 signature_length = 64 

603 

604 

605class EcdhSsHkdf256(AlgorithmStaticStatic): 

606 # FIXME: This class uses the Edwards keys as private and public keys, and 

607 # not the converted ones. This will be problematic if pairwise-only 

608 # contexts are to be set up. 

609 

610 value = -27 

611 

612 # FIXME these two will be different when using the Montgomery keys directly 

613 

614 # This one will only be used when establishing and distributing pairwise-only keys 

615 public_from_private = Ed25519.public_from_private 

616 

617 def staticstatic(self, private_key, public_key): 

618 private_key = asymmetric.ed25519.Ed25519PrivateKey.from_private_bytes( 

619 private_key 

620 ) 

621 private_key = cryptography_additions.sk_to_curve25519(private_key) 

622 

623 public_key = asymmetric.ed25519.Ed25519PublicKey.from_public_bytes(public_key) 

624 public_key = cryptography_additions.pk_to_curve25519(public_key) 

625 

626 return private_key.exchange(public_key) 

627 

628 

629class ECDSA_SHA256_P256(AlgorithmCountersign, AlgorithmStaticStatic): 

630 # Trying a new construction approach -- should work just as well given 

631 # we're just passing Python objects around 

632 def from_public_parts(self, x: bytes, y: bytes): 

633 """Create a public key from its COSE values""" 

634 return asymmetric.ec.EllipticCurvePublicNumbers( 

635 int.from_bytes(x, "big"), 

636 int.from_bytes(y, "big"), 

637 asymmetric.ec.SECP256R1(), 

638 ).public_key() 

639 

640 def from_kccs(self, ccs: bytes) -> Any: 

641 cose_key = _from_kccs_common(ccs) 

642 

643 if ( 

644 cose_key.get(COSE_KEY_COMMON_KTY) == COSE_KTY_EC2 

645 and cose_key.get(COSE_KEY_COMMON_ALG) == self.value 

646 and COSE_KEY_EC2_X in cose_key 

647 and COSE_KEY_EC2_Y in cose_key 

648 ): 

649 return self.from_public_parts( 

650 x=cose_key[COSE_KEY_EC2_X], 

651 y=cose_key[COSE_KEY_EC2_Y], 

652 ) 

653 else: 

654 raise ValueError("Key type not recognized from CCS key %r" % cose_key) 

655 

656 def from_private_parts(self, x: bytes, y: bytes, d: bytes): 

657 public_numbers = self.from_public_parts(x, y).public_numbers() 

658 private_numbers = asymmetric.ec.EllipticCurvePrivateNumbers( 

659 int.from_bytes(d, "big"), public_numbers 

660 ) 

661 return private_numbers.private_key() 

662 

663 def sign(self, body, aad, private_key): 

664 der_signature = private_key.sign( 

665 self._build_countersign_structure(body, aad), 

666 asymmetric.ec.ECDSA(hashes.SHA256()), 

667 ) 

668 (r, s) = decode_dss_signature(der_signature) 

669 

670 return r.to_bytes(32, "big") + s.to_bytes(32, "big") 

671 

672 def verify(self, signature, body, aad, public_key): 

673 r = signature[:32] 

674 s = signature[32:] 

675 r = int.from_bytes(r, "big") 

676 s = int.from_bytes(s, "big") 

677 der_signature = encode_dss_signature(r, s) 

678 try: 

679 public_key.verify( 

680 der_signature, 

681 self._build_countersign_structure(body, aad), 

682 asymmetric.ec.ECDSA(hashes.SHA256()), 

683 ) 

684 except cryptography.exceptions.InvalidSignature: 

685 raise ProtectionInvalid("Signature mismatch") 

686 

687 def _generate(self): 

688 return asymmetric.ec.generate_private_key(asymmetric.ec.SECP256R1()) 

689 

690 def generate_with_ccs(self) -> Tuple[Any, bytes]: 

691 private = self._generate() 

692 public = self.public_from_private(private) 

693 # FIXME: Deduplicate with edhoc.py 

694 x = public.public_numbers().x.to_bytes(32, "big") 

695 y = public.public_numbers().y.to_bytes(32, "big") 

696 

697 ccs = cbor.dumps( 

698 { 

699 CWT_CLAIM_CNF: { 

700 CWT_CNF_COSE_KEY: { 

701 COSE_KEY_COMMON_KTY: COSE_KTY_EC2, 

702 COSE_KEY_COMMON_ALG: self.value, 

703 COSE_KEY_EC2_X: x, 

704 COSE_KEY_EC2_Y: y, 

705 } 

706 } 

707 } 

708 ) 

709 

710 return (private, ccs) 

711 

712 def public_from_private(self, private_key): 

713 return private_key.public_key() 

714 

715 def staticstatic(self, private_key, public_key): 

716 return private_key.exchange(asymmetric.ec.ECDH(), public_key) 

717 

718 value = -7 # FIXME: when used as a static-static algorithm, does this become -27? see shepherd review. 

719 curve_number = 1 

720 

721 signature_length = 64 

722 

723 

724algorithms = { 

725 "AES-CCM-16-64-128": AES_CCM_16_64_128(), 

726 "AES-CCM-16-64-256": AES_CCM_16_64_256(), 

727 "AES-CCM-64-64-128": AES_CCM_64_64_128(), 

728 "AES-CCM-64-64-256": AES_CCM_64_64_256(), 

729 "AES-CCM-16-128-128": AES_CCM_16_128_128(), 

730 "AES-CCM-16-128-256": AES_CCM_16_128_256(), 

731 "AES-CCM-64-128-128": AES_CCM_64_128_128(), 

732 "AES-CCM-64-128-256": AES_CCM_64_128_256(), 

733 "ChaCha20/Poly1305": ChaCha20Poly1305(), 

734 "A128GCM": A128GCM(), 

735 "A192GCM": A192GCM(), 

736 "A256GCM": A256GCM(), 

737 "A128CBC": A128CBC(), 

738} 

739 

740# algorithms with full parameter set 

741algorithms_countersign = { 

742 # maybe needs a different name... 

743 "EdDSA on Ed25519": Ed25519(), 

744 "ECDSA w/ SHA-256 on P-256": ECDSA_SHA256_P256(), 

745} 

746 

747algorithms_staticstatic = { 

748 "ECDH-SS + HKDF-256": EcdhSsHkdf256(), 

749} 

750 

751DEFAULT_ALGORITHM = "AES-CCM-16-64-128" 

752 

753_hash_backend = cryptography.hazmat.backends.default_backend() 

754hashfunctions = { 

755 "sha256": hashes.SHA256(), 

756 "sha384": hashes.SHA384(), 

757 "sha512": hashes.SHA512(), 

758} 

759 

760DEFAULT_HASHFUNCTION = "sha256" 

761 

762DEFAULT_WINDOWSIZE = 32 

763 

764 

765class BaseSecurityContext: 

766 # Deprecated marker for whether the class uses the 

767 # ContextWhereExternalAadIsGroup mixin; see documentation there. 

768 external_aad_is_group = False 

769 

770 # Authentication information carried with this security context; managed 

771 # externally by whatever creates the security context. 

772 authenticated_claims: List[str] = [] 

773 

774 #: AEAD algorithm. This may be None if it is not set in an OSCORE group context. 

775 alg_aead: Optional[AeadAlgorithm] 

776 

777 #: The common IV of the common context. 

778 #: 

779 #: This may be longer than needed for constructing IVs with any particular 

780 #: algorithm, as per <https://www.ietf.org/archive/id/draft-ietf-core-oscore-groupcomm-23.html#section-2.1.4> 

781 common_iv: bytes 

782 

783 id_context: Optional[bytes] 

784 

785 @property 

786 def algorithm(self): 

787 warnings.warn( 

788 "Property was renamed to 'alg_aead'", DeprecationWarning, stacklevel=2 

789 ) 

790 return self.alg_aead 

791 

792 @algorithm.setter 

793 def algorithm(self, value): 

794 warnings.warn( 

795 "Property was renamed to 'alg_aead'", DeprecationWarning, stacklevel=2 

796 ) 

797 self.alg_aead = value 

798 

799 hashfun: hashes.HashAlgorithm 

800 

801 def _construct_nonce( 

802 self, partial_iv_short, piv_generator_id, alg: SymmetricEncryptionAlgorithm 

803 ): 

804 pad_piv = b"\0" * (5 - len(partial_iv_short)) 

805 

806 s = bytes([len(piv_generator_id)]) 

807 pad_id = b"\0" * (alg.iv_bytes - 6 - len(piv_generator_id)) 

808 

809 components = s + pad_id + piv_generator_id + pad_piv + partial_iv_short 

810 

811 used_common_iv = self.common_iv[: len(components)] 

812 nonce = _xor_bytes(used_common_iv, components) 

813 _alglog.debug( 

814 "Nonce construction: common %s ^ components %s = %s", 

815 self.common_iv.hex(), 

816 components.hex(), 

817 nonce.hex(), 

818 ) 

819 

820 return nonce 

821 

822 def _extract_external_aad( 

823 self, message, request_id, local_is_sender: bool 

824 ) -> bytes: 

825 """Build the serialized external AAD from information in the message 

826 and the request_id. 

827 

828 Information about whether the local context is the sender of the 

829 message is only relevant to group contexts, where it influences whose 

830 authentication credentials are placed in the AAD. 

831 """ 

832 # If any option were actually Class I, it would be something like 

833 # 

834 # the_options = pick some of(message) 

835 # class_i_options = Message(the_options).opt.encode() 

836 

837 oscore_version = 1 

838 class_i_options = b"" 

839 if request_id.request_hash is not None: 

840 class_i_options = Message(request_hash=request_id.request_hash).opt.encode() 

841 

842 algorithms: List[int | str | None] = [ 

843 None if self.alg_aead is None else self.alg_aead.value 

844 ] 

845 if isinstance(self, ContextWhereExternalAadIsGroup): 

846 algorithms.append( 

847 None if self.alg_group_enc is None else self.alg_group_enc.value 

848 ) 

849 algorithms.append( 

850 None if self.alg_signature is None else self.alg_signature.value 

851 ) 

852 algorithms.append( 

853 None 

854 if self.alg_pairwise_key_agreement is None 

855 else self.alg_pairwise_key_agreement.value 

856 ) 

857 

858 external_aad = [ 

859 oscore_version, 

860 algorithms, 

861 request_id.kid, 

862 request_id.partial_iv, 

863 class_i_options, 

864 ] 

865 

866 if isinstance(self, ContextWhereExternalAadIsGroup): 

867 # FIXME: We may need to carry this over in the request_id when 

868 # observation span group rekeyings 

869 external_aad.append(self.id_context) 

870 

871 assert message.opt.oscore is not None, "Double OSCORE" 

872 external_aad.append(message.opt.oscore) 

873 

874 if local_is_sender: 

875 external_aad.append(self.sender_auth_cred) 

876 else: 

877 external_aad.append(self.recipient_auth_cred) 

878 external_aad.append(self.group_manager_cred) 

879 

880 return cbor.dumps(external_aad) 

881 

882 

883class ContextWhereExternalAadIsGroup(BaseSecurityContext): 

884 """The protection and unprotection functions will use the Group OSCORE AADs 

885 rather than the regular OSCORE AADs iff a context uses this mixin. (Ie. 

886 alg_group_enc etc are added to the algorithms, and request_kid_context, 

887 OSCORE_option, sender_auth_cred and gm_cred are added). 

888 

889 This does not necessarily match the is_signing property (as pairwise 

890 contexts use this but don't sign), and is distinct from the added OSCORE 

891 option in the AAD (as that's only applicable for the external AAD as 

892 extracted for signing and signature verification purposes).""" 

893 

894 id_context: bytes 

895 

896 external_aad_is_group = True 

897 

898 alg_group_enc: Optional[SymmetricEncryptionAlgorithm] 

899 alg_signature: Optional[AlgorithmCountersign] 

900 # This is also of type AlgorithmCountersign because the staticstatic 

901 # function is sitting on the same type. 

902 alg_pairwise_key_agreement: Optional[AlgorithmCountersign] 

903 

904 sender_auth_cred: bytes 

905 recipient_auth_cred: bytes 

906 group_manager_cred: bytes 

907 

908 

909# FIXME pull interface components from SecurityContext up here 

910class CanProtect(BaseSecurityContext, metaclass=abc.ABCMeta): 

911 # The protection function will add a signature acccording to the context's 

912 # alg_signature attribute if this is true 

913 is_signing = False 

914 

915 # Send the KID when protecting responses 

916 # 

917 # Once group pairwise mode is implemented, this will need to become a 

918 # parameter to protect(), which is stored at the point where the incoming 

919 # context is turned into an outgoing context. (Currently, such a mechanism 

920 # isn't there yet, and oscore_wrapper protects responses with the very same 

921 # context they came in on). 

922 responses_send_kid = False 

923 

924 #: The KID sent by this party when sending requests, or answering to group 

925 #: requests. 

926 sender_id: bytes 

927 

928 @staticmethod 

929 def _compress(protected, unprotected, ciphertext): 

930 """Pack the untagged COSE_Encrypt0 object described by the *args 

931 into two bytestrings suitable for the Object-Security option and the 

932 message body""" 

933 

934 if protected: 

935 raise RuntimeError( 

936 "Protection produced a message that has uncompressable fields." 

937 ) 

938 

939 piv = unprotected.pop(COSE_PIV, b"") 

940 if len(piv) > COMPRESSION_BITS_N: 

941 raise ValueError("Can't encode overly long partial IV") 

942 

943 firstbyte = len(piv) 

944 if COSE_KID in unprotected: 

945 firstbyte |= COMPRESSION_BIT_K 

946 kid_data = unprotected.pop(COSE_KID) 

947 else: 

948 kid_data = b"" 

949 

950 if COSE_KID_CONTEXT in unprotected: 

951 firstbyte |= COMPRESSION_BIT_H 

952 kid_context = unprotected.pop(COSE_KID_CONTEXT) 

953 s = len(kid_context) 

954 if s > 255: 

955 raise ValueError("KID Context too long") 

956 s_kid_context = bytes((s,)) + kid_context 

957 else: 

958 s_kid_context = b"" 

959 

960 if COSE_COUNTERSIGNATURE0 in unprotected: 

961 firstbyte |= COMPRESSION_BIT_GROUP 

962 

963 unprotected.pop(COSE_COUNTERSIGNATURE0) 

964 

965 # ciphertext will eventually also get the countersignature, but 

966 # that happens later when the option is already processed. 

967 

968 if unprotected: 

969 raise RuntimeError( 

970 "Protection produced a message that has uncompressable fields." 

971 ) 

972 

973 if firstbyte: 

974 option = bytes([firstbyte]) + piv + s_kid_context + kid_data 

975 else: 

976 option = b"" 

977 

978 return (option, ciphertext) 

979 

980 def protect(self, message, request_id=None, *, kid_context=True): 

981 """Given a plain CoAP message, create a protected message that contains 

982 message's options in the inner or outer CoAP message as described in 

983 OSCOAP. 

984 

985 If the message is a response to a previous message, the additional data 

986 from unprotecting the request are passed in as request_id. When 

987 request data is present, its partial IV is reused if possible. The 

988 security context's ID context is encoded in the resulting message 

989 unless kid_context is explicitly set to a False; other values for the 

990 kid_context can be passed in as byte string in the same parameter. 

991 """ 

992 

993 _alglog.debug( 

994 "Protecting message %s with context %s and request ID %s", 

995 message, 

996 self, 

997 request_id, 

998 ) 

999 

1000 assert (request_id is None) == message.code.is_request(), ( 

1001 "Requestishness of code to protect does not match presence of request ID" 

1002 ) 

1003 

1004 assert message.direction is Direction.OUTGOING 

1005 

1006 outer_message, plaintext = self._split_message(message, request_id) 

1007 

1008 outer_message.direction = Direction.OUTGOING 

1009 # There are currently no properties that relate to OSCORE that'd need to be discarded. 

1010 outer_message.transport_tuning = message.transport_tuning 

1011 

1012 protected = {} 

1013 nonce = None 

1014 partial_iv_generated_by = None 

1015 unprotected = {} 

1016 if request_id is not None: 

1017 partial_iv_generated_by, partial_iv_short = ( 

1018 request_id.get_reusable_kid_and_piv() 

1019 ) 

1020 

1021 alg_symmetric = self.alg_group_enc if self.is_signing else self.alg_aead 

1022 assert isinstance(alg_symmetric, AeadAlgorithm) or self.is_signing, ( 

1023 "Non-AEAD algorithms can only be used in signing modes." 

1024 ) 

1025 

1026 if partial_iv_generated_by is None: 

1027 nonce, partial_iv_short = self._build_new_nonce(alg_symmetric) 

1028 partial_iv_generated_by = self.sender_id 

1029 

1030 unprotected[COSE_PIV] = partial_iv_short 

1031 else: 

1032 nonce = self._construct_nonce( 

1033 partial_iv_short, partial_iv_generated_by, alg_symmetric 

1034 ) 

1035 

1036 if message.code.is_request(): 

1037 unprotected[COSE_KID] = self.sender_id 

1038 

1039 request_id = RequestIdentifiers( 

1040 self.sender_id, 

1041 partial_iv_short, 

1042 can_reuse_nonce=None, 

1043 request_code=outer_message.code, 

1044 ) 

1045 

1046 if kid_context is True: 

1047 if self.id_context is not None: 

1048 unprotected[COSE_KID_CONTEXT] = self.id_context 

1049 elif kid_context is not False: 

1050 unprotected[COSE_KID_CONTEXT] = kid_context 

1051 else: 

1052 if self.responses_send_kid: 

1053 unprotected[COSE_KID] = self.sender_id 

1054 

1055 # Putting in a dummy value as the signature calculation will already need some of the compression result 

1056 if self.is_signing: 

1057 unprotected[COSE_COUNTERSIGNATURE0] = b"" 

1058 # FIXME: Running this twice quite needlessly (just to get the oscore option for sending) 

1059 option_data, _ = self._compress(protected, unprotected, b"") 

1060 

1061 outer_message.opt.oscore = option_data 

1062 

1063 external_aad = self._extract_external_aad( 

1064 outer_message, request_id, local_is_sender=True 

1065 ) 

1066 

1067 aad = SymmetricEncryptionAlgorithm._build_encrypt0_structure( 

1068 protected, external_aad 

1069 ) 

1070 

1071 key = self._get_sender_key(outer_message, external_aad, plaintext, request_id) 

1072 

1073 _alglog.debug("Encrypting Encrypt0:") 

1074 _alglog.debug("* aad = %s", aad.hex()) 

1075 _alglog.debug("* nonce = %s", nonce.hex()) 

1076 _alglog.debug("* key = %s", log_secret(key.hex())) 

1077 _alglog.debug("* algorithm = %s", alg_symmetric) 

1078 ciphertext = alg_symmetric.encrypt(plaintext, aad, key, nonce) 

1079 

1080 _alglog.debug("Produced ciphertext %s", ciphertext.hex()) 

1081 

1082 _, payload = self._compress(protected, unprotected, ciphertext) 

1083 

1084 if self.is_signing: 

1085 signature = self.alg_signature.sign(payload, external_aad, self.private_key) 

1086 # This is bordering "it's OK to log it in plain", because a reader 

1087 # of the log can access both the plaintext and the ciphertext, but 

1088 # still, it is called a key. 

1089 _alglog.debug( 

1090 "Producing keystream from signature encryption key: %s", 

1091 log_secret(self.signature_encryption_key.hex()), 

1092 ) 

1093 keystream = self._kdf_for_keystreams( 

1094 partial_iv_generated_by, 

1095 partial_iv_short, 

1096 self.signature_encryption_key, 

1097 self.sender_id, 

1098 INFO_TYPE_KEYSTREAM_REQUEST 

1099 if message.code.is_request() 

1100 else INFO_TYPE_KEYSTREAM_RESPONSE, 

1101 ) 

1102 _alglog.debug("Keystream is %s", keystream.hex()) 

1103 encrypted_signature = _xor_bytes(signature, keystream) 

1104 _alglog.debug("Encrypted signature %s", encrypted_signature.hex()) 

1105 payload += encrypted_signature 

1106 outer_message.payload = payload 

1107 

1108 # FIXME go through options section 

1109 

1110 _alglog.debug( 

1111 "Protecting the message succeeded, yielding ciphertext %s and request ID %s", 

1112 outer_message, 

1113 request_id, 

1114 ) 

1115 # the request_id in the second argument should be discarded by the 

1116 # caller when protecting a response -- is that reason enough for an 

1117 # `if` and returning None? 

1118 return outer_message, request_id 

1119 

1120 def _get_sender_key(self, outer_message, aad, plaintext, request_id): 

1121 """Customization hook of the protect function 

1122 

1123 While most security contexts have a fixed sender key, deterministic 

1124 requests need to shake up a few things. They need to modify the outer 

1125 message, as well as the request_id as it will later be used to 

1126 unprotect the response.""" 

1127 return self.sender_key 

1128 

1129 def _split_message(self, message, request_id): 

1130 """Given a protected message, return the outer message that contains 

1131 all Class I and Class U options (but without payload or Object-Security 

1132 option), and the encoded inner message that contains all Class E 

1133 options and the payload. 

1134 

1135 This leaves the messages' remotes unset.""" 

1136 

1137 if message.code.is_request(): 

1138 outer_host = message.opt.uri_host 

1139 proxy_uri = message.opt.proxy_uri 

1140 

1141 inner_message = message.copy( 

1142 uri_host=None, 

1143 uri_port=None, 

1144 proxy_uri=None, 

1145 proxy_scheme=None, 

1146 ) 

1147 inner_message.remote = None 

1148 

1149 if proxy_uri is not None: 

1150 # Use set_request_uri to split up the proxy URI into its 

1151 # components; extract, preserve and clear them. 

1152 inner_message.set_request_uri(proxy_uri, set_uri_host=False) 

1153 if inner_message.opt.proxy_uri is not None: 

1154 raise ValueError("Can not split Proxy-URI into options") 

1155 outer_uri = inner_message.remote.uri_base 

1156 inner_message.remote = None 

1157 inner_message.opt.proxy_scheme = None 

1158 

1159 if message.opt.observe is None: 

1160 outer_code = POST 

1161 else: 

1162 outer_code = FETCH 

1163 else: 

1164 outer_host = None 

1165 proxy_uri = None 

1166 

1167 inner_message = message.copy() 

1168 

1169 outer_code = request_id.code_style.response 

1170 

1171 # no max-age because these are always successsful responses 

1172 outer_message = Message( 

1173 code=outer_code, 

1174 uri_host=outer_host, 

1175 observe=None if message.code.is_response() else message.opt.observe, 

1176 ) 

1177 if proxy_uri is not None: 

1178 outer_message.set_request_uri(outer_uri) 

1179 

1180 plaintext = bytes([inner_message.code]) + inner_message.opt.encode() 

1181 if inner_message.payload: 

1182 plaintext += bytes([0xFF]) 

1183 plaintext += inner_message.payload 

1184 

1185 return outer_message, plaintext 

1186 

1187 def _build_new_nonce(self, alg: SymmetricEncryptionAlgorithm): 

1188 """This implements generation of a new nonce, assembled as per Figure 5 

1189 of draft-ietf-core-object-security-06. Returns the shortened partial IV 

1190 as well.""" 

1191 seqno = self.new_sequence_number() 

1192 

1193 partial_iv = seqno.to_bytes(5, "big") 

1194 

1195 return ( 

1196 self._construct_nonce(partial_iv, self.sender_id, alg), 

1197 partial_iv.lstrip(b"\0") or b"\0", 

1198 ) 

1199 

1200 # sequence number handling 

1201 

1202 def new_sequence_number(self): 

1203 """Return a new sequence number; the implementation is responsible for 

1204 never returning the same value twice in a given security context. 

1205 

1206 May raise ContextUnavailable.""" 

1207 retval = self.sender_sequence_number 

1208 if retval >= MAX_SEQNO: 

1209 raise ContextUnavailable("Sequence number too large, context is exhausted.") 

1210 self.sender_sequence_number += 1 

1211 self.post_seqnoincrease() 

1212 return retval 

1213 

1214 # implementation defined 

1215 

1216 @abc.abstractmethod 

1217 def post_seqnoincrease(self): 

1218 """Ensure that sender_sequence_number is stored""" 

1219 raise 

1220 

1221 def context_from_response(self, unprotected_bag) -> CanUnprotect: 

1222 """When receiving a response to a request protected with this security 

1223 context, pick the security context with which to unprotect the response 

1224 given the unprotected information from the Object-Security option. 

1225 

1226 This allow picking the right security context in a group response, and 

1227 helps getting a new short-lived context for B.2 mode. The default 

1228 behaivor is returning self. 

1229 """ 

1230 

1231 # FIXME justify by moving into a mixin for CanProtectAndUnprotect 

1232 return self # type: ignore 

1233 

1234 

1235class CanUnprotect(BaseSecurityContext): 

1236 recipient_key: bytes 

1237 

1238 def unprotect(self, protected_message, request_id=None): 

1239 _alglog.debug( 

1240 "Unprotecting message %s with context %s and request ID %s", 

1241 protected_message, 

1242 self, 

1243 request_id, 

1244 ) 

1245 

1246 assert (request_id is not None) == protected_message.code.is_response(), ( 

1247 "Requestishness of code to unprotect does not match presence of request ID" 

1248 ) 

1249 is_response = protected_message.code.is_response() 

1250 

1251 assert protected_message.direction is Direction.INCOMING 

1252 

1253 # Set to a raisable exception on replay check failures; it will be 

1254 # raised, but the package may still be processed in the course of Echo handling. 

1255 replay_error = None 

1256 

1257 protected_serialized, protected, unprotected, ciphertext = ( 

1258 self._extract_encrypted0(protected_message) 

1259 ) 

1260 

1261 if protected: 

1262 raise ProtectionInvalid("The protected field is not empty") 

1263 

1264 # FIXME check for duplicate keys in protected 

1265 

1266 if unprotected.pop(COSE_KID_CONTEXT, self.id_context) != self.id_context: 

1267 # FIXME is this necessary? 

1268 raise ProtectionInvalid("Sender ID context does not match") 

1269 

1270 if unprotected.pop(COSE_KID, self.recipient_id) != self.recipient_id: 

1271 # for most cases, this is caught by the session ID dispatch, but in 

1272 # responses (where explicit sender IDs are atypical), this is a 

1273 # valid check 

1274 raise ProtectionInvalid("Sender ID does not match") 

1275 

1276 if COSE_PIV not in unprotected: 

1277 if not is_response: 

1278 raise ProtectionInvalid("No sequence number provided in request") 

1279 

1280 seqno = None # sentinel for not striking out anyting 

1281 partial_iv_short = request_id.partial_iv 

1282 partial_iv_generated_by = request_id.kid 

1283 else: 

1284 partial_iv_short = unprotected.pop(COSE_PIV) 

1285 partial_iv_generated_by = self.recipient_id 

1286 

1287 seqno = int.from_bytes(partial_iv_short, "big") 

1288 

1289 if not is_response: 

1290 if not self.recipient_replay_window.is_initialized(): 

1291 replay_error = ReplayError("Sequence number check unavailable") 

1292 elif not self.recipient_replay_window.is_valid(seqno): 

1293 replay_error = ReplayError("Sequence number was re-used") 

1294 

1295 if replay_error is not None and self.echo_recovery is None: 

1296 # Don't even try decoding if there is no reason to 

1297 raise replay_error 

1298 

1299 request_id = RequestIdentifiers( 

1300 partial_iv_generated_by, 

1301 partial_iv_short, 

1302 can_reuse_nonce=replay_error is None, 

1303 request_code=protected_message.code, 

1304 ) 

1305 

1306 external_aad = self._extract_external_aad( 

1307 protected_message, request_id, local_is_sender=False 

1308 ) 

1309 

1310 if unprotected.pop(COSE_COUNTERSIGNATURE0, None) is not None: 

1311 try: 

1312 alg_signature = self.alg_signature 

1313 except NameError: 

1314 raise DecodeError( 

1315 "Group messages can not be decoded with this non-group context" 

1316 ) 

1317 

1318 siglen = alg_signature.signature_length 

1319 if len(ciphertext) < siglen: 

1320 raise DecodeError("Message too short for signature") 

1321 encrypted_signature = ciphertext[-siglen:] 

1322 

1323 _alglog.debug( 

1324 "Producing keystream from signature encryption key: %s", 

1325 log_secret(self.signature_encryption_key.hex()), 

1326 ) 

1327 keystream = self._kdf_for_keystreams( 

1328 partial_iv_generated_by, 

1329 partial_iv_short, 

1330 self.signature_encryption_key, 

1331 self.recipient_id, 

1332 INFO_TYPE_KEYSTREAM_REQUEST 

1333 if protected_message.code.is_request() 

1334 else INFO_TYPE_KEYSTREAM_RESPONSE, 

1335 ) 

1336 _alglog.debug("Encrypted signature %s", encrypted_signature.hex()) 

1337 _alglog.debug("Keystream is %s", keystream.hex()) 

1338 signature = _xor_bytes(encrypted_signature, keystream) 

1339 

1340 ciphertext = ciphertext[:-siglen] 

1341 

1342 alg_signature.verify( 

1343 signature, ciphertext, external_aad, self.recipient_public_key 

1344 ) 

1345 

1346 alg_symmetric = self.alg_group_enc 

1347 else: 

1348 alg_symmetric = self.alg_aead 

1349 

1350 if unprotected: 

1351 raise DecodeError("Unsupported unprotected option") 

1352 

1353 if ( 

1354 len(ciphertext) < self.alg_aead.tag_bytes + 1 

1355 ): # +1 assures access to plaintext[0] (the code) 

1356 raise ProtectionInvalid("Ciphertext too short") 

1357 

1358 enc_structure = ["Encrypt0", protected_serialized, external_aad] 

1359 aad = cbor.dumps(enc_structure) 

1360 

1361 key = self._get_recipient_key(protected_message, alg_symmetric) 

1362 

1363 nonce = self._construct_nonce( 

1364 partial_iv_short, partial_iv_generated_by, alg_symmetric 

1365 ) 

1366 

1367 _alglog.debug("Decrypting Encrypt0:") 

1368 _alglog.debug("* ciphertext = %s", ciphertext.hex()) 

1369 _alglog.debug("* aad = %s", aad.hex()) 

1370 _alglog.debug("* nonce = %s", nonce.hex()) 

1371 _alglog.debug("* key = %s", log_secret(key.hex())) 

1372 _alglog.debug("* algorithm = %s", alg_symmetric) 

1373 try: 

1374 plaintext = alg_symmetric.decrypt(ciphertext, aad, key, nonce) 

1375 except Exception as e: 

1376 _alglog.debug("Unprotecting failed") 

1377 raise e 

1378 

1379 self._post_decrypt_checks( 

1380 external_aad, plaintext, protected_message, request_id 

1381 ) 

1382 

1383 if not is_response and seqno is not None and replay_error is None: 

1384 self.recipient_replay_window.strike_out(seqno) 

1385 

1386 # FIXME add options from unprotected 

1387 

1388 unprotected_message = Message(code=plaintext[0]) 

1389 unprotected_message.payload = unprotected_message.opt.decode(plaintext[1:]) 

1390 unprotected_message.direction = Direction.INCOMING 

1391 

1392 try_initialize = ( 

1393 not self.recipient_replay_window.is_initialized() 

1394 and self.echo_recovery is not None 

1395 ) 

1396 if try_initialize: 

1397 if protected_message.code.is_request(): 

1398 # Either accept into replay window and clear replay error, or raise 

1399 # something that can turn into a 4.01,Echo response 

1400 if unprotected_message.opt.echo == self.echo_recovery: 

1401 self.recipient_replay_window.initialize_from_freshlyseen(seqno) 

1402 replay_error = None 

1403 else: 

1404 raise ReplayErrorWithEcho( 

1405 secctx=self, request_id=request_id, echo=self.echo_recovery 

1406 ) 

1407 else: 

1408 # We can initialize the replay window from a response as well. 

1409 # The response is guaranteed fresh as it was AEAD-decoded to 

1410 # match a request sent by this process. 

1411 # 

1412 # This is rare, as it only works when the server uses an own 

1413 # sequence number, eg. when sending a notification or when 

1414 # acting again on a retransmitted safe request whose response 

1415 # it did not cache. 

1416 # 

1417 # Nothing bad happens if we can't make progress -- we just 

1418 # don't initialize the replay window that wouldn't have been 

1419 # checked for a response anyway. 

1420 if seqno is not None: 

1421 self.recipient_replay_window.initialize_from_freshlyseen(seqno) 

1422 

1423 if replay_error is not None: 

1424 raise replay_error 

1425 

1426 if unprotected_message.code.is_request(): 

1427 if protected_message.opt.observe != 0: 

1428 unprotected_message.opt.observe = None 

1429 else: 

1430 if protected_message.opt.observe is not None: 

1431 # -1 ensures that they sort correctly in later reordering 

1432 # detection. Note that neither -1 nor high (>3 byte) sequence 

1433 # numbers can be serialized in the Observe option, but they are 

1434 # in this implementation accepted for passing around. 

1435 unprotected_message.opt.observe = -1 if seqno is None else seqno 

1436 

1437 _alglog.debug( 

1438 "Unprotecting succeeded, yielding plaintext %s and request_id %s", 

1439 unprotected_message, 

1440 request_id, 

1441 ) 

1442 return unprotected_message, request_id 

1443 

1444 def _get_recipient_key( 

1445 self, protected_message, algorithm: SymmetricEncryptionAlgorithm 

1446 ): 

1447 """Customization hook of the unprotect function 

1448 

1449 While most security contexts have a fixed recipient key, group contexts 

1450 have multiple, and deterministic requests build it on demand.""" 

1451 return self.recipient_key 

1452 

1453 def _post_decrypt_checks(self, aad, plaintext, protected_message, request_id): 

1454 """Customization hook of the unprotect function after decryption 

1455 

1456 While most security contexts are good with the default checks, 

1457 deterministic requests need to perform additional checks while AAD and 

1458 plaintext information is still available, and modify the request_id for 

1459 the later protection step of the response.""" 

1460 

1461 @staticmethod 

1462 def _uncompress(option_data, payload): 

1463 if option_data == b"": 

1464 firstbyte = 0 

1465 else: 

1466 firstbyte = option_data[0] 

1467 tail = option_data[1:] 

1468 

1469 unprotected = {} 

1470 

1471 if firstbyte & COMPRESSION_BITS_RESERVED: 

1472 raise DecodeError("Protected data uses reserved fields") 

1473 

1474 pivsz = firstbyte & COMPRESSION_BITS_N 

1475 if pivsz: 

1476 if len(tail) < pivsz: 

1477 raise DecodeError("Partial IV announced but not present") 

1478 unprotected[COSE_PIV] = tail[:pivsz] 

1479 tail = tail[pivsz:] 

1480 

1481 if firstbyte & COMPRESSION_BIT_H: 

1482 # kid context hint 

1483 s = tail[0] 

1484 if len(tail) - 1 < s: 

1485 raise DecodeError("Context hint announced but not present") 

1486 tail = tail[1:] 

1487 unprotected[COSE_KID_CONTEXT] = tail[:s] 

1488 tail = tail[s:] 

1489 

1490 if firstbyte & COMPRESSION_BIT_K: 

1491 kid = tail 

1492 unprotected[COSE_KID] = kid 

1493 

1494 if firstbyte & COMPRESSION_BIT_GROUP: 

1495 # Not really; As this is (also) used early on (before the KID 

1496 # context is even known, because it's just getting extracted), this 

1497 # is returning an incomplete value here and leaves it to the later 

1498 # processing to strip the right number of bytes from the ciphertext 

1499 unprotected[COSE_COUNTERSIGNATURE0] = PRESENT_BUT_NO_VALUE_YET 

1500 

1501 return b"", {}, unprotected, payload 

1502 

1503 @classmethod 

1504 def _extract_encrypted0(cls, message): 

1505 if message.opt.oscore is None: 

1506 raise NotAProtectedMessage("No Object-Security option present", message) 

1507 

1508 protected_serialized, protected, unprotected, ciphertext = cls._uncompress( 

1509 message.opt.oscore, message.payload 

1510 ) 

1511 return protected_serialized, protected, unprotected, ciphertext 

1512 

1513 # implementation defined 

1514 

1515 def context_for_response(self) -> CanProtect: 

1516 """After processing a request with this context, with which security 

1517 context should an outgoing response be protected? By default, it's the 

1518 same context.""" 

1519 # FIXME: Is there any way in which the handler may want to influence 

1520 # the decision taken here? Or would, then, the handler just call a more 

1521 # elaborate but similar function when setting the response's remote 

1522 # already? 

1523 

1524 # FIXME justify by moving into a mixin for CanProtectAndUnprotect 

1525 return self # type: ignore 

1526 

1527 

1528class SecurityContextUtils(BaseSecurityContext): 

1529 def _kdf( 

1530 self, 

1531 salt, 

1532 ikm, 

1533 role_id, 

1534 out_type, 

1535 key_alg: Optional[SymmetricEncryptionAlgorithm] = None, 

1536 ): 

1537 """The HKDF as used to derive sender and recipient key and IV in 

1538 RFC8613 Section 3.2.1, and analogously the Group Encryption Key of oscore-groupcomm. 

1539 """ 

1540 

1541 _alglog.debug("Deriving through KDF:") 

1542 _alglog.debug("* salt = %s", salt.hex() if salt else salt) 

1543 _alglog.debug("* ikm = %s", log_secret(ikm.hex())) 

1544 _alglog.debug("* role_id = %s", role_id.hex()) 

1545 _alglog.debug("* out_type = %r", out_type) 

1546 _alglog.debug("* key_alg = %r", key_alg) 

1547 

1548 # The field in info is called `alg_aead` defined in RFC8613, but in 

1549 # group OSCORE something that's very clearly *not* alg_aead is put in 

1550 # there. 

1551 # 

1552 # The rules about this come both from 

1553 # https://www.ietf.org/archive/id/draft-ietf-core-oscore-groupcomm-23.html#section-2.3 

1554 # and 

1555 # https://www.ietf.org/archive/id/draft-ietf-core-oscore-groupcomm-23.html#section-2.1.9 

1556 # but they produce the same outcome. 

1557 if hasattr(self, "alg_group_enc") and self.alg_group_enc is not None: 

1558 the_field_called_alg_aead = self.alg_group_enc.value 

1559 else: 

1560 assert self.alg_aead is not None, ( 

1561 "At least alg_aead or alg_group_enc needs to be set on a context." 

1562 ) 

1563 the_field_called_alg_aead = self.alg_aead.value 

1564 

1565 assert (key_alg is None) ^ (out_type == "Key") 

1566 if out_type == "Key": 

1567 # Duplicate assertion needed while mypy can not see that the assert 

1568 # above the if is stricter than this. 

1569 assert key_alg is not None 

1570 out_bytes = key_alg.key_bytes 

1571 the_field_called_alg_aead = key_alg.value 

1572 elif out_type == "IV": 

1573 assert self.alg_aead is not None, ( 

1574 "At least alg_aead or alg_group_enc needs to be set on a context." 

1575 ) 

1576 out_bytes = max( 

1577 ( 

1578 a.iv_bytes 

1579 for a in [self.alg_aead, getattr(self, "alg_group_enc", None)] 

1580 if a is not None 

1581 ) 

1582 ) 

1583 elif out_type == "SEKey": 

1584 assert isinstance(self, GroupContext) and self.alg_group_enc is not None, ( 

1585 "SEKey derivation is only defined for group contexts with a group encryption algorithm." 

1586 ) 

1587 # "While the obtained Signature Encryption Key is never used with 

1588 # the Group Encryption Algorithm, its length was chosen to obtain a 

1589 # matching level of security." 

1590 out_bytes = self.alg_group_enc.key_bytes 

1591 else: 

1592 raise ValueError("Output type not recognized") 

1593 

1594 _alglog.debug("* the_field_called_alg_aead = %s", the_field_called_alg_aead) 

1595 

1596 info = [ 

1597 role_id, 

1598 self.id_context, 

1599 the_field_called_alg_aead, 

1600 out_type, 

1601 out_bytes, 

1602 ] 

1603 _alglog.debug("* info = %r", info) 

1604 ret = self._kdf_lowlevel(salt, ikm, info, out_bytes) 

1605 _alglog.debug("Derivation of %r produced %s", out_type, log_secret(ret.hex())) 

1606 return ret 

1607 

1608 def _kdf_for_keystreams(self, piv_generated_by, salt, ikm, role_id, out_type): 

1609 """The HKDF as used to derive the keystreams of oscore-groupcomm.""" 

1610 

1611 out_bytes = self.alg_signature.signature_length 

1612 

1613 assert out_type in ( 

1614 INFO_TYPE_KEYSTREAM_REQUEST, 

1615 INFO_TYPE_KEYSTREAM_RESPONSE, 

1616 ), "Output type not recognized" 

1617 

1618 info = [ 

1619 piv_generated_by, 

1620 self.id_context, 

1621 out_type, 

1622 out_bytes, 

1623 ] 

1624 return self._kdf_lowlevel(salt, ikm, info, out_bytes) 

1625 

1626 def _kdf_lowlevel(self, salt: bytes, ikm: bytes, info: list, l: int) -> bytes: # noqa: E741 (signature follows RFC definition) 

1627 """The HKDF function as used in RFC8613 and oscore-groupcomm (notated 

1628 there as ``something = HKDF(...)`` 

1629 

1630 Note that `info` typically contains `L` at some point. 

1631 

1632 When `info` takes the conventional structure of pid, id_context, 

1633 ald_aead, type, L], it may make sense to extend the `_kdf` function to 

1634 support that case, or `_kdf_for_keystreams` for a different structure, as 

1635 they are the more high-level tools.""" 

1636 hkdf = HKDF( 

1637 algorithm=self.hashfun, 

1638 length=l, 

1639 salt=salt, 

1640 info=cbor.dumps(info), 

1641 backend=_hash_backend, 

1642 ) 

1643 expanded = hkdf.derive(ikm) 

1644 return expanded 

1645 

1646 def derive_keys(self, master_salt, master_secret): 

1647 """Populate sender_key, recipient_key and common_iv from the algorithm, 

1648 hash function and id_context already configured beforehand, and from 

1649 the passed salt and secret.""" 

1650 

1651 self.sender_key = self._kdf( 

1652 master_salt, master_secret, self.sender_id, "Key", self.alg_aead 

1653 ) 

1654 self.recipient_key = self._kdf( 

1655 master_salt, master_secret, self.recipient_id, "Key", self.alg_aead 

1656 ) 

1657 

1658 self.common_iv = self._kdf(master_salt, master_secret, b"", "IV") 

1659 

1660 # really more of the Credentials interface 

1661 

1662 def get_oscore_context_for(self, unprotected): 

1663 """Return a sutiable context (most easily self) for an incoming request 

1664 if its unprotected data (COSE_KID, COSE_KID_CONTEXT) fit its 

1665 description. If it doesn't match, it returns None. 

1666 

1667 The default implementation just strictly checks for whether kid and any 

1668 kid context match (not matching if a local KID context is set but none 

1669 is given in the request); modes like Group OSCORE can spin up aspect 

1670 objects here. 

1671 """ 

1672 if ( 

1673 unprotected.get(COSE_KID, None) == self.recipient_id 

1674 and unprotected.get(COSE_KID_CONTEXT, None) == self.id_context 

1675 ): 

1676 return self 

1677 

1678 

1679class ReplayWindow: 

1680 """A regular replay window of a fixed size. 

1681 

1682 It is implemented as an index and a bitfield (represented by an integer) 

1683 whose least significant bit represents the seqyence number of the index, 

1684 and a 1 indicates that a number was seen. No shenanigans around implicit 

1685 leading ones (think floating point normalization) happen. 

1686 

1687 >>> w = ReplayWindow(32, lambda: None) 

1688 >>> w.initialize_empty() 

1689 >>> w.strike_out(5) 

1690 >>> w.is_valid(3) 

1691 True 

1692 >>> w.is_valid(5) 

1693 False 

1694 >>> w.strike_out(0) 

1695 >>> w.strike_out(1) 

1696 >>> w.strike_out(2) 

1697 >>> w.is_valid(1) 

1698 False 

1699 

1700 Jumping ahead by the window size invalidates older numbers: 

1701 

1702 >>> w.is_valid(4) 

1703 True 

1704 >>> w.strike_out(35) 

1705 >>> w.is_valid(4) 

1706 True 

1707 >>> w.strike_out(36) 

1708 >>> w.is_valid(4) 

1709 False 

1710 

1711 Usage safety 

1712 ------------ 

1713 

1714 For every key, the replay window can only be initielized empty once. On 

1715 later uses, it needs to be persisted by storing the output of 

1716 self.persist() somewhere and loaded from that persisted data. 

1717 

1718 It is acceptable to store persistance data in the strike_out_callback, but 

1719 that must then ensure that the data is written (flushed to a file or 

1720 committed to a database), but that is usually inefficient. 

1721 

1722 Stability 

1723 --------- 

1724 

1725 This class is not considered for stabilization yet and an implementation 

1726 detail of the SecurityContext implementation(s). 

1727 """ 

1728 

1729 _index = None 

1730 """Sequence number represented by the least significant bit of _bitfield""" 

1731 _bitfield = None 

1732 """Integer interpreted as a bitfield, self._size wide. A digit 1 at any bit 

1733 indicates that the bit's index (its power of 2) plus self._index was 

1734 already seen.""" 

1735 

1736 def __init__(self, size, strike_out_callback): 

1737 self._size = size 

1738 self.strike_out_callback = strike_out_callback 

1739 

1740 def is_initialized(self): 

1741 return self._index is not None 

1742 

1743 def initialize_empty(self): 

1744 self._index = 0 

1745 self._bitfield = 0 

1746 

1747 def initialize_from_persisted(self, persisted): 

1748 self._index = persisted["index"] 

1749 self._bitfield = persisted["bitfield"] 

1750 

1751 def initialize_from_freshlyseen(self, seen): 

1752 """Initialize the replay window with a particular value that is just 

1753 being observed in a fresh (ie. generated by the peer later than any 

1754 messages processed before state was lost here) message. This marks the 

1755 seen sequence number and all preceding it as invalid, and and all later 

1756 ones as valid.""" 

1757 self._index = seen 

1758 self._bitfield = 1 

1759 

1760 def is_valid(self, number): 

1761 if number < self._index: 

1762 return False 

1763 if number >= self._index + self._size: 

1764 return True 

1765 return (self._bitfield >> (number - self._index)) & 1 == 0 

1766 

1767 def strike_out(self, number): 

1768 if not self.is_valid(number): 

1769 raise ValueError( 

1770 "Sequence number is not valid any more and " 

1771 "thus can't be removed from the window" 

1772 ) 

1773 overshoot = number - (self._index + self._size - 1) 

1774 if overshoot > 0: 

1775 self._index += overshoot 

1776 self._bitfield >>= overshoot 

1777 assert self.is_valid(number), "Sequence number was not valid before strike-out" 

1778 self._bitfield |= 1 << (number - self._index) 

1779 

1780 self.strike_out_callback() 

1781 

1782 def persist(self): 

1783 """Return a dict containing internal state which can be passed to init 

1784 to recreated the replay window.""" 

1785 

1786 return {"index": self._index, "bitfield": self._bitfield} 

1787 

1788 

1789class FilesystemSecurityContext( 

1790 CanProtect, CanUnprotect, SecurityContextUtils, credentials._Objectish 

1791): 

1792 """Security context stored in a directory as distinct files containing 

1793 containing 

1794 

1795 * Master secret, master salt, sender and recipient ID, 

1796 optionally algorithm, the KDF hash function, and replay window size 

1797 (settings.json and secrets.json, where the latter is typically readable 

1798 only for the user) 

1799 * sequence numbers and replay windows (sequence.json, the only file the 

1800 process needs write access to) 

1801 

1802 The static parameters can all either be placed in settings.json or 

1803 secrets.json, but must not be present in both; the presence of either file 

1804 is sufficient. 

1805 

1806 .. warning:: 

1807 

1808 Security contexts must never be copied around and used after another 

1809 copy was used. They should only ever be moved, and if they are copied 

1810 (eg. as a part of a system backup), restored contexts must not be used 

1811 again; they need to be replaced with freshly created ones. 

1812 

1813 An additional file named `lock` is created to prevent the accidental use of 

1814 a context by to concurrent programs. 

1815 

1816 Note that the sequence number file is updated in an atomic fashion which 

1817 requires file creation privileges in the directory. If privilege separation 

1818 between settings/key changes and sequence number changes is desired, one 

1819 way to achieve that on Linux is giving the aiocoap process's user group 

1820 write permissions on the directory and setting the sticky bit on the 

1821 directory, thus forbidding the user to remove the settings/secret files not 

1822 owned by him. 

1823 

1824 Writes due to sent sequence numbers are reduced by applying a variation on 

1825 the mechanism of RFC8613 Appendix B.1.1 (incrementing the persisted sender 

1826 seqence number in steps of `k`). That value is automatically grown from 

1827 sequence_number_chunksize_start up to sequence_number_chunksize_limit. 

1828 At runtime, the receive window is not stored but kept indeterminate. In 

1829 case of an abnormal shutdown, the server uses the mechanism described in 

1830 Appendix B.1.2 to recover. 

1831 """ 

1832 

1833 # possibly overridden in constructor 

1834 # 

1835 # Type is ignored because while it *is* AlgAead, mypy can't tell. 

1836 alg_aead = algorithms[DEFAULT_ALGORITHM] # type: ignore 

1837 

1838 class LoadError(ValueError): 

1839 """Exception raised with a descriptive message when trying to load a 

1840 faulty security context""" 

1841 

1842 def __init__( 

1843 self, 

1844 basedir: str, 

1845 sequence_number_chunksize_start=10, 

1846 sequence_number_chunksize_limit=10000, 

1847 ): 

1848 self.basedir = basedir 

1849 

1850 self.lockfile: Optional[filelock.FileLock] = filelock.FileLock( 

1851 os.path.join(basedir, "lock") 

1852 ) 

1853 # 0.001: Just fail if it can't be acquired 

1854 # See https://github.com/benediktschmitt/py-filelock/issues/57 

1855 try: 

1856 self.lockfile.acquire(timeout=0.001) 

1857 # see https://github.com/PyCQA/pycodestyle/issues/703 

1858 except: # noqa: E722 

1859 # No lock, no loading, no need to fail in __del__ 

1860 self.lockfile = None 

1861 raise 

1862 

1863 # Always enabled as committing to a file for every received request 

1864 # would be a terrible burden. 

1865 self.echo_recovery = secrets.token_bytes(8) 

1866 

1867 try: 

1868 self._load() 

1869 except KeyError as k: 

1870 raise self.LoadError("Configuration key missing: %s" % (k.args[0],)) 

1871 

1872 self.sequence_number_chunksize_start = sequence_number_chunksize_start 

1873 self.sequence_number_chunksize_limit = sequence_number_chunksize_limit 

1874 self.sequence_number_chunksize = sequence_number_chunksize_start 

1875 

1876 self.sequence_number_persisted = self.sender_sequence_number 

1877 

1878 def _load(self): 

1879 # doesn't check for KeyError on every occasion, relies on __init__ to 

1880 # catch that 

1881 

1882 data = {} 

1883 for readfile in ("secret.json", "settings.json"): 

1884 try: 

1885 with open(os.path.join(self.basedir, readfile)) as f: 

1886 filedata = json.load(f) 

1887 except FileNotFoundError: 

1888 continue 

1889 

1890 for key, value in filedata.items(): 

1891 if key.endswith("_hex"): 

1892 key = key[:-4] 

1893 value = binascii.unhexlify(value) 

1894 elif key.endswith("_ascii"): 

1895 key = key[:-6] 

1896 value = value.encode("ascii") 

1897 

1898 if key in data: 

1899 raise self.LoadError( 

1900 "Datum %r present in multiple input files at %r." 

1901 % (key, self.basedir) 

1902 ) 

1903 

1904 data[key] = value 

1905 

1906 self.alg_aead = algorithms[data.get("algorithm", DEFAULT_ALGORITHM)] 

1907 self.hashfun = hashfunctions[data.get("kdf-hashfun", DEFAULT_HASHFUNCTION)] 

1908 

1909 windowsize = data.get("window", DEFAULT_WINDOWSIZE) 

1910 if not isinstance(windowsize, int): 

1911 raise self.LoadError("Non-integer replay window") 

1912 

1913 self.sender_id = data["sender-id"] 

1914 self.recipient_id = data["recipient-id"] 

1915 

1916 if ( 

1917 max(len(self.sender_id), len(self.recipient_id)) 

1918 > self.alg_aead.iv_bytes - 6 

1919 ): 

1920 raise self.LoadError( 

1921 "Sender or Recipient ID too long (maximum length %s for this algorithm)" 

1922 % (self.alg_aead.iv_bytes - 6) 

1923 ) 

1924 

1925 master_secret = data["secret"] 

1926 master_salt = data.get("salt", b"") 

1927 self.id_context = data.get("id-context", None) 

1928 

1929 self.derive_keys(master_salt, master_secret) 

1930 

1931 self.recipient_replay_window = ReplayWindow( 

1932 windowsize, self._replay_window_changed 

1933 ) 

1934 try: 

1935 with open(os.path.join(self.basedir, "sequence.json")) as f: 

1936 sequence = json.load(f) 

1937 except FileNotFoundError: 

1938 self.sender_sequence_number = 0 

1939 self.recipient_replay_window.initialize_empty() 

1940 self.replay_window_persisted = True 

1941 else: 

1942 self.sender_sequence_number = int(sequence["next-to-send"]) 

1943 received = sequence["received"] 

1944 if received == "unknown": 

1945 # The replay window will stay uninitialized, which triggers 

1946 # Echo recovery 

1947 self.replay_window_persisted = False 

1948 else: 

1949 try: 

1950 self.recipient_replay_window.initialize_from_persisted(received) 

1951 except (ValueError, TypeError, KeyError): 

1952 # Not being particularly careful about what could go wrong: If 

1953 # someone tampers with the replay data, we're already in *big* 

1954 # trouble, of which I fail to see how it would become worse 

1955 # than a crash inside the application around "failure to 

1956 # right-shift a string" or that like; at worst it'd result in 

1957 # nonce reuse which tampering with the replay window file 

1958 # already does. 

1959 raise self.LoadError( 

1960 "Persisted replay window state was not understood" 

1961 ) 

1962 self.replay_window_persisted = True 

1963 

1964 # This is called internally whenever a new sequence number is taken or 

1965 # crossed out from the window, and blocks a lot; B.1 mode mitigates that. 

1966 # 

1967 # Making it async and block in a threadpool would mitigate the blocking of 

1968 # other messages, but the more visible effect of this will be that no 

1969 # matter if sync or async, a reply will need to wait for a file sync 

1970 # operation to conclude. 

1971 def _store(self): 

1972 tmphand, tmpnam = tempfile.mkstemp( 

1973 dir=self.basedir, prefix=".sequence-", suffix=".json", text=True 

1974 ) 

1975 

1976 data = {"next-to-send": self.sequence_number_persisted} 

1977 if not self.replay_window_persisted: 

1978 data["received"] = "unknown" 

1979 else: 

1980 data["received"] = self.recipient_replay_window.persist() 

1981 

1982 # Using io.open (instead os.fdopen) and binary / write with encode 

1983 # rather than dumps as that works even while the interpreter is 

1984 # shutting down. 

1985 # 

1986 # This can be relaxed when there is a defined shutdown sequence for 

1987 # security contexts that's triggered from the general context shutdown 

1988 # -- but right now, there isn't. 

1989 with io.open(tmphand, "wb") as tmpfile: 

1990 tmpfile.write(json.dumps(data).encode("utf8")) 

1991 tmpfile.flush() 

1992 os.fsync(tmpfile.fileno()) 

1993 

1994 os.replace(tmpnam, os.path.join(self.basedir, "sequence.json")) 

1995 

1996 def _replay_window_changed(self): 

1997 if self.replay_window_persisted: 

1998 # Just remove the sequence numbers once from the file 

1999 self.replay_window_persisted = False 

2000 self._store() 

2001 

2002 def post_seqnoincrease(self): 

2003 if self.sender_sequence_number > self.sequence_number_persisted: 

2004 self.sequence_number_persisted += self.sequence_number_chunksize 

2005 

2006 self.sequence_number_chunksize = min( 

2007 self.sequence_number_chunksize * 2, self.sequence_number_chunksize_limit 

2008 ) 

2009 # FIXME: this blocks -- see https://github.com/chrysn/aiocoap/issues/178 

2010 self._store() 

2011 

2012 # The = case would only happen if someone deliberately sets all 

2013 # numbers to 1 to force persisting on every step 

2014 assert self.sender_sequence_number <= self.sequence_number_persisted, ( 

2015 "Using a sequence number that has been persisted already" 

2016 ) 

2017 

2018 def _destroy(self): 

2019 """Release the lock file, and ensure tha he object has become 

2020 unusable. 

2021 

2022 If there is unpersisted state from B.1 operation, the actually used 

2023 number and replay window gets written back to the file to allow 

2024 resumption without wasting digits or round-trips. 

2025 """ 

2026 # FIXME: Arrange for a more controlled shutdown through the credentials 

2027 

2028 self.replay_window_persisted = True 

2029 self.sequence_number_persisted = self.sender_sequence_number 

2030 self._store() 

2031 

2032 del self.sender_key 

2033 del self.recipient_key 

2034 

2035 os.unlink(self.lockfile.lock_file) 

2036 self.lockfile.release() 

2037 

2038 self.lockfile = None 

2039 

2040 def __del__(self): 

2041 if self.lockfile is not None: 

2042 self._destroy() 

2043 

2044 @classmethod 

2045 def from_item(cls, init_data): 

2046 """Overriding _Objectish's from_item because the parameter name for 

2047 basedir is contextfile for historical reasons""" 

2048 

2049 def constructor( 

2050 basedir: Optional[str] = None, contextfile: Optional[str] = None 

2051 ): 

2052 if basedir is not None and contextfile is not None: 

2053 raise credentials.CredentialsLoadError( 

2054 "Conflicting arguments basedir and contextfile; just contextfile instead" 

2055 ) 

2056 if basedir is None and contextfile is None: 

2057 raise credentials.CredentialsLoadError("Missing item 'basedir'") 

2058 if contextfile is not None: 

2059 warnings.warn( 

2060 "Property contextfile was renamed to basedir in OSCORE credentials entries", 

2061 DeprecationWarning, 

2062 stacklevel=2, 

2063 ) 

2064 basedir = contextfile 

2065 assert ( 

2066 basedir is not None 

2067 ) # This helps mypy which would otherwise not see that the above ensures this already 

2068 return cls(basedir) 

2069 

2070 return credentials._call_from_structureddata( 

2071 constructor, cls.__name__, init_data 

2072 ) 

2073 

2074 def find_all_used_contextless_oscore_kid(self) -> set[bytes]: 

2075 return set((self.recipient_id,)) 

2076 

2077 

2078class GroupContext(ContextWhereExternalAadIsGroup, BaseSecurityContext): 

2079 is_signing = True 

2080 responses_send_kid = True 

2081 

2082 @abc.abstractproperty 

2083 def private_key(self): 

2084 """Private key used to sign outgoing messages. 

2085 

2086 Contexts not designed to send messages may raise a RuntimeError here; 

2087 that necessity may later go away if some more accurate class modelling 

2088 is found.""" 

2089 

2090 @abc.abstractproperty 

2091 def recipient_public_key(self): 

2092 """Public key used to verify incoming messages. 

2093 

2094 Contexts not designed to receive messages (because they'd have aspects 

2095 for that) may raise a RuntimeError here; that necessity may later go 

2096 away if some more accurate class modelling is found.""" 

2097 

2098 

2099class SimpleGroupContext(GroupContext, CanProtect, CanUnprotect, SecurityContextUtils): 

2100 """A context for an OSCORE group 

2101 

2102 This is a non-persistable version of a group context that does not support 

2103 any group manager or rekeying; it is set up statically at startup. 

2104 

2105 It is intended for experimentation and demos, but aims to be correct enough 

2106 to be usable securely. 

2107 """ 

2108 

2109 # set during initialization (making all those attributes rather than 

2110 # possibly properties as they might be in super) 

2111 sender_id = None # type: ignore 

2112 id_context = None # type: ignore 

2113 private_key = None 

2114 alg_aead = None 

2115 hashfun = None # type: ignore 

2116 alg_signature = None 

2117 alg_group_enc = None 

2118 alg_pairwise_key_agreement = None 

2119 sender_auth_cred = None # type: ignore 

2120 group_manager_cred = None # type: ignore 

2121 cred_fmt = None 

2122 # This is currently not evaluated, but any GM interaction will need to have this information available. 

2123 group_manager_cred_fmt = None 

2124 

2125 def __init__( 

2126 self, 

2127 alg_aead, 

2128 hashfun, 

2129 alg_signature, 

2130 alg_group_enc, 

2131 alg_pairwise_key_agreement, 

2132 group_id, 

2133 master_secret, 

2134 master_salt, 

2135 sender_id, 

2136 private_key, 

2137 sender_auth_cred, 

2138 peers, 

2139 group_manager_cred, 

2140 cred_fmt=COSE_KCCS, 

2141 group_manager_cred_fmt=COSE_KCCS, 

2142 ): 

2143 self.sender_id = sender_id 

2144 self.id_context = group_id 

2145 self.private_key = private_key 

2146 self.alg_aead = alg_aead 

2147 self.hashfun = hashfun 

2148 self.alg_signature = alg_signature 

2149 self.alg_group_enc = alg_group_enc 

2150 self.alg_pairwise_key_agreement = alg_pairwise_key_agreement 

2151 self.sender_auth_cred = sender_auth_cred 

2152 self.group_manager_cred = group_manager_cred 

2153 self.cred_fmt = cred_fmt 

2154 self.group_manager_cred_fmt = group_manager_cred_fmt 

2155 

2156 self.peers = peers.keys() 

2157 self.recipient_public_keys = { 

2158 k: self._parse_credential(v) for (k, v) in peers.items() 

2159 } 

2160 self.recipient_auth_creds = peers 

2161 self.recipient_replay_windows = {} 

2162 for k in self.peers: 

2163 # no need to persist, the whole group is ephemeral 

2164 w = ReplayWindow(32, lambda: None) 

2165 w.initialize_empty() 

2166 self.recipient_replay_windows[k] = w 

2167 

2168 self.derive_keys(master_salt, master_secret) 

2169 self.sender_sequence_number = 0 

2170 

2171 sender_public_key = self._parse_credential(sender_auth_cred) 

2172 if ( 

2173 self.alg_signature.public_from_private(self.private_key) 

2174 != sender_public_key 

2175 ): 

2176 raise ValueError( 

2177 "The key in the provided sender credential does not match the private key" 

2178 ) 

2179 

2180 def _parse_credential(self, credential: bytes | _DeterministicKey): 

2181 """Extract the public key (in the public_key format the respective 

2182 AlgorithmCountersign needs) from credentials. This raises a ValueError 

2183 if the credentials do not match the group's cred_fmt, or if the 

2184 parameters do not match those configured in the group. 

2185 

2186 This currently discards any information that is present in the 

2187 credential that exceeds the key. (In a future version, this could 

2188 return both the key and extracted other data, where that other data 

2189 would be stored with the peer this is parsed from). 

2190 """ 

2191 

2192 if credential is DETERMINISTIC_KEY: 

2193 return credential 

2194 

2195 if self.cred_fmt != COSE_KCCS: 

2196 raise ValueError( 

2197 "Credential parsing is currently only implemented for CCSs" 

2198 ) 

2199 

2200 assert self.alg_signature is not None 

2201 

2202 return self.alg_signature.from_kccs(credential) 

2203 

2204 def __repr__(self): 

2205 return "<%s with group %r sender_id %r and %d peers>" % ( 

2206 type(self).__name__, 

2207 self.id_context.hex(), 

2208 self.sender_id.hex(), 

2209 len(self.peers), 

2210 ) 

2211 

2212 @property 

2213 def recipient_public_key(self): 

2214 raise RuntimeError( 

2215 "Group context without key indication was used for verification" 

2216 ) 

2217 

2218 def _get_sender_key(self, outer_message, aad, plaintext, request_id): 

2219 # If we even get here, there has to be a alg_group_enc, and thus the sender key does match it 

2220 return self._sender_key 

2221 

2222 def derive_keys(self, master_salt, master_secret): 

2223 the_main_alg = ( 

2224 self.alg_group_enc if self.alg_group_enc is not None else self.alg_aead 

2225 ) 

2226 

2227 self._sender_key = self._kdf( 

2228 master_salt, master_secret, self.sender_id, "Key", the_main_alg 

2229 ) 

2230 self.recipient_keys = { 

2231 recipient_id: self._kdf( 

2232 master_salt, master_secret, recipient_id, "Key", the_main_alg 

2233 ) 

2234 for recipient_id in self.peers 

2235 } 

2236 

2237 self.common_iv = self._kdf(master_salt, master_secret, b"", "IV") 

2238 

2239 self.signature_encryption_key = self._kdf( 

2240 master_salt, master_secret, b"", "SEKey" 

2241 ) 

2242 

2243 def post_seqnoincrease(self): 

2244 """No-op because it's ephemeral""" 

2245 

2246 def context_from_response(self, unprotected_bag) -> CanUnprotect: 

2247 # sender ID *needs to be* here -- if this were a pairwise request, it 

2248 # would not run through here 

2249 try: 

2250 sender_kid = unprotected_bag[COSE_KID] 

2251 except KeyError: 

2252 raise DecodeError("Group server failed to send own sender KID") 

2253 

2254 if COSE_COUNTERSIGNATURE0 in unprotected_bag: 

2255 return _GroupContextAspect(self, sender_kid) 

2256 else: 

2257 return _PairwiseContextAspect(self, sender_kid) 

2258 

2259 def get_oscore_context_for(self, unprotected): 

2260 if unprotected.get(COSE_KID_CONTEXT, None) != self.id_context: 

2261 return None 

2262 

2263 kid = unprotected.get(COSE_KID, None) 

2264 if kid in self.peers: 

2265 if COSE_COUNTERSIGNATURE0 in unprotected: 

2266 return _GroupContextAspect(self, kid) 

2267 elif self.recipient_public_keys[kid] is DETERMINISTIC_KEY: 

2268 return _DeterministicUnprotectProtoAspect(self, kid) 

2269 else: 

2270 return _PairwiseContextAspect(self, kid) 

2271 

2272 def find_all_used_contextless_oscore_kid(self) -> set[bytes]: 

2273 # not conflicting: groups always send KID Context 

2274 return set() 

2275 

2276 # yet to stabilize... 

2277 

2278 def pairwise_for(self, recipient_id): 

2279 return _PairwiseContextAspect(self, recipient_id) 

2280 

2281 def for_sending_deterministic_requests( 

2282 self, deterministic_id, target_server: Optional[bytes] 

2283 ): 

2284 return _DeterministicProtectProtoAspect(self, deterministic_id, target_server) 

2285 

2286 

2287class _GroupContextAspect(GroupContext, CanUnprotect, SecurityContextUtils): 

2288 """The concrete context this host has with a particular peer 

2289 

2290 As all actual data is stored in the underlying groupcontext, this acts as 

2291 an accessor to that object (which picks the right recipient key). 

2292 

2293 This accessor is for receiving messages in group mode from a particular 

2294 peer; it does not send (and turns into a pairwise context through 

2295 context_for_response before it comes to that). 

2296 """ 

2297 

2298 def __init__(self, groupcontext: GroupContext, recipient_id: bytes) -> None: 

2299 self.groupcontext = groupcontext 

2300 self.recipient_id = recipient_id 

2301 

2302 def __repr__(self): 

2303 return "<%s inside %r with the peer %r>" % ( 

2304 type(self).__name__, 

2305 self.groupcontext, 

2306 self.recipient_id.hex(), 

2307 ) 

2308 

2309 private_key = None 

2310 

2311 # not inline because the equivalent lambda would not be recognized by mypy 

2312 # (workaround for <https://github.com/python/mypy/issues/8083>) 

2313 @property 

2314 def id_context(self): 

2315 return self.groupcontext.id_context 

2316 

2317 @property 

2318 def alg_aead(self): 

2319 return self.groupcontext.alg_aead 

2320 

2321 @property 

2322 def alg_signature(self): 

2323 return self.groupcontext.alg_signature 

2324 

2325 @property 

2326 def alg_group_enc(self): 

2327 return self.groupcontext.alg_group_enc 

2328 

2329 @property 

2330 def alg_pairwise_key_agreement(self): 

2331 return self.groupcontext.alg_pairwise_key_agreement 

2332 

2333 @property 

2334 def group_manager_cred(self): 

2335 return self.groupcontext.group_manager_cred 

2336 

2337 @property 

2338 def common_iv(self): 

2339 return self.groupcontext.common_iv 

2340 

2341 @property 

2342 def hashfun(self): 

2343 return self.groupcontext.hashfun 

2344 

2345 @property 

2346 def signature_encryption_key(self): 

2347 return self.groupcontext.signature_encryption_key 

2348 

2349 @property 

2350 def recipient_key(self): 

2351 # If we even get here, there has to be a alg_group_enc, and thus the recipient key does match it 

2352 return self.groupcontext.recipient_keys[self.recipient_id] 

2353 

2354 @property 

2355 def recipient_public_key(self): 

2356 return self.groupcontext.recipient_public_keys[self.recipient_id] 

2357 

2358 @property 

2359 def recipient_auth_cred(self): 

2360 return self.groupcontext.recipient_auth_creds[self.recipient_id] 

2361 

2362 @property 

2363 def recipient_replay_window(self): 

2364 return self.groupcontext.recipient_replay_windows[self.recipient_id] 

2365 

2366 def context_for_response(self): 

2367 return self.groupcontext.pairwise_for(self.recipient_id) 

2368 

2369 @property 

2370 def sender_auth_cred(self): 

2371 raise RuntimeError( 

2372 "Could relay the sender auth credential from the group context, but it shouldn't matter here" 

2373 ) 

2374 

2375 

2376class _PairwiseContextAspect( 

2377 GroupContext, CanProtect, CanUnprotect, SecurityContextUtils 

2378): 

2379 is_signing = False 

2380 

2381 def __init__(self, groupcontext, recipient_id): 

2382 self.groupcontext = groupcontext 

2383 self.recipient_id = recipient_id 

2384 

2385 shared_secret = self.alg_pairwise_key_agreement.staticstatic( 

2386 self.groupcontext.private_key, 

2387 self.groupcontext.recipient_public_keys[recipient_id], 

2388 ) 

2389 

2390 self.sender_key = self._kdf( 

2391 self.groupcontext._sender_key, 

2392 ( 

2393 self.groupcontext.sender_auth_cred 

2394 + self.groupcontext.recipient_auth_creds[recipient_id] 

2395 + shared_secret 

2396 ), 

2397 self.groupcontext.sender_id, 

2398 "Key", 

2399 self.alg_group_enc if self.is_signing else self.alg_aead, 

2400 ) 

2401 self.recipient_key = self._kdf( 

2402 self.groupcontext.recipient_keys[recipient_id], 

2403 ( 

2404 self.groupcontext.recipient_auth_creds[recipient_id] 

2405 + self.groupcontext.sender_auth_cred 

2406 + shared_secret 

2407 ), 

2408 self.recipient_id, 

2409 "Key", 

2410 self.alg_group_enc if self.is_signing else self.alg_aead, 

2411 ) 

2412 

2413 def __repr__(self): 

2414 return "<%s based on %r with the peer %r>" % ( 

2415 type(self).__name__, 

2416 self.groupcontext, 

2417 self.recipient_id.hex(), 

2418 ) 

2419 

2420 # FIXME: actually, only to be sent in requests 

2421 

2422 # not inline because the equivalent lambda would not be recognized by mypy 

2423 # (workaround for <https://github.com/python/mypy/issues/8083>) 

2424 @property 

2425 def id_context(self): 

2426 return self.groupcontext.id_context 

2427 

2428 @property 

2429 def alg_aead(self): 

2430 return self.groupcontext.alg_aead 

2431 

2432 @property 

2433 def hashfun(self): 

2434 return self.groupcontext.hashfun 

2435 

2436 @property 

2437 def alg_signature(self): 

2438 return self.groupcontext.alg_signature 

2439 

2440 @property 

2441 def alg_group_enc(self): 

2442 return self.groupcontext.alg_group_enc 

2443 

2444 @property 

2445 def alg_pairwise_key_agreement(self): 

2446 return self.groupcontext.alg_pairwise_key_agreement 

2447 

2448 @property 

2449 def group_manager_cred(self): 

2450 return self.groupcontext.group_manager_cred 

2451 

2452 @property 

2453 def common_iv(self): 

2454 return self.groupcontext.common_iv 

2455 

2456 @property 

2457 def sender_id(self): 

2458 return self.groupcontext.sender_id 

2459 

2460 @property 

2461 def recipient_auth_cred(self): 

2462 return self.groupcontext.recipient_auth_creds[self.recipient_id] 

2463 

2464 @property 

2465 def sender_auth_cred(self): 

2466 return self.groupcontext.sender_auth_cred 

2467 

2468 @property 

2469 def recipient_replay_window(self): 

2470 return self.groupcontext.recipient_replay_windows[self.recipient_id] 

2471 

2472 # Set at initialization (making all those attributes rather than 

2473 # possibly properties as they might be in super) 

2474 recipient_key = None # type: ignore 

2475 sender_key = None 

2476 

2477 @property 

2478 def sender_sequence_number(self): 

2479 return self.groupcontext.sender_sequence_number 

2480 

2481 @sender_sequence_number.setter 

2482 def sender_sequence_number(self, new): 

2483 self.groupcontext.sender_sequence_number = new 

2484 

2485 def post_seqnoincrease(self): 

2486 self.groupcontext.post_seqnoincrease() 

2487 

2488 # same here -- not needed because not signing 

2489 private_key = property(post_seqnoincrease) 

2490 recipient_public_key = property(post_seqnoincrease) 

2491 

2492 def context_from_response(self, unprotected_bag) -> CanUnprotect: 

2493 if unprotected_bag.get(COSE_KID, self.recipient_id) != self.recipient_id: 

2494 raise DecodeError( 

2495 "Response coming from a different server than requested, not attempting to decrypt" 

2496 ) 

2497 

2498 if COSE_COUNTERSIGNATURE0 in unprotected_bag: 

2499 # It'd be an odd thing to do, but it's source verified, so the 

2500 # server hopefully has reasons to make this readable to other group 

2501 # members. 

2502 return _GroupContextAspect(self.groupcontext, self.recipient_id) 

2503 else: 

2504 return self 

2505 

2506 

2507class _DeterministicProtectProtoAspect( 

2508 ContextWhereExternalAadIsGroup, CanProtect, SecurityContextUtils 

2509): 

2510 """This implements the sending side of Deterministic Requests. 

2511 

2512 While simialr to a _PairwiseContextAspect, it only derives the key at 

2513 protection time, as the plain text is hashed into the key.""" 

2514 

2515 deterministic_hashfun = hashes.SHA256() 

2516 

2517 def __init__(self, groupcontext, sender_id, target_server: Optional[bytes]): 

2518 self.groupcontext = groupcontext 

2519 self.sender_id = sender_id 

2520 self.target_server = target_server 

2521 

2522 def __repr__(self): 

2523 return "<%s based on %r with the sender ID %r%s>" % ( 

2524 type(self).__name__, 

2525 self.groupcontext, 

2526 self.sender_id.hex(), 

2527 "limited to responses from %s" % self.target_server 

2528 if self.target_server is not None 

2529 else "", 

2530 ) 

2531 

2532 def new_sequence_number(self): 

2533 return 0 

2534 

2535 def post_seqnoincrease(self): 

2536 pass 

2537 

2538 def context_from_response(self, unprotected_bag): 

2539 if self.target_server is None: 

2540 if COSE_KID not in unprotected_bag: 

2541 raise DecodeError( 

2542 "Server did not send a KID and no particular one was addressed" 

2543 ) 

2544 else: 

2545 if unprotected_bag.get(COSE_KID, self.target_server) != self.target_server: 

2546 raise DecodeError( 

2547 "Response coming from a different server than requested, not attempting to decrypt" 

2548 ) 

2549 

2550 if COSE_COUNTERSIGNATURE0 not in unprotected_bag: 

2551 # Could just as well pass and later barf when the group context doesn't find a signature 

2552 raise DecodeError( 

2553 "Response to deterministic request came from unsecure pairwise context" 

2554 ) 

2555 

2556 return _GroupContextAspect( 

2557 self.groupcontext, unprotected_bag.get(COSE_KID, self.target_server) 

2558 ) 

2559 

2560 def _get_sender_key(self, outer_message, aad, plaintext, request_id): 

2561 if outer_message.code.is_response(): 

2562 raise RuntimeError("Deterministic contexts shouldn't protect responses") 

2563 

2564 basekey = self.groupcontext.recipient_keys[self.sender_id] 

2565 

2566 h = hashes.Hash(self.deterministic_hashfun) 

2567 h.update(basekey) 

2568 h.update(aad) 

2569 h.update(plaintext) 

2570 request_hash = h.finalize() 

2571 

2572 outer_message.opt.request_hash = request_hash 

2573 outer_message.code = FETCH 

2574 

2575 # By this time, the AADs have all been calculated already; setting this 

2576 # for the benefit of the response parsing later 

2577 request_id.request_hash = request_hash 

2578 # FIXME I don't think this ever comes to bear but want to be sure 

2579 # before removing this line (this should only be client-side) 

2580 request_id.can_reuse_nonce = False 

2581 # FIXME: we're still sending a h'00' PIV. Not wrong, just a wasted byte. 

2582 

2583 return self._kdf(basekey, request_hash, self.sender_id, "Key", self.alg_aead) 

2584 

2585 # details needed for various operations, especially eAAD generation 

2586 

2587 @property 

2588 def sender_auth_cred(self): 

2589 # When preparing the external_aad, the element 'sender_cred' in the 

2590 # aad_array takes the empty CBOR byte string (0x40). 

2591 return b"" 

2592 

2593 # not inline because the equivalent lambda would not be recognized by mypy 

2594 # (workaround for <https://github.com/python/mypy/issues/8083>) 

2595 @property 

2596 def alg_aead(self): 

2597 return self.groupcontext.alg_aead 

2598 

2599 @property 

2600 def alg_group_enc(self): 

2601 return self.groupcontext.alg_group_enc 

2602 

2603 @property 

2604 def alg_pairwise_key_agreement(self): 

2605 return self.groupcontext.alg_pairwise_key_agreement 

2606 

2607 @property 

2608 def hashfun(self): 

2609 return self.groupcontext.hashfun 

2610 

2611 @property 

2612 def common_iv(self): 

2613 return self.groupcontext.common_iv 

2614 

2615 @property 

2616 def id_context(self): 

2617 return self.groupcontext.id_context 

2618 

2619 @property 

2620 def alg_signature(self): 

2621 return self.groupcontext.alg_signature 

2622 

2623 @property 

2624 def group_manager_cred(self): 

2625 return self.groupcontext.group_manager_cred 

2626 

2627 

2628class _DeterministicUnprotectProtoAspect( 

2629 ContextWhereExternalAadIsGroup, CanUnprotect, SecurityContextUtils 

2630): 

2631 """This implements the sending side of Deterministic Requests. 

2632 

2633 While simialr to a _PairwiseContextAspect, it only derives the key at 

2634 unprotection time, based on information given as Request-Hash.""" 

2635 

2636 # Unless None, this is the value by which the running process recognizes 

2637 # that the second phase of a B.1.2 replay window recovery Echo option comes 

2638 # from the current process, and thus its sequence number is fresh 

2639 echo_recovery = None 

2640 

2641 deterministic_hashfun = hashes.SHA256() 

2642 

2643 class ZeroIsAlwaysValid: 

2644 """Special-purpose replay window that accepts 0 indefinitely""" 

2645 

2646 def is_initialized(self): 

2647 return True 

2648 

2649 def is_valid(self, number): 

2650 # No particular reason to be lax here 

2651 return number == 0 

2652 

2653 def strike_out(self, number): 

2654 # FIXME: I'd rather indicate here that it's a potential replay, have the 

2655 # request_id.can_reuse_nonce = False 

2656 # set here rather than in _post_decrypt_checks, and thus also get 

2657 # the check for whether it's a safe method 

2658 pass 

2659 

2660 def persist(self): 

2661 pass 

2662 

2663 def __init__(self, groupcontext, recipient_id): 

2664 self.groupcontext = groupcontext 

2665 self.recipient_id = recipient_id 

2666 

2667 self.recipient_replay_window = self.ZeroIsAlwaysValid() 

2668 

2669 def __repr__(self): 

2670 return "<%s based on %r with the recipient ID %r>" % ( 

2671 type(self).__name__, 

2672 self.groupcontext, 

2673 self.recipient_id.hex(), 

2674 ) 

2675 

2676 def context_for_response(self): 

2677 return self.groupcontext 

2678 

2679 def _get_recipient_key(self, protected_message, algorithm): 

2680 logging.critical( 

2681 "Deriving recipient key for protected message %s", protected_message 

2682 ) 

2683 return self._kdf( 

2684 self.groupcontext.recipient_keys[self.recipient_id], 

2685 protected_message.opt.request_hash, 

2686 self.recipient_id, 

2687 "Key", 

2688 algorithm, 

2689 ) 

2690 

2691 def _post_decrypt_checks(self, aad, plaintext, protected_message, request_id): 

2692 if plaintext[0] not in (GET, FETCH): # FIXME: "is safe" 

2693 # FIXME: accept but return inner Unauthorized. (Raising Unauthorized 

2694 # here would just create an unprotected Unauthorized, which is not 

2695 # what's spec'd for here) 

2696 raise ProtectionInvalid("Request was not safe") 

2697 

2698 basekey = self.groupcontext.recipient_keys[self.recipient_id] 

2699 

2700 h = hashes.Hash(self.deterministic_hashfun) 

2701 h.update(basekey) 

2702 h.update(aad) 

2703 h.update(plaintext) 

2704 request_hash = h.finalize() 

2705 

2706 if request_hash != protected_message.opt.request_hash: 

2707 raise ProtectionInvalid( 

2708 "Client's hash of the plaintext diverges from the actual request hash" 

2709 ) 

2710 

2711 # This is intended for the protection of the response, and the 

2712 # later use in signature in the unprotect function is not happening 

2713 # here anyway, neither is the later use for Echo requests 

2714 request_id.request_hash = request_hash 

2715 request_id.can_reuse_nonce = False 

2716 

2717 # details needed for various operations, especially eAAD generation 

2718 

2719 @property 

2720 def recipient_auth_cred(self): 

2721 # When preparing the external_aad, the element 'sender_cred' in the 

2722 # aad_array takes the empty CBOR byte string (0x40). 

2723 return b"" 

2724 

2725 # not inline because the equivalent lambda would not be recognized by mypy 

2726 # (workaround for <https://github.com/python/mypy/issues/8083>) 

2727 @property 

2728 def alg_aead(self): 

2729 return self.groupcontext.alg_aead 

2730 

2731 @property 

2732 def alg_group_enc(self): 

2733 return self.groupcontext.alg_group_enc 

2734 

2735 @property 

2736 def alg_pairwise_key_agreement(self): 

2737 return self.groupcontext.alg_pairwise_key_agreement 

2738 

2739 @property 

2740 def hashfun(self): 

2741 return self.groupcontext.hashfun 

2742 

2743 @property 

2744 def common_iv(self): 

2745 return self.groupcontext.common_iv 

2746 

2747 @property 

2748 def id_context(self): 

2749 return self.groupcontext.id_context 

2750 

2751 @property 

2752 def alg_signature(self): 

2753 return self.groupcontext.alg_signature 

2754 

2755 @property 

2756 def group_manager_cred(self): 

2757 return self.groupcontext.group_manager_cred 

2758 

2759 

2760def verify_start(message): 

2761 """Extract the unprotected COSE options from a 

2762 message for the verifier to then pick a security context to actually verify 

2763 the message. (Future versions may also report fields from both unprotected 

2764 and protected, if the protected bag is ever used with OSCORE.). 

2765 

2766 Call this only requests; for responses, you'll have to know the security 

2767 context anyway, and there is usually no information to be gained.""" 

2768 

2769 _, _, unprotected, _ = CanUnprotect._extract_encrypted0(message) 

2770 

2771 return unprotected 

2772 

2773 

2774_getattr__ = deprecation_getattr( 

2775 { 

2776 "COSE_COUNTERSINGATURE0": "COSE_COUNTERSIGNATURE0", 

2777 "Algorithm": "AeadAlgorithm", 

2778 }, 

2779 globals(), 

2780)