Coverage for aiocoap/credentials.py: 79%
169 statements
« prev ^ index » next coverage.py v7.6.8, created at 2024-11-28 12:34 +0000
« prev ^ index » next coverage.py v7.6.8, created at 2024-11-28 12:34 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5"""This module describes how security credentials are expressed in aiocoap,
6how security protocols (TLS, DTLS, OSCOAP) can store and access their key
7material, and for which URIs they are used.
9For consistency, mappings between accessible resources and their credentials
10are always centered around URIs. This is slightly atypical, because a client
11will typically use a particular set of credentials for all operations on one
12server, while a server first loads all available credentials and then filters
13out whether the client may actually access a resource per-path, but it works
14with full URIs (or patterns thereof) just as well. That approach allows using
15more similar structures both on the server and the client, and works smoothly
16for virtual hosting, firewalling and clients accessing resources with varying
17credentials.
19Still, client and server credentials are kept apart, lest a server open up (and
20potentially reveal) to a PSK set it is only configured to use as a client.
21While client credentials already have their place in
22:attr:`aiocoap.protocol.Context.client_credentials`, server credentials are not
23in use at a standardized location yet because there is only code in the OSCORE
24plug tests that can use it so far.
26Library developer notes
27~~~~~~~~~~~~~~~~~~~~~~~
29This whole module currently relies on a mixture of introspection and manual
30parsing of the JSON-ish tree. A preferred expression of the same would rely on
31the credentials.cddl description and build an object tree from that, but the
32author is unaware of any existing CDDL Python implementation. That might also
33ease porting to platforms that don't support inspect like micropython does.
34"""
36import re
37import inspect
39from typing import Optional, List, Tuple
42"""
43server: {
44 'coaps://mysite/*': { 'dtls-psk' (or other granularity): { 'psk': 'abcd' }},
45 'coap://mysite/*': { 'oscore': { 'basedir': 'my-basedir/' } },
46 'coap://myothersite/firmware': ':myotherkey',
47 'coap://myothersite/reset': ':myotherkey',
48 'coap://othersite*': { 'unprotected': true },
49 ':myotherkey': { 'oscore': { 'basedir': 'my-basedir/' } }
50 }
52server can of course just say it doesn't want to have the Site handle it and
53just say '*': { 'unprotected': true }, add some ':foo': {'dtls-psk': ...}
54entries (so communication can be established in the first place) and let
55individual resources decide whether they return 4.01 or something else.
57client can be the same with different implied role, or have something like
59client: {
60 'coap://myothersite/*': ':myotherkey',
61 ...
62 }
64in future also
66server: {
67 'coaps://mysite/*': { 'dtls-cert': {'key': '...pem', 'cert': '...crt'} }
68 }
70client: {
71 '*': { 'dtls-cert': { 'ca': '/etc/ssl/...' } }
72}
74or more complex ones:
76server: {
77 'coaps://myothersite/wellprotected': { 'all': [ ':mydtls', ':myotherkey' ]},
78 'coaps://myothersite/*': { 'any': [ ':mydtls', ':myotherkey' ]}
79}
80"""
class CredentialsLoadError(ValueError):
    """Error signalling that plain data could not be turned into credentials.

    Raised by the functions that build a CredentialsMap, or parts of one,
    from simple data structures."""
class CredentialsMissingError(RuntimeError):
    """Error signalling that a message can not be sent with usable credentials.

    Raised when no suiting credentials can be found for a message, and also
    when credentials are found but are inapplicable to the security
    mechanisms of the transport."""
class CredentialReference:
    """Indirect credential: a colon-prefixed name that points at another
    entry of the map it was loaded into.

    The referenced entry is looked up each time the reference is used, not
    when the reference is created."""

    def __init__(self, target, map):
        """Store the reference name and the map it is to be resolved in.

        Raises CredentialsLoadError if `target` does not start with a colon.
        """
        is_reference_name = target.startswith(":")
        if not is_reference_name:
            raise CredentialsLoadError(
                "Credential references must start with a colon (':')"
            )

        self.map = map
        self.target = target

    # FIXME either generalize this with getattr, or introduce a function to
    # resolve any indirect credentials to a particular instance.

    def as_dtls_psk(self):
        """Look up the referenced entry and return its `as_dtls_psk()`
        result."""
        referenced = self.map[self.target]
        return referenced.as_dtls_psk()
