Coverage for aiocoap/oscore.py: 85%

1126 statements  

« prev     ^ index     » next       coverage.py v7.6.3, created at 2024-10-15 22:10 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5"""This module contains the tools to send OSCORE secured messages. 

6 

7It only deals with the algorithmic parts, the security context and protection 

8and unprotection of messages. It does not touch on the integration of OSCORE in 

9the larger aiocoap stack of having a context or requests; that's what 

10:mod:`aiocoap.transports.oscore` is for.""" 

11 

12from __future__ import annotations 

13 

14from collections import namedtuple 

15import io 

16import json 

17import binascii 

18import os 

19import os.path 

20import tempfile 

21import abc 

22from typing import Optional, List, Any, Tuple 

23import secrets 

24import warnings 

25 

26from aiocoap.message import Message 

27from aiocoap.util import cryptography_additions, deprecation_getattr, Sentinel 

28from aiocoap.numbers import GET, POST, FETCH, CHANGED, UNAUTHORIZED, CONTENT 

29from aiocoap import error 

30from . import credentials 

31 

32from cryptography.hazmat.primitives.ciphers import aead 

33from cryptography.hazmat.primitives.kdf.hkdf import HKDF 

34from cryptography.hazmat.primitives import ciphers, hashes 

35import cryptography.hazmat.backends 

36import cryptography.exceptions 

37from cryptography.hazmat.primitives import asymmetric, serialization 

38from cryptography.hazmat.primitives.asymmetric.utils import ( 

39 decode_dss_signature, 

40 encode_dss_signature, 

41) 

42 

43import cbor2 as cbor 

44 

45import filelock 

46 

# Largest sequence number that fits into 40 bits (5 bytes)
MAX_SEQNO = (1 << 40) - 1

# Relevant values from the IANA registry "CBOR Object Signing and Encryption (COSE)"
COSE_KID = 4
COSE_PIV = 6
COSE_KID_CONTEXT = 10
# from RFC9338
COSE_COUNTERSIGNATURE0 = 12
# from RFC9528
COSE_KCCS = 14

# Layout of the first byte of the compressed OSCORE option: the low three bits
# carry the partial IV length, the remaining bits are individual flags.
COMPRESSION_BITS_N = 0x07
COMPRESSION_BIT_K = 1 << 3
COMPRESSION_BIT_H = 1 << 4
# Group Flag from draft-ietf-core-oscore-groupcomm-21
COMPRESSION_BIT_GROUP = 1 << 5
COMPRESSION_BITS_RESERVED = 0xC0

# Map keys used when picking a COSE key out of a CWT Claims Set
CWT_CLAIM_CNF = 8
CWT_CNF_COSE_KEY = 1

# COSE key parameters common to all key types
COSE_KEY_COMMON_KTY = 1
COSE_KEY_COMMON_ALG = 3

# COSE key types
COSE_KTY_OKP = 1
COSE_KTY_EC2 = 2

# Key type specific COSE key parameters
COSE_KEY_OKP_CRV = -1
COSE_KEY_OKP_X = -2
COSE_KEY_EC2_X = -2
COSE_KEY_EC2_Y = -3

# While the original values were simple enough to be used in literals, starting
# with oscore-groupcomm we're using more compact values

INFO_TYPE_KEYSTREAM_REQUEST = True
INFO_TYPE_KEYSTREAM_RESPONSE = False

80 

# Marker object standing in for a value that is known to be present but is
# only filled in later (see the sentinel's own description text).
PRESENT_BUT_NO_VALUE_YET = Sentinel("Value will be populated later")

82 

83 

class CodeStyle(namedtuple("_CodeStyle", ("request", "response"))):
    """Pair of a request code and the response code that goes with it.

    The two known styles are attached to the class as ``FETCH_CONTENT`` and
    ``POST_CHANGED`` right after the class definition.
    """

    FETCH_CONTENT: CodeStyle
    POST_CHANGED: CodeStyle

    @classmethod
    def from_request(cls, request) -> CodeStyle:
        """Return the style whose request code is `request`.

        Raises ValueError for any code other than FETCH or POST."""
        if request == FETCH:
            return cls.FETCH_CONTENT
        if request == POST:
            return cls.POST_CHANGED
        raise ValueError("Invalid request code %r" % request)

96 

97 

# The instances can only be created once the class itself exists, so they are
# attached here rather than inside the class body.
CodeStyle.FETCH_CONTENT = CodeStyle(request=FETCH, response=CONTENT)
CodeStyle.POST_CHANGED = CodeStyle(request=POST, response=CHANGED)

100 

101 

class DeterministicKey:
    """Singleton marking a key member for which neither a public nor a private
    key is available, because it is the Deterministic Client (see
    <https://www.ietf.org/archive/id/draft-amsuess-core-cachable-oscore-01.html>)

    This is highly experimental, both as an implementation and as a
    specification. The specification has not received adequate review that
    would justify using it in any non-experimental scenario.
    """


# Create the one instance, then remove the class name from the module so that
# no further instances are created by name.
DETERMINISTIC_KEY = DeterministicKey()
del DeterministicKey

115 

116 

class NotAProtectedMessage(error.Error, ValueError):
    """Raised when verification is attempted on a non-OSCORE message

    The message that was passed in is kept in the ``plain_message``
    attribute."""

    def __init__(self, message, plain_message):
        self.plain_message = plain_message
        super().__init__(message)

123 

124 

class ProtectionInvalid(error.Error, ValueError):
    """Raised when an OSCORE message does not pass verification"""

127 

128 

class DecodeError(ProtectionInvalid):
    """Raised when verification of an OSCORE message fails because its CBOR
    or compressed data were erroneous"""

131 

132 

class ReplayError(ProtectionInvalid):
    """Raised when verification of an OSCORE message fails because its
    sequence number was already used"""

135 

136 

class ReplayErrorWithEcho(ProtectionInvalid, error.RenderableError):
    """Raised when verification of an OSCORE message fails because the
    recipient replay window is uninitialized.

    The exception carries what is needed to construct a 4.01 response with
    an Echo option, through which the client can assist in replay window
    recovery."""

    def __init__(self, secctx, request_id, echo):
        self.secctx = secctx
        self.request_id = request_id
        self.echo = echo

    def to_message(self):
        """Build the Unauthorized response carrying the Echo value, protect
        it with the stored security context and request ID, and return the
        protected (outer) message."""
        plain_response = Message(code=UNAUTHORIZED, echo=self.echo)
        protected_response, _ = self.secctx.protect(
            plain_response, request_id=self.request_id
        )
        return protected_response

155 

156 

class ContextUnavailable(error.Error, ValueError):
    """Raised when a message can not be protected or unprotected because the
    context is (currently or permanently) unavailable"""

160 

161 

class RequestIdentifiers:
    """Details that are handed from the (un)protection of a request to the
    (un)protection of its response.

    Passing the request's partial IV (and the data that goes with it) along
    is what makes the request-response binding work.

    Users of this module should never create or interact with instances, but
    just pass them around.
    """

    def __init__(self, kid, partial_iv, nonce, can_reuse_nonce, request_code):
        self.kid = kid
        self.partial_iv = partial_iv
        self.nonce = nonce
        self.can_reuse_nonce = can_reuse_nonce
        # Raises ValueError if the request code has no known code style
        self.code_style = CodeStyle.from_request(request_code)

        self.request_hash = None

    def get_reusable_nonce_and_piv(self):
        """Hand out the nonce and partial IV for reuse, at most once.

        While can_reuse_nonce is true, this returns ``(nonce, partial_iv)``
        and clears the flag; otherwise it returns ``(None, None)``."""
        if not self.can_reuse_nonce:
            return (None, None)

        self.can_reuse_nonce = False
        return (self.nonce, self.partial_iv)

190 

191 

def _xor_bytes(a, b):
    """Return the byte-wise XOR of two equally long byte strings."""
    assert len(a) == len(b), "XOR needs consistent lengths"
    # FIXME is this an efficient thing to do, or should we store everything
    # that possibly needs xor'ing as long integers with an associated length?
    return bytes(map(int.__xor__, a, b))

197 

198 

class SymmetricEncryptionAlgorithm(metaclass=abc.ABCMeta):
    """A symmetric algorithm

    The algorithm's API is the AEAD API with additional authenticated data:
    The algorithm may or may not verify that data. Algorithms that actually
    do verify the data are recognized by also being AeadAlgorithm.
    """

    # Identifier of the algorithm, and the sizes (in bytes) of its key, its
    # tag and its IV / nonce
    value: int
    key_bytes: int
    tag_bytes: int
    iv_bytes: int

    @abc.abstractmethod
    def encrypt(cls, plaintext, aad, key, iv):
        """Return ciphertext + tag for given input data"""

    @abc.abstractmethod
    def decrypt(cls, ciphertext_and_tag, aad, key, iv):
        """Reverse encryption. Must raise ProtectionInvalid on any error
        stemming from untrusted data."""

    @staticmethod
    def _build_encrypt0_structure(protected, external_aad):
        """Serialize the Encrypt0 structure for the given external AAD.

        Only an empty protected bucket is supported."""
        assert protected == {}, "Unexpected data in protected bucket"
        # The empty protected bucket is represented by a zero-length byte
        # string; for a non-empty one it would be the bucket's CBOR dump.
        return cbor.dumps(["Encrypt0", b"", external_aad])

228 

229 

class AeadAlgorithm(SymmetricEncryptionAlgorithm, metaclass=abc.ABCMeta):
    """A symmetric algorithm that authenticates both the encrypted data and
    the additional data passed along with it."""

233 

234 

class AES_CBC(SymmetricEncryptionAlgorithm, metaclass=abc.ABCMeta):
    """AES in CBC mode using the Python cryptography library

    The additional authenticated data passed to encrypt and decrypt is
    ignored; this algorithm provides no authentication.
    """

    tag_bytes = 0
    iv_bytes = 0
    # This introduces padding -- this library doesn't need to care because
    # Python does allocation for us, but others may need to rethink their
    # buffer allocation strategies.

    # Padding works on cipher blocks. The AES block is 16 bytes for every key
    # length, so this must not be derived from key_bytes.
    _block_bytes = 16

    @classmethod
    def _cipher(cls, key, iv):
        """Create the AES-CBC cipher object for a key and an IV"""
        return ciphers.base.Cipher(
            ciphers.algorithms.AES(key),
            ciphers.modes.CBC(iv),
        )

    @classmethod
    def encrypt(cls, plaintext, _aad, key, iv):
        """Pad the plaintext to a whole number of blocks and encrypt it.

        Returns the ciphertext; there is no tag."""
        # FIXME: Ignoring aad violates https://www.rfc-editor.org/rfc/rfc9459.html#name-implementation-consideratio but is required for Group OSCORE

        # Padding according to https://www.rfc-editor.org/rfc/rfc5652#section-6.3
        k = cls._block_bytes
        pad_byte = k - (len(plaintext) % k)
        # Building a new object rather than using += leaves a mutable input
        # (eg. a bytearray) untouched.
        padded = plaintext + bytes((pad_byte,)) * pad_byte

        encryptor = cls._cipher(key, iv).encryptor()
        return encryptor.update(padded) + encryptor.finalize()

    @classmethod
    def decrypt(cls, ciphertext_and_tag, _aad, key, iv):
        """Decrypt the ciphertext and strip the padding.

        Raises ProtectionInvalid if the length is not a positive multiple of
        the block size, or if the padding is malformed."""
        # FIXME: Ignoring aad violates https://www.rfc-editor.org/rfc/rfc9459.html#name-implementation-consideratio but is required for Group OSCORE

        k = cls._block_bytes
        if len(ciphertext_and_tag) == 0 or len(ciphertext_and_tag) % k != 0:
            raise ProtectionInvalid("Message length does not match padding")

        decryptor = cls._cipher(key, iv).decryptor()
        result = decryptor.update(ciphertext_and_tag) + decryptor.finalize()

        # Padding according to https://www.rfc-editor.org/rfc/rfc5652#section-6.3
        claimed_padding = result[-1]
        if claimed_padding == 0 or claimed_padding > k:
            raise ProtectionInvalid("Padding does not match key")
        if result[-claimed_padding:] != bytes((claimed_padding,)) * claimed_padding:
            raise ProtectionInvalid("Padding is inconsistent")

        return result[:-claimed_padding]

289 

290 

class A128CBC(AES_CBC):
    """AES-CBC with a 128-bit key and a 16-octet nonce"""

    # from RFC9459
    value = -65531
    iv_bytes = 16  # 16-octet nonce
    key_bytes = 16  # 128-bit key

296 

297 

