Coverage for aiocoap / oscore.py: 85%

1220 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-17 12:28 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5"""This module contains the tools to send OSCORE secured messages. 

6 

7It only deals with the algorithmic parts, the security context and protection 

8and unprotection of messages. It does not touch on the integration of OSCORE in 

9the larger aiocoap stack of having a context or requests; that's what 

10:mod:`aiocoap.transports.osore` is for.`""" 

11 

12from __future__ import annotations 

13 

14from collections import namedtuple 

15import io 

16import json 

17import binascii 

18import os 

19import os.path 

20import tempfile 

21import abc 

22from typing import Optional, List, Any, Tuple 

23import secrets 

24import warnings 

25import logging 

26 

27from aiocoap.message import Message, Direction 

28from aiocoap.util import cryptography_additions, deprecation_getattr, Sentinel 

29from aiocoap.numbers import GET, POST, FETCH, CHANGED, UNAUTHORIZED, CONTENT 

30from aiocoap import error 

31from . import credentials 

32from .util import DeprecationWarning 

33from aiocoap.defaults import log_secret 

34 

35from cryptography.hazmat.primitives.ciphers import aead 

36from cryptography.hazmat.primitives.kdf.hkdf import HKDF 

37from cryptography.hazmat.primitives import ciphers, hashes 

38import cryptography.hazmat.backends 

39import cryptography.exceptions 

40from cryptography.hazmat.primitives import asymmetric, serialization 

41from cryptography.hazmat.primitives.asymmetric.utils import ( 

42 decode_dss_signature, 

43 encode_dss_signature, 

44) 

45 

46import cbor2 as cbor 

47 

48import filelock 

49 

50# Logger through which log events from cryptographic operations (both inside 

51# the primitives and around key derivation) are traced. 

52_alglog = logging.getLogger("aiocoap.cryptography") 

53 

54MAX_SEQNO = 2**40 - 1 

55 

56# Relevant values from the IANA registry "CBOR Object Signing and Encryption (COSE)" 

57COSE_KID = 4 

58COSE_PIV = 6 

59COSE_KID_CONTEXT = 10 

60# from RFC9338 

61COSE_COUNTERSIGNATURE0 = 12 

62# from RFC9528 

63COSE_KCCS = 14 

64 

65COMPRESSION_BITS_N = 0b111 

66COMPRESSION_BIT_K = 0b1000 

67COMPRESSION_BIT_H = 0b10000 

68COMPRESSION_BIT_GROUP = 0b100000 # Group Flag from draft-ietf-core-oscore-groupcomm-21 

69COMPRESSION_BITS_RESERVED = 0b11000000 

70 

71CWT_CLAIM_CNF = 8 

72CWT_CNF_COSE_KEY = 1 

73COSE_KEY_COMMON_KTY = 1 

74COSE_KTY_OKP = 1 

75COSE_KTY_EC2 = 2 

76COSE_KEY_COMMON_ALG = 3 

77COSE_KEY_OKP_CRV = -1 

78COSE_KEY_OKP_X = -2 

79COSE_KEY_EC2_X = -2 

80COSE_KEY_EC2_Y = -3 

81 

82# While the original values were simple enough to be used in literals, starting 

83# with oscore-groupcomm we're using more compact values 

84 

85INFO_TYPE_KEYSTREAM_REQUEST = True 

86INFO_TYPE_KEYSTREAM_RESPONSE = False 

87 

88PRESENT_BUT_NO_VALUE_YET = Sentinel("Value will be populated later") 

89 

90 

91class CodeStyle(namedtuple("_CodeStyle", ("request", "response"))): 

92 FETCH_CONTENT: CodeStyle 

93 POST_CHANGED: CodeStyle 

94 

95 @classmethod 

96 def from_request(cls, request) -> CodeStyle: 

97 if request == FETCH: 

98 return cls.FETCH_CONTENT 

99 elif request == POST: 

100 return cls.POST_CHANGED 

101 else: 

102 raise ValueError("Invalid request code %r" % request) 

103 

104 

105CodeStyle.FETCH_CONTENT = CodeStyle(FETCH, CONTENT) 

106CodeStyle.POST_CHANGED = CodeStyle(POST, CHANGED) 

107 

108 

109class _DeterministicKey: 

110 """Singleton to indicate that for this key member no public or private key 

111 is available because it is the Deterministic Client (see 

112 <https://www.ietf.org/archive/id/draft-amsuess-core-cachable-oscore-01.html>) 

113 

114 This is highly experimental not only from an implementation but also from a 

115 specification point of view. The specification has not received adaequate 

116 review that would justify using it in any non-experimental scenario. 

117 """ 

118 

119 

120DETERMINISTIC_KEY = _DeterministicKey() 

121 

122 

123class NotAProtectedMessage(error.Error, ValueError): 

124 """Raised when verification is attempted on a non-OSCORE message""" 

125 

126 def __init__(self, message, plain_message): 

127 super().__init__(message) 

128 self.plain_message = plain_message 

129 

130 

131class ProtectionInvalid(error.Error, ValueError): 

132 """Raised when verification of an OSCORE message fails""" 

133 

134 

135class DecodeError(ProtectionInvalid): 

136 """Raised when verification of an OSCORE message fails because CBOR or compressed data were erroneous""" 

137 

138 

139class ReplayError(ProtectionInvalid): 

140 """Raised when verification of an OSCORE message fails because the sequence numbers was already used""" 

141 

142 

143class ReplayErrorWithEcho(ProtectionInvalid, error.RenderableError): 

144 """Raised when verification of an OSCORE message fails because the 

145 recipient replay window is uninitialized, but a 4.01 Echo can be 

146 constructed with the data in the exception that can lead to the client 

147 assisting in replay window recovery""" 

148 

149 def __init__(self, secctx, request_id, echo): 

150 self.secctx = secctx 

151 self.request_id = request_id 

152 self.echo = echo 

153 

154 def to_message(self): 

155 inner = Message( 

156 code=UNAUTHORIZED, 

157 echo=self.echo, 

158 ) 

159 outer, _ = self.secctx.protect(inner, request_id=self.request_id) 

160 return outer 

161 

162 

163class ContextUnavailable(error.Error, ValueError): 

164 """Raised when a context is (currently or permanently) unavailable for 

165 protecting or unprotecting a message""" 

166 

167 

168class RequestIdentifiers: 

169 """A container for details that need to be passed along from the 

170 (un)protection of a request to the (un)protection of the response; these 

171 data ensure that the request-response binding process works by passing 

172 around the request's partial IV. 

173 

174 Users of this module should never create or interact with instances, but 

175 just pass them around. 

176 """ 

177 

178 def __init__(self, kid, partial_iv, can_reuse_nonce, request_code): 

179 # The sender ID of whoever generated the partial IV 

180 self.kid = kid 

181 self.partial_iv = partial_iv 

182 self.can_reuse_nonce = can_reuse_nonce 

183 self.code_style = CodeStyle.from_request(request_code) 

184 

185 self.request_hash = None 

186 

187 def get_reusable_kid_and_piv(self): 

188 """Return the kid and the partial IV if can_reuse_nonce is True, and 

189 set can_reuse_nonce to False.""" 

190 

191 if self.can_reuse_nonce: 

192 self.can_reuse_nonce = False 

193 return (self.kid, self.partial_iv) 

194 else: 

195 return (None, None) 

196 

197 

198def _xor_bytes(a, b): 

199 assert len(a) == len(b), "XOR needs consistent lengths" 

200 # FIXME is this an efficient thing to do, or should we store everything 

201 # that possibly needs xor'ing as long integers with an associated length? 

202 return bytes(_a ^ _b for (_a, _b) in zip(a, b)) 

203 

204 

205class SymmetricEncryptionAlgorithm(metaclass=abc.ABCMeta): 

206 """A symmetric algorithm 

207 

208 The algorithm's API is the AEAD API with additional authenticated data: The 

209 algorithm may or may not verify that data. Algorithms that actually do 

210 verify the data are recognized by also being AeadAlgorithm. 

211 """ 

212 

213 value: int 

214 key_bytes: int 

215 tag_bytes: int 

216 iv_bytes: int 

217 

218 @abc.abstractmethod 

219 def encrypt(cls, plaintext, aad, key, iv): 

220 """Return ciphertext + tag for given input data""" 

221 

222 @abc.abstractmethod 

223 def decrypt(cls, ciphertext_and_tag, aad, key, iv): 

224 """Reverse encryption. Must raise ProtectionInvalid on any error 

225 stemming from untrusted data.""" 

226 

227 @staticmethod 

228 def _build_encrypt0_structure(protected, external_aad): 

229 assert protected == {}, "Unexpected data in protected bucket" 

230 protected_serialized = b"" # were it into an empty dict, it'd be the cbor dump 

231 enc_structure = ["Encrypt0", protected_serialized, external_aad] 

232 

233 return cbor.dumps(enc_structure) 

234 

235 

236class AeadAlgorithm(SymmetricEncryptionAlgorithm, metaclass=abc.ABCMeta): 

237 """A symmetric algorithm that provides authentication, including 

238 authentication of additional data.""" 

239 

240 

241class AES_CBC(SymmetricEncryptionAlgorithm, metaclass=abc.ABCMeta): 

242 """AES in CBC mode using the Python cryptography library""" 

243 

244 tag_bytes = 0 

245 # This introduces padding -- this library doesn't need to care because 

246 # Python does allocation for us, but others may need to rethink their 

247 # buffer allocation strategies. 

248 

249 @classmethod 

250 def _cipher(cls, key, iv): 

251 return ciphers.base.Cipher( 

252 ciphers.algorithms.AES(key), 

253 ciphers.modes.CBC(iv), 

254 ) 

255 

256 @classmethod 

257 def encrypt(cls, plaintext, _aad, key, iv): 

258 # FIXME: Ignoring aad violates https://www.rfc-editor.org/rfc/rfc9459.html#name-implementation-consideratio but is required for Group OSCORE 

259 

260 # Padding according to https://www.rfc-editor.org/rfc/rfc5652#section-6.3 

261 k = cls.key_bytes 

262 assert k < 256, ( 

263 "Algorithm with this key size should not have been created in the first plae" 

264 ) 

265 pad_byte = k - (len(plaintext) % k) 

266 pad_bytes = bytes((pad_byte,)) * pad_byte 

267 plaintext += pad_bytes 

268 

269 encryptor = cls._cipher(key, iv).encryptor() 

270 result = encryptor.update(plaintext) 

271 result += encryptor.finalize() 

272 return result 

273 

274 @classmethod 

275 def decrypt(cls, ciphertext_and_tag, _aad, key, iv): 

276 # FIXME: Ignoring aad violates https://www.rfc-editor.org/rfc/rfc9459.html#name-implementation-consideratio but is required for Group OSCORE 

277 

278 k = cls.key_bytes 

279 if ciphertext_and_tag == b"" or len(ciphertext_and_tag) % k != 0: 

280 raise ProtectionInvalid("Message length does not match padding") 

281 

282 decryptor = cls._cipher(key, iv).decryptor() 

283 result = decryptor.update(ciphertext_and_tag) 

284 result += decryptor.finalize() 

285 

286 # Padding according to https://www.rfc-editor.org/rfc/rfc5652#section-6.3 

287 claimed_padding = result[-1] 

288 if claimed_padding == 0 or claimed_padding > k: 

289 raise ProtectionInvalid("Padding does not match key") 

290 if result[-claimed_padding:] != bytes((claimed_padding,)) * claimed_padding: 

291 raise ProtectionInvalid("Padding is inconsistent") 

292 

293 return result[:-claimed_padding] 

294 

295 

296class A128CBC(AES_CBC): 

297 # from RFC9459 

298 value = -65531 

299 key_bytes = 16 # 128-bit key 

300 iv_bytes = 16 # 16-octet nonce 

301 

302 

303class AES_CCM(AeadAlgorithm, metaclass=abc.ABCMeta): 

304 """AES-CCM implemented using the Python cryptography library""" 

305 

306 @classmethod 

307 def encrypt(cls, plaintext, aad, key, iv): 

308 return aead.AESCCM(key, cls.tag_bytes).encrypt(iv, plaintext, aad) 

309 

310 @classmethod 

311 def decrypt(cls, ciphertext_and_tag, aad, key, iv): 

312 try: 

313 return aead.AESCCM(key, cls.tag_bytes).decrypt(iv, ciphertext_and_tag, aad) 

314 except cryptography.exceptions.InvalidTag: 

315 raise ProtectionInvalid("Tag invalid") 

316 

317 

318class AES_CCM_16_64_128(AES_CCM): 

319 # from RFC8152 and draft-ietf-core-object-security-0[012] 3.2.1 

320 value = 10 

321 key_bytes = 16 # 128-bit key 

322 tag_bytes = 8 # 64-bit tag 

323 iv_bytes = 13 # 13-byte nonce 

324 

325 

326class AES_CCM_16_64_256(AES_CCM): 

327 # from RFC8152 

328 value = 11 

329 key_bytes = 32 # 256-bit key 

330 tag_bytes = 8 # 64-bit tag 

331 iv_bytes = 13 # 13-byte nonce 

332 

333 

334class AES_CCM_64_64_128(AES_CCM): 

335 # from RFC8152 

336 value = 12 

337 key_bytes = 16 # 128-bit key 

338 tag_bytes = 8 # 64-bit tag 

339 iv_bytes = 7 # 7-byte nonce 

340 

341 

342class AES_CCM_64_64_256(AES_CCM): 

343 # from RFC8152 

344 value = 13 

345 key_bytes = 32 # 256-bit key 

346 tag_bytes = 8 # 64-bit tag 

347 iv_bytes = 7 # 7-byte nonce 

348 

349 

