Coverage for aiocoap / interfaces.py: 84%

165 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-29 12:32 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5"""This module provides interface base classes to various aiocoap software 

6components, especially with respect to request and response handling. It 

7describes `abstract base classes`_ for messages, endpoints etc. 

8 

9It is *completely unrelated* to the concept of "network interfaces". 

10 

11.. _`abstract base classes`: https://docs.python.org/3/library/abc""" 

12 

13from __future__ import annotations 

14 

15import abc 

16import asyncio 

17import warnings 

18 

19from aiocoap.pipe import Pipe 

20from aiocoap.numbers.constants import MAX_REGULAR_BLOCK_SIZE_EXP 

21from .util import DeprecationWarning 

22 

23from typing import Optional, Callable 

24 

25 

26class MessageInterface(metaclass=abc.ABCMeta): 

27 """A MessageInterface is an object that can exchange addressed messages over 

28 unreliable transports. Implementations send and receive messages with 

29 message type and message ID, and are driven by a Context that deals with 

30 retransmission. 

31 

32 Usually, an MessageInterface refers to something like a local socket, and 

33 send messages to different remote endpoints depending on the message's 

34 addresses. Just as well, a MessageInterface can be useful for one single 

35 address only, or use various local addresses depending on the remote 

36 address. 

37 """ 

38 

39 @abc.abstractmethod 

40 async def shutdown(self): 

41 """Deactivate the complete transport, usually irrevertably. When the 

42 coroutine returns, the object must have made sure that it can be 

43 destructed by means of ref-counting or a garbage collector run.""" 

44 

45 @abc.abstractmethod 

46 def send(self, message): 

47 """Send a given :class:`Message` object""" 

48 

49 @abc.abstractmethod 

50 async def recognize_remote(self, remote): 

51 """Return True if the remote is one belonging to this transport""" 

52 

53 @abc.abstractmethod 

54 async def determine_remote(self, message): 

55 """Return a value suitable for the message's remote property based on 

56 its .opt.uri_host or .unresolved_remote. 

57 

58 May return None, which indicates that the MessageInterface can not 

59 transport the message (typically because it is of the wrong scheme).""" 

60 

61 

62class EndpointAddress(metaclass=abc.ABCMeta): 

63 """An address that is suitable for routing through the application to a 

64 remote endpoint. 

65 

66 Depending on the MessageInterface implementation used, an EndpointAddress 

67 property of a message can mean the message is exchanged "with 

68 [2001:db8::2:1]:5683, while my local address was [2001:db8:1::1]:5683" 

69 (typical of UDP6), "over the connected <Socket at 

70 0x1234>, wherever that's connected to" (simple6 or TCP) or "with 

71 participant 0x01 of the OSCAP key 0x..., routed over <another 

72 EndpointAddress>". 

73 

74 EndpointAddresses are only constructed by MessageInterface objects, 

75 either for incoming messages or when populating a message's .remote in 

76 :meth:`MessageInterface.determine_remote`. 

77 

78 There is no requirement that those address are always identical for a given 

79 address. However, incoming addresses must be hashable and hash-compare 

80 identically to requests from the same context. The "same context", for the 

81 purpose of EndpointAddresses, means that the message must be eligible for 

82 request/response, blockwise (de)composition and observations. (For example, 

83 in a DTLS context, the hash must change between epochs due to RFC7252 

84 Section 9.1.2). 

85 

86 So far, it is required that hash-identical objects also compare the same. 

87 That requirement might go away in future to allow equality to reflect finer 

88 details that are not hashed. (The only property that is currently known not 

89 to be hashed is the local address in UDP6, because that is *unknown* in 

90 initially sent packages, and thus disregarded for comparison but needed to 

91 round-trip through responses.) 

92 """ 

93 

94 @property 

95 @abc.abstractmethod 

96 def hostinfo(self): 

97 """The authority component of URIs that this endpoint represents when 

98 request are sent to it 

99 

100 Note that the presence of a hostinfo does not necessarily mean that 

101 globally meaningful or even syntactically valid URI can be constructed 

102 out of it; use the :attr:`.uri` property for this.""" 

103 

104 @property 

105 @abc.abstractmethod 

106 def hostinfo_local(self): 

107 """The authority component of URIs that this endpoint represents when 

108 requests are sent from it. 

109 

110 As with :attr:`.hostinfo`, this does not necessarily produce sufficient 

111 input for a URI; use :attr:`.uri_local` instead.""" 

