Coverage for aiocoap / cli / client.py: 72%
368 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 12:28 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 12:28 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5"""aiocoap-client is a simple command-line tool for interacting with CoAP servers"""
7import copy
8import sys
9import asyncio
10import argparse
11import logging
12import signal
13import subprocess
14from pathlib import Path
16import shlex
18# even though not used directly, this has side effects on the input() function
19# used in interactive mode
20try:
21 import readline # noqa: F401
22except ImportError:
23 pass # that's normal on some platforms, and ok since it's just a usability enhancement
25import aiocoap
26import aiocoap.config
27import aiocoap.defaults
28import aiocoap.meta
29import aiocoap.proxy.client
30from aiocoap.util import contenttype
31from aiocoap.util.cli import ActionNoYes
32from aiocoap.numbers import ContentFormat
34log = logging.getLogger("coap.aiocoap-client")
def augment_parser_for_global(p, *, prescreen=False):
    """Register the options that are only accepted on the program's own
    command line (as opposed to a line typed at the interactive prompt).

    ``prescreen`` is accepted so that all ``augment_parser_for_*`` functions
    can be called alike; it is not used here."""
    version_banner = "%(prog)s " + aiocoap.meta.version

    # (flags, add_argument keyword arguments), registered in this order so
    # that the --help output keeps its layout.
    global_arguments = (
        (("--config",), dict(help="Configuration file to load", type=Path)),
        (
            ("-v", "--verbose"),
            dict(help="Increase the debug output", action="count"),
        ),
        (
            ("-q", "--quiet"),
            dict(help="Decrease the debug output", action="count"),
        ),
        (("--version",), dict(action="version", version=version_banner)),
        (
            ("--interactive",),
            dict(
                help="Enter interactive mode. Combine with --help or run `help` interactively to see which options apply where; some can be used globally and overwritten locally.",
                action="store_true",
            ),
        ),
    )

    for flags, settings in global_arguments:
        p.add_argument(*flags, **settings)
def augment_parser_for_either(p):
    """Register the options that may be given both globally and per request
    (build_parser adds them to every parser it creates)."""
    add = p.add_argument

    add(
        "--color",
        action=ActionNoYes,
        default=None,
        help="Color output (default on TTYs if all required modules are installed)",
    )
    add(
        "--pretty-print",
        action=ActionNoYes,
        default=None,
        help="Pretty-print known content formats (default on TTYs if all required modules are installed)",
    )
    add(
        "--proxy",
        metavar="URI",
        help="Relay the CoAP request to a proxy for execution",
    )
    add(
        "--credentials",
        type=Path,
        help="Load credentials to use from a given file",
    )
    add(
        "--no-set-hostname",
        dest="set_hostname",
        action="store_false",
        default=True,
        help="Suppress transmission of Uri-Host even if the host name is not an IP literal",
    )
    add(
        "--no-sec",
        dest="sec",
        action="store_false",
        default=None,
        # Can be "Send request without any security" once it actually does
        # anything; until then, it's fine as a no-op to not impede changing
        # scripts later.
        help=argparse.SUPPRESS,
    )
