hat.juggler

Juggler communication protocol

 1"""Juggler communication protocol"""
 2
 3from hat.juggler.client import (NotifyCb,
 4                                JugglerError,
 5                                connect,
 6                                Client)
 7from hat.juggler.server import (ConnectionCb,
 8                                RequestCb,
 9                                listen,
10                                Server,
11                                Connection)
12
13
14__all__ = ['NotifyCb',
15           'JugglerError',
16           'connect',
17           'Client',
18           'ConnectionCb',
19           'RequestCb',
20           'listen',
21           'Server',
22           'Connection']
NotifyCb = typing.Callable[[ForwardRef('Client'), str, None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')]], typing.Optional[typing.Awaitable[NoneType]]]
class JugglerError(builtins.Exception):
class JugglerError(Exception):
    """Juggler error

    Exception carrying JSON serializable error data.

    Args:
        data: error data

    """

    def __init__(self, data: json.Data):
        # forward data to the base class so that `args` and `str()` are
        # populated even when `data` is provided as keyword argument
        super().__init__(data)
        self.__data = data

    @property
    def data(self) -> json.Data:
        """Error data"""
        return self.__data

Juggler error

JugglerError( data: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')])
27    def __init__(self, data: json.Data):
28        self.__data = data
data: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')]
30    @property
31    def data(self) -> json.Data:
32        """Error data"""
33        return self.__data

Error data

async def connect( address: str, notify_cb: Optional[Callable[[ForwardRef('Client'), str, None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')]], Optional[Awaitable[NoneType]]]] = None, *, auth: aiohttp.helpers.BasicAuth | None = None, ssl_ctx: ssl.SSLContext | None = None) -> Client:
async def connect(address: str,
                  notify_cb: NotifyCb | None = None,
                  *,
                  auth: aiohttp.BasicAuth | None = None,
                  ssl_ctx: ssl.SSLContext | None = None
                  ) -> 'Client':
    """Connect to remote server

    `address` is the remote WebSocket URL formatted as
    ``<schema>://<host>:<port>/<path>`` where ``<schema>`` is ``ws`` or
    ``wss``.

    Args:
        address: remote WebSocket URL
        notify_cb: optional callback called for each received `notify`
            message with client, notification name and notification data
        auth: optional HTTP Basic Authentication credentials
        ssl_ctx: optional SSL context used for the connection

    Returns:
        connected client with running receive loop

    """
    client = Client()
    client._notify_cb = notify_cb
    client._async_group = aio.Group()
    client._state = json.Storage()
    client._res_futures = {}
    client._next_req_ids = itertools.count(1)

    session = aiohttp.ClientSession()
    client._session = session

    # NOTE(review): without `ssl_ctx`, `ssl=False` is passed to aiohttp,
    # which per aiohttp documentation relaxes certificate checks for
    # ``wss`` - confirm this is intended
    ssl_arg = ssl_ctx if ssl_ctx else False

    try:
        ws = await session.ws_connect(address,
                                      auth=auth,
                                      ssl=ssl_arg,
                                      max_msg_size=0)

    except BaseException:
        # session must not outlive a failed (or cancelled) connect attempt
        await aio.uncancellable(session.close())
        raise

    client._ws = ws
    client.async_group.spawn(client._receive_loop)

    return client

Connect to remote server

address represents remote WebSocket URL formatted as <schema>://<host>:<port>/<path> where <schema> is ws or wss.

class Client(hat.aio.group.Resource):
 72class Client(aio.Resource):
 73    """Client
 74
 75    For creating new client see `connect` coroutine.
 76
 77    """
 78
 79    @property
 80    def async_group(self) -> aio.Group:
 81        """Async group"""
 82        return self._async_group
 83
 84    @property
 85    def state(self) -> json.Storage:
 86        """Remote server state"""
 87        return self._state
 88
 89    async def send(self,
 90                   name: str,
 91                   data: json.Data
 92                   ) -> json.Data:
 93        """Send request and wait for response
 94
 95        Args:
 96            name: request name
 97            data: request payload
 98
 99        Raises:
100            JugglerError
101            ConnectionError
102
103        """
104        if not self.is_open:
105            raise ConnectionError()
106
107        req_id = next(self._next_req_ids)
108        res_future = asyncio.Future()
109        self._res_futures[req_id] = res_future
110
111        try:
112            await self._ws.send_json({'type': 'request',
113                                      'id': req_id,
114                                      'name': name,
115                                      'data': data})
116            return await res_future
117
118        finally:
119            self._res_futures.pop(req_id)
120
121    async def _receive_loop(self):
122        try:
123            while True:
124                msg_ws = await self._ws.receive()
125                if self._ws.closed or msg_ws.type == aiohttp.WSMsgType.CLOSING:
126                    break
127                if msg_ws.type != aiohttp.WSMsgType.TEXT:
128                    raise Exception("unsupported ws message type")
129
130                msg = json.decode(msg_ws.data)
131
132                if msg['type'] == 'response':
133                    res_future = self._res_futures.get(msg['id'])
134                    if not res_future or res_future.done():
135                        continue
136
137                    if msg['success']:
138                        res_future.set_result(msg['data'])
139
140                    else:
141                        res_future.set_exception(JugglerError(msg['data']))
142
143                elif msg['type'] == 'state':
144                    data = json.patch(self._state.data, msg['diff'])
145                    self._state.set([], data)
146
147                elif msg['type'] == 'notify':
148                    if not self._notify_cb:
149                        continue
150
151                    await aio.call(self._notify_cb, self, msg['name'],
152                                   msg['data'])
153
154                else:
155                    raise Exception("invalid message type")
156
157        except ConnectionError:
158            pass
159
160        except Exception as e:
161            mlog.error("receive loop error: %s", e, exc_info=e)
162
163        finally:
164            self.close()
165
166            for f in self._res_futures.values():
167                if not f.done():
168                    f.set_exception(ConnectionError())
169
170            await aio.uncancellable(self._close_ws())
171
172    async def _close_ws(self):
173        await self._ws.close()
174        await self._session.close()

Client

For creating new client see connect coroutine.

async_group: hat.aio.group.Group
79    @property
80    def async_group(self) -> aio.Group:
81        """Async group"""
82        return self._async_group

Async group

state: hat.json.path.Storage
84    @property
85    def state(self) -> json.Storage:
86        """Remote server state"""
87        return self._state

Remote server state

async def send( self, name: str, data: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')]) -> None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')]:
 89    async def send(self,
 90                   name: str,
 91                   data: json.Data
 92                   ) -> json.Data:
 93        """Send request and wait for response
 94
 95        Args:
 96            name: request name
 97            data: request payload
 98
 99        Raises:
100            JugglerError
101            ConnectionError
102
103        """
104        if not self.is_open:
105            raise ConnectionError()
106
107        req_id = next(self._next_req_ids)
108        res_future = asyncio.Future()
109        self._res_futures[req_id] = res_future
110
111        try:
112            await self._ws.send_json({'type': 'request',
113                                      'id': req_id,
114                                      'name': name,
115                                      'data': data})
116            return await res_future
117
118        finally:
119            self._res_futures.pop(req_id)

Send request and wait for response

Arguments:
  • name: request name
  • data: request payload
Raises:
  • JugglerError
  • ConnectionError
ConnectionCb = typing.Callable[[ForwardRef('Connection')], typing.Optional[typing.Awaitable[NoneType]]]
RequestCb = typing.Callable[[ForwardRef('Connection'), str, None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')]], typing.Union[NoneType, bool, int, float, str, list[ForwardRef('Data')], dict[str, ForwardRef('Data')], typing.Awaitable[None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')]]]]
async def listen( host: str, port: int, connection_cb: Optional[Callable[[Connection], Optional[Awaitable[NoneType]]]] = None, request_cb: Optional[Callable[[ForwardRef('Connection'), str, None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')]], Union[NoneType, bool, int, float, str, list[ForwardRef('Data')], dict[str, ForwardRef('Data')], Awaitable[None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')]]]]] = None, *, ws_path: str = '/ws', static_dir: pathlib.PurePath | None = None, index_path: str | None = '/index.html', htpasswd_file: pathlib.PurePath | None = None, pem_file: pathlib.PurePath | None = None, autoflush_delay: float | None = 0.2, shutdown_timeout: float = 0.1, state: hat.json.path.Storage | None = None, parallel_requests: bool = False, additional_routes: Iterable[aiohttp.web_routedef.RouteDef] = []) -> Server:
 31async def listen(host: str,
 32                 port: int,
 33                 connection_cb: ConnectionCb | None = None,
 34                 request_cb: RequestCb | None = None,
 35                 *,
 36                 ws_path: str = '/ws',
 37                 static_dir: pathlib.PurePath | None = None,
 38                 index_path: str | None = '/index.html',
 39                 htpasswd_file: pathlib.PurePath | None = None,
 40                 pem_file: pathlib.PurePath | None = None,
 41                 autoflush_delay: float | None = 0.2,
 42                 shutdown_timeout: float = 0.1,
 43                 state: json.Storage | None = None,
 44                 parallel_requests: bool = False,
 45                 additional_routes: Iterable[aiohttp.web.RouteDef] = []
 46                 ) -> 'Server':
 47    """Create listening server
 48
 49    Each time server receives new incoming juggler connection, `connection_cb`
 50    is called with newly created connection.
 51
 52    For each connection, when server receives `request` message, `request_cb`
 53    is called with associated connection, request name and request data.
 54    If `request_cb` returns value, successful `response` message is sent
 55    with resulting value as data. If `request_cb` raises exception,
 56    unsuccessful `response` message is sent with raised exception as data.
 57    If `request_cb` is ``None``, each `request` message causes sending
 58    of unsuccessful `response` message.
 59
 60    If `static_dir` is set, server serves static files is addition to providing
 61    juggler communication.
 62
 63    If `index_path` is set, request for url path ``/`` are redirected to
 64    `index_path`.
 65
 66    If `htpasswd_file` is set, HTTP Basic Authentication is enabled.
 67    All requests are checked for ``Authorization`` header and only users
 68    specified by `htpassword_file` are accepted. `htpasswd_file` is read
 69    during initialization and changes to it's content, after initialization
 70    finishes, are not monitored.
 71
 72    If `pem_file` is set, server provides `https/wss` communication instead
 73    of `http/ws` communication.
 74
 75    Argument `autoflush_delay` is associated with all connections associated
 76    with this server. `autoflush_delay` defines maximum time delay for
 77    automatic synchronization of `state` changes. If `autoflush_delay` is set
 78    to ``None``, automatic synchronization is disabled and user is responsible
 79    for calling :meth:`Connection.flush`. If `autoflush_delay` is set to ``0``,
 80    synchronization of `state` is performed on each change of `state` data.
 81
 82    `shutdown_timeout` defines maximum time duration server will wait for
 83    regular connection close procedures during server shutdown. All connections
 84    that are not closed during this period are forcefully closed.
 85
 86    If `state` is ``None``, each connection is initialized with it's own
 87    instance of server state. If `state` is set, provided state is shared
 88    between all connections.
 89
 90    If `parallel_requests` is set to ``True``, incoming requests will be
 91    processed in parallel - processing of subsequent requests can start (and
 92    finish) before prior responses are generated.
 93
 94    Argument `additional_routes` can be used for providing addition aiohttp
 95    route definitions handled by running web server.
 96
 97    Args:
 98        host: listening hostname
 99        port: listening TCP port
100        connection_cb: connection callback
101        request_cb: request callback
102        ws_path: WebSocket url path segment
103        static_dir: static files directory path
104        index_path: index path
105        pem_file: PEM file path
106        autoflush_delay: autoflush delay
107        shutdown_timeout: shutdown timeout
108        state: shared server state
109        parallel_requests: parallel request processing
110        additional_routes: additional route definitions
111
112    """
113    server = Server()
114    server._connection_cb = connection_cb
115    server._request_cb = request_cb
116    server._autoflush_delay = autoflush_delay
117    server._state = state
118    server._parallel_requests = parallel_requests
119    server._async_group = aio.Group()
120
121    middlewares = []
122
123    if htpasswd_file:
124        middlewares.append(BasicAuthMiddleware(htpasswd_file))
125
126    routes = []
127
128    if index_path:
129
130        async def root_handler(request):
131            raise aiohttp.web.HTTPFound(index_path)
132
133        routes.append(aiohttp.web.get('/', root_handler))
134
135    routes.append(aiohttp.web.get(ws_path, server._ws_handler))
136    routes.extend(additional_routes)
137
138    if static_dir:
139        routes.append(aiohttp.web.static('/', static_dir))
140
141    app = aiohttp.web.Application(middlewares=middlewares)
142    app.add_routes(routes)
143    runner = aiohttp.web.AppRunner(app,
144                                   shutdown_timeout=shutdown_timeout)
145    await runner.setup()
146    server.async_group.spawn(aio.call_on_cancel, runner.cleanup)
147
148    try:
149        ssl_ctx = _create_ssl_context(pem_file) if pem_file else None
150        site = aiohttp.web.TCPSite(runner=runner,
151                                   host=host,
152                                   port=port,
153                                   ssl_context=ssl_ctx,
154                                   reuse_address=True)
155        await site.start()
156
157    except BaseException:
158        await aio.uncancellable(server.async_close())
159        raise
160
161    return server

