Coverage for src/aiocoap/cli/client.py: 0%

355 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-13 12:14 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5"""aiocoap-client is a simple command-line tool for interacting with CoAP servers""" 

6 

7import copy 

8import sys 

9import asyncio 

10import argparse 

11import logging 

12import subprocess 

13from pathlib import Path 

14 

15import shlex 

16 

17# even though not used directly, this has side effects on the input() function 

18# used in interactive mode 

19try: 

20 import readline # noqa: F401 

21except ImportError: 

22 pass # that's normal on some platforms, and ok since it's just a usability enhancement 

23 

24import aiocoap 

25import aiocoap.defaults 

26import aiocoap.meta 

27import aiocoap.proxy.client 

28from aiocoap.util import contenttype 

29from aiocoap.util.cli import ActionNoYes 

30from aiocoap.numbers import ContentFormat 

31 

32log = logging.getLogger("coap.aiocoap-client") 

33 

34 

35def augment_parser_for_global(p, *, prescreen=False): 

36 p.add_argument( 

37 "-v", 

38 "--verbose", 

39 help="Increase the debug output", 

40 action="count", 

41 ) 

42 p.add_argument( 

43 "-q", 

44 "--quiet", 

45 help="Decrease the debug output", 

46 action="count", 

47 ) 

48 p.add_argument( 

49 "--version", action="version", version="%(prog)s " + aiocoap.meta.version 

50 ) 

51 

52 p.add_argument( 

53 "--interactive", 

54 help="Enter interactive mode. Combine with --help or run `help` interactively to see which options apply where; some can be used globally and overwritten locally.", 

55 action="store_true", 

56 ) 

57 

58 

59def augment_parser_for_either(p): 

60 p.add_argument( 

61 "--color", 

62 help="Color output (default on TTYs if all required modules are installed)", 

63 default=None, 

64 action=ActionNoYes, 

65 ) 

66 p.add_argument( 

67 "--pretty-print", 

68 help="Pretty-print known content formats (default on TTYs if all required modules are installed)", 

69 default=None, 

70 action=ActionNoYes, 

71 ) 

72 p.add_argument( 

73 "--proxy", help="Relay the CoAP request to a proxy for execution", metavar="URI" 

74 ) 

75 p.add_argument( 

76 "--credentials", 

77 help="Load credentials to use from a given file", 

78 type=Path, 

79 ) 

80 p.add_argument( 

81 "--no-set-hostname", 

82 help="Suppress transmission of Uri-Host even if the host name is not an IP literal", 

83 dest="set_hostname", 

84 action="store_false", 

85 default=True, 

86 ) 

87 

88 

89def augment_parser_for_interactive(p, *, prescreen=False): 

90 p.add_argument( 

91 "--non", 

92 help="Send request as non-confirmable (NON) message", 

93 action="store_true", 

94 ) 

95 p.add_argument( 

96 "-m", 

97 "--method", 

98 help="Name or number of request method to use (default: %(default)s)", 

99 default="GET", 

100 ) 

101 p.add_argument( 

102 "--observe", help="Register an observation on the resource", action="store_true" 

103 ) 

104 p.add_argument( 

105 "--observe-exec", 

106 help="Run the specified program whenever the observed resource changes, feeding the response data to its stdin", 

107 metavar="CMD", 

108 ) 

109 p.add_argument( 

110 "--accept", 

111 help="Content format to request", 

112 metavar="MIME", 

113 ) 

114 p.add_argument( 

115 "--payload", 

116 help="Send X as request payload (eg. with a PUT). If X starts with an '@', its remainder is treated as a file name and read from; '@-' reads from the console. Non-file data may be recoded, see --content-format.", 

117 metavar="X", 

118 ) 

119 p.add_argument( 

120 "--payload-initial-szx", 

121 help="Size exponent to limit the initial block's size (0 ≙ 16 Byte, 6 ≙ 1024 Byte)", 

122 metavar="SZX", 

123 type=int, 

124 ) 

