Coverage for aiocoap / protocol.py: 90%

526 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-29 12:32 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5"""This module contains the classes that are responsible for keeping track of 

6messages: 

7 

8* :class:`Context` roughly represents the CoAP endpoint (basically a UDP 

9 socket) -- something that can send requests and possibly can answer 

10 incoming requests. 

11 

12 Incoming requests are processed in tasks created by the context. 

13 

14* a :class:`Request` gets generated whenever a request gets sent to keep 

15 track of the response 

16 

17Logging 

18~~~~~~~ 

19 

20Several constructors of the Context accept a logger name; these names go into 

21the construction of a Python logger. 

22 

23Log events will be emitted to these on different levels, with "warning" and 

24 above being a practical default for things that may warrant reviewing by 

25an operator: 

26 

27* DEBUG is used for things that occur even under perfect conditions. 

28* INFO is for things that are well expected, but might be interesting during 

29 testing a network of nodes and not just when debugging the library. (This 

30 includes timeouts, retransmissions, and pings.) 

31* WARNING is for everything that indicates a malbehaved peer. These don't 

32 *necessarily* indicate a client bug, though: Things like requesting a 

33 nonexistent block can just as well happen when a resource's content has 

34 changed between blocks. The library will not go out of its way to determine 

35 whether there is a plausible explanation for the odd behavior, and will 

36 report something as a warning in case of doubt. 

37* ERROR is used when something clearly went wrong. This includes irregular 

38 connection terminations and resource handler errors (which are demoted to 

39 error responses), and can often contain a backtrace. 

40 

41Logs will generally reveal messages exchanged between this and other systems, 

42and attackers can observe their encrypted counterparts. Private or shared keys 

43are only logged through an internal `log_secret` function, which usually 

44replaces them with a redacted value. Setting the ``AIOCOAP_REVEAL_KEYS`` 

45environment variable to the value ``show secrets in logs`` bypasses that 

46mechanism. As an additional precaution, this is only accepted if the effective 

47user has write access to the aiocoap source code. 

48""" 

49 

50import asyncio 

51import weakref 

52import time 

53 

54from .credentials import CredentialsMap 

55from .message import Message 

56from .messagemanager import MessageManager 

57from .tokenmanager import TokenManager 

58from .pipe import Pipe, run_driving_pipe, error_to_message 

59from .util import DeprecationWarning 

60from . import interfaces 

61from . import error 

62from .numbers import INTERNAL_SERVER_ERROR, NOT_FOUND, CONTINUE, SHUTDOWN_TIMEOUT 

63from .config import TransportParameters 

64 

65import warnings 

66import logging 

67 

68 

class Context(interfaces.RequestProvider):
    """Applications' entry point to the network

    A :class:`.Context` coordinates one or more network :mod:`.transports`
    implementations and dispatches data between them and the application.

    The application can start requests using the message dispatch methods, and
    set a :class:`resources.Site` that will answer requests directed to the
    application as a server.

    On the library-internals side, it is the prime implementation of the
    :class:`interfaces.RequestProvider` interface, creates :class:`Request` and
    :class:`Response` classes on demand, and decides which transport
    implementations to start and which are to handle which messages.

    **Context creation and destruction**

    The following functions are provided for creating and stopping a context:

    .. note::

        A typical application should only ever create one context, even (or
        especially when) it acts both as a server and as a client (in which
        case a server context should be created).

        A context that is not used any more must be shut down using
        :meth:`.shutdown()`, but typical applications will not need to because
        they use the context for the full process lifetime.

    .. automethod:: create_client_context
    .. automethod:: create_server_context

    .. automethod:: shutdown

    **Dispatching messages**

    CoAP requests can be sent using the following functions:

    .. automethod:: request

    If more control is needed, you can create a :class:`Request` yourself and
    pass the context to it.


    **Other methods and properties**

    The remaining methods and properties are to be considered unstable even
    when the project reaches a stable version number; please file a feature
    request for stabilization if you want to reliably access any of them.
    """

    def __init__(
        self,
        loop=None,
        serversite=None,
        loggername="coap",
        client_credentials=None,
        server_credentials=None,
    ):
        # Context-level log output goes through this logger; the
        # create_*_context classmethods hand it down to the transports.
        self.log = logging.getLogger(loggername)

        self.loop = loop or asyncio.get_running_loop()

        # Site that answers incoming requests; None means this context acts as
        # a pure client (see _render_to_pipe).
        self.serversite = serversite

        # Populated by the create_*_context classmethods; every entry can
        # carry requests on its own (see find_remote_and_interface).
        self.request_interfaces: list[interfaces.RequestInterface] = []

        # Default to empty credential maps so lookups can always be attempted.
        self.client_credentials = client_credentials or CredentialsMap()
        self.server_credentials = server_credentials or CredentialsMap()

    #
    # convenience methods for class instantiation
    #

143 async def _append_tokenmanaged_messagemanaged_transport( 

144 self, message_interface_constructor 

145 ): 

146 tman = TokenManager(self) 

147 mman = MessageManager(tman) 

148 transport = await message_interface_constructor(mman) 

149 

150 mman.message_interface = transport 

151 tman.token_interface = mman 

152 

153 self.request_interfaces.append(tman) 

154 

155 async def _append_tokenmanaged_transport(self, token_interface_constructor): 

156 tman = TokenManager(self) 

157 transport = await token_interface_constructor(tman) 

158 

159 tman.token_interface = transport 

160 

161 self.request_interfaces.append(tman) 

