Skip to content

WebSocket server

WebSocket server.

Source code in pycrdt_websocket/websocket_server.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
class WebsocketServer:
    """WebSocket server."""

    auto_clean_rooms: bool
    rooms: dict[str, YRoom]
    _started: Event | None = None
    _stopped: Event
    _task_group: TaskGroup | None = None
    __start_lock: Lock | None = None

    def __init__(
        self,
        rooms_ready: bool = True,
        auto_clean_rooms: bool = True,
        exception_handler: Callable[[Exception, Logger], bool] | None = None,
        log: Logger | None = None,
    ) -> None:
        """Initialize the object.

        The WebsocketServer instance should preferably be used as an async context manager:
        ```py
        async with websocket_server:
            ...
        ```
        However, a lower-level API can also be used:
        ```py
        task = asyncio.create_task(websocket_server.start())
        await websocket_server.started.wait()
        ...
        await websocket_server.stop()
        ```

        Arguments:
            rooms_ready: Whether rooms are ready to be synchronized when opened.
            auto_clean_rooms: Whether rooms should be deleted when no client is there anymore.
            exception_handler: An optional callback to call when an exception is raised, that
                returns True if the exception was handled.
            log: An optional logger.
        """
        self.rooms_ready = rooms_ready
        self.auto_clean_rooms = auto_clean_rooms
        self.exception_handler = exception_handler
        self.log = log or getLogger(__name__)
        self.rooms = {}
        self._stopped = Event()

    @property
    def started(self) -> Event:
        """An async event that is set when the WebSocket server has started."""
        if self._started is None:
            self._started = Event()
        return self._started

    @property
    def _start_lock(self) -> Lock:
        if self.__start_lock is None:
            self.__start_lock = Lock()
        return self.__start_lock

    async def get_room(self, name: str) -> YRoom:
        """Get or create a room with the given name, and start it.

        Arguments:
            name: The room name.

        Returns:
            The room with the given name, or a new one if no room with that name was found.
        """
        if name not in self.rooms.keys():
            self.rooms[name] = YRoom(ready=self.rooms_ready, log=self.log)
        room = self.rooms[name]
        await self.start_room(room)
        return room

    async def start_room(self, room: YRoom) -> None:
        """Start a room, if not already started.

        Arguments:
            room: The room to start.
        """
        if self._task_group is None:
            raise RuntimeError(
                "The WebsocketServer is not running: use `async with websocket_server:` or "
                "`await websocket_server.start()`"
            )

        if not room.started.is_set():
            await self._task_group.start(room.start)

    def get_room_name(self, room: YRoom) -> str:
        """Get the name of a room.

        Arguments:
            room: The room to get the name from.

        Returns:
            The room name.
        """
        return list(self.rooms.keys())[list(self.rooms.values()).index(room)]

    def rename_room(
        self, to_name: str, *, from_name: str | None = None, from_room: YRoom | None = None
    ) -> None:
        """Rename a room.

        Arguments:
            to_name: The new name of the room.
            from_name: The previous name of the room (if `from_room` is not passed).
            from_room: The room to be renamed (if `from_name` is not passed).
        """
        if from_name is not None and from_room is not None:
            raise RuntimeError("Cannot pass from_name and from_room")
        if from_name is None:
            assert from_room is not None
            from_name = self.get_room_name(from_room)
        self.rooms[to_name] = self.rooms.pop(from_name)

    async def delete_room(self, *, name: str | None = None, room: YRoom | None = None) -> None:
        """Delete a room.

        Arguments:
            name: The name of the room to delete (if `room` is not passed).
            room: The room to delete (if `name` is not passed).
        """
        if name is not None and room is not None:
            raise RuntimeError("Cannot pass name and room")
        if name is None:
            assert room is not None
            name = self.get_room_name(room)
        room = self.rooms.pop(name)
        await room.stop()

    async def serve(self, websocket: Websocket) -> None:
        """Serve a client through a WebSocket.

        Arguments:
            websocket: The WebSocket through which to serve the client.
        """
        if self._task_group is None:
            raise RuntimeError(
                "The WebsocketServer is not running: use `async with websocket_server:` or "
                "`await websocket_server.start()`"
            )

        try:
            async with create_task_group():
                room = await self.get_room(websocket.path)
                await self.start_room(room)
                await room.serve(websocket)
                if self.auto_clean_rooms and not room.clients:
                    await self.delete_room(room=room)
        except Exception as exception:
            self._handle_exception(exception)

    async def __aenter__(self) -> WebsocketServer:
        async with self._start_lock:
            if self._task_group is not None:
                raise RuntimeError("WebsocketServer already running")

            async with AsyncExitStack() as exit_stack:
                self._task_group = await exit_stack.enter_async_context(create_task_group())
                self._exit_stack = exit_stack.pop_all()
                await self._task_group.start(partial(self.start, from_context_manager=True))

        return self

    async def __aexit__(self, exc_type, exc_value, exc_tb):
        await self.stop()
        return await self._exit_stack.__aexit__(exc_type, exc_value, exc_tb)

    def _handle_exception(self, exception: Exception) -> None:
        exception_handled = False
        if self.exception_handler is not None:
            exception_handled = self.exception_handler(exception, self.log)
        if not exception_handled:
            raise exception

    async def start(
        self,
        *,
        task_status: TaskStatus[None] = TASK_STATUS_IGNORED,
        from_context_manager: bool = False,
    ):
        """Start the WebSocket server.

        Arguments:
            task_status: The status to set when the task has started.
        """
        if from_context_manager:
            task_status.started()
            self.started.set()
            assert self._task_group is not None
            # wait until stopped
            self._task_group.start_soon(self._stopped.wait)
            return

        async with self._start_lock:
            if self._task_group is not None:
                raise RuntimeError("WebsocketServer already running")

            while True:
                try:
                    async with create_task_group() as self._task_group:
                        if not self.started.is_set():
                            task_status.started()
                            self.started.set()
                        # wait until stopped
                        self._task_group.start_soon(self._stopped.wait)
                    return
                except Exception as exception:
                    self._handle_exception(exception)

    async def stop(self) -> None:
        """Stop the WebSocket server."""
        if self._task_group is None:
            raise RuntimeError("WebsocketServer not running")

        self._stopped.set()
        self._task_group.cancel_scope.cancel()
        self._task_group = None

