Coverage for src/aiocoap/cli/client.py: 0%

350 statements  

« prev     ^ index     » next       coverage.py v7.7.0, created at 2025-03-20 17:26 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5"""aiocoap-client is a simple command-line tool for interacting with CoAP servers""" 

6 

7import copy 

8import sys 

9import asyncio 

10import argparse 

11import logging 

12import subprocess 

13from pathlib import Path 

14 

15import shlex 

16 

17# even though not used directly, this has side effects on the input() function 

18# used in interactive mode 

19try: 

20 import readline # noqa: F401 

21except ImportError: 

22 pass # that's normal on some platforms, and ok since it's just a usability enhancement 

23 

24import aiocoap 

25import aiocoap.defaults 

26import aiocoap.meta 

27import aiocoap.proxy.client 

28from aiocoap.util import contenttype 

29from aiocoap.util.cli import ActionNoYes 

30from aiocoap.numbers import ContentFormat 

31 

# Logger for this command line tool. configure_logging() adjusts the "coap"
# logger and, at verbosity 1, this "coap.aiocoap-client" logger specifically.
log = logging.getLogger("coap.aiocoap-client")

33 

34 

def augment_parser_for_global(p, *, prescreen=False):
    """Register the options that apply to the whole invocation on parser `p`.

    These are the verbosity counters (-v / -q), --version and --interactive.
    The `prescreen` keyword is accepted for signature symmetry with the other
    augment functions but is not consulted here.
    """
    add = p.add_argument

    # Both counters default to None when not given; callers treat that as 0.
    add("-v", "--verbose", action="count", help="Increase the debug output")
    add("-q", "--quiet", action="count", help="Decrease the debug output")

    version_text = "%(prog)s " + aiocoap.meta.version
    add("--version", action="version", version=version_text)

    add(
        "--interactive",
        action="store_true",
        help="Enter interactive mode. Combine with --help or run `help` interactively to see which options apply where; some can be used globally and overwritten locally.",
    )

57 

58 

def augment_parser_for_either(p):
    """Register the options usable both globally and per interactive command.

    Adds --color and --pretty-print (tri-state via ActionNoYes, defaulting to
    None), --proxy, --credentials (parsed into a Path) and --no-set-hostname
    (stored inverted as `set_hostname`, default True).
    """
    # The two tri-state switches share everything but their name and help.
    tristate_switches = (
        (
            "--color",
            "Color output (default on TTYs if all required modules are installed)",
        ),
        (
            "--pretty-print",
            "Pretty-print known content formats (default on TTYs if all required modules are installed)",
        ),
    )
    for flag, description in tristate_switches:
        p.add_argument(flag, help=description, default=None, action=ActionNoYes)

    p.add_argument(
        "--proxy",
        metavar="URI",
        help="Relay the CoAP request to a proxy for execution",
    )
    p.add_argument(
        "--credentials",
        type=Path,
        help="Load credentials to use from a given file",
    )
    p.add_argument(
        "--no-set-hostname",
        dest="set_hostname",
        default=True,
        action="store_false",
        help="Suppress transmission of Uri-Host even if the host name is not an IP literal",
    )

87 

88 

