Coverage for src/aiocoap/ 0%
293 statements
« prev ^ index » next v7.6.12, created at 2025-02-12 11:18 +0000
« prev ^ index » next v7.6.12, created at 2025-02-12 11:18 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
3# SPDX-License-Identifier: MIT
5"""Internal module containing types used inside EDHOC security contexts"""
7import abc
8import enum
9import io
10from pathlib import Path
11import random
12from typing import Optional, Dict, Literal
13import os
15import cbor2
16import lakers
18from . import oscore, credentials, error
19from . import Message
20from .numbers import POST
def load_cbor_or_edn(filename: Path):
    """Common heuristic for whether something is CBOR or EDN

    The file is first decoded as binary CBOR; that result is only accepted if
    nothing is left in the file after the first data item. Otherwise the file
    is read as text and converted from CBOR Diagnostic Notation (EDN).

    :param filename: Path of the file to load.
    :returns: The decoded data item.
    :raises credentials.CredentialsLoadError: if the content is recognized
        neither as CBOR nor as EDN.
    """
    import cbor_diag
    import cbor2

    with filename.open("rb") as binary:
        try:
            result = cbor2.load(binary)
        except cbor2.CBORDecodeError:
            # Not CBOR: fall through to the EDN attempt below.
            pass
        else:
            if binary.read(1) == b"":
                return result
            # else it apparently hasn't been CBOR all through...

    # EDN is text; read it as UTF-8 explicitly rather than depending on the
    # locale's default encoding.
    with filename.open(encoding="utf-8") as textual:
        try:
            # Reading happens inside the try block so that undecodable text
            # (UnicodeDecodeError is a ValueError) is reported the same way as
            # unparsable EDN.
            converted = cbor_diag.diag2cbor(textual.read())
        except ValueError as err:
            raise credentials.CredentialsLoadError(
                "Data loaded from %s was recognized neither as CBOR nor CBOR Diagnostic Notation (EDN)"
                % filename
            ) from err
    # no need to check for completeness: diag2cbor doesn't do diagnostic
    # sequences, AIU that's not even a thing
    return cbor2.loads(converted)
