Skip to content

Commit

Permalink
[chore] implement missing topic resource methods
Browse files Browse the repository at this point in the history
This includes:

- Update
- Import
- Read

There are a few workarounds commented in the code.
  • Loading branch information
r-vasquez committed Feb 7, 2024
1 parent 7a06df5 commit 0731d07
Show file tree
Hide file tree
Showing 3 changed files with 188 additions and 164 deletions.
33 changes: 7 additions & 26 deletions redpanda/models/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,11 @@ import "github.com/hashicorp/terraform-plugin-framework/types"

// Topic defines the structure for configuration settings parsed from HCL.
type Topic struct {
Name types.String `tfsdk:"name"`
PartitionCount types.Number `tfsdk:"partition_count"`
ReplicationFactor types.Number `tfsdk:"replication_factor"`
Configuration []*TopicConfiguration `tfsdk:"configuration"`
AllowDeletion types.Bool `tfsdk:"allow_deletion"`
ClusterAPIURL types.String `tfsdk:"cluster_api_url"`
ID types.String `tfsdk:"id"`
}

// TopicConfiguration defines the structure for configuration settings parsed from HCL.
type TopicConfiguration struct {
Name types.String
Type types.String
Value types.String
Source types.String
IsReadOnly types.Bool
IsSensitive types.Bool
ConfigSynonyms []*TopicConfigSynonym
Documentation types.String
}

