Coverage for aiocoap/edhoc.py: 83%
250 statements
« prev ^ index » next coverage.py v7.6.3, created at 2024-10-15 22:10 +0000
« prev ^ index » next coverage.py v7.6.3, created at 2024-10-15 22:10 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5"""Internal module containing types used inside EDHOC security contexts"""
7import abc
8import enum
9import io
10from pathlib import Path
11import random
12from typing import Optional, Dict, Literal
13import os
15import cbor2
16import lakers
18from . import oscore, credentials, error
19from . import Message
20from .numbers import POST
23def load_cbor_or_edn(filename: Path):
24 """Common heuristic for whether something is CBOR or EDN"""
25 import cbor_diag
26 import cbor2
28 with filename.open("rb") as binary:
29 try:
30 result = cbor2.load(binary)
31 except cbor2.CBORDecodeError:
32 pass
33 else:
34 if binary.read(1) == b"":
35 return result
36 # else it apparently hasn't been CBOR all through...
37 with filename.open() as textual:
38 try:
39 converted = cbor_diag.diag2cbor(textual.read())
40 except ValueError:
41 raise credentials.CredentialsLoadError(
42 "Data loaded from %s was recognized neither as CBOR nor CBOR Diagnostic Notation (EDN)"
43 % filename
44 )
45 # no need to check for completeness: diag2cbor doesn't do diagnostic
46 # sequences, AIU that's not even a thing
47 return cbor2.loads(converted)
50class CoseKeyForEdhoc:
51 kty: int
52 crv: int
53 d: bytes
55 @classmethod
56 def from_file(cls, filename: Path) -> "CoseKeyForEdhoc":
57 """Load a key from a file (in CBOR or EDN), asserting that the file is not group/world readable"""
58 if filename.stat().st_mode & 0o077 != 0:
59 raise credentials.CredentialsLoadError(
60 "Refusing to load private key that is group or world accessible"
61 )
63 loaded = load_cbor_or_edn(filename)
64 return cls.from_map(loaded)
66 @classmethod
67 def from_map(cls, key: dict) -> "CoseKeyForEdhoc":
68 if not isinstance(key, dict):
69 raise credentials.CredentialsLoadError(
70 "Data is not shaped like COSE_KEY (expected top-level dictionary)"
71 )
72 if 1 not in key:
73 raise credentials.CredentialsLoadError(
74 "Data is not shaped like COSE_KEY (expected key 1 (kty) in top-level dictionary)"
75 )
76 if key[1] != 2:
77 raise credentials.CredentialsLoadError(
78 "Private key type %s is not supported (currently only 2 (EC) is supported)"
79 % (key[1],)
80 )
82 if key.get(-1) != 1:
83 raise credentials.CredentialsLoadError(
84 "Private key of type EC requires key -1 (crv), currently supported values: 1 (P-256)"
85 )
87 if not isinstance(key.get(-4), bytes) or len(key[-4]) != 32:
88 raise credentials.CredentialsLoadError(
89 "Private key of type EC P-256 requires key -4 (d) to be a 32-byte long byte string"
90 )
92 if any(k not in (1, -1, -4) for k in key):
93 raise credentials.CredentialsLoadError(
94 "Extraneous data in key, consider allow-listing the item if acceptable"
95 )
97 s = cls()
98 s.kty = 2 # EC
99 s.crv = 1 # P-256
100 s.d = key[-4]
102 return s
104 def secret_to_map(self) -> dict:
105 # kty: EC, crv: P-256, d: ...
106 return {1: self.kty, -1: self.crv, -4: self.d}
108 # Should we deprecate filename, add a generate_in_file method? (It's there
109 # because generate originally depended on a file system)
110 @classmethod
111 def generate(cls, filename: Optional[Path] = None) -> "CoseKeyForEdhoc":
112 """Generate a key inside a file
114 This returns the generated private key.
115 """
117 from cryptography.hazmat.primitives.asymmetric import ec
119 key = ec.generate_private_key(curve=ec.SECP256R1())
121 s = cls()
122 s.kty = 2 # EC
123 s.crv = 1 # P-256
124 s.d = key.private_numbers().private_value.to_bytes(32, "big")
126 if filename is not None:
127 flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL
128 if hasattr(os, "O_BINARY"):
129 flags |= os.O_BINARY
130 descriptor = os.open(filename, flags, mode=0o600)
131 try:
132 with open(descriptor, "wb") as keyfile:
133 cbor2.dump(s.secret_to_map(), keyfile)
134 except Exception:
135 filename.unlink()
136 raise
138 return s
140 def as_ccs(
141 self, kid: Optional[bytes], subject: Optional[str]
142 ) -> Dict[Literal[14], dict]:
143 """Given a key, generate a corresponding KCCS"""
145 from cryptography.hazmat.primitives.asymmetric import ec
147 private = ec.derive_private_key(int.from_bytes(self.d, "big"), ec.SECP256R1())
148 public = private.public_key()
150 x = public.public_numbers().x.to_bytes(32, "big")
151 y = public.public_numbers().y.to_bytes(32, "big")
152 # kty: EC2, crv: P-256, x, y
153 cosekey = {1: 2, -1: 1, -2: x, -3: y}
154 if kid is not None:
155 cosekey[2] = kid
156 # cnf: COSE_Key
157 credential_kccs: dict = {8: {1: cosekey}}
158 if subject is not None:
159 credential_kccs[2] = subject
161 # kccs: cnf
162 return {14: credential_kccs}
165class EdhocCredentials(credentials._Objectish):
166 own_key: Optional[CoseKeyForEdhoc]
167 suite: int
168 method: int
169 own_cred: Optional[dict]
170 peer_cred: Optional[dict]
172 def __init__(
173 self,
174 suite: int,
175 method: int,
176 own_cred_style: Optional[str] = None,
177 peer_cred: Optional[dict] = None,
178 own_cred: Optional[dict] = None,
179 private_key_file: Optional[str] = None,
180 private_key: Optional[dict] = None,
181 ):
182 from . import edhoc
184 self.suite = suite
185 self.method = method
186 self.own_cred = own_cred
187 self.peer_cred = peer_cred
189 if private_key_file is not None and private_key is not None:
190 raise credentials.CredentialsLoadError(
191 "private_key is mutually exclusive with private_key_file"
192 )
193 if private_key_file is not None:
194 # FIXME: We should carry around a base
195 private_key_path = Path(private_key_file)
196 # FIXME: We left loading the file to the user, and now we're once more
197 # in a position where we guess the file type
198 self.own_key = CoseKeyForEdhoc.from_file(private_key_path)
199 elif private_key is not None:
200 self.own_key = CoseKeyForEdhoc.from_map(private_key)
201 else:
202 self.own_key = None
204 if (own_cred is None) != (own_cred_style is None) or (own_cred is None) != (
205 self.own_key is None
206 ):
207 raise credentials.CredentialsLoadError(
208 "If own credentials are given, all of own_cred, own_cred_style and private_key(_path) need to be given"
209 )
211 if own_cred_style is None:
212 self.own_cred_style = None
213 else:
214 self.own_cred_style = edhoc.OwnCredStyle(own_cred_style)
216 # FIXME: This is only used on the client side, and expects that all parts (own and peer) are present
217 self._established_context = None
219 def find_edhoc_by_id_cred_peer(self, id_cred_peer):
220 if self.peer_cred is None:
221 return None
222 if 14 not in self.peer_cred:
223 # Only recognizing CCS so far
224 return None
226 if id_cred_peer == cbor2.dumps(self.peer_cred[14], canonical=True):
227 # credential by value
228 return cbor2.dumps(self.peer_cred[14], canonical=True)
230 # cnf / COS_Key / kid, should be present in all CCS
231 kid = self.peer_cred[14][8][1].get(2)
232 if kid is not None and id_cred_peer == kid:
233 # credential by kid
234 return cbor2.dumps(self.peer_cred[14], canonical=True)
236 def peer_cred_is_unauthenticated(self):
237 # FIXME: This is rather weird internal API, and rather weird
238 # format-wise -- but it will suffice until credentials are rewritten.
239 return self.peer_cred is not None and self.peer_cred == {
240 "unauthenticated": True
241 }
243 async def establish_context(
244 self,
245 wire,
246 underlying_address,
247 underlying_proxy_scheme,
248 underlying_uri_host,
249 logger,
250 ):
251 logger.info(
252 "No OSCORE context found for EDHOC context %r, initiating one.", self
253 )
254 # FIXME: We don't support role reversal yet, but once we
255 # register this context to be available for incoming
256 # requests, we'll have to pick more carefully
257 c_i = bytes([random.randint(0, 23)])
258 initiator = lakers.EdhocInitiator()
259 message_1 = initiator.prepare_message_1(c_i)
261 msg1 = Message(
262 code=POST,
263 proxy_scheme=underlying_proxy_scheme,
264 uri_host=underlying_uri_host,
265 uri_path=[".well-known", "edhoc"],
266 payload=cbor2.dumps(True) + message_1,
267 )
268 msg1.remote = underlying_address
269 msg2 = await wire.request(msg1).response_raising
271 (c_r, id_cred_r, ead_2) = initiator.parse_message_2(msg2.payload)
273 assert isinstance(self.own_cred, dict) and list(self.own_cred.keys()) == [14], (
274 "So far can only process CCS style own credentials a la {14: ...}, own_cred = %r"
275 % self.own_cred
276 )
277 cred_i = cbor2.dumps(self.own_cred[14], canonical=True)
278 key_i = self.own_key.d
280 logger.debug("EDHOC responder sent message_2 with ID_CRED_R = %r", id_cred_r)
281 if self.peer_cred == {"unauthenticated": True}:
282 # Not doing further checks (eg. for trailing bytes) or re-raising: This
283 # was already checked by lakers
284 parsed = cbor2.loads(id_cred_r)
286 if not isinstance(parsed, dict):
287 raise credentials.CredentialsMissingError(
288 "Peer presented credential-by-reference when no credential was pre-agreed"
289 )
291 cred_r = id_cred_r
292 else:
293 # We could look into id_cred_r, which is a CBOR encoded
294 # byte string, and could start comparing ... but actually
295 # EDHOC and Lakers protect us from misbinding attacks (is
296 # that what they are called?), so we can just put in our
297 # expected credential here
298 #
299 # FIXME: But looking into it might give us a better error than just
300 # "Mac2 verification failed"
302 # FIXME add assert on the structure or start doing the
303 # generalization that'll fail at startup
304 cred_r = cbor2.dumps(self.peer_cred[14], canonical=True)
306 initiator.verify_message_2(
307 key_i,
308 cred_i,
309 cred_r,
310 ) # odd that we provide that here rather than in the next function
312 logger.debug("Message 2 was verified")
314 return EdhocInitiatorContext(initiator, c_i, c_r, self.own_cred_style, logger)
317class _EdhocContextBase(
318 oscore.CanProtect, oscore.CanUnprotect, oscore.SecurityContextUtils
319):
320 def __init__(self, logger):
321 self.log = logger
323 def post_seqnoincrease(self):
324 # The context is not persisted
325 pass
327 def protect(self, message, request_id=None, *, kid_context=True):
328 outer_message, request_id = super().protect(
329 message, request_id=request_id, kid_context=kid_context
330 )
331 message_3 = self.message_3_to_include()
332 if message_3 is not None:
333 outer_message.opt.edhoc = True
334 outer_message.payload = message_3 + outer_message.payload
335 return outer_message, request_id
337 def _make_ready(self, edhoc_context, c_ours, c_theirs):
338 # FIXME: both should offer this
339 if (
340 isinstance(edhoc_context, lakers.EdhocResponder)
341 or edhoc_context.selected_cipher_suite() == 2
342 ):
343 self.alg_aead = oscore.algorithms["AES-CCM-16-64-128"]
344 self.hashfun = oscore.hashfunctions["sha256"]
345 else:
346 raise RuntimeError("Unknown suite")
348 # we did check for critical EADs, there was no out-of-band agreement, so 8 it is
349 oscore_salt_length = 8
350 # I figure that one would be ageed out-of-band as well (currently no
351 # options to set/change this are known)
352 self.id_context = None
353 self.recipient_replay_window = oscore.ReplayWindow(32, lambda: None)
355 master_secret = edhoc_context.edhoc_exporter(0, [], self.alg_aead.key_bytes)
356 master_salt = edhoc_context.edhoc_exporter(1, [], oscore_salt_length)
358 self.sender_id = c_theirs
359 self.recipient_id = c_ours
360 if self.sender_id == self.recipient_id:
361 raise ValueError("Bad IDs: identical ones were picked")
363 self.derive_keys(master_salt, master_secret)
365 self.sender_sequence_number = 0
366 self.recipient_replay_window.initialize_empty()
368 self.log.debug("EDHOC context %r ready for OSCORE operation", self)
370 @abc.abstractmethod
371 def message_3_to_include(self) -> Optional[bytes]:
372 """An encoded message_3 to include in outgoing messages
374 This may modify self to only return something once."""
377class EdhocInitiatorContext(_EdhocContextBase):
378 """An OSCORE context that is derived from an EDHOC exchange.
380 It does not require that the EDHOC exchange has completed -- it can be set
381 up by an initiator already when message 2 has been received, prepares a
382 message 3 at setup time, and sends it with the first request that is sent
383 through it."""
385 # FIXME: Should we rather send it with *every* request that is sent before a message 4 is received implicitly?
386 def __init__(self, initiator, c_ours, c_theirs, cred_i_mode, logger):
387 super().__init__(logger)
389 # Only this line is role specific
390 self._message_3, _i_prk_out = initiator.prepare_message_3(
391 cred_i_mode.as_lakers(), None
392 )
394 self._make_ready(initiator, c_ours, c_theirs)
396 def message_3_to_include(self) -> Optional[bytes]:
397 if self._message_3 is not None:
398 result = self._message_3
399 self._message_3 = None
400 return result
401 return None
404class EdhocResponderContext(_EdhocContextBase):
405 def __init__(self, responder, c_i, c_r, server_credentials, logger):
406 super().__init__(logger)
408 # storing them where they will later be overwritten with themselves
409 self.recipient_id = c_r
410 self.sender_id = c_i
412 self._responder = responder
413 # Through these we'll look up id_cred_i
414 self._server_credentials = server_credentials
416 self.authenticated_claims = []
418 # Not sure why mypy even tolerates this -- we're clearly not ready for
419 # a general protect/unprotect, and things only work because all
420 # relevant functions get their checks introduced
421 self._incomplete = True
423 def message_3_to_include(self) -> Optional[bytes]:
424 # as a responder we never send one
425 return None
427 def get_oscore_context_for(self, unprotected):
428 if oscore.COSE_KID_CONTEXT in unprotected:
429 return None
430 if unprotected.get(oscore.COSE_KID) == self.recipient_id:
431 return self
433 def find_all_used_contextless_oscore_kid(self) -> set[bytes]:
434 return set((self.recipient_id,))
436 def protect(self, *args, **kwargs):
437 if self._incomplete:
438 raise RuntimeError(
439 "EDHOC has not completed yet, waiting for message 3, can not protect own messages yet"
440 )
441 return super().protect(*args, **kwargs)
443 def unprotect(self, protected_message, request_id=None):
444 if self._incomplete:
445 if not protected_message.opt.edhoc:
446 self.log.error(
447 "OSCORE failed: No EDHOC message 3 received and none present"
448 )
449 raise error.BadRequest("EDHOC incomplete")
451 payload_stream = io.BytesIO(protected_message.payload)
452 # discarding result -- just need to have a point to split
453 _ = cbor2.load(payload_stream)
454 m3len = payload_stream.tell()
455 message_3 = protected_message.payload[:m3len]
457 id_cred_i, ead_3 = self._responder.parse_message_3(message_3)
458 if ead_3 is not None:
459 self.log.error("Aborting EDHOC: EAD3 present")
460 raise error.BadRequest
462 try:
463 (cred_i, claims) = self._server_credentials.find_edhoc_by_id_cred_peer(
464 id_cred_i
465 )
466 except KeyError:
467 self.log.error(
468 "Aborting EDHOC: No credentials found for client with id_cred_i=h'%s'",
469 id_cred_i.hex(),
470 )
471 raise error.BadRequest
473 self.authenticated_claims.extend(claims)
475 self._responder.verify_message_3(cred_i)
477 self._make_ready(self._responder, self.recipient_id, self.sender_id)
478 self._incomplete = False
480 protected_message = protected_message.copy(
481 edhoc=False, payload=protected_message.payload[m3len:]
482 )
484 return super().unprotect(protected_message, request_id)
487class OwnCredStyle(enum.Enum):
488 """Guidance for how the own credential should be sent in an EDHOC
489 exchange"""
491 ByKeyId = "by-key-id"
492 ByValue = "by-value"
494 def as_lakers(self):
495 """Convert the enum into Lakers' reepresentation of the same concept.
497 The types may eventually be unified, but so far, Lakers doesn't make
498 the distinctions we expect to make yet."""
499 if self == self.ByKeyId:
500 # FIXME: Mismatch to be fixed in lakers -- currently the only way
501 # it allows sending by reference is by Key ID
502 return lakers.CredentialTransfer.ByReference
503 if self == self.ByValue:
504 return lakers.CredentialTransfer.ByValue
505 else:
506 raise RuntimeError("enum variant not covered")