hat.juggler
Juggler communication protocol
1"""Juggler communication protocol""" 2 3from hat.juggler.client import (NotifyCb, 4 JugglerError, 5 connect, 6 Client) 7from hat.juggler.server import (ConnectionCb, 8 RequestCb, 9 listen, 10 Server, 11 Connection) 12 13 14__all__ = ['NotifyCb', 15 'JugglerError', 16 'connect', 17 'Client', 18 'ConnectionCb', 19 'RequestCb', 20 'listen', 21 'Server', 22 'Connection']
class JugglerError(Exception):
    """Juggler error

    Exception carrying JSON data describing the failure. The data passed
    to the constructor is exposed unchanged through the read-only `data`
    property.

    Args:
        data: JSON data associated with the error

    """

    def __init__(self, data: json.Data):
        # name-mangled attribute keeps the payload private to this class
        self.__data = data

    @property
    def data(self) -> json.Data:
        """JSON data supplied when the error was created"""
        return self.__data
Juggler error
async def connect(address: str,
                  notify_cb: NotifyCb | None = None,
                  *,
                  auth: aiohttp.BasicAuth | None = None,
                  ssl_ctx: ssl.SSLContext | None = None
                  ) -> 'Client':
    """Connect to remote server

    `address` represents remote WebSocket URL formatted as
    ``<schema>://<host>:<port>/<path>`` where ``<schema>`` is ``ws`` or
    ``wss``.

    Args:
        address: remote WebSocket URL
        notify_cb: optional callback called for each received ``notify``
            message
        auth: optional HTTP Basic Authentication credentials
        ssl_ctx: optional SSL context used for the WebSocket connection

    Returns:
        connected client with its receive loop already running

    """
    cli = Client()
    cli._notify_cb = notify_cb
    cli._state = json.Storage()
    cli._res_futures = {}
    cli._next_req_ids = itertools.count(1)
    cli._async_group = aio.Group()
    cli._session = aiohttp.ClientSession()

    # NOTE(review): when no `ssl_ctx` is given, ``ssl=False`` is passed to
    # aiohttp, which presumably disables certificate verification for
    # ``wss`` addresses - confirm this is intended
    ws_ssl = ssl_ctx or False

    try:
        # NOTE(review): ``max_msg_size=0`` presumably disables aiohttp's
        # incoming message size limit - confirm against aiohttp docs
        cli._ws = await cli._session.ws_connect(address,
                                                auth=auth,
                                                ssl=ws_ssl,
                                                max_msg_size=0)

    except BaseException:
        # session is released even if connecting was cancelled
        await aio.uncancellable(cli._session.close())
        raise

    cli.async_group.spawn(cli._receive_loop)

    return cli
