-
Notifications
You must be signed in to change notification settings - Fork 196
Add Impersonate Service Account argument #2015
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
a9071c7
469df21
7c0ff44
233e7d1
38d7130
69a9401
21fb05f
3a61218
eb2b141
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,7 +1,7 @@ | ||
| import { BigQuery, GetTablesResponse, TableField, TableMetadata } from "@google-cloud/bigquery"; | ||
| import { GoogleAuth, Impersonated } from "google-auth-library"; | ||
| import Long from "long"; | ||
| import { PromisePoolExecutor } from "promise-pool-executor"; | ||
|
|
||
| import { BigQuery, GetTablesResponse, TableField, TableMetadata } from "@google-cloud/bigquery"; | ||
| import { collectEvaluationQueries, QueryOrAction } from "df/cli/api/dbadapters/execution_sql"; | ||
| import { IBigQueryError, IDbAdapter, IDbClient, IExecutionResult, OnCancel } from "df/cli/api/dbadapters/index"; | ||
| import { parseBigqueryEvalError } from "df/cli/api/utils/error_parsing"; | ||
|
|
@@ -10,7 +10,9 @@ import { coerceAsError } from "df/common/errors/errors"; | |
| import { retry } from "df/common/promises"; | ||
| import { dataform } from "df/protos/ts"; | ||
|
|
||
| const GOOGLE_CLOUD_PLATFORM_SCOPE = "https://www.googleapis.com/auth/cloud-platform"; | ||
| const EXTRA_GOOGLE_SCOPES = ["https://www.googleapis.com/auth/drive"]; | ||
| const IMPERSONATION_GOOGLE_SCOPES = [GOOGLE_CLOUD_PLATFORM_SCOPE, ...EXTRA_GOOGLE_SCOPES]; | ||
|
|
||
| const BIGQUERY_DATE_RELATED_FIELDS = [ | ||
| "BigQueryDate", | ||
|
|
@@ -104,8 +106,8 @@ export class BigQueryDbAdapter implements IDbAdapter { | |
| try { | ||
| await this.pool | ||
| .addSingleTask({ | ||
| generator: () => | ||
| this.getClient().query({ | ||
| generator: async () => | ||
| (await this.getClient()).query({ | ||
| useLegacySql: false, | ||
| query, | ||
| dryRun: true | ||
|
|
@@ -130,7 +132,8 @@ export class BigQueryDbAdapter implements IDbAdapter { | |
| } | ||
|
|
||
| public async tables(): Promise<dataform.ITarget[]> { | ||
| const datasets = await this.getClient().getDatasets({ autoPaginate: true, maxResults: 1000 }); | ||
| const client = await this.getClient(); | ||
| const datasets = await client.getDatasets({ autoPaginate: true, maxResults: 1000 }); | ||
| const tables = await Promise.all( | ||
| datasets[0].map(dataset => dataset.getTables({ autoPaginate: true, maxResults: 1000 })) | ||
| ); | ||
|
|
@@ -219,7 +222,7 @@ export class BigQueryDbAdapter implements IDbAdapter { | |
| } | ||
|
|
||
| public async schemas(database: string): Promise<string[]> { | ||
| const data = await this.getClient(database).getDatasets(); | ||
| const data = await (await this.getClient(database)).getDatasets(); | ||
| return data[0].map(dataset => dataset.id); | ||
| } | ||
|
|
||
|
|
@@ -239,7 +242,7 @@ export class BigQueryDbAdapter implements IDbAdapter { | |
| metadata.schema.fields | ||
| ); | ||
|
|
||
| await this.getClient(target.database) | ||
| await (await this.getClient(target.database)) | ||
| .dataset(target.schema) | ||
| .table(target.name) | ||
| .setMetadata({ | ||
|
|
@@ -251,7 +254,7 @@ export class BigQueryDbAdapter implements IDbAdapter { | |
|
|
||
| private async getMetadata(target: dataform.ITarget): Promise<TableMetadata> { | ||
| try { | ||
| const table = await this.getClient(target.database) | ||
| const table = await (await this.getClient(target.database)) | ||
| .dataset(target.schema) | ||
| .table(target.name) | ||
| .getMetadata(); | ||
|
|
@@ -266,19 +269,38 @@ export class BigQueryDbAdapter implements IDbAdapter { | |
| } | ||
| } | ||
|
|
||
| private getClient(projectId?: string) { | ||
| private async getClient(projectId?: string) { | ||
|
kolina marked this conversation as resolved.
|
||
| projectId = projectId || this.bigQueryCredentials.projectId; | ||
| if (!this.clients.has(projectId)) { | ||
| this.clients.set( | ||
| const clientConfig: any = { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
|
||
| projectId, | ||
| new BigQuery({ | ||
| scopes: EXTRA_GOOGLE_SCOPES, | ||
|
Comment on lines 276 to +277
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Are |
||
| location: this.bigQueryCredentials.location | ||
| }; | ||
|
|
||
| if (this.bigQueryCredentials.impersonateServiceAccount) { | ||
| // For impersonation, create an Impersonated credential directly | ||
| const sourceAuth = new GoogleAuth({ | ||
| projectId, | ||
| scopes: EXTRA_GOOGLE_SCOPES, | ||
| location: this.bigQueryCredentials.location, | ||
| scopes: IMPERSONATION_GOOGLE_SCOPES, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Let's add an explanatory comment about why we're passing a different set of scopes with impersonation than without |
||
| credentials: | ||
| this.bigQueryCredentials.credentials && JSON.parse(this.bigQueryCredentials.credentials) | ||
| }) | ||
| ); | ||
| this.bigQueryCredentials.credentials && | ||
| JSON.parse(this.bigQueryCredentials.credentials) | ||
| }); | ||
|
|
||
| const authClient = await sourceAuth.getClient(); | ||
|
|
||
| clientConfig.authClient = new Impersonated({ | ||
| sourceClient: authClient, | ||
| targetPrincipal: this.bigQueryCredentials.impersonateServiceAccount, | ||
| targetScopes: IMPERSONATION_GOOGLE_SCOPES | ||
| }); | ||
| } else { | ||
| clientConfig.credentials = | ||
| this.bigQueryCredentials.credentials && JSON.parse(this.bigQueryCredentials.credentials); | ||
| } | ||
|
|
||
| this.clients.set(projectId, new BigQuery(clientConfig)); | ||
| } | ||
| return this.clients.get(projectId); | ||
| } | ||
|
|
@@ -290,12 +312,12 @@ export class BigQueryDbAdapter implements IDbAdapter { | |
| byteLimit?: number, | ||
| location?: string | ||
| ) { | ||
| const results = await new Promise<any[]>((resolve, reject) => { | ||
| const results = await new Promise<any[]>(async (resolve, reject) => { | ||
| const allRows = new LimitedResultSet({ | ||
| rowLimit, | ||
| byteLimit | ||
| }); | ||
| const stream = this.getClient().createQueryStream({ | ||
| const stream = (await this.getClient()).createQueryStream({ | ||
| query, | ||
| params, | ||
| location | ||
|
|
@@ -332,7 +354,8 @@ export class BigQueryDbAdapter implements IDbAdapter { | |
| return retry( | ||
| async () => { | ||
| try { | ||
| const job = await this.getClient().createQueryJob({ | ||
| const client = await this.getClient(); | ||
| const job = await client.createQueryJob({ | ||
| useLegacySql: false, | ||
| jobPrefix: "dataform-" + (jobPrefix ? `${jobPrefix}-` : ""), | ||
| query, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,7 +1,6 @@ | ||
| import * as chokidar from "chokidar"; | ||
| import * as fs from "fs"; | ||
| import * as glob from "glob"; | ||
| import parseDuration from "parse-duration"; | ||
| import * as path from "path"; | ||
| import yargs from "yargs"; | ||
|
|
||
|
|
@@ -28,6 +27,7 @@ import { | |
| actuallyResolve, | ||
| assertPathExists, | ||
| compiledGraphHasErrors, | ||
| parseCliDuration, | ||
| promptForIcebergConfig, | ||
| } from "df/cli/util"; | ||
| import { createYargsCli, INamedOption } from "df/cli/yargswrapper"; | ||
|
|
@@ -174,7 +174,7 @@ const timeoutOption: INamedOption<yargs.Options> = { | |
| type: "string", | ||
| default: null, | ||
| coerce: (rawTimeoutString: string | null) => | ||
| rawTimeoutString ? parseDuration(rawTimeoutString) : null | ||
| rawTimeoutString ? parseCliDuration(rawTimeoutString) : null | ||
| } | ||
| }; | ||
|
|
||
|
|
@@ -207,6 +207,13 @@ const bigqueryJobLabelsOption: INamedOption<yargs.Options> = { | |
| } | ||
| }; | ||
|
|
||
| const impersonateServiceAccountOption: INamedOption<yargs.Options> = { | ||
| name: "impersonate-service-account", | ||
| option: { | ||
| describe: "Service account email to impersonate during authentication.", | ||
| type: "string" | ||
| } | ||
| }; | ||
| const quietCompileOption: INamedOption<yargs.Options> = { | ||
| name: "quiet", | ||
| option: { | ||
|
|
@@ -503,7 +510,7 @@ export function runCli() { | |
| format: `test [${projectDirMustExistOption.name}]`, | ||
| description: "Run the dataform project's unit tests.", | ||
| positionalOptions: [projectDirMustExistOption], | ||
| options: [credentialsOption, timeoutOption, ...ProjectConfigOptions.allYargsOptions], | ||
| options: [credentialsOption, impersonateServiceAccountOption, timeoutOption, ...ProjectConfigOptions.allYargsOptions], | ||
| processFn: async argv => { | ||
| print("Compiling...\n"); | ||
| const compiledGraph = await compile({ | ||
|
|
@@ -519,6 +526,10 @@ export function runCli() { | |
| const readCredentials = credentials.read( | ||
| getCredentialsPath(argv[projectDirOption.name], argv[credentialsOption.name]) | ||
| ); | ||
| if (argv[impersonateServiceAccountOption.name]) { | ||
| (readCredentials as any).impersonateServiceAccount = | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can we extend |
||
| argv[impersonateServiceAccountOption.name]; | ||
| } | ||
|
|
||
| if (!compiledGraph.tests.length) { | ||
| printError("No unit tests found."); | ||
|
|
@@ -563,10 +574,10 @@ export function runCli() { | |
| }, | ||
| actionsOption, | ||
| credentialsOption, | ||
| impersonateServiceAccountOption, | ||
| fullRefreshOption, | ||
| includeDepsOption, | ||
| includeDependentsOption, | ||
| credentialsOption, | ||
| jsonOutputOption, | ||
| timeoutOption, | ||
| tagsOption, | ||
|
|
@@ -599,6 +610,10 @@ export function runCli() { | |
| const readCredentials = credentials.read( | ||
| getCredentialsPath(argv[projectDirOption.name], argv[credentialsOption.name]) | ||
| ); | ||
| if (argv[impersonateServiceAccountOption.name]) { | ||
| (readCredentials as any).impersonateServiceAccount = | ||
| argv[impersonateServiceAccountOption.name]; | ||
| } | ||
|
|
||
| const dbadapter = new BigQueryDbAdapter(readCredentials); | ||
| const executionGraph = await build( | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -52,6 +52,74 @@ export function formatBytesInHumanReadableFormat(bytes: number): string { | |
| return `${value} ${units[i]}`; | ||
| } | ||
|
|
||
| const DURATION_UNITS_IN_MILLIS: { [unit: string]: number } = { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can you please elaborate on the effort to avoid it and upgrade |
||
| ms: 1, | ||
| msec: 1, | ||
| msecs: 1, | ||
| millisecond: 1, | ||
| milliseconds: 1, | ||
| s: 1000, | ||
| sec: 1000, | ||
| secs: 1000, | ||
| second: 1000, | ||
| seconds: 1000, | ||
| m: 60 * 1000, | ||
| min: 60 * 1000, | ||
| mins: 60 * 1000, | ||
| minute: 60 * 1000, | ||
| minutes: 60 * 1000, | ||
| h: 60 * 60 * 1000, | ||
| hr: 60 * 60 * 1000, | ||
| hrs: 60 * 60 * 1000, | ||
| hour: 60 * 60 * 1000, | ||
| hours: 60 * 60 * 1000, | ||
| d: 24 * 60 * 60 * 1000, | ||
| day: 24 * 60 * 60 * 1000, | ||
| days: 24 * 60 * 60 * 1000, | ||
| w: 7 * 24 * 60 * 60 * 1000, | ||
| week: 7 * 24 * 60 * 60 * 1000, | ||
| weeks: 7 * 24 * 60 * 60 * 1000 | ||
| }; | ||
|
|
||
| export function parseCliDuration(rawDuration: string): number { | ||
| const normalizedDuration = rawDuration?.trim().toLowerCase(); | ||
| if (!normalizedDuration) { | ||
| throw new Error("Duration cannot be empty."); | ||
| } | ||
|
|
||
| if (/^[+-]?\d+(\.\d+)?$/.test(normalizedDuration)) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Lint checks are failing |
||
| return Number(normalizedDuration); | ||
| } | ||
|
|
||
| let totalDurationMillis = 0; | ||
| let matchFound = false; | ||
| let cursor = 0; | ||
| const durationPattern = /([+-]?\d+(?:\.\d+)?)\s*([a-z]+)/g; | ||
|
|
||
| for (let match = durationPattern.exec(normalizedDuration); match; match = durationPattern.exec(normalizedDuration)) { | ||
| if (normalizedDuration.slice(cursor, match.index).trim()) { | ||
| throw new Error(`Invalid duration: ${rawDuration}`); | ||
| } | ||
|
|
||
| const durationValue = Number(match[1]); | ||
| const durationUnit = match[2]; | ||
| const unitMillis = DURATION_UNITS_IN_MILLIS[durationUnit]; | ||
| if (unitMillis === undefined) { | ||
| throw new Error(`Unsupported duration unit: ${durationUnit}`); | ||
| } | ||
|
|
||
| totalDurationMillis += durationValue * unitMillis; | ||
| cursor = durationPattern.lastIndex; | ||
| matchFound = true; | ||
| } | ||
|
|
||
| if (!matchFound || normalizedDuration.slice(cursor).trim()) { | ||
| throw new Error(`Invalid duration: ${rawDuration}`); | ||
| } | ||
|
|
||
| return totalDurationMillis; | ||
| } | ||
|
|
||
| /** | ||
| * Handles prompting and validation for defaultBucketName, defaultTableFolderRoot | ||
| * and defaultTableFolderSubpath if the user provides the --iceberg flag when | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems to break lint checks