Coverage for aiocoap / cli / client.py: 72%

368 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-17 12:28 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5"""aiocoap-client is a simple command-line tool for interacting with CoAP servers""" 

6 

7import copy 

8import sys 

9import asyncio 

10import argparse 

11import logging 

12import signal 

13import subprocess 

14from pathlib import Path 

15 

16import shlex 

17 

18# even though not used directly, this has side effects on the input() function 

19# used in interactive mode 

20try: 

21 import readline # noqa: F401 

22except ImportError: 

23 pass # that's normal on some platforms, and ok since it's just a usability enhancement 

24 

25import aiocoap 

26import aiocoap.config 

27import aiocoap.defaults 

28import aiocoap.meta 

29import aiocoap.proxy.client 

30from aiocoap.util import contenttype 

31from aiocoap.util.cli import ActionNoYes 

32from aiocoap.numbers import ContentFormat 

33 

34log = logging.getLogger("coap.aiocoap-client") 

35 

36 

def augment_parser_for_global(p, *, prescreen=False):
    """Register the options that only make sense once per program run.

    These are the configuration file, the verbosity controls, the version
    output and the switch into interactive mode. The ``prescreen`` flag is
    accepted for signature symmetry with the other ``augment_parser_for_*``
    functions; it does not alter which options are added here.
    """
    add = p.add_argument

    add("--config", type=Path, help="Configuration file to load")
    add("-v", "--verbose", action="count", help="Increase the debug output")
    add("-q", "--quiet", action="count", help="Decrease the debug output")

    version_text = "%(prog)s " + aiocoap.meta.version
    add("--version", action="version", version=version_text)

    add(
        "--interactive",
        action="store_true",
        help="Enter interactive mode. Combine with --help or run `help` interactively to see which options apply where; some can be used globally and overwritten locally.",
    )

64 

65 

def augment_parser_for_either(p):
    """Register the options that are valid both globally and per request.

    In interactive mode these can be given on the program's command line and
    again on individual prompt lines.
    """
    # Both switches are tri-state: None means "decide later" (see the help
    # texts), and ActionNoYes provides the explicit on/off forms.
    tristate = dict(default=None, action=ActionNoYes)

    p.add_argument(
        "--color",
        help="Color output (default on TTYs if all required modules are installed)",
        **tristate,
    )
    p.add_argument(
        "--pretty-print",
        help="Pretty-print known content formats (default on TTYs if all required modules are installed)",
        **tristate,
    )
    p.add_argument(
        "--proxy",
        metavar="URI",
        help="Relay the CoAP request to a proxy for execution",
    )
    p.add_argument(
        "--credentials",
        type=Path,
        help="Load credentials to use from a given file",
    )
    p.add_argument(
        "--no-set-hostname",
        dest="set_hostname",
        default=True,
        action="store_false",
        help="Suppress transmission of Uri-Host even if the host name is not an IP literal",
    )
    # Can be "Send request without any security" once it actually does
    # anything; until then, it's fine as a no-op to not impede changing
    # scripts later.
    p.add_argument(
        "--no-sec",
        dest="sec",
        default=None,
        action="store_false",
        help=argparse.SUPPRESS,
    )

104 

105 

