hat.juggler
Juggler communication protocol
"""Juggler communication protocol"""

# Public API of the package: client side names come from
# `hat.juggler.client`, server side names from `hat.juggler.server`.
from hat.juggler.client import (NotifyCb,
                                JugglerError,
                                connect,
                                Client)
from hat.juggler.server import (ConnectionCb,
                                RequestCb,
                                listen,
                                Server,
                                Connection)


__all__ = ['NotifyCb',
           'JugglerError',
           'connect',
           'Client',
           'ConnectionCb',
           'RequestCb',
           'listen',
           'Server',
           'Connection']
class JugglerError(Exception):
    """Juggler error

    Set as the result of a pending `Client.send` call when the server
    answers the request with an unsuccessful response.

    Args:
        data: error data (the `data` field of the unsuccessful response)

    """

    def __init__(self, data: json.Data):
        self.__data = data

    @property
    def data(self) -> json.Data:
        """Error data"""
        return self.__data
Juggler error
Error data
Inherited Members
- builtins.BaseException
- with_traceback
- args
async def connect(address: str,
                  notify_cb: NotifyCb | None = None
                  ) -> 'Client':
    """Connect to remote server

    `address` represents remote WebSocket URL formatted as
    ``<schema>://<host>:<port>/<path>`` where ``<schema>`` is ``ws`` or
    ``wss``.

    Args:
        address: remote WebSocket URL
        notify_cb: optional callback called with the client, notification
            name and notification data for each received `notify` message

    Returns:
        connected client with its receive loop already running

    """
    client = Client()
    client._notify_cb = notify_cb
    client._async_group = aio.Group()
    client._state = json.Storage()
    client._res_futures = {}
    client._next_req_ids = itertools.count(1)
    client._session = aiohttp.ClientSession()

    try:
        # max_msg_size=0 disables aiohttp's limit on incoming message size
        client._ws = await client._session.ws_connect(address, max_msg_size=0)

    except BaseException:
        # also covers cancellation - the session must not be leaked when
        # the connection attempt does not complete
        await aio.uncancellable(client._session.close())
        raise

    client.async_group.spawn(client._receive_loop)

    return client
