Coverage for aiocoap / proxy / server.py: 58%

252 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-29 12:32 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5"""Basic implementation of CoAP-CoAP proxying 

6 

7This is work in progress and not yet part of the API.""" 

8 

9import asyncio 

10import functools 

11import ipaddress 

12import logging 

13import warnings 

14 

15from .. import numbers, interfaces, message, error, util, resource 

16from ..numbers import codes 

17from ..blockwise import Block1Spool, Block2Cache 

18from ..pipe import Pipe 

19from ..util import DeprecationWarning 

20 

21 

class CanNotRedirect(error.ConstructionRenderableError):
    """Base class of all errors raised while working out where to proxy to.

    Being a ``ConstructionRenderableError``, instances can be converted into
    a response through ``to_message()`` (this is what ``Proxy.render`` does).
    """

    message = "Proxy redirection failed"

24 

25 

class NoUriSplitting(CanNotRedirect):
    """Redirection error with code NOT_IMPLEMENTED.

    Raised by ``ForwardProxy.apply_redirection`` when a request carries the
    Proxy-Uri option.
    """

    message = "URI splitting not implemented, please use Proxy-Scheme."
    code = codes.NOT_IMPLEMENTED

29 

30 

class IncompleteProxyUri(CanNotRedirect):
    """Redirection error with code BAD_REQUEST.

    Raised when a request does not carry enough information to decide
    where it should be forwarded to.
    """

    message = "Proxying requires Proxy-Scheme and Uri-Host"
    code = codes.BAD_REQUEST

34 

35 

class NotAForwardProxy(CanNotRedirect):
    """Redirection error with code PROXYING_NOT_SUPPORTED."""

    message = "This is a reverse proxy, not a forward one."
    code = codes.PROXYING_NOT_SUPPORTED

39 

40 

class NoSuchHostname(CanNotRedirect):
    """Redirection error with code NOT_FOUND and an empty message."""

    message = ""
    code = codes.NOT_FOUND

44 

45 

class CanNotRedirectBecauseOfUnsafeOptions(CanNotRedirect):
    """Redirection error (BAD_OPTION) listing the offending option numbers.

    :param options: iterable of option objects; the ``number`` attribute of
        each one is included in the error message.
    """

    code = codes.BAD_OPTION

    def __init__(self, options):
        message = "Unsafe options in request: %s" % (
            ", ".join(str(o.number) for o in options)
        )
        # Initialize the exception machinery as well, so that the text also
        # shows up in str(exc), exc.args, logs and tracebacks.
        super().__init__(message)
        # Set explicitly so the rendered message does not depend on what the
        # base class initializer does with its argument.
        self.message = message

53 

54 

def raise_unless_safe(request, known_options):
    """Reject a message that carries unsafe options nobody accounted for.

    :param request: message whose options are inspected (this module passes
        both requests and responses).
    :param known_options: iterable of option numbers the caller handles on
        its own, in addition to the ones listed below.
    :raises CanNotRedirectBecauseOfUnsafeOptions: (a BAD_OPTION
        CanNotRedirect) if any option is unsafe to forward and not known.
    """

    option_numbers = numbers.OptionNumber
    acceptable = {
        # it is expected that every proxy is aware of these options even
        # though one of them often doesn't need touching
        option_numbers.URI_HOST,
        option_numbers.URI_PORT,
        option_numbers.URI_PATH,
        option_numbers.URI_QUERY,
        # handled by the Context
        option_numbers.BLOCK1,
        option_numbers.BLOCK2,
        # handled by the proxy resource
        option_numbers.OBSERVE,
    }
    acceptable.update(known_options)

    offending = []
    for option in request.opt.option_list():
        if not option.number.is_unsafe():
            continue
        if option.number in acceptable:
            continue
        offending.append(option)

    if offending:
        raise CanNotRedirectBecauseOfUnsafeOptions(offending)

82 

83 