def augment_parser_for_interactive(p, *, prescreen=False):
    """Register the per-request options and the positional ``url`` argument.

    With ``prescreen`` set, the URL becomes optional (``nargs="?"``) so that a
    tolerant first parsing pass does not fail when it is absent.
    """
    # (flags, keyword arguments) in the order they should show up in --help.
    specs = [
        (
            ("--non",),
            dict(
                action="store_true",
                help="Send request without reliable transport (e.g. over UDP: as non-confirmable (NON) message)",
            ),
        ),
        (
            ("-m", "--method"),
            dict(
                default="GET",
                help="Name or number of request method to use (default: %(default)s)",
            ),
        ),
        (
            ("--observe",),
            dict(
                action="store_true",
                help="Register an observation on the resource",
            ),
        ),
        (
            ("--observe-exec",),
            dict(
                metavar="CMD",
                help="Run the specified program whenever the observed resource changes, feeding the response data to its stdin",
            ),
        ),
        (
            ("--accept",),
            dict(metavar="MIME", help="Content format to request"),
        ),
        (
            ("--payload",),
            dict(
                metavar="X",
                help="Send X as request payload (eg. with a PUT). If X starts with an '@', its remainder is treated as a file name and read from; '@-' reads from the console. Non-file data may be recoded, see --content-format.",
            ),
        ),
        (
            ("--payload-initial-szx",),
            dict(
                metavar="SZX",
                type=int,
                help="Size exponent to limit the initial block's size (0 ≙ 16 Byte, 6 ≙ 1024 Byte)",
            ),
        ),
        (
            ("--content-format",),
            dict(
                metavar="MIME",
                help="Content format of the --payload data. If a known format is given and --payload has a non-file argument, the payload is converted from CBOR Diagnostic Notation.",
            ),
        ),
        (
            ("url",),
            dict(
                nargs="?" if prescreen else None,
                help="CoAP address to fetch",
            ),
        ),
    ]

    for flags, keywords in specs:
        p.add_argument(*flags, **keywords)

152 

153 

def build_parser(*, use_global=True, use_interactive=True, prescreen=False):
    """Assemble an argument parser from the option groups of this module.

    ``use_global`` and ``use_interactive`` select whether the global and the
    per-request option groups are included; the "either" group is always
    added. With ``prescreen`` set, argparse's built-in help action is replaced
    by a plain ``--help`` flag, so parsing does not terminate on ``--help``.
    """
    builtin_help = not prescreen
    parser = argparse.ArgumentParser(description=__doc__, add_help=builtin_help)

    if not builtin_help:
        parser.add_argument("--help", action="store_true")

    if use_global:
        augment_parser_for_global(parser, prescreen=prescreen)

    augment_parser_for_either(parser)

    if use_interactive:
        augment_parser_for_interactive(parser, prescreen=prescreen)

    return parser

165 

166 

def configure_logging(verbosity, color):
    """Set up log output and the level of the "coap" logger hierarchy.

    ``verbosity`` is the number of ``-v`` flags minus the number of ``-q``
    flags. ``color`` is the tri-state ``--color`` value: unless it is
    explicitly ``False``, colorlog is used if it can be imported.

    Levels by verbosity:

    * -2 and below: nothing at all (above CRITICAL)
    * -1: ERROR
    * 0: WARNING
    * 1: WARNING, but INFO for the client's own logger
    * 2: INFO
    * 3: DEBUG
    * 4 and above: every record, including custom levels below DEBUG
    """
    if color is not False:
        try:
            import colorlog
        except ImportError:
            color = False
        else:
            colorlog.basicConfig()
    if not color:
        logging.basicConfig()

    coap_logger = logging.getLogger("coap")

    if verbosity <= -2:
        coap_logger.setLevel(logging.CRITICAL + 1)
    elif verbosity == -1:
        coap_logger.setLevel(logging.ERROR)
    elif verbosity == 0:
        coap_logger.setLevel(logging.WARNING)
    elif verbosity == 1:
        coap_logger.setLevel(logging.WARNING)
        logging.getLogger("coap.aiocoap-client").setLevel(logging.INFO)
    elif verbosity == 2:
        coap_logger.setLevel(logging.INFO)
    elif verbosity == 3:
        coap_logger.setLevel(logging.DEBUG)
    elif verbosity >= 4:
        # This branch used to be shadowed by a ">= 3" test and was never
        # taken. Level 1 is used rather than 0: on a non-root logger, 0
        # (NOTSET) means "defer to the parent's level", which would make
        # the output less verbose instead of more.
        coap_logger.setLevel(1)

    log.debug("Logging configured.")

195 

196 

