Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): update graceful shutdown behaviour #198

Merged
merged 21 commits into from
Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
steps:
- uses: actions/checkout@v4

- uses: actions/setup-go@v4
- uses: actions/setup-go@v5
with:
go-version-file: 'go.mod'

Expand Down
1 change: 1 addition & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ import (

var (
ErrServerNotRunning = errors.New("server not running")
ErrServerShutdown = errors.New("server is shutting down")
)
80 changes: 53 additions & 27 deletions examples/graceful/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,58 +3,84 @@ package main
import (
"context"
"net/http"
"sync"
"syscall"
"time"

"github.com/foomo/keel/interfaces"
"github.com/foomo/keel/service"
"go.uber.org/zap"

"github.com/foomo/keel"
"github.com/foomo/keel/log"
)

func main() {
service.DefaultHTTPHealthzAddr = "localhost:9400"

l := zap.NewExample().Named("root")

l.Info("1. starting readiness checks")
go call(l.Named("readiness"), "http://localhost:9400/healthz/readiness")

svr := keel.NewServer(
keel.WithHTTPZapService(true),
keel.WithHTTPViperService(true),
keel.WithHTTPPrometheusService(true),
keel.WithLogger(l.Named("server")),
keel.WithHTTPHealthzService(true),
)

l := svr.Logger()

go waitGroup(svr.CancelContext(), l.With(log.FServiceName("waitGroup")))

// create demo service
svs := http.NewServeMux()
svs.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("OK"))
})

svr.AddService(
service.NewHTTP(l, "demo", "localhost:8080", svs),
service.NewHTTP(l, "http", "localhost:8080", svs),
)

svr.Run()
}

func waitGroup(ctx context.Context, l *zap.Logger) {
var wg sync.WaitGroup
svr.AddCloser(interfaces.CloserFunc(func(ctx context.Context) error {
l := l.Named("closer")
l.Info("closing stuff")
time.Sleep(3 * time.Second)
l.Info("done closing stuff")
return nil
}))

wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
l.Info("Break the loop")
return
case <-time.After(3 * time.Second):
l.Info("Hello in a loop")
}

l.Info("3. starting http checks")
go call(l.Named("http"), "http://localhost:8080")

l.Info("4. sleeping for 5 seconds")
time.Sleep(5 * time.Second)

l.Info("5. sending shutdown signal")
if err := syscall.Kill(syscall.Getpid(), syscall.SIGTERM); err != nil {
l.Fatal(err.Error())
}

}()

wg.Wait()
svr.Run()
l.Info("done")
}