class AES_CCM(AeadAlgorithm, metaclass=abc.ABCMeta):
    """AES-CCM implemented using the Python cryptography library

    Concrete subclasses provide the tag length (and the other sizes)."""

    @classmethod
    def encrypt(cls, plaintext, aad, key, iv):
        """Return ciphertext + tag for the given input data"""
        cipher = aead.AESCCM(key, cls.tag_bytes)
        return cipher.encrypt(iv, plaintext, aad)

    @classmethod
    def decrypt(cls, ciphertext_and_tag, aad, key, iv):
        """Return the plaintext, or raise ProtectionInvalid if the tag does
        not verify"""
        cipher = aead.AESCCM(key, cls.tag_bytes)
        try:
            return cipher.decrypt(iv, ciphertext_and_tag, aad)
        except cryptography.exceptions.InvalidTag:
            raise ProtectionInvalid("Tag invalid")

311 

312 

class AES_CCM_16_64_128(AES_CCM):
    """AES-CCM with a 128-bit key, a 64-bit tag and a 13-byte nonce"""

    # from RFC8152 and draft-ietf-core-object-security-0[012] 3.2.1
    value = 10
    iv_bytes = 13  # 13-byte nonce
    tag_bytes = 8  # 64-bit tag
    key_bytes = 16  # 128-bit key

319 

320 

class AES_CCM_16_64_256(AES_CCM):
    """AES-CCM with a 256-bit key, a 64-bit tag and a 13-byte nonce"""

    # from RFC8152
    value = 11
    iv_bytes = 13  # 13-byte nonce
    tag_bytes = 8  # 64-bit tag
    key_bytes = 32  # 256-bit key

327 

328 

class AES_CCM_64_64_128(AES_CCM):
    """AES-CCM with a 128-bit key, a 64-bit tag and a 7-byte nonce"""

    # from RFC8152
    value = 12
    iv_bytes = 7  # 7-byte nonce
    tag_bytes = 8  # 64-bit tag
    key_bytes = 16  # 128-bit key

335 

336 

class AES_CCM_64_64_256(AES_CCM):
    """AES-CCM with a 256-bit key, a 64-bit tag and a 7-byte nonce"""

    # from RFC8152
    value = 13
    iv_bytes = 7  # 7-byte nonce
    tag_bytes = 8  # 64-bit tag
    key_bytes = 32  # 256-bit key

343 

344 

class AES_CCM_16_128_128(AES_CCM):
    """AES-CCM with a 128-bit key, a 128-bit tag and a 13-byte nonce"""

    # from RFC8152
    value = 30
    iv_bytes = 13  # 13-byte nonce
    tag_bytes = 16  # 128-bit tag
    key_bytes = 16  # 128-bit key

351 

352 

class AES_CCM_16_128_256(AES_CCM):
    """AES-CCM with a 256-bit key, a 128-bit tag and a 13-byte nonce"""

    # from RFC8152
    value = 31
    iv_bytes = 13  # 13-byte nonce
    tag_bytes = 16  # 128-bit tag
    key_bytes = 32  # 256-bit key

359 

360 

class AES_CCM_64_128_128(AES_CCM):
    """AES-CCM with a 128-bit key, a 128-bit tag and a 7-byte nonce"""

    # from RFC8152
    value = 32
    iv_bytes = 7  # 7-byte nonce
    tag_bytes = 16  # 128-bit tag
    key_bytes = 16  # 128-bit key

367 

368 

class AES_CCM_64_128_256(AES_CCM):
    """AES-CCM with a 256-bit key, a 128-bit tag and a 7-byte nonce"""

    # from RFC8152
    value = 33
    iv_bytes = 7  # 7-byte nonce
    tag_bytes = 16  # 128-bit tag
    key_bytes = 32  # 256-bit key

375 

376 

class AES_GCM(AeadAlgorithm, metaclass=abc.ABCMeta):
    """AES-GCM implemented using the Python cryptography library"""

    iv_bytes = 12  # 96 bits fixed size of the nonce

    @classmethod
    def encrypt(cls, plaintext, aad, key, iv):
        """Return ciphertext + tag for the given input data"""
        cipher = aead.AESGCM(key)
        return cipher.encrypt(iv, plaintext, aad)

    @classmethod
    def decrypt(cls, ciphertext_and_tag, aad, key, iv):
        """Return the plaintext, or raise ProtectionInvalid if the tag does
        not verify"""
        cipher = aead.AESGCM(key)
        try:
            return cipher.decrypt(iv, ciphertext_and_tag, aad)
        except cryptography.exceptions.InvalidTag:
            raise ProtectionInvalid("Tag invalid")

392 

393 

class A128GCM(AES_GCM):
    """AES-GCM with a 128-bit key and a 128-bit tag"""

    # from RFC8152
    value = 1
    tag_bytes = 16  # 128-bit tag
    key_bytes = 16  # 128-bit key

399 

400 

class A192GCM(AES_GCM):
    """AES-GCM with a 192-bit key and a 128-bit tag"""

    # from RFC8152
    value = 2
    tag_bytes = 16  # 128-bit tag
    key_bytes = 24  # 192-bit key

406 

407 

class A256GCM(AES_GCM):
    """AES-GCM with a 256-bit key and a 128-bit tag"""

    # from RFC8152
    value = 3
    tag_bytes = 16  # 128-bit tag
    key_bytes = 32  # 256-bit key

413 

414 

class ChaCha20Poly1305(AeadAlgorithm):
    """ChaCha20/Poly1305 implemented using the Python cryptography library"""

    # from RFC8152
    value = 24
    iv_bytes = 12  # 96-bit nonce
    tag_bytes = 16  # 128-bit tag
    key_bytes = 32  # 256-bit key

    @classmethod
    def encrypt(cls, plaintext, aad, key, iv):
        """Return ciphertext + tag for the given input data"""
        cipher = aead.ChaCha20Poly1305(key)
        return cipher.encrypt(iv, plaintext, aad)

    @classmethod
    def decrypt(cls, ciphertext_and_tag, aad, key, iv):
        """Return the plaintext, or raise ProtectionInvalid if the tag does
        not verify"""
        cipher = aead.ChaCha20Poly1305(key)
        try:
            return cipher.decrypt(iv, ciphertext_and_tag, aad)
        except cryptography.exceptions.InvalidTag:
            raise ProtectionInvalid("Tag invalid")

432 

433 

class AlgorithmCountersign(metaclass=abc.ABCMeta):
    """A fully parameterized COSE countersign algorithm

    An instance is able to provide all the alg_signature, par_countersign and
    par_countersign_key parameters that go into the Group OSCORE algorithms
    field.
    """

    value: int | str

    @abc.abstractmethod
    def sign(self, body, external_aad, private_key):
        """Return the signature produced by the key when using
        CounterSignature0 as described in draft-ietf-cose-countersign-01"""

    @abc.abstractmethod
    def verify(self, signature, body, external_aad, public_key):
        """Verify a signature in analogy to sign"""

    @abc.abstractmethod
    def generate_with_ccs(self) -> Tuple[Any, bytes]:
        """Return a usable private key along with a CCS describing it"""

    @abc.abstractmethod
    def public_from_private(self, private_key):
        """Given a private key, derive the publishable key"""

    @abc.abstractmethod
    def from_kccs(self, ccs: bytes) -> Any:
        """Given a CCS, extract the public key, or raise a ValueError if the
        credential format does not align with the type.

        The type is not exactly Any, but whichever type is used by this
        algorithm class."""

    @staticmethod
    def _build_countersign_structure(body, external_aad):
        """Serialize the CounterSignature0 structure, ie. the bytes that get
        signed, from the body and the external AAD."""
        return cbor.dumps(
            [
                "CounterSignature0",
                b"",
                b"",
                external_aad,
                body,
            ]
        )

    # abc.abstractproperty is deprecated since Python 3.3; stacking property
    # on abstractmethod is its documented replacement. Subclasses may still
    # satisfy these with plain class attributes.

    @property
    @abc.abstractmethod
    def signature_length(self) -> int:
        """The length of a signature using this algorithm"""

    @property
    @abc.abstractmethod
    def curve_number(self) -> int:
        """Registered curve number used with this algorithm.

        Only used for verification of credentials' details"""

490 

491 

class AlgorithmStaticStatic(metaclass=abc.ABCMeta):
    """Interface of algorithms that can perform a static-static key
    agreement"""

    @abc.abstractmethod
    def staticstatic(self, private_key, public_key):
        """Derive a shared static-static secret from a private and a public key"""

496 

497 

def _from_kccs_common(ccs: bytes) -> dict:
    """Return the COSE Key found in the CNF claim of a CCS.

    Raises ValueError if the CCS is not valid CBOR, or if it does not
    contain a CNF claim map that in turn contains a COSE Key map."""

    try:
        claims = cbor.loads(ccs)
    except cbor.CBORDecodeError as e:
        raise ValueError("CCS not in CBOR format") from e

    # Walk down claims -> CNF -> COSE Key, insisting on a map at every level
    cnf = claims.get(CWT_CLAIM_CNF) if isinstance(claims, dict) else None
    cose_key = cnf.get(CWT_CNF_COSE_KEY) if isinstance(cnf, dict) else None
    if not isinstance(cose_key, dict):
        raise ValueError("CCS must contain a COSE Key dict in a CNF")

    return cose_key

517 

518 

class Ed25519(AlgorithmCountersign):
    """Countersignature algorithm using Ed25519 keys.

    Private and public keys are passed around in their raw byte
    representations."""

    value = -8
    curve_number = 6

    signature_length = 64

    def sign(self, body, aad, private_key):
        """Sign the countersign structure of body and aad with the raw
        private key"""
        signing_key = asymmetric.ed25519.Ed25519PrivateKey.from_private_bytes(
            private_key
        )
        to_be_signed = self._build_countersign_structure(body, aad)
        return signing_key.sign(to_be_signed)

    def verify(self, signature, body, aad, public_key):
        """Check the signature over body and aad against the raw public key;
        raises ProtectionInvalid on mismatch"""
        verifying_key = asymmetric.ed25519.Ed25519PublicKey.from_public_bytes(
            public_key
        )
        try:
            verifying_key.verify(
                signature, self._build_countersign_structure(body, aad)
            )
        except cryptography.exceptions.InvalidSignature:
            raise ProtectionInvalid("Signature mismatch")

    def _generate(self):
        """Create a new private key and return its raw bytes"""
        fresh_key = asymmetric.ed25519.Ed25519PrivateKey.generate()
        # FIXME: We could avoid handing the easy-to-misuse bytes around if the
        # current algorithm interfaces did not insist on passing the
        # exchangable representations -- and generally that should be more
        # efficient.
        return fresh_key.private_bytes(
            encoding=serialization.Encoding.Raw,
            format=serialization.PrivateFormat.Raw,
            encryption_algorithm=serialization.NoEncryption(),
        )

    def generate_with_ccs(self) -> Tuple[Any, bytes]:
        """Return a new raw private key along with a CCS that holds the
        matching public key as an OKP COSE Key"""
        private = self._generate()
        cose_key = {
            COSE_KEY_COMMON_KTY: COSE_KTY_OKP,
            COSE_KEY_COMMON_ALG: self.value,
            COSE_KEY_OKP_CRV: self.curve_number,
            COSE_KEY_OKP_X: self.public_from_private(private),
        }
        ccs = cbor.dumps({CWT_CLAIM_CNF: {CWT_CNF_COSE_KEY: cose_key}})

        return (private, ccs)

    def public_from_private(self, private_key):
        """Return the raw public key bytes belonging to a raw private key"""
        loaded = asymmetric.ed25519.Ed25519PrivateKey.from_private_bytes(private_key)
        return loaded.public_key().public_bytes(
            encoding=serialization.Encoding.Raw,
            format=serialization.PublicFormat.Raw,
        )

    def from_kccs(self, ccs: bytes) -> Any:
        """Extract the public key (the COSE Key's x value) from a CCS.

        Raises ValueError unless key type, algorithm and curve match this
        algorithm and an x value is present."""
        # eg. {1: 1, 3: -8, -1: 6, -2: h'77 ... 88'}
        cose_key = _from_kccs_common(ccs)

        recognized = (
            cose_key.get(COSE_KEY_COMMON_KTY) == COSE_KTY_OKP
            and cose_key.get(COSE_KEY_COMMON_ALG) == self.value
            and cose_key.get(COSE_KEY_OKP_CRV) == self.curve_number
            and COSE_KEY_OKP_X in cose_key
        )
        if not recognized:
            raise ValueError("Key type not recognized from CCS key %r" % cose_key)

        return cose_key[COSE_KEY_OKP_X]

592 

593 

