Coverage for aiocoap/credentials.py: 79%

169 statements  

« prev     ^ index     » next       coverage.py v7.6.8, created at 2024-11-28 12:34 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5"""This module describes how security credentials are expressed in aiocoap, 

6how security protocols (TLS, DTLS, OSCOAP) can store and access their key 

7material, and for which URIs they are used. 

8 

9For consistency, mappings between accessible resources and their credentials 

10are always centered around URIs. This is slightly atypical, because a client 

11will typically use a particular set of credentials for all operations on one 

12server, while a server first loads all available credentials and then filters 

13out whether the client may actually access a resource per-path, but it works 

14with full URIs (or patterns thereof) just as well. That approach allows using 

15more similar structures both on the server and the client, and works smoothly 

16for virtual hosting, firewalling and clients accessing resources with varying 

17credentials. 

18 

19Still, client and server credentials are kept apart, lest a server open up (and 

20potentially reveal) to a PSK set it is only configured to use as a client. 

21While client credentials already have their place in 

22:attr:`aiocoap.protocol.Context.client_credentials`, server credentials are not 

23in use at a standardized location yet because there is only code in the OSCORE 

24plug tests that can use it so far. 

25 

26Library developer notes 

27~~~~~~~~~~~~~~~~~~~~~~~ 

28 

29This whole module currently relies on a mixture of introspection and manual 

30parsing of the JSON-ish tree. A preferred expression of the same would rely on 

31the credentials.cddl description and build an object tree from that, but the 

32author is unaware of any existing CDDL Python implementation. That might also 

33ease porting to platforms that don't support inspect like micropython does. 

34""" 

35 

36import re 

37import inspect 

38 

39from typing import Optional, List, Tuple 

40 

41 

42""" 

43server: { 

44 'coaps://mysite/*': { 'dtls-psk' (or other granularity): { 'psk': 'abcd' }}, 

45 'coap://mysite/*': { 'oscore': { 'basedir': 'my-basedir/' } }, 

46 'coap://myothersite/firmware': ':myotherkey', 

47 'coap://myothersite/reset': ':myotherkey', 

48 'coap://othersite*': { 'unprotected': true }, 

49 ':myotherkey': { 'oscore': { 'basedir': 'my-basedir/' } } 

50 } 

51 

52server can of course just say it doesn't want to have the Site handle it and 

53just say '*': { 'unprotected': true }, add some ':foo': {'dtls-psk': ...} 

54entries (so communication can be established in the first place) and let 

55individual resources decide whether they return 4.01 or something else. 

56 

57client can be the same with different implied role, or have something like 

58 

59client: { 

60 'coap://myothersite/*': ':myotherkey', 

61 ... 

62 } 

63 

64in future also 

65 

66server: { 

67 'coaps://mysite/*': { 'dtls-cert': {'key': '...pem', 'cert': '...crt'} } 

68 } 

69 

70client: { 

71 '*': { 'dtls-cert': { 'ca': '/etc/ssl/...' } } 

72} 

73 

74or more complex ones: 

75 

76server: { 

77 'coaps://myothersite/wellprotected': { 'all': [ ':mydtls', ':myotherkey' ]} 

78 'coaps://myothersite/*': { 'any': [ ':mydtls', ':myotherkey' ]} 

79} 

80""" 

81 

82 

83class CredentialsLoadError(ValueError): 

84 """Raised by functions that create a CredentialsMap or its parts from 

85 simple data structures""" 

86 

87 

88class CredentialsMissingError(RuntimeError): 

89 """Raised when no suiting credentials can be found for a message, or 

90 credentials are found but inapplicable to a transport's security 

91 mechanisms.""" 

92 

93 

94class CredentialReference: 

95 def __init__(self, target, map): 

96 if not target.startswith(":"): 

97 raise CredentialsLoadError( 

98 "Credential references must start with a colon (':')" 

99 ) 

100 self.target = target 

101 self.map = map 

102 

103 # FIXME either generalize this with getattr, or introduce a function to 

104 # resolve any indirect credentials to a particular instance. 

105 

106 def as_dtls_psk(self): 

107 return self.map[self.target].as_dtls_psk() 

108 

109 

110class _Listish(list): 

111 @classmethod 

112 def from_item(cls, v): 

113 if not isinstance(v, list): 

114 raise CredentialsLoadError("%s goes with a list" % cls.__name__) 

115 return cls(v) 

116 

117 

118class AnyOf(_Listish): 

119 pass 

120 

121 

122class AllOf(_Listish): 

123 pass 

124 

125 

126def _call_from_structureddata(constructor, name, init_data): 

127 if not isinstance(init_data, dict): 

128 raise CredentialsLoadError("%s goes with an object" % name) 

129 

130 init_data = {k.replace("-", "_"): v for (k, v) in init_data.items()} 

131 

132 sig = inspect.signature(constructor) 

133 

134 checked_items = {} 

135 

136 for k, v in init_data.items(): 

137 try: 

138 annotation = sig.parameters[k].annotation 

139 except KeyError: 

140 # let this raise later in binding 