def augment_parser_for_interactive(p, *, prescreen=False):
    """Register the per-request options and the positional URL on parser `p`.

    When `prescreen` is true the URL becomes optional (nargs="?"), so that a
    first, tolerant parsing pass does not fail on a missing URL.
    """
    # Kept as a table so that the registration order (and thus the order in
    # the generated help text) is visible at a glance.
    option_table = [
        (
            ("--non",),
            dict(
                help="Send request as non-confirmable (NON) message",
                action="store_true",
            ),
        ),
        (
            ("-m", "--method"),
            dict(
                help="Name or number of request method to use (default: %(default)s)",
                default="GET",
            ),
        ),
        (
            ("--observe",),
            dict(
                help="Register an observation on the resource",
                action="store_true",
            ),
        ),
        (
            ("--observe-exec",),
            dict(
                help="Run the specified program whenever the observed resource changes, feeding the response data to its stdin",
                metavar="CMD",
            ),
        ),
        (
            ("--accept",),
            dict(help="Content format to request", metavar="MIME"),
        ),
        (
            ("--payload",),
            dict(
                help="Send X as request payload (eg. with a PUT). If X starts with an '@', its remainder is treated as a file name and read from; '@-' reads from the console. Non-file data may be recoded, see --content-format.",
                metavar="X",
            ),
        ),
        (
            ("--payload-initial-szx",),
            dict(
                help="Size exponent to limit the initial block's size (0 ≙ 16 Byte, 6 ≙ 1024 Byte)",
                metavar="SZX",
                type=int,
            ),
        ),
        (
            ("--content-format",),
            dict(
                help="Content format of the --payload data. If a known format is given and --payload has a non-file argument, the payload is converted from CBOR Diagnostic Notation.",
                metavar="MIME",
            ),
        ),
    ]
    for names, settings in option_table:
        p.add_argument(*names, **settings)

    url_nargs = "?" if prescreen else None
    p.add_argument("url", nargs=url_nargs, help="CoAP address to fetch")

135 

136 

def build_parser(*, use_global=True, use_interactive=True, prescreen=False):
    """Assemble the argument parser from the augment_parser_for_* pieces.

    `use_global` and `use_interactive` select whether the invocation-wide and
    the per-request option groups are included; the group shared by both is
    always present. In `prescreen` mode argparse's own help handling is
    disabled and --help is recorded as a plain flag instead.
    """
    builtin_help = not prescreen
    parser = argparse.ArgumentParser(description=__doc__, add_help=builtin_help)

    if prescreen:
        # Store --help rather than acting on it, so parsing carries on.
        parser.add_argument("--help", action="store_true")

    if use_global:
        augment_parser_for_global(parser, prescreen=prescreen)

    augment_parser_for_either(parser)

    if use_interactive:
        augment_parser_for_interactive(parser, prescreen=prescreen)

    return parser

148 

149 

def configure_logging(verbosity, color):
    """Install a log handler and set the "coap" log level from `verbosity`.

    `verbosity` is the number of -v flags minus the number of -q flags.
    `color` is a tri-state: unless it is exactly False, colorlog is tried and
    silently skipped when it is not installed.

    Verbosity mapping for the "coap" logger:

    * <= -2: nothing at all (above CRITICAL)
    * -1: ERROR
    * 0: WARNING
    * 1: WARNING, but INFO for this tool's own "coap.aiocoap-client" logger
    * 2: INFO
    * 3: DEBUG
    * >= 4: everything, including levels below DEBUG
    """
    if color is not False:
        try:
            import colorlog
        except ImportError:
            color = False
        else:
            colorlog.basicConfig()
    if not color:
        logging.basicConfig()

    coap_logger = logging.getLogger("coap")

    if verbosity <= -2:
        coap_logger.setLevel(logging.CRITICAL + 1)
    elif verbosity == -1:
        coap_logger.setLevel(logging.ERROR)
    elif verbosity == 0:
        coap_logger.setLevel(logging.WARNING)
    elif verbosity == 1:
        coap_logger.setLevel(logging.WARNING)
        logging.getLogger("coap.aiocoap-client").setLevel(logging.INFO)
    elif verbosity == 2:
        coap_logger.setLevel(logging.INFO)
    elif verbosity == 3:
        # Must be an exact match: a ">= 3" test here would shadow the
        # branch below and make it unreachable.
        coap_logger.setLevel(logging.DEBUG)
    elif verbosity >= 4:
        # Level 0 (NOTSET) on a non-root logger means "ask the parent", which
        # would make this *less* verbose than DEBUG; 1 is the lowest level
        # that actually lets every record through.
        coap_logger.setLevel(1)

    log.debug("Logging configured.")

