Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import org.apache.kafka.coordinator.share.metrics.{ShareCoordinatorMetrics, Shar
import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorRecordSerde, ShareCoordinatorService}
import org.apache.kafka.coordinator.transaction.ProducerIdManager
import org.apache.kafka.image.publisher.{BrokerRegistrationTracker, MetadataPublisher}
import org.apache.kafka.metadata.{BrokerState, ListenerInfo}
import org.apache.kafka.metadata.{BrokerState, ListenerInfo, MetadataVersionConfigValidator}
import org.apache.kafka.metadata.publisher.{AclPublisher, ScramPublisher}
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.authorizer.Authorizer
Expand Down Expand Up @@ -469,7 +469,10 @@ class BrokerServer(
socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
config.numIoThreads, "RequestHandlerAvgIdlePercent")

metadataPublishers.add(new MetadataVersionConfigValidator(config, sharedServer.metadataPublishingFaultHandler))
metadataPublishers.add(new MetadataVersionConfigValidator(config.brokerId,
() => config.processRoles.contains(ProcessRole.BrokerRole) && config.logDirs().size() > 1,
sharedServer.metadataPublishingFaultHandler
))
brokerMetadataPublisher = new BrokerMetadataPublisher(config,
metadataCache,
logManager,
Expand Down
13 changes: 0 additions & 13 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ import org.apache.kafka.raft.{MetadataLogConfig, QuorumConfig}
import org.apache.kafka.security.authorizer.AuthorizerUtils
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.config.AbstractKafkaConfig.getMap
import org.apache.kafka.server.config.{AbstractKafkaConfig, KRaftConfigs, QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
Expand Down Expand Up @@ -653,18 +652,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
s"${BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG} must implement KafkaPrincipalSerde")
}

/**
* Validate some configurations for new MetadataVersion. A new MetadataVersion can take place when
* a FeatureLevelRecord for "metadata.version" is read from the cluster metadata.
*/
def validateWithMetadataVersion(metadataVersion: MetadataVersion): Unit = {
if (processRoles.contains(ProcessRole.BrokerRole) && logDirs.size > 1) {
require(metadataVersion.isDirectoryAssignmentSupported,
s"Multiple log directories (aka JBOD) are not supported in the current MetadataVersion ${metadataVersion}. " +
s"Need ${MetadataVersion.IBP_3_7_IV2} or higher")
}
}

