Skip to content

Commit

Permalink
refactor: align size policies with Kubernetes resource quantity conventions
Browse files Browse the repository at this point in the history

- Updated size policies from int-based values to resource.Quantity
  for consistency with Kubernetes resource limits.
- Enabled the use of units such as Ki, Mi, Gi for max checkpoint
  size and total size policies.

Signed-off-by: Parthiba-Hazra <[email protected]>
  • Loading branch information
Parthiba-Hazra committed Nov 10, 2024
1 parent c48b886 commit c322bf4
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 107 deletions.
45 changes: 23 additions & 22 deletions api/v1/checkpointrestoreoperator_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package v1

import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand All @@ -34,37 +35,37 @@ type CheckpointRestoreOperatorSpec struct {
}

type GlobalPolicySpec struct {
MaxCheckpointsPerNamespaces *int `json:"maxCheckpointsPerNamespace,omitempty"`
MaxCheckpointsPerPod *int `json:"maxCheckpointsPerPod,omitempty"`
MaxCheckpointsPerContainer *int `json:"maxCheckpointsPerContainer,omitempty"`
MaxCheckpointSize *int `json:"maxCheckpointSize,omitempty"`
MaxTotalSizePerNamespace *int `json:"maxTotalSizePerNamespace,omitempty"`
MaxTotalSizePerPod *int `json:"maxTotalSizePerPod,omitempty"`
MaxTotalSizePerContainer *int `json:"maxTotalSizePerContainer,omitempty"`
MaxCheckpointsPerNamespaces *int `json:"maxCheckpointsPerNamespace,omitempty"`
MaxCheckpointsPerPod *int `json:"maxCheckpointsPerPod,omitempty"`
MaxCheckpointsPerContainer *int `json:"maxCheckpointsPerContainer,omitempty"`
MaxCheckpointSize *resource.Quantity `json:"maxCheckpointSize,omitempty"`
MaxTotalSizePerNamespace *resource.Quantity `json:"maxTotalSizePerNamespace,omitempty"`
MaxTotalSizePerPod *resource.Quantity `json:"maxTotalSizePerPod,omitempty"`
MaxTotalSizePerContainer *resource.Quantity `json:"maxTotalSizePerContainer,omitempty"`
}

type ContainerPolicySpec struct {
Namespace string `json:"namespace,omitempty"`
Pod string `json:"pod,omitempty"`
Container string `json:"container,omitempty"`
MaxCheckpoints *int `json:"maxCheckpoints,omitempty"`
MaxCheckpointSize *int `json:"maxCheckpointSize,omitempty"`
MaxTotalSize *int `json:"maxTotalSize,omitempty"`
Namespace string `json:"namespace,omitempty"`
Pod string `json:"pod,omitempty"`
Container string `json:"container,omitempty"`
MaxCheckpoints *int `json:"maxCheckpoints,omitempty"`
MaxCheckpointSize *resource.Quantity `json:"maxCheckpointSize,omitempty"`
MaxTotalSize *resource.Quantity `json:"maxTotalSize,omitempty"`
}

type PodPolicySpec struct {
Namespace string `json:"namespace,omitempty"`
Pod string `json:"pod,omitempty"`
MaxCheckpoints *int `json:"maxCheckpoints,omitempty"`
MaxCheckpointSize *int `json:"maxCheckpointSize,omitempty"`
MaxTotalSize *int `json:"maxTotalSize,omitempty"`
Namespace string `json:"namespace,omitempty"`
Pod string `json:"pod,omitempty"`
MaxCheckpoints *int `json:"maxCheckpoints,omitempty"`
MaxCheckpointSize *resource.Quantity `json:"maxCheckpointSize,omitempty"`
MaxTotalSize *resource.Quantity `json:"maxTotalSize,omitempty"`
}

type NamespacePolicySpec struct {
Namespace string `json:"namespace,omitempty"`
MaxCheckpoints *int `json:"maxCheckpoints,omitempty"`
MaxCheckpointSize *int `json:"maxCheckpointSize,omitempty"`
MaxTotalSize *int `json:"maxTotalSize,omitempty"`
Namespace string `json:"namespace,omitempty"`
MaxCheckpoints *int `json:"maxCheckpoints,omitempty"`
MaxCheckpointSize *resource.Quantity `json:"maxCheckpointSize,omitempty"`
MaxTotalSize *resource.Quantity `json:"maxTotalSize,omitempty"`
}

// CheckpointRestoreOperatorStatus defines the observed state of CheckpointRestoreOperator
Expand Down
40 changes: 20 additions & 20 deletions api/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

