Coverage for aiocoap/proxy/server.py: 58%

243 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-09-30 11:17 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5"""Basic implementation of CoAP-CoAP proxying 

6 

7This is work in progress and not yet part of the API.""" 

8 

9import asyncio 

10import functools 

11import ipaddress 

12import logging 

13import warnings 

14 

15from .. import numbers, interfaces, message, error, util, resource 

16from ..numbers import codes 

17from ..blockwise import Block1Spool, Block2Cache 

18from ..pipe import Pipe 

19 

20 

class CanNotRedirect(error.ConstructionRenderableError):
    """Base class of all errors signalling that a request can not be
    redirected by the proxy.

    Proxy.render catches this class and answers with the message obtained
    from the exception's ``to_message()``."""

    message = "Proxy redirection failed"

23 

24 

class NoUriSplitting(CanNotRedirect):
    """Raised by ForwardProxy when a request carries a Proxy-Uri option,
    which would need to be split into its components before forwarding."""

    message = "URI splitting not implemented, please use Proxy-Scheme."
    code = codes.NOT_IMPLEMENTED

28 

29 

class IncompleteProxyUri(CanNotRedirect):
    """Raised when a request does not carry enough information to be proxied.

    ForwardProxy raises it when Proxy-Scheme is set but Uri-Host is missing;
    Proxy.render raises it when no proxy rule matched the request."""

    message = "Proxying requires Proxy-Scheme and Uri-Host"
    code = codes.BAD_REQUEST

33 

34 

class NotAForwardProxy(CanNotRedirect):
    """Error for forward-proxy style requests arriving at a reverse proxy.

    NOTE(review): not raised anywhere in the code visible in this file."""

    message = "This is a reverse proxy, not a forward one."
    code = codes.PROXYING_NOT_SUPPORTED

38 

39 

class NoSuchHostname(CanNotRedirect):
    """Error rendered as NOT_FOUND with an empty diagnostic message.

    NOTE(review): presumably meant for unknown host names; it is not raised
    anywhere in the code visible in this file."""

    message = ""
    code = codes.NOT_FOUND

43 

44 

class CanNotRedirectBecauseOfUnsafeOptions(CanNotRedirect):
    """BAD_OPTION error listing the unsafe options that blocked forwarding.

    ``options`` is an iterable of option objects; the ``number`` of each one
    is spelled out in the error message."""

    code = codes.BAD_OPTION

    def __init__(self, options):
        option_numbers = ", ".join(str(option.number) for option in options)
        self.message = "Unsafe options in request: %s" % option_numbers

52 

53 

def raise_unless_safe(request, known_options):
    """Check that a message only carries options the proxy may forward.

    Every option of ``request`` has to be either safe to forward or listed
    among the options the caller declares as known (``known_options``, an
    iterable of option numbers). A fixed set of option numbers is always
    treated as known.

    Raises CanNotRedirectBecauseOfUnsafeOptions (a BAD_OPTION
    CanNotRedirect) naming all offending options otherwise."""

    always_known = {
        # it is expected that every proxy is aware of these options even though
        # one of them often doesn't need touching
        numbers.OptionNumber.URI_HOST,
        numbers.OptionNumber.URI_PORT,
        numbers.OptionNumber.URI_PATH,
        numbers.OptionNumber.URI_QUERY,
        # handled by the Context
        numbers.OptionNumber.BLOCK1,
        numbers.OptionNumber.BLOCK2,
        # handled by the proxy resource
        numbers.OptionNumber.OBSERVE,
    }
    acceptable = set(known_options) | always_known

    offending = []
    for option in request.opt.option_list():
        number = option.number
        if number.is_unsafe() and number not in acceptable:
            offending.append(option)

    if offending:
        raise CanNotRedirectBecauseOfUnsafeOptions(offending)

81 

82 

