Coverage for aiocoap/cli/client.py: 73%

361 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-02 23:12 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5"""aiocoap-client is a simple command-line tool for interacting with CoAP servers""" 

6 

7import copy 

8import sys 

9import asyncio 

10import argparse 

11import logging 

12import signal 

13import subprocess 

14from pathlib import Path 

15 

16import shlex 

17 

18# even though not used directly, this has side effects on the input() function 

19# used in interactive mode 

20try: 

21 import readline # noqa: F401 

22except ImportError: 

23 pass # that's normal on some platforms, and ok since it's just a usability enhancement 

24 

25import aiocoap 

26import aiocoap.defaults 

27import aiocoap.meta 

28import aiocoap.proxy.client 

29from aiocoap.util import contenttype 

30from aiocoap.util.cli import ActionNoYes 

31from aiocoap.numbers import ContentFormat 

32 

# Logger for the client's own messages. It lives below the "coap" logger, so
# configure_logging() can adjust it separately from the rest of that tree.
log = logging.getLogger("coap.aiocoap-client")

34 

35 

def augment_parser_for_global(p, *, prescreen=False):
    """Register the options that apply to the program run as a whole.

    Adds the counted verbosity switches (``-v`` / ``-q``), ``--version`` and
    ``--interactive`` to the argparse parser ``p``.

    The ``prescreen`` keyword is accepted but not consulted here.
    """
    add = p.add_argument

    add("-v", "--verbose", action="count", help="Increase the debug output")
    add("-q", "--quiet", action="count", help="Decrease the debug output")

    version_banner = "%(prog)s " + aiocoap.meta.version
    add("--version", action="version", version=version_banner)

    add(
        "--interactive",
        action="store_true",
        help="Enter interactive mode. Combine with --help or run `help` interactively to see which options apply where; some can be used globally and overwritten locally.",
    )

58 

59 

def augment_parser_for_either(p):
    """Register the options that may be given globally or per request.

    Adds ``--color``, ``--pretty-print``, ``--proxy``, ``--credentials``,
    ``--no-set-hostname`` and the hidden ``--no-sec`` to the argparse parser
    ``p``.
    """
    add = p.add_argument

    # Both switches default to None (rather than a boolean) so that later code
    # can tell "not given" apart from an explicit choice.
    add(
        "--color",
        action=ActionNoYes,
        default=None,
        help="Color output (default on TTYs if all required modules are installed)",
    )
    add(
        "--pretty-print",
        action=ActionNoYes,
        default=None,
        help="Pretty-print known content formats (default on TTYs if all required modules are installed)",
    )

    add("--proxy", metavar="URI", help="Relay the CoAP request to a proxy for execution")
    add("--credentials", type=Path, help="Load credentials to use from a given file")
    add(
        "--no-set-hostname",
        dest="set_hostname",
        action="store_false",
        default=True,
        help="Suppress transmission of Uri-Host even if the host name is not an IP literal",
    )
    add(
        "--no-sec",
        dest="sec",
        action="store_false",
        default=None,
        # Can be "Send request without any security" once it actually does
        # anything; until then, it's fine as a no-op to not impede changing
        # scripts later.
        help=argparse.SUPPRESS,
    )

98 

99 