class CoseKeyForEdhoc:
    """A private key in COSE_Key shape, restricted to what is accepted here.

    The constructors of this class only produce EC keys (kty 2) on P-256
    (crv 1) with a 32-byte private value ``d``.
    """

    kty: int
    crv: int
    d: bytes

    @classmethod
    def from_file(cls, filename: Path) -> "CoseKeyForEdhoc":
        """Load a key from a file (in CBOR or EDN), asserting that the file is not group/world readable

        :raises credentials.CredentialsLoadError: if any group/world
            permission bit is set on the file, or if the content is not an
            acceptable key.
        """
        if filename.stat().st_mode & 0o077 != 0:
            raise credentials.CredentialsLoadError(
                "Refusing to load private key that is group or world accessible"
            )

        loaded = load_cbor_or_edn(filename)
        return cls.from_map(loaded)

    @classmethod
    def from_map(cls, key: dict) -> "CoseKeyForEdhoc":
        """Build a key from a COSE_Key style map with integer labels.

        Exactly the labels 1 (kty, must be 2), -1 (crv, must be 1) and -4 (d,
        32 bytes) are accepted.

        :raises credentials.CredentialsLoadError: if the map has any other
            shape or content.
        """
        if not isinstance(key, dict):
            raise credentials.CredentialsLoadError(
                "Data is not shaped like COSE_KEY (expected top-level dictionary)"
            )
        if 1 not in key:
            raise credentials.CredentialsLoadError(
                "Data is not shaped like COSE_KEY (expected key 1 (kty) in top-level dictionary)"
            )
        if key[1] != 2:
            raise credentials.CredentialsLoadError(
                "Private key type %s is not supported (currently only 2 (EC) is supported)"
                % (key[1],)
            )

        if key.get(-1) != 1:
            raise credentials.CredentialsLoadError(
                "Private key of type EC requires key -1 (crv), currently supported values: 1 (P-256)"
            )

        if not isinstance(key.get(-4), bytes) or len(key[-4]) != 32:
            raise credentials.CredentialsLoadError(
                "Private key of type EC P-256 requires key -4 (d) to be a 32-byte long byte string"
            )

        if any(k not in (1, -1, -4) for k in key):
            raise credentials.CredentialsLoadError(
                "Extraneous data in key, consider allow-listing the item if acceptable"
            )

        s = cls()
        s.kty = 2  # EC
        s.crv = 1  # P-256
        s.d = key[-4]

        return s

    def secret_to_map(self) -> dict:
        """Return the private key as a map in the form from_map accepts."""
        # kty: EC, crv: P-256, d: ...
        return {1: self.kty, -1: self.crv, -4: self.d}

    # Should we deprecate filename, add a generate_in_file method? (It's there
    # because generate originally depended on a file system)
    @classmethod
    def generate(cls, filename: Optional[Path] = None) -> "CoseKeyForEdhoc":
        """Generate a key, optionally storing it inside a file

        If ``filename`` is given, the key is written there as CBOR. The file
        is created exclusively (it is an error if it already exists) with mode
        0o600, and is removed again if writing fails.

        This returns the generated private key.
        """

        from cryptography.hazmat.primitives.asymmetric import ec

        key = ec.generate_private_key(curve=ec.SECP256R1())

        s = cls()
        s.kty = 2  # EC
        s.crv = 1  # P-256
        s.d = key.private_numbers().private_value.to_bytes(32, "big")

        if filename is not None:
            # O_EXCL: never overwrite an existing key file
            flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL
            if hasattr(os, "O_BINARY"):
                flags |= os.O_BINARY
            descriptor = os.open(filename, flags, mode=0o600)
            try:
                # open() takes ownership of the descriptor and closes it
                with open(descriptor, "wb") as keyfile:
                    cbor2.dump(s.secret_to_map(), keyfile)
            except Exception:
                filename.unlink()
                raise

        return s

    def as_ccs(
        self, kid: Optional[bytes], subject: Optional[str]
    ) -> Dict[Literal[14], dict]:
        """Given a key, generate a corresponding KCCS

        :param kid: Placed under label 2 of the COSE_Key unless None.
        :param subject: Placed under label 2 of the claims unless None.
        :returns: ``{14: claims}`` where the claims hold the public key under
            ``8`` (cnf) / ``1`` (COSE_Key).
        """

        from cryptography.hazmat.primitives.asymmetric import ec

        private = ec.derive_private_key(int.from_bytes(self.d, "big"), ec.SECP256R1())
        numbers = private.public_key().public_numbers()

        x = numbers.x.to_bytes(32, "big")
        y = numbers.y.to_bytes(32, "big")
        # kty: EC2, crv: P-256, x, y
        cosekey = {1: 2, -1: 1, -2: x, -3: y}
        if kid is not None:
            cosekey[2] = kid
        # cnf: COSE_Key
        credential_kccs: dict = {8: {1: cosekey}}
        if subject is not None:
            credential_kccs[2] = subject

        # kccs: cnf
        return {14: credential_kccs}
