hat.juggler

Juggler communication protocol

 1"""Juggler communication protocol"""
 2
 3from hat.juggler.client import (NotifyCb,
 4                                JugglerError,
 5                                connect,
 6                                Client)
 7from hat.juggler.server import (ConnectionCb,
 8                                RequestCb,
 9                                listen,
10                                Server,
11                                Connection)
12
13
14__all__ = ['NotifyCb',
15           'JugglerError',
16           'connect',
17           'Client',
18           'ConnectionCb',
19           'RequestCb',
20           'listen',
21           'Server',
22           'Connection']
NotifyCb = typing.Callable[[ForwardRef('Client'), str, typing.Union[NoneType, bool, int, float, str, typing.List[ForwardRef('Data')], typing.Dict[str, ForwardRef('Data')]]], None | collections.abc.Awaitable[None]]
class JugglerError(builtins.Exception):
26class JugglerError(Exception):
27    """Juggler error"""
28
29    def __init__(self, data: json.Data):
30        self.__data = data
31
32    @property
33    def data(self) -> json.Data:
34        """Error data"""
35        return self.__data

Juggler error

JugglerError( data: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]])
29    def __init__(self, data: json.Data):
30        self.__data = data
data: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]
32    @property
33    def data(self) -> json.Data:
34        """Error data"""
35        return self.__data

Error data

async def connect( address: str, notify_cb: Optional[Callable[[ForwardRef('Client'), str, Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]], None | Awaitable[None]]] = None, *, auth: aiohttp.helpers.BasicAuth | None = None, ssl_ctx: ssl.SSLContext | None = None, send_queue_size: int = 1024, max_segment_size: int = 65536, ping_delay: float = 30, ping_timeout: float = 30) -> Client:
38async def connect(address: str,
39                  notify_cb: NotifyCb | None = None,
40                  *,
41                  auth: aiohttp.BasicAuth | None = None,
42                  ssl_ctx: ssl.SSLContext | None = None,
43                  send_queue_size: int = 1024,
44                  max_segment_size: int = 64 * 1024,
45                  ping_delay: float = 30,
46                  ping_timeout: float = 30
47                  ) -> 'Client':
48    """Connect to remote server
49
50    `address` represents remote WebSocket URL formated as
51    ``<schema>://<host>:<port>/<path>`` where ``<schema>`` is ``ws`` or
52    ``wss``.
53
54    """
55    client = Client()
56    client._notify_cb = notify_cb
57    client._loop = asyncio.get_running_loop()
58    client._async_group = aio.Group()
59    client._state = json.Storage()
60    client._res_futures = {}
61    client._next_req_ids = itertools.count(1)
62    client._session = aiohttp.ClientSession()
63
64    try:
65        ws = await client._session.ws_connect(address,
66                                              auth=auth,
67                                              ssl=ssl_ctx or False,
68                                              max_msg_size=0)
69
70    except BaseException:
71        await aio.uncancellable(client._session.close())
72        raise
73
74    client._transport = Transport(ws=ws,
75                                  msg_cb=client._on_msg,
76                                  send_queue_size=send_queue_size,
77                                  max_segment_size=max_segment_size,
78                                  ping_delay=ping_delay,
79                                  ping_timeout=ping_timeout)
80
81    client.async_group.spawn(aio.call_on_cancel, client._on_close)
82    client.async_group.spawn(aio.call_on_done,
83                             client._transport.wait_closing(), client.close)
84
85    return client

Connect to remote server

address represents remote WebSocket URL formated as <schema>://<host>:<port>/<path> where <schema> is ws or wss.

class Client(hat.aio.group.Resource):
 88class Client(aio.Resource):
 89    """Client
 90
 91    For creating new client see `connect` coroutine.
 92
 93    """
 94
 95    @property
 96    def async_group(self) -> aio.Group:
 97        """Async group"""
 98        return self._async_group
 99
