tags : Distributed Systems, Infrastructure, Message Passing, Zookeeper

What

Gotchas

  • Kafka doesn’t scale well as the partition count grows; throughput falls off quickly, especially when using replication and acks.
  • If each user has its own topic, your partition count is unbounded: you have at least one partition per user.
    • Instead, I’d use a single notification topic, set a reasonable number of partitions on it, and partition by user id (sketch below).
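
A minimal sketch of the single-topic approach, assuming the confluent-kafka Python client; the topic name notifications, the broker address, and the payloads are illustrative:

```python
from confluent_kafka import Producer

# Assumed broker address; "notifications" is an illustrative topic name that
# would be created up front with a fixed, reasonable partition count.
producer = Producer({"bootstrap.servers": "localhost:9092"})

def notify(user_id: str, payload: bytes) -> None:
    # Keying by user id keeps all of a user's notifications on one partition,
    # so they stay ordered relative to each other without a topic per user.
    producer.produce("notifications", key=user_id, value=payload)

notify("user-42", b'{"type": "mention", "post": 123}')
notify("user-42", b'{"type": "reply", "post": 123}')
producer.flush()
```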

Resources

Redpanda

See Ingestion Patterns

  • Redpanda achieves this decoupling (producer/consumer) by organizing events into topics.

Topics & Partitions

Kafka guarantees ordered message delivery ONLY within a single partition

Topics (unordered)

  • Topic = a named log of events
  • It’s an unordered collection (ordering is only guaranteed within a partition)
  • Topics = logical grouping of events that are written to the same log.
    • A topic can have multiple producers and multiple consumers
  • A topic always has at least one partition.
  • Topic storage

    • Topics are stored in a tiered manner: local disk first, then object store, etc.
      • Consumers can use the same API to access both (recent vs. historical data, etc.)
      • Historical: Consumers can read and reread events from any point within the maximum retention period.
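
A small sketch of the reread-from-retention idea, assuming the confluent-kafka Python client; topic/group names and the broker address are placeholders:

```python
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",  # assumed broker address
    "group.id": "replay-example",           # illustrative group id
    "auto.offset.reset": "earliest",        # start from the oldest retained event
    "enable.auto.commit": False,            # don't advance offsets for this one-off replay
})
consumer.subscribe(["notifications"])

while True:
    msg = consumer.poll(1.0)
    if msg is None:
        break            # caught up (good enough for this sketch)
    if msg.error():
        continue
    print(msg.topic(), msg.partition(), msg.offset(), msg.value())

consumer.close()
```

The same client API works whether the events are still on local disk or have been tiered out to object storage.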

Topic Partitions (ordered)

  • The idea of a single log file changes when we introduce partitioning. We do this so that we can scale well.
  • So with partitions, events are written to a topic-partition
  • It’s an ordered collection
  • Sharding and distribution
    • Redpanda scales topics by sharding them into multiple partitions distributed across cluster nodes/brokers
    • This enables concurrent writing and reading.
  • Event routing and ordering
    • Producers send messages to brokers, and the brokers route the data across partitions.
    • Events sharing the same key always go to the same partition.
    • Events are ordered at the partition level.
  • Consumption and key-less routing
    • Consumers read events from brokers for the topics they subscribe to; the broker then manages the topic-partition side of things.
    • When reading, per topic-partition events are read in the order they were written.
    • If no key is specified, events are distributed among all topic-partitions in a round-robin manner.
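
A sketch of that routing behavior, using the confluent-kafka Python client's delivery callback to report which partition each event landed on (topic name and broker address are assumptions):

```python
from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "localhost:9092"})

def report(err, msg):
    # Delivery callback: prints the partition chosen by the routing logic.
    if err is None:
        print(f"key={msg.key()} -> partition {msg.partition()}, offset {msg.offset()}")

# Same key -> same partition, so these three stay in order relative to each other.
for i in range(3):
    producer.produce("notifications", key="user-42",
                     value=f"event-{i}".encode(), on_delivery=report)

# No key -> events get spread across partitions (round-robin or random,
# depending on the client's partitioner), with no cross-partition ordering.
for i in range(3):
    producer.produce("notifications", value=f"unkeyed-{i}".encode(),
                     on_delivery=report)

producer.flush()
```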

Topic Event replication consensus (for fault tolerance)

  • To provide fault tolerance and safety, Redpanda replicates events to other nodes/brokers. For this, it uses Raft.
  • Every topic-partition forms a Raft group
    • group: a single leader + 0 or more followers (the replication factor)
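
The replication factor (and therefore the size of each partition's Raft group) is fixed when the topic is created. A sketch with the confluent-kafka AdminClient; names and counts are illustrative:

```python
from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({"bootstrap.servers": "localhost:9092"})

# 12 partitions => 12 Raft groups; replication_factor=3 => 1 leader + 2 followers each.
futures = admin.create_topics([
    NewTopic("notifications", num_partitions=12, replication_factor=3),
])
for topic, future in futures.items():
    future.result()  # raises if creation failed
    print(f"created topic {topic}")
```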

Broker

  • See Kafka broker
  • Multiple brokers help with
    • Fault tolerance: Data Replication can now happen via Raft
    • Concurrency: Because topics are partitioned, we can write in a sharded manner
  • It contains topic-partitions (log files) that store messages in an immutable sequence.
  • Brokers have tunable parameters
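
For example, a hedged sketch of inspecting a broker's configuration with the confluent-kafka AdminClient (the broker id and address are placeholders):

```python
from confluent_kafka.admin import AdminClient, ConfigResource

admin = AdminClient({"bootstrap.servers": "localhost:9092"})

# Broker id "1" is a placeholder; list the cluster's broker ids first in a real
# setup, e.g. via admin.list_topics().brokers.
resource = ConfigResource(ConfigResource.Type.BROKER, "1")
configs = admin.describe_configs([resource])[resource].result()

for name, entry in sorted(configs.items()):
    print(name, "=", entry.value)
```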

Consumer

Consumer Group

The ideas of replication and scaling out come in once we introduce partitions, and we introduce consumer groups to manage the relationships between topics, partitions, and consumers. Since we’re now dealing with replication, the concepts of leaders and Raft come into play.

| Relation | Mapping | Description/Notes |
| --- | --- | --- |
| Partition <> topic-partition | — | Same thing. |
| Partition <> Topic | N:1 | A topic is like a label/name for all the partitions in it. (A topic always has at least one partition.) |
| Partition <> Consumer | M:N | Multiple consumers can consume from the same partition (within a single consumer group a partition is assigned to one consumer; across groups many consumers can read it). |
| Partition <> Data Plane Raft Group | 1:1 | This is how event/message replication happens. |
| Partition <> Control Plane Raft Group | 1:1 | Eg. Each partition |
| CG <> Control Plane Raft Group | 1:1 | The CG is essentially the Raft group. |
| CG <> Topic | M:N | A consumer group can subscribe to multiple topics, and multiple CGs can consume from the same topic. |
| CG <> Consumer | 1:N | Many consumers can be part of a consumer group; a given consumer instance belongs to one group. |
| CG Coordinator <> Broker | N:1 | A broker/node can host multiple CG coordinators; the broker here is like a reverse proxy. If it goes down it can take down multiple CG coordinators, but because of HA another broker takes over with everything replayed. From the PoV of the CG coordinator, it is the broker (Buzz Lightyear clones meme). |
| CG Coordinator <> CG | 1:1 | Each CG only ever has one leader, elected via Raft; that leader is the CG coordinator. |
| Partition Leader <> CG Coordinator | — | Same thing. |
| CG Coordinator <> Control Plane Raft Leader | ??? | The CG coordinator has affinity to the Raft leader. |

My doubts:

Others

  • Every consumer/client by default reads via the leader. This is to ensure strong consistency.
  • Reading from a replica can be a use case (and is technically possible, leading to eventual consistency), but it’s not the default practice and, at the time of writing, it isn’t even supported.
  • Redpanda does acks=all.
    • i.e. ACKs from all replicas -> strong consistency; but either way we can’t read from replicas yet in Redpanda.
    • This replicates the consumer group state (i.e. the Raft group state).
    • i.e. the serialized consumer group state is replicated to the other brokers and “saved to disk” before the write is acknowledged back to the client.
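
For comparison, this is what acks=all looks like on the producer side of the Kafka protocol; the notes above describe the same guarantee being applied internally to consumer-group state. Broker address and topic are placeholders:

```python
from confluent_kafka import Producer

producer = Producer({
    "bootstrap.servers": "localhost:9092",
    "acks": "all",               # wait until every in-sync replica has the write
    "enable.idempotence": True,  # avoid duplicates on retries (requires acks=all)
})

producer.produce("notifications", key="user-42", value=b"hello")
producer.flush()  # blocks until the broker acknowledges per the acks setting
```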

Basics

  • Consumer Group is part of the Kafka protocol: Kafka consumer group
  • Use cases: parallelism, coordination between consumers, tracking consumer progress/state, availability
  • Consumers need to ingest from (example scenarios):
    • a single consumer consuming from multiple topic-partitions
    • multiple consumers consuming from a single topic-partition
  • To coordinate these example scenarios, we can either write our own logic or use a Consumer Group, which makes things easy for us.
  • By using a Consumer Group, we use the Coordination Protocol

Why Consumer Group?

Using this primitive, multiple consumers can now ingest from the same topic, and the consumer group takes care of:

  • parallelism & coordination: which partition should this consumer read from?
  • tracking progress: helps with dealing with failures; consumers need to manage offsets, metadata, checkpointing, etc., and the CG helps with all of that.
    • When using a CG to track progress, it internally uses Raft; the Raft group is essentially the consumer group’s state. The data is serialized and stored on disk.
  • availability
    • Because we have a CG, based on the spec/protocol we’ll have replicas ready to take over if the CG coordinator goes down.
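
A minimal consumer-group sketch with the confluent-kafka Python client: run several copies with the same group.id and the group splits the topic's partitions among them, while committed offsets track progress. Names, broker address, and the handle() function are illustrative:

```python
from confluent_kafka import Consumer

def handle(value: bytes) -> None:
    print("processing", value)   # stand-in for real work

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "notification-workers",  # same group.id => same consumer group
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,         # commit explicitly after processing
})
consumer.subscribe(["notifications"])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None or msg.error():
            continue
        handle(msg.value())
        consumer.commit(message=msg, asynchronous=False)  # progress tracked by the CG
finally:
    consumer.close()
```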

Identification of Consumer Group

  • In the Kafka protocol a consumer group is identified by its consumer_group_id (group.id) alone, not by (topic_id, consumer_group_id). Offsets are tracked per (group, topic, partition), so one group can subscribe to multiple topics, and two consumers that use the same consumer_group_id while subscribing to different topics still land in the same group (e.g. (topic1, cg_xyz) and (topic2, cg_xyz) share the group cg_xyz and can trigger rebalances for each other). It’s usually confusing to reuse a group id like this, so in practice use one group id per application.
  • At least in the case of Redpanda Connect, the consumer_group_id is created on the fly when you start the consumer.

Consumer Group Coordinator

Consumer Group Rebalance

From the PoV of the CG coordinator, the system needs a “stable state” to make progress. After rebalancing, we achieve the stable state and then carry on. The “carry on” relies on a debounce effect that extends the rebalance window, so that further churn inside the window doesn’t immediately retrigger a rebalance.

  • A consumer joining or leaving the CG, or anything else that changes the dynamic relationship between consumers and the CG, triggers something called a group rebalance
  • Rebalance: deciding who should be consuming from which partition
  • If the CG has a lot of members (consumers) or flaky members, we spend a lot of time rebalancing, and if rebalancing keeps happening we cannot make progress. E.g. if a consumer keeps joining and leaving the CG every 10 ms, we’d trigger a rebalance every 10 ms and nothing would make progress.
  • Solution: debounce the rebalances, i.e. wait out a short window after a membership change so that rapid join/leave churn collapses into a single rebalance, and tune session/heartbeat timeouts so flaky members don’t churn the group.
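
A sketch of observing rebalances from a consumer, plus the static-membership knob the Kafka protocol offers to soften rebalances caused by flaky or restarting members; the ids and timeouts are illustrative, and whether static membership fits is a design choice:

```python
from confluent_kafka import Consumer

def on_assign(consumer, partitions):
    # Called after a rebalance hands partitions to this member.
    print("assigned:", [(p.topic, p.partition) for p in partitions])

def on_revoke(consumer, partitions):
    # Called before a rebalance takes partitions away from this member.
    print("revoked:", [(p.topic, p.partition) for p in partitions])

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "notification-workers",
    "group.instance.id": "worker-1",  # static membership: a quick restart doesn't trigger a rebalance
    "session.timeout.ms": 45000,      # how long the coordinator waits before declaring a member dead
})
consumer.subscribe(["notifications"], on_assign=on_assign, on_revoke=on_revoke)

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    print(msg.partition(), msg.offset(), msg.value())
```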

Leadership Coordination Protocol

See “The Magical Rebalance Protocol of Apache Kafka” by Gwen Shapira - YouTube 🌟

  • In the event of one of the leader brokers (the CG coordinator) failing

    • The leadership is changed by Raft.
    • Because the new leader broker already has the replicated CG data of the failed node, we “replay that log” on the new leader.
    • Once the replay/recovery is finished, the new leader announces to the cluster that it is now the leader for the Raft group, i.e. the new CG coordinator.
    • The Consumer Group protocol ensures that the consumers switch their CG coordinator to the newly elected leader (i.e. clients/consumers automatically discover the address of the new leader node).
      • https://kafka.apache.org/protocol#The_Messages_FindCoordinator : This is defined in the Kafka spec. Clients/consumers query the FindCoordinator API to find the new CG coordinator.
      • Basically the client/consumer asks “which broker in the cluster is acting as the CG coordinator for my consumer group?”. Any broker in the cluster can answer this question: it hashes the consumer group name and looks it up in the replicated, deterministic CG <> coordinator-broker mapping (sketched at the end of this section).
        • Mapping: CG coordinator <> partition leader, but if both of them are the same we just have to find one.
    • DOUBT: What do we mean when we ask “who is the leader of this partition?”
      • OKAY, clarification: the consumer/client talks in terms of partitions, not in terms of the CG coordinator. TODO
      • We’ve actually assigned the leader of the partition to be the CG coordinator
  • How does the client/consumer “really” get to know the new broker for the topic it’s trying to read from?

    • This is the explanation of what FindCoordinator API does.
  • Hierarchy of things

    • client/consumer connects to a Consumer Group
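
A rough sketch of the deterministic mapping behind FindCoordinator, as the Kafka protocol defines it (the group id is hashed onto a partition of the internal __consumer_offsets topic, and that partition's leader is the coordinator). The leadership map and partition count below are made-up stand-ins for cluster metadata, not a real client API:

```python
# Sketch only: how any broker can answer FindCoordinator deterministically.

def java_string_hashcode(s: str) -> int:
    """Java String.hashCode(), which the brokers use to hash group ids."""
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    return h - 0x100000000 if h >= 0x80000000 else h   # to signed 32-bit

def coordinator_partition(group_id: str, offsets_partitions: int = 50) -> int:
    # Kafka: abs(groupId.hashCode()) % offsets.topic.num.partitions (default 50)
    return abs(java_string_hashcode(group_id)) % offsets_partitions

# Every broker holds the replicated leadership map for __consumer_offsets,
# so any of them can answer without forwarding the question.
offsets_partition_leader = {p: p % 3 for p in range(50)}   # partition -> broker id (fake)

group = "cg_xyz"
p = coordinator_partition(group)
print(f"group {group!r} -> __consumer_offsets partition {p} "
      f"-> coordinator is broker {offsets_partition_leader[p]}")
```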