Coverage for aiocoap / util / asyncio / recvmsg.py: 74%
85 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 12:28 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 12:28 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5from .. import socknumbers
7from asyncio import BaseProtocol
8from asyncio.transports import BaseTransport
class RecvmsgDatagramProtocol(BaseProtocol):
    """Protocol interface for datagram transports that read with recvmsg.

    It plays the role asyncio.DatagramProtocol plays for regular datagram
    transports, but its callbacks additionally carry the ancillary data and
    flags that come out of a recvmsg call. All callbacks are no-ops by
    default; subclasses override the ones they care about.
    """

    def datagram_msg_received(self, data, ancdata, flags, address):
        """Handle a datagram that was read from the socket.

        The four arguments are the items of the tuple produced by the
        socket's recvmsg call, in order.
        """

    def datagram_errqueue_received(self, data, ancdata, flags, address):
        """Handle a message that was read from the socket's error queue.

        The four arguments are the items of the tuple produced by the
        socket's recvmsg call, in order.
        """

    def error_received(self, exc):
        """Handle an OSError raised by a send or receive operation."""
25def _set_result_unless_cancelled(fut, result):
26 """Helper setting the result only if the future was not cancelled."""
27 if fut.cancelled():
28 return
29 fut.set_result(result)
class RecvmsgSelectorDatagramTransport(BaseTransport):
    """A simple loop-independent transport that largely mimics
    DatagramTransport but interfaces a RecvmsgDatagramProtocol.

    Incoming data is read with recvmsg (including, where available, the
    socket's error queue) and handed to the protocol together with its
    ancillary data; outgoing data is sent with sendmsg.

    This does not implement any flow control, based on the assumption that it's
    not needed, for CoAP has its own flow control mechanisms."""

    # Buffer size passed to recvmsg() -- should suffice for a full MTU package
    # and ample ancdata
    max_size = 4096

    # Sole argument of the OSError PyPy raises when recvmsg is called on an
    # empty error queue, see
    # https://bitbucket.org/pypy/pypy/issues/2649/recvmsg-with-empty-err-queue-raises-odd
    _PYPY_EMPTY_ERRQUEUE_MESSAGE = (
        "received malformed or improperly truncated ancillary data"
    )

    def __init__(self, loop, sock, protocol, waiter):
        """Set up the transport on an already configured socket.

        Nothing happens synchronously apart from storing the arguments: the
        protocol's connection_made, the registration of the socket as a reader
        on the loop, and the completion of *waiter* are all scheduled on
        *loop*, in that order.
        """
        super().__init__(extra={"socket": sock})
        self.__sock = sock
        # Persisted outside of sock because when GC breaks a reference cycle,
        # it can happen that the sock gets closed before this; we have to hope
        # that no new file gets opened and registered in the meantime.
        self.__sock_fileno = sock.fileno()
        self._loop = loop
        self._protocol = protocol

        loop.call_soon(protocol.connection_made, self)
        # only start reading when connection_made() has been called
        import weakref

        # We could add error handling in here like this:
        # ```
        # self = s()
        # if self is None or self.__sock is None:
        #     # The read event happened briefly before .close() was called,
        #     # but late enough that the caller of close did not yield to let
        #     # the event out; when remove_reader was then called, the
        #     # pending event was not removed, so it fires now that the
        #     # socket is already closed. (Depending on the GC's whims, self
        #     # may or may not have been GC'd, but if it wasn't yet, the
        #     # closed state is indicated by the lack of a __sock.
        #     #
        #     # Thus, silently (preferably with an ICMP error, but really
        #     # can't do that)...
        #     return
        # ```
        # That was done tentatively while debugging errors flying out of
        # _read_ready, but it turned out that this was not the actual error
        # source. Thus, I'm not adding the handler and assuming that close's
        # remove_reader is not racing against callbacks, and thus that s() is
        # always valid while the transport is around (and the weakref is really
        # only used to break up the reference cycles to ensure the GC is not
        # needed here).
        def rr(s=weakref.ref(self)):
            s()._read_ready()

        loop.call_soon(loop.add_reader, self.__sock_fileno, rr)
        loop.call_soon(_set_result_unless_cancelled, waiter, None)

    def get_extra_info(self, name, default=None):
        """Return transport information; "socket" yields the live socket, or
        None once the transport has been closed."""
        if name == "socket":
            return self.__sock
        return super().get_extra_info(name, default)

    def close(self):
        """Close the socket and detach from the loop and the protocol.

        The protocol's connection_lost(None) is scheduled unless the loop is
        already closed. Closing an already closed transport is a no-op.
        """
        self._shutdown(None)

    def _shutdown(self, exc):
        """Tear the transport down, reporting *exc* (or None for a regular
        close) to the protocol's connection_lost."""
        if self.__sock is None:
            return

        if not self._loop.is_closed():
            self._loop.call_soon(self._protocol.connection_lost, exc)

        self._loop.remove_reader(self.__sock_fileno)
        self.__sock.close()
        self.__sock = None
        self._protocol = None
        self._loop = None

    def _fatal_error(self, exc, message="Fatal error on transport"):
        """Report an unexpected exception and close the transport.

        This is modelled after the method of the same name on asyncio's
        selector transports: the exception is handed to the loop's exception
        handler, and the transport is shut down with *exc* as the reason.
        """
        loop = self._loop
        if loop is None:
            # The transport is already closed, so there is no loop to report
            # to; propagate rather than silently swallowing the error.
            raise exc
        loop.call_exception_handler(
            {
                "message": message,
                "exception": exc,
                "transport": self,
                "protocol": self._protocol,
            }
        )
        self._shutdown(exc)

    def __del__(self):
        if self.__sock is not None:
            self.close()

    @classmethod
    def _is_spurious_errqueue_error(cls, exc):
        """Tell whether *exc* is the odd OSError PyPy raises for an empty
        error queue.

        The exception's args are compared rather than its repr, as the repr of
        exceptions changed between Python versions (older ones rendered a
        trailing comma after a single argument).
        """
        return exc.args == (cls._PYPY_EMPTY_ERRQUEUE_MESSAGE,)

    def _read_ready(self):
        """Reader callback: drain one message from the error queue (where
        supported) and one regular datagram, dispatching each to the
        protocol."""
        if socknumbers.HAS_RECVERR:
            try:
                data, ancdata, flags, addr = self.__sock.recvmsg(
                    self.max_size, 1024, socknumbers.MSG_ERRQUEUE
                )
            except (BlockingIOError, InterruptedError):
                pass
            except OSError as exc:
                if not self._is_spurious_errqueue_error(exc):
                    self._protocol.error_received(exc)
            except Exception as exc:
                self._fatal_error(exc, "Fatal read error on datagram transport")
            else:
                self._protocol.datagram_errqueue_received(data, ancdata, flags, addr)

            if self.__sock is None:
                # The transport was closed while the error queue was being
                # processed (by a fatal error or by the protocol's callback);
                # there is no socket left to read from.
                return

        # copied and modified from _SelectorDatagramTransport
        try:
            # TODO: find a way for the application to tell the transport how
            # much data is expected
            data, ancdata, flags, addr = self.__sock.recvmsg(self.max_size, 1024)
        except (BlockingIOError, InterruptedError):
            pass
        except OSError as exc:
            self._protocol.error_received(exc)
        except Exception as exc:
            self._fatal_error(exc, "Fatal read error on datagram transport")
        else:
            self._protocol.datagram_msg_received(data, ancdata, flags, addr)

    def sendmsg(self, data, ancdata, flags, address):
        """Send *data* with the given ancillary data and flags to *address*.

        An OSError from the socket is reported to the protocol's
        error_received; any other exception is treated as fatal to the
        transport.
        """
        try:
            self.__sock.sendmsg((data,), ancdata, flags, address)
        except OSError as exc:
            self._protocol.error_received(exc)
        except Exception as exc:
            self._fatal_error(exc, "Fatal write error on datagram transport")
async def create_recvmsg_datagram_endpoint(loop, factory, sock):
    """Set up a recvmsg-based datagram endpoint on an existing socket.

    The result is a (transport, protocol) pair like the one produced by an
    asyncio loop's create_datagram_endpoint method, except that data is read
    using recvmsg rather than recvfrom, and that the protocol produced by
    *factory* is expected to be a RecvmsgDatagramProtocol.

    As this is built on nothing but the loop's add_reader method, it is a
    free-standing function rather than a method of the loop.

    The socket is switched to non-blocking mode. It is a mandatory argument
    because that is how aiocoap uses this function; it could be made optional
    if this module were ever split off into a standalone package.
    """
    sock.setblocking(False)

    proto = factory()
    ready = loop.create_future()
    endpoint = RecvmsgSelectorDatagramTransport(loop, sock, proto, ready)

    try:
        await ready
    except BaseException:
        # Deliberately all-encompassing (cancellation included): whatever
        # interrupts the setup, the transport must not be left open.
        endpoint.close()
        raise

    return endpoint, proto