100    @property
101    def state(self) -> json.Storage:
102        """Remote server state"""
103        return self._state
104
105    async def send(self,
106                   name: str,
107                   data: json.Data
108                   ) -> json.Data:
109        """Send request and wait for response
110
111        Args:
112            name: request name
113            data: request payload
114
115        Raises:
116            JugglerError
117            ConnectionError
118
119        """
120        if not self.is_open:
121            raise ConnectionError()
122
123        req_id = next(self._next_req_ids)
124        res_future = self._loop.create_future()
125        self._res_futures[req_id] = res_future
126
127        try:
128            await self._transport.send({'type': 'request',
129                                        'id': req_id,
130                                        'name': name,
131                                        'data': data})
132            return await res_future
133
134        finally:
135            self._res_futures.pop(req_id)
136
137    async def _on_close(self):
138        for f in self._res_futures.values():
139            if not f.done():
140                f.set_exception(ConnectionError())
141
142        await self._transport.async_close()
143        await self._session.close()
144
145    async def _on_msg(self, msg):
146        if msg['type'] == 'response':
147            res_future = self._res_futures.get(msg['id'])
148            if not res_future or res_future.done():
149                return
150
151            if msg['success']:
152                res_future.set_result(msg['data'])
153
154            else:
155                res_future.set_exception(JugglerError(msg['data']))
156
157        elif msg['type'] == 'state':
158            data = json.patch(self._state.data, msg['diff'])
159            self._state.set([], data)
160
161        elif msg['type'] == 'notify':
162            if not self._notify_cb:
163                return
164
165            await aio.call(self._notify_cb, self, msg['name'],
166                           msg['data'])
167
168        else:
169            raise Exception("invalid message type")

Client

For creating new client see connect coroutine.

async_group: hat.aio.group.Group
95    @property
96    def async_group(self) -> aio.Group:
97        """Async group"""
98        return self._async_group

Async group

state: hat.json.path.Storage
100    @property
101    def state(self) -> json.Storage:
102        """Remote server state"""
103        return self._state

Remote server state

async def send( self, name: str, data: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]) -> Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]:
105    async def send(self,
106                   name: str,
107                   data: json.Data
108                   ) -> json.Data:
109        """Send request and wait for response
110
111        Args:
112            name: request name
113            data: request payload
114
115        Raises:
116            JugglerError
117            ConnectionError
118
119        """
120        if not self.is_open:
121            raise ConnectionError()
122
123        req_id = next(self._next_req_ids)
124        res_future = self._loop.create_future()
125        self._res_futures[req_id] = res_future
126
127        try:
128            await self._transport.send({'type': 'request',
129                                        'id': req_id,
130                                        'name': name,
131                                        'data': data})
132            return await res_future
133
134        finally:
135            self._res_futures.pop(req_id)

Send request and wait for response

Arguments:
  • name: request name
  • data: request payload
Raises:
  • JugglerError
  • ConnectionError