162 

    @classmethod
    async def create_client_context(
        cls,
        *,
        loggername="coap",
        loop=None,
        transports: TransportParameters | None | dict | list[str] = None,
    ):
        """Create a context bound to all addresses on a random listening port.

        This is the easiest way to get a context suitable for sending client
        requests.

        :param TransportParameters|dict|list|None transports: A Configuration
            item that lists which transports are to be configured, and how they are
            to be bound. If left unset, a default choice of transports is
            initialized.

            For ease of use, a `dict` can be passed in, which gets :meth:`.load()
            <aiocoap.util.dataclass_data.LoadStoreClass.load>`ed, or a list can
            be passed in that merely lists the transports (equivalent to ``{k: {}
            for k in transports}``).

        :meta private:
            (not actually private, just hiding from automodule due to being
            grouped with the important functions)
        """

        if loop is None:
            loop = asyncio.get_running_loop()

        # serversite=None: a client context never answers incoming requests.
        self = cls(loop=loop, serversite=None, loggername=loggername)

        selected_transports = TransportParameters._compat_create(transports)
        if selected_transports.is_server is None:
            selected_transports.is_server = False
        selected_transports._apply_defaults()

        self.log.debug(
            "Creating client context from transport configuration %r",
            selected_transports,
        )

        # Transport modules are imported lazily inside each branch so that
        # unused transports (and their dependencies) are never loaded.
        # FIXME make defaults overridable (postponed until they become configurable too)
        if selected_transports.oscore:
            from .transports.oscore import TransportOSCORE

            # OSCORE layers over this very context, hence it is passed twice.
            oscoretransport = TransportOSCORE(self, self)
            self.request_interfaces.append(oscoretransport)
        if selected_transports.slipmux:
            from .transports.slipmux import MessageInterfaceSlipmux

            await self._append_tokenmanaged_messagemanaged_transport(
                lambda mman: MessageInterfaceSlipmux.create_transport_endpoint(
                    selected_transports,
                    mman,
                    log=self.log,
                    loop=self.loop,
                )
            )
        if selected_transports.udp6:
            from .transports.udp6 import MessageInterfaceUDP6

            # This can probably be generalized into something like
            # _append_tokenmanaged_messagemanaged_transport, maybe already with
            # using an interface rather than a lambda (now that we
            # comprehensively pass in all relevant options through the transport parameters).
            async for mint in MessageInterfaceUDP6.prepare_transport_endpoints(
                params=selected_transports,
                log=self.log,
                loop=loop,
            ):
                tman = TokenManager(self)
                mman = MessageManager(tman)
                mint._ctx = mman
                mman.message_interface = mint
                tman.token_interface = mman
                self.request_interfaces.append(tman)
                await mint.start_transport_endpoint()
        if selected_transports.simple6:
            from .transports.simple6 import MessageInterfaceSimple6

            await self._append_tokenmanaged_messagemanaged_transport(
                lambda mman: MessageInterfaceSimple6.create_client_transport_endpoint(
                    mman, log=self.log, loop=loop
                )
            )
        if selected_transports.tinydtls:
            from .transports.tinydtls import MessageInterfaceTinyDTLS

            await self._append_tokenmanaged_messagemanaged_transport(
                lambda mman: MessageInterfaceTinyDTLS.create_client_transport_endpoint(
                    mman, log=self.log, loop=loop
                )
            )
        if selected_transports.tcpclient:
            from .transports.tcp import TCPClient

            await self._append_tokenmanaged_transport(
                lambda tman: TCPClient.create_client_transport(tman, self.log, loop)
            )
        if selected_transports.tlsclient:
            from .transports.tls import TLSClient

            await self._append_tokenmanaged_transport(
                lambda tman: TLSClient.create_client_transport(
                    tman, self.log, loop, self.client_credentials
                )
            )
        if selected_transports.ws:
            from .transports.ws import WSPool

            await self._append_tokenmanaged_transport(
                lambda tman: WSPool.create_transport(
                    tman, self.log, loop, client_credentials=self.client_credentials
                )
            )

        return self

282 

283 @classmethod 

284 async def create_server_context( 

285 cls, 

286 site, 

287 bind=None, 

288 *, 

289 loggername="coap-server", 

290 loop=None, 

291 _ssl_context=None, 

292 multicast=[], 

293 server_credentials=None, 

294 transports: TransportParameters | None | dict | list[str] = None, 

295 ): 

296 """Create a context, bound to all addresses on the CoAP port (unless 

297 otherwise specified in the ``bind`` argument). 

298 

299 This is the easiest way to get a context suitable both for sending 

300 client and accepting server requests. 

301 

302 The ``bind`` argument, if given, needs to be a 2-tuple of IP address 

303 string and port number, where the port number can be None to use the default port. 

304 

305 If ``multicast`` is given, it needs to be a list of (multicast address, 

306 interface name) tuples, which will all be joined. (The IPv4 style of 

307 selecting the interface by a local address is not supported; users may 

308 want to use the netifaces package to arrive at an interface name for an 

309 address). 

310 

311 As a shortcut, the list may also contain interface names alone. Those 

312 will be joined for the 'all CoAP nodes' groups of IPv4 and IPv6 (with 

313 scopes 2 and 5) as well as the respective 'all nodes' groups in IPv6. 

314 

315 Under some circumstances you may already need a context to pass into 

316 the site for creation; this is typically the case for servers that 

317 trigger requests on their own. For those cases, it is usually easiest 

318 to pass None in as a site, and set the fully constructed site later by 

319 assigning to the ``serversite`` attribute. 

320 

321 :param TransportParameters|dict|list|None transports: A Configuration 

322 item that lists which transports are to be configured, and how they are 

323 to be bound. If left unset, a default choice of transports is 

324 initialized. 

325 

326 For ease of use, a `dict` can be passed in, which gets :meth:`.load() 

327 <aiocoap.util.dataclass_data.LoadStoreClass.load>`ed, or a list can 

328 be passed in that merely lists the transports (equivalent to ``{k: {} 

329 for k in transports}``). 

330 

331 :meta private: 

332 (not actually private, just hiding from automodule due to being 

333 grouped with the important functions) 

334 """ 

335 

336 if loop is None: 

337 loop = asyncio.get_running_loop() 

338 

339 self = cls( 

340 loop=loop, 

341 serversite=site, 

342 loggername=loggername, 

343 server_credentials=server_credentials, 

344 ) 

345 

346 multicast_done = not multicast 

347 

348 selected_transports = TransportParameters._compat_create(transports) 

