From 137716f04997870499674f3d27bc52a59e8d5b3a Mon Sep 17 00:00:00 2001 From: Yang Guo <77757985+yangguo-ms@users.noreply.github.com> Date: Fri, 25 Jun 2021 15:54:44 -0700 Subject: [PATCH] [AzPubSub] Add Exit Command in All Admin Commands (#66) * add exit in kafka admin commands * add exit ot aclcommand * add error handling * update error catch * remove unused comma --- core/src/main/scala/kafka/admin/AclCommand.scala | 6 +++++- .../scala/kafka/admin/BrokerApiVersionsCommand.scala | 10 ++++++---- core/src/main/scala/kafka/admin/ConfigCommand.scala | 9 +++++++-- .../main/scala/kafka/admin/ConsumerGroupCommand.scala | 4 ++++ .../main/scala/kafka/admin/DeleteRecordsCommand.scala | 10 +++++++--- .../scala/kafka/admin/LeaderElectionCommand.scala | 11 ++++++----- core/src/main/scala/kafka/admin/LogDirsCommand.scala | 4 ++-- .../admin/PreferredReplicaLeaderElectionCommand.scala | 5 +++++ 8 files changed, 42 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala index 79ab6d167..bc6a5b026 100644 --- a/core/src/main/scala/kafka/admin/AclCommand.scala +++ b/core/src/main/scala/kafka/admin/AclCommand.scala @@ -73,6 +73,8 @@ object AclCommand extends Logging { } } + var exitCode = 0 + try { if (opts.options.has(opts.addOpt)) aclCommandService.addAcls() @@ -84,7 +86,9 @@ object AclCommand extends Logging { case e: Throwable => println(s"Error while executing ACL command: ${e.getMessage}") println(Utils.stackTrace(e)) - Exit.exit(1) + exitCode = 1 + } finally { + Exit.exit(exitCode) } } diff --git a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala index 92cdb9ee0..d723264c6 100644 --- a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala +++ b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala @@ -22,9 +22,7 @@ import java.io.IOException import java.util.Properties import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit} - -import kafka.utils.{CommandDefaultOptions, CommandLineUtils} -import kafka.utils.Logging +import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit, Logging} import org.apache.kafka.common.utils.Utils import org.apache.kafka.clients.{ApiVersions, ClientDnsLookup, ClientResponse, ClientUtils, CommonClientConfigs, Metadata, NetworkClient, NodeApiVersions} import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, RequestFuture} @@ -55,6 +53,7 @@ object BrokerApiVersionsCommand { } def execute(args: Array[String], out: PrintStream): Unit = { + var exitCode = 0 val opts = new BrokerVersionCommandOptions(args) val adminClient = createAdminClient(opts) adminClient.awaitBrokers() @@ -62,10 +61,13 @@ object BrokerApiVersionsCommand { brokerMap.foreach { case (broker, versionInfoOrError) => versionInfoOrError match { case Success(v) => out.print(s"${broker} -> ${v.toString(true)}\n") - case Failure(v) => out.print(s"${broker} -> ERROR: ${v}\n") + case Failure(v) => + out.print(s"${broker} -> ERROR: ${v}\n") + exitCode = 1 } } adminClient.close() + Exit.exit(exitCode) } private def createAdminClient(opts: BrokerVersionCommandOptions): AdminClient = { diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index 9203334e4..56e2f8cee 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -71,6 +71,9 @@ object ConfigCommand extends Config { DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp) def main(args: Array[String]): Unit = { + + var exitCode = 0 + try { val opts = new ConfigCommandOptions(args) @@ -87,13 +90,15 @@ object ConfigCommand extends Config { case e @ (_: IllegalArgumentException | _: InvalidConfigurationException | _: OptionException) => logger.debug(s"Failed config command with args '${args.mkString(" ")}'", e) System.err.println(e.getMessage) - Exit.exit(1) + exitCode = 1 case t: Throwable => logger.debug(s"Error while executing config command with args '${args.mkString(" ")}'", t) System.err.println(s"Error while executing config command with args '${args.mkString(" ")}'") t.printStackTrace(System.err) - Exit.exit(1) + exitCode = 1 + } finally { + Exit.exit(exitCode) } } diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index eeb93d65c..1a450ad8a 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -59,6 +59,8 @@ object ConsumerGroupCommand extends Logging { val consumerGroupService = new ConsumerGroupService(opts) + var exitCode = 0 + try { if (opts.options.has(opts.listOpt)) consumerGroupService.listGroups().foreach(println(_)) @@ -80,8 +82,10 @@ object ConsumerGroupCommand extends Logging { } catch { case e: Throwable => printError(s"Executing consumer group command failed due to ${e.getMessage}", Some(e)) + exitCode = 1 } finally { consumerGroupService.close() + Exit.exit(exitCode) } } diff --git a/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala b/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala index 3a342b796..228e48a2d 100644 --- a/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala +++ b/core/src/main/scala/kafka/admin/DeleteRecordsCommand.scala @@ -19,10 +19,9 @@ package kafka.admin import java.io.PrintStream import java.util.Properties - import kafka.common.AdminCommandFailedException import kafka.utils.json.JsonValue -import kafka.utils.{CommandDefaultOptions, CommandLineUtils, CoreUtils, Json} +import kafka.utils.{CommandDefaultOptions, CommandLineUtils, CoreUtils, Exit, Json} import org.apache.kafka.clients.admin.RecordsToDelete import org.apache.kafka.clients.{CommonClientConfigs, admin} import org.apache.kafka.common.TopicPartition @@ -90,14 +89,19 @@ object DeleteRecordsCommand { val deleteRecordsResult = adminClient.deleteRecords(recordsToDelete) out.println("Records delete operation completed:") + var exitCode = 0 + deleteRecordsResult.lowWatermarks.asScala.foreach { case (tp, partitionResult) => { try out.println(s"partition: $tp\tlow_watermark: ${partitionResult.get.lowWatermark}") catch { - case e: Exception => out.println(s"partition: $tp\terror: ${e.getMessage}") + case e: Exception => + out.println(s"partition: $tp\terror: ${e.getMessage}") + exitCode = 1 } }} adminClient.close() + Exit.exit(exitCode) } private def createAdminClient(opts: DeleteRecordsCommandOptions): admin.Admin = { diff --git a/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala index ff349f6f3..16743d998 100644 --- a/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala +++ b/core/src/main/scala/kafka/admin/LeaderElectionCommand.scala @@ -20,11 +20,7 @@ import java.util.Properties import java.util.concurrent.ExecutionException import joptsimple.util.EnumConverter import kafka.common.AdminCommandFailedException -import kafka.utils.CommandDefaultOptions -import kafka.utils.CommandLineUtils -import kafka.utils.CoreUtils -import kafka.utils.Json -import kafka.utils.Logging +import kafka.utils.{CommandDefaultOptions, CommandLineUtils, CoreUtils, Exit, Json, Logging} import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AdminClient => JAdminClient} import org.apache.kafka.common.ElectionType import org.apache.kafka.common.TopicPartition @@ -84,10 +80,15 @@ object LeaderElectionCommand extends Logging { JAdminClient.create(props) } + var exitCode = 0 + try { electLeaders(adminClient, electionType, topicPartitions) + } catch { + case e: Throwable => exitCode = 1 } finally { adminClient.close() + Exit.exit(exitCode) } } diff --git a/core/src/main/scala/kafka/admin/LogDirsCommand.scala b/core/src/main/scala/kafka/admin/LogDirsCommand.scala index 40d39e115..3ec501dd5 100644 --- a/core/src/main/scala/kafka/admin/LogDirsCommand.scala +++ b/core/src/main/scala/kafka/admin/LogDirsCommand.scala @@ -19,8 +19,7 @@ package kafka.admin import java.io.PrintStream import java.util.Properties - -import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Json} +import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit, Json} import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, DescribeLogDirsResult, AdminClient => JAdminClient} import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo import org.apache.kafka.common.utils.Utils @@ -53,6 +52,7 @@ object LogDirsCommand { out.println(s"Received log directory information from brokers ${brokerList.mkString(",")}") out.println(formatAsJson(logDirInfosByBroker, topicList.toSet)) adminClient.close() + Exit.exit(0) } private def formatAsJson(logDirInfosByBroker: Map[Integer, Map[String, LogDirInfo]], topicSet: Set[String]): String = { diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala index 04765a8b2..ebe186369 100755 --- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala +++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala @@ -76,10 +76,15 @@ object PreferredReplicaLeaderElectionCommand extends Logging { new AdminClientCommand(adminProps) } + var exitCode = 0 + try { preferredReplicaElectionCommand.electPreferredLeaders(partitionsForPreferredReplicaElection) + } catch { + case e: Throwable => exitCode = 1 } finally { preferredReplicaElectionCommand.close() + Exit.exit(exitCode) } }