hat.juggler

Juggler communication protocol

 1"""Juggler communication protocol"""
 2
 3from hat.juggler.client import (NotifyCb,
 4                                JugglerError,
 5                                connect,
 6                                Client)
 7from hat.juggler.server import (ConnectionCb,
 8                                RequestCb,
 9                                listen,
10                                Server,
11                                Connection)
12
13
14__all__ = ['NotifyCb',
15           'JugglerError',
16           'connect',
17           'Client',
18           'ConnectionCb',
19           'RequestCb',
20           'listen',
21           'Server',
22           'Connection']
NotifyCb = typing.Callable[['Client', str, json.Data], typing.Optional[typing.Awaitable[None]]]
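
A notify callback may be a plain function or a coroutine; it receives the client, the notification name and JSON-serializable data. A minimal sketch (the on_notify name is illustrative):

async def on_notify(client, name, data):
    # called for every 'notify' message pushed by the server
    print('notification', name, data)
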
class JugglerError(builtins.Exception):
class JugglerError(Exception):
    """Juggler error"""

    def __init__(self, data: json.Data):
        self.__data = data

    @property
    def data(self) -> json.Data:
        """Error data"""
        return self.__data

Juggler error

JugglerError(data: json.Data)
    def __init__(self, data: json.Data):
        self.__data = data
data: json.Data
    @property
    def data(self) -> json.Data:
        """Error data"""
        return self.__data

Error data

Inherited Members
builtins.BaseException
with_traceback
args
async def connect(address: str, notify_cb: NotifyCb | None = None) -> Client:
async def connect(address: str,
                  notify_cb: NotifyCb | None = None
                  ) -> 'Client':
    """Connect to remote server

    `address` represents remote WebSocket URL formatted as
    ``<schema>://<host>:<port>/<path>`` where ``<schema>`` is ``ws`` or
    ``wss``.

    """
    client = Client()
    client._notify_cb = notify_cb
    client._async_group = aio.Group()
    client._state = json.Storage()
    client._res_futures = {}
    client._next_req_ids = itertools.count(1)
    client._session = aiohttp.ClientSession()

    try:
        client._ws = await client._session.ws_connect(address, max_msg_size=0)

    except BaseException:
        await aio.uncancellable(client._session.close())
        raise

    client.async_group.spawn(client._receive_loop)

    return client

Connect to remote server

address represents remote WebSocket URL formatted as <schema>://<host>:<port>/<path> where <schema> is ws or wss.
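
A minimal client sketch, assuming a juggler server is already listening at the given address and implements a 'ping' request (both the address and the request name are illustrative):

import asyncio

from hat import juggler


async def main():
    client = await juggler.connect('ws://127.0.0.1:23023/ws',
                                   notify_cb=lambda c, name, data: print(name, data))
    try:
        # send a request and wait for the matching response
        result = await client.send('ping', {'value': 123})
        print(result)

    finally:
        await client.async_close()


asyncio.run(main())
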

class Client(hat.aio.group.Resource):
class Client(aio.Resource):
    """Client

    For creating new client see `connect` coroutine.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    @property
    def state(self) -> json.Storage:
        """Remote server state"""
        return self._state

    async def send(self,
                   name: str,
                   data: json.Data
                   ) -> json.Data:
        """Send request and wait for response

        Args:
            name: request name
            data: request payload

        Raises:
            JugglerError
            ConnectionError

        """
        if not self.is_open:
            raise ConnectionError()

        req_id = next(self._next_req_ids)
        res_future = asyncio.Future()
        self._res_futures[req_id] = res_future

        try:
            await self._ws.send_json({'type': 'request',
                                      'id': req_id,
                                      'name': name,
                                      'data': data})
            return await res_future

        finally:
            self._res_futures.pop(req_id)

    async def _receive_loop(self):
        try:
            while True:
                msg_ws = await self._ws.receive()
                if self._ws.closed or msg_ws.type == aiohttp.WSMsgType.CLOSING:
                    break
                if msg_ws.type != aiohttp.WSMsgType.TEXT:
                    raise Exception("unsupported ws message type")

                msg = json.decode(msg_ws.data)

                if msg['type'] == 'response':
                    res_future = self._res_futures.get(msg['id'])
                    if not res_future or res_future.done():
                        continue

                    if msg['success']:
                        res_future.set_result(msg['data'])

                    else:
                        res_future.set_exception(JugglerError(msg['data']))

                elif msg['type'] == 'state':
                    data = json.patch(self._state.data, msg['diff'])
                    self._state.set([], data)

                elif msg['type'] == 'notify':
                    if not self._notify_cb:
                        continue

                    await aio.call(self._notify_cb, self, msg['name'],
                                   msg['data'])

                else:
                    raise Exception("invalid message type")

        except ConnectionError:
            pass

        except Exception as e:
            mlog.error("receive loop error: %s", e, exc_info=e)

        finally:
            self.close()

            for f in self._res_futures.values():
                if not f.done():
                    f.set_exception(ConnectionError())

            await aio.uncancellable(self._close_ws())

    async def _close_ws(self):
        await self._ws.close()
        await self._session.close()

Client

For creating new client see connect coroutine.

async_group: hat.aio.group.Group
    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

Async group

state: hat.json.path.Storage
    @property
    def state(self) -> json.Storage:
        """Remote server state"""
        return self._state

Remote server state
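
Since state is a hat.json Storage instance, changes synchronized from the server can be observed through its change callback registration, just as the client itself uses it in its receive loop. A sketch (on_change is illustrative):

def on_change(data):
    # called whenever a 'state' message updates the local copy
    print('new remote state:', data)

with client.state.register_change_cb(on_change):
    ...  # changes are reported while the registration is active
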

async def send(self, name: str, data: json.Data) -> json.Data:
    async def send(self,
                   name: str,
                   data: json.Data
                   ) -> json.Data:
        """Send request and wait for response

        Args:
            name: request name
            data: request payload

        Raises:
            JugglerError
            ConnectionError

        """
        if not self.is_open:
            raise ConnectionError()

        req_id = next(self._next_req_ids)
        res_future = asyncio.Future()
        self._res_futures[req_id] = res_future

        try:
            await self._ws.send_json({'type': 'request',
                                      'id': req_id,
                                      'name': name,
                                      'data': data})
            return await res_future

        finally:
            self._res_futures.pop(req_id)

Send request and wait for response

Arguments:
  • name: request name
  • data: request payload
Raises:
  • JugglerError
  • ConnectionError
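
An unsuccessful response surfaces as JugglerError carrying the server-provided error data, while a closed connection surfaces as ConnectionError. A handling sketch, continuing the client example above (the request name is illustrative):

try:
    result = await client.send('get-config', None)

except juggler.JugglerError as e:
    # server-side request callback raised; e.data holds the error data
    print('request failed:', e.data)

except ConnectionError:
    # connection closed before a response arrived
    print('connection lost')
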
Inherited Members
hat.aio.group.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close
ConnectionCb = typing.Callable[['Connection'], typing.Optional[typing.Awaitable[None]]]
RequestCb = typing.Callable[['Connection', str, json.Data], json.Data | typing.Awaitable[json.Data]]
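
A request callback receives the connection, the request name and the request data; it may return JSON data directly or as an awaitable, and raising an exception produces an unsuccessful response. A sketch (names are illustrative):

async def on_request(conn, name, data):
    if name == 'add':
        return data['x'] + data['y']

    raise Exception(f'unsupported request: {name}')
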
async def listen(host: str, port: int, connection_cb: ConnectionCb | None = None, request_cb: RequestCb | None = None, *, ws_path: str = '/ws', static_dir: pathlib.PurePath | None = None, index_path: str | None = '/index.html', pem_file: pathlib.PurePath | None = None, autoflush_delay: float | None = 0.2, shutdown_timeout: float = 0.1, state: json.Storage | None = None, parallel_requests: bool = False, additional_routes: typing.Iterable[aiohttp.web.RouteDef] = []) -> Server:
async def listen(host: str,
                 port: int,
                 connection_cb: ConnectionCb | None = None,
                 request_cb: RequestCb | None = None, *,
                 ws_path: str = '/ws',
                 static_dir: pathlib.PurePath | None = None,
                 index_path: str | None = '/index.html',
                 pem_file: pathlib.PurePath | None = None,
                 autoflush_delay: float | None = 0.2,
                 shutdown_timeout: float = 0.1,
                 state: json.Storage | None = None,
                 parallel_requests: bool = False,
                 additional_routes: typing.Iterable[aiohttp.web.RouteDef] = []
                 ) -> 'Server':
    """Create listening server

    Each time server receives new incoming juggler connection, `connection_cb`
    is called with newly created connection.

    For each connection, when server receives `request` message, `request_cb`
    is called with associated connection, request name and request data.
    If `request_cb` returns value, successful `response` message is sent
    with resulting value as data. If `request_cb` raises exception,
    unsuccessful `response` message is sent with raised exception as data.
    If `request_cb` is ``None``, each `request` message causes sending
    of unsuccessful `response` message.

    If `static_dir` is set, server serves static files in addition to providing
    juggler communication.

    If `index_path` is set, requests for url path ``/`` are redirected to
    `index_path`.

    If `pem_file` is set, server provides `https/wss` communication instead
    of `http/ws` communication.

    Argument `autoflush_delay` is associated with all connections associated
    with this server. `autoflush_delay` defines maximum time delay for
    automatic synchronization of `state` changes. If `autoflush_delay` is set
    to ``None``, automatic synchronization is disabled and user is responsible
    for calling :meth:`Connection.flush`. If `autoflush_delay` is set to ``0``,
    synchronization of `state` is performed on each change of `state` data.

    `shutdown_timeout` defines maximum time duration server will wait for
    regular connection close procedures during server shutdown. All connections
    that are not closed during this period are forcefully closed.

    If `state` is ``None``, each connection is initialized with its own
    instance of server state. If `state` is set, provided state is shared
    between all connections.

    If `parallel_requests` is set to ``True``, incoming requests will be
    processed in parallel - processing of subsequent requests can start (and
    finish) before prior responses are generated.

    Argument `additional_routes` can be used for providing additional aiohttp
    route definitions handled by running web server.

    Args:
        host: listening hostname
        port: listening TCP port
        connection_cb: connection callback
        request_cb: request callback
        ws_path: WebSocket url path segment
        static_dir: static files directory path
        index_path: index path
        pem_file: PEM file path
        autoflush_delay: autoflush delay
        shutdown_timeout: shutdown timeout
        state: shared server state
        parallel_requests: parallel request processing
        additional_routes: additional route definitions

    """
    server = Server()
    server._connection_cb = connection_cb
    server._request_cb = request_cb
    server._autoflush_delay = autoflush_delay
    server._state = state
    server._parallel_requests = parallel_requests
    server._async_group = aio.Group()

    routes = []

    if index_path:

        async def root_handler(request):
            raise aiohttp.web.HTTPFound(index_path)

        routes.append(aiohttp.web.get('/', root_handler))

    routes.append(aiohttp.web.get(ws_path, server._ws_handler))
    routes.extend(additional_routes)

    if static_dir:
        routes.append(aiohttp.web.static('/', static_dir))

    app = aiohttp.web.Application()
    app.add_routes(routes)
    runner = aiohttp.web.AppRunner(app)
    await runner.setup()
    server.async_group.spawn(aio.call_on_cancel, runner.cleanup)

    try:
        ssl_ctx = _create_ssl_context(pem_file) if pem_file else None
        site = aiohttp.web.TCPSite(runner=runner,
                                   host=host,
                                   port=port,
                                   shutdown_timeout=shutdown_timeout,
                                   ssl_context=ssl_ctx,
                                   reuse_address=True)
        await site.start()

    except BaseException:
        await aio.uncancellable(server.async_close())
        raise

    return server

Create listening server

Each time server receives new incoming juggler connection, connection_cb is called with newly created connection.

For each connection, when server receives request message, request_cb is called with associated connection, request name and request data. If request_cb returns value, successful response message is sent with resulting value as data. If request_cb raises exception, unsuccessful response message is sent with raised exception as data. If request_cb is None, each request message causes sending of unsuccessful response message.
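
The corresponding wire messages, as visible in the source above, are JSON texts of the following shape (values are illustrative):

# request sent by the client
{'type': 'request', 'id': 1, 'name': 'add', 'data': {'x': 1, 'y': 2}}

# successful response sent by the server
{'type': 'response', 'id': 1, 'success': True, 'data': 3}

# unsuccessful response - data carries str() of the raised exception
{'type': 'response', 'id': 1, 'success': False, 'data': 'request handler not implemented'}
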

If static_dir is set, server serves static files in addition to providing juggler communication.

If index_path is set, requests for url path / are redirected to index_path.

If pem_file is set, server provides https/wss communication instead of http/ws communication.

Argument autoflush_delay is associated with all connections associated with this server. autoflush_delay defines maximum time delay for automatic synchronization of state changes. If autoflush_delay is set to None, automatic synchronization is disabled and user is responsible for calling Connection.flush(). If autoflush_delay is set to 0, synchronization of state is performed on each change of state data.

shutdown_timeout defines maximum time duration server will wait for regular connection close procedures during server shutdown. All connections that are not closed during this period are forcefully closed.

If state is None, each connection is initialized with its own instance of server state. If state is set, provided state is shared between all connections.
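
For example, a single Storage instance can be passed as state so that every connected client observes the same data. A fragment of an async setup routine (host and port are illustrative):

from hat import json

shared_state = json.Storage()

server = await juggler.listen('127.0.0.1', 23023,
                              state=shared_state)

# every change is synchronized to all currently connected clients
shared_state.set(['status'], 'running')
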

If parallel_requests is set to True, incoming requests will be processed in parallel - processing of subsequent requests can start (and finish) before prior responses are generated.

Argument additional_routes can be used for providing additional aiohttp route definitions handled by running web server.

Arguments:
  • host: listening hostname
  • port: listening TCP port
  • connection_cb: connection callback
  • request_cb: request callback
  • ws_path: WebSocket url path segment
  • static_dir: static files directory path
  • index_path: index path
  • pem_file: PEM file path
  • autoflush_delay: autoflush delay
  • shutdown_timeout: shutdown timeout
  • state: shared server state
  • parallel_requests: parallel request processing
  • additional_routes: additional route definitions
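
A minimal listening sketch using the arguments above (host, port and request names are illustrative):

import asyncio

from hat import juggler


async def on_connection(conn):
    # push initial state to the newly connected client
    conn.state.set([], {'counter': 0})


async def on_request(conn, name, data):
    if name == 'echo':
        return data

    raise Exception('unsupported request')


async def main():
    server = await juggler.listen('127.0.0.1', 23023,
                                  connection_cb=on_connection,
                                  request_cb=on_request)
    try:
        await server.wait_closing()

    finally:
        await server.async_close()


asyncio.run(main())
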
class Server(hat.aio.group.Resource):
class Server(aio.Resource):
    """Server

    For creating new server see `listen` coroutine.

    When server is closed, all incoming connections are also closed.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    async def _ws_handler(self, request):
        ws = aiohttp.web.WebSocketResponse()
        await ws.prepare(request)

        conn = Connection()
        conn._ws = ws
        conn._async_group = self.async_group.create_subgroup()
        conn._request_cb = self._request_cb
        conn._autoflush_delay = self._autoflush_delay
        conn._state = self._state or json.Storage()
        conn._parallel_requests = self._parallel_requests
        conn._flush_queue = aio.Queue()

        conn.async_group.spawn(conn._receive_loop)
        conn.async_group.spawn(conn._sync_loop)

        if self._connection_cb:
            conn.async_group.spawn(aio.call, self._connection_cb, conn)

        await conn.wait_closed()

        return ws

Server

For creating new server see listen coroutine.

When server is closed, all incoming connections are also closed.

async_group: hat.aio.group.Group
    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

Async group

Inherited Members
hat.aio.group.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close
class Connection(hat.aio.group.Resource):
class Connection(aio.Resource):
    """Connection

    For creating new connection see `listen` coroutine.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

    @property
    def state(self) -> json.Storage:
        """Server state"""
        return self._state

    async def flush(self):
        """Force synchronization of state data

        Raises:
            ConnectionError

        """
        try:
            flush_future = asyncio.Future()
            self._flush_queue.put_nowait(flush_future)
            await flush_future

        except aio.QueueClosedError:
            raise ConnectionError()

    async def notify(self,
                     name: str,
                     data: json.Data):
        """Send notification

        Raises:
            ConnectionError

        """
        if not self.is_open:
            raise ConnectionError()

        await self._ws.send_str(json.encode({'type': 'notify',
                                             'name': name,
                                             'data': data}))

    async def _receive_loop(self):
        try:
            while self.is_open:
                msg_ws = await self._ws.receive()
                if self._ws.closed or msg_ws.type == aiohttp.WSMsgType.CLOSING:
                    break
                if msg_ws.type != aiohttp.WSMsgType.TEXT:
                    raise Exception("unsupported ws message type")

                msg = json.decode(msg_ws.data)

                if msg['type'] != 'request':
                    raise Exception("invalid message type")

                if self._parallel_requests:
                    self.async_group.spawn(self._process_request, msg)

                else:
                    await self._process_request(msg)

        except ConnectionError:
            pass

        except Exception as e:
            mlog.error("receive loop error: %s", e, exc_info=e)

        finally:
            self.close()
            await aio.uncancellable(self._ws.close())

    async def _process_request(self, req):
        try:
            res = {'type': 'response',
                   'id': req['id']}

            if req['name']:
                try:
                    if not self._request_cb:
                        raise Exception('request handler not implemented')

                    res['data'] = await aio.call(self._request_cb, self,
                                                 req['name'], req['data'])
                    res['success'] = True

                except Exception as e:
                    res['data'] = str(e)
                    res['success'] = False

            else:
                res['data'] = req['data']
                res['success'] = True

            await self._ws.send_str(json.encode(res))

        except ConnectionError:
            self.close()

        except Exception as e:
            self.close()
            mlog.error("process request error: %s", e, exc_info=e)

    async def _sync_loop(self):
        flush_future = None
        data = None
        synced_data = None
        data_queue = aio.Queue()

        try:
            with self._state.register_change_cb(data_queue.put_nowait):
                data_queue.put_nowait(self._state.data)

                if not self.is_open:
                    return

                get_data_future = self.async_group.spawn(data_queue.get)
                get_flush_future = self.async_group.spawn(
                    self._flush_queue.get)

                while True:
                    await asyncio.wait([get_data_future, get_flush_future],
                                       return_when=asyncio.FIRST_COMPLETED)

                    if get_flush_future.done():
                        flush_future = get_flush_future.result()
                        get_flush_future = self.async_group.spawn(
                            self._flush_queue.get)

                    else:
                        await asyncio.wait([get_flush_future],
                                           timeout=self._autoflush_delay)

                        if get_flush_future.done():
                            flush_future = get_flush_future.result()
                            get_flush_future = self.async_group.spawn(
                                self._flush_queue.get)

                        else:
                            flush_future = None

                    if get_data_future.done():
                        data = get_data_future.result()
                        get_data_future = self.async_group.spawn(
                            data_queue.get)

                    if self._autoflush_delay != 0:
                        if not data_queue.empty():
                            data = data_queue.get_nowait_until_empty()

                    if synced_data is not data:
                        diff = json.diff(synced_data, data)
                        synced_data = data

                        if diff:
                            await self._ws.send_str(json.encode({
                                'type': 'state',
                                'diff': diff}))

                    if flush_future and not flush_future.done():
                        flush_future.set_result(True)

        except Exception as e:
            mlog.error("sync loop error: %s", e, exc_info=e)

        finally:
            self.close()

            self._flush_queue.close()
            while True:
                if flush_future and not flush_future.done():
                    flush_future.set_exception(ConnectionError())

                if self._flush_queue.empty():
                    break

                flush_future = self._flush_queue.get_nowait()

Connection

For creating new connection see listen coroutine.

async_group: hat.aio.group.Group
    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._async_group

Async group

state: hat.json.path.Storage
    @property
    def state(self) -> json.Storage:
        """Server state"""
        return self._state

Server state

async def flush(self):
    async def flush(self):
        """Force synchronization of state data

        Raises:
            ConnectionError

        """
        try:
            flush_future = asyncio.Future()
            self._flush_queue.put_nowait(flush_future)
            await flush_future

        except aio.QueueClosedError:
            raise ConnectionError()

Force synchronization of state data

Raises:
  • ConnectionError
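
When the server was created with autoflush_delay set to None, state changes reach the client only after an explicit flush. A sketch, assuming conn is a Connection obtained through connection_cb (path keys are illustrative):

# apply several state changes, then synchronize them as a single diff
conn.state.set(['status'], 'running')
conn.state.set(['progress'], 100)
await conn.flush()
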
async def notify(self, name: str, data: json.Data):
    async def notify(self,
                     name: str,
                     data: json.Data):
        """Send notification

        Raises:
            ConnectionError

        """
        if not self.is_open:
            raise ConnectionError()

        await self._ws.send_str(json.encode({'type': 'notify',
                                             'name': name,
                                             'data': data}))

Send notification

Raises:
  • ConnectionError
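
A notification is a one-way message delivered to the client's notify_cb; it is not paired with a response. A short sketch (name and payload are illustrative):

await conn.notify('progress', {'done': 42, 'total': 100})
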
Inherited Members
hat.aio.group.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close