Coverage for aiocoap/protocol.py: 89%

493 statements  

coverage.py v7.13.1, created at 2026-01-10 10:42 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5"""This module contains the classes that are responsible for keeping track of 

6messages: 

7 

8* :class:`Context` roughly represents the CoAP endpoint (basically a UDP 

9 socket) -- something that can send requests and possibly can answer 

10 incoming requests. 

11 

12 Incoming requests are processed in tasks created by the context. 

13 

14* a :class:`Request` gets generated whenever a request gets sent to keep 

15 track of the response 

16 

17Logging 

18~~~~~~~ 

19 

20Several constructors of the Context accept a logger name; these names go into 

21the construction of a Python logger. 

22 

23Log events will be emitted to these on different levels, with "warning" and 

24above being a practical default for things that may warrant reviewing by 

25an operator: 

26 

27* DEBUG is used for things that occur even under perfect conditions. 

28* INFO is for things that are well expected, but might be interesting during 

29 testing a network of nodes and not just when debugging the library. (This 

30 includes timeouts, retransmissions, and pings.) 

31* WARNING is for everything that indicates a malbehaved peer. These don't 

32 *necessarily* indicate a client bug, though: Things like requesting a 

33 nonexistent block can just as well happen when a resource's content has 

34 changed between blocks. The library will not go out of its way to determine 

35 whether there is a plausible explanation for the odd behavior, and will 

36 report something as a warning in case of doubt. 

37* ERROR is used when something clearly went wrong. This includes irregular 

38 connection terminations and resource handler errors (which are demoted to 

39 error responses), and can often contain a backtrace. 

40 

41Logs will generally reveal messages exchanged between this and other systems, 

42and attackers can observe their encrypted counterparts. Private or shared keys 

43are only logged through an internal `log_secret` function, which usually 

44replaces them with a redacted value. Setting the ``AIOCOAP_REVEAL_KEYS`` 

45environment variable to the value ``show secrets in logs`` bypasses that 

46mechanism. As an additional precaution, this is only accepted if the effective 

47user has write access to the aiocoap source code. 

48""" 

49 

50import asyncio 

51import weakref 

52import time 

53 

54from . import defaults 

55from .credentials import CredentialsMap 

56from .message import Message 

57from .messagemanager import MessageManager 

58from .tokenmanager import TokenManager 

59from .pipe import Pipe, run_driving_pipe, error_to_message 

60from . import interfaces 

61from . import error 

62from .numbers import INTERNAL_SERVER_ERROR, NOT_FOUND, CONTINUE, SHUTDOWN_TIMEOUT 

63from .transport_params import TransportParameters 

64 

65import warnings 

66import logging 

67 

68 

69class Context(interfaces.RequestProvider): 

70 """Applications' entry point to the network 

71 

72 A :class:`.Context` coordinates one or more network :mod:`.transports` 

73 implementations and dispatches data between them and the application. 

74 

75 The application can start requests using the message dispatch methods, and 

76 set a :class:`resources.Site` that will answer requests directed to the 

77 application as a server. 

78 

79 On the library-internals side, it is the prime implementation of the 

80 :class:`interfaces.RequestProvider` interface, creates :class:`Request` and 

81 :class:`Response` classes on demand, and decides which transport 

82 implementations to start and which are to handle which messages. 

83 

84 **Context creation and destruction** 

85 

86 The following functions are provided for creating and stopping a context: 

87 

88 .. note:: 

89 

90 A typical application should only ever create one context, even (or 

91 especially) when it acts both as a server and as a client (in which 

92 case a server context should be created). 

93 

94 A context that is not used any more must be shut down using 

95 :meth:`.shutdown()`, but typical applications will not need to because 

96 they use the context for the full process lifetime. 

97 

98 .. automethod:: create_client_context 

99 .. automethod:: create_server_context 

100 

101 .. automethod:: shutdown 

102 

103 **Dispatching messages** 

104 

105 CoAP requests can be sent using the following functions: 

106 

107 .. automethod:: request 

108 

109 If more control is needed, you can create a :class:`Request` yourself and 

110 pass the context to it. 

111 

112 

113 **Other methods and properties** 

114 

115 The remaining methods and properties are to be considered unstable even 

116 when the project reaches a stable version number; please file a feature 

117 request for stabilization if you want to reliably access any of them. 

118 """ 

119 

120 def __init__( 

121 self, 

122 loop=None, 

123 serversite=None, 

124 loggername="coap", 

125 client_credentials=None, 

126 server_credentials=None, 

127 ): 

128 self.log = logging.getLogger(loggername) 

129 

130 self.loop = loop or asyncio.get_event_loop() 

131 

132 self.serversite = serversite 

133 

134 self.request_interfaces = [] 

135 

136 self.client_credentials = client_credentials or CredentialsMap() 

137 self.server_credentials = server_credentials or CredentialsMap() 

138 

139 # 

140 # convenience methods for class instantiation 

141 # 

142 

143 async def _append_tokenmanaged_messagemanaged_transport( 

144 self, message_interface_constructor 

145 ): 

146 tman = TokenManager(self) 

147 mman = MessageManager(tman) 

148 transport = await message_interface_constructor(mman) 

149 

150 mman.message_interface = transport 

151 tman.token_interface = mman 

152 

153 self.request_interfaces.append(tman) 

154 

155 async def _append_tokenmanaged_transport(self, token_interface_constructor): 

156 tman = TokenManager(self) 

157 transport = await token_interface_constructor(tman) 

158 

159 tman.token_interface = transport 

160 

161 self.request_interfaces.append(tman) 

162 

163 @classmethod 

164 async def create_client_context( 

165 cls, 

166 *, 

167 loggername="coap", 

168 loop=None, 

169 transports: TransportParameters | None | dict | list[str] = None, 

170 ): 

171 """Create a context bound to all addresses on a random listening port. 

172 

173 This is the easiest way to get a context suitable for sending client 

174 requests. 

175 

176 :meta private: 

177 (not actually private, just hiding from automodule due to being 

178 grouped with the important functions) 

179 """ 

180 

181 if loop is None: 

182 loop = asyncio.get_event_loop() 

183 

184 self = cls(loop=loop, serversite=None, loggername=loggername) 

185 

186 selected_transports = transports or list( 

187 defaults.get_default_clienttransports(loop=loop) 

188 ) 

189 selected_transports = TransportParameters._compat_create(selected_transports) 

190 

191 # FIXME make defaults overridable (postponed until they become configurable too) 

192 if selected_transports.oscore: 

193 from .transports.oscore import TransportOSCORE 

194 

195 oscoretransport = TransportOSCORE(self, self) 

196 self.request_interfaces.append(oscoretransport) 

197 if selected_transports.udp6: 

198 from .transports.udp6 import MessageInterfaceUDP6 

199 

200 await self._append_tokenmanaged_messagemanaged_transport( 

201 lambda mman: MessageInterfaceUDP6.create_client_transport_endpoint( 

202 mman, log=self.log, loop=loop 

203 ) 

204 ) 

205 if selected_transports.simple6: 

206 from .transports.simple6 import MessageInterfaceSimple6 

207 

208 await self._append_tokenmanaged_messagemanaged_transport( 

209 lambda mman: MessageInterfaceSimple6.create_client_transport_endpoint( 

210 mman, log=self.log, loop=loop 

211 ) 

212 ) 

213 if selected_transports.tinydtls: 

214 from .transports.tinydtls import MessageInterfaceTinyDTLS 

215 

216 await self._append_tokenmanaged_messagemanaged_transport( 

217 lambda mman: MessageInterfaceTinyDTLS.create_client_transport_endpoint( 

218 mman, log=self.log, loop=loop 

219 ) 

220 ) 

221 if selected_transports.tcpclient: 

222 from .transports.tcp import TCPClient 

223 

224 await self._append_tokenmanaged_transport( 

225 lambda tman: TCPClient.create_client_transport(tman, self.log, loop) 

226 ) 

227 if selected_transports.tlsclient: 

228 from .transports.tls import TLSClient 

229 

230 await self._append_tokenmanaged_transport( 

231 lambda tman: TLSClient.create_client_transport( 

232 tman, self.log, loop, self.client_credentials 

233 ) 

234 ) 

235 if selected_transports.ws: 

236 from .transports.ws import WSPool 

237 

238 await self._append_tokenmanaged_transport( 

239 lambda tman: WSPool.create_transport( 

240 tman, self.log, loop, client_credentials=self.client_credentials 

241 ) 

242 ) 

243 

244 return self 
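# --- Illustrative sketch: narrowing the transports of a client context via the
# --- ``transports`` argument annotated above. That a plain list of names such
# --- as "udp6" is accepted is an assumption based on the ``list[str]``
# --- annotation and the attributes checked in this method.
async def make_udp_only_context():
    return await Context.create_client_context(transports=["udp6"])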

245 

246 @classmethod 

247 async def create_server_context( 

248 cls, 

249 site, 

250 bind=None, 

251 *, 

252 loggername="coap-server", 

253 loop=None, 

254 _ssl_context=None, 

255 multicast=[], 

256 server_credentials=None, 

257 transports: TransportParameters | None | dict | list[str] = None, 

258 ): 

259 """Create a context, bound to all addresses on the CoAP port (unless 

260 otherwise specified in the ``bind`` argument). 

261 

262 This is the easiest way to get a context suitable both for sending 

263 client and accepting server requests. 

264 

265 The ``bind`` argument, if given, needs to be a 2-tuple of IP address 

266 string and port number, where the port number can be None to use the default port. 

267 

268 If ``multicast`` is given, it needs to be a list of (multicast address, 

269 interface name) tuples, which will all be joined. (The IPv4 style of 

270 selecting the interface by a local address is not supported; users may 

271 want to use the netifaces package to arrive at an interface name for an 

272 address). 

273 

274 As a shortcut, the list may also contain interface names alone. Those 

275 will be joined for the 'all CoAP nodes' groups of IPv4 and IPv6 (with 

276 scopes 2 and 5) as well as the respective 'all nodes' groups in IPv6. 

277 

278 Under some circumstances you may already need a context to pass into 

279 the site for creation; this is typically the case for servers that 

280 trigger requests on their own. For those cases, it is usually easiest 

281 to pass None in as a site, and set the fully constructed site later by 

282 assigning to the ``serversite`` attribute. 

283 

284 :meta private: 

285 (not actually private, just hiding from automodule due to being 

286 grouped with the important functions) 

287 """ 

288 

289 if loop is None: 

290 loop = asyncio.get_event_loop() 

291 

292 self = cls( 

293 loop=loop, 

294 serversite=site, 

295 loggername=loggername, 

296 server_credentials=server_credentials, 

297 ) 

298 

299 multicast_done = not multicast 

300 

301 selected_transports = transports or list( 

302 defaults.get_default_servertransports(loop=loop) 

303 ) 

304 selected_transports = TransportParameters._compat_create(selected_transports) 

305 

306 if selected_transports.oscore: 

307 from .transports.oscore import TransportOSCORE 

308 

309 oscoretransport = TransportOSCORE(self, self) 

310 self.request_interfaces.append(oscoretransport) 

311 if selected_transports.udp6: 

312 from .transports.udp6 import MessageInterfaceUDP6 

313 

314 await self._append_tokenmanaged_messagemanaged_transport( 

315 lambda mman: MessageInterfaceUDP6.create_server_transport_endpoint( 

316 mman, log=self.log, loop=loop, bind=bind, multicast=multicast 

317 ) 

318 ) 

319 multicast_done = True 

320 # FIXME this is duplicated from the client version, as those are client-only anyway 

321 if selected_transports.simple6: 

322 from .transports.simple6 import MessageInterfaceSimple6 

323 

324 await self._append_tokenmanaged_messagemanaged_transport( 

325 lambda mman: MessageInterfaceSimple6.create_client_transport_endpoint( 

326 mman, log=self.log, loop=loop 

327 ) 

328 ) 

329 elif selected_transports.tinydtls: 

330 from .transports.tinydtls import MessageInterfaceTinyDTLS 

331 

332 await self._append_tokenmanaged_messagemanaged_transport( 

333 lambda mman: MessageInterfaceTinyDTLS.create_client_transport_endpoint( 

334 mman, log=self.log, loop=loop 

335 ) 

336 ) 

337 # FIXME end duplication 

338 if selected_transports.tinydtls_server: 

339 from .transports.tinydtls_server import MessageInterfaceTinyDTLSServer 

340 

341 await self._append_tokenmanaged_messagemanaged_transport( 

342 lambda mman: MessageInterfaceTinyDTLSServer.create_server( 

343 bind, 

344 mman, 

345 log=self.log, 

346 loop=loop, 

347 server_credentials=self.server_credentials, 

348 ) 

349 ) 

350 if selected_transports.simplesocketserver: 

351 from .transports.simplesocketserver import MessageInterfaceSimpleServer 

352 

353 await self._append_tokenmanaged_messagemanaged_transport( 

354 lambda mman: MessageInterfaceSimpleServer.create_server( 

355 bind, mman, log=self.log, loop=loop 

356 ) 

357 ) 

358 if selected_transports.tcpserver: 

359 from .transports.tcp import TCPServer 

360 

361 await self._append_tokenmanaged_transport( 

362 lambda tman: TCPServer.create_server(bind, tman, self.log, loop) 

363 ) 

364 if selected_transports.tcpclient: 

365 from .transports.tcp import TCPClient 

366 

367 await self._append_tokenmanaged_transport( 

368 lambda tman: TCPClient.create_client_transport(tman, self.log, loop) 

369 ) 

370 if selected_transports.tlsserver: 

371 if _ssl_context is not None: 

372 from .transports.tls import TLSServer 

373 

374 await self._append_tokenmanaged_transport( 

375 lambda tman: TLSServer.create_server( 

376 bind, tman, self.log, loop, _ssl_context 

377 ) 

378 ) 

379 if selected_transports.tlsclient: 

380 from .transports.tls import TLSClient 

381 

382 await self._append_tokenmanaged_transport( 

383 lambda tman: TLSClient.create_client_transport( 

384 tman, self.log, loop, self.client_credentials 

385 ) 

386 ) 

387 if selected_transports.ws: 

388 from .transports.ws import WSPool 

389 

390 await self._append_tokenmanaged_transport( 

391 # None, None: Unlike the other transports this has a server/client generic creator, and only binds if there is some bind 

392 lambda tman: WSPool.create_transport( 

393 tman, 

394 self.log, 

395 loop, 

396 client_credentials=self.client_credentials, 

397 server_bind=bind or (None, None), 

398 server_context=_ssl_context, 

399 ) 

400 ) 

401 

402 if not multicast_done: 

403 self.log.warning( 

404 "Multicast was requested, but no multicast capable transport was selected." 

405 ) 

406 

407 # This is used in tests to wait for externally launched servers to be ready 

408 self.log.debug("Server ready to receive requests") 

409 

410 return self 
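# --- Illustrative sketch, not part of this module: a minimal server built on
# --- create_server_context using aiocoap's resource.Site / resource.Resource.
# --- Path and payload are placeholders.
import asyncio
import aiocoap
import aiocoap.resource as resource

class HelloResource(resource.Resource):
    async def render_get(self, request):
        return aiocoap.Message(payload=b"hello world")

async def serve():
    site = resource.Site()
    site.add_resource(["hello"], HelloResource())
    await aiocoap.Context.create_server_context(site)
    await asyncio.get_running_loop().create_future()  # serve until cancelled

# asyncio.run(serve())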

411 

412 async def shutdown(self): 

413 """Take down any listening sockets and stop all related timers. 

414 

415 After this coroutine terminates, and once all external references to 

416 the object are dropped, it should be garbage-collectable. 

417 

418 This method takes up to 

419 :const:`aiocoap.numbers.constants.SHUTDOWN_TIMEOUT` seconds, allowing 

420 transports to perform any cleanup implemented in them (such as orderly 

421 connection shutdown and cancelling observations, where the latter is 

422 currently not implemented). 

423 

424 :meta private: 

425 (not actually private, just hiding from automodule due to being 

426 grouped with the important functions) 

427 """ 

428 

429 self.log.debug("Shutting down context") 

430 

431 done, pending = await asyncio.wait( 

432 [ 

433 asyncio.create_task( 

434 ri.shutdown(), 

435 name="Shutdown of %r" % ri, 

436 ) 

437 for ri in self.request_interfaces 

438 ], 

439 timeout=SHUTDOWN_TIMEOUT, 

440 ) 

441 for item in done: 

442 await item 

443 if pending: 

444 # Apart from being useful to see, this also ensures that developers 

445 # see the error in the logs during test suite runs -- and the error 

446 # should be easier to follow than the "we didn't garbage collect 

447 # everything" errors we see anyway (or otherwise, if the error is 

448 # escalated into a test failure) 

449 self.log.error( 

450 "Shutdown timeout exceeded, returning anyway. Interfaces still busy: %s", 

451 pending, 

452 ) 
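# --- Illustrative sketch: releasing a short-lived context explicitly instead
# --- of relying on process teardown. Imports as in the client sketch above.
from aiocoap import Context, Message, GET

async def one_shot(uri):
    ctx = await Context.create_client_context()
    try:
        return await ctx.request(Message(code=GET, uri=uri)).response
    finally:
        await ctx.shutdown()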

453 

454 # FIXME: determine how official this should be, or which part of it is 

455 # public -- now that BlockwiseRequest uses it. (And formalize what can 

456 # change about messages and what can't after the remote has been thusly 

457 # populated). 

458 async def find_remote_and_interface(self, message): 

459 if message.remote is None: 

460 raise error.MissingRemoteError() 

461 for ri in self.request_interfaces: 

462 if await ri.fill_or_recognize_remote(message): 

463 return ri 

464 raise error.NoRequestInterface() 

465 

466 def request(self, request_message, handle_blockwise=True): 

467 if handle_blockwise: 

468 return BlockwiseRequest(self, request_message) 

469 

470 pipe = Pipe(request_message, self.log) 

471 # Request sets up callbacks at creation 

472 result = Request(pipe, self.loop, self.log) 

473 

474 async def send(): 

475 try: 

476 request_interface = await self.find_remote_and_interface( 

477 request_message 

478 ) 

479 request_interface.request(pipe) 

480 except Exception as e: 

481 pipe.add_exception(e) 

482 return 

483 

484 self.loop.create_task( 

485 send(), 

486 name="Request processing of %r" % result, 

487 ) 

488 return result 
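# --- Illustrative sketch: bypassing the automatic block-wise layer chosen
# --- above, e.g. when the caller manages Block1/Block2 options itself.
# --- ``ctx`` and ``message`` are assumed to be a Context and an aiocoap
# --- Message prepared by the caller.
async def raw_exchange(ctx, message):
    # .response resolves to the single response, without Block1/Block2 reassembly
    return await ctx.request(message, handle_blockwise=False).response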

489 

490 # the following are under consideration for moving into Site or something 

491 # mixed into it 

492 

493 def render_to_pipe(self, pipe): 

494 """Fill a pipe by running the site's render_to_pipe interface and 

495 handling errors.""" 

496 

497 pr_that_can_receive_errors = error_to_message(pipe, self.log) 

498 

499 run_driving_pipe( 

500 pr_that_can_receive_errors, 

501 self._render_to_pipe(pipe), 

502 name="Rendering for %r" % pipe.request, 

503 ) 

504 

505 async def _render_to_pipe(self, pipe): 

506 if self.serversite is None: 

507 pipe.add_response( 

508 Message(code=NOT_FOUND, payload=b"not a server"), is_last=True 

509 ) 

510 return 

511 

512 return await self.serversite.render_to_pipe(pipe) 

513 

514 

515class BaseRequest: 

516 """Common mechanisms of :class:`Request` and :class:`MulticastRequest`""" 

517 

518 

519class BaseUnicastRequest(BaseRequest): 

520 """A utility class that offers the :attr:`response_raising` and 

521 :attr:`response_nonraising` alternatives to waiting for the 

522 :attr:`response` future whose error states can be presented either as an 

523 unsuccessful response (eg. 4.04) or an exception. 

524 

525 It also provides some internal tools for handling anything that has a 

526 :attr:`response` future and an :attr:`observation`""" 

527 

528 @property 

529 async def response_raising(self): 

530 """An awaitable that returns if a response comes in and is successful, 

531 otherwise raises generic network exception or a 

532 :class:`.error.ResponseWrappingError` for unsuccessful responses. 

533 

534 Experimental Interface.""" 

535 

536 response = await self.response 

537 if not response.code.is_successful(): 

538 raise error.ResponseWrappingError(response) 

539 

540 return response 

541 

542 @property 

543 async def response_nonraising(self): 

544 """An awaitable that rather returns a 500ish fabricated message (as a 

545 proxy would return) instead of raising an exception. 

546 

547 Experimental Interface.""" 

548 

549 # FIXME: Can we smuggle error_to_message into the underlying pipe? 

550 # That should make observe notifications into messages rather 

551 # than exceptions as well, plus it has fallbacks for `e.to_message()` 

552 # raising. 

553 

554 try: 

555 return await self.response 

556 except error.RenderableError as e: 

557 return e.to_message() 

558 except Exception: 

559 return Message(code=INTERNAL_SERVER_ERROR) 
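# --- Illustrative sketch: the two error-handling styles offered above. The
# --- URI is a placeholder.
import aiocoap
import aiocoap.error as error

async def demo(ctx, uri):
    try:
        ok = await ctx.request(
            aiocoap.Message(code=aiocoap.GET, uri=uri)
        ).response_raising            # raises for 4.xx/5.xx and network errors
    except error.ResponseWrappingError as e:
        print("unsuccessful response:", e)
    # ... or never raise; failures arrive as a fabricated 5.00-ish message:
    msg = await ctx.request(
        aiocoap.Message(code=aiocoap.GET, uri=uri)
    ).response_nonraising
    print(msg.code)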

560 

561 

562class Request(interfaces.Request, BaseUnicastRequest): 

563 def __init__(self, pipe, loop, log): 

564 self._pipe = pipe 

565 

566 self.response = loop.create_future() 

567 

568 if pipe.request.opt.observe == 0: 

569 self.observation = ClientObservation() 

570 else: 

571 self.observation = None 

572 

573 self._runner = self._run() 

574 self._runner.send(None) 

575 

576 def process(event): 

577 try: 

578 # would be great to have self or the runner as weak ref, but 

579 # see ClientObservation.register_callback comments -- while 

580 # that is around, we can't weakref here. 

581 self._runner.send(event) 

582 return True 

583 except StopIteration: 

584 return False 

585 

586 self._stop_interest = self._pipe.on_event(process) 

587 

588 self.log = log 

589 

590 self.response.add_done_callback(self._response_cancellation_handler) 

591 

592 def _response_cancellation_handler(self, response): 

593 # Propagate cancellation to the runner (if interest in the first 

594 # response is lost, there won't be observation items to pull out), but 

595 # not general completion (because if it's completed and not cancelled, 

596 # eg. when an observation is active) 

597 if self.response.cancelled() and self._runner is not None: 

598 # Dropping the only reference makes it stop with GeneratorExit, 

599 # similar to a cancelled task 

600 self._runner = None 

601 self._stop_interest() 

602 # Otherwise, there will be a runner still around, and it's its task to 

603 # call _stop_interest. 

604 

605 @staticmethod 

606 def _add_response_properties(response, request): 

607 response.request = request 

608 

609 def _run(self): 

610 # FIXME: This is in iterator form because it used to be a task that 

611 # awaited futures, and that code could be easily converted to an 

612 # iterator. I'm not sure that's a bad state here, but at least it 

613 # should be a more conscious decision to make this an iterator rather 

614 # than just having it happen to be one. 

615 # 

616 # FIXME: check that responses come from the same remote as long as we're assuming unicast 

617 

618 first_event = yield None 

619 

620 if first_event.message is not None: 

621 self._add_response_properties(first_event.message, self._pipe.request) 

622 self.response.set_result(first_event.message) 

623 else: 

624 self.response.set_exception(first_event.exception) 

625 if not isinstance(first_event.exception, error.Error): 

626 self.log.warning( 

627 "An exception that is not an aiocoap Error was raised " 

628 "from a transport; please report this as a bug in " 

629 "aiocoap: %r", 

630 first_event.exception, 

631 ) 

632 

633 if self.observation is None: 

634 if not first_event.is_last: 

635 self.log.error( 

636 "Pipe indicated more possible responses" 

637 " while the Request handler would not know what to" 

638 " do with them, stopping any further request." 

639 ) 

640 self._stop_interest() 

641 return 

642 

643 if first_event.is_last: 

644 self.observation.error(error.NotObservable()) 

645 return 

646 

647 if first_event.message.opt.observe is None: 

648 self.log.error( 

649 "Pipe indicated more possible responses" 

650 " while the Request handler would not know what to" 

651 " do with them, stopping any further request." 

652 ) 

653 self._stop_interest() 

654 return 

655 

656 # variable names from RFC7641 Section 3.4 

657 v1 = first_event.message.opt.observe 

658 t1 = time.time() 

659 

660 while True: 

661 # We don't really support cancellation of observations yet (see 

662 # https://github.com/chrysn/aiocoap/issues/92), but at least 

663 # stopping the interest is a way to free the local resources after 

664 # the first observation update, and to make the MID handler RST the 

665 # observation on the next. 

666 # FIXME: there *is* now a .on_cancel callback, we should at least 

667 # hook into that, and possibly even send a proper cancellation 

668 # then. 

669 next_event = yield True 

670 if self.observation.cancelled: 

671 self._stop_interest() 

672 return 

673 

674 if next_event.exception is not None: 

675 self.observation.error(next_event.exception) 

676 if not next_event.is_last: 

677 self._stop_interest() 

678 if not isinstance(next_event.exception, error.Error): 

679 self.log.warning( 

680 "An exception that is not an aiocoap Error was " 

681 "raised from a transport during an observation; " 

682 "please report this as a bug in aiocoap: %r", 

683 next_event.exception, 

684 ) 

685 return 

686 

687 self._add_response_properties(next_event.message, self._pipe.request) 

688 

689 if next_event.message.opt.observe is not None: 

690 # check for reordering 

691 v2 = next_event.message.opt.observe 

692 t2 = time.time() 

693 

694 is_recent = ( 

695 (v1 < v2 and v2 - v1 < 2**23) 

696 or (v1 > v2 and v1 - v2 > 2**23) 

697 or ( 

698 t2 

699 > t1 

700 + self._pipe.request.transport_tuning.OBSERVATION_RESET_TIME 

701 ) 

702 ) 
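            # Worked example with illustrative values (the Observe value lives
            # in a 24-bit space, so 2**23 is half of it):
            #   v1=100,      v2=101      -> 0 < v2-v1 < 2**23: recent
            #   v1=16777200, v2=5        -> v1-v2 > 2**23: counter wrapped, recent
            #   v1=100,      v2=16777200 -> neither branch; only recent once t2
            #                               exceeds t1 by OBSERVATION_RESET_TIME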

703 if is_recent: 

704 t1 = t2 

705 v1 = v2 

706 else: 

707 # the terminal message is always the last 

708 is_recent = True 

709 

710 if is_recent: 

711 self.observation.callback(next_event.message) 

712 

713 if next_event.is_last: 

714 self.observation.error(error.ObservationCancelled()) 

715 return 

716 

717 if next_event.message.opt.observe is None: 

718 self.observation.error(error.ObservationCancelled()) 

719 self.log.error( 

720 "Pipe indicated more possible responses" 

721 " while the Request handler would not know what to" 

722 " do with them, stopping any further request." 

723 ) 

724 self._stop_interest() 

725 return 

726 

727 

728class BlockwiseRequest(BaseUnicastRequest, interfaces.Request): 

729 def __init__(self, protocol, app_request): 

730 self.protocol = protocol 

731 self.log = self.protocol.log.getChild("blockwise-requester") 

732 

733 self.response = protocol.loop.create_future() 

734 

735 if app_request.opt.observe is not None: 

736 self.observation = ClientObservation() 

737 else: 

738 self.observation = None 

739 

740 self._runner = protocol.loop.create_task( 

741 self._run_outer( 

742 app_request, 

743 self.response, 

744 weakref.ref(self.observation) 

745 if self.observation is not None 

746 else lambda: None, 

747 self.protocol, 

748 self.log, 

749 ), 

750 name="Blockwise runner for %r" % app_request, 

751 ) 

752 self.response.add_done_callback(self._response_cancellation_handler) 

753 

754 def _response_cancellation_handler(self, response_future): 

755 # see Request._response_cancellation_handler 

756 if self.response.cancelled(): 

757 self._runner.cancel() 

758 

759 @classmethod 

760 async def _run_outer(cls, app_request, response, weak_observation, protocol, log): 

761 try: 

762 await cls._run(app_request, response, weak_observation, protocol, log) 

763 except asyncio.CancelledError: 

764 pass # results already set 

765 except Exception as e: 

766 logged = False 

767 if not response.done(): 

768 logged = True 

769 response.set_exception(e) 

770 obs = weak_observation() 

771 if app_request.opt.observe is not None and obs is not None: 

772 logged = True 

773 obs.error(e) 

774 if not logged: 

775 # should be unreachable 

776 log.error( 

777 "Exception in BlockwiseRequest runner neither went to response nor to observation: %s", 

778 e, 

779 exc_info=e, 

780 ) 

781 

782 # This is a class method because that allows self and self.observation to 

783 # be freed even when this task is running, and the task to stop itself -- 

784 # otherwise we couldn't know when users just "forget" about a request 

785 # object after using its response (esp. in observe cases) and leave this 

786 # task running. 

787 @classmethod 

788 async def _run(cls, app_request, response, weak_observation, protocol, log): 

789 # we need to populate the remote right away, because the choice of 

790 # blocks depends on it. 

791 await protocol.find_remote_and_interface(app_request) 

792 

793 size_exp = app_request.remote.maximum_block_size_exp 

794 

795 if app_request.opt.block1 is not None: 

796 warnings.warn( 

797 "Setting a block1 option in a managed block-wise transfer is deprecated. Instead, set request.remote.maximum_block_size_exp to the desired value", 

798 DeprecationWarning, 

799 stacklevel=2, 

800 ) 

801 assert app_request.opt.block1.block_number == 0, ( 

802 "Unexpected block number in app_request" 

803 ) 

804 assert not app_request.opt.block1.more, ( 

805 "Unexpected more-flag in app_request" 

806 ) 

807 # this is where the library user can traditionally pass in size 

808 # exponent hints into the library. 

809 size_exp = app_request.opt.block1.size_exponent 

810 

811 # Offset in the message in blocks of size_exp. Whoever changes size_exp 

812 # is responsible for updating this number. 

813 block_cursor = 0 

814 

815 while True: 

816 # ... send a chunk 

817 

818 if size_exp >= 6: 

819 # FIXME from maximum_payload_size 

820 fragmentation_threshold = app_request.remote.maximum_payload_size 

821 else: 

822 fragmentation_threshold = 2 ** (size_exp + 4) 
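            # For reference (RFC 7959): block sizes are encoded as 2**(SZX + 4),
            # so size_exp 0..6 maps to 16..1024 bytes -- e.g. size_exp=2 yields a
            # 64-byte threshold here. size_exp 7 is the RFC 8323 BERT marker
            # (payloads in multiples of 1024 bytes), which is why 6 and above
            # defer to the remote's maximum_payload_size instead of the formula.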

823 

824 if ( 

825 app_request.opt.block1 is not None 

826 or len(app_request.payload) > fragmentation_threshold 

827 ): 

828 current_block1 = app_request._extract_block( 

829 block_cursor, size_exp, app_request.remote.maximum_payload_size 

830 ) 

831 if block_cursor == 0: 

832 current_block1.opt.size1 = len(app_request.payload) 

833 else: 

834 current_block1 = app_request 

835 

836 blockrequest = protocol.request(current_block1, handle_blockwise=False) 

837 blockresponse = await blockrequest.response 

838 

839 # store for future blocks to ensure that the next blocks will be 

840 # sent from the same source address (in the UDP case; for many 

841 # other transports it won't matter). Also carries along the locally set block size limitation. 

842 if ( 

843 app_request.remote.maximum_block_size_exp 

844 < blockresponse.remote.maximum_block_size_exp 

845 ): 

846 blockresponse.remote.maximum_block_size_exp = ( 

847 app_request.remote.maximum_block_size_exp 

848 ) 

849 app_request.remote = blockresponse.remote 

850 

851 if blockresponse.opt.block1 is None: 

852 if blockresponse.code.is_successful() and current_block1.opt.block1: 

853 log.warning( 

854 "Block1 option completely ignored by server, assuming it knows what it is doing." 

855 ) 

856 # FIXME: handle 4.13 and retry with the indicated size option 

857 break 

858 

859 block1 = blockresponse.opt.block1 

860 log.debug( 

861 "Response with Block1 option received, number = %d, more = %d, size_exp = %d.", 

862 block1.block_number, 

863 block1.more, 

864 block1.size_exponent, 

865 ) 

866 

867 if block1.block_number != current_block1.opt.block1.block_number: 

868 raise error.UnexpectedBlock1Option("Block number mismatch") 

869 

870 if size_exp == 7: 

871 block_cursor += len(current_block1.payload) // 1024 

872 else: 

873 block_cursor += 1 

874 

875 while block1.size_exponent < size_exp: 

876 block_cursor *= 2 

877 size_exp -= 1 

878 

879 if not current_block1.opt.block1.more: 

880 if block1.more or blockresponse.code == CONTINUE: 

881 # treating this as a protocol error -- letting it slip 

882 # through would misrepresent the whole operation as an 

883 # over-all 2.xx (successful) one. 

884 raise error.UnexpectedBlock1Option( 

885 "Server asked for more data at end of body" 

886 ) 

887 break 

888 

889 # checks before preparing the next round: 

890 

891 if blockresponse.opt.observe: 

892 # we're not *really* interested in that block, we just sent an 

893 # observe option to indicate that we'll want to observe the 

894 # resulting representation as a whole 

895 log.warning( 

896 "Server answered Observe in early Block1 phase, cancelling the erroneous observation." 

897 ) 

898 blockrequest.observe.cancel() 

899 

900 if block1.more: 

901 # FIXME: I think my own server is doing this wrong 

902 # if response.code != CONTINUE: 

903 # raise error.UnexpectedBlock1Option("more-flag set but no Continue") 

904 pass 

905 else: 

906 if not blockresponse.code.is_successful(): 

907 break 

908 else: 

909 # ignoring (discarding) the successful intermediate result, waiting for a final one 

910 continue 

911 

912 lower_observation = None 

913 if app_request.opt.observe is not None: 

914 if blockresponse.opt.observe is not None: 

915 lower_observation = blockrequest.observation 

916 else: 

917 obs = weak_observation() 

918 if obs: 

919 obs.error(error.NotObservable()) 

920 del obs 

921 

922 assert blockresponse is not None, "Block1 loop broke without setting a response" 

923 blockresponse.opt.block1 = None 

924 

925 # FIXME check with RFC7959: it just says "send requests similar to the 

926 # requests in the Block1 phase", what does that mean? using the last 

927 # block1 as a reference for now, especially because in the 

928 # only-one-request-block case, that's the original request we must send 

929 # again and again anyway 

930 assembled_response = await cls._complete_by_requesting_block2( 

931 protocol, current_block1, blockresponse, log 

932 ) 

933 

934 response.set_result(assembled_response) 

935 # finally set the result 

936 

937 if lower_observation is not None: 

938 # FIXME this can all be simplified a lot since it's no more 

939 # expected that observations shut themselves down when GC'd. 

940 obs = weak_observation() 

941 del weak_observation 

942 if obs is None: 

943 lower_observation.cancel() 

944 return 

945 future_weak_observation = protocol.loop.create_future() # packing this up because its destroy callback needs to reference the subtask 

946 subtask = asyncio.create_task( 

947 cls._run_observation( 

948 app_request, 

949 lower_observation, 

950 future_weak_observation, 

951 protocol, 

952 log, 

953 ), 

954 name="Blockwise observation for %r" % app_request, 

955 ) 

956 future_weak_observation.set_result( 

957 weakref.ref(obs, lambda obs: subtask.cancel()) 

958 ) 

959 obs.on_cancel(subtask.cancel) 

960 del obs 

961 await subtask 

962 

963 @classmethod 

964 async def _run_observation( 

965 cls, original_request, lower_observation, future_weak_observation, protocol, log 

966 ): 

967 weak_observation = await future_weak_observation 

968 # we can use weak_observation() here at any time, because whenever that 

969 # becomes None, this task gets cancelled 

970 try: 

971 async for block1_notification in lower_observation: 

972 log.debug("Notification received") 

973 full_notification = await cls._complete_by_requesting_block2( 

974 protocol, original_request, block1_notification, log 

975 ) 

976 log.debug("Reporting completed notification") 

977 weak_observation().callback(full_notification) 

978 # FIXME verify that this loop actually ends iff the observation 

979 # was cancelled -- otherwise find out the cause(s) or make it not 

980 # cancel under indistinguishable circumstances 

981 weak_observation().error(error.ObservationCancelled()) 

982 except asyncio.CancelledError: 

983 return 

984 except Exception as e: 

985 weak_observation().error(e) 

986 finally: 

987 # We generally avoid idempotent cancellation, but we may have 

988 # reached this point either due to an earlier cancellation or 

989 # without one 

990 if not lower_observation.cancelled: 

991 lower_observation.cancel() 

992 

993 @classmethod 

994 async def _complete_by_requesting_block2( 

995 cls, protocol, request_to_repeat, initial_response, log 

996 ): 

997 # FIXME this can probably be deduplicated against BlockwiseRequest 

998 

999 if ( 

1000 initial_response.opt.block2 is None 

1001 or initial_response.opt.block2.more is False 

1002 ): 

1003 initial_response.opt.block2 = None 

1004 return initial_response 

1005 

1006 if initial_response.opt.block2.block_number != 0: 

1007 log.error("Error assembling blockwise response (expected first block)") 

1008 raise error.UnexpectedBlock2() 

1009 

1010 assembled_response = initial_response 

1011 last_response = initial_response 

1012 while True: 

1013 current_block2 = request_to_repeat._generate_next_block2_request( 

1014 assembled_response 

1015 ) 

1016 

1017 current_block2 = current_block2.copy(remote=initial_response.remote) 

1018 

1019 blockrequest = protocol.request(current_block2, handle_blockwise=False) 

1020 last_response = await blockrequest.response 

1021 

1022 if last_response.opt.block2 is None: 

1023 log.warning( 

1024 "Server sent non-blockwise response after having started a blockwise transfer. Blockwise transfer cancelled, accepting single response." 

1025 ) 

1026 return last_response 

1027 

1028 block2 = last_response.opt.block2 

1029 log.debug( 

1030 "Response with Block2 option received, number = %d, more = %d, size_exp = %d.", 

1031 block2.block_number, 

1032 block2.more, 

1033 block2.size_exponent, 

1034 ) 

1035 try: 

1036 assembled_response._append_response_block(last_response) 

1037 except error.Error as e: 

1038 log.error("Error assembling blockwise response, passing on error %r", e) 

1039 raise 

1040 

1041 if block2.more is False: 

1042 return assembled_response 

1043 

1044 

1045class ClientObservation: 

1046 """An interface to observe notification updates arriving on a request. 

1047 

1048 This class does not actually provide any of the observe functionality, it 

1049 is purely a container for dispatching the messages via asynchronous 

1050 iteration. It gets driven (ie. populated with responses or errors including 

1051 observation termination) by a Request object. 

1052 """ 

1053 

1054 def __init__(self): 

1055 self.callbacks = [] 

1056 self.errbacks = [] 

1057 

1058 self.cancelled = False 

1059 self._on_cancel = [] 

1060 

1061 self._latest_response = None 

1062 # the analogous error is stored in _cancellation_reason when cancelled. 

1063 

1064 def __aiter__(self): 

1065 """`async for` interface to observations. 

1066 

1067 This is the preferred interface to obtaining observations.""" 

1068 it = self._Iterator() 

1069 self.register_callback(it.push, _suppress_deprecation=True) 

1070 self.register_errback(it.push_err, _suppress_deprecation=True) 

1071 return it 

1072 

1073 class _Iterator: 

1074 def __init__(self): 

1075 self._future = asyncio.get_event_loop().create_future() 

1076 

1077 def push(self, item): 

1078 if self._future.done(): 

1079 # we don't care whether we overwrite anything, this is a lossy queue as observe is lossy 

1080 self._future = asyncio.get_event_loop().create_future() 

1081 self._future.set_result(item) 

1082 

1083 def push_err(self, e): 

1084 if self._future.done(): 

1085 self._future = asyncio.get_event_loop().create_future() 

1086 self._future.set_exception(e) 

1087 

1088 async def __anext__(self): 

1089 f = self._future 

1090 try: 

1091 result = await self._future 

1092 # FIXME see `await servobs._trigger` comment: might waiting for 

1093 # the original future not yield the first future's result when 

1094 # a quick second future comes in in a push? 

1095 if f is self._future: 

1096 self._future = asyncio.get_event_loop().create_future() 

1097 return result 

1098 except (error.NotObservable, error.ObservationCancelled): 

1099 # only exit cleanly when the server -- right away or later -- 

1100 # states that the resource is not observable any more 

1101 # FIXME: check whether an unsuccessful message is still passed 

1102 # as an observation result (or whether it should be) 

1103 raise StopAsyncIteration 

1104 

1105 def __del__(self): 

1106 if self._future.done(): 

1107 try: 

1108 # Fetch the result so any errors show up at least in the 

1109 # finalizer output 

1110 self._future.result() 

1111 except (error.ObservationCancelled, error.NotObservable): 

1112 # This is the case at the end of an observation cancelled 

1113 # by the server. 

1114 pass 

1115 except error.NetworkError: 

1116 # This will already have shown up in the main result too. 

1117 pass 

1118 except (error.LibraryShutdown, asyncio.CancelledError): 

1119 pass 

1120 # Anything else flying out of this is unexpected and probably a 

1121 # library error 

1122 

1123 # When this function is removed, we can finally do cleanup better. Right 

1124 # now, someone could register a callback that doesn't hold any references, 

1125 # so we can't just stop the request when nobody holds a reference to this 

1126 # any more. Once we're all in pull mode, we can make the `process` function 

1127 # that sends data in here use a weak reference (because any possible 

1128 # recipient would need to hold a reference to self or the iterator, and 

1129 # thus _run). 

1130 def register_callback(self, callback, _suppress_deprecation=False): 

1131 """Call the callback whenever a response to the message comes in, and 

1132 pass the response to it. 

1133 

1134 The use of this function is deprecated: Use the asynchronous iteration 

1135 interface instead.""" 

1136 if not _suppress_deprecation: 

1137 warnings.warn( 

1138 "register_callback on observe results is deprecated: Use `async for notify in request.observation` instead.", 

1139 DeprecationWarning, 

1140 stacklevel=2, 

1141 ) 

1142 if self.cancelled: 

1143 return 

1144 

1145 self.callbacks.append(callback) 

1146 if self._latest_response is not None: 

1147 callback(self._latest_response) 

1148 

1149 def register_errback(self, callback, _suppress_deprecation=False): 

1150 """Call the callback whenever something goes wrong with the 

1151 observation, and pass an exception to the callback. After such a 

1152 callback is called, no more callbacks will be issued. 

1153 

1154 The use of this function is deprecated: Use the asynchronous iteration 

1155 interface instead.""" 

1156 if not _suppress_deprecation: 

1157 warnings.warn( 

1158 "register_errback on observe results is deprecated: Use `async for notify in request.observation` instead.", 

1159 DeprecationWarning, 

1160 stacklevel=2, 

1161 ) 

1162 if self.cancelled: 

1163 callback(self._cancellation_reason) 

1164 return 

1165 self.errbacks.append(callback) 

1166 

1167 def callback(self, response): 

1168 """Notify all listeners of an incoming response""" 

1169 

1170 self._latest_response = response 

1171 

1172 for c in self.callbacks: 

1173 c(response) 

1174 

1175 def error(self, exception): 

1176 """Notify registered listeners that the observation went wrong. This 

1177 can only be called once.""" 

1178 

1179 if self.errbacks is None: 

1180 raise RuntimeError( 

1181 "Error raised in an already cancelled ClientObservation" 

1182 ) from exception 

1183 for c in self.errbacks: 

1184 c(exception) 

1185 

1186 self.cancel() 

1187 self._cancellation_reason = exception 

1188 

1189 def cancel(self): 

1190 # FIXME determine whether this is called by anything other than error, 

1191 # and make it private so there is always a _cancellation_reason 

1192 """Cease to generate observation or error events. This will not 

1193 generate an error by itself. 

1194 

1195 This function is only needed while register_callback and 

1196 register_errback are around; once their deprecations are acted on, 

1197 dropping the asynchronous iterator will automatically cancel the 

1198 observation. 

1199 """ 

1200 

1201 assert not self.cancelled, "ClientObservation cancelled twice" 

1202 

1203 # make sure things go wrong when someone tries to continue this 

1204 self.errbacks = None 

1205 self.callbacks = None 

1206 

1207 self.cancelled = True 

1208 while self._on_cancel: 

1209 self._on_cancel.pop()() 

1210 

1211 self._cancellation_reason = None 

1212 

1213 def on_cancel(self, callback): 

1214 if self.cancelled: 

1215 callback() 

1216 self._on_cancel.append(callback) 

1217 

1218 def __repr__(self): 

1219 return "<%s %s at %#x>" % ( 

1220 type(self).__name__, 

1221 "(cancelled)" 

1222 if self.cancelled 

1223 else "(%s call-, %s errback(s))" 

1224 % (len(self.callbacks), len(self.errbacks)), 

1225 id(self), 

1226 ) 

1227 

1228 

1229class ServerObservation: 

1230 def __init__(self): 

1231 self._accepted = False 

1232 self._trigger = asyncio.get_event_loop().create_future() 

1233 # A deregistration is "early" if it happens before the response message 

1234 # is actually sent; calling deregister() in that time (typically during 

1235 # `render()`) will not send an unsuccessful response message but just 

1236 # set this flag, which is set to None as soon as it is too late for an 

1237 # early deregistration. 

1238 # This mechanism is temporary until more of aiocoap behaves like 

1239 # Pipe which does not suffer from this limitation. 

1240 self._early_deregister = False 

1241 self._late_deregister = False 

1242 

1243 def accept(self, cancellation_callback): 

1244 self._accepted = True 

1245 self._cancellation_callback = cancellation_callback 

1246 

1247 def deregister(self, reason=None): 

1248 if self._early_deregister is False: 

1249 self._early_deregister = True 

1250 return 

1251 

1252 warnings.warn( 

1253 "Late use of ServerObservation.deregister() is" 

1254 " deprecated, use .trigger with an unsuccessful value" 

1255 " instead", 

1256 DeprecationWarning, 

1257 ) 

1258 self.trigger( 

1259 Message(code=INTERNAL_SERVER_ERROR, payload=b"Resource became unobservable") 

1260 ) 

1261 

1262 def trigger(self, response=None, *, is_last=False): 

1263 """Send an updated response; if None is given, the observed resource's 

1264 rendering will be invoked to produce one. 

1265 

1266 `is_last` can be set to True to indicate that no more responses will be 

1267 sent. Note that an unsuccessful response will be the last no matter 

1268 what is_last says, as such a message always terminates a CoAP 

1269 observation.""" 

1270 if is_last: 

1271 self._late_deregister = True 

1272 if self._trigger.done(): 

1273 # we don't care whether we overwrite anything, this is a lossy queue as observe is lossy 

1274 self._trigger = asyncio.get_event_loop().create_future() 

1275 self._trigger.set_result(response)
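# --- Illustrative sketch (hypothetical handler code): driving a
# --- ServerObservation as described in the trigger() docstring above. How the
# --- observation object is obtained (typically through an ObservableResource's
# --- add_observation hook) is outside the scope of this file; Message and
# --- INTERNAL_SERVER_ERROR are the names imported at the top of this module.
def on_state_change(obs: ServerObservation):
    # passing None re-renders the observed resource to produce the notification
    obs.trigger()

def on_resource_gone(obs: ServerObservation):
    # an unsuccessful message always terminates the observation
    obs.trigger(
        Message(code=INTERNAL_SERVER_ERROR, payload=b"state source gone"),
        is_last=True,
    )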