Coverage for src/aiocoap/cli/client.py: 0%
355 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-13 12:14 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-13 12:14 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5"""aiocoap-client is a simple command-line tool for interacting with CoAP servers"""
7import copy
8import sys
9import asyncio
10import argparse
11import logging
12import subprocess
13from pathlib import Path
15import shlex
17# even though not used directly, this has side effects on the input() function
18# used in interactive mode
19try:
20 import readline # noqa: F401
21except ImportError:
22 pass # that's normal on some platforms, and ok since it's just a usability enhancement
24import aiocoap
25import aiocoap.defaults
26import aiocoap.meta
27import aiocoap.proxy.client
28from aiocoap.util import contenttype
29from aiocoap.util.cli import ActionNoYes
30from aiocoap.numbers import ContentFormat
32log = logging.getLogger("coap.aiocoap-client")
def augment_parser_for_global(p, *, prescreen=False):
    """Add the process-wide options to the argument parser ``p``.

    Registered here are the ``-v`` / ``-q`` verbosity counters, ``--version``
    and ``--interactive``. ``prescreen`` is passed in by ``build_parser`` but
    is not consulted by this function.
    """
    # The two verbosity switches only differ in their names and help text;
    # both count how often they were given.
    verbosity_counters = (
        ("-v", "--verbose", "Increase the debug output"),
        ("-q", "--quiet", "Decrease the debug output"),
    )
    for short_flag, long_flag, description in verbosity_counters:
        p.add_argument(short_flag, long_flag, action="count", help=description)

    version_banner = "%(prog)s " + aiocoap.meta.version
    p.add_argument("--version", version=version_banner, action="version")

    p.add_argument(
        "--interactive",
        action="store_true",
        help="Enter interactive mode. Combine with --help or run `help` interactively to see which options apply where; some can be used globally and overwritten locally.",
    )
def augment_parser_for_either(p):
    """Add the options that ``build_parser`` registers on every parser.

    These are the output switches (``--color``, ``--pretty-print``) and the
    request routing / security options (``--proxy``, ``--credentials``,
    ``--no-set-hostname``).
    """
    # Both output switches default to None, which lets later code tell "not
    # given" apart from an explicit choice.
    output_switches = {
        "--color": "Color output (default on TTYs if all required modules are installed)",
        "--pretty-print": "Pretty-print known content formats (default on TTYs if all required modules are installed)",
    }
    for flag, description in output_switches.items():
        p.add_argument(flag, action=ActionNoYes, default=None, help=description)

    p.add_argument(
        "--proxy", metavar="URI", help="Relay the CoAP request to a proxy for execution"
    )
    p.add_argument(
        "--credentials",
        type=Path,
        help="Load credentials to use from a given file",
    )
    # Stored inverted: giving the flag turns ``set_hostname`` off.
    p.add_argument(
        "--no-set-hostname",
        dest="set_hostname",
        default=True,
        action="store_false",
        help="Suppress transmission of Uri-Host even if the host name is not an IP literal",
    )
