Coverage for aiocoap/cli/client.py: 57%

301 statements  

« prev     ^ index     » next       coverage.py v7.6.8, created at 2024-11-28 12:34 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5"""aiocoap-client is a simple command-line tool for interacting with CoAP servers""" 

6 

7import sys 

8import asyncio 

9import argparse 

10import logging 

11import subprocess 

12from pathlib import Path 

13 

14import shlex 

15 

16# even though not used directly, this has side effects on the input() function 

17# used in interactive mode 

18try: 

19 import readline # noqa: F401 

20except ImportError: 

21 pass # that's normal on some platforms, and ok since it's just a usability enhancement 

22 

23import aiocoap 

24import aiocoap.defaults 

25import aiocoap.meta 

26import aiocoap.proxy.client 

27from aiocoap.util import contenttype 

28from aiocoap.util.cli import ActionNoYes 

29from aiocoap.numbers import ContentFormat 

30 

31 

def build_parser():
    """Create the argument parser for aiocoap-client.

    The parser is used by single_request() for every invocation, so it
    describes both the one-shot command line and each command entered at the
    interactive prompt. The module docstring serves as its description.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    add = parser.add_argument

    # Options shaping the request itself
    add(
        "--non",
        action="store_true",
        help="Send request as non-confirmable (NON) message",
    )
    add(
        "-m",
        "--method",
        default="GET",
        help="Name or number of request method to use (default: %(default)s)",
    )
    add(
        "--observe",
        action="store_true",
        help="Register an observation on the resource",
    )
    add(
        "--observe-exec",
        metavar="CMD",
        help="Run the specified program whenever the observed resource changes, feeding the response data to its stdin",
    )
    add(
        "--accept",
        metavar="MIME",
        help="Content format to request",
    )
    add(
        "--proxy",
        metavar="URI",
        help="Relay the CoAP request to a proxy for execution",
    )
    add(
        "--payload",
        metavar="X",
        help="Send X as request payload (eg. with a PUT). If X starts with an '@', its remainder is treated as a file name and read from; '@-' reads from the console. Non-file data may be recoded, see --content-format.",
    )
    add(
        "--payload-initial-szx",
        metavar="SZX",
        type=int,
        help="Size exponent to limit the initial block's size (0 ≙ 16 Byte, 6 ≙ 1024 Byte)",
    )
    add(
        "--content-format",
        metavar="MIME",
        help="Content format of the --payload data. If a known format is given and --payload has a non-file argument, the payload is converted from CBOR Diagnostic Notation.",
    )
    add(
        "--no-set-hostname",
        dest="set_hostname",
        action="store_false",
        default=True,
        help="Suppress transmission of Uri-Host even if the host name is not an IP literal",
    )

    # Verbosity: both are counters, combined by the caller
    add(
        "-v",
        "--verbose",
        action="count",
        help="Increase the debug output",
    )
    add(
        "-q",
        "--quiet",
        action="count",
        help="Decrease the debug output",
    )

    # careful: picked before parsing
    add(
        "--interactive",
        action="store_true",
        help="Enter interactive mode",
    )
    add(
        "--credentials",
        type=Path,
        help="Load credentials to use from a given file",
    )
    add(
        "--version",
        action="version",
        version="%(prog)s " + aiocoap.meta.version,
    )

    # Output presentation; None means "decide later based on the terminal"
    add(
        "--color",
        action=ActionNoYes,
        default=None,
        help="Color output (default on TTYs if all required modules are installed)",
    )
    add(
        "--pretty-print",
        action=ActionNoYes,
        default=None,
        help="Pretty-print known content formats (default on TTYs if all required modules are installed)",
    )

    add(
        "url",
        help="CoAP address to fetch",
    )

    return parser

129 

130 

def configure_logging(verbosity):
    """Set up basic logging and pick the level of the "coap" logger.

    ``verbosity`` is a signed count: 0 selects WARNING, each step up selects
    INFO and then DEBUG, each step down selects ERROR and then a level above
    CRITICAL (silencing the logger). Values beyond +/-2 behave like +/-2.
    """
    logging.basicConfig()

    level_by_verbosity = {
        -2: logging.CRITICAL + 1,
        -1: logging.ERROR,
        0: logging.WARNING,
        1: logging.INFO,
        2: logging.DEBUG,
    }
    # Clamp into the table's range; anything that is not one of the five
    # steps after clamping leaves the logger untouched.
    clamped = max(-2, min(2, verbosity))
    chosen = level_by_verbosity.get(clamped)
    if chosen is not None:
        logging.getLogger("coap").setLevel(chosen)

144 

145 

def colored(text, options, tokenlambda):
    """Return ``text`` as a string, colored through pygments when
    ``options.color`` is set.

    ``tokenlambda`` is called with the ``pygments.token`` module and returns
    the token type to render the text as. Passing a callback rather than a
    token keeps call sites free of pygments imports, so they work unchanged
    when pygments is absent and coloring is off.
    """
    if options.color:
        # Imported lazily: only reached when coloring was requested.
        from pygments.formatters import TerminalFormatter
        from pygments import token as pygments_token, format as pygments_format

        token_stream = [(tokenlambda(pygments_token), str(text))]
        return pygments_format(token_stream, TerminalFormatter())

    return str(text)

161 

162 

def incoming_observation(options, response):
    """Handle one notification received on an observation.

    With ``options.observe_exec`` set, the command is run through the shell
    and the notification payload is fed to its stdin. Otherwise a "---"
    separator is written to stdout and the response is presented; an
    unsuccessful response has its code printed first and is presented only
    if it carries a payload.
    """
    if options.observe_exec:
        # The command string comes from the user's own command line and is
        # deliberately handed to the shell.
        child = subprocess.Popen(
            options.observe_exec, shell=True, stdin=subprocess.PIPE
        )
        # FIXME this blocks
        child.communicate(response.payload)
        return

    separator = colored("---", options, lambda token: token.Comment.Preproc)
    sys.stdout.write(separator)

    if response.code.is_successful():
        # NOTE(review): the separator goes to stdout but the payload to
        # stderr -- confirm this split is intended.
        present(response, options, file=sys.stderr)
        return

    # Push the separator out before writing to the other stream.
    sys.stdout.flush()
    error_line = colored(
        response.code, options, lambda token: token.Token.Generic.Error
    )
    print(error_line, file=sys.stderr)
    if response.payload:
        present(response, options, file=sys.stderr)

182 

183 

def apply_credentials(context, credentials, errfn):
    """Load client credentials from a file into ``context``.

    :param context: object whose ``client_credentials.load_from_dict`` is
        called with the parsed file content.
    :param credentials: ``pathlib.Path`` of the credentials file; its suffix
        selects the format (``.json``, or ``.diag`` for CBOR diagnostic
        notation).
    :param errfn: called with a message when the file can not be handled
        (unknown suffix, or missing CBOR modules). Whatever it returns is
        raised, so it should either raise/exit itself or return an exception.
    """
    if credentials.suffix == ".json":
        import json

        # Close the file deterministically instead of leaving it to the
        # garbage collector.
        with credentials.open("rb") as credentials_file:
            loaded = json.load(credentials_file)
        context.client_credentials.load_from_dict(loaded)
    elif credentials.suffix == ".diag":
        try:
            import cbor_diag
            import cbor2
        except ImportError:
            raise errfn(
                "Loading credentials in CBOR diagnostic format requires cbor2 and cbor_diag package"
            )
        with credentials.open() as credentials_file:
            diagnostic_text = credentials_file.read()
        context.client_credentials.load_from_dict(
            cbor2.loads(cbor_diag.diag2cbor(diagnostic_text))
        )
    else:
        raise errfn(
            "Unknown suffix: %s (expected: .json or .diag)" % (credentials.suffix)
        )

204 

205 

def present(message, options, file=sys.stdout):
    """Write a message payload to the output, pretty printing and/or coloring
    it as configured in the options.

    Unless ``options.quiet`` is set, Location-Path / Location-Query options
    are reported on stderr first. Messages without payload produce no further
    output. Raw (non-prettified, non-colored) payloads are written as bytes
    to ``file.buffer``; everything else is written as text to ``file``. On a
    TTY a trailing newline is added if the output lacks one.
    """
    if not options.quiet and (message.opt.location_path or message.opt.location_query):
        # FIXME: Percent encoding is completely missing; this would be done
        # most easily with a CRI library
        location_ref = "/" + "/".join(message.opt.location_path)
        if message.opt.location_query:
            location_ref += "?" + "&".join(message.opt.location_query)
        print(
            colored(
                f"Location options indicate new resource: {location_ref}",
                options,
                lambda token: token.Token.Generic.Inserted,
            ),
            file=sys.stderr,
        )

    if not message.payload:
        return

    # Text form of the payload, if any step below produces one; None means
    # only the raw bytes are available so far.
    payload = None

    # Fall back to the request's content format only when the message has
    # none at all. An `or` here would also discard a content format that
    # happens to be falsy (presumably one with numeric value 0).
    cf = message.opt.content_format
    if cf is None:
        cf = message.request.opt.content_format
    if cf is not None and cf.is_known():
        mime = cf.media_type
    else:
        mime = "application/octet-stream"
    if options.pretty_print:
        from aiocoap.util.prettyprint import pretty_print

        prettyprinted = pretty_print(message)
        if prettyprinted is not None:
            # The pretty printer may replace the media type along with the
            # payload; both feed into lexer selection below.
            (infos, mime, payload) = prettyprinted
            if not options.quiet:
                for i in infos:
                    print(
                        colored("# " + i, options, lambda token: token.Comment),
                        file=sys.stderr,
                    )

    color = options.color
    if color:
        from aiocoap.util.prettyprint import lexer_for_mime
        import pygments

        try:
            lexer = lexer_for_mime(mime)
        except pygments.util.ClassNotFound:
            # No lexer for this media type: fall through to plain output.
            color = False

    if color and payload is None:
        # Coloring requires a unicode-string style payload, either from the
        # mime type or from the pretty printer.
        try:
            payload = message.payload.decode("utf8")
        except UnicodeDecodeError:
            color = False

    if color:
        from pygments.formatters import TerminalFormatter
        from pygments import highlight

        highlit = highlight(
            payload,
            lexer,
            TerminalFormatter(),
        )
        # The TerminalFormatter already adds an end-of-line character, not
        # trying to add one for any missing trailing newlines.
        print(highlit, file=file, end="")
        file.flush()
    else:
        if payload is None:
            file.buffer.write(message.payload)
            if file.isatty() and message.payload[-1:] != b"\n":
                file.write("\n")
        else:
            file.write(payload)
            # endswith() rather than payload[-1]: an empty string must not
            # raise IndexError.
            if file.isatty() and not payload.endswith("\n"):
                file.write("\n")

287 

288 

async def single_request(args, context):
    """Parse ``args``, send the request they describe through ``context``
    and present the response.

    :param args: list of command line arguments (without the program name).
    :param context: aiocoap context used to send the request (directly, or
        wrapped in a proxy forwarder when --proxy is given).

    Argument problems are reported through the parser's ``error`` method.
    Unsuccessful responses and transport errors are printed to stderr and
    end in ``sys.exit(1)``. With --observe, the function keeps waiting until
    the observation reports an error.
    """
    parser = build_parser()
    options = parser.parse_args(args)

    # --color / --pretty-print are tri-state: an explicit True is an error if
    # the modules are missing, None is resolved from the terminal and module
    # availability.
    pretty_print_modules = aiocoap.defaults.prettyprint_missing_modules()
    if pretty_print_modules and (options.color is True or options.pretty_print is True):
        parser.error(
            "Color and pretty printing require the following"
            " additional module(s) to be installed: %s"
            % ", ".join(pretty_print_modules)
        )
    if options.color is None:
        options.color = sys.stdout.isatty() and not pretty_print_modules
    if options.pretty_print is None:
        options.pretty_print = sys.stdout.isatty() and not pretty_print_modules

    configure_logging((options.verbose or 0) - (options.quiet or 0))

    try:
        # The method may be given by name (case-insensitively; iPATCH has
        # mixed case as attribute name) or as a number.
        try:
            code = getattr(
                aiocoap.numbers.codes.Code,
                options.method.upper().replace("IPATCH", "iPATCH"),
            )
        except AttributeError:
            try:
                code = aiocoap.numbers.codes.Code(int(options.method))
            except ValueError:
                raise parser.error("Unknown method")

        if options.credentials is not None:
            apply_credentials(context, options.credentials, parser.error)

        request = aiocoap.Message(
            code=code, mtype=aiocoap.NON if options.non else aiocoap.CON
        )
        request.set_request_uri(options.url, set_uri_host=options.set_hostname)

        # Content formats are accepted as number first, then as media type.
        if options.accept:
            try:
                request.opt.accept = ContentFormat(int(options.accept))
            except ValueError:
                try:
                    request.opt.accept = ContentFormat.by_media_type(options.accept)
                except KeyError:
                    raise parser.error("Unknown accept type")

        if options.observe:
            request.opt.observe = 0
            # Resolved by the observation's errback with the reason the
            # observation ended.
            observation_is_over = asyncio.get_event_loop().create_future()

        if options.content_format:
            try:
                request.opt.content_format = ContentFormat(int(options.content_format))
            except ValueError:
                try:
                    request.opt.content_format = ContentFormat.by_media_type(
                        options.content_format
                    )
                except KeyError:
                    raise parser.error("Unknown content format")

        if options.payload:
            if options.payload.startswith("@"):
                filename = options.payload[1:]
                # Opening is inside the try block so that a missing or
                # unreadable file is reported as an argument error rather
                # than reaching the generic OSError fallback at the bottom.
                try:
                    if filename == "-":
                        request.payload = sys.stdin.buffer.read()
                    else:
                        with open(filename, "rb") as f:
                            request.payload = f.read()
                except OSError as e:
                    raise parser.error("File could not be opened: %s" % e)
            else:
                # Literal payloads for CBOR-ish content formats are taken to
                # be diagnostic notation and converted; anything else is
                # sent as UTF-8 text.
                request_classification = contenttype.categorize(
                    request.opt.content_format.media_type
                    if request.opt.content_format is not None
                    and request.opt.content_format.is_known()
                    else ""
                )
                if request_classification in ("cbor", "cbor-seq"):
                    try:
                        import cbor_diag
                    except ImportError as e:
                        raise parser.error(f"CBOR recoding not available ({e})")

                    try:
                        encoded = cbor_diag.diag2cbor(options.payload)
                    except ValueError as e:
                        raise parser.error(
                            f"Parsing CBOR diagnostic notation failed. Make sure quotation marks are escaped from the shell. Error: {e}"
                        )

                    if request_classification == "cbor-seq":
                        try:
                            import cbor2
                        except ImportError as e:
                            raise parser.error(
                                f"CBOR sequence recoding not available ({e})"
                            )
                        # A sequence is entered as one array and sent as the
                        # concatenation of its encoded items.
                        decoded = cbor2.loads(encoded)
                        if not isinstance(decoded, list):
                            raise parser.error(
                                "CBOR sequence recoding requires an array as the top-level element."
                            )
                        request.payload = b"".join(cbor2.dumps(d) for d in decoded)
                    else:
                        request.payload = encoded
                else:
                    request.payload = options.payload.encode("utf8")

        if options.payload_initial_szx is not None:
            request.remote.maximum_block_size_exp = options.payload_initial_szx

        if options.proxy is None:
            interface = context
        else:
            interface = aiocoap.proxy.client.ProxyForwarder(options.proxy, context)

        # Remembered to detect responses that arrive from another address.
        requested_uri = request.get_request_uri()

        requester = interface.request(request)

        if options.observe:
            requester.observation.register_errback(observation_is_over.set_result)
            requester.observation.register_callback(
                lambda data, options=options: incoming_observation(options, data)
            )

        try:
            response_data = await requester.response
        finally:
            # NOTE(review): this cleanup also runs after a successful first
            # response; confirm that cancelling the observation here is
            # intended given the later wait on observation_is_over.
            if not requester.response.done():
                requester.response.cancel()
            if options.observe and not requester.observation.cancelled:
                requester.observation.cancel()

        response_uri = response_data.get_request_uri()
        if requested_uri != response_uri:
            print(
                colored(
                    f"Response arrived from different address; base URI is {response_uri}",
                    options,
                    lambda token: token.Generic.Inserted,
                ),
                file=sys.stderr,
            )
        if response_data.code.is_successful():
            present(response_data, options)
        else:
            print(
                colored(response_data.code, options, lambda token: token.Generic.Error),
                file=sys.stderr,
            )
            present(response_data, options, file=sys.stderr)
            sys.exit(1)

        if options.observe:
            exit_reason = await observation_is_over
            print("Observation is over: %r" % (exit_reason,), file=sys.stderr)
    except aiocoap.error.HelpfulError as e:
        print(str(e), file=sys.stderr)
        extra_help = e.extra_help(
            hints=dict(
                original_uri=options.url,
                request=request,
            )
        )
        if extra_help:
            print("Debugging hint:", extra_help, file=sys.stderr)
        sys.exit(1)
    # Fallback while not all backends raise NetworkErrors
    except OSError as e:
        text = str(e)
        if not text:
            text = repr(e)
        if not text:
            # eg ConnectionResetError flying out of a misconfigured SSL server
            text = type(e)
        print(
            "Warning: OS errors should not be raised this way any more.",
            file=sys.stderr,
        )
        # not telling what to do precisely: the form already tells users to
        # include `aiocoap.cli.defaults` output, which is exactly what we
        # need.
        print(
            f"Even if the cause of the error itself is clear, please file an issue at {aiocoap.meta.bugreport_uri}.",
            file=sys.stderr,
        )
        print("Error:", text, file=sys.stderr)
        sys.exit(1)

481 

482 

async def single_request_with_context(args):
    """Run single_request() on a freshly created client context.

    The context is shut down afterwards whether or not the request
    succeeded. This wrapper exists until sync_main is fully async and
    contexts are managed through async context managers.
    """
    client_context = await aiocoap.Context.create_client_context()
    try:
        await single_request(args, client_context)
    finally:
        await client_context.shutdown()

491 

492 

# Future that sync_main resolves when Ctrl-C is pressed; interactive()
# installs a fresh one for every command it runs.
interactive_expecting_keyboard_interrupt = None


async def interactive():
    """Run the interactive prompt until the user quits.

    Each entered line is split shell-style and run as one single_request()
    on a context shared by all commands. "help" / "?" show the parser's
    help, "quit" / "q" / "exit" (or end of input) leave the prompt. A
    command is cancelled when the keyboard-interrupt future completes before
    the command does.
    """
    global interactive_expecting_keyboard_interrupt

    loop = asyncio.get_event_loop()
    interactive_expecting_keyboard_interrupt = loop.create_future()

    context = await aiocoap.Context.create_client_context()

    while True:
        try:
            # when http://bugs.python.org/issue22412 is resolved, use that instead
            entered = await loop.run_in_executor(None, input, "aiocoap> ")
        except EOFError:
            entered = "exit"

        argv = shlex.split(entered)
        if not argv:
            continue
        if argv in (["help"], ["?"]):
            argv = ["--help"]
        if argv in (["quit"], ["q"], ["exit"]):
            break

        current_task = asyncio.create_task(
            single_request(argv, context=context),
            name="Interactive prompt command %r" % argv,
        )
        interactive_expecting_keyboard_interrupt = loop.create_future()

        # Whichever finishes first decides: the command itself, or Ctrl-C.
        finished, _ = await asyncio.wait(
            [current_task, interactive_expecting_keyboard_interrupt],
            return_when=asyncio.FIRST_COMPLETED,
        )

        if current_task in finished:
            try:
                await current_task
            except SystemExit as e:
                # Commands end through sys.exit(); at the prompt that only
                # ends the command, not the session.
                if e.code != 0:
                    print("Exit code: %d" % e.code, file=sys.stderr)
                continue
            except Exception as e:
                print("Unhandled exception raised: %s" % (e,))
        else:
            current_task.cancel()

    await context.shutdown()

544 

545 

def sync_main(args=None):
    """Synchronous entry point of aiocoap-client.

    :param args: command line arguments without the program name; defaults
        to ``sys.argv[1:]``.

    Without ``--interactive``, a single request is run and the process exits
    with status 3 on Ctrl-C. With ``--interactive`` (which must then be the
    only argument, otherwise the process exits with status 1), the
    interactive prompt is run until it finishes.
    """
    # interactive mode is a little messy, that's why this is not using aiocoap.util.cli yet
    if args is None:
        args = sys.argv[1:]

    if "--interactive" not in args:
        try:
            asyncio.run(single_request_with_context(args))
        except KeyboardInterrupt:
            sys.exit(3)
    else:
        if len(args) != 1:
            print(
                "No other arguments must be specified when entering interactive mode",
                file=sys.stderr,
            )
            sys.exit(1)

        # Create the loop explicitly: asyncio.get_event_loop() without a
        # running loop is deprecated and no longer creates one on recent
        # Python versions.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        task = loop.create_task(
            interactive(),
            name="Interactive prompt",
        )

        # run_until_complete is re-entered after every interruption until
        # the prompt task itself is done.
        while not task.done():
            try:
                loop.run_until_complete(task)
            except KeyboardInterrupt:
                # Hand Ctrl-C to the prompt, which cancels the command that
                # is currently running.
                if not interactive_expecting_keyboard_interrupt.done():
                    interactive_expecting_keyboard_interrupt.set_result(None)
            except SystemExit:
                continue  # asyncio/tasks.py(242) raises those after setting them as results, but we particularly want them back in the loop

578 

579 

# Run the client when this module is executed as a script.
if __name__ == "__main__":
    sync_main()