Coverage for aiocoap / proxy / server.py: 58%

248 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-17 12:28 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5"""Basic implementation of CoAP-CoAP proxying 

6 

7This is work in progress and not yet part of the API.""" 

8 

9import asyncio 

10import functools 

11import ipaddress 

12import logging 

13import warnings 

14 

15from .. import numbers, interfaces, message, error, util, resource 

16from ..numbers import codes 

17from ..blockwise import Block1Spool, Block2Cache 

18from ..pipe import Pipe 

19from ..util import DeprecationWarning 

20 

21 

class CanNotRedirect(error.ConstructionRenderableError):
    """Base class of the renderable errors this module raises when a request
    can not be redirected."""

    message = "Proxy redirection failed"

24 

25 

class NoUriSplitting(CanNotRedirect):
    """Redirection error for requests that carry a Proxy-Uri option.

    ForwardProxy raises this instead of splitting the URI into its
    components.
    """

    message = "URI splitting not implemented, please use Proxy-Scheme."
    code = codes.NOT_IMPLEMENTED

29 

30 

class IncompleteProxyUri(CanNotRedirect):
    """Redirection error for requests that lack the options needed to decide
    where to forward them (rendered as a Bad Request)."""

    message = "Proxying requires Proxy-Scheme and Uri-Host"
    code = codes.BAD_REQUEST

34 

35 

class NotAForwardProxy(CanNotRedirect):
    """Redirection error stating that forward proxying is not offered.

    NOTE(review): nothing in the visible part of this module raises it.
    """

    message = "This is a reverse proxy, not a forward one."
    code = codes.PROXYING_NOT_SUPPORTED

39 

40 

class NoSuchHostname(CanNotRedirect):
    """Redirection error rendered as Not Found with an empty message.

    NOTE(review): nothing in the visible part of this module raises it.
    """

    message = ""
    code = codes.NOT_FOUND

44 

45 

class CanNotRedirectBecauseOfUnsafeOptions(CanNotRedirect):
    """Redirection error listing the option numbers that blocked forwarding.

    :param options: iterable of option objects; the ``number`` attribute of
        each is put into the error message.
    """

    code = codes.BAD_OPTION

    def __init__(self, options):
        listed_numbers = ", ".join(str(option.number) for option in options)
        self.message = "Unsafe options in request: " + listed_numbers

53 

54 

def raise_unless_safe(request, known_options):
    """Check that a message only carries options a proxy may forward.

    Every option of ``request`` has to be either not unsafe (as reported by
    its number's ``is_unsafe()``) or known, i.e. contained in
    ``known_options`` or in the built-in set below.

    :param request: message whose ``opt.option_list()`` is inspected
    :param known_options: iterable of additional option numbers the caller
        takes care of
    :raises CanNotRedirectBecauseOfUnsafeOptions: if any unsafe option is not
        known; the error carries all offending options
    """

    always_known = {
        # it is expected that every proxy is aware of these options even though
        # one of them often doesn't need touching
        numbers.OptionNumber.URI_HOST,
        numbers.OptionNumber.URI_PORT,
        numbers.OptionNumber.URI_PATH,
        numbers.OptionNumber.URI_QUERY,
        # handled by the Context
        numbers.OptionNumber.BLOCK1,
        numbers.OptionNumber.BLOCK2,
        # handled by the proxy resource
        numbers.OptionNumber.OBSERVE,
    }
    accepted = always_known | set(known_options)

    offending = []
    for option in request.opt.option_list():
        if not option.number.is_unsafe():
            continue
        if option.number in accepted:
            continue
        offending.append(option)

    if offending:
        raise CanNotRedirectBecauseOfUnsafeOptions(offending)

82 

83 

