
feat: add possibility to configure app from file
raczu committed Nov 4, 2024
1 parent c7eac4e commit 86e337d
Showing 7 changed files with 335 additions and 3 deletions.
31 changes: 31 additions & 0 deletions configs/config-dev.yml
@@ -0,0 +1,31 @@
clusterName: "dev.kube2kafka.cluster"
maxEventAge: "1h"
kafka:
topic: foobar
brokers:
- "localhost:9092"
compression: "snappy"
filters:
- kind: "Pod"
reason: "(?i)created"
message: ".*"
type: "Normal|Warning"
selectors:
- key: cluster
value: "{{ .ClusterName }}"
- key: kind
value: "{{ .InvolvedObject.Kind }}"
- key: namespace
value: "{{ .Namespace }}"
- key: reason
value: "{{ .Reason }}"
- key: message
value: "{{ .Message }}"
- key: type
value: "{{ .Type }}"
- key: component
value: "{{ .Source.Component }}"
- key: occurred
value: "{{ .GetRFC3339Timestamp }}"
- key: count
value: "{{ .Count }}"
58 changes: 58 additions & 0 deletions configs/config-example.yml
@@ -0,0 +1,58 @@
# This is an example configuration file for kube2kafka.
# It covers all the possible configuration options.

# The following fields are not required, but they are shown here for
# demonstration purposes:
# - namespace (default: all namespaces)
# - maxEventAge (default: 1 minute)
# - kafka.compression (default: none)
# - kafka.tls (default: no TLS)
# - kafka.sasl (default: no SASL)
# - kafka.sasl.mechanism (default: plain)
# - filters (default: no filter will be applied)
# - selectors (default: all fields will be sent to Kafka)

clusterName: "example.kube2kafka.cluster" # used to identify the cluster
namespace: "default"
# Max event age is used to filter out events during the initial sync (list request).
# This avoids sending every previously occurred event to Kafka when kube2kafka starts.
maxEventAge: "1m"
kafka:
  topic: foo
  brokers:
    - "broker:9092"
    - "broker:9093"
    - "broker:9094"
  compression: "gzip" # one of none, gzip, snappy, lz4 or zstd
  tls:
    cacert: "/path/to/ca.crt"
    cert: "/path/to/client.crt"
    key: "/path/to/client.key"
  sasl:
    username: username
    password: password
    mechanism: plain # one of plain, sha256 or sha512
# Fields in filters support regular expressions.
# There is no need to define every field in a filter, as empty fields are omitted
# from the comparison.
# Only events matching at least one of the defined filters will be sent to Kafka.
filters:
  - kind: "Pod"
    namespace: "default|kube-system"
    reason: "(?i)created"
    message: ".*"
    type: "Normal|Warning"
    component: "^kubelet$"
  - kind: "Service"
    namespace: "^(default)?$"
# Selectors are used to extract values from the event to build the JSON payload
# sent to Kafka. Values are extracted using Go's text/template package.
# Errors during field selection are harmless: a fallback mechanism ensures that
# the default fields are selected instead.
selectors:
  - key: cluster
    value: "{{ .ClusterName }}"
  - key: kind
    value: "{{ .InvolvedObject.Kind }}"
  - key: namespace
    value: "{{ .Namespace }}"
2 changes: 1 addition & 1 deletion go.mod
@@ -8,6 +8,7 @@ require (
    github.com/onsi/gomega v1.34.2
    github.com/segmentio/kafka-go v0.4.47
    go.uber.org/zap v1.27.0
    gopkg.in/yaml.v3 v3.0.1
    k8s.io/api v0.30.5
    k8s.io/apimachinery v0.30.5
    k8s.io/client-go v0.30.5
@@ -54,7 +55,6 @@ require (
    google.golang.org/protobuf v1.34.1 // indirect
    gopkg.in/inf.v0 v0.9.1 // indirect
    gopkg.in/yaml.v2 v2.4.0 // indirect
    gopkg.in/yaml.v3 v3.0.1 // indirect
    k8s.io/klog/v2 v2.120.1 // indirect
    k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect
    k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect
77 changes: 77 additions & 0 deletions internal/config/config.go
@@ -0,0 +1,77 @@
package config

import (
    "fmt"
    "github.com/raczu/kube2kafka/pkg/kube/watcher"
    "github.com/raczu/kube2kafka/pkg/processor"
    "gopkg.in/yaml.v3"
    "os"
    "time"
)

type Config struct {
    ClusterName     string               `yaml:"clusterName"`
    TargetNamespace string               `yaml:"namespace"`
    MaxEventAge     time.Duration        `yaml:"maxEventAge"`
    BufferSize      int                  `yaml:"bufferSize"`
    Kafka           *KafkaConfig         `yaml:"kafka"`
    Filters         []processor.Filter   `yaml:"filters"`
    Selectors       []processor.Selector `yaml:"selectors"`
}

func (c *Config) SetDefaults() {
    if c.MaxEventAge == 0 {
        c.MaxEventAge = watcher.DefaultMaxEventAge
    }

    if c.BufferSize == 0 {
        c.BufferSize = watcher.DefaultEventBufferCap
    }
}

func (c *Config) Validate() error {
    if c.ClusterName == "" {
        return fmt.Errorf("cluster name is required")
    }

    if c.BufferSize <= 0 {
        return fmt.Errorf("buffer size must be positive")
    }

    if c.Kafka == nil {
        return fmt.Errorf("kafka config is required")
    }

    if err := c.Kafka.Validate(); err != nil {
        return fmt.Errorf("kafka config has issues: %w", err)
    }

    for i, filter := range c.Filters {
        err := filter.Validate()
        if err != nil {
            return fmt.Errorf("filter at index %d has issues: %w", i, err)
        }
    }

    for i, selector := range c.Selectors {
        err := selector.Validate()
        if err != nil {
            return fmt.Errorf("selector at index %d has issues: %w", i, err)
        }
    }
    return nil
}

func Read(path string) (*Config, error) {
    data, err := os.ReadFile(path)
    if err != nil {
        return nil, err
    }

    var config Config
    err = yaml.Unmarshal(data, &config)
    if err != nil {
        return nil, err
    }
    return &config, nil
}
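
A minimal sketch of the intended startup flow for this package: read the file, apply defaults, then validate. The -config flag and the entrypoint itself are assumptions for illustration; the commit does not include the caller.

// Hypothetical entrypoint; illustrates the Read -> SetDefaults -> Validate flow.
package main

import (
    "flag"
    "log"

    "github.com/raczu/kube2kafka/internal/config"
)

func main() {
    path := flag.String("config", "configs/config-example.yml", "path to the configuration file")
    flag.Parse()

    cfg, err := config.Read(*path)
    if err != nil {
        log.Fatalf("reading config: %v", err)
    }

    cfg.SetDefaults()
    if err := cfg.Validate(); err != nil {
        log.Fatalf("invalid config: %v", err)
    }
    log.Printf("configured for cluster %q", cfg.ClusterName)
}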
118 changes: 118 additions & 0 deletions internal/config/kafka.go
@@ -0,0 +1,118 @@
package config

import (
    "crypto/tls"
    "fmt"
    "github.com/raczu/kube2kafka/pkg/exporter"
    "github.com/segmentio/kafka-go"
    "github.com/segmentio/kafka-go/sasl"
    "os"
)

type RawTLSData struct {
    CA         string `yaml:"cacert"`
    Cert       string `yaml:"cert"`
    Key        string `yaml:"key"`
    SkipVerify bool   `yaml:"skipVerify"`
}

func (r *RawTLSData) Build() (*tls.Config, error) {
    var data exporter.TLSData

    if r.CA != "" {
        ca, err := os.ReadFile(r.CA)
        if err != nil {
            return nil, fmt.Errorf("error reading ca file: %w", err)
        }
        data.CA = ca
    }

    if r.Cert != "" && r.Key != "" {
        cert, err := os.ReadFile(r.Cert)
        if err != nil {
            return nil, fmt.Errorf("error reading client cert file: %w", err)
        }
        data.Cert = cert

        key, err := os.ReadFile(r.Key)
        if err != nil {
            return nil, fmt.Errorf("error reading client key file: %w", err)
        }
        data.Key = key
    }

    data.SkipVerify = r.SkipVerify
    return data.Build()
}

type RawSASLData struct {
    Username  string `yaml:"username"`
    Password  string `yaml:"password"`
    Mechanism string `yaml:"mechanism" default:"plain"`
}

func (r *RawSASLData) Validate() error {
    if r.Username == "" {
        return fmt.Errorf("username is required")
    }

    if r.Password == "" {
        return fmt.Errorf("password is required")
    }
    return nil
}

func (r *RawSASLData) Build() (sasl.Mechanism, error) {
    factory, err := exporter.MapMechanismString(r.Mechanism)
    if err != nil {
        return nil, fmt.Errorf("failed to map mechanism string: %w", err)
    }
    return factory(r.Username, r.Password)
}

type KafkaConfig struct {
    Brokers        []string     `yaml:"brokers"`
    Topic          string       `yaml:"topic"`
    RawCompression string       `yaml:"compression" default:"none"`
    RawTLS         *RawTLSData  `yaml:"tls"`
    RawSASL        *RawSASLData `yaml:"sasl"`
}

func (c *KafkaConfig) Validate() error {
    if len(c.Brokers) == 0 {
        return fmt.Errorf("at least one broker is required")
    }

    if c.Topic == "" {
        return fmt.Errorf("topic is required")
    }

    if c.RawSASL != nil {
        if err := c.RawSASL.Validate(); err != nil {
            return fmt.Errorf("sasl config has issues: %w", err)
        }
    }
    return nil
}

func (c *KafkaConfig) TLS() (*tls.Config, error) {
    if c.RawTLS == nil {
        return nil, nil
    }
    return c.RawTLS.Build()
}

func (c *KafkaConfig) SASL() (sasl.Mechanism, error) {
    if c.RawSASL == nil {
        return nil, nil
    }
    return c.RawSASL.Build()
}

func (c *KafkaConfig) Compression() (kafka.Compression, error) {
    compression, err := exporter.MapCodecString(c.RawCompression)
    if err != nil {
        return kafka.Compression(0), fmt.Errorf("failed to map codec string: %w", err)
    }
    return compression, nil
}
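
For context, a hedged sketch of how these accessors could feed a kafka-go writer. The newWriter helper and this wiring are assumptions for illustration; the commit does not show how the exporter consumes KafkaConfig.

package config

import (
    "github.com/segmentio/kafka-go"
)

// newWriter is a hypothetical helper showing how TLS(), SASL() and
// Compression() map onto kafka-go's Writer and Transport.
func newWriter(cfg *KafkaConfig) (*kafka.Writer, error) {
    tlsConfig, err := cfg.TLS()
    if err != nil {
        return nil, err
    }
    mechanism, err := cfg.SASL()
    if err != nil {
        return nil, err
    }
    compression, err := cfg.Compression()
    if err != nil {
        return nil, err
    }

    return &kafka.Writer{
        Addr:        kafka.TCP(cfg.Brokers...),
        Topic:       cfg.Topic,
        Compression: compression,
        Transport: &kafka.Transport{
            TLS:  tlsConfig, // nil means plaintext
            SASL: mechanism, // nil means no authentication
        },
    }, nil
}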
41 changes: 41 additions & 0 deletions pkg/exporter/config.go
@@ -1,14 +1,55 @@
package exporter

import (
    "crypto/tls"
    "crypto/x509"
    "fmt"
    "github.com/segmentio/kafka-go"
    "github.com/segmentio/kafka-go/compress"
    "github.com/segmentio/kafka-go/sasl"
    "github.com/segmentio/kafka-go/sasl/plain"
    "github.com/segmentio/kafka-go/sasl/scram"
)

type TLSData struct {
    CA         []byte
    Cert       []byte
    Key        []byte
    SkipVerify bool
}

// Build creates a tls.Config from the TLSData.
func (t *TLSData) Build() (*tls.Config, error) {
    config := &tls.Config{
        MinVersion: tls.VersionTLS12,
    }

    if t.CA != nil {
        config.RootCAs = x509.NewCertPool()
        if !config.RootCAs.AppendCertsFromPEM(t.CA) {
            return nil, fmt.Errorf("failed to parse ca certificate")
        }
    }

    if t.Cert != nil && t.Key != nil {
        cert, err := tls.X509KeyPair(t.Cert, t.Key)
        if err != nil {
            return nil, fmt.Errorf(
                "failed to parse client certificate and its private key: %w",
                err,
            )
        }
        config.Certificates = []tls.Certificate{cert}
    }

    if t.SkipVerify {
        config.InsecureSkipVerify = true
    }
    return config, nil
}

var codec2compression = map[string]kafka.Compression{
    "none":   compress.None,
    "gzip":   kafka.Gzip,
    "snappy": kafka.Snappy,
    "lz4":    kafka.Lz4,
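
The diff is truncated here, before the zstd entry and the mapping helpers. Going by the test below, a plausible sketch of MapCodecString, assuming it is a straight lookup in codec2compression (the actual body is not shown in this diff):

// Plausible sketch only; assumes the package's existing imports (fmt, kafka).
func MapCodecString(codec string) (kafka.Compression, error) {
    compression, ok := codec2compression[codec]
    if !ok {
        return kafka.Compression(0), fmt.Errorf("unknown codec: %s", codec)
    }
    return compression, nil
}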
11 changes: 9 additions & 2 deletions pkg/exporter/config_test.go
@@ -5,14 +5,21 @@ import (
    . "github.com/onsi/gomega"
    "github.com/raczu/kube2kafka/pkg/exporter"
    "github.com/segmentio/kafka-go"
    "github.com/segmentio/kafka-go/compress"
    "github.com/segmentio/kafka-go/sasl/plain"
)

var _ = Describe("MapCodecString", func() {
    When("mapping a known codec string", func() {
        It("should return the corresponding kafka.Compression", func() {
            codecs := []string{"none", "gzip", "snappy", "lz4", "zstd"}
            expected := []kafka.Compression{
                compress.None,
                kafka.Gzip,
                kafka.Snappy,
                kafka.Lz4,
                kafka.Zstd,
            }

            for i, codec := range codecs {
                compression, err := exporter.MapCodecString(codec)