/**
* Copy the subset of properties that are relevant to Logs. The individual properties
* are listed here since the names are slightly different in each Config class...
Expand Down

This file was deleted.

18 changes: 0 additions & 18 deletions core/src/test/scala/unit/kafka/log/LogConfigTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM
import org.apache.kafka.common.config.ConfigDef.Type.INT
import org.apache.kafka.common.config.{ConfigException, SslConfigs, TopicConfig}
import org.apache.kafka.common.errors.InvalidConfigurationException
import org.apache.kafka.server.common.MetadataVersion
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test

Expand Down Expand Up @@ -429,21 +428,4 @@ class LogConfigTest {
logProps.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, deleteOnDisable.toString)
LogConfig.validate(logProps)
}

@Test
def testValidateWithMetadataVersionJbodSupport(): Unit = {
def validate(metadataVersion: MetadataVersion, jbodConfig: Boolean): Unit =
KafkaConfig.fromProps(
TestUtils.createBrokerConfig(nodeId = 0, logDirCount = if (jbodConfig) 2 else 1)
).validateWithMetadataVersion(metadataVersion)

validate(MetadataVersion.IBP_3_6_IV2, jbodConfig = false)
validate(MetadataVersion.IBP_3_7_IV0, jbodConfig = false)
validate(MetadataVersion.IBP_3_7_IV2, jbodConfig = false)
assertThrows(classOf[IllegalArgumentException], () =>
validate(MetadataVersion.IBP_3_6_IV2, jbodConfig = true))
assertThrows(classOf[IllegalArgumentException], () =>
validate(MetadataVersion.IBP_3_7_IV0, jbodConfig = true))
validate(MetadataVersion.IBP_3_7_IV2, jbodConfig = true)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package kafka.server;
package org.apache.kafka.metadata;

import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
Expand All @@ -24,18 +24,20 @@
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.fault.FaultHandler;

import java.util.function.Supplier;

public class MetadataVersionConfigValidator implements MetadataPublisher {
private final String name;
private final KafkaConfig config;
private final Supplier<Boolean> hasMultiLogDirs;
private final FaultHandler faultHandler;

public MetadataVersionConfigValidator(
KafkaConfig config,
FaultHandler faultHandler
int brokerId,
Supplier<Boolean> hasMultiLogDirs,
FaultHandler faultHandler
) {
int id = config.brokerId();
this.name = "MetadataVersionPublisher(id=" + id + ")";
this.config = config;
this.name = "MetadataVersionPublisher(id=" + brokerId + ")";
this.hasMultiLogDirs = hasMultiLogDirs;
this.faultHandler = faultHandler;
}

Expand All @@ -46,9 +48,9 @@ public String name() {

@Override
public void onMetadataUpdate(
MetadataDelta delta,
MetadataImage newImage,
LoaderManifest manifest
MetadataDelta delta,
MetadataImage newImage,
LoaderManifest manifest
) {
if (delta.featuresDelta() != null) {
if (delta.metadataVersionChanged().isPresent()) {
Expand All @@ -57,13 +59,22 @@ public void onMetadataUpdate(
}
}

/**
* Validate some configurations for the new MetadataVersion. A new MetadataVersion can take place when
* a FeatureLevelRecord for "metadata.version" is read from the cluster metadata.
*/
@SuppressWarnings("ThrowableNotThrown")
private void onMetadataVersionChanged(MetadataVersion metadataVersion) {
try {
this.config.validateWithMetadataVersion(metadataVersion);
} catch (Throwable t) {
if (this.hasMultiLogDirs.get() && !metadataVersion.isDirectoryAssignmentSupported()) {
String errorMsg = String.format(
"Multiple log directories (aka JBOD) are not supported in the current MetadataVersion %s. Need %s or higher",
metadataVersion, MetadataVersion.IBP_3_7_IV2
);

this.faultHandler.handleFault(
"Broker configuration does not support the cluster MetadataVersion", t);
"Broker configuration does not support the cluster MetadataVersion",
new IllegalArgumentException(errorMsg)
);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.metadata;

import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.image.loader.LogDeltaManifest;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.fault.FaultHandler;

import org.junit.jupiter.api.Test;

import java.util.function.Supplier;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

@SuppressWarnings({"unchecked", "ThrowableNotThrown"})
public class MetadataVersionConfigValidatorTest {

private static final LogDeltaManifest TEST_MANIFEST = LogDeltaManifest.newBuilder()
.provenance(MetadataProvenance.EMPTY)
.leaderAndEpoch(LeaderAndEpoch.UNKNOWN)
.numBatches(1)
.elapsedNs(90)
.numBytes(88)
.build();
public static final MetadataProvenance TEST_PROVENANCE =
new MetadataProvenance(50, 3, 8000, true);

void executeMetadataUpdate(
MetadataVersion metadataVersion,
Supplier<Boolean> multiLogDirSupplier,
FaultHandler faultHandler
) throws Exception {
try (MetadataVersionConfigValidator validator = new MetadataVersionConfigValidator(0, multiLogDirSupplier, faultHandler)) {
MetadataDelta delta = new MetadataDelta.Builder()
.setImage(MetadataImage.EMPTY)
.build();
if (metadataVersion != null) {
delta.replay(new FeatureLevelRecord().
setName(MetadataVersion.FEATURE_NAME).
setFeatureLevel(metadataVersion.featureLevel()));
}
MetadataImage image = delta.apply(TEST_PROVENANCE);

validator.onMetadataUpdate(delta, image, TEST_MANIFEST);
}
}

@Test
void testValidatesConfigOnMetadataChange() throws Exception {
MetadataVersion metadataVersion = MetadataVersion.IBP_3_7_IV2;
FaultHandler faultHandler = mock(FaultHandler.class);
Supplier<Boolean> multiLogDirSupplier = mock(Supplier.class);
when(multiLogDirSupplier.get()).thenReturn(false);

executeMetadataUpdate(metadataVersion, multiLogDirSupplier, faultHandler);

verify(multiLogDirSupplier, times(1)).get();
verifyNoMoreInteractions(faultHandler);
}

@Test
void testInvokesFaultHandlerOnException() throws Exception {
MetadataVersion metadataVersion = MetadataVersion.IBP_3_7_IV1;
Supplier<Boolean> multiLogDirSupplier = mock(Supplier.class);
FaultHandler faultHandler = mock(FaultHandler.class);

when(multiLogDirSupplier.get()).thenReturn(true);

executeMetadataUpdate(metadataVersion, multiLogDirSupplier, faultHandler);

verify(multiLogDirSupplier, times(1)).get();
verify(faultHandler, times(1)).handleFault(
eq("Broker configuration does not support the cluster MetadataVersion"),
any(IllegalArgumentException.class));
}

@Test
void testValidateWithMetadataVersionJbodSupport() throws Exception {
FaultHandler faultHandler = mock(FaultHandler.class);
validate(MetadataVersion.IBP_3_6_IV2, false, faultHandler);
verifyNoMoreInteractions(faultHandler);

faultHandler = mock(FaultHandler.class);
validate(MetadataVersion.IBP_3_7_IV0, false, faultHandler);
verifyNoMoreInteractions(faultHandler);

faultHandler = mock(FaultHandler.class);
validate(MetadataVersion.IBP_3_7_IV2, false, faultHandler);
verifyNoMoreInteractions(faultHandler);

faultHandler = mock(FaultHandler.class);
validate(MetadataVersion.IBP_3_6_IV2, true, faultHandler);
verify(faultHandler, times(1)).handleFault(
eq("Broker configuration does not support the cluster MetadataVersion"),
any(IllegalArgumentException.class));

faultHandler = mock(FaultHandler.class);
validate(MetadataVersion.IBP_3_7_IV0, true, faultHandler);
verify(faultHandler, times(1)).handleFault(
eq("Broker configuration does not support the cluster MetadataVersion"),
any(IllegalArgumentException.class));

faultHandler = mock(FaultHandler.class);
validate(MetadataVersion.IBP_3_7_IV2, true, faultHandler);
verifyNoMoreInteractions(faultHandler);
}

private void validate(MetadataVersion metadataVersion, boolean jbodConfig, FaultHandler faultHandler)
throws Exception {
Supplier<Boolean> multiLogDirSupplier = () -> jbodConfig;

executeMetadataUpdate(metadataVersion, multiLogDirSupplier, faultHandler);
}
}