Coverage for aiocoap / edhoc.py: 86%
314 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 09:52 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-22 09:52 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5"""Internal module containing types used inside EDHOC security contexts"""
7import abc
8import enum
9import io
10from pathlib import Path
11import random
12from typing import Optional, Dict, Literal
13import os
15import cbor2
16import lakers
18from . import oscore, credentials, error
19from . import Message
20from .numbers import POST
21from .numbers.eaditem import EADLabel, grease_labels
def load_cbor_or_edn(filename: Path):
    """Load a data item from a file that holds either binary CBOR or EDN text.

    The file is first tried as binary CBOR; that result is only used if a
    single item decodes and nothing follows it. Otherwise the file is re-read
    as text and converted from CBOR Diagnostic Notation.

    Raises ``credentials.CredentialsLoadError`` if neither interpretation
    works."""
    import cbor_diag
    import cbor2

    with filename.open("rb") as raw:
        try:
            decoded = cbor2.load(raw)
        except cbor2.CBORDecodeError:
            is_complete_cbor = False
        else:
            # Trailing data means it apparently hasn't been CBOR all
            # through...
            is_complete_cbor = not raw.read(1)

    if is_complete_cbor:
        return decoded

    with filename.open() as text:
        try:
            # Reading stays inside the try block: a decoding error of the text
            # layer is a ValueError too, and is reported the same way.
            as_cbor = cbor_diag.diag2cbor(text.read())
        except ValueError:
            raise credentials.CredentialsLoadError(
                "Data loaded from %s was recognized neither as CBOR nor CBOR Diagnostic Notation (EDN)"
                % filename
            )

    # no need to check for completeness: diag2cbor doesn't do diagnostic
    # sequences, AIU that's not even a thing
    return cbor2.loads(as_cbor)
