Coverage for aiocoap/edhoc.py: 83%
258 statements
« prev ^ index » next coverage.py v7.6.8, created at 2024-11-28 12:34 +0000
« prev ^ index » next coverage.py v7.6.8, created at 2024-11-28 12:34 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5"""Internal module containing types used inside EDHOC security contexts"""
7import abc
8import enum
9import io
10from pathlib import Path
11import random
12from typing import Optional, Dict, Literal
13import os
15import cbor2
16import lakers
18from . import oscore, credentials, error
19from . import Message
20from .numbers import POST
23def load_cbor_or_edn(filename: Path):
24 """Common heuristic for whether something is CBOR or EDN"""
25 import cbor_diag
26 import cbor2
28 with filename.open("rb") as binary:
29 try:
30 result = cbor2.load(binary)
31 except cbor2.CBORDecodeError:
32 pass
33 else:
34 if binary.read(1) == b"":
35 return result
36 # else it apparently hasn't been CBOR all through...
37 with filename.open() as textual:
38 try:
39 converted = cbor_diag.diag2cbor(textual.read())
40 except ValueError:
41 raise credentials.CredentialsLoadError(
42 "Data loaded from %s was recognized neither as CBOR nor CBOR Diagnostic Notation (EDN)"
43 % filename
44 )
45 # no need to check for completeness: diag2cbor doesn't do diagnostic
46 # sequences, AIU that's not even a thing
47 return cbor2.loads(converted)
50class CoseKeyForEdhoc:
51 kty: int
52 crv: int
53 d: bytes
55 @classmethod
56 def from_file(cls, filename: Path) -> "CoseKeyForEdhoc":
57 """Load a key from a file (in CBOR or EDN), asserting that the file is not group/world readable"""
58 if filename.stat().st_mode & 0o077 != 0:
59 raise credentials.CredentialsLoadError(
60 "Refusing to load private key that is group or world accessible"
61 )
63 loaded = load_cbor_or_edn(filename)
64 return cls.from_map(loaded)
66 @classmethod
67 def from_map(cls, key: dict) -> "CoseKeyForEdhoc":
68 if not isinstance(key, dict):
69 raise credentials.CredentialsLoadError(
70 "Data is not shaped like COSE_KEY (expected top-level dictionary)"
71 )
72 if 1 not in key:
73 raise credentials.CredentialsLoadError(
74 "Data is not shaped like COSE_KEY (expected key 1 (kty) in top-level dictionary)"
75 )
76 if key[1] != 2:
77 raise credentials.CredentialsLoadError(
78 "Private key type %s is not supported (currently only 2 (EC) is supported)"
79 % (key[1],)
80 )
82 if key.get(-1) != 1:
83 raise credentials.CredentialsLoadError(
84 "Private key of type EC requires key -1 (crv), currently supported values: 1 (P-256)"
85 )
87 if not isinstance(key.get(-4), bytes) or len(key[-4]) != 32:
88 raise credentials.CredentialsLoadError(
89 "Private key of type EC P-256 requires key -4 (d) to be a 32-byte long byte string"
90 )
92 if any(k not in (1, -1, -4) for k in key):
93 raise credentials.CredentialsLoadError(
94 "Extraneous data in key, consider allow-listing the item if acceptable"
95 )
97 s = cls()
98 s.kty = 2 # EC
99 s.crv = 1 # P-256
100 s.d = key[-4]
102 return s
104 def secret_to_map(self) -> dict:
105 # kty: EC, crv: P-256, d: ...
106 return {1: self.kty, -1: self.crv, -4: self.d}
108 # Should we deprecate filename, add a generate_in_file method? (It's there
109 # because generate originally depended on a file system)
110 @classmethod
111 def generate(cls, filename: Optional[Path] = None) -> "CoseKeyForEdhoc":
112 """Generate a key inside a file
114 This returns the generated private key.
115 """
117 from cryptography.hazmat.primitives.asymmetric import ec
119 key = ec.generate_private_key(curve=ec.SECP256R1())
121 s = cls()
122 s.kty = 2 # EC
123 s.crv = 1 # P-256
124 s.d = key.private_numbers().private_value.to_bytes(32, "big")
126 if filename is not None:
127 flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL
128 if hasattr(os, "O_BINARY"):
129 flags |= os.O_BINARY
130 descriptor = os.open(filename, flags, mode=0o600)
131 try:
132 with open(descriptor, "wb") as keyfile:
133 cbor2.dump(s.secret_to_map(), keyfile)
134 except Exception:
135 filename.unlink()
136 raise
138 return s
140 def as_ccs(
141 self, kid: Optional[bytes], subject: Optional[str]
142 ) -> Dict[Literal[14], dict]:
143 """Given a key, generate a corresponding KCCS"""
145 from cryptography.hazmat.primitives.asymmetric import ec
147 private = ec.derive_private_key(int.from_bytes(self.d, "big"), ec.SECP256R1())
148 public = private.public_key()
150 x = public.public_numbers().x.to_bytes(32, "big")
151 y = public.public_numbers().y.to_bytes(32, "big")
152 # kty: EC2, crv: P-256, x, y
153 cosekey = {1: 2, -1: 1, -2: x, -3: y}
154 if kid is not None:
155 cosekey[2] = kid
156 # cnf: COSE_Key
157 credential_kccs: dict = {8: {1: cosekey}}
158 if subject is not None:
159 credential_kccs[2] = subject
161 # kccs: cnf
162 return {14: credential_kccs}
165class EdhocCredentials(credentials._Objectish):
166 own_key: Optional[CoseKeyForEdhoc]
167 suite: int
168 method: int
169 own_cred: Optional[dict]
170 peer_cred: Optional[dict]
172 def __init__(
173 self,
174 suite: int,
175 method: int,
176 own_cred_style: Optional[str] = None,
177 peer_cred: Optional[dict] = None,
178 own_cred: Optional[dict] = None,
179 private_key_file: Optional[str] = None,
180 private_key: Optional[dict] = None,
181 ):
182 from . import edhoc
184 self.suite = suite
185 self.method = method
186 self.own_cred = own_cred
187 self.peer_cred = peer_cred
189 if private_key_file is not None and private_key is not None:
190 raise credentials.CredentialsLoadError(
191 "private_key is mutually exclusive with private_key_file"
192 )
193 if private_key_file is not None:
194 # FIXME: We should carry around a base
195 private_key_path = Path(private_key_file)
196 # FIXME: We left loading the file to the user, and now we're once more
197 # in a position where we guess the file type
198 self.own_key = CoseKeyForEdhoc.from_file(private_key_path)
199 elif private_key is not None:
200 self.own_key = CoseKeyForEdhoc.from_map(private_key)
201 else:
202 self.own_key = None
204 if own_cred_style is None:
205 self.own_cred_style = None
206 else:
207 self.own_cred_style = edhoc.OwnCredStyle(own_cred_style)
209 if self.own_cred == {"unauthenticated": True}:
210 if self.own_key is not None or self.own_cred_style not in (
211 None,
212 OwnCredStyle.ByValue,
213 ):
214 raise credentials.CredentialsLoadError(
215 "For local unauthenticated use, no key can be give, and own_cred_style needs to be by-value or absent"
216 )
217 self.own_key = CoseKeyForEdhoc.generate()
218 # FIXME: kid and subj should rather not be sent, but lakers insists on their presence
219 self.own_cred = self.own_key.as_ccs(kid=b"?", subject="")
220 self.own_cred_style = OwnCredStyle.ByValue
221 else:
222 if (own_cred is None) != (own_cred_style is None) or (own_cred is None) != (
223 self.own_key is None
224 ):
225 raise credentials.CredentialsLoadError(
226 "If own credentials are given, all of own_cred, own_cred_style and private_key(_path) need to be given"
227 )
229 # FIXME: This is only used on the client side, and expects that all parts (own and peer) are present
230 self._established_context = None
232 def find_edhoc_by_id_cred_peer(self, id_cred_peer):
233 if self.peer_cred is None:
234 return None
235 if 14 not in self.peer_cred:
236 # Only recognizing CCS so far
237 return None
239 if id_cred_peer == cbor2.dumps(self.peer_cred, canonical=True):
240 # credential by value
241 return cbor2.dumps(self.peer_cred[14], canonical=True)
243 # cnf / COS_Key / kid, should be present in all CCS
244 kid = self.peer_cred[14][8][1].get(2)
245 if kid is not None and id_cred_peer == cbor2.dumps({4: kid}, canonical=True):
246 # credential by kid
247 return cbor2.dumps(self.peer_cred[14], canonical=True)
249 def peer_cred_is_unauthenticated(self):
250 # FIXME: This is rather weird internal API, and rather weird
251 # format-wise -- but it will suffice until credentials are rewritten.
252 return self.peer_cred is not None and self.peer_cred == {
253 "unauthenticated": True
254 }
256 async def establish_context(
257 self,
258 wire,
259 underlying_address,
260 underlying_proxy_scheme,
261 underlying_uri_host,
262 logger,
263 ):
264 logger.info(
265 "No OSCORE context found for EDHOC context %r, initiating one.", self
266 )
267 # FIXME: We don't support role reversal yet, but once we
268 # register this context to be available for incoming
269 # requests, we'll have to pick more carefully
270 c_i = bytes([random.randint(0, 23)])
271 initiator = lakers.EdhocInitiator()
272 message_1 = initiator.prepare_message_1(c_i)
274 msg1 = Message(
275 code=POST,
276 proxy_scheme=underlying_proxy_scheme,
277 uri_host=underlying_uri_host,
278 uri_path=[".well-known", "edhoc"],
279 payload=cbor2.dumps(True) + message_1,
280 )
281 msg1.remote = underlying_address
282 msg2 = await wire.request(msg1).response_raising
284 (c_r, id_cred_r, ead_2) = initiator.parse_message_2(msg2.payload)
286 assert isinstance(self.own_cred, dict) and list(self.own_cred.keys()) == [14], (
287 "So far can only process CCS style own credentials a la {14: ...}, own_cred = %r"
288 % self.own_cred
289 )
290 cred_i = cbor2.dumps(self.own_cred[14], canonical=True)
291 key_i = self.own_key.d
293 logger.debug("EDHOC responder sent message_2 with ID_CRED_R = %r", id_cred_r)
294 if self.peer_cred_is_unauthenticated():
295 # Not doing further checks (eg. for trailing bytes) or re-raising: This
296 # was already checked by lakers
297 parsed = cbor2.loads(id_cred_r)
299 if 14 not in parsed:
300 raise credentials.CredentialsMissingError(
301 "Peer presented credential-by-reference (or anything else that's not a KCCS) when no credential was pre-agreed"
302 )
304 # We could also pick the [14] out of the serialized stream and
305 # treat it as an opaque item, but cbor2 doesn't really do serialized items.
306 cred_r = cbor2.dumps(parsed[14], canonical=True)
307 else:
308 # We could look into id_cred_r, which is a CBOR encoded
309 # byte string, and could start comparing ... but actually
310 # EDHOC and Lakers protect us from misbinding attacks (is
311 # that what they are called?), so we can just put in our
312 # expected credential here
313 #
314 # FIXME: But looking into it might give us a better error than just
315 # "Mac2 verification failed"
317 # FIXME add assert on the structure or start doing the
318 # generalization that'll fail at startup
319 cred_r = cbor2.dumps(self.peer_cred[14], canonical=True)
321 initiator.verify_message_2(
322 key_i,
323 cred_i,
324 cred_r,
325 ) # odd that we provide that here rather than in the next function
327 logger.debug("Message 2 was verified")
329 return EdhocInitiatorContext(initiator, c_i, c_r, self.own_cred_style, logger)
332class _EdhocContextBase(
333 oscore.CanProtect, oscore.CanUnprotect, oscore.SecurityContextUtils
334):
335 def __init__(self, logger):
336 self.log = logger
338 def post_seqnoincrease(self):
339 # The context is not persisted
340 pass
342 def protect(self, message, request_id=None, *, kid_context=True):
343 outer_message, request_id = super().protect(
344 message, request_id=request_id, kid_context=kid_context
345 )
346 message_3 = self.message_3_to_include()
347 if message_3 is not None:
348 outer_message.opt.edhoc = True
349 outer_message.payload = message_3 + outer_message.payload
350 return outer_message, request_id
352 def _make_ready(self, edhoc_context, c_ours, c_theirs):
353 # only in lakers >= 0.5 that is present
354 if hasattr(edhoc_context, "completed_without_message_4"):
355 edhoc_context.completed_without_message_4()
357 # FIXME: both should offer this
358 if (
359 isinstance(edhoc_context, lakers.EdhocResponder)
360 or edhoc_context.selected_cipher_suite() == 2
361 ):
362 self.alg_aead = oscore.algorithms["AES-CCM-16-64-128"]
363 self.hashfun = oscore.hashfunctions["sha256"]
364 else:
365 raise RuntimeError("Unknown suite")
367 # we did check for critical EADs, there was no out-of-band agreement, so 8 it is
368 oscore_salt_length = 8
369 # I figure that one would be ageed out-of-band as well (currently no
370 # options to set/change this are known)
371 self.id_context = None
372 self.recipient_replay_window = oscore.ReplayWindow(32, lambda: None)
374 master_secret = edhoc_context.edhoc_exporter(0, [], self.alg_aead.key_bytes)
375 master_salt = edhoc_context.edhoc_exporter(1, [], oscore_salt_length)
377 self.sender_id = c_theirs
378 self.recipient_id = c_ours
379 if self.sender_id == self.recipient_id:
380 raise ValueError("Bad IDs: identical ones were picked")
382 self.derive_keys(master_salt, master_secret)
384 self.sender_sequence_number = 0
385 self.recipient_replay_window.initialize_empty()
387 self.log.debug("EDHOC context %r ready for OSCORE operation", self)
389 @abc.abstractmethod
390 def message_3_to_include(self) -> Optional[bytes]:
391 """An encoded message_3 to include in outgoing messages
393 This may modify self to only return something once."""
396class EdhocInitiatorContext(_EdhocContextBase):
397 """An OSCORE context that is derived from an EDHOC exchange.
399 It does not require that the EDHOC exchange has completed -- it can be set
400 up by an initiator already when message 2 has been received, prepares a
401 message 3 at setup time, and sends it with the first request that is sent
402 through it."""
404 # FIXME: Should we rather send it with *every* request that is sent before a message 4 is received implicitly?
405 def __init__(self, initiator, c_ours, c_theirs, cred_i_mode, logger):
406 super().__init__(logger)
408 # Only this line is role specific
409 self._message_3, _i_prk_out = initiator.prepare_message_3(
410 cred_i_mode.as_lakers(), None
411 )
413 self._make_ready(initiator, c_ours, c_theirs)
415 def message_3_to_include(self) -> Optional[bytes]:
416 if self._message_3 is not None:
417 result = self._message_3
418 self._message_3 = None
419 return result
420 return None
423class EdhocResponderContext(_EdhocContextBase):
424 def __init__(self, responder, c_i, c_r, server_credentials, logger):
425 super().__init__(logger)
427 # storing them where they will later be overwritten with themselves
428 self.recipient_id = c_r
429 self.sender_id = c_i
431 self._responder = responder
432 # Through these we'll look up id_cred_i
433 self._server_credentials = server_credentials
435 self.authenticated_claims = []
437 # Not sure why mypy even tolerates this -- we're clearly not ready for
438 # a general protect/unprotect, and things only work because all
439 # relevant functions get their checks introduced
440 self._incomplete = True
442 def message_3_to_include(self) -> Optional[bytes]:
443 # as a responder we never send one
444 return None
446 def get_oscore_context_for(self, unprotected):
447 if oscore.COSE_KID_CONTEXT in unprotected:
448 return None
449 if unprotected.get(oscore.COSE_KID) == self.recipient_id:
450 return self
452 def find_all_used_contextless_oscore_kid(self) -> set[bytes]:
453 return set((self.recipient_id,))
455 def protect(self, *args, **kwargs):
456 if self._incomplete:
457 raise RuntimeError(
458 "EDHOC has not completed yet, waiting for message 3, can not protect own messages yet"
459 )
460 return super().protect(*args, **kwargs)
462 def unprotect(self, protected_message, request_id=None):
463 if self._incomplete:
464 if not protected_message.opt.edhoc:
465 self.log.error(
466 "OSCORE failed: No EDHOC message 3 received and none present"
467 )
468 raise error.BadRequest("EDHOC incomplete")
470 payload_stream = io.BytesIO(protected_message.payload)
471 # discarding result -- just need to have a point to split
472 _ = cbor2.load(payload_stream)
473 m3len = payload_stream.tell()
474 message_3 = protected_message.payload[:m3len]
476 id_cred_i, ead_3 = self._responder.parse_message_3(message_3)
477 if ead_3 is not None:
478 self.log.error("Aborting EDHOC: EAD3 present")
479 raise error.BadRequest
481 try:
482 (cred_i, claims) = self._server_credentials.find_edhoc_by_id_cred_peer(
483 id_cred_i
484 )
485 except KeyError:
486 self.log.error(
487 "Aborting EDHOC: No credentials found for client with id_cred_i=h'%s'",
488 id_cred_i.hex(),
489 )
490 raise error.BadRequest
492 self.authenticated_claims.extend(claims)
494 self._responder.verify_message_3(cred_i)
496 self._make_ready(self._responder, self.recipient_id, self.sender_id)
497 self._incomplete = False
499 protected_message = protected_message.copy(
500 edhoc=False, payload=protected_message.payload[m3len:]
501 )
503 return super().unprotect(protected_message, request_id)
506class OwnCredStyle(enum.Enum):
507 """Guidance for how the own credential should be sent in an EDHOC
508 exchange"""
510 ByKeyId = "by-key-id"
511 ByValue = "by-value"
513 def as_lakers(self):
514 """Convert the enum into Lakers' reepresentation of the same concept.
516 The types may eventually be unified, but so far, Lakers doesn't make
517 the distinctions we expect to make yet."""
518 if self == self.ByKeyId:
519 # FIXME: Mismatch to be fixed in lakers -- currently the only way
520 # it allows sending by reference is by Key ID
521 return lakers.CredentialTransfer.ByReference
522 if self == self.ByValue:
523 return lakers.CredentialTransfer.ByValue
524 else:
525 raise RuntimeError("enum variant not covered")