178 

179 

def colored(text, options, tokenlambda):
    """Return `text` as a string, colored through pygments if options.color is set.

    `tokenlambda` is called with the pygments.token module and returns the
    token type to render the text as. Taking a callback means call sites do
    not need to import pygments themselves (or care whether it is installed)
    when coloring is off; in that case the callback is never invoked.
    """
    if options.color:
        # Imported lazily: pygments is only needed when coloring is active.
        from pygments import format as render_tokens
        from pygments import token as token_module
        from pygments.formatters import TerminalFormatter

        token_stream = [(tokenlambda(token_module), str(text))]
        return render_tokens(token_stream, TerminalFormatter())

    return str(text)

195 

196 

def incoming_observation(options, response):
    """Handle one Observe notification.

    The notification is logged at INFO level. If --observe-exec was given,
    the configured command is run with the payload on its stdin; otherwise a
    "---" separator is written to stdout and the response (or its error code
    and any payload) is presented on stderr.
    """
    log.info("Received Observe notification:")
    for text_line in message_to_text(response, "from"):
        log.info(text_line)

    if options.observe_exec:
        # The command comes straight from the user's own command line and is
        # deliberately interpreted by the shell.
        child = subprocess.Popen(
            options.observe_exec, shell=True, stdin=subprocess.PIPE
        )
        # FIXME this blocks
        child.communicate(response.payload)
        return

    # NOTE(review): the separator is written without a trailing newline --
    # confirm that is intended.
    separator = colored("---", options, lambda token: token.Comment.Preproc)
    sys.stdout.write(separator)

    if response.code.is_successful():
        present(response, options, file=sys.stderr)
        return

    sys.stdout.flush()
    error_text = colored(
        response.code, options, lambda token: token.Token.Generic.Error
    )
    print(error_text, file=sys.stderr)
    if response.payload:
        present(response, options, file=sys.stderr)

220 

221 

def apply_credentials(context, credentials, errfn):
    """Load client credentials from the file `credentials` into `context`.

    `credentials` is a path object; its suffix selects the format: ".json"
    is parsed as JSON, ".diag" as CBOR diagnostic notation (which needs the
    cbor2 and cbor_diag packages). The parsed data is passed to
    context.client_credentials.load_from_dict().

    `errfn` is called with a message for a missing package or an unknown
    suffix, and its result is raised. It is expected either not to return
    (like ArgumentParser.error) or to return an exception.
    """
    suffix = credentials.suffix

    if suffix == ".json":
        import json

        # Use a context manager so the file is closed again right away.
        with credentials.open("rb") as handle:
            data = json.load(handle)
        context.client_credentials.load_from_dict(data)
    elif suffix == ".diag":
        try:
            import cbor_diag
            import cbor2
        except ImportError:
            raise errfn(
                "Loading credentials in CBOR diagnostic format requires cbor2 and cbor_diag package"
            )
        with credentials.open() as handle:
            diagnostic_text = handle.read()
        context.client_credentials.load_from_dict(
            cbor2.loads(cbor_diag.diag2cbor(diagnostic_text))
        )
    else:
        raise errfn(
            "Unknown suffix: %s (expected: .json or .diag)" % (credentials.suffix)
        )

242 

243 

def message_to_text(m, direction):
    """Yield lines describing message `m`, roughly in the style used in RFCs.

    The first line holds the code, the given `direction` word and the remote
    ("(unknown)" if the message has none). One line per option follows, then
    either a hex dump of up to 16 payload bytes or "No payload".

    If this ever becomes a method of the message, the direction will have to
    be discovered there rather than passed in.
    """
    if m.remote is not None:
        # FIXME: Update when transport-indication is available
        # FIXME: This is slightly wrong because it does not account for what ProxyRedirector does
        yield f"{m.code} {direction} {m.remote.scheme}://{m.remote.hostinfo}"
    else:
        # This happens when unprocessable remotes are, eg. putting in an HTTP URI
        yield f"{m.code} {direction} (unknown)"

    for option in m.opt.option_list():
        number = option.number
        if hasattr(number, "name"):
            yield f"- {number.name_printable} ({number.value}): {option.value!r}"
        else:
            yield f"- {number.value}: {option.value!r}"

    body = m.payload
    if not body:
        yield "No payload"
        return

    limit = 16
    size = len(body)
    shown = body[:limit].hex()
    if size > limit:
        yield f"Payload: {shown}... ({size} bytes total)"
    else:
        yield f"Payload: {shown} ({size} bytes)"

