1#!/usr/bin/env python3
2# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors
4# SPDX-License-Identifier: MIT
6"""A server suitable for running the OSCORE plug test series against it
See the OSCORE plug test specification for the test suite description.
"""
11import sys
12import asyncio
13import logging
14import argparse
15from pathlib import Path
17import aiocoap
18import aiocoap.oscore as oscore
19from aiocoap.oscore_sitewrapper import OscoreSiteWrapper
20import aiocoap.error as error
21from aiocoap.util.cli import AsyncCLIDaemon
22import aiocoap.resource as resource
23from aiocoap.credentials import CredentialsMap
24from aiocoap.cli.common import add_server_arguments, server_context_from_arguments
26from aiocoap.transports.oscore import OSCOREAddress
28# In some nested combinations of unittest and coverage, the usually
29# provided-by-default inclusion of local files does not work. Ensuring the
30# local plugtest_common *can* be included.
31import os.path
33from plugtest_common import *
class PleaseUseOscore(error.ConstructionRenderableError):
    """Error carrying the UNAUTHORIZED code and a hint to use the OSCORE option.

    The message embeds the numeric value of the OSCORE option so that a
    client can tell which option it is expected to send.
    """

    code = aiocoap.UNAUTHORIZED
    message = "This is an OSCORE plugtest, please use option %d" % (
        aiocoap.numbers.optionnumbers.OptionNumber.OSCORE,
    )
def additional_verify_request_options(reference, request):
    """Check the options of ``request`` against those of ``reference``.

    An Echo option present on the request is copied into ``reference`` before
    comparing (so ``reference`` is modified in place); that way an Echo value
    never shows up as a difference. The comparison itself is delegated to
    ``additional_verify``, which is not defined in this file (it comes in
    through the star import).
    """
    echo = request.opt.echo
    if echo is not None:
        # Silently accepting Echo as that's an artefact of B.1.2 recovery
        reference.opt.echo = echo

    additional_verify("Request options as expected", reference.opt, request.opt)
class PlugtestResource(resource.Resource):
    """Base for GET-able plugtest resources driven by class attributes.

    Subclasses provide:

    * ``expected_options``: keyword arguments for the reference message the
      incoming request's options are checked against,
    * ``options``: keyword arguments added to the response message,
    * ``message``: the response text (has to be ASCII encodable; no default
      is set here).
    """

    options = {}
    expected_options = {}

    async def render_get(self, request):
        """Verify the request's options, then answer with ``self.message``."""
        expected = self.expected_options
        reference = aiocoap.Message(**expected)

        observe = request.opt.observe
        if 'observe' not in expected and observe is not None:
            # workaround for test 4 hitting on Hello1: tolerate an Observe
            # option the resource did not ask for
            reference.opt.observe = observe

        additional_verify_request_options(reference, request)

        payload = self.message.encode('ascii')
        return aiocoap.Message(payload=payload, **self.options)
class Hello(PlugtestResource):
    """Resource answering GET with "Hello World!" and content format 0."""

    # Uri-Path is stripped by the site, so no options are left to expect
    expected_options = dict()

    options = dict(content_format=0)

    message = "Hello World!"


# Same class under a second name, just registered with the site for protected
# access
Hello1 = Hello
class Hello2(Hello):
    """Hello variant that expects the query ``first=1`` and adds an ETag."""

    expected_options = dict(uri_query=['first=1'])

    # ETag first, followed by everything Hello1 sends
    options = dict(etag=b"\x2b", **Hello1.options)
class Hello3(Hello):
    """Hello variant that expects ``Accept: 0`` and adds a Max-Age of 5."""

    expected_options = dict(accept=0)

    # Max-Age first, followed by everything Hello1 sends
    options = dict(max_age=5, **Hello1.options)
class Observe(PlugtestResource, aiocoap.interfaces.ObservableResource):
    """Observable resource that answers "one" and then sends two follow-ups.

    The initial GET is handled by ``PlugtestResource.render_get`` (expecting
    ``Observe: 0``). Each accepted observation then receives "two" after two
    seconds and, another two seconds later, a 5.00 response with the payload
    "Terminate Observe".
    """

    expected_options = dict(observe=0)
    options = dict(content_format=0)

    message = "one"

    async def add_observation(self, request, serverobservation):
        """Accept the observation and schedule its two follow-up messages.

        The task's ``cancel`` is handed to ``serverobservation.accept`` as
        the callback for the observation.
        """

        async def keep_entertained():
            follow_ups = (
                (aiocoap.CONTENT, b"two"),
                (aiocoap.INTERNAL_SERVER_ERROR, b"Terminate Observe"),
            )
            for code, payload in follow_ups:
                await asyncio.sleep(2)
                notification = aiocoap.Message(
                    mtype=aiocoap.CON,
                    code=code,
                    payload=payload,
                    content_format=0,
                )
                serverobservation.trigger(notification)

        entertainer = asyncio.create_task(keep_entertained())
        serverobservation.accept(entertainer.cancel)
