Skip to content

Commit

Permalink
fix: unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
franklinkim committed Mar 15, 2024
1 parent 6e5b02f commit 44f4655
Show file tree
Hide file tree
Showing 9 changed files with 103 additions and 78 deletions.
2 changes: 1 addition & 1 deletion examples/healthz/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func main() {
select {
case <-time.After(10 * time.Second):
l.Info("initialization done")
case <-svr.CancelContext().Done():
case <-svr.ShutdownContext().Done():
l.Info("initialization canceled")
}

Expand Down
3 changes: 3 additions & 0 deletions examples/services/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ import (
"os"

"github.com/foomo/keel"
"github.com/foomo/keel/service"
)

func main() {
service.DefaultHTTPPrometheusAddr = "localhost:9200"

// you can override the below config by settings env vars
_ = os.Setenv("SERVICE_ZAP_ENABLED", "true")
_ = os.Setenv("SERVICE_VIPER_ENABLED", "true")
Expand Down
100 changes: 55 additions & 45 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,10 @@ type Server struct {
syncProbes map[healthz.Type][]interface{}
syncProbesLock sync.RWMutex
ctx context.Context
ctxCancel context.Context
ctxCancelFn context.CancelFunc
cancelCtx context.Context
cancelFunc context.CancelFunc
shutdownCtx context.Context
shutdownFunc context.CancelFunc
g *errgroup.Group
gCtx context.Context
l *zap.Logger
Expand All @@ -69,7 +71,7 @@ func NewServer(opts ...Option) *Server {
inst := &Server{
gracefulTimeout: 10 * 3 * time.Second,
shutdownTimeout: 30 * time.Second,
shutdownSignals: []os.Signal{syscall.SIGTERM},
shutdownSignals: []os.Signal{syscall.SIGINT, syscall.SIGTERM},
syncReadmers: []interfaces.Readmer{},
syncProbes: map[healthz.Type][]interface{}{},
ctx: context.Background(),
Expand All @@ -89,21 +91,24 @@ func NewServer(opts ...Option) *Server {
return nil
}))

inst.ctxCancel, inst.ctxCancelFn = signal.NotifyContext(inst.ctx, inst.shutdownSignals...)
inst.g, inst.gCtx = errgroup.WithContext(inst.ctxCancel)
inst.cancelCtx, inst.cancelFunc = context.WithCancel(inst.ctx)
inst.g, inst.gCtx = errgroup.WithContext(inst.cancelCtx)
inst.shutdownCtx, inst.shutdownFunc = signal.NotifyContext(inst.ctx, inst.shutdownSignals...)

// gracefully shutdown
inst.g.Go(func() error {
<-inst.gCtx.Done()
defer inst.ctxCancelFn()
<-inst.shutdownCtx.Done()
inst.l.Info("keel graceful shutdown")

timeoutCtx, timeoutCancel := context.WithTimeout(inst.ctxCancel, inst.shutdownTimeout)
timeoutCtx, timeoutCancel := context.WithTimeout(inst.ctx, inst.shutdownTimeout)
defer timeoutCancel()

inst.shutdown.Store(true)

inst.l.Info("keel pausing graceful shutdown", log.FDuration(inst.gracefulTimeout))
inst.l.Info("keel graceful shutdown timeout",
zap.Duration("graceful_timeout", inst.gracefulTimeout),
zap.Duration("shutdown_timeout", inst.shutdownTimeout),
)
{
timer := time.NewTimer(inst.gracefulTimeout)
select {
Expand All @@ -112,67 +117,57 @@ func NewServer(opts ...Option) *Server {
case <-timer.C:
}
}
inst.l.Info("keel resuming graceful shutdown")
inst.l.Info("keel graceful shutdown timeout complete")

// append internal closers
closers := append(inst.closers(), inst.traceProvider, inst.meterProvider)

inst.l.Debug("keel iterating closers")
inst.l.Info("keel graceful shutdown closers")
for _, closer := range closers {
var err error
l := inst.l.With(log.FName(fmt.Sprintf("%T", closer)))
switch c := closer.(type) {
case interfaces.Closer:
c.Close()
case interfaces.ErrorCloser:
if err := c.Close(); err != nil {
log.WithError(l, err).Error("failed to gracefully stop ErrorCloser")
}
err = c.Close()
case interfaces.CloserWithContext:
c.Close(timeoutCtx)
case interfaces.ErrorCloserWithContext:
if err := c.Close(timeoutCtx); err != nil {
log.WithError(l, err).Error("failed to gracefully stop ErrorCloserWithContext")
}
err = c.Close(timeoutCtx)
case interfaces.Shutdowner:
c.Shutdown()
case interfaces.ErrorShutdowner:
if err := c.Shutdown(); err != nil {
log.WithError(l, err).Error("failed to gracefully stop ErrorShutdowner")
}
err = c.Shutdown()
case interfaces.ShutdownerWithContext:
c.Shutdown(timeoutCtx)
case interfaces.ErrorShutdownerWithContext:
if err := c.Shutdown(timeoutCtx); err != nil {
log.WithError(l, err).Error("failed to gracefully stop ErrorShutdownerWithContext")
}
err = c.Shutdown(timeoutCtx)
case interfaces.Stopper:
c.Stop()
case interfaces.ErrorStopper:
if err := c.Stop(); err != nil {
log.WithError(l, err).Error("failed to gracefully stop ErrorStopper")
}
err = c.Stop()
case interfaces.StopperWithContext:
c.Stop(timeoutCtx)
case interfaces.ErrorStopperWithContext:
if err := c.Stop(timeoutCtx); err != nil {
log.WithError(l, err).Error("failed to gracefully stop ErrorStopperWithContext")
}
err = c.Stop(timeoutCtx)
case interfaces.Unsubscriber:
c.Unsubscribe()
case interfaces.ErrorUnsubscriber:
if err := c.Unsubscribe(); err != nil {
log.WithError(l, err).Error("failed to gracefully stop ErrorUnsubscriber")
}
err = c.Unsubscribe()
case interfaces.UnsubscriberWithContext:
c.Unsubscribe(timeoutCtx)
case interfaces.ErrorUnsubscriberWithContext:
if err := c.Unsubscribe(timeoutCtx); err != nil {
log.WithError(l, err).Error("failed to gracefully stop ErrorUnsubscriberWithContext")
}
err = c.Unsubscribe(timeoutCtx)
}
if err != nil {
log.WithError(l, err).Warn("keel graceful shutdown closer failed")
} else {
l.Debug("keel graceful shutdown closer closed")
}
}

inst.l.Debug("keel done closing")
inst.l.Info("keel graceful shutdown complete")

return nil
})
Expand Down Expand Up @@ -246,7 +241,22 @@ func (s *Server) Context() context.Context {

// CancelContext returns server's cancel context
func (s *Server) CancelContext() context.Context {
return s.ctxCancel
return s.cancelCtx
}

// CancelFunc returns server's cancel function
func (s *Server) CancelFunc() context.CancelFunc {
return s.cancelFunc
}

// ShutdownContext returns server's shutdown cancel context
func (s *Server) ShutdownContext() context.Context {
return s.shutdownCtx
}

// ShutdownCancel returns server's shutdown cancel function
func (s *Server) ShutdownCancel() context.CancelFunc {
return s.shutdownFunc
}

// AddService add a single service
Expand Down Expand Up @@ -335,9 +345,9 @@ func (s *Server) AddReadinessHealthzers(probes ...interface{}) {
}

// IsCanceled returns true if the internal errgroup has been canceled
// func (s *Server) IsCanceled() bool {
// return errors.Is(s.gCtx.Err(), context.Canceled)
// }
func (s *Server) IsCanceled() bool {
return s.cancelCtx.Err() != nil
}

// Healthz returns true if the server is running
func (s *Server) Healthz() error {
Expand All @@ -349,12 +359,12 @@ func (s *Server) Healthz() error {

// Run runs the server
func (s *Server) Run() {
// if s.IsCanceled() {
// s.l.Info("keel server canceled")
// return
// }
if s.IsCanceled() {
s.l.Info("keel server canceled")
return
}

// defer s.ctxCancelFn()
defer s.cancelFunc()
s.l.Info("starting keel server")

// start services
Expand Down
15 changes: 10 additions & 5 deletions server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,12 @@ func (s *KeelTestSuite) BeforeTest(suiteName, testName string) {
})

ctx, cancel := context.WithCancel(context.Background())
s.svr = keel.NewServer(keel.WithContext(ctx), keel.WithLogger(s.l))
s.svr = keel.NewServer(
keel.WithContext(ctx),
keel.WithLogger(s.l),
keel.WithGracefulTimeout(400*time.Millisecond),
keel.WithShutdownTimeout(800*time.Millisecond),
)
s.cancel = cancel
}

Expand All @@ -75,7 +80,7 @@ func (s *KeelTestSuite) TearDownSuite() {}

func (s *KeelTestSuite) TestServiceHTTP() {
s.svr.AddServices(
service.NewHTTP(s.l, "test", ":55000", s.mux),
service.NewHTTP(s.l, "test", "localhost:55000", s.mux),
)

s.runServer()
Expand All @@ -87,8 +92,8 @@ func (s *KeelTestSuite) TestServiceHTTP() {

func (s *KeelTestSuite) TestServiceHTTPZap() {
s.svr.AddServices(
service.NewHTTPZap(s.l, "zap", ":9100", "/log"),
service.NewHTTP(s.l, "test", ":55000", s.mux),
service.NewHTTPZap(s.l, "zap", "localhost:9100", "/log"),
service.NewHTTP(s.l, "test", "localhost:55000", s.mux),
)

s.runServer()
Expand Down Expand Up @@ -142,7 +147,7 @@ func (s *KeelTestSuite) TestServiceHTTPZap() {

func (s *KeelTestSuite) TestGraceful() {
s.svr.AddServices(
service.NewHTTP(s.l, "test", ":55000", s.mux),
service.NewHTTP(s.l, "test", "localhost:55000", s.mux),
)

s.runServer()
Expand Down
14 changes: 12 additions & 2 deletions service/goroutine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ func ExampleNewGoRoutine() {

svr := keel.NewServer(
keel.WithLogger(zap.NewExample()),
keel.WithGracefulTimeout(time.Second),
keel.WithShutdownTimeout(3*time.Second),
)

svr.AddService(
Expand All @@ -30,7 +32,7 @@ func ExampleNewGoRoutine() {
}

l.Info("ping")
time.Sleep(time.Second)
time.Sleep(700 * time.Millisecond)
once.Do(shutdown)
}
}),
Expand All @@ -43,8 +45,16 @@ func ExampleNewGoRoutine() {
// {"level":"info","msg":"starting keel service","keel_service_type":"goroutine","keel_service_name":"demo"}
// {"level":"info","msg":"ping","keel_service_type":"goroutine","keel_service_name":"demo","keel_service_inst":0}
// {"level":"info","msg":"ping","keel_service_type":"goroutine","keel_service_name":"demo","keel_service_inst":0}
// {"level":"debug","msg":"keel graceful shutdown"}
// {"level":"info","msg":"keel graceful shutdown"}
// {"level":"info","msg":"keel graceful shutdown timeout","graceful_timeout":"1s","shutdown_timeout":"3s"}
// {"level":"info","msg":"ping","keel_service_type":"goroutine","keel_service_name":"demo","keel_service_inst":0}
// {"level":"info","msg":"keel graceful shutdown timeout complete"}
// {"level":"info","msg":"keel graceful shutdown closers"}
// {"level":"info","msg":"stopping keel service","keel_service_type":"goroutine","keel_service_name":"demo"}
// {"level":"info","msg":"context has been canceled du to graceful shutdow","keel_service_type":"goroutine","keel_service_name":"demo","keel_service_inst":0}
// {"level":"debug","msg":"keel graceful shutdown closer closed","name":"*service.GoRoutine"}
// {"level":"debug","msg":"keel graceful shutdown closer closed","name":"trace.noopTracerProvider"}
// {"level":"debug","msg":"keel graceful shutdown closer closed","name":"nonrecording.noopMeterProvider"}
// {"level":"info","msg":"keel graceful shutdown complete"}
// {"level":"info","msg":"keel server stopped"}
}
8 changes: 0 additions & 8 deletions service/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,6 @@ import (
"time"
)

// shutdown example after the given time
func shutdownAfter(duration time.Duration) {
go func() {
time.Sleep(duration)
shutdown()
}()
}

func shutdown() {
if err := syscall.Kill(syscall.Getpid(), syscall.SIGINT); err != nil {
panic(err)
Expand Down
12 changes: 11 additions & 1 deletion service/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package service_test

import (
"net/http"
"time"

"github.com/foomo/keel"
"github.com/foomo/keel/service"
Expand All @@ -11,6 +12,8 @@ import (
func ExampleNewHTTP() {
svr := keel.NewServer(
keel.WithLogger(zap.NewExample()),
keel.WithGracefulTimeout(time.Second),
keel.WithShutdownTimeout(3*time.Second),
)

l := svr.Logger()
Expand All @@ -34,7 +37,14 @@ func ExampleNewHTTP() {
// {"level":"info","msg":"starting keel server"}
// {"level":"info","msg":"starting keel service","keel_service_type":"http","keel_service_name":"demo","net_host_ip":"localhost","net_host_port":"8080"}
// {"level":"info","msg":"OK"}
// {"level":"debug","msg":"keel graceful shutdown"}
// {"level":"info","msg":"keel graceful shutdown"}
// {"level":"info","msg":"keel graceful shutdown timeout","graceful_timeout":"1s","shutdown_timeout":"3s"}
// {"level":"info","msg":"keel graceful shutdown timeout complete"}
// {"level":"info","msg":"keel graceful shutdown closers"}
// {"level":"info","msg":"stopping keel service","keel_service_type":"http","keel_service_name":"demo"}
// {"level":"warn","msg":"keel graceful shutdown closer failed","name":"*service.HTTP","error_type":"*errors.withStack","error_message":"failed to stop service: context deadline exceeded","error_messageVerbose":"context deadline exceeded\nfailed to stop service\ngithub.com/foomo/keel/service.(*HTTP).Close\n\t/Users/franklin/Workingcopies/github.com/foomo/keel/service/http.go:88\ngithub.com/foomo/keel.NewServer.func2\n\t/Users/franklin/Workingcopies/github.com/foomo/keel/server.go:137\ngolang.org/x/sync/errgroup.(*Group).Go.func1\n\t/Users/franklin/Workspaces/go/pkg/mod/golang.org/x/[email protected]/errgroup/errgroup.go:75\nruntime.goexit\n\t/opt/homebrew/opt/go/libexec/src/runtime/asm_arm64.s:1222"}
// {"level":"debug","msg":"keel graceful shutdown closer closed","name":"trace.noopTracerProvider"}
// {"level":"debug","msg":"keel graceful shutdown closer closed","name":"nonrecording.noopMeterProvider"}
// {"level":"info","msg":"keel graceful shutdown complete"}
// {"level":"info","msg":"keel server stopped"}
}
4 changes: 1 addition & 3 deletions service/httphealthz.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@ func NewHealthz(l *zap.Logger, name, addr, path string, probes map[healthz.Type]

unavailable := func(l *zap.Logger, w http.ResponseWriter, r *http.Request, err error) {
if err != nil {
l = log.WithError(l, err)
l = log.WithHTTPRequest(l, r)
l.Debug("healthz probe failed")
log.WithError(l, err).With(log.FHTTPTarget(r.RequestURI)).Debug("healthz probe failed")
http.Error(w, http.StatusText(http.StatusServiceUnavailable), http.StatusServiceUnavailable)
}
}
Expand Down
Loading

0 comments on commit 44f4655

Please sign in to comment.