Coverage for aiocoap/resourcedirectory/client/register.py: 14%
164 statements
« prev ^ index » next coverage.py v7.6.8, created at 2024-11-28 12:34 +0000
« prev ^ index » next coverage.py v7.6.8, created at 2024-11-28 12:34 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5"""Client components for registering with a resource-directory server"""
7import asyncio
8from urllib.parse import urljoin, urlparse
9import logging
11from socket import getfqdn
13from ...util.linkformat import link_header
15from ...message import Message
16from ...numbers import GET, POST, DELETE, SERVICE_UNAVAILABLE, NOT_FOUND
18__all__ = ["Registerer"]
21class Registerer:
22 """Implementation of the client side of the registration of a resource
23 directory. Until the object is :meth:`shut down <shutdown>`, it keeps the
24 registration alive. It works both for registering the own context as well
25 as for registering others (taking the role of a commissioning tool).
27 The :attr:`state` attribute is kept up to date with an informal
28 representation of whether the registration is currently active.
30 If any step in the registration fails, the object will not retry
31 indefinitely, and it will back off to earlier steps; after a limited number
32 of retries after the last successful step, the object permanently enters a
33 failed state. (In future extension, it might listen for external events
34 that allow it to restart heuristics, like a new network interface coming
35 up).
37 The registration does not observe the resource list of the registered host
38 (yet?), so registrations are only kept alive and never updated."""
40 def __init__(
41 self,
42 context,
43 rd=None,
44 lt=90000,
45 name_from_hostname=None,
46 link_source=None,
47 registration_parameters={},
48 loggername="coap-rd-registerer",
49 ):
50 """Use a ``context`` to create a registration at the Resource
51 directiory at ``rd`` (defaulting to "find an RD yourself"; URIs should
52 have no path component, unless the user wishes to sidestep the URI
53 discovery step). It will be renewed every `lifetime` seconds.
55 The registration data will be obtained by querying the context's site's
56 ``.well-known/core`` resource, unless another source URI is given in
57 ``link_source``, in which case this object acts as a Commissioning
58 Tool.
60 Parameters to pass with the registration can be given in
61 ``registration_parameters``. ``lt`` and ``base`` default to the
62 constructor arguments ``lt`` and ``link_source``, respectively.
64 If ``name_from_hostname`` is True (or by default if ``ep`` is not
65 present in ``registration_parameters``), the ``ep`` and ``d``
66 registration parameters are derived from the host name."""
68 self._context = context
70 self._link_source = None
71 self._link_data = None #: Message
72 self._lt = lt
73 self._initial_rd = rd
75 self._directory_resource = None
76 self._registration_resource = None
78 self._registration_parameters = dict(registration_parameters)
80 if name_from_hostname or (
81 name_from_hostname is None and "ep" not in registration_parameters
82 ):
83 ep, _, d = getfqdn().partition(".")
84 self._registration_parameters["ep"] = ep
85 if d:
86 self._registration_parameters["d"] = d
88 self.log = logging.getLogger(loggername)
90 self._set_state("starting")
91 self._task = asyncio.create_task(self._run())
93 def __repr__(self):
94 return "<%s at %#x: registering at %s as %s (currently %s)>" % (
95 type(self).__name__,
96 id(self),
97 self._registration_resource or self._directory_resource or self._initial_rd,
98 self._registration_parameters,
99 self.state,
100 )
102 def _set_state(self, newstate):
103 self.log.debug("Entering state %s", newstate)
104 self.state = newstate
106 async def _fill_directory_resource(self, blacklist=set()):
107 # FIXME: this should at some point catch network errors (short of
108 # falling back to "RD discovery failed, backing off"), but that needs
109 # falling back to other discovery methods here, and i don't know how
110 # this will be done yet
112 if self._directory_resource is not None:
113 return
115 if self._initial_rd is None:
116 # FIXME: can't access DHCP options generically, dunno about SLAAC.
117 # It seems to be a sane assumption that the best thing to do is to
118 # assume we're on a big host and multicast is cheap here.
119 self._directory_resource = await self._discovery_directory_uri(
120 "coap://[ff05::fd]]", blacklist=blacklist
121 )
122 else:
123 components = urlparse(self._initial_rd)
124 if not components.scheme or not components.netloc:
125 raise ValueError("Explicit RD URIs need to contain scheme and path")
127 if components.path:
128 if self._initial_rd in blacklist:
129 raise self._UnrecoverableError(
130 "Explicitly configured RD was blacklisted"
131 )
132 else:
133 self._directory_resource = self._initial_rd
134 else:
135 self._directory_resource = await self._discovery_directory_uri(
136 self._initial_rd, blacklist=blacklist
137 )
139 async def _discovery_directory_uri(self, host, blacklist=set()):
140 lookup_uri = urljoin(host, "/.well-known/core?rt=core.rd")
142 try:
143 # FIXME: this should be able to deal with multicasts
144 response = await self._context.request(
145 Message(code=GET, uri=lookup_uri, accept=40)
146 ).response_raising
147 links = link_header.parse(response.payload.decode("utf8"))
148 except (UnicodeDecodeError, link_header.ParseException):
149 self.log.error("Error parsing the RD's self description")
150 raise
152 addresses = [
153 link.get_target(response.get_request_uri())
154 for link in links.links
155 if "core.rd" in " ".join(link.rt).split(" ")
156 ]
157 unfiltered_addresses = len(addresses)
158 addresses = [a for a in addresses if a not in blacklist]
159 if not addresses:
160 if len(addresses) != unfiltered_addresses:
161 raise self._UnrecoverableError(
162 "All discovered Directory Resources are blacklisted"
163 )
164 else:
165 raise self._UnrecoverableError(
166 "No registration interface found in RD's response"
167 )
169 if len(addresses) > 1:
170 self.log.warn(
171 "More than one registration interface found," " picking the first"
172 )
174 return addresses[0]
176 async def _obtain_link_data(self):
177 """Store a message describing the data to be POSTed to the
178 registration interface.
180 This needs to be in :class:`Message` format, but doesn't need to have
181 any particular code set yet (that gets set later anyway), so in effect,
182 the response message from the con can be returned as is.
183 """
185 if self._link_source is None:
186 self._link_data = Message(
187 content_format=40,
188 payload=str(
189 self._context.serversite.get_resources_as_linkheader()
190 ).encode("utf8"),
191 )
193 else:
194 self._link_data = await self._context.request(
195 Message(code=GET, uri=urljoin(self._link_source, "/.well-known/core"))
196 ).response_raising
198 class _RetryableError(RuntimeError):
199 """Raised when an initial registration or update rails in a way that
200 warrants rediscovery of the RD"""
202 class _UnrecoverableError(RuntimeError):
203 """Raised when the RD registration process runs out of options
204 (typically with a descriptive message)"""
206 async def _request_with_retries(self, message):
207 # FIXME: response_nonraising gives 5.00 now, but for debugging we might
208 # want to show something better, and for URI discovery, we should not
209 # consider this a final error
210 response = await self._context.request(message).response_nonraising
212 unavailable_retries = 0
213 while response.code == SERVICE_UNAVAILABLE and response.opt.max_age is not None:
214 if unavailable_retries > 6:
215 raise self._RetryableError(
216 "RD responded with Service Unavailable too often"
217 )
218 self.log.info("RD asked to retry the operation later")
219 await asyncio.sleep(max(response.opt.max_age, 2 ** (unavailable_retries)))
220 response = await self._context.request(message).response_nonraising
222 return response
224 async def _register(self):
225 initial_message = self._link_data.copy(code=POST, uri=self._directory_resource)
226 base_query = {}
227 if self._lt != 90000:
228 base_query["lt"] = str(self._lt)
229 if self._link_source is not None:
230 base_query["base"] = self._link_source
231 query = dict(base_query, **self._registration_parameters)
233 initial_message.opt.uri_query = initial_message.opt.uri_query + tuple(
234 "%s=%s" % (k, v) for (k, v) in query.items()
235 )
237 response = await self._request_with_retries(initial_message)
239 if not response.code.is_successful():
240 raise self._RetryableError(
241 "RD responded with odd error: %s / %r"
242 % (response.code, response.payload)
243 )
245 if not response.opt.location_path:
246 raise self._RetryableError("RD responded without a location")
248 # FIXME this should probably be available from the API, and consider location_query etc
249 self._registration_resource = urljoin(
250 response.get_request_uri(), "/" + "/".join(response.opt.location_path)
251 )
253 async def _renew_registration(self):
254 update_message = Message(code=POST, uri=self._registration_resource)
256 response = await self._request_with_retries(update_message)
258 if response.code == NOT_FOUND:
259 raise self._RetryableError("RD forgot about the registration")
261 if not response.code.is_successful():
262 raise self._RetryableError(
263 "RD responded with odd error: %s / %r"
264 % (response.code, response.payload)
265 )
267 async def _run(self):
268 obtain = asyncio.create_task(self._obtain_link_data())
270 try:
271 await self._run_inner(obtain)
272 except asyncio.CancelledError:
273 self._set_state("cancelled")
274 pass
275 except self._UnrecoverableError as e:
276 self._set_state("failed")
277 self.log.error("Aborting RD discovery: %s", e.args[0])
278 except Exception as e:
279 self._set_state("failed")
280 self.log.error(
281 "An error occurred during RD registration, not pursuing registration any further:",
282 exc_info=e,
283 )
284 finally:
285 obtain.cancel()
287 async def _run_inner(self, obtain):
288 errors = 0
289 errors_max = 5
290 failed_initialization = set()
291 try_reuse_discovery = False
293 while True:
294 if try_reuse_discovery:
295 try_reuse_discovery = False
296 else:
297 self._set_state("discovering")
299 for i in range(4):
300 if i:
301 self.log.info("Waiting to retry RD discovery")
302 await asyncio.sleep(2 * 3 ** (i - 1)) # arbitrary fall-off
303 await self._fill_directory_resource(blacklist=failed_initialization)
304 break
305 else:
306 self.log.error("Giving up RD discovery")
307 break
309 await obtain
311 self._set_state("registering")
313 try:
314 await self._register()
315 except self._RetryableError as e:
316 errors += 1
317 if errors < errors_max:
318 self.log.warning(
319 "Initial registration failed (%s), blacklisting RD URI and retrying discovery",
320 e,
321 )
322 failed_initialization.add(self._directory_resource)
323 self._directory_resource = None
324 continue
325 else:
326 self.log.error(
327 "Giving up after too many failed initial registrations"
328 )
329 break
331 # registration is active, keep it that way.
333 # things look good enough to forget about past bad experiences.
334 # could move this to the end of the following loop if worries come
335 # up of having picked a bad RD that supports registration but not
336 # registration updates
337 errors = 0
338 failed_initialization = set()
339 try_reuse_discovery = True
341 while True:
342 self._set_state("registered")
344 # renew 60 seconds before timeout, unless that's before the 75% mark (then wait for that)
345 await asyncio.sleep(
346 self._lt - 60 if self._lt > 240 else self._lt * 3 // 4
347 )
349 self._set_state("renewing")
351 try:
352 await self._renew_registration()
353 except self._RetryableError as e:
354 self.log.warning(
355 "Registration update failed (%s), retrying with new registration",
356 e,
357 )
358 break
360 async def shutdown(self):
361 """Delete the registration. This will not raise any resulting error
362 messages but just log them, same as any errors occurring during the
363 registration will only be logged."""
364 self._task.cancel()
366 if self._registration_resource is None:
367 return
369 try:
370 await self._context.request(
371 Message(code=DELETE, uri=self._registration_resource)
372 ).response_raising
373 except Exception as e:
374 self.log.error("Error deregistering from the RD", exc_info=e)