Coverage for src/aiocoap/cli/client.py: 0%
350 statements
« prev ^ index » next coverage.py v7.7.0, created at 2025-03-20 17:26 +0000
« prev ^ index » next coverage.py v7.7.0, created at 2025-03-20 17:26 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5"""aiocoap-client is a simple command-line tool for interacting with CoAP servers"""
7import copy
8import sys
9import asyncio
10import argparse
11import logging
12import subprocess
13from pathlib import Path
15import shlex
17# even though not used directly, this has side effects on the input() function
18# used in interactive mode
19try:
20 import readline # noqa: F401
21except ImportError:
22 pass # that's normal on some platforms, and ok since it's just a usability enhancement
24import aiocoap
25import aiocoap.defaults
26import aiocoap.meta
27import aiocoap.proxy.client
28from aiocoap.util import contenttype
29from aiocoap.util.cli import ActionNoYes
30from aiocoap.numbers import ContentFormat
32log = logging.getLogger("coap.aiocoap-client")
def augment_parser_for_global(p, *, prescreen=False):
    """Register the program-wide options on the argument parser ``p``.

    Adds the verbosity counters (``-v`` / ``-q``), ``--version`` and
    ``--interactive``. ``prescreen`` is accepted but not used by this
    function.
    """
    # Both verbosity switches are plain counters; sync_main subtracts one from
    # the other to obtain the effective verbosity.
    counters = (
        ("-v", "--verbose", "Increase the debug output"),
        ("-q", "--quiet", "Decrease the debug output"),
    )
    for short_flag, long_flag, description in counters:
        p.add_argument(short_flag, long_flag, help=description, action="count")

    version_banner = "%(prog)s " + aiocoap.meta.version
    p.add_argument("--version", action="version", version=version_banner)

    p.add_argument(
        "--interactive",
        action="store_true",
        help="Enter interactive mode. Combine with --help or run `help` interactively to see which options apply where; some can be used globally and overwritten locally.",
    )
def augment_parser_for_either(p):
    """Register the options that build_parser adds to every parser it builds.

    These are the output switches ``--color`` and ``--pretty-print`` (both
    default to ``None`` so that callers can tell "not given" apart from an
    explicit choice), plus ``--proxy``, ``--credentials`` and
    ``--no-set-hostname``.
    """
    tri_state_switches = (
        (
            "--color",
            "Color output (default on TTYs if all required modules are installed)",
        ),
        (
            "--pretty-print",
            "Pretty-print known content formats (default on TTYs if all required modules are installed)",
        ),
    )
    for flag, description in tri_state_switches:
        p.add_argument(flag, action=ActionNoYes, default=None, help=description)

    p.add_argument(
        "--proxy",
        metavar="URI",
        help="Relay the CoAP request to a proxy for execution",
    )
    p.add_argument(
        "--credentials",
        type=Path,
        help="Load credentials to use from a given file",
    )
    # Stored inverted: the option name is negative, the destination positive.
    p.add_argument(
        "--no-set-hostname",
        dest="set_hostname",
        default=True,
        action="store_false",
        help="Suppress transmission of Uri-Host even if the host name is not an IP literal",
    )
def augment_parser_for_interactive(p, *, prescreen=False):
    """Register the per-request options and the ``url`` positional on ``p``.

    With ``prescreen`` set, the ``url`` positional becomes optional
    (``nargs="?"``); otherwise it is a required single argument.
    """
    register = p.add_argument

    register(
        "--non",
        action="store_true",
        help="Send request as non-confirmable (NON) message",
    )
    register(
        "-m",
        "--method",
        default="GET",
        help="Name or number of request method to use (default: %(default)s)",
    )
    register(
        "--observe",
        action="store_true",
        help="Register an observation on the resource",
    )
    register(
        "--observe-exec",
        metavar="CMD",
        help="Run the specified program whenever the observed resource changes, feeding the response data to its stdin",
    )
    register(
        "--accept",
        metavar="MIME",
        help="Content format to request",
    )
    register(
        "--payload",
        metavar="X",
        help="Send X as request payload (eg. with a PUT). If X starts with an '@', its remainder is treated as a file name and read from; '@-' reads from the console. Non-file data may be recoded, see --content-format.",
    )
    register(
        "--payload-initial-szx",
        metavar="SZX",
        type=int,
        help="Size exponent to limit the initial block's size (0 ≙ 16 Byte, 6 ≙ 1024 Byte)",
    )
    register(
        "--content-format",
        metavar="MIME",
        help="Content format of the --payload data. If a known format is given and --payload has a non-file argument, the payload is converted from CBOR Diagnostic Notation.",
    )

    # The prescreening parser must tolerate a missing URL.
    if prescreen:
        url_nargs = "?"
    else:
        url_nargs = None
    register("url", nargs=url_nargs, help="CoAP address to fetch")
