Coverage for src/aiocoap/pipe.py: 0%

119 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-02-12 11:18 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5import asyncio 

6from collections import namedtuple 

7import functools 

8 

9from . import error 

10from .numbers import INTERNAL_SERVER_ERROR 

11 

12 

13class Pipe: 

14 """Low-level meeting point between a request and a any responses that come 

15 back on it. 

16 

17 A single request message is placed in the Pipe at creation time. 

18 Any responses, as well as any exception happening in the course of 

19 processing, are passed back to the requester along the Pipe. A 

20 response can carry an indication of whether it is final; an exception 

21 always is. 

22 

23 This object is used both on the client side (where the Context on behalf of 

24 the application creates a Pipe and passes it to the network 

25 transports that send the request and fill in any responses) and on the 

26 server side (where the Context creates one for an incoming request and 

27 eventually lets the server implementation populate it with responses). 

28 

29 This currently follows a callback dispatch style. (It may be developed into 

30 something where only awaiting a response drives the proces, though). 

31 

32 Currently, the requester sets up the object, connects callbacks, and then 

33 passes the Pipe on to whatever creates the response. 

34 

35 The creator of responses is notified by the Pipe of a loss of 

36 interest in a response when there are no more callback handlers registered 

37 by registering an on_interest_end callback. As the response callbacks need 

38 to be already in place when the Pipe is passed on to the 

39 responder, the absence event callbacks is signalled by callign the callback 

40 immediately on registration. 

41 

42 To accurately model "loss of interest", it is important to use the 

43 two-phase setup of first registering actual callbacks and then producing 

44 events and/or placing on_interest_end callbacks; this is not clearly 

45 expressed in type or state yet. (One possibility would be for the 

46 Pipe to carry a preparation boolean, and which prohibits event 

47 sending during preparation and is_interest=True callback creation 

48 afterwards). 

49 

50 This was previously named PlumbingRequest. 

51 

52 **Stability** 

53 

54 Sites and resources implemented by provinding a 

55 :meth:`~aiocoap.interfaces.Resource.render_to_pipe` method can stably use 

56 the :meth:`add_response` method of a Pipe (or something that quacks like 

57 it). 

58 

59 They should not rely on :meth:`add_exception` but rather just raise the 

60 exception, and neither register :meth:`on_event` handlers (being the sole 

61 producer of events) nor hook to :meth:`on_interest_end` (instead, they can 

62 use finally clauses or async context managers to handle any cleanup when 

63 the cancellation of the render task indicates the peer's loss of interest). 

64 """ 

65 

66 Event = namedtuple("Event", ("message", "exception", "is_last")) 

67 

68 # called by the initiator of the request 

69 

70 def __init__(self, request, log): 

71 self.request = request 

72 self.log = log 

73 

74 self._event_callbacks = [] 

75 """list[(callback, is_interest)], or None during event processing, or 

76 False when there were no more event callbacks and an the 

77 on_interest_end callbacks have already been called""" 

78 

79 def __repr__(self): 

80 return "<%s at %#x around %r with %r callbacks (thereof %r interests)>" % ( 

81 type(self).__name__, 

82 id(self), 

83 self.request, 

84 len(self._event_callbacks) 

85 if self._event_callbacks 

86 else self._event_callbacks, 

87 sum(1 for (e, is_interest) in self._event_callbacks if is_interest) 

88 if self._event_callbacks 

89 else self._event_callbacks, 

90 ) 

91 

92 def _any_interest(self): 

93 return any(is_interest for (cb, is_interest) in self._event_callbacks) 

94 

95 def poke(self): 

96 """Ask the responder for a life sign. It is up to the responder to 

97 ignore this (eg. because the responder is the library/application and 

98 can't be just gone), to issue a generic transport-dependent 'ping' to 

99 see whether the connection is still alive, or to retransmit the request 

100 if it is an observation over an unreliable channel. 

101 

102 In any case, no status is reported directly to the poke, but if 

103 whatever the responder does fails, it will send an appropriate error 

104 message as a response.""" 

