Coverage for aiocoap/cli/client.py: 72%

360 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-05 18:37 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5"""aiocoap-client is a simple command-line tool for interacting with CoAP servers""" 

6 

7import copy 

8import sys 

9import asyncio 

10import argparse 

11import logging 

12import signal 

13import subprocess 

14from pathlib import Path 

15 

16import shlex 

17 

18# even though not used directly, this has side effects on the input() function 

19# used in interactive mode 

20try: 

21 import readline # noqa: F401 

22except ImportError: 

23 pass # that's normal on some platforms, and ok since it's just a usability enhancement 

24 

25import aiocoap 

26import aiocoap.defaults 

27import aiocoap.meta 

28import aiocoap.proxy.client 

29from aiocoap.util import contenttype 

30from aiocoap.util.cli import ActionNoYes 

31from aiocoap.numbers import ContentFormat 

32 

# Logger for messages emitted by this command-line tool itself. It lives
# below the "coap" logger, whose level configure_logging() adjusts.
log = logging.getLogger("coap.aiocoap-client")

34 

35 

def augment_parser_for_global(p, *, prescreen=False):
    """Register the options build_parser() adds only when ``use_global`` is set.

    These are the verbosity counters, ``--version`` and ``--interactive``.

    :param p: The argparse parser to add the arguments to.
    :param prescreen: Accepted so the signature matches the other
        ``augment_parser_for_*`` helpers; it is not evaluated here.
    """
    # Both verbosity switches are plain counters; main() subtracts one from
    # the other to obtain the effective verbosity.
    counters = (
        ("-v", "--verbose", "Increase the debug output"),
        ("-q", "--quiet", "Decrease the debug output"),
    )
    for short_flag, long_flag, description in counters:
        p.add_argument(short_flag, long_flag, help=description, action="count")

    version_banner = "%(prog)s " + aiocoap.meta.version
    p.add_argument("--version", action="version", version=version_banner)

    p.add_argument(
        "--interactive",
        action="store_true",
        help="Enter interactive mode. Combine with --help or run `help` interactively to see which options apply where; some can be used globally and overwritten locally.",
    )

58 

59 

def augment_parser_for_either(p):
    """Register the options that build_parser() adds unconditionally.

    These cover output styling (color, pretty printing), the proxy, the
    credentials file and the host name / security switches.

    :param p: The argparse parser to add the arguments to.
    """
    # --color and --pretty-print are tri-state: None means "decide later",
    # which single_request() resolves based on whether stdout is a TTY.
    tri_state = dict(default=None, action=ActionNoYes)
    p.add_argument(
        "--color",
        help="Color output (default on TTYs if all required modules are installed)",
        **tri_state,
    )
    p.add_argument(
        "--pretty-print",
        help="Pretty-print known content formats (default on TTYs if all required modules are installed)",
        **tri_state,
    )

    p.add_argument(
        "--proxy",
        metavar="URI",
        help="Relay the CoAP request to a proxy for execution",
    )
    p.add_argument(
        "--credentials",
        type=Path,
        help="Load credentials to use from a given file",
    )
    p.add_argument(
        "--no-set-hostname",
        dest="set_hostname",
        default=True,
        action="store_false",
        help="Suppress transmission of Uri-Host even if the host name is not an IP literal",
    )
    p.add_argument(
        "--no-sec",
        dest="sec",
        default=None,
        action="store_false",
        # Can be "Send request without any security" once it actually does
        # anything; until then, it's fine as a no-op to not impede changing
        # scripts later.
        help=argparse.SUPPRESS,
    )

98 

99 

