Coverage for aiocoap/util/asyncio/recvmsg.py: 73%

81 statements  

« prev     ^ index     » next       coverage.py v7.6.3, created at 2024-10-15 22:10 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5from .. import socknumbers 

6 

7from asyncio import BaseProtocol 

8from asyncio.transports import BaseTransport 

9 

10 

11class RecvmsgDatagramProtocol(BaseProtocol): 

12 """Callback interface similar to asyncio.DatagramProtocol, but dealing with 

13 recvmsg data.""" 

14 

15 def datagram_msg_received(self, data, ancdata, flags, address): 

16 """Called when some datagram is received.""" 

17 

18 def datagram_errqueue_received(self, data, ancdata, flags, address): 

19 """Called when some data is received from the error queue""" 

20 

21 def error_received(self, exc): 

22 """Called when a send or receive operation raises an OSError.""" 

23 

24 

25def _set_result_unless_cancelled(fut, result): 

26 """Helper setting the result only if the future was not cancelled.""" 

27 if fut.cancelled(): 

28 return 

29 fut.set_result(result) 

30 

31 

32class RecvmsgSelectorDatagramTransport(BaseTransport): 

33 """A simple loop-independent transport that largely mimicks 

34 DatagramTransport but interfaces a RecvmsgSelectorDatagramProtocol. 

35 

36 This does not implement any flow control, based on the assumption that it's 

37 not needed, for CoAP has its own flow control mechanisms.""" 

38 

39 max_size = 4096 # Buffer size passed to recvmsg() -- should suffice for a full MTU package and ample ancdata 

40 

41 def __init__(self, loop, sock, protocol, waiter): 

42 super().__init__(extra={"socket": sock}) 

43 self.__sock = sock 

44 # Persisted outside of sock because when GC breaks a reference cycle, 

45 # it can happen that the sock gets closed before this; we have to hope 

46 # that no new file gets opened and registered in the meantime. 

47 self.__sock_fileno = sock.fileno() 

48 self._loop = loop 

49 self._protocol = protocol 

50 

51 loop.call_soon(protocol.connection_made, self) 

52 # only start reading when connection_made() has been called 

53 import weakref 

54 

55 # We could add error handling in here like this: 

56 # ``` 

57 # self = s() 

58 # if self is None or self.__sock is None: 

59 # # The read event happened briefly before .close() was called, 

60 # # but late enough that the caller of close did not yield to let 

61 # # the event out; when remove_reader was then called, the 

62 # # pending event was not removed, so it fires now that the 

63 # # socket is already closed. (Depending on the GC's whims, self 

64 # # may or may not have been GC'd, but if it wasn't yet, the 

65 # # closed state is indicated by the lack of a __sock. 

66 # # 

67 # # Thus, silently (preferably with an ICMP error, but really 

68 # # can't do that)... 

69 # return 

70 # ``` 

71 # That was done tentatively while debugging errors flying out of 

72 # _read_ready, but it turned out that this was not the actual error 

73 # source. Thus, I'm not adding the handler and assuming that close's 

74 # remove_reader is not racing against callbacks, and thus that s() is 

75 # always valid while the transport is around (and the weakref is really 

76 # only used to break up the reference cycles to ensure the GC is not 

77 # needed here). 

78 def rr(s=weakref.ref(self)): 

79 s()._read_ready() 

80 

81 loop.call_soon(loop.add_reader, self.__sock_fileno, rr) 

82 loop.call_soon(_set_result_unless_cancelled, waiter, None) 

83 

84 def close(self): 

85 if self.__sock is None: 

86 return 

87 

88 if not self._loop.is_closed(): 

89 self._loop.call_soon(self._protocol.connection_lost, None) 

90 

91 self._loop.remove_reader(self.__sock_fileno) 

92 self.__sock.close() 

93 self.__sock = None 

94 self._protocol = None 

95 self._loop = None 

96 

97 def __del__(self): 

98 if self.__sock is not None: 

99 self.close() 

100 

101 def _read_ready(self): 

102 if socknumbers.HAS_RECVERR: 

103 try: 

104 data, ancdata, flags, addr = self.__sock.recvmsg( 

105 self.max_size, 1024, socknumbers.MSG_ERRQUEUE 

106 ) 

107 except (BlockingIOError, InterruptedError): 

108 pass 

109 except OSError as exc: 

110 if ( 

111 repr(exc) 

112 == "OSError('received malformed or improperly truncated ancillary data',)" 

113 ): 

114 pass # workaround for https://bitbucket.org/pypy/pypy/issues/2649/recvmsg-with-empty-err-queue-raises-odd 

115 else: 

116 self._protocol.error_received(exc) 

117 except Exception as exc: 

118 self._fatal_error(exc, "Fatal read error on datagram transport") 

119 else: 

120 self._protocol.datagram_errqueue_received(data, ancdata, flags, addr) 

121 

122 # copied and modified from _SelectorDatagramTransport 

123 try: 

124 data, ancdata, flags, addr = self.__sock.recvmsg( 

125 self.max_size, 1024 

126 ) # TODO: find a way for the application to tell the trensport how much data is expected 

127 except (BlockingIOError, InterruptedError): 

128 pass 

129 except OSError as exc: 

130 self._protocol.error_received(exc) 

131 except Exception as exc: 

132 self._fatal_error(exc, "Fatal read error on datagram transport") 

133 else: 

134 self._protocol.datagram_msg_received(data, ancdata, flags, addr) 

135 

136 def sendmsg(self, data, ancdata, flags, address): 

137 try: 

138 self.__sock.sendmsg((data,), ancdata, flags, address) 

139 return 

140 except OSError as exc: 

141 self._protocol.error_received(exc) 

142 return 

143 except Exception as exc: 

144 self._fatal_error(exc, "Fatal write error on datagram transport") 

145 return 

146 

147 

148async def create_recvmsg_datagram_endpoint(loop, factory, sock): 

149 """Create a datagram connection that uses recvmsg rather than recvfrom, and 

150 a RecvmsgDatagramProtocol protocol type. 

151 

152 This is used like the create_datagram_endpoint method of an asyncio loop, 

153 but implemented in a generic way using the loop's add_reader method; thus, 

154 it's not a method of the loop but an independent function. 

155 

156 Due to the way it is used in aiocoap, socket is not an optional argument 

157 here; it could be were this module ever split off into a standalone 

158 package. 

159 """ 

160 sock.setblocking(False) 

161 

162 protocol = factory() 

163 waiter = loop.create_future() 

164 transport = RecvmsgSelectorDatagramTransport(loop, sock, protocol, waiter) 

165 

166 try: 

167 await waiter 

168 # see https://github.com/PyCQA/pycodestyle/issues/703 

169 except: # noqa: E722 

170 transport.close() 

171 raise 

172 

173 return transport, protocol