class EdhocCredentials(credentials._Objectish):
    """Credential set describing how to run EDHOC with a peer.

    It combines the suite and method numbers with the own key and credential
    (if any) and the peer's credential (if any). ``establish_context`` runs
    the initiator side of an exchange based on them.
    """

    own_key: Optional[CoseKeyForEdhoc]
    suite: int
    method: int
    own_cred: Optional[dict]
    peer_cred: Optional[dict]

    #: Whether the combined flow should be used when using this credential set.
    #:
    #: If unset or None, the decision is left to the library (which at the
    #: time of writing always picks True).
    use_combined_edhoc: Optional[bool]

    def __init__(
        self,
        suite: int,
        method: int,
        own_cred_style: Optional[str] = None,
        peer_cred: Optional[dict] = None,
        own_cred: Optional[dict] = None,
        private_key_file: Optional[str] = None,
        private_key: Optional[dict] = None,
        use_combined_edhoc: Optional[bool] = None,
    ):
        """Validate and store the configured credential items.

        ``private_key_file`` and ``private_key`` are mutually exclusive. When
        ``own_cred`` is ``{"unauthenticated": True}``, no key or non-by-value
        style may be given; a key and matching credential are generated on the
        spot. Otherwise own_cred, own_cred_style and a private key have to be
        given either all together or not at all.

        :raises credentials.CredentialsLoadError: if the combination of
            arguments is not acceptable, or the key can not be loaded.
        """
        from . import edhoc

        self.suite = suite
        self.method = method
        self.own_cred = own_cred
        self.peer_cred = peer_cred
        self.use_combined_edhoc = use_combined_edhoc

        if private_key_file is not None and private_key is not None:
            raise credentials.CredentialsLoadError(
                "private_key is mutually exclusive with private_key_file"
            )
        if private_key_file is not None:
            # FIXME: We should carry around a base
            private_key_path = Path(private_key_file)
            # FIXME: We left loading the file to the user, and now we're once more
            # in a position where we guess the file type
            self.own_key = CoseKeyForEdhoc.from_file(private_key_path)
        elif private_key is not None:
            self.own_key = CoseKeyForEdhoc.from_map(private_key)
        else:
            self.own_key = None

        if own_cred_style is None:
            self.own_cred_style = None
        else:
            self.own_cred_style = edhoc.OwnCredStyle(own_cred_style)

        if self.own_cred == {"unauthenticated": True}:
            if self.own_key is not None or self.own_cred_style not in (
                None,
                OwnCredStyle.ByValue,
            ):
                raise credentials.CredentialsLoadError(
                    "For local unauthenticated use, no key can be given, and own_cred_style needs to be by-value or absent"
                )
            self.own_key = CoseKeyForEdhoc.generate()
            # FIXME: kid and subj should rather not be sent, but lakers insists on their presence
            self.own_cred = self.own_key.as_ccs(kid=b"?", subject="")
            self.own_cred_style = OwnCredStyle.ByValue
        else:
            if (own_cred is None) != (own_cred_style is None) or (own_cred is None) != (
                self.own_key is None
            ):
                raise credentials.CredentialsLoadError(
                    "If own credentials are given, all of own_cred, own_cred_style and private_key(_path) need to be given"
                )

        # FIXME: This is only used on the client side, and expects that all parts (own and peer) are present
        self._established_context = None

    def find_edhoc_by_id_cred_peer(self, id_cred_peer):
        """Look up the peer credential for an encoded ID_CRED.

        :param id_cred_peer: The encoded ID_CRED, compared against the
            canonical CBOR encoding of the whole peer credential (by value)
            and of ``{4: kid}`` (by key ID).
        :returns: The canonical CBOR encoding of the peer's CCS (the value
            under label 14) on a match, None otherwise.
        """
        if self.peer_cred is None:
            return None
        if 14 not in self.peer_cred:
            # Only recognizing CCS so far
            return None

        if id_cred_peer == cbor2.dumps(self.peer_cred, canonical=True):
            # credential by value
            return cbor2.dumps(self.peer_cred[14], canonical=True)

        # cnf / COSE_Key / kid, should be present in all CCS
        kid = self.peer_cred[14][8][1].get(2)
        if kid is not None and id_cred_peer == cbor2.dumps({4: kid}, canonical=True):
            # credential by kid
            return cbor2.dumps(self.peer_cred[14], canonical=True)

        return None

    def peer_cred_is_unauthenticated(self):
        """Tell whether the peer credential is the "unauthenticated" marker."""
        # FIXME: This is rather weird internal API, and rather weird
        # format-wise -- but it will suffice until credentials are rewritten.
        return self.peer_cred is not None and self.peer_cred == {
            "unauthenticated": True
        }

    async def establish_context(
        self,
        wire,
        underlying_address,
        underlying_proxy_scheme,
        underlying_uri_host,
        logger,
    ):
        """Run the initiator side of EDHOC and return the resulting context.

        Message 1 is POSTed to ``/.well-known/edhoc`` through ``wire``, and
        the message 2 in the response is verified. Unless
        ``use_combined_edhoc`` is False, the context is returned right away,
        leaving message 3 to be sent by the context along with the first
        request. Otherwise message 3 is sent in a request of its own, and the
        response is processed as message 4 before returning.

        :returns: An EdhocInitiatorContext.
        :raises credentials.CredentialsMissingError: if the peer is
            configured as unauthenticated but did not present a KCCS by
            value.
        """
        logger.info(
            "No OSCORE context found for EDHOC context %r, initiating one.", self
        )
        # The semantic identifier (an arbitrary string)
        #
        # FIXME: We don't support role reversal yet, but once we
        # register this context to be available for incoming
        # requests, we'll have to pick more carefully
        c_i = bytes([random.randint(0, 23)])
        initiator = lakers.EdhocInitiator()
        message_1 = initiator.prepare_message_1(c_i)

        msg1 = Message(
            code=POST,
            proxy_scheme=underlying_proxy_scheme,
            uri_host=underlying_uri_host,
            uri_path=[".well-known", "edhoc"],
            payload=cbor2.dumps(True) + message_1,
        )
        msg1.remote = underlying_address
        msg2 = await wire.request(msg1).response_raising

        (c_r, id_cred_r, ead_2) = initiator.parse_message_2(msg2.payload)

        assert isinstance(self.own_cred, dict) and list(self.own_cred.keys()) == [14], (
            "So far can only process CCS style own credentials a la {14: ...}, own_cred = %r"
            % self.own_cred
        )
        cred_i = cbor2.dumps(self.own_cred[14], canonical=True)
        key_i = self.own_key.d

        logger.debug("EDHOC responder sent message_2 with ID_CRED_R = %r", id_cred_r)
        if self.peer_cred_is_unauthenticated():
            # Not doing further checks (eg. for trailing bytes) or re-raising: This
            # was already checked by lakers
            parsed = cbor2.loads(id_cred_r)

            # The isinstance check keeps a non-map ID_CRED_R from failing the
            # membership test with a TypeError instead of the intended error.
            if not isinstance(parsed, dict) or 14 not in parsed:
                raise credentials.CredentialsMissingError(
                    "Peer presented credential-by-reference (or anything else that's not a KCCS) when no credential was pre-agreed"
                )

            # We could also pick the [14] out of the serialized stream and
            # treat it as an opaque item, but cbor2 doesn't really do serialized items.
            cred_r = cbor2.dumps(parsed[14], canonical=True)
        else:
            # We could look into id_cred_r, which is a CBOR encoded
            # byte string, and could start comparing ... but actually
            # EDHOC and Lakers protect us from misbinding attacks (is
            # that what they are called?), so we can just put in our
            # expected credential here
            #
            # FIXME: But looking into it might give us a better error than just
            # "Mac2 verification failed"

            # FIXME add assert on the structure or start doing the
            # generalization that'll fail at startup
            cred_r = cbor2.dumps(self.peer_cred[14], canonical=True)

        initiator.verify_message_2(
            key_i,
            cred_i,
            cred_r,
        )  # odd that we provide that here rather than in the next function

        logger.debug("Message 2 was verified")

        secctx = EdhocInitiatorContext(initiator, c_i, c_r, self.own_cred_style, logger)

        if self.use_combined_edhoc is not False:
            secctx.complete_without_message_4()
            # That's enough: Message 3 can well be sent along with the next
            # message.
            return secctx

        logger.debug("Sending explicit message 3 without optimization")

        # Take message 3 out of the context so that it is not sent a second
        # time along with the first protected request.
        message_3 = secctx._message_3
        assert message_3 is not None
        secctx._message_3 = None

        # A single byte in the range of a one-byte CBOR integer is sent as
        # is; everything else is sent as a CBOR byte string.
        if len(c_r) == 1 and (0 <= c_r[0] < 24 or 0x20 <= c_r[0] < 0x38):
            c_r_encoded = c_r
        else:
            c_r_encoded = cbor2.dumps(c_r)
        msg3 = Message(
            code=POST,
            proxy_scheme=underlying_proxy_scheme,
            uri_host=underlying_uri_host,
            uri_path=[".well-known", "edhoc"],
            payload=c_r_encoded + message_3,
        )
        msg3.remote = underlying_address
        msg4 = await wire.request(msg3).response_raising

        secctx.complete_with_message_4(msg4.payload)

        logger.debug("Received message 4, context is ready")

        return secctx
