-
Notifications
You must be signed in to change notification settings - Fork 1.2k
optim(runtime): optimize deletion logic for datasets and runtimes #5790
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
4f2a9cb
3d9cc19
313a104
2825a78
fd68981
30c673d
8fe011c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,7 +18,9 @@ | |
|
|
||
| import ( | ||
| "context" | ||
|
|
||
| appsv1 "k8s.io/api/apps/v1" | ||
| corev1 "k8s.io/api/core/v1" | ||
|
|
||
| "github.com/fluid-cloudnative/fluid/pkg/common" | ||
| webhookReconcile "github.com/fluid-cloudnative/fluid/pkg/controllers/v1alpha1/webhook" | ||
|
|
@@ -28,12 +30,15 @@ | |
| ctrl "sigs.k8s.io/controller-runtime" | ||
| "sigs.k8s.io/controller-runtime/pkg/client" | ||
| "sigs.k8s.io/controller-runtime/pkg/controller" | ||
| "sigs.k8s.io/controller-runtime/pkg/event" | ||
| "sigs.k8s.io/controller-runtime/pkg/handler" | ||
| "sigs.k8s.io/controller-runtime/pkg/predicate" | ||
| "sigs.k8s.io/controller-runtime/pkg/reconcile" | ||
| "sigs.k8s.io/controller-runtime/pkg/source" | ||
|
|
||
| datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" | ||
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
| "k8s.io/apimachinery/pkg/types" | ||
| ) | ||
|
|
||
| var ( | ||
|
|
@@ -116,7 +121,7 @@ | |
| return | ||
| } | ||
|
|
||
| func SetupWatcherForReconciler(mgr ctrl.Manager, options controller.Options, r Controller) (err error) { | ||
|
Check failure on line 124 in pkg/ctrl/watch/manager.go
|
||
| options.Reconciler = r | ||
| c, err := controller.New(r.ControllerName(), mgr, options) | ||
| if err != nil { | ||
|
|
@@ -158,6 +163,51 @@ | |
| return err | ||
| } | ||
|
|
||
| // The PVC shares its namespace and name with the runtime, so enqueueing a request with the PVC's name and namespace triggers runtime reconciliation. | ||
| err = c.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolumeClaim{}), &handler.EnqueueRequestForObject{}, predicate.Funcs{ | ||
| CreateFunc: func(ce event.CreateEvent) bool { return false }, | ||
| UpdateFunc: func(ue event.UpdateEvent) bool { return false }, | ||
| DeleteFunc: func(de event.DeleteEvent) bool { | ||
| pvc, ok := de.Object.(*corev1.PersistentVolumeClaim) | ||
| if !ok { | ||
| return false | ||
| } | ||
| if len(pvc.Annotations) > 0 && pvc.Annotations["CreatedBy"] == "fluid" { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| return true | ||
| } | ||
| return false | ||
|
Comment on lines
+170
to
+178
|
||
| }, | ||
| }) | ||
|
|
||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| pvToRuntimeReqFn := func(ctx context.Context, o client.Object) []reconcile.Request { | ||
| pv, ok := o.(*corev1.PersistentVolume) | ||
| if !ok { | ||
| return []reconcile.Request{} | ||
| } | ||
|
|
||
| if len(pv.Annotations) > 0 && pv.Annotations["CreatedBy"] == "fluid" { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| if pv.Spec.ClaimRef != nil && len(pv.Spec.ClaimRef.Name) > 0 && len(pv.Spec.ClaimRef.Namespace) > 0 { | ||
| return []reconcile.Request{{NamespacedName: types.NamespacedName{Name: pv.Spec.ClaimRef.Name, Namespace: pv.Spec.ClaimRef.Namespace}}} | ||
| } | ||
| } | ||
|
|
||
| return []reconcile.Request{} | ||
| } | ||
|
|
||
| err = c.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolume{}), handler.EnqueueRequestsFromMapFunc(pvToRuntimeReqFn), predicate.Funcs{ | ||
| CreateFunc: func(ce event.CreateEvent) bool { return false }, | ||
| UpdateFunc: func(ue event.UpdateEvent) bool { return false }, | ||
| DeleteFunc: func(de event.DeleteEvent) bool { return true }, | ||
| }) | ||
|
Comment on lines
+201
to
+205
|
||
|
|
||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| return | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -17,10 +17,13 @@ limitations under the License. | |||||
| package volume | ||||||
|
|
||||||
| import ( | ||||||
| "context" | ||||||
| "fmt" | ||||||
| "time" | ||||||
|
|
||||||
| "github.com/go-logr/logr" | ||||||
| "github.com/pkg/errors" | ||||||
| "k8s.io/apimachinery/pkg/util/wait" | ||||||
|
|
||||||
| "github.com/fluid-cloudnative/fluid/pkg/common" | ||||||
| "github.com/fluid-cloudnative/fluid/pkg/ddc/base" | ||||||
|
|
@@ -97,52 +100,59 @@ func DeleteFusePersistentVolumeClaim(client client.Client, | |||||
| } | ||||||
|
|
||||||
| stillFound := false | ||||||
| retries := 10 | ||||||
| for i := 0; i < retries; i++ { | ||||||
| ctx, cancelFunc := context.WithTimeout(context.Background(), 1*time.Second) | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The 1-second timeout for confirming PVC deletion is extremely short. Kubernetes resource deletion often takes several seconds to propagate through controllers and be reflected in the API. A 1-second window will likely cause frequent timeouts and reconciliation requeues under normal cluster load. Consider increasing this to 5 seconds for better robustness.
Suggested change
|
||||||
| defer cancelFunc() | ||||||
|
|
||||||
| backoff := wait.Backoff{Duration: 100 * time.Millisecond, Steps: 10, Jitter: 0.2} | ||||||
| err := wait.ExponentialBackoffWithContext(ctx, backoff, func(ctx context.Context) (done bool, err error) { | ||||||
|
Comment on lines
+106
to
+107
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The backoff configuration (10 steps of 100ms) sums up to the 1-second context timeout. Due to API call latency, the later steps will likely be cut off by the context expiration. If the timeout is increased, consider also adjusting the backoff duration or steps to ensure they fit within the new timeout window. |
||||||
| stillFound, err = kubeclient.IsPersistentVolumeClaimExist(client, runtime.GetName(), runtime.GetNamespace(), common.GetExpectedFluidAnnotations()) | ||||||
| if err != nil { | ||||||
| return err | ||||||
| return false, err | ||||||
| } | ||||||
|
|
||||||
| if !stillFound { | ||||||
| break | ||||||
| return true, nil | ||||||
| } | ||||||
|
|
||||||
| should, err := kubeclient.ShouldRemoveProtectionFinalizer(client, runtime.GetName(), runtime.GetNamespace()) | ||||||
| if err != nil { | ||||||
| // ignore NotFound error and re-check existence if the pvc is already deleted | ||||||
| if utils.IgnoreNotFound(err) == nil { | ||||||
| continue | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| if should { | ||||||
| log.Info("Should forcibly remove pvc-protection finalizer") | ||||||
| err = kubeclient.RemoveProtectionFinalizer(client, runtime.GetName(), runtime.GetNamespace()) | ||||||
| // WARN: This is a LEGACY MECHANISM and will be removed in the future. | ||||||
| // By default, the pvc-protection finalizer is not forcibly removed; we wait until the PVC is really deleted by the PV controller. | ||||||
| if utils.GetBoolValueFromEnv(common.LegacyEnvForceCleanUpManagedPVC, false) { | ||||||
| should, err := kubeclient.ShouldRemoveProtectionFinalizer(client, runtime.GetName(), runtime.GetNamespace()) | ||||||
| if err != nil { | ||||||
| // ignore NotFound error and re-check existence if the pvc is already deleted | ||||||
| if utils.IgnoreNotFound(err) == nil { | ||||||
| continue | ||||||
| return false, nil | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| if should { | ||||||
| log.Info("Should forcibly remove pvc-protection finalizer") | ||||||
| err = kubeclient.RemoveProtectionFinalizer(client, runtime.GetName(), runtime.GetNamespace()) | ||||||
| if err != nil { | ||||||
| // ignore NotFound error and re-check existence if the pvc is already deleted | ||||||
| if utils.IgnoreNotFound(err) == nil { | ||||||
| return false, nil | ||||||
| } | ||||||
| log.Info("Failed to remove finalizers", "name", runtime.GetName(), "namespace", runtime.GetNamespace()) | ||||||
| return false, err | ||||||
| } | ||||||
| log.Info("Failed to remove finalizers", "name", runtime.GetName(), "namespace", runtime.GetNamespace()) | ||||||
| return err | ||||||
| } | ||||||
| } | ||||||
|
Comment on lines
+117
to
140
|
||||||
|
|
||||||
| time.Sleep(1 * time.Second) | ||||||
| } | ||||||
| return false, nil | ||||||
| }) | ||||||
|
|
||||||
| if stillFound { | ||||||
| return fmt.Errorf("the PVC %s in ns %s is not cleaned up after 10-second retry", | ||||||
| runtime.GetName(), | ||||||
| runtime.GetNamespace()) | ||||||
| } else { | ||||||
| log.Info("The PVC is deleted successfully", | ||||||
| "name", runtime.GetName(), | ||||||
| "namespace", runtime.GetNamespace()) | ||||||
| if err != nil { | ||||||
| if wait.Interrupted(err) { | ||||||
| return errors.Wrapf(err, "timeout waiting for PVC %s to be deleted after 1-second retry", runtime.GetName()) | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The error message should reflect the actual timeout duration used in the context.
Suggested change
|
||||||
| } | ||||||
|
Comment on lines
+145
to
+148
|
||||||
| return err | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| return err | ||||||
| log.Info("The PVC is deleted successfully", | ||||||
| "name", runtime.GetName(), | ||||||
| "namespace", runtime.GetNamespace()) | ||||||
| } | ||||||
|
|
||||||
| return nil | ||||||
| } | ||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,6 +27,7 @@ import ( | |
| corev1 "k8s.io/api/core/v1" | ||
| apierrs "k8s.io/apimachinery/pkg/api/errors" | ||
| "k8s.io/apimachinery/pkg/types" | ||
| "k8s.io/utils/ptr" | ||
| "sigs.k8s.io/controller-runtime/pkg/client" | ||
| ) | ||
|
|
||
|
|
@@ -198,6 +199,7 @@ func GetPvcMountPods(e client.Client, pvcName, namespace string) ([]corev1.Pod, | |
| nsPods := corev1.PodList{} | ||
| err := e.List(context.TODO(), &nsPods, &client.ListOptions{ | ||
| Namespace: namespace, | ||
| UnsafeDisableDeepCopy: ptr.To(true), | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Using UnsafeDisableDeepCopy in a public utility function like GetPvcMountPods is risky. Since the returned objects point directly to the cache's internal state, any caller that inadvertently modifies a Pod object will corrupt the cache. While the current usage in this PR is safe, this optimization should be documented clearly to warn future users of this utility. |
||
| }) | ||
| if err != nil { | ||
| log.Error(err, "Failed to list pods") | ||
|
|
@@ -215,35 +217,6 @@ func GetPvcMountPods(e client.Client, pvcName, namespace string) ([]corev1.Pod, | |
| return pods, err | ||
| } | ||
|
|
||
| // GetPvcMountNodes gets the nodes that run pods mounting the specified PVC in the given namespace. | ||
| // It returns only a map from node name to the number of PVC-mounting pods on that node. | ||
| // PVC-mounting pods that have completed are ignored. | ||
| // If it fails to get the PVC-mounting pods, every node is treated as having no PVC-mounting pods. | ||
| func GetPvcMountNodes(e client.Client, pvcName, namespace string) (map[string]int64, error) { | ||
| pvcMountNodes := map[string]int64{} | ||
| pvcMountPods, err := GetPvcMountPods(e, pvcName, namespace) | ||
| if err != nil { | ||
| log.Error(err, "Failed to get PVC Mount Nodes because cannot list pods") | ||
| return pvcMountNodes, err | ||
| } | ||
|
|
||
| for _, pod := range pvcMountPods { | ||
| if IsCompletePod(&pod) { | ||
| continue | ||
| } | ||
| nodeName := pod.Spec.NodeName | ||
| if nodeName == "" { | ||
| continue | ||
| } | ||
| if _, found := pvcMountNodes[nodeName]; !found { | ||
| pvcMountNodes[nodeName] = 1 | ||
| } else { | ||
| pvcMountNodes[nodeName] = pvcMountNodes[nodeName] + 1 | ||
| } | ||
| } | ||
| return pvcMountNodes, nil | ||
| } | ||
|
|
||
| // RemoveProtectionFinalizer removes finalizers of PersistentVolumeClaim | ||
| // if all owners that this PVC is mounted by are inactive (Succeed or Failed) | ||
| func RemoveProtectionFinalizer(client client.Client, name, namespace string) (err error) { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Grammar: "Env names that related to legacy mechanisms" should be "Env names that are related to legacy mechanisms".