diff --git a/configs/config-dev.yml b/configs/config-dev.yml new file mode 100644 index 0000000..84e5b3a --- /dev/null +++ b/configs/config-dev.yml @@ -0,0 +1,31 @@ +clusterName: "dev.kube2kafka.cluster" +maxEventAge: "1h" +kafka: + topic: foobar + brokers: + - "localhost:9092" + compression: "snappy" +filters: + - kind: "Pod" + reason: "(?i)created" + message: ".*" + type: "Normal|Warning" +selectors: + - key: cluster + value: "{{ .ClusterName }}" + - key: kind + value: "{{ .InvolvedObject.Kind }}" + - key: namespace + value: "{{ .Namespace }}" + - key: reason + value: "{{ .Reason }}" + - key: message + value: "{{ .Message }}" + - key: type + value: "{{ .Type }}" + - key: component + value: "{{ .Source.Component }}" + - key: occurred + value: "{{ .GetRFC3339Timestamp }}" + - key: count + value: "{{ .Count }}" diff --git a/configs/config-example.yml b/configs/config-example.yml new file mode 100644 index 0000000..0c41f57 --- /dev/null +++ b/configs/config-example.yml @@ -0,0 +1,58 @@ +# This is an example configuration file for kube2kafka. +# It covers all the possible configuration options. + +# The following fields are not required, but they are shown here for +# demonstration purposes: +# - namespace (default: all namespaces) +# - maxEventAge (default: 1 minute) +# - kafka.compression (default: none) +# - kafka.tls (default: no TLS) +# - kafka.sasl (default: no SASL) +# - kafka.sasl.mechanism (default: plain) +# - filters (default: no filter will be applied) +# - selectors (default: all fields will be sent to Kafka) + +clusterName: "example.kube2kafka.cluster" # used to identify the cluster +namespace: "default" +# Max event age is used to filter out events during initial sync (list request). +# This allows to avoid sending all occurred events to Kafka when kube2kafka starts. +maxEventAge: "1m" +kafka: + topic: foo + brokers: + - "broker:9092" + - "broker:9093" + - "broker:9094" + compression: "gzip" # one of none, gzip, snappy, lz4 or zstd + tls: + cacert: "/path/to/ca.crt" + cert: "/path/to/client.crt" + key: "/path/to/client.key" + sasl: + username: username + password: password + mechanism: plain # one of plain, sha256 or sha512 +# Fields in filters supports regular expressions. +# There is no need to define every field in the filter as empty fields are omitted +# from the comparison. +# Only events that match any of defined filters will be sent to Kafka. +filters: + - kind: "Pod" + namespace: "default|kube-system" + reason: "(?i)created" + message: ".*" + type: "Normal|Warning" + component: "^kubelet$" + - kind: "Service" + namespace: "^(default)?$" +# Selectors are used to extract values from the event to create json payload which +# will be sent to Kafka. The values are extracted using golang text/template package. +# There is no risk when some error occurs during the field selection as the fallback +# mechanism ensure that default fields will be selected. +selectors: + - key: cluster + value: "{{ .ClusterName }}" + - key: kind + value: "{{ .InvolvedObject.Kind }}" + - key: namespace + value: "{{ .Namespace }}" \ No newline at end of file diff --git a/go.mod b/go.mod index 472e635..35d4acc 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/onsi/gomega v1.34.2 github.com/segmentio/kafka-go v0.4.47 go.uber.org/zap v1.27.0 + gopkg.in/yaml.v3 v3.0.1 k8s.io/api v0.30.5 k8s.io/apimachinery v0.30.5 k8s.io/client-go v0.30.5 @@ -54,7 +55,6 @@ require ( google.golang.org/protobuf v1.34.1 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/klog/v2 v2.120.1 // indirect k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect diff --git a/internal/config/config.go b/internal/config/config.go new file mode 100644 index 0000000..0079568 --- /dev/null +++ b/internal/config/config.go @@ -0,0 +1,77 @@ +package config + +import ( + "fmt" + "github.com/raczu/kube2kafka/pkg/kube/watcher" + "github.com/raczu/kube2kafka/pkg/processor" + "gopkg.in/yaml.v3" + "os" + "time" +) + +type Config struct { + ClusterName string `yaml:"clusterName"` + TargetNamespace string `yaml:"namespace"` + MaxEventAge time.Duration `yaml:"maxEventAge"` + BufferSize int `yaml:"bufferSize"` + Kafka *KafkaConfig `yaml:"kafka"` + Filters []processor.Filter `yaml:"filters"` + Selectors []processor.Selector `yaml:"selectors"` +} + +func (c *Config) SetDefaults() { + if c.MaxEventAge == 0 { + c.MaxEventAge = watcher.DefaultMaxEventAge + } + + if c.BufferSize == 0 { + c.BufferSize = watcher.DefaultEventBufferCap + } +} + +func (c *Config) Validate() error { + if c.ClusterName == "" { + return fmt.Errorf("cluster name is required") + } + + if c.BufferSize <= 0 { + return fmt.Errorf("buffer size must be positive") + } + + if c.Kafka == nil { + return fmt.Errorf("kafka config is required") + } + + if err := c.Kafka.Validate(); err != nil { + return fmt.Errorf("kafka config has issues: %w", err) + } + + for i, filter := range c.Filters { + err := filter.Validate() + if err != nil { + return fmt.Errorf("filter at index %d has issues: %w", i, err) + } + } + + for i, selector := range c.Selectors { + err := selector.Validate() + if err != nil { + return fmt.Errorf("selector at index %d has issues: %w", i, err) + } + } + return nil +} + +func Read(path string) (*Config, error) { + data, err := os.ReadFile(path) + if err != nil { + return nil, err + } + + var config Config + err = yaml.Unmarshal(data, &config) + if err != nil { + return nil, err + } + return &config, nil +} diff --git a/internal/config/kafka.go b/internal/config/kafka.go new file mode 100644 index 0000000..c8bf729 --- /dev/null +++ b/internal/config/kafka.go @@ -0,0 +1,118 @@ +package config + +import ( + "crypto/tls" + "fmt" + "github.com/raczu/kube2kafka/pkg/exporter" + "github.com/segmentio/kafka-go" + "github.com/segmentio/kafka-go/sasl" + "os" +) + +type RawTLSData struct { + CA string `yaml:"cacert"` + Cert string `yaml:"cert"` + Key string `yaml:"key"` + SkipVerify bool `yaml:"skipVerify"` +} + +func (r *RawTLSData) Build() (*tls.Config, error) { + var data exporter.TLSData + + if r.CA != "" { + ca, err := os.ReadFile(r.CA) + if err != nil { + return nil, fmt.Errorf("error reading ca file: %w", err) + } + data.CA = ca + } + + if r.Cert != "" && r.Key != "" { + cert, err := os.ReadFile(r.Cert) + if err != nil { + return nil, fmt.Errorf("error reading client cert file: %w", err) + } + data.Cert = cert + + key, err := os.ReadFile(r.Key) + if err != nil { + return nil, fmt.Errorf("error reading client key file: %w", err) + } + data.Key = key + } + + data.SkipVerify = r.SkipVerify + return data.Build() +} + +type RawSASLData struct { + Username string `yaml:"username"` + Password string `yaml:"password"` + Mechanism string `yaml:"mechanism" default:"plain"` +} + +func (r *RawSASLData) Validate() error { + if r.Username == "" { + return fmt.Errorf("username is required") + } + + if r.Password == "" { + return fmt.Errorf("password is required") + } + return nil +} + +func (r *RawSASLData) Build() (sasl.Mechanism, error) { + factory, err := exporter.MapMechanismString(r.Mechanism) + if err != nil { + return nil, fmt.Errorf("failed to map mechanism string: %w", err) + } + return factory(r.Username, r.Password) +} + +type KafkaConfig struct { + Brokers []string `yaml:"brokers"` + Topic string `yaml:"topic"` + RawCompression string `yaml:"compression" default:"none"` + RawTLS *RawTLSData `yaml:"tls"` + RawSASL *RawSASLData `yaml:"sasl"` +} + +func (c *KafkaConfig) Validate() error { + if len(c.Brokers) == 0 { + return fmt.Errorf("at least one broker is required") + } + + if c.Topic == "" { + return fmt.Errorf("topic is required") + } + + if c.RawSASL != nil { + if err := c.RawSASL.Validate(); err != nil { + return fmt.Errorf("sasl config has issues: %w", err) + } + } + return nil +} + +func (c *KafkaConfig) TLS() (*tls.Config, error) { + if c.RawTLS == nil { + return nil, nil + } + return c.RawTLS.Build() +} + +func (c *KafkaConfig) SASL() (sasl.Mechanism, error) { + if c.RawSASL == nil { + return nil, nil + } + return c.RawSASL.Build() +} + +func (c *KafkaConfig) Compression() (kafka.Compression, error) { + compression, err := exporter.MapCodecString(c.RawCompression) + if err != nil { + return kafka.Compression(0), fmt.Errorf("failed to map codec string: %w", err) + } + return compression, nil +} diff --git a/pkg/exporter/config.go b/pkg/exporter/config.go index 8483e89..f76ad09 100644 --- a/pkg/exporter/config.go +++ b/pkg/exporter/config.go @@ -1,14 +1,55 @@ package exporter import ( + "crypto/tls" + "crypto/x509" "fmt" "github.com/segmentio/kafka-go" + "github.com/segmentio/kafka-go/compress" "github.com/segmentio/kafka-go/sasl" "github.com/segmentio/kafka-go/sasl/plain" "github.com/segmentio/kafka-go/sasl/scram" ) +type TLSData struct { + CA []byte + Cert []byte + Key []byte + SkipVerify bool +} + +// Build creates a tls.Config from the TLSData. +func (t *TLSData) Build() (*tls.Config, error) { + config := &tls.Config{ + MinVersion: tls.VersionTLS12, + } + + if t.CA != nil { + config.RootCAs = x509.NewCertPool() + if !config.RootCAs.AppendCertsFromPEM(t.CA) { + return nil, fmt.Errorf("failed to parse ca certificate") + } + } + + if t.Cert != nil && t.Key != nil { + cert, err := tls.X509KeyPair(t.Cert, t.Key) + if err != nil { + return nil, fmt.Errorf( + "failed to parse client certificate and its private key: %w", + err, + ) + } + config.Certificates = []tls.Certificate{cert} + } + + if t.SkipVerify { + config.InsecureSkipVerify = true + } + return config, nil +} + var codec2compression = map[string]kafka.Compression{ + "none": compress.None, "gzip": kafka.Gzip, "snappy": kafka.Snappy, "lz4": kafka.Lz4, diff --git a/pkg/exporter/config_test.go b/pkg/exporter/config_test.go index 4e4343d..52db8de 100644 --- a/pkg/exporter/config_test.go +++ b/pkg/exporter/config_test.go @@ -5,14 +5,21 @@ import ( . "github.com/onsi/gomega" "github.com/raczu/kube2kafka/pkg/exporter" "github.com/segmentio/kafka-go" + "github.com/segmentio/kafka-go/compress" "github.com/segmentio/kafka-go/sasl/plain" ) var _ = Describe("MapCodecString", func() { When("mapping a known codec string", func() { It("should return the corresponding kafka.Compression", func() { - codecs := []string{"gzip", "snappy", "lz4", "zstd"} - expected := []kafka.Compression{kafka.Gzip, kafka.Snappy, kafka.Lz4, kafka.Zstd} + codecs := []string{"none", "gzip", "snappy", "lz4", "zstd"} + expected := []kafka.Compression{ + compress.None, + kafka.Gzip, + kafka.Snappy, + kafka.Lz4, + kafka.Zstd, + } for i, codec := range codecs { compression, err := exporter.MapCodecString(codec)