349 if selected_transports.is_server is None: 

350 selected_transports.is_server = True 

351 selected_transports._legacy_bind = bind 

352 selected_transports._legacy_multicast = multicast 

353 selected_transports._apply_defaults() 

354 

355 self.log.debug( 

356 "Creating server context from transport configuration %r", 

357 selected_transports, 

358 ) 

359 

360 if selected_transports.oscore: 

361 from .transports.oscore import TransportOSCORE 

362 

363 oscoretransport = TransportOSCORE(self, self) 

364 self.request_interfaces.append(oscoretransport) 

365 if selected_transports.slipmux: 

366 from .transports.slipmux import MessageInterfaceSlipmux 

367 

368 await self._append_tokenmanaged_messagemanaged_transport( 

369 lambda mman: MessageInterfaceSlipmux.create_transport_endpoint( 

370 selected_transports, 

371 mman, 

372 log=self.log, 

373 loop=self.loop, 

374 ) 

375 ) 

376 # FIXME this is duplicated from the client version, as it uses the transport parameters 

377 if selected_transports.udp6: 

378 from .transports.udp6 import MessageInterfaceUDP6 

379 

380 # This can probably be generalized into something like 

381 # _append_tokenmanaged_messagemanaged_transport, maybe already with 

382 # using an interface rather than a lambda (now that we 

383 # comprehensively pass in all relevant options through the 

384 # transport parameters). 

385 async for mint in MessageInterfaceUDP6.prepare_transport_endpoints( 

386 params=selected_transports, 

387 log=self.log, 

388 loop=loop, 

389 ): 

390 tman = TokenManager(self) 

391 mman = MessageManager(tman) 

392 mint._ctx = mman 

393 mman.message_interface = mint 

394 tman.token_interface = mman 

395 self.request_interfaces.append(tman) 

396 await mint.start_transport_endpoint() 

397 multicast_done = True 

398 # FIXME this is duplicated from the client version, as those are client-only anyway 

399 if selected_transports.simple6: 

400 from .transports.simple6 import MessageInterfaceSimple6 

401 

402 await self._append_tokenmanaged_messagemanaged_transport( 

403 lambda mman: MessageInterfaceSimple6.create_client_transport_endpoint( 

404 mman, log=self.log, loop=loop 

405 ) 

406 ) 

407 elif selected_transports.tinydtls: 

408 from .transports.tinydtls import MessageInterfaceTinyDTLS 

409 

410 await self._append_tokenmanaged_messagemanaged_transport( 

411 lambda mman: MessageInterfaceTinyDTLS.create_client_transport_endpoint( 

412 mman, log=self.log, loop=loop 

413 ) 

414 ) 

415 # FIXME end duplication 

416 if selected_transports.tinydtls_server: 

417 from .transports.tinydtls_server import MessageInterfaceTinyDTLSServer 

418 

419 await self._append_tokenmanaged_messagemanaged_transport( 

420 lambda mman: MessageInterfaceTinyDTLSServer.create_server( 

421 bind, 

422 mman, 

423 log=self.log, 

424 loop=loop, 

425 server_credentials=self.server_credentials, 

426 ) 

427 ) 

428 if selected_transports.simplesocketserver: 

429 from .transports.simplesocketserver import MessageInterfaceSimpleServer 

430 

431 await self._append_tokenmanaged_messagemanaged_transport( 

432 lambda mman: MessageInterfaceSimpleServer.create_server( 

433 bind, mman, log=self.log, loop=loop 

434 ) 

435 ) 

436 if selected_transports.tcpserver: 

437 from .transports.tcp import TCPServer 

438 

439 await self._append_tokenmanaged_transport( 

440 lambda tman: TCPServer.create_server(bind, tman, self.log, loop) 

441 ) 

442 if selected_transports.tcpclient: 

443 from .transports.tcp import TCPClient 

444 

445 await self._append_tokenmanaged_transport( 

446 lambda tman: TCPClient.create_client_transport(tman, self.log, loop) 

447 ) 

448 if selected_transports.tlsserver: 

449 if _ssl_context is not None: 

450 from .transports.tls import TLSServer 

451 

452 await self._append_tokenmanaged_transport( 

453 lambda tman: TLSServer.create_server( 

454 bind, tman, self.log, loop, _ssl_context 

455 ) 

456 ) 

457 else: 

458 # Could also be a warning, but at least as of now, TLS often 

459 # enabled implicitly, and turning this into a warning is 

460 # excessive as long as we don't have a recommentation about how 

461 # users should acknowledge that they don't need TLS anyway. 

462 self.log.info("Not opening TLS server: No server certificates present") 

463 if selected_transports.tlsclient: 

464 from .transports.tls import TLSClient 

465 

466 await self._append_tokenmanaged_transport( 

467 lambda tman: TLSClient.create_client_transport( 

468 tman, self.log, loop, self.client_credentials 

469 ) 

470 ) 

471 if selected_transports.ws: 

472 from .transports.ws import WSPool 

473 

474 await self._append_tokenmanaged_transport( 

475 # None, None: Unlike the other transports this has a server/client generic creator, and only binds if there is some bind 

476 lambda tman: WSPool.create_transport( 

477 tman, 

478 self.log, 

479 loop, 

480 client_credentials=self.client_credentials, 

481 server_bind=bind or (None, None), 

482 server_context=_ssl_context, 

483 ) 

484 ) 

485 

486 if not multicast_done: 

487 self.log.warning( 

488 "Multicast was requested, but no multicast capable transport was selected." 

489 ) 

490 

491 # This is used in tests to wait for externally launched servers to be ready 

492 self.log.debug("Server ready to receive requests") 

493 

494 return self 

495 

496 async def shutdown(self): 

