Итак, новая статья в продолжение рубрики "костыли Quarkus-а". В прошлый раз мы боролись с миграциями базы данных с реактивным драйвером БД. В этот раз поговорим об особенностях работы кафки с Hibernate Panache. На самом деле особенность простая, вместе они не работают 😅
Но давайте по порядку. Типичная ситуация: вы считываете сообщение с топика кафки, выполняете какую-то бизнес логику и хотите сохранить результат в БД. Довольно простая и обыденная ситуация, но только не в реактивном Quarkus.
При попытке сохранить полученные данные из кафки в БД через Hibernate Panache возможны два варианта. Они зависят от версии Quarkus, который используется на проекте.
Quarkus 2.14.0+
Сообщения из кафки будут успешно читаться, но при попытке сохранить что-то в БД выбрасывается следующее исключение:
2022-11-09 16:55:53,541 WARN [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-1) SRMSG18204: A message sent to channel `test` has been nacked, ignored failure is: RequestScoped context was not active when trying to obtain a bean instance for a client proxy of PRODUCER_METHOD bean [class=io.quarkus.hibernate.reactive.runtime.ReactiveSessionProducer, id=37346551fd6c9e87c80f25842d68c2d902107673]
- you can activate the request context for a specific method using the @ActivateRequestContext interceptor binding.
Важно, сообщения продолжают читаться из топика, проблемы только с сохранением в БД.
В этом случае проблему довольно легко решить. Но не ручаюсь, что это самое хорошее и оптимальное решение. Просто добавляем аннотацию @ActivateRequestContext
над @Incoming
:
@Incoming("test")
@ActivateRequestContext
public Uni<Void> handle(KafkaMessage message) {
System.out.println("Получено сообщение " + message);
final EntityForDb entityForDb = new EntityForDb();
entityForDb.setCount(message.getCount());
return panacheRepository.persistAndFlush(entityForDb).replaceWithVoid();
}
Quakrus 2.13.4 и старее
В этой версии будут сохранены только первые 5 полученных сообщения, после чего вы будете получать следующие исключение:
2022-08-07 12:29:14,407 WARN [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-1) SRMSG18204: A message sent to channel `test` has been nacked, ignored failure is: org.hibernate.HibernateException: java.util.concurrent.CompletionException: io.vertx.core.impl.NoStackTraceThrowable: Timeout.
После чего вы будете считывать новое сообщение каждую минуту, но в БД сохранить вам ничего не удастся.
Вы можете скланировать себе проект для демонстрации и воспроизвести проблему у себя. Проект состоит из двух логически независимых модуля-приложения:
quarkus-kafka-panache-generator
– создает и отправляет сообщения в кафкуquarkus-kafka-panache-problem
– принимает сообщения и пытается сохранить их в БД.
Как я уже сказал, склонируйте проект для детального изучения, в статье я покажу только класс принимающий сообщение из кафки
package dev.struchkov.example.quarkus.kafka;
import io.smallrye.mutiny.Uni;
import lombok.RequiredArgsConstructor;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import javax.enterprise.context.ApplicationScoped;
@ApplicationScoped
@RequiredArgsConstructor
public class KafkaHandler {
private final EntityRepositoryImpl panacheRepository;
@Incoming("test")
public Uni<Void> handle(KafkaMessage message) {
System.out.println("Получено сообщение " + message);
final EntityForDb entityForDb = new EntityForDb();
entityForDb.setCount(message.getCount());
return panacheRepository.save(entityForDb).replaceWithVoid();
}
}
package dev.struchkov.example.quarkus.kafka;
import io.quarkus.hibernate.reactive.panache.PanacheRepository;
import javax.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class PanacheRepositoryImpl implements PanacheRepository<EntityForDb> {
}
Как видите ничего особенного, получаем сообщение из кафки, преобразуем его в сущность и пытаемся сохранить в БД, но в итоге получаем следующую картину
Почему это происходит?
По умолчанию в Quarkus используется пул из 5 соединений для доступа к базе данных при работе с реактивным драйвером. Судя по всему при использовании кафки используемое соединение не возвращается в пул и оно становится не доступно.
Поэтому если увеличить пул соединений до 20, то вы сохраните первые 20 сообщений, и уже потом получите ошибку. Нашу проблему это не решает, но увеличить размер пула однозначно стоит.
Для этого необходимо изменить значение параметра max-size
в application.yml
. Я обычно устанавливаю 20.
quarkus:
datasource:
db-kind: postgresql
username: ${DB_USERNAME}
password: ${DB_PASSWORD}
jdbc: false
reactive:
url: ${DATABASE_URL}
max-size: 20
Как с этим жить?
Никак. Необходимо отказаться от использования Hibernate Panache в сценариях с использованием кафки. Для взаимодействия с базой используйте Mutiny.SessionFactory
.
Давайте переделаем наш пример и заставим его работать:
package dev.struchkov.example.quarkus.kafka;
import io.smallrye.mutiny.Uni;
import lombok.RequiredArgsConstructor;
import org.hibernate.reactive.mutiny.Mutiny;
import javax.enterprise.context.ApplicationScoped;
@ApplicationScoped
@RequiredArgsConstructor
public class EntityRepositoryImpl {
private final Mutiny.SessionFactory factory;
public Uni<EntityForDb> save(EntityForDb entityForDb) {
return factory.withTransaction(
session -> session.merge(entityForDb)
);
}
}
Посмотрим на вывод консоли:
Все сообщения были получены и сохранены в БД, никаких ошибок и задержек.
Используя factory
вы можете не только сохранять сущности, но и выполнять запросы JPQL/SQL, а также создавать запросы с помощью Criteria API. Вот пример использования JPQL:
public Uni<Person> findByTelegramId(@NonNull Long telegramId) {
return factory.withTransaction(
session -> session.<Person>createQuery("SELECT p FROM Person p WHERE p.telegramId =" + telegramId).getSingleResult()
.onFailure().recoverWithNull()
);
}
Но в случае с JPQL придется забыть про удобный способ подстановки параметров запроса с помощью аргументов метода, здесь это не сработает. Поэтому нужно заранее собрать запрос целиком.
Резюмирую
В старых версиях Quarkus есть проблема при работе Kafka и Hibernate Panache. Однако в новых версиях эту проблему исправили. По возможности обновитесь на новую версию.
Если обновление не возможно, то учитывайте, что соединения с базой данных не закрываются и ваш пул соединений быстро пустеет. Обходным решением является использование Hibarnate с помощью таких объектов, как Mutiny.SessionFactory
.