def build_parser(*, use_global=True, use_interactive=True, prescreen=False):
    """Assemble an ArgumentParser from the option groups defined in this module.

    ``use_global`` and ``use_interactive`` select whether the global and the
    per-request option groups are included; the "either" group is always
    added. With ``prescreen`` set, argparse's own help handling is switched
    off and ``--help`` is registered as a plain flag instead.
    """
    parser = argparse.ArgumentParser(add_help=not prescreen, description=__doc__)

    if prescreen:
        # Keep --help parseable without letting argparse print and exit.
        parser.add_argument("--help", action="store_true")

    if use_global:
        augment_parser_for_global(parser, prescreen=prescreen)

    augment_parser_for_either(parser)

    if use_interactive:
        augment_parser_for_interactive(parser, prescreen=prescreen)

    return parser
def configure_logging(verbosity, color):
    """Install a log handler and set the levels of the "coap" logger tree.

    ``color`` selects the handler setup: unless it is ``False``, colorlog is
    tried first and the plain :mod:`logging` setup is used if it cannot be
    imported.

    ``verbosity`` is the net count of ``-v`` minus ``-q`` flags:

    * ``<= -2``: nothing is shown, not even critical messages
    * ``-1``: errors
    * ``0``: warnings
    * ``1``: warnings, plus info messages of the client itself
    * ``2``: info
    * ``3``: debug
    * ``>= 4``: everything, including records below DEBUG
    """
    if color is not False:
        try:
            import colorlog
        except ImportError:
            color = False
        else:
            colorlog.basicConfig()
    if not color:
        logging.basicConfig()

    coap_logger = logging.getLogger("coap")
    # Same logger object as the module-level ``log``; looked up by name so
    # this function only depends on the logging module.
    client_logger = logging.getLogger("coap.aiocoap-client")

    if verbosity <= -2:
        coap_logger.setLevel(logging.CRITICAL + 1)
    elif verbosity == -1:
        coap_logger.setLevel(logging.ERROR)
    elif verbosity == 0:
        coap_logger.setLevel(logging.WARNING)
    elif verbosity == 1:
        coap_logger.setLevel(logging.WARNING)
        client_logger.setLevel(logging.INFO)
    elif verbosity == 2:
        coap_logger.setLevel(logging.INFO)
    elif verbosity == 3:
        coap_logger.setLevel(logging.DEBUG)
    elif verbosity >= 4:
        # Previously unreachable, because the branch above tested ">= 3".
        # Level 0 (NOTSET) would hand the decision to the root logger, which
        # stays at WARNING, and so would show *less* than -vvv. Level 1 is the
        # lowest explicit level and lets every record through.
        coap_logger.setLevel(1)

    client_logger.debug("Logging configured.")
def colored(text, options, tokenlambda):
    """Return ``str(text)``, colored through pygments when options.color is set.

    ``tokenlambda`` is a callback that receives the ``pygments.token`` module
    and returns the token type to render the text as. Passing a callback
    instead of a token keeps callers from having to import pygments
    themselves, so they need not care whether it is installed when coloring
    is off.
    """
    plain = str(text)
    if options.color:
        # Imported lazily: pygments is only needed when coloring is active.
        from pygments import format as pygments_format, token
        from pygments.formatters import TerminalFormatter

        token_stream = [(tokenlambda(token), plain)]
        return pygments_format(token_stream, TerminalFormatter())
    return plain
def incoming_observation(options, response):
    """Handle one notification of an observed resource.

    The notification is logged at info level. If ``options.observe_exec`` is
    set, that command is run through the shell with the notification payload
    on its stdin; otherwise a ``---`` separator is written to stdout and the
    response (or its error code and payload) is presented on stderr.
    """
    log.info("Received Observe notification:")
    for text_line in message_to_text(response, "from"):
        log.info(text_line)

    if options.observe_exec:
        child = subprocess.Popen(
            options.observe_exec, shell=True, stdin=subprocess.PIPE
        )
        # FIXME this blocks
        child.communicate(response.payload)
        return

    # NOTE(review): the separator goes to stdout without a trailing newline
    # while the payload below goes to stderr -- confirm this split is intended.
    separator = colored("---", options, lambda token: token.Comment.Preproc)
    sys.stdout.write(separator)

    if response.code.is_successful():
        present(response, options, file=sys.stderr)
        return

    # Push out the separator before the error report appears on stderr.
    sys.stdout.flush()
    error_text = colored(
        response.code, options, lambda token: token.Token.Generic.Error
    )
    print(error_text, file=sys.stderr)
    if response.payload:
        present(response, options, file=sys.stderr)
