Coverage for aiocoap / transports / udp6.py: 76%

297 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-29 12:32 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5"""This module implements a MessageInterface for UDP based on a variation of 

6the asyncio DatagramProtocol. 

7 

8This implementation strives for correct and complete behavior while still 

9only using a single socket (or a few, if distinct addresses are configured); 

10that is, to be usable for all kinds of multicast 

11traffic, to support server and client behavior at the same time, and to work 

12correctly even when multiple IPv6 and IPv4 (using V4MAPPED style addresses) 

13interfaces are present, and any of the interfaces has multiple addresses. 

14 

15Configuration 

16------------- 

17 

18This transport is configured in the ``transports.udp`` item of an 

19:mod:`aiocoap.config`, a :class:`Udp6Parameters 

20<aiocoap.config.Udp6Parameters>` instance. 

21 

22Implementation 

23-------------- 

24 

25This requires using some standardized and (as of 2025) widely supported 

26features: 

27 

28* ``IPV6_JOIN_GROUP`` for multicast membership management. 

29 

30* ``IPV6_RECVPKTINFO`` to determine incoming packets' destination addresses 

31 (was it multicast) and to return packets from the same address. 

32 

33* ``recvmsg`` to obtain that data (and, see below, also data from the error queue). 

34 

35The need for ``AI_V4MAPPED`` and ``AI_ADDRCONFIG`` is not manifest 

36in the code because the latter on its own is insufficient to enable seamless 

37interoperability with IPv4+IPv6 servers on IPv4-only hosts; instead, 

38short-lived sockets are created to assess which addresses are routable. (Those 

39are created in the :mod:`aiocoap.util.asyncio.getaddrinfo_addrconfig` module). 

40This should correctly deal with situations in which a client has an IPv6 ULA 

41assigned but no route, no matter whether the server advertises global IPv6 

42addresses or addresses inside that ULA. It can not deal with situations in 

43which the host has a default IPv6 route, but that route is not actually usable: 

44There is no Happy Eyeballs mechanism implemented. 

45 

46To the author's knowledge, there is no standardized mechanism for receiving 

47ICMP errors in such a setup. On Linux, ``IPV6_RECVERR`` and ``MSG_ERRQUEUE`` 

48are used to receive ICMP errors from the socket; on other platforms, a warning 

49is emitted that ICMP errors are ignored. Using the :mod:`.simple6` transport is 

50recommended on those platforms when working as a client only. 

51 

52Except for the above error handling, no attempts are made to fall back to a 

53kind-of-correct or limited-functionality behavior if these options are 

54unavailable, for the resulting code would be hard to maintain ("``ifdef`` 

55hell") or would cause odd bugs at users (eg. servers that stop working when an 

56additional IPv6 address gets assigned). If the module does not work for you, 

57and the options can not be added easily to your platform, consider using the 

58:mod:`.simple6` module instead. 

59""" 

60 

61import asyncio 

62import contextvars 

63import errno 

64import os 

65import socket 

66import ipaddress 

67import struct 

68import weakref 

69from collections import namedtuple 

70 

71from ..config import TransportParameters 

72from ..message import Message, Direction 

73from ..numbers import constants 

74from .. import defaults 

75from .. import error 

76from .. import interfaces 

77from ..numbers import COAP_PORT 

78from ..util.asyncio.recvmsg import ( 

79 RecvmsgDatagramProtocol, 

80 create_recvmsg_datagram_endpoint, 

81) 

82from ..util.asyncio.getaddrinfo_addrconfig import ( 

83 getaddrinfo_routechecked as getaddrinfo, 

84) 

85from ..util import hostportjoin, hostportsplit 

86from ..util import socknumbers 

87 

88"""The `struct in6_pktinfo` from RFC3542""" 

89_in6_pktinfo = struct.Struct("16sI") 

90 

91_ipv6_unspecified = socket.inet_pton(socket.AF_INET6, "::") 