def augment_parser_for_interactive(p, *, prescreen=False):
    """Register the per-request options (method, payload, observation, URL).

    :param p: The argparse parser to add the arguments to.
    :param prescreen: When true, the positional ``url`` becomes optional so
        that a tolerant first parsing pass does not fail without it.
    """
    # (flags, keyword arguments) in the order they should show up in --help.
    option_table = (
        (
            ("--non",),
            dict(
                help="Send request without reliable transport (e.g. over UDP: as non-confirmable (NON) message)",
                action="store_true",
            ),
        ),
        (
            ("-m", "--method"),
            dict(
                help="Name or number of request method to use (default: %(default)s)",
                default="GET",
            ),
        ),
        (
            ("--observe",),
            dict(help="Register an observation on the resource", action="store_true"),
        ),
        (
            ("--observe-exec",),
            dict(
                help="Run the specified program whenever the observed resource changes, feeding the response data to its stdin",
                metavar="CMD",
            ),
        ),
        (
            ("--accept",),
            dict(help="Content format to request", metavar="MIME"),
        ),
        (
            ("--payload",),
            dict(
                help="Send X as request payload (eg. with a PUT). If X starts with an '@', its remainder is treated as a file name and read from; '@-' reads from the console. Non-file data may be recoded, see --content-format.",
                metavar="X",
            ),
        ),
        (
            ("--payload-initial-szx",),
            dict(
                help="Size exponent to limit the initial block's size (0 ≙ 16 Byte, 6 ≙ 1024 Byte)",
                metavar="SZX",
                type=int,
            ),
        ),
        (
            ("--content-format",),
            dict(
                help="Content format of the --payload data. If a known format is given and --payload has a non-file argument, the payload is converted from CBOR Diagnostic Notation.",
                metavar="MIME",
            ),
        ),
    )
    for flags, settings in option_table:
        p.add_argument(*flags, **settings)

    # The URL is mandatory for a real request, but optional while prescreening.
    url_nargs = "?" if prescreen else None
    p.add_argument("url", nargs=url_nargs, help="CoAP address to fetch")

146 

147 

def build_parser(*, use_global=True, use_interactive=True, prescreen=False):
    """Assemble the argument parser from the ``augment_parser_for_*`` helpers.

    :param use_global: Include the options from augment_parser_for_global().
    :param use_interactive: Include the per-request options from
        augment_parser_for_interactive().
    :param prescreen: Build a tolerant parser: argparse's built-in help is
        disabled and ``--help`` is merely recorded as a flag.
    :return: The configured :class:`argparse.ArgumentParser`.
    """
    parser = argparse.ArgumentParser(add_help=not prescreen, description=__doc__)

    if prescreen:
        # Replace the exiting built-in --help with a plain boolean.
        parser.add_argument("--help", action="store_true")

    if use_global:
        augment_parser_for_global(parser, prescreen=prescreen)

    augment_parser_for_either(parser)

    if use_interactive:
        augment_parser_for_interactive(parser, prescreen=prescreen)

    return parser

159 

160 

def configure_logging(verbosity, color):
    """Set up log handlers and the level of the "coap" logger hierarchy.

    :param verbosity: Number of ``-v`` minus number of ``-q`` given on the
        command line. Values of -2 and below silence everything, 0 is the
        default (warnings), 4 and above enable every level.
    :param color: ``False`` to force plain logging; any other value makes
        this try to use ``colorlog``, falling back to plain logging if it is
        not installed.
    """
    if color is not False:
        try:
            import colorlog
        except ImportError:
            color = False
        else:
            colorlog.basicConfig()
    if not color:
        logging.basicConfig()

    coap_logger = logging.getLogger("coap")
    if verbosity <= -2:
        # One above CRITICAL: nothing gets through.
        coap_logger.setLevel(logging.CRITICAL + 1)
    elif verbosity == -1:
        coap_logger.setLevel(logging.ERROR)
    elif verbosity == 0:
        coap_logger.setLevel(logging.WARNING)
    elif verbosity == 1:
        # Library stays at warnings, only the client tool itself gets chattier.
        coap_logger.setLevel(logging.WARNING)
        logging.getLogger("coap.aiocoap-client").setLevel(logging.INFO)
    elif verbosity == 2:
        coap_logger.setLevel(logging.INFO)
    elif verbosity == 3:
        coap_logger.setLevel(logging.DEBUG)
    elif verbosity >= 4:
        # This branch used to be shadowed by a `>= 3` test and could never
        # run. The level is 1 rather than 0: level 0 is NOTSET, which on a
        # non-root logger delegates to the parent's level and would make the
        # output *less* verbose than -vvv.
        coap_logger.setLevel(1)

    log.debug("Logging configured.")