350class AES_CCM_16_128_128(AES_CCM): 

351 # from RFC8152 

352 value = 30 

353 key_bytes = 16 # 128-bit key 

354 tag_bytes = 16 # 128-bit tag 

355 iv_bytes = 13 # 13-byte nonce 

356 

357 

358class AES_CCM_16_128_256(AES_CCM): 

359 # from RFC8152 

360 value = 31 

361 key_bytes = 32 # 256-bit key 

362 tag_bytes = 16 # 128-bit tag 

363 iv_bytes = 13 # 13-byte nonce 

364 

365 

366class AES_CCM_64_128_128(AES_CCM): 

367 # from RFC8152 

368 value = 32 

369 key_bytes = 16 # 128-bit key 

370 tag_bytes = 16 # 128-bit tag 

371 iv_bytes = 7 # 7-byte nonce 

372 

373 

374class AES_CCM_64_128_256(AES_CCM): 

375 # from RFC8152 

376 value = 33 

377 key_bytes = 32 # 256-bit key 

378 tag_bytes = 16 # 128-bit tag 

379 iv_bytes = 7 # 7-byte nonce 

380 

381 

382class AES_GCM(AeadAlgorithm, metaclass=abc.ABCMeta): 

383 """AES-GCM implemented using the Python cryptography library""" 

384 

385 iv_bytes = 12 # 96 bits fixed size of the nonce 

386 

387 @classmethod 

388 def encrypt(cls, plaintext, aad, key, iv): 

389 return aead.AESGCM(key).encrypt(iv, plaintext, aad) 

390 

391 @classmethod 

392 def decrypt(cls, ciphertext_and_tag, aad, key, iv): 

393 try: 

394 return aead.AESGCM(key).decrypt(iv, ciphertext_and_tag, aad) 

395 except cryptography.exceptions.InvalidTag: 

396 raise ProtectionInvalid("Tag invalid") 

397 

398 

399class A128GCM(AES_GCM): 

400 # from RFC8152 

401 value = 1 

402 key_bytes = 16 # 128-bit key 

403 tag_bytes = 16 # 128-bit tag 

404 

405 

406class A192GCM(AES_GCM): 

407 # from RFC8152 

408 value = 2 

409 key_bytes = 24 # 192-bit key 

410 tag_bytes = 16 # 128-bit tag 

411 

412 

413class A256GCM(AES_GCM): 

414 # from RFC8152 

415 value = 3 

416 key_bytes = 32 # 256-bit key 

417 tag_bytes = 16 # 128-bit tag 

418 

419 

420class ChaCha20Poly1305(AeadAlgorithm): 

421 # from RFC8152 

422 value = 24 

423 key_bytes = 32 # 256-bit key 

424 tag_bytes = 16 # 128-bit tag 

425 iv_bytes = 12 # 96-bit nonce 

426 

427 @classmethod 

428 def encrypt(cls, plaintext, aad, key, iv): 

429 return aead.ChaCha20Poly1305(key).encrypt(iv, plaintext, aad) 

430 

431 @classmethod 

432 def decrypt(cls, ciphertext_and_tag, aad, key, iv): 

433 try: 

434 return aead.ChaCha20Poly1305(key).decrypt(iv, ciphertext_and_tag, aad) 

435 except cryptography.exceptions.InvalidTag: 

436 raise ProtectionInvalid("Tag invalid") 

437 

438 

439class AlgorithmCountersign(metaclass=abc.ABCMeta): 

440 """A fully parameterized COSE countersign algorithm 

441 

442 An instance is able to provide all the alg_signature, par_countersign and 

443 par_countersign_key parameters that go into the Group OSCORE algorithms 

444 field. 

445 """ 

446 

447 value: int | str 

448 

449 @abc.abstractmethod 

450 def sign(self, body, external_aad, private_key): 

451 """Return the signature produced by the key when using 

452 CounterSignature0 as describe in draft-ietf-cose-countersign-01""" 

453 

454 @abc.abstractmethod 

455 def verify(self, signature, body, external_aad, public_key): 

456 """Verify a signature in analogy to sign""" 

457 

458 @abc.abstractmethod 

459 def generate_with_ccs(self) -> Tuple[Any, bytes]: 

460 """Return a usable private key along with a CCS describing it""" 

461 

462 @abc.abstractmethod 

463 def public_from_private(self, private_key): 

464 """Given a private key, derive the publishable key""" 

465 

466 @abc.abstractmethod 

467 def from_kccs(self, ccs: bytes) -> Any: 

468 """Given a CCS, extract the public key, or raise a ValueError if the 

469 credential format does not align with the type. 

470 

471 The type is not exactly Any, but whichever type is used by this 

472 algorithm class.""" 

473 

474 @staticmethod 

475 def _build_countersign_structure(body, external_aad): 

476 countersign_structure = [ 

477 "CounterSignature0", 

478 b"", 

479 b"", 

480 external_aad, 

481 body, 

482 ] 

483 tobesigned = cbor.dumps(countersign_structure) 

484 return tobesigned 

485 

486 @abc.abstractproperty 

487 def signature_length(self) -> int: 

488 """The length of a signature using this algorithm""" 

489 

490 @abc.abstractproperty 

491 def curve_number(self) -> int: 

492 """Registered curve number used with this algorithm. 

493 

494 Only used for verification of credentials' details""" 

495 

496 

497class AlgorithmStaticStatic(metaclass=abc.ABCMeta): 

498 @abc.abstractmethod 

499 def staticstatic(self, private_key, public_key): 

500 """Derive a shared static-static secret from a private and a public key""" 

501 

502 

503def _from_kccs_common(ccs: bytes) -> dict: 

504 """Check that the CCS contains a CNF claim that is a COSE Key, and return 

505 that key""" 

506 

507 try: 

508 parsed = cbor.loads(ccs) 

509 except cbor.CBORDecodeError as e: 

510 raise ValueError("CCS not in CBOR format") from e 

511 

512 if ( 

513 not isinstance(parsed, dict) 

514 or CWT_CLAIM_CNF not in parsed 

515 or not isinstance(parsed[CWT_CLAIM_CNF], dict) 

516 or CWT_CNF_COSE_KEY not in parsed[CWT_CLAIM_CNF] 

517 or not isinstance(parsed[CWT_CLAIM_CNF][CWT_CNF_COSE_KEY], dict) 

518 ): 

519 raise ValueError("CCS must contain a COSE Key dict in a CNF") 

520 

521 return parsed[CWT_CLAIM_CNF][CWT_CNF_COSE_KEY] 

522 

523 

524class Ed25519(AlgorithmCountersign): 

525 def sign(self, body, aad, private_key): 

526 _alglog.debug("Performing signature:") 

527 _alglog.debug("* body: %s", body.hex()) 

528 _alglog.debug("* AAD: %s", aad.hex()) 

529 private_key = asymmetric.ed25519.Ed25519PrivateKey.from_private_bytes( 

530 private_key 

531 ) 

532 return private_key.sign(self._build_countersign_structure(body, aad)) 

533 

534 def verify(self, signature, body, aad, public_key): 

535 _alglog.debug("Verifying signature:") 

536 _alglog.debug("* body: %s", body.hex()) 

537 _alglog.debug("* AAD: %s", aad.hex()) 

538 public_key = asymmetric.ed25519.Ed25519PublicKey.from_public_bytes(public_key) 

539 try: 

540 public_key.verify(signature, self._build_countersign_structure(body, aad)) 

541 except cryptography.exceptions.InvalidSignature: 

542 _alglog.debug("Signature was invalid.") 

543 raise ProtectionInvalid("Signature mismatch") 

544 

545 def _generate(self): 

546 key = asymmetric.ed25519.Ed25519PrivateKey.generate() 

547 # FIXME: We could avoid handing the easy-to-misuse bytes around if the 

548 # current algorithm interfaces did not insist on passing the 

549 # exchangeable representations -- and generally that should be more 

550 # efficient. 

551 return key.private_bytes( 

552 encoding=serialization.Encoding.Raw, 

553 format=serialization.PrivateFormat.Raw, 

554 encryption_algorithm=serialization.NoEncryption(), 

555 ) 

556 

557 def generate_with_ccs(self) -> Tuple[Any, bytes]: 

558 private = self._generate() 

559 public = self.public_from_private(private) 

560 

561 ccs = cbor.dumps( 

562 { 

563 CWT_CLAIM_CNF: { 

564 CWT_CNF_COSE_KEY: { 

565 COSE_KEY_COMMON_KTY: COSE_KTY_OKP, 

566 COSE_KEY_COMMON_ALG: self.value, 

567 COSE_KEY_OKP_CRV: self.curve_number, 

568 COSE_KEY_OKP_X: public, 

569 } 

570 } 

571 } 

572 ) 

573 

574 return (private, ccs) 

575 

576 def public_from_private(self, private_key): 

577 private_key = asymmetric.ed25519.Ed25519PrivateKey.from_private_bytes( 

578 private_key 

579 ) 

580 public_key = private_key.public_key() 

581 return public_key.public_bytes( 

582 encoding=serialization.Encoding.Raw, 

583 format=serialization.PublicFormat.Raw, 

584 ) 

585 

586 def from_kccs(self, ccs: bytes) -> Any: 

587 # eg. {1: 1, 3: -8, -1: 6, -2: h'77 ... 88'} 

588 cose_key = _from_kccs_common(ccs) 

589 

590 if ( 

591 cose_key.get(COSE_KEY_COMMON_KTY) == COSE_KTY_OKP 

592 and cose_key.get(COSE_KEY_COMMON_ALG) == self.value 

593 and cose_key.get(COSE_KEY_OKP_CRV) == self.curve_number 

594 and COSE_KEY_OKP_X in cose_key 

595 ): 

596 return cose_key[COSE_KEY_OKP_X] 

597 else: 

598 raise ValueError("Key type not recognized from CCS key %r" % cose_key) 

599 

600 value = -8 

601 curve_number = 6 

602 

603 signature_length = 64 

604 

605 

606class EcdhSsHkdf256(AlgorithmStaticStatic): 

607 # FIXME: This class uses the Edwards keys as private and public keys, and 

608 # not the converted ones. This will be problematic if pairwise-only 

609 # contexts are to be set up. 

610 

611 value = -27 

612 

613 # FIXME these two will be different when using the Montgomery keys directly 

614 

615 # This one will only be used when establishing and distributing pairwise-only keys 

616 public_from_private = Ed25519.public_from_private 

617 

618 def staticstatic(self, private_key, public_key): 

619 private_key = asymmetric.ed25519.Ed25519PrivateKey.from_private_bytes( 

620 private_key 

621 ) 

622 private_key = cryptography_additions.sk_to_curve25519(private_key) 

623 

624 public_key = asymmetric.ed25519.Ed25519PublicKey.from_public_bytes(public_key) 

625 public_key = cryptography_additions.pk_to_curve25519(public_key) 

626 

627 return private_key.exchange(public_key) 

628 

629 

630class ECDSA_SHA256_P256(AlgorithmCountersign, AlgorithmStaticStatic): 

631 # Trying a new construction approach -- should work just as well given 

632 # we're just passing Python objects around 

633 def from_public_parts(self, x: bytes, y: bytes): 

634 """Create a public key from its COSE values""" 

635 return asymmetric.ec.EllipticCurvePublicNumbers( 

636 int.from_bytes(x, "big"), 

637 int.from_bytes(y, "big"), 

638 asymmetric.ec.SECP256R1(), 

639 ).public_key() 

640 

641 def from_kccs(self, ccs: bytes) -> Any: 

642 cose_key = _from_kccs_common(ccs) 

643 

644 if ( 

645 cose_key.get(COSE_KEY_COMMON_KTY) == COSE_KTY_EC2 

646 and cose_key.get(COSE_KEY_COMMON_ALG) == self.value 

647 and COSE_KEY_EC2_X in cose_key 

648 and COSE_KEY_EC2_Y in cose_key 

649 ): 

650 return self.from_public_parts( 

651 x=cose_key[COSE_KEY_EC2_X], 

652 y=cose_key[COSE_KEY_EC2_Y], 

653 ) 

654 else: 

655 raise ValueError("Key type not recognized from CCS key %r" % cose_key) 

656 

657 def from_private_parts(self, x: bytes, y: bytes, d: bytes): 

658 public_numbers = self.from_public_parts(x, y).public_numbers() 

659 private_numbers = asymmetric.ec.EllipticCurvePrivateNumbers( 

660 int.from_bytes(d, "big"), public_numbers 

661 ) 

662 return private_numbers.private_key() 

663 

664 def sign(self, body, aad, private_key): 

665 der_signature = private_key.sign( 

666 self._build_countersign_structure(body, aad), 

667 asymmetric.ec.ECDSA(hashes.SHA256()), 

668 ) 

669 (r, s) = decode_dss_signature(der_signature) 

670 

671 return r.to_bytes(32, "big") + s.to_bytes(32, "big") 

672 

673 def verify(self, signature, body, aad, public_key): 

674 r = signature[:32] 

675 s = signature[32:] 

676 r = int.from_bytes(r, "big") 

677 s = int.from_bytes(s, "big") 

678 der_signature = encode_dss_signature(r, s) 

679 try: 

680 public_key.verify( 

681 der_signature, 

682 self._build_countersign_structure(body, aad), 

683 asymmetric.ec.ECDSA(hashes.SHA256()), 

684 ) 

685 except cryptography.exceptions.InvalidSignature: 

686 raise ProtectionInvalid("Signature mismatch") 

687 

688 def _generate(self): 

689 return asymmetric.ec.generate_private_key(asymmetric.ec.SECP256R1()) 

690 

691 def generate_with_ccs(self) -> Tuple[Any, bytes]: 

692 private = self._generate() 