class _Listish(list):
    """Base for credential types that are loaded from a plain list."""

    @classmethod
    def from_item(cls, v):
        """Build an instance of the class from the list `v`.

        Raises CredentialsLoadError if `v` is anything but a list."""
        if isinstance(v, list):
            return cls(v)

        raise CredentialsLoadError("%s goes with a list" % cls.__name__)
class AnyOf(_Listish):
    """List-shaped credential entry with no behavior of its own beyond
    _Listish.

    Presumably expresses that any one of the listed credentials suffices
    (judging by the name only; nothing in this module evaluates it)."""
class AllOf(_Listish):
    """List-shaped credential entry with no behavior of its own beyond
    _Listish.

    Presumably expresses that all of the listed credentials are required
    (judging by the name only; nothing in this module evaluates it)."""
def _decode_binary_item(value):
    """Convert an ``{"ascii": "..."}`` or ``{"hex": "..."}`` object to bytes.

    Values that are not such objects are returned unchanged. In hex strings,
    dashes, spaces and colons are ignored as separators.

    Raises CredentialsLoadError if the object has additional keys, if its
    payload is not a string, or if the payload can not be converted."""
    if not isinstance(value, dict):
        return value

    if "ascii" in value:
        if len(value) != 1:
            raise CredentialsLoadError("ASCII objects can only have one elemnt.")
        text = value["ascii"]
        # Loaders like YAML may hand over non-string scalars; report those as
        # load errors rather than failing with an AttributeError.
        if not isinstance(text, str):
            raise CredentialsLoadError(
                "ASCII objects need a string value, got %s" % type(text).__name__
            )
        try:
            return text.encode("ascii")
        except UnicodeEncodeError as e:
            raise CredentialsLoadError(
                "Elements of the ASCII object can not be represented in ASCII, please use binary or hex representation."
            ) from e

    if "hex" in value:
        if len(value) != 1:
            raise CredentialsLoadError("Hex objects can only have one elemnt.")
        text = value["hex"]
        # An unquoted `hex: 1234` in YAML arrives as an int.
        if not isinstance(text, str):
            raise CredentialsLoadError(
                "Hex objects need a string value, got %s" % type(text).__name__
            )
        try:
            return bytes.fromhex(
                text.replace("-", "").replace(" ", "").replace(":", "")
            )
        except ValueError as e:
            raise CredentialsLoadError(
                "Hex object can not be read: %s" % (e.args[0])
            ) from e

    return value


def _call_from_structureddata(constructor, name, init_data):
    """Call `constructor` with keyword arguments taken from the dict
    `init_data`, and return the result.

    Dashes in keys are replaced with underscores to obtain argument names.
    ``{"ascii": ...}`` and ``{"hex": ...}`` values are converted to bytes.
    The type of every value is then compared to the annotation of the
    constructor argument of the same name, so the constructor needs to be
    fully annotated.

    `name` is only used in error messages.

    Raises CredentialsLoadError if `init_data` is not a dict, if a value can
    not be decoded, if a key has no matching argument, if a type does not
    match, or if the arguments can not be bound to the constructor (e.g.
    because a required one is missing)."""
    if not isinstance(init_data, dict):
        raise CredentialsLoadError("%s goes with an object" % name)

    init_data = {k.replace("-", "_"): v for (k, v) in init_data.items()}

    sig = inspect.signature(constructor)

    checked_items = {}

    for k, v in init_data.items():
        try:
            annotation = sig.parameters[k].annotation
        except KeyError:
            # This placeholder equals no type and no type name, so the type
            # check below always rejects the item, with the placeholder
            # showing up as the expected type in the message.
            annotation = "attribute does not exist"

        v = _decode_binary_item(v)

        # Not using isinstance because I found no way to extract the type
        # information from an Optional/Union again; this whole thing works
        # only for strings and ints anyway, so why not.
        #
        # The string forms are for functions from modules with
        # __future__.annotations
        type_name = type(v).__name__
        acceptable = (
            type(v),
            Optional[type(v)],
            type_name,
            "Optional[%s]" % type_name,
        )
        if annotation not in acceptable:
            # explicitly not excluding inspect._empty here: constructors
            # need to be fully annotated
            raise CredentialsLoadError(
                "Type mismatch in attribute %s of %s: expected %s, got %r"
                % (k, name, annotation, v)
            )

        checked_items[k] = v

    try:
        bound = sig.bind(**checked_items)
    except TypeError as e:
        raise CredentialsLoadError("%s: %s" % (name, e.args[0])) from e

    return constructor(*bound.args, **bound.kwargs)
