Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pkg/common/env_names.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,9 @@ const (
const (
// EnvFuseSidecarInjectionMode is the name of the FUSE_SIDECAR_INJECTION_MODE environment variable.
// NOTE(review): presumably it selects how the FUSE sidecar is injected; the reader of this
// variable and its accepted values are not visible here — confirm before relying on this.
EnvFuseSidecarInjectionMode = "FUSE_SIDECAR_INJECTION_MODE"
)

// Env names that are related to legacy mechanisms for backward compatibility
Copy link

Copilot AI Apr 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Grammar: "Env names that related to legacy mechanisms" should be "Env names that are related to legacy mechanisms".

Suggested change
// Env names that related to legacy mechanisms for backward compatibility
// Env names that are related to legacy mechanisms for backward compatibility

Copilot uses AI. Check for mistakes.
const (
// LegacyEnvBlockInUseDatasetDeletion names the boolean env var that gates the legacy
// in-use check (kubeclient.ShouldDeleteDataset) during dataset deletion. It is read
// through utils.GetBoolValueFromEnv with false as the fallback argument.
LegacyEnvBlockInUseDatasetDeletion = "LEGACY_MECHANISM_BLOCK_IN_USE_DATASET_DELETION"
// LegacyEnvForceCleanUpManagedPVC names the boolean env var that gates the legacy
// forced removal of the pvc-protection finalizer while the managed PVC is being deleted.
// It is read through utils.GetBoolValueFromEnv with false as the fallback argument.
LegacyEnvForceCleanUpManagedPVC = "LEGACY_MECHANISM_FORCE_CLEAN_UP_MANAGED_PVC"
)
15 changes: 9 additions & 6 deletions pkg/controllers/v1alpha1/dataset/dataset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@
}