class Proxy(interfaces.Resource):
    """Resource that passes requests on through an outgoing context.

    Where a request goes is decided by the redirectors registered with
    :meth:`add_redirector`; they are asked in registration order and the
    first one returning something other than None wins.
    """

    # other than in special cases, we're trying to be transparent wrt blockwise transfers
    interpret_block_options = False

    def __init__(self, outgoing_context, logger=None):
        """
        :param outgoing_context: object whose ``request()`` is used to send
            the redirected requests
        :param logger: logger to use; defaults to the logger named "proxy"
        """
        super().__init__()

        self.outgoing_context = outgoing_context
        self.log = logger if logger else logging.getLogger("proxy")
        self._redirectors = []

        # Provide variables for render_to_pipe
        # FIXME this is copied from aiocoap.resource's __init__ -- but on the
        # long run proxying shouldn't rely on that anyway but implement
        # render_to_pipe right on its own
        self._block1 = Block1Spool()
        self._block2 = Block2Cache()

    def add_redirector(self, redirector):
        """Register ``redirector`` behind all previously added ones."""
        self._redirectors.append(redirector)

    def apply_redirection(self, request):
        """Return the first non-None result any registered redirector gives
        for ``request``, or None if none of them produces one.

        Exceptions raised by a redirector are passed on to the caller.
        """
        for redirector in self._redirectors:
            outcome = redirector.apply_redirection(request)
            if outcome is None:
                continue
            return outcome
        return None

    async def needs_blockwise_assembly(self, request):
        """Report ``interpret_block_options`` for every request."""
        return self.interpret_block_options

    async def render(self, request):
        """Forward ``request`` and return the response for the client.

        A CanNotRedirect raised during redirection is returned in its message
        form. If no redirector applies, the parent class's ``render`` is
        tried, and IncompleteProxyUri is raised if that yields None. A
        timeout of the outgoing request results in a Gateway Timeout
        response.
        """
        # FIXME i'd rather let the application do with the message whatever it
        # wants. everything the responder needs of the request should be
        # extracted beforehand.
        request = request.copy(mid=None, token=None)

        try:
            outgoing = self.apply_redirection(request)
        except CanNotRedirect as failure:
            return failure.to_message()

        if outgoing is None:
            fallback = await super().render(request)
            if fallback is None:
                raise IncompleteProxyUri("No matching proxy rule")
            return fallback

        outgoing.direction = message.Direction.OUTGOING

        try:
            response = await self.outgoing_context.request(
                outgoing, handle_blockwise=self.interpret_block_options
            ).response
        except error.TimeoutError:
            return message.Message(code=numbers.codes.GATEWAY_TIMEOUT)

        # the response is held to the same option rules as requests
        raise_unless_safe(response, ())

        # strip what belonged to the exchange with the origin server before
        # the message is handed back as our own response
        response.mtype = response.mid = response.remote = None
        response.direction = message.Direction.OUTGOING
        response.token = None

        return response

    # Not inheriting from them because we do *not* want the .render() in the
    # resolution tree (it can't deal with None requests, which are used among
    # proxy implementations)
    async def render_to_pipe(self, pipe: Pipe) -> None:
        """Run resource.Resource's render_to_pipe implementation on this
        object."""
        await resource.Resource.render_to_pipe(self, pipe)  # type: ignore

157 

158 

