Coverage for aiocoap/edhoc.py: 85%
288 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-05 18:37 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-05 18:37 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5"""Internal module containing types used inside EDHOC security contexts"""
7import abc
8import enum
9import io
10from pathlib import Path
11import random
12from typing import Optional, Dict, Literal
13import os
15import cbor2
16import lakers
18from . import oscore, credentials, error
19from . import Message
20from .numbers import POST
21from .numbers.eaditem import EADLabel
def load_cbor_or_edn(filename: Path):
    """Load a data item from a file that holds either CBOR or EDN.

    The file is first read as binary CBOR. That reading is only accepted if
    exactly one item decodes and no byte follows it. Otherwise the file is
    read as text, converted from CBOR Diagnostic Notation (EDN) and decoded.

    :param filename: Path of the file to load.
    :returns: The decoded data item.
    :raises credentials.CredentialsLoadError: If the text conversion fails
        with a ``ValueError``, i.e. the content was recognized as neither
        format.
    """
    import cbor_diag
    import cbor2

    with filename.open("rb") as raw:
        try:
            decoded = cbor2.load(raw)
        except cbor2.CBORDecodeError:
            whole_file_was_cbor = False
        else:
            # A successful decode followed by more bytes means the file
            # apparently hasn't been CBOR all through.
            whole_file_was_cbor = raw.read(1) == b""
        if whole_file_was_cbor:
            return decoded

    with filename.open() as text:
        try:
            as_cbor = cbor_diag.diag2cbor(text.read())
        except ValueError:
            raise credentials.CredentialsLoadError(
                "Data loaded from %s was recognized neither as CBOR nor CBOR Diagnostic Notation (EDN)"
                % filename
            )

    # No completeness check is needed here: diag2cbor does not process
    # diagnostic sequences.
    return cbor2.loads(as_cbor)
