Разработка простого чата на Quarkus с использованием WebSocket

На работе я активно использую Websocket, поэтому решил рассказать, как использовать WebSocket в Quarkus. Разберем реализацию простого онлайн-чата.

· 11 минуты на чтение

WebSocket — это технология, позволяющая обеспечить двунаправленное общение между клиентом и сервером через постоянное TCP-соединение. Это противопоставляется HTTP-соединению, где каждый обмен данными сопровождается новым открытием соединения.

Суть WebSocket заключается в том, что он позволяет обмениваться данными в обе стороны в режиме реального времени без заметных задержек. Это делает технологию идеально подходящей для приложений, требующих непрерывных обновлений в реальном времени, таких как онлайн-игры или сервисы мгновенного обмена сообщениями.

Спонсор поста

WebSocket соединение устанавливается следующим образом:

  1. Клиент отправляет HTTP-запрос на сервер с заголовком "Upgrade: websocket", чтобы начать соединение с сервером через вебсокет.
  2. Если сервер поддерживает протокол вебсокетов, он отправляет ответ с кодом 101 (Switching Protocols), что означает переключение с HTTP на протокол вебсокетов.
  3. Теперь клиент и сервер могут отправлять и принимать сообщения независимо друг от друга.

Чтобы начать работать с WebSocket в Quarkus необходимо добавить зависимость:

<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-websockets</artifactId>
</dependency>
Актуальная версия в Maven Central
Демо проект

Для примера создадим простой онлайн-чат, который позволит нескольким пользователям общаться между собой.

Чат будем создавать на базе фреймворка Quarkus.

😸 Демо репозиторий: Struchkov Git | GitHub

Используемые версии

Java: 17
Quarkus: 3.1.0.Final

Сначала мы создадим простой обработчик событий, что-то вроде контроллера в мире WebSocket. Обработчиков может быть много, каждый со своими специфическими функциями. Наш обработчик будет просто показывать полученные сообщения в консоли.

Пока наш обработчик будет просто выводить получаемые сообщения в консоль.

package dev.struchkov.example;

import jakarta.enterprise.context.ApplicationScoped; import jakarta.websocket.OnClose; import jakarta.websocket.OnError; import jakarta.websocket.OnMessage; import jakarta.websocket.OnOpen; import jakarta.websocket.Session; import jakarta.websocket.server.PathParam; import jakarta.websocket.server.ServerEndpoint;
@ApplicationScoped @ServerEndpoint("/chat/{chatId}") public class WebSocket { @OnOpen public void onOpen(Session session, @PathParam("chatId") String chatId) { System.out.println("onOpen> " + chatId); } @OnClose public void onClose(Session session, @PathParam("chatId") String chatId) { System.out.println("onClose> " + chatId); } @OnError public void onError(Session session, @PathParam("chatId") String chatId, Throwable throwable) { System.out.println("onError> " + chatId + ": " + throwable); } @OnMessage public void onMessage(Session session, @PathParam("chatId") String chatId, String message) { System.out.println("onMessage> " + chatId + ": " + message); } }

Аннотация @ServerEndpoint("/chat/{chatId}") помечает класс как точку подключения WebSocket. В скобках указан URL-адрес, по которому эта точка подключения будет доступна. {chatId} в URL-адресе - это переменная пути, которую мы можем использовать в методах этого класса.

@PathParam аналогичен аннотациям в REST контроллерах и используется для привязки значений из URL к переменным в методе. В нашем случае, это переменная chatId.

UUID и @PathParam

При желании использовать UUID в качестве идентификатора чата столкнёмся с проблемой: @PathParam не поддерживает преобразование некоторых типов данных, включая UUID.

Простейший способ обойти это - получать идентификатор в виде строки, а затем преобразовывать его в UUID с помощью функции UUID.fromString().

Рассмотрим обработчик, который включает в себя 4 метода:

  • @OnOpen: аннотация, помечающая метод, который будет вызываться при установлении нового WebSocket соединения.
  • @OnClose: аннотация, помечающая метод, который будет вызываться при закрытии WebSocket соединения.
  • @OnError: аннотация, помечающая метод, который будет вызываться при возникновении ошибки в WebSocket соединении.
  • @OnMessage: аннотация, помечающая метод, который будет вызываться при получении сообщения от клиента через WebSocket соединение.