def augment_parser_for_interactive(p, *, prescreen=False):
    """Register the per-request options and the ``url`` positional on ``p``.

    With ``prescreen`` set, the ``url`` positional becomes optional
    (``nargs="?"``) so that a command line without a URL still parses.
    """
    # (option strings, add_argument keyword arguments), in registration order
    per_request_arguments = (
        (
            ("--non",),
            dict(
                help="Send request as non-confirmable (NON) message",
                action="store_true",
            ),
        ),
        (
            ("-m", "--method"),
            dict(
                help="Name or number of request method to use (default: %(default)s)",
                default="GET",
            ),
        ),
        (
            ("--observe",),
            dict(
                help="Register an observation on the resource",
                action="store_true",
            ),
        ),
        (
            ("--observe-exec",),
            dict(
                help="Run the specified program whenever the observed resource changes, feeding the response data to its stdin",
                metavar="CMD",
            ),
        ),
        (
            ("--accept",),
            dict(help="Content format to request", metavar="MIME"),
        ),
        (
            ("--payload",),
            dict(
                help="Send X as request payload (eg. with a PUT). If X starts with an '@', its remainder is treated as a file name and read from; '@-' reads from the console. Non-file data may be recoded, see --content-format.",
                metavar="X",
            ),
        ),
        (
            ("--payload-initial-szx",),
            dict(
                help="Size exponent to limit the initial block's size (0 ≙ 16 Byte, 6 ≙ 1024 Byte)",
                metavar="SZX",
                type=int,
            ),
        ),
        (
            ("--content-format",),
            dict(
                help="Content format of the --payload data. If a known format is given and --payload has a non-file argument, the payload is converted from CBOR Diagnostic Notation.",
                metavar="MIME",
            ),
        ),
        (
            ("url",),
            dict(
                nargs="?" if prescreen else None,
                help="CoAP address to fetch",
            ),
        ),
    )

    for option_strings, settings in per_request_arguments:
        p.add_argument(*option_strings, **settings)

146 

147 

def build_parser(*, use_global=True, use_interactive=True, prescreen=False):
    """Assemble the argument parser out of the selected option groups.

    :param use_global: include the options of augment_parser_for_global
    :param use_interactive: include the per-request options and the URL
    :param prescreen: build a tolerant parser; argparse's own help handling
        is disabled and ``--help`` is merely recorded as a flag
    :returns: the configured ``argparse.ArgumentParser``
    """
    parser = argparse.ArgumentParser(add_help=not prescreen, description=__doc__)

    if prescreen:
        # Stand-in for the built-in help action: stores True instead of
        # printing the help text and exiting.
        parser.add_argument("--help", action="store_true")

    if use_global:
        augment_parser_for_global(parser, prescreen=prescreen)

    augment_parser_for_either(parser)

    if use_interactive:
        augment_parser_for_interactive(parser, prescreen=prescreen)

    return parser

159 

160 

def configure_logging(verbosity, color):
    """Install a log handler and set the level of the ``coap`` logger tree.

    :param verbosity: net verbosity; the caller passes the number of ``-v``
        minus the number of ``-q`` flags
    :param color: ``False`` forces plain ``logging.basicConfig()``; any other
        value tries ``colorlog`` first and falls back to plain logging when
        that module is not installed
    """
    if color is not False:
        try:
            import colorlog
        except ImportError:
            color = False
        else:
            colorlog.basicConfig()
    if not color:
        logging.basicConfig()

    coap_logger = logging.getLogger("coap")

    if verbosity <= -2:
        # Above CRITICAL: nothing gets through
        coap_logger.setLevel(logging.CRITICAL + 1)
    elif verbosity == -1:
        coap_logger.setLevel(logging.ERROR)
    elif verbosity == 0:
        coap_logger.setLevel(logging.WARNING)
    elif verbosity == 1:
        # Only the client's own logger becomes more talkative
        coap_logger.setLevel(logging.WARNING)
        logging.getLogger("coap.aiocoap-client").setLevel(logging.INFO)
    elif verbosity == 2:
        coap_logger.setLevel(logging.INFO)
    elif verbosity == 3:
        coap_logger.setLevel(logging.DEBUG)
    elif verbosity >= 4:
        # This branch used to be shadowed by a preceding `>= 3` test. The
        # level is 1 rather than 0: 0 is logging.NOTSET, which on a non-root
        # logger means "use the parent's level" and would make the output
        # less verbose than at verbosity 3.
        coap_logger.setLevel(1)

    log.debug("Logging configured.")

189 

190 

