Проблема реактивного Hibernate Panache с Kafka в Quarkus

При использовании Kafka с Hibarnate Panache вы сможете записать в базу данных только первые 5 сообщений из кафки. Разбираемся почему это происходит.

· 3 минуты на чтение

Итак, новая статья в продолжение рубрики "костыли Quarkus-а". В прошлый раз мы боролись с миграциями базы данных с реактивным драйвером БД. В этот раз поговорим об особенностях работы кафки с Hibernate Panache. На самом деле особенность простая, вместе они не работают 😅

Но давайте по порядку. Типичная ситуация: вы считываете сообщение с топика кафки, выполняете какую-то бизнес логику и хотите сохранить результат в БД. Довольно простая и обыденная ситуация, но только не в реактивном Quarkus.

Если вы попробуете сохранить полученное сообщение из кафки в БД через Hibernate Panache, то удивитесь результату. Будут сохранены только первые 5 полученных сообщения, после чего вы будете получать следующие исключение

2022-08-07 12:29:14,407 WARN  [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-1) SRMSG18204: A message sent to channel `test` has been nacked, ignored failure is: org.hibernate.HibernateException: java.util.concurrent.CompletionException: io.vertx.core.impl.NoStackTraceThrowable: Timeout.

После чего вы будете считывать новое сообщение каждую минуту, но в БД сохранить вам ничего не удастся.

Спонсор поста

Это известная проблема (issue-26379), но с ней ничего не делают, что странно, потому что Hibernate Panache призван упростить разработку. И проблема настолько серьезная, что костыли решили задокументировать, но я все же надеюсь что со временем это исправят.

😿
Проект для демонстрации: quarkus-kafka-panache

Вы можете скланировать себе проект для демонстрации и воспроизвести проблему у себя. Проект состоит из двух логически независимых модуля-приложения:

  • quarkus-kafka-panache-generator – создает и отправляет сообщения в кафку
  • quarkus-kafka-panache-problem – принимает сообщения и пытается сохранить их в БД.

Как я уже сказал, склонируйте проект для детального изучения, в статье я покажу только класс принимающий сообщение из кафки

package dev.struchkov.example.quarkus.kafka;

import io.smallrye.mutiny.Uni;
import lombok.RequiredArgsConstructor;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
@RequiredArgsConstructor
public class KafkaHandler {

    private final EntityRepositoryImpl panacheRepository;

    @Incoming("test")
    public Uni<Void> handle(KafkaMessage message) {
        System.out.println("Получено сообщение " + message);
        final EntityForDb entityForDb = new EntityForDb();
        entityForDb.setCount(message.getCount());
        return panacheRepository.save(entityForDb).replaceWithVoid();
    }

}
package dev.struchkov.example.quarkus.kafka;

import io.quarkus.hibernate.reactive.panache.PanacheRepository;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class PanacheRepositoryImpl implements PanacheRepository<EntityForDb> {

}

Как видите ничего особенного, получаем сообщение из кафки, преобразуем его в сущность и пытаемся сохранить в БД, но в итоге получаем следующую картину

Почему это происходит?

По умолчанию в Quarkus используется пул из 5 соединений для доступа к базе данных при работе с реактивным драйвером. Судя по всему при использовании кафки используемое соединение не возвращается в пул и оно становится не доступно.

Поэтому если увеличить пул соединений до 20, то вы сохраните первые 20 сообщений, и уже потом получите ошибку. Нашу проблему это не решает, но увеличить размер пула однозначно стоит.

Для этого необходимо изменить значение параметра max-size в application.yml. Я обычно устанавливаю 20.

quarkus:
  datasource:
    db-kind: postgresql
    username: ${DB_USERNAME}
    password: ${DB_PASSWORD}
    jdbc: false
    reactive:
      url: ${DATABASE_URL}
      max-size: 20

Как с этим жить?

Никак. Необходимо отказаться от использования Hibernate Panache в сценариях с использованием кафки. Для взаимодействия с базой используйте Mutiny.SessionFactory.

😺
Рабочая версия в ветке: solution

Давайте переделаем наш пример и заставим его работать:

package dev.struchkov.example.quarkus.kafka;

import io.smallrye.mutiny.Uni;
import lombok.RequiredArgsConstructor;
import org.hibernate.reactive.mutiny.Mutiny;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
@RequiredArgsConstructor
public class EntityRepositoryImpl {

    private final Mutiny.SessionFactory factory;

    public Uni<EntityForDb> save(EntityForDb entityForDb) {
        return factory.withTransaction(
                session -> session.merge(entityForDb)
        );
    }

}
⚠️
Учтите, что Mutiny.SessionFactory не будет работать, если вызов не будет исходить от кафки.

Посмотрим на вывод консоли:

Все сообщения были получены и сохранены в БД, никаких ошибок и задержек.

Используя factory вы можете не только сохранять сущности, но и выполнять запросы JPQL/SQL, а также создавать запросы с помощью Criteria API. Вот пример использования JPQL:

public Uni<Person> findByTelegramId(@NonNull Long telegramId) {
    return factory.withTransaction(
            session -> session.<Person>createQuery("SELECT p FROM Person p WHERE p.telegramId =" + telegramId).getSingleResult()
                    .onFailure().recoverWithNull()
    );
}

Но в случае с JPQL придется забыть про удобный способ подстановки параметров запроса с помощью аргументов метода, здесь это не сработает. Поэтому нужно заранее собрать запрос целиком.

Резюмирую

Hibernate Panache был создан, чтобы упростить взаимодействие с базой данных, но в случае с кафкой в Quarkus что-то пошло не так. Соединения с базщзой данных не закрываются и ваш пул соединений быстро пустеет.

Обходным решением является использование Hibarnate с помощью таких объектов, как Mutiny.SessionFactory.

Struchkov Mark
Struchkov Mark
Задавайте вопросы, если что-то осталось не понятным👇