def colored(text, options, tokenlambda):
    """Return ``text`` as a string, colored through pygments if
    ``options.color`` is set.

    ``tokenlambda`` is a callback that receives the ``pygments.token`` module
    and returns the token type to render the text as. Pygments is only
    imported on the coloring path, so callers never need to check for its
    presence themselves when coloring is off.
    """
    if options.color:
        from pygments.formatters import TerminalFormatter
        from pygments import token, format

        token_stream = [(tokenlambda(token), str(text))]
        return format(token_stream, TerminalFormatter())

    return str(text)

212 

213 

def incoming_observation(options, response):
    """Handle a single Observe notification.

    The notification is logged at INFO level. If ``options.observe_exec`` is
    set, that command is run through the shell with the notification payload
    on its stdin. Otherwise a ``---`` separator line goes to stdout and the
    response (or its error code, plus payload if any) is presented on stderr.
    """
    log.info("Received Observe notification:")
    for text_line in message_to_text(response, "from"):
        log.info(text_line)

    if options.observe_exec:
        child = subprocess.Popen(
            options.observe_exec, shell=True, stdin=subprocess.PIPE
        )
        # FIXME this blocks
        child.communicate(response.payload)
        return

    separator = colored("---", options, lambda token: token.Comment.Preproc)
    sys.stdout.write(separator)
    sys.stdout.write("\n")

    if response.code.is_successful():
        present(response, options, file=sys.stderr)
    else:
        error_text = colored(
            response.code, options, lambda token: token.Token.Generic.Error
        )
        print(error_text, file=sys.stderr)
        if response.payload:
            present(response, options, file=sys.stderr)

    sys.stdout.flush()

238 

239 

def apply_credentials(context, credentials, errfn):
    """Load client credentials from a file into ``context.client_credentials``.

    ``credentials`` is a path whose suffix selects the format: ``.json`` is
    parsed as JSON, ``.diag`` as CBOR diagnostic notation (which needs the
    cbor2 and cbor_diag packages). The decoded data is handed to
    ``context.client_credentials.load_from_dict``.

    ``errfn`` is called with a message for every user-facing failure
    (unknown suffix, missing packages, missing or unprocessable file), and
    whatever it returns is raised. Callers in this file pass an argparse
    parser's ``error`` method.
    """
    try:
        if credentials.suffix == ".json":
            import json

            # Context managers make sure the file handle is closed even when
            # parsing fails.
            with credentials.open("rb") as credentials_file:
                data = json.load(credentials_file)
            context.client_credentials.load_from_dict(data)
        elif credentials.suffix == ".diag":
            try:
                import cbor_diag
                import cbor2
            except ImportError:
                raise errfn(
                    "Loading credentials in CBOR diagnostic format requires cbor2 and cbor_diag package"
                )
            with credentials.open() as credentials_file:
                diagnostic_text = credentials_file.read()
            context.client_credentials.load_from_dict(
                cbor2.loads(cbor_diag.diag2cbor(diagnostic_text))
            )
        else:
            raise errfn(
                "Unknown suffix: %s (expected: .json or .diag)" % (credentials.suffix)
            )
    except FileNotFoundError as e:
        raise errfn("Credential file not found: %s" % e.filename)
    except (OSError, ValueError) as e:
        # Any of the parsers could reasonably raise those, and while they don't
        # have HelpfulError support, they should still not render a backtrace
        # but a proper CLI error.
        raise errfn("Processing credential file: %s" % e)

268 

269 