92_ipv4_unspecified = socket.inet_pton(socket.AF_INET6, "::ffff:0.0.0.0") 

93 

94 

class InterfaceOnlyPktinfo(bytes):
    """Marker subclass of `bytes` for a pktinfo that was assembled solely to
    pick the outgoing interface.

    The type adds no behavior of its own: code handling it has to treat it
    exactly like any other pktinfo, and everything must keep working if every
    instance were swapped for plain `bytes`. The distinct type only exists so
    that debug output can tell the two apart."""

102 

103 

class UDP6EndpointAddress(interfaces.EndpointAddress):
    """Remote address type for :class:`MessageInterfaceUDP6`. Remote address is
    stored in form of a socket address; local address can be roundtripped by
    opaque pktinfo data.

    For purposes of equality (and thus hashing), the local address is *not*
    checked. Neither is the scopeid that is part of the socket address.

    >>> interface = type("FakeMessageInterface", (), {})
    >>> if1_name = socket.if_indextoname(1)
    >>> local = UDP6EndpointAddress(socket.getaddrinfo('127.0.0.1', 5683, type=socket.SOCK_DGRAM, family=socket.AF_INET6, flags=socket.AI_V4MAPPED)[0][-1], interface)
    >>> local.is_multicast
    False
    >>> local.hostinfo
    '127.0.0.1'
    >>> all_coap_link1 = UDP6EndpointAddress(socket.getaddrinfo('ff02:0:0:0:0:0:0:fd%1', 1234, type=socket.SOCK_DGRAM, family=socket.AF_INET6)[0][-1], interface)
    >>> all_coap_link1.is_multicast
    True
    >>> all_coap_link1.hostinfo == '[ff02::fd%{}]:1234'.format(if1_name)
    True
    >>> all_coap_site = UDP6EndpointAddress(socket.getaddrinfo('ff05:0:0:0:0:0:0:fd', 1234, type=socket.SOCK_DGRAM, family=socket.AF_INET6)[0][-1], interface)
    >>> all_coap_site.is_multicast
    True
    >>> all_coap_site.hostinfo
    '[ff05::fd]:1234'
    >>> all_coap4 = UDP6EndpointAddress(socket.getaddrinfo('224.0.1.187', 5683, type=socket.SOCK_DGRAM, family=socket.AF_INET6, flags=socket.AI_V4MAPPED)[0][-1], interface)
    >>> all_coap4.is_multicast
    True
    """

    def __init__(self, sockaddr, interface, *, pktinfo=None):
        """Store the remote `sockaddr` (a 4-tuple of address, port, flowinfo
        and scope ID), the optional opaque local `pktinfo` bytes, and a weak
        reference to the `interface` this address belongs to."""
        self.sockaddr = sockaddr
        self.pktinfo = pktinfo
        # Held weakly so that an address does not keep its interface alive.
        self._interface = weakref.ref(interface)

    scheme = "coap"

    # Dereferences the weak reference; None once the interface is gone.
    interface = property(lambda self: self._interface())

    # Unlike for other remotes, this is settable per instance.
    maximum_block_size_exp = constants.MAX_REGULAR_BLOCK_SIZE_EXP

    def __hash__(self):
        # The trailing scope ID is deliberately excluded, matching __eq__.
        return hash(self.sockaddr[:-1])

    def __eq__(self, other):
        """Compare by socket address without the scope ID.

        Objects that have no ``sockaddr`` attribute are not comparable;
        NotImplemented is returned for them so that Python falls back to its
        default handling instead of raising AttributeError."""
        try:
            other_sockaddr = other.sockaddr
        except AttributeError:
            return NotImplemented
        return self.sockaddr[:-1] == other_sockaddr[:-1]

    def __repr__(self):
        return "<%s %s%s>" % (
            type(self).__name__,
            self.hostinfo,
            " (locally %s)" % self._repr_pktinfo() if self.pktinfo is not None else "",
        )

    @staticmethod
    def _strip_v4mapped(address):
        """Turn anything that's a valid input to ipaddress.IPv6Address into a
        user-friendly string that's either an IPv6 or an IPv4 address.

        This also compresses (normalizes) the IPv6 address as a convenient side
        effect."""
        address = ipaddress.IPv6Address(address)
        mapped = address.ipv4_mapped
        if mapped is not None:
            return str(mapped)
        return str(address)

    def _plainaddress(self):
        """Return the IP address part of the sockaddr in IPv4 notation if it is
        mapped, otherwise the plain v6 address including the interface
        identifier if set."""

        if self.sockaddr[3] != 0:
            try:
                scopepart = "%" + socket.if_indextoname(self.sockaddr[3])
            except Exception:  # could be an OS error, could just be that there is no function of this name, as it is on Android
                # Fall back to the numeric zone identifier.
                scopepart = "%" + str(self.sockaddr[3])
        else:
            scopepart = ""
        if "%" in self.sockaddr[0]:
            # Fix for Python 3.6 and earlier that reported the scope information
            # in the IP literal (3.7 consistently expresses it in the tuple slot 3)
            scopepart = ""
        return self._strip_v4mapped(self.sockaddr[0]) + scopepart

    def _repr_pktinfo(self):
        """What repr(self.pktinfo) would be if that were not a plain untyped bytestring"""
        addr, interface = _in6_pktinfo.unpack_from(self.pktinfo)
        if interface == 0:
            interface = ""
        else:
            try:
                interface = "%" + socket.if_indextoname(interface)
            except Exception as e:
                # Show the raw index together with the reason it could not
                # be translated into a name.
                interface = "%%%d(%s)" % (interface, e)

        return "%s%s" % (self._strip_v4mapped(addr), interface)

    def _plainaddress_local(self):
        """Like _plainaddress, but on the address in the pktinfo. Unlike
        _plainaddress, this does not contain the interface identifier."""

        addr, _ = _in6_pktinfo.unpack_from(self.pktinfo)

        return self._strip_v4mapped(addr)

    @property
    def netif(self):
        """Textual interface identifier of the explicitly configured remote
        interface, or the interface identifier reported in an incoming
        link-local message. None if not set."""
        index = self.sockaddr[3]
        return socket.if_indextoname(index) if index else None

    @property
    def hostinfo(self):
        """Host (and port, unless it is the default CoAP port) of the remote,
        as produced by `hostportjoin`."""
        port = self.sockaddr[1]
        if port == COAP_PORT:
            port = None

        # plainaddress: don't assume other applications can deal with v4mapped addresses
        return hostportjoin(self._plainaddress(), port)

    @property
    def hostinfo_local(self):
        """Like `hostinfo`, but for the local address taken from the pktinfo
        and the local port reported by the interface.

        Raises ValueError if the interface's local port is still 0."""
        host = self._plainaddress_local()
        port = self.interface._local_port()
        if port == 0:
            raise ValueError("Local port read before socket has bound itself")
        if port == COAP_PORT:
            port = None
        return hostportjoin(host, port)

    @property
    def uri_base(self):
        return "coap://" + self.hostinfo

    @property
    def uri_base_local(self):
        return "coap://" + self.hostinfo_local

    @property
    def is_multicast(self):
        """Whether the remote address is a multicast address."""
        # ipaddress can not parse a zone suffix, so cut it off first.
        return ipaddress.ip_address(self._plainaddress().split("%", 1)[0]).is_multicast

    @property
    def is_multicast_locally(self):
        """Whether the local address stored in the pktinfo is a multicast
        address."""
        return ipaddress.ip_address(self._plainaddress_local()).is_multicast

    def as_response_address(self):
        """Return the address to send a response to: this very object, unless
        the local address is a multicast address."""
        if not self.is_multicast_locally:
            return self

        # Create a copy without pktinfo, as responses to messages received to
        # multicast addresses can not have their request's destination address
        # as source address
        return type(self)(self.sockaddr, self.interface)

    @property
    def blockwise_key(self):
        return (self.sockaddr, self.pktinfo)