def augment_parser_for_interactive(p, *, prescreen=False):
    """Add the options that describe a single request to the parser ``p``.

    With ``prescreen`` set, the positional ``url`` becomes optional so that a
    command line without a URL still parses; otherwise it is required.
    """
    # (flags, keyword arguments) in the order they are registered, which is
    # also the order in which they show up in the help output.
    request_options = (
        (
            ("--non",),
            {
                "help": "Send request as non-confirmable (NON) message",
                "action": "store_true",
            },
        ),
        (
            ("-m", "--method"),
            {
                "help": "Name or number of request method to use (default: %(default)s)",
                "default": "GET",
            },
        ),
        (
            ("--observe",),
            {
                "help": "Register an observation on the resource",
                "action": "store_true",
            },
        ),
        (
            ("--observe-exec",),
            {
                "help": "Run the specified program whenever the observed resource changes, feeding the response data to its stdin",
                "metavar": "CMD",
            },
        ),
        (
            ("--accept",),
            {
                "help": "Content format to request",
                "metavar": "MIME",
            },
        ),
        (
            ("--payload",),
            {
                "help": "Send X as request payload (eg. with a PUT). If X starts with an '@', its remainder is treated as a file name and read from; '@-' reads from the console. Non-file data may be recoded, see --content-format.",
                "metavar": "X",
            },
        ),
        (
            ("--payload-initial-szx",),
            {
                "help": "Size exponent to limit the initial block's size (0 ≙ 16 Byte, 6 ≙ 1024 Byte)",
                "metavar": "SZX",
                "type": int,
            },
        ),
        (
            ("--content-format",),
            {
                "help": "Content format of the --payload data. If a known format is given and --payload has a non-file argument, the payload is converted from CBOR Diagnostic Notation.",
                "metavar": "MIME",
            },
        ),
    )
    for flags, settings in request_options:
        p.add_argument(*flags, **settings)

    url_nargs = "?" if prescreen else None
    p.add_argument("url", nargs=url_nargs, help="CoAP address to fetch")
def build_parser(*, use_global=True, use_interactive=True, prescreen=False):
    """Assemble an argument parser from the selected option groups.

    ``use_global`` and ``use_interactive`` switch the process-wide and the
    per-request option groups on or off; the shared group is always added.
    With ``prescreen`` set, argparse's own help handling is disabled and
    ``--help`` is recorded as a plain flag instead.
    """
    parser = argparse.ArgumentParser(add_help=not prescreen, description=__doc__)
    if prescreen:
        parser.add_argument("--help", action="store_true")

    # (enabled, augmenter, extra keyword arguments), applied top to bottom.
    stages = (
        (use_global, augment_parser_for_global, {"prescreen": prescreen}),
        (True, augment_parser_for_either, {}),
        (use_interactive, augment_parser_for_interactive, {"prescreen": prescreen}),
    )
    for enabled, augment, extra in stages:
        if enabled:
            augment(parser, **extra)

    return parser
def configure_logging(verbosity, color):
    """Install a log handler and set the level of the ``coap`` loggers.

    :param verbosity: Net verbosity as an integer; 0 is the default, negative
        values silence output and positive values add detail.
    :param color: ``False`` selects plain logging. Any other value makes the
        function try ``colorlog`` first and fall back to plain logging when
        that module cannot be imported.
    """
    if color is not False:
        try:
            import colorlog
        except ImportError:
            color = False
        else:
            colorlog.basicConfig()
    if not color:
        logging.basicConfig()

    coap_logger = logging.getLogger("coap")
    client_logger = logging.getLogger("coap.aiocoap-client")

    if verbosity <= -2:
        # One above CRITICAL: nothing gets through.
        coap_logger.setLevel(logging.CRITICAL + 1)
    elif verbosity == -1:
        coap_logger.setLevel(logging.ERROR)
    elif verbosity == 0:
        coap_logger.setLevel(logging.WARNING)
    elif verbosity == 1:
        # Only this tool's own logger becomes chattier at the first -v.
        coap_logger.setLevel(logging.WARNING)
        client_logger.setLevel(logging.INFO)
    elif verbosity == 2:
        coap_logger.setLevel(logging.INFO)
    elif verbosity == 3:
        coap_logger.setLevel(logging.DEBUG)
    elif verbosity >= 4:
        # This used to be unreachable behind a ">= 3" test. Level 1 rather
        # than 0 is used to let everything through: 0 is NOTSET, which on a
        # non-root logger means "inherit from the parent" and would fall back
        # to the root logger's WARNING.
        coap_logger.setLevel(1)

    client_logger.debug("Logging configured.")
def colored(text, options, tokenlambda):
    """Return ``text`` as a string, colored through pygments if enabled.

    When ``options.color`` is false the text is only converted with ``str``
    and pygments is never imported. Otherwise ``tokenlambda`` is called with
    the ``pygments.token`` module and returns the token type to render the
    text as. Taking a callback means call sites never have to check for
    pygments themselves.
    """
    if options.color:
        from pygments import format as render_tokens
        from pygments import token as token_types
        from pygments.formatters import TerminalFormatter

        token_stream = [(tokenlambda(token_types), str(text))]
        return render_tokens(token_stream, TerminalFormatter())

    return str(text)
