Coverage for contrib/oscore-plugtest/plugtest-server: 82%

160 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-02-12 11:18 +0000

1#!/usr/bin/env python3 

2# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

3# 

4# SPDX-License-Identifier: MIT 

5 

6"""A server suitable for running the OSCORE plug test series against it 

7 

8See https://github.com/EricssonResearch/OSCOAP for the test suite 

9description.""" 

10 

11import sys 

12import asyncio 

13import logging 

14import argparse 

15from pathlib import Path 

16 

17import aiocoap 

18import aiocoap.oscore as oscore 

19from aiocoap.oscore_sitewrapper import OscoreSiteWrapper 

20import aiocoap.error as error 

21from aiocoap.util.cli import AsyncCLIDaemon 

22import aiocoap.resource as resource 

23from aiocoap.credentials import CredentialsMap 

24from aiocoap.cli.common import add_server_arguments, server_context_from_arguments 

25 

26from aiocoap.transports.oscore import OSCOREAddress 

27 

28# In some nested combinations of unittest and coverage, the usually 

29# provided-by-default inclusion of local files does not work. Ensuring the 

30# local plugtest_common *can* be included. 

31import os.path 

32sys.path.append(os.path.dirname(__file__)) 

33from plugtest_common import * 

34 

class PleaseUseOscore(error.ConstructionRenderableError):
    """Error asking the client to use OSCORE.

    Carries the UNAUTHORIZED code and a message that names the numeric value
    of the OSCORE option.
    """

    code = aiocoap.UNAUTHORIZED
    message = "This is an OSCORE plugtest, please use option %d" % (
        aiocoap.numbers.optionnumbers.OptionNumber.OSCORE,
    )

38 

def additional_verify_request_options(reference, request):
    """Check the options of *request* against those of *reference*.

    If the request carries an Echo option, its value is first copied onto the
    reference, so that Echo never shows up as a difference. The comparison
    itself is handed to ``additional_verify``.
    """
    echo = request.opt.echo
    if echo is not None:
        # Silently accepting Echo as that's an artefact of B.1.2 recovery
        reference.opt.echo = echo

    additional_verify("Request options as expected", reference.opt, request.opt)

44 

class PlugtestResource(resource.Resource):
    """Base class for the GET-able plug test resources.

    Subclasses are expected to provide:

    * ``message``: text of the response, encoded as ASCII for the payload
    * ``options``: keyword arguments passed on to the response message
    * ``expected_options``: keyword arguments from which the reference
      message is built that the request's options are verified against
    """

    expected_options = {}
    options = {}

    async def render_get(self, request):
        """Verify the request's options and answer with the fixed message."""
        expected = self.expected_options
        reference = aiocoap.Message(**expected)

        observe = request.opt.observe
        if not (observe is None or 'observe' in expected):
            # workaround for test 4 hitting on Hello1
            reference.opt.observe = observe

        additional_verify_request_options(reference, request)

        payload = self.message.encode('ascii')
        return aiocoap.Message(payload=payload, **self.options)

57 

class Hello(PlugtestResource):
    """Resource answering GET with "Hello World!" and content format 0."""

    message = "Hello World!"

    # Uri-Path is stripped by the site, so no request options are expected
    expected_options = dict()

    options = dict(content_format=0)


# same, just registered with the site for protected access
Hello1 = Hello

66 

class Hello2(Hello):
    """Hello variant expecting the query ``first=1`` and answering with an ETag."""

    # The ETag comes first; anything Hello1 sets is layered on top of it.
    options = dict({'etag': b"\x2b"}, **Hello1.options)

    expected_options = dict(uri_query=['first=1'])

71 

class Hello3(Hello):
    """Hello variant expecting ``Accept: 0`` and answering with a Max-Age of 5."""

    # Max-Age comes first; anything Hello1 sets is layered on top of it.
    options = dict({'max_age': 5}, **Hello1.options)

    expected_options = dict(accept=0)

76 

class Observe(PlugtestResource, aiocoap.interfaces.ObservableResource):
    """Observable resource: "one" on GET, then two scripted notifications."""

    message = "one"

    options = dict(content_format=0)
    expected_options = dict(observe=0)

    async def add_observation(self, request, serverobservation):
        """Accept the observation and start its scripted notifications.

        A background task triggers a CONTENT notification with payload "two"
        after two seconds, and an INTERNAL_SERVER_ERROR notification with
        payload "Terminate Observe" two seconds after that. The task's
        ``cancel`` method is what gets handed to ``serverobservation.accept``.
        """
        followups = (
            (aiocoap.CONTENT, b"two"),
            (aiocoap.INTERNAL_SERVER_ERROR, b"Terminate Observe"),
        )

        async def keep_entertained():
            for code, payload in followups:
                await asyncio.sleep(2)
                notification = aiocoap.Message(
                    mtype=aiocoap.CON,
                    code=code,
                    payload=payload,
                    content_format=0,
                )
                serverobservation.trigger(notification)

        task = asyncio.create_task(keep_entertained())
        serverobservation.accept(task.cancel)