269 

270 

def present(message, options, file=sys.stdout):
    """Write a message payload to the output, pretty printing and/or coloring
    it as configured in the options.

    Unless options.quiet is set, Location-* options and any remarks from the
    pretty printer are reported on stderr. The payload itself goes to `file`;
    it is written as text when it was pretty printed or colored, and as raw
    bytes through `file.buffer` otherwise. On a TTY a trailing newline is
    added if the payload does not end in one. Nothing is written for an empty
    payload.
    """
    if not options.quiet and (message.opt.location_path or message.opt.location_query):
        # FIXME: Percent encoding is completely missing; this would be done
        # most easily with a CRI library
        location_ref = "/" + "/".join(message.opt.location_path)
        if message.opt.location_query:
            location_ref += "?" + "&".join(message.opt.location_query)
        print(
            colored(
                f"Location options indicate new resource: {location_ref}",
                options,
                lambda token: token.Token.Generic.Inserted,
            ),
            file=sys.stderr,
        )

    if not message.payload:
        return

    # Text form of the payload; stays None as long as only raw bytes exist.
    payload = None

    # NOTE(review): `or` also falls back to the request's content format when
    # the response's own value is falsy rather than None (e.g. if format 0
    # compares like the integer 0) -- confirm this is intended.
    cf = message.opt.content_format or message.request.opt.content_format
    if cf is not None and cf.is_known():
        mime = cf.media_type
    else:
        mime = "application/octet-stream"
    if options.pretty_print:
        from aiocoap.util.prettyprint import pretty_print

        prettyprinted = pretty_print(message)
        if prettyprinted is not None:
            (infos, mime, payload) = prettyprinted
            if not options.quiet:
                for i in infos:
                    print(
                        colored("# " + i, options, lambda token: token.Comment),
                        file=sys.stderr,
                    )

    color = options.color
    if color:
        from aiocoap.util.prettyprint import lexer_for_mime
        import pygments

        try:
            lexer = lexer_for_mime(mime)
        except pygments.util.ClassNotFound:
            color = False

    if color and payload is None:
        # Coloring requires a unicode-string style payload, either from the
        # mime type or from the pretty printer.
        try:
            payload = message.payload.decode("utf8")
        except UnicodeDecodeError:
            color = False

    if color:
        from pygments.formatters import TerminalFormatter
        from pygments import highlight

        highlit = highlight(
            payload,
            lexer,
            TerminalFormatter(),
        )
        # The TerminalFormatter already adds an end-of-line character, not
        # trying to add one for any missing trailing newlines.
        print(highlit, file=file, end="")
        file.flush()
    else:
        if payload is None:
            # Push out text that is still buffered in the text layer first,
            # so that the raw bytes written below cannot overtake it.
            file.flush()
            file.buffer.write(message.payload)
            if file.isatty() and message.payload[-1:] != b"\n":
                file.write("\n")
        else:
            file.write(payload)
            # endswith() rather than payload[-1]: indexing would raise an
            # IndexError if the text payload turned out to be empty.
            if file.isatty() and not payload.endswith("\n"):
                file.write("\n")

352 

353 