def message_to_text(m, direction):
    """Yield lines describing a message in a style similar to RFC examples.

    The first line holds the code, the given ``direction`` word and the
    remote; then one line follows per option, and finally a line with the
    payload's leading bytes in hex (or a note that there is none).

    Refactoring this into a message method will need to address the direction
    discovery eventually.
    """
    if m.remote is not None:
        # FIXME: Update when transport-indication is available
        # FIXME: This is slightly wrong because it does not account for what ProxyRedirector does
        yield f"{m.code} {direction} {m.remote.scheme}://{m.remote.hostinfo}"
    else:
        # This happens when unprocessable remotes are, eg. putting in an HTTP URI
        yield f"{m.code} {direction} (unknown)"

    for opt in m.opt.option_list():
        if not hasattr(opt.number, "name"):
            yield f"- {opt.number.value}: {opt.value!r}"
            continue
        yield f"- {opt.number.name_printable} ({opt.number.value}): {opt.value!r}"

    if not m.payload:
        yield "No payload"
        return

    # Only the first few bytes are shown so long payloads stay on one line.
    limit = 16
    if len(m.payload) <= limit:
        yield f"Payload: {m.payload[:limit].hex()} ({len(m.payload)} bytes)"
    else:
        yield f"Payload: {m.payload[:limit].hex()}... ({len(m.payload)} bytes total)"

295 

296 

def present(message, options, file=None):
    """Write a message payload to the output, pretty printing and/or coloring
    it as configured in the options.

    ``file`` is the text stream to write to. It defaults to ``sys.stdout`` as
    found at call time, so that a redirected ``sys.stdout`` is honored. Notes
    about Location options and pretty printer information always go to
    stderr, and are suppressed by ``options.quiet``.

    Nothing is written to ``file`` if the message has no payload.
    """
    if file is None:
        file = sys.stdout

    if not options.quiet and (message.opt.location_path or message.opt.location_query):
        # FIXME: Percent encoding is completely missing; this would be done
        # most easily with a CRI library
        location_ref = "/" + "/".join(message.opt.location_path)
        if message.opt.location_query:
            location_ref += "?" + "&".join(message.opt.location_query)
        print(
            colored(
                f"Location options indicate new resource: {location_ref}",
                options,
                lambda token: token.Token.Generic.Inserted,
            ),
            file=sys.stderr,
        )

    if not message.payload:
        return

    # Stays None unless the pretty printer (or the UTF-8 decoding for
    # coloring) produced a text version of the payload.
    payload = None

    cf = message.opt.content_format

    if cf is None:
        if message.code.is_successful():
            cf = message.request.opt.content_format
        else:
            cf = ContentFormat.TEXT

    if cf is not None and cf.is_known():
        mime = cf.media_type
    else:
        mime = "application/octet-stream"
    if options.pretty_print:
        from aiocoap.util.prettyprint import pretty_print

        prettyprinted = pretty_print(message)
        if prettyprinted is not None:
            (infos, mime, payload) = prettyprinted
            if not options.quiet:
                for i in infos:
                    print(
                        colored("# " + i, options, lambda token: token.Comment),
                        file=sys.stderr,
                    )

    color = options.color
    if color:
        from aiocoap.util.prettyprint import lexer_for_mime
        import pygments

        try:
            lexer = lexer_for_mime(mime)
        except pygments.util.ClassNotFound:
            color = False

    if color and payload is None:
        # Coloring requires a unicode-string style payload, either from the
        # mime type or from the pretty printer.
        try:
            payload = message.payload.decode("utf8")
        except UnicodeDecodeError:
            color = False

    if color:
        from pygments.formatters import TerminalFormatter
        from pygments import highlight

        highlit = highlight(
            payload,
            lexer,
            TerminalFormatter(),
        )
        # The TerminalFormatter already adds an end-of-line character, not
        # trying to add one for any missing trailing newlines.
        print(highlit, file=file, end="")
        file.flush()
    else:
        if payload is None:
            # The raw bytes bypass the text layer; flush it first so that
            # text written earlier can not end up after the payload.
            file.flush()
            file.buffer.write(message.payload)
            if file.isatty() and message.payload[-1:] != b"\n":
                file.write("\n")
        else:
            file.write(payload)
            # endswith() rather than indexing: the text may be empty.
            if file.isatty() and not payload.endswith("\n"):
                file.write("\n")

