Conversation
Signed-off-by: Liam Beckman <lbeckman314@gmail.com>
Signed-off-by: Liam Beckman <lbeckman314@gmail.com>
There was a problem hiding this comment.
Pull request overview
This PR modifies Kubernetes resource cleanup to avoid deleting worker ServiceAccounts that are externally managed (e.g., shared per-user SAs provided via the _WORKER_SA task tag).
Changes:
- Add an
externalSAflag to ServiceAccount deletion and skip deletion when it’s set. - Determine
externalSAduringcleanResourcesby reading task tags from the database and pass it into SA deletion. - Replace templated ConfigMap creation with direct construction of a fixed ConfigMap containing
funnel-worker.yaml.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| compute/kubernetes/resources/serviceaccount.go | Adds externalSA parameter and skips ServiceAccount deletion when true. |
| compute/kubernetes/backend.go | Detects _WORKER_SA usage via DB lookup and passes externalSA into SA cleanup. |
| compute/kubernetes/resources/resources_test.go | Updates DeleteServiceAccount test call signature to include the new flag. |
| compute/kubernetes/resources/configmap.go | Removes template-based ConfigMap rendering/decoding and creates a static ConfigMap object. |
Comments suppressed due to low confidence (1)
compute/kubernetes/resources/resources_test.go:290
TestDeleteServiceAccountdoesn’t currently validate the deletion logic: the test ServiceAccount is created without theapp=funnel,taskId=<id>labels thatDeleteServiceAccountselects on, and there’s no post-condition check that it was removed. Add the expected labels to the test SA and assert a subsequent Get returns NotFound; also add a subtest forexternalSA=trueto ensure the SA is not deleted even when labeled.
func TestDeleteServiceAccount(t *testing.T) {
fakeClient := fake.NewSimpleClientset()
sa := &corev1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Name: "funnel-worker-sa-" + testTaskID,
Namespace: namespace,
},
}
_, err := fakeClient.CoreV1().ServiceAccounts(namespace).Create(context.Background(), sa, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Failed to create test ServiceAccount: %v", err)
}
err = DeleteServiceAccount(context.Background(), testTaskID, namespace, fakeClient, l, false)
if err != nil {
t.Errorf("DeleteServiceAccount failed: %v", err)
}
}
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // Check whether this task used an externally-managed ServiceAccount (e.g. | ||
| // Gen3Workflow per-user SA supplied via _WORKER_SA tag). If so, skip SA | ||
| // deletion — the SA is shared across tasks and must not be torn down here. | ||
| externalSA := false | ||
| if task, err := b.database.GetTask(ctx, &tes.GetTaskRequest{Id: taskId, View: tes.View_FULL.String()}); err == nil { | ||
| if saName, exists := task.Tags["_WORKER_SA"]; exists && saName != "" { | ||
| externalSA = true | ||
| } |
There was a problem hiding this comment.
The PR description says SA deletion is skipped when the ServiceAccount is "being used by any other task", but the implementation only checks whether this task has a non-empty _WORKER_SA tag. If the intent really is "any other task", consider querying the DB (e.g., ListTasks filtered on _WORKER_SA=<name> and non-terminal states) to confirm no other active tasks reference the same SA; otherwise, update the PR description/comment to match the narrower behavior ("skip when _WORKER_SA is set").
| @@ -21,41 +17,19 @@ func CreateConfigMap(ctx context.Context, taskId string, conf *config.Config, cl | |||
| return fmt.Errorf("marshaling config to ConfigMap: %v", err) | |||
| } | |||
|
|
|||
| indentFn := func(spaces int, s string) string { | |||
| pad := strings.Repeat(" ", spaces) | |||
| lines := strings.Split(s, "\n") | |||
| for i, line := range lines { | |||
| if line != "" { | |||
| lines[i] = pad + line | |||
| } | |||
| } | |||
| return strings.Join(lines, "\n") | |||
| } | |||
|
|
|||
| t, err := template.New(taskId).Funcs(template.FuncMap{"indent": indentFn}).Parse(conf.Kubernetes.ConfigMapTemplate) | |||
| if err != nil { | |||
| return fmt.Errorf("parsing template: %v", err) | |||
| } | |||
|
|
|||
| var buf bytes.Buffer | |||
| err = t.Execute(&buf, map[string]interface{}{ | |||
| "TaskId": taskId, | |||
| "Namespace": conf.Kubernetes.JobsNamespace, | |||
| "Data": string(configBytes), | |||
| }) | |||
| if err != nil { | |||
| return fmt.Errorf("%v", err) | |||
| } | |||
|
|
|||
| decode := scheme.Codecs.UniversalDeserializer().Decode | |||
| obj, _, err := decode(buf.Bytes(), nil, nil) | |||
| if err != nil { | |||
| return fmt.Errorf("decoding ConfigMap spec: %v", err) | |||
| } | |||
|
|
|||
| cm, ok := obj.(*corev1.ConfigMap) | |||
| if !ok { | |||
| return fmt.Errorf("failed to decode ConfigMap spec") | |||
| // Create the ConfigMap that will contain the Funnel Worker Config (`funnel-worker.yaml`) | |||
| cm := &corev1.ConfigMap{ | |||
| ObjectMeta: metav1.ObjectMeta{ | |||
| Name: fmt.Sprintf("funnel-worker-config-%s", taskId), | |||
| Namespace: conf.Kubernetes.JobsNamespace, | |||
| Labels: map[string]string{ | |||
| "app": "funnel", | |||
| "taskId": taskId, | |||
| }, | |||
| }, | |||
| Data: map[string]string{ | |||
| "funnel-worker.yaml": string(configBytes), | |||
| }, | |||
| } | |||
There was a problem hiding this comment.
CreateConfigMap no longer uses conf.Kubernetes.ConfigMapTemplate (which still exists in config.proto/pb.go and is set in tests). This is a behavior/API change: any custom ConfigMapTemplate configuration will now be ignored. Either keep honoring ConfigMapTemplate when non-empty (fall back to the hardcoded ConfigMap only when it’s empty), or remove/deprecate the field and update dependent tests/config accordingly.
| // Gen3Workflow per-user SA supplied via _WORKER_SA tag). If so, skip SA | ||
| // deletion — the SA is shared across tasks and must not be torn down here. | ||
| externalSA := false | ||
| if task, err := b.database.GetTask(ctx, &tes.GetTaskRequest{Id: taskId, View: tes.View_FULL.String()}); err == nil { |
There was a problem hiding this comment.
cleanResources fetches the task with View_FULL just to inspect tags. In BoltDB (and likely other DB implementations), FULL also loads executor stdout/stderr and system logs, which is significantly more work than needed here. Use tes.View_BASIC (or another minimal view that still includes Tags) to avoid unnecessary IO/CPU during cleanup.
| if task, err := b.database.GetTask(ctx, &tes.GetTaskRequest{Id: taskId, View: tes.View_FULL.String()}); err == nil { | |
| if task, err := b.database.GetTask(ctx, &tes.GetTaskRequest{Id: taskId, View: tes.View_BASIC.String()}); err == nil { |
| if task, err := b.database.GetTask(ctx, &tes.GetTaskRequest{Id: taskId, View: tes.View_FULL.String()}); err == nil { | ||
| if saName, exists := task.Tags["_WORKER_SA"]; exists && saName != "" { | ||
| externalSA = true | ||
| } | ||
| } |
There was a problem hiding this comment.
If b.database.GetTask fails, externalSA remains false and cleanup will proceed to delete ServiceAccounts. That undermines the intent of protecting shared/external SAs under transient DB errors or when the task record has been pruned. Consider handling the error explicitly (at least log it) and choosing a safer behavior (e.g., skip SA deletion when SA ownership can't be determined, or return an error so cleanup can be retried).
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| } | ||
|
|
||
| err = DeleteServiceAccount(context.Background(), testTaskID, namespace, fakeClient, l) | ||
| err = DeleteServiceAccount(context.Background(), testTaskID, namespace, fakeClient, l, false) |
There was a problem hiding this comment.
The new externalSA branch in DeleteServiceAccount isn't covered by tests. Add a test case that calls DeleteServiceAccount(..., externalSA=true) and asserts the ServiceAccount still exists afterward (and that no error is returned).
| externalSA := false | ||
| if task, err := b.database.GetTask(ctx, &tes.GetTaskRequest{Id: taskId, View: tes.View_FULL.String()}); err == nil { | ||
| if saName, exists := task.Tags["_WORKER_SA"]; exists && saName != "" { | ||
| externalSA = true | ||
| } | ||
| } | ||
|
|
||
| // Delete Job | ||
| b.log.Debug("deleting Job", "taskID", taskId) | ||
| err := resources.DeleteJob(ctx, b.conf, taskId, b.client, b.log) |
There was a problem hiding this comment.
Errors from GetTask are silently ignored here; if the DB read fails transiently, externalSA stays false and Funnel may delete a shared externally-managed ServiceAccount, reintroducing the authorization failure this PR is trying to prevent. Consider logging the error and choosing a safer fallback (e.g., skip SA deletion when tag lookup fails, or derive SA ownership from the Job/serviceAccountName before deleting).
| externalSA := false | |
| if task, err := b.database.GetTask(ctx, &tes.GetTaskRequest{Id: taskId, View: tes.View_FULL.String()}); err == nil { | |
| if saName, exists := task.Tags["_WORKER_SA"]; exists && saName != "" { | |
| externalSA = true | |
| } | |
| } | |
| // Delete Job | |
| b.log.Debug("deleting Job", "taskID", taskId) | |
| err := resources.DeleteJob(ctx, b.conf, taskId, b.client, b.log) | |
| // If task lookup fails, conservatively skip SA deletion rather than risk | |
| // deleting a shared externally-managed ServiceAccount. | |
| externalSA := false | |
| task, err := b.database.GetTask(ctx, &tes.GetTaskRequest{Id: taskId, View: tes.View_FULL.String()}) | |
| if err != nil { | |
| externalSA = true | |
| b.log.Error("getting task for ServiceAccount ownership check", "taskID", taskId, "error", err) | |
| } else if saName, exists := task.Tags["_WORKER_SA"]; exists && saName != "" { | |
| externalSA = true | |
| } | |
| // Delete Job | |
| b.log.Debug("deleting Job", "taskID", taskId) | |
| err = resources.DeleteJob(ctx, b.conf, taskId, b.client, b.log) |
Overview 🌀
This PR updates Funnel's Service Account behavior by skipping deletion if the
_WORKER_SAtag is passed via the task.Current Behavior ❌
When a task completes, Funnel unconditionally deletes the ServiceAccount associated with that task.
This means that when one task finishes and its SA is deleted, other concurrently running tasks for the same user lose authorization, causing:
error='kubernetes system error (JobCreationFailed): Failed to create Kubernetes job: Unauthorized'New Behavior ✔️
When a task completes, Funnel checks whether the task was submitted with a
_WORKER_SAtag (indicating an externally-managed ServiceAccount, e.g. a Gen3Workflow per-user SA).If so, Funnel skips deletion of the ServiceAccount, leaving its lifecycle to the external system that owns it.
Tasks without
_WORKER_SAcontinue to have their ServiceAccounts deleted as before.