Coverage for aiocoap/optiontypes.py: 84%

119 statements  

« prev     ^ index     » next       coverage.py v7.6.8, created at 2024-11-28 12:34 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5import abc 

6import collections 

7 

8from .numbers.contentformat import ContentFormat 

9 

10 

11def _to_minimum_bytes(value): 

12 return value.to_bytes((value.bit_length() + 7) // 8, "big") 

13 

14 

class OptionType(metaclass=abc.ABCMeta):
    """Interface for decoding and encoding option values

    Instances of :class:`OptionType` are collected in a list in a
    :attr:`.Message.opt` :class:`.Options` object, and provide a translation
    between the CoAP octet-stream (accessed using the
    :meth:`encode()`/:meth:`decode()` method pair) and the interpreted value
    (accessed via the :attr:`value` attribute).

    Note that OptionType objects usually don't need to be handled by library
    users; the recommended way to read and set options is via the Options
    object's properties (eg. ``message.opt.uri_path = ('.well-known',
    'core')``)."""

    @abc.abstractmethod
    def __init__(self, number, value):
        """Set the `self.number` and `self.value` attributes"""

    @abc.abstractmethod
    def encode(self):
        """Return the option's value in serialized form"""

    @abc.abstractmethod
    def decode(self, rawdata):
        """Set the option's value from the bytes in rawdata"""

    def _repr_html_(self):
        """Return the HTML-escaped ``repr()`` of this option inside a
        ``<tt>`` element."""
        from html import escape

        escaped = escape(repr(self))
        return "<tt>" + escaped + "</tt>"

45 

46 

class StringOption(OptionType):
    """String CoAP option - used to represent string options. Always encoded in
    UTF8 per CoAP specification."""

    def __init__(self, number, value=""):
        """Store the option number and the text value (empty by default)."""
        self.number = number
        self.value = value

    def encode(self):
        """Return the text value as UTF-8 bytes."""
        # FIXME: actually, this should be utf8 of the net-unicode form (maybe it is)
        return self.value.encode("utf-8")

    def decode(self, rawdata):
        """Replace the value with the UTF-8 decoding of ``rawdata``."""
        text = rawdata.decode("utf-8")
        self.value = text

    def __str__(self):
        """Return the stored value itself."""
        return self.value

    def _repr_html_(self):
        """Return the HTML-escaped ``repr()`` of the value inside a ``<tt>``
        element."""
        from html import escape

        escaped = escape(repr(self.value))
        return "<tt>" + escaped + "</tt>"

70 

71 

class OpaqueOption(OptionType):
    """Opaque CoAP option - used to represent options that just have their
    uninterpreted bytes as value."""

    def __init__(self, number, value=b""):
        """Store the option number and the raw value (empty by default)."""
        self.number = number
        self.value = value

    def encode(self):
        """Return the stored value unchanged."""
        return self.value

    def decode(self, rawdata):
        """Store ``rawdata`` as the value without any conversion."""
        self.value = rawdata

    def __str__(self):
        """Return the ``repr()`` of the stored value."""
        return repr(self.value)

    def _repr_html_(self):
        """Return the value's ``.hex()`` rendering inside a ``<tt>``
        element."""
        hex_digits = self.value.hex()
        return "<tt>" + hex_digits + "</tt>"

92 

93 

class UintOption(OptionType):
    """Uint CoAP option - used to represent integer options."""

    def __init__(self, number, value=0):
        """Store the option number and the integer value (0 by default)."""
        self.number = number
        self.value = value

    def encode(self):
        """Return the value, converted with ``int()``, as big-endian bytes of
        minimal length."""
        as_integer = int(self.value)
        return _to_minimum_bytes(as_integer)

    def decode(self, rawdata):
        """Replace the value with ``rawdata`` read as a big-endian integer."""
        self.value = int.from_bytes(rawdata, byteorder="big")

    def __str__(self):
        """Return ``str()`` of the stored value."""
        return str(self.value)

    def _repr_html_(self):
        """Return the HTML-escaped ``repr()`` of the value inside a ``<tt>``
        element."""
        from html import escape

        escaped = escape(repr(self.value))
        return "<tt>" + escaped + "</tt>"

114 

115 

class TypedOption(OptionType, metaclass=abc.ABCMeta):
    """Base for options whose value is checked against a declared
    :attr:`type` whenever it is assigned."""

    @property
    @abc.abstractmethod
    def type(self) -> type:
        """Checked type of the option"""

    def __init__(self, number, value=None):
        """Store the option number; assign the value through the checking
        setter unless it is ``None``."""
        self.number = number
        # FIXME when is this ever initialized without value?
        if value is None:
            return
        self.value = value

    @property
    def value(self):
        """The stored value; assignments are routed through
        :meth:`_set_from_opt_value`."""
        return self._value

    @value.setter
    def value(self, value):
        self._set_from_opt_value(value)

    def _set_from_opt_value(self, value: object):
        """Convert a value set as ``message.opt.option_name = value`` into the
        stored value. By default, this does an eager isinstance check on the
        value (anticipating that encoding an unsuitable value would otherwise
        fail at a hard-to-debug location).

        Raises ValueError if the value is not an instance of :attr:`type`."""
        if isinstance(value, self.type):
            self._value = value
            return
        raise ValueError(
            f"Setting values of type {type(value)} is not supported on this option"
        )

    def __str__(self):
        """Return ``str()`` of the stored value."""
        return str(self.value)

    def _repr_html_(self):
        """Delegate to the value's own ``_repr_html_`` if it has one;
        otherwise return its HTML-escaped ``repr()`` inside a ``<tt>``
        element."""
        inner = self.value
        if not hasattr(inner, "_repr_html_"):
            import html

            return f"<tt>{html.escape(repr(inner))}</tt>"
        return inner._repr_html_()

154 

155 

class BlockOption(TypedOption):
    """Block CoAP option - special option used only for Block1 and Block2 options.
    Currently it is the only type of CoAP options that has
    internal structure.

    That structure (BlockwiseTuple) covers not only the block options of
    RFC7959, but also the BERT extension of RFC8323. If the reserved size
    exponent 7 is used for purposes incompatible with BERT, the implementor
    might want to look at the context dependent option number
    interpretations which will hopefully be in place for Signaling (7.xx)
    messages by then."""

    class BlockwiseTuple(
        collections.namedtuple(
            "_BlockwiseTuple", ["block_number", "more", "size_exponent"]
        )
    ):
        """The ``(block_number, more, size_exponent)`` triple carried by a
        block option, with derived properties for size and offset."""

        @property
        def size(self):
            """Block size in bytes, ``2 ** (size_exponent + 4)``; the exponent
            is capped at 6, so the BERT exponent 7 reports 1024."""
            return 2 ** (min(self.size_exponent, 6) + 4)

        @property
        def start(self):
            """The byte offset in the body indicated by block number and size.

            Note that this calculation is only valid for descriptive use and
            Block2 control use. The semantics of block_number and size in
            Block1 control use are unrelated (indicating the acknowledged block
            number in the request Block1 size and the server's preferred block
            size), and must not be calculated using this property in that
            case."""
            return self.block_number * self.size

        @property
        def is_bert(self):
            """True if the exponent is recognized to signal a BERT message."""
            return self.size_exponent == 7

        def is_valid_for_payload_size(self, payloadsize):
            """Return whether a payload of ``payloadsize`` bytes is consistent
            with this block description.

            For BERT, a payload with ``more`` set must be a multiple of 1024
            bytes and a final payload may have any length. Otherwise a payload
            with ``more`` set must be exactly :attr:`size` bytes, and a final
            payload at most :attr:`size` bytes."""
            if self.is_bert:
                if self.more:
                    return payloadsize % 1024 == 0
                return True
            else:
                if self.more:
                    return payloadsize == self.size
                else:
                    return payloadsize <= self.size

        def reduced_to(self, maximum_exponent):
            """Return a BlockwiseTuple whose exponent is capped to the given
            maximum_exponent

            >>> initial = BlockOption.BlockwiseTuple(10, 0, 5)
            >>> initial == initial.reduced_to(6)
            True
            >>> initial.reduced_to(3)
            BlockwiseTuple(block_number=40, more=0, size_exponent=3)
            >>> BlockOption.BlockwiseTuple(3, 1, 7).reduced_to(6)
            BlockwiseTuple(block_number=3, more=1, size_exponent=6)
            """
            if maximum_exponent >= self.size_exponent:
                return self
            if maximum_exponent == 6 and self.size_exponent == 7:
                # Exponents 6 and 7 both report a size of 1024 bytes, so the
                # block number carries over unchanged. Build a BlockwiseTuple
                # (not a bare tuple) so that the result keeps its properties
                # like every other return path of this method.
                return type(self)(self.block_number, self.more, 6)
            # Smaller blocks mean proportionally larger block numbers for the
            # same byte offset; exponent 7 counts as 6 for this purpose.
            increasednumber = self.block_number << (
                min(self.size_exponent, 6) - maximum_exponent
            )
            return type(self)(increasednumber, self.more, maximum_exponent)

    type = BlockwiseTuple

    def encode(self):
        """Pack the value into an integer (block number shifted left by four
        bits, the more flag as 0x08, the size exponent in the low bits) and
        return it as big-endian bytes of minimal length."""
        as_integer = (
            (self.value.block_number << 4)
            + (self.value.more * 0x08)
            + self.value.size_exponent
        )
        return _to_minimum_bytes(as_integer)

    def decode(self, rawdata):
        """Read ``rawdata`` as a big-endian integer and split it into block
        number (bits above the low four), more flag (0x08) and size exponent
        (low three bits)."""
        as_integer = int.from_bytes(rawdata, "big")
        self.value = self.BlockwiseTuple(
            block_number=(as_integer >> 4),
            more=bool(as_integer & 0x08),
            size_exponent=(as_integer & 0x07),
        )

    def _set_from_opt_value(self, value):
        """Accept any three-item iterable by converting it into a
        BlockwiseTuple before the type check of the base class runs."""
        # Casting it through the constructor makes it easy to set the option as
        # `(num, more, sz)` without having to pick the type out of a very long
        # module name
        super()._set_from_opt_value(self.type(*value))

247 

248 

class ContentFormatOption(TypedOption):
    """Type of numeric options whose number has :class:`ContentFormat`
    semantics"""

    type = ContentFormat

    def encode(self):
        """Return the value, converted with ``int()``, as big-endian bytes of
        minimal length."""
        number = int(self.value)
        return _to_minimum_bytes(number)

    def decode(self, rawdata):
        """Read ``rawdata`` as a big-endian integer and store the
        :class:`ContentFormat` built from it."""
        number = int.from_bytes(rawdata, byteorder="big")
        # Stored on _value directly rather than through the value setter.
        self._value = ContentFormat(number)

    def _set_from_opt_value(self, value):
        """Pass ``value`` through the :class:`ContentFormat` constructor
        before handing it to the type check of the base class."""
        converted = ContentFormat(value)
        super()._set_from_opt_value(converted)