class _Objectish:
    """Base for credential types that are loaded from a JSON-ish object."""

    @classmethod
    def from_item(cls, init_data):
        """Construct the class from `init_data` by handing the class, its
        name and the data to _call_from_structureddata."""
        return _call_from_structureddata(
            constructor=cls, name=cls.__name__, init_data=init_data
        )
class DTLS(_Objectish):
    """A pre-shared key, stored together with a client identity."""

    def __init__(self, psk: bytes, client_identity: bytes):
        self.client_identity = client_identity
        self.psk = psk

    def as_dtls_psk(self):
        """Return the stored data as a tuple of (client identity, PSK)."""
        identity, key = self.client_identity, self.psk
        return (identity, key)
class TLSCert(_Objectish):
    """Indicates that a client can use the given certificate file to
    authenticate the server.

    Can only be used with 'coaps+tcp://HOSTINFO/*' and 'coaps+tcp://*' forms.
    """

    def __init__(self, certfile: str):
        self.certfile = certfile

    def as_ssl_params(self):
        """Generate parameters suitable for passing via ** to
        ssl.create_default_context when purpose is already set"""
        return dict(cafile=self.certfile)
def import_filesystem_security_context():
    """Return the FilesystemSecurityContext class of the sibling oscore
    module, importing that module only when called."""
    from . import oscore

    return oscore.FilesystemSecurityContext
def import_edhoc_credential_pair():
    """Return the EdhocCredentials class of the sibling edhoc module,
    importing that module only when called."""
    from .edhoc import EdhocCredentials

    return EdhocCredentials