class Proxy(interfaces.Resource):
    """Resource that relays requests through an outgoing context.

    Where a request is sent is decided by the redirectors registered through
    add_redirector; they are asked in registration order, and the first one
    that returns something other than None wins."""

    # other than in special cases, we're trying to be transparent wrt blockwise transfers
    interpret_block_options = False

    def __init__(self, outgoing_context, logger=None):
        """Set up a proxy that sends its requests through
        ``outgoing_context``, logging to ``logger`` (or to the "proxy"
        logger if none is given)."""
        super().__init__()

        self.outgoing_context = outgoing_context
        self.log = logger if logger else logging.getLogger("proxy")
        self._redirectors = []

        # Provide variables for render_to_pipe
        # FIXME this is copied from aiocoap.resource's __init__ -- but on the
        # long run proxying shouldn't rely on that anyway but implement
        # render_to_pipe right on its own
        self._block1 = Block1Spool()
        self._block2 = Block2Cache()

    def add_redirector(self, redirector):
        """Append ``redirector`` to the list consulted by
        apply_redirection."""
        self._redirectors.append(redirector)

    def apply_redirection(self, request):
        """Return the outcome of the first redirector that handles
        ``request``, or None if none of them does."""
        for redirector in self._redirectors:
            redirected = redirector.apply_redirection(request)
            if redirected is None:
                continue
            return redirected
        return None

    async def needs_blockwise_assembly(self, request):
        """Report whether block options are to be interpreted (by default
        they are not, see interpret_block_options)."""
        return self.interpret_block_options

    async def render(self, request):
        """Forward ``request`` according to the redirectors and return the
        response obtained through the outgoing context.

        Redirection errors (CanNotRedirect) become the error's own message,
        and a timeout of the outgoing request becomes a GATEWAY_TIMEOUT
        response. If no redirector matched and the parent class produced no
        response either, IncompleteProxyUri is raised."""
        # FIXME i'd rather let the application do with the message whatever it
        # wants. everything the responder needs of the request should be
        # extracted beforehand.
        outgoing = request.copy(mid=None, token=None)
        outgoing.direction = message.Direction.OUTGOING

        try:
            outgoing = self.apply_redirection(outgoing)
        except CanNotRedirect as redirect_error:
            return redirect_error.to_message()

        if outgoing is None:
            # No redirector matched; the parent class is asked with the None
            # request.
            fallback = await super().render(outgoing)
            if fallback is None:
                raise IncompleteProxyUri("No matching proxy rule")
            return fallback

        try:
            response = await self.outgoing_context.request(
                outgoing, handle_blockwise=self.interpret_block_options
            ).response
        except error.TimeoutError:
            return message.Message(code=numbers.codes.GATEWAY_TIMEOUT)

        raise_unless_safe(response, ())

        # Clear the properties that belonged to the exchange with the origin
        # server before handing the message back as our own response
        response.mtype = None
        response.mid = None
        response.remote = None
        response.direction = message.Direction.OUTGOING
        response.token = None

        return response

    # Not inheriting from them because we do *not* want the .render() in the
    # resolution tree (it can't deal with None requests, which are used among
    # proxy implementations)
    async def render_to_pipe(self, pipe: Pipe) -> None:
        """Delegate to resource.Resource.render_to_pipe, called explicitly
        rather than through inheritance."""
        await resource.Resource.render_to_pipe(self, pipe)  # type: ignore

154 

155 