125 p.add_argument( 

126 "--content-format", 

127 help="Content format of the --payload data. If a known format is given and --payload has a non-file argument, the payload is converted from CBOR Diagnostic Notation.", 

128 metavar="MIME", 

129 ) 

130 p.add_argument( 

131 "url", 

132 nargs="?" if prescreen else None, 

133 help="CoAP address to fetch", 

134 ) 

135 

136 

137def build_parser(*, use_global=True, use_interactive=True, prescreen=False): 

138 p = argparse.ArgumentParser(description=__doc__, add_help=not prescreen) 

139 if prescreen: 

140 p.add_argument("--help", action="store_true") 

141 if use_global: 

142 augment_parser_for_global(p, prescreen=prescreen) 

143 augment_parser_for_either(p) 

144 if use_interactive: 

145 augment_parser_for_interactive(p, prescreen=prescreen) 

146 

147 return p 

148 

149 

150def configure_logging(verbosity, color): 

151 if color is not False: 

152 try: 

153 import colorlog 

154 except ImportError: 

155 color = False 

156 else: 

157 colorlog.basicConfig() 

158 if not color: 

159 logging.basicConfig() 

160 

161 if verbosity <= -2: 

162 logging.getLogger("coap").setLevel(logging.CRITICAL + 1) 

163 elif verbosity == -1: 

164 logging.getLogger("coap").setLevel(logging.ERROR) 

165 elif verbosity == 0: 

166 logging.getLogger("coap").setLevel(logging.WARNING) 

167 elif verbosity == 1: 

168 logging.getLogger("coap").setLevel(logging.WARNING) 

169 logging.getLogger("coap.aiocoap-client").setLevel(logging.INFO) 

170 elif verbosity == 2: 

171 logging.getLogger("coap").setLevel(logging.INFO) 

172 elif verbosity >= 3: 

173 logging.getLogger("coap").setLevel(logging.DEBUG) 

174 elif verbosity >= 4: 

175 logging.getLogger("coap").setLevel(0) 

176 

177 log.debug("Logging configured.") 

178 

179 

180def colored(text, options, tokenlambda): 

181 """Apply pygments based coloring if options.color is set. Tokelambda is a 

182 callback to which pygments.token is passed and which returns a token type; 

183 this makes it easy to not need to conditionally react to pygments' possible 

184 absence in all color locations.""" 

185 if not options.color: 

186 return str(text) 

187 

188 from pygments.formatters import TerminalFormatter 

189 from pygments import token, format 

190 

191 return format( 

192 [(tokenlambda(token), str(text))], 

193 TerminalFormatter(), 

194 ) 

195 

196 

197def incoming_observation(options, response): 

198 log.info("Received Observe notification:") 

199 for line in message_to_text(response, "from"): 

200 log.info(line) 

201 

202 if options.observe_exec: 

203 p = subprocess.Popen(options.observe_exec, shell=True, stdin=subprocess.PIPE) 

204 # FIXME this blocks 

205 p.communicate(response.payload) 

206 else: 

207 sys.stdout.write(colored("---", options, lambda token: token.Comment.Preproc)) 

208 if response.code.is_successful(): 

209 present(response, options, file=sys.stderr) 

210 else: 

211 sys.stdout.flush() 

212 print( 

213 colored( 

214 response.code, options, lambda token: token.Token.Generic.Error 

215 ), 

216 file=sys.stderr, 

217 ) 

218 if response.payload: 

219 present(response, options, file=sys.stderr) 

220 

221 

222def apply_credentials(context, credentials, errfn): 

223 try: 

224 if credentials.suffix == ".json": 

225 import json 

226 

227 context.client_credentials.load_from_dict(json.load(credentials.open("rb"))) 

228 elif credentials.suffix == ".diag": 

229 try: 

