Coverage for aiocoap/tokenmanager.py: 88%

116 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-09-30 11:17 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5import functools 

6import random 

7 

8from . import error 

9from . import interfaces 

10 

11from .pipe import Pipe 

12 

13 

class TokenManager(interfaces.RequestInterface, interfaces.TokenManager):
    """Maps tokens to in-flight exchanges on both the client and server side.

    Sits between a context (which renders requests and holds credentials)
    and a ``token_interface`` (which actually sends/receives messages).
    """

    def __init__(self, context):
        self.context = context

        # Seed for the running token counter; advanced in next_token().
        self._token = random.randint(0, 65535)
        self.outgoing_requests = {}
        """Unfinished outgoing requests (identified by token and remote)"""
        self.incoming_requests = {}
        """Unfinished incoming requests.

        ``(token, remote): (Pipe, stopper)`` where stopper is a
        function that unregisters the Pipe event handler and thus
        indicates to the server the discontinued interest"""

        # Convenience references; both are taken from the context.
        self.log = self.context.log
        self.loop = self.context.loop

        # self.token_interface = … -- needs to be set post-construction, because the token_interface in its constructor already needs to get its manager

32 

33 def __repr__(self): 

34 return "<%s for %s>" % ( 

35 type(self).__name__, 

36 getattr(self, "token_interface", "(unbound)"), 

37 ) 

38 

39 @property 

40 def client_credentials(self): 

41 return self.context.client_credentials 

42 

43 async def shutdown(self): 

44 while self.incoming_requests: 

45 key = next(iter(self.incoming_requests.keys())) 

46 (_, stop) = self.incoming_requests.pop(key) 

47 # This cancels them, not sending anything. 

48 # 

49 # FIXME should we? (RST? 5.00 Server Shutdown? An RST would only 

50 # work if we pushed this further down the shutdown chain; a 5.00 we 

51 # could raise in the task.) 

52 stop() 

53 self.incoming_requests = None 

54 

55 while self.outgoing_requests: 

56 key = next(iter(self.outgoing_requests.keys())) 

57 request = self.outgoing_requests.pop(key) 

58 request.add_exception(error.LibraryShutdown()) 

59 self.outgoing_requests = None 

60 

61 await self.token_interface.shutdown() 

62 

63 def next_token(self): 

64 """Reserve and return a new Token for request.""" 

65 # TODO: add proper Token handling 

66 self._token = (self._token + 1) % (2**64) 

67 return self._token.to_bytes(8, "big").lstrip(b"\0") 

68 

69 # 

70 # implement the tokenmanager interface 

71 # 

72 

73 def dispatch_error(self, exception, remote): 

74 if self.outgoing_requests is None: 

75 # Not entirely sure where it is so far; better just raise a warning 

76 # than an exception later, nothing terminally bad should come of 

77 # this error. 

78 self.log.warning( 

79 "Internal shutdown sequence msismatch: error dispatched through tokenmanager after shutdown" 

80 ) 

81 return 

82 

83 # NetworkError is what we promise users to raise from request etc; if 

84 # it's already a NetworkError and possibly more descriptive (eg. a 

85 # TimeoutError), we'll just let it through (and thus allow 

86 # differentiated handling eg. in application-level retries). 

87 if not isinstance(exception, error.NetworkError): 

88 cause = exception 

89 exception = error.NetworkError(str(exception)) 

90 exception.__cause__ = cause 

91 

92 # The stopping calls would pop items from the pending requests -- 

93 # iterating once, extracting the stoppers and then calling them en 

94 # batch 

95 stoppers = [] 

96 for key, request in self.outgoing_requests.items(): 

97 (token, request_remote) = key 

98 if request_remote == remote: 

99 stoppers.append( 

100 lambda request=request, exception=exception: request.add_exception( 

101 exception 

102 ) 

103 ) 

104 

105 for (_, _r), (_, stopper) in self.incoming_requests.items(): 

106 if remote == _r: 

107 stoppers.append(stopper) 

108 for stopper in stoppers: 

109 stopper() 

110 

    def process_request(self, request):
        """Handle an incoming request message.

        Sets up a :class:`Pipe` for the request, wires its events back into
        the token interface, registers it in ``incoming_requests`` and hands
        it to the context for rendering.
        """
        key = (request.token, request.remote)

        if key in self.incoming_requests:
            # This is either a "I consider that token invalid, probably forgot
            # about it, but here's a new request" or renewed interest in an
            # observation, which gets modelled as a new request at this level
            self.log.debug("Incoming request overrides existing request")
            # Popping: FIXME Decide if one of them is sufficient (see `del self.incoming_requests[key]` below)
            (pipe, stop) = self.incoming_requests.pop(key)
            stop()

        pipe = Pipe(request, self.log)

        # FIXME: what can we pass down to the token_interface? certainly not
        # the request, but maybe the request with a response filter applied?
        def on_event(ev):
            # Forward response messages to the token interface; returns True
            # while further events are expected (keeps the handler registered).
            if ev.message is not None:
                m = ev.message
                # FIXME: should this code warn if token or remote are set?
                m.token = request.token
                m.remote = request.remote.as_response_address()

                # The token interface may use information from that, eg.
                # whether the request was sent reliably or not.
                m.request = request

                self.token_interface.send_message(
                    m,
                    # No more interest from *that* remote; as it's the only
                    # thing keeping the PR alive, it'll go its course of
                    # vanishing for lack of interest (as it would if
                    # stop were called from its other possible caller,
                    # the start of process_request when a new request comes
                    # in on the same token)
                    stop,
                )
            else:
                # It'd be tempting to raise here, but typically being called
                # from a task, it wouldn't propagate any further either, and at
                # least here we have a logger.
                self.log.error(
                    "Requests shouldn't receive errors at the level of a TokenManager any more, but this did: %s",
                    ev,
                )
            if not ev.is_last:
                return True

        def on_end():
            # Drop the bookkeeping entry once interest in the exchange ends.
            if key in self.incoming_requests:
                # It may not be, especially if it was popped in `(pipe, stop) = self.incoming_requests.pop(key)` above
                # FIXME Decide if one of them is sufficient
                del self.incoming_requests[key]
            # no further cleanup to do here: any piggybackable ack was already flushed
            # out by the first response, and if there was not even a
            # NoResponse, something went wrong above (and we can't tell easily
            # here).

        # NOTE: on_event closes over `stop`, which is only bound here; the
        # closure is not invoked before this point, so the order is safe.
        stop = pipe.on_event(on_event)
        pipe.on_interest_end(on_end)

        self.incoming_requests[key] = (pipe, stop)

        self.context.render_to_pipe(pipe)

