hat.juggler

Juggler communication protocol

 1"""Juggler communication protocol"""
 2
 3from hat.juggler.client import (NotifyCb,
 4                                JugglerError,
 5                                connect,
 6                                Client)
 7from hat.juggler.server import (ConnectionCb,
 8                                RequestCb,
 9                                listen,
10                                Server,
11                                Connection)
12
13
14__all__ = ['NotifyCb',
15           'JugglerError',
16           'connect',
17           'Client',
18           'ConnectionCb',
19           'RequestCb',
20           'listen',
21           'Server',
22           'Connection']
NotifyCb = typing.Callable[[ForwardRef('Client'), str, typing.Union[NoneType, bool, int, float, str, typing.List[ForwardRef('Data')], typing.Dict[str, ForwardRef('Data')]]], None | collections.abc.Awaitable[None]]
class JugglerError(builtins.Exception):
24class JugglerError(Exception):
25    """Juggler error"""
26
27    def __init__(self, data: json.Data):
28        self.__data = data
29
30    @property
31    def data(self) -> json.Data:
32        """Error data"""
33        return self.__data

Juggler error

JugglerError( data: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]])
27    def __init__(self, data: json.Data):
28        self.__data = data
data: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]
30    @property
31    def data(self) -> json.Data:
32        """Error data"""
33        return self.__data

Error data

async def connect( address: str, notify_cb: Optional[Callable[[ForwardRef('Client'), str, Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]], None | Awaitable[None]]] = None, *, auth: aiohttp.helpers.BasicAuth | None = None, ssl_ctx: ssl.SSLContext | None = None) -> Client:
36async def connect(address: str,
37                  notify_cb: NotifyCb | None = None,
38                  *,
39                  auth: aiohttp.BasicAuth | None = None,
40                  ssl_ctx: ssl.SSLContext | None = None
41                  ) -> 'Client':
42    """Connect to remote server
43
44    `address` represents remote WebSocket URL formated as
45    ``<schema>://<host>:<port>/<path>`` where ``<schema>`` is ``ws`` or
46    ``wss``.
47
48    """
49    client = Client()
50    client._notify_cb = notify_cb
51    client._async_group = aio.Group()
52    client._state = json.Storage()
53    client._res_futures = {}
54    client._next_req_ids = itertools.count(1)
55    client._session = aiohttp.ClientSession()
56
57    try:
58        client._ws = await client._session.ws_connect(address,
59                                                      auth=auth,
60                                                      ssl=ssl_ctx or False,
61                                                      max_msg_size=0)
62
63    except BaseException:
64        await aio.uncancellable(client._session.close())
65        raise
66
67    client.async_group.spawn(client._receive_loop)
68
69    return client

Connect to remote server

address represents remote WebSocket URL formated as <schema>://<host>:<port>/<path> where <schema> is ws or wss.

class Client(hat.aio.group.Resource):
 72class Client(aio.Resource):
 73    """Client
 74
 75    For creating new client see `connect` coroutine.
 76
 77    """
 78
 79    @property
 80    def async_group(self) -> aio.Group:
 81        """Async group"""
 82        return self._async_group
 83
 84    @property
 85    def state(self) -> json.Storage:
 86        """Remote server state"""
 87        return self._state
 88
 89    async def send(self,
 90                   name: str,
 91                   data: json.Data
 92                   ) -> json.Data:
 93        """Send request and wait for response
 94
 95        Args:
 96            name: request name
 97            data: request payload
 98
 99        Raises:
100            JugglerError
101            ConnectionError
102
103        """
104        if not self.is_open:
105            raise ConnectionError()
106
107        req_id = next(self._next_req_ids)
108        res_future = asyncio.Future()
109        self._res_futures[req_id] = res_future
110
111        try:
112            await self._ws.send_json({'type': 'request',
113                                      'id': req_id,
114                                      'name': name,
115                                      'data': data})
116            return await res_future
117
118        finally:
119            self._res_futures.pop(req_id)
120
121    async def _receive_loop(self):
122        try:
123            while True:
124                msg_ws = await self._ws.receive()
125                if self._ws.closed or msg_ws.type == aiohttp.WSMsgType.CLOSING:
126                    break
127                if msg_ws.type != aiohttp.WSMsgType.TEXT:
128                    raise Exception("unsupported ws message type")
129
130                msg = json.decode(msg_ws.data)
131
132                if msg['type'] == 'response':
133                    res_future = self._res_futures.get(msg['id'])
134                    if not res_future or res_future.done():
135                        continue
136
137                    if msg['success']:
138                        res_future.set_result(msg['data'])
139
140                    else:
141                        res_future.set_exception(JugglerError(msg['data']))
142
143                elif msg['type'] == 'state':
144                    data = json.patch(self._state.data, msg['diff'])
145                    self._state.set([], data)
146
147                elif msg['type'] == 'notify':
148                    if not self._notify_cb:
149                        continue
150
151                    await aio.call(self._notify_cb, self, msg['name'],
152                                   msg['data'])
153
154                else:
155                    raise Exception("invalid message type")
156
157        except ConnectionError:
158            pass
159
160        except Exception as e:
161            mlog.error("receive loop error: %s", e, exc_info=e)
162
163        finally:
164            self.close()
165
166            for f in self._res_futures.values():
167                if not f.done():
168                    f.set_exception(ConnectionError())
169
170            await aio.uncancellable(self._close_ws())
171
172    async def _close_ws(self):
173        await self._ws.close()
174        await self._session.close()