class Proxy(interfaces.Resource):
    """Resource that relays requests through ``outgoing_context``.

    Where a request is sent is decided by the redirectors registered with
    :meth:`add_redirector`: they are asked in registration order, and the
    first one that returns something other than None determines the
    outgoing request.

    :param outgoing_context: object whose ``request()`` method is used to
        send the redirected request.
    :param logger: logger to use; defaults to the logger named ``"proxy"``.
    """

    # other than in special cases, we're trying to be transparent wrt
    # blockwise transfers
    interpret_block_options = False

    def __init__(self, outgoing_context, logger=None):
        super().__init__()

        self.outgoing_context = outgoing_context
        self.log = logger or logging.getLogger("proxy")
        self._redirectors = []

        # Provide variables for render_to_pipe
        # FIXME this is copied from aiocoap.resource's __init__ -- but on the
        # long run proxying shouldn't rely on that anyway but implement
        # render_to_pipe right on its own
        self._block1 = Block1Spool()
        self._block2 = Block2Cache()

    def add_redirector(self, redirector):
        """Append ``redirector`` to the list consulted by apply_redirection."""
        self._redirectors.append(redirector)

    def apply_redirection(self, request):
        """Return the first non-None result any redirector produces for
        ``request``, or None if no redirector produces one."""
        for redirector in self._redirectors:
            outcome = redirector.apply_redirection(request)
            if outcome is None:
                continue
            return outcome
        return None

    async def needs_blockwise_assembly(self, request):
        """Report whether block options are interpreted by this resource."""
        return self.interpret_block_options

    async def render(self, request):
        """Forward ``request`` as decided by the redirectors and return the
        response.

        A CanNotRedirect raised during redirection is turned into its error
        message; a timeout of the outgoing request becomes GATEWAY_TIMEOUT.

        :raises IncompleteProxyUri: if no redirector applies and the parent
            class's render produces no response either.
        :raises CanNotRedirectBecauseOfUnsafeOptions: if the response carries
            unsafe options outside the always-known set.
        """
        # FIXME i'd rather let the application do with the message whatever it
        # wants. everything the responder needs of the request should be
        # extracted beforehand.
        request = request.copy(mid=None, token=None)

        try:
            outbound = self.apply_redirection(request)
        except CanNotRedirect as failure:
            return failure.to_message()

        if outbound is None:
            # No redirector felt responsible; leave it to the parent class.
            fallback = await super().render(request)
            if fallback is None:
                raise IncompleteProxyUri("No matching proxy rule")
            return fallback

        outbound.direction = message.Direction.OUTGOING

        try:
            pending = self.outgoing_context.request(
                outbound, handle_blockwise=self.interpret_block_options
            )
            response = await pending.response
        except error.TimeoutError:
            return message.Message(code=numbers.codes.GATEWAY_TIMEOUT)

        raise_unless_safe(response, ())

        # Reset the per-exchange fields before handing the message back as
        # our own response.
        for field in ("mtype", "mid", "remote"):
            setattr(response, field, None)
        response.direction = message.Direction.OUTGOING
        response.token = None

        return response

    # Not inheriting from them because we do *not* want the .render() in the
    # resolution tree (it can't deal with None requests, which are used among
    # proxy implementations)
    async def render_to_pipe(self, pipe: Pipe) -> None:
        """Expand the request on ``pipe`` and delegate to
        ``resource.Resource.render_to_pipe``."""
        # I'd rather not expand unconditionally, but if we don't do this here,
        # we're too deep into render() etc to get out again. This is working on
        # the assumption that all proxies are really at root.
        #
        # Workaround-For: https://github.com/chrysn/aiocoap/issues/414
        try:
            resource._expand_upa(pipe.request)
        except error.BadOption:
            # deliberately ignored; rendering goes on with the request as is
            pass
        await resource.Resource.render_to_pipe(self, pipe)  # type: ignore

166 

167 