def apply_credentials(context, credentials, errfn):
    """Load client credentials from a file into ``context.client_credentials``.

    ``credentials`` is a path whose suffix selects the format: ``.json`` is
    parsed as JSON, ``.diag`` as CBOR diagnostic notation (which needs the
    cbor2 and cbor_diag packages). The decoded data is handed to
    ``context.client_credentials.load_from_dict``.

    ``errfn`` is called with a message when the suffix is unknown or the CBOR
    packages are missing, and its result is raised. single_request passes
    ``parser.error`` here.
    """
    suffix = credentials.suffix

    if suffix == ".json":
        import json

        # Context manager so the file is closed again; it used to be left
        # open until garbage collection.
        with credentials.open("rb") as json_file:
            loaded = json.load(json_file)
    elif suffix == ".diag":
        try:
            import cbor_diag
            import cbor2
        except ImportError:
            raise errfn(
                "Loading credentials in CBOR diagnostic format requires cbor2 and cbor_diag package"
            )
        with credentials.open() as diag_file:
            loaded = cbor2.loads(cbor_diag.diag2cbor(diag_file.read()))
    else:
        raise errfn(
            "Unknown suffix: %s (expected: .json or .diag)" % (credentials.suffix)
        )

    context.client_credentials.load_from_dict(loaded)
def message_to_text(m, direction):
    """Yield the lines of a textual rendering of message ``m``, in a form
    similar to how messages are shown in RFCs.

    The first line holds the code, the ``direction`` word given by the caller
    and the remote; one line per option follows, and a final line shows up to
    16 bytes of the payload in hex.

    Refactoring this into a message method will need to address the direction
    discovery eventually."""
    if m.remote is not None:
        # FIXME: Update when transport-indication is available
        # FIXME: This is slightly wrong because it does not account for what ProxyRedirector does
        peer = f"{m.remote.scheme}://{m.remote.hostinfo}"
    else:
        # This happens when unprocessable remotes are, eg. putting in an HTTP URI
        peer = "(unknown)"
    yield f"{m.code} {direction} {peer}"

    for opt in m.opt.option_list():
        number = opt.number
        if not hasattr(number, "name"):
            yield f"- {number.value}: {opt.value!r}"
            continue
        yield f"- {number.name_printable} ({number.value}): {opt.value!r}"

    body = m.payload
    if not body:
        yield "No payload"
        return

    limit = 16
    shown_hex = body[:limit].hex()
    total = len(body)
    if total > limit:
        yield f"Payload: {shown_hex}... ({total} bytes total)"
    else:
        yield f"Payload: {shown_hex} ({total} bytes)"