class CoseKeyForEdhoc:
    """A private key in the subset of COSE_Key that this module accepts.

    All constructors of this class produce EC keys (kty 2) on P-256 (crv 1).
    """

    # COSE key type (label 1); the constructors always store 2 (EC)
    kty: int
    # COSE curve (label -1); the constructors always store 1 (P-256)
    crv: int
    # Private scalar (label -4) as a 32-byte big-endian byte string
    d: bytes

    @classmethod
    def from_file(cls, filename: Path) -> "CoseKeyForEdhoc":
        """Load a key from a CBOR or EDN file.

        :raises credentials.CredentialsLoadError: If any group or world
            permission bit is set on the file, or if its content is not an
            acceptable key (see :meth:`from_map`).
        """
        permissions = filename.stat().st_mode
        if permissions & 0o077:
            raise credentials.CredentialsLoadError(
                "Refusing to load private key that is group or world accessible"
            )

        return cls.from_map(load_cbor_or_edn(filename))

    @classmethod
    def from_map(cls, key: dict) -> "CoseKeyForEdhoc":
        """Build a key from a decoded COSE_Key map.

        The map needs to contain exactly the labels 1 (kty, value 2), -1
        (crv, value 1) and -4 (d, a 32-byte byte string).

        :raises credentials.CredentialsLoadError: If the data is not a
            dictionary, or any of the requirements above is not met.
        """
        if not isinstance(key, dict):
            raise credentials.CredentialsLoadError(
                "Data is not shaped like COSE_KEY (expected top-level dictionary)"
            )
        if 1 not in key:
            raise credentials.CredentialsLoadError(
                "Data is not shaped like COSE_KEY (expected key 1 (kty) in top-level dictionary)"
            )

        key_type = key[1]
        if key_type != 2:
            raise credentials.CredentialsLoadError(
                "Private key type %s is not supported (currently only 2 (EC) is supported)"
                % (key_type,)
            )

        if key.get(-1) != 1:
            raise credentials.CredentialsLoadError(
                "Private key of type EC requires key -1 (crv), currently supported values: 1 (P-256)"
            )

        secret = key.get(-4)
        if not (isinstance(secret, bytes) and len(secret) == 32):
            raise credentials.CredentialsLoadError(
                "Private key of type EC P-256 requires key -4 (d) to be a 32-byte long byte string"
            )

        if not all(label in (1, -1, -4) for label in key):
            raise credentials.CredentialsLoadError(
                "Extraneous data in key, consider allow-listing the item if acceptable"
            )

        loaded = cls()
        loaded.kty = 2  # EC
        loaded.crv = 1  # P-256
        loaded.d = secret
        return loaded

    def secret_to_map(self) -> dict:
        """Return the private key as a COSE_Key style map (kty, crv, d)."""
        serialized = {1: self.kty}
        serialized[-1] = self.crv
        serialized[-4] = self.d
        return serialized

    # Should we deprecate filename, add a generate_in_file method? (It's there
    # because generate originally depended on a file system)
    @classmethod
    def generate(cls, filename: Optional[Path] = None) -> "CoseKeyForEdhoc":
        """Generate a fresh P-256 key, optionally storing it in a file.

        If a filename is given, the file is created exclusively (an existing
        file makes the creation fail) with mode 0o600, and the key is written
        to it as CBOR. If writing fails, the file is removed again.

        This returns the generated private key.
        """
        from cryptography.hazmat.primitives.asymmetric import ec

        fresh = ec.generate_private_key(curve=ec.SECP256R1())
        scalar = fresh.private_numbers().private_value

        generated = cls()
        generated.kty = 2  # EC
        generated.crv = 1  # P-256
        generated.d = scalar.to_bytes(32, "big")

        if filename is None:
            return generated

        # O_BINARY is only defined on some platforms; fall back to no flag.
        flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL
        flags |= getattr(os, "O_BINARY", 0)
        descriptor = os.open(filename, flags, mode=0o600)
        try:
            with open(descriptor, "wb") as keyfile:
                cbor2.dump(generated.secret_to_map(), keyfile)
        except Exception:
            filename.unlink()
            raise

        return generated

    def as_ccs(
        self, kid: Optional[bytes], subject: Optional[str]
    ) -> Dict[Literal[14], dict]:
        """Given a key, generate a corresponding KCCS.

        :param kid: Key ID to place in the COSE_Key (label 2); left out if
            None.
        :param subject: Subject to place in the credential (label 2); left
            out if None.
        :returns: A map ``{14: {8: {1: COSE_Key}, ...}}`` holding the public
            key that belongs to this private key.
        """
        from cryptography.hazmat.primitives.asymmetric import ec

        scalar = int.from_bytes(self.d, "big")
        private = ec.derive_private_key(scalar, ec.SECP256R1())
        numbers = private.public_key().public_numbers()

        # kty: EC2, crv: P-256, x, y
        cosekey = {
            1: 2,
            -1: 1,
            -2: numbers.x.to_bytes(32, "big"),
            -3: numbers.y.to_bytes(32, "big"),
        }
        if kid is not None:
            cosekey[2] = kid

        # cnf: COSE_Key
        credential_kccs: dict = {8: {1: cosekey}}
        if subject is not None:
            credential_kccs[2] = subject

        # kccs: cnf
        return {14: credential_kccs}