105 raise NotImplementedError() 

106 

107 def on_event(self, callback, is_interest=True): 

108 """Call callback on any event. The callback must return True to be 

109 called again after an event. Callbacks must not produce new events or 

110 deregister unrelated event handlers. 

111 

112 If is_interest=False, the callback will not be counted toward the 

113 active callbacks, and will receive a (None, None, is_last=True) event 

114 eventually. 

115 

116 To unregister the handler, call the returned closure; this can trigger 

117 on_interest_end callbacks. 

118 """ 

119 self._event_callbacks.append((callback, is_interest)) 

120 return functools.partial(self._unregister_on_event, callback) 

121 

122 def _unregister_on_event(self, callback): 

123 if self._event_callbacks is False: 

124 # They wouldn't be called any more so they're already dropped.a 

125 # It's OK that the caller cleans up after itself: Sure it could 

126 # register an on_interest_end, but that's really not warranted if 

127 # all it wants to know is whether it'll have to execute cleanup 

128 # when it's shutting down or not. 

129 return 

130 

131 self._event_callbacks = [ 

132 (cb, i) for (cb, i) in self._event_callbacks if callback is not cb 

133 ] 

134 if not self._any_interest(): 

135 self._end() 

136 

137 def on_interest_end(self, callback): 

138 """Register a callback that will be called exactly once -- either right 

139 now if there is not even a current indicated interest, or at a last 

140 event, or when no more interests are present""" 

141 

142 if self._event_callbacks is False: 

143 # Happens, for example, when a proxy receives multiple requests on a single token 

144 self.log.warning( 

145 "on_interest_end callback %r added after %r has already ended", 

146 callback, 

147 self, 

148 ) 

149 callback() 

150 return 

151 

152 if self._any_interest(): 

153 self._event_callbacks.append( 

154 ( 

155 lambda e: ((callback(), False) if e.is_last else (None, True))[1], 

156 False, 

157 ) 

158 ) 

159 else: 

160 callback() 

161 

162 def _end(self): 

163 cbs = self._event_callbacks 

164 self._event_callbacks = False 

165 tombstone = self.Event(None, None, True) 

166 [cb(tombstone) for (cb, _) in cbs] 

167 

168 # called by the responding side 

169 

170 def _add_event(self, event): 

171 if self._event_callbacks is False: 

172 if event.exception is not None: 

173 self.log.error( 

174 "Discarded exception in %r added after %r has ended", 

175 event, 

176 self, 

177 exception=event.exception, 

178 ) 

179 else: 

180 # Happens, for example, when a proxy receives multiple requests on a single token 

181 self.log.warning( 

182 "Response %r added after %r has already ended", event, self 

183 ) 

184 return 

185 

186 for cb, is_interest in self._event_callbacks[:]: 

187 keep_calling = cb(event) 

188 if not keep_calling: 

189 if self._event_callbacks is False: 

190 # All interest was just lost during the callback 

191 return 

192 

193 self._event_callbacks.remove((cb, is_interest)) 

194 

195 if not self._any_interest(): 

196 self._end() 

197 

198 def add_response(self, response, is_last=False): 

199 self._add_event(self.Event(response, None, is_last)) 

200 

201 def add_exception(self, exception): 

202 self._add_event(self.Event(None, exception, True)) 

203 

204 

205def run_driving_pipe(pipe, coroutine, name=None): 

206 """Create a task from a coroutine where the end of the coroutine produces a 

207 terminal event on the pipe, and lack of interest in the pipe cancels the 

208 task. 

209 

210 The coroutine will typically produce output into the pipe; that 

211 connection is set up by the caller like as in 

212 ``run_driving_pipe(pipe, render_to(pipe))``. 

213 

214 The create task is not returned, as the only sensible operation on it would 

215 be cancellation and that's already set up from the pipe. 

216 """ 

217 

218 async def wrapped(): 

219 try: 

220 await coroutine 

221 except Exception as e: 

222 pipe.add_exception(e) 

223 # Not doing anything special about cancellation: it indicates the 

