Coverage for aiocoap/pipe.py: 89%

117 statements  

« prev     ^ index     » next       coverage.py v7.6.8, created at 2024-11-28 12:34 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5import asyncio 

6from collections import namedtuple 

7import functools 

8 

9from . import error 

10from .numbers import INTERNAL_SERVER_ERROR 

11 

12 

class Pipe:
    """Low-level meeting point between a request and any responses that come
    back on it.

    A single request message is placed in the Pipe at creation time.
    Any responses, as well as any exception happening in the course of
    processing, are passed back to the requester along the Pipe. A
    response can carry an indication of whether it is final; an exception
    always is.

    This object is used both on the client side (where the Context on behalf of
    the application creates a Pipe and passes it to the network
    transports that send the request and fill in any responses) and on the
    server side (where the Context creates one for an incoming request and
    eventually lets the server implementation populate it with responses).

    This currently follows a callback dispatch style. (It may be developed into
    something where only awaiting a response drives the process, though).

    Currently, the requester sets up the object, connects callbacks, and then
    passes the Pipe on to whatever creates the response.

    The creator of responses is notified by the Pipe of a loss of
    interest in a response when there are no more callback handlers registered
    by registering an on_interest_end callback. As the response callbacks need
    to be already in place when the Pipe is passed on to the
    responder, the absence of event callbacks is signalled by calling the
    callback immediately on registration.

    To accurately model "loss of interest", it is important to use the
    two-phase setup of first registering actual callbacks and then producing
    events and/or placing on_interest_end callbacks; this is not clearly
    expressed in type or state yet. (One possibility would be for the
    Pipe to carry a preparation boolean, and which prohibits event
    sending during preparation and is_interest=True callback creation
    afterwards).

    This was previously named PlumbingRequest.

    **Stability**

    Sites and resources implemented by providing a
    :meth:`~aiocoap.interfaces.Resource.render_to_pipe` method can stably use
    the :meth:`add_response` method of a Pipe (or something that quacks like
    it).

    They should not rely on :meth:`add_exception` but rather just raise the
    exception, and neither register :meth:`on_event` handlers (being the sole
    producer of events) nor hook to :meth:`on_interest_end` (instead, they can
    use finally clauses or async context managers to handle any cleanup when
    the cancellation of the render task indicates the peer's loss of interest).
    """

    Event = namedtuple("Event", ("message", "exception", "is_last"))

    # called by the initiator of the request

    def __init__(self, request, log):
        """Set up a pipe around ``request``; ``log`` is used to report events
        or hooks that arrive after the pipe has already ended."""
        self.request = request
        self.log = log

        # list[(callback, is_interest)] while the pipe is live, or False once
        # there were no more interested callbacks and the on_interest_end
        # callbacks have already been called.
        self._event_callbacks = []

    def __repr__(self):
        return "<%s at %#x around %r with %r callbacks (thereof %r interests)>" % (
            type(self).__name__,
            id(self),
            self.request,
            len(self._event_callbacks)
            if self._event_callbacks
            else self._event_callbacks,
            sum(1 for (e, is_interest) in self._event_callbacks if is_interest)
            if self._event_callbacks
            else self._event_callbacks,
        )

    def _any_interest(self):
        """Return whether any registered callback counts as an interest.

        Must only be called while ``_event_callbacks`` is still a list."""
        return any(is_interest for (cb, is_interest) in self._event_callbacks)

    def poke(self):
        """Ask the responder for a life sign. It is up to the responder to
        ignore this (eg. because the responder is the library/application and
        can't be just gone), to issue a generic transport-dependent 'ping' to
        see whether the connection is still alive, or to retransmit the request
        if it is an observation over an unreliable channel.

        In any case, no status is reported directly to the poke, but if
        whatever the responder does fails, it will send an appropriate error
        message as a response."""
        raise NotImplementedError()

    def on_event(self, callback, is_interest=True):
        """Call callback on any event. The callback must return True to be
        called again after an event. Callbacks must not produce new events or
        deregister unrelated event handlers.

        If is_interest=False, the callback will not be counted toward the
        active callbacks, and will receive a (None, None, is_last=True) event
        eventually.

        To unregister the handler, call the returned closure; this can trigger
        on_interest_end callbacks.
        """
        self._event_callbacks.append((callback, is_interest))
        return functools.partial(self._unregister_on_event, callback)

    def _unregister_on_event(self, callback):
        """Drop every registration of ``callback``; end the pipe if that
        removed the last interest."""
        if self._event_callbacks is False:
            # They wouldn't be called any more so they're already dropped.
            # It's OK that the caller cleans up after itself: Sure it could
            # register an on_interest_end, but that's really not warranted if
            # all it wants to know is whether it'll have to execute cleanup
            # when it's shutting down or not.
            return

        self._event_callbacks = [
            (cb, i) for (cb, i) in self._event_callbacks if callback is not cb
        ]
        if not self._any_interest():
            self._end()

    def on_interest_end(self, callback):
        """Register a callback that will be called exactly once -- either right
        now if there is not even a current indicated interest, or at a last
        event, or when no more interests are present"""

        if self._event_callbacks is False:
            # Happens, for example, when a proxy receives multiple requests on a single token
            self.log.warning(
                "on_interest_end callback %r added after %r has already ended",
                callback,
                self,
            )
            callback()
            return

        if not self._any_interest():
            callback()
            return

        def fire_on_last_event(event):
            # Registered as a non-interest handler: stay registered (True)
            # until a final event (or the tombstone from _end) comes by, then
            # run the hook and ask to be dropped (False).
            if event.is_last:
                callback()
                return False
            return True

        self._event_callbacks.append((fire_on_last_event, False))

    def _end(self):
        """Mark the pipe as ended and hand a final (None, None, True) event to
        every callback that is still registered."""
        cbs = self._event_callbacks
        self._event_callbacks = False
        tombstone = self.Event(None, None, True)
        for cb, _ in cbs:
            cb(tombstone)

    # called by the responding side

    def _add_event(self, event):
        """Dispatch ``event`` to all registered callbacks, dropping those that
        do not ask to be called again, and end the pipe once no interest is
        left."""
        if self._event_callbacks is False:
            # Happens, for example, when a proxy receives multiple requests on a single token
            self.log.warning(
                "Response %r added after %r has already ended", event, self
            )
            return

        # Iterate over a snapshot: callbacks get removed while dispatching.
        for cb, is_interest in self._event_callbacks[:]:
            keep_calling = cb(event)

            if self._event_callbacks is False:
                # All interest was just lost during the callback (eg. because
                # it called its own unregistration closure). Everything that
                # was still registered has received the tombstone by now, so
                # nothing must be delivered or inspected any more --
                # regardless of what the callback returned.
                return

            if not keep_calling:
                try:
                    self._event_callbacks.remove((cb, is_interest))
                except ValueError:
                    # The callback already unregistered itself while it was
                    # running; there is nothing left to remove.
                    pass

        if not self._any_interest():
            self._end()

    def add_response(self, response, is_last=False):
        """Pass ``response`` to the requester; ``is_last`` marks it as the
        final event on this pipe."""
        self._add_event(self.Event(response, None, is_last))

    def add_exception(self, exception):
        """Pass ``exception`` to the requester; an exception is always the
        final event."""
        self._add_event(self.Event(None, exception, True))

