Coverage for aiocoap/cli/client.py: 57%
301 statements
« prev ^ index » next coverage.py v7.6.8, created at 2024-11-28 12:34 +0000
« prev ^ index » next coverage.py v7.6.8, created at 2024-11-28 12:34 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5"""aiocoap-client is a simple command-line tool for interacting with CoAP servers"""
7import sys
8import asyncio
9import argparse
10import logging
11import subprocess
12from pathlib import Path
14import shlex
16# even though not used directly, this has side effects on the input() function
17# used in interactive mode
18try:
19 import readline # noqa: F401
20except ImportError:
21 pass # that's normal on some platforms, and ok since it's just a usability enhancement
23import aiocoap
24import aiocoap.defaults
25import aiocoap.meta
26import aiocoap.proxy.client
27from aiocoap.util import contenttype
28from aiocoap.util.cli import ActionNoYes
29from aiocoap.numbers import ContentFormat
32def build_parser():
33 p = argparse.ArgumentParser(description=__doc__)
34 p.add_argument(
35 "--non",
36 help="Send request as non-confirmable (NON) message",
37 action="store_true",
38 )
39 p.add_argument(
40 "-m",
41 "--method",
42 help="Name or number of request method to use (default: %(default)s)",
43 default="GET",
44 )
45 p.add_argument(
46 "--observe", help="Register an observation on the resource", action="store_true"
47 )
48 p.add_argument(
49 "--observe-exec",
50 help="Run the specified program whenever the observed resource changes, feeding the response data to its stdin",
51 metavar="CMD",
52 )
53 p.add_argument(
54 "--accept",
55 help="Content format to request",
56 metavar="MIME",
57 )
58 p.add_argument(
59 "--proxy", help="Relay the CoAP request to a proxy for execution", metavar="URI"
60 )
61 p.add_argument(
62 "--payload",
63 help="Send X as request payload (eg. with a PUT). If X starts with an '@', its remainder is treated as a file name and read from; '@-' reads from the console. Non-file data may be recoded, see --content-format.",
64 metavar="X",
65 )
66 p.add_argument(
67 "--payload-initial-szx",
68 help="Size exponent to limit the initial block's size (0 ≙ 16 Byte, 6 ≙ 1024 Byte)",
69 metavar="SZX",
70 type=int,
71 )
72 p.add_argument(
73 "--content-format",
74 help="Content format of the --payload data. If a known format is given and --payload has a non-file argument, the payload is converted from CBOR Diagnostic Notation.",
75 metavar="MIME",
76 )
77 p.add_argument(
78 "--no-set-hostname",
79 help="Suppress transmission of Uri-Host even if the host name is not an IP literal",
80 dest="set_hostname",
81 action="store_false",
82 default=True,
83 )
84 p.add_argument(
85 "-v",
86 "--verbose",
87 help="Increase the debug output",
88 action="count",
89 )
90 p.add_argument(
91 "-q",
92 "--quiet",
93 help="Decrease the debug output",
94 action="count",
95 )
96 # careful: picked before parsing
97 p.add_argument(
98 "--interactive",
99 help="Enter interactive mode",
100 action="store_true",
101 )
102 p.add_argument(
103 "--credentials",
104 help="Load credentials to use from a given file",
105 type=Path,
106 )
107 p.add_argument(
108 "--version", action="version", version="%(prog)s " + aiocoap.meta.version
109 )
111 p.add_argument(
112 "--color",
113 help="Color output (default on TTYs if all required modules are installed)",
114 default=None,
115 action=ActionNoYes,
116 )
117 p.add_argument(
118 "--pretty-print",
119 help="Pretty-print known content formats (default on TTYs if all required modules are installed)",
120 default=None,
121 action=ActionNoYes,
122 )
123 p.add_argument(
124 "url",
125 help="CoAP address to fetch",
126 )
128 return p
131def configure_logging(verbosity):
132 logging.basicConfig()
134 if verbosity <= -2:
135 logging.getLogger("coap").setLevel(logging.CRITICAL + 1)
136 elif verbosity == -1:
137 logging.getLogger("coap").setLevel(logging.ERROR)
138 elif verbosity == 0:
139 logging.getLogger("coap").setLevel(logging.WARNING)
140 elif verbosity == 1:
141 logging.getLogger("coap").setLevel(logging.INFO)
142 elif verbosity >= 2:
143 logging.getLogger("coap").setLevel(logging.DEBUG)
146def colored(text, options, tokenlambda):
147 """Apply pygments based coloring if options.color is set. Tokelambda is a
148 callback to which pygments.token is passed and which returns a token type;
149 this makes it easy to not need to conditionally react to pygments' possible
150 absence in all color locations."""
151 if not options.color:
152 return str(text)
154 from pygments.formatters import TerminalFormatter
155 from pygments import token, format
157 return format(
158 [(tokenlambda(token), str(text))],
159 TerminalFormatter(),
160 )
163def incoming_observation(options, response):
164 if options.observe_exec:
165 p = subprocess.Popen(options.observe_exec, shell=True, stdin=subprocess.PIPE)
166 # FIXME this blocks
167 p.communicate(response.payload)
168 else:
169 sys.stdout.write(colored("---", options, lambda token: token.Comment.Preproc))
170 if response.code.is_successful():
171 present(response, options, file=sys.stderr)
172 else:
173 sys.stdout.flush()
174 print(
175 colored(
176 response.code, options, lambda token: token.Token.Generic.Error
177 ),
178 file=sys.stderr,
179 )
180 if response.payload:
181 present(response, options, file=sys.stderr)
184def apply_credentials(context, credentials, errfn):
185 if credentials.suffix == ".json":
186 import json
188 context.client_credentials.load_from_dict(json.load(credentials.open("rb")))
189 elif credentials.suffix == ".diag":
190 try:
191 import cbor_diag
192 import cbor2
193 except ImportError:
194 raise errfn(
195 "Loading credentials in CBOR diagnostic format requires cbor2 and cbor_diag package"
196 )
197 context.client_credentials.load_from_dict(
198 cbor2.loads(cbor_diag.diag2cbor(credentials.open().read()))
199 )
200 else:
201 raise errfn(
202 "Unknown suffix: %s (expected: .json or .diag)" % (credentials.suffix)
203 )
206def present(message, options, file=sys.stdout):
207 """Write a message payload to the output, pretty printing and/or coloring
208 it as configured in the options."""
209 if not options.quiet and (message.opt.location_path or message.opt.location_query):
210 # FIXME: Percent encoding is completely missing; this would be done
211 # most easily with a CRI library
212 location_ref = "/" + "/".join(message.opt.location_path)
213 if message.opt.location_query:
214 location_ref += "?" + "&".join(message.opt.location_query)
215 print(
216 colored(
217 f"Location options indicate new resource: {location_ref}",
218 options,
219 lambda token: token.Token.Generic.Inserted,
220 ),
221 file=sys.stderr,
222 )
224 if not message.payload:
225 return
227 payload = None
229 cf = message.opt.content_format or message.request.opt.content_format
230 if cf is not None and cf.is_known():
231 mime = cf.media_type
232 else:
233 mime = "application/octet-stream"
234 if options.pretty_print:
235 from aiocoap.util.prettyprint import pretty_print
237 prettyprinted = pretty_print(message)
238 if prettyprinted is not None:
239 (infos, mime, payload) = prettyprinted
240 if not options.quiet:
241 for i in infos:
242 print(
243 colored("# " + i, options, lambda token: token.Comment),
244 file=sys.stderr,
245 )
247 color = options.color
248 if color:
249 from aiocoap.util.prettyprint import lexer_for_mime
250 import pygments
252 try:
253 lexer = lexer_for_mime(mime)
254 except pygments.util.ClassNotFound:
255 color = False
257 if color and payload is None:
258 # Coloring requires a unicode-string style payload, either from the
259 # mime type or from the pretty printer.
260 try:
261 payload = message.payload.decode("utf8")
262 except UnicodeDecodeError:
263 color = False
265 if color:
266 from pygments.formatters import TerminalFormatter
267 from pygments import highlight
269 highlit = highlight(
270 payload,
271 lexer,
272 TerminalFormatter(),
273 )
274 # The TerminalFormatter already adds an end-of-line character, not
275 # trying to add one for any missing trailing newlines.
276 print(highlit, file=file, end="")
277 file.flush()
278 else:
279 if payload is None:
280 file.buffer.write(message.payload)
281 if file.isatty() and message.payload[-1:] != b"\n":
282 file.write("\n")
283 else:
284 file.write(payload)
285 if file.isatty() and payload[-1] != "\n":
286 file.write("\n")
289async def single_request(args, context):
290 parser = build_parser()
291 options = parser.parse_args(args)
293 pretty_print_modules = aiocoap.defaults.prettyprint_missing_modules()
294 if pretty_print_modules and (options.color is True or options.pretty_print is True):
295 parser.error(
296 "Color and pretty printing require the following"
297 " additional module(s) to be installed: %s"
298 % ", ".join(pretty_print_modules)
299 )
300 if options.color is None:
301 options.color = sys.stdout.isatty() and not pretty_print_modules
302 if options.pretty_print is None:
303 options.pretty_print = sys.stdout.isatty() and not pretty_print_modules
305 configure_logging((options.verbose or 0) - (options.quiet or 0))
307 try:
308 try:
309 code = getattr(
310 aiocoap.numbers.codes.Code,
311 options.method.upper().replace("IPATCH", "iPATCH"),
312 )
313 except AttributeError:
314 try:
315 code = aiocoap.numbers.codes.Code(int(options.method))
316 except ValueError:
317 raise parser.error("Unknown method")
319 if options.credentials is not None:
320 apply_credentials(context, options.credentials, parser.error)
322 request = aiocoap.Message(
323 code=code, mtype=aiocoap.NON if options.non else aiocoap.CON
324 )
325 request.set_request_uri(options.url, set_uri_host=options.set_hostname)
327 if options.accept:
328 try:
329 request.opt.accept = ContentFormat(int(options.accept))
330 except ValueError:
331 try:
332 request.opt.accept = ContentFormat.by_media_type(options.accept)
333 except KeyError:
334 raise parser.error("Unknown accept type")
336 if options.observe:
337 request.opt.observe = 0
338 observation_is_over = asyncio.get_event_loop().create_future()
340 if options.content_format:
341 try:
342 request.opt.content_format = ContentFormat(int(options.content_format))
343 except ValueError:
344 try:
345 request.opt.content_format = ContentFormat.by_media_type(
346 options.content_format
347 )
348 except KeyError:
349 raise parser.error("Unknown content format")
351 if options.payload:
352 if options.payload.startswith("@"):
353 filename = options.payload[1:]
354 if filename == "-":
355 f = sys.stdin.buffer
356 else:
357 f = open(filename, "rb")
358 try:
359 request.payload = f.read()
360 except OSError as e:
361 raise parser.error("File could not be opened: %s" % e)
362 else:
363 request_classification = contenttype.categorize(
364 request.opt.content_format.media_type
365 if request.opt.content_format is not None
366 and request.opt.content_format.is_known()
367 else ""
368 )
369 if request_classification in ("cbor", "cbor-seq"):
370 try:
371 import cbor_diag
372 except ImportError as e:
373 raise parser.error(f"CBOR recoding not available ({e})")
375 try:
376 encoded = cbor_diag.diag2cbor(options.payload)
377 except ValueError as e:
378 raise parser.error(
379 f"Parsing CBOR diagnostic notation failed. Make sure quotation marks are escaped from the shell. Error: {e}"
380 )
382 if request_classification == "cbor-seq":
383 try:
384 import cbor2
385 except ImportError as e:
386 raise parser.error(
387 f"CBOR sequence recoding not available ({e})"
388 )
389 decoded = cbor2.loads(encoded)
390 if not isinstance(decoded, list):
391 raise parser.error(
392 "CBOR sequence recoding requires an array as the top-level element."
393 )
394 request.payload = b"".join(cbor2.dumps(d) for d in decoded)
395 else:
396 request.payload = encoded
397 else:
398 request.payload = options.payload.encode("utf8")
400 if options.payload_initial_szx is not None:
401 request.remote.maximum_block_size_exp = options.payload_initial_szx
403 if options.proxy is None:
404 interface = context
405 else:
406 interface = aiocoap.proxy.client.ProxyForwarder(options.proxy, context)
408 requested_uri = request.get_request_uri()
410 requester = interface.request(request)
412 if options.observe:
413 requester.observation.register_errback(observation_is_over.set_result)
414 requester.observation.register_callback(
415 lambda data, options=options: incoming_observation(options, data)
416 )
418 try:
419 response_data = await requester.response
420 finally:
421 if not requester.response.done():
422 requester.response.cancel()
423 if options.observe and not requester.observation.cancelled:
424 requester.observation.cancel()
426 response_uri = response_data.get_request_uri()
427 if requested_uri != response_uri:
428 print(
429 colored(
430 f"Response arrived from different address; base URI is {response_uri}",
431 options,
432 lambda token: token.Generic.Inserted,
433 ),
434 file=sys.stderr,
435 )
436 if response_data.code.is_successful():
437 present(response_data, options)
438 else:
439 print(
440 colored(response_data.code, options, lambda token: token.Generic.Error),
441 file=sys.stderr,
442 )
443 present(response_data, options, file=sys.stderr)
444 sys.exit(1)
446 if options.observe:
447 exit_reason = await observation_is_over
448 print("Observation is over: %r" % (exit_reason,), file=sys.stderr)
449 except aiocoap.error.HelpfulError as e:
450 print(str(e), file=sys.stderr)
451 extra_help = e.extra_help(
452 hints=dict(
453 original_uri=options.url,
454 request=request,
455 )
456 )
457 if extra_help:
458 print("Debugging hint:", extra_help, file=sys.stderr)
459 sys.exit(1)
460 # Fallback while not all backends raise NetworkErrors
461 except OSError as e:
462 text = str(e)
463 if not text:
464 text = repr(e)
465 if not text:
466 # eg ConnectionResetError flying out of a misconfigured SSL server
467 text = type(e)
468 print(
469 "Warning: OS errors should not be raised this way any more.",
470 file=sys.stderr,
471 )
472 # not telling what to do precisely: the form already tells users to
473 # include `aiocoap.cli.defaults` output, which is exactly what we
474 # need.
475 print(
476 f"Even if the cause of the error itself is clear, please file an issue at {aiocoap.meta.bugreport_uri}.",
477 file=sys.stderr,
478 )
479 print("Error:", text, file=sys.stderr)
480 sys.exit(1)
483async def single_request_with_context(args):
484 """Wrapper around single_request until sync_main gets made fully async, and
485 async context managers are used to manage contexts."""
486 context = await aiocoap.Context.create_client_context()
487 try:
488 await single_request(args, context)
489 finally:
490 await context.shutdown()
493interactive_expecting_keyboard_interrupt = None
496async def interactive():
497 global interactive_expecting_keyboard_interrupt
498 interactive_expecting_keyboard_interrupt = asyncio.get_event_loop().create_future()
500 context = await aiocoap.Context.create_client_context()
502 while True:
503 try:
504 # when http://bugs.python.org/issue22412 is resolved, use that instead
505 line = await asyncio.get_event_loop().run_in_executor(
506 None, lambda: input("aiocoap> ")
507 )
508 except EOFError:
509 line = "exit"
510 line = shlex.split(line)
511 if not line:
512 continue
513 if line in (["help"], ["?"]):
514 line = ["--help"]
515 if line in (["quit"], ["q"], ["exit"]):
516 break
518 current_task = asyncio.create_task(
519 single_request(line, context=context),
520 name="Interactive prompt command %r" % line,
521 )
522 interactive_expecting_keyboard_interrupt = (
523 asyncio.get_event_loop().create_future()
524 )
526 done, pending = await asyncio.wait(
527 [current_task, interactive_expecting_keyboard_interrupt],
528 return_when=asyncio.FIRST_COMPLETED,
529 )
531 if current_task not in done:
532 current_task.cancel()
533 else:
534 try:
535 await current_task
536 except SystemExit as e:
537 if e.code != 0:
538 print("Exit code: %d" % e.code, file=sys.stderr)
539 continue
540 except Exception as e:
541 print("Unhandled exception raised: %s" % (e,))
543 await context.shutdown()
546def sync_main(args=None):
547 # interactive mode is a little messy, that's why this is not using aiocoap.util.cli yet
548 if args is None:
549 args = sys.argv[1:]
551 if "--interactive" not in args:
552 try:
553 asyncio.run(single_request_with_context(args))
554 except KeyboardInterrupt:
555 sys.exit(3)
556 else:
557 if len(args) != 1:
558 print(
559 "No other arguments must be specified when entering interactive mode",
560 file=sys.stderr,
561 )
562 sys.exit(1)
564 loop = asyncio.get_event_loop()
565 task = loop.create_task(
566 interactive(),
567 name="Interactive prompt",
568 )
570 while not task.done():
571 try:
572 loop.run_until_complete(task)
573 except KeyboardInterrupt:
574 if not interactive_expecting_keyboard_interrupt.done():
575 interactive_expecting_keyboard_interrupt.set_result(None)
576 except SystemExit:
577 continue # asyncio/tasks.py(242) raises those after setting them as results, but we particularly want them back in the loop
580if __name__ == "__main__":
581 sync_main()