class EcdhSsHkdf256(AlgorithmStaticStatic):
    """Static-static key agreement that takes Ed25519 keys as raw bytes and
    converts them to Curve25519 keys for the exchange."""

    # FIXME: This class uses the Edwards keys as private and public keys, and
    # not the converted ones. This will be problematic if pairwise-only
    # contexts are to be set up.

    value = -27

    # FIXME these two will be different when using the Montgomery keys directly

    # This one will only be used when establishing and distributing pairwise-only keys
    public_from_private = Ed25519.public_from_private

    def staticstatic(self, private_key, public_key):
        """Derive a shared static-static secret from a private and a public key"""
        ed_private = asymmetric.ed25519.Ed25519PrivateKey.from_private_bytes(
            private_key
        )
        x_private = cryptography_additions.sk_to_curve25519(ed_private)

        ed_public = asymmetric.ed25519.Ed25519PublicKey.from_public_bytes(public_key)
        x_public = cryptography_additions.pk_to_curve25519(ed_public)

        return x_private.exchange(x_public)

616 

617 

class ECDSA_SHA256_P256(AlgorithmCountersign, AlgorithmStaticStatic):
    """ECDSA with SHA-256 on P-256, usable for countersigning and for
    static-static key agreement.

    Keys are passed around as key objects of the cryptography library;
    signatures are the 32-byte big-endian r and s values concatenated."""

    value = -7  # FIXME: when used as a static-static algorithm, does this become -27? see shepherd review.
    curve_number = 1

    signature_length = 64

    # Trying a new construction approach -- should work just as well given
    # we're just passing Python objects around
    def from_public_parts(self, x: bytes, y: bytes):
        """Create a public key from its COSE values"""
        numbers = asymmetric.ec.EllipticCurvePublicNumbers(
            int.from_bytes(x, "big"),
            int.from_bytes(y, "big"),
            asymmetric.ec.SECP256R1(),
        )
        return numbers.public_key()

    def from_kccs(self, ccs: bytes) -> Any:
        """Extract the public key from a CCS.

        Raises ValueError unless key type and algorithm match this algorithm
        and both coordinates are present."""
        cose_key = _from_kccs_common(ccs)

        recognized = (
            cose_key.get(COSE_KEY_COMMON_KTY) == COSE_KTY_EC2
            and cose_key.get(COSE_KEY_COMMON_ALG) == self.value
            and COSE_KEY_EC2_X in cose_key
            and COSE_KEY_EC2_Y in cose_key
        )
        if not recognized:
            raise ValueError("Key type not recognized from CCS key %r" % cose_key)

        return self.from_public_parts(
            x=cose_key[COSE_KEY_EC2_X],
            y=cose_key[COSE_KEY_EC2_Y],
        )

    def from_private_parts(self, x: bytes, y: bytes, d: bytes):
        """Create a private key from its public coordinates and its private
        value"""
        public_numbers = self.from_public_parts(x, y).public_numbers()
        return asymmetric.ec.EllipticCurvePrivateNumbers(
            int.from_bytes(d, "big"), public_numbers
        ).private_key()

    def sign(self, body, aad, private_key):
        """Sign the countersign structure of body and aad, returning r and s
        as fixed-length values rather than in DER encoding"""
        to_be_signed = self._build_countersign_structure(body, aad)
        der_signature = private_key.sign(
            to_be_signed, asymmetric.ec.ECDSA(hashes.SHA256())
        )
        (r, s) = decode_dss_signature(der_signature)

        return b"".join(number.to_bytes(32, "big") for number in (r, s))

    def verify(self, signature, body, aad, public_key):
        """Check the signature over body and aad; raises ProtectionInvalid
        on mismatch"""
        # The first 32 bytes are r, whatever follows is s
        r = int.from_bytes(signature[:32], "big")
        s = int.from_bytes(signature[32:], "big")
        der_signature = encode_dss_signature(r, s)
        try:
            public_key.verify(
                der_signature,
                self._build_countersign_structure(body, aad),
                asymmetric.ec.ECDSA(hashes.SHA256()),
            )
        except cryptography.exceptions.InvalidSignature:
            raise ProtectionInvalid("Signature mismatch")

    def _generate(self):
        """Create a new private key on P-256"""
        return asymmetric.ec.generate_private_key(asymmetric.ec.SECP256R1())

    def generate_with_ccs(self) -> Tuple[Any, bytes]:
        """Return a new private key along with a CCS that holds the matching
        public key as an EC2 COSE Key"""
        private = self._generate()
        # FIXME: Deduplicate with edhoc.py
        public_numbers = self.public_from_private(private).public_numbers()
        cose_key = {
            COSE_KEY_COMMON_KTY: COSE_KTY_EC2,
            COSE_KEY_COMMON_ALG: self.value,
            COSE_KEY_EC2_X: public_numbers.x.to_bytes(32, "big"),
            COSE_KEY_EC2_Y: public_numbers.y.to_bytes(32, "big"),
        }
        ccs = cbor.dumps({CWT_CLAIM_CNF: {CWT_CNF_COSE_KEY: cose_key}})

        return (private, ccs)

    def public_from_private(self, private_key):
        """Given a private key, derive the publishable key"""
        return private_key.public_key()

    def staticstatic(self, private_key, public_key):
        """Derive a shared static-static secret from a private and a public key"""
        return private_key.exchange(asymmetric.ec.ECDH(), public_key)

711 

712 

# Symmetric algorithms by name; every entry is an instance of its class
algorithms = {
    name: algorithm_class()
    for name, algorithm_class in (
        ("AES-CCM-16-64-128", AES_CCM_16_64_128),
        ("AES-CCM-16-64-256", AES_CCM_16_64_256),
        ("AES-CCM-64-64-128", AES_CCM_64_64_128),
        ("AES-CCM-64-64-256", AES_CCM_64_64_256),
        ("AES-CCM-16-128-128", AES_CCM_16_128_128),
        ("AES-CCM-16-128-256", AES_CCM_16_128_256),
        ("AES-CCM-64-128-128", AES_CCM_64_128_128),
        ("AES-CCM-64-128-256", AES_CCM_64_128_256),
        ("ChaCha20/Poly1305", ChaCha20Poly1305),
        ("A128GCM", A128GCM),
        ("A192GCM", A192GCM),
        ("A256GCM", A256GCM),
    )
}

# algorithms with full parameter set
algorithms_countersign = dict(
    [
        # maybe needs a different name...
        ("EdDSA on Ed25519", Ed25519()),
        ("ECDSA w/ SHA-256 on P-256", ECDSA_SHA256_P256()),
    ]
)

algorithms_staticstatic = dict([("ECDH-SS + HKDF-256", EcdhSsHkdf256())])

DEFAULT_ALGORITHM = "AES-CCM-16-64-128"

_hash_backend = cryptography.hazmat.backends.default_backend()
# Hash functions by name
hashfunctions = dict(
    sha256=hashes.SHA256(),
    sha384=hashes.SHA384(),
    sha512=hashes.SHA512(),
)

DEFAULT_HASHFUNCTION = "sha256"

DEFAULT_WINDOWSIZE = 32

751 

752 

class BaseSecurityContext:
    """Common base of the security contexts: holds the AEAD algorithm and
    hash function, and builds nonces and the serialized external AAD."""

    # Deprecated marker for whether the class uses the
    # ContextWhereExternalAadIsGroup mixin; see documentation there.
    external_aad_is_group = False

    # Authentication information carried with this security context; managed
    # externally by whatever creates the security context.
    authenticated_claims: List[str] = []

    #: AEAD algorithm. This may be None if it is not set in an OSCORE group context.
    alg_aead: Optional[AeadAlgorithm]

    hashfun: hashes.HashAlgorithm

    @property
    def algorithm(self):
        """Deprecated alias of alg_aead"""
        warnings.warn(
            "Property was renamed to 'alg_aead'", DeprecationWarning, stacklevel=2
        )
        return self.alg_aead

    @algorithm.setter
    def algorithm(self, value):
        warnings.warn(
            "Property was renamed to 'alg_aead'", DeprecationWarning, stacklevel=2
        )
        self.alg_aead = value

    def _construct_nonce(self, partial_iv_short, piv_generator_id):
        """Build the nonce for a partial IV and the ID of whoever generated
        it.

        The length of the ID (one byte), the ID zero-padded on the left to
        the nonce length minus 6, and the partial IV zero-padded on the left
        to 5 bytes are concatenated and XORed with the equally long tail of
        the common IV."""
        id_length = bytes([len(piv_generator_id)])
        padded_id = piv_generator_id.rjust(self.alg_aead.iv_bytes - 6, b"\0")
        padded_piv = partial_iv_short.rjust(5, b"\0")

        components = id_length + padded_id + padded_piv

        # "least significant bits of the Common IV"
        used_common_iv = self.common_iv[len(self.common_iv) - len(components) :]
        return _xor_bytes(used_common_iv, components)

    def _extract_external_aad(
        self, message, request_id, local_is_sender: bool
    ) -> bytes:
        """Build the serialized external AAD from information in the message
        and the request_id.

        Information about whether the local context is the sender of the
        message is only relevant to group contexts, where it influences whose
        authentication credentials are placed in the AAD.
        """

        def value_or_none(algorithm):
            # Algorithms that are not set are represented by None
            return None if algorithm is None else algorithm.value

        # If any option were actually Class I, it would be something like
        #
        # the_options = pick some of(message)
        # class_i_options = Message(the_options).opt.encode()

        oscore_version = 1
        if request_id.request_hash is None:
            class_i_options = b""
        else:
            class_i_options = Message(request_hash=request_id.request_hash).opt.encode()

        uses_group_aad = isinstance(self, ContextWhereExternalAadIsGroup)

        algorithms: List[int | str | None] = [value_or_none(self.alg_aead)]
        if uses_group_aad:
            algorithms += [
                value_or_none(self.alg_group_enc),
                value_or_none(self.alg_signature),
                value_or_none(self.alg_pairwise_key_agreement),
            ]

        external_aad = [
            oscore_version,
            algorithms,
            request_id.kid,
            request_id.partial_iv,
            class_i_options,
        ]

        if uses_group_aad:
            # FIXME: We may need to carry this over in the request_id when
            # observation span group rekeyings
            external_aad.append(self.id_context)

            assert message.opt.oscore is not None, "Double OSCORE"
            external_aad.append(message.opt.oscore)

            external_aad.append(
                self.sender_auth_cred if local_is_sender else self.recipient_auth_cred
            )
            external_aad.append(self.group_manager_cred)

        return cbor.dumps(external_aad)

854 

855 

class ContextWhereExternalAadIsGroup(BaseSecurityContext):
    """Mixin that selects the Group OSCORE AADs.

    The protection and unprotection functions will use the Group OSCORE AADs
    rather than the regular OSCORE AADs iff a context uses this mixin. (Ie.
    alg_group_enc etc are added to the algorithms, and request_kid_context,
    OSCORE_option, sender_auth_cred and gm_cred are added).

    This does not necessarily match the is_signing property (as pairwise
    contexts use this but don't sign), and is distinct from the added OSCORE
    option in the AAD (as that's only applicable for the external AAD as
    extracted for signing and signature verification purposes)."""

    external_aad_is_group = True

    id_context: bytes

    alg_group_enc: Optional[AeadAlgorithm]
    alg_signature: Optional[AlgorithmCountersign]
    # This is also of type AlgorithmCountersign because the staticstatic
    # function is sitting on the same type.
    alg_pairwise_key_agreement: Optional[AlgorithmCountersign]

    # Credentials that get appended to the external AAD
    sender_auth_cred: bytes
    recipient_auth_cred: bytes
    group_manager_cred: bytes

880 

881 

882# FIXME pull interface components from SecurityContext up here 

883class CanProtect(BaseSecurityContext, metaclass=abc.ABCMeta): 

884 # The protection function will add a signature according to the context's 

885 # alg_signature attribute if this is true 

886 is_signing = False 

887 

888 # Send the KID when protecting responses 

889 # 

890 # Once group pairwise mode is implemented, this will need to become a 

891 # parameter to protect(), which is stored at the point where the incoming 

892 # context is turned into an outgoing context. (Currently, such a mechanism 

893 # isn't there yet, and oscore_wrapper protects responses with the very same 

894 # context they came in on). 

895 responses_send_kid = False 

896 

