hat.juggler
Juggler communication protocol
1"""Juggler communication protocol""" 2 3from hat.juggler.client import (NotifyCb, 4 JugglerError, 5 connect, 6 Client) 7from hat.juggler.server import (ConnectionCb, 8 RequestCb, 9 listen, 10 Server, 11 Connection) 12 13 14__all__ = ['NotifyCb', 15 'JugglerError', 16 'connect', 17 'Client', 18 'ConnectionCb', 19 'RequestCb', 20 'listen', 21 'Server', 22 'Connection']
class JugglerError(Exception):
    """Juggler error

    Exception type carrying an arbitrary JSON payload describing the
    failure. The payload is stored unchanged and exposed through `data`.

    """

    def __init__(self, data: json.Data):
        # payload is kept privately; it is readable only through `data`
        self.__data = data

    @property
    def data(self) -> json.Data:
        """Error data passed to the constructor"""
        return self.__data
Juggler error
async def connect(address: str,
                  notify_cb: NotifyCb | None = None,
                  *,
                  auth: aiohttp.BasicAuth | None = None,
                  ssl_ctx: ssl.SSLContext | None = None,
                  send_queue_size: int = 1024,
                  max_segment_size: int = 64 * 1024,
                  ping_delay: float = 30,
                  ping_timeout: float = 30
                  ) -> 'Client':
    """Connect to remote server

    `address` represents remote WebSocket URL formatted as
    ``<schema>://<host>:<port>/<path>`` where ``<schema>`` is ``ws`` or
    ``wss``.

    If `notify_cb` is set, it is called for each received `notify` message.

    When `ssl_ctx` is not provided, ``False`` is passed as aiohttp's ``ssl``
    argument.
    NOTE(review): aiohttp documents ``ssl=False`` as skipping certificate
    verification - confirm this is intended for ``wss`` addresses.

    If establishing the WebSocket connection fails, the underlying HTTP
    session is closed and the exception is re-raised.

    Args:
        address: remote WebSocket URL
        notify_cb: notification callback
        auth: HTTP basic authentication credentials
        ssl_ctx: SSL context
        send_queue_size: send queue size
        max_segment_size: maximum segment size
        ping_delay: ping delay
        ping_timeout: ping timeout

    """
    conn = Client()
    conn._notify_cb = notify_cb
    conn._loop = asyncio.get_running_loop()
    conn._async_group = aio.Group()
    conn._state = json.Storage()
    conn._res_futures = {}
    conn._next_req_ids = itertools.count(1)

    session = aiohttp.ClientSession()
    conn._session = session

    try:
        websocket = await session.ws_connect(
            address,
            auth=auth,
            ssl=ssl_ctx if ssl_ctx else False,
            max_msg_size=0)

    except BaseException:
        # also covers cancellation - session close is shielded from it
        await aio.uncancellable(session.close())
        raise

    conn._transport = Transport(ws=websocket,
                                msg_cb=conn._on_msg,
                                send_queue_size=send_queue_size,
                                max_segment_size=max_segment_size,
                                ping_delay=ping_delay,
                                ping_timeout=ping_timeout)

    group = conn.async_group

    # cleanup (pending requests, transport, session) runs on group cancel
    group.spawn(aio.call_on_cancel, conn._on_close)

    # transport closing closes the client
    group.spawn(aio.call_on_done,
                conn._transport.wait_closing(),
                conn.close)

    return conn
Connect to remote server
address
represents remote WebSocket URL formatted as
<schema>://<host>:<port>/<path>
where <schema>
is ws
or
wss
.
class Client(aio.Resource):
    """Client

    For creating new client see `connect` coroutine.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group controlling client's lifetime"""
        return self._async_group

    @property
    def state(self) -> json.Storage:
        """Remote server state

        Storage is updated each time `state` message is received.

        """
        return self._state

    async def send(self,
                   name: str,
                   data: json.Data
                   ) -> json.Data:
        """Send request and wait for response

        Args:
            name: request name
            data: request payload

        Returns:
            data of successful response

        Raises:
            JugglerError: response was not successful
            ConnectionError: client is not open or was closed while waiting

        """
        if not self.is_open:
            raise ConnectionError()

        request_id = next(self._next_req_ids)
        response = self._loop.create_future()
        self._res_futures[request_id] = response

        request = {'type': 'request',
                   'id': request_id,
                   'name': name,
                   'data': data}

        try:
            await self._transport.send(request)
            return await response

        finally:
            # response future is always unregistered, even on failure
            self._res_futures.pop(request_id)

    async def _on_close(self):
        # fail all requests still waiting for their response
        pending = (future for future in self._res_futures.values()
                   if not future.done())
        for future in pending:
            future.set_exception(ConnectionError())

        await self._transport.async_close()
        await self._session.close()

    async def _on_msg(self, msg):
        msg_type = msg['type']

        if msg_type == 'response':
            response = self._res_futures.get(msg['id'])

            # unknown id or response nobody is waiting for
            if not response or response.done():
                return

            if msg['success']:
                response.set_result(msg['data'])

            else:
                response.set_exception(JugglerError(msg['data']))

            return

        if msg_type == 'state':
            # received diff is applied to local copy of remote state
            patched = json.patch(self._state.data, msg['diff'])
            self._state.set([], patched)
            return

        if msg_type == 'notify':
            if self._notify_cb:
                await aio.call(self._notify_cb, self, msg['name'],
                               msg['data'])

            return

        raise Exception("invalid message type")
