Performance improvement for the cooperative sticky assignor #5085
🎉 All Contributor License Agreements have been signed. Ready to merge.
Uses a map for `consumer2AllPotentialPartitions` and keeps the assignment sorted in `currentAssignment` for binary search.

Preliminary results with a 35s timeout:

**Before:** partitions/members ratio of:
- 1.5: max 3160 partitions, 2125 members
- 10: max 5253 partitions, 530 members

**After:** partitions/members ratio of:
- 1.5: max 5460 partitions, 3625 members
- 10: max 7503 partitions, 750 members

Assignment time increases with a higher partitions/members ratio when removing members from the group, because of the time spent in `performReassignments`, which should be checked for improvements as well.

Tested with `ut_testLargeAssignmentWithMultipleConsumersLeaving`.
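For readers unfamiliar with the sorted-list approach, here is a minimal sketch of the idea: sort once by topic and partition, then look entries up with `bsearch`. The `toppar_t` type and the `toppar_*` helpers are hypothetical stand-ins for illustration only and are not the types or functions touched by this PR.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for a topic partition entry (not a librdkafka type). */
typedef struct toppar_s {
        const char *topic;
        int partition;
} toppar_t;

/* Order by topic name first, then by partition number. */
static int toppar_cmp(const void *a, const void *b) {
        const toppar_t *ta = a, *tb = b;
        int r             = strcmp(ta->topic, tb->topic);
        if (r != 0)
                return r;
        return (ta->partition > tb->partition) -
               (ta->partition < tb->partition);
}

/* Sort once after the list has been built. */
static void toppar_list_sort(toppar_t *list, size_t cnt) {
        qsort(list, cnt, sizeof(*list), toppar_cmp);
}

/* Binary search in a list sorted with toppar_cmp. Returns NULL on a miss. */
static toppar_t *toppar_list_find_sorted(toppar_t *list,
                                         size_t cnt,
                                         const char *topic,
                                         int partition) {
        toppar_t key = {topic, partition};
        assert(list != NULL || cnt == 0);
        return bsearch(&key, list, cnt, sizeof(*list), toppar_cmp);
}
```

The lookup is only valid while the list stays sorted with the same comparator, so any insertion has to either keep the order or be followed by another sort.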
Pull Request Overview
This PR enhances the cooperative sticky assignor’s performance by replacing lists with maps for potential partitions, keeping assignments sorted for binary search, and fixing an off-by-one in isBalanced.
- introduce `rd_map_cmp` and `rd_map_destroy_free` helpers in the map implementation
- switch `consumer2AllPotentialPartitions` from a list to a map and update assignor logic accordingly
- add `find_sorted`/`sort_by_topic` APIs for partition lists and correct the `isBalanced` loop bounds
Reviewed Changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| src/rdmap.h | Added rd_map_cmp and rd_map_destroy_free declarations |
| src/rdmap.c | Implemented rd_map_cmp and rd_map_destroy_free |
| src/rdkafka_sticky_assignor.c | Converted potential-partitions to maps; updated assignment logic |
| src/rdkafka_partition.h | Declared rd_kafka_topic_partition_list_find_sorted |
| src/rdkafka_partition.c | Implemented find_sorted with bsearch |
Comments suppressed due to low confidence (1)
src/rdkafka_sticky_assignor.c:518
- [nitpick] The alias `consumer2AllPotentialPartitions_t` and its underlying `map_str_map_toppar_str_t` are verbose and may be hard to read. Consider renaming to something more descriptive like `consumer_to_partitions_map_t`.
typedef map_str_map_toppar_str_t consumer2AllPotentialPartitions_t;
    void *val2 = rd_map_get(rmapb, elem->key);
    if (!val2)
            return 1;
Copilot
AI
May 21, 2025
Using rd_map_get to check for a missing key conflates absent entries with entries whose value is legitimately NULL. Consider adding a dedicated key-existence check or prohibiting NULL values when using this comparator.
Suggested change:

    - void *val2 = rd_map_get(rmapb, elem->key);
    - if (!val2)
    -         return 1;
    + if (!rd_map_contains(rmapb, elem->key))
    +         return 1;
    + void *val2 = rd_map_get(rmapb, elem->key);
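To illustrate the concern, here is a small self-contained sketch of why a `get` that returns NULL on a miss cannot distinguish an absent key from a key stored with a NULL value. The `toy_map_*` names are hypothetical and exist only for this example; they are not the `rd_map_t` API, and the sketch assumes keys within a map are unique.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Toy fixed-size string map, purely illustrative (not rd_map_t). */
typedef struct toy_elem_s {
        const char *key;
        void *val;
} toy_elem_t;

typedef struct toy_map_s {
        toy_elem_t elems[8];
        size_t cnt;
} toy_map_t;

/* Linear search for the element holding key, or NULL if absent. */
static const toy_elem_t *toy_map_find(const toy_map_t *m, const char *key) {
        size_t i;
        assert(m->cnt <= 8);
        for (i = 0; i < m->cnt; i++)
                if (!strcmp(m->elems[i].key, key))
                        return &m->elems[i];
        return NULL;
}

/* Returns the value, or NULL both when the key is absent and when the
 * stored value itself is NULL: the two cases are indistinguishable. */
static void *toy_map_get(const toy_map_t *m, const char *key) {
        const toy_elem_t *e = toy_map_find(m, key);
        return e ? e->val : NULL;
}

/* Dedicated existence check that does not look at the value. */
static int toy_map_contains(const toy_map_t *m, const char *key) {
        return toy_map_find(m, key) != NULL;
}

/* Key-set comparison: 0 if both maps hold the same keys, 1 otherwise.
 * Assumes keys are unique within each map. */
static int toy_map_cmp_keys(const toy_map_t *a, const toy_map_t *b) {
        size_t i;
        if (a->cnt != b->cnt)
                return 1;
        for (i = 0; i < a->cnt; i++)
                if (!toy_map_contains(b, a->elems[i].key))
                        return 1;
        return 0;
}
```

With a NULL value stored under `"k"`, `toy_map_get` returns NULL for both `"k"` and a missing key, while `toy_map_contains` tells them apart.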
        atopic->metadata->topic,
        atopic->metadata->partitions[j].id);

    RD_MAP_SET(partitions, partition, (void *)1);
Copilot
AI
May 21, 2025
[nitpick] Storing (void*)1 as a map value in a map_toppar_str_t (which expects const char *) is a magic pointer workaround. Consider using a dedicated map_toppar_void_t or a boolean flag type to make intent clearer.
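One possible way to make the intent explicit, sketched here with hypothetical names: replace the bare `(void *)1` with a named sentinel whose address marks "present". The `toy_partition_set_*` helpers and the fixed-size slot array are assumptions made for the example and do not correspond to any existing map type in the code base.

```c
#include <assert.h>
#include <stddef.h>

#define TOY_MAX_PARTITIONS 16

/* Named sentinel: its address is a valid, non-NULL "present" marker. */
static char toy_present_marker;
#define TOY_PRESENT ((void *)&toy_present_marker)

/* Toy set of partition ids backed by pointer slots (illustrative only). */
typedef struct toy_partition_set_s {
        void *slots[TOY_MAX_PARTITIONS];
} toy_partition_set_t;

/* Marks a partition as present. Returns 0 on success, -1 if out of range. */
static int toy_partition_set_add(toy_partition_set_t *s, int partition) {
        assert(s != NULL);
        if (partition < 0 || partition >= TOY_MAX_PARTITIONS)
                return -1;
        s->slots[partition] = TOY_PRESENT;
        return 0;
}

/* Returns 1 if the partition was added, 0 otherwise. */
static int toy_partition_set_has(const toy_partition_set_t *s, int partition) {
        if (partition < 0 || partition >= TOY_MAX_PARTITIONS)
                return 0;
        return s->slots[partition] == TOY_PRESENT;
}
```

A named marker keeps the value slot pointer-typed, as in the existing map, while documenting that the value carries no data.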
    rd_kafka_topic_partition_t *rd_kafka_topic_partition_list_find_topic_by_id(
        const rd_kafka_topic_partition_list_t *rktparlist,
        rd_kafka_Uuid_t topic_id);
Copilot
AI
May 21, 2025
[nitpick] The new find_sorted API lacks a header comment. Please add a docstring indicating that the list must be sorted and what the function returns on miss.
Suggested change:

    /**
     * @brief Find a topic partition in a sorted list by topic name and partition.
     *
     * The input list must be sorted by topic name and partition.
     *
     * @param rktparlist The sorted list of topic partitions to search.
     * @param topic The name of the topic to find.
     * @param partition The partition number to find.
     *
     * @returns A pointer to the matching topic partition, or NULL if not found.
     */
            return score;
    }
Copilot
AI
May 21, 2025
[nitpick] The behavior of maybeAssign when removeAssigned is true (replacing the list via the double pointer) isn't documented. Add a brief explanation in the comment to clarify this side effect.
Suggested change:

    /**
     * @brief Assign partitions to consumers and optionally update the list of
     * unassigned partitions.
     *
     * If \p removeAssigned is true, the function replaces the
     * \p unassignedPartitions list with a new list containing only the partitions
     * that were not assigned. The caller's pointer to the original list is updated
     * to point to the new list, and the original list is destroyed.
     *
     * @param unassignedPartitions Pointer to the list of unassigned partitions.
     * @param partition2AllPotentialConsumers Map of partitions to potential consumers.
     * @param sortedCurrentSubscriptions List of current subscriptions, sorted.
     * @param currentAssignment Map of current assignments.
     * @param consumer2AllPotentialPartitions Map of consumers to potential partitions.
     * @param currentPartitionConsumer Map of current partition-to-consumer assignments.
     * @param removeAssigned Whether to remove assigned partitions from the list.
     * @param rkri Rack information.
     */
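The side effect described in that comment can be sketched as follows. This is an illustrative pattern only, with hypothetical `int_list_*` and `assign_even` names: "assignment" here simply means the item is even, which stands in for whatever the real assignor decides.

```c
#include <assert.h>
#include <stdlib.h>

/* Toy growable-by-capacity list of ints (not a librdkafka type). */
typedef struct int_list_s {
        int *items;
        size_t cnt;
} int_list_t;

static int_list_t *int_list_new(size_t capacity) {
        int_list_t *l = malloc(sizeof(*l));
        if (!l)
                abort();
        l->items = malloc(sizeof(*l->items) * (capacity ? capacity : 1));
        if (!l->items)
                abort();
        l->cnt = 0;
        return l;
}

static void int_list_destroy(int_list_t *l) {
        free(l->items);
        free(l);
}

/* "Assigns" every even item and returns how many were assigned.
 * If remove_assigned is non-zero, *listp is replaced by a new list holding
 * only the unassigned items and the original list is destroyed, so any
 * other pointer the caller kept to the old list becomes dangling. */
static size_t assign_even(int_list_t **listp, int remove_assigned) {
        int_list_t *old  = *listp;
        int_list_t *left = remove_assigned ? int_list_new(old->cnt) : NULL;
        size_t assigned  = 0, i;

        assert(old != NULL);
        for (i = 0; i < old->cnt; i++) {
                if (old->items[i] % 2 == 0)
                        assigned++;
                else if (left)
                        left->items[left->cnt++] = old->items[i];
        }

        if (left) {
                int_list_destroy(old);
                *listp = left;
        }
        return assigned;
}
```

Passing the list by double pointer is what lets the callee swap it out; callers must not hold on to the old pointer across a call with `remove_assigned` set.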
Comparison with range assignor, running the same test: