Coverage for aiocoap / proxy / server.py: 58%
252 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-29 12:32 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-29 12:32 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5"""Basic implementation of CoAP-CoAP proxying
7This is work in progress and not yet part of the API."""
9import asyncio
10import functools
11import ipaddress
12import logging
13import warnings
15from .. import numbers, interfaces, message, error, util, resource
16from ..numbers import codes
17from ..blockwise import Block1Spool, Block2Cache
18from ..pipe import Pipe
19from ..util import DeprecationWarning
class CanNotRedirect(error.ConstructionRenderableError):
    """Common base of the errors raised while working out where to forward
    a request.

    The proxy resources in this module catch this class around their
    redirection step."""

    message = "Proxy redirection failed"
class NoUriSplitting(CanNotRedirect):
    """Redirection error rendered as Not Implemented, telling the client to
    use Proxy-Scheme (see the message text)."""

    code = codes.NOT_IMPLEMENTED
    message = "URI splitting not implemented, please use Proxy-Scheme."
class IncompleteProxyUri(CanNotRedirect):
    """Redirection error rendered as Bad Request for requests that lack the
    options needed to determine the proxy target."""

    code = codes.BAD_REQUEST
    message = "Proxying requires Proxy-Scheme and Uri-Host"
class NotAForwardProxy(CanNotRedirect):
    """Redirection error rendered as Proxying Not Supported; per its message,
    meant for forward-proxy requests that reach a reverse proxy."""

    code = codes.PROXYING_NOT_SUPPORTED
    message = "This is a reverse proxy, not a forward one."
class NoSuchHostname(CanNotRedirect):
    """Redirection error rendered as Not Found with an empty message."""

    code = codes.NOT_FOUND
    message = ""
class CanNotRedirectBecauseOfUnsafeOptions(CanNotRedirect):
    """Redirection error rendered as Bad Option, naming the offending option
    numbers in its message."""

    code = codes.BAD_OPTION

    def __init__(self, options):
        """Build the error message from *options*, an iterable of option
        objects; each one's ``.number`` is listed."""
        listed = ", ".join(str(o.number) for o in options)
        self.message = "Unsafe options in request: %s" % (listed)
def raise_unless_safe(request, known_options):
    """Raise a BAD_OPTION CanNotRedirect unless all options in request are
    safe to forward or known.

    ``known_options`` is an iterable of option numbers the caller knows how
    to handle; it is extended by a fixed set of options handled elsewhere.

    Raises CanNotRedirectBecauseOfUnsafeOptions listing every option that is
    unsafe to forward and not in the known set."""

    always_known = {
        # it is expected that every proxy is aware of these options even though
        # one of them often doesn't need touching
        numbers.OptionNumber.URI_HOST,
        numbers.OptionNumber.URI_PORT,
        numbers.OptionNumber.URI_PATH,
        numbers.OptionNumber.URI_QUERY,
        # handled by the Context
        numbers.OptionNumber.BLOCK1,
        numbers.OptionNumber.BLOCK2,
        # handled by the proxy resource
        numbers.OptionNumber.OBSERVE,
    }
    acceptable = set(known_options) | always_known

    offending = []
    for option in request.opt.option_list():
        if option.number.is_unsafe() and option.number not in acceptable:
            offending.append(option)

    if offending:
        raise CanNotRedirectBecauseOfUnsafeOptions(offending)
