Coverage for src/aiocoap/protocol.py: 0%

495 statements  

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5"""This module contains the classes that are responsible for keeping track of 

6messages: 

7 

8* :class:`Context` roughly represents the CoAP endpoint (basically a UDP 

9 socket) -- something that can send requests and possibly can answer 

10 incoming requests. 

11 

12 Incoming requests are processed in tasks created by the context. 

13 

14* a :class:`Request` gets generated whenever a request gets sent to keep 

15 track of the response 

16 

17Logging 

18~~~~~~~ 

19 

20Several constructors of the Context accept a logger name; these names go into 

21the construction of a Python logger. 

22 

23Log events will be emitted to these on different levels, with "warning" and 

24above being a practical default for things that may warrant reviewing by 

25an operator: 

26 

27* DEBUG is used for things that occur even under perfect conditions. 

28* INFO is for things that are well expected, but might be interesting during 

29 testing a network of nodes and not just when debugging the library. (This 

30 includes timeouts, retransmissions, and pings.) 

31* WARNING is for everything that indicates a misbehaving peer. These don't 

32 *necessarily* indicate a client bug, though: Things like requesting a 

33 nonexistent block can just as well happen when a resource's content has 

34 changed between blocks. The library will not go out of its way to determine 

35 whether there is a plausible explanation for the odd behavior, and will 

36 report something as a warning in case of doubt. 

37* ERROR is used when something clearly went wrong. This includes irregular 

38 connection terminations and resource handler errors (which are demoted to 

39 error responses), and can often contain a backtrace. 

40 

41Logs will generally reveal messages exchanged between this and other systems, 

42and attackers can observe their encrypted counterparts. Private or shared keys 

43are only logged through an internal `log_secret` function, which usually 

44replaces them with a redacted value. Setting the ``AIOCOAP_REVEAL_KEYS`` 

45environment variable to the value ``show secrets in logs`` bypasses that 

46mechanism. As an additional precaution, this is only accepted if the effective 

47user has write access to the aiocoap source code. 
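
For example, an application that wants to see the INFO-level events described
above could raise the level of the ``coap`` logger (a minimal sketch, not
something the library does on its own)::

    import logging

    logging.basicConfig(level=logging.WARNING)
    # "coap" is the default logger name used by the Context constructors
    logging.getLogger("coap").setLevel(logging.INFO)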

48""" 

49 

50import asyncio 

51import weakref 

52import time 

53from typing import Optional, List 

54 

55from . import defaults 

56from .credentials import CredentialsMap 

57from .message import Message 

58from .messagemanager import MessageManager 

59from .tokenmanager import TokenManager 

60from .pipe import Pipe, run_driving_pipe, error_to_message 

61from . import interfaces 

62from . import error 

63from .numbers import INTERNAL_SERVER_ERROR, NOT_FOUND, CONTINUE, SHUTDOWN_TIMEOUT 

64 

65import warnings 

66import logging 

67 

68 

69class Context(interfaces.RequestProvider): 

70 """Applications' entry point to the network 

71 

72 A :class:`.Context` coordinates one or more network :mod:`.transports` 

73 implementations and dispatches data between them and the application. 

74 

75 The application can start requests using the message dispatch methods, and 

76 set a :class:`resources.Site` that will answer requests directed to the 

77 application as a server. 

78 

79 On the library-internals side, it is the prime implementation of the 

80 :class:`interfaces.RequestProvider` interface, creates :class:`Request` and 

81 :class:`Response` classes on demand, and decides which transport 

82 implementations to start and which are to handle which messages. 

83 

84 **Context creation and destruction** 

85 

86 The following functions are provided for creating and stopping a context: 

87 

88 .. note:: 

89 

90 A typical application should only ever create one context, even (or 

91 especially) when it acts both as a server and as a client (in which 

92 case a server context should be created). 

93 

94 A context that is not used any more must be shut down using 

95 :meth:`.shutdown()`, but typical applications will not need to do so, because 

96 they use the context for the full process lifetime. 

97 

98 .. automethod:: create_client_context 

99 .. automethod:: create_server_context 

100 

101 .. automethod:: shutdown 

102 

103 **Dispatching messages** 

104 

105 CoAP requests can be sent using the following functions: 

106 

107 .. automethod:: request 

108 

109 If more control is needed, you can create a :class:`Request` yourself and 

110 pass the context to it. 

111 
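A minimal end-to-end sketch (the URI is a placeholder, not a real service)::

    import asyncio
    from aiocoap import Context, GET, Message

    async def main():
        ctx = await Context.create_client_context()
        request = Message(code=GET, uri="coap://example.invalid/sensor")
        response = await ctx.request(request).response
        print(response.code, response.payload)

    asyncio.run(main())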

112 

113 **Other methods and properties** 

114 

115 The remaining methods and properties are to be considered unstable even 

116 when the project reaches a stable version number; please file a feature 

117 request for stabilization if you want to reliably access any of them. 

118 """ 

119 

120 def __init__( 

121 self, 

122 loop=None, 

123 serversite=None, 

124 loggername="coap", 

125 client_credentials=None, 

126 server_credentials=None, 

127 ): 

128 self.log = logging.getLogger(loggername) 

129 

130 self.loop = loop or asyncio.get_event_loop() 

131 

132 self.serversite = serversite 

133 

134 self.request_interfaces = [] 

135 

136 self.client_credentials = client_credentials or CredentialsMap() 

137 self.server_credentials = server_credentials or CredentialsMap() 

138 

139 # 

140 # convenience methods for class instantiation 

141 # 

142 

143 async def _append_tokenmanaged_messagemanaged_transport( 

144 self, message_interface_constructor 

145 ): 

146 tman = TokenManager(self) 

147 mman = MessageManager(tman) 

148 transport = await message_interface_constructor(mman) 

149 

150 mman.message_interface = transport 

151 tman.token_interface = mman 

152 

153 self.request_interfaces.append(tman) 

154 

155 async def _append_tokenmanaged_transport(self, token_interface_constructor): 

156 tman = TokenManager(self) 

157 transport = await token_interface_constructor(tman) 

158 

159 tman.token_interface = transport 

160 

161 self.request_interfaces.append(tman) 

162 

163 @classmethod 

164 async def create_client_context( 

165 cls, *, loggername="coap", loop=None, transports: Optional[List[str]] = None 

166 ): 

167 """Create a context bound to all addresses on a random listening port. 

168 

169 This is the easiest way to get a context suitable for sending client 

170 requests. 

171 
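The ``transports`` argument can restrict which transports are set up; for
example, a sketch that only asks for the TCP client transport::

    ctx = await Context.create_client_context(transports=["tcpclient"])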

172 :meta private: 

173 (not actually private, just hiding from automodule due to being 

174 grouped with the important functions) 

175 """ 

176 

177 if loop is None: 

178 loop = asyncio.get_event_loop() 

179 

180 self = cls(loop=loop, serversite=None, loggername=loggername) 

181 

182 selected_transports = transports or defaults.get_default_clienttransports( 

183 loop=loop 

184 ) 

185 

186 # FIXME make defaults overridable (postponed until they become configurable too) 

187 for transportname in selected_transports: 

188 if transportname == "udp6": 

189 from .transports.udp6 import MessageInterfaceUDP6 

190 

191 await self._append_tokenmanaged_messagemanaged_transport( 

192 lambda mman: MessageInterfaceUDP6.create_client_transport_endpoint( 

193 mman, log=self.log, loop=loop 

194 ) 

195 ) 

196 elif transportname == "simple6": 

197 from .transports.simple6 import MessageInterfaceSimple6 

198 

199 await self._append_tokenmanaged_messagemanaged_transport( 

200 lambda mman: MessageInterfaceSimple6.create_client_transport_endpoint( 

201 mman, log=self.log, loop=loop 

202 ) 

203 ) 

204 elif transportname == "tinydtls": 

205 from .transports.tinydtls import MessageInterfaceTinyDTLS 

206 

207 await self._append_tokenmanaged_messagemanaged_transport( 

208 lambda mman: MessageInterfaceTinyDTLS.create_client_transport_endpoint( 

209 mman, log=self.log, loop=loop 

210 ) 

211 ) 

212 elif transportname == "tcpclient": 

213 from .transports.tcp import TCPClient 

214 

215 await self._append_tokenmanaged_transport( 

216 lambda tman: TCPClient.create_client_transport(tman, self.log, loop) 

217 ) 

218 elif transportname == "tlsclient": 

219 from .transports.tls import TLSClient 

220 

221 await self._append_tokenmanaged_transport( 

222 lambda tman: TLSClient.create_client_transport( 

223 tman, self.log, loop, self.client_credentials 

224 ) 

225 ) 

226 elif transportname == "ws": 

227 from .transports.ws import WSPool 

228 

229 await self._append_tokenmanaged_transport( 

230 lambda tman: WSPool.create_transport( 

231 tman, self.log, loop, client_credentials=self.client_credentials 

232 ) 

233 ) 

234 elif transportname == "oscore": 

235 from .transports.oscore import TransportOSCORE 

236 

237 oscoretransport = TransportOSCORE(self, self) 

238 self.request_interfaces.append(oscoretransport) 

239 else: 

240 raise RuntimeError( 

241 "Transport %r not know for client context creation" % transportname 

242 ) 

243 

244 return self 

245 

246 @classmethod 

247 async def create_server_context( 

248 cls, 

249 site, 

250 bind=None, 

251 *, 

252 loggername="coap-server", 

253 loop=None, 

254 _ssl_context=None, 

255 multicast=[], 

256 server_credentials=None, 

257 transports: Optional[List[str]] = None, 

258 ): 

259 """Create a context, bound to all addresses on the CoAP port (unless 

260 otherwise specified in the ``bind`` argument). 

261 

262 This is the easiest way to get a context suitable both for sending 

263 client and accepting server requests. 

264 

265 The ``bind`` argument, if given, needs to be a 2-tuple of IP address 

266 string and port number, where the port number can be None to use the default port. 

267 

268 If ``multicast`` is given, it needs to be a list of (multicast address, 

269 interface name) tuples, which will all be joined. (The IPv4 style of 

270 selecting the interface by a local address is not supported; users may 

271 want to use the netifaces package to arrive at an interface name for an 

272 address). 

273 

274 As a shortcut, the list may also contain interface names alone. Those 

275 will be joined for the 'all CoAP nodes' groups of IPv4 and IPv6 (with 

276 scopes 2 and 5) as well as the respective 'all nodes' groups in IPv6. 

277 

278 Under some circumstances you may already need a context to pass into 

279 the site for creation; this is typically the case for servers that 

280 trigger requests on their own. For those cases, it is usually easiest 

281 to pass None in as a site, and set the fully constructed site later by 

282 assigning to the ``serversite`` attribute. 

283 
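For illustration, a sketch with made-up addresses and interface name (the
``site`` would typically be built from :mod:`aiocoap.resource`)::

    ctx = await Context.create_server_context(
        site,
        bind=("::", None),    # all addresses, default CoAP port
        multicast=["eth0"],   # join the default multicast groups on eth0
    )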

284 :meta private: 

285 (not actually private, just hiding from automodule due to being 

286 grouped with the important functions) 

287 """ 

288 

289 if loop is None: 

290 loop = asyncio.get_event_loop() 

291 

292 self = cls( 

293 loop=loop, 

294 serversite=site, 

295 loggername=loggername, 

296 server_credentials=server_credentials, 

297 ) 

298 

299 multicast_done = not multicast 

300 

301 selected_transports = transports or defaults.get_default_servertransports( 

302 loop=loop 

303 ) 

304 

305 for transportname in selected_transports: 

306 if transportname == "udp6": 

307 from .transports.udp6 import MessageInterfaceUDP6 

308 

309 await self._append_tokenmanaged_messagemanaged_transport( 

310 lambda mman: MessageInterfaceUDP6.create_server_transport_endpoint( 

311 mman, log=self.log, loop=loop, bind=bind, multicast=multicast 

312 ) 

313 ) 

314 multicast_done = True 

315 # FIXME this is duplicated from the client version, as those are client-only anyway 

316 elif transportname == "simple6": 

317 from .transports.simple6 import MessageInterfaceSimple6 

318 

319 await self._append_tokenmanaged_messagemanaged_transport( 

320 lambda mman: MessageInterfaceSimple6.create_client_transport_endpoint( 

321 mman, log=self.log, loop=loop 

322 ) 

323 ) 

324 elif transportname == "tinydtls": 

325 from .transports.tinydtls import MessageInterfaceTinyDTLS 

326 

327 await self._append_tokenmanaged_messagemanaged_transport( 

328 lambda mman: MessageInterfaceTinyDTLS.create_client_transport_endpoint( 

329 mman, log=self.log, loop=loop 

330 ) 

331 ) 

332 # FIXME end duplication 

333 elif transportname == "tinydtls_server": 

334 from .transports.tinydtls_server import MessageInterfaceTinyDTLSServer 

335 

336 await self._append_tokenmanaged_messagemanaged_transport( 

337 lambda mman: MessageInterfaceTinyDTLSServer.create_server( 

338 bind, 

339 mman, 

340 log=self.log, 

341 loop=loop, 

342 server_credentials=self.server_credentials, 

343 ) 

344 ) 

345 elif transportname == "simplesocketserver": 

346 from .transports.simplesocketserver import MessageInterfaceSimpleServer 

347 

348 await self._append_tokenmanaged_messagemanaged_transport( 

349 lambda mman: MessageInterfaceSimpleServer.create_server( 

350 bind, mman, log=self.log, loop=loop 

351 ) 

352 ) 

353 elif transportname == "tcpserver": 

354 from .transports.tcp import TCPServer 

355 

356 await self._append_tokenmanaged_transport( 

357 lambda tman: TCPServer.create_server(bind, tman, self.log, loop) 

358 ) 

359 elif transportname == "tcpclient": 

360 from .transports.tcp import TCPClient 

361 

362 await self._append_tokenmanaged_transport( 

363 lambda tman: TCPClient.create_client_transport(tman, self.log, loop) 

364 ) 

365 elif transportname == "tlsserver": 

366 if _ssl_context is not None: 

367 from .transports.tls import TLSServer 

368 

369 await self._append_tokenmanaged_transport( 

370 lambda tman: TLSServer.create_server( 

371 bind, tman, self.log, loop, _ssl_context 

372 ) 

373 ) 

374 elif transportname == "tlsclient": 

375 from .transports.tls import TLSClient 

376 

377 await self._append_tokenmanaged_transport( 

378 lambda tman: TLSClient.create_client_transport( 

379 tman, self.log, loop, self.client_credentials 

380 ) 

381 ) 

382 elif transportname == "ws": 

383 from .transports.ws import WSPool 

384 

385 await self._append_tokenmanaged_transport( 

386 # None, None: Unlike the other transports this has a server/client generic creator, and only binds if there is some bind 

387 lambda tman: WSPool.create_transport( 

388 tman, 

389 self.log, 

390 loop, 

391 client_credentials=self.client_credentials, 

392 server_bind=bind or (None, None), 

393 server_context=_ssl_context, 

394 ) 

395 ) 

396 elif transportname == "oscore": 

397 from .transports.oscore import TransportOSCORE 

398 

399 oscoretransport = TransportOSCORE(self, self) 

400 self.request_interfaces.append(oscoretransport) 

401 else: 

402 raise RuntimeError( 

403 "Transport %r not know for server context creation" % transportname 

404 ) 

405 

406 if not multicast_done: 

407 self.log.warning( 

408 "Multicast was requested, but no multicast capable transport was selected." 

409 ) 

410 

411 # This is used in tests to wait for externally launched servers to be ready 

412 self.log.debug("Server ready to receive requests") 

413 

414 return self 

415 

416 async def shutdown(self): 

417 """Take down any listening sockets and stop all related timers. 

418 

419 After this coroutine terminates, and once all external references to 

420 the object are dropped, it should be garbage-collectable. 

421 

422 This method takes up to 

423 :const:`aiocoap.numbers.constants.SHUTDOWN_TIMEOUT` seconds, allowing 

424 transports to perform any cleanup implemented in them (such as orderly 

425 connection shutdown and cancelling observations, where the latter is 

426 currently not implemented). 

427 

428 :meta private: 

429 (not actually private, just hiding from automodule due to being 

430 grouped with the important functions) 

431 """ 

432 

433 self.log.debug("Shutting down context") 

434 

435 done, pending = await asyncio.wait( 

436 [ 

437 asyncio.create_task( 

438 ri.shutdown(), 

439 name="Shutdown of %r" % ri, 

440 ) 

441 for ri in self.request_interfaces 

442 ], 

443 timeout=SHUTDOWN_TIMEOUT, 

444 ) 

445 for item in done: 

446 await item 

447 if pending: 

448 # Apart from being useful to see, this also ensures that developers 

449 # see the error in the logs during test suite runs -- and the error 

450 # should be easier to follow than the "we didn't garbage collect 

451 # everything" errors we see anyway (or otherwise, if the error is 

452 # escalated into a test failure) 

453 self.log.error( 

454 "Shutdown timeout exceeded, returning anyway. Interfaces still busy: %s", 

455 pending, 

456 ) 

457 

458 # FIXME: determine how official this should be, or which part of it is 

459 # public -- now that BlockwiseRequest uses it. (And formalize what can 

460 # change about messages and what can't after the remote has been thusly 

461 # populated). 

462 async def find_remote_and_interface(self, message): 

463 if message.remote is None: 

464 raise error.MissingRemoteError() 

465 for ri in self.request_interfaces: 

466 if await ri.fill_or_recognize_remote(message): 

467 return ri 

468 raise RuntimeError("No request interface could route message") 

469 

470 def request(self, request_message, handle_blockwise=True): 

471 if handle_blockwise: 

472 return BlockwiseRequest(self, request_message) 

473 

474 pipe = Pipe(request_message, self.log) 

475 # Request sets up callbacks at creation 

476 result = Request(pipe, self.loop, self.log) 

477 

478 async def send(): 

479 try: 

480 request_interface = await self.find_remote_and_interface( 

481 request_message 

482 ) 

483 request_interface.request(pipe) 

484 except Exception as e: 

485 pipe.add_exception(e) 

486 return 

487 

488 self.loop.create_task( 

489 send(), 

490 name="Request processing of %r" % result, 

491 ) 

492 return result 

493 

494 # the following are under consideration for moving into Site or something 

495 # mixed into it 

496 

497 def render_to_pipe(self, pipe): 

498 """Fill a pipe by running the site's render_to_pipe interface and 

499 handling errors.""" 

500 

501 pr_that_can_receive_errors = error_to_message(pipe, self.log) 

502 

503 run_driving_pipe( 

504 pr_that_can_receive_errors, 

505 self._render_to_pipe(pipe), 

506 name="Rendering for %r" % pipe.request, 

507 ) 

508 

509 async def _render_to_pipe(self, pipe): 

510 if self.serversite is None: 

511 pipe.add_response( 

512 Message(code=NOT_FOUND, payload=b"not a server"), is_last=True 

513 ) 

514 return 

515 

516 return await self.serversite.render_to_pipe(pipe) 

517 

518 

519class BaseRequest(object): 

520 """Common mechanisms of :class:`Request` and :class:`MulticastRequest`""" 

521 

522 

523class BaseUnicastRequest(BaseRequest): 

524 """A utility class that offers the :attr:`response_raising` and 

525 :attr:`response_nonraising` alternatives to waiting for the 

526 :attr:`response` future whose error states can be presented either as an 

527 unsuccessful response (eg. 4.04) or an exception. 

528 

529 It also provides some internal tools for handling anything that has a 

530 :attr:`response` future and an :attr:`observation`""" 

531 

532 @property 

533 async def response_raising(self): 

534 """An awaitable that returns if a response comes in and is successful, 

535 otherwise raises generic network exception or a 

536 :class:`.error.ResponseWrappingError` for unsuccessful responses. 

537 

538 Experimental Interface.""" 

539 

540 response = await self.response 

541 if not response.code.is_successful(): 

542 raise error.ResponseWrappingError(response) 

543 

544 return response 
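    # A usage sketch (illustrative, not part of the library API):
    #
    #     try:
    #         response = await ctx.request(msg).response_raising
    #     except error.ResponseWrappingError:
    #         ...  # an unsuccessful CoAP response, e.g. 4.04
    #     except error.NetworkError:
    #         ...  # no usable response at all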

545 

546 @property 

547 async def response_nonraising(self): 

548 """An awaitable that rather returns a 500ish fabricated message (as a 

549 proxy would return) instead of raising an exception. 

550 

551 Experimental Interface.""" 

552 

553 # FIXME: Can we smuggle error_to_message into the underlying pipe? 

554 # That should make observe notifications into messages rather 

555 # than exceptions as well, plus it has fallbacks for `e.to_message()` 

556 # raising. 

557 

558 try: 

559 return await self.response 

560 except error.RenderableError as e: 

561 return e.to_message() 

562 except Exception: 

563 return Message(code=INTERNAL_SERVER_ERROR) 

564 

565 

566class Request(interfaces.Request, BaseUnicastRequest): 

567 # FIXME: Implement timing out with REQUEST_TIMEOUT here 

568 

569 def __init__(self, pipe, loop, log): 

570 self._pipe = pipe 

571 

572 self.response = loop.create_future() 

573 

574 if pipe.request.opt.observe == 0: 

575 self.observation = ClientObservation() 

576 else: 

577 self.observation = None 

578 

579 self._runner = self._run() 

580 self._runner.send(None) 

581 

582 def process(event): 

583 try: 

584 # would be great to have self or the runner as weak ref, but 

585 # see ClientObservation.register_callback comments -- while 

586 # that is around, we can't weakref here. 

587 self._runner.send(event) 

588 return True 

589 except StopIteration: 

590 return False 

591 

592 self._stop_interest = self._pipe.on_event(process) 

593 

594 self.log = log 

595 

596 self.response.add_done_callback(self._response_cancellation_handler) 

597 

598 def _response_cancellation_handler(self, response): 

599 # Propagate cancellation to the runner (if interest in the first 

600 # response is lost, there won't be observation items to pull out), but 

601 # not general completion (because if it's completed and not cancelled, 

602 # eg. when an observation is active) 

603 if self.response.cancelled() and self._runner is not None: 

604 # Dropping the only reference makes it stop with GeneratorExit, 

605 # similar to a cancelled task 

606 self._runner = None 

607 self._stop_interest() 

608 # Otherwise, there will be a runner still around, and it's its task to 

609 # call _stop_interest. 

610 

611 @staticmethod 

612 def _add_response_properties(response, request): 

613 response.request = request 

614 

615 def _run(self): 

616 # FIXME: This is in iterator form because it used to be a task that 

617 # awaited futures, and that code could be easily converted to an 

618 # iterator. I'm not sure that's a bad state here, but at least it 

619 # should be a more conscious decision to make this an iterator rather 

620 # than just having it happen to be one. 

621 # 

622 # FIXME: check that responses come from the same remote as long as we're assuming unicast 

623 

624 first_event = yield None 

625 

626 if first_event.message is not None: 

627 self._add_response_properties(first_event.message, self._pipe.request) 

628 self.response.set_result(first_event.message) 

629 else: 

630 self.response.set_exception(first_event.exception) 

631 if not isinstance(first_event.exception, error.Error): 

632 self.log.warning( 

633 "An exception that is not an aiocoap Error was raised " 

634 "from a transport; please report this as a bug in " 

635 "aiocoap: %r", 

636 first_event.exception, 

637 ) 

638 

639 if self.observation is None: 

640 if not first_event.is_last: 

641 self.log.error( 

642 "Pipe indicated more possible responses" 

643 " while the Request handler would not know what to" 

644 " do with them, stopping any further request." 

645 ) 

646 self._stop_interest() 

647 return 

648 

649 if first_event.is_last: 

650 self.observation.error(error.NotObservable()) 

651 return 

652 

653 if first_event.message.opt.observe is None: 

654 self.log.error( 

655 "Pipe indicated more possible responses" 

656 " while the Request handler would not know what to" 

657 " do with them, stopping any further request." 

658 ) 

659 self._stop_interest() 

660 return 

661 

662 # variable names from RFC7641 Section 3.4 

663 v1 = first_event.message.opt.observe 

664 t1 = time.time() 

665 

666 while True: 

667 # We don't really support cancellation of observations yet (see 

668 # https://github.com/chrysn/aiocoap/issues/92), but at least 

669 # stopping the interest is a way to free the local resources after 

670 # the first observation update, and to make the MID handler RST the 

671 # observation on the next. 

672 # FIXME: there *is* now a .on_cancel callback, we should at least 

673 # hook into that, and possibly even send a proper cancellation 

674 # then. 

675 next_event = yield True 

676 if self.observation.cancelled: 

677 self._stop_interest() 

678 return 

679 

680 if next_event.exception is not None: 

681 self.observation.error(next_event.exception) 

682 if not next_event.is_last: 

683 self._stop_interest() 

684 if not isinstance(next_event.exception, error.Error): 

685 self.log.warning( 

686 "An exception that is not an aiocoap Error was " 

687 "raised from a transport during an observation; " 

688 "please report this as a bug in aiocoap: %r", 

689 next_event.exception, 

690 ) 

691 return 

692 

693 self._add_response_properties(next_event.message, self._pipe.request) 

694 

695 if next_event.message.opt.observe is not None: 

696 # check for reordering 

697 v2 = next_event.message.opt.observe 

698 t2 = time.time() 
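                # RFC 7641 Section 3.4 freshness rule: the new notification is more
                # recent if V1 < V2 and V2 - V1 < 2**23, or V1 > V2 and V1 - V2 > 2**23
                # (serial-number style comparison of the 24-bit Observe values), or if
                # more than OBSERVATION_RESET_TIME has passed since the last accepted one.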

699 

700 is_recent = ( 

701 (v1 < v2 and v2 - v1 < 2**23) 

702 or (v1 > v2 and v1 - v2 > 2**23) 

703 or ( 

704 t2 

705 > t1 

706 + self._pipe.request.transport_tuning.OBSERVATION_RESET_TIME 

707 ) 

708 ) 

709 if is_recent: 

710 t1 = t2 

711 v1 = v2 

712 else: 

713 # the terminal message is always the last 

714 is_recent = True 

715 

716 if is_recent: 

717 self.observation.callback(next_event.message) 

718 

719 if next_event.is_last: 

720 self.observation.error(error.ObservationCancelled()) 

721 return 

722 

723 if next_event.message.opt.observe is None: 

724 self.observation.error(error.ObservationCancelled()) 

725 self.log.error( 

726 "Pipe indicated more possible responses" 

727 " while the Request handler would not know what to" 

728 " do with them, stopping any further request." 

729 ) 

730 self._stop_interest() 

731 return 

732 

733 

734class BlockwiseRequest(BaseUnicastRequest, interfaces.Request): 

735 def __init__(self, protocol, app_request): 

736 self.protocol = protocol 

737 self.log = self.protocol.log.getChild("blockwise-requester") 

738 

739 self.response = protocol.loop.create_future() 

740 

741 if app_request.opt.observe is not None: 

742 self.observation = ClientObservation() 

743 else: 

744 self.observation = None 

745 

746 self._runner = protocol.loop.create_task( 

747 self._run_outer( 

748 app_request, 

749 self.response, 

750 weakref.ref(self.observation) 

751 if self.observation is not None 

752 else lambda: None, 

753 self.protocol, 

754 self.log, 

755 ), 

756 name="Blockwise runner for %r" % app_request, 

757 ) 

758 self.response.add_done_callback(self._response_cancellation_handler) 

759 

760 def _response_cancellation_handler(self, response_future): 

761 # see Request._response_cancellation_handler 

762 if self.response.cancelled(): 

763 self._runner.cancel() 

764 

765 @classmethod 

766 async def _run_outer(cls, app_request, response, weak_observation, protocol, log): 

767 try: 

768 await cls._run(app_request, response, weak_observation, protocol, log) 

769 except asyncio.CancelledError: 

770 pass # results already set 

771 except Exception as e: 

772 logged = False 

773 if not response.done(): 

774 logged = True 

775 response.set_exception(e) 

776 obs = weak_observation() 

777 if app_request.opt.observe is not None and obs is not None: 

778 logged = True 

779 obs.error(e) 

780 if not logged: 

781 # should be unreachable 

782 log.error( 

783 "Exception in BlockwiseRequest runner neither went to response nor to observation: %s", 

784 e, 

785 exc_info=e, 

786 ) 

787 

788 # This is a class method because that allows self and self.observation to 

789 # be freed even when this task is running, and the task to stop itself -- 

790 # otherwise we couldn't know when users just "forget" about a request 

791 # object after using its response (esp. in observe cases) and leave this 

792 # task running. 

793 @classmethod 

794 async def _run(cls, app_request, response, weak_observation, protocol, log): 

795 # we need to populate the remote right away, because the choice of 

796 # blocks depends on it. 

797 await protocol.find_remote_and_interface(app_request) 

798 

799 size_exp = app_request.remote.maximum_block_size_exp 

800 

801 if app_request.opt.block1 is not None: 

802 warnings.warn( 

803 "Setting a block1 option in a managed block-wise transfer is deprecated. Instead, set request.remote.maximum_block_size_exp to the desired value", 

804 DeprecationWarning, 

805 stacklevel=2, 

806 ) 

807 assert app_request.opt.block1.block_number == 0, ( 

808 "Unexpected block number in app_request" 

809 ) 

810 assert not app_request.opt.block1.more, ( 

811 "Unexpected more-flag in app_request" 

812 ) 

813 # this is where the library user can traditionally pass in size 

814 # exponent hints into the library. 

815 size_exp = app_request.opt.block1.size_exponent 

816 

817 # Offset in the message in blocks of size_exp. Whoever changes size_exp 

818 # is responsible for updating this number. 

819 block_cursor = 0 

820 

821 while True: 

822 # ... send a chunk 

823 
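            # Block sizes in CoAP are 2 ** (SZX + 4) bytes (16..1024 for SZX 0..6);
            # SZX values of 6 and above defer to the remote's maximum payload size
            # here, with SZX 7 meaning BERT (RFC 8323) blocks counted in 1024-byte units.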

824 if size_exp >= 6: 

825 # FIXME from maximum_payload_size 

826 fragmentation_threshold = app_request.remote.maximum_payload_size 

827 else: 

828 fragmentation_threshold = 2 ** (size_exp + 4) 

829 

830 if ( 

831 app_request.opt.block1 is not None 

832 or len(app_request.payload) > fragmentation_threshold 

833 ): 

834 current_block1 = app_request._extract_block( 

835 block_cursor, size_exp, app_request.remote.maximum_payload_size 

836 ) 

837 if block_cursor == 0: 

838 current_block1.opt.size1 = len(app_request.payload) 

839 else: 

840 current_block1 = app_request 

841 

842 blockrequest = protocol.request(current_block1, handle_blockwise=False) 

843 blockresponse = await blockrequest.response 

844 

845 # store for future blocks to ensure that the next blocks will be 

846 # sent from the same source address (in the UDP case; for many 

847 # other transports it won't matter). Also carry along the locally set block size limitation. 

848 if ( 

849 app_request.remote.maximum_block_size_exp 

850 < blockresponse.remote.maximum_block_size_exp 

851 ): 

852 blockresponse.remote.maximum_block_size_exp = ( 

853 app_request.remote.maximum_block_size_exp 

854 ) 

855 app_request.remote = blockresponse.remote 

856 

857 if blockresponse.opt.block1 is None: 

858 if blockresponse.code.is_successful() and current_block1.opt.block1: 

859 log.warning( 

860 "Block1 option completely ignored by server, assuming it knows what it is doing." 

861 ) 

862 # FIXME: handle 4.13 and retry with the indicated size option 

863 break 

864 

865 block1 = blockresponse.opt.block1 

866 log.debug( 

867 "Response with Block1 option received, number = %d, more = %d, size_exp = %d.", 

868 block1.block_number, 

869 block1.more, 

870 block1.size_exponent, 

871 ) 

872 

873 if block1.block_number != current_block1.opt.block1.block_number: 

874 raise error.UnexpectedBlock1Option("Block number mismatch") 

875 

876 if size_exp == 7: 

877 block_cursor += len(current_block1.payload) // 1024 

878 else: 

879 block_cursor += 1 

880 

881 while block1.size_exponent < size_exp: 

882 block_cursor *= 2 

883 size_exp -= 1 

884 

885 if not current_block1.opt.block1.more: 

886 if block1.more or blockresponse.code == CONTINUE: 

887 # treating this as a protocol error -- letting it slip 

888 # through would misrepresent the whole operation as an 

889 # over-all 2.xx (successful) one. 

890 raise error.UnexpectedBlock1Option( 

891 "Server asked for more data at end of body" 

892 ) 

893 break 

894 

895 # checks before preparing the next round: 

896 

897 if blockresponse.opt.observe: 

898 # we're not *really* interested in that block, we just sent an 

899 # observe option to indicate that we'll want to observe the 

900 # resulting representation as a whole 

901 log.warning( 

902 "Server answered Observe in early Block1 phase, cancelling the erroneous observation." 

903 ) 

904 blockrequest.observe.cancel() 

905 

906 if block1.more: 

907 # FIXME I think my own server is doing this wrong 

908 # if response.code != CONTINUE: 

909 # raise error.UnexpectedBlock1Option("more-flag set but no Continue") 

910 pass 

911 else: 

912 if not blockresponse.code.is_successful(): 

913 break 

914 else: 

915 # ignoring (discarding) the successful intermediate result, waiting for a final one 

916 continue 

917 

918 lower_observation = None 

919 if app_request.opt.observe is not None: 

920 if blockresponse.opt.observe is not None: 

921 lower_observation = blockrequest.observation 

922 else: 

923 obs = weak_observation() 

924 if obs: 

925 obs.error(error.NotObservable()) 

926 del obs 

927 

928 assert blockresponse is not None, "Block1 loop broke without setting a response" 

929 blockresponse.opt.block1 = None 

930 

931 # FIXME check with RFC7959: it just says "send requests similar to the 

932 # requests in the Block1 phase", what does that mean? using the last 

933 # block1 as a reference for now, especially because in the 

934 # only-one-request-block case, that's the original request we must send 

935 # again and again anyway 

936 assembled_response = await cls._complete_by_requesting_block2( 

937 protocol, current_block1, blockresponse, log 

938 ) 

939 

940 response.set_result(assembled_response) 

941 # finally set the result 

942 

943 if lower_observation is not None: 

944 # FIXME this can all be simplified a lot since it's no more 

945 # expected that observations shut themselves down when GC'd. 

946 obs = weak_observation() 

947 del weak_observation 

948 if obs is None: 

949 lower_observation.cancel() 

950 return 

951 future_weak_observation = protocol.loop.create_future() # packing this up because its destroy callback needs to reference the subtask 

952 subtask = asyncio.create_task( 

953 cls._run_observation( 

954 app_request, 

955 lower_observation, 

956 future_weak_observation, 

957 protocol, 

958 log, 

959 ), 

960 name="Blockwise observation for %r" % app_request, 

961 ) 

962 future_weak_observation.set_result( 

963 weakref.ref(obs, lambda obs: subtask.cancel()) 

964 ) 

965 obs.on_cancel(subtask.cancel) 

966 del obs 

967 await subtask 

968 

969 @classmethod 

970 async def _run_observation( 

971 cls, original_request, lower_observation, future_weak_observation, protocol, log 

972 ): 

973 weak_observation = await future_weak_observation 

974 # we can use weak_observation() here at any time, because whenever that 

975 # becomes None, this task gets cancelled 

976 try: 

977 async for block1_notification in lower_observation: 

978 log.debug("Notification received") 

979 full_notification = await cls._complete_by_requesting_block2( 

980 protocol, original_request, block1_notification, log 

981 ) 

982 log.debug("Reporting completed notification") 

983 weak_observation().callback(full_notification) 

984 # FIXME verify that this loop actually ends iff the observation 

985 # was cancelled -- otherwise find out the cause(s) or make it not 

986 # cancel under indistinguishable circumstances 

987 weak_observation().error(error.ObservationCancelled()) 

988 except asyncio.CancelledError: 

989 return 

990 except Exception as e: 

991 weak_observation().error(e) 

992 finally: 

993 # We generally avoid idempotent cancellation, but we may have 

994 # reached this point either due to an earlier cancellation or 

995 # without one 

996 if not lower_observation.cancelled: 

997 lower_observation.cancel() 

998 

999 @classmethod 

1000 async def _complete_by_requesting_block2( 

1001 cls, protocol, request_to_repeat, initial_response, log 

1002 ): 

1003 # FIXME this can probably be deduplicated against BlockwiseRequest 

1004 

1005 if ( 

1006 initial_response.opt.block2 is None 

1007 or initial_response.opt.block2.more is False 

1008 ): 

1009 initial_response.opt.block2 = None 

1010 return initial_response 

1011 

1012 if initial_response.opt.block2.block_number != 0: 

1013 log.error("Error assembling blockwise response (expected first block)") 

1014 raise error.UnexpectedBlock2() 

1015 

1016 assembled_response = initial_response 

1017 last_response = initial_response 

1018 while True: 

1019 current_block2 = request_to_repeat._generate_next_block2_request( 

1020 assembled_response 

1021 ) 

1022 

1023 current_block2 = current_block2.copy(remote=initial_response.remote) 

1024 

1025 blockrequest = protocol.request(current_block2, handle_blockwise=False) 

1026 last_response = await blockrequest.response 

1027 

1028 if last_response.opt.block2 is None: 

1029 log.warning( 

1030 "Server sent non-blockwise response after having started a blockwise transfer. Blockwise transfer cancelled, accepting single response." 

1031 ) 

1032 return last_response 

1033 

1034 block2 = last_response.opt.block2 

1035 log.debug( 

1036 "Response with Block2 option received, number = %d, more = %d, size_exp = %d.", 

1037 block2.block_number, 

1038 block2.more, 

1039 block2.size_exponent, 

1040 ) 

1041 try: 

1042 assembled_response._append_response_block(last_response) 

1043 except error.Error as e: 

1044 log.error("Error assembling blockwise response, passing on error %r", e) 

1045 raise 

1046 

1047 if block2.more is False: 

1048 return assembled_response 

1049 

1050 

1051class ClientObservation: 

1052 """An interface to observe notification updates arriving on a request. 

1053 

1054 This class does not actually provide any of the observe functionality, it 

1055 is purely a container for dispatching the messages via asynchronous 

1056 iteration. It gets driven (ie. populated with responses or errors including 

1057 observation termination) by a Request object. 

1058 """ 

1059 

1060 def __init__(self): 

1061 self.callbacks = [] 

1062 self.errbacks = [] 

1063 

1064 self.cancelled = False 

1065 self._on_cancel = [] 

1066 

1067 self._latest_response = None 

1068 # the analogous error is stored in _cancellation_reason when cancelled. 

1069 

1070 def __aiter__(self): 

1071 """`async for` interface to observations. 

1072 

1073 This is the preferred interface to obtaining observations.""" 

1074 it = self._Iterator() 

1075 self.register_callback(it.push, _suppress_deprecation=True) 

1076 self.register_errback(it.push_err, _suppress_deprecation=True) 

1077 return it 
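    # A usage sketch of this asynchronous iteration interface (names are
    # illustrative):
    #
    #     request = ctx.request(Message(code=GET, uri=..., observe=0))
    #     first_response = await request.response
    #     async for notification in request.observation:
    #         ...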

1078 

1079 class _Iterator: 

1080 def __init__(self): 

1081 self._future = asyncio.get_event_loop().create_future() 

1082 

1083 def push(self, item): 

1084 if self._future.done(): 

1085 # we don't care whether we overwrite anything, this is a lossy queue as observe is lossy 

1086 self._future = asyncio.get_event_loop().create_future() 

1087 self._future.set_result(item) 

1088 

1089 def push_err(self, e): 

1090 if self._future.done(): 

1091 self._future = asyncio.get_event_loop().create_future() 

1092 self._future.set_exception(e) 

1093 

1094 async def __anext__(self): 

1095 f = self._future 

1096 try: 

1097 result = await self._future 

1098 # FIXME see `await servobs._trigger` comment: might waiting for 

1099 # the original future not yield the first future's result when 

1100 # a quick second future comes in in a push? 

1101 if f is self._future: 

1102 self._future = asyncio.get_event_loop().create_future() 

1103 return result 

1104 except (error.NotObservable, error.ObservationCancelled): 

1105 # only exit cleanly when the server -- right away or later -- 

1106 # states that the resource is not observable any more 

1107 # FIXME: check whether an unsuccessful message is still passed 

1108 # as an observation result (or whether it should be) 

1109 raise StopAsyncIteration 

1110 

1111 def __del__(self): 

1112 if self._future.done(): 

1113 try: 

1114 # Fetch the result so any errors show up at least in the 

1115 # finalizer output 

1116 self._future.result() 

1117 except (error.ObservationCancelled, error.NotObservable): 

1118 # This is the case at the end of an observation cancelled 

1119 # by the server. 

1120 pass 

1121 except error.NetworkError: 

1122 # This will already have shown up in the main result too. 

1123 pass 

1124 except (error.LibraryShutdown, asyncio.CancelledError): 

1125 pass 

1126 # Anything else flying out of this is unexpected and probably a 

1127 # library error 

1128 

1129 # When this function is removed, we can finally do cleanup better. Right 

1130 # now, someone could register a callback that doesn't hold any references, 

1131 # so we can't just stop the request when nobody holds a reference to this 

1132 # any more. Once we're all in pull mode, we can make the `process` function 

1133 # that sends data in here use a weak reference (because any possible 

1134 # recipient would need to hold a reference to self or the iterator, and 

1135 # thus _run). 

1136 def register_callback(self, callback, _suppress_deprecation=False): 

1137 """Call the callback whenever a response to the message comes in, and 

1138 pass the response to it. 

1139 

1140 The use of this function is deprecated: Use the asynchronous iteration 

1141 interface instead.""" 

1142 if not _suppress_deprecation: 

1143 warnings.warn( 

1144 "register_callback on observe results is deprected: Use `async for notify in request.observation` instead.", 

1145 DeprecationWarning, 

1146 stacklevel=2, 

1147 ) 

1148 if self.cancelled: 

1149 return 

1150 

1151 self.callbacks.append(callback) 

1152 if self._latest_response is not None: 

1153 callback(self._latest_response) 

1154 

1155 def register_errback(self, callback, _suppress_deprecation=False): 

1156 """Call the callback whenever something goes wrong with the 

1157 observation, and pass an exception to the callback. After such a 

1158 callback is called, no more callbacks will be issued. 

1159 

1160 The use of this function is deprecated: Use the asynchronous iteration 

1161 interface instead.""" 

1162 if not _suppress_deprecation: 

1163 warnings.warn( 

1164 "register_errback on observe results is deprected: Use `async for notify in request.observation` instead.", 

1165 DeprecationWarning, 

1166 stacklevel=2, 

1167 ) 

1168 if self.cancelled: 

1169 callback(self._cancellation_reason) 

1170 return 

1171 self.errbacks.append(callback) 

1172 

1173 def callback(self, response): 

1174 """Notify all listeners of an incoming response""" 

1175 

1176 self._latest_response = response 

1177 

1178 for c in self.callbacks: 

1179 c(response) 

1180 

1181 def error(self, exception): 

1182 """Notify registered listeners that the observation went wrong. This 

1183 can only be called once.""" 

1184 

1185 if self.errbacks is None: 

1186 raise RuntimeError( 

1187 "Error raised in an already cancelled ClientObservation" 

1188 ) from exception 

1189 for c in self.errbacks: 

1190 c(exception) 

1191 

1192 self.cancel() 

1193 self._cancellation_reason = exception 

1194 

1195 def cancel(self): 

1196 # FIXME determine whether this is called by anything other than error, 

1197 # and make it private so there is always a _cancellation_reason 

1198 """Cease to generate observation or error events. This will not 

1199 generate an error by itself. 

1200 

1201 This function is only needed while register_callback and 

1202 register_errback are around; once their deprecations are acted on, 

1203 dropping the asynchronous iterator will automatically cancel the 

1204 observation. 

1205 """ 

1206 

1207 assert not self.cancelled, "ClientObservation cancelled twice" 

1208 

1209 # make sure things go wrong when someone tries to continue this 

1210 self.errbacks = None 

1211 self.callbacks = None 

1212 

1213 self.cancelled = True 

1214 while self._on_cancel: 

1215 self._on_cancel.pop()() 

1216 

1217 self._cancellation_reason = None 

1218 

1219 def on_cancel(self, callback): 

1220 if self.cancelled: 

1221 callback() 

1222 self._on_cancel.append(callback) 

1223 

1224 def __repr__(self): 

1225 return "<%s %s at %#x>" % ( 

1226 type(self).__name__, 

1227 "(cancelled)" 

1228 if self.cancelled 

1229 else "(%s call-, %s errback(s))" 

1230 % (len(self.callbacks), len(self.errbacks)), 

1231 id(self), 

1232 ) 

1233 

1234 

1235class ServerObservation: 

1236 def __init__(self): 

1237 self._accepted = False 

1238 self._trigger = asyncio.get_event_loop().create_future() 

1239 # A deregistration is "early" if it happens before the response message 

1240 # is actually sent; calling deregister() in that time (typically during 

1241 # `render()`) will not send an unsuccessful response message but just 

1242 # set this flag, which is set to None as soon as it is too late for an 

1243 # early deregistration. 

1244 # This mechanism is temporary until more of aiocoap behaves like 

1245 # Pipe which does not suffer from this limitation. 

1246 self._early_deregister = False 

1247 self._late_deregister = False 

1248 

1249 def accept(self, cancellation_callback): 

1250 self._accepted = True 

1251 self._cancellation_callback = cancellation_callback 

1252 

1253 def deregister(self, reason=None): 

1254 if self._early_deregister is False: 

1255 self._early_deregister = True 

1256 return 

1257 

1258 warnings.warn( 

1259 "Late use of ServerObservation.deregister() is" 

1260 " deprecated, use .trigger with an unsuccessful value" 

1261 " instead", 

1262 DeprecationWarning, 

1263 ) 

1264 self.trigger( 

1265 Message(code=INTERNAL_SERVER_ERROR, payload=b"Resource became unobservable") 

1266 ) 

1267 

1268 def trigger(self, response=None, *, is_last=False): 

1269 """Send an updated response; if None is given, the observed resource's 

1270 rendering will be invoked to produce one. 

1271 

1272 `is_last` can be set to True to indicate that no more responses will be 

1273 sent. Note that an unsuccessful response will be the last no matter 

1274 what is_last says, as such a message always terminates a CoAP 

1275 observation.""" 
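        # A usage sketch from a hypothetical observable resource handler that
        # holds a ServerObservation ``obs`` (codes are from aiocoap.numbers):
        #
        #     obs.trigger(Message(code=CONTENT, payload=b"updated state"))
        #     obs.trigger(Message(code=NOT_FOUND), is_last=True)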

1276 if is_last: 

1277 self._late_deregister = True 

1278 if self._trigger.done(): 

1279 # we don't care whether we overwrite anything, this is a lossy queue as observe is lossy 

1280 self._trigger = asyncio.get_event_loop().create_future() 

1281 self._trigger.set_result(response)