class Hello6(resource.Resource):
    """POST target that expects the payload ``0x4a`` and echoes it back."""

    async def render_post(self, request):
        """Verify options and payload, then answer 2.04 Changed."""
        expected_request = aiocoap.Message(content_format=0)
        additional_verify_request_options(expected_request, request)
        additional_verify("Request payload as expected", request.payload, b"\x4a")

        return aiocoap.Message(
            code=aiocoap.CHANGED,
            payload=b"\x4a",
            content_format=0,
        )
class Hello7(resource.Resource):
    """PUT target serving two tests, told apart by the If-None-Match option."""

    async def render_put(self, request):
        """Verify the request against test 9b or 10b and answer accordingly.

        Without If-None-Match the request is treated as test 9b (If-Match
        ``b"{"``, payload ``b"z"``) and answered with Changed; with it, as
        test 10b (payload ``0x8a``) and answered with Precondition Failed.
        """
        if not request.opt.if_none_match:
            print("This looks like test 9b")
            expected_request = aiocoap.Message(content_format=0, if_match=[b"{"])
            additional_verify_request_options(expected_request, request)
            additional_verify("Request payload as expected", request.payload, b"z")

            return aiocoap.Message(code=aiocoap.CHANGED)

        print("This looks like test 10b")
        expected_request = aiocoap.Message(content_format=0, if_none_match=True)
        additional_verify_request_options(expected_request, request)
        additional_verify("Request payload as expected", request.payload, b"\x8a")

        return aiocoap.Message(code=aiocoap.PRECONDITION_FAILED)
class DeleteResource(resource.Resource):
    """DELETE target that expects a request without any options."""

    async def render_delete(self, request):
        """Verify that the request carries no options and answer Deleted."""
        no_options = aiocoap.Message()
        additional_verify_request_options(no_options, request)

        return aiocoap.Message(code=aiocoap.DELETED)
class BlockResource(PlugtestResource):
    """GET-able resource with a long body (a header line plus 1010 digits)."""

    expected_options = dict()
    options = dict(content_format=0)

    # One header line followed by 101 repetitions of the ten digits
    message = "This is a large resource\n" + 101 * "0123456789"
class InnerBlockMixin:
    """Mixin that returns a single block of the wrapped resource's response.

    This might become general enough that it could replace the response
    blockwise handler some day -- right now, it only does the absolute
    minimum necessary to satisfy the use case.
    """

    # Size exponent used when the request does not carry a Block2 option
    inner_default_szx = aiocoap.MAX_REGULAR_BLOCK_SIZE_EXP

    async def render(self, request):
        """Render through the next class in the MRO and extract one block.

        Block number and size exponent are taken from the request's Block2
        option; without that option, block 0 at ``inner_default_szx`` is
        returned.
        """
        full_response = await super().render(request)

        requested = request.opt.block2
        if requested is not None:
            szx, blockno = requested.size_exponent, requested.block_number
        else:
            szx, blockno = self.inner_default_szx, 0

        # NOTE(review): _extract_block is a private method of the message
        # class; its signature is not visible from this file.
        return full_response._extract_block(blockno, szx)
class InnerBlockResource(InnerBlockMixin, BlockResource):
    """BlockResource whose rendered response is cut down by InnerBlockMixin."""