class ProxyWithPooledObservations(Proxy, interfaces.ObservableResource):
    """Proxy that keeps one outgoing observation per observed resource and
    shares it among all clients observing that resource.

    The outgoing observation requests are stored in
    ``_outgoing_observations`` under their cache key. They are augmented with
    the attributes ``__users``, ``__cachekey`` and ``__latest_response``;
    as those names are only ever used inside this class body, Python's name
    mangling applies to them consistently."""

    def __init__(self, outgoing_context, logger=None):
        super().__init__(outgoing_context, logger)

        # cache key (see _cache_key) -> augmented outgoing observation request
        self._outgoing_observations = {}

    @staticmethod
    def _cache_key(request):
        """Return the key under which the observation for ``request`` is
        pooled: its cache key, computed with the Observe option number passed
        to get_cache_key (presumably so that Observe is left out of the
        key)."""
        return request.get_cache_key([numbers.optionnumbers.OptionNumber.OBSERVE])

    def _peek_observation_for(self, request):
        """Return the augmented request (see _get_observation_for) towards a
        resource, or raise KeyError"""
        cachekey = self._cache_key(request)

        return self._outgoing_observations[cachekey]

    def _get_observation_for(self, request):
        """Return an existing augmented request towards a resource or create one.

        An augmented request is an observation request that has some additional
        properties (__users, __cachekey, __latest_response), which are used in
        ProxyWithPooledObservations to immediately serve responses from
        observed resources, and to tear the observations down again.

        Raises CanNotRedirect (or a subclass) if the request can not be
        redirected; this includes IncompleteProxyUri when no redirection rule
        matches the request at all."""

        # see ProxiedResource.render
        request = request.copy(mid=None, remote=None, token=None)
        request = self.apply_redirection(request)
        if request is None:
            # Without a redirection target there is nothing to observe; raise
            # the same error Proxy.render uses for this situation rather than
            # failing with an AttributeError on None below.
            raise IncompleteProxyUri("No matching proxy rule")

        cachekey = self._cache_key(request)

        try:
            obs = self._outgoing_observations[cachekey]
        except KeyError:
            obs = self._outgoing_observations[cachekey] = self.outgoing_context.request(
                request
            )
            obs.__users = set()
            obs.__cachekey = cachekey
            # this becomes a cached response right after the .response comes
            # in (so only use this after waiting for it), and gets updated
            # when new responses arrive.
            obs.__latest_response = None

            def when_first_request_done(result, obs=obs):
                obs.__latest_response = result.result()

            obs.response.add_done_callback(when_first_request_done)

            def cb(incoming_message, obs=obs):
                self.log.info(
                    "Received incoming message %r, relaying it to %d clients",
                    incoming_message,
                    len(obs.__users),
                )
                obs.__latest_response = incoming_message
                # Iterate over a copy as triggering may alter the set of users
                for observationserver in set(obs.__users):
                    observationserver.trigger(incoming_message.copy())

            obs.observation.register_callback(cb)

            def eb(exception, obs=obs):
                if obs.__users:
                    code = numbers.codes.INTERNAL_SERVER_ERROR
                    payload = b""
                    if isinstance(exception, error.RenderableError):
                        code = exception.code
                        # UTF-8 rather than ASCII so that a non-ASCII message
                        # can not make the errback itself fail
                        payload = exception.message.encode("utf8")
                    self.log.debug(
                        "Received error %r, which did not lead to unregistration of the clients. Actively deregistering them with %s %r.",
                        exception,
                        code,
                        payload,
                    )
                    for u in list(obs.__users):
                        u.trigger(message.Message(code=code, payload=payload))
                    if obs.__users:
                        self.log.error(
                            "Observations survived sending them an error message."
                        )
                else:
                    self.log.debug(
                        "Received error %r, but that seems to have been passed on cleanly to the observers as they are gone by now.",
                        exception,
                    )

            obs.observation.register_errback(eb)

        return obs

    def _add_observation_user(self, clientobservationrequest, serverobservation):
        """Register ``serverobservation`` as a user of the pooled outgoing
        observation ``clientobservationrequest``."""
        clientobservationrequest.__users.add(serverobservation)

    def _remove_observation_user(self, clientobservationrequest, serverobservation):
        """Remove ``serverobservation`` from the users of the pooled outgoing
        observation, and schedule a check of whether it is still needed."""
        clientobservationrequest.__users.remove(serverobservation)
        # give the request that just cancelled time to be dealt with before
        # dropping the __latest_response
        asyncio.get_event_loop().call_soon(
            self._consider_dropping, clientobservationrequest
        )

    def _consider_dropping(self, clientobservationrequest):
        """Forget and cancel the pooled outgoing observation if no user is
        left."""
        if not clientobservationrequest.__users:
            self.log.debug(
                "Last client of observation went away, deregistering with server."
            )
            self._outgoing_observations.pop(clientobservationrequest.__cachekey)
            if not clientobservationrequest.observation.cancelled:
                clientobservationrequest.observation.cancel()

    async def add_observation(self, request, serverobservation):
        """As ProxiedResource is intended to be just the proxy's interface
        toward the Context, accepting observations is handled here, where the
        observations handling can be defined by the subclasses."""

        try:
            clientobservationrequest = self._get_observation_for(request)
        except CanNotRedirect:
            pass  # just don't accept the observation, the rest will be taken care of at rendering
        else:
            self._add_observation_user(clientobservationrequest, serverobservation)
            serverobservation.accept(
                functools.partial(
                    self._remove_observation_user,
                    clientobservationrequest,
                    serverobservation,
                )
            )

    async def render(self, request):
        """Serve ``request`` from the latest response of a pooled observation
        if one exists for it, and through the regular proxying of the parent
        class otherwise.

        A request carrying an Observe option for which no pooled observation
        exists is answered with BAD_OPTION."""
        # FIXME this is evaluated twice in the implementation (once here, but
        # unless it's an observation what matters is inside the super call),
        # maybe this needs to hook in differently than by subclassing and
        # calling super.
        self.log.info("render called")

        # Each try block only covers the one call whose error it handles, so
        # that errors raised by the parent's render are never mistaken for a
        # failed redirection or a missing observation (which would render the
        # request a second time).
        try:
            redirected_request = self.apply_redirection(request.copy())
        except CanNotRedirect:
            self.log.debug(
                "Request is not an observation or can't be proxied, passing it on to regular proxying mechanisms."
            )
            return await super().render(request)

        if redirected_request is None:
            return await super().render(request)

        try:
            clientobservationrequest = self._peek_observation_for(redirected_request)
        except KeyError:
            if request.opt.observe is not None:
                self.log.warning(
                    "No matching observation found: request is %r (cache key %r), outgoing observations %r",
                    redirected_request,
                    self._cache_key(redirected_request),
                    self._outgoing_observations,
                )

                return message.Message(
                    code=numbers.codes.BAD_OPTION,
                    payload="Observe option can not be proxied without active observation.".encode(
                        "utf8"
                    ),
                )
            self.log.debug(
                "Request is not an observation or can't be proxied, passing it on to regular proxying mechanisms."
            )
            return await super().render(request)

        self.log.info(
            "Serving request using latest cached response of %r",
            clientobservationrequest,
        )
        # __latest_response is only populated once the first response is in
        await clientobservationrequest.response
        cached_response = clientobservationrequest.__latest_response
        cached_response.mid = None
        cached_response.token = None
        cached_response.remote = None
        cached_response.mtype = None
        return cached_response