Client

For creating new client see connect coroutine.

async_group: hat.aio.group.Group
79    @property
80    def async_group(self) -> aio.Group:
81        """Async group"""
82        return self._async_group

Async group

state: hat.json.path.Storage
84    @property
85    def state(self) -> json.Storage:
86        """Remote server state"""
87        return self._state

Remote server state

async def send( self, name: str, data: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]) -> Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]:
 89    async def send(self,
 90                   name: str,
 91                   data: json.Data
 92                   ) -> json.Data:
 93        """Send request and wait for response
 94
 95        Args:
 96            name: request name
 97            data: request payload
 98
 99        Raises:
100            JugglerError
101            ConnectionError
102
103        """
104        if not self.is_open:
105            raise ConnectionError()
106
107        req_id = next(self._next_req_ids)
108        res_future = asyncio.Future()
109        self._res_futures[req_id] = res_future
110
111        try:
112            await self._ws.send_json({'type': 'request',
113                                      'id': req_id,
114                                      'name': name,
115                                      'data': data})
116            return await res_future
117
118        finally:
119            self._res_futures.pop(req_id)

Send request and wait for response

Arguments:
  • name: request name
  • data: request payload
Raises:
  • JugglerError
  • ConnectionError
ConnectionCb = typing.Callable[[ForwardRef('Connection')], None | collections.abc.Awaitable[None]]
RequestCb = typing.Callable[[ForwardRef('Connection'), str, typing.Union[NoneType, bool, int, float, str, typing.List[ForwardRef('Data')], typing.Dict[str, ForwardRef('Data')]]], typing.Union[NoneType, bool, int, float, str, typing.List[ForwardRef('Data')], typing.Dict[str, ForwardRef('Data')], collections.abc.Awaitable[typing.Union[NoneType, bool, int, float, str, typing.List[ForwardRef('Data')], typing.Dict[str, ForwardRef('Data')]]]]]
async def listen( host: str, port: int, connection_cb: Optional[Callable[[Connection], None | Awaitable[None]]] = None, request_cb: Optional[Callable[[ForwardRef('Connection'), str, Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]], Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], Awaitable[Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]]]]] = None, *, ws_path: str = '/ws', static_dir: pathlib.PurePath | None = None, index_path: str | None = '/index.html', htpasswd_file: pathlib.PurePath | None = None, ssl_ctx: ssl.SSLContext | None = None, autoflush_delay: float | None = 0.2, shutdown_timeout: float = 0.1, state: hat.json.path.Storage | None = None, parallel_requests: bool = False, additional_routes: Iterable[aiohttp.web_routedef.RouteDef] = []) -> Server:
 31async def listen(host: str,
 32                 port: int,
 33                 connection_cb: ConnectionCb | None = None,
 34                 request_cb: RequestCb | None = None,
 35                 *,
 36                 ws_path: str = '/ws',
 37                 static_dir: pathlib.PurePath | None = None,
 38                 index_path: str | None = '/index.html',
 39                 htpasswd_file: pathlib.PurePath | None = None,
 40                 ssl_ctx: ssl.SSLContext | None = None,
 41                 autoflush_delay: float | None = 0.2,
 42                 shutdown_timeout: float = 0.1,
 43                 state: json.Storage | None = None,
 44                 parallel_requests: bool = False,
 45                 additional_routes: Iterable[aiohttp.web.RouteDef] = []
 46                 ) -> 'Server':
 47    """Create listening server
 48
 49    Each time server receives new incoming juggler connection, `connection_cb`
 50    is called with newly created connection.
 51
 52    For each connection, when server receives `request` message, `request_cb`
 53    is called with associated connection, request name and request data.
 54    If `request_cb` returns value, successful `response` message is sent
 55    with resulting value as data. If `request_cb` raises exception,
 56    unsuccessful `response` message is sent with raised exception as data.
 57    If `request_cb` is ``None``, each `request` message causes sending
 58    of unsuccessful `response` message.
 59
 60    If `static_dir` is set, server serves static files is addition to providing
 61    juggler communication.
 62
 63    If `index_path` is set, request for url path ``/`` are redirected to
 64    `index_path`.
 65
 66    If `htpasswd_file` is set, HTTP Basic Authentication is enabled.
 67    All requests are checked for ``Authorization`` header and only users
 68    specified by `htpassword_file` are accepted. `htpasswd_file` is read
 69    during initialization and changes to it's content, after initialization
 70    finishes, are not monitored.
 71
 72    If `ssl_ctx` is set, server provides `https/wss` communication instead
 73    of `http/ws` communication.
 74
 75    Argument `autoflush_delay` is associated with all connections associated
 76    with this server. `autoflush_delay` defines maximum time delay for
 77    automatic synchronization of `state` changes. If `autoflush_delay` is set
 78    to ``None``, automatic synchronization is disabled and user is responsible
 79    for calling :meth:`Connection.flush`. If `autoflush_delay` is set to ``0``,
 80    synchronization of `state` is performed on each change of `state` data.
 81
 82    `shutdown_timeout` defines maximum time duration server will wait for
 83    regular connection close procedures during server shutdown. All connections
 84    that are not closed during this period are forcefully closed.
 85
 86    If `state` is ``None``, each connection is initialized with it's own
 87    instance of server state. If `state` is set, provided state is shared
 88    between all connections.
 89
 90    If `parallel_requests` is set to ``True``, incoming requests will be
 91    processed in parallel - processing of subsequent requests can start (and
 92    finish) before prior responses are generated.
 93
 94    Argument `additional_routes` can be used for providing addition aiohttp
 95    route definitions handled by running web server.
 96
 97    Args:
 98        host: listening hostname
 99        port: listening TCP port
100        connection_cb: connection callback
101        request_cb: request callback
102        ws_path: WebSocket url path segment
103        static_dir: static files directory path
104        index_path: index path
105        pem_file: PEM file path
106        autoflush_delay: autoflush delay
107        shutdown_timeout: shutdown timeout
108        state: shared server state
109        parallel_requests: parallel request processing
110        additional_routes: additional route definitions
111
112    """
113    server = Server()
114    server._connection_cb = connection_cb
115    server._request_cb = request_cb
116    server._autoflush_delay = autoflush_delay
117    server._state = state
118    server._parallel_requests = parallel_requests
119    server._async_group = aio.Group()
120
121    middlewares = []
122
123    if htpasswd_file:
124        middlewares.append(BasicAuthMiddleware(htpasswd_file))
125
126    routes = []
127
128    if index_path:
129
130        async def root_handler(request):
131            raise aiohttp.web.HTTPFound(index_path)
132
133        routes.append(aiohttp.web.get('/', root_handler))
134
135    routes.append(aiohttp.web.get(ws_path, server._ws_handler))
136    routes.extend(additional_routes)
137
138    if static_dir:
139        routes.append(aiohttp.web.static('/', static_dir))
140
141    app = aiohttp.web.Application(middlewares=middlewares)
142    app.add_routes(routes)
143    runner = aiohttp.web.AppRunner(app,
144                                   shutdown_timeout=shutdown_timeout)
145    await runner.setup()
146    server.async_group.spawn(aio.call_on_cancel, runner.cleanup)
147
148    try:
149        site = aiohttp.web.TCPSite(runner=runner,
150                                   host=host,
151                                   port=port,
152                                   ssl_context=ssl_ctx,
153                                   reuse_address=True)
154        await site.start()
155
156    except BaseException:
157        await aio.uncancellable(server.async_close())
158        raise
159
160    return server