ConnectionCb = typing.Callable[[ForwardRef('Connection')], None | collections.abc.Awaitable[None]]
RequestCb = typing.Callable[[ForwardRef('Connection'), str, typing.Union[NoneType, bool, int, float, str, typing.List[ForwardRef('Data')], typing.Dict[str, ForwardRef('Data')]]], typing.Union[NoneType, bool, int, float, str, typing.List[ForwardRef('Data')], typing.Dict[str, ForwardRef('Data')], collections.abc.Awaitable[typing.Union[NoneType, bool, int, float, str, typing.List[ForwardRef('Data')], typing.Dict[str, ForwardRef('Data')]]]]]
async def listen( host: str, port: int, connection_cb: Optional[Callable[[Connection], None | Awaitable[None]]] = None, request_cb: Optional[Callable[[ForwardRef('Connection'), str, Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]], Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], Awaitable[Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]]]]] = None, *, ws_path: str = '/ws', static_dir: pathlib.PurePath | None = None, index_path: str | None = '/index.html', htpasswd_file: pathlib.PurePath | None = None, ssl_ctx: ssl.SSLContext | None = None, autoflush_delay: float | None = 0.2, shutdown_timeout: float = 0.1, state: hat.json.path.Storage | None = None, parallel_requests: bool = False, additional_routes: Iterable[aiohttp.web_routedef.RouteDef] = [], send_queue_size: int = 1024, max_segment_size: int = 65536, ping_delay: float = 30, ping_timeout: float = 30, no_cache: bool = True) -> Server:
 32async def listen(host: str,
 33                 port: int,
 34                 connection_cb: ConnectionCb | None = None,
 35                 request_cb: RequestCb | None = None,
 36                 *,
 37                 ws_path: str = '/ws',
 38                 static_dir: pathlib.PurePath | None = None,
 39                 index_path: str | None = '/index.html',
 40                 htpasswd_file: pathlib.PurePath | None = None,
 41                 ssl_ctx: ssl.SSLContext | None = None,
 42                 autoflush_delay: float | None = 0.2,
 43                 shutdown_timeout: float = 0.1,
 44                 state: json.Storage | None = None,
 45                 parallel_requests: bool = False,
 46                 additional_routes: Iterable[aiohttp.web.RouteDef] = [],
 47                 send_queue_size: int = 1024,
 48                 max_segment_size: int = 64 * 1024,
 49                 ping_delay: float = 30,
 50                 ping_timeout: float = 30,
 51                 no_cache: bool = True
 52                 ) -> 'Server':
 53    """Create listening server
 54
 55    Each time server receives new incoming juggler connection, `connection_cb`
 56    is called with newly created connection.
 57
 58    For each connection, when server receives `request` message, `request_cb`
 59    is called with associated connection, request name and request data.
 60    If `request_cb` returns value, successful `response` message is sent
 61    with resulting value as data. If `request_cb` raises exception,
 62    unsuccessful `response` message is sent with raised exception as data.
 63    If `request_cb` is ``None``, each `request` message causes sending
 64    of unsuccessful `response` message.
 65
 66    If `static_dir` is set, server serves static files is addition to providing
 67    juggler communication.
 68
 69    If `index_path` is set, request for url path ``/`` are redirected to
 70    `index_path`.
 71
 72    If `htpasswd_file` is set, HTTP Basic Authentication is enabled.
 73    All requests are checked for ``Authorization`` header and only users
 74    specified by `htpassword_file` are accepted. `htpasswd_file` is read
 75    during initialization and changes to it's content, after initialization
 76    finishes, are not monitored.
 77
 78    If `ssl_ctx` is set, server provides `https/wss` communication instead
 79    of `http/ws` communication.
 80
 81    Argument `autoflush_delay` is associated with all connections associated
 82    with this server. `autoflush_delay` defines maximum time delay for
 83    automatic synchronization of `state` changes. If `autoflush_delay` is set
 84    to ``None``, automatic synchronization is disabled and user is responsible
 85    for calling :meth:`Connection.flush`. If `autoflush_delay` is set to ``0``,
 86    synchronization of `state` is performed on each change of `state` data.
 87
 88    `shutdown_timeout` defines maximum time duration server will wait for
 89    regular connection close procedures during server shutdown. All connections
 90    that are not closed during this period are forcefully closed.
 91
 92    If `state` is ``None``, each connection is initialized with it's own
 93    instance of server state. If `state` is set, provided state is shared
 94    between all connections.
 95
 96    If `parallel_requests` is set to ``True``, incoming requests will be
 97    processed in parallel - processing of subsequent requests can start (and
 98    finish) before prior responses are generated.
 99
100    Argument `additional_routes` can be used for providing addition aiohttp
101    route definitions handled by running web server.
102
103    `send_queue_size` limits number of messages that can be put in send queue.
104    This limit can impact blocking of :meth:`Connection.notify`.
105
106    `max_segment_size` limits maximum size of single segment
107    (transport payload size).
108
109    When connection doesn't receive incoming data,
110    `ping_delay` is time (in seconds) that connection waits before sending
111    ping request.
112
113    `ping_timeout` is time (in seconds), that connection waits for any kind
114    of incoming traffic before closed connection is assumed.
115
116    If `no_cache` is set to ``True``, server will include
117    ``Cache-Control: no-cache`` header in all responses.
118
119    Args:
120        host: listening hostname
121        port: listening TCP port
122        connection_cb: connection callback
123        request_cb: request callback
124        ws_path: WebSocket url path segment
125        static_dir: static files directory path
126        index_path: index path
127        htpasswd_file: htpasswd file path
128        ssl_ctx: SSL context
129        autoflush_delay: autoflush delay
130        shutdown_timeout: shutdown timeout
131        state: shared server state
132        parallel_requests: parallel request processing
133        additional_routes: additional route definitions
134        send_queue_size: send queue size
135        max_segment_size: maximum segment size
136        ping_delay: ping delay
137        ping_timeout: ping timeout
138        no_cache: no cache header
139
140    """
141    server = Server()
142    server._connection_cb = connection_cb
143    server._request_cb = request_cb
144    server._autoflush_delay = autoflush_delay
145    server._state = state
146    server._parallel_requests = parallel_requests
147    server._send_queue_size = send_queue_size
148    server._max_segment_size = max_segment_size
149    server._ping_delay = ping_delay
150    server._ping_timeout = ping_timeout
151    server._async_group = aio.Group()
152
153    middlewares = []
154
155    if htpasswd_file:
156        middlewares.append(BasicAuthMiddleware(htpasswd_file))
157
158    routes = []
159
160    if index_path:
161
162        async def root_handler(request):
163            raise aiohttp.web.HTTPFound(index_path)
164
165        routes.append(aiohttp.web.get('/', root_handler))
166
167    routes.append(aiohttp.web.get(ws_path, server._ws_handler))
168    routes.extend(additional_routes)
169
170    if static_dir:
171        routes.append(aiohttp.web.static('/', static_dir))
172
173    app = aiohttp.web.Application(middlewares=middlewares)
174    app.add_routes(routes)
175
176    if no_cache:
177        app.on_response_prepare.append(_no_cache_prepare)
178
179    runner = aiohttp.web.AppRunner(app,
180                                   shutdown_timeout=shutdown_timeout)
181    await runner.setup()
182    server.async_group.spawn(aio.call_on_cancel, runner.cleanup)
183
184    try:
185        site = aiohttp.web.TCPSite(runner=runner,
186                                   host=host,
187                                   port=port,
188                                   ssl_context=ssl_ctx,
189                                   reuse_address=True)
190        await site.start()
191
192    except BaseException:
193        await aio.uncancellable(server.async_close())
194        raise
195
196    return server

