Coverage for aiocoap/transports/udp6.py: 75%

292 statements  

« prev     ^ index     » next       coverage.py v7.6.3, created at 2024-10-15 22:10 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5"""This module implements a MessageInterface for UDP based on a variation of 

6the asyncio DatagramProtocol. 

7 

8This implementation strives to be correct and complete behavior while still 

9only using a single socket; that is, to be usable for all kinds of multicast 

10traffic, to support server and client behavior at the same time, and to work 

11correctly even when multiple IPv6 and IPv4 (using V4MAPPED style addresses) 

12interfaces are present, and any of the interfaces has multiple addresses. 

13 

14This requires using some standardized but not necessarily widely ported 

15features: ``IPV6_RECVPKTINFO`` to determine 

16incoming packages' destination addresses (was it multicast) and to return 

17packages from the same address, ``IPV6_JOIN_GROUP`` for multicast 

18membership management and ``recvmsg`` to obtain data configured with the above 

19options. The need for ``AI_V4MAPPED`` and ``AI_ADDRCONFIG`` is not manifest 

20in the code because the latter on its own is insufficient to enable seamless 

21interoperability with IPv4+IPv6 servers on IPv4-only hosts; instead, 

22short-lived sockets are crated to assess which addresses are routable. This 

23should correctly deal with situations in which a client has an IPv6 ULA 

24assigned but no route, no matter whether the server advertises global IPv6 

25addresses or addresses inside that ULA. It can not deal with situations in 

26which the host has a default IPv6 route, but that route is not actually usable. 

27 

28To the author's knowledge, there is no standardized mechanism for receiving 

29ICMP errors in such a setup. On Linux, ``IPV6_RECVERR`` and ``MSG_ERRQUEUE`` 

30are used to receive ICMP errors from the socket; on other platforms, a warning 

31is emitted that ICMP errors are ignored. Using a :mod:`.simple6` for clients is 

32recommended for those when working as a client only. 

33 

34Exceeding for the above error handling, no attempts are made to fall back to a 

35kind-of-correct or limited-functionality behavior if these options are 

36unavailable, for the resulting code would be hard to maintain ("``ifdef`` 

37hell") or would cause odd bugs at users (eg. servers that stop working when an 

38additional IPv6 address gets assigned). If the module does not work for you, 

39and the options can not be added easily to your platform, consider using the 

40:mod:`.simple6` module instead. 

41""" 

42 

43import asyncio 

44import contextvars 

45import errno 

46import os 

47import socket 

48import ipaddress 

49import struct 

50import weakref 

51from collections import namedtuple 

52 

53from ..message import Message 

54from ..numbers import constants 

55from .. import defaults 

56from .. import error 

57from .. import interfaces 

58from ..numbers import COAP_PORT 

59from ..util.asyncio.recvmsg import ( 

60 RecvmsgDatagramProtocol, 

61 create_recvmsg_datagram_endpoint, 

62) 

63from ..util.asyncio.getaddrinfo_addrconfig import ( 

64 getaddrinfo_routechecked as getaddrinfo, 

65) 

66from ..util import hostportjoin, hostportsplit 

67from ..util import socknumbers 

68 

69"""The `struct in6_pktinfo` from RFC3542""" 

70_in6_pktinfo = struct.Struct("16sI") 

71 

72_ipv6_unspecified = socket.inet_pton(socket.AF_INET6, "::") 

73_ipv4_unspecified = socket.inet_pton(socket.AF_INET6, "::ffff:0.0.0.0") 

74 

75 

76class InterfaceOnlyPktinfo(bytes): 

77 """A thin wrapper over bytes that represent a pktinfo built just to select 

78 an outgoing interface. 

79 

80 This must not be treated any different than a regular pktinfo, and is just 

81 tagged for better debug output. (Ie. if this is replaced everywhere with 

82 plain `bytes`, things must still work).""" 

83 

84 

85class UDP6EndpointAddress(interfaces.EndpointAddress): 