// reconcile Dataset Deletion
func (r *DatasetReconciler) reconcileDatasetDeletion(ctx reconcileRequestContext) (ctrl.Result, error) {

Check failure on line 185 in pkg/controllers/v1alpha1/dataset/dataset_controller.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 16 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=fluid-cloudnative_fluid&issues=AZ2WUZ4jID8J6ZLjdRMF&open=AZ2WUZ4jID8J6ZLjdRMF&pullRequest=5790
log := ctx.Log.WithName("reconcileDatasetDeletion")
log.Info("process the dataset", "dataset", ctx.Dataset)

Expand All @@ -195,12 +195,15 @@
return utils.RequeueAfterInterval(time.Duration(1 * time.Second))
}
*/
// 1.if there is a pod which is using the dataset (or cannot judge), then requeue
err := kubeclient.ShouldDeleteDataset(r.Client, ctx.Name, ctx.Namespace)
if err != nil {
ctx.Log.Error(err, "Failed to delete dataset", "DatasetDeleteError", ctx)
r.Recorder.Eventf(&ctx.Dataset, v1.EventTypeWarning, common.ErrorDeleteDataset, "Failed to delete dataset because err: %s", err.Error())
return utils.RequeueAfterInterval(time.Duration(10 * time.Second))
// 1. WARN: This is a LEGACY MECHANISM and the check will be skipped in common cases. It will be removed in the future.
// If there is a pod which is using the dataset (or this cannot be determined), then requeue
if utils.GetBoolValueFromEnv(common.LegacyEnvBlockInUseDatasetDeletion, false) {
err := kubeclient.ShouldDeleteDataset(r.Client, ctx.Name, ctx.Namespace)
if err != nil {
ctx.Log.Error(err, "Failed to delete dataset", "DatasetDeleteError", ctx)
r.Recorder.Eventf(&ctx.Dataset, v1.EventTypeWarning, common.ErrorDeleteDataset, "Failed to delete dataset because err: %s", err.Error())
return utils.RequeueAfterInterval(time.Duration(10 * time.Second))
}
}

// 2. if other datasets mount this dataset, check whether the referencing datasets still exist and requeue if any of them does
Expand Down
50 changes: 50 additions & 0 deletions pkg/ctrl/watch/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

import (
"context"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"

"github.com/fluid-cloudnative/fluid/pkg/common"
webhookReconcile "github.com/fluid-cloudnative/fluid/pkg/controllers/v1alpha1/webhook"
Expand All @@ -28,12 +30,15 @@
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)

var (
Expand Down Expand Up @@ -116,7 +121,7 @@
return
}

func SetupWatcherForReconciler(mgr ctrl.Manager, options controller.Options, r Controller) (err error) {

Check failure on line 124 in pkg/ctrl/watch/manager.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 20 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=fluid-cloudnative_fluid&issues=AZ2WUZz8ID8J6ZLjdRME&open=AZ2WUZz8ID8J6ZLjdRME&pullRequest=5790
options.Reconciler = r
c, err := controller.New(r.ControllerName(), mgr, options)
if err != nil {
Expand Down Expand Up @@ -158,6 +163,51 @@
return err
}

// A PVC shares the same namespace and name with its runtime, so enqueueing a request with the PVC's name and namespace will trigger the runtime's reconciliation
err = c.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolumeClaim{}), &handler.EnqueueRequestForObject{}, predicate.Funcs{
CreateFunc: func(ce event.CreateEvent) bool { return false },
UpdateFunc: func(ue event.UpdateEvent) bool { return false },
DeleteFunc: func(de event.DeleteEvent) bool {
pvc, ok := de.Object.(*corev1.PersistentVolumeClaim)
if !ok {
return false
}
if len(pvc.Annotations) > 0 && pvc.Annotations["CreatedBy"] == "fluid" {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Hardcoding annotation keys and values like "CreatedBy" and "fluid" is discouraged. These should be defined as constants in a common package to ensure consistency and avoid typos. Additionally, consider using a more robust check that aligns with common.GetExpectedFluidAnnotations().

return true
}
return false
Comment on lines +170 to +178
Copy link

Copilot AI Apr 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The annotation filter is hardcoded as pvc.Annotations["CreatedBy"] == "fluid". Since the codebase already centralizes this in common.GetExpectedFluidAnnotations(), it’s easy for these literals to drift from the canonical key/value. Please reuse the common helper/constant here (and in the PV watch below) to keep the filter consistent across the project.

Copilot uses AI. Check for mistakes.
},
})

if err != nil {
return err
}

pvToRuntimeReqFn := func(ctx context.Context, o client.Object) []reconcile.Request {
pv, ok := o.(*corev1.PersistentVolume)
if !ok {
return []reconcile.Request{}
}

if len(pv.Annotations) > 0 && pv.Annotations["CreatedBy"] == "fluid" {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Hardcoding "CreatedBy" here is inconsistent with other parts of the codebase. Please use a constant instead.

if pv.Spec.ClaimRef != nil && len(pv.Spec.ClaimRef.Name) > 0 && len(pv.Spec.ClaimRef.Namespace) > 0 {
return []reconcile.Request{{NamespacedName: types.NamespacedName{Name: pv.Spec.ClaimRef.Name, Namespace: pv.Spec.ClaimRef.Namespace}}}
}
}

return []reconcile.Request{}
}

err = c.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolume{}), handler.EnqueueRequestsFromMapFunc(pvToRuntimeReqFn), predicate.Funcs{
CreateFunc: func(ce event.CreateEvent) bool { return false },
UpdateFunc: func(ue event.UpdateEvent) bool { return false },
DeleteFunc: func(de event.DeleteEvent) bool { return true },
})
Comment on lines +201 to +205
Copy link

Copilot AI Apr 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DeleteFunc for PersistentVolume events currently returns true for all PV deletions, even though the mapping function only enqueues requests for Fluid-managed PVs. This causes the controller to process every PV delete event cluster-wide. Consider filtering in the predicate as well (e.g., only pass events where the PV has Fluid annotations / a non-nil ClaimRef) to reduce unnecessary work under high PV churn.