195 

196 

def run_driving_pipe(pipe, coroutine, name=None):
    """Run ``coroutine`` in a freshly created task that is tied to ``pipe``.

    Any :class:`Exception` escaping the coroutine is reported through
    ``pipe.add_exception``, and the task's ``cancel`` method is registered
    with ``pipe.on_interest_end`` so that loss of interest in the pipe cancels
    the task.

    The coroutine will typically produce its output into the pipe; that
    connection is set up by the caller, as in
    ``run_driving_pipe(pipe, render_to(pipe))``.

    ``name`` is passed on as the name of the created task.

    The created task is not returned, as the only sensible operation on it
    would be cancellation, and that is already set up from the pipe.
    """

    async def forward_failures():
        # Cancellation is deliberately not caught here: it indicates the
        # peer's loss of interest, so there is no use in sending anything out
        # to someone who is not listening any more.
        try:
            await coroutine
        except Exception as failure:
            pipe.add_exception(failure)

    driver = asyncio.create_task(forward_failures(), name=name)
    pipe.on_interest_end(driver.cancel)

224 

225 

def error_to_message(old_pr, log):
    """Given a pipe set up by the requester, create a new pipe to pass on to a
    responder.

    Responses placed on the new pipe are forwarded to ``old_pr`` unchanged.
    Exceptions are turned into terminal responses on ``old_pr``: a
    ``RenderableError`` is rendered through its ``to_message`` method, and
    anything else (or a failure while rendering) becomes an
    ``INTERNAL_SERVER_ERROR`` message. Loss of interest in ``old_pr`` is
    forwarded to the new pipe.

    Returns the new pipe."""

    from .message import Message

    def failure_response(exc):
        # Build the message that stands in for `exc` on the original pipe.
        if not isinstance(exc, error.RenderableError):
            log.error(
                "An exception occurred while rendering a resource: %r",
                exc,
                exc_info=exc,
            )
            return Message(code=INTERNAL_SERVER_ERROR)

        # the repr() here is quite important for garbage collection
        log.info(
            "Render request raised a renderable error (%s), responding accordingly.",
            repr(exc),
        )
        try:
            rendered = exc.to_message()
            if rendered is None:
                # This deserves a separate check because the ABC checks
                # that should ensure that the default to_message method is
                # never used in concrete classes fails due to the metaclass
                # conflict between ABC and Exceptions
                raise ValueError(
                    "Exception to_message failed to produce a message on %r" % exc
                )
        except Exception as render_failure:
            log.error(
                "Rendering the renderable exception failed: %r",
                render_failure,
                exc_info=render_failure,
            )
            return Message(code=INTERNAL_SERVER_ERROR)
        return rendered

    def forward(event):
        # Events without a message are treated as failures and always end the
        # forwarding; events with a message keep it going until the last one.
        if event.message is None:
            old_pr.add_response(failure_response(event.exception), is_last=True)
            return False

        old_pr.add_response(event.message, event.is_last)
        return not event.is_last

    next_pr = Pipe(old_pr.request, log)
    # When the requester loses interest in the original pipe, drop our
    # forwarding handler from the new one.
    old_pr.on_interest_end(next_pr.on_event(forward))
    return next_pr

