Coverage for aiocoap/proxy/client.py: 86%

21 statements  

« prev     ^ index     » next       coverage.py v7.6.8, created at 2024-11-28 12:34 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5import warnings 

6 

7from ..message import UndecidedRemote 

8from .. import interfaces 

9 

10from ..util import hostportsplit 

11 

12 

13class ProxyForwarder(interfaces.RequestProvider): 

14 """Object that behaves like a Context but only provides the request 

15 function and forwards all messages to a proxy. 

16 

17 This is not a proxy itself, it is just the interface for an external 

18 one.""" 

19 

20 def __init__(self, proxy_address, context): 

21 if "://" not in proxy_address: 

22 warnings.warn( 

23 "Proxy addresses without scheme are deprecated, " 

24 "please specify like `coap://host`, `coap+tcp://ip:port` " 

25 "etc.", 

26 DeprecationWarning, 

27 ) 

28 proxy_address = "coap://" + proxy_address 

29 

30 self.proxy_address = UndecidedRemote.from_pathless_uri(proxy_address) 

31 self.context = context 

32 

33 proxy = property(lambda self: self._proxy) 

34 

35 def request(self, message, **kwargs): 

36 if not isinstance(message.remote, UndecidedRemote): 

37 raise ValueError( 

38 "Message already has a configured " 

39 "remote, set .opt.uri_{host,port} instead of remote" 

40 ) 

41 host, port = hostportsplit(message.remote.hostinfo) 

42 message.opt.uri_port = port 

43 message.opt.uri_host = host 

44 message.opt.proxy_scheme = self.proxy_address.scheme 

45 message.remote = self.proxy_address 

46 

47 return self.context.request(message)