Coverage for aiocoap/tokenmanager.py: 88%
115 statements
« prev ^ index » next coverage.py v7.6.8, created at 2024-11-28 12:34 +0000
« prev ^ index » next coverage.py v7.6.8, created at 2024-11-28 12:34 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5import functools
6import random
8from . import error
9from . import interfaces
11# To be used sparingly here: This deals with request / responses on the token
12# layer. But the layer below won't even know that messages are responses, so it
13# can't make the informed decisions we make here.
14from .numbers.types import NON
15from .pipe import Pipe
18class TokenManager(interfaces.RequestInterface, interfaces.TokenManager):
19 def __init__(self, context):
20 self.context = context
22 self._token = random.randint(0, 65535)
23 self.outgoing_requests = {}
24 """Unfinished outgoing requests (identified by token and remote)"""
25 self.incoming_requests = {}
26 """Unfinished incoming requests.
28 ``(token, remote): (Pipe, stopper)`` where stopper is a
29 function unregistes the Pipe event handler and thus
30 indicates to the server the discontinued interest"""
32 self.log = self.context.log
33 self.loop = self.context.loop
35 # self.token_interface = … -- needs to be set post-construction, because the token_interface in its constructor already needs to get its manager
37 def __repr__(self):
38 return "<%s for %s>" % (
39 type(self).__name__,
40 getattr(self, "token_interface", "(unbound)"),
41 )
43 @property
44 def client_credentials(self):
45 return self.context.client_credentials
47 async def shutdown(self):
48 while self.incoming_requests:
49 key = next(iter(self.incoming_requests.keys()))
50 (_, stop) = self.incoming_requests.pop(key)
51 # This cancels them, not sending anything.
52 #
53 # FIXME should we? (RST? 5.00 Server Shutdown? An RST would only
54 # work if we pushed this further down the shutdown chain; a 5.00 we
55 # could raise in the task.)
56 stop()
57 self.incoming_requests = None
59 while self.outgoing_requests:
60 key = next(iter(self.outgoing_requests.keys()))
61 request = self.outgoing_requests.pop(key)
62 request.add_exception(error.LibraryShutdown())
63 self.outgoing_requests = None
65 await self.token_interface.shutdown()
67 def next_token(self):
68 """Reserve and return a new Token for request."""
69 # TODO: add proper Token handling
70 self._token = (self._token + 1) % (2**64)
71 return self._token.to_bytes(8, "big").lstrip(b"\0")
73 #
74 # implement the tokenmanager interface
75 #
77 def dispatch_error(self, exception, remote):
78 if self.outgoing_requests is None:
79 # Not entirely sure where it is so far; better just raise a warning
80 # than an exception later, nothing terminally bad should come of
81 # this error.
82 self.log.warning(
83 "Internal shutdown sequence msismatch: error dispatched through tokenmanager after shutdown"
84 )
85 return
87 # NetworkError is what we promise users to raise from request etc; if
88 # it's already a NetworkError and possibly more descriptive (eg. a
89 # TimeoutError), we'll just let it through (and thus allow
90 # differentiated handling eg. in application-level retries).
91 if not isinstance(exception, error.NetworkError):
92 cause = exception
93 exception = error.NetworkError(str(exception))
94 exception.__cause__ = cause
96 # The stopping calls would pop items from the pending requests --
97 # iterating once, extracting the stoppers and then calling them en
98 # batch
99 stoppers = []
100 for key, request in self.outgoing_requests.items():
101 (token, request_remote) = key
102 if request_remote == remote:
103 stoppers.append(
104 lambda request=request, exception=exception: request.add_exception(
105 exception
106 )
107 )
109 for (_, _r), (_, stopper) in self.incoming_requests.items():
110 if remote == _r:
111 stoppers.append(stopper)
112 for stopper in stoppers:
113 stopper()
115 def process_request(self, request):
116 key = (request.token, request.remote)
118 if key in self.incoming_requests:
119 # This is either a "I consider that token invalid, probably forgot
120 # about it, but here's a new request" or renewed interest in an
121 # observation, which gets modelled as a new request at thislevel
122 self.log.debug("Incoming request overrides existing request")
123 # Popping: FIXME Decide if one of them is sufficient (see `del self.incoming_requests[key]` below)
124 (pipe, stop) = self.incoming_requests.pop(key)
125 stop()
127 pipe = Pipe(request, self.log)
129 # FIXME: what can we pass down to the token_interface? certainly not
130 # the request, but maybe the request with a response filter applied?
131 def on_event(ev):
132 if ev.message is not None:
133 m = ev.message
134 # FIXME: should this code warn if token or remote are set?
135 m.token = request.token
136 m.remote = request.remote.as_response_address()
138 if m.mtype is None and request.mtype is NON:
139 # Default to sending NON to NON requests; rely on the
140 # default (CON if stand-alone else ACK) otherwise.
141 m.mtype = NON
142 self.token_interface.send_message(
143 m,
144 # No more interest from *that* remote; as it's the only
145 # thing keeping the PR alive, it'll go its course of
146 # vanishing for lack of interest (as it would if
147 # stop were called from its other possible caller,
148 # the start of process_request when a new request comes
149 # in on the same token)
150 stop,
151 )
152 else:
153 # It'd be tempting to raise here, but typically being called
154 # from a task, it wouldn't propagate any further either, and at
155 # least here we have a logger.
156 self.log.error(
157 "Requests shouldn't receive errors at the level of a TokenManager any more, but this did: %s",
158 ev,
159 )
160 if not ev.is_last:
161 return True
163 def on_end():
164 if key in self.incoming_requests:
165 # It may not be, especially if it was popped in `(pipe, stop) = self.incoming_requests.pop(keyu)` above
166 # FIXME Decide if one of them is sufficient
167 del self.incoming_requests[key]
168 # no further cleanup to do here: any piggybackable ack was already flushed
169 # out by the first response, and if there was not even a
170 # NoResponse, something went wrong above (and we can't tell easily
171 # here).
173 stop = pipe.on_event(on_event)
174 pipe.on_interest_end(on_end)
176 self.incoming_requests[key] = (pipe, stop)
178 self.context.render_to_pipe(pipe)
180 def process_response(self, response):
181 key = (response.token, response.remote)
182 if key not in self.outgoing_requests:
183 # maybe it was a multicast...
184 key = (response.token, None)
186 try:
187 request = self.outgoing_requests[key]
188 except KeyError:
189 self.log.info("Response %r could not be matched to any request", response)
190 return False
191 else:
192 self.log.debug("Response %r matched to request %r", response, request)
194 # FIXME: there's a multicast aspect to that as well
195 #
196 # Is it necessary to look into .opt.observe here, wouldn't that better
197 # be done by the higher-level code that knows about CoAP options?
198 # Maybe, but at some point in TokenManager we *have* to look into the
199 # options to see whether to expect a short- or long-running token.
200 # Still, it would be an option not to send an is_last here and *always*
201 # have the higher-level code indicate loss of interest in that exchange
202 # when it detects that no more observations will follow.
203 final = not (
204 request.request.opt.observe == 0 and response.opt.observe is not None
205 )
207 if final:
208 self.outgoing_requests.pop(key)
210 request.add_response(response, is_last=final)
211 return True
213 #
214 # implement RequestInterface
215 #
217 async def fill_or_recognize_remote(self, message):
218 return await self.token_interface.fill_or_recognize_remote(message)
220 def request(self, request):
221 msg = request.request
223 assert msg.code.is_request(), "Message code is not valid for request"
225 # This might easily change, but right now, relying on the Context to
226 # fill_remote early makes steps easier here.
227 assert msg.remote is not None, "Remote not pre-populated"
229 # FIXME: pick a suitably short one where available, and a longer one
230 # for observations if many short ones are already in-flight
231 msg.token = self.next_token()
233 self.log.debug(
234 "Sending request - Token: %s, Remote: %s", msg.token.hex(), msg.remote
235 )
237 # A request sent over the multicast interface will only return a single
238 # response and otherwise behave quite like an anycast request (which is
239 # probably intended).
240 if msg.remote.is_multicast:
241 self.log.warning("Sending request to multicast via unicast request method")
242 key = (msg.token, None)
243 else:
244 key = (msg.token, msg.remote)
246 self.outgoing_requests[key] = request
247 request.on_interest_end(
248 functools.partial(self.outgoing_requests.pop, key, None)
249 )
251 try:
252 send_canceller = self.token_interface.send_message(
253 msg, lambda: request.add_exception(error.MessageError)
254 )
255 except Exception as e:
256 request.add_exception(e)
257 return
259 if send_canceller is not None:
260 # This needs to be called both when the requester cancels the
261 # request, and when a response to the CON request comes in via a
262 # different CON when the original ACK was lost, so the retransmits
263 # can stop.
264 #
265 # FIXME: This might need a little sharper conditions: A fresh CON
266 # should be sufficient to stop retransmits of a CON in a first
267 # request, but when refreshing an observation, only an ACK tells us
268 # that the updated observation got through. Also, multicast needs
269 # to be an exception, but that generally needs handling here.
270 #
271 # It may be that it'd be wise to reduce the use of send_canceller
272 # to situations when the request is actualy cancelled, and pass
273 # some information to the token_interface about whether it should
274 # keep an eye out for responses on that token and cancel
275 # transmission accordingly.
276 request.on_event(lambda ev: (send_canceller(), False)[1], is_interest=False)