385 

386 

async def single_request(args, context, globalopts=None):
    """Parse one command line, send the request it describes, and show the
    result.

    ``args`` is the list of command line arguments. If ``globalopts`` is
    given (as in interactive mode), a copy of it seeds the parsed options and
    the global options are not accepted in ``args`` again. ``context`` is the
    aiocoap context used for sending, directly or through the proxy given in
    ``--proxy``.

    Successful responses are presented on stdout; unsuccessful ones are
    shown on stderr, followed by ``sys.exit(1)``. With ``--observe``,
    notifications are processed until the observation ends. Usage errors
    terminate through ``parser.error``; ``HelpfulError`` and ``OSError`` are
    reported on stderr and also end in ``sys.exit(1)``.
    """
    parser = build_parser(use_global=globalopts is None)
    options = parser.parse_args(args, copy.copy(globalopts))

    pretty_print_modules = aiocoap.defaults.prettyprint_missing_modules()
    if pretty_print_modules and (options.color is True or options.pretty_print is True):
        parser.error(
            "Color and pretty printing require the following"
            " additional module(s) to be installed: %s"
            % ", ".join(pretty_print_modules)
        )
    # Undecided tri-state options default to "on" only for terminals that
    # have everything needed installed.
    if options.color is None:
        options.color = sys.stdout.isatty() and not pretty_print_modules
    if options.pretty_print is None:
        options.pretty_print = sys.stdout.isatty() and not pretty_print_modules

    try:
        # The method may be given by name or by number.
        try:
            code = getattr(
                aiocoap.numbers.codes.Code,
                options.method.upper().replace("IPATCH", "iPATCH"),
            )
        except AttributeError:
            try:
                code = aiocoap.numbers.codes.Code(int(options.method))
            except ValueError:
                raise parser.error("Unknown method")

        if options.credentials is not None:
            apply_credentials(context, options.credentials, parser.error)

        request = aiocoap.Message(
            code=code,
            transport_tuning=aiocoap.Unreliable if options.non else aiocoap.Reliable,
        )
        request.set_request_uri(options.url, set_uri_host=options.set_hostname)

        if options.accept:
            try:
                request.opt.accept = ContentFormat(int(options.accept))
            except ValueError:
                try:
                    request.opt.accept = ContentFormat.by_media_type(options.accept)
                except KeyError:
                    raise parser.error("Unknown accept type")

        if options.observe:
            request.opt.observe = 0

        if options.content_format:
            try:
                request.opt.content_format = ContentFormat(int(options.content_format))
            except ValueError:
                try:
                    request.opt.content_format = ContentFormat.by_media_type(
                        options.content_format
                    )
                except KeyError:
                    raise parser.error("Unknown content format")

        if options.payload:
            if options.payload.startswith("@"):
                filename = options.payload[1:]
                # Opening happens inside the try block so that a missing or
                # unreadable file is reported as a usage error rather than
                # by the generic OSError fallback below.
                try:
                    if filename == "-":
                        request.payload = sys.stdin.buffer.read()
                    else:
                        with open(filename, "rb") as f:
                            request.payload = f.read()
                except OSError as e:
                    raise parser.error("File could not be opened: %s" % e)
            else:
                request_classification = contenttype.categorize(
                    request.opt.content_format.media_type
                    if request.opt.content_format is not None
                    and request.opt.content_format.is_known()
                    else ""
                )
                if request_classification in ("cbor", "cbor-seq"):
                    try:
                        import cbor_diag
                    except ImportError as e:
                        raise parser.error(f"CBOR recoding not available ({e})")

                    try:
                        encoded = cbor_diag.diag2cbor(options.payload)
                    except ValueError as e:
                        raise parser.error(
                            f"Parsing CBOR diagnostic notation failed. Make sure quotation marks are escaped from the shell. Error: {e}"
                        )

                    if request_classification == "cbor-seq":
                        try:
                            import cbor2
                        except ImportError as e:
                            raise parser.error(
                                f"CBOR sequence recoding not available ({e})"
                            )
                        decoded = cbor2.loads(encoded)
                        if not isinstance(decoded, list):
                            raise parser.error(
                                "CBOR sequence recoding requires an array as the top-level element."
                            )
                        request.payload = b"".join(cbor2.dumps(d) for d in decoded)
                    else:
                        request.payload = encoded
                else:
                    request.payload = options.payload.encode("utf8")

        if options.payload_initial_szx is not None:
            request.remote.maximum_block_size_exp = options.payload_initial_szx

        if options.proxy is None or options.proxy in ("none", "", "-"):
            interface = context
        else:
            interface = aiocoap.proxy.client.ProxyForwarder(options.proxy, context)

        requested_uri = request.get_request_uri()

        log.info("Sending request:")
        for line in message_to_text(request, "to"):
            log.info(line)

        requester = interface.request(request)

        response_data = await requester.response

        log.info("Received response:")
        for line in message_to_text(response_data, "from"):
            log.info(line)

        response_uri = response_data.get_request_uri()
        if requested_uri != response_uri:
            print(
                colored(
                    f"Response arrived from different address; base URI is {response_uri}",
                    options,
                    lambda token: token.Generic.Inserted,
                ),
                file=sys.stderr,
            )
        if response_data.code.is_successful():
            present(response_data, options)
        else:
            print(
                colored(response_data.code, options, lambda token: token.Generic.Error),
                file=sys.stderr,
            )
            present(response_data, options, file=sys.stderr)
            sys.exit(1)

        if options.observe:
            try:
                async for notification in requester.observation:
                    incoming_observation(options, notification)
            except Exception as exit_reason:
                print("Observation is over: %r" % (exit_reason,), file=sys.stderr)
    except aiocoap.error.HelpfulError as e:
        print(str(e), file=sys.stderr)
        extra_help = e.extra_help(
            hints=dict(
                original_uri=options.url,
                request=request,
            )
        )
        if extra_help:
            print("Debugging hint:", extra_help, file=sys.stderr)
        sys.exit(1)
    # Fallback while not all backends raise NetworkErrors
    except OSError as e:
        text = str(e)
        if not text:
            text = repr(e)
        if not text:
            # eg ConnectionResetError flying out of a misconfigured SSL server
            text = type(e)
        print(
            "Warning: OS errors should not be raised this way any more.",
            file=sys.stderr,
        )
        # not telling what to do precisely: the form already tells users to
        # include `aiocoap.cli.defaults` output, which is exactly what we
        # need.
        print(
            f"Even if the cause of the error itself is clear, please file an issue at {aiocoap.meta.bugreport_uri}.",
            file=sys.stderr,
        )
        print("Error:", text, file=sys.stderr)
        sys.exit(1)

