Coverage for contrib/oscore-plugtest/plugtest-client: 80%
265 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-12 11:18 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-12 11:18 +0000
1#!/usr/bin/env python3
2# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
3#
4# SPDX-License-Identifier: MIT
6"""A client suitable for running the OSCORE plug test series against a given
7server
9See https://github.com/EricssonResearch/OSCOAP for the test suite
10description."""
12import argparse
13import asyncio
14import logging
15import signal
16import functools
17from pathlib import Path
18import sys
20from aiocoap import *
21from aiocoap import error
22from aiocoap import interfaces
23from aiocoap import credentials
25# In some nested combinations of unittest and coverage, the usually
26# provided-by-default inclusion of local files does not work. Ensuring the
27# local plugtest_common *can* be included.
28import os.path
29sys.path.append(os.path.dirname(__file__))
30from plugtest_common import *
33class PlugtestClientProgram:
34 async def run(self):
35 p = argparse.ArgumentParser(description="Client for the OSCORE plug test.")
36 p.add_argument("host", help="Hostname of the server")
37 p.add_argument("contextdir", help="Directory name where to persist sequence numbers", type=Path)
38 p.add_argument("testno", nargs="?", type=int, help="Test number to run (integer part); leave out for interactive mode")
39 p.add_argument("--verbose", help="Show aiocoap debug messages", action='store_true')
40 opts = p.parse_args()
42 self.host = opts.host
44 # this also needs to be called explicitly as only the
45 # 'logging.warning()'-style functions will call it; creating a
46 # sub-logger and logging from there makes the whole logging system not
47 # emit the 'WARNING' prefixes that set apart log messages from regular
48 # prints and also help the test suite catch warnings and errors
49 if opts.verbose:
50 logging.basicConfig(level=logging.DEBUG)
51 else:
52 logging.basicConfig(level=logging.WARNING)
54 security_context_a = get_security_context('a', opts.contextdir / "a")
55 security_context_c = get_security_context('c', opts.contextdir / "c")
57 self.ctx = await Context.create_client_context()
58 self.ctx.client_credentials[":ab"] = security_context_a
59 self.ctx.client_credentials[":cd"] = security_context_c
61 if opts.testno is not None:
62 await self.run_test(opts.testno)
63 else:
64 next_testno = 0
65 delta = 1
67 while True:
68 # Yes this is blocking, but since the tests usually terminate
69 # by themselves ... *shrug*
70 try:
71 next = input("Enter a test number (empty input runs %s, q to quit): " % next_testno)
72 except (KeyboardInterrupt, EOFError):
73 keeprunning = False
74 break
75 if next == "q":
76 keeprunning = False
77 break
78 if next:
79 try:
80 as_int = int(next)
81 except ValueError:
82 print("That's not a number.")
83 continue
84 else:
85 if as_int - next_testno + delta in (0, 1):
86 # Don't jump around randomly if user jumped around,
87 # but otherwise do something sane
88 delta = as_int - next_testno + delta
89 next_testno = as_int
91 print("Running test %s" % next_testno)
92 try:
93 await self.run_test(next_testno)
94 except error.Error as e:
95 print("Test failed with an exception:", e)
96 print()
97 next_testno += delta
99 ctx = None
101 async def run_with_shutdown(self):
102 # Having SIGTERM cause a more graceful shutdown (even if it's not
103 # asynchronously awaiting the shutdown, which would be impractical
104 # since we're likely inside some unintended timeout already) allow for
105 # output buffers to be flushed when the unit test program instrumenting
106 # it terminates it.
107 loop = asyncio.get_running_loop()
108 loop.add_signal_handler(signal.SIGTERM, loop.close)
110 try:
111 await self.run()
112 finally:
113 if self.ctx is not None:
114 await self.ctx.shutdown()
116 def use_context(self, which):
117 if which is None:
118 self.ctx.client_credentials.pop("coap://%s/*" % self.host, None)
119 else:
120 self.ctx.client_credentials["coap://%s/*" % self.host] = ":" + which
122 async def run_test(self, testno):
123 self.testno = testno
124 testfun = self.__methods[testno]
125 try:
126 await getattr(self, testfun)()
127 except oscore.NotAProtectedMessage as e:
128 print("Response carried no Object-Security option, but was: %s %s"%(e.plain_message, e.plain_message.payload))
129 raise
131 __methods = {}
132 def __implements_tests(numbers, __methods=__methods):
133 def registerer(method):
134 for n in numbers:
135 __methods[n] = method.__name__
136 return method
137 return registerer
139 @__implements_tests([0])
140 async def test_plain(self):
141 request = Message(code=GET, uri='coap://' + self.host + '/oscore/hello/coap')
143 self.use_context(None)
145 response = await self.ctx.request(request).response
147 print("Response:", response)
148 additional_verify("Responde had correct code", response.code, CONTENT)
149 additional_verify("Responde had correct payload", response.payload, b"Hello World!")
150 additional_verify("Options as expected", response.opt, Message(content_format=0).opt)
152 @__implements_tests([1, 2])
153 async def test_hello12(self):
154 if self.testno == 1:
155 self.use_context("ab")
156 else:
157 self.use_context("cd")
159 request = Message(code=GET, uri='coap://' + self.host+ '/oscore/hello/1')
160 expected = {'content_format': 0}
161 unprotected_response = await self.ctx.request(request).response
163 print("Unprotected response:", unprotected_response)
164 additional_verify("Code as expected", unprotected_response.code, CONTENT)
165 additional_verify("Options as expected", unprotected_response.opt, Message(**expected).opt)
166 additional_verify("Payload as expected", unprotected_response.payload, b"Hello World!")
168 @__implements_tests([3])
169 async def test_hellotest3(self):
170 self.use_context("ab")
172 request = Message(code=GET, uri='coap://' + self.host+ '/oscore/hello/2?first=1')
173 expected = {'content_format': 0, 'etag': b'\x2b'}
174 unprotected_response = await self.ctx.request(request).response
176 print("Unprotected response:", unprotected_response)
177 additional_verify("Code as expected", unprotected_response.code, CONTENT)
178 additional_verify("Options as expected", unprotected_response.opt, Message(**expected).opt)
179 additional_verify("Payload as expected", unprotected_response.payload, b"Hello World!")
181 @__implements_tests([4])
182 async def test_hellotest4(self):
183 self.use_context("ab")
185 request = Message(code=GET, uri='coap://' + self.host+ '/oscore/hello/3', accept=0)
186 expected = {'content_format': 0, 'max_age': 5}
187 unprotected_response = await self.ctx.request(request).response
189 print("Unprotected response:", unprotected_response)
190 additional_verify("Code as expected", unprotected_response.code, CONTENT)
191 additional_verify("Options as expected", unprotected_response.opt, Message(**expected).opt)
192 additional_verify("Payload as expected", unprotected_response.payload, b"Hello World!")
194 @__implements_tests([5])
195 async def test_nonobservable(self):
196 self.use_context("ab")
198 request = Message(code=GET, uri='coap://' + self.host + '/oscore/hello/1', observe=0)
200 request = self.ctx.request(request)
202 unprotected_response = await request.response
204 print("Unprotected response:", unprotected_response)
205 additional_verify("Code as expected", unprotected_response.code, CONTENT)
206 additional_verify("Observe option is absent", unprotected_response.opt.observe, None)
208 async for o in request.observation:
209 print("Expectation failed: Observe events coming in.")
211 @__implements_tests([6, 7])
212 async def test_observable(self):
213 self.use_context("ab")
215 request = Message(code=GET, uri='coap://' + self.host + '/oscore/observe%s' % (self.testno - 5), observe=0)
217 request = self.ctx.request(request)
219 unprotected_response = await request.response
221 print("Unprotected response:", unprotected_response)
222 additional_verify("Code as expected", unprotected_response.code, CONTENT)
223 additional_verify("Observe option present", unprotected_response.opt.observe is not None, True)
225 payloads = [unprotected_response.payload]
227 async for o in request.observation:
228 # FIXME: where's the 'two' stuck?
229 payloads.append(o.payload)
230 print("Verify: Received message", o, o.payload)
231 if len(payloads) == 2 and self.testno == 7:
232 # FIXME: Not yet ready to send actual cancellations
233 break
235 expected_payloads = [b'one', b'two']
236 if self.testno == 6:
237 expected_payloads.append(b'Terminate Observe')
238 additional_verify("Server gave the correct responses", payloads, expected_payloads)
240 @__implements_tests([8])
241 async def test_post(self):
242 self.use_context("ab")
244 request = Message(code=POST, uri='coap://' + self.host+ '/oscore/hello/6', payload=b"\x4a", content_format=0)
245 unprotected_response = await self.ctx.request(request).response
247 print("Unprotected response:", unprotected_response)
248 additional_verify("Code as expected", CHANGED, unprotected_response.code)
249 additional_verify("Options as expected", unprotected_response.opt, Message(content_format=0).opt)
250 additional_verify("Payload as expected", unprotected_response.payload, b"\x4a")
252 @__implements_tests([9])
253 async def test_put_match(self):
254 self.use_context("ab")
256 request = Message(code=PUT, uri='coap://' + self.host+ '/oscore/hello/7', payload=b"\x7a", content_format=0, if_match=[b"\x7b"])
257 unprotected_response = await self.ctx.request(request).response
259 print("Unprotected response:", unprotected_response)
260 additional_verify("Code as expected", CHANGED, unprotected_response.code)
261 additional_verify("Options empty as expected", Message().opt, unprotected_response.opt)
262 additional_verify("Payload absent as expected", unprotected_response.payload, b"")
264 @__implements_tests([10])
265 async def test_put_nonmatch(self):
266 self.use_context("ab")
268 request = Message(code=PUT, uri='coap://' + self.host+ '/oscore/hello/7', payload=b"\x8a", content_format=0, if_none_match=True)
269 unprotected_response = await self.ctx.request(request).response
271 print("Unprotected response:", unprotected_response)
272 additional_verify("Code as expected", PRECONDITION_FAILED, unprotected_response.code)
273 additional_verify("Options empty as expected", Message().opt, unprotected_response.opt)
275 @__implements_tests([11])
276 async def test_delete(self):
277 self.use_context("ab")
279 request = Message(code=DELETE, uri='coap://' + self.host+ '/oscore/test')
280 unprotected_response = await self.ctx.request(request).response
282 print("Unprotected response:", unprotected_response)
283 additional_verify("Code as expected", DELETED, unprotected_response.code)
284 additional_verify("Options empty as expected", Message().opt, unprotected_response.opt)
286 @__implements_tests([12, 13, 14])
287 async def test_oscoreerror_server_reports_error(self):
288 self.use_context("ab")
289 secctx = self.ctx.client_credentials[":ab"]
291 if self.testno == 12:
292 expected_code = UNAUTHORIZED
293 expected_error = oscore.NotAProtectedMessage
294 # FIXME: frobbing the sender_id breaks sequence numbers...
295 frobnicate_field = 'sender_id'
296 elif self.testno == 13:
297 expected_code = BAD_REQUEST
298 expected_error = oscore.NotAProtectedMessage
299 frobnicate_field = 'sender_key'
300 elif self.testno == 14:
301 expected_code = None
302 expected_error = oscore.ProtectionInvalid
303 frobnicate_field = 'recipient_key'
305 unfrobnicated = getattr(secctx, frobnicate_field)
306 if unfrobnicated == b'':
307 setattr(secctx, frobnicate_field, b'spam')
308 else:
309 setattr(secctx, frobnicate_field, bytes((255 - x) for x in unfrobnicated))
311 request = Message(code=GET, uri='coap://' + self.host + '/oscore/hello/1')
313 try:
314 unprotected_response = await self.ctx.request(request).response
315 except expected_error as e:
316 if expected_code is not None:
317 if e.plain_message.code == expected_code:
318 print("Check passed: The server responded with unencrypted %s."%(expected_code))
319 else:
320 print("Failed: Server responded with something unencrypted, but not the expected code %s: %s"%(expected_code, e.plain_message))
321 else:
322 print("Check passed: The validation failed. (%s)" % e)
323 else:
324 print("Failed: The validation passed.")
325 print("Unprotected response:", unprotected_response)
326 finally:
327 setattr(secctx, frobnicate_field, unfrobnicated)
328 # With a frobnicated sender_id, the stored context could not be
329 # loaded for later use; making sure it's stored properly again.
330 secctx._store()
332 @__implements_tests([15])
333 async def test_replay(self):
334 self.use_context("ab")
336 request = Message(code=GET, uri='coap://' + self.host + '/oscore/hello/1')
338 unprotected_response = await self.ctx.request(request).response # make this _nonraising as soon as there's a proper context backend
339 if unprotected_response.code != CONTENT:
340 print("Failed: Request did not even pass before replay (%s)"%unprotected_response)
341 return
343 secctx = self.ctx.client_credentials[":ab"]
344 secctx.sender_sequence_number -= 1
346 try:
347 unprotected_response = await self.ctx.request(request).response
348 except oscore.NotAProtectedMessage as e:
349 if e.plain_message.code == UNAUTHORIZED:
350 print("Check passed: The server responded with unencrypted replay error.")
351 else:
352 print("Failed: Server responded with something unencrypted, but not the expected code %s: %s"%(e.plain_message.code, e.plain_message))
353 else:
354 print("Failed: the validation passed.")
355 print("Unprotected response:", unprotected_response)
357 @__implements_tests([16])
358 async def test_nonoscore_server(self):
359 self.use_context("ab")
361 request = Message(code=GET, uri='coap://' + self.host+ '/oscore/hello/coap')
363 try:
364 response = await self.ctx.request(request).response
365 except oscore.NotAProtectedMessage as e:
366 if e.plain_message.code == BAD_OPTION:
367 print("Passed: Server reported bad option.")
368 elif e.plain_message.code == METHOD_NOT_ALLOWED:
369 print("Dubious: Server reported Method Not Allowed.")
370 else:
371 print("Failed: Server reported %s" % e.plain_message.code)
372 else:
373 print("Failed: The server accepted an OSCORE message")
375 @__implements_tests([17])
376 async def test_unauthorized(self):
377 request = Message(code=GET, uri='coap://' + self.host + '/oscore/hello/1')
379 self.use_context(None)
381 response = await self.ctx.request(request).response
383 print("Response:", response, response.payload)
384 additional_verify("Responde had correct code", response.code, UNAUTHORIZED)
386# # unofficial blockwise tests start here
387#
388# @__implements_tests([16, 17])
389# async def test_block2(self):
390# #request = Message(code=GET, uri='coap://' + self.host + '/oscore/block/' + {16: 'outer', 17: 'inner'}[self.testno])
391# request = Message(code=GET, uri='coap://' + self.host + '/oscore/LargeResource')
392#
393# expected = {'content_format': 0}
394# unprotected_response = await self.ctx.request(request, handle_blockwise=True).response
395# if self.testno == 17:
396# # the library should probably strip that
397# expected['block2'] = optiontypes.BlockOption.BlockwiseTuple(block_number=1, more=False, size_exponent=6)
398#
399# print("Unprotected response:", unprotected_response)
400# additional_verify("Code as expected", unprotected_response.code, CONTENT)
401# additional_verify("Options as expected", unprotected_response.opt, Message(**expected).opt)
403if __name__ == "__main__":
404 asyncio.run(PlugtestClientProgram().run_with_shutdown())