class CoseKeyForEdhoc:
    """Private key in the (restricted) COSE_Key shape that is usable for EDHOC.

    Only EC keys (kty 2) on P-256 (crv 1) with a 32-byte private scalar ``d``
    are accepted."""

    kty: int
    crv: int
    d: bytes

    @classmethod
    def from_file(cls, filename: Path) -> "CoseKeyForEdhoc":
        """Read a key from a CBOR or EDN file.

        Loading is refused with a ``credentials.CredentialsLoadError`` if any
        group or world permission bit is set on the file."""
        group_or_world_bits = filename.stat().st_mode & 0o077
        if group_or_world_bits:
            raise credentials.CredentialsLoadError(
                "Refusing to load private key that is group or world accessible"
            )

        return cls.from_map(load_cbor_or_edn(filename))

    @classmethod
    def from_map(cls, key: dict) -> "CoseKeyForEdhoc":
        """Build a key from an already decoded COSE_Key map.

        The map needs to contain exactly the entries 1 (kty), -1 (crv) and -4
        (d) with supported values; any deviation raises a
        ``credentials.CredentialsLoadError``."""
        if not isinstance(key, dict):
            raise credentials.CredentialsLoadError(
                "Data is not shaped like COSE_KEY (expected top-level dictionary)"
            )
        if 1 not in key:
            raise credentials.CredentialsLoadError(
                "Data is not shaped like COSE_KEY (expected key 1 (kty) in top-level dictionary)"
            )
        if key[1] != 2:
            raise credentials.CredentialsLoadError(
                "Private key type %s is not supported (currently only 2 (EC) is supported)"
                % (key[1],)
            )

        if key.get(-1) != 1:
            raise credentials.CredentialsLoadError(
                "Private key of type EC requires key -1 (crv), currently supported values: 1 (P-256)"
            )

        secret = key.get(-4)
        if not (isinstance(secret, bytes) and len(secret) == 32):
            raise credentials.CredentialsLoadError(
                "Private key of type EC P-256 requires key -4 (d) to be a 32-byte long byte string"
            )

        if not all(label in (1, -1, -4) for label in key):
            raise credentials.CredentialsLoadError(
                "Extraneous data in key, consider allow-listing the item if acceptable"
            )

        loaded = cls()
        loaded.kty = 2  # EC
        loaded.crv = 1  # P-256
        loaded.d = secret

        return loaded

    def secret_to_map(self) -> dict:
        """Express the private key as a COSE_Key map (kty, crv, d)."""
        return {1: self.kty, -1: self.crv, -4: self.d}

    # Should we deprecate filename, add a generate_in_file method? (It's there
    # because generate originally depended on a file system)
    @classmethod
    def generate(cls, filename: Optional[Path] = None) -> "CoseKeyForEdhoc":
        """Generate a fresh P-256 key, optionally storing it in a file.

        If a filename is given, the file is created exclusively (it must not
        exist yet) with mode 0o600 and receives the CBOR encoded key; it is
        removed again if writing fails.

        This returns the generated private key.
        """

        from cryptography.hazmat.primitives.asymmetric import ec

        fresh = ec.generate_private_key(curve=ec.SECP256R1())

        generated = cls()
        generated.kty = 2  # EC
        generated.crv = 1  # P-256
        generated.d = fresh.private_numbers().private_value.to_bytes(32, "big")

        if filename is None:
            return generated

        # O_BINARY is only available (and only needed) on some platforms
        open_flags = (
            os.O_WRONLY | os.O_CREAT | os.O_EXCL | getattr(os, "O_BINARY", 0)
        )
        descriptor = os.open(filename, open_flags, mode=0o600)
        try:
            with open(descriptor, "wb") as keyfile:
                cbor2.dump(generated.secret_to_map(), keyfile)
        except Exception:
            filename.unlink()
            raise

        return generated

    def as_ccs(
        self, kid: Optional[bytes], subject: Optional[str]
    ) -> Dict[Literal[14], dict]:
        """Produce the KCCS that belongs to this private key.

        ``kid`` and ``subject`` are only included in the output if they are
        not None."""

        from cryptography.hazmat.primitives.asymmetric import ec

        scalar = int.from_bytes(self.d, "big")
        public_numbers = (
            ec.derive_private_key(scalar, ec.SECP256R1()).public_key().public_numbers()
        )

        # kty: EC2, crv: P-256, x, y
        cosekey = {
            1: 2,
            -1: 1,
            -2: public_numbers.x.to_bytes(32, "big"),
            -3: public_numbers.y.to_bytes(32, "big"),
        }
        if kid is not None:
            cosekey[2] = kid

        # cnf: COSE_Key
        credential_kccs: dict = {8: {1: cosekey}}
        if subject is not None:
            credential_kccs[2] = subject

        # kccs: cnf
        return {14: credential_kccs}
class GreaseSettings:
    """Object that is carried along for an EDHOC exchange to decide whether to
    apply GREASE.

    Having such an object allows control over how frequently (or even how) to
    apply GREASE. Currently, it implements just the recommended pattern of
    I-D.ietf-lake-edhoc-grease-01, which includes applying GREASE whenever an
    unprocessed option is found.
    """

    def __init__(self):
        # 6 random bits have a 1/64 chance of all being 0
        self.bits = 6

    def should_grease(self):
        """Random function that returns True once every ``2^{self.bits}``
        times."""
        return random.getrandbits(self.bits) == 0

    def grease_ead(self, eads: list[lakers.EADItem]):
        """Perform the limited-fingerprinting pattern of Section 2.1.1 of
        draft-ietf-lake-edhoc-grease-01.

        Returns ``eads`` itself when no GREASE is applied, or otherwise a new
        list with one additional non-critical item that has a random label
        out of ``grease_labels`` and 9 to 40 random bytes as its value."""

        if not self.should_grease():
            return eads

        label = random.choice(grease_labels)
        length = random.randrange(9, 41)

        return [
            *eads,
            lakers.EADItem(label, value=random.randbytes(length), is_critical=False),
        ]

    def update_from_ead(self, eads: list[lakers.EADItem]):
        """Increment the likelihood of using own grease from the remaining
        incoming EAD items (after processable items have been removed).

        This implements the SHOULD of I-D.ietf-lake-edhoc-grease-01 Section 2.2."""
        if eads:
            # Imported locally so this class brings its own dependency along.
            import logging

            # Report through logging rather than printing: a library must not
            # write to the application's standard output.
            logging.getLogger(__name__).debug(
                "Found EAD items %r, going to 100%%", eads
            )
            # With 0 bits, should_grease() is always true.
            self.bits = 0