576 

577 

async def single_request_with_context(args, config):
    """Run ``single_request`` on a freshly created client context.

    The context is created with the transports from ``config`` and is shut
    down again whether or not the request succeeds. This wrapper exists until
    sync_main gets made fully async, and async context managers are used to
    manage contexts.
    """
    transports = config.transport
    client_context = await aiocoap.Context.create_client_context(
        transports=transports
    )
    try:
        await single_request(args, client_context)
    finally:
        await client_context.shutdown()

586 

587 

# Future that the SIGINT handler installed by main() resolves. interactive()
# replaces it with a fresh one for every command, so that Ctrl-C can abort
# the command that is currently running.
interactive_expecting_keyboard_interrupt = None


async def interactive(globalopts, config):
    """Run the interactive prompt until the user quits.

    Each input line is split like a shell command line and executed through
    ``single_request`` on a context shared by all commands, with
    ``globalopts`` as the starting point for the options. ``help`` and ``?``
    show the help; ``quit``, ``q``, ``exit`` and end-of-file leave the loop.
    The context is shut down on the way out, including when an exception
    ends the loop.
    """
    global interactive_expecting_keyboard_interrupt

    loop = asyncio.get_running_loop()
    interactive_expecting_keyboard_interrupt = loop.create_future()

    async def single_request_noexit(*args, **kwargs):
        """Protects a run against the exit automatically generated by
        argparse errors or help"""
        try:
            # We could also set exit_on_error, but this way we do get the
            # original error code and can show that; might still revisit.
            await single_request(*args, **kwargs)
        except SystemExit as e:
            return e.code
        else:
            return 0

    context = await aiocoap.Context.create_client_context(transports=config.transport)

    try:
        while True:
            try:
                # when http://bugs.python.org/issue22412 is resolved, use that instead
                line = await loop.run_in_executor(None, lambda: input("aiocoap> "))
            except EOFError:
                line = "exit"
            try:
                line = shlex.split(line)
            except ValueError as e:
                # Raised for input such as an unbalanced quotation mark; a
                # typo should not terminate the session.
                print("Could not parse input: %s" % (e,), file=sys.stderr)
                continue
            if not line:
                continue
            if line in (["help"], ["?"]):
                line = ["--help"]
            if line in (["quit"], ["q"], ["exit"]):
                break

            current_task = asyncio.create_task(
                single_request_noexit(line, context=context, globalopts=globalopts),
                name="Interactive prompt command %r" % line,
            )
            interactive_expecting_keyboard_interrupt = loop.create_future()

            done, _ = await asyncio.wait(
                [current_task, interactive_expecting_keyboard_interrupt],
                return_when=asyncio.FIRST_COMPLETED,
            )

            if current_task not in done:
                # Ctrl-C arrived before the command completed.
                current_task.cancel()
                continue

            try:
                code = await current_task
            except Exception as e:
                print("Unhandled exception raised: %s" % (e,))
                # There is no exit code to report in this case.
                continue
            if code != 0:
                print("Exit code: %d" % code, file=sys.stderr)
    finally:
        await context.shutdown()

