Coverage for aiocoap/options.py: 96%

146 statements  

« prev     ^ index     » next       coverage.py v7.6.8, created at 2024-11-28 12:34 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5from itertools import chain 

6from warnings import warn 

7 

8from .numbers.optionnumbers import OptionNumber 

9from .error import UnparsableMessage 

10 

11 

12def _read_extended_field_value(value, rawdata): 

13 """Used to decode large values of option delta and option length 

14 from raw binary form.""" 

15 if value >= 0 and value < 13: 

16 return (value, rawdata) 

17 elif value == 13: 

18 if len(rawdata) < 1: 

19 raise UnparsableMessage("Option ended prematurely") 

20 return (rawdata[0] + 13, rawdata[1:]) 

21 elif value == 14: 

22 if len(rawdata) < 2: 

23 raise UnparsableMessage("Option ended prematurely") 

24 return (int.from_bytes(rawdata[:2], "big") + 269, rawdata[2:]) 

25 else: 

26 raise UnparsableMessage("Option contained partial payload marker.") 

27 

28 

29def _write_extended_field_value(value): 

30 """Used to encode large values of option delta and option length 

31 into raw binary form. 

32 In CoAP option delta and length can be represented by a variable 

33 number of bytes depending on the value.""" 

34 if value >= 0 and value < 13: 

35 return (value, b"") 

36 elif value >= 13 and value < 269: 

37 return (13, (value - 13).to_bytes(1, "big")) 

38 elif value >= 269 and value < 65804: 

39 return (14, (value - 269).to_bytes(2, "big")) 

40 else: 

41 raise ValueError("Value out of range.") 

42 

43 

44def _single_value_view(option_number, doc=None, deprecated=None): 

45 """Generate a property for a given option number, where the option is not 

46 repeatable. For getting, it will return the value of the first option 

47 object with matching number. For setting, it will remove all options with 

48 that number and create one with the given value. The property can be 

49 deleted, resulting in removal of the option from the header. 

50 

51 For consistency, setting the value to None also clears the option. (Note 

52 that with the currently implemented optiontypes, None is not a valid value 

53 for any of them).""" 

54 

55 def _getter(self, option_number=option_number): 

56 if deprecated is not None: 

57 warn(deprecated, stacklevel=2) 

58 options = self.get_option(option_number) 

59 if not options: 

60 return None 

61 else: 

62 return options[0].value 

63 

64 def _setter(self, value, option_number=option_number): 

65 if deprecated is not None: 

66 # Stack level 3 is what it takes to get out of the typical 

67 # `Message(opt=val)` construction, and probably most useful. 

68 warn(deprecated, stacklevel=3) 

69 self.delete_option(option_number) 

70 if value is not None: 

71 self.add_option(option_number.create_option(value=value)) 

72 

73 def _deleter(self, option_number=option_number): 

74 if deprecated is not None: 

75 warn(deprecated, stacklevel=2) 

76 self.delete_option(option_number) 

77 

78 return property( 

79 _getter, 

80 _setter, 

81 _deleter, 

82 doc or "Single-value view on the %s option." % option_number, 

83 ) 

84 

85 

86def _items_view(option_number, doc=None): 

87 """Generate a property for a given option number, where the option is 

88 repeatable. For getting, it will return a tuple of the values of the option 

89 objects with matching number. For setting, it will remove all options with 

90 that number and create new ones from the given iterable.""" 

91 

92 def _getter(self, option_number=option_number): 

93 return tuple(o.value for o in self.get_option(option_number)) 

94 

95 def _setter(self, value, option_number=option_number): 

96 self.delete_option(option_number) 

97 for v in value: 

98 self.add_option(option_number.create_option(value=v)) 

99 

100 def _deleter(self, option_number=option_number): 

101 self.delete_option(option_number) 

102 

103 return property( 

104 _getter, 

105 _setter, 

106 _deleter, 

107 doc=doc or "Iterable view on the %s option." % option_number, 

108 ) 

109 

110 

111def _empty_presence_view(option_number, doc=None): 

112 """Generate a property for a given option number, where the option is not 

113 repeatable and (usually) empty. The values True and False are mapped to 

114 presence and absence of the option.""" 

115 

116 def _getter(self, option_number=option_number): 

117 return bool(self.get_option(option_number)) 

118 

119 def _setter(self, value, option_number=option_number): 

120 self.delete_option(option_number) 

121 if value: 

122 self.add_option(option_number.create_option()) 

123 

124 return property( 

125 _getter, _setter, doc=doc or "Presence of the %s option." % option_number 

126 ) 

127 

128 

129class Options(object): 

130 """Represent CoAP Header Options.""" 

131 

132 # this is not so much an optimization as a safeguard -- if custom 

133 # attributes were placed here, they could be accessed but would not be 

134 # serialized 

135 __slots__ = ["_options"] 

136 

137 def __init__(self): 

138 self._options = {} 

139 

140 def __eq__(self, other): 

141 if not isinstance(other, Options): 

142 return NotImplemented 

143 # this implementation is much easier than implementing equality on 

144 # StringOption etc 

145 return self.encode() == other.encode() 

146 

147 def __repr__(self): 

148 text = ", ".join( 

149 "%s: %s" % (OptionNumber(k), " / ".join(map(str, v))) 

150 for (k, v) in self._options.items() 

151 ) 