class _EdhocContextBase(
    oscore.CanProtect, oscore.CanUnprotect, oscore.SecurityContextUtils
):
    """Common parts of OSCORE contexts whose keys are exported from EDHOC.

    Subclasses call ``_make_ready`` once their EDHOC exchange has progressed
    far enough, and implement ``message_3_to_include``.
    """

    def __init__(self, logger):
        self.log = logger

    def post_seqnoincrease(self):
        # The context is not persisted
        pass

    def protect(self, message, request_id=None, *, kid_context=True):
        """Protect a message, prepending a pending EDHOC message 3 if any.

        When ``message_3_to_include`` returns something, the EDHOC option is
        set on the outer message and message 3 is placed in front of its
        payload.
        """
        outer_message, request_id = super().protect(
            message, request_id=request_id, kid_context=kid_context
        )
        message_3 = self.message_3_to_include()
        if message_3 is not None:
            outer_message.opt.edhoc = True
            outer_message.payload = message_3 + outer_message.payload
        return outer_message, request_id

    def _make_ready(self, edhoc_context, c_ours, c_theirs):
        """Set up algorithms, IDs and keys from the EDHOC exporter.

        :param edhoc_context: The lakers initiator or responder to export
            from.
        :param c_ours: Own connection identifier; becomes the recipient ID.
        :param c_theirs: Peer's connection identifier; becomes the sender ID.
        :raises RuntimeError: if the selected cipher suite is not 2.
        :raises ValueError: if both identifiers are identical.
        """
        # FIXME: both should offer this
        if (
            isinstance(edhoc_context, lakers.EdhocResponder)
            or edhoc_context.selected_cipher_suite() == 2
        ):
            self.alg_aead = oscore.algorithms["AES-CCM-16-64-128"]
            self.hashfun = oscore.hashfunctions["sha256"]
        else:
            raise RuntimeError("Unknown suite")

        # we did check for critical EADs, there was no out-of-band agreement, so 8 it is
        oscore_salt_length = 8
        # I figure that one would be agreed out-of-band as well (currently no
        # options to set/change this are known)
        self.id_context = None
        self.recipient_replay_window = oscore.ReplayWindow(32, lambda: None)

        master_secret = edhoc_context.edhoc_exporter(0, [], self.alg_aead.key_bytes)
        master_salt = edhoc_context.edhoc_exporter(1, [], oscore_salt_length)

        self.sender_id = c_theirs
        self.recipient_id = c_ours
        if self.sender_id == self.recipient_id:
            raise ValueError("Bad IDs: identical ones were picked")

        self.derive_keys(master_salt, master_secret)

        self.sender_sequence_number = 0
        self.recipient_replay_window.initialize_empty()

        self.log.debug("EDHOC context %r ready for OSCORE operation", self)

    @abc.abstractmethod
    def message_3_to_include(self) -> Optional[bytes]:
        """An encoded message_3 to include in outgoing messages

        This may modify self to only return something once."""