86 """Remote address type for :class:`MessageInterfaceUDP6`. Remote address is 

87 stored in form of a socket address; local address can be roundtripped by 

88 opaque pktinfo data. 

89 

90 For purposes of equality (and thus hashing), the local address is *not* 

91 checked. Neither is the scopeid that is part of the socket address. 

92 

93 >>> interface = type("FakeMessageInterface", (), {}) 

94 >>> if1_name = socket.if_indextoname(1) 

95 >>> local = UDP6EndpointAddress(socket.getaddrinfo('127.0.0.1', 5683, type=socket.SOCK_DGRAM, family=socket.AF_INET6, flags=socket.AI_V4MAPPED)[0][-1], interface) 

96 >>> local.is_multicast 

97 False 

98 >>> local.hostinfo 

99 '127.0.0.1' 

100 >>> all_coap_link1 = UDP6EndpointAddress(socket.getaddrinfo('ff02:0:0:0:0:0:0:fd%1', 1234, type=socket.SOCK_DGRAM, family=socket.AF_INET6)[0][-1], interface) 

101 >>> all_coap_link1.is_multicast 

102 True 

103 >>> all_coap_link1.hostinfo == '[ff02::fd%{}]:1234'.format(if1_name) 

104 True 

105 >>> all_coap_site = UDP6EndpointAddress(socket.getaddrinfo('ff05:0:0:0:0:0:0:fd', 1234, type=socket.SOCK_DGRAM, family=socket.AF_INET6)[0][-1], interface) 

106 >>> all_coap_site.is_multicast 

107 True 

108 >>> all_coap_site.hostinfo 

109 '[ff05::fd]:1234' 

110 >>> all_coap4 = UDP6EndpointAddress(socket.getaddrinfo('224.0.1.187', 5683, type=socket.SOCK_DGRAM, family=socket.AF_INET6, flags=socket.AI_V4MAPPED)[0][-1], interface) 

111 >>> all_coap4.is_multicast 

112 True 

113 """ 

114 

115 def __init__(self, sockaddr, interface, *, pktinfo=None): 

116 self.sockaddr = sockaddr 

117 self.pktinfo = pktinfo 

118 self._interface = weakref.ref(interface) 

119 

120 scheme = "coap" 

121 

122 interface = property(lambda self: self._interface()) 

123 

124 # Unlike for other remotes, this is settable per instance. 

125 maximum_block_size_exp = constants.MAX_REGULAR_BLOCK_SIZE_EXP 

126 

127 def __hash__(self): 

128 return hash(self.sockaddr[:-1]) 

129 

130 def __eq__(self, other): 

131 return self.sockaddr[:-1] == other.sockaddr[:-1] 

132 

133 def __repr__(self): 

134 return "<%s %s%s>" % ( 

135 type(self).__name__, 

136 self.hostinfo, 

137 " (locally %s)" % self._repr_pktinfo() if self.pktinfo is not None else "", 

138 ) 

139 

140 @staticmethod 

141 def _strip_v4mapped(address): 

142 """Turn anything that's a valid input to ipaddress.IPv6Address into a 

143 user-friendly string that's either an IPv6 or an IPv4 address. 

144 

145 This also compresses (normalizes) the IPv6 address as a convenient side 

146 effect.""" 

147 address = ipaddress.IPv6Address(address) 

148 mapped = address.ipv4_mapped 

149 if mapped is not None: 

150 return str(mapped) 

151 return str(address) 

152 

153 def _plainaddress(self): 

154 """Return the IP adress part of the sockaddr in IPv4 notation if it is 

155 mapped, otherwise the plain v6 address including the interface 

156 identifier if set.""" 

157 

158 if self.sockaddr[3] != 0: 

159 try: 

160 scopepart = "%" + socket.if_indextoname(self.sockaddr[3]) 

161 except Exception: # could be an OS error, could just be that there is no function of this name, as it is on Android 

162 scopepart = "%" + str(self.sockaddr[3]) 