Чтобы проверить работоспособность обработчика, запустим приложение и воспользуемся Postman-ом. Указываем наш url, и нажимаем кнопку Connect.

Мы видим сообщение о том, что соединение установлено, что означает, что в консоли нашего приложения должно появиться соответствующее сообщение.

Теперь, когда мы уверены, что соединение установлено, отправим какое-нибудь сообщение, например Hello.

В окне ответа ("Response") мы видим, что наше сообщение было успешно отправлено. Это подтверждается и в консоли нашего приложения.

Затем мы нажимаем на кнопку "Disconnect", и в ответ получаем событие о закрытии соединения.

На первый взгляд, все кажется довольно простым и понятным, и казалось бы, статью можно было бы завершить на этом моменте. Однако есть несколько нюансов, которые следует учесть.

Отправляем объект

Предположим, вы хотите отправить объект. Редко кто отправляет по websocket неструктурированный текст в виде строки. Гораздо удобнее использовать JSON.

Создадим DTO, который будет отвечать за передачу новых сообщений.

@Getter
@Setter
@ToString
public class ChatInputMessage {

    private UUID fromUserId;
    private String text;

}

Если мы попытаемся прямо заменить String на ChatMessage в методе onMessage(), мы столкнемся с ошибкой при запуске нашего приложения.

Для решения этой проблемы, нам нужно создать декодер и явно указать его в @ServerEndpoint.

package dev.struchkov.example;

import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import jakarta.websocket.DecodeException; import jakarta.websocket.Decoder; import lombok.SneakyThrows;
public class ChatMessageDecoder implements Decoder.Text<ChatMessage> { private final ObjectMapper jackson = ChatMessageDecoder.getJackson(); @Override @SneakyThrows public ChatMessage decode(String s) throws DecodeException { return jackson.readValue(s, ChatMessage.class); } @Override public boolean willDecode(String s) { return s != null; } public static ObjectMapper getJackson() { ObjectMapper om = new ObjectMapper(); om.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, true); om.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); om.registerModule(new JavaTimeModule()); return om; } }
@ServerEndpoint(
        value = "/chat/{chatId}",
        decoders = ChatMessageDecoder.class
)

Вы можете выбрать между парсингом строки самостоятельно или использованием готового решения, например ObjectMapper из библиотеки Jackson.

Теперь мы способны отправлять JSON и принимать готовый объект на вход.

Для удобства в Postman можно переключить тип сообщения с Text на JSON
Рандомный блок

Ответное сообщение

На данный момент наша система только принимает сообщения от клиентов. Давайте теперь рассмотрим, как отправлять полученные сообщения всем участникам чата - в конце концов, именно в этом и заключается суть чата.

Прежде чем мы сможем это сделать, нам необходимо определить, кому мы будем отправлять сообщения. Когда устанавливается соединение через WebSocket, создается объект класса Session. Этот объект управляет установленным соединением и позволяет отправлять ответы.

Мы должны "запоминать", какие сессии принадлежат каким чатам, когда устанавливается соединение в методе onOpen(). Также нам нужно "забывать" эту связь, когда соединение закрывается.

Для хранения активных сессий мы используем структуру данных ConcurrentHashMap.

@ApplicationScoped
@ServerEndpoint(...)
public class WebSocket {

    private final Map<String, List<Session>> sessions = new ConcurrentHashMap<>();

    @OnOpen
    public void onOpen(Session session, @PathParam("chatId") String chatId) {
        System.out.println("onOpen> " + chatId);
        sessions.computeIfAbsent(chatId, key -> new ArrayList<>()).add(session);
    }

    @OnClose
    public void onClose(Session session, @PathParam("chatId") String chatId) {
        System.out.println("onClose> " + chatId);
        closeSession(session, chatId);
    }

    private void closeSession(Session session, String chatId) {
        final List<Session> chatSessions = sessions.get(chatId);
        final Iterator<Session> sessionIterator = chatSessions.iterator();
        while (sessionIterator.hasNext()) {
            final Session chatSession = sessionIterator.next();
            if (session.getId().equals(chatSession.getId())) {
                sessionIterator.remove();
                break;
            }
        }
    }
    
    // onError, onMessage

}

