Coverage for aiocoap / util / dataclass_data.py: 82%
101 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 12:28 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 12:28 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5"""Tools to load a typed dataclass from CBOR/JSON/TOML/YAML-model data.
7Unlike what is in the aiocoap.credentials module, this works from the fixed
8assumption that the item is a dataclass (as opposed to having an arbitrary
9constructor), which should ease things.
11Caveats
12-------
14* The module expects the data classes' annotations to be types and
15 not strings, and therefore can't be used with types defined under ``from
16 __future__ import annotations``.
18Example use
19-----------
21>>> from dataclasses import dataclass
22>>> from typing import Optional
23>>> @dataclass
24... class Inner(LoadStoreClass):
25... some_text: str
26... some_number: Optional[int]
27>>> @dataclass
28... class Top(LoadStoreClass):
29... x: str | bytes
30... y: Optional[Inner]
31... z: dict[str, int]
32>>> Top.load({"x": "test", "y": {"some-text": "one", "some-number": 42}, "z": {"a": 1}})
33Top(x='test', y=Inner(some_text='one', some_number=42), z={'a': 1})
35Stability
36---------
38This module's content is not stable for direct use.
39Nonetheless, it is updated carefully, as changes in behavior of the :mod:`aiocoap.config` module might result from changes here.
41Components
42----------
43"""
45import dataclasses
46import types
47from typing import Self, Union, Optional
48from pathlib import Path
49import sys
52class LoadStoreClass:
53 @classmethod
54 def load(
55 cls,
56 data: dict,
57 *,
58 depth_limit: int = 16,
59 basefile: Optional[Path] = None,
60 _prefix: Optional[str] = None,
61 ) -> Self:
62 """Creates an instance from the given data dictionary.
64 Keys are used to populate fields like in the initializer; dashes ("-")
65 in names are replaced with underscores ("_") so that Python-idiomatic
66 field names (in snake_case) can be used with TOML idiomatic item names
67 (in kebab-case).
69 Values are type-checked against the annotations, and unknown fields are
70 disallowed. When annotations indicate another ``LoadStoreClass``,
71 initialization recurses into that type up to a depth limit.
73 The ``basefile`` is used for error messages, and to construct ``Path``
74 items as relative to the file name given there. (For example, if
75 ``basefile=Path("config.d/test.json")``, a value of ``"test2.json"``
76 will be represented as ``Path("config.d/test2.json")``). It also serves
77 as a starting point for the error location indication, which is built
78 into ``_prefix`` in recursion as a vague path-like expression like
79 ``test.json key/key[key]``.
81 This reliably raises ``ValueError`` or its subtypes on unacceptable
82 data as long as the class is set up in a supported way.
83 """
85 assert dataclasses.is_dataclass(cls)
87 if _prefix is None:
88 if basefile is not None:
89 prefix = f"{basefile} "
90 else:
91 prefix = ""
92 else:
93 prefix = f"{_prefix}/"
95 fields = {f.name: f for f in dataclasses.fields(cls)}
97 kwargs = {}
99 for key, value in data.items():
100 keyprefix = f"{prefix}{key}"
101 f = key.replace("-", "_")
102 try:
103 fieldtype = fields[f].type
104 except KeyError:
105 raise ValueError(
106 f"Item {key!r} not recognized inside {cls.__name__} at {prefix}"
107 ) from None
109 kwargs[f] = _load(value, fieldtype, keyprefix, depth_limit, basefile)
111 try:
112 return cls(**kwargs)
113 except ValueError as e:
114 raise ValueError(
115 f"Error constructing {cls.__name__} at {prefix}: {e}"
116 ) from e
117 except TypeError as e:
118 missing = [
119 f.name
120 for f in dataclasses.fields(cls)
121 if f.name not in kwargs
122 and f.default is dataclasses.MISSING
123 and f.default_factory is dataclasses.MISSING
124 and f.init
125 ]
126 if missing:
127 # The likely case -- what else might go wrong?
128 raise ValueError(
129 f"Construcintg an instance of {cls.__name__} at {prefix}, these items are missing: {', '.join(m.replace('_', '-') for m in missing)}"
130 )
131 else:
132 raise ValueError(
133 f"Constructing instance of {cls.__name__} at {keyprefix} failed for unexpected reasons"
134 ) from e
136 @classmethod
137 def load_from_file(cls, file: Path | str) -> Self:
138 """Loads an item from a file.
140 The file is opened, and the file type is determined from the extension.
141 The set of supported file types may vary by installed packages and
142 Python version.
143 """
144 file = Path(file)
146 match file.suffix:
147 # Cases don't need error handling if they raise ValueError type
148 # exceptions, and if imports are from the standard library in the
149 # supported Python versions
150 case ".json":
151 import json
153 with file.open("rb") as opened:
154 data = json.load(opened)
155 case ".toml":
156 import tomllib
158 with file.open("rb") as opened:
159 data = tomllib.load(opened)
160 case ".yaml" | ".yml":
161 try:
162 import yaml
163 except ImportError:
164 raise ValueError(
165 "Loading configuration from YAML files requires the `pyyaml` package installed."
166 )
168 with file.open("rb") as opened:
169 data = yaml.safe_load(opened)
170 case ".diag" | ".edn":
171 try:
172 import cbor_diag
173 import cbor2
174 except ImportError:
175 raise ValueError(
176 "Loading configuration from CBOR EDN (Diagnostic Notation) files requires the `cbor-diag` and `cbor2` packages installed."
177 )
179 data = cbor2.loads(
180 cbor_diag.diag2cbor(file.read_text(encoding="utf-8"))
181 )
182 case extension:
183 raise ValueError(
184 f"Unsupported extension {extension!r}. Supported are .toml, .json and (depending on installed modules) .yml / .yaml and .edn / .diag"
185 )
187 return cls.load(data, basefile=file)
190def _load(value, fieldtype, keyprefix, depth_limit, basefile):
191 if depth_limit == 0:
192 raise ValueError("Nesting exceeded limit in {keyprefix}")
194 # FIXME: isinstance(value, fieldtype) requires a pre-check because it can't
195 # do dict[str, int] -- but we can't just check for being a type either,
196 # because Optional[str] or SomeType | None works well with isinstance
198 # Things would be much easier if we could do `isinstance(value, fieldtype)`
199 # not just for the working (42, int) and (42, Optional[int]) and ({}, dict
200 # | list), but also for the non-working ({"x":1}, dict[str, int]) and more
201 # complex cases of that style: "isinstance() argument 2 cannot be a
202 # parameterized generic".
203 #
204 # As the offending parameterized generics can not be just the top-level
205 # annotation but also part of a union, we have to dissect them:
206 fieldtypes = _unpack_union(fieldtype)
208 # As we have a list, we try to match it greedily.
209 for fieldtype in fieldtypes:
210 if isinstance(fieldtype, types.GenericAlias):
211 if fieldtype.__origin__ is dict and len(fieldtype.__args__) == 2:
212 if fieldtype.__args__[0] is not str:
213 raise TypeError(
214 "Annotations of dict are limited to using str as key."
215 )
216 if not isinstance(value, dict):
217 # Clear mismatch, continue searching
218 continue
219 # Locking in now: At this point, a dict was promised, and we
220 # expect it to be. (Might revisit if this is impractical, but I
221 # guess that alternatives would be more specific and just
222 # picked before the generic option).
223 non_string_keys = [k for k in value.keys() if not isinstance(k, str)]
224 if non_string_keys:
225 raise ValueError(
226 f"Non-string key(s) found at {keyprefix}: {non_string_keys}"
227 )
228 return {
229 k: _load(
230 v,
231 fieldtype.__args__[1],
232 f"{keyprefix}[{k}]",
233 depth_limit - 1,
234 basefile,
235 )
236 for (k, v) in value.items()
237 }
238 elif fieldtype.__origin__ is list and len(fieldtype.__args__) == 1:
239 if not isinstance(value, list):
240 # Clear mismatch, continue searching
241 continue
242 return [
243 _load(
244 v,
245 fieldtype.__args__[0],
246 "f{keyprefix}[{i}]",
247 depth_limit - 1,
248 basefile,
249 )
250 for (i, v) in enumerate(value)
251 ]
252 else:
253 raise TypeError(
254 "Annotations of generic aliases are limited to the shape dict[K, V] and list[T]."
255 )
257 if not isinstance(fieldtype, type):
258 raise TypeError(
259 "Annotation can not be processed: Can only process unions over types and some generic aliases (eg. dict[str, str])"
260 )
262 if fieldtype is Path and isinstance(value, str):
263 if basefile is None:
264 return Path(value)
265 else:
266 return basefile.parent / value
268 if isinstance(value, fieldtype):
269 # This case covers
270 # * "hello" for str
271 # * MyLoadable(…) for MyLoadable (which is something odd but allowed)
272 #
273 # It'd also tolerate
274 # * 52 for Optional[int]
275 # * None for MyLoadable | None
276 # but those are taken care of already by _unpack_union
277 return value
279 if isinstance(value, dict) and issubclass(fieldtype, LoadStoreClass):
280 # FIXME: allow annotating single distinct non-dict-value, eg.
281 # like Cargo.toml's implicit version in dependencies. (Right
282 # now we can do this can be done at the parent level, maybe
283 # that suffices?).
284 return fieldtype.load(
285 value, _prefix=keyprefix, depth_limit=depth_limit - 1, basefile=basefile
286 )
288 # FIXME:
289 # - For lists and dicts, evaluate items (can be deferred until we actually have any)
290 # - Special treatment for bytes: Accept {"acii": "hello"} and {"hex": "001122"}
291 # - Special treatment for Path (probably with new filename argument)
292 # - In union handling, support multiple, possibly fanning out by disambiguator keys?
294 expected = " or ".join(
295 f"dict (representing {t.__name__})"
296 if issubclass(t, LoadStoreClass)
297 else t.__name__
298 for t in fieldtypes
299 )
300 raise ValueError(
301 f"Type mismatch on {keyprefix}: Expected {expected}, found {type(value).__name__}"
302 )
305def _unpack_union(annotation) -> tuple:
306 """If the annotation is a Union (including Optional), this returns a tuple
307 of union'd types; otherwise a tuple containing only the annotation.
309 When Python 3.13 support is dropped, this can be simplified based on
310 type(Optional[str]) being Union.
313 >>> from typing import *
314 >>> _unpack_union(str) == (str,)
315 True
316 >>> _unpack_union(str | None) == (str, type(None))
317 True
318 >>> _unpack_union(Optional[dict[str, str]]) == (dict[str, str], type(None))
319 True
320 """
321 if sys.version_info >= (3, 14):
322 if isinstance(annotation, Union):
323 return annotation.__args__
324 else:
325 if type(annotation).__name__ in ("_UnionGenericAlias", "UnionType"):
326 return annotation.__args__
327 return (annotation,)