def colored(text, options, tokenlambda):
    """Return ``text`` as a string, colored through pygments if requested.

    When ``options.color`` is false, the result is just ``str(text)`` and
    pygments is never imported. Otherwise ``tokenlambda`` is called with the
    ``pygments.token`` module and has to return the token type to render the
    text as; going through a callback means call sites do not need to deal
    with pygments possibly being absent.
    """
    if options.color:
        from pygments.formatters import TerminalFormatter
        from pygments import token, format as pygments_format

        token_stream = [(tokenlambda(token), str(text))]
        return pygments_format(token_stream, TerminalFormatter())

    return str(text)

206 

207 

def incoming_observation(options, response):
    """Handle a single Observe notification.

    The notification is always logged at INFO level. If ``--observe-exec``
    was given, the payload is piped into that command. Otherwise a ``---``
    separator line is written to stdout, followed by the payload on stdout
    for successful responses, or the response code and any payload on stderr
    for unsuccessful ones.
    """
    log.info("Received Observe notification:")
    for line in message_to_text(response, "from"):
        log.info(line)

    if options.observe_exec:
        # The command string comes from the user's own command line and is
        # run through the shell on purpose.
        p = subprocess.Popen(options.observe_exec, shell=True, stdin=subprocess.PIPE)
        # FIXME this blocks
        p.communicate(response.payload)
        return

    # The separator is terminated explicitly; colored() does not append a
    # newline, and without one the payload would be glued to the "---".
    sys.stdout.write(
        colored("---", options, lambda token: token.Comment.Preproc) + "\n"
    )
    if response.code.is_successful():
        # Successful payloads go to stdout, as they do for the initial
        # response in single_request().
        present(response, options)
    else:
        # Get the separator out before anything is written to stderr
        sys.stdout.flush()
        print(
            colored(response.code, options, lambda token: token.Token.Generic.Error),
            file=sys.stderr,
        )
        if response.payload:
            present(response, options, file=sys.stderr)

231 

232 

def apply_credentials(context, credentials, errfn):
    """Load client credentials from a file into ``context``.

    :param context: object whose ``client_credentials.load_from_dict`` is
        fed the parsed file content
    :param credentials: path-like object (``.suffix``, ``.open()``) of a
        ``.json`` file or a ``.diag`` (CBOR diagnostic notation) file
    :param errfn: called with a message for every user-facing error; its
        result is raised, so it may either raise by itself (as
        ``ArgumentParser.error`` does) or return an exception

    Missing files, unknown suffixes, missing CBOR modules and parse errors
    are all reported through ``errfn`` rather than as a backtrace.
    """
    try:
        if credentials.suffix == ".json":
            import json

            # Context manager so that the file is closed again after parsing
            with credentials.open("rb") as credential_file:
                context.client_credentials.load_from_dict(json.load(credential_file))
        elif credentials.suffix == ".diag":
            try:
                import cbor_diag
                import cbor2
            except ImportError:
                raise errfn(
                    "Loading credentials in CBOR diagnostic format requires cbor2 and cbor_diag package"
                )
            with credentials.open() as credential_file:
                diagnostic_text = credential_file.read()
            context.client_credentials.load_from_dict(
                cbor2.loads(cbor_diag.diag2cbor(diagnostic_text))
            )
        else:
            raise errfn(
                "Unknown suffix: %s (expected: .json or .diag)" % (credentials.suffix)
            )
    except FileNotFoundError as e:
        raise errfn("Credential file not found: %s" % e.filename)
    except (OSError, ValueError) as e:
        # Any of the parsers could reasonably raise those, and while they don't
        # have HelpfulError support, they should still not render a backtrace
        # but a proper CLI error.
        raise errfn("Processing credential file: %s" % e)

261 

262 