497 """Take down any listening sockets and stop all related timers. 

498 

499 After this coroutine terminates, and once all external references to 

500 the object are dropped, it should be garbage-collectable. 

501 

502 This method takes up to 

503 :const:`aiocoap.numbers.constants.SHUTDOWN_TIMEOUT` seconds, allowing 

504 transports to perform any cleanup implemented in them (such as orderly 

505 connection shutdown and cancelling observations, where the latter is 

506 currently not implemented). 

507 

508 :meta private: 

509 (not actually private, just hiding from automodule due to being 

510 grouped with the important functions) 

511 """ 

512 

513 self.log.debug("Shutting down context") 

514 

515 done, pending = await asyncio.wait( 

516 [ 

517 asyncio.create_task( 

518 ri.shutdown(), 

519 name="Shutdown of %r" % ri, 

520 ) 

521 for ri in self.request_interfaces 

522 ], 

523 timeout=SHUTDOWN_TIMEOUT, 

524 ) 

525 for item in done: 

526 await item 

527 if pending: 

528 # Apart from being useful to see, this also ensures that developers 

529 # see the error in the logs during test suite runs -- and the error 

530 # should be easier to follow than the "we didn't garbage collect 

531 # everything" errors we see anyway (or otherwise, if the error is 

532 # escalated into a test failure) 

533 self.log.error( 

534 "Shutdown timeout exceeded, returning anyway. Interfaces still busy: %s", 

535 pending, 

536 ) 

537 

538 # FIXME: determine how official this should be, or which part of it is 

539 # public -- now that BlockwiseRequest uses it. (And formalize what can 

540 # change about messages and what can't after the remote has been thusly 

541 # populated). 

542 async def find_remote_and_interface(self, message): 

543 if message.remote is None: 

544 raise error.MissingRemoteError() 

545 for ri in self.request_interfaces: 

546 if await ri.recognize_remote(message): 

547 return ri 

548 for ri in self.request_interfaces: 

549 if remote := await ri.determine_remote(message): 

550 message.remote = remote 

551 return ri 

552 raise error.NoRequestInterface() 

553 

554 def request(self, request_message, handle_blockwise=True): 

555 if handle_blockwise: 

556 return BlockwiseRequest(self, request_message) 

557 

558 pipe = Pipe(request_message, self.log) 

559 # Request sets up callbacks at creation 

560 result = Request(pipe, self.loop, self.log) 

561 

562 async def send(): 

563 try: 

564 request_interface = await self.find_remote_and_interface( 

565 request_message 

566 ) 

567 request_interface.request(pipe) 

568 except Exception as e: 

569 pipe.add_exception(e) 

570 return 

571 

572 self.loop.create_task( 

573 send(), 

574 name="Request processing of %r" % result, 

575 ) 

576 return result 

577 

578 # the following are under consideration for moving into Site or something 

579 # mixed into it 

580 

581 def render_to_pipe(self, pipe): 

582 """Fill a pipe by running the site's render_to_pipe interface and 

583 handling errors.""" 

584 

585 pr_that_can_receive_errors = error_to_message(pipe, self.log) 

586 

587 run_driving_pipe( 

588 pr_that_can_receive_errors, 

589 self._render_to_pipe(pipe), 

590 name="Rendering for %r" % pipe.request, 

591 ) 

592 

593 async def _render_to_pipe(self, pipe): 

594 if self.serversite is None: 

595 pipe.add_response( 

596 Message(code=NOT_FOUND, payload=b"not a server"), is_last=True 

597 ) 

598 return 

599 

600 return await self.serversite.render_to_pipe(pipe) 

601 

602 

class BaseRequest:
    """Mechanisms shared by :class:`Request` and :class:`MulticastRequest`."""

605 

606 

class BaseUnicastRequest(BaseRequest):
    """Utility base class offering :attr:`response_raising` and
    :attr:`response_nonraising` as alternatives to awaiting the
    :attr:`response` future directly: error states can be seen either as an
    unsuccessful response (eg. 4.04) or as an exception.

    It also provides some internal tools for handling anything that has a
    :attr:`response` future and an :attr:`observation`"""

    @property
    async def response_raising(self):
        """Awaitable that yields a successful response, and otherwise raises:
        a generic network exception, or a
        :class:`.error.ResponseWrappingError` wrapping an unsuccessful
        response.

        Experimental Interface."""
        result = await self.response
        if result.code.is_successful():
            return result
        raise error.ResponseWrappingError(result)

    @property
    async def response_nonraising(self):
        """Awaitable that, rather than raising, yields a 500ish fabricated
        message (as a proxy would return).

        Experimental Interface."""
        # FIXME: Can we smuggle error_to_message into the underlying pipe?
        # That should make observe notifications into messages rather
        # than exceptions as well, plus it has fallbacks for `e.to_message()`
        # raising.
        try:
            return await self.response
        except error.RenderableError as e:
            return e.to_message()
        except Exception:
            return Message(code=INTERNAL_SERVER_ERROR)

648 

649 