def augment_parser_for_interactive(p, *, prescreen=False):
    """Register the per-request options and the positional ``url``.

    With ``prescreen`` set, the URL becomes optional (``nargs="?"``) so that a
    tolerant first parsing pass succeeds even when no URL is given."""
    url_nargs = "?" if prescreen else None

    # (flags, add_argument keyword arguments), in --help display order.
    request_arguments = [
        (
            ["--non"],
            {
                "help": "Send request without reliable transport (e.g. over UDP: as non-confirmable (NON) message)",
                "action": "store_true",
            },
        ),
        (
            ["-m", "--method"],
            {
                "help": "Name or number of request method to use (default: %(default)s)",
                "default": "GET",
            },
        ),
        (
            ["--observe"],
            {
                "help": "Register an observation on the resource",
                "action": "store_true",
            },
        ),
        (
            ["--observe-exec"],
            {
                "help": "Run the specified program whenever the observed resource changes, feeding the response data to its stdin",
                "metavar": "CMD",
            },
        ),
        (
            ["--accept"],
            {
                "help": "Content format to request",
                "metavar": "MIME",
            },
        ),
        (
            ["--payload"],
            {
                "help": "Send X as request payload (eg. with a PUT). If X starts with an '@', its remainder is treated as a file name and read from; '@-' reads from the console. Non-file data may be recoded, see --content-format.",
                "metavar": "X",
            },
        ),
        (
            ["--payload-initial-szx"],
            {
                "help": "Size exponent to limit the initial block's size (0 ≙ 16 Byte, 6 ≙ 1024 Byte)",
                "metavar": "SZX",
                "type": int,
            },
        ),
        (
            ["--content-format"],
            {
                "help": "Content format of the --payload data. If a known format is given and --payload has a non-file argument, the payload is converted from CBOR Diagnostic Notation.",
                "metavar": "MIME",
            },
        ),
        (
            ["url"],
            {
                "nargs": url_nargs,
                "help": "CoAP address to fetch",
            },
        ),
    ]

    for flags, settings in request_arguments:
        p.add_argument(*flags, **settings)
def build_parser(*, use_global=True, use_interactive=True, prescreen=False):
    """Assemble an argument parser from the option groups defined above.

    ``use_global`` and ``use_interactive`` select whether the global-only and
    the per-request option groups are included; the "either" group always is.
    With ``prescreen``, argparse's built-in help handling is replaced by a
    plain ``--help`` flag, so parsing does not terminate on ``--help``."""
    parser = argparse.ArgumentParser(description=__doc__, add_help=not prescreen)
    if prescreen:
        parser.add_argument("--help", action="store_true")

    # (enabled, augmenting function, extra keyword arguments), in the order
    # in which the groups are to be registered.
    stages = (
        (use_global, augment_parser_for_global, {"prescreen": prescreen}),
        (True, augment_parser_for_either, {}),
        (use_interactive, augment_parser_for_interactive, {"prescreen": prescreen}),
    )
    for enabled, augment, extra in stages:
        if enabled:
            augment(parser, **extra)

    return parser
def configure_logging(verbosity, color):
    """Set up log output and the level of the ``coap`` logger tree.

    ``verbosity`` is the number of ``-v`` flags minus the number of ``-q``
    flags. ``color`` is the tri-state ``--color`` option: unless it is
    ``False``, colorlog is used when it can be imported.

    Levels by verbosity: <= -2 silences even CRITICAL, -1 is ERROR, 0 is
    WARNING, 1 is WARNING with INFO for this tool's own logger, 2 is INFO, 3
    is DEBUG, and >= 4 lets every record through."""
    if color is not False:
        try:
            import colorlog
        except ImportError:
            color = False
        else:
            colorlog.basicConfig()
    if not color:
        logging.basicConfig()

    coap_logger = logging.getLogger("coap")
    if verbosity <= -2:
        coap_logger.setLevel(logging.CRITICAL + 1)
    elif verbosity == -1:
        coap_logger.setLevel(logging.ERROR)
    elif verbosity == 0:
        coap_logger.setLevel(logging.WARNING)
    elif verbosity == 1:
        coap_logger.setLevel(logging.WARNING)
        logging.getLogger("coap.aiocoap-client").setLevel(logging.INFO)
    elif verbosity == 2:
        coap_logger.setLevel(logging.INFO)
    elif verbosity == 3:
        coap_logger.setLevel(logging.DEBUG)
    else:
        # verbosity >= 4. This used to be shadowed by a ">= 3" test and thus
        # never ran. The level is 1 rather than 0: 0 is NOTSET, which on a
        # non-root logger means "ask the parent" and would fall back to the
        # root logger's level instead of showing everything.
        coap_logger.setLevel(1)

    log.debug("Logging configured.")