693 public = self.public_from_private(private) 

694 # FIXME: Deduplicate with edhoc.py 

695 x = public.public_numbers().x.to_bytes(32, "big") 

696 y = public.public_numbers().y.to_bytes(32, "big") 

697 

698 ccs = cbor.dumps( 

699 { 

700 CWT_CLAIM_CNF: { 

701 CWT_CNF_COSE_KEY: { 

702 COSE_KEY_COMMON_KTY: COSE_KTY_EC2, 

703 COSE_KEY_COMMON_ALG: self.value, 

704 COSE_KEY_EC2_X: x, 

705 COSE_KEY_EC2_Y: y, 

706 } 

707 } 

708 } 

709 ) 

710 

711 return (private, ccs) 

712 

713 def public_from_private(self, private_key): 

714 return private_key.public_key() 

715 

716 def staticstatic(self, private_key, public_key): 

717 return private_key.exchange(asymmetric.ec.ECDH(), public_key) 

718 

719 value = -7 # FIXME: when used as a static-static algorithm, does this become -27? see shepherd review. 

720 curve_number = 1 

721 

722 signature_length = 64 

723 

724 

725algorithms = { 

726 "AES-CCM-16-64-128": AES_CCM_16_64_128(), 

727 "AES-CCM-16-64-256": AES_CCM_16_64_256(), 

728 "AES-CCM-64-64-128": AES_CCM_64_64_128(), 

729 "AES-CCM-64-64-256": AES_CCM_64_64_256(), 

730 "AES-CCM-16-128-128": AES_CCM_16_128_128(), 

731 "AES-CCM-16-128-256": AES_CCM_16_128_256(), 

732 "AES-CCM-64-128-128": AES_CCM_64_128_128(), 

733 "AES-CCM-64-128-256": AES_CCM_64_128_256(), 

734 "ChaCha20/Poly1305": ChaCha20Poly1305(), 

735 "A128GCM": A128GCM(), 

736 "A192GCM": A192GCM(), 

737 "A256GCM": A256GCM(), 

738 "A128CBC": A128CBC(), 

739} 

740 

741# algorithms with full parameter set 

742algorithms_countersign = { 

743 # maybe needs a different name... 

744 "EdDSA on Ed25519": Ed25519(), 

745 "ECDSA w/ SHA-256 on P-256": ECDSA_SHA256_P256(), 

746} 

747 

748algorithms_staticstatic = { 

749 "ECDH-SS + HKDF-256": EcdhSsHkdf256(), 

750} 

751 

752DEFAULT_ALGORITHM = "AES-CCM-16-64-128" 

753 

754_hash_backend = cryptography.hazmat.backends.default_backend() 

755hashfunctions = { 

756 "sha256": hashes.SHA256(), 

757 "sha384": hashes.SHA384(), 

758 "sha512": hashes.SHA512(), 

759} 

760 

761DEFAULT_HASHFUNCTION = "sha256" 

762 

763DEFAULT_WINDOWSIZE = 32 

764 

765 

766class BaseSecurityContext: 

767 # Deprecated marker for whether the class uses the 

768 # ContextWhereExternalAadIsGroup mixin; see documentation there. 

769 external_aad_is_group = False 

770 

771 # Authentication information carried with this security context; managed 

772 # externally by whatever creates the security context. 

773 authenticated_claims: List[str] = [] 

774 

775 #: AEAD algorithm. This may be None if it is not set in an OSCORE group context. 

776 alg_aead: Optional[AeadAlgorithm] 

777 

778 #: The common IV of the common context. 

779 #: 

780 #: This may be longer than needed for constructing IVs with any particular 

781 #: algorithm, as per <https://www.ietf.org/archive/id/draft-ietf-core-oscore-groupcomm-23.html#section-2.1.4> 

782 common_iv: bytes 

783 

784 id_context: Optional[bytes] 

785 

786 @property 

787 def algorithm(self): 

788 warnings.warn( 

789 "Property was renamed to 'alg_aead'", DeprecationWarning, stacklevel=2 

790 ) 

791 return self.alg_aead 

792 

793 @algorithm.setter 

794 def algorithm(self, value): 

795 warnings.warn( 

796 "Property was renamed to 'alg_aead'", DeprecationWarning, stacklevel=2 

797 ) 

798 self.alg_aead = value 

799 

800 hashfun: hashes.HashAlgorithm 

801 

802 def _construct_nonce( 

803 self, partial_iv_short, piv_generator_id, alg: SymmetricEncryptionAlgorithm 

804 ): 

805 pad_piv = b"\0" * (5 - len(partial_iv_short)) 

806 

807 s = bytes([len(piv_generator_id)]) 

808 pad_id = b"\0" * (alg.iv_bytes - 6 - len(piv_generator_id)) 

809 

810 components = s + pad_id + piv_generator_id + pad_piv + partial_iv_short 

811 

812 used_common_iv = self.common_iv[: len(components)] 

813 nonce = _xor_bytes(used_common_iv, components) 

814 _alglog.debug( 

815 "Nonce construction: common %s ^ components %s = %s", 

816 self.common_iv.hex(), 

817 components.hex(), 

818 nonce.hex(), 

819 ) 

820 

821 return nonce 

822 

823 def _extract_external_aad( 

824 self, message, request_id, local_is_sender: bool 

825 ) -> bytes: 

826 """Build the serialized external AAD from information in the message 

827 and the request_id. 

828 

829 Information about whether the local context is the sender of the 

830 message is only relevant to group contexts, where it influences whose 

831 authentication credentials are placed in the AAD. 

832 """ 

833 # If any option were actually Class I, it would be something like 

834 # 

835 # the_options = pick some of(message) 

836 # class_i_options = Message(the_options).opt.encode() 

837 

838 oscore_version = 1 

839 class_i_options = b"" 

840 if request_id.request_hash is not None: 

841 class_i_options = Message(request_hash=request_id.request_hash).opt.encode() 

842 

843 algorithms: List[int | str | None] = [ 

844 None if self.alg_aead is None else self.alg_aead.value 

845 ] 

846 if isinstance(self, ContextWhereExternalAadIsGroup): 

847 algorithms.append( 

848 None if self.alg_group_enc is None else self.alg_group_enc.value 

849 ) 

850 algorithms.append( 

851 None if self.alg_signature is None else self.alg_signature.value 

852 ) 

853 algorithms.append( 

854 None 

855 if self.alg_pairwise_key_agreement is None 

856 else self.alg_pairwise_key_agreement.value 

857 ) 

858 

859 external_aad = [ 

860 oscore_version, 

861 algorithms, 

862 request_id.kid, 

863 request_id.partial_iv, 

864 class_i_options, 

865 ] 

866 

867 if isinstance(self, ContextWhereExternalAadIsGroup): 

868 # FIXME: We may need to carry this over in the request_id when 

869 # observation span group rekeyings 

870 external_aad.append(self.id_context) 

871 

872 assert message.opt.oscore is not None, "Double OSCORE" 

873 external_aad.append(message.opt.oscore) 

874 

875 if local_is_sender: 

876 external_aad.append(self.sender_auth_cred) 

877 else: 

878 external_aad.append(self.recipient_auth_cred) 

879 external_aad.append(self.group_manager_cred) 

880 

881 return cbor.dumps(external_aad) 

882 

883 

884class ContextWhereExternalAadIsGroup(BaseSecurityContext): 

885 """The protection and unprotection functions will use the Group OSCORE AADs 

886 rather than the regular OSCORE AADs iff a context uses this mixin. (Ie. 

887 alg_group_enc etc are added to the algorithms, and request_kid_context, 

888 OSCORE_option, sender_auth_cred and gm_cred are added). 

889 

890 This does not necessarily match the is_signing property (as pairwise 

891 contexts use this but don't sign), and is distinct from the added OSCORE 

892 option in the AAD (as that's only applicable for the external AAD as 

893 extracted for signing and signature verification purposes).""" 

894 

895 id_context: bytes 

896 

897 external_aad_is_group = True 

898 

899 alg_group_enc: Optional[SymmetricEncryptionAlgorithm] 

900 alg_signature: Optional[AlgorithmCountersign] 

901 # This is also of type AlgorithmCountersign because the staticstatic 

902 # function is sitting on the same type. 

903 alg_pairwise_key_agreement: Optional[AlgorithmCountersign] 

904 

905 sender_auth_cred: bytes 

906 recipient_auth_cred: bytes 

907 group_manager_cred: bytes 

908 

909 

910# FIXME pull interface components from SecurityContext up here 

911class CanProtect(BaseSecurityContext, metaclass=abc.ABCMeta): 

912 # The protection function will add a signature according to the context's 

913 # alg_signature attribute if this is true 

914 is_signing = False 

915 

916 # Send the KID when protecting responses 

917 # 

918 # Once group pairwise mode is implemented, this will need to become a 

919 # parameter to protect(), which is stored at the point where the incoming 

920 # context is turned into an outgoing context. (Currently, such a mechanism 

921 # isn't there yet, and oscore_wrapper protects responses with the very same 

922 # context they came in on). 

923 responses_send_kid = False 

924 

925 #: The KID sent by this party when sending requests, or answering to group 

926 #: requests. 

927 sender_id: bytes 

928 

929 @staticmethod 

930 def _compress(protected, unprotected, ciphertext): 

931 """Pack the untagged COSE_Encrypt0 object described by the *args 

932 into two bytestrings suitable for the Object-Security option and the 

933 message body""" 

934 

935 if protected: 

936 raise RuntimeError( 

937 "Protection produced a message that has uncompressable fields." 

938 ) 

939 

940 piv = unprotected.pop(COSE_PIV, b"") 

941 if len(piv) > COMPRESSION_BITS_N: 

942 raise ValueError("Can't encode overly long partial IV") 

943 

944 firstbyte = len(piv) 

945 if COSE_KID in unprotected: 

946 firstbyte |= COMPRESSION_BIT_K 

947 kid_data = unprotected.pop(COSE_KID) 

948 else: 

949 kid_data = b"" 

950 

951 if COSE_KID_CONTEXT in unprotected: 

952 firstbyte |= COMPRESSION_BIT_H 

953 kid_context = unprotected.pop(COSE_KID_CONTEXT) 

954 s = len(kid_context) 

955 if s > 255: 

956 raise ValueError("KID Context too long") 

957 s_kid_context = bytes((s,)) + kid_context 

958 else: 

959 s_kid_context = b"" 

960 

961 if COSE_COUNTERSIGNATURE0 in unprotected: 

962 firstbyte |= COMPRESSION_BIT_GROUP 

963 

964 unprotected.pop(COSE_COUNTERSIGNATURE0) 

965 

966 # ciphertext will eventually also get the countersignature, but 

967 # that happens later when the option is already processed. 

968 

969 if unprotected: 

970 raise RuntimeError( 

971 "Protection produced a message that has uncompressable fields." 

972 ) 

973 

974 if firstbyte: 

975 option = bytes([firstbyte]) + piv + s_kid_context + kid_data 

976 else: 

977 option = b"" 

978 

979 return (option, ciphertext) 

980 

981 def protect(self, message, request_id=None, *, kid_context=True): 

982 """Given a plain CoAP message, create a protected message that contains 

983 message's options in the inner or outer CoAP message as described in 

984 OSCOAP. 

985 

986 If the message is a response to a previous message, the additional data 

987 from unprotecting the request are passed in as request_id. When 

988 request data is present, its partial IV is reused if possible. The 

989 security context's ID context is encoded in the resulting message 

990 unless kid_context is explicitly set to a False; other values for the 

991 kid_context can be passed in as byte string in the same parameter. 

992 """ 

993 

994 _alglog.debug( 

995 "Protecting message %s with context %s and request ID %s", 

996 message, 

997 self, 

998 request_id, 

999 ) 

1000 

1001 assert (request_id is None) == message.code.is_request(), ( 

1002 "Requestishness of code to protect does not match presence of request ID" 

1003 ) 

1004 

1005 assert message.direction is Direction.OUTGOING 

1006 

1007 outer_message, plaintext = self._split_message(message, request_id) 

1008 

1009 outer_message.direction = Direction.OUTGOING 

1010 # There are currently no properties that relate to OSCORE that'd need to be discarded. 

1011 outer_message.transport_tuning = message.transport_tuning 

1012 

1013 protected = {} 

1014 nonce = None 

1015 partial_iv_generated_by = None 

1016 unprotected = {} 

1017 if request_id is not None: 

1018 partial_iv_generated_by, partial_iv_short = ( 

1019 request_id.get_reusable_kid_and_piv() 

1020 ) 

1021 

1022 alg_symmetric = self.alg_group_enc if self.is_signing else self.alg_aead 

1023 assert isinstance(alg_symmetric, AeadAlgorithm) or self.is_signing, ( 

1024 "Non-AEAD algorithms can only be used in signing modes." 

1025 ) 

1026 

1027 if partial_iv_generated_by is None: 

1028 nonce, partial_iv_short = self._build_new_nonce(alg_symmetric) 

1029 partial_iv_generated_by = self.sender_id 

1030 

1031 unprotected[COSE_PIV] = partial_iv_short 

1032 else: 

1033 nonce = self._construct_nonce( 

1034 partial_iv_short, partial_iv_generated_by, alg_symmetric 

1035 ) 

1036 

1037 if message.code.is_request(): 

1038 unprotected[COSE_KID] = self.sender_id 

1039 

1040 request_id = RequestIdentifiers( 

1041 self.sender_id, 

1042 partial_iv_short, 

1043 can_reuse_nonce=None, 

1044 request_code=outer_message.code, 

1045 ) 

1046 

1047 if kid_context is True: 

1048 if self.id_context is not None: 

1049 unprotected[COSE_KID_CONTEXT] = self.id_context 

