Проблема реактивного Hibernate Panache с Kafka в Quarkus

При использовании Kafka с Hibarnate Panache вы сможете записать в базу данных только первые 5 сообщений из кафки. Разбираемся почему это происходит.

· 4 минуты на чтение

Итак, новая статья в продолжение рубрики "костыли Quarkus-а". В прошлый раз мы боролись с миграциями базы данных с реактивным драйвером БД. В этот раз поговорим об особенностях работы кафки с Hibernate Panache. На самом деле особенность простая, вместе они не работают 😅

Но давайте по порядку. Типичная ситуация: вы считываете сообщение с топика кафки, выполняете какую-то бизнес логику и хотите сохранить результат в БД. Довольно простая и обыденная ситуация, но только не в реактивном Quarkus.

При попытке сохранить полученные данные из кафки в БД через Hibernate Panache возможны два варианта. Они зависят от версии Quarkus, который используется на проекте.

Спонсор поста

Quarkus 2.14.0+

Сообщения из кафки будут успешно читаться, но при попытке сохранить что-то в БД выбрасывается следующее исключение:

2022-11-09 16:55:53,541 WARN  [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-1) SRMSG18204: A message sent to channel `test` has been nacked, ignored failure is: RequestScoped context was not active when trying to obtain a bean instance for a client proxy of PRODUCER_METHOD bean [class=io.quarkus.hibernate.reactive.runtime.ReactiveSessionProducer, id=37346551fd6c9e87c80f25842d68c2d902107673]
	- you can activate the request context for a specific method using the @ActivateRequestContext interceptor binding.

Важно, сообщения продолжают читаться из топика, проблемы только с сохранением в БД.

В этом случае проблему довольно легко решить. Но не ручаюсь, что это самое хорошее и оптимальное решение. Просто добавляем аннотацию @ActivateRequestContext над @Incoming:

@Incoming("test")
@ActivateRequestContext
public Uni<Void> handle(KafkaMessage message) {
    System.out.println("Получено сообщение " + message);
    final EntityForDb entityForDb = new EntityForDb();
    entityForDb.setCount(message.getCount());
    return panacheRepository.persistAndFlush(entityForDb).replaceWithVoid();
}

Quakrus 2.13.4 и старее

В этой версии будут сохранены только первые 5 полученных сообщения, после чего вы будете получать следующие исключение:

2022-08-07 12:29:14,407 WARN  [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-1) SRMSG18204: A message sent to channel `test` has been nacked, ignored failure is: org.hibernate.HibernateException: java.util.concurrent.CompletionException: io.vertx.core.impl.NoStackTraceThrowable: Timeout.

После чего вы будете считывать новое сообщение каждую минуту, но в БД сохранить вам ничего не удастся.

😿
Подробнее о проблеме можно почитать тут: issue-26379.
😿
Проект для демонстрации: quarkus-kafka-panache

Вы можете скланировать себе проект для демонстрации и воспроизвести проблему у себя. Проект состоит из двух логически независимых модуля-приложения:

  • quarkus-kafka-panache-generator – создает и отправляет сообщения в кафку
  • quarkus-kafka-panache-problem – принимает сообщения и пытается сохранить их в БД.

Как я уже сказал, склонируйте проект для детального изучения, в статье я покажу только класс принимающий сообщение из кафки

package dev.struchkov.example.quarkus.kafka;

import io.smallrye.mutiny.Uni;
import lombok.RequiredArgsConstructor;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
@RequiredArgsConstructor
public class KafkaHandler {

    private final EntityRepositoryImpl panacheRepository;

    @Incoming("test")
    public Uni<Void> handle(KafkaMessage message) {
        System.out.println("Получено сообщение " + message);
        final EntityForDb entityForDb = new EntityForDb();
        entityForDb.setCount(message.getCount());
        return panacheRepository.save(entityForDb).replaceWithVoid();
    }

}
package dev.struchkov.example.quarkus.kafka;

import io.quarkus.hibernate.reactive.panache.PanacheRepository;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class PanacheRepositoryImpl implements PanacheRepository<EntityForDb> {

}

Как видите ничего особенного, получаем сообщение из кафки, преобразуем его в сущность и пытаемся сохранить в БД, но в итоге получаем следующую картину

Почему это происходит?

По умолчанию в Quarkus используется пул из 5 соединений для доступа к базе данных при работе с реактивным драйвером. Судя по всему при использовании кафки используемое соединение не возвращается в пул и оно становится не доступно.

Поэтому если увеличить пул соединений до 20, то вы сохраните первые 20 сообщений, и уже потом получите ошибку. Нашу проблему это не решает, но увеличить размер пула однозначно стоит.

Для этого необходимо изменить значение параметра max-size в application.yml. Я обычно устанавливаю 20.

quarkus:
  datasource:
    db-kind: postgresql
    username: ${DB_USERNAME}
    password: ${DB_PASSWORD}
    jdbc: false
    reactive:
      url: ${DATABASE_URL}
      max-size: 20

Как с этим жить?

Никак. Необходимо отказаться от использования Hibernate Panache в сценариях с использованием кафки. Для взаимодействия с базой используйте Mutiny.SessionFactory.

😺
Рабочая версия в ветке: solution

Давайте переделаем наш пример и заставим его работать:

package dev.struchkov.example.quarkus.kafka;

import io.smallrye.mutiny.Uni;
import lombok.RequiredArgsConstructor;
import org.hibernate.reactive.mutiny.Mutiny;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
@RequiredArgsConstructor
public class EntityRepositoryImpl {

    private final Mutiny.SessionFactory factory;

    public Uni<EntityForDb> save(EntityForDb entityForDb) {
        return factory.withTransaction(
                session -> session.merge(entityForDb)
        );
    }

}
⚠️
Учтите, что Mutiny.SessionFactory не будет работать, если вызов не будет исходить от кафки.

Посмотрим на вывод консоли:

Все сообщения были получены и сохранены в БД, никаких ошибок и задержек.

Используя factory вы можете не только сохранять сущности, но и выполнять запросы JPQL/SQL, а также создавать запросы с помощью Criteria API. Вот пример использования JPQL:

public Uni<Person> findByTelegramId(@NonNull Long telegramId) {
    return factory.withTransaction(
            session -> session.<Person>createQuery("SELECT p FROM Person p WHERE p.telegramId =" + telegramId).getSingleResult()
                    .onFailure().recoverWithNull()
    );
}

Но в случае с JPQL придется забыть про удобный способ подстановки параметров запроса с помощью аргументов метода, здесь это не сработает. Поэтому нужно заранее собрать запрос целиком.

Резюмирую

В старых версиях Quarkus есть проблема при работе Kafka и Hibernate Panache. Однако в новых версиях эту проблему исправили. По возможности обновитесь на новую версию.

Если обновление не возможно, то учитывайте, что соединения с базой данных не закрываются и ваш пул соединений быстро пустеет. Обходным решением является использование Hibarnate с помощью таких объектов, как Mutiny.SessionFactory.

Struchkov Mark
Struchkov Mark
Задавайте вопросы, если что-то осталось не понятным👇