163 else: 

164 scopepart = "" 

165 if "%" in self.sockaddr[0]: 

166 # Fix for Python 3.6 and earlier that reported the scope information 

167 # in the IP literal (3.7 consistently expresses it in the tuple slot 3) 

168 scopepart = "" 

169 return self._strip_v4mapped(self.sockaddr[0]) + scopepart 

170 

171 def _repr_pktinfo(self): 

172 """What repr(self.pktinfo) would be if that were not a plain untyped bytestring""" 

173 addr, interface = _in6_pktinfo.unpack_from(self.pktinfo) 

174 if interface == 0: 

175 interface = "" 

176 else: 

177 try: 

178 interface = "%" + socket.if_indextoname(interface) 

179 except Exception as e: 

180 interface = "%%%d(%s)" % (interface, e) 

181 

182 return "%s%s" % (self._strip_v4mapped(addr), interface) 

183 

184 def _plainaddress_local(self): 

185 """Like _plainaddress, but on the address in the pktinfo. Unlike 

186 _plainaddress, this does not contain the interface identifier.""" 

187 

188 addr, interface = _in6_pktinfo.unpack_from(self.pktinfo) 

189 

190 return self._strip_v4mapped(addr) 

191 

192 @property 

193 def netif(self): 

194 """Textual interface identifier of the explicitly configured remote 

195 interface, or the interface identifier reported in an incoming 

196 link-local message. None if not set.""" 

197 index = self.sockaddr[3] 

198 return socket.if_indextoname(index) if index else None 

199 

200 @property 

201 def hostinfo(self): 

202 port = self.sockaddr[1] 

203 if port == COAP_PORT: 

204 port = None 

205 

206 # plainaddress: don't assume other applications can deal with v4mapped addresses 

207 return hostportjoin(self._plainaddress(), port) 

208 

209 @property 

210 def hostinfo_local(self): 

211 host = self._plainaddress_local() 

212 port = self.interface._local_port() 

213 if port == 0: 

214 raise ValueError("Local port read before socket has bound itself") 

215 if port == COAP_PORT: 

216 port = None 

217 return hostportjoin(host, port) 

218 

219 @property 

220 def uri_base(self): 

221 return "coap://" + self.hostinfo 

222 

223 @property 

224 def uri_base_local(self): 

225 return "coap://" + self.hostinfo_local 

226 

227 @property 

228 def is_multicast(self): 

229 return ipaddress.ip_address(self._plainaddress().split("%", 1)[0]).is_multicast 

230 

231 @property 

232 def is_multicast_locally(self): 

233 return ipaddress.ip_address(self._plainaddress_local()).is_multicast 

234 

235 def as_response_address(self): 

236 if not self.is_multicast_locally: 

237 return self 

238 

239 # Create a copy without pktinfo, as responses to messages received to 

240 # multicast addresses can not have their request's destination address 

241 # as source address 

242 return type(self)(self.sockaddr, self.interface) 

243 

244 @property 

245 def blockwise_key(self): 

246 return (self.sockaddr, self.pktinfo) 

247 

248 

249class SockExtendedErr( 

250 namedtuple( 

251 "_SockExtendedErr", "ee_errno ee_origin ee_type ee_code ee_pad ee_info ee_data" 

252 ) 

253): 

254 _struct = struct.Struct("IbbbbII") 

255 

256 @classmethod 

257 def load(cls, data): 

258 # unpack_from: recvmsg(2) says that more data may follow 

259 return cls(*cls._struct.unpack_from(data)) 

260 

261 

262class MessageInterfaceUDP6(RecvmsgDatagramProtocol, interfaces.MessageInterface): 

263 def __init__(self, ctx: interfaces.MessageManager, log, loop): 

264 self._ctx = ctx 

265 self.log = log 

266 self.loop = loop 

267 

268 self._shutting_down = ( 

269 None #: Future created and used in the .shutdown() method. 

270 ) 

271 