1050 elif kid_context is not False: 

1051 unprotected[COSE_KID_CONTEXT] = kid_context 

1052 else: 

1053 if self.responses_send_kid: 

1054 unprotected[COSE_KID] = self.sender_id 

1055 

1056 # Putting in a dummy value as the signature calculation will already need some of the compression result 

1057 if self.is_signing: 

1058 unprotected[COSE_COUNTERSIGNATURE0] = b"" 

1059 # FIXME: Running this twice quite needlessly (just to get the oscore option for sending) 

1060 option_data, _ = self._compress(protected, unprotected, b"") 

1061 

1062 outer_message.opt.oscore = option_data 

1063 

1064 external_aad = self._extract_external_aad( 

1065 outer_message, request_id, local_is_sender=True 

1066 ) 

1067 

1068 aad = SymmetricEncryptionAlgorithm._build_encrypt0_structure( 

1069 protected, external_aad 

1070 ) 

1071 

1072 key = self._get_sender_key(outer_message, external_aad, plaintext, request_id) 

1073 

1074 _alglog.debug("Encrypting Encrypt0:") 

1075 _alglog.debug("* aad = %s", aad.hex()) 

1076 _alglog.debug("* nonce = %s", nonce.hex()) 

1077 _alglog.debug("* key = %s", log_secret(key.hex())) 

1078 _alglog.debug("* algorithm = %s", alg_symmetric) 

1079 ciphertext = alg_symmetric.encrypt(plaintext, aad, key, nonce) 

1080 

1081 _alglog.debug("Produced ciphertext %s", ciphertext.hex()) 

1082 

1083 _, payload = self._compress(protected, unprotected, ciphertext) 

1084 

1085 if self.is_signing: 

1086 signature = self.alg_signature.sign(payload, external_aad, self.private_key) 

1087 # This is bordering "it's OK to log it in plain", because a reader 

1088 # of the log can access both the plaintext and the ciphertext, but 

1089 # still, it is called a key. 

1090 _alglog.debug( 

1091 "Producing keystream from signature encryption key: %s", 

1092 log_secret(self.signature_encryption_key.hex()), 

1093 ) 

1094 keystream = self._kdf_for_keystreams( 

1095 partial_iv_generated_by, 

1096 partial_iv_short, 

1097 self.signature_encryption_key, 

1098 self.sender_id, 

1099 INFO_TYPE_KEYSTREAM_REQUEST 

1100 if message.code.is_request() 

1101 else INFO_TYPE_KEYSTREAM_RESPONSE, 

1102 ) 

1103 _alglog.debug("Keystream is %s", keystream.hex()) 

1104 encrypted_signature = _xor_bytes(signature, keystream) 

1105 _alglog.debug("Encrypted signature %s", encrypted_signature.hex()) 

1106 payload += encrypted_signature 

1107 outer_message.payload = payload 

1108 

1109 # FIXME go through options section 

1110 

1111 _alglog.debug( 

1112 "Protecting the message succeeded, yielding ciphertext %s and request ID %s", 

1113 outer_message, 

1114 request_id, 

1115 ) 

1116 # the request_id in the second argument should be discarded by the 

1117 # caller when protecting a response -- is that reason enough for an 

1118 # `if` and returning None? 

1119 return outer_message, request_id 

1120 

1121 def _get_sender_key(self, outer_message, aad, plaintext, request_id): 

1122 """Customization hook of the protect function 

1123 

1124 While most security contexts have a fixed sender key, deterministic 

1125 requests need to shake up a few things. They need to modify the outer 

1126 message, as well as the request_id as it will later be used to 

1127 unprotect the response.""" 

1128 return self.sender_key 

1129 

1130 def _split_message(self, message, request_id): 

1131 """Given a protected message, return the outer message that contains 

1132 all Class I and Class U options (but without payload or Object-Security 

1133 option), and the encoded inner message that contains all Class E 

1134 options and the payload. 

1135 

1136 This leaves the messages' remotes unset.""" 

1137 

1138 if message.code.is_request(): 

1139 outer_host = message.opt.uri_host 

1140 proxy_uri = message.opt.proxy_uri 

1141 

1142 inner_message = message.copy( 

1143 uri_host=None, 

1144 uri_port=None, 

1145 proxy_uri=None, 

1146 proxy_scheme=None, 

1147 ) 

1148 inner_message.remote = None 

1149 

1150 if proxy_uri is not None: 

1151 # Use set_request_uri to split up the proxy URI into its 

1152 # components; extract, preserve and clear them. 

1153 inner_message.set_request_uri(proxy_uri, set_uri_host=False) 

1154 if inner_message.opt.proxy_uri is not None: 

1155 raise ValueError("Can not split Proxy-URI into options") 

1156 outer_uri = inner_message.remote.uri_base 

1157 inner_message.remote = None 

1158 inner_message.opt.proxy_scheme = None 

1159 

1160 if message.opt.observe is None: 

1161 outer_code = POST 

1162 else: 

1163 outer_code = FETCH 

1164 else: 

1165 outer_host = None 

1166 proxy_uri = None 

1167 

1168 inner_message = message.copy() 

1169 

1170 outer_code = request_id.code_style.response 

1171 

1172 # no max-age because these are always successful responses 

1173 outer_message = Message( 

1174 code=outer_code, 

1175 uri_host=outer_host, 

1176 observe=None if message.code.is_response() else message.opt.observe, 

1177 ) 

1178 if proxy_uri is not None: 

1179 outer_message.set_request_uri(outer_uri) 

1180 

1181 plaintext = bytes([inner_message.code]) + inner_message.opt.encode() 

1182 if inner_message.payload: 

1183 plaintext += bytes([0xFF]) 

1184 plaintext += inner_message.payload 

1185 

1186 return outer_message, plaintext 

1187 

1188 def _build_new_nonce(self, alg: SymmetricEncryptionAlgorithm): 

1189 """This implements generation of a new nonce, assembled as per Figure 5 

1190 of draft-ietf-core-object-security-06. Returns the shortened partial IV 

1191 as well.""" 

1192 seqno = self.new_sequence_number() 

1193 

1194 partial_iv = seqno.to_bytes(5, "big") 

1195 

1196 return ( 

1197 self._construct_nonce(partial_iv, self.sender_id, alg), 

1198 partial_iv.lstrip(b"\0") or b"\0", 

1199 ) 

1200 

1201 # sequence number handling 

1202 

1203 def new_sequence_number(self): 

1204 """Return a new sequence number; the implementation is responsible for 

1205 never returning the same value twice in a given security context. 

1206 

1207 May raise ContextUnavailable.""" 

1208 retval = self.sender_sequence_number 

1209 if retval >= MAX_SEQNO: 

1210 raise ContextUnavailable("Sequence number too large, context is exhausted.") 

1211 self.sender_sequence_number += 1 

1212 self.post_seqnoincrease() 

1213 return retval 

1214 

1215 # implementation defined 

1216 

1217 @abc.abstractmethod 

1218 def post_seqnoincrease(self): 

1219 """Ensure that sender_sequence_number is stored""" 

1220 raise 

1221 

1222 def context_from_response(self, unprotected_bag) -> CanUnprotect: 

1223 """When receiving a response to a request protected with this security 

1224 context, pick the security context with which to unprotect the response 

1225 given the unprotected information from the Object-Security option. 

1226 

1227 This allow picking the right security context in a group response, and 

1228 helps getting a new short-lived context for B.2 mode. The default 

1229 behavior is returning self. 

1230 """ 

1231 

1232 # FIXME justify by moving into a mixin for CanProtectAndUnprotect 

1233 return self # type: ignore 

1234 

1235 

1236class CanUnprotect(BaseSecurityContext): 

1237 recipient_key: bytes 

1238 

1239 def unprotect(self, protected_message, request_id=None): 

1240 _alglog.debug( 

1241 "Unprotecting message %s with context %s and request ID %s", 

1242 protected_message, 

1243 self, 

1244 request_id, 

1245 ) 

1246 

1247 assert (request_id is not None) == protected_message.code.is_response(), ( 

1248 "Requestishness of code to unprotect does not match presence of request ID" 

1249 ) 

1250 is_response = protected_message.code.is_response() 

1251 

1252 assert protected_message.direction is Direction.INCOMING 

1253 

1254 # Set to a raisable exception on replay check failures; it will be 

1255 # raised, but the package may still be processed in the course of Echo handling. 

1256 replay_error = None 

1257 

1258 protected_serialized, protected, unprotected, ciphertext = ( 

1259 self._extract_encrypted0(protected_message) 

1260 ) 

1261 

1262 if protected: 

1263 raise ProtectionInvalid("The protected field is not empty") 

1264 

1265 # FIXME check for duplicate keys in protected 

1266 

1267 if unprotected.pop(COSE_KID_CONTEXT, self.id_context) != self.id_context: 

1268 # FIXME is this necessary? 

1269 raise ProtectionInvalid("Sender ID context does not match") 

1270 

1271 if unprotected.pop(COSE_KID, self.recipient_id) != self.recipient_id: 

1272 # for most cases, this is caught by the session ID dispatch, but in 

1273 # responses (where explicit sender IDs are atypical), this is a 

1274 # valid check 

1275 raise ProtectionInvalid("Sender ID does not match") 

1276 

1277 if COSE_PIV not in unprotected: 

1278 if not is_response: 

1279 raise ProtectionInvalid("No sequence number provided in request") 

1280 

1281 seqno = None # sentinel for not striking out anything 

1282 partial_iv_short = request_id.partial_iv 

1283 partial_iv_generated_by = request_id.kid 

1284 else: 

1285 partial_iv_short = unprotected.pop(COSE_PIV) 

1286 partial_iv_generated_by = self.recipient_id 

1287 

1288 seqno = int.from_bytes(partial_iv_short, "big") 

1289 

1290 if not is_response: 

1291 if not self.recipient_replay_window.is_initialized(): 

1292 replay_error = ReplayError("Sequence number check unavailable") 

1293 elif not self.recipient_replay_window.is_valid(seqno): 

1294 replay_error = ReplayError("Sequence number was reused") 

1295 

1296 if replay_error is not None and self.echo_recovery is None: 

1297 # Don't even try decoding if there is no reason to 

1298 raise replay_error 

1299 

1300 request_id = RequestIdentifiers( 

1301 partial_iv_generated_by, 

1302 partial_iv_short, 

1303 can_reuse_nonce=replay_error is None, 

1304 request_code=protected_message.code, 

1305 ) 

1306 

1307 external_aad = self._extract_external_aad( 

1308 protected_message, request_id, local_is_sender=False 

1309 ) 

1310 

1311 if unprotected.pop(COSE_COUNTERSIGNATURE0, None) is not None: 

1312 try: 

1313 alg_signature = self.alg_signature 

1314 except NameError: 

1315 raise DecodeError( 

1316 "Group messages can not be decoded with this non-group context" 

1317 ) 

1318 

1319 siglen = alg_signature.signature_length 

1320 if len(ciphertext) < siglen: 

1321 raise DecodeError("Message too short for signature") 

1322 encrypted_signature = ciphertext[-siglen:] 

1323 

1324 _alglog.debug( 

1325 "Producing keystream from signature encryption key: %s", 

1326 log_secret(self.signature_encryption_key.hex()), 

1327 ) 

1328 keystream = self._kdf_for_keystreams( 

1329 partial_iv_generated_by, 

1330 partial_iv_short, 

1331 self.signature_encryption_key, 

1332 self.recipient_id, 

1333 INFO_TYPE_KEYSTREAM_REQUEST 

1334 if protected_message.code.is_request() 

1335 else INFO_TYPE_KEYSTREAM_RESPONSE, 

1336 ) 

1337 _alglog.debug("Encrypted signature %s", encrypted_signature.hex()) 

1338 _alglog.debug("Keystream is %s", keystream.hex()) 

1339 signature = _xor_bytes(encrypted_signature, keystream) 

1340 

1341 ciphertext = ciphertext[:-siglen] 

1342 

1343 alg_signature.verify( 

1344 signature, ciphertext, external_aad, self.recipient_public_key 

1345 ) 

1346 

1347 alg_symmetric = self.alg_group_enc 

1348 else: 

1349 alg_symmetric = self.alg_aead 

1350 

1351 if unprotected: 

1352 raise DecodeError("Unsupported unprotected option") 

1353 

1354 if ( 

1355 len(ciphertext) < self.alg_aead.tag_bytes + 1 

1356 ): # +1 assures access to plaintext[0] (the code) 

1357 raise ProtectionInvalid("Ciphertext too short") 

1358 

1359 enc_structure = ["Encrypt0", protected_serialized, external_aad] 

1360 aad = cbor.dumps(enc_structure) 

1361 

1362 key = self._get_recipient_key(protected_message, alg_symmetric) 

1363 

1364 nonce = self._construct_nonce( 

1365 partial_iv_short, partial_iv_generated_by, alg_symmetric 

1366 ) 

1367 

1368 _alglog.debug("Decrypting Encrypt0:") 

1369 _alglog.debug("* ciphertext = %s", ciphertext.hex()) 

1370 _alglog.debug("* aad = %s", aad.hex()) 

1371 _alglog.debug("* nonce = %s", nonce.hex()) 

1372 _alglog.debug("* key = %s", log_secret(key.hex())) 

1373 _alglog.debug("* algorithm = %s", alg_symmetric) 

1374 try: 

1375 plaintext = alg_symmetric.decrypt(ciphertext, aad, key, nonce) 

1376 except Exception as e: 

1377 _alglog.debug("Unprotecting failed") 

1378 raise e 

1379 

1380 self._post_decrypt_checks( 

1381 external_aad, plaintext, protected_message, request_id 

1382 ) 

1383 

1384 if not is_response and seqno is not None and replay_error is None: 