class EdhocCredentials(credentials._Objectish):
    """Credential set from which an EDHOC exchange with a peer can be run.

    It combines the own private key and credential (if any), the expected
    peer credential (if any), and the parameters of the exchange.
    """

    own_key: Optional[CoseKeyForEdhoc]
    suite: int
    method: int
    own_cred: Optional[dict]
    peer_cred: Optional[dict]

    #: Whether the combined flow should be used when using this credential set.
    #:
    #: If unset or None, the decision is left to the library (which at the
    #: time of writing always picks True).
    use_combined_edhoc: Optional[bool]

    def __init__(
        self,
        suite: int,
        method: int,
        own_cred_style: Optional[str] = None,
        peer_cred: Optional[dict] = None,
        own_cred: Optional[dict] = None,
        private_key_file: Optional[str] = None,
        private_key: Optional[dict] = None,
        use_combined_edhoc: Optional[bool] = None,
    ):
        """Set up the credential set.

        :param suite: Stored as ``self.suite``.
        :param method: Stored as ``self.method``.
        :param own_cred_style: Value of an :class:`OwnCredStyle` variant
            describing how the own credential is sent, or None.
        :param peer_cred: Credential expected from the peer, or the special
            value ``{"unauthenticated": True}``, or None.
        :param own_cred: Own credential, or the special value
            ``{"unauthenticated": True}`` to have an ephemeral key and
            credential generated, or None.
        :param private_key_file: Name of a file to load the private key from;
            mutually exclusive with ``private_key``.
        :param private_key: Private key as a COSE_Key style map; mutually
            exclusive with ``private_key_file``.
        :param use_combined_edhoc: See the class attribute of the same name.
        :raises credentials.CredentialsLoadError: If the arguments are
            inconsistent, or the key can not be loaded.
        """
        self.suite = suite
        self.method = method
        self.own_cred = own_cred
        self.peer_cred = peer_cred
        self.use_combined_edhoc = use_combined_edhoc

        if private_key_file is not None and private_key is not None:
            raise credentials.CredentialsLoadError(
                "private_key is mutually exclusive with private_key_file"
            )
        if private_key_file is not None:
            # FIXME: We should carry around a base
            private_key_path = Path(private_key_file)
            # FIXME: We left loading the file to the user, and now we're once more
            # in a position where we guess the file type
            self.own_key = CoseKeyForEdhoc.from_file(private_key_path)
        elif private_key is not None:
            self.own_key = CoseKeyForEdhoc.from_map(private_key)
        else:
            self.own_key = None

        if own_cred_style is None:
            self.own_cred_style = None
        else:
            self.own_cred_style = OwnCredStyle(own_cred_style)

        if self.own_cred == {"unauthenticated": True}:
            if self.own_key is not None or self.own_cred_style not in (
                None,
                OwnCredStyle.ByValue,
            ):
                raise credentials.CredentialsLoadError(
                    "For local unauthenticated use, no key can be give, and own_cred_style needs to be by-value or absent"
                )
            # Unauthenticated operation runs on a freshly generated key that
            # is not stored anywhere.
            self.own_key = CoseKeyForEdhoc.generate()
            # FIXME: kid and subj should rather not be sent, but lakers insists on their presence
            self.own_cred = self.own_key.as_ccs(kid=b"?", subject="")
            self.own_cred_style = OwnCredStyle.ByValue
        else:
            if (own_cred is None) != (own_cred_style is None) or (own_cred is None) != (
                self.own_key is None
            ):
                raise credentials.CredentialsLoadError(
                    "If own credentials are given, all of own_cred, own_cred_style and private_key(_path) need to be given"
                )

        # FIXME: This is only used on the client side, and expects that all parts (own and peer) are present
        self._established_context = None

    def find_edhoc_by_id_cred_peer(self, id_cred_peer):
        """Return the encoded peer credential if ``id_cred_peer`` refers to it.

        The configured peer credential matches if ``id_cred_peer`` is either
        the canonical encoding of the credential itself (by value) or the
        canonical encoding of ``{4: kid}`` with the credential's key ID (by
        key ID).

        :returns: The canonically encoded content of the CCS, or None if no
            CCS style peer credential is configured or it does not match.
        """
        if self.peer_cred is None:
            return None
        if 14 not in self.peer_cred:
            # Only recognizing CCS so far
            return None

        if id_cred_peer == cbor2.dumps(self.peer_cred, canonical=True):
            # credential by value
            return cbor2.dumps(self.peer_cred[14], canonical=True)

        # cnf / COSE_Key / kid, should be present in all CCS
        kid = self.peer_cred[14][8][1].get(2)
        if kid is not None and id_cred_peer == cbor2.dumps({4: kid}, canonical=True):
            # credential by kid
            return cbor2.dumps(self.peer_cred[14], canonical=True)

        return None

    def peer_cred_is_unauthenticated(self):
        """Tell whether the peer credential is the "unauthenticated" marker."""
        # FIXME: This is rather weird internal API, and rather weird
        # format-wise -- but it will suffice until credentials are rewritten.
        return self.peer_cred is not None and self.peer_cred == {
            "unauthenticated": True
        }

    async def establish_context(
        self,
        wire,
        underlying_address,
        underlying_proxy_scheme,
        underlying_uri_host,
        logger,
    ):
        """Run EDHOC as the initiator and return the resulting context.

        Message 1 is POSTed to ``/.well-known/edhoc`` and message 2 is
        verified against the peer credential. Unless ``use_combined_edhoc``
        is False, the returned context carries message 3 to be sent along
        with the first request. Otherwise message 3 is sent explicitly and
        message 4 is awaited before returning.

        :param wire: Object whose ``request(msg).response_raising`` is
            awaited to exchange the EDHOC messages.
        :param underlying_address: Assigned as ``remote`` of the requests.
        :param underlying_proxy_scheme: Proxy-Scheme option of the requests.
        :param underlying_uri_host: Uri-Host option of the requests.
        :param logger: Logger used here and handed to the created context.
        :returns: An :class:`EdhocInitiatorContext`.
        :raises error.BadRequest: If message 2 contains a critical EAD item.
        :raises credentials.CredentialsMissingError: If the peer is
            configured as unauthenticated but did not send a KCCS by value.
        """
        logger.info(
            "No OSCORE context found for EDHOC context %r, initiating one.", self
        )
        # The semantic identifier (an arbitrary string)
        #
        # FIXME: We don't support role reversal yet, but once we
        # register this context to be available for incoming
        # requests, we'll have to pick more carefully
        c_i = bytes([random.randint(0, 23)])
        initiator = lakers.EdhocInitiator()
        message_1 = initiator.prepare_message_1(
            c_i,
            # We could also send this depending on our configuration, ie. not
            # send it if we do expect credentials. But that'll also reveal to
            # an extent that we are ready to do TOFU, so sending it
            # unconditionally is a good first step, also because it allows
            # peers to migrate towards sending by reference as a default.
            [lakers.EADItem(EADLabel.CRED_BY_VALUE, is_critical=False)],
        )

        msg1 = Message(
            code=POST,
            proxy_scheme=underlying_proxy_scheme,
            uri_host=underlying_uri_host,
            uri_path=[".well-known", "edhoc"],
            # message_1 is sent prefixed with an encoded CBOR `true`
            payload=cbor2.dumps(True) + message_1,
        )
        msg1.remote = underlying_address
        msg2 = await wire.request(msg1).response_raising

        (c_r, id_cred_r, ead_2) = initiator.parse_message_2(msg2.payload)
        if any(e.is_critical() for e in ead_2):
            # This object has no logger of its own; the one handed in by the
            # caller is the only one available here.
            logger.error("Aborting EDHOC: Critical EAD2 present")
            raise error.BadRequest

        assert isinstance(self.own_cred, dict) and list(self.own_cred.keys()) == [14], (
            "So far can only process CCS style own credentials a la {14: ...}, own_cred = %r"
            % self.own_cred
        )
        cred_i = cbor2.dumps(self.own_cred[14], canonical=True)
        key_i = self.own_key.d

        logger.debug("EDHOC responder sent message_2 with ID_CRED_R = %r", id_cred_r)
        if self.peer_cred_is_unauthenticated():
            # Not doing further checks (eg. for trailing bytes) or re-raising: This
            # was already checked by lakers
            parsed = cbor2.loads(id_cred_r)

            if 14 not in parsed:
                raise credentials.CredentialsMissingError(
                    "Peer presented credential-by-reference (or anything else that's not a KCCS) when no credential was pre-agreed"
                )

            # We could also pick the [14] out of the serialized stream and
            # treat it as an opaque item, but cbor2 doesn't really do serialized items.
            cred_r = cbor2.dumps(parsed[14], canonical=True)
        else:
            # We could look into id_cred_r, which is a CBOR encoded
            # byte string, and could start comparing ... but actually
            # EDHOC and Lakers protect us from misbinding attacks (is
            # that what they are called?), so we can just put in our
            # expected credential here
            #
            # FIXME: But looking into it might give us a better error than just
            # "Mac2 verification failed"

            # FIXME add assert on the structure or start doing the
            # generalization that'll fail at startup
            cred_r = cbor2.dumps(self.peer_cred[14], canonical=True)

        initiator.verify_message_2(
            key_i,
            cred_i,
            cred_r,
        )  # odd that we provide that here rather than in the next function

        logger.debug("Message 2 was verified")

        secctx = EdhocInitiatorContext(initiator, c_i, c_r, self.own_cred_style, logger)

        if self.use_combined_edhoc is not False:
            secctx.complete_without_message_4()
            # That's enough: Message 3 can well be sent along with the next
            # message.
            return secctx

        logger.debug("Sending explicit message 3 without optimization")

        # Take message 3 out of the context so that it is not sent again
        # along with the first protected request.
        message_3 = secctx._message_3
        assert message_3 is not None
        secctx._message_3 = None

        # A single byte C_R in one of these ranges is prepended as it is;
        # anything else is prepended as an encoded CBOR byte string.
        if len(c_r) == 1 and (0 <= c_r[0] < 24 or 0x20 <= c_r[0] < 0x38):
            c_r_encoded = c_r
        else:
            c_r_encoded = cbor2.dumps(c_r)
        msg3 = Message(
            code=POST,
            proxy_scheme=underlying_proxy_scheme,
            uri_host=underlying_uri_host,
            uri_path=[".well-known", "edhoc"],
            payload=c_r_encoded + message_3,
        )
        msg3.remote = underlying_address
        msg4 = await wire.request(msg3).response_raising

        secctx.complete_with_message_4(msg4.payload)

        logger.debug("Received message 4, context is ready")

        return secctx
