Coverage for aiocoap / tokenmanager.py: 88%

116 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-17 12:28 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5import functools 

6import random 

7 

8from . import error 

9from . import interfaces 

10 

11from .pipe import Pipe 

12 

13 

14class TokenManager(interfaces.RequestInterface, interfaces.TokenManager): 

15 token_interface: interfaces.TokenInterface 

16 # needs to be set post-construction, because the token_interface in its constructor already needs to get its manager 

17 

18 def __init__(self, context): 

19 self.context = context 

20 

21 self._token = random.randint(0, 65535) 

22 self.outgoing_requests = {} 

23 """Unfinished outgoing requests (identified by token and remote)""" 

24 self.incoming_requests = {} 

25 """Unfinished incoming requests. 

26 

27 ``(token, remote): (Pipe, stopper)`` where stopper is a 

28 function unregisters the Pipe event handler and thus 

29 indicates to the server the discontinued interest""" 

30 

31 self.log = self.context.log 

32 self.loop = self.context.loop 

33 

34 def __repr__(self): 

35 return "<%s for %s>" % ( 

36 type(self).__name__, 

37 getattr(self, "token_interface", "(unbound)"), 

38 ) 

39 

40 @property 

41 def client_credentials(self): 

42 return self.context.client_credentials 

43 

44 async def shutdown(self): 

45 while self.incoming_requests: 

46 key = next(iter(self.incoming_requests.keys())) 

47 (_, stop) = self.incoming_requests.pop(key) 

48 # This cancels them, not sending anything. 

49 # 

50 # FIXME should we? (RST? 5.00 Server Shutdown? An RST would only 

51 # work if we pushed this further down the shutdown chain; a 5.00 we 

52 # could raise in the task.) 

53 stop() 

54 self.incoming_requests = None 

55 

56 while self.outgoing_requests: 

57 key = next(iter(self.outgoing_requests.keys())) 

58 request = self.outgoing_requests.pop(key) 

59 request.add_exception(error.LibraryShutdown()) 

60 self.outgoing_requests = None 

61 

62 await self.token_interface.shutdown() 

63 

64 def next_token(self): 

65 """Reserve and return a new Token for request.""" 

66 # TODO: add proper Token handling 

67 self._token = (self._token + 1) % (2**64) 

68 return self._token.to_bytes(8, "big").lstrip(b"\0") 

69 

70 # 

71 # implement the tokenmanager interface 

72 # 

73 

74 def dispatch_error(self, exception, remote): 

75 if self.outgoing_requests is None: 

76 # Not entirely sure where it is so far; better just raise a warning 

77 # than an exception later, nothing terminally bad should come of 

78 # this error. 

79 self.log.warning( 

80 "Internal shutdown sequence msismatch: error dispatched through tokenmanager after shutdown" 

81 ) 

82 return 

83 

84 # NetworkError is what we promise users to raise from request etc; if 

85 # it's already a NetworkError and possibly more descriptive (eg. a 

86 # TimeoutError), we'll just let it through (and thus allow 

87 # differentiated handling eg. in application-level retries). 

88 if not isinstance(exception, error.NetworkError): 

89 cause = exception 

90 exception = error.NetworkError(str(exception)) 

91 exception.__cause__ = cause 

92 

93 # The stopping calls would pop items from the pending requests -- 

94 # iterating once, extracting the stoppers and then calling them en 

95 # batch 

96 stoppers = [] 

97 for key, request in self.outgoing_requests.items(): 

98 (token, request_remote) = key 

99 if request_remote == remote: 

100 stoppers.append( 

101 lambda request=request, exception=exception: request.add_exception( 

102 exception 

103 ) 

104 ) 

105 

106 for (_, _r), (_, stopper) in self.incoming_requests.items(): 

107 if remote == _r: 

108 stoppers.append(stopper) 

109 for stopper in stoppers: 

110 stopper() 

111 

112 def process_request(self, request): 

113 key = (request.token, request.remote) 

114 

115 if key in self.incoming_requests: 

116 # This is either a "I consider that token invalid, probably forgot 

117 # about it, but here's a new request" or renewed interest in an 

118 # observation, which gets modelled as a new request at thislevel 

119 self.log.debug("Incoming request overrides existing request") 

120 # Popping: FIXME Decide if one of them is sufficient (see `del self.incoming_requests[key]` below) 

121 (pipe, stop) = self.incoming_requests.pop(key) 

122 stop() 

123 

124 pipe = Pipe(request, self.log) 

125 

126 # FIXME: what can we pass down to the token_interface? certainly not 

127 # the request, but maybe the request with a response filter applied? 

128 def on_event(ev): 

129 if ev.message is not None: 

130 m = ev.message 

131 # FIXME: should this code warn if token or remote are set? 

132 m.token = request.token 

133 m.remote = request.remote.as_response_address() 

134 

135 # The token interface may use information from that, eg. 

136 # whether the request was sent reliably or not. 

137 m.request = request 

138 