def present(message, options, file=sys.stdout):
    """Write a message payload to the output, pretty printing and/or coloring
    it as configured in the options.

    Location options of the message are reported on stderr first (unless
    ``options.quiet`` is set). Nothing further is written when the message
    has no payload. Raw payloads are written as bytes through
    ``file.buffer``; pretty-printed or colored payloads are written as text.
    On a TTY, a missing trailing newline is added.
    """
    if not options.quiet and (message.opt.location_path or message.opt.location_query):
        # FIXME: Percent encoding is completely missing; this would be done
        # most easily with a CRI library
        location_ref = "/" + "/".join(message.opt.location_path)
        if message.opt.location_query:
            location_ref += "?" + "&".join(message.opt.location_query)
        print(
            colored(
                f"Location options indicate new resource: {location_ref}",
                options,
                lambda token: token.Token.Generic.Inserted,
            ),
            file=sys.stderr,
        )

    if not message.payload:
        return

    payload = None

    # Fall back to the request's content format only when the response has
    # none at all. An explicit None test (rather than `or`) keeps a content
    # format that happens to be falsy from being skipped.
    cf = message.opt.content_format
    if cf is None:
        cf = message.request.opt.content_format
    if cf is not None and cf.is_known():
        mime = cf.media_type
    else:
        mime = "application/octet-stream"
    if options.pretty_print:
        from aiocoap.util.prettyprint import pretty_print

        prettyprinted = pretty_print(message)
        if prettyprinted is not None:
            (infos, mime, payload) = prettyprinted
            if not options.quiet:
                for i in infos:
                    print(
                        colored("# " + i, options, lambda token: token.Comment),
                        file=sys.stderr,
                    )

    color = options.color
    if color:
        from aiocoap.util.prettyprint import lexer_for_mime

        # Imported by name so the except clause does not depend on
        # pygments.util already being loaded as an attribute of pygments.
        from pygments.util import ClassNotFound

        try:
            lexer = lexer_for_mime(mime)
        except ClassNotFound:
            color = False

    if color and payload is None:
        # Coloring requires a unicode-string style payload, either from the
        # mime type or from the pretty printer.
        try:
            payload = message.payload.decode("utf8")
        except UnicodeDecodeError:
            color = False

    if color:
        from pygments.formatters import TerminalFormatter
        from pygments import highlight

        highlit = highlight(
            payload,
            lexer,
            TerminalFormatter(),
        )
        # The TerminalFormatter already adds an end-of-line character, not
        # trying to add one for any missing trailing newlines.
        print(highlit, file=file, end="")
        file.flush()
    else:
        if payload is None:
            file.buffer.write(message.payload)
            if file.isatty() and message.payload[-1:] != b"\n":
                file.write("\n")
        else:
            file.write(payload)
            # Slice instead of index: an empty pretty-printed string must not
            # raise IndexError.
            if file.isatty() and payload[-1:] != "\n":
                file.write("\n")
