Skip to content

Commit

Permalink
Jira/holiday/iwf 330 (#501)
Browse files Browse the repository at this point in the history
Co-authored-by: lwolczynski <[email protected]>
Co-authored-by: Katie Atrops <[email protected]>
  • Loading branch information
3 people authored Dec 2, 2024
1 parent 425478b commit 1d24724
Show file tree
Hide file tree
Showing 20 changed files with 293 additions and 57 deletions.
5 changes: 3 additions & 2 deletions cmd/server/iwf/iwf.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func start(c *cli.Context) {
if err != nil {
rawLog.Fatalf("Unable to connect to Temporal because of error %v", err)
}
unifiedClient = temporalapi.NewTemporalClient(temporalClient, config.Interpreter.Temporal.Namespace, converter.GetDefaultDataConverter(), false)
unifiedClient = temporalapi.NewTemporalClient(temporalClient, config.Interpreter.Temporal.Namespace, converter.GetDefaultDataConverter(), false, &config.Api.QueryWorkflowFailedRetryPolicy)

for _, svcName := range services {
go launchTemporalService(svcName, *config, unifiedClient, temporalClient, logger)
Expand All @@ -146,7 +146,8 @@ func start(c *cli.Context) {
if err != nil {
rawLog.Fatalf("Unable to connect to Cadence because of error %v", err)
}
unifiedClient = cadenceapi.NewCadenceClient(domain, cadenceClient, serviceClient, encoded.GetDefaultDataConverter(), closeFunc)

unifiedClient = cadenceapi.NewCadenceClient(domain, cadenceClient, serviceClient, encoded.GetDefaultDataConverter(), closeFunc, &config.Api.QueryWorkflowFailedRetryPolicy)

for _, svcName := range services {
go launchCadenceService(svcName, *config, unifiedClient, serviceClient, domain, closeFunc, logger)
Expand Down
26 changes: 26 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ type (
OmitRpcInputOutputInHistory *bool `yaml:"omitRpcInputOutputInHistory"`
// WaitForStateCompletionMigration is used to control workflowId of the WaitForStateCompletion system/internal workflows
WaitForStateCompletionMigration WaitForStateCompletionMigration `yaml:"waitForStateCompletionMigration"`
QueryWorkflowFailedRetryPolicy QueryWorkflowFailedRetryPolicy `yaml:"queryWorkflowFailedRetryPolicy"`
}

QueryWorkflowFailedRetryPolicy struct {
// defaults to 1
InitialIntervalSeconds int `yaml:"initialIntervalSeconds"`
// defaults to 5
MaximumAttempts int `yaml:"maximumAttempts"`
}

WaitForStateCompletionMigration struct {
Expand Down Expand Up @@ -147,3 +155,21 @@ func (c Config) GetWaitForOnWithDefault() string {
}
return "old"
}

func QueryWorkflowFailedRetryPolicyWithDefaults(retryPolicy *QueryWorkflowFailedRetryPolicy) QueryWorkflowFailedRetryPolicy {
var rp QueryWorkflowFailedRetryPolicy

if retryPolicy != nil && retryPolicy.InitialIntervalSeconds != 0 {
rp.InitialIntervalSeconds = retryPolicy.InitialIntervalSeconds
} else {
rp.InitialIntervalSeconds = 1
}

if retryPolicy != nil && retryPolicy.MaximumAttempts != 0 {
rp.MaximumAttempts = retryPolicy.MaximumAttempts
} else {
rp.MaximumAttempts = 5
}

return rp
}
3 changes: 3 additions & 0 deletions config/config_template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ api:
waitForStateCompletionMigration:
signalWithStartOn: old
waitForOn: old
queryWorkflowFailedRetryPolicy:
initialIntervalSeconds: 1
maximumAttempts: 5
interpreter:
defaultWorkflowConfig:
continueAsNewThreshold: 100
Expand Down
3 changes: 3 additions & 0 deletions config/development.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ api:
waitForStateCompletionMigration:
signalWithStartOn: old
waitForOn: old
queryWorkflowFailedRetryPolicy:
initialIntervalSeconds: 1
maximumAttempts: 5
interpreter:
temporal:
hostPort: localhost:7233
Expand Down
3 changes: 3 additions & 0 deletions config/development_cadence.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ api:
waitForStateCompletionMigration:
signalWithStartOn: old
waitForOn: old
queryWorkflowFailedRetryPolicy:
initialIntervalSeconds: 1
maximumAttempts: 5
interpreter:
# interpreterActivityConfig:
# disableSystemSearchAttributes: true # (deprecated) set to true if you don't have advanced visibility in Cadence, see more https://github.com/uber/cadence/issues/5085
Expand Down
3 changes: 3 additions & 0 deletions gen/iwfidl/api/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1428,6 +1428,7 @@ components:
skipSignalReapply: true
workflowRunId: workflowRunId
workflowId: workflowId
skipUpdateReapply: true
stateExecutionId: stateExecutionId
properties:
workflowId:
Expand All @@ -1448,6 +1449,8 @@ components:
type: string
skipSignalReapply:
type: boolean
skipUpdateReapply:
type: boolean
required:
- resetType
- workflowId
Expand Down
26 changes: 26 additions & 0 deletions gen/iwfidl/docs/WorkflowResetRequest.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Name | Type | Description | Notes
**StateId** | Pointer to **string** | | [optional]
**StateExecutionId** | Pointer to **string** | | [optional]
**SkipSignalReapply** | Pointer to **bool** | | [optional]
**SkipUpdateReapply** | Pointer to **bool** | | [optional]

## Methods

Expand Down Expand Up @@ -248,6 +249,31 @@ SetSkipSignalReapply sets SkipSignalReapply field to given value.

HasSkipSignalReapply returns a boolean if a field has been set.

### GetSkipUpdateReapply

`func (o *WorkflowResetRequest) GetSkipUpdateReapply() bool`

GetSkipUpdateReapply returns the SkipUpdateReapply field if non-nil, zero value otherwise.

### GetSkipUpdateReapplyOk

`func (o *WorkflowResetRequest) GetSkipUpdateReapplyOk() (*bool, bool)`

GetSkipUpdateReapplyOk returns a tuple with the SkipUpdateReapply field if it's non-nil, zero value otherwise
and a boolean to check if the value has been set.

### SetSkipUpdateReapply

`func (o *WorkflowResetRequest) SetSkipUpdateReapply(v bool)`

SetSkipUpdateReapply sets SkipUpdateReapply field to given value.

### HasSkipUpdateReapply

`func (o *WorkflowResetRequest) HasSkipUpdateReapply() bool`

HasSkipUpdateReapply returns a boolean if a field has been set.


[[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md)

Expand Down
36 changes: 36 additions & 0 deletions gen/iwfidl/model_workflow_reset_request.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 4 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/uber-go/tally/v4 v4.1.1
github.com/uber/cadence-idl v0.0.0-20220713235846-fda89e95df1e
github.com/urfave/cli v1.22.5
go.temporal.io/sdk v1.29.1
go.temporal.io/sdk v1.30.0
go.temporal.io/sdk/contrib/tally v0.1.0
go.temporal.io/sdk/contrib/tools/workflowcheck v0.0.0-20220331154559-fd0d1eb548eb
go.uber.org/cadence v0.17.1-0.20230105221902-f50f452a8eae // pin to pick GetUnhandledSignalNames API
Expand All @@ -19,14 +19,12 @@ require (
gopkg.in/yaml.v3 v3.0.1
)

require (
github.com/pkg/errors v0.9.1
go.temporal.io/api v1.39.0
)
require go.temporal.io/api v1.40.0

require (
github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect
github.com/nexus-rpc/sdk-go v0.0.10 // indirect
github.com/nexus-rpc/sdk-go v0.0.11 // indirect
github.com/pkg/errors v0.9.1 // indirect
golang.org/x/crypto v0.26.0 // indirect
golang.org/x/net v0.28.0 // indirect
golang.org/x/sync v0.8.0 // indirect
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/nexus-rpc/sdk-go v0.0.10 h1:7jEPUlsghxoD4OJ2H8YbFJ1t4wbxsUef7yZgBfyY3uA=
github.com/nexus-rpc/sdk-go v0.0.10/go.mod h1:TpfkM2Cw0Rlk9drGkoiSMpFqflKTiQLWUNyKJjF8mKQ=
github.com/nexus-rpc/sdk-go v0.0.11 h1:qH3Us3spfp50t5ca775V1va2eE6z1zMQDZY4mvbw0CI=
github.com/nexus-rpc/sdk-go v0.0.11/go.mod h1:TpfkM2Cw0Rlk9drGkoiSMpFqflKTiQLWUNyKJjF8mKQ=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
Expand Down Expand Up @@ -399,11 +399,11 @@ go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.temporal.io/api v1.5.0/go.mod h1:BqKxEJJYdxb5dqf0ODfzfMxh8UEQ5L3zKS51FiIYYkA=
go.temporal.io/api v1.39.0 h1:pbhcfvNDB7mllb8lIBqPcg+m6LMG/IhTpdiFxe+0mYk=
go.temporal.io/api v1.39.0/go.mod h1:1WwYUMo6lao8yl0371xWUm13paHExN5ATYT/B7QtFis=
go.temporal.io/api v1.40.0 h1:rH3HvUUCFr0oecQTBW5tI6DdDQsX2Xb6OFVgt/bvLto=
go.temporal.io/api v1.40.0/go.mod h1:1WwYUMo6lao8yl0371xWUm13paHExN5ATYT/B7QtFis=
go.temporal.io/sdk v1.12.0/go.mod h1:lSp3lH1lI0TyOsus0arnO3FYvjVXBZGi/G7DjnAnm6o=
go.temporal.io/sdk v1.29.1 h1:y+sUMbUhTU9rj50mwIZAPmcXCtgUdOWS9xHDYRYSgZ0=
go.temporal.io/sdk v1.29.1/go.mod h1:kp//DRvn3CqQVBCtjL51Oicp9wrZYB2s6row1UgzcKQ=
go.temporal.io/sdk v1.30.0 h1:7jzSFZYk+tQ2kIYEP+dvrM7AW9EsCEP52JHCjVGuwbI=
go.temporal.io/sdk v1.30.0/go.mod h1:Pv45F/fVDgWKx+jhix5t/dGgqROVaI+VjPLd3CHWqq0=
go.temporal.io/sdk/contrib/tally v0.1.0 h1:edAcGKNIDYU7fd10e4C/43dHw/h1F9cACupcmIKwzPI=
go.temporal.io/sdk/contrib/tally v0.1.0/go.mod h1:PckZI8gA0AxIBvrgT2FQlm8TaqptYmqRdy2NxOibsZQ=
go.temporal.io/sdk/contrib/tools/workflowcheck v0.0.0-20220331154559-fd0d1eb548eb h1:cpSEAnCCkOtcwdY2NysRXdi4Ny19F5V5KplrAVl1/Mo=
Expand Down
4 changes: 4 additions & 0 deletions integ/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ func createTestConfig(testCfg IwfServiceTestConfig) config.Config {
SignalWithStartOn: "old",
WaitForOn: "old",
},
QueryWorkflowFailedRetryPolicy: config.QueryWorkflowFailedRetryPolicy{
InitialIntervalSeconds: 1,
MaximumAttempts: 5,
},
},
Interpreter: config.Interpreter{
VerboseDebug: false,
Expand Down
2 changes: 0 additions & 2 deletions integ/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,6 @@ func doTestCreateWithoutStartingState(t *testing.T, backendType service.BackendT
}).Execute()
panicAtHttpError(err, httpResp)

time.Sleep(time.Second * 2)

// workflow shouldn't executed any state
var dump service.ContinueAsNewDumpResponse
err = uclient.QueryWorkflow(context.Background(), &dump, wfId, "", service.ContinueAsNewDumpQueryType)
Expand Down
74 changes: 73 additions & 1 deletion integ/locking_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func doTestLockingWorkflow(t *testing.T, backendType service.BackendType, config
startReq := iwfidl.WorkflowStartRequest{
WorkflowId: wfId,
IwfWorkflowType: locking.WorkflowType,
WorkflowTimeoutSeconds: 100,
WorkflowTimeoutSeconds: 300,
IwfWorkerUrl: "http://localhost:" + testWorkflowServerPort,
StartStateId: ptr.Any(locking.State1),
WorkflowStartOptions: &iwfidl.WorkflowStartOptions{
Expand Down Expand Up @@ -245,4 +245,76 @@ func doTestLockingWorkflow(t *testing.T, backendType service.BackendType, config
},
}
assertions.ElementsMatch(expected1, queryResult1.GetObjects())

//reset here with reapply and compare counter
resetReq := apiClient.DefaultApi.ApiV1WorkflowResetPost(context.Background())
_, httpResp, err = resetReq.WorkflowResetRequest(iwfidl.WorkflowResetRequest{
WorkflowId: wfId,
ResetType: iwfidl.BEGINNING,
//SkipSignalReapply: ptr.Any(true),
}).Execute()
panicAtHttpError(err, httpResp)

time.Sleep(time.Second * 20)
req2Reset := apiClient.DefaultApi.ApiV1WorkflowGetWithWaitPost(context.Background())
resp2Reset, httpResp, err := req2Reset.WorkflowGetRequest(iwfidl.WorkflowGetRequest{
WorkflowId: wfId,
}).Execute()
panicAtHttpError(err, httpResp)

assertions.Equal(iwfidl.COMPLETED, resp2Reset.GetWorkflowStatus())

//TODO: There is a bug in the Temporal go SDK where only the first update method is actually executed. When that is fixed the following code can be uncommented to test resetting update methods.
//time.Sleep(time.Second * 10)
//reqRpcReset := apiClient.DefaultApi.ApiV1WorkflowRpcPost(context.Background())
//_, httpResp, err = reqRpc.WorkflowRpcRequest(iwfidl.WorkflowRpcRequest{
// WorkflowId: wfId,
// RpcName: locking.RPCName,
// Input: locking.UnblockValue,
//}).Execute()
//panicAtHttpError(err, httpResp)

//time.Sleep(time.Second * 20)
//req2Reset := apiClient.DefaultApi.ApiV1WorkflowGetWithWaitPost(context.Background())
//resp2Reset, httpResp, err := req2Reset.WorkflowGetRequest(iwfidl.WorkflowGetRequest{
// WorkflowId: wfId,
//}).Execute()
//panicAtHttpError(err, httpResp)

//s2StartsDecides := locking.InParallelS2 + rpcIncrease // locking.InParallelS2 original state executions, and a new trigger from rpc
//finalCounterValue := int64(locking.InParallelS2 + 2*rpcIncrease)
//stateCompletionCount := locking.InParallelS2 + rpcIncrease + 1
//resetHistory, _ := wfHandler.GetTestResult()
//assertions.Equalf(map[string]int64{
// "S1_start": 1,
// "S1_decide": 1,
// "StateWaiting_start": 1,
// "StateWaiting_decide": 1,
// "S2_start": int64(s2StartsDecides),
// "S2_decide": int64(s2StartsDecides),
//}, resetHistory, "locking.test fail, %v", history)
//
//assertions.Equal(iwfidl.COMPLETED, resp2Reset.GetWorkflowStatus())
//assertions.Equal(stateCompletionCount, len(resp2Reset.GetResults()))
//
//reqSearchReset := apiClient.DefaultApi.ApiV1WorkflowSearchattributesGetPost(context.Background())
//searchResultReset, httpResp, err := reqSearchReset.WorkflowGetSearchAttributesRequest(iwfidl.WorkflowGetSearchAttributesRequest{
// WorkflowId: wfId,
// Keys: []iwfidl.SearchAttributeKeyAndType{
// {
// Key: iwfidl.PtrString(locking.TestSearchAttributeIntKey),
// ValueType: ptr.Any(iwfidl.INT),
// },
// },
//}).Execute()
//panicAtHttpError(err, httpResp)
//
//expectedSearchAttributeIntReset := iwfidl.SearchAttribute{
// Key: iwfidl.PtrString(locking.TestSearchAttributeIntKey),
// ValueType: ptr.Any(iwfidl.INT),
// IntegerValue: iwfidl.PtrInt64(finalCounterValue),
//}
//assertions.Equal([]iwfidl.SearchAttribute{expectedSearchAttributeIntReset}, searchResultReset.GetSearchAttributes())

//reset here without update reapply and counter should be less
}
3 changes: 0 additions & 3 deletions integ/signal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ func doTestSignalWorkflow(t *testing.T, backendType service.BackendType, config
panicAtHttpError(err, httpResp)

// test update config
time.Sleep(time.Second)
var debugDump service.DebugDumpResponse
err = uclient.QueryWorkflow(context.Background(), &debugDump, wfId, "", service.DebugDumpQueryType)
if err != nil {
Expand All @@ -134,7 +133,6 @@ func doTestSignalWorkflow(t *testing.T, backendType service.BackendType, config
}).Execute()
panicAtHttpError(err, httpResp)

time.Sleep(time.Second)
err = uclient.QueryWorkflow(context.Background(), &debugDump, wfId, "", service.DebugDumpQueryType)
if err != nil {
panic(err)
Expand All @@ -154,7 +152,6 @@ func doTestSignalWorkflow(t *testing.T, backendType service.BackendType, config
}).Execute()
panicAtHttpError(err, httpResp)

time.Sleep(time.Second)
err = uclient.QueryWorkflow(context.Background(), &debugDump, wfId, "", service.DebugDumpQueryType)
if err != nil {
panic(err)
Expand Down
3 changes: 0 additions & 3 deletions integ/timer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,6 @@ func doTestTimerWorkflow(t *testing.T, backendType service.BackendType, config *
}).Execute()
panicAtHttpError(err, httpResp)

if config != nil {
time.Sleep(time.Second * 2)
}
err = uclient.QueryWorkflow(context.Background(), &timerInfos, wfId, "", service.GetCurrentTimerInfosQueryType)
if err != nil {
log.Fatalf("Fail to invoke query %v", err)
Expand Down
Loading

0 comments on commit 1d24724

Please sign in to comment.