class EdhocCredentials(credentials._Objectish):
    """Credentials entry describing how to run EDHOC with a peer.

    It holds the own key and credential (if any), the expected peer
    credential (if any), and can act as an EDHOC initiator through
    :meth:`establish_context`."""

    own_key: Optional[CoseKeyForEdhoc]
    suite: int
    method: int
    own_cred: Optional[dict]
    peer_cred: Optional[dict]

    #: Whether the combined flow should be used when using this credential set.
    #:
    #: If unset or None, the decision is left to the library (which at the
    #: time of writing always picks True).
    use_combined_edhoc: Optional[bool]

    def __init__(
        self,
        suite: int,
        method: int,
        own_cred_style: Optional[str] = None,
        peer_cred: Optional[dict] = None,
        own_cred: Optional[dict] = None,
        private_key_file: Optional[str] = None,
        private_key: Optional[dict] = None,
        use_combined_edhoc: Optional[bool] = None,
    ):
        """Set up the credentials from their configuration items.

        ``private_key_file`` and ``private_key`` are mutually exclusive. Own
        credentials need to be given either completely (``own_cred``,
        ``own_cred_style`` and one of the private key arguments) or not at
        all. The exception is ``own_cred == {"unauthenticated": True}``, for
        which a fresh key and a by-value credential are generated.

        Raises ``credentials.CredentialsLoadError`` on inconsistent input."""
        self.suite = suite
        self.method = method
        self.own_cred = own_cred
        self.peer_cred = peer_cred
        self.use_combined_edhoc = use_combined_edhoc

        if private_key_file is not None and private_key is not None:
            raise credentials.CredentialsLoadError(
                "private_key is mutually exclusive with private_key_file"
            )
        if private_key_file is not None:
            # FIXME: We should carry around a base
            private_key_path = Path(private_key_file)
            # FIXME: We left loading the file to the user, and now we're once more
            # in a position where we guess the file type
            self.own_key = CoseKeyForEdhoc.from_file(private_key_path)
        elif private_key is not None:
            self.own_key = CoseKeyForEdhoc.from_map(private_key)
        else:
            self.own_key = None

        if own_cred_style is None:
            self.own_cred_style = None
        else:
            # Raises ValueError for strings that are no known style
            self.own_cred_style = OwnCredStyle(own_cred_style)

        if self.own_cred == {"unauthenticated": True}:
            if self.own_key is not None or self.own_cred_style not in (
                None,
                OwnCredStyle.ByValue,
            ):
                raise credentials.CredentialsLoadError(
                    "For local unauthenticated use, no key can be give, and own_cred_style needs to be by-value or absent"
                )
            self.own_key = CoseKeyForEdhoc.generate()
            # FIXME: kid and subj should rather not be sent, but lakers insists on their presence
            self.own_cred = self.own_key.as_ccs(kid=b"?", subject="")
            self.own_cred_style = OwnCredStyle.ByValue
        else:
            if (own_cred is None) != (own_cred_style is None) or (own_cred is None) != (
                self.own_key is None
            ):
                raise credentials.CredentialsLoadError(
                    "If own credentials are given, all of own_cred, own_cred_style and private_key(_path) need to be given"
                )

        # FIXME: This is only used on the client side, and expects that all parts (own and peer) are present
        self._established_context = None

    def find_edhoc_by_id_cred_peer(self, id_cred_peer):
        """Look up the peer credential that ``id_cred_peer`` refers to.

        Returns the canonically encoded CCS of the configured peer credential
        if ``id_cred_peer`` is either that credential by value or a reference
        to its key ID, and None otherwise."""
        if self.peer_cred is None:
            return None
        if 14 not in self.peer_cred:
            # Only recognizing CCS so far
            return None

        if id_cred_peer == cbor2.dumps(self.peer_cred, canonical=True):
            # credential by value
            return cbor2.dumps(self.peer_cred[14], canonical=True)

        # cnf / COSE_Key / kid, should be present in all CCS
        kid = self.peer_cred[14][8][1].get(2)
        if kid is not None and id_cred_peer == cbor2.dumps({4: kid}, canonical=True):
            # credential by kid
            return cbor2.dumps(self.peer_cred[14], canonical=True)

        return None

    def peer_cred_is_unauthenticated(self):
        """Tell whether the peer credential is the "unauthenticated" marker."""
        # FIXME: This is rather weird internal API, and rather weird
        # format-wise -- but it will suffice until credentials are rewritten.
        return self.peer_cred is not None and self.peer_cred == {
            "unauthenticated": True
        }

    async def establish_context(
        self,
        wire,
        underlying_address,
        underlying_proxy_scheme,
        underlying_uri_host,
        logger,
    ):
        """Run EDHOC as the initiator and return the resulting security context.

        Message 1 is POSTed to ``/.well-known/edhoc`` through ``wire``, and
        message 2 is processed and verified. Unless ``use_combined_edhoc`` is
        False, the context is returned right away and carries message 3
        along with the first request protected by it. Otherwise, message 3
        is sent explicitly and message 4 is awaited.

        Raises ``error.BadRequest`` if the peer sends critical EAD items that
        are not understood, and ``credentials.CredentialsMissingError`` if an
        unauthenticated peer does not present its credential by value."""
        logger.info(
            "No OSCORE context found for EDHOC context %r, initiating one.", self
        )

        grease_settings = GreaseSettings()

        # The semantic identifier (an arbitrary string)
        #
        # FIXME: We don't support role reversal yet, but once we
        # register this context to be available for incoming
        # requests, we'll have to pick more carefully
        c_i = bytes([random.randint(0, 23)])
        initiator = lakers.EdhocInitiator()
        message_1 = initiator.prepare_message_1(
            c_i,
            # We could also send this depending on our configuration, ie. not
            # send it if we do expect credentials. But that'll also reveal to
            # an extent that we are ready to do TOFU, so sending it
            # unconditionally is a good first step, also because it allows
            # peers to migrate towards sending by reference as a default.
            grease_settings.grease_ead(
                [lakers.EADItem(EADLabel.CRED_BY_VALUE, is_critical=False)]
            ),
        )

        msg1 = Message(
            code=POST,
            proxy_scheme=underlying_proxy_scheme,
            uri_host=underlying_uri_host,
            uri_path=[".well-known", "edhoc"],
            # The payload starts with a CBOR `true`, followed by message 1
            payload=cbor2.dumps(True) + message_1,
        )
        msg1.remote = underlying_address
        msg2 = await wire.request(msg1).response_raising

        (c_r, id_cred_r, ead_2) = initiator.parse_message_2(msg2.payload)

        new_ead = []
        peer_requested_by_value = False
        for e in ead_2:
            if e.label() == EADLabel.CRED_BY_VALUE:
                peer_requested_by_value = True
            else:
                # Removing it from the new list not so much for is_critical,
                # for this item usually is not, but mostly for the
                # grease_settings
                new_ead.append(e)
        ead_2 = new_ead

        if any(e.is_critical() for e in ead_2):
            # Using the logger that was passed in, like the rest of this
            # method: this class sets no `log` attribute of its own.
            logger.error("Aborting EDHOC: Critical EAD2 present")
            raise error.BadRequest

        grease_settings.update_from_ead(ead_2)

        assert isinstance(self.own_cred, dict) and list(self.own_cred.keys()) == [14], (
            "So far can only process CCS style own credentials a la {14: ...}, own_cred = %r"
            % self.own_cred
        )
        cred_i = cbor2.dumps(self.own_cred[14], canonical=True)
        key_i = self.own_key.d

        logger.debug("EDHOC responder sent message_2 with ID_CRED_R = %r", id_cred_r)
        if self.peer_cred_is_unauthenticated():
            # Not doing further checks (eg. for trailing bytes) or re-raising: This
            # was already checked by lakers
            parsed = cbor2.loads(id_cred_r)

            # Checking the type first: anything but a map can not carry a
            # KCCS, and would make the membership test misbehave.
            if not isinstance(parsed, dict) or 14 not in parsed:
                raise credentials.CredentialsMissingError(
                    "Peer presented credential-by-reference (or anything else that's not a KCCS) when no credential was pre-agreed"
                )

            # We could also pick the [14] out of the serialized stream and
            # treat it as an opaque item, but cbor2 doesn't really do serialized items.
            cred_r = cbor2.dumps(parsed[14], canonical=True)
        else:
            # We could look into id_cred_r, which is a CBOR encoded
            # byte string, and could start comparing ... but actually
            # EDHOC and Lakers protect us from misbinding attacks (is
            # that what they are called?), so we can just put in our
            # expected credential here
            #
            # FIXME: But looking into it might give us a better error than just
            # "Mac2 verification failed"

            # FIXME add assert on the structure or start doing the
            # generalization that'll fail at startup
            cred_r = cbor2.dumps(self.peer_cred[14], canonical=True)

        initiator.verify_message_2(
            key_i,
            cred_i,
            cred_r,
        )  # odd that we provide that here rather than in the next function

        logger.debug("Message 2 was verified")

        secctx = EdhocInitiatorContext(
            initiator,
            c_i,
            c_r,
            # This needs to be an OwnCredStyle in either case: the context
            # converts it using its as_lakers() method.
            OwnCredStyle.ByValue if peer_requested_by_value else self.own_cred_style,
            logger,
            grease_settings,
        )

        if self.use_combined_edhoc is not False:
            secctx.complete_without_message_4()
            # That's enough: Message 3 can well be sent along with the next
            # message.
            return secctx

        logger.debug("Sending explicit message 3 without optimization")

        # Taking message 3 out of the context so that it is not sent again
        # with the first protected request
        message_3 = secctx._message_3
        assert message_3 is not None
        secctx._message_3 = None

        if len(c_r) == 1 and (0 <= c_r[0] < 24 or 0x20 <= c_r[0] < 0x38):
            # A single byte in these ranges already is the encoding of a
            # small CBOR integer, and is sent as is
            c_r_encoded = c_r
        else:
            c_r_encoded = cbor2.dumps(c_r)
        msg3 = Message(
            code=POST,
            proxy_scheme=underlying_proxy_scheme,
            uri_host=underlying_uri_host,
            uri_path=[".well-known", "edhoc"],
            payload=c_r_encoded + message_3,
        )
        msg3.remote = underlying_address
        msg4 = await wire.request(msg3).response_raising

        secctx.complete_with_message_4(msg4.payload)

        logger.debug("Received message 4, context is ready")

        return secctx
