Getting Started With RSocket Part 2

In this blog, you will continue where you left off after Part 1. You will explore the RSocket communication models Fire-and-Forget, Request-Stream and Channel. For all of these models, you will create the server, client and a unit test.

1. Introduction

In Part 1, you learnt the basics of the RSocket communication protocol. It is advised to read Part 1 first before continuing with Part 2. Remember that RSocket provides 4 communication models:

  • Request-Response (a stream of 1)
  • Fire-and-Forget (no response)
  • Request-Stream (a stream of many)
  • Channel (bi-directional streams)

You covered Request-Response in Part 1, the others will be covered in Part 2.

The source code being used in this post is of course available at GitHub.

2. Fire-and-Forget Model

The Fire-and-Forget model is quite similar to the Request-Response model. The only difference is that you do not expect a response to your request.

2.1 The Server Side

In the RSocketServerController you create a method fireAndForget. Because the request does not return anything, the return type of the method is void. Again, with the annotation @MessageMapping you define the name of the route. Just as with the Request-Response example, the server receives a Notification message. In order to see something happening when the message is received, you just log the Notification message.

@MessageMapping("my-fire-and-forget")
public void fireAndForget(Notification notification) {
    logger.info("Received notification: " + notification);
}

2.2 The Client Side

In the RSocketClientController you create a method fireAndForget. The implementation is identical to the Request-Response example except for the expected return type. Here you use retrieveMono(Void.class) instead of retrieveMono(Notification.class).

@GetMapping("/fire-and-forget")
public Mono<Void> fireAndForget() {
    Notification notification = new Notification(CLIENT, SERVER, "Test the Fire-And-Forget interaction model");
    logger.info("Send notification for my-fire-and-forget: " + notification);
    return rSocketRequester
            .route("my-fire-and-forget")
            .data(notification)
            .retrieveMono(Void.class);
}

Start both the server and the client and invoke the URL:

$ http://localhost:8080/fire-and-forget

As you can see, no response is returned. In the logging of client and server, you can verify the sending and receiving messages.

Client:

Send notification for my-fire-and-forget: Notification{source='Client', destination='Server', text='Test the Fire-And-Forget interaction model'}

Server:

Received notification: Notification{source='Client', destination='Server', text='Test the Fire-And-Forget interaction model'}

2.3 The Test Side

The test is again quite similar to the client code and the Request-Response example. In order to validate whether the Mono does not emit any data, it is sufficient to call verifyComplete. You do not need to call consumeWithNext. If the Mono does emit data, the test should fail. However, replacing the route my-fire-and-forget into my-request-response f.e., does not fail the test. It is unclear why this will not fail the test. If anyone has any suggestions or a solution, please add it into the comments of this blog.

@Test
void testFireAndForget() {
    // Send a fire-and-forget message
    Mono<Void> result = rSocketRequester
            .route("my-fire-and-forget")
            .data(new Notification(CLIENT, SERVER, "Test the Fire-And-Forget interaction model"))
            .retrieveMono(Void.class);

    // Assert that the result is a completed Mono.
    StepVerifier
            .create(result)
            .verifyComplete();
}

3. Request-Stream Model

With the Request-Stream model, you send a request to the server and you will receive a Stream of Notification messages.

3.1 The Server Side

In the RSocketServerController you create a method requestStream. This time, the server will return a Flux of Notification messages. Again, with the annotation @MessageMapping you define the name of the route. In this example, upon receipt of a Notification message, a Flux is returned which emits a new Notification every 3 seconds.

@MessageMapping("my-request-stream")
Flux<Notification> requestStream(Notification notification) {
    logger.info("Received notification for my-request-stream: " + notification);
    return Flux
            .interval(Duration.ofSeconds(3))
            .map(i -> new Notification(notification.getDestination(), notification.getSource(), "In response to: " + notification.getText()));
}

3.2 The Client Side

In the RSocketClientController you create a method requestStream. The implementation is identical to the Request-Response example except for the expected return type. Here you use retrieveFlux(Notification.class) instead of retrieveMono(Notification.class).

@GetMapping("/request-stream")
public ResponseEntity<Flux<Notification>> requestStream() {
    Notification notification = new Notification(CLIENT, SERVER, "Test the Request-Stream interaction model");
    logger.info("Send notification for my-request-stream: " + notification);
    Flux<Notification> notificationFlux = rSocketRequester
            .route("my-request-stream")
            .data(notification)
            .retrieveFlux(Notification.class);
    return ResponseEntity.ok().contentType(MediaType.TEXT_EVENT_STREAM).body(notificationFlux);
}