Copilot uses AI. Check for mistakes.

if err != nil {
return err
}

return
}

Expand Down
70 changes: 40 additions & 30 deletions pkg/utils/dataset/volume/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@ limitations under the License.
package volume

import (
"context"
"fmt"
"time"

"github.com/go-logr/logr"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/util/wait"

"github.com/fluid-cloudnative/fluid/pkg/common"
"github.com/fluid-cloudnative/fluid/pkg/ddc/base"
Expand Down Expand Up @@ -97,52 +100,59 @@ func DeleteFusePersistentVolumeClaim(client client.Client,
}

stillFound := false
retries := 10
for i := 0; i < retries; i++ {
ctx, cancelFunc := context.WithTimeout(context.Background(), 1*time.Second)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The 1-second timeout for confirming PVC deletion is extremely short. Kubernetes resource deletion often takes several seconds to propagate through controllers and be reflected in the API. A 1-second window will likely cause frequent timeouts and reconciliation requeues under normal cluster load. Consider increasing this to 5 seconds for better robustness.

Suggested change
ctx, cancelFunc := context.WithTimeout(context.Background(), 1*time.Second)
ctx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Second)

defer cancelFunc()

backoff := wait.Backoff{Duration: 100 * time.Millisecond, Steps: 10, Jitter: 0.2}
err := wait.ExponentialBackoffWithContext(ctx, backoff, func(ctx context.Context) (done bool, err error) {
Comment on lines +106 to +107
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The backoff configuration (10 steps of 100ms) sums up to the 1-second context timeout. Due to API call latency, the later steps will likely be cut off by the context expiration. If the timeout is increased, consider also adjusting the backoff duration or steps to ensure they fit within the new timeout window.

stillFound, err = kubeclient.IsPersistentVolumeClaimExist(client, runtime.GetName(), runtime.GetNamespace(), common.GetExpectedFluidAnnotations())
if err != nil {
return err
return false, err
}

if !stillFound {
break
return true, nil
}

should, err := kubeclient.ShouldRemoveProtectionFinalizer(client, runtime.GetName(), runtime.GetNamespace())
if err != nil {
// ignore NotFound error and re-check existence if the pvc is already deleted
if utils.IgnoreNotFound(err) == nil {
continue
}
}

if should {
log.Info("Should forcibly remove pvc-protection finalizer")
err = kubeclient.RemoveProtectionFinalizer(client, runtime.GetName(), runtime.GetNamespace())
// WARN: This is a LEGACY MECHANISM and will be removed in the future.
// Unless the legacy env var below is enabled, the pvc-protection finalizer is not forcibly removed; we wait until the PVC is really deleted by the PV controller.
if utils.GetBoolValueFromEnv(common.LegacyEnvForceCleanUpManagedPVC, false) {
should, err := kubeclient.ShouldRemoveProtectionFinalizer(client, runtime.GetName(), runtime.GetNamespace())
if err != nil {
// ignore NotFound error and re-check existence if the pvc is already deleted
if utils.IgnoreNotFound(err) == nil {
continue
return false, nil
}
}

if should {
log.Info("Should forcibly remove pvc-protection finalizer")
err = kubeclient.RemoveProtectionFinalizer(client, runtime.GetName(), runtime.GetNamespace())
if err != nil {
// ignore NotFound error and re-check existence if the pvc is already deleted
if utils.IgnoreNotFound(err) == nil {
return false, nil
}
log.Info("Failed to remove finalizers", "name", runtime.GetName(), "namespace", runtime.GetNamespace())
return false, err
}
log.Info("Failed to remove finalizers", "name", runtime.GetName(), "namespace", runtime.GetNamespace())
return err
}
}
Comment on lines +117 to 140
Copy link

Copilot AI Apr 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PVC finalizer force-cleanup path is now gated behind LEGACY_MECHANISM_FORCE_CLEAN_UP_MANAGED_PVC, which changes the behavior of existing unit tests that assume the pvc-protection finalizer will be removed automatically. For example, pkg/utils/dataset/volume/delete_test.go has scenarios like "PVC is stuck terminating with pvc-protection finalizer" that will now time out unless the env var is set. Please update the tests (set/unset the env var per scenario and adjust assertions) to match the new default behavior.

Copilot uses AI. Check for mistakes.

time.Sleep(1 * time.Second)
}
return false, nil
})

if stillFound {
return fmt.Errorf("the PVC %s in ns %s is not cleaned up after 10-second retry",
runtime.GetName(),
runtime.GetNamespace())
} else {
log.Info("The PVC is deleted successfully",
"name", runtime.GetName(),
"namespace", runtime.GetNamespace())
if err != nil {
if wait.Interrupted(err) {
return errors.Wrapf(err, "timeout waiting for PVC %s to be deleted after 1-second retry", runtime.GetName())
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The error message should reflect the actual timeout duration used in the context.

Suggested change
return errors.Wrapf(err, "timeout waiting for PVC %s to be deleted after 1-second retry", runtime.GetName())
return errors.Wrapf(err, "timeout waiting for PVC %s to be deleted after 5-second retry", runtime.GetName())

}
Comment on lines +145 to +148
Copy link

Copilot AI Apr 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The timeout error message loses important context and no longer includes the PVC namespace (the earlier error messages in this file include both name and namespace). Consider including both name and namespace, and avoid hardcoding "1-second" in the message unless it exactly matches the configured timeout/backoff values.

Copilot uses AI. Check for mistakes.
return err
}
}

return err
log.Info("The PVC is deleted successfully",
"name", runtime.GetName(),
"namespace", runtime.GetNamespace())
}