234_re_cache = {}
237class CredentialsMap(dict):
238 """
239 FIXME: outdated, rewrite when usable
241 A CredentialsMap, for any URI template and operation, which
242 security contexts are sufficient to to perform the operation on a matching
243 URI.
245 The same context can be used both by the server and the client, where the
246 client uses the information on allowed client credentials to decide which
247 credentials to present, and the information on allowed server credentials
248 to decide whether the server can be trusted.
250 Conversely, the server typically loads all available server credentials at
251 startup, and then uses the client credentials list to decide whether to
252 serve the request."""
254 def load_from_dict(self, d):
255 """Populate the map from a dictionary, which would typically have been
256 loaded from a JSON/YAML file and needs to match the CDDL in
257 credentials.cddl.
259 Running this multiple times will overwriter individual entries in the
260 map."""
261 for k, v in d.items():
262 if v is None:
263 if k in self:
264 del self[k]
265 else:
266 self[k] = self._item_from_dict(v)
267 # FIXME only works that way for OSCORE clients
268 self[k].authenticated_claims = [k]
270 def _item_from_dict(self, v):
271 if isinstance(v, str):
272 return CredentialReference(v, self)
273 elif isinstance(v, dict):
274 try:
275 ((key, value),) = v.items()
276 except ValueError:
277 # this follows how Rust Enums are encoded in serde JSON
278 raise CredentialsLoadError(
279 "Items in a credentials map must have exactly one key"
280 " (found %s)" % (",".join(v.keys()) or "empty")
281 )
283 try:
284 type_ = self._class_map[key]
285 except KeyError:
286 raise CredentialsLoadError("Unknown credential type: %s" % key)
288 return type_().from_item(value)
290 # Phrased as callbacks so they can import lazily. We make sure that all are
291 # still present so that an entry that is not loadable raises an error
292 # rather than possibly being ignored.
293 _class_map = {
294 "dtls": lambda: DTLS,
295 "oscore": import_filesystem_security_context,
296 "tlscert": lambda: TLSCert,
297 "any-of": lambda: AnyOf,
298 "all-of": lambda: AllOf,
299 "edhoc-oscore": import_edhoc_credential_pair,
300 }
302 @staticmethod
303 def _wildcard_match(searchterm, pattern):
304 if pattern not in _re_cache:
305 _re_cache[pattern] = re.compile(re.escape(pattern).replace("\\*", ".*"))
306 return _re_cache[pattern].fullmatch(searchterm) is not None
308 # used by a client
310 def credentials_from_request(self, msg):
311 """Return the most specific match to a request message. Matching is
312 currently based on wildcards, but not yet very well thought out."""
314 uri = msg.get_request_uri()
316 for i in range(1000):
317 for k, v in sorted(self.items(), key=lambda x: len(x[0]), reverse=True):
318 if self._wildcard_match(uri, k):
319 if isinstance(v, str):
320 uri = v
321 continue
322 return v
323 else:
324 raise CredentialsMissingError("No suitable credentials for %s" % uri)
325 else:
326 raise CredentialsLoadError(
327 "Search for suitable credentials for %s exceeds recursion limit"
328 )
330 def ssl_client_context(self, scheme, hostinfo):
331 """Return an SSL client context as configured for the given request
332 scheme and hostinfo (no full message is to be processed here, as
333 connections are used across requests to the same origin).
335 If no credentials are configured, this returns None (for which the user
336 may need to fill in ssl.create_default_context() if None is not already
337 a good indicator for the eventual consumer to use the default)."""
339 ssl_params = {}
340 tlscert = self.get("%s://%s/*" % (scheme, hostinfo), None)
341 # FIXME: handle Any or All if they include TLSCert, or deprecate them
342 if not isinstance(tlscert, TLSCert):
343 return
344 if tlscert is None:
345 tlscert = self.get("%s://*" % scheme, None)
346 if tlscert is not None:
347 ssl_params = tlscert.as_ssl_params()
348 if ssl_params:
349 import ssl
351 return ssl.create_default_context(**ssl_params)
353 # used by a server
355 def find_oscore(self, unprotected):
356 # FIXME: this is not constant-time as it should be, but too much in
357 # flux to warrant optimization
359 # FIXME: duplicate contexts for being tried out are not supported yet.
361 for item in self.values():
362 if not hasattr(item, "get_oscore_context_for"):
363 continue
365 ctx = item.get_oscore_context_for(unprotected)
366 if ctx is not None:
367 return ctx
369 raise KeyError()
371 def find_all_used_contextless_oscore_kid(self) -> set[bytes]:
372 all_kid = set()
374 for item in self.values():
375 if not hasattr(item, "find_all_used_contextless_oscore_kid"):
376 continue
378 all_kid |= item.find_all_used_contextless_oscore_kid()
380 return all_kid
382 def find_edhoc_by_id_cred_peer(self, id_cred_peer) -> Tuple[bytes, List[str]]:
383 for label, item in self.items():
384 if not hasattr(item, "find_edhoc_by_id_cred_peer"):
385 continue
387 # typically returning self
388 credential = item.find_edhoc_by_id_cred_peer(id_cred_peer)
389 if credential is not None:
390 return (credential, [label])
392 from . import edhoc
393 import cbor2
395 for label, item in self.items():
396 if (
397 isinstance(item, edhoc.EdhocCredentials)
398 and item.peer_cred_is_unauthenticated()
399 ):
400 id_cred_peer = cbor2.loads(id_cred_peer)
401 if isinstance(id_cred_peer, dict) and 14 in id_cred_peer:
402 return (cbor2.dumps(id_cred_peer[14], canonical=True), [label])
404 raise KeyError
406 def find_dtls_psk(self, identity):
407 # FIXME similar to find_oscore
408 for entry, item in self.items():
409 if not hasattr(item, "as_dtls_psk"):
410 continue
412 psk_id, psk = item.as_dtls_psk()
413 if psk_id != identity:
414 continue
416 # FIXME is returning the entry name a sane value to later put in to
417 # authenticated_claims? OSCORE does something different.
418 return (psk, entry)
420 raise KeyError()