Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat][kubectl-plugin] list Ray nodes #3007

Merged
merged 1 commit into from
Feb 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions kubectl-plugin/pkg/cmd/get/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,15 @@ func NewGetCommand(streams genericclioptions.IOStreams) *cobra.Command {

cmd.AddCommand(NewGetClusterCommand(streams))
cmd.AddCommand(NewGetWorkerGroupCommand(streams))
cmd.AddCommand(NewGetNodesCommand(streams))
return cmd
}

// joinLabelMap joins a map of K8s label key-val entries into a label selector string
func joinLabelMap(labelMap map[string]string) string {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved here from get_workergroup.go since it's used by get_nodes.go now too

var labels []string
for k, v := range labelMap {
labels = append(labels, fmt.Sprintf("%s=%s", k, v))
}
return strings.Join(labels, ",")
}
256 changes: 256 additions & 0 deletions kubectl-plugin/pkg/cmd/get/get_nodes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
package get

import (
"context"
"errors"
"fmt"
"io"
"time"

"github.com/ray-project/kuberay/kubectl-plugin/pkg/util"
"github.com/ray-project/kuberay/kubectl-plugin/pkg/util/client"
"github.com/spf13/cobra"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/duration"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/cli-runtime/pkg/printers"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
"k8s.io/kubectl/pkg/util/templates"
)

type GetNodesOptions struct {
configFlags *genericclioptions.ConfigFlags
ioStreams *genericclioptions.IOStreams
namespace string
cluster string
node string
allNamespaces bool
}

type node struct {
creationTimestamp v1.Time
cpus resource.Quantity
gpus resource.Quantity
tpus resource.Quantity
memory resource.Quantity
namespace string
cluster string
_type string
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why the underscore on this field?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

type is a reserved keyword in Go. Lmk if there's a better name or convention.

workerGroup string
name string
}

var getNodesExample = templates.Examples(`
# Get nodes in the default namespace
kubectl ray get node

# Get nodes in all namespaces
kubectl ray get node --all-namespaces

# Get all nodes in a namespace
kubectl ray get node --namespace my-namespace

# Get all nodes for Ray clusters named my-raycluster in all namespaces
kubectl ray get node --ray-cluster my-raycluster --all-namespaces

# Get all nodes in a namespace for a Ray cluster
kubectl ray get node --namespace my-namespace --ray-cluster my-raycluster

# Get one node in a namespace for a Ray cluster
kubectl ray get node my-node --namespace my-namespace --ray-cluster my-raycluster
`)

func NewGetNodesOptions(streams genericclioptions.IOStreams) *GetNodesOptions {
return &GetNodesOptions{
configFlags: genericclioptions.NewConfigFlags(true),
ioStreams: &streams,
}
}

func NewGetNodesCommand(streams genericclioptions.IOStreams) *cobra.Command {
options := NewGetNodesOptions(streams)
cmdFactory := cmdutil.NewFactory(options.configFlags)

cmd := &cobra.Command{
Use: "node [NODE] [(-c|--ray-cluster) RAYCLUSTER]",
Aliases: []string{"nodes"},
Short: "Get Ray nodes",
Example: getNodesExample,
SilenceUsage: true,
Args: cobra.MaximumNArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
if err := options.Complete(args); err != nil {
return err
}
if err := options.Validate(); err != nil {
return err
}
k8sClient, err := client.NewClient(cmdFactory)
if err != nil {
return fmt.Errorf("failed to create client: %w", err)
}
return options.Run(cmd.Context(), k8sClient)
},
}

cmd.Flags().StringVarP(&options.cluster, "ray-cluster", "c", "", "Ray cluster")
cmd.Flags().BoolVarP(&options.allNamespaces, "all-namespaces", "A", false, "If present, list nodes across all namespaces. Namespace in current context is ignored even if specified with --namespace.")

options.configFlags.AddFlags(cmd.Flags())
return cmd
}

func (options *GetNodesOptions) Complete(args []string) error {
if options.allNamespaces {
options.namespace = ""
} else {
if options.configFlags.Namespace == nil || *options.configFlags.Namespace == "" {
options.namespace = "default"
} else {
options.namespace = *options.configFlags.Namespace
}
}

if len(args) > 0 {
options.node = args[0]
}

return nil
}

func (options *GetNodesOptions) Validate() error {
// Overrides and binds the kube config then retrieves the merged result
config, err := options.configFlags.ToRawKubeConfigLoader().RawConfig()
if err != nil {
return fmt.Errorf("error retrieving raw config: %w", err)
}
if !util.HasKubectlContext(config, options.configFlags) {
return fmt.Errorf("no context is currently set, use %q or %q to select a new one", "--context", "kubectl config use-context <context>")
}
return nil
}