230 import cbor_diag 

231 import cbor2 

232 except ImportError: 

233 raise errfn( 

234 "Loading credentials in CBOR diagnostic format requires cbor2 and cbor_diag package" 

235 ) 

236 context.client_credentials.load_from_dict( 

237 cbor2.loads(cbor_diag.diag2cbor(credentials.open().read())) 

238 ) 

239 else: 

240 raise errfn( 

241 "Unknown suffix: %s (expected: .json or .diag)" % (credentials.suffix) 

242 ) 

243 except FileNotFoundError as e: 

244 raise errfn("Credential file not found: %s" % e.filename) 

245 except (OSError, ValueError) as e: 

246 # Any of the parsers could reasonably raise those, and while they don't 

247 # have HelpfulError support, they should still not render a backtrace 

248 # but a proper CLI error. 

249 raise errfn("Processing credential file: %s" % e) 

250 

251 

252def message_to_text(m, direction): 

253 """Convert a message to a text form similar to how they are shown in RFCs. 

254 

255 Refactoring this into a message method will need to address the direction 

256 discovery eventually.""" 

257 if m.remote is None: 

258 # This happens when unprocessable remotes are, eg. putting in an HTTP URI 

259 yield f"{m.code} {direction} (unknown)" 

260 else: 

261 # FIXME: Update when transport-indication is available 

262 # FIXME: This is slightly wrong because it does not account for what ProxyRedirector does 

263 yield f"{m.code} {direction} {m.remote.scheme}://{m.remote.hostinfo}" 

264 for opt in m.opt.option_list(): 

265 if hasattr(opt.number, "name"): 

266 yield f"- {opt.number.name_printable} ({opt.number.value}): {opt.value!r}" 

267 else: 

268 yield f"- {opt.number.value}: {opt.value!r}" 

269 if m.payload: 

270 limit = 16 

271 if len(m.payload) > limit: 

272 yield f"Payload: {m.payload[:limit].hex()}... ({len(m.payload)} bytes total)" 

273 else: 

274 yield f"Payload: {m.payload[:limit].hex()} ({len(m.payload)} bytes)" 

275 else: 

276 yield "No payload" 

277 

278 

279def present(message, options, file=sys.stdout): 

280 """Write a message payload to the output, pretty printing and/or coloring 

281 it as configured in the options.""" 

282 if not options.quiet and (message.opt.location_path or message.opt.location_query): 

283 # FIXME: Percent encoding is completely missing; this would be done 

284 # most easily with a CRI library 

285 location_ref = "/" + "/".join(message.opt.location_path) 

286 if message.opt.location_query: 

287 location_ref += "?" + "&".join(message.opt.location_query) 

288 print( 

289 colored( 

290 f"Location options indicate new resource: {location_ref}", 

291 options, 

292 lambda token: token.Token.Generic.Inserted, 

293 ), 

294 file=sys.stderr, 

295 ) 

296 

297 if not message.payload: 

298 return 

299 

300 payload = None 

301 

302 cf = message.opt.content_format or message.request.opt.content_format 

303 if cf is not None and cf.is_known(): 

304 mime = cf.media_type 

305 else: 

306 mime = "application/octet-stream" 

307 if options.pretty_print: 

308 from aiocoap.util.prettyprint import pretty_print 

309 

310 prettyprinted = pretty_print(message) 

311 if prettyprinted is not None: 

312 (infos, mime, payload) = prettyprinted 

313 if not options.quiet: 

314 for i in infos: 

315 print( 

316 colored("# " + i, options, lambda token: token.Comment), 

317 file=sys.stderr, 

318 ) 

319 

320 color = options.color 

321 if color: 

322 from aiocoap.util.prettyprint import lexer_for_mime 

323 import pygments 

324 

325 try: 

326 lexer = lexer_for_mime(mime) 

327 except pygments.util.ClassNotFound: 

328 color = False 