189 

190 

def colored(text, options, tokenlambda):
    """Return ``text`` as a string, colored through pygments if enabled.

    :param text: Object to render; it is converted with ``str()``.
    :param options: Parsed options; only ``options.color`` is read.
    :param tokenlambda: Callback that receives the ``pygments.token`` module
        and returns the token type to color with. It is only called when
        coloring is active, so callers never need to import pygments
        themselves.
    :return: The plain string when ``options.color`` is falsy, otherwise the
        output of pygments' terminal formatter.
    """
    if options.color:
        # Imported lazily: pygments is optional and only needed for color.
        from pygments import format as render_tokens
        from pygments import token as token_module
        from pygments.formatters import TerminalFormatter

        token_stream = [(tokenlambda(token_module), str(text))]
        return render_tokens(token_stream, TerminalFormatter())

    return str(text)

206 

207 

def incoming_observation(options, response):
    """Handle a single Observe notification.

    The notification is logged at INFO level. Then its payload is either
    piped into the ``--observe-exec`` command, or presented after a ``---``
    separator line.

    :param options: Parsed options (``observe_exec`` plus whatever colored()
        and present() read).
    :param response: The notification message.
    """
    log.info("Received Observe notification:")
    for text_line in message_to_text(response, "from"):
        log.info(text_line)

    if options.observe_exec:
        child = subprocess.Popen(
            options.observe_exec, shell=True, stdin=subprocess.PIPE
        )
        # FIXME this blocks
        child.communicate(response.payload)
        return

    separator = colored("---", options, lambda token: token.Comment.Preproc)
    sys.stdout.write(separator)
    sys.stdout.write("\n")

    if not response.code.is_successful():
        error_line = colored(
            response.code, options, lambda token: token.Token.Generic.Error
        )
        print(error_line, file=sys.stderr)
        show_payload = bool(response.payload)
    else:
        show_payload = True

    if show_payload:
        # NOTE(review): successful notifications are also written to stderr
        # while the separator goes to stdout -- confirm this is intended.
        present(response, options, file=sys.stderr)

    sys.stdout.flush()

232 

233 

def apply_credentials(context, credentials, errfn):
    """Load a credentials file into ``context.client_credentials``.

    The format is chosen by file suffix: ``.json`` is parsed with the json
    module, ``.diag`` is treated as CBOR diagnostic notation (which needs the
    optional ``cbor_diag`` and ``cbor2`` packages).

    :param context: Object whose ``client_credentials.load_from_dict`` is
        called with the parsed data.
    :param credentials: :class:`pathlib.Path` of the credentials file.
    :param errfn: Called with a message for every user-facing error. Its
        result is raised, so it may either return an exception or not return
        at all (as ``parser.error`` does).
    """
    try:
        if credentials.suffix == ".json":
            import json

            # Close the file deterministically instead of leaking the handle.
            with credentials.open("rb") as json_file:
                parsed = json.load(json_file)
            context.client_credentials.load_from_dict(parsed)
        elif credentials.suffix == ".diag":
            try:
                import cbor_diag
                import cbor2
            except ImportError:
                raise errfn(
                    "Loading credentials in CBOR diagnostic format requires cbor2 and cbor_diag package"
                )
            with credentials.open() as diag_file:
                diagnostic_text = diag_file.read()
            context.client_credentials.load_from_dict(
                cbor2.loads(cbor_diag.diag2cbor(diagnostic_text))
            )
        else:
            raise errfn(
                "Unknown suffix: %s (expected: .json or .diag)" % (credentials.suffix)
            )
    except FileNotFoundError as e:
        raise errfn("Credential file not found: %s" % e.filename)
    except (OSError, ValueError) as e:
        # Any of the parsers could reasonably raise those, and while they don't
        # have HelpfulError support, they should still not render a backtrace
        # but a proper CLI error.
        raise errfn("Processing credential file: %s" % e)