1385 self.recipient_replay_window.strike_out(seqno) 

1386 

1387 # FIXME add options from unprotected 

1388 

1389 unprotected_message = Message(code=plaintext[0]) 

1390 unprotected_message.payload = unprotected_message.opt.decode(plaintext[1:]) 

1391 unprotected_message.direction = Direction.INCOMING 

1392 

1393 try_initialize = ( 

1394 not self.recipient_replay_window.is_initialized() 

1395 and self.echo_recovery is not None 

1396 ) 

1397 if try_initialize: 

1398 if protected_message.code.is_request(): 

1399 # Either accept into replay window and clear replay error, or raise 

1400 # something that can turn into a 4.01,Echo response 

1401 if unprotected_message.opt.echo == self.echo_recovery: 

1402 self.recipient_replay_window.initialize_from_freshlyseen(seqno) 

1403 replay_error = None 

1404 else: 

1405 raise ReplayErrorWithEcho( 

1406 secctx=self, request_id=request_id, echo=self.echo_recovery 

1407 ) 

1408 else: 

1409 # We can initialize the replay window from a response as well. 

1410 # The response is guaranteed fresh as it was AEAD-decoded to 

1411 # match a request sent by this process. 

1412 # 

1413 # This is rare, as it only works when the server uses an own 

1414 # sequence number, eg. when sending a notification or when 

1415 # acting again on a retransmitted safe request whose response 

1416 # it did not cache. 

1417 # 

1418 # Nothing bad happens if we can't make progress -- we just 

1419 # don't initialize the replay window that wouldn't have been 

1420 # checked for a response anyway. 

1421 if seqno is not None: 

1422 self.recipient_replay_window.initialize_from_freshlyseen(seqno) 

1423 

1424 if replay_error is not None: 

1425 raise replay_error 

1426 

1427 if unprotected_message.code.is_request(): 

1428 if protected_message.opt.observe != 0: 

1429 unprotected_message.opt.observe = None 

1430 else: 

1431 if protected_message.opt.observe is not None: 

1432 # -1 ensures that they sort correctly in later reordering 

1433 # detection. Note that neither -1 nor high (>3 byte) sequence 

1434 # numbers can be serialized in the Observe option, but they are 

1435 # in this implementation accepted for passing around. 

1436 unprotected_message.opt.observe = -1 if seqno is None else seqno 

1437 

1438 _alglog.debug( 

1439 "Unprotecting succeeded, yielding plaintext %s and request_id %s", 

1440 unprotected_message, 

1441 request_id, 

1442 ) 

1443 return unprotected_message, request_id 

1444 

1445 def _get_recipient_key( 

1446 self, protected_message, algorithm: SymmetricEncryptionAlgorithm 

1447 ): 

1448 """Customization hook of the unprotect function 

1449 

1450 While most security contexts have a fixed recipient key, group contexts 

1451 have multiple, and deterministic requests build it on demand.""" 

1452 return self.recipient_key 

1453 

1454 def _post_decrypt_checks(self, aad, plaintext, protected_message, request_id): 

1455 """Customization hook of the unprotect function after decryption 

1456 

1457 While most security contexts are good with the default checks, 

1458 deterministic requests need to perform additional checks while AAD and 

1459 plaintext information is still available, and modify the request_id for 

1460 the later protection step of the response.""" 

1461 

1462 @staticmethod 

1463 def _uncompress(option_data, payload): 

1464 if option_data == b"": 

1465 firstbyte = 0 

1466 else: 

1467 firstbyte = option_data[0] 

1468 tail = option_data[1:] 

1469 

1470 unprotected = {} 

1471 

1472 if firstbyte & COMPRESSION_BITS_RESERVED: 

1473 raise DecodeError("Protected data uses reserved fields") 

1474 

1475 pivsz = firstbyte & COMPRESSION_BITS_N 

1476 if pivsz: 

1477 if len(tail) < pivsz: 

1478 raise DecodeError("Partial IV announced but not present") 

1479 unprotected[COSE_PIV] = tail[:pivsz] 

1480 tail = tail[pivsz:] 

1481 

1482 if firstbyte & COMPRESSION_BIT_H: 

1483 # kid context hint 

1484 s = tail[0] 

1485 if len(tail) - 1 < s: 

1486 raise DecodeError("Context hint announced but not present") 

1487 tail = tail[1:] 

1488 unprotected[COSE_KID_CONTEXT] = tail[:s] 

1489 tail = tail[s:] 

1490 

1491 if firstbyte & COMPRESSION_BIT_K: 

1492 kid = tail 

1493 unprotected[COSE_KID] = kid 

1494 

1495 if firstbyte & COMPRESSION_BIT_GROUP: 

1496 # Not really; As this is (also) used early on (before the KID 

1497 # context is even known, because it's just getting extracted), this 

1498 # is returning an incomplete value here and leaves it to the later 

1499 # processing to strip the right number of bytes from the ciphertext 

1500 unprotected[COSE_COUNTERSIGNATURE0] = PRESENT_BUT_NO_VALUE_YET 

1501 

1502 return b"", {}, unprotected, payload 

1503 

1504 @classmethod 

1505 def _extract_encrypted0(cls, message): 

1506 if message.opt.oscore is None: 

1507 raise NotAProtectedMessage("No Object-Security option present", message) 

1508 

1509 protected_serialized, protected, unprotected, ciphertext = cls._uncompress( 

1510 message.opt.oscore, message.payload 

1511 ) 

1512 return protected_serialized, protected, unprotected, ciphertext 

1513 

1514 # implementation defined 

1515 

1516 def context_for_response(self) -> CanProtect: 

1517 """After processing a request with this context, with which security 

1518 context should an outgoing response be protected? By default, it's the 

1519 same context.""" 

1520 # FIXME: Is there any way in which the handler may want to influence 

1521 # the decision taken here? Or would, then, the handler just call a more 

1522 # elaborate but similar function when setting the response's remote 

1523 # already? 

1524 

1525 # FIXME justify by moving into a mixin for CanProtectAndUnprotect 

1526 return self # type: ignore 

1527 

1528 

1529class SecurityContextUtils(BaseSecurityContext): 

1530 def _kdf( 

1531 self, 

1532 salt, 

1533 ikm, 

1534 role_id, 

1535 out_type, 

1536 key_alg: Optional[SymmetricEncryptionAlgorithm] = None, 

1537 ): 

1538 """The HKDF as used to derive sender and recipient key and IV in 

1539 RFC8613 Section 3.2.1, and analogously the Group Encryption Key of oscore-groupcomm. 

1540 """ 

1541 

1542 _alglog.debug("Deriving through KDF:") 

1543 _alglog.debug("* salt = %s", salt.hex() if salt else salt) 

1544 _alglog.debug("* ikm = %s", log_secret(ikm.hex())) 

1545 _alglog.debug("* role_id = %s", role_id.hex()) 

1546 _alglog.debug("* out_type = %r", out_type) 

1547 _alglog.debug("* key_alg = %r", key_alg) 

1548 

1549 # The field in info is called `alg_aead` defined in RFC8613, but in 

1550 # group OSCORE something that's very clearly *not* alg_aead is put in 

1551 # there. 

1552 # 

1553 # The rules about this come both from 

1554 # https://www.ietf.org/archive/id/draft-ietf-core-oscore-groupcomm-23.html#section-2.3 

1555 # and 

1556 # https://www.ietf.org/archive/id/draft-ietf-core-oscore-groupcomm-23.html#section-2.1.9 

1557 # but they produce the same outcome. 

1558 if hasattr(self, "alg_group_enc") and self.alg_group_enc is not None: 

1559 the_field_called_alg_aead = self.alg_group_enc.value 

1560 else: 

1561 assert self.alg_aead is not None, ( 

1562 "At least alg_aead or alg_group_enc needs to be set on a context." 

1563 ) 

1564 the_field_called_alg_aead = self.alg_aead.value 

1565 

1566 assert (key_alg is None) ^ (out_type == "Key") 

1567 if out_type == "Key": 

1568 # Duplicate assertion needed while mypy can not see that the assert 

1569 # above the if is stricter than this. 

1570 assert key_alg is not None 

1571 out_bytes = key_alg.key_bytes 

1572 the_field_called_alg_aead = key_alg.value 

1573 elif out_type == "IV": 

1574 assert self.alg_aead is not None, ( 

1575 "At least alg_aead or alg_group_enc needs to be set on a context." 

1576 ) 

1577 out_bytes = max( 

1578 ( 

1579 a.iv_bytes 

1580 for a in [self.alg_aead, getattr(self, "alg_group_enc", None)] 

1581 if a is not None 

1582 ) 

1583 ) 

1584 elif out_type == "SEKey": 

1585 assert isinstance(self, GroupContext) and self.alg_group_enc is not None, ( 

1586 "SEKey derivation is only defined for group contexts with a group encryption algorithm." 

1587 ) 

1588 # "While the obtained Signature Encryption Key is never used with 

1589 # the Group Encryption Algorithm, its length was chosen to obtain a 

1590 # matching level of security." 

1591 out_bytes = self.alg_group_enc.key_bytes 

1592 else: 

1593 raise ValueError("Output type not recognized") 

1594 

1595 _alglog.debug("* the_field_called_alg_aead = %s", the_field_called_alg_aead) 

1596 

1597 info = [ 

1598 role_id, 

1599 self.id_context, 

1600 the_field_called_alg_aead, 

1601 out_type, 

1602 out_bytes, 

1603 ] 

1604 _alglog.debug("* info = %r", info) 

1605 ret = self._kdf_lowlevel(salt, ikm, info, out_bytes) 

1606 _alglog.debug("Derivation of %r produced %s", out_type, log_secret(ret.hex())) 

1607 return ret 

1608 

1609 def _kdf_for_keystreams(self, piv_generated_by, salt, ikm, role_id, out_type): 

1610 """The HKDF as used to derive the keystreams of oscore-groupcomm.""" 

1611 

1612 out_bytes = self.alg_signature.signature_length 

1613 

1614 assert out_type in ( 

1615 INFO_TYPE_KEYSTREAM_REQUEST, 

1616 INFO_TYPE_KEYSTREAM_RESPONSE, 

1617 ), "Output type not recognized" 

1618 

1619 info = [ 

1620 piv_generated_by, 

1621 self.id_context, 

1622 out_type, 

1623 out_bytes, 

1624 ] 

1625 return self._kdf_lowlevel(salt, ikm, info, out_bytes) 

1626 

1627 def _kdf_lowlevel(self, salt: bytes, ikm: bytes, info: list, l: int) -> bytes: # noqa: E741 (signature follows RFC definition) 

1628 """The HKDF function as used in RFC8613 and oscore-groupcomm (notated 

1629 there as ``something = HKDF(...)`` 

1630 

1631 Note that `info` typically contains `L` at some point. 

1632 

1633 When `info` takes the conventional structure of pid, id_context, 

1634 ald_aead, type, L], it may make sense to extend the `_kdf` function to 

1635 support that case, or `_kdf_for_keystreams` for a different structure, as 

1636 they are the more high-level tools.""" 

1637 hkdf = HKDF( 

1638 algorithm=self.hashfun, 

1639 length=l, 

1640 salt=salt, 

1641 info=cbor.dumps(info), 

1642 backend=_hash_backend, 

1643 ) 

1644 expanded = hkdf.derive(ikm) 

1645 return expanded 

1646 

1647 def derive_keys(self, master_salt, master_secret): 

1648 """Populate sender_key, recipient_key and common_iv from the algorithm, 

1649 hash function and id_context already configured beforehand, and from 

1650 the passed salt and secret.""" 

1651 

1652 self.sender_key = self._kdf( 

1653 master_salt, master_secret, self.sender_id, "Key", self.alg_aead 

1654 ) 

1655 self.recipient_key = self._kdf( 

1656 master_salt, master_secret, self.recipient_id, "Key", self.alg_aead 

1657 ) 

1658 

1659 self.common_iv = self._kdf(master_salt, master_secret, b"", "IV") 

1660 

1661 # really more of the Credentials interface 

1662 

1663 def get_oscore_context_for(self, unprotected): 

1664 """Return a sutiable context (most easily self) for an incoming request 

1665 if its unprotected data (COSE_KID, COSE_KID_CONTEXT) fit its 

1666 description. If it doesn't match, it returns None. 

1667 

1668 The default implementation just strictly checks for whether kid and any 

1669 kid context match (not matching if a local KID context is set but none 

1670 is given in the request); modes like Group OSCORE can spin up aspect 

1671 objects here. 

1672 """ 

1673 if ( 

1674 unprotected.get(COSE_KID, None) == self.recipient_id 

1675 and unprotected.get(COSE_KID_CONTEXT, None) == self.id_context 

1676 ): 

1677 return self 

1678 

1679 

1680class ReplayWindow: 

1681 """A regular replay window of a fixed size. 

1682 

1683 It is implemented as an index and a bitfield (represented by an integer) 

1684 whose least significant bit represents the seqyence number of the index, 

1685 and a 1 indicates that a number was seen. No shenanigans around implicit 

1686 leading ones (think floating point normalization) happen. 

1687 

1688 >>> w = ReplayWindow(32, lambda: None) 

1689 >>> w.initialize_empty() 

1690 >>> w.strike_out(5) 

1691 >>> w.is_valid(3) 

1692 True 

1693 >>> w.is_valid(5) 

1694 False 

1695 >>> w.strike_out(0) 

1696 >>> w.strike_out(1) 

1697 >>> w.strike_out(2) 

1698 >>> w.is_valid(1) 

1699 False 

1700 

1701 Jumping ahead by the window size invalidates older numbers: 

1702 

1703 >>> w.is_valid(4) 

1704 True 

1705 >>> w.strike_out(35) 

1706 >>> w.is_valid(4) 

1707 True 

1708 >>> w.strike_out(36) 

1709 >>> w.is_valid(4) 

1710 False 

1711 

1712 Usage safety 

1713 ------------ 

1714 

1715 For every key, the replay window can only be initielized empty once. On 

1716 later uses, it needs to be persisted by storing the output of 

1717 self.persist() somewhere and loaded from that persisted data. 

1718 

1719 It is acceptable to store persistence data in the strike_out_callback, but 

1720 that must then ensure that the data is written (flushed to a file or 

1721 committed to a database), but that is usually inefficient. 

1722 

1723 Stability 

1724 --------- 

1725 

1726 This class is not considered for stabilization yet and an implementation 

1727 detail of the SecurityContext implementation(s). 

1728 """ 

