Coverage for aiocoap/edhoc.py: 0%
296 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-18 17:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-18 17:48 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5"""Internal module containing types used inside EDHOC security contexts"""
7import abc
8import enum
9import io
10from pathlib import Path
11import random
12from typing import Optional, Dict, Literal
13import os
15import cbor2
16import lakers
18from . import oscore, credentials, error
19from . import Message
20from .numbers import POST
23def load_cbor_or_edn(filename: Path):
24 """Common heuristic for whether something is CBOR or EDN"""
25 import cbor_diag
26 import cbor2
28 with filename.open("rb") as binary:
29 try:
30 result = cbor2.load(binary)
31 except cbor2.CBORDecodeError:
32 pass
33 else:
34 if binary.read(1) == b"":
35 return result
36 # else it apparently hasn't been CBOR all through...
37 with filename.open() as textual:
38 try:
39 converted = cbor_diag.diag2cbor(textual.read())
40 except ValueError:
41 raise credentials.CredentialsLoadError(
42 "Data loaded from %s was recognized neither as CBOR nor CBOR Diagnostic Notation (EDN)"
43 % filename
44 )
45 # no need to check for completeness: diag2cbor doesn't do diagnostic
46 # sequences, AIU that's not even a thing
47 return cbor2.loads(converted)
50class CoseKeyForEdhoc:
51 kty: int
52 crv: int
53 d: bytes
55 @classmethod
56 def from_file(cls, filename: Path) -> "CoseKeyForEdhoc":
57 """Load a key from a file (in CBOR or EDN), asserting that the file is not group/world readable"""
58 if filename.stat().st_mode & 0o077 != 0:
59 raise credentials.CredentialsLoadError(
60 "Refusing to load private key that is group or world accessible"
61 )
63 loaded = load_cbor_or_edn(filename)
64 return cls.from_map(loaded)
66 @classmethod
67 def from_map(cls, key: dict) -> "CoseKeyForEdhoc":
68 if not isinstance(key, dict):
69 raise credentials.CredentialsLoadError(
70 "Data is not shaped like COSE_KEY (expected top-level dictionary)"
71 )
72 if 1 not in key:
73 raise credentials.CredentialsLoadError(
74 "Data is not shaped like COSE_KEY (expected key 1 (kty) in top-level dictionary)"
75 )
76 if key[1] != 2:
77 raise credentials.CredentialsLoadError(
78 "Private key type %s is not supported (currently only 2 (EC) is supported)"
79 % (key[1],)
80 )
82 if key.get(-1) != 1:
83 raise credentials.CredentialsLoadError(
84 "Private key of type EC requires key -1 (crv), currently supported values: 1 (P-256)"
85 )
87 if not isinstance(key.get(-4), bytes) or len(key[-4]) != 32:
88 raise credentials.CredentialsLoadError(
89 "Private key of type EC P-256 requires key -4 (d) to be a 32-byte long byte string"
90 )
92 if any(k not in (1, -1, -4) for k in key):
93 raise credentials.CredentialsLoadError(
94 "Extraneous data in key, consider allow-listing the item if acceptable"
95 )
97 s = cls()
98 s.kty = 2 # EC
99 s.crv = 1 # P-256
100 s.d = key[-4]
102 return s
104 def secret_to_map(self) -> dict:
105 # kty: EC, crv: P-256, d: ...
106 return {1: self.kty, -1: self.crv, -4: self.d}
108 # Should we deprecate filename, add a generate_in_file method? (It's there
109 # because generate originally depended on a file system)
110 @classmethod
111 def generate(cls, filename: Optional[Path] = None) -> "CoseKeyForEdhoc":
112 """Generate a key inside a file
114 This returns the generated private key.
115 """
117 from cryptography.hazmat.primitives.asymmetric import ec
119 key = ec.generate_private_key(curve=ec.SECP256R1())
121 s = cls()
122 s.kty = 2 # EC
123 s.crv = 1 # P-256
124 s.d = key.private_numbers().private_value.to_bytes(32, "big")
126 if filename is not None:
127 flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL
128 if hasattr(os, "O_BINARY"):
129 flags |= os.O_BINARY
130 descriptor = os.open(filename, flags, mode=0o600)
131 try:
132 with open(descriptor, "wb") as keyfile:
133 cbor2.dump(s.secret_to_map(), keyfile)
134 except Exception:
135 filename.unlink()
136 raise
138 return s
140 def as_ccs(
141 self, kid: Optional[bytes], subject: Optional[str]
142 ) -> Dict[Literal[14], dict]:
143 """Given a key, generate a corresponding KCCS"""
145 from cryptography.hazmat.primitives.asymmetric import ec
147 private = ec.derive_private_key(int.from_bytes(self.d, "big"), ec.SECP256R1())
148 public = private.public_key()
150 x = public.public_numbers().x.to_bytes(32, "big")
151 y = public.public_numbers().y.to_bytes(32, "big")
152 # kty: EC2, crv: P-256, x, y
153 cosekey = {1: 2, -1: 1, -2: x, -3: y}
154 if kid is not None:
155 cosekey[2] = kid
156 # cnf: COSE_Key
157 credential_kccs: dict = {8: {1: cosekey}}
158 if subject is not None:
159 credential_kccs[2] = subject
161 # kccs: cnf
162 return {14: credential_kccs}
165class EdhocCredentials(credentials._Objectish):
166 own_key: Optional[CoseKeyForEdhoc]
167 suite: int
168 method: int
169 own_cred: Optional[dict]
170 peer_cred: Optional[dict]
172 #: Whether the combined flow should be used when using this credential set.
173 #:
174 #: If unset or None, this the decision is left to the library (which at the
175 #: time of writing always picks True).
176 use_combined_edhoc: Optional[bool]
178 def __init__(
179 self,
180 suite: int,
181 method: int,
182 own_cred_style: Optional[str] = None,
183 peer_cred: Optional[dict] = None,
184 own_cred: Optional[dict] = None,
185 private_key_file: Optional[str] = None,
186 private_key: Optional[dict] = None,
187 use_combined_edhoc: Optional[bool] = None,
188 ):
189 from . import edhoc
191 self.suite = suite
192 self.method = method
193 self.own_cred = own_cred
194 self.peer_cred = peer_cred
195 self.use_combined_edhoc = use_combined_edhoc
197 if private_key_file is not None and private_key is not None:
198 raise credentials.CredentialsLoadError(
199 "private_key is mutually exclusive with private_key_file"
200 )
201 if private_key_file is not None:
202 # FIXME: We should carry around a base
203 private_key_path = Path(private_key_file)
204 # FIXME: We left loading the file to the user, and now we're once more
205 # in a position where we guess the file type
206 self.own_key = CoseKeyForEdhoc.from_file(private_key_path)
207 elif private_key is not None:
208 self.own_key = CoseKeyForEdhoc.from_map(private_key)
209 else:
210 self.own_key = None
212 if own_cred_style is None:
213 self.own_cred_style = None
214 else:
215 self.own_cred_style = edhoc.OwnCredStyle(own_cred_style)
217 if self.own_cred == {"unauthenticated": True}:
218 if self.own_key is not None or self.own_cred_style not in (
219 None,
220 OwnCredStyle.ByValue,
221 ):
222 raise credentials.CredentialsLoadError(
223 "For local unauthenticated use, no key can be give, and own_cred_style needs to be by-value or absent"
224 )
225 self.own_key = CoseKeyForEdhoc.generate()
226 # FIXME: kid and subj should rather not be sent, but lakers insists on their presence
227 self.own_cred = self.own_key.as_ccs(kid=b"?", subject="")
228 self.own_cred_style = OwnCredStyle.ByValue
229 else:
230 if (own_cred is None) != (own_cred_style is None) or (own_cred is None) != (
231 self.own_key is None
232 ):
233 raise credentials.CredentialsLoadError(
234 "If own credentials are given, all of own_cred, own_cred_style and private_key(_path) need to be given"
235 )
237 # FIXME: This is only used on the client side, and expects that all parts (own and peer) are present
238 self._established_context = None
240 def find_edhoc_by_id_cred_peer(self, id_cred_peer):
241 if self.peer_cred is None:
242 return None
243 if 14 not in self.peer_cred:
244 # Only recognizing CCS so far
245 return None
247 if id_cred_peer == cbor2.dumps(self.peer_cred, canonical=True):
248 # credential by value
249 return cbor2.dumps(self.peer_cred[14], canonical=True)
251 # cnf / COS_Key / kid, should be present in all CCS
252 kid = self.peer_cred[14][8][1].get(2)
253 if kid is not None and id_cred_peer == cbor2.dumps({4: kid}, canonical=True):
254 # credential by kid
255 return cbor2.dumps(self.peer_cred[14], canonical=True)
257 def peer_cred_is_unauthenticated(self):
258 # FIXME: This is rather weird internal API, and rather weird
259 # format-wise -- but it will suffice until credentials are rewritten.
260 return self.peer_cred is not None and self.peer_cred == {
261 "unauthenticated": True
262 }
264 async def establish_context(
265 self,
266 wire,
267 underlying_address,
268 underlying_proxy_scheme,
269 underlying_uri_host,
270 logger,
271 ):
272 logger.info(
273 "No OSCORE context found for EDHOC context %r, initiating one.", self
274 )
275 # The semantic identifier (an arbitrary string)
276 #
277 # FIXME: We don't support role reversal yet, but once we
278 # register this context to be available for incoming
279 # requests, we'll have to pick more carefully
280 c_i = bytes([random.randint(0, 23)])
281 initiator = lakers.EdhocInitiator()
282 message_1 = initiator.prepare_message_1(c_i)
284 msg1 = Message(
285 code=POST,
286 proxy_scheme=underlying_proxy_scheme,
287 uri_host=underlying_uri_host,
288 uri_path=[".well-known", "edhoc"],
289 payload=cbor2.dumps(True) + message_1,
290 )
291 msg1.remote = underlying_address
292 msg2 = await wire.request(msg1).response_raising
294 (c_r, id_cred_r, ead_2) = initiator.parse_message_2(msg2.payload)
295 if any(e.is_critical() for e in ead_2):
296 self.log.error("Aborting EDHOC: Critical EAD2 present")
297 raise error.BadRequest
299 assert isinstance(self.own_cred, dict) and list(self.own_cred.keys()) == [14], (
300 "So far can only process CCS style own credentials a la {14: ...}, own_cred = %r"
301 % self.own_cred
302 )
303 cred_i = cbor2.dumps(self.own_cred[14], canonical=True)
304 key_i = self.own_key.d
306 logger.debug("EDHOC responder sent message_2 with ID_CRED_R = %r", id_cred_r)
307 if self.peer_cred_is_unauthenticated():
308 # Not doing further checks (eg. for trailing bytes) or re-raising: This
309 # was already checked by lakers
310 parsed = cbor2.loads(id_cred_r)
312 if 14 not in parsed:
313 raise credentials.CredentialsMissingError(
314 "Peer presented credential-by-reference (or anything else that's not a KCCS) when no credential was pre-agreed"
315 )
317 # We could also pick the [14] out of the serialized stream and
318 # treat it as an opaque item, but cbor2 doesn't really do serialized items.
319 cred_r = cbor2.dumps(parsed[14], canonical=True)
320 else:
321 # We could look into id_cred_r, which is a CBOR encoded
322 # byte string, and could start comparing ... but actually
323 # EDHOC and Lakers protect us from misbinding attacks (is
324 # that what they are called?), so we can just put in our
325 # expected credential here
326 #
327 # FIXME: But looking into it might give us a better error than just
328 # "Mac2 verification failed"
330 # FIXME add assert on the structure or start doing the
331 # generalization that'll fail at startup
332 cred_r = cbor2.dumps(self.peer_cred[14], canonical=True)
334 initiator.verify_message_2(
335 key_i,
336 cred_i,
337 cred_r,
338 ) # odd that we provide that here rather than in the next function
340 logger.debug("Message 2 was verified")
342 secctx = EdhocInitiatorContext(initiator, c_i, c_r, self.own_cred_style, logger)
344 if self.use_combined_edhoc is not False:
345 secctx.complete_without_message_4()
346 # That's enough: Message 3 can well be sent along with the next
347 # message.
348 return secctx
350 logger.debug("Sending explicit message 3 without optimization")
352 message_3 = secctx._message_3
353 assert message_3 is not None
354 secctx._message_3 = None
356 if len(c_r) == 1 and (0 <= c_r[0] < 24 or 0x20 <= c_r[0] < 0x38):
357 c_r_encoded = c_r
358 else:
359 c_r_encoded = cbor2.dumps(c_r)
360 msg3 = Message(
361 code=POST,
362 proxy_scheme=underlying_proxy_scheme,
363 uri_host=underlying_uri_host,
364 uri_path=[".well-known", "edhoc"],
365 payload=c_r_encoded + message_3,
366 )
367 msg3.remote = underlying_address
368 msg4 = await wire.request(msg3).response_raising
370 secctx.complete_with_message_4(msg4.payload)
372 logger.debug("Received message 4, context is ready")
374 return secctx
377class _EdhocContextBase(
378 oscore.CanProtect, oscore.CanUnprotect, oscore.SecurityContextUtils
379):
380 def __init__(self, logger):
381 self.log = logger
383 def post_seqnoincrease(self):
384 # The context is not persisted
385 pass
387 def protect(self, message, request_id=None, *, kid_context=True):
388 outer_message, request_id = super().protect(
389 message, request_id=request_id, kid_context=kid_context
390 )
391 message_3 = self.message_3_to_include()
392 if message_3 is not None:
393 outer_message.opt.edhoc = True
394 outer_message.payload = message_3 + outer_message.payload
395 return outer_message, request_id
397 def _make_ready(self, edhoc_context, c_ours, c_theirs):
398 # FIXME: both should offer this
399 if (
400 isinstance(edhoc_context, lakers.EdhocResponder)
401 or edhoc_context.selected_cipher_suite() == 2
402 ):
403 self.alg_aead = oscore.algorithms["AES-CCM-16-64-128"]
404 self.hashfun = oscore.hashfunctions["sha256"]
405 else:
406 raise RuntimeError("Unknown suite")
408 # we did check for critical EADs, there was no out-of-band agreement, so 8 it is
409 oscore_salt_length = 8
410 # I figure that one would be ageed out-of-band as well (currently no
411 # options to set/change this are known)
412 self.id_context = None
413 self.recipient_replay_window = oscore.ReplayWindow(32, lambda: None)
415 master_secret = edhoc_context.edhoc_exporter(0, [], self.alg_aead.key_bytes)
416 master_salt = edhoc_context.edhoc_exporter(1, [], oscore_salt_length)
418 self.sender_id = c_theirs
419 self.recipient_id = c_ours
420 if self.sender_id == self.recipient_id:
421 raise ValueError("Bad IDs: identical ones were picked")
423 self.derive_keys(master_salt, master_secret)
425 self.sender_sequence_number = 0
426 self.recipient_replay_window.initialize_empty()
428 self.log.debug("EDHOC context %r ready for OSCORE operation", self)
430 @abc.abstractmethod
431 def message_3_to_include(self) -> Optional[bytes]:
432 """An encoded message_3 to include in outgoing messages
434 This may modify self to only return something once."""
437class EdhocInitiatorContext(_EdhocContextBase):
438 """An OSCORE context that is derived from an EDHOC exchange.
440 It does not require that the EDHOC exchange has completed -- it can be set
441 up by an initiator already when message 2 has been received, prepares a
442 message 3 at setup time, and sends it with the first request that is sent
443 through it."""
445 # FIXME: Should we rather send it with *every* request that is sent before a message 4 is received implicitly?
446 def __init__(self, initiator, c_ours, c_theirs, cred_i_mode, logger):
447 super().__init__(logger)
449 # Only this line is role specific
450 self._message_3, _i_prk_out = initiator.prepare_message_3(
451 cred_i_mode.as_lakers(), None
452 )
454 self._incomplete = True
455 self._init_details = (initiator, c_ours, c_theirs)
457 def complete_without_message_4(self) -> None:
458 (initiator, c_ours, c_theirs) = self._init_details
459 initiator.completed_without_message_4()
460 self._make_ready(initiator, c_ours, c_theirs)
461 self._incomplete = False
462 self._init_details = None
464 def complete_with_message_4(self, message_4: bytes) -> None:
465 (initiator, c_ours, c_theirs) = self._init_details
466 initiator.process_message_4(message_4)
467 self._make_ready(initiator, c_ours, c_theirs)
468 self._incomplete = False
469 self._init_details = None
471 def message_3_to_include(self) -> Optional[bytes]:
472 if self._message_3 is not None:
473 result = self._message_3
474 self._message_3 = None
475 return result
476 return None
479class EdhocResponderContext(_EdhocContextBase):
480 def __init__(self, responder, c_i, c_r, server_credentials, logger):
481 super().__init__(logger)
483 # storing them where they will later be overwritten with themselves
484 self.recipient_id = c_r
485 self.sender_id = c_i
487 self._responder = responder
488 # Through these we'll look up id_cred_i
489 self._server_credentials = server_credentials
491 self.authenticated_claims = []
493 # Not sure why mypy even tolerates this -- we're clearly not ready for
494 # a general protect/unprotect, and things only work because all
495 # relevant functions get their checks introduced
496 self._incomplete = True
498 self._message_4 = None
500 def message_3_to_include(self) -> Optional[bytes]:
501 # as a responder we never send one
502 return None
504 def get_oscore_context_for(self, unprotected):
505 if oscore.COSE_KID_CONTEXT in unprotected:
506 return None
507 if unprotected.get(oscore.COSE_KID) == self.recipient_id:
508 return self
510 def find_all_used_contextless_oscore_kid(self) -> set[bytes]:
511 return set((self.recipient_id,))
513 def protect(self, *args, **kwargs):
514 if self._incomplete:
515 raise RuntimeError(
516 "EDHOC has not completed yet, waiting for message 3, can not protect own messages yet"
517 )
518 return super().protect(*args, **kwargs)
520 def unprotect(self, protected_message, request_id=None):
521 if self._incomplete:
522 if not protected_message.opt.edhoc:
523 self.log.error(
524 "OSCORE failed: No EDHOC message 3 received and none present"
525 )
526 raise error.BadRequest("EDHOC incomplete")
528 payload_stream = io.BytesIO(protected_message.payload)
529 # discarding result -- just need to have a point to split
530 _ = cbor2.load(payload_stream)
531 m3len = payload_stream.tell()
532 message_3 = protected_message.payload[:m3len]
534 self._offer_message_3(message_3)
536 protected_message = protected_message.copy(
537 edhoc=False, payload=protected_message.payload[m3len:]
538 )
540 return super().unprotect(protected_message, request_id)
542 def _offer_message_3(self, message_3: bytes) -> bytes:
543 """Send in a message 3, obtain a message 4
545 It is safe to discard the output, eg. when the CoAP EDHOC option is
546 used."""
547 if self._incomplete:
548 id_cred_i, ead_3 = self._responder.parse_message_3(message_3)
549 if any(e.is_critical() for e in ead_3):
550 self.log.error("Aborting EDHOC: Critical EAD3 present")
551 raise error.BadRequest
553 try:
554 (cred_i, claims) = self._server_credentials.find_edhoc_by_id_cred_peer(
555 id_cred_i
556 )
557 except KeyError:
558 self.log.error(
559 "Aborting EDHOC: No credentials found for client with id_cred_i=h'%s'",
560 id_cred_i.hex(),
561 )
562 raise error.BadRequest
564 self.authenticated_claims.extend(claims)
566 self._responder.verify_message_3(cred_i)
567 self._message_4 = self._responder.prepare_message_4()
569 self._make_ready(self._responder, self.recipient_id, self.sender_id)
570 self._incomplete = False
572 return self._message_4
575class OwnCredStyle(enum.Enum):
576 """Guidance for how the own credential should be sent in an EDHOC
577 exchange"""
579 ByKeyId = "by-key-id"
580 ByValue = "by-value"
582 def as_lakers(self):
583 """Convert the enum into Lakers' reepresentation of the same concept.
585 The types may eventually be unified, but so far, Lakers doesn't make
586 the distinctions we expect to make yet."""
587 if self == self.ByKeyId:
588 # FIXME: Mismatch to be fixed in lakers -- currently the only way
589 # it allows sending by reference is by Key ID
590 return lakers.CredentialTransfer.ByReference
591 if self == self.ByValue:
592 return lakers.CredentialTransfer.ByValue
593 else:
594 raise RuntimeError("enum variant not covered")