def message_to_text(m, direction):
    """Yield the lines of a textual rendering of message ``m``, similar to
    how messages are shown in RFCs.

    The first line holds the code, the ``direction`` word and the remote;
    then comes one line per option, and finally a payload summary with at
    most the first 16 bytes in hex.

    Refactoring this into a message method will need to address the direction
    discovery eventually."""
    if m.remote is not None:
        # FIXME: Update when transport-indication is available
        # FIXME: This is slightly wrong because it does not account for what ProxyRedirector does
        peer = f"{m.remote.scheme}://{m.remote.hostinfo}"
    else:
        # This happens when unprocessable remotes are, eg. putting in an HTTP URI
        peer = "(unknown)"
    yield f"{m.code} {direction} {peer}"

    for opt in m.opt.option_list():
        number = opt.number
        if hasattr(number, "name"):
            label = f"{number.name_printable} ({number.value})"
        else:
            label = f"{number.value}"
        yield f"- {label}: {opt.value!r}"

    if not m.payload:
        yield "No payload"
        return

    limit = 16
    total = len(m.payload)
    preview = m.payload[:limit].hex()
    if total > limit:
        yield f"Payload: {preview}... ({total} bytes total)"
    else:
        yield f"Payload: {preview} ({total} bytes)"

288 

289 

def present(message, options, file=sys.stdout):
    """Write a message payload to the output, pretty printing and/or coloring
    it as configured in the options.

    :param message: the message to show; its request's content format is
        consulted when the message itself carries none
    :param options: namespace providing ``quiet``, ``pretty_print`` and
        ``color``
    :param file: text stream for the payload; raw (non-text) payloads are
        written to its ``.buffer``

    Location options and pretty-printer notes always go to stderr, and only
    when ``options.quiet`` is not set. Nothing is written for an empty
    payload."""
    if not options.quiet and (message.opt.location_path or message.opt.location_query):
        # FIXME: Percent encoding is completely missing; this would be done
        # most easily with a CRI library
        location_ref = "/" + "/".join(message.opt.location_path)
        if message.opt.location_query:
            location_ref += "?" + "&".join(message.opt.location_query)
        print(
            colored(
                f"Location options indicate new resource: {location_ref}",
                options,
                lambda token: token.Token.Generic.Inserted,
            ),
            file=sys.stderr,
        )

    if not message.payload:
        return

    payload = None

    # Only fall back to the request's content format when the message has
    # none at all. A plain `or` would also discard a content format that
    # evaluates as false (presumably the case for the numeric value 0, given
    # that content formats are built from integers elsewhere in this file).
    cf = message.opt.content_format
    if cf is None:
        cf = message.request.opt.content_format
    if cf is not None and cf.is_known():
        mime = cf.media_type
    else:
        mime = "application/octet-stream"
    if options.pretty_print:
        from aiocoap.util.prettyprint import pretty_print

        prettyprinted = pretty_print(message)
        if prettyprinted is not None:
            (infos, mime, payload) = prettyprinted
            if not options.quiet:
                for i in infos:
                    print(
                        colored("# " + i, options, lambda token: token.Comment),
                        file=sys.stderr,
                    )

    color = options.color
    if color:
        from aiocoap.util.prettyprint import lexer_for_mime
        import pygments

        try:
            lexer = lexer_for_mime(mime)
        except pygments.util.ClassNotFound:
            color = False

    if color and payload is None:
        # Coloring requires a unicode-string style payload, either from the
        # mime type or from the pretty printer.
        try:
            payload = message.payload.decode("utf8")
        except UnicodeDecodeError:
            color = False

    if color:
        from pygments.formatters import TerminalFormatter
        from pygments import highlight

        highlit = highlight(
            payload,
            lexer,
            TerminalFormatter(),
        )
        # The TerminalFormatter already adds an end-of-line character, not
        # trying to add one for any missing trailing newlines.
        print(highlit, file=file, end="")
        file.flush()
    else:
        if payload is None:
            file.buffer.write(message.payload)
            if file.isatty() and message.payload[-1:] != b"\n":
                file.write("\n")
        else:
            file.write(payload)
            # endswith() rather than payload[-1]: also works for an empty string
            if file.isatty() and not payload.endswith("\n"):
                file.write("\n")

371 

372 