func call(l *zap.Logger, url string) {
l = l.With(zap.String("url", url))
for {
func() {
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
l.With(zap.Error(err)).Error("failed to create request")
return
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
l.With(zap.Error(err)).Error("failed to send request")
return
}
l.Info("ok", zap.Int("status", resp.StatusCode))
}()
time.Sleep(time.Second)
}
}
6 changes: 3 additions & 3 deletions examples/healthz/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ import (
// See k8s for probe documentation
// https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#types-of-probe
func main() {
service.DefaultHTTPHealthzAddr = "localhost:9400"

// you can override the below config by settings env vars
_ = os.Setenv("SERVICE_HEALTHZ_ENABLED", "true")

svr := keel.NewServer(
keel.WithHTTPZapService(true),
keel.WithHTTPViperService(true),
// allows you to use probes for health checks in cluster:
// GET :9400/healthz
// GET :9400/healthz/readiness
Expand Down Expand Up @@ -65,7 +65,7 @@ func main() {
select {
case <-time.After(10 * time.Second):
l.Info("initialization done")
case <-svr.CancelContext().Done():
case <-svr.ShutdownContext().Done():
l.Info("initialization canceled")
}

Expand Down
9 changes: 6 additions & 3 deletions examples/persistence/mongo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ func main() {
l := svr.Logger()

cDateTime := &store.DateTimeCodec{}
rb := bson.NewRegistryBuilder()
rb.RegisterCodec(store.TDateTime, cDateTime)
rb := bson.NewRegistry()
rb.RegisterTypeEncoder(store.TDateTime, cDateTime)
rb.RegisterTypeDecoder(store.TDateTime, cDateTime)

// create persistor
persistor, err := keelmongo.New(
Expand All @@ -36,7 +37,9 @@ func main() {
// enable telemetry (enabled by default)
keelmongo.WithOtelEnabled(true),
keelmongo.WithClientOptions(
options.Client().SetRegistry(rb.Build()),
func(clientOptions *options.ClientOptions) {
clientOptions.SetRegistry(rb)
},
),
)
// use log must helper to exit on error
Expand Down
3 changes: 3 additions & 0 deletions examples/services/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ import (
"os"

"github.com/foomo/keel"
"github.com/foomo/keel/service"
)

func main() {
service.DefaultHTTPPrometheusAddr = "localhost:9200"

// you can override the below config by settings env vars
_ = os.Setenv("SERVICE_ZAP_ENABLED", "true")
_ = os.Setenv("SERVICE_VIPER_ENABLED", "true")
Expand Down
45 changes: 34 additions & 11 deletions examples/telemetry/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,27 @@ package main
import (
"math/rand"
"net/http"
"time"

"github.com/foomo/keel/service"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric/instrument"

"github.com/foomo/keel"
"github.com/foomo/keel/log"
"github.com/foomo/keel/net/http/middleware"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
)

var metricRequestLatency = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: "demo",
Name: "request_latency_seconds",
Help: "Request Latency",
Buckets: prometheus.ExponentialBuckets(.0001, 2, 50),
})

func main() {
// Run this example with the following env vars:
//
Expand Down Expand Up @@ -48,14 +57,14 @@ func main() {
svs := http.NewServeMux()

{ // counter
counter, err := meter.SyncInt64().Counter(
counter, err := meter.Int64Counter(
"a.counter",
instrument.WithDescription("Count things"),
metric.WithDescription("Count things"),
)
log.Must(l, err, "failed to create counter meter")

svs.HandleFunc("/count", func(w http.ResponseWriter, r *http.Request) {
counter.Add(r.Context(), 1, attribute.String("key", "value"))
counter.Add(r.Context(), 1, metric.WithAttributes(attribute.String("key", "value")))
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("OK!"))
})
Expand All @@ -70,33 +79,47 @@ func main() {
})

{ // up down
upDown, err := meter.SyncInt64().UpDownCounter(
upDown, err := meter.Int64UpDownCounter(
"a.updown",
instrument.WithDescription("Up down values"),
metric.WithDescription("Up down values"),
)
log.Must(l, err, "failed to create up down meter")

svs.HandleFunc("/up", func(w http.ResponseWriter, r *http.Request) {
upDown.Add(r.Context(), 1, attribute.String("key", "value"))
upDown.Add(r.Context(), 1, metric.WithAttributes(attribute.String("key", "value")))
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("OK!"))
})
svs.HandleFunc("/down", func(w http.ResponseWriter, r *http.Request) {
upDown.Add(r.Context(), -1, attribute.String("key", "value"))
upDown.Add(r.Context(), -1, metric.WithAttributes(attribute.String("key", "value")))
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("OK!"))
})
}

{ // histogram
histogram, err := meter.SyncInt64().Histogram(
histogram, err := meter.Int64Histogram(
"a.histogram",
instrument.WithDescription("Up down values"),
metric.WithDescription("Up down values"),
metric.WithUnit("ms"),
)
log.Must(l, err, "failed to create up down meter")

svs.HandleFunc("/histogram", func(w http.ResponseWriter, r *http.Request) {
histogram.Record(r.Context(), int64(rand.Int()), attribute.String("key", "value"))
start := time.Now()
time.Sleep(time.Second)
traceID := trace.SpanContextFromContext(r.Context())
histogram.Record(r.Context(), int64(rand.Int()),
metric.WithAttributes(
attribute.String("key", "value"),
attribute.String("traceID", traceID.TraceID().String()),
),
)

metricRequestLatency.(prometheus.ExemplarObserver).ObserveWithExemplar(
time.Since(start).Seconds(), prometheus.Labels{"traceID": traceID.TraceID().String()},
)

w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("OK!"))
})
Expand Down
Loading
Loading