Home > Spring > Spring WebFlux > Spring WebFlux & Reactive MongoDB

Spring WebFlux & Reactive MongoDB

In this article, I will provide a step-by-step guide to developing a Spring WebFlux RESTful API using a reactive programming approach along with Reactive MongoDB.

Spring WebFlux is the Spring Framework's step towards the reactive programming model. It uses Reactor to support the Reactive Streams API and, by default, runs on the Netty server. Here we will be using Reactive MongoDB to build an application that creates employees and shows the list of employees.

  1. pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>


    <!-- Spring Boot parent POM: the Spring and Lombok dependencies below omit their versions and rely on it -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.6</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>
 
    <groupId>com.jkoder.employee</groupId>
    <artifactId>spring-boot-webflux-example</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
 
    <name>spring-boot-webflux-example</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>17</java.version>
    </properties>
 
    <dependencies>
        <!-- Reactive web stack (Spring WebFlux) used by the controller -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <!-- Reactive Spring Data MongoDB: repositories, ReactiveMongoTemplate and the reactive streams driver -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId> 
        </dependency>

        <!-- Lombok annotations (@Data, @Builder, ...) used by the model classes; optional, so not passed on to dependents -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <!-- NOTE(review): no javax.xml.bind type is referenced by the code shown in this article; confirm this is still needed -->
        <dependency>
            <groupId>javax.xml.bind</groupId>
            <artifactId>jaxb-api</artifactId>
            <version>2.3.0</version> 
        </dependency>
        <!-- NOTE(review): no javax.servlet type is referenced by the code shown in this article; confirm this is still needed -->
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>javax.servlet-api</artifactId>
            <version>3.1.0</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
 
</project>

2. application.properties

# HTTP port of the application (the sample cURL commands in this article target it)
server.port = 9091

# MongoDB connection settings, injected into MongoDbConfig through @Value
mongodb.hostname=localhost
mongodb.dbport=27017
mongodb.dbname=employeedb


3. Mongo DB config

package com.jkoder.employee.config;

import com.mongodb.MongoClientSettings;
import com.mongodb.ServerAddress;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.config.AbstractReactiveMongoConfiguration;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import org.springframework.data.mongodb.repository.config.EnableReactiveMongoRepositories;
 
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoClients;

import java.util.Arrays;

/**
 * Reactive MongoDB configuration.
 *
 * <p>Reads the host, port and database name from the {@code mongodb.*} properties and
 * enables reactive repositories under {@code com.jkoder.employee.dao}.
 */
@Configuration
@EnableReactiveMongoRepositories(basePackages = "com.jkoder.employee.dao")
public class MongoDbConfig extends AbstractReactiveMongoConfiguration {
    @Value("${mongodb.hostname}")
    private String hostname;

    @Value("${mongodb.dbport}")
    private String port;

    @Value("${mongodb.dbname}")
    private String dbName;

    /**
     * Builds the reactive streams {@link MongoClient} for the configured host and port.
     *
     * <p>Declared as a {@link Bean} so the container creates it once and manages its
     * lifecycle. Without the annotation every direct call, such as the one in
     * {@link #reactiveMongoTemplate()}, would build another client that is never closed.
     *
     * @return the client connected to {@code mongodb.hostname}:{@code mongodb.dbport}
     * @throws NumberFormatException if {@code mongodb.dbport} is not a valid integer
     */
    @Bean
    @Override
    public MongoClient reactiveMongoClient() {
        ServerAddress address = new ServerAddress(getHostname(), Integer.parseInt(getPort()));
        MongoClientSettings settings = MongoClientSettings.builder()
                .applyToClusterSettings(builder -> builder.hosts(Arrays.asList(address)))
                .build();
        return MongoClients.create(settings);
    }

    /**
     * @return the database name taken from {@code mongodb.dbname}
     */
    @Override
    protected String getDatabaseName() {
        return dbName;
    }

    /**
     * @return the host name taken from {@code mongodb.hostname}
     */
    protected String getHostname() {
        return hostname;
    }

    /**
     * @return the port taken from {@code mongodb.dbport}, as the raw property string
     */
    protected String getPort() {
        return port;
    }

    /**
     * Creates a {@link ReactiveMongoTemplate} on top of {@link #reactiveMongoClient()}
     * and the configured database.
     *
     * @return the template used for reactive MongoDB operations
     */
    @Bean
    public ReactiveMongoTemplate reactiveMongoTemplate() {
        return new ReactiveMongoTemplate(reactiveMongoClient(), getDatabaseName());
    }
}
4. WebFluxConfig
package com.jkoder.employee.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.config.EnableWebFlux;
import org.springframework.web.reactive.config.WebFluxConfigurer;

/**
 * WebFlux configuration class.
 *
 * <p>Carries {@code @EnableWebFlux} and implements {@link WebFluxConfigurer} without
 * overriding any of its methods.
 *
 * <p>NOTE(review): the Spring Boot reference documentation describes {@code @EnableWebFlux}
 * as taking complete control of the WebFlux configuration in a Boot application. Confirm
 * that this is intended here.
 */