Client
For creating new client see connect
coroutine.
@property
def async_group(self) -> aio.Group:
    """Async group stored on this instance as ``_async_group``"""
    return self._async_group
Async group
@property
def state(self) -> json.Storage:
    """Remote server state storage held by this client"""
    return self._state
Remote server state
async def send(self,
               name: str,
               data: json.Data
               ) -> json.Data:
    """Send request and wait for response

    Request is assigned next free identifier and future associated with
    that identifier is awaited. Future is unregistered once this method
    finishes, regardless of outcome.

    Args:
        name: request name
        data: request payload

    Returns:
        result of the future registered for this request

    Raises:
        JugglerError
        ConnectionError

    """
    if not self.is_open:
        raise ConnectionError()

    request_id = next(self._next_req_ids)
    response = self._loop.create_future()
    self._res_futures[request_id] = response

    request = {'type': 'request',
               'id': request_id,
               'name': name,
               'data': data}

    try:
        await self._transport.send(request)
        return await response

    finally:
        self._res_futures.pop(request_id)
Send request and wait for response
Arguments:
- name: request name
- data: request payload
Raises:
- JugglerError
- ConnectionError
async def listen(host: str,
                 port: int,
                 connection_cb: ConnectionCb | None = None,
                 request_cb: RequestCb | None = None,
                 *,
                 ws_path: str = '/ws',
                 static_dir: pathlib.PurePath | None = None,
                 index_path: str | None = '/index.html',
                 htpasswd_file: pathlib.PurePath | None = None,
                 ssl_ctx: ssl.SSLContext | None = None,
                 autoflush_delay: float | None = 0.2,
                 shutdown_timeout: float = 0.1,
                 state: json.Storage | None = None,
                 parallel_requests: bool = False,
                 additional_routes: Iterable[aiohttp.web.RouteDef] = (),
                 send_queue_size: int = 1024,
                 max_segment_size: int = 64 * 1024,
                 ping_delay: float = 30,
                 ping_timeout: float = 30,
                 no_cache: bool = True
                 ) -> 'Server':
    """Create listening server

    Each time server receives new incoming juggler connection, `connection_cb`
    is called with newly created connection.

    For each connection, when server receives `request` message, `request_cb`
    is called with associated connection, request name and request data.
    If `request_cb` returns value, successful `response` message is sent
    with resulting value as data. If `request_cb` raises exception,
    unsuccessful `response` message is sent with raised exception as data.
    If `request_cb` is ``None``, each `request` message causes sending
    of unsuccessful `response` message.

    If `static_dir` is set, server serves static files in addition to
    providing juggler communication.

    If `index_path` is set, requests for url path ``/`` are redirected to
    `index_path`.

    If `htpasswd_file` is set, HTTP Basic Authentication is enabled.
    All requests are checked for ``Authorization`` header and only users
    specified by `htpasswd_file` are accepted. `htpasswd_file` is read
    during initialization and changes to its content, after initialization
    finishes, are not monitored.

    If `ssl_ctx` is set, server provides `https/wss` communication instead
    of `http/ws` communication.

    Argument `autoflush_delay` is associated with all connections associated
    with this server. `autoflush_delay` defines maximum time delay for
    automatic synchronization of `state` changes. If `autoflush_delay` is set
    to ``None``, automatic synchronization is disabled and user is responsible
    for calling :meth:`Connection.flush`. If `autoflush_delay` is set to ``0``,
    synchronization of `state` is performed on each change of `state` data.

    `shutdown_timeout` defines maximum time duration server will wait for
    regular connection close procedures during server shutdown. All connections
    that are not closed during this period are forcefully closed.

    If `state` is ``None``, each connection is initialized with its own
    instance of server state. If `state` is set, provided state is shared
    between all connections.

    If `parallel_requests` is set to ``True``, incoming requests will be
    processed in parallel - processing of subsequent requests can start (and
    finish) before prior responses are generated.

    Argument `additional_routes` can be used for providing additional aiohttp
    route definitions handled by running web server. These routes are
    registered after the WebSocket route and before the static files route.

    `send_queue_size` limits number of messages that can be put in send queue.
    This limit can impact blocking of :meth:`Connection.notify`.

    `max_segment_size` limits maximum size of single segment
    (transport payload size).

    When connection doesn't receive incoming data,
    `ping_delay` is time (in seconds) that connection waits before sending
    ping request.

    `ping_timeout` is time (in seconds), that connection waits for any kind
    of incoming traffic before closed connection is assumed.

    If `no_cache` is set to ``True``, server will include
    ``Cache-Control: no-cache`` header in all responses.

    Args:
        host: listening hostname
        port: listening TCP port
        connection_cb: connection callback
        request_cb: request callback
        ws_path: WebSocket url path segment
        static_dir: static files directory path
        index_path: index path
        htpasswd_file: htpasswd file path
        ssl_ctx: SSL context
        autoflush_delay: autoflush delay
        shutdown_timeout: shutdown timeout
        state: shared server state
        parallel_requests: parallel request processing
        additional_routes: additional route definitions
        send_queue_size: send queue size
        max_segment_size: maximum segment size
        ping_delay: ping delay
        ping_timeout: ping timeout
        no_cache: no cache header

    """
    server = Server()
    server._connection_cb = connection_cb
    server._request_cb = request_cb
    server._autoflush_delay = autoflush_delay
    server._state = state
    server._parallel_requests = parallel_requests
    server._send_queue_size = send_queue_size
    server._max_segment_size = max_segment_size
    server._ping_delay = ping_delay
    server._ping_timeout = ping_timeout
    server._async_group = aio.Group()

    middlewares = []

    if htpasswd_file:
        middlewares.append(BasicAuthMiddleware(htpasswd_file))

    # order matters: aiohttp resolves routes in registration order, so the
    # catch-all static route has to be added last
    routes = []

    if index_path:

        async def root_handler(request):
            raise aiohttp.web.HTTPFound(index_path)

        routes.append(aiohttp.web.get('/', root_handler))

    routes.append(aiohttp.web.get(ws_path, server._ws_handler))
    routes.extend(additional_routes)

    if static_dir:
        routes.append(aiohttp.web.static('/', static_dir))

    app = aiohttp.web.Application(middlewares=middlewares)
    app.add_routes(routes)

    if no_cache:
        app.on_response_prepare.append(_no_cache_prepare)

    runner = aiohttp.web.AppRunner(app,
                                   shutdown_timeout=shutdown_timeout)
    await runner.setup()

    # runner cleanup is bound to server's lifetime
    server.async_group.spawn(aio.call_on_cancel, runner.cleanup)

    try:
        site = aiohttp.web.TCPSite(runner=runner,
                                   host=host,
                                   port=port,
                                   ssl_context=ssl_ctx,
                                   reuse_address=True)
        await site.start()

    except BaseException:
        # also covers cancellation - server close is shielded from it
        await aio.uncancellable(server.async_close())
        raise

    return server