Start both the server and the client and invoke the URL:

$ curl http://localhost:8080/request-stream
data:{"source":"Server","destination":"Client","text":"In response to: Test the Request-Stream interaction model"}

data:{"source":"Server","destination":"Client","text":"In response to: Test the Request-Stream interaction model"}

data:{"source":"Server","destination":"Client","text":"In response to: Test the Request-Stream interaction model"}

data:{"source":"Server","destination":"Client","text":"In response to: Test the Request-Stream interaction model"}
...

As you can see, the server emits every 3 seconds a Notification. In the logging of client and server, you can verify the sending and receiving messages.

Client:

Send notification for my-request-stream: Notification{source='Client', destination='Server', text='Test the Request-Stream interaction model'}

Server:

Received notification for my-request-stream: Notification{source='Client', destination='Server', text='Test the Request-Stream interaction model'}

3.3 The Test Side

The test is again similar to the client code. During verification, you verify the first message received, then verify whether 5 messages are received and finally verify the last message.

@Test
void testRequestStream() {
    // Send a request message
    Flux<Notification> result = rSocketRequester
            .route("my-request-stream")
            .data(new Notification(CLIENT, SERVER, "Test the Request-Stream interaction model"))
            .retrieveFlux(Notification.class);

    // Verify that the response messages contain the expected data
    StepVerifier
      .create(result)
      .consumeNextWith(notification -> {
         assertThat(notification.getSource()).isEqualTo(SERVER);
         assertThat(notification.getDestination())
                                             .isEqualTo(CLIENT);
         assertThat(notification.getText()).isEqualTo("In response to: Test the Request-Stream interaction model");})
      .expectNextCount(5)
      .consumeNextWith(notification -> {
         assertThat(notification.getSource()).isEqualTo(SERVER);
         assertThat(notification.getDestination())
                                             .isEqualTo(CLIENT);
         assertThat(notification.getText()).isEqualTo("In response to: Test the Request-Stream interaction model");})
      .thenCancel()
      .verify();
}

4. Channel Model

The Channel model is a bit more complicated than the other models you have seen so far. Here you will send a Flux and as a response a Flux will be returned. This provides you the ability to send messages back and forth like within a chat conversation for example.

4.1 The Server Side

In the RSocketServerController you create a method channel. You will upon receipt of a Notification increment a counter and every one second the result of the counter notificationCount will be sent to the client. In order to be able to follow what is happening, you add logging at receiving the Notification and when the result is returned.

@MessageMapping("my-channel")
public Flux<Long> channel(Flux<Notification> notifications) {
    final AtomicLong notificationCount = new AtomicLong(0);
    return notifications
        .doOnNext(notification -> {
            logger.info("Received notification for channel: " + notification);
            notificationCount.incrementAndGet();
         })
        .switchMap(notification -> 
Flux.interval(Duration.ofSeconds(1)).map(new Object() {
                private Function<Long, Long> numberOfMessages(AtomicLong notificationCount) {
                    long count = notificationCount.get();
                    logger.info("Return flux with count: " + count);
                    return i -> count;
                }
         }.numberOfMessages(notificationCount))).log();
}

4.2 The Client Side

In the RSocketClientController you create a method channel. You need to create a Flux. In order to accomplish this, you create 3 Mono Notification items, one with a delay of 0 seconds (notification0), one with a delay of 2 seconds (notification2) and one with a delay of 5 seconds (notification5). You create the Flux notifications with a combination of the Mono‘s you just created. Every time the Flux emits a Notification, you log this in order to be able to follow what is happening. Finally, you send the Flux to the RSocket channel and retrieve the response as a Flux of Long and return it to the caller.

@GetMapping("/channel")
public ResponseEntity<Flux<Long>> channel() {
    Mono<Notification> notification0 = Mono.just(new Notification(CLIENT, SERVER, "Test the Channel interaction model"));
    Mono<Notification> notification2 = Mono.just(new Notification(CLIENT, SERVER, "Test the Channel interaction model")).delayElement(Duration.ofSeconds(2));
    Mono<Notification> notification5 = Mono.just(new Notification(CLIENT, SERVER, "Test the Channel interaction model")).delayElement(Duration.ofSeconds(5));
        
    Flux<Notification> notifications = Flux.concat(notification0, notification5, notification0, notification2, notification2, notification2)
            .doOnNext(d -> logger.info("Send notification for my-channel"));

    Flux<Long> numberOfNotifications = this.rSocketRequester
            .route("my-channel")
            .data(notifications)
            .retrieveFlux(Long.class);

    return  ResponseEntity.ok().contentType(MediaType.TEXT_EVENT_STREAM).body(numberOfNotifications);
}