async def single_request(args, context, globalopts=None):
    """Parse `args` as one request's command line, send it and show the result.

    `context` is the aiocoap context used for the request (directly, or
    through a ProxyForwarder when --proxy is set). `globalopts`, if given, is
    a namespace of already parsed global options; a copy of it is used as the
    starting point for parsing, and the global options are then not accepted
    in `args`.

    A successful response is presented on stdout. An unsuccessful response,
    an aiocoap HelpfulError or an OSError are reported on stderr and end in
    sys.exit(1); argument errors go through parser.error(). With --observe,
    the coroutine afterwards waits until the observation reports an error.
    """
    parser = build_parser(use_global=globalopts is None)
    options = parser.parse_args(args, copy.copy(globalopts))

    pretty_print_modules = aiocoap.defaults.prettyprint_missing_modules()
    if pretty_print_modules and (options.color is True or options.pretty_print is True):
        parser.error(
            "Color and pretty printing require the following"
            " additional module(s) to be installed: %s"
            % ", ".join(pretty_print_modules)
        )
    # Unset tri-state options default to "on" only on a TTY with all modules
    # available.
    if options.color is None:
        options.color = sys.stdout.isatty() and not pretty_print_modules
    if options.pretty_print is None:
        options.pretty_print = sys.stdout.isatty() and not pretty_print_modules

    try:
        # The method may be given by name or by number.
        try:
            code = getattr(
                aiocoap.numbers.codes.Code,
                options.method.upper().replace("IPATCH", "iPATCH"),
            )
        except AttributeError:
            try:
                code = aiocoap.numbers.codes.Code(int(options.method))
            except ValueError:
                raise parser.error("Unknown method")

        if options.credentials is not None:
            apply_credentials(context, options.credentials, parser.error)

        request = aiocoap.Message(
            code=code, mtype=aiocoap.NON if options.non else aiocoap.CON
        )
        request.set_request_uri(options.url, set_uri_host=options.set_hostname)

        # Content formats may be given by number or by media type.
        if options.accept:
            try:
                request.opt.accept = ContentFormat(int(options.accept))
            except ValueError:
                try:
                    request.opt.accept = ContentFormat.by_media_type(options.accept)
                except KeyError:
                    raise parser.error("Unknown accept type")

        if options.observe:
            request.opt.observe = 0
            observation_is_over = asyncio.get_event_loop().create_future()

        if options.content_format:
            try:
                request.opt.content_format = ContentFormat(int(options.content_format))
            except ValueError:
                try:
                    request.opt.content_format = ContentFormat.by_media_type(
                        options.content_format
                    )
                except KeyError:
                    raise parser.error("Unknown content format")

        if options.payload:
            if options.payload.startswith("@"):
                filename = options.payload[1:]
                # Opening happens inside the try block so that a missing or
                # unreadable file produces the message below instead of the
                # generic OSError fallback; the file is closed after reading.
                try:
                    if filename == "-":
                        request.payload = sys.stdin.buffer.read()
                    else:
                        with open(filename, "rb") as f:
                            request.payload = f.read()
                except OSError as e:
                    raise parser.error("File could not be opened: %s" % e)
            else:
                request_classification = contenttype.categorize(
                    request.opt.content_format.media_type
                    if request.opt.content_format is not None
                    and request.opt.content_format.is_known()
                    else ""
                )
                if request_classification in ("cbor", "cbor-seq"):
                    try:
                        import cbor_diag
                    except ImportError as e:
                        raise parser.error(f"CBOR recoding not available ({e})")

                    try:
                        encoded = cbor_diag.diag2cbor(options.payload)
                    except ValueError as e:
                        raise parser.error(
                            f"Parsing CBOR diagnostic notation failed. Make sure quotation marks are escaped from the shell. Error: {e}"
                        )

                    if request_classification == "cbor-seq":
                        try:
                            import cbor2
                        except ImportError as e:
                            raise parser.error(
                                f"CBOR sequence recoding not available ({e})"
                            )
                        decoded = cbor2.loads(encoded)
                        if not isinstance(decoded, list):
                            raise parser.error(
                                "CBOR sequence recoding requires an array as the top-level element."
                            )
                        request.payload = b"".join(cbor2.dumps(d) for d in decoded)
                    else:
                        request.payload = encoded
                else:
                    request.payload = options.payload.encode("utf8")

        if options.payload_initial_szx is not None:
            request.remote.maximum_block_size_exp = options.payload_initial_szx

        if options.proxy is None or options.proxy in ("none", "", "-"):
            interface = context
        else:
            interface = aiocoap.proxy.client.ProxyForwarder(options.proxy, context)

        requested_uri = request.get_request_uri()

        log.info("Sending request:")
        for line in message_to_text(request, "to"):
            log.info(line)

        requester = interface.request(request)

        if options.observe:
            requester.observation.register_errback(observation_is_over.set_result)
            requester.observation.register_callback(
                lambda data, options=options: incoming_observation(options, data)
            )

        try:
            response_data = await requester.response
        finally:
            if not requester.response.done():
                requester.response.cancel()
            # NOTE(review): as written, this cancels a still-active
            # observation once the first response has been awaited, although
            # observation_is_over is waited for further down -- confirm the
            # intended nesting of this check.
            if options.observe and not requester.observation.cancelled:
                requester.observation.cancel()

        log.info("Received response:")
        for line in message_to_text(response_data, "from"):
            log.info(line)

        response_uri = response_data.get_request_uri()
        if requested_uri != response_uri:
            print(
                colored(
                    f"Response arrived from different address; base URI is {response_uri}",
                    options,
                    lambda token: token.Generic.Inserted,
                ),
                file=sys.stderr,
            )
        if response_data.code.is_successful():
            present(response_data, options)
        else:
            print(
                colored(response_data.code, options, lambda token: token.Generic.Error),
                file=sys.stderr,
            )
            present(response_data, options, file=sys.stderr)
            sys.exit(1)

        if options.observe:
            exit_reason = await observation_is_over
            print("Observation is over: %r" % (exit_reason,), file=sys.stderr)
    except aiocoap.error.HelpfulError as e:
        print(str(e), file=sys.stderr)
        extra_help = e.extra_help(
            hints=dict(
                original_uri=options.url,
                request=request,
            )
        )
        if extra_help:
            print("Debugging hint:", extra_help, file=sys.stderr)
        sys.exit(1)
    # Fallback while not all backends raise NetworkErrors
    except OSError as e:
        text = str(e)
        if not text:
            text = repr(e)
        if not text:
            # eg ConnectionResetError flying out of a misconfigured SSL server
            text = type(e)
        print(
            "Warning: OS errors should not be raised this way any more.",
            file=sys.stderr,
        )
        # not telling what to do precisely: the form already tells users to
        # include `aiocoap.cli.defaults` output, which is exactly what we
        # need.
        print(
            f"Even if the cause of the error itself is clear, please file an issue at {aiocoap.meta.bugreport_uri}.",
            file=sys.stderr,
        )
        print("Error:", text, file=sys.stderr)
        sys.exit(1)

