---
title: Create the Spring Boot project
linkTitle: Create the project
description: Set up a Spring Boot project with Kafka, Spring Data JPA, and MySQL.
weight: 10
---
## Set up the project
Create a Spring Boot project from [Spring Initializr](https://start.spring.io)
by selecting the **Spring for Apache Kafka**, **Spring Data JPA**, **MySQL Driver**,
and **Testcontainers** starters.

Alternatively, clone the
[guide repository](https://github.com/testcontainers/tc-guide-testing-spring-boot-kafka-listener).

After generating the application, add the Awaitility library as a test
dependency. You'll use it later to assert the expectations of an asynchronous
process flow.
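
To see why a polling library helps with asynchronous flows, here is a minimal plain-Java sketch of the underlying idea: check a condition repeatedly until it holds or a timeout elapses. The `PollingSketch` class and its `waitUntil` helper are hypothetical names used for illustration only; they are not Awaitility's API, and the background task merely stands in for a Kafka listener updating a database.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;

// Hypothetical helper (not Awaitility's API): polls a condition until it
// holds or the timeout elapses.
class PollingSketch {

  static boolean waitUntil(
    BooleanSupplier condition,
    Duration timeout,
    Duration pollInterval
  ) {
    long deadline = System.nanoTime() + timeout.toNanos();
    while (System.nanoTime() < deadline) {
      if (condition.getAsBoolean()) {
        return true;
      }
      try {
        Thread.sleep(pollInterval.toMillis());
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
    // One last check at the deadline.
    return condition.getAsBoolean();
  }

  // Simulates an asynchronous price update and waits for it to become visible.
  static boolean demo() {
    AtomicReference<String> price = new AtomicReference<>("10.00");
    CompletableFuture.runAsync(() -> {
      try {
        Thread.sleep(100); // stands in for message delivery and processing time
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      price.set("25.00");
    });
    return waitUntil(
      () -> "25.00".equals(price.get()),
      Duration.ofSeconds(5),
      Duration.ofMillis(20)
    );
  }
}
```

Awaitility wraps this pattern in a fluent API (for example `await().atMost(...).untilAsserted(...)`), so tests don't need hand-written loops or fixed sleeps.
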
The key dependencies in `pom.xml` are:
```xml
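<properties>
  <java.version>17</java.version>
  <testcontainers.version>2.0.4</testcontainers.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
  </dependency>
  <dependency>
    <groupId>com.mysql</groupId>
    <artifactId>mysql-connector-j</artifactId>
    <scope>runtime</scope>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>testcontainers-junit-jupiter</artifactId>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>testcontainers-kafka</artifactId>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>testcontainers-mysql</artifactId>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>org.awaitility</groupId>
    <artifactId>awaitility</artifactId>
    <scope>test</scope>
  </dependency>
</dependencies>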
```
Using the Testcontainers BOM (Bill of Materials) is recommended so that you
don't have to repeat the version for every Testcontainers module dependency.
## Create the JPA entity
The application listens to a topic called `product-price-changes`. When a
message arrives, it extracts the product code and price from the event payload
and updates the price for that product in the MySQL database.
Create `Product.java`:
```java
package com.testcontainers.demo;

import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import jakarta.persistence.Table;
import java.math.BigDecimal;

@Entity
@Table(name = "products")
class Product {

  @Id
  @GeneratedValue(strategy = GenerationType.IDENTITY)
  private Long id;

  @Column(nullable = false, unique = true)
  private String code;

  @Column(nullable = false)
  private String name;

  @Column(nullable = false)
  private BigDecimal price;

  // JPA requires a no-argument constructor.
  public Product() {}

  public Product(Long id, String code, String name, BigDecimal price) {
    this.id = id;
    this.code = code;
    this.name = name;
    this.price = price;
  }

  public Long getId() {
    return id;
  }

  public void setId(Long id) {
    this.id = id;
  }

  public String getCode() {
    return code;
  }

  public void setCode(String code) {
    this.code = code;
  }

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public BigDecimal getPrice() {
    return price;
  }

  public void setPrice(BigDecimal price) {
    this.price = price;
  }
}
```
## Create the Spring Data JPA repository
Create a repository interface for the `Product` entity with a method to find a
product by code and a method to update the price for a given product code:
```java
package com.testcontainers.demo;

import java.math.BigDecimal;
import java.util.Optional;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;

interface ProductRepository extends JpaRepository<Product, Long> {
  // Derived query: looks up a product by its unique code.
  Optional<Product> findByCode(String code);

  // Bulk JPQL update; must run inside a transaction.
  @Modifying
  @Query("update Product p set p.price = :price where p.code = :productCode")
  void updateProductPrice(
    @Param("productCode") String productCode,
    @Param("price") BigDecimal price
  );
}
```
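
The contract of these two methods can be illustrated without a database. The following is a minimal sketch that uses a hypothetical map-backed `InMemoryProductStore` (not part of the guide's code or of Spring Data) to mimic the lookup and the `update ... where p.code = :productCode` statement. Unlike the repository method above, the sketch returns the number of affected rows to make the "no matching product" case visible.

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for the repository, for illustration only.
class InMemoryProductStore {

  private final Map<String, BigDecimal> pricesByCode = new HashMap<>();

  void save(String code, BigDecimal price) {
    pricesByCode.put(code, price);
  }

  // Mirrors findByCode: an unknown code yields an empty Optional, not null.
  Optional<BigDecimal> findPriceByCode(String code) {
    return Optional.ofNullable(pricesByCode.get(code));
  }

  // Mirrors the update query: only an existing code is changed, and the
  // return value is the number of "rows" affected (0 or 1).
  int updateProductPrice(String code, BigDecimal price) {
    if (!pricesByCode.containsKey(code)) {
      return 0;
    }
    pricesByCode.put(code, price);
    return 1;
  }
}
```

An update for a code that doesn't exist changes nothing and raises no error, so a price-change event for an unknown product is silently ignored. If you need to detect that case, Spring Data JPA lets a `@Modifying` query method return `int` instead of `void`.
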
## Add a schema creation script
Because the application doesn't use an in-memory database, you need to create
the MySQL tables. The recommended approach for production is a migration tool
like Flyway or Liquibase, but for this guide the built-in Spring Boot schema
initialization is sufficient.
Create `src/main/resources/schema.sql`:
```sql
create table products (
    id int NOT NULL AUTO_INCREMENT,
    code varchar(255) not null,
    name varchar(255) not null,
    price numeric(5,2) not null,
    PRIMARY KEY (id),
    UNIQUE (code)
);
```
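
The `price` column is declared as `numeric(5,2)`: at most five significant digits, two of them after the decimal point, which gives a range of -999.99 to 999.99. The following minimal sketch checks whether a `BigDecimal` fits that definition. The `PriceColumnSketch` class is a hypothetical helper, and the `HALF_UP` rounding mode is an assumption made for illustration; the database may apply its own rounding rules.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

// Hypothetical helper that mirrors the numeric(5,2) column definition.
class PriceColumnSketch {

  static final int PRECISION = 5; // total significant digits
  static final int SCALE = 2; // digits after the decimal point

  // Normalizes a price to two decimal places (HALF_UP is an assumption).
  static BigDecimal toColumnScale(BigDecimal price) {
    return price.setScale(SCALE, RoundingMode.HALF_UP);
  }

  // True when the normalized price has no more than five significant digits.
  static boolean fitsPriceColumn(BigDecimal price) {
    return toColumnScale(price).precision() <= PRECISION;
  }
}
```

With these rules, `999.99` fits and `1000.00` doesn't, so keep test prices within that range.
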
Enable schema initialization in `src/main/resources/application.properties`:
```properties
spring.sql.init.mode=always
```
## Create the event payload
Create a record named `ProductPriceChangedEvent` that represents the structure
of the event payload received from the Kafka topic:
```java
package com.testcontainers.demo;

import java.math.BigDecimal;

record ProductPriceChangedEvent(String productCode, BigDecimal price) {}
```
The sender and receiver agree on the following JSON format:
```json
{
  "productCode": "P100",
  "price": 25.0
}
```
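
One detail worth keeping in mind when comparing prices: `BigDecimal.equals` takes the scale into account, so `25.0` and `25.00` are not equal even though they represent the same amount. The sketch below builds the event that corresponds to the sample payload and compares its price with a two-decimal value such as one read from a `numeric(5,2)` column. It assumes the deserialized price keeps the scale written in the JSON; `PriceComparisonSketch` is a hypothetical helper name.

```java
import java.math.BigDecimal;

record ProductPriceChangedEvent(String productCode, BigDecimal price) {}

// Hypothetical helper for scale-insensitive price comparison.
class PriceComparisonSketch {

  // Event equivalent to the sample JSON payload (price with scale 1).
  static ProductPriceChangedEvent sampleEvent() {
    return new ProductPriceChangedEvent("P100", new BigDecimal("25.0"));
  }

  // compareTo ignores scale, so 25.0 and 25.00 are treated as the same price.
  static boolean samePrice(BigDecimal a, BigDecimal b) {
    return a.compareTo(b) == 0;
  }
}
```

Because a record's generated `equals` delegates to each component's `equals`, two events that differ only in price scale are also unequal. Prefer `compareTo` (or an assertion built on it) when checking prices.
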
## Implement the Kafka listener
Create `ProductPriceChangedEventHandler.java`, which handles messages from the
`product-price-changes` topic and updates the product price in the database:
```java
package com.testcontainers.demo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

@Component
@Transactional
class ProductPriceChangedEventHandler {

  private static final Logger log = LoggerFactory.getLogger(
    ProductPriceChangedEventHandler.class
  );

  private final ProductRepository productRepository;

  ProductPriceChangedEventHandler(ProductRepository productRepository) {
    this.productRepository = productRepository;
  }

  // Invoked for every message on the topic; the payload is already
  // deserialized into a ProductPriceChangedEvent.
  @KafkaListener(topics = "product-price-changes", groupId = "demo")
  public void handle(ProductPriceChangedEvent event) {
    log.info(
      "Received a ProductPriceChangedEvent with productCode: {}",
      event.productCode()
    );
    productRepository.updateProductPrice(event.productCode(), event.price());
  }
}
```
The `@KafkaListener` annotation specifies the topic name to listen to. Spring
Kafka handles serialization and deserialization based on the properties
configured in `application.properties`.
## Configure Kafka serialization
Add the following Kafka properties to
`src/main/resources/application.properties`:
```properties
######## Kafka Configuration #########
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.group-id=demo
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=com.testcontainers.demo
```
The `productCode` key is (de)serialized using `StringSerializer`/`StringDeserializer`,
and the `ProductPriceChangedEvent` value is (de)serialized using
`JsonSerializer`/`JsonDeserializer`.
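
To make the wire format concrete, the following sketch builds the bytes that a record for product `P100` would roughly carry: the key as UTF-8 text and the value as UTF-8 encoded JSON. It is a hand-rolled illustration, not how `JsonSerializer` is implemented (that class relies on a JSON library), and it assumes the product code contains no characters that need JSON escaping. `WireFormatSketch` is a hypothetical name.

```java
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of the key and value bytes of a Kafka record.
class WireFormatSketch {

  // Key: the product code as UTF-8 bytes, as a string serializer would produce.
  static byte[] keyBytes(String productCode) {
    return productCode.getBytes(StandardCharsets.UTF_8);
  }

  // Value: the event as JSON text (assumes no characters need escaping).
  static String valueJson(String productCode, BigDecimal price) {
    return (
      "{\"productCode\":\"" +
      productCode +
      "\",\"price\":" +
      price.toPlainString() +
      "}"
    );
  }

  // Value bytes: the JSON text encoded as UTF-8.
  static byte[] valueBytes(String productCode, BigDecimal price) {
    return valueJson(productCode, price).getBytes(StandardCharsets.UTF_8);
  }
}
```

On the consumer side, the deserializers reverse these steps: the key bytes become a `String` and the JSON value is mapped onto `ProductPriceChangedEvent`.
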