class Request(interfaces.Request, BaseUnicastRequest):
    """Tracker for a single non-blockwise request running over a pipe.

    Pipe events are routed into the :attr:`response` future and -- when the
    request carried an Observe=0 option -- into :attr:`observation`.
    """

    def __init__(self, pipe, loop, log):
        self._pipe = pipe

        # Resolved with the first response message, or with its exception.
        self.response = loop.create_future()

        # Observe=0 registers an observation; subsequent notifications then go
        # into self.observation rather than the (already resolved) future.
        if pipe.request.opt.observe == 0:
            self.observation = ClientObservation()
        else:
            self.observation = None

        # _run is a generator-based state machine; priming it with send(None)
        # advances it to its first yield so it is ready to receive events.
        self._runner = self._run()
        self._runner.send(None)

        def process(event):
            try:
                # would be great to have self or the runner as weak ref, but
                # see ClientObservation.register_callback comments -- while
                # that is around, we can't weakref here.
                self._runner.send(event)
                return True
            except StopIteration:
                # The runner finished: signal the pipe we lost interest.
                return False

        self._stop_interest = self._pipe.on_event(process)

        self.log = log

        self.response.add_done_callback(self._response_cancellation_handler)

    def _response_cancellation_handler(self, response):
        # Propagate cancellation to the runner (if interest in the first
        # response is lost, there won't be observation items to pull out), but
        # not general completion (because if it's completed and not cancelled,
        # eg. when an observation is active)
        if self.response.cancelled() and self._runner is not None:
            # Dropping the only reference makes it stop with GeneratorExit,
            # similar to a cancelled task
            self._runner = None
            self._stop_interest()
        # Otherwise, there will be a runner still around, and it's its task to
        # call _stop_interest.

    @staticmethod
    def _add_response_properties(response, request):
        # Let the response carry a reference to the request it answers.
        response.request = request

    def _run(self):
        """Generator state machine fed pipe events via ``send``; routes them
        into the response future and, if present, the observation."""
        # FIXME: This is in iterator form because it used to be a task that
        # awaited futures, and that code could be easily converted to an
        # iterator. I'm not sure that's a bad state here, but at least it
        # should be a more conscious decision to make this an iterator rather
        # than just having it happen to be one.
        #
        # FIXME: check that responses come from the same remote as long as we're assuming unicast

        first_event = yield None

        if first_event.message is not None:
            self._add_response_properties(first_event.message, self._pipe.request)
            self.response.set_result(first_event.message)
        else:
            self.response.set_exception(first_event.exception)
            if not isinstance(first_event.exception, error.Error):
                self.log.warning(
                    "An exception that is not an aiocoap Error was raised "
                    "from a transport; please report this as a bug in "
                    "aiocoap: %r",
                    first_event.exception,
                )

        if self.observation is None:
            # Not an observation: a single response (or exception) is all we
            # expect; anything further is a protocol irregularity.
            if not first_event.is_last:
                self.log.error(
                    "Pipe indicated more possible responses"
                    " while the Request handler would not know what to"
                    " do with them, stopping any further request."
                )
                self._stop_interest()
            return

        if first_event.is_last:
            self.observation.error(error.NotObservable())
            return

        if first_event.message.opt.observe is None:
            self.log.error(
                "Pipe indicated more possible responses"
                " while the Request handler would not know what to"
                " do with them, stopping any further request."
            )
            self._stop_interest()
            return

        # variable names from RFC7641 Section 3.4
        v1 = first_event.message.opt.observe
        t1 = time.time()

        while True:
            # We don't really support cancellation of observations yet (see
            # https://github.com/chrysn/aiocoap/issues/92), but at least
            # stopping the interest is a way to free the local resources after
            # the first observation update, and to make the MID handler RST the
            # observation on the next.
            # FIXME: there *is* now a .on_cancel callback, we should at least
            # hook into that, and possibly even send a proper cancellation
            # then.
            next_event = yield True
            if self.observation.cancelled:
                self._stop_interest()
                return

            if next_event.exception is not None:
                self.observation.error(next_event.exception)
                if not next_event.is_last:
                    self._stop_interest()
                if not isinstance(next_event.exception, error.Error):
                    self.log.warning(
                        "An exception that is not an aiocoap Error was "
                        "raised from a transport during an observation; "
                        "please report this as a bug in aiocoap: %r",
                        next_event.exception,
                    )
                return

            self._add_response_properties(next_event.message, self._pipe.request)

            if next_event.message.opt.observe is not None:
                # check for reordering
                v2 = next_event.message.opt.observe
                t2 = time.time()

                # RFC 7641 freshness test: newer sequence number (mod 2**24
                # wraparound window), or enough wall-clock time has passed.
                is_recent = (
                    (v1 < v2 and v2 - v1 < 2**23)
                    or (v1 > v2 and v1 - v2 > 2**23)
                    or (
                        t2
                        > t1
                        + self._pipe.request.transport_tuning.OBSERVATION_RESET_TIME
                    )
                )
                if is_recent:
                    t1 = t2
                    v1 = v2
            else:
                # the terminal message is always the last
                is_recent = True

            if is_recent:
                self.observation.callback(next_event.message)

            if next_event.is_last:
                self.observation.error(error.ObservationCancelled())
                return

            if next_event.message.opt.observe is None:
                self.observation.error(error.ObservationCancelled())
                self.log.error(
                    "Pipe indicated more possible responses"
                    " while the Request handler would not know what to"
                    " do with them, stopping any further request."
                )
                self._stop_interest()
                return

814 

815 

816class BlockwiseRequest(BaseUnicastRequest, interfaces.Request): 

817 def __init__(self, protocol, app_request): 

818 self.protocol = protocol 

819 self.log = self.protocol.log.getChild("blockwise-requester") 

820 

821 self.response = protocol.loop.create_future() 

822 

823 if app_request.opt.observe is not None: 

824 self.observation = ClientObservation() 

825 else: 

826 self.observation = None 

827 

828 self._runner = protocol.loop.create_task( 

829 self._run_outer( 

830 app_request, 

831 self.response, 

832 weakref.ref(self.observation) 

833 if self.observation is not None 

834 else lambda: None, 

835 self.protocol, 

836 self.log, 

837 ), 

838 name="Blockwise runner for %r" % app_request, 

839 ) 

840 self.response.add_done_callback(self._response_cancellation_handler) 

841 

842 def _response_cancellation_handler(self, response_future): 

843 # see Request._response_cancellation_handler 

844 if self.response.cancelled(): 

845 self._runner.cancel() 

846 

847 @classmethod 

848 async def _run_outer(cls, app_request, response, weak_observation, protocol, log): 

849 try: 

850 await cls._run(app_request, response, weak_observation, protocol, log) 

851 except asyncio.CancelledError: 

852 pass # results already set 

853 except Exception as e: 

854 logged = False 

855 if not response.done(): 

856 logged = True 

857 response.set_exception(e) 

858 obs = weak_observation() 

859 if app_request.opt.observe is not None and obs is not None: 

860 logged = True 

861 obs.error(e) 

862 if not logged: 

863 # should be unreachable 

864 log.error( 

865 "Exception in BlockwiseRequest runner neither went to response nor to observation: %s", 

866 e, 

867 exc_info=e, 

868 ) 

