Coverage for aiocoap / proxy / server.py: 58%
248 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 12:28 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 12:28 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5"""Basic implementation of CoAP-CoAP proxying
7This is work in progress and not yet part of the API."""
9import asyncio
10import functools
11import ipaddress
12import logging
13import warnings
15from .. import numbers, interfaces, message, error, util, resource
16from ..numbers import codes
17from ..blockwise import Block1Spool, Block2Cache
18from ..pipe import Pipe
19from ..util import DeprecationWarning
class CanNotRedirect(error.ConstructionRenderableError):
    """Base class of the errors raised while working out where to proxy to.

    ``Proxy.render`` catches this type around its redirection step and
    answers with the error's ``to_message()``.
    """

    message = "Proxy redirection failed"
class NoUriSplitting(CanNotRedirect):
    """Redirection error for requests that carry a Proxy-Uri option.

    ``ForwardProxy.apply_redirection`` raises it because it only works with
    the split form (Proxy-Scheme plus Uri-* options).
    """

    message = "URI splitting not implemented, please use Proxy-Scheme."
    code = codes.NOT_IMPLEMENTED
class IncompleteProxyUri(CanNotRedirect):
    """Redirection error for requests that lack the options needed to proxy.

    Raised by ``ForwardProxy.apply_redirection`` when Proxy-Scheme is set but
    Uri-Host is not, and by ``Proxy.render`` when no proxy rule matched.
    """

    message = "Proxying requires Proxy-Scheme and Uri-Host"
    code = codes.BAD_REQUEST
class NotAForwardProxy(CanNotRedirect):
    """Redirection error telling the client that forward proxying is not
    offered here (rendered with the PROXYING_NOT_SUPPORTED code)."""

    message = "This is a reverse proxy, not a forward one."
    code = codes.PROXYING_NOT_SUPPORTED
class NoSuchHostname(CanNotRedirect):
    """Redirection error rendered as NOT_FOUND with an empty message."""

    message = ""
    code = codes.NOT_FOUND
class CanNotRedirectBecauseOfUnsafeOptions(CanNotRedirect):
    """Redirection error listing the option numbers that blocked forwarding.

    Rendered with the BAD_OPTION code; the message is built per instance.
    """

    code = codes.BAD_OPTION

    def __init__(self, options):
        """Build the message from the ``number`` of each option in
        ``options``, comma separated."""
        listed_numbers = ", ".join(str(option.number) for option in options)
        self.message = "Unsafe options in request: %s" % listed_numbers
def raise_unless_safe(request, known_options):
    """Reject a message carrying unsafe options nobody declared as understood.

    ``known_options`` is an iterable of option numbers the caller knows how to
    handle; a fixed set of options that every proxy deals with is added to it.

    Raises ``CanNotRedirectBecauseOfUnsafeOptions`` (a BAD_OPTION
    ``CanNotRedirect``) naming every option of ``request`` whose number
    reports itself as unsafe and is not in that combined set. Returns ``None``
    otherwise.
    """

    option_number = numbers.OptionNumber
    understood = {
        # it is expected that every proxy is aware of these options even though
        # one of them often doesn't need touching
        option_number.URI_HOST,
        option_number.URI_PORT,
        option_number.URI_PATH,
        option_number.URI_QUERY,
        # handled by the Context
        option_number.BLOCK1,
        option_number.BLOCK2,
        # handled by the proxy resource
        option_number.OBSERVE,
    }
    understood.update(known_options)

    offending = []
    for option in request.opt.option_list():
        if not option.number.is_unsafe():
            continue
        if option.number in understood:
            continue
        offending.append(option)

    if offending:
        raise CanNotRedirectBecauseOfUnsafeOptions(offending)