Теперь нам осталось только принять сообщение и отправить его во все связанные сессии. Давайте сделаем это.

@OnMessage
public void onMessage(Session session, @PathParam("chatId") String chatId, ChatInputMessage message) {
    System.out.println("onMessage> " + chatId + ": " + message);
    sendMessage(session, chatId, message);
}

private void sendMessage(Session session, String chatId, ChatInputMessage message) {
    final List<Session> chatSessions = sessions.get(chatId);
    for (Session chatSession : chatSessions) {
        if (session.getId().equals(chatSession.getId())) {
            continue;
        }
        chatSession.getAsyncRemote().sendObject(message);
    }
}

Казалось бы, все готово, но есть одна тонкость. Метод sendObject() не может отправлять произвольные объекты. Как и в случае получения JSON, нам необходимо создать преобразователь - в этом случае это будет ChatMessageEncoder.

package dev.struchkov.example;

import com.fasterxml.jackson.databind.ObjectMapper; import jakarta.websocket.EncodeException; import jakarta.websocket.Encoder; import lombok.SneakyThrows;
public class ChatMessageEncoder implements Encoder.Text<ChatInputMessage> { private final ObjectMapper jackson = ChatMessageDecoder.getJackson(); @Override @SneakyThrows public String encode(ChatInputMessage chatInputMessage) throws EncodeException { return jackson.writeValueAsString(chatInputMessage); } }

Не забудьте указать ChatMessageEncoder в поле encoders аннотации @ServerEndpoint. Теперь все должно работать корректно.

Для проверки устанавливаем два соединения с одним и тем же чатом. Сообщения, отправленные из первого соединения, должны появляться во втором и наоборот.

0:00
/

Поздравляю, у нас получился простой чат!

Мульти типы

Часто бывает полезно отправлять данные разных типов по одному и тому же каналу. В нашем случае было бы удобно отправлять информацию о том, какое сообщение и кем было прочитано. Чтобы избежать создания множества WebSocket-соединений, мы можем отправлять все данные в одном.

Для этого мы создаем контейнер сообщений.

public class EventContainer {

    private EventType eventType;
    private Object event;
    
}
public enum EventType {

    MESSAGE_NEW, MESSAGE_VIEWED

}

EventType — это тип сообщения, а event — это сам объект.

Изменим преобразователи. У энкодера меняется только класс в дженерике, а декодер будет выглядеть примерно так:

@Override
@SneakyThrows
public EventContainer decode(String s) throws DecodeException {
    final String eventType = jackson.readTree(s).get("eventType").asText();
    final JsonNode event = jackson.readTree(s).get("event");
    return switch (EventType.valueOf(eventType)) {
        case MESSAGE_NEW -> EventContainer.messageInput(jackson.treeToValue(event, ChatInputMessage.class));
        case MESSAGE_VIEWED -> EventContainer.viewInput(jackson.treeToValue(event, ChatViewInput.class));
    };
}

Теперь в методе onMessage() заменяем ChatInputMessage на EventContainer. И пишем switch, который в зависимости от eventType будет распределять сообщения в разные методы.

@OnMessage
public void onMessage(Session session, @PathParam("chatId") String chatId, EventContainer event) {
    System.out.println("onMessage> " + chatId + ": " + event);
    switch (event.getEventType()) {
        case MESSAGE_NEW -> sendMessage(session, chatId, (ChatInputMessage) event.getEvent());
        case MESSAGE_VIEWED -> viewMessage(session, chatId, (ChatViewInput) event.getEvent());
    }
}

Авторизация

Текущая реализация предполагает передачу идентификатора отправителя в самом сообщении, что означает, что любой может "притвориться" любым пользователем нашей системы. Нехорошо.

К тому же любой может подключиться к любому чату и читать приходящие сообщения. Это тоже недопустимо. Добавим авторизацию, основанную на передаваемом cookie.

⚠️
В этом примере мы не будем реализовывать способ регистрации пользователей, генерацию токена/cookie или его проверку. Нас интересует только вопрос о том, как получить значение этого cookie.

Нам приходится создавать собственную реализацию ServerEndpointConfig.Configurator, чтобы получить доступ к cookie, поскольку в отличие от @PathParam, нет прямого способа получить cookies пярмо в обработчике.