112 

113 @property 

114 def uri(self): 

115 """Deprecated alias for uri_base""" 

116 return self.uri_base 

117 

118 @property 

119 @abc.abstractmethod 

120 def uri_base(self): 

121 """The base URI for the peer (typically scheme plus .hostinfo). 

122 

123 This raises :class:`.error.AnonymousHost` when executed on an address 

124 whose peer coordinates can not be expressed meaningfully in a URI.""" 

125 

126 @property 

127 @abc.abstractmethod 

128 def uri_base_local(self): 

129 """The base URI for the local side of this remote. 

130 

131 This raises :class:`.error.AnonymousHost` when executed on an address 

132 whose local coordinates can not be expressed meaningfully in a URI.""" 

133 

134 @property 

135 @abc.abstractmethod 

136 def is_multicast(self): 

137 """True if the remote address is a multicast address, otherwise false.""" 

138 

139 @property 

140 @abc.abstractmethod 

141 def is_multicast_locally(self): 

142 """True if the local address is a multicast address, otherwise false.""" 

143 

144 @property 

145 @abc.abstractmethod 

146 def scheme(Self): 

147 """The that is used with addresses of this kind 

148 

149 This is usually a class property. It is applicable to both sides of the 

150 communication. (Should there ever be a scheme that addresses the 

151 participants differently, a scheme_local will be added.)""" 

152 

153 @property 

154 def maximum_block_size_exp(self) -> int: 

155 """The maximum negotiated block size that can be sent to this remote.""" 

156 return MAX_REGULAR_BLOCK_SIZE_EXP 

157 

158 # Giving some slack so that barely-larger messages (like OSCORE typically 

159 # are) don't get fragmented -- but still for migration to maximum message 

160 # size so we don't have to guess any more how much may be option and how 

161 # much payload 

162 @property 

163 def maximum_payload_size(self) -> int: 

164 """The maximum payload size that can be sent to this remote. Only relevant 

165 if maximum_block_size_exp is 7. This will be removed in favor of a maximum 

166 message size when the block handlers can get serialization length 

167 predictions from the remote.""" 

168 return 1124 

169 

170 def as_response_address(self): 

171 """Address to be assigned to a response to messages that arrived with 

172 this message 

173 

174 This can (and does, by default) return self, but gives the protocol the 

175 opportunity to react to create a modified copy to deal with variations 

176 from multicast. 

177 """ 

178 return self 

179 

180 @property 

181 def authenticated_claims(self): 

182 """Iterable of objects representing any claims (e.g. an identity, or 

183 generally objects that can be used to authorize particular accesses) 

184 that were authenticated for this remote. 

185 

186 This is experimental and may be changed without notice. 

187 

188 Its primary use is on the server side; there, a request handler (or 

189 resource decorator) can use the claims to decide whether the client is 

190 authorized for a particular request. Use on the client side is planned 

191 as a requirement on a request, although (especially on side-effect free 

192 non-confidential requests) it can also be used in response 

193 processing.""" 

194 # "no claims" is a good default 

195 return () 

196 

197 @property 

198 @abc.abstractmethod 

199 def blockwise_key(self): 

200 """A hashable (ideally, immutable) value that is only the same for 

201 remotes from which blocks may be combined. (With all current transports 

202 that means that the network addresses need to be in there, and the 

203 identity of the security context). 

204 

205 It does *not* just hinge on the identity of the address object, as a 

206 first block may come in an OSCORE group request and follow-ups may come 

207 in pairwise requests. (And there might be allowed relaxations on the 

208 transport under OSCORE, but that'd need further discussion).""" 

209 # FIXME: should this behave like something that keeps the address 

210 # alive? Conversely, if the address gets deleted, can this reach the 

211 # block keys and make their stuff vanish from the caches? 

212 # 

213 # FIXME: what do security mechanisms best put here? Currently it's a 

214 # wild mix of keys (OSCORE -- only thing guaranteed to never be reused; 

215 # DTLS client because it's available) and claims (DTLS server, because 

216 # it's available and if the claims set matches it can't be that wrong 

217 # either can it?) 

218 

219 

220class MessageManager(metaclass=abc.ABCMeta): 

221 """The interface an entity that drives a MessageInterface provides towards 

222 the MessageInterface for callbacks and object acquisition.""" 

223 

