Coverage for aiocoap/tokenmanager.py: 88% (116 statements)
« prev ^ index » next — coverage.py v7.13.5, created at 2026-04-17 12:28 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5import functools
6import random
8from . import error
9from . import interfaces
11from .pipe import Pipe
class TokenManager(interfaces.RequestInterface, interfaces.TokenManager):
    """Match request and response messages by their (token, remote) pair.

    Keeps registries of unfinished outgoing and incoming exchanges, hands out
    fresh tokens for outgoing requests, and dispatches transport errors to all
    exchanges affected by a broken remote.
    """

    token_interface: interfaces.TokenInterface
    # needs to be set post-construction, because the token_interface in its constructor already needs to get its manager
18 def __init__(self, context):
19 self.context = context
21 self._token = random.randint(0, 65535)
22 self.outgoing_requests = {}
23 """Unfinished outgoing requests (identified by token and remote)"""
24 self.incoming_requests = {}
25 """Unfinished incoming requests.
27 ``(token, remote): (Pipe, stopper)`` where stopper is a
28 function unregisters the Pipe event handler and thus
29 indicates to the server the discontinued interest"""
31 self.log = self.context.log
32 self.loop = self.context.loop
34 def __repr__(self):
35 return "<%s for %s>" % (
36 type(self).__name__,
37 getattr(self, "token_interface", "(unbound)"),
38 )
    @property
    def client_credentials(self):
        """Client credentials store, delegated to the owning context."""
        return self.context.client_credentials
44 async def shutdown(self):
45 while self.incoming_requests:
46 key = next(iter(self.incoming_requests.keys()))
47 (_, stop) = self.incoming_requests.pop(key)
48 # This cancels them, not sending anything.
49 #
50 # FIXME should we? (RST? 5.00 Server Shutdown? An RST would only
51 # work if we pushed this further down the shutdown chain; a 5.00 we
52 # could raise in the task.)
53 stop()
54 self.incoming_requests = None
56 while self.outgoing_requests:
57 key = next(iter(self.outgoing_requests.keys()))
58 request = self.outgoing_requests.pop(key)
59 request.add_exception(error.LibraryShutdown())
60 self.outgoing_requests = None
62 await self.token_interface.shutdown()
64 def next_token(self):
65 """Reserve and return a new Token for request."""
66 # TODO: add proper Token handling
67 self._token = (self._token + 1) % (2**64)
68 return self._token.to_bytes(8, "big").lstrip(b"\0")
70 #
71 # implement the tokenmanager interface
72 #
74 def dispatch_error(self, exception, remote):
75 if self.outgoing_requests is None:
76 # Not entirely sure where it is so far; better just raise a warning
77 # than an exception later, nothing terminally bad should come of
78 # this error.
79 self.log.warning(
80 "Internal shutdown sequence msismatch: error dispatched through tokenmanager after shutdown"
81 )
82 return
84 # NetworkError is what we promise users to raise from request etc; if
85 # it's already a NetworkError and possibly more descriptive (eg. a
86 # TimeoutError), we'll just let it through (and thus allow
87 # differentiated handling eg. in application-level retries).
88 if not isinstance(exception, error.NetworkError):
89 cause = exception
90 exception = error.NetworkError(str(exception))
91 exception.__cause__ = cause
93 # The stopping calls would pop items from the pending requests --
94 # iterating once, extracting the stoppers and then calling them en
95 # batch
96 stoppers = []
97 for key, request in self.outgoing_requests.items():
98 (token, request_remote) = key
99 if request_remote == remote:
100 stoppers.append(
101 lambda request=request, exception=exception: request.add_exception(
102 exception
103 )
104 )
106 for (_, _r), (_, stopper) in self.incoming_requests.items():
107 if remote == _r:
108 stoppers.append(stopper)
109 for stopper in stoppers:
110 stopper()
    def process_request(self, request):
        """Handle an incoming request message: build a Pipe, wire its event
        handlers to the token interface, register it, and hand it to the
        context for rendering.

        A request arriving on a (token, remote) pair that already has a
        pending exchange replaces that exchange.
        """
        key = (request.token, request.remote)

        if key in self.incoming_requests:
            # This is either a "I consider that token invalid, probably forgot
            # about it, but here's a new request" or renewed interest in an
            # observation, which gets modelled as a new request at this level
            self.log.debug("Incoming request overrides existing request")
            # Popping: FIXME Decide if one of them is sufficient (see `del self.incoming_requests[key]` below)
            (pipe, stop) = self.incoming_requests.pop(key)
            stop()

        pipe = Pipe(request, self.log)

        # FIXME: what can we pass down to the token_interface? certainly not
        # the request, but maybe the request with a response filter applied?
        def on_event(ev):
            # Forward each outgoing event message; returning True keeps the
            # handler registered for non-final events.
            if ev.message is not None:
                m = ev.message
                # FIXME: should this code warn if token or remote are set?
                m.token = request.token
                m.remote = request.remote.as_response_address()

                # The token interface may use information from that, eg.
                # whether the request was sent reliably or not.
                m.request = request

                self.token_interface.send_message(
                    m,
                    # No more interest from *that* remote; as it's the only
                    # thing keeping the PR alive, it'll go its course of
                    # vanishing for lack of interest (as it would if
                    # stop were called from its other possible caller,
                    # the start of process_request when a new request comes
                    # in on the same token)
                    stop,
                )
            else:
                # It'd be tempting to raise here, but typically being called
                # from a task, it wouldn't propagate any further either, and at
                # least here we have a logger.
                self.log.error(
                    "Requests shouldn't receive errors at the level of a TokenManager any more, but this did: %s",
                    ev,
                )
            if not ev.is_last:
                return True

        def on_end():
            # Deregister once interest in the pipe ends.
            if key in self.incoming_requests:
                # It may not be, especially if it was popped in `(pipe, stop) = self.incoming_requests.pop(key)` above
                # FIXME Decide if one of them is sufficient
                del self.incoming_requests[key]
            # no further cleanup to do here: any piggybackable ack was already flushed
            # out by the first response, and if there was not even a
            # NoResponse, something went wrong above (and we can't tell easily
            # here).

        # NOTE: `stop` must exist before on_event can fire; pipe.on_event
        # returns the unregistration function that the closures capture.
        stop = pipe.on_event(on_event)
        pipe.on_interest_end(on_end)

        self.incoming_requests[key] = (pipe, stop)

        self.context.render_to_pipe(pipe)