Create listening server

Each time server receives new incoming juggler connection, connection_cb is called with newly created connection.

For each connection, when server receives request message, request_cb is called with associated connection, request name and request data. If request_cb returns value, successful response message is sent with resulting value as data. If request_cb raises exception, unsuccessful response message is sent with raised exception as data. If request_cb is None, each request message causes sending of unsuccessful response message.

If static_dir is set, server serves static files in addition to providing juggler communication.

If index_path is set, requests for URL path / are redirected to index_path.

If htpasswd_file is set, HTTP Basic Authentication is enabled. All requests are checked for Authorization header and only users specified by htpasswd_file are accepted. htpasswd_file is read during initialization and changes to its content, after initialization finishes, are not monitored.

If pem_file is set, server provides https/wss communication instead of http/ws communication.

Argument autoflush_delay is associated with all connections associated with this server. autoflush_delay defines maximum time delay for automatic synchronization of state changes. If autoflush_delay is set to None, automatic synchronization is disabled and user is responsible for calling Connection.flush(). If autoflush_delay is set to 0, synchronization of state is performed on each change of state data.

shutdown_timeout defines maximum time duration server will wait for regular connection close procedures during server shutdown. All connections that are not closed during this period are forcefully closed.

If state is None, each connection is initialized with its own instance of server state. If state is set, provided state is shared between all connections.