Create listening server

Each time server receives new incoming juggler connection, connection_cb is called with newly created connection.

For each connection, when server receives request message, request_cb is called with associated connection, request name and request data. If request_cb returns value, successful response message is sent with resulting value as data. If request_cb raises exception, unsuccessful response message is sent with raised exception as data. If request_cb is None, each request message causes sending of unsuccessful response message.

If static_dir is set, server serves static files is addition to providing juggler communication.

If index_path is set, request for url path / are redirected to index_path.

If htpasswd_file is set, HTTP Basic Authentication is enabled. All requests are checked for Authorization header and only users specified by htpassword_file are accepted. htpasswd_file is read during initialization and changes to it's content, after initialization finishes, are not monitored.

If ssl_ctx is set, server provides https/wss communication instead of http/ws communication.

Argument autoflush_delay is associated with all connections associated with this server. autoflush_delay defines maximum time delay for automatic synchronization of state changes. If autoflush_delay is set to None, automatic synchronization is disabled and user is responsible for calling Connection.flush(). If autoflush_delay is set to 0, synchronization of state is performed on each change of state data.

shutdown_timeout defines maximum time duration server will wait for regular connection close procedures during server shutdown. All connections that are not closed during this period are forcefully closed.

If state is None, each connection is initialized with it's own instance of server state. If state is set, provided state is shared between all connections.