started: Event property

An async event that is set when the WebSocket server has started.

__init__(rooms_ready=True, auto_clean_rooms=True, exception_handler=None, log=None)

Initialize the object.

The WebsocketServer instance should preferably be used as an async context manager:

async with websocket_server:
    ...
However, a lower-level API can also be used:
task = asyncio.create_task(websocket_server.start())
await websocket_server.started.wait()
...
await websocket_server.stop()

Parameters:

Name Type Description Default
rooms_ready bool

Whether rooms are ready to be synchronized when opened.

True
auto_clean_rooms bool

Whether rooms should be deleted when no client is there anymore.

True
exception_handler Callable[[Exception, Logger], bool] | None

An optional callback to call when an exception is raised, that returns True if the exception was handled.

None
log Logger | None

An optional logger.

None
Source code in pycrdt_websocket/websocket_server.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def __init__(
    self,
    rooms_ready: bool = True,
    auto_clean_rooms: bool = True,
    exception_handler: Callable[[Exception, Logger], bool] | None = None,
    log: Logger | None = None,
) -> None:
    """Initialize the object.

    The WebsocketServer instance should preferably be used as an async context manager:
    ```py
    async with websocket_server:
        ...
    ```
    However, a lower-level API can also be used:
    ```py
    task = asyncio.create_task(websocket_server.start())
    await websocket_server.started.wait()
    ...
    await websocket_server.stop()
    ```

    Arguments:
        rooms_ready: Whether rooms are ready to be synchronized when opened.
        auto_clean_rooms: Whether rooms should be deleted when no client is there anymore.
        exception_handler: An optional callback to call when an exception is raised, that
            returns True if the exception was handled.
        log: An optional logger.
    """
    self.rooms_ready = rooms_ready
    self.auto_clean_rooms = auto_clean_rooms
    self.exception_handler = exception_handler
    self.log = log or getLogger(__name__)
    self.rooms = {}
    self._stopped = Event()

delete_room(*, name=None, room=None) async

Delete a room.

Parameters:

Name Type Description Default
name str | None

The name of the room to delete (if room is not passed).

None
room YRoom | None

The room to delete (if name is not passed).

None
Source code in pycrdt_websocket/websocket_server.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
async def delete_room(self, *, name: str | None = None, room: YRoom | None = None) -> None:
    """Delete a room.

    Arguments:
        name: The name of the room to delete (if `room` is not passed).
        room: The room to delete (if `name` is not passed).
    """
    if name is not None and room is not None:
        raise RuntimeError("Cannot pass name and room")
    if name is None:
        assert room is not None
        name = self.get_room_name(room)
    room = self.rooms.pop(name)
    await room.stop()

get_room(name) async

Get or create a room with the given name, and start it.

Parameters:

Name Type Description Default
name str

The room name.

required

Returns:

Type Description
YRoom

The room with the given name, or a new one if no room with that name was found.