public class CustomConfigurator extends ServerEndpointConfig.Configurator {

    @Override
    public void modifyHandshake(ServerEndpointConfig config, HandshakeRequest request, HandshakeResponse response) {
        final Map<String, List<String>> headers = request.getHeaders();
        final List<String> cookies = headers.get("cookie");

        String sessionId = parseCookies(cookies); // ваша реализация парсинга кук
        config.getUserProperties().put("sessionId", sessionId);
    }

    public String parseCookies(List<String> cookies) {
        if (cookies != null) {
            for (String cookie : cookies) {
                String[] singleCookie = cookie.split(";");
                for (String part : singleCookie) {
                    part = part.trim();
                    if (part.startsWith("sessionId")) {
                        return part.substring("sessionId".length() + 1).trim();
                    }
                }
            }
        }
        return null;
    }

}

Извлекаем все HTTP-заголовки, находим заголовок с куками, после чего ищем куку с именем sessionId и сохраняем ее в пользовательские свойства, доступ к которым можно получить из объекта класса Session.

Теперь вносим необходимые изменения в класс WebSocket:

@ApplicationScoped
@ServerEndpoint(
        ...
        configurator = CustomConfigurator.class
)
@RequiredArgsConstructor
public class WebSocket {

    private final Map<String, List<Session>> sessions = new ConcurrentHashMap<>();

    @OnOpen
    public void onOpen(Session session, @PathParam("chatId") String chatId) {
        System.out.println("onOpen> " + chatId);
        final String authCookieValue = (String) session.getUserProperties().get("sessionId");
        final UUID authUserId = getAuthUser(authCookieValue);
        session.getUserProperties().put("userId", authUserId);
        sessions.computeIfAbsent(chatId, key -> new ArrayList<>()).add(session);
    }

    private UUID getAuthUser(String authCookieValue) {
        // your auth logic here
        if (authCookieValue == null) throw new HttpException(401, "Не передан параметр авторизации.");
        if (authCookieValue.equals("user1")) return UUID.fromString("09e429de-a302-40b6-9d10-6b113ab9e89d");
        if (authCookieValue.equals("user2")) return UUID.fromString("f84dbae1-f9a9-4c37-8922-4eb207103676");
        throw new HttpException(403, "Пользователь не авторизован.");
    }

    @OnError
    public void onError(Session session, @PathParam("chatId") String chatId, Throwable throwable) {
        if (throwable instanceof HttpException httpException) {
            final int statusCode = httpException.getStatusCode();
            if (statusCode == 401) {
                session.getAsyncRemote().sendText("Вы не авторизованы.");
                closeSession(session, chatId);
                return;
            }
            if (statusCode == 403) {
                session.getAsyncRemote().sendText("Доступ запрещен.");
                closeSession(session, chatId);
                return;
            }
        }
        System.out.println("onError> " + chatId + ": " + throwable); 
   }
   
}

4: Указываем наш конфигуратор.

14-17: Извлекаем значение cookie и осуществляем проверку авторизации для получения идентификатора авторизованного пользователя (20-26). Подход может зависеть от используемого вами метода авторизации: это может быть вызов к сервису авторизации или проверка подписи. Если кука отсутствует или недействительна, выбрасываем исключение. Если все хорошо, то записываем идентификатор пользователя в объект класса Session.

30-42: В обработчике ошибок @OnError определяем вышеупомянутое исключение, информируем пользователя о проблеме, отправляя сообщение через канал, а затем закрываем канал.

Обновим DTO для отправляемых сообщений, так как теперь нам не требуется поле fromUserId - мы сохраняем идентификатор пользователя в процессе авторизации.

@Getter
@Setter
public class ChatInputMessage {

    private String text;

}

Создаём новый объект DTO для информирования других участников чата о новом сообщений, так как при получении сообщения важно знать, от кого оно пришло.

@Getter
@Setter
@AllArgsConstructor
public class ChatOutputMessage {

    private UUID fromUserId;
    private String text;

}

Не забываем обновить ChatMessageEncoder, чтобы он отправлял ChatOutputMessage.

Теперь исправим отправку сообщений в WebSocket. Достанем идентификатор пользователя из объекта сессии, чтобы указать его в ChatOutputMessage.

