The final topology

We now have all the components needed to build our log analysis topology, which is defined as follows:

public class LogAnalysisTopology {

    public static StormTopology buildTopology() {
        TridentTopology topology = new TridentTopology();
        StaticHosts kafkaHosts = KafkaConfig.StaticHosts.fromHostString(
                Arrays.asList(new String[] { "localhost" }), 1);
        TridentKafkaConfig spoutConf = new TridentKafkaConfig(kafkaHosts, "log-analysis");
        spoutConf.scheme = new StringScheme();
        spoutConf.forceStartOffsetTime(-1);
        OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);
        Stream spoutStream = topology.newStream("kafka-stream", spout);
        Fields jsonFields = new Fields("level", "timestamp", "message", "logger");
        Stream parsedStream = spoutStream.each(new ...
