Skip to content

Commit 3a20ea7

Browse files
committed
Delete topic from CLI.
Closes mesos#186. Inspired by mesos#245
1 parent 1576216 commit 3a20ea7

File tree

8 files changed

+96
-0
lines changed

8 files changed

+96
-0
lines changed

README.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -896,6 +896,26 @@ topic-expr examples:
896896
t* - topics starting with 't'
897897
```
898898

899+
Deleting Topics
900+
--------------
901+
Note: needs to be supported and enabled in the broker configuration (i.e.: `delete.topic.enable=true`).
902+
```
903+
#./kafka-mesos.sh help topic delete
904+
Delete topics
905+
Usage: topic delete [<topic-expr>]
906+
907+
Generic Options
908+
Option Description
909+
------ -----------
910+
--api Api url. Example: http://master:7000
911+
912+
topic-expr examples:
913+
t0 - topic t0
914+
t0,t1 - topics t0, t1
915+
* - any topic
916+
t* - topics starting with 't'
917+
```
918+
899919
Listing topic partition details
900920
-------------------------------
901921
```

src/scala/iface/0_10/ly/stealth/mesos/kafka/interface/impl/AdminUtils.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ class AdminUtils(zkUrl: String) extends AdminUtilsProxy {
4141
configs: Properties
4242
): Unit = KafkaAdminUtils.changeTopicConfig(zkUtils, topic, configs)
4343

44+
override def deleteTopic(topicToDelete: String): Unit =
45+
KafkaAdminUtils.deleteTopic(zkUtils, topicToDelete)
46+
4447
override def fetchEntityConfig(
4548
entityType: String,
4649
entity: String

src/scala/iface/0_8/ly/stealth/mesos/kafka/interface/impl/AdminUtils.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ class AdminUtils(zkUrl: String) extends AdminUtilsProxy {
4141
configs: Properties
4242
): Unit = KafkaAdminUtils.changeTopicConfig(zkClient, topic, configs)
4343

44+
override def deleteTopic(topicToDelete: String): Unit =
45+
KafkaAdminUtils.deleteTopic(zkClient, topicToDelete)
46+
4447
override def fetchEntityConfig(
4548
entityType: String,
4649
entity: String

src/scala/iface/0_9/ly/stealth/mesos/kafka/interface/impl/AdminUtils.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ class AdminUtils(zkUrl: String) extends AdminUtilsProxy {
4141
configs: Properties
4242
): Unit = KafkaAdminUtils.changeTopicConfig(zkUtils, topic, configs)
4343

44+
override def deleteTopic(topicToDelete: String): Unit =
45+
KafkaAdminUtils.deleteTopic(zkUtils, topicToDelete)
46+
4447
override def fetchEntityConfig(
4548
entityType: String,
4649
entity: String

src/scala/iface/base/ly/stealth/mesos/kafka/interface/AdminUtilsProxy.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ trait AdminUtilsProxy {
3535

3636
def changeTopicConfig(topic: String, configs: Properties)
3737

38+
def deleteTopic(topic: String)
39+
3840
def fetchEntityConfig(entityType: String, entity: String): Properties
3941

4042
def changeClientIdConfig(clientId: String, configs: Properties)

src/scala/main/ly/stealth/mesos/kafka/cli/TopicCli.scala

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ trait TopicCli {
5353
cmd match {
5454
case "list" => handleList(arg, args)
5555
case "add" | "update" => handleAddUpdate(arg, args, cmd == "add")
56+
case "delete" => handleDelete(arg, args)
5657
case "rebalance" => handleRebalance(arg, args)
5758
case "realign" => handleRealign(arg, args)
5859
case "partitions" => handlePartitions(arg)
@@ -72,6 +73,8 @@ trait TopicCli {
7273
handleList(null, null, help = true)
7374
case "add" | "update" =>
7475
handleAddUpdate(null, null, cmd == "add", help = true)
76+
case "delete" =>
77+
handleDelete(null, null, help = true)
7578
case "rebalance" =>
7679
handleRebalance(null, null, help = true)
7780
case "realign" =>
@@ -136,6 +139,40 @@ trait TopicCli {
136139
}
137140
}
138141

142+
def handleDelete(expr: String, args: Array[String], help: Boolean = false): Unit = {
143+
val parser = newParser()
144+
145+
if (help) {
146+
printLine("Delete topics\nUsage: topic delete [<topic-expr>]\n")
147+
handleGenericOptions(null, help = true)
148+
149+
printLine()
150+
printTopicExprExamples()
151+
152+
return
153+
}
154+
155+
val params = new util.LinkedHashMap[String, String]
156+
if (expr != null) params.put("topic", expr)
157+
158+
val json =
159+
try {
160+
sendRequestObj[ListTopicsResponse]("/topic/delete", params)
161+
}
162+
catch {
163+
case e: IOException => throw new Error("" + e)
164+
}
165+
166+
val topics = json.topics
167+
val title = s"topic${if (topics.size > 1) "s" else ""} deleted:"
168+
printLine(title)
169+
170+
topics.foreach { topic =>
171+
printTopic(topic, 1)
172+
printLine()
173+
}
174+
}
175+
139176
def handleAddUpdate(
140177
name: String,
141178
args: Array[String],
@@ -362,6 +399,7 @@ trait TopicCli {
362399
printLine("Commands:")
363400
printLine("list - list topics", 1)
364401
printLine("add - add topic", 1)
402+
printLine("delete - delete topic", 1)
365403
printLine("update - update topic", 1)
366404
printLine("rebalance - rebalance topics", 1)
367405
printLine("realign - realign topics, keeping existing broker assignments where possible", 1)

src/scala/main/ly/stealth/mesos/kafka/scheduler/Topics.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,12 @@ class Topics {
8383
AdminUtilsWrapper().assignReplicasToBrokers(brokers_, partitions, replicas, fixedStartIndex, startPartitionId)
8484
}
8585

86+
def deleteTopic(name: String): Topic = {
87+
val topicToDelete = getTopic(name)
88+
if(topicToDelete != null) AdminUtilsWrapper().deleteTopic(topicToDelete.name)
89+
topicToDelete
90+
}
91+
8692
def addTopic(name: String, assignment: Map[Int, Seq[Int]] = null, options: Map[String, String] = null): Topic = {
8793
val config = new Properties()
8894
if (options != null)

src/scala/main/ly/stealth/mesos/kafka/scheduler/http/api/TopicApi.scala

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,27 @@ trait TopicApiComponentImpl extends TopicApiComponent {
5959
@DefaultValue("*") @BothParam("topic") expr: String
6060
) = list(expr)
6161

62+
@Path("delete")
63+
@POST
64+
@Produces(Array(MediaType.APPLICATION_JSON))
65+
def deletePost(
66+
@BothParam("topic") topicExpr: String
67+
): Response = {
68+
if (topicExpr == null)
69+
return Status.BadRequest("topic required")
70+
71+
val names = Expr.expandTopics(topicExpr).toSet
72+
73+
val missing = names.filter(cluster.topics.getTopic(_) == null)
74+
if(missing.nonEmpty) {
75+
Status.BadRequest(s"topic${if (missing.size > 1) "s" else ""} not found ${missing.mkString(",")}")
76+
} else {
77+
Response.ok(ListTopicsResponse(
78+
names.toSeq.map(cluster.topics.deleteTopic(_))
79+
)).build()
80+
}
81+
}
82+
6283
@Path("{op: (add|update)}")
6384
@POST
6485
@Produces(Array(MediaType.APPLICATION_JSON))

0 commit comments

Comments
 (0)