Coverage for aiocoap/transports/simple6.py: 89%
96 statements
« prev ^ index » next coverage.py v7.6.8, created at 2024-11-28 12:34 +0000
« prev ^ index » next coverage.py v7.6.8, created at 2024-11-28 12:34 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5"""This module implements a MessageInterface for UDP based on the asyncio
6DatagramProtocol.
8This is a simple version that works only for clients (by creating a dedicated
9unbound but connected socket for each communication partner) and probably not
10with multicast (it is assumed to be unsafe for multicast), which can be
11expected to work even on platforms where the :mod:`.udp6` module can not be
12made to work (Android, OSX, Windows for missing ``recvmsg`` and socket options,
13or any event loops that don't have an add_reader method).
15Note that the name of the module is a misnomer (and the module is likely to be
16renamed): Nothing in it is IPv6 specific; the socket is created using whichever
17address family the OS chooses based on the given host name.
19One small but noteworthy detail about this transport is that it does not
20distinguish between IP literals and host names. As a result, requests and
21responses from remotes will appear to arrive from a remote whose netloc is the
22requested name, not an IP literal.
24This transport is experimental, likely to change, and not fully tested yet
25(because the test suite is not yet ready to matrix-test the same tests with
26different transport implementations, and because it still fails in proxy
27blockwise tests).
29For one particular use case, this may be usable for servers in a sense: If (and
30only if) all incoming requests are only ever sent from clients that were
31previously addressed as servers by the running instance. (This is generally
32undesirable as it greatly limits the usefulness of the server, but is used in
33LwM2M setups). As such a setup makes demands on the peer that are not justified
34by the CoAP specification (in particular, that it send requests from a
35particular port), this should still only be used for cases where the udp6
36transport is unavailable due to platform limitations.
37"""
39import asyncio
40from collections import OrderedDict
41import socket
42from typing import Tuple
44from aiocoap import error
45from aiocoap import interfaces
46from ..numbers import COAP_PORT, constants
47from ..util import hostportjoin
48from .generic_udp import GenericMessageInterface
class _Connection(asyncio.DatagramProtocol, interfaces.EndpointAddress):
    """Datagram protocol of a single connected UDP socket that doubles as the
    remote (endpoint address) object for the peer of that socket.

    Datagrams and errors received on the socket are handed to the message
    interface given at construction time, with this object passed as the
    remote they belong to.
    """

    def __init__(
        self,
        ready_callback,
        message_interface: "GenericMessageInterface",
        stored_sockaddr,
    ):
        """Prepare a connection that is not yet attached to a transport.

        :param ready_callback: Callable invoked without arguments from
            :meth:`connection_made`; it is dropped right afterwards.
        :param message_interface: Object whose ``_received_datagram`` and
            ``_received_exception`` methods are called for incoming events.
        :param stored_sockaddr: Sequence whose first two items are the host
            and port the socket is being connected to; used to build
            :attr:`hostinfo`.
        """
        self._ready_callback = ready_callback
        self._message_interface = message_interface

        # This gets stored in the _Connection because not all implementations
        # of datagram transports will expose the get_extra_info('socket')
        # (right now, all I know do), or their backend might not be a connected
        # socket (like in uvloop), so the information can't be just obtained
        # from the transport, but is needed to implement .hostinfo
        #
        # If _Connections become used in other contexts (eg. tinydtls starts
        # using them), it might be a good idea to move all this into a subclass
        # and split it from the pure networking stuff.
        self.hostinfo = hostportjoin(
            stored_sockaddr[0],
            # The default port is left out of the host info
            None if stored_sockaddr[1] == COAP_PORT else stored_sockaddr[1],
        )

        self._stage = "initializing"  #: Status property purely for debugging

    def __repr__(self):
        return "<%s at %#x on transport %s, %s>" % (
            type(self).__name__,
            id(self),
            # _transport only exists once connection_made has run
            getattr(self, "_transport", "(none)"),
            self._stage,
        )

    # address interface

    is_multicast = False

    is_multicast_locally = False

    scheme = "coap"

    # Unlike for other remotes, this is settable per instance.
    maximum_block_size_exp = constants.MAX_REGULAR_BLOCK_SIZE_EXP

    # statically initialized in init
    hostinfo = None
    uri_base = property(lambda self: "coap://" + self.hostinfo)

    @property
    def hostinfo_local(self):
        """Host and port of the local end of the socket.

        :raises RuntimeError: if no transport is attached yet, or if the
            transport does not provide the ``sockname`` extra info.
        """
        # This can only be done on a best-effort base here. Unlike the
        # hostinfo (see comments in __init__), there is no easy way around
        # this, so if there are still implementations out that don't do the
        # extras, that's it and the calling site should reconsider whether
        # they need something that can not be determined. (Some more effort
        # could go into falling back to
        # get_extra_info('socket').getsockname(), but that should really be
        # fixed in the transport provider).
        if not hasattr(self, "_transport"):
            raise RuntimeError(
                "Simple6 does not have defined local host info in current stage %s"
                % self._stage
            )
        sockname = self._transport.get_extra_info("sockname")
        if sockname is None:
            raise RuntimeError(
                "Simple6 can not determine local address from the underlying UDP implementation"
            )
        # Only host and port are used; any further items are ignored
        return hostportjoin(*sockname[:2])

    uri_base_local = property(lambda self: "coap://" + self.hostinfo_local)

    @property
    def blockwise_key(self):
        # Not pulling in hostinfo_local as that's unreliable anyway -- if we
        # change identity and the (UDP server) follows its requests across our
        # changing port, that's probably fine.
        return self.hostinfo

    # fully disabled because some implementations of asyncio don't make the
    # information available; going the easy route and storing it for all (see
    # attribute population in __init__)

    #     # FIXME continued: probably this is, and the above is not (or should not be)
    #     @property
    #     def hostinfo(self):
    #         print("ACCESSING HOSTINFO")
    #         host, port = self._transport.get_extra_info('socket').getpeername()[:2]
    #         if port == COAP_PORT:
    #             port = None
    #         # FIXME this should use some of the _plainaddress mechanisms of the udp6 addresses
    #         return hostportjoin(host, port)

    # datagram protocol interface

    def connection_made(self, transport):
        """Store the transport, signal readiness and drop the ready callback."""
        self._transport = transport
        self._ready_callback()
        self._stage = "active"
        del self._ready_callback

    def datagram_received(self, data, address):
        # The sender address is not passed on; this object is given as the
        # remote instead.
        self._message_interface._received_datagram(self, data)

    def error_received(self, exception):
        self._message_interface._received_exception(self, exception)

    def connection_lost(self, exception):
        """Report an erroneous loss of the connection to the message interface.

        A loss without an exception is not reported.
        """
        if exception is None:
            return
        # Reported the same way as in error_received. The message interface
        # reference is deleted in shutdown(), after which there is nobody left
        # to report to.
        message_interface = getattr(self, "_message_interface", None)
        if message_interface is not None:
            message_interface._received_exception(self, exception)

    # whatever it is _DatagramClientSocketpoolSimple6 expects

    # ... because generic_udp expects it from _DatagramClientSocketpoolSimple6
    def send(self, data):
        """Send `data` through the transport; no destination address is
        passed to ``sendto``."""
        self._transport.sendto(data, None)

    async def shutdown(self):
        """Abort the transport and drop the message interface reference."""
        self._stage = "shutting down"
        self._transport.abort()
        del self._message_interface
        self._stage = "destroyed"