869 

    # This is a class method because that allows self and self.observation to
    # be freed even when this task is running, and the task to stop itself --
    # otherwise we couldn't know when users just "forget" about a request
    # object after using its response (esp. in observe cases) and leave this
    # task running.
    @classmethod
    async def _run(cls, app_request, response, weak_observation, protocol, log):
        """Send ``app_request``, performing the Block1 (request body) phase if
        the payload needs fragmenting, then hand off to
        :meth:`_complete_by_requesting_block2` for the Block2 (response body)
        phase, and finally set the reassembled message on the ``response``
        future.

        If the request carries an Observe option and the server confirms it, a
        subtask is started that completes and dispatches each notification via
        the observation behind ``weak_observation``."""
        # we need to populate the remote right away, because the choice of
        # blocks depends on it.
        await protocol.find_remote_and_interface(app_request)

        # Largest block size exponent the remote allows; may shrink below if
        # the server negotiates smaller blocks in its Block1 responses.
        size_exp = app_request.remote.maximum_block_size_exp

        if app_request.opt.block1 is not None:
            warnings.warn(
                "Setting a block1 option in a managed block-wise transfer is deprecated. Instead, set request.remote.maximum_block_size_exp to the desired value",
                DeprecationWarning,
                stacklevel=2,
            )
            assert app_request.opt.block1.block_number == 0, (
                "Unexpected block number in app_request"
            )
            assert not app_request.opt.block1.more, (
                "Unexpected more-flag in app_request"
            )
            # this is where the library user can traditionally pass in size
            # exponent hints into the library.
            size_exp = app_request.opt.block1.size_exponent

        # Offset in the message in blocks of size_exp. Whoever changes size_exp
        # is responsible for updating this number.
        block_cursor = 0

        while True:
            # ... send a chunk

            if size_exp >= 6:
                # FIXME from maximum_payload_size
                fragmentation_threshold = app_request.remote.maximum_payload_size
            else:
                # Block sizes are 2**(szx + 4) bytes for szx < 6.
                fragmentation_threshold = 2 ** (size_exp + 4)

            if (
                app_request.opt.block1 is not None
                or len(app_request.payload) > fragmentation_threshold
            ):
                current_block1 = app_request._extract_block(
                    block_cursor, size_exp, app_request.remote.maximum_payload_size
                )
                if block_cursor == 0:
                    # Advertise the total body size on the first block only.
                    current_block1.opt.size1 = len(app_request.payload)
            else:
                # Payload fits in one message: send the request unmodified.
                current_block1 = app_request

            blockrequest = protocol.request(current_block1, handle_blockwise=False)
            blockresponse = await blockrequest.response

            # store for future blocks to ensure that the next blocks will be
            # sent from the same source address (in the UDP case; for many
            # other transports it won't matter). carrying along locally set block size limitation
            if (
                app_request.remote.maximum_block_size_exp
                < blockresponse.remote.maximum_block_size_exp
            ):
                blockresponse.remote.maximum_block_size_exp = (
                    app_request.remote.maximum_block_size_exp
                )
            app_request.remote = blockresponse.remote

            if blockresponse.opt.block1 is None:
                if blockresponse.code.is_successful() and current_block1.opt.block1:
                    log.warning(
                        "Block1 option completely ignored by server, assuming it knows what it is doing."
                    )
                # FIXME: handle 4.13 and retry with the indicated size option
                break

            block1 = blockresponse.opt.block1
            log.debug(
                "Response with Block1 option received, number = %d, more = %d, size_exp = %d.",
                block1.block_number,
                block1.more,
                block1.size_exponent,
            )

            if block1.block_number != current_block1.opt.block1.block_number:
                raise error.UnexpectedBlock1Option("Block number mismatch")

            if size_exp == 7:
                # szx 7: block numbers count multiples of 1024 bytes, so a
                # single message may cover several of them.
                block_cursor += len(current_block1.payload) // 1024
            else:
                block_cursor += 1

            # If the server asked for smaller blocks, rescale the cursor so it
            # keeps pointing at the same byte offset.
            while block1.size_exponent < size_exp:
                block_cursor *= 2
                size_exp -= 1

            if not current_block1.opt.block1.more:
                if block1.more or blockresponse.code == CONTINUE:
                    # treating this as a protocol error -- letting it slip
                    # through would misrepresent the whole operation as an
                    # over-all 2.xx (successful) one.
                    raise error.UnexpectedBlock1Option(
                        "Server asked for more data at end of body"
                    )
                break

            # checks before preparing the next round:

            if blockresponse.opt.observe:
                # we're not *really* interested in that block, we just sent an
                # observe option to indicate that we'll want to observe the
                # resulting representation as a whole
                log.warning(
                    "Server answered Observe in early Block1 phase, cancelling the erroneous observation."
                )
                blockrequest.observe.cancel()

            if block1.more:
                # FIXME i think my own server is dowing this wrong
                # if response.code != CONTINUE:
                #     raise error.UnexpectedBlock1Option("more-flag set but no Continue")
                pass
            else:
                if not blockresponse.code.is_successful():
                    break
                else:
                    # ignoring (discarding) the successful intermediate result, waiting for a final one
                    continue

        lower_observation = None
        if app_request.opt.observe is not None:
            if blockresponse.opt.observe is not None:
                lower_observation = blockrequest.observation
            else:
                # Server did not confirm the observation; report that once to
                # the user-visible observation (if it is still referenced).
                obs = weak_observation()
                if obs:
                    obs.error(error.NotObservable())
                del obs

        assert blockresponse is not None, "Block1 loop broke without setting a response"
        blockresponse.opt.block1 = None

        # FIXME check with RFC7959: it just says "send requests similar to the
        # requests in the Block1 phase", what does that mean? using the last
        # block1 as a reference for now, especially because in the
        # only-one-request-block case, that's the original request we must send
        # again and again anyway
        assembled_response = await cls._complete_by_requesting_block2(
            protocol, current_block1, blockresponse, log
        )

        response.set_result(assembled_response)
        # finally set the result

        if lower_observation is not None:
            # FIXME this can all be simplified a lot since it's no more
            # expected that observations shut themselves down when GC'd.
            obs = weak_observation()
            del weak_observation
            if obs is None:
                # User already dropped the observation object: nothing to feed.
                lower_observation.cancel()
                return
            future_weak_observation = protocol.loop.create_future()  # packing this up because its destroy callback needs to reference the subtask
            subtask = asyncio.create_task(
                cls._run_observation(
                    app_request,
                    lower_observation,
                    future_weak_observation,
                    protocol,
                    log,
                ),
                name="Blockwise observation for %r" % app_request,
            )
            future_weak_observation.set_result(
                weakref.ref(obs, lambda obs: subtask.cancel())
            )
            obs.on_cancel(subtask.cancel)
            del obs
            await subtask