class ProxyWithPooledObservations(Proxy, interfaces.ObservableResource):
    """Proxy that shares a single outgoing observation among all clients
    observing the same (redirected) request.

    Outgoing observation requests are kept in ``_outgoing_observations``,
    keyed by :meth:`_cache_key`. Each of them is "augmented" with the
    attributes ``__users``, ``__cachekey`` and ``__latest_response``. Being
    written with two leading underscores inside this class body, those
    names are mangled by Python, so they must only be accessed from code
    inside this class.
    """

    def __init__(self, outgoing_context, logger=None):
        super().__init__(outgoing_context, logger)

        # cache key -> augmented outgoing request (see _get_observation_for)
        self._outgoing_observations = {}

    @staticmethod
    def _cache_key(request):
        """Return the key under which observations of ``request`` are pooled.

        The Observe option number is handed to ``get_cache_key``
        (presumably so that it is left out of the key -- confirm against
        the Message API).
        """
        return request.get_cache_key([numbers.optionnumbers.OptionNumber.OBSERVE])

    def _peek_observation_for(self, request):
        """Return the augmented request (see _get_observation_for) towards a
        resource, or raise KeyError"""
        cachekey = self._cache_key(request)

        return self._outgoing_observations[cachekey]

    def _get_observation_for(self, request):
        """Return an existing augmented request towards a resource or create one.

        An augmented request is an observation request that has some additional
        properties (__users, __cachekey, __latest_response), which are used in
        ProxyWithPooledObservations to immediately serve responses from
        observed resources, and to tear the observations down again.

        :raises CanNotRedirect: if redirection fails, or (as
            IncompleteProxyUri) if no redirector applies to the request.
        """

        # see Proxy.render
        request = request.copy(mid=None, remote=None, token=None)
        request = self.apply_redirection(request)
        if request is None:
            # No redirector applies. Without this check, _cache_key(None)
            # would fail with an AttributeError that add_observation does
            # not handle; raising a CanNotRedirect makes it decline the
            # observation instead.
            raise IncompleteProxyUri("No matching proxy rule")

        cachekey = self._cache_key(request)

        try:
            obs = self._outgoing_observations[cachekey]
        except KeyError:
            obs = self._outgoing_observations[cachekey] = self.outgoing_context.request(
                request
            )
            obs.__users = set()
            obs.__cachekey = cachekey
            # this becomes a cached response right after the .response comes
            # in (so only use this after waiting for it), and gets updated
            # when new responses arrive.
            obs.__latest_response = None

            def when_first_request_done(result, obs=obs):
                obs.__latest_response = result.result()

            obs.response.add_done_callback(when_first_request_done)

            def cb(incoming_message, obs=obs):
                self.log.info(
                    "Received incoming message %r, relaying it to %d clients",
                    incoming_message,
                    len(obs.__users),
                )
                obs.__latest_response = incoming_message
                # iterate over a snapshot, as triggering may change the set
                for observationserver in set(obs.__users):
                    observationserver.trigger(incoming_message.copy())

            obs.observation.register_callback(cb)

            def eb(exception, obs=obs):
                if obs.__users:
                    code = numbers.codes.INTERNAL_SERVER_ERROR
                    payload = b""
                    if isinstance(exception, error.RenderableError):
                        code = exception.code
                        payload = exception.message.encode("ascii")
                    self.log.debug(
                        "Received error %r, which did not lead to unregistration of the clients. Actively deregistering them with %s %r.",
                        exception,
                        code,
                        payload,
                    )
                    # iterate over a snapshot, as triggering may change the set
                    for u in list(obs.__users):
                        u.trigger(message.Message(code=code, payload=payload))
                    if obs.__users:
                        self.log.error(
                            "Observations survived sending them an error message."
                        )
                else:
                    self.log.debug(
                        "Received error %r, but that seems to have been passed on cleanly to the observers as they are gone by now.",
                        exception,
                    )

            obs.observation.register_errback(eb)

        return obs

    def _add_observation_user(self, clientobservationrequest, serverobservation):
        """Register ``serverobservation`` as a user of the pooled request."""
        clientobservationrequest.__users.add(serverobservation)

    def _remove_observation_user(self, clientobservationrequest, serverobservation):
        """Unregister ``serverobservation`` and schedule a check for whether
        the pooled request is still needed."""
        clientobservationrequest.__users.remove(serverobservation)
        # give the request that just cancelled time to be dealt with before
        # dropping the __latest_response
        asyncio.get_running_loop().call_soon(
            self._consider_dropping, clientobservationrequest
        )

    def _consider_dropping(self, clientobservationrequest):
        """Tear down the pooled request if no user is left."""
        if clientobservationrequest.__users:
            return

        self.log.debug(
            "Last client of observation went away, deregistering with server."
        )
        cachekey = clientobservationrequest.__cachekey
        # This runs once per removed user, so it may be called several times
        # after the last user left, and a fresh observation may have been
        # pooled under the same key in the meantime. Only remove the entry
        # if it is still this very request.
        if self._outgoing_observations.get(cachekey) is clientobservationrequest:
            del self._outgoing_observations[cachekey]
        if not clientobservationrequest.observation.cancelled:
            clientobservationrequest.observation.cancel()

    async def add_observation(self, request, serverobservation):
        """As Proxy is intended to be just the proxy's interface
        toward the Context, accepting observations is handled here, where the
        observations handling can be defined by the subclasses."""

        try:
            clientobservationrequest = self._get_observation_for(request)
        except CanNotRedirect:
            # just don't accept the observation, the rest will be taken care
            # of at rendering
            pass
        else:
            self._add_observation_user(clientobservationrequest, serverobservation)
            serverobservation.accept(
                functools.partial(
                    self._remove_observation_user,
                    clientobservationrequest,
                    serverobservation,
                )
            )

    async def render(self, request):
        """Serve ``request`` from a pooled observation if there is one, and
        through the regular proxying of the parent class otherwise.

        A request with the Observe option for which no pooled observation
        exists is answered with BAD_OPTION.
        """
        # FIXME this is evaluated twice in the implementation (once here, but
        # unless it's an observation what matters is inside the super call),
        # maybe this needs to hook in differently than by subclassing and
        # calling super.
        self.log.info("render called")

        # Each try block only spans the call whose failure it handles, so
        # that errors raised by the parent's render are not mistaken for a
        # failed redirection or a missing observation.
        try:
            redirected_request = self.apply_redirection(request.copy())
        except CanNotRedirect:
            self.log.debug(
                "Request is not an observation or can't be proxied, passing it on to regular proxying mechanisms."
            )
            return await super().render(request)

        if redirected_request is None:
            return await super().render(request)

        try:
            clientobservationrequest = self._peek_observation_for(redirected_request)
        except KeyError:
            if request.opt.observe is not None:
                self.log.warning(
                    "No matching observation found: request is %r (cache key %r), outgoing observations %r",
                    redirected_request,
                    self._cache_key(redirected_request),
                    self._outgoing_observations,
                )

                return message.Message(
                    code=numbers.codes.BAD_OPTION,
                    payload="Observe option can not be proxied without active observation.".encode(
                        "utf8"
                    ),
                )
            self.log.debug(
                "Request is not an observation or can't be proxied, passing it on to regular proxying mechanisms."
            )
            return await super().render(request)

        self.log.info(
            "Serving request using latest cached response of %r",
            clientobservationrequest,
        )
        await clientobservationrequest.response
        # Hand out a copy: the cached message is shared by every request
        # served from this observation, and the fields reset here belong to
        # a single exchange.
        cached_response = clientobservationrequest.__latest_response.copy()
        cached_response.mid = None
        cached_response.token = None
        cached_response.remote = None
        cached_response.mtype = None
        return cached_response