class _EdhocContextBase(
    oscore.CanProtect, oscore.CanUnprotect, oscore.SecurityContextUtils
):
    """Shared parts of the OSCORE contexts that are derived from EDHOC."""

    def __init__(self, logger):
        self.log = logger

    def post_seqnoincrease(self):
        """Do nothing: the context is not persisted."""

    def protect(self, message, request_id=None, *, kid_context=True):
        """Protect a message, prepending a message 3 if one is to be included.

        When :meth:`message_3_to_include` provides a message 3, the EDHOC
        option is set on the outer message and message 3 is put in front of
        its payload.
        """
        outer, used_request_id = super().protect(
            message, request_id=request_id, kid_context=kid_context
        )

        pending_message_3 = self.message_3_to_include()
        if pending_message_3 is None:
            return outer, used_request_id

        outer.opt.edhoc = True
        outer.payload = pending_message_3 + outer.payload
        return outer, used_request_id

    def _make_ready(self, edhoc_context, c_ours, c_theirs):
        """Derive the OSCORE context from a completed EDHOC exchange.

        :param edhoc_context: The lakers initiator or responder from which
            the key material is exported.
        :param c_ours: Own connection identifier; becomes the recipient ID.
        :param c_theirs: Peer's connection identifier; becomes the sender ID.
        :raises RuntimeError: If an initiator selected a cipher suite other
            than 2.
        :raises ValueError: If both connection identifiers are identical.
        """
        # FIXME: both should offer this
        suite_is_known = (
            isinstance(edhoc_context, lakers.EdhocResponder)
            or edhoc_context.selected_cipher_suite() == 2
        )
        if not suite_is_known:
            raise RuntimeError("Unknown suite")
        self.alg_aead = oscore.algorithms["AES-CCM-16-64-128"]
        self.hashfun = oscore.hashfunctions["sha256"]

        # we did check for critical EADs, there was no out-of-band agreement, so 8 it is
        oscore_salt_length = 8
        # I figure that one would be agreed out-of-band as well (currently no
        # options to set/change this are known)
        self.id_context = None
        self.recipient_replay_window = oscore.ReplayWindow(32, lambda: None)

        key_length = self.alg_aead.key_bytes
        master_secret = edhoc_context.edhoc_exporter(0, [], key_length)
        master_salt = edhoc_context.edhoc_exporter(1, [], oscore_salt_length)

        self.sender_id = c_theirs
        self.recipient_id = c_ours
        if self.sender_id == self.recipient_id:
            raise ValueError("Bad IDs: identical ones were picked")

        self.derive_keys(master_salt, master_secret)

        self.sender_sequence_number = 0
        self.recipient_replay_window.initialize_empty()

        self.log.debug("EDHOC context %r ready for OSCORE operation", self)

    @abc.abstractmethod
    def message_3_to_include(self) -> Optional[bytes]:
        """Return an encoded message_3 to include in outgoing messages.

        This may modify self to only return something once."""
