Coverage for aiocoap/cli/client.py: 72%

360 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-09-30 11:17 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5"""aiocoap-client is a simple command-line tool for interacting with CoAP servers""" 

6 

7import copy 

8import sys 

9import asyncio 

10import argparse 

11import logging 

12import signal 

13import subprocess 

14from pathlib import Path 

15 

16import shlex 

17 

18# even though not used directly, this has side effects on the input() function 

19# used in interactive mode 

20try: 

21 import readline # noqa: F401 

22except ImportError: 

23 pass # that's normal on some platforms, and ok since it's just a usability enhancement 

24 

25import aiocoap 

26import aiocoap.defaults 

27import aiocoap.meta 

28import aiocoap.proxy.client 

29from aiocoap.util import contenttype 

30from aiocoap.util.cli import ActionNoYes 

31from aiocoap.numbers import ContentFormat 

32 

33log = logging.getLogger("coap.aiocoap-client") 

34 

35 

36def augment_parser_for_global(p, *, prescreen=False): 

37 p.add_argument( 

38 "-v", 

39 "--verbose", 

40 help="Increase the debug output", 

41 action="count", 

42 ) 

43 p.add_argument( 

44 "-q", 

45 "--quiet", 

46 help="Decrease the debug output", 

47 action="count", 

48 ) 

49 p.add_argument( 

50 "--version", action="version", version="%(prog)s " + aiocoap.meta.version 

51 ) 

52 

53 p.add_argument( 

54 "--interactive", 

55 help="Enter interactive mode. Combine with --help or run `help` interactively to see which options apply where; some can be used globally and overwritten locally.", 

56 action="store_true", 

57 ) 

58 

59 

60def augment_parser_for_either(p): 

61 p.add_argument( 

62 "--color", 

63 help="Color output (default on TTYs if all required modules are installed)", 

64 default=None, 

65 action=ActionNoYes, 

66 ) 

67 p.add_argument( 

68 "--pretty-print", 

69 help="Pretty-print known content formats (default on TTYs if all required modules are installed)", 

70 default=None, 

71 action=ActionNoYes, 

72 ) 

73 p.add_argument( 

74 "--proxy", help="Relay the CoAP request to a proxy for execution", metavar="URI" 

75 ) 

76 p.add_argument( 

77 "--credentials", 

78 help="Load credentials to use from a given file", 

79 type=Path, 

80 ) 

81 p.add_argument( 

82 "--no-set-hostname", 

83 help="Suppress transmission of Uri-Host even if the host name is not an IP literal", 

84 dest="set_hostname", 

85 action="store_false", 

86 default=True, 

87 ) 

88 

89 

90def augment_parser_for_interactive(p, *, prescreen=False): 

91 p.add_argument( 

92 "--non", 

93 help="Send request as non-confirmable (NON) message", 

94 action="store_true", 

95 ) 

96 p.add_argument( 

97 "-m", 

98 "--method", 

99 help="Name or number of request method to use (default: %(default)s)", 

100 default="GET", 

101 ) 

102 p.add_argument( 

103 "--observe", help="Register an observation on the resource", action="store_true" 

104 ) 

105 p.add_argument( 

106 "--observe-exec", 

107 help="Run the specified program whenever the observed resource changes, feeding the response data to its stdin", 

108 metavar="CMD", 

109 ) 

110 p.add_argument( 

111 "--accept", 

112 help="Content format to request", 

113 metavar="MIME", 

114 ) 

115 p.add_argument( 

116 "--payload", 

117 help="Send X as request payload (eg. with a PUT). If X starts with an '@', its remainder is treated as a file name and read from; '@-' reads from the console. Non-file data may be recoded, see --content-format.", 

118 metavar="X", 

119 ) 

120 p.add_argument( 

121 "--payload-initial-szx", 

122 help="Size exponent to limit the initial block's size (0 ≙ 16 Byte, 6 ≙ 1024 Byte)", 

123 metavar="SZX", 

124 type=int, 

125 ) 