224 @abc.abstractmethod 

225 def dispatch_message(self, message): 

226 """Callback to be invoked with an incoming message""" 

227 

228 @abc.abstractmethod 

229 def dispatch_error(self, error: Exception, remote): 

230 """Callback to be invoked when the operating system indicated an error 

231 condition from a particular remote.""" 

232 

233 @property 

234 @abc.abstractmethod 

235 def client_credentials(self): 

236 """A CredentialsMap that transports should consult when trying to 

237 establish a security context""" 

238 

239 

240class TokenInterface(metaclass=abc.ABCMeta): 

241 @abc.abstractmethod 

242 def send_message( 

243 self, message, messageerror_monitor 

244 ) -> Optional[Callable[[], None]]: 

245 """Send a message. If it returns a a callable, the caller is asked to 

246 call in case it no longer needs the message sent, and to dispose of if 

247 it doesn't intend to any more. 

248 

249 messageerror_monitor is a function that will be called at most once by 

250 the token interface: When the underlying layer is indicating that this 

251 concrete message could not be processed. This is typically the case for 

252 RSTs on from the message layer, and used to cancel observations. Errors 

253 that are not likely to be specific to a message (like retransmission 

254 timeouts, or ICMP errors) are reported through dispatch_error instead. 

255 (While the information which concrete message triggered that might be 

256 available, it is not likely to be relevant). 

257 

258 Currently, it is up to the TokenInterface to unset the no_response 

259 option in response messages, and to possibly not send them.""" 

260 

261 async def fill_or_recognize_remote(self, message): 

262 """Deprecated; like :meth:`RequestInterface.fill_or_recognize_remote`""" 

263 

264 warnings.warn( 

265 "fill_or_recognize_remote has been split into recognize_remote and determine_remote", 

266 DeprecationWarning, 

267 ) 

268 if self.recognize_remote(message): 

269 return True 

270 remote = self.determine_remote(message) 

271 if remote is not None: 

272 message.remote = remote 

273 return True 

274 return False 

275 

276 @abc.abstractmethod 

277 async def recognize_remote(self, message): 

278 """Like :meth:`RequestInterface.recognize_remote`""" 

279 

280 @abc.abstractmethod 

281 async def determine_remote(self, message): 

282 """Like :meth:`RequestInterface.determine_remote`""" 

283 

284 

285class TokenManager(metaclass=abc.ABCMeta): 

286 # to be described in full; at least there is a dispatch_error in analogy to MessageManager's 

287 pass 

288 

289 

290class RequestInterface(metaclass=abc.ABCMeta): 

291 """A transport of CoAP.""" 

292 

293 async def fill_or_recognize_remote(self, message): 

294 """Deprecated method to perform 

295 :meth:`RequestInterface.recognize_remote` and 

296 :meth:`RequestInterface.determine_remote` in one go. 

297 

298 The implementation is only provided for compatibility, and will be 

299 removed after the next release. 

300 

301 Return True if the message is recognized to already have a .remote 

302 managedy by this TokenInterface, or return True and set a .remote on 

303 message if it should (by its unresolved remote or Uri-* options) be 

304 routed through this TokenInterface, or return False otherwise.""" 

305 

306 warnings.warn( 

307 "fill_or_recognize_remote has been split into recognize_remote and determine_remote", 

308 DeprecationWarning, 

309 ) 

310 if self.recognize_remote(message): 

311 return True 

312 remote = self.determine_remote(message) 

313 if remote is not None: 

314 message.remote = remote 

315 return True 

316 return False 

317 

318 @abc.abstractmethod 

319 def request(self, request: Pipe): 

320 pass 

321 

322 @abc.abstractmethod 

323 async def recognize_remote(self, message): 

324 """Return True if the remote of this message is currently expected to 

325 be usable with this transport.""" 

326 

327 @abc.abstractmethod 

328 async def determine_remote(self, message): 

329 """Return a remote if the transport expects to be able to deliver the 

330 request based on its URI components.""" 

331 

332 

333class RequestProvider(metaclass=abc.ABCMeta): 

334 """ 

335 .. automethod:: request 

336 .. (which we have to list here manually because the private override in the 

337 method is needed for the repeated signature in Context) 

338 """ 

339 

340 @abc.abstractmethod 

341 def request(self, request_message, handle_blockwise=True): 