121 changes: 65 additions & 56 deletions internal/controller/checkpointrestoreoperator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
metadata "github.com/checkpoint-restore/checkpointctl/lib"
"github.com/containers/storage/pkg/archive"
"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/runtime"
kubelettypes "k8s.io/kubelet/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -54,24 +55,18 @@ const (
BySize
)

// Constants for unit conversion
const (
KB = 1024
MB = 1024 * KB
)

var (
GarbageCollector garbageCollector
policyMutex sync.RWMutex
checkpointDirectory string = "/var/lib/kubelet/checkpoints"
quit chan bool
maxCheckpointsPerContainer int = 10
maxCheckpointsPerPod int = 20
maxCheckpointsPerNamespace int = 30
maxCheckpointSize int = math.MaxInt32
maxTotalSizePerPod int = math.MaxInt32
maxTotalSizePerContainer int = math.MaxInt32
maxTotalSizePerNamespace int = math.MaxInt32
maxCheckpointsPerContainer int = 10
maxCheckpointsPerPod int = 20
maxCheckpointsPerNamespace int = 30
maxCheckpointSize resource.Quantity = resource.MustParse("1Ei")
maxTotalSizePerPod resource.Quantity = resource.MustParse("1Ei")
maxTotalSizePerContainer resource.Quantity = resource.MustParse("1Ei")
maxTotalSizePerNamespace resource.Quantity = resource.MustParse("1Ei")
containerPolicies []criuorgv1.ContainerPolicySpec
podPolicies []criuorgv1.PodPolicySpec
namespacePolicies []criuorgv1.NamespacePolicySpec
Expand Down Expand Up @@ -124,10 +119,10 @@ func resetAllPoliciesToDefault(log logr.Logger) {
maxCheckpointsPerContainer = 10
maxCheckpointsPerPod = 20
maxCheckpointsPerNamespace = 30
maxCheckpointSize = math.MaxInt32
maxTotalSizePerContainer = math.MaxInt32
maxTotalSizePerPod = math.MaxInt32
maxTotalSizePerNamespace = math.MaxInt32
maxCheckpointSize = resource.MustParse("1Ei")
maxTotalSizePerContainer = resource.MustParse("1Ei")
maxTotalSizePerPod = resource.MustParse("1Ei")
maxTotalSizePerNamespace = resource.MustParse("1Ei")

containerPolicies = nil
podPolicies = nil
Expand Down Expand Up @@ -155,24 +150,24 @@ func (r *CheckpointRestoreOperatorReconciler) handleGlobalPolicies(log logr.Logg
log.Info("Changed MaxCheckpointsPerNamespace", "maxCheckpointsPerNamespace", maxCheckpointsPerNamespace)
}

if globalPolicies.MaxCheckpointSize != nil && *globalPolicies.MaxCheckpointSize >= 0 {
maxCheckpointSize = *globalPolicies.MaxCheckpointSize * MB
log.Info("Changed MaxCheckpointSize", "maxCheckpointSize", maxCheckpointSize)
if globalPolicies.MaxCheckpointSize != nil {
maxCheckpointSize = *globalPolicies.MaxCheckpointSize
log.Info("Changed MaxCheckpointSize", "maxCheckpointSize", maxCheckpointSize.String())
}

if globalPolicies.MaxTotalSizePerNamespace != nil && *globalPolicies.MaxTotalSizePerNamespace >= 0 {
maxTotalSizePerNamespace = *globalPolicies.MaxTotalSizePerNamespace * MB
log.Info("Changed MaxTotalSizePerNamespace", "maxTotalSizePerNamespace", maxTotalSizePerNamespace)
if globalPolicies.MaxTotalSizePerNamespace != nil {
maxTotalSizePerNamespace = *globalPolicies.MaxTotalSizePerNamespace
log.Info("Changed MaxTotalSizePerNamespace", "maxTotalSizePerNamespace", maxTotalSizePerNamespace.String())
}

if globalPolicies.MaxTotalSizePerPod != nil && *globalPolicies.MaxTotalSizePerPod >= 0 {
maxTotalSizePerPod = *globalPolicies.MaxTotalSizePerPod * MB
log.Info("Changed MaxTotalSizePerPod", "maxTotalSizePerPod", maxTotalSizePerPod)
if globalPolicies.MaxTotalSizePerPod != nil {
maxTotalSizePerPod = *globalPolicies.MaxTotalSizePerPod
log.Info("Changed MaxTotalSizePerPod", "maxTotalSizePerPod", maxTotalSizePerPod.String())
}

if globalPolicies.MaxTotalSizePerContainer != nil && *globalPolicies.MaxTotalSizePerContainer >= 0 {
maxTotalSizePerContainer = *globalPolicies.MaxTotalSizePerContainer * MB
log.Info("Changed MaxTotalSizePerContainer", "maxTotalSizePerContainer", maxTotalSizePerContainer)
if globalPolicies.MaxTotalSizePerContainer != nil {
maxTotalSizePerContainer = *globalPolicies.MaxTotalSizePerContainer
log.Info("Changed MaxTotalSizePerContainer", "maxTotalSizePerContainer", maxTotalSizePerContainer.String())
}
}

Expand Down Expand Up @@ -332,38 +327,47 @@ func getCheckpointArchiveInformation(log logr.Logger, checkpointPath string) (*c

type Policy struct {
MaxCheckpoints int
MaxCheckpointSize int
MaxTotalSize int
MaxCheckpointSize resource.Quantity
MaxTotalSize resource.Quantity
}

func applyPolicies(log logr.Logger, details *checkpointDetails) {
policyMutex.Lock()
defer policyMutex.Unlock()

toInfinity := func(value *int) int {
// Function to handle default "infinity" value for count-based policies
toInfinityCount := func(value *int) int {
if value == nil {
return math.MaxInt32
}
return *value
}

// Function to handle default "infinity" value for size-based policies
toInfinitySize := func(value *resource.Quantity) resource.Quantity {
if value == nil {
return resource.MustParse("1Ei")
}
return *value
}

if policy := findContainerPolicy(details); policy != nil {
handleCheckpointsForLevel(log, details, "container", Policy{
MaxCheckpoints: toInfinity(policy.MaxCheckpoints),
MaxCheckpointSize: toInfinity(policy.MaxCheckpointSize) * MB,
MaxTotalSize: toInfinity(policy.MaxTotalSize) * MB,
MaxCheckpoints: toInfinityCount(policy.MaxCheckpoints),
MaxCheckpointSize: toInfinitySize(policy.MaxCheckpointSize),
MaxTotalSize: toInfinitySize(policy.MaxTotalSize),
})
} else if policy := findPodPolicy(details); policy != nil {
handleCheckpointsForLevel(log, details, "pod", Policy{
MaxCheckpoints: toInfinity(policy.MaxCheckpoints),
MaxCheckpointSize: toInfinity(policy.MaxCheckpointSize) * MB,
MaxTotalSize: toInfinity(policy.MaxTotalSize) * MB,
MaxCheckpoints: toInfinityCount(policy.MaxCheckpoints),
MaxCheckpointSize: toInfinitySize(policy.MaxCheckpointSize),
MaxTotalSize: toInfinitySize(policy.MaxTotalSize),
})
} else if policy := findNamespacePolicy(details); policy != nil {
handleCheckpointsForLevel(log, details, "namespace", Policy{
MaxCheckpoints: toInfinity(policy.MaxCheckpoints),
MaxCheckpointSize: toInfinity(policy.MaxCheckpointSize) * MB,
MaxTotalSize: toInfinity(policy.MaxTotalSize) * MB,
MaxCheckpoints: toInfinityCount(policy.MaxCheckpoints),
MaxCheckpointSize: toInfinitySize(policy.MaxCheckpointSize),
MaxTotalSize: toInfinitySize(policy.MaxTotalSize),
})
} else {
// Apply global policies if no specific policy found
Expand Down Expand Up @@ -466,11 +470,11 @@ func handleCheckpointsForLevel(log logr.Logger, details *checkpointDetails, leve
log.Info("MaxCheckpoints is less than or equal to 0, skipping checkpoint handling", "level", level, "policy.MaxCheckpoints", policy.MaxCheckpoints)
return
}
if policy.MaxCheckpointSize <= 0 {
if policy.MaxCheckpointSize.Value() <= 0 {
log.Info("MaxCheckpointSize is less than or equal to 0, skipping checkpoint handling", "level", level, "policy.MaxCheckpointSize", policy.MaxCheckpointSize)
return
}
if policy.MaxTotalSize <= 0 {
if policy.MaxTotalSize.Value() <= 0 {
log.Info("MaxTotalSize is less than or equal to 0, skipping checkpoint handling", "level", level, "policy.MaxTotalSize", policy.MaxTotalSize)
return
}
Expand Down Expand Up @@ -529,7 +533,7 @@ func handleCheckpointsForLevel(log logr.Logger, details *checkpointDetails, leve
}

checkpointArchivesCounter := len(filteredArchives)
totalSize := int64(0)
totalSize := resource.NewQuantity(0, resource.BinarySI)
archiveSizes := make(map[string]int64)
archivesToDelete := make(map[int64]string)

Expand All @@ -540,16 +544,19 @@ func handleCheckpointsForLevel(log logr.Logger, details *checkpointDetails, leve
continue
}

log.Info("Checkpoint archive details", "archive", c, "size", fi.Size(), "maxCheckpointSize", policy.MaxCheckpointSize)
if policy.MaxCheckpointSize > 0 && fi.Size() > int64(policy.MaxCheckpointSize) {
log.Info("Deleting checkpoint archive due to exceeding MaxCheckpointSize", "archive", c, "size", fi.Size(), "maxCheckpointSize", policy.MaxCheckpointSize)
currentSize := resource.NewQuantity(fi.Size(), resource.BinarySI)
log.Info("Checkpoint archive details", "archive", c, "size", currentSize.String(), "maxCheckpointSize", policy.MaxCheckpointSize.String())

if policy.MaxCheckpointSize.Cmp(*currentSize) < 0 {
log.Info("Deleting checkpoint archive due to exceeding MaxCheckpointSize", "archive", c, "size", currentSize.String(), "maxCheckpointSize", policy.MaxCheckpointSize.String())
err := os.Remove(c)
if err != nil {
log.Error(err, "failed to remove checkpoint archive", "archive", c)
}
continue
}
totalSize += fi.Size()

totalSize.Add(*currentSize)
archiveSizes[c] = fi.Size()
archivesToDelete[fi.ModTime().UnixMicro()] = c
}
Expand All @@ -558,7 +565,7 @@ func handleCheckpointsForLevel(log logr.Logger, details *checkpointDetails, leve
if policy.MaxCheckpoints > 0 && checkpointArchivesCounter > policy.MaxCheckpoints {
excessCount := int64(checkpointArchivesCounter - policy.MaxCheckpoints)
log.Info("Checkpoint count exceeds limit", "checkpointArchivesCounter", checkpointArchivesCounter, "maxCheckpoints", policy.MaxCheckpoints, "excessCount", excessCount)
toDelete := selectArchivesToDelete(log, checkpointArchives, archiveSizes, excessCount, ByCount)
toDelete := selectArchivesToDelete(log, filteredArchives, archiveSizes, excessCount, ByCount)
for _, archive := range toDelete {
log.Info("Deleting checkpoint archive due to excess count", "archive", archive)
err := os.Remove(archive)
Expand All @@ -573,19 +580,21 @@ func handleCheckpointsForLevel(log logr.Logger, details *checkpointDetails, leve
}

// Handle total size against maxTotalSize
if policy.MaxTotalSize > 0 && totalSize > int64(policy.MaxTotalSize) {
excessSize := totalSize - int64(policy.MaxTotalSize)
log.Info("Total size of checkpoint archives exceeds limit", "totalSize", totalSize, "maxTotalSize", policy.MaxTotalSize, "excessSize", excessSize)
toDelete := selectArchivesToDelete(log, filteredArchives, archiveSizes, excessSize, BySize)
if policy.MaxTotalSize.Cmp(*totalSize) < 0 {
excessSize := totalSize.DeepCopy()
excessSize.Sub(policy.MaxTotalSize)
log.Info("Total size of checkpoint archives exceeds limit", "totalSize", totalSize.String(), "maxTotalSize", policy.MaxTotalSize.String(), "excessSize", excessSize.String())
toDelete := selectArchivesToDelete(log, filteredArchives, archiveSizes, excessSize.Value(), BySize)
for _, archive := range toDelete {
log.Info("Deleting checkpoint archive due to excess size", "archive", archive)
err := os.Remove(archive)
if err != nil {
log.Error(err, "Removal of checkpoint archive failed", "archive", archive)
log.Error(err, "removal of checkpoint archive failed", "archive", archive)
}
totalSize -= archiveSizes[archive]
currentSize := resource.NewQuantity(archiveSizes[archive], resource.BinarySI)
totalSize.Sub(*currentSize)
delete(archiveSizes, archive)
if totalSize <= int64(policy.MaxTotalSize) {
if policy.MaxTotalSize.Cmp(*totalSize) >= 0 {
break
}
}
Expand Down
Loading

0 comments on commit c322bf4

Please sign in to comment.