178class _DatagramClientSocketpoolSimple6:
179 """This class is used to explore what an Python/asyncio abstraction around
180 a hypothetical "UDP connections" mechanism could look like.
182 Assume there were a socket variety that had UDP messages (ie. unreliable,
183 unordered, boundary-preserving) but that can do an accept() like a TCP
184 listening socket can, and can create outgoing connection-ish sockets from
185 the listeing port.
187 That interface would be usable for all UDP-based CoAP transport
188 implementations; this particular implementation, due to limitations of
189 POSIX sockets (and the additional limitations imposed on it like not using
190 PKTINFO) provides the interface, but only implements the outgoing part, and
191 will not allow setting the outgoing port or interface."""
193 max_sockets = 64
195 # FIXME (new_message_callback, new_error_callback) should probably rather
196 # be one object with a defined interface; either that's the
197 # MessageInterfaceSimple6 and stored accessibly (so the Protocol can know
198 # which MessageInterface to talk to for sending), or we move the
199 # MessageInterface out completely and have that object be the Protocol,
200 # and the Protocol can even send new packages via the address
201 def __init__(self, loop, mi: "GenericMessageInterface"):
202 # using an OrderedDict to implement an LRU cache as it's suitable for that purpose according to its documentation
203 self._sockets: OrderedDict[Tuple[str, int], _Connection] = OrderedDict()
205 self._loop = loop
206 self._message_interface = mi
208 async def _maybe_purge_sockets(self):
209 while len(self._sockets) >= self.max_sockets: # more of an if
210 oldaddr, oldest = next(iter(self._sockets.items()))
211 await oldest.shutdown()
212 del self._sockets[oldaddr]
214 async def connect(self, sockaddr):
215 """Create a new socket with a given remote socket address
217 Note that the sockaddr does not need to be fully resolved or complete,
218 as it is not used for matching incoming packages; ('host.example.com',
219 5683) is perfectly OK (and will create a different outgoing socket that
220 ('hostalias.example.com', 5683) even if that has the same address, for
221 better or for worse).
223 For where the general underlying interface is concerned, it is not yet
224 fixed at all when this must return identical objects."""
226 protocol = self._sockets.get(sockaddr)
227 if protocol is not None:
228 self._sockets.move_to_end(sockaddr)
229 return protocol
231 await self._maybe_purge_sockets()
233 ready = asyncio.get_running_loop().create_future()
234 try:
235 transport, protocol = await self._loop.create_datagram_endpoint(
236 lambda: _Connection(
237 lambda: ready.set_result(None), self._message_interface, sockaddr
238 ),
239 remote_addr=sockaddr,
240 )
241 except socket.gaierror as e:
242 raise error.ResolutionError(
243 "No address information found for requests to %r" % (sockaddr,)
244 ) from e
245 await ready
247 # # Enable this to easily make every connection to localhost a new one
248 # # during testing
249 # import random
250 # sockaddr = sockaddr + (random.random(),)
252 # FIXME twice: 1., those never get removed yet (should timeout or
253 # remove themselves on error), and 2., this is racy against a shutdown right after a connect
254 self._sockets[sockaddr] = protocol
256 return protocol
258 async def shutdown(self):
259 # preventing the creation of new sockets early on, and generally
260 # breaking cycles
261 del self._message_interface
263 if self._sockets:
264 done, pending = await asyncio.wait(
265 [
266 asyncio.create_task(
267 s.shutdown(),
268 name="Socket shutdown of %r" % s,
269 )
270 for s in self._sockets.values()
271 ]
272 )
273 for item in done:
274 await item
275 del self._sockets
class MessageInterfaceSimple6(GenericMessageInterface):
    """Message interface whose remotes are the connections of a
    :class:`_DatagramClientSocketpoolSimple6`."""

    @classmethod
    async def create_client_transport_endpoint(cls, ctx, log, loop):
        """Create the message interface together with its socket pool.

        The arguments are passed on to the class constructor unchanged.
        """
        self = cls(ctx, log, loop)

        # Cyclic reference broken during shutdown
        # NOTE(review): self._loop is presumably set by the base class
        # constructor -- confirm in generic_udp.
        self._pool = _DatagramClientSocketpoolSimple6(self._loop, self)
        return self

    async def recognize_remote(self, remote):
        """Return whether `remote` is a connection currently held in this
        interface's pool."""
        # The pool maps socket addresses to connections, so the connections
        # are its values; a membership test on the mapping itself would
        # compare against the address keys and never match a connection.
        return (
            isinstance(remote, _Connection)
            and remote in self._pool._sockets.values()
        )