class _EdhocContextBase(
    oscore.CanProtect, oscore.CanUnprotect, oscore.SecurityContextUtils
):
    """Common parts of the OSCORE contexts that are derived from an EDHOC
    exchange, independent of the role taken in it."""

    def __init__(self, logger, grease_settings):
        self.log = logger
        self.grease_settings = grease_settings

    def post_seqnoincrease(self):
        # Nothing to do: the context is not persisted
        pass

    def protect(self, message, request_id=None, *, kid_context=True):
        """Protect a message, and prepend a pending EDHOC message 3 (setting
        the EDHOC option) if :meth:`message_3_to_include` provides one."""
        protected, request_id = super().protect(
            message, request_id=request_id, kid_context=kid_context
        )
        pending_message_3 = self.message_3_to_include()
        if pending_message_3 is None:
            return protected, request_id

        protected.opt.edhoc = True
        protected.payload = pending_message_3 + protected.payload
        return protected, request_id

    def _make_ready(self, edhoc_context, c_ours, c_theirs):
        """Derive the OSCORE material from a completed EDHOC context.

        ``c_ours`` becomes the recipient ID and ``c_theirs`` the sender ID;
        master secret and salt are taken from the EDHOC exporter."""
        # FIXME: both should offer this
        suite_is_known = (
            isinstance(edhoc_context, lakers.EdhocResponder)
            or edhoc_context.selected_cipher_suite() == 2
        )
        if not suite_is_known:
            raise RuntimeError("Unknown suite")
        self.alg_aead = oscore.algorithms["AES-CCM-16-64-128"]
        self.hashfun = oscore.hashfunctions["sha256"]

        # we did check for critical EADs, there was no out-of-band agreement, so 8 it is
        oscore_salt_length = 8
        # I figure that one would be agreed out-of-band as well (currently no
        # options to set/change this are known)
        self.id_context = None
        self.recipient_replay_window = oscore.ReplayWindow(32, lambda: None)

        secret_length = self.alg_aead.key_bytes
        master_secret = edhoc_context.edhoc_exporter(0, [], secret_length)
        master_salt = edhoc_context.edhoc_exporter(1, [], oscore_salt_length)

        self.sender_id, self.recipient_id = c_theirs, c_ours
        if self.sender_id == self.recipient_id:
            raise ValueError("Bad IDs: identical ones were picked")

        self.derive_keys(master_salt, master_secret)

        self.sender_sequence_number = 0
        self.recipient_replay_window.initialize_empty()

        self.log.debug("EDHOC context %r ready for OSCORE operation", self)

    @abc.abstractmethod
    def message_3_to_include(self) -> Optional[bytes]:
        """An encoded message_3 to include in outgoing messages

        This may modify self to only return something once."""