func (options *GetNodesOptions) Run(ctx context.Context, k8sClient client.Client) error {
listopts := v1.ListOptions{
LabelSelector: joinLabelMap(createRayNodeLabelSelectors(options.cluster)),
}

if options.node != "" {
listopts.FieldSelector = fmt.Sprintf("metadata.name=%s", options.node)
}

pods, err := k8sClient.KubernetesClient().CoreV1().Pods(options.namespace).List(ctx, listopts)
if err != nil {
return fmt.Errorf("unable to get Ray nodes: %w", err)
}

nodes := podsToNodes(pods.Items)

if options.node != "" && len(nodes) == 0 {
return errors.New(errorMessageForNodeNotFound(options.node, options.cluster, options.namespace, options.allNamespaces))
}

return printNodes(nodes, options.allNamespaces, options.ioStreams.Out)
}

// createRayNodeLabelSelectors creates a map of K8s label selectors for Ray nodes
func createRayNodeLabelSelectors(cluster string) map[string]string {
labelSelectors := map[string]string{
util.RayIsRayNodeLabelKey: "yes",
}
if cluster != "" {
labelSelectors[util.RayClusterLabelKey] = cluster
}
return labelSelectors
}

// errorMessageForNodeNotFound returns an error message for when a node is not found
func errorMessageForNodeNotFound(node, cluster, namespace string, allNamespaces bool) string {
errMsg := fmt.Sprintf("Ray node %s not found", node)

if allNamespaces {
errMsg += " in any namespace"
} else {
errMsg += fmt.Sprintf(" in namespace %s", namespace)
}

if cluster != "" {
errMsg += fmt.Sprintf(" in Ray cluster %s", cluster)
} else {
errMsg += " in any Ray cluster"
}

return errMsg
}

// podsToNodes converts an array of K8s Pods to a list of nodes
func podsToNodes(pods []corev1.Pod) []node {
var nodes []node
for _, pod := range pods {
nodes = append(nodes, node{
namespace: pod.Namespace,
cluster: pod.Labels[util.RayClusterLabelKey],
_type: pod.Labels[util.RayNodeTypeLabelKey],
workerGroup: pod.Labels[util.RayNodeGroupLabelKey],
name: pod.Name,
cpus: *pod.Spec.Containers[0].Resources.Requests.Cpu(),
gpus: *pod.Spec.Containers[0].Resources.Requests.Name(corev1.ResourceName(util.ResourceNvidiaGPU), resource.DecimalSI),
tpus: *pod.Spec.Containers[0].Resources.Requests.Name(corev1.ResourceName(util.ResourceGoogleTPU), resource.DecimalSI),
memory: *pod.Spec.Containers[0].Resources.Requests.Memory(),
creationTimestamp: pod.CreationTimestamp,
})
}
return nodes
}

// printNodes prints a list of nodes to the output
func printNodes(nodes []node, allNamespaces bool, output io.Writer) error {
resultTablePrinter := printers.NewTablePrinter(printers.PrintOptions{})

columns := []v1.TableColumnDefinition{}
if allNamespaces {
columns = append(columns, v1.TableColumnDefinition{Name: "Namespace", Type: "string"})
}
columns = append(columns, []v1.TableColumnDefinition{
{Name: "Name", Type: "string"},
{Name: "CPUs", Type: "string"},
{Name: "GPUs", Type: "string"},
{Name: "TPUs", Type: "string"},
{Name: "Memory", Type: "string"},
{Name: "Cluster", Type: "string"},
{Name: "Type", Type: "string"},
{Name: "Worker Group", Type: "string"},
{Name: "Age", Type: "string"},
}...)

resTable := &v1.Table{ColumnDefinitions: columns}

for _, node := range nodes {
age := duration.HumanDuration(time.Since(node.creationTimestamp.Time))
if node.creationTimestamp.Time.IsZero() {
age = "<unknown>"
}

row := v1.TableRow{}
if allNamespaces {
row.Cells = append(row.Cells, node.namespace)
}
row.Cells = append(row.Cells, []interface{}{
node.name,
node.cpus.String(),
node.gpus.String(),
node.tpus.String(),
node.memory.String(),
node.cluster,
node._type,
node.workerGroup,
age,
}...)

resTable.Rows = append(resTable.Rows, row)
}

return resultTablePrinter.PrintObj(resTable, output)
}
Loading
Loading