126 p.add_argument( 

127 "--content-format", 

128 help="Content format of the --payload data. If a known format is given and --payload has a non-file argument, the payload is converted from CBOR Diagnostic Notation.", 

129 metavar="MIME", 

130 ) 

131 p.add_argument( 

132 "url", 

133 nargs="?" if prescreen else None, 

134 help="CoAP address to fetch", 

135 ) 

136 

137 

138def build_parser(*, use_global=True, use_interactive=True, prescreen=False): 

139 p = argparse.ArgumentParser(description=__doc__, add_help=not prescreen) 

140 if prescreen: 

141 p.add_argument("--help", action="store_true") 

142 if use_global: 

143 augment_parser_for_global(p, prescreen=prescreen) 

144 augment_parser_for_either(p) 

145 if use_interactive: 

146 augment_parser_for_interactive(p, prescreen=prescreen) 

147 

148 return p 

149 

150 

151def configure_logging(verbosity, color): 

152 if color is not False: 

153 try: 

154 import colorlog 

155 except ImportError: 

156 color = False 

157 else: 

158 colorlog.basicConfig() 

159 if not color: 

160 logging.basicConfig() 

161 

162 if verbosity <= -2: 

163 logging.getLogger("coap").setLevel(logging.CRITICAL + 1) 

164 elif verbosity == -1: 

165 logging.getLogger("coap").setLevel(logging.ERROR) 

166 elif verbosity == 0: 

167 logging.getLogger("coap").setLevel(logging.WARNING) 

168 elif verbosity == 1: 

169 logging.getLogger("coap").setLevel(logging.WARNING) 

170 logging.getLogger("coap.aiocoap-client").setLevel(logging.INFO) 

171 elif verbosity == 2: 

172 logging.getLogger("coap").setLevel(logging.INFO) 

173 elif verbosity >= 3: 

174 logging.getLogger("coap").setLevel(logging.DEBUG) 

175 elif verbosity >= 4: 

176 logging.getLogger("coap").setLevel(0) 

177 

178 log.debug("Logging configured.") 

179 

180 

181def colored(text, options, tokenlambda): 

182 """Apply pygments based coloring if options.color is set. Tokelambda is a 

183 callback to which pygments.token is passed and which returns a token type; 

184 this makes it easy to not need to conditionally react to pygments' possible 

185 absence in all color locations.""" 

186 if not options.color: 

187 return str(text) 

188 

189 from pygments.formatters import TerminalFormatter 

190 from pygments import token, format 

191 

192 return format( 

193 [(tokenlambda(token), str(text))], 

194 TerminalFormatter(), 

195 ) 

196 

197 

198def incoming_observation(options, response): 

199 log.info("Received Observe notification:") 

200 for line in message_to_text(response, "from"): 

201 log.info(line) 

202 

203 if options.observe_exec: 

204 p = subprocess.Popen(options.observe_exec, shell=True, stdin=subprocess.PIPE) 

205 # FIXME this blocks 

206 p.communicate(response.payload) 

207 else: 

208 sys.stdout.write(colored("---", options, lambda token: token.Comment.Preproc)) 

209 if response.code.is_successful(): 

210 present(response, options, file=sys.stderr) 

211 else: 

212 sys.stdout.flush() 

213 print( 

214 colored( 

215 response.code, options, lambda token: token.Token.Generic.Error 

216 ), 

217 file=sys.stderr, 

218 ) 

219 if response.payload: 

220 present(response, options, file=sys.stderr) 

221 

222 

223def apply_credentials(context, credentials, errfn): 

224 try: 

225 if credentials.suffix == ".json": 

226 import json 

227 

228 context.client_credentials.load_from_dict(json.load(credentials.open("rb"))) 

229 elif credentials.suffix == ".diag": 

230 try: 

231 import cbor_diag 

232 import cbor2 

233 except ImportError: 