return nil
}
31 changes: 2 additions & 29 deletions pkg/utils/kubeclient/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
corev1 "k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
)

Expand Down Expand Up @@ -198,6 +199,7 @@ func GetPvcMountPods(e client.Client, pvcName, namespace string) ([]corev1.Pod,
nsPods := corev1.PodList{}
err := e.List(context.TODO(), &nsPods, &client.ListOptions{
Namespace: namespace,
UnsafeDisableDeepCopy: ptr.To(true),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using UnsafeDisableDeepCopy in a public utility function like GetPvcMountPods is risky. Since the returned objects point directly to the cache's internal state, any caller that inadvertently modifies a Pod object will corrupt the cache. While the current usage in this PR is safe, this optimization should be documented clearly to warn future users of this utility.

})
if err != nil {
log.Error(err, "Failed to list pods")
Expand All @@ -215,35 +217,6 @@ func GetPvcMountPods(e client.Client, pvcName, namespace string) ([]corev1.Pod,
return pods, err
}

// GetPvcMountNodes counts, per node, the pods in the given namespace that mount the given PVC.
// The returned map goes from node name to the number of such pods found on that node.
// Pods reported as complete by IsCompletePod and pods with an empty node name are not counted.
// If the mounting pods cannot be listed, the error is logged and returned with an empty map.
func GetPvcMountNodes(e client.Client, pvcName, namespace string) (map[string]int64, error) {
	podCountByNode := map[string]int64{}

	mountPods, err := GetPvcMountPods(e, pvcName, namespace)
	if err != nil {
		log.Error(err, "Failed to get PVC Mount Nodes because cannot list pods")
		return podCountByNode, err
	}

	for i := range mountPods {
		p := &mountPods[i]
		if IsCompletePod(p) || p.Spec.NodeName == "" {
			continue
		}
		// A missing key reads as zero, so the increment also covers the first pod seen on a node.
		podCountByNode[p.Spec.NodeName]++
	}
	return podCountByNode, nil
}

// RemoveProtectionFinalizer removes finalizers of PersistentVolumeClaim
// if all owners that this PVC is mounted by are inactive (Succeed or Failed)
func RemoveProtectionFinalizer(client client.Client, name, namespace string) (err error) {
Expand Down
Loading
Loading