Coverage for src/aiocoap/oscore.py: 0%
1214 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-07-01 08:24 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-07-01 08:24 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5"""This module contains the tools to send OSCORE secured messages.
7It only deals with the algorithmic parts, the security context and protection
8and unprotection of messages. It does not touch on the integration of OSCORE in
9the larger aiocoap stack of having a context or requests; that's what
10:mod:`aiocoap.transports.osore` is for.`"""
12from __future__ import annotations
14from collections import namedtuple
15import io
16import json
17import binascii
18import os
19import os.path
20import tempfile
21import abc
22from typing import Optional, List, Any, Tuple
23import secrets
24import warnings
25import logging
27from aiocoap.message import Message
28from aiocoap.util import cryptography_additions, deprecation_getattr, Sentinel
29from aiocoap.numbers import GET, POST, FETCH, CHANGED, UNAUTHORIZED, CONTENT
30from aiocoap import error
31from . import credentials
32from aiocoap.defaults import log_secret
34from cryptography.hazmat.primitives.ciphers import aead
35from cryptography.hazmat.primitives.kdf.hkdf import HKDF
36from cryptography.hazmat.primitives import ciphers, hashes
37import cryptography.hazmat.backends
38import cryptography.exceptions
39from cryptography.hazmat.primitives import asymmetric, serialization
40from cryptography.hazmat.primitives.asymmetric.utils import (
41 decode_dss_signature,
42 encode_dss_signature,
43)
45import cbor2 as cbor
47import filelock
49# Logger through which log events from cryptographic operations (both inside
50# the primitives and around key derivation) are traced.
51_alglog = logging.getLogger("aiocoap.cryptography")
53MAX_SEQNO = 2**40 - 1
55# Relevant values from the IANA registry "CBOR Object Signing and Encryption (COSE)"
56COSE_KID = 4
57COSE_PIV = 6
58COSE_KID_CONTEXT = 10
59# from RFC9338
60COSE_COUNTERSIGNATURE0 = 12
61# from RFC9528
62COSE_KCCS = 14
64COMPRESSION_BITS_N = 0b111
65COMPRESSION_BIT_K = 0b1000
66COMPRESSION_BIT_H = 0b10000
67COMPRESSION_BIT_GROUP = 0b100000 # Group Flag from draft-ietf-core-oscore-groupcomm-21
68COMPRESSION_BITS_RESERVED = 0b11000000
70CWT_CLAIM_CNF = 8
71CWT_CNF_COSE_KEY = 1
72COSE_KEY_COMMON_KTY = 1
73COSE_KTY_OKP = 1
74COSE_KTY_EC2 = 2
75COSE_KEY_COMMON_ALG = 3
76COSE_KEY_OKP_CRV = -1
77COSE_KEY_OKP_X = -2
78COSE_KEY_EC2_X = -2
79COSE_KEY_EC2_Y = -3
81# While the original values were simple enough to be used in literals, starting
82# with oscore-groupcomm we're using more compact values
84INFO_TYPE_KEYSTREAM_REQUEST = True
85INFO_TYPE_KEYSTREAM_RESPONSE = False
87PRESENT_BUT_NO_VALUE_YET = Sentinel("Value will be populated later")
90class CodeStyle(namedtuple("_CodeStyle", ("request", "response"))):
91 FETCH_CONTENT: CodeStyle
92 POST_CHANGED: CodeStyle
94 @classmethod
95 def from_request(cls, request) -> CodeStyle:
96 if request == FETCH:
97 return cls.FETCH_CONTENT
98 elif request == POST:
99 return cls.POST_CHANGED
100 else:
101 raise ValueError("Invalid request code %r" % request)
104CodeStyle.FETCH_CONTENT = CodeStyle(FETCH, CONTENT)
105CodeStyle.POST_CHANGED = CodeStyle(POST, CHANGED)
108class _DeterministicKey:
109 """Singleton to indicate that for this key member no public or private key
110 is available because it is the Deterministic Client (see
111 <https://www.ietf.org/archive/id/draft-amsuess-core-cachable-oscore-01.html>)
113 This is highly experimental not only from an implementation but also from a
114 specification point of view. The specification has not received adaequate
115 review that would justify using it in any non-experimental scenario.
116 """
119DETERMINISTIC_KEY = _DeterministicKey()
122class NotAProtectedMessage(error.Error, ValueError):
123 """Raised when verification is attempted on a non-OSCORE message"""
125 def __init__(self, message, plain_message):
126 super().__init__(message)
127 self.plain_message = plain_message
130class ProtectionInvalid(error.Error, ValueError):
131 """Raised when verification of an OSCORE message fails"""
134class DecodeError(ProtectionInvalid):
135 """Raised when verification of an OSCORE message fails because CBOR or compressed data were erroneous"""
138class ReplayError(ProtectionInvalid):
139 """Raised when verification of an OSCORE message fails because the sequence numbers was already used"""
142class ReplayErrorWithEcho(ProtectionInvalid, error.RenderableError):
143 """Raised when verification of an OSCORE message fails because the
144 recipient replay window is uninitialized, but a 4.01 Echo can be
145 constructed with the data in the exception that can lead to the client
146 assisting in replay window recovery"""
148 def __init__(self, secctx, request_id, echo):
149 self.secctx = secctx
150 self.request_id = request_id
151 self.echo = echo
153 def to_message(self):
154 inner = Message(
155 code=UNAUTHORIZED,
156 echo=self.echo,
157 )
158 outer, _ = self.secctx.protect(inner, request_id=self.request_id)
159 return outer
162class ContextUnavailable(error.Error, ValueError):
163 """Raised when a context is (currently or permanently) unavailable for
164 protecting or unprotecting a message"""
167class RequestIdentifiers:
168 """A container for details that need to be passed along from the
169 (un)protection of a request to the (un)protection of the response; these
170 data ensure that the request-response binding process works by passing
171 around the request's partial IV.
173 Users of this module should never create or interact with instances, but
174 just pass them around.
175 """
177 def __init__(self, kid, partial_iv, can_reuse_nonce, request_code):
178 # The sender ID of whoever generated the partial IV
179 self.kid = kid
180 self.partial_iv = partial_iv
181 self.can_reuse_nonce = can_reuse_nonce
182 self.code_style = CodeStyle.from_request(request_code)
184 self.request_hash = None
186 def get_reusable_kid_and_piv(self):
187 """Return the kid and the partial IV if can_reuse_nonce is True, and
188 set can_reuse_nonce to False."""
190 if self.can_reuse_nonce:
191 self.can_reuse_nonce = False
192 return (self.kid, self.partial_iv)
193 else:
194 return (None, None)
197def _xor_bytes(a, b):
198 assert len(a) == len(b), "XOR needs consistent lengths"
199 # FIXME is this an efficient thing to do, or should we store everything
200 # that possibly needs xor'ing as long integers with an associated length?
201 return bytes(_a ^ _b for (_a, _b) in zip(a, b))
204class SymmetricEncryptionAlgorithm(metaclass=abc.ABCMeta):
205 """A symmetric algorithm
207 The algorithm's API is the AEAD API with addtional authenticated data: The
208 algorihm may or may not verify that data. Algorithms that actually do
209 verify the data are recognized by also being AeadAlgorithm.
210 """
212 value: int
213 key_bytes: int
214 tag_bytes: int
215 iv_bytes: int
217 @abc.abstractmethod
218 def encrypt(cls, plaintext, aad, key, iv):
219 """Return ciphertext + tag for given input data"""
221 @abc.abstractmethod
222 def decrypt(cls, ciphertext_and_tag, aad, key, iv):
223 """Reverse encryption. Must raise ProtectionInvalid on any error
224 stemming from untrusted data."""
226 @staticmethod
227 def _build_encrypt0_structure(protected, external_aad):
228 assert protected == {}, "Unexpected data in protected bucket"
229 protected_serialized = b"" # were it into an empty dict, it'd be the cbor dump
230 enc_structure = ["Encrypt0", protected_serialized, external_aad]
232 return cbor.dumps(enc_structure)
235class AeadAlgorithm(SymmetricEncryptionAlgorithm, metaclass=abc.ABCMeta):
236 """A symmetric algorithm that provides authentication, including
237 authentication of additional data."""
240class AES_CBC(SymmetricEncryptionAlgorithm, metaclass=abc.ABCMeta):
241 """AES in CBC mode using tthe Python cryptography library"""
243 tag_bytes = 0
244 # This introduces padding -- this library doesn't need to care because
245 # Python does allocation for us, but others may need to rethink their
246 # buffer allocation strategies.
248 @classmethod
249 def _cipher(cls, key, iv):
250 return ciphers.base.Cipher(
251 ciphers.algorithms.AES(key),
252 ciphers.modes.CBC(iv),
253 )
255 @classmethod
256 def encrypt(cls, plaintext, _aad, key, iv):
257 # FIXME: Ignoring aad violates https://www.rfc-editor.org/rfc/rfc9459.html#name-implementation-consideratio but is required for Group OSCORE
259 # Padding according to https://www.rfc-editor.org/rfc/rfc5652#section-6.3
260 k = cls.key_bytes
261 assert k < 256, (
262 "Algorithm with this key size should not have been created in the first plae"
263 )
264 pad_byte = k - (len(plaintext) % k)
265 pad_bytes = bytes((pad_byte,)) * pad_byte
266 plaintext += pad_bytes
268 encryptor = cls._cipher(key, iv).encryptor()
269 result = encryptor.update(plaintext)
270 result += encryptor.finalize()
271 return result
273 @classmethod
274 def decrypt(cls, ciphertext_and_tag, _aad, key, iv):
275 # FIXME: Ignoring aad violates https://www.rfc-editor.org/rfc/rfc9459.html#name-implementation-consideratio but is required for Group OSCORE
277 k = cls.key_bytes
278 if ciphertext_and_tag == b"" or len(ciphertext_and_tag) % k != 0:
279 raise ProtectionInvalid("Message length does not match padding")
281 decryptor = cls._cipher(key, iv).decryptor()
282 result = decryptor.update(ciphertext_and_tag)
283 result += decryptor.finalize()
285 # Padding according to https://www.rfc-editor.org/rfc/rfc5652#section-6.3
286 claimed_padding = result[-1]
287 if claimed_padding == 0 or claimed_padding > k:
288 raise ProtectionInvalid("Padding does not match key")
289 if result[-claimed_padding:] != bytes((claimed_padding,)) * claimed_padding:
290 raise ProtectionInvalid("Padding is inconsistent")
292 return result[:-claimed_padding]
295class A128CBC(AES_CBC):
296 # from RFC9459
297 value = -65531
298 key_bytes = 16 # 128-bit key
299 iv_bytes = 16 # 16-octet nonce
302class AES_CCM(AeadAlgorithm, metaclass=abc.ABCMeta):
303 """AES-CCM implemented using the Python cryptography library"""
305 @classmethod
306 def encrypt(cls, plaintext, aad, key, iv):
307 return aead.AESCCM(key, cls.tag_bytes).encrypt(iv, plaintext, aad)
309 @classmethod
310 def decrypt(cls, ciphertext_and_tag, aad, key, iv):
311 try:
312 return aead.AESCCM(key, cls.tag_bytes).decrypt(iv, ciphertext_and_tag, aad)
313 except cryptography.exceptions.InvalidTag:
314 raise ProtectionInvalid("Tag invalid")
317class AES_CCM_16_64_128(AES_CCM):
318 # from RFC8152 and draft-ietf-core-object-security-0[012] 3.2.1
319 value = 10
320 key_bytes = 16 # 128-bit key
321 tag_bytes = 8 # 64-bit tag
322 iv_bytes = 13 # 13-byte nonce
325class AES_CCM_16_64_256(AES_CCM):
326 # from RFC8152
327 value = 11
328 key_bytes = 32 # 256-bit key
329 tag_bytes = 8 # 64-bit tag
330 iv_bytes = 13 # 13-byte nonce
333class AES_CCM_64_64_128(AES_CCM):
334 # from RFC8152
335 value = 12
336 key_bytes = 16 # 128-bit key
337 tag_bytes = 8 # 64-bit tag
338 iv_bytes = 7 # 7-byte nonce
341class AES_CCM_64_64_256(AES_CCM):
342 # from RFC8152
343 value = 13
344 key_bytes = 32 # 256-bit key
345 tag_bytes = 8 # 64-bit tag
346 iv_bytes = 7 # 7-byte nonce
349class AES_CCM_16_128_128(AES_CCM):
350 # from RFC8152
351 value = 30
352 key_bytes = 16 # 128-bit key
353 tag_bytes = 16 # 128-bit tag
354 iv_bytes = 13 # 13-byte nonce
357class AES_CCM_16_128_256(AES_CCM):
358 # from RFC8152
359 value = 31
360 key_bytes = 32 # 256-bit key
361 tag_bytes = 16 # 128-bit tag
362 iv_bytes = 13 # 13-byte nonce
365class AES_CCM_64_128_128(AES_CCM):
366 # from RFC8152
367 value = 32
368 key_bytes = 16 # 128-bit key
369 tag_bytes = 16 # 128-bit tag
370 iv_bytes = 7 # 7-byte nonce
373class AES_CCM_64_128_256(AES_CCM):
374 # from RFC8152
375 value = 33
376 key_bytes = 32 # 256-bit key
377 tag_bytes = 16 # 128-bit tag
378 iv_bytes = 7 # 7-byte nonce
381class AES_GCM(AeadAlgorithm, metaclass=abc.ABCMeta):
382 """AES-GCM implemented using the Python cryptography library"""
384 iv_bytes = 12 # 96 bits fixed size of the nonce
386 @classmethod
387 def encrypt(cls, plaintext, aad, key, iv):
388 return aead.AESGCM(key).encrypt(iv, plaintext, aad)
390 @classmethod
391 def decrypt(cls, ciphertext_and_tag, aad, key, iv):
392 try:
393 return aead.AESGCM(key).decrypt(iv, ciphertext_and_tag, aad)
394 except cryptography.exceptions.InvalidTag:
395 raise ProtectionInvalid("Tag invalid")
398class A128GCM(AES_GCM):
399 # from RFC8152
400 value = 1
401 key_bytes = 16 # 128-bit key
402 tag_bytes = 16 # 128-bit tag
405class A192GCM(AES_GCM):
406 # from RFC8152
407 value = 2
408 key_bytes = 24 # 192-bit key
409 tag_bytes = 16 # 128-bit tag
412class A256GCM(AES_GCM):
413 # from RFC8152
414 value = 3
415 key_bytes = 32 # 256-bit key
416 tag_bytes = 16 # 128-bit tag
419class ChaCha20Poly1305(AeadAlgorithm):
420 # from RFC8152
421 value = 24
422 key_bytes = 32 # 256-bit key
423 tag_bytes = 16 # 128-bit tag
424 iv_bytes = 12 # 96-bit nonce
426 @classmethod
427 def encrypt(cls, plaintext, aad, key, iv):
428 return aead.ChaCha20Poly1305(key).encrypt(iv, plaintext, aad)
430 @classmethod
431 def decrypt(cls, ciphertext_and_tag, aad, key, iv):
432 try:
433 return aead.ChaCha20Poly1305(key).decrypt(iv, ciphertext_and_tag, aad)
434 except cryptography.exceptions.InvalidTag:
435 raise ProtectionInvalid("Tag invalid")
438class AlgorithmCountersign(metaclass=abc.ABCMeta):
439 """A fully parameterized COSE countersign algorithm
441 An instance is able to provide all the alg_signature, par_countersign and
442 par_countersign_key parameters taht go into the Group OSCORE algorithms
443 field.
444 """
446 value: int | str
448 @abc.abstractmethod
449 def sign(self, body, external_aad, private_key):
450 """Return the signature produced by the key when using
451 CounterSignature0 as describe in draft-ietf-cose-countersign-01"""
453 @abc.abstractmethod
454 def verify(self, signature, body, external_aad, public_key):
455 """Verify a signature in analogy to sign"""
457 @abc.abstractmethod
458 def generate_with_ccs(self) -> Tuple[Any, bytes]:
459 """Return a usable private key along with a CCS describing it"""
461 @abc.abstractmethod
462 def public_from_private(self, private_key):
463 """Given a private key, derive the publishable key"""
465 @abc.abstractmethod
466 def from_kccs(self, ccs: bytes) -> Any:
467 """Given a CCS, extract the public key, or raise a ValueError if the
468 credential format does not align with the type.
470 The type is not exactly Any, but whichever type is used by this
471 algorithm class."""
473 @staticmethod
474 def _build_countersign_structure(body, external_aad):
475 countersign_structure = [
476 "CounterSignature0",
477 b"",
478 b"",
479 external_aad,
480 body,
481 ]
482 tobesigned = cbor.dumps(countersign_structure)
483 return tobesigned
485 @abc.abstractproperty
486 def signature_length(self) -> int:
487 """The length of a signature using this algorithm"""
489 @abc.abstractproperty
490 def curve_number(self) -> int:
491 """Registered curve number used with this algorithm.
493 Only used for verification of credentials' details"""
496class AlgorithmStaticStatic(metaclass=abc.ABCMeta):
497 @abc.abstractmethod
498 def staticstatic(self, private_key, public_key):
499 """Derive a shared static-static secret from a private and a public key"""
502def _from_kccs_common(ccs: bytes) -> dict:
503 """Check that the CCS contains a CNF claim that is a COSE Key, and return
504 that key"""
506 try:
507 parsed = cbor.loads(ccs)
508 except cbor.CBORDecodeError as e:
509 raise ValueError("CCS not in CBOR format") from e
511 if (
512 not isinstance(parsed, dict)
513 or CWT_CLAIM_CNF not in parsed
514 or not isinstance(parsed[CWT_CLAIM_CNF], dict)
515 or CWT_CNF_COSE_KEY not in parsed[CWT_CLAIM_CNF]
516 or not isinstance(parsed[CWT_CLAIM_CNF][CWT_CNF_COSE_KEY], dict)
517 ):
518 raise ValueError("CCS must contain a COSE Key dict in a CNF")
520 return parsed[CWT_CLAIM_CNF][CWT_CNF_COSE_KEY]
523class Ed25519(AlgorithmCountersign):
524 def sign(self, body, aad, private_key):
525 _alglog.debug("Perfoming signature:")
526 _alglog.debug("* body: %s", body.hex())
527 _alglog.debug("* AAD: %s", aad.hex())
528 private_key = asymmetric.ed25519.Ed25519PrivateKey.from_private_bytes(
529 private_key
530 )
531 return private_key.sign(self._build_countersign_structure(body, aad))
533 def verify(self, signature, body, aad, public_key):
534 _alglog.debug("Verifying signature:")
535 _alglog.debug("* body: %s", body.hex())
536 _alglog.debug("* AAD: %s", aad.hex())
537 public_key = asymmetric.ed25519.Ed25519PublicKey.from_public_bytes(public_key)
538 try:
539 public_key.verify(signature, self._build_countersign_structure(body, aad))
540 except cryptography.exceptions.InvalidSignature:
541 _alglog.debug("Signature was invalid.")
542 raise ProtectionInvalid("Signature mismatch")
544 def _generate(self):
545 key = asymmetric.ed25519.Ed25519PrivateKey.generate()
546 # FIXME: We could avoid handing the easy-to-misuse bytes around if the
547 # current algorithm interfaces did not insist on passing the
548 # exchangable representations -- and generally that should be more
549 # efficient.
550 return key.private_bytes(
551 encoding=serialization.Encoding.Raw,
552 format=serialization.PrivateFormat.Raw,
553 encryption_algorithm=serialization.NoEncryption(),
554 )
556 def generate_with_ccs(self) -> Tuple[Any, bytes]:
557 private = self._generate()
558 public = self.public_from_private(private)
560 ccs = cbor.dumps(
561 {
562 CWT_CLAIM_CNF: {
563 CWT_CNF_COSE_KEY: {
564 COSE_KEY_COMMON_KTY: COSE_KTY_OKP,
565 COSE_KEY_COMMON_ALG: self.value,
566 COSE_KEY_OKP_CRV: self.curve_number,
567 COSE_KEY_OKP_X: public,
568 }
569 }
570 }
571 )
573 return (private, ccs)
575 def public_from_private(self, private_key):
576 private_key = asymmetric.ed25519.Ed25519PrivateKey.from_private_bytes(
577 private_key
578 )
579 public_key = private_key.public_key()
580 return public_key.public_bytes(
581 encoding=serialization.Encoding.Raw,
582 format=serialization.PublicFormat.Raw,
583 )
585 def from_kccs(self, ccs: bytes) -> Any:
586 # eg. {1: 1, 3: -8, -1: 6, -2: h'77 ... 88'}
587 cose_key = _from_kccs_common(ccs)
589 if (
590 cose_key.get(COSE_KEY_COMMON_KTY) == COSE_KTY_OKP
591 and cose_key.get(COSE_KEY_COMMON_ALG) == self.value
592 and cose_key.get(COSE_KEY_OKP_CRV) == self.curve_number
593 and COSE_KEY_OKP_X in cose_key
594 ):
595 return cose_key[COSE_KEY_OKP_X]
596 else:
597 raise ValueError("Key type not recognized from CCS key %r" % cose_key)
599 value = -8
600 curve_number = 6
602 signature_length = 64
605class EcdhSsHkdf256(AlgorithmStaticStatic):
606 # FIXME: This class uses the Edwards keys as private and public keys, and
607 # not the converted ones. This will be problematic if pairwise-only
608 # contexts are to be set up.
610 value = -27
612 # FIXME these two will be different when using the Montgomery keys directly
614 # This one will only be used when establishing and distributing pairwise-only keys
615 public_from_private = Ed25519.public_from_private
617 def staticstatic(self, private_key, public_key):
618 private_key = asymmetric.ed25519.Ed25519PrivateKey.from_private_bytes(
619 private_key
620 )
621 private_key = cryptography_additions.sk_to_curve25519(private_key)
623 public_key = asymmetric.ed25519.Ed25519PublicKey.from_public_bytes(public_key)
624 public_key = cryptography_additions.pk_to_curve25519(public_key)
626 return private_key.exchange(public_key)
629class ECDSA_SHA256_P256(AlgorithmCountersign, AlgorithmStaticStatic):
630 # Trying a new construction approach -- should work just as well given
631 # we're just passing Python objects around
632 def from_public_parts(self, x: bytes, y: bytes):
633 """Create a public key from its COSE values"""
634 return asymmetric.ec.EllipticCurvePublicNumbers(
635 int.from_bytes(x, "big"),
636 int.from_bytes(y, "big"),
637 asymmetric.ec.SECP256R1(),
638 ).public_key()
640 def from_kccs(self, ccs: bytes) -> Any:
641 cose_key = _from_kccs_common(ccs)
643 if (
644 cose_key.get(COSE_KEY_COMMON_KTY) == COSE_KTY_EC2
645 and cose_key.get(COSE_KEY_COMMON_ALG) == self.value
646 and COSE_KEY_EC2_X in cose_key
647 and COSE_KEY_EC2_Y in cose_key
648 ):
649 return self.from_public_parts(
650 x=cose_key[COSE_KEY_EC2_X],
651 y=cose_key[COSE_KEY_EC2_Y],
652 )
653 else:
654 raise ValueError("Key type not recognized from CCS key %r" % cose_key)
656 def from_private_parts(self, x: bytes, y: bytes, d: bytes):
657 public_numbers = self.from_public_parts(x, y).public_numbers()
658 private_numbers = asymmetric.ec.EllipticCurvePrivateNumbers(
659 int.from_bytes(d, "big"), public_numbers
660 )
661 return private_numbers.private_key()
663 def sign(self, body, aad, private_key):
664 der_signature = private_key.sign(
665 self._build_countersign_structure(body, aad),
666 asymmetric.ec.ECDSA(hashes.SHA256()),
667 )
668 (r, s) = decode_dss_signature(der_signature)
670 return r.to_bytes(32, "big") + s.to_bytes(32, "big")
672 def verify(self, signature, body, aad, public_key):
673 r = signature[:32]
674 s = signature[32:]
675 r = int.from_bytes(r, "big")
676 s = int.from_bytes(s, "big")
677 der_signature = encode_dss_signature(r, s)
678 try:
679 public_key.verify(
680 der_signature,
681 self._build_countersign_structure(body, aad),
682 asymmetric.ec.ECDSA(hashes.SHA256()),
683 )
684 except cryptography.exceptions.InvalidSignature:
685 raise ProtectionInvalid("Signature mismatch")
687 def _generate(self):
688 return asymmetric.ec.generate_private_key(asymmetric.ec.SECP256R1())
690 def generate_with_ccs(self) -> Tuple[Any, bytes]:
691 private = self._generate()
692 public = self.public_from_private(private)
693 # FIXME: Deduplicate with edhoc.py
694 x = public.public_numbers().x.to_bytes(32, "big")
695 y = public.public_numbers().y.to_bytes(32, "big")
697 ccs = cbor.dumps(
698 {
699 CWT_CLAIM_CNF: {
700 CWT_CNF_COSE_KEY: {
701 COSE_KEY_COMMON_KTY: COSE_KTY_EC2,
702 COSE_KEY_COMMON_ALG: self.value,
703 COSE_KEY_EC2_X: x,
704 COSE_KEY_EC2_Y: y,
705 }
706 }
707 }
708 )
710 return (private, ccs)
712 def public_from_private(self, private_key):
713 return private_key.public_key()
715 def staticstatic(self, private_key, public_key):
716 return private_key.exchange(asymmetric.ec.ECDH(), public_key)
718 value = -7 # FIXME: when used as a static-static algorithm, does this become -27? see shepherd review.
719 curve_number = 1
721 signature_length = 64
724algorithms = {
725 "AES-CCM-16-64-128": AES_CCM_16_64_128(),
726 "AES-CCM-16-64-256": AES_CCM_16_64_256(),
727 "AES-CCM-64-64-128": AES_CCM_64_64_128(),
728 "AES-CCM-64-64-256": AES_CCM_64_64_256(),
729 "AES-CCM-16-128-128": AES_CCM_16_128_128(),
730 "AES-CCM-16-128-256": AES_CCM_16_128_256(),
731 "AES-CCM-64-128-128": AES_CCM_64_128_128(),
732 "AES-CCM-64-128-256": AES_CCM_64_128_256(),
733 "ChaCha20/Poly1305": ChaCha20Poly1305(),
734 "A128GCM": A128GCM(),
735 "A192GCM": A192GCM(),
736 "A256GCM": A256GCM(),
737 "A128CBC": A128CBC(),
738}
740# algorithms with full parameter set
741algorithms_countersign = {
742 # maybe needs a different name...
743 "EdDSA on Ed25519": Ed25519(),
744 "ECDSA w/ SHA-256 on P-256": ECDSA_SHA256_P256(),
745}
747algorithms_staticstatic = {
748 "ECDH-SS + HKDF-256": EcdhSsHkdf256(),
749}
751DEFAULT_ALGORITHM = "AES-CCM-16-64-128"
753_hash_backend = cryptography.hazmat.backends.default_backend()
754hashfunctions = {
755 "sha256": hashes.SHA256(),
756 "sha384": hashes.SHA384(),
757 "sha512": hashes.SHA512(),
758}
760DEFAULT_HASHFUNCTION = "sha256"
762DEFAULT_WINDOWSIZE = 32
765class BaseSecurityContext:
766 # Deprecated marker for whether the class uses the
767 # ContextWhereExternalAadIsGroup mixin; see documentation there.
768 external_aad_is_group = False
770 # Authentication information carried with this security context; managed
771 # externally by whatever creates the security context.
772 authenticated_claims: List[str] = []
774 #: AEAD algorithm. This may be None if it is not set in an OSCORE group context.
775 alg_aead: Optional[AeadAlgorithm]
777 #: The common IV of the common context.
778 #:
779 #: This may be longer than needed for constructing IVs with any particular
780 #: algorithm, as per <https://www.ietf.org/archive/id/draft-ietf-core-oscore-groupcomm-23.html#section-2.1.4>
781 common_iv: bytes
783 id_context: Optional[bytes]
785 @property
786 def algorithm(self):
787 warnings.warn(
788 "Property was renamed to 'alg_aead'", DeprecationWarning, stacklevel=2
789 )
790 return self.alg_aead
792 @algorithm.setter
793 def algorithm(self, value):
794 warnings.warn(
795 "Property was renamed to 'alg_aead'", DeprecationWarning, stacklevel=2
796 )
797 self.alg_aead = value
799 hashfun: hashes.HashAlgorithm
801 def _construct_nonce(
802 self, partial_iv_short, piv_generator_id, alg: SymmetricEncryptionAlgorithm
803 ):
804 pad_piv = b"\0" * (5 - len(partial_iv_short))
806 s = bytes([len(piv_generator_id)])
807 pad_id = b"\0" * (alg.iv_bytes - 6 - len(piv_generator_id))
809 components = s + pad_id + piv_generator_id + pad_piv + partial_iv_short
811 used_common_iv = self.common_iv[: len(components)]
812 nonce = _xor_bytes(used_common_iv, components)
813 _alglog.debug(
814 "Nonce construction: common %s ^ components %s = %s",
815 self.common_iv.hex(),
816 components.hex(),
817 nonce.hex(),
818 )
820 return nonce
822 def _extract_external_aad(
823 self, message, request_id, local_is_sender: bool
824 ) -> bytes:
825 """Build the serialized external AAD from information in the message
826 and the request_id.
828 Information about whether the local context is the sender of the
829 message is only relevant to group contexts, where it influences whose
830 authentication credentials are placed in the AAD.
831 """
832 # If any option were actually Class I, it would be something like
833 #
834 # the_options = pick some of(message)
835 # class_i_options = Message(the_options).opt.encode()
837 oscore_version = 1
838 class_i_options = b""
839 if request_id.request_hash is not None:
840 class_i_options = Message(request_hash=request_id.request_hash).opt.encode()
842 algorithms: List[int | str | None] = [
843 None if self.alg_aead is None else self.alg_aead.value
844 ]
845 if isinstance(self, ContextWhereExternalAadIsGroup):
846 algorithms.append(
847 None if self.alg_group_enc is None else self.alg_group_enc.value
848 )
849 algorithms.append(
850 None if self.alg_signature is None else self.alg_signature.value
851 )
852 algorithms.append(
853 None
854 if self.alg_pairwise_key_agreement is None
855 else self.alg_pairwise_key_agreement.value
856 )
858 external_aad = [
859 oscore_version,
860 algorithms,
861 request_id.kid,
862 request_id.partial_iv,
863 class_i_options,
864 ]
866 if isinstance(self, ContextWhereExternalAadIsGroup):
867 # FIXME: We may need to carry this over in the request_id when
868 # observation span group rekeyings
869 external_aad.append(self.id_context)
871 assert message.opt.oscore is not None, "Double OSCORE"
872 external_aad.append(message.opt.oscore)
874 if local_is_sender:
875 external_aad.append(self.sender_auth_cred)
876 else:
877 external_aad.append(self.recipient_auth_cred)
878 external_aad.append(self.group_manager_cred)
880 return cbor.dumps(external_aad)
883class ContextWhereExternalAadIsGroup(BaseSecurityContext):
884 """The protection and unprotection functions will use the Group OSCORE AADs
885 rather than the regular OSCORE AADs iff a context uses this mixin. (Ie.
886 alg_group_enc etc are added to the algorithms, and request_kid_context,
887 OSCORE_option, sender_auth_cred and gm_cred are added).
889 This does not necessarily match the is_signing property (as pairwise
890 contexts use this but don't sign), and is distinct from the added OSCORE
891 option in the AAD (as that's only applicable for the external AAD as
892 extracted for signing and signature verification purposes)."""
894 id_context: bytes
896 external_aad_is_group = True
898 alg_group_enc: Optional[SymmetricEncryptionAlgorithm]
899 alg_signature: Optional[AlgorithmCountersign]
900 # This is also of type AlgorithmCountersign because the staticstatic
901 # function is sitting on the same type.
902 alg_pairwise_key_agreement: Optional[AlgorithmCountersign]
904 sender_auth_cred: bytes
905 recipient_auth_cred: bytes
906 group_manager_cred: bytes
909# FIXME pull interface components from SecurityContext up here
910class CanProtect(BaseSecurityContext, metaclass=abc.ABCMeta):
911 # The protection function will add a signature acccording to the context's
912 # alg_signature attribute if this is true
913 is_signing = False
915 # Send the KID when protecting responses
916 #
917 # Once group pairwise mode is implemented, this will need to become a
918 # parameter to protect(), which is stored at the point where the incoming
919 # context is turned into an outgoing context. (Currently, such a mechanism
920 # isn't there yet, and oscore_wrapper protects responses with the very same
921 # context they came in on).
922 responses_send_kid = False
924 #: The KID sent by this party when sending requests, or answering to group
925 #: requests.
926 sender_id: bytes
928 @staticmethod
929 def _compress(protected, unprotected, ciphertext):
930 """Pack the untagged COSE_Encrypt0 object described by the *args
931 into two bytestrings suitable for the Object-Security option and the
932 message body"""
934 if protected:
935 raise RuntimeError(
936 "Protection produced a message that has uncompressable fields."
937 )
939 piv = unprotected.pop(COSE_PIV, b"")
940 if len(piv) > COMPRESSION_BITS_N:
941 raise ValueError("Can't encode overly long partial IV")
943 firstbyte = len(piv)
944 if COSE_KID in unprotected:
945 firstbyte |= COMPRESSION_BIT_K
946 kid_data = unprotected.pop(COSE_KID)
947 else:
948 kid_data = b""
950 if COSE_KID_CONTEXT in unprotected:
951 firstbyte |= COMPRESSION_BIT_H
952 kid_context = unprotected.pop(COSE_KID_CONTEXT)
953 s = len(kid_context)
954 if s > 255:
955 raise ValueError("KID Context too long")
956 s_kid_context = bytes((s,)) + kid_context
957 else:
958 s_kid_context = b""
960 if COSE_COUNTERSIGNATURE0 in unprotected:
961 firstbyte |= COMPRESSION_BIT_GROUP
963 unprotected.pop(COSE_COUNTERSIGNATURE0)
965 # ciphertext will eventually also get the countersignature, but
966 # that happens later when the option is already processed.
968 if unprotected:
969 raise RuntimeError(
970 "Protection produced a message that has uncompressable fields."
971 )
973 if firstbyte:
974 option = bytes([firstbyte]) + piv + s_kid_context + kid_data
975 else:
976 option = b""
978 return (option, ciphertext)
980 def protect(self, message, request_id=None, *, kid_context=True):
981 """Given a plain CoAP message, create a protected message that contains
982 message's options in the inner or outer CoAP message as described in
983 OSCOAP.
985 If the message is a response to a previous message, the additional data
986 from unprotecting the request are passed in as request_id. When
987 request data is present, its partial IV is reused if possible. The
988 security context's ID context is encoded in the resulting message
989 unless kid_context is explicitly set to a False; other values for the
990 kid_context can be passed in as byte string in the same parameter.
991 """
993 _alglog.debug(
994 "Protecting message %s with context %s and request ID %s",
995 message,
996 self,
997 request_id,
998 )
1000 assert (request_id is None) == message.code.is_request(), (
1001 "Requestishness of code to protect does not match presence of request ID"
1002 )
1004 outer_message, plaintext = self._split_message(message, request_id)
1006 protected = {}
1007 nonce = None
1008 partial_iv_generated_by = None
1009 unprotected = {}
1010 if request_id is not None:
1011 partial_iv_generated_by, partial_iv_short = (
1012 request_id.get_reusable_kid_and_piv()
1013 )
1015 alg_symmetric = self.alg_group_enc if self.is_signing else self.alg_aead
1016 assert isinstance(alg_symmetric, AeadAlgorithm) or self.is_signing, (
1017 "Non-AEAD algorithms can only be used in signing modes."
1018 )
1020 if partial_iv_generated_by is None:
1021 nonce, partial_iv_short = self._build_new_nonce(alg_symmetric)
1022 partial_iv_generated_by = self.sender_id
1024 unprotected[COSE_PIV] = partial_iv_short
1025 else:
1026 nonce = self._construct_nonce(
1027 partial_iv_short, partial_iv_generated_by, alg_symmetric
1028 )
1030 if message.code.is_request():
1031 unprotected[COSE_KID] = self.sender_id
1033 request_id = RequestIdentifiers(
1034 self.sender_id,
1035 partial_iv_short,
1036 can_reuse_nonce=None,
1037 request_code=outer_message.code,
1038 )
1040 if kid_context is True:
1041 if self.id_context is not None:
1042 unprotected[COSE_KID_CONTEXT] = self.id_context
1043 elif kid_context is not False:
1044 unprotected[COSE_KID_CONTEXT] = kid_context
1045 else:
1046 if self.responses_send_kid:
1047 unprotected[COSE_KID] = self.sender_id
1049 # Putting in a dummy value as the signature calculation will already need some of the compression result
1050 if self.is_signing:
1051 unprotected[COSE_COUNTERSIGNATURE0] = b""
1052 # FIXME: Running this twice quite needlessly (just to get the oscore option for sending)
1053 option_data, _ = self._compress(protected, unprotected, b"")
1055 outer_message.opt.oscore = option_data
1057 external_aad = self._extract_external_aad(
1058 outer_message, request_id, local_is_sender=True
1059 )
1061 aad = SymmetricEncryptionAlgorithm._build_encrypt0_structure(
1062 protected, external_aad
1063 )
1065 key = self._get_sender_key(outer_message, external_aad, plaintext, request_id)
1067 _alglog.debug("Encrypting Encrypt0:")
1068 _alglog.debug("* aad = %s", aad.hex())
1069 _alglog.debug("* nonce = %s", nonce.hex())
1070 _alglog.debug("* key = %s", log_secret(key.hex()))
1071 _alglog.debug("* algorithm = %s", alg_symmetric)
1072 ciphertext = alg_symmetric.encrypt(plaintext, aad, key, nonce)
1074 _alglog.debug("Produced ciphertext %s", ciphertext.hex())
1076 _, payload = self._compress(protected, unprotected, ciphertext)
1078 if self.is_signing:
1079 signature = self.alg_signature.sign(payload, external_aad, self.private_key)
1080 # This is bordering "it's OK to log it in plain", because a reader
1081 # of the log can access both the plaintext and the ciphertext, but
1082 # still, it is called a key.
1083 _alglog.debug(
1084 "Producing keystream from signature encryption key: %s",
1085 log_secret(self.signature_encryption_key.hex()),
1086 )
1087 keystream = self._kdf_for_keystreams(
1088 partial_iv_generated_by,
1089 partial_iv_short,
1090 self.signature_encryption_key,
1091 self.sender_id,
1092 INFO_TYPE_KEYSTREAM_REQUEST
1093 if message.code.is_request()
1094 else INFO_TYPE_KEYSTREAM_RESPONSE,
1095 )
1096 _alglog.debug("Keystream is %s", keystream.hex())
1097 encrypted_signature = _xor_bytes(signature, keystream)
1098 _alglog.debug("Encrypted signature %s", encrypted_signature.hex())
1099 payload += encrypted_signature
1100 outer_message.payload = payload
1102 # FIXME go through options section
1104 _alglog.debug(
1105 "Protecting the message succeeded, yielding ciphertext %s and request ID %s",
1106 outer_message,
1107 request_id,
1108 )
1109 # the request_id in the second argument should be discarded by the
1110 # caller when protecting a response -- is that reason enough for an
1111 # `if` and returning None?
1112 return outer_message, request_id
1114 def _get_sender_key(self, outer_message, aad, plaintext, request_id):
1115 """Customization hook of the protect function
1117 While most security contexts have a fixed sender key, deterministic
1118 requests need to shake up a few things. They need to modify the outer
1119 message, as well as the request_id as it will later be used to
1120 unprotect the response."""
1121 return self.sender_key
1123 def _split_message(self, message, request_id):
1124 """Given a protected message, return the outer message that contains
1125 all Class I and Class U options (but without payload or Object-Security
1126 option), and the encoded inner message that contains all Class E
1127 options and the payload.
1129 This leaves the messages' remotes unset."""
1131 if message.code.is_request():
1132 outer_host = message.opt.uri_host
1133 proxy_uri = message.opt.proxy_uri
1135 inner_message = message.copy(
1136 uri_host=None,
1137 uri_port=None,
1138 proxy_uri=None,
1139 proxy_scheme=None,
1140 )
1141 inner_message.remote = None
1143 if proxy_uri is not None:
1144 # Use set_request_uri to split up the proxy URI into its
1145 # components; extract, preserve and clear them.
1146 inner_message.set_request_uri(proxy_uri, set_uri_host=False)
1147 if inner_message.opt.proxy_uri is not None:
1148 raise ValueError("Can not split Proxy-URI into options")
1149 outer_uri = inner_message.remote.uri_base
1150 inner_message.remote = None
1151 inner_message.opt.proxy_scheme = None
1153 if message.opt.observe is None:
1154 outer_code = POST
1155 else:
1156 outer_code = FETCH
1157 else:
1158 outer_host = None
1159 proxy_uri = None
1161 inner_message = message.copy()
1163 outer_code = request_id.code_style.response
1165 # no max-age because these are always successsful responses
1166 outer_message = Message(
1167 code=outer_code,
1168 uri_host=outer_host,
1169 observe=None if message.code.is_response() else message.opt.observe,
1170 )
1171 if proxy_uri is not None:
1172 outer_message.set_request_uri(outer_uri)
1174 plaintext = bytes([inner_message.code]) + inner_message.opt.encode()
1175 if inner_message.payload:
1176 plaintext += bytes([0xFF])
1177 plaintext += inner_message.payload
1179 return outer_message, plaintext
1181 def _build_new_nonce(self, alg: SymmetricEncryptionAlgorithm):
1182 """This implements generation of a new nonce, assembled as per Figure 5
1183 of draft-ietf-core-object-security-06. Returns the shortened partial IV
1184 as well."""
1185 seqno = self.new_sequence_number()
1187 partial_iv = seqno.to_bytes(5, "big")
1189 return (
1190 self._construct_nonce(partial_iv, self.sender_id, alg),
1191 partial_iv.lstrip(b"\0") or b"\0",
1192 )
1194 # sequence number handling
1196 def new_sequence_number(self):
1197 """Return a new sequence number; the implementation is responsible for
1198 never returning the same value twice in a given security context.
1200 May raise ContextUnavailable."""
1201 retval = self.sender_sequence_number
1202 if retval >= MAX_SEQNO:
1203 raise ContextUnavailable("Sequence number too large, context is exhausted.")
1204 self.sender_sequence_number += 1
1205 self.post_seqnoincrease()
1206 return retval
1208 # implementation defined
1210 @abc.abstractmethod
1211 def post_seqnoincrease(self):
1212 """Ensure that sender_sequence_number is stored"""
1213 raise
1215 def context_from_response(self, unprotected_bag) -> CanUnprotect:
1216 """When receiving a response to a request protected with this security
1217 context, pick the security context with which to unprotect the response
1218 given the unprotected information from the Object-Security option.
1220 This allow picking the right security context in a group response, and
1221 helps getting a new short-lived context for B.2 mode. The default
1222 behaivor is returning self.
1223 """
1225 # FIXME justify by moving into a mixin for CanProtectAndUnprotect
1226 return self # type: ignore
1229class CanUnprotect(BaseSecurityContext):
1230 recipient_key: bytes
1232 def unprotect(self, protected_message, request_id=None):
1233 _alglog.debug(
1234 "Unprotecting message %s with context %s and request ID %s",
1235 protected_message,
1236 self,
1237 request_id,
1238 )
1240 assert (request_id is not None) == protected_message.code.is_response(), (
1241 "Requestishness of code to unprotect does not match presence of request ID"
1242 )
1243 is_response = protected_message.code.is_response()
1245 # Set to a raisable exception on replay check failures; it will be
1246 # raised, but the package may still be processed in the course of Echo handling.
1247 replay_error = None
1249 protected_serialized, protected, unprotected, ciphertext = (
1250 self._extract_encrypted0(protected_message)
1251 )
1253 if protected:
1254 raise ProtectionInvalid("The protected field is not empty")
1256 # FIXME check for duplicate keys in protected
1258 if unprotected.pop(COSE_KID_CONTEXT, self.id_context) != self.id_context:
1259 # FIXME is this necessary?
1260 raise ProtectionInvalid("Sender ID context does not match")
1262 if unprotected.pop(COSE_KID, self.recipient_id) != self.recipient_id:
1263 # for most cases, this is caught by the session ID dispatch, but in
1264 # responses (where explicit sender IDs are atypical), this is a
1265 # valid check
1266 raise ProtectionInvalid("Sender ID does not match")
1268 if COSE_PIV not in unprotected:
1269 if not is_response:
1270 raise ProtectionInvalid("No sequence number provided in request")
1272 seqno = None # sentinel for not striking out anyting
1273 partial_iv_short = request_id.partial_iv
1274 partial_iv_generated_by = request_id.kid
1275 else:
1276 partial_iv_short = unprotected.pop(COSE_PIV)
1277 partial_iv_generated_by = self.recipient_id
1279 seqno = int.from_bytes(partial_iv_short, "big")
1281 if not is_response:
1282 if not self.recipient_replay_window.is_initialized():
1283 replay_error = ReplayError("Sequence number check unavailable")
1284 elif not self.recipient_replay_window.is_valid(seqno):
1285 replay_error = ReplayError("Sequence number was re-used")
1287 if replay_error is not None and self.echo_recovery is None:
1288 # Don't even try decoding if there is no reason to
1289 raise replay_error
1291 request_id = RequestIdentifiers(
1292 partial_iv_generated_by,
1293 partial_iv_short,
1294 can_reuse_nonce=replay_error is None,
1295 request_code=protected_message.code,
1296 )
1298 external_aad = self._extract_external_aad(
1299 protected_message, request_id, local_is_sender=False
1300 )
1302 if unprotected.pop(COSE_COUNTERSIGNATURE0, None) is not None:
1303 try:
1304 alg_signature = self.alg_signature
1305 except NameError:
1306 raise DecodeError(
1307 "Group messages can not be decoded with this non-group context"
1308 )
1310 siglen = alg_signature.signature_length
1311 if len(ciphertext) < siglen:
1312 raise DecodeError("Message too short for signature")
1313 encrypted_signature = ciphertext[-siglen:]
1315 _alglog.debug(
1316 "Producing keystream from signature encryption key: %s",
1317 log_secret(self.signature_encryption_key.hex()),
1318 )
1319 keystream = self._kdf_for_keystreams(
1320 partial_iv_generated_by,
1321 partial_iv_short,
1322 self.signature_encryption_key,
1323 self.recipient_id,
1324 INFO_TYPE_KEYSTREAM_REQUEST
1325 if protected_message.code.is_request()
1326 else INFO_TYPE_KEYSTREAM_RESPONSE,
1327 )
1328 _alglog.debug("Encrypted signature %s", encrypted_signature.hex())
1329 _alglog.debug("Keystream is %s", keystream.hex())
1330 signature = _xor_bytes(encrypted_signature, keystream)
1332 ciphertext = ciphertext[:-siglen]
1334 alg_signature.verify(
1335 signature, ciphertext, external_aad, self.recipient_public_key
1336 )
1338 alg_symmetric = self.alg_group_enc
1339 else:
1340 alg_symmetric = self.alg_aead
1342 if unprotected:
1343 raise DecodeError("Unsupported unprotected option")
1345 if (
1346 len(ciphertext) < self.alg_aead.tag_bytes + 1
1347 ): # +1 assures access to plaintext[0] (the code)
1348 raise ProtectionInvalid("Ciphertext too short")
1350 enc_structure = ["Encrypt0", protected_serialized, external_aad]
1351 aad = cbor.dumps(enc_structure)
1353 key = self._get_recipient_key(protected_message, alg_symmetric)
1355 nonce = self._construct_nonce(
1356 partial_iv_short, partial_iv_generated_by, alg_symmetric
1357 )
1359 _alglog.debug("Decrypting Encrypt0:")
1360 _alglog.debug("* ciphertext = %s", ciphertext.hex())
1361 _alglog.debug("* aad = %s", aad.hex())
1362 _alglog.debug("* nonce = %s", nonce.hex())
1363 _alglog.debug("* key = %s", log_secret(key.hex()))
1364 _alglog.debug("* algorithm = %s", alg_symmetric)
1365 try:
1366 plaintext = alg_symmetric.decrypt(ciphertext, aad, key, nonce)
1367 except Exception as e:
1368 _alglog.debug("Unprotecting failed")
1369 raise e
1371 self._post_decrypt_checks(
1372 external_aad, plaintext, protected_message, request_id
1373 )
1375 if not is_response and seqno is not None and replay_error is None:
1376 self.recipient_replay_window.strike_out(seqno)
1378 # FIXME add options from unprotected
1380 unprotected_message = Message(code=plaintext[0])
1381 unprotected_message.payload = unprotected_message.opt.decode(plaintext[1:])
1383 try_initialize = (
1384 not self.recipient_replay_window.is_initialized()
1385 and self.echo_recovery is not None
1386 )
1387 if try_initialize:
1388 if protected_message.code.is_request():
1389 # Either accept into replay window and clear replay error, or raise
1390 # something that can turn into a 4.01,Echo response
1391 if unprotected_message.opt.echo == self.echo_recovery:
1392 self.recipient_replay_window.initialize_from_freshlyseen(seqno)
1393 replay_error = None
1394 else:
1395 raise ReplayErrorWithEcho(
1396 secctx=self, request_id=request_id, echo=self.echo_recovery
1397 )
1398 else:
1399 # We can initialize the replay window from a response as well.
1400 # The response is guaranteed fresh as it was AEAD-decoded to
1401 # match a request sent by this process.
1402 #
1403 # This is rare, as it only works when the server uses an own
1404 # sequence number, eg. when sending a notification or when
1405 # acting again on a retransmitted safe request whose response
1406 # it did not cache.
1407 #
1408 # Nothing bad happens if we can't make progress -- we just
1409 # don't initialize the replay window that wouldn't have been
1410 # checked for a response anyway.
1411 if seqno is not None:
1412 self.recipient_replay_window.initialize_from_freshlyseen(seqno)
1414 if replay_error is not None:
1415 raise replay_error
1417 if unprotected_message.code.is_request():
1418 if protected_message.opt.observe != 0:
1419 unprotected_message.opt.observe = None
1420 else:
1421 if protected_message.opt.observe is not None:
1422 # -1 ensures that they sort correctly in later reordering
1423 # detection. Note that neither -1 nor high (>3 byte) sequence
1424 # numbers can be serialized in the Observe option, but they are
1425 # in this implementation accepted for passing around.
1426 unprotected_message.opt.observe = -1 if seqno is None else seqno
1428 _alglog.debug(
1429 "Unprotecting succeeded, yielding plaintext %s and request_id %s",
1430 unprotected_message,
1431 request_id,
1432 )
1433 return unprotected_message, request_id
1435 def _get_recipient_key(
1436 self, protected_message, algorithm: SymmetricEncryptionAlgorithm
1437 ):
1438 """Customization hook of the unprotect function
1440 While most security contexts have a fixed recipient key, group contexts
1441 have multiple, and deterministic requests build it on demand."""
1442 return self.recipient_key
1444 def _post_decrypt_checks(self, aad, plaintext, protected_message, request_id):
1445 """Customization hook of the unprotect function after decryption
1447 While most security contexts are good with the default checks,
1448 deterministic requests need to perform additional checks while AAD and
1449 plaintext information is still available, and modify the request_id for
1450 the later protection step of the response."""
1452 @staticmethod
1453 def _uncompress(option_data, payload):
1454 if option_data == b"":
1455 firstbyte = 0
1456 else:
1457 firstbyte = option_data[0]
1458 tail = option_data[1:]
1460 unprotected = {}
1462 if firstbyte & COMPRESSION_BITS_RESERVED:
1463 raise DecodeError("Protected data uses reserved fields")
1465 pivsz = firstbyte & COMPRESSION_BITS_N
1466 if pivsz:
1467 if len(tail) < pivsz:
1468 raise DecodeError("Partial IV announced but not present")
1469 unprotected[COSE_PIV] = tail[:pivsz]
1470 tail = tail[pivsz:]
1472 if firstbyte & COMPRESSION_BIT_H:
1473 # kid context hint
1474 s = tail[0]
1475 if len(tail) - 1 < s:
1476 raise DecodeError("Context hint announced but not present")
1477 tail = tail[1:]
1478 unprotected[COSE_KID_CONTEXT] = tail[:s]
1479 tail = tail[s:]
1481 if firstbyte & COMPRESSION_BIT_K:
1482 kid = tail
1483 unprotected[COSE_KID] = kid
1485 if firstbyte & COMPRESSION_BIT_GROUP:
1486 # Not really; As this is (also) used early on (before the KID
1487 # context is even known, because it's just getting extracted), this
1488 # is returning an incomplete value here and leaves it to the later
1489 # processing to strip the right number of bytes from the ciphertext
1490 unprotected[COSE_COUNTERSIGNATURE0] = PRESENT_BUT_NO_VALUE_YET
1492 return b"", {}, unprotected, payload
1494 @classmethod
1495 def _extract_encrypted0(cls, message):
1496 if message.opt.oscore is None:
1497 raise NotAProtectedMessage("No Object-Security option present", message)
1499 protected_serialized, protected, unprotected, ciphertext = cls._uncompress(
1500 message.opt.oscore, message.payload
1501 )
1502 return protected_serialized, protected, unprotected, ciphertext
1504 # implementation defined
1506 def context_for_response(self) -> CanProtect:
1507 """After processing a request with this context, with which security
1508 context should an outgoing response be protected? By default, it's the
1509 same context."""
1510 # FIXME: Is there any way in which the handler may want to influence
1511 # the decision taken here? Or would, then, the handler just call a more
1512 # elaborate but similar function when setting the response's remote
1513 # already?
1515 # FIXME justify by moving into a mixin for CanProtectAndUnprotect
1516 return self # type: ignore
1519class SecurityContextUtils(BaseSecurityContext):
1520 def _kdf(
1521 self,
1522 salt,
1523 ikm,
1524 role_id,
1525 out_type,
1526 key_alg: Optional[SymmetricEncryptionAlgorithm] = None,
1527 ):
1528 """The HKDF as used to derive sender and recipient key and IV in
1529 RFC8613 Section 3.2.1, and analogously the Group Encryption Key of oscore-groupcomm.
1530 """
1532 _alglog.debug("Deriving through KDF:")
1533 _alglog.debug("* salt = %s", salt.hex() if salt else salt)
1534 _alglog.debug("* ikm = %s", log_secret(ikm.hex()))
1535 _alglog.debug("* role_id = %s", role_id.hex())
1536 _alglog.debug("* out_type = %r", out_type)
1537 _alglog.debug("* key_alg = %r", key_alg)
1539 # The field in info is called `alg_aead` defined in RFC8613, but in
1540 # group OSCORE something that's very clearly *not* alg_aead is put in
1541 # there.
1542 #
1543 # The rules about this come both from
1544 # https://www.ietf.org/archive/id/draft-ietf-core-oscore-groupcomm-23.html#section-2.3
1545 # and
1546 # https://www.ietf.org/archive/id/draft-ietf-core-oscore-groupcomm-23.html#section-2.1.9
1547 # but they produce the same outcome.
1548 if hasattr(self, "alg_group_enc") and self.alg_group_enc is not None:
1549 the_field_called_alg_aead = self.alg_group_enc.value
1550 else:
1551 assert self.alg_aead is not None, (
1552 "At least alg_aead or alg_group_enc needs to be set on a context."
1553 )
1554 the_field_called_alg_aead = self.alg_aead.value
1556 assert (key_alg is None) ^ (out_type == "Key")
1557 if out_type == "Key":
1558 # Duplicate assertion needed while mypy can not see that the assert
1559 # above the if is stricter than this.
1560 assert key_alg is not None
1561 out_bytes = key_alg.key_bytes
1562 the_field_called_alg_aead = key_alg.value
1563 elif out_type == "IV":
1564 assert self.alg_aead is not None, (
1565 "At least alg_aead or alg_group_enc needs to be set on a context."
1566 )
1567 out_bytes = max(
1568 (
1569 a.iv_bytes
1570 for a in [self.alg_aead, getattr(self, "alg_group_enc", None)]
1571 if a is not None
1572 )
1573 )
1574 elif out_type == "SEKey":
1575 assert isinstance(self, GroupContext) and self.alg_group_enc is not None, (
1576 "SEKey derivation is only defined for group contexts with a group encryption algorithm."
1577 )
1578 # "While the obtained Signature Encryption Key is never used with
1579 # the Group Encryption Algorithm, its length was chosen to obtain a
1580 # matching level of security."
1581 out_bytes = self.alg_group_enc.key_bytes
1582 else:
1583 raise ValueError("Output type not recognized")
1585 _alglog.debug("* the_field_called_alg_aead = %s", the_field_called_alg_aead)
1587 info = [
1588 role_id,
1589 self.id_context,
1590 the_field_called_alg_aead,
1591 out_type,
1592 out_bytes,
1593 ]
1594 _alglog.debug("* info = %r", info)
1595 ret = self._kdf_lowlevel(salt, ikm, info, out_bytes)
1596 _alglog.debug("Derivation of %r produced %s", out_type, log_secret(ret.hex()))
1597 return ret
1599 def _kdf_for_keystreams(self, piv_generated_by, salt, ikm, role_id, out_type):
1600 """The HKDF as used to derive the keystreams of oscore-groupcomm."""
1602 out_bytes = self.alg_signature.signature_length
1604 assert out_type in (
1605 INFO_TYPE_KEYSTREAM_REQUEST,
1606 INFO_TYPE_KEYSTREAM_RESPONSE,
1607 ), "Output type not recognized"
1609 info = [
1610 piv_generated_by,
1611 self.id_context,
1612 out_type,
1613 out_bytes,
1614 ]
1615 return self._kdf_lowlevel(salt, ikm, info, out_bytes)
1617 def _kdf_lowlevel(self, salt: bytes, ikm: bytes, info: list, l: int) -> bytes: # noqa: E741 (signature follows RFC definition)
1618 """The HKDF function as used in RFC8613 and oscore-groupcomm (notated
1619 there as ``something = HKDF(...)``
1621 Note that `info` typically contains `L` at some point.
1623 When `info` takes the conventional structure of pid, id_context,
1624 ald_aead, type, L], it may make sense to extend the `_kdf` function to
1625 support that case, or `_kdf_for_keystreams` for a different structure, as
1626 they are the more high-level tools."""
1627 hkdf = HKDF(
1628 algorithm=self.hashfun,
1629 length=l,
1630 salt=salt,
1631 info=cbor.dumps(info),
1632 backend=_hash_backend,
1633 )
1634 expanded = hkdf.derive(ikm)
1635 return expanded
1637 def derive_keys(self, master_salt, master_secret):
1638 """Populate sender_key, recipient_key and common_iv from the algorithm,
1639 hash function and id_context already configured beforehand, and from
1640 the passed salt and secret."""
1642 self.sender_key = self._kdf(
1643 master_salt, master_secret, self.sender_id, "Key", self.alg_aead
1644 )
1645 self.recipient_key = self._kdf(
1646 master_salt, master_secret, self.recipient_id, "Key", self.alg_aead
1647 )
1649 self.common_iv = self._kdf(master_salt, master_secret, b"", "IV")
1651 # really more of the Credentials interface
1653 def get_oscore_context_for(self, unprotected):
1654 """Return a sutiable context (most easily self) for an incoming request
1655 if its unprotected data (COSE_KID, COSE_KID_CONTEXT) fit its
1656 description. If it doesn't match, it returns None.
1658 The default implementation just strictly checks for whether kid and any
1659 kid context match (not matching if a local KID context is set but none
1660 is given in the request); modes like Group OSCORE can spin up aspect
1661 objects here.
1662 """
1663 if (
1664 unprotected.get(COSE_KID, None) == self.recipient_id
1665 and unprotected.get(COSE_KID_CONTEXT, None) == self.id_context
1666 ):
1667 return self
1670class ReplayWindow:
1671 """A regular replay window of a fixed size.
1673 It is implemented as an index and a bitfield (represented by an integer)
1674 whose least significant bit represents the seqyence number of the index,
1675 and a 1 indicates that a number was seen. No shenanigans around implicit
1676 leading ones (think floating point normalization) happen.
1678 >>> w = ReplayWindow(32, lambda: None)
1679 >>> w.initialize_empty()
1680 >>> w.strike_out(5)
1681 >>> w.is_valid(3)
1682 True
1683 >>> w.is_valid(5)
1684 False
1685 >>> w.strike_out(0)
1686 >>> w.strike_out(1)
1687 >>> w.strike_out(2)
1688 >>> w.is_valid(1)
1689 False
1691 Jumping ahead by the window size invalidates older numbers:
1693 >>> w.is_valid(4)
1694 True
1695 >>> w.strike_out(35)
1696 >>> w.is_valid(4)
1697 True
1698 >>> w.strike_out(36)
1699 >>> w.is_valid(4)
1700 False
1702 Usage safety
1703 ------------
1705 For every key, the replay window can only be initielized empty once. On
1706 later uses, it needs to be persisted by storing the output of
1707 self.persist() somewhere and loaded from that persisted data.
1709 It is acceptable to store persistance data in the strike_out_callback, but
1710 that must then ensure that the data is written (flushed to a file or
1711 committed to a database), but that is usually inefficient.
1713 Stability
1714 ---------
1716 This class is not considered for stabilization yet and an implementation
1717 detail of the SecurityContext implementation(s).
1718 """
1720 _index = None
1721 """Sequence number represented by the least significant bit of _bitfield"""
1722 _bitfield = None
1723 """Integer interpreted as a bitfield, self._size wide. A digit 1 at any bit
1724 indicates that the bit's index (its power of 2) plus self._index was
1725 already seen."""
1727 def __init__(self, size, strike_out_callback):
1728 self._size = size
1729 self.strike_out_callback = strike_out_callback
1731 def is_initialized(self):
1732 return self._index is not None
1734 def initialize_empty(self):
1735 self._index = 0
1736 self._bitfield = 0
1738 def initialize_from_persisted(self, persisted):
1739 self._index = persisted["index"]
1740 self._bitfield = persisted["bitfield"]
1742 def initialize_from_freshlyseen(self, seen):
1743 """Initialize the replay window with a particular value that is just
1744 being observed in a fresh (ie. generated by the peer later than any
1745 messages processed before state was lost here) message. This marks the
1746 seen sequence number and all preceding it as invalid, and and all later
1747 ones as valid."""
1748 self._index = seen
1749 self._bitfield = 1
1751 def is_valid(self, number):
1752 if number < self._index:
1753 return False
1754 if number >= self._index + self._size:
1755 return True
1756 return (self._bitfield >> (number - self._index)) & 1 == 0
1758 def strike_out(self, number):
1759 if not self.is_valid(number):
1760 raise ValueError(
1761 "Sequence number is not valid any more and "
1762 "thus can't be removed from the window"
1763 )
1764 overshoot = number - (self._index + self._size - 1)
1765 if overshoot > 0:
1766 self._index += overshoot
1767 self._bitfield >>= overshoot
1768 assert self.is_valid(number), "Sequence number was not valid before strike-out"
1769 self._bitfield |= 1 << (number - self._index)
1771 self.strike_out_callback()
1773 def persist(self):
1774 """Return a dict containing internal state which can be passed to init
1775 to recreated the replay window."""
1777 return {"index": self._index, "bitfield": self._bitfield}
1780class FilesystemSecurityContext(
1781 CanProtect, CanUnprotect, SecurityContextUtils, credentials._Objectish
1782):
1783 """Security context stored in a directory as distinct files containing
1784 containing
1786 * Master secret, master salt, sender and recipient ID,
1787 optionally algorithm, the KDF hash function, and replay window size
1788 (settings.json and secrets.json, where the latter is typically readable
1789 only for the user)
1790 * sequence numbers and replay windows (sequence.json, the only file the
1791 process needs write access to)
1793 The static parameters can all either be placed in settings.json or
1794 secrets.json, but must not be present in both; the presence of either file
1795 is sufficient.
1797 .. warning::
1799 Security contexts must never be copied around and used after another
1800 copy was used. They should only ever be moved, and if they are copied
1801 (eg. as a part of a system backup), restored contexts must not be used
1802 again; they need to be replaced with freshly created ones.
1804 An additional file named `lock` is created to prevent the accidental use of
1805 a context by to concurrent programs.
1807 Note that the sequence number file is updated in an atomic fashion which
1808 requires file creation privileges in the directory. If privilege separation
1809 between settings/key changes and sequence number changes is desired, one
1810 way to achieve that on Linux is giving the aiocoap process's user group
1811 write permissions on the directory and setting the sticky bit on the
1812 directory, thus forbidding the user to remove the settings/secret files not
1813 owned by him.
1815 Writes due to sent sequence numbers are reduced by applying a variation on
1816 the mechanism of RFC8613 Appendix B.1.1 (incrementing the persisted sender
1817 seqence number in steps of `k`). That value is automatically grown from
1818 sequence_number_chunksize_start up to sequence_number_chunksize_limit.
1819 At runtime, the receive window is not stored but kept indeterminate. In
1820 case of an abnormal shutdown, the server uses the mechanism described in
1821 Appendix B.1.2 to recover.
1822 """
1824 # possibly overridden in constructor
1825 #
1826 # Type is ignored because while it *is* AlgAead, mypy can't tell.
1827 alg_aead = algorithms[DEFAULT_ALGORITHM] # type: ignore
1829 class LoadError(ValueError):
1830 """Exception raised with a descriptive message when trying to load a
1831 faulty security context"""
1833 def __init__(
1834 self,
1835 basedir: str,
1836 sequence_number_chunksize_start=10,
1837 sequence_number_chunksize_limit=10000,
1838 ):
1839 self.basedir = basedir
1841 self.lockfile: Optional[filelock.FileLock] = filelock.FileLock(
1842 os.path.join(basedir, "lock")
1843 )
1844 # 0.001: Just fail if it can't be acquired
1845 # See https://github.com/benediktschmitt/py-filelock/issues/57
1846 try:
1847 self.lockfile.acquire(timeout=0.001)
1848 # see https://github.com/PyCQA/pycodestyle/issues/703
1849 except: # noqa: E722
1850 # No lock, no loading, no need to fail in __del__
1851 self.lockfile = None
1852 raise
1854 # Always enabled as committing to a file for every received request
1855 # would be a terrible burden.
1856 self.echo_recovery = secrets.token_bytes(8)
1858 try:
1859 self._load()
1860 except KeyError as k:
1861 raise self.LoadError("Configuration key missing: %s" % (k.args[0],))
1863 self.sequence_number_chunksize_start = sequence_number_chunksize_start
1864 self.sequence_number_chunksize_limit = sequence_number_chunksize_limit
1865 self.sequence_number_chunksize = sequence_number_chunksize_start
1867 self.sequence_number_persisted = self.sender_sequence_number
1869 def _load(self):
1870 # doesn't check for KeyError on every occasion, relies on __init__ to
1871 # catch that
1873 data = {}
1874 for readfile in ("secret.json", "settings.json"):
1875 try:
1876 with open(os.path.join(self.basedir, readfile)) as f:
1877 filedata = json.load(f)
1878 except FileNotFoundError:
1879 continue
1881 for key, value in filedata.items():
1882 if key.endswith("_hex"):
1883 key = key[:-4]
1884 value = binascii.unhexlify(value)
1885 elif key.endswith("_ascii"):
1886 key = key[:-6]
1887 value = value.encode("ascii")
1889 if key in data:
1890 raise self.LoadError(
1891 "Datum %r present in multiple input files at %r."
1892 % (key, self.basedir)
1893 )
1895 data[key] = value
1897 self.alg_aead = algorithms[data.get("algorithm", DEFAULT_ALGORITHM)]
1898 self.hashfun = hashfunctions[data.get("kdf-hashfun", DEFAULT_HASHFUNCTION)]
1900 windowsize = data.get("window", DEFAULT_WINDOWSIZE)
1901 if not isinstance(windowsize, int):
1902 raise self.LoadError("Non-integer replay window")
1904 self.sender_id = data["sender-id"]
1905 self.recipient_id = data["recipient-id"]
1907 if (
1908 max(len(self.sender_id), len(self.recipient_id))
1909 > self.alg_aead.iv_bytes - 6
1910 ):
1911 raise self.LoadError(
1912 "Sender or Recipient ID too long (maximum length %s for this algorithm)"
1913 % (self.alg_aead.iv_bytes - 6)
1914 )
1916 master_secret = data["secret"]
1917 master_salt = data.get("salt", b"")
1918 self.id_context = data.get("id-context", None)
1920 self.derive_keys(master_salt, master_secret)
1922 self.recipient_replay_window = ReplayWindow(
1923 windowsize, self._replay_window_changed
1924 )
1925 try:
1926 with open(os.path.join(self.basedir, "sequence.json")) as f:
1927 sequence = json.load(f)
1928 except FileNotFoundError:
1929 self.sender_sequence_number = 0
1930 self.recipient_replay_window.initialize_empty()
1931 self.replay_window_persisted = True
1932 else:
1933 self.sender_sequence_number = int(sequence["next-to-send"])
1934 received = sequence["received"]
1935 if received == "unknown":
1936 # The replay window will stay uninitialized, which triggers
1937 # Echo recovery
1938 self.replay_window_persisted = False
1939 else:
1940 try:
1941 self.recipient_replay_window.initialize_from_persisted(received)
1942 except (ValueError, TypeError, KeyError):
1943 # Not being particularly careful about what could go wrong: If
1944 # someone tampers with the replay data, we're already in *big*
1945 # trouble, of which I fail to see how it would become worse
1946 # than a crash inside the application around "failure to
1947 # right-shift a string" or that like; at worst it'd result in
1948 # nonce reuse which tampering with the replay window file
1949 # already does.
1950 raise self.LoadError(
1951 "Persisted replay window state was not understood"
1952 )
1953 self.replay_window_persisted = True
1955 # This is called internally whenever a new sequence number is taken or
1956 # crossed out from the window, and blocks a lot; B.1 mode mitigates that.
1957 #
1958 # Making it async and block in a threadpool would mitigate the blocking of
1959 # other messages, but the more visible effect of this will be that no
1960 # matter if sync or async, a reply will need to wait for a file sync
1961 # operation to conclude.
1962 def _store(self):
1963 tmphand, tmpnam = tempfile.mkstemp(
1964 dir=self.basedir, prefix=".sequence-", suffix=".json", text=True
1965 )
1967 data = {"next-to-send": self.sequence_number_persisted}
1968 if not self.replay_window_persisted:
1969 data["received"] = "unknown"
1970 else:
1971 data["received"] = self.recipient_replay_window.persist()
1973 # Using io.open (instead os.fdopen) and binary / write with encode
1974 # rather than dumps as that works even while the interpreter is
1975 # shutting down.
1976 #
1977 # This can be relaxed when there is a defined shutdown sequence for
1978 # security contexts that's triggered from the general context shutdown
1979 # -- but right now, there isn't.
1980 with io.open(tmphand, "wb") as tmpfile:
1981 tmpfile.write(json.dumps(data).encode("utf8"))
1982 tmpfile.flush()
1983 os.fsync(tmpfile.fileno())
1985 os.replace(tmpnam, os.path.join(self.basedir, "sequence.json"))
1987 def _replay_window_changed(self):
1988 if self.replay_window_persisted:
1989 # Just remove the sequence numbers once from the file
1990 self.replay_window_persisted = False
1991 self._store()
1993 def post_seqnoincrease(self):
1994 if self.sender_sequence_number > self.sequence_number_persisted:
1995 self.sequence_number_persisted += self.sequence_number_chunksize
1997 self.sequence_number_chunksize = min(
1998 self.sequence_number_chunksize * 2, self.sequence_number_chunksize_limit
1999 )
2000 # FIXME: this blocks -- see https://github.com/chrysn/aiocoap/issues/178
2001 self._store()
2003 # The = case would only happen if someone deliberately sets all
2004 # numbers to 1 to force persisting on every step
2005 assert self.sender_sequence_number <= self.sequence_number_persisted, (
2006 "Using a sequence number that has been persisted already"
2007 )
2009 def _destroy(self):
2010 """Release the lock file, and ensure tha he object has become
2011 unusable.
2013 If there is unpersisted state from B.1 operation, the actually used
2014 number and replay window gets written back to the file to allow
2015 resumption without wasting digits or round-trips.
2016 """
2017 # FIXME: Arrange for a more controlled shutdown through the credentials
2019 self.replay_window_persisted = True
2020 self.sequence_number_persisted = self.sender_sequence_number
2021 self._store()
2023 del self.sender_key
2024 del self.recipient_key
2026 os.unlink(self.lockfile.lock_file)
2027 self.lockfile.release()
2029 self.lockfile = None
2031 def __del__(self):
2032 if self.lockfile is not None:
2033 self._destroy()
2035 @classmethod
2036 def from_item(cls, init_data):
2037 """Overriding _Objectish's from_item because the parameter name for
2038 basedir is contextfile for historical reasons"""
2040 def constructor(
2041 basedir: Optional[str] = None, contextfile: Optional[str] = None
2042 ):
2043 if basedir is not None and contextfile is not None:
2044 raise credentials.CredentialsLoadError(
2045 "Conflicting arguments basedir and contextfile; just contextfile instead"
2046 )
2047 if basedir is None and contextfile is None:
2048 raise credentials.CredentialsLoadError("Missing item 'basedir'")
2049 if contextfile is not None:
2050 warnings.warn(
2051 "Property contextfile was renamed to basedir in OSCORE credentials entries",
2052 DeprecationWarning,
2053 stacklevel=2,
2054 )
2055 basedir = contextfile
2056 assert (
2057 basedir is not None
2058 ) # This helps mypy which would otherwise not see that the above ensures this already
2059 return cls(basedir)
2061 return credentials._call_from_structureddata(
2062 constructor, cls.__name__, init_data
2063 )
2065 def find_all_used_contextless_oscore_kid(self) -> set[bytes]:
2066 return set((self.recipient_id,))
2069class GroupContext(ContextWhereExternalAadIsGroup, BaseSecurityContext):
2070 is_signing = True
2071 responses_send_kid = True
2073 @abc.abstractproperty
2074 def private_key(self):
2075 """Private key used to sign outgoing messages.
2077 Contexts not designed to send messages may raise a RuntimeError here;
2078 that necessity may later go away if some more accurate class modelling
2079 is found."""
2081 @abc.abstractproperty
2082 def recipient_public_key(self):
2083 """Public key used to verify incoming messages.
2085 Contexts not designed to receive messages (because they'd have aspects
2086 for that) may raise a RuntimeError here; that necessity may later go
2087 away if some more accurate class modelling is found."""
2090class SimpleGroupContext(GroupContext, CanProtect, CanUnprotect, SecurityContextUtils):
2091 """A context for an OSCORE group
2093 This is a non-persistable version of a group context that does not support
2094 any group manager or rekeying; it is set up statically at startup.
2096 It is intended for experimentation and demos, but aims to be correct enough
2097 to be usable securely.
2098 """
2100 # set during initialization (making all those attributes rather than
2101 # possibly properties as they might be in super)
2102 sender_id = None # type: ignore
2103 id_context = None # type: ignore
2104 private_key = None
2105 alg_aead = None
2106 hashfun = None # type: ignore
2107 alg_signature = None
2108 alg_group_enc = None
2109 alg_pairwise_key_agreement = None
2110 sender_auth_cred = None # type: ignore
2111 group_manager_cred = None # type: ignore
2112 cred_fmt = None
2113 # This is currently not evaluated, but any GM interaction will need to have this information available.
2114 group_manager_cred_fmt = None
2116 def __init__(
2117 self,
2118 alg_aead,
2119 hashfun,
2120 alg_signature,
2121 alg_group_enc,
2122 alg_pairwise_key_agreement,
2123 group_id,
2124 master_secret,
2125 master_salt,
2126 sender_id,
2127 private_key,
2128 sender_auth_cred,
2129 peers,
2130 group_manager_cred,
2131 cred_fmt=COSE_KCCS,
2132 group_manager_cred_fmt=COSE_KCCS,
2133 ):
2134 self.sender_id = sender_id
2135 self.id_context = group_id
2136 self.private_key = private_key
2137 self.alg_aead = alg_aead
2138 self.hashfun = hashfun
2139 self.alg_signature = alg_signature
2140 self.alg_group_enc = alg_group_enc
2141 self.alg_pairwise_key_agreement = alg_pairwise_key_agreement
2142 self.sender_auth_cred = sender_auth_cred
2143 self.group_manager_cred = group_manager_cred
2144 self.cred_fmt = cred_fmt
2145 self.group_manager_cred_fmt = group_manager_cred_fmt
2147 self.peers = peers.keys()
2148 self.recipient_public_keys = {
2149 k: self._parse_credential(v) for (k, v) in peers.items()
2150 }
2151 self.recipient_auth_creds = peers
2152 self.recipient_replay_windows = {}
2153 for k in self.peers:
2154 # no need to persist, the whole group is ephemeral
2155 w = ReplayWindow(32, lambda: None)
2156 w.initialize_empty()
2157 self.recipient_replay_windows[k] = w
2159 self.derive_keys(master_salt, master_secret)
2160 self.sender_sequence_number = 0
2162 sender_public_key = self._parse_credential(sender_auth_cred)
2163 if (
2164 self.alg_signature.public_from_private(self.private_key)
2165 != sender_public_key
2166 ):
2167 raise ValueError(
2168 "The key in the provided sender credential does not match the private key"
2169 )
2171 def _parse_credential(self, credential: bytes | _DeterministicKey):
2172 """Extract the public key (in the public_key format the respective
2173 AlgorithmCountersign needs) from credentials. This raises a ValueError
2174 if the credentials do not match the group's cred_fmt, or if the
2175 parameters do not match those configured in the group.
2177 This currently discards any information that is present in the
2178 credential that exceeds the key. (In a future version, this could
2179 return both the key and extracted other data, where that other data
2180 would be stored with the peer this is parsed from).
2181 """
2183 if credential is DETERMINISTIC_KEY:
2184 return credential
2186 if self.cred_fmt != COSE_KCCS:
2187 raise ValueError(
2188 "Credential parsing is currently only implemented for CCSs"
2189 )
2191 assert self.alg_signature is not None
2193 return self.alg_signature.from_kccs(credential)
2195 def __repr__(self):
2196 return "<%s with group %r sender_id %r and %d peers>" % (
2197 type(self).__name__,
2198 self.id_context.hex(),
2199 self.sender_id.hex(),
2200 len(self.peers),
2201 )
2203 @property
2204 def recipient_public_key(self):
2205 raise RuntimeError(
2206 "Group context without key indication was used for verification"
2207 )
2209 def _get_sender_key(self, outer_message, aad, plaintext, request_id):
2210 # If we even get here, there has to be a alg_group_enc, and thus the sender key does match it
2211 return self._sender_key
2213 def derive_keys(self, master_salt, master_secret):
2214 the_main_alg = (
2215 self.alg_group_enc if self.alg_group_enc is not None else self.alg_aead
2216 )
2218 self._sender_key = self._kdf(
2219 master_salt, master_secret, self.sender_id, "Key", the_main_alg
2220 )
2221 self.recipient_keys = {
2222 recipient_id: self._kdf(
2223 master_salt, master_secret, recipient_id, "Key", the_main_alg
2224 )
2225 for recipient_id in self.peers
2226 }
2228 self.common_iv = self._kdf(master_salt, master_secret, b"", "IV")
2230 self.signature_encryption_key = self._kdf(
2231 master_salt, master_secret, b"", "SEKey"
2232 )
2234 def post_seqnoincrease(self):
2235 """No-op because it's ephemeral"""
2237 def context_from_response(self, unprotected_bag) -> CanUnprotect:
2238 # sender ID *needs to be* here -- if this were a pairwise request, it
2239 # would not run through here
2240 try:
2241 sender_kid = unprotected_bag[COSE_KID]
2242 except KeyError:
2243 raise DecodeError("Group server failed to send own sender KID")
2245 if COSE_COUNTERSIGNATURE0 in unprotected_bag:
2246 return _GroupContextAspect(self, sender_kid)
2247 else:
2248 return _PairwiseContextAspect(self, sender_kid)
2250 def get_oscore_context_for(self, unprotected):
2251 if unprotected.get(COSE_KID_CONTEXT, None) != self.id_context:
2252 return None
2254 kid = unprotected.get(COSE_KID, None)
2255 if kid in self.peers:
2256 if COSE_COUNTERSIGNATURE0 in unprotected:
2257 return _GroupContextAspect(self, kid)
2258 elif self.recipient_public_keys[kid] is DETERMINISTIC_KEY:
2259 return _DeterministicUnprotectProtoAspect(self, kid)
2260 else:
2261 return _PairwiseContextAspect(self, kid)
2263 def find_all_used_contextless_oscore_kid(self) -> set[bytes]:
2264 # not conflicting: groups always send KID Context
2265 return set()
2267 # yet to stabilize...
2269 def pairwise_for(self, recipient_id):
2270 return _PairwiseContextAspect(self, recipient_id)
2272 def for_sending_deterministic_requests(
2273 self, deterministic_id, target_server: Optional[bytes]
2274 ):
2275 return _DeterministicProtectProtoAspect(self, deterministic_id, target_server)
2278class _GroupContextAspect(GroupContext, CanUnprotect, SecurityContextUtils):
2279 """The concrete context this host has with a particular peer
2281 As all actual data is stored in the underlying groupcontext, this acts as
2282 an accessor to that object (which picks the right recipient key).
2284 This accessor is for receiving messages in group mode from a particular
2285 peer; it does not send (and turns into a pairwise context through
2286 context_for_response before it comes to that).
2287 """
2289 def __init__(self, groupcontext: GroupContext, recipient_id: bytes) -> None:
2290 self.groupcontext = groupcontext
2291 self.recipient_id = recipient_id
2293 def __repr__(self):
2294 return "<%s inside %r with the peer %r>" % (
2295 type(self).__name__,
2296 self.groupcontext,
2297 self.recipient_id.hex(),
2298 )
2300 private_key = None
2302 # not inline because the equivalent lambda would not be recognized by mypy
2303 # (workaround for <https://github.com/python/mypy/issues/8083>)
2304 @property
2305 def id_context(self):
2306 return self.groupcontext.id_context
2308 @property
2309 def alg_aead(self):
2310 return self.groupcontext.alg_aead
2312 @property
2313 def alg_signature(self):
2314 return self.groupcontext.alg_signature
2316 @property
2317 def alg_group_enc(self):
2318 return self.groupcontext.alg_group_enc
2320 @property
2321 def alg_pairwise_key_agreement(self):
2322 return self.groupcontext.alg_pairwise_key_agreement
2324 @property
2325 def group_manager_cred(self):
2326 return self.groupcontext.group_manager_cred
2328 @property
2329 def common_iv(self):
2330 return self.groupcontext.common_iv
2332 @property
2333 def hashfun(self):
2334 return self.groupcontext.hashfun
2336 @property
2337 def signature_encryption_key(self):
2338 return self.groupcontext.signature_encryption_key
2340 @property
2341 def recipient_key(self):
2342 # If we even get here, there has to be a alg_group_enc, and thus the recipient key does match it
2343 return self.groupcontext.recipient_keys[self.recipient_id]
2345 @property
2346 def recipient_public_key(self):
2347 return self.groupcontext.recipient_public_keys[self.recipient_id]
2349 @property
2350 def recipient_auth_cred(self):
2351 return self.groupcontext.recipient_auth_creds[self.recipient_id]
2353 @property
2354 def recipient_replay_window(self):
2355 return self.groupcontext.recipient_replay_windows[self.recipient_id]
2357 def context_for_response(self):
2358 return self.groupcontext.pairwise_for(self.recipient_id)
2360 @property
2361 def sender_auth_cred(self):
2362 raise RuntimeError(
2363 "Could relay the sender auth credential from the group context, but it shouldn't matter here"
2364 )
2367class _PairwiseContextAspect(
2368 GroupContext, CanProtect, CanUnprotect, SecurityContextUtils
2369):
2370 is_signing = False
2372 def __init__(self, groupcontext, recipient_id):
2373 self.groupcontext = groupcontext
2374 self.recipient_id = recipient_id
2376 shared_secret = self.alg_pairwise_key_agreement.staticstatic(
2377 self.groupcontext.private_key,
2378 self.groupcontext.recipient_public_keys[recipient_id],
2379 )
2381 self.sender_key = self._kdf(
2382 self.groupcontext._sender_key,
2383 (
2384 self.groupcontext.sender_auth_cred
2385 + self.groupcontext.recipient_auth_creds[recipient_id]
2386 + shared_secret
2387 ),
2388 self.groupcontext.sender_id,
2389 "Key",
2390 self.alg_group_enc if self.is_signing else self.alg_aead,
2391 )
2392 self.recipient_key = self._kdf(
2393 self.groupcontext.recipient_keys[recipient_id],
2394 (
2395 self.groupcontext.recipient_auth_creds[recipient_id]
2396 + self.groupcontext.sender_auth_cred
2397 + shared_secret
2398 ),
2399 self.recipient_id,
2400 "Key",
2401 self.alg_group_enc if self.is_signing else self.alg_aead,
2402 )
2404 def __repr__(self):
2405 return "<%s based on %r with the peer %r>" % (
2406 type(self).__name__,
2407 self.groupcontext,
2408 self.recipient_id.hex(),
2409 )
2411 # FIXME: actually, only to be sent in requests
2413 # not inline because the equivalent lambda would not be recognized by mypy
2414 # (workaround for <https://github.com/python/mypy/issues/8083>)
2415 @property
2416 def id_context(self):
2417 return self.groupcontext.id_context
2419 @property
2420 def alg_aead(self):
2421 return self.groupcontext.alg_aead
2423 @property
2424 def hashfun(self):
2425 return self.groupcontext.hashfun
2427 @property
2428 def alg_signature(self):
2429 return self.groupcontext.alg_signature
2431 @property
2432 def alg_group_enc(self):
2433 return self.groupcontext.alg_group_enc
2435 @property
2436 def alg_pairwise_key_agreement(self):
2437 return self.groupcontext.alg_pairwise_key_agreement
2439 @property
2440 def group_manager_cred(self):
2441 return self.groupcontext.group_manager_cred
2443 @property
2444 def common_iv(self):
2445 return self.groupcontext.common_iv
2447 @property
2448 def sender_id(self):
2449 return self.groupcontext.sender_id
2451 @property
2452 def recipient_auth_cred(self):
2453 return self.groupcontext.recipient_auth_creds[self.recipient_id]
2455 @property
2456 def sender_auth_cred(self):
2457 return self.groupcontext.sender_auth_cred
2459 @property
2460 def recipient_replay_window(self):
2461 return self.groupcontext.recipient_replay_windows[self.recipient_id]
2463 # Set at initialization (making all those attributes rather than
2464 # possibly properties as they might be in super)
2465 recipient_key = None # type: ignore
2466 sender_key = None
2468 @property
2469 def sender_sequence_number(self):
2470 return self.groupcontext.sender_sequence_number
2472 @sender_sequence_number.setter
2473 def sender_sequence_number(self, new):
2474 self.groupcontext.sender_sequence_number = new
2476 def post_seqnoincrease(self):
2477 self.groupcontext.post_seqnoincrease()
2479 # same here -- not needed because not signing
2480 private_key = property(post_seqnoincrease)
2481 recipient_public_key = property(post_seqnoincrease)
2483 def context_from_response(self, unprotected_bag) -> CanUnprotect:
2484 if unprotected_bag.get(COSE_KID, self.recipient_id) != self.recipient_id:
2485 raise DecodeError(
2486 "Response coming from a different server than requested, not attempting to decrypt"
2487 )
2489 if COSE_COUNTERSIGNATURE0 in unprotected_bag:
2490 # It'd be an odd thing to do, but it's source verified, so the
2491 # server hopefully has reasons to make this readable to other group
2492 # members.
2493 return _GroupContextAspect(self.groupcontext, self.recipient_id)
2494 else:
2495 return self
2498class _DeterministicProtectProtoAspect(
2499 ContextWhereExternalAadIsGroup, CanProtect, SecurityContextUtils
2500):
2501 """This implements the sending side of Deterministic Requests.
2503 While simialr to a _PairwiseContextAspect, it only derives the key at
2504 protection time, as the plain text is hashed into the key."""
2506 deterministic_hashfun = hashes.SHA256()
2508 def __init__(self, groupcontext, sender_id, target_server: Optional[bytes]):
2509 self.groupcontext = groupcontext
2510 self.sender_id = sender_id
2511 self.target_server = target_server
2513 def __repr__(self):
2514 return "<%s based on %r with the sender ID %r%s>" % (
2515 type(self).__name__,
2516 self.groupcontext,
2517 self.sender_id.hex(),
2518 "limited to responses from %s" % self.target_server
2519 if self.target_server is not None
2520 else "",
2521 )
2523 def new_sequence_number(self):
2524 return 0
2526 def post_seqnoincrease(self):
2527 pass
2529 def context_from_response(self, unprotected_bag):
2530 if self.target_server is None:
2531 if COSE_KID not in unprotected_bag:
2532 raise DecodeError(
2533 "Server did not send a KID and no particular one was addressed"
2534 )
2535 else:
2536 if unprotected_bag.get(COSE_KID, self.target_server) != self.target_server:
2537 raise DecodeError(
2538 "Response coming from a different server than requested, not attempting to decrypt"
2539 )
2541 if COSE_COUNTERSIGNATURE0 not in unprotected_bag:
2542 # Could just as well pass and later barf when the group context doesn't find a signature
2543 raise DecodeError(
2544 "Response to deterministic request came from unsecure pairwise context"
2545 )
2547 return _GroupContextAspect(
2548 self.groupcontext, unprotected_bag.get(COSE_KID, self.target_server)
2549 )
2551 def _get_sender_key(self, outer_message, aad, plaintext, request_id):
2552 if outer_message.code.is_response():
2553 raise RuntimeError("Deterministic contexts shouldn't protect responses")
2555 basekey = self.groupcontext.recipient_keys[self.sender_id]
2557 h = hashes.Hash(self.deterministic_hashfun)
2558 h.update(basekey)
2559 h.update(aad)
2560 h.update(plaintext)
2561 request_hash = h.finalize()
2563 outer_message.opt.request_hash = request_hash
2564 outer_message.code = FETCH
2566 # By this time, the AADs have all been calculated already; setting this
2567 # for the benefit of the response parsing later
2568 request_id.request_hash = request_hash
2569 # FIXME I don't think this ever comes to bear but want to be sure
2570 # before removing this line (this should only be client-side)
2571 request_id.can_reuse_nonce = False
2572 # FIXME: we're still sending a h'00' PIV. Not wrong, just a wasted byte.
2574 return self._kdf(basekey, request_hash, self.sender_id, "Key", self.alg_aead)
2576 # details needed for various operations, especially eAAD generation
2578 @property
2579 def sender_auth_cred(self):
2580 # When preparing the external_aad, the element 'sender_cred' in the
2581 # aad_array takes the empty CBOR byte string (0x40).
2582 return b""
2584 # not inline because the equivalent lambda would not be recognized by mypy
2585 # (workaround for <https://github.com/python/mypy/issues/8083>)
2586 @property
2587 def alg_aead(self):
2588 return self.groupcontext.alg_aead
2590 @property
2591 def alg_group_enc(self):
2592 return self.groupcontext.alg_group_enc
2594 @property
2595 def alg_pairwise_key_agreement(self):
2596 return self.groupcontext.alg_pairwise_key_agreement
2598 @property
2599 def hashfun(self):
2600 return self.groupcontext.hashfun
2602 @property
2603 def common_iv(self):
2604 return self.groupcontext.common_iv
2606 @property
2607 def id_context(self):
2608 return self.groupcontext.id_context
2610 @property
2611 def alg_signature(self):
2612 return self.groupcontext.alg_signature
2614 @property
2615 def group_manager_cred(self):
2616 return self.groupcontext.group_manager_cred
2619class _DeterministicUnprotectProtoAspect(
2620 ContextWhereExternalAadIsGroup, CanUnprotect, SecurityContextUtils
2621):
2622 """This implements the sending side of Deterministic Requests.
2624 While simialr to a _PairwiseContextAspect, it only derives the key at
2625 unprotection time, based on information given as Request-Hash."""
2627 # Unless None, this is the value by which the running process recognizes
2628 # that the second phase of a B.1.2 replay window recovery Echo option comes
2629 # from the current process, and thus its sequence number is fresh
2630 echo_recovery = None
2632 deterministic_hashfun = hashes.SHA256()
2634 class ZeroIsAlwaysValid:
2635 """Special-purpose replay window that accepts 0 indefinitely"""
2637 def is_initialized(self):
2638 return True
2640 def is_valid(self, number):
2641 # No particular reason to be lax here
2642 return number == 0
2644 def strike_out(self, number):
2645 # FIXME: I'd rather indicate here that it's a potential replay, have the
2646 # request_id.can_reuse_nonce = False
2647 # set here rather than in _post_decrypt_checks, and thus also get
2648 # the check for whether it's a safe method
2649 pass
2651 def persist(self):
2652 pass
2654 def __init__(self, groupcontext, recipient_id):
2655 self.groupcontext = groupcontext
2656 self.recipient_id = recipient_id
2658 self.recipient_replay_window = self.ZeroIsAlwaysValid()
2660 def __repr__(self):
2661 return "<%s based on %r with the recipient ID %r>" % (
2662 type(self).__name__,
2663 self.groupcontext,
2664 self.recipient_id.hex(),
2665 )
2667 def context_for_response(self):
2668 return self.groupcontext
2670 def _get_recipient_key(self, protected_message, algorithm):
2671 logging.critical(
2672 "Deriving recipient key for protected message %s", protected_message
2673 )
2674 return self._kdf(
2675 self.groupcontext.recipient_keys[self.recipient_id],
2676 protected_message.opt.request_hash,
2677 self.recipient_id,
2678 "Key",
2679 algorithm,
2680 )
2682 def _post_decrypt_checks(self, aad, plaintext, protected_message, request_id):
2683 if plaintext[0] not in (GET, FETCH): # FIXME: "is safe"
2684 # FIXME: accept but return inner Unauthorized. (Raising Unauthorized
2685 # here would just create an unprotected Unauthorized, which is not
2686 # what's spec'd for here)
2687 raise ProtectionInvalid("Request was not safe")
2689 basekey = self.groupcontext.recipient_keys[self.recipient_id]
2691 h = hashes.Hash(self.deterministic_hashfun)
2692 h.update(basekey)
2693 h.update(aad)
2694 h.update(plaintext)
2695 request_hash = h.finalize()
2697 if request_hash != protected_message.opt.request_hash:
2698 raise ProtectionInvalid(
2699 "Client's hash of the plaintext diverges from the actual request hash"
2700 )
2702 # This is intended for the protection of the response, and the
2703 # later use in signature in the unprotect function is not happening
2704 # here anyway, neither is the later use for Echo requests
2705 request_id.request_hash = request_hash
2706 request_id.can_reuse_nonce = False
2708 # details needed for various operations, especially eAAD generation
2710 @property
2711 def recipient_auth_cred(self):
2712 # When preparing the external_aad, the element 'sender_cred' in the
2713 # aad_array takes the empty CBOR byte string (0x40).
2714 return b""
2716 # not inline because the equivalent lambda would not be recognized by mypy
2717 # (workaround for <https://github.com/python/mypy/issues/8083>)
2718 @property
2719 def alg_aead(self):
2720 return self.groupcontext.alg_aead
2722 @property
2723 def alg_group_enc(self):
2724 return self.groupcontext.alg_group_enc
2726 @property
2727 def alg_pairwise_key_agreement(self):
2728 return self.groupcontext.alg_pairwise_key_agreement
2730 @property
2731 def hashfun(self):
2732 return self.groupcontext.hashfun
2734 @property
2735 def common_iv(self):
2736 return self.groupcontext.common_iv
2738 @property
2739 def id_context(self):
2740 return self.groupcontext.id_context
2742 @property
2743 def alg_signature(self):
2744 return self.groupcontext.alg_signature
2746 @property
2747 def group_manager_cred(self):
2748 return self.groupcontext.group_manager_cred
2751def verify_start(message):
2752 """Extract the unprotected COSE options from a
2753 message for the verifier to then pick a security context to actually verify
2754 the message. (Future versions may also report fields from both unprotected
2755 and protected, if the protected bag is ever used with OSCORE.).
2757 Call this only requests; for responses, you'll have to know the security
2758 context anyway, and there is usually no information to be gained."""
2760 _, _, unprotected, _ = CanUnprotect._extract_encrypted0(message)
2762 return unprotected
2765_getattr__ = deprecation_getattr(
2766 {
2767 "COSE_COUNTERSINGATURE0": "COSE_COUNTERSIGNATURE0",
2768 "Algorithm": "AeadAlgorithm",
2769 },
2770 globals(),
2771)