// TopicConfigSynonym defines the structure for configuration settings parsed from HCL.
type TopicConfigSynonym struct {
Name types.String
Value types.String
Source types.String
Name types.String `tfsdk:"name"`
PartitionCount types.Number `tfsdk:"partition_count"`
ReplicationFactor types.Number `tfsdk:"replication_factor"`
Configuration types.Map `tfsdk:"configuration"`
AllowDeletion types.Bool `tfsdk:"allow_deletion"`
ClusterAPIURL types.String `tfsdk:"cluster_api_url"`
ID types.String `tfsdk:"id"`
}
238 changes: 145 additions & 93 deletions redpanda/resources/topic/resource_topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,17 @@ package topic
import (
"context"
"fmt"
"strings"

"github.com/hashicorp/terraform-plugin-framework-validators/stringvalidator"
"github.com/hashicorp/terraform-plugin-framework/path"
"github.com/hashicorp/terraform-plugin-framework/resource"
"github.com/hashicorp/terraform-plugin-framework/resource/schema"
"github.com/hashicorp/terraform-plugin-framework/resource/schema/mapplanmodifier"
"github.com/hashicorp/terraform-plugin-framework/resource/schema/numberplanmodifier"
"github.com/hashicorp/terraform-plugin-framework/resource/schema/planmodifier"
"github.com/hashicorp/terraform-plugin-framework/resource/schema/stringplanmodifier"
"github.com/hashicorp/terraform-plugin-framework/schema/validator"
"github.com/hashicorp/terraform-plugin-framework/types"
cloudv1beta1 "github.com/redpanda-data/terraform-provider-redpanda/proto/gen/go/redpanda/api/controlplane/v1beta1"
dataplanev1alpha1 "github.com/redpanda-data/terraform-provider-redpanda/proto/gen/go/redpanda/api/dataplane/v1alpha1"
"github.com/redpanda-data/terraform-provider-redpanda/redpanda/clients"
"github.com/redpanda-data/terraform-provider-redpanda/redpanda/models"
Expand All @@ -48,16 +49,6 @@ type Topic struct {
resData utils.ResourceData
}

var sourceValidator = stringvalidator.OneOf(
"SOURCE_UNSPECIFIED",
"DYNAMIC_TOPIC_CONFIG",
"DYNAMIC_BROKER_CONFIG",
"DYNAMIC_DEFAULT_BROKER_CONFIG",
"STATIC_BROKER_CONFIG",
"DEFAULT_CONFIG",
"DYNAMIC_BROKER_LOGGER_CONFIG",
)

// Configure configures the Topic resource.
func (t *Topic) Configure(_ context.Context, request resource.ConfigureRequest, response *resource.ConfigureResponse) {
if request.ProviderData == nil {
Expand Down Expand Up @@ -89,78 +80,33 @@ func resourceTopicSchema() schema.Schema {
PlanModifiers: []planmodifier.String{stringplanmodifier.RequiresReplace()},
},
"partition_count": schema.NumberAttribute{
Description: "The number of partitions for the topic. This determines how the data is distributed across brokers.",
Optional: true,
PlanModifiers: []planmodifier.Number{numberplanmodifier.RequiresReplace()},
Description: "The number of partitions for the topic. This determines how the data is distributed across brokers.",
Optional: true,
Computed: true,
PlanModifiers: []planmodifier.Number{
numberplanmodifier.RequiresReplace(),
numberplanmodifier.UseStateForUnknown(),
},
},
"replication_factor": schema.NumberAttribute{
Description: "The replication factor for the topic, which defines how many copies of the data are kept across different brokers for fault tolerance.",
Optional: true,
PlanModifiers: []planmodifier.Number{numberplanmodifier.RequiresReplace()},
Description: "The replication factor for the topic, which defines how many copies of the data are kept across different brokers for fault tolerance.",
Optional: true,
Computed: true,
PlanModifiers: []planmodifier.Number{
numberplanmodifier.RequiresReplace(),
numberplanmodifier.UseStateForUnknown(),
},
},
"allow_deletion": schema.BoolAttribute{
Description: "Indicates whether the topic can be deleted.",
Optional: true,
},
"configuration": schema.SetNestedAttribute{
NestedObject: schema.NestedAttributeObject{
Attributes: map[string]schema.Attribute{
"name": schema.StringAttribute{
Description: "The name of the configuration parameter.",
Required: true,
},
"value": schema.StringAttribute{
Description: "The value of the configuration parameter.",
Required: true,
},
"type": schema.StringAttribute{
Description: "The type of the configuration parameter.",
Computed: true,
},
"source": schema.StringAttribute{
Description: "The source of the configuration parameter, indicating how the configuration was set.",
Computed: true,
Validators: []validator.String{
sourceValidator,
},
},
"is_read_only": schema.BoolAttribute{
Description: "Indicates whether the configuration parameter is read-only.",
Computed: true,
},
"is_sensitive": schema.BoolAttribute{
Description: "Indicates whether the configuration parameter is sensitive and should be handled securely.",
Computed: true,
},
"config_synonyms": schema.SetNestedAttribute{
NestedObject: schema.NestedAttributeObject{
Attributes: map[string]schema.Attribute{
"name": schema.StringAttribute{
Description: "The synonym name for the configuration parameter.",
Computed: true,
},
"value": schema.StringAttribute{
Description: "The synonym value for the configuration parameter.",
Computed: true,
},
"source": schema.StringAttribute{
Description: "The source of the synonym, indicating how the synonym was set.",
Computed: true,
Validators: []validator.String{
sourceValidator,
},
},
},
},
Computed: true,
},
"documentation": schema.StringAttribute{
Description: "Documentation for the configuration parameter, providing additional context or information.",
Computed: true,
},
},
},
Optional: true,
"configuration": schema.MapAttribute{
ElementType: types.StringType,
Description: "A map of string key/value pairs of topic configurations.",
Optional: true,
Computed: true,
PlanModifiers: []planmodifier.Map{mapplanmodifier.UseStateForUnknown()},
},
"cluster_api_url": schema.StringAttribute{
Required: true,
Expand All @@ -170,7 +116,8 @@ func resourceTopicSchema() schema.Schema {
PlanModifiers: []planmodifier.String{stringplanmodifier.RequiresReplace()},
},
"id": schema.StringAttribute{
Computed: true,
Computed: true,
PlanModifiers: []planmodifier.String{stringplanmodifier.UseStateForUnknown()},
},
},
}
Expand All @@ -186,36 +133,62 @@ func (t *Topic) Create(ctx context.Context, request resource.CreateRequest, resp
var model models.Topic
response.Diagnostics.Append(request.Plan.Get(ctx, &model)...)

cfg, err := utils.SliceToTopicConfiguration(model.Configuration)
cfg, err := utils.MapToCreateTopicConfiguration(model.Configuration)
if err != nil {
response.Diagnostics.AddError(fmt.Sprintf("failed to convert topic configuration for %s", model.Name), err.Error())
response.Diagnostics.AddError(fmt.Sprintf("failed to parse topic configuration for %s", model.Name), err.Error())
return
}
err = t.createTopicClient(ctx, model.ClusterAPIURL.ValueString())
if err != nil {
response.Diagnostics.AddError("failed to create topic client", err.Error())
return
}
var p, rf *int32
if !model.PartitionCount.IsUnknown() {
p = utils.NumberToInt32(model.PartitionCount)
}
if !model.ReplicationFactor.IsUnknown() {
rf = utils.NumberToInt32(model.ReplicationFactor)
}
_, err = t.TopicClient.CreateTopic(ctx, &dataplanev1alpha1.CreateTopicRequest{
Topic: &dataplanev1alpha1.CreateTopicRequest_Topic{
Name: model.Name.ValueString(),
PartitionCount: utils.NumberToInt32(model.PartitionCount),
ReplicationFactor: utils.NumberToInt32(model.ReplicationFactor),
PartitionCount: p,
ReplicationFactor: rf,
Configs: cfg,
},
})
if err != nil {
response.Diagnostics.AddError(fmt.Sprintf("failed to create topic %s", model.Name.ValueString()), err.Error())
response.Diagnostics.AddError(fmt.Sprintf("failed to create topic %q", model.Name.ValueString()), err.Error())
return
}
// This should be gone after a fix in Redpanda core (#15722) lands in the
// next patch release. Once it's released, all the information below should
// come in the CreateTopicResponse.
tp, err := utils.FindTopicByName(ctx, model.Name.ValueString(), t.TopicClient)
if err != nil {
response.Diagnostics.AddError(fmt.Sprintf("failed to get topic %q information after creation", model.Name), err.Error())
return
}
tpCfgRes, err := t.TopicClient.GetTopicConfigurations(ctx, &dataplanev1alpha1.GetTopicConfigurationsRequest{TopicName: tp.Name})
if err != nil {
response.Diagnostics.AddError(fmt.Sprintf("failed to retrieve %q topic configuration", tp.Name), err.Error())
return
}
tpCfg := filterDynamicConfig(tpCfgRes.Configurations)
tpCfgMap, err := utils.TopicConfigurationToMap(tpCfg)
if err != nil {
response.Diagnostics.AddError("unable to parse the topic configuration", err.Error())
return
}
// TODO: Once ListTopic is implemented, we should set the state according to the ListTopic response and not the model.
response.Diagnostics.Append(response.State.Set(ctx, models.Topic{
Name: model.Name,
PartitionCount: model.PartitionCount,
ReplicationFactor: model.ReplicationFactor,
Configuration: model.Configuration,
Name: types.StringValue(tp.Name),
PartitionCount: utils.Int32ToNumber(tp.PartitionCount),
ReplicationFactor: utils.Int32ToNumber(tp.ReplicationFactor),
Configuration: tpCfgMap,
AllowDeletion: model.AllowDeletion,
ClusterAPIURL: model.ClusterAPIURL,
ID: types.StringValue(tp.Name),
})...)
}

Expand All @@ -237,12 +210,13 @@ func (t *Topic) Read(ctx context.Context, request resource.ReadRequest, response
response.Diagnostics.AddError(fmt.Sprintf("failed receive response from topic api for topic %s", model.Name), err.Error())
return
}
tpCfg, err := t.TopicClient.GetTopicConfigurations(ctx, &dataplanev1alpha1.GetTopicConfigurationsRequest{TopicName: tp.Name})
tpCfgRes, err := t.TopicClient.GetTopicConfigurations(ctx, &dataplanev1alpha1.GetTopicConfigurationsRequest{TopicName: tp.Name})
if err != nil {
response.Diagnostics.AddError(fmt.Sprintf("failed to retrieve %q topic configuration", tp.Name), err.Error())
return
}
topicCfg, err := utils.TopicConfigurationToSlice(tpCfg.Configurations)
tpCfg := filterDynamicConfig(tpCfgRes.Configurations)
topicCfg, err := utils.TopicConfigurationToMap(tpCfg)
if err != nil {
response.Diagnostics.AddError("unable to parse the topic configuration", err.Error())
return
Expand All @@ -253,11 +227,44 @@ func (t *Topic) Read(ctx context.Context, request resource.ReadRequest, response
ReplicationFactor: utils.Int32ToNumber(tp.ReplicationFactor),
Configuration: topicCfg,
AllowDeletion: model.AllowDeletion,
ClusterAPIURL: model.ClusterAPIURL,
ID: types.StringValue(tp.Name),
})...)
}

// Update updates the state of the Topic resource.
func (*Topic) Update(_ context.Context, _ resource.UpdateRequest, _ *resource.UpdateResponse) {
func (t *Topic) Update(ctx context.Context, request resource.UpdateRequest, response *resource.UpdateResponse) {
var plan, state models.Topic
response.Diagnostics.Append(request.Plan.Get(ctx, &plan)...)
response.Diagnostics.Append(request.State.Get(ctx, &state)...)
err := t.createTopicClient(ctx, plan.ClusterAPIURL.ValueString())
if err != nil {
response.Diagnostics.AddError("failed to create topic client", err.Error())
return
}
if !plan.Configuration.Equal(state.Configuration) {
cfgToSet, err := utils.MapToUpdateTopicConfiguration(plan.Configuration)
if err != nil {
response.Diagnostics.AddError("failed to parse the configuration map", err.Error())
return
}
cfgs, err := t.TopicClient.UpdateTopicConfigurations(ctx, &dataplanev1alpha1.UpdateTopicConfigurationsRequest{
TopicName: plan.Name.ValueString(),
Configurations: cfgToSet,
})
if err != nil {
response.Diagnostics.AddError("failed to update topic configuration", err.Error())
return
}
tpCfg := filterDynamicConfig(cfgs.Configurations)
topicCfg, err := utils.TopicConfigurationToMap(tpCfg)
if err != nil {
response.Diagnostics.AddError("unable to parse the topic received topicCfg", err.Error())
return
}
plan.Configuration = topicCfg
}
response.Diagnostics.Append(response.State.Set(ctx, &plan)...)
}

// Delete deletes the Topic resource.
Expand All @@ -282,8 +289,36 @@ func (t *Topic) Delete(ctx context.Context, request resource.DeleteRequest, resp
}

// ImportState imports the state of the Topic resource.
func (*Topic) ImportState(ctx context.Context, req resource.ImportStateRequest, resp *resource.ImportStateResponse) {
resp.Diagnostics.Append(resp.State.SetAttribute(ctx, path.Root("name"), types.StringValue(req.ID))...)
func (t *Topic) ImportState(ctx context.Context, req resource.ImportStateRequest, resp *resource.ImportStateResponse) {
split := strings.SplitN(req.ID, ",", 2)
if len(split) != 2 {
resp.Diagnostics.AddError(fmt.Sprintf("wrong ADDR ID format: %v", req.ID), "ADDR ID format is <topic_name>,<cluster_id>")
return
}
topicName, clusterID := split[0], split[1]
client, err := clients.NewClusterServiceClient(ctx, t.resData.CloudEnv, clients.ClientRequest{
ClientID: t.resData.ClientID,
ClientSecret: t.resData.ClientSecret,
})
if err != nil {
resp.Diagnostics.AddError("unable to start a cluster client", "unable to start a cluster client; make sure ADDR ID format is <topic_name>,<cluster_id>")
return
}
cluster, err := client.GetCluster(ctx, &cloudv1beta1.GetClusterRequest{
Id: clusterID,
})
if err != nil {
resp.Diagnostics.AddError(fmt.Sprintf("failed to find cluster with ID %q", clusterID), err.Error())
return
}
clusterURL, err := utils.SplitSchemeDefPort(cluster.DataplaneApi.Url, "443")
if err != nil {
resp.Diagnostics.AddError("unable to parse Cluster API URL", err.Error())
return
}
resp.Diagnostics.Append(resp.State.SetAttribute(ctx, path.Root("name"), types.StringValue(topicName))...)
resp.Diagnostics.Append(resp.State.SetAttribute(ctx, path.Root("id"), types.StringValue(topicName))...)
resp.Diagnostics.Append(resp.State.SetAttribute(ctx, path.Root("cluster_api_url"), types.StringValue(clusterURL))...)
}

func (t *Topic) createTopicClient(ctx context.Context, clusterURL string) error {
Expand All @@ -300,3 +335,20 @@ func (t *Topic) createTopicClient(ctx context.Context, clusterURL string) error
t.TopicClient = client
return nil
}

// filterDynamicConfig filters the configs and returns only the one with a
// DYNAMIC_TOPIC_CONFIG source.
func filterDynamicConfig(configs []*dataplanev1alpha1.Topic_Configuration) []*dataplanev1alpha1.Topic_Configuration {
var filtered []*dataplanev1alpha1.Topic_Configuration
for _, cfg := range configs {
if cfg != nil {
// cleanup.policy always report the source as Dynamic even if we are
// using the default. We can't manage this property for now.
// See https://github.com/redpanda-data/redpanda/issues/2225
if cfg.Source == dataplanev1alpha1.ConfigSource_CONFIG_SOURCE_DYNAMIC_TOPIC_CONFIG && cfg.Name != "cleanup.policy" {
filtered = append(filtered, cfg)
}
}
}
return filtered
}
Loading

0 comments on commit 0731d07

Please sign in to comment.