From f5dd661cb5c90d91f2d88f26643e353b2fe67fb6 Mon Sep 17 00:00:00 2001 From: TengYao Chi Date: Tue, 11 Feb 2025 15:55:23 +0800 Subject: [PATCH] KAFKA-18396: Migrate log4j1 configuration to log4j2 in KafkaDockerWrapper (#18394) After log4j migration, we need to update the logging configuration in KafkaDockerWrapper from log4j1 to log4j2. Reviewers: Manikumar Reddy --- build.gradle | 1 + checkstyle/import-control-core.xml | 8 + .../java/kafka/docker/Log4jConfiguration.java | 221 ++++++++++++ .../kafka/docker/KafkaDockerWrapper.scala | 158 ++++++--- .../kafka/docker/KafkaDockerWrapperTest.scala | 318 +++++++++++++----- .../isolated/plaintext/docker-compose.yml | 3 - .../cluster/isolated/ssl/docker-compose.yml | 3 - .../fixtures/mode/isolated/docker-compose.yml | 3 - 8 files changed, 578 insertions(+), 137 deletions(-) create mode 100644 core/src/main/java/kafka/docker/Log4jConfiguration.java diff --git a/build.gradle b/build.gradle index da6f2259c4bcc..8d82b6f02ed79 100644 --- a/build.gradle +++ b/build.gradle @@ -1075,6 +1075,7 @@ project(':core') { implementation libs.jacksonModuleScala implementation libs.jacksonDataformatCsv implementation libs.jacksonJDK8Datatypes + implementation libs.jacksonDatabindYaml implementation libs.joptSimple implementation libs.jose4j implementation libs.metrics diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml index 43fd2d3ff2fdf..e2f8aaf743587 100644 --- a/checkstyle/import-control-core.xml +++ b/checkstyle/import-control-core.xml @@ -74,6 +74,14 @@ + + + + + + + + diff --git a/core/src/main/java/kafka/docker/Log4jConfiguration.java b/core/src/main/java/kafka/docker/Log4jConfiguration.java new file mode 100644 index 0000000000000..45b06760066f2 --- /dev/null +++ b/core/src/main/java/kafka/docker/Log4jConfiguration.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.docker; + +import com.fasterxml.jackson.annotation.JsonAnyGetter; +import com.fasterxml.jackson.annotation.JsonAnySetter; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +@JsonIgnoreProperties(ignoreUnknown = true) +public class Log4jConfiguration { + private Configuration configuration; + + @JsonProperty("Configuration") + public Configuration getConfiguration() { + return configuration; + } + + public void setConfiguration(Configuration configuration) { + this.configuration = configuration; + } +} + +@JsonPropertyOrder({ "Properties", "Appenders", "Loggers" }) +@JsonIgnoreProperties(ignoreUnknown = true) +class Configuration { + private Properties properties; + private Appenders appenders; + private Loggers loggers; + private final Map additionalProperties = new LinkedHashMap<>(); + + @JsonProperty("Properties") + public Properties getProperties() { + return properties; + } + + public void setProperties(Properties properties) { + this.properties = properties; + } + + @JsonProperty("Appenders") + public Appenders getAppenders() { + return appenders; + } + + public void setAppenders(Appenders appenders) { + this.appenders = appenders; + } + + @JsonProperty("Loggers") + public Loggers getLoggers() { + return loggers; + } + + public void setLoggers(Loggers loggers) { + this.loggers = loggers; + } + + @JsonAnyGetter + public Map getAdditionalProperties() { + return additionalProperties; + } + + @JsonAnySetter + public void setAdditionalProperties(String key, Object value) { + additionalProperties.put(key, value); + } +} + +@JsonIgnoreProperties(ignoreUnknown = true) +class Properties { + private final Map properties = new LinkedHashMap<>(); + + @JsonAnyGetter + public Map getProperties() { + return properties; + } + + @JsonAnySetter + public void setProperties(String key, Object value) { + properties.put(key, value); + } +} + +@JsonIgnoreProperties(ignoreUnknown = true) +class Appenders { + private final Map properties = new LinkedHashMap<>(); + + @JsonAnyGetter + public Map getProperties() { + return properties; + } + + @JsonAnySetter + public void setProperties(String key, Object value) { + properties.put(key, value); + } +} + +@JsonIgnoreProperties(ignoreUnknown = true) +class Loggers { + private Root root; + private List logger = Collections.emptyList(); + + @JsonProperty("Root") + public Root getRoot() { + return root; + } + + public void setRoot(Root root) { + this.root = root; + } + + @JsonProperty("Logger") + public List getLogger() { + return logger; + } + + public void setLogger(List logger) { + this.logger = logger; + } +} + +@JsonIgnoreProperties(ignoreUnknown = true) +class Root { + private String level; + private final Map otherProperties = new LinkedHashMap<>(); + + public String getLevel() { + return level; + } + + public void setLevel(String level) { + this.level = level; + } + + @JsonAnyGetter + public Map getOtherProperties() { + return otherProperties; + } + + @JsonAnySetter + public void setOtherProperties(String key, Object value) { + otherProperties.put(key, value); + } +} + +@JsonPropertyOrder({ "name", "level" }) +@JsonIgnoreProperties(ignoreUnknown = true) +class Logger { + private String name; + private String level; + private final Map otherProperties = new LinkedHashMap<>(); + + @JsonProperty("name") + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + @JsonProperty("level") + public String getLevel() { + return level; + } + + public void setLevel(String level) { + this.level = level; + } + + @JsonAnyGetter + public Map getOtherProperties() { + return otherProperties; + } + + @JsonAnySetter + public void setOtherProperties(String key, Object value) { + otherProperties.put(key, value); + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) return false; + + Logger logger = (Logger) o; + return Objects.equals(name, logger.name) && + Objects.equals(level, logger.level) && + Objects.equals(otherProperties, logger.otherProperties); + } + + @Override + public int hashCode() { + int result = Objects.hashCode(name); + result = 31 * result + Objects.hashCode(level); + result = 31 * result + Objects.hashCode(otherProperties); + return result; + } +} diff --git a/core/src/main/scala/kafka/docker/KafkaDockerWrapper.scala b/core/src/main/scala/kafka/docker/KafkaDockerWrapper.scala index 2c0238787f8ea..c0b586d1bf6c7 100644 --- a/core/src/main/scala/kafka/docker/KafkaDockerWrapper.scala +++ b/core/src/main/scala/kafka/docker/KafkaDockerWrapper.scala @@ -16,6 +16,11 @@ */ package kafka.docker +import com.fasterxml.jackson.annotation.JsonInclude +import com.fasterxml.jackson.databind.exc.MismatchedInputException +import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory +import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator.Feature import kafka.Kafka import kafka.tools.StorageTool import kafka.utils.Logging @@ -26,6 +31,7 @@ import org.apache.kafka.common.utils.Exit import java.nio.charset.StandardCharsets import java.nio.file.{Files, Path, Paths, StandardCopyOption, StandardOpenOption} +import scala.jdk.CollectionConverters._ object KafkaDockerWrapper extends Logging { def main(args: Array[String]): Unit = { @@ -79,7 +85,7 @@ object KafkaDockerWrapper extends Logging { required(true). help( """Directory which holds default properties. It should contain the three file:- - |server.properties, log4j.properties and tools-log4j.properties. + |server.properties, log4j2.yaml and tools-log4j2.yaml. |""".stripMargin) setupParser.addArgument("--mounted-configs-dir", "-M"). @@ -87,7 +93,7 @@ object KafkaDockerWrapper extends Logging { required(true). help( """Directory which holds user mounted properties. It can contain none to all the three files:- - |server.properties, log4j.properties and tools-log4j.properties.""".stripMargin) + |server.properties, log4j2.yaml and tools-log4j2.yaml.""".stripMargin) setupParser.addArgument("--final-configs-dir", "-F"). action(store()). @@ -109,8 +115,8 @@ object KafkaDockerWrapper extends Logging { private def prepareConfigs(defaultConfigsPath: Path, mountedConfigsPath: Path, finalConfigsPath: Path): Unit = { prepareServerConfigs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars) - prepareLog4jConfigs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars) - prepareToolsLog4jConfigs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars) + prepareLog4j2Configs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars) + prepareToolsLog4j2Configs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars) } private[docker] def prepareServerConfigs(defaultConfigsPath: Path, @@ -137,36 +143,35 @@ object KafkaDockerWrapper extends Logging { } } - private[docker] def prepareLog4jConfigs(defaultConfigsPath: Path, - mountedConfigsPath: Path, - finalConfigsPath: Path, - env: Map[String, String]): Unit = { - val propsToAdd = getLog4jConfigsFromEnv(env) + private[docker] def prepareLog4j2Configs(defaultConfigsPath: Path, + mountedConfigsPath: Path, + finalConfigsPath: Path, + env: Map[String, String]): Unit = { + val loggerFromEnv = getLog4j2ConfigsFromEnv(env) + val rootOption = getLog4j2RootConfigsFromEnv(env) - val defaultFilePath = defaultConfigsPath.resolve(s"$Log4jPropsFilename") - val mountedFilePath = mountedConfigsPath.resolve(s"$Log4jPropsFilename") - val finalFilePath = finalConfigsPath.resolve(s"$Log4jPropsFilename") + val defaultFilePath = defaultConfigsPath.resolve(s"$Log4j2PropsFilename") + val mountedFilePath = mountedConfigsPath.resolve(s"$Log4j2PropsFilename") + val finalFilePath = finalConfigsPath.resolve(s"$Log4j2PropsFilename") copyFile(defaultFilePath, finalFilePath) copyFile(mountedFilePath, finalFilePath) - addToFile(propsToAdd, finalFilePath, StandardOpenOption.APPEND) + addToYaml(loggerFromEnv, rootOption, finalFilePath) } - private[docker] def prepareToolsLog4jConfigs(defaultConfigsPath: Path, - mountedConfigsPath: Path, - finalConfigsPath: Path, - env: Map[String, String]): Unit = { - val propToAdd = getToolsLog4jConfigsFromEnv(env) - - val defaultFilePath = defaultConfigsPath.resolve(s"$ToolsLog4jFilename") - val mountedFilePath = mountedConfigsPath.resolve(s"$ToolsLog4jFilename") - val finalFilePath = finalConfigsPath.resolve(s"$ToolsLog4jFilename") + private[docker] def prepareToolsLog4j2Configs(defaultConfigsPath: Path, + mountedConfigsPath: Path, + finalConfigsPath: Path, + env: Map[String, String]): Unit = { + val defaultFilePath = defaultConfigsPath.resolve(s"$ToolsLog4j2Filename") + val mountedFilePath = mountedConfigsPath.resolve(s"$ToolsLog4j2Filename") + val finalFilePath = finalConfigsPath.resolve(s"$ToolsLog4j2Filename") copyFile(defaultFilePath, finalFilePath) copyFile(mountedFilePath, finalFilePath) - addToFile(propToAdd, finalFilePath, StandardOpenOption.APPEND) + addToYaml(Array.empty, getToolsLog4j2ConfigsFromEnv(env), finalFilePath) } private[docker] def getServerConfigsFromEnv(env: Map[String, String]): List[String] = { @@ -186,29 +191,36 @@ object KafkaDockerWrapper extends Logging { .filterNot(_.trim.isEmpty) } - private[docker] def getLog4jConfigsFromEnv(env: Map[String, String]): String = { - val kafkaLog4jRootLogLevelProp = env.get(KafkaLog4jRootLoglevelEnv) + private[docker] def getLog4j2RootConfigsFromEnv(env: Map[String, String]): Option[Root] = { + env.get(KafkaLog4jRootLoglevelEnv) .filter(_.nonEmpty) - .map(kafkaLog4jRootLogLevel => s"log4j.rootLogger=$kafkaLog4jRootLogLevel, stdout") - .getOrElse("") + .map(buildRootLogger).getOrElse(Option.empty) + } - val kafkaLog4jLoggersProp = env.get(KafkaLog4JLoggersEnv) + private[docker] def getToolsLog4j2ConfigsFromEnv(env: Map[String, String]): Option[Root] = { + env.get(KafkaToolsLog4jLoglevelEnv) .filter(_.nonEmpty) - .map { - kafkaLog4JLoggersString => - kafkaLog4JLoggersString.split(",") - .map(kafkaLog4JLogger => s"log4j.logger.$kafkaLog4JLogger") - .mkString(NewlineChar) - }.getOrElse("") - - addNewlinePadding(kafkaLog4jRootLogLevelProp) + addNewlinePadding(kafkaLog4jLoggersProp) + .map(buildRootLogger).getOrElse(Option.empty) } - private[docker] def getToolsLog4jConfigsFromEnv(env: Map[String, String]): String = { - env.get(KafkaToolsLog4jLoglevelEnv) + private def buildRootLogger(level: String) = { + val root = new Root + root.setLevel(level) + Option.apply(root) + } + + private[docker] def getLog4j2ConfigsFromEnv(env: Map[String, String]): Array[Logger] = { + env.get(KafkaLog4JLoggersEnv) .filter(_.nonEmpty) - .map(kafkaToolsLog4jLogLevel => addNewlinePadding(s"log4j.rootLogger=$kafkaToolsLog4jLogLevel, stderr")) - .getOrElse("") + .map { loggersString => + loggersString.split(",").map { e => + val parts = e.split("=") + val logger = new Logger() + logger.setName(parts(0).trim) + logger.setLevel(parts(1).trim) + logger + } + }.getOrElse(Array.empty[Logger]) } private def addToFile(properties: String, filepath: Path, mode: StandardOpenOption): Unit = { @@ -219,6 +231,68 @@ object KafkaDockerWrapper extends Logging { Files.write(filepath, properties.getBytes(StandardCharsets.UTF_8), mode) } + private def addToYaml(loggerFromEnv: Array[Logger], rootOption: Option[Root], filepath: Path): Unit = { + val path = filepath + if (!Files.exists(path)) { + Files.createFile(path) + } + + val mapper = new ObjectMapper(new YAMLFactory().disable(Feature.WRITE_DOC_START_MARKER)) + .configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, true) + .setSerializationInclusion(JsonInclude.Include.NON_EMPTY) + .findAndRegisterModules(); + + val yaml = try { + mapper.readValue(filepath.toFile, classOf[Log4jConfiguration]) + } catch { + case _: MismatchedInputException => new Log4jConfiguration + case e: RuntimeException => throw e + } + val config = yaml.getConfiguration + + if (config == null && loggerFromEnv.isEmpty && rootOption.isEmpty) { + return + } + + if (config == null) { + generateDefaultLog4jConfig(loggerFromEnv, rootOption, filepath, mapper) + } else { + overrideLog4jConfigByEnv(loggerFromEnv, rootOption, filepath, mapper, yaml, config) + } + } + + private def generateDefaultLog4jConfig(loggerFromEnv: Array[Logger], rootOption: Option[Root], filepath: Path, mapper: ObjectMapper): Unit = { + val log4jYaml = new Log4jConfiguration + val configuration = new Configuration + val loggers = new Loggers + val root = if (rootOption.isEmpty) { + val root = new Root + // log4j default root logger level + root.setLevel("ERROR") + root + } else rootOption.get + log4jYaml.setConfiguration(configuration) + configuration.setLoggers(loggers) + loggers.setRoot(root) + loggers.setLogger(loggerFromEnv.toList.asJava) + Files.write(filepath, mapper.writeValueAsString(log4jYaml).getBytes(StandardCharsets.UTF_8), StandardOpenOption.TRUNCATE_EXISTING) + } + + private def overrideLog4jConfigByEnv(loggerFromEnv: Array[Logger], + rootOption: Option[Root], + filepath: Path, + mapper: ObjectMapper, + yaml: Log4jConfiguration, + config: Configuration): Unit = { + val nameToLoggers = config.getLoggers.getLogger.asScala.map(logger => (logger.getName, logger)).to(collection.mutable.Map) + loggerFromEnv.foreach(logger => nameToLoggers.put(logger.getName, logger)) + config.getLoggers.setLogger(nameToLoggers.values.toList.asJava) + if (rootOption.isDefined) { + config.getLoggers.setRoot(rootOption.get) + } + Files.write(filepath, mapper.writeValueAsString(yaml).getBytes(StandardCharsets.UTF_8), StandardOpenOption.TRUNCATE_EXISTING) + } + private def copyFile(source: Path, destination: Path) = { if (Files.exists(source)) { Files.copy(source, destination, StandardCopyOption.REPLACE_EXISTING) @@ -238,8 +312,8 @@ object KafkaDockerWrapper extends Logging { private object Constants { val ServerPropsFilename = "server.properties" - val Log4jPropsFilename = "log4j.properties" - val ToolsLog4jFilename = "tools-log4j.properties" + val Log4j2PropsFilename = "log4j2.yaml" + val ToolsLog4j2Filename = "tools-log4j2.yaml" val KafkaLog4JLoggersEnv = "KAFKA_LOG4J_LOGGERS" val KafkaLog4jRootLoglevelEnv = "KAFKA_LOG4J_ROOT_LOGLEVEL" val KafkaToolsLog4jLoglevelEnv = "KAFKA_TOOLS_LOG4J_LOGLEVEL" diff --git a/core/src/test/scala/unit/kafka/docker/KafkaDockerWrapperTest.scala b/core/src/test/scala/unit/kafka/docker/KafkaDockerWrapperTest.scala index d01644de1fd5e..81dcad01a9938 100644 --- a/core/src/test/scala/unit/kafka/docker/KafkaDockerWrapperTest.scala +++ b/core/src/test/scala/unit/kafka/docker/KafkaDockerWrapperTest.scala @@ -16,7 +16,7 @@ */ package kafka.docker -import org.junit.jupiter.api.Assertions.{assertArrayEquals, assertEquals, assertThrows} +import org.junit.jupiter.api.Assertions.{assertArrayEquals, assertEquals, assertThrows, assertTrue} import org.junit.jupiter.api.Test import java.nio.charset.StandardCharsets @@ -150,31 +150,35 @@ class KafkaDockerWrapperTest { "KAFKA_LOG4J_ROOT_LOGLEVEL" -> "ERROR", "SOME_VARIABLE" -> "Some Value" ) - val expected = "\n" + "log4j.rootLogger=ERROR, stdout" + "\n" + - "log4j.logger.kafka=INFO" + "\n" + - "log4j.logger.kafka.network.RequestChannel$=WARN" + "\n" + - "log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG" - val actual = KafkaDockerWrapper.getLog4jConfigsFromEnv(envVars) - assertEquals(expected, actual) + val kafkaLogger = new Logger + kafkaLogger.setName("kafka") + kafkaLogger.setLevel("INFO") + + val requestChannelLogger = new Logger + requestChannelLogger.setName("kafka.network.RequestChannel$") + requestChannelLogger.setLevel("WARN") + + val defaultEventHandlerLogger = new Logger + defaultEventHandlerLogger.setName("kafka.producer.async.DefaultEventHandler") + defaultEventHandlerLogger.setLevel("DEBUG") + + val actual = KafkaDockerWrapper.getLog4j2ConfigsFromEnv(envVars) + assertEquals(List.apply(kafkaLogger, requestChannelLogger, defaultEventHandlerLogger), actual.toList) } @Test def testGetLog4jConfigsFromEnvInvalidEnvVariable(): Unit = { val envVars = Map("SOME_VARIABLE" -> "Some Value") - val expected = "" - - val actual = KafkaDockerWrapper.getLog4jConfigsFromEnv(envVars) - assertEquals(expected, actual) + val actual = KafkaDockerWrapper.getLog4j2ConfigsFromEnv(envVars) + assertTrue(actual.isEmpty) } @Test def testGetLog4jConfigsFromEnvWithEmptyEnvVariable(): Unit = { val envVars = Map("SOME_VARIABLE" -> "Some Value", "KAFKA_LOG4J_LOGGERS" -> "", "KAFKA_LOG4J_ROOT_LOGLEVEL" -> "") - val expected = "" - - val actual = KafkaDockerWrapper.getLog4jConfigsFromEnv(envVars) - assertEquals(expected, actual) + val actual = KafkaDockerWrapper.getLog4j2ConfigsFromEnv(envVars) + assertTrue(actual.isEmpty) } @Test @@ -187,18 +191,26 @@ class KafkaDockerWrapperTest { "SOME_VARIABLE" -> "Some Value" ) - Files.write(defaultConfigsPath.resolve("log4j.properties"), "default.config=default value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit() - Files.write(mountedConfigsPath.resolve("log4j.properties"), "mounted.config=mounted value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit() - Files.write(finalConfigsPath.resolve("log4j.properties"), "existing.config=existing value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit() - - KafkaDockerWrapper.prepareLog4jConfigs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars) - - val source = scala.io.Source.fromFile(finalConfigsPath.toString + "/log4j.properties") - val actual = try source.mkString finally source.close() - val expected = "mounted.config=mounted value" + "\n" + "log4j.rootLogger=ERROR, stdout" + "\n" + - "log4j.logger.kafka=INFO" + "\n" + - "log4j.logger.kafka.network.RequestChannel$=WARN" + "\n" + - "log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG" + Files.write(defaultConfigsPath.resolve("log4j2.yaml"), "default.config=default value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit() + Files.write(mountedConfigsPath.resolve("log4j2.yaml"), "mounted.config=mounted value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit() + Files.write(finalConfigsPath.resolve("log4j2.yaml"), "existing.config=existing value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit() + + KafkaDockerWrapper.prepareLog4j2Configs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars) + + val source = scala.io.Source.fromFile(finalConfigsPath.toString + "/log4j2.yaml") + val actual = try source.mkString.trim finally source.close() + val expected = + """Configuration: + | Loggers: + | Root: + | level: "ERROR" + | Logger: + | - name: "kafka" + | level: "INFO" + | - name: "kafka.network.RequestChannel$" + | level: "WARN" + | - name: "kafka.producer.async.DefaultEventHandler" + | level: "DEBUG"""".stripMargin assertEquals(expected, actual) } @@ -213,17 +225,47 @@ class KafkaDockerWrapperTest { "SOME_VARIABLE" -> "Some Value" ) - Files.write(defaultConfigsPath.resolve("log4j.properties"), "default.config=default value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit() - Files.write(finalConfigsPath.resolve("log4j.properties"), "existing.config=existing value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit() - - KafkaDockerWrapper.prepareLog4jConfigs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars) - - val source = scala.io.Source.fromFile(finalConfigsPath.toString + "/log4j.properties") - val actual = try source.mkString finally source.close() - val expected = "default.config=default value" + "\n" + "log4j.rootLogger=ERROR, stdout" + "\n" + - "log4j.logger.kafka=INFO" + "\n" + - "log4j.logger.kafka.network.RequestChannel$=WARN" + "\n" + - "log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG" + val default = + """Configuration: + | Appenders: + | Console: + | name: "Console" + | target: "SYSTEM_OUT" + | PatternLayout: + | pattern: "123" + | Loggers: + | Root: + | level: "123" + | Logger: + | - name: kafka + | level: 123""".stripMargin + + Files.write(defaultConfigsPath.resolve("log4j2.yaml"), default.getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit() + Files.write(finalConfigsPath.resolve("log4j2.yaml"), "".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit() + + KafkaDockerWrapper.prepareLog4j2Configs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars) + + val source = scala.io.Source.fromFile(finalConfigsPath.toString + "/log4j2.yaml") + val actual = try source.mkString.trim finally source.close() + + val expected = + """Configuration: + | Appenders: + | Console: + | name: "Console" + | target: "SYSTEM_OUT" + | PatternLayout: + | pattern: "123" + | Loggers: + | Root: + | level: "ERROR" + | Logger: + | - name: "kafka.network.RequestChannel$" + | level: "WARN" + | - name: "kafka.producer.async.DefaultEventHandler" + | level: "DEBUG" + | - name: "kafka" + | level: "INFO"""".stripMargin assertEquals(expected, actual) } @@ -234,41 +276,69 @@ class KafkaDockerWrapperTest { val envVars = Map.empty[String, String] - Files.write(defaultConfigsPath.resolve("log4j.properties"), "default.config=default value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit() - Files.write(mountedConfigsPath.resolve("log4j.properties"), "mounted.config=mounted value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit() - Files.write(finalConfigsPath.resolve("log4j.properties"), "existing.config=existing value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit() - - KafkaDockerWrapper.prepareLog4jConfigs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars) - - val source = scala.io.Source.fromFile(finalConfigsPath.toString + "/log4j.properties") - val actual = try source.mkString finally source.close() - val expected = "mounted.config=mounted value" - - assertEquals(expected, actual) + val default = + """ + |Configuration: + | Appenders: + | Console: + | name: "Console" + | target: "SYSTEM_OUT" + | PatternLayout: + | pattern: "123" + | Loggers: + | Root: + | level: "123" + | Logger: + | - name: kafka + | level: 123""".stripMargin + + val mounted = + """Configuration: + | Appenders: + | Console: + | name: "Console" + | target: "SYSTEM_OUT" + | PatternLayout: + | pattern: "[%d] %p %m (%c)%n" + | Loggers: + | Root: + | level: "ERROR" + | Logger: + | - name: "kafka" + | level: "DEBUG"""".stripMargin + + Files.write(defaultConfigsPath.resolve("log4j2.yaml"), default.getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit() + Files.write(mountedConfigsPath.resolve("log4j2.yaml"), mounted.getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit() + Files.write(finalConfigsPath.resolve("log4j2.yaml"), "".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit() + + KafkaDockerWrapper.prepareLog4j2Configs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars) + + val source = scala.io.Source.fromFile(finalConfigsPath.toString + "/log4j2.yaml") + val actual = try source.mkString.trim finally source.close() + + assertEquals(mounted, actual) } @Test def testGetToolsLog4jConfigsFromEnv(): Unit = { val envVars = Map("KAFKA_TOOLS_LOG4J_LOGLEVEL" -> "TRACE", "SOME_VARIABLE" -> "Some Value") - val expected = "\n" + "log4j.rootLogger=TRACE, stderr" - val actual = KafkaDockerWrapper.getToolsLog4jConfigsFromEnv(envVars) - assertEquals(expected, actual) + val actual = KafkaDockerWrapper.getToolsLog4j2ConfigsFromEnv(envVars) + assertTrue(actual.isDefined) + assertEquals(actual.get.getLevel, "TRACE") } @Test def testGetToolsLog4jConfigsFromEnvInvalidEnvVariable(): Unit = { val envVars = Map("SOME_VARIABLE" -> "Some Value") - val expected = "" - val actual = KafkaDockerWrapper.getToolsLog4jConfigsFromEnv(envVars) - assertEquals(expected, actual) + val actual = KafkaDockerWrapper.getToolsLog4j2ConfigsFromEnv(envVars) + assertTrue(actual.isEmpty) } @Test def testGetToolsLog4jConfigsFromEnvWithEmptyEnvVariable(): Unit = { val envVars = Map("SOME_VARIABLE" -> "Some Value", "KAFKA_TOOLS_LOG4J_LOGLEVEL" -> "") - val expected = "" - val actual = KafkaDockerWrapper.getToolsLog4jConfigsFromEnv(envVars) - assertEquals(expected, actual) + val actual = KafkaDockerWrapper.getToolsLog4j2ConfigsFromEnv(envVars) + assertTrue(actual.isEmpty) } @Test @@ -276,16 +346,59 @@ class KafkaDockerWrapperTest { val (defaultConfigsPath, mountedConfigsPath, finalConfigsPath) = createDirs() val envVars = Map("KAFKA_TOOLS_LOG4J_LOGLEVEL" -> "TRACE") - - Files.write(defaultConfigsPath.resolve("tools-log4j.properties"), "default.config=default value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit() - Files.write(mountedConfigsPath.resolve("tools-log4j.properties"), "mounted.config=mounted value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit() - Files.write(finalConfigsPath.resolve("tools-log4j.properties"), "existing.config=existing value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit() - - KafkaDockerWrapper.prepareToolsLog4jConfigs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars) - - val source = scala.io.Source.fromFile(finalConfigsPath.toString + "/tools-log4j.properties") - val actual = try source.mkString finally source.close() - val expected = "mounted.config=mounted value" + "\n" + "log4j.rootLogger=TRACE, stderr" + val default = + """ + |Configuration: + | Appenders: + | Console: + | name: "Console" + | target: "SYSTEM_OUT" + | PatternLayout: + | pattern: "123" + | Loggers: + | Root: + | level: "123" + | Logger: + | - name: kafka + | level: 123""".stripMargin + + val mounted = + """Configuration: + | Appenders: + | Console: + | name: "Console" + | target: "SYSTEM_OUT" + | PatternLayout: + | pattern: "[%d] %p %m (%c)%n" + | Loggers: + | Root: + | level: "ERROR" + | Logger: + | - name: "kafka" + | level: "DEBUG"""".stripMargin + + Files.write(defaultConfigsPath.resolve("tools-log4j2.yaml"), default.getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit() + Files.write(mountedConfigsPath.resolve("tools-log4j2.yaml"), mounted.getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit() + Files.write(finalConfigsPath.resolve("tools-log4j2.yaml"), "".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit() + + KafkaDockerWrapper.prepareToolsLog4j2Configs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars) + + val source = scala.io.Source.fromFile(finalConfigsPath.toString + "/tools-log4j2.yaml") + val actual = try source.mkString.trim finally source.close() + val expected = + """Configuration: + | Appenders: + | Console: + | name: "Console" + | target: "SYSTEM_OUT" + | PatternLayout: + | pattern: "[%d] %p %m (%c)%n" + | Loggers: + | Root: + | level: "TRACE" + | Logger: + | - name: "kafka" + | level: "DEBUG"""".stripMargin assertEquals(expected, actual) } @@ -296,14 +409,18 @@ class KafkaDockerWrapperTest { val envVars = Map("KAFKA_TOOLS_LOG4J_LOGLEVEL" -> "TRACE") - Files.write(defaultConfigsPath.resolve("tools-log4j.properties"), "default.config=default value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit() - Files.write(finalConfigsPath.resolve("tools-log4j.properties"), "existing.config=existing value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit() + Files.write(defaultConfigsPath.resolve("tools-log4j2.yaml"), "".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit() + Files.write(finalConfigsPath.resolve("tools-log4j2.yaml"), "".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit() - KafkaDockerWrapper.prepareToolsLog4jConfigs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars) + KafkaDockerWrapper.prepareToolsLog4j2Configs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars) - val source = scala.io.Source.fromFile(finalConfigsPath.toString + "/tools-log4j.properties") - val actual = try source.mkString finally source.close() - val expected = "default.config=default value" + "\n" + "log4j.rootLogger=TRACE, stderr" + val source = scala.io.Source.fromFile(finalConfigsPath.toString + "/tools-log4j2.yaml") + val actual = try source.mkString.trim finally source.close() + val expected = + """Configuration: + | Loggers: + | Root: + | level: "TRACE"""".stripMargin assertEquals(expected, actual) } @@ -313,18 +430,47 @@ class KafkaDockerWrapperTest { val (defaultConfigsPath, mountedConfigsPath, finalConfigsPath) = createDirs() val envVars = Map.empty[String, String] - - Files.write(defaultConfigsPath.resolve("tools-log4j.properties"), "default.config=default value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit() - Files.write(mountedConfigsPath.resolve("tools-log4j.properties"), "mounted.config=mounted value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit() - Files.write(finalConfigsPath.resolve("tools-log4j.properties"), "existing.config=existing value".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit() - - KafkaDockerWrapper.prepareToolsLog4jConfigs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars) - - val source = scala.io.Source.fromFile(finalConfigsPath.toString + "/tools-log4j.properties") - val actual = try source.mkString finally source.close() - val expected = "mounted.config=mounted value" - - assertEquals(expected, actual) + val default = + """ + |Configuration: + | Appenders: + | Console: + | name: "Console" + | target: "SYSTEM_OUT" + | PatternLayout: + | pattern: "123" + | Loggers: + | Root: + | level: "123" + | Logger: + | - name: kafka + | level: 123""".stripMargin + + val mounted = + """Configuration: + | Appenders: + | Console: + | name: "Console" + | target: "SYSTEM_OUT" + | PatternLayout: + | pattern: "[%d] %p %m (%c)%n" + | Loggers: + | Root: + | level: "ERROR" + | Logger: + | - name: "kafka" + | level: "DEBUG"""".stripMargin + + Files.write(defaultConfigsPath.resolve("tools-log4j2.yaml"), default.getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit() + Files.write(mountedConfigsPath.resolve("tools-log4j2.yaml"), mounted.getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit() + Files.write(finalConfigsPath.resolve("tools-log4j2.yaml"), "".getBytes(StandardCharsets.UTF_8)).toFile.deleteOnExit() + + KafkaDockerWrapper.prepareToolsLog4j2Configs(defaultConfigsPath, mountedConfigsPath, finalConfigsPath, envVars) + + val source = scala.io.Source.fromFile(finalConfigsPath.toString + "/tools-log4j2.yaml") + val actual = try source.mkString.trim finally source.close() + + assertEquals(mounted, actual) } private def createDirs(): (Path, Path, Path) = { diff --git a/docker/examples/docker-compose-files/cluster/isolated/plaintext/docker-compose.yml b/docker/examples/docker-compose-files/cluster/isolated/plaintext/docker-compose.yml index edf71567ebe50..54ecc00531a05 100644 --- a/docker/examples/docker-compose-files/cluster/isolated/plaintext/docker-compose.yml +++ b/docker/examples/docker-compose-files/cluster/isolated/plaintext/docker-compose.yml @@ -22,7 +22,6 @@ services: KAFKA_NODE_ID: 1 KAFKA_PROCESS_ROLES: 'controller' KAFKA_CONTROLLER_QUORUM_VOTERS: '1@controller-1:9093,2@controller-2:9093,3@controller-3:9093' - KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' KAFKA_LISTENERS: 'CONTROLLER://:9093' CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw' @@ -38,7 +37,6 @@ services: KAFKA_NODE_ID: 2 KAFKA_PROCESS_ROLES: 'controller' KAFKA_CONTROLLER_QUORUM_VOTERS: '1@controller-1:9093,2@controller-2:9093,3@controller-3:9093' - KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' KAFKA_LISTENERS: 'CONTROLLER://:9093' CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw' @@ -54,7 +52,6 @@ services: KAFKA_NODE_ID: 3 KAFKA_PROCESS_ROLES: 'controller' KAFKA_CONTROLLER_QUORUM_VOTERS: '1@controller-1:9093,2@controller-2:9093,3@controller-3:9093' - KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' KAFKA_LISTENERS: 'CONTROLLER://:9093' CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw' diff --git a/docker/examples/docker-compose-files/cluster/isolated/ssl/docker-compose.yml b/docker/examples/docker-compose-files/cluster/isolated/ssl/docker-compose.yml index e3590508fef15..f4b1ddb382c59 100644 --- a/docker/examples/docker-compose-files/cluster/isolated/ssl/docker-compose.yml +++ b/docker/examples/docker-compose-files/cluster/isolated/ssl/docker-compose.yml @@ -22,7 +22,6 @@ services: KAFKA_NODE_ID: 1 KAFKA_PROCESS_ROLES: 'controller' KAFKA_CONTROLLER_QUORUM_VOTERS: '1@controller-1:29092,2@controller-2:29092,3@controller-3:29092' - KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' KAFKA_LISTENERS: 'CONTROLLER://:29092' CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw' @@ -38,7 +37,6 @@ services: KAFKA_NODE_ID: 2 KAFKA_PROCESS_ROLES: 'controller' KAFKA_CONTROLLER_QUORUM_VOTERS: '1@controller-1:29092,2@controller-2:29092,3@controller-3:29092' - KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' KAFKA_LISTENERS: 'CONTROLLER://:29092' CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw' @@ -54,7 +52,6 @@ services: KAFKA_NODE_ID: 3 KAFKA_PROCESS_ROLES: 'controller' KAFKA_CONTROLLER_QUORUM_VOTERS: '1@controller-1:29092,2@controller-2:29092,3@controller-3:29092' - KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' KAFKA_LISTENERS: 'CONTROLLER://:29092' CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw' diff --git a/docker/test/fixtures/mode/isolated/docker-compose.yml b/docker/test/fixtures/mode/isolated/docker-compose.yml index 5e27cbe560747..0dcf8d2f876f0 100644 --- a/docker/test/fixtures/mode/isolated/docker-compose.yml +++ b/docker/test/fixtures/mode/isolated/docker-compose.yml @@ -30,7 +30,6 @@ services: KAFKA_PROCESS_ROLES: 'controller' KAFKA_CONTROLLER_QUORUM_VOTERS: '1@controller1:19092,2@controller2:19092,3@controller3:19092' KAFKA_LISTENERS: 'CONTROLLER://:19092' - KAFKA_INTER_BROKER_LISTENER_NAME: 'CONTROLLER' KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw' @@ -49,7 +48,6 @@ services: KAFKA_PROCESS_ROLES: 'controller' KAFKA_CONTROLLER_QUORUM_VOTERS: '1@controller1:19092,2@controller2:19092,3@controller3:19092' KAFKA_LISTENERS: 'CONTROLLER://:19092' - KAFKA_INTER_BROKER_LISTENER_NAME: 'CONTROLLER' KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw' @@ -68,7 +66,6 @@ services: KAFKA_PROCESS_ROLES: 'controller' KAFKA_CONTROLLER_QUORUM_VOTERS: '1@controller1:19092,2@controller2:19092,3@controller3:19092' KAFKA_LISTENERS: 'CONTROLLER://:19092' - KAFKA_INTER_BROKER_LISTENER_NAME: 'CONTROLLER' KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'