async def single_request(args, context, globalopts=None):
    """Parse the command line of one request, send it and show the result.

    :param args: list of command line arguments for this request
    :param context: the context through which the request is sent (directly,
        or wrapped in a proxy forwarder when ``--proxy`` is given)
    :param globalopts: namespace of already parsed global options, or None.
        When given, the parser is built without the global options, and a
        shallow copy of the namespace is the starting point for parsing, so
        that per-request options override it without modifying it.

    Usage errors terminate through ``parser.error``; an unsuccessful
    response, a HelpfulError or an OSError end in ``sys.exit(1)``. All of
    these surface to the caller as ``SystemExit``.
    """
    parser = build_parser(use_global=globalopts is None)
    options = parser.parse_args(args, copy.copy(globalopts))

    pretty_print_modules = aiocoap.defaults.prettyprint_missing_modules()
    if pretty_print_modules and (options.color is True or options.pretty_print is True):
        parser.error(
            "Color and pretty printing require the following"
            " additional module(s) to be installed: %s"
            % ", ".join(pretty_print_modules)
        )
    # Not explicitly chosen: enable on terminals if the modules are available
    if options.color is None:
        options.color = sys.stdout.isatty() and not pretty_print_modules
    if options.pretty_print is None:
        options.pretty_print = sys.stdout.isatty() and not pretty_print_modules

    try:
        # The method may be given by name or by number
        try:
            code = getattr(
                aiocoap.numbers.codes.Code,
                options.method.upper().replace("IPATCH", "iPATCH"),
            )
        except AttributeError:
            try:
                code = aiocoap.numbers.codes.Code(int(options.method))
            except ValueError:
                raise parser.error("Unknown method")

        if options.credentials is not None:
            apply_credentials(context, options.credentials, parser.error)

        request = aiocoap.Message(
            code=code, mtype=aiocoap.NON if options.non else aiocoap.CON
        )
        request.set_request_uri(options.url, set_uri_host=options.set_hostname)

        # Content formats are accepted as number or as media type
        if options.accept:
            try:
                request.opt.accept = ContentFormat(int(options.accept))
            except ValueError:
                try:
                    request.opt.accept = ContentFormat.by_media_type(options.accept)
                except KeyError:
                    raise parser.error("Unknown accept type")

        if options.observe:
            request.opt.observe = 0
            observation_is_over = asyncio.get_event_loop().create_future()

        if options.content_format:
            try:
                request.opt.content_format = ContentFormat(int(options.content_format))
            except ValueError:
                try:
                    request.opt.content_format = ContentFormat.by_media_type(
                        options.content_format
                    )
                except KeyError:
                    raise parser.error("Unknown content format")

        if options.payload:
            if options.payload.startswith("@"):
                filename = options.payload[1:]
                # Opening the file is part of the guarded section, so that a
                # missing or unreadable file is reported as a usage error
                # instead of reaching the generic OSError handler at the end
                # of this function. The file is closed once it is read.
                try:
                    if filename == "-":
                        request.payload = sys.stdin.buffer.read()
                    else:
                        with open(filename, "rb") as f:
                            request.payload = f.read()
                except OSError as e:
                    raise parser.error("File could not be opened: %s" % e)
            else:
                request_classification = contenttype.categorize(
                    request.opt.content_format.media_type
                    if request.opt.content_format is not None
                    and request.opt.content_format.is_known()
                    else ""
                )
                if request_classification in ("cbor", "cbor-seq"):
                    try:
                        import cbor_diag
                    except ImportError as e:
                        raise parser.error(f"CBOR recoding not available ({e})")

                    try:
                        encoded = cbor_diag.diag2cbor(options.payload)
                    except ValueError as e:
                        raise parser.error(
                            f"Parsing CBOR diagnostic notation failed. Make sure quotation marks are escaped from the shell. Error: {e}"
                        )

                    if request_classification == "cbor-seq":
                        try:
                            import cbor2
                        except ImportError as e:
                            raise parser.error(
                                f"CBOR sequence recoding not available ({e})"
                            )
                        decoded = cbor2.loads(encoded)
                        if not isinstance(decoded, list):
                            raise parser.error(
                                "CBOR sequence recoding requires an array as the top-level element."
                            )
                        # A sequence is the concatenation of the encoded
                        # array elements, without the array wrapper
                        request.payload = b"".join(cbor2.dumps(d) for d in decoded)
                    else:
                        request.payload = encoded
                else:
                    request.payload = options.payload.encode("utf8")

        if options.payload_initial_szx is not None:
            request.remote.maximum_block_size_exp = options.payload_initial_szx

        if options.proxy is None or options.proxy in ("none", "", "-"):
            interface = context
        else:
            interface = aiocoap.proxy.client.ProxyForwarder(options.proxy, context)

        requested_uri = request.get_request_uri()

        log.info("Sending request:")
        for line in message_to_text(request, "to"):
            log.info(line)

        requester = interface.request(request)

        if options.observe:
            requester.observation.register_errback(observation_is_over.set_result)
            requester.observation.register_callback(
                lambda data, options=options: incoming_observation(options, data)
            )

        try:
            response_data = await requester.response
        finally:
            if not requester.response.done():
                requester.response.cancel()
            if options.observe and not requester.observation.cancelled:
                requester.observation.cancel()

        log.info("Received response:")
        for line in message_to_text(response_data, "from"):
            log.info(line)

        response_uri = response_data.get_request_uri()
        if requested_uri != response_uri:
            print(
                colored(
                    f"Response arrived from different address; base URI is {response_uri}",
                    options,
                    lambda token: token.Generic.Inserted,
                ),
                file=sys.stderr,
            )
        if response_data.code.is_successful():
            present(response_data, options)
        else:
            print(
                colored(response_data.code, options, lambda token: token.Generic.Error),
                file=sys.stderr,
            )
            present(response_data, options, file=sys.stderr)
            sys.exit(1)

        if options.observe:
            exit_reason = await observation_is_over
            print("Observation is over: %r" % (exit_reason,), file=sys.stderr)
    except aiocoap.error.HelpfulError as e:
        print(str(e), file=sys.stderr)
        extra_help = e.extra_help(
            hints=dict(
                original_uri=options.url,
                request=request,
            )
        )
        if extra_help:
            print("Debugging hint:", extra_help, file=sys.stderr)
        sys.exit(1)
    # Fallback while not all backends raise NetworkErrors
    except OSError as e:
        text = str(e)
        if not text:
            text = repr(e)
        if not text:
            # eg ConnectionResetError flying out of a misconfigured SSL server
            text = type(e)
        print(
            "Warning: OS errors should not be raised this way any more.",
            file=sys.stderr,
        )
        # not telling what to do precisely: the form already tells users to
        # include `aiocoap.cli.defaults` output, which is exactly what we
        # need.
        print(
            f"Even if the cause of the error itself is clear, please file an issue at {aiocoap.meta.bugreport_uri}.",
            file=sys.stderr,
        )
        print("Error:", text, file=sys.stderr)
        sys.exit(1)