Connect to remote server
address
represents remote WebSocket URL formatted as
<schema>://<host>:<port>/<path>
where <schema>
is ws
or
wss
.
class Client(aio.Resource):
    """Client

    For creating new client see `connect` coroutine.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    @property
    def state(self) -> json.Storage:
        """Remote server state"""
        return self._state

    async def send(self,
                   name: str,
                   data: json.Data
                   ) -> json.Data:
        """Send request and wait for response

        Args:
            name: request name
            data: request payload

        Returns:
            `data` of the successful response

        Raises:
            JugglerError
            ConnectionError

        """
        if not self.is_open:
            raise ConnectionError()

        req_id = next(self._next_req_ids)
        res_future = asyncio.Future()
        # registered before sending so that the receive loop can match
        # the response by request id
        self._res_futures[req_id] = res_future

        try:
            await self._ws.send_json({'type': 'request',
                                      'id': req_id,
                                      'name': name,
                                      'data': data})
            return await res_future

        finally:
            self._res_futures.pop(req_id)

    async def _receive_loop(self):
        """Receive and dispatch messages until the connection ends

        Handles `response`, `state` and `notify` messages. Any other
        message type, or a non text WebSocket message, terminates the loop.

        """
        try:
            while True:
                msg_ws = await self._ws.receive()
                if self._ws.closed or msg_ws.type == aiohttp.WSMsgType.CLOSING:
                    break
                if msg_ws.type != aiohttp.WSMsgType.TEXT:
                    raise Exception("unsupported ws message type")

                msg = json.decode(msg_ws.data)

                if msg['type'] == 'response':
                    res_future = self._res_futures.get(msg['id'])
                    # response for unknown or already resolved request
                    if not res_future or res_future.done():
                        continue

                    if msg['success']:
                        res_future.set_result(msg['data'])

                    else:
                        res_future.set_exception(JugglerError(msg['data']))

                elif msg['type'] == 'state':
                    # server sends state changes as diffs against the
                    # previously synchronized state
                    data = json.patch(self._state.data, msg['diff'])
                    self._state.set([], data)

                elif msg['type'] == 'notify':
                    if not self._notify_cb:
                        continue

                    await aio.call(self._notify_cb, self, msg['name'],
                                   msg['data'])

                else:
                    raise Exception("invalid message type")

        except ConnectionError:
            pass

        except Exception as e:
            mlog.error("receive loop error: %s", e, exc_info=e)

        finally:
            self.close()

            # fail all requests still waiting for their response
            for f in self._res_futures.values():
                if not f.done():
                    f.set_exception(ConnectionError())

            await aio.uncancellable(self._close_ws())

    async def _close_ws(self):
        """Close WebSocket and the underlying client session"""
        await self._ws.close()
        await self._session.close()
Client
For creating new client see connect
coroutine.
@property
def async_group(self) -> aio.Group:
    """Async group"""
    return self._async_group
Async group
async def send(self,
               name: str,
               data: json.Data
               ) -> json.Data:
    """Send request and wait for response

    Args:
        name: request name
        data: request payload

    Returns:
        `data` of the successful response

    Raises:
        JugglerError
        ConnectionError

    """
    if not self.is_open:
        raise ConnectionError()

    req_id = next(self._next_req_ids)
    res_future = asyncio.Future()
    # registered before sending so that the receive loop can match
    # the response by request id
    self._res_futures[req_id] = res_future

    try:
        await self._ws.send_json({'type': 'request',
                                  'id': req_id,
                                  'name': name,
                                  'data': data})
        return await res_future

    finally:
        self._res_futures.pop(req_id)
Send request and wait for response
Arguments:
- name: request name
- data: request payload
Raises:
- JugglerError
- ConnectionError
Inherited Members
- hat.aio.group.Resource
- is_open
- is_closing
- is_closed
- wait_closing
- wait_closed
- close
- async_close
async def listen(host: str,
                 port: int,
                 connection_cb: ConnectionCb | None = None,
                 request_cb: RequestCb | None = None, *,
                 ws_path: str = '/ws',
                 static_dir: pathlib.PurePath | None = None,
                 index_path: str | None = '/index.html',
                 pem_file: pathlib.PurePath | None = None,
                 autoflush_delay: float | None = 0.2,
                 shutdown_timeout: float = 0.1,
                 state: json.Storage | None = None,
                 parallel_requests: bool = False,
                 additional_routes: typing.Iterable[aiohttp.web.RouteDef] = ()
                 ) -> 'Server':
    """Create listening server

    Each time server receives new incoming juggler connection, `connection_cb`
    is called with newly created connection.

    For each connection, when server receives `request` message, `request_cb`
    is called with associated connection, request name and request data.
    If `request_cb` returns value, successful `response` message is sent
    with resulting value as data. If `request_cb` raises exception,
    unsuccessful `response` message is sent with raised exception as data.
    If `request_cb` is ``None``, each `request` message causes sending
    of unsuccessful `response` message.

    If `static_dir` is set, server serves static files in addition to
    providing juggler communication.

    If `index_path` is set, requests for url path ``/`` are redirected to
    `index_path`.

    If `pem_file` is set, server provides `https/wss` communication instead
    of `http/ws` communication.

    Argument `autoflush_delay` is associated with all connections associated
    with this server. `autoflush_delay` defines maximum time delay for
    automatic synchronization of `state` changes. If `autoflush_delay` is set
    to ``None``, automatic synchronization is disabled and user is responsible
    for calling :meth:`Connection.flush`. If `autoflush_delay` is set to ``0``,
    synchronization of `state` is performed on each change of `state` data.

    `shutdown_timeout` defines maximum time duration server will wait for
    regular connection close procedures during server shutdown. All connections
    that are not closed during this period are forcefully closed.

    If `state` is ``None``, each connection is initialized with its own
    instance of server state. If `state` is set, provided state is shared
    between all connections.

    If `parallel_requests` is set to ``True``, incoming requests will be
    processed in parallel - processing of subsequent requests can start (and
    finish) before prior responses are generated.

    Argument `additional_routes` can be used for providing additional aiohttp
    route definitions handled by running web server.

    Args:
        host: listening hostname
        port: listening TCP port
        connection_cb: connection callback
        request_cb: request callback
        ws_path: WebSocket url path segment
        static_dir: static files directory path
        index_path: index path
        pem_file: PEM file path
        autoflush_delay: autoflush delay
        shutdown_timeout: shutdown timeout
        state: shared server state
        parallel_requests: parallel request processing
        additional_routes: additional route definitions

    Returns:
        running server

    """
    server = Server()
    server._connection_cb = connection_cb
    server._request_cb = request_cb
    server._autoflush_delay = autoflush_delay
    server._state = state
    server._parallel_requests = parallel_requests
    server._async_group = aio.Group()

    routes = []

    if index_path:

        async def root_handler(request):
            raise aiohttp.web.HTTPFound(index_path)

        routes.append(aiohttp.web.get('/', root_handler))

    routes.append(aiohttp.web.get(ws_path, server._ws_handler))
    routes.extend(additional_routes)

    # static route is registered last so that it does not shadow the
    # routes added above
    if static_dir:
        routes.append(aiohttp.web.static('/', static_dir))

    app = aiohttp.web.Application()
    app.add_routes(routes)
    runner = aiohttp.web.AppRunner(app)
    await runner.setup()
    # NOTE(review): going by the helper's name, runner.cleanup is run once
    # the server's async group is cancelled - confirm in hat.aio
    server.async_group.spawn(aio.call_on_cancel, runner.cleanup)

    try:
        ssl_ctx = _create_ssl_context(pem_file) if pem_file else None
        site = aiohttp.web.TCPSite(runner=runner,
                                   host=host,
                                   port=port,
                                   shutdown_timeout=shutdown_timeout,
                                   ssl_context=ssl_ctx,
                                   reuse_address=True)
        await site.start()

    except BaseException:
        # also covers cancellation - close the partially started server
        await aio.uncancellable(server.async_close())
        raise

    return server
Create listening server
Each time server receives new incoming juggler connection, connection_cb
is called with newly created connection.
For each connection, when server receives request
message, request_cb
is called with associated connection, request name and request data.
If request_cb
returns value, successful response
message is sent
with resulting value as data. If request_cb
raises exception,
unsuccessful response
message is sent with raised exception as data.
If request_cb
is None
, each request
message causes sending
of unsuccessful response
message.
If static_dir
is set, server serves static files in addition to providing
juggler communication.
If index_path
is set, requests for url path /
are redirected to
index_path
.
If pem_file
is set, server provides https/wss
communication instead
of http/ws
communication.
Argument autoflush_delay
is associated with all connections associated
with this server. autoflush_delay
defines maximum time delay for
automatic synchronization of state
changes. If autoflush_delay
is set
to None
, automatic synchronization is disabled and user is responsible
for calling Connection.flush()
. If autoflush_delay
is set to 0
,
synchronization of state
is performed on each change of state
data.
shutdown_timeout
defines maximum time duration server will wait for
regular connection close procedures during server shutdown. All connections
that are not closed during this period are forcefully closed.
If state
is None
, each connection is initialized with its own
instance of server state. If state
is set, provided state is shared
between all connections.
If parallel_requests
is set to True
, incoming requests will be
processed in parallel - processing of subsequent requests can start (and
finish) before prior responses are generated.
Argument additional_routes
can be used for providing additional aiohttp
route definitions handled by running web server.
Arguments:
- host: listening hostname
- port: listening TCP port
- connection_cb: connection callback
- request_cb: request callback
- ws_path: WebSocket url path segment
- static_dir: static files directory path
- index_path: index path
- pem_file: PEM file path
- autoflush_delay: autoflush delay
- shutdown_timeout: shutdown timeout
- state: shared server state
- parallel_requests: parallel request processing
- additional_routes: additional route definitions
class Server(aio.Resource):
    """Server

    For creating new server see `listen` coroutine.

    When server is closed, all incoming connections are also closed.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    async def _ws_handler(self, request):
        """Handle WebSocket request by running new connection

        Creates `Connection` bound to the request's WebSocket, starts its
        receive and sync loops and returns once the connection is closed.

        """
        ws = aiohttp.web.WebSocketResponse()
        await ws.prepare(request)

        conn = Connection()
        conn._ws = ws
        # subgroup of the server's group ties connection to the server
        conn._async_group = self.async_group.create_subgroup()
        conn._request_cb = self._request_cb
        conn._autoflush_delay = self._autoflush_delay
        # shared state if provided, otherwise state private to connection
        conn._state = self._state or json.Storage()
        conn._parallel_requests = self._parallel_requests
        conn._flush_queue = aio.Queue()

        conn.async_group.spawn(conn._receive_loop)
        conn.async_group.spawn(conn._sync_loop)

        if self._connection_cb:
            conn.async_group.spawn(aio.call, self._connection_cb, conn)

        # handler has to stay alive for the whole connection lifetime
        await conn.wait_closed()

        return ws
