Coverage for aiocoap / transports / oscore.py: 94%

134 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-29 12:32 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5# WORK IN PROGRESS: TransportEndpoint has been renamed to MessageInterface 

6# here, but actually we'll be providing a RequestInterface -- that's one of the 

7# reasons why RequestInterface, TokenInterface and MessageInterface were split 

8# in the first place. 

9 

10"""This module implements a RequestInterface for OSCORE. As such, it takes 

11routing ownership of requests that it has a security context available for, and 

12sends off the protected messages via another transport. 

13 

14This transport is a bit different from the others because it doesn't have its 

15dedicated URI scheme, but purely relies on preconfigured contexts. 

16 

17So far, this transport only deals with outgoing requests, and does not help in 

18building an OSCORE server. (Some code that could be used here in future resides 

19in `contrib/oscore-plugtest/plugtest-server` as the `ProtectedSite` class.)

20 

21In outgoing requests, this transport automatically handles Echo options that

22appear to come from RFC8613 Appendix B.1.2 style servers. They indicate that 

23the server could not process the request initially, but could do so if the 

24client retransmits it with an appropriate Echo value. 

25 

26Unlike other transports that could (at least in theory) be present multiple 

27times in :attr:`aiocoap.protocol.Context.request_interfaces` (eg. because there 

28are several bound sockets), this is only useful once in there, as it has no own 

29state, picks the OSCORE security context from the CoAP 

30:attr:`aiocoap.protocol.Context.client_credentials` when populating the remote 

31field, and handles any populated request based on its remote.security_context

32property alone. 

33""" 

34 

35from collections import namedtuple 

36from functools import wraps 

37 

38from .. import interfaces, credentials, edhoc, oscore 

39from ..numbers import UNAUTHORIZED, MAX_REGULAR_BLOCK_SIZE_EXP 

40 

41 

def _requires_ua(f):
    """Guard a one-argument method so it only runs with an underlying address.

    The returned callable takes ``self`` alone. It forwards to ``f(self)``
    when ``self.underlying_address`` is set.

    :raises ValueError: if ``self.underlying_address`` is ``None``.
    """

    @wraps(f)
    def guarded(self):
        if self.underlying_address is not None:
            return f(self)
        raise ValueError(
            "No underlying address populated that could be used to derive a hostinfo"
        )

    return guarded

52 

53 

class OSCOREAddress(
    namedtuple("_OSCOREAddress", ["security_context", "underlying_address"]),
    interfaces.EndpointAddress,
):
    """Remote address type for :class:`TransportOSCORE`.

    A two-field tuple of the security context and the address of the
    transport underneath. The address-like properties (``hostinfo``,
    ``uri_base``, ``scheme`` and their ``_local`` variants) are read from
    ``underlying_address`` and raise :class:`ValueError` when that field is
    ``None``.
    """

    def __repr__(self):
        return (
            f"<{type(self).__name__} in context {self.security_context!r}"
            f" to {self.underlying_address!r}>"
        )

    # --- properties forwarded to the underlying address ---

    @property
    @_requires_ua
    def hostinfo(self):
        """The ``hostinfo`` of the underlying address."""
        return self.underlying_address.hostinfo

    @property
    @_requires_ua
    def hostinfo_local(self):
        """The ``hostinfo_local`` of the underlying address."""
        return self.underlying_address.hostinfo_local

    @property
    @_requires_ua
    def uri_base(self):
        """The ``uri_base`` of the underlying address."""
        return self.underlying_address.uri_base

    @property
    @_requires_ua
    def uri_base_local(self):
        """The ``uri_base_local`` of the underlying address."""
        return self.underlying_address.uri_base_local

    @property
    @_requires_ua
    def scheme(self):
        """The ``scheme`` of the underlying address."""
        return self.underlying_address.scheme

    # --- properties taken from the security context ---

    @property
    def authenticated_claims(self):
        """The ``authenticated_claims`` of the security context."""
        return self.security_context.authenticated_claims

    is_multicast = False
    is_multicast_locally = False

    maximum_payload_size = 1024
    maximum_block_size_exp = MAX_REGULAR_BLOCK_SIZE_EXP

    @property
    def blockwise_key(self):
        """Pair of the underlying address's blockwise key and a recipient key.

        The second element is the security context's ``recipient_key``,
        unless the context has a ``groupcontext`` attribute; then it is the
        group context's recipient key for this context's recipient ID.
        """
        context = self.security_context
        if not hasattr(context, "groupcontext"):
            detail = context.recipient_key
        else:
            # The context is an aspect of a group context. All aspects work
            # compatibly as long as data comes from the same recipient ID, so
            # the group's recipient key for that ID is used: it stays stable
            # across switches between pairwise and group mode.
            detail = context.groupcontext.recipient_keys[context.recipient_id]
        return (self.underlying_address.blockwise_key, detail)