338 

339 

class ForwardProxy(Proxy):
    """Proxy that picks the destination from the Proxy-Scheme, Uri-Host and
    Uri-Port options of the request."""

    def apply_redirection(self, request):
        """Return a copy of ``request`` addressed to the server named in its
        proxy options.

        Requests without Proxy-Scheme are left to the registered redirectors.
        For all others the remote is built from Proxy-Scheme, Uri-Host and
        Uri-Port, and the Proxy-Scheme and Uri-Port options are removed;
        Uri-Host is removed as well when it is an IP literal. The registered
        redirectors are then given a chance to override the result.

        :raises NoUriSplitting: if the request carries Proxy-Uri.
        :raises IncompleteProxyUri: if Proxy-Scheme is given without Uri-Host.
        :raises CanNotRedirectBecauseOfUnsafeOptions: if the request carries
            unsafe options other than the known ones.
        """
        request = request.copy()
        if request.opt.proxy_uri is not None:
            raise NoUriSplitting
        if request.opt.proxy_scheme is None:
            return super().apply_redirection(request)
        if request.opt.uri_host is None:
            raise IncompleteProxyUri

        raise_unless_safe(
            request,
            (
                numbers.OptionNumber.PROXY_SCHEME,
                numbers.OptionNumber.URI_HOST,
                numbers.OptionNumber.URI_PORT,
            ),
        )

        request.remote = message.UndecidedRemote(
            request.opt.proxy_scheme,
            util.hostportjoin(request.opt.uri_host, request.opt.uri_port),
        )
        request.opt.proxy_scheme = None
        request.opt.uri_port = None
        forward_host = request.opt.uri_host

        # I'd prefer to not do if-by-try, but the ipaddress doesn't seem to
        # offer any other choice
        try:
            literal = ipaddress.ip_address(forward_host)
        except ValueError:
            literal = None

        if literal is not None:
            # The deprecation is about IPv6 literals only; IPv4 literals never
            # take square brackets, so warning about them would be misleading.
            if literal.version == 6:
                warnings.warn(
                    "URI-Host looks like IPv6 but has no square "
                    "brackets. This is deprecated, see "
                    "https://github.com/chrysn/aiocoap/issues/216",
                    DeprecationWarning,
                )
            request.opt.uri_host = None
        if forward_host.startswith("["):
            # IPv6 or future literals are not recognized by ipaddress which
            # does not look at host-encoded form
            request.opt.uri_host = None

        # Maybe the URI-Host matches a known forwarding -- in that case, catch that.
        redirected = super().apply_redirection(request)
        if redirected is not None:
            redirected.direction = message.Direction.OUTGOING
            return redirected

        return request