class Proxy(interfaces.Resource):
    """Resource that relays requests to other servers.

    Where a request goes is decided by the redirectors registered through
    ``add_redirector``; the relayed request is sent through the
    ``outgoing_context`` given at construction.
    """

    # other than in special cases, we're trying to be transparent wrt blockwise transfers
    interpret_block_options = False

    def __init__(self, outgoing_context, logger=None):
        """Set up a proxy that sends through ``outgoing_context``.

        ``logger`` defaults to the logger named "proxy".
        """
        super().__init__()

        self.outgoing_context = outgoing_context
        self.log = logger or logging.getLogger("proxy")
        self._redirectors = []

        # State used by render_to_pipe.
        # FIXME this is copied from aiocoap.resource's __init__ -- but on the
        # long run proxying shouldn't rely on that anyway but implement
        # render_to_pipe right on its own
        self._block1 = Block1Spool()
        self._block2 = Block2Cache()

    def add_redirector(self, redirector):
        """Append ``redirector`` to the rules consulted by
        ``apply_redirection``."""
        self._redirectors.append(redirector)

    def apply_redirection(self, request):
        """Ask each redirector in registration order to redirect ``request``.

        Returns the first result that is not ``None``, or ``None`` when no
        redirector produced one.
        """
        for redirector in self._redirectors:
            outcome = redirector.apply_redirection(request)
            if outcome is None:
                continue
            return outcome
        return None

    async def needs_blockwise_assembly(self, request):
        """Report ``interpret_block_options`` regardless of the request."""
        return self.interpret_block_options

    async def render(self, request):
        """Relay ``request`` and return the response for the original client.

        A ``CanNotRedirect`` raised while redirecting is answered with the
        error's own message. When no redirection applies, the parent class is
        asked to render; ``IncompleteProxyUri`` is raised if that yields
        ``None``. A timeout of the outgoing request is answered with
        GATEWAY_TIMEOUT. ``raise_unless_safe`` is applied to the response
        before it is handed back.
        """
        # FIXME i'd rather let the application do with the message whatever it
        # wants. everything the responder needs of the request should be
        # extracted beforehand.
        request = request.copy(mid=None, token=None)

        try:
            redirected = self.apply_redirection(request)
        except CanNotRedirect as redirect_error:
            return redirect_error.to_message()

        if redirected is None:
            fallback = await super().render(request)
            if fallback is not None:
                return fallback
            raise IncompleteProxyUri("No matching proxy rule")

        redirected.direction = message.Direction.OUTGOING

        try:
            outgoing = self.outgoing_context.request(
                redirected, handle_blockwise=self.interpret_block_options
            )
            response = await outgoing.response
        except error.TimeoutError:
            return message.Message(code=numbers.codes.GATEWAY_TIMEOUT)

        raise_unless_safe(response, ())

        # Clear what belonged to the exchange with the origin server so the
        # message can be sent out again as our own response.
        response.mtype = None
        response.mid = None
        response.remote = None
        response.direction = message.Direction.OUTGOING
        response.token = None

        return response

    # Not inheriting from them because we do *not* want the .render() in the
    # resolution tree (it can't deal with None requests, which are used among
    # proxy implementations)
    async def render_to_pipe(self, pipe: Pipe) -> None:
        """Delegate to ``resource.Resource.render_to_pipe`` with this object
        as ``self``."""
        await resource.Resource.render_to_pipe(self, pipe)  # type: ignore