571 

572 

async def single_request_with_context(args):
    """Run single_request() on a client context created just for this call.

    This wrapper exists until sync_main gets made fully async and async
    context managers are used to manage contexts. The context is shut down
    after the request, no matter whether it completed or raised."""
    client_context = await aiocoap.Context.create_client_context()
    try:
        await single_request(args, client_context)
    finally:
        await client_context.shutdown()

581 

582 

# Future through which the SIGINT handler installed by main() tells the
# interactive loop about a Ctrl-C. interactive() stores a fresh future here
# on start-up and again for every command it runs.
interactive_expecting_keyboard_interrupt = None

584 

585 

async def interactive(globalopts):
    """Run the interactive prompt until ``quit``, ``q``, ``exit`` or EOF.

    Every input line is split like a shell command line and run as one
    single_request() on a shared client context, with ``globalopts`` as the
    defaults. ``help`` and ``?`` are aliases for ``--help``. A command that
    is still running when the module-level keyboard interrupt future is
    resolved gets cancelled. The context is shut down when the loop ends,
    also if it ends with an exception.
    """
    global interactive_expecting_keyboard_interrupt
    loop = asyncio.get_event_loop()
    interactive_expecting_keyboard_interrupt = loop.create_future()

    async def single_request_noexit(*args, **kwargs):
        """Protects a run against the exit automatically generated by
        argparse errors or help"""
        try:
            # We could also set exit_on_error, but this way we do get the
            # original error code and can show that; might still revisit.
            await single_request(*args, **kwargs)
        except SystemExit as e:
            return e.code
        else:
            return 0

    context = await aiocoap.Context.create_client_context()

    try:
        while True:
            try:
                # when http://bugs.python.org/issue22412 is resolved, use that instead
                line = await loop.run_in_executor(None, lambda: input("aiocoap> "))
            except EOFError:
                line = "exit"

            try:
                line = shlex.split(line)
            except ValueError as e:
                # eg. unbalanced quotation marks; a typo should not end the
                # whole session
                print("Could not parse input: %s" % (e,), file=sys.stderr)
                continue

            if not line:
                continue
            if line in (["help"], ["?"]):
                line = ["--help"]
            if line in (["quit"], ["q"], ["exit"]):
                break

            current_task = asyncio.create_task(
                single_request_noexit(line, context=context, globalopts=globalopts),
                name="Interactive prompt command %r" % line,
            )
            interactive_expecting_keyboard_interrupt = loop.create_future()

            done, pending = await asyncio.wait(
                [current_task, interactive_expecting_keyboard_interrupt],
                return_when=asyncio.FIRST_COMPLETED,
            )

            if current_task not in done:
                # Ctrl-C arrived before the command finished
                current_task.cancel()
                continue

            try:
                code = await current_task
            except Exception as e:
                print("Unhandled exception raised: %s" % (e,))
                # There is no exit code to report in this case
                continue

            if code != 0:
                print("Exit code: %d" % code, file=sys.stderr)
    finally:
        await context.shutdown()