224 # peer's loss of interest, so there's no use in sending anythign out to 

225 # someone not listening any more 

226 

227 task = asyncio.create_task( 

228 wrapped(), 

229 name=name, 

230 ) 

231 pipe.on_interest_end(task.cancel) 

232 

233 

234def error_to_message(old_pr, log): 

235 """Given a pipe set up by the requester, create a new pipe to pass on to a 

236 responder. 

237 

238 Any exceptions produced by the responder will be turned into terminal 

239 responses on the original pipe, and loss of interest is forwarded.""" 

240 

241 from .message import Message 

242 

243 next_pr = Pipe(old_pr.request, log) 

244 

245 def on_event(event): 

246 if event.message is not None: 

247 old_pr.add_response(event.message, event.is_last) 

248 return not event.is_last 

249 

250 e = event.exception 

251 

252 if isinstance(e, error.RenderableError): 

253 # the repr() here is quite imporant for garbage collection 

254 log.info( 

255 "Render request raised a renderable error (%s), responding accordingly.", 

256 repr(e), 

257 ) 

258 try: 

259 msg = e.to_message() 

260 if msg is None: 

261 # This deserves a separate check because the ABC checks 

262 # that should ensure that the default to_message method is 

263 # never used in concrete classes fails due to the metaclass 

264 # conflict between ABC and Exceptions 

265 raise ValueError( 

266 "Exception to_message failed to produce a message on %r" % e 

267 ) 

268 except Exception as e2: 

269 log.error( 

270 "Rendering the renderable exception failed: %r", e2, exc_info=e2 

271 ) 

272 msg = Message(code=INTERNAL_SERVER_ERROR) 

273 old_pr.add_response(msg, is_last=True) 

274 else: 

275 log.error( 

276 "An exception occurred while rendering a resource: %r", e, exc_info=e 

277 ) 

278 old_pr.add_response(Message(code=INTERNAL_SERVER_ERROR), is_last=True) 

279 

280 return False 

281 

282 remove_interest = next_pr.on_event(on_event) 

283 old_pr.on_interest_end(remove_interest) 

284 return next_pr 

285 

286 

287class IterablePipe: 

288 """A stand-in for a Pipe that the requesting party can use 

289 instead. It should behave just like a Pipe to the responding 

290 party, but the caller does not register on_event handlers and instead 

291 iterates asynchronously over the events. 

292 

293 Note that the PR can be aitered over only once, and does not support any 

294 additional hook settings once asynchronous iteration is started; this is 

295 consistent with the usage pattern of pipes. 

296 """ 

297 

298 def __init__(self, request): 

299 self.request = request 

300 

301 self.__on_interest_end = [] 

302 

303 # FIXME: This is unbounded -- pipes should gain support for 

304 # backpressure. 

305 self.__queue = asyncio.Queue() 

306 

307 def on_interest_end(self, callback): 

308 try: 

309 self.__on_interest_end.append(callback) 

310 except AttributeError: 

311 raise RuntimeError( 

312 "Attempted to declare interest in the end of a IterablePipe on which iteration already started" 

313 ) from None 

314 

315 def __aiter__(self): 

316 i = self.Iterator(self.__queue, self.__on_interest_end) 

317 del self.__on_interest_end 

318 return i 

319 

320 def _add_event(self, e): 

321 self.__queue.put_nowait(e) 

322 

323 def add_response(self, response, is_last=False): 

324 self._add_event(Pipe.Event(response, None, is_last)) 

325 

326 def add_exception(self, exception): 

327 self._add_event(Pipe.Event(None, exception, True)) 

328 

329 class Iterator: 

330 def __init__(self, queue, on_interest_end): 

331 self.__queue = queue 

332 self.__on_interest_end = on_interest_end 

333 

334 async def __anext__(self): 

335 return await self.__queue.get() 

336 

337 def __del__(self): 

338 # This is pretty reliable as the iterator is only created and 

339 # referenced in the desugaring of the `async for`. 

340 for c in self.__on_interest_end: 

341 c()