Create listening server

Each time server receives new incoming juggler connection, connection_cb is called with newly created connection.

For each connection, when server receives request message, request_cb is called with associated connection, request name and request data. If request_cb returns value, successful response message is sent with resulting value as data. If request_cb raises exception, unsuccessful response message is sent with raised exception as data. If request_cb is None, each request message causes sending of unsuccessful response message.

If static_dir is set, server serves static files is addition to providing juggler communication.

If index_path is set, request for url path / are redirected to index_path.

If htpasswd_file is set, HTTP Basic Authentication is enabled. All requests are checked for Authorization header and only users specified by htpassword_file are accepted. htpasswd_file is read during initialization and changes to it's content, after initialization finishes, are not monitored.

If ssl_ctx is set, server provides https/wss communication instead of http/ws communication.

Argument autoflush_delay is associated with all connections associated with this server. autoflush_delay defines maximum time delay for automatic synchronization of state changes. If autoflush_delay is set to None, automatic synchronization is disabled and user is responsible for calling Connection.flush(). If autoflush_delay is set to 0, synchronization of state is performed on each change of state data.

shutdown_timeout defines maximum time duration server will wait for regular connection close procedures during server shutdown. All connections that are not closed during this period are forcefully closed.

If state is None, each connection is initialized with it's own instance of server state. If state is set, provided state is shared between all connections.

If parallel_requests is set to True, incoming requests will be processed in parallel - processing of subsequent requests can start (and finish) before prior responses are generated.

Argument additional_routes can be used for providing addition aiohttp route definitions handled by running web server.

send_queue_size limits number of messages that can be put in send queue. This limit can impact blocking of Connection.notify().

max_segment_size limits maximum size of single segment (transport payload size).

When connection doesn't receive incoming data, ping_delay is time (in seconds) that connection waits before sending ping request.

ping_timeout is time (in seconds), that connection waits for any kind of incoming traffic before closed connection is assumed.

If no_cache is set to True, server will include Cache-Control: no-cache header in all responses.

