Skip to content

Commit 768b31b

Browse files
committed
MINOR: Small cleanups in clients
1 parent 865beb6 commit 768b31b

24 files changed

+59
-57
lines changed

clients/src/main/java/org/apache/kafka/clients/admin/DeleteShareGroupsOptions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import java.util.Collection;
2323

2424
/**
25-
* Options for the {@link Admin#deleteShareGroups(Collection <String>, DeleteShareGroupsOptions)} call.
25+
* Options for the {@link Admin#deleteShareGroups(Collection, DeleteShareGroupsOptions)} call.
2626
* <p>
2727
* The API of this class is evolving, see {@link Admin} for details.
2828
*/

clients/src/main/java/org/apache/kafka/clients/admin/DeleteShareGroupsResult.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import java.util.Map;
2626

2727
/**
28-
* The result of the {@link Admin#deleteShareGroups(Collection <String>, DeleteShareGroupsOptions)} call.
28+
* The result of the {@link Admin#deleteShareGroups(Collection, DeleteShareGroupsOptions)} call.
2929
* <p></p>
3030
* The API of this class is evolving, see {@link Admin} for details.
3131
*/

clients/src/main/java/org/apache/kafka/clients/admin/DeleteStreamsGroupsOptions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import java.util.Collection;
2222

2323
/**
24-
* Options for the {@link Admin#deleteStreamsGroups(Collection<String>, DeleteStreamsGroupsOptions)} call.
24+
* Options for the {@link Admin#deleteStreamsGroups(Collection, DeleteStreamsGroupsOptions)} call.
2525
* <p>
2626
* The API of this class is evolving, see {@link Admin} for details.
2727
*/

clients/src/main/java/org/apache/kafka/clients/admin/ListStreamsGroupOffsetsSpec.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.kafka.common.annotation.InterfaceStability;
2222

2323
import java.util.Collection;
24+
import java.util.Map;
2425