644 

645 

async def main(args=None):
    """Asynchronous entry point of aiocoap-client.

    ``args`` is the list of command line arguments, defaulting to
    ``sys.argv[1:]``. Depending on ``--interactive``, this either runs a
    single request (exiting with status 3 if that gets cancelled) or installs
    a SIGINT handler and enters the interactive prompt.
    """
    # interactive mode is a little messy, that's why this is not using aiocoap.util.cli yet
    if args is None:
        args = sys.argv[1:]

    # This one is tolerant and doesn't even terminate with --help, so that
    # --help and --interactive --help can do the right thing.
    prescreened = build_parser(prescreen=True).parse_args(args)

    net_verbosity = (prescreened.verbose or 0) - (prescreened.quiet or 0)
    configure_logging(net_verbosity, prescreened.color)

    if prescreened.interactive:
        globalopts = build_parser(use_interactive=False).parse_args(args)

        event_loop = asyncio.get_event_loop()

        def ctrl_c():
            try:
                interactive_expecting_keyboard_interrupt.set_result(None)
            except asyncio.exceptions.InvalidStateError:
                # Too many Ctrl-C before the program could clean up
                sys.exit(3)

        event_loop.add_signal_handler(signal.SIGINT, ctrl_c)

        await interactive(globalopts)
        return

    try:
        await single_request_with_context(args)
    except asyncio.exceptions.CancelledError:
        sys.exit(3)

681 

682 

def sync_main(args=None):
    """Synchronous entry point: run main() to completion through
    ``asyncio.run``, passing ``args`` on unchanged."""
    entry_coroutine = main(args=args)
    asyncio.run(entry_coroutine)

685 

686 

# Run the client when this module is executed as a script
if __name__ == "__main__":
    sync_main()