393 

394 

class ForwardProxyWithPooledObservations(ForwardProxy, ProxyWithPooledObservations):
    """Combination of ForwardProxy and ProxyWithPooledObservations; adds
    nothing of its own."""

397 

398 

class ReverseProxy(Proxy):
    """Deprecated variant of Proxy that behaves exactly like it, except for
    emitting a DeprecationWarning when instantiated."""

    def __init__(self, *args, **kwargs):
        """Warn about the deprecation, then pass all arguments on to Proxy."""
        warnings.warn(
            "ReverseProxy has become moot due to proxy operation "
            "changes, just instantiate Proxy and set the appropriate "
            "redirectors",
            DeprecationWarning,
            # attribute the warning to the code that instantiates the class
            # rather than to this line
            stacklevel=2,
        )
        super().__init__(*args, **kwargs)

411 

412 

class ReverseProxyWithPooledObservations(ReverseProxy, ProxyWithPooledObservations):
    """Combination of ReverseProxy and ProxyWithPooledObservations; adds
    nothing of its own."""

415 

416 

class Redirector:
    """Base class for redirection rules; this default rule never applies."""

    def apply_redirection(self, request):
        """Return the redirected request, or None if the rule does not apply.

        This base implementation ignores ``request`` and returns None.
        """
        return None