async def single_request(args, context, globalopts=None):
    """Parse one command line and perform the CoAP request it describes.

    ``args`` is the argument list for this request and ``context`` the
    aiocoap context to send it through (directly, or wrapped in a proxy
    forwarder when ``--proxy`` is given). In interactive mode ``globalopts``
    carries the options parsed at start-up; a copy of it is the namespace the
    per-request arguments are parsed into.

    The response is written through present(). The process is exited with
    status 1 (``sys.exit``) on an unsuccessful response code, on a
    HelpfulError and on an OSError; invalid arguments are reported through
    ``parser.error``.
    """
    parser = build_parser(use_global=globalopts is None)
    options = parser.parse_args(args, copy.copy(globalopts))

    pretty_print_modules = aiocoap.defaults.prettyprint_missing_modules()
    if pretty_print_modules and (options.color is True or options.pretty_print is True):
        parser.error(
            "Color and pretty printing require the following"
            " additional module(s) to be installed: %s"
            % ", ".join(pretty_print_modules)
        )
    # When not requested explicitly, color and pretty printing are enabled
    # only on a TTY and only if nothing they need is missing.
    if options.color is None:
        options.color = sys.stdout.isatty() and not pretty_print_modules
    if options.pretty_print is None:
        options.pretty_print = sys.stdout.isatty() and not pretty_print_modules

    try:
        # The method may be given by name or by number.
        try:
            code = getattr(
                aiocoap.numbers.codes.Code,
                options.method.upper().replace("IPATCH", "iPATCH"),
            )
        except AttributeError:
            try:
                code = aiocoap.numbers.codes.Code(int(options.method))
            except ValueError:
                raise parser.error("Unknown method")

        if options.credentials is not None:
            apply_credentials(context, options.credentials, parser.error)

        request = aiocoap.Message(
            code=code, mtype=aiocoap.NON if options.non else aiocoap.CON
        )
        request.set_request_uri(options.url, set_uri_host=options.set_hostname)

        # Content formats may be given by number or by media type.
        if options.accept:
            try:
                request.opt.accept = ContentFormat(int(options.accept))
            except ValueError:
                try:
                    request.opt.accept = ContentFormat.by_media_type(options.accept)
                except KeyError:
                    raise parser.error("Unknown accept type")

        if options.observe:
            request.opt.observe = 0
            observation_is_over = asyncio.get_event_loop().create_future()

        if options.content_format:
            try:
                request.opt.content_format = ContentFormat(int(options.content_format))
            except ValueError:
                try:
                    request.opt.content_format = ContentFormat.by_media_type(
                        options.content_format
                    )
                except KeyError:
                    raise parser.error("Unknown content format")

        if options.payload:
            if options.payload.startswith("@"):
                filename = options.payload[1:]
                # Opening happens inside the try block so that a missing or
                # unreadable file is reported here rather than by the generic
                # OSError fallback at the end; the file is closed after
                # reading.
                try:
                    if filename == "-":
                        request.payload = sys.stdin.buffer.read()
                    else:
                        with open(filename, "rb") as f:
                            request.payload = f.read()
                except OSError as e:
                    raise parser.error("File could not be opened: %s" % e)
            else:
                request_classification = contenttype.categorize(
                    request.opt.content_format.media_type
                    if request.opt.content_format is not None
                    and request.opt.content_format.is_known()
                    else ""
                )
                if request_classification in ("cbor", "cbor-seq"):
                    try:
                        import cbor_diag
                    except ImportError as e:
                        raise parser.error(f"CBOR recoding not available ({e})")

                    try:
                        encoded = cbor_diag.diag2cbor(options.payload)
                    except ValueError as e:
                        raise parser.error(
                            f"Parsing CBOR diagnostic notation failed. Make sure quotation marks are escaped from the shell. Error: {e}"
                        )

                    if request_classification == "cbor-seq":
                        try:
                            import cbor2
                        except ImportError as e:
                            raise parser.error(
                                f"CBOR sequence recoding not available ({e})"
                            )
                        decoded = cbor2.loads(encoded)
                        if not isinstance(decoded, list):
                            raise parser.error(
                                "CBOR sequence recoding requires an array as the top-level element."
                            )
                        # A CBOR sequence is the concatenation of the
                        # individually encoded array items.
                        request.payload = b"".join(cbor2.dumps(d) for d in decoded)
                    else:
                        request.payload = encoded
                else:
                    request.payload = options.payload.encode("utf8")

        if options.payload_initial_szx is not None:
            request.remote.maximum_block_size_exp = options.payload_initial_szx

        if options.proxy is None or options.proxy in ("none", "", "-"):
            interface = context
        else:
            interface = aiocoap.proxy.client.ProxyForwarder(options.proxy, context)

        requested_uri = request.get_request_uri()

        log.info("Sending request:")
        for line in message_to_text(request, "to"):
            log.info(line)

        requester = interface.request(request)

        if options.observe:
            requester.observation.register_errback(observation_is_over.set_result)
            requester.observation.register_callback(
                lambda data, options=options: incoming_observation(options, data)
            )

        # NOTE(review): this cleanup runs right after the first response has
        # been awaited, also on success -- confirm that cancelling the
        # observation at this point is intended for --observe.
        try:
            response_data = await requester.response
        finally:
            if not requester.response.done():
                requester.response.cancel()
            if options.observe and not requester.observation.cancelled:
                requester.observation.cancel()

        log.info("Received response:")
        for line in message_to_text(response_data, "from"):
            log.info(line)

        response_uri = response_data.get_request_uri()
        if requested_uri != response_uri:
            print(
                colored(
                    f"Response arrived from different address; base URI is {response_uri}",
                    options,
                    lambda token: token.Generic.Inserted,
                ),
                file=sys.stderr,
            )
        if response_data.code.is_successful():
            present(response_data, options)
        else:
            print(
                colored(response_data.code, options, lambda token: token.Generic.Error),
                file=sys.stderr,
            )
            present(response_data, options, file=sys.stderr)
            sys.exit(1)

        if options.observe:
            exit_reason = await observation_is_over
            print("Observation is over: %r" % (exit_reason,), file=sys.stderr)
    except aiocoap.error.HelpfulError as e:
        print(str(e), file=sys.stderr)
        extra_help = e.extra_help(
            hints=dict(
                original_uri=options.url,
                request=request,
            )
        )
        if extra_help:
            print("Debugging hint:", extra_help, file=sys.stderr)
        sys.exit(1)
    # Fallback while not all backends raise NetworkErrors
    except OSError as e:
        text = str(e)
        if not text:
            text = repr(e)
        if not text:
            # eg ConnectionResetError flying out of a misconfigured SSL server
            text = type(e)
        print(
            "Warning: OS errors should not be raised this way any more.",
            file=sys.stderr,
        )
        # not telling what to do precisely: the form already tells users to
        # include `aiocoap.cli.defaults` output, which is exactly what we
        # need.
        print(
            f"Even if the cause of the error itself is clear, please file an issue at {aiocoap.meta.bugreport_uri}.",
            file=sys.stderr,
        )
        print("Error:", text, file=sys.stderr)
        sys.exit(1)