326 

327 

class ForwardProxy(Proxy):
    """Proxy that additionally forwards requests to the host named in their
    Proxy-Scheme, Uri-Host and Uri-Port options."""

    def apply_redirection(self, request):
        """Return a copy of ``request`` set up for forwarding.

        Requests without a Proxy-Scheme option are handed to the registered
        redirectors (whose result, possibly None, is returned). Otherwise the
        remote is set from Proxy-Scheme, Uri-Host and Uri-Port, and those
        options are removed as far as they are not needed by the target; the
        redirectors still get a chance to override the result.

        Raises NoUriSplitting if Proxy-Uri is used, IncompleteProxyUri if
        Proxy-Scheme is present without Uri-Host, and
        CanNotRedirectBecauseOfUnsafeOptions for unknown unsafe options."""
        request = request.copy()
        if request.opt.proxy_uri is not None:
            raise NoUriSplitting
        if request.opt.proxy_scheme is None:
            return super().apply_redirection(request)
        if request.opt.uri_host is None:
            raise IncompleteProxyUri

        raise_unless_safe(
            request,
            (
                numbers.OptionNumber.PROXY_SCHEME,
                numbers.OptionNumber.URI_HOST,
                numbers.OptionNumber.URI_PORT,
            ),
        )

        request.remote = message.UndecidedRemote(
            request.opt.proxy_scheme,
            util.hostportjoin(request.opt.uri_host, request.opt.uri_port),
        )
        request.opt.proxy_scheme = None
        request.opt.uri_port = None
        forward_host = request.opt.uri_host
        try:
            # I'd prefer to not do if-by-try, but the ipaddress doesn't seem to
            # offer any other choice
            literal = ipaddress.ip_address(forward_host)
        except ValueError:
            pass
        else:
            # ip_address accepts IPv4 literals as well; the deprecation is
            # only about IPv6 literals lacking their brackets, so IPv4
            # addresses must not trigger the warning.
            if literal.version == 6:
                warnings.warn(
                    "URI-Host looks like IPv6 but has no square "
                    "brackets. This is deprecated, see "
                    "https://github.com/chrysn/aiocoap/issues/216",
                    DeprecationWarning,
                )
            # An IP literal is expressed through the remote already
            request.opt.uri_host = None
        if forward_host.startswith("["):
            # IPv6 or future literals are not recognized by ipaddress which
            # does not look at host-encoded form
            request.opt.uri_host = None

        # Maybe the URI-Host matches a known forwarding -- in that case, catch that.
        redirected = super().apply_redirection(request)
        if redirected is not None:
            return redirected

        return request

380 

381 

class ForwardProxyWithPooledObservations(ForwardProxy, ProxyWithPooledObservations):
    """Combination of ForwardProxy's redirection with the observation
    pooling of ProxyWithPooledObservations; adds nothing of its own."""

384 

385 