552 

553 

async def single_request_with_context(args):
    """Run single_request() on a freshly created client context.

    The context is shut down again afterwards, whether or not the request
    raised. This wrapper exists until sync_main is fully async and contexts
    are managed through async context managers.
    """
    client_context = await aiocoap.Context.create_client_context()
    try:
        await single_request(args, client_context)
    finally:
        await client_context.shutdown()

562 

563 

# Future shared between interactive() and sync_main(): interactive() replaces
# it with a fresh future for every command it runs, and sync_main() resolves
# it when a KeyboardInterrupt arrives, which makes interactive() cancel the
# command that is currently running. None until interactive() has started.
interactive_expecting_keyboard_interrupt = None

565 

566 

async def interactive(globalopts):
    """Run the interactive prompt until the user quits.

    Each input line is split like a shell command line and executed through
    single_request() on one shared client context, with `globalopts` as the
    preset options. "help" and "?" show the help; "quit", "q", "exit" or
    end-of-input leave the prompt. A command is cancelled when the
    module-level future interactive_expecting_keyboard_interrupt is resolved
    while it runs.
    """
    global interactive_expecting_keyboard_interrupt
    interactive_expecting_keyboard_interrupt = asyncio.get_event_loop().create_future()

    context = await aiocoap.Context.create_client_context()

    while True:
        try:
            # when http://bugs.python.org/issue22412 is resolved, use that instead
            line = await asyncio.get_event_loop().run_in_executor(
                None, lambda: input("aiocoap> ")
            )
        except EOFError:
            line = "exit"
        try:
            line = shlex.split(line)
        except ValueError as e:
            # shlex raises this for input such as an unclosed quotation mark;
            # report it and prompt again instead of ending the session.
            print("Could not parse input: %s" % (e,), file=sys.stderr)
            continue
        if not line:
            continue
        if line in (["help"], ["?"]):
            line = ["--help"]
        if line in (["quit"], ["q"], ["exit"]):
            break

        current_task = asyncio.create_task(
            single_request(line, context=context, globalopts=globalopts),
            name="Interactive prompt command %r" % line,
        )
        # A fresh future per command, so that an interrupt which was already
        # handled does not cancel the next command as well.
        interactive_expecting_keyboard_interrupt = (
            asyncio.get_event_loop().create_future()
        )

        done, pending = await asyncio.wait(
            [current_task, interactive_expecting_keyboard_interrupt],
            return_when=asyncio.FIRST_COMPLETED,
        )

        if current_task not in done:
            # The interrupt future completed first.
            current_task.cancel()
        else:
            try:
                await current_task
            except SystemExit as e:
                # single_request() and the argument parser end through
                # sys.exit(); inside the prompt that only ends the command.
                if e.code != 0:
                    print("Exit code: %d" % e.code, file=sys.stderr)
                continue
            except Exception as e:
                print("Unhandled exception raised: %s" % (e,))

    await context.shutdown()

