Apache Kafka is one of the most popular and scalable message brokers.
Apache Kafka terminology
- Producer : An application that sends messages to Kafka.
- Message : A small to medium sized piece of data. Human-readable formats such as JSON and XML are often used.
- Consumer : An application that receives data from Kafka.
- In RabbitMQ, messages are pushed to the consumer by the broker. Pushing has issues related to backpressure: if the broker keeps pushing, a slow consumer can get overwhelmed. Kafka instead uses a pull/polling mechanism where the consumer pulls data. However, it is not dumb polling that wastes CPU; long polling is used. A Kafka consumer asks for messages, and if no messages are available the Kafka server does not respond immediately. Hence CPU cycles are not burnt, and this also resolves the backpressure issue. A consumer connects to a partition on a broker and reads the messages in the order in which they were written.
- Broker : A Kafka server in the Kafka cluster is called a broker. Kafka acts as a broker/agent between consumers and producers.
- Kafka cluster : A group of computers/VMs sharing workload for a common purpose. In the context of Kafka it is simply a group of machines/VMs, each running one instance of the Kafka broker.
- Topic : A topic is a unique name given to a Kafka stream. Topics are the central concept in Kafka that decouples producers and consumers. A consumer pulls messages off a Kafka topic while producers push messages into a Kafka topic. A topic can have many producers and many consumers. When creating a topic in Kafka, we need to provide the number of partitions and the replication factor (a small topic-creation sketch follows this terminology list).
- Partitions : Kafka is a distributed message broker and it can partition a topic. Kafka distributes the partitions of a particular topic across multiple brokers, ensuring that large topics which cannot be handled by a single machine can still be handled. Hence, for topics where the load is high, partitioning is important. Partitioning also serves as load balancing for downstream consumers. Note that a partition can be replicated for high data availability. Typically a message can contain a partition key (e.g. consumer id/device id); all records with the same key will arrive at the same partition. Specifying a partition key keeps related events together in the same partition, in the exact order in which they were sent. Note that a poorly chosen partition key can lead to too many messages being routed to one partition; the partition key should ensure that messages are evenly distributed across partitions. If no partition key is specified, Kafka uses a round-robin technique to route messages to partitions.
- Offset : The offset is the sequence number of a message within a partition. These numbers are immutable: the first message gets offset zero, the next arriving message gets offset one, and so on. There is no global offset across partitions, which means that if you have a topic with multiple partitions, messages in different partitions can have the same offset. Message ordering is only guaranteed at the partition level; messages across a topic are not guaranteed to be ordered. Offsets are particularly useful for consumers when reading records from a partition. With the default configuration the consumer stores offsets in Kafka. However, it is possible to store offsets outside Kafka, especially for building an idempotent consumer.
- Consumer group : A group of consumers acting as a single logical unit to consume a topic. Each partition in the topic is assigned to exactly one of the consumers in the consumer group. If there are multiple partitions and only one consumer in the consumer group, then every partition will go to that single consumer. Note that each partition is consumed by one and only one consumer in a consumer group, hence the number of active consumers cannot be more than the number of partitions. If you have N + 1 consumers for a topic which has N partitions, then only N consumers will be assigned a partition and the remaining consumer will be idle (it can be used for hot failover). This has to be true because all messages of a single logical entity (e.g. an e-commerce order) must go to the same consumer, otherwise messages could get processed in the wrong order. Note that one consumer can consume multiple partitions. Consumer groups are very useful in microservices where one service may have many instances. Partitioning and consumer groups are, in a nutshell, a tool for scalability. Consumers can join and exit the group, which can trigger rebalancing; rebalancing is managed by the group coordinator (one of the Kafka brokers). During rebalancing, typically none of the consumers are allowed to read any messages. In a nutshell a consumer group helps in
- parallel processing of a topic,
- automatic management of partition assignment,
- rebalancing when consumers join or leave the group.
Note that it is also possible to provide custom partition-to-consumer assignment logic.
- ZooKeeper : Kafka uses ZooKeeper because ZooKeeper provides coordination services for a distributed system; since Kafka is a distributed system, ZooKeeper is required.
- bootstrap-server : A Kafka cluster can have thousands of brokers. We do not need to specify all of them in the client configuration. Instead, we take two or three brokers and use them as bootstrap servers to which a client connects initially; these brokers then point the client to a suitable Kafka broker. So bootstrap.servers is a configuration we place within clients: a comma-separated list of host:port pairs that are the addresses of the Kafka brokers a client connects to initially in order to bootstrap itself. In a nutshell, bootstrap.servers provides the initial hosts that act as the starting point for a Kafka client to discover the full set of alive servers in the cluster. It should be noted that clients (producers or consumers) make use of all servers, irrespective of which servers are specified in bootstrap.servers for bootstrapping.
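To make the topic, partition and replication-factor terminology concrete, here is a minimal topic-creation sketch using the Kafka AdminClient. The broker address, topic name, partition count and replication factor are illustrative assumptions, not recommendations.
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

Properties adminProps = new Properties();
adminProps.put("bootstrap.servers", "localhost:9092");
try (AdminClient admin = AdminClient.create(adminProps)) {
    // a topic with 3 partitions and replication factor 2 (illustrative values)
    NewTopic topic = new NewTopic("exampleTopic", 3, (short) 2);
    admin.createTopics(Collections.singletonList(topic)).all().get();
}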
Producer Example
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
// list of brokers used to connect to the kafka cluster
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
// for kafka, messages are just arrays of bytes, hence we need serializers to convert strings to byte arrays
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// instantiate the kafka producer object
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 1; i <= 10; i++) {
    // create a record to be sent
    ProducerRecord<String, String> record =
            new ProducerRecord<>("exampleTopicName", i + "exampleKey", i + "exampleMsgValue");
    // add the record to the output buffer and return right away
    producer.send(record);
}
producer.close();
The above code shows how to send a string message to Kafka. Kafka messages are often key-value pairs, although the key is not mandatory. The most detailed constructor for creating a producer record is ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value). The key parameters are:
- Message key - Used to determine the partition number; it is optional and is used by the default Kafka partitioner, which hashes the key to obtain the partition number. If no key is specified, the messages are evenly distributed across partitions by the default partitioner. Note that if the partition is explicitly specified then the default Kafka partitioner is not used.
- Timestamp - The message timestamp. It is optional; if not specified, Kafka assigns the message timestamp when it receives the message.
- Partition - This is optional.
There are 3 approaches to sending a message to Kafka.
- Fire and forget : This should be used when losing a small percentage of messages is acceptable (e.g. counting likes on a page). Note that the send() method is asynchronous: calling send adds the record to the output buffer and returns right away. The buffer is used to batch records for efficient IO and compression. The producer also uses the acks configuration to control record durability. The following acks configuration options exist (a producer-configuration sketch showing these settings appears after the three send approaches below):
- None (acks=0): The producer considers the records successfully delivered once it sends them to the broker. The producer will not retry in this approach. This gives the highest possible throughput.
- One (acks=1): The producer waits for the lead broker to acknowledge that it has written the record to its log. However, the record may not yet have been received by the replicas.
- All (acks=all): The producer waits for an acknowledgment from the lead broker and from the follower brokers that they have successfully written the record to their logs. This is the slowest yet most reliable configuration.
- Synchronous send : The producer sends a message and waits until it gets a response. On success a RecordMetadata object is received; on failure an exception is thrown. This approach should be used when messages are super critical. However, this approach will slow down the producer.
// producer.send(record); // this is fire and forget
RecordMetadata metaData = producer.send(record).get(); // the get() call blocks until the send completes
The send method returns a Java Future and we call the get method on the future object. The get method blocks until we get success or an exception is thrown.
- Asynchronous send : In this approach a callback can be provided in the send method to receive the acknowledgement, so send does not wait for success or failure. The callback is invoked with a RecordMetadata and an Exception object; if the exception is not null then we know the send operation has failed.
// ProducerCallBack implements the Callback interface; its onCompletion method will be called
producer.send(record, new ProducerCallBack()); // returns a Future<RecordMetadata>, but we rely on the callback
// callback class
class ProducerCallBack implements Callback {
    public void onCompletion(RecordMetadata metaData, Exception ex) {
        if (ex != null) {
            // send has failed
        } else {
            // send is successful
        }
    }
}
This approach is faster than synchronous send and slower than the fire-and-forget strategy. Also note that the producer can crash before the callback is called, hence this is not as bullet-proof as synchronous send. The throughput can be as good as fire and forget; however, there is a limit on in-flight messages which have not been acknowledged, controlled by the configuration parameter max.in.flight.requests.per.connection. Increasing max.in.flight.requests.per.connection increases throughput, but message ordering may not be preserved: if one message is rejected and needs a retry while a second message has in the meantime been dispatched and written to the topic, the first message ends up in the topic after the second one. To avoid this scenario you can allow only one message in flight, i.e. set max.in.flight.requests.per.connection to 1 to preserve message ordering. Alternatively, enabling the idempotent producer (enable.idempotence=true) preserves ordering and avoids duplicates even with up to 5 in-flight requests per connection.
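As a rough illustration of the durability and ordering knobs discussed above, the producer properties from the earlier example could be extended as follows (the values are illustrative, not recommendations):
// wait for the leader and all in-sync replicas before considering a send successful
props.put("acks", "all");
// enable the idempotent producer so retries do not create duplicates or reorder records
props.put("enable.idempotence", "true");
// with the idempotent producer, ordering is preserved for up to 5 in-flight requests
props.put("max.in.flight.requests.per.connection", "5");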
Consumer example
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
props.put("group.id", "groupName");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "SampleRecordDeserializer"); // fully qualified name of a custom deserializer class
props.put("enable.auto.commit", "false"); // do not commit offsets automatically to the kafka broker
// consumer instantiated
KafkaConsumer<String, SampleRecord> consumer = new KafkaConsumer<>(props);
// consumer specifies the topic(s) to which it is subscribing
consumer.subscribe(Arrays.asList("exampleTopic"));
while (true) {
    // consumer pulls records from kafka (long poll with a 10 second timeout)
    ConsumerRecords<String, SampleRecord> records = consumer.poll(Duration.ofMillis(10000));
    // iterate through the fetched records in the same thread
    for (ConsumerRecord<String, SampleRecord> record : records) {
        // process the record here
    }
    // synchronous commit, blocking call
    consumer.commitSync();
    // async commit is the other possibility for faster processing:
    // consumer.commitAsync();
}
Note that there is no need to explicitly create a group: all consumers that have the same group.id property are part of the same consumer group. The group.id property is not mandatory; if you do not specify it you are creating an independent consumer, and you will read all of the data for the topic irrespective of the number of partitions in the topic. In a microservice architecture the service name is a good group id. If multiple service instances exist they each become part of the same consumer group; if only one service instance exists, it is a consumer group with a single member. Note that the poll function takes a timeout as an argument (how long you are willing to wait for the poll method to return, with or without data). Long polling is very useful as we do not want the consumer to poll continuously when no data is available. The poll function takes care of
- connecting to the group coordinator
- joining the group
- receiving the partition assignment
- fetching your messages
- sending heartbeats (on every poll)
Offset management in Apache Kafka
Kafka maintains two types of offset.
- Current offset : The current offset (position) is the offset from which the next new record will be fetched (when it is available). In simple words, it is a simple integer used by Kafka to maintain the current position of the consumer so that the consumer does not get the same record twice.
- Committed offset : The committed offset is the last committed offset for the given partition, in other words a pointer to the last record the consumer has successfully processed. Committing an offset for a partition is the act of declaring that the offset has been processed, so that the Kafka cluster won't resend the committed records for that partition. The committed offset is important in case of a consumer restart or rebalancing: when, after a rebalance, a new consumer is assigned the same partition, it needs to know where to start, i.e. what has already been processed.
The consumer stores offsets in Kafka automatically when enable.auto.commit is set to true. The Kafka consumer auto-commits the offset of the last message received in response to its poll() call; only offsets of records returned in previous poll calls are committed, i.e. when a poll() is executed, the offsets of records returned by the previous poll() are committed. Since processing happens between poll calls, offsets of unprocessed records will never be committed. This guarantees at-least-once delivery semantics. Note that auto.commit.interval.ms determines the minimum delay between commits: the offsets of records returned by the previous poll() are committed on the next poll() only if the interval defined by auto.commit.interval.ms has elapsed. If, while processing the records returned from a poll, the consumer crashes after processing only some of them, those records will be redelivered. If the consumer is idempotent then this is a good model.
Automatic offset committing can be disabled, in which case the application itself must take care of manually committing offsets of processed records. A manual commit can be done in 2 ways.
- Commit Sync - this is a blocking method.
- Commit Async - this sends the request and continues (with no retries, because two asynchronous commits could get into a race condition). No retries is a decent choice: even if one asynchronous commit fails for some recoverable reason, the next higher-order commit will succeed.
Note that it is possible to commit a particular offset instead of committing the latest offset. The plain synchronous and asynchronous commits only commit the latest offsets returned by the last poll.
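As a minimal sketch (reusing the consumer and a record variable from the consumer example above), committing a particular offset looks roughly like this:
// additional imports: java.util.HashMap, java.util.Map,
// org.apache.kafka.common.TopicPartition, org.apache.kafka.clients.consumer.OffsetAndMetadata
Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
// commit record.offset() + 1 so that, after a restart, consumption resumes at the next unread record
toCommit.put(new TopicPartition(record.topic(), record.partition()),
             new OffsetAndMetadata(record.offset() + 1));
consumer.commitSync(toCommit);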
Idempotent consumer
The act of processing the record and the act of committing are not atomic. This can lead to the same record being delivered twice: the consumer could crash before committing the offset, in which case the message will be delivered to the consumer again. To make the processing of a record (with its database updates) and the committing of its offset atomic, we do not want Kafka to store the offsets; the offset should be stored in the same database where the consumed record is being stored, and updated along with the record as part of a single atomic transaction. The offset can then be fetched from the external database and consumer.seek(getOffsetFromDb()) can be used to move the consumer to the correct position.
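A rough sketch of this pattern, assuming a hypothetical offsetDao helper that reads and writes the offset together with the processed record in the same database transaction:
// additional imports: java.time.Duration, java.util.Arrays,
// org.apache.kafka.common.TopicPartition, org.apache.kafka.clients.consumer.*
TopicPartition tp = new TopicPartition("exampleTopic", 0);
consumer.assign(Arrays.asList(tp));               // manual partition assignment
consumer.seek(tp, offsetDao.loadOffset(tp) + 1);  // resume right after the last processed record
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        // hypothetical DAO: business update and offset update in one database transaction
        offsetDao.saveRecordAndOffset(record);
    }
    // no consumer.commitSync() here: the database, not Kafka, is the source of truth for offsets
}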
Handling rebalance by consumer
A commit converts the current offset (say 10) into a committed offset. The ability to commit a particular offset rather than the latest offset is important here. A rebalance can be triggered when a consumer in a consumer group dies or a new consumer enters the consumer group; a rebalance can also be triggered if a consumer does not poll for a certain time. In these cases the consumer's partitions are assigned to other consumer(s), so the consumer may want to commit its offsets before it loses ownership of its partitions, and the new owner of a partition will want to start from the last committed offset. Kafka allows you to specify a rebalance listener which has two methods:
- onPartitionsRevoked(), called just before partitions are taken away from the consumer; this is where offsets should be committed
- onPartitionsAssigned(), called right after rebalancing is complete and before the consumer starts consuming records.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
props.put("group.id", "groupName");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", "false"); // do not commit automatically to the kafka broker
// consumer instantiated
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// instantiate the rebalance listener
RebalanceListener rebalanceListener = new RebalanceListener(consumer);
// consumer specifies the topic(s) to which it is subscribing and attaches the rebalance listener
consumer.subscribe(Arrays.asList("exampleTopic"), rebalanceListener);
while (true) {
    // consumer pulls records from kafka
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
    // iterate through the fetched records in the same thread
    for (ConsumerRecord<String, String> record : records) {
        // remember the offset of each processed record in the listener's map
        rebalanceListener.addOffset(record.topic(), record.partition(), record.offset());
    }
    // synchronous, blocking commit: once all fetched messages are processed and before the next poll,
    // commit the particular offsets collected by the listener
    consumer.commitSync(rebalanceListener.getCurrentOffsets());
}
The listener maintains the map of offsets that have been processed and are ready to be committed, and commits those offsets when its partitions are being revoked. Here is the code for the rebalance listener.
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class RebalanceListener implements ConsumerRebalanceListener {
    private KafkaConsumer<String, String> consumer;
    private Map<TopicPartition, OffsetAndMetadata> offsetsMap = new HashMap<>();

    public RebalanceListener(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    public void addOffset(String topic, int partition, long offset) {
        // store offset + 1 so that, once committed, the group resumes from the next unread record
        offsetsMap.put(new TopicPartition(topic, partition), new OffsetAndMetadata(offset + 1));
    }

    public Map<TopicPartition, OffsetAndMetadata> getCurrentOffsets() {
        return offsetsMap;
    }

    // This method is called during a rebalance operation when the consumer has to give up some partitions.
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        consumer.commitSync(offsetsMap); // commit the particular offsets collected so far
        offsetsMap.clear();              // offsets are committed, reset the map
    }

    // This method is called after the partition re-assignment completes and before the
    // consumer starts fetching data, and only as the result of a Consumer#poll() call.
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    }
}
Consumer threading architecture
- Single-threaded consumer (one thread per consumer, across all partitions model) A single-threaded implementation is based on a (usually infinite) poll loop. This infinite loop repeats two actions:
- Retrieving records using the poll() method
- Processing fetched records
while (true) {
    // consumer pulls records from kafka with a timeout
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
    // handle the fetched records in the same thread
}
- It should be noted that the consumer object is not threadsafe.
- Because records are fetched and processed by the same thread, they are processed in the same order as they were written to the partition. This is referred to as processing order guarantees.
- As already explained, the Kafka consumer pulls messages. This is efficient and does not waste CPU because of long polling.
- The maximum delay allowed between poll method calls is defined by the max.poll.interval.ms config, which is five minutes by default. If a consumer fails to call the poll method within that interval, it is considered dead and group rebalancing is triggered. With the thread-per-consumer model and the default configuration, this can happen often in use cases where each record takes a long time to process. Hence setting max.poll.interval.ms to a higher value can be useful in the one-thread-per-consumer model.
- The consumer processes can be horizontally scaled using a Kubernetes cluster, but one thread per consumer is often not a good solution when a topic partition carries a very large volume of messages. Also, more consumers mean more TCP connections to the Kafka cluster.
- Multi-threaded Kafka consumer (multiple processing threads per partition) Processing a single message can take significant time depending upon the use case. On the other hand, messages may be produced at much higher speeds, and a single thread may be insufficient to handle the load. Note that when blocking calls are being made, thread requirements typically increase. While we can start multiple consumers in a consumer group, even there a single consumer needs to handle at least a single partition, which may be quite large. Hence a consumer can poll for records in one thread and then use multiple processing threads to process them in parallel; consumption and processing are decoupled (a rough sketch follows this list). Note that if automatic offset committing (the default) is used then there can be side effects such as:
- The offset might be committed before a record is processed. Note that the Kafka consumer auto-commits the offset of the last message received in response to its poll() call; in the single-threaded model the call to poll() happens only after all records are processed. One way to solve this problem is to wait for all threads to finish before the next poll call, with the downside that processing becomes as slow as the slowest task. Alternatively, manual offset committing can be used (committing particular offsets, as in the rebalance listener). Read more here https://www.programmerall.com/article/69681502835/
- Message processing order can’t be guaranteed since messages from the same partition could be processed in parallel.
- In a one thread per consumer model maintaining processing order guarantees per partition and at-least-once delivery semantics is easier.
- In some apps neither the processing order nor the order of events sent downstream is relevant.
- In some apps the order of events sent downstream must be maintained, yet the processing order does not matter.
- If the processing order matters, the application logic becomes more complicated (e.g. an order-cancellation event arrives before the order-creation event). This can happen with synchronous REST calls as well.
- Multi-threaded Kafka consumer (single thread per partition) Multiple consumers can be part of the same process thanks to multi-threading. This is more memory efficient than the one-thread-per-consumer model, where each consumer lives in a separate process. It also ensures that message processing order is preserved, as records from a partition are handled by the same thread; the records are grouped by partition. This will not scale well, especially when the application is IO intensive; one solution is to increase the number of topic partitions.
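As a rough sketch of the poll-then-dispatch idea described above (process() is a hypothetical application-specific method; ordering and commit handling are deliberately left out here, reusing the consumer from the earlier example):
// additional imports: java.util.concurrent.ExecutorService, java.util.concurrent.Executors
ExecutorService pool = Executors.newFixedThreadPool(8); // illustrative pool size
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        // hand the record to a worker thread; records of the same partition may now be processed in parallel
        pool.submit(() -> process(record));
    }
    // with enable.auto.commit=true the next poll() may commit offsets before the tasks above finish,
    // so in practice offsets should be committed manually once the corresponding records are processed
}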
Fault tolerance in Kafka
A system should ideally continue to function properly in the event of failure of some of its components; this is a key property of distributed systems. Kafka can create copies of partitions and store them on different machines for fault tolerance. The replication factor determines the total number of copies; it can be defined at the topic level, in which case it applies to all partitions within the topic. For every partition one broker is elected as the leader. The leader takes care of all producer and consumer interactions: it receives messages from the producer and sends back acknowledgements, and the consumer also connects to the leader. If the replication factor is set to 4, 3 more brokers are set up as followers, and these followers copy data from the leader.
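To inspect the leader and follower replicas of each partition, one possibility is the AdminClient describeTopics call. This is a sketch reusing the AdminClient from the topic-creation example above; the exact result accessor can vary between client versions.
// additional imports: org.apache.kafka.clients.admin.TopicDescription, org.apache.kafka.common.TopicPartitionInfo
TopicDescription description =
        admin.describeTopics(Collections.singletonList("exampleTopic")).all().get().get("exampleTopic");
for (TopicPartitionInfo partition : description.partitions()) {
    System.out.println("partition " + partition.partition()
            + " leader=" + partition.leader().id()
            + " replicas=" + partition.replicas());
}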
Pitfalls of asynchronous interaction using a message broker
- Message ordering : Typically, for handling load, one logical service will have several (typically stateless) instances. As the buffered messages increase, the number of service instances should increase to handle the increased load. The issue is that these instances of the same service, which are processing messages on the same messaging channels, become competing consumers, and this creates the problem of processing the messages in the correct order. For instance, a request to create an order and a request to cancel the same order could be routed to different instances of the same service; depending on load, GC and various other factors the two instances may be handling requests at varying speed, and the order cancellation could get processed before the order creation. SHARDED/PARTITIONED CHANNELS provided by Kafka can ensure that every event of the same entity routes to the same shard. Let's see how SHARDING OF CHANNELS works in Kafka.
- As already discussed, Kafka has the concept of a consumer group. A consumer group is considered a single logical consumer: multiple instances of a service can be within the same consumer group, and the consumer group then represents the service. A message will be delivered to only one of the instances within the consumer group. The message broker partitions the message channel into shards and assigns one shard to each instance within the consumer group.
- The sender specifies a shard key in the message header; the shard key determines the shard (and hence the service instance) to which the message will go. For example, the entity id can be the shard key, which ensures that all events of the same entity go to the same shard. Note that even though all the messages go to the same instance of the service, if the service is multithreaded, in-order processing is still not guaranteed (although the probability of reordering is reduced); for instance an order-cancellation event may start processing before the order-creation event. The application logic has to handle such out-of-order events for an entity. Different approaches are possible: 1) the order of emitted events downstream is maintained but processing can happen in any order; 2) neither the order of processing nor the order of events sent downstream is relevant, i.e. as soon as processing of a message is done it can be sent downstream regardless of the receiving order. Note that this issue can arise even when IPC uses a synchronous request-response model. Here is one pattern which can be used for maintaining order when messages come from an input channel/queue and need to be processed concurrently, yet the output queue must have the same order as the input queue; check the answer by Anthony Williams: https://stackoverflow.com/questions/3227042/maintaining-order-in-a-multi-threaded-pipeline.
- Idempotency - Most message brokers guarantee at-least-once delivery. A service can crash after receiving a message and processing it (including updating the db) but before acknowledging the message, and the message broker will then deliver the message again. An idempotent consumer can handle duplicate messages. https://clarifyforme.com/posts/5658330156498944/Idempotent-consumer.
- Preserving order when redelivering messages
- If the message was processed by an idempotent receiver and only the ack failed, then when the message is redelivered, order preservation does not matter. However, if there was an error while processing the message and the service wants to retry it, the service has to decide whether it wants preservation of order. Read these: https://stackoverflow.com/questions/60318890/redelivery-from-queue-is-unordered-in-activemq-artemis-2-11-0 and https://dzone.com/articles/processing-messages-of-users-in-order-with-complet
Test yourself
Q)Is Consumer Offset managed at Consumer group level or at the individual consumer inside that consumer group?
Offsets are tracked at the consumer-group level, per topic partition: since each partition is consumed by exactly one consumer in the group, that consumer is the one updating the partition's offset, but the offset belongs to the group. The offsets of consumer groups are stored in an internal Kafka topic called __consumer_offsets. In this topic you basically have a key/value pair, where the key is the concatenation of
- ConsumerGroup
- Topic
- Partition within Topic
and the value is the offset.
Q)Partitions help in scaling the number of consumers, as the number of consumers cannot be greater than the number of partitions. How do you dynamically increase the partitions of a topic so that more consumers can be added?
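One option is the AdminClient createPartitions call. This is only a sketch, reusing the AdminClient from the topic-creation example; note that adding partitions changes the key-to-partition mapping for keyed messages, so it should be done with care.
// additional imports: java.util.Collections, org.apache.kafka.clients.admin.NewPartitions
admin.createPartitions(
        Collections.singletonMap("exampleTopic", NewPartitions.increaseTo(8)) // grow the topic to 8 partitions
).all().get();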
Q)How do you dynamically scale consumers in response to increased load on the system?
In a nutshell, Kafka consumer group lag is a key performance indicator of any Kafka-based event-driven system. Consumer lag is simply the delta between the consumer's last committed offset and the producer's end offset in the log; in other words, consumer lag measures the delay between producing and consuming messages in any producer-consumer system. Autoscaling should be based on consumer lag (a small lag-computation sketch follows the links below).
Read more here https://www.baeldung.com/java-kafka-consumer-lag
https://medium.com/@ranrubin/horizontal-pod-autoscaling-hpa-triggered-by-kafka-event-f30fe99f3948
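As a rough sketch of computing per-partition lag for the group used in the consumer example (again assuming the AdminClient from the earlier examples; in practice the values would be exported to a metrics system or an autoscaler rather than printed):
// additional imports: java.util.HashMap, java.util.Map, org.apache.kafka.clients.admin.*,
// org.apache.kafka.clients.consumer.OffsetAndMetadata, org.apache.kafka.common.TopicPartition
Map<TopicPartition, OffsetAndMetadata> committed =
        admin.listConsumerGroupOffsets("groupName").partitionsToOffsetAndMetadata().get();
Map<TopicPartition, OffsetSpec> latest = new HashMap<>();
committed.keySet().forEach(tp -> latest.put(tp, OffsetSpec.latest()));
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets = admin.listOffsets(latest).all().get();
for (TopicPartition tp : committed.keySet()) {
    // lag = end of log minus last committed position for this partition
    long lag = endOffsets.get(tp).offset() - committed.get(tp).offset();
    System.out.println(tp + " lag=" + lag);
}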