
Lesson 2

Kafka Basics: Records, Topic Partitions, and Producers

In this lesson, you’ll learn how to store data in Kafka.

Key Points:

  • Record is the fundamental unit of data in Kafka
  • Topic partition is the fundamental unit of storage in Kafka
  • Topic partition is a log of records
  • Topic is one or more topic partitions
  • Record appended to one topic partition
  • Either explicitly specify topic partition for record, or topic partition chosen based on record’s key

As you work through this lesson you may be thinking “I just want one process to publish messages through Kafka to other processes”. As we’ll see, Kafka does more than just traditional pub-sub messaging, and all the details below about records and topic partitions are fundamental to understanding how to use Kafka.

Also, note that the focus of these first few lessons is learning the foundation of how Kafka works, so you can use it effectively. The Scala code examples will initially use the impure, non-referentially-transparent Java API to demonstrate Kafka concepts. This will not be code you should copy-and-paste into a production Banno service. In later lessons, we’ll see how to properly use Kafka in a pure way, using cats-effect and fs2. Until then, please just work through the code examples to learn the Kafka fundamentals.

If you need help with anything in this lesson, please ask in #sig-kafka.

Records (i.e. Data)

Kafka is fundamentally a system for storing and accessing data records. You can write records, and read records, and that’s pretty much it. Kafka can store lots of records, and provides useful guarantees related to reading and writing them, but the record is the most atomic unit of it all.

Some documentation also refers to records as messages. We’ll use record instead, because that’s what the Kafka APIs use, and it’s really a more accurate description. Message implies messaging and feels transient, and while Kafka can be used for inter-process messaging, it’s really about something more general: storing and accessing records. But messages and records are the same thing.

Records consist of multiple parts, and look a bit different to readers and writers. For the moment, just think of a record as a single piece of data.

Topic Partitions (i.e. Logs)

A log is an ordered collection of records. The only supported write operation is appending a new record to the end of the log. Records are stored in the log in the order in which they were appended.

Each record written to the log is assigned an offset. The offset is just the position of the record in the log: offsets start at zero (for the first record) and increment by one for each record. Offsets are stored as 64-bit longs, so we probably don’t need to worry about running out of them.

The only supported read operation is fetching records from the log, in order, starting at some offset.
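The append/offset/fetch model above can be sketched as a tiny in-memory log. This is an illustration of the model only (Kafka’s real storage is far more involved), but it captures the two operations and how offsets are assigned:

```scala
import scala.collection.mutable.ArrayBuffer

// A minimal in-memory "log": append-only, with offsets starting at zero.
class Log[A] {
  private val records = ArrayBuffer.empty[A]

  // Append a record to the end of the log, returning its offset.
  def append(record: A): Long = {
    records += record
    (records.size - 1).toLong
  }

  // Fetch records in order, starting at the given offset.
  def fetchFrom(offset: Long): List[A] =
    records.drop(offset.toInt).toList
}

val log = new Log[String]
println(log.append("first"))  // 0
println(log.append("second")) // 1
println(log.append("third"))  // 2
println(log.fetchFrom(1))     // List(second, third)
```

Note that there is no operation to modify or delete a record, and no way to read records out of order: appending and fetching-from-an-offset is the entire interface.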

In Kafka, each log is called a topic partition. A topic partition is where records are stored. Programs called producers append records to topic partitions, and programs called consumers read records from topic partitions. When working with Kafka you usually won’t see the word log. Just remember that a topic partition is a log.

A Kafka topic is a collection of one or more topic partitions. While you may be tempted to think of a topic as a log, in reality a topic is partitioned into a set of logs (called topic partitions). The records stored in a topic are actually stored in that topic’s partitions. Kafka makes it convenient to work with all of a topic’s partitions together as a single unit, but please remember that the topic partition is the log, and a topic consists of multiple topic partitions.

Code Example: Writing Records to Topic Partitions

A lot of new terminology was presented above; let’s write some code to learn about all of this in practice. Make sure you have everything running in Minikube.

Create a new sbt project and define this dependency:

libraryDependencies += "org.apache.kafka" % "kafka-clients" % "1.0.0"

Create a new main class in this project, and copy/paste the code that follows. Here are all of the imports you’ll need:

import scala.collection.JavaConverters._
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.serialization._

First, let’s define some values we’ll need later:

val bootstrapServers = "kafka-0.broker.kafka.svc.cluster.local:9092"
val topic = "lesson2-topic1"
val partitionCount = 2

bootstrapServers tells other parts of our program how to communicate with the Kafka cluster on the network. The value above is the host and port of one of the three Kafka brokers running in Minikube. We only need to specify the host and port of one broker; the clients will discover the others from there.

We’re also specifying the name of a topic we want to work with, along with the number of partitions of that topic. Remember: topic partitions are logs, so this topic will be partitioned into two logs.

Create a Topic

Here is some code that creates this topic:

val adminClientConfig = Map[String, Object](
  AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers
)
val adminClient = AdminClient.create(adminClientConfig.asJava)
if (!adminClient.listTopics().names().get().asScala.contains(topic)) {
  adminClient.createTopics(List(new NewTopic(topic, partitionCount, 2.toShort)).asJava).all().get()
}
adminClient.close()

The Kafka Java API provides several different types of clients, and each one is typically configured using a map of config values, as above.

Note that this code is idempotent: it will only create the topic if it does not already exist. This is really useful so that we can run our program multiple times, and we will not get an error trying to create a topic that already exists.

You are likely also nervous about side effects in this code. It is pretty obviously performing network I/O and even modifying state in a remote system. Those calls can fail. There is a resource created, used, and manually closed. These are all perfectly valid concerns; in a real system we would wrap things in e.g. IO and Stream.bracket. But for now, let’s just focus on learning about Kafka using its existing (impure) Java client library, and we’ll revisit how to regain referential transparency in a future lesson. (Hint: we’ll wrap the Kafka Java clients in fs2 streams.)

Run your program now. After it’s finished, do you see the new topic in the Kafka Topics UI? If not, please seek help in #sig-kafka.

Write Records

Now that we have a topic, let’s write some records to its partitions. And remember, the topic’s partitions are the logs, not the topic itself!

val producerConfig = Map[String, Object](
  ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers,
  ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG -> "true",
  ProducerConfig.COMPRESSION_TYPE_CONFIG -> "lz4",
  ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName,
  ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName
)
val producer = new KafkaProducer[String, String](producerConfig.asJava)
for (i <- 0 until 10) {
  val partition = i % partitionCount
  val key = "key" + i
  val value = "value" + i
  val record = new ProducerRecord(topic, partition, key, value)
  val metadataFuture = producer.send(record)
  val metadata = metadataFuture.get()
  println(record + ", metadata=" + metadata)
}
producer.close()

The overall structure of this code is:

  • Create a KafkaProducer using some config values
  • Create a ProducerRecord for each record, and write it to Kafka using producer.send(record)
  • Close the producer at the end

This is a very short-lived program. In “the real world” of long-lived programs (e.g. a web server) we would typically create the producer client when the service starts up, we’d use that same producer instance throughout the entire life of the service, and we’d close the producer as the service is shutting down. We typically would not create lots of short-lived producers just to send a few records and then close them.

Remember (yet again!) that Kafka actually stores records in topic partitions (which are logs). In this program, we explicitly specify which partition of the topic to store each record in. Because partitionCount = 2, i % partitionCount alternates between 0, 1, 0, 1, .... So the first record is stored in partition 0, second record is stored in partition 1, third record is stored in partition 0, etc. Later on, we’ll learn more about which partition a record is stored in.
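The alternation is plain modular arithmetic, which you can check on its own:

```scala
val partitionCount = 2
// Each record index maps to index % partitionCount: 0, 1, 0, 1, ...
val partitions = (0 until 10).map(_ % partitionCount)
println(partitions) // Vector(0, 1, 0, 1, 0, 1, 0, 1, 0, 1)
```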

Each record contains a value field. The value contains the main data that you want to record. In our example we just use a silly record value: "value" + i. In a real system, the record value could be an event object, a description of a change to some state, the entire representation of some state, or really anything you want. Kafka stores this value as a byte array (i.e. Array[Byte]) and doesn’t care what you put in it.
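The StringSerializer we configured on the producer is what turns each value into that byte array before it is sent. Conceptually it is just UTF-8 encoding (a simplified sketch, not the client’s exact implementation):

```scala
// Sketch of what StringSerializer does with a record value:
// encode the string as UTF-8 bytes. Kafka only ever sees the bytes.
val value: String = "value0"
val bytes: Array[Byte] = value.getBytes("UTF-8")
println(bytes.length)               // 6
println(new String(bytes, "UTF-8")) // value0
```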

Run your program, study its output, then refresh the Kafka Topics UI and select the topic. You should see the records that the program stored in the Kafka topic’s partitions. Note that this UI shows the key, value, partition, and offset for each record. You can also select a specific partition of the topic to display, or “All partitions”. Play around with this UI, examining the records your program stored in the topic.

Notice that records in each partition appear in the same order as our program wrote them. This is an important guarantee that Kafka provides: records sent by the same producer to the same topic partition will be stored in the order in which the producer sent them.

Record Keys

Let’s modify the code above a little bit:

val topic = "lesson2-topic2"

...

for (i <- 0 until 10) {
  val key = "key" + i
  val value = "value" + i
  val record = new ProducerRecord(topic, key, value)
  val metadataFuture = producer.send(record)
  val metadata = metadataFuture.get()
  println(record + ", metadata=" + metadata)
}

First off, we used a new topic, just to make it easier to find our new records in the Kafka Topics UI.

We also removed the partition value, and now create the record using new ProducerRecord(topic, key, value). We know that Kafka stores each record in a topic partition, but now that we no longer explicitly specify which partition to append each record to, which partition will each record be stored in?

Run your program, and examine the records in the Kafka Topic UI. See which records are in partition 0, and which records are in partition 1. Any idea how Kafka is choosing which partition to store the records in?

There’s no obvious pattern to the records in each partition. Partition 0 contains records 0, 1, and 6. Partition 1 contains records 2, 3, 4, 5, 7, 8, and 9.

Internally, KafkaProducer uses a partitioner class (DefaultPartitioner) to choose which partition to append a record to. According to its documentation, it does one of three things:

  1. If a partition is specified in the record, use it
  2. If no partition is specified but a key is present choose a partition based on a hash of the key
  3. If no partition or key is present choose a partition in a round-robin fashion

In the previous section, where we specified the partition value of the record, we were actually doing #1.

Now we’re doing #2. You can read the code to see exactly what’s going on, but the gist is partition = hash(key) % partitionCount. This partition function only depends on the record key and the count of the topic’s partitions, and is deterministic. If you use a different topic name and run the program again, the same records will be stored in the same partitions of the new topic. If you change the keys to e.g. "key-" + i you’ll notice different partition assignments.
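Here’s a sketch of the idea, using String.hashCode as a stand-in for Kafka’s actual murmur2 hash, so the exact assignments below will differ from Kafka’s:

```scala
// Illustrative key-based partitioner: hash the key, take it modulo the
// partition count. Kafka's DefaultPartitioner uses murmur2, not hashCode,
// so its assignments differ -- but the same-key-same-partition property
// holds either way.
def partitionFor(key: String, partitionCount: Int): Int =
  (key.hashCode & 0x7fffffff) % partitionCount // mask keeps the hash non-negative

val keys = (0 until 10).map("key" + _)
keys.foreach(k => println(s"$k -> partition ${partitionFor(k, 2)}"))
```

Because the function depends only on the key and the partition count, calling it twice with the same key always yields the same partition.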

This is a very important property to keep in mind: different records with the same key will be stored in the same topic partition. As an example, say our user account service records a change event in a Kafka topic each time a user account is updated. If we want all events for the same user account to be stored in the same partition, we’d want to use the user account ID as the record key.

Why not just use one partition for every topic? You could, but keep in mind that using multiple topic partitions allows Kafka to store each partition on a different broker machine, and also allows multiple consumers to process the partitions in parallel (we’ll see this in the next lesson).

What happens if you add more partitions to an existing topic? Kafka does allow you to do this. But the key hash is then taken modulo a new partition count, so every record key could be assigned to a different partition, and records with the same key may be stored in different partitions than before. It’s a best practice to not add partitions to an existing topic. If you need more partitions, you should create a new topic, and copy records from the old topic to the new topic. This requires time and effort, so it’s good to choose enough partitions up-front, so you won’t need to add more later. Can we just use 1000 partitions for every topic? No: each partition consumes broker resources (files, memory, replication traffic), so very large partition counts have real costs.
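You can see why with a quick sketch (again using String.hashCode as a stand-in for Kafka’s murmur2 hash): changing the partition count changes the modulus, so keys can land in different partitions than before.

```scala
// Same illustrative partitioner as before: hash modulo partition count.
def partitionFor(key: String, partitionCount: Int): Int =
  (key.hashCode & 0x7fffffff) % partitionCount

val keys = (0 until 10).map("key" + _)
// Count how many keys map to a different partition after growing from 2 to 3.
val moved = keys.count(k => partitionFor(k, 2) != partitionFor(k, 3))
println(s"$moved of ${keys.size} keys changed partitions")
```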

Finally, you can test out #3 by setting every record’s key to null. What happens now when you run the program?
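A sketch of counter-based round robin, which is roughly what the kafka-clients 1.0.0 default partitioner does for null keys (the real code also restricts itself to partitions that currently have an available leader):

```scala
import java.util.concurrent.atomic.AtomicInteger

// Round-robin assignment for records with no key: an incrementing
// counter taken modulo the partition count.
val counter = new AtomicInteger(0)
def nextPartition(partitionCount: Int): Int =
  counter.getAndIncrement() % partitionCount

val assigned = (0 until 6).map(_ => nextPartition(2))
println(assigned) // Vector(0, 1, 0, 1, 0, 1)
```

With null keys there is no same-key-same-partition guarantee at all; records are simply spread evenly across the partitions.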

Summary

In this lesson, you wrote code that stored records in Kafka topic partitions, and explored the records using the Kafka Topics UI. Nice work! You also learned about records, topic partitions (i.e. logs), and keys. Now that you have this code, feel free to modify it to try to answer any questions you have that were not covered in this lesson.

Key Points:

  • Record is the fundamental unit of data in Kafka
  • Topic partition is the fundamental unit of storage in Kafka
  • Topic partition is a log of records
  • Topic is one or more topic partitions
  • Record appended to one topic partition
  • Either explicitly specify topic partition for record, or topic partition chosen based on record’s key

In the next lesson, we’ll write code that reads records from topic partitions.

Resources