[AzPubSub] Add Exit Command in All Admin Commands (#66)

* add exit in kafka admin commands

* add exit ot aclcommand

* add error handling

* update error catch

* remove unused comma
This commit is contained in:
Yang Guo 2021-06-25 15:54:44 -07:00 коммит произвёл GitHub
Родитель 60d55ccb13
Коммит 137716f049
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
8 изменённых файлов: 42 добавлений и 17 удалений

Просмотреть файл

@ -73,6 +73,8 @@ object AclCommand extends Logging {
}
}
var exitCode = 0
try {
if (opts.options.has(opts.addOpt))
aclCommandService.addAcls()
@ -84,7 +86,9 @@ object AclCommand extends Logging {
case e: Throwable =>
println(s"Error while executing ACL command: ${e.getMessage}")
println(Utils.stackTrace(e))
Exit.exit(1)
exitCode = 1
} finally {
Exit.exit(exitCode)
}
}

Просмотреть файл

@ -22,9 +22,7 @@ import java.io.IOException
import java.util.Properties
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit}
import kafka.utils.{CommandDefaultOptions, CommandLineUtils}
import kafka.utils.Logging
import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit, Logging}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.clients.{ApiVersions, ClientDnsLookup, ClientResponse, ClientUtils, CommonClientConfigs, Metadata, NetworkClient, NodeApiVersions}
import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, RequestFuture}
@ -55,6 +53,7 @@ object BrokerApiVersionsCommand {
}
def execute(args: Array[String], out: PrintStream): Unit = {
var exitCode = 0
val opts = new BrokerVersionCommandOptions(args)
val adminClient = createAdminClient(opts)
adminClient.awaitBrokers()
@ -62,10 +61,13 @@ object BrokerApiVersionsCommand {
brokerMap.foreach { case (broker, versionInfoOrError) =>
versionInfoOrError match {
case Success(v) => out.print(s"${broker} -> ${v.toString(true)}\n")
case Failure(v) => out.print(s"${broker} -> ERROR: ${v}\n")
case Failure(v) =>
out.print(s"${broker} -> ERROR: ${v}\n")
exitCode = 1
}
}
adminClient.close()
Exit.exit(exitCode)
}
private def createAdminClient(opts: BrokerVersionCommandOptions): AdminClient = {

Просмотреть файл

@ -71,6 +71,9 @@ object ConfigCommand extends Config {
DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp)
def main(args: Array[String]): Unit = {
var exitCode = 0
try {
val opts = new ConfigCommandOptions(args)
@ -87,13 +90,15 @@ object ConfigCommand extends Config {
case e @ (_: IllegalArgumentException | _: InvalidConfigurationException | _: OptionException) =>
logger.debug(s"Failed config command with args '${args.mkString(" ")}'", e)
System.err.println(e.getMessage)
Exit.exit(1)
exitCode = 1
case t: Throwable =>
logger.debug(s"Error while executing config command with args '${args.mkString(" ")}'", t)
System.err.println(s"Error while executing config command with args '${args.mkString(" ")}'")
t.printStackTrace(System.err)
Exit.exit(1)
exitCode = 1
} finally {
Exit.exit(exitCode)
}
}

Просмотреть файл

@ -59,6 +59,8 @@ object ConsumerGroupCommand extends Logging {
val consumerGroupService = new ConsumerGroupService(opts)
var exitCode = 0
try {
if (opts.options.has(opts.listOpt))
consumerGroupService.listGroups().foreach(println(_))
@ -80,8 +82,10 @@ object ConsumerGroupCommand extends Logging {
} catch {
case e: Throwable =>
printError(s"Executing consumer group command failed due to ${e.getMessage}", Some(e))
exitCode = 1
} finally {
consumerGroupService.close()
Exit.exit(exitCode)
}
}

Просмотреть файл

@ -19,10 +19,9 @@ package kafka.admin
import java.io.PrintStream
import java.util.Properties
import kafka.common.AdminCommandFailedException
import kafka.utils.json.JsonValue
import kafka.utils.{CommandDefaultOptions, CommandLineUtils, CoreUtils, Json}
import kafka.utils.{CommandDefaultOptions, CommandLineUtils, CoreUtils, Exit, Json}
import org.apache.kafka.clients.admin.RecordsToDelete
import org.apache.kafka.clients.{CommonClientConfigs, admin}
import org.apache.kafka.common.TopicPartition
@ -90,14 +89,19 @@ object DeleteRecordsCommand {
val deleteRecordsResult = adminClient.deleteRecords(recordsToDelete)
out.println("Records delete operation completed:")
var exitCode = 0
deleteRecordsResult.lowWatermarks.asScala.foreach { case (tp, partitionResult) => {
try out.println(s"partition: $tp\tlow_watermark: ${partitionResult.get.lowWatermark}")
catch {
case e: Exception => out.println(s"partition: $tp\terror: ${e.getMessage}")
case e: Exception =>
out.println(s"partition: $tp\terror: ${e.getMessage}")
exitCode = 1
}
}}
adminClient.close()
Exit.exit(exitCode)
}
private def createAdminClient(opts: DeleteRecordsCommandOptions): admin.Admin = {

Просмотреть файл

@ -20,11 +20,7 @@ import java.util.Properties
import java.util.concurrent.ExecutionException
import joptsimple.util.EnumConverter
import kafka.common.AdminCommandFailedException
import kafka.utils.CommandDefaultOptions
import kafka.utils.CommandLineUtils
import kafka.utils.CoreUtils
import kafka.utils.Json
import kafka.utils.Logging
import kafka.utils.{CommandDefaultOptions, CommandLineUtils, CoreUtils, Exit, Json, Logging}
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AdminClient => JAdminClient}
import org.apache.kafka.common.ElectionType
import org.apache.kafka.common.TopicPartition
@ -84,10 +80,15 @@ object LeaderElectionCommand extends Logging {
JAdminClient.create(props)
}
var exitCode = 0
try {
electLeaders(adminClient, electionType, topicPartitions)
} catch {
case e: Throwable => exitCode = 1
} finally {
adminClient.close()
Exit.exit(exitCode)
}
}

Просмотреть файл

@ -19,8 +19,7 @@ package kafka.admin
import java.io.PrintStream
import java.util.Properties
import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Json}
import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit, Json}
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, DescribeLogDirsResult, AdminClient => JAdminClient}
import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo
import org.apache.kafka.common.utils.Utils
@ -53,6 +52,7 @@ object LogDirsCommand {
out.println(s"Received log directory information from brokers ${brokerList.mkString(",")}")
out.println(formatAsJson(logDirInfosByBroker, topicList.toSet))
adminClient.close()
Exit.exit(0)
}
private def formatAsJson(logDirInfosByBroker: Map[Integer, Map[String, LogDirInfo]], topicSet: Set[String]): String = {

Просмотреть файл

@ -76,10 +76,15 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
new AdminClientCommand(adminProps)
}
var exitCode = 0
try {
preferredReplicaElectionCommand.electPreferredLeaders(partitionsForPreferredReplicaElection)
} catch {
case e: Throwable => exitCode = 1
} finally {
preferredReplicaElectionCommand.close()
Exit.exit(exitCode)
}
}