From 69def277fe4421c9f9e75085ac17a7dfff9d6ff5 Mon Sep 17 00:00:00 2001
From: Thomas Johansen
Date: Mon, 20 Jul 2020 14:25:39 +0200
Subject: [PATCH] fix: improve wait polling robustness

---
 cmd/remove.go | 27 +++++++++++++++++++++++++++
 cmd/run.go    | 48 ++++++++++++++++++++++++++++++++++++++----------
 2 files changed, 65 insertions(+), 10 deletions(-)

diff --git a/cmd/remove.go b/cmd/remove.go
index 4b14af9..b278850 100644
--- a/cmd/remove.go
+++ b/cmd/remove.go
@@ -6,6 +6,10 @@ import (
 	"github.com/spf13/cobra"
 )

+var (
+	waitForDelete bool
+)
+
 var removeCmd = &cobra.Command{
 	Use:   "rm ",
 	Short: "Remove job from cluster",
@@ -16,14 +20,37 @@ var removeCmd = &cobra.Command{
 		}

 		name := args[0]
+
+		job, err := kubectx.GetJob(name)
+		if err != nil {
+			return fmt.Errorf("unable to get job: %w", err)
+		}
+
+		if job == nil {
+			fmt.Printf("Nothing to delete: no job named %s found\n", name)
+			return nil
+		}
+
+		fmt.Printf("Deleting job %s...\n", name)
 		if err := kubectx.DeleteJob(name); err != nil {
 			return fmt.Errorf("unable to delete job: %w", err)
 		}

+		if !waitForDelete {
+			return nil
+		}
+
+		if err := waitUntilDeleted(name); err != nil {
+			return fmt.Errorf("error while waiting for job to be deleted: %w", err)
+		}
+
 		return nil
 	},
 }

 func init() {
 	rootCmd.AddCommand(removeCmd)
+
+	flags := removeCmd.Flags()
+	flags.BoolVarP(&waitForDelete, "wait", "w", false, "wait for job to be deleted")
 }
diff --git a/cmd/run.go b/cmd/run.go
index ba3a4b2..248b920 100644
--- a/cmd/run.go
+++ b/cmd/run.go
@@ -11,7 +11,6 @@ import (
 	"github.com/spf13/cobra"
 	"github.com/uitml/frink/internal/k8s"
 	"github.com/uitml/frink/internal/k8s/retry"
-	batchv1 "k8s.io/api/batch/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/util/wait"
 )
@@ -50,7 +49,7 @@ var runCmd = &cobra.Command{
 		// TODO: Reconsider this? Many reasons to avoid this; should be challenged.
 		k8s.OverrideJobSpec(job)

-		if err := deletePreviousJob(job); err != nil {
+		if err := deletePreviousJob(job.Name); err != nil {
 			return fmt.Errorf("unable to delete previous job: %w", err)
 		}

@@ -66,6 +65,10 @@ var runCmd = &cobra.Command{
 			return nil
 		}

+		if err := waitUntilStarted(job.Name); err != nil {
+			return fmt.Errorf("error while waiting for job to start: %w", err)
+		}
+
 		// TODO: Ensure nil references are properly handled in this block.
 		err = retry.OnError(backoff, apierrors.IsBadRequest, func() error {
 			req, err := kubectx.GetJobLogs(job.Name, k8s.DefaultLogOptions)
@@ -105,20 +108,20 @@ func init() {
 	flags.BoolVarP(&follow, "follow", "f", false, "wait for job to start, then stream logs")
 }

-func deletePreviousJob(job *batchv1.Job) error {
-	oldJob, err := kubectx.GetJob(job.Name)
+func deletePreviousJob(name string) error {
+	job, err := kubectx.GetJob(name)
 	if err != nil {
 		return fmt.Errorf("unable to get previous job: %w", err)
 	}

-	if oldJob != nil {
+	if job != nil {
 		fmt.Println("Deleting previous job...")
-		err = kubectx.DeleteJob(oldJob.Name)
+		err = kubectx.DeleteJob(job.Name)
 		if err != nil {
 			return err
 		}

-		if err := waitUntilDeleted(oldJob); err != nil {
+		if err := waitUntilDeleted(job.Name); err != nil {
 			return err
 		}
 	}
@@ -126,14 +129,39 @@ func deletePreviousJob(job *batchv1.Job) error {
 	return nil
 }

-func waitUntilDeleted(job *batchv1.Job) error {
+// waitUntilDeleted polls the cluster until no job with the given name is
+// found. It returns the lookup error, or the poll's own error on timeout.
+func waitUntilDeleted(name string) error {
+	err := wait.Poll(100*time.Millisecond, 120*time.Second, func() (bool, error) {
+		job, err := kubectx.GetJob(name)
+		if err != nil {
+			return false, err
+		}
+
+		return job == nil, nil
+	})
+
+	return err
+}
+
+// waitUntilStarted polls the cluster until the named job reports active pods
+// or has already finished. It returns the lookup error, or the poll's own
+// error on timeout.
+func waitUntilStarted(name string) error {
 	err := wait.Poll(100*time.Millisecond, 120*time.Second, func() (bool, error) {
-		oldJob, err := kubectx.GetJob(job.Name)
+		job, err := kubectx.GetJob(name)
 		if err != nil {
 			return false, err
 		}

-		return oldJob == nil, nil
+		if job == nil {
+			return false, nil
+		}
+
+		// A short-lived job may finish between two polls and never be
+		// observed as active, so finished pods also count as started.
+		status := job.Status
+		return status.Active > 0 || status.Succeeded > 0 || status.Failed > 0, nil
 	})

 	return err