Start both the server and the client and invoke the URL:

$ curl http://localhost:8080/channel
data:1
data:1
data:1
data:1
data:3
data:3
data:4
data:4
data:5
data:5
data:6
data:6
...

The result is as expected. First the notification0 is sent, after 5 seconds (notification5) the following Notification is sent together with a notification0, 2 seconds later a notification2, 2 seconds later a new one, and finally 2 seconds later the last one. After the last result, the Flux will keep on transmitting a count of 6. In the logging of client and server, you can verify the sending and receiving messages. This time including the timestamps, the complete logging contains even more information which is left out for brevity purposes. You should take a look at it more closely when running the examples yourself. Important to notice are the onNext log statements which occur each second and correspond with the response the server is sending.

Client:

17:01:19.820 Send notification for my-channel
17:01:24.849 Send notification for my-channel
17:01:24.879 Send notification for my-channel
17:01:26.881 Send notification for my-channel
17:01:28.908 Send notification for my-channel
17:01:30.935 Send notification for my-channel

Server:

17:01:19.945 Received notification for channel: Notification{source='Client', destination='Server', text='Test the Channel interaction model'}
17:01:19.947 Return flux with count: 1
17:01:20.949 onNext(1)
17:01:21.947 onNext(1)
17:01:22.947 onNext(1)
17:01:23.947 onNext(1)
17:01:24.881 Received notification for channel: Notification{source='Client', destination='Server', text='Test the Channel interaction model'}
17:01:24.882 Return flux with count: 2
17:01:24.884 Received notification for channel: Notification{source='Client', destination='Server', text='Test the Channel interaction model'}
17:01:24.885 Return flux with count: 3
17:01:25.886 onNext(3)
17:01:26.886 onNext(3)
17:01:26.909 Received notification for channel: Notification{source='Client', destination='Server', text='Test the Channel interaction model'}
17:01:26.909 Return flux with count: 4
17:01:27.910 onNext(4)
17:01:28.910 onNext(4)
17:01:28.936 Received notification for channel: Notification{source='Client', destination='Server', text='Test the Channel interaction model'}
17:01:28.937 Return flux with count: 5
17:01:29.937 onNext(5)
17:01:30.937 onNext(5)
17:01:30.963 Received notification for channel: Notification{source='Client', destination='Server', text='Test the Channel interaction model'}
17:01:30.964 Return flux with count: 6
17:01:31.964 onNext(6)
17:01:32.964 onNext(6)
...

4.3 The Test Side

The test is similar as the client code. You send the messages to the channel and verify the resulting counts. The repeating elements in the test are left out for brevity, the complete test is available at GitHub.

@Test
void testChannel() {
    Mono<Notification> notification0 = Mono.just(new Notification(CLIENT, SERVER, "Test the Channel interaction model"));
    Mono<Notification> notification2 = Mono.just(new Notification(CLIENT, SERVER, "Test the Channel interaction model")).delayElement(Duration.ofSeconds(2));
    Mono<Notification> notification5 = Mono.just(new Notification(CLIENT, SERVER, "Test the Channel interaction model")).delayElement(Duration.ofSeconds(5));

    Flux<Notification> notifications = Flux.concat(notification0, notification5, notification0, notification2, notification2, notification2);
    // Send a request message
    Flux<Long> result = rSocketRequester
            .route("my-channel")
            .data(notifications)
            .retrieveFlux(Long.class);

    // Verify that the response messages contain the expected data
    StepVerifier
            .create(result)
            .consumeNextWith(count -> {
                assertThat(count).isEqualTo(1);
                })
...
            .consumeNextWith(count -> {
                assertThat(count).isEqualTo(6);
                })
            .thenCancel()
            .verify();
}

5. Conclusion

You have learnt how to create a server, client and a unit test for the RSocket communication models Fire-and-Forget, Request-Streams and Channel. By now, you will have the basic knowledge in order to do some exploring, experimentation yourself.

One thought on “Getting Started With RSocket Part 2”

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

This site uses Akismet to reduce spam. Learn how your comment data is processed.