234 raise errfn( 

235 "Loading credentials in CBOR diagnostic format requires cbor2 and cbor_diag package" 

236 ) 

237 context.client_credentials.load_from_dict( 

238 cbor2.loads(cbor_diag.diag2cbor(credentials.open().read())) 

239 ) 

240 else: 

241 raise errfn( 

242 "Unknown suffix: %s (expected: .json or .diag)" % (credentials.suffix) 

243 ) 

244 except FileNotFoundError as e: 

245 raise errfn("Credential file not found: %s" % e.filename) 

246 except (OSError, ValueError) as e: 

247 # Any of the parsers could reasonably raise those, and while they don't 

248 # have HelpfulError support, they should still not render a backtrace 

249 # but a proper CLI error. 

250 raise errfn("Processing credential file: %s" % e) 

251 

252 

253def message_to_text(m, direction): 

254 """Convert a message to a text form similar to how they are shown in RFCs. 

255 

256 Refactoring this into a message method will need to address the direction 

257 discovery eventually.""" 

258 if m.remote is None: 

259 # This happens when unprocessable remotes are, eg. putting in an HTTP URI 

260 yield f"{m.code} {direction} (unknown)" 

261 else: 

262 # FIXME: Update when transport-indication is available 

263 # FIXME: This is slightly wrong because it does not account for what ProxyRedirector does 

264 yield f"{m.code} {direction} {m.remote.scheme}://{m.remote.hostinfo}" 

265 for opt in m.opt.option_list(): 

266 if hasattr(opt.number, "name"): 

267 yield f"- {opt.number.name_printable} ({opt.number.value}): {opt.value!r}" 

268 else: 

269 yield f"- {opt.number.value}: {opt.value!r}" 

270 if m.payload: 

271 limit = 16 

272 if len(m.payload) > limit: 

273 yield f"Payload: {m.payload[:limit].hex()}... ({len(m.payload)} bytes total)" 

274 else: 

275 yield f"Payload: {m.payload[:limit].hex()} ({len(m.payload)} bytes)" 

276 else: 

277 yield "No payload" 

278 

279 

280def present(message, options, file=sys.stdout): 

281 """Write a message payload to the output, pretty printing and/or coloring 

282 it as configured in the options.""" 

283 if not options.quiet and (message.opt.location_path or message.opt.location_query): 

284 # FIXME: Percent encoding is completely missing; this would be done 

285 # most easily with a CRI library 

286 location_ref = "/" + "/".join(message.opt.location_path) 

287 if message.opt.location_query: 

288 location_ref += "?" + "&".join(message.opt.location_query) 

289 print( 

290 colored( 

291 f"Location options indicate new resource: {location_ref}", 

292 options, 

293 lambda token: token.Token.Generic.Inserted, 

294 ), 

295 file=sys.stderr, 

296 ) 

297 

298 if not message.payload: 

299 return 

300 

301 payload = None 

302 

303 cf = message.opt.content_format or message.request.opt.content_format 

304 if cf is not None and cf.is_known(): 

305 mime = cf.media_type 

306 else: 

307 mime = "application/octet-stream" 

308 if options.pretty_print: 

309 from aiocoap.util.prettyprint import pretty_print 

310 

311 prettyprinted = pretty_print(message) 

312 if prettyprinted is not None: 

313 (infos, mime, payload) = prettyprinted 

314 if not options.quiet: 

315 for i in infos: 

316 print( 

317 colored("# " + i, options, lambda token: token.Comment), 

318 file=sys.stderr, 

319 ) 

320 

321 color = options.color 

322 if color: 

323 from aiocoap.util.prettyprint import lexer_for_mime 

324 import pygments 

325 

326 try: 

327 lexer = lexer_for_mime(mime) 

328 except pygments.util.ClassNotFound: 

329 color = False 

330 

331 if color and payload is None: 

332 # Coloring requires a unicode-string style payload, either from the 

333 # mime type or from the pretty printer. 

334 try: 