class EdhocInitiatorContext(_EdhocContextBase):
    """An OSCORE context that is derived from an EDHOC exchange.

    It does not require that the EDHOC exchange has completed -- it can be set
    up by an initiator already when message 2 has been received, prepares a
    message 3 at setup time, and sends it with the first request that is sent
    through it."""

    # FIXME: Should we rather send it with *every* request that is sent before a message 4 is received implicitly?
    def __init__(self, initiator, c_ours, c_theirs, cred_i_mode, logger):
        super().__init__(logger)

        # Only this line is role specific
        self._message_3, _i_prk_out = initiator.prepare_message_3(
            cred_i_mode.as_lakers(), None
        )

        self._incomplete = True
        # Kept until one of the complete_* methods has succeeded
        self._init_details = (initiator, c_ours, c_theirs)

    def _pending_details(self):
        """Return the stored (initiator, c_ours, c_theirs) tuple.

        :raises RuntimeError: if the context was completed before.
        """
        if self._init_details is None:
            raise RuntimeError("EDHOC initiator context was already completed")
        return self._init_details

    def _finish(self, initiator, c_ours, c_theirs) -> None:
        """Derive the OSCORE material and mark the context as complete."""
        self._make_ready(initiator, c_ours, c_theirs)
        self._incomplete = False
        self._init_details = None

    def complete_without_message_4(self) -> None:
        """Complete the exchange without waiting for a message 4.

        :raises RuntimeError: if the context was completed before.
        """
        (initiator, c_ours, c_theirs) = self._pending_details()
        initiator.completed_without_message_4()
        self._finish(initiator, c_ours, c_theirs)

    def complete_with_message_4(self, message_4: bytes) -> None:
        """Complete the exchange by processing the received message 4.

        :raises RuntimeError: if the context was completed before.
        """
        (initiator, c_ours, c_theirs) = self._pending_details()
        initiator.process_message_4(message_4)
        self._finish(initiator, c_ours, c_theirs)

    def message_3_to_include(self) -> Optional[bytes]:
        """Return the prepared message 3 once, and None on any later call."""
        result, self._message_3 = self._message_3, None
        return result
