Coverage for aiocoap/util/prettyprint.py: 78%
90 statements
« prev ^ index » next coverage.py v7.6.8, created at 2024-11-28 12:34 +0000
« prev ^ index » next coverage.py v7.6.8, created at 2024-11-28 12:34 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5"""A pretty-printer for known mime types"""
7import json
8import re
10import pygments
11import pygments.lexers
12import pygments.formatters
14from aiocoap.util import linkformat, contenttype
16from aiocoap.util.linkformat_pygments import _register
# Runs once at import time. Presumably this makes aiocoap's link-format lexer
# known to pygments -- TODO confirm against aiocoap.util.linkformat_pygments.
_register()

# Media type marking text as a hex dump: pretty_print returns it together with
# hex dump output, and lexer_for_mime maps it to pygments' HexdumpLexer.
MEDIATYPE_HEXDUMP = "text/vnd.aiocoap.hexdump"
def lexer_for_mime(mime):
    """Find a pygments lexer for the media type string ``mime``.

    This behaves like ``pygments.lexers.get_lexer_for_mimetype`` with three
    additions:

    * The custom hex dump media type is answered with a ``HexdumpLexer``.
    * Plain UTF-8 text is deliberately reported as having no lexer.
    * If pygments knows no lexer for a type of the shape
      ``type/something+suffix`` (optionally followed by ``;parameters``), the
      lookup is retried with ``type/suffix``.

    Raises ``pygments.util.ClassNotFound`` if no lexer can be found.
    """
    if mime == MEDIATYPE_HEXDUMP:
        return pygments.lexers.HexdumpLexer()

    if mime == "text/plain;charset=utf8":
        # Plain text is covered by fall-throughs elsewhere, so there is no
        # point in handing out a TextLexer that would not highlight anything.
        raise pygments.util.ClassNotFound

    find_lexer = pygments.lexers.get_lexer_for_mimetype
    suffixed_type = "^([^/]+)/.*\\+([^;]+)(;.*)?$"

    def strip_to_suffix(match):
        # "application/foo+json;x=y" is reduced to "application/json"
        return match[1] + "/" + match[2]

    try:
        return find_lexer(mime)
    except pygments.util.ClassNotFound:
        # If the pattern does not match, the string is unchanged and this
        # second lookup raises ClassNotFound again.
        return find_lexer(re.sub(suffixed_type, strip_to_suffix, mime))
def _hexdump(data):
    """Render the bytes ``data`` as a hex dump with 16 bytes per line.

    Every line consists of the offset, two groups of eight hex bytes, and the
    printable ASCII rendition of the bytes between ``|`` marks (other bytes
    are shown as ``.``). If the data does not end on a 16-byte boundary, a
    final line with the total length is appended. Empty data gives an empty
    string.
    """
    formatted = []
    for offset in range(0, len(data), 16):
        line = data[offset : offset + 16]
        # Bytes missing from a short final line are padded as wide as a real
        # hex byte, so that the ASCII column stays aligned with full lines.
        cells = ["%02x" % byte for byte in line] + ["  "] * (16 - len(line))
        formatted.append(
            "%08x " % offset
            + " ".join(cells[:8])
            + " "
            + " ".join(cells[8:])
            + " |"
            + "".join(chr(x) if 32 <= x < 127 else "." for x in line)
            + "|\n"
        )
    if len(data) % 16 != 0:
        formatted.append("%08x\n" % len(data))
    return "".join(formatted)


