Сервисы с Apache Kafka и тестирование
Когда сервисы интегрируются при помощи Kafka очень удобно использовать REST API, как универсальный и стандартный способ обмена сообщениями. При увеличении количества сервисов сложность коммуникаций увеличивается. Для контроля можно и нужно использовать интеграционное тестирование. Такие библиотеки как testcontainers или EmbeddedServer прекрасно помогают организовать такое тестирование. Существуют много примеров для micronaut, Spring Boot и т.д. Но в этих примерах опущены некоторые детали, которые не позволяют с первого раза запустить код. В статье приводятся примеры с подробным описанием и ссылками на код.
Для простоты можно принять такой REST API.
/runs — POST-метод. Инициализирует запрос в канал связи. Принимает данные и возвращает ключ запроса.
/runs/{key}/status – GET-метод. По ключу возвращает статус запроса. Может принимать следующие значения: UNKNOWN, RUNNING, DONE.
/runs /{key} – GET-метод. По ключу возвращает результат запроса.
Подобный API реализован у livy, хотя и для других задач.
Будут использоваться: micronaut, Spring Boot.
micronaut
Контроллер для API.
import io.micronaut.http.annotation.Body;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import io.micronaut.http.annotation.Post;
import io.reactivex.Maybe;
import io.reactivex.schedulers.Schedulers;
import javax.inject.Inject;
import java.util.UUID;
@Controller("/runs")
public class RunController {
@Inject
RunClient runClient;
@Inject
RunCache runCache;
@Post
public String runs(@Body String body) {
String key = UUID.randomUUID().toString();
runCache.statuses.put(key, RunStatus.RUNNING);
runCache.responses.put(key, "");
runClient.sendRun(key, new Run(key, RunType.REQUEST, "", body));
return key;
}
@Get("/{key}/status")
public Maybe<RunStatus> getRunStatus(String key) {
return Maybe.just(key)
.subscribeOn(Schedulers.io())
.map(it -> runCache.statuses.getOrDefault(it, RunStatus.UNKNOWN));
}
@Get("/{key}")
public Maybe<String> getRunResponse(String key) {
return Maybe.just(key)
.subscribeOn(Schedulers.io())
.map(it -> runCache.responses.getOrDefault(it, ""));
}
}
Отправка сообщений в kafka.
import io.micronaut.configuration.kafka.annotation.*;
import io.micronaut.messaging.annotation.Body;
@KafkaClient
public interface RunClient {
@Topic("runs")
void sendRun(@KafkaKey String key, @Body Run run);
}
Получение сообщений из kafka.
import io.micronaut.configuration.kafka.annotation.*;
import io.micronaut.messaging.annotation.Body;
import javax.inject.Inject;
@KafkaListener(offsetReset = OffsetReset.EARLIEST)
public class RunListener {
@Inject
RunCalculator runCalculator;
@Topic("runs")
public void receive(@KafkaKey String key, @Body Run run) {
runCalculator.run(key, run);
}
}
Обработка сообщений происходит в RunCalculator. Для тестов используется особая реализация, в которой происходит переброска сообщений.
import io.micronaut.context.annotation.Replaces;
import javax.inject.Inject;
import javax.inject.Singleton;
import java.util.UUID;
@Replaces(RunCalculatorImpl.class)
@Singleton
public class RunCalculatorWithWork implements RunCalculator {
@Inject
RunClient runClient;
@Inject
RunCache runCache;
@Override
public void run(String key, Run run) {
if (RunType.REQUEST.equals(run.getType())) {
String runKey = run.getKey();
String newKey = UUID.randomUUID().toString();
String runBody = run.getBody();
runClient.sendRun(newKey, new Run(newKey, RunType.RESPONSE, runKey, runBody + "_calculated"));
} else if (RunType.RESPONSE.equals(run.getType())) {
runCache.statuses.replace(run.getResponseKey(), RunStatus.DONE);
runCache.responses.replace(run.getResponseKey(), run.getBody());
}
}
}
Тест.
import io.micronaut.http.HttpRequest;
import io.micronaut.http.client.HttpClient;
import static org.junit.jupiter.api.Assertions.assertEquals;
public abstract class RunBase {
void run(HttpClient client) {
String key = client.toBlocking().retrieve(HttpRequest.POST("/runs", "body"));
RunStatus runStatus = RunStatus.UNKNOWN;
while (runStatus != RunStatus.DONE) {
runStatus = client.toBlocking().retrieve(HttpRequest.GET("/runs/" + key + "/status"), RunStatus.class);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
String response = client.toBlocking().retrieve(HttpRequest.GET("/runs/" + key), String.class);
assertEquals("body_calculated", response);
}
}
Для использования EmbeddedServer необходимо.
Подключить библиотеки:
testImplementation("org.apache.kafka:kafka-clients:2.6.0:test")
testImplementation("org.apache.kafka:kafka_2.12:2.6.0")
testImplementation("org.apache.kafka:kafka_2.12:2.6.0:test")
Тест может выглядеть так.
import io.micronaut.context.ApplicationContext;
import io.micronaut.http.client.HttpClient;
import io.micronaut.runtime.server.EmbeddedServer;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
public class RunKeTest extends RunBase {
@Test
void test() {
Map<String, Object> properties = new HashMap<>();
properties.put("kafka.bootstrap.servers", "localhost:9092");
properties.put("kafka.embedded.enabled", "true");
try (EmbeddedServer embeddedServer = ApplicationContext.run(EmbeddedServer.class, properties)) {
ApplicationContext applicationContext = embeddedServer.getApplicationContext();
HttpClient client = applicationContext.createBean(HttpClient.class, embeddedServer.getURI());
run(client);
}
}
}
Для использования testcontainers необходимо.
Подключить библиотеки:
implementation("org.testcontainers:kafka:1.14.3")
Тест может выглядеть так.
import io.micronaut.context.ApplicationContext;
import io.micronaut.http.client.HttpClient;
import io.micronaut.runtime.server.EmbeddedServer;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
import java.util.HashMap;
import java.util.Map;
public class RunTcTest extends RunBase {
@Test
public void test() {
try (KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.5.3"))) {
kafka.start();
Map<String, Object> properties = new HashMap<>();
properties.put("kafka.bootstrap.servers", kafka.getBootstrapServers());
try (EmbeddedServer embeddedServer = ApplicationContext.run(EmbeddedServer.class, properties)) {
ApplicationContext applicationContext = embeddedServer.getApplicationContext();
HttpClient client = applicationContext.createBean(HttpClient.class, embeddedServer.getURI());
run(client);
}
}
}
}
Spring Boot
Контроллер для API.
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.UUID;
@RestController
@RequestMapping("/runs")
public class RunController {
@Autowired
private RunClient runClient;
@Autowired
private RunCache runCache;
@PostMapping()
public String runs(@RequestBody String body) {
String key = UUID.randomUUID().toString();
runCache.statuses.put(key, RunStatus.RUNNING);
runCache.responses.put(key, "");
runClient.sendRun(key, new Run(key, RunType.REQUEST, "", body));
return key;
}
@GetMapping("/{key}/status")
public RunStatus getRunStatus(@PathVariable String key) {
return runCache.statuses.getOrDefault(key, RunStatus.UNKNOWN);
}
@GetMapping("/{key}")
public String getRunResponse(@PathVariable String key) {
return runCache.responses.getOrDefault(key, "");
}
}
Отправка сообщений в kafka.
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class RunClient {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
private ObjectMapper objectMapper;
public void sendRun(String key, Run run) {
String data = "";
try {
data = objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(run);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
kafkaTemplate.send("runs", key, data);
}
}
Получение сообщений из kafka.
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class RunListener {
@Autowired
private ObjectMapper objectMapper;
@Autowired
private RunCalculator runCalculator;
@KafkaListener(topics = "runs", groupId = "m-group")
public void receive(ConsumerRecord<?, ?> consumerRecord) {
String key = consumerRecord.key().toString();
Run run = null;
try {
run = objectMapper.readValue(consumerRecord.value().toString(), Run.class);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
runCalculator.run(key, run);
}
}
Обработка сообщений происходит в RunCalculator. Для тестов используется особая реализация, в которой происходит переброска сообщений.
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;
@Component
public class RunCalculatorWithWork implements RunCalculator {
@Autowired
RunClient runClient;
@Autowired
RunCache runCache;
@Override
public void run(String key, Run run) {
if (RunType.REQUEST.equals(run.getType())) {
String runKey = run.getKey();
String newKey = UUID.randomUUID().toString();
String runBody = run.getBody();
runClient.sendRun(newKey, new Run(newKey, RunType.RESPONSE, runKey, runBody + "_calculated"));
} else if (RunType.RESPONSE.equals(run.getType())) {
runCache.statuses.replace(run.getResponseKey(), RunStatus.DONE);
runCache.responses.replace(run.getResponseKey(), run.getBody());
}
}
}
Тест.
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.http.MediaType;
import org.springframework.test.web.servlet.MockMvc;
import org.springframework.test.web.servlet.MvcResult;
import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
public abstract class RunBase {
void run(MockMvc mockMvc, ObjectMapper objectMapper) throws Exception {
MvcResult keyResult = mockMvc.perform(MockMvcRequestBuilders.post("/runs")
.content("body")
.contentType(MediaType.APPLICATION_JSON)
.accept(MediaType.APPLICATION_JSON))
.andExpect(status().isOk())
.andReturn();
String key = keyResult.getResponse().getContentAsString();
RunStatus runStatus = RunStatus.UNKNOWN;
while (runStatus != RunStatus.DONE) {
MvcResult statusResult = mockMvc.perform(MockMvcRequestBuilders.get("/runs/" + key + "/status")
.contentType(MediaType.APPLICATION_JSON)
.accept(MediaType.APPLICATION_JSON))
.andExpect(status().isOk())
.andReturn();
runStatus = objectMapper.readValue(statusResult.getResponse().getContentAsString(), RunStatus.class);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
String response = mockMvc.perform(MockMvcRequestBuilders.get("/runs/" + key)
.contentType(MediaType.APPLICATION_JSON)
.accept(MediaType.APPLICATION_JSON))
.andExpect(status().isOk())
.andReturn().getResponse().getContentAsString();
assertEquals("body_calculated", response);
}
}
Для использования EmbeddedServer необходимо.
Подключить библиотеки:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.5.10.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>2.5.10.RELEASE</version>
<scope>test</scope>
</dependency>
Тест может выглядеть так.
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.web.servlet.MockMvc;
@AutoConfigureMockMvc
@SpringBootTest
@EmbeddedKafka(partitions = 1, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"})
@Import(RunKeTest.RunKeTestConfiguration.class)
public class RunKeTest extends RunBase {
@Autowired
private MockMvc mockMvc;
@Autowired
private ObjectMapper objectMapper;
@Test
void test() throws Exception {
run(mockMvc, objectMapper);
}
@TestConfiguration
static class RunKeTestConfiguration {
@Autowired
private RunCache runCache;
@Autowired
private RunClient runClient;
@Bean
public RunCalculator runCalculator() {
RunCalculatorWithWork runCalculatorWithWork = new RunCalculatorWithWork();
runCalculatorWithWork.runCache = runCache;
runCalculatorWithWork.runClient = runClient;
return runCalculatorWithWork;
}
}
}
Для использования testcontainers необходимо.
Подключить библиотеки:
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>1.14.3</version>
<scope>test</scope>
</dependency>
Тест может выглядеть так.
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.ClassRule;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.test.web.servlet.MockMvc;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
import java.util.HashMap;
import java.util.Map;
@AutoConfigureMockMvc
@SpringBootTest
@Import(RunTcTest.RunTcTestConfiguration.class)
public class RunTcTest extends RunBase {
@ClassRule
public static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.5.3"));
static {
kafka.start();
}
@Autowired
private MockMvc mockMvc;
@Autowired
private ObjectMapper objectMapper;
@Test
void test() throws Exception {
run(mockMvc, objectMapper);
}
@TestConfiguration
static class RunTcTestConfiguration {
@Autowired
private RunCache runCache;
@Autowired
private RunClient runClient;
@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "m-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public RunCalculator runCalculator() {
RunCalculatorWithWork runCalculatorWithWork = new RunCalculatorWithWork();
runCalculatorWithWork.runCache = runCache;
runCalculatorWithWork.runClient = runClient;
return runCalculatorWithWork;
}
}
}
Перед всеми тестами необходимо стартовать kafka. Это делается вот таким вот образом:
kafka.start();
Дополнительные свойства для kafka в тестах можно задать в ресурсном файле.
application.yml
spring:
kafka:
consumer:
auto-offset-reset: earliest