Skip to content

Commit

Permalink
Fix i/o between pipelines
Browse files Browse the repository at this point in the history
  • Loading branch information
Ardem committed Feb 12, 2025
1 parent 27ecd02 commit 7732d9f
Show file tree
Hide file tree
Showing 7 changed files with 10 additions and 17 deletions.
2 changes: 1 addition & 1 deletion backend/pipelines/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ require (
github.com/russross/blackfriday/v2 v2.0.1 // indirect
github.com/shurcooL/sanitized_anchor_name v1.0.0 // indirect
github.com/syndtr/goleveldb v1.0.0 // indirect
github.com/ylem-co/shared-messaging v0.0.0-20240906192118-7b8fffc71211 // indirect
github.com/ylem-co/shared-messaging v0.0.0-20250212095619-4155f4bd1124 // indirect
go.uber.org/atomic v1.6.0 // indirect
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect
golang.org/x/net v0.0.0-20211029224645-99673261e6eb // indirect
Expand Down
2 changes: 2 additions & 0 deletions backend/pipelines/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -971,6 +971,8 @@ github.com/ylem-co/shared-messaging v0.0.0-20240902171028-26da5a197fd4 h1:EMlJO+
github.com/ylem-co/shared-messaging v0.0.0-20240902171028-26da5a197fd4/go.mod h1:OAQr4N1aBobOY7KC4D4bRXWmdMl4SHxsKSs8yBniIoM=
github.com/ylem-co/shared-messaging v0.0.0-20240906192118-7b8fffc71211 h1:80cVDw/cJjxYz3DbnjrfXoKTwJD9Rahfzncsk0tQJgw=
github.com/ylem-co/shared-messaging v0.0.0-20240906192118-7b8fffc71211/go.mod h1:OAQr4N1aBobOY7KC4D4bRXWmdMl4SHxsKSs8yBniIoM=
github.com/ylem-co/shared-messaging v0.0.0-20250212095619-4155f4bd1124 h1:gf42BLiDeMQP3EH6EQ7UmvCLSOsOIlJ2wmTMwNV3mdo=
github.com/ylem-co/shared-messaging v0.0.0-20250212095619-4155f4bd1124/go.mod h1:OAQr4N1aBobOY7KC4D4bRXWmdMl4SHxsKSs8yBniIoM=
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/sirupsen/logrus"
)

func InitiatePipelineRun(tx *sql.Tx, trc msgsrv.TaskRunContext, prevOutputBytes []byte, prewPipelineRunUuid uuid.UUID) error {
func InitiatePipelineRun(tx *sql.Tx, trc msgsrv.TaskRunContext, prevOutputBytes []byte, prewPipelineRunUuid uuid.UUID, prevPipelineOutputBytes []byte) error {
sysVars, err := envvariable.GetEnvVariablesByOrganizationUuidTx(tx, trc.Task.OrganizationUuid)
if err != nil {
return err
Expand Down Expand Up @@ -49,7 +49,7 @@ func InitiatePipelineRun(tx *sql.Tx, trc msgsrv.TaskRunContext, prevOutputBytes
sr := schedule.ScheduledRun{
PipelineRunUuid: wrUuid,
PipelineId: wf.Id,
Input: make([]byte, 0),
Input: prevPipelineOutputBytes,
EnvVars: envVars,
ExecuteAt: &executeAt,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (l *TriggerListener) OnTaskRunResult(ctx goka.Context, envelope interface{}
} else {
err = taskrunner.InitiatePipelineRun(tx, msgsrv.TaskRunContext{
Task: t,
}, []byte{}, uuid.Nil)
}, []byte{}, uuid.Nil, trr.Output)

if err != nil {
_ = tx.Rollback()
Expand Down
13 changes: 1 addition & 12 deletions processor/taskrunner/domain/runner/run_pipeline.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package runner

import (
"encoding/json"
"ylem_taskrunner/helpers/kafka"

messaging "github.com/ylem-co/shared-messaging"
)

Expand All @@ -22,15 +19,7 @@ func RunPipelineTaskRunner(t *messaging.RunPipelineTask) *messaging.TaskRunResul
tr.IsInitialTask = t.IsInitialTask
tr.IsFinalTask = t.IsFinalTask
tr.Meta = t.Meta

result, err := json.Marshal(t.PipelineToRunUuid)
if err != nil {
kafka.HandleBadRequestError(t.TaskUuid, messaging.TaskRunPipelineMessageName, err, tr)

return tr
}

tr.Output = result
tr.Output = t.Input

return tr
})
Expand Down
2 changes: 1 addition & 1 deletion processor/taskrunner/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ require (
github.com/ylem-co/hubspot-client v0.0.0-20240902164341-00f6a99cfdd1 // indirect
github.com/ylem-co/opsgenie-client v0.0.0-20240902173345-eb25eabd0214 // indirect
github.com/ylem-co/salesforce-client v0.0.0-20240902165912-6a570283e8d5 // indirect
github.com/ylem-co/shared-messaging v0.0.0-20240902171028-26da5a197fd4 // indirect
github.com/ylem-co/shared-messaging v0.0.0-20250212095619-4155f4bd1124 // indirect
golang.org/x/sys v0.15.0 // indirect
)

Expand Down
2 changes: 2 additions & 0 deletions processor/taskrunner/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,8 @@ github.com/ylem-co/shared-messaging v0.0.0-20240717175742-c78b09b209c0 h1:MxPzTV
github.com/ylem-co/shared-messaging v0.0.0-20240717175742-c78b09b209c0/go.mod h1:mMMhSSerLwzVnqWs+gwfcbVBaEx66FzBBhq4HOlO4Iw=
github.com/ylem-co/shared-messaging v0.0.0-20240902171028-26da5a197fd4 h1:EMlJO+cJp5mC+bNGeq/leKaRNPXMnN/LESGKTIpr+FQ=
github.com/ylem-co/shared-messaging v0.0.0-20240902171028-26da5a197fd4/go.mod h1:OAQr4N1aBobOY7KC4D4bRXWmdMl4SHxsKSs8yBniIoM=
github.com/ylem-co/shared-messaging v0.0.0-20250212095619-4155f4bd1124 h1:gf42BLiDeMQP3EH6EQ7UmvCLSOsOIlJ2wmTMwNV3mdo=
github.com/ylem-co/shared-messaging v0.0.0-20250212095619-4155f4bd1124/go.mod h1:OAQr4N1aBobOY7KC4D4bRXWmdMl4SHxsKSs8yBniIoM=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down

0 comments on commit 7732d9f

Please sign in to comment.