In this post we continue exploring the capabilities of Spring WebFlux by creating a basic CRUD application. We will not use a database in this post, because I did not want to be distracted by database details at this moment 😉 Adding a database will be handled in part 2. We will implement a few CRUD operations and see how we can unit test a Flux. The source code can be found on GitHub.

Setup

In order to get started, we create a project with Spring Initializr. Select the following options and dependencies:

  • Java 9
  • Spring Boot 2.0.0
  • Reactive Web

Generate the project and import it into your IDE. Now we are ready to get started.

What are we going to build? Let’s assume that we have a ticketing company which sells tickets for several kinds of shows (e.g. concerts, musicals, etc.). The shows are listed in our ‘database’ and events occur on the shows, such as buying a ticket.

We will need the following:

  • The domain objects
  • A data repository
  • A handler (read: service)
  • A router

Create the domain objects

The domain objects will be created in package com.mydeveloperplanet.myspringwebfluxcrudplanet.domain. We need a Show domain object in order to store the shows we are selling tickets for. A Show has an id and a title. The corresponding getters and setters are generated.

public class Show {
  private String id;
  private String title;

  public Show(String id, String title) {
    this.id = id;
    this.title = title;
  }

  public String getId() {
    return id;
  }

  public void setId(String id) {
    this.id = id;
  }

  public String getTitle() {
    return title;
  }

  public void setTitle(String title) {
    this.title = title;
  }

}

Besides the Show domain object, we have a ShowEvent domain object, which corresponds to events that occur on a Show. The ShowEvent object has an id and a date which simulates the date and time at which the event occurred.

public class ShowEvent {
  private String id;
  private Date date;

  public ShowEvent(String id, Date date) {
    this.id = id;
    this.date = date;
  }

  public String getId() {
    return id;
  }

  public void setId(String id) {
    this.id = id;
  }

  public Date getDate() {
    return date;
  }

  public void setDate(Date date) {
    this.date = date;
  }

}

Create the Show repository

To create the repository, we implement the ReactiveCrudRepository interface. This interface is part of Spring Data, so we need to add the spring-data-commons dependency to our pom.

<dependency>
  <groupId>org.springframework.data</groupId>
  <artifactId>spring-data-commons</artifactId>
  <version>2.0.5.RELEASE</version>
</dependency>

In package com.mydeveloperplanet.myspringwebfluxcrudplanet.repositories we create a ReactiveShowRepository. We add the @Repository annotation in order to be able to inject it later on. As you can see, all the methods that need to be implemented make use of the reactive types Mono and Flux. For now, the generated method stubs simply return null. We will implement the methods we need later on.

@Repository
public class ReactiveShowRepository implements ReactiveCrudRepository<Show, String> {

  @Override
  public <S extends Show> Mono<S> save(S s) {
    return null;
  }

  @Override
  public <S extends Show> Flux<S> saveAll(Iterable<S> iterable) {
    return null;
  }

  @Override
  public <S extends Show> Flux<S> saveAll(Publisher<S> publisher) {
    return null;
  }

  @Override
  public Mono<Show> findById(String s) {
    return null;
  }

  @Override
  public Mono<Show> findById(Publisher<String> publisher) {
    return null;
  }

  @Override
  public Mono<Boolean> existsById(String s) {
    return null;
  }

  @Override
  public Mono<Boolean> existsById(Publisher<String> publisher) {
    return null;
  }

  @Override
  public Flux<Show> findAll() {
    return null;
  }

  @Override
  public Flux<Show> findAllById(Iterable<String> iterable) {
    return null;
  }

  @Override
  public Flux<Show> findAllById(Publisher<String> publisher) {
    return null;
  }

  @Override
  public Mono<Long> count() {
    return null;
  }

  @Override
  public Mono<Void> deleteById(String s) {
    return null;
  }

  @Override
  public Mono<Void> deleteById(Publisher<String> publisher) {
    return null;
  }

  @Override
  public Mono<Void> delete(Show show) {
    return null;
  }

  @Override
  public Mono<Void> deleteAll(Iterable<? extends Show> iterable) {
    return null;
  }

  @Override
  public Mono<Void> deleteAll(Publisher<? extends Show> publisher) {
    return null;
  }

  @Override
  public Mono<Void> deleteAll() {
    return null;
  }
}

Create the Handler

As seen in the previous post, a handler is the equivalent of a service in the MVC pattern. So, we create a ShowHandler which provides a method to retrieve all the shows we have to offer. We inject our ReactiveShowRepository and make use of the findAll method, which we still need to implement. Notice that, although we retrieve the Shows as a Flux, the handler returns a Mono of type ServerResponse: the Flux is passed as the body of that single response. This can be confusing in the beginning.

@Component
public class ShowHandler {

  private final ReactiveShowRepository showRepository;

  public ShowHandler(ReactiveShowRepository showRepository) {
    this.showRepository = showRepository;
  }

  public Mono<ServerResponse> all(ServerRequest serverRequest) {
    Flux<Show> shows = this.showRepository.findAll();
    return ServerResponse.ok().body(shows, Show.class);
  }
}

Create the Router

The next step is to create the Router in order to link a URL to our Handler. RouterFunctions are to be defined in a WebConfig class as can be read here. We add a route in order to retrieve all the Shows.

@Configuration
@EnableWebFlux
public class WebConfig implements WebFluxConfigurer {

  @Bean
  public RouterFunction<ServerResponse> routeShow(ShowHandler showHandler) {
    return RouterFunctions
      .route(RequestPredicates.GET("/shows"), showHandler::all);
  }

}

Implement findAll operation

Our last step is to implement the findAll method in our ReactiveShowRepository. Because we are not using a database, we will store the items in a map. I am taking another shortcut here by setting the key of the map equal to the id of the Show object. This will make it easier to retrieve the Show object later on. The following is added to the ReactiveShowRepository class:

private final Map<String, Show> showsMap = new ConcurrentHashMap<>();

public ReactiveShowRepository() {
  showsMap.put("1", new Show("1", "Title1"));
  showsMap.put("2", new Show("2", "Title2"));
  showsMap.put("3", new Show("3", "Title3"));
  showsMap.put("4", new Show("4", "Title4"));
  showsMap.put("5", new Show("5", "Title5"));
}

The findAll method now retrieves the values from the map and returns them as a Flux.

@Override
public Flux<Show> findAll() {
  return Flux.fromIterable(showsMap.values());
}
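The map-based shortcut is not limited to reading. As a minimal sketch, and leaving out the reactive types Mono and Flux on purpose, the other operations could look roughly like this on a plain map. The class InMemoryStoreSketch and its methods are hypothetical names that are not part of the project, and it stores titles instead of Show objects to stay short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, non-reactive sketch of a map used as a 'database'.
final class InMemoryStoreSketch {

    // Key is the show id, value is the show title (the article stores Show objects instead).
    private final Map<String, String> shows = new ConcurrentHashMap<>();

    // Inserts or overwrites the entry for the given id and returns the stored title.
    String save(String id, String title) {
        shows.put(id, title);
        return title;
    }

    // Returns a snapshot copy of all titles; ConcurrentHashMap gives no ordering guarantee.
    List<String> findAll() {
        return new ArrayList<>(shows.values());
    }

    // Returns true only when an entry was actually removed.
    boolean deleteById(String id) {
        return shows.remove(id) != null;
    }

    // Returns the number of stored entries.
    long count() {
        return shows.size();
    }

    public static void main(String[] args) {
        InMemoryStoreSketch store = new InMemoryStoreSketch();
        store.save("1", "Title1");
        store.save("2", "Title2");
        System.out.println(store.deleteById("1")); // true
        System.out.println(store.findAll());       // [Title2]
    }
}
```

In the reactive repository, each of these results would be wrapped in a Mono or a Flux, in the same way as findAll wraps the values of the map.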

Now it's time to see if it all works. Start the application with the Maven goal spring-boot:run. If it started successfully, go to the URL http://localhost:8080/shows. The following output should be visible in your browser:

[
  {"id":"1","title":"Title1"},
  {"id":"2","title":"Title2"},
  {"id":"3","title":"Title3"},
  {"id":"4","title":"Title4"},
  {"id":"5","title":"Title5"}
]

Implement findById operation

We also want to be able to retrieve a specific Show. Therefore, we will implement the findById method in the ReactiveShowRepository. It takes a String as a parameter which corresponds to a key in our map. With the key, we retrieve the Show that was requested. We then return it as a Mono. Notice that we use the justOrEmpty method. The map returns null for an id that doesn’t exist, and the just method does not accept null: it would throw a NullPointerException. The justOrEmpty method returns an empty Mono in that case.

@Override
public Mono<Show> findById(String s) {
  return Mono.justOrEmpty(showsMap.get(s));
}
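The difference between just and justOrEmpty can be illustrated with an analogy from the JDK. The sketch below uses Optional as a stand-in for Mono, so it is not the Reactor API itself: Optional.of rejects null in the same way as Mono.just does, while Optional.ofNullable turns null into an empty result, comparable to Mono.justOrEmpty. The class and method names are made up for this illustration.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// JDK-only analogy: Optional stands in for Mono, this is not the Reactor API.
final class JustOrEmptyAnalogy {

    private static final Map<String, String> TITLES = new ConcurrentHashMap<>();

    static {
        TITLES.put("1", "Title1");
        TITLES.put("2", "Title2");
    }

    // Comparable to Mono.justOrEmpty: an unknown id yields an empty result.
    static Optional<String> lenientLookup(String id) {
        return Optional.ofNullable(TITLES.get(id));
    }

    // Comparable to Mono.just: an unknown id yields null, which is rejected
    // with a NullPointerException.
    static Optional<String> strictLookup(String id) {
        return Optional.of(TITLES.get(id));
    }

    public static void main(String[] args) {
        System.out.println(lenientLookup("2"));  // Optional[Title2]
        System.out.println(lenientLookup("42")); // Optional.empty
        try {
            strictLookup("42");
        } catch (NullPointerException e) {
            System.out.println("strict lookup rejected the missing value");
        }
    }
}
```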

In the ShowHandler we add a byId method which extracts the id from the request, calls the findById method of our repository and returns the resulting Mono as the body of the response.

public Mono<ServerResponse> byId(ServerRequest serverRequest) {
  String id = serverRequest.pathVariable("id");
  Mono<Show> show = this.showRepository.findById(id);
  return ServerResponse.ok().body(show, Show.class);
}

The only thing left to do is to add a route to our handler in the WebConfig class. Routes are evaluated in the order in which they are defined, so more specific routes must be put before more general routes. Therefore, we add the new route before the shows route.

public RouterFunction<ServerResponse> routeShow(ShowHandler showHandler) {
  return RouterFunctions
    .route(RequestPredicates.GET("/shows/{id}"), showHandler::byId)
    .andRoute(RequestPredicates.GET("/shows"), showHandler::all);
}
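Why the order of routes matters can be shown with a small first-match router. This is a hypothetical sketch in plain Java based on regular expressions. It does not use or reproduce Spring's RouterFunctions, and the overlapping catch-all pattern is an assumption that I added only to make the effect visible.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical first-match router; not Spring's RouterFunctions.
final class FirstMatchRouterSketch {

    // A route couples a path pattern to the name of a handler.
    private static final class Route {
        final Pattern pattern;
        final String handlerName;

        Route(String regex, String handlerName) {
            this.pattern = Pattern.compile(regex);
            this.handlerName = handlerName;
        }
    }

    // Routes are kept, and later tried, in the order in which they were added.
    private final List<Route> routes = new ArrayList<>();

    FirstMatchRouterSketch andRoute(String regex, String handlerName) {
        routes.add(new Route(regex, handlerName));
        return this;
    }

    // Returns the handler name of the first route that matches the whole path.
    String resolve(String path) {
        for (Route route : routes) {
            if (route.pattern.matcher(path).matches()) {
                return route.handlerName;
            }
        }
        return "notFound";
    }

    static FirstMatchRouterSketch specificFirst() {
        return new FirstMatchRouterSketch()
            .andRoute("/shows/[^/]+/events", "events")
            .andRoute("/shows/.*", "catchAll");
    }

    static FirstMatchRouterSketch generalFirst() {
        return new FirstMatchRouterSketch()
            .andRoute("/shows/.*", "catchAll")
            .andRoute("/shows/[^/]+/events", "events");
    }

    public static void main(String[] args) {
        System.out.println(specificFirst().resolve("/shows/2/events")); // events
        System.out.println(generalFirst().resolve("/shows/2/events"));  // catchAll
    }
}
```

With the general route first, the specific route is never reached, because the general pattern already matches the path.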

Run the application and go to the URL http://localhost:8080/shows/2. The browser shows the following output:

{"id":"2","title":"Title2"}

Create ShowEvent handler

Now it is time to do something with the ShowEvent we created earlier as a domain object. We will create a ShowEventHandler which emits an event every second for a given Show. In order to accomplish this, we use the static generate method of Flux. The definition is as follows:

public static <T> Flux<T> generate(Consumer<SynchronousSink<T>> generator)

So, the generate method takes a Consumer of type SynchronousSink. The SynchronousSink emits objects by means of its next method. In our ShowEventHandler we emit ShowEvents. If we ran this as it is, we would be flooded by ShowEvents. Therefore, we add the delayElements method to the Flux in order to slow things down to one emitted object per second.

@Component
public class ShowEventHandler {

  public Mono<ServerResponse> events(ServerRequest serverRequest) {
    String showId = serverRequest.pathVariable("id");
    Flux<ShowEvent> events = Flux
      .<ShowEvent>generate(sink -> sink.next(new ShowEvent(showId, new Date())))
      .delayElements(Duration.ofSeconds(1));
    return ServerResponse.ok()
      .contentType(MediaType.TEXT_EVENT_STREAM)
      .body(events, ShowEvent.class);
  }
}
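To get a feeling for the generator callback, here is a strongly simplified model in plain Java. It is not Reactor's implementation: the Sink interface and the take method are hypothetical stand-ins for SynchronousSink and for a subscriber that requests a fixed number of elements, and the one second delay is left out. The sketch only shows the idea that the callback is invoked once for every requested element.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Simplified, JDK-only model of a generator callback; not Reactor's implementation.
final class GenerateSketch {

    // Hypothetical stand-in for SynchronousSink, reduced to the next method.
    interface Sink<T> {
        void next(T value);
    }

    // Invokes the generator once per requested element and collects what it emits.
    static <T> List<T> take(int requested, Consumer<Sink<T>> generator) {
        List<T> emitted = new ArrayList<>();
        for (int i = 0; i < requested; i++) {
            generator.accept(emitted::add);
        }
        return emitted;
    }

    // Requests three events from a generator that numbers its emissions.
    static List<String> threeEvents() {
        AtomicInteger counter = new AtomicInteger();
        return GenerateSketch.<String>take(3, sink -> sink.next("event-" + counter.incrementAndGet()));
    }

    public static void main(String[] args) {
        System.out.println(threeEvents()); // [event-1, event-2, event-3]
    }
}
```

Without a limit on the number of requested elements, such a generator never stops by itself, which is why the events endpoint keeps streaming.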

The final step is to add a route so that we can retrieve the events via a URL. Again, we place it at the beginning of the routes, because it is the most specific route.

public RouterFunction<ServerResponse> routeShow(ShowHandler showHandler, ShowEventHandler showEventHandler) {
  return RouterFunctions
    .route(RequestPredicates.GET("/shows/{id}/events"), showEventHandler::events)
    .andRoute(RequestPredicates.GET("/shows/{id}"), showHandler::byId)
    .andRoute(RequestPredicates.GET("/shows"), showHandler::all);
}

Run the application and go to the URL http://localhost:8080/shows/2/events. The browser shows the following output:

data:{"id":"2","date":1520084205593}

data:{"id":"2","date":1520084206712}

data:{"id":"2","date":1520084207741}

As you will notice, every second a new event is added.

Test the show events

Up to now, we tested manually. In the next unit test, we will show how we can test the show events URL. We will make use of a WebTestClient for this. In a first test, we invoke the URL, request the event stream content type and verify the response status.

@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class MySpringWebfluxCrudPlanetApplicationTests {

  @Autowired
  private WebTestClient webTestClient;

  @Test
  public void contextLoads() {
  }

  @Test
  public void testShowEvents() {
    webTestClient
      .get().uri("/shows/2/events")
      .accept(MediaType.TEXT_EVENT_STREAM)
      .exchange()
      .expectStatus().isOk();
  }
}

Running this test results in the following exception:

2018-03-03 14:51:47.468 ERROR 7508 --- [ctor-http-nio-1] r.ipc.netty.channel.ChannelOperations : [HttpClient] Error processing connection. Requesting close the channel

java.io.IOException: Connection closed prematurely

The stream continues sending data, which results in this exception. The stream is doing exactly what we want, but this causes a problem for our test.

A solution is to capture the streaming response in a FluxExchangeResult. Next, we retrieve the body as a Flux. Finally, we use a StepVerifier in order to consume the Flux. We check whether 10 items are received and then cancel the consumption of the Flux. Because one event is emitted per second, this test takes about 10 seconds.

@Test
public void testShowEvents() {
  FluxExchangeResult<ShowEvent> result = webTestClient
    .get().uri("/shows/2/events")
    .accept(MediaType.TEXT_EVENT_STREAM)
    .exchange()
    .expectStatus().isOk()
    .returnResult(ShowEvent.class);

  Flux<ShowEvent> eventFlux = result.getResponseBody();

  StepVerifier.create(eventFlux)
    .expectNextCount(10)
    .thenCancel()
    .verify();

}

Unfortunately, an exception is thrown again:

java.lang.AssertionError: expectation "expectNextCount(10)" failed (expected: count = 10; actual: counted = 0; signal: onError(org.springframework.core.codec.CodecException: Type definition error: [simple type, class com.mydeveloperplanet.myspringwebfluxcrudplanet.domain.ShowEvent]; nested exception is com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `com.mydeveloperplanet.myspringwebfluxcrudplanet.domain.ShowEvent` (no Creators, like default construct, exist): cannot deserialize from Object value (no delegate- or property-based Creator)
 at [Source: UNKNOWN; line: -1, column: -1]))

The problem is that the ShowEvent domain object cannot be deserialized from JSON, because it only has a constructor with arguments and no default constructor. The solution is to add a default constructor to the ShowEvent domain object:

public ShowEvent() {
  super();
}
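The reason behind this error can be sketched with plain reflection. Unless it is told otherwise, a JSON mapper such as Jackson first creates an empty object and fills in the properties afterwards, and creating an empty object requires a constructor without arguments. The sketch below is a simplification of that first step and not Jackson's actual code. The classes WithoutDefault and WithDefault are hypothetical stand-ins for the ShowEvent before and after the fix.

```java
import java.lang.reflect.Constructor;

// Simplified illustration of no-argument instantiation; not Jackson's actual code.
final class DefaultConstructorSketch {

    // Only a constructor with arguments, like the original ShowEvent.
    static class WithoutDefault {
        private final String id;

        WithoutDefault(String id) {
            this.id = id;
        }
    }

    // A default constructor next to the one with arguments, like the fixed ShowEvent.
    static class WithDefault {
        private String id;

        WithDefault() {
        }

        WithDefault(String id) {
            this.id = id;
        }
    }

    // Returns true when an instance can be created without any arguments.
    static boolean canInstantiate(Class<?> type) {
        try {
            Constructor<?> constructor = type.getDeclaredConstructor();
            constructor.setAccessible(true);
            constructor.newInstance();
            return true;
        } catch (ReflectiveOperationException e) {
            // For example NoSuchMethodException when no default constructor exists.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(canInstantiate(WithoutDefault.class)); // false
        System.out.println(canInstantiate(WithDefault.class));    // true
    }
}
```

An alternative that I did not try in this project would be to keep only the constructor with arguments and mark it for Jackson with the @JsonCreator and @JsonProperty annotations.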

Running the test results in a successful test result 🙂

Summary

In this post we explored some of the capabilities of Spring WebFlux in order to build a basic CRUD application without an underlying database. At the end, we showed how a Flux which streams events can be created and how it can be tested. In part 2 we will explore how we can add a database to this application.