All topics
Data · Learning hub

Apache Kafka notes for developers

Master Apache Kafka with a curated set of 3 developer notes — core concepts, patterns, and interview prep. Maintained by the DevRecall team.

Save this stack to your DevRecallMore Data notes
Apache Kafka

Core Concepts: Topics, Partitions & Brokers

Apache Kafka: Core Concepts Apache Kafka is a distributed event streaming platform. Producers publish records to topics; consumers read from topics. Kafka store

Apache Kafka: Core Concepts

Apache Kafka is a distributed event streaming platform. Producers publish records to topics; consumers read from topics. Kafka stores records durably on disk with configurable retention. Throughput scales horizontally via partitions.

Architecture

Core components:

Broker
  - A single Kafka server
  - A cluster has 3+ brokers (for replication + HA)
  - One broker is the controller (manages partition leadership)
  - KRaft mode (Kafka 2.8+): replaces ZooKeeper with built-in Raft consensus

Topic
  - Named stream of records (like a DB table or log file category)
  - Partitioned for parallelism
  - Retained for a configurable time (default: 7 days) regardless of consumption

Partition
  - Ordered, immutable sequence of records
  - Each partition stored on one broker (leader) + replicated to N-1 others (followers)
  - Records get an incrementing offset within the partition
  - Ordering guaranteed within a partition, NOT across partitions

Replication
  - Replication factor: how many copies (typical: 3)
  - In-Sync Replicas (ISR): replicas that are caught up with the leader
  - Leader handles all reads/writes; followers replicate asynchronously
  - If leader fails, a new leader is elected from ISR

ZooKeeper (legacy) / KRaft (modern)
  - Manages broker metadata, controller election
  - KRaft: Kafka 3.3+ can run without ZooKeeper (use KRaft in production from 3.3+)

CLI: Topics & Messages

# Start (Docker Compose — easiest for dev)
# docker-compose.yml with confluentinc/cp-kafka or apache/kafka

# Create topic
kafka-topics.sh --create   --bootstrap-server localhost:9092   --topic orders   --partitions 6   --replication-factor 3   --config retention.ms=604800000   # 7 days

# List topics
kafka-topics.sh --list --bootstrap-server localhost:9092

# Describe topic (partition leaders, ISR)
kafka-topics.sh --describe --topic orders --bootstrap-server localhost:9092

# Delete topic
kafka-topics.sh --delete --topic orders --bootstrap-server localhost:9092

# Alter topic (add partitions — can only increase, never decrease)
kafka-topics.sh --alter --topic orders --partitions 12 --bootstrap-server localhost:9092

# Produce messages (CLI)
kafka-console-producer.sh   --bootstrap-server localhost:9092   --topic orders   --property key.separator=:   --property parse.key=true
# Type: user-123:{"orderId": 1, "amount": 99.99}

# Consume messages (CLI)
kafka-console-consumer.sh   --bootstrap-server localhost:9092   --topic orders   --from-beginning   --property print.key=true   --property print.timestamp=true

# Consumer group CLI
kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
kafka-consumer-groups.sh --describe --group my-group --bootstrap-server localhost:9092
kafka-consumer-groups.sh --reset-offsets --group my-group --topic orders   --to-earliest --execute --bootstrap-server localhost:9092

Key Concepts

  • Offset: unique ID of a record within a partition. Consumers commit offsets to track progress.

  • Consumer group: set of consumers sharing a topic. Each partition assigned to exactly one consumer in the group.

  • Throughput scales with partitions: more partitions = more consumer instances can work in parallel.

  • Log compaction: instead of time-based retention, keep only the latest record per key (useful for state/changelog topics).

  • Producer acks: acks=0 (fire-and-forget), acks=1 (leader confirmed), acks=all (all ISR confirmed — strongest).

  • Exactly-once semantics (EOS): enable with enable.idempotence=true on producer + transactional APIs.

  • KIP-500 / KRaft: Kafka 3.3+ no longer requires ZooKeeper — use KRaft mode for new deployments.

Apache Kafka

Producers & Consumers

Apache Kafka: Producers & Consumers Producer # pip install confluent-kafka from confluent_kafka import Producer import json producer = Producer({ 'bootstrap.ser

Apache Kafka: Producers & Consumers

Producer

# pip install confluent-kafka
from confluent_kafka import Producer
import json

producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'acks': 'all',                  # wait for all ISR replicas
    'retries': 5,
    'retry.backoff.ms': 300,
    'linger.ms': 5,                 # batch messages for 5ms (higher throughput)
    'batch.size': 16384,
    'compression.type': 'snappy',   # compress batches (lz4, gzip, snappy, zstd)
    'enable.idempotence': True,     # exactly-once delivery
})

def delivery_callback(err, msg):
    if err:
        print(f'Delivery failed: {err}')
    else:
        print(f'Delivered to {msg.topic()}[{msg.partition()}] at offset {msg.offset()}')

# Produce with key (same key → same partition = ordered per key)
order = {'orderId': 1, 'userId': 'user-123', 'amount': 99.99}
producer.produce(
    topic='orders',
    key='user-123',                 # key ensures ordering per user
    value=json.dumps(order).encode('utf-8'),
    callback=delivery_callback,
)
producer.poll(0)   # trigger delivery callbacks (non-blocking)