def colored(text, options, tokenlambda):
    """Return ``text`` as a string, colored through pygments if options.color
    is set.

    ``tokenlambda`` is a callback that receives the ``pygments.token`` module
    and returns the token type to color with. Passing a callback rather than
    a token means call sites need not check whether pygments is available:
    it is only imported (and the callback only invoked) when coloring is
    enabled."""
    if options.color:
        from pygments import format as pygments_format
        from pygments import token
        from pygments.formatters import TerminalFormatter

        token_stream = [(tokenlambda(token), str(text))]
        return pygments_format(token_stream, TerminalFormatter())

    return str(text)
def incoming_observation(options, response):
    """Process a single notification of a running observation.

    The notification is logged at INFO level. If options.observe_exec is set,
    that command is run through the shell with the payload on its stdin;
    otherwise a ``---`` separator is written to stdout and the notification
    is presented."""
    log.info("Received Observe notification:")
    for text_line in message_to_text(response, "from"):
        log.info(text_line)

    if options.observe_exec:
        handler = subprocess.Popen(
            options.observe_exec, shell=True, stdin=subprocess.PIPE
        )
        # FIXME this blocks
        handler.communicate(response.payload)
        return

    separator = colored("---", options, lambda token: token.Comment.Preproc)
    sys.stdout.write(separator)
    sys.stdout.write("\n")

    if response.code.is_successful():
        # NOTE(review): successful notifications are presented on stderr
        # while the separator goes to stdout, whereas single_request presents
        # successful responses on stdout -- confirm whether this is intended.
        present(response, options, file=sys.stderr)
    else:
        error_line = colored(
            response.code, options, lambda token: token.Token.Generic.Error
        )
        print(error_line, file=sys.stderr)
        if response.payload:
            present(response, options, file=sys.stderr)

    sys.stdout.flush()
def apply_credentials(context, credentials, errfn):
    """Load client credentials from the file ``credentials`` into ``context``.

    ``credentials`` is a path whose suffix selects the format: ``.json`` is
    parsed as JSON, ``.diag`` as CBOR diagnostic notation (which needs the
    cbor2 and cbor_diag packages). The resulting data is handed to
    ``context.client_credentials.load_from_dict``.

    All failures are reported by raising the result of ``errfn(message)``;
    the caller in this file passes ``parser.error``, which terminates the
    program by itself with a usage message."""
    try:
        if credentials.suffix == ".json":
            import json

            # Context managers ensure the file is closed again; it used to be
            # left to the garbage collector.
            with credentials.open("rb") as credentials_file:
                loaded = json.load(credentials_file)
            context.client_credentials.load_from_dict(loaded)
        elif credentials.suffix == ".diag":
            try:
                import cbor_diag
                import cbor2
            except ImportError:
                raise errfn(
                    "Loading credentials in CBOR diagnostic format requires cbor2 and cbor_diag package"
                )
            with credentials.open() as credentials_file:
                diagnostic_text = credentials_file.read()
            context.client_credentials.load_from_dict(
                cbor2.loads(cbor_diag.diag2cbor(diagnostic_text))
            )
        else:
            raise errfn(
                "Unknown suffix: %s (expected: .json or .diag)" % (credentials.suffix)
            )
    except FileNotFoundError as e:
        raise errfn("Credential file not found: %s" % e.filename)
    except (OSError, ValueError) as e:
        # Any of the parsers could reasonably raise those, and while they don't
        # have HelpfulError support, they should still not render a backtrace
        # but a proper CLI error.
        raise errfn("Processing credential file: %s" % e)