335 payload = message.payload.decode("utf8") 

336 except UnicodeDecodeError: 

337 color = False 

338 

339 if color: 

340 from pygments.formatters import TerminalFormatter 

341 from pygments import highlight 

342 

343 highlit = highlight( 

344 payload, 

345 lexer, 

346 TerminalFormatter(), 

347 ) 

348 # The TerminalFormatter already adds an end-of-line character, not 

349 # trying to add one for any missing trailing newlines. 

350 print(highlit, file=file, end="") 

351 file.flush() 

352 else: 

353 if payload is None: 

354 file.buffer.write(message.payload) 

355 if file.isatty() and message.payload[-1:] != b"\n": 

356 file.write("\n") 

357 else: 

358 file.write(payload) 

359 if file.isatty() and payload[-1] != "\n": 

360 file.write("\n") 

361 

362 

363async def single_request(args, context, globalopts=None): 

364 parser = build_parser(use_global=globalopts is None) 

365 options = parser.parse_args(args, copy.copy(globalopts)) 

366 

367 pretty_print_modules = aiocoap.defaults.prettyprint_missing_modules() 

368 if pretty_print_modules and (options.color is True or options.pretty_print is True): 

369 parser.error( 

370 "Color and pretty printing require the following" 

371 " additional module(s) to be installed: %s" 

372 % ", ".join(pretty_print_modules) 

373 ) 

374 if options.color is None: 

375 options.color = sys.stdout.isatty() and not pretty_print_modules 

376 if options.pretty_print is None: 

377 options.pretty_print = sys.stdout.isatty() and not pretty_print_modules 

378 

379 try: 

380 try: 

381 code = getattr( 

382 aiocoap.numbers.codes.Code, 

383 options.method.upper().replace("IPATCH", "iPATCH"), 

384 ) 

385 except AttributeError: 

386 try: 

387 code = aiocoap.numbers.codes.Code(int(options.method)) 

388 except ValueError: 

389 raise parser.error("Unknown method") 

390 

391 if options.credentials is not None: 

392 apply_credentials(context, options.credentials, parser.error) 

393 

394 request = aiocoap.Message( 

395 code=code, mtype=aiocoap.NON if options.non else aiocoap.CON 

396 ) 

397 request.set_request_uri(options.url, set_uri_host=options.set_hostname) 

398 

399 if options.accept: 

400 try: 

401 request.opt.accept = ContentFormat(int(options.accept)) 

402 except ValueError: 

403 try: 

404 request.opt.accept = ContentFormat.by_media_type(options.accept) 

405 except KeyError: 

406 raise parser.error("Unknown accept type") 

407 

408 if options.observe: 

409 request.opt.observe = 0 

410 observation_is_over = asyncio.get_event_loop().create_future() 

411 

412 if options.content_format: 

413 try: 

414 request.opt.content_format = ContentFormat(int(options.content_format)) 

415 except ValueError: 

416 try: 

417 request.opt.content_format = ContentFormat.by_media_type( 

418 options.content_format 

419 ) 

420 except KeyError: 

421 raise parser.error("Unknown content format") 

422 

423 if options.payload: 

424 if options.payload.startswith("@"): 

425 filename = options.payload[1:] 

426 if filename == "-": 

427 f = sys.stdin.buffer 

428 else: 

429 f = open(filename, "rb") 

430 try: 

431 request.payload = f.read() 

432 except OSError as e: 

433 raise parser.error("File could not be opened: %s" % e) 

434 else: 

435 request_classification = contenttype.categorize( 

436 request.opt.content_format.media_type 

437 if request.opt.content_format is not None 

438 and request.opt.content_format.is_known() 

439 else "" 

440 ) 

441 if request_classification in ("cbor", "cbor-seq"): 

442 try: 

443 import cbor_diag 

444 except ImportError as e: 

445 raise parser.error(f"CBOR recoding not available ({e})") 

446 

447 try: 

