Skip to content

Commit

Permalink
fix: improve wait polling robustness
Browse files Browse the repository at this point in the history
  • Loading branch information
thomasjo committed Jul 20, 2020
1 parent bdec0bb commit 69def27
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 10 deletions.
27 changes: 27 additions & 0 deletions cmd/remove.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ import (
"github.com/spf13/cobra"
)

// waitForDelete is bound to the rm command's --wait/-w flag. When set, the
// command waits for the job to be deleted before returning.
var waitForDelete bool

var removeCmd = &cobra.Command{
Use: "rm <name>",
Short: "Remove job from cluster",
Expand All @@ -16,14 +20,37 @@ var removeCmd = &cobra.Command{
}

name := args[0]

job, err := kubectx.GetJob(name)
if err != nil {
return fmt.Errorf("unable to get job: %w", err)
}

if job == nil {
fmt.Printf("Nothing to delete: no job named %s found\n", name)
return nil
}

fmt.Printf("Deleting job %s...\n", name)
if err := kubectx.DeleteJob(name); err != nil {
return fmt.Errorf("unable to delete job: %w", err)
}

if !waitForDelete {
return nil
}

if err := waitUntilDeleted(name); err != nil {
return fmt.Errorf("timed out waiting for job to be deleted: %w", err)
}

return nil
},
}

// init attaches the rm command to the root command and declares its
// --wait/-w boolean flag (default false), bound to waitForDelete.
func init() {
	rootCmd.AddCommand(removeCmd)

	removeCmd.Flags().BoolVarP(&waitForDelete, "wait", "w", false, "wait for job to be deleted")
}
36 changes: 26 additions & 10 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/spf13/cobra"
"github.com/uitml/frink/internal/k8s"
"github.com/uitml/frink/internal/k8s/retry"
batchv1 "k8s.io/api/batch/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/wait"
)
Expand Down Expand Up @@ -50,7 +49,7 @@ var runCmd = &cobra.Command{
// TODO: Reconsider this? Many reasons to avoid this; should be challenged.
k8s.OverrideJobSpec(job)

if err := deletePreviousJob(job); err != nil {
if err := deletePreviousJob(job.Name); err != nil {
return fmt.Errorf("unable to delete previous job: %w", err)
}

Expand All @@ -66,6 +65,10 @@ var runCmd = &cobra.Command{
return nil
}

if err := waitUntilStarted(job.Name); err != nil {
return fmt.Errorf("timed out waiting for job to start: %w", err)
}

// TODO: Ensure nil references are properly handled in this block.
err = retry.OnError(backoff, apierrors.IsBadRequest, func() error {
req, err := kubectx.GetJobLogs(job.Name, k8s.DefaultLogOptions)
Expand Down Expand Up @@ -105,35 +108,48 @@ func init() {
flags.BoolVarP(&follow, "follow", "f", false, "wait for job to start, then stream logs")
}

func deletePreviousJob(job *batchv1.Job) error {
oldJob, err := kubectx.GetJob(job.Name)
func deletePreviousJob(name string) error {
job, err := kubectx.GetJob(name)
if err != nil {
return fmt.Errorf("unable to get previous job: %w", err)
}

if oldJob != nil {
if job != nil {
fmt.Println("Deleting previous job...")
err = kubectx.DeleteJob(oldJob.Name)
err = kubectx.DeleteJob(job.Name)
if err != nil {
return err
}

if err := waitUntilDeleted(oldJob); err != nil {
if err := waitUntilDeleted(job.Name); err != nil {
return err
}
}

return nil
}

func waitUntilDeleted(job *batchv1.Job) error {
func waitUntilDeleted(name string) error {
err := wait.Poll(100*time.Millisecond, 120*time.Second, func() (bool, error) {
job, err := kubectx.GetJob(name)
if err != nil {
return false, err
}

return job == nil, nil
})

return err
}

func waitUntilStarted(name string) error {
err := wait.Poll(100*time.Millisecond, 120*time.Second, func() (bool, error) {
oldJob, err := kubectx.GetJob(job.Name)
job, err := kubectx.GetJob(name)
if err != nil {
return false, err
}

return oldJob == nil, nil
return job != nil && job.Status.Active > 0, nil
})

return err
Expand Down

0 comments on commit 69def27

Please sign in to comment.