class ProxyWithPooledObservations(Proxy, interfaces.ObservableResource):
    """Proxy that shares one outgoing observation among all clients observing
    the same redirected request.

    Outgoing observation requests are kept in ``_outgoing_observations`` under
    the key computed by ``_cache_key``. Each carries the set of server-side
    observations it feeds and the most recent response received, which
    ``render`` serves to requests that match an active observation.
    """

    def __init__(self, outgoing_context, logger=None):
        """Set up the proxy (see ``Proxy.__init__``) with an empty pool of
        outgoing observations."""
        super().__init__(outgoing_context, logger)

        # cache key -> augmented outgoing request (see _get_observation_for)
        self._outgoing_observations = {}

    @staticmethod
    def _cache_key(request):
        """Return the key under which ``request`` is pooled.

        This is ``request.get_cache_key`` called with the Observe option
        number (presumably the options to leave out of the key -- confirm
        against the Message API).
        """
        return request.get_cache_key([numbers.optionnumbers.OptionNumber.OBSERVE])

    def _peek_observation_for(self, request):
        """Return the augmented request (see _get_observation_for) towards a
        resource, or raise KeyError"""
        cachekey = self._cache_key(request)

        return self._outgoing_observations[cachekey]

    def _get_observation_for(self, request):
        """Return an existing augmented request towards a resource or create one.

        An augmented request is an observation request that has some additional
        properties (__users, __cachekey, __latest_response), which are used in
        ProxyWithPooledObservations to immediately serve responses from
        observed resources, and to tear the observations down again.

        Raises ``CanNotRedirect`` when the request can not be redirected; this
        includes the case of no redirection rule matching, which is reported
        as ``IncompleteProxyUri``.
        """

        # Work on a copy stripped of the details of the incoming exchange, as
        # Proxy.render does before redirecting.
        request = request.copy(mid=None, remote=None, token=None)
        request = self.apply_redirection(request)
        if request is None:
            # No rule matched. Without this check the cache key computation
            # below would fail with an AttributeError on None; raising a
            # CanNotRedirect makes add_observation decline the observation and
            # leaves the answer to render.
            raise IncompleteProxyUri("No matching proxy rule")

        cachekey = self._cache_key(request)

        try:
            obs = self._outgoing_observations[cachekey]
        except KeyError:
            obs = self._outgoing_observations[cachekey] = self.outgoing_context.request(
                request
            )
            obs.__users = set()
            obs.__cachekey = cachekey
            # this becomes a cached response right after the .response comes
            # in (so only use this after waiting for it), and gets updated
            # when new responses arrive.
            obs.__latest_response = None

            def when_first_request_done(result, obs=obs):
                # `result` is the completed response future
                obs.__latest_response = result.result()

            obs.response.add_done_callback(when_first_request_done)

            def cb(incoming_message, obs=obs):
                self.log.info(
                    "Received incoming message %r, relaying it to %d clients",
                    incoming_message,
                    len(obs.__users),
                )
                obs.__latest_response = incoming_message
                # iterate over a snapshot, as triggering may change the set
                for observationserver in set(obs.__users):
                    observationserver.trigger(incoming_message.copy())

            obs.observation.register_callback(cb)

            def eb(exception, obs=obs):
                if obs.__users:
                    code = numbers.codes.INTERNAL_SERVER_ERROR
                    payload = b""
                    if isinstance(exception, error.RenderableError):
                        code = exception.code
                        # UTF-8 rather than ASCII: a message with non-ASCII
                        # characters would otherwise raise here and keep the
                        # observers below from being notified.
                        payload = exception.message.encode("utf8")
                    self.log.debug(
                        "Received error %r, which did not lead to unregistration of the clients. Actively deregistering them with %s %r.",
                        exception,
                        code,
                        payload,
                    )
                    for u in list(obs.__users):
                        u.trigger(message.Message(code=code, payload=payload))
                    if obs.__users:
                        self.log.error(
                            "Observations survived sending them an error message."
                        )
                else:
                    self.log.debug(
                        "Received error %r, but that seems to have been passed on cleanly to the observers as they are gone by now.",
                        exception,
                    )

            obs.observation.register_errback(eb)

        return obs

    def _add_observation_user(self, clientobservationrequest, serverobservation):
        """Register ``serverobservation`` as fed by the outgoing request."""
        clientobservationrequest.__users.add(serverobservation)

    def _remove_observation_user(self, clientobservationrequest, serverobservation):
        """Unregister ``serverobservation`` and schedule a check on whether the
        outgoing observation is still needed."""
        clientobservationrequest.__users.remove(serverobservation)
        # give the request that just cancelled time to be dealt with before
        # dropping the __latest_response
        asyncio.get_running_loop().call_soon(
            self._consider_dropping, clientobservationrequest
        )

    def _consider_dropping(self, clientobservationrequest):
        """Remove the outgoing observation from the pool and cancel it if no
        server-side observation uses it any more."""
        if not clientobservationrequest.__users:
            self.log.debug(
                "Last client of observation went away, deregistering with server."
            )
            self._outgoing_observations.pop(clientobservationrequest.__cachekey)
            if not clientobservationrequest.observation.cancelled:
                clientobservationrequest.observation.cancel()

    async def add_observation(self, request, serverobservation):
        """As ProxiedResource is intended to be just the proxy's interface
        toward the Context, accepting observations is handled here, where the
        observations handling can be defined by the subclasses.

        The observation is accepted only if the request can be redirected; it
        is then attached to the pooled outgoing observation, with a
        cancellation callback that detaches it again.
        """

        try:
            clientobservationrequest = self._get_observation_for(request)
        except CanNotRedirect:
            pass  # just don't accept the observation, the rest will be taken care of at rendering
        else:
            self._add_observation_user(clientobservationrequest, serverobservation)
            serverobservation.accept(
                functools.partial(
                    self._remove_observation_user,
                    clientobservationrequest,
                    serverobservation,
                )
            )

    async def render(self, request):
        """Answer ``request`` from a pooled observation where one is active.

        Requests without a redirection, and requests that have no pooled
        observation and no Observe option, go through ``Proxy.render``. A
        request with an Observe option but without a pooled observation is
        answered with BAD_OPTION.
        """
        # FIXME this is evaluated twice in the implementation (once here, but
        # unless it's an observation what matters is inside the super call),
        # maybe this needs to hook in differently than by subclassing and
        # calling super.
        self.log.info("render called")
        redirected_request = request.copy()

        try:
            redirected_request = self.apply_redirection(redirected_request)
            if redirected_request is None:
                return await super().render(request)
            clientobservationrequest = self._peek_observation_for(redirected_request)
        except (KeyError, CanNotRedirect) as e:
            if not isinstance(e, CanNotRedirect) and request.opt.observe is not None:
                self.log.warning(
                    "No matching observation found: request is %r (cache key %r), outgoing observations %r",
                    redirected_request,
                    self._cache_key(redirected_request),
                    self._outgoing_observations,
                )

                return message.Message(
                    code=numbers.codes.BAD_OPTION,
                    payload="Observe option can not be proxied without active observation.".encode(
                        "utf8"
                    ),
                )
            self.log.debug(
                "Request is not an observation or can't be proxied, passing it on to regular proxying mechanisms."
            )
            return await super().render(request)
        else:
            self.log.info(
                "Serving request using latest cached response of %r",
                clientobservationrequest,
            )
            # the cached response is only populated once the first response
            # has arrived
            await clientobservationrequest.response
            # NOTE(review): the cached message object itself is modified and
            # returned, so later requests served from this entry get the same
            # object -- confirm that sharing it is intended.
            cached_response = clientobservationrequest.__latest_response
            cached_response.mid = None
            cached_response.token = None
            cached_response.remote = None
            cached_response.mtype = None
            return cached_response