272 self.ready = asyncio.get_running_loop().create_future() #: Future that gets fullfilled by connection_made (ie. don't send before this is done; handled by ``create_..._context`` 

273 

274 # This is set while a send is underway to determine in the 

275 # error_received call site whom we were actually sending something to. 

276 # This is a workaround for the abysmal error handling unconnected 

277 # sockets have :-/ 

278 # 

279 # FIXME: Figure out whether aiocoap can at all support a context being 

280 # used with multiple aiocoap contexts, and if not, raise an error early 

281 # rather than just-in-case doing extra stuff here. 

282 self._remote_being_sent_to = contextvars.ContextVar( 

283 "_remote_being_sent_to", default=None 

284 ) 

285 

286 def _local_port(self): 

287 # FIXME: either raise an error if this is 0, or send a message to self 

288 # to force the OS to decide on a port. Right now, this reports wrong 

289 # results while the first message has not been sent yet. 

290 return self.transport.get_extra_info("socket").getsockname()[1] 

291 

292 @classmethod 

293 async def _create_transport_endpoint( 

294 cls, sock, ctx: interfaces.MessageManager, log, loop, multicast=[] 

295 ): 

296 try: 

297 sock.setsockopt(socket.IPPROTO_IPV6, socknumbers.IPV6_RECVPKTINFO, 1) 

298 except NameError: 

299 raise RuntimeError( 

300 "RFC3542 PKTINFO flags are unavailable, unable to create a udp6 transport." 

301 ) 

302 if socknumbers.HAS_RECVERR: 

303 sock.setsockopt(socket.IPPROTO_IPV6, socknumbers.IPV6_RECVERR, 1) 

304 # i'm curious why this is required; didn't IPV6_V6ONLY=0 already make 

305 # it clear that i don't care about the ip version as long as everything looks the same? 

306 sock.setsockopt(socket.IPPROTO_IP, socknumbers.IP_RECVERR, 1) 

307 else: 

308 log.warning( 

309 "Transport udp6 set up on platform without RECVERR capability. ICMP errors will be ignored." 

310 ) 

311 

312 for address_string, interface_string in sum( 

313 map( 

314 # Expand shortcut of "interface name means default CoAP all-nodes addresses" 

315 lambda i: [(a, i) for a in constants.MCAST_ALL] 

316 if isinstance(i, str) 

317 else [i], 

318 multicast, 

319 ), 

320 [], 

321 ): 

322 address = ipaddress.ip_address(address_string) 

323 interface = socket.if_nametoindex(interface_string) 

324 

325 if isinstance(address, ipaddress.IPv4Address): 

326 s = struct.pack( 

327 "4s4si", address.packed, socket.inet_aton("0.0.0.0"), interface 

328 ) 

329 try: 

330 sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, s) 

331 except OSError: 

332 log.warning("Could not join IPv4 multicast group") 

333 

334 elif isinstance(address, ipaddress.IPv6Address): 

335 s = struct.pack("16si", address.packed, interface) 

336 try: 

337 sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, s) 

338 except OSError: 

339 log.warning("Could not join IPv6 multicast group") 

340 

341 else: 

342 raise RuntimeError("Unknown address format") 

343 

344 transport, protocol = await create_recvmsg_datagram_endpoint( 

345 loop, lambda: cls(ctx, log=log, loop=loop), sock=sock 

346 ) 

347 

348 await protocol.ready 

349 

350 return protocol 

351 

352 @classmethod 

353 async def create_client_transport_endpoint( 

354 cls, ctx: interfaces.MessageManager, log, loop 

355 ): 

356 sock = socket.socket(family=socket.AF_INET6, type=socket.SOCK_DGRAM) 

357 sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0) 

358 

359 return await cls._create_transport_endpoint(sock, ctx, log, loop) 

360 

361 @classmethod 

362 async def create_server_transport_endpoint( 

363 cls, ctx: interfaces.MessageManager, log, loop, bind, multicast 

364 ): 

365 bind = bind or ("::", None) 