class ProxyWithPooledObservations(Proxy, interfaces.ObservableResource):
    """Proxy that serves all clients observing the same redirected request
    from one shared outgoing observation.

    The outgoing requests are kept in ``_outgoing_observations`` under the
    value :meth:`_cache_key` gives for the redirected request. Book-keeping
    data is stored on the outgoing request objects themselves in the
    attributes ``__users``, ``__cachekey`` and ``__latest_response``; as
    these are written inside this class body, Python name-mangles them to
    ``_ProxyWithPooledObservations__users`` etc.
    """

    def __init__(self, outgoing_context, logger=None):
        """
        :param outgoing_context: passed on to Proxy
        :param logger: passed on to Proxy
        """
        super().__init__(outgoing_context, logger)

        # cache key -> augmented outgoing request (see _get_observation_for)
        self._outgoing_observations = {}

    @staticmethod
    def _cache_key(request):
        """Return ``request.get_cache_key()`` called with the Observe option
        number (presumably so that Observe does not influence the key --
        confirm against get_cache_key)."""
        return request.get_cache_key([numbers.optionnumbers.OptionNumber.OBSERVE])

    def _peek_observation_for(self, request):
        """Return the augmented request (see _get_observation_for) towards a
        resource, or raise KeyError"""
        cachekey = self._cache_key(request)

        return self._outgoing_observations[cachekey]

    def _get_observation_for(self, request):
        """Return an existing augmented request towards a resource or create one.

        An augmented request is an observation request that has some additional
        properties (__users, __cachekey, __latest_response), which are used in
        ProxyWithPooledObservations to immediately serve responses from
        observed resources, and to tear the observations down again.

        :raises CanNotRedirect: if redirection fails, or (as
            IncompleteProxyUri) if no redirector applies to the request
        """

        # see ProxiedResource.render
        request = request.copy(mid=None, remote=None, token=None)
        request = self.apply_redirection(request)
        if request is None:
            # No redirector felt responsible. Without a redirected request
            # there is nothing to compute a cache key from; report this in
            # the form add_observation already handles instead of failing
            # with an AttributeError on None.
            raise IncompleteProxyUri("No matching proxy rule")

        cachekey = self._cache_key(request)

        try:
            obs = self._outgoing_observations[cachekey]
        except KeyError:
            obs = self._outgoing_observations[cachekey] = self.outgoing_context.request(
                request
            )
            obs.__users = set()
            obs.__cachekey = cachekey
            # this becomes a cached response right after the .response comes
            # in (so only use this after waiting for it), and gets updated
            # when new responses arrive.
            obs.__latest_response = None

            def when_first_request_done(result, obs=obs):
                obs.__latest_response = result.result()

            obs.response.add_done_callback(when_first_request_done)

            def cb(incoming_message, obs=obs):
                self.log.info(
                    "Received incoming message %r, relaying it to %d clients",
                    incoming_message,
                    len(obs.__users),
                )
                obs.__latest_response = incoming_message
                # iterate over a snapshot as triggering may change the set
                for observationserver in set(obs.__users):
                    observationserver.trigger(incoming_message.copy())

            obs.observation.register_callback(cb)

            def eb(exception, obs=obs):
                if obs.__users:
                    code = numbers.codes.INTERNAL_SERVER_ERROR
                    payload = b""
                    if isinstance(exception, error.RenderableError):
                        code = exception.code
                        # UTF-8 like the other diagnostic payload of this
                        # class; identical bytes for ASCII-only messages, and
                        # no UnicodeEncodeError for any other message
                        payload = exception.message.encode("utf8")
                    self.log.debug(
                        "Received error %r, which did not lead to unregistration of the clients. Actively deregistering them with %s %r.",
                        exception,
                        code,
                        payload,
                    )
                    for u in list(obs.__users):
                        u.trigger(message.Message(code=code, payload=payload))
                    if obs.__users:
                        self.log.error(
                            "Observations survived sending them an error message."
                        )
                else:
                    self.log.debug(
                        "Received error %r, but that seems to have been passed on cleanly to the observers as they are gone by now.",
                        exception,
                    )

            obs.observation.register_errback(eb)

        return obs

    def _add_observation_user(self, clientobservationrequest, serverobservation):
        """Record ``serverobservation`` as a user of the augmented request."""
        clientobservationrequest.__users.add(serverobservation)

    def _remove_observation_user(self, clientobservationrequest, serverobservation):
        """Remove ``serverobservation`` from the users of the augmented
        request and schedule a check whether the request is still needed.

        :raises KeyError: if ``serverobservation`` is not a recorded user
        """
        clientobservationrequest.__users.remove(serverobservation)
        # give the request that just cancelled time to be dealt with before
        # dropping the __latest_response
        asyncio.get_running_loop().call_soon(
            self._consider_dropping, clientobservationrequest
        )

    def _consider_dropping(self, clientobservationrequest):
        """Tear down the augmented request if it has no users left.

        This is scheduled once per removed user, so it can run several times
        for the same request after its last user is gone; the pool entry is
        therefore only removed while it still is this very request.
        """
        if clientobservationrequest.__users:
            return

        self.log.debug(
            "Last client of observation went away, deregistering with server."
        )
        cachekey = clientobservationrequest.__cachekey
        if self._outgoing_observations.get(cachekey) is clientobservationrequest:
            del self._outgoing_observations[cachekey]
        if not clientobservationrequest.observation.cancelled:
            clientobservationrequest.observation.cancel()

    async def add_observation(self, request, serverobservation):
        """As ProxiedResource is intended to be just the proxy's interface
        toward the Context, accepting observations is handled here, where the
        observations handling can be defined by the subclasses."""

        try:
            clientobservationrequest = self._get_observation_for(request)
        except CanNotRedirect:
            # just don't accept the observation, the rest will be taken care
            # of at rendering
            return

        self._add_observation_user(clientobservationrequest, serverobservation)
        serverobservation.accept(
            functools.partial(
                self._remove_observation_user,
                clientobservationrequest,
                serverobservation,
            )
        )

    async def render(self, request):
        """Answer ``request`` from a pooled observation if there is one for
        it, and through the regular proxying of the parent class otherwise.

        A request carrying an Observe option for which no pooled observation
        exists is answered with Bad Option.
        """
        # FIXME this is evaluated twice in the implementation (once here, but
        # unless it's an observation what matters is inside the super call),
        # maybe this needs to hook in differently than by subclassing and
        # calling super.
        self.log.info("render called")
        redirected_request = request.copy()

        try:
            redirected_request = self.apply_redirection(redirected_request)
            if redirected_request is None:
                return await super().render(request)
            clientobservationrequest = self._peek_observation_for(redirected_request)
        except (KeyError, CanNotRedirect) as e:
            if not isinstance(e, CanNotRedirect) and request.opt.observe is not None:
                self.log.warning(
                    "No matching observation found: request is %r (cache key %r), outgoing observations %r",
                    redirected_request,
                    self._cache_key(redirected_request),
                    self._outgoing_observations,
                )

                return message.Message(
                    code=numbers.codes.BAD_OPTION,
                    payload="Observe option can not be proxied without active observation.".encode(
                        "utf8"
                    ),
                )
            self.log.debug(
                "Request is not an observation or can't be proxied, passing it on to regular proxying mechanisms."
            )
            return await super().render(request)

        self.log.info(
            "Serving request using latest cached response of %r",
            clientobservationrequest,
        )
        await clientobservationrequest.response
        # The cached message is shared by everyone served from this
        # observation, so hand out a copy (as is done when notifications are
        # relayed) rather than the cached object itself.
        cached_response = clientobservationrequest.__latest_response.copy(
            mid=None, token=None, remote=None
        )
        cached_response.mtype = None
        return cached_response