class ForwardProxy(Proxy):
    """Proxy that takes its target from the Proxy-Scheme and Uri-Host /
    Uri-Port options of the request."""

    def apply_redirection(self, request):
        """Return a copy of ``request`` addressed to the server it names.

        Requests without Proxy-Scheme are handed to the redirectors of the
        base class unchanged. Otherwise the remote of the copy is set from
        Proxy-Scheme, Uri-Host and Uri-Port; Proxy-Scheme and Uri-Port are
        removed, and Uri-Host is removed too when it is an IP literal. The
        base class redirectors then get a chance to override the result.

        Raises ``NoUriSplitting`` if Proxy-Uri is present,
        ``IncompleteProxyUri`` if Proxy-Scheme is present without Uri-Host,
        and ``CanNotRedirectBecauseOfUnsafeOptions`` for unsafe options other
        than the ones handled here.
        """
        request = request.copy()
        if request.opt.proxy_uri is not None:
            raise NoUriSplitting
        if request.opt.proxy_scheme is None:
            return super().apply_redirection(request)
        if request.opt.uri_host is None:
            raise IncompleteProxyUri

        raise_unless_safe(
            request,
            (
                numbers.OptionNumber.PROXY_SCHEME,
                numbers.OptionNumber.URI_HOST,
                numbers.OptionNumber.URI_PORT,
            ),
        )

        request.remote = message.UndecidedRemote(
            request.opt.proxy_scheme,
            util.hostportjoin(request.opt.uri_host, request.opt.uri_port),
        )
        request.opt.proxy_scheme = None
        request.opt.uri_port = None
        forward_host = request.opt.uri_host
        try:
            # I'd prefer to not do if-by-try, but the ipaddress doesn't seem to
            # offer any other choice
            literal = ipaddress.ip_address(forward_host)
        except ValueError:
            pass
        else:
            # Only a bare IPv6 literal is the deprecated form; an IPv4 literal
            # is a regular Uri-Host value and must not trigger the warning.
            if literal.version == 6:
                warnings.warn(
                    "URI-Host looks like IPv6 but has no square "
                    "brackets. This is deprecated, see "
                    "https://github.com/chrysn/aiocoap/issues/216",
                    DeprecationWarning,
                )
            request.opt.uri_host = None
        if forward_host.startswith("["):
            # IPv6 or future literals are not recognized by ipaddress which
            # does not look at host-encoded form
            request.opt.uri_host = None

        # Maybe the URI-Host matches a known forwarding -- in that case, catch that.
        redirected = super().apply_redirection(request)
        if redirected is not None:
            redirected.direction = message.Direction.OUTGOING
            return redirected

        return request
class ForwardProxyWithPooledObservations(ForwardProxy, ProxyWithPooledObservations):
    """Combination of ``ForwardProxy`` and ``ProxyWithPooledObservations``;
    adds nothing of its own."""