@Configuration
@EnableWebFlux
public class WebFluxConfig implements WebFluxConfigurer{
}

5. Dao

package com.jkoder.employee.dao;

import com.jkoder.employee.model.Employee;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import org.springframework.stereotype.Repository;

/**
 * Reactive repository for {@link Employee} documents.
 *
 * <p>Declares no methods of its own; everything comes from {@link ReactiveMongoRepository}.
 *
 * <p>NOTE(review): the id type parameter is {@code Integer} while {@code Employee} declares
 * its {@code @Id} field as {@code long}. Confirm the two are meant to differ.
 */
@Repository
public interface EmployeeRepository extends ReactiveMongoRepository<Employee, Integer> {
}

6. Models

package com.jkoder.employee.model;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.context.annotation.Scope;
import org.springframework.context.annotation.ScopedProxyMode;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.Transient;
import org.springframework.data.mongodb.core.mapping.Document;

/**
 * Employee document stored in the {@code employee} collection.
 *
 * <p>Lombok generates the accessors, the all-args and no-args constructors and the builder.
 * This is a plain data class, not a Spring bean, so it carries no bean scope annotation.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
@Document(collection = "employee")
public class Employee {
    /** Name of the sequence used to generate employee ids. */
    @Transient
    public static final String SEQUENCE_NAME = "employee_sequence";

    /** Document id. */
    @Id
    long id;

    /** Employee name. */
    String name;

    /** Employee role. */
    String role;
}

package com.jkoder.employee.model;

import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

/**
 * Counter document stored in the {@code database_sequences} collection.
 *
 * <p>Lombok's {@code @Data} generates the accessors used to read the counter value.
 */
@Data
@Document(collection = "database_sequences")
public class DatabaseSequence {

    /** Document id; the sequence generator looks counters up by sequence name through {@code _id}. */
    @Id
    private String id;

    /** Current value of the counter. */
    private long sequence;

}

Here we are keeping the sequence in another collection and using that sequence to auto-generate the document id.

package com.jkoder.employee.service;

import java.util.concurrent.ExecutionException;

/**
 * Produces numeric values from a named sequence.
 */
public interface ISequenceGeneratorService {
    /**
     * Returns a value for the given sequence.
     *
     * @param sequenceName name identifying the sequence
     * @return the generated sequence value
     * @throws InterruptedException declared for implementations that wait on an asynchronous result
     * @throws ExecutionException declared for implementations whose asynchronous computation can fail
     */
    long generateSequence(final String sequenceName) throws InterruptedException, ExecutionException;
}

package com.jkoder.employee.service;

import com.jkoder.employee.model.DatabaseSequence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.ReactiveMongoOperations;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.stereotype.Service;

import java.util.concurrent.ExecutionException;

import static org.springframework.data.mongodb.core.FindAndModifyOptions.options;

/**
 * {@link ISequenceGeneratorService} backed by {@link DatabaseSequence} counter documents.
 */
@Service
public class SequenceGeneratorService implements ISequenceGeneratorService {
    private static final Logger logger = LoggerFactory.getLogger(SequenceGeneratorService.class);

    private ReactiveMongoOperations mongoOperations;

    /**
     * @param mongoOperations reactive operations used to read and update the counter document
     */
    @Autowired
    public SequenceGeneratorService(ReactiveMongoOperations mongoOperations) {
        this.mongoOperations = mongoOperations;
    }

    /**
     * Increments the counter stored under {@code sequenceName} by one and returns the
     * updated value. The counter document is created on first use (upsert).
     *
     * <p>The reactive call is converted to a future and waited on, so this method blocks
     * the calling thread until the update has completed.
     *
     * @param sequenceName {@code _id} of the counter document
     * @return the counter value after the increment
     * @throws InterruptedException if the thread is interrupted while waiting
     * @throws ExecutionException if the underlying update fails
     */
    @Override
    public long generateSequence(final String sequenceName) throws InterruptedException, ExecutionException {
        Query bySequenceId = new Query(Criteria.where("_id").is(sequenceName));
        Update incrementByOne = new Update().inc("sequence", 1);

        DatabaseSequence updated = mongoOperations
                .findAndModify(bySequenceId, incrementByOne,
                        options().returnNew(true).upsert(true), DatabaseSequence.class)
                .doOnSuccess(object -> logger.debug("databaseSequence is evaluated: {}", object))
                .toFuture()
                .get();

        return updated.getSequence();
    }
}

package com.jkoder.employee.listener;

import com.jkoder.employee.model.Employee;
import com.jkoder.employee.service.ISequenceGeneratorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.mapping.event.AbstractMongoEventListener;
import org.springframework.data.mongodb.core.mapping.event.BeforeConvertEvent;
import org.springframework.stereotype.Component;