329 

330 if color and payload is None: 

331 # Coloring requires a unicode-string style payload, either from the 

332 # mime type or from the pretty printer. 

333 try: 

334 payload = message.payload.decode("utf8") 

335 except UnicodeDecodeError: 

336 color = False 

337 

338 if color: 

339 from pygments.formatters import TerminalFormatter 

340 from pygments import highlight 

341 

342 highlit = highlight( 

343 payload, 

344 lexer, 

345 TerminalFormatter(), 

346 ) 

347 # The TerminalFormatter already adds an end-of-line character, not 

348 # trying to add one for any missing trailing newlines. 

349 print(highlit, file=file, end="") 

350 file.flush() 

351 else: 

352 if payload is None: 

353 file.buffer.write(message.payload) 

354 if file.isatty() and message.payload[-1:] != b"\n": 

355 file.write("\n") 

356 else: 

357 file.write(payload) 

358 if file.isatty() and payload[-1] != "\n": 

359 file.write("\n") 

360 

361 

362async def single_request(args, context, globalopts=None): 

363 parser = build_parser(use_global=globalopts is None) 

364 options = parser.parse_args(args, copy.copy(globalopts)) 

365 

366 pretty_print_modules = aiocoap.defaults.prettyprint_missing_modules() 

367 if pretty_print_modules and (options.color is True or options.pretty_print is True): 

368 parser.error( 

369 "Color and pretty printing require the following" 

370 " additional module(s) to be installed: %s" 

371 % ", ".join(pretty_print_modules) 

372 ) 

373 if options.color is None: 

374 options.color = sys.stdout.isatty() and not pretty_print_modules 

375 if options.pretty_print is None: 

376 options.pretty_print = sys.stdout.isatty() and not pretty_print_modules 

377 

378 try: 

379 try: 

380 code = getattr( 

381 aiocoap.numbers.codes.Code, 

382 options.method.upper().replace("IPATCH", "iPATCH"), 

383 ) 

384 except AttributeError: 

385 try: 

386 code = aiocoap.numbers.codes.Code(int(options.method)) 

387 except ValueError: 

388 raise parser.error("Unknown method") 

389 

390 if options.credentials is not None: 

391 apply_credentials(context, options.credentials, parser.error) 

392 

393 request = aiocoap.Message( 

394 code=code, mtype=aiocoap.NON if options.non else aiocoap.CON 

395 ) 

396 request.set_request_uri(options.url, set_uri_host=options.set_hostname) 

397 

398 if options.accept: 

399 try: 

400 request.opt.accept = ContentFormat(int(options.accept)) 

401 except ValueError: 

402 try: 

403 request.opt.accept = ContentFormat.by_media_type(options.accept) 

404 except KeyError: 

405 raise parser.error("Unknown accept type") 

406 

407 if options.observe: 

408 request.opt.observe = 0 

409 observation_is_over = asyncio.get_event_loop().create_future() 

410 

411 if options.content_format: 

412 try: 

413 request.opt.content_format = ContentFormat(int(options.content_format)) 

414 except ValueError: 

415 try: 

416 request.opt.content_format = ContentFormat.by_media_type( 

417 options.content_format 

418 ) 

419 except KeyError: 

420 raise parser.error("Unknown content format") 

421 

422 if options.payload: 

423 if options.payload.startswith("@"): 

424 filename = options.payload[1:] 

425 if filename == "-": 

426 f = sys.stdin.buffer 

427 else: 

428 f = open(filename, "rb") 

429 try: 

430 request.payload = f.read() 

431 except OSError as e: 

432 raise parser.error("File could not be opened: %s" % e) 

433 else: 

434 request_classification = contenttype.categorize( 

435 request.opt.content_format.media_type 

436 if request.opt.content_format is not None 

437 and request.opt.content_format.is_known() 

438 else "" 

439 ) 

440 if request_classification in ("cbor", "cbor-seq"): 