async def single_request_with_context(args):
    """Run single_request on a client context created just for this call.

    The context is shut down afterwards whether or not the request
    succeeded. This wrapper exists until sync_main gets made fully async, and
    async context managers are used to manage contexts.
    """
    client_context = await aiocoap.Context.create_client_context()
    try:
        await single_request(args, client_context)
    finally:
        await client_context.shutdown()
# Future that sync_main resolves when Ctrl-C is pressed; interactive()
# replaces it with a fresh one for every command it starts.
interactive_expecting_keyboard_interrupt = None


async def interactive(globalopts):
    """Run the interactive prompt until the user quits.

    Each input line is split like a shell command line and run through
    single_request() as its own task, sharing one client context and the
    start-up options in ``globalopts``. ``help`` / ``?`` show the help text;
    ``quit``, ``q``, ``exit`` or end-of-input leave the prompt. A running
    command is cancelled when the keyboard-interrupt future resolves first.
    """
    global interactive_expecting_keyboard_interrupt
    interactive_expecting_keyboard_interrupt = asyncio.get_event_loop().create_future()

    context = await aiocoap.Context.create_client_context()

    # try/finally so the context is shut down even if the loop is left
    # through an exception, as single_request_with_context does.
    try:
        while True:
            try:
                # when http://bugs.python.org/issue22412 is resolved, use that instead
                line = await asyncio.get_event_loop().run_in_executor(
                    None, lambda: input("aiocoap> ")
                )
            except EOFError:
                line = "exit"
            try:
                line = shlex.split(line)
            except ValueError as e:
                # Unbalanced quotes and the like: report and prompt again
                # instead of letting the error end the session.
                print("Could not parse command line: %s" % (e,), file=sys.stderr)
                continue
            if not line:
                continue
            if line in (["help"], ["?"]):
                line = ["--help"]
            if line in (["quit"], ["q"], ["exit"]):
                break

            current_task = asyncio.create_task(
                single_request(line, context=context, globalopts=globalopts),
                name="Interactive prompt command %r" % line,
            )
            interactive_expecting_keyboard_interrupt = (
                asyncio.get_event_loop().create_future()
            )

            done, _ = await asyncio.wait(
                [current_task, interactive_expecting_keyboard_interrupt],
                return_when=asyncio.FIRST_COMPLETED,
            )

            if current_task not in done:
                # The interrupt future finished first: abandon the command.
                current_task.cancel()
            else:
                try:
                    await current_task
                except SystemExit as e:
                    if e.code != 0:
                        # %s rather than %d: the exit code is not necessarily
                        # an integer (it may be None or a message).
                        print("Exit code: %s" % (e.code,), file=sys.stderr)
                    continue
                except Exception as e:
                    print("Unhandled exception raised: %s" % (e,))
    finally:
        await context.shutdown()
def sync_main(args=None):
    """Command line entry point.

    ``args`` defaults to ``sys.argv[1:]``. The arguments are pre-screened
    once to configure logging and to find out whether ``--interactive`` was
    given. Without it, a single request is run and the process exits with
    status 3 on Ctrl-C. With it, the interactive prompt is run, and Ctrl-C
    only resolves the future the prompt is waiting on.
    """
    # interactive mode is a little messy, that's why this is not using aiocoap.util.cli yet
    if args is None:
        args = sys.argv[1:]

    # This one is tolerant and doesn't even terminate with --help, so that
    # --help and --interactive --help can do the right thing.
    first_parser = build_parser(prescreen=True)
    first_args = first_parser.parse_args(args)

    configure_logging(
        (first_args.verbose or 0) - (first_args.quiet or 0), first_args.color
    )

    if not first_args.interactive:
        try:
            asyncio.run(single_request_with_context(args))
        except KeyboardInterrupt:
            sys.exit(3)
    else:
        global_parser = build_parser(use_interactive=False)
        globalopts = global_parser.parse_args(args)

        # Create the loop explicitly: asyncio.get_event_loop() without a
        # running loop is deprecated and raises RuntimeError on Python 3.14.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        task = loop.create_task(
            interactive(globalopts),
            name="Interactive prompt",
        )

        while not task.done():
            try:
                loop.run_until_complete(task)
            except KeyboardInterrupt:
                # The future is still None if Ctrl-C arrives before the
                # prompt task has run for the first time.
                pending_interrupt = interactive_expecting_keyboard_interrupt
                if pending_interrupt is not None and not pending_interrupt.done():
                    pending_interrupt.set_result(None)
            except SystemExit:
                continue  # asyncio/tasks.py(242) raises those after setting them as results, but we particularly want them back in the loop
656if __name__ == "__main__":
657 sync_main()