class Proxy(interfaces.Resource):
    """Resource that forwards requests through an outgoing context.

    Where a request goes is decided by the redirectors registered through
    :meth:`add_redirector`; the first one that returns a request wins."""

    # other than in special cases, we're trying to be transparent wrt blockwise transfers
    interpret_block_options = False

    def __init__(self, outgoing_context, logger=None):
        """Set up a proxy sending its requests through ``outgoing_context``.

        ``logger`` falls back to the logger named "proxy" when not given."""
        super().__init__()
        # Provide variables for render_to_pipe
        # FIXME this is copied from aiocoap.resource's __init__ -- but on the
        # long run proxying shouldn't rely on that anyway but implement
        # render_to_pipe right on its own
        self._block1 = Block1Spool()
        self._block2 = Block2Cache()

        if not logger:
            logger = logging.getLogger("proxy")
        self.log = logger
        self.outgoing_context = outgoing_context

        self._redirectors = []

    def add_redirector(self, redirector):
        """Append ``redirector`` to the list consulted by
        :meth:`apply_redirection`."""
        self._redirectors.append(redirector)

    def apply_redirection(self, request):
        """Ask the redirectors in registration order and return the first
        result that is not None, or None if no redirector produced one."""
        # The generator is lazy, so redirectors after the first match are
        # never consulted.
        outcomes = (r.apply_redirection(request) for r in self._redirectors)
        return next((o for o in outcomes if o is not None), None)

    async def needs_blockwise_assembly(self, request):
        """Report the class-level ``interpret_block_options`` setting."""
        return self.interpret_block_options

    async def render(self, request):
        """Forward ``request`` and return the response obtained for it.

        Redirection errors are returned as their error message, and a
        timeout of the outgoing request as Gateway Timeout. Raises
        IncompleteProxyUri when no redirector matched and the parent
        implementation produced no response either."""
        # FIXME i'd rather let the application do with the message whatever it
        # wants. everything the responder needs of the request should be
        # extracted beforehand.
        stripped = request.copy(mid=None, token=None)

        try:
            outgoing = self.apply_redirection(stripped)
        except CanNotRedirect as err:
            return err.to_message()

        if outgoing is None:
            fallback = await super().render(stripped)
            if fallback is None:
                raise IncompleteProxyUri("No matching proxy rule")
            return fallback

        outgoing.direction = message.Direction.OUTGOING

        try:
            pending = self.outgoing_context.request(
                outgoing, handle_blockwise=self.interpret_block_options
            )
            response = await pending.response
        except error.TimeoutError:
            return message.Message(code=numbers.codes.GATEWAY_TIMEOUT)

        raise_unless_safe(response, ())

        # Clear the per-exchange fields of the upstream response before it is
        # returned as our own response.
        response.mtype = None
        response.mid = None
        response.remote = None
        response.direction = message.Direction.OUTGOING
        response.token = None

        return response

    # Not inheriting from them because we do *not* want the .render() in the
    # resolution tree (it can't deal with None requests, which are used among
    # proxy implementations)
    async def render_to_pipe(self, pipe: Pipe) -> None:
        """Expand the pipe's request in place where possible, then hand over
        to ``resource.Resource.render_to_pipe``."""
        # I'd rather not expand unconditionally, but if we don't do this here,
        # we're too deep into render() etc to get out again. This is working on
        # the assumption that all proxies are really at root.
        #
        # Workaround-For: https://github.com/chrysn/aiocoap/issues/414
        try:
            resource._expand_upa(pipe.request)
        except error.BadOption:
            # Deliberately ignored: rendering proceeds with the request as is.
            pass
        await resource.Resource.render_to_pipe(self, pipe)  # type: ignore