@ApplicationScoped
@ServerEndpoint(...)
@RequiredArgsConstructor
public class WebSocket {
    
    ...

    private void sendMessage(Session session, String chatId, ChatInputMessage message) {
        final List<Session> chatSessions = sessions.get(chatId);
        for (Session chatSession : chatSessions) {
            if (session.getId().equals(chatSession.getId())) {
                continue;
            }
            final UUID fromUserId = (UUID) session.getUserProperties().get("userId");
            final ChatOutputMessage outputMessage = new ChatOutputMessage(fromUserId, message.getText());
            chatSession.getAsyncRemote().sendObject(outputMessage);
        }
    }
    
    ...

}

После внесения окончательных изменений в WebSocket, запускаем приложение и проверяем результаты наших действий.

0:00
/

Проблема горизонтального масштабирования

В отличие от REST, у WebSocket есть одна уникальная особенность: WebSocket-соединение имеет состояние, которое представлено объектом класса Session. Это создаёт проблемы горизонтального масштабирования.

Представим, что теперь сервис чатов развернут в Kubernetes в 3 репликах. Пользователь чата открывает 3 вкладки браузера с нашим онлайн-чатом. Если балансировщик использует алгоритм round robin, он может распределить соединения между тремя репликами, каждое соединение (вкладка браузера) попадёт на свою реплику.

Проблема в том, что каждая реплика знает только о своих подключениях к чату и не знает о подключениях других реплик. Если кто-то отправит новое сообщение на первую реплику, пользователь увидит это сообщение только в одной вкладке браузера, которая установила соединение с репликой, на которую пришло новое сообщение.

Существуют разные подходы к решению проблемы масштабирования для сервисов, использующих WebSocket. Основные варианты включают:

Ничего не делать.
Для некоторых приложений не требуется строгая синхронизация между WebSocket-соединениями. Например, если ваше приложение только принимает сообщения или отправляет данные, и вам не важно, из какой реплики они отправляются, то возможно, вам не стоит заморачиваться над этой проблемой.

Используйте другой алгоритм балансировки.
Вы можете настроить балансировку таким образом, чтобы все соединения для одного и того же чата всегда направлялись на одну и ту же реплику. Это может привести к менее равномерной балансировке нагрузки, но зато решит проблему синхронизации.

Тонкий вебсокет сервис.
В этом случае, ваш WebSocket-сервис занимается только управлением WebSocket-соединениями и не выполняет никакой бизнес-логики.

Вся бизнес-логика обрабатывается отдельным сервисом, который не имеет понятия о WebSocket. Когда WebSocket-сервис получает сообщение, он просто пересылает его в бизнес-сервис через брокер сообщений, например, Kafka.

Бизнес-сервис затем обрабатывает сообщение и возвращает его обратно во все реплики WebSocket-сервиса для отправки всем участникам чата. Чтобы сразу все реплики получили сообщение, используйте Kafka Consumer Group.

Это решение требует больше усилий на реализацию, но может обеспечить более масштабируемую и гибкую архитектуру.

Заключение

В заключении, WebSocket представляет собой мощный инструмент для создания интерактивных сервисов, таких как чаты, игры и многие другие. Но использование WebSocket также ставит перед разработчиками ряд специфических проблем и вызовов, особенно когда речь идет о масштабировании.

В данной статье мы разобрали основы создания WebSocket-чата на Java, включая авторизацию, обработку сообщений и отправку ответных сообщений. Мы также рассмотрели вопросы, связанные с преобразованием JSON и использованием DTO для обработки входящих и исходящих сообщений.

Однако одной из главных темы статьи стала проблема масштабирования при использовании WebSocket. Мы рассмотрели разные подходы к ее решению, включая разные стратегии балансировки нагрузки и использование "тонкого" WebSocket-сервиса в сочетании с отдельным бизнес-сервисом.

Надеюсь, эта статья будет полезна для тех, кто только начинает работать с WebSocket на Java, и для опытных разработчиков, ищущих способы оптимизации и масштабирования своих WebSocket-приложений. В любом случае, WebSocket является важным инструментом в арсенале современного разработчика, и его стоит изучить и использовать в своих проектах.

Struchkov Mark
Struchkov Mark
Задавайте вопросы, если что-то осталось не понятным👇