Coverage for contrib/oscore-plugtest/plugtest_common.py: 97%

39 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-02-12 11:18 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5import shutil 

6from pathlib import Path 

7 

8import cbor2 as cbor 

9 

10from aiocoap import oscore 

11 

12contextdir = Path(__file__).parent / "common-context" 

13 

14 

15class LoggingFilesystemSecurityContext(oscore.FilesystemSecurityContext): 

16 def _extract_external_aad(self, message, request_id, local_is_sender): 

17 result = super()._extract_external_aad(message, request_id, local_is_sender) 

18 print( 

19 "Verify: External AAD: bytes.fromhex(%r), %r" 

20 % (result.hex(), cbor.loads(result)) 

21 ) 

22 return result 

23 

24 

25class NotifyingPlugtestSecurityContext(oscore.FilesystemSecurityContext): 

26 def __init__(self, *args, **kwargs): 

27 super().__init__(*args, **kwargs) 

28 self.notification_hooks = [] 

29 

30 def notify(self): 

31 for x in self.notification_hooks: 

32 x() 

33 

34 def post_seqnoincrease(self): 

35 super().post_seqnoincrease() 

36 self.notify() 

37 

38 def _replay_window_changed(self): 

39 super()._replay_window_changed() 

40 self.notify() 

41 

42 

43class PlugtestFilesystemSecurityContext( 

44 LoggingFilesystemSecurityContext, NotifyingPlugtestSecurityContext 

45): 

46 pass 

47 

48 

49def get_security_context( 

50 contextname, contextcopy: Path, simulate_unclean_shutdown=False 

51): 

52 """Copy the base context (disambiguated by contextname in "ab", "cd") onto 

53 the path in contextcopy if it does not already exist, and load the 

54 resulting context with the given role. The context will be monkey-patched 

55 for debugging purposes. 

56 

57 With the simulate_unclean_shutdown aprameter set to True, any existing 

58 replay window is removed from the loaded state.""" 

59 if not contextcopy.exists(): 

60 contextcopy.parent.mkdir(parents=True, exist_ok=True) 

61 shutil.copytree((contextdir / contextname), contextcopy) 

62 

63 print("Context %s copied to %s" % (contextname, contextcopy)) 

64 

65 secctx = PlugtestFilesystemSecurityContext(contextcopy) 

66 

67 if simulate_unclean_shutdown: 

68 secctx.recipient_replay_window._index = None 

69 secctx.recipient_replay_window._bitfield = None 

70 

71 return secctx 

72 

73 

74def additional_verify(description, lhs, rhs): 

75 if lhs == rhs: 

76 print("Additional verify passed: %s" % description) 

77 else: 

78 print("Additional verify failed (%s != %s): %s" % (lhs, rhs, description))