420 

421 

class NameBasedVirtualHost(Redirector):
    """Redirect requests whose Uri-Host equals a configured name.

    :param match_name: value the request's Uri-Host is compared against.
    :param target: stored as the request's ``unresolved_remote`` on a match.
    :param rewrite_uri_host: if true, replace Uri-Host on a match with the
        first element of ``util.hostportsplit(target)``.
    :param use_as_proxy: if true, set Proxy-Scheme on a match to the scheme
        of the request's remote.
    """

    def __init__(self, match_name, target, rewrite_uri_host=False, use_as_proxy=False):
        self.match_name = match_name
        self.target = target
        self.rewrite_uri_host = rewrite_uri_host
        self.use_as_proxy = use_as_proxy

    def apply_redirection(self, request):
        """Return ``request``, modified in place, if its Uri-Host matches;
        return None otherwise.

        The unsafe-option check runs before the match, so it can raise
        CanNotRedirectBecauseOfUnsafeOptions even for requests this rule
        would not apply to.
        """
        raise_unless_safe(request, ())

        if not self._matches(request.opt.uri_host):
            return None

        if self.use_as_proxy:
            request.opt.proxy_scheme = request.remote.scheme
        if self.rewrite_uri_host:
            target_host, _ = util.hostportsplit(self.target)
            request.opt.uri_host = target_host
        request.unresolved_remote = self.target
        return request

    def _matches(self, hostname):
        """Tell whether ``hostname`` equals the configured name."""
        return hostname == self.match_name

442 

443 

class SubdomainVirtualHost(NameBasedVirtualHost):
    """Variant of NameBasedVirtualHost that matches any host name ending in
    ``"." + match_name``.

    :raises TypeError: if constructed with a true ``rewrite_uri_host``.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        if not self.rewrite_uri_host:
            return
        raise TypeError(
            "rewrite_uri_host makes no sense with subdomain virtual hosting"
        )

    def _matches(self, hostname):
        """Tell whether ``hostname`` is set and ends in the configured name
        preceded by a dot."""
        return hostname is not None and hostname.endswith("." + self.match_name)

456 

457 

class UnconditionalRedirector(Redirector):
    """Redirect every request to a fixed target.

    :param target: stored as the request's ``unresolved_remote``.
    :param use_as_proxy: if true, set Proxy-Scheme to the scheme of the
        request's remote.
    """

    def __init__(self, target, use_as_proxy=False):
        self.use_as_proxy = use_as_proxy
        self.target = target

    def apply_redirection(self, request):
        """Return ``request``, modified in place to go to the target.

        :raises CanNotRedirectBecauseOfUnsafeOptions: if the request carries
            unsafe options outside the always-known set.
        """
        raise_unless_safe(request, ())

        if self.use_as_proxy:
            incoming_scheme = request.remote.scheme
            request.opt.proxy_scheme = incoming_scheme

        request.unresolved_remote = self.target
        return request

470 

471 

class SubresourceVirtualHost(Redirector):
    """Redirect requests below a path prefix, stripping that prefix.

    :param path: iterable of path segments; stored as a tuple.
    :param target: stored as the request's ``unresolved_remote`` on a match.
    """

    def __init__(self, path, target):
        self.target = target
        self.path = tuple(path)

    def apply_redirection(self, request):
        """Return ``request``, modified in place, if its Uri-Path starts with
        the configured path; return None otherwise.

        The unsafe-option check runs before the match, so it can raise
        CanNotRedirectBecauseOfUnsafeOptions even for requests this rule
        would not apply to.
        """
        raise_unless_safe(request, ())

        prefix_length = len(self.path)
        if not (self.path == request.opt.uri_path[:prefix_length]):
            return None

        request.opt.uri_path = request.opt.uri_path[prefix_length:]
        request.unresolved_remote = self.target
        return request