Coverage for contrib/oscore-plugtest/plugtest-client: 80%

265 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-02-12 11:18 +0000

1#!/usr/bin/env python3 

2# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

3# 

4# SPDX-License-Identifier: MIT 

5 

6"""A client suitable for running the OSCORE plug test series against a given 

7server 

8 

9See https://github.com/EricssonResearch/OSCOAP for the test suite 

10description.""" 

11 

12import argparse 

13import asyncio 

14import logging 

15import signal 

16import functools 

17from pathlib import Path 

18import sys 

19 

20from aiocoap import * 

21from aiocoap import error 

22from aiocoap import interfaces 

23from aiocoap import credentials 

24 

25# In some nested combinations of unittest and coverage, the usually 

26# provided-by-default inclusion of local files does not work. Ensuring the 

27# local plugtest_common *can* be included. 

28import os.path 

29sys.path.append(os.path.dirname(__file__)) 

30from plugtest_common import * 

31 

32 

class PlugtestClientProgram:
    """Command-line client that runs OSCORE plug test cases against a server.

    The individual test cases are the ``test_*`` coroutines below; each one is
    registered under one or more test numbers by the ``__implements_tests``
    decorator and is dispatched by :meth:`run_test`.
    """

    async def run(self):
        """Parse the command line, set up the client context and run tests.

        With a ``testno`` argument, exactly that test is run. Without one, an
        interactive prompt asks for test numbers until ``q``, EOF or Ctrl-C.
        """
        p = argparse.ArgumentParser(description="Client for the OSCORE plug test.")
        p.add_argument("host", help="Hostname of the server")
        p.add_argument("contextdir", help="Directory name where to persist sequence numbers", type=Path)
        p.add_argument("testno", nargs="?", type=int, help="Test number to run (integer part); leave out for interactive mode")
        p.add_argument("--verbose", help="Show aiocoap debug messages", action='store_true')
        opts = p.parse_args()

        self.host = opts.host

        # this also needs to be called explicitly as only the
        # 'logging.warning()'-style functions will call it; creating a
        # sub-logger and logging from there makes the whole logging system not
        # emit the 'WARNING' prefixes that set apart log messages from regular
        # prints and also help the test suite catch warnings and errors
        if opts.verbose:
            logging.basicConfig(level=logging.DEBUG)
        else:
            logging.basicConfig(level=logging.WARNING)

        security_context_a = get_security_context('a', opts.contextdir / "a")
        security_context_c = get_security_context('c', opts.contextdir / "c")

        self.ctx = await Context.create_client_context()
        # The contexts are stored under symbolic names; use_context() later
        # points the server's URI pattern at one of these names.
        self.ctx.client_credentials[":ab"] = security_context_a
        self.ctx.client_credentials[":cd"] = security_context_c

        if opts.testno is not None:
            await self.run_test(opts.testno)
            return

        next_testno = 0
        delta = 1

        while True:
            # Yes this is blocking, but since the tests usually terminate
            # by themselves ... *shrug*
            try:
                entered = input("Enter a test number (empty input runs %s, q to quit): " % next_testno)
            except (KeyboardInterrupt, EOFError):
                break
            if entered == "q":
                break
            if entered:
                try:
                    as_int = int(entered)
                except ValueError:
                    print("That's not a number.")
                    continue
                else:
                    if as_int - next_testno + delta in (0, 1):
                        # Don't jump around randomly if user jumped around,
                        # but otherwise do something sane
                        delta = as_int - next_testno + delta
                    next_testno = as_int

            if next_testno not in self.__methods:
                # Without this check, run_test() would raise a KeyError that
                # is not caught below and would end the interactive session.
                print("There is no test number %s." % next_testno)
                continue

            print("Running test %s" % next_testno)
            try:
                await self.run_test(next_testno)
            except error.Error as e:
                print("Test failed with an exception:", e)
            print()
            next_testno += delta

    # Class-level default so that run_with_shutdown() can tell whether run()
    # got as far as creating a context.
    ctx = None

    async def run_with_shutdown(self):
        """Run :meth:`run` and shut the client context down afterwards."""
        # Having SIGTERM cause a more graceful shutdown (even if it's not
        # asynchronously awaiting the shutdown, which would be impractical
        # since we're likely inside some unintended timeout already) allow for
        # output buffers to be flushed when the unit test program instrumenting
        # it terminates it.
        #
        # NOTE(review): the asyncio documentation states that loop.close()
        # must not be called on a running loop -- confirm that this handler
        # has the intended effect.
        loop = asyncio.get_running_loop()
        loop.add_signal_handler(signal.SIGTERM, loop.close)

        try:
            await self.run()
        finally:
            if self.ctx is not None:
                await self.ctx.shutdown()

    def use_context(self, which):
        """Select the security context used for requests to the server.

        :param which: ``"ab"`` or ``"cd"`` to route all ``coap://<host>/*``
            requests through the credentials stored as ``":ab"`` / ``":cd"``,
            or ``None`` to remove that routing entry.
        """
        if which is None:
            self.ctx.client_credentials.pop("coap://%s/*" % self.host, None)
        else:
            self.ctx.client_credentials["coap://%s/*" % self.host] = ":" + which

    async def run_test(self, testno):
        """Run the test registered under ``testno``.

        The number is stored in ``self.testno`` because several test methods
        serve more than one number and branch on it.

        :raises KeyError: if no test is registered under ``testno``.
        """
        self.testno = testno
        testfun = self.__methods[testno]
        try:
            await getattr(self, testfun)()
        except oscore.NotAProtectedMessage as e:
            print("Response carried no Object-Security option, but was: %s %s"%(e.plain_message, e.plain_message.payload))
            raise

    # Maps test numbers to the names of the methods implementing them.
    __methods = {}

    def __implements_tests(numbers, __methods=__methods):
        """Decorator factory registering a method under the given test numbers.

        The registry is bound as a default argument because names from the
        class body are not visible from inside functions defined in it.
        """
        def registerer(method):
            for n in numbers:
                __methods[n] = method.__name__
            return method
        return registerer

    @__implements_tests([0])
    async def test_plain(self):
        """Test 0: GET /oscore/hello/coap with no security context selected."""
        request = Message(code=GET, uri='coap://' + self.host + '/oscore/hello/coap')

        self.use_context(None)

        response = await self.ctx.request(request).response

        print("Response:", response)
        additional_verify("Responde had correct code", response.code, CONTENT)
        additional_verify("Responde had correct payload", response.payload, b"Hello World!")
        additional_verify("Options as expected", response.opt, Message(content_format=0).opt)

    @__implements_tests([1, 2])
    async def test_hello12(self):
        """Tests 1 and 2: GET /oscore/hello/1 using context "ab" (1) or "cd" (2)."""
        if self.testno == 1:
            self.use_context("ab")
        else:
            self.use_context("cd")

        request = Message(code=GET, uri='coap://' + self.host+ '/oscore/hello/1')
        expected = {'content_format': 0}
        unprotected_response = await self.ctx.request(request).response

        print("Unprotected response:", unprotected_response)
        additional_verify("Code as expected", unprotected_response.code, CONTENT)
        additional_verify("Options as expected", unprotected_response.opt, Message(**expected).opt)
        additional_verify("Payload as expected", unprotected_response.payload, b"Hello World!")

    @__implements_tests([3])
    async def test_hellotest3(self):
        """Test 3: GET /oscore/hello/2?first=1, expecting an ETag of 0x2b."""
        self.use_context("ab")

        request = Message(code=GET, uri='coap://' + self.host+ '/oscore/hello/2?first=1')
        expected = {'content_format': 0, 'etag': b'\x2b'}
        unprotected_response = await self.ctx.request(request).response

        print("Unprotected response:", unprotected_response)
        additional_verify("Code as expected", unprotected_response.code, CONTENT)
        additional_verify("Options as expected", unprotected_response.opt, Message(**expected).opt)
        additional_verify("Payload as expected", unprotected_response.payload, b"Hello World!")

    @__implements_tests([4])
    async def test_hellotest4(self):
        """Test 4: GET /oscore/hello/3 with Accept: 0, expecting Max-Age 5."""
        self.use_context("ab")

        request = Message(code=GET, uri='coap://' + self.host+ '/oscore/hello/3', accept=0)
        expected = {'content_format': 0, 'max_age': 5}
        unprotected_response = await self.ctx.request(request).response

        print("Unprotected response:", unprotected_response)
        additional_verify("Code as expected", unprotected_response.code, CONTENT)
        additional_verify("Options as expected", unprotected_response.opt, Message(**expected).opt)
        additional_verify("Payload as expected", unprotected_response.payload, b"Hello World!")

    @__implements_tests([5])
    async def test_nonobservable(self):
        """Test 5: observe request to /oscore/hello/1; no Observe option or notifications expected."""
        self.use_context("ab")

        request = Message(code=GET, uri='coap://' + self.host + '/oscore/hello/1', observe=0)

        request = self.ctx.request(request)

        unprotected_response = await request.response

        print("Unprotected response:", unprotected_response)
        additional_verify("Code as expected", unprotected_response.code, CONTENT)
        additional_verify("Observe option is absent", unprotected_response.opt.observe, None)

        # Any notification arriving here contradicts the expectation.
        async for o in request.observation:
            print("Expectation failed: Observe events coming in.")

    @__implements_tests([6, 7])
    async def test_observable(self):
        """Tests 6 and 7: observe /oscore/observe1 (6) or /oscore/observe2 (7).

        Test 6 collects notifications until the observation ends; test 7
        stops after the second payload.
        """
        self.use_context("ab")

        request = Message(code=GET, uri='coap://' + self.host + '/oscore/observe%s' % (self.testno - 5), observe=0)

        request = self.ctx.request(request)

        unprotected_response = await request.response

        print("Unprotected response:", unprotected_response)
        additional_verify("Code as expected", unprotected_response.code, CONTENT)
        additional_verify("Observe option present", unprotected_response.opt.observe is not None, True)

        payloads = [unprotected_response.payload]

        async for o in request.observation:
            # FIXME: where's the 'two' stuck?
            payloads.append(o.payload)
            print("Verify: Received message", o, o.payload)
            if len(payloads) == 2 and self.testno == 7:
                # FIXME: Not yet ready to send actual cancellations
                break

        expected_payloads = [b'one', b'two']
        if self.testno == 6:
            expected_payloads.append(b'Terminate Observe')
        additional_verify("Server gave the correct responses", payloads, expected_payloads)

    @__implements_tests([8])
    async def test_post(self):
        """Test 8: POST 0x4a to /oscore/hello/6, expecting CHANGED with payload 0x4a."""
        self.use_context("ab")

        request = Message(code=POST, uri='coap://' + self.host+ '/oscore/hello/6', payload=b"\x4a", content_format=0)
        unprotected_response = await self.ctx.request(request).response

        print("Unprotected response:", unprotected_response)
        additional_verify("Code as expected", CHANGED, unprotected_response.code)
        additional_verify("Options as expected", unprotected_response.opt, Message(content_format=0).opt)
        additional_verify("Payload as expected", unprotected_response.payload, b"\x4a")

    @__implements_tests([9])
    async def test_put_match(self):
        """Test 9: PUT to /oscore/hello/7 with If-Match 0x7b, expecting CHANGED."""
        self.use_context("ab")

        request = Message(code=PUT, uri='coap://' + self.host+ '/oscore/hello/7', payload=b"\x7a", content_format=0, if_match=[b"\x7b"])
        unprotected_response = await self.ctx.request(request).response

        print("Unprotected response:", unprotected_response)
        additional_verify("Code as expected", CHANGED, unprotected_response.code)
        additional_verify("Options empty as expected", Message().opt, unprotected_response.opt)
        additional_verify("Payload absent as expected", unprotected_response.payload, b"")

    @__implements_tests([10])
    async def test_put_nonmatch(self):
        """Test 10: PUT to /oscore/hello/7 with If-None-Match, expecting PRECONDITION_FAILED."""
        self.use_context("ab")

        request = Message(code=PUT, uri='coap://' + self.host+ '/oscore/hello/7', payload=b"\x8a", content_format=0, if_none_match=True)
        unprotected_response = await self.ctx.request(request).response

        print("Unprotected response:", unprotected_response)
        additional_verify("Code as expected", PRECONDITION_FAILED, unprotected_response.code)
        additional_verify("Options empty as expected", Message().opt, unprotected_response.opt)

    @__implements_tests([11])
    async def test_delete(self):
        """Test 11: DELETE /oscore/test, expecting DELETED with no options."""
        self.use_context("ab")

        request = Message(code=DELETE, uri='coap://' + self.host+ '/oscore/test')
        unprotected_response = await self.ctx.request(request).response

        print("Unprotected response:", unprotected_response)
        additional_verify("Code as expected", DELETED, unprotected_response.code)
        additional_verify("Options empty as expected", Message().opt, unprotected_response.opt)

    @__implements_tests([12, 13, 14])
    async def test_oscoreerror_server_reports_error(self):
        """Tests 12-14: send a request with one field of context "ab" corrupted.

        Test 12 corrupts ``sender_id`` and expects an unprotected UNAUTHORIZED,
        test 13 corrupts ``sender_key`` and expects an unprotected BAD_REQUEST,
        test 14 corrupts ``recipient_key`` and expects response validation to
        fail locally. The original field value is restored afterwards.
        """
        self.use_context("ab")
        secctx = self.ctx.client_credentials[":ab"]

        if self.testno == 12:
            expected_code = UNAUTHORIZED
            expected_error = oscore.NotAProtectedMessage
            # FIXME: frobbing the sender_id breaks sequence numbers...
            frobnicate_field = 'sender_id'
        elif self.testno == 13:
            expected_code = BAD_REQUEST
            expected_error = oscore.NotAProtectedMessage
            frobnicate_field = 'sender_key'
        elif self.testno == 14:
            expected_code = None
            expected_error = oscore.ProtectionInvalid
            frobnicate_field = 'recipient_key'

        # Built before the context is tampered with, so that nothing which
        # can fail sits between the tampering and the try/finally that
        # undoes it.
        request = Message(code=GET, uri='coap://' + self.host + '/oscore/hello/1')

        unfrobnicated = getattr(secctx, frobnicate_field)
        if unfrobnicated == b'':
            # Inverting an empty value would leave it unchanged.
            setattr(secctx, frobnicate_field, b'spam')
        else:
            setattr(secctx, frobnicate_field, bytes((255 - x) for x in unfrobnicated))

        try:
            unprotected_response = await self.ctx.request(request).response
        except expected_error as e:
            if expected_code is not None:
                if e.plain_message.code == expected_code:
                    print("Check passed: The server responded with unencrypted %s."%(expected_code))
                else:
                    print("Failed: Server responded with something unencrypted, but not the expected code %s: %s"%(expected_code, e.plain_message))
            else:
                print("Check passed: The validation failed. (%s)" % e)
        else:
            print("Failed: The validation passed.")
            print("Unprotected response:", unprotected_response)
        finally:
            setattr(secctx, frobnicate_field, unfrobnicated)
            # With a frobnicated sender_id, the stored context could not be
            # loaded for later use; making sure it's stored properly again.
            secctx._store()

    @__implements_tests([15])
    async def test_replay(self):
        """Test 15: repeat a request with a reused sender sequence number.

        A first request must succeed; the sequence number is then decremented
        and the request sent again, expecting an unprotected UNAUTHORIZED.
        """
        self.use_context("ab")

        request = Message(code=GET, uri='coap://' + self.host + '/oscore/hello/1')

        unprotected_response = await self.ctx.request(request).response # make this _nonraising as soon as there's a proper context backend
        if unprotected_response.code != CONTENT:
            print("Failed: Request did not even pass before replay (%s)"%unprotected_response)
            return

        secctx = self.ctx.client_credentials[":ab"]
        secctx.sender_sequence_number -= 1

        try:
            unprotected_response = await self.ctx.request(request).response
        except oscore.NotAProtectedMessage as e:
            if e.plain_message.code == UNAUTHORIZED:
                print("Check passed: The server responded with unencrypted replay error.")
            else:
                # Name the code that was expected; the received one is
                # visible in the printed message itself.
                print("Failed: Server responded with something unencrypted, but not the expected code %s: %s"%(UNAUTHORIZED, e.plain_message))
        else:
            print("Failed: the validation passed.")
            print("Unprotected response:", unprotected_response)

    @__implements_tests([16])
    async def test_nonoscore_server(self):
        """Test 16: send a protected request to /oscore/hello/coap and expect it to be rejected."""
        self.use_context("ab")

        request = Message(code=GET, uri='coap://' + self.host+ '/oscore/hello/coap')

        try:
            response = await self.ctx.request(request).response
        except oscore.NotAProtectedMessage as e:
            if e.plain_message.code == BAD_OPTION:
                print("Passed: Server reported bad option.")
            elif e.plain_message.code == METHOD_NOT_ALLOWED:
                print("Dubious: Server reported Method Not Allowed.")
            else:
                print("Failed: Server reported %s" % e.plain_message.code)
        else:
            print("Failed: The server accepted an OSCORE message")

    @__implements_tests([17])
    async def test_unauthorized(self):
        """Test 17: unprotected GET /oscore/hello/1, expecting UNAUTHORIZED."""
        request = Message(code=GET, uri='coap://' + self.host + '/oscore/hello/1')

        self.use_context(None)

        response = await self.ctx.request(request).response

        print("Response:", response, response.payload)
        additional_verify("Responde had correct code", response.code, UNAUTHORIZED)

385 

386# # unofficial blockwise tests start here 

387#  

388# @__implements_tests([16, 17]) 

389# async def test_block2(self): 

390# #request = Message(code=GET, uri='coap://' + self.host + '/oscore/block/' + {16: 'outer', 17: 'inner'}[self.testno]) 

391# request = Message(code=GET, uri='coap://' + self.host + '/oscore/LargeResource') 

392#  

393# expected = {'content_format': 0} 

394# unprotected_response = await self.ctx.request(request, handle_blockwise=True).response 

395# if self.testno == 17: 

396# # the library should probably strip that 

397# expected['block2'] = optiontypes.BlockOption.BlockwiseTuple(block_number=1, more=False, size_exponent=6) 

398#  

399# print("Unprotected response:", unprotected_response) 

400# additional_verify("Code as expected", unprotected_response.code, CONTENT) 

401# additional_verify("Options as expected", unprotected_response.opt, Message(**expected).opt) 

402 

if __name__ == "__main__":
    # Script entry point: drive the client on a fresh event loop and let
    # run_with_shutdown() take care of closing the context again.
    program = PlugtestClientProgram()
    asyncio.run(program.run_with_shutdown())