class ProxyWithPooledObservations(Proxy, interfaces.ObservableResource):
    """Proxy that pools observations: all clients observing the same
    redirected request share a single outgoing observation, and requests
    that match an active observation are answered from its latest response."""

    def __init__(self, outgoing_context, logger=None):
        """See :class:`Proxy`; additionally sets up the observation pool."""
        super().__init__(outgoing_context, logger)

        # Maps cache keys (see _cache_key) to augmented outgoing requests
        # (see _get_observation_for).
        self._outgoing_observations = {}

    @staticmethod
    def _cache_key(request):
        """Return the request's cache key, computed with the Observe option
        passed as the option to disregard."""
        return request.get_cache_key([numbers.optionnumbers.OptionNumber.OBSERVE])

    def _peek_observation_for(self, request):
        """Return the augmented request (see _get_observation_for) towards a
        resource, or raise KeyError"""
        cachekey = self._cache_key(request)

        return self._outgoing_observations[cachekey]

    def _get_observation_for(self, request):
        """Return an existing augmented request towards a resource or create one.

        An augmented request is an observation request that has some additional
        properties (__users, __cachekey, __latest_response), which are used in
        ProxyWithPooledObservations to immediately serve responses from
        observed resources, and to tear the observations down again.

        Raises CanNotRedirect (or a subclass) when the request can not be
        redirected, including when no redirection rule matches."""

        # see Proxy.render
        request = request.copy(mid=None, remote=None, token=None)
        request = self.apply_redirection(request)
        if request is None:
            # Without a redirection target there is nothing to observe.
            # Raising a CanNotRedirect lets add_observation decline the
            # observation instead of failing on a None request below.
            raise IncompleteProxyUri("No matching proxy rule")

        cachekey = self._cache_key(request)

        try:
            obs = self._outgoing_observations[cachekey]
        except KeyError:
            obs = self._outgoing_observations[cachekey] = self.outgoing_context.request(
                request
            )
            obs.__users = set()
            obs.__cachekey = cachekey
            # this becomes a cached response right after the .response comes
            # in (so only use this after waiting for it), and gets updated
            # when new responses arrive.
            obs.__latest_response = None

            def when_first_request_done(result, obs=obs):
                obs.__latest_response = result.result()

            obs.response.add_done_callback(when_first_request_done)

            def cb(incoming_message, obs=obs):
                self.log.info(
                    "Received incoming message %r, relaying it to %d clients",
                    incoming_message,
                    len(obs.__users),
                )
                obs.__latest_response = incoming_message
                # Iterate over a snapshot: triggering may change the user set.
                for observationserver in set(obs.__users):
                    observationserver.trigger(incoming_message.copy())

            obs.observation.register_callback(cb)

            def eb(exception, obs=obs):
                if obs.__users:
                    code = numbers.codes.INTERNAL_SERVER_ERROR
                    payload = b""
                    if isinstance(exception, error.RenderableError):
                        code = exception.code
                        payload = exception.message.encode("ascii")
                    self.log.debug(
                        "Received error %r, which did not lead to unregistration of the clients. Actively deregistering them with %s %r.",
                        exception,
                        code,
                        payload,
                    )
                    for u in list(obs.__users):
                        u.trigger(message.Message(code=code, payload=payload))
                    if obs.__users:
                        self.log.error(
                            "Observations survived sending them an error message."
                        )
                else:
                    self.log.debug(
                        "Received error %r, but that seems to have been passed on cleanly to the observers as they are gone by now.",
                        exception,
                    )

            obs.observation.register_errback(eb)

        return obs

    def _add_observation_user(self, clientobservationrequest, serverobservation):
        """Register ``serverobservation`` as a user of the pooled request."""
        clientobservationrequest.__users.add(serverobservation)

    def _remove_observation_user(self, clientobservationrequest, serverobservation):
        """Unregister ``serverobservation`` and schedule a check whether the
        pooled request is still needed."""
        clientobservationrequest.__users.remove(serverobservation)
        # give the request that just cancelled time to be dealt with before
        # dropping the __latest_response
        asyncio.get_running_loop().call_soon(
            self._consider_dropping, clientobservationrequest
        )

    def _consider_dropping(self, clientobservationrequest):
        """Drop the pooled request from the pool and cancel its observation
        if it has no users left.

        Safe to be scheduled more than once for the same request."""
        if clientobservationrequest.__users:
            return

        cachekey = clientobservationrequest.__cachekey
        if self._outgoing_observations.get(cachekey) is not clientobservationrequest:
            # An earlier scheduled call already dropped this request; the key
            # is either gone or meanwhile belongs to a newer observation that
            # must not be evicted.
            return

        self.log.debug(
            "Last client of observation went away, deregistering with server."
        )
        del self._outgoing_observations[cachekey]
        if not clientobservationrequest.observation.cancelled:
            clientobservationrequest.observation.cancel()

    async def add_observation(self, request, serverobservation):
        """As ProxiedResource is intended to be just the proxy's interface
        toward the Context, accepting observations is handled here, where the
        observations handling can be defined by the subclasses."""

        try:
            clientobservationrequest = self._get_observation_for(request)
        except CanNotRedirect:
            pass  # just don't accept the observation, the rest will be taken care of at rendering
        else:
            self._add_observation_user(clientobservationrequest, serverobservation)
            serverobservation.accept(
                functools.partial(
                    self._remove_observation_user,
                    clientobservationrequest,
                    serverobservation,
                )
            )

    async def render(self, request):
        """Answer ``request`` from the latest response of a matching pooled
        observation; fall back to regular proxying when there is none.

        A request carrying the Observe option for which no pooled observation
        exists is answered with Bad Option."""
        # FIXME this is evaluated twice in the implementation (once here, but
        # unless it's an observation what matters is inside the super call),
        # maybe this needs to hook in differently than by subclassing and
        # calling super.
        self.log.info("render called")

        # The try blocks only cover the steps whose errors are handled here,
        # so that exceptions from the fallback rendering are not mistaken for
        # lookup failures (which would render the fallback a second time).
        try:
            redirected_request = self.apply_redirection(request.copy())
        except CanNotRedirect:
            self.log.debug(
                "Request is not an observation or can't be proxied, passing it on to regular proxying mechanisms."
            )
            return await super().render(request)

        if redirected_request is None:
            return await super().render(request)

        try:
            clientobservationrequest = self._peek_observation_for(redirected_request)
        except KeyError:
            if request.opt.observe is not None:
                self.log.warning(
                    "No matching observation found: request is %r (cache key %r), outgoing observations %r",
                    redirected_request,
                    self._cache_key(redirected_request),
                    self._outgoing_observations,
                )

                return message.Message(
                    code=numbers.codes.BAD_OPTION,
                    payload="Observe option can not be proxied without active observation.".encode(
                        "utf8"
                    ),
                )
            self.log.debug(
                "Request is not an observation or can't be proxied, passing it on to regular proxying mechanisms."
            )
            return await super().render(request)

        self.log.info(
            "Serving request using latest cached response of %r",
            clientobservationrequest,
        )
        await clientobservationrequest.response
        # Work on a copy: the cached message is shared between all requesters
        # and must not be altered by what happens to one particular response.
        # NOTE(review): Proxy.render also sets direction to OUTGOING on its
        # response; confirm whether that is needed here as well.
        cached_response = clientobservationrequest.__latest_response.copy()
        cached_response.mid = None
        cached_response.token = None
        cached_response.remote = None
        cached_response.mtype = None
        return cached_response