141 checked_items[k] = object() 

142 annotation = "attribute does not exist" 

143 

144 if isinstance(v, dict) and "ascii" in v: 

145 if len(v) != 1: 

146 raise CredentialsLoadError("ASCII objects can only have one elemnt.") 

147 try: 

148 v = v["ascii"].encode("ascii") 

149 except UnicodeEncodeError: 

150 raise CredentialsLoadError( 

151 "Elements of the ASCII object can not be represented in ASCII, please use binary or hex representation." 

152 ) 

153 

154 if isinstance(v, dict) and "hex" in v: 

155 if len(v) != 1: 

156 raise CredentialsLoadError("Hex objects can only have one elemnt.") 

157 try: 

158 v = bytes.fromhex( 

159 v["hex"].replace("-", "").replace(" ", "").replace(":", "") 

160 ) 

161 except ValueError as e: 

162 raise CredentialsLoadError( 

163 "Hex object can not be read: %s" % (e.args[0]) 

164 ) 

165 

166 # Not using isinstance because I foundno way to extract the type 

167 # information from an Optional/Union again; this whole thing works 

168 # only for strings and ints anyway, so why not. 

169 # 

170 # The second or-branch is for functions from modules with __future__.annotations 

171 if annotation not in (type(v), Optional[type(v)]) and annotation not in ( 

172 type(v).__name__, 

173 "Optional[%s]" % type(v).__name__, 

174 ): 

175 # explicitly not excluding inspect._empty here: constructors 

176 # need to be fully annotated 

177 raise CredentialsLoadError( 

178 "Type mismatch in attribute %s of %s: expected %s, got %r" 

179 % (k, name, annotation, v) 

180 ) 

181 

182 checked_items[k] = v 

183 

184 try: 

185 bound = sig.bind(**checked_items) 

186 except TypeError as e: 

187 raise CredentialsLoadError("%s: %s" % (name, e.args[0])) 

188 

189 return constructor(*bound.args, **bound.kwargs) 

190 

191 

192class _Objectish: 

193 @classmethod 

194 def from_item(cls, init_data): 

195 return _call_from_structureddata(cls, cls.__name__, init_data) 

196 

197 

198class DTLS(_Objectish): 

199 def __init__(self, psk: bytes, client_identity: bytes): 

200 self.psk = psk 

201 self.client_identity = client_identity 

202 

203 def as_dtls_psk(self): 

204 return (self.client_identity, self.psk) 

205 

206 

207class TLSCert(_Objectish): 

208 """Indicates that a client can use the given certificate file to authenticate the server. 

209 

210 Can only be used with 'coaps+tcp://HOSTINFO/*' and 'coaps+tcp://*' forms. 

211 """ 

212 

213 def __init__(self, certfile: str): 

214 self.certfile = certfile 

215 

216 def as_ssl_params(self): 

217 """Generate parameters suitable for passing via ** to 

218 ssl.create_default_context when purpose is alreay set""" 

219 return {"cafile": self.certfile} 

220 

221 

222def import_filesystem_security_context(): 

223 from .oscore import FilesystemSecurityContext 

224 

225 return FilesystemSecurityContext 

226 

227 

228def import_edhoc_credential_pair(): 

229 from . import edhoc 

230 

231 return edhoc.EdhocCredentials 

232 

233 

234_re_cache = {} 

235 

236 

237class CredentialsMap(dict): 

238 """ 

239 FIXME: outdated, rewrite when usable 

240 

241 A CredentialsMap, for any URI template and operation, which 

242 security contexts are sufficient to to perform the operation on a matching 

243 URI. 

244 

245 The same context can be used both by the server and the client, where the 

246 client uses the information on allowed client credentials to decide which 

247 credentials to present, and the information on allowed server credentials 

248 to decide whether the server can be trusted. 

249 

250 Conversely, the server typically loads all available server credentials at 

251 startup, and then uses the client credentials list to decide whether to 

252 serve the request.""" 

253 

254 def load_from_dict(self, d): 

255 """Populate the map from a dictionary, which would typically have been 

256 loaded from a JSON/YAML file and needs to match the CDDL in 

257 credentials.cddl. 

258 

259 Running this multiple times will overwriter individual entries in the 

260 map.""" 

261 for k, v in d.items(): 

262 if v is None: 

263 if k in self: 

264 del self[k] 

265 else: 

266 self[k] = self._item_from_dict(v) 

267 # FIXME only works that way for OSCORE clients 

268 self[k].authenticated_claims = [k] 

269 

270 def _item_from_dict(self, v): 

271 if isinstance(v, str): 

272 return CredentialReference(v, self) 

273 elif isinstance(v, dict): 

274 try: 

275 ((key, value),) = v.items() 

276 except ValueError: 

277 # this follows how Rust Enums are encoded in serde JSON 

278 raise CredentialsLoadError( 

279 "Items in a credentials map must have exactly one key" 

280 " (found %s)" % (",".join(v.keys()) or "empty") 

281 ) 

282 

283 try: 

284 type_ = self._class_map[key] 