def message_to_text(m, direction):
    """Yield the lines of a textual rendition of message ``m``, similar to
    how messages are shown in RFCs.

    The first line has the code, the ``direction`` word and the remote's
    scheme and host (or ``(unknown)``); one line per option follows, and
    finally a hex dump of up to 16 payload bytes or ``No payload``.

    Refactoring this into a message method will need to address the direction
    discovery eventually."""
    if m.remote is not None:
        # FIXME: Update when transport-indication is available
        # FIXME: This is slightly wrong because it does not account for what ProxyRedirector does
        yield f"{m.code} {direction} {m.remote.scheme}://{m.remote.hostinfo}"
    else:
        # This happens when unprocessable remotes are, eg. putting in an HTTP URI
        yield f"{m.code} {direction} (unknown)"

    for option in m.opt.option_list():
        number = option.number
        if hasattr(number, "name"):
            yield f"- {number.name_printable} ({number.value}): {option.value!r}"
        else:
            yield f"- {number.value}: {option.value!r}"

    if not m.payload:
        yield "No payload"
        return

    limit = 16
    total = len(m.payload)
    shown = m.payload[:limit].hex()
    if total > limit:
        yield f"Payload: {shown}... ({total} bytes total)"
    else:
        yield f"Payload: {shown} ({total} bytes)"
def present(message, options, file=sys.stdout):
    """Write a message payload to the output, pretty printing and/or coloring
    it as configured in the options.

    Location options of the message, and any remarks the pretty printer
    makes, are reported on stderr unless options.quiet is set. The payload
    itself goes to ``file``: highlighted if coloring is enabled and possible,
    as pretty-printed text if the pretty printer produced any, and as raw
    bytes through ``file.buffer`` otherwise. When ``file`` is a terminal, a
    missing trailing newline is added. Messages without payload produce no
    payload output."""
    if not options.quiet and (message.opt.location_path or message.opt.location_query):
        # FIXME: Percent encoding is completely missing; this would be done
        # most easily with a CRI library
        location_ref = "/" + "/".join(message.opt.location_path)
        if message.opt.location_query:
            location_ref += "?" + "&".join(message.opt.location_query)
        print(
            colored(
                f"Location options indicate new resource: {location_ref}",
                options,
                lambda token: token.Token.Generic.Inserted,
            ),
            file=sys.stderr,
        )

    if not message.payload:
        return

    # Text form of the payload; stays None unless the pretty printer or the
    # UTF-8 decoding for coloring provides one.
    payload = None

    cf = message.opt.content_format

    if cf is None:
        # Without an explicit content format, successful responses fall back
        # to the request's content format, others are treated as text.
        if message.code.is_successful():
            cf = message.request.opt.content_format
        else:
            cf = ContentFormat.TEXT

    if cf is not None and cf.is_known():
        mime = cf.media_type
    else:
        mime = "application/octet-stream"
    if options.pretty_print:
        from aiocoap.util.prettyprint import pretty_print

        prettyprinted = pretty_print(message)
        if prettyprinted is not None:
            (infos, mime, payload) = prettyprinted
            if not options.quiet:
                for i in infos:
                    print(
                        colored("# " + i, options, lambda token: token.Comment),
                        file=sys.stderr,
                    )

    color = options.color
    if color:
        from aiocoap.util.prettyprint import lexer_for_mime
        import pygments

        try:
            lexer = lexer_for_mime(mime)
        except pygments.util.ClassNotFound:
            color = False

    if color and payload is None:
        # Coloring requires a unicode-string style payload, either from the
        # mime type or from the pretty printer.
        try:
            payload = message.payload.decode("utf8")
        except UnicodeDecodeError:
            color = False

    if color:
        from pygments.formatters import TerminalFormatter
        from pygments import highlight

        highlit = highlight(
            payload,
            lexer,
            TerminalFormatter(),
        )
        # The TerminalFormatter already adds an end-of-line character, not
        # trying to add one for any missing trailing newlines.
        print(highlit, file=file, end="")
        file.flush()
    elif payload is None:
        # The raw bytes bypass the text layer; flush it first so that text
        # written to this file earlier can not end up after the payload.
        file.flush()
        file.buffer.write(message.payload)
        if file.isatty() and not message.payload.endswith(b"\n"):
            file.write("\n")
    else:
        file.write(payload)
        # endswith rather than payload[-1]: the pretty printer's text may be
        # empty, and indexing an empty string raises IndexError.
        if file.isatty() and not payload.endswith("\n"):
            file.write("\n")
