Consider the following service class, which is responsible for writing to Kafka in our application:
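import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.util.MimeTypeUtils;

@Service
public class NotificationService {

    private final NotificationStreams notificationStreams;

    // The streams binding is supplied through constructor injection.
    public NotificationService(NotificationStreams notificationStreams) {
        this.notificationStreams = notificationStreams;
    }

    public void sendNotification(final Notification notification) {
        // Look up the outbound channel exposed by the binding interface.
        MessageChannel messageChannel = notificationStreams.notifyTo();
        // Wrap the payload in a message marked as JSON and send it.
        messageChannel.send(MessageBuilder.withPayload(notification)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .build());
    }
}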
In the preceding service class, the sendNotification() method uses an injected NotificationStreams object to send ...
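The flow in sendNotification() (obtain the channel, wrap the payload with a content-type header, send) can be sketched in plain Java without Spring or a Kafka broker, which is one way to see what the method does in isolation. Every type below (SimpleMessage, SimpleChannel, SimpleStreams, RecordingChannel, SketchNotificationService) is a hypothetical stand-in invented for this illustration, not part of Spring or of the application; the "contentType" header key and the "application/json" value are likewise assumptions chosen to mirror the original code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a message: a payload plus read-only headers.
final class SimpleMessage {
    private final Object payload;
    private final Map<String, Object> headers;

    SimpleMessage(Object payload, Map<String, Object> headers) {
        this.payload = payload;
        this.headers = Collections.unmodifiableMap(new HashMap<>(headers));
    }

    Object getPayload() { return payload; }
    Map<String, Object> getHeaders() { return headers; }
}

// Hypothetical stand-in for an outbound message channel.
interface SimpleChannel {
    boolean send(SimpleMessage message);
}

// Hypothetical stand-in for NotificationStreams: exposes the outbound channel.
interface SimpleStreams {
    SimpleChannel notifyTo();
}

// In-memory channel that records each message instead of writing to Kafka.
final class RecordingChannel implements SimpleChannel {
    final List<SimpleMessage> sent = new ArrayList<>();

    @Override
    public boolean send(SimpleMessage message) {
        sent.add(message);
        return true;
    }
}

// Mirrors the shape of NotificationService: look up the channel, wrap the payload, send.
final class SketchNotificationService {
    private final SimpleStreams streams;

    SketchNotificationService(SimpleStreams streams) {
        this.streams = streams;
    }

    void sendNotification(Object notification) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("contentType", "application/json"); // assumed header key and value
        streams.notifyTo().send(new SimpleMessage(notification, headers));
    }
}
```

Because SimpleStreams has a single method, a test can supply it as a lambda that returns a RecordingChannel and then inspect the recorded messages after calling sendNotification().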