Skip to content

Commit

Permalink
fix: record new pod when receate mount pod (#1269)
Browse files Browse the repository at this point in the history
Signed-off-by: zwwhdls <[email protected]>
  • Loading branch information
zwwhdls authored Feb 12, 2025
1 parent 59be5c5 commit 068fb72
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 12 deletions.
26 changes: 23 additions & 3 deletions pkg/controller/pod_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"path"
"runtime"
"strings"
"sync"
"time"

corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -53,6 +54,7 @@ var (
)

type PodDriver struct {
lock sync.Mutex
Client *k8sclient.K8sClient
handlers map[podStatus]podHandler
uniqueIdIndex map[string][]podWithUpgradeUUID
Expand Down Expand Up @@ -92,6 +94,7 @@ func NewPodDriver(client *k8sclient.K8sClient, mounter mount.SafeFormatAndMount,

func newPodDriver(client *k8sclient.K8sClient, mounter mount.SafeFormatAndMount) *PodDriver {
driver := &PodDriver{
lock: sync.Mutex{},
Client: client,
handlers: map[podStatus]podHandler{},
SafeFormatAndMount: mounter,
Expand Down Expand Up @@ -279,7 +282,7 @@ func (p *PodDriver) podCompleteHandler(ctx context.Context, pod *corev1.Pod) (Re
lock.Lock()
defer lock.Unlock()

hasAvailPod := p.getAvailableMountPod(ctx, pod.Labels[common.PodUniqueIdLabelKey], resource.GetUpgradeUUID(pod))
hasAvailPod := p.getAvailableMountPod(pod.Labels[common.PodUniqueIdLabelKey], resource.GetUpgradeUUID(pod))
if !hasAvailPod {
newPodName := podmount.GenPodNameByUniqueId(resource.GetUniqueId(*pod), true)
log.Info("need to create a new one", "newPodName", newPodName)
Expand Down Expand Up @@ -309,6 +312,7 @@ func (p *PodDriver) podCompleteHandler(ctx context.Context, pod *corev1.Pod) (Re
log.Error(err, "Create pod")
return Result{}, err
}
p.recordPod(newPod.Labels[common.PodUniqueIdLabelKey], resource.GetUpgradeUUID(newPod))
}

// delete the old one
Expand Down Expand Up @@ -444,7 +448,7 @@ func (p *PodDriver) podDeletedHandler(ctx context.Context, pod *corev1.Pod) (Res
}
}

hasAvailPod := p.getAvailableMountPod(ctx, pod.Labels[common.PodUniqueIdLabelKey], resource.GetUpgradeUUID(pod))
hasAvailPod := p.getAvailableMountPod(pod.Labels[common.PodUniqueIdLabelKey], resource.GetUpgradeUUID(pod))

// if no reference, clean up
if len(existTargets) == 0 && !hasAvailPod {
Expand All @@ -470,6 +474,7 @@ func (p *PodDriver) podDeletedHandler(ctx context.Context, pod *corev1.Pod) (Res
log.Error(err, "Create pod")
return Result{}, err
}
p.recordPod(newPod.Labels[common.PodUniqueIdLabelKey], resource.GetUpgradeUUID(newPod))
}
// remove finalizer of pod
if err := resource.RemoveFinalizer(ctx, p.Client, pod, common.Finalizer); err != nil {
Expand Down Expand Up @@ -1031,8 +1036,10 @@ func mkrMp(ctx context.Context, pod corev1.Pod) error {
return nil
}

func (p *PodDriver) getAvailableMountPod(ctx context.Context, uniqueId, upgradeUUID string) bool {
func (p *PodDriver) getAvailableMountPod(uniqueId, upgradeUUID string) bool {
// check pods in which get from kubelet
p.lock.Lock()
defer p.lock.Unlock()
for _, u := range p.uniqueIdIndex[uniqueId] {
if u.upgradeUUID == upgradeUUID {
if u.status != podDeleted && u.status != podComplete {
Expand All @@ -1043,6 +1050,19 @@ func (p *PodDriver) getAvailableMountPod(ctx context.Context, uniqueId, upgradeU
return false
}

func (p *PodDriver) recordPod(uniqueId, upgradeUUID string) {
// record pod in pool which get from kubelet
p.lock.Lock()
defer p.lock.Unlock()
if p.uniqueIdIndex[uniqueId] == nil {
p.uniqueIdIndex[uniqueId] = make([]podWithUpgradeUUID, 0)
}
p.uniqueIdIndex[uniqueId] = append(p.uniqueIdIndex[uniqueId], podWithUpgradeUUID{
upgradeUUID: upgradeUUID,
status: podPending,
})
}

func (p *PodDriver) getUniqueMountPod(ctx context.Context, uniqueId string) bool {
// check pods in which get from kubelet
for _, u := range p.uniqueIdIndex[uniqueId] {
Expand Down
19 changes: 10 additions & 9 deletions pkg/controller/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ func doReconcile(ks *k8sclient.K8sClient, kc *k8sclient.KubeletClient) {
backOff := flowcontrol.NewBackOff(retryPeriod, maxRetryPeriod)
lastPodStatus := make(map[string]PodStatus)
statusMu := sync.Mutex{}
mounter := mount.SafeFormatAndMount{
Interface: mount.New(""),
Exec: k8sexec.New(),
}
for {
timeoutCtx, cancel := context.WithTimeout(context.Background(), config.ReconcileTimeout)
g, ctx := errgroup.WithContext(timeoutCtx)
Expand All @@ -94,8 +98,12 @@ func doReconcile(ks *k8sclient.K8sClient, kc *k8sclient.KubeletClient) {
podList, err := kc.GetNodeRunningPods()
if err != nil {
reconcilerLog.Error(err, "doReconcile GetNodeRunningPods error")
goto finish
cancel()
time.Sleep(time.Duration(config.ReconcilerInterval) * time.Second)
continue
}
podDriver := NewPodDriver(ks, mounter, podList)
podDriver.SetMountInfo(*mit)

for i := range podList.Items {
pod := &podList.Items[i]
Expand Down Expand Up @@ -123,13 +131,6 @@ func doReconcile(ks *k8sclient.K8sClient, kc *k8sclient.KubeletClient) {
continue
}
g.Go(func() error {
mounter := mount.SafeFormatAndMount{
Interface: mount.New(""),
Exec: k8sexec.New(),
}
podDriver := NewPodDriver(ks, mounter, podList)
podDriver.SetMountInfo(*mit)

errChan := make(chan error, 1)
go func() {
defer close(errChan)
Expand Down Expand Up @@ -175,7 +176,7 @@ func doReconcile(ks *k8sclient.K8sClient, kc *k8sclient.KubeletClient) {
backOff.GC()
_ = g.Wait()
podList = nil
finish:

cancel()
time.Sleep(time.Duration(config.ReconcilerInterval) * time.Second)
}
Expand Down

0 comments on commit 068fb72

Please sign in to comment.