hat.juggler
Juggler communication protocol
1"""Juggler communication protocol""" 2 3from hat.juggler.client import (NotifyCb, 4 JugglerError, 5 connect, 6 Client) 7from hat.juggler.server import (ConnectionCb, 8 RequestCb, 9 listen, 10 Server, 11 Connection) 12 13 14__all__ = ['NotifyCb', 15 'JugglerError', 16 'connect', 17 'Client', 18 'ConnectionCb', 19 'RequestCb', 20 'listen', 21 'Server', 22 'Connection']
24class JugglerError(Exception): 25 """Juggler error""" 26 27 def __init__(self, data: json.Data): 28 self.__data = data 29 30 @property 31 def data(self) -> json.Data: 32 """Error data""" 33 return self.__data
Juggler error
36async def connect(address: str, 37 notify_cb: NotifyCb | None = None, 38 *, 39 auth: aiohttp.BasicAuth | None = None, 40 ssl_ctx: ssl.SSLContext | None = None 41 ) -> 'Client': 42 """Connect to remote server 43 44 `address` represents remote WebSocket URL formated as 45 ``<schema>://<host>:<port>/<path>`` where ``<schema>`` is ``ws`` or 46 ``wss``. 47 48 """ 49 client = Client() 50 client._notify_cb = notify_cb 51 client._async_group = aio.Group() 52 client._state = json.Storage() 53 client._res_futures = {} 54 client._next_req_ids = itertools.count(1) 55 client._session = aiohttp.ClientSession() 56 57 try: 58 client._ws = await client._session.ws_connect(address, 59 auth=auth, 60 ssl=ssl_ctx or False, 61 max_msg_size=0) 62 63 except BaseException: 64 await aio.uncancellable(client._session.close()) 65 raise 66 67 client.async_group.spawn(client._receive_loop) 68 69 return client
Connect to remote server
address
represents remote WebSocket URL formated as
<schema>://<host>:<port>/<path>
where <schema>
is ws
or
wss
.
72class Client(aio.Resource): 73 """Client 74 75 For creating new client see `connect` coroutine. 76 77 """ 78 79 @property 80 def async_group(self) -> aio.Group: 81 """Async group""" 82 return self._async_group 83 84 @property 85 def state(self) -> json.Storage: 86 """Remote server state""" 87 return self._state 88 89 async def send(self, 90 name: str, 91 data: json.Data 92 ) -> json.Data: 93 """Send request and wait for response 94 95 Args: 96 name: request name 97 data: request payload 98 99 Raises: 100 JugglerError 101 ConnectionError 102 103 """ 104 if not self.is_open: 105 raise ConnectionError() 106 107 req_id = next(self._next_req_ids) 108 res_future = asyncio.Future() 109 self._res_futures[req_id] = res_future 110 111 try: 112 await self._ws.send_json({'type': 'request', 113 'id': req_id, 114 'name': name, 115 'data': data}) 116 return await res_future 117 118 finally: 119 self._res_futures.pop(req_id) 120 121 async def _receive_loop(self): 122 try: 123 while True: 124 msg_ws = await self._ws.receive() 125 if self._ws.closed or msg_ws.type == aiohttp.WSMsgType.CLOSING: 126 break 127 if msg_ws.type != aiohttp.WSMsgType.TEXT: 128 raise Exception("unsupported ws message type") 129 130 msg = json.decode(msg_ws.data) 131 132 if msg['type'] == 'response': 133 res_future = self._res_futures.get(msg['id']) 134 if not res_future or res_future.done(): 135 continue 136 137 if msg['success']: 138 res_future.set_result(msg['data']) 139 140 else: 141 res_future.set_exception(JugglerError(msg['data'])) 142 143 elif msg['type'] == 'state': 144 data = json.patch(self._state.data, msg['diff']) 145 self._state.set([], data) 146 147 elif msg['type'] == 'notify': 148 if not self._notify_cb: 149 continue 150 151 await aio.call(self._notify_cb, self, msg['name'], 152 msg['data']) 153 154 else: 155 raise Exception("invalid message type") 156 157 except ConnectionError: 158 pass 159 160 except Exception as e: 161 mlog.error("receive loop error: %s", e, exc_info=e) 162 163 finally: 164 self.close() 165 166 for f in self._res_futures.values(): 167 if not f.done(): 168 f.set_exception(ConnectionError()) 169 170 await aio.uncancellable(self._close_ws()) 171 172 async def _close_ws(self): 173 await self._ws.close() 174 await self._session.close()
Client
For creating new client see connect
coroutine.
79 @property 80 def async_group(self) -> aio.Group: 81 """Async group""" 82 return self._async_group
Async group
89 async def send(self, 90 name: str, 91 data: json.Data 92 ) -> json.Data: 93 """Send request and wait for response 94 95 Args: 96 name: request name 97 data: request payload 98 99 Raises: 100 JugglerError 101 ConnectionError 102 103 """ 104 if not self.is_open: 105 raise ConnectionError() 106 107 req_id = next(self._next_req_ids) 108 res_future = asyncio.Future() 109 self._res_futures[req_id] = res_future 110 111 try: 112 await self._ws.send_json({'type': 'request', 113 'id': req_id, 114 'name': name, 115 'data': data}) 116 return await res_future 117 118 finally: 119 self._res_futures.pop(req_id)
Send request and wait for response
Arguments:
- name: request name
- data: request payload
Raises:
- JugglerError
- ConnectionError
31async def listen(host: str, 32 port: int, 33 connection_cb: ConnectionCb | None = None, 34 request_cb: RequestCb | None = None, 35 *, 36 ws_path: str = '/ws', 37 static_dir: pathlib.PurePath | None = None, 38 index_path: str | None = '/index.html', 39 htpasswd_file: pathlib.PurePath | None = None, 40 ssl_ctx: ssl.SSLContext | None = None, 41 autoflush_delay: float | None = 0.2, 42 shutdown_timeout: float = 0.1, 43 state: json.Storage | None = None, 44 parallel_requests: bool = False, 45 additional_routes: Iterable[aiohttp.web.RouteDef] = [] 46 ) -> 'Server': 47 """Create listening server 48 49 Each time server receives new incoming juggler connection, `connection_cb` 50 is called with newly created connection. 51 52 For each connection, when server receives `request` message, `request_cb` 53 is called with associated connection, request name and request data. 54 If `request_cb` returns value, successful `response` message is sent 55 with resulting value as data. If `request_cb` raises exception, 56 unsuccessful `response` message is sent with raised exception as data. 57 If `request_cb` is ``None``, each `request` message causes sending 58 of unsuccessful `response` message. 59 60 If `static_dir` is set, server serves static files is addition to providing 61 juggler communication. 62 63 If `index_path` is set, request for url path ``/`` are redirected to 64 `index_path`. 65 66 If `htpasswd_file` is set, HTTP Basic Authentication is enabled. 67 All requests are checked for ``Authorization`` header and only users 68 specified by `htpassword_file` are accepted. `htpasswd_file` is read 69 during initialization and changes to it's content, after initialization 70 finishes, are not monitored. 71 72 If `ssl_ctx` is set, server provides `https/wss` communication instead 73 of `http/ws` communication. 74 75 Argument `autoflush_delay` is associated with all connections associated 76 with this server. `autoflush_delay` defines maximum time delay for 77 automatic synchronization of `state` changes. If `autoflush_delay` is set 78 to ``None``, automatic synchronization is disabled and user is responsible 79 for calling :meth:`Connection.flush`. If `autoflush_delay` is set to ``0``, 80 synchronization of `state` is performed on each change of `state` data. 81 82 `shutdown_timeout` defines maximum time duration server will wait for 83 regular connection close procedures during server shutdown. All connections 84 that are not closed during this period are forcefully closed. 85 86 If `state` is ``None``, each connection is initialized with it's own 87 instance of server state. If `state` is set, provided state is shared 88 between all connections. 89 90 If `parallel_requests` is set to ``True``, incoming requests will be 91 processed in parallel - processing of subsequent requests can start (and 92 finish) before prior responses are generated. 93 94 Argument `additional_routes` can be used for providing addition aiohttp 95 route definitions handled by running web server. 96 97 Args: 98 host: listening hostname 99 port: listening TCP port 100 connection_cb: connection callback 101 request_cb: request callback 102 ws_path: WebSocket url path segment 103 static_dir: static files directory path 104 index_path: index path 105 pem_file: PEM file path 106 autoflush_delay: autoflush delay 107 shutdown_timeout: shutdown timeout 108 state: shared server state 109 parallel_requests: parallel request processing 110 additional_routes: additional route definitions 111 112 """ 113 server = Server() 114 server._connection_cb = connection_cb 115 server._request_cb = request_cb 116 server._autoflush_delay = autoflush_delay 117 server._state = state 118 server._parallel_requests = parallel_requests 119 server._async_group = aio.Group() 120 121 middlewares = [] 122 123 if htpasswd_file: 124 middlewares.append(BasicAuthMiddleware(htpasswd_file)) 125 126 routes = [] 127 128 if index_path: 129 130 async def root_handler(request): 131 raise aiohttp.web.HTTPFound(index_path) 132 133 routes.append(aiohttp.web.get('/', root_handler)) 134 135 routes.append(aiohttp.web.get(ws_path, server._ws_handler)) 136 routes.extend(additional_routes) 137 138 if static_dir: 139 routes.append(aiohttp.web.static('/', static_dir)) 140 141 app = aiohttp.web.Application(middlewares=middlewares) 142 app.add_routes(routes) 143 runner = aiohttp.web.AppRunner(app, 144 shutdown_timeout=shutdown_timeout) 145 await runner.setup() 146 server.async_group.spawn(aio.call_on_cancel, runner.cleanup) 147 148 try: 149 site = aiohttp.web.TCPSite(runner=runner, 150 host=host, 151 port=port, 152 ssl_context=ssl_ctx, 153 reuse_address=True) 154 await site.start() 155 156 except BaseException: 157 await aio.uncancellable(server.async_close()) 158 raise 159 160 return server
Create listening server
Each time server receives new incoming juggler connection, connection_cb
is called with newly created connection.
For each connection, when server receives request
message, request_cb
is called with associated connection, request name and request data.
If request_cb
returns value, successful response
message is sent
with resulting value as data. If request_cb
raises exception,
unsuccessful response
message is sent with raised exception as data.
If request_cb
is None
, each request
message causes sending
of unsuccessful response
message.
If static_dir
is set, server serves static files is addition to providing
juggler communication.
If index_path
is set, request for url path /
are redirected to
index_path
.
If htpasswd_file
is set, HTTP Basic Authentication is enabled.
All requests are checked for Authorization
header and only users
specified by htpassword_file
are accepted. htpasswd_file
is read
during initialization and changes to it's content, after initialization
finishes, are not monitored.
If ssl_ctx
is set, server provides https/wss
communication instead
of http/ws
communication.
Argument autoflush_delay
is associated with all connections associated
with this server. autoflush_delay
defines maximum time delay for
automatic synchronization of state
changes. If autoflush_delay
is set
to None
, automatic synchronization is disabled and user is responsible
for calling Connection.flush()
. If autoflush_delay
is set to 0
,
synchronization of state
is performed on each change of state
data.
shutdown_timeout
defines maximum time duration server will wait for
regular connection close procedures during server shutdown. All connections
that are not closed during this period are forcefully closed.
If state
is None
, each connection is initialized with it's own
instance of server state. If state
is set, provided state is shared
between all connections.
If parallel_requests
is set to True
, incoming requests will be
processed in parallel - processing of subsequent requests can start (and
finish) before prior responses are generated.
Argument additional_routes
can be used for providing addition aiohttp
route definitions handled by running web server.
Arguments:
- host: listening hostname
- port: listening TCP port
- connection_cb: connection callback
- request_cb: request callback
- ws_path: WebSocket url path segment
- static_dir: static files directory path
- index_path: index path
- pem_file: PEM file path
- autoflush_delay: autoflush delay
- shutdown_timeout: shutdown timeout
- state: shared server state
- parallel_requests: parallel request processing
- additional_routes: additional route definitions
163class Server(aio.Resource): 164 """Server 165 166 For creating new server see `listen` coroutine. 167 168 When server is closed, all incoming connections are also closed. 169 170 """ 171 172 @property 173 def async_group(self) -> aio.Group: 174 """Async group""" 175 return self._async_group 176 177 async def _ws_handler(self, request): 178 ws = aiohttp.web.WebSocketResponse() 179 await ws.prepare(request) 180 181 conn = Connection() 182 conn._ws = ws 183 conn._remote = _get_remote(request) 184 conn._async_group = self.async_group.create_subgroup() 185 conn._request_cb = self._request_cb 186 conn._autoflush_delay = self._autoflush_delay 187 conn._state = self._state or json.Storage() 188 conn._parallel_requests = self._parallel_requests 189 conn._flush_queue = aio.Queue() 190 191 conn.async_group.spawn(conn._receive_loop) 192 conn.async_group.spawn(conn._sync_loop) 193 194 if self._connection_cb: 195 conn.async_group.spawn(aio.call, self._connection_cb, conn) 196 197 await conn.wait_closed() 198 199 return ws
Server
For creating new server see listen
coroutine.
When server is closed, all incoming connections are also closed.
202class Connection(aio.Resource): 203 """Connection 204 205 For creating new connection see `listen` coroutine. 206 207 """ 208 209 @property 210 def async_group(self) -> aio.Group: 211 """Async group""" 212 return self._async_group 213 214 @property 215 def remote(self) -> str: 216 """Remote IP address 217 218 Address is obtained from Forwarded or X-Forwarded-For headers. If 219 these headers are not available, socket's remote address is used. 220 221 """ 222 return self._remote 223 224 @property 225 def state(self) -> json.Storage: 226 """Server state""" 227 return self._state 228 229 async def flush(self): 230 """Force synchronization of state data 231 232 Raises: 233 ConnectionError 234 235 """ 236 try: 237 flush_future = asyncio.Future() 238 self._flush_queue.put_nowait(flush_future) 239 await flush_future 240 241 except aio.QueueClosedError: 242 raise ConnectionError() 243 244 async def notify(self, 245 name: str, 246 data: json.Data): 247 """Send notification 248 249 Raises: 250 ConnectionError 251 252 """ 253 if not self.is_open: 254 raise ConnectionError() 255 256 await self._ws.send_str(json.encode({'type': 'notify', 257 'name': name, 258 'data': data})) 259 260 async def _receive_loop(self): 261 try: 262 while self.is_open: 263 msg_ws = await self._ws.receive() 264 if self._ws.closed or msg_ws.type == aiohttp.WSMsgType.CLOSING: 265 break 266 if msg_ws.type != aiohttp.WSMsgType.TEXT: 267 raise Exception("unsupported ws message type") 268 269 msg = json.decode(msg_ws.data) 270 271 if msg['type'] != 'request': 272 raise Exception("invalid message type") 273 274 if self._parallel_requests: 275 self.async_group.spawn(self._process_request, msg) 276 277 else: 278 await self._process_request(msg) 279 280 except ConnectionError: 281 pass 282 283 except Exception as e: 284 mlog.error("receive loop error: %s", e, exc_info=e) 285 286 finally: 287 self.close() 288 await aio.uncancellable(self._ws.close()) 289 290 async def _process_request(self, req): 291 try: 292 res = {'type': 'response', 293 'id': req['id']} 294 295 if req['name']: 296 try: 297 if not self._request_cb: 298 raise Exception('request handler not implemented') 299 300 res['data'] = await aio.call(self._request_cb, self, 301 req['name'], req['data']) 302 res['success'] = True 303 304 except Exception as e: 305 res['data'] = str(e) 306 res['success'] = False 307 308 else: 309 res['data'] = req['data'] 310 res['success'] = True 311 312 await self._ws.send_str(json.encode(res)) 313 314 except ConnectionError: 315 self.close() 316 317 except Exception as e: 318 self.close() 319 mlog.error("process request error: %s", e, exc_info=e) 320 321 async def _sync_loop(self): 322 flush_future = None 323 data = None 324 synced_data = None 325 data_queue = aio.Queue() 326 327 try: 328 with self._state.register_change_cb(data_queue.put_nowait): 329 data_queue.put_nowait(self._state.data) 330 331 if not self.is_open: 332 return 333 334 get_data_future = self.async_group.spawn(data_queue.get) 335 get_flush_future = self.async_group.spawn( 336 self._flush_queue.get) 337 338 while True: 339 await asyncio.wait([get_data_future, get_flush_future], 340 return_when=asyncio.FIRST_COMPLETED) 341 342 if get_flush_future.done(): 343 flush_future = get_flush_future.result() 344 get_flush_future = self.async_group.spawn( 345 self._flush_queue.get) 346 347 else: 348 await asyncio.wait([get_flush_future], 349 timeout=self._autoflush_delay) 350 351 if get_flush_future.done(): 352 flush_future = get_flush_future.result() 353 get_flush_future = self.async_group.spawn( 354 self._flush_queue.get) 355 356 else: 357 flush_future = None 358 359 if get_data_future.done(): 360 data = get_data_future.result() 361 get_data_future = self.async_group.spawn( 362 data_queue.get) 363 364 if self._autoflush_delay != 0: 365 if not data_queue.empty(): 366 data = data_queue.get_nowait_until_empty() 367 368 if synced_data is not data: 369 diff = json.diff(synced_data, data) 370 synced_data = data 371 372 if diff: 373 await self._ws.send_str(json.encode({ 374 'type': 'state', 375 'diff': diff})) 376 377 if flush_future and not flush_future.done(): 378 flush_future.set_result(True) 379 380 except Exception as e: 381 mlog.error("sync loop error: %s", e, exc_info=e) 382 383 finally: 384 self.close() 385 386 self._flush_queue.close() 387 while True: 388 if flush_future and not flush_future.done(): 389 flush_future.set_exception(ConnectionError()) 390 391 if self._flush_queue.empty(): 392 break 393 394 flush_future = self._flush_queue.get_nowait()
Connection
For creating new connection see listen
coroutine.
209 @property 210 def async_group(self) -> aio.Group: 211 """Async group""" 212 return self._async_group
Async group
214 @property 215 def remote(self) -> str: 216 """Remote IP address 217 218 Address is obtained from Forwarded or X-Forwarded-For headers. If 219 these headers are not available, socket's remote address is used. 220 221 """ 222 return self._remote
Remote IP address
Address is obtained from Forwarded or X-Forwarded-For headers. If these headers are not available, socket's remote address is used.
229 async def flush(self): 230 """Force synchronization of state data 231 232 Raises: 233 ConnectionError 234 235 """ 236 try: 237 flush_future = asyncio.Future() 238 self._flush_queue.put_nowait(flush_future) 239 await flush_future 240 241 except aio.QueueClosedError: 242 raise ConnectionError()
Force synchronization of state data
Raises:
- ConnectionError
244 async def notify(self, 245 name: str, 246 data: json.Data): 247 """Send notification 248 249 Raises: 250 ConnectionError 251 252 """ 253 if not self.is_open: 254 raise ConnectionError() 255 256 await self._ws.send_str(json.encode({'type': 'notify', 257 'name': name, 258 'data': data}))
Send notification
Raises:
- ConnectionError