If parallel_requests is set to True, incoming requests will be processed in parallel - processing of subsequent requests can start (and finish) before prior responses are generated.

Argument additional_routes can be used for providing addition aiohttp route definitions handled by running web server.

Arguments:
  • host: listening hostname
  • port: listening TCP port
  • connection_cb: connection callback
  • request_cb: request callback
  • ws_path: WebSocket url path segment
  • static_dir: static files directory path
  • index_path: index path
  • pem_file: PEM file path
  • autoflush_delay: autoflush delay
  • shutdown_timeout: shutdown timeout
  • state: shared server state
  • parallel_requests: parallel request processing
  • additional_routes: additional route definitions
class Server(hat.aio.group.Resource):
163class Server(aio.Resource):
164    """Server
165
166    For creating new server see `listen` coroutine.
167
168    When server is closed, all incoming connections are also closed.
169
170    """
171
172    @property
173    def async_group(self) -> aio.Group:
174        """Async group"""
175        return self._async_group
176
177    async def _ws_handler(self, request):
178        ws = aiohttp.web.WebSocketResponse()
179        await ws.prepare(request)
180
181        conn = Connection()
182        conn._ws = ws
183        conn._remote = _get_remote(request)
184        conn._async_group = self.async_group.create_subgroup()
185        conn._request_cb = self._request_cb
186        conn._autoflush_delay = self._autoflush_delay
187        conn._state = self._state or json.Storage()
188        conn._parallel_requests = self._parallel_requests
189        conn._flush_queue = aio.Queue()
190
191        conn.async_group.spawn(conn._receive_loop)
192        conn.async_group.spawn(conn._sync_loop)
193
194        if self._connection_cb:
195            conn.async_group.spawn(aio.call, self._connection_cb, conn)
196
197        await conn.wait_closed()
198
199        return ws

Server

For creating new server see listen coroutine.

When server is closed, all incoming connections are also closed.

async_group: hat.aio.group.Group
172    @property
173    def async_group(self) -> aio.Group:
174        """Async group"""
175        return self._async_group

Async group