615 

616 

def sync_main(args=None):
    """Synchronous entry point of the aiocoap-client command line tool.

    `args` is the list of command line arguments; sys.argv[1:] is used when
    it is None. Without --interactive a single request is run and the process
    exits with status 3 on a KeyboardInterrupt. With --interactive the prompt
    is run on an event loop, and a KeyboardInterrupt only cancels the command
    that is currently running.
    """
    # interactive mode is a little messy, that's why this is not using aiocoap.util.cli yet
    if args is None:
        args = sys.argv[1:]

    # This one is tolerant and doesn't even terminate with --help, so that
    # --help and --interactive --help can do the right thing.
    first_parser = build_parser(prescreen=True)
    first_args = first_parser.parse_args(args)

    configure_logging(
        (first_args.verbose or 0) - (first_args.quiet or 0), first_args.color
    )

    if not first_args.interactive:
        try:
            asyncio.run(single_request_with_context(args))
        except KeyboardInterrupt:
            sys.exit(3)
    else:
        global_parser = build_parser(use_interactive=False)
        globalopts = global_parser.parse_args(args)

        # Reuse a current event loop if there is one. Newer Python versions
        # no longer create one implicitly and raise RuntimeError instead, so
        # create and install a loop in that case.
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

        task = loop.create_task(
            interactive(globalopts),
            name="Interactive prompt",
        )

        # Keep re-entering the loop: an interrupt or a SystemExit surfacing
        # here must not end the prompt task itself.
        while not task.done():
            try:
                loop.run_until_complete(task)
            except KeyboardInterrupt:
                if not interactive_expecting_keyboard_interrupt.done():
                    interactive_expecting_keyboard_interrupt.set_result(None)
            except SystemExit:
                continue  # asyncio/tasks.py(242) raises those after setting them as results, but we particularly want them back in the loop

654 

655 

# Allow running this module directly as a script.
if __name__ == "__main__":
    sync_main()