448 encoded = cbor_diag.diag2cbor(options.payload) 

449 except ValueError as e: 

450 raise parser.error( 

451 f"Parsing CBOR diagnostic notation failed. Make sure quotation marks are escaped from the shell. Error: {e}" 

452 ) 

453 

454 if request_classification == "cbor-seq": 

455 try: 

456 import cbor2 

457 except ImportError as e: 

458 raise parser.error( 

459 f"CBOR sequence recoding not available ({e})" 

460 ) 

461 decoded = cbor2.loads(encoded) 

462 if not isinstance(decoded, list): 

463 raise parser.error( 

464 "CBOR sequence recoding requires an array as the top-level element." 

465 ) 

466 request.payload = b"".join(cbor2.dumps(d) for d in decoded) 

467 else: 

468 request.payload = encoded 

469 else: 

470 request.payload = options.payload.encode("utf8") 

471 

472 if options.payload_initial_szx is not None: 

473 request.remote.maximum_block_size_exp = options.payload_initial_szx 

474 

475 if options.proxy is None or options.proxy in ("none", "", "-"): 

476 interface = context 

477 else: 

478 interface = aiocoap.proxy.client.ProxyForwarder(options.proxy, context) 

479 

480 requested_uri = request.get_request_uri() 

481 

482 log.info("Sending request:") 

483 for line in message_to_text(request, "to"): 

484 log.info(line) 

485 

486 requester = interface.request(request) 

487 

488 if options.observe: 

489 requester.observation.register_errback(observation_is_over.set_result) 

490 requester.observation.register_callback( 

491 lambda data, options=options: incoming_observation(options, data) 

492 ) 

493 

494 try: 

495 response_data = await requester.response 

496 finally: 

497 if not requester.response.done(): 

498 requester.response.cancel() 

499 if options.observe and not requester.observation.cancelled: 

500 requester.observation.cancel() 

501 

502 log.info("Received response:") 

503 for line in message_to_text(response_data, "from"): 

504 log.info(line) 

505 

506 response_uri = response_data.get_request_uri() 

507 if requested_uri != response_uri: 

508 print( 

509 colored( 

510 f"Response arrived from different address; base URI is {response_uri}", 

511 options, 

512 lambda token: token.Generic.Inserted, 

513 ), 

514 file=sys.stderr, 

515 ) 

516 if response_data.code.is_successful(): 

517 present(response_data, options) 

518 else: 

519 print( 

520 colored(response_data.code, options, lambda token: token.Generic.Error), 

521 file=sys.stderr, 

522 ) 

523 present(response_data, options, file=sys.stderr) 

524 sys.exit(1) 

525 

526 if options.observe: 

527 exit_reason = await observation_is_over 

528 print("Observation is over: %r" % (exit_reason,), file=sys.stderr) 

529 except aiocoap.error.HelpfulError as e: 

530 print(str(e), file=sys.stderr) 

531 extra_help = e.extra_help( 

532 hints=dict( 

533 original_uri=options.url, 

534 request=request, 

535 ) 

536 ) 

537 if extra_help: 

538 print("Debugging hint:", extra_help, file=sys.stderr) 

539 sys.exit(1) 

540 # Fallback while not all backends raise NetworkErrors 

541 except OSError as e: 

542 text = str(e) 

543 if not text: 

544 text = repr(e) 

545 if not text: 

546 # eg ConnectionResetError flying out of a misconfigured SSL server 

547 text = type(e) 

548 print( 

549 "Warning: OS errors should not be raised this way any more.", 

550 file=sys.stderr, 

551 ) 

552 # not telling what to do precisely: the form already tells users to 

553 # include `aiocoap.cli.defaults` output, which is exactly what we 

554 # need. 

555 print( 

556 f"Even if the cause of the error itself is clear, please file an issue at {aiocoap.meta.bugreport_uri}.", 

557 file=sys.stderr, 

558 ) 

559 print("Error:", text, file=sys.stderr) 