class ForwardProxy(Proxy):
    """Proxy that derives the target of a request from its Proxy-Scheme and
    Uri-Host / Uri-Port options, and otherwise behaves like :class:`Proxy`."""

    def apply_redirection(self, request):
        """Return a copy of ``request`` addressed to the server named in its
        proxy options.

        Requests without Proxy-Scheme are left to the registered redirectors
        (whose result, possibly None, is returned). Registered redirectors are
        also consulted on the rewritten request and take precedence over it.

        Raises NoUriSplitting when Proxy-Uri is used, IncompleteProxyUri when
        Proxy-Scheme is present without Uri-Host, and
        CanNotRedirectBecauseOfUnsafeOptions for unsafe unknown options."""
        request = request.copy()
        if request.opt.proxy_uri is not None:
            raise NoUriSplitting
        if request.opt.proxy_scheme is None:
            return super().apply_redirection(request)
        if request.opt.uri_host is None:
            raise IncompleteProxyUri

        raise_unless_safe(
            request,
            (
                numbers.OptionNumber.PROXY_SCHEME,
                numbers.OptionNumber.URI_HOST,
                numbers.OptionNumber.URI_PORT,
            ),
        )

        request.remote = message.UndecidedRemote(
            request.opt.proxy_scheme,
            util.hostportjoin(request.opt.uri_host, request.opt.uri_port),
        )
        request.opt.proxy_scheme = None
        request.opt.uri_port = None
        forward_host = request.opt.uri_host
        try:
            # I'd prefer to not do if-by-try, but the ipaddress doesn't seem to
            # offer any other choice
            literal = ipaddress.ip_address(forward_host)
        except ValueError:
            literal = None
        if literal is not None:
            # Only an unbracketed IPv6 literal is the deprecated form; an IPv4
            # literal is a regular host and must not trigger the warning.
            if isinstance(literal, ipaddress.IPv6Address):
                warnings.warn(
                    "URI-Host looks like IPv6 but has no square "
                    "brackets. This is deprecated, see "
                    "https://github.com/chrysn/aiocoap/issues/216",
                    DeprecationWarning,
                )
            # An IP literal is not forwarded as Uri-Host (presumably because
            # the remote address already conveys it -- confirm).
            request.opt.uri_host = None
        if forward_host.startswith("["):
            # IPv6 or future literals are not recognized by ipaddress which
            # does not look at host-encoded form
            request.opt.uri_host = None

        # Maybe the URI-Host matches a known forwarding -- in that case, catch that.
        redirected = super().apply_redirection(request)
        if redirected is not None:
            redirected.direction = message.Direction.OUTGOING
            return redirected

        return request
class ForwardProxyWithPooledObservations(ForwardProxy, ProxyWithPooledObservations):
    """Combination of :class:`ForwardProxy` and
    :class:`ProxyWithPooledObservations`; adds nothing of its own."""