import java.util.concurrent.ExecutionException;

/**
 * Mongo event listener that assigns a generated id to an {@link Employee} before it is
 * converted for persistence.
 */
@Component
public class EmployeeModelListener extends AbstractMongoEventListener<Employee> {
    private static final Logger logger = LoggerFactory.getLogger(EmployeeModelListener.class);

    private final ISequenceGeneratorService sequenceGenerator;

    /**
     * @param sequenceGenerator service that supplies the next value of the employee sequence
     */
    @Autowired
    public EmployeeModelListener(ISequenceGeneratorService sequenceGenerator) {
        this.sequenceGenerator = sequenceGenerator;
    }

    /**
     * Sets the id of the event's employee to the next value of
     * {@link Employee#SEQUENCE_NAME}.
     *
     * <p>A failure to obtain the value is rethrown instead of only being logged.
     * Otherwise the employee would continue towards persistence with whatever id it
     * already carried.
     *
     * @param event the before-convert event carrying the employee
     * @throws IllegalStateException if the id could not be generated; the original
     *         exception is kept as the cause
     */
    @Override
    public void onBeforeConvert(BeforeConvertEvent<Employee> event) {
        try {
            event.getSource().setId(sequenceGenerator.generateSequence(Employee.SEQUENCE_NAME));
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still see it.
            Thread.currentThread().interrupt();
            logger.error("Error:{}", e.getMessage(), e);
            throw new IllegalStateException(
                    "Interrupted while generating an id from sequence " + Employee.SEQUENCE_NAME, e);
        } catch (ExecutionException e) {
            logger.error("Error:{}", e.getMessage(), e);
            throw new IllegalStateException(
                    "Could not generate an id from sequence " + Employee.SEQUENCE_NAME, e);
        }
    }
}

7. Services class

package com.jkoder.employee.service;

import com.jkoder.employee.model.Employee;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
 
/**
 * Operations for creating and looking up employees.
 */
public interface IEmployeeService
{
    /**
     * Creates the given employee. The method returns nothing, so the caller gets no
     * completion or failure signal from it.
     *
     * @param e the employee to create
     */
    void createEmp(Employee e);
     
    /**
     * Looks up a single employee.
     *
     * @param id id of the employee
     * @return a {@link Mono} for the employee; presumably empty when no employee has
     *         that id (verify against the implementation)
     */
    Mono<Employee> findByEmpId(Integer id);
 
    /**
     * @return a {@link Flux} of all employees
     */
    Flux<Employee> findAllEmp();

}
package com.jkoder.employee.service;

import com.jkoder.employee.dao.EmployeeRepository;
import com.jkoder.employee.model.Employee;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Service
public class EmployeeServiceImpl implements IEmployeeService {

    @Autowired
    EmployeeRepository employeeRepo;

    public void createEmp(Employee employee) {
        employeeRepo.save(employee).subscribe();
    }

    public Mono<Employee> findByEmpId(Integer id) {
        return employeeRepo.findById(id);
    }

    public Flux<Employee> findAllEmp() {
        return employeeRepo.findAll();
    }
}

8. Controller class

package com.jkoder.employee.controller;

import com.jkoder.employee.model.Employee;
import com.jkoder.employee.service.IEmployeeService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RestController
public class EmployeeController {
    @Autowired
    private IEmployeeService employeeService;

    @PostMapping("/create/emp")
    @ResponseStatus(HttpStatus.CREATED)
    public void createEmp(@RequestBody Employee employee) {
        employeeService.createEmp(employee);
    }

    @GetMapping(value = "/get/all")
    @ResponseBody
    public Flux<Employee> findAll() {
        return employeeService.findAllEmp();
    }

    @GetMapping("/get/{id}")
    public ResponseEntity<Mono<Employee>> findEmpById(@PathVariable("id") Integer id) {
        Mono<Employee> employee = employeeService.findByEmpId(id);
        return new ResponseEntity<Mono<Employee>>(employee, employee != null ? HttpStatus.OK : HttpStatus.NOT_FOUND);
    }


}

And lastly, the Spring Boot main class

package com.jkoder.employee;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Entry point of the example application.
 */
@SpringBootApplication
public class SpringBootWebFluxExample {

    /**
     * Starts the Spring application with this class as the primary source.
     *
     * @param args command-line arguments, passed through to {@link SpringApplication#run}
     */
    public static void main(String[] args) {
        SpringApplication.run(SpringBootWebFluxExample.class, args);
    }
}

Below are sample cURL commands for the create and get employee APIs

  1. Create Employee CURL
curl --location --request POST 'http://localhost:9091/create/emp' \
--header 'Content-Type: application/json' \
--data-raw '{
    "name": "Jkoder",
    "role" : "Technical Architect"
}'

2. Get All Employee CURL

curl --location --request GET 'http://localhost:9091/get/all'

Code can be downloaded from the GitHub repository: https://github.com/asifiq/spring-webflux-reactive-mongodb