The last step in implementing user-to-user messages is to apply a filter to OutboundChatService. Since we coded up UserParsingHandshakeHandler, we have to adjust the service to handle this:
@Service @EnableBinding(ChatServiceStreams.class) public class OutboundChatService extends UserParsingHandshakeHandler { ... }
For starters, we need to change this class to extend UserParsingHandshakeHandler instead of WebSocketHandler.
There's no need to alter the constructor call where our FluxSink is configured. However, the handler itself must be adjusted as follows:
@Override protected Mono<Void> handleInternal(WebSocketSession session) { return session .send(this.flux .filter(s -> validate(s, getUser(session.getId()))) ...