class EdhocResponderContext(_EdhocContextBase):
    """OSCORE context on the responder side of an EDHOC exchange.

    It is created before message 3 has been processed; until then it is
    incomplete, refuses to protect messages, and expects message 3 to arrive
    in front of the payload of the first message to unprotect.
    """

    def __init__(self, responder, c_i, c_r, server_credentials, logger):
        super().__init__(logger)

        # storing them where they will later be overwritten with themselves
        self.recipient_id = c_r
        self.sender_id = c_i

        self._responder = responder
        # Through these we'll look up id_cred_i
        self._server_credentials = server_credentials

        self.authenticated_claims = []

        # Not sure why mypy even tolerates this -- we're clearly not ready for
        # a general protect/unprotect, and things only work because all
        # relevant functions get their checks introduced
        self._incomplete = True

        self._message_4 = None

    def message_3_to_include(self) -> Optional[bytes]:
        # as a responder we never send one
        return None

    def get_oscore_context_for(self, unprotected):
        """Return self if the unprotected header addresses this context.

        That is the case when no KID context is present and the KID equals
        the recipient ID; otherwise None is returned.
        """
        if oscore.COSE_KID_CONTEXT in unprotected:
            return None
        if unprotected.get(oscore.COSE_KID) == self.recipient_id:
            return self
        return None

    def find_all_used_contextless_oscore_kid(self) -> set[bytes]:
        return {self.recipient_id}

    def protect(self, *args, **kwargs):
        """Protect a message; only possible once message 3 was processed.

        :raises RuntimeError: while the exchange is incomplete.
        """
        if self._incomplete:
            raise RuntimeError(
                "EDHOC has not completed yet, waiting for message 3, can not protect own messages yet"
            )
        return super().protect(*args, **kwargs)

    def unprotect(self, protected_message, request_id=None):
        """Unprotect a message, first processing a leading message 3 if needed.

        While the exchange is incomplete, the message needs to carry the EDHOC
        option; the first CBOR item of its payload is then taken as message 3,
        and the remainder is unprotected as usual.

        :raises error.BadRequest: if the exchange is incomplete and the
            message carries no EDHOC option, or its payload does not start
            with a decodable CBOR item.
        """
        if self._incomplete:
            if not protected_message.opt.edhoc:
                self.log.error(
                    "OSCORE failed: No EDHOC message 3 received and none present"
                )
                raise error.BadRequest("EDHOC incomplete")

            payload_stream = io.BytesIO(protected_message.payload)
            try:
                # discarding result -- just need to have a point to split
                _ = cbor2.load(payload_stream)
            except cbor2.CBORDecodeError:
                # The payload is supplied by the peer; report malformed data
                # as a request error instead of letting the decoder's
                # exception escape.
                self.log.error("OSCORE failed: EDHOC message 3 is not decodable")
                raise error.BadRequest("EDHOC message 3 malformed")
            m3len = payload_stream.tell()
            message_3 = protected_message.payload[:m3len]

            self._offer_message_3(message_3)

            protected_message = protected_message.copy(
                edhoc=False, payload=protected_message.payload[m3len:]
            )

        return super().unprotect(protected_message, request_id)

    def _offer_message_3(self, message_3: bytes) -> bytes:
        """Send in a message 3, obtain a message 4

        It is safe to discard the output, eg. when the CoAP EDHOC option is
        used.

        If the exchange was completed before, the message is not processed
        again, and the previously prepared message 4 is returned.

        :raises error.BadRequest: if EAD3 is present or no credentials are
            found for the peer's ID_CRED_I.
        """
        if self._incomplete:
            id_cred_i, ead_3 = self._responder.parse_message_3(message_3)
            if ead_3 is not None:
                self.log.error("Aborting EDHOC: EAD3 present")
                raise error.BadRequest

            try:
                (cred_i, claims) = self._server_credentials.find_edhoc_by_id_cred_peer(
                    id_cred_i
                )
            except KeyError:
                self.log.error(
                    "Aborting EDHOC: No credentials found for client with id_cred_i=h'%s'",
                    id_cred_i.hex(),
                )
                raise error.BadRequest

            self.authenticated_claims.extend(claims)

            self._responder.verify_message_3(cred_i)
            self._message_4 = self._responder.prepare_message_4()

            self._make_ready(self._responder, self.recipient_id, self.sender_id)
            self._incomplete = False

        return self._message_4
class OwnCredStyle(enum.Enum):
    """Guidance for how the own credential should be sent in an EDHOC
    exchange"""

    ByKeyId = "by-key-id"
    ByValue = "by-value"

    def as_lakers(self):
        """Convert the enum into Lakers' representation of the same concept.

        The types may eventually be unified, but so far, Lakers doesn't make
        the distinctions we expect to make yet.

        :raises RuntimeError: if a variant has no conversion defined here.
        """
        # Enum members are singletons, so they are compared by identity
        # against the class's members.
        if self is OwnCredStyle.ByKeyId:
            # FIXME: Mismatch to be fixed in lakers -- currently the only way
            # it allows sending by reference is by Key ID
            return lakers.CredentialTransfer.ByReference
        if self is OwnCredStyle.ByValue:
            return lakers.CredentialTransfer.ByValue
        raise RuntimeError("enum variant not covered")