177 def process_response(self, response):
178 key = (response.token, response.remote)
179 if key not in self.outgoing_requests:
180 # maybe it was a multicast...
181 key = (response.token, None)
183 try:
184 request = self.outgoing_requests[key]
185 except KeyError:
186 self.log.info("Response %r could not be matched to any request", response)
187 return False
188 else:
189 self.log.debug("Response %r matched to request %r", response, request)
191 # FIXME: there's a multicast aspect to that as well
192 #
193 # Is it necessary to look into .opt.observe here, wouldn't that better
194 # be done by the higher-level code that knows about CoAP options?
195 # Maybe, but at some point in TokenManager we *have* to look into the
196 # options to see whether to expect a short- or long-running token.
197 # Still, it would be an option not to send an is_last here and *always*
198 # have the higher-level code indicate loss of interest in that exchange
199 # when it detects that no more observations will follow.
200 final = not (
201 request.request.opt.observe == 0 and response.opt.observe is not None
202 )
204 if final:
205 self.outgoing_requests.pop(key)
207 request.add_response(response, is_last=final)
208 return True
210 #
211 # implement RequestInterface
212 #
    async def fill_or_recognize_remote(self, message):
        """Resolve or recognize *message*'s remote; delegated to the token interface."""
        return await self.token_interface.fill_or_recognize_remote(message)
    def request(self, request):
        """Send an outgoing request (a Pipe) through the token interface.

        Assigns a fresh token, registers the request so process_response can
        match replies, and forwards the message. Failures are reported through
        the Pipe (add_exception), never raised to the caller.
        """
        if self.outgoing_requests is None:
            # Registry is None only after shutdown(); report through the pipe.
            request.add_exception(error.LibraryShutdown())
            return

        msg = request.request

        assert msg.code.is_request(), "Message code is not valid for request"

        # This might easily change, but right now, relying on the Context to
        # fill_remote early makes steps easier here.
        assert msg.remote is not None, "Remote not pre-populated"

        # FIXME: pick a suitably short one where available, and a longer one
        # for observations if many short ones are already in-flight
        msg.token = self.next_token()

        self.log.debug(
            "Sending request - Token: %s, Remote: %s", msg.token.hex(), msg.remote
        )

        # A request sent over the multicast interface will only return a single
        # response and otherwise behave quite like an anycast request (which is
        # probably intended).
        if msg.remote.is_multicast:
            self.log.warning("Sending request to multicast via unicast request method")
            # remote=None lets process_response match a reply from any group member.
            key = (msg.token, None)
        else:
            key = (msg.token, msg.remote)

        self.outgoing_requests[key] = request
        # Auto-deregister once interest ends; the None default makes the pop a
        # no-op if the entry was already removed (e.g. by process_response).
        request.on_interest_end(
            functools.partial(self.outgoing_requests.pop, key, None)
        )

        try:
            # NOTE(review): the failure callback passes the exception *class*
            # error.MessageError rather than an instance — presumably
            # add_exception/raise tolerates that; confirm against Pipe.
            send_canceller = self.token_interface.send_message(
                msg, lambda: request.add_exception(error.MessageError)
            )
        except Exception as e:
            request.add_exception(e)
            return

        if send_canceller is not None:
            # This needs to be called both when the requester cancels the
            # request, and when a response to the CON request comes in via a
            # different CON when the original ACK was lost, so the retransmits
            # can stop.
            #
            # FIXME: This might need a little sharper conditions: A fresh CON
            # should be sufficient to stop retransmits of a CON in a first
            # request, but when refreshing an observation, only an ACK tells us
            # that the updated observation got through. Also, multicast needs
            # to be an exception, but that generally needs handling here.
            #
            # It may be that it'd be wise to reduce the use of send_canceller
            # to situations when the request is actually cancelled, and pass
            # some information to the token_interface about whether it should
            # keep an eye out for responses on that token and cancel
            # transmission accordingly.
            request.on_event(lambda ev: (send_canceller(), False)[1], is_interest=False)