Coverage for src/aiocoap/ 0%
1189 statements
« prev ^ index » next v7.6.12, created at 2025-02-12 11:18 +0000
« prev ^ index » next v7.6.12, created at 2025-02-12 11:18 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
3# SPDX-License-Identifier: MIT
5"""This module contains the tools to send OSCORE secured messages.
7It only deals with the algorithmic parts, the security context and protection
8and unprotection of messages. It does not touch on the integration of OSCORE in
9the larger aiocoap stack of having a context or requests; that's what
10:mod:`aiocoap.transports.oscore` is for."""
12from __future__ import annotations
14from collections import namedtuple
15import io
16import json
17import binascii
18import os
19import os.path
20import tempfile
21import abc
22from typing import Optional, List, Any, Tuple
23import secrets
24import warnings
25import logging
27from aiocoap.message import Message
28from aiocoap.util import cryptography_additions, deprecation_getattr, Sentinel
29from aiocoap.numbers import GET, POST, FETCH, CHANGED, UNAUTHORIZED, CONTENT
30from aiocoap import error
31from . import credentials
32from aiocoap.defaults import log_secret
34from cryptography.hazmat.primitives.ciphers import aead
35from cryptography.hazmat.primitives.kdf.hkdf import HKDF
36from cryptography.hazmat.primitives import ciphers, hashes
37import cryptography.hazmat.backends
38import cryptography.exceptions
39from cryptography.hazmat.primitives import asymmetric, serialization
40from cryptography.hazmat.primitives.asymmetric.utils import (
41 decode_dss_signature,
42 encode_dss_signature,
45import cbor2 as cbor
47import filelock
49# Logger through which log events from cryptographic operations (both inside
50# the primitives and around key derivation) are traced.
51_alglog = logging.getLogger("aiocoap.cryptography")
53MAX_SEQNO = 2**40 - 1
55# Relevant values from the IANA registry "CBOR Object Signing and Encryption (COSE)"
56COSE_KID = 4
57COSE_PIV = 6
59# from RFC9338
61# from RFC9528
62COSE_KCCS = 14
67COMPRESSION_BIT_GROUP = 0b100000 # Group Flag from draft-ietf-core-oscore-groupcomm-21
74COSE_KTY_EC2 = 2
78COSE_KEY_EC2_X = -2
79COSE_KEY_EC2_Y = -3
81# While the original values were simple enough to be used in literals, starting
82# with oscore-groupcomm we're using more compact values
87PRESENT_BUT_NO_VALUE_YET = Sentinel("Value will be populated later")
90class CodeStyle(namedtuple("_CodeStyle", ("request", "response"))):
92 POST_CHANGED: CodeStyle
94 @classmethod
95 def from_request(cls, request) -> CodeStyle:
96 if request == FETCH:
97 return cls.FETCH_CONTENT
98 elif request == POST:
99 return cls.POST_CHANGED
100 else:
101 raise ValueError("Invalid request code %r" % request)
105CodeStyle.POST_CHANGED = CodeStyle(POST, CHANGED)
class DeterministicKey:
    """Singleton to indicate that for this key member no public or private key
    is available because it is the Deterministic Client (see
    draft-amsuess-core-cachable-oscore).

    This is highly experimental not only from an implementation but also from a
    specification point of view. The specification has not received adequate
    review that would justify using it in any non-experimental scenario.
    """
119DETERMINISTIC_KEY = DeterministicKey()
120del DeterministicKey
class NotAProtectedMessage(error.Error, ValueError):
    """Raised when verification is attempted on a non-OSCORE message

    The object passed in as ``plain_message`` is kept on the exception in the
    attribute of the same name, so that whoever catches the exception can
    still access it."""

    def __init__(self, message, plain_message):
        # Only the textual message goes to the base exception classes.
        super().__init__(message)
        self.plain_message = plain_message
131class ProtectionInvalid(error.Error, ValueError):
132 """Raised when verification of an OSCORE message fails"""
135class DecodeError(ProtectionInvalid):
136 """Raised when verification of an OSCORE message fails because CBOR or compressed data were erroneous"""
class ReplayError(ProtectionInvalid):
    """Raised when verification of an OSCORE message fails because the sequence number was already used"""
143class ReplayErrorWithEcho(ProtectionInvalid, error.RenderableError):
144 """Raised when verification of an OSCORE message fails because the
145 recipient replay window is uninitialized, but a 4.01 Echo can be
146 constructed with the data in the exception that can lead to the client
147 assisting in replay window recovery"""
149 def __init__(self, secctx, request_id, echo):
150 self.secctx = secctx
151 self.request_id = request_id
152 self.echo = echo
154 def to_message(self):
155 inner = Message(
157 echo=self.echo,
158 )
159 outer, _ = self.secctx.protect(inner, request_id=self.request_id)
160 return outer
163class ContextUnavailable(error.Error, ValueError):
164 """Raised when a context is (currently or permanently) unavailable for
165 protecting or unprotecting a message"""
class RequestIdentifiers:
    """Details handed from the (un)protection of a request to the
    (un)protection of its response.

    Carrying the request's partial IV (and the ID of whoever generated it)
    along is what makes the request-response binding work.

    Users of this module should never create or interact with instances, but
    just pass them around.
    """

    def __init__(self, kid, partial_iv, can_reuse_nonce, request_code):
        # kid is the sender ID of whoever generated the partial IV
        self.kid = kid
        self.partial_iv = partial_iv
        self.can_reuse_nonce = can_reuse_nonce
        # Raises ValueError for request codes CodeStyle does not know
        self.code_style = CodeStyle.from_request(request_code)

        # Starts out unset; _extract_external_aad only adds the Request-Hash
        # option when this is not None.
        self.request_hash = None

    def get_reusable_kid_and_piv(self):
        """Hand out ``(kid, partial_iv)`` once if the nonce may be reused.

        A truthy can_reuse_nonce is flipped to False when the pair is handed
        out; in every other case ``(None, None)`` is returned and nothing is
        changed."""
        if not self.can_reuse_nonce:
            return (None, None)

        self.can_reuse_nonce = False
        return (self.kid, self.partial_iv)
198def _xor_bytes(a, b):
199 assert len(a) == len(b), "XOR needs consistent lengths"
200 # FIXME is this an efficient thing to do, or should we store everything
201 # that possibly needs xor'ing as long integers with an associated length?
202 return bytes(_a ^ _b for (_a, _b) in zip(a, b))
class SymmetricEncryptionAlgorithm(metaclass=abc.ABCMeta):
    """A symmetric algorithm

    The algorithm's API is the AEAD API with additional authenticated data:
    The algorithm may or may not verify that data. Algorithms that actually
    do verify the data are recognized by also being AeadAlgorithm.
    """

    value: int
    key_bytes: int
    tag_bytes: int
    iv_bytes: int

    @abc.abstractmethod
    def encrypt(cls, plaintext, aad, key, iv):
        """Return ciphertext + tag for given input data"""

    @abc.abstractmethod
    def decrypt(cls, ciphertext_and_tag, aad, key, iv):
        """Reverse encryption. Must raise ProtectionInvalid on any error
        stemming from untrusted data."""

    @staticmethod
    def _build_encrypt0_structure(protected, external_aad):
        """Return the CBOR serialization of the Encrypt0 structure built from
        an (empty) protected bucket and the external AAD."""
        assert protected == {}, "Unexpected data in protected bucket"
        # The protected bucket is represented by an empty byte string here
        # (were it put into an empty dict, it'd be the cbor dump).
        return cbor.dumps(["Encrypt0", b"", external_aad])
236class AeadAlgorithm(SymmetricEncryptionAlgorithm, metaclass=abc.ABCMeta):
237 """A symmetric algorithm that provides authentication, including
238 authentication of additional data."""
class AES_CBC(SymmetricEncryptionAlgorithm, metaclass=abc.ABCMeta):
    """AES in CBC mode using the Python cryptography library"""

    tag_bytes = 0
    # This introduces padding -- this library doesn't need to care because
    # Python does allocation for us, but others may need to rethink their
    # buffer allocation strategies.

    @classmethod
    def _cipher(cls, key, iv):
        """Build the AES-CBC cipher object for the given key and IV."""
        algorithm = ciphers.algorithms.AES(key)
        mode = ciphers.modes.CBC(iv)
        return ciphers.base.Cipher(algorithm, mode)

    @classmethod
    def encrypt(cls, plaintext, _aad, key, iv):
        """Pad and encrypt plaintext; the AAD argument is not used."""
        # FIXME: Ignoring the AAD is a violation, but is required for Group
        # OSCORE

        # Between 1 and k bytes of padding are appended, each of them
        # carrying the number of padding bytes as its value.
        k = cls.key_bytes
        assert k < 256, (
            "Algorithm with this key size should not have been created in the first plae"
        )
        pad_length = k - (len(plaintext) % k)
        plaintext += bytes((pad_length,)) * pad_length

        encryptor = cls._cipher(key, iv).encryptor()
        return encryptor.update(plaintext) + encryptor.finalize()

    @classmethod
    def decrypt(cls, ciphertext_and_tag, _aad, key, iv):
        """Decrypt and strip the padding; the AAD argument is not used.

        Raises ProtectionInvalid if the input length or the padding found
        after decryption is not what encrypt would have produced."""
        # FIXME: Ignoring the AAD is a violation, but is required for Group
        # OSCORE

        k = cls.key_bytes
        if not ciphertext_and_tag or len(ciphertext_and_tag) % k:
            raise ProtectionInvalid("Message length does not match padding")

        decryptor = cls._cipher(key, iv).decryptor()
        padded = decryptor.update(ciphertext_and_tag) + decryptor.finalize()

        # The last byte announces how many padding bytes there are, and all
        # of them need to have that very value.
        pad_length = padded[-1]
        if not 0 < pad_length <= k:
            raise ProtectionInvalid("Padding does not match key")
        if not padded.endswith(bytes((pad_length,)) * pad_length):
            raise ProtectionInvalid("Padding is inconsistent")

        return padded[:-pad_length]
class A128CBC(AES_CBC):
    """AES-CBC with a 128-bit key and a 16-octet IV."""

    # from RFC9459
    value = -65531
    key_bytes = 16  # 128-bit key
    iv_bytes = 16  # 16-octet nonce
class AES_CCM(AeadAlgorithm, metaclass=abc.ABCMeta):
    """AES-CCM implemented using the Python cryptography library

    Concrete subclasses supply the key, tag and IV sizes."""

    @classmethod
    def encrypt(cls, plaintext, aad, key, iv):
        """Return ciphertext + tag, using the class's tag length."""
        cipher = aead.AESCCM(key, cls.tag_bytes)
        return cipher.encrypt(iv, plaintext, aad)

    @classmethod
    def decrypt(cls, ciphertext_and_tag, aad, key, iv):
        """Return the plaintext, or raise ProtectionInvalid if the tag does
        not verify."""
        cipher = aead.AESCCM(key, cls.tag_bytes)
        try:
            plaintext = cipher.decrypt(iv, ciphertext_and_tag, aad)
        except cryptography.exceptions.InvalidTag:
            raise ProtectionInvalid("Tag invalid")
        return plaintext
class AES_CCM_16_64_128(AES_CCM):
    """AES-CCM with a 128-bit key, a 64-bit tag and a 13-byte nonce."""

    # from RFC8152 and draft-ietf-core-object-security-0[012] 3.2.1
    value = 10
    key_bytes = 16  # 128-bit key
    tag_bytes = 8  # 64-bit tag
    iv_bytes = 13  # 13-byte nonce
class AES_CCM_16_64_256(AES_CCM):
    """AES-CCM with a 256-bit key, a 64-bit tag and a 13-byte nonce."""

    # from RFC8152
    value = 11
    key_bytes = 32  # 256-bit key
    tag_bytes = 8  # 64-bit tag
    iv_bytes = 13  # 13-byte nonce
class AES_CCM_64_64_128(AES_CCM):
    """AES-CCM with a 128-bit key, a 64-bit tag and a 7-byte nonce."""

    # from RFC8152
    value = 12
    key_bytes = 16  # 128-bit key
    tag_bytes = 8  # 64-bit tag
    iv_bytes = 7  # 7-byte nonce
class AES_CCM_64_64_256(AES_CCM):
    """AES-CCM with a 256-bit key, a 64-bit tag and a 7-byte nonce."""

    # from RFC8152
    value = 13
    key_bytes = 32  # 256-bit key
    tag_bytes = 8  # 64-bit tag
    iv_bytes = 7  # 7-byte nonce
class AES_CCM_16_128_128(AES_CCM):
    """AES-CCM with a 128-bit key, a 128-bit tag and a 13-byte nonce."""

    # from RFC8152
    value = 30
    key_bytes = 16  # 128-bit key
    tag_bytes = 16  # 128-bit tag
    iv_bytes = 13  # 13-byte nonce
class AES_CCM_16_128_256(AES_CCM):
    """AES-CCM with a 256-bit key, a 128-bit tag and a 13-byte nonce."""

    # from RFC8152
    value = 31
    key_bytes = 32  # 256-bit key
    tag_bytes = 16  # 128-bit tag
    iv_bytes = 13  # 13-byte nonce
class AES_CCM_64_128_128(AES_CCM):
    """AES-CCM with a 128-bit key, a 128-bit tag and a 7-byte nonce."""

    # from RFC8152
    value = 32
    key_bytes = 16  # 128-bit key
    tag_bytes = 16  # 128-bit tag
    iv_bytes = 7  # 7-byte nonce
class AES_CCM_64_128_256(AES_CCM):
    """AES-CCM with a 256-bit key, a 128-bit tag and a 7-byte nonce."""

    # from RFC8152
    value = 33
    key_bytes = 32  # 256-bit key
    tag_bytes = 16  # 128-bit tag
    iv_bytes = 7  # 7-byte nonce
class AES_GCM(AeadAlgorithm, metaclass=abc.ABCMeta):
    """AES-GCM implemented using the Python cryptography library

    Concrete subclasses supply the key and tag sizes."""

    iv_bytes = 12  # 96 bits fixed size of the nonce

    @classmethod
    def encrypt(cls, plaintext, aad, key, iv):
        """Return ciphertext + tag for given input data"""
        cipher = aead.AESGCM(key)
        return cipher.encrypt(iv, plaintext, aad)

    @classmethod
    def decrypt(cls, ciphertext_and_tag, aad, key, iv):
        """Return the plaintext, or raise ProtectionInvalid if the tag does
        not verify."""
        cipher = aead.AESGCM(key)
        try:
            plaintext = cipher.decrypt(iv, ciphertext_and_tag, aad)
        except cryptography.exceptions.InvalidTag:
            raise ProtectionInvalid("Tag invalid")
        return plaintext
class A128GCM(AES_GCM):
    """AES-GCM with a 128-bit key and a 128-bit tag."""

    # from RFC8152
    value = 1
    key_bytes = 16  # 128-bit key
    tag_bytes = 16  # 128-bit tag
class A192GCM(AES_GCM):
    """AES-GCM with a 192-bit key and a 128-bit tag."""

    # from RFC8152
    value = 2
    key_bytes = 24  # 192-bit key
    tag_bytes = 16  # 128-bit tag
class A256GCM(AES_GCM):
    """AES-GCM with a 256-bit key and a 128-bit tag."""

    # from RFC8152
    value = 3
    key_bytes = 32  # 256-bit key
    tag_bytes = 16  # 128-bit tag
class ChaCha20Poly1305(AeadAlgorithm):
    """ChaCha20/Poly1305 implemented using the Python cryptography library"""

    # from RFC8152
    value = 24
    key_bytes = 32  # 256-bit key
    tag_bytes = 16  # 128-bit tag
    iv_bytes = 12  # 96-bit nonce

    @classmethod
    def encrypt(cls, plaintext, aad, key, iv):
        """Return ciphertext + tag for given input data"""
        cipher = aead.ChaCha20Poly1305(key)
        return cipher.encrypt(iv, plaintext, aad)

    @classmethod
    def decrypt(cls, ciphertext_and_tag, aad, key, iv):
        """Return the plaintext, or raise ProtectionInvalid if the tag does
        not verify."""
        cipher = aead.ChaCha20Poly1305(key)
        try:
            plaintext = cipher.decrypt(iv, ciphertext_and_tag, aad)
        except cryptography.exceptions.InvalidTag:
            raise ProtectionInvalid("Tag invalid")
        return plaintext
439class AlgorithmCountersign(metaclass=abc.ABCMeta):
440 """A fully parameterized COSE countersign algorithm
442 An instance is able to provide all the alg_signature, par_countersign and
443 par_countersign_key parameters taht go into the Group OSCORE algorithms
444 field.
445 """
447 value: int | str
449 @abc.abstractmethod
450 def sign(self, body, external_aad, private_key):
451 """Return the signature produced by the key when using
452 CounterSignature0 as describe in draft-ietf-cose-countersign-01"""
454 @abc.abstractmethod
455 def verify(self, signature, body, external_aad, public_key):
456 """Verify a signature in analogy to sign"""
458 @abc.abstractmethod
459 def generate_with_ccs(self) -> Tuple[Any, bytes]:
460 """Return a usable private key along with a CCS describing it"""
462 @abc.abstractmethod
463 def public_from_private(self, private_key):
464 """Given a private key, derive the publishable key"""
466 @abc.abstractmethod
467 def from_kccs(self, ccs: bytes) -> Any:
468 """Given a CCS, extract the public key, or raise a ValueError if the
469 credential format does not align with the type.
471 The type is not exactly Any, but whichever type is used by this
472 algorithm class."""
474 @staticmethod
475 def _build_countersign_structure(body, external_aad):
476 countersign_structure = [
477 "CounterSignature0",
478 b"",
479 b"",
480 external_aad,
481 body,
482 ]
483 tobesigned = cbor.dumps(countersign_structure)
484 return tobesigned
486 @abc.abstractproperty
487 def signature_length(self) -> int:
488 """The length of a signature using this algorithm"""
490 @abc.abstractproperty
491 def curve_number(self) -> int:
492 """Registered curve number used with this algorithm.
494 Only used for verification of credentials' details"""
class AlgorithmStaticStatic(metaclass=abc.ABCMeta):
    """Interface of algorithms that derive a shared secret from one private
    and one public key."""

    @abc.abstractmethod
    def staticstatic(self, private_key, public_key):
        """Derive a shared static-static secret from a private and a public key"""
def _from_kccs_common(ccs: bytes) -> dict:
    """Extract the COSE Key from the CNF claim of a CCS.

    Raises ValueError if the CCS is not valid CBOR, or if it is not a map
    holding a CNF claim map that in turn holds a COSE Key map."""

    try:
        claims = cbor.loads(ccs)
    except cbor.CBORDecodeError as e:
        raise ValueError("CCS not in CBOR format") from e

    # Walk down one level at a time; anything that is absent or not a dict
    # turns into None (or stays a non-dict) and is rejected by the final
    # check.
    cnf = claims.get(CWT_CLAIM_CNF) if isinstance(claims, dict) else None
    cose_key = cnf.get(CWT_CNF_COSE_KEY) if isinstance(cnf, dict) else None

    if not isinstance(cose_key, dict):
        raise ValueError("CCS must contain a COSE Key dict in a CNF")

    return cose_key
524class Ed25519(AlgorithmCountersign):
525 def sign(self, body, aad, private_key):
526 _alglog.debug("Perfoming signature:")
527 _alglog.debug("* body: %s", body.hex())
528 _alglog.debug("* AAD: %s", aad.hex())
529 private_key = asymmetric.ed25519.Ed25519PrivateKey.from_private_bytes(
530 private_key
531 )
532 return private_key.sign(self._build_countersign_structure(body, aad))
534 def verify(self, signature, body, aad, public_key):
535 _alglog.debug("Verifying signature:")
536 _alglog.debug("* body: %s", body.hex())
537 _alglog.debug("* AAD: %s", aad.hex())
538 public_key = asymmetric.ed25519.Ed25519PublicKey.from_public_bytes(public_key)
539 try:
540 public_key.verify(signature, self._build_countersign_structure(body, aad))
541 except cryptography.exceptions.InvalidSignature:
542 _alglog.debug("Signature was invalid.")
543 raise ProtectionInvalid("Signature mismatch")
545 def _generate(self):
546 key = asymmetric.ed25519.Ed25519PrivateKey.generate()
547 # FIXME: We could avoid handing the easy-to-misuse bytes around if the
548 # current algorithm interfaces did not insist on passing the
549 # exchangable representations -- and generally that should be more
550 # efficient.
551 return key.private_bytes(
552 encoding=serialization.Encoding.Raw,
553 format=serialization.PrivateFormat.Raw,
554 encryption_algorithm=serialization.NoEncryption(),
555 )
557 def generate_with_ccs(self) -> Tuple[Any, bytes]:
558 private = self._generate()
559 public = self.public_from_private(private)
561 ccs = cbor.dumps(
562 {
566 COSE_KEY_COMMON_ALG: self.value,
567 COSE_KEY_OKP_CRV: self.curve_number,
568 COSE_KEY_OKP_X: public,
569 }
570 }
571 }
572 )
574 return (private, ccs)
576 def public_from_private(self, private_key):
577 private_key = asymmetric.ed25519.Ed25519PrivateKey.from_private_bytes(
578 private_key
579 )
580 public_key = private_key.public_key()
581 return public_key.public_bytes(
582 encoding=serialization.Encoding.Raw,
583 format=serialization.PublicFormat.Raw,
584 )
586 def from_kccs(self, ccs: bytes) -> Any:
587 # eg. {1: 1, 3: -8, -1: 6, -2: h'77 ... 88'}
588 cose_key = _from_kccs_common(ccs)
590 if (
591 cose_key.get(COSE_KEY_COMMON_KTY) == COSE_KTY_OKP
592 and cose_key.get(COSE_KEY_COMMON_ALG) == self.value
593 and cose_key.get(COSE_KEY_OKP_CRV) == self.curve_number
594 and COSE_KEY_OKP_X in cose_key
595 ):
596 return cose_key[COSE_KEY_OKP_X]
597 else:
598 raise ValueError("Key type not recognized from CCS key %r" % cose_key)
600 value = -8
601 curve_number = 6
603 signature_length = 64
606class EcdhSsHkdf256(AlgorithmStaticStatic):
607 # FIXME: This class uses the Edwards keys as private and public keys, and
608 # not the converted ones. This will be problematic if pairwise-only
609 # contexts are to be set up.
611 value = -27
613 # FIXME these two will be different when using the Montgomery keys directly
615 # This one will only be used when establishing and distributing pairwise-only keys
616 public_from_private = Ed25519.public_from_private
618 def staticstatic(self, private_key, public_key):
619 private_key = asymmetric.ed25519.Ed25519PrivateKey.from_private_bytes(
620 private_key
621 )
622 private_key = cryptography_additions.sk_to_curve25519(private_key)
624 public_key = asymmetric.ed25519.Ed25519PublicKey.from_public_bytes(public_key)
625 public_key = cryptography_additions.pk_to_curve25519(public_key)
627 return
630class ECDSA_SHA256_P256(AlgorithmCountersign, AlgorithmStaticStatic):
631 # Trying a new construction approach -- should work just as well given
632 # we're just passing Python objects around
633 def from_public_parts(self, x: bytes, y: bytes):
634 """Create a public key from its COSE values"""
635 return
636 int.from_bytes(x, "big"),
637 int.from_bytes(y, "big"),
639 ).public_key()
641 def from_kccs(self, ccs: bytes) -> Any:
642 cose_key = _from_kccs_common(ccs)
644 if (
645 cose_key.get(COSE_KEY_COMMON_KTY) == COSE_KTY_EC2
646 and cose_key.get(COSE_KEY_COMMON_ALG) == self.value
647 and COSE_KEY_EC2_X in cose_key
648 and COSE_KEY_EC2_Y in cose_key
649 ):
650 return self.from_public_parts(
651 x=cose_key[COSE_KEY_EC2_X],
652 y=cose_key[COSE_KEY_EC2_Y],
653 )
654 else:
655 raise ValueError("Key type not recognized from CCS key %r" % cose_key)
657 def from_private_parts(self, x: bytes, y: bytes, d: bytes):
658 public_numbers = self.from_public_parts(x, y).public_numbers()
659 private_numbers =
660 int.from_bytes(d, "big"), public_numbers
661 )
662 return private_numbers.private_key()
664 def sign(self, body, aad, private_key):
665 der_signature = private_key.sign(
666 self._build_countersign_structure(body, aad),
668 )
669 (r, s) = decode_dss_signature(der_signature)
671 return r.to_bytes(32, "big") + s.to_bytes(32, "big")
673 def verify(self, signature, body, aad, public_key):
674 r = signature[:32]
675 s = signature[32:]
676 r = int.from_bytes(r, "big")
677 s = int.from_bytes(s, "big")
678 der_signature = encode_dss_signature(r, s)
679 try:
680 public_key.verify(
681 der_signature,
682 self._build_countersign_structure(body, aad),
684 )
685 except cryptography.exceptions.InvalidSignature:
686 raise ProtectionInvalid("Signature mismatch")
688 def _generate(self):
689 return
691 def generate_with_ccs(self) -> Tuple[Any, bytes]:
692 private = self._generate()
693 public = self.public_from_private(private)
694 # FIXME: Deduplicate with
695 x = public.public_numbers().x.to_bytes(32, "big")
696 y = public.public_numbers().y.to_bytes(32, "big")
698 ccs = cbor.dumps(
699 {
703 COSE_KEY_COMMON_ALG: self.value,
704 COSE_KEY_EC2_X: x,
705 COSE_KEY_EC2_Y: y,
706 }
707 }
708 }
709 )
711 return (private, ccs)
713 def public_from_private(self, private_key):
714 return private_key.public_key()
716 def staticstatic(self, private_key, public_key):
717 return, public_key)
719 value = -7 # FIXME: when used as a static-static algorithm, does this become -27? see shepherd review.
720 curve_number = 1
722 signature_length = 64
725algorithms = {
726 "AES-CCM-16-64-128": AES_CCM_16_64_128(),
727 "AES-CCM-16-64-256": AES_CCM_16_64_256(),
728 "AES-CCM-64-64-128": AES_CCM_64_64_128(),
729 "AES-CCM-64-64-256": AES_CCM_64_64_256(),
730 "AES-CCM-16-128-128": AES_CCM_16_128_128(),
731 "AES-CCM-16-128-256": AES_CCM_16_128_256(),
732 "AES-CCM-64-128-128": AES_CCM_64_128_128(),
733 "AES-CCM-64-128-256": AES_CCM_64_128_256(),
734 "ChaCha20/Poly1305": ChaCha20Poly1305(),
735 "A128GCM": A128GCM(),
736 "A192GCM": A192GCM(),
737 "A256GCM": A256GCM(),
738 "A128CBC": A128CBC(),
741# algorithms with full parameter set
742algorithms_countersign = {
743 # maybe needs a different name...
744 "EdDSA on Ed25519": Ed25519(),
745 "ECDSA w/ SHA-256 on P-256": ECDSA_SHA256_P256(),
748algorithms_staticstatic = {
749 "ECDH-SS + HKDF-256": EcdhSsHkdf256(),
754_hash_backend = cryptography.hazmat.backends.default_backend()
755hashfunctions = {
756 "sha256": hashes.SHA256(),
757 "sha384": hashes.SHA384(),
758 "sha512": hashes.SHA512(),
class BaseSecurityContext:
    """Attributes and helpers (nonce construction, external AAD building)
    shared by the security context classes."""

    # Deprecated marker for whether the class uses the
    # ContextWhereExternalAadIsGroup mixin; see documentation there.
    external_aad_is_group = False

    # Authentication information carried with this security context; managed
    # externally by whatever creates the security context.
    authenticated_claims: List[str] = []

    #: AEAD algorithm. This may be None if it is not set in an OSCORE group context.
    alg_aead: Optional[AeadAlgorithm]

    #: The common IV of the common context.
    #:
    #: This may be longer than needed for constructing IVs with any particular
    #: algorithm; nonce construction only uses as many leading bytes as it
    #: needs.
    common_iv: bytes

    id_context: Optional[bytes]

    @property
    def algorithm(self):
        """Deprecated alias of alg_aead"""
        warnings.warn(
            "Property was renamed to 'alg_aead'", DeprecationWarning, stacklevel=2
        )
        return self.alg_aead

    @algorithm.setter
    def algorithm(self, value):
        warnings.warn(
            "Property was renamed to 'alg_aead'", DeprecationWarning, stacklevel=2
        )
        self.alg_aead = value

    hashfun: hashes.HashAlgorithm

    def _construct_nonce(
        self, partial_iv_short, piv_generator_id, alg: SymmetricEncryptionAlgorithm
    ):
        """Build the nonce for a partial IV and the ID of its generator.

        The components are laid out as: one byte with the length of the ID,
        the ID left-padded with zeros to ``alg.iv_bytes - 6`` bytes, and the
        partial IV left-padded with zeros to 5 bytes. The result is that
        string XORed with the equally long start of the common IV."""
        piv_padding = b"\0" * (5 - len(partial_iv_short))

        id_length = bytes([len(piv_generator_id)])
        id_padding = b"\0" * (alg.iv_bytes - 6 - len(piv_generator_id))

        components = (
            id_length + id_padding + piv_generator_id + piv_padding + partial_iv_short
        )

        nonce = _xor_bytes(self.common_iv[: len(components)], components)
        _alglog.debug(
            "Nonce construction: common %s ^ components %s = %s",
            self.common_iv.hex(),
            components.hex(),
            nonce.hex(),
        )

        return nonce

    def _extract_external_aad(
        self, message, request_id, local_is_sender: bool
    ) -> bytes:
        """Build the serialized external AAD from information in the message
        and the request_id.

        Information about whether the local context is the sender of the
        message is only relevant to group contexts, where it influences whose
        authentication credentials are placed in the AAD.
        """
        # If any option were actually Class I, it would be something like
        #
        # the_options = pick some of(message)
        # class_i_options = Message(the_options).opt.encode()

        def value_or_none(alg):
            # Algorithms that are not set are represented by None
            return None if alg is None else alg.value

        is_group = isinstance(self, ContextWhereExternalAadIsGroup)

        if request_id.request_hash is None:
            class_i_options = b""
        else:
            class_i_options = Message(request_hash=request_id.request_hash).opt.encode()

        algorithms: List[int | str | None] = [value_or_none(self.alg_aead)]
        if is_group:
            algorithms.append(value_or_none(self.alg_group_enc))
            algorithms.append(value_or_none(self.alg_signature))
            algorithms.append(value_or_none(self.alg_pairwise_key_agreement))

        external_aad = [
            1,  # the OSCORE version
            algorithms,
            request_id.kid,
            request_id.partial_iv,
            class_i_options,
        ]

        if is_group:
            # FIXME: We may need to carry this over in the request_id when
            # observation span group rekeyings
            external_aad.append(self.id_context)

            assert message.opt.oscore is not None, "Double OSCORE"
            external_aad.append(message.opt.oscore)

            # The credential of whoever sent the message goes in, followed by
            # the group manager's.
            external_aad.append(
                self.sender_auth_cred if local_is_sender else self.recipient_auth_cred
            )
            external_aad.append(self.group_manager_cred)

        return cbor.dumps(external_aad)
class ContextWhereExternalAadIsGroup(BaseSecurityContext):
    """The protection and unprotection functions will use the Group OSCORE AADs
    rather than the regular OSCORE AADs iff a context uses this mixin. (Ie.
    alg_group_enc etc are added to the algorithms, and request_kid_context,
    OSCORE_option, sender_auth_cred and gm_cred are added).

    This does not necessarily match the is_signing property (as pairwise
    contexts use this but don't sign), and is distinct from the added OSCORE
    option in the AAD (as that's only applicable for the external AAD as
    extracted for signing and signature verification purposes)."""

    # Unlike in the base class, this is not declared as optional here.
    id_context: bytes

    external_aad_is_group = True

    # The values of these three are appended to the algorithms array of the
    # external AAD; None is used there for any that is not set.
    alg_group_enc: Optional[SymmetricEncryptionAlgorithm]
    alg_signature: Optional[AlgorithmCountersign]
    # This is also of type AlgorithmCountersign because the staticstatic
    # function is sitting on the same type.
    alg_pairwise_key_agreement: Optional[AlgorithmCountersign]

    # Credentials that are placed in the external AAD: the sender's when the
    # local context is the sender of the message and the recipient's
    # otherwise, in both cases followed by the group manager's.
    sender_auth_cred: bytes
    recipient_auth_cred: bytes
    group_manager_cred: bytes
910# FIXME pull interface components from SecurityContext up here
911class CanProtect(BaseSecurityContext, metaclass=abc.ABCMeta):
912 # The protection function will add a signature according to the context's
913 # alg_signature attribute if this is true
914 is_signing = False
916 # Send the KID when protecting responses
917 #
918 # Once group pairwise mode is implemented, this will need to become a
919 # parameter to protect(), which is stored at the point where the incoming
920 # context is turned into an outgoing context. (Currently, such a mechanism
921 # isn't there yet, and oscore_wrapper protects responses with the very same
922 # context they came in on).
923 responses_send_kid = False
925 #: The KID sent by this party when sending requests, or answering to group
926 #: requests.
927 sender_id: bytes
929 @staticmethod
930 def _compress(protected, unprotected, ciphertext):
931 """Pack the untagged COSE_Encrypt0 object described by the *args
932 into two bytestrings suitable for the Object-Security option and the
933 message body"""
935 if protected:
936 raise RuntimeError(
937 "Protection produced a message that has uncompressable fields."
938 )
940 piv = unprotected.pop(COSE_PIV, b"")
941 if len(piv) > COMPRESSION_BITS_N:
942 raise ValueError("Can't encode overly long partial IV")
944 firstbyte = len(piv)
945 if COSE_KID in unprotected:
946 firstbyte |= COMPRESSION_BIT_K
947 kid_data = unprotected.pop(COSE_KID)
948 else:
949 kid_data = b""
951 if COSE_KID_CONTEXT in unprotected:
952 firstbyte |= COMPRESSION_BIT_H
953 kid_context = unprotected.pop(COSE_KID_CONTEXT)
954 s = len(kid_context)
955 if s > 255:
956 raise ValueError("KID Context too long")
957 s_kid_context = bytes((s,)) + kid_context
958 else:
959 s_kid_context = b""
961 if COSE_COUNTERSIGNATURE0 in unprotected:
962 firstbyte |= COMPRESSION_BIT_GROUP
964 unprotected.pop(COSE_COUNTERSIGNATURE0)
966 # ciphertext will eventually also get the countersignature, but
967 # that happens later when the option is already processed.
969 if unprotected:
970 raise RuntimeError(
971 "Protection produced a message that has uncompressable fields."
972 )
974 if firstbyte:
975 option = bytes([firstbyte]) + piv + s_kid_context + kid_data
976 else:
977 option = b""
979 return (option, ciphertext)
981 def protect(self, message, request_id=None, *, kid_context=True):
982 """Given a plain CoAP message, create a protected message that contains
983 message's options in the inner or outer CoAP message as described in
986 If the message is a response to a previous message, the additional data
987 from unprotecting the request are passed in as request_id. When
988 request data is present, its partial IV is reused if possible. The
989 security context's ID context is encoded in the resulting message
990 unless kid_context is explicitly set to a False; other values for the
991 kid_context can be passed in as byte string in the same parameter.
992 """
994 _alglog.debug(
995 "Protecting message %s with context %s and request ID %s",
996 message,
997 self,
998 request_id,
999 )
1001 assert (request_id is None) == message.code.is_request(), (
1002 "Requestishness of code to protect does not match presence of request ID"
1003 )
1005 outer_message, plaintext = self._split_message(message, request_id)
1007 protected = {}
1008 nonce = None
1009 partial_iv_generated_by = None
1010 unprotected = {}
1011 if request_id is not None:
1012 partial_iv_generated_by, partial_iv_short = (
1013 request_id.get_reusable_kid_and_piv()
1014 )
1016 alg_symmetric = self.alg_group_enc if self.is_signing else self.alg_aead
1017 assert isinstance(alg_symmetric, AeadAlgorithm) or self.is_signing, (
1018 "Non-AEAD algorithms can only be used in signing modes."
1019 )
1021 if partial_iv_generated_by is None:
1022 nonce, partial_iv_short = self._build_new_nonce(alg_symmetric)
1023 partial_iv_generated_by = self.sender_id
1025 unprotected[COSE_PIV] = partial_iv_short
1026 else:
1027 nonce = self._construct_nonce(
1028 partial_iv_short, partial_iv_generated_by, alg_symmetric
1029 )
1031 if message.code.is_request():
1032 unprotected[COSE_KID] = self.sender_id
1034 request_id = RequestIdentifiers(
1035 self.sender_id,
1036 partial_iv_short,
1037 can_reuse_nonce=None,
1038 request_code=outer_message.code,
1039 )
1041 if kid_context is True:
1042 if self.id_context is not None:
1043 unprotected[COSE_KID_CONTEXT] = self.id_context
1044 elif kid_context is not False:
1045 unprotected[COSE_KID_CONTEXT] = kid_context
1046 else:
1047 if self.responses_send_kid:
1048 unprotected[COSE_KID] = self.sender_id
1050 # Putting in a dummy value as the signature calculation will already need some of the compression result
1051 if self.is_signing:
1052 unprotected[COSE_COUNTERSIGNATURE0] = b""
1053 # FIXME: Running this twice quite needlessly (just to get the oscore option for sending)
1054 option_data, _ = self._compress(protected, unprotected, b"")
1056 outer_message.opt.oscore = option_data
1058 external_aad = self._extract_external_aad(
1059 outer_message, request_id, local_is_sender=True
1060 )
1062 aad = SymmetricEncryptionAlgorithm._build_encrypt0_structure(
1063 protected, external_aad
1064 )
1066 key = self._get_sender_key(outer_message, external_aad, plaintext, request_id)
1068 _alglog.debug("Encrypting Encrypt0:")
1069 _alglog.debug("* aad = %s", aad.hex())
1070 _alglog.debug("* nonce = %s", nonce.hex())
1071 _alglog.debug("* key = %s", log_secret(key.hex()))
1072 _alglog.debug("* algorithm = %s", alg_symmetric)
1073 ciphertext = alg_symmetric.encrypt(plaintext, aad, key, nonce)
1075 _alglog.debug("Produced ciphertext %s", ciphertext.hex())
1077 _, payload = self._compress(protected, unprotected, ciphertext)
1079 if self.is_signing:
1080 signature = self.alg_signature.sign(payload, external_aad, self.private_key)
1081 # This is bordering "it's OK to log it in plain", because a reader
1082 # of the log can access both the plaintext and the ciphertext, but
1083 # still, it is called a key.
1084 _alglog.debug(
1085 "Producing keystream from signature encryption key: %s",
1086 log_secret(self.signature_encryption_key.hex()),
1087 )
1088 keystream = self._kdf_for_keystreams(
1089 partial_iv_generated_by,
1090 partial_iv_short,
1091 self.signature_encryption_key,
1092 self.sender_id,
1094 if message.code.is_request()
1096 )
1097 _alglog.debug("Keystream is %s", keystream.hex())
1098 encrypted_signature = _xor_bytes(signature, keystream)
1099 _alglog.debug("Encrypted signature %s", encrypted_signature.hex())
1100 payload += encrypted_signature
1101 outer_message.payload = payload
1103 # FIXME go through options section
1105 _alglog.debug(
1106 "Protecting the message succeeded, yielding ciphertext %s and request ID %s",
1107 outer_message,
1108 request_id,
1109 )
1110 # the request_id in the second argument should be discarded by the
1111 # caller when protecting a response -- is that reason enough for an
1112 # `if` and returning None?
1113 return outer_message, request_id
def _get_sender_key(self, outer_message, aad, plaintext, request_id):
    """Return the key with which the message at hand is encrypted.

    This is a customization hook of the protect function. The default
    implementation ignores all its arguments and hands out the context's
    fixed ``sender_key``.

    Deterministic requests need to shake up a few things here: they need to
    modify the outer message, as well as the request_id, as it will later be
    used to unprotect the response."""
    sender_key = self.sender_key
    return sender_key
def _split_message(self, message, request_id):
    """Split the message that is about to be protected into its outer and
    inner part.

    Returns a tuple of

    * the outer message, which contains all Class I and Class U options
      (but neither payload nor Object-Security option), and
    * the encoded inner message (the plaintext), which contains the inner
      code, all Class E options and the payload.

    ``request_id`` is only consulted for responses, where it determines the
    outer response code.

    Raises ValueError if a Proxy-URI can not be split into options.

    This leaves the messages' remotes unset."""

    if message.code.is_request():
        outer_host = message.opt.uri_host
        proxy_uri = message.opt.proxy_uri

        # The routing related options only travel in the outer message.
        inner_message = message.copy(
            uri_host=None,
            uri_port=None,
            proxy_uri=None,
            proxy_scheme=None,
        )
        inner_message.remote = None

        if proxy_uri is not None:
            # Use set_request_uri to split up the proxy URI into its
            # components; extract, preserve and clear them.
            inner_message.set_request_uri(proxy_uri, set_uri_host=False)
            if inner_message.opt.proxy_uri is not None:
                raise ValueError("Can not split Proxy-URI into options")
            outer_uri = inner_message.remote.uri_base
            inner_message.remote = None
            inner_message.opt.proxy_scheme = None

        outer_code = POST if message.opt.observe is None else FETCH
    else:
        outer_host = None
        proxy_uri = None

        inner_message = message.copy()

        outer_code = request_id.code_style.response

    # Responses never carry the inner Observe value on the outside
    outer_observe = None if message.code.is_response() else message.opt.observe

    # no max-age because these are always successful responses
    outer_message = Message(
        code=outer_code,
        uri_host=outer_host,
        observe=outer_observe,
    )
    if proxy_uri is not None:
        outer_message.set_request_uri(outer_uri)

    # Inner code, then the encoded options, then (only if there is a
    # payload) the payload marker followed by the payload
    plaintext = bytes([inner_message.code]) + inner_message.opt.encode()
    if inner_message.payload:
        plaintext += bytes([0xFF])
        plaintext += inner_message.payload

    return outer_message, plaintext
def _build_new_nonce(self, alg: SymmetricEncryptionAlgorithm):
    """Take a fresh sequence number and build a nonce from it.

    The nonce is assembled by ``_construct_nonce`` (as per Figure 5 of
    draft-ietf-core-object-security-06) from the 5-byte big-endian encoding
    of the sequence number and the own sender ID.

    Returns a tuple of the nonce and the shortened partial IV, i.e. the
    partial IV without leading zero bytes (but at least one byte long)."""
    sequence_number = self.new_sequence_number()

    full_partial_iv = sequence_number.to_bytes(5, "big")
    nonce = self._construct_nonce(full_partial_iv, self.sender_id, alg)

    short_partial_iv = full_partial_iv.lstrip(b"\0")
    if not short_partial_iv:
        # Sequence number zero is still sent as a single zero byte
        short_partial_iv = b"\0"

    return nonce, short_partial_iv
1195 # sequence number handling
def new_sequence_number(self):
    """Hand out the next sender sequence number.

    The implementation is responsible for never returning the same value
    twice in a given security context: the counter is advanced and
    ``post_seqnoincrease`` is run before the value is returned.

    May raise ContextUnavailable (when the counter has reached MAX_SEQNO)."""
    allocated = self.sender_sequence_number
    if allocated >= MAX_SEQNO:
        raise ContextUnavailable("Sequence number too large, context is exhausted.")
    self.sender_sequence_number += 1
    # Give the implementation the chance to persist the new counter before
    # the allocated number is used
    self.post_seqnoincrease()
    return allocated
1209 # implementation defined
@abc.abstractmethod
def post_seqnoincrease(self):
    """Ensure that sender_sequence_number is stored.

    This is called by ``new_sequence_number`` after the counter was
    incremented and before the allocated number is returned. Concrete
    security contexts must override it.

    Raises NotImplementedError if the abstract implementation is reached
    (eg. through ``super()``)."""
    # A bare `raise` outside an exception handler would only produce a
    # confusing "No active exception to re-raise" RuntimeError;
    # NotImplementedError is a RuntimeError subclass, so existing handlers
    # keep working.
    raise NotImplementedError
def context_from_response(self, unprotected_bag) -> CanUnprotect:
    """Pick the security context with which to unprotect a response.

    This is called when receiving a response to a request that was
    protected with this security context, and is given the unprotected
    information from the response's Object-Security option.

    Overriding it allows picking the right security context in a group
    response, and helps getting a new short-lived context for B.2 mode. The
    default behavior is to ignore ``unprotected_bag`` and return self.
    """

    # FIXME justify by moving into a mixin for CanProtectAndUnprotect
    chosen = self
    return chosen  # type: ignore
class CanUnprotect(BaseSecurityContext):
    """Aspect of a security context that can verify and decrypt incoming
    OSCORE messages."""

    recipient_key: bytes

    def unprotect(self, protected_message, request_id=None):
        """Verify and decrypt ``protected_message``.

        ``request_id`` must be None when a request is unprotected, and must
        be the identifiers of the corresponding request when a response is
        unprotected.

        Returns a tuple of the unprotected (inner) message and the request
        identifiers: freshly built ones for a request, the passed in ones
        for a response.

        Raises NotAProtectedMessage if there is no OSCORE option,
        ProtectionInvalid or DecodeError if the message is not acceptable, a
        ReplayError (or ReplayErrorWithEcho) if the replay check fails, and
        lets errors of signature verification and decryption pass through.
        """
        _alglog.debug(
            "Unprotecting message %s with context %s and request ID %s",
            protected_message,
            self,
            request_id,
        )

        assert (request_id is not None) == protected_message.code.is_response(), (
            "Requestishness of code to unprotect does not match presence of request ID"
        )
        is_response = protected_message.code.is_response()

        # Set to a raisable exception on replay check failures; it will be
        # raised, but the package may still be processed in the course of
        # Echo handling.
        replay_error = None

        protected_serialized, protected, unprotected, ciphertext = (
            self._extract_encrypted0(protected_message)
        )

        if protected:
            raise ProtectionInvalid("The protected field is not empty")

        # FIXME check for duplicate keys in protected

        if unprotected.pop(COSE_KID_CONTEXT, self.id_context) != self.id_context:
            # FIXME is this necessary?
            raise ProtectionInvalid("Sender ID context does not match")

        if unprotected.pop(COSE_KID, self.recipient_id) != self.recipient_id:
            # for most cases, this is caught by the session ID dispatch, but in
            # responses (where explicit sender IDs are atypical), this is a
            # valid check
            raise ProtectionInvalid("Sender ID does not match")

        if COSE_PIV not in unprotected:
            if not is_response:
                raise ProtectionInvalid("No sequence number provided in request")

            # The response reuses the partial IV of the request
            seqno = None  # sentinel for not striking out anything
            partial_iv_short = request_id.partial_iv
            partial_iv_generated_by = request_id.kid
        else:
            partial_iv_short = unprotected.pop(COSE_PIV)
            partial_iv_generated_by = self.recipient_id

            seqno = int.from_bytes(partial_iv_short, "big")

            if not is_response:
                if not self.recipient_replay_window.is_initialized():
                    replay_error = ReplayError("Sequence number check unavailable")
                elif not self.recipient_replay_window.is_valid(seqno):
                    replay_error = ReplayError("Sequence number was re-used")

                if replay_error is not None and self.echo_recovery is None:
                    # Don't even try decoding if there is no reason to
                    raise replay_error

                request_id = RequestIdentifiers(
                    partial_iv_generated_by,
                    partial_iv_short,
                    can_reuse_nonce=replay_error is None,
                    request_code=protected_message.code,
                )

        external_aad = self._extract_external_aad(
            protected_message, request_id, local_is_sender=False
        )

        if unprotected.pop(COSE_COUNTERSIGNATURE0, None) is not None:
            # A missing attribute surfaces as AttributeError (never as
            # NameError), and an attribute that is set to None is no more
            # usable; both mean that this is not a group context.
            alg_signature = getattr(self, "alg_signature", None)
            if alg_signature is None:
                raise DecodeError(
                    "Group messages can not be decoded with this non-group context"
                )

            # The (encrypted) signature trails the actual ciphertext
            siglen = alg_signature.signature_length
            if len(ciphertext) < siglen:
                raise DecodeError("Message too short for signature")
            encrypted_signature = ciphertext[-siglen:]

            _alglog.debug(
                "Producing keystream from signature encryption key: %s",
                log_secret(self.signature_encryption_key.hex()),
            )
            # NOTE(review): the two operands of this conditional were lost
            # in the extraction this code was recovered from; they are
            # restored as the module's INFO_TYPE_KEYSTREAM_* constants --
            # confirm against the module header.
            keystream = self._kdf_for_keystreams(
                partial_iv_generated_by,
                partial_iv_short,
                self.signature_encryption_key,
                self.recipient_id,
                INFO_TYPE_KEYSTREAM_REQUEST
                if protected_message.code.is_request()
                else INFO_TYPE_KEYSTREAM_RESPONSE,
            )
            _alglog.debug("Encrypted signature %s", encrypted_signature.hex())
            _alglog.debug("Keystream is %s", keystream.hex())
            signature = _xor_bytes(encrypted_signature, keystream)

            ciphertext = ciphertext[:-siglen]

            alg_signature.verify(
                signature, ciphertext, external_aad, self.recipient_public_key
            )

            alg_symmetric = self.alg_group_enc
        else:
            alg_symmetric = self.alg_aead

        # Everything that is understood was popped by now
        if unprotected:
            raise DecodeError("Unsupported unprotected option")

        if (
            len(ciphertext) < self.alg_aead.tag_bytes + 1
        ):  # +1 assures access to plaintext[0] (the code)
            raise ProtectionInvalid("Ciphertext too short")

        enc_structure = ["Encrypt0", protected_serialized, external_aad]
        aad = cbor.dumps(enc_structure)

        key = self._get_recipient_key(protected_message, alg_symmetric)

        nonce = self._construct_nonce(
            partial_iv_short, partial_iv_generated_by, alg_symmetric
        )

        _alglog.debug("Decrypting Encrypt0:")
        _alglog.debug("* ciphertext = %s", ciphertext.hex())
        _alglog.debug("* aad = %s", aad.hex())
        _alglog.debug("* nonce = %s", nonce.hex())
        _alglog.debug("* key = %s", log_secret(key.hex()))
        _alglog.debug("* algorithm = %s", alg_symmetric)
        try:
            plaintext = alg_symmetric.decrypt(ciphertext, aad, key, nonce)
        except Exception:
            _alglog.debug("Unprotecting failed")
            # Bare raise keeps the original traceback intact
            raise

        self._post_decrypt_checks(
            external_aad, plaintext, protected_message, request_id
        )

        if not is_response and seqno is not None and replay_error is None:
            self.recipient_replay_window.strike_out(seqno)

        # FIXME add options from unprotected

        unprotected_message = Message(code=plaintext[0])
        unprotected_message.payload = unprotected_message.opt.decode(plaintext[1:])

        try_initialize = (
            not self.recipient_replay_window.is_initialized()
            and self.echo_recovery is not None
        )
        if try_initialize:
            if protected_message.code.is_request():
                # Either accept into replay window and clear replay error, or raise
                # something that can turn into a 4.01,Echo response
                if unprotected_message.opt.echo == self.echo_recovery:
                    self.recipient_replay_window.initialize_from_freshlyseen(seqno)
                    replay_error = None
                else:
                    raise ReplayErrorWithEcho(
                        secctx=self, request_id=request_id, echo=self.echo_recovery
                    )
            else:
                # We can initialize the replay window from a response as well.
                # The response is guaranteed fresh as it was AEAD-decoded to
                # match a request sent by this process.
                #
                # This is rare, as it only works when the server uses an own
                # sequence number, eg. when sending a notification or when
                # acting again on a retransmitted safe request whose response
                # it did not cache.
                #
                # Nothing bad happens if we can't make progress -- we just
                # don't initialize the replay window that wouldn't have been
                # checked for a response anyway.
                if seqno is not None:
                    self.recipient_replay_window.initialize_from_freshlyseen(seqno)

        if replay_error is not None:
            raise replay_error

        if unprotected_message.code.is_request():
            if protected_message.opt.observe != 0:
                unprotected_message.opt.observe = None
        else:
            if protected_message.opt.observe is not None:
                # -1 ensures that they sort correctly in later reordering
                # detection. Note that neither -1 nor high (>3 byte) sequence
                # numbers can be serialized in the Observe option, but they are
                # in this implementation accepted for passing around.
                unprotected_message.opt.observe = -1 if seqno is None else seqno

        _alglog.debug(
            "Unprotecting succeeded, yielding plaintext %s and request_id %s",
            unprotected_message,
            request_id,
        )
        return unprotected_message, request_id

    def _get_recipient_key(
        self, protected_message, algorithm: SymmetricEncryptionAlgorithm
    ):
        """Customization hook of the unprotect function

        While most security contexts have a fixed recipient key (which this
        default implementation returns, ignoring its arguments), group
        contexts have multiple, and deterministic requests build it on
        demand."""
        return self.recipient_key

    def _post_decrypt_checks(self, aad, plaintext, protected_message, request_id):
        """Customization hook of the unprotect function after decryption

        While most security contexts are good with the default checks (this
        default implementation does nothing), deterministic requests need to
        perform additional checks while AAD and plaintext information is
        still available, and modify the request_id for the later protection
        step of the response."""

    @staticmethod
    def _uncompress(option_data, payload):
        """Expand the compressed COSE object of an OSCORE message.

        ``option_data`` is the value of the OSCORE option, ``payload`` the
        payload of the protected message.

        Returns a tuple of the serialized protected header (always empty),
        the protected header map (always empty), the unprotected header map
        and the ciphertext (the unmodified payload).

        Raises DecodeError if the option is malformed."""
        if option_data == b"":
            # An empty option is shorthand for a flag byte of all zeros
            firstbyte = 0
            tail = b""
        else:
            firstbyte = option_data[0]
            tail = option_data[1:]

        unprotected = {}

        if firstbyte & COMPRESSION_BITS_RESERVED:
            raise DecodeError("Protected data uses reserved fields")

        pivsz = firstbyte & COMPRESSION_BITS_N
        if pivsz:
            if len(tail) < pivsz:
                raise DecodeError("Partial IV announced but not present")
            unprotected[COSE_PIV] = tail[:pivsz]
            tail = tail[pivsz:]

        if firstbyte & COMPRESSION_BIT_H:
            # kid context hint
            if not tail:
                # Without this check, a truncated option from the network
                # would surface as an IndexError rather than a DecodeError
                raise DecodeError("Context hint announced but not present")
            s = tail[0]
            if len(tail) - 1 < s:
                raise DecodeError("Context hint announced but not present")
            tail = tail[1:]
            unprotected[COSE_KID_CONTEXT] = tail[:s]
            tail = tail[s:]

        if firstbyte & COMPRESSION_BIT_K:
            # The KID is whatever remains of the option
            kid = tail
            unprotected[COSE_KID] = kid

        if firstbyte & COMPRESSION_BIT_GROUP:
            # Not really; As this is (also) used early on (before the KID
            # context is even known, because it's just getting extracted), this
            # is returning an incomplete value here and leaves it to the later
            # processing to strip the right number of bytes from the ciphertext
            #
            # NOTE(review): this assignment was lost in the extraction this
            # code was recovered from; unprotect() relies on the key being
            # present with a non-None value. The name of the placeholder is
            # restored from the module's sentinel -- confirm.
            unprotected[COSE_COUNTERSIGNATURE0] = PRESENT_BUT_NO_VALUE_YET

        return b"", {}, unprotected, payload

    @classmethod
    def _extract_encrypted0(cls, message):
        """Extract the parts of the COSE Encrypt0 object from a protected
        message.

        Returns the same tuple as ``_uncompress``; raises
        NotAProtectedMessage if the message has no OSCORE option."""
        if message.opt.oscore is None:
            raise NotAProtectedMessage("No Object-Security option present", message)

        protected_serialized, protected, unprotected, ciphertext = cls._uncompress(
            message.opt.oscore, message.payload
        )
        return protected_serialized, protected, unprotected, ciphertext

    # implementation defined

    def context_for_response(self) -> CanProtect:
        """After processing a request with this context, with which security
        context should an outgoing response be protected? By default, it's the
        same context."""
        # FIXME: Is there any way in which the handler may want to influence
        # the decision taken here? Or would, then, the handler just call a more
        # elaborate but similar function when setting the response's remote
        # already?

        # FIXME justify by moving into a mixin for CanProtectAndUnprotect
        return self  # type: ignore
class SecurityContextUtils(BaseSecurityContext):
    """Key derivation helpers shared by the security context
    implementations."""

    def _kdf(
        self,
        salt,
        ikm,
        role_id,
        out_type,
        key_alg: Optional[SymmetricEncryptionAlgorithm] = None,
    ):
        """The HKDF as used to derive sender and recipient key and IV in
        RFC8613 Section 3.2.1, and analogously the Group Encryption Key of oscore-groupcomm.

        ``out_type`` is one of "Key", "IV" and "SEKey"; ``key_alg`` must be
        given if and only if ``out_type`` is "Key", and then determines the
        length of the output.

        Returns the derived bytes. Raises ValueError on any other
        ``out_type``.
        """

        _alglog.debug("Deriving through KDF:")
        _alglog.debug("* salt = %s", salt.hex() if salt else salt)
        _alglog.debug("* ikm = %s", log_secret(ikm.hex()))
        _alglog.debug("* role_id = %s", role_id.hex())
        _alglog.debug("* out_type = %r", out_type)
        _alglog.debug("* key_alg = %r", key_alg)

        # The field in info is called `alg_aead` defined in RFC8613, but in
        # group OSCORE something that's very clearly *not* alg_aead is put in
        # there.
        #
        # The rules about this come from two separate places in the
        # specifications, but they produce the same outcome.
        if hasattr(self, "alg_group_enc") and self.alg_group_enc is not None:
            the_field_called_alg_aead = self.alg_group_enc.value
        else:
            assert self.alg_aead is not None, (
                "At least alg_aead or alg_group_enc needs to be set on a context."
            )
            the_field_called_alg_aead = self.alg_aead.value

        assert (key_alg is None) ^ (out_type == "Key")
        if out_type == "Key":
            # Duplicate assertion needed while mypy can not see that the assert
            # above the if is stricter than this.
            assert key_alg is not None
            out_bytes = key_alg.key_bytes
            the_field_called_alg_aead = key_alg.value
        elif out_type == "IV":
            # Either algorithm suffices here: demanding alg_aead alone would
            # contradict both the message of the assertion and the handling
            # of contexts that only have alg_group_enc further up.
            configured_algorithms = [
                a
                for a in [self.alg_aead, getattr(self, "alg_group_enc", None)]
                if a is not None
            ]
            assert configured_algorithms, (
                "At least alg_aead or alg_group_enc needs to be set on a context."
            )
            # The IV needs to be long enough for any algorithm it is used with
            out_bytes = max(a.iv_bytes for a in configured_algorithms)
        elif out_type == "SEKey":
            assert isinstance(self, GroupContext) and self.alg_group_enc is not None, (
                "SEKey derivation is only defined for group contexts with a group encryption algorithm."
            )
            # "While the obtained Signature Encryption Key is never used with
            # the Group Encryption Algorithm, its length was chosen to obtain a
            # matching level of security."
            out_bytes = self.alg_group_enc.key_bytes
        else:
            raise ValueError("Output type not recognized")

        _alglog.debug("* the_field_called_alg_aead = %s", the_field_called_alg_aead)

        info = [
            role_id,
            self.id_context,
            the_field_called_alg_aead,
            out_type,
            out_bytes,
        ]
        _alglog.debug("* info = %r", info)
        ret = self._kdf_lowlevel(salt, ikm, info, out_bytes)
        _alglog.debug("Derivation of %r produced %s", out_type, log_secret(ret.hex()))
        return ret

    def _kdf_for_keystreams(self, piv_generated_by, salt, ikm, role_id, out_type):
        """The HKDF as used to derive the keystreams of oscore-groupcomm.

        The output is as long as a signature of the context's signature
        algorithm. Note that ``role_id`` is accepted but does not enter the
        derivation."""

        out_bytes = self.alg_signature.signature_length

        # NOTE(review): the members of this tuple were lost in the
        # extraction this code was recovered from; they are restored as the
        # module's INFO_TYPE_KEYSTREAM_* constants -- confirm against the
        # module header.
        assert out_type in (
            INFO_TYPE_KEYSTREAM_REQUEST,
            INFO_TYPE_KEYSTREAM_RESPONSE,
        ), "Output type not recognized"

        info = [
            piv_generated_by,
            self.id_context,
            out_type,
            out_bytes,
        ]
        return self._kdf_lowlevel(salt, ikm, info, out_bytes)

    def _kdf_lowlevel(self, salt: bytes, ikm: bytes, info: list, l: int) -> bytes:  # noqa: E741 (signature follows RFC definition)
        """The HKDF function as used in RFC8613 and oscore-groupcomm (notated
        there as ``something = HKDF(...)``).

        ``info`` is CBOR encoded before it is fed into the HKDF, and ``l``
        is the number of bytes to produce. Note that `info` typically
        contains `L` at some point.

        When `info` takes the conventional structure of ``[pid, id_context,
        alg_aead, type, L]``, it may make sense to extend the `_kdf` function
        to support that case, or `_kdf_for_keystreams` for a different
        structure, as they are the more high-level tools."""
        hkdf = HKDF(
            algorithm=self.hashfun,
            length=l,
            salt=salt,
            info=cbor.dumps(info),
            backend=_hash_backend,
        )
        expanded = hkdf.derive(ikm)
        return expanded

    def derive_keys(self, master_salt, master_secret):
        """Populate sender_key, recipient_key and common_iv from the algorithm,
        hash function and id_context already configured beforehand, and from
        the passed salt and secret."""

        self.sender_key = self._kdf(
            master_salt, master_secret, self.sender_id, "Key", self.alg_aead
        )
        self.recipient_key = self._kdf(
            master_salt, master_secret, self.recipient_id, "Key", self.alg_aead
        )

        self.common_iv = self._kdf(master_salt, master_secret, b"", "IV")

    # really more of the Credentials interface

    def get_oscore_context_for(self, unprotected):
        """Return a suitable context (most easily self) for an incoming request
        if its unprotected data (COSE_KID, COSE_KID_CONTEXT) fit its
        description. If it doesn't match, it returns None.

        The default implementation just strictly checks for whether kid and any
        kid context match (not matching if a local KID context is set but none
        is given in the request); modes like Group OSCORE can spin up aspect
        objects here.
        """
        if (
            unprotected.get(COSE_KID, None) == self.recipient_id
            and unprotected.get(COSE_KID_CONTEXT, None) == self.id_context
        ):
            return self
        return None
class ReplayWindow:
    """A regular replay window of a fixed size.

    It is implemented as an index and a bitfield (represented by an integer)
    whose least significant bit represents the sequence number of the index,
    and a 1 indicates that a number was seen. No shenanigans around implicit
    leading ones (think floating point normalization) happen.

    >>> w = ReplayWindow(32, lambda: None)
    >>> w.initialize_empty()
    >>> w.strike_out(5)
    >>> w.is_valid(3)
    True
    >>> w.is_valid(5)
    False
    >>> w.strike_out(0)
    >>> w.strike_out(1)
    >>> w.strike_out(2)
    >>> w.is_valid(1)
    False

    Jumping ahead by the window size invalidates older numbers:

    >>> w.is_valid(4)
    True
    >>> w.strike_out(35)
    >>> w.is_valid(4)
    True
    >>> w.strike_out(36)
    >>> w.is_valid(4)
    False

    Usage safety
    ------------

    For every key, the replay window can only be initialized empty once. On
    later uses, it needs to be persisted by storing the output of
    self.persist() somewhere and loaded from that persisted data.

    It is acceptable to store persistence data in the strike_out_callback, but
    that must then ensure that the data is written (flushed to a file or
    committed to a database), but that is usually inefficient.

    Stability
    ---------

    This class is not considered for stabilization yet and an implementation
    detail of the SecurityContext implementation(s).
    """

    _index = None
    """Sequence number represented by the least significant bit of _bitfield"""
    _bitfield = None
    """Integer interpreted as a bitfield, self._size wide. A digit 1 at any bit
    indicates that the bit's index (its power of 2) plus self._index was
    already seen."""

    def __init__(self, size, strike_out_callback):
        """Create an uninitialized window that is ``size`` sequence numbers
        wide; ``strike_out_callback`` is called (without arguments) after
        every successful strike_out."""
        self._size = size
        self.strike_out_callback = strike_out_callback

    def is_initialized(self):
        """Return whether any of the initialize methods has put the window
        into a usable state."""
        return self._index is not None

    def initialize_empty(self):
        """Initialize the window to a state in which no number was seen."""
        self._index = 0
        self._bitfield = 0

    def initialize_from_persisted(self, persisted):
        """Restore the state that was produced by persist().

        The state of a window that was never initialized (index and bitfield
        both None) is accepted, and leaves the window uninitialized.

        Raises KeyError, TypeError or ValueError if ``persisted`` does not
        have the shape produced by persist(); the window is unchanged then.
        """
        index = persisted["index"]
        bitfield = persisted["bitfield"]

        if index is None and bitfield is None:
            self._index = None
            self._bitfield = None
            return

        # Without these checks, broken data would be accepted here and only
        # blow up (or worse, compare oddly) when the window is used.
        for name, value in (("index", index), ("bitfield", bitfield)):
            if isinstance(value, bool) or not isinstance(value, int):
                raise TypeError("Persisted replay window %s is not an integer" % name)
            if value < 0:
                raise ValueError("Persisted replay window %s is negative" % name)

        self._index = index
        self._bitfield = bitfield

    def initialize_from_freshlyseen(self, seen):
        """Initialize the replay window with a particular value that is just
        being observed in a fresh (ie. generated by the peer later than any
        messages processed before state was lost here) message. This marks the
        seen sequence number and all preceding it as invalid, and all later
        ones as valid."""
        self._index = seen
        self._bitfield = 1

    def is_valid(self, number):
        """Return whether ``number`` could still be struck out, ie. it is
        neither older than the window nor marked as seen in it."""
        if number < self._index:
            return False
        if number >= self._index + self._size:
            # Ahead of the window, thus never seen
            return True
        return (self._bitfield >> (number - self._index)) & 1 == 0

    def strike_out(self, number):
        """Mark ``number`` as seen, moving the window ahead if necessary, and
        run the strike_out_callback.

        Raises ValueError if the number is not valid any more."""
        if not self.is_valid(number):
            raise ValueError(
                "Sequence number is not valid any more and "
                "thus can't be removed from the window"
            )
        # Move the window just far enough that number is its highest entry
        overshoot = number - (self._index + self._size - 1)
        if overshoot > 0:
            self._index += overshoot
            self._bitfield >>= overshoot
        assert self.is_valid(number), "Sequence number was not valid before strike-out"
        self._bitfield |= 1 << (number - self._index)

        self.strike_out_callback()

    def persist(self):
        """Return a dict containing internal state which can be passed to
        initialize_from_persisted to recreate the replay window."""

        return {"index": self._index, "bitfield": self._bitfield}
1781class FilesystemSecurityContext(
1782 CanProtect, CanUnprotect, SecurityContextUtils, credentials._Objectish
1784 """Security context stored in a directory as distinct files
1785 containing
1787 * Master secret, master salt, sender and recipient ID,
1788 optionally algorithm, the KDF hash function, and replay window size
1789 (settings.json and secret.json, where the latter is typically readable
1790 only for the user)
1791 * sequence numbers and replay windows (sequence.json, the only file the
1792 process needs write access to)
1794 The static parameters can all either be placed in settings.json or
1795 secret.json, but must not be present in both; the presence of either file
1796 is sufficient.
1798 .. warning::
1800 Security contexts must never be copied around and used after another
1801 copy was used. They should only ever be moved, and if they are copied
1802 (eg. as a part of a system backup), restored contexts must not be used
1803 again; they need to be replaced with freshly created ones.
1805 An additional file named `lock` is created to prevent the accidental use of
1806 a context by two concurrent programs.
1808 Note that the sequence number file is updated in an atomic fashion which
1809 requires file creation privileges in the directory. If privilege separation
1810 between settings/key changes and sequence number changes is desired, one
1811 way to achieve that on Linux is giving the aiocoap process's user group
1812 write permissions on the directory and setting the sticky bit on the
1813 directory, thus forbidding the user to remove the settings/secret files not
1814 owned by him.
1816 Writes due to sent sequence numbers are reduced by applying a variation on
1817 the mechanism of RFC8613 Appendix B.1.1 (incrementing the persisted sender
1818 sequence number in steps of `k`). That value is automatically grown from
1819 sequence_number_chunksize_start up to sequence_number_chunksize_limit.
1820 At runtime, the receive window is not stored but kept indeterminate. In
1821 case of an abnormal shutdown, the server uses the mechanism described in
1822 Appendix B.1.2 to recover.
1823 """
# Class level default for the AEAD algorithm; possibly overridden in the
# constructor, as loading the context assigns the configured algorithm to
# the instance.
#
# Type is ignored because while it *is* AlgAead, mypy can't tell.
alg_aead = algorithms[DEFAULT_ALGORITHM]  # type: ignore
class LoadError(ValueError):
    """Raised, along with a descriptive message, when the security context
    that is being loaded turns out to be faulty."""
def __init__(
    self,
    basedir: str,
    sequence_number_chunksize_start=10,
    sequence_number_chunksize_limit=10000,
):
    """Lock and load the security context stored in ``basedir``.

    ``sequence_number_chunksize_start`` and
    ``sequence_number_chunksize_limit`` are the initial value and the upper
    bound of the step in which the persisted sender sequence number is
    advanced.

    Raises whatever acquiring the lock raises if the context is in use
    already, and LoadError if the stored data is faulty. In either case, no
    lock is held when the constructor has failed."""
    self.basedir = basedir

    # Only set to the lock once it is actually held: No lock, no loading,
    # no need to fail in __del__
    self.lockfile: Optional[filelock.FileLock] = None
    lockfile = filelock.FileLock(os.path.join(basedir, "lock"))
    # 0.001: Just fail if it can't be acquired
    lockfile.acquire(timeout=0.001)
    self.lockfile = lockfile

    try:
        # Always enabled as committing to a file for every received request
        # would be a terrible burden.
        self.echo_recovery = secrets.token_bytes(8)

        try:
            self._load()
        except KeyError as k:
            raise self.LoadError(
                "Configuration key missing: %s" % (k.args[0],)
            ) from k

        self.sequence_number_chunksize_start = sequence_number_chunksize_start
        self.sequence_number_chunksize_limit = sequence_number_chunksize_limit
        self.sequence_number_chunksize = sequence_number_chunksize_start

        self.sequence_number_persisted = self.sender_sequence_number
    except BaseException:
        # A context that failed to load must neither keep the directory
        # locked nor have __del__ run _destroy, which would write the state
        # of the half-constructed object back into the directory.
        self.lockfile = None
        lockfile.release()
        raise
def _load(self):
    """Populate the context from the files in ``self.basedir``.

    The static parameters are read from secret.json and settings.json (each
    of which may be absent); keys ending in ``_hex`` or ``_ascii`` are
    decoded into bytes and stored without that suffix. The sequence number
    and replay window state are read from sequence.json if present.

    Raises LoadError on faulty data. It doesn't check for KeyError on every
    occasion, and relies on __init__ to catch that."""

    settings = {}
    for filename in ("secret.json", "settings.json"):
        try:
            with open(os.path.join(self.basedir, filename)) as infile:
                loaded = json.load(infile)
        except FileNotFoundError:
            continue

        for raw_key, raw_value in loaded.items():
            if raw_key.endswith("_hex"):
                key, value = raw_key[:-4], binascii.unhexlify(raw_value)
            elif raw_key.endswith("_ascii"):
                key, value = raw_key[:-6], raw_value.encode("ascii")
            else:
                key, value = raw_key, raw_value

            if key in settings:
                raise self.LoadError(
                    "Datum %r present in multiple input files at %r."
                    % (key, self.basedir)
                )

            settings[key] = value

    self.alg_aead = algorithms[settings.get("algorithm", DEFAULT_ALGORITHM)]
    self.hashfun = hashfunctions[settings.get("kdf-hashfun", DEFAULT_HASHFUNCTION)]

    windowsize = settings.get("window", DEFAULT_WINDOWSIZE)
    if not isinstance(windowsize, int):
        raise self.LoadError("Non-integer replay window")

    self.sender_id = settings["sender-id"]
    self.recipient_id = settings["recipient-id"]

    longest_id = max(len(self.sender_id), len(self.recipient_id))
    if longest_id > self.alg_aead.iv_bytes - 6:
        raise self.LoadError(
            "Sender or Recipient ID too long (maximum length %s for this algorithm)"
            % (self.alg_aead.iv_bytes - 6)
        )

    master_secret = settings["secret"]
    master_salt = settings.get("salt", b"")
    self.id_context = settings.get("id-context", None)

    self.derive_keys(master_salt, master_secret)

    self.recipient_replay_window = ReplayWindow(
        windowsize, self._replay_window_changed
    )
    try:
        with open(os.path.join(self.basedir, "sequence.json")) as infile:
            sequence = json.load(infile)
    except FileNotFoundError:
        # A context that was never used before starts from scratch
        self.sender_sequence_number = 0
        self.recipient_replay_window.initialize_empty()
        self.replay_window_persisted = True
        return

    self.sender_sequence_number = int(sequence["next-to-send"])
    received = sequence["received"]
    if received == "unknown":
        # The replay window will stay uninitialized, which triggers
        # Echo recovery
        self.replay_window_persisted = False
        return

    try:
        self.recipient_replay_window.initialize_from_persisted(received)
    except (ValueError, TypeError, KeyError):
        # Not being particularly careful about what could go wrong: If
        # someone tampers with the replay data, we're already in *big*
        # trouble, of which I fail to see how it would become worse
        # than a crash inside the application around "failure to
        # right-shift a string" or that like; at worst it'd result in
        # nonce reuse which tampering with the replay window file
        # already does.
        raise self.LoadError("Persisted replay window state was not understood")
    self.replay_window_persisted = True
# This is called internally whenever a new sequence number is taken or
# crossed out from the window, and blocks a lot; B.1 mode mitigates that.
#
# Making it async and block in a threadpool would mitigate the blocking of
# other messages, but the more visible effect of this will be that no
# matter if sync or async, a reply will need to wait for a file sync
# operation to conclude.
def _store(self):
    """Atomically write the persisted sender sequence number and the replay
    window state to sequence.json in ``self.basedir``.

    The data is written into a temporary file in the same directory, synced
    to disk, and then moved over the old file. If anything goes wrong on the
    way, the temporary file is removed again and the error is passed on."""
    # Assembled before the temporary file is created, so that failures in
    # here can not leave a stray file (or file descriptor) behind
    data = {"next-to-send": self.sequence_number_persisted}
    if not self.replay_window_persisted:
        data["received"] = "unknown"
    else:
        data["received"] = self.recipient_replay_window.persist()

    tmphand, tmpnam = tempfile.mkstemp(
        dir=self.basedir, prefix=".sequence-", suffix=".json", text=True
    )

    try:
        # Using io.open (instead os.fdopen) and binary / write with encode
        # rather than dumps as that works even while the interpreter is
        # shutting down.
        #
        # This can be relaxed when there is a defined shutdown sequence for
        # security contexts that's triggered from the general context shutdown
        # -- but right now, there isn't.
        #
        # NOTE(review): the name of the called function was lost in the
        # extraction this code was recovered from; it is restored as io.open
        # based on the comment above -- confirm.
        with io.open(tmphand, "wb") as tmpfile:
            tmpfile.write(json.dumps(data).encode("utf8"))
            tmpfile.flush()
            os.fsync(tmpfile.fileno())

        os.replace(tmpnam, os.path.join(self.basedir, "sequence.json"))
    except BaseException:
        # Best effort cleanup; the error that brought us here is the one
        # that matters to the caller.
        try:
            os.unlink(tmpnam)
        except OSError:
            pass
        raise
def _replay_window_changed(self):
    """React to a modification of the replay window.

    While the replay window is flagged as persisted, the flag is cleared
    and the state is stored once; as long as the flag stays cleared, later
    calls do nothing.
    """
    if not self.replay_window_persisted:
        return

    # Only the first change needs to touch the file: with the flag cleared,
    # the stored state no longer carries the received sequence numbers.
    self.replay_window_persisted = False
    self._store()
def post_seqnoincrease(self):
    """Ensure the sender sequence number in use is covered by the persisted
    one.

    When the sender sequence number has run past the persisted number, the
    persisted number is advanced by the current chunk size, the chunk size
    is doubled (capped at ``sequence_number_chunksize_limit``), and the
    state is stored.
    """
    if self.sequence_number_persisted < self.sender_sequence_number:
        chunk = self.sequence_number_chunksize
        self.sequence_number_persisted += chunk

        doubled = chunk * 2
        self.sequence_number_chunksize = min(
            doubled, self.sequence_number_chunksize_limit
        )
        # FIXME: this blocks
        self._store()

    # The = case would only happen if someone deliberately sets all
    # numbers to 1 to force persisting on every step
    assert self.sequence_number_persisted >= self.sender_sequence_number, (
        "Using a sequence number that has been persisted already"
    )
def _destroy(self):
    """Release the lock file, and ensure that the object has become
    unusable.

    If there is unpersisted state from B.1 operation, the actually used
    number and replay window gets written back to the file to allow
    resumption without wasting digits or round-trips.
    """
    # FIXME: Arrange for a more controlled shutdown through the credentials

    # Write back the exact state: the replay window is flagged as persisted
    # so that it is included in the stored data, and the persisted sequence
    # number is set to the one that is actually next in use.
    self.replay_window_persisted = True
    self.sequence_number_persisted = self.sender_sequence_number
    self._store()

    # Without its keys, the object can not be used any more.
    del self.sender_key
    del self.recipient_key

    # The lock file is removed before the lock on it is released.
    os.unlink(self.lockfile.lock_file)
    self.lockfile.release()

    # Tells __del__ that there is nothing left to clean up.
    self.lockfile = None
def __del__(self):
    """Run ``_destroy`` on finalization unless the lock file is already
    gone (``self.lockfile`` is None)."""
    if self.lockfile is None:
        return
    self._destroy()
@classmethod
def from_item(cls, init_data):
    """Construct the context from a structured credentials item.

    Overriding _Objectish's from_item because the parameter name for
    basedir is contextfile for historical reasons: both spellings are
    accepted here, but exactly one of them has to be given. Using
    ``contextfile`` emits a DeprecationWarning.

    Raises credentials.CredentialsLoadError if both or neither of
    ``basedir`` and ``contextfile`` are present.
    """

    def constructor(
        basedir: Optional[str] = None, contextfile: Optional[str] = None
    ):
        if basedir is not None and contextfile is not None:
            # contextfile is the deprecated spelling, so the advice has to
            # point at basedir.
            raise credentials.CredentialsLoadError(
                "Conflicting arguments basedir and contextfile; just use basedir instead"
            )
        if basedir is None and contextfile is None:
            raise credentials.CredentialsLoadError("Missing item 'basedir'")
        if contextfile is not None:
            warnings.warn(
                "Property contextfile was renamed to basedir in OSCORE credentials entries",
                DeprecationWarning,
                stacklevel=2,
            )
            basedir = contextfile
        assert (
            basedir is not None
        )  # This helps mypy which would otherwise not see that the above ensures this already
        return cls(basedir)

    return credentials._call_from_structureddata(
        constructor, cls.__name__, init_data
    )
def find_all_used_contextless_oscore_kid(self) -> set[bytes]:
    """Return the set containing just this context's recipient ID."""
    return {self.recipient_id}
class GroupContext(ContextWhereExternalAadIsGroup, BaseSecurityContext):
    """Common base of the group security contexts in this module.

    By default, such contexts are signing and send their KID in responses;
    concrete classes need to provide the signing and verification keys.
    """

    is_signing = True
    responses_send_kid = True

    # `abc.abstractproperty` is deprecated since Python 3.3; stacking
    # `property` on top of `abc.abstractmethod` is its documented
    # replacement.
    @property
    @abc.abstractmethod
    def private_key(self):
        """Private key used to sign outgoing messages.

        Contexts not designed to send messages may raise a RuntimeError here;
        that necessity may later go away if some more accurate class modelling
        is found."""

    @property
    @abc.abstractmethod
    def recipient_public_key(self):
        """Public key used to verify incoming messages.

        Contexts not designed to receive messages (because they'd have aspects
        for that) may raise a RuntimeError here; that necessity may later go
        away if some more accurate class modelling is found."""
class SimpleGroupContext(GroupContext, CanProtect, CanUnprotect, SecurityContextUtils):
    """Statically configured security context for an OSCORE group

    This group context can not be persisted, and supports neither a group
    manager nor rekeying; everything is set up once at startup.

    Its intended uses are experimentation and demos, but it aims to be
    correct enough to be usable securely.
    """

    # Populated during initialization (plain attributes here, whereas they
    # might be properties in the base classes)
    sender_id = None  # type: ignore
    id_context = None  # type: ignore
    private_key = None
    alg_aead = None
    hashfun = None  # type: ignore
    alg_signature = None
    alg_group_enc = None
    alg_pairwise_key_agreement = None
    sender_auth_cred = None  # type: ignore
    group_manager_cred = None  # type: ignore
    cred_fmt = None
    # Not evaluated so far, but any interaction with a group manager will
    # need to have this information available.
    group_manager_cred_fmt = None

    def __init__(
        self,
        alg_aead,
        hashfun,
        alg_signature,
        alg_group_enc,
        alg_pairwise_key_agreement,
        group_id,
        master_secret,
        master_salt,
        sender_id,
        private_key,
        sender_auth_cred,
        peers,
        group_manager_cred,
        cred_fmt=COSE_KCCS,
        group_manager_cred_fmt=COSE_KCCS,
    ):
        """Set up the group from static parameters.

        ``peers`` maps the peers' recipient IDs to their authentication
        credentials. A ValueError is raised if the public key in
        ``sender_auth_cred`` does not belong to ``private_key``, or if any
        credential can not be parsed.
        """
        self.sender_id = sender_id
        self.id_context = group_id
        self.private_key = private_key
        self.alg_aead = alg_aead
        self.hashfun = hashfun
        self.alg_signature = alg_signature
        self.alg_group_enc = alg_group_enc
        self.alg_pairwise_key_agreement = alg_pairwise_key_agreement
        self.sender_auth_cred = sender_auth_cred
        self.group_manager_cred = group_manager_cred
        self.cred_fmt = cred_fmt
        self.group_manager_cred_fmt = group_manager_cred_fmt

        self.peers = peers.keys()
        self.recipient_public_keys = {
            peer_id: self._parse_credential(peer_cred)
            for peer_id, peer_cred in peers.items()
        }
        self.recipient_auth_creds = peers

        def empty_window():
            # no need to persist, the whole group is ephemeral
            window = ReplayWindow(32, lambda: None)
            window.initialize_empty()
            return window

        self.recipient_replay_windows = {
            peer_id: empty_window() for peer_id in self.peers
        }

        self.derive_keys(master_salt, master_secret)
        self.sender_sequence_number = 0

        # The credential we announce has to carry the public key that
        # belongs to the private key we sign with.
        claimed_public_key = self._parse_credential(sender_auth_cred)
        own_public_key = self.alg_signature.public_from_private(self.private_key)
        if own_public_key != claimed_public_key:
            raise ValueError(
                "The key in the provided sender credential does not match the private key"
            )

    def _parse_credential(self, credential: bytes):
        """Extract the public key (in the public_key format the respective
        AlgorithmCountersign needs) from credentials. This raises a ValueError
        if the credentials do not match the group's cred_fmt, or if the
        parameters do not match those configured in the group.

        Any information in the credential beyond the key is currently
        discarded. (A future version could return both the key and the
        extracted other data, to be stored with the peer it was parsed from.)
        """

        if self.cred_fmt != COSE_KCCS:
            raise ValueError(
                "Credential parsing is currently only implemented for CCSs"
            )

        assert self.alg_signature is not None

        return self.alg_signature.from_kccs(credential)

    def __repr__(self):
        details = (
            type(self).__name__,
            self.id_context.hex(),
            self.sender_id.hex(),
            len(self.peers),
        )
        return "<%s with group %r sender_id %r and %d peers>" % details

    @property
    def recipient_public_key(self):
        """Not available on the group as a whole; always raises a
        RuntimeError."""
        raise RuntimeError(
            "Group context without key indication was used for verification"
        )

    def _get_sender_key(self, outer_message, aad, plaintext, request_id):
        # If we even get here, there has to be a alg_group_enc, and thus the sender key does match it
        return self._sender_key

    def derive_keys(self, master_salt, master_secret):
        """Derive the sender key, the per-peer recipient keys, the common IV
        and the signature encryption key from the master secret and salt."""
        # The group encryption algorithm takes precedence when configured
        if self.alg_group_enc is None:
            key_alg = self.alg_aead
        else:
            key_alg = self.alg_group_enc

        self._sender_key = self._kdf(
            master_salt, master_secret, self.sender_id, "Key", key_alg
        )

        recipient_keys = {}
        for peer_id in self.peers:
            recipient_keys[peer_id] = self._kdf(
                master_salt, master_secret, peer_id, "Key", key_alg
            )
        self.recipient_keys = recipient_keys

        self.common_iv = self._kdf(master_salt, master_secret, b"", "IV")

        self.signature_encryption_key = self._kdf(
            master_salt, master_secret, b"", "SEKey"
        )

    def post_seqnoincrease(self):
        """No-op because it's ephemeral"""

    def context_from_response(self, unprotected_bag) -> CanUnprotect:
        """Pick the aspect with which a response is to be unprotected, based
        on the KID the server sent and on whether the response carries a
        countersignature."""
        # sender ID *needs to be* here -- if this were a pairwise request, it
        # would not run through here
        try:
            sender_kid = unprotected_bag[COSE_KID]
        except KeyError:
            raise DecodeError("Group server failed to send own sender KID")

        is_signed = COSE_COUNTERSIGNATURE0 in unprotected_bag
        aspect = _GroupContextAspect if is_signed else _PairwiseContextAspect
        return aspect(self, sender_kid)

    def get_oscore_context_for(self, unprotected):
        """Return the aspect responsible for an incoming request with the
        given unprotected fields, or None if its KID context or KID do not
        belong to this group."""
        if unprotected.get(COSE_KID_CONTEXT, None) != self.id_context:
            return None

        kid = unprotected.get(COSE_KID, None)
        if kid not in self.peers:
            return None

        if COSE_COUNTERSIGNATURE0 in unprotected:
            return _GroupContextAspect(self, kid)
        if self.recipient_public_keys[kid] is DETERMINISTIC_KEY:
            return _DeterministicUnprotectProtoAspect(self, kid)
        return _PairwiseContextAspect(self, kid)

    def find_all_used_contextless_oscore_kid(self) -> set[bytes]:
        # not conflicting: groups always send KID Context
        return set()

    # yet to stabilize...

    def pairwise_for(self, recipient_id):
        """Return the pairwise aspect of this group with the given peer."""
        return _PairwiseContextAspect(self, recipient_id)

    def for_sending_deterministic_requests(
        self, deterministic_id, target_server: Optional[bytes]
    ):
        """Return the aspect through which deterministic requests are sent
        under the given ID, optionally limited to one target server."""
        return _DeterministicProtectProtoAspect(self, deterministic_id, target_server)
class _GroupContextAspect(GroupContext, CanUnprotect, SecurityContextUtils):
    """The concrete context this host has with a particular peer

    All actual data lives in the underlying groupcontext; this object merely
    provides access to it (picking the right recipient key).

    It is used for receiving group mode messages from a particular peer. It
    does not send; before it comes to that, it turns into a pairwise context
    through context_for_response.
    """

    def __init__(self, groupcontext: GroupContext, recipient_id: bytes) -> None:
        self.groupcontext = groupcontext
        self.recipient_id = recipient_id

    def __repr__(self):
        peer = self.recipient_id.hex()
        return "<%s inside %r with the peer %r>" % (
            type(self).__name__,
            self.groupcontext,
            peer,
        )

    # This aspect never signs anything
    private_key = None

    # The accessors below are spelled out as properties because mypy would
    # not recognize the equivalent inline lambdas.

    @property
    def id_context(self):
        """ID context of the underlying group."""
        return self.groupcontext.id_context

    @property
    def alg_aead(self):
        """AEAD algorithm of the underlying group."""
        return self.groupcontext.alg_aead

    @property
    def alg_signature(self):
        """Signature algorithm of the underlying group."""
        return self.groupcontext.alg_signature

    @property
    def alg_group_enc(self):
        """Group encryption algorithm of the underlying group."""
        return self.groupcontext.alg_group_enc

    @property
    def alg_pairwise_key_agreement(self):
        """Pairwise key agreement algorithm of the underlying group."""
        return self.groupcontext.alg_pairwise_key_agreement

    @property
    def group_manager_cred(self):
        """Group manager credential of the underlying group."""
        return self.groupcontext.group_manager_cred

    @property
    def common_iv(self):
        """Common IV of the underlying group."""
        return self.groupcontext.common_iv

    @property
    def hashfun(self):
        """Hash function of the underlying group."""
        return self.groupcontext.hashfun

    @property
    def signature_encryption_key(self):
        """Signature encryption key of the underlying group."""
        return self.groupcontext.signature_encryption_key

    @property
    def recipient_key(self):
        # If we even get here, there has to be a alg_group_enc, and thus the recipient key does match it
        return self.groupcontext.recipient_keys[self.recipient_id]

    @property
    def recipient_public_key(self):
        """The group's stored public key of this aspect's peer."""
        return self.groupcontext.recipient_public_keys[self.recipient_id]

    @property
    def recipient_auth_cred(self):
        """The group's stored authentication credential of this aspect's
        peer."""
        return self.groupcontext.recipient_auth_creds[self.recipient_id]

    @property
    def recipient_replay_window(self):
        """The group's replay window for this aspect's peer."""
        return self.groupcontext.recipient_replay_windows[self.recipient_id]

    def context_for_response(self):
        """Return the group's pairwise aspect with the same peer."""
        return self.groupcontext.pairwise_for(self.recipient_id)

    @property
    def sender_auth_cred(self):
        raise RuntimeError(
            "Could relay the sender auth credential from the group context, but it shouldn't matter here"
        )
class _PairwiseContextAspect(
    GroupContext, CanProtect, CanUnprotect, SecurityContextUtils
):
    """Accessor to a group context for pairwise (non-signing) exchanges with
    one particular peer.

    The pairwise sender and recipient keys are derived once at construction
    from the group's keys, both parties' authentication credentials and a
    static-static key agreement; everything else is read from the underlying
    groupcontext.
    """

    is_signing = False

    def __init__(self, groupcontext, recipient_id):
        self.groupcontext = groupcontext
        self.recipient_id = recipient_id

        shared_secret = self.alg_pairwise_key_agreement.staticstatic(
            self.groupcontext.private_key,
            self.groupcontext.recipient_public_keys[recipient_id],
        )

        # The two derivations mirror each other: each uses the respective
        # party's group key as salt, and lists that party's credential first.
        self.sender_key = self._kdf(
            self.groupcontext._sender_key,
            (
                self.groupcontext.sender_auth_cred
                + self.groupcontext.recipient_auth_creds[recipient_id]
                + shared_secret
            ),
            self.groupcontext.sender_id,
            "Key",
            self.alg_group_enc if self.is_signing else self.alg_aead,
        )
        self.recipient_key = self._kdf(
            self.groupcontext.recipient_keys[recipient_id],
            (
                self.groupcontext.recipient_auth_creds[recipient_id]
                + self.groupcontext.sender_auth_cred
                + shared_secret
            ),
            self.recipient_id,
            "Key",
            self.alg_group_enc if self.is_signing else self.alg_aead,
        )

    def __repr__(self):
        return "<%s based on %r with the peer %r>" % (
            type(self).__name__,
            self.groupcontext,
            self.recipient_id.hex(),
        )

    # FIXME: actually, only to be sent in requests

    # not inline because the equivalent lambda would not be recognized by mypy
    @property
    def id_context(self):
        return self.groupcontext.id_context

    @property
    def alg_aead(self):
        return self.groupcontext.alg_aead

    @property
    def hashfun(self):
        return self.groupcontext.hashfun

    @property
    def alg_signature(self):
        return self.groupcontext.alg_signature

    @property
    def alg_group_enc(self):
        return self.groupcontext.alg_group_enc

    @property
    def alg_pairwise_key_agreement(self):
        return self.groupcontext.alg_pairwise_key_agreement

    @property
    def group_manager_cred(self):
        return self.groupcontext.group_manager_cred

    @property
    def common_iv(self):
        return self.groupcontext.common_iv

    @property
    def sender_id(self):
        return self.groupcontext.sender_id

    @property
    def recipient_auth_cred(self):
        return self.groupcontext.recipient_auth_creds[self.recipient_id]

    @property
    def sender_auth_cred(self):
        return self.groupcontext.sender_auth_cred

    @property
    def recipient_replay_window(self):
        return self.groupcontext.recipient_replay_windows[self.recipient_id]

    # Set at initialization (making all those attributes rather than
    # possibly properties as they might be in super)
    recipient_key = None  # type: ignore
    sender_key = None

    @property
    def sender_sequence_number(self):
        """The sender sequence number, which is shared with (and stored in)
        the underlying group context."""
        return self.groupcontext.sender_sequence_number

    @sender_sequence_number.setter
    def sender_sequence_number(self, new):
        self.groupcontext.sender_sequence_number = new

    def post_seqnoincrease(self):
        self.groupcontext.post_seqnoincrease()

    # Not needed because this aspect is not signing. These are explicit
    # properties (rather than reusing post_seqnoincrease as the getter) so
    # that merely reading them does not call into the group context.
    @property
    def private_key(self):
        return None

    @property
    def recipient_public_key(self):
        return None

    def context_from_response(self, unprotected_bag) -> CanUnprotect:
        """Return the context with which to unprotect a response.

        Raises DecodeError if the response names a KID other than the peer
        this aspect was created for.
        """
        if unprotected_bag.get(COSE_KID, self.recipient_id) != self.recipient_id:
            raise DecodeError(
                "Response coming from a different server than requested, not attempting to decrypt"
            )

        if COSE_COUNTERSIGNATURE0 in unprotected_bag:
            # It'd be an odd thing to do, but it's source verified, so the
            # server hopefully has reasons to make this readable to other group
            # members.
            return _GroupContextAspect(self.groupcontext, self.recipient_id)
        else:
            return self
class _DeterministicProtectProtoAspect(
    ContextWhereExternalAadIsGroup, CanProtect, SecurityContextUtils
):
    """This implements the sending side of Deterministic Requests.

    While similar to a _PairwiseContextAspect, it only derives the key at
    protection time, as the plain text is hashed into the key."""

    deterministic_hashfun = hashes.SHA256()

    def __init__(self, groupcontext, sender_id, target_server: Optional[bytes]):
        self.groupcontext = groupcontext
        self.sender_id = sender_id
        self.target_server = target_server

    def __repr__(self):
        return "<%s based on %r with the sender ID %r%s>" % (
            type(self).__name__,
            self.groupcontext,
            self.sender_id.hex(),
            # Leading space so that the note does not run into the sender
            # ID; %r on a 1-tuple formats bytes without a BytesWarning.
            " limited to responses from %r" % (self.target_server,)
            if self.target_server is not None
            else "",
        )

    def new_sequence_number(self):
        """Deterministic requests always use sequence number 0."""
        return 0

    def post_seqnoincrease(self):
        pass

    def context_from_response(self, unprotected_bag):
        """Return the group aspect with which to unprotect a response.

        Raises DecodeError if the responding server can not be determined,
        is not the targeted one, or did not sign its response.
        """
        if self.target_server is None:
            if COSE_KID not in unprotected_bag:
                raise DecodeError(
                    "Server did not send a KID and no particular one was addressed"
                )
        else:
            if unprotected_bag.get(COSE_KID, self.target_server) != self.target_server:
                raise DecodeError(
                    "Response coming from a different server than requested, not attempting to decrypt"
                )

        if COSE_COUNTERSIGNATURE0 not in unprotected_bag:
            # Could just as well pass and later barf when the group context doesn't find a signature
            raise DecodeError(
                "Response to deterministic request came from unsecure pairwise context"
            )

        return _GroupContextAspect(
            self.groupcontext, unprotected_bag.get(COSE_KID, self.target_server)
        )

    def _get_sender_key(self, outer_message, aad, plaintext, request_id):
        """Derive the key for one request from the hash over the base key,
        the AAD and the plaintext.

        As side effects, the hash is set as the outer message's request_hash
        option and on request_id, and the outer code is set to FETCH.
        """
        if outer_message.code.is_response():
            raise RuntimeError("Deterministic contexts shouldn't protect responses")

        basekey = self.groupcontext.recipient_keys[self.sender_id]

        h = hashes.Hash(self.deterministic_hashfun)
        h.update(basekey)
        h.update(aad)
        h.update(plaintext)
        request_hash = h.finalize()

        outer_message.opt.request_hash = request_hash
        outer_message.code = FETCH

        # By this time, the AADs have all been calculated already; setting this
        # for the benefit of the response parsing later
        request_id.request_hash = request_hash
        # FIXME I don't think this ever comes to bear but want to be sure
        # before removing this line (this should only be client-side)
        request_id.can_reuse_nonce = False
        # FIXME: we're still sending a h'00' PIV. Not wrong, just a wasted byte.

        return self._kdf(basekey, request_hash, self.sender_id, "Key", self.alg_aead)

    # details needed for various operations, especially eAAD generation

    # not inline because the equivalent lambda would not be recognized by mypy
    @property
    def alg_aead(self):
        return self.groupcontext.alg_aead

    @property
    def hashfun(self):
        return self.groupcontext.hashfun

    @property
    def common_iv(self):
        return self.groupcontext.common_iv

    @property
    def id_context(self):
        return self.groupcontext.id_context

    @property
    def alg_signature(self):
        return self.groupcontext.alg_signature
class _DeterministicUnprotectProtoAspect(
    ContextWhereExternalAadIsGroup, CanUnprotect, SecurityContextUtils
):
    """This implements the receiving side of Deterministic Requests.

    While similar to a _PairwiseContextAspect, it only derives the key at
    unprotection time, based on information given as Request-Hash."""

    # Unless None, this is the value by which the running process recognizes
    # that the second phase of a B.1.2 replay window recovery Echo option comes
    # from the current process, and thus its sequence number is fresh
    echo_recovery = None

    deterministic_hashfun = hashes.SHA256()

    class ZeroIsAlwaysValid:
        """Special-purpose replay window that accepts 0 indefinitely"""

        def is_initialized(self):
            return True

        def is_valid(self, number):
            # No particular reason to be lax here
            return number == 0

        def strike_out(self, number):
            # FIXME: I'd rather indicate here that it's a potential replay, have the
            # request_id.can_reuse_nonce = False
            # set here rather than in _post_decrypt_checks, and thus also get
            # the check for whether it's a safe method
            pass

        def persist(self):
            pass

    def __init__(self, groupcontext, recipient_id):
        self.groupcontext = groupcontext
        self.recipient_id = recipient_id

        self.recipient_replay_window = self.ZeroIsAlwaysValid()

    def __repr__(self):
        return "<%s based on %r with the recipient ID %r>" % (
            type(self).__name__,
            self.groupcontext,
            self.recipient_id.hex(),
        )

    def context_for_response(self):
        """Responses are protected with the group context itself."""
        return self.groupcontext

    def _get_recipient_key(self, protected_message, algorithm):
        """Derive the key for one request from the group's recipient key and
        the Request-Hash option of the protected message."""
        # This is tracing of a key derivation and not an error condition, so
        # it goes through the cryptography logger at debug level rather than
        # through the root logger at critical level.
        _alglog.debug(
            "Deriving recipient key for protected message %s", protected_message
        )
        return self._kdf(
            self.groupcontext.recipient_keys[self.recipient_id],
            protected_message.opt.request_hash,
            self.recipient_id,
            "Key",
            algorithm,
        )

    def _post_decrypt_checks(self, aad, plaintext, protected_message, request_id):
        """Verify that the decrypted request is safe and that its
        Request-Hash matches the hash calculated over the actual content.

        Raises ProtectionInvalid otherwise. On success, the hash is stored in
        request_id, which is also marked as not allowing nonce reuse.
        """
        if plaintext[0] not in (GET, FETCH):  # FIXME: "is safe"
            # FIXME: accept but return inner Unauthorized. (Raising Unauthorized
            # here would just create an unprotected Unauthorized, which is not
            # what's spec'd for here)
            raise ProtectionInvalid("Request was not safe")

        basekey = self.groupcontext.recipient_keys[self.recipient_id]

        h = hashes.Hash(self.deterministic_hashfun)
        h.update(basekey)
        h.update(aad)
        h.update(plaintext)
        request_hash = h.finalize()

        if request_hash != protected_message.opt.request_hash:
            raise ProtectionInvalid(
                "Client's hash of the plaintext diverges from the actual request hash"
            )

        # This is intended for the protection of the response, and the
        # later use in signature in the unprotect function is not happening
        # here anyway, neither is the later use for Echo requests
        request_id.request_hash = request_hash
        request_id.can_reuse_nonce = False

    # details needed for various operations, especially eAAD generation

    # not inline because the equivalent lambda would not be recognized by mypy
    @property
    def alg_aead(self):
        return self.groupcontext.alg_aead

    @property
    def hashfun(self):
        return self.groupcontext.hashfun

    @property
    def common_iv(self):
        return self.groupcontext.common_iv

    @property
    def id_context(self):
        return self.groupcontext.id_context

    @property
    def alg_signature(self):
        return self.groupcontext.alg_signature
def verify_start(message):
    """Extract the unprotected COSE options from a message, so that the
    verifier can pick the security context with which to actually verify
    the message. (Future versions may also report fields from both
    unprotected and protected, if the protected bag is ever used with
    OSCORE.)

    Call this only on requests; for responses, you'll have to know the
    security context anyway, and there is usually no information to be
    gained."""

    # Only the third of the four extracted items is of interest here
    (_first, _second, unprotected, _fourth) = CanUnprotect._extract_encrypted0(
        message
    )
    return unprotected
2727_getattr__ = deprecation_getattr(
2728 {
2730 "Algorithm": "AeadAlgorithm",
2731 },
2732 globals(),