342 """Create and act on a :class:`Request` object that will be handled 

343 according to the provider's implementation. 

344 

345 Note that the request is not necessarily sent on the wire immediately; 

346 it may (but, depend on the transport does not necessarily) rely on the 

347 response to be waited for. 

348 

349 If handle_blockwise is True (the default), the request provider will 

350 split the request and/or collect the response parts automatically. The 

351 block size indicated by the remote is used, and can be decreased by 

352 setting the message's :attr:`.remote.maximum_block_size_exp 

353 <aiocoap.interfaces.EndpointAddress.maximum_block_size_exp>` property. 

354 Note that by being a property of the remote, this may affect other 

355 block-wise operations on the same remote -- this should be desirable 

356 behavior. 

357 

358 :meta private: 

359 (not actually private, just hiding from automodule due to being 

360 grouped with the important functions) 

361 """ 

362 

363 

364class Request(metaclass=abc.ABCMeta): 

365 """A CoAP request, initiated by sending a message. Typically, this is not 

366 instantiated directly, but generated by a :meth:`RequestProvider.request` 

367 method.""" 

368 

369 response = """A future that is present from the creation of the object and \ 

370 fulfilled with the response message. 

371 

372 When legitimate errors occur, this becomes an aiocoap.Error. (Eg. on 

373 any kind of network failure, encryption trouble, or protocol 

374 violations). Any other kind of exception raised from this is a bug in 

375 aiocoap, and should better stop the whole application. 

376 """ 

377 

378 

379class Resource(metaclass=abc.ABCMeta): 

380 """Interface that is expected by a :class:`.protocol.Context` to be present 

381 on the serversite, which renders all requests to that context.""" 

382 

383 def __init__(self): 

384 super().__init__() 

385 

386 # FIXME: These keep addresses alive, and thus possibly transports. 

387 # Going through the shutdown dance per resource seems extraneous. 

388 # Options are to accept addresses staying around (making sure they 

389 # don't keep their transports alive, if that's a good idea), to hash 

390 # them, or to make them weak. 

391 

392 from .blockwise import Block1Spool, Block2Cache 

393 

394 self._block1 = Block1Spool() 

395 self._block2 = Block2Cache() 

396 

397 @abc.abstractmethod 

398 async def render(self, request): 

399 """Return a message that can be sent back to the requester. 

400 

401 This does not need to set any low-level message options like remote, 

402 token or message type; it does however need to set a response code. 

403 

404 A response returned may carry a no_response option (which is actually 

405 specified to apply to requests only); the underlying transports will 

406 decide based on that and its code whether to actually transmit the 

407 response.""" 

408 

409 @abc.abstractmethod 

410 async def needs_blockwise_assembly(self, request): 

411 """Indicator whether aiocoap should assemble request blocks to a single 

412 request and extract the requested blocks from a complete-resource 

413 answer (True), or whether the resource will do that by itself 

414 (False).""" 

415 

416 async def _render_to_pipe(self, pipe: Pipe) -> None: 

417 if not hasattr(self, "_block1"): 

418 warnings.warn( 

419 "No attribute _block1 found on instance of " 

420 f"{type(self).__name__}, make sure its __init__ code " 

421 "properly calls super()!", 

422 DeprecationWarning, 

423 ) 

424 

425 from .blockwise import Block1Spool, Block2Cache 

426 

427 self._block1 = Block1Spool() 

428 self._block2 = Block2Cache() 

429 

430 req = pipe.request 

431 

432 if await self.needs_blockwise_assembly(req): 

433 req = self._block1.feed_and_take(req) 

434 

435 # Note that unless the lambda gets called, we're not fully 

436 # accessing req any more -- we're just looking at its block2 

437 # option, and the blockwise key extracted earlier. 

438 res = await self._block2.extract_or_insert(req, lambda: self.render(req)) 

439 

440 res.opt.block1 = req.opt.block1 

441 else: 

442 res = await self.render(req) 

443 

444 pipe.add_response(res, is_last=True) 

445 

446 async def render_to_pipe(self, pipe: Pipe) -> None: 

447 """Create any number of responses (as indicated by the request) into 

448 the request stream. 

449 

450 This method is provided by the base Resource classes; if it is 

451 overridden, then :meth:`~interfaces.Resource.render`, :meth:`needs_blockwise_assembly` and 

452 :meth:`~.interfaces.ObservableResource.add_observation` are not used any more. 

453 (They still need to be implemented to comply with the interface 

454 definition, which is yet to be updated).""" 

