Coverage for aiocoap/cli/client.py: 72%
360 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-05 18:37 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-05 18:37 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5"""aiocoap-client is a simple command-line tool for interacting with CoAP servers"""
7import copy
8import sys
9import asyncio
10import argparse
11import logging
12import signal
13import subprocess
14from pathlib import Path
16import shlex
18# even though not used directly, this has side effects on the input() function
19# used in interactive mode
20try:
21 import readline # noqa: F401
22except ImportError:
23 pass # that's normal on some platforms, and ok since it's just a usability enhancement
25import aiocoap
26import aiocoap.defaults
27import aiocoap.meta
28import aiocoap.proxy.client
29from aiocoap.util import contenttype
30from aiocoap.util.cli import ActionNoYes
31from aiocoap.numbers import ContentFormat
33log = logging.getLogger("coap.aiocoap-client")
36def augment_parser_for_global(p, *, prescreen=False):
37 p.add_argument(
38 "-v",
39 "--verbose",
40 help="Increase the debug output",
41 action="count",
42 )
43 p.add_argument(
44 "-q",
45 "--quiet",
46 help="Decrease the debug output",
47 action="count",
48 )
49 p.add_argument(
50 "--version", action="version", version="%(prog)s " + aiocoap.meta.version
51 )
53 p.add_argument(
54 "--interactive",
55 help="Enter interactive mode. Combine with --help or run `help` interactively to see which options apply where; some can be used globally and overwritten locally.",
56 action="store_true",
57 )
60def augment_parser_for_either(p):
61 p.add_argument(
62 "--color",
63 help="Color output (default on TTYs if all required modules are installed)",
64 default=None,
65 action=ActionNoYes,
66 )
67 p.add_argument(
68 "--pretty-print",
69 help="Pretty-print known content formats (default on TTYs if all required modules are installed)",
70 default=None,
71 action=ActionNoYes,
72 )
73 p.add_argument(
74 "--proxy", help="Relay the CoAP request to a proxy for execution", metavar="URI"
75 )
76 p.add_argument(
77 "--credentials",
78 help="Load credentials to use from a given file",
79 type=Path,
80 )
81 p.add_argument(
82 "--no-set-hostname",
83 help="Suppress transmission of Uri-Host even if the host name is not an IP literal",
84 dest="set_hostname",
85 action="store_false",
86 default=True,
87 )
88 p.add_argument(
89 "--no-sec",
90 # Can be "Send request without any security" once it actually does
91 # anything; until then, it's fine as a no-op to not impede changing
92 # scripts later.
93 help=argparse.SUPPRESS,
94 dest="sec",
95 action="store_false",
96 default=None,
97 )
100def augment_parser_for_interactive(p, *, prescreen=False):
101 p.add_argument(
102 "--non",
103 help="Send request without reliable transport (e.g. over UDP: as non-confirmable (NON) message)",
104 action="store_true",
105 )
106 p.add_argument(
107 "-m",
108 "--method",
109 help="Name or number of request method to use (default: %(default)s)",
110 default="GET",
111 )
112 p.add_argument(
113 "--observe", help="Register an observation on the resource", action="store_true"
114 )
115 p.add_argument(
116 "--observe-exec",
117 help="Run the specified program whenever the observed resource changes, feeding the response data to its stdin",
118 metavar="CMD",
119 )
120 p.add_argument(
121 "--accept",
122 help="Content format to request",
123 metavar="MIME",
124 )
125 p.add_argument(
126 "--payload",
127 help="Send X as request payload (eg. with a PUT). If X starts with an '@', its remainder is treated as a file name and read from; '@-' reads from the console. Non-file data may be recoded, see --content-format.",
128 metavar="X",
129 )
130 p.add_argument(
131 "--payload-initial-szx",
132 help="Size exponent to limit the initial block's size (0 ≙ 16 Byte, 6 ≙ 1024 Byte)",
133 metavar="SZX",
134 type=int,
135 )
136 p.add_argument(
137 "--content-format",
138 help="Content format of the --payload data. If a known format is given and --payload has a non-file argument, the payload is converted from CBOR Diagnostic Notation.",
139 metavar="MIME",
140 )
141 p.add_argument(
142 "url",
143 nargs="?" if prescreen else None,
144 help="CoAP address to fetch",
145 )
148def build_parser(*, use_global=True, use_interactive=True, prescreen=False):
149 p = argparse.ArgumentParser(description=__doc__, add_help=not prescreen)
150 if prescreen:
151 p.add_argument("--help", action="store_true")
152 if use_global:
153 augment_parser_for_global(p, prescreen=prescreen)
154 augment_parser_for_either(p)
155 if use_interactive:
156 augment_parser_for_interactive(p, prescreen=prescreen)
158 return p
161def configure_logging(verbosity, color):
162 if color is not False:
163 try:
164 import colorlog
165 except ImportError:
166 color = False
167 else:
168 colorlog.basicConfig()
169 if not color:
170 logging.basicConfig()
172 if verbosity <= -2:
173 logging.getLogger("coap").setLevel(logging.CRITICAL + 1)
174 elif verbosity == -1:
175 logging.getLogger("coap").setLevel(logging.ERROR)
176 elif verbosity == 0:
177 logging.getLogger("coap").setLevel(logging.WARNING)
178 elif verbosity == 1:
179 logging.getLogger("coap").setLevel(logging.WARNING)
180 logging.getLogger("coap.aiocoap-client").setLevel(logging.INFO)
181 elif verbosity == 2:
182 logging.getLogger("coap").setLevel(logging.INFO)
183 elif verbosity >= 3:
184 logging.getLogger("coap").setLevel(logging.DEBUG)
185 elif verbosity >= 4:
186 logging.getLogger("coap").setLevel(0)
188 log.debug("Logging configured.")
191def colored(text, options, tokenlambda):
192 """Apply pygments based coloring if options.color is set. Tokelambda is a
193 callback to which pygments.token is passed and which returns a token type;
194 this makes it easy to not need to conditionally react to pygments' possible
195 absence in all color locations."""
196 if not options.color:
197 return str(text)
199 from pygments.formatters import TerminalFormatter
200 from pygments import token, format
202 return format(
203 [(tokenlambda(token), str(text))],
204 TerminalFormatter(),
205 )
208def incoming_observation(options, response):
209 log.info("Received Observe notification:")
210 for line in message_to_text(response, "from"):
211 log.info(line)
213 if options.observe_exec:
214 p = subprocess.Popen(options.observe_exec, shell=True, stdin=subprocess.PIPE)
215 # FIXME this blocks
216 p.communicate(response.payload)
217 else:
218 sys.stdout.write(colored("---", options, lambda token: token.Comment.Preproc))
219 sys.stdout.write("\n")
220 if response.code.is_successful():
221 present(response, options, file=sys.stderr)
222 else:
223 print(
224 colored(
225 response.code, options, lambda token: token.Token.Generic.Error
226 ),
227 file=sys.stderr,
228 )
229 if response.payload:
230 present(response, options, file=sys.stderr)
231 sys.stdout.flush()
234def apply_credentials(context, credentials, errfn):
235 try:
236 if credentials.suffix == ".json":
237 import json
239 context.client_credentials.load_from_dict(json.load(credentials.open("rb")))
240 elif credentials.suffix == ".diag":
241 try:
242 import cbor_diag
243 import cbor2
244 except ImportError:
245 raise errfn(
246 "Loading credentials in CBOR diagnostic format requires cbor2 and cbor_diag package"
247 )
248 context.client_credentials.load_from_dict(
249 cbor2.loads(cbor_diag.diag2cbor(credentials.open().read()))
250 )
251 else:
252 raise errfn(
253 "Unknown suffix: %s (expected: .json or .diag)" % (credentials.suffix)
254 )
255 except FileNotFoundError as e:
256 raise errfn("Credential file not found: %s" % e.filename)
257 except (OSError, ValueError) as e:
258 # Any of the parsers could reasonably raise those, and while they don't
259 # have HelpfulError support, they should still not render a backtrace
260 # but a proper CLI error.
261 raise errfn("Processing credential file: %s" % e)
264def message_to_text(m, direction):
265 """Convert a message to a text form similar to how they are shown in RFCs.
267 Refactoring this into a message method will need to address the direction
268 discovery eventually."""
269 if m.remote is None:
270 # This happens when unprocessable remotes are, eg. putting in an HTTP URI
271 yield f"{m.code} {direction} (unknown)"
272 else:
273 # FIXME: Update when transport-indication is available
274 # FIXME: This is slightly wrong because it does not account for what ProxyRedirector does
275 yield f"{m.code} {direction} {m.remote.scheme}://{m.remote.hostinfo}"
276 for opt in m.opt.option_list():
277 if hasattr(opt.number, "name"):
278 yield f"- {opt.number.name_printable} ({opt.number.value}): {opt.value!r}"
279 else:
280 yield f"- {opt.number.value}: {opt.value!r}"
281 if m.payload:
282 limit = 16
283 if len(m.payload) > limit:
284 yield f"Payload: {m.payload[:limit].hex()}... ({len(m.payload)} bytes total)"
285 else:
286 yield f"Payload: {m.payload[:limit].hex()} ({len(m.payload)} bytes)"
287 else:
288 yield "No payload"
291def present(message, options, file=sys.stdout):
292 """Write a message payload to the output, pretty printing and/or coloring
293 it as configured in the options."""
294 if not options.quiet and (message.opt.location_path or message.opt.location_query):
295 # FIXME: Percent encoding is completely missing; this would be done
296 # most easily with a CRI library
297 location_ref = "/" + "/".join(message.opt.location_path)
298 if message.opt.location_query:
299 location_ref += "?" + "&".join(message.opt.location_query)
300 print(
301 colored(
302 f"Location options indicate new resource: {location_ref}",
303 options,
304 lambda token: token.Token.Generic.Inserted,
305 ),
306 file=sys.stderr,
307 )
309 if not message.payload:
310 return
312 payload = None
314 cf = message.opt.content_format
316 if cf is None:
317 if message.code.is_successful():
318 cf = message.request.opt.content_format
319 else:
320 cf = ContentFormat.TEXT
322 if cf is not None and cf.is_known():
323 mime = cf.media_type
324 else:
325 mime = "application/octet-stream"
326 if options.pretty_print:
327 from aiocoap.util.prettyprint import pretty_print
329 prettyprinted = pretty_print(message)
330 if prettyprinted is not None:
331 (infos, mime, payload) = prettyprinted
332 if not options.quiet:
333 for i in infos:
334 print(
335 colored("# " + i, options, lambda token: token.Comment),
336 file=sys.stderr,
337 )
339 color = options.color
340 if color:
341 from aiocoap.util.prettyprint import lexer_for_mime
342 import pygments
344 try:
345 lexer = lexer_for_mime(mime)
346 except pygments.util.ClassNotFound:
347 color = False
349 if color and payload is None:
350 # Coloring requires a unicode-string style payload, either from the
351 # mime type or from the pretty printer.
352 try:
353 payload = message.payload.decode("utf8")
354 except UnicodeDecodeError:
355 color = False
357 if color:
358 from pygments.formatters import TerminalFormatter
359 from pygments import highlight
361 highlit = highlight(
362 payload,
363 lexer,
364 TerminalFormatter(),
365 )
366 # The TerminalFormatter already adds an end-of-line character, not
367 # trying to add one for any missing trailing newlines.
368 print(highlit, file=file, end="")
369 file.flush()
370 else:
371 if payload is None:
372 file.buffer.write(message.payload)
373 if file.isatty() and message.payload[-1:] != b"\n":
374 file.write("\n")
375 else:
376 file.write(payload)
377 if file.isatty() and payload[-1] != "\n":
378 file.write("\n")
381async def single_request(args, context, globalopts=None):
382 parser = build_parser(use_global=globalopts is None)
383 options = parser.parse_args(args, copy.copy(globalopts))
385 pretty_print_modules = aiocoap.defaults.prettyprint_missing_modules()
386 if pretty_print_modules and (options.color is True or options.pretty_print is True):
387 parser.error(
388 "Color and pretty printing require the following"
389 " additional module(s) to be installed: %s"
390 % ", ".join(pretty_print_modules)
391 )
392 if options.color is None:
393 options.color = sys.stdout.isatty() and not pretty_print_modules
394 if options.pretty_print is None:
395 options.pretty_print = sys.stdout.isatty() and not pretty_print_modules
397 try:
398 try:
399 code = getattr(
400 aiocoap.numbers.codes.Code,
401 options.method.upper().replace("IPATCH", "iPATCH"),
402 )
403 except AttributeError:
404 try:
405 code = aiocoap.numbers.codes.Code(int(options.method))
406 except ValueError:
407 raise parser.error("Unknown method")
409 if options.credentials is not None:
410 apply_credentials(context, options.credentials, parser.error)
412 request = aiocoap.Message(
413 code=code,
414 transport_tuning=aiocoap.Unreliable if options.non else aiocoap.Reliable,
415 )
416 request.set_request_uri(options.url, set_uri_host=options.set_hostname)
418 if options.accept:
419 try:
420 request.opt.accept = ContentFormat(int(options.accept))
421 except ValueError:
422 try:
423 request.opt.accept = ContentFormat.by_media_type(options.accept)
424 except KeyError:
425 raise parser.error("Unknown accept type")
427 if options.observe:
428 request.opt.observe = 0
430 if options.content_format:
431 try:
432 request.opt.content_format = ContentFormat(int(options.content_format))
433 except ValueError:
434 try:
435 request.opt.content_format = ContentFormat.by_media_type(
436 options.content_format
437 )
438 except KeyError:
439 raise parser.error("Unknown content format")
441 if options.payload:
442 if options.payload.startswith("@"):
443 filename = options.payload[1:]
444 if filename == "-":
445 f = sys.stdin.buffer
446 else:
447 f = open(filename, "rb")
448 try:
449 request.payload = f.read()
450 except OSError as e:
451 raise parser.error("File could not be opened: %s" % e)
452 else:
453 request_classification = contenttype.categorize(
454 request.opt.content_format.media_type
455 if request.opt.content_format is not None
456 and request.opt.content_format.is_known()
457 else ""
458 )
459 if request_classification in ("cbor", "cbor-seq"):
460 try:
461 import cbor_diag
462 except ImportError as e:
463 raise parser.error(f"CBOR recoding not available ({e})")
465 try:
466 encoded = cbor_diag.diag2cbor(options.payload)
467 except ValueError as e:
468 raise parser.error(
469 f"Parsing CBOR diagnostic notation failed. Make sure quotation marks are escaped from the shell. Error: {e}"
470 )
472 if request_classification == "cbor-seq":
473 try:
474 import cbor2
475 except ImportError as e:
476 raise parser.error(
477 f"CBOR sequence recoding not available ({e})"
478 )
479 decoded = cbor2.loads(encoded)
480 if not isinstance(decoded, list):
481 raise parser.error(
482 "CBOR sequence recoding requires an array as the top-level element."
483 )
484 request.payload = b"".join(cbor2.dumps(d) for d in decoded)
485 else:
486 request.payload = encoded
487 else:
488 request.payload = options.payload.encode("utf8")
490 if options.payload_initial_szx is not None:
491 request.remote.maximum_block_size_exp = options.payload_initial_szx
493 if options.proxy is None or options.proxy in ("none", "", "-"):
494 interface = context
495 else:
496 interface = aiocoap.proxy.client.ProxyForwarder(options.proxy, context)
498 requested_uri = request.get_request_uri()
500 log.info("Sending request:")
501 for line in message_to_text(request, "to"):
502 log.info(line)
504 requester = interface.request(request)
506 response_data = await requester.response
508 log.info("Received response:")
509 for line in message_to_text(response_data, "from"):
510 log.info(line)
512 response_uri = response_data.get_request_uri()
513 if requested_uri != response_uri:
514 print(
515 colored(
516 f"Response arrived from different address; base URI is {response_uri}",
517 options,
518 lambda token: token.Generic.Inserted,
519 ),
520 file=sys.stderr,
521 )
522 if response_data.code.is_successful():
523 present(response_data, options)
524 else:
525 print(
526 colored(response_data.code, options, lambda token: token.Generic.Error),
527 file=sys.stderr,
528 )
529 present(response_data, options, file=sys.stderr)
530 sys.exit(1)
532 if options.observe:
533 try:
534 async for notification in requester.observation:
535 incoming_observation(options, notification)
536 except Exception as exit_reason:
537 print("Observation is over: %r" % (exit_reason,), file=sys.stderr)
538 except aiocoap.error.HelpfulError as e:
539 print(str(e), file=sys.stderr)
540 extra_help = e.extra_help(
541 hints=dict(
542 original_uri=options.url,
543 request=request,
544 )
545 )
546 if extra_help:
547 print("Debugging hint:", extra_help, file=sys.stderr)
548 sys.exit(1)
549 # Fallback while not all backends raise NetworkErrors
550 except OSError as e:
551 text = str(e)
552 if not text:
553 text = repr(e)
554 if not text:
555 # eg ConnectionResetError flying out of a misconfigured SSL server
556 text = type(e)
557 print(
558 "Warning: OS errors should not be raised this way any more.",
559 file=sys.stderr,
560 )
561 # not telling what to do precisely: the form already tells users to
562 # include `aiocoap.cli.defaults` output, which is exactly what we
563 # need.
564 print(
565 f"Even if the cause of the error itself is clear, please file an issue at {aiocoap.meta.bugreport_uri}.",
566 file=sys.stderr,
567 )
568 print("Error:", text, file=sys.stderr)
569 sys.exit(1)
572async def single_request_with_context(args):
573 """Wrapper around single_request until sync_main gets made fully async, and
574 async context managers are used to manage contexts."""
575 context = await aiocoap.Context.create_client_context()
576 try:
577 await single_request(args, context)
578 finally:
579 await context.shutdown()
582interactive_expecting_keyboard_interrupt = None
585async def interactive(globalopts):
586 global interactive_expecting_keyboard_interrupt
587 interactive_expecting_keyboard_interrupt = asyncio.get_event_loop().create_future()
589 context = await aiocoap.Context.create_client_context()
591 while True:
592 try:
593 # when http://bugs.python.org/issue22412 is resolved, use that instead
594 line = await asyncio.get_event_loop().run_in_executor(
595 None, lambda: input("aiocoap> ")
596 )
597 except EOFError:
598 line = "exit"
599 line = shlex.split(line)
600 if not line:
601 continue
602 if line in (["help"], ["?"]):
603 line = ["--help"]
604 if line in (["quit"], ["q"], ["exit"]):
605 break
607 async def single_request_noexit(*args, **kwargs):
608 """Protects a run against the exit automatically generated by
609 argparse errors or help"""
610 try:
611 # We could also set exit_on_error, but this way we do get the
612 # original error code and can show that; might still revisit.
613 await single_request(*args, **kwargs)
614 except SystemExit as e:
615 return e.code
616 else:
617 return 0
619 current_task = asyncio.create_task(
620 single_request_noexit(line, context=context, globalopts=globalopts),
621 name="Interactive prompt command %r" % line,
622 )
623 interactive_expecting_keyboard_interrupt = (
624 asyncio.get_event_loop().create_future()
625 )
627 done, pending = await asyncio.wait(
628 [current_task, interactive_expecting_keyboard_interrupt],
629 return_when=asyncio.FIRST_COMPLETED,
630 )
632 if current_task not in done:
633 current_task.cancel()
634 else:
635 try:
636 code = await current_task
637 except Exception as e:
638 print("Unhandled exception raised: %s" % (e,))
639 if code != 0:
640 print("Exit code: %d" % code, file=sys.stderr)
642 await context.shutdown()
645async def main(args=None):
646 # interactive mode is a little messy, that's why this is not using aiocoap.util.cli yet
647 if args is None:
648 args = sys.argv[1:]
650 # This one is tolerant and doesn't even terminate with --help, so that
651 # --help and --interactive --help can do the right thing.
652 first_parser = build_parser(prescreen=True)
653 first_args = first_parser.parse_args(args)
655 configure_logging(
656 (first_args.verbose or 0) - (first_args.quiet or 0), first_args.color
657 )
659 if not first_args.interactive:
660 try:
661 await single_request_with_context(args)
662 except asyncio.exceptions.CancelledError:
663 sys.exit(3)
664 else:
665 global_parser = build_parser(use_interactive=False)
666 globalopts = global_parser.parse_args(args)
668 loop = asyncio.get_event_loop()
670 def ctrl_c():
671 try:
672 interactive_expecting_keyboard_interrupt.set_result(None)
673 except asyncio.exceptions.InvalidStateError:
674 # Too many Ctlr-C before the program could clean up
675 sys.exit(3)
677 loop.add_signal_handler(signal.SIGINT, ctrl_c)
679 await interactive(globalopts)
682def sync_main(args=None):
683 asyncio.run(main(args=args))
686if __name__ == "__main__":
687 sync_main()