async def single_request(args, context, globalopts=None):
    """Parse one set of request arguments, send the request through
    ``context`` and present the outcome.

    ``args`` is the list of command line words. ``globalopts`` is None for a
    stand-alone invocation; in interactive mode it is the namespace of the
    globally given options, a copy of which serves as the starting point for
    parsing (and the global-only options are then not accepted in ``args``).

    Successful responses are presented on stdout. Unsuccessful responses and
    errors are reported on stderr, followed by ``sys.exit(1)``. Invalid
    arguments are reported through ``parser.error``, which terminates with a
    usage message. If --observe was given, notifications are processed until
    the observation ends."""
    parser = build_parser(use_global=globalopts is None)
    options = parser.parse_args(args, copy.copy(globalopts))

    pretty_print_modules = aiocoap.defaults.prettyprint_missing_modules()
    if pretty_print_modules and (options.color is True or options.pretty_print is True):
        parser.error(
            "Color and pretty printing require the following"
            " additional module(s) to be installed: %s"
            % ", ".join(pretty_print_modules)
        )
    if options.color is None:
        options.color = sys.stdout.isatty() and not pretty_print_modules
    if options.pretty_print is None:
        options.pretty_print = sys.stdout.isatty() and not pretty_print_modules

    try:
        # The method can be given by name or, failing that, by number.
        try:
            code = getattr(
                aiocoap.numbers.codes.Code,
                options.method.upper().replace("IPATCH", "iPATCH"),
            )
        except AttributeError:
            try:
                code = aiocoap.numbers.codes.Code(int(options.method))
            except ValueError:
                raise parser.error("Unknown method")

        if options.credentials is not None:
            apply_credentials(context, options.credentials, parser.error)

        request = aiocoap.Message(
            code=code,
            transport_tuning=aiocoap.Unreliable if options.non else aiocoap.Reliable,
        )
        request.set_request_uri(options.url, set_uri_host=options.set_hostname)

        # Content formats can be given by number or, failing that, by media
        # type.
        if options.accept:
            try:
                request.opt.accept = ContentFormat(int(options.accept))
            except ValueError:
                try:
                    request.opt.accept = ContentFormat.by_media_type(options.accept)
                except KeyError:
                    raise parser.error("Unknown accept type")

        if options.observe:
            request.opt.observe = 0

        if options.content_format:
            try:
                request.opt.content_format = ContentFormat(int(options.content_format))
            except ValueError:
                try:
                    request.opt.content_format = ContentFormat.by_media_type(
                        options.content_format
                    )
                except KeyError:
                    raise parser.error("Unknown content format")

        if options.payload:
            if options.payload.startswith("@"):
                filename = options.payload[1:]
                # Opening the file is inside the try block as well: a missing
                # or unreadable file is a usage error, and must not reach the
                # generic OSError fallback at the end of this function. Named
                # files are closed again after reading; stdin is left open.
                try:
                    if filename == "-":
                        request.payload = sys.stdin.buffer.read()
                    else:
                        with open(filename, "rb") as f:
                            request.payload = f.read()
                except OSError as e:
                    raise parser.error("File could not be opened: %s" % e)
            else:
                request_classification = contenttype.categorize(
                    request.opt.content_format.media_type
                    if request.opt.content_format is not None
                    and request.opt.content_format.is_known()
                    else ""
                )
                if request_classification in ("cbor", "cbor-seq"):
                    try:
                        import cbor_diag
                    except ImportError as e:
                        raise parser.error(f"CBOR recoding not available ({e})")

                    try:
                        encoded = cbor_diag.diag2cbor(options.payload)
                    except ValueError as e:
                        raise parser.error(
                            f"Parsing CBOR diagnostic notation failed. Make sure quotation marks are escaped from the shell. Error: {e}"
                        )

                    if request_classification == "cbor-seq":
                        # The sequence is entered as a single array, whose
                        # items are then encoded back to back.
                        try:
                            import cbor2
                        except ImportError as e:
                            raise parser.error(
                                f"CBOR sequence recoding not available ({e})"
                            )
                        decoded = cbor2.loads(encoded)
                        if not isinstance(decoded, list):
                            raise parser.error(
                                "CBOR sequence recoding requires an array as the top-level element."
                            )
                        request.payload = b"".join(cbor2.dumps(d) for d in decoded)
                    else:
                        request.payload = encoded
                else:
                    request.payload = options.payload.encode("utf8")

        if options.payload_initial_szx is not None:
            request.remote.maximum_block_size_exp = options.payload_initial_szx

        if options.proxy is None or options.proxy in ("none", "", "-"):
            interface = context
        else:
            interface = aiocoap.proxy.client.ProxyForwarder(options.proxy, context)

        requested_uri = request.get_request_uri()

        log.info("Sending request:")
        for line in message_to_text(request, "to"):
            log.info(line)

        requester = interface.request(request)

        response_data = await requester.response

        log.info("Received response:")
        for line in message_to_text(response_data, "from"):
            log.info(line)

        response_uri = response_data.get_request_uri()
        if requested_uri != response_uri:
            print(
                colored(
                    f"Response arrived from different address; base URI is {response_uri}",
                    options,
                    lambda token: token.Generic.Inserted,
                ),
                file=sys.stderr,
            )
        if response_data.code.is_successful():
            present(response_data, options)
        else:
            print(
                colored(response_data.code, options, lambda token: token.Generic.Error),
                file=sys.stderr,
            )
            present(response_data, options, file=sys.stderr)
            sys.exit(1)

        if options.observe:
            try:
                async for notification in requester.observation:
                    incoming_observation(options, notification)
            except Exception as exit_reason:
                print("Observation is over: %r" % (exit_reason,), file=sys.stderr)
    except aiocoap.error.HelpfulError as e:
        print(str(e), file=sys.stderr)
        extra_help = e.extra_help(
            hints=dict(
                original_uri=options.url,
                request=request,
            )
        )
        if extra_help:
            print("Debugging hint:", extra_help, file=sys.stderr)
        sys.exit(1)
    # Fallback while not all backends raise NetworkErrors
    except OSError as e:
        text = str(e)
        if not text:
            text = repr(e)
        if not text:
            # eg ConnectionResetError flying out of a misconfigured SSL server
            text = type(e)
        print(
            "Warning: OS errors should not be raised this way any more.",
            file=sys.stderr,
        )
        # not telling what to do precisely: the form already tells users to
        # include `aiocoap.cli.defaults` output, which is exactly what we
        # need.
        print(
            f"Even if the cause of the error itself is clear, please file an issue at {aiocoap.meta.bugreport_uri}.",
            file=sys.stderr,
        )
        print("Error:", text, file=sys.stderr)
        sys.exit(1)