class Connection(hat.aio.group.Resource):
202class Connection(aio.Resource):
203    """Connection
204
205    For creating new connection see `listen` coroutine.
206
207    """
208
209    @property
210    def async_group(self) -> aio.Group:
211        """Async group"""
212        return self._async_group
213
214    @property
215    def remote(self) -> str:
216        """Remote IP address
217
218        Address is obtained from Forwarded or X-Forwarded-For headers. If
219        these headers are not available, socket's remote address is used.
220
221        """
222        return self._remote
223
224    @property
225    def state(self) -> json.Storage:
226        """Server state"""
227        return self._state
228
229    async def flush(self):
230        """Force synchronization of state data
231
232        Raises:
233            ConnectionError
234
235        """
236        try:
237            flush_future = asyncio.Future()
238            self._flush_queue.put_nowait(flush_future)
239            await flush_future
240
241        except aio.QueueClosedError:
242            raise ConnectionError()
243
244    async def notify(self,
245                     name: str,
246                     data: json.Data):
247        """Send notification
248
249        Raises:
250            ConnectionError
251
252        """
253        if not self.is_open:
254            raise ConnectionError()
255
256        await self._ws.send_str(json.encode({'type': 'notify',
257                                             'name': name,
258                                             'data': data}))
259
260    async def _receive_loop(self):
261        try:
262            while self.is_open:
263                msg_ws = await self._ws.receive()
264                if self._ws.closed or msg_ws.type == aiohttp.WSMsgType.CLOSING:
265                    break
266                if msg_ws.type != aiohttp.WSMsgType.TEXT:
267                    raise Exception("unsupported ws message type")
268
269                msg = json.decode(msg_ws.data)
270
271                if msg['type'] != 'request':
272                    raise Exception("invalid message type")
273
274                if self._parallel_requests:
275                    self.async_group.spawn(self._process_request, msg)
276
277                else:
278                    await self._process_request(msg)
279
280        except ConnectionError:
281            pass
282
283        except Exception as e:
284            mlog.error("receive loop error: %s", e, exc_info=e)
285
286        finally:
287            self.close()
288            await aio.uncancellable(self._ws.close())
289
290    async def _process_request(self, req):
291        try:
292            res = {'type': 'response',
293                   'id': req['id']}
294
295            if req['name']:
296                try:
297                    if not self._request_cb:
298                        raise Exception('request handler not implemented')
299
300                    res['data'] = await aio.call(self._request_cb, self,
301                                                 req['name'], req['data'])
302                    res['success'] = True
303
304                except Exception as e:
305                    res['data'] = str(e)
306                    res['success'] = False
307
308            else:
309                res['data'] = req['data']
310                res['success'] = True
311
312            await self._ws.send_str(json.encode(res))
313
314        except ConnectionError:
315            self.close()
316
317        except Exception as e:
318            self.close()
319            mlog.error("process request error: %s", e, exc_info=e)
320
321    async def _sync_loop(self):
322        flush_future = None
323        data = None
324        synced_data = None
325        data_queue = aio.Queue()
326
327        try:
328            with self._state.register_change_cb(data_queue.put_nowait):
329                data_queue.put_nowait(self._state.data)
330
331                if not self.is_open:
332                    return
333
334                get_data_future = self.async_group.spawn(data_queue.get)
335                get_flush_future = self.async_group.spawn(
336                    self._flush_queue.get)
337
338                while True:
339                    await asyncio.wait([get_data_future, get_flush_future],
340                                       return_when=asyncio.FIRST_COMPLETED)
341
342                    if get_flush_future.done():
343                        flush_future = get_flush_future.result()
344                        get_flush_future = self.async_group.spawn(
345                            self._flush_queue.get)
346
347                    else:
348                        await asyncio.wait([get_flush_future],
349                                           timeout=self._autoflush_delay)
350
351                        if get_flush_future.done():
352                            flush_future = get_flush_future.result()
353                            get_flush_future = self.async_group.spawn(
354                                self._flush_queue.get)
355
356                        else:
357                            flush_future = None
358
359                    if get_data_future.done():
360                        data = get_data_future.result()
361                        get_data_future = self.async_group.spawn(
362                            data_queue.get)
363
364                    if self._autoflush_delay != 0:
365                        if not data_queue.empty():
366                            data = data_queue.get_nowait_until_empty()
367
368                    if synced_data is not data:
369                        diff = json.diff(synced_data, data)
370                        synced_data = data
371
372                        if diff:
373                            await self._ws.send_str(json.encode({
374                                'type': 'state',
375                                'diff': diff}))
376
377                    if flush_future and not flush_future.done():
378                        flush_future.set_result(True)
379
380        except Exception as e:
381            mlog.error("sync loop error: %s", e, exc_info=e)
382
383        finally:
384            self.close()
385
386            self._flush_queue.close()
387            while True:
388                if flush_future and not flush_future.done():
389                    flush_future.set_exception(ConnectionError())
390
391                if self._flush_queue.empty():
392                    break
393
394                flush_future = self._flush_queue.get_nowait()

Connection

For creating new connection see listen coroutine.

async_group: hat.aio.group.Group
209    @property
210    def async_group(self) -> aio.Group:
211        """Async group"""
212        return self._async_group

Async group

remote: str
214    @property
215    def remote(self) -> str:
216        """Remote IP address
217
218        Address is obtained from Forwarded or X-Forwarded-For headers. If
219        these headers are not available, socket's remote address is used.
220
221        """
222        return self._remote

Remote IP address

Address is obtained from Forwarded or X-Forwarded-For headers. If these headers are not available, socket's remote address is used.

state: hat.json.path.Storage
224    @property
225    def state(self) -> json.Storage:
226        """Server state"""
227        return self._state

Server state

async def flush(self):
229    async def flush(self):
230        """Force synchronization of state data
231
232        Raises:
233            ConnectionError
234
235        """
236        try:
237            flush_future = asyncio.Future()
238            self._flush_queue.put_nowait(flush_future)
239            await flush_future
240
241        except aio.QueueClosedError:
242            raise ConnectionError()

Force synchronization of state data

Raises:
  • ConnectionError
async def notify( self, name: str, data: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]):
244    async def notify(self,
245                     name: str,
246                     data: json.Data):
247        """Send notification
248
249        Raises:
250            ConnectionError
251
252        """
253        if not self.is_open:
254            raise ConnectionError()
255
256        await self._ws.send_str(json.encode({'type': 'notify',
257                                             'name': name,
258                                             'data': data}))

Send notification

Raises:
  • ConnectionError