diff --git a/pkg/common/env_names.go b/pkg/common/env_names.go index 9d40bf0141e..95b7848cd0b 100644 --- a/pkg/common/env_names.go +++ b/pkg/common/env_names.go @@ -35,3 +35,9 @@ const ( const ( EnvFuseSidecarInjectionMode = "FUSE_SIDECAR_INJECTION_MODE" ) + +// Env names that related to legacy mechanisms for backward compatibility +const ( + LegacyEnvBlockInUseDatasetDeletion = "LEGACY_MECHANISM_BLOCK_IN_USE_DATASET_DELETION" + LegacyEnvForceCleanUpManagedPVC = "LEGACY_MECHANISM_FORCE_CLEAN_UP_MANAGED_PVC" +) diff --git a/pkg/controllers/v1alpha1/dataset/dataset_controller.go b/pkg/controllers/v1alpha1/dataset/dataset_controller.go index 1f1d8c5f9fd..787c8a0004f 100644 --- a/pkg/controllers/v1alpha1/dataset/dataset_controller.go +++ b/pkg/controllers/v1alpha1/dataset/dataset_controller.go @@ -195,12 +195,15 @@ func (r *DatasetReconciler) reconcileDatasetDeletion(ctx reconcileRequestContext return utils.RequeueAfterInterval(time.Duration(1 * time.Second)) } */ - // 1.if there is a pod which is using the dataset (or cannot judge), then requeue - err := kubeclient.ShouldDeleteDataset(r.Client, ctx.Name, ctx.Namespace) - if err != nil { - ctx.Log.Error(err, "Failed to delete dataset", "DatasetDeleteError", ctx) - r.Recorder.Eventf(&ctx.Dataset, v1.EventTypeWarning, common.ErrorDeleteDataset, "Failed to delete dataset because err: %s", err.Error()) - return utils.RequeueAfterInterval(time.Duration(10 * time.Second)) + // 1.WARN: This is a LEGACY MECHANISM and the check will be skipped in common cases. It will be removed in future. + // If there is a pod which is using the dataset (or cannot judge), then requeue + if utils.GetBoolValueFromEnv(common.LegacyEnvBlockInUseDatasetDeletion, false) { + err := kubeclient.ShouldDeleteDataset(r.Client, ctx.Name, ctx.Namespace) + if err != nil { + ctx.Log.Error(err, "Failed to delete dataset", "DatasetDeleteError", ctx) + r.Recorder.Eventf(&ctx.Dataset, v1.EventTypeWarning, common.ErrorDeleteDataset, "Failed to delete dataset because err: %s", err.Error()) + return utils.RequeueAfterInterval(time.Duration(10 * time.Second)) + } } // 2. if there are datasets mounted this dataset, check reference dataset existence and requeue if there still has reference dataset diff --git a/pkg/ctrl/watch/manager.go b/pkg/ctrl/watch/manager.go index 274fe24e2ab..4bf49642f8d 100644 --- a/pkg/ctrl/watch/manager.go +++ b/pkg/ctrl/watch/manager.go @@ -18,7 +18,9 @@ package watch import ( "context" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" "github.com/fluid-cloudnative/fluid/pkg/common" webhookReconcile "github.com/fluid-cloudnative/fluid/pkg/controllers/v1alpha1/webhook" @@ -28,12 +30,15 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" ) var ( @@ -158,6 +163,51 @@ func SetupWatcherForReconciler(mgr ctrl.Manager, options controller.Options, r C return err } + // PVC shares the same namespace and name with the runtime, enqueueing request with pvc name and namespace will trigger runtime reconciliation + err = c.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolumeClaim{}), &handler.EnqueueRequestForObject{}, predicate.Funcs{ + CreateFunc: func(ce event.CreateEvent) bool { return false }, + UpdateFunc: func(ue event.UpdateEvent) bool { return false }, + DeleteFunc: func(de event.DeleteEvent) bool { + pvc, ok := de.Object.(*corev1.PersistentVolumeClaim) + if !ok { + return false + } + if len(pvc.Annotations) > 0 && pvc.Annotations["CreatedBy"] == "fluid" { + return true + } + return false + }, + }) + + if err != nil { + return err + } + + pvToRuntimeReqFn := func(ctx context.Context, o client.Object) []reconcile.Request { + pv, ok := o.(*corev1.PersistentVolume) + if !ok { + return []reconcile.Request{} + } + + if len(pv.Annotations) > 0 && pv.Annotations["CreatedBy"] == "fluid" { + if pv.Spec.ClaimRef != nil && len(pv.Spec.ClaimRef.Name) > 0 && len(pv.Spec.ClaimRef.Namespace) > 0 { + return []reconcile.Request{{NamespacedName: types.NamespacedName{Name: pv.Spec.ClaimRef.Name, Namespace: pv.Spec.ClaimRef.Namespace}}} + } + } + + return []reconcile.Request{} + } + + err = c.Watch(source.Kind(mgr.GetCache(), &corev1.PersistentVolume{}), handler.EnqueueRequestsFromMapFunc(pvToRuntimeReqFn), predicate.Funcs{ + CreateFunc: func(ce event.CreateEvent) bool { return false }, + UpdateFunc: func(ue event.UpdateEvent) bool { return false }, + DeleteFunc: func(de event.DeleteEvent) bool { return true }, + }) + + if err != nil { + return err + } + return } diff --git a/pkg/utils/dataset/volume/delete.go b/pkg/utils/dataset/volume/delete.go index 1002de3065f..47566f92286 100644 --- a/pkg/utils/dataset/volume/delete.go +++ b/pkg/utils/dataset/volume/delete.go @@ -17,10 +17,13 @@ limitations under the License. package volume import ( + "context" "fmt" "time" "github.com/go-logr/logr" + "github.com/pkg/errors" + "k8s.io/apimachinery/pkg/util/wait" "github.com/fluid-cloudnative/fluid/pkg/common" "github.com/fluid-cloudnative/fluid/pkg/ddc/base" @@ -97,52 +100,59 @@ func DeleteFusePersistentVolumeClaim(client client.Client, } stillFound := false - retries := 10 - for i := 0; i < retries; i++ { + ctx, cancelFunc := context.WithTimeout(context.Background(), 1*time.Second) + defer cancelFunc() + + backoff := wait.Backoff{Duration: 100 * time.Millisecond, Steps: 10, Jitter: 0.2} + err := wait.ExponentialBackoffWithContext(ctx, backoff, func(ctx context.Context) (done bool, err error) { stillFound, err = kubeclient.IsPersistentVolumeClaimExist(client, runtime.GetName(), runtime.GetNamespace(), common.GetExpectedFluidAnnotations()) if err != nil { - return err + return false, err } if !stillFound { - break + return true, nil } - should, err := kubeclient.ShouldRemoveProtectionFinalizer(client, runtime.GetName(), runtime.GetNamespace()) - if err != nil { - // ignore NotFound error and re-check existence if the pvc is already deleted - if utils.IgnoreNotFound(err) == nil { - continue - } - } - - if should { - log.Info("Should forcibly remove pvc-protection finalizer") - err = kubeclient.RemoveProtectionFinalizer(client, runtime.GetName(), runtime.GetNamespace()) + // WARN: This is a LEGACY MECHANISM and will be removed in the future. + // force deletion of pvc-protection finalizer will not be done, we'll wait until the pvc is really deleted by the PV controller. + if utils.GetBoolValueFromEnv(common.LegacyEnvForceCleanUpManagedPVC, false) { + should, err := kubeclient.ShouldRemoveProtectionFinalizer(client, runtime.GetName(), runtime.GetNamespace()) if err != nil { // ignore NotFound error and re-check existence if the pvc is already deleted if utils.IgnoreNotFound(err) == nil { - continue + return false, nil + } + } + + if should { + log.Info("Should forcibly remove pvc-protection finalizer") + err = kubeclient.RemoveProtectionFinalizer(client, runtime.GetName(), runtime.GetNamespace()) + if err != nil { + // ignore NotFound error and re-check existence if the pvc is already deleted + if utils.IgnoreNotFound(err) == nil { + return false, nil + } + log.Info("Failed to remove finalizers", "name", runtime.GetName(), "namespace", runtime.GetNamespace()) + return false, err } - log.Info("Failed to remove finalizers", "name", runtime.GetName(), "namespace", runtime.GetNamespace()) - return err } } - time.Sleep(1 * time.Second) - } + return false, nil + }) - if stillFound { - return fmt.Errorf("the PVC %s in ns %s is not cleaned up after 10-second retry", - runtime.GetName(), - runtime.GetNamespace()) - } else { - log.Info("The PVC is deleted successfully", - "name", runtime.GetName(), - "namespace", runtime.GetNamespace()) + if err != nil { + if wait.Interrupted(err) { + return errors.Wrapf(err, "timeout waiting for PVC %s to be deleted after 1-second retry", runtime.GetName()) + } + return err } - } - return err + log.Info("The PVC is deleted successfully", + "name", runtime.GetName(), + "namespace", runtime.GetNamespace()) + } + return nil } diff --git a/pkg/utils/kubeclient/volume.go b/pkg/utils/kubeclient/volume.go index 05ae52fd2dc..57c523c54a1 100644 --- a/pkg/utils/kubeclient/volume.go +++ b/pkg/utils/kubeclient/volume.go @@ -27,6 +27,7 @@ import ( corev1 "k8s.io/api/core/v1" apierrs "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -198,6 +199,7 @@ func GetPvcMountPods(e client.Client, pvcName, namespace string) ([]corev1.Pod, nsPods := corev1.PodList{} err := e.List(context.TODO(), &nsPods, &client.ListOptions{ Namespace: namespace, + UnsafeDisableDeepCopy: ptr.To(true), }) if err != nil { log.Error(err, "Failed to list pods") @@ -215,35 +217,6 @@ func GetPvcMountPods(e client.Client, pvcName, namespace string) ([]corev1.Pod, return pods, err } -// GetPvcMountNodes get nodes which have pods mounted the specific pvc for a given namespace -// it will only return a map of nodeName and amount of PvcMountPods on it -// if the Pvc mount Pod has completed, it will be ignored -// if fail to get pvc mount Nodes, treat every nodes as with no PVC mount Pods -func GetPvcMountNodes(e client.Client, pvcName, namespace string) (map[string]int64, error) { - pvcMountNodes := map[string]int64{} - pvcMountPods, err := GetPvcMountPods(e, pvcName, namespace) - if err != nil { - log.Error(err, "Failed to get PVC Mount Nodes because cannot list pods") - return pvcMountNodes, err - } - - for _, pod := range pvcMountPods { - if IsCompletePod(&pod) { - continue - } - nodeName := pod.Spec.NodeName - if nodeName == "" { - continue - } - if _, found := pvcMountNodes[nodeName]; !found { - pvcMountNodes[nodeName] = 1 - } else { - pvcMountNodes[nodeName] = pvcMountNodes[nodeName] + 1 - } - } - return pvcMountNodes, nil -} - // RemoveProtectionFinalizer removes finalizers of PersistentVolumeClaim // if all owners that this PVC is mounted by are inactive (Succeed or Failed) func RemoveProtectionFinalizer(client client.Client, name, namespace string) (err error) { diff --git a/pkg/utils/kubeclient/volume_claim_test.go b/pkg/utils/kubeclient/volume_claim_test.go index 828f21a2c26..92add301900 100644 --- a/pkg/utils/kubeclient/volume_claim_test.go +++ b/pkg/utils/kubeclient/volume_claim_test.go @@ -120,156 +120,6 @@ var _ = Describe("DeletePersistentVolumeClaim", func() { }) }) -var _ = Describe("GetPvcMountNodes", func() { - var ( - namespace string - volumeName1 string - volumeName2 string - testPodInputs []*v1.Pod - client client.Client - ) - - BeforeEach(func() { - namespace = "test" - volumeName1 = "found" - volumeName2 = "found1" - testPodInputs = []*v1.Pod{{ - ObjectMeta: metav1.ObjectMeta{Name: "found"}, - Spec: v1.PodSpec{}, - }, { - ObjectMeta: metav1.ObjectMeta{Name: "bbb", Namespace: namespace}, - Spec: v1.PodSpec{ - Volumes: []v1.Volume{ - { - Name: volumeName1, - VolumeSource: v1.VolumeSource{ - PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ - ClaimName: volumeName1, - ReadOnly: true, - }}, - }, - }, - NodeName: "node1", - }, - Status: v1.PodStatus{ - Phase: v1.PodSucceeded, - }, - }, { - ObjectMeta: metav1.ObjectMeta{Name: "ccc", Namespace: namespace}, - Spec: v1.PodSpec{ - Volumes: []v1.Volume{ - { - Name: volumeName1, - VolumeSource: v1.VolumeSource{ - PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ - ClaimName: volumeName1, - ReadOnly: true, - }}, - }, - }, - NodeName: "node2", - }, - Status: v1.PodStatus{ - Phase: v1.PodRunning, - }, - }, { - ObjectMeta: metav1.ObjectMeta{Name: "ddd", Namespace: namespace}, - Spec: v1.PodSpec{ - Volumes: []v1.Volume{ - { - Name: volumeName1, - VolumeSource: v1.VolumeSource{ - PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ - ClaimName: volumeName1, - ReadOnly: true, - }}, - }, - }, - NodeName: "node3", - }, - Status: v1.PodStatus{ - Phase: v1.PodRunning, - }, - }, { - ObjectMeta: metav1.ObjectMeta{Name: "eee", Namespace: namespace}, - Spec: v1.PodSpec{ - Volumes: []v1.Volume{ - { - Name: volumeName2, - VolumeSource: v1.VolumeSource{ - PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ - ClaimName: volumeName2, - ReadOnly: true, - }}, - }, - }, - NodeName: "node4", - }, - Status: v1.PodStatus{ - Phase: v1.PodRunning, - }, - }, { - ObjectMeta: metav1.ObjectMeta{Name: "fff", Namespace: namespace}, - Spec: v1.PodSpec{ - Volumes: []v1.Volume{ - { - Name: volumeName2, - VolumeSource: v1.VolumeSource{ - PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ - ClaimName: volumeName2, - ReadOnly: true, - }}, - }, - }, - NodeName: "", - }, - Status: v1.PodStatus{ - Phase: v1.PodRunning, - }, - }, { - ObjectMeta: metav1.ObjectMeta{Name: "hhh", Namespace: namespace}, - Spec: v1.PodSpec{ - Volumes: []v1.Volume{ - { - Name: volumeName2, - VolumeSource: v1.VolumeSource{ - PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ - ClaimName: volumeName1, - ReadOnly: true, - }}, - }, - }, - NodeName: "node3", - }, - Status: v1.PodStatus{ - Phase: v1.PodRunning, - }, - }} - - testPods := []runtime.Object{} - for _, pod := range testPodInputs { - testPods = append(testPods, pod.DeepCopy()) - } - - client = fake.NewFakeClientWithScheme(testScheme, testPods...) - }) - - It("should return empty list when node list is empty", func() { - pvcMountNodes, _ := GetPvcMountNodes(client, "not found", namespace) - Expect(len(pvcMountNodes)).To(Equal(0)) - }) - - It("should return 1 node when node list is 1", func() { - pvcMountNodes, _ := GetPvcMountNodes(client, volumeName2, namespace) - Expect(len(pvcMountNodes)).To(Equal(1)) - }) - - It("should return 2 nodes when node list is 2", func() { - pvcMountNodes, _ := GetPvcMountNodes(client, volumeName1, namespace) - Expect(len(pvcMountNodes)).To(Equal(2)) - }) -}) - var _ = Describe("RemoveProtectionFinalizer", func() { var ( namespace string diff --git a/pkg/utils/kubeclient/volume_test.go b/pkg/utils/kubeclient/volume_test.go index 01a2075d32f..ad07fb8aafd 100644 --- a/pkg/utils/kubeclient/volume_test.go +++ b/pkg/utils/kubeclient/volume_test.go @@ -500,128 +500,6 @@ var _ = Describe("Volume related unit tests", Label("pkg.utils.kubeclient.volume }) }) - Describe("Test GetPvcMountNodes()", func() { - var ( - namespace string - pvcName string - pod1 *v1.Pod - pod2 *v1.Pod - pod3 *v1.Pod - ) - - BeforeEach(func() { - namespace = "test-ns" - pvcName = "test-pvc" - - pod1 = &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "running-pod", - Namespace: namespace, - }, - Spec: v1.PodSpec{ - NodeName: "node-1", - Volumes: []v1.Volume{ - { - Name: "pvc-volume", - VolumeSource: v1.VolumeSource{ - PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ - ClaimName: pvcName, - }, - }, - }, - }, - }, - Status: v1.PodStatus{ - Phase: v1.PodRunning, - }, - } - - pod2 = &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "completed-pod", - Namespace: namespace, - }, - Spec: v1.PodSpec{ - NodeName: "node-2", - Volumes: []v1.Volume{ - { - Name: "pvc-volume", - VolumeSource: v1.VolumeSource{ - PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ - ClaimName: pvcName, - }, - }, - }, - }, - }, - Status: v1.PodStatus{ - Phase: v1.PodSucceeded, - }, - } - - pod3 = &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod-without-pvc", - Namespace: namespace, - }, - Spec: v1.PodSpec{ - NodeName: "node-3", - Volumes: []v1.Volume{ - { - Name: "hostpath-volume", - VolumeSource: v1.VolumeSource{ - HostPath: &v1.HostPathVolumeSource{ - Path: "/tmp/test", - }, - }, - }, - }, - }, - Status: v1.PodStatus{ - Phase: v1.PodRunning, - }, - } - }) - - Context("when pods are mounting the persistent volume claim on nodes", func() { - BeforeEach(func() { - resources = []runtime.Object{pod1, pod2, pod3} - }) - - It("should return the map of nodes and pod counts", func() { - nodes, err := GetPvcMountNodes(client, pvcName, namespace) - Expect(err).NotTo(HaveOccurred()) - Expect(len(nodes)).To(Equal(1)) // Only running pod should be counted - Expect(nodes["node-1"]).To(Equal(int64(1))) - }) - }) - - Context("when no pods are mounting the persistent volume claim", func() { - BeforeEach(func() { - resources = []runtime.Object{pod3} // Only pod without PVC - }) - - It("should return empty map", func() { - nodes, err := GetPvcMountNodes(client, pvcName, namespace) - Expect(err).NotTo(HaveOccurred()) - Expect(len(nodes)).To(Equal(0)) - }) - }) - - Context("when only completed pods are mounting the persistent volume claim", func() { - BeforeEach(func() { - pod1.Status.Phase = v1.PodSucceeded // Make running pod completed - resources = []runtime.Object{pod1, pod3} - }) - - It("should return empty map", func() { - nodes, err := GetPvcMountNodes(client, pvcName, namespace) - Expect(err).NotTo(HaveOccurred()) - Expect(len(nodes)).To(Equal(0)) - }) - }) - }) - Describe("Test RemoveProtectionFinalizer()", func() { var ( namespace string