If parallel_requests is set to True, incoming requests will be processed in parallel - processing of subsequent requests can start (and finish) before prior responses are generated.

Argument additional_routes can be used for providing additional aiohttp route definitions handled by running web server.

Arguments:
  • host: listening hostname
  • port: listening TCP port
  • connection_cb: connection callback
  • request_cb: request callback
  • ws_path: WebSocket url path segment
  • static_dir: static files directory path
  • index_path: index path
  • htpasswd_file: htpasswd file path
  • pem_file: PEM file path
  • autoflush_delay: autoflush delay
  • shutdown_timeout: shutdown timeout
  • state: shared server state
  • parallel_requests: parallel request processing
  • additional_routes: additional route definitions
class Server(hat.aio.group.Resource):
class Server(aio.Resource):
    """Juggler server

    Instances are created by the `listen` coroutine.

    Closing the server also closes all of its incoming connections.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    async def _ws_handler(self, request):
        # handler of WebSocket route: wraps prepared WebSocket response in
        # a `Connection` and returns only after that connection is closed
        response = aiohttp.web.WebSocketResponse()
        await response.prepare(request)

        connection = Connection()
        connection._ws = response
        connection._remote = _get_remote(request)
        # subgroup of server's group
        connection._async_group = self.async_group.create_subgroup()
        connection._request_cb = self._request_cb
        connection._autoflush_delay = self._autoflush_delay
        # server's state when set, otherwise connection's own storage
        connection._state = self._state or json.Storage()
        connection._parallel_requests = self._parallel_requests
        connection._flush_queue = aio.Queue()

        group = connection.async_group
        for loop in (connection._receive_loop, connection._sync_loop):
            group.spawn(loop)

        if self._connection_cb:
            group.spawn(aio.call, self._connection_cb, connection)

        await connection.wait_closed()

        return response

Server

For creating new server see listen coroutine.

When server is closed, all incoming connections are also closed.

async_group: hat.aio.group.Group
173    @property
174    def async_group(self) -> aio.Group:
175        """Async group"""
176        return self._async_group

Async group

class Connection(hat.aio.group.Resource):
class Connection(aio.Resource):
    """Connection

    Server side of a single juggler connection.

    For creating new connection see `listen` coroutine.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    @property
    def remote(self) -> str:
        """Remote IP address

        Address is obtained from Forwarded or X-Forwarded-For headers. If
        these headers are not available, socket's remote address is used.

        """
        return self._remote

    @property
    def state(self) -> json.Storage:
        """Server state"""
        return self._state

    async def flush(self):
        """Force synchronization of state data

        Queues a flush request for the synchronization loop and waits until
        that request is resolved.

        Raises:
            ConnectionError

        """
        try:
            flush_future = asyncio.Future()
            self._flush_queue.put_nowait(flush_future)
            await flush_future

        except aio.QueueClosedError:
            raise ConnectionError()

    async def notify(self,
                     name: str,
                     data: json.Data):
        """Send notification

        Args:
            name: notification name
            data: notification payload

        Raises:
            ConnectionError

        """
        if not self.is_open:
            raise ConnectionError()

        await self._ws.send_str(json.encode({'type': 'notify',
                                             'name': name,
                                             'data': data}))

    async def _receive_loop(self):
        # Receives WebSocket messages while connection is open. Only text
        # messages containing juggler `request` are accepted - anything else
        # is logged as error and closes the connection.
        try:
            while self.is_open:
                msg_ws = await self._ws.receive()
                if self._ws.closed or msg_ws.type == aiohttp.WSMsgType.CLOSING:
                    break
                if msg_ws.type != aiohttp.WSMsgType.TEXT:
                    raise Exception("unsupported ws message type")

                msg = json.decode(msg_ws.data)

                if msg['type'] != 'request':
                    raise Exception("invalid message type")

                if self._parallel_requests:
                    # processed in background - next message can be received
                    # before this request's response is sent
                    self.async_group.spawn(self._process_request, msg)

                else:
                    await self._process_request(msg)

        except ConnectionError:
            pass

        except Exception as e:
            mlog.error("receive loop error: %s", e, exc_info=e)

        finally:
            self.close()
            await aio.uncancellable(self._ws.close())

    async def _process_request(self, req):
        # Builds and sends `response` message for single `request` message.
        # Any failure while sending the response closes the connection.
        try:
            res = {'type': 'response',
                   'id': req['id']}

            if req['name']:
                try:
                    if not self._request_cb:
                        raise Exception('request handler not implemented')

                    res['data'] = await aio.call(self._request_cb, self,
                                                 req['name'], req['data'])
                    res['success'] = True

                except Exception as e:
                    # callback errors are reported to the remote side as
                    # unsuccessful response containing exception's string
                    res['data'] = str(e)
                    res['success'] = False

            else:
                # request with empty name is answered by echoing its data
                res['data'] = req['data']
                res['success'] = True

            await self._ws.send_str(json.encode(res))

        except ConnectionError:
            self.close()

        except Exception as e:
            self.close()
            mlog.error("process request error: %s", e, exc_info=e)

    async def _sync_loop(self):
        # Sends `state` messages with differences between last synchronized
        # state data and current state data, and resolves flush requests
        # queued by `flush`.
        flush_future = None  # flush request resolved by current iteration
        data = None  # latest known state data
        synced_data = None  # state data last used as diff base
        data_queue = aio.Queue()

        try:
            # every state change is queued for the duration of this loop
            with self._state.register_change_cb(data_queue.put_nowait):
                # initial state is queued as the first change
                data_queue.put_nowait(self._state.data)

                if not self.is_open:
                    return

                get_data_future = self.async_group.spawn(data_queue.get)
                get_flush_future = self.async_group.spawn(
                    self._flush_queue.get)

                while True:
                    # wake up on state change or on flush request
                    await asyncio.wait([get_data_future, get_flush_future],
                                       return_when=asyncio.FIRST_COMPLETED)

                    if get_flush_future.done():
                        flush_future = get_flush_future.result()
                        get_flush_future = self.async_group.spawn(
                            self._flush_queue.get)

                    else:
                        # only state changed - wait at most `autoflush_delay`
                        # for flush request (``None`` waits without timeout)
                        await asyncio.wait([get_flush_future],
                                           timeout=self._autoflush_delay)

                        if get_flush_future.done():
                            flush_future = get_flush_future.result()
                            get_flush_future = self.async_group.spawn(
                                self._flush_queue.get)

                        else:
                            flush_future = None

                    if get_data_future.done():
                        data = get_data_future.result()
                        get_data_future = self.async_group.spawn(
                            data_queue.get)

                    if self._autoflush_delay != 0:
                        if not data_queue.empty():
                            # presumably returns the last queued item, so
                            # that intermediate changes are skipped - verify
                            # against `aio.Queue.get_nowait_until_empty`
                            data = data_queue.get_nowait_until_empty()

                    if synced_data is not data:
                        diff = json.diff(synced_data, data)
                        synced_data = data

                        # nothing is sent if diff is empty
                        if diff:
                            await self._ws.send_str(json.encode({
                                'type': 'state',
                                'diff': diff}))

                    if flush_future and not flush_future.done():
                        flush_future.set_result(True)

        except Exception as e:
            mlog.error("sync loop error: %s", e, exc_info=e)

        finally:
            self.close()

            # fail current and all still queued flush requests
            self._flush_queue.close()
            while True:
                if flush_future and not flush_future.done():
                    flush_future.set_exception(ConnectionError())

                if self._flush_queue.empty():
                    break

                flush_future = self._flush_queue.get_nowait()

