Skip to content
Merged
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ Currently, the following operations are supported:
| Subcommand | Description |
| --------- | ----------- |
| `delete acls [flags]` | Deletes ACL(s) in the cluster matching the provided flags |
| `delete topic [topic]` | Deletes a single topic in the cluster |

#### get

Expand Down
22 changes: 22 additions & 0 deletions cmd/topicctl/subcmd/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func init() {
addSharedFlags(deleteCmd, &deleteConfig.shared)
deleteCmd.AddCommand(
deleteACLCmd(),
deleteTopicCmd(),
)
RootCmd.AddCommand(deleteCmd)
}
Expand Down Expand Up @@ -150,3 +151,24 @@ $ topicctl delete acls --resource-type topic --resource-pattern-type literal --r
cmd.MarkFlagRequired("resource-type")
return cmd
}

func deleteTopicCmd() *cobra.Command {
return &cobra.Command{
Use: "topic [topic name]",
Short: "Delete a topic",
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
ctx := context.Background()
sess := session.Must(session.NewSession())

adminClient, err := deleteConfig.shared.getAdminClient(ctx, sess, false)
if err != nil {
return err
}
defer adminClient.Close()

cliRunner := cli.NewCLIRunner(adminClient, log.Infof, !noSpinner)
return cliRunner.DeleteTopic(ctx, args[0])
},
}
}
25 changes: 25 additions & 0 deletions pkg/admin/brokerclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,31 @@ func (c *BrokerAdminClient) AssignPartitions(
return err
}

// DeleteTopic deletes a topic in the cluster.
func (c *BrokerAdminClient) DeleteTopic(ctx context.Context, topic string) error {
if c.config.ReadOnly {
return errors.New("Cannot delete topics in read-only mode")
}

req := &kafka.DeleteTopicsRequest{
Topics: []string{topic},
}
log.Debugf("DeleteTopics request: %+v", req)

resp, err := c.client.DeleteTopics(ctx, req)
log.Debugf("DeleteTopics response: %+v (%+v)", resp, err)

if err != nil {
return err
}

if err, ok := resp.Errors[topic]; ok {
return err
}

return nil
}

// AddPartitions extends a topic by adding one or more new partitions to it.
func (c *BrokerAdminClient) AddPartitions(
ctx context.Context,
Expand Down
64 changes: 64 additions & 0 deletions pkg/admin/brokerclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,70 @@ func TestBrokerClientAddPartitions(t *testing.T) {
assert.Equal(t, []int{6, 1}, topicInfo.Partitions[4].Replicas)
}

func TestBrokerDeleteTopic(t *testing.T) {
if !util.CanTestBrokerAdmin() {
t.Skip("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN is not set")
}

ctx := context.Background()
client, err := NewBrokerAdminClient(
ctx,
BrokerAdminClientConfig{
ConnectorConfig: ConnectorConfig{
BrokerAddr: util.TestKafkaAddr(),
},
},
)
require.NoError(t, err)

topicName := util.RandomString("topic-delete-", 6)
err = client.CreateTopic(
ctx,
kafka.TopicConfig{
Topic: topicName,
NumPartitions: -1,
ReplicationFactor: -1,
ReplicaAssignments: []kafka.ReplicaAssignment{
{
Partition: 0,
Replicas: []int{1, 2},
},
{
Partition: 1,
Replicas: []int{2, 3},
},
{
Partition: 2,
Replicas: []int{3, 4},
},
},
ConfigEntries: []kafka.ConfigEntry{
{
ConfigName: "flush.ms",
ConfigValue: "2000",
},
{
ConfigName: "retention.ms",
ConfigValue: "10000000",
},
},
},
)
require.NoError(t, err)
util.RetryUntil(t, 5*time.Second, func() error {
_, err := client.GetTopic(ctx, topicName, true)
return err
})

err = client.DeleteTopic(ctx, topicName)
require.NoError(t, err)

time.Sleep(time.Second * 10)

_, err = client.GetTopic(ctx, topicName, false)
require.Error(t, err)
}

func TestBrokerClientAlterAssignments(t *testing.T) {
if !util.CanTestBrokerAdmin() {
t.Skip("Skipping because KAFKA_TOPICS_TEST_BROKER_ADMIN is not set")
Expand Down
3 changes: 3 additions & 0 deletions pkg/admin/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ type Client interface {
detailed bool,
) (TopicInfo, error)

// DeleteTopic deletes a single topic in the cluster.
DeleteTopic(ctx context.Context, topic string) error

// GetACLs gets full information about each ACL in the cluster.
GetACLs(
ctx context.Context,
Expand Down
23 changes: 23 additions & 0 deletions pkg/admin/zkclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,29 @@ func (c *ZKAdminClient) CreateTopic(
return err
}

func (c *ZKAdminClient) DeleteTopic(ctx context.Context, topic string) error {
if c.readOnly {
return errors.New("Cannot delete topics in read-only mode")
}

req := kafka.DeleteTopicsRequest{
Topics: []string{topic},
}
log.Debugf("DeleteTopics request: %+v", req)

resp, err := c.Connector.KafkaClient.DeleteTopics(ctx, &req)
log.Debugf("DeleteTopics response: %+v (%+v)", resp, err)
if err != nil {
return err
}

if err, ok := resp.Errors[topic]; ok {
return err
}

return nil
}

// AssignPartitions notifies the cluster to begin a partition reassignment.
// This should only be used for existing partitions; to create new partitions,
// use the AddPartitions method.
Expand Down
35 changes: 35 additions & 0 deletions pkg/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/segmentio/topicctl/pkg/config"
"github.com/segmentio/topicctl/pkg/groups"
"github.com/segmentio/topicctl/pkg/messages"
"github.com/segmentio/topicctl/pkg/util"
log "github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -608,6 +609,40 @@ func (c *CLIRunner) GetTopics(ctx context.Context, full bool) error {
return nil
}

// DeleteTopic deletes a single topic.
func (c *CLIRunner) DeleteTopic(ctx context.Context, topic string) error {
c.printer("Checking if topic %s exists...", topic)
c.startSpinner()
// First check that topic exists
_, err := c.adminClient.GetTopic(ctx, topic, false)
if err != nil {
c.stopSpinner()
return fmt.Errorf("error fetching topic info: %+v", err)
}
c.stopSpinner()
c.printer("Topic %s exists in the cluster!", topic)

confirm, err := util.Confirm(fmt.Sprintf("Delete topic \"%s\"", topic), false)
if err != nil {
return err
}

if !confirm {
return nil
}

c.startSpinner()
err = c.adminClient.DeleteTopic(ctx, topic)
c.stopSpinner()
if err != nil {
return err
}

c.printer("Topic %s successfully deleted", topic)

return nil
}

// GerUsers fetches the details of each user in the cluster and prints out a table of them.
func (c *CLIRunner) GetUsers(ctx context.Context, names []string) error {
c.startSpinner()
Expand Down
2 changes: 1 addition & 1 deletion pkg/version/version.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package version

// Version is the current topicctl version.
const Version = "1.20.2"
const Version = "1.21.0"