329 

330 

class ForwardProxy(Proxy):
    """Proxy that sends requests carrying a Proxy-Scheme option to the host
    named in their Uri-Host (and Uri-Port) options."""

    def apply_redirection(self, request):
        """Return a copy of ``request`` addressed to its forwarding target.

        Requests without Proxy-Scheme are left to the registered redirectors.
        Otherwise the copy's remote is built from Proxy-Scheme, Uri-Host and
        Uri-Port, Proxy-Scheme and Uri-Port are removed, and Uri-Host is
        removed as well if it is an IP literal. The registered redirectors
        then get a chance to override the result.

        :raises NoUriSplitting: if the request has a Proxy-Uri option
        :raises IncompleteProxyUri: if Proxy-Scheme is set but Uri-Host is not
        :raises CanNotRedirectBecauseOfUnsafeOptions: if the request has
            unsafe options other than the ones processed here
        """
        request = request.copy()
        if request.opt.proxy_uri is not None:
            raise NoUriSplitting
        if request.opt.proxy_scheme is None:
            return super().apply_redirection(request)
        if request.opt.uri_host is None:
            raise IncompleteProxyUri

        raise_unless_safe(
            request,
            (
                numbers.OptionNumber.PROXY_SCHEME,
                numbers.OptionNumber.URI_HOST,
                numbers.OptionNumber.URI_PORT,
            ),
        )

        request.remote = message.UndecidedRemote(
            request.opt.proxy_scheme,
            util.hostportjoin(request.opt.uri_host, request.opt.uri_port),
        )
        request.opt.proxy_scheme = None
        request.opt.uri_port = None
        forward_host = request.opt.uri_host

        try:
            # I'd prefer to not do if-by-try, but the ipaddress doesn't seem to
            # offer any other choice
            literal = ipaddress.ip_address(forward_host)
        except ValueError:
            literal = None

        if literal is not None:
            # Only an IPv6 address can be missing square brackets; a plain
            # IPv4 literal is a regular Uri-Host value and gets no warning.
            if isinstance(literal, ipaddress.IPv6Address):
                warnings.warn(
                    "URI-Host looks like IPv6 but has no square "
                    "brackets. This is deprecated, see "
                    "https://github.com/chrysn/aiocoap/issues/216",
                    DeprecationWarning,
                )
            request.opt.uri_host = None
        elif forward_host.startswith("["):
            # IPv6 or future literals are not recognized by ipaddress which
            # does not look at host-encoded form
            request.opt.uri_host = None

        # Maybe the URI-Host matches a known forwarding -- in that case, catch that.
        redirected = super().apply_redirection(request)
        if redirected is not None:
            redirected.direction = message.Direction.OUTGOING
            return redirected

        return request

384 

385 

class ForwardProxyWithPooledObservations(ForwardProxy, ProxyWithPooledObservations):
    """ForwardProxy combined with ProxyWithPooledObservations; adds nothing
    of its own."""

388 

389 

class ReverseProxy(Proxy):
    """Deprecated variant of Proxy; behaves like Proxy apart from warning
    when it is instantiated."""

    def __init__(self, *args, **kwargs):
        """Emit the deprecation warning, then initialize as Proxy does with
        the same arguments."""
        warnings.warn(
            "ReverseProxy has become moot due to proxy operation "
            "changes, just instantiate Proxy and set the appropriate "
            "redirectors",
            DeprecationWarning,
            # attribute the warning to the code instantiating the class
            # rather than to this constructor
            stacklevel=2,
        )
        super().__init__(*args, **kwargs)

