Coverage for aiocoap/cli/client.py: 73%
361 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-02 23:12 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-02 23:12 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5"""aiocoap-client is a simple command-line tool for interacting with CoAP servers"""
7import copy
8import sys
9import asyncio
10import argparse
11import logging
12import signal
13import subprocess
14from pathlib import Path
16import shlex
18# even though not used directly, this has side effects on the input() function
19# used in interactive mode
20try:
21 import readline # noqa: F401
22except ImportError:
23 pass # that's normal on some platforms, and ok since it's just a usability enhancement
25import aiocoap
26import aiocoap.defaults
27import aiocoap.meta
28import aiocoap.proxy.client
29from aiocoap.util import contenttype
30from aiocoap.util.cli import ActionNoYes
31from aiocoap.numbers import ContentFormat
33log = logging.getLogger("coap.aiocoap-client")
36def augment_parser_for_global(p, *, prescreen=False):
37 p.add_argument(
38 "-v",
39 "--verbose",
40 help="Increase the debug output",
41 action="count",
42 )
43 p.add_argument(
44 "-q",
45 "--quiet",
46 help="Decrease the debug output",
47 action="count",
48 )
49 p.add_argument(
50 "--version", action="version", version="%(prog)s " + aiocoap.meta.version
51 )
53 p.add_argument(
54 "--interactive",
55 help="Enter interactive mode. Combine with --help or run `help` interactively to see which options apply where; some can be used globally and overwritten locally.",
56 action="store_true",
57 )
60def augment_parser_for_either(p):
61 p.add_argument(
62 "--color",
63 help="Color output (default on TTYs if all required modules are installed)",
64 default=None,
65 action=ActionNoYes,
66 )
67 p.add_argument(
68 "--pretty-print",
69 help="Pretty-print known content formats (default on TTYs if all required modules are installed)",
70 default=None,
71 action=ActionNoYes,
72 )
73 p.add_argument(
74 "--proxy", help="Relay the CoAP request to a proxy for execution", metavar="URI"
75 )
76 p.add_argument(
77 "--credentials",
78 help="Load credentials to use from a given file",
79 type=Path,
80 )
81 p.add_argument(
82 "--no-set-hostname",
83 help="Suppress transmission of Uri-Host even if the host name is not an IP literal",
84 dest="set_hostname",
85 action="store_false",
86 default=True,
87 )
88 p.add_argument(
89 "--no-sec",
90 # Can be "Send request without any security" once it actually does
91 # anything; until then, it's fine as a no-op to not impede changing
92 # scripts later.
93 help=argparse.SUPPRESS,
94 dest="sec",
95 action="store_false",
96 default=None,
97 )
100def augment_parser_for_interactive(p, *, prescreen=False):
101 p.add_argument(
102 "--non",
103 help="Send request as non-confirmable (NON) message",
104 action="store_true",
105 )
106 p.add_argument(
107 "-m",
108 "--method",
109 help="Name or number of request method to use (default: %(default)s)",
110 default="GET",
111 )
112 p.add_argument(
113 "--observe", help="Register an observation on the resource", action="store_true"
114 )
115 p.add_argument(
116 "--observe-exec",
117 help="Run the specified program whenever the observed resource changes, feeding the response data to its stdin",
118 metavar="CMD",
119 )
120 p.add_argument(
121 "--accept",
122 help="Content format to request",
123 metavar="MIME",
124 )
125 p.add_argument(
126 "--payload",
127 help="Send X as request payload (eg. with a PUT). If X starts with an '@', its remainder is treated as a file name and read from; '@-' reads from the console. Non-file data may be recoded, see --content-format.",
128 metavar="X",
129 )
130 p.add_argument(
131 "--payload-initial-szx",
132 help="Size exponent to limit the initial block's size (0 ≙ 16 Byte, 6 ≙ 1024 Byte)",
133 metavar="SZX",
134 type=int,
135 )
136 p.add_argument(
137 "--content-format",
138 help="Content format of the --payload data. If a known format is given and --payload has a non-file argument, the payload is converted from CBOR Diagnostic Notation.",
139 metavar="MIME",
140 )
141 p.add_argument(
142 "url",
143 nargs="?" if prescreen else None,
144 help="CoAP address to fetch",
145 )
148def build_parser(*, use_global=True, use_interactive=True, prescreen=False):
149 p = argparse.ArgumentParser(description=__doc__, add_help=not prescreen)
150 if prescreen:
151 p.add_argument("--help", action="store_true")
152 if use_global:
153 augment_parser_for_global(p, prescreen=prescreen)
154 augment_parser_for_either(p)
155 if use_interactive:
156 augment_parser_for_interactive(p, prescreen=prescreen)
158 return p
161def configure_logging(verbosity, color):
162 if color is not False:
163 try:
164 import colorlog
165 except ImportError:
166 color = False
167 else:
168 colorlog.basicConfig()
169 if not color:
170 logging.basicConfig()
172 if verbosity <= -2:
173 logging.getLogger("coap").setLevel(logging.CRITICAL + 1)
174 elif verbosity == -1:
175 logging.getLogger("coap").setLevel(logging.ERROR)
176 elif verbosity == 0:
177 logging.getLogger("coap").setLevel(logging.WARNING)
178 elif verbosity == 1:
179 logging.getLogger("coap").setLevel(logging.WARNING)
180 logging.getLogger("coap.aiocoap-client").setLevel(logging.INFO)
181 elif verbosity == 2:
182 logging.getLogger("coap").setLevel(logging.INFO)
183 elif verbosity >= 3:
184 logging.getLogger("coap").setLevel(logging.DEBUG)
185 elif verbosity >= 4:
186 logging.getLogger("coap").setLevel(0)
188 log.debug("Logging configured.")
191def colored(text, options, tokenlambda):
192 """Apply pygments based coloring if options.color is set. Tokelambda is a
193 callback to which pygments.token is passed and which returns a token type;
194 this makes it easy to not need to conditionally react to pygments' possible
195 absence in all color locations."""
196 if not options.color:
197 return str(text)
199 from pygments.formatters import TerminalFormatter
200 from pygments import token, format
202 return format(
203 [(tokenlambda(token), str(text))],
204 TerminalFormatter(),
205 )
208def incoming_observation(options, response):
209 log.info("Received Observe notification:")
210 for line in message_to_text(response, "from"):
211 log.info(line)
213 if options.observe_exec:
214 p = subprocess.Popen(options.observe_exec, shell=True, stdin=subprocess.PIPE)
215 # FIXME this blocks
216 p.communicate(response.payload)
217 else:
218 sys.stdout.write(colored("---", options, lambda token: token.Comment.Preproc))
219 if response.code.is_successful():
220 present(response, options, file=sys.stderr)
221 else:
222 sys.stdout.flush()
223 print(
224 colored(
225 response.code, options, lambda token: token.Token.Generic.Error
226 ),
227 file=sys.stderr,
228 )
229 if response.payload:
230 present(response, options, file=sys.stderr)
233def apply_credentials(context, credentials, errfn):
234 try:
235 if credentials.suffix == ".json":
236 import json
238 context.client_credentials.load_from_dict(json.load(credentials.open("rb")))
239 elif credentials.suffix == ".diag":
240 try:
241 import cbor_diag
242 import cbor2
243 except ImportError:
244 raise errfn(
245 "Loading credentials in CBOR diagnostic format requires cbor2 and cbor_diag package"
246 )
247 context.client_credentials.load_from_dict(
248 cbor2.loads(cbor_diag.diag2cbor(credentials.open().read()))
249 )
250 else:
251 raise errfn(
252 "Unknown suffix: %s (expected: .json or .diag)" % (credentials.suffix)
253 )
254 except FileNotFoundError as e:
255 raise errfn("Credential file not found: %s" % e.filename)
256 except (OSError, ValueError) as e:
257 # Any of the parsers could reasonably raise those, and while they don't
258 # have HelpfulError support, they should still not render a backtrace
259 # but a proper CLI error.
260 raise errfn("Processing credential file: %s" % e)
263def message_to_text(m, direction):
264 """Convert a message to a text form similar to how they are shown in RFCs.
266 Refactoring this into a message method will need to address the direction
267 discovery eventually."""
268 if m.remote is None:
269 # This happens when unprocessable remotes are, eg. putting in an HTTP URI
270 yield f"{m.code} {direction} (unknown)"
271 else:
272 # FIXME: Update when transport-indication is available
273 # FIXME: This is slightly wrong because it does not account for what ProxyRedirector does
274 yield f"{m.code} {direction} {m.remote.scheme}://{m.remote.hostinfo}"
275 for opt in m.opt.option_list():
276 if hasattr(opt.number, "name"):
277 yield f"- {opt.number.name_printable} ({opt.number.value}): {opt.value!r}"
278 else:
279 yield f"- {opt.number.value}: {opt.value!r}"
280 if m.payload:
281 limit = 16
282 if len(m.payload) > limit:
283 yield f"Payload: {m.payload[:limit].hex()}... ({len(m.payload)} bytes total)"
284 else:
285 yield f"Payload: {m.payload[:limit].hex()} ({len(m.payload)} bytes)"
286 else:
287 yield "No payload"
290def present(message, options, file=sys.stdout):
291 """Write a message payload to the output, pretty printing and/or coloring
292 it as configured in the options."""
293 if not options.quiet and (message.opt.location_path or message.opt.location_query):
294 # FIXME: Percent encoding is completely missing; this would be done
295 # most easily with a CRI library
296 location_ref = "/" + "/".join(message.opt.location_path)
297 if message.opt.location_query:
298 location_ref += "?" + "&".join(message.opt.location_query)
299 print(
300 colored(
301 f"Location options indicate new resource: {location_ref}",
302 options,
303 lambda token: token.Token.Generic.Inserted,
304 ),
305 file=sys.stderr,
306 )
308 if not message.payload:
309 return
311 payload = None
313 cf = message.opt.content_format or message.request.opt.content_format
314 if cf is not None and cf.is_known():
315 mime = cf.media_type
316 else:
317 mime = "application/octet-stream"
318 if options.pretty_print:
319 from aiocoap.util.prettyprint import pretty_print
321 prettyprinted = pretty_print(message)
322 if prettyprinted is not None:
323 (infos, mime, payload) = prettyprinted
324 if not options.quiet:
325 for i in infos:
326 print(
327 colored("# " + i, options, lambda token: token.Comment),
328 file=sys.stderr,
329 )
331 color = options.color
332 if color:
333 from aiocoap.util.prettyprint import lexer_for_mime
334 import pygments
336 try:
337 lexer = lexer_for_mime(mime)
338 except pygments.util.ClassNotFound:
339 color = False
341 if color and payload is None:
342 # Coloring requires a unicode-string style payload, either from the
343 # mime type or from the pretty printer.
344 try:
345 payload = message.payload.decode("utf8")
346 except UnicodeDecodeError:
347 color = False
349 if color:
350 from pygments.formatters import TerminalFormatter
351 from pygments import highlight
353 highlit = highlight(
354 payload,
355 lexer,
356 TerminalFormatter(),
357 )
358 # The TerminalFormatter already adds an end-of-line character, not
359 # trying to add one for any missing trailing newlines.
360 print(highlit, file=file, end="")
361 file.flush()
362 else:
363 if payload is None:
364 file.buffer.write(message.payload)
365 if file.isatty() and message.payload[-1:] != b"\n":
366 file.write("\n")
367 else:
368 file.write(payload)
369 if file.isatty() and payload[-1] != "\n":
370 file.write("\n")
373async def single_request(args, context, globalopts=None):
374 parser = build_parser(use_global=globalopts is None)
375 options = parser.parse_args(args, copy.copy(globalopts))
377 pretty_print_modules = aiocoap.defaults.prettyprint_missing_modules()
378 if pretty_print_modules and (options.color is True or options.pretty_print is True):
379 parser.error(
380 "Color and pretty printing require the following"
381 " additional module(s) to be installed: %s"
382 % ", ".join(pretty_print_modules)
383 )
384 if options.color is None:
385 options.color = sys.stdout.isatty() and not pretty_print_modules
386 if options.pretty_print is None:
387 options.pretty_print = sys.stdout.isatty() and not pretty_print_modules
389 try:
390 try:
391 code = getattr(
392 aiocoap.numbers.codes.Code,
393 options.method.upper().replace("IPATCH", "iPATCH"),
394 )
395 except AttributeError:
396 try:
397 code = aiocoap.numbers.codes.Code(int(options.method))
398 except ValueError:
399 raise parser.error("Unknown method")
401 if options.credentials is not None:
402 apply_credentials(context, options.credentials, parser.error)
404 request = aiocoap.Message(
405 code=code, mtype=aiocoap.NON if options.non else aiocoap.CON
406 )
407 request.set_request_uri(options.url, set_uri_host=options.set_hostname)
409 if options.accept:
410 try:
411 request.opt.accept = ContentFormat(int(options.accept))
412 except ValueError:
413 try:
414 request.opt.accept = ContentFormat.by_media_type(options.accept)
415 except KeyError:
416 raise parser.error("Unknown accept type")
418 if options.observe:
419 request.opt.observe = 0
420 observation_is_over = asyncio.get_event_loop().create_future()
422 if options.content_format:
423 try:
424 request.opt.content_format = ContentFormat(int(options.content_format))
425 except ValueError:
426 try:
427 request.opt.content_format = ContentFormat.by_media_type(
428 options.content_format
429 )
430 except KeyError:
431 raise parser.error("Unknown content format")
433 if options.payload:
434 if options.payload.startswith("@"):
435 filename = options.payload[1:]
436 if filename == "-":
437 f = sys.stdin.buffer
438 else:
439 f = open(filename, "rb")
440 try:
441 request.payload = f.read()
442 except OSError as e:
443 raise parser.error("File could not be opened: %s" % e)
444 else:
445 request_classification = contenttype.categorize(
446 request.opt.content_format.media_type
447 if request.opt.content_format is not None
448 and request.opt.content_format.is_known()
449 else ""
450 )
451 if request_classification in ("cbor", "cbor-seq"):
452 try:
453 import cbor_diag
454 except ImportError as e:
455 raise parser.error(f"CBOR recoding not available ({e})")
457 try:
458 encoded = cbor_diag.diag2cbor(options.payload)
459 except ValueError as e:
460 raise parser.error(
461 f"Parsing CBOR diagnostic notation failed. Make sure quotation marks are escaped from the shell. Error: {e}"
462 )
464 if request_classification == "cbor-seq":
465 try:
466 import cbor2
467 except ImportError as e:
468 raise parser.error(
469 f"CBOR sequence recoding not available ({e})"
470 )
471 decoded = cbor2.loads(encoded)
472 if not isinstance(decoded, list):
473 raise parser.error(
474 "CBOR sequence recoding requires an array as the top-level element."
475 )
476 request.payload = b"".join(cbor2.dumps(d) for d in decoded)
477 else:
478 request.payload = encoded
479 else:
480 request.payload = options.payload.encode("utf8")
482 if options.payload_initial_szx is not None:
483 request.remote.maximum_block_size_exp = options.payload_initial_szx
485 if options.proxy is None or options.proxy in ("none", "", "-"):
486 interface = context
487 else:
488 interface = aiocoap.proxy.client.ProxyForwarder(options.proxy, context)
490 requested_uri = request.get_request_uri()
492 log.info("Sending request:")
493 for line in message_to_text(request, "to"):
494 log.info(line)
496 requester = interface.request(request)
498 if options.observe:
499 requester.observation.register_errback(observation_is_over.set_result)
500 requester.observation.register_callback(
501 lambda data, options=options: incoming_observation(options, data)
502 )
504 try:
505 response_data = await requester.response
506 finally:
507 if not requester.response.done():
508 requester.response.cancel()
509 if options.observe and not requester.observation.cancelled:
510 requester.observation.cancel()
512 log.info("Received response:")
513 for line in message_to_text(response_data, "from"):
514 log.info(line)
516 response_uri = response_data.get_request_uri()
517 if requested_uri != response_uri:
518 print(
519 colored(
520 f"Response arrived from different address; base URI is {response_uri}",
521 options,
522 lambda token: token.Generic.Inserted,
523 ),
524 file=sys.stderr,
525 )
526 if response_data.code.is_successful():
527 present(response_data, options)
528 else:
529 print(
530 colored(response_data.code, options, lambda token: token.Generic.Error),
531 file=sys.stderr,
532 )
533 present(response_data, options, file=sys.stderr)
534 sys.exit(1)
536 if options.observe:
537 exit_reason = await observation_is_over
538 print("Observation is over: %r" % (exit_reason,), file=sys.stderr)
539 except aiocoap.error.HelpfulError as e:
540 print(str(e), file=sys.stderr)
541 extra_help = e.extra_help(
542 hints=dict(
543 original_uri=options.url,
544 request=request,
545 )
546 )
547 if extra_help:
548 print("Debugging hint:", extra_help, file=sys.stderr)
549 sys.exit(1)
550 # Fallback while not all backends raise NetworkErrors
551 except OSError as e:
552 text = str(e)
553 if not text:
554 text = repr(e)
555 if not text:
556 # eg ConnectionResetError flying out of a misconfigured SSL server
557 text = type(e)
558 print(
559 "Warning: OS errors should not be raised this way any more.",
560 file=sys.stderr,
561 )
562 # not telling what to do precisely: the form already tells users to
563 # include `aiocoap.cli.defaults` output, which is exactly what we
564 # need.
565 print(
566 f"Even if the cause of the error itself is clear, please file an issue at {aiocoap.meta.bugreport_uri}.",
567 file=sys.stderr,
568 )
569 print("Error:", text, file=sys.stderr)
570 sys.exit(1)
573async def single_request_with_context(args):
574 """Wrapper around single_request until sync_main gets made fully async, and
575 async context managers are used to manage contexts."""
576 context = await aiocoap.Context.create_client_context()
577 try:
578 await single_request(args, context)
579 finally:
580 await context.shutdown()
583interactive_expecting_keyboard_interrupt = None
586async def interactive(globalopts):
587 global interactive_expecting_keyboard_interrupt
588 interactive_expecting_keyboard_interrupt = asyncio.get_event_loop().create_future()
590 context = await aiocoap.Context.create_client_context()
592 while True:
593 try:
594 # when http://bugs.python.org/issue22412 is resolved, use that instead
595 line = await asyncio.get_event_loop().run_in_executor(
596 None, lambda: input("aiocoap> ")
597 )
598 except EOFError:
599 line = "exit"
600 line = shlex.split(line)
601 if not line:
602 continue
603 if line in (["help"], ["?"]):
604 line = ["--help"]
605 if line in (["quit"], ["q"], ["exit"]):
606 break
608 async def single_request_noexit(*args, **kwargs):
609 """Protects a run against the exit automatically generated by
610 argparse errors or help"""
611 try:
612 # We could also set exit_on_error, but this way we do get the
613 # original error code and can show that; might still revisit.
614 await single_request(*args, **kwargs)
615 except SystemExit as e:
616 return e.code
617 else:
618 return 0
620 current_task = asyncio.create_task(
621 single_request_noexit(line, context=context, globalopts=globalopts),
622 name="Interactive prompt command %r" % line,
623 )
624 interactive_expecting_keyboard_interrupt = (
625 asyncio.get_event_loop().create_future()
626 )
628 done, pending = await asyncio.wait(
629 [current_task, interactive_expecting_keyboard_interrupt],
630 return_when=asyncio.FIRST_COMPLETED,
631 )
633 if current_task not in done:
634 current_task.cancel()
635 else:
636 try:
637 code = await current_task
638 except Exception as e:
639 print("Unhandled exception raised: %s" % (e,))
640 if code != 0:
641 print("Exit code: %d" % code, file=sys.stderr)
643 await context.shutdown()
646async def main(args=None):
647 # interactive mode is a little messy, that's why this is not using aiocoap.util.cli yet
648 if args is None:
649 args = sys.argv[1:]
651 # This one is tolerant and doesn't even terminate with --help, so that
652 # --help and --interactive --help can do the right thing.
653 first_parser = build_parser(prescreen=True)
654 first_args = first_parser.parse_args(args)
656 configure_logging(
657 (first_args.verbose or 0) - (first_args.quiet or 0), first_args.color
658 )
660 if not first_args.interactive:
661 try:
662 await single_request_with_context(args)
663 except asyncio.exceptions.CancelledError:
664 sys.exit(3)
665 else:
666 global_parser = build_parser(use_interactive=False)
667 globalopts = global_parser.parse_args(args)
669 loop = asyncio.get_event_loop()
671 def ctrl_c():
672 try:
673 interactive_expecting_keyboard_interrupt.set_result(None)
674 except asyncio.exceptions.InvalidStateError:
675 # Too many Ctlr-C before the program could clean up
676 sys.exit(3)
678 loop.add_signal_handler(signal.SIGINT, ctrl_c)
680 await interactive(globalopts)
683def sync_main(args=None):
684 asyncio.run(main(args=args))
687if __name__ == "__main__":
688 sync_main()