async def single_request_with_context(args, config):
    """Run single_request on a client context that is created for just this
    request from ``config.transport``, and shut down again afterwards no
    matter how the request ends.

    This is a wrapper until sync_main gets made fully async, and async
    context managers are used to manage contexts."""
    client_context = await aiocoap.Context.create_client_context(
        transports=config.transport
    )
    try:
        await single_request(args, client_context)
    finally:
        await client_context.shutdown()
# Future that the SIGINT handler installed by main() resolves; interactive()
# replaces it with a fresh one for every command it starts.
interactive_expecting_keyboard_interrupt = None


async def interactive(globalopts, config):
    """Run the interactive prompt until ``exit``, ``quit``, ``q`` or end of
    input.

    Every line is split like a shell command line and run through
    single_request on one shared client context, with ``globalopts`` as the
    defaults. ``help`` and ``?`` are shorthands for ``--help``. While a
    command runs, resolving interactive_expecting_keyboard_interrupt cancels
    it and returns to the prompt. The context is shut down when the prompt is
    left, also if that happens through an exception."""
    global interactive_expecting_keyboard_interrupt
    interactive_expecting_keyboard_interrupt = (
        asyncio.get_running_loop().create_future()
    )

    context = await aiocoap.Context.create_client_context(transports=config.transport)

    async def single_request_noexit(*args, **kwargs):
        """Protects a run against the exit automatically generated by
        argparse errors or help"""
        try:
            # We could also set exit_on_error, but this way we do get the
            # original error code and can show that; might still revisit.
            await single_request(*args, **kwargs)
        except SystemExit as e:
            return e.code
        else:
            return 0

    try:
        while True:
            try:
                # when http://bugs.python.org/issue22412 is resolved, use that instead
                line = await asyncio.get_running_loop().run_in_executor(
                    None, lambda: input("aiocoap> ")
                )
            except EOFError:
                line = "exit"
            try:
                line = shlex.split(line)
            except ValueError as e:
                # Raised eg. for unbalanced quotes; a typo should not
                # terminate the whole session.
                print("Invalid command line: %s" % (e,), file=sys.stderr)
                continue
            if not line:
                continue
            if line in (["help"], ["?"]):
                line = ["--help"]
            if line in (["quit"], ["q"], ["exit"]):
                break

            current_task = asyncio.create_task(
                single_request_noexit(line, context=context, globalopts=globalopts),
                name="Interactive prompt command %r" % line,
            )
            interactive_expecting_keyboard_interrupt = (
                asyncio.get_running_loop().create_future()
            )

            done, _ = await asyncio.wait(
                [current_task, interactive_expecting_keyboard_interrupt],
                return_when=asyncio.FIRST_COMPLETED,
            )

            if current_task not in done:
                # The interrupt future completed first: abandon the command.
                current_task.cancel()
                continue

            try:
                code = await current_task
            except Exception as e:
                print("Unhandled exception raised: %s" % (e,))
                # There is no exit code to report in this case; falling
                # through would use an unset or stale ``code``.
                continue
            if code != 0:
                print("Exit code: %d" % code, file=sys.stderr)
    finally:
        await context.shutdown()