class ReverseProxy(Proxy):
    """Deprecated alias of :class:`Proxy` that warns on instantiation."""

    def __init__(self, *args, **kwargs):
        """Emit the deprecation warning, then initialize like :class:`Proxy`."""
        warnings.warn(
            "ReverseProxy has become moot due to proxy operation "
            "changes, just instantiate Proxy and set the appropriate "
            "redirectors",
            DeprecationWarning,
            # Attribute the warning to the code instantiating the class rather
            # than to this constructor.
            stacklevel=2,
        )
        super().__init__(*args, **kwargs)
class ReverseProxyWithPooledObservations(ReverseProxy, ProxyWithPooledObservations):
    """Combination of :class:`ReverseProxy` and
    :class:`ProxyWithPooledObservations`; adds nothing of its own."""
class Redirector:
    """Base class of the rules a proxy consults to decide where a request
    is sent."""

    def apply_redirection(self, request):
        """Return the request to send instead of ``request``, or None if this
        rule does not apply. The base implementation never applies."""
        return None
class NameBasedVirtualHost(Redirector):
    """Redirector sending requests whose Uri-Host equals ``match_name`` to
    ``target``."""

    def __init__(self, match_name, target, rewrite_uri_host=False, use_as_proxy=False):
        """Store the rule.

        With ``rewrite_uri_host``, the Uri-Host option is replaced by the host
        part of ``target``; with ``use_as_proxy``, Proxy-Scheme is set from
        the scheme of the request's remote."""
        self.match_name = match_name
        self.target = target
        self.rewrite_uri_host = rewrite_uri_host
        self.use_as_proxy = use_as_proxy

    def apply_redirection(self, request):
        """Return ``request``, modified in place to go to the target, if its
        Uri-Host matches; otherwise return None.

        Raises CanNotRedirectBecauseOfUnsafeOptions for unsafe unknown
        options, regardless of whether the host matches."""
        raise_unless_safe(request, ())

        if not self._matches(request.opt.uri_host):
            return None

        if self.use_as_proxy:
            request.opt.proxy_scheme = request.remote.scheme
        if self.rewrite_uri_host:
            host, _ = util.hostportsplit(self.target)
            request.opt.uri_host = host
        request.unresolved_remote = self.target
        return request

    def _matches(self, hostname):
        """Tell whether ``hostname`` is exactly the configured name."""
        return hostname == self.match_name
class SubdomainVirtualHost(NameBasedVirtualHost):
    """Variant of :class:`NameBasedVirtualHost` matching any host name that
    ends in ``"." + match_name``."""

    def __init__(self, *args, **kwargs):
        """Accept the arguments of :class:`NameBasedVirtualHost`; raises
        TypeError when ``rewrite_uri_host`` is set."""
        super().__init__(*args, **kwargs)
        if not self.rewrite_uri_host:
            return
        raise TypeError(
            "rewrite_uri_host makes no sense with subdomain virtual hosting"
        )

    def _matches(self, hostname):
        """Tell whether ``hostname`` is a subdomain of the configured name; a
        missing host name never matches."""
        return hostname is not None and hostname.endswith("." + self.match_name)
class UnconditionalRedirector(Redirector):
    """Redirector sending every request to ``target``."""

    def __init__(self, target, use_as_proxy=False):
        """Store the rule; with ``use_as_proxy``, Proxy-Scheme is set from the
        scheme of the request's remote."""
        self.use_as_proxy = use_as_proxy
        self.target = target

    def apply_redirection(self, request):
        """Return ``request``, modified in place to go to the target.

        Raises CanNotRedirectBecauseOfUnsafeOptions for unsafe unknown
        options."""
        raise_unless_safe(request, ())

        if self.use_as_proxy:
            scheme = request.remote.scheme
            request.opt.proxy_scheme = scheme
        request.unresolved_remote = self.target
        return request
class SubresourceVirtualHost(Redirector):
    """Redirector sending requests below a path prefix to ``target``, with
    the prefix removed from the request's Uri-Path."""

    def __init__(self, path, target):
        """Store the rule; ``path`` is an iterable of path segments and is
        kept as a tuple."""
        self.path = tuple(path)
        self.target = target

    def apply_redirection(self, request):
        """Return ``request``, modified in place, if its Uri-Path starts with
        the configured prefix; otherwise return None.

        Raises CanNotRedirectBecauseOfUnsafeOptions for unsafe unknown
        options, regardless of whether the path matches."""
        raise_unless_safe(request, ())

        depth = len(self.path)
        if request.opt.uri_path[:depth] != self.path:
            return None

        request.opt.uri_path = request.opt.uri_path[depth:]
        request.unresolved_remote = self.target
        return request