class EdhocInitiatorContext(_EdhocContextBase):
    """An OSCORE context that is derived from an EDHOC exchange.

    It does not require that the EDHOC exchange has completed -- it can be set
    up by an initiator already when message 2 has been received, prepares a
    message 3 at setup time, and sends it with the first request that is sent
    through it."""

    # FIXME: Should we rather send it with *every* request that is sent before a message 4 is received implicitly?
    def __init__(
        self, initiator, c_ours, c_theirs, cred_i_mode, logger, grease_settings
    ):
        super().__init__(logger, grease_settings)

        # Only this step is role specific
        transfer_mode = cred_i_mode.as_lakers()
        ead_3 = self.grease_settings.grease_ead([])
        message_3, _i_prk_out = initiator.prepare_message_3(transfer_mode, ead_3)
        self._message_3 = message_3

        self._incomplete = True
        # Kept around until one of the complete_* methods is called
        self._init_details = (initiator, c_ours, c_theirs)

    def _conclude(self, finish_handshake) -> None:
        """Run the last EDHOC step on the stored initiator and make the
        context ready for OSCORE operation."""
        initiator, c_ours, c_theirs = self._init_details
        finish_handshake(initiator)
        self._make_ready(initiator, c_ours, c_theirs)
        self._incomplete = False
        self._init_details = None

    def complete_without_message_4(self) -> None:
        """Finish the exchange without waiting for a message 4."""
        self._conclude(lambda initiator: initiator.completed_without_message_4())

    def complete_with_message_4(self, message_4: bytes) -> None:
        """Finish the exchange by processing the received message 4."""
        self._conclude(lambda initiator: initiator.process_message_4(message_4))

    def message_3_to_include(self) -> Optional[bytes]:
        """Hand out the prepared message 3 once; later calls return None."""
        pending, self._message_3 = self._message_3, None
        return pending
