Coverage for aiocoap/protocol.py: 88%

495 statements  


1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5"""This module contains the classes that are responsible for keeping track of 

6messages: 

7 

8* :class:`Context` roughly represents the CoAP endpoint (basically a UDP 

9 socket) -- something that can send requests and possibly can answer 

10 incoming requests. 

11 

12 Incoming requests are processed in tasks created by the context. 

13 

14* a :class:`Request` gets generated whenever a request gets sent to keep 

15 track of the response 

16 

17Logging 

18~~~~~~~ 

19 

20Several constructors of the Context accept a logger name; these names go into 

21the construction of a Python logger. 

22 

23Log events will be emitted to these on different levels, with "warning" and 

24 above being a practical default for things that may warrant reviewing by

25an operator: 

26 

27* DEBUG is used for things that occur even under perfect conditions. 

28* INFO is for things that are well expected, but might be interesting during 

29 testing a network of nodes and not just when debugging the library. (This 

30 includes timeouts, retransmissions, and pings.) 

31* WARNING is for everything that indicates a misbehaving peer. These don't

32 *necessarily* indicate a client bug, though: Things like requesting a 

33 nonexistent block can just as well happen when a resource's content has 

34 changed between blocks. The library will not go out of its way to determine 

35 whether there is a plausible explanation for the odd behavior, and will 

36 report something as a warning in case of doubt. 

37* ERROR is used when something clearly went wrong. This includes irregular 

38 connection terminations and resource handler errors (which are demoted to 

39 error responses), and can often contain a backtrace. 

40 

41Logs will generally reveal messages exchanged between this and other systems, 

42and attackers can observe their encrypted counterparts. Private or shared keys 

43are only logged through an internal `log_secret` function, which usually 

44replaces them with a redacted value. Setting the ``AIOCOAP_REVEAL_KEYS`` 

45environment variable to the value ``show secrets in logs`` bypasses that 

46mechanism. As an additional precaution, this is only accepted if the effective 

47user has write access to the aiocoap source code. 

48""" 

49 

50import asyncio 

51import weakref 

52import time 

53from typing import Optional, List 

54 

55from . import defaults 

56from .credentials import CredentialsMap 

57from .message import Message 

58from .messagemanager import MessageManager 

59from .tokenmanager import TokenManager 

60from .pipe import Pipe, run_driving_pipe, error_to_message 

61from . import interfaces 

62from . import error 

63from .numbers import INTERNAL_SERVER_ERROR, NOT_FOUND, CONTINUE, SHUTDOWN_TIMEOUT 

64 

65import warnings 

66import logging 

67 

68 

69class Context(interfaces.RequestProvider): 

70 """Applications' entry point to the network 

71 

72 A :class:`.Context` coordinates one or more network :mod:`.transports` 

73 implementations and dispatches data between them and the application. 

74 

75 The application can start requests using the message dispatch methods, and 

76 set a :class:`resources.Site` that will answer requests directed to the 

77 application as a server. 

78 

79 On the library-internals side, it is the prime implementation of the 

80 :class:`interfaces.RequestProvider` interface, creates :class:`Request` and 

81 :class:`Response` classes on demand, and decides which transport 

82 implementations to start and which are to handle which messages. 

83 

84 **Context creation and destruction** 

85 

86 The following functions are provided for creating and stopping a context: 

87 

88 .. note:: 

89 

90 A typical application should only ever create one context, even (or

91 especially) when it acts both as a server and as a client (in which

92 case a server context should be created). 

93 

94 A context that is not used any more must be shut down using 

95 :meth:`.shutdown()`, but typical applications will not need to do so,

96 because they use the context for the full process lifetime.

97 

98 .. automethod:: create_client_context 

99 .. automethod:: create_server_context 

100 

101 .. automethod:: shutdown 

102 

103 **Dispatching messages** 

104 

105 CoAP requests can be sent using the following functions: 

106 

107 .. automethod:: request 

108 

109 If more control is needed, you can create a :class:`Request` yourself and 

110 pass the context to it. 

111 

112 

113 **Other methods and properties** 

114 

115 The remaining methods and properties are to be considered unstable even 

116 when the project reaches a stable version number; please file a feature 

117 request for stabilization if you want to reliably access any of them. 

118 """ 

119 

120 def __init__( 

121 self, 

122 loop=None, 

123 serversite=None, 

124 loggername="coap", 

125 client_credentials=None, 

126 server_credentials=None, 

127 ): 

128 self.log = logging.getLogger(loggername) 

129 

130 self.loop = loop or asyncio.get_event_loop() 

131 

132 self.serversite = serversite 

133 

134 self.request_interfaces = [] 

135 

136 self.client_credentials = client_credentials or CredentialsMap() 

137 self.server_credentials = server_credentials or CredentialsMap() 

138 

139 # 

140 # convenience methods for class instantiation 

141 # 

142 

143 async def _append_tokenmanaged_messagemanaged_transport( 

144 self, message_interface_constructor 

145 ): 

146 tman = TokenManager(self) 

147 mman = MessageManager(tman) 

148 transport = await message_interface_constructor(mman) 

149 

150 mman.message_interface = transport 

151 tman.token_interface = mman 

152 

153 self.request_interfaces.append(tman) 

154 

155 async def _append_tokenmanaged_transport(self, token_interface_constructor): 

156 tman = TokenManager(self) 

157 transport = await token_interface_constructor(tman) 

158 

159 tman.token_interface = transport 

160 

161 self.request_interfaces.append(tman) 

162 

163 @classmethod 

164 async def create_client_context( 

165 cls, *, loggername="coap", loop=None, transports: Optional[List[str]] = None 

166 ): 

167 """Create a context bound to all addresses on a random listening port. 

168 

169 This is the easiest way to get a context suitable for sending client 

170 requests. 

171 

172 :meta private: 

173 (not actually private, just hiding from automodule due to being 

174 grouped with the important functions) 

175 """ 

176 

177 if loop is None: 

178 loop = asyncio.get_event_loop() 

179 

180 self = cls(loop=loop, serversite=None, loggername=loggername) 

181 

182 selected_transports = transports or defaults.get_default_clienttransports( 

183 loop=loop 

184 ) 

185 

186 # FIXME make defaults overridable (postponed until they become configurable too) 

187 for transportname in selected_transports: 

188 if transportname == "udp6": 

189 from .transports.udp6 import MessageInterfaceUDP6 

190 

191 await self._append_tokenmanaged_messagemanaged_transport( 

192 lambda mman: MessageInterfaceUDP6.create_client_transport_endpoint( 

193 mman, log=self.log, loop=loop 

194 ) 

195 ) 

196 elif transportname == "simple6": 

197 from .transports.simple6 import MessageInterfaceSimple6 

198 

199 await self._append_tokenmanaged_messagemanaged_transport( 

200 lambda mman: MessageInterfaceSimple6.create_client_transport_endpoint( 

201 mman, log=self.log, loop=loop 

202 ) 

203 ) 

204 elif transportname == "tinydtls": 

205 from .transports.tinydtls import MessageInterfaceTinyDTLS 

206 

207 await self._append_tokenmanaged_messagemanaged_transport( 

208 lambda mman: MessageInterfaceTinyDTLS.create_client_transport_endpoint( 

209 mman, log=self.log, loop=loop 

210 ) 

211 ) 

212 elif transportname == "tcpclient": 

213 from .transports.tcp import TCPClient 

214 

215 await self._append_tokenmanaged_transport( 

216 lambda tman: TCPClient.create_client_transport(tman, self.log, loop) 

217 ) 

218 elif transportname == "tlsclient": 

219 from .transports.tls import TLSClient 

220 

221 await self._append_tokenmanaged_transport( 

222 lambda tman: TLSClient.create_client_transport( 

223 tman, self.log, loop, self.client_credentials 

224 ) 

225 ) 

226 elif transportname == "ws": 

227 from .transports.ws import WSPool 

228 

229 await self._append_tokenmanaged_transport( 

230 lambda tman: WSPool.create_transport( 

231 tman, self.log, loop, client_credentials=self.client_credentials 

232 ) 

233 ) 

234 elif transportname == "oscore": 

235 from .transports.oscore import TransportOSCORE 

236 

237 oscoretransport = TransportOSCORE(self, self) 

238 self.request_interfaces.append(oscoretransport) 

239 else: 

240 raise RuntimeError( 

241 "Transport %r not know for client context creation" % transportname 

242 ) 

243 

244 return self 

245 

246 @classmethod 

247 async def create_server_context( 

248 cls, 

249 site, 

250 bind=None, 

251 *, 

252 loggername="coap-server", 

253 loop=None, 

254 _ssl_context=None, 

255 multicast=[], 

256 server_credentials=None, 

257 transports: Optional[List[str]] = None, 

258 ): 

259 """Create a context, bound to all addresses on the CoAP port (unless 

260 otherwise specified in the ``bind`` argument). 

261 

262 This is the easiest way to get a context suitable both for sending 

263 client and accepting server requests. 

264 

265 The ``bind`` argument, if given, needs to be a 2-tuple of IP address 

266 string and port number, where the port number can be None to use the default port. 

267 

268 If ``multicast`` is given, it needs to be a list of (multicast address, 

269 interface name) tuples, which will all be joined. (The IPv4 style of 

270 selecting the interface by a local address is not supported; users may 

271 want to use the netifaces package to arrive at an interface name for an 

272 address). 

273 

274 As a shortcut, the list may also contain interface names alone. Those 

275 will be joined for the 'all CoAP nodes' groups of IPv4 and IPv6 (with 

276 scopes 2 and 5) as well as the respective 'all nodes' groups in IPv6. 

277 

278 Under some circumstances you may already need a context to pass into 

279 the site for creation; this is typically the case for servers that 

280 trigger requests on their own. For those cases, it is usually easiest 

281 to pass None in as a site, and set the fully constructed site later by 

282 assigning to the ``serversite`` attribute. 

283 

284 :meta private: 

285 (not actually private, just hiding from automodule due to being 

286 grouped with the important functions) 

287 """ 

288 

289 if loop is None: 

290 loop = asyncio.get_event_loop() 

291 

292 self = cls( 

293 loop=loop, 

294 serversite=site, 

295 loggername=loggername, 

296 server_credentials=server_credentials, 

297 ) 

298 

299 multicast_done = not multicast 

300 

301 selected_transports = transports or defaults.get_default_servertransports( 

302 loop=loop 

303 ) 

304 

305 for transportname in selected_transports: 

306 if transportname == "udp6": 

307 from .transports.udp6 import MessageInterfaceUDP6 

308 

309 await self._append_tokenmanaged_messagemanaged_transport( 

310 lambda mman: MessageInterfaceUDP6.create_server_transport_endpoint( 

311 mman, log=self.log, loop=loop, bind=bind, multicast=multicast 

312 ) 

313 ) 

314 multicast_done = True 

315 # FIXME this is duplicated from the client version, as those are client-only anyway 

316 elif transportname == "simple6": 

317 from .transports.simple6 import MessageInterfaceSimple6 

318 

319 await self._append_tokenmanaged_messagemanaged_transport( 

320 lambda mman: MessageInterfaceSimple6.create_client_transport_endpoint( 

321 mman, log=self.log, loop=loop 

322 ) 

323 ) 

324 elif transportname == "tinydtls": 

325 from .transports.tinydtls import MessageInterfaceTinyDTLS 

326 

327 await self._append_tokenmanaged_messagemanaged_transport( 

328 lambda mman: MessageInterfaceTinyDTLS.create_client_transport_endpoint( 

329 mman, log=self.log, loop=loop 

330 ) 

331 ) 

332 # FIXME end duplication 

333 elif transportname == "tinydtls_server": 

334 from .transports.tinydtls_server import MessageInterfaceTinyDTLSServer 

335 

336 await self._append_tokenmanaged_messagemanaged_transport( 

337 lambda mman: MessageInterfaceTinyDTLSServer.create_server( 

338 bind, 

339 mman, 

340 log=self.log, 

341 loop=loop, 

342 server_credentials=self.server_credentials, 

343 ) 

344 ) 

345 elif transportname == "simplesocketserver": 

346 from .transports.simplesocketserver import MessageInterfaceSimpleServer 

347 

348 await self._append_tokenmanaged_messagemanaged_transport( 

349 lambda mman: MessageInterfaceSimpleServer.create_server( 

350 bind, mman, log=self.log, loop=loop 

351 ) 

352 ) 

353 elif transportname == "tcpserver": 

354 from .transports.tcp import TCPServer 

355 

356 await self._append_tokenmanaged_transport( 

357 lambda tman: TCPServer.create_server(bind, tman, self.log, loop) 

358 ) 

359 elif transportname == "tcpclient": 

360 from .transports.tcp import TCPClient 

361 

362 await self._append_tokenmanaged_transport( 

363 lambda tman: TCPClient.create_client_transport(tman, self.log, loop) 

364 ) 

365 elif transportname == "tlsserver": 

366 if _ssl_context is not None: 

367 from .transports.tls import TLSServer 

368 

369 await self._append_tokenmanaged_transport( 

370 lambda tman: TLSServer.create_server( 

371 bind, tman, self.log, loop, _ssl_context 

372 ) 

373 ) 

374 elif transportname == "tlsclient": 

375 from .transports.tls import TLSClient 

376 

377 await self._append_tokenmanaged_transport( 

378 lambda tman: TLSClient.create_client_transport( 

379 tman, self.log, loop, self.client_credentials 

380 ) 

381 ) 

382 elif transportname == "ws": 

383 from .transports.ws import WSPool 

384 

385 await self._append_tokenmanaged_transport( 

386 # None, None: Unlike the other transports this has a server/client generic creator, and only binds if there is some bind 

387 lambda tman: WSPool.create_transport( 

388 tman, 

389 self.log, 

390 loop, 

391 client_credentials=self.client_credentials, 

392 server_bind=bind or (None, None), 

393 server_context=_ssl_context, 

394 ) 

395 ) 

396 elif transportname == "oscore": 

397 from .transports.oscore import TransportOSCORE 

398 

399 oscoretransport = TransportOSCORE(self, self) 

400 self.request_interfaces.append(oscoretransport) 

401 else: 

402 raise RuntimeError( 

403 "Transport %r not know for server context creation" % transportname 

404 ) 

405 

406 if not multicast_done: 

407 self.log.warning( 

408 "Multicast was requested, but no multicast capable transport was selected." 

409 ) 

410 

411 # This is used in tests to wait for externally launched servers to be ready 

412 self.log.debug("Server ready to receive requests") 

413 

414 return self 

415 

416 async def shutdown(self): 

417 """Take down any listening sockets and stop all related timers. 

418 

419 After this coroutine terminates, and once all external references to 

420 the object are dropped, it should be garbage-collectable. 

421 

422 This method takes up to 

423 :const:`aiocoap.numbers.constants.SHUTDOWN_TIMEOUT` seconds, allowing 

424 transports to perform any cleanup implemented in them (such as orderly 

425 connection shutdown and cancelling observations, where the latter is 

426 currently not implemented). 

427 

428 :meta private: 

429 (not actually private, just hiding from automodule due to being 

430 grouped with the important functions) 

431 """ 

432 

433 self.log.debug("Shutting down context") 

434 

435 done, pending = await asyncio.wait( 

436 [ 

437 asyncio.create_task( 

438 ri.shutdown(), 

439 name="Shutdown of %r" % ri, 

440 ) 

441 for ri in self.request_interfaces 

442 ], 

443 timeout=SHUTDOWN_TIMEOUT, 

444 ) 

445 for item in done: 

446 await item 

447 if pending: 

448 # Apart from being useful to see, this also ensures that developers 

449 # see the error in the logs during test suite runs -- and the error 

450 # should be easier to follow than the "we didn't garbage collect 

451 # everything" errors we see anyway (or otherwise, if the error is 

452 # escalated into a test failure) 

453 self.log.error( 

454 "Shutdown timeout exceeded, returning anyway. Interfaces still busy: %s", 

455 pending, 

456 ) 

457 

458 # FIXME: determine how official this should be, or which part of it is 

459 # public -- now that BlockwiseRequest uses it. (And formalize what can 

460 # change about messages and what can't after the remote has been thusly 

461 # populated). 

462 async def find_remote_and_interface(self, message): 

463 if message.remote is None: 

464 raise error.MissingRemoteError() 

465 for ri in self.request_interfaces: 

466 if await ri.fill_or_recognize_remote(message): 

467 return ri 

468 raise error.NoRequestInterface() 

469 

470 def request(self, request_message, handle_blockwise=True): 

471 if handle_blockwise: 

472 return BlockwiseRequest(self, request_message) 

473 

474 pipe = Pipe(request_message, self.log) 

475 # Request sets up callbacks at creation 

476 result = Request(pipe, self.loop, self.log) 

477 

478 async def send(): 

479 try: 

480 request_interface = await self.find_remote_and_interface( 

481 request_message 

482 ) 

483 request_interface.request(pipe) 

484 except Exception as e: 

485 pipe.add_exception(e) 

486 return 

487 

488 self.loop.create_task( 

489 send(), 

490 name="Request processing of %r" % result, 

491 ) 

492 return result 

493 

494 # the following are under consideration for moving into Site or something 

495 # mixed into it 

496 

497 def render_to_pipe(self, pipe): 

498 """Fill a pipe by running the site's render_to_pipe interface and 

499 handling errors.""" 

500 

501 pr_that_can_receive_errors = error_to_message(pipe, self.log) 

502 

503 run_driving_pipe( 

504 pr_that_can_receive_errors, 

505 self._render_to_pipe(pipe), 

506 name="Rendering for %r" % pipe.request, 

507 ) 

508 

509 async def _render_to_pipe(self, pipe): 

510 if self.serversite is None: 

511 pipe.add_response( 

512 Message(code=NOT_FOUND, payload=b"not a server"), is_last=True 

513 ) 

514 return 

515 

516 return await self.serversite.render_to_pipe(pipe) 

517 

518 

519class BaseRequest: 

520 """Common mechanisms of :class:`Request` and :class:`MulticastRequest`""" 

521 

522 

523class BaseUnicastRequest(BaseRequest): 

524 """A utility class that offers the :attr:`response_raising` and 

525 :attr:`response_nonraising` alternatives to waiting for the 

526 :attr:`response` future whose error states can be presented either as an 

527 unsuccessful response (e.g. 4.04) or an exception.

528 

529 It also provides some internal tools for handling anything that has a 

530 :attr:`response` future and an :attr:`observation`""" 

531 

532 @property 

533 async def response_raising(self): 

534 """An awaitable that returns if a response comes in and is successful, 

535 otherwise raises generic network exception or a 

536 :class:`.error.ResponseWrappingError` for unsuccessful responses. 

537 

538 Experimental Interface.""" 
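# Illustrative sketch (commented out): ctx and msg are assumed from the
# surrounding application; unsuccessful responses arrive wrapped in
# error.ResponseWrappingError, transport problems as the usual network errors.
#
#     try:
#         response = await ctx.request(msg).response_raising
#     except error.ResponseWrappingError as e:
#         print("Unsuccessful response:", e)
#     except error.NetworkError as e:
#         print("Transport failure:", e)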

539 

540 response = await self.response 

541 if not response.code.is_successful(): 

542 raise error.ResponseWrappingError(response) 

543 

544 return response 

545 

546 @property 

547 async def response_nonraising(self): 

548 """An awaitable that rather returns a 500ish fabricated message (as a 

549 proxy would return) instead of raising an exception. 

550 

551 Experimental Interface.""" 
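# Illustrative sketch (commented out): callers that prefer uniform handling
# inspect the code of whatever message comes back; ctx and msg are assumed
# from the surrounding application.
#
#     response = await ctx.request(msg).response_nonraising
#     if not response.code.is_successful():
#         print("Request failed with", response.code)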

552 

553 # FIXME: Can we smuggle error_to_message into the underlying pipe? 

554 # That should make observe notifications into messages rather 

555 # than exceptions as well, plus it has fallbacks for `e.to_message()` 

556 # raising. 

557 

558 try: 

559 return await self.response 

560 except error.RenderableError as e: 

561 return e.to_message() 

562 except Exception: 

563 return Message(code=INTERNAL_SERVER_ERROR) 

564 

565 

566class Request(interfaces.Request, BaseUnicastRequest): 

567 def __init__(self, pipe, loop, log): 

568 self._pipe = pipe 

569 

570 self.response = loop.create_future() 

571 

572 if pipe.request.opt.observe == 0: 

573 self.observation = ClientObservation() 

574 else: 

575 self.observation = None 

576 

577 self._runner = self._run() 

578 self._runner.send(None) 

579 

580 def process(event): 

581 try: 

582 # would be great to have self or the runner as weak ref, but 

583 # see ClientObservation.register_callback comments -- while 

584 # that is around, we can't weakref here. 

585 self._runner.send(event) 

586 return True 

587 except StopIteration: 

588 return False 

589 

590 self._stop_interest = self._pipe.on_event(process) 

591 

592 self.log = log 

593 

594 self.response.add_done_callback(self._response_cancellation_handler) 

595 

596 def _response_cancellation_handler(self, response): 

597 # Propagate cancellation to the runner (if interest in the first 

598 # response is lost, there won't be observation items to pull out), but 

599 # not general completion (because if it's completed and not cancelled, 

600 # eg. when an observation is active) 

601 if self.response.cancelled() and self._runner is not None: 

602 # Dropping the only reference makes it stop with GeneratorExit, 

603 # similar to a cancelled task 

604 self._runner = None 

605 self._stop_interest() 

606 # Otherwise, there will be a runner still around, and it's its task to 

607 # call _stop_interest. 

608 

609 @staticmethod 

610 def _add_response_properties(response, request): 

611 response.request = request 

612 

613 def _run(self): 

614 # FIXME: This is in iterator form because it used to be a task that 

615 # awaited futures, and that code could be easily converted to an 

616 # iterator. I'm not sure that's a bad state here, but at least it 

617 # should be a more conscious decision to make this an iterator rather 

618 # than just having it happen to be one. 

619 # 

620 # FIXME: check that responses come from the same remote as long as we're assuming unicast

621 

622 first_event = yield None 

623 

624 if first_event.message is not None: 

625 self._add_response_properties(first_event.message, self._pipe.request) 

626 self.response.set_result(first_event.message) 

627 else: 

628 self.response.set_exception(first_event.exception) 

629 if not isinstance(first_event.exception, error.Error): 

630 self.log.warning( 

631 "An exception that is not an aiocoap Error was raised " 

632 "from a transport; please report this as a bug in " 

633 "aiocoap: %r", 

634 first_event.exception, 

635 ) 

636 

637 if self.observation is None: 

638 if not first_event.is_last: 

639 self.log.error( 

640 "Pipe indicated more possible responses" 

641 " while the Request handler would not know what to" 

642 " do with them, stopping any further request." 

643 ) 

644 self._stop_interest() 

645 return 

646 

647 if first_event.is_last: 

648 self.observation.error(error.NotObservable()) 

649 return 

650 

651 if first_event.message.opt.observe is None: 

652 self.log.error( 

653 "Pipe indicated more possible responses" 

654 " while the Request handler would not know what to" 

655 " do with them, stopping any further request." 

656 ) 

657 self._stop_interest() 

658 return 

659 

660 # variable names from RFC7641 Section 3.4 

661 v1 = first_event.message.opt.observe 

662 t1 = time.time() 

663 

664 while True: 

665 # We don't really support cancellation of observations yet (see 

666 # https://github.com/chrysn/aiocoap/issues/92), but at least 

667 # stopping the interest is a way to free the local resources after 

668 # the first observation update, and to make the MID handler RST the 

669 # observation on the next. 

670 # FIXME: there *is* now a .on_cancel callback, we should at least 

671 # hook into that, and possibly even send a proper cancellation 

672 # then. 

673 next_event = yield True 

674 if self.observation.cancelled: 

675 self._stop_interest() 

676 return 

677 

678 if next_event.exception is not None: 

679 self.observation.error(next_event.exception) 

680 if not next_event.is_last: 

681 self._stop_interest() 

682 if not isinstance(next_event.exception, error.Error): 

683 self.log.warning( 

684 "An exception that is not an aiocoap Error was " 

685 "raised from a transport during an observation; " 

686 "please report this as a bug in aiocoap: %r", 

687 next_event.exception, 

688 ) 

689 return 

690 

691 self._add_response_properties(next_event.message, self._pipe.request) 

692 

693 if next_event.message.opt.observe is not None: 

694 # check for reordering 

695 v2 = next_event.message.opt.observe 

696 t2 = time.time() 

697 

698 is_recent = ( 

699 (v1 < v2 and v2 - v1 < 2**23) 

700 or (v1 > v2 and v1 - v2 > 2**23) 

701 or ( 

702 t2 

703 > t1 

704 + self._pipe.request.transport_tuning.OBSERVATION_RESET_TIME 

705 ) 

706 ) 
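# Worked example of the RFC 7641 freshness rule above (illustrative numbers):
# v1=5, v2=3 is stale (5 - 3 is not > 2**23) unless more than
# OBSERVATION_RESET_TIME has elapsed since t1, whereas v1=2**24 - 1, v2=0 is
# fresh because the 24-bit counter has wrapped around (v1 - v2 > 2**23).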

707 if is_recent: 

708 t1 = t2 

709 v1 = v2 

710 else: 

711 # the terminal message is always the last 

712 is_recent = True 

713 

714 if is_recent: 

715 self.observation.callback(next_event.message) 

716 

717 if next_event.is_last: 

718 self.observation.error(error.ObservationCancelled()) 

719 return 

720 

721 if next_event.message.opt.observe is None: 

722 self.observation.error(error.ObservationCancelled()) 

723 self.log.error( 

724 "Pipe indicated more possible responses" 

725 " while the Request handler would not know what to" 

726 " do with them, stopping any further request." 

727 ) 

728 self._stop_interest() 

729 return 

730 

731 

732class BlockwiseRequest(BaseUnicastRequest, interfaces.Request): 

733 def __init__(self, protocol, app_request): 

734 self.protocol = protocol 

735 self.log = self.protocol.log.getChild("blockwise-requester") 

736 

737 self.response = protocol.loop.create_future() 

738 

739 if app_request.opt.observe is not None: 

740 self.observation = ClientObservation() 

741 else: 

742 self.observation = None 

743 

744 self._runner = protocol.loop.create_task( 

745 self._run_outer( 

746 app_request, 

747 self.response, 

748 weakref.ref(self.observation) 

749 if self.observation is not None 

750 else lambda: None, 

751 self.protocol, 

752 self.log, 

753 ), 

754 name="Blockwise runner for %r" % app_request, 

755 ) 

756 self.response.add_done_callback(self._response_cancellation_handler) 

757 

758 def _response_cancellation_handler(self, response_future): 

759 # see Request._response_cancellation_handler 

760 if self.response.cancelled(): 

761 self._runner.cancel() 

762 

763 @classmethod 

764 async def _run_outer(cls, app_request, response, weak_observation, protocol, log): 

765 try: 

766 await cls._run(app_request, response, weak_observation, protocol, log) 

767 except asyncio.CancelledError: 

768 pass # results already set 

769 except Exception as e: 

770 logged = False 

771 if not response.done(): 

772 logged = True 

773 response.set_exception(e) 

774 obs = weak_observation() 

775 if app_request.opt.observe is not None and obs is not None: 

776 logged = True 

777 obs.error(e) 

778 if not logged: 

779 # should be unreachable 

780 log.error( 

781 "Exception in BlockwiseRequest runner neither went to response nor to observation: %s", 

782 e, 

783 exc_info=e, 

784 ) 

785 

786 # This is a class method because that allows self and self.observation to 

787 # be freed even when this task is running, and the task to stop itself -- 

788 # otherwise we couldn't know when users just "forget" about a request 

789 # object after using its response (esp. in observe cases) and leave this 

790 # task running. 

791 @classmethod 

792 async def _run(cls, app_request, response, weak_observation, protocol, log): 

793 # we need to populate the remote right away, because the choice of 

794 # blocks depends on it. 

795 await protocol.find_remote_and_interface(app_request) 

796 

797 size_exp = app_request.remote.maximum_block_size_exp 

798 

799 if app_request.opt.block1 is not None: 

800 warnings.warn( 

801 "Setting a block1 option in a managed block-wise transfer is deprecated. Instead, set request.remote.maximum_block_size_exp to the desired value", 

802 DeprecationWarning, 

803 stacklevel=2, 

804 ) 

805 assert app_request.opt.block1.block_number == 0, ( 

806 "Unexpected block number in app_request" 

807 ) 

808 assert not app_request.opt.block1.more, ( 

809 "Unexpected more-flag in app_request" 

810 ) 

811 # this is where the library user can traditionally pass in size 

812 # exponent hints into the library. 

813 size_exp = app_request.opt.block1.size_exponent 

814 

815 # Offset in the message in blocks of size_exp. Whoever changes size_exp 

816 # is responsible for updating this number. 

817 block_cursor = 0 

818 

819 while True: 

820 # ... send a chunk 

821 

822 if size_exp >= 6: 

823 # FIXME from maximum_payload_size 

824 fragmentation_threshold = app_request.remote.maximum_payload_size 

825 else: 

826 fragmentation_threshold = 2 ** (size_exp + 4) 

827 
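# For size_exp < 6 the block size is 2**(size_exp + 4) bytes (0 -> 16 B,
# 5 -> 512 B); 6 and 7 defer to the remote's maximum_payload_size, with 7
# used for BERT-style blocks in multiples of 1024 bytes (RFC 8323).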

828 if ( 

829 app_request.opt.block1 is not None 

830 or len(app_request.payload) > fragmentation_threshold 

831 ): 

832 current_block1 = app_request._extract_block( 

833 block_cursor, size_exp, app_request.remote.maximum_payload_size 

834 ) 

835 if block_cursor == 0: 

836 current_block1.opt.size1 = len(app_request.payload) 

837 else: 

838 current_block1 = app_request 

839 

840 blockrequest = protocol.request(current_block1, handle_blockwise=False) 

841 blockresponse = await blockrequest.response 

842 

843 # store for future blocks to ensure that the next blocks will be 

844 # sent from the same source address (in the UDP case; for many 

845 # other transports it won't matter), carrying along any locally set block size limitation.

846 if ( 

847 app_request.remote.maximum_block_size_exp 

848 < blockresponse.remote.maximum_block_size_exp 

849 ): 

850 blockresponse.remote.maximum_block_size_exp = ( 

851 app_request.remote.maximum_block_size_exp 

852 ) 

853 app_request.remote = blockresponse.remote 

854 

855 if blockresponse.opt.block1 is None: 

856 if blockresponse.code.is_successful() and current_block1.opt.block1: 

857 log.warning( 

858 "Block1 option completely ignored by server, assuming it knows what it is doing." 

859 ) 

860 # FIXME: handle 4.13 and retry with the indicated size option 

861 break 

862 

863 block1 = blockresponse.opt.block1 

864 log.debug( 

865 "Response with Block1 option received, number = %d, more = %d, size_exp = %d.", 

866 block1.block_number, 

867 block1.more, 

868 block1.size_exponent, 

869 ) 

870 

871 if block1.block_number != current_block1.opt.block1.block_number: 

872 raise error.UnexpectedBlock1Option("Block number mismatch") 

873 

874 if size_exp == 7: 

875 block_cursor += len(current_block1.payload) // 1024 

876 else: 

877 block_cursor += 1 

878 

879 while block1.size_exponent < size_exp: 

880 block_cursor *= 2 

881 size_exp -= 1 

882 

883 if not current_block1.opt.block1.more: 

884 if block1.more or blockresponse.code == CONTINUE: 

885 # treating this as a protocol error -- letting it slip 

886 # through would misrepresent the whole operation as an 

887 # overall 2.xx (successful) one.

888 raise error.UnexpectedBlock1Option( 

889 "Server asked for more data at end of body" 

890 ) 

891 break 

892 

893 # checks before preparing the next round: 

894 

895 if blockresponse.opt.observe: 

896 # we're not *really* interested in that block, we just sent an 

897 # observe option to indicate that we'll want to observe the 

898 # resulting representation as a whole 

899 log.warning( 

900 "Server answered Observe in early Block1 phase, cancelling the erroneous observation." 

901 ) 

902 blockrequest.observe.cancel() 

903 

904 if block1.more: 

905 # FIXME I think my own server is doing this wrong

906 # if response.code != CONTINUE: 

907 # raise error.UnexpectedBlock1Option("more-flag set but no Continue") 

908 pass 

909 else: 

910 if not blockresponse.code.is_successful(): 

911 break 

912 else: 

913 # ignoring (discarding) the successful intermediate result, waiting for a final one 

914 continue 

915 

916 lower_observation = None 

917 if app_request.opt.observe is not None: 

918 if blockresponse.opt.observe is not None: 

919 lower_observation = blockrequest.observation 

920 else: 

921 obs = weak_observation() 

922 if obs: 

923 obs.error(error.NotObservable()) 

924 del obs 

925 

926 assert blockresponse is not None, "Block1 loop broke without setting a response" 

927 blockresponse.opt.block1 = None 

928 

929 # FIXME check with RFC7959: it just says "send requests similar to the 

930 # requests in the Block1 phase", what does that mean? using the last 

931 # block1 as a reference for now, especially because in the 

932 # only-one-request-block case, that's the original request we must send 

933 # again and again anyway 

934 assembled_response = await cls._complete_by_requesting_block2( 

935 protocol, current_block1, blockresponse, log 

936 ) 

937 

938 response.set_result(assembled_response) 

939 # finally set the result 

940 

941 if lower_observation is not None: 

942 # FIXME this can all be simplified a lot since it's no longer

943 # expected that observations shut themselves down when GC'd.

944 obs = weak_observation() 

945 del weak_observation 

946 if obs is None: 

947 lower_observation.cancel() 

948 return 

949 future_weak_observation = protocol.loop.create_future() # packing this up because its destroy callback needs to reference the subtask 

950 subtask = asyncio.create_task( 

951 cls._run_observation( 

952 app_request, 

953 lower_observation, 

954 future_weak_observation, 

955 protocol, 

956 log, 

957 ), 

958 name="Blockwise observation for %r" % app_request, 

959 ) 

960 future_weak_observation.set_result( 

961 weakref.ref(obs, lambda obs: subtask.cancel()) 

962 ) 

963 obs.on_cancel(subtask.cancel) 

964 del obs 

965 await subtask 

966 

967 @classmethod 

968 async def _run_observation( 

969 cls, original_request, lower_observation, future_weak_observation, protocol, log 

970 ): 

971 weak_observation = await future_weak_observation 

972 # we can use weak_observation() here at any time, because whenever that 

973 # becomes None, this task gets cancelled 

974 try: 

975 async for block1_notification in lower_observation: 

976 log.debug("Notification received") 

977 full_notification = await cls._complete_by_requesting_block2( 

978 protocol, original_request, block1_notification, log 

979 ) 

980 log.debug("Reporting completed notification") 

981 weak_observation().callback(full_notification) 

982 # FIXME verify that this loop actually ends iff the observation 

983 # was cancelled -- otherwise find out the cause(s) or make it not 

984 # cancel under indistinguishable circumstances 

985 weak_observation().error(error.ObservationCancelled()) 

986 except asyncio.CancelledError: 

987 return 

988 except Exception as e: 

989 weak_observation().error(e) 

990 finally: 

991 # We generally avoid idempotent cancellation, but we may have 

992 # reached this point either due to an earlier cancellation or 

993 # without one 

994 if not lower_observation.cancelled: 

995 lower_observation.cancel() 

996 

997 @classmethod 

998 async def _complete_by_requesting_block2( 

999 cls, protocol, request_to_repeat, initial_response, log 

1000 ): 

1001 # FIXME this can probably be deduplicated against BlockwiseRequest 

1002 

1003 if ( 

1004 initial_response.opt.block2 is None 

1005 or initial_response.opt.block2.more is False 

1006 ): 

1007 initial_response.opt.block2 = None 

1008 return initial_response 

1009 

1010 if initial_response.opt.block2.block_number != 0: 

1011 log.error("Error assembling blockwise response (expected first block)") 

1012 raise error.UnexpectedBlock2() 

1013 

1014 assembled_response = initial_response 

1015 last_response = initial_response 

1016 while True: 

1017 current_block2 = request_to_repeat._generate_next_block2_request( 

1018 assembled_response 

1019 ) 

1020 

1021 current_block2 = current_block2.copy(remote=initial_response.remote) 

1022 

1023 blockrequest = protocol.request(current_block2, handle_blockwise=False) 

1024 last_response = await blockrequest.response 

1025 

1026 if last_response.opt.block2 is None: 

1027 log.warning( 

1028 "Server sent non-blockwise response after having started a blockwise transfer. Blockwise transfer cancelled, accepting single response." 

1029 ) 

1030 return last_response 

1031 

1032 block2 = last_response.opt.block2 

1033 log.debug( 

1034 "Response with Block2 option received, number = %d, more = %d, size_exp = %d.", 

1035 block2.block_number, 

1036 block2.more, 

1037 block2.size_exponent, 

1038 ) 

1039 try: 

1040 assembled_response._append_response_block(last_response) 

1041 except error.Error as e: 

1042 log.error("Error assembling blockwise response, passing on error %r", e) 

1043 raise 

1044 

1045 if block2.more is False: 

1046 return assembled_response 

1047 

1048 

1049class ClientObservation: 

1050 """An interface to observe notification updates arriving on a request. 

1051 

1052 This class does not actually provide any of the observe functionality, it 

1053 is purely a container for dispatching the messages via asynchronous 

1054 iteration. It gets driven (i.e. populated with responses or errors including

1055 observation termination) by a Request object. 

1056 """ 

1057 

1058 def __init__(self): 

1059 self.callbacks = [] 

1060 self.errbacks = [] 

1061 

1062 self.cancelled = False 

1063 self._on_cancel = [] 

1064 

1065 self._latest_response = None 

1066 # the analogous error is stored in _cancellation_reason when cancelled. 

1067 

1068 def __aiter__(self): 

1069 """`async for` interface to observations. 

1070 

1071 This is the preferred interface to obtaining observations.""" 

1072 it = self._Iterator() 

1073 self.register_callback(it.push, _suppress_deprecation=True) 

1074 self.register_errback(it.push_err, _suppress_deprecation=True) 

1075 return it 

1076 

1077 class _Iterator: 

1078 def __init__(self): 

1079 self._future = asyncio.get_event_loop().create_future() 

1080 

1081 def push(self, item): 

1082 if self._future.done(): 

1083 # we don't care whether we overwrite anything, this is a lossy queue as observe is lossy 

1084 self._future = asyncio.get_event_loop().create_future() 

1085 self._future.set_result(item) 

1086 

1087 def push_err(self, e): 

1088 if self._future.done(): 

1089 self._future = asyncio.get_event_loop().create_future() 

1090 self._future.set_exception(e) 

1091 

1092 async def __anext__(self): 

1093 f = self._future 

1094 try: 

1095 result = await self._future 

1096 # FIXME see `await servobs._trigger` comment: might waiting for 

1097 # the original future not yield the first future's result when 

1098 # a quick second future comes in via a push?

1099 if f is self._future: 

1100 self._future = asyncio.get_event_loop().create_future() 

1101 return result 

1102 except (error.NotObservable, error.ObservationCancelled): 

1103 # only exit cleanly when the server -- right away or later -- 

1104 # states that the resource is not observable any more 

1105 # FIXME: check whether an unsuccessful message is still passed 

1106 # as an observation result (or whether it should be) 

1107 raise StopAsyncIteration 

1108 

1109 def __del__(self): 

1110 if self._future.done(): 

1111 try: 

1112 # Fetch the result so any errors show up at least in the 

1113 # finalizer output 

1114 self._future.result() 

1115 except (error.ObservationCancelled, error.NotObservable): 

1116 # This is the case at the end of an observation cancelled 

1117 # by the server. 

1118 pass 

1119 except error.NetworkError: 

1120 # This will already have shown up in the main result too. 

1121 pass 

1122 except (error.LibraryShutdown, asyncio.CancelledError): 

1123 pass 

1124 # Anything else flying out of this is unexpected and probably a 

1125 # library error 

1126 

1127 # When this function is removed, we can finally do cleanup better. Right 

1128 # now, someone could register a callback that doesn't hold any references, 

1129 # so we can't just stop the request when nobody holds a reference to this 

1130 # any more. Once we're all in pull mode, we can make the `process` function 

1131 # that sends data in here use a weak reference (because any possible 

1132 # recipient would need to hold a reference to self or the iterator, and 

1133 # thus _run). 

1134 def register_callback(self, callback, _suppress_deprecation=False): 

1135 """Call the callback whenever a response to the message comes in, and 

1136 pass the response to it. 

1137 

1138 The use of this function is deprecated: Use the asynchronous iteration 

1139 interface instead.""" 

1140 if not _suppress_deprecation: 

1141 warnings.warn( 

1142 "register_callback on observe results is deprecated: Use `async for notify in request.observation` instead.", 

1143 DeprecationWarning, 

1144 stacklevel=2, 

1145 ) 

1146 if self.cancelled: 

1147 return 

1148 

1149 self.callbacks.append(callback) 

1150 if self._latest_response is not None: 

1151 callback(self._latest_response) 

1152 

1153 def register_errback(self, callback, _suppress_deprecation=False): 

1154 """Call the callback whenever something goes wrong with the 

1155 observation, and pass an exception to the callback. After such a 

1156 callback is called, no more callbacks will be issued. 

1157 

1158 The use of this function is deprecated: Use the asynchronous iteration 

1159 interface instead.""" 

1160 if not _suppress_deprecation: 

1161 warnings.warn( 

1162 "register_errback on observe results is deprecated: Use `async for notify in request.observation` instead.", 

1163 DeprecationWarning, 

1164 stacklevel=2, 

1165 ) 

1166 if self.cancelled: 

1167 callback(self._cancellation_reason) 

1168 return 

1169 self.errbacks.append(callback) 

1170 

1171 def callback(self, response): 

1172 """Notify all listeners of an incoming response""" 

1173 

1174 self._latest_response = response 

1175 

1176 for c in self.callbacks: 

1177 c(response) 

1178 

1179 def error(self, exception): 

1180 """Notify registered listeners that the observation went wrong. This 

1181 can only be called once.""" 

1182 

1183 if self.errbacks is None: 

1184 raise RuntimeError( 

1185 "Error raised in an already cancelled ClientObservation" 

1186 ) from exception 

1187 for c in self.errbacks: 

1188 c(exception) 

1189 

1190 self.cancel() 

1191 self._cancellation_reason = exception 

1192 

1193 def cancel(self): 

1194 # FIXME determine whether this is called by anything other than error, 

1195 # and make it private so there is always a _cancellation_reason 

1196 """Cease to generate observation or error events. This will not 

1197 generate an error by itself. 

1198 

1199 This function is only needed while register_callback and 

1200 register_errback are around; once their deprecations are acted on, 

1201 dropping the asynchronous iterator will automatically cancel the 

1202 observation. 

1203 """ 

1204 

1205 assert not self.cancelled, "ClientObservation cancelled twice" 

1206 

1207 # make sure things go wrong when someone tries to continue this 

1208 self.errbacks = None 

1209 self.callbacks = None 

1210 

1211 self.cancelled = True 

1212 while self._on_cancel: 

1213 self._on_cancel.pop()() 

1214 

1215 self._cancellation_reason = None 

1216 

1217 def on_cancel(self, callback): 

1218 if self.cancelled: 

1219 callback() 

1220 self._on_cancel.append(callback) 

1221 

1222 def __repr__(self): 

1223 return "<%s %s at %#x>" % ( 

1224 type(self).__name__, 

1225 "(cancelled)" 

1226 if self.cancelled 

1227 else "(%s call-, %s errback(s))" 

1228 % (len(self.callbacks), len(self.errbacks)), 

1229 id(self), 

1230 ) 

1231 

1232 

1233class ServerObservation: 

1234 def __init__(self): 

1235 self._accepted = False 

1236 self._trigger = asyncio.get_event_loop().create_future() 

1237 # A deregistration is "early" if it happens before the response message 

1238 # is actually sent; calling deregister() in that time (typically during 

1239 # `render()`) will not send an unsuccessful response message but just 

1240 # set this flag; the flag becomes None as soon as it is too late for an

1241 # early deregistration. 

1242 # This mechanism is temporary until more of aiocoap behaves like 

1243 # Pipe which does not suffer from this limitation. 

1244 self._early_deregister = False 

1245 self._late_deregister = False 

1246 

1247 def accept(self, cancellation_callback): 

1248 self._accepted = True 

1249 self._cancellation_callback = cancellation_callback 

1250 

1251 def deregister(self, reason=None): 

1252 if self._early_deregister is False: 

1253 self._early_deregister = True 

1254 return 

1255 

1256 warnings.warn( 

1257 "Late use of ServerObservation.deregister() is" 

1258 " deprecated, use .trigger with an unsuccessful value" 

1259 " instead", 

1260 DeprecationWarning, 

1261 ) 

1262 self.trigger( 

1263 Message(code=INTERNAL_SERVER_ERROR, payload=b"Resource became unobservable") 

1264 ) 

1265 

1266 def trigger(self, response=None, *, is_last=False): 

1267 """Send an updated response; if None is given, the observed resource's 

1268 rendering will be invoked to produce one. 

1269 

1270 `is_last` can be set to True to indicate that no more responses will be 

1271 sent. Note that an unsuccessful response will be the last no matter 

1272 what is_last says, as such a message always terminates a CoAP 

1273 observation.""" 

1274 if is_last: 

1275 self._late_deregister = True 

1276 if self._trigger.done(): 

1277 # we don't care whether we overwrite anything, this is a lossy queue as observe is lossy 

1278 self._trigger = asyncio.get_event_loop().create_future() 

1279 self._trigger.set_result(response)