class SeqnoManager(resource.ObservableResource):
    """Resource reporting sequence number state of the ``:b`` and ``:d`` contexts.

    ``contextmap`` is a mapping whose values are security contexts; the
    resource registers its ``updated_state`` as a notification hook on each
    of them (presumably so that observers are notified when a context
    changes -- the hook's invocation is not visible in this file).
    """

    def __init__(self, contextmap):
        super().__init__()
        self.contextmap = contextmap

        for context in self.contextmap.values():
            context.notification_hooks.append(self.updated_state)

    async def render_get(self, request):
        """Return a plain-text report on both contexts.

        For each context, the report gives the next sender sequence number,
        the number up to which it is persisted, and a readable rendition of
        the replay window (or a note that it is uninitialized).
        """
        parts = []
        for name in ('b', 'd'):
            the_context = self.contextmap[':' + name]

            # this direct access is technically outside the interface for a
            # SecurityContext, but then again, there isn't one yet
            parts.append(
                """In context %s, next seqno is %d (persisted up to %d)\n"""
                % (
                    name.upper(),
                    the_context.sender_sequence_number,
                    the_context.sequence_number_persisted,
                )
            )

            window = the_context.recipient_replay_window
            if window.is_initialized():
                index = window._index
                bitfield = window._bitfield
                # Useless for the internal representation, but much more
                # readable: fold the leading run of seen numbers into index
                while bitfield & 1:
                    bitfield >>= 1
                    index += 1
                # Diagnostic output on the server's console
                print(index, bitfield)
                # Positions of the remaining set bits, least significant
                # first, offset by the (advanced) index
                bitfield_values = [
                    i + index
                    for (i, v) in enumerate(reversed(bin(bitfield)[2:]))
                    if v == '1'
                ]
                parts.append(
                    """I've seen all sequence numbers lower than %d%s."""
                    % (
                        index,
                        ", and also %s" % bitfield_values if bitfield else "",
                    )
                )
            else:
                parts.append("The replay window is uninitialized")
            parts.append("\n")

        text = "".join(parts)
        return aiocoap.Message(payload=text.encode('utf-8'), content_format=0)

    async def render_put(self, request):
        """Validate that the payload is a number; setting it is unimplemented.

        Raises:
            aiocoap.error.BadRequest: if the payload is not UTF-8 text that
                parses as an integer.
            NotImplementedError: for every payload that passes validation.
        """
        try:
            # Only validated so far; the value is not used yet
            int(request.payload.decode('utf8'))
        except (ValueError, UnicodeDecodeError) as err:
            raise aiocoap.error.BadRequest("Only numeric values are accepted.") from err

        raise NotImplementedError
class PlugtestSite(resource.Site):
    """Site holding all resources of the OSCORE plugtest server.

    ``server_credentials`` is passed on to the ``SeqnoManager`` registered
    at ``/sequence-numbers``.
    """

    def __init__(self, server_credentials):
        super().__init__()

        # (path, factory) pairs; each factory is called right before its
        # resource is registered, so creation and registration stay in the
        # order listed. Both observe paths get their own Observe instance.
        routes = [
            (
                ['.well-known', 'core'],
                lambda: resource.WKCResource(self.get_resources_as_linkheader),
            ),
            (['oscore', 'hello', 'coap'], Hello),
            (['oscore', 'hello', '1'], Hello1),
            (['oscore', 'hello', '2'], Hello2),
            (['oscore', 'hello', '3'], Hello3),
            (['oscore', 'hello', '6'], Hello6),
            (['oscore', 'hello', '7'], Hello7),
            (['oscore', 'observe1'], Observe),
            (['oscore', 'observe2'], Observe),
            (['oscore', 'test'], DeleteResource),
            (['oscore', 'block', 'outer'], BlockResource),
            (['oscore', 'block', 'inner'], InnerBlockResource),
            (['sequence-numbers'], lambda: SeqnoManager(server_credentials)),
        ]
        for path, make_resource in routes:
            self.add_resource(path, make_resource())
class PlugtestServerProgram(AsyncCLIDaemon):
    """Command line program that sets up and runs the plugtest server."""

    async def start(self):
        """Parse the command line, load both contexts and start the server.

        Prints "Plugtest server ready." once the server context exists.
        """
        parser = argparse.ArgumentParser(description="Server for the OSCORE plug test. Requires a test number to be present.")
        parser.add_argument("contextdir", help="Directory name where to persist sequence numbers", type=Path)
        parser.add_argument('--verbose', help="Increase log level", action='store_true')
        parser.add_argument('--state-was-lost', help="Lose memory of the replay window, forcing B.1.2 recovery", action='store_true')
        add_server_arguments(parser)
        opts = parser.parse_args()

        log_level = logging.DEBUG if opts.verbose else logging.WARNING
        logging.basicConfig(level=log_level)

        # One security context per name, each persisted in a same-named
        # entry below the context directory
        server_credentials = CredentialsMap()
        for name in ('b', 'd'):
            server_credentials[':' + name] = get_security_context(
                name, opts.contextdir / name, opts.state_was_lost
            )

        site = OscoreSiteWrapper(PlugtestSite(server_credentials), server_credentials)

        self.context = await server_context_from_arguments(site, opts)

        # Flushed right away: the unit tests might wait abundantly long for
        # this otherwise
        print("Plugtest server ready.", flush=True)

    async def shutdown(self):
        """Shut down the server context created in ``start``."""
        await self.context.shutdown()
# Run the plugtest server only when executed as a script, not when imported
if __name__ == "__main__":
    PlugtestServerProgram.sync_main()