2526
/**
2627
* Specification of streams group offsets to list using {@link Admin#listStreamsGroupOffsets(Map, ListStreamsGroupOffsetsOptions)}.

clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteShareGroupOffsetsHandler.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,8 @@ public ApiResult<CoordinatorKey, Map<String, ApiException>> handleResponse(
122122
if (topic.errorCode() != Errors.NONE.code()) {
123123
final Errors topicError = Errors.forCode(topic.errorCode());
124124
final String topicErrorMessage = topic.errorMessage();
125-
log.debug("DeleteShareGroupOffsets request for group id {} and topic {} failed and returned error {}." + topicErrorMessage,
126-
groupId.idValue, topic.topicName(), topicError);
125+
log.debug("DeleteShareGroupOffsets request for group id {} and topic {} failed and returned error {}. {}",
126+
groupId.idValue, topic.topicName(), topicError, topicErrorMessage);
127127
}
128128
topicResults.put(
129129
topic.topicName(),
@@ -147,14 +147,14 @@ private void handleGroupError(
147147
case REBALANCE_IN_PROGRESS:
148148
// If the coordinator is in the middle of loading, then we just need to retry
149149
log.debug("DeleteShareGroupOffsets request for group id {} failed because the coordinator" +
150-
" is still in the process of loading state. Will retry. " + errorMessage, groupId.idValue);
150+
" is still in the process of loading state. Will retry. {}", groupId.idValue, errorMessage);
151151
break;
152152
case COORDINATOR_NOT_AVAILABLE:
153153
case NOT_COORDINATOR:
154154
// If the coordinator is unavailable or there was a coordinator change, then we unmap
155155
// the key so that we retry the `FindCoordinator` request
156-
log.debug("DeleteShareGroupOffsets request for group id {} returned error {}. Will rediscover the coordinator and retry. " + errorMessage,
157-
groupId.idValue, error);
156+
log.debug("DeleteShareGroupOffsets request for group id {} returned error {}. Will rediscover the coordinator and retry. {}",
157+
groupId.idValue, error, errorMessage);
158158
groupsToUnmap.add(groupId);
159159
break;
160160
case INVALID_GROUP_ID:
@@ -164,11 +164,11 @@ private void handleGroupError(
164164
case UNKNOWN_SERVER_ERROR:
165165
case KAFKA_STORAGE_ERROR:
166166
case GROUP_AUTHORIZATION_FAILED:
167-
log.debug("DeleteShareGroupOffsets request for group id {} failed due to error {}. " + errorMessage, groupId.idValue, error);
167+
log.debug("DeleteShareGroupOffsets request for group id {} failed due to error {}. {}", groupId.idValue, error, errorMessage);
168168
failed.put(groupId, error.exception(errorMessage));
169169
break;
170170
default:
171-
log.error("DeleteShareGroupOffsets request for group id {} failed due to unexpected error {}. " + errorMessage, groupId.idValue, error);
171+
log.error("DeleteShareGroupOffsets request for group id {} failed due to unexpected error {}. {}", groupId.idValue, error, errorMessage);
172172
failed.put(groupId, error.exception(errorMessage));
173173
}
174174
}

clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ private void handlePartitionError(
202202
public Map<TopicPartition, Throwable> handleUnsupportedVersionException(
203203
int brokerId, UnsupportedVersionException exception, Set<TopicPartition> keys
204204
) {
205-
log.warn("Broker " + brokerId + " does not support MAX_TIMESTAMP offset specs");
205+
log.warn("Broker {} does not support MAX_TIMESTAMP offset specs", brokerId);
206206
Map<TopicPartition, Throwable> maxTimestampPartitions = new HashMap<>();
207207
for (TopicPartition topicPartition : keys) {
208208
Long offsetTimestamp = offsetTimestampsByPartition.get(topicPartition);

clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -988,7 +988,7 @@ boolean maybeAbortReconciliation() {
988988
String reason = rejoinedWhileReconciliationInProgress ?
989989
"the member has re-joined the group" :
990990
"the member already transitioned out of the reconciling state into " + state;
991-
log.info("Interrupting reconciliation that is not relevant anymore because " + reason);
991+
log.info("Interrupting reconciliation that is not relevant anymore because {}", reason);
992992
markReconciliationCompleted();
993993
}
994994
return shouldAbort;

clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -965,8 +965,8 @@ private class GeneralAssignmentBuilder extends AbstractAssignmentBuilder {
965965
super(partitionsPerTopic, rackInfo, currentAssignment);
966966
this.subscriptions = subscriptions;
967967

968-
topic2AllPotentialConsumers = new HashMap<>(partitionsPerTopic.keySet().size());
969-
consumer2AllPotentialTopics = new HashMap<>(subscriptions.keySet().size());
968+
topic2AllPotentialConsumers = new HashMap<>(partitionsPerTopic.size());
969+
consumer2AllPotentialTopics = new HashMap<>(subscriptions.size());
970970

971971
// initialize topic2AllPotentialConsumers and consumer2AllPotentialTopics
972972
partitionsPerTopic.keySet().forEach(

clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -541,7 +541,7 @@ private void fetchOffsetsWithRetries(final OffsetFetchRequestState fetchRequest,
541541
boolean inflightRemoved = pendingRequests.inflightOffsetFetches.remove(fetchRequest);
542542
if (!inflightRemoved) {
543543
log.warn("A duplicated, inflight, request was identified, but unable to find it in the " +
544-
"outbound buffer:" + fetchRequest);
544+
"outbound buffer: {}", fetchRequest);
545545
}
546546
if (error == null) {
547547
maybeUpdateLastSeenEpochIfNewer(res);

clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,7 @@ private static List<StreamsGroupHeartbeatRequestData.TopicInfo> getChangelogTopi
287287

288288
private final HeartbeatMetricsManager metricsManager;
289289

290-
private StreamsRebalanceData streamsRebalanceData;
290+
private final StreamsRebalanceData streamsRebalanceData;
291291

292292
/**
293293
* Timer for tracking the time since the last consumer poll. If the timer expires, the consumer will stop

0 commit comments

Comments
 (0)