def incoming_observation(options, response):
    """Handle one Observe notification.

    The notification is logged at INFO level. With ``--observe-exec`` set,
    its payload is fed to the configured command on stdin. Otherwise a
    ``---`` separator line is written to stdout, followed by the payload on
    stdout for successful responses, or by the response code and any payload
    on stderr for unsuccessful ones.

    :param options: Parsed options of the request that set up the observation.
    :param response: The notification message.
    """
    log.info("Received Observe notification:")
    for line in message_to_text(response, "from"):
        log.info(line)

    if options.observe_exec:
        p = subprocess.Popen(options.observe_exec, shell=True, stdin=subprocess.PIPE)
        # FIXME this blocks
        p.communicate(response.payload)
        return

    # The separator is a line of its own, otherwise it runs into the payload
    # that follows it on the same stream.
    separator = colored("---", options, lambda token: token.Comment.Preproc)
    sys.stdout.write(separator + "\n")

    if response.code.is_successful():
        # Same stream as the separator and as the initial response shown by
        # single_request: data goes to stdout, only diagnostics to stderr.
        present(response, options)
    else:
        # Push the separator out first so that it is not overtaken by the
        # error text on stderr.
        sys.stdout.flush()
        print(
            colored(response.code, options, lambda token: token.Token.Generic.Error),
            file=sys.stderr,
        )
        if response.payload:
            present(response, options, file=sys.stderr)
def apply_credentials(context, credentials, errfn):
    """Load client credentials from a file into ``context.client_credentials``.

    :param context: Object whose ``client_credentials.load_from_dict`` is
        called with the parsed file content.
    :param credentials: ``pathlib.Path`` of the credentials file. The suffix
        selects the parser: ``.json`` for JSON, ``.diag`` for CBOR diagnostic
        notation (needs the ``cbor2`` and ``cbor_diag`` packages).
    :param errfn: Called with an error message for every failure; whatever it
        returns is raised. In this file it is ``ArgumentParser.error``.
    """
    try:
        if credentials.suffix == ".json":
            import json

            # read_bytes() closes the file again, unlike a bare open().
            parsed = json.loads(credentials.read_bytes())
        elif credentials.suffix == ".diag":
            try:
                import cbor_diag
                import cbor2
            except ImportError:
                raise errfn(
                    "Loading credentials in CBOR diagnostic format requires cbor2 and cbor_diag package"
                )
            parsed = cbor2.loads(cbor_diag.diag2cbor(credentials.read_text()))
        else:
            raise errfn(
                "Unknown suffix: %s (expected: .json or .diag)" % (credentials.suffix)
            )
        context.client_credentials.load_from_dict(parsed)
    except FileNotFoundError as e:
        raise errfn("Credential file not found: %s" % e.filename) from e
    except (OSError, ValueError) as e:
        # Any of the parsers could reasonably raise those, and while they don't
        # have HelpfulError support, they should still not render a backtrace
        # but a proper CLI error.
        raise errfn("Processing credential file: %s" % e) from e
def message_to_text(m, direction):
    """Yield text lines describing message ``m``, similar to the notation RFCs
    use for message examples.

    The first line holds the code, ``direction`` (a word such as "to" or
    "from") and the remote's scheme and host information. One line per option
    follows, then a payload summary that shows at most the first 16 bytes in
    hex.

    Should this become a method of the message, it will have to find out the
    direction by itself.
    """
    if m.remote is not None:
        # FIXME: Update when transport-indication is available
        # FIXME: This is slightly wrong because it does not account for what ProxyRedirector does
        peer = f"{m.remote.scheme}://{m.remote.hostinfo}"
    else:
        # This happens when unprocessable remotes are, eg. putting in an HTTP URI
        peer = "(unknown)"
    yield f"{m.code} {direction} {peer}"

    for option in m.opt.option_list():
        number = option.number
        # Option numbers that carry a name are shown by name with the number
        # in parentheses; all others by their bare number.
        if hasattr(number, "name"):
            label = f"{number.name_printable} ({number.value})"
        else:
            label = f"{number.value}"
        yield f"- {label}: {option.value!r}"

    body = m.payload
    if not body:
        yield "No payload"
        return

    limit = 16
    shown = body[:limit].hex()
    if len(body) > limit:
        yield f"Payload: {shown}... ({len(body)} bytes total)"
    else:
        yield f"Payload: {shown} ({len(body)} bytes)"
def present(message, options, file=sys.stdout):
    """Write a message payload to the output, pretty printing and/or coloring
    it as configured in the options.

    :param message: Message to show. Its Location-Path / Location-Query
        options are reported on stderr unless ``options.quiet`` is set.
    :param options: Parsed options; ``quiet``, ``pretty_print`` and ``color``
        are read.
    :param file: Text stream that receives the payload. Payloads that are not
        available as text are written through its ``buffer`` attribute.

    Nothing is written for a message without payload. On a terminal, a
    trailing newline is added when the payload does not end in one.
    """
    if not options.quiet and (message.opt.location_path or message.opt.location_query):
        # FIXME: Percent encoding is completely missing; this would be done
        # most easily with a CRI library
        location_ref = "/" + "/".join(message.opt.location_path)
        if message.opt.location_query:
            location_ref += "?" + "&".join(message.opt.location_query)
        print(
            colored(
                f"Location options indicate new resource: {location_ref}",
                options,
                lambda token: token.Token.Generic.Inserted,
            ),
            file=sys.stderr,
        )

    if not message.payload:
        return

    payload = None

    # Tested against None rather than by truthiness: content format number 0
    # is a valid format but falsy, and must not be replaced by the request's
    # content format.
    cf = message.opt.content_format
    if cf is None:
        cf = message.request.opt.content_format
    if cf is not None and cf.is_known():
        mime = cf.media_type
    else:
        mime = "application/octet-stream"
    if options.pretty_print:
        from aiocoap.util.prettyprint import pretty_print

        prettyprinted = pretty_print(message)
        if prettyprinted is not None:
            (infos, mime, payload) = prettyprinted
            if not options.quiet:
                for i in infos:
                    print(
                        colored("# " + i, options, lambda token: token.Comment),
                        file=sys.stderr,
                    )

    color = options.color
    if color:
        from aiocoap.util.prettyprint import lexer_for_mime

        # Import the submodule explicitly; "import pygments" alone does not
        # promise that pygments.util is loaded.
        import pygments.util

        try:
            lexer = lexer_for_mime(mime)
        except pygments.util.ClassNotFound:
            color = False

    if color and payload is None:
        # Coloring requires a unicode-string style payload, either from the
        # mime type or from the pretty printer.
        try:
            payload = message.payload.decode("utf8")
        except UnicodeDecodeError:
            color = False

    if color:
        from pygments.formatters import TerminalFormatter
        from pygments import highlight

        highlit = highlight(
            payload,
            lexer,
            TerminalFormatter(),
        )
        # The TerminalFormatter already adds an end-of-line character, not
        # trying to add one for any missing trailing newlines.
        print(highlit, file=file, end="")
        file.flush()
    else:
        if payload is None:
            # Text written to `file` earlier may still sit in the text layer;
            # flush it so the raw bytes do not overtake it.
            file.flush()
            file.buffer.write(message.payload)
            if file.isatty() and message.payload[-1:] != b"\n":
                file.write("\n")
        else:
            file.write(payload)
            # endswith() also copes with an empty pretty-printed payload,
            # where indexing the last character would raise IndexError.
            if file.isatty() and not payload.endswith("\n"):
                file.write("\n")
async def single_request(args, context, globalopts=None):
    """Parse one request command line, send the request and present the reply.

    :param args: Argument strings describing the request.
    :param context: Object whose ``request`` method sends the message; it is
        wrapped in a ``ProxyForwarder`` when ``--proxy`` names a proxy.
    :param globalopts: Namespace of already parsed global options, or
        ``None``. A shallow copy of it is the namespace ``args`` are parsed
        into. When it is ``None``, the global options are accepted in
        ``args`` themselves.

    Usage errors leave through ``parser.error``. Unsuccessful responses,
    ``HelpfulError`` and ``OSError`` are reported on stderr and end in
    ``sys.exit(1)``. With ``--observe``, the coroutine keeps running until the
    observation reports its end.
    """
    parser = build_parser(use_global=globalopts is None)
    options = parser.parse_args(args, copy.copy(globalopts))

    pretty_print_modules = aiocoap.defaults.prettyprint_missing_modules()
    if pretty_print_modules and (options.color is True or options.pretty_print is True):
        parser.error(
            "Color and pretty printing require the following"
            " additional module(s) to be installed: %s"
            % ", ".join(pretty_print_modules)
        )
    # Unset switches default to "on" only for terminals with all modules
    # available.
    if options.color is None:
        options.color = sys.stdout.isatty() and not pretty_print_modules
    if options.pretty_print is None:
        options.pretty_print = sys.stdout.isatty() and not pretty_print_modules

    try:
        # The method is looked up by name first, then tried as a number.
        try:
            code = getattr(
                aiocoap.numbers.codes.Code,
                options.method.upper().replace("IPATCH", "iPATCH"),
            )
        except AttributeError:
            try:
                code = aiocoap.numbers.codes.Code(int(options.method))
            except ValueError:
                raise parser.error("Unknown method")

        if options.credentials is not None:
            apply_credentials(context, options.credentials, parser.error)

        request = aiocoap.Message(
            code=code, mtype=aiocoap.NON if options.non else aiocoap.CON
        )
        request.set_request_uri(options.url, set_uri_host=options.set_hostname)

        # Content formats are accepted as a number or as a media type name.
        if options.accept:
            try:
                request.opt.accept = ContentFormat(int(options.accept))
            except ValueError:
                try:
                    request.opt.accept = ContentFormat.by_media_type(options.accept)
                except KeyError:
                    raise parser.error("Unknown accept type")

        if options.observe:
            request.opt.observe = 0
            observation_is_over = asyncio.get_event_loop().create_future()

        if options.content_format:
            try:
                request.opt.content_format = ContentFormat(int(options.content_format))
            except ValueError:
                try:
                    request.opt.content_format = ContentFormat.by_media_type(
                        options.content_format
                    )
                except KeyError:
                    raise parser.error("Unknown content format")

        if options.payload:
            if options.payload.startswith("@"):
                filename = options.payload[1:]
                # Opening the file happens inside the try block as well, so
                # that a missing or unreadable file is reported as a usage
                # error rather than by the generic OSError fallback at the
                # end of this function. The file is closed after reading.
                try:
                    if filename == "-":
                        request.payload = sys.stdin.buffer.read()
                    else:
                        with open(filename, "rb") as f:
                            request.payload = f.read()
                except OSError as e:
                    raise parser.error("File could not be opened: %s" % e)
            else:
                request_classification = contenttype.categorize(
                    request.opt.content_format.media_type
                    if request.opt.content_format is not None
                    and request.opt.content_format.is_known()
                    else ""
                )
                if request_classification in ("cbor", "cbor-seq"):
                    try:
                        import cbor_diag
                    except ImportError as e:
                        raise parser.error(f"CBOR recoding not available ({e})")

                    try:
                        encoded = cbor_diag.diag2cbor(options.payload)
                    except ValueError as e:
                        raise parser.error(
                            f"Parsing CBOR diagnostic notation failed. Make sure quotation marks are escaped from the shell. Error: {e}"
                        )

                    if request_classification == "cbor-seq":
                        try:
                            import cbor2
                        except ImportError as e:
                            raise parser.error(
                                f"CBOR sequence recoding not available ({e})"
                            )
                        decoded = cbor2.loads(encoded)
                        if not isinstance(decoded, list):
                            raise parser.error(
                                "CBOR sequence recoding requires an array as the top-level element."
                            )
                        # A sequence is the concatenation of the encoded
                        # array items.
                        request.payload = b"".join(cbor2.dumps(d) for d in decoded)
                    else:
                        request.payload = encoded
                else:
                    request.payload = options.payload.encode("utf8")

        if options.payload_initial_szx is not None:
            request.remote.maximum_block_size_exp = options.payload_initial_szx

        if options.proxy is None or options.proxy in ("none", "", "-"):
            interface = context
        else:
            interface = aiocoap.proxy.client.ProxyForwarder(options.proxy, context)

        requested_uri = request.get_request_uri()

        log.info("Sending request:")
        for line in message_to_text(request, "to"):
            log.info(line)

        requester = interface.request(request)

        if options.observe:
            requester.observation.register_errback(observation_is_over.set_result)
            requester.observation.register_callback(
                lambda data, options=options: incoming_observation(options, data)
            )

        try:
            response_data = await requester.response
        finally:
            if not requester.response.done():
                requester.response.cancel()
            if options.observe and not requester.observation.cancelled:
                requester.observation.cancel()

        log.info("Received response:")
        for line in message_to_text(response_data, "from"):
            log.info(line)

        response_uri = response_data.get_request_uri()
        if requested_uri != response_uri:
            print(
                colored(
                    f"Response arrived from different address; base URI is {response_uri}",
                    options,
                    lambda token: token.Generic.Inserted,
                ),
                file=sys.stderr,
            )
        if response_data.code.is_successful():
            present(response_data, options)
        else:
            print(
                colored(response_data.code, options, lambda token: token.Generic.Error),
                file=sys.stderr,
            )
            present(response_data, options, file=sys.stderr)
            sys.exit(1)

        if options.observe:
            exit_reason = await observation_is_over
            print("Observation is over: %r" % (exit_reason,), file=sys.stderr)
    except aiocoap.error.HelpfulError as e:
        print(str(e), file=sys.stderr)
        extra_help = e.extra_help(
            hints=dict(
                original_uri=options.url,
                request=request,
            )
        )
        if extra_help:
            print("Debugging hint:", extra_help, file=sys.stderr)
        sys.exit(1)
    # Fallback while not all backends raise NetworkErrors
    except OSError as e:
        text = str(e)
        if not text:
            text = repr(e)
        if not text:
            # eg ConnectionResetError flying out of a misconfigured SSL server
            text = type(e)
        print(
            "Warning: OS errors should not be raised this way any more.",
            file=sys.stderr,
        )
        # not telling what to do precisely: the form already tells users to
        # include `aiocoap.cli.defaults` output, which is exactly what we
        # need.
        print(
            f"Even if the cause of the error itself is clear, please file an issue at {aiocoap.meta.bugreport_uri}.",
            file=sys.stderr,
        )
        print("Error:", text, file=sys.stderr)
        sys.exit(1)
async def single_request_with_context(args):
    """Run ``single_request`` on a client context created just for this call.

    This is a stop-gap until sync_main is fully async and contexts are handled
    through async context managers. The context is shut down again whether or
    not the request raises.
    """
    client_context = await aiocoap.Context.create_client_context()
    try:
        await single_request(args, client_context)
    finally:
        await client_context.shutdown()
# Future that sync_main resolves when Ctrl-C is pressed during interactive
# mode; `interactive` waits on it next to the running command. It is None
# until `interactive` has started.
interactive_expecting_keyboard_interrupt = None


async def interactive(globalopts):
    """Run the interactive prompt until the user quits.

    Every input line is split like a shell command line and run through
    ``single_request`` on one shared client context. ``help`` / ``?`` show the
    options, ``quit`` / ``q`` / ``exit`` or end-of-file leave the prompt. A
    keyboard interrupt, signalled through
    ``interactive_expecting_keyboard_interrupt``, cancels the running command
    only.

    :param globalopts: Namespace of parsed global options, handed on to every
        command.
    """
    global interactive_expecting_keyboard_interrupt
    interactive_expecting_keyboard_interrupt = asyncio.get_event_loop().create_future()

    context = await aiocoap.Context.create_client_context()

    # The context is shut down on every way out of the prompt loop, as
    # single_request_with_context does for the one-shot mode.
    try:
        while True:
            try:
                # when http://bugs.python.org/issue22412 is resolved, use that instead
                line = await asyncio.get_event_loop().run_in_executor(
                    None, lambda: input("aiocoap> ")
                )
            except EOFError:
                line = "exit"
            try:
                line = shlex.split(line)
            except ValueError as e:
                # shlex rejects eg. unbalanced quotes; that is a typo in one
                # command and no reason to end the whole session.
                print("Invalid command line: %s" % (e,), file=sys.stderr)
                continue
            if not line:
                continue
            if line in (["help"], ["?"]):
                line = ["--help"]
            if line in (["quit"], ["q"], ["exit"]):
                break

            current_task = asyncio.create_task(
                single_request(line, context=context, globalopts=globalopts),
                name="Interactive prompt command %r" % line,
            )
            # A fresh future per command: the previous one may already have
            # been resolved by an earlier interrupt.
            interactive_expecting_keyboard_interrupt = (
                asyncio.get_event_loop().create_future()
            )

            done, pending = await asyncio.wait(
                [current_task, interactive_expecting_keyboard_interrupt],
                return_when=asyncio.FIRST_COMPLETED,
            )

            if current_task not in done:
                # The interrupt future finished first.
                current_task.cancel()
            else:
                try:
                    await current_task
                except SystemExit as e:
                    if e.code != 0:
                        # %s rather than %d: an exit code is not necessarily
                        # an integer.
                        print("Exit code: %s" % (e.code,), file=sys.stderr)
                    continue
                except Exception as e:
                    print("Unhandled exception raised: %s" % (e,))
    finally:
        await context.shutdown()
def sync_main(args=None):
    """Command line entry point of aiocoap-client.

    :param args: Argument strings; defaults to ``sys.argv[1:]``.

    Without ``--interactive`` a single request is run, and the process exits
    with status 3 on a keyboard interrupt. With ``--interactive`` the prompt
    is run on an event loop that keeps going across keyboard interrupts, which
    only cancel the command in progress.
    """
    # interactive mode is a little messy, that's why this is not using aiocoap.util.cli yet
    if args is None:
        args = sys.argv[1:]

    # This one is tolerant and doesn't even terminate with --help, so that
    # --help and --interactive --help can do the right thing.
    first_parser = build_parser(prescreen=True)
    first_args = first_parser.parse_args(args)

    configure_logging(
        (first_args.verbose or 0) - (first_args.quiet or 0), first_args.color
    )

    if not first_args.interactive:
        try:
            asyncio.run(single_request_with_context(args))
        except KeyboardInterrupt:
            sys.exit(3)
    else:
        global_parser = build_parser(use_interactive=False)
        globalopts = global_parser.parse_args(args)

        # Create the loop explicitly: asyncio.get_event_loop() without a
        # running loop is deprecated and raises RuntimeError on current Python
        # versions when no loop has been set.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        task = loop.create_task(
            interactive(globalopts),
            name="Interactive prompt",
        )

        while not task.done():
            try:
                loop.run_until_complete(task)
            except KeyboardInterrupt:
                # The future is still None if the interrupt arrives before
                # the prompt task had its first chance to run.
                pending_interrupt = interactive_expecting_keyboard_interrupt
                if pending_interrupt is not None and not pending_interrupt.done():
                    pending_interrupt.set_result(None)
            except SystemExit:
                continue  # asyncio/tasks.py(242) raises those after setting them as results, but we particularly want them back in the loop


if __name__ == "__main__":
    sync_main()