Coverage for aiocoap/options.py: 96%
146 statements
« prev ^ index » next coverage.py v7.6.8, created at 2024-11-28 12:34 +0000
« prev ^ index » next coverage.py v7.6.8, created at 2024-11-28 12:34 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5from itertools import chain
6from warnings import warn
8from .numbers.optionnumbers import OptionNumber
9from .error import UnparsableMessage
12def _read_extended_field_value(value, rawdata):
13 """Used to decode large values of option delta and option length
14 from raw binary form."""
15 if value >= 0 and value < 13:
16 return (value, rawdata)
17 elif value == 13:
18 if len(rawdata) < 1:
19 raise UnparsableMessage("Option ended prematurely")
20 return (rawdata[0] + 13, rawdata[1:])
21 elif value == 14:
22 if len(rawdata) < 2:
23 raise UnparsableMessage("Option ended prematurely")
24 return (int.from_bytes(rawdata[:2], "big") + 269, rawdata[2:])
25 else:
26 raise UnparsableMessage("Option contained partial payload marker.")
29def _write_extended_field_value(value):
30 """Used to encode large values of option delta and option length
31 into raw binary form.
32 In CoAP option delta and length can be represented by a variable
33 number of bytes depending on the value."""
34 if value >= 0 and value < 13:
35 return (value, b"")
36 elif value >= 13 and value < 269:
37 return (13, (value - 13).to_bytes(1, "big"))
38 elif value >= 269 and value < 65804:
39 return (14, (value - 269).to_bytes(2, "big"))
40 else:
41 raise ValueError("Value out of range.")
44def _single_value_view(option_number, doc=None, deprecated=None):
45 """Generate a property for a given option number, where the option is not
46 repeatable. For getting, it will return the value of the first option
47 object with matching number. For setting, it will remove all options with
48 that number and create one with the given value. The property can be
49 deleted, resulting in removal of the option from the header.
51 For consistency, setting the value to None also clears the option. (Note
52 that with the currently implemented optiontypes, None is not a valid value
53 for any of them)."""
55 def _getter(self, option_number=option_number):
56 if deprecated is not None:
57 warn(deprecated, stacklevel=2)
58 options = self.get_option(option_number)
59 if not options:
60 return None
61 else:
62 return options[0].value
64 def _setter(self, value, option_number=option_number):
65 if deprecated is not None:
66 # Stack level 3 is what it takes to get out of the typical
67 # `Message(opt=val)` construction, and probably most useful.
68 warn(deprecated, stacklevel=3)
69 self.delete_option(option_number)
70 if value is not None:
71 self.add_option(option_number.create_option(value=value))
73 def _deleter(self, option_number=option_number):
74 if deprecated is not None:
75 warn(deprecated, stacklevel=2)
76 self.delete_option(option_number)
78 return property(
79 _getter,
80 _setter,
81 _deleter,
82 doc or "Single-value view on the %s option." % option_number,
83 )
86def _items_view(option_number, doc=None):
87 """Generate a property for a given option number, where the option is
88 repeatable. For getting, it will return a tuple of the values of the option
89 objects with matching number. For setting, it will remove all options with
90 that number and create new ones from the given iterable."""
92 def _getter(self, option_number=option_number):
93 return tuple(o.value for o in self.get_option(option_number))
95 def _setter(self, value, option_number=option_number):
96 self.delete_option(option_number)
97 for v in value:
98 self.add_option(option_number.create_option(value=v))
100 def _deleter(self, option_number=option_number):
101 self.delete_option(option_number)
103 return property(
104 _getter,
105 _setter,
106 _deleter,
107 doc=doc or "Iterable view on the %s option." % option_number,
108 )
111def _empty_presence_view(option_number, doc=None):
112 """Generate a property for a given option number, where the option is not
113 repeatable and (usually) empty. The values True and False are mapped to
114 presence and absence of the option."""
116 def _getter(self, option_number=option_number):
117 return bool(self.get_option(option_number))
119 def _setter(self, value, option_number=option_number):
120 self.delete_option(option_number)
121 if value:
122 self.add_option(option_number.create_option())
124 return property(
125 _getter, _setter, doc=doc or "Presence of the %s option." % option_number
126 )
class Options:
    """Represent CoAP Header Options."""

    # this is not so much an optimization as a safeguard -- if custom
    # attributes were placed here, they could be accessed but would not be
    # serialized
    __slots__ = ["_options"]

    def __init__(self):
        # Maps an option number to the list of option objects added for it.
        self._options = {}

    def __eq__(self, other):
        """Compare two option sets by their encoded form."""
        if isinstance(other, Options):
            # Comparing the serializations is much easier than implementing
            # equality on StringOption etc.
            return self.encode() == other.encode()
        return NotImplemented

    def __repr__(self):
        rendered = []
        for number, group in self._options.items():
            values = " / ".join(str(opt) for opt in group)
            rendered.append("%s: %s" % (OptionNumber(number), values))
        summary = ", ".join(rendered) or "empty"
        return "<aiocoap.options.Options at %#x: %s>" % (id(self), summary)

    def _repr_html_(self):
        """Render the options as an HTML fragment: a collapsible ordered
        list, or a placeholder when no options are stored."""
        if not self._options:
            return "<div>No options</div>"

        n_opt = sum(len(group) for group in self._options.values())
        entries = []
        for number, group in sorted(self._options.items()):
            values = ", ".join(opt._repr_html_() for opt in group)
            entries.append(
                f'<li value="{int(number)}">{OptionNumber(number)._repr_html_()}: {values}'
            )
        plural = "s" if n_opt != 1 else ""
        listing = "".join(entries)
        return f"""<details><summary style="display:list-item">{n_opt} option{plural}</summary><ol>{listing}</ol></details>"""

    def decode(self, rawdata):
        """Populate self from the part of a CoAP message that follows the
        token.

        Options are read from the start of ``rawdata`` and added to self.
        The return value is the rest of the message (the body): whatever
        follows the 0xFF marker byte, or ``b""`` if the data runs out
        without one.

        Raises ``UnparsableMessage`` when an option announces more bytes
        than are left.
        """
        number = OptionNumber(0)
        remaining = rawdata

        while remaining:
            head = remaining[0]
            if head == 0xFF:
                return remaining[1:]
            remaining = remaining[1:]

            # High nibble: delta to the previous option number; low nibble:
            # length of the option value. Either may spill into extension
            # bytes, which the helper consumes.
            delta, remaining = _read_extended_field_value(
                (head & 0xF0) >> 4, remaining
            )
            length, remaining = _read_extended_field_value(head & 0x0F, remaining)

            number += delta
            if length > len(remaining):
                raise UnparsableMessage("Option announced but absent")
            self.add_option(number.create_option(decode=remaining[:length]))
            remaining = remaining[length:]

        return b""

    def encode(self):
        """Serialize all stored options into one bytes object."""
        chunks = []
        previous_number = 0

        for opt in self.option_list():
            value_bytes = opt.encode()

            # Option numbers are written as the difference to the previous
            # option's number.
            delta, delta_ext = _write_extended_field_value(
                opt.number - previous_number
            )
            length, length_ext = _write_extended_field_value(len(value_bytes))

            header = bytes([((delta & 0x0F) << 4) + (length & 0x0F)])
            chunks.extend((header, delta_ext, length_ext, value_bytes))

            previous_number = opt.number

        return b"".join(chunks)

    def add_option(self, option):
        """Store ``option`` after any others with the same number."""
        bucket = self._options.setdefault(option.number, [])
        bucket.append(option)

    def delete_option(self, number):
        """Remove every option with the given number, if there are any."""
        self._options.pop(number, None)

    def get_option(self, number):
        """Return the stored options with the given number (an empty tuple
        when there are none)."""
        try:
            return self._options[number]
        except KeyError:
            return ()

    def option_list(self):
        """Iterate over all stored options in ascending option number."""
        by_number = sorted(
            self._options.values(), key=lambda group: group[0].number
        )
        return chain.from_iterable(by_number)

    uri_path = _items_view(OptionNumber.URI_PATH)
    uri_query = _items_view(OptionNumber.URI_QUERY)
    location_path = _items_view(OptionNumber.LOCATION_PATH)
    location_query = _items_view(OptionNumber.LOCATION_QUERY)
    block2 = _single_value_view(OptionNumber.BLOCK2)
    block1 = _single_value_view(OptionNumber.BLOCK1)
    content_format = _single_value_view(OptionNumber.CONTENT_FORMAT)
    # The ETag option is reachable through two views of the same number.
    etag = _single_value_view(OptionNumber.ETAG, "Single ETag as used in responses")
    etags = _items_view(OptionNumber.ETAG, "List of ETags as used in requests")
    if_none_match = _empty_presence_view(OptionNumber.IF_NONE_MATCH)
    observe = _single_value_view(OptionNumber.OBSERVE)
    accept = _single_value_view(OptionNumber.ACCEPT)
    uri_host = _single_value_view(OptionNumber.URI_HOST)
    uri_port = _single_value_view(OptionNumber.URI_PORT)
    proxy_uri = _single_value_view(OptionNumber.PROXY_URI)
    proxy_scheme = _single_value_view(OptionNumber.PROXY_SCHEME)
    size1 = _single_value_view(OptionNumber.SIZE1)
    oscore = _single_value_view(OptionNumber.OSCORE)
    # Deprecated alias of `oscore`; every access emits a warning.
    object_security = _single_value_view(
        OptionNumber.OSCORE, deprecated="Use `oscore` instead of `object_security`"
    )
    max_age = _single_value_view(OptionNumber.MAX_AGE)
    if_match = _items_view(OptionNumber.IF_MATCH)
    no_response = _single_value_view(OptionNumber.NO_RESPONSE)
    echo = _single_value_view(OptionNumber.ECHO)
    request_tag = _items_view(OptionNumber.REQUEST_TAG)
    hop_limit = _single_value_view(OptionNumber.HOP_LIMIT)
    request_hash = _single_value_view(
        OptionNumber.REQUEST_HASH,
        "Experimental property for draft-amsuess-core-cachable-oscore",
    )
    edhoc = _empty_presence_view(OptionNumber.EDHOC)
    size2 = _single_value_view(OptionNumber.SIZE2)