From 8f92b8051e60a23fe977ce5f2cfba22258372a9b Mon Sep 17 00:00:00 2001 From: Tushar Malik Date: Fri, 13 Feb 2026 15:58:39 -0500 Subject: [PATCH 1/4] APIE-844 - Confluent CLI support for kstreams --- go.mod | 1 + go.sum | 2 + internal/kafka/command_consumer_group.go | 1 + .../kafka/command_consumer_group_describe.go | 1 + internal/kafka/command_consumer_group_list.go | 1 + internal/kafka/command_kafka_stream_group.go | 52 +++++++++++ .../command_kafka_stream_group_describe.go | 62 +++++++++++++ .../kafka/command_kafka_stream_group_list.go | 65 ++++++++++++++ .../command_kafka_stream_group_member.go | 32 +++++++ ...nd_kafka_stream_group_member_assignment.go | 27 ++++++ ...stream_group_member_assignment_describe.go | 66 ++++++++++++++ ...tream_group_member_assignment_task_list.go | 81 +++++++++++++++++ ...mand_kafka_stream_group_member_describe.go | 71 +++++++++++++++ .../command_kafka_stream_group_member_list.go | 75 ++++++++++++++++ ...a_stream_group_member_target_assignment.go | 17 ++++ ...group_member_target_assignment_describe.go | 66 ++++++++++++++ ...eam_group_member_target_assignment_list.go | 81 +++++++++++++++++ ...am_group_member_target_task_partitions..go | 16 ++++ ..._member_target_task_partitions_describe.go | 77 ++++++++++++++++ ...fka_stream_group_member_task_partitions.go | 22 +++++ ...m_group_member_task_partitions_describe.go | 76 ++++++++++++++++ .../command_kafka_stream_group_subtopology.go | 25 ++++++ ...kafka_stream_group_subtopology_describe.go | 64 +++++++++++++ ...and_kafka_stream_group_subtopology_list.go | 69 ++++++++++++++ pkg/ccloudv2/kafkarest.go | 89 +++++++++++++++++++ pkg/cmd/flags.go | 18 ++++ pkg/cmd/kafka_rest.go | 7 +- 27 files changed, 1161 insertions(+), 3 deletions(-) create mode 100644 internal/kafka/command_kafka_stream_group.go create mode 100644 internal/kafka/command_kafka_stream_group_describe.go create mode 100644 internal/kafka/command_kafka_stream_group_list.go create mode 100644 
internal/kafka/command_kafka_stream_group_member.go create mode 100644 internal/kafka/command_kafka_stream_group_member_assignment.go create mode 100644 internal/kafka/command_kafka_stream_group_member_assignment_describe.go create mode 100644 internal/kafka/command_kafka_stream_group_member_assignment_task_list.go create mode 100644 internal/kafka/command_kafka_stream_group_member_describe.go create mode 100644 internal/kafka/command_kafka_stream_group_member_list.go create mode 100644 internal/kafka/command_kafka_stream_group_member_target_assignment.go create mode 100644 internal/kafka/command_kafka_stream_group_member_target_assignment_describe.go create mode 100644 internal/kafka/command_kafka_stream_group_member_target_assignment_list.go create mode 100644 internal/kafka/command_kafka_stream_group_member_target_task_partitions..go create mode 100644 internal/kafka/command_kafka_stream_group_member_target_task_partitions_describe.go create mode 100644 internal/kafka/command_kafka_stream_group_member_task_partitions.go create mode 100644 internal/kafka/command_kafka_stream_group_member_task_partitions_describe.go create mode 100644 internal/kafka/command_kafka_stream_group_subtopology.go create mode 100644 internal/kafka/command_kafka_stream_group_subtopology_describe.go create mode 100644 internal/kafka/command_kafka_stream_group_subtopology_list.go diff --git a/go.mod b/go.mod index 2f08ee458f..29bd509e65 100644 --- a/go.mod +++ b/go.mod @@ -164,6 +164,7 @@ require ( github.com/charmbracelet/x/term v0.1.1 // indirect github.com/charmbracelet/x/windows v0.1.0 // indirect github.com/cloudflare/circl v1.6.1 // indirect + github.com/confluentinc/ccloud-sdk-go-v2-internal/kafkarest v0.23.1-0.20260206203324-971ae1cf8ca8 // indirect github.com/confluentinc/proto-go-setter v0.3.0 // indirect github.com/cyphar/filepath-securejoin v0.2.5 // indirect github.com/distribution/reference v0.6.0 // indirect diff --git a/go.sum b/go.sum index 5e531fa2d2..9fe9a6e629 100644 --- 
a/go.sum +++ b/go.sum @@ -188,6 +188,8 @@ github.com/compose-spec/compose-go/v2 v2.1.3 h1:bD67uqLuL/XgkAK6ir3xZvNLFPxPScEi github.com/compose-spec/compose-go/v2 v2.1.3/go.mod h1:lFN0DrMxIncJGYAXTfWuajfwj5haBJqrBkarHcnjJKc= github.com/confluentinc/ccloud-sdk-go-v1-public v0.0.0-20250521223017-0e8f6f971b52 h1:19qEGhkbZa5fopKCe0VPIV+Sasby4Pv10z9ZaktwWso= github.com/confluentinc/ccloud-sdk-go-v1-public v0.0.0-20250521223017-0e8f6f971b52/go.mod h1:62EMf+5uFEt1BJ2q8WMrUoI9VUSxAbDnmZCGRt/MbA0= +github.com/confluentinc/ccloud-sdk-go-v2-internal/kafkarest v0.23.1-0.20260206203324-971ae1cf8ca8 h1:0t56uO8mzCIT7PLg/G7yKZc6U0ToR3Pqmi3aN1uPpiM= +github.com/confluentinc/ccloud-sdk-go-v2-internal/kafkarest v0.23.1-0.20260206203324-971ae1cf8ca8/go.mod h1:R2nAnRzw0ug4oUywRLO7AiyciE1dFS1Rf3TIfxj9Znk= github.com/confluentinc/ccloud-sdk-go-v2/ai v0.1.0 h1:zSF4OQUJXWH2JeAo9rsq13ibk+JFdzITGR8S7cFMpzw= github.com/confluentinc/ccloud-sdk-go-v2/ai v0.1.0/go.mod h1:DoxqzzF3JzvJr3fWkvCiOHFlE0GoYpozWxFZ1Ud9ntA= github.com/confluentinc/ccloud-sdk-go-v2/apikeys v0.4.0 h1:8fWyLwMuy8ec0MVF5Avd54UvbIxhDFhZzanHBVwgxdw= diff --git a/internal/kafka/command_consumer_group.go b/internal/kafka/command_consumer_group.go index 0c9d1831c5..5cf19a0a4d 100644 --- a/internal/kafka/command_consumer_group.go +++ b/internal/kafka/command_consumer_group.go @@ -14,6 +14,7 @@ type consumerGroupOut struct { IsSimple bool `human:"Simple" serialized:"is_simple"` PartitionAssignor string `human:"Partition Assignor" serialized:"partition_assignor"` State string `human:"State" serialized:"state"` + ProtocolType string `human:"Type,omitempty" serialized:"type,omitempty"` } func (c *consumerCommand) newGroupCommand(cfg *config.Config) *cobra.Command { diff --git a/internal/kafka/command_consumer_group_describe.go b/internal/kafka/command_consumer_group_describe.go index e30b477362..1c52cc9be3 100644 --- a/internal/kafka/command_consumer_group_describe.go +++ b/internal/kafka/command_consumer_group_describe.go @@ -46,6 +46,7 
@@ func (c *consumerCommand) groupDescribe(cmd *cobra.Command, args []string) error IsSimple: group.GetIsSimple(), PartitionAssignor: group.GetPartitionAssignor(), State: group.GetState(), + ProtocolType: group.GetType(), }) return table.Print() } diff --git a/internal/kafka/command_consumer_group_list.go b/internal/kafka/command_consumer_group_list.go index 020b4d23e6..ac1564de13 100644 --- a/internal/kafka/command_consumer_group_list.go +++ b/internal/kafka/command_consumer_group_list.go @@ -44,6 +44,7 @@ func (c *consumerCommand) groupList(cmd *cobra.Command, _ []string) error { IsSimple: group.GetIsSimple(), PartitionAssignor: group.GetPartitionAssignor(), State: group.GetState(), + ProtocolType: group.GetType(), }) } return list.Print() diff --git a/internal/kafka/command_kafka_stream_group.go b/internal/kafka/command_kafka_stream_group.go new file mode 100644 index 0000000000..25a0554c71 --- /dev/null +++ b/internal/kafka/command_kafka_stream_group.go @@ -0,0 +1,52 @@ +package kafka + +import ( + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" + "github.com/confluentinc/cli/v4/pkg/config" + "github.com/spf13/cobra" +) + +type streamGroupOut struct { + Kind string `human:"Kind" serialized:"kind"` + ClusterId string `human:"Cluster Id" serialized:"cluster_id"` + GroupId string `human:"Group Id" serialized:"group_id"` + State string `human:"State" serialized:"state"` + MemberCount int32 `human:"Member Count" serialized:"member_count"` + SubtopologyCount int32 `human:"Subtopology Count" serialized:"subtopology_count"` + GroupEpoch int32 `human:"Group Epoch" serialized:"group_epoch"` + TopologyEpoch int32 `human:"Topology Epoch" serialized:"topology_epoch"` + TargetAssignmentEpoch int32 `human:"Target Assignment Epoch" serialized:"target_assignment_epoch"` + Members string `human:"Members" serialized:"members"` + Subtopologies string `human:"Subtopologies" serialized:"subtopologies"` +} + +func (c *consumerCommand) newStreamGroupCommand(cfg *config.Config) 
*cobra.Command { + cmd := &cobra.Command{ + Use: "stream-group", + Short: "Manage Kafka stream groups.", + Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireCloudLogin}, + } + + cmd.AddCommand(c.newStreamGroupDescribeCommand()) + cmd.AddCommand(c.newStreamGroupListCommand()) + cmd.AddCommand(c.newStreamGroupMemberCommand()) + cmd.AddCommand(c.newStreamGroupMemberAssignmentCommand()) + cmd.AddCommand(c.newStreamGroupMemberTaskPartitionsCommand()) + cmd.AddCommand(c.newStreamGroupMemberTargetAssignmentCommand()) + cmd.AddCommand(c.newStreamGroupMemberTargetAssignmentTaskPartitionsCommand()) + cmd.AddCommand(c.newStreamGroupSubtopologyCommand()) + + return cmd +} + +func (c *consumerCommand) validStreamGroupArgs(cmd *cobra.Command, args []string) []string { + if len(args) > 0 { + return nil + } + + if err := c.PersistentPreRunE(cmd, args); err != nil { + return nil + } + + return pcmd.AutocompleteStreamGroups(cmd, c.AuthenticatedCLICommand) +} diff --git a/internal/kafka/command_kafka_stream_group_describe.go b/internal/kafka/command_kafka_stream_group_describe.go new file mode 100644 index 0000000000..5be3d585c3 --- /dev/null +++ b/internal/kafka/command_kafka_stream_group_describe.go @@ -0,0 +1,62 @@ +package kafka + +import ( + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" + "github.com/confluentinc/cli/v4/pkg/output" + "github.com/spf13/cobra" +) + +func (c *consumerCommand) newStreamGroupDescribeCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "describe ", + Short: "Describe stream group", + Args: cobra.ExactArgs(1), + ValidArgsFunction: pcmd.NewValidArgsFunction(c.validStreamGroupArgs), + RunE: c.streamGroupDescribe, + } + + cmd.Flags().String("group", "", "Group Id.") + pcmd.AddEndpointFlag(cmd, c.AuthenticatedCLICommand) + pcmd.AddClusterFlag(cmd, c.AuthenticatedCLICommand) + pcmd.AddContextFlag(cmd, c.CLICommand) + pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand) + pcmd.AddOutputFlag(cmd) + + 
cobra.CheckErr(cmd.MarkFlagRequired("group")) + + return cmd +} + +func (c *consumerCommand) streamGroupDescribe(cmd *cobra.Command, args []string) error { + groupId, err := cmd.Flags().GetString("group") + if err != nil { + return err + } + + kafkaREST, err := c.GetKafkaREST(cmd) + if err != nil { + return err + } + + streamGroup, err := kafkaREST.CloudClientInternal.GetKafkaStreamGroup(groupId) + if err != nil { + return err + } + + table := output.NewTable(cmd) + table.Add(&streamGroupOut{ + Kind: streamGroup.GetKind(), + ClusterId: streamGroup.GetClusterId(), + GroupId: streamGroup.GetGroupId(), + State: streamGroup.GetState(), + MemberCount: streamGroup.GetMemberCount(), + SubtopologyCount: streamGroup.GetSubtopologyCount(), + GroupEpoch: streamGroup.GetGroupEpoch(), + TopologyEpoch: streamGroup.GetTopologyEpoch(), + TargetAssignmentEpoch: streamGroup.GetTargetAssignmentEpoch(), + Members: streamGroup.Members.GetRelated(), + Subtopologies: streamGroup.Subtopologies.GetRelated(), + }) + + return table.Print() +} diff --git a/internal/kafka/command_kafka_stream_group_list.go b/internal/kafka/command_kafka_stream_group_list.go new file mode 100644 index 0000000000..98e6682c35 --- /dev/null +++ b/internal/kafka/command_kafka_stream_group_list.go @@ -0,0 +1,65 @@ +package kafka + +import ( + kafkarestv3Internal "github.com/confluentinc/ccloud-sdk-go-v2-internal/kafkarest/v3" + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" + "github.com/confluentinc/cli/v4/pkg/output" + "github.com/spf13/cobra" +) + +func (c *consumerCommand) newStreamGroupListCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "list", + Short: "List kafka stream groups.", + Args: cobra.NoArgs, + RunE: c.listStreamGroup, + Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireNonAPIKeyCloudLogin}, + } + + pcmd.AddEndpointFlag(cmd, c.AuthenticatedCLICommand) + pcmd.AddClusterFlag(cmd, c.AuthenticatedCLICommand) + pcmd.AddContextFlag(cmd, c.CLICommand) + pcmd.AddEnvironmentFlag(cmd, 
c.AuthenticatedCLICommand) + pcmd.AddOutputFlag(cmd) + + return cmd +} + +func (c *consumerCommand) listStreamGroup(cmd *cobra.Command, _ []string) error { + groups, err := c.getStreamGroups(cmd) + if err != nil { + return err + } + + list := output.NewList(cmd) + for _, streamGroup := range groups { + list.Add(&streamGroupOut{ + Kind: streamGroup.GetKind(), + ClusterId: streamGroup.GetClusterId(), + GroupId: streamGroup.GetGroupId(), + State: streamGroup.GetState(), + MemberCount: streamGroup.GetMemberCount(), + SubtopologyCount: streamGroup.GetSubtopologyCount(), + GroupEpoch: streamGroup.GetGroupEpoch(), + TopologyEpoch: streamGroup.GetTopologyEpoch(), + TargetAssignmentEpoch: streamGroup.GetTargetAssignmentEpoch(), + Members: streamGroup.Members.GetRelated(), + Subtopologies: streamGroup.Subtopologies.GetRelated(), + }) + } + return list.Print() +} + +func (c *consumerCommand) getStreamGroups(cmd *cobra.Command) ([]kafkarestv3Internal.StreamsGroupData, error) { + kafkaREST, err := c.GetKafkaREST(cmd) + if err != nil { + return nil, err + } + + topics, err := kafkaREST.CloudClientInternal.ListKafkaStreamsGroup() + if err != nil { + return nil, err + } + + return topics.Data, nil +} diff --git a/internal/kafka/command_kafka_stream_group_member.go b/internal/kafka/command_kafka_stream_group_member.go new file mode 100644 index 0000000000..bc3c59541f --- /dev/null +++ b/internal/kafka/command_kafka_stream_group_member.go @@ -0,0 +1,32 @@ +package kafka + +import ( + "github.com/spf13/cobra" +) + +type streamGroupMemberOut struct { + Kind string `human:"Kind" serialized:"kind"` + ClusterId string `human:"Cluster Id" serialized:"cluster_id"` + GroupId string `human:"Group Id" serialized:"group_id"` + MemberId string `human:"Member Id" serialized:"member_id"` + ProcessId string `human:"Process Id" serialized:"process_id"` + ClientId string `human:"Client Id" serialized:"client_id"` + InstanceId string `human:"Instance Id" serialized:"instance_id"` + MemberEpoch int32 
`human:"Member Epoch" serialized:"member_epoch"` + TopologyEpoch int32 `human:"Topology Epoch" serialized:"topology_epoch"` + IsClassic bool `human:"Is Classic" serialized:"is_classic"` + Assignments string `human:"Assignments" serialized:"assignments"` + TargetAssign string `human:"Target Assignment" serialized:"target_assignment"` +} + +func (c *consumerCommand) newStreamGroupMemberCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "stream-group-member", + Short: "Manage Kafka stream group members.", + } + + cmd.AddCommand(c.newStreamGroupMemberDescribeCommand()) + cmd.AddCommand(c.newStreamGroupMemberListCommand()) + + return cmd +} diff --git a/internal/kafka/command_kafka_stream_group_member_assignment.go b/internal/kafka/command_kafka_stream_group_member_assignment.go new file mode 100644 index 0000000000..ec1a5da90d --- /dev/null +++ b/internal/kafka/command_kafka_stream_group_member_assignment.go @@ -0,0 +1,27 @@ +package kafka + +import ( + "github.com/spf13/cobra" +) + +type streamGroupMemberAssignmentOut struct { + Kind string `human:"Kind" serialized:"kind"` + ClusterId string `human:"Cluster Id" serialized:"cluster_id"` + GroupId string `human:"Group Id" serialized:"group_id"` + MemberId string `human:"Member Id" serialized:"member_id"` + ActiveTasks string `human:"Active Tasks" serialized:"active_tasks"` + StandbyTasks string `human:"Standby Tasks" serialized:"standby_tasks"` + WarmupTasks string `human:"Warmup Tasks" serialized:"warmup_tasks"` +} + +func (c *consumerCommand) newStreamGroupMemberAssignmentCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "stream-group-member-assignment", + Short: "Manage Kafka stream group member assignments.", + } + + cmd.AddCommand(c.newStreamGroupMemberAssignmentDescribeCommand()) + cmd.AddCommand(c.newStreamGroupMemberAssignmentListCommand()) + + return cmd +} diff --git a/internal/kafka/command_kafka_stream_group_member_assignment_describe.go 
b/internal/kafka/command_kafka_stream_group_member_assignment_describe.go new file mode 100644 index 0000000000..f91177f3d9 --- /dev/null +++ b/internal/kafka/command_kafka_stream_group_member_assignment_describe.go @@ -0,0 +1,66 @@ +package kafka + +import ( + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" + "github.com/confluentinc/cli/v4/pkg/output" + "github.com/spf13/cobra" +) + +func (c *consumerCommand) newStreamGroupMemberAssignmentDescribeCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "describe ", + Short: "Describe stream group member assignment", + Args: cobra.ExactArgs(1), + ValidArgsFunction: pcmd.NewValidArgsFunction(c.validStreamGroupArgs), + RunE: c.streamGroupMemberAssignmentDescribe, + } + + cmd.Flags().String("group", "", "Group Id.") + cmd.Flags().String("member", "", "Member Id.") + + pcmd.AddEndpointFlag(cmd, c.AuthenticatedCLICommand) + pcmd.AddClusterFlag(cmd, c.AuthenticatedCLICommand) + pcmd.AddContextFlag(cmd, c.CLICommand) + pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand) + pcmd.AddOutputFlag(cmd) + + cobra.CheckErr(cmd.MarkFlagRequired("group")) + cobra.CheckErr(cmd.MarkFlagRequired("member")) + + return cmd +} + +func (c *consumerCommand) streamGroupMemberAssignmentDescribe(cmd *cobra.Command, args []string) error { + groupId, err := cmd.Flags().GetString("group") + if err != nil { + return err + } + + memberId, err := cmd.Flags().GetString("member") + if err != nil { + return err + } + + kafkaREST, err := c.GetKafkaREST(cmd) + if err != nil { + return err + } + + assignment, err := kafkaREST.CloudClientInternal.GetKafkaStreamGroupMemberAssignment(groupId, memberId) + if err != nil { + return err + } + + table := output.NewTable(cmd) + table.Add(&streamGroupMemberAssignmentOut{ + Kind: assignment.GetKind(), + ClusterId: assignment.GetClusterId(), + GroupId: assignment.GetGroupId(), + MemberId: assignment.GetMemberId(), + ActiveTasks: assignment.ActiveTasks.GetRelated(), + StandbyTasks: 
assignment.StandbyTasks.GetRelated(), + WarmupTasks: assignment.WarmupTasks.GetRelated(), + }) + + return table.Print() +} diff --git a/internal/kafka/command_kafka_stream_group_member_assignment_task_list.go b/internal/kafka/command_kafka_stream_group_member_assignment_task_list.go new file mode 100644 index 0000000000..83f03f6140 --- /dev/null +++ b/internal/kafka/command_kafka_stream_group_member_assignment_task_list.go @@ -0,0 +1,81 @@ +package kafka + +import ( + kafkarestv3Internal "github.com/confluentinc/ccloud-sdk-go-v2-internal/kafkarest/v3" + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" + "github.com/confluentinc/cli/v4/pkg/output" + "github.com/spf13/cobra" +) + +func (c *consumerCommand) newStreamGroupMemberAssignmentListCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "list", + Short: "List kafka stream group member assignment tasks.", + Args: cobra.NoArgs, + RunE: c.listStreamGroupMemberAssignmentTasks, + Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireNonAPIKeyCloudLogin}, + } + + cmd.Flags().String("group", "", "Group Id.") + cmd.Flags().String("member", "", "Member Id.") + cmd.Flags().String("assignment", "", "Assignment type (active, standby, warmup).") + + pcmd.AddEndpointFlag(cmd, c.AuthenticatedCLICommand) + pcmd.AddClusterFlag(cmd, c.AuthenticatedCLICommand) + pcmd.AddContextFlag(cmd, c.CLICommand) + pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand) + pcmd.AddOutputFlag(cmd) + + cobra.CheckErr(cmd.MarkFlagRequired("group")) + cobra.CheckErr(cmd.MarkFlagRequired("member")) + cobra.CheckErr(cmd.MarkFlagRequired("assignment")) + + return cmd +} + +func (c *consumerCommand) listStreamGroupMemberAssignmentTasks(cmd *cobra.Command, _ []string) error { + tasks, err := c.getStreamGroupMemberAssignmentTasks(cmd) + if err != nil { + return err + } + + list := output.NewList(cmd) + for _, task := range tasks { + list.Add(&streamTaskOut{ + Kind: task.GetKind(), + SubtopologyId: task.GetSubtopologyId(), + PartitionIds: 
task.GetPartitionIds(), + }) + } + + return list.Print() +} + +func (c *consumerCommand) getStreamGroupMemberAssignmentTasks(cmd *cobra.Command) ([]kafkarestv3Internal.StreamsTaskData, error) { + groupId, err := cmd.Flags().GetString("group") + if err != nil { + return nil, err + } + + memberId, err := cmd.Flags().GetString("member") + if err != nil { + return nil, err + } + + assignmentType, err := cmd.Flags().GetString("assignment") + if err != nil { + return nil, err + } + + kafkaREST, err := c.GetKafkaREST(cmd) + if err != nil { + return nil, err + } + + resp, err := kafkaREST.CloudClientInternal.ListKafkaStreamsGroupMemberAssignmentTasks(groupId, memberId, assignmentType) + if err != nil { + return nil, err + } + + return resp.Data, nil +} diff --git a/internal/kafka/command_kafka_stream_group_member_describe.go b/internal/kafka/command_kafka_stream_group_member_describe.go new file mode 100644 index 0000000000..676a03a5f4 --- /dev/null +++ b/internal/kafka/command_kafka_stream_group_member_describe.go @@ -0,0 +1,71 @@ +package kafka + +import ( + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" + "github.com/confluentinc/cli/v4/pkg/output" + "github.com/spf13/cobra" +) + +func (c *consumerCommand) newStreamGroupMemberDescribeCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "describe ", + Short: "Describe stream group member", + Args: cobra.ExactArgs(1), + ValidArgsFunction: pcmd.NewValidArgsFunction(c.validStreamGroupArgs), + RunE: c.streamGroupMemberDescribe, + } + + cmd.Flags().String("group", "", "Group Id.") + cmd.Flags().String("member", "", "Member Id.") + + pcmd.AddEndpointFlag(cmd, c.AuthenticatedCLICommand) + pcmd.AddClusterFlag(cmd, c.AuthenticatedCLICommand) + pcmd.AddContextFlag(cmd, c.CLICommand) + pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand) + pcmd.AddOutputFlag(cmd) + + cobra.CheckErr(cmd.MarkFlagRequired("group")) + cobra.CheckErr(cmd.MarkFlagRequired("member")) + + return cmd +} + +func (c *consumerCommand) 
streamGroupMemberDescribe(cmd *cobra.Command, args []string) error { + groupId, err := cmd.Flags().GetString("group") + if err != nil { + return err + } + + memberId, err := cmd.Flags().GetString("member") + if err != nil { + return err + } + + kafkaREST, err := c.GetKafkaREST(cmd) + if err != nil { + return err + } + + member, err := kafkaREST.CloudClientInternal.GetKafkaStreamGroupMember(groupId, memberId) + if err != nil { + return err + } + + table := output.NewTable(cmd) + table.Add(&streamGroupMemberOut{ + Kind: member.GetKind(), + ClusterId: member.GetClusterId(), + GroupId: member.GetGroupId(), + MemberId: member.GetMemberId(), + ProcessId: member.GetProcessId(), + ClientId: member.GetClientId(), + InstanceId: member.GetInstanceId(), + MemberEpoch: member.GetMemberEpoch(), + TopologyEpoch: member.GetTopologyEpoch(), + IsClassic: member.GetIsClassic(), + Assignments: member.Assignments.GetRelated(), + TargetAssign: member.TargetAssignment.GetRelated(), + }) + + return table.Print() +} diff --git a/internal/kafka/command_kafka_stream_group_member_list.go b/internal/kafka/command_kafka_stream_group_member_list.go new file mode 100644 index 0000000000..ef50f955e0 --- /dev/null +++ b/internal/kafka/command_kafka_stream_group_member_list.go @@ -0,0 +1,75 @@ +package kafka + +import ( + kafkarestv3Internal "github.com/confluentinc/ccloud-sdk-go-v2-internal/kafkarest/v3" + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" + "github.com/confluentinc/cli/v4/pkg/output" + "github.com/spf13/cobra" +) + +func (c *consumerCommand) newStreamGroupMemberListCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "list", + Short: "List kafka stream group members.", + Args: cobra.NoArgs, + RunE: c.listStreamGroupMembers, + Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireNonAPIKeyCloudLogin}, + } + + cmd.Flags().String("group", "", "Group Id.") + + pcmd.AddEndpointFlag(cmd, c.AuthenticatedCLICommand) + pcmd.AddClusterFlag(cmd, c.AuthenticatedCLICommand) + 
pcmd.AddContextFlag(cmd, c.CLICommand) + pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand) + pcmd.AddOutputFlag(cmd) + + cobra.CheckErr(cmd.MarkFlagRequired("group")) + + return cmd +} + +func (c *consumerCommand) listStreamGroupMembers(cmd *cobra.Command, _ []string) error { + members, err := c.getStreamGroupMembers(cmd) + if err != nil { + return err + } + + list := output.NewList(cmd) + for _, member := range members { + list.Add(&streamGroupMemberOut{ + Kind: member.GetKind(), + ClusterId: member.GetClusterId(), + GroupId: member.GetGroupId(), + MemberId: member.GetMemberId(), + ProcessId: member.GetProcessId(), + ClientId: member.GetClientId(), + InstanceId: member.GetInstanceId(), + MemberEpoch: member.GetMemberEpoch(), + TopologyEpoch: member.GetTopologyEpoch(), + IsClassic: member.GetIsClassic(), + Assignments: member.Assignments.GetRelated(), + TargetAssign: member.TargetAssignment.GetRelated(), + }) + } + return list.Print() +} + +func (c *consumerCommand) getStreamGroupMembers(cmd *cobra.Command) ([]kafkarestv3Internal.StreamsGroupMemberData, error) { + groupId, err := cmd.Flags().GetString("group") + if err != nil { + return nil, err + } + + kafkaREST, err := c.GetKafkaREST(cmd) + if err != nil { + return nil, err + } + + resp, err := kafkaREST.CloudClientInternal.ListKafkaStreamsGroupMembers(groupId) + if err != nil { + return nil, err + } + + return resp.Data, nil +} diff --git a/internal/kafka/command_kafka_stream_group_member_target_assignment.go b/internal/kafka/command_kafka_stream_group_member_target_assignment.go new file mode 100644 index 0000000000..b82db3b1ff --- /dev/null +++ b/internal/kafka/command_kafka_stream_group_member_target_assignment.go @@ -0,0 +1,17 @@ +package kafka + +import ( + "github.com/spf13/cobra" +) + +func (c *consumerCommand) newStreamGroupMemberTargetAssignmentCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "stream-group-member-target-assignment", + Short: "Manage Kafka stream group member target 
assignments.", + } + + cmd.AddCommand(c.newStreamGroupMemberTargetAssignmentDescribeCommand()) + cmd.AddCommand(c.newStreamGroupMemberTargetAssignmentTaskListCommand()) + + return cmd +} diff --git a/internal/kafka/command_kafka_stream_group_member_target_assignment_describe.go b/internal/kafka/command_kafka_stream_group_member_target_assignment_describe.go new file mode 100644 index 0000000000..c4dcd5cd11 --- /dev/null +++ b/internal/kafka/command_kafka_stream_group_member_target_assignment_describe.go @@ -0,0 +1,66 @@ +package kafka + +import ( + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" + "github.com/confluentinc/cli/v4/pkg/output" + "github.com/spf13/cobra" +) + +func (c *consumerCommand) newStreamGroupMemberTargetAssignmentDescribeCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "describe ", + Short: "Describe stream group member target assignment", + Args: cobra.ExactArgs(1), + ValidArgsFunction: pcmd.NewValidArgsFunction(c.validStreamGroupArgs), + RunE: c.streamGroupMemberTargetAssignmentDescribe, + } + + cmd.Flags().String("group", "", "Group Id.") + cmd.Flags().String("member", "", "Member Id.") + + pcmd.AddEndpointFlag(cmd, c.AuthenticatedCLICommand) + pcmd.AddClusterFlag(cmd, c.AuthenticatedCLICommand) + pcmd.AddContextFlag(cmd, c.CLICommand) + pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand) + pcmd.AddOutputFlag(cmd) + + cobra.CheckErr(cmd.MarkFlagRequired("group")) + cobra.CheckErr(cmd.MarkFlagRequired("member")) + + return cmd +} + +func (c *consumerCommand) streamGroupMemberTargetAssignmentDescribe(cmd *cobra.Command, args []string) error { + groupId, err := cmd.Flags().GetString("group") + if err != nil { + return err + } + + memberId, err := cmd.Flags().GetString("member") + if err != nil { + return err + } + + kafkaREST, err := c.GetKafkaREST(cmd) + if err != nil { + return err + } + + assignment, err := kafkaREST.CloudClientInternal.GetKafkaStreamGroupMemberTargetAssignment(groupId, memberId) + if err != nil { + return err + 
} + + table := output.NewTable(cmd) + table.Add(&streamGroupMemberAssignmentOut{ + Kind: assignment.GetKind(), + ClusterId: assignment.GetClusterId(), + GroupId: assignment.GetGroupId(), + MemberId: assignment.GetMemberId(), + ActiveTasks: assignment.ActiveTasks.GetRelated(), + StandbyTasks: assignment.StandbyTasks.GetRelated(), + WarmupTasks: assignment.WarmupTasks.GetRelated(), + }) + + return table.Print() +} diff --git a/internal/kafka/command_kafka_stream_group_member_target_assignment_list.go b/internal/kafka/command_kafka_stream_group_member_target_assignment_list.go new file mode 100644 index 0000000000..ed002bef4d --- /dev/null +++ b/internal/kafka/command_kafka_stream_group_member_target_assignment_list.go @@ -0,0 +1,81 @@ +package kafka + +import ( + kafkarestv3Internal "github.com/confluentinc/ccloud-sdk-go-v2-internal/kafkarest/v3" + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" + "github.com/confluentinc/cli/v4/pkg/output" + "github.com/spf13/cobra" +) + +func (c *consumerCommand) newStreamGroupMemberTargetAssignmentTaskListCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "target-task-list", + Short: "List kafka stream group member target assignment tasks.", + Args: cobra.NoArgs, + RunE: c.listStreamGroupMemberTargetAssignmentTasks, + Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireNonAPIKeyCloudLogin}, + } + + cmd.Flags().String("group", "", "Group Id.") + cmd.Flags().String("member", "", "Member Id.") + cmd.Flags().String("assignment", "", "Assignment type (active, standby, warmup).") + + pcmd.AddEndpointFlag(cmd, c.AuthenticatedCLICommand) + pcmd.AddClusterFlag(cmd, c.AuthenticatedCLICommand) + pcmd.AddContextFlag(cmd, c.CLICommand) + pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand) + pcmd.AddOutputFlag(cmd) + + cobra.CheckErr(cmd.MarkFlagRequired("group")) + cobra.CheckErr(cmd.MarkFlagRequired("member")) + cobra.CheckErr(cmd.MarkFlagRequired("assignment")) + + return cmd +} + +func (c *consumerCommand) 
listStreamGroupMemberTargetAssignmentTasks(cmd *cobra.Command, _ []string) error { + tasks, err := c.getStreamGroupMemberTargetAssignmentTasks(cmd) + if err != nil { + return err + } + + list := output.NewList(cmd) + for _, task := range tasks { + list.Add(&streamTaskOut{ + Kind: task.GetKind(), + SubtopologyId: task.GetSubtopologyId(), + PartitionIds: task.GetPartitionIds(), + }) + } + + return list.Print() +} + +func (c *consumerCommand) getStreamGroupMemberTargetAssignmentTasks(cmd *cobra.Command) ([]kafkarestv3Internal.StreamsTaskData, error) { + groupId, err := cmd.Flags().GetString("group") + if err != nil { + return nil, err + } + + memberId, err := cmd.Flags().GetString("member") + if err != nil { + return nil, err + } + + assignmentType, err := cmd.Flags().GetString("assignment") + if err != nil { + return nil, err + } + + kafkaREST, err := c.GetKafkaREST(cmd) + if err != nil { + return nil, err + } + + resp, err := kafkaREST.CloudClientInternal.ListKafkaStreamsGroupMemberTargetAssignmentTasks(groupId, memberId, assignmentType) + if err != nil { + return nil, err + } + + return resp.Data, nil +} diff --git a/internal/kafka/command_kafka_stream_group_member_target_task_partitions.go b/internal/kafka/command_kafka_stream_group_member_target_task_partitions.go new file mode 100644 index 0000000000..91c02cc483 --- /dev/null +++ b/internal/kafka/command_kafka_stream_group_member_target_task_partitions.go @@ -0,0 +1,16 @@ +package kafka + +import ( + "github.com/spf13/cobra" +) + +func (c *consumerCommand) newStreamGroupMemberTargetAssignmentTaskPartitionsCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "stream-group-member-target-assignment-task-partitions", + Short: "Manage Kafka stream group member target assignment task partitions.", + } + + cmd.AddCommand(c.newStreamGroupMemberTargetAssignmentTaskPartitionsDescribeCommand()) + + return cmd +} diff --git a/internal/kafka/command_kafka_stream_group_member_target_task_partitions_describe.go 
b/internal/kafka/command_kafka_stream_group_member_target_task_partitions_describe.go new file mode 100644 index 0000000000..c90154f16c --- /dev/null +++ b/internal/kafka/command_kafka_stream_group_member_target_task_partitions_describe.go @@ -0,0 +1,77 @@ +package kafka + +import ( + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" + "github.com/confluentinc/cli/v4/pkg/output" + "github.com/spf13/cobra" +) + +func (c *consumerCommand) newStreamGroupMemberTargetAssignmentTaskPartitionsDescribeCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "task-partitions-describe ", + Short: "Describe stream group member target assignment task partitions", + Args: cobra.ExactArgs(1), + ValidArgsFunction: pcmd.NewValidArgsFunction(c.validStreamGroupArgs), + RunE: c.streamGroupMemberTargetAssignmentTaskPartitionsDescribe, + } + + cmd.Flags().String("group", "", "Group Id.") + cmd.Flags().String("member", "", "Member Id.") + cmd.Flags().String("subtopology", "", "Subtopology Id.") + cmd.Flags().String("assignment-type", "", "Assignments type (active, standby, warmup).") + + pcmd.AddEndpointFlag(cmd, c.AuthenticatedCLICommand) + pcmd.AddClusterFlag(cmd, c.AuthenticatedCLICommand) + pcmd.AddContextFlag(cmd, c.CLICommand) + pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand) + pcmd.AddOutputFlag(cmd) + + cobra.CheckErr(cmd.MarkFlagRequired("group")) + cobra.CheckErr(cmd.MarkFlagRequired("member")) + cobra.CheckErr(cmd.MarkFlagRequired("subtopology")) + cobra.CheckErr(cmd.MarkFlagRequired("assignment-type")) + + return cmd +} + +func (c *consumerCommand) streamGroupMemberTargetAssignmentTaskPartitionsDescribe(cmd *cobra.Command, args []string) error { + groupId, err := cmd.Flags().GetString("group") + if err != nil { + return err + } + + memberId, err := cmd.Flags().GetString("member") + if err != nil { + return err + } + + subtopologyId, err := cmd.Flags().GetString("subtopology") + if err != nil { + return err + } + + assignmentsType, err := 
cmd.Flags().GetString("assignment-type") + if err != nil { + return err + } + + kafkaREST, err := c.GetKafkaREST(cmd) + if err != nil { + return err + } + + taskPartitions, err := kafkaREST.CloudClientInternal. + GetKafkaStreamsGroupMemberTargetAssignmentTaskPartitions(groupId, memberId, assignmentsType, subtopologyId) + if err != nil { + return err + } + + table := output.NewTable(cmd) + table.Add(&streamTaskOut{ + Kind: taskPartitions.GetKind(), + SubtopologyId: taskPartitions.GetSubtopologyId(), + PartitionIds: taskPartitions.GetPartitionIds(), + }) + + return table.Print() +} diff --git a/internal/kafka/command_kafka_stream_group_member_task_partitions.go b/internal/kafka/command_kafka_stream_group_member_task_partitions.go new file mode 100644 index 0000000000..f4759e77bc --- /dev/null +++ b/internal/kafka/command_kafka_stream_group_member_task_partitions.go @@ -0,0 +1,22 @@ +package kafka + +import ( + "github.com/spf13/cobra" +) + +type streamTaskOut struct { + Kind string `human:"Kind" serialized:"kind"` + SubtopologyId string `human:"Subtopology Id" serialized:"subtopology_id"` + PartitionIds []int32 `human:"Partition Ids" serialized:"partition_ids"` +} + +func (c *consumerCommand) newStreamGroupMemberTaskPartitionsCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "stream-group-member-task-partitions", + Short: "Manage Kafka stream group member task partitions.", + } + + cmd.AddCommand(c.newStreamGroupMemberTaskPartitionsDescribeCommand()) + + return cmd +} diff --git a/internal/kafka/command_kafka_stream_group_member_task_partitions_describe.go b/internal/kafka/command_kafka_stream_group_member_task_partitions_describe.go new file mode 100644 index 0000000000..3e447977d1 --- /dev/null +++ b/internal/kafka/command_kafka_stream_group_member_task_partitions_describe.go @@ -0,0 +1,76 @@ +package kafka + +import ( + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" + "github.com/confluentinc/cli/v4/pkg/output" + "github.com/spf13/cobra" +) + +func (c 
*consumerCommand) newStreamGroupMemberTaskPartitionsDescribeCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "describe ", + Short: "Describe stream group member task partitions", + Args: cobra.ExactArgs(1), + ValidArgsFunction: pcmd.NewValidArgsFunction(c.validStreamGroupArgs), + RunE: c.streamGroupMemberTaskPartitionsDescribe, + } + + cmd.Flags().String("group", "", "Group Id.") + cmd.Flags().String("member", "", "Member Id.") + cmd.Flags().String("subtopology", "", "Subtopology Id.") + cmd.Flags().String("assignment", "", "Assignments type (active, standby, warmup).") + + pcmd.AddEndpointFlag(cmd, c.AuthenticatedCLICommand) + pcmd.AddClusterFlag(cmd, c.AuthenticatedCLICommand) + pcmd.AddContextFlag(cmd, c.CLICommand) + pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand) + pcmd.AddOutputFlag(cmd) + + cobra.CheckErr(cmd.MarkFlagRequired("group")) + cobra.CheckErr(cmd.MarkFlagRequired("member")) + cobra.CheckErr(cmd.MarkFlagRequired("subtopology")) + cobra.CheckErr(cmd.MarkFlagRequired("assignment")) + + return cmd +} + +func (c *consumerCommand) streamGroupMemberTaskPartitionsDescribe(cmd *cobra.Command, args []string) error { + groupId, err := cmd.Flags().GetString("group") + if err != nil { + return err + } + + memberId, err := cmd.Flags().GetString("member") + if err != nil { + return err + } + + subtopologyId, err := cmd.Flags().GetString("subtopology") + if err != nil { + return err + } + + assignmentsType, err := cmd.Flags().GetString("assignment") + if err != nil { + return err + } + + kafkaREST, err := c.GetKafkaREST(cmd) + if err != nil { + return err + } + + taskPartitions, err := kafkaREST.CloudClientInternal.GetKafkaStreamGroupMemberAssignmentTaskPartitions(groupId, memberId, assignmentsType, subtopologyId) + if err != nil { + return err + } + + table := output.NewTable(cmd) + table.Add(&streamTaskOut{ + Kind: taskPartitions.GetKind(), + SubtopologyId: taskPartitions.GetSubtopologyId(), + PartitionIds: taskPartitions.GetPartitionIds(), + 
}) + + return table.Print() +} diff --git a/internal/kafka/command_kafka_stream_group_subtopology.go b/internal/kafka/command_kafka_stream_group_subtopology.go new file mode 100644 index 0000000000..31a970f5f6 --- /dev/null +++ b/internal/kafka/command_kafka_stream_group_subtopology.go @@ -0,0 +1,25 @@ +package kafka + +import ( + "github.com/spf13/cobra" +) + +type streamGroupSubtopologyOut struct { + Kind string `human:"Kind" serialized:"kind"` + ClusterId string `human:"Cluster Id" serialized:"cluster_id"` + GroupId string `human:"Group Id" serialized:"group_id"` + SubtopologyId string `human:"Subtopology Id" serialized:"subtopology_id"` + SourceTopics []string `human:"Source Topics" serialized:"source_topics"` +} + +func (c *consumerCommand) newStreamGroupSubtopologyCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "stream-group-subtopology", + Short: "Manage Kafka stream group subtopologies.", + } + + cmd.AddCommand(c.newStreamGroupSubtopologyDescribeCommand()) + cmd.AddCommand(c.newStreamGroupSubtopologyListCommand()) + + return cmd +} diff --git a/internal/kafka/command_kafka_stream_group_subtopology_describe.go b/internal/kafka/command_kafka_stream_group_subtopology_describe.go new file mode 100644 index 0000000000..4ddd1715f4 --- /dev/null +++ b/internal/kafka/command_kafka_stream_group_subtopology_describe.go @@ -0,0 +1,64 @@ +package kafka + +import ( + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" + "github.com/confluentinc/cli/v4/pkg/output" + "github.com/spf13/cobra" +) + +func (c *consumerCommand) newStreamGroupSubtopologyDescribeCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "describe ", + Short: "Describe stream group subtopology", + Args: cobra.ExactArgs(1), + ValidArgsFunction: pcmd.NewValidArgsFunction(c.validStreamGroupArgs), + RunE: c.streamGroupSubtopologyDescribe, + } + + cmd.Flags().String("group", "", "Group Id.") + cmd.Flags().String("subtopology", "", "Subtopology Id.") + + pcmd.AddEndpointFlag(cmd, 
c.AuthenticatedCLICommand) + pcmd.AddClusterFlag(cmd, c.AuthenticatedCLICommand) + pcmd.AddContextFlag(cmd, c.CLICommand) + pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand) + pcmd.AddOutputFlag(cmd) + + cobra.CheckErr(cmd.MarkFlagRequired("group")) + cobra.CheckErr(cmd.MarkFlagRequired("subtopology")) + + return cmd +} + +func (c *consumerCommand) streamGroupSubtopologyDescribe(cmd *cobra.Command, args []string) error { + groupId, err := cmd.Flags().GetString("group") + if err != nil { + return err + } + + subtopologyId, err := cmd.Flags().GetString("subtopology") + if err != nil { + return err + } + + kafkaREST, err := c.GetKafkaREST(cmd) + if err != nil { + return err + } + + subtopology, err := kafkaREST.CloudClientInternal.GetKafkaStreamGroupSubtopology(groupId, subtopologyId) + if err != nil { + return err + } + + table := output.NewTable(cmd) + table.Add(&streamGroupSubtopologyOut{ + Kind: subtopology.GetKind(), + ClusterId: subtopology.GetClusterId(), + GroupId: subtopology.GetGroupId(), + SubtopologyId: subtopology.GetSubtopologyId(), + SourceTopics: subtopology.GetSourceTopics(), + }) + + return table.Print() +} diff --git a/internal/kafka/command_kafka_stream_group_subtopology_list.go b/internal/kafka/command_kafka_stream_group_subtopology_list.go new file mode 100644 index 0000000000..00c90eb133 --- /dev/null +++ b/internal/kafka/command_kafka_stream_group_subtopology_list.go @@ -0,0 +1,69 @@ +package kafka + +import ( + kafkarestv3Internal "github.com/confluentinc/ccloud-sdk-go-v2-internal/kafkarest/v3" + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" + "github.com/confluentinc/cli/v4/pkg/output" + "github.com/spf13/cobra" +) + +func (c *consumerCommand) newStreamGroupSubtopologyListCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "list", + Short: "List kafka stream group subtopologies.", + Args: cobra.NoArgs, + RunE: c.listStreamGroupSubtopologies, + Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireNonAPIKeyCloudLogin}, + 
} + + cmd.Flags().String("group", "", "Group Id.") + + pcmd.AddEndpointFlag(cmd, c.AuthenticatedCLICommand) + pcmd.AddClusterFlag(cmd, c.AuthenticatedCLICommand) + pcmd.AddContextFlag(cmd, c.CLICommand) + pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand) + pcmd.AddOutputFlag(cmd) + + cobra.CheckErr(cmd.MarkFlagRequired("group")) + + return cmd +} + +func (c *consumerCommand) listStreamGroupSubtopologies(cmd *cobra.Command, _ []string) error { + subtopologies, err := c.getStreamGroupSubtopologies(cmd) + if err != nil { + return err + } + + list := output.NewList(cmd) + for _, subtopology := range subtopologies { + list.Add(&streamGroupSubtopologyOut{ + Kind: subtopology.GetKind(), + ClusterId: subtopology.GetClusterId(), + GroupId: subtopology.GetGroupId(), + SubtopologyId: subtopology.GetSubtopologyId(), + SourceTopics: subtopology.GetSourceTopics(), + }) + } + + return list.Print() +} + +func (c *consumerCommand) getStreamGroupSubtopologies(cmd *cobra.Command) ([]kafkarestv3Internal.StreamsGroupSubtopologyData, error) { + groupId, err := cmd.Flags().GetString("group") + if err != nil { + return nil, err + } + + kafkaREST, err := c.GetKafkaREST(cmd) + if err != nil { + return nil, err + } + + resp, err := kafkaREST.CloudClientInternal.ListKafkaStreamsGroupMemberSubtopologies(groupId) + if err != nil { + return nil, err + } + + return resp.Data, nil +} diff --git a/pkg/ccloudv2/kafkarest.go b/pkg/ccloudv2/kafkarest.go index c558dd74e0..7a82f2b953 100644 --- a/pkg/ccloudv2/kafkarest.go +++ b/pkg/ccloudv2/kafkarest.go @@ -5,6 +5,7 @@ import ( "fmt" "net/http" + kafkarestv3Internal "github.com/confluentinc/ccloud-sdk-go-v2-internal/kafkarest/v3" kafkarestv3 "github.com/confluentinc/ccloud-sdk-go-v2/kafkarest/v3" "github.com/confluentinc/cli/v4/pkg/ccstructs" @@ -37,6 +38,34 @@ func NewKafkaRestClient(url, clusterId, userAgent, authToken string, unsafeTrace } } +type KafkaRestClientInternal struct { + *kafkarestv3Internal.APIClient + AuthToken string + ClusterId 
string +} + +func NewKafkaRestClientInternal(url, clusterId, userAgent, authToken string, unsafeTrace bool) *KafkaRestClientInternal { + cfg := kafkarestv3Internal.NewConfiguration() + cfg.Debug = unsafeTrace + cfg.HTTPClient = NewRetryableHttpClient(nil, unsafeTrace) + cfg.Servers = kafkarestv3Internal.ServerConfigurations{{URL: url}} + cfg.UserAgent = userAgent + + return &KafkaRestClientInternal{ + APIClient: kafkarestv3Internal.NewAPIClient(cfg), + AuthToken: authToken, + ClusterId: clusterId, + } +} + +func (c *KafkaRestClientInternal) kafkaRestApiContextInternal() context.Context { + return context.WithValue(context.Background(), kafkarestv3Internal.ContextAccessToken, c.AuthToken) +} + +func (c *KafkaRestClientInternal) GetUrlInternal() string { + return c.GetConfig().Servers[0].URL +} + func (c *KafkaRestClient) GetUrl() string { return c.GetConfig().Servers[0].URL } @@ -365,3 +394,63 @@ func (c *KafkaRestClient) UpdateKafkaTopicPartitionCount(topicName string, updat func (c *KafkaRestClient) GetKafkaTopic(topicName string) (kafkarestv3.TopicData, *http.Response, error) { return c.TopicV3Api.GetKafkaTopic(c.kafkaRestApiContext(), c.ClusterId, topicName).Execute() } + +func (c *KafkaRestClientInternal) GetKafkaStreamGroup(groupId string) (kafkarestv3Internal.StreamsGroupData, error) { + res, httpResp, err := c.StreamsGroupV3Api.GetKafkaStreamsGroup(c.kafkaRestApiContextInternal(), c.ClusterId, groupId).Execute() + return res, kafkarest.NewError(c.GetUrlInternal(), err, httpResp) +} + +func (c *KafkaRestClientInternal) GetKafkaStreamGroupMember(groupId, memberId string) (kafkarestv3Internal.StreamsGroupMemberData, error) { + res, httpResp, err := c.StreamsGroupV3Api.GetKafkaStreamsGroupMember(c.kafkaRestApiContextInternal(), c.ClusterId, groupId, memberId).Execute() + return res, kafkarest.NewError(c.GetUrlInternal(), err, httpResp) +} + +func (c *KafkaRestClientInternal) GetKafkaStreamGroupMemberAssignment(groupId, memberId string) 
(kafkarestv3Internal.StreamsGroupMemberAssignmentData, error) { + res, httpResp, err := c.StreamsGroupV3Api.GetKafkaStreamsGroupMemberAssignments(c.kafkaRestApiContextInternal(), c.ClusterId, groupId, memberId).Execute() + return res, kafkarest.NewError(c.GetUrlInternal(), err, httpResp) +} + +func (c *KafkaRestClientInternal) GetKafkaStreamGroupMemberTargetAssignment(groupId, memberId string) (kafkarestv3Internal.StreamsGroupMemberAssignmentData, error) { + res, httpResp, err := c.StreamsGroupV3Api.GetKafkaStreamsGroupMemberTargetAssignments(c.kafkaRestApiContextInternal(), c.ClusterId, groupId, memberId).Execute() + return res, kafkarest.NewError(c.GetUrlInternal(), err, httpResp) +} + +func (c *KafkaRestClientInternal) GetKafkaStreamGroupMemberAssignmentTaskPartitions(groupId, memberId, assignmentsType, subtopologyId string) (kafkarestv3Internal.StreamsTaskData, error) { + res, httpResp, err := c.StreamsGroupV3Api.GetKafkaStreamsGroupMemberAssignmentTaskPartitions(c.kafkaRestApiContextInternal(), c.ClusterId, groupId, memberId, assignmentsType, subtopologyId).Execute() + return res, kafkarest.NewError(c.GetUrlInternal(), err, httpResp) +} + +func (c *KafkaRestClientInternal) GetKafkaStreamGroupSubtopology(groupId, topology string) (kafkarestv3Internal.StreamsGroupSubtopologyData, error) { + res, httpResp, err := c.StreamsGroupV3Api.GetKafkaStreamsGroupSubtopology(c.kafkaRestApiContextInternal(), c.ClusterId, groupId, topology).Execute() + return res, kafkarest.NewError(c.GetUrlInternal(), err, httpResp) +} + +func (c *KafkaRestClientInternal) GetKafkaStreamsGroupMemberTargetAssignmentTaskPartitions(groupId, memberId, assignmentsType, subtopologyId string) (kafkarestv3Internal.StreamsTaskData, error) { + res, httpResp, err := c.StreamsGroupV3Api.GetKafkaStreamsGroupMemberTargetAssignmentTaskPartitions(c.kafkaRestApiContextInternal(), c.ClusterId, groupId, memberId, assignmentsType, subtopologyId).Execute() + return res, kafkarest.NewError(c.GetUrlInternal(), err, 
httpResp) +} + +func (c *KafkaRestClientInternal) ListKafkaStreamsGroup() (kafkarestv3Internal.StreamsGroupDataList, error) { + res, httpResp, err := c.StreamsGroupV3Api.ListKafkaStreamsGroups(c.kafkaRestApiContextInternal(), c.ClusterId).Execute() + return res, kafkarest.NewError(c.GetUrlInternal(), err, httpResp) +} + +func (c *KafkaRestClientInternal) ListKafkaStreamsGroupMembers(groupId string) (kafkarestv3Internal.StreamsGroupMemberDataList, error) { + res, httpResp, err := c.StreamsGroupV3Api.ListKafkaStreamsGroupMembers(c.kafkaRestApiContextInternal(), c.ClusterId, groupId).Execute() + return res, kafkarest.NewError(c.GetUrlInternal(), err, httpResp) +} + +func (c *KafkaRestClientInternal) ListKafkaStreamsGroupMemberAssignmentTasks(groupId, memberId, assignmentType string) (kafkarestv3Internal.StreamsTaskDataList, error) { + res, httpResp, err := c.StreamsGroupV3Api.ListKafkaStreamsGroupMemberAssignmentTasks(c.kafkaRestApiContextInternal(), c.ClusterId, groupId, memberId, assignmentType).Execute() + return res, kafkarest.NewError(c.GetUrlInternal(), err, httpResp) +} + +func (c *KafkaRestClientInternal) ListKafkaStreamsGroupMemberTargetAssignmentTasks(groupId, memberId, assignmentType string) (kafkarestv3Internal.StreamsTaskDataList, error) { + res, httpResp, err := c.StreamsGroupV3Api.ListKafkaStreamsGroupMemberTargetAssignmentTasks(c.kafkaRestApiContextInternal(), c.ClusterId, groupId, memberId, assignmentType).Execute() + return res, kafkarest.NewError(c.GetUrlInternal(), err, httpResp) +} + +func (c *KafkaRestClientInternal) ListKafkaStreamsGroupMemberSubtopologies(groupId string) (kafkarestv3Internal.StreamsGroupSubtopologyDataList, error) { + res, httpResp, err := c.StreamsGroupV3Api.ListKafkaStreamsGroupSubtopologies(c.kafkaRestApiContextInternal(), c.ClusterId, groupId).Execute() + return res, kafkarest.NewError(c.GetUrlInternal(), err, httpResp) +} diff --git a/pkg/cmd/flags.go b/pkg/cmd/flags.go index 70db3c0f2d..000e38407b 100644 --- 
a/pkg/cmd/flags.go +++ b/pkg/cmd/flags.go @@ -618,6 +618,24 @@ func AutocompleteShareGroups(cmd *cobra.Command, c *AuthenticatedCLICommand) []s return suggestions } +func AutocompleteStreamGroups(cmd *cobra.Command, c *AuthenticatedCLICommand) []string { + kafkaREST, err := c.GetKafkaREST(cmd) + if err != nil { + return nil + } + + streamGroups, err := kafkaREST.CloudClientInternal.ListKafkaStreamsGroup() + if err != nil { + return nil + } + + suggestions := make([]string, len(streamGroups.Data)) + for i, streamGroup := range streamGroups.Data { + suggestions[i] = streamGroup.GetGroupId() + } + return suggestions +} + func AddNetworkFlag(cmd *cobra.Command, c *AuthenticatedCLICommand) { cmd.Flags().String("network", "", "Network ID.") RegisterFlagCompletionFunc(cmd, "network", func(cmd *cobra.Command, args []string) []string { diff --git a/pkg/cmd/kafka_rest.go b/pkg/cmd/kafka_rest.go index 948ee80e5f..6fe4bfaec8 100644 --- a/pkg/cmd/kafka_rest.go +++ b/pkg/cmd/kafka_rest.go @@ -9,9 +9,10 @@ import ( ) type KafkaREST struct { - Context context.Context - CloudClient *ccloudv2.KafkaRestClient - Client *kafkarestv3.APIClient + Context context.Context + CloudClient *ccloudv2.KafkaRestClient + CloudClientInternal *ccloudv2.KafkaRestClientInternal + Client *kafkarestv3.APIClient } func (k *KafkaREST) GetClusterId() string { From 1603eeb3c75d5fc9b6e853ef9980b0f1e06be714 Mon Sep 17 00:00:00 2001 From: Cynthia Qin Date: Wed, 11 Mar 2026 00:52:25 -0700 Subject: [PATCH 2/4] Update dependency --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 29bd509e65..363f1421a1 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/charmbracelet/lipgloss v0.11.0 github.com/client9/gospell v0.0.0-20160306015952-90dfc71015df github.com/confluentinc/ccloud-sdk-go-v1-public v0.0.0-20250521223017-0e8f6f971b52 + github.com/confluentinc/ccloud-sdk-go-v2-internal/kafkarest v0.23.1-0.20260206203324-971ae1cf8ca8 
github.com/confluentinc/ccloud-sdk-go-v2/ai v0.1.0 github.com/confluentinc/ccloud-sdk-go-v2/apikeys v0.4.0 github.com/confluentinc/ccloud-sdk-go-v2/billing v0.3.0 @@ -164,7 +165,6 @@ require ( github.com/charmbracelet/x/term v0.1.1 // indirect github.com/charmbracelet/x/windows v0.1.0 // indirect github.com/cloudflare/circl v1.6.1 // indirect - github.com/confluentinc/ccloud-sdk-go-v2-internal/kafkarest v0.23.1-0.20260206203324-971ae1cf8ca8 // indirect github.com/confluentinc/proto-go-setter v0.3.0 // indirect github.com/cyphar/filepath-securejoin v0.2.5 // indirect github.com/distribution/reference v0.6.0 // indirect From 649039a1b96230108ac2a7d961bb36a979abced6 Mon Sep 17 00:00:00 2001 From: Cynthia Qin Date: Wed, 11 Mar 2026 11:38:40 -0700 Subject: [PATCH 3/4] Update to better match API spec --- internal/kafka/command_consumer.go | 1 + ...and_kafka_stream_group_member_target_task_partitions.go} | 0 ...nd_kafka_stream_group_member_task_partitions_describe.go | 6 +++--- 3 files changed, 4 insertions(+), 3 deletions(-) rename internal/kafka/{command_kafka_stream_group_member_target_task_partitions..go => command_kafka_stream_group_member_target_task_partitions.go} (100%) diff --git a/internal/kafka/command_consumer.go b/internal/kafka/command_consumer.go index 0bdc085395..254b1cf508 100644 --- a/internal/kafka/command_consumer.go +++ b/internal/kafka/command_consumer.go @@ -37,6 +37,7 @@ func newConsumerCommand(cfg *config.Config, prerunner pcmd.PreRunner) *cobra.Com cmd.AddCommand(c.newListCommandOnPrem()) } cmd.AddCommand(c.newGroupCommand(cfg)) + cmd.AddCommand(c.newStreamGroupCommand(cfg)) return cmd } diff --git a/internal/kafka/command_kafka_stream_group_member_target_task_partitions..go b/internal/kafka/command_kafka_stream_group_member_target_task_partitions.go similarity index 100% rename from internal/kafka/command_kafka_stream_group_member_target_task_partitions..go rename to internal/kafka/command_kafka_stream_group_member_target_task_partitions.go diff 
--git a/internal/kafka/command_kafka_stream_group_member_task_partitions_describe.go b/internal/kafka/command_kafka_stream_group_member_task_partitions_describe.go index 3e447977d1..0f54f33f01 100644 --- a/internal/kafka/command_kafka_stream_group_member_task_partitions_describe.go +++ b/internal/kafka/command_kafka_stream_group_member_task_partitions_describe.go @@ -18,7 +18,7 @@ func (c *consumerCommand) newStreamGroupMemberTaskPartitionsDescribeCommand() *c cmd.Flags().String("group", "", "Group Id.") cmd.Flags().String("member", "", "Member Id.") cmd.Flags().String("subtopology", "", "Subtopology Id.") - cmd.Flags().String("assignment", "", "Assignments type (active, standby, warmup).") + cmd.Flags().String("assignment-type", "", "Assignments type (active, standby, warmup).") pcmd.AddEndpointFlag(cmd, c.AuthenticatedCLICommand) pcmd.AddClusterFlag(cmd, c.AuthenticatedCLICommand) @@ -29,7 +29,7 @@ func (c *consumerCommand) newStreamGroupMemberTaskPartitionsDescribeCommand() *c cobra.CheckErr(cmd.MarkFlagRequired("group")) cobra.CheckErr(cmd.MarkFlagRequired("member")) cobra.CheckErr(cmd.MarkFlagRequired("subtopology")) - cobra.CheckErr(cmd.MarkFlagRequired("assignment")) + cobra.CheckErr(cmd.MarkFlagRequired("assignment-type")) return cmd } @@ -50,7 +50,7 @@ func (c *consumerCommand) streamGroupMemberTaskPartitionsDescribe(cmd *cobra.Com return err } - assignmentsType, err := cmd.Flags().GetString("assignment") + assignmentsType, err := cmd.Flags().GetString("assignment-type") if err != nil { return err } From 07fca9aba7d216f0148b01d9dc87a21a43f4af4f Mon Sep 17 00:00:00 2001 From: Cynthia Qin Date: Wed, 11 Mar 2026 13:53:07 -0700 Subject: [PATCH 4/4] Add integration tests --- pkg/cmd/prerunner.go | 7 +- .../iam/certificate-authority/use.golden | 1 + .../kafka/consumer/group/list-onprem.golden | 10 +- .../output/kafka/consumer/group/list.golden | 10 +- .../stream-group/assignment-describe.golden | 9 + .../stream-group/assignment-task-list.golden | 3 + 
.../consumer/stream-group/describe-dne.golden | 1 + .../stream-group/describe-json.golden | 13 + .../consumer/stream-group/describe.golden | 13 + .../consumer/stream-group/list-json.golden | 28 ++ .../kafka/consumer/stream-group/list.golden | 4 + .../stream-group/member-describe.golden | 14 + .../stream-group/member-list-json.golden | 16 + .../consumer/stream-group/member-list.golden | 3 + .../stream-group/subtopology-describe.golden | 7 + .../stream-group/subtopology-list-json.golden | 9 + .../stream-group/subtopology-list.golden | 3 + .../target-assignment-describe.golden | 9 + .../target-assignment-task-list.golden | 3 + .../target-task-partitions-describe.golden | 5 + .../task-partitions-describe.golden | 5 + test/kafka_test.go | 33 ++ test/test-server/kafka_rest_router.go | 348 ++++++++++++++++++ 23 files changed, 541 insertions(+), 13 deletions(-) create mode 100644 test/fixtures/output/iam/certificate-authority/use.golden create mode 100644 test/fixtures/output/kafka/consumer/stream-group/assignment-describe.golden create mode 100644 test/fixtures/output/kafka/consumer/stream-group/assignment-task-list.golden create mode 100644 test/fixtures/output/kafka/consumer/stream-group/describe-dne.golden create mode 100644 test/fixtures/output/kafka/consumer/stream-group/describe-json.golden create mode 100644 test/fixtures/output/kafka/consumer/stream-group/describe.golden create mode 100644 test/fixtures/output/kafka/consumer/stream-group/list-json.golden create mode 100644 test/fixtures/output/kafka/consumer/stream-group/list.golden create mode 100644 test/fixtures/output/kafka/consumer/stream-group/member-describe.golden create mode 100644 test/fixtures/output/kafka/consumer/stream-group/member-list-json.golden create mode 100644 test/fixtures/output/kafka/consumer/stream-group/member-list.golden create mode 100644 test/fixtures/output/kafka/consumer/stream-group/subtopology-describe.golden create mode 100644 
test/fixtures/output/kafka/consumer/stream-group/subtopology-list-json.golden create mode 100644 test/fixtures/output/kafka/consumer/stream-group/subtopology-list.golden create mode 100644 test/fixtures/output/kafka/consumer/stream-group/target-assignment-describe.golden create mode 100644 test/fixtures/output/kafka/consumer/stream-group/target-assignment-task-list.golden create mode 100644 test/fixtures/output/kafka/consumer/stream-group/target-task-partitions-describe.golden create mode 100644 test/fixtures/output/kafka/consumer/stream-group/task-partitions-describe.golden diff --git a/pkg/cmd/prerunner.go b/pkg/cmd/prerunner.go index a66b9f7c41..d4b811c91d 100644 --- a/pkg/cmd/prerunner.go +++ b/pkg/cmd/prerunner.go @@ -380,9 +380,10 @@ func (r *PreRun) setCCloudClient(c *AuthenticatedCLICommand) error { } kafkaRest := &KafkaREST{ - Context: context.WithValue(context.Background(), kafkarestv3.ContextAccessToken, dataplaneToken), - CloudClient: ccloudv2.NewKafkaRestClient(restEndpoint, lkc, r.Version.UserAgent, dataplaneToken, unsafeTrace), - Client: CreateKafkaRESTClient(restEndpoint, unsafeTrace), + Context: context.WithValue(context.Background(), kafkarestv3.ContextAccessToken, dataplaneToken), + CloudClient: ccloudv2.NewKafkaRestClient(restEndpoint, lkc, r.Version.UserAgent, dataplaneToken, unsafeTrace), + CloudClientInternal: ccloudv2.NewKafkaRestClientInternal(restEndpoint, lkc, r.Version.UserAgent, dataplaneToken, unsafeTrace), + Client: CreateKafkaRESTClient(restEndpoint, unsafeTrace), } return kafkaRest, nil }) diff --git a/test/fixtures/output/iam/certificate-authority/use.golden b/test/fixtures/output/iam/certificate-authority/use.golden new file mode 100644 index 0000000000..9910b99fb2 --- /dev/null +++ b/test/fixtures/output/iam/certificate-authority/use.golden @@ -0,0 +1 @@ +Using certificate authority "op-12345". 
diff --git a/test/fixtures/output/kafka/consumer/group/list-onprem.golden b/test/fixtures/output/kafka/consumer/group/list-onprem.golden index c6567188a2..5a4d4460cb 100644 --- a/test/fixtures/output/kafka/consumer/group/list-onprem.golden +++ b/test/fixtures/output/kafka/consumer/group/list-onprem.golden @@ -1,5 +1,5 @@ - Cluster | Consumer Group | Coordinator | Simple | Partition Assignor | State -------------+------------------+-------------+--------+------------------------------------------------------+---------------------- - cluster-1 | consumer-group-1 | broker-1 | true | org.apache.kafka.clients.consumer.RoundRobinAssignor | STABLE - cluster-1 | consumer-group-2 | broker-2 | false | org.apache.kafka.clients.consumer.StickyAssignor | PREPARING_REBALANCE - cluster-1 | consumer-group-3 | broker-3 | false | org.apache.kafka.clients.consumer.RangeAssignor | DEAD + Cluster | Consumer Group | Coordinator | Simple | Partition Assignor | State | Type +------------+------------------+-------------+--------+------------------------------------------------------+---------------------+------- + cluster-1 | consumer-group-1 | broker-1 | true | org.apache.kafka.clients.consumer.RoundRobinAssignor | STABLE | + cluster-1 | consumer-group-2 | broker-2 | false | org.apache.kafka.clients.consumer.StickyAssignor | PREPARING_REBALANCE | + cluster-1 | consumer-group-3 | broker-3 | false | org.apache.kafka.clients.consumer.RangeAssignor | DEAD | diff --git a/test/fixtures/output/kafka/consumer/group/list.golden b/test/fixtures/output/kafka/consumer/group/list.golden index 7b5b76548a..dbe0550378 100644 --- a/test/fixtures/output/kafka/consumer/group/list.golden +++ b/test/fixtures/output/kafka/consumer/group/list.golden @@ -1,5 +1,5 @@ - Cluster | Consumer Group | Coordinator | Simple | Partition Assignor | State ------------+------------------+-------------+--------+------------------------------------------------------+---------------------- - lkc-1234 | consumer-group-1 | 
broker-1 | true | org.apache.kafka.clients.consumer.RoundRobinAssignor | STABLE - lkc-1234 | consumer-group-2 | broker-2 | false | org.apache.kafka.clients.consumer.StickyAssignor | PREPARING_REBALANCE - lkc-1234 | consumer-group-3 | broker-3 | false | org.apache.kafka.clients.consumer.RangeAssignor | DEAD + Cluster | Consumer Group | Coordinator | Simple | Partition Assignor | State | Type +-----------+------------------+-------------+--------+------------------------------------------------------+---------------------+------- + lkc-1234 | consumer-group-1 | broker-1 | true | org.apache.kafka.clients.consumer.RoundRobinAssignor | STABLE | + lkc-1234 | consumer-group-2 | broker-2 | false | org.apache.kafka.clients.consumer.StickyAssignor | PREPARING_REBALANCE | + lkc-1234 | consumer-group-3 | broker-3 | false | org.apache.kafka.clients.consumer.RangeAssignor | DEAD | diff --git a/test/fixtures/output/kafka/consumer/stream-group/assignment-describe.golden b/test/fixtures/output/kafka/consumer/stream-group/assignment-describe.golden new file mode 100644 index 0000000000..0b04e5b22b --- /dev/null +++ b/test/fixtures/output/kafka/consumer/stream-group/assignment-describe.golden @@ -0,0 +1,9 @@ ++---------------+-------------------------------------------------------------------------------------------------+ +| Kind | KafkaStreamsGroupMemberAssignment | +| Cluster Id | lkc-1234 | +| Group Id | streams-group-1 | +| Member Id | member-1 | +| Active Tasks | /kafka/v3/clusters/lkc-1234/streams-groups/streams-group-1/members/member-1/assignments/active | +| Standby Tasks | /kafka/v3/clusters/lkc-1234/streams-groups/streams-group-1/members/member-1/assignments/standby | +| Warmup Tasks | /kafka/v3/clusters/lkc-1234/streams-groups/streams-group-1/members/member-1/assignments/warmup | ++---------------+-------------------------------------------------------------------------------------------------+ diff --git 
a/test/fixtures/output/kafka/consumer/stream-group/assignment-task-list.golden b/test/fixtures/output/kafka/consumer/stream-group/assignment-task-list.golden new file mode 100644 index 0000000000..a29693198d --- /dev/null +++ b/test/fixtures/output/kafka/consumer/stream-group/assignment-task-list.golden @@ -0,0 +1,3 @@ + Kind | Subtopology Id | Partition Ids +-------------------+----------------+---------------- + KafkaStreamsTask | subtopology-1 | 0, 1, 2 diff --git a/test/fixtures/output/kafka/consumer/stream-group/describe-dne.golden b/test/fixtures/output/kafka/consumer/stream-group/describe-dne.golden new file mode 100644 index 0000000000..716b863527 --- /dev/null +++ b/test/fixtures/output/kafka/consumer/stream-group/describe-dne.golden @@ -0,0 +1 @@ +Error: 404 Not Found diff --git a/test/fixtures/output/kafka/consumer/stream-group/describe-json.golden b/test/fixtures/output/kafka/consumer/stream-group/describe-json.golden new file mode 100644 index 0000000000..ec2259e477 --- /dev/null +++ b/test/fixtures/output/kafka/consumer/stream-group/describe-json.golden @@ -0,0 +1,13 @@ +{ + "kind": "KafkaStreamsGroup", + "cluster_id": "lkc-1234", + "group_id": "streams-group-1", + "state": "STABLE", + "member_count": 2, + "subtopology_count": 1, + "group_epoch": 3, + "topology_epoch": 2, + "target_assignment_epoch": 2, + "members": "/kafka/v3/clusters/lkc-1234/streams-groups/streams-group-1/members", + "subtopologies": "/kafka/v3/clusters/lkc-1234/streams-groups/streams-group-1/subtopologies" +} diff --git a/test/fixtures/output/kafka/consumer/stream-group/describe.golden b/test/fixtures/output/kafka/consumer/stream-group/describe.golden new file mode 100644 index 0000000000..971b1bdd37 --- /dev/null +++ b/test/fixtures/output/kafka/consumer/stream-group/describe.golden @@ -0,0 +1,13 @@ ++-------------------------+--------------------------------------------------------------------------+ +| Kind | KafkaStreamsGroup | +| Cluster Id | lkc-1234 | +| Group Id | 
streams-group-1 | +| State | STABLE | +| Member Count | 2 | +| Subtopology Count | 1 | +| Group Epoch | 3 | +| Topology Epoch | 2 | +| Target Assignment Epoch | 2 | +| Members | /kafka/v3/clusters/lkc-1234/streams-groups/streams-group-1/members | +| Subtopologies | /kafka/v3/clusters/lkc-1234/streams-groups/streams-group-1/subtopologies | ++-------------------------+--------------------------------------------------------------------------+ diff --git a/test/fixtures/output/kafka/consumer/stream-group/list-json.golden b/test/fixtures/output/kafka/consumer/stream-group/list-json.golden new file mode 100644 index 0000000000..41a642fb0d --- /dev/null +++ b/test/fixtures/output/kafka/consumer/stream-group/list-json.golden @@ -0,0 +1,28 @@ +[ + { + "kind": "KafkaStreamsGroup", + "cluster_id": "lkc-1234", + "group_id": "streams-group-1", + "state": "STABLE", + "member_count": 2, + "subtopology_count": 1, + "group_epoch": 3, + "topology_epoch": 2, + "target_assignment_epoch": 2, + "members": "/kafka/v3/clusters/lkc-1234/streams-groups/streams-group-1/members", + "subtopologies": "/kafka/v3/clusters/lkc-1234/streams-groups/streams-group-1/subtopologies" + }, + { + "kind": "KafkaStreamsGroup", + "cluster_id": "lkc-1234", + "group_id": "streams-group-2", + "state": "RECONCILING", + "member_count": 1, + "subtopology_count": 2, + "group_epoch": 1, + "topology_epoch": 1, + "target_assignment_epoch": 1, + "members": "/kafka/v3/clusters/lkc-1234/streams-groups/streams-group-2/members", + "subtopologies": "/kafka/v3/clusters/lkc-1234/streams-groups/streams-group-2/subtopologies" + } +] diff --git a/test/fixtures/output/kafka/consumer/stream-group/list.golden b/test/fixtures/output/kafka/consumer/stream-group/list.golden new file mode 100644 index 0000000000..f88b25c7f2 --- /dev/null +++ b/test/fixtures/output/kafka/consumer/stream-group/list.golden @@ -0,0 +1,4 @@ + Kind | Cluster Id | Group Id | State | Member Count | Subtopology Count | Group Epoch | Topology Epoch | Target 
Assignment Epoch | Members | Subtopologies +--------------------+------------+-----------------+-------------+--------------+-------------------+-------------+----------------+-------------------------+--------------------------------------------------------------------+--------------------------------------------------------------------------- + KafkaStreamsGroup | lkc-1234 | streams-group-1 | STABLE | 2 | 1 | 3 | 2 | 2 | /kafka/v3/clusters/lkc-1234/streams-groups/streams-group-1/members | /kafka/v3/clusters/lkc-1234/streams-groups/streams-group-1/subtopologies + KafkaStreamsGroup | lkc-1234 | streams-group-2 | RECONCILING | 1 | 2 | 1 | 1 | 1 | /kafka/v3/clusters/lkc-1234/streams-groups/streams-group-2/members | /kafka/v3/clusters/lkc-1234/streams-groups/streams-group-2/subtopologies diff --git a/test/fixtures/output/kafka/consumer/stream-group/member-describe.golden b/test/fixtures/output/kafka/consumer/stream-group/member-describe.golden new file mode 100644 index 0000000000..786bc53b98 --- /dev/null +++ b/test/fixtures/output/kafka/consumer/stream-group/member-describe.golden @@ -0,0 +1,14 @@ ++-------------------+------------------------------------------------------------------------------------------------+ +| Kind | KafkaStreamsGroupMember | +| Cluster Id | lkc-1234 | +| Group Id | streams-group-1 | +| Member Id | member-1 | +| Process Id | process-1 | +| Client Id | client-1 | +| Instance Id | instance-1 | +| Member Epoch | 2 | +| Topology Epoch | 2 | +| Is Classic | false | +| Assignments | /kafka/v3/clusters/lkc-1234/streams-groups/streams-group-1/members/member-1/assignments | +| Target Assignment | /kafka/v3/clusters/lkc-1234/streams-groups/streams-group-1/members/member-1/target-assignments | ++-------------------+------------------------------------------------------------------------------------------------+ diff --git a/test/fixtures/output/kafka/consumer/stream-group/member-list-json.golden 
b/test/fixtures/output/kafka/consumer/stream-group/member-list-json.golden new file mode 100644 index 0000000000..0e5795d334 --- /dev/null +++ b/test/fixtures/output/kafka/consumer/stream-group/member-list-json.golden @@ -0,0 +1,16 @@ +[ + { + "kind": "KafkaStreamsGroupMember", + "cluster_id": "lkc-1234", + "group_id": "streams-group-1", + "member_id": "member-1", + "process_id": "process-1", + "client_id": "client-1", + "instance_id": "instance-1", + "member_epoch": 2, + "topology_epoch": 2, + "is_classic": false, + "assignments": "/kafka/v3/clusters/lkc-1234/streams-groups/streams-group-1/members/member-1/assignments", + "target_assignment": "/kafka/v3/clusters/lkc-1234/streams-groups/streams-group-1/members/member-1/target-assignments" + } +] diff --git a/test/fixtures/output/kafka/consumer/stream-group/member-list.golden b/test/fixtures/output/kafka/consumer/stream-group/member-list.golden new file mode 100644 index 0000000000..c7c821e9b1 --- /dev/null +++ b/test/fixtures/output/kafka/consumer/stream-group/member-list.golden @@ -0,0 +1,3 @@ + Kind | Cluster Id | Group Id | Member Id | Process Id | Client Id | Instance Id | Member Epoch | Topology Epoch | Is Classic | Assignments | Target Assignment +--------------------------+------------+-----------------+-----------+------------+-----------+-------------+--------------+----------------+------------+-----------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------- + KafkaStreamsGroupMember | lkc-1234 | streams-group-1 | member-1 | process-1 | client-1 | instance-1 | 2 | 2 | false | /kafka/v3/clusters/lkc-1234/streams-groups/streams-group-1/members/member-1/assignments | /kafka/v3/clusters/lkc-1234/streams-groups/streams-group-1/members/member-1/target-assignments diff --git a/test/fixtures/output/kafka/consumer/stream-group/subtopology-describe.golden 
b/test/fixtures/output/kafka/consumer/stream-group/subtopology-describe.golden new file mode 100644 index 0000000000..4d5e747253 --- /dev/null +++ b/test/fixtures/output/kafka/consumer/stream-group/subtopology-describe.golden @@ -0,0 +1,7 @@ ++----------------+------------------------------+ +| Kind | KafkaStreamsGroupSubtopology | +| Cluster Id | lkc-1234 | +| Group Id | streams-group-1 | +| Subtopology Id | subtopology-1 | +| Source Topics | input-topic-1, input-topic-2 | ++----------------+------------------------------+ diff --git a/test/fixtures/output/kafka/consumer/stream-group/subtopology-list-json.golden b/test/fixtures/output/kafka/consumer/stream-group/subtopology-list-json.golden new file mode 100644 index 0000000000..6685c3cb91 --- /dev/null +++ b/test/fixtures/output/kafka/consumer/stream-group/subtopology-list-json.golden @@ -0,0 +1,9 @@ +[ + { + "kind": "KafkaStreamsGroupSubtopology", + "cluster_id": "lkc-1234", + "group_id": "streams-group-1", + "subtopology_id": "subtopology-1", + "source_topics": ["input-topic-1", "input-topic-2"] + } +] diff --git a/test/fixtures/output/kafka/consumer/stream-group/subtopology-list.golden b/test/fixtures/output/kafka/consumer/stream-group/subtopology-list.golden new file mode 100644 index 0000000000..35bb997f85 --- /dev/null +++ b/test/fixtures/output/kafka/consumer/stream-group/subtopology-list.golden @@ -0,0 +1,3 @@ + Kind | Cluster Id | Group Id | Subtopology Id | Source Topics +-------------------------------+------------+-----------------+----------------+------------------------------- + KafkaStreamsGroupSubtopology | lkc-1234 | streams-group-1 | subtopology-1 | input-topic-1, input-topic-2 diff --git a/test/fixtures/output/kafka/consumer/stream-group/target-assignment-describe.golden b/test/fixtures/output/kafka/consumer/stream-group/target-assignment-describe.golden new file mode 100644 index 0000000000..9c97a03ee9 --- /dev/null +++ 
b/test/fixtures/output/kafka/consumer/stream-group/target-assignment-describe.golden @@ -0,0 +1,9 @@ ++---------------+--------------------------------------------------------------------------------------------------------+ +| Kind | KafkaStreamsGroupMemberAssignment | +| Cluster Id | lkc-1234 | +| Group Id | streams-group-1 | +| Member Id | member-1 | +| Active Tasks | /kafka/v3/clusters/lkc-1234/streams-groups/streams-group-1/members/member-1/target-assignments/active | +| Standby Tasks | /kafka/v3/clusters/lkc-1234/streams-groups/streams-group-1/members/member-1/target-assignments/standby | +| Warmup Tasks | /kafka/v3/clusters/lkc-1234/streams-groups/streams-group-1/members/member-1/target-assignments/warmup | ++---------------+--------------------------------------------------------------------------------------------------------+ diff --git a/test/fixtures/output/kafka/consumer/stream-group/target-assignment-task-list.golden b/test/fixtures/output/kafka/consumer/stream-group/target-assignment-task-list.golden new file mode 100644 index 0000000000..8c7b61dc4b --- /dev/null +++ b/test/fixtures/output/kafka/consumer/stream-group/target-assignment-task-list.golden @@ -0,0 +1,3 @@ + Kind | Subtopology Id | Partition Ids +-------------------+----------------+---------------- + KafkaStreamsTask | subtopology-1 | 0, 1 diff --git a/test/fixtures/output/kafka/consumer/stream-group/target-task-partitions-describe.golden b/test/fixtures/output/kafka/consumer/stream-group/target-task-partitions-describe.golden new file mode 100644 index 0000000000..2f08fb9c5f --- /dev/null +++ b/test/fixtures/output/kafka/consumer/stream-group/target-task-partitions-describe.golden @@ -0,0 +1,5 @@ ++----------------+------------------+ +| Kind | KafkaStreamsTask | +| Subtopology Id | subtopology-1 | +| Partition Ids | 0, 1 | ++----------------+------------------+ diff --git a/test/fixtures/output/kafka/consumer/stream-group/task-partitions-describe.golden 
b/test/fixtures/output/kafka/consumer/stream-group/task-partitions-describe.golden new file mode 100644 index 0000000000..bad6e8269c --- /dev/null +++ b/test/fixtures/output/kafka/consumer/stream-group/task-partitions-describe.golden @@ -0,0 +1,5 @@ ++----------------+------------------+ +| Kind | KafkaStreamsTask | +| Subtopology Id | subtopology-1 | +| Partition Ids | 0, 1, 2 | ++----------------+------------------+ diff --git a/test/kafka_test.go b/test/kafka_test.go index 750388175d..5868cab4b8 100644 --- a/test/kafka_test.go +++ b/test/kafka_test.go @@ -739,6 +739,39 @@ func (s *CLITestSuite) TestKafkaConsumerGroup() { } } +func (s *CLITestSuite) TestKafkaConsumerStreamGroup() { + tests := []CLITest{ + {args: "kafka consumer stream-group list --cluster lkc-1234", fixture: "kafka/consumer/stream-group/list.golden"}, + {args: "kafka consumer stream-group list --cluster lkc-1234 -o json", fixture: "kafka/consumer/stream-group/list-json.golden"}, + {args: "kafka consumer stream-group describe streams-group-1 --group streams-group-1 --cluster lkc-1234", fixture: "kafka/consumer/stream-group/describe.golden"}, + {args: "kafka consumer stream-group describe streams-group-1 --group streams-group-1 --cluster lkc-1234 -o json", fixture: "kafka/consumer/stream-group/describe-json.golden"}, + {args: "kafka consumer stream-group describe streams-group-999 --group streams-group-999 --cluster lkc-1234", fixture: "kafka/consumer/stream-group/describe-dne.golden", exitCode: 1}, + + {args: "kafka consumer stream-group stream-group-member list --group streams-group-1 --cluster lkc-1234", fixture: "kafka/consumer/stream-group/member-list.golden"}, + {args: "kafka consumer stream-group stream-group-member list --group streams-group-1 --cluster lkc-1234 -o json", fixture: "kafka/consumer/stream-group/member-list-json.golden"}, + {args: "kafka consumer stream-group stream-group-member describe member-1 --group streams-group-1 --member member-1 --cluster lkc-1234", fixture: 
"kafka/consumer/stream-group/member-describe.golden"}, + + {args: "kafka consumer stream-group stream-group-member-assignment describe member-1 --group streams-group-1 --member member-1 --cluster lkc-1234", fixture: "kafka/consumer/stream-group/assignment-describe.golden"}, + {args: "kafka consumer stream-group stream-group-member-assignment list --group streams-group-1 --member member-1 --assignment active --cluster lkc-1234", fixture: "kafka/consumer/stream-group/assignment-task-list.golden"}, + + {args: "kafka consumer stream-group stream-group-member-target-assignment describe member-1 --group streams-group-1 --member member-1 --cluster lkc-1234", fixture: "kafka/consumer/stream-group/target-assignment-describe.golden"}, + {args: "kafka consumer stream-group stream-group-member-target-assignment target-task-list --group streams-group-1 --member member-1 --assignment active --cluster lkc-1234", fixture: "kafka/consumer/stream-group/target-assignment-task-list.golden"}, + + {args: "kafka consumer stream-group stream-group-member-task-partitions describe member-1 --group streams-group-1 --member member-1 --subtopology subtopology-1 --assignment-type active --cluster lkc-1234", fixture: "kafka/consumer/stream-group/task-partitions-describe.golden"}, + + {args: "kafka consumer stream-group stream-group-member-target-assignment-task-partitions task-partitions-describe member-1 --group streams-group-1 --member member-1 --subtopology subtopology-1 --assignment-type active --cluster lkc-1234", fixture: "kafka/consumer/stream-group/target-task-partitions-describe.golden"}, + + {args: "kafka consumer stream-group stream-group-subtopology list --group streams-group-1 --cluster lkc-1234", fixture: "kafka/consumer/stream-group/subtopology-list.golden"}, + {args: "kafka consumer stream-group stream-group-subtopology list --group streams-group-1 --cluster lkc-1234 -o json", fixture: "kafka/consumer/stream-group/subtopology-list-json.golden"}, + {args: "kafka consumer 
stream-group stream-group-subtopology describe subtopology-1 --group streams-group-1 --subtopology subtopology-1 --cluster lkc-1234", fixture: "kafka/consumer/stream-group/subtopology-describe.golden"}, + } + + for _, test := range tests { + test.login = "cloud" + s.runIntegrationTest(test) + } +} + func (s *CLITestSuite) TestKafkaConsumerGroupLag() { tests := []CLITest{ {args: "kafka consumer group lag describe consumer-group-1 --cluster lkc-describe-dedicated --topic topic-1 --partition 1", fixture: "kafka/consumer/group/lag/describe.golden"}, diff --git a/test/test-server/kafka_rest_router.go b/test/test-server/kafka_rest_router.go index 06e2c204f2..dbd7330d69 100644 --- a/test/test-server/kafka_rest_router.go +++ b/test/test-server/kafka_rest_router.go @@ -25,6 +25,10 @@ const ( clientID2 = "client-2" topicName1 = "topic-1" shareGroupID1 = "share-group-1" + streamsGroupID1 = "streams-group-1" + streamsGroupID2 = "streams-group-2" + streamsMemberID = "member-1" + subtopologyID = "subtopology-1" ) type route struct { @@ -53,6 +57,18 @@ var kafkaRestRoutes = []route{ {"/kafka/v3/clusters/{cluster_id}/consumer-groups/{consumer_group_id}/lag-summary", handleKafkaRestLagSummary}, {"/kafka/v3/clusters/{cluster_id}/consumer-groups/{consumer_group_id}/lags", handleKafkaRestLags}, {"/kafka/v3/clusters/{cluster_id}/consumer-groups/{consumer_group_id}/lags/{topic_name}/partitions/{partition_id}", handleKafkaRestLag}, + {"/kafka/v3/clusters/{cluster_id}/streams-groups", handleKafkaRestStreamsGroups}, + {"/kafka/v3/clusters/{cluster_id}/streams-groups/{group_id}", handleKafkaRestStreamsGroup}, + {"/kafka/v3/clusters/{cluster_id}/streams-groups/{group_id}/members", handleKafkaRestStreamsGroupMembers}, + {"/kafka/v3/clusters/{cluster_id}/streams-groups/{group_id}/members/{member_id}", handleKafkaRestStreamsGroupMember}, + {"/kafka/v3/clusters/{cluster_id}/streams-groups/{group_id}/members/{member_id}/assignments", handleKafkaRestStreamsGroupMemberAssignments}, + 
{"/kafka/v3/clusters/{cluster_id}/streams-groups/{group_id}/members/{member_id}/assignments/{assignments_type}", handleKafkaRestStreamsGroupMemberAssignmentTasks}, + {"/kafka/v3/clusters/{cluster_id}/streams-groups/{group_id}/members/{member_id}/assignments/{assignments_type}/subtopologies/{subtopology_id}", handleKafkaRestStreamsGroupMemberAssignmentTaskPartitions}, + {"/kafka/v3/clusters/{cluster_id}/streams-groups/{group_id}/members/{member_id}/target-assignments", handleKafkaRestStreamsGroupMemberTargetAssignments}, + {"/kafka/v3/clusters/{cluster_id}/streams-groups/{group_id}/members/{member_id}/target-assignments/{assignments_type}", handleKafkaRestStreamsGroupMemberTargetAssignmentTasks}, + {"/kafka/v3/clusters/{cluster_id}/streams-groups/{group_id}/members/{member_id}/target-assignments/{assignments_type}/subtopologies/{subtopology_id}", handleKafkaRestStreamsGroupMemberTargetAssignmentTaskPartitions}, + {"/kafka/v3/clusters/{cluster_id}/streams-groups/{group_id}/subtopologies", handleKafkaRestStreamsGroupSubtopologies}, + {"/kafka/v3/clusters/{cluster_id}/streams-groups/{group_id}/subtopologies/{subtopology_id}", handleKafkaRestStreamsGroupSubtopology}, {"/kafka/v3/clusters/{cluster_id}/share-groups", handleKafkaRestShareGroups}, {"/kafka/v3/clusters/{cluster_id}/share-groups/{share_group_id}", handleKafkaRestShareGroup}, {"/kafka/v3/clusters/{cluster_id}/share-groups/{share_group_id}/consumers", handleKafkaRestShareGroupConsumers}, @@ -2436,6 +2452,338 @@ func handleClustersClusterIdTopicsTopicsNamePartitionsReplicaStatus(t *testing.T } } +// Handler for: "/kafka/v3/clusters/{cluster_id}/streams-groups" +func handleKafkaRestStreamsGroups(t *testing.T) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + clusterId := mux.Vars(r)["cluster_id"] + switch r.Method { + case http.MethodGet: + response := map[string]interface{}{ + "kind": "KafkaStreamsGroupList", + "metadata": map[string]interface{}{}, + "data": []map[string]interface{}{ + 
{ + "kind": "KafkaStreamsGroup", + "metadata": map[string]interface{}{}, + "cluster_id": clusterId, + "group_id": streamsGroupID1, + "state": "STABLE", + "member_count": int32(2), + "subtopology_count": int32(1), + "group_epoch": int32(3), + "topology_epoch": int32(2), + "target_assignment_epoch": int32(2), + "members": map[string]interface{}{"related": "/kafka/v3/clusters/" + clusterId + "/streams-groups/" + streamsGroupID1 + "/members"}, + "subtopologies": map[string]interface{}{"related": "/kafka/v3/clusters/" + clusterId + "/streams-groups/" + streamsGroupID1 + "/subtopologies"}, + }, + { + "kind": "KafkaStreamsGroup", + "metadata": map[string]interface{}{}, + "cluster_id": clusterId, + "group_id": streamsGroupID2, + "state": "RECONCILING", + "member_count": int32(1), + "subtopology_count": int32(2), + "group_epoch": int32(1), + "topology_epoch": int32(1), + "target_assignment_epoch": int32(1), + "members": map[string]interface{}{"related": "/kafka/v3/clusters/" + clusterId + "/streams-groups/" + streamsGroupID2 + "/members"}, + "subtopologies": map[string]interface{}{"related": "/kafka/v3/clusters/" + clusterId + "/streams-groups/" + streamsGroupID2 + "/subtopologies"}, + }, + }, + } + err := json.NewEncoder(w).Encode(response) + require.NoError(t, err) + } + } +} + +// Handler for: "/kafka/v3/clusters/{cluster_id}/streams-groups/{group_id}" +func handleKafkaRestStreamsGroup(t *testing.T) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + clusterId := vars["cluster_id"] + groupId := vars["group_id"] + switch r.Method { + case http.MethodGet: + if groupId == streamsGroupID1 { + response := map[string]interface{}{ + "kind": "KafkaStreamsGroup", + "metadata": map[string]interface{}{}, + "cluster_id": clusterId, + "group_id": streamsGroupID1, + "state": "STABLE", + "member_count": int32(2), + "subtopology_count": int32(1), + "group_epoch": int32(3), + "topology_epoch": int32(2), + "target_assignment_epoch": int32(2), 
+ "members": map[string]interface{}{"related": "/kafka/v3/clusters/" + clusterId + "/streams-groups/" + streamsGroupID1 + "/members"}, + "subtopologies": map[string]interface{}{"related": "/kafka/v3/clusters/" + clusterId + "/streams-groups/" + streamsGroupID1 + "/subtopologies"}, + } + err := json.NewEncoder(w).Encode(response) + require.NoError(t, err) + } else { + require.NoError(t, writeErrorResponse(w, http.StatusNotFound, 40403, "This server does not host this streams group.")) + } + } + } +} + +// Handler for: "/kafka/v3/clusters/{cluster_id}/streams-groups/{group_id}/members" +func handleKafkaRestStreamsGroupMembers(t *testing.T) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + clusterId := vars["cluster_id"] + groupId := vars["group_id"] + switch r.Method { + case http.MethodGet: + response := map[string]interface{}{ + "kind": "KafkaStreamsGroupMemberList", + "metadata": map[string]interface{}{}, + "data": []map[string]interface{}{ + { + "kind": "KafkaStreamsGroupMember", + "metadata": map[string]interface{}{}, + "cluster_id": clusterId, + "group_id": groupId, + "member_id": streamsMemberID, + "process_id": "process-1", + "client_id": "client-1", + "instance_id": "instance-1", + "member_epoch": int32(2), + "topology_epoch": int32(2), + "is_classic": false, + "assignments": map[string]interface{}{"related": "/kafka/v3/clusters/" + clusterId + "/streams-groups/" + groupId + "/members/" + streamsMemberID + "/assignments"}, + "target_assignment": map[string]interface{}{"related": "/kafka/v3/clusters/" + clusterId + "/streams-groups/" + groupId + "/members/" + streamsMemberID + "/target-assignments"}, + }, + }, + } + err := json.NewEncoder(w).Encode(response) + require.NoError(t, err) + } + } +} + +// Handler for: "/kafka/v3/clusters/{cluster_id}/streams-groups/{group_id}/members/{member_id}" +func handleKafkaRestStreamsGroupMember(t *testing.T) http.HandlerFunc { + return func(w http.ResponseWriter, r 
*http.Request) { + vars := mux.Vars(r) + clusterId := vars["cluster_id"] + groupId := vars["group_id"] + memberId := vars["member_id"] + switch r.Method { + case http.MethodGet: + if memberId == streamsMemberID { + response := map[string]interface{}{ + "kind": "KafkaStreamsGroupMember", + "metadata": map[string]interface{}{}, + "cluster_id": clusterId, + "group_id": groupId, + "member_id": streamsMemberID, + "process_id": "process-1", + "client_id": "client-1", + "instance_id": "instance-1", + "member_epoch": int32(2), + "topology_epoch": int32(2), + "is_classic": false, + "assignments": map[string]interface{}{"related": "/kafka/v3/clusters/" + clusterId + "/streams-groups/" + groupId + "/members/" + streamsMemberID + "/assignments"}, + "target_assignment": map[string]interface{}{"related": "/kafka/v3/clusters/" + clusterId + "/streams-groups/" + groupId + "/members/" + streamsMemberID + "/target-assignments"}, + } + err := json.NewEncoder(w).Encode(response) + require.NoError(t, err) + } else { + require.NoError(t, writeErrorResponse(w, http.StatusNotFound, 40403, "This server does not host this streams group member.")) + } + } + } +} + +// Handler for: "/kafka/v3/clusters/{cluster_id}/streams-groups/{group_id}/members/{member_id}/assignments" +func handleKafkaRestStreamsGroupMemberAssignments(t *testing.T) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + clusterId := vars["cluster_id"] + groupId := vars["group_id"] + memberId := vars["member_id"] + switch r.Method { + case http.MethodGet: + response := map[string]interface{}{ + "kind": "KafkaStreamsGroupMemberAssignment", + "metadata": map[string]interface{}{}, + "cluster_id": clusterId, + "group_id": groupId, + "member_id": memberId, + "active_tasks": map[string]interface{}{"related": "/kafka/v3/clusters/" + clusterId + "/streams-groups/" + groupId + "/members/" + memberId + "/assignments/active"}, + "standby_tasks": map[string]interface{}{"related": 
"/kafka/v3/clusters/" + clusterId + "/streams-groups/" + groupId + "/members/" + memberId + "/assignments/standby"}, + "warmup_tasks": map[string]interface{}{"related": "/kafka/v3/clusters/" + clusterId + "/streams-groups/" + groupId + "/members/" + memberId + "/assignments/warmup"}, + } + err := json.NewEncoder(w).Encode(response) + require.NoError(t, err) + } + } +} + +// Handler for: "/kafka/v3/clusters/{cluster_id}/streams-groups/{group_id}/members/{member_id}/target-assignments" +func handleKafkaRestStreamsGroupMemberTargetAssignments(t *testing.T) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + clusterId := vars["cluster_id"] + groupId := vars["group_id"] + memberId := vars["member_id"] + switch r.Method { + case http.MethodGet: + response := map[string]interface{}{ + "kind": "KafkaStreamsGroupMemberAssignment", + "metadata": map[string]interface{}{}, + "cluster_id": clusterId, + "group_id": groupId, + "member_id": memberId, + "active_tasks": map[string]interface{}{"related": "/kafka/v3/clusters/" + clusterId + "/streams-groups/" + groupId + "/members/" + memberId + "/target-assignments/active"}, + "standby_tasks": map[string]interface{}{"related": "/kafka/v3/clusters/" + clusterId + "/streams-groups/" + groupId + "/members/" + memberId + "/target-assignments/standby"}, + "warmup_tasks": map[string]interface{}{"related": "/kafka/v3/clusters/" + clusterId + "/streams-groups/" + groupId + "/members/" + memberId + "/target-assignments/warmup"}, + } + err := json.NewEncoder(w).Encode(response) + require.NoError(t, err) + } + } +} + +// Handler for: "/kafka/v3/clusters/{cluster_id}/streams-groups/{group_id}/members/{member_id}/assignments/{assignments_type}" +func handleKafkaRestStreamsGroupMemberAssignmentTasks(t *testing.T) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case http.MethodGet: + response := map[string]interface{}{ + "kind": "KafkaStreamsTaskList", + 
"metadata": map[string]interface{}{}, + "data": []map[string]interface{}{ + { + "kind": "KafkaStreamsTask", + "metadata": map[string]interface{}{}, + "subtopology_id": subtopologyID, + "partition_ids": []int32{0, 1, 2}, + }, + }, + } + err := json.NewEncoder(w).Encode(response) + require.NoError(t, err) + } + } +} + +// Handler for: "/kafka/v3/clusters/{cluster_id}/streams-groups/{group_id}/members/{member_id}/target-assignments/{assignments_type}" +func handleKafkaRestStreamsGroupMemberTargetAssignmentTasks(t *testing.T) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case http.MethodGet: + response := map[string]interface{}{ + "kind": "KafkaStreamsTaskList", + "metadata": map[string]interface{}{}, + "data": []map[string]interface{}{ + { + "kind": "KafkaStreamsTask", + "metadata": map[string]interface{}{}, + "subtopology_id": subtopologyID, + "partition_ids": []int32{0, 1}, + }, + }, + } + err := json.NewEncoder(w).Encode(response) + require.NoError(t, err) + } + } +} + +// Handler for: "/kafka/v3/clusters/{cluster_id}/streams-groups/{group_id}/members/{member_id}/assignments/{assignments_type}/subtopologies/{subtopology_id}" +func handleKafkaRestStreamsGroupMemberAssignmentTaskPartitions(t *testing.T) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case http.MethodGet: + response := map[string]interface{}{ + "kind": "KafkaStreamsTask", + "metadata": map[string]interface{}{}, + "subtopology_id": subtopologyID, + "partition_ids": []int32{0, 1, 2}, + } + err := json.NewEncoder(w).Encode(response) + require.NoError(t, err) + } + } +} + +// Handler for: "/kafka/v3/clusters/{cluster_id}/streams-groups/{group_id}/members/{member_id}/target-assignments/{assignments_type}/subtopologies/{subtopology_id}" +func handleKafkaRestStreamsGroupMemberTargetAssignmentTaskPartitions(t *testing.T) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + switch 
r.Method { + case http.MethodGet: + response := map[string]interface{}{ + "kind": "KafkaStreamsTask", + "metadata": map[string]interface{}{}, + "subtopology_id": subtopologyID, + "partition_ids": []int32{0, 1}, + } + err := json.NewEncoder(w).Encode(response) + require.NoError(t, err) + } + } +} + +// Handler for: "/kafka/v3/clusters/{cluster_id}/streams-groups/{group_id}/subtopologies" +func handleKafkaRestStreamsGroupSubtopologies(t *testing.T) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + clusterId := vars["cluster_id"] + groupId := vars["group_id"] + switch r.Method { + case http.MethodGet: + response := map[string]interface{}{ + "kind": "KafkaStreamsGroupSubtopologyList", + "metadata": map[string]interface{}{}, + "data": []map[string]interface{}{ + { + "kind": "KafkaStreamsGroupSubtopology", + "metadata": map[string]interface{}{}, + "cluster_id": clusterId, + "group_id": groupId, + "subtopology_id": subtopologyID, + "source_topics": []string{"input-topic-1", "input-topic-2"}, + }, + }, + } + err := json.NewEncoder(w).Encode(response) + require.NoError(t, err) + } + } +} + +// Handler for: "/kafka/v3/clusters/{cluster_id}/streams-groups/{group_id}/subtopologies/{subtopology_id}" +func handleKafkaRestStreamsGroupSubtopology(t *testing.T) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + clusterId := vars["cluster_id"] + groupId := vars["group_id"] + subtopId := vars["subtopology_id"] + switch r.Method { + case http.MethodGet: + if subtopId == subtopologyID { + response := map[string]interface{}{ + "kind": "KafkaStreamsGroupSubtopology", + "metadata": map[string]interface{}{}, + "cluster_id": clusterId, + "group_id": groupId, + "subtopology_id": subtopologyID, + "source_topics": []string{"input-topic-1", "input-topic-2"}, + } + err := json.NewEncoder(w).Encode(response) + require.NoError(t, err) + } else { + require.NoError(t, writeErrorResponse(w, 
http.StatusNotFound, 40403, "This server does not host this streams group subtopology.")) + } + } + } +} + func writeErrorResponse(responseWriter http.ResponseWriter, statusCode, errorCode int, message string) error { responseWriter.WriteHeader(statusCode) errorResponseBody := fmt.Sprintf(`{