441 try: 

442 import cbor_diag 

443 except ImportError as e: 

444 raise parser.error(f"CBOR recoding not available ({e})") 

445 

446 try: 

447 encoded = cbor_diag.diag2cbor(options.payload) 

448 except ValueError as e: 

449 raise parser.error( 

450 f"Parsing CBOR diagnostic notation failed. Make sure quotation marks are escaped from the shell. Error: {e}" 

451 ) 

452 

453 if request_classification == "cbor-seq": 

454 try: 

455 import cbor2 

456 except ImportError as e: 

457 raise parser.error( 

458 f"CBOR sequence recoding not available ({e})" 

459 ) 

460 decoded = cbor2.loads(encoded) 

461 if not isinstance(decoded, list): 

462 raise parser.error( 

463 "CBOR sequence recoding requires an array as the top-level element." 

464 ) 

465 request.payload = b"".join(cbor2.dumps(d) for d in decoded) 

466 else: 

467 request.payload = encoded 

468 else: 

469 request.payload = options.payload.encode("utf8") 

470 

471 if options.payload_initial_szx is not None: 

472 request.remote.maximum_block_size_exp = options.payload_initial_szx 

473 

474 if options.proxy is None or options.proxy in ("none", "", "-"): 

475 interface = context 

476 else: 

477 interface = aiocoap.proxy.client.ProxyForwarder(options.proxy, context) 

478 

479 requested_uri = request.get_request_uri() 

480 

481 log.info("Sending request:") 

482 for line in message_to_text(request, "to"): 

483 log.info(line) 

484 

485 requester = interface.request(request) 

486 

487 if options.observe: 

488 requester.observation.register_errback(observation_is_over.set_result) 

489 requester.observation.register_callback( 

490 lambda data, options=options: incoming_observation(options, data) 

491 ) 

492 

493 try: 

494 response_data = await requester.response 

495 finally: 

496 if not requester.response.done(): 

497 requester.response.cancel() 

498 if options.observe and not requester.observation.cancelled: 

499 requester.observation.cancel() 

500 

501 log.info("Received response:") 

502 for line in message_to_text(response_data, "from"): 

503 log.info(line) 

504 

505 response_uri = response_data.get_request_uri() 

506 if requested_uri != response_uri: 

507 print( 

508 colored( 

509 f"Response arrived from different address; base URI is {response_uri}", 

510 options, 

511 lambda token: token.Generic.Inserted, 

512 ), 

513 file=sys.stderr, 

514 ) 

515 if response_data.code.is_successful(): 

516 present(response_data, options) 

517 else: 

518 print( 

519 colored(response_data.code, options, lambda token: token.Generic.Error), 

520 file=sys.stderr, 

521 ) 

522 present(response_data, options, file=sys.stderr) 

523 sys.exit(1) 

524 

525 if options.observe: 

526 exit_reason = await observation_is_over 

527 print("Observation is over: %r" % (exit_reason,), file=sys.stderr) 

528 except aiocoap.error.HelpfulError as e: 

529 print(str(e), file=sys.stderr) 

530 extra_help = e.extra_help( 

531 hints=dict( 

532 original_uri=options.url, 

533 request=request, 

534 ) 

535 ) 

536 if extra_help: 

537 print("Debugging hint:", extra_help, file=sys.stderr) 

538 sys.exit(1) 

539 # Fallback while not all backends raise NetworkErrors 

540 except OSError as e: 

541 text = str(e) 

542 if not text: 

543 text = repr(e) 

544 if not text: 

545 # eg ConnectionResetError flying out of a misconfigured SSL server 

546 text = type(e) 

547 print( 

548 "Warning: OS errors should not be raised this way any more.", 

549 file=sys.stderr, 

550 ) 

551 # not telling what to do precisely: the form already tells users to 

552 # include `aiocoap.cli.defaults` output, which is exactly what we 

