Coverage for aiocoap/proxy/server.py: 58%
241 statements
« prev ^ index » next coverage.py v7.6.8, created at 2024-11-28 12:34 +0000
« prev ^ index » next coverage.py v7.6.8, created at 2024-11-28 12:34 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5"""Basic implementation of CoAP-CoAP proxying
7This is work in progress and not yet part of the API."""
9import asyncio
10import functools
11import ipaddress
12import logging
13import warnings
15from .. import numbers, interfaces, message, error, util, resource
16from ..numbers import codes
17from ..blockwise import Block1Spool, Block2Cache
18from ..pipe import Pipe
21class CanNotRedirect(error.ConstructionRenderableError):
22 message = "Proxy redirection failed"
25class NoUriSplitting(CanNotRedirect):
26 code = codes.NOT_IMPLEMENTED
27 message = "URI splitting not implemented, please use Proxy-Scheme."
30class IncompleteProxyUri(CanNotRedirect):
31 code = codes.BAD_REQUEST
32 message = "Proxying requires Proxy-Scheme and Uri-Host"
35class NotAForwardProxy(CanNotRedirect):
36 code = codes.PROXYING_NOT_SUPPORTED
37 message = "This is a reverse proxy, not a forward one."
40class NoSuchHostname(CanNotRedirect):
41 code = codes.NOT_FOUND
42 message = ""
45class CanNotRedirectBecauseOfUnsafeOptions(CanNotRedirect):
46 code = codes.BAD_OPTION
48 def __init__(self, options):
49 self.message = "Unsafe options in request: %s" % (
50 ", ".join(str(o.number) for o in options)
51 )
54def raise_unless_safe(request, known_options):
55 """Raise a BAD_OPTION CanNotRedirect unless all options in request are
56 safe to forward or known"""
58 known_options = set(known_options).union(
59 {
60 # it is expected that every proxy is aware of these options even though
61 # one of them often doesn't need touching
62 numbers.OptionNumber.URI_HOST,
63 numbers.OptionNumber.URI_PORT,
64 numbers.OptionNumber.URI_PATH,
65 numbers.OptionNumber.URI_QUERY,
66 # handled by the Context
67 numbers.OptionNumber.BLOCK1,
68 numbers.OptionNumber.BLOCK2,
69 # handled by the proxy resource
70 numbers.OptionNumber.OBSERVE,
71 }
72 )
74 unsafe_options = [
75 o
76 for o in request.opt.option_list()
77 if o.number.is_unsafe() and o.number not in known_options
78 ]
79 if unsafe_options:
80 raise CanNotRedirectBecauseOfUnsafeOptions(unsafe_options)
83class Proxy(interfaces.Resource):
84 # other than in special cases, we're trying to be transparent wrt blockwise transfers
85 interpret_block_options = False
87 def __init__(self, outgoing_context, logger=None):
88 super().__init__()
89 # Provide variables for render_to_pipe
90 # FIXME this is copied from aiocoap.resource's __init__ -- but on the
91 # long run proxying shouldn't rely on that anyway but implement
92 # render_to_pipe right on its own
93 self._block1 = Block1Spool()
94 self._block2 = Block2Cache()
96 self.outgoing_context = outgoing_context
97 self.log = logger or logging.getLogger("proxy")
99 self._redirectors = []
101 def add_redirector(self, redirector):
102 self._redirectors.append(redirector)
104 def apply_redirection(self, request):
105 for r in self._redirectors:
106 result = r.apply_redirection(request)
107 if result is not None:
108 return result
109 return None
111 async def needs_blockwise_assembly(self, request):
112 return self.interpret_block_options
114 async def render(self, request):
115 # FIXME i'd rather let the application do with the message whatever it
116 # wants. everything the responder needs of the request should be
117 # extracted beforehand.
118 request = request.copy(mid=None, token=None)
120 try:
121 request = self.apply_redirection(request)
122 except CanNotRedirect as e:
123 return e.to_message()
125 if request is None:
126 response = await super().render(request)
127 if response is None:
128 raise IncompleteProxyUri("No matching proxy rule")
129 return response
131 try:
132 response = await self.outgoing_context.request(
133 request, handle_blockwise=self.interpret_block_options
134 ).response
135 except error.TimeoutError:
136 return message.Message(code=numbers.codes.GATEWAY_TIMEOUT)
138 raise_unless_safe(response, ())
140 response.mtype = None
141 response.mid = None
142 response.remote = None
143 response.token = None
145 return response
147 # Not inheriting from them because we do *not* want the .render() in the
148 # resolution tree (it can't deal with None requests, which are used among
149 # proxy implementations)
150 async def render_to_pipe(self, pipe: Pipe) -> None:
151 await resource.Resource.render_to_pipe(self, pipe) # type: ignore
154class ProxyWithPooledObservations(Proxy, interfaces.ObservableResource):
155 def __init__(self, outgoing_context, logger=None):
156 super(ProxyWithPooledObservations, self).__init__(outgoing_context, logger)
158 self._outgoing_observations = {}
160 @staticmethod
161 def _cache_key(request):
162 return request.get_cache_key([numbers.optionnumbers.OptionNumber.OBSERVE])
164 def _peek_observation_for(self, request):
165 """Return the augmented request (see _get_obervation_for) towards a
166 resource, or raise KeyError"""
167 cachekey = self._cache_key(request)
169 return self._outgoing_observations[cachekey]
171 def _get_observation_for(self, request):
172 """Return an existing augmented request towards a resource or create one.
174 An augmented request is an observation request that has some additional
175 properties (__users, __cachekey, __latest_response), which are used in
176 ProxyWithPooledObservations to immediately serve responses from
177 observed resources, and to tear the observations down again."""
179 # see ProxiedResource.render
180 request = request.copy(mid=None, remote=None, token=None)
181 request = self.apply_redirection(request)
183 cachekey = self._cache_key(request)
185 try:
186 obs = self._outgoing_observations[cachekey]
187 except KeyError:
188 obs = self._outgoing_observations[cachekey] = self.outgoing_context.request(
189 request
190 )
191 obs.__users = set()
192 obs.__cachekey = cachekey
193 obs.__latest_response = None # this becomes a cached response right after the .response comes in (so only use this after waiting for it), and gets updated when new responses arrive.
195 def when_first_request_done(result, obs=obs):
196 obs.__latest_response = result.result()
198 obs.response.add_done_callback(when_first_request_done)
200 def cb(incoming_message, obs=obs):
201 self.log.info(
202 "Received incoming message %r, relaying it to %d clients",
203 incoming_message,
204 len(obs.__users),
205 )
206 obs.__latest_response = incoming_message
207 for observationserver in set(obs.__users):
208 observationserver.trigger(incoming_message.copy())
210 obs.observation.register_callback(cb)
212 def eb(exception, obs=obs):
213 if obs.__users:
214 code = numbers.codes.INTERNAL_SERVER_ERROR
215 payload = b""
216 if isinstance(exception, error.RenderableError):
217 code = exception.code
218 payload = exception.message.encode("ascii")
219 self.log.debug(
220 "Received error %r, which did not lead to unregistration of the clients. Actively deregistering them with %s %r.",
221 exception,
222 code,
223 payload,
224 )
225 for u in list(obs.__users):
226 u.trigger(message.Message(code=code, payload=payload))
227 if obs.__users:
228 self.log.error(
229 "Observations survived sending them an error message."
230 )
231 else:
232 self.log.debug(
233 "Received error %r, but that seems to have been passed on cleanly to the observers as they are gone by now.",
234 exception,
235 )
237 obs.observation.register_errback(eb)
239 return obs
241 def _add_observation_user(self, clientobservationrequest, serverobservation):
242 clientobservationrequest.__users.add(serverobservation)
244 def _remove_observation_user(self, clientobservationrequest, serverobservation):
245 clientobservationrequest.__users.remove(serverobservation)
246 # give the request that just cancelled time to be dealt with before
247 # dropping the __latest_response
248 asyncio.get_event_loop().call_soon(
249 self._consider_dropping, clientobservationrequest
250 )
252 def _consider_dropping(self, clientobservationrequest):
253 if not clientobservationrequest.__users:
254 self.log.debug(
255 "Last client of observation went away, deregistering with server."
256 )
257 self._outgoing_observations.pop(clientobservationrequest.__cachekey)
258 if not clientobservationrequest.observation.cancelled:
259 clientobservationrequest.observation.cancel()
261 async def add_observation(self, request, serverobservation):
262 """As ProxiedResource is intended to be just the proxy's interface
263 toward the Context, accepting observations is handled here, where the
264 observations handling can be defined by the subclasses."""
266 try:
267 clientobservationrequest = self._get_observation_for(request)
268 except CanNotRedirect:
269 pass # just don't accept the observation, the rest will be taken care of at rendering
270 else:
271 self._add_observation_user(clientobservationrequest, serverobservation)
272 serverobservation.accept(
273 functools.partial(
274 self._remove_observation_user,
275 clientobservationrequest,
276 serverobservation,
277 )
278 )
280 async def render(self, request):
281 # FIXME this is evaulated twice in the implementation (once here, but
282 # unless it's an observation what matters is inside the super call),
283 # maybe this needs to hook in differently than by subclassing and
284 # calling super.
285 self.log.info("render called")
286 redirected_request = request.copy()
288 try:
289 redirected_request = self.apply_redirection(redirected_request)
290 if redirected_request is None:
291 return await super().render(request)
292 clientobservationrequest = self._peek_observation_for(redirected_request)
293 except (KeyError, CanNotRedirect) as e:
294 if not isinstance(e, CanNotRedirect) and request.opt.observe is not None:
295 self.log.warning(
296 "No matching observation found: request is %r (cache key %r), outgoing observations %r",
297 redirected_request,
298 self._cache_key(redirected_request),
299 self._outgoing_observations,
300 )
302 return message.Message(
303 code=numbers.codes.BAD_OPTION,
304 payload="Observe option can not be proxied without active observation.".encode(
305 "utf8"
306 ),
307 )
308 self.log.debug(
309 "Request is not an observation or can't be proxied, passing it on to regular proxying mechanisms."
310 )
311 return await super(ProxyWithPooledObservations, self).render(request)
312 else:
313 self.log.info(
314 "Serving request using latest cached response of %r",
315 clientobservationrequest,
316 )
317 await clientobservationrequest.response
318 cached_response = clientobservationrequest.__latest_response
319 cached_response.mid = None
320 cached_response.token = None
321 cached_response.remote = None
322 cached_response.mtype = None
323 return cached_response
326class ForwardProxy(Proxy):
327 def apply_redirection(self, request):
328 request = request.copy()
329 if request.opt.proxy_uri is not None:
330 raise NoUriSplitting
331 if request.opt.proxy_scheme is None:
332 return super().apply_redirection(request)
333 if request.opt.uri_host is None:
334 raise IncompleteProxyUri
336 raise_unless_safe(
337 request,
338 (
339 numbers.OptionNumber.PROXY_SCHEME,
340 numbers.OptionNumber.URI_HOST,
341 numbers.OptionNumber.URI_PORT,
342 ),
343 )
345 request.remote = message.UndecidedRemote(
346 request.opt.proxy_scheme,
347 util.hostportjoin(request.opt.uri_host, request.opt.uri_port),
348 )
349 request.opt.proxy_scheme = None
350 request.opt.uri_port = None
351 forward_host = request.opt.uri_host
352 try:
353 # I'd prefer to not do if-by-try, but the ipaddress doesn't seem to
354 # offer any other choice
355 ipaddress.ip_address(request.opt.uri_host)
357 warnings.warn(
358 "URI-Host looks like IPv6 but has no square "
359 "brackets. This is deprecated, see "
360 "https://github.com/chrysn/aiocoap/issues/216",
361 DeprecationWarning,
362 )
363 except ValueError:
364 pass
365 else:
366 request.opt.uri_host = None
367 if forward_host.startswith("["):
368 # IPv6 or future literals are not recognized by ipaddress which
369 # does not look at host-encoded form
370 request.opt.uri_host = None
372 # Maybe the URI-Host matches a known forwarding -- in that case, catch that.
373 redirected = super(ForwardProxy, self).apply_redirection(request)
374 if redirected is not None:
375 return redirected
377 return request
380class ForwardProxyWithPooledObservations(ForwardProxy, ProxyWithPooledObservations):
381 pass
384class ReverseProxy(Proxy):
385 def __init__(self, *args, **kwargs):
386 import warnings
388 warnings.warn(
389 "ReverseProxy has become moot due to proxy operation "
390 "changes, just instanciate Proxy and set the appropriate "
391 "redirectors",
392 DeprecationWarning,
393 stacklevel=1,
394 )
395 super().__init__(*args, **kwargs)
398class ReverseProxyWithPooledObservations(ReverseProxy, ProxyWithPooledObservations):
399 pass
402class Redirector:
403 def apply_redirection(self, request):
404 return None
407class NameBasedVirtualHost(Redirector):
408 def __init__(self, match_name, target, rewrite_uri_host=False, use_as_proxy=False):
409 self.match_name = match_name
410 self.target = target
411 self.rewrite_uri_host = rewrite_uri_host
412 self.use_as_proxy = use_as_proxy
414 def apply_redirection(self, request):
415 raise_unless_safe(request, ())
417 if self._matches(request.opt.uri_host):
418 if self.use_as_proxy:
419 request.opt.proxy_scheme = request.remote.scheme
420 if self.rewrite_uri_host:
421 request.opt.uri_host, _ = util.hostportsplit(self.target)
422 request.unresolved_remote = self.target
423 return request
425 def _matches(self, hostname):
426 return hostname == self.match_name
429class SubdomainVirtualHost(NameBasedVirtualHost):
430 def __init__(self, *args, **kwargs):
431 super().__init__(*args, **kwargs)
432 if self.rewrite_uri_host:
433 raise TypeError(
434 "rewrite_uri_host makes no sense with subdomain virtual hosting"
435 )
437 def _matches(self, hostname):
438 return hostname.endswith("." + self.match_name)
441class UnconditionalRedirector(Redirector):
442 def __init__(self, target, use_as_proxy=False):
443 self.target = target
444 self.use_as_proxy = use_as_proxy
446 def apply_redirection(self, request):
447 raise_unless_safe(request, ())
449 if self.use_as_proxy:
450 request.opt.proxy_scheme = request.remote.scheme
451 request.unresolved_remote = self.target
452 return request
455class SubresourceVirtualHost(Redirector):
456 def __init__(self, path, target):
457 self.path = tuple(path)
458 self.target = target
460 def apply_redirection(self, request):
461 raise_unless_safe(request, ())
463 if self.path == request.opt.uri_path[: len(self.path)]:
464 request.opt.uri_path = request.opt.uri_path[len(self.path) :]
465 request.unresolved_remote = self.target
466 return request