Apache Kafka is one of the most popular and scalable message brokers.

Apache Kafka terminology

Note that it is also possible to have custom partition-to-consumer assignment logic.
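For instance, the assignment strategy used by a consumer group can be swapped through consumer configuration. This is a minimal sketch using the RoundRobinAssignor that ships with the Kafka client; fully custom logic would mean implementing the assignor interface yourself:

import java.util.Properties;

Properties props = new Properties();
//swap the default range assignor for round-robin assignment across consumers
props.put("partition.assignment.strategy",
        "org.apache.kafka.clients.consumer.RoundRobinAssignor");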

Producer Example

        import java.util.Properties;
        import org.apache.kafka.clients.producer.KafkaProducer;
        import org.apache.kafka.clients.producer.Producer;
        import org.apache.kafka.clients.producer.ProducerRecord;

        Properties props = new Properties();
        //list of brokers used to connect to the kafka cluster
        props.put("bootstrap.servers", "localhost:9092,localhost:9093");
        //for kafka, messages are just arrays of bytes, hence we need a serializer to convert strings to byte arrays
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //instantiate the kafka producer object
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 1; i <= 10; i++) {
            //create a record to be sent
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("exampleTopicName", i + "exampleKey", i + "exampleMsgValue");
            //add the record to the output buffer and return right away
            producer.send(record);
        }
        producer.close();

The above code shows how to send a String message to Kafka. Kafka messages are often key-value pairs, although the key is not mandatory. The most detailed constructor for creating a producer record is ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value). The key parameters are the topic the record is sent to, an optional explicit partition, an optional timestamp (filled in automatically when null), the key (used to pick a partition when no explicit partition is given) and the message value itself.

There are 3 approaches to send a message to Kafka: fire-and-forget (send and do not wait for the result), synchronous send (block on the returned Future) and asynchronous send (register a callback that is invoked once the broker acknowledges the record).
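A minimal sketch of the three send styles, assuming the producer and record are set up as in the example above:

import org.apache.kafka.clients.producer.RecordMetadata;

//1. fire and forget: hand the record to the client's buffer and move on
producer.send(record);

//2. synchronous send: block until the broker acknowledges (or the send fails)
try {
    RecordMetadata metadata = producer.send(record).get();
    System.out.println("stored at offset " + metadata.offset());
} catch (Exception e) {
    e.printStackTrace();
}

//3. asynchronous send: register a callback, keep sending in the meantime
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace(); //delivery failed
    }
});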

Consumer Example

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
props.put("group.id", "groupName");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//custom deserializer class for the message value (use its fully qualified name)
props.put("value.deserializer", "SampleRecordDeserializer");
props.put("enable.auto.commit", "false"); //do not commit offsets automatically to the kafka broker

//consumer instantiated
KafkaConsumer<String, SampleRecord> consumer = new KafkaConsumer<>(props);
//consumer specifies the topic(s) to which it is subscribing
consumer.subscribe(Arrays.asList("exampleTopic"));
while (true) {
    //consumer pulls records from kafka, waiting up to 10 seconds if none are available
    ConsumerRecords<String, SampleRecord> records = consumer.poll(Duration.ofMillis(10000));
    //iterate through the fetched records in the same thread
    for (ConsumerRecord<String, SampleRecord> record : records) {
        //process the record here
    }
    //synchronous commit, a blocking call
    consumer.commitSync();
    //async commit is the other possibility for faster processing:
    //consumer.commitAsync();
}

Note that there is no need to explicitly create a group: all consumers that share the same group.id property become part of the same consumer group. The group.id property is not mandatory. If you do not specify it, you are creating an independent consumer, and you will read all of the data for the topic irrespective of the number of partitions in the topic. In a microservice architecture the service name is a good group id: if multiple instances of the service are running, they all become part of the same consumer group, and if only one instance is running, it forms a consumer group with a single member. Note that the poll function takes a timeout as an argument (how quickly you want the poll method to return, with or without data). Long polling is very useful, as we do not want the consumer to poll continuously when no data is available. Besides fetching records, the poll function also takes care of consumer group coordination: joining the group and taking part in partition rebalances.
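For the independent-consumer case, partitions are taken over manually with assign() instead of subscribe(), since subscribe() requires a group. A minimal sketch, with props as above but without group.id (the partition numbers are made up):

import java.util.Arrays;
import org.apache.kafka.common.TopicPartition;

//no group.id in props: this consumer is independent and manages its own partitions
KafkaConsumer<String, SampleRecord> consumer = new KafkaConsumer<>(props);
consumer.assign(Arrays.asList(
        new TopicPartition("exampleTopic", 0),
        new TopicPartition("exampleTopic", 1)));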

Offset Management in Apache Kafka

Kafka maintains two types of offset: the current offset (the consumer's position, i.e. the offset of the next record that poll will hand out) and the committed offset (the last offset the consumer has durably stored back to Kafka).

The consumer stores offsets to Kafka automatically when enable.auto.commit is set to true. The Kafka consumer will auto-commit the offset of the last message received in response to its poll() call. Only offsets of records returned by previous poll calls are committed, i.e. when a poll() is executed, the offsets of the records returned by the previous poll() are committed. Since processing happens between poll calls, offsets of unprocessed records will never be committed; this guarantees at-least-once delivery semantics. Note that auto.commit.interval.ms determines the minimum delay between commits: the commit performed on poll() only happens if the interval defined by auto.commit.interval.ms has elapsed. If the consumer crashes after processing some of the records returned from a poll but before the next commit, those records will be redelivered. If the consumer is idempotent, this is a good model.

Automatic offset committing can be disabled, in which case the application itself must take care of manually committing the offsets of processed records. A manual commit can be done in 2 ways: with commitSync(), which blocks until the commit succeeds or fails with an unrecoverable error, or with commitAsync(), which returns immediately and optionally reports the result to a callback.
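A minimal sketch of both styles, assuming the consumer from the example above:

//1. synchronous: blocks the polling thread and retries until it succeeds or fails hard
consumer.commitSync();

//2. asynchronous: returns immediately; the optional callback reports the outcome
consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        System.err.println("commit failed for " + offsets + ": " + exception.getMessage());
    }
});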

Note that it is possible to commit a particular offset instead of committing the latest offset. The no-argument synchronous and asynchronous commits only commit the latest offsets returned by the last poll; the overloads that take a map of partitions to offsets let you commit an arbitrary position.
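A minimal sketch of committing a specific offset, assuming record comes from the iteration loop in the consumer example; the committed value is the offset of the next record to be read, hence the + 1:

import java.util.Collections;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

//commit the position just after the record that was fully processed
consumer.commitSync(Collections.singletonMap(
        new TopicPartition(record.topic(), record.partition()),
        new OffsetAndMetadata(record.offset() + 1)));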

Idempotent consumer

The act of processing the record and committing the offset are not atomic. This can lead to the same record being delivered twice: the consumer could crash after processing a record but before committing its offset, in which case the message will be delivered to the consumer again. To make the processing of the record (with its updates in the database) and the committing of the offset atomic, we do not want Kafka to store the offsets; the offset should instead be stored in the same database as the processed record and updated along with it as part of a single atomic transaction. On startup the offset can then be fetched from the external database and consumer.seek() can be used to move the consumer to the correct position.
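A minimal sketch of this pattern; saveRecordAndOffsetInOneTransaction() and getOffsetFromDb() are hypothetical helpers backed by the application's database, and the imports are as in the consumer example:

TopicPartition partition = new TopicPartition("exampleTopic", 0);
consumer.assign(Arrays.asList(partition));
//restore the position persisted by the last successful transaction (hypothetical helper)
consumer.seek(partition, getOffsetFromDb(partition));

while (true) {
    ConsumerRecords<String, SampleRecord> records = consumer.poll(Duration.ofMillis(10000));
    for (ConsumerRecord<String, SampleRecord> record : records) {
        //one database transaction updates the business data AND the stored offset
        saveRecordAndOffsetInOneTransaction(record, record.offset() + 1);
    }
    //no consumer.commitSync() here: the database, not kafka, is the offset store
}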

Handling rebalances in the consumer

A commit converts the current offset (say 10) into a committed offset. The ability to commit a particular offset rather than the latest offset is important here. A rebalance can be triggered when a consumer in a consumer group dies or a new consumer joins the group; it can also be triggered if a consumer does not poll for a certain time. In that case the consumer's partitions are assigned to other consumers, so the consumer may want to commit its offsets before it loses ownership of its partitions, and the new owner of each partition will want to start from the last committed offset. Kafka allows you to specify a rebalance listener, which has two methods: onPartitionsRevoked(), called before the consumer gives up its partitions and the right place to commit offsets, and onPartitionsAssigned(), called after partitions have been reassigned and the right place to seek to a stored position.
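A minimal sketch of such a listener; the currentOffsets map is a hypothetical structure that the polling loop keeps up to date after each processed record:

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

consumer.subscribe(Arrays.asList("exampleTopic"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        //about to lose ownership: commit what has been processed so far
        consumer.commitSync(currentOffsets);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        //new partitions arrived: consumption resumes from their committed offsets
    }
});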

Consumer threading architecture

Fault tolerance in Kafka

A system should ideally continue to function properly in the event of failure of some of its components; this is a key property of distributed systems. Kafka can create copies of partitions and store them on different machines for fault tolerance. The replication factor determines the total number of copies, and it can be defined at the topic level, where it applies to all partitions within the topic. For every partition one broker is elected as the leader. The leader takes care of all producer and consumer interactions: it receives messages from the producer and sends back acknowledgements, and consumers also connect to the leader. If the replication factor is set to 4, 3 more brokers are set up as followers, and these followers copy data from the leader.
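A minimal sketch of creating a replicated topic with the admin client (the topic name and counts are made up for illustration, and the enclosing method is assumed to declare throws Exception):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093");

try (AdminClient admin = AdminClient.create(props)) {
    //6 partitions, each kept on 4 brokers: 1 leader plus 3 followers
    NewTopic topic = new NewTopic("exampleTopicName", 6, (short) 4);
    admin.createTopics(Collections.singletonList(topic)).all().get();
}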

Pitfalls of asynchronous interaction using a message broker

 

Test yourself

Q) Is the consumer offset managed at the consumer group level or at the individual consumer inside that consumer group?

Offsets are tracked at the consumer group level: for each partition there is a single offset value, updated by whichever consumer in the group currently owns that partition. The offsets of the consumer groups are stored in an internal Kafka topic called __consumer_offsets. In this topic you basically have key/value pairs, where the key is the concatenation of the group id, the topic name and the partition number, and the value is the offset.

Q) Partitions help in scaling the number of consumers, as the number of consumers in a group cannot be greater than the number of partitions. How can we dynamically increase the partitions in a topic so that we can increase the consumers?
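One way is through the admin client, as in this minimal sketch (props as in the previous sketch; note that partitions can only be added, never removed, and adding them changes the key-to-partition mapping for keyed messages):

import java.util.Collections;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewPartitions;

try (AdminClient admin = AdminClient.create(props)) {
    //grow exampleTopic to 12 partitions in total
    admin.createPartitions(Collections.singletonMap(
            "exampleTopic", NewPartitions.increaseTo(12))).all().get();
}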

Q) How can consumers be dynamically scaled in response to increased load on the system?

In a nutshell, Kafka consumer group lag is a key performance indicator of any Kafka-based event-driven system. Consumer lag is simply the delta between the consumer's last committed offset and the producer's end offset in the log. In other words, consumer lag measures the delay between producing and consuming messages in any producer-consumer system. Autoscaling should therefore be based on consumer lag.
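A minimal sketch of measuring lag per partition, combining the admin client with a throwaway consumer; groupName is a placeholder, consumerProps is assumed to hold bootstrap servers plus string deserializers, and the enclosing method is assumed to declare throws Exception:

import java.util.Map;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

try (AdminClient admin = AdminClient.create(props)) {
    //last committed offset of the group, per partition
    Map<TopicPartition, OffsetAndMetadata> committed =
            admin.listConsumerGroupOffsets("groupName")
                 .partitionsToOffsetAndMetadata().get();
    //producer's end offset per partition, fetched with a throwaway consumer
    try (KafkaConsumer<String, String> probe = new KafkaConsumer<>(consumerProps)) {
        Map<TopicPartition, Long> endOffsets = probe.endOffsets(committed.keySet());
        committed.forEach((partition, metadata) ->
                System.out.println(partition + " lag = "
                        + (endOffsets.get(partition) - metadata.offset())));
    }
}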

Read more here https://www.baeldung.com/java-kafka-consumer-lag 

https://medium.com/@ranrubin/horizontal-pod-autoscaling-hpa-triggered-by-kafka-event-f30fe99f3948