Apache Kafka is a distributed event streaming platform designed for high-throughput, fault-tolerant, real-time data streaming. It provides a unified, high-performance solution for handling real-time data feeds. Kafka is built to be horizontally scalable, reliably store massive amounts of data, and connect disparate data sources and sinks.
full architecture
graph TD
subgraph Client Applications
A[Producer Clients]
B[Consumer Clients]
C[Admin Clients]
D[Streams Applications]
end
A -- Produces messages --> E["Broker(Controller) Nodes"]
B -- Consumes messages --> E
C -- Management operations --> E
D -- Process, transform --> E
subgraph Kafka Cluster
E --> F[Socket Server]
F -- Route Requests --> G[KafkaApis/ControllerApis]
G -- Request Processor --> H[ReplicaManager]
H -- Manages --> J[LogManager]
J -- "Topic Partitions" --> K[Storage]
G --> L[GroupCoordinator]
G --> M[TransactionCoordinator]
subgraph Coordinator Services
L -- "Coordinator Services" --> N[QuorumController]
M -- "Coordinator Services" --> N
end
N -- "KRaft Controller" --> O[KRaft Controller]
O --> P[Kafka Broker]
P -- "Metadata updates" --> G
end
replica management
If we create a topic with 3 partitions and a replication factor of 2, we will have a Kafka cluster like this:
graph TD
subgraph "Kafka Cluster (3 Nodes)"
Node1["Kafka Broker 1"]
Node2["Kafka Broker 2"]
Node3["Kafka Broker 3"]
end
Topic["MyTopic (3 Partitions, RF=2)"]
%% Connect Topic to its logical Partitions
Topic -- "Has" --> P0["Partition 0"]
Topic -- "Has" --> P1["Partition 1"]
Topic -- "Has" --> P2["Partition 2"]
%% Assign Partition 0 replicas to nodes
Node1 -- "Hosts Leader" --> P0L["P0 (Leader)"]
Node2 -- "Hosts Follower" --> P0F["P0 (Follower)"]
%% Assign Partition 1 replicas to nodes
Node2 -- "Hosts Leader" --> P1L["P1 (Leader)"]
Node3 -- "Hosts Follower" --> P1F["P1 (Follower)"]
%% Assign Partition 2 replicas to nodes
Node3 -- "Hosts Leader" --> P2L["P2 (Leader)"]
Node1 -- "Hosts Follower" --> P2F["P2 (Follower)"]
%% Styling for clarity
classDef broker_style fill:#E0F7FA,stroke:#333,stroke-width:2px;
class Node1,Node2,Node3 broker_style;
classDef topic_style fill:#D1C4E9,stroke:#333,stroke-width:2px;
class Topic topic_style;
classDef partition_base_style fill:#FFFDE7,stroke:#333,stroke-width:1px;
class P0,P1,P2 partition_base_style;
classDef leader_replica_style fill:#A2D9CE,stroke:#333,stroke-width:1px;
class P0L,P1L,P2L leader_replica_style;
classDef follower_replica_style fill:#F4D03F,stroke:#333,stroke-width:1px;
class P0F,P1F,P2F follower_replica_style;
linkStyle 0,1,2 stroke-width:0px;
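As a quick reference, here is a minimal sketch of creating exactly this topic with the Java AdminClient; the bootstrap address and class name are assumptions, so adjust them to your environment.
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.List;
import java.util.Properties;

public class CreateMyTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions, replication factor 2, matching the diagram above
            NewTopic topic = new NewTopic("MyTopic", 3, (short) 2);
            admin.createTopics(List.of(topic)).all().get(); // blocks until the controller acknowledges
        }
    }
}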
How are partitions handled? There are two critical components: ReplicaManager and Partition.
ReplicaManager key responsibilities:
- manages local partition replicas
protected val allPartitions = new ConcurrentHashMap[TopicPartition, HostedPartition]
- handles log appends, fetches, truncations, and high watermark management
- coordinates with fetcher threads (ReplicaFetcherManager) that replicate data from leaders to followers
- manages the ISR (in-sync replica) set and triggers actions when replicas fall out of sync; startup() schedules the isr-expiration task:
scheduler.schedule("isr-expiration", () => maybeShrinkIsr(), 0L, config.replicaLagTimeMaxMs / 2)
- handles partition leadership changes (role changes) as directed by the controller, via becomeLeaderOrFollower()
- cleans up resources and metrics for partitions as needed
Partition is the data structure that represents a single topic partition on a broker, tracking its replicas, ISR state, and local log.
The sequence diagram for creating a new topic is as follows.
sequenceDiagram
participant AdminClient
participant Controller
participant Broker1
participant Broker2
participant ReplicaManager1 as ReplicaManager (Broker1)
participant ReplicaManager2 as ReplicaManager (Broker2)
AdminClient->>Controller: CreateTopicsRequest
Controller->>Controller: Update metadata, assign partitions/replicas
Controller->>Broker1: LeaderAndIsrRequest (partition assignment)
Controller->>Broker2: LeaderAndIsrRequest (partition assignment)
Broker1->>ReplicaManager1: becomeLeaderOrFollower()
Broker2->>ReplicaManager2: becomeLeaderOrFollower()
ReplicaManager1->>ReplicaManager1: Create Partition(s), initialize log(s)
ReplicaManager2->>ReplicaManager2: Create Partition(s), initialize log(s)
ReplicaManager1-->>Broker1: Partition ready
ReplicaManager2-->>Broker2: Partition ready
Broker1-->>Controller: LeaderAndIsrResponse
Broker2-->>Controller: LeaderAndIsrResponse
Controller-->>AdminClient: CreateTopicsResponse
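To see the assignment that the controller actually produced, you can describe the topic once creation completes; this is a small continuation of the AdminClient sketch above (allTopicNames() assumes a 3.x client).
import org.apache.kafka.clients.admin.TopicDescription;
import java.util.List;

// inside the try (AdminClient admin = ...) block from the earlier sketch
TopicDescription desc = admin.describeTopics(List.of("MyTopic"))
        .allTopicNames().get()
        .get("MyTopic");
desc.partitions().forEach(p ->
        System.out.printf("partition %d leader=%s replicas=%s isr=%s%n",
                p.partition(), p.leader(), p.replicas(), p.isr()));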
group coordination
Group coordination in KRaft mode handles consumer group membership, partition assignment, and offset management.
The workflow of partition assignment:
sequenceDiagram
participant C1 as Consumer 1
participant C2 as Consumer 2
participant C3 as Consumer 3
participant GC as GroupCoordinator
Note over C1,GC: Phase 1: Join Group
C1->>GC: JoinGroupRequest(groupId, memberId="", protocols)
C2->>GC: JoinGroupRequest(groupId, memberId="", protocols)
C3->>GC: JoinGroupRequest(groupId, memberId="", protocols)
GC->>GC: Wait for all members or timeout
GC->>GC: Select group leader (first to join)
GC-->>C1: JoinGroupResponse(leader=true, members=[C1,C2,C3])
GC-->>C2: JoinGroupResponse(leader=false, members=[])
GC-->>C3: JoinGroupResponse(leader=false, members=[])
Note over C1,GC: Phase 2: Sync Group
C1->>C1: Perform partition assignment
C1->>GC: SyncGroupRequest(groupId, assignments)
C2->>GC: SyncGroupRequest(groupId, assignments=[])
C3->>GC: SyncGroupRequest(groupId, assignments=[])
GC-->>C1: SyncGroupResponse(assignment=[P0])
GC-->>C2: SyncGroupResponse(assignment=[P1])
GC-->>C3: SyncGroupResponse(assignment=[P2])
Note over C1,GC: Phase 3: Heartbeat & Normal Operation
loop Every session.timeout.ms/3
C1->>GC: HeartbeatRequest
C2->>GC: HeartbeatRequest
C3->>GC: HeartbeatRequest
GC-->>C1: HeartbeatResponse
GC-->>C2: HeartbeatResponse
GC-->>C3: HeartbeatResponse
end
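All three phases are driven by the consumer client itself; here is a minimal sketch of a consumer joining a group (bootstrap address, group id, and topic name are assumptions).
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class GroupConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");                // assumed group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("MyTopic"));
            while (true) {
                // poll() drives the JoinGroup/SyncGroup handshake; heartbeats run on a background thread
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                records.forEach(r -> System.out.printf("%s-%d@%d: %s%n",
                        r.topic(), r.partition(), r.offset(), r.value()));
            }
        }
    }
}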
log management
The sequence diagram for creating a new log file:
sequenceDiagram
participant AdminClient
participant Controller
participant Broker
participant ReplicaManager
participant LogManager
participant UnifiedLog
participant FileSystem
Note over AdminClient, FileSystem: Topic Creation Phase
AdminClient->>Controller: CreateTopicsRequest
Controller->>Controller: Assign partitions & replicas
Controller->>Broker: LeaderAndIsrRequest (new topic partitions)
Broker->>ReplicaManager: becomeLeaderOrFollower()
ReplicaManager->>ReplicaManager: makeLeaders() / makeFollowers()
ReplicaManager->>LogManager: getOrCreateLog(topicPartition)
LogManager->>LogManager: Check if log exists
LogManager->>UnifiedLog: new UnifiedLog(dir, config, ...)
UnifiedLog->>FileSystem: Create log directory (/kafka-logs/topic-partition/)
UnifiedLog->>FileSystem: Create .log, .index, .timeindex files
UnifiedLog-->>LogManager: Log created
LogManager-->>ReplicaManager: Log instance
ReplicaManager-->>Broker: Partition ready
Broker-->>Controller: LeaderAndIsrResponse
Controller-->>AdminClient: CreateTopicsResponse
Note over AdminClient, FileSystem: Data Production Phase
participant Producer
Producer->>Broker: ProduceRequest (records)
Broker->>ReplicaManager: appendRecords()
ReplicaManager->>ReplicaManager: getPartition() -> get leader partition
ReplicaManager->>UnifiedLog: appendRecordsToLeader(records)
UnifiedLog->>UnifiedLog: analyzeAndValidateRecords()
UnifiedLog->>UnifiedLog: append() to active segment
UnifiedLog->>FileSystem: Write records to .log file
UnifiedLog->>FileSystem: Update .index file (offset index)
UnifiedLog->>FileSystem: Update .timeindex file (time index)
UnifiedLog->>UnifiedLog: maybeRoll() - check if need new segment
alt If segment size limit reached
UnifiedLog->>FileSystem: Create new .log segment file
UnifiedLog->>FileSystem: Create new .index file
UnifiedLog->>FileSystem: Create new .timeindex file
end
UnifiedLog-->>ReplicaManager: LogAppendInfo (offset, timestamp)
ReplicaManager->>ReplicaManager: tryCompleteDelayedRequests()
ReplicaManager-->>Broker: ProduceResponse
Broker-->>Producer: ProduceResponse (offset, timestamp)
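On the client side, the whole data production phase above is triggered by an ordinary send; here is a minimal producer sketch (bootstrap address and topic name are assumptions).
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class SimpleProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // wait for the full ISR, not just the leader
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("MyTopic", "key-1", "hello kafka");
            // send() is async; get() waits for the ProduceResponse carrying offset and timestamp
            RecordMetadata meta = producer.send(record).get();
            System.out.printf("written to %s-%d at offset %d%n",
                    meta.topic(), meta.partition(), meta.offset());
        }
    }
}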
The general log file types and their hierarchy:
graph LR
Topic["主题"] -->|分区| Partition1["Partition (分区#0)"]
Topic -->|分区| Partition2["Partition (分区#1)"]
Topic -->|分区| Partition3["Partition (分区#2)"]
Partition1 -->|副本| Replica1["Replica (副本#0)"] --> Log1["Log (日志#0)"]
Partition2 -->|副本| Replica2["Replica (副本#1)"] --> Log2["Log (日志#1)"]
Partition3 -->|副本| Replica3["Replica (副本#2)"] --> Log3["Log (日志#2)"]
subgraph LogSegment
Log1 -->|LogSegment| LogSegment1["LogSegment (日志分段#0 分区#0)"]
Log1 -->|LogSegment| LogSegment2["LogSegment (日志分段#1 分区#0)"]
Log1 -->|LogSegment| LogSegment3["LogSegment (日志分段#2 分区#0)"]
end
LogSegment1 -->|Log| Log[".log 日志文件"]
LogSegment1 -->|Index| Index[".index 偏移量索引文件"]
LogSegment1 -->|TimeIndex| TimeIndex[".timeindex 时间戳索引文件"]
log retention sequence
sequenceDiagram
participant Producer
participant Broker
participant LogSegment
participant TimeIndex
participant LogCleaner
Note over Producer,LogCleaner: Message Production & Timestamp Recording
Producer->>Broker: ProduceRequest(records with timestamps)
Broker->>LogSegment: Append records
LogSegment->>LogSegment: Track largest timestamp
LogSegment->>TimeIndex: Update time index mapping
Note over Producer,LogCleaner: Retention Check Process
LogCleaner->>LogSegment: Check segment for retention
LogSegment->>LogSegment: Get largest timestamp in segment
LogSegment->>LogCleaner: Return segment.largestTimestamp
LogCleaner->>LogCleaner: Calculate: now - largestTimestamp
alt If age > log.retention.ms
LogCleaner->>LogSegment: Mark for deletion
LogSegment->>LogSegment: Delete segment files
else Keep segment
LogCleaner->>LogSegment: Keep segment
end
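Retention is governed by broker defaults (log.retention.ms / log.retention.hours) plus per-topic overrides; here is a sketch of setting a 7-day override via incrementalAlterConfigs, continuing the AdminClient sketches above.
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import java.util.List;
import java.util.Map;

// inside the try (AdminClient admin = ...) block from the earlier sketch
ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "MyTopic");
AlterConfigOp setRetention = new AlterConfigOp(
        new ConfigEntry("retention.ms", "604800000"), // 7 days in milliseconds
        AlterConfigOp.OpType.SET);
admin.incrementalAlterConfigs(Map.of(topic, List.of(setRetention))).all().get();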
silly questions
What is the data flow when a client produces data (in KRaft mode)?
- How does the client know which broker to connect to?
- through a metadata request
- the metadata is maintained in a local cache
- and refreshed whenever an error occurs
- Is topic partitioning handled by the client or the broker?
- Partitioning is handled on the client side: the producer keeps records in a local memory buffer, grouped into per-partition batches that are sent in per-broker requests, to achieve batching (and the memory pool is shared across all broker connections)
sequenceDiagram
participant Producer
participant BootstrapBroker as Bootstrap Broker
participant Controller
participant TargetBroker as Target Broker (Leader)
Note over Producer,TargetBroker: Bootstrap & Metadata Discovery
Producer->>BootstrapBroker: Connect to bootstrap.servers
Producer->>BootstrapBroker: MetadataRequest(topics=[])
BootstrapBroker->>Controller: Get cluster metadata
Controller-->>BootstrapBroker: Cluster metadata
BootstrapBroker-->>Producer: MetadataResponse(brokers, topics, partitions, leaders)
Note over Producer,TargetBroker: Topic-Specific Metadata
Producer->>BootstrapBroker: MetadataRequest(topics=["my-topic"])
BootstrapBroker-->>Producer: MetadataResponse(partition leaders for "my-topic")
Note over Producer,TargetBroker: Direct Connection to Leader
Producer->>Producer: Determine partition (hash key or round-robin)
Producer->>Producer: Find leader broker for target partition
Producer->>TargetBroker: Establish connection
Producer->>TargetBroker: ProduceRequest(records)
TargetBroker-->>Producer: ProduceResponse(offset, timestamp)
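The client-side buffering and batching described above are controlled by a handful of producer settings; here is a sketch of the relevant knobs (the values are illustrative, not tuned recommendations).
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;

// extends the Properties set-up from the producer sketch earlier
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");      // assumed address
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32 * 1024);                    // max bytes per per-partition batch
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);                            // wait up to 10 ms to fill a batch
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 64L * 1024 * 1024);         // shared pool for all in-flight batches
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");                  // batches are compressed on the client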