1050 

1051 @classmethod 

1052 async def _run_observation( 

1053 cls, original_request, lower_observation, future_weak_observation, protocol, log 

1054 ): 

1055 weak_observation = await future_weak_observation 

1056 # we can use weak_observation() here at any time, because whenever that 

1057 # becomes None, this task gets cancelled 

1058 try: 

1059 async for block1_notification in lower_observation: 

1060 log.debug("Notification received") 

1061 full_notification = await cls._complete_by_requesting_block2( 

1062 protocol, original_request, block1_notification, log 

1063 ) 

1064 log.debug("Reporting completed notification") 

1065 weak_observation().callback(full_notification) 

1066 # FIXME verify that this loop actually ends iff the observation 

1067 # was cancelled -- otherwise find out the cause(s) or make it not 

1068 # cancel under indistinguishable circumstances 

1069 weak_observation().error(error.ObservationCancelled()) 

1070 except asyncio.CancelledError: 

1071 return 

1072 except Exception as e: 

1073 weak_observation().error(e) 

1074 finally: 

1075 # We generally avoid idempotent cancellation, but we may have 

1076 # reached this point either due to an earlier cancellation or 

1077 # without one 

1078 if not lower_observation.cancelled: 

1079 lower_observation.cancel() 

1080 

1081 @classmethod 

1082 async def _complete_by_requesting_block2( 

1083 cls, protocol, request_to_repeat, initial_response, log 

1084 ): 

1085 # FIXME this can probably be deduplicated against BlockwiseRequest 

1086 

1087 if ( 

1088 initial_response.opt.block2 is None 

1089 or initial_response.opt.block2.more is False 

1090 ): 

1091 initial_response.opt.block2 = None 

1092 return initial_response 

1093 

1094 if initial_response.opt.block2.block_number != 0: 

1095 log.error("Error assembling blockwise response (expected first block)") 

1096 raise error.UnexpectedBlock2() 

1097 

1098 assembled_response = initial_response 

1099 last_response = initial_response 

1100 while True: 

1101 current_block2 = request_to_repeat._generate_next_block2_request( 

1102 assembled_response 

1103 ) 

1104 

1105 current_block2 = current_block2.copy(remote=initial_response.remote) 

1106 

1107 blockrequest = protocol.request(current_block2, handle_blockwise=False) 

1108 last_response = await blockrequest.response 

1109 

1110 if last_response.opt.block2 is None: 

1111 log.warning( 

1112 "Server sent non-blockwise response after having started a blockwise transfer. Blockwise transfer cancelled, accepting single response." 

1113 ) 

1114 return last_response 

1115 

1116 block2 = last_response.opt.block2 

1117 log.debug( 

1118 "Response with Block2 option received, number = %d, more = %d, size_exp = %d.", 

1119 block2.block_number, 

1120 block2.more, 

1121 block2.size_exponent, 

1122 ) 

1123 try: 

1124 assembled_response._append_response_block(last_response) 

1125 except error.Error as e: 

1126 log.error("Error assembling blockwise response, passing on error %r", e) 

1127 raise 

1128 

1129 if block2.more is False: 

1130 return assembled_response 

1131 

1132 