@staticmethod
def _compress(protected, unprotected, ciphertext):
    """Serialize an untagged COSE_Encrypt0 object into its compressed form.

    Returns a ``(option, payload)`` pair: the bytes for the Object-Security
    option and the message body (which is the ciphertext, unmodified).

    All entries this function understands are popped from ``unprotected``;
    the mapping is thus modified in place. A RuntimeError is raised if
    ``protected`` is non-empty or if unknown entries are left over in
    ``unprotected``; a ValueError is raised if the partial IV or the KID
    context are too long to be encoded."""

    if protected:
        raise RuntimeError(
            "Protection produced a message that has uncompressable fields."
        )

    partial_iv = unprotected.pop(COSE_PIV, b"")
    if len(partial_iv) > COMPRESSION_BITS_N:
        raise ValueError("Can't encode overly long partial IV")

    # The low bits of the flag byte carry the partial IV's length.
    flags = len(partial_iv)

    kid_bytes = b""
    if COSE_KID in unprotected:
        kid_bytes = unprotected.pop(COSE_KID)
        flags |= COMPRESSION_BIT_K

    context_bytes = b""
    if COSE_KID_CONTEXT in unprotected:
        raw_context = unprotected.pop(COSE_KID_CONTEXT)
        flags |= COMPRESSION_BIT_H
        context_length = len(raw_context)
        if context_length > 255:
            raise ValueError("KID Context too long")
        # The KID context is prefixed with its length in a single byte.
        context_bytes = bytes((context_length,)) + raw_context

    if COSE_COUNTERSIGNATURE0 in unprotected:
        # Only the flag is set here: the countersignature itself is appended
        # to the ciphertext later, when the option is already processed.
        unprotected.pop(COSE_COUNTERSIGNATURE0)
        flags |= COMPRESSION_BIT_GROUP

    if unprotected:
        raise RuntimeError(
            "Protection produced a message that has uncompressable fields."
        )

    if not flags:
        return (b"", ciphertext)

    return (bytes([flags]) + partial_iv + context_bytes + kid_bytes, ciphertext)

948 

def protect(self, message, request_id=None, *, kid_context=True):
    """Given a plain CoAP message, create a protected message that contains
    message's options in the inner or outer CoAP message as described in
    OSCORE.

    If the message is a response to a previous message, the additional data
    from unprotecting the request are passed in as request_id. When
    request data is present, its partial IV is reused if possible. The
    security context's ID context is encoded in the resulting message
    unless kid_context is explicitly set to a False; other values for the
    kid_context can be passed in as byte string in the same parameter.

    Returns a tuple of the protected (outer) message and the request_id.
    For requests, the request_id is freshly built here; for responses, it
    is the one that was passed in.

    The assertion at the start fails if a request is passed together with
    a request_id, or a response without one.
    """

    assert (
        (request_id is None) == message.code.is_request()
    ), "Requestishness of code to protect does not match presence of request ID"

    outer_message, plaintext = self._split_message(message, request_id)

    protected = {}
    nonce = None
    unprotected = {}
    if request_id is not None:
        # Response: try to reuse the nonce of the request. A nonce of None
        # indicates that it can not be reused.
        nonce, partial_iv_short = request_id.get_reusable_nonce_and_piv()
        if nonce is not None:
            partial_iv_generated_by = request_id.kid

    if nonce is None:
        # Request, or response that can not reuse the request's nonce: take
        # a fresh sequence number, and send the partial IV along.
        nonce, partial_iv_short = self._build_new_nonce()
        partial_iv_generated_by = self.sender_id

        unprotected[COSE_PIV] = partial_iv_short

    if message.code.is_request():
        unprotected[COSE_KID] = self.sender_id

        request_id = RequestIdentifiers(
            self.sender_id,
            partial_iv_short,
            nonce,
            can_reuse_nonce=None,
            request_code=outer_message.code,
        )

        # kid_context is three-valued: True (use the context's own ID
        # context, if any), False (send none), or explicit bytes.
        if kid_context is True:
            if self.id_context is not None:
                unprotected[COSE_KID_CONTEXT] = self.id_context
        elif kid_context is not False:
            unprotected[COSE_KID_CONTEXT] = kid_context
    else:
        if self.responses_send_kid:
            unprotected[COSE_KID] = self.sender_id

    # Putting in a dummy value as the signature calculation will already need some of the compression result
    if self.is_signing:
        unprotected[COSE_COUNTERSIGNATURE0] = b""
    # FIXME: Running this twice quite needlessly (just to get the oscore option for sending)
    option_data, _ = self._compress(protected, unprotected, b"")

    # The option needs to be set before the external AAD is extracted from
    # the outer message.
    outer_message.opt.oscore = option_data

    external_aad = self._extract_external_aad(
        outer_message, request_id, local_is_sender=True
    )

    aad = self.alg_aead._build_encrypt0_structure(protected, external_aad)

    key = self._get_sender_key(outer_message, external_aad, plaintext, request_id)

    ciphertext = self.alg_aead.encrypt(plaintext, aad, key, nonce)

    # The first _compress call above already popped all entries out of
    # unprotected; this call is only used for its payload part.
    _, payload = self._compress(protected, unprotected, ciphertext)

    if self.is_signing:
        signature = self.alg_signature.sign(payload, external_aad, self.private_key)
        # The signature is not sent in the clear, but XORed with a
        # keystream derived from the signature encryption key.
        keystream = self._kdf_for_keystreams(
            partial_iv_generated_by,
            partial_iv_short,
            self.signature_encryption_key,
            self.sender_id,
            INFO_TYPE_KEYSTREAM_REQUEST
            if message.code.is_request()
            else INFO_TYPE_KEYSTREAM_RESPONSE,
        )
        encrypted_signature = _xor_bytes(signature, keystream)
        payload += encrypted_signature
    outer_message.payload = payload

    # FIXME go through options section

    # the request_id in the second argument should be discarded by the
    # caller when protecting a response -- is that reason enough for an
    # `if` and returning None?
    return outer_message, request_id

1043 

1044 def _get_sender_key(self, outer_message, aad, plaintext, request_id): 

1045 """Customization hook of the protect function 

1046 

1047 While most security contexts have a fixed sender key, deterministic 

1048 requests need to shake up a few things. They need to modify the outer 

1049 message, as well as the request_id as it will later be used to 

1050 unprotect the response.""" 

1051 return self.sender_key 

1052 

def _split_message(self, message, request_id):
    """Given a plain message, return the outer message that contains
    all Class I and Class U options (but without payload or Object-Security
    option), and the encoded inner message (the plaintext) that contains
    all Class E options and the payload.

    For responses, the outer code is taken from the request_id's code
    style; for requests, it is POST, or FETCH if the Observe option is set.

    This leaves the messages' remotes unset."""

    if message.code.is_request():
        host_for_outer = message.opt.uri_host
        proxy_uri = message.opt.proxy_uri

        inner = message.copy(
            uri_host=None,
            uri_port=None,
            proxy_uri=None,
            proxy_scheme=None,
        )
        inner.remote = None

        if proxy_uri is not None:
            # set_request_uri splits up the proxy URI into its components;
            # those that belong to the outer message are extracted,
            # preserved and cleared from the inner one.
            inner.set_request_uri(proxy_uri, set_uri_host=False)
            if inner.opt.proxy_uri is not None:
                raise ValueError("Can not split Proxy-URI into options")
            outer_uri = inner.remote.uri_base
            inner.remote = None
            inner.opt.proxy_scheme = None

        code_for_outer = POST if message.opt.observe is None else FETCH
    else:
        host_for_outer = None
        proxy_uri = None

        inner = message.copy()

        code_for_outer = request_id.code_style.response

    if message.code.is_response():
        observe_for_outer = None
    else:
        observe_for_outer = message.opt.observe

    # no max-age because these are always successful responses
    outer = Message(
        code=code_for_outer,
        uri_host=host_for_outer,
        observe=observe_for_outer,
    )
    if proxy_uri is not None:
        outer.set_request_uri(outer_uri)

    # Plaintext layout: code, encoded options, and (only if there is a
    # payload) the payload marker followed by the payload.
    plaintext = bytes([inner.code]) + inner.opt.encode()
    body = inner.payload
    if body:
        plaintext = plaintext + b"\xff" + body

    return outer, plaintext

1110 

1111 def _build_new_nonce(self): 

1112 """This implements generation of a new nonce, assembled as per Figure 5 

1113 of draft-ietf-core-object-security-06. Returns the shortened partial IV 

1114 as well.""" 

1115 seqno = self.new_sequence_number() 

1116 

1117 partial_iv = seqno.to_bytes(5, "big") 

1118 

1119 return ( 

1120 self._construct_nonce(partial_iv, self.sender_id), 

1121 partial_iv.lstrip(b"\0") or b"\0", 

1122 ) 

1123 

1124 # sequence number handling 

1125 

def new_sequence_number(self):
    """Hand out the next sender sequence number.

    The value handed out is the current sender_sequence_number; the stored
    number is incremented and post_seqnoincrease() is called before
    returning, so that the same value is never returned twice in a given
    security context.

    May raise ContextUnavailable (when the sequence numbers are
    exhausted)."""
    assigned = self.sender_sequence_number
    if not assigned < MAX_SEQNO:
        raise ContextUnavailable("Sequence number too large, context is exhausted.")

    self.sender_sequence_number += 1
    self.post_seqnoincrease()

    return assigned

1137 

1138 # implementation defined 

1139 

@abc.abstractmethod
def post_seqnoincrease(self):
    """Ensure that sender_sequence_number is stored

    This is called by new_sequence_number() after the sender sequence
    number was incremented. Implementations need to override this; the
    base implementation only raises NotImplementedError."""
    # A bare `raise` would either re-raise an unrelated exception that is
    # currently being handled, or fail with an unspecific RuntimeError.
    # NotImplementedError is a subclass of RuntimeError, so anything that
    # caught the latter keeps working.
    raise NotImplementedError

1144 

def context_from_response(self, unprotected_bag) -> CanUnprotect:
    """Pick the security context with which to unprotect a response

    This is called when a response to a request protected with this
    security context is received, and is given the unprotected information
    from the response's Object-Security option.

    Being able to choose allows picking the right security context for a
    group response, and helps getting a new short-lived context for B.2
    mode. By default, the unprotected information is ignored, and the
    context itself is returned.
    """

    # FIXME justify by moving into a mixin for CanProtectAndUnprotect
    return self  # type: ignore

1157 

1158 