651 

652 

async def main(args=None):
    """Entry point of the client: run a single request, or the interactive
    prompt if ``--interactive`` is given.

    ``args`` is the argument list; it defaults to ``sys.argv[1:]``.
    """
    # interactive mode is a little messy, that's why this is not using aiocoap.util.cli yet
    if args is None:
        args = sys.argv[1:]

    # This one is tolerant and doesn't even terminate with --help, so that
    # --help and --interactive --help can do the right thing.
    first_parser = build_parser(prescreen=True)
    first_args = first_parser.parse_args(args)

    verbosity = (first_args.verbose or 0) - (first_args.quiet or 0)
    configure_logging(verbosity, first_args.color)

    if first_args.config is None:
        config = aiocoap.config.Config()
    else:
        try:
            config = aiocoap.config.Config.load_from_file(first_args.config)
        except ValueError as e:
            first_parser.error(f"Error loading file {first_args.config!r}: {e}")

    if not first_args.interactive:
        try:
            await single_request_with_context(args, config)
        except asyncio.exceptions.CancelledError:
            sys.exit(3)
        return

    # Interactive mode: only the global options are taken from the command
    # line; everything else is entered at the prompt.
    global_parser = build_parser(use_interactive=False)
    globalopts = global_parser.parse_args(args)

    loop = asyncio.get_running_loop()

    def ctrl_c():
        try:
            interactive_expecting_keyboard_interrupt.set_result(None)
        except asyncio.exceptions.InvalidStateError:
            # Too many Ctrl-C before the program could clean up
            sys.exit(3)

    loop.add_signal_handler(signal.SIGINT, ctrl_c)

    await interactive(globalopts, config)

696 

697 

def sync_main(args=None):
    """Synchronous wrapper that runs ``main`` in a new asyncio event loop."""
    entry_point = main(args=args)
    asyncio.run(entry_point)


if __name__ == "__main__":
    sync_main()