262 

263 

def message_to_text(m, direction):
    """Yield the lines of a textual rendering of a message, RFC example style.

    The first line holds the code, the direction word and the remote; one
    line per option follows; the last line summarizes the payload.

    Refactoring this into a message method will need to address the direction
    discovery eventually.

    :param m: The message to render.
    :param direction: Word placed between code and remote ("to" / "from").
    """
    if m.remote is not None:
        # FIXME: Update when transport-indication is available
        # FIXME: This is slightly wrong because it does not account for what ProxyRedirector does
        peer = f"{m.remote.scheme}://{m.remote.hostinfo}"
    else:
        # This happens when unprocessable remotes are, eg. putting in an HTTP URI
        peer = "(unknown)"
    yield f"{m.code} {direction} {peer}"

    for option in m.opt.option_list():
        number = option.number
        if hasattr(number, "name"):
            label = f"{number.name_printable} ({number.value})"
        else:
            label = f"{number.value}"
        yield f"- {label}: {option.value!r}"

    body = m.payload
    if not body:
        yield "No payload"
        return

    # Only a hex preview of the first bytes is shown.
    limit = 16
    preview = body[:limit].hex()
    if len(body) > limit:
        yield f"Payload: {preview}... ({len(body)} bytes total)"
    else:
        yield f"Payload: {preview} ({len(body)} bytes)"

289 

290 

def present(message, options, file=sys.stdout):
    """Write a message payload to the output, pretty printing and/or coloring
    it as configured in the options.

    Location options, if present and ``options.quiet`` is not set, are
    reported on stderr first. Nothing else is written for an empty payload.

    :param message: The message whose payload is shown.
    :param options: Parsed options; ``quiet``, ``pretty_print`` and ``color``
        are read.
    :param file: Text stream to write to. Payloads that are emitted as raw
        bytes go through its ``buffer`` attribute.
    """
    if not options.quiet and (message.opt.location_path or message.opt.location_query):
        # FIXME: Percent encoding is completely missing; this would be done
        # most easily with a CRI library
        location_ref = "/" + "/".join(message.opt.location_path)
        if message.opt.location_query:
            location_ref += "?" + "&".join(message.opt.location_query)
        print(
            colored(
                f"Location options indicate new resource: {location_ref}",
                options,
                lambda token: token.Token.Generic.Inserted,
            ),
            file=sys.stderr,
        )

    if not message.payload:
        return

    # Text form of the payload; stays None while only raw bytes are known.
    payload = None

    cf = message.opt.content_format

    if cf is None:
        if message.code.is_successful():
            cf = message.request.opt.content_format
        else:
            cf = ContentFormat.TEXT

    if cf is not None and cf.is_known():
        mime = cf.media_type
    else:
        mime = "application/octet-stream"
    if options.pretty_print:
        from aiocoap.util.prettyprint import pretty_print

        prettyprinted = pretty_print(message)
        if prettyprinted is not None:
            (infos, mime, payload) = prettyprinted
            if not options.quiet:
                for i in infos:
                    print(
                        colored("# " + i, options, lambda token: token.Comment),
                        file=sys.stderr,
                    )

    color = options.color
    if color:
        from aiocoap.util.prettyprint import lexer_for_mime
        import pygments

        try:
            lexer = lexer_for_mime(mime)
        except pygments.util.ClassNotFound:
            color = False

    if color and payload is None:
        # Coloring requires a unicode-string style payload, either from the
        # mime type or from the pretty printer.
        try:
            payload = message.payload.decode("utf8")
        except UnicodeDecodeError:
            color = False

    if color:
        from pygments.formatters import TerminalFormatter
        from pygments import highlight

        highlit = highlight(
            payload,
            lexer,
            TerminalFormatter(),
        )
        # The TerminalFormatter already adds an end-of-line character, not
        # trying to add one for any missing trailing newlines.
        print(highlit, file=file, end="")
        file.flush()
    elif payload is None:
        # The raw bytes bypass the text layer. Flush it first so that text
        # written earlier and still pending there is not emitted after the
        # payload.
        file.flush()
        file.buffer.write(message.payload)
        if file.isatty() and message.payload[-1:] != b"\n":
            file.write("\n")
    else:
        file.write(payload)
        # endswith() rather than [-1]: a pretty printer may produce an empty
        # string, on which indexing would raise IndexError.
        if file.isatty() and not payload.endswith("\n"):
            file.write("\n")