1729 

1730 _index = None 

1731 """Sequence number represented by the least significant bit of _bitfield""" 

1732 _bitfield = None 

1733 """Integer interpreted as a bitfield, self._size wide. A digit 1 at any bit 

1734 indicates that the bit's index (its power of 2) plus self._index was 

1735 already seen.""" 

1736 

1737 def __init__(self, size, strike_out_callback): 

1738 self._size = size 

1739 self.strike_out_callback = strike_out_callback 

1740 

1741 def is_initialized(self): 

1742 return self._index is not None 

1743 

1744 def initialize_empty(self): 

1745 self._index = 0 

1746 self._bitfield = 0 

1747 

1748 def initialize_from_persisted(self, persisted): 

1749 self._index = persisted["index"] 

1750 self._bitfield = persisted["bitfield"] 

1751 

1752 def initialize_from_freshlyseen(self, seen): 

1753 """Initialize the replay window with a particular value that is just 

1754 being observed in a fresh (ie. generated by the peer later than any 

1755 messages processed before state was lost here) message. This marks the 

1756 seen sequence number and all preceding it as invalid, and and all later 

1757 ones as valid.""" 

1758 self._index = seen 

1759 self._bitfield = 1 

1760 

1761 def is_valid(self, number): 

1762 if number < self._index: 

1763 return False 

1764 if number >= self._index + self._size: 

1765 return True 

1766 return (self._bitfield >> (number - self._index)) & 1 == 0 

1767 

1768 def strike_out(self, number): 

1769 if not self.is_valid(number): 

1770 raise ValueError( 

1771 "Sequence number is not valid any more and " 

1772 "thus can't be removed from the window" 

1773 ) 

1774 overshoot = number - (self._index + self._size - 1) 

1775 if overshoot > 0: 

1776 self._index += overshoot 

1777 self._bitfield >>= overshoot 

1778 assert self.is_valid(number), "Sequence number was not valid before strike-out" 

1779 self._bitfield |= 1 << (number - self._index) 

1780 

1781 self.strike_out_callback() 

1782 

1783 def persist(self): 

1784 """Return a dict containing internal state which can be passed to init 

1785 to recreated the replay window.""" 

1786 

1787 return {"index": self._index, "bitfield": self._bitfield} 

1788 

1789 

1790class FilesystemSecurityContext( 

1791 CanProtect, CanUnprotect, SecurityContextUtils, credentials._Objectish 

1792): 

1793 """Security context stored in a directory as distinct files containing 

1794 containing 

1795 

1796 * Master secret, master salt, sender and recipient ID, 

1797 optionally algorithm, the KDF hash function, and replay window size 

1798 (settings.json and secrets.json, where the latter is typically readable 

1799 only for the user) 

1800 * sequence numbers and replay windows (sequence.json, the only file the 

1801 process needs write access to) 

1802 

1803 The static parameters can all either be placed in settings.json or 

1804 secrets.json, but must not be present in both; the presence of either file 

1805 is sufficient. 

1806 

1807 .. warning:: 

1808 

1809 Security contexts must never be copied around and used after another 

1810 copy was used. They should only ever be moved, and if they are copied 

1811 (eg. as a part of a system backup), restored contexts must not be used 

1812 again; they need to be replaced with freshly created ones. 

1813 

1814 An additional file named `lock` is created to prevent the accidental use of 

1815 a context by to concurrent programs. 

1816 

1817 Note that the sequence number file is updated in an atomic fashion which 

1818 requires file creation privileges in the directory. If privilege separation 

1819 between settings/key changes and sequence number changes is desired, one 

1820 way to achieve that on Linux is giving the aiocoap process's user group 

1821 write permissions on the directory and setting the sticky bit on the 

1822 directory, thus forbidding the user to remove the settings/secret files not 

1823 owned by him. 

1824 

1825 Writes due to sent sequence numbers are reduced by applying a variation on 

1826 the mechanism of RFC8613 Appendix B.1.1 (incrementing the persisted sender 

1827 sequence number in steps of `k`). That value is automatically grown from 

1828 sequence_number_chunksize_start up to sequence_number_chunksize_limit. 

1829 At runtime, the receive window is not stored but kept indeterminate. In 

1830 case of an abnormal shutdown, the server uses the mechanism described in 

1831 Appendix B.1.2 to recover. 

1832 """ 

1833 

1834 # possibly overridden in constructor 

1835 # 

1836 # Type is ignored because while it *is* AlgAead, mypy can't tell. 

1837 alg_aead = algorithms[DEFAULT_ALGORITHM] # type: ignore 

1838 

1839 class LoadError(ValueError): 

1840 """Exception raised with a descriptive message when trying to load a 

1841 faulty security context""" 

1842 

1843 def __init__( 

1844 self, 

1845 basedir: str, 

1846 sequence_number_chunksize_start=10, 

1847 sequence_number_chunksize_limit=10000, 

1848 ): 

1849 self.basedir = basedir 

1850 

1851 self.lockfile: Optional[filelock.FileLock] = filelock.FileLock( 

1852 os.path.join(basedir, "lock") 

1853 ) 

1854 # 0.001: Just fail if it can't be acquired 

1855 # See https://github.com/benediktschmitt/py-filelock/issues/57 

1856 try: 

1857 self.lockfile.acquire(timeout=0.001) 

1858 # see https://github.com/PyCQA/pycodestyle/issues/703 

1859 except: # noqa: E722 

1860 # No lock, no loading, no need to fail in __del__ 

1861 self.lockfile = None 

1862 raise 

1863 

1864 # Always enabled as committing to a file for every received request 

1865 # would be a terrible burden. 

1866 self.echo_recovery = secrets.token_bytes(8) 

1867 

1868 try: 

1869 self._load() 

1870 except KeyError as k: 

1871 raise self.LoadError("Configuration key missing: %s" % (k.args[0],)) 

1872 

1873 self.sequence_number_chunksize_start = sequence_number_chunksize_start 

1874 self.sequence_number_chunksize_limit = sequence_number_chunksize_limit 

1875 self.sequence_number_chunksize = sequence_number_chunksize_start 

1876 

1877 self.sequence_number_persisted = self.sender_sequence_number 

1878 

1879 def _load(self): 

1880 # doesn't check for KeyError on every occasion, relies on __init__ to 

1881 # catch that 

1882 

1883 data = {} 

1884 for readfile in ("secret.json", "settings.json"): 

1885 try: 

1886 with open(os.path.join(self.basedir, readfile)) as f: 

1887 filedata = json.load(f) 

1888 except FileNotFoundError: 

1889 continue 

1890 

1891 for key, value in filedata.items(): 

1892 if key.endswith("_hex"): 

1893 key = key[:-4] 

1894 value = binascii.unhexlify(value) 

1895 elif key.endswith("_ascii"): 

1896 key = key[:-6] 

1897 value = value.encode("ascii") 

1898 

1899 if key in data: 

1900 raise self.LoadError( 

1901 "Datum %r present in multiple input files at %r." 

1902 % (key, self.basedir) 

1903 ) 

1904 

1905 data[key] = value 

1906 

1907 self.alg_aead = algorithms[data.get("algorithm", DEFAULT_ALGORITHM)] 

1908 self.hashfun = hashfunctions[data.get("kdf-hashfun", DEFAULT_HASHFUNCTION)] 

1909 

1910 windowsize = data.get("window", DEFAULT_WINDOWSIZE) 

1911 if not isinstance(windowsize, int): 

1912 raise self.LoadError("Non-integer replay window") 

1913 

1914 self.sender_id = data["sender-id"] 

1915 self.recipient_id = data["recipient-id"] 

1916 

1917 if ( 

1918 max(len(self.sender_id), len(self.recipient_id)) 

1919 > self.alg_aead.iv_bytes - 6 

1920 ): 

1921 raise self.LoadError( 

1922 "Sender or Recipient ID too long (maximum length %s for this algorithm)" 

1923 % (self.alg_aead.iv_bytes - 6) 

1924 ) 

1925 

1926 master_secret = data["secret"] 

1927 master_salt = data.get("salt", b"") 

1928 self.id_context = data.get("id-context", None) 

1929 

1930 self.derive_keys(master_salt, master_secret) 

1931 

1932 self.recipient_replay_window = ReplayWindow( 

1933 windowsize, self._replay_window_changed 

1934 ) 

1935 try: 

1936 with open(os.path.join(self.basedir, "sequence.json")) as f: 

1937 sequence = json.load(f) 

1938 except FileNotFoundError: 

1939 self.sender_sequence_number = 0 

1940 self.recipient_replay_window.initialize_empty() 

1941 self.replay_window_persisted = True 

1942 else: 

1943 self.sender_sequence_number = int(sequence["next-to-send"]) 

1944 received = sequence["received"] 

1945 if received == "unknown": 

1946 # The replay window will stay uninitialized, which triggers 

1947 # Echo recovery 

1948 self.replay_window_persisted = False 

1949 else: 

1950 try: 

1951 self.recipient_replay_window.initialize_from_persisted(received) 

1952 except (ValueError, TypeError, KeyError): 

1953 # Not being particularly careful about what could go wrong: If 

1954 # someone tampers with the replay data, we're already in *big* 

1955 # trouble, of which I fail to see how it would become worse 

1956 # than a crash inside the application around "failure to 

1957 # right-shift a string" or that like; at worst it'd result in 

1958 # nonce reuse which tampering with the replay window file 

1959 # already does. 

1960 raise self.LoadError( 

1961 "Persisted replay window state was not understood" 

1962 ) 

1963 self.replay_window_persisted = True 

1964 

1965 # This is called internally whenever a new sequence number is taken or 

1966 # crossed out from the window, and blocks a lot; B.1 mode mitigates that. 

1967 # 

1968 # Making it async and block in a threadpool would mitigate the blocking of 

1969 # other messages, but the more visible effect of this will be that no 

1970 # matter if sync or async, a reply will need to wait for a file sync 

1971 # operation to conclude. 

1972 def _store(self): 

1973 tmphand, tmpnam = tempfile.mkstemp( 

1974 dir=self.basedir, prefix=".sequence-", suffix=".json", text=True 

1975 ) 

1976 

1977 data = {"next-to-send": self.sequence_number_persisted} 

1978 if not self.replay_window_persisted: 

1979 data["received"] = "unknown" 

1980 else: 

1981 data["received"] = self.recipient_replay_window.persist() 

1982 

1983 # Using io.open (instead os.fdopen) and binary / write with encode 

1984 # rather than dumps as that works even while the interpreter is 

1985 # shutting down. 

1986 # 

1987 # This can be relaxed when there is a defined shutdown sequence for 

1988 # security contexts that's triggered from the general context shutdown 

1989 # -- but right now, there isn't. 

1990 with io.open(tmphand, "wb") as tmpfile: 

1991 tmpfile.write(json.dumps(data).encode("utf8")) 

1992 tmpfile.flush() 

1993 os.fsync(tmpfile.fileno()) 

1994 

1995 os.replace(tmpnam, os.path.join(self.basedir, "sequence.json")) 

1996 

1997 def _replay_window_changed(self): 

1998 if self.replay_window_persisted: 

1999 # Just remove the sequence numbers once from the file 

2000 self.replay_window_persisted = False 

2001 self._store() 

2002 

2003 def post_seqnoincrease(self): 

2004 if self.sender_sequence_number > self.sequence_number_persisted: 

2005 self.sequence_number_persisted += self.sequence_number_chunksize 

2006 

2007 self.sequence_number_chunksize = min( 

2008 self.sequence_number_chunksize * 2, self.sequence_number_chunksize_limit 

2009 ) 

2010 # FIXME: this blocks -- see https://github.com/chrysn/aiocoap/issues/178 

2011 self._store() 

2012 

2013 # The = case would only happen if someone deliberately sets all 

2014 # numbers to 1 to force persisting on every step 

2015 assert self.sender_sequence_number <= self.sequence_number_persisted, ( 

2016 "Using a sequence number that has been persisted already" 

2017 ) 

2018 

2019 def _destroy(self): 

2020 """Release the lock file, and ensure that the object has become 

2021 unusable. 

2022 

2023 If there is unpersisted state from B.1 operation, the actually used 

2024 number and replay window gets written back to the file to allow 

2025 resumption without wasting digits or round-trips. 

2026 """ 

2027 # FIXME: Arrange for a more controlled shutdown through the credentials 

2028 

2029 self.replay_window_persisted = True 

2030 self.sequence_number_persisted = self.sender_sequence_number 

2031 self._store() 

2032 

2033 del self.sender_key 

2034 del self.recipient_key 

2035 

2036 os.unlink(self.lockfile.lock_file) 

2037 self.lockfile.release() 

2038 

2039 self.lockfile = None 

2040 

2041 def __del__(self): 

2042 if self.lockfile is not None: 

2043 self._destroy() 

2044 

2045 @classmethod 

2046 def from_item(cls, init_data): 