Connect to remote server
address
represents remote WebSocket URL formatted as
<schema>://<host>:<port>/<path>
where <schema>
is ws
or
wss
.
class Client(aio.Resource):
    """Client

    For creating new client see `connect` coroutine.

    Instances are not configured through a constructor. The private
    attributes used by this class (`_notify_cb`, `_async_group`, `_state`,
    `_res_futures`, `_next_req_ids`, `_session`, `_ws`) are expected to be
    assigned by the code that creates the instance.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    @property
    def state(self) -> json.Storage:
        """Remote server state"""
        return self._state

    async def send(self,
                   name: str,
                   data: json.Data
                   ) -> json.Data:
        """Send request and wait for response

        Args:
            name: request name
            data: request payload

        Returns:
            ``data`` of the ``response`` message with matching id

        Raises:
            JugglerError: response was received with false ``success``
            ConnectionError: client is not open or receive loop terminated
                before response was received

        """
        if not self.is_open:
            raise ConnectionError()

        # register future before sending so the receive loop can resolve it
        req_id = next(self._next_req_ids)
        res_future = asyncio.Future()
        self._res_futures[req_id] = res_future

        try:
            await self._ws.send_json({'type': 'request',
                                      'id': req_id,
                                      'name': name,
                                      'data': data})
            return await res_future

        finally:
            # always forget the request, regardless of outcome
            self._res_futures.pop(req_id)

    async def _receive_loop(self):
        # Reads WebSocket messages until the socket closes or an error
        # occurs and dispatches them based on the message ``type`` field.
        try:
            while True:
                msg_ws = await self._ws.receive()
                if self._ws.closed or msg_ws.type == aiohttp.WSMsgType.CLOSING:
                    break
                if msg_ws.type != aiohttp.WSMsgType.TEXT:
                    raise Exception("unsupported ws message type")

                msg = json.decode(msg_ws.data)

                if msg['type'] == 'response':
                    # responses without a pending request are ignored
                    res_future = self._res_futures.get(msg['id'])
                    if not res_future or res_future.done():
                        continue

                    if msg['success']:
                        res_future.set_result(msg['data'])

                    else:
                        res_future.set_exception(JugglerError(msg['data']))

                elif msg['type'] == 'state':
                    # apply received diff to local copy of remote state
                    data = json.patch(self._state.data, msg['diff'])
                    self._state.set([], data)

                elif msg['type'] == 'notify':
                    if not self._notify_cb:
                        continue

                    await aio.call(self._notify_cb, self, msg['name'],
                                   msg['data'])

                else:
                    raise Exception("invalid message type")

        except ConnectionError:
            pass

        except Exception as e:
            mlog.error("receive loop error: %s", e, exc_info=e)

        finally:
            self.close()

            # fail all requests still waiting for a response
            for f in self._res_futures.values():
                if not f.done():
                    f.set_exception(ConnectionError())

            await aio.uncancellable(self._close_ws())

    async def _close_ws(self):
        # close WebSocket first, then the session it was created from
        await self._ws.close()
        await self._session.close()
Client
For creating new client see connect
coroutine.
@property
def async_group(self) -> aio.Group:
    """Async group"""
    # group stored on the instance as `_async_group`
    return self._async_group
Async group
async def send(self,
               name: str,
               data: json.Data
               ) -> json.Data:
    """Send request and wait for response

    Request is sent as a ``request`` message with a newly allocated id
    and the call waits on a future registered under that id.

    Args:
        name: request name
        data: request payload

    Raises:
        JugglerError
        ConnectionError

    """
    if not self.is_open:
        raise ConnectionError()

    request_id = next(self._next_req_ids)
    response = asyncio.Future()
    self._res_futures[request_id] = response

    request = {'type': 'request',
               'id': request_id,
               'name': name,
               'data': data}

    try:
        await self._ws.send_json(request)
        return await response

    finally:
        # pending request is forgotten regardless of outcome
        self._res_futures.pop(request_id)
Send request and wait for response
Arguments:
- name: request name
- data: request payload
Raises:
- JugglerError
- ConnectionError
async def listen(host: str,
                 port: int,
                 connection_cb: ConnectionCb | None = None,
                 request_cb: RequestCb | None = None,
                 *,
                 ws_path: str = '/ws',
                 static_dir: pathlib.PurePath | None = None,
                 index_path: str | None = '/index.html',
                 htpasswd_file: pathlib.PurePath | None = None,
                 pem_file: pathlib.PurePath | None = None,
                 autoflush_delay: float | None = 0.2,
                 shutdown_timeout: float = 0.1,
                 state: json.Storage | None = None,
                 parallel_requests: bool = False,
                 additional_routes: Iterable[aiohttp.web.RouteDef] = ()
                 ) -> 'Server':
    """Create listening server

    Each time server receives new incoming juggler connection, `connection_cb`
    is called with newly created connection.

    For each connection, when server receives `request` message, `request_cb`
    is called with associated connection, request name and request data.
    If `request_cb` returns value, successful `response` message is sent
    with resulting value as data. If `request_cb` raises exception,
    unsuccessful `response` message is sent with string representation of
    raised exception as data. If `request_cb` is ``None``, each `request`
    message causes sending of unsuccessful `response` message.

    If `static_dir` is set, server serves static files in addition to
    providing juggler communication.

    If `index_path` is set, requests for url path ``/`` are redirected to
    `index_path`.

    If `htpasswd_file` is set, HTTP Basic Authentication is enabled.
    All requests are checked for ``Authorization`` header and only users
    specified by `htpasswd_file` are accepted. `htpasswd_file` is read
    during initialization and changes to its content, after initialization
    finishes, are not monitored.

    If `pem_file` is set, server provides `https/wss` communication instead
    of `http/ws` communication.

    Argument `autoflush_delay` is associated with all connections associated
    with this server. `autoflush_delay` defines maximum time delay for
    automatic synchronization of `state` changes. If `autoflush_delay` is set
    to ``None``, automatic synchronization is disabled and user is responsible
    for calling :meth:`Connection.flush`. If `autoflush_delay` is set to ``0``,
    synchronization of `state` is performed on each change of `state` data.

    `shutdown_timeout` defines maximum time duration server will wait for
    regular connection close procedures during server shutdown. All connections
    that are not closed during this period are forcefully closed.

    If `state` is ``None``, each connection is initialized with its own
    instance of server state. If `state` is set, provided state is shared
    between all connections.

    If `parallel_requests` is set to ``True``, incoming requests will be
    processed in parallel - processing of subsequent requests can start (and
    finish) before prior responses are generated.

    Argument `additional_routes` can be used for providing additional aiohttp
    route definitions handled by running web server.

    Args:
        host: listening hostname
        port: listening TCP port
        connection_cb: connection callback
        request_cb: request callback
        ws_path: WebSocket url path segment
        static_dir: static files directory path
        index_path: index path
        htpasswd_file: htpasswd file path
        pem_file: PEM file path
        autoflush_delay: autoflush delay
        shutdown_timeout: shutdown timeout
        state: shared server state
        parallel_requests: parallel request processing
        additional_routes: additional route definitions

    Returns:
        running server

    """
    server = Server()
    server._connection_cb = connection_cb
    server._request_cb = request_cb
    server._autoflush_delay = autoflush_delay
    server._state = state
    server._parallel_requests = parallel_requests
    server._async_group = aio.Group()

    middlewares = []

    if htpasswd_file:
        middlewares.append(BasicAuthMiddleware(htpasswd_file))

    routes = []

    if index_path:

        async def root_handler(request):
            # redirect is signaled by raising aiohttp's HTTPFound
            raise aiohttp.web.HTTPFound(index_path)

        routes.append(aiohttp.web.get('/', root_handler))

    routes.append(aiohttp.web.get(ws_path, server._ws_handler))
    routes.extend(additional_routes)

    # static route is registered last so that explicit routes above are
    # added before the catch-all ``/`` prefix
    if static_dir:
        routes.append(aiohttp.web.static('/', static_dir))

    app = aiohttp.web.Application(middlewares=middlewares)
    app.add_routes(routes)
    runner = aiohttp.web.AppRunner(app,
                                   shutdown_timeout=shutdown_timeout)
    await runner.setup()

    # runner cleanup is tied to closing of server's async group
    server.async_group.spawn(aio.call_on_cancel, runner.cleanup)

    try:
        ssl_ctx = _create_ssl_context(pem_file) if pem_file else None
        site = aiohttp.web.TCPSite(runner=runner,
                                   host=host,
                                   port=port,
                                   ssl_context=ssl_ctx,
                                   reuse_address=True)
        await site.start()

    except BaseException:
        # release runner even if startup was cancelled
        await aio.uncancellable(server.async_close())
        raise

    return server
Create listening server
Each time server receives new incoming juggler connection, connection_cb
is called with newly created connection.
For each connection, when server receives request
message, request_cb
is called with associated connection, request name and request data.
If request_cb
returns value, successful response
message is sent
with resulting value as data. If request_cb
raises exception,
unsuccessful response
message is sent with raised exception as data.
If request_cb
is None
, each request
message causes sending
of unsuccessful response
message.
If static_dir
is set, server serves static files in addition to providing
juggler communication.
If index_path
is set, requests for URL path /
are redirected to
index_path
.
If htpasswd_file
is set, HTTP Basic Authentication is enabled.
All requests are checked for Authorization
header and only users
specified by htpasswd_file
are accepted. htpasswd_file
is read
during initialization and changes to its content, after initialization
finishes, are not monitored.
If pem_file
is set, server provides https/wss
communication instead
of http/ws
communication.
Argument autoflush_delay
is associated with all connections associated
with this server. autoflush_delay
defines maximum time delay for
automatic synchronization of state
changes. If autoflush_delay
is set
to None
, automatic synchronization is disabled and user is responsible
for calling Connection.flush()
. If autoflush_delay
is set to 0
,
synchronization of state
is performed on each change of state
data.
shutdown_timeout
defines maximum time duration server will wait for
regular connection close procedures during server shutdown. All connections
that are not closed during this period are forcefully closed.
If state
is None
, each connection is initialized with its own
instance of server state. If state
is set, provided state is shared
between all connections.
If parallel_requests
is set to True
, incoming requests will be
processed in parallel - processing of subsequent requests can start (and
finish) before prior responses are generated.
Argument additional_routes
can be used for providing additional aiohttp
route definitions handled by running web server.
Arguments:
- host: listening hostname
- port: listening TCP port
- connection_cb: connection callback
- request_cb: request callback
- ws_path: WebSocket url path segment
- static_dir: static files directory path
- index_path: index path
- pem_file: PEM file path
- autoflush_delay: autoflush delay
- shutdown_timeout: shutdown timeout
- state: shared server state
- parallel_requests: parallel request processing
- additional_routes: additional route definitions
class Server(aio.Resource):
    """Server

    For creating new server see `listen` coroutine.

    When server is closed, all incoming connections are also closed.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    async def _ws_handler(self, request):
        # aiohttp handler for the juggler WebSocket route: upgrades the
        # request, wraps the socket in a `Connection` and keeps the
        # handler alive until that connection is closed
        socket = aiohttp.web.WebSocketResponse()
        await socket.prepare(request)

        connection = Connection()
        connection._ws = socket
        connection._remote = _get_remote(request)
        # subgroup ties connection lifetime to the server's async group
        connection._async_group = self.async_group.create_subgroup()
        connection._request_cb = self._request_cb
        connection._autoflush_delay = self._autoflush_delay
        # shared state if configured, otherwise state private to connection
        connection._state = self._state or json.Storage()
        connection._parallel_requests = self._parallel_requests
        connection._flush_queue = aio.Queue()

        group = connection.async_group
        group.spawn(connection._receive_loop)
        group.spawn(connection._sync_loop)

        if self._connection_cb:
            group.spawn(aio.call, self._connection_cb, connection)

        await connection.wait_closed()

        return socket