class ReverseProxy(Proxy):
    """Deprecated variant of ``Proxy`` that only adds a warning on
    construction."""

    def __init__(self, *args, **kwargs):
        """Warn about the deprecation, then initialize like ``Proxy``.

        All arguments are passed on to ``Proxy.__init__``.
        """
        warnings.warn(
            "ReverseProxy has become moot due to proxy operation "
            "changes, just instantiate Proxy and set the appropriate "
            "redirectors",
            DeprecationWarning,
            # point at the code instantiating the class rather than at this
            # constructor
            stacklevel=2,
        )
        super().__init__(*args, **kwargs)
class ReverseProxyWithPooledObservations(ReverseProxy, ProxyWithPooledObservations):
    """Combination of ``ReverseProxy`` and ``ProxyWithPooledObservations``;
    adds nothing of its own."""
class Redirector:
    """Base class for redirection rules; this one never redirects."""

    def apply_redirection(self, request):
        """Return the redirected request, or ``None`` if the rule does not
        apply. The base implementation always returns ``None``."""
        return None
class NameBasedVirtualHost(Redirector):
    """Redirector for requests whose Uri-Host equals a configured name."""

    def __init__(self, match_name, target, rewrite_uri_host=False, use_as_proxy=False):
        """Configure the rule.

        ``match_name`` is compared against the request's Uri-Host and
        ``target`` becomes the request's ``unresolved_remote`` on a match.
        With ``rewrite_uri_host`` the Uri-Host is replaced by the host part of
        ``target``; with ``use_as_proxy`` Proxy-Scheme is set from the scheme
        of the request's remote.
        """
        self.use_as_proxy = use_as_proxy
        self.rewrite_uri_host = rewrite_uri_host
        self.target = target
        self.match_name = match_name

    def apply_redirection(self, request):
        """Rewrite and return ``request`` if its Uri-Host matches, else return
        ``None``.

        ``raise_unless_safe`` is applied before matching, so unhandled unsafe
        options raise even for requests this rule would not match.
        """
        raise_unless_safe(request, ())

        if not self._matches(request.opt.uri_host):
            return None

        if self.use_as_proxy:
            request.opt.proxy_scheme = request.remote.scheme
        if self.rewrite_uri_host:
            target_host, _port = util.hostportsplit(self.target)
            request.opt.uri_host = target_host
        request.unresolved_remote = self.target
        return request

    def _matches(self, hostname):
        """Tell whether ``hostname`` equals the configured name."""
        return hostname == self.match_name
class SubdomainVirtualHost(NameBasedVirtualHost):
    """Variant of ``NameBasedVirtualHost`` matching any host name below the
    configured name rather than the name itself."""

    def __init__(self, *args, **kwargs):
        """Accept the arguments of ``NameBasedVirtualHost``.

        Raises ``TypeError`` if ``rewrite_uri_host`` ends up set.
        """
        super().__init__(*args, **kwargs)
        if not self.rewrite_uri_host:
            return
        raise TypeError(
            "rewrite_uri_host makes no sense with subdomain virtual hosting"
        )

    def _matches(self, hostname):
        """Tell whether ``hostname`` ends in a dot followed by the configured
        name; a missing host name never matches."""
        return False if hostname is None else hostname.endswith("." + self.match_name)
class UnconditionalRedirector(Redirector):
    """Redirector that sends every request to one target."""

    def __init__(self, target, use_as_proxy=False):
        """Remember ``target`` and whether Proxy-Scheme is to be set on
        redirected requests (``use_as_proxy``)."""
        self.use_as_proxy = use_as_proxy
        self.target = target

    def apply_redirection(self, request):
        """Point ``request`` at the target and return it.

        Raises through ``raise_unless_safe`` if the request carries unsafe
        options that are not handled.
        """
        raise_unless_safe(request, ())

        if self.use_as_proxy:
            # carry over the scheme the request currently has on its remote
            request.opt.proxy_scheme = request.remote.scheme
        request.unresolved_remote = self.target
        return request
class SubresourceVirtualHost(Redirector):
    """Redirector for requests whose Uri-Path starts with a configured
    prefix."""

    def __init__(self, path, target):
        """Store ``path`` (converted to a tuple of segments) and ``target``."""
        self.target = target
        self.path = tuple(path)

    def apply_redirection(self, request):
        """Strip the prefix from the Uri-Path of a matching ``request``, point
        it at the target and return it; return ``None`` if the path does not
        start with the prefix.

        ``raise_unless_safe`` is applied before the path is looked at.
        """
        raise_unless_safe(request, ())

        prefix_length = len(self.path)
        segments = request.opt.uri_path
        if self.path != segments[:prefix_length]:
            return None

        request.opt.uri_path = segments[prefix_length:]
        request.unresolved_remote = self.target
        return request