Coverage for aiocoap/util/asyncio/recvmsg.py: 73%
81 statements
« prev ^ index » next coverage.py v7.6.8, created at 2024-11-28 12:34 +0000
« prev ^ index » next coverage.py v7.6.8, created at 2024-11-28 12:34 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5from .. import socknumbers
7from asyncio import BaseProtocol
8from asyncio.transports import BaseTransport
class RecvmsgDatagramProtocol(BaseProtocol):
    """Protocol interface for datagram transports built on recvmsg.

    It plays the role asyncio.DatagramProtocol plays for regular datagram
    transports, but its callbacks carry the ancillary data and flags that
    come with a message. All callbacks are no-ops by default; subclasses
    override the ones they are interested in."""

    def datagram_msg_received(self, data, ancdata, flags, address):
        """Handle a datagram that arrived on the socket."""

    def datagram_errqueue_received(self, data, ancdata, flags, address):
        """Handle a message that was read from the socket's error queue."""

    def error_received(self, exc):
        """Handle an OSError raised by a send or receive operation."""
25def _set_result_unless_cancelled(fut, result):
26 """Helper setting the result only if the future was not cancelled."""
27 if fut.cancelled():
28 return
29 fut.set_result(result)
class RecvmsgSelectorDatagramTransport(BaseTransport):
    """A simple loop-independent transport that largely mimics
    DatagramTransport but interfaces a RecvmsgDatagramProtocol.

    This does not implement any flow control, based on the assumption that it's
    not needed, for CoAP has its own flow control mechanisms."""

    # Buffer size passed to recvmsg() -- should suffice for a full MTU package
    # and ample ancdata
    max_size = 4096

    # Sole argument of the OSError PyPy raises when recvmsg() is called on an
    # empty error queue, see
    # https://bitbucket.org/pypy/pypy/issues/2649/recvmsg-with-empty-err-queue-raises-odd
    _EMPTY_ERRQUEUE_MESSAGE = (
        "received malformed or improperly truncated ancillary data"
    )

    def __init__(self, loop, sock, protocol, waiter):
        """Attach *protocol* to *sock* and schedule the start of reading.

        Through *loop*, this schedules (in order) the protocol's
        connection_made() call, the registration of the read callback for the
        socket, and the resolution of *waiter* with None.
        """
        super().__init__(extra={"socket": sock})
        self.__sock = sock
        # Persisted outside of sock because when GC breaks a reference cycle,
        # it can happen that the sock gets closed before this; we have to hope
        # that no new file gets opened and registered in the meantime.
        self.__sock_fileno = sock.fileno()
        self._loop = loop
        self._protocol = protocol

        loop.call_soon(protocol.connection_made, self)
        # only start reading when connection_made() has been called
        import weakref

        # We could add error handling in here like this:
        # ```
        # self = s()
        # if self is None or self.__sock is None:
        #     # The read event happened briefly before .close() was called,
        #     # but late enough that the caller of close did not yield to let
        #     # the event out; when remove_reader was then called, the
        #     # pending event was not removed, so it fires now that the
        #     # socket is already closed. (Depending on the GC's whims, self
        #     # may or may not have been GC'd, but if it wasn't yet, the
        #     # closed state is indicated by the lack of a __sock.
        #     #
        #     # Thus, silently (preferably with an ICMP error, but really
        #     # can't do that)...
        #     return
        # ```
        # That was done tentatively while debugging errors flying out of
        # _read_ready, but it turned out that this was not the actual error
        # source. Thus, I'm not adding the handler and assuming that close's
        # remove_reader is not racing against callbacks, and thus that s() is
        # always valid while the transport is around (and the weakref is really
        # only used to break up the reference cycles to ensure the GC is not
        # needed here).
        def rr(s=weakref.ref(self)):
            s()._read_ready()

        loop.call_soon(loop.add_reader, self.__sock_fileno, rr)
        loop.call_soon(_set_result_unless_cancelled, waiter, None)

    def close(self):
        """Close the socket and detach from loop and protocol.

        The protocol's connection_lost(None) is scheduled unless the loop is
        already closed. Closing an already closed transport is a no-op.
        """
        self._shutdown(None)

    def _shutdown(self, exc):
        """Tear the transport down, reporting *exc* through connection_lost."""
        if self.__sock is None:
            return

        if not self._loop.is_closed():
            self._loop.call_soon(self._protocol.connection_lost, exc)

        self._loop.remove_reader(self.__sock_fileno)
        self.__sock.close()
        self.__sock = None
        self._protocol = None
        self._loop = None

    def __del__(self):
        if self.__sock is not None:
            self.close()

    def _fatal_error(self, exc, message="Fatal error on transport"):
        """Report an unexpected exception to the loop and close the transport.

        Neither this class nor asyncio's BaseTransport used to provide this
        method, so the handlers below that call it raised AttributeError
        rather than dealing with the actual error.
        """
        loop = self._loop
        if loop is not None:
            loop.call_exception_handler(
                {
                    "message": message,
                    "exception": exc,
                    "transport": self,
                    "protocol": self._protocol,
                }
            )
        self._shutdown(exc)

    def _read_ready(self):
        """Read callback: drain one error queue message and one datagram.

        Each successfully read message is handed to the matching protocol
        callback; an OSError goes to the protocol's error_received(), and
        any other exception is treated as fatal.
        """
        if socknumbers.HAS_RECVERR:
            try:
                data, ancdata, flags, addr = self.__sock.recvmsg(
                    self.max_size, 1024, socknumbers.MSG_ERRQUEUE
                )
            except (BlockingIOError, InterruptedError):
                pass
            except OSError as exc:
                # Compare the arguments rather than the repr: since Python
                # 3.7 the repr of an exception no longer carries the trailing
                # comma the previous string comparison relied on, so the
                # workaround never matched there.
                if exc.args != (self._EMPTY_ERRQUEUE_MESSAGE,):
                    self._protocol.error_received(exc)
            except Exception as exc:
                self._fatal_error(exc, "Fatal read error on datagram transport")
            else:
                self._protocol.datagram_errqueue_received(data, ancdata, flags, addr)

            if self.__sock is None:
                # A callback above (or the fatal error handling) closed the
                # transport; there is nothing left to read from.
                return

        # copied and modified from _SelectorDatagramTransport
        try:
            # TODO: find a way for the application to tell the transport how
            # much data is expected
            data, ancdata, flags, addr = self.__sock.recvmsg(self.max_size, 1024)
        except (BlockingIOError, InterruptedError):
            pass
        except OSError as exc:
            self._protocol.error_received(exc)
        except Exception as exc:
            self._fatal_error(exc, "Fatal read error on datagram transport")
        else:
            self._protocol.datagram_msg_received(data, ancdata, flags, addr)

    def sendmsg(self, data, ancdata, flags, address):
        """Send *data* with *ancdata* and *flags* to *address* right away.

        An OSError is passed to the protocol's error_received(); any other
        exception is treated as fatal. On a closed transport the message is
        dropped silently.
        """
        if self.__sock is None:
            return

        try:
            self.__sock.sendmsg((data,), ancdata, flags, address)
        except OSError as exc:
            self._protocol.error_received(exc)
        except Exception as exc:
            self._fatal_error(exc, "Fatal write error on datagram transport")
async def create_recvmsg_datagram_endpoint(loop, factory, sock):
    """Set up a recvmsg-based datagram connection on an existing socket.

    This is the counterpart of an asyncio loop's create_datagram_endpoint
    method for protocols of the RecvmsgDatagramProtocol type: data is read
    with recvmsg rather than recvfrom. It only relies on the loop's add_reader
    method, which is why it is a free function and not a loop method.

    The socket is a mandatory argument because that is how aiocoap uses this;
    it could become optional if this module were ever split off into a
    standalone package.

    The socket is switched to non-blocking mode, and the (transport, protocol)
    pair is returned once the transport has signalled that it is set up.
    """
    sock.setblocking(False)

    protocol = factory()
    setup_done = loop.create_future()
    transport = RecvmsgSelectorDatagramTransport(loop, sock, protocol, setup_done)

    try:
        await setup_done
    except BaseException:
        # Deliberately as broad as a bare except: the transport is to be
        # closed on cancellation as well, before the exception propagates.
        transport.close()
        raise

    return transport, protocol