2047 """Overriding _Objectish's from_item because the parameter name for 

2048 basedir is contextfile for historical reasons""" 

2049 

2050 def constructor( 

2051 basedir: Optional[str] = None, contextfile: Optional[str] = None 

2052 ): 

2053 if basedir is not None and contextfile is not None: 

2054 raise credentials.CredentialsLoadError( 

2055 "Conflicting arguments basedir and contextfile; just contextfile instead" 

2056 ) 

2057 if basedir is None and contextfile is None: 

2058 raise credentials.CredentialsLoadError("Missing item 'basedir'") 

2059 if contextfile is not None: 

2060 warnings.warn( 

2061 "Property contextfile was renamed to basedir in OSCORE credentials entries", 

2062 DeprecationWarning, 

2063 stacklevel=2, 

2064 ) 

2065 basedir = contextfile 

2066 assert ( 

2067 basedir is not None 

2068 ) # This helps mypy which would otherwise not see that the above ensures this already 

2069 return cls(basedir) 

2070 

2071 return credentials._call_from_structureddata( 

2072 constructor, cls.__name__, init_data 

2073 ) 

2074 

2075 def find_all_used_contextless_oscore_kid(self) -> set[bytes]: 

2076 return set((self.recipient_id,)) 

2077 

2078 

2079class GroupContext(ContextWhereExternalAadIsGroup, BaseSecurityContext): 

2080 is_signing = True 

2081 responses_send_kid = True 

2082 

2083 @abc.abstractproperty 

2084 def private_key(self): 

2085 """Private key used to sign outgoing messages. 

2086 

2087 Contexts not designed to send messages may raise a RuntimeError here; 

2088 that necessity may later go away if some more accurate class modelling 

2089 is found.""" 

2090 

2091 @abc.abstractproperty 

2092 def recipient_public_key(self): 

2093 """Public key used to verify incoming messages. 

2094 

2095 Contexts not designed to receive messages (because they'd have aspects 

2096 for that) may raise a RuntimeError here; that necessity may later go 

2097 away if some more accurate class modelling is found.""" 

2098 

2099 

2100class SimpleGroupContext(GroupContext, CanProtect, CanUnprotect, SecurityContextUtils): 

2101 """A context for an OSCORE group 

2102 

2103 This is a non-persistable version of a group context that does not support 

2104 any group manager or rekeying; it is set up statically at startup. 

2105 

2106 It is intended for experimentation and demos, but aims to be correct enough 

2107 to be usable securely. 

2108 """ 

2109 

2110 # set during initialization (making all those attributes rather than 

2111 # possibly properties as they might be in super) 

2112 sender_id = None # type: ignore 

2113 id_context = None # type: ignore 

2114 private_key = None 

2115 alg_aead = None 

2116 hashfun = None # type: ignore 

2117 alg_signature = None 

2118 alg_group_enc = None 

2119 alg_pairwise_key_agreement = None 

2120 sender_auth_cred = None # type: ignore 

2121 group_manager_cred = None # type: ignore 

2122 cred_fmt = None 

2123 # This is currently not evaluated, but any GM interaction will need to have this information available. 

2124 group_manager_cred_fmt = None 

2125 

2126 def __init__( 

2127 self, 

2128 alg_aead, 

2129 hashfun, 

2130 alg_signature, 

2131 alg_group_enc, 

2132 alg_pairwise_key_agreement, 

2133 group_id, 

2134 master_secret, 

2135 master_salt, 

2136 sender_id, 

2137 private_key, 

2138 sender_auth_cred, 

2139 peers, 

2140 group_manager_cred, 

2141 cred_fmt=COSE_KCCS, 

2142 group_manager_cred_fmt=COSE_KCCS, 

2143 ): 

2144 self.sender_id = sender_id 

2145 self.id_context = group_id 

2146 self.private_key = private_key 

2147 self.alg_aead = alg_aead 

2148 self.hashfun = hashfun 

2149 self.alg_signature = alg_signature 

2150 self.alg_group_enc = alg_group_enc 

2151 self.alg_pairwise_key_agreement = alg_pairwise_key_agreement 

2152 self.sender_auth_cred = sender_auth_cred 

2153 self.group_manager_cred = group_manager_cred 

2154 self.cred_fmt = cred_fmt 

2155 self.group_manager_cred_fmt = group_manager_cred_fmt 

2156 

2157 self.peers = peers.keys() 

2158 self.recipient_public_keys = { 

2159 k: self._parse_credential(v) for (k, v) in peers.items() 

2160 } 

2161 self.recipient_auth_creds = peers 

2162 self.recipient_replay_windows = {} 

2163 for k in self.peers: 

2164 # no need to persist, the whole group is ephemeral 

2165 w = ReplayWindow(32, lambda: None) 

2166 w.initialize_empty() 

2167 self.recipient_replay_windows[k] = w 

2168 

2169 self.derive_keys(master_salt, master_secret) 

2170 self.sender_sequence_number = 0 

2171 

2172 sender_public_key = self._parse_credential(sender_auth_cred) 

2173 if ( 

2174 self.alg_signature.public_from_private(self.private_key) 

2175 != sender_public_key 

2176 ): 

2177 raise ValueError( 

2178 "The key in the provided sender credential does not match the private key" 

2179 ) 

2180 

2181 def _parse_credential(self, credential: bytes | _DeterministicKey): 

2182 """Extract the public key (in the public_key format the respective 

2183 AlgorithmCountersign needs) from credentials. This raises a ValueError 

2184 if the credentials do not match the group's cred_fmt, or if the 

2185 parameters do not match those configured in the group. 

2186 

2187 This currently discards any information that is present in the 

2188 credential that exceeds the key. (In a future version, this could 

2189 return both the key and extracted other data, where that other data 

2190 would be stored with the peer this is parsed from). 

2191 """ 

2192 

2193 if credential is DETERMINISTIC_KEY: 

2194 return credential 

2195 

2196 if self.cred_fmt != COSE_KCCS: 

2197 raise ValueError( 

2198 "Credential parsing is currently only implemented for CCSs" 

2199 ) 

2200 

2201 assert self.alg_signature is not None 

2202 

2203 return self.alg_signature.from_kccs(credential) 

2204 

2205 def __repr__(self): 

2206 return "<%s with group %r sender_id %r and %d peers>" % ( 

2207 type(self).__name__, 

2208 self.id_context.hex(), 

2209 self.sender_id.hex(), 

2210 len(self.peers), 

2211 ) 

2212 

2213 @property 

2214 def recipient_public_key(self): 

2215 raise RuntimeError( 

2216 "Group context without key indication was used for verification" 

2217 ) 

2218 

2219 def _get_sender_key(self, outer_message, aad, plaintext, request_id): 

2220 # If we even get here, there has to be a alg_group_enc, and thus the sender key does match it 

2221 return self._sender_key 

2222 

2223 def derive_keys(self, master_salt, master_secret): 

2224 the_main_alg = ( 

2225 self.alg_group_enc if self.alg_group_enc is not None else self.alg_aead 

2226 ) 

2227 

2228 self._sender_key = self._kdf( 

2229 master_salt, master_secret, self.sender_id, "Key", the_main_alg 

2230 ) 

2231 self.recipient_keys = { 

2232 recipient_id: self._kdf( 

2233 master_salt, master_secret, recipient_id, "Key", the_main_alg 

2234 ) 

2235 for recipient_id in self.peers 

2236 } 

2237 

2238 self.common_iv = self._kdf(master_salt, master_secret, b"", "IV") 

2239 

2240 self.signature_encryption_key = self._kdf( 

2241 master_salt, master_secret, b"", "SEKey" 

2242 ) 

2243 

2244 def post_seqnoincrease(self): 

2245 """No-op because it's ephemeral""" 

2246 

2247 def context_from_response(self, unprotected_bag) -> CanUnprotect: 

2248 # sender ID *needs to be* here -- if this were a pairwise request, it 

2249 # would not run through here 

2250 try: 

2251 sender_kid = unprotected_bag[COSE_KID] 

2252 except KeyError: 

2253 raise DecodeError("Group server failed to send own sender KID") 

2254 

2255 if COSE_COUNTERSIGNATURE0 in unprotected_bag: 

2256 return _GroupContextAspect(self, sender_kid) 

2257 else: 

2258 return _PairwiseContextAspect(self, sender_kid) 

2259 

2260 def get_oscore_context_for(self, unprotected): 

2261 if unprotected.get(COSE_KID_CONTEXT, None) != self.id_context: 

2262 return None 

2263 

2264 kid = unprotected.get(COSE_KID, None) 

2265 if kid in self.peers: 

2266 if COSE_COUNTERSIGNATURE0 in unprotected: 

2267 return _GroupContextAspect(self, kid) 

2268 elif self.recipient_public_keys[kid] is DETERMINISTIC_KEY: 

2269 return _DeterministicUnprotectProtoAspect(self, kid) 

2270 else: 

2271 return _PairwiseContextAspect(self, kid) 

2272 

2273 def find_all_used_contextless_oscore_kid(self) -> set[bytes]: 

2274 # not conflicting: groups always send KID Context 

2275 return set() 

2276 

2277 # yet to stabilize... 

2278 

2279 def pairwise_for(self, recipient_id): 

2280 return _PairwiseContextAspect(self, recipient_id) 

2281 

2282 def for_sending_deterministic_requests( 

2283 self, deterministic_id, target_server: Optional[bytes] 

2284 ): 

2285 return _DeterministicProtectProtoAspect(self, deterministic_id, target_server) 

2286 

2287 

2288class _GroupContextAspect(GroupContext, CanUnprotect, SecurityContextUtils): 

2289 """The concrete context this host has with a particular peer 

2290 

2291 As all actual data is stored in the underlying groupcontext, this acts as 

2292 an accessor to that object (which picks the right recipient key). 

2293 

2294 This accessor is for receiving messages in group mode from a particular 

2295 peer; it does not send (and turns into a pairwise context through 

2296 context_for_response before it comes to that). 

2297 """ 

2298 

2299 def __init__(self, groupcontext: GroupContext, recipient_id: bytes) -> None: 

2300 self.groupcontext = groupcontext 

2301 self.recipient_id = recipient_id 

2302 

2303 def __repr__(self): 

2304 return "<%s inside %r with the peer %r>" % ( 

2305 type(self).__name__, 

2306 self.groupcontext, 

2307 self.recipient_id.hex(), 

2308 ) 

2309 

2310 private_key = None 

2311 

2312 # not inline because the equivalent lambda would not be recognized by mypy 

2313 # (workaround for <https://github.com/python/mypy/issues/8083>) 

2314 @property 

2315 def id_context(self): 

2316 return self.groupcontext.id_context 

2317 

2318 @property 

2319 def alg_aead(self): 

2320 return self.groupcontext.alg_aead 

2321 

2322 @property 

2323 def alg_signature(self): 

2324 return self.groupcontext.alg_signature 

2325 

2326 @property 

2327 def alg_group_enc(self): 

2328 return self.groupcontext.alg_group_enc 

2329 

2330 @property 

2331 def alg_pairwise_key_agreement(self): 

2332 return self.groupcontext.alg_pairwise_key_agreement 

2333 

2334 @property 

2335 def group_manager_cred(self): 

2336 return self.groupcontext.group_manager_cred 

2337 

2338 @property 

2339 def common_iv(self): 

2340 return self.groupcontext.common_iv 

2341 

2342 @property 

2343 def hashfun(self): 

2344 return self.groupcontext.hashfun 

2345 

2346 @property 

2347 def signature_encryption_key(self): 

2348 return self.groupcontext.signature_encryption_key 

2349 

2350 @property 

2351 def recipient_key(self): 

2352 # If we even get here, there has to be a alg_group_enc, and thus the recipient key does match it 

2353 return self.groupcontext.recipient_keys[self.recipient_id] 

2354 

2355 @property 

2356 def recipient_public_key(self): 

2357 return self.groupcontext.recipient_public_keys[self.recipient_id] 

2358 

2359 @property 

2360 def recipient_auth_cred(self): 

2361 return self.groupcontext.recipient_auth_creds[self.recipient_id] 

2362 

2363 @property 

2364 def recipient_replay_window(self): 

2365 return self.groupcontext.recipient_replay_windows[self.recipient_id] 

2366 

2367 def context_for_response(self): 

2368 return self.groupcontext.pairwise_for(self.recipient_id) 

2369 

2370 @property 

2371 def sender_auth_cred(self): 

2372 raise RuntimeError( 

2373 "Could relay the sender auth credential from the group context, but it shouldn't matter here" 

2374 ) 

2375 

2376 

2377class _PairwiseContextAspect( 

2378 GroupContext, CanProtect, CanUnprotect, SecurityContextUtils 

2379): 

2380 is_signing = False 

2381 

2382 def __init__(self, groupcontext, recipient_id): 

2383 self.groupcontext = groupcontext 

2384 self.recipient_id = recipient_id 

2385 

2386 shared_secret = self.alg_pairwise_key_agreement.staticstatic( 

2387 self.groupcontext.private_key, 

2388 self.groupcontext.recipient_public_keys[recipient_id], 

2389 ) 

2390 

2391 self.sender_key = self._kdf( 

2392 self.groupcontext._sender_key, 

2393 ( 

2394 self.groupcontext.sender_auth_cred 

2395 + self.groupcontext.recipient_auth_creds[recipient_id] 

2396 + shared_secret 

2397 ), 

2398 self.groupcontext.sender_id, 

2399 "Key", 

2400 self.alg_group_enc if self.is_signing else self.alg_aead, 

2401 ) 