97 

class Hello6(resource.Resource):
    """POST resource of the plug test that works with a fixed one-byte payload."""

    async def render_post(self, request):
        """Verify the request's options and payload, and answer with CHANGED.

        The request is expected to carry content format 0 and the payload
        ``b"\\x4a"``; the response carries that same fixed payload.
        """
        fixed_payload = b"\x4a"
        reference = aiocoap.Message(content_format=0)

        additional_verify_request_options(reference, request)
        additional_verify("Request payload as expected", request.payload, fixed_payload)

        return aiocoap.Message(
            code=aiocoap.CHANGED,
            payload=fixed_payload,
            content_format=0,
        )

104 

class Hello7(resource.Resource):
    """PUT resource shared by two plug tests, told apart by If-None-Match."""

    async def render_put(self, request):
        """Verify the request against the test it appears to belong to.

        Without a truthy If-None-Match option the request is treated as test
        9b (If-Match ``b"{"``, payload ``b"z"``) and answered with CHANGED;
        otherwise it is treated as test 10b (payload ``b"\\x8a"``) and
        answered with PRECONDITION_FAILED.
        """
        if not request.opt.if_none_match:
            print("This looks like test 9b")
            reference = aiocoap.Message(content_format=0, if_match=[b"{"])
            additional_verify_request_options(reference, request)
            additional_verify("Request payload as expected", request.payload, b"z")

            return aiocoap.Message(code=aiocoap.CHANGED)

        print("This looks like test 10b")
        reference = aiocoap.Message(content_format=0, if_none_match=True)
        additional_verify_request_options(reference, request)
        additional_verify("Request payload as expected", request.payload, b"\x8a")

        return aiocoap.Message(code=aiocoap.PRECONDITION_FAILED)

119 

class DeleteResource(resource.Resource):
    """Resource that answers DELETE requests of the plug test."""

    async def render_delete(self, request):
        """Verify that the request carries no options and answer with DELETED."""
        option_free_reference = aiocoap.Message()
        additional_verify_request_options(option_free_reference, request)

        return aiocoap.Message(code=aiocoap.DELETED)

124 

class BlockResource(PlugtestResource):
    """GET resource with a long body: a header line plus 1010 digits."""

    message = "This is a large resource\n" + 101 * "0123456789"

    options = dict(content_format=0)
    expected_options = dict()

130 

class InnerBlockMixin:
    """Mixin that cuts the response of the next ``render`` down to one block."""

    # this might become general enough that it could replace response blockwise
    # handler some day -- right now, i'm only doing the absolute minimum
    # necessary to satisfy the use case

    # size exponent used when the request does not carry a Block2 option
    inner_default_szx = aiocoap.MAX_REGULAR_BLOCK_SIZE_EXP

    async def render(self, request):
        """Render through the parent class and return a single block of it.

        Block number and size exponent are taken from the request's Block2
        option; without that option, block 0 at ``inner_default_szx`` is
        returned.
        """
        response = await super().render(request)

        block2 = request.opt.block2
        if block2 is not None:
            szx, blockno = block2.size_exponent, block2.block_number
        else:
            szx, blockno = self.inner_default_szx, 0

        return response._extract_block(blockno, szx)

149 

class InnerBlockResource(InnerBlockMixin, BlockResource):
    """The large BlockResource, rendered through InnerBlockMixin."""

152 

