Coverage for aiocoap/util/prettyprint.py: 78%

90 statements  

« prev     ^ index     » next       coverage.py v7.6.3, created at 2024-10-15 22:10 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5"""A pretty-printer for known mime types""" 

6 

7import json 

8import re 

9 

10import pygments 

11import pygments.lexers 

12import pygments.formatters 

13 

14from aiocoap.util import linkformat, contenttype 

15 

16from aiocoap.util.linkformat_pygments import _register 

17 

18_register() 

19 

20MEDIATYPE_HEXDUMP = "text/vnd.aiocoap.hexdump" 

21 

22 

23def lexer_for_mime(mime): 

24 """A wrapper around pygments.lexers.get_lexer_for_mimetype that takes 

25 subtypes into consideration and catches the custom hexdump mime type.""" 

26 

27 if mime == MEDIATYPE_HEXDUMP: 

28 return pygments.lexers.HexdumpLexer() 

29 

30 if mime == "text/plain;charset=utf8": 

31 # We have fall-throughs in place anwyay, no need to go through a no-op 

32 # TextLexer 

33 raise pygments.util.ClassNotFound 

34 

35 try: 

36 return pygments.lexers.get_lexer_for_mimetype(mime) 

37 except pygments.util.ClassNotFound: 

38 mime = re.sub( 

39 "^([^/]+)/.*\\+([^;]+)(;.*)?$", lambda args: args[1] + "/" + args[2], mime 

40 ) 

41 return pygments.lexers.get_lexer_for_mimetype(mime) 

42 

43 

44def pretty_print(message): 

45 """Given a CoAP message, reshape its payload into something human-readable. 

46 The return value is a triple (infos, mime, text) where text represents the 

47 payload, mime is a type that could be used to syntax-highlight the text 

48 (not necessarily related to the original mime type, eg. a report of some 

49 binary data that's shaped like Markdown could use a markdown mime type), 

50 and some line of infos that give additional data (like the reason for a hex 

51 dump or the original mime type). 

52 

53 >>> from aiocoap import Message 

54 >>> def build(payload, request_cf, response_cf): 

55 ... response = Message(payload=payload, content_format=response_cf) 

56 ... request = Message(accept=request_cf) 

57 ... response.request = request 

58 ... return response 

59 >>> pretty_print(Message(payload=b"Hello", content_format=0)) 

60 ([], 'text/plain;charset=utf8', 'Hello') 

61 >>> print(pretty_print(Message(payload=b'{"hello":"world"}', content_format=50))[-1]) 

62 { 

63 "hello": "world" 

64 } 

65 >>> # Erroneous inputs still go to the pretty printer as long as they're 

66 >>> #Unicode 

67 >>> pretty_print(Message(payload=b'{"hello":"world', content_format=50)) 

68 (['Invalid JSON not re-formated'], 'application/json', '{"hello":"world') 

69 >>> pretty_print(Message(payload=b'<>,', content_format=40)) 

70 (['Invalid application/link-format content was not re-formatted'], 'application/link-format', '<>,') 

71 >>> pretty_print(Message(payload=b'a', content_format=60)) # doctest: +ELLIPSIS 

72 (['Showing hex dump of application/cbor payload: CBOR value is invalid'], 'text/vnd.aiocoap.hexdump', '00000000 61 ... 

73 """ 

74 infos = [] 

75 info = infos.append 

76 

77 cf = message.opt.content_format or message.request.opt.accept 

78 if cf is None: 

79 content_type = "type unknown" 

80 elif cf.is_known(): 

81 content_type = cf.media_type 

82 if cf.encoding != "identity": 

83 info( 

84 "Content format is %s in %s encoding; treating as " 

85 "application/octet-stream because decompression is not " 

86 "supported yet" % (cf.media_type, cf.encoding) 

87 ) 

88 else: 

89 content_type = "type %d" % cf 

90 category = contenttype.categorize(content_type) 

91 

92 show_hex = None 

93 

94 if linkformat is not None and category == "link-format": 

95 try: 

96 decoded = message.payload.decode("utf8") 

97 try: 

98 parsed = linkformat.link_header.parse(decoded) 

99 except linkformat.link_header.ParseException: 

100 info("Invalid application/link-format content was not re-formatted") 

101 return (infos, "application/link-format", decoded) 

102 else: 

103 info("application/link-format content was re-formatted") 

104 prettyprinted = ",\n".join(str(link) for link in parsed.links) 

105 return (infos, "application/link-format", prettyprinted) 

106 except ValueError: 

107 # Handled later 

108 pass 

109 

110 elif category in ("cbor", "cbor-seq"): 

111 if category == "cbor-seq": 

112 # Faking an indefinite length CBOR array is the easiest way to 

113 # parse an array into a list-like data structure, especially as 

114 # long as we don't indicate precise locations of invalid CBOR 

115 # anyway 

116 payload = b"\x9f" + message.payload + b"\xff" 

117 else: 

118 payload = message.payload 

119 

120 try: 

121 import cbor_diag 

122 

123 formatted = cbor_diag.cbor2diag(payload) 

124 

125 if category == "cbor-seq": 

126 info("CBOR sequence message shown as array in Diagnostic Notation") 

127 else: 

128 info("CBOR message shown in Diagnostic Notation") 

129 

130 # It's not exactly CDDL, but it's close enough that the syntax 

131 # highlighting looks OK, and tolerant enough to not complain about 

132 # missing leading barewords and "=" signs 

133 return (infos, "text/x-cddl", formatted) 

134 except ImportError: 

135 show_hex = "No CBOR pretty-printer available" 

136 except ValueError: 

137 show_hex = "CBOR value is invalid" 

138 

139 elif category == "json": 

140 try: 

141 decoded = message.payload.decode("utf8") 

142 except ValueError: 

143 pass 

144 else: 

145 try: 

146 parsed = json.loads(decoded) 

147 except ValueError: 

148 info("Invalid JSON not re-formated") 

149 return (infos, "application/json", decoded) 

150 else: 

151 info("JSON re-formated and indented") 

152 formatted = json.dumps(parsed, indent=4) 

153 return (infos, "application/json", formatted) 

154 

155 # That's about the formats we do for now. 

156 

157 if show_hex is None: 

158 try: 

159 text = message.payload.decode("utf8") 

160 except UnicodeDecodeError: 

161 show_hex = "Message can not be parsed as UTF-8" 

162 else: 

163 return (infos, "text/plain;charset=utf8", text) 

164 

165 info( 

166 "Showing hex dump of %s payload%s" 

167 % ( 

168 content_type if cf is not None else "untyped", 

169 ": " + show_hex if show_hex is not None else "", 

170 ) 

171 ) 

172 data = message.payload 

173 # Not the most efficient hex dumper, but we won't stream video over 

174 # this anyway 

175 formatted = [] 

176 offset = 0 

177 while data: 

178 line, data = data[:16], data[16:] 

179 

180 formatted.append( 

181 "%08x " % offset 

182 + " ".join("%02x" % line[i] if i < len(line) else " " for i in range(8)) 

183 + " " 

184 + " ".join( 

185 "%02x" % line[i] if i < len(line) else " " for i in range(8, 16) 

186 ) 

187 + " |" 

188 + "".join(chr(x) if 32 <= x < 127 else "." for x in line) 

189 + "|\n" 

190 ) 

191 

192 offset += len(line) 

193 if offset % 16 != 0: 

194 formatted.append("%08x\n" % offset) 

195 return (infos, MEDIATYPE_HEXDUMP, "".join(formatted))