285 except KeyError: 

286 raise CredentialsLoadError("Unknown credential type: %s" % key) 

287 

288 return type_().from_item(value) 

289 

290 # Phrased as callbacks so they can import lazily. We make sure that all are 

291 # still present so that an entry that is not loadable raises an error 

292 # rather than possibly being ignored. 

293 _class_map = { 

294 "dtls": lambda: DTLS, 

295 "oscore": import_filesystem_security_context, 

296 "tlscert": lambda: TLSCert, 

297 "any-of": lambda: AnyOf, 

298 "all-of": lambda: AllOf, 

299 "edhoc-oscore": import_edhoc_credential_pair, 

300 } 

301 

302 @staticmethod 

303 def _wildcard_match(searchterm, pattern): 

304 if pattern not in _re_cache: 

305 _re_cache[pattern] = re.compile(re.escape(pattern).replace("\\*", ".*")) 

306 return _re_cache[pattern].fullmatch(searchterm) is not None 

307 

308 # used by a client 

309 

310 def credentials_from_request(self, msg): 

311 """Return the most specific match to a request message. Matching is 

312 currently based on wildcards, but not yet very well thought out.""" 

313 

314 uri = msg.get_request_uri() 

315 

316 for i in range(1000): 

317 for k, v in sorted(self.items(), key=lambda x: len(x[0]), reverse=True): 

318 if self._wildcard_match(uri, k): 

319 if isinstance(v, str): 

320 uri = v 

321 continue 

322 return v 

323 else: 

324 raise CredentialsMissingError("No suitable credentials for %s" % uri) 

325 else: 

326 raise CredentialsLoadError( 

327 "Search for suitable credentials for %s exceeds recursion limit" 

328 ) 

329 

330 def ssl_client_context(self, scheme, hostinfo): 

331 """Return an SSL client context as configured for the given request 

332 scheme and hostinfo (no full message is to be processed here, as 

333 connections are used across requests to the same origin). 

334 

335 If no credentials are configured, this returns None (for which the user 

336 may need to fill in ssl.create_default_context() if None is not already 

337 a good indicator for the eventual consumer to use the default).""" 

338 

339 ssl_params = {} 

340 tlscert = self.get("%s://%s/*" % (scheme, hostinfo), None) 

341 # FIXME: handle Any or All if they include TLSCert, or deprecate them 

342 if not isinstance(tlscert, TLSCert): 

343 return 

344 if tlscert is None: 

345 tlscert = self.get("%s://*" % scheme, None) 

346 if tlscert is not None: 

347 ssl_params = tlscert.as_ssl_params() 

348 if ssl_params: 

349 import ssl 

350 

351 return ssl.create_default_context(**ssl_params) 

352 

353 # used by a server 

354 

355 def find_oscore(self, unprotected): 

356 # FIXME: this is not constant-time as it should be, but too much in 

357 # flux to warrant optimization 

358 

359 # FIXME: duplicate contexts for being tried out are not supported yet. 

360 

361 for item in self.values(): 

362 if not hasattr(item, "get_oscore_context_for"): 

363 continue 

364 

365 ctx = item.get_oscore_context_for(unprotected) 

366 if ctx is not None: 

367 return ctx 

368 

369 raise KeyError() 

370 

371 def find_all_used_contextless_oscore_kid(self) -> set[bytes]: 

372 all_kid = set() 

373 

374 for item in self.values(): 

375 if not hasattr(item, "find_all_used_contextless_oscore_kid"): 

376 continue 

377 

378 all_kid |= item.find_all_used_contextless_oscore_kid() 

379 

380 return all_kid 

381 

382 def find_edhoc_by_id_cred_peer(self, id_cred_peer) -> Tuple[bytes, List[str]]: 

383 for label, item in self.items(): 

384 if not hasattr(item, "find_edhoc_by_id_cred_peer"): 

385 continue 

386 

387 # typically returning self 

388 credential = item.find_edhoc_by_id_cred_peer(id_cred_peer) 

389 if credential is not None: 

390 return (credential, [label]) 

391 

392 from . import edhoc 

393 import cbor2 

394 

395 for label, item in self.items(): 

396 if ( 

397 isinstance(item, edhoc.EdhocCredentials) 

398 and item.peer_cred_is_unauthenticated() 

399 ): 

400 id_cred_peer = cbor2.loads(id_cred_peer) 

401 if isinstance(id_cred_peer, dict) and 14 in id_cred_peer: 

402 return (cbor2.dumps(id_cred_peer[14], canonical=True), [label]) 

403 

404 raise KeyError 

405 

406 def find_dtls_psk(self, identity): 

407 # FIXME similar to find_oscore 

408 for entry, item in self.items(): 

409 if not hasattr(item, "as_dtls_psk"): 

410 continue 

411 

412 psk_id, psk = item.as_dtls_psk() 

413 if psk_id != identity: 

414 continue 

415 

416 # FIXME is returning the entry name a sane value to later put in to 

417 # authenticated_claims? OSCORE does something different. 

418 return (psk, entry) 

419 

420 raise KeyError()