Coverage for aiocoap/resourcedirectory/client/register.py: 14%

164 statements  

« prev     ^ index     » next       coverage.py v7.6.8, created at 2024-11-28 12:34 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5"""Client components for registering with a resource-directory server""" 

6 

7import asyncio 

8from urllib.parse import urljoin, urlparse 

9import logging 

10 

11from socket import getfqdn 

12 

13from ...util.linkformat import link_header 

14 

15from ...message import Message 

16from ...numbers import GET, POST, DELETE, SERVICE_UNAVAILABLE, NOT_FOUND 

17 

18__all__ = ["Registerer"] 

19 

20 

class Registerer:
    """Implementation of the client side of the registration of a resource
    directory. Until the object is :meth:`shut down <shutdown>`, it keeps the
    registration alive. It works both for registering the own context as well
    as for registering others (taking the role of a commissioning tool).

    The :attr:`state` attribute is kept up to date with an informal
    representation of whether the registration is currently active.

    If any step in the registration fails, the object will not retry
    indefinitely, and it will back off to earlier steps; after a limited number
    of retries after the last successful step, the object permanently enters a
    failed state. (In future extension, it might listen for external events
    that allow it to restart heuristics, like a new network interface coming
    up).

    The registration does not observe the resource list of the registered host
    (yet?), so registrations are only kept alive and never updated."""

    def __init__(
        self,
        context,
        rd=None,
        lt=90000,
        name_from_hostname=None,
        link_source=None,
        registration_parameters=None,
        loggername="coap-rd-registerer",
    ):
        """Use a ``context`` to create a registration at the Resource
        directory at ``rd`` (defaulting to "find an RD yourself"; URIs should
        have no path component, unless the user wishes to sidestep the URI
        discovery step). It will be renewed based on the ``lt`` lifetime
        (in seconds).

        The registration data will be obtained by querying the context's site's
        ``.well-known/core`` resource, unless another source URI is given in
        ``link_source``, in which case this object acts as a Commissioning
        Tool.

        Parameters to pass with the registration can be given in
        ``registration_parameters`` (a mapping; ``None`` means no extra
        parameters). ``lt`` and ``base`` default to the constructor arguments
        ``lt`` and ``link_source``, respectively.

        If ``name_from_hostname`` is True (or by default if ``ep`` is not
        present in ``registration_parameters``), the ``ep`` and ``d``
        registration parameters are derived from the host name.

        The constructor starts the registration as an asyncio task, so it
        needs to be called while an event loop is running."""

        if registration_parameters is None:
            registration_parameters = {}

        self._context = context

        # Keep the caller's link source: both _obtain_link_data and _register
        # branch on it to decide whether this object is a commissioning tool.
        self._link_source = link_source
        self._link_data = None  #: Message
        self._lt = lt
        self._initial_rd = rd

        self._directory_resource = None
        self._registration_resource = None

        # Copied so that later modifications never leak back to the caller.
        self._registration_parameters = dict(registration_parameters)

        if name_from_hostname or (
            name_from_hostname is None and "ep" not in registration_parameters
        ):
            ep, _, d = getfqdn().partition(".")
            self._registration_parameters["ep"] = ep
            if d:
                self._registration_parameters["d"] = d

        self.log = logging.getLogger(loggername)

        self._set_state("starting")
        self._task = asyncio.create_task(self._run())

    def __repr__(self):
        return "<%s at %#x: registering at %s as %s (currently %s)>" % (
            type(self).__name__,
            id(self),
            self._registration_resource or self._directory_resource or self._initial_rd,
            self._registration_parameters,
            self.state,
        )

    def _set_state(self, newstate):
        """Log the transition and store ``newstate`` in :attr:`state`."""
        self.log.debug("Entering state %s", newstate)
        self.state = newstate

    async def _fill_directory_resource(self, blacklist=frozenset()):
        """Ensure ``_directory_resource`` is set, discovering it if needed.

        URIs contained in ``blacklist`` are not accepted as a result. Raises
        :class:`ValueError` if the explicitly configured RD URI lacks a scheme
        or host, and ``_UnrecoverableError`` if no acceptable directory
        resource can be determined."""
        # FIXME: this should at some point catch network errors (short of
        # falling back to "RD discovery failed, backing off"), but that needs
        # falling back to other discovery methods here, and i don't know how
        # this will be done yet

        if self._directory_resource is not None:
            return

        if self._initial_rd is None:
            # FIXME: can't access DHCP options generically, dunno about SLAAC.
            # It seems to be a sane assumption that the best thing to do is to
            # assume we're on a big host and multicast is cheap here.
            self._directory_resource = await self._discovery_directory_uri(
                "coap://[ff05::fd]", blacklist=blacklist
            )
            return

        components = urlparse(self._initial_rd)
        if not components.scheme or not components.netloc:
            raise ValueError("Explicit RD URIs need to contain scheme and host")

        if not components.path:
            # Only a host was given: ask it where its registration interface is.
            self._directory_resource = await self._discovery_directory_uri(
                self._initial_rd, blacklist=blacklist
            )
            return

        # A path was given, so URI discovery is sidestepped entirely.
        if self._initial_rd in blacklist:
            raise self._UnrecoverableError("Explicitly configured RD was blacklisted")
        self._directory_resource = self._initial_rd

    async def _discovery_directory_uri(self, host, blacklist=frozenset()):
        """Query ``host``'s ``.well-known/core`` for a ``core.rd`` resource and
        return the URI of the first one that is not in ``blacklist``.

        Raises ``_UnrecoverableError`` if no usable resource is advertised;
        errors from decoding or parsing the response are logged and re-raised."""
        lookup_uri = urljoin(host, "/.well-known/core?rt=core.rd")

        try:
            # FIXME: this should be able to deal with multicasts
            response = await self._context.request(
                Message(code=GET, uri=lookup_uri, accept=40)
            ).response_raising
            links = link_header.parse(response.payload.decode("utf8"))
        except (UnicodeDecodeError, link_header.ParseException):
            self.log.error("Error parsing the RD's self description")
            raise

        addresses = [
            link.get_target(response.get_request_uri())
            for link in links.links
            if "core.rd" in " ".join(link.rt).split(" ")
        ]
        unfiltered_addresses = len(addresses)
        addresses = [a for a in addresses if a not in blacklist]
        if not addresses:
            # Distinguish "everything was filtered out" from "nothing offered".
            if unfiltered_addresses:
                raise self._UnrecoverableError(
                    "All discovered Directory Resources are blacklisted"
                )
            raise self._UnrecoverableError(
                "No registration interface found in RD's response"
            )

        if len(addresses) > 1:
            # Logger.warn was removed in Python 3.13; warning() is the
            # supported spelling.
            self.log.warning(
                "More than one registration interface found, picking the first"
            )

        return addresses[0]

    async def _obtain_link_data(self):
        """Store a message describing the data to be POSTed to the
        registration interface.

        This needs to be in :class:`Message` format, but doesn't need to have
        any particular code set yet (that gets set later anyway), so in effect,
        the response message from the context can be returned as is.
        """

        if self._link_source is None:
            self._link_data = Message(
                content_format=40,
                payload=str(
                    self._context.serversite.get_resources_as_linkheader()
                ).encode("utf8"),
            )

        else:
            self._link_data = await self._context.request(
                Message(code=GET, uri=urljoin(self._link_source, "/.well-known/core"))
            ).response_raising

    class _RetryableError(RuntimeError):
        """Raised when an initial registration or update fails in a way that
        warrants rediscovery of the RD"""

    class _UnrecoverableError(RuntimeError):
        """Raised when the RD registration process runs out of options
        (typically with a descriptive message)"""

    async def _request_with_retries(self, message):
        """Send ``message`` and return the response, re-sending it while the
        RD answers Service Unavailable with a Max-Age option.

        The wait before each retry is the larger of the announced Max-Age and
        an exponentially growing back-off. Raises ``_RetryableError`` once the
        RD has asked to retry too often."""
        # FIXME: response_nonraising gives 5.00 now, but for debugging we might
        # want to show something better, and for URI discovery, we should not
        # consider this a final error
        response = await self._context.request(message).response_nonraising

        unavailable_retries = 0
        while response.code == SERVICE_UNAVAILABLE and response.opt.max_age is not None:
            if unavailable_retries > 6:
                raise self._RetryableError(
                    "RD responded with Service Unavailable too often"
                )
            self.log.info("RD asked to retry the operation later")
            await asyncio.sleep(max(response.opt.max_age, 2**unavailable_retries))
            # Count the attempt; without this the limit above is never reached
            # and the back-off never grows.
            unavailable_retries += 1
            response = await self._context.request(message).response_nonraising

        return response

    async def _register(self):
        """POST the link data to the directory resource and remember the
        registration resource it returns.

        Raises ``_RetryableError`` if the RD does not answer successfully or
        omits the location of the registration."""
        initial_message = self._link_data.copy(code=POST, uri=self._directory_resource)
        base_query = {}
        if self._lt != 90000:
            # 90000 is treated as the default lifetime and is not sent.
            base_query["lt"] = str(self._lt)
        if self._link_source is not None:
            base_query["base"] = self._link_source
        # Explicit registration parameters take precedence over the defaults.
        query = dict(base_query, **self._registration_parameters)

        initial_message.opt.uri_query = initial_message.opt.uri_query + tuple(
            "%s=%s" % (k, v) for (k, v) in query.items()
        )

        response = await self._request_with_retries(initial_message)

        if not response.code.is_successful():
            raise self._RetryableError(
                "RD responded with odd error: %s / %r"
                % (response.code, response.payload)
            )

        if not response.opt.location_path:
            raise self._RetryableError("RD responded without a location")

        # FIXME this should probably be available from the API, and consider location_query etc
        self._registration_resource = urljoin(
            response.get_request_uri(), "/" + "/".join(response.opt.location_path)
        )

    async def _renew_registration(self):
        """Send an empty POST to the registration resource to keep it alive.

        Raises ``_RetryableError`` if the RD no longer knows the registration
        or answers with any other non-successful code."""
        update_message = Message(code=POST, uri=self._registration_resource)

        response = await self._request_with_retries(update_message)

        if response.code == NOT_FOUND:
            raise self._RetryableError("RD forgot about the registration")

        if not response.code.is_successful():
            raise self._RetryableError(
                "RD responded with odd error: %s / %r"
                % (response.code, response.payload)
            )

    async def _run(self):
        """Task body: run the registration state machine and turn any way of
        leaving it into a state change and a log entry instead of an
        exception."""
        # Link data is fetched concurrently with RD discovery.
        obtain = asyncio.create_task(self._obtain_link_data())

        try:
            await self._run_inner(obtain)
        except asyncio.CancelledError:
            self._set_state("cancelled")
        except self._UnrecoverableError as e:
            self._set_state("failed")
            self.log.error("Aborting RD discovery: %s", e.args[0])
        except Exception as e:
            self._set_state("failed")
            self.log.error(
                "An error occurred during RD registration, not pursuing registration any further:",
                exc_info=e,
            )
        finally:
            obtain.cancel()

    async def _run_inner(self, obtain):
        """Discover the RD, register, and keep renewing the registration.

        ``obtain`` is the task that fills in the link data. Returns after
        entering the "failed" state when the retry budget is used up; any
        other error propagates to :meth:`_run`."""
        errors = 0
        errors_max = 5
        failed_initialization = set()
        try_reuse_discovery = False

        while True:
            if try_reuse_discovery:
                try_reuse_discovery = False
            else:
                self._set_state("discovering")

                for i in range(4):
                    if i:
                        self.log.info("Waiting to retry RD discovery")
                        await asyncio.sleep(2 * 3 ** (i - 1))  # arbitrary fall-off
                    # NOTE: discovery errors are not caught here yet (see the
                    # FIXME in _fill_directory_resource); they propagate to
                    # _run, so later iterations are currently not reached.
                    await self._fill_directory_resource(blacklist=failed_initialization)
                    break
                else:
                    self.log.error("Giving up RD discovery")
                    self._set_state("failed")
                    break

            await obtain

            self._set_state("registering")

            try:
                await self._register()
            except self._RetryableError as e:
                errors += 1
                if errors < errors_max:
                    self.log.warning(
                        "Initial registration failed (%s), blacklisting RD URI and retrying discovery",
                        e,
                    )
                    failed_initialization.add(self._directory_resource)
                    self._directory_resource = None
                    continue
                else:
                    self.log.error(
                        "Giving up after too many failed initial registrations"
                    )
                    # Make the documented "permanently failed" state visible
                    # instead of leaving the state at "registering".
                    self._set_state("failed")
                    break

            # registration is active, keep it that way.

            # things look good enough to forget about past bad experiences.
            # could move this to the end of the following loop if worries come
            # up of having picked a bad RD that supports registration but not
            # registration updates
            errors = 0
            failed_initialization = set()
            try_reuse_discovery = True

            while True:
                self._set_state("registered")

                # renew 60 seconds before timeout, unless that's before the 75% mark (then wait for that)
                await asyncio.sleep(
                    self._lt - 60 if self._lt > 240 else self._lt * 3 // 4
                )

                self._set_state("renewing")

                try:
                    await self._renew_registration()
                except self._RetryableError as e:
                    self.log.warning(
                        "Registration update failed (%s), retrying with new registration",
                        e,
                    )
                    break

    async def shutdown(self):
        """Delete the registration. This will not raise any resulting error
        messages but just log them, same as any errors occurring during the
        registration will only be logged."""
        self._task.cancel()

        if self._registration_resource is None:
            # Nothing was registered (yet), so there is nothing to delete.
            return

        try:
            await self._context.request(
                Message(code=DELETE, uri=self._registration_resource)
            ).response_raising
        except Exception as e:
            self.log.error("Error deregistering from the RD", exc_info=e)