379 

380 

async def single_request(args, context, globalopts=None):
    """Parse one command line, send the request and present the response.

    :param args: List of command-line arguments for this request.
    :param context: The aiocoap context the request is sent through (directly
        or wrapped in a proxy forwarder when ``--proxy`` is given).
    :param globalopts: Namespace of already parsed global options, as used in
        interactive mode. When given, the parser is built without the global
        options and a copy of this namespace is used as the starting point.

    Exits the process through ``sys.exit`` / ``parser.error`` on invalid
    arguments, unsuccessful responses and network errors; interactive mode
    catches the resulting ``SystemExit``.
    """
    parser = build_parser(use_global=globalopts is None)
    options = parser.parse_args(args, copy.copy(globalopts))

    pretty_print_modules = aiocoap.defaults.prettyprint_missing_modules()
    if pretty_print_modules and (options.color is True or options.pretty_print is True):
        parser.error(
            "Color and pretty printing require the following"
            " additional module(s) to be installed: %s"
            % ", ".join(pretty_print_modules)
        )
    # Unset (None) means automatic: on when writing to a TTY and all modules
    # are available.
    if options.color is None:
        options.color = sys.stdout.isatty() and not pretty_print_modules
    if options.pretty_print is None:
        options.pretty_print = sys.stdout.isatty() and not pretty_print_modules

    try:
        # The method may be given by name or by number.
        try:
            code = getattr(
                aiocoap.numbers.codes.Code,
                options.method.upper().replace("IPATCH", "iPATCH"),
            )
        except AttributeError:
            try:
                code = aiocoap.numbers.codes.Code(int(options.method))
            except ValueError:
                raise parser.error("Unknown method")

        if options.credentials is not None:
            apply_credentials(context, options.credentials, parser.error)

        request = aiocoap.Message(
            code=code,
            transport_tuning=aiocoap.Unreliable if options.non else aiocoap.Reliable,
        )
        request.set_request_uri(options.url, set_uri_host=options.set_hostname)

        if options.accept:
            # Numeric content format first, media type name as fallback.
            try:
                request.opt.accept = ContentFormat(int(options.accept))
            except ValueError:
                try:
                    request.opt.accept = ContentFormat.by_media_type(options.accept)
                except KeyError:
                    raise parser.error("Unknown accept type")

        if options.observe:
            request.opt.observe = 0

        if options.content_format:
            try:
                request.opt.content_format = ContentFormat(int(options.content_format))
            except ValueError:
                try:
                    request.opt.content_format = ContentFormat.by_media_type(
                        options.content_format
                    )
                except KeyError:
                    raise parser.error("Unknown content format")

        if options.payload:
            if options.payload.startswith("@"):
                filename = options.payload[1:]
                # Opening happens inside the try block so that a missing or
                # unreadable file produces the CLI error below rather than
                # reaching the generic OSError fallback at the end; the
                # `with` closes the file again.
                try:
                    if filename == "-":
                        request.payload = sys.stdin.buffer.read()
                    else:
                        with open(filename, "rb") as f:
                            request.payload = f.read()
                except OSError as e:
                    raise parser.error("File could not be opened: %s" % e)
            else:
                request_classification = contenttype.categorize(
                    request.opt.content_format.media_type
                    if request.opt.content_format is not None
                    and request.opt.content_format.is_known()
                    else ""
                )
                if request_classification in ("cbor", "cbor-seq"):
                    try:
                        import cbor_diag
                    except ImportError as e:
                        raise parser.error(f"CBOR recoding not available ({e})")

                    try:
                        encoded = cbor_diag.diag2cbor(options.payload)
                    except ValueError as e:
                        raise parser.error(
                            f"Parsing CBOR diagnostic notation failed. Make sure quotation marks are escaped from the shell. Error: {e}"
                        )

                    if request_classification == "cbor-seq":
                        try:
                            import cbor2
                        except ImportError as e:
                            raise parser.error(
                                f"CBOR sequence recoding not available ({e})"
                            )
                        decoded = cbor2.loads(encoded)
                        if not isinstance(decoded, list):
                            raise parser.error(
                                "CBOR sequence recoding requires an array as the top-level element."
                            )
                        # A sequence is the concatenation of the array's items.
                        request.payload = b"".join(cbor2.dumps(d) for d in decoded)
                    else:
                        request.payload = encoded
                else:
                    request.payload = options.payload.encode("utf8")

        if options.payload_initial_szx is not None:
            request.remote.maximum_block_size_exp = options.payload_initial_szx

        if options.proxy is None or options.proxy in ("none", "", "-"):
            interface = context
        else:
            interface = aiocoap.proxy.client.ProxyForwarder(options.proxy, context)

        requested_uri = request.get_request_uri()

        log.info("Sending request:")
        for line in message_to_text(request, "to"):
            log.info(line)

        requester = interface.request(request)

        response_data = await requester.response

        log.info("Received response:")
        for line in message_to_text(response_data, "from"):
            log.info(line)

        response_uri = response_data.get_request_uri()
        if requested_uri != response_uri:
            print(
                colored(
                    f"Response arrived from different address; base URI is {response_uri}",
                    options,
                    lambda token: token.Generic.Inserted,
                ),
                file=sys.stderr,
            )
        if response_data.code.is_successful():
            present(response_data, options)
        else:
            print(
                colored(response_data.code, options, lambda token: token.Generic.Error),
                file=sys.stderr,
            )
            present(response_data, options, file=sys.stderr)
            sys.exit(1)

        if options.observe:
            try:
                async for notification in requester.observation:
                    incoming_observation(options, notification)
            except Exception as exit_reason:
                print("Observation is over: %r" % (exit_reason,), file=sys.stderr)
    except aiocoap.error.HelpfulError as e:
        print(str(e), file=sys.stderr)
        extra_help = e.extra_help(
            hints=dict(
                original_uri=options.url,
                request=request,
            )
        )
        if extra_help:
            print("Debugging hint:", extra_help, file=sys.stderr)
        sys.exit(1)
    # Fallback while not all backends raise NetworkErrors
    except OSError as e:
        text = str(e)
        if not text:
            text = repr(e)
        if not text:
            # eg ConnectionResetError flying out of a misconfigured SSL server
            text = type(e)
        print(
            "Warning: OS errors should not be raised this way any more.",
            file=sys.stderr,
        )
        # not telling what to do precisely: the form already tells users to
        # include `aiocoap.cli.defaults` output, which is exactly what we
        # need.
        print(
            f"Even if the cause of the error itself is clear, please file an issue at {aiocoap.meta.bugreport_uri}.",
            file=sys.stderr,
        )
        print("Error:", text, file=sys.stderr)
        sys.exit(1)

