From 03561ecfa27a5d118c79f1bc4ea57e4c7f2be965 Mon Sep 17 00:00:00 2001 From: Tosone Date: Tue, 30 Apr 2024 23:05:17 +0800 Subject: [PATCH] :sparkles: Reimplement lock with badger (#358) --- Makefile | 2 +- build/Dockerfile | 22 +- build/Dockerfile.builder | 2 +- build/Dockerfile.debian | 18 +- go.mod | 4 + go.sum | 12 + pkg/configs/configuration.go | 12 +- pkg/configs/default.go | 3 + pkg/modules/locker/database/database.go | 236 +++++++++++++++---- pkg/modules/locker/database/database_test.go | 50 ++-- 10 files changed, 276 insertions(+), 85 deletions(-) diff --git a/Makefile b/Makefile index 0d544c94..f22c046b 100644 --- a/Makefile +++ b/Makefile @@ -45,7 +45,7 @@ build: ## Build sigma and put the output binary in ./bin @CGO_ENABLED=1 GO111MODULE=on CC="$(CC)" CXX="$(CXX)" $(GOCMD) build $(GOFLAGS) -tags "timetzdata,exclude_graphdriver_devicemapper,exclude_graphdriver_btrfs,containers_image_openpgp" -o bin/$(BINARY_NAME) -v . build-builder: ## Build sigma-builder and put the output binary in ./bin - @CGO_ENABLED=1 GO111MODULE=on CC="$(CC)" CXX="$(CXX)" $(GOCMD) build $(GOFLAGS) -tags "timetzdata,exclude_graphdriver_devicemapper,exclude_graphdriver_btrfs,containers_image_openpgp" -o bin/$(BINARY_NAME)-builder -v ./cmd/builder + @CGO_ENABLED=0 GO111MODULE=on CC="$(CC)" CXX="$(CXX)" $(GOCMD) build $(GOFLAGS) -tags "timetzdata,exclude_graphdriver_devicemapper,exclude_graphdriver_btrfs,containers_image_openpgp" -o bin/$(BINARY_NAME)-builder -v ./cmd/builder clean: ## Remove build related file rm -fr ./bin diff --git a/build/Dockerfile b/build/Dockerfile index 98694b44..29139611 100644 --- a/build/Dockerfile +++ b/build/Dockerfile @@ -69,14 +69,14 @@ RUN set -eux && \ DISABLE_CGO=1 make bin/skopeo."${TARGETOS}"."${TARGETARCH}" && \ cp bin/skopeo."${TARGETOS}"."${TARGETARCH}" /tmp/skopeo -FROM --platform=$BUILDPLATFORM golang:${GOLANG_VERSION} as builder +FROM golang:${GOLANG_VERSION} as builder ARG USE_MIRROR=false RUN set -eux && \ if [ "$USE_MIRROR" = true ]; then sed -i "s/dl-cdn.alpinelinux.org/mirrors.aliyun.com/g" /etc/apk/repositories; fi && \ - apk add --no-cache make bash ncurses build-base git openssl && \ - apk add --no-cache zig --repository=https://mirrors.aliyun.com/alpine/edge/testing + apk add --no-cache make bash ncurses build-base git openssl linux-headers && \ + apk add --no-cache zig --repository=https://mirrors.aliyun.com/alpine/edge/community COPY . /go/src/github.com/go-sigma/sigma COPY --from=web-builder /web/dist /go/src/github.com/go-sigma/sigma/web/dist @@ -86,14 +86,14 @@ WORKDIR /go/src/github.com/go-sigma/sigma ARG TARGETOS TARGETARCH RUN --mount=type=cache,target=/go/pkg/mod --mount=type=cache,target=/root/.cache/go-build \ - case "${TARGETARCH}" in \ - amd64) export CC="zig cc -target x86_64-linux-musl" ;; \ - arm64) export CC="zig cc -target aarch64-linux-musl" ;; \ - esac; \ - case "${TARGETARCH}" in \ - amd64) export CXX="zig c++ -target x86_64-linux-musl" ;; \ - arm64) export CXX="zig c++ -target aarch64-linux-musl" ;; \ - esac; \ + # case "${TARGETARCH}" in \ + # amd64) export CC="zig cc -target x86_64-linux-musl" ;; \ + # arm64) export CC="zig cc -target aarch64-linux-musl" ;; \ + # esac; \ + # case "${TARGETARCH}" in \ + # amd64) export CXX="zig c++ -target x86_64-linux-musl" ;; \ + # arm64) export CXX="zig c++ -target aarch64-linux-musl" ;; \ + # esac; \ GOOS=$TARGETOS GOARCH=$TARGETARCH CC="${CC}" CXX="${CXX}" make build FROM alpine:${ALPINE_VERSION} diff --git a/build/Dockerfile.builder b/build/Dockerfile.builder index 3f801116..5b9bac1d 100644 --- a/build/Dockerfile.builder +++ b/build/Dockerfile.builder @@ -21,7 +21,7 @@ ARG USE_MIRROR=false RUN set -eux && \ if [ "$USE_MIRROR" = true ]; then sed -i "s/dl-cdn.alpinelinux.org/mirrors.aliyun.com/g" /etc/apk/repositories; fi && \ apk add --no-cache make bash ncurses build-base git openssl && \ - apk add --no-cache zig --repository=https://mirrors.aliyun.com/alpine/edge/testing + apk add --no-cache zig --repository=https://mirrors.aliyun.com/alpine/edge/community COPY . /go/src/github.com/go-sigma/sigma WORKDIR /go/src/github.com/go-sigma/sigma diff --git a/build/Dockerfile.debian b/build/Dockerfile.debian index b8ad6109..dad3c34c 100644 --- a/build/Dockerfile.debian +++ b/build/Dockerfile.debian @@ -74,7 +74,7 @@ RUN set -eux && \ DISABLE_CGO=1 make bin/skopeo."${TARGETOS}"."${TARGETARCH}" && \ cp bin/skopeo."${TARGETOS}"."${TARGETARCH}" /tmp/skopeo -FROM --platform=$BUILDPLATFORM golang:${GOLANG_VERSION} as builder +FROM golang:${GOLANG_VERSION} as builder ARG USE_MIRROR=false ARG ZIG_VERSION=0.11.0 @@ -108,14 +108,14 @@ WORKDIR /go/src/github.com/go-sigma/sigma ARG TARGETOS TARGETARCH RUN --mount=type=cache,target=/go/pkg/mod --mount=type=cache,target=/root/.cache/go-build \ - case "${TARGETARCH}" in \ - amd64) export CC="zig cc -target x86_64-linux-musl" ;; \ - arm64) export CC="zig cc -target aarch64-linux-musl" ;; \ - esac; \ - case "${TARGETARCH}" in \ - amd64) export CXX="zig c++ -target x86_64-linux-musl" ;; \ - arm64) export CXX="zig c++ -target aarch64-linux-musl" ;; \ - esac; \ + # case "${TARGETARCH}" in \ + # amd64) export CC="zig cc -target x86_64-linux-musl" ;; \ + # arm64) export CC="zig cc -target aarch64-linux-musl" ;; \ + # esac; \ + # case "${TARGETARCH}" in \ + # amd64) export CXX="zig c++ -target x86_64-linux-musl" ;; \ + # arm64) export CXX="zig c++ -target aarch64-linux-musl" ;; \ + # esac; \ GOOS=$TARGETOS GOARCH=$TARGETARCH CC="${CC}" CXX="${CXX}" make build FROM debian:${DEBIAN_VERSION} diff --git a/go.mod b/go.mod index 0f015ba7..2572d8f5 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( github.com/casbin/gorm-adapter/v3 v3.24.0 github.com/containers/podman/v5 v5.0.2 github.com/deckarep/golang-set/v2 v2.6.0 + github.com/dgraph-io/badger/v4 v4.2.0 github.com/distribution/distribution/v3 v3.0.0-alpha.1 github.com/distribution/reference v0.6.0 github.com/docker/cli v25.0.5+incompatible @@ -130,6 +131,7 @@ require ( github.com/cyphar/filepath-securejoin v0.2.4 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/davidmz/go-pageant v1.0.2 // indirect + github.com/dgraph-io/ristretto v0.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/digitorus/pkcs7 v0.0.0-20230818184609-3a137a874352 // indirect github.com/digitorus/timestamp v0.0.0-20231217203849-220c5c2851b7 // indirect @@ -175,10 +177,12 @@ require ( github.com/golang-jwt/jwt v3.2.2+incompatible // indirect github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect github.com/golang-sql/sqlexp v0.1.0 // indirect + github.com/golang/glog v1.2.0 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/certificate-transparency-go v1.1.8 // indirect + github.com/google/flatbuffers v2.0.8+incompatible // indirect github.com/google/gnostic-models v0.6.9-0.20230804172637-c7be7c783f49 // indirect github.com/google/go-cmp v0.6.0 // indirect github.com/google/go-containerregistry v0.19.1 // indirect diff --git a/go.sum b/go.sum index 6a99c73c..ad8ae078 100644 --- a/go.sum +++ b/go.sum @@ -402,8 +402,14 @@ github.com/davidmz/go-pageant v1.0.2 h1:bPblRCh5jGU+Uptpz6LgMZGD5hJoOt7otgT454Wv github.com/davidmz/go-pageant v1.0.2/go.mod h1:P2EDDnMqIwG5Rrp05dTRITj9z2zpGcD9efWSkTNKLIE= github.com/deckarep/golang-set/v2 v2.6.0 h1:XfcQbWM1LlMB8BsJ8N9vW5ehnnPVIw0je80NsVHagjM= github.com/deckarep/golang-set/v2 v2.6.0/go.mod h1:VAky9rY/yGXJOLEDv3OMci+7wtDpOF4IN+y82NBOac4= +github.com/dgraph-io/badger/v4 v4.2.0 h1:kJrlajbXXL9DFTNuhhu9yCx7JJa4qpYWxtE8BzuWsEs= +github.com/dgraph-io/badger/v4 v4.2.0/go.mod h1:qfCqhPoWDFJRx1gp5QwwyGo8xk1lbHUxvK9nK0OGAak= +github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8= +github.com/dgraph-io/ristretto v0.1.1/go.mod h1:S1GPSBCYCIhmVNfcth17y2zZtQT6wzkzgwUve0VDWWA= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1/go.mod h1:+hnT3ywWDTAFrW5aE+u2Sa/wT555ZqwoCS+pk3p6ry4= +github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA= +github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dhui/dktest v0.4.0 h1:z05UmuXZHO/bgj/ds2bGMBu8FI4WA+Ag/m3ghL+om7M= @@ -440,6 +446,7 @@ github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDD github.com/dsnet/compress v0.0.2-0.20210315054119-f66993602bf5 h1:iFaUwBSo5Svw6L7HYpRu/0lE3e0BaElwnNO1qkNQxBY= github.com/dsnet/compress v0.0.2-0.20210315054119-f66993602bf5/go.mod h1:qssHWj60/X5sZFNxpG4HBPDHVqxNm4DfnCKgrbZOT+s= github.com/dsnet/golib v0.0.0-20171103203638-1ea166775780/go.mod h1:Lj+Z9rebOhdfkVLjJ8T6VcRQv3SXugXy999NBtR9aFY= +github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/eapache/go-resiliency v1.6.0 h1:CqGDTLtpwuWKn6Nj3uNUdflaq+/kIPsg0gfNzHton30= @@ -605,6 +612,8 @@ github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9/go.mod h1:8vg3r2V github.com/golang-sql/sqlexp v0.1.0 h1:ZCD6MBpcuOVfGVqsEmY5/4FtYiKz6tSyUv9LPEDei6A= github.com/golang-sql/sqlexp v0.1.0/go.mod h1:J4ad9Vo8ZCWQ2GMrC4UCQy1JpCbwU9m3EOqtpKwwwHI= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/glog v1.2.0 h1:uCdmnmatrKCgMBlM4rMuJZWOkPDqdbZPnrMXDY4gI68= +github.com/golang/glog v1.2.0/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -647,6 +656,8 @@ github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Z github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/certificate-transparency-go v1.1.8 h1:LGYKkgZF7satzgTak9R4yzfJXEeYVAjV6/EAEJOf1to= github.com/google/certificate-transparency-go v1.1.8/go.mod h1:bV/o8r0TBKRf1X//iiiSgWrvII4d7/8OiA+3vG26gI8= +github.com/google/flatbuffers v2.0.8+incompatible h1:ivUb1cGomAB101ZM1T0nOiWz9pSrTMoa9+EiY7igmkM= +github.com/google/flatbuffers v2.0.8+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/gnostic-models v0.6.9-0.20230804172637-c7be7c783f49 h1:0VpGH+cDhbDtdcweoyCVsF3fhN8kejK6rFe/2FFX2nU= github.com/google/gnostic-models v0.6.9-0.20230804172637-c7be7c783f49/go.mod h1:BkkQ4L1KS1xMt2aWSPStnn55ChGC0DPOn2FQYj+f25M= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= @@ -1704,6 +1715,7 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220817070843-5a390386f1f2/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/pkg/configs/configuration.go b/pkg/configs/configuration.go index afbabc28..1c7c54de 100644 --- a/pkg/configs/configuration.go +++ b/pkg/configs/configuration.go @@ -148,9 +148,19 @@ type ConfigurationWorkQueue struct { Inmemory ConfigurationWorkQueueInmemmory `yaml:"inmemory"` } +// ConfigurationLockerDatabase ... +type ConfigurationLockerDatabase struct { + Path string `yaml:"path"` +} + +// ConfigurationLockerRedis ... +type ConfigurationLockerRedis struct{} + // ConfigurationLocker ... type ConfigurationLocker struct { - Type enums.LockerType `yaml:"type"` + Type enums.LockerType `yaml:"type"` + Database ConfigurationLockerDatabase `yaml:"database"` + Redis ConfigurationLockerRedis `yaml:"redis"` } // ConfigurationNamespace ... diff --git a/pkg/configs/default.go b/pkg/configs/default.go index dd92fdde..03adb5e4 100644 --- a/pkg/configs/default.go +++ b/pkg/configs/default.go @@ -63,4 +63,7 @@ func defaultSettings() { if configuration.Cache.Ttl == 0 { configuration.Cache.Ttl = time.Second * 30 } + if configuration.Locker.Type == enums.LockerTypeDatabase && configuration.Locker.Database.Path == "" { + configuration.Locker.Database.Path = "/var/lib/sigma/badger" + } } diff --git a/pkg/modules/locker/database/database.go b/pkg/modules/locker/database/database.go index 67896bb1..c5630e92 100644 --- a/pkg/modules/locker/database/database.go +++ b/pkg/modules/locker/database/database.go @@ -16,34 +16,77 @@ package database import ( "context" - "errors" + "encoding/json" "fmt" - "math/rand" // nolint: gosec + "os" + "strings" + "sync" "time" + badger "github.com/dgraph-io/badger/v4" + "github.com/google/uuid" "github.com/rs/zerolog/log" - "gorm.io/gorm" "github.com/go-sigma/sigma/pkg/configs" - "github.com/go-sigma/sigma/pkg/dal/dao" - "github.com/go-sigma/sigma/pkg/dal/query" + "github.com/go-sigma/sigma/pkg/dal/models" "github.com/go-sigma/sigma/pkg/modules/locker/definition" + "github.com/go-sigma/sigma/pkg/utils" ) +type logger struct{} + +// Errorf is the error log +func (l logger) Errorf(msg string, opts ...interface{}) { + log.Error().Msg(strings.TrimSpace(fmt.Sprintf(msg, opts...))) +} + +// Warningf is the warning log +func (l logger) Warningf(msg string, opts ...interface{}) { + log.Warn().Msg(strings.TrimSpace(fmt.Sprintf(msg, opts...))) +} + +// Infof is the info log +func (l logger) Infof(msg string, opts ...interface{}) { + log.Info().Msg(strings.TrimSpace(fmt.Sprintf(msg, opts...))) +} + +// Debugf is the debug log +func (l logger) Debugf(msg string, opts ...interface{}) { + log.Debug().Msg(strings.TrimSpace(fmt.Sprintf(msg, opts...))) +} + type lockerDatabase struct { - lockerServiceFactory dao.LockerServiceFactory + db *badger.DB } +var initOnce sync.Once + +var db *badger.DB + func New(config configs.Configuration) (definition.Locker, error) { + initOnce.Do(func() { + var err error + dir := config.Locker.Database.Path + if dir == "" { + dir, err = os.MkdirTemp("", "locker") + if err != nil { + panic("make temp dir for badger failed") + } + } + db, err = badger.Open(badger.DefaultOptions(dir).WithLogger(&logger{})) + if err != nil { + panic(fmt.Errorf("open badger database failed: %v", err)) + } + }) return &lockerDatabase{ - lockerServiceFactory: dao.NewLockerServiceFactory(), + db: db, }, nil } type lock struct { - key, value string - expire time.Duration - lockerServiceFactory dao.LockerServiceFactory + db *badger.DB + key, value string + expire time.Duration } // Lock ... @@ -53,30 +96,64 @@ func (l lockerDatabase) Acquire(ctx context.Context, key string, expire, waitTim } ddlCtx, cancel := context.WithTimeout(ctx, waitTimeout) defer cancel() - ticker := time.NewTicker(time.Duration(500) * time.Millisecond) + ticker := time.NewTicker(time.Duration(100) * time.Millisecond) defer func() { ticker.Stop() }() + var err error for { select { case <-ddlCtx.Done(): + if err != nil { + log.Error().Err(err).Msg("Acquire lock failed") + } return nil, ddlCtx.Err() case <-ticker.C: } - val := fmt.Sprintf("%d-%d", rand.Int(), time.Now().Nanosecond()) // nolint: gosec - err := query.Q.Transaction(func(tx *query.Query) error { - lockerService := l.lockerServiceFactory.New(tx) - return lockerService.Create(ctx, key, val, time.Now().Add(expire).UnixMilli()) - }) + value := fmt.Sprintf("%s-%d", uuid.NewString(), time.Now().Nanosecond()) // nolint: gosec + txn := l.db.NewTransaction(true) + var res *badger.Item + res, err = txn.Get([]byte(key)) + if err == badger.ErrKeyNotFound { + err = txn.Set([]byte(key), utils.MustMarshal(models.Locker{Key: key, Value: value, + Expire: time.Now().Add(expire).UnixMilli()})) + if err != nil { + continue + } + } else { + var val []byte + val, err = res.ValueCopy(nil) + if err != nil { + continue + } + var v models.Locker + err = json.Unmarshal(val, &v) + if err != nil { + continue + } + if v.Expire > time.Now().UnixMilli() { + continue + } else { + err = txn.Delete([]byte(key)) + if err != nil { + continue + } + err = txn.Set([]byte(key), utils.MustMarshal(models.Locker{Key: key, Value: value, + Expire: time.Now().Add(expire).UnixMilli()})) + if err != nil { + continue + } + } + } + err = txn.Commit() if err != nil { - log.Error().Err(err).Msg("Create locker failed, wait for retry") continue } return &lock{ - key: key, - value: val, - expire: expire, - lockerServiceFactory: l.lockerServiceFactory, + db: l.db, + key: key, + value: value, + expire: expire, }, nil } } @@ -89,7 +166,7 @@ func (l lockerDatabase) AcquireWithRenew(ctx context.Context, key string, expire } go func() { - ticker := time.NewTicker(time.Duration(500) * time.Millisecond) + ticker := time.NewTicker(time.Duration(100) * time.Millisecond) defer func() { ticker.Stop() }() @@ -122,31 +199,110 @@ func (l lock) Renew(ctx context.Context, ttls ...time.Duration) error { if expire < definition.MinLockExpire { return definition.ErrLockTooShort } - err := query.Q.Transaction(func(tx *query.Query) error { - lockerService := l.lockerServiceFactory.New(tx) - return lockerService.Renew(ctx, l.key, l.value, time.Now().Add(expire).UnixMilli()) - }) - if err != nil { - if errors.Is(err, gorm.ErrRecordNotFound) || - errors.Is(err, definition.ErrLockAlreadyExpired) { - log.Error().Err(err).Msg("Locker already expired") - return definition.ErrLockAlreadyExpired + + ddlCtx, cancel := context.WithTimeout(ctx, time.Second*5) + defer cancel() + ticker := time.NewTicker(time.Duration(100) * time.Millisecond) + defer func() { + ticker.Stop() + }() + + var err error + for { + select { + case <-ddlCtx.Done(): + if err != nil { + return err + } + return ddlCtx.Err() + case <-ticker.C: } - if errors.Is(err, definition.ErrLockNotHeld) { - log.Error().Err(err).Msg("Locker not held") + txn := l.db.NewTransaction(true) + var val []byte + val, err = getByKey(txn, l.key) + if err != nil { + continue + } + var v models.Locker + err = json.Unmarshal(val, &v) + if err != nil { + continue + } + if v.Value != l.value { return definition.ErrLockNotHeld } - log.Error().Err(err).Msg("Renew locker failed") - return fmt.Errorf("Renew locker failed") + if v.Expire < time.Now().UnixMilli() { + return definition.ErrLockAlreadyExpired + } + err = txn.Set([]byte(l.key), utils.MustMarshal(models.Locker{Key: l.key, Value: l.value, + Expire: time.Now().Add(expire).UnixMilli()})) + if err != nil { + continue + } + break } + return nil } // Unlock ... func (l *lock) Unlock(ctx context.Context) error { - err := query.Q.Transaction(func(tx *query.Query) error { - lockerService := l.lockerServiceFactory.New(tx) - return lockerService.Delete(ctx, l.key, l.value) - }) - return err + ddlCtx, cancel := context.WithTimeout(ctx, time.Second*5) + defer cancel() + ticker := time.NewTicker(time.Duration(100) * time.Millisecond) + defer func() { + ticker.Stop() + }() + + var err error + for { + select { + case <-ddlCtx.Done(): + if err != nil { + return err + } + return ddlCtx.Err() + case <-ticker.C: + } + txn := l.db.NewTransaction(true) + var val []byte + val, err = getByKey(txn, l.key) + if err != nil { + continue + } + var v models.Locker + err = json.Unmarshal(val, &v) + if err != nil { + continue + } + if v.Value != l.value { + return definition.ErrLockNotHeld + } + err = txn.Delete([]byte(l.key)) + if err != nil { + continue + } + err = txn.Commit() + if err != nil { + return err + } + break + } + + return nil +} + +func getByKey(txn *badger.Txn, key string) ([]byte, error) { + if txn == nil { + return nil, fmt.Errorf("txn is nil") + } + item, err := txn.Get([]byte(key)) + if err != nil { + return nil, err + } + val, err := item.ValueCopy(nil) + if err != nil { + return nil, err + } + return val, nil } diff --git a/pkg/modules/locker/database/database_test.go b/pkg/modules/locker/database/database_test.go index 5fd6d6a3..3308eceb 100644 --- a/pkg/modules/locker/database/database_test.go +++ b/pkg/modules/locker/database/database_test.go @@ -17,6 +17,8 @@ package database_test import ( "context" "errors" + "fmt" + "os" "sync" "sync/atomic" "testing" @@ -25,22 +27,22 @@ import ( "github.com/stretchr/testify/assert" "github.com/go-sigma/sigma/pkg/configs" - "github.com/go-sigma/sigma/pkg/dal" + "github.com/go-sigma/sigma/pkg/logger" "github.com/go-sigma/sigma/pkg/modules/locker/database" - "github.com/go-sigma/sigma/pkg/tests" ) func TestDatabaseAcquire(t *testing.T) { - assert.NoError(t, tests.Initialize(t)) - assert.NoError(t, tests.DB.Init()) - defer func() { - conn, err := dal.DB.DB() - assert.NoError(t, err) - assert.NoError(t, conn.Close()) - assert.NoError(t, tests.DB.DeInit()) - }() - - config := configs.Configuration{} + logger.SetLevel("debug") + + p, _ := os.MkdirTemp("", "badger") + config := configs.Configuration{ + Locker: configs.ConfigurationLocker{ + Database: configs.ConfigurationLockerDatabase{ + Path: p, + }, + }, + } + defer os.RemoveAll(p) // nolint: errcheck ctx, cancel := context.WithCancel(context.TODO()) defer cancel() @@ -59,6 +61,9 @@ func TestDatabaseAcquire(t *testing.T) { defer wg.Done() l, err := c.Acquire(ctx, key, time.Second*1, time.Second*3) assert.Equal(t, true, err == nil || errors.Is(err, context.DeadlineExceeded)) + if !(err == nil || errors.Is(err, context.DeadlineExceeded)) { + fmt.Println(err) + } if l != nil { <-time.After(time.Millisecond * 100) defer l.Unlock(ctx) // nolint: errcheck @@ -71,16 +76,17 @@ func TestDatabaseAcquire(t *testing.T) { } func TestDatabaseAcquireWithRenew(t *testing.T) { - assert.NoError(t, tests.Initialize(t)) - assert.NoError(t, tests.DB.Init()) - defer func() { - conn, err := dal.DB.DB() - assert.NoError(t, err) - assert.NoError(t, conn.Close()) - assert.NoError(t, tests.DB.DeInit()) - }() - - config := configs.Configuration{} + logger.SetLevel("debug") + + p, _ := os.MkdirTemp("", "badger") + config := configs.Configuration{ + Locker: configs.ConfigurationLocker{ + Database: configs.ConfigurationLockerDatabase{ + Path: p, + }, + }, + } + defer os.RemoveAll(p) // nolint: errcheck ctx, cancel := context.WithCancel(context.TODO()) defer cancel()