560 sys.exit(1) 

561 

562 

563async def single_request_with_context(args): 

564 """Wrapper around single_request until sync_main gets made fully async, and 

565 async context managers are used to manage contexts.""" 

566 context = await aiocoap.Context.create_client_context() 

567 try: 

568 await single_request(args, context) 

569 finally: 

570 await context.shutdown() 

571 

572 

573interactive_expecting_keyboard_interrupt = None 

574 

575 

576async def interactive(globalopts): 

577 global interactive_expecting_keyboard_interrupt 

578 interactive_expecting_keyboard_interrupt = asyncio.get_event_loop().create_future() 

579 

580 context = await aiocoap.Context.create_client_context() 

581 

582 while True: 

583 try: 

584 # when http://bugs.python.org/issue22412 is resolved, use that instead 

585 line = await asyncio.get_event_loop().run_in_executor( 

586 None, lambda: input("aiocoap> ") 

587 ) 

588 except EOFError: 

589 line = "exit" 

590 line = shlex.split(line) 

591 if not line: 

592 continue 

593 if line in (["help"], ["?"]): 

594 line = ["--help"] 

595 if line in (["quit"], ["q"], ["exit"]): 

596 break 

597 

598 async def single_request_noexit(*args, **kwargs): 

599 """Protects a run against the exit automatically generated by 

600 argparse errors or help""" 

601 try: 

602 # We could also set exit_on_error, but this way we do get the 

603 # original error code and can show that; might still revisit. 

604 await single_request(*args, **kwargs) 

605 except SystemExit as e: 

606 return e.code 

607 else: 

608 return 0 

609 

610 current_task = asyncio.create_task( 

611 single_request_noexit(line, context=context, globalopts=globalopts), 

612 name="Interactive prompt command %r" % line, 

613 ) 

614 interactive_expecting_keyboard_interrupt = ( 

615 asyncio.get_event_loop().create_future() 

616 ) 

617 

618 done, pending = await asyncio.wait( 

619 [current_task, interactive_expecting_keyboard_interrupt], 

620 return_when=asyncio.FIRST_COMPLETED, 

621 ) 

622 

623 if current_task not in done: 

624 current_task.cancel() 

625 else: 

626 try: 

627 code = await current_task 

628 except Exception as e: 

629 print("Unhandled exception raised: %s" % (e,)) 

630 if code != 0: 

631 print("Exit code: %d" % code, file=sys.stderr) 

632 

633 await context.shutdown() 

634 

635 

636async def main(args=None): 

637 # interactive mode is a little messy, that's why this is not using aiocoap.util.cli yet 

638 if args is None: 

639 args = sys.argv[1:] 

640 

641 # This one is tolerant and doesn't even terminate with --help, so that 

642 # --help and --interactive --help can do the right thing. 

643 first_parser = build_parser(prescreen=True) 

644 first_args = first_parser.parse_args(args) 

645 

646 configure_logging( 

647 (first_args.verbose or 0) - (first_args.quiet or 0), first_args.color 

648 ) 

649 

650 if not first_args.interactive: 

651 try: 

652 await single_request_with_context(args) 

653 except asyncio.exceptions.CancelledError: 

654 sys.exit(3) 

655 else: 

656 global_parser = build_parser(use_interactive=False) 

657 globalopts = global_parser.parse_args(args) 

658 

659 loop = asyncio.get_event_loop() 

660 

661 def ctrl_c(): 

662 try: 

663 interactive_expecting_keyboard_interrupt.set_result(None) 

664 except asyncio.exceptions.InvalidStateError: 

665 # Too many Ctlr-C before the program could clean up 

666 sys.exit(3) 

667 

668 loop.add_signal_handler(signal.SIGINT, ctrl_c) 

669 

670 await interactive(globalopts) 

671 

672 

673def sync_main(args=None): 

674 asyncio.run(main(args=args)) 

675 

676 

677if __name__ == "__main__": 

678 sync_main()