277 

278 

class IterablePipe:
    """A stand-in for a Pipe that the requesting party can use
    instead. It should behave just like a Pipe to the responding
    party, but the caller does not register on_event handlers and instead
    iterates asynchronously over the events.

    Note that the PR can be aitered over only once, and does not support any
    additional hook settings once asynchronous iteration is started; this is
    consistent with the usage pattern of pipes.
    """

    def __init__(self, request):
        """Create the stand-in around ``request`` with an empty event queue
        and no interest-end hooks."""
        # FIXME: This is unbounded -- pipes should gain support for
        # backpressure.
        self.__queue = asyncio.Queue()

        self.__on_interest_end = []

        self.request = request

    def on_interest_end(self, callback):
        """Remember ``callback`` to be run when the iterator is disposed of.

        Raises RuntimeError once iteration has started."""
        try:
            hooks = self.__on_interest_end
        except AttributeError:
            # __aiter__ has handed the hook list over to the iterator already.
            raise RuntimeError(
                "Attempted to declare interest in the end of a IterablePipe on which iteration already started"
            ) from None
        hooks.append(callback)

    def __aiter__(self):
        """Hand the queue and the collected hooks over to a new Iterator."""
        hooks = self.__on_interest_end
        # Removing the attribute is what makes later on_interest_end calls
        # fail.
        del self.__on_interest_end
        return self.Iterator(self.__queue, hooks)

    def _add_event(self, e):
        """Enqueue an event for the iterating side without blocking."""
        self.__queue.put_nowait(e)

    def add_response(self, response, is_last=False):
        """Enqueue ``response`` as a message event."""
        self._add_event(Pipe.Event(response, None, is_last))

    def add_exception(self, exception):
        """Enqueue ``exception`` as a final exception event."""
        self._add_event(Pipe.Event(None, exception, True))

    class Iterator:
        """Asynchronous iterator over the queued events; runs the interest-end
        hooks when it is deleted."""

        def __init__(self, queue, on_interest_end):
            self.__on_interest_end = on_interest_end
            self.__queue = queue

        async def __anext__(self):
            next_event = await self.__queue.get()
            return next_event

        def __del__(self):
            # This is pretty reliable as the iterator is only created and
            # referenced in the desugaring of the `async for`.
            for hook in self.__on_interest_end:
                hook()