266 

267 

class SockExtendedErr(
    namedtuple(
        "_SockExtendedErr", "ee_errno ee_origin ee_type ee_code ee_pad ee_info ee_data"
    )
):
    """Decoded form of the extended socket error record that is delivered as
    ancillary data when reading from a socket's error queue.

    The record consists of a 32-bit unsigned error number, four unsigned
    bytes (origin, type, code, padding) and two 32-bit unsigned words (info,
    data), all in native byte order."""

    # The four single-byte fields are unsigned, so values of 128 and above
    # (eg. in ee_type) must not be decoded as negative numbers.
    _struct = struct.Struct("IBBBBII")

    @classmethod
    def load(cls, data):
        """Build an instance from the leading bytes of `data`.

        Any bytes following the fixed-size record are ignored. Raises
        `struct.error` if `data` is shorter than the record."""
        # unpack_from: recvmsg(2) says that more data may follow
        return cls(*cls._struct.unpack_from(data))

279 

280 

class MessageInterfaceUDP6(RecvmsgDatagramProtocol, interfaces.MessageInterface):
    """Message interface that exchanges CoAP messages over a single UDP
    socket, using recvmsg/sendmsg ancillary data to track local addresses and
    (where available) the socket error queue to learn about ICMP errors.

    Instances are produced by :meth:`prepare_transport_endpoints` and start
    operating once :meth:`start_transport_endpoint` has bound the socket."""

    # Set between prepare_transport_endpoints and start_transport_endpoint
    _ctx: interfaces.MessageManager

    def __init__(self, bind, log, loop):
        """Set up the protocol state.

        `bind` is the socket address later handed to the socket's ``bind()``
        in :meth:`start_transport_endpoint`; `log` and `loop` are kept for
        logging and for address resolution."""
        self.__bind = bind
        self.log = log
        self.loop = loop

        #: Future created and used in the .shutdown() method.
        self._shutting_down = None

        #: Future that gets fulfilled by connection_made (ie. don't send
        #: before this is done; handled by ``create_..._context``
        self.ready = asyncio.get_running_loop().create_future()

        # This is set while a send is underway to determine in the
        # error_received call site whom we were actually sending something to.
        # This is a workaround for the abysmal error handling unconnected
        # sockets have :-/
        #
        # FIXME: Figure out whether aiocoap can at all support a context being
        # used with multiple aiocoap contexts, and if not, raise an error early
        # rather than just-in-case doing extra stuff here.
        self._remote_being_sent_to = contextvars.ContextVar(
            "_remote_being_sent_to", default=None
        )

    def __repr__(self):
        # The transport attribute only exists once connection_made has run;
        # repr() must keep working before that (eg. in debug output).
        transport = getattr(self, "transport", None)
        sock = transport.get_extra_info("socket") if transport is not None else None
        return f"<{type(self).__name__} at {id(self):#x} bound to {sock.getsockname() if sock else '(nothing)'}>"

    def _local_port(self):
        """Return the port number the underlying socket reports for itself."""
        # FIXME: either raise an error if this is 0, or send a message to self
        # to force the OS to decide on a port. Right now, this reports wrong
        # results while the first message has not been sent yet.
        return self.transport.get_extra_info("socket").getsockname()[1]

    @staticmethod
    def _make_unbound_socket(params, reuseport, log):
        """Create an unbound IPv6 UDP socket with all options this transport
        relies on, and join the configured legacy multicast groups.

        The socket is closed again if any step of the setup raises, so a
        failed setup does not leak a file descriptor.

        Raises RuntimeError if the PKTINFO socket option is unavailable or a
        multicast address is of an unknown kind; errors from looking up an
        interface name propagate unchanged."""
        sock = socket.socket(family=socket.AF_INET6, type=socket.SOCK_DGRAM)
        try:
            if reuseport:
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
            sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)

            try:
                sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_RECVPKTINFO, 1)
            except (NameError, AttributeError) as exc:
                # A constant missing from the socket module surfaces as
                # AttributeError; NameError is kept for compatibility.
                raise RuntimeError(
                    "RFC3542 PKTINFO flags are unavailable, unable to create a udp6 transport."
                ) from exc
            if socknumbers.HAS_RECVERR:
                sock.setsockopt(socket.IPPROTO_IPV6, socknumbers.IPV6_RECVERR, 1)
                # i'm curious why this is required; didn't IPV6_V6ONLY=0 already make
                # it clear that i don't care about the ip version as long as everything looks the same?
                sock.setsockopt(socket.IPPROTO_IP, socknumbers.IP_RECVERR, 1)
            else:
                log.warning(
                    "Transport udp6 set up on platform without RECVERR capability. ICMP errors will be ignored."
                )

            # Expand shortcut of "interface name means default CoAP all-nodes addresses"
            groups = []
            for entry in params._legacy_multicast or []:
                if isinstance(entry, str):
                    groups.extend((a, entry) for a in constants.MCAST_ALL)
                else:
                    groups.append(entry)

            for address_string, interface_string in groups:
                mcaddress = ipaddress.ip_address(address_string)
                interface = socket.if_nametoindex(interface_string)

                if isinstance(mcaddress, ipaddress.IPv4Address):
                    # Group address, any local address, interface index.
                    s = struct.pack(
                        "4s4si",
                        mcaddress.packed,
                        socket.inet_aton("0.0.0.0"),
                        interface,
                    )
                    try:
                        sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, s)
                    except OSError:
                        log.warning("Could not join IPv4 multicast group")

                elif isinstance(mcaddress, ipaddress.IPv6Address):
                    # Group address, interface index.
                    s = struct.pack("16si", mcaddress.packed, interface)
                    try:
                        sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, s)
                    except OSError:
                        log.warning("Could not join IPv6 multicast group")

                else:
                    raise RuntimeError("Unknown address format")
        except BaseException:
            sock.close()
            raise
        return sock

    @classmethod
    async def prepare_transport_endpoints(
        cls,
        *,
        params: TransportParameters,
        log,
        loop,
    ):
        """Produces instances that do *not* accept messages yet.

        After this, interconnect the instance with a message manager, and then
        call start_transport_endpoint() to receive messages.

        This split setup is needed because UDP sockets do not have TCP's
        separation between bind() and accept(); were this not split, there is a
        danger of receiving datagrams that do not trigger ICMP errors, but can
        still not be processed fully because the token and message manager are
        not fully set up.

        This is an async generator yielding one protocol instance per address
        that each configured bind entry resolves to. Raises
        `error.ResolutionError` if resolving a bind entry fails."""

        assert params.udp6 is not None, (
            "Transport initiated without actual configuration"
        )

        bind = params.udp6.bind
        if bind is None and params._legacy_bind is not None:
            bind = [hostportjoin(*params._legacy_bind)]
        if bind is None:
            if params.is_server:
                bind = ["[::]"]
            else:
                bind = ["[::]:0"]

        reuseport = params.udp6.reuse_port
        if reuseport is None:
            reuseport = defaults.has_reuse_port()

        for joined in bind:
            (host, port) = hostportsplit(joined)
            if port is None:
                port = COAP_PORT

            log.debug("Resolving address for %r (port %r) to bind to.", host, port)

            # Using getaddrinfo before bind for multiple reasons:
            #
            # * getaddrinfo might produce multiple addresses, typical example
            #   is `localhost`.
            #
            # * bind does not populate the zone identifier of an IPv6 address
            #   from a %-suffix, making it impossible without a getaddrinfo (or
            #   manual parsing into the tuple) to bind to a specific link-local
            #   address.
            try:
                async for address in getaddrinfo(
                    loop,
                    log,
                    host,
                    port,
                ):
                    log.debug("Found address %r, preparing transport for it.", address)
                    # FIXME:
                    #
                    # * Should we tolerate not being able to bind to all addresses,
                    #   eg. so that an anycast server can still use its name?
                    # * If so, can we find out before we yield and later bind, or
                    #   do we need to back down later?
                    #
                    # For the time being the resolution is that setup will fail
                    # if not all; users of anycast / DNS-switched domains
                    # better have an "this-instance-only" name, or give
                    # addresses explicitly.

                    sock = cls._make_unbound_socket(params, reuseport, log)

                    transport, protocol = await create_recvmsg_datagram_endpoint(
                        loop, lambda: cls(bind=address, log=log, loop=loop), sock=sock
                    )

                    await protocol.ready

                    yield protocol
            except socket.gaierror as exc:
                # Name the entry that failed, not just the first of the list.
                raise error.ResolutionError(
                    "No local bindable address found for %s" % joined
                ) from exc

    async def start_transport_endpoint(self):
        """Run the last phase of transport startup: actually binding.

        This is done in a 2nd phase for reasons outlined in
        :meth:`MessageInterfaceUDP6.prepare_transport_endpoints`."""
        self.log.debug(
            "Starting transport endpoint %r binding to %r", self, self.__bind
        )
        # Should we special-case '[::]:0' here and just not call? Does it make any difference?
        self.transport.get_extra_info("socket").bind(self.__bind)

    async def shutdown(self):
        """Close the transport, wait until connection_lost has been called,
        and drop the reference to the message manager."""
        self._shutting_down = asyncio.get_running_loop().create_future()

        self.transport.close()

        await self._shutting_down

        del self._ctx

    def send(self, message):
        """Encode `message` and send it to its remote's socket address.

        If the remote carries pktinfo, it is passed along as ancillary data.
        While the send is underway, the remote is recorded so that
        error_received can tell which send an error belongs to."""
        ancdata = []
        if message.remote.pktinfo is not None:
            ancdata.append(
                (socket.IPPROTO_IPV6, socket.IPV6_PKTINFO, message.remote.pktinfo)
            )
        assert self._remote_being_sent_to.get(None) is None, (
            "udp6.MessageInterfaceUDP6.send was reentered in a single task"
        )
        self._remote_being_sent_to.set(message.remote)
        try:
            self.transport.sendmsg(
                message.encode(), ancdata, 0, message.remote.sockaddr
            )
        finally:
            self._remote_being_sent_to.set(None)

    async def recognize_remote(self, remote):
        """Tell whether `remote` is an address object of this very interface."""
        return isinstance(remote, UDP6EndpointAddress) and remote.interface == self

    async def determine_remote(self, request):
        """Resolve the destination of `request` into a UDP6EndpointAddress.

        Returns None if the request asks for a scheme other than ``coap``.
        Raises ValueError if the request names no destination at all, and
        `error.ResolutionError` if the zone identifier is invalid or no
        address can be found for the host."""
        if request.requested_scheme not in ("coap", None):
            return None

        if request.unresolved_remote is not None:
            host, port = hostportsplit(request.unresolved_remote)
            port = port or COAP_PORT
        elif request.opt.uri_host:
            host = request.opt.uri_host
            if host.startswith("[") and host.endswith("]"):
                host = host[1:-1]
            port = request.opt.uri_port or COAP_PORT
        else:
            raise ValueError(
                "No location found to send message to (neither in .opt.uri_host nor in .remote)"
            )

        # Take aside the zone identifier. While it can pass through getaddrinfo
        # in some situations (eg. 'fe80::1234%eth0' will give 'fe80::1234'
        # scope eth0, and similar for ff02:: addresses), in others (eg. ff05::)
        # it gives 'Name or service not known'.

        if "%" in host:
            host, zone = host.split("%", 1)
            try:
                zone = socket.if_nametoindex(zone)
            except OSError as exc:
                raise error.ResolutionError(
                    "Invalid zone identifier %s" % zone
                ) from exc
        else:
            zone = None

        try:
            # Note that this is our special addrinfo that ensures there is a
            # route.
            ip, port, flowinfo, scopeid = await getaddrinfo(
                self.loop,
                self.log,
                host,
                port,
            ).__anext__()
        except (socket.gaierror, StopAsyncIteration) as exc:
            # An iterator that ends without a single result also means that
            # there is no address to send to.
            raise error.ResolutionError(
                "No address information found for requests to %r" % host
            ) from exc

        if zone is not None:
            # Still trying to preserve the information returned (libc can't do
            # it as described at
            # <https://unix.stackexchange.com/questions/174767/ipv6-zone-id-in-etc-hosts>)
            # in case something sane does come out of that.
            if scopeid != 0 and scopeid != zone:
                self.log.warning(
                    "Resolved address of %s came with zone ID %d whereas explicit ID %d takes precedence",
                    host,
                    scopeid,
                    zone,
                )
            scopeid = zone

        # We could be done here and return UDP6EndpointAddress(the reassembled
        # sockaddr, self), but:
        #
        # Linux (unlike FreeBSD) takes the sockaddr's scope ID only for
        # link-local scopes (as per ipv6(7), and discards it otherwise. It does
        # need the information of the selected interface, though, in order to
        # pick the right outgoing interface. Thus, we provide it in the local
        # portion.

        if scopeid:
            # "Any" does not include "even be it IPv4" -- the underlying family
            # unfortunately needs to be set, or Linux will refuse to send.
            if ipaddress.IPv6Address(ip).ipv4_mapped is None:
                local_source = _ipv6_unspecified
            else:
                local_source = _ipv4_unspecified
            local = InterfaceOnlyPktinfo(_in6_pktinfo.pack(local_source, scopeid))
        else:
            local = None

        sockaddr = ip, port, flowinfo, scopeid
        result = UDP6EndpointAddress(sockaddr, self, pktinfo=local)
        # Carry over a smaller block size limit set on the request's remote.
        if request.remote.maximum_block_size_exp < result.maximum_block_size_exp:
            result.maximum_block_size_exp = request.remote.maximum_block_size_exp
        return result

    #
    # implementing the typical DatagramProtocol interfaces.
    #
    # note from the documentation: we may rely on connection_made to be called
    # before datagram_received -- but sending immediately after context
    # creation will still fail

    def connection_made(self, transport):
        """Implementation of the DatagramProtocol interface, called by the transport."""
        # Store the transport first so that it is available to whoever gets
        # woken up through the ready future.
        self.transport = transport
        self.ready.set_result(True)

    def datagram_msg_received(self, data, ancdata, flags, address):
        """Implementation of the RecvmsgDatagramProtocol interface, called by the transport."""
        pktinfo = None
        for cmsg_level, cmsg_type, cmsg_data in ancdata:
            if cmsg_level == socket.IPPROTO_IPV6 and cmsg_type == socket.IPV6_PKTINFO:
                pktinfo = cmsg_data
            else:
                self.log.info(
                    "Received unexpected ancillary data to recvmsg: level %d, type %d, data %r",
                    cmsg_level,
                    cmsg_type,
                    cmsg_data,
                )
        if pktinfo is None:
            self.log.warning(
                "Did not receive requested pktinfo ancdata on message from %s", address
            )
        try:
            message = Message.decode(
                data, UDP6EndpointAddress(address, self, pktinfo=pktinfo)
            )
        except error.UnparsableMessage:
            self.log.warning("Ignoring unparsable message from %s", address)
            return
        message.direction = Direction.INCOMING

        try:
            self._ctx.dispatch_message(message)
        except BaseException as exc:
            # Catching here because util.asyncio.recvmsg inherits
            # _SelectorDatagramTransport's bad handling of callback errors;
            # this is the last time we have a log at hand.
            self.log.error(
                "Exception raised through dispatch_message: %s", exc, exc_info=exc
            )
            raise

    def datagram_errqueue_received(self, data, ancdata, flags, address):
        """Handle a message read from the socket's error queue by dispatching
        an OSError built from the reported errno to the message manager."""
        assert flags == socknumbers.MSG_ERRQUEUE, (
            "Received non-error data through the errqueue"
        )
        pktinfo = None
        errno_value = None
        for cmsg_level, cmsg_type, cmsg_data in ancdata:
            assert cmsg_level == socket.IPPROTO_IPV6, (
                "Received non-IPv6 protocol through the errqueue"
            )
            if cmsg_type == socknumbers.IPV6_RECVERR:
                extended_err = SockExtendedErr.load(cmsg_data)
                self.log.debug("Socket error received, details: %s", extended_err)
                errno_value = extended_err.ee_errno
            elif cmsg_level == socket.IPPROTO_IPV6 and cmsg_type == socket.IPV6_PKTINFO:
                pktinfo = cmsg_data
            else:
                self.log.info(
                    "Received unexpected ancillary data to recvmsg errqueue: level %d, type %d, data %r",
                    cmsg_level,
                    cmsg_type,
                    cmsg_data,
                )

        if errno_value is None:
            # Without an extended error record there is no errno to report;
            # os.strerror(None) would only raise a TypeError.
            self.log.warning(
                "Error queue message from %s carried no extended error; ignoring it",
                address,
            )
            return

        remote = UDP6EndpointAddress(address, self, pktinfo=pktinfo)

        # not trying to decode a message from data -- that works for
        # "connection refused", doesn't work for "no route to host", and
        # anyway, when an icmp error comes back, everything pending from that
        # port should err out.

        try:
            text = os.strerror(errno_value)
            symbol = errno.errorcode.get(errno_value, None)
            symbol = "" if symbol is None else f"{symbol}, "
            self._ctx.dispatch_error(
                OSError(errno_value, f"{text} ({symbol}received through errqueue)"),
                remote,
            )
        except BaseException as exc:
            # Catching here because util.asyncio.recvmsg inherits
            # _SelectorDatagramTransport's bad handling of callback errors;
            # this is the last time we have a log at hand.
            self.log.error(
                "Exception raised through dispatch_error: %s", exc, exc_info=exc
            )
            raise

    def error_received(self, exc):
        """Implementation of the DatagramProtocol interface, called by the transport."""

        remote = self._remote_being_sent_to.get()

        if remote is None:
            self.log.info(
                "Error received in situation with no way to to determine which sending caused the error; this should be accompanied by an error in another code path: %s",
                exc,
            )
            return

        if exc.errno == errno.ENETUNREACH and self.__bind[0] != "::":
            self.log.warning(
                "Forwarding ENETUNREACH from a UDP6 connection that is not bound to [::]. Consider having a `[::]:0` binding at the start of the list to initiate requests as a client on both IP families."
            )

        try:
            self._ctx.dispatch_error(exc, remote)
        except BaseException as dispatch_exc:
            # Catching here because util.asyncio.recvmsg inherits
            # _SelectorDatagramTransport's bad handling of callback errors;
            # this is the last time we have a log at hand.
            self.log.error(
                "Exception raised through dispatch_error: %s",
                dispatch_exc,
                exc_info=dispatch_exc,
            )
            raise

    def connection_lost(self, exc):
        """Implementation of the DatagramProtocol interface: log unexpected
        losses, and release a pending shutdown() if there is one."""
        # TODO better error handling -- find out what can cause this at all
        # except for a shutdown
        if exc is not None:
            self.log.error("Connection lost: %s", exc)

        if self._shutting_down is None:
            self.log.error("Connection loss was not expected.")
        else:
            self._shutting_down.set_result(None)