366 # Interpret None as 'default port', but still allow to bind to 0 for 

367 # servers that want a random port (eg. when the service URLs are 

368 # advertised out-of-band anyway, or in LwM2M clients) 

369 bind = (bind[0], COAP_PORT if bind[1] is None else bind[1]) 

370 

371 # The later bind() does most of what getaddr info usually does 

372 # (including resolving names), but is missing out subtly: It does not 

373 # populate the zone identifier of an IPv6 address, making it impossible 

374 # without a getaddrinfo (or manual mapping of the name to a number) to 

375 # bind to a specific link-local interface 

376 try: 

377 addriter = getaddrinfo( 

378 loop, 

379 log, 

380 bind[0], 

381 bind[1], 

382 ) 

383 try: 

384 bind = await addriter.__anext__() 

385 except StopAsyncIteration: 

386 raise RuntimeError( 

387 "getaddrinfo returned zero-length list rather than erring out" 

388 ) 

389 except socket.gaierror: 

390 raise error.ResolutionError( 

391 "No local bindable address found for %s" % bind[0] 

392 ) 

393 

394 try: 

395 additional = await addriter.__anext__() 

396 except StopAsyncIteration: 

397 pass 

398 except Exception as e: 

399 log.error( 

400 "Ignoring exception raised when checking for additional addresses that match the bind address", 

401 exc_info=e, 

402 ) 

403 else: 

404 log.warning( 

405 "Multiple addresses to bind to, only selecting %r and discarding %r and any later", 

406 bind, 

407 additional, 

408 ) 

409 

410 sock = socket.socket(family=socket.AF_INET6, type=socket.SOCK_DGRAM) 

411 if defaults.has_reuse_port(): 

412 # I doubt that there is any platform that supports RECVPKTINFO but 

413 # not REUSEPORT, but why take chances. 

414 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) 

415 sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0) 

416 sock.bind(bind) 

417 

418 return await cls._create_transport_endpoint(sock, ctx, log, loop, multicast) 

419 

420 async def shutdown(self): 

421 self._shutting_down = asyncio.get_running_loop().create_future() 

422 

423 self.transport.close() 

424 

425 await self._shutting_down 

426 

427 del self._ctx 

428 

429 def send(self, message): 

430 ancdata = [] 

431 if message.remote.pktinfo is not None: 

432 ancdata.append( 

433 (socket.IPPROTO_IPV6, socknumbers.IPV6_PKTINFO, message.remote.pktinfo) 

434 ) 

435 assert ( 

436 self._remote_being_sent_to.get(None) is None 

437 ), "udp6.MessageInterfaceUDP6.send was reentered in a single task" 

438 self._remote_being_sent_to.set(message.remote) 

439 try: 

440 self.transport.sendmsg( 

441 message.encode(), ancdata, 0, message.remote.sockaddr 

442 ) 

443 finally: 

444 self._remote_being_sent_to.set(None) 

445 

446 async def recognize_remote(self, remote): 

447 return isinstance(remote, UDP6EndpointAddress) and remote.interface == self 

448 

449 async def determine_remote(self, request): 

450 if request.requested_scheme not in ("coap", None): 

451 return None 

452 

453 if request.unresolved_remote is not None: 

454 host, port = hostportsplit(request.unresolved_remote) 

455 port = port or COAP_PORT 

456 elif request.opt.uri_host: 

457 host = request.opt.uri_host 

458 if host.startswith("[") and host.endswith("]"): 

459 host = host[1:-1] 

460 port = request.opt.uri_port or COAP_PORT 

461 else: 

462 raise ValueError( 

463 "No location found to send message to (neither in .opt.uri_host nor in .remote)" 

464 ) 

465 

466 # Take aside the zone identifier. While it can pass through getaddrinfo 

467 # in some situations (eg. 'fe80::1234%eth0' will give 'fe80::1234' 

468 # scope eth0, and similar for ff02:: addresses), in others (eg. ff05::) 