class ReverseProxy(Proxy):
    """Deprecated alias of Proxy.

    Instantiating it emits a DeprecationWarning and otherwise behaves like
    Proxy; all arguments are passed on unchanged."""

    def __init__(self, *args, **kwargs):
        warnings.warn(
            "ReverseProxy has become moot due to proxy operation "
            "changes, just instanciate Proxy and set the appropriate "
            "redirectors",
            DeprecationWarning,
            # Attribute the warning to the code instantiating the class: with
            # stacklevel=1 it would point at this module, where the default
            # warning filters hide DeprecationWarnings from the user.
            stacklevel=2,
        )
        super().__init__(*args, **kwargs)

398 

399 

class ReverseProxyWithPooledObservations(ReverseProxy, ProxyWithPooledObservations):
    """Combination of the (deprecated) ReverseProxy with the observation
    pooling of ProxyWithPooledObservations; adds nothing of its own."""

402 

403 

class Redirector:
    """Base class of the redirection rules a Proxy can be equipped with."""

    def apply_redirection(self, request):
        """Return the redirected request, or None if this rule does not
        apply to ``request``. The base implementation never applies."""
        return None

407 

408 

class NameBasedVirtualHost(Redirector):
    """Redirector that sends requests whose Uri-Host equals ``match_name``
    to ``target``.

    With ``rewrite_uri_host`` set, the Uri-Host option is replaced by the
    host part of ``target``. With ``use_as_proxy`` set, the Proxy-Scheme
    option is set to the scheme of the request's remote."""

    def __init__(self, match_name, target, rewrite_uri_host=False, use_as_proxy=False):
        self.match_name = match_name
        self.target = target
        self.use_as_proxy = use_as_proxy
        self.rewrite_uri_host = rewrite_uri_host

    def apply_redirection(self, request):
        """Return ``request`` modified to go to the target if its Uri-Host
        matches, and None otherwise.

        Raises CanNotRedirectBecauseOfUnsafeOptions for requests carrying
        unknown unsafe options, whether or not the host matches."""
        raise_unless_safe(request, ())

        if not self._matches(request.opt.uri_host):
            return None

        if self.use_as_proxy:
            request.opt.proxy_scheme = request.remote.scheme
        if self.rewrite_uri_host:
            target_host, _port = util.hostportsplit(self.target)
            request.opt.uri_host = target_host
        request.unresolved_remote = self.target
        return request

    def _matches(self, hostname):
        """Tell whether ``hostname`` is the name this rule is set up for."""
        return hostname == self.match_name

429 

430 

class SubdomainVirtualHost(NameBasedVirtualHost):
    """Variation of NameBasedVirtualHost that matches any host name ending
    in ``"." + match_name`` rather than the name itself.

    Raises TypeError on construction if ``rewrite_uri_host`` is set."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        if self.rewrite_uri_host:
            raise TypeError(
                "rewrite_uri_host makes no sense with subdomain virtual hosting"
            )

    def _matches(self, hostname):
        """Tell whether ``hostname`` is a subdomain of the configured name.

        ``hostname`` is the request's Uri-Host and thus None when the request
        carries no such option; such requests never match."""
        if hostname is None:
            return False
        return hostname.endswith("." + self.match_name)

441 

442 

class UnconditionalRedirector(Redirector):
    """Redirector that sends every request to ``target``.

    With ``use_as_proxy`` set, the Proxy-Scheme option is set to the scheme
    of the request's remote."""

    def __init__(self, target, use_as_proxy=False):
        self.use_as_proxy = use_as_proxy
        self.target = target

    def apply_redirection(self, request):
        """Return ``request`` modified to go to the target.

        Raises CanNotRedirectBecauseOfUnsafeOptions for requests carrying
        unknown unsafe options."""
        raise_unless_safe(request, ())

        if self.use_as_proxy:
            original_scheme = request.remote.scheme
            request.opt.proxy_scheme = original_scheme
        request.unresolved_remote = self.target
        return request

455 

456 

class SubresourceVirtualHost(Redirector):
    """Redirector that sends requests below the path prefix ``path`` to
    ``target``, with the prefix stripped from their Uri-Path."""

    def __init__(self, path, target):
        self.target = target
        self.path = tuple(path)

    def apply_redirection(self, request):
        """Return ``request`` modified to go to the target if its Uri-Path
        starts with the configured prefix, and None otherwise.

        Raises CanNotRedirectBecauseOfUnsafeOptions for requests carrying
        unknown unsafe options, whether or not the path matches."""
        raise_unless_safe(request, ())

        prefix_length = len(self.path)
        if self.path != request.opt.uri_path[:prefix_length]:
            return None

        request.opt.uri_path = request.opt.uri_path[prefix_length:]
        request.unresolved_remote = self.target
        return request