455 warnings.warn( 

456 "Request interface is changing: Resources should " 

457 "implement render_to_pipe or inherit from " 

458 "resource.Resource which implements that based on any " 

459 "provided render methods", 

460 DeprecationWarning, 

461 ) 

462 if isinstance(self, ObservableResource): 

463 # While the above deprecation is used, a resource previously 

464 # inheriting from (X, ObservableResource) with X inheriting from 

465 # Resource might find itself using this method. When migrating over 

466 # to inheriting from resource.Resource, this error will become 

467 # apparent and this can die with the rest of this workaround. 

468 return await ObservableResource._render_to_pipe(self, pipe) 

469 await self._render_to_pipe(pipe) 

470 

471 

472class ObservableResource(Resource, metaclass=abc.ABCMeta): 

473 """Interface the :class:`.protocol.ServerObservation` uses to negotiate 

474 whether an observation can be established based on a request. 

475 

476 This adds only functionality for registering and unregistering observations; 

477 the notification contents will be retrieved from the resource using the 

478 regular :meth:`~.Resource.render` method from crafted (fake) requests. 

479 """ 

480 

481 @abc.abstractmethod 

482 async def add_observation(self, request, serverobservation): 

483 """Before the incoming request is sent to :meth:`~.Resource.render`, the 

484 :meth:`.add_observation` method is called. If the resource chooses to 

485 accept the observation, it has to call the 

486 `serverobservation.accept(cb)` with a callback that will be called when 

487 the observation ends. After accepting, the ObservableResource should 

488 call `serverobservation.trigger()` whenever it changes its state; the 

489 ServerObservation will then initiate notifications by having the 

490 request rendered again.""" 

491 

492 async def _render_to_pipe(self, pipe: Pipe) -> None: 

493 from .protocol import ServerObservation 

494 

495 # If block2:>0 comes along, we'd just ignore the observe 

496 if pipe.request.opt.observe != 0: 

497 return await Resource._render_to_pipe(self, pipe) 

498 

499 # If block1 happens here, we can probably just not support it for the 

500 # time being. (Given that block1 + observe is untested and thus does 

501 # not work so far anyway). 

502 

503 servobs = ServerObservation() 

504 await self.add_observation(pipe.request, servobs) 

505 

506 try: 

507 first_response = await self.render(pipe.request) 

508 

509 if ( 

510 not servobs._accepted 

511 or servobs._early_deregister 

512 or not first_response.code.is_successful() 

513 ): 

514 pipe.add_response(first_response, is_last=True) 

515 return 

516 

517 # FIXME: observation numbers should actually not be per 

518 # asyncio.task, but per (remote, token). if a client renews an 

519 # observation (possibly with a new ETag or whatever is deemed 

520 # legal), the new observation events should still carry larger 

521 # numbers. (if they did not, the client might be tempted to discard 

522 # them). 

523 first_response.opt.observe = next_observation_number = 0 

524 # If block2 were to happen here, we'd store the full response 

525 # here, and pick out block2:0. 

526 pipe.add_response(first_response, is_last=False) 

527 

528 while True: 

529 await servobs._trigger 

530 # if you wonder why the lines around this are not just `response = 

531 # await servobs._trigger`, have a look at the 'double' tests in 

532 # test_observe.py: A later triggering could have replaced 

533 # servobs._trigger in the meantime. 

534 response = servobs._trigger.result() 

535 servobs._trigger = asyncio.get_running_loop().create_future() 

536 

537 if response is None: 

538 response = await self.render(pipe.request) 

539 

540 # If block2 were to happen here, we'd store the full response 

541 # here, and pick out block2:0. 

542 

543 is_last = servobs._late_deregister or not response.code.is_successful() 

544 if not is_last: 

545 next_observation_number += 1 

546 response.opt.observe = next_observation_number 

547 

548 pipe.add_response(response, is_last=is_last) 

549 

550 if is_last: 

551 return 

552 finally: 

553 servobs._cancellation_callback() 

554 

555 async def render_to_pipe(self, request: Pipe) -> None: 

556 warnings.warn( 

557 "Request interface is changing: Resources should " 

558 "implement render_to_pipe or inherit from " 

559 "resource.Resource which implements that based on any " 

560 "provided render methods", 

561 DeprecationWarning, 

562 ) 

563 await self._render_to_pipe(request)