Server
For creating new server see listen
coroutine.
When server is closed, all incoming connections are also closed.
class Connection(aio.Resource):
    """Connection

    For creating new connection see `listen` coroutine.

    Instances are not configured through a constructor. The private
    attributes used by this class (`_ws`, `_remote`, `_async_group`,
    `_request_cb`, `_autoflush_delay`, `_state`, `_parallel_requests`,
    `_flush_queue`) are expected to be assigned by the code that creates
    the instance.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    @property
    def remote(self) -> str:
        """Remote IP address

        Address is obtained from Forwarded or X-Forwarded-For headers. If
        these headers are not available, socket's remote address is used.

        """
        return self._remote

    @property
    def state(self) -> json.Storage:
        """Server state"""
        return self._state

    async def flush(self):
        """Force synchronization of state data

        Flush request is queued for the synchronization loop and the call
        waits until that loop resolves it.

        Raises:
            ConnectionError

        """
        try:
            flush_future = asyncio.Future()
            self._flush_queue.put_nowait(flush_future)
            await flush_future

        except aio.QueueClosedError:
            raise ConnectionError()

    async def notify(self,
                     name: str,
                     data: json.Data):
        """Send notification

        Args:
            name: notification name
            data: notification payload

        Raises:
            ConnectionError

        """
        if not self.is_open:
            raise ConnectionError()

        await self._ws.send_str(json.encode({'type': 'notify',
                                             'name': name,
                                             'data': data}))

    async def _receive_loop(self):
        # Reads WebSocket messages while the connection is open; only
        # ``request`` messages are accepted.
        try:
            while self.is_open:
                msg_ws = await self._ws.receive()
                if self._ws.closed or msg_ws.type == aiohttp.WSMsgType.CLOSING:
                    break
                if msg_ws.type != aiohttp.WSMsgType.TEXT:
                    raise Exception("unsupported ws message type")

                msg = json.decode(msg_ws.data)

                if msg['type'] != 'request':
                    raise Exception("invalid message type")

                if self._parallel_requests:
                    # do not wait for the response before reading next message
                    self.async_group.spawn(self._process_request, msg)

                else:
                    await self._process_request(msg)

        except ConnectionError:
            pass

        except Exception as e:
            mlog.error("receive loop error: %s", e, exc_info=e)

        finally:
            self.close()
            await aio.uncancellable(self._ws.close())

    async def _process_request(self, req):
        # Builds and sends the ``response`` message for a single request.
        try:
            res = {'type': 'response',
                   'id': req['id']}

            if req['name']:
                try:
                    if not self._request_cb:
                        raise Exception('request handler not implemented')

                    res['data'] = await aio.call(self._request_cb, self,
                                                 req['name'], req['data'])
                    res['success'] = True

                except Exception as e:
                    # callback failure is reported to the peer as text
                    res['data'] = str(e)
                    res['success'] = False

            else:
                # request with empty name is answered by echoing its data
                res['data'] = req['data']
                res['success'] = True

            await self._ws.send_str(json.encode(res))

        except ConnectionError:
            self.close()

        except Exception as e:
            self.close()
            mlog.error("process request error: %s", e, exc_info=e)

    async def _sync_loop(self):
        # Sends ``state`` messages containing diffs between the last
        # synchronized state data and the current state data, and resolves
        # futures queued by `flush`.
        flush_future = None
        data = None
        synced_data = None
        data_queue = aio.Queue()

        try:
            # every state change is pushed to data_queue; current state is
            # queued first so the initial state is sent as well
            with self._state.register_change_cb(data_queue.put_nowait):
                data_queue.put_nowait(self._state.data)

                if not self.is_open:
                    return

                get_data_future = self.async_group.spawn(data_queue.get)
                get_flush_future = self.async_group.spawn(
                    self._flush_queue.get)

                while True:
                    # wake on either a state change or a flush request
                    await asyncio.wait([get_data_future, get_flush_future],
                                       return_when=asyncio.FIRST_COMPLETED)

                    if get_flush_future.done():
                        flush_future = get_flush_future.result()
                        get_flush_future = self.async_group.spawn(
                            self._flush_queue.get)

                    else:
                        # state changed - wait up to autoflush delay for a
                        # flush request before sending
                        await asyncio.wait([get_flush_future],
                                           timeout=self._autoflush_delay)

                        if get_flush_future.done():
                            flush_future = get_flush_future.result()
                            get_flush_future = self.async_group.spawn(
                                self._flush_queue.get)

                        else:
                            flush_future = None

                    if get_data_future.done():
                        data = get_data_future.result()
                        get_data_future = self.async_group.spawn(
                            data_queue.get)

                    if self._autoflush_delay != 0:
                        if not data_queue.empty():
                            # presumably returns the most recently queued
                            # data - confirm against aio.Queue
                            data = data_queue.get_nowait_until_empty()

                    if synced_data is not data:
                        diff = json.diff(synced_data, data)
                        synced_data = data

                        # empty diff is not sent
                        if diff:
                            await self._ws.send_str(json.encode({
                                'type': 'state',
                                'diff': diff}))

                    if flush_future and not flush_future.done():
                        flush_future.set_result(True)

        except Exception as e:
            mlog.error("sync loop error: %s", e, exc_info=e)

        finally:
            self.close()

            # fail the flush request being processed and all queued ones
            self._flush_queue.close()
            while True:
                if flush_future and not flush_future.done():
                    flush_future.set_exception(ConnectionError())

                if self._flush_queue.empty():
                    break

                flush_future = self._flush_queue.get_nowait()
Connection
For creating new connection see listen
coroutine.
@property
def async_group(self) -> aio.Group:
    """Async group"""
    # group stored on the instance as `_async_group`
    return self._async_group
Async group
@property
def remote(self) -> str:
    """Remote IP address

    Address is obtained from Forwarded or X-Forwarded-For headers. If
    these headers are not available, socket's remote address is used.

    """
    # value is stored on the instance as `_remote`; how it is derived is
    # not visible in this method
    return self._remote
Remote IP address
Address is obtained from Forwarded or X-Forwarded-For headers. If these headers are not available, socket's remote address is used.
async def flush(self):
    """Force synchronization of state data

    New future is put on the flush queue and the call waits until that
    future is resolved.

    Raises:
        ConnectionError

    """
    pending = asyncio.Future()

    try:
        self._flush_queue.put_nowait(pending)
        await pending

    except aio.QueueClosedError:
        # closed flush queue is reported as a connection error
        raise ConnectionError()
Force synchronization of state data
Raises:
- ConnectionError
async def notify(self,
                 name: str,
                 data: json.Data):
    """Send notification

    Notification is sent as a JSON encoded ``notify`` message containing
    `name` and `data`.

    Args:
        name: notification name
        data: notification payload

    Raises:
        ConnectionError

    """
    if not self.is_open:
        raise ConnectionError()

    msg = {'type': 'notify',
           'name': name,
           'data': data}
    await self._ws.send_str(json.encode(msg))
Send notification
Raises:
- ConnectionError