Coverage for aiocoap/cli/client.py: 72%
360 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-09-30 11:17 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-09-30 11:17 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5"""aiocoap-client is a simple command-line tool for interacting with CoAP servers"""
7import copy
8import sys
9import asyncio
10import argparse
11import logging
12import signal
13import subprocess
14from pathlib import Path
16import shlex
18# even though not used directly, this has side effects on the input() function
19# used in interactive mode
20try:
21 import readline # noqa: F401
22except ImportError:
23 pass # that's normal on some platforms, and ok since it's just a usability enhancement
25import aiocoap
26import aiocoap.defaults
27import aiocoap.meta
28import aiocoap.proxy.client
29from aiocoap.util import contenttype
30from aiocoap.util.cli import ActionNoYes
31from aiocoap.numbers import ContentFormat
33log = logging.getLogger("coap.aiocoap-client")
36def augment_parser_for_global(p, *, prescreen=False):
37 p.add_argument(
38 "-v",
39 "--verbose",
40 help="Increase the debug output",
41 action="count",
42 )
43 p.add_argument(
44 "-q",
45 "--quiet",
46 help="Decrease the debug output",
47 action="count",
48 )
49 p.add_argument(
50 "--version", action="version", version="%(prog)s " + aiocoap.meta.version
51 )
53 p.add_argument(
54 "--interactive",
55 help="Enter interactive mode. Combine with --help or run `help` interactively to see which options apply where; some can be used globally and overwritten locally.",
56 action="store_true",
57 )
60def augment_parser_for_either(p):
61 p.add_argument(
62 "--color",
63 help="Color output (default on TTYs if all required modules are installed)",
64 default=None,
65 action=ActionNoYes,
66 )
67 p.add_argument(
68 "--pretty-print",
69 help="Pretty-print known content formats (default on TTYs if all required modules are installed)",
70 default=None,
71 action=ActionNoYes,
72 )
73 p.add_argument(
74 "--proxy", help="Relay the CoAP request to a proxy for execution", metavar="URI"
75 )
76 p.add_argument(
77 "--credentials",
78 help="Load credentials to use from a given file",
79 type=Path,
80 )
81 p.add_argument(
82 "--no-set-hostname",
83 help="Suppress transmission of Uri-Host even if the host name is not an IP literal",
84 dest="set_hostname",
85 action="store_false",
86 default=True,
87 )
90def augment_parser_for_interactive(p, *, prescreen=False):
91 p.add_argument(
92 "--non",
93 help="Send request as non-confirmable (NON) message",
94 action="store_true",
95 )
96 p.add_argument(
97 "-m",
98 "--method",
99 help="Name or number of request method to use (default: %(default)s)",
100 default="GET",
101 )
102 p.add_argument(
103 "--observe", help="Register an observation on the resource", action="store_true"
104 )
105 p.add_argument(
106 "--observe-exec",
107 help="Run the specified program whenever the observed resource changes, feeding the response data to its stdin",
108 metavar="CMD",
109 )
110 p.add_argument(
111 "--accept",
112 help="Content format to request",
113 metavar="MIME",
114 )
115 p.add_argument(
116 "--payload",
117 help="Send X as request payload (eg. with a PUT). If X starts with an '@', its remainder is treated as a file name and read from; '@-' reads from the console. Non-file data may be recoded, see --content-format.",
118 metavar="X",
119 )
120 p.add_argument(
121 "--payload-initial-szx",
122 help="Size exponent to limit the initial block's size (0 ≙ 16 Byte, 6 ≙ 1024 Byte)",
123 metavar="SZX",
124 type=int,
125 )
126 p.add_argument(
127 "--content-format",
128 help="Content format of the --payload data. If a known format is given and --payload has a non-file argument, the payload is converted from CBOR Diagnostic Notation.",
129 metavar="MIME",
130 )
131 p.add_argument(
132 "url",
133 nargs="?" if prescreen else None,
134 help="CoAP address to fetch",
135 )
138def build_parser(*, use_global=True, use_interactive=True, prescreen=False):
139 p = argparse.ArgumentParser(description=__doc__, add_help=not prescreen)
140 if prescreen:
141 p.add_argument("--help", action="store_true")
142 if use_global:
143 augment_parser_for_global(p, prescreen=prescreen)
144 augment_parser_for_either(p)
145 if use_interactive:
146 augment_parser_for_interactive(p, prescreen=prescreen)
148 return p
151def configure_logging(verbosity, color):
152 if color is not False:
153 try:
154 import colorlog
155 except ImportError:
156 color = False
157 else:
158 colorlog.basicConfig()
159 if not color:
160 logging.basicConfig()
162 if verbosity <= -2:
163 logging.getLogger("coap").setLevel(logging.CRITICAL + 1)
164 elif verbosity == -1:
165 logging.getLogger("coap").setLevel(logging.ERROR)
166 elif verbosity == 0:
167 logging.getLogger("coap").setLevel(logging.WARNING)
168 elif verbosity == 1:
169 logging.getLogger("coap").setLevel(logging.WARNING)
170 logging.getLogger("coap.aiocoap-client").setLevel(logging.INFO)
171 elif verbosity == 2:
172 logging.getLogger("coap").setLevel(logging.INFO)
173 elif verbosity >= 3:
174 logging.getLogger("coap").setLevel(logging.DEBUG)
175 elif verbosity >= 4:
176 logging.getLogger("coap").setLevel(0)
178 log.debug("Logging configured.")
181def colored(text, options, tokenlambda):
182 """Apply pygments based coloring if options.color is set. Tokelambda is a
183 callback to which pygments.token is passed and which returns a token type;
184 this makes it easy to not need to conditionally react to pygments' possible
185 absence in all color locations."""
186 if not options.color:
187 return str(text)
189 from pygments.formatters import TerminalFormatter
190 from pygments import token, format
192 return format(
193 [(tokenlambda(token), str(text))],
194 TerminalFormatter(),
195 )
198def incoming_observation(options, response):
199 log.info("Received Observe notification:")
200 for line in message_to_text(response, "from"):
201 log.info(line)
203 if options.observe_exec:
204 p = subprocess.Popen(options.observe_exec, shell=True, stdin=subprocess.PIPE)
205 # FIXME this blocks
206 p.communicate(response.payload)
207 else:
208 sys.stdout.write(colored("---", options, lambda token: token.Comment.Preproc))
209 if response.code.is_successful():
210 present(response, options, file=sys.stderr)
211 else:
212 sys.stdout.flush()
213 print(
214 colored(
215 response.code, options, lambda token: token.Token.Generic.Error
216 ),
217 file=sys.stderr,
218 )
219 if response.payload:
220 present(response, options, file=sys.stderr)
223def apply_credentials(context, credentials, errfn):
224 try:
225 if credentials.suffix == ".json":
226 import json
228 context.client_credentials.load_from_dict(json.load(credentials.open("rb")))
229 elif credentials.suffix == ".diag":
230 try:
231 import cbor_diag
232 import cbor2
233 except ImportError:
234 raise errfn(
235 "Loading credentials in CBOR diagnostic format requires cbor2 and cbor_diag package"
236 )
237 context.client_credentials.load_from_dict(
238 cbor2.loads(cbor_diag.diag2cbor(credentials.open().read()))
239 )
240 else:
241 raise errfn(
242 "Unknown suffix: %s (expected: .json or .diag)" % (credentials.suffix)
243 )
244 except FileNotFoundError as e:
245 raise errfn("Credential file not found: %s" % e.filename)
246 except (OSError, ValueError) as e:
247 # Any of the parsers could reasonably raise those, and while they don't
248 # have HelpfulError support, they should still not render a backtrace
249 # but a proper CLI error.
250 raise errfn("Processing credential file: %s" % e)
253def message_to_text(m, direction):
254 """Convert a message to a text form similar to how they are shown in RFCs.
256 Refactoring this into a message method will need to address the direction
257 discovery eventually."""
258 if m.remote is None:
259 # This happens when unprocessable remotes are, eg. putting in an HTTP URI
260 yield f"{m.code} {direction} (unknown)"
261 else:
262 # FIXME: Update when transport-indication is available
263 # FIXME: This is slightly wrong because it does not account for what ProxyRedirector does
264 yield f"{m.code} {direction} {m.remote.scheme}://{m.remote.hostinfo}"
265 for opt in m.opt.option_list():
266 if hasattr(opt.number, "name"):
267 yield f"- {opt.number.name_printable} ({opt.number.value}): {opt.value!r}"
268 else:
269 yield f"- {opt.number.value}: {opt.value!r}"
270 if m.payload:
271 limit = 16
272 if len(m.payload) > limit:
273 yield f"Payload: {m.payload[:limit].hex()}... ({len(m.payload)} bytes total)"
274 else:
275 yield f"Payload: {m.payload[:limit].hex()} ({len(m.payload)} bytes)"
276 else:
277 yield "No payload"
280def present(message, options, file=sys.stdout):
281 """Write a message payload to the output, pretty printing and/or coloring
282 it as configured in the options."""
283 if not options.quiet and (message.opt.location_path or message.opt.location_query):
284 # FIXME: Percent encoding is completely missing; this would be done
285 # most easily with a CRI library
286 location_ref = "/" + "/".join(message.opt.location_path)
287 if message.opt.location_query:
288 location_ref += "?" + "&".join(message.opt.location_query)
289 print(
290 colored(
291 f"Location options indicate new resource: {location_ref}",
292 options,
293 lambda token: token.Token.Generic.Inserted,
294 ),
295 file=sys.stderr,
296 )
298 if not message.payload:
299 return
301 payload = None
303 cf = message.opt.content_format or message.request.opt.content_format
304 if cf is not None and cf.is_known():
305 mime = cf.media_type
306 else:
307 mime = "application/octet-stream"
308 if options.pretty_print:
309 from aiocoap.util.prettyprint import pretty_print
311 prettyprinted = pretty_print(message)
312 if prettyprinted is not None:
313 (infos, mime, payload) = prettyprinted
314 if not options.quiet:
315 for i in infos:
316 print(
317 colored("# " + i, options, lambda token: token.Comment),
318 file=sys.stderr,
319 )
321 color = options.color
322 if color:
323 from aiocoap.util.prettyprint import lexer_for_mime
324 import pygments
326 try:
327 lexer = lexer_for_mime(mime)
328 except pygments.util.ClassNotFound:
329 color = False
331 if color and payload is None:
332 # Coloring requires a unicode-string style payload, either from the
333 # mime type or from the pretty printer.
334 try:
335 payload = message.payload.decode("utf8")
336 except UnicodeDecodeError:
337 color = False
339 if color:
340 from pygments.formatters import TerminalFormatter
341 from pygments import highlight
343 highlit = highlight(
344 payload,
345 lexer,
346 TerminalFormatter(),
347 )
348 # The TerminalFormatter already adds an end-of-line character, not
349 # trying to add one for any missing trailing newlines.
350 print(highlit, file=file, end="")
351 file.flush()
352 else:
353 if payload is None:
354 file.buffer.write(message.payload)
355 if file.isatty() and message.payload[-1:] != b"\n":
356 file.write("\n")
357 else:
358 file.write(payload)
359 if file.isatty() and payload[-1] != "\n":
360 file.write("\n")
363async def single_request(args, context, globalopts=None):
364 parser = build_parser(use_global=globalopts is None)
365 options = parser.parse_args(args, copy.copy(globalopts))
367 pretty_print_modules = aiocoap.defaults.prettyprint_missing_modules()
368 if pretty_print_modules and (options.color is True or options.pretty_print is True):
369 parser.error(
370 "Color and pretty printing require the following"
371 " additional module(s) to be installed: %s"
372 % ", ".join(pretty_print_modules)
373 )
374 if options.color is None:
375 options.color = sys.stdout.isatty() and not pretty_print_modules
376 if options.pretty_print is None:
377 options.pretty_print = sys.stdout.isatty() and not pretty_print_modules
379 try:
380 try:
381 code = getattr(
382 aiocoap.numbers.codes.Code,
383 options.method.upper().replace("IPATCH", "iPATCH"),
384 )
385 except AttributeError:
386 try:
387 code = aiocoap.numbers.codes.Code(int(options.method))
388 except ValueError:
389 raise parser.error("Unknown method")
391 if options.credentials is not None:
392 apply_credentials(context, options.credentials, parser.error)
394 request = aiocoap.Message(
395 code=code, mtype=aiocoap.NON if options.non else aiocoap.CON
396 )
397 request.set_request_uri(options.url, set_uri_host=options.set_hostname)
399 if options.accept:
400 try:
401 request.opt.accept = ContentFormat(int(options.accept))
402 except ValueError:
403 try:
404 request.opt.accept = ContentFormat.by_media_type(options.accept)
405 except KeyError:
406 raise parser.error("Unknown accept type")
408 if options.observe:
409 request.opt.observe = 0
410 observation_is_over = asyncio.get_event_loop().create_future()
412 if options.content_format:
413 try:
414 request.opt.content_format = ContentFormat(int(options.content_format))
415 except ValueError:
416 try:
417 request.opt.content_format = ContentFormat.by_media_type(
418 options.content_format
419 )
420 except KeyError:
421 raise parser.error("Unknown content format")
423 if options.payload:
424 if options.payload.startswith("@"):
425 filename = options.payload[1:]
426 if filename == "-":
427 f = sys.stdin.buffer
428 else:
429 f = open(filename, "rb")
430 try:
431 request.payload = f.read()
432 except OSError as e:
433 raise parser.error("File could not be opened: %s" % e)
434 else:
435 request_classification = contenttype.categorize(
436 request.opt.content_format.media_type
437 if request.opt.content_format is not None
438 and request.opt.content_format.is_known()
439 else ""
440 )
441 if request_classification in ("cbor", "cbor-seq"):
442 try:
443 import cbor_diag
444 except ImportError as e:
445 raise parser.error(f"CBOR recoding not available ({e})")
447 try:
448 encoded = cbor_diag.diag2cbor(options.payload)
449 except ValueError as e:
450 raise parser.error(
451 f"Parsing CBOR diagnostic notation failed. Make sure quotation marks are escaped from the shell. Error: {e}"
452 )
454 if request_classification == "cbor-seq":
455 try:
456 import cbor2
457 except ImportError as e:
458 raise parser.error(
459 f"CBOR sequence recoding not available ({e})"
460 )
461 decoded = cbor2.loads(encoded)
462 if not isinstance(decoded, list):
463 raise parser.error(
464 "CBOR sequence recoding requires an array as the top-level element."
465 )
466 request.payload = b"".join(cbor2.dumps(d) for d in decoded)
467 else:
468 request.payload = encoded
469 else:
470 request.payload = options.payload.encode("utf8")
472 if options.payload_initial_szx is not None:
473 request.remote.maximum_block_size_exp = options.payload_initial_szx
475 if options.proxy is None or options.proxy in ("none", "", "-"):
476 interface = context
477 else:
478 interface = aiocoap.proxy.client.ProxyForwarder(options.proxy, context)
480 requested_uri = request.get_request_uri()
482 log.info("Sending request:")
483 for line in message_to_text(request, "to"):
484 log.info(line)
486 requester = interface.request(request)
488 if options.observe:
489 requester.observation.register_errback(observation_is_over.set_result)
490 requester.observation.register_callback(
491 lambda data, options=options: incoming_observation(options, data)
492 )
494 try:
495 response_data = await requester.response
496 finally:
497 if not requester.response.done():
498 requester.response.cancel()
499 if options.observe and not requester.observation.cancelled:
500 requester.observation.cancel()
502 log.info("Received response:")
503 for line in message_to_text(response_data, "from"):
504 log.info(line)
506 response_uri = response_data.get_request_uri()
507 if requested_uri != response_uri:
508 print(
509 colored(
510 f"Response arrived from different address; base URI is {response_uri}",
511 options,
512 lambda token: token.Generic.Inserted,
513 ),
514 file=sys.stderr,
515 )
516 if response_data.code.is_successful():
517 present(response_data, options)
518 else:
519 print(
520 colored(response_data.code, options, lambda token: token.Generic.Error),
521 file=sys.stderr,
522 )
523 present(response_data, options, file=sys.stderr)
524 sys.exit(1)
526 if options.observe:
527 exit_reason = await observation_is_over
528 print("Observation is over: %r" % (exit_reason,), file=sys.stderr)
529 except aiocoap.error.HelpfulError as e:
530 print(str(e), file=sys.stderr)
531 extra_help = e.extra_help(
532 hints=dict(
533 original_uri=options.url,
534 request=request,
535 )
536 )
537 if extra_help:
538 print("Debugging hint:", extra_help, file=sys.stderr)
539 sys.exit(1)
540 # Fallback while not all backends raise NetworkErrors
541 except OSError as e:
542 text = str(e)
543 if not text:
544 text = repr(e)
545 if not text:
546 # eg ConnectionResetError flying out of a misconfigured SSL server
547 text = type(e)
548 print(
549 "Warning: OS errors should not be raised this way any more.",
550 file=sys.stderr,
551 )
552 # not telling what to do precisely: the form already tells users to
553 # include `aiocoap.cli.defaults` output, which is exactly what we
554 # need.
555 print(
556 f"Even if the cause of the error itself is clear, please file an issue at {aiocoap.meta.bugreport_uri}.",
557 file=sys.stderr,
558 )
559 print("Error:", text, file=sys.stderr)
560 sys.exit(1)
563async def single_request_with_context(args):
564 """Wrapper around single_request until sync_main gets made fully async, and
565 async context managers are used to manage contexts."""
566 context = await aiocoap.Context.create_client_context()
567 try:
568 await single_request(args, context)
569 finally:
570 await context.shutdown()
573interactive_expecting_keyboard_interrupt = None
576async def interactive(globalopts):
577 global interactive_expecting_keyboard_interrupt
578 interactive_expecting_keyboard_interrupt = asyncio.get_event_loop().create_future()
580 context = await aiocoap.Context.create_client_context()
582 while True:
583 try:
584 # when http://bugs.python.org/issue22412 is resolved, use that instead
585 line = await asyncio.get_event_loop().run_in_executor(
586 None, lambda: input("aiocoap> ")
587 )
588 except EOFError:
589 line = "exit"
590 line = shlex.split(line)
591 if not line:
592 continue
593 if line in (["help"], ["?"]):
594 line = ["--help"]
595 if line in (["quit"], ["q"], ["exit"]):
596 break
598 async def single_request_noexit(*args, **kwargs):
599 """Protects a run against the exit automatically generated by
600 argparse errors or help"""
601 try:
602 # We could also set exit_on_error, but this way we do get the
603 # original error code and can show that; might still revisit.
604 await single_request(*args, **kwargs)
605 except SystemExit as e:
606 return e.code
607 else:
608 return 0
610 current_task = asyncio.create_task(
611 single_request_noexit(line, context=context, globalopts=globalopts),
612 name="Interactive prompt command %r" % line,
613 )
614 interactive_expecting_keyboard_interrupt = (
615 asyncio.get_event_loop().create_future()
616 )
618 done, pending = await asyncio.wait(
619 [current_task, interactive_expecting_keyboard_interrupt],
620 return_when=asyncio.FIRST_COMPLETED,
621 )
623 if current_task not in done:
624 current_task.cancel()
625 else:
626 try:
627 code = await current_task
628 except Exception as e:
629 print("Unhandled exception raised: %s" % (e,))
630 if code != 0:
631 print("Exit code: %d" % code, file=sys.stderr)
633 await context.shutdown()
636async def main(args=None):
637 # interactive mode is a little messy, that's why this is not using aiocoap.util.cli yet
638 if args is None:
639 args = sys.argv[1:]
641 # This one is tolerant and doesn't even terminate with --help, so that
642 # --help and --interactive --help can do the right thing.
643 first_parser = build_parser(prescreen=True)
644 first_args = first_parser.parse_args(args)
646 configure_logging(
647 (first_args.verbose or 0) - (first_args.quiet or 0), first_args.color
648 )
650 if not first_args.interactive:
651 try:
652 await single_request_with_context(args)
653 except asyncio.exceptions.CancelledError:
654 sys.exit(3)
655 else:
656 global_parser = build_parser(use_interactive=False)
657 globalopts = global_parser.parse_args(args)
659 loop = asyncio.get_event_loop()
661 def ctrl_c():
662 try:
663 interactive_expecting_keyboard_interrupt.set_result(None)
664 except asyncio.exceptions.InvalidStateError:
665 # Too many Ctlr-C before the program could clean up
666 sys.exit(3)
668 loop.add_signal_handler(signal.SIGINT, ctrl_c)
670 await interactive(globalopts)
673def sync_main(args=None):
674 asyncio.run(main(args=args))
677if __name__ == "__main__":
678 sync_main()