Coverage for aiocoap/transports/udp6.py: 75%
292 statements
« prev ^ index » next coverage.py v7.6.8, created at 2024-11-28 12:34 +0000
« prev ^ index » next coverage.py v7.6.8, created at 2024-11-28 12:34 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5"""This module implements a MessageInterface for UDP based on a variation of
6the asyncio DatagramProtocol.
8This implementation strives for correct and complete behavior while still
9only using a single socket; that is, to be usable for all kinds of multicast
10traffic, to support server and client behavior at the same time, and to work
11correctly even when multiple IPv6 and IPv4 (using V4MAPPED style addresses)
12interfaces are present, and any of the interfaces has multiple addresses.
14This requires using some standardized but not necessarily widely ported
15features: ``IPV6_RECVPKTINFO`` to determine
16incoming packages' destination addresses (was it multicast) and to return
17packages from the same address, ``IPV6_JOIN_GROUP`` for multicast
18membership management and ``recvmsg`` to obtain data configured with the above
19options. The need for ``AI_V4MAPPED`` and ``AI_ADDRCONFIG`` is not manifest
20in the code because the latter on its own is insufficient to enable seamless
21interoperability with IPv4+IPv6 servers on IPv4-only hosts; instead,
22short-lived sockets are created to assess which addresses are routable. This
23should correctly deal with situations in which a client has an IPv6 ULA
24assigned but no route, no matter whether the server advertises global IPv6
25addresses or addresses inside that ULA. It can not deal with situations in
26which the host has a default IPv6 route, but that route is not actually usable.
28To the author's knowledge, there is no standardized mechanism for receiving
29ICMP errors in such a setup. On Linux, ``IPV6_RECVERR`` and ``MSG_ERRQUEUE``
30are used to receive ICMP errors from the socket; on other platforms, a warning
31is emitted that ICMP errors are ignored. Using a :mod:`.simple6` for clients is
32recommended for those when working as a client only.
34Except for the above error handling, no attempts are made to fall back to a
35kind-of-correct or limited-functionality behavior if these options are
36unavailable, for the resulting code would be hard to maintain ("``ifdef``
37hell") or would cause odd bugs at users (eg. servers that stop working when an
38additional IPv6 address gets assigned). If the module does not work for you,
39and the options can not be added easily to your platform, consider using the
40:mod:`.simple6` module instead.
41"""
43import asyncio
44import contextvars
45import errno
46import os
47import socket
48import ipaddress
49import struct
50import weakref
51from collections import namedtuple
53from ..message import Message
54from ..numbers import constants
55from .. import defaults
56from .. import error
57from .. import interfaces
58from ..numbers import COAP_PORT
59from ..util.asyncio.recvmsg import (
60 RecvmsgDatagramProtocol,
61 create_recvmsg_datagram_endpoint,
62)
63from ..util.asyncio.getaddrinfo_addrconfig import (
64 getaddrinfo_routechecked as getaddrinfo,
65)
66from ..util import hostportjoin, hostportsplit
67from ..util import socknumbers
# Packed layout of the `struct in6_pktinfo` from RFC3542: a 16-byte address
# field followed by an unsigned integer (used in this module as the
# interface index).
_in6_pktinfo = struct.Struct("16sI")

# Packed "unspecified" addresses, one in plain IPv6 form and one in
# V4MAPPED form; used as the address part of a pktinfo that only selects an
# outgoing interface.
_ipv6_unspecified = socket.inet_pton(socket.AF_INET6, "::")
_ipv4_unspecified = socket.inet_pton(socket.AF_INET6, "::ffff:0.0.0.0")
class InterfaceOnlyPktinfo(bytes):
    """Marker subclass of ``bytes`` for a pktinfo that was assembled solely
    to pick the outgoing interface.

    It adds no behavior and must be handled exactly like any other pktinfo;
    the distinct type only exists to make debug output more telling.
    (Replacing every use of it with plain `bytes` must leave everything
    working.)"""