570 

571 

async def single_request_with_context(args):
    """Run single_request() on a freshly created client context.

    Wrapper around single_request until sync_main gets made fully async, and
    async context managers are used to manage contexts. The context is shut
    down again even if the request fails.

    :param args: Command-line arguments, passed on to single_request().
    """
    client_context = await aiocoap.Context.create_client_context()
    try:
        await single_request(args, client_context)
    finally:
        await client_context.shutdown()

580 

581 

# Future that the SIGINT handler installed by main() resolves. interactive()
# replaces it for every command and waits on it next to the running request,
# so that Ctrl-C cancels just that request. None until interactive() starts.
interactive_expecting_keyboard_interrupt = None

583 

584 

async def interactive(globalopts):
    """Run the interactive prompt until the user quits or input ends.

    Each entered line is split like a shell command line and executed as one
    request through single_request() on a shared client context. While a
    request runs, the module-level ``interactive_expecting_keyboard_interrupt``
    future allows it to be cancelled.

    :param globalopts: Namespace of the global options, handed to every
        request as its defaults.
    """
    global interactive_expecting_keyboard_interrupt
    interactive_expecting_keyboard_interrupt = asyncio.get_event_loop().create_future()

    async def single_request_noexit(*args, **kwargs):
        """Protects a run against the exit automatically generated by
        argparse errors or help"""
        try:
            # We could also set exit_on_error, but this way we do get the
            # original error code and can show that; might still revisit.
            await single_request(*args, **kwargs)
        except SystemExit as e:
            return e.code
        else:
            return 0

    context = await aiocoap.Context.create_client_context()

    # try/finally so the context is also shut down when the loop is left
    # through an exception or cancellation, not only through `break`.
    try:
        while True:
            try:
                # when http://bugs.python.org/issue22412 is resolved, use that instead
                line = await asyncio.get_event_loop().run_in_executor(
                    None, lambda: input("aiocoap> ")
                )
            except EOFError:
                line = "exit"
            try:
                line = shlex.split(line)
            except ValueError as e:
                # E.g. an unbalanced quotation mark; report it and prompt
                # again rather than terminating the whole session.
                print("Could not parse input: %s" % (e,), file=sys.stderr)
                continue
            if not line:
                continue
            if line in (["help"], ["?"]):
                line = ["--help"]
            if line in (["quit"], ["q"], ["exit"]):
                break

            current_task = asyncio.create_task(
                single_request_noexit(line, context=context, globalopts=globalopts),
                name="Interactive prompt command %r" % line,
            )
            interactive_expecting_keyboard_interrupt = (
                asyncio.get_event_loop().create_future()
            )

            done, pending = await asyncio.wait(
                [current_task, interactive_expecting_keyboard_interrupt],
                return_when=asyncio.FIRST_COMPLETED,
            )

            if current_task not in done:
                # The interrupt future completed first: abort the request.
                current_task.cancel()
                continue

            try:
                code = await current_task
            except Exception as e:
                print("Unhandled exception raised: %s" % (e,))
                # There is no exit code to report in this case; previously
                # this fell through to an unbound (or stale) `code`.
                continue
            if code != 0:
                print("Exit code: %d" % code, file=sys.stderr)
    finally:
        await context.shutdown()