Create listening server
Each time server receives new incoming juggler connection, connection_cb
is called with newly created connection.
For each connection, when server receives request
message, request_cb
is called with associated connection, request name and request data.
If request_cb
returns value, successful response
message is sent
with resulting value as data. If request_cb
raises exception,
unsuccessful response
message is sent with raised exception as data.
If request_cb
is None
, each request
message causes sending
of unsuccessful response
message.
If static_dir
is set, server serves static files in addition to providing
juggler communication.
If index_path
is set, requests for url path /
are redirected to
index_path
.
If htpasswd_file
is set, HTTP Basic Authentication is enabled.
All requests are checked for Authorization
header and only users
specified by htpasswd_file
are accepted. htpasswd_file
is read
during initialization and changes to its content, after initialization
finishes, are not monitored.
If ssl_ctx
is set, server provides https/wss
communication instead
of http/ws
communication.
Argument autoflush_delay
is associated with all connections associated
with this server. autoflush_delay
defines maximum time delay for
automatic synchronization of state
changes. If autoflush_delay
is set
to None
, automatic synchronization is disabled and user is responsible
for calling Connection.flush()
. If autoflush_delay
is set to 0
,
synchronization of state
is performed on each change of state
data.
shutdown_timeout
defines maximum time duration server will wait for
regular connection close procedures during server shutdown. All connections
that are not closed during this period are forcefully closed.
If state
is None
, each connection is initialized with its own
instance of server state. If state
is set, provided state is shared
between all connections.
If parallel_requests
is set to True
, incoming requests will be
processed in parallel - processing of subsequent requests can start (and
finish) before prior responses are generated.
Argument additional_routes
can be used for providing additional aiohttp
route definitions handled by running web server.
send_queue_size
limits number of messages that can be put in send queue.
This limit can impact blocking of Connection.notify()
.
max_segment_size
limits maximum size of single segment
(transport payload size).
When connection doesn't receive incoming data,
ping_delay
is time (in seconds) that connection waits before sending
ping request.
ping_timeout
is time (in seconds), that connection waits for any kind
of incoming traffic before closed connection is assumed.
If no_cache
is set to True
, server will include
Cache-Control: no-cache
header in all responses.
Arguments:
- host: listening hostname
- port: listening TCP port
- connection_cb: connection callback
- request_cb: request callback
- ws_path: WebSocket url path segment
- static_dir: static files directory path
- index_path: index path
- htpasswd_file: htpasswd file path
- ssl_ctx: SSL context
- autoflush_delay: autoflush delay
- shutdown_timeout: shutdown timeout
- state: shared server state
- parallel_requests: parallel request processing
- additional_routes: additional route definitions
- send_queue_size: send queue size
- max_segment_size: maximum segment size
- ping_delay: ping delay
- ping_timeout: ping timeout
- no_cache: no cache header
class Server(aio.Resource):
    """Server

    For creating new server see `listen` coroutine.

    When server is closed, all incoming connections are also closed.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group controlling server's lifetime"""
        return self._async_group

    async def _ws_handler(self, request):
        # aiohttp request handler serving a single juggler connection
        websocket = aiohttp.web.WebSocketResponse()
        await websocket.prepare(request)

        connection = Connection()
        connection._remote = _get_remote(request)
        # connection group is a subgroup of server's group
        connection._async_group = self.async_group.create_subgroup()
        connection._request_cb = self._request_cb
        connection._autoflush_delay = self._autoflush_delay
        # without shared state, connection gets its own storage
        connection._state = self._state or json.Storage()
        connection._parallel_requests = self._parallel_requests
        connection._flush_queue = aio.Queue()

        transport = Transport(ws=websocket,
                              msg_cb=connection._on_msg,
                              send_queue_size=self._send_queue_size,
                              max_segment_size=self._max_segment_size,
                              ping_delay=self._ping_delay,
                              ping_timeout=self._ping_timeout)
        connection._transport = transport

        group = connection.async_group
        group.spawn(aio.call_on_cancel, transport.async_close)
        group.spawn(aio.call_on_done,
                    transport.wait_closing(),
                    connection.close)
        group.spawn(connection._sync_loop)

        if self._connection_cb:
            group.spawn(aio.call, self._connection_cb, connection)

        # handler stays active for as long as connection is alive
        await connection.wait_closed()

        return websocket