class UDP6EndpointAddress(interfaces.EndpointAddress):
    """Remote address type for :class:`MessageInterfaceUDP6`. Remote address is
    stored in form of a socket address; local address can be roundtripped by
    opaque pktinfo data.

    For purposes of equality (and thus hashing), the local address is *not*
    checked. Neither is the scopeid that is part of the socket address.

    >>> interface = type("FakeMessageInterface", (), {})
    >>> if1_name = socket.if_indextoname(1)
    >>> local = UDP6EndpointAddress(socket.getaddrinfo('127.0.0.1', 5683, type=socket.SOCK_DGRAM, family=socket.AF_INET6, flags=socket.AI_V4MAPPED)[0][-1], interface)
    >>> local.is_multicast
    False
    >>> local.hostinfo
    '127.0.0.1'
    >>> all_coap_link1 = UDP6EndpointAddress(socket.getaddrinfo('ff02:0:0:0:0:0:0:fd%1', 1234, type=socket.SOCK_DGRAM, family=socket.AF_INET6)[0][-1], interface)
    >>> all_coap_link1.is_multicast
    True
    >>> all_coap_link1.hostinfo == '[ff02::fd%{}]:1234'.format(if1_name)
    True
    >>> all_coap_site = UDP6EndpointAddress(socket.getaddrinfo('ff05:0:0:0:0:0:0:fd', 1234, type=socket.SOCK_DGRAM, family=socket.AF_INET6)[0][-1], interface)
    >>> all_coap_site.is_multicast
    True
    >>> all_coap_site.hostinfo
    '[ff05::fd]:1234'
    >>> all_coap4 = UDP6EndpointAddress(socket.getaddrinfo('224.0.1.187', 5683, type=socket.SOCK_DGRAM, family=socket.AF_INET6, flags=socket.AI_V4MAPPED)[0][-1], interface)
    >>> all_coap4.is_multicast
    True
    """

    def __init__(self, sockaddr, interface, *, pktinfo=None):
        """Store the remote `sockaddr`, the optional opaque local `pktinfo`,
        and a weak reference to the owning `interface`."""
        self.sockaddr = sockaddr
        self.pktinfo = pktinfo
        # Only a weak reference is kept; the `interface` property evaluates
        # to None once the referenced object is gone.
        self._interface = weakref.ref(interface)

    scheme = "coap"

    interface = property(lambda self: self._interface())

    # Unlike for other remotes, this is settable per instance.
    maximum_block_size_exp = constants.MAX_REGULAR_BLOCK_SIZE_EXP

    def __hash__(self):
        # The last sockaddr element (the scope ID) is excluded, consistent
        # with __eq__.
        return hash(self.sockaddr[:-1])

    def __eq__(self, other):
        """Compare by socket address without its last element (the scope
        ID); the pktinfo is not considered.

        Objects that have no ``sockaddr`` attribute yield ``NotImplemented``
        (so a plain ``==`` evaluates to False) rather than raising
        AttributeError."""
        try:
            other_sockaddr = other.sockaddr
        except AttributeError:
            return NotImplemented
        return self.sockaddr[:-1] == other_sockaddr[:-1]

    def __repr__(self):
        return "<%s %s%s>" % (
            type(self).__name__,
            self.hostinfo,
            " (locally %s)" % self._repr_pktinfo() if self.pktinfo is not None else "",
        )

    @staticmethod
    def _strip_v4mapped(address):
        """Turn anything that's a valid input to ipaddress.IPv6Address into a
        user-friendly string that's either an IPv6 or an IPv4 address.

        This also compresses (normalizes) the IPv6 address as a convenient side
        effect."""
        address = ipaddress.IPv6Address(address)
        mapped = address.ipv4_mapped
        if mapped is not None:
            return str(mapped)
        return str(address)

    def _plainaddress(self):
        """Return the IP address part of the sockaddr in IPv4 notation if it is
        mapped, otherwise the plain v6 address including the interface
        identifier if set."""

        if self.sockaddr[3] != 0:
            try:
                scopepart = "%" + socket.if_indextoname(self.sockaddr[3])
            except Exception:  # could be an OS error, could just be that there is no function of this name, as it is on Android
                # Fall back to the numeric interface index.
                scopepart = "%" + str(self.sockaddr[3])
        else:
            scopepart = ""
        if "%" in self.sockaddr[0]:
            # Fix for Python 3.6 and earlier that reported the scope information
            # in the IP literal (3.7 consistently expresses it in the tuple slot 3)
            scopepart = ""
        return self._strip_v4mapped(self.sockaddr[0]) + scopepart

    def _repr_pktinfo(self):
        """What repr(self.pktinfo) would be if that were not a plain untyped bytestring"""
        addr, interface = _in6_pktinfo.unpack_from(self.pktinfo)
        if interface == 0:
            interface = ""
        else:
            try:
                interface = "%" + socket.if_indextoname(interface)
            except Exception as e:
                # Show the raw index along with the reason it could not be
                # translated into a name.
                interface = "%%%d(%s)" % (interface, e)

        return "%s%s" % (self._strip_v4mapped(addr), interface)

    def _plainaddress_local(self):
        """Like _plainaddress, but on the address in the pktinfo. Unlike
        _plainaddress, this does not contain the interface identifier."""

        addr, _ = _in6_pktinfo.unpack_from(self.pktinfo)

        return self._strip_v4mapped(addr)

    @property
    def netif(self):
        """Textual interface identifier of the explicitly configured remote
        interface, or the interface identifier reported in an incoming
        link-local message. None if not set."""
        index = self.sockaddr[3]
        return socket.if_indextoname(index) if index else None

    @property
    def hostinfo(self):
        """Host (and port, unless it is the default CoAP port) of the
        remote, as produced by `hostportjoin`."""
        port = self.sockaddr[1]
        if port == COAP_PORT:
            port = None

        # plainaddress: don't assume other applications can deal with v4mapped addresses
        return hostportjoin(self._plainaddress(), port)

    @property
    def hostinfo_local(self):
        """Like `hostinfo`, but built from the pktinfo address and the
        interface's local port.

        Raises ValueError if the interface reports port 0."""
        host = self._plainaddress_local()
        port = self.interface._local_port()
        if port == 0:
            raise ValueError("Local port read before socket has bound itself")
        if port == COAP_PORT:
            port = None
        return hostportjoin(host, port)

    @property
    def uri_base(self):
        return "coap://" + self.hostinfo

    @property
    def uri_base_local(self):
        return "coap://" + self.hostinfo_local

    @property
    def is_multicast(self):
        """Whether the remote address is a multicast address."""
        # Any zone identifier is cut off before the address is parsed.
        return ipaddress.ip_address(self._plainaddress().split("%", 1)[0]).is_multicast

    @property
    def is_multicast_locally(self):
        """Whether the address stored in the pktinfo is a multicast address."""
        return ipaddress.ip_address(self._plainaddress_local()).is_multicast

    def as_response_address(self):
        """Return the address to use for responding: self, unless the local
        address is a multicast address, in which case a copy without pktinfo
        is returned."""
        if not self.is_multicast_locally:
            return self

        # Create a copy without pktinfo, as responses to messages received to
        # multicast addresses can not have their request's destination address
        # as source address
        return type(self)(self.sockaddr, self.interface)

    @property
    def blockwise_key(self):
        return (self.sockaddr, self.pktinfo)