class CanUnprotect(BaseSecurityContext):
    """Mixin for security contexts that can unprotect (decrypt and verify)
    incoming messages"""

    def unprotect(self, protected_message, request_id=None):
        """Given a protected message, decrypt it and return the plain
        message.

        When unprotecting a response, the request_id that was produced when
        protecting the request needs to be passed in; for requests, it must
        be absent (this is asserted).

        Returns a tuple of the unprotected message and the request_id. For
        requests, the request_id is freshly built here and is needed later
        to protect the response.

        Raises NotAProtectedMessage if there is no OSCORE option,
        ProtectionInvalid or DecodeError if the message is malformed or
        does not match this context, and ReplayError or ReplayErrorWithEcho
        if the replay check of a request fails."""
        assert (
            (request_id is not None) == protected_message.code.is_response()
        ), "Requestishness of code to unprotect does not match presence of request ID"
        is_response = protected_message.code.is_response()

        # Set to a raisable exception on replay check failures; it will be
        # raised, but the package may still be processed in the course of Echo handling.
        replay_error = None

        protected_serialized, protected, unprotected, ciphertext = (
            self._extract_encrypted0(protected_message)
        )

        if protected:
            raise ProtectionInvalid("The protected field is not empty")

        # FIXME check for duplicate keys in protected

        if unprotected.pop(COSE_KID_CONTEXT, self.id_context) != self.id_context:
            # FIXME is this necessary?
            raise ProtectionInvalid("Sender ID context does not match")

        if unprotected.pop(COSE_KID, self.recipient_id) != self.recipient_id:
            # for most cases, this is caught by the session ID dispatch, but in
            # responses (where explicit sender IDs are atypical), this is a
            # valid check
            raise ProtectionInvalid("Sender ID does not match")

        if COSE_PIV not in unprotected:
            if not is_response:
                raise ProtectionInvalid("No sequence number provided in request")

            # A response without a partial IV uses the nonce of the request.
            nonce = request_id.nonce
            seqno = None  # sentinel for not striking out anything
            partial_iv_short = request_id.partial_iv
            partial_iv_generated_by = request_id.kid
        else:
            partial_iv_short = unprotected.pop(COSE_PIV)
            partial_iv_generated_by = self.recipient_id

            nonce = self._construct_nonce(partial_iv_short, self.recipient_id)

            seqno = int.from_bytes(partial_iv_short, "big")

            if not is_response:
                if not self.recipient_replay_window.is_initialized():
                    replay_error = ReplayError("Sequence number check unavailable")
                elif not self.recipient_replay_window.is_valid(seqno):
                    replay_error = ReplayError("Sequence number was re-used")

                if replay_error is not None and self.echo_recovery is None:
                    # Don't even try decoding if there is no reason to
                    raise replay_error

                request_id = RequestIdentifiers(
                    self.recipient_id,
                    partial_iv_short,
                    nonce,
                    can_reuse_nonce=replay_error is None,
                    request_code=protected_message.code,
                )

        if unprotected.pop(COSE_COUNTERSIGNATURE0, None) is not None:
            # A context that is not a group context may either not have the
            # attribute at all (which surfaces as an AttributeError, not a
            # NameError), or have it set to None.
            alg_signature = getattr(self, "alg_signature", None)
            if alg_signature is None:
                raise DecodeError(
                    "Group messages can not be decoded with this non-group context"
                )

            siglen = alg_signature.signature_length
            if len(ciphertext) < siglen:
                raise DecodeError("Message too short for signature")
            # The (encrypted) signature is appended to the ciphertext.
            encrypted_signature = ciphertext[-siglen:]

            keystream = self._kdf_for_keystreams(
                partial_iv_generated_by,
                partial_iv_short,
                self.signature_encryption_key,
                self.recipient_id,
                INFO_TYPE_KEYSTREAM_REQUEST
                if protected_message.code.is_request()
                else INFO_TYPE_KEYSTREAM_RESPONSE,
            )
            signature = _xor_bytes(encrypted_signature, keystream)

            ciphertext = ciphertext[:-siglen]
        else:
            signature = None

        if unprotected:
            raise DecodeError("Unsupported unprotected option")

        if (
            len(ciphertext) < self.alg_aead.tag_bytes + 1
        ):  # +1 assures access to plaintext[0] (the code)
            raise ProtectionInvalid("Ciphertext too short")

        external_aad = self._extract_external_aad(
            protected_message, request_id, local_is_sender=False
        )
        enc_structure = ["Encrypt0", protected_serialized, external_aad]
        aad = cbor.dumps(enc_structure)

        key = self._get_recipient_key(protected_message)

        plaintext = self.alg_aead.decrypt(ciphertext, aad, key, nonce)

        self._post_decrypt_checks(
            external_aad, plaintext, protected_message, request_id
        )

        if not is_response and seqno is not None and replay_error is None:
            self.recipient_replay_window.strike_out(seqno)

        if signature is not None:
            # Only doing the expensive signature validation once the cheaper decryption passed
            alg_signature.verify(
                signature, ciphertext, external_aad, self.recipient_public_key
            )

        # FIXME add options from unprotected

        unprotected_message = Message(code=plaintext[0])
        unprotected_message.payload = unprotected_message.opt.decode(plaintext[1:])

        try_initialize = (
            not self.recipient_replay_window.is_initialized()
            and self.echo_recovery is not None
        )
        if try_initialize:
            if protected_message.code.is_request():
                # Either accept into replay window and clear replay error, or raise
                # something that can turn into a 4.01,Echo response
                if unprotected_message.opt.echo == self.echo_recovery:
                    self.recipient_replay_window.initialize_from_freshlyseen(seqno)
                    replay_error = None
                else:
                    raise ReplayErrorWithEcho(
                        secctx=self, request_id=request_id, echo=self.echo_recovery
                    )
            else:
                # We can initialize the replay window from a response as well.
                # The response is guaranteed fresh as it was AEAD-decoded to
                # match a request sent by this process.
                #
                # This is rare, as it only works when the server uses an own
                # sequence number, eg. when sending a notification or when
                # acting again on a retransmitted safe request whose response
                # it did not cache.
                #
                # Nothing bad happens if we can't make progress -- we just
                # don't initialize the replay window that wouldn't have been
                # checked for a response anyway.
                if seqno is not None:
                    self.recipient_replay_window.initialize_from_freshlyseen(seqno)

        if replay_error is not None:
            raise replay_error

        if unprotected_message.code.is_request():
            if protected_message.opt.observe != 0:
                unprotected_message.opt.observe = None
        else:
            if protected_message.opt.observe is not None:
                # -1 ensures that they sort correctly in later reordering
                # detection. Note that neither -1 nor high (>3 byte) sequence
                # numbers can be serialized in the Observe option, but they are
                # in this implementation accepted for passing around.
                unprotected_message.opt.observe = -1 if seqno is None else seqno

        return unprotected_message, request_id

    def _get_recipient_key(self, protected_message):
        """Customization hook of the unprotect function

        While most security contexts have a fixed recipient key, deterministic
        requests build it on demand."""
        return self.recipient_key

    def _post_decrypt_checks(self, aad, plaintext, protected_message, request_id):
        """Customization hook of the unprotect function after decryption

        While most security contexts are good with the default checks,
        deterministic requests need to perform additional checks while AAD and
        plaintext information is still available, and modify the request_id for
        the later protection step of the response."""

    @staticmethod
    def _uncompress(option_data, payload):
        """Expand the compressed COSE object from an OSCORE option's value
        and the message payload.

        Returns a tuple of the serialized protected header (always empty),
        the protected header (always empty), the unprotected header, and
        the ciphertext (the payload as it was passed in).

        Raises DecodeError if reserved bits are set in the option, or if
        the option is shorter than what its first byte announces."""
        if option_data == b"":
            firstbyte = 0
            # With no flags set, nothing is read from the tail; it is still
            # bound to keep all later branches well-defined.
            tail = b""
        else:
            firstbyte = option_data[0]
            tail = option_data[1:]

        unprotected = {}

        if firstbyte & COMPRESSION_BITS_RESERVED:
            raise DecodeError("Protected data uses reserved fields")

        pivsz = firstbyte & COMPRESSION_BITS_N
        if pivsz:
            if len(tail) < pivsz:
                raise DecodeError("Partial IV announced but not present")
            unprotected[COSE_PIV] = tail[:pivsz]
            tail = tail[pivsz:]

        if firstbyte & COMPRESSION_BIT_H:
            # kid context hint
            if not tail:
                # The option comes from the network; a missing length byte
                # must be reported as a decoding error rather than let an
                # IndexError escape.
                raise DecodeError("Context hint announced but not present")
            s = tail[0]
            if len(tail) - 1 < s:
                raise DecodeError("Context hint announced but not present")
            tail = tail[1:]
            unprotected[COSE_KID_CONTEXT] = tail[:s]
            tail = tail[s:]

        if firstbyte & COMPRESSION_BIT_K:
            # The KID is whatever is left of the option.
            kid = tail
            unprotected[COSE_KID] = kid

        if firstbyte & COMPRESSION_BIT_GROUP:
            # Not really; As this is (also) used early on (before the KID
            # context is even known, because it's just getting extracted), this
            # is returning an incomplete value here and leaves it to the later
            # processing to strip the right number of bytes from the ciphertext
            unprotected[COSE_COUNTERSIGNATURE0] = PRESENT_BUT_NO_VALUE_YET

        return b"", {}, unprotected, payload

    @classmethod
    def _extract_encrypted0(cls, message):
        """Extract the COSE_Encrypt0 components from a message's OSCORE
        option and payload; see _uncompress for the returned values.

        Raises NotAProtectedMessage if the message has no OSCORE option."""
        if message.opt.oscore is None:
            raise NotAProtectedMessage("No Object-Security option present", message)

        protected_serialized, protected, unprotected, ciphertext = cls._uncompress(
            message.opt.oscore, message.payload
        )
        return protected_serialized, protected, unprotected, ciphertext

    # implementation defined

    def context_for_response(self) -> CanProtect:
        """After processing a request with this context, with which security
        context should an outgoing response be protected? By default, it's the
        same context."""
        # FIXME: Is there any way in which the handler may want to influence
        # the decision taken here? Or would, then, the handler just call a more
        # elaborate but similar function when setting the response's remote
        # already?

        # FIXME justify by moving into a mixin for CanProtectAndUnprotect
        return self  # type: ignore

1414 

1415 

class SecurityContextUtils(BaseSecurityContext):
    """Key derivation and context matching helpers shared between security
    context implementations"""

    def _kdf(self, salt, ikm, role_id, out_type):
        """Derive key material of the given output type ("Key", "IV" or
        "SEKey").

        This is the HKDF as used to derive sender and recipient key and IV
        in RFC8613 Section 3.2.1, and analogously the Group Encryption Key
        of oscore-groupcomm.

        Raises ValueError for any other output type.
        """
        if out_type == "Key":
            length = self.alg_aead.key_bytes
        elif out_type == "IV":
            # The longest IV of all configured algorithms is used.
            candidates = (self.alg_aead, getattr(self, "alg_group_enc", None))
            length = max(alg.iv_bytes for alg in candidates if alg is not None)
        elif out_type == "SEKey":
            # "While the obtained Signature Encryption Key is never used with
            # the Group Encryption Algorithm, its length was chosen to obtain a
            # matching level of security."
            length = self.alg_group_enc.key_bytes
        else:
            raise ValueError("Output type not recognized")

        return self._kdf_lowlevel(
            salt,
            ikm,
            [role_id, self.id_context, self.alg_aead.value, out_type, length],
            length,
        )

    def _kdf_for_keystreams(self, piv_generated_by, salt, ikm, role_id, out_type):
        """Derive a keystream as long as a signature of the context's
        signature algorithm; this is the HKDF as used to derive the
        keystreams of oscore-groupcomm.

        The role_id argument does not enter the derivation."""

        stream_length = self.alg_signature.signature_length

        known_types = (INFO_TYPE_KEYSTREAM_REQUEST, INFO_TYPE_KEYSTREAM_RESPONSE)
        assert out_type in known_types, "Output type not recognized"

        return self._kdf_lowlevel(
            salt,
            ikm,
            [piv_generated_by, self.id_context, out_type, stream_length],
            stream_length,
        )

    def _kdf_lowlevel(self, salt: bytes, ikm: bytes, info: list, l: int) -> bytes:  # noqa: E741 (signature follows RFC definition)
        """The HKDF function as used in RFC8613 and oscore-groupcomm (notated
        there as ``something = HKDF(...)``).

        The `info` list is CBOR encoded before it is fed into the HKDF; it
        typically contains `L` (the output length `l`) at some point.

        For the conventional info structures, `_kdf` and
        `_kdf_for_keystreams` are the more high-level tools; it may make
        sense to extend those rather than building `info` manually."""
        return HKDF(
            algorithm=self.hashfun,
            length=l,
            salt=salt,
            info=cbor.dumps(info),
            backend=_hash_backend,
        ).derive(ikm)

    def derive_keys(self, master_salt, master_secret):
        """Set sender_key, recipient_key and common_iv, based on the passed
        salt and secret, and on the algorithm, hash function, IDs and
        id_context that were configured on the context beforehand."""

        self.sender_key = self._kdf(
            master_salt, master_secret, self.sender_id, "Key"
        )
        self.recipient_key = self._kdf(
            master_salt, master_secret, self.recipient_id, "Key"
        )

        # The common IV is derived with an empty role ID.
        self.common_iv = self._kdf(master_salt, master_secret, b"", "IV")

    # really more of the Credentials interface

    def get_oscore_context_for(self, unprotected):
        """Return a suitable context (most easily self) for an incoming
        request if its unprotected data (COSE_KID, COSE_KID_CONTEXT) fit its
        description. If it doesn't match, it returns None.

        The default implementation just strictly checks for whether kid and
        any kid context match (not matching if a local KID context is set but
        none is given in the request); modes like Group OSCORE can spin up
        aspect objects here.
        """
        if not (unprotected.get(COSE_KID, None) == self.recipient_id):
            return None
        if not (unprotected.get(COSE_KID_CONTEXT, None) == self.id_context):
            return None
        return self

1515 

1516 