Server
For creating new server see listen
coroutine.
When server is closed, all incoming connections are also closed.
class Connection(aio.Resource):
    """Connection

    For creating new connection see `listen` coroutine.

    Connection handles incoming `request` messages, sends `notify` messages
    and synchronizes `state` changes to the remote peer as `state` messages
    containing diffs.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    @property
    def remote(self) -> str:
        """Remote IP address

        Address is obtained from Forwarded or X-Forwarded-For headers. If
        these headers are not available, socket's remote address is used.

        """
        return self._remote

    @property
    def state(self) -> json.Storage:
        """Server state"""
        return self._state

    async def flush(self):
        """Force synchronization of state data

        Flush request is queued for the synchronization loop and this
        coroutine waits until that request is resolved.

        Raises:
            ConnectionError

        """
        try:
            flush_future = asyncio.Future()
            self._flush_queue.put_nowait(flush_future)
            await flush_future

        except aio.QueueClosedError:
            # flush queue is closed once synchronization loop finishes
            raise ConnectionError()

    async def notify(self,
                     name: str,
                     data: json.Data):
        """Send notification

        Args:
            name: notification name
            data: notification payload

        Raises:
            ConnectionError

        """
        if not self.is_open:
            raise ConnectionError()

        await self._transport.send({'type': 'notify',
                                    'name': name,
                                    'data': data})

    async def _on_msg(self, msg):
        # transport message callback - only `request` messages are valid
        if msg['type'] != 'request':
            raise Exception("invalid message type")

        if self._parallel_requests:
            # request is processed in its own task
            self.async_group.spawn(self._process_request, msg)

        else:
            await self._process_request(msg)

    async def _process_request(self, req):
        # build and send `response` for a single `request`; any failure
        # while doing so closes the connection
        try:
            res = {'type': 'response',
                   'id': req['id']}

            if req['name']:
                try:
                    if not self._request_cb:
                        raise Exception('request handler not implemented')

                    res['data'] = await aio.call(self._request_cb, self,
                                                 req['name'], req['data'])
                    res['success'] = True

                except Exception as e:
                    # callback failure is reported to the remote peer
                    res['data'] = str(e)
                    res['success'] = False

            else:
                # request with empty name is answered by echoing its data
                res['data'] = req['data']
                res['success'] = True

            await self._transport.send(res)

        except ConnectionError:
            self.close()

        except Exception as e:
            self.close()
            mlog.error("process request error: %s", e, exc_info=e)

    async def _sync_loop(self):
        # state synchronization loop - sends `state` diffs and resolves
        # flush requests
        flush_future = None
        data = None
        synced_data = None  # state data last sent to the remote peer
        data_queue = aio.Queue()

        try:
            with self._state.register_change_cb(data_queue.put_nowait):
                # current state is queued as the first change
                data_queue.put_nowait(self._state.data)

                if not self.is_open:
                    return

                get_data_future = self.async_group.spawn(data_queue.get)
                get_flush_future = self.async_group.spawn(
                    self._flush_queue.get)

                while True:
                    # wake on state change or flush request
                    await asyncio.wait([get_data_future, get_flush_future],
                                       return_when=asyncio.FIRST_COMPLETED)

                    if get_flush_future.done():
                        flush_future = get_flush_future.result()
                        get_flush_future = self.async_group.spawn(
                            self._flush_queue.get)

                    else:
                        # state changed - wait up to autoflush delay for
                        # explicit flush request (timeout of None waits
                        # without time limit)
                        await asyncio.wait([get_flush_future],
                                           timeout=self._autoflush_delay)

                        if get_flush_future.done():
                            flush_future = get_flush_future.result()
                            get_flush_future = self.async_group.spawn(
                                self._flush_queue.get)

                        else:
                            flush_future = None

                    if get_data_future.done():
                        data = get_data_future.result()
                        get_data_future = self.async_group.spawn(
                            data_queue.get)

                    if self._autoflush_delay != 0:
                        # presumably returns the most recently queued
                        # data, skipping intermediate changes - verify
                        # against aio.Queue
                        if not data_queue.empty():
                            data = data_queue.get_nowait_until_empty()

                    if synced_data is not data:
                        diff = json.diff(synced_data, data)
                        synced_data = data

                        # empty diff is not sent
                        if diff:
                            await self._transport.send({'type': 'state',
                                                        'diff': diff})

                    if flush_future and not flush_future.done():
                        flush_future.set_result(True)

        except Exception as e:
            mlog.error("sync loop error: %s", e, exc_info=e)

        finally:
            self.close()

            # fail current and all still queued flush requests
            self._flush_queue.close()
            while True:
                if flush_future and not flush_future.done():
                    flush_future.set_exception(ConnectionError())

                if self._flush_queue.empty():
                    break

                flush_future = self._flush_queue.get_nowait()
Connection
For creating new connection see listen
coroutine.
@property
def async_group(self) -> aio.Group:
    """Async group stored on this instance as ``_async_group``"""
    return self._async_group
Async group
@property
def remote(self) -> str:
    """Remote IP address

    Forwarded or X-Forwarded-For headers are the source of this address.
    Socket's remote address is used when these headers are not available.

    """
    return self._remote
Remote IP address
Address is obtained from Forwarded or X-Forwarded-For headers. If these headers are not available, socket's remote address is used.
async def flush(self):
    """Force synchronization of state data

    New future is put into the flush queue and awaited.

    Raises:
        ConnectionError

    """
    try:
        request = asyncio.Future()
        self._flush_queue.put_nowait(request)
        await request

    except aio.QueueClosedError:
        # closed flush queue is reported as connection failure
        raise ConnectionError()
Force synchronization of state data
Raises:
- ConnectionError
async def notify(self,
                 name: str,
                 data: json.Data):
    """Send notification

    Args:
        name: notification name
        data: notification payload

    Raises:
        ConnectionError

    """
    if not self.is_open:
        raise ConnectionError()

    notification = {'type': 'notify',
                    'name': name,
                    'data': data}
    await self._transport.send(notification)
Send notification
Raises:
- ConnectionError