diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 685c133..db0527e 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -29,17 +29,6 @@ jobs: go-build: ~/.cache/go-build name: ${{ matrix.os }} @ Go ${{ matrix.go }} runs-on: ${{ matrix.os }} - env: - GO111MODULE: on - GOPROXY: https://proxy.golang.org - - # Service containers to run with `container-job` - services: - nsqd: - image: appleboy/nsqd - ports: - - 4150:4150 - - 4151:4151 steps: - name: Set up Go ${{ matrix.go }} diff --git a/nsq_test.go b/nsq_test.go index 56e7b2e..f9664da 100644 --- a/nsq_test.go +++ b/nsq_test.go @@ -20,8 +20,6 @@ import ( "go.uber.org/goleak" ) -var host = "127.0.0.1" - func TestMain(m *testing.M) { goleak.VerifyTestMain(m) } @@ -86,8 +84,11 @@ func TestNSQDefaultFlow(t *testing.T) { } func TestNSQShutdown(t *testing.T) { + ctx := context.Background() + natsC, endpoint := setupNSQContainer(ctx, t) + defer testcontainers.CleanupContainer(t, natsC) w := NewWorker( - WithAddr(host+":4150"), + WithAddr(endpoint), WithTopic("test2"), ) q, err := queue.NewQueue( @@ -105,11 +106,14 @@ func TestNSQShutdown(t *testing.T) { } func TestNSQCustomFuncAndWait(t *testing.T) { + ctx := context.Background() + natsC, endpoint := setupNSQContainer(ctx, t) + defer testcontainers.CleanupContainer(t, natsC) m := &mockMessage{ Message: "foo", } w := NewWorker( - WithAddr(host+":4150"), + WithAddr(endpoint), WithTopic("test3"), WithMaxInFlight(10), WithRunFunc(func(ctx context.Context, m core.TaskMessage) error { @@ -134,11 +138,14 @@ func TestNSQCustomFuncAndWait(t *testing.T) { } func TestEnqueueJobAfterShutdown(t *testing.T) { + ctx := context.Background() + natsC, endpoint := setupNSQContainer(ctx, t) + defer testcontainers.CleanupContainer(t, natsC) m := mockMessage{ Message: "foo", } w := NewWorker( - WithAddr(host + ":4150"), + WithAddr(endpoint), ) q, err := queue.NewQueue( queue.WithWorker(w), @@ -156,11 +163,14 @@ func TestEnqueueJobAfterShutdown(t *testing.T) { } func TestJobReachTimeout(t *testing.T) { + ctx := context.Background() + natsC, endpoint := setupNSQContainer(ctx, t) + defer testcontainers.CleanupContainer(t, natsC) m := mockMessage{ Message: "foo", } w := NewWorker( - WithAddr(host+":4150"), + WithAddr(endpoint), WithTopic("timeout"), WithMaxInFlight(2), WithRunFunc(func(ctx context.Context, m core.TaskMessage) error { @@ -195,11 +205,14 @@ func TestJobReachTimeout(t *testing.T) { } func TestCancelJobAfterShutdown(t *testing.T) { + ctx := context.Background() + natsC, endpoint := setupNSQContainer(ctx, t) + defer testcontainers.CleanupContainer(t, natsC) m := mockMessage{ Message: "test", } w := NewWorker( - WithAddr(host+":4150"), + WithAddr(endpoint), WithTopic("cancel"), WithLogger(queue.NewLogger()), WithRunFunc(func(ctx context.Context, m core.TaskMessage) error { @@ -234,11 +247,14 @@ func TestCancelJobAfterShutdown(t *testing.T) { } func TestGoroutineLeak(t *testing.T) { + ctx := context.Background() + natsC, endpoint := setupNSQContainer(ctx, t) + defer testcontainers.CleanupContainer(t, natsC) m := mockMessage{ Message: "foo", } w := NewWorker( - WithAddr(host+":4150"), + WithAddr(endpoint), WithTopic("GoroutineLeak"), WithLogger(queue.NewEmptyLogger()), WithRunFunc(func(ctx context.Context, m core.TaskMessage) error { @@ -279,11 +295,14 @@ func TestGoroutineLeak(t *testing.T) { } func TestGoroutinePanic(t *testing.T) { + ctx := context.Background() + natsC, endpoint := setupNSQContainer(ctx, t) + defer testcontainers.CleanupContainer(t, natsC) m := mockMessage{ Message: "foo", } w := NewWorker( - WithAddr(host+":4150"), + WithAddr(endpoint), WithTopic("GoroutinePanic"), WithRunFunc(func(ctx context.Context, m core.TaskMessage) error { panic("missing something") @@ -305,11 +324,14 @@ func TestGoroutinePanic(t *testing.T) { } func TestNSQStatsinQueue(t *testing.T) { + ctx := context.Background() + natsC, endpoint := setupNSQContainer(ctx, t) + defer testcontainers.CleanupContainer(t, natsC) m := mockMessage{ Message: "foo", } w := NewWorker( - WithAddr(host+":4150"), + WithAddr(endpoint), WithTopic("nsq_stats"), WithRunFunc(func(ctx context.Context, m core.TaskMessage) error { log.Println("get message") @@ -334,11 +356,14 @@ func TestNSQStatsinQueue(t *testing.T) { } func TestNSQStatsInWorker(t *testing.T) { + ctx := context.Background() + natsC, endpoint := setupNSQContainer(ctx, t) + defer testcontainers.CleanupContainer(t, natsC) m := mockMessage{ Message: "foo", } w := NewWorker( - WithAddr(host+":4150"), + WithAddr(endpoint), WithTopic("nsq_stats_queue"), )