Source code in pycrdt_websocket/websocket_server.py
74
75
76
77
78
79
80
81
82
83
84
85
86
87
async def get_room(self, name: str) -> YRoom:
    """Get or create a room with the given name, and start it.

    Arguments:
        name: The room name.

    Returns:
        The room with the given name, or a new one if no room with that name was found.
    """
    if name not in self.rooms.keys():
        self.rooms[name] = YRoom(ready=self.rooms_ready, log=self.log)
    room = self.rooms[name]
    await self.start_room(room)
    return room

get_room_name(room)

Get the name of a room.

Parameters:

Name Type Description Default
room YRoom

The room to get the name from.

required

Returns:

Type Description
str

The room name.

Source code in pycrdt_websocket/websocket_server.py
104
105
106
107
108
109
110
111
112
113
def get_room_name(self, room: YRoom) -> str:
    """Get the name of a room.

    Arguments:
        room: The room to get the name from.

    Returns:
        The room name.
    """
    return list(self.rooms.keys())[list(self.rooms.values()).index(room)]

rename_room(to_name, *, from_name=None, from_room=None)

Rename a room.

Parameters:

Name Type Description Default
to_name str

The new name of the room.

required
from_name str | None

The previous name of the room (if from_room is not passed).

None
from_room YRoom | None

The room to be renamed (if from_name is not passed).

None
Source code in pycrdt_websocket/websocket_server.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def rename_room(
    self, to_name: str, *, from_name: str | None = None, from_room: YRoom | None = None
) -> None:
    """Rename a room.

    Arguments:
        to_name: The new name of the room.
        from_name: The previous name of the room (if `from_room` is not passed).
        from_room: The room to be renamed (if `from_name` is not passed).
    """
    if from_name is not None and from_room is not None:
        raise RuntimeError("Cannot pass from_name and from_room")
    if from_name is None:
        assert from_room is not None
        from_name = self.get_room_name(from_room)
    self.rooms[to_name] = self.rooms.pop(from_name)

serve(websocket) async

Serve a client through a WebSocket.

Parameters:

Name Type Description Default
websocket Websocket

The WebSocket through which to serve the client.

required
Source code in pycrdt_websocket/websocket_server.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
async def serve(self, websocket: Websocket) -> None:
    """Serve a client through a WebSocket.

    Arguments:
        websocket: The WebSocket through which to serve the client.
    """
    if self._task_group is None:
        raise RuntimeError(
            "The WebsocketServer is not running: use `async with websocket_server:` or "
            "`await websocket_server.start()`"
        )

    try:
        async with create_task_group():
            room = await self.get_room(websocket.path)
            await self.start_room(room)
            await room.serve(websocket)
            if self.auto_clean_rooms and not room.clients:
                await self.delete_room(room=room)
    except Exception as exception:
        self._handle_exception(exception)

start(*, task_status=TASK_STATUS_IGNORED, from_context_manager=False) async

Start the WebSocket server.

Parameters:

Name Type Description Default
task_status TaskStatus[None]

The status to set when the task has started.

TASK_STATUS_IGNORED
Source code in pycrdt_websocket/websocket_server.py
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
async def start(
    self,
    *,
    task_status: TaskStatus[None] = TASK_STATUS_IGNORED,
    from_context_manager: bool = False,
):
    """Start the WebSocket server.

    Arguments:
        task_status: The status to set when the task has started.
    """
    if from_context_manager:
        task_status.started()
        self.started.set()
        assert self._task_group is not None
        # wait until stopped
        self._task_group.start_soon(self._stopped.wait)
        return

    async with self._start_lock:
        if self._task_group is not None:
            raise RuntimeError("WebsocketServer already running")

        while True:
            try:
                async with create_task_group() as self._task_group:
                    if not self.started.is_set():
                        task_status.started()
                        self.started.set()
                    # wait until stopped
                    self._task_group.start_soon(self._stopped.wait)
                return
            except Exception as exception:
                self._handle_exception(exception)

start_room(room) async

Start a room, if not already started.

Parameters:

Name Type Description Default
room YRoom

The room to start.

required
Source code in pycrdt_websocket/websocket_server.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
async def start_room(self, room: YRoom) -> None:
    """Start a room, if not already started.

    Arguments:
        room: The room to start.
    """
    if self._task_group is None:
        raise RuntimeError(
            "The WebsocketServer is not running: use `async with websocket_server:` or "
            "`await websocket_server.start()`"
        )

    if not room.started.is_set():
        await self._task_group.start(room.start)

stop() async

Stop the WebSocket server.

Source code in pycrdt_websocket/websocket_server.py
227
228
229
230
231
232
233
234
async def stop(self) -> None:
    """Stop the WebSocket server."""
    if self._task_group is None:
        raise RuntimeError("WebsocketServer not running")

    self._stopped.set()
    self._task_group.cancel_scope.cancel()
    self._task_group = None