Arguments:
  • host: listening hostname
  • port: listening TCP port
  • connection_cb: connection callback
  • request_cb: request callback
  • ws_path: WebSocket url path segment
  • static_dir: static files directory path
  • index_path: index path
  • htpasswd_file: htpasswd file path
  • ssl_ctx: SSL context
  • autoflush_delay: autoflush delay
  • shutdown_timeout: shutdown timeout
  • state: shared server state
  • parallel_requests: parallel request processing
  • additional_routes: additional route definitions
  • send_queue_size: send queue size
  • max_segment_size: maximum segment size
  • ping_delay: ping delay
  • ping_timeout: ping timeout
  • no_cache: no cache header
class Server(hat.aio.group.Resource):
199class Server(aio.Resource):
200    """Server
201
202    For creating new server see `listen` coroutine.
203
204    When server is closed, all incoming connections are also closed.
205
206    """
207
208    @property
209    def async_group(self) -> aio.Group:
210        """Async group"""
211        return self._async_group
212
213    async def _ws_handler(self, request):
214        ws = aiohttp.web.WebSocketResponse()
215        await ws.prepare(request)
216
217        conn = Connection()
218        conn._remote = _get_remote(request)
219        conn._async_group = self.async_group.create_subgroup()
220        conn._request_cb = self._request_cb
221        conn._autoflush_delay = self._autoflush_delay
222        conn._state = self._state or json.Storage()
223        conn._parallel_requests = self._parallel_requests
224        conn._flush_queue = aio.Queue()
225
226        conn._transport = Transport(ws=ws,
227                                    msg_cb=conn._on_msg,
228                                    send_queue_size=self._send_queue_size,
229                                    max_segment_size=self._max_segment_size,
230                                    ping_delay=self._ping_delay,
231                                    ping_timeout=self._ping_timeout)
232
233        conn.async_group.spawn(aio.call_on_cancel, conn._transport.async_close)
234        conn.async_group.spawn(aio.call_on_done,
235                               conn._transport.wait_closing(), conn.close)
236
237        conn.async_group.spawn(conn._sync_loop)
238
239        if self._connection_cb:
240            conn.async_group.spawn(aio.call, self._connection_cb, conn)
241
242        await conn.wait_closed()
243
244        return ws

Server

For creating new server see listen coroutine.

When server is closed, all incoming connections are also closed.

async_group: hat.aio.group.Group
208    @property
209    def async_group(self) -> aio.Group:
210        """Async group"""
211        return self._async_group

Async group

