Reactive WebSocket (with Spring Security, Spring-Cloud-Streams, Spring-Cloud-Functions)

Kapil Raina
12 min read · Feb 8, 2022

WebSocket is by now a well-known protocol and architecture pattern. This article collects my notes from exploring Spring's reactive support for WebSocket. The servlet-based Spring Web support for sockets, with STOMP and SockJS, is very convenient and enables many socket messaging patterns (such as broadcast and point-to-point messaging). A lot has been written about the Spring Web support for sockets (example). But IMO, the WebFlux support for WebSocket is not as clean and does not provide the convenient abstractions that Spring Web has. So implementing a sufficiently complex socket app may prove to be a little more involved.

Before moving on, it is worth mentioning RSocket, a more interesting protocol that is gaining traction. It provides binary, byte-level messaging over transports such as TCP and, in theory, covers most of the WebSocket messaging patterns.

So, the goals this article tries to demonstrate are:

1. Use Spring Security for WebSocket connections, relying on as much built-in support as possible.

2. Provide simple publishing and subscription options, and progressively use the same channels to route socket traffic outside the app to a middleware (RabbitMQ & Kafka in this case) without impacting the WebSocket session itself. This demonstrates a clean design that lets developers move from a simple option to a complex one (across environments, for example).

3. Be reactive all the way (no blocking IO, so generous use of Publishers and Subscribers).

4. Create a separation between the HTTP session and the WebSocket session by maintaining separate session repositories.

Code -> https://github.com/kapilraina/spring-reactive-websocket-chat

To get started, clone and build the project, then run it with the “sink” profile:

java -Dspring.profiles.active=wsserver,sink -jar target/springreactivewebsocketchat-0.0.1-SNAPSHOT.jar

A Docker image with these two profiles pre-configured is also available for a quick run: docker pull kapilshekharraina/rwebsockchat:1.0

Then open http://localhost:8080/ and log in with any username and the password “password”.

Briefly then, WebSocket support in WebFlux is essentially provided through implementations of the WebSocketHandler interface. The framework invokes the handler once for each socket session, passing in that session. A WebSocket session works with two publishers (Flux) bound to it: one is an unbounded stream of incoming messages and the other is an unbounded stream of outgoing messages to the socket client (thus full duplex). Since a WebSocket connection starts out as an HTTP request, Spring Security works well with sockets, in the sense that authentication and authorization via filters apply to socket apps as well. In the sample app, form-based security is used, which delegates to the built-in login and logout forms. A few synthetic users are also created for the app.

Generic Design

Ordinarily, services can be made stateless with token-based security, but WebSocket connections are, by definition, stateful. Once established, the channel stays open for the life of the session. The app maintains a repository of WebSocket sessions, which is in addition to any HTTP session repository that the security layer might have.

Reactive Socket App

The sample app to demonstrate sockets is .. hold your breath … a chat app!! It could have been anything, but full-duplex, reactive-streams-based concepts are particularly easy to demonstrate in a chat app with N-to-N producers and consumers. The chat messages are published to all consumers with a valid chat session (and not just an HTTP session). The app demonstrates a few socket message handlers and options that can be used to create the incoming and outgoing Flux for the chat session. These options are detailed in the sections below, but first, let's look at security.

A. Security

Most security concepts in WebFlux are the same as in the servlet (Spring Web) world. The difference for form-based security is in the UserDetailsService, which in the WebFlux world is a bean of type ReactiveUserDetailsService. It returns a Mono<UserDetails> for the user being authenticated, or signals a UsernameNotFoundException if the user is invalid. So authenticating a user against a database can be completely reactive.

public class CustomUserDetailService implements ReactiveUserDetailsService

The chat user repository is a simple ConcurrentHashMap, but it could be any user database. ChatUserRepository provides APIs to maintain chat WebSocket sessions and is accessed in various security filters.
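The idea of keeping chat sessions apart from HTTP sessions can be sketched with plain JDK collections. This is only an illustration and not the ChatUserRepository from the repository: the class name ChatSessionRegistry and all of its methods are assumptions made up for this example.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: tracks which users currently hold an open chat (WebSocket) session.
// It knows nothing about HTTP sessions, which stay with the security layer.
final class ChatSessionRegistry {

    // username -> WebSocket session id
    private final Map<String, String> sessionsByUser = new ConcurrentHashMap<>();

    // Records a new chat session; returns false if the user is already connected.
    boolean connect(String username, String webSocketSessionId) {
        return sessionsByUser.putIfAbsent(username, webSocketSessionId) == null;
    }

    // Removes the chat session only if it is the one currently registered for the user.
    boolean disconnect(String username, String webSocketSessionId) {
        return sessionsByUser.remove(username, webSocketSessionId);
    }

    // The WebSocket session id currently registered for the user, if any.
    Optional<String> sessionOf(String username) {
        return Optional.ofNullable(sessionsByUser.get(username));
    }

    // Snapshot of the users that should receive broadcast messages.
    Set<String> connectedUsers() {
        return Set.copyOf(sessionsByUser.keySet());
    }
}
```

Because the registry is keyed by chat session and not by HTTP session, a user can connect, disconnect and connect again without logging in a second time.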

Security Config (Class : SocksWebSecurityConfig)

Security Config

With this config, the user is challenged with the Spring-provided login page, and the Spring-provided logout page is shown on logout.

Spring Provided Pages.

After login, the user principal is available as an Authentication object that can be injected into controller methods or accessed from ReactiveSecurityContextHolder. Some users are pre-created, but to make testing easy, any user id is accepted with the password “password”.

B. App UI

After a successful login, the chat page is presented to the user (chat.html, the only page in the app). Login only establishes the HTTP session; the socket session needs to be initiated explicitly by connecting to the chat service.

Landing Page

The connect button initiates a WebSocket based chat session. Disconnect action publishes a LEAVE message for all users and then closes the socket.

A user can connect and disconnect multiple times in the same valid HTTP Session.

C. Handlers

There are 4 handlers in the app, demonstrating increasing levels of complexity in handling and multiplexing the chat traffic across many socket sessions. Of course, not all of them need to be used at once; this is just me showing the options I was able to get working. The options are given below. As can be seen, bean creation for the handlers, and for everything else in the app, is driven by the active Spring profile, so that each bean is created only for its specific usage.

@Bean
@Profile({"amqp","fuse","kafka"})
SimpleUrlHandlerMapping getDefwsh_8() {
    return new SimpleUrlHandlerMapping(Map.of("/ws/chat", wshbean8Chat), 10);
}

@Bean
@Profile({"localpubsub"})
SimpleUrlHandlerMapping getDefwsh_7() {
    return new SimpleUrlHandlerMapping(Map.of("/ws/chat", wshbean7Chat), 10);
}

@Bean
@Profile({"flowreg"})
SimpleUrlHandlerMapping getDefwsh_6() {
    return new SimpleUrlHandlerMapping(Map.of("/ws/chat", wshbean6Chat), 10);
}

@Bean
@Profile({"sink"})
SimpleUrlHandlerMapping getDefwsh_5() {
    return new SimpleUrlHandlerMapping(Map.of("/ws/chat", wshbean5Chat), 10);
}
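As a quick summary of the @Profile annotations above, the profile-to-handler wiring can be written down as a plain lookup table. The class HandlerProfiles is hypothetical and exists only for this illustration; the profile and bean names are the ones used in the app.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical helper: restates which handler bean is mapped to /ws/chat for each profile.
final class HandlerProfiles {

    private static final Map<String, String> HANDLER_BY_PROFILE = Map.of(
            "sink", "wshbean5Chat",
            "flowreg", "wshbean6Chat",
            "localpubsub", "wshbean7Chat",
            "fuse", "wshbean8Chat",
            "amqp", "wshbean8Chat",
            "kafka", "wshbean8Chat");

    // Returns the handler bean name for the given (non-null) profile, if one is mapped.
    static Optional<String> handlerFor(String profile) {
        return Optional.ofNullable(HANDLER_BY_PROFILE.get(profile));
    }
}
```

The last three profiles share one handler, which is the point of the later sections: the middleware changes, the session handling code does not.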

C.1 Reactive Streams Sinks

profile : sink

Run-As : java -Dspring.profiles.active=wsserver,sink -jar target/springreactivewebsocketchat-0.0.1-SNAPSHOT.jar

Handler Bean : wshbean5Chat

Sinks

Sinks is a Reactor construct that allows programmatic streaming from a publisher to one or many consumers. This is a relatively simple option that only works within a single instance of the app, but it is very convenient for multiplexing across multiple WebSocket sessions. Each WebSocket session subscribes to a shared Sinks.Many and gets its own Flux, which is used to construct the outbound Flux for the WebSocketSession.
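To see the multicast idea in isolation, here is a minimal sketch that swaps Reactor's Sinks.Many for the JDK's java.util.concurrent.SubmissionPublisher, so that it runs without any dependencies. It is an analogue, not the app's code: MulticastSketch, SessionSubscriber and broadcast are names invented for this example, and plain strings stand in for ChatMessage.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

// JDK-only analogue of a multicast sink: one shared publisher, many chat "sessions".
final class MulticastSketch {

    // Collects every message it receives; stands in for one session's outbound stream.
    static final class SessionSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        private final CountDownLatch done;

        SessionSubscriber(CountDownLatch done) {
            this.done = done;
        }

        @Override public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE); // unbounded demand
        }
        @Override public void onNext(String message) { received.add(message); }
        @Override public void onError(Throwable error) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    // Publishes the messages to `sessions` subscribers and returns what each one received.
    static List<List<String>> broadcast(int sessions, List<String> messages) {
        CountDownLatch done = new CountDownLatch(sessions);
        List<SessionSubscriber> subscribers = new ArrayList<>();
        try (SubmissionPublisher<String> sink = new SubmissionPublisher<>()) {
            for (int i = 0; i < sessions; i++) {
                SessionSubscriber subscriber = new SessionSubscriber(done);
                subscribers.add(subscriber);
                sink.subscribe(subscriber); // each session subscribes to the shared sink
            }
            messages.forEach(sink::submit); // every subscriber gets every message
        } // close() signals onComplete once the submitted items are delivered
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("subscribers did not complete in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscribers.stream()
                .map(s -> List.copyOf(s.received))
                .collect(Collectors.toList());
    }
}
```

Calling MulticastSketch.broadcast(3, List.of("JOIN", "hello", "LEAVE")) hands the same three messages, in order, to each of the three subscribers. As with Sinks.Many, all of this lives inside a single JVM.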

The app creates a single instance of Sinks.Many.

@Bean
@Profile({"sink"})
Many<ChatMessage> chatMessageStream() {
    return Sinks.many().multicast().<ChatMessage>onBackpressureBuffer();
}

The WebSocketHandler bean for this option uses the Sinks.Many to create respective Flux

@Bean(name = "wshbean5Chat")
@Profile({"sink"})
WebSocketHandler wshbean5Chat(
        Many<ChatMessage> chatSessionStream,
        ChatUtils utils,
        ObjectMapper mapper,
        ChatUserRepository repo
) throws JsonProcessingException {
    return session -> {
        // Inbound: every message received on this socket is pushed into the shared sink.
        Mono<Void> inbound = session
                .receive()
                .map(wsm -> wsm.getPayloadAsText())
                .....................
                .doOnNext(chatSessionStream::tryEmitNext)
                .then();

        // Outbound: this session's own Flux view of the shared sink.
        Flux<WebSocketMessage> sessionOutboundFlux = chatSessionStream.asFlux()
                .map(
                ................
        Mono<Void> outbound = session.send(sessionOutboundFlux);
        return Mono.zip(inbound, outbound).then();
    };
}

Sinks basically work with publishers and consumers in the same JVM. So although this handler works well for development and testing on a single JVM, we need a design that abstracts the multiplexing (pub/sub) away from the socket handler and potentially moves it outside the JVM. The next options build on a special channel for reactive apps in Spring Integration, known as FluxMessageChannel.

A note on FluxMessageChannel

Spring Integration has extended its support for reactive streams. The EIP concepts of Channel, Adapter, Binder, Binding etc. remain the same. Reactive support in Spring Integration means that a channel can be a Flux publisher or a Flux consumer. This special channel is the FluxMessageChannel. FluxMessageChannel implements MessageChannel and Publisher<Message<?>>, and has an internal Flux that acts as a sink for incoming messages streamed via the send() API. Subscribers can subscribe to that internal Flux as well. So FluxMessageChannel is a publisher as well as a subscriber, and it is the basis for the implementation in the next options. The reason a channel concept is used for sockets is that it abstracts the core domain from the low-level integration and streaming operations. The same channel can have any binder and adapter and can integrate with any middleware without impacting the core code. And even if there is no middleware, the FluxMessageChannel provides a convenient Flux that multiple socket sessions can publish and subscribe to. When middleware is added, the socket session handling code does not change.
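The "publisher as well as subscriber" shape of such a channel can be sketched with the JDK's Flow API. This is a rough JDK-only analogue and not Spring Integration's implementation: ChatChannel and ChatChannelDemo are hypothetical names, and their send and subscribeTo methods only mimic the roles of MessageChannel.send and FluxMessageChannel.subscribeTo.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical channel: accepts messages via send() or from upstream publishers,
// and republishes them to every downstream subscriber.
final class ChatChannel extends SubmissionPublisher<String> {

    // Imperative entry point.
    void send(String message) {
        submit(message);
    }

    // Pipes an upstream publisher (one session's inbound stream) into this channel.
    void subscribeTo(Flow.Publisher<String> upstream) {
        upstream.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(String message) { submit(message); }
            @Override public void onError(Throwable t) { /* a failed session must not close the channel */ }
            @Override public void onComplete() { /* other sessions keep publishing */ }
        });
    }
}

final class ChatChannelDemo {

    // Two "sessions" publish into one channel; one downstream subscriber sees all messages.
    static List<String> run() {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch allDelivered = new CountDownLatch(3);
        try (ChatChannel channel = new ChatChannel();
             SubmissionPublisher<String> alice = new SubmissionPublisher<>();
             SubmissionPublisher<String> bob = new SubmissionPublisher<>()) {

            channel.subscribe(new Flow.Subscriber<String>() {
                @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
                @Override public void onNext(String message) {
                    received.add(message);
                    allDelivered.countDown();
                }
                @Override public void onError(Throwable t) { }
                @Override public void onComplete() { }
            });
            channel.subscribeTo(alice);
            channel.subscribeTo(bob);

            alice.submit("alice: hi");
            bob.submit("bob: hello");
            channel.send("system: welcome");

            // Wait inside the try block so nothing is closed before delivery.
            if (!allDelivered.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("messages were not delivered in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return List.copyOf(received);
    }
}
```

The order in which messages from different sessions arrive is not deterministic here. What matters is that the sessions only ever talk to the channel, so whatever sits behind the channel can change without touching them.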

FluxMessageChannel

C.2 FluxMessageChannel — Manual IntegrationFlow Registration

profile: flowreg

Run-As: java -Dspring.profiles.active=wsserver,flowreg -jar target/springreactivewebsocketchat-0.0.1-SNAPSHOT.jar

Handler Bean: wshbean6Chat

In this flow, a FluxMessageChannel bean is created. This bean is used to create an IntegrationFlow programmatically and register the flow in the IntegrationFlowContext. Although there is a declarative way to create an IntegrationFlow, the programmatic way enables on-the-fly message streaming cases, such as point-to-point messaging between users, or creating new groups in the UI so that messages are multiplexed only to the users subscribed to that integration. User-to-user messaging is not shown in the app (left as an exercise, maybe).
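The "register on the fly, but only once" idea behind dynamic flows can be illustrated without Spring Integration. The sketch below is purely hypothetical: GroupFlowRegistry and its methods are invented for this example, and it only tracks group membership where a real flow would also route messages.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry: creates a group's routing entry on first use and reuses it afterwards.
final class GroupFlowRegistry {

    // flow id -> usernames subscribed to that flow
    private final Map<String, Set<String>> membersByFlowId = new ConcurrentHashMap<>();

    // Registers the flow if it is missing; returns true only for the call that created it.
    boolean registerIfAbsent(String flowId) {
        return membersByFlowId.putIfAbsent(flowId, ConcurrentHashMap.newKeySet()) == null;
    }

    // Adds a user to a group, registering the group on the fly when needed.
    void join(String flowId, String username) {
        membersByFlowId.computeIfAbsent(flowId, id -> ConcurrentHashMap.newKeySet()).add(username);
    }

    // Users that a message sent to this flow should reach; empty for unknown flows.
    Set<String> recipients(String flowId) {
        return Set.copyOf(membersByFlowId.getOrDefault(flowId, Set.of()));
    }
}
```

Registering under a stable id is what makes the operation safe to repeat for every new session: the second and later calls find the existing entry and do nothing.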

FluxMessageChannel bean is created

@Bean
@Qualifier("pubfmc")
@Profile({"!sink"})
FluxMessageChannel pubfmc() {
    FluxMessageChannel fmc = new FluxMessageChannel();
    return fmc;
}

A WebSocket handler for this option uses the FluxMessageChannel to subscribe to the WebSocket session's inbound Flux, registers the IntegrationFlow, and pipes the channel's Flux to the WebSocket session's outbound Flux.

@Bean(name = "wshbean6Chat")
@Profile({"flowreg"})
WebSocketHandler wshbean6Chat(
        ChatUtils utils,
        @Qualifier("pubfmc") FluxMessageChannel pubfmcreg,
        ObjectMapper mapper,
        ChatUserRepository repo
) throws JsonProcessingException {
    return session -> {
        Flux<Message<ChatMessage>> incomingFlux = utils.prepareIncomingFlux(session, mapper, repo);
        IntegrationFlow standardIntegrationFlow = IntegrationFlows.from(((MessageChannel) pubfmcreg)).bridge().nullChannel();
        // Register the flow only once. The registration is given the same id that the
        // lookup checks; without an explicit id the lookup would never find it.
        if (integrationFlowContext.getRegistrationById("globalChatIntegration") == null) {
            integrationFlowContext.registration(standardIntegrationFlow).id("globalChatIntegration").register();
        }
        // Pipe this session's inbound messages into the shared channel.
        pubfmcreg.subscribeTo(incomingFlux);

        // Every session reads from the same channel to build its outbound stream.
        Flux<WebSocketMessage> sessionOutboundFlux = utils.prepareOutBoundFlux(session, pubfmcreg, mapper);
        Mono<Void> outbound = session.send(sessionOutboundFlux);
        return outbound;
    };
}

C.3 FluxMessageChannel — Local Unique PubSub

profile: localpubsub

Run-As: java -Dspring.profiles.active=wsserver,localpubsub -jar target/springreactivewebsocketchat-0.0.1-SNAPSHOT.jar

Handler Bean: wshbean7Chat

Like C.2, this option also uses the Flux of the FluxMessageChannel, but it does not register any dynamic IntegrationFlow. It restricts the FluxMessageChannel to a pure reactive pub/sub channel. Like Sinks.Many, this option works with publishers and subscribers in the same JVM.

@Bean(name = "wshbean7Chat")
@Profile({"localpubsub"})
WebSocketHandler wshbean7Chat(
        ChatUtils utils,
        @Qualifier("pubfmc") FluxMessageChannel fmcpubsub,
        ObjectMapper mapper,
        ChatUserRepository repo
) throws JsonProcessingException {
    return session -> {
        Flux<Message<ChatMessage>> incomingFlux = utils.prepareIncomingFlux(session, mapper, repo);
        fmcpubsub.subscribeTo(incomingFlux);

        Flux<WebSocketMessage> sessionOutboundFlux = utils.prepareOutBoundFlux(session, fmcpubsub, mapper);

        Mono<Void> outbound = session.send(sessionOutboundFlux);
        return outbound;
    };
}

C.3 FluxMessageChannel — Separate Pub and Sub Channels, with IntegrationFlow

profile: fuse

Run-As: java -Dspring.profiles.active=wsserver,fuse -jar target/springreactivewebsocketchat-0.0.1-SNAPSHOT.jar

Handler Bean: wshbean8Chat

Fuse Pub Sub

To prepare for the real world cases where inbound and outbound channels are separate and decoupled, this option works with two FluxMessageChannel beans and creates an IntegrationFlow object that ‘fuses’ these two channels together. These channels can later be adapted to stream to a downstream messaging middleware with no impact to session handling code.

@Bean(name = "wshbean8Chat")
@Profile({"amqp", "fuse","kafka"})
WebSocketHandler wshbean8Chat(
        ChatUtils utils,
        @Qualifier("pubfmc") FluxMessageChannel fmcpub,
        @Qualifier("subfmc") FluxMessageChannel fmcsub,
        ObjectMapper mapper,
        ChatUserRepository repo
) throws JsonProcessingException {
    return session -> {
        logger.info("[8] New Chat Session Initiated : " + session.getId());

        Flux<Message<ChatMessage>> incomingFlux = utils.prepareIncomingFlux(session, mapper, repo);
        fmcpub.subscribeTo(incomingFlux);

        Flux<WebSocketMessage> sessionOutboundFlux = utils.prepareOutBoundFluxFromRaw(session, fmcsub, mapper);

        Mono<Void> outbound = session.send(sessionOutboundFlux);
        return outbound;
    };
}

@Bean
@Profile("fuse")
IntegrationFlow fluxItegration(@Qualifier("pubfmc") FluxMessageChannel fmcin, @Qualifier("subfmc") FluxMessageChannel fmcout) {
    return IntegrationFlows.from(((MessageChannel) fmcin))
            .channel(fmcout).get();
}

C.4 FluxMessageChannel — Separate Pub and Sub Channels, with AMQP Middleware

profile: amqp

Run-As: java -Dspring.profiles.active=wsserver,amqp -jar -Dserver.port=xxxx target/springreactivewebsocketchat-0.0.1-SNAPSHOT.jar

Handler Bean: wshbean8Chat

AMQP

The same handler bean with a different profile triggers the binder for RabbitMQ, which is used to publish the socket messages to a RabbitMQ exchange and to read from the queue that is bound to the exchange. The Flux from each of the two channels is used to create the outbound and inbound messages.

Spring Cloud Stream provides the binder support for RabbitMQ and, from 3.x onward, a functional programming model that abstracts the binding and binder configuration and allows plain Java functions to declare the core publishers and subscribers. Java Supplier and Consumer functions are used to define the bindings as configuration. When the amqp profile is used, application-amqp.yaml is read and connections to RabbitMQ are initiated. The FluxMessageChannels are injected into the Supplier and Consumer functions to pipe the inbound and outbound Flux, which Spring Cloud Function uses to publish and subscribe the socket messages to RabbitMQ.

@Profile({"amqp","kafka"})
@Configuration
public class IntegrationStreamFunctions {
    @Autowired
    @Qualifier("pubfmc")
    FluxMessageChannel pubfmc;

    @Autowired
    @Qualifier("subfmc")
    FluxMessageChannel subfmc;

    @Bean
    public Supplier<Flux<Message<?>>> globalchatpubchannel() {
        return () -> {
            return Flux.from(pubfmc);
        };
    }

    @Bean
    public Consumer<Flux<Message<?>>> globalchatsubchannel() {
        return cFlux -> {
            cFlux.log().subscribe(m -> {
                subfmc.send(m);
            });
        };
    }
}

The Supplier and Consumer names are used to define the bindings via the application-amqp.yaml for the Cloud Functions and Cloud Stream to use.
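The binding names in the YAML follow Spring Cloud Stream's functional naming convention: the function bean name, then -in- or -out-, then a zero-based index. A tiny sketch of that convention follows; BindingNames is a hypothetical helper written for this illustration and is not part of Spring.

```java
// Hypothetical helper: derives functional binding names from a function bean name.
final class BindingNames {

    // Binding for data flowing into the function (a Consumer's input).
    static String input(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    // Binding for data flowing out of the function (a Supplier's output).
    static String output(String functionName, int index) {
        return functionName + "-out-" + index;
    }
}
```

So the Supplier bean globalchatpubchannel is configured under globalchatpubchannel-out-0, and the Consumer bean globalchatsubchannel under globalchatsubchannel-in-0.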

spring:
  cloud:
    stream:
      function:
        definition: globalchatpubchannel;globalchatsubchannel
      source: globalchatpubchannel;globalchatsubchannel
      bindings:
        globalchatpubchannel-out-0:
          binder: rabbit-binder
          destination: ex.globalchatexchange
          content-type: application/json
          group: q.globalchatqueue
        globalchatsubchannel-in-0:
          binder: rabbit-binder
          destination: ex.globalchatexchange
          content-type: application/json
          group: q.globalchatqueue
      rabbit:
        bindings:
          globalchatpubchannel-out-0:
            producer:
              exchangeType: topic
              bindQueue: false
              declareExchange: false
              queueNameGroupOnly: true
          globalchatsubchannel-in-0:
            consumer:
              bindQueue: false
              exchangeType: topic
              declareExchange: false
              queueNameGroupOnly: true

      binders:
        rabbit-binder:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: rabbitmq-master
                port: 5672
      default-binder: rabbit

I created the RabbitMQ cluster on a local Docker host and used the RabbitMQ master's DNS name (resolved via the local hosts file). This makes it easy to port the app to a higher environment, where the same host name is mapped to a different cluster.

Run the app multiple times, each on a different port, to test that consumers across different JVMs are able to connect their inbound and outbound channels to RabbitMQ.

C.5 FluxMessageChannel — Separate Pub and Sub Channels, with Kafka Middleware

profile: kafka

Run-As: java -Dspring.profiles.active=wsserver,kafka -jar -Dserver.port=xxxx target/springreactivewebsocketchat-0.0.1-SNAPSHOT.jar

Handler Bean: wshbean8Chat

Kafka

The same handler bean with a different profile triggers the binder for Kafka, which is used to publish the socket messages to a Kafka topic. All the consumers poll the same topic and read the messages into the inbound channel. The Supplier and Consumer stream functions work exactly as for RabbitMQ before.

Spring Cloud Stream provides the binder support for Kafka and, even better, Spring Cloud Function abstracts the binding and binder configuration and allows plain Java functions to declare the core handler part. The Java Supplier and Consumer functions are used to define the bindings as configuration. When the kafka profile is used, application-kafka.yaml is read and connections to the Kafka broker are initiated. The FluxMessageChannels are injected into the Supplier and Consumer functions to pipe the inbound and outbound Flux, which Spring Cloud Function uses to publish and subscribe the socket messages to the Kafka topic.

The Supplier and Consumer names are used to define the bindings via the application-kafka.yaml for the Cloud Functions and Cloud Stream to use.

spring:
  cloud:
    stream:
      function:
        definition: globalchatpubchannel;globalchatsubchannel
      source: globalchatpubchannel;globalchatsubchannel
      bindings:
        globalchatpubchannel-out-0:
          destination: to.globalchattopic
          producer:
            applicationId: globalchat-app-${server.port}
        globalchatsubchannel-in-0:
          destination: to.globalchattopic
          consumer:
            use-native-decoding: true
            startOffset: latest
            resetOffsets: true
            enableDlq: true
      kafka:
        binder:
          brokers: kafka1:9092
          autoCreateTopics: true
          autoAddPartitions: true
          minPartitionCount: 1
        streams:
          binder:
            configuration:
              commit.interval.ms: 1000
              startOffset: latest
              resetOffsets: true
      default-binder: kafka
  kafka:
    consumer:
      auto-offset-reset: latest

I created a simple broker locally using Confluent Kafka. kafka1 is a locally resolved DNS name for the single broker (via the hosts file).

Run the app multiple times, each on a different port, to test that consumers across different JVMs are able to connect their inbound and outbound channels to Kafka.

With a simple config change, the chat messages are streamed via a different messaging middleware.

In the end

Near Home!!!!

Kapil Raina

Tech Architect | Unpredictably Curious | Deep Breathing