async def main(args=None):
    """Asynchronous entry point of aiocoap-client.

    ``args`` is the list of command line words, defaulting to
    ``sys.argv[1:]``. A tolerant first parsing pass configures logging, loads
    the configuration and decides between running a single request and
    entering interactive mode. A cancelled single request exits with status
    3."""
    # interactive mode is a little messy, that's why this is not using aiocoap.util.cli yet
    if args is None:
        args = sys.argv[1:]

    # This one is tolerant and doesn't even terminate with --help, so that
    # --help and --interactive --help can do the right thing.
    first_parser = build_parser(prescreen=True)
    first_args = first_parser.parse_args(args)

    configure_logging(
        (first_args.verbose or 0) - (first_args.quiet or 0), first_args.color
    )

    if first_args.config is not None:
        try:
            config = aiocoap.config.Config.load_from_file(first_args.config)
        except (OSError, ValueError) as e:
            # OSError is handled like ValueError so that a missing or
            # unreadable file results in a CLI error rather than a backtrace,
            # as is done for credential files.
            # NOTE(review): presumes load_from_file lets OSError from opening
            # the file propagate -- confirm against aiocoap.config.
            first_parser.error(f"Error loading file {first_args.config!r}: {e}")
    else:
        config = aiocoap.config.Config()

    if not first_args.interactive:
        try:
            await single_request_with_context(args, config)
        except asyncio.exceptions.CancelledError:
            sys.exit(3)
    else:
        global_parser = build_parser(use_interactive=False)
        globalopts = global_parser.parse_args(args)

        loop = asyncio.get_running_loop()

        def ctrl_c():
            """SIGINT handler: tell interactive() to abandon the running
            command."""
            try:
                interactive_expecting_keyboard_interrupt.set_result(None)
            except asyncio.exceptions.InvalidStateError:
                # Too many Ctrl-C before the program could clean up
                sys.exit(3)

        loop.add_signal_handler(signal.SIGINT, ctrl_c)

        await interactive(globalopts, config)
def sync_main(args=None):
    """Synchronous entry point: run main() with the given arguments on an
    event loop created for it by asyncio.run."""
    entry_point = main(args=args)
    asyncio.run(entry_point)


# Allow running this file directly as a script.
if __name__ == "__main__":
    sync_main()