How to Set Up Kafka Integration Test – Grape Up

Do you consider unit testing as not enough of a solution for keeping the application’s reliability and stability? Are you afraid that somehow, somewhere, there is a potential bug hiding in the assumption that unit tests should cover all cases? And is mocking Kafka not enough for the project requirements? If even a single answer is ‘yes’, then welcome to a nice and simple guide on how to set up Integration Tests for Kafka using TestContainers and Embedded Kafka for Spring!

What is TestContainers?

TestContainers is an open-source Java library specialized in providing everything needed for integration testing of external resources. It means that we are able to mimic an actual database, web server, or even an event bus environment and treat it as a reliable place to test application functionality. All these fancy features are hooked into docker images, defined as containers. Do we need to test the database layer with a real MongoDB? No worries, there is a test container for that. And we cannot forget about UI tests – a Selenium Container will do anything we actually need.
In our case, we will focus on the Kafka Testcontainer.

What is Embedded Kafka?

As the name suggests, we are going to deal with an in-memory Kafka instance, ready to be used as a normal broker with full functionality. It allows us to work with producers and consumers, as usual, making our integration tests lightweight.

Before we start

The concept for our tests is simple – I would like to test the Kafka consumer and producer using two different approaches and check how we can use them in real cases.

Kafka messages are serialized using Avro schemas.

Embedded Kafka – Producer Test

The idea is simple – let’s create a simple project with a controller, which invokes a service method to push a Kafka Avro-serialized message.

Dependencies:

dependencies {
    implementation "org.apache.avro:avro:1.10.1"
    implementation("io.confluent:kafka-avro-serializer:6.1.0")
    implementation 'org.springframework.boot:spring-boot-starter-validation'
    implementation 'org.springframework.kafka:spring-kafka'
    implementation('org.springframework.cloud:spring-cloud-stream:3.1.1')
    implementation('org.springframework.cloud:spring-cloud-stream-binder-kafka:3.1.1')

    implementation('org.springframework.boot:spring-boot-starter-web:2.4.3')
    implementation 'org.projectlombok:lombok:1.18.16'

    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation('org.springframework.cloud:spring-cloud-stream-test-support:3.1.1')
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.kafka:spring-kafka-test'
}

It is also worth mentioning a great plugin for Avro. Here is the plugins section:

plugins {
    id 'org.springframework.boot' version '2.6.8'
    id 'io.spring.dependency-management' version '1.0.11.RELEASE'
    id 'java'
    id "com.github.davidmc24.gradle.plugin.avro" version "1.3.0"
}

The Avro plugin supports schema auto-generation. This is a must-have.

Url to plugin: https://github.com/davidmc24/gradle-avro-plugin
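
As a quick usage sketch (the options below are illustrative assumptions – the plugin configuration is not shown in this article): the .avsc schema files go under src/main/avro, the plugin generates the corresponding Java classes during the build, and generation can be tuned in the build script:

avro {
    stringType = "String"       // map Avro strings to java.lang.String
    fieldVisibility = "PRIVATE" // keep fields of the generated classes private
}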

Now let’s define the Avro schema:


  "namespace": "com.grapeup.myawesome.myawesomeproducer",
  "form": "record",
  "title": "RegisterRequest",
  "fields": [
    "name": "id", "type": "long",
    "name": "address", "type": "string", "avro.java.string": "String"


  ]

Our ProducerService will be focused only on sending messages to Kafka using a template, nothing exciting about that part. The main functionality can be done just using this line:

ListenableFuture<SendResult<String, RegisterRequest>> future = this.kafkaTemplate.send("register-request", kafkaMessage);
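
For context, a minimal ProducerService sketch could look like the following (the class shape, method name, and DTO mapping are assumptions based on the snippets in this article, not the original implementation):

import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;

@Service
@RequiredArgsConstructor
public class ProducerService {

    private final KafkaTemplate<String, RegisterRequest> kafkaTemplate;

    public void registerRequest(RegisterRequestDto dto) {
        // map the DTO to the Avro-generated RegisterRequest and push it to the topic
        RegisterRequest kafkaMessage = RegisterRequest.newBuilder()
                .setId(dto.getId())
                .setAddress(dto.getAddress())
                .build();
        ListenableFuture<SendResult<String, RegisterRequest>> future =
                this.kafkaTemplate.send("register-request", kafkaMessage);
    }
}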

We can’t forget about the test properties:

spring:
  main:
    allow-bean-definition-overriding: true
  kafka:
    consumer:
      group-id: group_id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: com.grapeup.myawesome.myawesomeconsumer.common.CustomKafkaAvroDeserializer
    producer:
      auto.register.schemas: true
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: com.grapeup.myawesome.myawesomeconsumer.common.CustomKafkaAvroSerializer
    properties:
      specific.avro.reader: true

As we see in the mentioned test properties, we declare a custom deserializer/serializer for Kafka messages. It is highly recommended to use Kafka with Avro – don’t let JSONs maintain object structure; let’s use a civilized mapper and object definition like Avro.

Serializer:

public class CustomKafkaAvroSerializer extends KafkaAvroSerializer {
    public CustomKafkaAvroSerializer() {
        super();
        super.schemaRegistry = new MockSchemaRegistryClient();
    }

    public CustomKafkaAvroSerializer(SchemaRegistryClient client) {
        super(new MockSchemaRegistryClient());
    }

    public CustomKafkaAvroSerializer(SchemaRegistryClient client, Map<String, ?> props) {
        super(new MockSchemaRegistryClient(), props);
    }
}


Deserializer:

public class CustomKafkaAvroDeserializer extends KafkaAvroDeserializer {
    public CustomKafkaAvroDeserializer() {
        super();
        super.schemaRegistry = new MockSchemaRegistryClient();
    }

    public CustomKafkaAvroDeserializer(SchemaRegistryClient client) {
        super(new MockSchemaRegistryClient());
    }

    public CustomKafkaAvroDeserializer(SchemaRegistryClient client, Map<String, ?> props) {
        super(new MockSchemaRegistryClient(), props);
    }
}


And we have everything we need to start writing our test.

@ExtendWith(SpringExtension.class)
@SpringBootTest
@AutoConfigureMockMvc
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@ActiveProfiles("test")
@EmbeddedKafka(partitions = 1, topics = "register-request")
class ProducerControllerTest {

All we need to do is add the @EmbeddedKafka annotation with the listed topics and partitions. The Application Context will boot a Kafka broker with the provided configuration just like that. Keep in mind that @TestInstance should be used with special consideration. Lifecycle.PER_CLASS will avoid creating the same objects/context for each test method. It is worth checking if tests become too time-consuming.

Consumer<String, RegisterRequest> consumerServiceTest;

@BeforeEach
void setUp() {
    DefaultKafkaConsumerFactory<String, RegisterRequest> consumer = new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());

    consumerServiceTest = consumer.createConsumer();
    consumerServiceTest.subscribe(Collections.singletonList(TOPIC_NAME));
}

Here we declare the test consumer, based on the Avro schema return type. All Kafka properties are already provided in the .yml file. That consumer will be used to check whether the producer actually pushed a message.

Here is the actual test method:

@Test
void whenValidInput_thenReturns200() throws Exception {
        RegisterRequestDto request = RegisterRequestDto.builder()
                .id(12)
                .address("tempAddress")
                .build();

        mockMvc.perform(
                post("/register-request")
                        .contentType("application/json")
                        .content(objectMapper.writeValueAsBytes(request)))
                .andExpect(status().isOk());

        ConsumerRecord<String, RegisterRequest> consumedRegisterRequest = KafkaTestUtils.getSingleRecord(consumerServiceTest, TOPIC_NAME);

        RegisterRequest valueReceived = consumedRegisterRequest.value();

        assertEquals(12, valueReceived.getId());
        assertEquals("tempAddress", valueReceived.getAddress());
}

First of all, we use MockMvc to perform an action on our endpoint. That endpoint uses ProducerService to push messages to Kafka. The KafkaConsumer is used to verify that the producer worked as expected. And that’s it – we have a fully working test with embedded Kafka.

Test Containers – Consumer Test

TestContainers are nothing else than independent docker images ready to be run as containers. The following test case will be enhanced by a MongoDB image. Why not keep our data in the database right after anything happened in the Kafka flow?

Dependencies are not much different than in the previous example. The following steps are needed for test containers:

testImplementation 'org.testcontainers:junit-jupiter'
testImplementation 'org.testcontainers:kafka'
testImplementation 'org.testcontainers:mongodb'

ext {
    set('testcontainersVersion', "1.17.1")
}

dependencyManagement {
    imports {
        mavenBom "org.testcontainers:testcontainers-bom:$testcontainersVersion"
    }
}

Let’s focus now on the Consumer part. The test case will be simple – one consumer service will be responsible for receiving the Kafka message and storing the parsed payload in the MongoDB collection. All that we need to know about KafkaListeners, for now, is this annotation:

@KafkaListener(topics = "register-request")

Through the functionality of the annotation processor, a KafkaListenerContainerFactory will be responsible for creating a listener on our method. From this moment our method will react to any incoming Kafka message with the mentioned topic.
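
A minimal listener sketch could look like this (the service, repository, and document names are assumptions – only the annotation above comes from the described setup; taxiRepository matches the repository used in the test at the end of the article):

import lombok.RequiredArgsConstructor;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
public class ConsumerService {

    private final TaxiRepository taxiRepository; // assumed Spring Data MongoDB repository

    @KafkaListener(topics = "register-request")
    public void consume(RegisterRequest registerRequest) {
        // parse the Avro payload and store it in the MongoDB collection
        taxiRepository.save(new TaxiDocument(registerRequest.getId(), registerRequest.getAddress()));
    }
}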

Avro serializer and deserializer configs are the same as in the previous test.

Regarding the TestContainer, we should start with the following annotations:

@SpringBootTest
@ActiveProfiles("test")
@Testcontainers
public class AbstractIntegrationTest {

During startup, all configured TestContainers modules will be activated. It means that we will get access to the full operating environment of the selected resource. As an example:

@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

@Container
public static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));

@Container
static MongoDBContainer mongoDBContainer = new MongoDBContainer("mongo:4.4.2").withExposedPorts(27017);

As a result of booting the test, we can expect two docker containers to start with the provided configuration.

What is really important for the mongo container – it gives us full access to the database using just a simple connection uri. With such a feature, we are able to take a look at the current state of our collections, even during debug mode with prepared breakpoints.
Take a look also at the Ryuk container – it works like an overwatch and checks whether our containers have started correctly.

And here is the last part of the configuration:

@DynamicPropertySource
static void dataSourceProperties(DynamicPropertyRegistry registry) {
   registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers);
   registry.add("spring.kafka.consumer.bootstrap-servers", kafkaContainer::getBootstrapServers);
   registry.add("spring.kafka.producer.bootstrap-servers", kafkaContainer::getBootstrapServers);
   registry.add("spring.data.mongodb.uri", mongoDBContainer::getReplicaSetUrl);
}

static {
   kafkaContainer.start();
   mongoDBContainer.start();

   mongoDBContainer.waitingFor(Wait.forListeningPort()
           .withStartupTimeout(Duration.ofSeconds(180L)));
}


@BeforeTestClass
public void beforeTest() {

   kafkaListenerEndpointRegistry.getListenerContainers().forEach(
           messageListenerContainer ->
               ContainerTestUtils
                       .waitForAssignment(messageListenerContainer, 1)
   );
}

@AfterAll
static void tearDown() {
   kafkaContainer.stop();
   mongoDBContainer.stop();
}

DynamicPropertySource gives us the option to set all needed environment variables during the test lifecycle – strongly needed for any configuration of TestContainers. Also, in beforeTestClass, the kafkaListenerEndpointRegistry waits for each listener to get its expected partitions during container startup.

And the last part of the Kafka test containers journey – the main body of the test:

@Test
public void containerStartsAndPublicPortIsAvailable() throws Exception {
   writeToTopic("register-request", RegisterRequest.newBuilder().setId(123).setAddress("dummyAddress").build());

   //Wait for KafkaListener
   TimeUnit.SECONDS.sleep(5);
   Assertions.assertEquals(1, taxiRepository.findAll().size());
}

private KafkaProducer<String, RegisterRequest> createProducer() {
   return new KafkaProducer<>(kafkaProperties.buildProducerProperties());
}

private void writeToTopic(String topicName, RegisterRequest... registerRequests) {

   try (KafkaProducer<String, RegisterRequest> producer = createProducer()) {
       Arrays.stream(registerRequests)
               .forEach(registerRequest -> {
                           ProducerRecord<String, RegisterRequest> record = new ProducerRecord<>(topicName, registerRequest);
                           producer.send(record);
                       }
               );
   }
}


The custom producer is responsible for writing our message to the Kafka broker. Also, it is recommended to give consumers some time to handle messages properly. As we see, the message was not just consumed by the listener, but also stored in the MongoDB collection.
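
As a side note, the fixed TimeUnit.SECONDS.sleep(5) could be replaced with polling, for example with Awaitility (an alternative approach, not part of the setup above, assuming the org.awaitility:awaitility test dependency is on the classpath):

// poll the repository until the listener has persisted the record, failing after 10 seconds
Awaitility.await()
        .atMost(Duration.ofSeconds(10))
        .untilAsserted(() -> Assertions.assertEquals(1, taxiRepository.findAll().size()));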

Conclusions

As we can see, current solutions for integration tests are quite easy to implement and maintain in projects. There is no point in keeping just unit tests and counting on all lines covered as a sign of code/logic quality. Now the question is, should we use an Embedded solution or TestContainers? I suggest first of all focusing on the word “Embedded”. As a perfect integration test, we want to get an almost ideal copy of the production environment with all properties/features included. In-memory solutions are good, but mostly not enough for big business projects. Definitely, the advantage of Embedded services is the easy way to implement such tests and maintain configuration, since everything happens in memory.
TestContainers at first sight might look like overkill, but they give us the most important feature, which is a separate environment. We don’t even have to rely on existing docker images – if we want, we can use custom ones. This is a huge improvement for potential test scenarios.
What about Jenkins? There is no reason to be afraid of using TestContainers in Jenkins either. I firmly recommend checking the TestContainers documentation on how easily we can set up the configuration for Jenkins agents.
To sum up – if there is no blocker or any unwanted condition for using TestContainers, then don’t hesitate. It is always better to keep all services managed and secured with integration test contracts.