Coverage for aiocoap/proxy/server.py: 58%
243 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-09-30 11:17 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-09-30 11:17 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5"""Basic implementation of CoAP-CoAP proxying
7This is work in progress and not yet part of the API."""
9import asyncio
10import functools
11import ipaddress
12import logging
13import warnings
15from .. import numbers, interfaces, message, error, util, resource
16from ..numbers import codes
17from ..blockwise import Block1Spool, Block2Cache
18from ..pipe import Pipe
21class CanNotRedirect(error.ConstructionRenderableError):
22 message = "Proxy redirection failed"
25class NoUriSplitting(CanNotRedirect):
26 code = codes.NOT_IMPLEMENTED
27 message = "URI splitting not implemented, please use Proxy-Scheme."
30class IncompleteProxyUri(CanNotRedirect):
31 code = codes.BAD_REQUEST
32 message = "Proxying requires Proxy-Scheme and Uri-Host"
35class NotAForwardProxy(CanNotRedirect):
36 code = codes.PROXYING_NOT_SUPPORTED
37 message = "This is a reverse proxy, not a forward one."
40class NoSuchHostname(CanNotRedirect):
41 code = codes.NOT_FOUND
42 message = ""
45class CanNotRedirectBecauseOfUnsafeOptions(CanNotRedirect):
46 code = codes.BAD_OPTION
48 def __init__(self, options):
49 self.message = "Unsafe options in request: %s" % (
50 ", ".join(str(o.number) for o in options)
51 )
54def raise_unless_safe(request, known_options):
55 """Raise a BAD_OPTION CanNotRedirect unless all options in request are
56 safe to forward or known"""
58 known_options = set(known_options).union(
59 {
60 # it is expected that every proxy is aware of these options even though
61 # one of them often doesn't need touching
62 numbers.OptionNumber.URI_HOST,
63 numbers.OptionNumber.URI_PORT,
64 numbers.OptionNumber.URI_PATH,
65 numbers.OptionNumber.URI_QUERY,
66 # handled by the Context
67 numbers.OptionNumber.BLOCK1,
68 numbers.OptionNumber.BLOCK2,
69 # handled by the proxy resource
70 numbers.OptionNumber.OBSERVE,
71 }
72 )
74 unsafe_options = [
75 o
76 for o in request.opt.option_list()
77 if o.number.is_unsafe() and o.number not in known_options
78 ]
79 if unsafe_options:
80 raise CanNotRedirectBecauseOfUnsafeOptions(unsafe_options)
83class Proxy(interfaces.Resource):
84 # other than in special cases, we're trying to be transparent wrt blockwise transfers
85 interpret_block_options = False
87 def __init__(self, outgoing_context, logger=None):
88 super().__init__()
89 # Provide variables for render_to_pipe
90 # FIXME this is copied from aiocoap.resource's __init__ -- but on the
91 # long run proxying shouldn't rely on that anyway but implement
92 # render_to_pipe right on its own
93 self._block1 = Block1Spool()
94 self._block2 = Block2Cache()
96 self.outgoing_context = outgoing_context
97 self.log = logger or logging.getLogger("proxy")
99 self._redirectors = []
101 def add_redirector(self, redirector):
102 self._redirectors.append(redirector)
104 def apply_redirection(self, request):
105 for r in self._redirectors:
106 result = r.apply_redirection(request)
107 if result is not None:
108 return result
109 return None
111 async def needs_blockwise_assembly(self, request):
112 return self.interpret_block_options
114 async def render(self, request):
115 # FIXME i'd rather let the application do with the message whatever it
116 # wants. everything the responder needs of the request should be
117 # extracted beforehand.
118 request = request.copy(mid=None, token=None)
119 request.direction = message.Direction.OUTGOING
121 try:
122 request = self.apply_redirection(request)
123 except CanNotRedirect as e:
124 return e.to_message()
126 if request is None:
127 response = await super().render(request)
128 if response is None:
129 raise IncompleteProxyUri("No matching proxy rule")
130 return response
132 try:
133 response = await self.outgoing_context.request(
134 request, handle_blockwise=self.interpret_block_options
135 ).response
136 except error.TimeoutError:
137 return message.Message(code=numbers.codes.GATEWAY_TIMEOUT)
139 raise_unless_safe(response, ())
141 response.mtype = None
142 response.mid = None
143 response.remote = None
144 response.direction = message.Direction.OUTGOING
145 response.token = None
147 return response
149 # Not inheriting from them because we do *not* want the .render() in the
150 # resolution tree (it can't deal with None requests, which are used among
151 # proxy implementations)
152 async def render_to_pipe(self, pipe: Pipe) -> None:
153 await resource.Resource.render_to_pipe(self, pipe) # type: ignore
156class ProxyWithPooledObservations(Proxy, interfaces.ObservableResource):
157 def __init__(self, outgoing_context, logger=None):
158 super(ProxyWithPooledObservations, self).__init__(outgoing_context, logger)
160 self._outgoing_observations = {}
162 @staticmethod
163 def _cache_key(request):
164 return request.get_cache_key([numbers.optionnumbers.OptionNumber.OBSERVE])
166 def _peek_observation_for(self, request):
167 """Return the augmented request (see _get_obervation_for) towards a
168 resource, or raise KeyError"""
169 cachekey = self._cache_key(request)
171 return self._outgoing_observations[cachekey]
173 def _get_observation_for(self, request):
174 """Return an existing augmented request towards a resource or create one.
176 An augmented request is an observation request that has some additional
177 properties (__users, __cachekey, __latest_response), which are used in
178 ProxyWithPooledObservations to immediately serve responses from
179 observed resources, and to tear the observations down again."""
181 # see ProxiedResource.render
182 request = request.copy(mid=None, remote=None, token=None)
183 request = self.apply_redirection(request)
185 cachekey = self._cache_key(request)
187 try:
188 obs = self._outgoing_observations[cachekey]
189 except KeyError:
190 obs = self._outgoing_observations[cachekey] = self.outgoing_context.request(
191 request
192 )
193 obs.__users = set()
194 obs.__cachekey = cachekey
195 obs.__latest_response = None # this becomes a cached response right after the .response comes in (so only use this after waiting for it), and gets updated when new responses arrive.
197 def when_first_request_done(result, obs=obs):
198 obs.__latest_response = result.result()
200 obs.response.add_done_callback(when_first_request_done)
202 def cb(incoming_message, obs=obs):
203 self.log.info(
204 "Received incoming message %r, relaying it to %d clients",
205 incoming_message,
206 len(obs.__users),
207 )
208 obs.__latest_response = incoming_message
209 for observationserver in set(obs.__users):
210 observationserver.trigger(incoming_message.copy())
212 obs.observation.register_callback(cb)
214 def eb(exception, obs=obs):
215 if obs.__users:
216 code = numbers.codes.INTERNAL_SERVER_ERROR
217 payload = b""
218 if isinstance(exception, error.RenderableError):
219 code = exception.code
220 payload = exception.message.encode("ascii")
221 self.log.debug(
222 "Received error %r, which did not lead to unregistration of the clients. Actively deregistering them with %s %r.",
223 exception,
224 code,
225 payload,
226 )
227 for u in list(obs.__users):
228 u.trigger(message.Message(code=code, payload=payload))
229 if obs.__users:
230 self.log.error(
231 "Observations survived sending them an error message."
232 )
233 else:
234 self.log.debug(
235 "Received error %r, but that seems to have been passed on cleanly to the observers as they are gone by now.",
236 exception,
237 )
239 obs.observation.register_errback(eb)
241 return obs
243 def _add_observation_user(self, clientobservationrequest, serverobservation):
244 clientobservationrequest.__users.add(serverobservation)
246 def _remove_observation_user(self, clientobservationrequest, serverobservation):
247 clientobservationrequest.__users.remove(serverobservation)
248 # give the request that just cancelled time to be dealt with before
249 # dropping the __latest_response
250 asyncio.get_event_loop().call_soon(
251 self._consider_dropping, clientobservationrequest
252 )
254 def _consider_dropping(self, clientobservationrequest):
255 if not clientobservationrequest.__users:
256 self.log.debug(
257 "Last client of observation went away, deregistering with server."
258 )
259 self._outgoing_observations.pop(clientobservationrequest.__cachekey)
260 if not clientobservationrequest.observation.cancelled:
261 clientobservationrequest.observation.cancel()
263 async def add_observation(self, request, serverobservation):
264 """As ProxiedResource is intended to be just the proxy's interface
265 toward the Context, accepting observations is handled here, where the
266 observations handling can be defined by the subclasses."""
268 try:
269 clientobservationrequest = self._get_observation_for(request)
270 except CanNotRedirect:
271 pass # just don't accept the observation, the rest will be taken care of at rendering
272 else:
273 self._add_observation_user(clientobservationrequest, serverobservation)
274 serverobservation.accept(
275 functools.partial(
276 self._remove_observation_user,
277 clientobservationrequest,
278 serverobservation,
279 )
280 )
282 async def render(self, request):
283 # FIXME this is evaulated twice in the implementation (once here, but
284 # unless it's an observation what matters is inside the super call),
285 # maybe this needs to hook in differently than by subclassing and
286 # calling super.
287 self.log.info("render called")
288 redirected_request = request.copy()
290 try:
291 redirected_request = self.apply_redirection(redirected_request)
292 if redirected_request is None:
293 return await super().render(request)
294 clientobservationrequest = self._peek_observation_for(redirected_request)
295 except (KeyError, CanNotRedirect) as e:
296 if not isinstance(e, CanNotRedirect) and request.opt.observe is not None:
297 self.log.warning(
298 "No matching observation found: request is %r (cache key %r), outgoing observations %r",
299 redirected_request,
300 self._cache_key(redirected_request),
301 self._outgoing_observations,
302 )
304 return message.Message(
305 code=numbers.codes.BAD_OPTION,
306 payload="Observe option can not be proxied without active observation.".encode(
307 "utf8"
308 ),
309 )
310 self.log.debug(
311 "Request is not an observation or can't be proxied, passing it on to regular proxying mechanisms."
312 )
313 return await super(ProxyWithPooledObservations, self).render(request)
314 else:
315 self.log.info(
316 "Serving request using latest cached response of %r",
317 clientobservationrequest,
318 )
319 await clientobservationrequest.response
320 cached_response = clientobservationrequest.__latest_response
321 cached_response.mid = None
322 cached_response.token = None
323 cached_response.remote = None
324 cached_response.mtype = None
325 return cached_response
328class ForwardProxy(Proxy):
329 def apply_redirection(self, request):
330 request = request.copy()
331 if request.opt.proxy_uri is not None:
332 raise NoUriSplitting
333 if request.opt.proxy_scheme is None:
334 return super().apply_redirection(request)
335 if request.opt.uri_host is None:
336 raise IncompleteProxyUri
338 raise_unless_safe(
339 request,
340 (
341 numbers.OptionNumber.PROXY_SCHEME,
342 numbers.OptionNumber.URI_HOST,
343 numbers.OptionNumber.URI_PORT,
344 ),
345 )
347 request.remote = message.UndecidedRemote(
348 request.opt.proxy_scheme,
349 util.hostportjoin(request.opt.uri_host, request.opt.uri_port),
350 )
351 request.opt.proxy_scheme = None
352 request.opt.uri_port = None
353 forward_host = request.opt.uri_host
354 try:
355 # I'd prefer to not do if-by-try, but the ipaddress doesn't seem to
356 # offer any other choice
357 ipaddress.ip_address(request.opt.uri_host)
359 warnings.warn(
360 "URI-Host looks like IPv6 but has no square "
361 "brackets. This is deprecated, see "
362 "https://github.com/chrysn/aiocoap/issues/216",
363 DeprecationWarning,
364 )
365 except ValueError:
366 pass
367 else:
368 request.opt.uri_host = None
369 if forward_host.startswith("["):
370 # IPv6 or future literals are not recognized by ipaddress which
371 # does not look at host-encoded form
372 request.opt.uri_host = None
374 # Maybe the URI-Host matches a known forwarding -- in that case, catch that.
375 redirected = super(ForwardProxy, self).apply_redirection(request)
376 if redirected is not None:
377 return redirected
379 return request
382class ForwardProxyWithPooledObservations(ForwardProxy, ProxyWithPooledObservations):
383 pass
386class ReverseProxy(Proxy):
387 def __init__(self, *args, **kwargs):
388 import warnings
390 warnings.warn(
391 "ReverseProxy has become moot due to proxy operation "
392 "changes, just instanciate Proxy and set the appropriate "
393 "redirectors",
394 DeprecationWarning,
395 stacklevel=1,
396 )
397 super().__init__(*args, **kwargs)
400class ReverseProxyWithPooledObservations(ReverseProxy, ProxyWithPooledObservations):
401 pass
404class Redirector:
405 def apply_redirection(self, request):
406 return None
409class NameBasedVirtualHost(Redirector):
410 def __init__(self, match_name, target, rewrite_uri_host=False, use_as_proxy=False):
411 self.match_name = match_name
412 self.target = target
413 self.rewrite_uri_host = rewrite_uri_host
414 self.use_as_proxy = use_as_proxy
416 def apply_redirection(self, request):
417 raise_unless_safe(request, ())
419 if self._matches(request.opt.uri_host):
420 if self.use_as_proxy:
421 request.opt.proxy_scheme = request.remote.scheme
422 if self.rewrite_uri_host:
423 request.opt.uri_host, _ = util.hostportsplit(self.target)
424 request.unresolved_remote = self.target
425 return request
427 def _matches(self, hostname):
428 return hostname == self.match_name
431class SubdomainVirtualHost(NameBasedVirtualHost):
432 def __init__(self, *args, **kwargs):
433 super().__init__(*args, **kwargs)
434 if self.rewrite_uri_host:
435 raise TypeError(
436 "rewrite_uri_host makes no sense with subdomain virtual hosting"
437 )
439 def _matches(self, hostname):
440 return hostname.endswith("." + self.match_name)
443class UnconditionalRedirector(Redirector):
444 def __init__(self, target, use_as_proxy=False):
445 self.target = target
446 self.use_as_proxy = use_as_proxy
448 def apply_redirection(self, request):
449 raise_unless_safe(request, ())
451 if self.use_as_proxy:
452 request.opt.proxy_scheme = request.remote.scheme
453 request.unresolved_remote = self.target
454 return request
457class SubresourceVirtualHost(Redirector):
458 def __init__(self, path, target):
459 self.path = tuple(path)
460 self.target = target
462 def apply_redirection(self, request):
463 raise_unless_safe(request, ())
465 if self.path == request.opt.uri_path[: len(self.path)]:
466 request.opt.uri_path = request.opt.uri_path[len(self.path) :]
467 request.unresolved_remote = self.target
468 return request