402 

403 

class ReverseProxyWithPooledObservations(ReverseProxy, ProxyWithPooledObservations):
    """ReverseProxy combined with ProxyWithPooledObservations; adds nothing
    of its own."""

406 

407 

class Redirector:
    """Base class of the redirectors in this module.

    A redirector's ``apply_redirection`` returns the request to send on, or
    None if it does not apply to the request.
    """

    def apply_redirection(self, request):
        """Return None for every request: the base redirector never
        applies."""
        return None

411 

412 

class NameBasedVirtualHost(Redirector):
    """Redirector for requests whose Uri-Host equals a configured name."""

    def __init__(self, match_name, target, rewrite_uri_host=False, use_as_proxy=False):
        """
        :param match_name: Uri-Host value this redirector applies to
        :param target: stored as the ``unresolved_remote`` of matching requests
        :param rewrite_uri_host: if true, the Uri-Host of matching requests is
            replaced with the host part of ``target``
        :param use_as_proxy: if true, matching requests get their Proxy-Scheme
            option set to the scheme of their remote
        """
        self.match_name = match_name
        self.target = target
        self.use_as_proxy = use_as_proxy
        self.rewrite_uri_host = rewrite_uri_host

    def apply_redirection(self, request):
        """Point ``request`` (modified in place) at the target and return it
        if its Uri-Host matches; return None otherwise.

        The option check runs before the match, so
        CanNotRedirectBecauseOfUnsafeOptions can be raised for requests this
        redirector would not have applied to.
        """
        raise_unless_safe(request, ())

        if not self._matches(request.opt.uri_host):
            return None

        if self.use_as_proxy:
            request.opt.proxy_scheme = request.remote.scheme
        if self.rewrite_uri_host:
            target_host, _port = util.hostportsplit(self.target)
            request.opt.uri_host = target_host
        request.unresolved_remote = self.target
        return request

    def _matches(self, hostname):
        """Tell whether ``hostname`` is equal to the configured name."""
        return self.match_name == hostname

433 

434 

class SubdomainVirtualHost(NameBasedVirtualHost):
    """Variant of NameBasedVirtualHost that applies to host names ending in
    a dot followed by the configured name."""

    def __init__(self, *args, **kwargs):
        """Accept the arguments of NameBasedVirtualHost.

        :raises TypeError: if ``rewrite_uri_host`` is set
        """
        super().__init__(*args, **kwargs)
        if not self.rewrite_uri_host:
            return
        raise TypeError(
            "rewrite_uri_host makes no sense with subdomain virtual hosting"
        )

    def _matches(self, hostname):
        """Tell whether ``hostname`` ends with "." plus the configured name;
        a missing (None) host name never matches."""
        return hostname is not None and hostname.endswith("." + self.match_name)

447 

448 

class UnconditionalRedirector(Redirector):
    """Redirector that applies to every request."""

    def __init__(self, target, use_as_proxy=False):
        """
        :param target: stored as the ``unresolved_remote`` of each request
        :param use_as_proxy: if true, requests get their Proxy-Scheme option
            set to the scheme of their remote
        """
        self.use_as_proxy = use_as_proxy
        self.target = target

    def apply_redirection(self, request):
        """Point ``request`` (modified in place) at the target and return it.

        :raises CanNotRedirectBecauseOfUnsafeOptions: if the request carries
            unsafe options that are not generally known
        """
        raise_unless_safe(request, ())

        options = request.opt
        if self.use_as_proxy:
            options.proxy_scheme = request.remote.scheme

        request.unresolved_remote = self.target
        return request

461 

462 

class SubresourceVirtualHost(Redirector):
    """Redirector for requests whose Uri-Path starts with a configured
    sequence of path segments."""

    def __init__(self, path, target):
        """
        :param path: iterable of leading path segments to match; kept as a
            tuple
        :param target: stored as the ``unresolved_remote`` of matching requests
        """
        self.target = target
        self.path = tuple(path)

    def apply_redirection(self, request):
        """Strip the configured prefix from the Uri-Path of ``request``
        (modified in place), point it at the target and return it; return
        None if the path does not start with the prefix.

        The option check runs before the match, so
        CanNotRedirectBecauseOfUnsafeOptions can be raised for requests this
        redirector would not have applied to.
        """
        raise_unless_safe(request, ())

        prefix_length = len(self.path)
        if self.path != request.opt.uri_path[:prefix_length]:
            return None

        request.opt.uri_path = request.opt.uri_path[prefix_length:]
        request.unresolved_remote = self.target
        return request