class ClientObservation:
    """An interface to observe notification updates arriving on a request.

    This class does not actually provide any of the observe functionality, it
    is purely a container for dispatching the messages via asynchronous
    iteration. It gets driven (ie. populated with responses or errors including
    observation termination) by a Request object.
    """

    def __init__(self):
        # Deprecated push-style listeners; both are set to None on
        # cancellation so any further use fails loudly.
        self.callbacks = []
        self.errbacks = []

        self.cancelled = False
        self._on_cancel = []

        # Last response seen; replayed to callbacks registering afterwards.
        self._latest_response = None
        # the analogous error is stored in _cancellation_reason when cancelled.

    def __aiter__(self):
        """`async for` interface to observations.

        This is the preferred interface to obtaining observations."""
        it = self._Iterator()
        self.register_callback(it.push, _suppress_deprecation=True)
        self.register_errback(it.push_err, _suppress_deprecation=True)
        return it

    class _Iterator:
        """Lossy single-consumer queue backing the ``async for`` interface."""

        def __init__(self):
            self._future = asyncio.get_running_loop().create_future()

        def push(self, item):
            if self._future.done():
                # we don't care whether we overwrite anything, this is a lossy queue as observe is lossy
                self._future = asyncio.get_running_loop().create_future()
            self._future.set_result(item)

        def push_err(self, e):
            if self._future.done():
                self._future = asyncio.get_running_loop().create_future()
            self._future.set_exception(e)

        async def __anext__(self):
            f = self._future
            try:
                result = await self._future
                # FIXME see `await servobs._trigger` comment: might waiting for
                # the original future not yield the first future's result when
                # a quick second future comes in in a push?
                if f is self._future:
                    self._future = asyncio.get_running_loop().create_future()
                return result
            except (error.NotObservable, error.ObservationCancelled):
                # only exit cleanly when the server -- right away or later --
                # states that the resource is not observable any more
                # FIXME: check whether an unsuccessful message is still passed
                # as an observation result (or whether it should be)
                raise StopAsyncIteration

        def __del__(self):
            if self._future.done():
                try:
                    # Fetch the result so any errors show up at least in the
                    # finalizer output
                    self._future.result()
                except (error.ObservationCancelled, error.NotObservable):
                    # This is the case at the end of an observation cancelled
                    # by the server.
                    pass
                except error.NetworkError:
                    # This will already have shown up in the main result too.
                    pass
                except (error.LibraryShutdown, asyncio.CancelledError):
                    pass
                # Anything else flying out of this is unexpected and probably a
                # library error

    # When this function is removed, we can finally do cleanup better. Right
    # now, someone could register a callback that doesn't hold any references,
    # so we can't just stop the request when nobody holds a reference to this
    # any more. Once we're all in pull mode, we can make the `process` function
    # that sends data in here use a weak reference (because any possible
    # recipient would need to hold a reference to self or the iterator, and
    # thus _run).
    def register_callback(self, callback, _suppress_deprecation=False):
        """Call the callback whenever a response to the message comes in, and
        pass the response to it.

        The use of this function is deprecated: Use the asynchronous iteration
        interface instead."""
        if not _suppress_deprecation:
            warnings.warn(
                "register_callback on observe results is deprecated: Use `async for notify in request.observation` instead.",
                DeprecationWarning,
                stacklevel=2,
            )
        if self.cancelled:
            return

        self.callbacks.append(callback)
        if self._latest_response is not None:
            # Late registration: immediately replay the most recent response.
            callback(self._latest_response)

    def register_errback(self, callback, _suppress_deprecation=False):
        """Call the callback whenever something goes wrong with the
        observation, and pass an exception to the callback. After such a
        callback is called, no more callbacks will be issued.

        The use of this function is deprecated: Use the asynchronous iteration
        interface instead."""
        if not _suppress_deprecation:
            warnings.warn(
                "register_errback on observe results is deprecated: Use `async for notify in request.observation` instead.",
                DeprecationWarning,
                stacklevel=2,
            )
        if self.cancelled:
            # Already terminated: report the stored reason right away.
            callback(self._cancellation_reason)
            return
        self.errbacks.append(callback)

    def callback(self, response):
        """Notify all listeners of an incoming response"""

        self._latest_response = response

        for c in self.callbacks:
            c(response)

    def error(self, exception):
        """Notify registered listeners that the observation went wrong. This
        can only be called once."""

        if self.errbacks is None:
            raise RuntimeError(
                "Error raised in an already cancelled ClientObservation"
            ) from exception
        for c in self.errbacks:
            c(exception)

        self.cancel()
        # cancel() resets the reason to None; store the actual cause after it.
        self._cancellation_reason = exception

    def cancel(self):
        # FIXME determine whether this is called by anything other than error,
        # and make it private so there is always a _cancellation_reason
        """Cease to generate observation or error events. This will not
        generate an error by itself.

        This function is only needed while register_callback and
        register_errback are around; once their deprecations are acted on,
        dropping the asynchronous iterator will automatically cancel the
        observation.
        """

        assert not self.cancelled, "ClientObservation cancelled twice"

        # make sure things go wrong when someone tries to continue this
        self.errbacks = None
        self.callbacks = None

        self.cancelled = True
        while self._on_cancel:
            self._on_cancel.pop()()

        self._cancellation_reason = None

    def on_cancel(self, callback):
        """Arrange for ``callback`` to run on cancellation; if the observation
        is already cancelled, run it immediately instead."""
        if self.cancelled:
            # Fixed: the callback was previously also appended below, which
            # kept a useless reference alive -- cancel() can never run again
            # (it asserts single use), so the retained entry was never drained.
            callback()
            return
        self._on_cancel.append(callback)

    def __repr__(self):
        return "<%s %s at %#x>" % (
            type(self).__name__,
            "(cancelled)"
            if self.cancelled
            else "(%s call-, %s errback(s))"
            % (len(self.callbacks), len(self.errbacks)),
            id(self),
        )

1315 

1316 

1317class ServerObservation: 

1318 def __init__(self): 

1319 self._accepted = False 

1320 self._trigger = asyncio.get_running_loop().create_future() 

1321 # A deregistration is "early" if it happens before the response message 

1322 # is actually sent; calling deregister() in that time (typically during 

1323 # `render()`) will not send an unsuccessful response message but just 

1324 # sent this flag which is set to None as soon as it is too late for an 

1325 # early deregistration. 

1326 # This mechanism is temporary until more of aiocoap behaves like 

1327 # Pipe which does not suffer from this limitation. 

1328 self._early_deregister = False 

1329 self._late_deregister = False 

1330 

1331 def accept(self, cancellation_callback): 

1332 self._accepted = True 

1333 self._cancellation_callback = cancellation_callback 

1334 

1335 def deregister(self, reason=None): 

1336 if self._early_deregister is False: 

1337 self._early_deregister = True 

1338 return 

1339 

1340 warnings.warn( 

1341 "Late use of ServerObservation.deregister() is" 

1342 " deprecated, use .trigger with an unsuccessful value" 

1343 " instead", 

1344 DeprecationWarning, 

1345 ) 

1346 self.trigger( 

1347 Message(code=INTERNAL_SERVER_ERROR, payload=b"Resource became unobservable") 

1348 ) 

1349 

1350 def trigger(self, response=None, *, is_last=False): 

1351 """Send an updated response; if None is given, the observed resource's 

1352 rendering will be invoked to produce one. 

1353 

1354 `is_last` can be set to True to indicate that no more responses will be 

1355 sent. Note that an unsuccessful response will be the last no matter 

1356 what is_last says, as such a message always terminates a CoAP 

1357 observation.""" 

1358 if is_last: 

1359 self._late_deregister = True 

1360 if self._trigger.done(): 

1361 # we don't care whether we overwrite anything, this is a lossy queue as observe is lossy 

1362 self._trigger = asyncio.get_running_loop().create_future() 

1363 self._trigger.set_result(response)