class SockExtendedErr(
    namedtuple(
        "_SockExtendedErr",
        [
            "ee_errno",
            "ee_origin",
            "ee_type",
            "ee_code",
            "ee_pad",
            "ee_info",
            "ee_data",
        ],
    )
):
    """Named tuple of the seven fields unpacked with the ``IbbbbII`` struct
    format (native byte order and alignment)."""

    _struct = struct.Struct("IbbbbII")

    @classmethod
    def load(cls, data):
        """Build an instance from the leading bytes of `data`."""
        # unpack_from: recvmsg(2) says that more data may follow
        fields = cls._struct.unpack_from(data)
        return cls(*fields)
class MessageInterfaceUDP6(RecvmsgDatagramProtocol, interfaces.MessageInterface):
    """Message interface that sends and receives CoAP messages over a single
    ``AF_INET6`` UDP socket with ``IPV6_V6ONLY`` disabled.

    Instances are created through :meth:`create_client_transport_endpoint` or
    :meth:`create_server_transport_endpoint`. Incoming data and errors are
    handed to the message manager passed in as `ctx`."""

    def __init__(self, ctx: interfaces.MessageManager, log, loop):
        self._ctx = ctx
        self.log = log
        self.loop = loop

        self._shutting_down = (
            None  #: Future created and used in the .shutdown() method.
        )

        self.ready = asyncio.get_running_loop().create_future()  #: Future that gets fulfilled by connection_made (ie. don't send before this is done; handled by ``create_..._context``

        # This is set while a send is underway to determine in the
        # error_received call site whom we were actually sending something to.
        # This is a workaround for the abysmal error handling unconnected
        # sockets have :-/
        #
        # FIXME: Figure out whether aiocoap can at all support a context being
        # used with multiple aiocoap contexts, and if not, raise an error early
        # rather than just-in-case doing extra stuff here.
        self._remote_being_sent_to = contextvars.ContextVar(
            "_remote_being_sent_to", default=None
        )

    def _local_port(self):
        """Return the port number the underlying socket reports as its own."""
        # FIXME: either raise an error if this is 0, or send a message to self
        # to force the OS to decide on a port. Right now, this reports wrong
        # results while the first message has not been sent yet.
        return self.transport.get_extra_info("socket").getsockname()[1]

    @staticmethod
    def _configure_socket(sock, log, multicast):
        """Enable pktinfo and (where available) error queue reporting on
        `sock`, and join the multicast groups described by `multicast`.

        Each item of `multicast` is either an interface name (a shortcut for
        all of ``constants.MCAST_ALL`` on that interface) or an
        ``(address, interface name)`` pair.

        Raises RuntimeError if the PKTINFO socket option number is
        unavailable or an address is neither IPv4 nor IPv6. Failures to join
        a group are logged and otherwise ignored."""
        try:
            recvpktinfo = socknumbers.IPV6_RECVPKTINFO
        except (NameError, AttributeError) as e:
            # A missing module attribute surfaces as AttributeError, not as
            # NameError.
            raise RuntimeError(
                "RFC3542 PKTINFO flags are unavailable, unable to create a udp6 transport."
            ) from e
        sock.setsockopt(socket.IPPROTO_IPV6, recvpktinfo, 1)

        if socknumbers.HAS_RECVERR:
            sock.setsockopt(socket.IPPROTO_IPV6, socknumbers.IPV6_RECVERR, 1)
            # i'm curious why this is required; didn't IPV6_V6ONLY=0 already make
            # it clear that i don't care about the ip version as long as everything looks the same?
            sock.setsockopt(socket.IPPROTO_IP, socknumbers.IP_RECVERR, 1)
        else:
            log.warning(
                "Transport udp6 set up on platform without RECVERR capability. ICMP errors will be ignored."
            )

        for entry in multicast:
            if isinstance(entry, str):
                # Expand shortcut of "interface name means default CoAP
                # all-nodes addresses"
                groups = [(a, entry) for a in constants.MCAST_ALL]
            else:
                groups = [entry]

            for address_string, interface_string in groups:
                address = ipaddress.ip_address(address_string)
                interface = socket.if_nametoindex(interface_string)

                if isinstance(address, ipaddress.IPv4Address):
                    s = struct.pack(
                        "4s4si", address.packed, socket.inet_aton("0.0.0.0"), interface
                    )
                    try:
                        sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, s)
                    except OSError:
                        log.warning("Could not join IPv4 multicast group")

                elif isinstance(address, ipaddress.IPv6Address):
                    s = struct.pack("16si", address.packed, interface)
                    try:
                        sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, s)
                    except OSError:
                        log.warning("Could not join IPv6 multicast group")

                else:
                    raise RuntimeError("Unknown address format")

    @classmethod
    async def _create_transport_endpoint(
        cls, sock, ctx: interfaces.MessageManager, log, loop, multicast=()
    ):
        """Configure `sock`, wrap it in a recvmsg datagram endpoint running an
        instance of this class, and return that instance once it is ready.

        `multicast` may be None or empty for no group memberships. If
        configuring the socket fails, the socket is closed before the
        exception propagates."""
        try:
            cls._configure_socket(sock, log, multicast or ())
        except BaseException:
            # No endpoint has taken over the socket yet, so nobody else would
            # close it.
            sock.close()
            raise

        transport, protocol = await create_recvmsg_datagram_endpoint(
            loop, lambda: cls(ctx, log=log, loop=loop), sock=sock
        )

        await protocol.ready

        return protocol

    @classmethod
    async def create_client_transport_endpoint(
        cls, ctx: interfaces.MessageManager, log, loop
    ):
        """Create an endpoint on a fresh, unbound socket."""
        sock = socket.socket(family=socket.AF_INET6, type=socket.SOCK_DGRAM)
        try:
            sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
        except BaseException:
            sock.close()
            raise

        return await cls._create_transport_endpoint(sock, ctx, log, loop)

    @classmethod
    async def create_server_transport_endpoint(
        cls, ctx: interfaces.MessageManager, log, loop, bind, multicast
    ):
        """Create an endpoint on a socket bound to `bind`.

        `bind` is a ``(host, port)`` pair or a false value (meaning ``"::"``
        with the default port); a port of None means the default CoAP port.
        `multicast` is passed on to the group joining step.

        Raises error.ResolutionError if resolving the bind address raises
        ``socket.gaierror``, and RuntimeError if the resolution yields no
        address at all."""
        bind = bind or ("::", None)
        # Interpret None as 'default port', but still allow to bind to 0 for
        # servers that want a random port (eg. when the service URLs are
        # advertised out-of-band anyway, or in LwM2M clients)
        bind = (bind[0], COAP_PORT if bind[1] is None else bind[1])

        # The later bind() does most of what getaddr info usually does
        # (including resolving names), but is missing out subtly: It does not
        # populate the zone identifier of an IPv6 address, making it impossible
        # without a getaddrinfo (or manual mapping of the name to a number) to
        # bind to a specific link-local interface
        try:
            addriter = getaddrinfo(
                loop,
                log,
                bind[0],
                bind[1],
            )
            try:
                bind = await addriter.__anext__()
            except StopAsyncIteration:
                raise RuntimeError(
                    "getaddrinfo returned zero-length list rather than erring out"
                )
        except socket.gaierror as e:
            raise error.ResolutionError(
                "No local bindable address found for %s" % bind[0]
            ) from e

        try:
            additional = await addriter.__anext__()
        except StopAsyncIteration:
            pass
        except Exception as e:
            log.error(
                "Ignoring exception raised when checking for additional addresses that match the bind address",
                exc_info=e,
            )
        else:
            log.warning(
                "Multiple addresses to bind to, only selecting %r and discarding %r and any later",
                bind,
                additional,
            )

        sock = socket.socket(family=socket.AF_INET6, type=socket.SOCK_DGRAM)
        try:
            if defaults.has_reuse_port():
                # I doubt that there is any platform that supports RECVPKTINFO but
                # not REUSEPORT, but why take chances.
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
            sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
            sock.bind(bind)
        except BaseException:
            # Don't leave the descriptor open when eg. the address is in use.
            sock.close()
            raise

        return await cls._create_transport_endpoint(sock, ctx, log, loop, multicast)

    async def shutdown(self):
        """Close the transport, wait until connection_lost has been called,
        and drop the reference to the message manager."""
        self._shutting_down = asyncio.get_running_loop().create_future()

        self.transport.close()

        await self._shutting_down

        del self._ctx

    def send(self, message):
        """Encode `message` and send it to ``message.remote.sockaddr``,
        passing the remote's pktinfo (if any) as ancillary data."""
        ancdata = []
        if message.remote.pktinfo is not None:
            ancdata.append(
                (socket.IPPROTO_IPV6, socknumbers.IPV6_PKTINFO, message.remote.pktinfo)
            )
        assert (
            self._remote_being_sent_to.get(None) is None
        ), "udp6.MessageInterfaceUDP6.send was reentered in a single task"
        # Remember the destination for the duration of the call so that
        # error_received can attribute an error to it.
        self._remote_being_sent_to.set(message.remote)
        try:
            self.transport.sendmsg(
                message.encode(), ancdata, 0, message.remote.sockaddr
            )
        finally:
            self._remote_being_sent_to.set(None)

    async def recognize_remote(self, remote):
        return isinstance(remote, UDP6EndpointAddress) and remote.interface == self

    async def determine_remote(self, request):
        """Resolve the destination of `request` into a UDP6EndpointAddress.

        Returns None if the requested scheme is neither "coap" nor unset.
        Raises ValueError if the request names no destination, and
        error.ResolutionError if the zone identifier or the host name can not
        be resolved."""
        if request.requested_scheme not in ("coap", None):
            return None

        if request.unresolved_remote is not None:
            host, port = hostportsplit(request.unresolved_remote)
            port = port or COAP_PORT
        elif request.opt.uri_host:
            host = request.opt.uri_host
            if host.startswith("[") and host.endswith("]"):
                host = host[1:-1]
            port = request.opt.uri_port or COAP_PORT
        else:
            raise ValueError(
                "No location found to send message to (neither in .opt.uri_host nor in .remote)"
            )

        # Take aside the zone identifier. While it can pass through getaddrinfo
        # in some situations (eg. 'fe80::1234%eth0' will give 'fe80::1234'
        # scope eth0, and similar for ff02:: addresses), in others (eg. ff05::)
        # it gives 'Name or service not known'.

        if "%" in host:
            host, zone = host.split("%", 1)
            try:
                zone = socket.if_nametoindex(zone)
            except OSError as e:
                raise error.ResolutionError(
                    "Invalid zone identifier %s" % zone
                ) from e
        else:
            zone = None

        try:
            # Note that this is our special addrinfo that ensures there is a
            # route.
            ip, port, flowinfo, scopeid = await getaddrinfo(
                self.loop,
                self.log,
                host,
                port,
            ).__anext__()
        except socket.gaierror as e:
            raise error.ResolutionError(
                "No address information found for requests to %r" % host
            ) from e

        if zone is not None:
            # Still trying to preserve the information returned (libc can't do
            # it as described at
            # <https://unix.stackexchange.com/questions/174767/ipv6-zone-id-in-etc-hosts>)
            # in case something sane does come out of that.
            if scopeid != 0 and scopeid != zone:
                self.log.warning(
                    "Resolved address of %s came with zone ID %d whereas explicit ID %d takes precedence",
                    host,
                    scopeid,
                    zone,
                )
            scopeid = zone

        # We could be done here and return UDP6EndpointAddress(the reassembled
        # sockaddr, self), but:
        #
        # Linux (unlike FreeBSD) takes the sockaddr's scope ID only for
        # link-local scopes (as per ipv6(7), and discards it otherwise. It does
        # need the information of the selected interface, though, in order to
        # pick the right outgoing interface. Thus, we provide it in the local
        # portion.

        if scopeid:
            # "Any" does not include "even be it IPv4" -- the underlying family
            # unfortunately needs to be set, or Linux will refuse to send.
            if ipaddress.IPv6Address(ip).ipv4_mapped is None:
                local_source = _ipv6_unspecified
            else:
                local_source = _ipv4_unspecified
            local = InterfaceOnlyPktinfo(_in6_pktinfo.pack(local_source, scopeid))
        else:
            local = None

        sockaddr = ip, port, flowinfo, scopeid
        result = UDP6EndpointAddress(sockaddr, self, pktinfo=local)
        # Carry over a smaller block size limit set on the request's remote.
        if request.remote.maximum_block_size_exp < result.maximum_block_size_exp:
            result.maximum_block_size_exp = request.remote.maximum_block_size_exp
        return result

    #
    # implementing the typical DatagramProtocol interfaces.
    #
    # note from the documentation: we may rely on connection_made to be called
    # before datagram_received -- but sending immediately after context
    # creation will still fail

    def connection_made(self, transport):
        """Implementation of the DatagramProtocol interface, called by the transport."""
        # Store the transport before announcing readiness, so that whoever
        # waits on `ready` finds it in place.
        self.transport = transport
        self.ready.set_result(True)

    def datagram_msg_received(self, data, ancdata, flags, address):
        """Implementation of the RecvmsgDatagramProtocol interface, called by the transport."""
        pktinfo = None
        for cmsg_level, cmsg_type, cmsg_data in ancdata:
            if (
                cmsg_level == socket.IPPROTO_IPV6
                and cmsg_type == socknumbers.IPV6_PKTINFO
            ):
                pktinfo = cmsg_data
            else:
                self.log.info(
                    "Received unexpected ancillary data to recvmsg: level %d, type %d, data %r",
                    cmsg_level,
                    cmsg_type,
                    cmsg_data,
                )
        if pktinfo is None:
            self.log.warning(
                "Did not receive requested pktinfo ancdata on message from %s", address
            )
        try:
            message = Message.decode(
                data, UDP6EndpointAddress(address, self, pktinfo=pktinfo)
            )
        except error.UnparsableMessage:
            self.log.warning("Ignoring unparsable message from %s", address)
            return

        try:
            self._ctx.dispatch_message(message)
        except BaseException as exc:
            # Catching here because util.asyncio.recvmsg inherits
            # _SelectorDatagramTransport's bad handling of callback errors;
            # this is the last time we have a log at hand.
            self.log.error(
                "Exception raised through dispatch_message: %s", exc, exc_info=exc
            )
            raise

    def datagram_errqueue_received(self, data, ancdata, flags, address):
        """Handle a message read from the socket's error queue by
        dispatching an OSError for the remote `address` to the message
        manager.

        If the ancillary data contains no RECVERR item, there is no errno to
        report; this is logged and nothing is dispatched."""
        assert (
            flags == socknumbers.MSG_ERRQUEUE
        ), "Received non-error data through the errqueue"
        pktinfo = None
        errno_value = None
        for cmsg_level, cmsg_type, cmsg_data in ancdata:
            assert (
                cmsg_level == socket.IPPROTO_IPV6
            ), "Received non-IPv6 protocol through the errqueue"
            if cmsg_type == socknumbers.IPV6_RECVERR:
                extended_err = SockExtendedErr.load(cmsg_data)
                self.log.debug("Socket error recevied, details: %s", extended_err)
                errno_value = extended_err.ee_errno
            elif (
                cmsg_level == socket.IPPROTO_IPV6
                and cmsg_type == socknumbers.IPV6_PKTINFO
            ):
                pktinfo = cmsg_data
            else:
                self.log.info(
                    "Received unexpected ancillary data to recvmsg errqueue: level %d, type %d, data %r",
                    cmsg_level,
                    cmsg_type,
                    cmsg_data,
                )

        if errno_value is None:
            # os.strerror(None) would raise a TypeError further down.
            self.log.warning(
                "Errqueue data from %s carried no extended error information, ignoring it",
                address,
            )
            return

        remote = UDP6EndpointAddress(address, self, pktinfo=pktinfo)

        # not trying to decode a message from data -- that works for
        # "connection refused", doesn't work for "no route to host", and
        # anyway, when an icmp error comes back, everything pending from that
        # port should err out.

        try:
            text = os.strerror(errno_value)
            symbol = errno.errorcode.get(errno_value, None)
            symbol = "" if symbol is None else f"{symbol}, "
            self._ctx.dispatch_error(
                OSError(errno_value, f"{text} ({symbol}received through errqueue)"),
                remote,
            )
        except BaseException as exc:
            # Catching here because util.asyncio.recvmsg inherits
            # _SelectorDatagramTransport's bad handling of callback errors;
            # this is the last time we have a log at hand.
            self.log.error(
                "Exception raised through dispatch_error: %s", exc, exc_info=exc
            )
            raise

    def error_received(self, exc):
        """Implementation of the DatagramProtocol interface, called by the transport."""

        remote = self._remote_being_sent_to.get()

        if remote is None:
            self.log.info(
                "Error received in situation with no way to to determine which sending caused the error; this should be accompanied by an error in another code path: %s",
                exc,
            )
            return

        try:
            self._ctx.dispatch_error(exc, remote)
        except BaseException as dispatch_exc:
            # Catching here because util.asyncio.recvmsg inherits
            # _SelectorDatagramTransport's bad handling of callback errors;
            # this is the last time we have a log at hand.
            self.log.error(
                "Exception raised through dispatch_error: %s",
                dispatch_exc,
                exc_info=dispatch_exc,
            )
            raise

    def connection_lost(self, exc):
        """Log unexpected connection loss, or complete a pending shutdown."""
        # TODO better error handling -- find out what can cause this at all
        # except for a shutdown
        if exc is not None:
            self.log.error("Connection lost: %s", exc)

        if self._shutting_down is None:
            self.log.error("Connection loss was not expected.")
        else:
            self._shutting_down.set_result(None)
667 self._shutting_down.set_result(None)