469 # it gives 'Name or service not known'. 

470 

471 if "%" in host: 

472 host, zone = host.split("%", 1) 

473 try: 

474 zone = socket.if_nametoindex(zone) 

475 except OSError: 

476 raise error.ResolutionError("Invalid zone identifier %s" % zone) 

477 else: 

478 zone = None 

479 

480 try: 

481 # Note that this is our special addrinfo that ensures there is a 

482 # route. 

483 ip, port, flowinfo, scopeid = await getaddrinfo( 

484 self.loop, 

485 self.log, 

486 host, 

487 port, 

488 ).__anext__() 

489 except socket.gaierror: 

490 raise error.ResolutionError( 

491 "No address information found for requests to %r" % host 

492 ) 

493 

494 if zone is not None: 

495 # Still trying to preserve the information returned (libc can't do 

496 # it as described at 

497 # <https://unix.stackexchange.com/questions/174767/ipv6-zone-id-in-etc-hosts>) 

498 # in case something sane does come out of that. 

499 if scopeid != 0 and scopeid != zone: 

500 self.log.warning( 

501 "Resolved address of %s came with zone ID %d whereas explicit ID %d takes precedence", 

502 host, 

503 scopeid, 

504 zone, 

505 ) 

506 scopeid = zone 

507 

508 # We could be done here and return UDP6EndpointAddress(the reassembled 

509 # sockaddr, self), but: 

510 # 

511 # Linux (unlike FreeBSD) takes the sockaddr's scope ID only for 

512 # link-local scopes (as per ipv6(7), and discards it otherwise. It does 

513 # need the information of the selected interface, though, in order to 

514 # pick the right outgoing interface. Thus, we provide it in the local 

515 # portion. 

516 

517 if scopeid: 

518 # "Any" does not include "even be it IPv4" -- the underlying family 

519 # unfortunately needs to be set, or Linux will refuse to send. 

520 if ipaddress.IPv6Address(ip).ipv4_mapped is None: 

521 local_source = _ipv6_unspecified 

522 else: 

523 local_source = _ipv4_unspecified 

524 local = InterfaceOnlyPktinfo(_in6_pktinfo.pack(local_source, scopeid)) 

525 else: 

526 local = None 

527 

528 sockaddr = ip, port, flowinfo, scopeid 

529 result = UDP6EndpointAddress(sockaddr, self, pktinfo=local) 

530 if request.remote.maximum_block_size_exp < result.maximum_block_size_exp: 

531 result.maximum_block_size_exp = request.remote.maximum_block_size_exp 

532 return result 

533 

534 # 

535 # implementing the typical DatagramProtocol interfaces. 

536 # 

537 # note from the documentation: we may rely on connection_made to be called 

538 # before datagram_received -- but sending immediately after context 

539 # creation will still fail 

540 

541 def connection_made(self, transport): 

542 """Implementation of the DatagramProtocol interface, called by the transport.""" 

543 self.ready.set_result(True) 

544 self.transport = transport 

545 

546 def datagram_msg_received(self, data, ancdata, flags, address): 

547 """Implementation of the RecvmsgDatagramProtocol interface, called by the transport.""" 

548 pktinfo = None 

549 for cmsg_level, cmsg_type, cmsg_data in ancdata: 

550 if ( 

551 cmsg_level == socket.IPPROTO_IPV6 

552 and cmsg_type == socknumbers.IPV6_PKTINFO 

553 ): 

554 pktinfo = cmsg_data 

555 else: 

556 self.log.info( 

557 "Received unexpected ancillary data to recvmsg: level %d, type %d, data %r", 

558 cmsg_level, 

559 cmsg_type, 

560 cmsg_data, 

561 ) 

562 if pktinfo is None: 

563 self.log.warning( 

564 "Did not receive requested pktinfo ancdata on message from %s", address 

565 ) 

566 try: 

567 message = Message.decode( 

568 data, UDP6EndpointAddress(address, self, pktinfo=pktinfo) 

569 ) 

