Kafka Stream API Json Parse



Hello, in this article I will talk about how to process data coming into a Kafka queue with the Kafka Streams API.

We can send data from various sources to the Kafka queue. The data waiting in the queue can be in formats such as JSON, Avro, etc., or it can be just single String or Integer values.

If the incoming data consists of plain String or Integer values, it is easy for us to process.

I will run this example on my Windows machine using Java. However, after building the code and packaging it as a jar, you can run it on the Hortonworks Sandbox by editing the necessary ports.

To follow along with the example, you must have Apache Kafka installed on your Windows (or Linux) machine.
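If you have not started a broker yet, the commands below are a minimal sketch for a local single-node setup (assuming you run them from the Kafka installation directory and use a Kafka version that still ships with ZooKeeper; paths may differ for your install):

bin\windows\zookeeper-server-start.bat config\zookeeper.properties
bin\windows\kafka-server-start.bat config\server.properties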

I use the IntelliJ IDEA IDE as my development environment.
Let’s create a new Maven project.

After the project is created, a screen like the one below will greet us.

Let’s go to the Maven repository and add the necessary dependencies for our project.

Copy the required dependencies and paste them into the pom.xml file.
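A minimal sketch of what those dependencies can look like is shown below. The artifact coordinates are the standard Kafka ones; the version numbers here are assumptions, so align them with the Kafka version you are running:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.0.0</version>
</dependency>
<!-- provides org.apache.kafka.connect.json.JsonSerializer and JsonDeserializer -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>connect-json</artifactId>
    <version>2.0.0</version>
</dependency>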

After adding the dependencies, refresh the pom.xml file.

Now create a new Java class. This is where we will write the Producer.

After creating the main function, the first thing we need to do is define the Properties used to communicate with Kafka.

Properties properties = new Properties();


properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

properties.setProperty(ProducerConfig.ACKS_CONFIG, "all"); // strongest producing guarantee
properties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "1");

properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // ensure we don't push duplicates

Let’s write a small function that generates a transaction record; its return type should be ProducerRecord<String, String>.

public static ProducerRecord<String, String> newRandomTransaction(String name) {

    ObjectNode transaction = JsonNodeFactory.instance.objectNode();


    transaction.put("name", "Jack");
    transaction.put("amount", 100);
    return new ProducerRecord<>("test2", name, transaction.toString());
}

The data we want to send will be in JSON format, so we build an ObjectNode through JsonNodeFactory.

Then we put the fields we want to send into it with the put() method.

Now all we have to do is create a record with the function we just wrote and hand it to the Producer.

Producer<String, String> producer = new KafkaProducer<>(properties);


producer.send(newRandomTransaction("Denis"));
producer.close();

Very good, now a JSON message like {"name": "Denis", "amount": 100} will go to the Kafka queue.
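If you want to check this quickly, you can watch the topic with the console consumer that ships with Kafka (assuming the bin\windows scripts are on your PATH):

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test2 --from-beginning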

Let’s read the data written to the queue as a stream and move on to the processing step. Create a new class.

Let’s define the properties required to read from the Kafka queue. Note that setting CACHE_MAX_BYTES_BUFFERING_CONFIG to 0 disables record caching, so every aggregation update is emitted immediately; that makes the demo easier to follow.

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");

Since the incoming data will be in JSON format, we need a Serializer and a Deserializer for JsonNode, which we combine into a Serde.

final Serializer<JsonNode> jsonNodeSerializer = new JsonSerializer();
final Deserializer<JsonNode> jsonNodeDeserializer = new JsonDeserializer();
final Serde<JsonNode> jsonNodeSerde = Serdes.serdeFrom(jsonNodeSerializer,jsonNodeDeserializer);
StreamsBuilder builder = new StreamsBuilder();
KStream<String, JsonNode> textLines = builder.stream("test2", Consumed.with(Serdes.String(), jsonNodeSerde));

We are going to read the “test2” topic; adjust this to match your own topic name.

Next, we define the JSON object that will serve as the initial value of the aggregation; it also determines the shape of the output.

In this example, I group the people in the incoming data and sum up their account balances.

ObjectNode initialbalance = JsonNodeFactory.instance.objectNode();
initialbalance.put("count",0);
initialbalance.put("balance",0);

For each unique person, each new transaction is aggregated with that person’s previous balance.

private static JsonNode newBalance(JsonNode transaction, JsonNode balance){
    ObjectNode newBalance = JsonNodeFactory.instance.objectNode();
    newBalance.put("count",balance.get("count").asInt()+1);
    newBalance.put("balance",balance.get("balance").asInt()+transaction.get("amount").asInt());

    return newBalance;

}

We come to the most important part of the code piece.

  • The incoming JSON records are first grouped by key with groupByKey.
  • Next, the JSON object we created above is used as the initializer of the aggregation.
  • Then each incoming JSON record, together with the current balance, is passed to the function we just created, and the summed balance is returned.
  • The result of these operations is converted with toStream and written to a new topic.
  • Finally, the stream is started with KafkaStreams.

KTable<String, JsonNode> bankBalance = textLines
        .groupByKey(Serialized.with(Serdes.String(), jsonNodeSerde))
        .aggregate(
                () -> initialbalance,
                (key,transaction,balance) -> newBalance(transaction, balance),

                Materialized.<String, JsonNode, KeyValueStore<Bytes, byte[]>>as("bank-balance-agg")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(jsonNodeSerde)
        );
bankBalance.toStream().to("out", Produced.with(Serdes.String(), jsonNodeSerde));




KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.cleanUp();
streams.start();
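The snippet above never stops the application. As a small optional addition (not part of the original code, just a common pattern), you can register a shutdown hook so the Streams instance closes cleanly when the JVM exits:

Runtime.getRuntime().addShutdownHook(new Thread(streams::close));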

The full version of the Producer class:

import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProducerClass{

    public static void main(String[] args) {
        Properties properties = new Properties();

        // kafka bootstrap server
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // producer acks
        properties.setProperty(ProducerConfig.ACKS_CONFIG, "all"); // strongest producing guarantee
        properties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
        properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "1");
        // leverage idempotent producer from Kafka 0.11 !
        properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // ensure we don't push duplicates

        Producer<String, String> producer = new KafkaProducer<>(properties);


        producer.send(newRandomTransaction("Jack"));
        producer.close();
    }
    public static ProducerRecord<String, String> newRandomTransaction(String name) {

        ObjectNode transaction = JsonNodeFactory.instance.objectNode();


        transaction.put("name", "Jack");
        transaction.put("amount", 100);
        return new ProducerRecord<>("test2", name, transaction.toString());
    }
}

The full version of the Stream class:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;
import java.util.Properties;


public class KafkaStream {

    public static void main(final String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        //props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,StreamsConfig.EXACTLY_ONCE);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");


        final Serializer<JsonNode> jsonNodeSerializer = new JsonSerializer();
        final Deserializer<JsonNode> jsonNodeDeserializer = new JsonDeserializer();
        final Serde<JsonNode> jsonNodeSerde = Serdes.serdeFrom(jsonNodeSerializer,jsonNodeDeserializer);


        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, JsonNode> textLines = builder.stream("test2", Consumed.with(Serdes.String(), jsonNodeSerde));

        ObjectNode initialbalance = JsonNodeFactory.instance.objectNode();
        initialbalance.put("count",0);
        initialbalance.put("balance",0);


        KTable<String, JsonNode> bankBalance = textLines
                .groupByKey(Serialized.with(Serdes.String(), jsonNodeSerde))
                .aggregate(
                        () -> initialbalance,
                        (key,transaction,balance) -> newBalance(transaction, balance),

                        Materialized.<String, JsonNode, KeyValueStore<Bytes, byte[]>>as("bank-balance-agg")
                                .withKeySerde(Serdes.String())
                                .withValueSerde(jsonNodeSerde)
                );
        bankBalance.toStream().to("out", Produced.with(Serdes.String(), jsonNodeSerde));

        bankBalance.toStream().print(Printed.toSysOut());


        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.cleanUp();
        streams.start();
    }
    private static JsonNode newBalance(JsonNode transaction, JsonNode balance){
        ObjectNode newBalance = JsonNodeFactory.instance.objectNode();
        newBalance.put("count",balance.get("count").asInt()+1);
        newBalance.put("balance",balance.get("balance").asInt()+transaction.get("amount").asInt());

        return newBalance;

    }

}

Let’s test it now

1-) First, run the Kafka console consumer from cmd.

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic out --from-beginning --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

2-) Run the stream class, then run the Producer class 3 times and watch the consumer.
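Assuming the Producer class is left with the key "Jack" and the fixed amount of 100, the consumer output should look roughly like this, with each run adding one more transaction to the running balance:

Jack    {"count":1,"balance":100}
Jack    {"count":2,"balance":200}
Jack    {"count":3,"balance":300}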

Everything seems successful.

I hope it was useful.

Thank you.

About Deniz Parlak

Hi, I’m a Security Data Scientist & Data Engineer at My Security Analytics. I have experience with advanced Python, machine learning, and big data tools. I have also worked on Oracle database administration, migration, and upgrade projects. For your questions: [email protected]