643 

644 

async def main(args=None):
    """Asynchronous entry point of aiocoap-client.

    Configures logging from a tolerant first parsing pass, then either runs
    a single request or installs a SIGINT handler and enters interactive
    mode.

    :param args: Command-line arguments; defaults to ``sys.argv[1:]``.
    """
    # interactive mode is a little messy, that's why this is not using aiocoap.util.cli yet
    if args is None:
        args = sys.argv[1:]

    # This one is tolerant and doesn't even terminate with --help, so that
    # --help and --interactive --help can do the right thing.
    prescreened = build_parser(prescreen=True).parse_args(args)

    verbosity = (prescreened.verbose or 0) - (prescreened.quiet or 0)
    configure_logging(verbosity, prescreened.color)

    if not prescreened.interactive:
        try:
            await single_request_with_context(args)
        except asyncio.exceptions.CancelledError:
            sys.exit(3)
        return

    globalopts = build_parser(use_interactive=False).parse_args(args)

    def ctrl_c():
        # Resolving the future makes interactive() cancel the running request.
        try:
            interactive_expecting_keyboard_interrupt.set_result(None)
        except asyncio.exceptions.InvalidStateError:
            # Too many Ctrl-C before the program could clean up
            sys.exit(3)

    asyncio.get_event_loop().add_signal_handler(signal.SIGINT, ctrl_c)

    await interactive(globalopts)

680 

681 

def sync_main(args=None):
    """Synchronous entry point: run main() to completion in a new event loop.

    :param args: Command-line arguments, passed on to main().
    """
    entry_coroutine = main(args=args)
    asyncio.run(entry_coroutine)

684 

685 

# Run the client when this file is executed as a script.
if __name__ == "__main__":
    sync_main()