Connection

For creating new connection see listen coroutine.

async_group: hat.aio.group.Group
210    @property
211    def async_group(self) -> aio.Group:
212        """Async group"""
213        return self._async_group

Async group

remote: str
215    @property
216    def remote(self) -> str:
217        """Remote IP address
218
219        Address is obtained from Forwarded or X-Forwarded-For headers. If
220        these headers are not available, socket's remote address is used.
221
222        """
223        return self._remote

Remote IP address

Address is obtained from Forwarded or X-Forwarded-For headers. If these headers are not available, socket's remote address is used.

state: hat.json.path.Storage
225    @property
226    def state(self) -> json.Storage:
227        """Server state"""
228        return self._state

Server state

async def flush(self):
230    async def flush(self):
231        """Force synchronization of state data
232
233        Raises:
234            ConnectionError
235
236        """
237        try:
238            flush_future = asyncio.Future()
239            self._flush_queue.put_nowait(flush_future)
240            await flush_future
241
242        except aio.QueueClosedError:
243            raise ConnectionError()

Force synchronization of state data

Raises:
  • ConnectionError
async def notify( self, name: str, data: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')]):
245    async def notify(self,
246                     name: str,
247                     data: json.Data):
248        """Send notification
249
250        Raises:
251            ConnectionError
252
253        """
254        if not self.is_open:
255            raise ConnectionError()
256
257        await self._ws.send_str(json.encode({'type': 'notify',
258                                             'name': name,
259                                             'data': data}))

Send notification

Raises:
  • ConnectionError