class SeqnoManager(resource.ObservableResource):
    """Resource reporting the sequence number state of the security contexts.

    *contextmap* is a mapping of security contexts; the report reads the
    entries stored under ``':b'`` and ``':d'``.
    """

    def __init__(self, contextmap):
        super().__init__()
        self.contextmap = contextmap

        # Have every context call this resource's updated_state through its
        # notification hooks.
        for c in self.contextmap.values():
            c.notification_hooks.append(self.updated_state)

    @staticmethod
    def _describe_replay_window(window):
        """Return a one-sentence description of the replay window *window*."""
        if not window.is_initialized():
            return "The replay window is uninitialized"

        index = window._index
        bitfield = window._bitfield
        # Useless for the internal representation, but much more readable:
        # fold the contiguous run of seen numbers at the low end into index.
        while bitfield & 1:
            bitfield >>= 1
            index += 1
        # Diagnostic output goes to the log rather than to stdout.
        logging.debug("Replay window normalized to index %r, bitfield %r", index, bitfield)

        if not bitfield:
            return "I've seen all sequence numbers lower than %d." % index

        # Numbers seen beyond the contiguous run, in ascending order
        bitfield_values = [
            index + i
            for i, bit in enumerate(reversed(bin(bitfield)[2:]))
            if bit == '1'
        ]
        return "I've seen all sequence numbers lower than %d, and also %s." % (
            index,
            bitfield_values,
        )

    async def render_get(self, request):
        """Return a plain-text report on the contexts B and D.

        For each context there is one line on the sender sequence number and
        one on the replay window.
        """
        parts = []
        for name in ('b', 'd'):
            the_context = self.contextmap[':' + name]

            # this direct access is technically outside the interface for a
            # SecurityContext, but then again, there isn't one yet
            parts.append(
                "In context %s, next seqno is %d (persisted up to %d)\n" % (
                    name.upper(),
                    the_context.sender_sequence_number,
                    the_context.sequence_number_persisted,
                )
            )
            parts.append(self._describe_replay_window(the_context.recipient_replay_window))
            parts.append("\n")

        text = "".join(parts)
        return aiocoap.Message(payload=text.encode('utf-8'), content_format=0)

    async def render_put(self, request):
        """Validate the payload as a number; setting it is not implemented.

        Raises:
            aiocoap.error.BadRequest: if the payload is not a UTF-8 encoded
                integer.
            NotImplementedError: for every payload that passes validation.
        """
        try:
            # Only validated so far; the parsed value has no use yet.
            int(request.payload.decode('utf8'))
        except (ValueError, UnicodeDecodeError) as exc:
            raise aiocoap.error.BadRequest("Only numeric values are accepted.") from exc

        raise NotImplementedError

194 

class PlugtestSite(resource.Site):
    """Site holding all resources of the plug test server."""

    def __init__(self, server_credentials):
        """Register the plug test resources.

        *server_credentials* is passed on to the SeqnoManager resource
        registered at ``/sequence-numbers``.
        """
        super().__init__()

        routes = (
            (['.well-known', 'core'], resource.WKCResource(self.get_resources_as_linkheader)),
            (['oscore', 'hello', 'coap'], Hello()),

            (['oscore', 'hello', '1'], Hello1()),
            (['oscore', 'hello', '2'], Hello2()),
            (['oscore', 'hello', '3'], Hello3()),
            (['oscore', 'hello', '6'], Hello6()),
            (['oscore', 'hello', '7'], Hello7()),
            (['oscore', 'observe1'], Observe()),
            (['oscore', 'observe2'], Observe()),
            (['oscore', 'test'], DeleteResource()),

            (['oscore', 'block', 'outer'], BlockResource()),
            (['oscore', 'block', 'inner'], InnerBlockResource()),

            (['sequence-numbers'], SeqnoManager(server_credentials)),
        )
        for path, handler in routes:
            self.add_resource(path, handler)

215 

class PlugtestServerProgram(AsyncCLIDaemon):
    """Command line program running the plug test server."""

    async def start(self):
        """Parse the command line, set up logging and contexts, start serving.

        Prints "Plugtest server ready." on standard output once the server
        context has been created.
        """
        parser = argparse.ArgumentParser(description="Server for the OSCORE plug test. Requires a test number to be present.")
        parser.add_argument("contextdir", help="Directory name where to persist sequence numbers", type=Path)
        parser.add_argument('--verbose', help="Increase log level", action='store_true')
        parser.add_argument('--state-was-lost', help="Lose memory of the replay window, forcing B.1.2 recovery", action='store_true')
        add_server_arguments(parser)
        opts = parser.parse_args()

        log_level = logging.DEBUG if opts.verbose else logging.WARNING
        logging.basicConfig(level=log_level)

        # One security context per name, persisted in a subdirectory of the
        # same name below contextdir.
        server_credentials = CredentialsMap()
        for name in ('b', 'd'):
            server_credentials[':' + name] = get_security_context(name, opts.contextdir / name, opts.state_was_lost)

        plain_site = PlugtestSite(server_credentials)
        site = OscoreSiteWrapper(plain_site, server_credentials)

        self.context = await server_context_from_arguments(site, opts)

        # flushing right away: the unit tests might wait abundantly long for
        # this otherwise
        print("Plugtest server ready.", flush=True)

    async def shutdown(self):
        """Shut down the server context created by ``start``."""
        await self.context.shutdown()

245 

if __name__ == "__main__":
    # Script entry point: hand control to the daemon's synchronous main.
    PlugtestServerProgram.sync_main()