Coverage for aiocoap/transports/generic_udp.py: 84%

37 statements  

« prev     ^ index     » next       coverage.py v7.6.8, created at 2024-11-28 12:34 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5from aiocoap import interfaces, error, util 

6from aiocoap import COAP_PORT, Message 

7 

8 

9class GenericMessageInterface(interfaces.MessageInterface): 

10 """GenericMessageInterface is not a standalone implementation of a 

11 message inteface. It does implement everything between the MessageInterface 

12 and a not yet fully specified interface of "bound UDP sockets". 

13 

14 It delegates sending through the address objects (which persist through 

15 some time, given this is some kind of bound-socket scenario). 

16 

17 The user must: 

18 * set up a ._pool after construction with a shutdown and a connect method 

19 * provide their addresses with a send(bytes) method 

20 * pass incoming data to the _received_datagram and _received_exception methods 

21 """ 

22 

23 def __init__(self, mman: interfaces.MessageManager, log, loop): 

24 self._mman = mman 

25 self._log = log 

26 self._loop = loop 

27 

28 # Callbacks to be hooked up by the user of the class; feed data on to the 

29 # message manager 

30 

31 def _received_datagram(self, address, datagram): 

32 try: 

33 message = Message.decode(datagram, remote=address) 

34 except error.UnparsableMessage: 

35 self._log.warning("Ignoring unparsable message from %s", address) 

36 return 

37 

38 self._mman.dispatch_message(message) 

39 

40 def _received_exception(self, address, exception): 

41 self._mman.dispatch_error(exception, address) 

42 

43 # Implementations of MessageInterface 

44 

45 def send(self, message): 

46 if self._mman is None: 

47 self._log.info( 

48 "Not sending message %r: transport is already shutting down.", message 

49 ) 

50 else: 

51 message.remote.send(message.encode()) 

52 

53 async def shutdown(self): 

54 await self._pool.shutdown() 

55 self._mman = None 

56 

57 async def determine_remote(self, request): 

58 if request.requested_scheme not in ("coap", None): 

59 return None 

60 

61 if request.unresolved_remote is not None: 

62 host, port = util.hostportsplit(request.unresolved_remote) 

63 port = port or COAP_PORT 

64 elif request.opt.uri_host: 

65 host = request.opt.uri_host 

66 port = request.opt.uri_port or COAP_PORT 

67 else: 

68 raise ValueError( 

69 "No location found to send message to (neither in .opt.uri_host nor in .remote)" 

70 ) 

71 

72 result = await self._pool.connect((host, port)) 

73 if request.remote.maximum_block_size_exp < result.maximum_block_size_exp: 

74 result.maximum_block_size_exp = request.remote.maximum_block_size_exp 

75 return result