class EdhocInitiatorContext(_EdhocContextBase):
    """An OSCORE context that is derived from an EDHOC exchange.

    It does not require that the EDHOC exchange has completed -- it can be set
    up by an initiator already when message 2 has been received, prepares a
    message 3 at setup time, and sends it with the first request that is sent
    through it."""

    # FIXME: Should we rather send it with *every* request that is sent before a message 4 is received implicitly?
    def __init__(self, initiator, c_ours, c_theirs, cred_i_mode, logger):
        """Prepare message 3 and remember what is needed for completion.

        :param initiator: The lakers initiator, after message 2 was verified.
        :param c_ours: Own connection identifier.
        :param c_theirs: Peer's connection identifier.
        :param cred_i_mode: Object whose ``as_lakers()`` tells how the own
            credential is transferred in message 3.
        :param logger: Logger of the context.
        """
        super().__init__(logger)

        # Only this step is role specific
        transfer_mode = cred_i_mode.as_lakers()
        prepared = initiator.prepare_message_3(transfer_mode, None)
        self._message_3, _i_prk_out = prepared

        self._incomplete = True
        self._init_details = (initiator, c_ours, c_theirs)

    def _finish_setup(self, initiator, c_ours, c_theirs) -> None:
        """Derive the OSCORE keys and drop the data kept for completion."""
        self._make_ready(initiator, c_ours, c_theirs)
        self._incomplete = False
        self._init_details = None

    def complete_without_message_4(self) -> None:
        """Complete the exchange without waiting for a message 4."""
        initiator, c_ours, c_theirs = self._init_details
        initiator.completed_without_message_4()
        self._finish_setup(initiator, c_ours, c_theirs)

    def complete_with_message_4(self, message_4: bytes) -> None:
        """Complete the exchange by processing the received message 4."""
        initiator, c_ours, c_theirs = self._init_details
        initiator.process_message_4(message_4)
        self._finish_setup(initiator, c_ours, c_theirs)

    def message_3_to_include(self) -> Optional[bytes]:
        """Hand out the prepared message 3 once; return None afterwards."""
        pending, self._message_3 = self._message_3, None
        return pending