class ReplayWindow:
    """A regular replay window of a fixed size.

    The window's state is an index and a bitfield (stored in an integer).
    The bitfield's least significant bit stands for the sequence number of
    the index, and a set bit means that the number was seen. There are no
    shenanigans around implicit leading ones (think floating point
    normalization).

    >>> w = ReplayWindow(32, lambda: None)
    >>> w.initialize_empty()
    >>> w.strike_out(5)
    >>> w.is_valid(3)
    True
    >>> w.is_valid(5)
    False
    >>> w.strike_out(0)
    >>> w.strike_out(1)
    >>> w.strike_out(2)
    >>> w.is_valid(1)
    False

    Jumping ahead by the window size invalidates older numbers:

    >>> w.is_valid(4)
    True
    >>> w.strike_out(35)
    >>> w.is_valid(4)
    True
    >>> w.strike_out(36)
    >>> w.is_valid(4)
    False

    Usage safety
    ------------

    For every key, the replay window can only be initialized empty once. On
    later uses, it needs to be persisted by storing the output of
    self.persist() somewhere and loaded from that persisted data.

    It is acceptable to store persistence data in the strike_out_callback,
    but that must then ensure that the data is written (flushed to a file or
    committed to a database), which is usually inefficient.

    Stability
    ---------

    This class is not considered for stabilization yet and an implementation
    detail of the SecurityContext implementation(s).
    """

    _index = None
    """Sequence number represented by the least significant bit of _bitfield"""
    _bitfield = None
    """Integer interpreted as a bitfield, self._size wide. A digit 1 at any bit
    indicates that the bit's index (its power of 2) plus self._index was
    already seen."""

    def __init__(self, size, strike_out_callback):
        self.strike_out_callback = strike_out_callback
        self._size = size

    def is_initialized(self):
        """Tell whether any of the initialize methods has set up the
        window"""
        return self._index is not None

    def initialize_empty(self):
        """Start out with a window in which no number was seen yet"""
        self._index, self._bitfield = 0, 0

    def initialize_from_persisted(self, persisted):
        """Restore the state from a dict as produced by persist()"""
        self._index = persisted["index"]
        self._bitfield = persisted["bitfield"]

    def initialize_from_freshlyseen(self, seen):
        """Initialize the replay window with a particular value that is just
        being observed in a fresh (ie. generated by the peer later than any
        messages processed before state was lost here) message. This marks
        the seen sequence number and all preceding it as invalid, and all
        later ones as valid."""
        self._index, self._bitfield = seen, 1

    def is_valid(self, number):
        """Tell whether a sequence number may still be accepted: numbers
        below the window never are, numbers above it always are, and numbers
        inside it are unless they were struck out."""
        offset = number - self._index
        if offset < 0:
            return False
        if offset >= self._size:
            return True
        return not (self._bitfield >> offset) & 1

    def strike_out(self, number):
        """Mark a sequence number as seen, moving the window ahead if the
        number is beyond its end, and call the strike_out_callback.

        Raises ValueError if the number is not valid."""
        if not self.is_valid(number):
            raise ValueError(
                "Sequence number is not valid any more and "
                "thus can't be removed from the window"
            )

        newest_slot = self._index + self._size - 1
        advance = number - newest_slot
        if advance > 0:
            # Move the window just far enough for the number to become its
            # last entry.
            self._index += advance
            self._bitfield >>= advance
        assert self.is_valid(number), "Sequence number was not valid before strike-out"
        self._bitfield |= 1 << (number - self._index)

        self.strike_out_callback()

    def persist(self):
        """Return a dict containing the internal state, from which the
        replay window can be recreated by passing it to
        initialize_from_persisted."""

        state = {"index": self._index, "bitfield": self._bitfield}
        return state

1625 

1626 

1627class FilesystemSecurityContext( 

1628 CanProtect, CanUnprotect, SecurityContextUtils, credentials._Objectish 

1629): 

1630 """Security context stored in a directory as distinct files

1631 containing

1632 

1633 * Master secret, master salt, sender and recipient ID, 

1634 optionally algorithm, the KDF hash function, and replay window size 

1635 (settings.json and secret.json, where the latter is typically readable

1636 only for the user) 

1637 * sequence numbers and replay windows (sequence.json, the only file the 

1638 process needs write access to) 

1639 

1640 The static parameters can all either be placed in settings.json or 

1641 secret.json, but must not be present in both; the presence of either file

1642 is sufficient. 

1643 

1644 .. warning:: 

1645 

1646 Security contexts must never be copied around and used after another 

1647 copy was used. They should only ever be moved, and if they are copied 

1648 (eg. as a part of a system backup), restored contexts must not be used 

1649 again; they need to be replaced with freshly created ones. 

1650 

1651 An additional file named `lock` is created to prevent the accidental use of 

1652 a context by two concurrent programs.

1653 

1654 Note that the sequence number file is updated in an atomic fashion which 

1655 requires file creation privileges in the directory. If privilege separation 

1656 between settings/key changes and sequence number changes is desired, one 

1657 way to achieve that on Linux is giving the aiocoap process's user group 

1658 write permissions on the directory and setting the sticky bit on the 

1659 directory, thus forbidding the user to remove the settings/secret files not 

1660 owned by him. 

1661 

1662 Writes due to sent sequence numbers are reduced by applying a variation on 

1663 the mechanism of RFC8613 Appendix B.1.1 (incrementing the persisted sender 

1664 sequence number in steps of `k`). That value is automatically grown from

1665 sequence_number_chunksize_start up to sequence_number_chunksize_limit. 

1666 At runtime, the receive window is not stored but kept indeterminate. In 

1667 case of an abnormal shutdown, the server uses the mechanism described in 

1668 Appendix B.1.2 to recover. 

1669 """ 

1670 

1671 # possibly overridden in constructor 

1672 alg_aead = algorithms[DEFAULT_ALGORITHM] 

1673 

class LoadError(ValueError):
    """Raised, along with a message describing the problem, when a security
    context that is being loaded turns out to be faulty"""

1677 

def __init__(
    self,
    basedir: str,
    sequence_number_chunksize_start=10,
    sequence_number_chunksize_limit=10000,
):
    """Lock and load the security context stored in the directory basedir

    The chunk size arguments set the initial value and the limit of the
    sequence number chunk size.

    Any exception from acquiring the lock is passed on; a LoadError is
    raised if a configuration key is missing."""
    self.basedir = basedir

    lock_path = os.path.join(basedir, "lock")
    self.lockfile: Optional[filelock.FileLock] = filelock.FileLock(lock_path)
    # The tiny timeout of 0.001 makes this just fail if the lock can't be
    # acquired.
    # See https://github.com/benediktschmitt/py-filelock/issues/57
    try:
        self.lockfile.acquire(timeout=0.001)
    except BaseException:
        # No lock, no loading, no need to fail in __del__
        self.lockfile = None
        raise

    # Always enabled as committing to a file for every received request
    # would be a terrible burden.
    self.echo_recovery = secrets.token_bytes(8)

    try:
        self._load()
    except KeyError as k:
        missing_key = k.args[0]
        raise self.LoadError("Configuration key missing: %s" % (missing_key,))

    self.sequence_number_chunksize_start = sequence_number_chunksize_start
    self.sequence_number_chunksize_limit = sequence_number_chunksize_limit
    # The chunk size starts out at its start value.
    self.sequence_number_chunksize = sequence_number_chunksize_start

    self.sequence_number_persisted = self.sender_sequence_number

1713 

def _load(self):
    """Read the context's static configuration and its sequence number
    state from the files in basedir, and derive the keys.

    Raises LoadError on inconsistent data. Missing configuration keys are
    not checked for on every occasion; the resulting KeyError is left for
    __init__ to catch."""

    data = {}
    for readfile in ("secret.json", "settings.json"):
        try:
            with open(os.path.join(self.basedir, readfile)) as f:
                filedata = json.load(f)
        except FileNotFoundError:
            continue

        for raw_key, raw_value in filedata.items():
            # Binary values are stored under suffixed keys, and made
            # available as bytes under the unsuffixed key.
            if raw_key.endswith("_hex"):
                key = raw_key[:-4]
                value = binascii.unhexlify(raw_value)
            elif raw_key.endswith("_ascii"):
                key = raw_key[:-6]
                value = raw_value.encode("ascii")
            else:
                key = raw_key
                value = raw_value

            if key in data:
                raise self.LoadError(
                    "Datum %r present in multiple input files at %r."
                    % (key, self.basedir)
                )

            data[key] = value

    self.alg_aead = algorithms[data.get("algorithm", DEFAULT_ALGORITHM)]
    self.hashfun = hashfunctions[data.get("kdf-hashfun", DEFAULT_HASHFUNCTION)]

    windowsize = data.get("window", DEFAULT_WINDOWSIZE)
    if not isinstance(windowsize, int):
        raise self.LoadError("Non-integer replay window")

    self.sender_id = data["sender-id"]
    self.recipient_id = data["recipient-id"]

    id_limit = self.alg_aead.iv_bytes - 6
    if len(self.sender_id) > id_limit or len(self.recipient_id) > id_limit:
        raise self.LoadError(
            "Sender or Recipient ID too long (maximum length %s for this algorithm)"
            % (self.alg_aead.iv_bytes - 6)
        )

    master_secret = data["secret"]
    master_salt = data.get("salt", b"")
    self.id_context = data.get("id-context", None)

    self.derive_keys(master_salt, master_secret)

    self.recipient_replay_window = ReplayWindow(
        windowsize, self._replay_window_changed
    )

    sequence_path = os.path.join(self.basedir, "sequence.json")
    try:
        with open(sequence_path) as f:
            sequence = json.load(f)
    except FileNotFoundError:
        # No sequence file: start counting at zero, with an empty window.
        self.sender_sequence_number = 0
        self.recipient_replay_window.initialize_empty()
        self.replay_window_persisted = True
        return

    self.sender_sequence_number = int(sequence["next-to-send"])
    received = sequence["received"]
    if received == "unknown":
        # The replay window will stay uninitialized, which triggers
        # Echo recovery
        self.replay_window_persisted = False
        return

    try:
        self.recipient_replay_window.initialize_from_persisted(received)
    except (ValueError, TypeError, KeyError):
        # Not being particularly careful about what could go wrong: If
        # someone tampers with the replay data, we're already in *big*
        # trouble, of which I fail to see how it would become worse
        # than a crash inside the application around "failure to
        # right-shift a string" or that like; at worst it'd result in
        # nonce reuse which tampering with the replay window file
        # already does.
        raise self.LoadError("Persisted replay window state was not understood")
    self.replay_window_persisted = True

1799 

1800 # This is called internally whenever a new sequence number is taken or 

1801 # crossed out from the window, and blocks a lot; B.1 mode mitigates that. 

1802 # 

1803 # Making it async and block in a threadpool would mitigate the blocking of 

1804 # other messages, but the more visible effect of this will be that no 

1805 # matter if sync or async, a reply will need to wait for a file sync 

1806 # operation to conclude. 

1807 def _store(self): 

1808 tmphand, tmpnam = tempfile.mkstemp( 

1809 dir=self.basedir, prefix=".sequence-", suffix=".json", text=True 

1810 ) 

1811 

1812 data = {"next-to-send": self.sequence_number_persisted} 

1813 if not self.replay_window_persisted: 

1814 data["received"] = "unknown" 

1815 else: 

1816 data["received"] = self.recipient_replay_window.persist() 

1817 

1818 # Using io.open (instead os.fdopen) and binary / write with encode 

1819 # rather than dumps as that works even while the interpreter is 

1820 # shutting down. 

1821 # 

1822 # This can be relaxed when there is a defined shutdown sequence for 

1823 # security contexts that's triggered from the general context shutdown 

1824 # -- but right now, there isn't. 

1825 with io.open(tmphand, "wb") as tmpfile: 

1826 tmpfile.write(json.dumps(data).encode("utf8")) 

1827 tmpfile.flush() 

1828 os.fsync(tmpfile.fileno()) 

1829 

1830 os.replace(tmpnam, os.path.join(self.basedir, "sequence.json")) 

1831 

1832 def _replay_window_changed(self): 

1833 if self.replay_window_persisted: 

1834 # Just remove the sequence numbers once from the file 

1835 self.replay_window_persisted = False 

1836 self._store() 

1837 

def post_seqnoincrease(self):
    """Persist a new block of sequence numbers once the sender sequence
    number has moved past the persisted value.

    Each time this happens, the persisted number is advanced by the current
    chunk size, and the chunk size is doubled up to
    ``sequence_number_chunksize_limit``.
    """
    if self.sender_sequence_number <= self.sequence_number_persisted:
        return

    self.sequence_number_persisted += self.sequence_number_chunksize

    doubled = self.sequence_number_chunksize * 2
    self.sequence_number_chunksize = min(
        doubled, self.sequence_number_chunksize_limit
    )
    # FIXME: this blocks -- see https://github.com/chrysn/aiocoap/issues/178
    self._store()

    # The = case would only happen if someone deliberately sets all
    # numbers to 1 to force persisting on every step
    assert (
        self.sender_sequence_number <= self.sequence_number_persisted
    ), "Using a sequence number that has been persisted already"

1853 

1854 def _destroy(self): 

1855 """Release the lock file, and ensure tha he object has become 

1856 unusable. 

1857 

1858 If there is unpersisted state from B.1 operation, the actually used 

1859 number and replay window gets written back to the file to allow 

1860 resumption without wasting digits or round-trips. 

1861 """ 

1862 # FIXME: Arrange for a more controlled shutdown through the credentials 

1863 

1864 self.replay_window_persisted = True 

1865 self.sequence_number_persisted = self.sender_sequence_number 

1866 self._store() 

1867 

1868 del self.sender_key 

1869 del self.recipient_key 

1870 

1871 os.unlink(self.lockfile.lock_file) 

1872 self.lockfile.release() 

1873 

1874 self.lockfile = None 

1875 

1876 def __del__(self): 

1877 if self.lockfile is not None: 

1878 self._destroy() 

1879 

