Coverage for contrib/oscore-plugtest/plugtest_common.py: 97%
39 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-12 11:18 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-12 11:18 +0000
1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
2#
3# SPDX-License-Identifier: MIT
5import shutil
6from pathlib import Path
8import cbor2 as cbor
10from aiocoap import oscore
12contextdir = Path(__file__).parent / "common-context"
15class LoggingFilesystemSecurityContext(oscore.FilesystemSecurityContext):
16 def _extract_external_aad(self, message, request_id, local_is_sender):
17 result = super()._extract_external_aad(message, request_id, local_is_sender)
18 print(
19 "Verify: External AAD: bytes.fromhex(%r), %r"
20 % (result.hex(), cbor.loads(result))
21 )
22 return result
25class NotifyingPlugtestSecurityContext(oscore.FilesystemSecurityContext):
26 def __init__(self, *args, **kwargs):
27 super().__init__(*args, **kwargs)
28 self.notification_hooks = []
30 def notify(self):
31 for x in self.notification_hooks:
32 x()
34 def post_seqnoincrease(self):
35 super().post_seqnoincrease()
36 self.notify()
38 def _replay_window_changed(self):
39 super()._replay_window_changed()
40 self.notify()
43class PlugtestFilesystemSecurityContext(
44 LoggingFilesystemSecurityContext, NotifyingPlugtestSecurityContext
45):
46 pass
49def get_security_context(
50 contextname, contextcopy: Path, simulate_unclean_shutdown=False
51):
52 """Copy the base context (disambiguated by contextname in "ab", "cd") onto
53 the path in contextcopy if it does not already exist, and load the
54 resulting context with the given role. The context will be monkey-patched
55 for debugging purposes.
57 With the simulate_unclean_shutdown aprameter set to True, any existing
58 replay window is removed from the loaded state."""
59 if not contextcopy.exists():
60 contextcopy.parent.mkdir(parents=True, exist_ok=True)
61 shutil.copytree((contextdir / contextname), contextcopy)
63 print("Context %s copied to %s" % (contextname, contextcopy))
65 secctx = PlugtestFilesystemSecurityContext(contextcopy)
67 if simulate_unclean_shutdown:
68 secctx.recipient_replay_window._index = None
69 secctx.recipient_replay_window._bitfield = None
71 return secctx
74def additional_verify(description, lhs, rhs):
75 if lhs == rhs:
76 print("Additional verify passed: %s" % description)
77 else:
78 print("Additional verify failed (%s != %s): %s" % (lhs, rhs, description))