570 except error.UnparsableMessage: 

571 self.log.warning("Ignoring unparsable message from %s", address) 

572 return 

573 

574 try: 

575 self._ctx.dispatch_message(message) 

576 except BaseException as exc: 

577 # Catching here because util.asyncio.recvmsg inherits 

578 # _SelectorDatagramTransport's bad handling of callback errors; 

579 # this is the last time we have a log at hand. 

580 self.log.error( 

581 "Exception raised through dispatch_message: %s", exc, exc_info=exc 

582 ) 

583 raise 

584 

585 def datagram_errqueue_received(self, data, ancdata, flags, address): 

586 assert ( 

587 flags == socknumbers.MSG_ERRQUEUE 

588 ), "Received non-error data through the errqueue" 

589 pktinfo = None 

590 errno_value = None 

591 for cmsg_level, cmsg_type, cmsg_data in ancdata: 

592 assert ( 

593 cmsg_level == socket.IPPROTO_IPV6 

594 ), "Received non-IPv6 protocol through the errqueue" 

595 if cmsg_type == socknumbers.IPV6_RECVERR: 

596 extended_err = SockExtendedErr.load(cmsg_data) 

597 self.log.debug("Socket error recevied, details: %s", extended_err) 

598 errno_value = extended_err.ee_errno 

599 elif ( 

600 cmsg_level == socket.IPPROTO_IPV6 

601 and cmsg_type == socknumbers.IPV6_PKTINFO 

602 ): 

603 pktinfo = cmsg_data 

604 else: 

605 self.log.info( 

606 "Received unexpected ancillary data to recvmsg errqueue: level %d, type %d, data %r", 

607 cmsg_level, 

608 cmsg_type, 

609 cmsg_data, 

610 ) 

611 remote = UDP6EndpointAddress(address, self, pktinfo=pktinfo) 

612 

613 # not trying to decode a message from data -- that works for 

614 # "connection refused", doesn't work for "no route to host", and 

615 # anyway, when an icmp error comes back, everything pending from that 

616 # port should err out. 

617 

618 try: 

619 text = os.strerror(errno_value) 

620 symbol = errno.errorcode.get(errno_value, None) 

621 symbol = "" if symbol is None else f"{symbol}, " 

622 self._ctx.dispatch_error( 

623 OSError(errno_value, f"{text} ({symbol}received through errqueue)"), 

624 remote, 

625 ) 

626 except BaseException as exc: 

627 # Catching here because util.asyncio.recvmsg inherits 

628 # _SelectorDatagramTransport's bad handling of callback errors; 

629 # this is the last time we have a log at hand. 

630 self.log.error( 

631 "Exception raised through dispatch_error: %s", exc, exc_info=exc 

632 ) 

633 raise 

634 

635 def error_received(self, exc): 

636 """Implementation of the DatagramProtocol interface, called by the transport.""" 

637 

638 remote = self._remote_being_sent_to.get() 

639 

640 if remote is None: 

641 self.log.info( 

642 "Error received in situation with no way to to determine which sending caused the error; this should be accompanied by an error in another code path: %s", 

643 exc, 

644 ) 

645 return 

646 

647 try: 

648 self._ctx.dispatch_error(exc, remote) 

649 except BaseException as exc: 

650 # Catching here because util.asyncio.recvmsg inherits 

651 # _SelectorDatagramTransport's bad handling of callback errors; 

652 # this is the last time we have a log at hand. 

653 self.log.error( 

654 "Exception raised through dispatch_error: %s", exc, exc_info=exc 

655 ) 

656 raise 

657 

658 def connection_lost(self, exc): 

659 # TODO better error handling -- find out what can cause this at all 

660 # except for a shutdown 

661 if exc is not None: 

662 self.log.error("Connection lost: %s", exc) 

663 

664 if self._shutting_down is None: 

665 self.log.error("Connection loss was not expected.") 

666 else: 

667 self._shutting_down.set_result(None)