Coverage for aiocoap/transports/rfc8323common.py: 82%

105 statements  

« prev     ^ index     » next       coverage.py v7.6.8, created at 2024-11-28 12:34 +0000

1# SPDX-FileCopyrightText: Christian Amsüss and the aiocoap contributors 

2# 

3# SPDX-License-Identifier: MIT 

4 

5"""Common code for the tcp and the ws modules, both of which are based on 

6RFC8323 mechanisms, but differ in their underlying protocol implementations 

7(asyncio stream vs. websockets module) far enough that they only share small 

8portions of their code""" 

9 

10import asyncio 

11from typing import Optional 

12from aiocoap import Message 

13from aiocoap import optiontypes, util 

14from aiocoap.numbers.codes import CSM, PING, PONG, RELEASE, ABORT 

15from aiocoap import error 

16 

17 

class CloseConnection(Exception):
    """Signal from the shared RFC8323 processing that the connection is to be
    shut down by the TCP / WebSocket side.

    On catching this, the TCP / WebSocket side should pass the exception's
    argument on to the token manager and close the connection; it does not
    need to log anything further."""

25 

26 

class RFC8323Remote:
    """Mixin for Remotes for all the common RFC8323 processing

    Implementations still need the per-transport parts, especially a
    _send_message and an _abort_with implementation.

    Besides those two methods, the code in this mixin reads the attributes
    ``log``, ``_ctx`` (for its ``_scheme``), ``_local_is_server``,
    ``_remote_hostinfo`` and ``_local_hostinfo`` from the instance.
    """

    # Settings collected from the CSM message(s) received from the peer, stored
    # under the keys "max-message-size" and "block-wise-transfer"; None until
    # the first CSM has been processed. The receive hook should abort suitably
    # when receiving a non-CSM message and this is not set yet.
    _remote_settings: Optional[dict]

    # Parameter usually set statically per implementation
    _my_max_message_size = 1024 * 1024

    def __init__(self):
        self._remote_settings = None

    is_multicast = False
    is_multicast_locally = False

    # implementing interfaces.EndpointAddress

    def __repr__(self):
        return "<%s at %#x, hostinfo %s, local %s>" % (
            type(self).__name__,
            id(self),
            self.hostinfo,
            self.hostinfo_local,
        )

    @property
    def hostinfo(self):
        """The remote side's host and port, joined by util.hostportjoin."""
        # keeping _remote_hostinfo and _local_hostinfo around structurally rather than in
        # hostinfo / hostinfo_local form looks odd now, but on the long run the
        # remote should be able to tell the message what its default Uri-Host
        # value is
        return util.hostportjoin(*self._remote_hostinfo)

    @property
    def hostinfo_local(self):
        """The local side's host and port, joined by util.hostportjoin."""
        return util.hostportjoin(*self._local_hostinfo)

    @property
    def uri_base(self):
        """Scheme and hostinfo of the remote side.

        Raises error.AnonymousHost if the local side is the server, i.e. the
        remote is a client."""
        if self._local_is_server:
            raise error.AnonymousHost(
                "Client side of %s can not be expressed as a URI" % self._ctx._scheme
            )
        else:
            return self._ctx._scheme + "://" + self.hostinfo

    @property
    def uri_base_local(self):
        """Scheme and hostinfo of the local side.

        Raises error.AnonymousHost if the local side is not the server."""
        if self._local_is_server:
            return self._ctx._scheme + "://" + self.hostinfo_local
        else:
            raise error.AnonymousHost(
                "Client side of %s can not be expressed as a URI" % self._ctx._scheme
            )

    def _bert_message_size(self):
        """Return the peer's announced max-message-size if the peer settings
        allow for BERT (more than 1152 bytes and block-wise transfer
        announced), and None otherwise.

        Settings that were not (yet) received count as a max-message-size of
        1152 without block-wise transfer."""
        settings = self._remote_settings or {}
        max_message_size = settings.get("max-message-size", 1152)
        if max_message_size > 1152 and settings.get("block-wise-transfer", False):
            return max_message_size
        return None

    @property
    def maximum_block_size_exp(self):
        """Largest block size exponent to use towards this remote: 7 before
        any CSM was received or when the peer settings allow for BERT, 6
        otherwise."""
        if self._remote_settings is None:
            # This is assuming that we can do BERT, so a first Block1 would be
            # exponent 7 but still only 1k -- because by the time we send this,
            # we typically haven't seen a CSM yet, so we'd be stuck with 6
            # because 7959 says we can't increase the exponent...
            #
            # FIXME: test whether we're properly using lower block sizes if
            # server says that szx=7 is not OK.
            return 7

        if self._bert_message_size() is not None:
            return 7
        return 6  # FIXME: deal with smaller max-message-size

    @property
    def maximum_payload_size(self):
        """Largest payload to send to this remote in a single message."""
        # see maximum_payload_size of interfaces comment
        slack = 100

        bert_message_size = self._bert_message_size()
        if bert_message_size is not None:
            # Largest multiple of 1024 that still leaves 128 bytes of the
            # peer's max-message-size unused (presumably as room for header
            # and options -- confirm before relying on it).
            return ((bert_message_size - 128) // 1024) * 1024 + slack
        return 1024 + slack  # FIXME: deal with smaller max-message-size

    @property
    def blockwise_key(self):
        """Pair of remote and local host info, in their structural form."""
        return (self._remote_hostinfo, self._local_hostinfo)

    # Utility methods for implementing an RFC8323 transport

    def _send_initial_csm(self):
        """Send a CSM that announces _my_max_message_size (option 2) and
        block-wise transfer (option 4) through _send_message."""
        my_csm = Message(code=CSM)
        # this is a tad awkward in construction because the options objects
        # were designed under the assumption that the option space is constant
        # for all message codes.
        max_message_size_opt = optiontypes.UintOption(2, self._my_max_message_size)
        my_csm.opt.add_option(max_message_size_opt)
        block_wise_transfer_opt = optiontypes.UintOption(4, 0)
        my_csm.opt.add_option(block_wise_transfer_opt)
        self._send_message(my_csm)

    def _process_signaling(self, msg):
        """Act on a received signaling message.

        * CSM: record option 2 as "max-message-size" and the presence of
          option 4 as "block-wise-transfer" in _remote_settings; abort on any
          other critical option.
        * Ping: answer with a Pong carrying the same token.
        * Pong: ignored.
        * Release, Abort: log and raise CloseConnection wrapping an
          error.RemoteServerShutdown.
        * any other code: abort.

        Ping, Pong, Release and Abort additionally trigger an abort for every
        critical option they carry.
        """
        # NOTE(review): after self.abort(...) the option loops below keep
        # running and, for the Ping..Abort group, the per-code handling still
        # happens. Whether that is intended depends on what the transport's
        # _abort_with does -- confirm.
        if msg.code == CSM:
            if self._remote_settings is None:
                self._remote_settings = {}
            for opt in msg.opt.option_list():
                # FIXME: this relies on the relevant option numbers to be
                # opaque; message parsing should already use the appropriate
                # option types, or re-think the way options are parsed
                if opt.number == 2:
                    self._remote_settings["max-message-size"] = int.from_bytes(
                        opt.value, "big"
                    )
                elif opt.number == 4:
                    self._remote_settings["block-wise-transfer"] = True
                elif opt.number.is_critical():
                    self.abort("Option not supported", bad_csm_option=opt.number)
                else:
                    pass  # ignoring elective CSM options
        elif msg.code in (PING, PONG, RELEASE, ABORT):
            # not expecting data in any of them as long as Custody is not implemented
            for opt in msg.opt.option_list():
                if opt.number.is_critical():
                    self.abort("Unknown critical option")

            if msg.code == PING:
                pong = Message(code=PONG, token=msg.token)
                self._send_message(pong)
            elif msg.code == PONG:
                pass
            elif msg.code == RELEASE:
                # The behavior SHOULD be enhanced to answer outstanding
                # requests, but it is unclear to which extent this side may
                # still use the connection.
                self.log.info(
                    "Received Release, closing on this end (options: %s)", msg.opt
                )
                raise CloseConnection(
                    error.RemoteServerShutdown("Peer released connection")
                )
            elif msg.code == ABORT:
                self.log.warning("Received Abort (options: %s)", msg.opt)
                raise CloseConnection(
                    error.RemoteServerShutdown("Peer aborted connection")
                )
        else:
            self.abort("Unknown signalling code")

    def abort(self, errormessage=None, bad_csm_option=None):
        """Log a warning, build an Abort message and hand it to _abort_with.

        errormessage, if given, becomes the UTF-8 encoded payload of the
        message; bad_csm_option, if given, is added as option 2 of the
        message."""
        self.log.warning("Aborting connection: %s", errormessage)
        abort_msg = Message(code=ABORT)
        if errormessage is not None:
            abort_msg.payload = errormessage.encode("utf8")
        if bad_csm_option is not None:
            bad_csm_option_option = optiontypes.UintOption(2, bad_csm_option)
            abort_msg.opt.add_option(bad_csm_option_option)
        self._abort_with(abort_msg)

    async def release(self):
        """Send Release message, (not implemented:) wait for connection to be
        actually closed by the peer.

        Subclasses should extend this to await closing of the connection,
        especially if they'd get into lock-up states otherwise (as would
        WebSockets).
        """
        self.log.info("Releasing connection %s", self)
        release_msg = Message(code=RELEASE)
        self._send_message(release_msg)

        # NOTE(review): as long as the try body is empty, the handler below can
        # not be reached; it is kept as the skeleton for the wait described in
        # the FIXME.
        try:
            # FIXME: we could wait for the peer to close the connection, but a)
            # that'll need some work on the interface between this module and
            # ws/tcp, and b) we have no peers to test this with that would
            # produce any sensible data (as aiocoap on release just closes).
            pass
        except asyncio.CancelledError:
            self.log.warning(
                "Connection %s was not closed by peer in time after release", self
            )

213 )