def pretty_print(message):
    """Given a CoAP message, reshape its payload into something human-readable.
    The return value is a triple (infos, mime, text) where text represents the
    payload, mime is a type that could be used to syntax-highlight the text
    (not necessarily related to the original mime type, eg. a report of some
    binary data that's shaped like Markdown could use a markdown mime type),
    and some line of infos that give additional data (like the reason for a hex
    dump or the original mime type).

    The content format is taken from the message itself; only if the message
    has none, the Accept option of the associated request (if there is one) is
    used instead. Payloads in a content format with a non-identity encoding are
    not decompressed, and are treated as application/octet-stream.

    >>> from aiocoap import Message
    >>> def build(payload, request_cf, response_cf):
    ...     response = Message(payload=payload, content_format=response_cf)
    ...     request = Message(accept=request_cf)
    ...     response.request = request
    ...     return response
    >>> pretty_print(Message(payload=b"Hello", content_format=0))
    ([], 'text/plain;charset=utf8', 'Hello')
    >>> print(pretty_print(Message(payload=b'{"hello":"world"}', content_format=50))[-1])
    {
        "hello": "world"
    }
    >>> # Erroneous inputs still go to the pretty printer as long as they're
    >>> # Unicode
    >>> pretty_print(Message(payload=b'{"hello":"world', content_format=50))
    (['Invalid JSON not re-formated'], 'application/json', '{"hello":"world')
    >>> pretty_print(Message(payload=b'<>,', content_format=40))
    (['Invalid application/link-format content was not re-formatted'], 'application/link-format', '<>,')
    >>> pretty_print(Message(payload=b'a', content_format=60)) # doctest: +ELLIPSIS
    (['Showing hex dump of application/cbor payload: CBOR value is invalid'], 'text/vnd.aiocoap.hexdump', '00000000 61 ...
    """
    infos = []
    info = infos.append

    # Test for None explicitly rather than for truthiness: content format 0 is
    # a valid value (plain text in the example above) and must not be mistaken
    # for an absent option. A message without an associated request simply has
    # no fallback.
    cf = message.opt.content_format
    if cf is None:
        request = getattr(message, "request", None)
        if request is not None:
            cf = request.opt.accept

    if cf is None:
        content_type = "type unknown"
    elif cf.is_known():
        content_type = cf.media_type
        if cf.encoding != "identity":
            info(
                "Content format is %s in %s encoding; treating as "
                "application/octet-stream because decompression is not "
                "supported yet" % (cf.media_type, cf.encoding)
            )
            # Do what the info line announces: without this, the still
            # encoded bytes would be fed to the parser of the inner type.
            content_type = "application/octet-stream"
    else:
        content_type = "type %d" % cf
    category = contenttype.categorize(content_type)

    # Set to the reason for a hex dump once a format specific attempt failed
    # in a way that rules out showing the payload as text.
    show_hex = None

    if linkformat is not None and category == "link-format":
        try:
            decoded = message.payload.decode("utf8")
            try:
                parsed = linkformat.link_header.parse(decoded)
            except linkformat.link_header.ParseException:
                info("Invalid application/link-format content was not re-formatted")
                return (infos, "application/link-format", decoded)
            else:
                info("application/link-format content was re-formatted")
                prettyprinted = ",\n".join(str(link) for link in parsed.links)
                return (infos, "application/link-format", prettyprinted)
        except ValueError:
            # Handled later
            pass

    elif category in ("cbor", "cbor-seq"):
        if category == "cbor-seq":
            # Faking an indefinite length CBOR array is the easiest way to
            # parse an array into a list-like data structure, especially as
            # long as we don't indicate precise locations of invalid CBOR
            # anyway
            payload = b"\x9f" + message.payload + b"\xff"
        else:
            payload = message.payload

        try:
            import cbor_diag

            formatted = cbor_diag.cbor2diag(payload)

            if category == "cbor-seq":
                info("CBOR sequence message shown as array in Diagnostic Notation")
            else:
                info("CBOR message shown in Diagnostic Notation")

            # It's not exactly CDDL, but it's close enough that the syntax
            # highlighting looks OK, and tolerant enough to not complain about
            # missing leading barewords and "=" signs
            return (infos, "text/x-cddl", formatted)
        except ImportError:
            show_hex = "No CBOR pretty-printer available"
        except ValueError:
            show_hex = "CBOR value is invalid"

    elif category == "json":
        try:
            decoded = message.payload.decode("utf8")
        except ValueError:
            # Not UTF-8; the generic fallback below produces the hex dump
            pass
        else:
            try:
                parsed = json.loads(decoded)
            except ValueError:
                info("Invalid JSON not re-formated")
                return (infos, "application/json", decoded)
            else:
                info("JSON re-formated and indented")
                formatted = json.dumps(parsed, indent=4)
                return (infos, "application/json", formatted)

    # That's about the formats we do for now.

    if show_hex is None:
        try:
            text = message.payload.decode("utf8")
        except UnicodeDecodeError:
            show_hex = "Message can not be parsed as UTF-8"
        else:
            return (infos, "text/plain;charset=utf8", text)

    info(
        "Showing hex dump of %s payload%s"
        % (
            content_type if cf is not None else "untyped",
            ": " + show_hex if show_hex is not None else "",
        )
    )
    # Not the most efficient hex dumper, but we won't stream video over
    # this anyway
    return (infos, MEDIATYPE_HEXDUMP, _hexdump(message.payload))