Server
For creating new server see listen
coroutine.
When server is closed, all incoming connections are also closed.
@property
def async_group(self) -> aio.Group:
    """Async group"""
    return self._async_group
Async group
Inherited Members
- hat.aio.group.Resource
- is_open
- is_closing
- is_closed
- wait_closing
- wait_closed
- close
- async_close
class Connection(aio.Resource):
    """Connection

    For creating new connection see `listen` coroutine.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    @property
    def state(self) -> json.Storage:
        """Server state"""
        return self._state

    async def flush(self):
        """Force synchronization of state data

        Raises:
            ConnectionError

        """
        try:
            # sync loop resolves the future once state is synchronized
            flush_future = asyncio.Future()
            self._flush_queue.put_nowait(flush_future)
            await flush_future

        except aio.QueueClosedError:
            raise ConnectionError()

    async def notify(self,
                     name: str,
                     data: json.Data):
        """Send notification

        Args:
            name: notification name
            data: notification payload

        Raises:
            ConnectionError

        """
        if not self.is_open:
            raise ConnectionError()

        await self._ws.send_str(json.encode({'type': 'notify',
                                             'name': name,
                                             'data': data}))

    async def _receive_loop(self):
        """Receive `request` messages until the connection ends

        Any other message type, or a non text WebSocket message, terminates
        the loop.

        """
        try:
            while self.is_open:
                msg_ws = await self._ws.receive()
                if self._ws.closed or msg_ws.type == aiohttp.WSMsgType.CLOSING:
                    break
                if msg_ws.type != aiohttp.WSMsgType.TEXT:
                    raise Exception("unsupported ws message type")

                msg = json.decode(msg_ws.data)

                if msg['type'] != 'request':
                    raise Exception("invalid message type")

                if self._parallel_requests:
                    # do not wait for the response before receiving
                    # the next request
                    self.async_group.spawn(self._process_request, msg)

                else:
                    await self._process_request(msg)

        except ConnectionError:
            pass

        except Exception as e:
            mlog.error("receive loop error: %s", e, exc_info=e)

        finally:
            self.close()
            await aio.uncancellable(self._ws.close())

    async def _process_request(self, req):
        """Process single request and send associated response"""
        try:
            res = {'type': 'response',
                   'id': req['id']}

            if req['name']:
                try:
                    if not self._request_cb:
                        raise Exception('request handler not implemented')

                    res['data'] = await aio.call(self._request_cb, self,
                                                 req['name'], req['data'])
                    res['success'] = True

                except Exception as e:
                    # callback failure is reported to the peer instead of
                    # closing the connection
                    res['data'] = str(e)
                    res['success'] = False

            else:
                # request with empty name is answered by echoing its data
                res['data'] = req['data']
                res['success'] = True

            await self._ws.send_str(json.encode(res))

        except ConnectionError:
            self.close()

        except Exception as e:
            self.close()
            mlog.error("process request error: %s", e, exc_info=e)

    async def _sync_loop(self):
        """Send state changes to the peer as `state` messages

        State is sent as diff against previously sent state, either after
        `autoflush_delay` or when flush is requested.

        """
        flush_future = None
        data = None
        synced_data = None
        data_queue = aio.Queue()

        try:
            with self._state.register_change_cb(data_queue.put_nowait):
                # initial state is synchronized as the first change
                data_queue.put_nowait(self._state.data)

                if not self.is_open:
                    return

                get_data_future = self.async_group.spawn(data_queue.get)
                get_flush_future = self.async_group.spawn(
                    self._flush_queue.get)

                while True:
                    await asyncio.wait([get_data_future, get_flush_future],
                                       return_when=asyncio.FIRST_COMPLETED)

                    if get_flush_future.done():
                        flush_future = get_flush_future.result()
                        get_flush_future = self.async_group.spawn(
                            self._flush_queue.get)

                    else:
                        # state changed - wait for flush request for at
                        # most autoflush delay (None waits until flush)
                        await asyncio.wait([get_flush_future],
                                           timeout=self._autoflush_delay)

                        if get_flush_future.done():
                            flush_future = get_flush_future.result()
                            get_flush_future = self.async_group.spawn(
                                self._flush_queue.get)

                        else:
                            flush_future = None

                    if get_data_future.done():
                        data = get_data_future.result()
                        get_data_future = self.async_group.spawn(
                            data_queue.get)

                    # with non zero delay only the latest state matters;
                    # with zero delay each change is sent separately
                    if self._autoflush_delay != 0:
                        if not data_queue.empty():
                            data = data_queue.get_nowait_until_empty()

                    if synced_data is not data:
                        diff = json.diff(synced_data, data)
                        synced_data = data

                        if diff:
                            await self._ws.send_str(json.encode({
                                'type': 'state',
                                'diff': diff}))

                    if flush_future and not flush_future.done():
                        flush_future.set_result(True)

        except Exception as e:
            mlog.error("sync loop error: %s", e, exc_info=e)

        finally:
            self.close()

            # fail current and all queued flush requests
            self._flush_queue.close()
            while True:
                if flush_future and not flush_future.done():
                    flush_future.set_exception(ConnectionError())

                if self._flush_queue.empty():
                    break

                flush_future = self._flush_queue.get_nowait()
Connection
For creating new connection see listen
coroutine.
@property
def async_group(self) -> aio.Group:
    """Async group"""
    return self._async_group
Async group
async def flush(self):
    """Force synchronization of state data

    Raises:
        ConnectionError

    """
    try:
        # flush request is a future placed on the flush queue; this call
        # returns once that future is resolved
        flush_future = asyncio.Future()
        self._flush_queue.put_nowait(flush_future)
        await flush_future

    except aio.QueueClosedError:
        raise ConnectionError()
Force synchronization of state data
Raises:
- ConnectionError
async def notify(self,
                 name: str,
                 data: json.Data):
    """Send notification

    Args:
        name: notification name
        data: notification payload

    Raises:
        ConnectionError

    """
    if not self.is_open:
        raise ConnectionError()

    await self._ws.send_str(json.encode({'type': 'notify',
                                         'name': name,
                                         'data': data}))
Send notification
Raises:
- ConnectionError
Inherited Members
- hat.aio.group.Resource
- is_open
- is_closing
- is_closed
- wait_closing
- wait_closed
- close
- async_close