152 return "<aiocoap.options.Options at %#x: %s>" % (id(self), text or "empty") 

153 

154 def _repr_html_(self): 

155 if self._options: 

156 n_opt = sum(len(o) for o in self._options.values()) 

157 items = ( 

158 f'<li value="{int(k)}">{OptionNumber(k)._repr_html_()}: {", ".join(vi._repr_html_() for vi in v)}' 

159 for (k, v) in sorted(self._options.items()) 

160 ) 

161 return f"""<details><summary style="display:list-item">{n_opt} option{'s' if n_opt != 1 else ''}</summary><ol>{''.join(items)}</ol></details>""" 

162 else: 

163 return "<div>No options</div>" 

164 

165 def decode(self, rawdata): 

166 """Passed a CoAP message body after the token as rawdata, fill self 

167 with the options starting at the beginning of rawdata, an return the 

168 rest of the message (the body).""" 

169 option_number = OptionNumber(0) 

170 

171 while rawdata: 

172 if rawdata[0] == 0xFF: 

173 return rawdata[1:] 

174 dllen = rawdata[0] 

175 delta = (dllen & 0xF0) >> 4 

176 length = dllen & 0x0F 

177 rawdata = rawdata[1:] 

178 (delta, rawdata) = _read_extended_field_value(delta, rawdata) 

179 (length, rawdata) = _read_extended_field_value(length, rawdata) 

180 option_number += delta 

181 if len(rawdata) < length: 

182 raise UnparsableMessage("Option announced but absent") 

183 option = option_number.create_option(decode=rawdata[:length]) 

184 self.add_option(option) 

185 rawdata = rawdata[length:] 

186 return b"" 

187 

188 def encode(self): 

189 """Encode all options in option header into string of bytes.""" 

190 data = [] 

191 current_opt_num = 0 

192 for option in self.option_list(): 

193 optiondata = option.encode() 

194 

195 delta, extended_delta = _write_extended_field_value( 

196 option.number - current_opt_num 

197 ) 

198 length, extended_length = _write_extended_field_value(len(optiondata)) 

199 

200 data.append(bytes([((delta & 0x0F) << 4) + (length & 0x0F)])) 

201 data.append(extended_delta) 

202 data.append(extended_length) 

203 data.append(optiondata) 

204 

205 current_opt_num = option.number 

206 

207 return b"".join(data) 

208 

209 def add_option(self, option): 

210 """Add option into option header.""" 

211 self._options.setdefault(option.number, []).append(option) 

212 

213 def delete_option(self, number): 

214 """Delete option from option header.""" 

215 if number in self._options: 

216 self._options.pop(number) 

217 

218 def get_option(self, number): 

219 """Get option with specified number.""" 

220 return self._options.get(number, ()) 

221 

222 def option_list(self): 

223 return chain.from_iterable( 

224 sorted(self._options.values(), key=lambda x: x[0].number) 

225 ) 

226 

227 uri_path = _items_view(OptionNumber.URI_PATH) 

228 uri_query = _items_view(OptionNumber.URI_QUERY) 

229 location_path = _items_view(OptionNumber.LOCATION_PATH) 

230 location_query = _items_view(OptionNumber.LOCATION_QUERY) 

231 block2 = _single_value_view(OptionNumber.BLOCK2) 

232 block1 = _single_value_view(OptionNumber.BLOCK1) 

233 content_format = _single_value_view(OptionNumber.CONTENT_FORMAT) 

234 etag = _single_value_view(OptionNumber.ETAG, "Single ETag as used in responses") 

235 etags = _items_view(OptionNumber.ETAG, "List of ETags as used in requests") 

236 if_none_match = _empty_presence_view(OptionNumber.IF_NONE_MATCH) 

237 observe = _single_value_view(OptionNumber.OBSERVE) 

238 accept = _single_value_view(OptionNumber.ACCEPT) 

239 uri_host = _single_value_view(OptionNumber.URI_HOST) 

240 uri_port = _single_value_view(OptionNumber.URI_PORT) 

241 proxy_uri = _single_value_view(OptionNumber.PROXY_URI) 

242 proxy_scheme = _single_value_view(OptionNumber.PROXY_SCHEME) 

243 size1 = _single_value_view(OptionNumber.SIZE1) 

244 oscore = _single_value_view(OptionNumber.OSCORE) 

245 object_security = _single_value_view( 

246 OptionNumber.OSCORE, deprecated="Use `oscore` instead of `object_security`" 

247 ) 

248 max_age = _single_value_view(OptionNumber.MAX_AGE) 

249 if_match = _items_view(OptionNumber.IF_MATCH) 

250 no_response = _single_value_view(OptionNumber.NO_RESPONSE) 

251 echo = _single_value_view(OptionNumber.ECHO) 

252 request_tag = _items_view(OptionNumber.REQUEST_TAG) 

253 hop_limit = _single_value_view(OptionNumber.HOP_LIMIT) 

254 request_hash = _single_value_view( 

255 OptionNumber.REQUEST_HASH, 

256 "Experimental property for draft-amsuess-core-cachable-oscore", 

257 ) 

258 edhoc = _empty_presence_view(OptionNumber.EDHOC) 

259 size2 = _single_value_view(OptionNumber.SIZE2)