2402 self.recipient_key = self._kdf( 

2403 self.groupcontext.recipient_keys[recipient_id], 

2404 ( 

2405 self.groupcontext.recipient_auth_creds[recipient_id] 

2406 + self.groupcontext.sender_auth_cred 

2407 + shared_secret 

2408 ), 

2409 self.recipient_id, 

2410 "Key", 

2411 self.alg_group_enc if self.is_signing else self.alg_aead, 

2412 ) 

2413 

2414 def __repr__(self): 

2415 return "<%s based on %r with the peer %r>" % ( 

2416 type(self).__name__, 

2417 self.groupcontext, 

2418 self.recipient_id.hex(), 

2419 ) 

2420 

2421 # FIXME: actually, only to be sent in requests 

2422 

2423 # not inline because the equivalent lambda would not be recognized by mypy 

2424 # (workaround for <https://github.com/python/mypy/issues/8083>) 

2425 @property 

2426 def id_context(self): 

2427 return self.groupcontext.id_context 

2428 

2429 @property 

2430 def alg_aead(self): 

2431 return self.groupcontext.alg_aead 

2432 

2433 @property 

2434 def hashfun(self): 

2435 return self.groupcontext.hashfun 

2436 

2437 @property 

2438 def alg_signature(self): 

2439 return self.groupcontext.alg_signature 

2440 

2441 @property 

2442 def alg_group_enc(self): 

2443 return self.groupcontext.alg_group_enc 

2444 

2445 @property 

2446 def alg_pairwise_key_agreement(self): 

2447 return self.groupcontext.alg_pairwise_key_agreement 

2448 

2449 @property 

2450 def group_manager_cred(self): 

2451 return self.groupcontext.group_manager_cred 

2452 

2453 @property 

2454 def common_iv(self): 

2455 return self.groupcontext.common_iv 

2456 

2457 @property 

2458 def sender_id(self): 

2459 return self.groupcontext.sender_id 

2460 

2461 @property 

2462 def recipient_auth_cred(self): 

2463 return self.groupcontext.recipient_auth_creds[self.recipient_id] 

2464 

2465 @property 

2466 def sender_auth_cred(self): 

2467 return self.groupcontext.sender_auth_cred 

2468 

2469 @property 

2470 def recipient_replay_window(self): 

2471 return self.groupcontext.recipient_replay_windows[self.recipient_id] 

2472 

2473 # Set at initialization (making all those attributes rather than 

2474 # possibly properties as they might be in super) 

2475 recipient_key = None # type: ignore 

2476 sender_key = None 

2477 

2478 @property 

2479 def sender_sequence_number(self): 

2480 return self.groupcontext.sender_sequence_number 

2481 

2482 @sender_sequence_number.setter 

2483 def sender_sequence_number(self, new): 

2484 self.groupcontext.sender_sequence_number = new 

2485 

2486 def post_seqnoincrease(self): 

2487 self.groupcontext.post_seqnoincrease() 

2488 

2489 # same here -- not needed because not signing 

2490 private_key = property(post_seqnoincrease) 

2491 recipient_public_key = property(post_seqnoincrease) 

2492 

2493 def context_from_response(self, unprotected_bag) -> CanUnprotect: 

2494 if unprotected_bag.get(COSE_KID, self.recipient_id) != self.recipient_id: 

2495 raise DecodeError( 

2496 "Response coming from a different server than requested, not attempting to decrypt" 

2497 ) 

2498 

2499 if COSE_COUNTERSIGNATURE0 in unprotected_bag: 

2500 # It'd be an odd thing to do, but it's source verified, so the 

2501 # server hopefully has reasons to make this readable to other group 

2502 # members. 

2503 return _GroupContextAspect(self.groupcontext, self.recipient_id) 

2504 else: 

2505 return self 

2506 

2507 

2508class _DeterministicProtectProtoAspect( 

2509 ContextWhereExternalAadIsGroup, CanProtect, SecurityContextUtils 

2510): 

2511 """This implements the sending side of Deterministic Requests. 

2512 

2513 While similar to a _PairwiseContextAspect, it only derives the key at 

2514 protection time, as the plain text is hashed into the key.""" 

2515 

2516 deterministic_hashfun = hashes.SHA256() 

2517 

2518 def __init__(self, groupcontext, sender_id, target_server: Optional[bytes]): 

2519 self.groupcontext = groupcontext 

2520 self.sender_id = sender_id 

2521 self.target_server = target_server 

2522 

2523 def __repr__(self): 

2524 return "<%s based on %r with the sender ID %r%s>" % ( 

2525 type(self).__name__, 

2526 self.groupcontext, 

2527 self.sender_id.hex(), 

2528 "limited to responses from %s" % self.target_server 

2529 if self.target_server is not None 

2530 else "", 

2531 ) 

2532 

2533 def new_sequence_number(self): 

2534 return 0 

2535 

2536 def post_seqnoincrease(self): 

2537 pass 

2538 

2539 def context_from_response(self, unprotected_bag): 

2540 if self.target_server is None: 

2541 if COSE_KID not in unprotected_bag: 

2542 raise DecodeError( 

2543 "Server did not send a KID and no particular one was addressed" 

2544 ) 

2545 else: 

2546 if unprotected_bag.get(COSE_KID, self.target_server) != self.target_server: 

2547 raise DecodeError( 

2548 "Response coming from a different server than requested, not attempting to decrypt" 

2549 ) 

2550 

2551 if COSE_COUNTERSIGNATURE0 not in unprotected_bag: 

2552 # Could just as well pass and later barf when the group context doesn't find a signature 

2553 raise DecodeError( 

2554 "Response to deterministic request came from insecure pairwise context" 

2555 ) 

2556 

2557 return _GroupContextAspect( 

2558 self.groupcontext, unprotected_bag.get(COSE_KID, self.target_server) 

2559 ) 

2560 

2561 def _get_sender_key(self, outer_message, aad, plaintext, request_id): 

2562 if outer_message.code.is_response(): 

2563 raise RuntimeError("Deterministic contexts shouldn't protect responses") 

2564 

2565 basekey = self.groupcontext.recipient_keys[self.sender_id] 

2566 

2567 h = hashes.Hash(self.deterministic_hashfun) 

2568 h.update(basekey) 

2569 h.update(aad) 

2570 h.update(plaintext) 

2571 request_hash = h.finalize() 

2572 

2573 outer_message.opt.request_hash = request_hash 

2574 outer_message.code = FETCH 

2575 

2576 # By this time, the AADs have all been calculated already; setting this 

2577 # for the benefit of the response parsing later 

2578 request_id.request_hash = request_hash 

2579 # FIXME I don't think this ever comes to bear but want to be sure 

2580 # before removing this line (this should only be client-side) 

2581 request_id.can_reuse_nonce = False 

2582 # FIXME: we're still sending a h'00' PIV. Not wrong, just a wasted byte. 

2583 

2584 return self._kdf(basekey, request_hash, self.sender_id, "Key", self.alg_aead) 

2585 

2586 # details needed for various operations, especially eAAD generation 

2587 

2588 @property 

2589 def sender_auth_cred(self): 

2590 # When preparing the external_aad, the element 'sender_cred' in the 

2591 # aad_array takes the empty CBOR byte string (0x40). 

2592 return b"" 

2593 

2594 # not inline because the equivalent lambda would not be recognized by mypy 

2595 # (workaround for <https://github.com/python/mypy/issues/8083>) 

2596 @property 

2597 def alg_aead(self): 

2598 return self.groupcontext.alg_aead 

2599 

2600 @property 

2601 def alg_group_enc(self): 

2602 return self.groupcontext.alg_group_enc 

2603 

2604 @property 

2605 def alg_pairwise_key_agreement(self): 

2606 return self.groupcontext.alg_pairwise_key_agreement 

2607 

2608 @property 

2609 def hashfun(self): 

2610 return self.groupcontext.hashfun 

2611 

2612 @property 

2613 def common_iv(self): 

2614 return self.groupcontext.common_iv 

2615 

2616 @property 

2617 def id_context(self): 

2618 return self.groupcontext.id_context 

2619 

2620 @property 

2621 def alg_signature(self): 

2622 return self.groupcontext.alg_signature 

2623 

2624 @property 

2625 def group_manager_cred(self): 

2626 return self.groupcontext.group_manager_cred 

2627 

2628 

2629class _DeterministicUnprotectProtoAspect( 

2630 ContextWhereExternalAadIsGroup, CanUnprotect, SecurityContextUtils 

2631): 

2632 """This implements the sending side of Deterministic Requests. 

2633 

2634 While similar to a _PairwiseContextAspect, it only derives the key at 

2635 unprotection time, based on information given as Request-Hash.""" 

2636 

2637 # Unless None, this is the value by which the running process recognizes 

2638 # that the second phase of a B.1.2 replay window recovery Echo option comes 

2639 # from the current process, and thus its sequence number is fresh 

2640 echo_recovery = None 

2641 

2642 deterministic_hashfun = hashes.SHA256() 

2643 

2644 class ZeroIsAlwaysValid: 

2645 """Special-purpose replay window that accepts 0 indefinitely""" 

2646 

2647 def is_initialized(self): 

2648 return True 

2649 

2650 def is_valid(self, number): 

2651 # No particular reason to be lax here 

2652 return number == 0 

2653 

2654 def strike_out(self, number): 

2655 # FIXME: I'd rather indicate here that it's a potential replay, have the 

2656 # request_id.can_reuse_nonce = False 

2657 # set here rather than in _post_decrypt_checks, and thus also get 

2658 # the check for whether it's a safe method 

2659 pass 

2660 

2661 def persist(self): 

2662 pass 

2663 

2664 def __init__(self, groupcontext, recipient_id): 

2665 self.groupcontext = groupcontext 

2666 self.recipient_id = recipient_id 

2667 

2668 self.recipient_replay_window = self.ZeroIsAlwaysValid() 

2669 

2670 def __repr__(self): 

2671 return "<%s based on %r with the recipient ID %r>" % ( 

2672 type(self).__name__, 

2673 self.groupcontext, 

2674 self.recipient_id.hex(), 

2675 ) 

2676 

2677 def context_for_response(self): 

2678 return self.groupcontext 

2679 

2680 def _get_recipient_key(self, protected_message, algorithm): 

2681 logging.critical( 

2682 "Deriving recipient key for protected message %s", protected_message 

2683 ) 

2684 return self._kdf( 

2685 self.groupcontext.recipient_keys[self.recipient_id], 

2686 protected_message.opt.request_hash, 

2687 self.recipient_id, 

2688 "Key", 

2689 algorithm, 

2690 ) 

2691 

2692 def _post_decrypt_checks(self, aad, plaintext, protected_message, request_id): 

2693 if plaintext[0] not in (GET, FETCH): # FIXME: "is safe" 

2694 # FIXME: accept but return inner Unauthorized. (Raising Unauthorized 

2695 # here would just create an unprotected Unauthorized, which is not 

2696 # what's spec'd for here) 

2697 raise ProtectionInvalid("Request was not safe") 

2698 

2699 basekey = self.groupcontext.recipient_keys[self.recipient_id] 

2700 

2701 h = hashes.Hash(self.deterministic_hashfun) 

2702 h.update(basekey) 

2703 h.update(aad) 

2704 h.update(plaintext) 

2705 request_hash = h.finalize() 

2706 

2707 if request_hash != protected_message.opt.request_hash: 

2708 raise ProtectionInvalid( 

2709 "Client's hash of the plaintext diverges from the actual request hash" 

2710 ) 

2711 

2712 # This is intended for the protection of the response, and the 

2713 # later use in signature in the unprotect function is not happening 

2714 # here anyway, neither is the later use for Echo requests 

2715 request_id.request_hash = request_hash 

2716 request_id.can_reuse_nonce = False 

2717 

2718 # details needed for various operations, especially eAAD generation 

2719 

2720 @property 

2721 def recipient_auth_cred(self): 

2722 # When preparing the external_aad, the element 'sender_cred' in the 

2723 # aad_array takes the empty CBOR byte string (0x40). 

2724 return b"" 

2725 

2726 # not inline because the equivalent lambda would not be recognized by mypy 

2727 # (workaround for <https://github.com/python/mypy/issues/8083>) 

2728 @property 

2729 def alg_aead(self): 

2730 return self.groupcontext.alg_aead 

2731 

2732 @property 

2733 def alg_group_enc(self): 

2734 return self.groupcontext.alg_group_enc 

2735 

2736 @property 

2737 def alg_pairwise_key_agreement(self): 

2738 return self.groupcontext.alg_pairwise_key_agreement 

2739 

2740 @property 

2741 def hashfun(self): 

2742 return self.groupcontext.hashfun 

2743 

2744 @property 

2745 def common_iv(self): 

2746 return self.groupcontext.common_iv 

2747 

2748 @property 

2749 def id_context(self): 

2750 return self.groupcontext.id_context 

2751 

2752 @property 

2753 def alg_signature(self): 

2754 return self.groupcontext.alg_signature 

2755 

2756 @property 

2757 def group_manager_cred(self): 

2758 return self.groupcontext.group_manager_cred 

2759 

2760 

2761def verify_start(message): 

2762 """Extract the unprotected COSE options from a 

2763 message for the verifier to then pick a security context to actually verify 

2764 the message. (Future versions may also report fields from both unprotected 

2765 and protected, if the protected bag is ever used with OSCORE.). 

2766 

2767 Call this only requests; for responses, you'll have to know the security 

2768 context anyway, and there is usually no information to be gained.""" 

2769 

2770 _, _, unprotected, _ = CanUnprotect._extract_encrypted0(message) 

2771 

2772 return unprotected 

2773 

2774 

2775_getattr__ = deprecation_getattr( 

2776 { 

2777 "COSE_COUNTERSINGATURE0": "COSE_COUNTERSIGNATURE0", 

2778 "Algorithm": "AeadAlgorithm", 

2779 }, 

2780 globals(), 

2781)