# Flush before exit
producer.flush(timeout=10)         # wait for all pending messages

Consumer

from confluent_kafka import Consumer, KafkaError
import json

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'order-processing-group',
    'auto.offset.reset': 'earliest',    # 'latest' = only new messages
    'enable.auto.commit': False,        # manual commit for at-least-once
    'max.poll.interval.ms': 300000,     # max time between polls before rebalance
    'session.timeout.ms': 45000,
})

consumer.subscribe(['orders'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue   # reached end of partition
            else:
                raise KafkaException(msg.error())

        # Process message
        key = msg.key().decode('utf-8') if msg.key() else None
        value = json.loads(msg.value().decode('utf-8'))
        print(f'Offset {msg.offset()}: key={key}, value={value}')

        process_order(value)

        # Commit after successful processing (at-least-once delivery)
        consumer.commit(asynchronous=False)

except KeyboardInterrupt:
    pass
finally:
    consumer.close()   # commits offsets and triggers rebalance

Java Producer/Consumer

// Maven: kafka-clients dependency

// Producer
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    ProducerRecord<String, String> record = new ProducerRecord<>(
        "orders", "user-123", "{"orderId": 1}"
    );
    producer.send(record, (metadata, exception) -> {
        if (exception != null) log.error("Send failed", exception);
        else log.info("Sent to partition {} offset {}", metadata.partition(), metadata.offset());
    });
}

// Consumer
Properties cProps = new Properties();
cProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
cProps.put(ConsumerConfig.GROUP_ID_CONFIG, "order-group");
cProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
cProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
cProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
cProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cProps)) {
    consumer.subscribe(List.of("orders"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            processOrder(record.key(), record.value());
        }
        consumer.commitSync();
    }
}
Apache Kafka

Kafka Streams, Connect & Best Practices

Apache Kafka: Streams, Connect & Best Practices Kafka Streams Kafka Streams is a client library for building real-time stream processing applications. It runs i

Apache Kafka: Streams, Connect & Best Practices

Kafka Streams

Kafka Streams is a client library for building real-time stream processing applications. It runs inside your application — no separate cluster needed.

// Build stream processing topology
StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> orders = builder.stream("orders");

// Filter, transform
KStream<String, Order> parsedOrders = orders
    .filter((key, value) -> value != null)
    .mapValues(value -> parseOrder(value));

// Aggregate per user in a 5-minute window
parsedOrders
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .aggregate(
        () -> 0.0,
        (key, order, total) -> total + order.getAmount(),
        Materialized.as("order-totals-store")
    )
    .toStream()
    .to("order-totals-per-user");

// Join two streams (within 60s window)
KStream<String, Payment> payments = builder.stream("payments");
parsedOrders.join(
    payments,
    (order, payment) -> new FulfilledOrder(order, payment),
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(60))
).to("fulfilled-orders");

// Interactive queries (query materialized state)
KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
ReadOnlyKeyValueStore<String, Double> store =
    streams.store(StoreQueryParameters.fromNameAndType("order-totals-store",
        QueryableStoreTypes.keyValueStore()));
Double total = store.get("user-123");

Kafka Connect

Kafka Connect is a framework for streaming data between Kafka and external systems using reusable connectors.

// Source connector: PostgreSQL → Kafka (Debezium CDC)
{
  "name": "postgres-source",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "replicator",
    "database.password": "secret",
    "database.dbname": "mydb",
    "table.include.list": "public.orders,public.products",
    "topic.prefix": "cdc",
    "plugin.name": "pgoutput"
  }
}

// Sink connector: Kafka → Elasticsearch
{
  "name": "elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "orders",
    "connection.url": "http://elasticsearch:9200",
    "type.name": "_doc",
    "key.ignore": "false",
    "schema.ignore": "true"
  }
}
# Manage connectors via REST API
curl -X POST http://connect:8083/connectors   -H "Content-Type: application/json"   -d @connector-config.json

curl http://connect:8083/connectors                      # list
curl http://connect:8083/connectors/postgres-source/status  # status
curl -X DELETE http://connect:8083/connectors/postgres-source

Best Practices

  • Partition count: start with num_consumers * 2; can only increase partitions (never decrease).

  • Replication factor 3: tolerate 1 broker failure without data loss (acks=all + min.insync.replicas=2).

  • Use message keys for ordering guarantees — all records with the same key go to the same partition.

  • Schema Registry (Confluent): enforce Avro/JSON Schema, enable schema evolution with compatibility rules.

  • Monitor consumer lag: kafka-consumer-groups.sh --describe, or use Burrow/Kafdrop/Cruise Control.

  • Log compaction for state topics: retain only latest value per key (good for CDC and lookup tables).

  • Dead Letter Queue (DLQ): route failed/unparseable messages to a separate topic for inspection.

  • Idempotent consumers: always design consumers to handle duplicate messages safely.

Keep your Apache Kafka knowledge sharp.

Save this stack to your personal DevRecall — add your own notes, track what you're learning, and share what you know with the community.

Get started — free forever