553 # need. 

554 print( 

555 f"Even if the cause of the error itself is clear, please file an issue at {aiocoap.meta.bugreport_uri}.", 

556 file=sys.stderr, 

557 ) 

558 print("Error:", text, file=sys.stderr) 

559 sys.exit(1) 

560 

561 

562async def single_request_with_context(args): 

563 """Wrapper around single_request until sync_main gets made fully async, and 

564 async context managers are used to manage contexts.""" 

565 context = await aiocoap.Context.create_client_context() 

566 try: 

567 await single_request(args, context) 

568 finally: 

569 await context.shutdown() 

570 

571 

572interactive_expecting_keyboard_interrupt = None 

573 

574 

575async def interactive(globalopts): 

576 global interactive_expecting_keyboard_interrupt 

577 interactive_expecting_keyboard_interrupt = asyncio.get_event_loop().create_future() 

578 

579 context = await aiocoap.Context.create_client_context() 

580 

581 while True: 

582 try: 

583 # when http://bugs.python.org/issue22412 is resolved, use that instead 

584 line = await asyncio.get_event_loop().run_in_executor( 

585 None, lambda: input("aiocoap> ") 

586 ) 

587 except EOFError: 

588 line = "exit" 

589 line = shlex.split(line) 

590 if not line: 

591 continue 

592 if line in (["help"], ["?"]): 

593 line = ["--help"] 

594 if line in (["quit"], ["q"], ["exit"]): 

595 break 

596 

597 current_task = asyncio.create_task( 

598 single_request(line, context=context, globalopts=globalopts), 

599 name="Interactive prompt command %r" % line, 

600 ) 

601 interactive_expecting_keyboard_interrupt = ( 

602 asyncio.get_event_loop().create_future() 

603 ) 

604 

605 done, pending = await asyncio.wait( 

606 [current_task, interactive_expecting_keyboard_interrupt], 

607 return_when=asyncio.FIRST_COMPLETED, 

608 ) 

609 

610 if current_task not in done: 

611 current_task.cancel() 

612 else: 

613 try: 

614 await current_task 

615 except SystemExit as e: 

616 if e.code != 0: 

617 print("Exit code: %d" % e.code, file=sys.stderr) 

618 continue 

619 except Exception as e: 

620 print("Unhandled exception raised: %s" % (e,)) 

621 

622 await context.shutdown() 

623 

624 

625def sync_main(args=None): 

626 # interactive mode is a little messy, that's why this is not using aiocoap.util.cli yet 

627 if args is None: 

628 args = sys.argv[1:] 

629 

630 # This one is tolerant and doesn't even terminate with --help, so that 

631 # --help and --interactive --help can do the right thing. 

632 first_parser = build_parser(prescreen=True) 

633 first_args = first_parser.parse_args(args) 

634 

635 configure_logging( 

636 (first_args.verbose or 0) - (first_args.quiet or 0), first_args.color 

637 ) 

638 

639 if not first_args.interactive: 

640 try: 

641 asyncio.run(single_request_with_context(args)) 

642 except KeyboardInterrupt: 

643 sys.exit(3) 

644 else: 

645 global_parser = build_parser(use_interactive=False) 

646 globalopts = global_parser.parse_args(args) 

647 

648 loop = asyncio.get_event_loop() 

649 task = loop.create_task( 

650 interactive(globalopts), 

651 name="Interactive prompt", 

652 ) 

653 

654 while not task.done(): 

655 try: 

656 loop.run_until_complete(task) 

657 except KeyboardInterrupt: 

658 if not interactive_expecting_keyboard_interrupt.done(): 

659 interactive_expecting_keyboard_interrupt.set_result(None) 

660 except SystemExit: 

661 continue # asyncio/tasks.py(242) raises those after setting them as results, but we particularly want them back in the loop 

662 

663 

664if __name__ == "__main__": 

665 sync_main()