class EdhocResponderContext(_EdhocContextBase):
    """An OSCORE context on the responder side of an EDHOC exchange.

    It is created before message 3 arrived. Until message 3 has been
    processed, own messages can not be protected, and incoming messages can
    only be unprotected if they carry message 3 along.
    """

    def __init__(self, responder, c_i, c_r, server_credentials, logger):
        """Set up the context in its incomplete state.

        :param responder: The lakers responder of the exchange.
        :param c_i: Initiator's connection identifier (the sender ID).
        :param c_r: Own connection identifier (the recipient ID).
        :param server_credentials: Object whose
            ``find_edhoc_by_id_cred_peer`` is used to look up the
            initiator's credential.
        :param logger: Logger of the context.
        """
        super().__init__(logger)

        # storing them where they will later be overwritten with themselves
        self.recipient_id = c_r
        self.sender_id = c_i

        self._responder = responder
        # Through these we'll look up id_cred_i
        self._server_credentials = server_credentials

        self.authenticated_claims = []

        # Not sure why mypy even tolerates this -- we're clearly not ready for
        # a general protect/unprotect, and things only work because all
        # relevant functions get their checks introduced
        self._incomplete = True

        self._message_4 = None

    def message_3_to_include(self) -> Optional[bytes]:
        """Return None: as a responder we never send a message 3."""
        return None

    def get_oscore_context_for(self, unprotected):
        """Return self if the unprotected header addresses this context.

        That is the case if it has no KID context and its KID is the own
        recipient ID; otherwise None is returned.
        """
        if oscore.COSE_KID_CONTEXT in unprotected:
            return None
        if unprotected.get(oscore.COSE_KID) == self.recipient_id:
            return self
        return None

    def find_all_used_contextless_oscore_kid(self) -> set[bytes]:
        """Return the set that holds the own recipient ID."""
        return {self.recipient_id}

    def protect(self, *args, **kwargs):
        """Protect a message; only possible once message 3 was processed.

        :raises RuntimeError: If the exchange has not completed yet.
        """
        if self._incomplete:
            raise RuntimeError(
                "EDHOC has not completed yet, waiting for message 3, can not protect own messages yet"
            )
        return super().protect(*args, **kwargs)

    def unprotect(self, protected_message, request_id=None):
        """Unprotect a message, processing a prepended message 3 if needed.

        While the exchange is incomplete, the message needs to have the EDHOC
        option set. Its payload then starts with message 3 (a single CBOR
        item), which is split off and processed before the remainder is
        unprotected.

        :raises error.BadRequest: If the exchange is incomplete and the
            message carries no message 3, or its payload does not start with
            a decodable CBOR item.
        """
        if self._incomplete:
            if not protected_message.opt.edhoc:
                self.log.error(
                    "OSCORE failed: No EDHOC message 3 received and none present"
                )
                raise error.BadRequest("EDHOC incomplete")

            payload_stream = io.BytesIO(protected_message.payload)
            try:
                # discarding result -- just need to have a point to split
                _ = cbor2.load(payload_stream)
            except cbor2.CBORDecodeError as exc:
                # The payload comes from the network: a malformed item is the
                # sender's fault and must not surface as an internal error.
                self.log.error("OSCORE failed: EDHOC message 3 is not valid CBOR")
                raise error.BadRequest("Malformed EDHOC message 3") from exc
            m3len = payload_stream.tell()
            message_3 = protected_message.payload[:m3len]

            self._offer_message_3(message_3)

            protected_message = protected_message.copy(
                edhoc=False, payload=protected_message.payload[m3len:]
            )

        return super().unprotect(protected_message, request_id)

    def _offer_message_3(self, message_3: bytes) -> bytes:
        """Send in a message 3, obtain a message 4

        It is safe to discard the output, eg. when the CoAP EDHOC option is
        used.

        If the exchange was already completed, the message 4 that was
        prepared back then is returned without processing anything.

        :raises error.BadRequest: If message 3 contains a critical EAD item,
            or no credential is found for the sender.
        """
        if self._incomplete:
            id_cred_i, ead_3 = self._responder.parse_message_3(message_3)
            if any(e.is_critical() for e in ead_3):
                self.log.error("Aborting EDHOC: Critical EAD3 present")
                raise error.BadRequest

            try:
                (cred_i, claims) = self._server_credentials.find_edhoc_by_id_cred_peer(
                    id_cred_i
                )
            except KeyError:
                self.log.error(
                    "Aborting EDHOC: No credentials found for client with id_cred_i=h'%s'",
                    id_cred_i.hex(),
                )
                raise error.BadRequest

            self._responder.verify_message_3(cred_i)
            self._message_4 = self._responder.prepare_message_4()

            self._make_ready(self._responder, self.recipient_id, self.sender_id)
            # The claims are only recorded now that message 3 was verified
            # and the keys were derived: a failure in any earlier step must
            # not leave claims of an unverified peer behind.
            self.authenticated_claims.extend(claims)
            self._incomplete = False

        return self._message_4
class OwnCredStyle(enum.Enum):
    """Guidance for how the own credential should be sent in an EDHOC
    exchange"""

    ByKeyId = "by-key-id"
    ByValue = "by-value"

    def as_lakers(self):
        """Convert the enum into Lakers' representation of the same concept.

        The types may eventually be unified, but so far, Lakers doesn't make
        the distinctions we expect to make yet."""
        if self is OwnCredStyle.ByValue:
            return lakers.CredentialTransfer.ByValue
        if self is OwnCredStyle.ByKeyId:
            # FIXME: Mismatch to be fixed in lakers -- currently the only way
            # it allows sending by reference is by Key ID
            return lakers.CredentialTransfer.ByReference
        raise RuntimeError("enum variant not covered")