Coverage for aiocoap / oscore.py: 85%
1220 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-29 12:32 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-29 12:32 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5"""This module contains the tools to send OSCORE secured messages.
7It only deals with the algorithmic parts, the security context and protection
8and unprotection of messages. It does not touch on the integration of OSCORE in
9the larger aiocoap stack of having a context or requests; that's what
10:mod:`aiocoap.transports.osore` is for.`"""
12from __future__ import annotations
14from collections import namedtuple
15import io
16import json
17import binascii
18import os
19import os.path
20import tempfile
21import abc
22from typing import Optional, List, Any, Tuple
23import secrets
24import warnings
25import logging
27from aiocoap.message import Message, Direction
28from aiocoap.util import cryptography_additions, deprecation_getattr, Sentinel
29from aiocoap.numbers import GET, POST, FETCH, CHANGED, UNAUTHORIZED, CONTENT
30from aiocoap import error
31from . import credentials
32from .util import DeprecationWarning
33from aiocoap.defaults import log_secret
35from cryptography.hazmat.primitives.ciphers import aead
36from cryptography.hazmat.primitives.kdf.hkdf import HKDF
37from cryptography.hazmat.primitives import ciphers, hashes
38import cryptography.hazmat.backends
39import cryptography.exceptions
40from cryptography.hazmat.primitives import asymmetric, serialization
41from cryptography.hazmat.primitives.asymmetric.utils import (
42 decode_dss_signature,
43 encode_dss_signature,
44)
46import cbor2 as cbor
48import filelock
50# Logger through which log events from cryptographic operations (both inside
51# the primitives and around key derivation) are traced.
52_alglog = logging.getLogger("aiocoap.cryptography")
54MAX_SEQNO = 2**40 - 1
56# Relevant values from the IANA registry "CBOR Object Signing and Encryption (COSE)"
57COSE_KID = 4
58COSE_PIV = 6
59COSE_KID_CONTEXT = 10
60# from RFC9338
61COSE_COUNTERSIGNATURE0 = 12
62# from RFC9528
63COSE_KCCS = 14
65COMPRESSION_BITS_N = 0b111
66COMPRESSION_BIT_K = 0b1000
67COMPRESSION_BIT_H = 0b10000
68COMPRESSION_BIT_GROUP = 0b100000 # Group Flag from draft-ietf-core-oscore-groupcomm-21
69COMPRESSION_BITS_RESERVED = 0b11000000
71CWT_CLAIM_CNF = 8
72CWT_CNF_COSE_KEY = 1
73COSE_KEY_COMMON_KTY = 1
74COSE_KTY_OKP = 1
75COSE_KTY_EC2 = 2
76COSE_KEY_COMMON_ALG = 3
77COSE_KEY_OKP_CRV = -1
78COSE_KEY_OKP_X = -2
79COSE_KEY_EC2_X = -2
80COSE_KEY_EC2_Y = -3
82# While the original values were simple enough to be used in literals, starting
83# with oscore-groupcomm we're using more compact values
85INFO_TYPE_KEYSTREAM_REQUEST = True
86INFO_TYPE_KEYSTREAM_RESPONSE = False
88PRESENT_BUT_NO_VALUE_YET = Sentinel("Value will be populated later")
91class CodeStyle(namedtuple("_CodeStyle", ("request", "response"))):
92 FETCH_CONTENT: CodeStyle
93 POST_CHANGED: CodeStyle
95 @classmethod
96 def from_request(cls, request) -> CodeStyle:
97 if request == FETCH:
98 return cls.FETCH_CONTENT
99 elif request == POST:
100 return cls.POST_CHANGED
101 else:
102 raise ValueError("Invalid request code %r" % request)
105CodeStyle.FETCH_CONTENT = CodeStyle(FETCH, CONTENT)
106CodeStyle.POST_CHANGED = CodeStyle(POST, CHANGED)
109class _DeterministicKey:
110 """Singleton to indicate that for this key member no public or private key
111 is available because it is the Deterministic Client (see
112 <https://www.ietf.org/archive/id/draft-amsuess-core-cachable-oscore-01.html>)
114 This is highly experimental not only from an implementation but also from a
115 specification point of view. The specification has not received adaequate
116 review that would justify using it in any non-experimental scenario.
117 """
120DETERMINISTIC_KEY = _DeterministicKey()
123class NotAProtectedMessage(error.Error, ValueError):
124 """Raised when verification is attempted on a non-OSCORE message"""
126 def __init__(self, message, plain_message):
127 super().__init__(message)
128 self.plain_message = plain_message
131class ProtectionInvalid(error.Error, ValueError):
132 """Raised when verification of an OSCORE message fails"""
135class DecodeError(ProtectionInvalid):
136 """Raised when verification of an OSCORE message fails because CBOR or compressed data were erroneous"""
139class ReplayError(ProtectionInvalid):
140 """Raised when verification of an OSCORE message fails because the sequence numbers was already used"""
143class ReplayErrorWithEcho(ProtectionInvalid, error.RenderableError):
144 """Raised when verification of an OSCORE message fails because the
145 recipient replay window is uninitialized, but a 4.01 Echo can be
146 constructed with the data in the exception that can lead to the client
147 assisting in replay window recovery"""
149 def __init__(self, secctx, request_id, echo):
150 self.secctx = secctx
151 self.request_id = request_id
152 self.echo = echo
154 def to_message(self):
155 inner = Message(
156 code=UNAUTHORIZED,
157 echo=self.echo,
158 )
159 outer, _ = self.secctx.protect(inner, request_id=self.request_id)
160 return outer
163class ContextUnavailable(error.Error, ValueError):
164 """Raised when a context is (currently or permanently) unavailable for
165 protecting or unprotecting a message"""
168class RequestIdentifiers:
169 """A container for details that need to be passed along from the
170 (un)protection of a request to the (un)protection of the response; these
171 data ensure that the request-response binding process works by passing
172 around the request's partial IV.
174 Users of this module should never create or interact with instances, but
175 just pass them around.
176 """
178 def __init__(self, kid, partial_iv, can_reuse_nonce, request_code):
179 # The sender ID of whoever generated the partial IV
180 self.kid = kid
181 self.partial_iv = partial_iv
182 self.can_reuse_nonce = can_reuse_nonce
183 self.code_style = CodeStyle.from_request(request_code)
185 self.request_hash = None
187 def get_reusable_kid_and_piv(self):
188 """Return the kid and the partial IV if can_reuse_nonce is True, and
189 set can_reuse_nonce to False."""
191 if self.can_reuse_nonce:
192 self.can_reuse_nonce = False
193 return (self.kid, self.partial_iv)
194 else:
195 return (None, None)
198def _xor_bytes(a, b):
199 assert len(a) == len(b), "XOR needs consistent lengths"
200 # FIXME is this an efficient thing to do, or should we store everything
201 # that possibly needs xor'ing as long integers with an associated length?
202 return bytes(_a ^ _b for (_a, _b) in zip(a, b))
205class SymmetricEncryptionAlgorithm(metaclass=abc.ABCMeta):
206 """A symmetric algorithm
208 The algorithm's API is the AEAD API with additional authenticated data: The
209 algorithm may or may not verify that data. Algorithms that actually do
210 verify the data are recognized by also being AeadAlgorithm.
211 """
213 value: int
214 key_bytes: int
215 tag_bytes: int
216 iv_bytes: int
218 @abc.abstractmethod
219 def encrypt(cls, plaintext, aad, key, iv):
220 """Return ciphertext + tag for given input data"""
222 @abc.abstractmethod
223 def decrypt(cls, ciphertext_and_tag, aad, key, iv):
224 """Reverse encryption. Must raise ProtectionInvalid on any error
225 stemming from untrusted data."""
227 @staticmethod
228 def _build_encrypt0_structure(protected, external_aad):
229 assert protected == {}, "Unexpected data in protected bucket"
230 protected_serialized = b"" # were it into an empty dict, it'd be the cbor dump
231 enc_structure = ["Encrypt0", protected_serialized, external_aad]
233 return cbor.dumps(enc_structure)
236class AeadAlgorithm(SymmetricEncryptionAlgorithm, metaclass=abc.ABCMeta):
237 """A symmetric algorithm that provides authentication, including
238 authentication of additional data."""
241class AES_CBC(SymmetricEncryptionAlgorithm, metaclass=abc.ABCMeta):
242 """AES in CBC mode using the Python cryptography library"""
244 tag_bytes = 0
245 # This introduces padding -- this library doesn't need to care because
246 # Python does allocation for us, but others may need to rethink their
247 # buffer allocation strategies.
249 @classmethod
250 def _cipher(cls, key, iv):
251 return ciphers.base.Cipher(
252 ciphers.algorithms.AES(key),
253 ciphers.modes.CBC(iv),
254 )
256 @classmethod
257 def encrypt(cls, plaintext, _aad, key, iv):
258 # FIXME: Ignoring aad violates https://www.rfc-editor.org/rfc/rfc9459.html#name-implementation-consideratio but is required for Group OSCORE
260 # Padding according to https://www.rfc-editor.org/rfc/rfc5652#section-6.3
261 k = cls.key_bytes
262 assert k < 256, (
263 "Algorithm with this key size should not have been created in the first plae"
264 )
265 pad_byte = k - (len(plaintext) % k)
266 pad_bytes = bytes((pad_byte,)) * pad_byte
267 plaintext += pad_bytes
269 encryptor = cls._cipher(key, iv).encryptor()
270 result = encryptor.update(plaintext)
271 result += encryptor.finalize()
272 return result
274 @classmethod
275 def decrypt(cls, ciphertext_and_tag, _aad, key, iv):
276 # FIXME: Ignoring aad violates https://www.rfc-editor.org/rfc/rfc9459.html#name-implementation-consideratio but is required for Group OSCORE
278 k = cls.key_bytes
279 if ciphertext_and_tag == b"" or len(ciphertext_and_tag) % k != 0:
280 raise ProtectionInvalid("Message length does not match padding")
282 decryptor = cls._cipher(key, iv).decryptor()
283 result = decryptor.update(ciphertext_and_tag)
284 result += decryptor.finalize()
286 # Padding according to https://www.rfc-editor.org/rfc/rfc5652#section-6.3
287 claimed_padding = result[-1]
288 if claimed_padding == 0 or claimed_padding > k:
289 raise ProtectionInvalid("Padding does not match key")
290 if result[-claimed_padding:] != bytes((claimed_padding,)) * claimed_padding:
291 raise ProtectionInvalid("Padding is inconsistent")
293 return result[:-claimed_padding]
296class A128CBC(AES_CBC):
297 # from RFC9459
298 value = -65531
299 key_bytes = 16 # 128-bit key
300 iv_bytes = 16 # 16-octet nonce
303class AES_CCM(AeadAlgorithm, metaclass=abc.ABCMeta):
304 """AES-CCM implemented using the Python cryptography library"""
306 @classmethod
307 def encrypt(cls, plaintext, aad, key, iv):
308 return aead.AESCCM(key, cls.tag_bytes).encrypt(iv, plaintext, aad)
310 @classmethod
311 def decrypt(cls, ciphertext_and_tag, aad, key, iv):
312 try:
313 return aead.AESCCM(key, cls.tag_bytes).decrypt(iv, ciphertext_and_tag, aad)
314 except cryptography.exceptions.InvalidTag:
315 raise ProtectionInvalid("Tag invalid")
318class AES_CCM_16_64_128(AES_CCM):
319 # from RFC8152 and draft-ietf-core-object-security-0[012] 3.2.1
320 value = 10
321 key_bytes = 16 # 128-bit key
322 tag_bytes = 8 # 64-bit tag
323 iv_bytes = 13 # 13-byte nonce
326class AES_CCM_16_64_256(AES_CCM):
327 # from RFC8152
328 value = 11
329 key_bytes = 32 # 256-bit key
330 tag_bytes = 8 # 64-bit tag
331 iv_bytes = 13 # 13-byte nonce
334class AES_CCM_64_64_128(AES_CCM):
335 # from RFC8152
336 value = 12
337 key_bytes = 16 # 128-bit key
338 tag_bytes = 8 # 64-bit tag
339 iv_bytes = 7 # 7-byte nonce
342class AES_CCM_64_64_256(AES_CCM):
343 # from RFC8152
344 value = 13
345 key_bytes = 32 # 256-bit key
346 tag_bytes = 8 # 64-bit tag
347 iv_bytes = 7 # 7-byte nonce
350class AES_CCM_16_128_128(AES_CCM):
351 # from RFC8152
352 value = 30
353 key_bytes = 16 # 128-bit key
354 tag_bytes = 16 # 128-bit tag
355 iv_bytes = 13 # 13-byte nonce
358class AES_CCM_16_128_256(AES_CCM):
359 # from RFC8152
360 value = 31
361 key_bytes = 32 # 256-bit key
362 tag_bytes = 16 # 128-bit tag
363 iv_bytes = 13 # 13-byte nonce
366class AES_CCM_64_128_128(AES_CCM):
367 # from RFC8152
368 value = 32
369 key_bytes = 16 # 128-bit key
370 tag_bytes = 16 # 128-bit tag
371 iv_bytes = 7 # 7-byte nonce
374class AES_CCM_64_128_256(AES_CCM):
375 # from RFC8152
376 value = 33
377 key_bytes = 32 # 256-bit key
378 tag_bytes = 16 # 128-bit tag
379 iv_bytes = 7 # 7-byte nonce
382class AES_GCM(AeadAlgorithm, metaclass=abc.ABCMeta):
383 """AES-GCM implemented using the Python cryptography library"""
385 iv_bytes = 12 # 96 bits fixed size of the nonce
387 @classmethod
388 def encrypt(cls, plaintext, aad, key, iv):
389 return aead.AESGCM(key).encrypt(iv, plaintext, aad)
391 @classmethod
392 def decrypt(cls, ciphertext_and_tag, aad, key, iv):
393 try:
394 return aead.AESGCM(key).decrypt(iv, ciphertext_and_tag, aad)
395 except cryptography.exceptions.InvalidTag:
396 raise ProtectionInvalid("Tag invalid")
399class A128GCM(AES_GCM):
400 # from RFC8152
401 value = 1
402 key_bytes = 16 # 128-bit key
403 tag_bytes = 16 # 128-bit tag
406class A192GCM(AES_GCM):
407 # from RFC8152
408 value = 2
409 key_bytes = 24 # 192-bit key
410 tag_bytes = 16 # 128-bit tag
413class A256GCM(AES_GCM):
414 # from RFC8152
415 value = 3
416 key_bytes = 32 # 256-bit key
417 tag_bytes = 16 # 128-bit tag
420class ChaCha20Poly1305(AeadAlgorithm):
421 # from RFC8152
422 value = 24
423 key_bytes = 32 # 256-bit key
424 tag_bytes = 16 # 128-bit tag
425 iv_bytes = 12 # 96-bit nonce
427 @classmethod
428 def encrypt(cls, plaintext, aad, key, iv):
429 return aead.ChaCha20Poly1305(key).encrypt(iv, plaintext, aad)
431 @classmethod
432 def decrypt(cls, ciphertext_and_tag, aad, key, iv):
433 try:
434 return aead.ChaCha20Poly1305(key).decrypt(iv, ciphertext_and_tag, aad)
435 except cryptography.exceptions.InvalidTag:
436 raise ProtectionInvalid("Tag invalid")
439class AlgorithmCountersign(metaclass=abc.ABCMeta):
440 """A fully parameterized COSE countersign algorithm
442 An instance is able to provide all the alg_signature, par_countersign and
443 par_countersign_key parameters that go into the Group OSCORE algorithms
444 field.
445 """
447 value: int | str
449 @abc.abstractmethod
450 def sign(self, body, external_aad, private_key):
451 """Return the signature produced by the key when using
452 CounterSignature0 as describe in draft-ietf-cose-countersign-01"""
454 @abc.abstractmethod
455 def verify(self, signature, body, external_aad, public_key):
456 """Verify a signature in analogy to sign"""
458 @abc.abstractmethod
459 def generate_with_ccs(self) -> Tuple[Any, bytes]:
460 """Return a usable private key along with a CCS describing it"""
462 @abc.abstractmethod
463 def public_from_private(self, private_key):
464 """Given a private key, derive the publishable key"""
466 @abc.abstractmethod
467 def from_kccs(self, ccs: bytes) -> Any:
468 """Given a CCS, extract the public key, or raise a ValueError if the
469 credential format does not align with the type.
471 The type is not exactly Any, but whichever type is used by this
472 algorithm class."""
474 @staticmethod
475 def _build_countersign_structure(body, external_aad):
476 countersign_structure = [
477 "CounterSignature0",
478 b"",
479 b"",
480 external_aad,
481 body,
482 ]
483 tobesigned = cbor.dumps(countersign_structure)
484 return tobesigned
486 @abc.abstractproperty
487 def signature_length(self) -> int:
488 """The length of a signature using this algorithm"""
490 @abc.abstractproperty
491 def curve_number(self) -> int:
492 """Registered curve number used with this algorithm.
494 Only used for verification of credentials' details"""
497class AlgorithmStaticStatic(metaclass=abc.ABCMeta):
498 @abc.abstractmethod
499 def staticstatic(self, private_key, public_key):
500 """Derive a shared static-static secret from a private and a public key"""
503def _from_kccs_common(ccs: bytes) -> dict:
504 """Check that the CCS contains a CNF claim that is a COSE Key, and return
505 that key"""
507 try:
508 parsed = cbor.loads(ccs)
509 except cbor.CBORDecodeError as e:
510 raise ValueError("CCS not in CBOR format") from e
512 if (
513 not isinstance(parsed, dict)
514 or CWT_CLAIM_CNF not in parsed
515 or not isinstance(parsed[CWT_CLAIM_CNF], dict)
516 or CWT_CNF_COSE_KEY not in parsed[CWT_CLAIM_CNF]
517 or not isinstance(parsed[CWT_CLAIM_CNF][CWT_CNF_COSE_KEY], dict)
518 ):
519 raise ValueError("CCS must contain a COSE Key dict in a CNF")
521 return parsed[CWT_CLAIM_CNF][CWT_CNF_COSE_KEY]
524class Ed25519(AlgorithmCountersign):
525 def sign(self, body, aad, private_key):
526 _alglog.debug("Performing signature:")
527 _alglog.debug("* body: %s", body.hex())
528 _alglog.debug("* AAD: %s", aad.hex())
529 private_key = asymmetric.ed25519.Ed25519PrivateKey.from_private_bytes(
530 private_key
531 )
532 return private_key.sign(self._build_countersign_structure(body, aad))
534 def verify(self, signature, body, aad, public_key):
535 _alglog.debug("Verifying signature:")
536 _alglog.debug("* body: %s", body.hex())
537 _alglog.debug("* AAD: %s", aad.hex())
538 public_key = asymmetric.ed25519.Ed25519PublicKey.from_public_bytes(public_key)
539 try:
540 public_key.verify(signature, self._build_countersign_structure(body, aad))
541 except cryptography.exceptions.InvalidSignature:
542 _alglog.debug("Signature was invalid.")
543 raise ProtectionInvalid("Signature mismatch")
545 def _generate(self):
546 key = asymmetric.ed25519.Ed25519PrivateKey.generate()
547 # FIXME: We could avoid handing the easy-to-misuse bytes around if the
548 # current algorithm interfaces did not insist on passing the
549 # exchangeable representations -- and generally that should be more
550 # efficient.
551 return key.private_bytes(
552 encoding=serialization.Encoding.Raw,
553 format=serialization.PrivateFormat.Raw,
554 encryption_algorithm=serialization.NoEncryption(),
555 )
557 def generate_with_ccs(self) -> Tuple[Any, bytes]:
558 private = self._generate()
559 public = self.public_from_private(private)
561 ccs = cbor.dumps(
562 {
563 CWT_CLAIM_CNF: {
564 CWT_CNF_COSE_KEY: {
565 COSE_KEY_COMMON_KTY: COSE_KTY_OKP,
566 COSE_KEY_COMMON_ALG: self.value,
567 COSE_KEY_OKP_CRV: self.curve_number,
568 COSE_KEY_OKP_X: public,
569 }
570 }
571 }
572 )
574 return (private, ccs)
576 def public_from_private(self, private_key):
577 private_key = asymmetric.ed25519.Ed25519PrivateKey.from_private_bytes(
578 private_key
579 )
580 public_key = private_key.public_key()
581 return public_key.public_bytes(
582 encoding=serialization.Encoding.Raw,
583 format=serialization.PublicFormat.Raw,
584 )
586 def from_kccs(self, ccs: bytes) -> Any:
587 # eg. {1: 1, 3: -8, -1: 6, -2: h'77 ... 88'}
588 cose_key = _from_kccs_common(ccs)
590 if (
591 cose_key.get(COSE_KEY_COMMON_KTY) == COSE_KTY_OKP
592 and cose_key.get(COSE_KEY_COMMON_ALG) == self.value
593 and cose_key.get(COSE_KEY_OKP_CRV) == self.curve_number
594 and COSE_KEY_OKP_X in cose_key
595 ):
596 return cose_key[COSE_KEY_OKP_X]
597 else:
598 raise ValueError("Key type not recognized from CCS key %r" % cose_key)
600 value = -8
601 curve_number = 6
603 signature_length = 64
606class EcdhSsHkdf256(AlgorithmStaticStatic):
607 # FIXME: This class uses the Edwards keys as private and public keys, and
608 # not the converted ones. This will be problematic if pairwise-only
609 # contexts are to be set up.
611 value = -27
613 # FIXME these two will be different when using the Montgomery keys directly
615 # This one will only be used when establishing and distributing pairwise-only keys
616 public_from_private = Ed25519.public_from_private
618 def staticstatic(self, private_key, public_key):
619 private_key = asymmetric.ed25519.Ed25519PrivateKey.from_private_bytes(
620 private_key
621 )
622 private_key = cryptography_additions.sk_to_curve25519(private_key)
624 public_key = asymmetric.ed25519.Ed25519PublicKey.from_public_bytes(public_key)
625 public_key = cryptography_additions.pk_to_curve25519(public_key)
627 return private_key.exchange(public_key)
630class ECDSA_SHA256_P256(AlgorithmCountersign, AlgorithmStaticStatic):
631 # Trying a new construction approach -- should work just as well given
632 # we're just passing Python objects around
633 def from_public_parts(self, x: bytes, y: bytes):
634 """Create a public key from its COSE values"""
635 return asymmetric.ec.EllipticCurvePublicNumbers(
636 int.from_bytes(x, "big"),
637 int.from_bytes(y, "big"),
638 asymmetric.ec.SECP256R1(),
639 ).public_key()
641 def from_kccs(self, ccs: bytes) -> Any:
642 cose_key = _from_kccs_common(ccs)
644 if (
645 cose_key.get(COSE_KEY_COMMON_KTY) == COSE_KTY_EC2
646 and cose_key.get(COSE_KEY_COMMON_ALG) == self.value
647 and COSE_KEY_EC2_X in cose_key
648 and COSE_KEY_EC2_Y in cose_key
649 ):
650 return self.from_public_parts(
651 x=cose_key[COSE_KEY_EC2_X],
652 y=cose_key[COSE_KEY_EC2_Y],
653 )
654 else:
655 raise ValueError("Key type not recognized from CCS key %r" % cose_key)
657 def from_private_parts(self, x: bytes, y: bytes, d: bytes):
658 public_numbers = self.from_public_parts(x, y).public_numbers()
659 private_numbers = asymmetric.ec.EllipticCurvePrivateNumbers(
660 int.from_bytes(d, "big"), public_numbers
661 )
662 return private_numbers.private_key()
664 def sign(self, body, aad, private_key):
665 der_signature = private_key.sign(
666 self._build_countersign_structure(body, aad),
667 asymmetric.ec.ECDSA(hashes.SHA256()),
668 )
669 (r, s) = decode_dss_signature(der_signature)
671 return r.to_bytes(32, "big") + s.to_bytes(32, "big")
673 def verify(self, signature, body, aad, public_key):
674 r = signature[:32]
675 s = signature[32:]
676 r = int.from_bytes(r, "big")
677 s = int.from_bytes(s, "big")
678 der_signature = encode_dss_signature(r, s)
679 try:
680 public_key.verify(
681 der_signature,
682 self._build_countersign_structure(body, aad),
683 asymmetric.ec.ECDSA(hashes.SHA256()),
684 )
685 except cryptography.exceptions.InvalidSignature:
686 raise ProtectionInvalid("Signature mismatch")
688 def _generate(self):
689 return asymmetric.ec.generate_private_key(asymmetric.ec.SECP256R1())
691 def generate_with_ccs(self) -> Tuple[Any, bytes]:
692 private = self._generate()
693 public = self.public_from_private(private)
694 # FIXME: Deduplicate with edhoc.py
695 x = public.public_numbers().x.to_bytes(32, "big")
696 y = public.public_numbers().y.to_bytes(32, "big")
698 ccs = cbor.dumps(
699 {
700 CWT_CLAIM_CNF: {
701 CWT_CNF_COSE_KEY: {
702 COSE_KEY_COMMON_KTY: COSE_KTY_EC2,
703 COSE_KEY_COMMON_ALG: self.value,
704 COSE_KEY_EC2_X: x,
705 COSE_KEY_EC2_Y: y,
706 }
707 }
708 }
709 )
711 return (private, ccs)
713 def public_from_private(self, private_key):
714 return private_key.public_key()
716 def staticstatic(self, private_key, public_key):
717 return private_key.exchange(asymmetric.ec.ECDH(), public_key)
719 value = -7 # FIXME: when used as a static-static algorithm, does this become -27? see shepherd review.
720 curve_number = 1
722 signature_length = 64
725algorithms = {
726 "AES-CCM-16-64-128": AES_CCM_16_64_128(),
727 "AES-CCM-16-64-256": AES_CCM_16_64_256(),
728 "AES-CCM-64-64-128": AES_CCM_64_64_128(),
729 "AES-CCM-64-64-256": AES_CCM_64_64_256(),
730 "AES-CCM-16-128-128": AES_CCM_16_128_128(),
731 "AES-CCM-16-128-256": AES_CCM_16_128_256(),
732 "AES-CCM-64-128-128": AES_CCM_64_128_128(),
733 "AES-CCM-64-128-256": AES_CCM_64_128_256(),
734 "ChaCha20/Poly1305": ChaCha20Poly1305(),
735 "A128GCM": A128GCM(),
736 "A192GCM": A192GCM(),
737 "A256GCM": A256GCM(),
738 "A128CBC": A128CBC(),
739}
741# algorithms with full parameter set
742algorithms_countersign = {
743 # maybe needs a different name...
744 "EdDSA on Ed25519": Ed25519(),
745 "ECDSA w/ SHA-256 on P-256": ECDSA_SHA256_P256(),
746}
748algorithms_staticstatic = {
749 "ECDH-SS + HKDF-256": EcdhSsHkdf256(),
750}
752DEFAULT_ALGORITHM = "AES-CCM-16-64-128"
754_hash_backend = cryptography.hazmat.backends.default_backend()
755hashfunctions = {
756 "sha256": hashes.SHA256(),
757 "sha384": hashes.SHA384(),
758 "sha512": hashes.SHA512(),
759}
761DEFAULT_HASHFUNCTION = "sha256"
763DEFAULT_WINDOWSIZE = 32
766class BaseSecurityContext:
767 # Deprecated marker for whether the class uses the
768 # ContextWhereExternalAadIsGroup mixin; see documentation there.
769 external_aad_is_group = False
771 # Authentication information carried with this security context; managed
772 # externally by whatever creates the security context.
773 authenticated_claims: List[str] = []
775 #: AEAD algorithm. This may be None if it is not set in an OSCORE group context.
776 alg_aead: Optional[AeadAlgorithm]
778 #: The common IV of the common context.
779 #:
780 #: This may be longer than needed for constructing IVs with any particular
781 #: algorithm, as per <https://www.ietf.org/archive/id/draft-ietf-core-oscore-groupcomm-23.html#section-2.1.4>
782 common_iv: bytes
784 id_context: Optional[bytes]
786 @property
787 def algorithm(self):
788 warnings.warn(
789 "Property was renamed to 'alg_aead'", DeprecationWarning, stacklevel=2
790 )
791 return self.alg_aead
793 @algorithm.setter
794 def algorithm(self, value):
795 warnings.warn(
796 "Property was renamed to 'alg_aead'", DeprecationWarning, stacklevel=2
797 )
798 self.alg_aead = value
800 hashfun: hashes.HashAlgorithm
802 def _construct_nonce(
803 self, partial_iv_short, piv_generator_id, alg: SymmetricEncryptionAlgorithm
804 ):
805 pad_piv = b"\0" * (5 - len(partial_iv_short))
807 s = bytes([len(piv_generator_id)])
808 pad_id = b"\0" * (alg.iv_bytes - 6 - len(piv_generator_id))
810 components = s + pad_id + piv_generator_id + pad_piv + partial_iv_short
812 used_common_iv = self.common_iv[: len(components)]
813 nonce = _xor_bytes(used_common_iv, components)
814 _alglog.debug(
815 "Nonce construction: common %s ^ components %s = %s",
816 self.common_iv.hex(),
817 components.hex(),
818 nonce.hex(),
819 )
821 return nonce
823 def _extract_external_aad(
824 self, message, request_id, local_is_sender: bool
825 ) -> bytes:
826 """Build the serialized external AAD from information in the message
827 and the request_id.
829 Information about whether the local context is the sender of the
830 message is only relevant to group contexts, where it influences whose
831 authentication credentials are placed in the AAD.
832 """
833 # If any option were actually Class I, it would be something like
834 #
835 # the_options = pick some of(message)
836 # class_i_options = Message(the_options).opt.encode()
838 oscore_version = 1
839 class_i_options = b""
840 if request_id.request_hash is not None:
841 class_i_options = Message(request_hash=request_id.request_hash).opt.encode()
843 algorithms: List[int | str | None] = [
844 None if self.alg_aead is None else self.alg_aead.value
845 ]
846 if isinstance(self, ContextWhereExternalAadIsGroup):
847 algorithms.append(
848 None if self.alg_group_enc is None else self.alg_group_enc.value
849 )
850 algorithms.append(
851 None if self.alg_signature is None else self.alg_signature.value
852 )
853 algorithms.append(
854 None
855 if self.alg_pairwise_key_agreement is None
856 else self.alg_pairwise_key_agreement.value
857 )
859 external_aad = [
860 oscore_version,
861 algorithms,
862 request_id.kid,
863 request_id.partial_iv,
864 class_i_options,
865 ]
867 if isinstance(self, ContextWhereExternalAadIsGroup):
868 # FIXME: We may need to carry this over in the request_id when
869 # observation span group rekeyings
870 external_aad.append(self.id_context)
872 assert message.opt.oscore is not None, "Double OSCORE"
873 external_aad.append(message.opt.oscore)
875 if local_is_sender:
876 external_aad.append(self.sender_auth_cred)
877 else:
878 external_aad.append(self.recipient_auth_cred)
879 external_aad.append(self.group_manager_cred)
881 return cbor.dumps(external_aad)
884class ContextWhereExternalAadIsGroup(BaseSecurityContext):
885 """The protection and unprotection functions will use the Group OSCORE AADs
886 rather than the regular OSCORE AADs iff a context uses this mixin. (Ie.
887 alg_group_enc etc are added to the algorithms, and request_kid_context,
888 OSCORE_option, sender_auth_cred and gm_cred are added).
890 This does not necessarily match the is_signing property (as pairwise
891 contexts use this but don't sign), and is distinct from the added OSCORE
892 option in the AAD (as that's only applicable for the external AAD as
893 extracted for signing and signature verification purposes)."""
895 id_context: bytes
897 external_aad_is_group = True
899 alg_group_enc: Optional[SymmetricEncryptionAlgorithm]
900 alg_signature: Optional[AlgorithmCountersign]
901 # This is also of type AlgorithmCountersign because the staticstatic
902 # function is sitting on the same type.
903 alg_pairwise_key_agreement: Optional[AlgorithmCountersign]
905 sender_auth_cred: bytes
906 recipient_auth_cred: bytes
907 group_manager_cred: bytes
910# FIXME pull interface components from SecurityContext up here
911class CanProtect(BaseSecurityContext, metaclass=abc.ABCMeta):
912 # The protection function will add a signature according to the context's
913 # alg_signature attribute if this is true
914 is_signing = False
916 # Send the KID when protecting responses
917 #
918 # Once group pairwise mode is implemented, this will need to become a
919 # parameter to protect(), which is stored at the point where the incoming
920 # context is turned into an outgoing context. (Currently, such a mechanism
921 # isn't there yet, and oscore_wrapper protects responses with the very same
922 # context they came in on).
923 responses_send_kid = False
925 #: The KID sent by this party when sending requests, or answering to group
926 #: requests.
927 sender_id: bytes
929 @staticmethod
930 def _compress(protected, unprotected, ciphertext):
931 """Pack the untagged COSE_Encrypt0 object described by the *args
932 into two bytestrings suitable for the Object-Security option and the
933 message body"""
935 if protected:
936 raise RuntimeError(
937 "Protection produced a message that has uncompressable fields."
938 )
940 piv = unprotected.pop(COSE_PIV, b"")
941 if len(piv) > COMPRESSION_BITS_N:
942 raise ValueError("Can't encode overly long partial IV")
944 firstbyte = len(piv)
945 if COSE_KID in unprotected:
946 firstbyte |= COMPRESSION_BIT_K
947 kid_data = unprotected.pop(COSE_KID)
948 else:
949 kid_data = b""
951 if COSE_KID_CONTEXT in unprotected:
952 firstbyte |= COMPRESSION_BIT_H
953 kid_context = unprotected.pop(COSE_KID_CONTEXT)
954 s = len(kid_context)
955 if s > 255:
956 raise ValueError("KID Context too long")
957 s_kid_context = bytes((s,)) + kid_context
958 else:
959 s_kid_context = b""
961 if COSE_COUNTERSIGNATURE0 in unprotected:
962 firstbyte |= COMPRESSION_BIT_GROUP
964 unprotected.pop(COSE_COUNTERSIGNATURE0)
966 # ciphertext will eventually also get the countersignature, but
967 # that happens later when the option is already processed.
969 if unprotected:
970 raise RuntimeError(
971 "Protection produced a message that has uncompressable fields."
972 )
974 if firstbyte:
975 option = bytes([firstbyte]) + piv + s_kid_context + kid_data
976 else:
977 option = b""
979 return (option, ciphertext)
981 def protect(self, message, request_id=None, *, kid_context=True):
982 """Given a plain CoAP message, create a protected message that contains
983 message's options in the inner or outer CoAP message as described in
984 OSCOAP.
986 If the message is a response to a previous message, the additional data
987 from unprotecting the request are passed in as request_id. When
988 request data is present, its partial IV is reused if possible. The
989 security context's ID context is encoded in the resulting message
990 unless kid_context is explicitly set to a False; other values for the
991 kid_context can be passed in as byte string in the same parameter.
992 """
994 _alglog.debug(
995 "Protecting message %s with context %s and request ID %s",
996 message,
997 self,
998 request_id,
999 )
1001 assert (request_id is None) == message.code.is_request(), (
1002 "Requestishness of code to protect does not match presence of request ID"
1003 )
1005 assert message.direction is Direction.OUTGOING
1007 outer_message, plaintext = self._split_message(message, request_id)
1009 outer_message.direction = Direction.OUTGOING
1010 # There are currently no properties that relate to OSCORE that'd need to be discarded.
1011 outer_message.transport_tuning = message.transport_tuning
1013 protected = {}
1014 nonce = None
1015 partial_iv_generated_by = None
1016 unprotected = {}
1017 if request_id is not None:
1018 partial_iv_generated_by, partial_iv_short = (
1019 request_id.get_reusable_kid_and_piv()
1020 )
1022 alg_symmetric = self.alg_group_enc if self.is_signing else self.alg_aead
1023 assert isinstance(alg_symmetric, AeadAlgorithm) or self.is_signing, (
1024 "Non-AEAD algorithms can only be used in signing modes."
1025 )
1027 if partial_iv_generated_by is None:
1028 nonce, partial_iv_short = self._build_new_nonce(alg_symmetric)
1029 partial_iv_generated_by = self.sender_id
1031 unprotected[COSE_PIV] = partial_iv_short
1032 else:
1033 nonce = self._construct_nonce(
1034 partial_iv_short, partial_iv_generated_by, alg_symmetric
1035 )
1037 if message.code.is_request():
1038 unprotected[COSE_KID] = self.sender_id
1040 request_id = RequestIdentifiers(
1041 self.sender_id,
1042 partial_iv_short,
1043 can_reuse_nonce=None,
1044 request_code=outer_message.code,
1045 )
1047 if kid_context is True:
1048 if self.id_context is not None:
1049 unprotected[COSE_KID_CONTEXT] = self.id_context
1050 elif kid_context is not False:
1051 unprotected[COSE_KID_CONTEXT] = kid_context
1052 else:
1053 if self.responses_send_kid:
1054 unprotected[COSE_KID] = self.sender_id
1056 # Putting in a dummy value as the signature calculation will already need some of the compression result
1057 if self.is_signing:
1058 unprotected[COSE_COUNTERSIGNATURE0] = b""
1059 # FIXME: Running this twice quite needlessly (just to get the oscore option for sending)
1060 option_data, _ = self._compress(protected, unprotected, b"")
1062 outer_message.opt.oscore = option_data
1064 external_aad = self._extract_external_aad(
1065 outer_message, request_id, local_is_sender=True
1066 )
1068 aad = SymmetricEncryptionAlgorithm._build_encrypt0_structure(
1069 protected, external_aad
1070 )
1072 key = self._get_sender_key(outer_message, external_aad, plaintext, request_id)
1074 _alglog.debug("Encrypting Encrypt0:")
1075 _alglog.debug("* aad = %s", aad.hex())
1076 _alglog.debug("* nonce = %s", nonce.hex())
1077 _alglog.debug("* key = %s", log_secret(key.hex()))
1078 _alglog.debug("* algorithm = %s", alg_symmetric)
1079 ciphertext = alg_symmetric.encrypt(plaintext, aad, key, nonce)
1081 _alglog.debug("Produced ciphertext %s", ciphertext.hex())
1083 _, payload = self._compress(protected, unprotected, ciphertext)
1085 if self.is_signing:
1086 signature = self.alg_signature.sign(payload, external_aad, self.private_key)
1087 # This is bordering "it's OK to log it in plain", because a reader
1088 # of the log can access both the plaintext and the ciphertext, but
1089 # still, it is called a key.
1090 _alglog.debug(
1091 "Producing keystream from signature encryption key: %s",
1092 log_secret(self.signature_encryption_key.hex()),
1093 )
1094 keystream = self._kdf_for_keystreams(
1095 partial_iv_generated_by,
1096 partial_iv_short,
1097 self.signature_encryption_key,
1098 self.sender_id,
1099 INFO_TYPE_KEYSTREAM_REQUEST
1100 if message.code.is_request()
1101 else INFO_TYPE_KEYSTREAM_RESPONSE,
1102 )
1103 _alglog.debug("Keystream is %s", keystream.hex())
1104 encrypted_signature = _xor_bytes(signature, keystream)
1105 _alglog.debug("Encrypted signature %s", encrypted_signature.hex())
1106 payload += encrypted_signature
1107 outer_message.payload = payload
1109 # FIXME go through options section
1111 _alglog.debug(
1112 "Protecting the message succeeded, yielding ciphertext %s and request ID %s",
1113 outer_message,
1114 request_id,
1115 )
1116 # the request_id in the second argument should be discarded by the
1117 # caller when protecting a response -- is that reason enough for an
1118 # `if` and returning None?
1119 return outer_message, request_id
1121 def _get_sender_key(self, outer_message, aad, plaintext, request_id):
1122 """Customization hook of the protect function
1124 While most security contexts have a fixed sender key, deterministic
1125 requests need to shake up a few things. They need to modify the outer
1126 message, as well as the request_id as it will later be used to
1127 unprotect the response."""
1128 return self.sender_key
1130 def _split_message(self, message, request_id):
1131 """Given a protected message, return the outer message that contains
1132 all Class I and Class U options (but without payload or Object-Security
1133 option), and the encoded inner message that contains all Class E
1134 options and the payload.
1136 This leaves the messages' remotes unset."""
1138 if message.code.is_request():
1139 outer_host = message.opt.uri_host
1140 proxy_uri = message.opt.proxy_uri
1142 inner_message = message.copy(
1143 uri_host=None,
1144 uri_port=None,
1145 proxy_uri=None,
1146 proxy_scheme=None,
1147 )
1148 inner_message.remote = None
1150 if proxy_uri is not None:
1151 # Use set_request_uri to split up the proxy URI into its
1152 # components; extract, preserve and clear them.
1153 inner_message.set_request_uri(proxy_uri, set_uri_host=False)
1154 if inner_message.opt.proxy_uri is not None:
1155 raise ValueError("Can not split Proxy-URI into options")
1156 outer_uri = inner_message.remote.uri_base
1157 inner_message.remote = None
1158 inner_message.opt.proxy_scheme = None
1160 if message.opt.observe is None:
1161 outer_code = POST
1162 else:
1163 outer_code = FETCH
1164 else:
1165 outer_host = None
1166 proxy_uri = None
1168 inner_message = message.copy()
1170 outer_code = request_id.code_style.response
1172 # no max-age because these are always successful responses
1173 outer_message = Message(
1174 code=outer_code,
1175 uri_host=outer_host,
1176 observe=None if message.code.is_response() else message.opt.observe,
1177 )
1178 if proxy_uri is not None:
1179 outer_message.set_request_uri(outer_uri)
1181 plaintext = bytes([inner_message.code]) + inner_message.opt.encode()
1182 if inner_message.payload:
1183 plaintext += bytes([0xFF])
1184 plaintext += inner_message.payload
1186 return outer_message, plaintext
1188 def _build_new_nonce(self, alg: SymmetricEncryptionAlgorithm):
1189 """This implements generation of a new nonce, assembled as per Figure 5
1190 of draft-ietf-core-object-security-06. Returns the shortened partial IV
1191 as well."""
1192 seqno = self.new_sequence_number()
1194 partial_iv = seqno.to_bytes(5, "big")
1196 return (
1197 self._construct_nonce(partial_iv, self.sender_id, alg),
1198 partial_iv.lstrip(b"\0") or b"\0",
1199 )
1201 # sequence number handling
1203 def new_sequence_number(self):
1204 """Return a new sequence number; the implementation is responsible for
1205 never returning the same value twice in a given security context.
1207 May raise ContextUnavailable."""
1208 retval = self.sender_sequence_number
1209 if retval >= MAX_SEQNO:
1210 raise ContextUnavailable("Sequence number too large, context is exhausted.")
1211 self.sender_sequence_number += 1
1212 self.post_seqnoincrease()
1213 return retval
1215 # implementation defined
1217 @abc.abstractmethod
1218 def post_seqnoincrease(self):
1219 """Ensure that sender_sequence_number is stored"""
1220 raise
1222 def context_from_response(self, unprotected_bag) -> CanUnprotect:
1223 """When receiving a response to a request protected with this security
1224 context, pick the security context with which to unprotect the response
1225 given the unprotected information from the Object-Security option.
1227 This allow picking the right security context in a group response, and
1228 helps getting a new short-lived context for B.2 mode. The default
1229 behavior is returning self.
1230 """
1232 # FIXME justify by moving into a mixin for CanProtectAndUnprotect
1233 return self # type: ignore
1236class CanUnprotect(BaseSecurityContext):
1237 recipient_key: bytes
1239 def unprotect(self, protected_message, request_id=None):
1240 _alglog.debug(
1241 "Unprotecting message %s with context %s and request ID %s",
1242 protected_message,
1243 self,
1244 request_id,
1245 )
1247 assert (request_id is not None) == protected_message.code.is_response(), (
1248 "Requestishness of code to unprotect does not match presence of request ID"
1249 )
1250 is_response = protected_message.code.is_response()
1252 assert protected_message.direction is Direction.INCOMING
1254 # Set to a raisable exception on replay check failures; it will be
1255 # raised, but the package may still be processed in the course of Echo handling.
1256 replay_error = None
1258 protected_serialized, protected, unprotected, ciphertext = (
1259 self._extract_encrypted0(protected_message)
1260 )
1262 if protected:
1263 raise ProtectionInvalid("The protected field is not empty")
1265 # FIXME check for duplicate keys in protected
1267 if unprotected.pop(COSE_KID_CONTEXT, self.id_context) != self.id_context:
1268 # FIXME is this necessary?
1269 raise ProtectionInvalid("Sender ID context does not match")
1271 if unprotected.pop(COSE_KID, self.recipient_id) != self.recipient_id:
1272 # for most cases, this is caught by the session ID dispatch, but in
1273 # responses (where explicit sender IDs are atypical), this is a
1274 # valid check
1275 raise ProtectionInvalid("Sender ID does not match")
1277 if COSE_PIV not in unprotected:
1278 if not is_response:
1279 raise ProtectionInvalid("No sequence number provided in request")
1281 seqno = None # sentinel for not striking out anything
1282 partial_iv_short = request_id.partial_iv
1283 partial_iv_generated_by = request_id.kid
1284 else:
1285 partial_iv_short = unprotected.pop(COSE_PIV)
1286 partial_iv_generated_by = self.recipient_id
1288 seqno = int.from_bytes(partial_iv_short, "big")
1290 if not is_response:
1291 if not self.recipient_replay_window.is_initialized():
1292 replay_error = ReplayError("Sequence number check unavailable")
1293 elif not self.recipient_replay_window.is_valid(seqno):
1294 replay_error = ReplayError("Sequence number was reused")
1296 if replay_error is not None and self.echo_recovery is None:
1297 # Don't even try decoding if there is no reason to
1298 raise replay_error
1300 request_id = RequestIdentifiers(
1301 partial_iv_generated_by,
1302 partial_iv_short,
1303 can_reuse_nonce=replay_error is None,
1304 request_code=protected_message.code,
1305 )
1307 external_aad = self._extract_external_aad(
1308 protected_message, request_id, local_is_sender=False
1309 )
1311 if unprotected.pop(COSE_COUNTERSIGNATURE0, None) is not None:
1312 try:
1313 alg_signature = self.alg_signature
1314 except NameError:
1315 raise DecodeError(
1316 "Group messages can not be decoded with this non-group context"
1317 )
1319 siglen = alg_signature.signature_length
1320 if len(ciphertext) < siglen:
1321 raise DecodeError("Message too short for signature")
1322 encrypted_signature = ciphertext[-siglen:]
1324 _alglog.debug(
1325 "Producing keystream from signature encryption key: %s",
1326 log_secret(self.signature_encryption_key.hex()),
1327 )
1328 keystream = self._kdf_for_keystreams(
1329 partial_iv_generated_by,
1330 partial_iv_short,
1331 self.signature_encryption_key,
1332 self.recipient_id,
1333 INFO_TYPE_KEYSTREAM_REQUEST
1334 if protected_message.code.is_request()
1335 else INFO_TYPE_KEYSTREAM_RESPONSE,
1336 )
1337 _alglog.debug("Encrypted signature %s", encrypted_signature.hex())
1338 _alglog.debug("Keystream is %s", keystream.hex())
1339 signature = _xor_bytes(encrypted_signature, keystream)
1341 ciphertext = ciphertext[:-siglen]
1343 alg_signature.verify(
1344 signature, ciphertext, external_aad, self.recipient_public_key
1345 )
1347 alg_symmetric = self.alg_group_enc
1348 else:
1349 alg_symmetric = self.alg_aead
1351 if unprotected:
1352 raise DecodeError("Unsupported unprotected option")
1354 if (
1355 len(ciphertext) < self.alg_aead.tag_bytes + 1
1356 ): # +1 assures access to plaintext[0] (the code)
1357 raise ProtectionInvalid("Ciphertext too short")
1359 enc_structure = ["Encrypt0", protected_serialized, external_aad]
1360 aad = cbor.dumps(enc_structure)
1362 key = self._get_recipient_key(protected_message, alg_symmetric)
1364 nonce = self._construct_nonce(
1365 partial_iv_short, partial_iv_generated_by, alg_symmetric
1366 )
1368 _alglog.debug("Decrypting Encrypt0:")
1369 _alglog.debug("* ciphertext = %s", ciphertext.hex())
1370 _alglog.debug("* aad = %s", aad.hex())
1371 _alglog.debug("* nonce = %s", nonce.hex())
1372 _alglog.debug("* key = %s", log_secret(key.hex()))
1373 _alglog.debug("* algorithm = %s", alg_symmetric)
1374 try:
1375 plaintext = alg_symmetric.decrypt(ciphertext, aad, key, nonce)
1376 except Exception as e:
1377 _alglog.debug("Unprotecting failed")
1378 raise e
1380 self._post_decrypt_checks(
1381 external_aad, plaintext, protected_message, request_id
1382 )
1384 if not is_response and seqno is not None and replay_error is None:
1385 self.recipient_replay_window.strike_out(seqno)
1387 # FIXME add options from unprotected
1389 unprotected_message = Message(code=plaintext[0])
1390 unprotected_message.payload = unprotected_message.opt.decode(plaintext[1:])
1391 unprotected_message.direction = Direction.INCOMING
1393 try_initialize = (
1394 not self.recipient_replay_window.is_initialized()
1395 and self.echo_recovery is not None
1396 )
1397 if try_initialize:
1398 if protected_message.code.is_request():
1399 # Either accept into replay window and clear replay error, or raise
1400 # something that can turn into a 4.01,Echo response
1401 if unprotected_message.opt.echo == self.echo_recovery:
1402 self.recipient_replay_window.initialize_from_freshlyseen(seqno)
1403 replay_error = None
1404 else:
1405 raise ReplayErrorWithEcho(
1406 secctx=self, request_id=request_id, echo=self.echo_recovery
1407 )
1408 else:
1409 # We can initialize the replay window from a response as well.
1410 # The response is guaranteed fresh as it was AEAD-decoded to
1411 # match a request sent by this process.
1412 #
1413 # This is rare, as it only works when the server uses an own
1414 # sequence number, eg. when sending a notification or when
1415 # acting again on a retransmitted safe request whose response
1416 # it did not cache.
1417 #
1418 # Nothing bad happens if we can't make progress -- we just
1419 # don't initialize the replay window that wouldn't have been
1420 # checked for a response anyway.
1421 if seqno is not None:
1422 self.recipient_replay_window.initialize_from_freshlyseen(seqno)
1424 if replay_error is not None:
1425 raise replay_error
1427 if unprotected_message.code.is_request():
1428 if protected_message.opt.observe != 0:
1429 unprotected_message.opt.observe = None
1430 else:
1431 if protected_message.opt.observe is not None:
1432 # -1 ensures that they sort correctly in later reordering
1433 # detection. Note that neither -1 nor high (>3 byte) sequence
1434 # numbers can be serialized in the Observe option, but they are
1435 # in this implementation accepted for passing around.
1436 unprotected_message.opt.observe = -1 if seqno is None else seqno
1438 _alglog.debug(
1439 "Unprotecting succeeded, yielding plaintext %s and request_id %s",
1440 unprotected_message,
1441 request_id,
1442 )
1443 return unprotected_message, request_id
1445 def _get_recipient_key(
1446 self, protected_message, algorithm: SymmetricEncryptionAlgorithm
1447 ):
1448 """Customization hook of the unprotect function
1450 While most security contexts have a fixed recipient key, group contexts
1451 have multiple, and deterministic requests build it on demand."""
1452 return self.recipient_key
1454 def _post_decrypt_checks(self, aad, plaintext, protected_message, request_id):
1455 """Customization hook of the unprotect function after decryption
1457 While most security contexts are good with the default checks,
1458 deterministic requests need to perform additional checks while AAD and
1459 plaintext information is still available, and modify the request_id for
1460 the later protection step of the response."""
1462 @staticmethod
1463 def _uncompress(option_data, payload):
1464 if option_data == b"":
1465 firstbyte = 0
1466 else:
1467 firstbyte = option_data[0]
1468 tail = option_data[1:]
1470 unprotected = {}
1472 if firstbyte & COMPRESSION_BITS_RESERVED:
1473 raise DecodeError("Protected data uses reserved fields")
1475 pivsz = firstbyte & COMPRESSION_BITS_N
1476 if pivsz:
1477 if len(tail) < pivsz:
1478 raise DecodeError("Partial IV announced but not present")
1479 unprotected[COSE_PIV] = tail[:pivsz]
1480 tail = tail[pivsz:]
1482 if firstbyte & COMPRESSION_BIT_H:
1483 # kid context hint
1484 s = tail[0]
1485 if len(tail) - 1 < s:
1486 raise DecodeError("Context hint announced but not present")
1487 tail = tail[1:]
1488 unprotected[COSE_KID_CONTEXT] = tail[:s]
1489 tail = tail[s:]
1491 if firstbyte & COMPRESSION_BIT_K:
1492 kid = tail
1493 unprotected[COSE_KID] = kid
1495 if firstbyte & COMPRESSION_BIT_GROUP:
1496 # Not really; As this is (also) used early on (before the KID
1497 # context is even known, because it's just getting extracted), this
1498 # is returning an incomplete value here and leaves it to the later
1499 # processing to strip the right number of bytes from the ciphertext
1500 unprotected[COSE_COUNTERSIGNATURE0] = PRESENT_BUT_NO_VALUE_YET
1502 return b"", {}, unprotected, payload
1504 @classmethod
1505 def _extract_encrypted0(cls, message):
1506 if message.opt.oscore is None:
1507 raise NotAProtectedMessage("No Object-Security option present", message)
1509 protected_serialized, protected, unprotected, ciphertext = cls._uncompress(
1510 message.opt.oscore, message.payload
1511 )
1512 return protected_serialized, protected, unprotected, ciphertext
1514 # implementation defined
1516 def context_for_response(self) -> CanProtect:
1517 """After processing a request with this context, with which security
1518 context should an outgoing response be protected? By default, it's the
1519 same context."""
1520 # FIXME: Is there any way in which the handler may want to influence
1521 # the decision taken here? Or would, then, the handler just call a more
1522 # elaborate but similar function when setting the response's remote
1523 # already?
1525 # FIXME justify by moving into a mixin for CanProtectAndUnprotect
1526 return self # type: ignore
1529class SecurityContextUtils(BaseSecurityContext):
1530 def _kdf(
1531 self,
1532 salt,
1533 ikm,
1534 role_id,
1535 out_type,
1536 key_alg: Optional[SymmetricEncryptionAlgorithm] = None,
1537 ):
1538 """The HKDF as used to derive sender and recipient key and IV in
1539 RFC8613 Section 3.2.1, and analogously the Group Encryption Key of oscore-groupcomm.
1540 """
1542 _alglog.debug("Deriving through KDF:")
1543 _alglog.debug("* salt = %s", salt.hex() if salt else salt)
1544 _alglog.debug("* ikm = %s", log_secret(ikm.hex()))
1545 _alglog.debug("* role_id = %s", role_id.hex())
1546 _alglog.debug("* out_type = %r", out_type)
1547 _alglog.debug("* key_alg = %r", key_alg)
1549 # The field in info is called `alg_aead` defined in RFC8613, but in
1550 # group OSCORE something that's very clearly *not* alg_aead is put in
1551 # there.
1552 #
1553 # The rules about this come both from
1554 # https://www.ietf.org/archive/id/draft-ietf-core-oscore-groupcomm-23.html#section-2.3
1555 # and
1556 # https://www.ietf.org/archive/id/draft-ietf-core-oscore-groupcomm-23.html#section-2.1.9
1557 # but they produce the same outcome.
1558 if hasattr(self, "alg_group_enc") and self.alg_group_enc is not None:
1559 the_field_called_alg_aead = self.alg_group_enc.value
1560 else:
1561 assert self.alg_aead is not None, (
1562 "At least alg_aead or alg_group_enc needs to be set on a context."
1563 )
1564 the_field_called_alg_aead = self.alg_aead.value
1566 assert (key_alg is None) ^ (out_type == "Key")
1567 if out_type == "Key":
1568 # Duplicate assertion needed while mypy can not see that the assert
1569 # above the if is stricter than this.
1570 assert key_alg is not None
1571 out_bytes = key_alg.key_bytes
1572 the_field_called_alg_aead = key_alg.value
1573 elif out_type == "IV":
1574 assert self.alg_aead is not None, (
1575 "At least alg_aead or alg_group_enc needs to be set on a context."
1576 )
1577 out_bytes = max(
1578 (
1579 a.iv_bytes
1580 for a in [self.alg_aead, getattr(self, "alg_group_enc", None)]
1581 if a is not None
1582 )
1583 )
1584 elif out_type == "SEKey":
1585 assert isinstance(self, GroupContext) and self.alg_group_enc is not None, (
1586 "SEKey derivation is only defined for group contexts with a group encryption algorithm."
1587 )
1588 # "While the obtained Signature Encryption Key is never used with
1589 # the Group Encryption Algorithm, its length was chosen to obtain a
1590 # matching level of security."
1591 out_bytes = self.alg_group_enc.key_bytes
1592 else:
1593 raise ValueError("Output type not recognized")
1595 _alglog.debug("* the_field_called_alg_aead = %s", the_field_called_alg_aead)
1597 info = [
1598 role_id,
1599 self.id_context,
1600 the_field_called_alg_aead,
1601 out_type,
1602 out_bytes,
1603 ]
1604 _alglog.debug("* info = %r", info)
1605 ret = self._kdf_lowlevel(salt, ikm, info, out_bytes)
1606 _alglog.debug("Derivation of %r produced %s", out_type, log_secret(ret.hex()))
1607 return ret
1609 def _kdf_for_keystreams(self, piv_generated_by, salt, ikm, role_id, out_type):
1610 """The HKDF as used to derive the keystreams of oscore-groupcomm."""
1612 out_bytes = self.alg_signature.signature_length
1614 assert out_type in (
1615 INFO_TYPE_KEYSTREAM_REQUEST,
1616 INFO_TYPE_KEYSTREAM_RESPONSE,
1617 ), "Output type not recognized"
1619 info = [
1620 piv_generated_by,
1621 self.id_context,
1622 out_type,
1623 out_bytes,
1624 ]
1625 return self._kdf_lowlevel(salt, ikm, info, out_bytes)
1627 def _kdf_lowlevel(self, salt: bytes, ikm: bytes, info: list, l: int) -> bytes: # noqa: E741 (signature follows RFC definition)
1628 """The HKDF function as used in RFC8613 and oscore-groupcomm (notated
1629 there as ``something = HKDF(...)``
1631 Note that `info` typically contains `L` at some point.
1633 When `info` takes the conventional structure of pid, id_context,
1634 ald_aead, type, L], it may make sense to extend the `_kdf` function to
1635 support that case, or `_kdf_for_keystreams` for a different structure, as
1636 they are the more high-level tools."""
1637 hkdf = HKDF(
1638 algorithm=self.hashfun,
1639 length=l,
1640 salt=salt,
1641 info=cbor.dumps(info),
1642 backend=_hash_backend,
1643 )
1644 expanded = hkdf.derive(ikm)
1645 return expanded
1647 def derive_keys(self, master_salt, master_secret):
1648 """Populate sender_key, recipient_key and common_iv from the algorithm,
1649 hash function and id_context already configured beforehand, and from
1650 the passed salt and secret."""
1652 self.sender_key = self._kdf(
1653 master_salt, master_secret, self.sender_id, "Key", self.alg_aead
1654 )
1655 self.recipient_key = self._kdf(
1656 master_salt, master_secret, self.recipient_id, "Key", self.alg_aead
1657 )
1659 self.common_iv = self._kdf(master_salt, master_secret, b"", "IV")
1661 # really more of the Credentials interface
1663 def get_oscore_context_for(self, unprotected):
1664 """Return a sutiable context (most easily self) for an incoming request
1665 if its unprotected data (COSE_KID, COSE_KID_CONTEXT) fit its
1666 description. If it doesn't match, it returns None.
1668 The default implementation just strictly checks for whether kid and any
1669 kid context match (not matching if a local KID context is set but none
1670 is given in the request); modes like Group OSCORE can spin up aspect
1671 objects here.
1672 """
1673 if (
1674 unprotected.get(COSE_KID, None) == self.recipient_id
1675 and unprotected.get(COSE_KID_CONTEXT, None) == self.id_context
1676 ):
1677 return self
1680class ReplayWindow:
1681 """A regular replay window of a fixed size.
1683 It is implemented as an index and a bitfield (represented by an integer)
1684 whose least significant bit represents the seqyence number of the index,
1685 and a 1 indicates that a number was seen. No shenanigans around implicit
1686 leading ones (think floating point normalization) happen.
1688 >>> w = ReplayWindow(32, lambda: None)
1689 >>> w.initialize_empty()
1690 >>> w.strike_out(5)
1691 >>> w.is_valid(3)
1692 True
1693 >>> w.is_valid(5)
1694 False
1695 >>> w.strike_out(0)
1696 >>> w.strike_out(1)
1697 >>> w.strike_out(2)
1698 >>> w.is_valid(1)
1699 False
1701 Jumping ahead by the window size invalidates older numbers:
1703 >>> w.is_valid(4)
1704 True
1705 >>> w.strike_out(35)
1706 >>> w.is_valid(4)
1707 True
1708 >>> w.strike_out(36)
1709 >>> w.is_valid(4)
1710 False
1712 Usage safety
1713 ------------
1715 For every key, the replay window can only be initielized empty once. On
1716 later uses, it needs to be persisted by storing the output of
1717 self.persist() somewhere and loaded from that persisted data.
1719 It is acceptable to store persistence data in the strike_out_callback, but
1720 that must then ensure that the data is written (flushed to a file or
1721 committed to a database), but that is usually inefficient.
1723 Stability
1724 ---------
1726 This class is not considered for stabilization yet and an implementation
1727 detail of the SecurityContext implementation(s).
1728 """
1730 _index = None
1731 """Sequence number represented by the least significant bit of _bitfield"""
1732 _bitfield = None
1733 """Integer interpreted as a bitfield, self._size wide. A digit 1 at any bit
1734 indicates that the bit's index (its power of 2) plus self._index was
1735 already seen."""
1737 def __init__(self, size, strike_out_callback):
1738 self._size = size
1739 self.strike_out_callback = strike_out_callback
1741 def is_initialized(self):
1742 return self._index is not None
1744 def initialize_empty(self):
1745 self._index = 0
1746 self._bitfield = 0
1748 def initialize_from_persisted(self, persisted):
1749 self._index = persisted["index"]
1750 self._bitfield = persisted["bitfield"]
1752 def initialize_from_freshlyseen(self, seen):
1753 """Initialize the replay window with a particular value that is just
1754 being observed in a fresh (ie. generated by the peer later than any
1755 messages processed before state was lost here) message. This marks the
1756 seen sequence number and all preceding it as invalid, and and all later
1757 ones as valid."""
1758 self._index = seen
1759 self._bitfield = 1
1761 def is_valid(self, number):
1762 if number < self._index:
1763 return False
1764 if number >= self._index + self._size:
1765 return True
1766 return (self._bitfield >> (number - self._index)) & 1 == 0
1768 def strike_out(self, number):
1769 if not self.is_valid(number):
1770 raise ValueError(
1771 "Sequence number is not valid any more and "
1772 "thus can't be removed from the window"
1773 )
1774 overshoot = number - (self._index + self._size - 1)
1775 if overshoot > 0:
1776 self._index += overshoot
1777 self._bitfield >>= overshoot
1778 assert self.is_valid(number), "Sequence number was not valid before strike-out"
1779 self._bitfield |= 1 << (number - self._index)
1781 self.strike_out_callback()
1783 def persist(self):
1784 """Return a dict containing internal state which can be passed to init
1785 to recreated the replay window."""
1787 return {"index": self._index, "bitfield": self._bitfield}
1790class FilesystemSecurityContext(
1791 CanProtect, CanUnprotect, SecurityContextUtils, credentials._Objectish
1792):
1793 """Security context stored in a directory as distinct files containing
1794 containing
1796 * Master secret, master salt, sender and recipient ID,
1797 optionally algorithm, the KDF hash function, and replay window size
1798 (settings.json and secrets.json, where the latter is typically readable
1799 only for the user)
1800 * sequence numbers and replay windows (sequence.json, the only file the
1801 process needs write access to)
1803 The static parameters can all either be placed in settings.json or
1804 secrets.json, but must not be present in both; the presence of either file
1805 is sufficient.
1807 .. warning::
1809 Security contexts must never be copied around and used after another
1810 copy was used. They should only ever be moved, and if they are copied
1811 (eg. as a part of a system backup), restored contexts must not be used
1812 again; they need to be replaced with freshly created ones.
1814 An additional file named `lock` is created to prevent the accidental use of
1815 a context by to concurrent programs.
1817 Note that the sequence number file is updated in an atomic fashion which
1818 requires file creation privileges in the directory. If privilege separation
1819 between settings/key changes and sequence number changes is desired, one
1820 way to achieve that on Linux is giving the aiocoap process's user group
1821 write permissions on the directory and setting the sticky bit on the
1822 directory, thus forbidding the user to remove the settings/secret files not
1823 owned by him.
1825 Writes due to sent sequence numbers are reduced by applying a variation on
1826 the mechanism of RFC8613 Appendix B.1.1 (incrementing the persisted sender
1827 sequence number in steps of `k`). That value is automatically grown from
1828 sequence_number_chunksize_start up to sequence_number_chunksize_limit.
1829 At runtime, the receive window is not stored but kept indeterminate. In
1830 case of an abnormal shutdown, the server uses the mechanism described in
1831 Appendix B.1.2 to recover.
1832 """
1834 # possibly overridden in constructor
1835 #
1836 # Type is ignored because while it *is* AlgAead, mypy can't tell.
1837 alg_aead = algorithms[DEFAULT_ALGORITHM] # type: ignore
1839 class LoadError(ValueError):
1840 """Exception raised with a descriptive message when trying to load a
1841 faulty security context"""
1843 def __init__(
1844 self,
1845 basedir: str,
1846 sequence_number_chunksize_start=10,
1847 sequence_number_chunksize_limit=10000,
1848 ):
1849 self.basedir = basedir
1851 self.lockfile: Optional[filelock.FileLock] = filelock.FileLock(
1852 os.path.join(basedir, "lock")
1853 )
1854 # We *actually* want to fail right if there's any other process locking
1855 # it, but making this small means that it'll fail when the system is
1856 # under load and time passes just inside the code.
1857 #
1858 # As this is a very unlikely unhappy path (some othe process is sitting
1859 # on the same lock file), we can afford a 1s delay in showing the error.
1860 try:
1861 self.lockfile.acquire(timeout=1)
1862 # see https://github.com/PyCQA/pycodestyle/issues/703
1863 except: # noqa: E722
1864 # No lock, no loading, no need to fail in __del__
1865 self.lockfile = None
1866 raise
1868 # Always enabled as committing to a file for every received request
1869 # would be a terrible burden.
1870 self.echo_recovery = secrets.token_bytes(8)
1872 try:
1873 self._load()
1874 except KeyError as k:
1875 raise self.LoadError("Configuration key missing: %s" % (k.args[0],))
1877 self.sequence_number_chunksize_start = sequence_number_chunksize_start
1878 self.sequence_number_chunksize_limit = sequence_number_chunksize_limit
1879 self.sequence_number_chunksize = sequence_number_chunksize_start
1881 self.sequence_number_persisted = self.sender_sequence_number
1883 def _load(self):
1884 # doesn't check for KeyError on every occasion, relies on __init__ to
1885 # catch that
1887 data = {}
1888 for readfile in ("secret.json", "settings.json"):
1889 try:
1890 with open(os.path.join(self.basedir, readfile)) as f:
1891 filedata = json.load(f)
1892 except FileNotFoundError:
1893 continue
1895 for key, value in filedata.items():
1896 if key.endswith("_hex"):
1897 key = key[:-4]
1898 value = binascii.unhexlify(value)
1899 elif key.endswith("_ascii"):
1900 key = key[:-6]
1901 value = value.encode("ascii")
1903 if key in data:
1904 raise self.LoadError(
1905 "Datum %r present in multiple input files at %r."
1906 % (key, self.basedir)
1907 )
1909 data[key] = value
1911 self.alg_aead = algorithms[data.get("algorithm", DEFAULT_ALGORITHM)]
1912 self.hashfun = hashfunctions[data.get("kdf-hashfun", DEFAULT_HASHFUNCTION)]
1914 windowsize = data.get("window", DEFAULT_WINDOWSIZE)
1915 if not isinstance(windowsize, int):
1916 raise self.LoadError("Non-integer replay window")
1918 self.sender_id = data["sender-id"]
1919 self.recipient_id = data["recipient-id"]
1921 if (
1922 max(len(self.sender_id), len(self.recipient_id))
1923 > self.alg_aead.iv_bytes - 6
1924 ):
1925 raise self.LoadError(
1926 "Sender or Recipient ID too long (maximum length %s for this algorithm)"
1927 % (self.alg_aead.iv_bytes - 6)
1928 )
1930 master_secret = data["secret"]
1931 master_salt = data.get("salt", b"")
1932 self.id_context = data.get("id-context", None)
1934 self.derive_keys(master_salt, master_secret)
1936 self.recipient_replay_window = ReplayWindow(
1937 windowsize, self._replay_window_changed
1938 )
1939 try:
1940 with open(os.path.join(self.basedir, "sequence.json")) as f:
1941 sequence = json.load(f)
1942 except FileNotFoundError:
1943 self.sender_sequence_number = 0
1944 self.recipient_replay_window.initialize_empty()
1945 self.replay_window_persisted = True
1946 else:
1947 self.sender_sequence_number = int(sequence["next-to-send"])
1948 received = sequence["received"]
1949 if received == "unknown":
1950 # The replay window will stay uninitialized, which triggers
1951 # Echo recovery
1952 self.replay_window_persisted = False
1953 else:
1954 try:
1955 self.recipient_replay_window.initialize_from_persisted(received)
1956 except (ValueError, TypeError, KeyError):
1957 # Not being particularly careful about what could go wrong: If
1958 # someone tampers with the replay data, we're already in *big*
1959 # trouble, of which I fail to see how it would become worse
1960 # than a crash inside the application around "failure to
1961 # right-shift a string" or that like; at worst it'd result in
1962 # nonce reuse which tampering with the replay window file
1963 # already does.
1964 raise self.LoadError(
1965 "Persisted replay window state was not understood"
1966 )
1967 self.replay_window_persisted = True
1969 # This is called internally whenever a new sequence number is taken or
1970 # crossed out from the window, and blocks a lot; B.1 mode mitigates that.
1971 #
1972 # Making it async and block in a threadpool would mitigate the blocking of
1973 # other messages, but the more visible effect of this will be that no
1974 # matter if sync or async, a reply will need to wait for a file sync
1975 # operation to conclude.
1976 def _store(self):
1977 tmphand, tmpnam = tempfile.mkstemp(
1978 dir=self.basedir, prefix=".sequence-", suffix=".json", text=True
1979 )
1981 data = {"next-to-send": self.sequence_number_persisted}
1982 if not self.replay_window_persisted:
1983 data["received"] = "unknown"
1984 else:
1985 data["received"] = self.recipient_replay_window.persist()
1987 # Using io.open (instead os.fdopen) and binary / write with encode
1988 # rather than dumps as that works even while the interpreter is
1989 # shutting down.
1990 #
1991 # This can be relaxed when there is a defined shutdown sequence for
1992 # security contexts that's triggered from the general context shutdown
1993 # -- but right now, there isn't.
1994 with io.open(tmphand, "wb") as tmpfile:
1995 tmpfile.write(json.dumps(data).encode("utf8"))
1996 tmpfile.flush()
1997 os.fsync(tmpfile.fileno())
1999 os.replace(tmpnam, os.path.join(self.basedir, "sequence.json"))
2001 def _replay_window_changed(self):
2002 if self.replay_window_persisted:
2003 # Just remove the sequence numbers once from the file
2004 self.replay_window_persisted = False
2005 self._store()
2007 def post_seqnoincrease(self):
2008 if self.sender_sequence_number > self.sequence_number_persisted:
2009 self.sequence_number_persisted += self.sequence_number_chunksize
2011 self.sequence_number_chunksize = min(
2012 self.sequence_number_chunksize * 2, self.sequence_number_chunksize_limit
2013 )
2014 # FIXME: this blocks -- see https://github.com/chrysn/aiocoap/issues/178
2015 self._store()
2017 # The = case would only happen if someone deliberately sets all
2018 # numbers to 1 to force persisting on every step
2019 assert self.sender_sequence_number <= self.sequence_number_persisted, (
2020 "Using a sequence number that has been persisted already"
2021 )
2023 def _destroy(self):
2024 """Release the lock file, and ensure that the object has become
2025 unusable.
2027 If there is unpersisted state from B.1 operation, the actually used
2028 number and replay window gets written back to the file to allow
2029 resumption without wasting digits or round-trips.
2030 """
2031 # FIXME: Arrange for a more controlled shutdown through the credentials
2033 self.replay_window_persisted = True
2034 self.sequence_number_persisted = self.sender_sequence_number
2035 self._store()
2037 del self.sender_key
2038 del self.recipient_key
2040 os.unlink(self.lockfile.lock_file)
2041 self.lockfile.release()
2043 self.lockfile = None
2045 def __del__(self):
2046 if self.lockfile is not None:
2047 self._destroy()
2049 @classmethod
2050 def from_item(cls, init_data):
2051 """Overriding _Objectish's from_item because the parameter name for
2052 basedir is contextfile for historical reasons"""
2054 def constructor(
2055 basedir: Optional[str] = None, contextfile: Optional[str] = None
2056 ):
2057 if basedir is not None and contextfile is not None:
2058 raise credentials.CredentialsLoadError(
2059 "Conflicting arguments basedir and contextfile; just contextfile instead"
2060 )
2061 if basedir is None and contextfile is None:
2062 raise credentials.CredentialsLoadError("Missing item 'basedir'")
2063 if contextfile is not None:
2064 warnings.warn(
2065 "Property contextfile was renamed to basedir in OSCORE credentials entries",
2066 DeprecationWarning,
2067 stacklevel=2,
2068 )
2069 basedir = contextfile
2070 assert (
2071 basedir is not None
2072 ) # This helps mypy which would otherwise not see that the above ensures this already
2073 return cls(basedir)
2075 return credentials._call_from_structureddata(
2076 constructor, cls.__name__, init_data
2077 )
2079 def find_all_used_contextless_oscore_kid(self) -> set[bytes]:
2080 return set((self.recipient_id,))
2083class GroupContext(ContextWhereExternalAadIsGroup, BaseSecurityContext):
2084 is_signing = True
2085 responses_send_kid = True
2087 @abc.abstractproperty
2088 def private_key(self):
2089 """Private key used to sign outgoing messages.
2091 Contexts not designed to send messages may raise a RuntimeError here;
2092 that necessity may later go away if some more accurate class modelling
2093 is found."""
2095 @abc.abstractproperty
2096 def recipient_public_key(self):
2097 """Public key used to verify incoming messages.
2099 Contexts not designed to receive messages (because they'd have aspects
2100 for that) may raise a RuntimeError here; that necessity may later go
2101 away if some more accurate class modelling is found."""
2104class SimpleGroupContext(GroupContext, CanProtect, CanUnprotect, SecurityContextUtils):
2105 """A context for an OSCORE group
2107 This is a non-persistable version of a group context that does not support
2108 any group manager or rekeying; it is set up statically at startup.
2110 It is intended for experimentation and demos, but aims to be correct enough
2111 to be usable securely.
2112 """
2114 # set during initialization (making all those attributes rather than
2115 # possibly properties as they might be in super)
2116 sender_id = None # type: ignore
2117 id_context = None # type: ignore
2118 private_key = None
2119 alg_aead = None
2120 hashfun = None # type: ignore
2121 alg_signature = None
2122 alg_group_enc = None
2123 alg_pairwise_key_agreement = None
2124 sender_auth_cred = None # type: ignore
2125 group_manager_cred = None # type: ignore
2126 cred_fmt = None
2127 # This is currently not evaluated, but any GM interaction will need to have this information available.
2128 group_manager_cred_fmt = None
2130 def __init__(
2131 self,
2132 alg_aead,
2133 hashfun,
2134 alg_signature,
2135 alg_group_enc,
2136 alg_pairwise_key_agreement,
2137 group_id,
2138 master_secret,
2139 master_salt,
2140 sender_id,
2141 private_key,
2142 sender_auth_cred,
2143 peers,
2144 group_manager_cred,
2145 cred_fmt=COSE_KCCS,
2146 group_manager_cred_fmt=COSE_KCCS,
2147 ):
2148 self.sender_id = sender_id
2149 self.id_context = group_id
2150 self.private_key = private_key
2151 self.alg_aead = alg_aead
2152 self.hashfun = hashfun
2153 self.alg_signature = alg_signature
2154 self.alg_group_enc = alg_group_enc
2155 self.alg_pairwise_key_agreement = alg_pairwise_key_agreement
2156 self.sender_auth_cred = sender_auth_cred
2157 self.group_manager_cred = group_manager_cred
2158 self.cred_fmt = cred_fmt
2159 self.group_manager_cred_fmt = group_manager_cred_fmt
2161 self.peers = peers.keys()
2162 self.recipient_public_keys = {
2163 k: self._parse_credential(v) for (k, v) in peers.items()
2164 }
2165 self.recipient_auth_creds = peers
2166 self.recipient_replay_windows = {}
2167 for k in self.peers:
2168 # no need to persist, the whole group is ephemeral
2169 w = ReplayWindow(32, lambda: None)
2170 w.initialize_empty()
2171 self.recipient_replay_windows[k] = w
2173 self.derive_keys(master_salt, master_secret)
2174 self.sender_sequence_number = 0
2176 sender_public_key = self._parse_credential(sender_auth_cred)
2177 if (
2178 self.alg_signature.public_from_private(self.private_key)
2179 != sender_public_key
2180 ):
2181 raise ValueError(
2182 "The key in the provided sender credential does not match the private key"
2183 )
2185 def _parse_credential(self, credential: bytes | _DeterministicKey):
2186 """Extract the public key (in the public_key format the respective
2187 AlgorithmCountersign needs) from credentials. This raises a ValueError
2188 if the credentials do not match the group's cred_fmt, or if the
2189 parameters do not match those configured in the group.
2191 This currently discards any information that is present in the
2192 credential that exceeds the key. (In a future version, this could
2193 return both the key and extracted other data, where that other data
2194 would be stored with the peer this is parsed from).
2195 """
2197 if credential is DETERMINISTIC_KEY:
2198 return credential
2200 if self.cred_fmt != COSE_KCCS:
2201 raise ValueError(
2202 "Credential parsing is currently only implemented for CCSs"
2203 )
2205 assert self.alg_signature is not None
2207 return self.alg_signature.from_kccs(credential)
2209 def __repr__(self):
2210 return "<%s with group %r sender_id %r and %d peers>" % (
2211 type(self).__name__,
2212 self.id_context.hex(),
2213 self.sender_id.hex(),
2214 len(self.peers),
2215 )
2217 @property
2218 def recipient_public_key(self):
2219 raise RuntimeError(
2220 "Group context without key indication was used for verification"
2221 )
2223 def _get_sender_key(self, outer_message, aad, plaintext, request_id):
2224 # If we even get here, there has to be a alg_group_enc, and thus the sender key does match it
2225 return self._sender_key
2227 def derive_keys(self, master_salt, master_secret):
2228 the_main_alg = (
2229 self.alg_group_enc if self.alg_group_enc is not None else self.alg_aead
2230 )
2232 self._sender_key = self._kdf(
2233 master_salt, master_secret, self.sender_id, "Key", the_main_alg
2234 )
2235 self.recipient_keys = {
2236 recipient_id: self._kdf(
2237 master_salt, master_secret, recipient_id, "Key", the_main_alg
2238 )
2239 for recipient_id in self.peers
2240 }
2242 self.common_iv = self._kdf(master_salt, master_secret, b"", "IV")
2244 self.signature_encryption_key = self._kdf(
2245 master_salt, master_secret, b"", "SEKey"
2246 )
2248 def post_seqnoincrease(self):
2249 """No-op because it's ephemeral"""
2251 def context_from_response(self, unprotected_bag) -> CanUnprotect:
2252 # sender ID *needs to be* here -- if this were a pairwise request, it
2253 # would not run through here
2254 try:
2255 sender_kid = unprotected_bag[COSE_KID]
2256 except KeyError:
2257 raise DecodeError("Group server failed to send own sender KID")
2259 if COSE_COUNTERSIGNATURE0 in unprotected_bag:
2260 return _GroupContextAspect(self, sender_kid)
2261 else:
2262 return _PairwiseContextAspect(self, sender_kid)
2264 def get_oscore_context_for(self, unprotected):
2265 if unprotected.get(COSE_KID_CONTEXT, None) != self.id_context:
2266 return None
2268 kid = unprotected.get(COSE_KID, None)
2269 if kid in self.peers:
2270 if COSE_COUNTERSIGNATURE0 in unprotected:
2271 return _GroupContextAspect(self, kid)
2272 elif self.recipient_public_keys[kid] is DETERMINISTIC_KEY:
2273 return _DeterministicUnprotectProtoAspect(self, kid)
2274 else:
2275 return _PairwiseContextAspect(self, kid)
2277 def find_all_used_contextless_oscore_kid(self) -> set[bytes]:
2278 # not conflicting: groups always send KID Context
2279 return set()
2281 # yet to stabilize...
2283 def pairwise_for(self, recipient_id):
2284 return _PairwiseContextAspect(self, recipient_id)
2286 def for_sending_deterministic_requests(
2287 self, deterministic_id, target_server: Optional[bytes]
2288 ):
2289 return _DeterministicProtectProtoAspect(self, deterministic_id, target_server)
2292class _GroupContextAspect(GroupContext, CanUnprotect, SecurityContextUtils):
2293 """The concrete context this host has with a particular peer
2295 As all actual data is stored in the underlying groupcontext, this acts as
2296 an accessor to that object (which picks the right recipient key).
2298 This accessor is for receiving messages in group mode from a particular
2299 peer; it does not send (and turns into a pairwise context through
2300 context_for_response before it comes to that).
2301 """
2303 def __init__(self, groupcontext: GroupContext, recipient_id: bytes) -> None:
2304 self.groupcontext = groupcontext
2305 self.recipient_id = recipient_id
2307 def __repr__(self):
2308 return "<%s inside %r with the peer %r>" % (
2309 type(self).__name__,
2310 self.groupcontext,
2311 self.recipient_id.hex(),
2312 )
2314 private_key = None
2316 # not inline because the equivalent lambda would not be recognized by mypy
2317 # (workaround for <https://github.com/python/mypy/issues/8083>)
2318 @property
2319 def id_context(self):
2320 return self.groupcontext.id_context
2322 @property
2323 def alg_aead(self):
2324 return self.groupcontext.alg_aead
2326 @property
2327 def alg_signature(self):
2328 return self.groupcontext.alg_signature
2330 @property
2331 def alg_group_enc(self):
2332 return self.groupcontext.alg_group_enc
2334 @property
2335 def alg_pairwise_key_agreement(self):
2336 return self.groupcontext.alg_pairwise_key_agreement
2338 @property
2339 def group_manager_cred(self):
2340 return self.groupcontext.group_manager_cred
2342 @property
2343 def common_iv(self):
2344 return self.groupcontext.common_iv
2346 @property
2347 def hashfun(self):
2348 return self.groupcontext.hashfun
2350 @property
2351 def signature_encryption_key(self):
2352 return self.groupcontext.signature_encryption_key
2354 @property
2355 def recipient_key(self):
2356 # If we even get here, there has to be a alg_group_enc, and thus the recipient key does match it
2357 return self.groupcontext.recipient_keys[self.recipient_id]
2359 @property
2360 def recipient_public_key(self):
2361 return self.groupcontext.recipient_public_keys[self.recipient_id]
2363 @property
2364 def recipient_auth_cred(self):
2365 return self.groupcontext.recipient_auth_creds[self.recipient_id]
2367 @property
2368 def recipient_replay_window(self):
2369 return self.groupcontext.recipient_replay_windows[self.recipient_id]
2371 def context_for_response(self):
2372 return self.groupcontext.pairwise_for(self.recipient_id)
2374 @property
2375 def sender_auth_cred(self):
2376 raise RuntimeError(
2377 "Could relay the sender auth credential from the group context, but it shouldn't matter here"
2378 )
2381class _PairwiseContextAspect(
2382 GroupContext, CanProtect, CanUnprotect, SecurityContextUtils
2383):
2384 is_signing = False
2386 def __init__(self, groupcontext, recipient_id):
2387 self.groupcontext = groupcontext
2388 self.recipient_id = recipient_id
2390 shared_secret = self.alg_pairwise_key_agreement.staticstatic(
2391 self.groupcontext.private_key,
2392 self.groupcontext.recipient_public_keys[recipient_id],
2393 )
2395 self.sender_key = self._kdf(
2396 self.groupcontext._sender_key,
2397 (
2398 self.groupcontext.sender_auth_cred
2399 + self.groupcontext.recipient_auth_creds[recipient_id]
2400 + shared_secret
2401 ),
2402 self.groupcontext.sender_id,
2403 "Key",
2404 self.alg_group_enc if self.is_signing else self.alg_aead,
2405 )
2406 self.recipient_key = self._kdf(
2407 self.groupcontext.recipient_keys[recipient_id],
2408 (
2409 self.groupcontext.recipient_auth_creds[recipient_id]
2410 + self.groupcontext.sender_auth_cred
2411 + shared_secret
2412 ),
2413 self.recipient_id,
2414 "Key",
2415 self.alg_group_enc if self.is_signing else self.alg_aead,
2416 )
2418 def __repr__(self):
2419 return "<%s based on %r with the peer %r>" % (
2420 type(self).__name__,
2421 self.groupcontext,
2422 self.recipient_id.hex(),
2423 )
2425 # FIXME: actually, only to be sent in requests
2427 # not inline because the equivalent lambda would not be recognized by mypy
2428 # (workaround for <https://github.com/python/mypy/issues/8083>)
2429 @property
2430 def id_context(self):
2431 return self.groupcontext.id_context
2433 @property
2434 def alg_aead(self):
2435 return self.groupcontext.alg_aead
2437 @property
2438 def hashfun(self):
2439 return self.groupcontext.hashfun
2441 @property
2442 def alg_signature(self):
2443 return self.groupcontext.alg_signature
2445 @property
2446 def alg_group_enc(self):
2447 return self.groupcontext.alg_group_enc
2449 @property
2450 def alg_pairwise_key_agreement(self):
2451 return self.groupcontext.alg_pairwise_key_agreement
2453 @property
2454 def group_manager_cred(self):
2455 return self.groupcontext.group_manager_cred
2457 @property
2458 def common_iv(self):
2459 return self.groupcontext.common_iv
2461 @property
2462 def sender_id(self):
2463 return self.groupcontext.sender_id
2465 @property
2466 def recipient_auth_cred(self):
2467 return self.groupcontext.recipient_auth_creds[self.recipient_id]
2469 @property
2470 def sender_auth_cred(self):
2471 return self.groupcontext.sender_auth_cred
2473 @property
2474 def recipient_replay_window(self):
2475 return self.groupcontext.recipient_replay_windows[self.recipient_id]
2477 # Set at initialization (making all those attributes rather than
2478 # possibly properties as they might be in super)
2479 recipient_key = None # type: ignore
2480 sender_key = None
2482 @property
2483 def sender_sequence_number(self):
2484 return self.groupcontext.sender_sequence_number
2486 @sender_sequence_number.setter
2487 def sender_sequence_number(self, new):
2488 self.groupcontext.sender_sequence_number = new
2490 def post_seqnoincrease(self):
2491 self.groupcontext.post_seqnoincrease()
2493 # same here -- not needed because not signing
2494 private_key = property(post_seqnoincrease)
2495 recipient_public_key = property(post_seqnoincrease)
2497 def context_from_response(self, unprotected_bag) -> CanUnprotect:
2498 if unprotected_bag.get(COSE_KID, self.recipient_id) != self.recipient_id:
2499 raise DecodeError(
2500 "Response coming from a different server than requested, not attempting to decrypt"
2501 )
2503 if COSE_COUNTERSIGNATURE0 in unprotected_bag:
2504 # It'd be an odd thing to do, but it's source verified, so the
2505 # server hopefully has reasons to make this readable to other group
2506 # members.
2507 return _GroupContextAspect(self.groupcontext, self.recipient_id)
2508 else:
2509 return self
2512class _DeterministicProtectProtoAspect(
2513 ContextWhereExternalAadIsGroup, CanProtect, SecurityContextUtils
2514):
2515 """This implements the sending side of Deterministic Requests.
2517 While similar to a _PairwiseContextAspect, it only derives the key at
2518 protection time, as the plain text is hashed into the key."""
2520 deterministic_hashfun = hashes.SHA256()
2522 def __init__(self, groupcontext, sender_id, target_server: Optional[bytes]):
2523 self.groupcontext = groupcontext
2524 self.sender_id = sender_id
2525 self.target_server = target_server
2527 def __repr__(self):
2528 return "<%s based on %r with the sender ID %r%s>" % (
2529 type(self).__name__,
2530 self.groupcontext,
2531 self.sender_id.hex(),
2532 "limited to responses from %s" % self.target_server
2533 if self.target_server is not None
2534 else "",
2535 )
2537 def new_sequence_number(self):
2538 return 0
2540 def post_seqnoincrease(self):
2541 pass
2543 def context_from_response(self, unprotected_bag):
2544 if self.target_server is None:
2545 if COSE_KID not in unprotected_bag:
2546 raise DecodeError(
2547 "Server did not send a KID and no particular one was addressed"
2548 )
2549 else:
2550 if unprotected_bag.get(COSE_KID, self.target_server) != self.target_server:
2551 raise DecodeError(
2552 "Response coming from a different server than requested, not attempting to decrypt"
2553 )
2555 if COSE_COUNTERSIGNATURE0 not in unprotected_bag:
2556 # Could just as well pass and later barf when the group context doesn't find a signature
2557 raise DecodeError(
2558 "Response to deterministic request came from insecure pairwise context"
2559 )
2561 return _GroupContextAspect(
2562 self.groupcontext, unprotected_bag.get(COSE_KID, self.target_server)
2563 )
2565 def _get_sender_key(self, outer_message, aad, plaintext, request_id):
2566 if outer_message.code.is_response():
2567 raise RuntimeError("Deterministic contexts shouldn't protect responses")
2569 basekey = self.groupcontext.recipient_keys[self.sender_id]
2571 h = hashes.Hash(self.deterministic_hashfun)
2572 h.update(basekey)
2573 h.update(aad)
2574 h.update(plaintext)
2575 request_hash = h.finalize()
2577 outer_message.opt.request_hash = request_hash
2578 outer_message.code = FETCH
2580 # By this time, the AADs have all been calculated already; setting this
2581 # for the benefit of the response parsing later
2582 request_id.request_hash = request_hash
2583 # FIXME I don't think this ever comes to bear but want to be sure
2584 # before removing this line (this should only be client-side)
2585 request_id.can_reuse_nonce = False
2586 # FIXME: we're still sending a h'00' PIV. Not wrong, just a wasted byte.
2588 return self._kdf(basekey, request_hash, self.sender_id, "Key", self.alg_aead)
2590 # details needed for various operations, especially eAAD generation
2592 @property
2593 def sender_auth_cred(self):
2594 # When preparing the external_aad, the element 'sender_cred' in the
2595 # aad_array takes the empty CBOR byte string (0x40).
2596 return b""
2598 # not inline because the equivalent lambda would not be recognized by mypy
2599 # (workaround for <https://github.com/python/mypy/issues/8083>)
2600 @property
2601 def alg_aead(self):
2602 return self.groupcontext.alg_aead
2604 @property
2605 def alg_group_enc(self):
2606 return self.groupcontext.alg_group_enc
2608 @property
2609 def alg_pairwise_key_agreement(self):
2610 return self.groupcontext.alg_pairwise_key_agreement
2612 @property
2613 def hashfun(self):
2614 return self.groupcontext.hashfun
2616 @property
2617 def common_iv(self):
2618 return self.groupcontext.common_iv
2620 @property
2621 def id_context(self):
2622 return self.groupcontext.id_context
2624 @property
2625 def alg_signature(self):
2626 return self.groupcontext.alg_signature
2628 @property
2629 def group_manager_cred(self):
2630 return self.groupcontext.group_manager_cred
2633class _DeterministicUnprotectProtoAspect(
2634 ContextWhereExternalAadIsGroup, CanUnprotect, SecurityContextUtils
2635):
2636 """This implements the sending side of Deterministic Requests.
2638 While similar to a _PairwiseContextAspect, it only derives the key at
2639 unprotection time, based on information given as Request-Hash."""
2641 # Unless None, this is the value by which the running process recognizes
2642 # that the second phase of a B.1.2 replay window recovery Echo option comes
2643 # from the current process, and thus its sequence number is fresh
2644 echo_recovery = None
2646 deterministic_hashfun = hashes.SHA256()
2648 class ZeroIsAlwaysValid:
2649 """Special-purpose replay window that accepts 0 indefinitely"""
2651 def is_initialized(self):
2652 return True
2654 def is_valid(self, number):
2655 # No particular reason to be lax here
2656 return number == 0
2658 def strike_out(self, number):
2659 # FIXME: I'd rather indicate here that it's a potential replay, have the
2660 # request_id.can_reuse_nonce = False
2661 # set here rather than in _post_decrypt_checks, and thus also get
2662 # the check for whether it's a safe method
2663 pass
2665 def persist(self):
2666 pass
2668 def __init__(self, groupcontext, recipient_id):
2669 self.groupcontext = groupcontext
2670 self.recipient_id = recipient_id
2672 self.recipient_replay_window = self.ZeroIsAlwaysValid()
2674 def __repr__(self):
2675 return "<%s based on %r with the recipient ID %r>" % (
2676 type(self).__name__,
2677 self.groupcontext,
2678 self.recipient_id.hex(),
2679 )
2681 def context_for_response(self):
2682 return self.groupcontext
2684 def _get_recipient_key(self, protected_message, algorithm):
2685 logging.critical(
2686 "Deriving recipient key for protected message %s", protected_message
2687 )
2688 return self._kdf(
2689 self.groupcontext.recipient_keys[self.recipient_id],
2690 protected_message.opt.request_hash,
2691 self.recipient_id,
2692 "Key",
2693 algorithm,
2694 )
2696 def _post_decrypt_checks(self, aad, plaintext, protected_message, request_id):
2697 if plaintext[0] not in (GET, FETCH): # FIXME: "is safe"
2698 # FIXME: accept but return inner Unauthorized. (Raising Unauthorized
2699 # here would just create an unprotected Unauthorized, which is not
2700 # what's spec'd for here)
2701 raise ProtectionInvalid("Request was not safe")
2703 basekey = self.groupcontext.recipient_keys[self.recipient_id]
2705 h = hashes.Hash(self.deterministic_hashfun)
2706 h.update(basekey)
2707 h.update(aad)
2708 h.update(plaintext)
2709 request_hash = h.finalize()
2711 if request_hash != protected_message.opt.request_hash:
2712 raise ProtectionInvalid(
2713 "Client's hash of the plaintext diverges from the actual request hash"
2714 )
2716 # This is intended for the protection of the response, and the
2717 # later use in signature in the unprotect function is not happening
2718 # here anyway, neither is the later use for Echo requests
2719 request_id.request_hash = request_hash
2720 request_id.can_reuse_nonce = False
2722 # details needed for various operations, especially eAAD generation
2724 @property
2725 def recipient_auth_cred(self):
2726 # When preparing the external_aad, the element 'sender_cred' in the
2727 # aad_array takes the empty CBOR byte string (0x40).
2728 return b""
2730 # not inline because the equivalent lambda would not be recognized by mypy
2731 # (workaround for <https://github.com/python/mypy/issues/8083>)
2732 @property
2733 def alg_aead(self):
2734 return self.groupcontext.alg_aead
2736 @property
2737 def alg_group_enc(self):
2738 return self.groupcontext.alg_group_enc
2740 @property
2741 def alg_pairwise_key_agreement(self):
2742 return self.groupcontext.alg_pairwise_key_agreement
2744 @property
2745 def hashfun(self):
2746 return self.groupcontext.hashfun
2748 @property
2749 def common_iv(self):
2750 return self.groupcontext.common_iv
2752 @property
2753 def id_context(self):
2754 return self.groupcontext.id_context
2756 @property
2757 def alg_signature(self):
2758 return self.groupcontext.alg_signature
2760 @property
2761 def group_manager_cred(self):
2762 return self.groupcontext.group_manager_cred
2765def verify_start(message):
2766 """Extract the unprotected COSE options from a
2767 message for the verifier to then pick a security context to actually verify
2768 the message. (Future versions may also report fields from both unprotected
2769 and protected, if the protected bag is ever used with OSCORE.).
2771 Call this only requests; for responses, you'll have to know the security
2772 context anyway, and there is usually no information to be gained."""
2774 _, _, unprotected, _ = CanUnprotect._extract_encrypted0(message)
2776 return unprotected
2779_getattr__ = deprecation_getattr(
2780 {
2781 "COSE_COUNTERSINGATURE0": "COSE_COUNTERSIGNATURE0",
2782 "Algorithm": "AeadAlgorithm",
2783 },
2784 globals(),
2785)