139 self.token_interface.send_message( 

140 m, 

141 # No more interest from *that* remote; as it's the only 

142 # thing keeping the PR alive, it'll go its course of 

143 # vanishing for lack of interest (as it would if 

144 # stop were called from its other possible caller, 

145 # the start of process_request when a new request comes 

146 # in on the same token) 

147 stop, 

148 ) 

149 else: 

150 # It'd be tempting to raise here, but typically being called 

151 # from a task, it wouldn't propagate any further either, and at 

152 # least here we have a logger. 

153 self.log.error( 

154 "Requests shouldn't receive errors at the level of a TokenManager any more, but this did: %s", 

155 ev, 

156 ) 

157 if not ev.is_last: 

158 return True 

159 

160 def on_end(): 

161 if key in self.incoming_requests: 

162 # It may not be, especially if it was popped in `(pipe, stop) = self.incoming_requests.pop(keyu)` above 

163 # FIXME Decide if one of them is sufficient 

164 del self.incoming_requests[key] 

165 # no further cleanup to do here: any piggybackable ack was already flushed 

166 # out by the first response, and if there was not even a 

167 # NoResponse, something went wrong above (and we can't tell easily 

168 # here). 

169 

170 stop = pipe.on_event(on_event) 

171 pipe.on_interest_end(on_end) 

172 

173 self.incoming_requests[key] = (pipe, stop) 

174 

175 self.context.render_to_pipe(pipe) 

176 

177 def process_response(self, response): 

178 key = (response.token, response.remote) 

179 if key not in self.outgoing_requests: 

180 # maybe it was a multicast... 

181 key = (response.token, None) 

182 

183 try: 

184 request = self.outgoing_requests[key] 

185 except KeyError: 

186 self.log.info("Response %r could not be matched to any request", response) 

187 return False 

188 else: 

189 self.log.debug("Response %r matched to request %r", response, request) 

190 

191 # FIXME: there's a multicast aspect to that as well 

192 # 

193 # Is it necessary to look into .opt.observe here, wouldn't that better 

194 # be done by the higher-level code that knows about CoAP options? 

195 # Maybe, but at some point in TokenManager we *have* to look into the 

196 # options to see whether to expect a short- or long-running token. 

197 # Still, it would be an option not to send an is_last here and *always* 

198 # have the higher-level code indicate loss of interest in that exchange 

199 # when it detects that no more observations will follow. 

200 final = not ( 

201 request.request.opt.observe == 0 and response.opt.observe is not None 

202 ) 

203 

204 if final: 

205 self.outgoing_requests.pop(key) 

206 

207 request.add_response(response, is_last=final) 

208 return True 

209 

210 # 

211 # implement RequestInterface 

212 # 

213 

214 async def fill_or_recognize_remote(self, message): 

215 return await self.token_interface.fill_or_recognize_remote(message) 

216 

217 def request(self, request): 

218 if self.outgoing_requests is None: 

219 request.add_exception(error.LibraryShutdown()) 

220 return 

221 

222 msg = request.request 

223 

224 assert msg.code.is_request(), "Message code is not valid for request" 

225 

226 # This might easily change, but right now, relying on the Context to 

227 # fill_remote early makes steps easier here. 

228 assert msg.remote is not None, "Remote not pre-populated" 

229 

230 # FIXME: pick a suitably short one where available, and a longer one 

231 # for observations if many short ones are already in-flight 

232 msg.token = self.next_token() 

233 

234 self.log.debug( 

235 "Sending request - Token: %s, Remote: %s", msg.token.hex(), msg.remote 

236 ) 

237 

238 # A request sent over the multicast interface will only return a single 

239 # response and otherwise behave quite like an anycast request (which is 

240 # probably intended). 

241 if msg.remote.is_multicast: 

242 self.log.warning("Sending request to multicast via unicast request method") 

243 key = (msg.token, None) 

244 else: 

245 key = (msg.token, msg.remote) 

246 

247 self.outgoing_requests[key] = request 

248 request.on_interest_end( 

249 functools.partial(self.outgoing_requests.pop, key, None) 

250 ) 

251 

252 try: 

253 send_canceller = self.token_interface.send_message( 

254 msg, lambda: request.add_exception(error.MessageError) 

255 ) 

256 except Exception as e: 

257 request.add_exception(e) 

258 return 

259 

260 if send_canceller is not None: 

261 # This needs to be called both when the requester cancels the 

262 # request, and when a response to the CON request comes in via a 

263 # different CON when the original ACK was lost, so the retransmits 

264 # can stop. 

265 # 

266 # FIXME: This might need a little sharper conditions: A fresh CON 

267 # should be sufficient to stop retransmits of a CON in a first 

268 # request, but when refreshing an observation, only an ACK tells us 

269 # that the updated observation got through. Also, multicast needs 

270 # to be an exception, but that generally needs handling here. 

271 # 

272 # It may be that it'd be wise to reduce the use of send_canceller 

273 # to situations when the request is actually cancelled, and pass 

274 # some information to the token_interface about whether it should 

275 # keep an eye out for responses on that token and cancel 

276 # transmission accordingly. 

277 request.on_event(lambda ev: (send_canceller(), False)[1], is_interest=False)