@classmethod
def from_item(cls, init_data):
    """Overriding _Objectish's from_item because the parameter name for
    basedir is contextfile for historical reasons.

    ``init_data`` is handed to ``credentials._call_from_structureddata``
    together with a constructor that accepts either ``basedir`` or the
    deprecated ``contextfile``. Giving both or neither raises a
    ``credentials.CredentialsLoadError``; using ``contextfile`` emits a
    ``DeprecationWarning``.
    """

    def constructor(
        basedir: Optional[str] = None, contextfile: Optional[str] = None
    ):
        if basedir is not None and contextfile is not None:
            # contextfile is the deprecated spelling, so basedir is the one
            # to recommend
            raise credentials.CredentialsLoadError(
                "Conflicting arguments basedir and contextfile; just use basedir instead"
            )
        if basedir is None and contextfile is None:
            raise credentials.CredentialsLoadError("Missing item 'basedir'")
        if contextfile is not None:
            warnings.warn(
                "Property contextfile was renamed to basedir in OSCORE credentials entries",
                DeprecationWarning,
                stacklevel=2,
            )
            basedir = contextfile
        assert (
            basedir is not None
        )  # This helps mypy which would otherwise not see that the above ensures this already
        return cls(basedir)

    return credentials._call_from_structureddata(
        constructor, cls.__name__, init_data
    )

1909 

def find_all_used_contextless_oscore_kid(self) -> set[bytes]:
    """Return a set containing just this context's recipient ID."""
    return {self.recipient_id}

1912 

1913 

class GroupContext(ContextWhereExternalAadIsGroup, BaseSecurityContext):
    """Base class of the group security contexts: they sign (``is_signing``)
    and send their KID in responses (``responses_send_kid``) unless a
    subclass overrides that, and need to provide the keys for signing and
    verification."""

    is_signing = True
    responses_send_kid = True

    # abc.abstractproperty is deprecated since Python 3.3; stacking property
    # on abstractmethod is the documented replacement
    @property
    @abc.abstractmethod
    def private_key(self):
        """Private key used to sign outgoing messages.

        Contexts not designed to send messages may raise a RuntimeError here;
        that necessity may later go away if some more accurate class modelling
        is found."""

    @property
    @abc.abstractmethod
    def recipient_public_key(self):
        """Public key used to verify incoming messages.

        Contexts not designed to receive messages (because they'd have aspects
        for that) may raise a RuntimeError here; that necessity may later go
        away if some more accurate class modelling is found."""

1933 

1934 

class SimpleGroupContext(GroupContext, CanProtect, CanUnprotect, SecurityContextUtils):
    """A context for an OSCORE group

    This is a non-persistable version of a group context that does not support
    any group manager or rekeying; it is set up statically at startup.

    It is intended for experimentation and demos, but aims to be correct enough
    to be usable securely.
    """

    # set during initialization (making all those attributes rather than
    # possibly properties as they might be in super)
    sender_id = None
    id_context = None  # type: ignore
    private_key = None
    alg_aead = None
    hashfun = None  # type: ignore
    alg_signature = None
    alg_group_enc = None
    alg_pairwise_key_agreement = None
    sender_auth_cred = None  # type: ignore
    group_manager_cred = None  # type: ignore
    cred_fmt = None
    # This is currently not evaluated, but any GM interaction will need to have this information available.
    group_manager_cred_fmt = None

    def __init__(
        self,
        alg_aead,
        hashfun,
        alg_signature,
        alg_group_enc,
        alg_pairwise_key_agreement,
        group_id,
        master_secret,
        master_salt,
        sender_id,
        private_key,
        sender_auth_cred,
        peers,
        group_manager_cred,
        cred_fmt=COSE_KCCS,
        group_manager_cred_fmt=COSE_KCCS,
    ):
        """Set up the group from static data.

        ``peers`` is a mapping from recipient IDs to the peers'
        authentication credentials; every peer gets its public key parsed
        from its credential, a fresh replay window and a derived recipient
        key. ``group_id`` is used as the ID context.

        Raises a ValueError if the public key in ``sender_auth_cred`` does
        not belong to ``private_key`` (or if a credential can not be
        parsed).
        """
        self.sender_id = sender_id
        self.id_context = group_id
        self.private_key = private_key
        self.alg_aead = alg_aead
        self.hashfun = hashfun
        self.alg_signature = alg_signature
        self.alg_group_enc = alg_group_enc
        self.alg_pairwise_key_agreement = alg_pairwise_key_agreement
        self.sender_auth_cred = sender_auth_cred
        self.group_manager_cred = group_manager_cred
        self.cred_fmt = cred_fmt
        self.group_manager_cred_fmt = group_manager_cred_fmt

        self.peers = peers.keys()
        self.recipient_public_keys = {
            k: self._parse_credential(v) for (k, v) in peers.items()
        }
        self.recipient_auth_creds = peers
        self.recipient_replay_windows = {}
        for k in self.peers:
            # no need to persist, the whole group is ephemeral
            w = ReplayWindow(32, lambda: None)
            w.initialize_empty()
            self.recipient_replay_windows[k] = w

        self.derive_keys(master_salt, master_secret)
        self.sender_sequence_number = 0

        sender_public_key = self._parse_credential(sender_auth_cred)
        if (
            self.alg_signature.public_from_private(self.private_key)
            != sender_public_key
        ):
            raise ValueError(
                "The key in the provided sender credential does not match the private key"
            )

    def _parse_credential(self, credential: bytes):
        """Extract the public key (in the public_key format the respective
        AlgorithmCountersign needs) from credentials. This raises a ValueError
        if the credentials do not match the group's cred_fmt, or if the
        parameters do not match those configured in the group.

        The DETERMINISTIC_KEY marker is not a credential and is returned
        unchanged.

        This currently discards any information that is present in the
        credential that exceeds the key. (In a future version, this could
        return both the key and extracted other data, where that other data
        would be stored with the peer this is parsed from).
        """

        # get_oscore_context_for recognizes deterministic peers by their
        # public key entry being the DETERMINISTIC_KEY marker itself; that
        # can only be the case if the marker is handed through unparsed.
        if credential is DETERMINISTIC_KEY:
            return credential

        if self.cred_fmt != COSE_KCCS:
            raise ValueError(
                "Credential parsing is currently only implemented for CCSs"
            )

        assert self.alg_signature is not None

        return self.alg_signature.from_kccs(credential)

    def __repr__(self):
        return "<%s with group %r sender_id %r and %d peers>" % (
            type(self).__name__,
            self.id_context.hex(),
            self.sender_id.hex(),
            len(self.peers),
        )

    @property
    def recipient_public_key(self):
        """Not available on the group as a whole; always raises a
        RuntimeError. The per-peer aspects provide the actual keys."""
        raise RuntimeError(
            "Group context without key indication was used for verification"
        )

    def derive_keys(self, master_salt, master_secret):
        """Derive the sender key, one recipient key per peer, the common IV
        and the signature encryption key from the master salt and secret."""
        # FIXME unify with parent?

        self.sender_key = self._kdf(master_salt, master_secret, self.sender_id, "Key")
        self.recipient_keys = {
            recipient_id: self._kdf(master_salt, master_secret, recipient_id, "Key")
            for recipient_id in self.peers
        }

        self.common_iv = self._kdf(master_salt, master_secret, b"", "IV")

        # but this one is new

        self.signature_encryption_key = self._kdf(
            master_salt, master_secret, b"", "SEKey"
        )

    def post_seqnoincrease(self):
        """No-op because it's ephemeral"""

    def context_from_response(self, unprotected_bag) -> CanUnprotect:
        """Pick the aspect for unprotecting a response: the group mode
        aspect if the response carries a countersignature, the pairwise
        aspect otherwise. Raises a DecodeError if the response has no KID."""
        # sender ID *needs to be* here -- if this were a pairwise request, it
        # would not run through here
        try:
            sender_kid = unprotected_bag[COSE_KID]
        except KeyError:
            raise DecodeError("Group server failed to send own sender KID")

        if COSE_COUNTERSIGNATURE0 in unprotected_bag:
            return _GroupContextAspect(self, sender_kid)
        else:
            return _PairwiseContextAspect(self, sender_kid)

    def get_oscore_context_for(self, unprotected):
        """Return the aspect suitable for unprotecting a request with the
        given unprotected fields, or None if the KID context is not this
        group's or the KID is not one of the peers."""
        if unprotected.get(COSE_KID_CONTEXT, None) != self.id_context:
            return None

        kid = unprotected.get(COSE_KID, None)
        if kid in self.peers:
            if COSE_COUNTERSIGNATURE0 in unprotected:
                return _GroupContextAspect(self, kid)
            elif self.recipient_public_keys[kid] is DETERMINISTIC_KEY:
                return _DeterministicUnprotectProtoAspect(self, kid)
            else:
                return _PairwiseContextAspect(self, kid)

    def find_all_used_contextless_oscore_kid(self) -> set[bytes]:
        # not conflicting: groups always send KID Context
        return set()

    # yet to stabilize...

    def pairwise_for(self, recipient_id):
        """Return the pairwise aspect of this group with the given peer."""
        return _PairwiseContextAspect(self, recipient_id)

    def for_sending_deterministic_requests(
        self, deterministic_id, target_server: Optional[bytes]
    ):
        """Return an aspect that protects deterministic requests under the
        sender ID ``deterministic_id``, optionally limited to responses
        from ``target_server``."""
        return _DeterministicProtectProtoAspect(self, deterministic_id, target_server)

2110 

2111 

class _GroupContextAspect(GroupContext, CanUnprotect, SecurityContextUtils):
    """The view this host has of one particular peer in group mode

    Nothing but the peer's recipient ID is kept here: every other value is
    looked up in the underlying group context, indexed by that ID where the
    value is per-peer.

    This view only receives group mode messages from that peer. It never
    sends; when a response is due, context_for_response hands over to a
    pairwise context.
    """

    def __init__(self, groupcontext: GroupContext, recipient_id: bytes) -> None:
        self.recipient_id = recipient_id
        self.groupcontext = groupcontext

    def __repr__(self):
        peer_hex = self.recipient_id.hex()
        return "<%s inside %r with the peer %r>" % (
            type(self).__name__,
            self.groupcontext,
            peer_hex,
        )

    # This aspect does not send, so there is nothing to sign with
    private_key = None

    # The accessors below are spelled out as functions because mypy does not
    # recognize the equivalent property(lambda ...) form
    # (workaround for <https://github.com/python/mypy/issues/8083>)

    # -- values shared by the whole group --

    @property
    def id_context(self):
        """ID context of the group."""
        return self.groupcontext.id_context

    @property
    def alg_aead(self):
        """AEAD algorithm of the group."""
        return self.groupcontext.alg_aead

    @property
    def alg_signature(self):
        """Signature algorithm of the group."""
        return self.groupcontext.alg_signature

    @property
    def alg_group_enc(self):
        """Group encryption algorithm of the group."""
        return self.groupcontext.alg_group_enc

    @property
    def alg_pairwise_key_agreement(self):
        """Pairwise key agreement algorithm of the group."""
        return self.groupcontext.alg_pairwise_key_agreement

    @property
    def group_manager_cred(self):
        """Group manager credential of the group."""
        return self.groupcontext.group_manager_cred

    @property
    def common_iv(self):
        """Common IV of the group."""
        return self.groupcontext.common_iv

    @property
    def hashfun(self):
        """Hash function of the group."""
        return self.groupcontext.hashfun

    @property
    def signature_encryption_key(self):
        """Signature encryption key of the group."""
        return self.groupcontext.signature_encryption_key

    # -- values specific to this peer --

    @property
    def recipient_key(self):
        """The group's recipient key for this peer."""
        return self.groupcontext.recipient_keys[self.recipient_id]

    @property
    def recipient_public_key(self):
        """The group's stored public key for this peer."""
        return self.groupcontext.recipient_public_keys[self.recipient_id]

    @property
    def recipient_auth_cred(self):
        """The group's stored authentication credential for this peer."""
        return self.groupcontext.recipient_auth_creds[self.recipient_id]

    @property
    def recipient_replay_window(self):
        """The group's replay window for this peer."""
        return self.groupcontext.recipient_replay_windows[self.recipient_id]

    def context_for_response(self):
        """Return the group's pairwise context with this peer."""
        return self.groupcontext.pairwise_for(self.recipient_id)

    @property
    def sender_auth_cred(self):
        """Not provided by this aspect; always raises a RuntimeError."""
        raise RuntimeError(
            "Could relay the sender auth credential from the group context, but it shouldn't matter here"
        )

2198 

2199 

