9 minutes
Migrating Kafka Consumer Group Offsets
At Doximity we use a lot of Kafka. Like, a lot. We’re moving an average of ~300 million events through our Kafka environment per day. Our Kafka environment started humbly, with a couple topics, then grew and grew and now we have a few dozen topics, and a few dozen consumer groups. With this, our consumer group architecture has changed pretty rapidly over time, and the consumer groups defined in the beginning don’t match up well with current best practices I’ve established around the environment.
One of the main concerns this past year is that some of our consumer groups have gotten pretty large, and are processing from multiple topics each, and some of our consumer groups can consist of consumers from different applications. This introduces scaling issues, and “bad neighbor” situations when for example one topic has bad data in it all of a sudden, or there’s a bug introduced in consumer logic for a particular set of consumers, both of which can cause consumers in their context to leave the consumer group; which causes a rebalance. This rebalance will be felt across everyone in the consumer group whether or not they are even subscribed to the same topic or are in the same application. Rebalances are basically a “stop-the-world” situation, and if the initial problem doesn’t get resolved, you can get stuck in a perpetual rebalance. Obviously this causes some stability issues for everyone.
In order to fix scaling problems that sprout from this, we came up with a plan to migrate consumer groups to a new structure. By design, a consumer group should be a set of consumers that have a common context, where they are within the same application, subscribe to the same data, and perform the same functions on that data. Each application / microservice should have its own consumer group. So the goal here is to take large consumer groups with more than one applications’ consumers within them, and break them apart into smaller consumer groups that follow the aforementioned best practice.
In Ruby on Rails, our consumers are running as rake tasks (you can certainly argue this isn’t what rake was designed to do, it should be a finite process, but this is a later discussion in another post), with their own unique processing logic that is used to process Kafka events. Most of our consumers are idempotent, however, there’s a few older ones that may not be. Since certainty isn’t 100% established here, it’s best to reduce the amount of reprocessing as much as possible so we don’t process duplicate events. Since we care deeply about our users and downstream data, we will need to take our current and deemed old consumer group, calculate it’s current committed offsets for each partition of every topic that it is consuming from. And in the new consumer group, we take those calculations and migrate that state into this new group so it starts off exactly where the previous one left off. If our offsets are incorrect, we will either reprocess data and potentially cause issues with duplicates, or we will overshoot and miss events in the log. Either outcome is undesired. Since this is a problem I’ve yet to encounter in Kafka, I decided this would be a great blog post and something to document.
Note: I’m advocating here that new consumer groups are created for everything, including the application that you may want to keep in the original consumer group. Although it’s technically viable, I’m against this only because the original consumer group will have really messy and inacurrate consumer lag metrics.
Okay so how do we achieve this? One way of doing it is to stop all producers, and the consumers in the groups we care about. With this, no new messages are coming in and offsets are frozen in place and no lag is being introduced. Once you confirm the producers are turned off and no messages are coming in, you could simply deploy your changes which add the new consumer group. Since the default behavior is to start consuming from the log now and not from beginning, this will mean that the new consumer group and the older group will be at the same exact spot in the log. Then you could just start up your producers and turn on this new group. This is a great and easy solution, but not when you’re running in production and your producers (we have more than 50) are fragmented across various services and embedded within applications that can’t be turned off. Additionally, if we stop producers we risk not capturing data into the environment at all, and other consumers which aren’t involved in this migration would be impacted.
My goals here were to turn off the old consumer group which makes sure the committed offsets don’t change, create the new consumer group, and make sure that this new consumer group starts off where the last one left off. I realized while tinkering with the CLI tools, that you can use the kafka-consumer-groups
CLI to do this by using the --reset-offsets
flag. This flag takes the following reset specifications: --to-datetime
, --by-period
, --to-earliest
, --to-latest
, --shift-by
, --from-file
, --to-current
. The particular spec of --from-file
is where I also realized that I can just modify the file to set the offset for each topic and partitions. The --export
flag will create this file, however, you can only use it with specifying one of the reset specifications above… which means I can’t generate the export file with the current offsets of where the consumer group finished. Additionally, these older consumer groups are constantly changing what topics they are consuming from. This would get pretty tricky when doing this on a per topic basis. So it’s much easier to just lift up the entire ‘state’ and mirror it into the new group.
Although I could probably figure out a way to bastardize the CLI tools and hack them apart with awk and sed to do this, I believe for this type of problem the best way to achieve something like this is programmatically so we’re being exact in our expectations here and not at the mercy of CLI output that we’re manipulating. We developed our own Kafka library which is a wrapper around the ruby-kafka
gem (soon to be replaced by librdkafka-ruby
in a new library I’m helping to write), which has .seek()
to help point a consumer group to a specific offset for each topic and partition. At first, we wanted to let our developers have control over this process and to be able to deploy these changes themselves by using such a method. Unfortunately, ruby-kafka
has some bugs in the way for us to achieve this perfectly. One such bug I found noted here. While we work through these bugs, I still need to get the main job done for now, and still programmatically.
The Apache Kafka community started to work on a public AdminClient API in 0.11.0 that can be used for admin operations programmatically. I absolutely love this and have been using it for the past year in various ways. The client libraries in various languages such as Ruby and Python are catching on and implementing their own versions of this. Unfortunately it’s not completely finished being developed, so things like resetting offsets like this aren’t supported as of yet.
However, there is a way to list all of the topics / partitions / offsets that a consumer group is currently at. Without “scripting” in Java, I found that the kafka-python
module suites me best for this task. Although I primarily work in a Ruby shop, I adore any opportunity to bust out my Python toolbox since it’s still my favorite language.
Kafka python has the KafkaAdminClient class. With this, therein lies a method list_consumer_group_offsets. This returns a single dictionary, with nested namedtuple’s as the key value pairs. You can see the namedtuple definition here. An example of how the key value pair looks is shown here:
TopicPartition(topic='dezka', partition=0): OffsetAndMetadata(offset=381606, metadata='')
The key being a namedtuple of TopicPartition(topic='dezka', partition=0)
and value being a namedtuple of OffsetAndMetadata(offset=381606, metadata='')
. Very cool. Thankfully, you can iterate over them pretty easily. I found the best way of iterating through this list is by doing something like this:
results = adminClient.list_consumer_group_offsets(GROUP)
for k,v in results.items():
print(k._asdict()['topic'])
print(k._asdict()['partition'])
print(v._asdict()['offset'])
Which would print out as:
dezka
0
381606
Cool, so knowing now how to iterate through this dictionary, I can generate the --from-file
needed in the kafka-consumer-groups
CLI. This file needs to be a csv and look like the following:
dezka,0,381606
dezka,1,381508
dezka,2,377782
And so on… So now taking the above, I can generate it by doing:
delimeter = ','
with open('/tmp/offset_reset.csv', 'w') as f:
for k,v in results.items():
f.write(str(k._asdict()['topic']) + delimeter)
f.write(str(k._asdict()['partition']) + delimeter)
f.write(str(v._asdict()['offset']) + '\n')
There we go. I now have a csv containing every topic, partition, and offset of where the group left off at. I can then take this file and use the CLI tool to “reset” the new group’s offsets to exactly this. Conveniently, as you perform the below, if the new consumer group doesn’t exist yet in Kafka, this tool will create it beforehand for you. Here would be the command to use this file to reset the offsets, which essentially migrates this log position over to the new group.
/usr/bin/kafka-consumer-groups --bootstrap-server <kafka_broker_host>:9092 --group <my_new_consumer_group> --reset-offsets --from-file /tmp/offset_reset.csv --dry-run
This dry run option is nice because it shows you what the end outcome will be based on this file that we generated. Once you’re good with the outcome, you can solidify it by replacing --dry-run
with --execute
in the above command. Afterwhich you can confirm the results by doing:
/usr/bin/kafka-consumer-groups --bootstrap-server <kafka_broker_host>:9092 --group <my_new_consumer_group> --describe
Which will show all of the partitions and positions. Compare this with your old group and you will see everything looks the exact same.
In summary, this was a very fun afternoon problem, I wrote all of this out in a nifty Python program that you can use here. While this currently only solves one use case, and a very unique consumer group management problem, I realize there are other edge cases that can be done here to make this script better which I plan to expand on. Such cases would be “what If I want to only migrate a certain set of topics and not all of the topics that the old consumer group was using?”. Ultimately, I’d prefer to not use the CLI tool at all, and just do the entire process in Python. Once the required features in AdminClient are released to do this, I will definitely be updating all of this to go that route instead. But as of writing those features aren’t there.