Coverage for contrib/oscore-plugtest/plugtest-server: 82%
160 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-12 11:18 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-12 11:18 +0000
1#!/usr/bin/env python3
2# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
3#
4# SPDX-License-Identifier: MIT
6"""A server suitable for running the OSCORE plug test series against it
8See https://github.com/EricssonResearch/OSCOAP for the test suite
9description."""
11import sys
12import asyncio
13import logging
14import argparse
15from pathlib import Path
17import aiocoap
18import aiocoap.oscore as oscore
19from aiocoap.oscore_sitewrapper import OscoreSiteWrapper
20import aiocoap.error as error
21from aiocoap.util.cli import AsyncCLIDaemon
22import aiocoap.resource as resource
23from aiocoap.credentials import CredentialsMap
24from aiocoap.cli.common import add_server_arguments, server_context_from_arguments
26from aiocoap.transports.oscore import OSCOREAddress
28# In some nested combinations of unittest and coverage, the usually
29# provided-by-default inclusion of local files does not work. Ensuring the
30# local plugtest_common *can* be included.
31import os.path
32sys.path.append(os.path.dirname(__file__))
33from plugtest_common import *
35class PleaseUseOscore(error.ConstructionRenderableError):
36 code = aiocoap.UNAUTHORIZED
37 message = "This is an OSCORE plugtest, please use option %d"%aiocoap.numbers.optionnumbers.OptionNumber.OSCORE
39def additional_verify_request_options(reference, request):
40 if request.opt.echo is not None:
41 # Silently accepting Echo as that's an artefact of B.1.2 recovery
42 reference.opt.echo = request.opt.echo
43 additional_verify("Request options as expected", reference.opt, request.opt)
45class PlugtestResource(resource.Resource):
46 options = {}
47 expected_options = {}
49 async def render_get(self, request):
50 reference = aiocoap.Message(**self.expected_options)
51 if request.opt.observe is not None and 'observe' not in self.expected_options:
52 # workaround for test 4 hitting on Hello1
53 reference.opt.observe = request.opt.observe
54 additional_verify_request_options(reference, request)
56 return aiocoap.Message(payload=self.message.encode('ascii'), **self.options)
58class Hello(PlugtestResource):
59 options = {'content_format': 0}
61 expected_options = {} # Uri-Path is stripped by the site
63 message = "Hello World!"
65Hello1 = Hello # same, just registered with the site for protected access
67class Hello2(Hello):
68 expected_options = {'uri_query': ['first=1']}
70 options = {'etag': b"\x2b", **Hello1.options}
72class Hello3(Hello):
73 expected_options = {'accept': 0}
75 options = {'max_age': 5, **Hello1.options}
77class Observe(PlugtestResource, aiocoap.interfaces.ObservableResource):
78 expected_options = {'observe': 0}
79 options = {'content_format': 0}
81 message = "one"
83 async def add_observation(self, request, serverobservation):
84 async def keep_entertained():
85 await asyncio.sleep(2)
86 serverobservation.trigger(aiocoap.Message(
87 mtype=aiocoap.CON, code=aiocoap.CONTENT,
88 payload=b"two", content_format=0,
89 ))
90 await asyncio.sleep(2)
91 serverobservation.trigger(aiocoap.Message(
92 mtype=aiocoap.CON, code=aiocoap.INTERNAL_SERVER_ERROR,
93 payload=b"Terminate Observe", content_format=0,
94 ))
95 t = asyncio.create_task(keep_entertained())
96 serverobservation.accept(t.cancel)
98class Hello6(resource.Resource):
99 async def render_post(self, request):
100 additional_verify_request_options(aiocoap.Message(content_format=0), request)
101 additional_verify("Request payload as expected", request.payload, b"\x4a")
103 return aiocoap.Message(code=aiocoap.CHANGED, payload=b"\x4a", content_format=0)
105class Hello7(resource.Resource):
106 async def render_put(self, request):
107 if request.opt.if_none_match:
108 print("This looks like test 10b")
109 additional_verify_request_options(aiocoap.Message(content_format=0, if_none_match=True), request)
110 additional_verify("Request payload as expected", request.payload, b"\x8a")
112 return aiocoap.Message(code=aiocoap.PRECONDITION_FAILED)
113 else:
114 print("This looks like test 9b")
115 additional_verify_request_options(aiocoap.Message(content_format=0, if_match=[b"{"]), request)
116 additional_verify("Request payload as expected", request.payload, b"z")
118 return aiocoap.Message(code=aiocoap.CHANGED)
120class DeleteResource(resource.Resource):
121 async def render_delete(self, request):
122 additional_verify_request_options(aiocoap.Message(), request)
123 return aiocoap.Message(code=aiocoap.DELETED)
125class BlockResource(PlugtestResource):
126 expected_options = {}
127 options = {'content_format': 0}
129 message = "This is a large resource\n" + "0123456789" * 101
131class InnerBlockMixin:
132 # this might become general enough that it could replace response blockwise
133 # handler some day -- right now, i'm only doing the absolute minimum
134 # necessary to satisfy the use case
136 inner_default_szx = aiocoap.MAX_REGULAR_BLOCK_SIZE_EXP
138 async def render(self, request):
139 response = await super().render(request)
141 if request.opt.block2 is None:
142 szx = self.inner_default_szx
143 blockno = 0
144 else:
145 szx = request.opt.block2.size_exponent
146 blockno = request.opt.block2.block_number
148 return response._extract_block(blockno, szx)
150class InnerBlockResource(InnerBlockMixin, BlockResource):
151 pass
153class SeqnoManager(resource.ObservableResource):
154 def __init__(self, contextmap):
155 super().__init__()
156 self.contextmap = contextmap
158 for c in self.contextmap.values():
159 c.notification_hooks.append(self.updated_state)
161 async def render_get(self, request):
162 text = ""
163 for name in ('b', 'd'):
164 the_context = self.contextmap[':' + name]
166 # this direct access is technically outside the interface for a
167 # SecurityContext, but then again, there isn't one yet
168 text += """In context %s, next seqno is %d (persisted up to %d)\n""" % (name.upper(), the_context.sender_sequence_number, the_context.sequence_number_persisted)
169 if the_context.recipient_replay_window.is_initialized():
170 index = the_context.recipient_replay_window._index
171 bitfield = the_context.recipient_replay_window._bitfield
172 # Useless for the internal representation, but much more readable
173 while bitfield & 1:
174 bitfield >>= 1
175 index += 1
176 print(index, bitfield)
177 bitfield_values = [i + index for (i, v) in enumerate(bin(bitfield)[2:][::-1]) if v == '1']
178 text += """I've seen all sequence numbers lower than %d%s.""" % (
179 index,
180 ", and also %s" % bitfield_values if bitfield else ""
181 )
182 else:
183 text += "The replay window is uninitialized"
184 text += "\n"
185 return aiocoap.Message(payload=text.encode('utf-8'), content_format=0)
187 async def render_put(self, request):
188 try:
189 number = int(request.payload.decode('utf8'))
190 except (ValueError, UnicodeDecodeError):
191 raise aiocoap.error.BadRequest("Only numeric values are accepted.")
193 raise NotImplementedError
195class PlugtestSite(resource.Site):
196 def __init__(self, server_credentials):
197 super().__init__()
199 self.add_resource(['.well-known', 'core'], resource.WKCResource(self.get_resources_as_linkheader))
200 self.add_resource(['oscore', 'hello', 'coap'], Hello())
202 self.add_resource(['oscore', 'hello', '1'], Hello1())
203 self.add_resource(['oscore', 'hello', '2'], Hello2())
204 self.add_resource(['oscore', 'hello', '3'], Hello3())
205 self.add_resource(['oscore', 'hello', '6'], Hello6())
206 self.add_resource(['oscore', 'hello', '7'], Hello7())
207 self.add_resource(['oscore', 'observe1'], Observe())
208 self.add_resource(['oscore', 'observe2'], Observe())
209 self.add_resource(['oscore', 'test'], DeleteResource())
211 self.add_resource(['oscore', 'block', 'outer'], BlockResource())
212 self.add_resource(['oscore', 'block', 'inner'], InnerBlockResource())
214 self.add_resource(['sequence-numbers'], SeqnoManager(server_credentials))
216class PlugtestServerProgram(AsyncCLIDaemon):
217 async def start(self):
218 p = argparse.ArgumentParser(description="Server for the OSCORE plug test. Requires a test number to be present.")
219 p.add_argument("contextdir", help="Directory name where to persist sequence numbers", type=Path)
220 p.add_argument('--verbose', help="Increase log level", action='store_true')
221 p.add_argument('--state-was-lost', help="Lose memory of the replay window, forcing B.1.2 recovery", action='store_true')
222 add_server_arguments(p)
223 opts = p.parse_args()
225 if opts.verbose:
226 logging.basicConfig(level=logging.DEBUG)
227 else:
228 logging.basicConfig(level=logging.WARNING)
231 server_credentials = CredentialsMap()
232 server_credentials[':b'] = get_security_context('b', opts.contextdir / "b", opts.state_was_lost)
233 server_credentials[':d'] = get_security_context('d', opts.contextdir / "d", opts.state_was_lost)
235 site = PlugtestSite(server_credentials)
236 site = OscoreSiteWrapper(site, server_credentials)
238 self.context = await server_context_from_arguments(site, opts)
240 print("Plugtest server ready.")
241 sys.stdout.flush() # the unit tests might wait abundantly long for this otherwise
243 async def shutdown(self):
244 await self.context.shutdown()
246if __name__ == "__main__":
247 PlugtestServerProgram.sync_main()