class EdhocResponderContext(_EdhocContextBase):
    """An OSCORE context of an EDHOC responder.

    It is created while the exchange is still waiting for message 3, and only
    becomes usable for protecting and unprotecting messages once a message 3
    was processed -- typically taken from the first request, which carries it
    in front of the OSCORE payload."""

    def __init__(
        self, responder, c_i, c_r, server_credentials, logger, grease_settings
    ):
        super().__init__(logger, grease_settings)

        # storing them where they will later be overwritten with themselves
        self.recipient_id = c_r
        self.sender_id = c_i

        self._responder = responder
        # Through these we'll look up id_cred_i
        self._server_credentials = server_credentials

        self.authenticated_claims = []

        # Not sure why mypy even tolerates this -- we're clearly not ready for
        # a general protect/unprotect, and things only work because all
        # relevant functions get their checks introduced
        self._incomplete = True

        self._message_4 = None

    def message_3_to_include(self) -> Optional[bytes]:
        # as a responder we never send one
        return None

    def get_oscore_context_for(self, unprotected):
        """Return this context if the unprotected header has no KID context
        and its KID is this context's recipient ID, and None otherwise."""
        if oscore.COSE_KID_CONTEXT in unprotected:
            return None
        if unprotected.get(oscore.COSE_KID) == self.recipient_id:
            return self
        return None

    def find_all_used_contextless_oscore_kid(self) -> set[bytes]:
        return {self.recipient_id}

    def protect(self, *args, **kwargs):
        """Protect a message; raises RuntimeError while message 3 is
        outstanding."""
        if self._incomplete:
            raise RuntimeError(
                "EDHOC has not completed yet, waiting for message 3, can not protect own messages yet"
            )
        return super().protect(*args, **kwargs)

    def unprotect(self, protected_message, request_id=None):
        """Unprotect a message, first processing the EDHOC message 3 in front
        of its payload if the exchange has not completed yet.

        Raises ``error.BadRequest`` if the exchange is incomplete and the
        message does not carry a usable message 3."""
        if self._incomplete:
            if not protected_message.opt.edhoc:
                self.log.error(
                    "OSCORE failed: No EDHOC message 3 received and none present"
                )
                raise error.BadRequest("EDHOC incomplete")

            payload_stream = io.BytesIO(protected_message.payload)
            try:
                # discarding result -- just need to have a point to split
                _ = cbor2.load(payload_stream)
            except cbor2.CBORDecodeError as e:
                # The payload comes from the peer; if it does not even start
                # with a CBOR item, that is the peer's error and not an
                # internal one.
                self.log.error(
                    "OSCORE failed: EDHOC message 3 is not well-formed CBOR"
                )
                raise error.BadRequest("EDHOC message 3 malformed") from e
            m3len = payload_stream.tell()
            message_3 = protected_message.payload[:m3len]

            self._offer_message_3(message_3)

            # What is left is a plain OSCORE message
            protected_message = protected_message.copy(
                edhoc=False, payload=protected_message.payload[m3len:]
            )

        return super().unprotect(protected_message, request_id)

    def _offer_message_3(self, message_3: bytes) -> bytes:
        """Send in a message 3, obtain a message 4

        It is safe to discard the output, eg. when the CoAP EDHOC option is
        used.

        If the exchange was already completed, the input is ignored and the
        previously prepared message 4 is returned.

        Raises ``error.BadRequest`` if critical EAD items are present, or if
        no credentials are found for the peer."""
        if self._incomplete:
            id_cred_i, ead_3 = self._responder.parse_message_3(message_3)
            if any(e.is_critical() for e in ead_3):
                self.log.error("Aborting EDHOC: Critical EAD3 present")
                raise error.BadRequest
            self.grease_settings.update_from_ead(ead_3)

            try:
                (cred_i, claims) = self._server_credentials.find_edhoc_by_id_cred_peer(
                    id_cred_i
                )
            except KeyError:
                self.log.error(
                    "Aborting EDHOC: No credentials found for client with id_cred_i=h'%s'",
                    id_cred_i.hex(),
                )
                raise error.BadRequest

            self.authenticated_claims.extend(claims)

            self._responder.verify_message_3(cred_i)
            self._message_4 = self._responder.prepare_message_4(
                self.grease_settings.grease_ead([])
            )

            self._make_ready(self._responder, self.recipient_id, self.sender_id)
            self._incomplete = False

        return self._message_4
class OwnCredStyle(enum.Enum):
    """Guidance for how the own credential should be sent in an EDHOC
    exchange"""

    ByKeyId = "by-key-id"
    ByValue = "by-value"

    def as_lakers(self):
        """Convert the enum into Lakers' representation of the same concept.

        The types may eventually be unified, but so far, Lakers doesn't make
        the distinctions we expect to make yet."""
        if self is OwnCredStyle.ByKeyId:
            # FIXME: Mismatch to be fixed in lakers -- currently the only way
            # it allows sending by reference is by Key ID
            return lakers.CredentialTransfer.ByReference

        if self is OwnCredStyle.ByValue:
            return lakers.CredentialTransfer.ByValue

        raise RuntimeError("enum variant not covered")