class Connection(hat.aio.group.Resource):
247class Connection(aio.Resource):
248    """Connection
249
250    For creating new connection see `listen` coroutine.
251
252    """
253
254    @property
255    def async_group(self) -> aio.Group:
256        """Async group"""
257        return self._async_group
258
259    @property
260    def remote(self) -> str:
261        """Remote IP address
262
263        Address is obtained from Forwarded or X-Forwarded-For headers. If
264        these headers are not available, socket's remote address is used.
265
266        """
267        return self._remote
268
269    @property
270    def state(self) -> json.Storage:
271        """Server state"""
272        return self._state
273
274    async def flush(self):
275        """Force synchronization of state data
276
277        Raises:
278            ConnectionError
279
280        """
281        try:
282            flush_future = asyncio.Future()
283            self._flush_queue.put_nowait(flush_future)
284            await flush_future
285
286        except aio.QueueClosedError:
287            raise ConnectionError()
288
289    async def notify(self,
290                     name: str,
291                     data: json.Data):
292        """Send notification
293
294        Raises:
295            ConnectionError
296
297        """
298        if not self.is_open:
299            raise ConnectionError()
300
301        await self._transport.send({'type': 'notify',
302                                    'name': name,
303                                    'data': data})
304
305    async def _on_msg(self, msg):
306        if msg['type'] != 'request':
307            raise Exception("invalid message type")
308
309        if self._parallel_requests:
310            self.async_group.spawn(self._process_request, msg)
311
312        else:
313            await self._process_request(msg)
314
315    async def _process_request(self, req):
316        try:
317            res = {'type': 'response',
318                   'id': req['id']}
319
320            if req['name']:
321                try:
322                    if not self._request_cb:
323                        raise Exception('request handler not implemented')
324
325                    res['data'] = await aio.call(self._request_cb, self,
326                                                 req['name'], req['data'])
327                    res['success'] = True
328
329                except Exception as e:
330                    res['data'] = str(e)
331                    res['success'] = False
332
333            else:
334                res['data'] = req['data']
335                res['success'] = True
336
337            await self._transport.send(res)
338
339        except ConnectionError:
340            self.close()
341
342        except Exception as e:
343            self.close()
344            mlog.error("process request error: %s", e, exc_info=e)
345
346    async def _sync_loop(self):
347        flush_future = None
348        data = None
349        synced_data = None
350        data_queue = aio.Queue()
351
352        try:
353            with self._state.register_change_cb(data_queue.put_nowait):
354                data_queue.put_nowait(self._state.data)
355
356                if not self.is_open:
357                    return
358
359                get_data_future = self.async_group.spawn(data_queue.get)
360                get_flush_future = self.async_group.spawn(
361                    self._flush_queue.get)
362
363                while True:
364                    await asyncio.wait([get_data_future, get_flush_future],
365                                       return_when=asyncio.FIRST_COMPLETED)
366
367                    if get_flush_future.done():
368                        flush_future = get_flush_future.result()
369                        get_flush_future = self.async_group.spawn(
370                            self._flush_queue.get)
371
372                    else:
373                        await asyncio.wait([get_flush_future],
374                                           timeout=self._autoflush_delay)
375
376                        if get_flush_future.done():
377                            flush_future = get_flush_future.result()
378                            get_flush_future = self.async_group.spawn(
379                                self._flush_queue.get)
380
381                        else:
382                            flush_future = None
383
384                    if get_data_future.done():
385                        data = get_data_future.result()
386                        get_data_future = self.async_group.spawn(
387                            data_queue.get)
388
389                    if self._autoflush_delay != 0:
390                        if not data_queue.empty():
391                            data = data_queue.get_nowait_until_empty()
392
393                    if synced_data is not data:
394                        diff = json.diff(synced_data, data)
395                        synced_data = data
396
397                        if diff:
398                            await self._transport.send({'type': 'state',
399                                                        'diff': diff})
400
401                    if flush_future and not flush_future.done():
402                        flush_future.set_result(True)
403
404        except Exception as e:
405            mlog.error("sync loop error: %s", e, exc_info=e)
406
407        finally:
408            self.close()
409
410            self._flush_queue.close()
411            while True:
412                if flush_future and not flush_future.done():
413                    flush_future.set_exception(ConnectionError())
414
415                if self._flush_queue.empty():
416                    break
417
418                flush_future = self._flush_queue.get_nowait()

Connection

For creating new connection see listen coroutine.

async_group: hat.aio.group.Group
254    @property
255    def async_group(self) -> aio.Group:
256        """Async group"""
257        return self._async_group

Async group

remote: str
259    @property
260    def remote(self) -> str:
261        """Remote IP address
262
263        Address is obtained from Forwarded or X-Forwarded-For headers. If
264        these headers are not available, socket's remote address is used.
265
266        """
267        return self._remote

Remote IP address

Address is obtained from Forwarded or X-Forwarded-For headers. If these headers are not available, socket's remote address is used.

state: hat.json.path.Storage
269    @property
270    def state(self) -> json.Storage:
271        """Server state"""
272        return self._state

Server state

async def flush(self):
274    async def flush(self):
275        """Force synchronization of state data
276
277        Raises:
278            ConnectionError
279
280        """
281        try:
282            flush_future = asyncio.Future()
283            self._flush_queue.put_nowait(flush_future)
284            await flush_future
285
286        except aio.QueueClosedError:
287            raise ConnectionError()

Force synchronization of state data

Raises:
  • ConnectionError
async def notify( self, name: str, data: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]):
289    async def notify(self,
290                     name: str,
291                     data: json.Data):
292        """Send notification
293
294        Raises:
295            ConnectionError
296
297        """
298        if not self.is_open:
299            raise ConnectionError()
300
301        await self._transport.send({'type': 'notify',
302                                    'name': name,
303                                    'data': data})

Send notification

Raises:
  • ConnectionError