115 

116 

class TransportOSCORE(interfaces.RequestInterface):
    """Request interface that protects outgoing requests with OSCORE.

    Requests whose remote is an :class:`OSCOREAddress` are protected with
    that address's security context, sent out through the forwarding context
    given at construction, and their responses are unprotected again before
    being added to the original request.
    """

    def __init__(self, context, forward_context):
        """Set up the transport.

        :param context: Context whose ``client_credentials``, ``loop`` and
            ``log`` are used by this transport.
        :param forward_context: Context whose ``request`` method is used to
            send the protected messages.
        :raises ValueError: if the two contexts do not share one asyncio
            loop.
        """
        self._context = context
        self._wire = forward_context

        if self._context.loop is not self._wire.loop:
            # TransportOSCORE is not designed to bridge loops -- would probably
            # be possible, but incur confusion that is most likely well avoidable
            raise ValueError("Wire and context need to share an asyncio loop")

        self.loop = self._context.loop
        self.log = self._context.log

        # Keep current requests. This is not needed for shutdown purposes (see
        # .shutdown), but because Python 3.6.4 (but not 3.6.5, and not at least
        # some 3.5) would otherwise cancel OSCORE tasks mid-observation. This
        # manifested itself as <https://github.com/chrysn/aiocoap/issues/111>.
        self._tasks = set()

    #
    # implement RequestInterface
    #

    async def recognize_remote(self, message):
        """Return whether ``message.remote`` is an :class:`OSCOREAddress`."""
        # There is just one of those
        return isinstance(message.remote, OSCOREAddress)

    async def determine_remote(self, message):
        """Pick an :class:`OSCOREAddress` for ``message`` if credentials fit.

        Returns ``None`` when the message already carries an OSCORE option,
        when it is addressed to ``/.well-known/edhoc``, when no credentials
        are found for it, or when the credentials found are neither OSCORE
        capable nor EDHOC credentials. Otherwise returns an address that
        combines the credentials with the message's current remote.
        """
        if message.opt.oscore is not None:
            # double oscore is not specified; using this fact to make `._wire
            # is ._context` an option
            return None
        if message.opt.uri_path == (".well-known", "edhoc"):
            # FIXME better criteria based on next-hop?
            return None

        try:
            secctx = self._context.client_credentials.credentials_from_request(message)
        except credentials.CredentialsMissingError:
            return None

        # FIXME: it'd be better to have a "get me credentials *of this type* if they exist"
        if isinstance(secctx, (oscore.CanProtect, edhoc.EdhocCredentials)):
            self.log.debug(
                "Selecting OSCORE transport based on context %r for new request %r",
                secctx,
                message,
            )
            return OSCOREAddress(secctx, message.remote)
        else:
            return None

    def request(self, request):
        """Start processing ``request`` in a task on this transport's loop.

        The task is kept in ``self._tasks`` until it is done. An exception
        raised by the task is passed to ``request.add_exception``.
        """
        t = self.loop.create_task(
            self._request(request),
            name="OSCORE request %r" % request,
        )
        self._tasks.add(t)

        def done(t, _tasks=self._tasks, _request=request):
            _tasks.remove(t)
            if t.cancelled():
                # Task.result() raises CancelledError on a cancelled task.
                # That is a BaseException, so the handler below would not
                # catch it and it would surface as an unhandled error in the
                # event loop's callback machinery. As before, the request is
                # not notified in this case.
                return
            try:
                t.result()
            except Exception as e:
                _request.add_exception(e)

        t.add_done_callback(done)

    async def _request(self, request) -> None:
        """Process a request including any pre-flights or retries

        Retries by this coroutine are limited to actionable authenticated
        errors, i.e. those where it is ensured that even though the request is
        encrypted twice, it is still only processed once.

        Responses are passed to ``request.add_response``; on failure this
        coroutine raises and relies on its done callback to propagate the
        error.
        """
        msg = request.request

        secctx = msg.remote.security_context

        if isinstance(secctx, edhoc.EdhocCredentials):
            if secctx._established_context is None:
                # NOTE(review): the awaitable is stored on this transport,
                # while the check above reads ``secctx._established_context``.
                # Whether establish_context() populates that attribute is
                # not visible from this module -- confirm that requests using
                # different EDHOC credentials cannot pick up each other's
                # context here.
                self._established_context = (
                    msg.remote.security_context.establish_context(
                        wire=self._wire,
                        underlying_address=msg.remote.underlying_address,
                        underlying_proxy_scheme=msg.opt.proxy_scheme,
                        underlying_uri_host=msg.opt.uri_host,
                        logger=self.log.getChild("edhoc"),
                    )
                )
            # FIXME: Who should drive retries here? We probably don't retry if
            # it fails immediately, but what happens with the request that
            # finds this broken, will it recurse?
            secctx = await self._established_context

        def protect(echo):
            # Protects the message (with its Echo option replaced if `echo`
            # is given) using whatever `secctx` is bound to at call time, and
            # sends it over the wire.
            if echo is None:
                msg_to_protect = msg
            else:
                if msg.opt.echo:
                    self.log.warning(
                        "Overwriting the requested Echo value with the one to answer a 4.01 Unauthorized"
                    )
                msg_to_protect = msg.copy(echo=echo)
            protected, original_request_seqno = secctx.protect(msg_to_protect)
            protected.remote = msg.remote.underlying_address

            wire_request = self._wire.request(protected)

            return (wire_request, original_request_seqno)

        wire_request, original_request_seqno = protect(None)

        # tempting as it would be, we can't access the request as a
        # Pipe here, because it is a BlockwiseRequest to handle
        # outer blockwise.
        # (Might be a good idea to model those after Pipe too,
        # though).

        def _check(more, unprotected_response):
            # An observation is only continued if the unprotected response
            # code is successful.
            if more and not unprotected_response.code.is_successful():
                self.log.warning(
                    "OSCORE protected message contained observe, but unprotected code is unsuccessful. Ignoring the observation."
                )
                return False
            return more

        try:
            protected_response = await wire_request.response

            # Offer secctx to switch over for reception based on the header
            # data (similar to how the server address switches over when
            # receiving a response to a request sent over multicast)
            unprotected = oscore.verify_start(protected_response)
            secctx = secctx.context_from_response(unprotected)

            unprotected_response, _ = secctx.unprotect(
                protected_response, original_request_seqno
            )

            if (
                unprotected_response.code == UNAUTHORIZED
                and unprotected_response.opt.echo is not None
            ):
                # Assist the server in B.1.2 Echo receive window recovery
                self.log.info(
                    "Answering the server's 4.01 Unauthorized / Echo as part of OSCORE B.1.2 recovery"
                )

                # From here on, wire_request refers to the retried request.
                wire_request, original_request_seqno = protect(
                    unprotected_response.opt.echo
                )

                protected_response = await wire_request.response
                unprotected_response, _ = secctx.unprotect(
                    protected_response, original_request_seqno
                )

            unprotected_response.remote = OSCOREAddress(
                secctx, protected_response.remote
            )
            self.log.debug(
                "Successfully unprotected %r into %r",
                protected_response,
                unprotected_response,
            )
            # FIXME: if i could tap into the underlying Pipe, that'd
            # be a lot easier -- and also get rid of the awkward _check
            # code moved into its own function just to avoid duplication.
            more = protected_response.opt.observe is not None
            more = _check(more, unprotected_response)
            request.add_response(unprotected_response, is_last=not more)

            if not more:
                return

            async for protected_response in wire_request.observation:
                unprotected_response, _ = secctx.unprotect(
                    protected_response, original_request_seqno
                )

                more = protected_response.opt.observe is not None
                more = _check(more, unprotected_response)

                unprotected_response.remote = OSCOREAddress(
                    secctx, protected_response.remote
                )
                self.log.debug(
                    "Successfully unprotected %r into %r",
                    protected_response,
                    unprotected_response,
                )
                # FIXME: discover is_last from the underlying response
                request.add_response(unprotected_response, is_last=not more)

                if not more:
                    return
            # The wire observation ended without any response having been
            # marked as the last one.
            request.add_exception(
                NotImplementedError(
                    "End of observation"
                    " should have been indicated in is_last, see above lines"
                )
            )
        finally:
            # FIXME: no way to cancel observations exists yet, let alone
            # one that can be used in a finally clause (ie. won't raise
            # something else if the observation terminated server-side)
            pass
            # if wire_request.observation is not None:
            #     wire_request.observation.cancel()

    async def shutdown(self):
        """Shut down the transport; currently does nothing."""
        # Nothing to do here yet; the individual requests will be shut down by
        # their underlying transports
        pass

337 pass