175 

176 def process_response(self, response): 

177 key = (response.token, response.remote) 

178 if key not in self.outgoing_requests: 

179 # maybe it was a multicast... 

180 key = (response.token, None) 

181 

182 try: 

183 request = self.outgoing_requests[key] 

184 except KeyError: 

185 self.log.info("Response %r could not be matched to any request", response) 

186 return False 

187 else: 

188 self.log.debug("Response %r matched to request %r", response, request) 

189 

190 # FIXME: there's a multicast aspect to that as well 

191 # 

192 # Is it necessary to look into .opt.observe here, wouldn't that better 

193 # be done by the higher-level code that knows about CoAP options? 

194 # Maybe, but at some point in TokenManager we *have* to look into the 

195 # options to see whether to expect a short- or long-running token. 

196 # Still, it would be an option not to send an is_last here and *always* 

197 # have the higher-level code indicate loss of interest in that exchange 

198 # when it detects that no more observations will follow. 

199 final = not ( 

200 request.request.opt.observe == 0 and response.opt.observe is not None 

201 ) 

202 

203 if final: 

204 self.outgoing_requests.pop(key) 

205 

206 request.add_response(response, is_last=final) 

207 return True 

208 

209 # 

210 # implement RequestInterface 

211 # 

212 

213 async def fill_or_recognize_remote(self, message): 

214 return await self.token_interface.fill_or_recognize_remote(message) 

215 

    def request(self, request):
        """Send the outgoing request held by ``request`` (a Pipe-like object
        whose ``.request`` attribute is the message).

        Assigns a fresh token, registers the exchange in
        ``outgoing_requests`` and passes the message to the token interface.
        Errors are reported through ``request.add_exception`` rather than
        raised.
        """
        if self.outgoing_requests is None:
            # Shutdown already ran; fail the request instead of crashing on
            # the None map below.
            request.add_exception(error.LibraryShutdown())
            return

        msg = request.request

        # NOTE(review): assert-based validation is stripped under `python -O`;
        # kept as-is since callers appear to guarantee these invariants.
        assert msg.code.is_request(), "Message code is not valid for request"

        # This might easily change, but right now, relying on the Context to
        # fill_remote early makes steps easier here.
        assert msg.remote is not None, "Remote not pre-populated"

        # FIXME: pick a suitably short one where available, and a longer one
        # for observations if many short ones are already in-flight
        msg.token = self.next_token()

        self.log.debug(
            "Sending request - Token: %s, Remote: %s", msg.token.hex(), msg.remote
        )

        # A request sent over the multicast interface will only return a single
        # response and otherwise behave quite like an anycast request (which is
        # probably intended).
        if msg.remote.is_multicast:
            self.log.warning("Sending request to multicast via unicast request method")
            # Multicast responses arrive from varying remotes, so the remote
            # part of the key is None (see the fallback in process_response).
            key = (msg.token, None)
        else:
            key = (msg.token, msg.remote)

        self.outgoing_requests[key] = request
        # Once interest ends, drop the bookkeeping entry; the None default
        # makes the pop a no-op if process_response already removed it.
        request.on_interest_end(
            functools.partial(self.outgoing_requests.pop, key, None)
        )

        try:
            send_canceller = self.token_interface.send_message(
                msg, lambda: request.add_exception(error.MessageError)
            )
        except Exception as e:
            # Report send failures through the request rather than raising.
            request.add_exception(e)
            return

        if send_canceller is not None:
            # This needs to be called both when the requester cancels the
            # request, and when a response to the CON request comes in via a
            # different CON when the original ACK was lost, so the retransmits
            # can stop.
            #
            # FIXME: This might need a little sharper conditions: A fresh CON
            # should be sufficient to stop retransmits of a CON in a first
            # request, but when refreshing an observation, only an ACK tells us
            # that the updated observation got through. Also, multicast needs
            # to be an exception, but that generally needs handling here.
            #
            # It may be that it'd be wise to reduce the use of send_canceller
            # to situations when the request is actualy cancelled, and pass
            # some information to the token_interface about whether it should
            # keep an eye out for responses on that token and cancel
            # transmission accordingly.
            request.on_event(lambda ev: (send_canceller(), False)[1], is_interest=False)