class _PairwiseContextAspect(
    GroupContext, CanProtect, CanUnprotect, SecurityContextUtils
):
    """The pairwise (non-signing) view of a group context with one peer

    The pairwise sender and recipient keys are derived once at construction
    from the group's keys, both parties' authentication credentials and a
    static-static shared secret; everything else is looked up in the
    underlying group context.
    """

    is_signing = False

    def __init__(self, groupcontext, recipient_id):
        self.groupcontext = groupcontext
        self.recipient_id = recipient_id

        group = self.groupcontext

        shared_secret = self.alg_pairwise_key_agreement.staticstatic(
            group.private_key,
            group.recipient_public_keys[recipient_id],
        )

        # The two derivations differ in the salt, the order in which the
        # credentials are concatenated, and the ID.
        sender_salt = group.sender_key
        own_cred = group.sender_auth_cred
        peer_cred = group.recipient_auth_creds[recipient_id]
        self.sender_key = self._kdf(
            sender_salt,
            own_cred + peer_cred + shared_secret,
            group.sender_id,
            "Key",
        )

        recipient_salt = group.recipient_keys[recipient_id]
        self.recipient_key = self._kdf(
            recipient_salt,
            peer_cred + own_cred + shared_secret,
            self.recipient_id,
            "Key",
        )

    def __repr__(self):
        peer_hex = self.recipient_id.hex()
        return "<%s based on %r with the peer %r>" % (
            type(self).__name__,
            self.groupcontext,
            peer_hex,
        )

    # FIXME: actually, only to be sent in requests

    # The accessors below are spelled out as functions because mypy does not
    # recognize the equivalent property(lambda ...) form
    # (workaround for <https://github.com/python/mypy/issues/8083>)

    @property
    def id_context(self):
        """ID context of the group."""
        return self.groupcontext.id_context

    @property
    def alg_aead(self):
        """AEAD algorithm of the group."""
        return self.groupcontext.alg_aead

    @property
    def hashfun(self):
        """Hash function of the group."""
        return self.groupcontext.hashfun

    @property
    def alg_signature(self):
        """Signature algorithm of the group."""
        return self.groupcontext.alg_signature

    @property
    def alg_group_enc(self):
        """Group encryption algorithm of the group."""
        return self.groupcontext.alg_group_enc

    @property
    def alg_pairwise_key_agreement(self):
        """Pairwise key agreement algorithm of the group."""
        return self.groupcontext.alg_pairwise_key_agreement

    @property
    def group_manager_cred(self):
        """Group manager credential of the group."""
        return self.groupcontext.group_manager_cred

    @property
    def common_iv(self):
        """Common IV of the group."""
        return self.groupcontext.common_iv

    @property
    def sender_id(self):
        """Sender ID of the group context."""
        return self.groupcontext.sender_id

    @property
    def recipient_auth_cred(self):
        """The group's stored authentication credential for this peer."""
        return self.groupcontext.recipient_auth_creds[self.recipient_id]

    @property
    def sender_auth_cred(self):
        """Sender authentication credential of the group context."""
        return self.groupcontext.sender_auth_cred

    @property
    def recipient_replay_window(self):
        """The group's replay window for this peer."""
        return self.groupcontext.recipient_replay_windows[self.recipient_id]

    # Class-level placeholders; the real values are assigned in __init__
    recipient_key = None
    sender_key = None

    # The sequence number lives in the group context, so that it is shared
    # with everything else that sends through that group context

    @property
    def sender_sequence_number(self):
        """Sender sequence number of the group context."""
        return self.groupcontext.sender_sequence_number

    @sender_sequence_number.setter
    def sender_sequence_number(self, new):
        self.groupcontext.sender_sequence_number = new

    def post_seqnoincrease(self):
        """Forward to the group context's post_seqnoincrease."""
        self.groupcontext.post_seqnoincrease()

    # same here -- not needed because not signing
    #
    # (post_seqnoincrease merely serves as a getter here: it has no return
    # statement, so both properties read as None.)
    private_key = property(post_seqnoincrease)
    recipient_public_key = property(post_seqnoincrease)

    def context_from_response(self, unprotected_bag) -> CanUnprotect:
        """Pick the context for unprotecting a response: this one, or the
        group mode aspect of the same peer if the response carries a
        countersignature. Raises a DecodeError if the response names a KID
        other than the peer's."""
        if unprotected_bag.get(COSE_KID, self.recipient_id) != self.recipient_id:
            raise DecodeError(
                "Response coming from a different server than requested, not attempting to decrypt"
            )

        if COSE_COUNTERSIGNATURE0 not in unprotected_bag:
            return self

        # It'd be an odd thing to do, but it's source verified, so the
        # server hopefully has reasons to make this readable to other group
        # members.
        return _GroupContextAspect(self.groupcontext, self.recipient_id)

2326 

2327 

class _DeterministicProtectProtoAspect(
    ContextWhereExternalAadIsGroup, CanProtect, SecurityContextUtils
):
    """This implements the sending side of Deterministic Requests.

    While similar to a _PairwiseContextAspect, it only derives the key at
    protection time, as the plain text is hashed into the key."""

    deterministic_hashfun = hashes.SHA256()

    def __init__(self, groupcontext, sender_id, target_server: Optional[bytes]):
        self.groupcontext = groupcontext
        self.sender_id = sender_id
        self.target_server = target_server

    def __repr__(self):
        return "<%s based on %r with the sender ID %r%s>" % (
            type(self).__name__,
            self.groupcontext,
            self.sender_id.hex(),
            # The leading space keeps this apart from the sender ID
            " limited to responses from %s" % self.target_server
            if self.target_server is not None
            else "",
        )

    def new_sequence_number(self):
        """Always 0: deterministic requests do not consume sequence
        numbers."""
        return 0

    def post_seqnoincrease(self):
        """No-op, as no sequence number is ever increased."""
        pass

    def context_from_response(self, unprotected_bag):
        """Return the group mode aspect for unprotecting a response.

        Raises a DecodeError if the response carries no KID while no
        target server was set, if its KID differs from the target server,
        or if it carries no countersignature.
        """
        if self.target_server is None:
            if COSE_KID not in unprotected_bag:
                raise DecodeError(
                    "Server did not send a KID and no particular one was addressed"
                )
        else:
            if unprotected_bag.get(COSE_KID, self.target_server) != self.target_server:
                raise DecodeError(
                    "Response coming from a different server than requested, not attempting to decrypt"
                )

        if COSE_COUNTERSIGNATURE0 not in unprotected_bag:
            # Could just as well pass and later barf when the group context doesn't find a signature
            raise DecodeError(
                "Response to deterministic request came from unsecure pairwise context"
            )

        return _GroupContextAspect(
            self.groupcontext, unprotected_bag.get(COSE_KID, self.target_server)
        )

    def _get_sender_key(self, outer_message, aad, plaintext, request_id):
        """Derive the key for one particular request.

        The hash over the group's key for this sender ID, the AAD and the
        plaintext is set as the outer message's Request-Hash option and on
        ``request_id``, the outer code is set to FETCH, and the returned
        key is derived from the base key and that hash.

        Raises a RuntimeError if ``outer_message`` is a response.
        """
        if outer_message.code.is_response():
            raise RuntimeError("Deterministic contexts shouldn't protect responses")

        basekey = self.groupcontext.recipient_keys[self.sender_id]

        h = hashes.Hash(self.deterministic_hashfun)
        h.update(basekey)
        h.update(aad)
        h.update(plaintext)
        request_hash = h.finalize()

        outer_message.opt.request_hash = request_hash
        outer_message.code = FETCH

        # By this time, the AADs have all been calculated already; setting this
        # for the benefit of the response parsing later
        request_id.request_hash = request_hash
        # FIXME I don't think this ever comes to bear but want to be sure
        # before removing this line (this should only be client-side)
        request_id.can_reuse_nonce = False
        # FIXME: we're still sending a h'00' PIV. Not wrong, just a wasted byte.

        return self._kdf(basekey, request_hash, self.sender_id, "Key")

    # details needed for various operations, especially eAAD generation

    # not inline because the equivalent lambda would not be recognized by mypy
    # (workaround for <https://github.com/python/mypy/issues/8083>)
    @property
    def alg_aead(self):
        return self.groupcontext.alg_aead

    @property
    def hashfun(self):
        return self.groupcontext.hashfun

    @property
    def common_iv(self):
        return self.groupcontext.common_iv

    @property
    def id_context(self):
        return self.groupcontext.id_context

    @property
    def alg_signature(self):
        return self.groupcontext.alg_signature

2429 

2430 

class _DeterministicUnprotectProtoAspect(
    ContextWhereExternalAadIsGroup, CanUnprotect, SecurityContextUtils
):
    """This implements the receiving side of Deterministic Requests.

    While similar to a _PairwiseContextAspect, it only derives the key at
    unprotection time, based on information given as Request-Hash."""

    # Unless None, this is the value by which the running process recognizes
    # that the second phase of a B.1.2 replay window recovery Echo option comes
    # from the current process, and thus its sequence number is fresh
    echo_recovery = None

    deterministic_hashfun = hashes.SHA256()

    class ZeroIsAlwaysValid:
        """Special-purpose replay window that accepts 0 indefinitely"""

        def is_initialized(self):
            return True

        def is_valid(self, number):
            # No particular reason to be lax here
            return number == 0

        def strike_out(self, number):
            # FIXME: I'd rather indicate here that it's a potential replay, have the
            # request_id.can_reuse_nonce = False
            # set here rather than in _post_decrypt_checks, and thus also get
            # the check for whether it's a safe method
            pass

        def persist(self):
            pass

    def __init__(self, groupcontext, recipient_id):
        self.groupcontext = groupcontext
        self.recipient_id = recipient_id

        # Sequence number 0 is what is accepted, any number of times
        self.recipient_replay_window = self.ZeroIsAlwaysValid()

    def __repr__(self):
        peer_hex = self.recipient_id.hex()
        return "<%s based on %r with the recipient ID %r>" % (
            type(self).__name__,
            self.groupcontext,
            peer_hex,
        )

    def context_for_response(self):
        """Responses are protected with the group context itself."""
        return self.groupcontext

    def _get_recipient_key(self, protected_message):
        """Derive the key for this request from the group's recipient key
        for the recipient ID and the message's Request-Hash option."""
        base_key = self.groupcontext.recipient_keys[self.recipient_id]
        return self._kdf(
            base_key,
            protected_message.opt.request_hash,
            self.recipient_id,
            "Key",
        )

    def _post_decrypt_checks(self, aad, plaintext, protected_message, request_id):
        """Verify the decrypted request: its code needs to be GET or FETCH,
        and the hash over base key, AAD and plaintext needs to equal the
        Request-Hash option. Raises ProtectionInvalid otherwise; on success,
        the hash is recorded on ``request_id`` and nonce reuse is disabled.
        """
        if plaintext[0] not in (GET, FETCH):  # FIXME: "is safe"
            # FIXME: accept but return inner Unauthorized. (Raising Unauthorized
            # here would just create an unprotected Unauthorized, which is not
            # what's spec'd for here)
            raise ProtectionInvalid("Request was not safe")

        base_key = self.groupcontext.recipient_keys[self.recipient_id]

        digest = hashes.Hash(self.deterministic_hashfun)
        for part in (base_key, aad, plaintext):
            digest.update(part)
        computed_hash = digest.finalize()

        if computed_hash != protected_message.opt.request_hash:
            raise ProtectionInvalid(
                "Client's hash of the plaintext diverges from the actual request hash"
            )

        # This is intended for the protection of the response, and the
        # later use in signature in the unprotect function is not happening
        # here anyway, neither is the later use for Echo requests
        request_id.request_hash = computed_hash
        request_id.can_reuse_nonce = False

    # details needed for various operations, especially eAAD generation

    # The accessors below are spelled out as functions because mypy does not
    # recognize the equivalent property(lambda ...) form
    # (workaround for <https://github.com/python/mypy/issues/8083>)
    @property
    def alg_aead(self):
        """AEAD algorithm of the group."""
        return self.groupcontext.alg_aead

    @property
    def hashfun(self):
        """Hash function of the group."""
        return self.groupcontext.hashfun

    @property
    def common_iv(self):
        """Common IV of the group."""
        return self.groupcontext.common_iv

    @property
    def id_context(self):
        """ID context of the group."""
        return self.groupcontext.id_context

    @property
    def alg_signature(self):
        """Signature algorithm of the group."""
        return self.groupcontext.alg_signature

2539 

2540 

def verify_start(message):
    """Return the unprotected COSE options of a message, so that the
    verifier can pick the security context with which to actually verify
    the message. (Future versions may also report fields from both
    unprotected and protected, if the protected bag is ever used with
    OSCORE.)

    Call this only on requests; for responses, you'll have to know the
    security context anyway, and there is usually no information to be
    gained."""

    # Only the third of the four extracted items is of interest here
    _first, _second, unprotected, _last = CanUnprotect._extract_encrypted0(message)
    return unprotected

2553 

2554 

# Module-level __getattr__ hook (PEP 562): Python only consults a module
# attribute of exactly this name when a lookup on the module fails, so the
# double leading underscore is essential for the old names below to resolve
# to their replacements.
__getattr__ = deprecation_getattr(
    {
        "COSE_COUNTERSINGATURE0": "COSE_COUNTERSIGNATURE0",
        "Algorithm": "AeadAlgorithm",
    },
    globals(),
)