diff --git a/cli/BUILD b/cli/BUILD index 2eebdabb1..ee3b03e8d 100644 --- a/cli/BUILD +++ b/cli/BUILD @@ -28,7 +28,6 @@ ts_library( "@npm//@types/yargs", "@npm//chokidar", "@npm//glob", - "@npm//parse-duration", "@npm//readline-sync", "@npm//untildify", "@npm//yargs", diff --git a/cli/api/BUILD b/cli/api/BUILD index 352f3ef06..72f11589a 100644 --- a/cli/api/BUILD +++ b/cli/api/BUILD @@ -38,6 +38,7 @@ ts_library( "@npm//deepmerge", "@npm//fs-extra", "@npm//glob", + "@npm//google-auth-library", "@npm//google-sql-syntax-ts", "@npm//js-beautify", "@npm//js-yaml", diff --git a/cli/api/dbadapters/bigquery.ts b/cli/api/dbadapters/bigquery.ts index 6f02bc609..8eed91e4c 100644 --- a/cli/api/dbadapters/bigquery.ts +++ b/cli/api/dbadapters/bigquery.ts @@ -1,7 +1,7 @@ -import { BigQuery, GetTablesResponse, TableField, TableMetadata } from "@google-cloud/bigquery"; +import { GoogleAuth, Impersonated } from "google-auth-library"; import Long from "long"; import { PromisePoolExecutor } from "promise-pool-executor"; - +import { BigQuery, GetTablesResponse, TableField, TableMetadata } from "@google-cloud/bigquery"; import { collectEvaluationQueries, QueryOrAction } from "df/cli/api/dbadapters/execution_sql"; import { IBigQueryError, IDbAdapter, IDbClient, IExecutionResult, OnCancel } from "df/cli/api/dbadapters/index"; import { parseBigqueryEvalError } from "df/cli/api/utils/error_parsing"; @@ -10,7 +10,9 @@ import { coerceAsError } from "df/common/errors/errors"; import { retry } from "df/common/promises"; import { dataform } from "df/protos/ts"; +const GOOGLE_CLOUD_PLATFORM_SCOPE = "https://www.googleapis.com/auth/cloud-platform"; const EXTRA_GOOGLE_SCOPES = ["https://www.googleapis.com/auth/drive"]; +const IMPERSONATION_GOOGLE_SCOPES = [GOOGLE_CLOUD_PLATFORM_SCOPE, ...EXTRA_GOOGLE_SCOPES]; const BIGQUERY_DATE_RELATED_FIELDS = [ "BigQueryDate", @@ -104,8 +106,8 @@ export class BigQueryDbAdapter implements IDbAdapter { try { await this.pool .addSingleTask({ - generator: () => - this.getClient().query({ + generator: async () => + (await this.getClient()).query({ useLegacySql: false, query, dryRun: true @@ -130,7 +132,8 @@ export class BigQueryDbAdapter implements IDbAdapter { } public async tables(): Promise { - const datasets = await this.getClient().getDatasets({ autoPaginate: true, maxResults: 1000 }); + const client = await this.getClient(); + const datasets = await client.getDatasets({ autoPaginate: true, maxResults: 1000 }); const tables = await Promise.all( datasets[0].map(dataset => dataset.getTables({ autoPaginate: true, maxResults: 1000 })) ); @@ -219,7 +222,7 @@ export class BigQueryDbAdapter implements IDbAdapter { } public async schemas(database: string): Promise { - const data = await this.getClient(database).getDatasets(); + const data = await (await this.getClient(database)).getDatasets(); return data[0].map(dataset => dataset.id); } @@ -239,7 +242,7 @@ export class BigQueryDbAdapter implements IDbAdapter { metadata.schema.fields ); - await this.getClient(target.database) + await (await this.getClient(target.database)) .dataset(target.schema) .table(target.name) .setMetadata({ @@ -251,7 +254,7 @@ export class BigQueryDbAdapter implements IDbAdapter { private async getMetadata(target: dataform.ITarget): Promise { try { - const table = await this.getClient(target.database) + const table = await (await this.getClient(target.database)) .dataset(target.schema) .table(target.name) .getMetadata(); @@ -266,19 +269,38 @@ export class BigQueryDbAdapter implements IDbAdapter { } } - private getClient(projectId?: string) { + private async getClient(projectId?: string) { projectId = projectId || this.bigQueryCredentials.projectId; if (!this.clients.has(projectId)) { - this.clients.set( + const clientConfig: any = { projectId, - new BigQuery({ + scopes: EXTRA_GOOGLE_SCOPES, + location: this.bigQueryCredentials.location + }; + + if (this.bigQueryCredentials.impersonateServiceAccount) { + // For impersonation, create an Impersonated credential directly + const sourceAuth = new GoogleAuth({ projectId, - scopes: EXTRA_GOOGLE_SCOPES, - location: this.bigQueryCredentials.location, + scopes: IMPERSONATION_GOOGLE_SCOPES, credentials: - this.bigQueryCredentials.credentials && JSON.parse(this.bigQueryCredentials.credentials) - }) - ); + this.bigQueryCredentials.credentials && + JSON.parse(this.bigQueryCredentials.credentials) + }); + + const authClient = await sourceAuth.getClient(); + + clientConfig.authClient = new Impersonated({ + sourceClient: authClient, + targetPrincipal: this.bigQueryCredentials.impersonateServiceAccount, + targetScopes: IMPERSONATION_GOOGLE_SCOPES + }); + } else { + clientConfig.credentials = + this.bigQueryCredentials.credentials && JSON.parse(this.bigQueryCredentials.credentials); + } + + this.clients.set(projectId, new BigQuery(clientConfig)); } return this.clients.get(projectId); } @@ -290,12 +312,12 @@ export class BigQueryDbAdapter implements IDbAdapter { byteLimit?: number, location?: string ) { - const results = await new Promise((resolve, reject) => { + const results = await new Promise(async (resolve, reject) => { const allRows = new LimitedResultSet({ rowLimit, byteLimit }); - const stream = this.getClient().createQueryStream({ + const stream = (await this.getClient()).createQueryStream({ query, params, location @@ -332,7 +354,8 @@ export class BigQueryDbAdapter implements IDbAdapter { return retry( async () => { try { - const job = await this.getClient().createQueryJob({ + const client = await this.getClient(); + const job = await client.createQueryJob({ useLegacySql: false, jobPrefix: "dataform-" + (jobPrefix ? `${jobPrefix}-` : ""), query, diff --git a/cli/index.ts b/cli/index.ts index f0548a72e..d0bbc1ce3 100644 --- a/cli/index.ts +++ b/cli/index.ts @@ -1,7 +1,6 @@ import * as chokidar from "chokidar"; import * as fs from "fs"; import * as glob from "glob"; -import parseDuration from "parse-duration"; import * as path from "path"; import yargs from "yargs"; @@ -28,6 +27,7 @@ import { actuallyResolve, assertPathExists, compiledGraphHasErrors, + parseCliDuration, promptForIcebergConfig, } from "df/cli/util"; import { createYargsCli, INamedOption } from "df/cli/yargswrapper"; @@ -174,7 +174,7 @@ const timeoutOption: INamedOption = { type: "string", default: null, coerce: (rawTimeoutString: string | null) => - rawTimeoutString ? parseDuration(rawTimeoutString) : null + rawTimeoutString ? parseCliDuration(rawTimeoutString) : null } }; @@ -207,6 +207,13 @@ const bigqueryJobLabelsOption: INamedOption = { } }; +const impersonateServiceAccountOption: INamedOption = { + name: "impersonate-service-account", + option: { + describe: "Service account email to impersonate during authentication.", + type: "string" + } +}; const quietCompileOption: INamedOption = { name: "quiet", option: { @@ -503,7 +510,7 @@ export function runCli() { format: `test [${projectDirMustExistOption.name}]`, description: "Run the dataform project's unit tests.", positionalOptions: [projectDirMustExistOption], - options: [credentialsOption, timeoutOption, ...ProjectConfigOptions.allYargsOptions], + options: [credentialsOption, impersonateServiceAccountOption, timeoutOption, ...ProjectConfigOptions.allYargsOptions], processFn: async argv => { print("Compiling...\n"); const compiledGraph = await compile({ @@ -519,6 +526,10 @@ export function runCli() { const readCredentials = credentials.read( getCredentialsPath(argv[projectDirOption.name], argv[credentialsOption.name]) ); + if (argv[impersonateServiceAccountOption.name]) { + (readCredentials as any).impersonateServiceAccount = + argv[impersonateServiceAccountOption.name]; + } if (!compiledGraph.tests.length) { printError("No unit tests found."); @@ -563,10 +574,10 @@ export function runCli() { }, actionsOption, credentialsOption, + impersonateServiceAccountOption, fullRefreshOption, includeDepsOption, includeDependentsOption, - credentialsOption, jsonOutputOption, timeoutOption, tagsOption, @@ -599,6 +610,10 @@ export function runCli() { const readCredentials = credentials.read( getCredentialsPath(argv[projectDirOption.name], argv[credentialsOption.name]) ); + if (argv[impersonateServiceAccountOption.name]) { + (readCredentials as any).impersonateServiceAccount = + argv[impersonateServiceAccountOption.name]; + } const dbadapter = new BigQueryDbAdapter(readCredentials); const executionGraph = await build( diff --git a/cli/util.ts b/cli/util.ts index dcd77b37b..2d46bb878 100644 --- a/cli/util.ts +++ b/cli/util.ts @@ -52,6 +52,74 @@ export function formatBytesInHumanReadableFormat(bytes: number): string { return `${value} ${units[i]}`; } +const DURATION_UNITS_IN_MILLIS: { [unit: string]: number } = { + ms: 1, + msec: 1, + msecs: 1, + millisecond: 1, + milliseconds: 1, + s: 1000, + sec: 1000, + secs: 1000, + second: 1000, + seconds: 1000, + m: 60 * 1000, + min: 60 * 1000, + mins: 60 * 1000, + minute: 60 * 1000, + minutes: 60 * 1000, + h: 60 * 60 * 1000, + hr: 60 * 60 * 1000, + hrs: 60 * 60 * 1000, + hour: 60 * 60 * 1000, + hours: 60 * 60 * 1000, + d: 24 * 60 * 60 * 1000, + day: 24 * 60 * 60 * 1000, + days: 24 * 60 * 60 * 1000, + w: 7 * 24 * 60 * 60 * 1000, + week: 7 * 24 * 60 * 60 * 1000, + weeks: 7 * 24 * 60 * 60 * 1000 +}; + +export function parseCliDuration(rawDuration: string): number { + const normalizedDuration = rawDuration?.trim().toLowerCase(); + if (!normalizedDuration) { + throw new Error("Duration cannot be empty."); + } + + if (/^[+-]?\d+(\.\d+)?$/.test(normalizedDuration)) { + return Number(normalizedDuration); + } + + let totalDurationMillis = 0; + let matchFound = false; + let cursor = 0; + const durationPattern = /([+-]?\d+(?:\.\d+)?)\s*([a-z]+)/g; + + for (let match = durationPattern.exec(normalizedDuration); match; match = durationPattern.exec(normalizedDuration)) { + if (normalizedDuration.slice(cursor, match.index).trim()) { + throw new Error(`Invalid duration: ${rawDuration}`); + } + + const durationValue = Number(match[1]); + const durationUnit = match[2]; + const unitMillis = DURATION_UNITS_IN_MILLIS[durationUnit]; + if (unitMillis === undefined) { + throw new Error(`Unsupported duration unit: ${durationUnit}`); + } + + totalDurationMillis += durationValue * unitMillis; + cursor = durationPattern.lastIndex; + matchFound = true; + } + + if (!matchFound || normalizedDuration.slice(cursor).trim()) { + throw new Error(`Invalid duration: ${rawDuration}`); + } + + return totalDurationMillis; +} + /** * Handles prompting and validation for defaultBucketName, defaultTableFolderRoot * and defaultTableFolderSubpath if the user provides the --iceberg flag when diff --git a/cli/util_test.ts b/cli/util_test.ts index b86f89370..cc15781fe 100644 --- a/cli/util_test.ts +++ b/cli/util_test.ts @@ -3,6 +3,7 @@ import { expect } from "chai"; import { formatBytesInHumanReadableFormat, formatExecutionSuffix, + parseCliDuration, validateIcebergConfigBucketName, validateIcebergConfigTableFolderRoot, validateIcebergConfigTableFolderSubpath, @@ -35,6 +36,30 @@ suite('format bytes in human readable format', () => { }); }); +suite("parse cli duration", () => { + test("parses numeric durations as milliseconds", () => { + expect(parseCliDuration("1500")).equals(1500); + }); + + test("parses single-unit durations", () => { + expect(parseCliDuration("1s")).equals(1000); + expect(parseCliDuration("10m")).equals(600000); + expect(parseCliDuration("2 hours")).equals(7200000); + }); + + test("parses compound and fractional durations", () => { + expect(parseCliDuration("1h30m")).equals(5400000); + expect(parseCliDuration("1.5m")).equals(90000); + expect(parseCliDuration("1 week 2 days")).equals(777600000); + }); + + test("rejects invalid durations", () => { + expect(() => parseCliDuration("")).to.throw("Duration cannot be empty."); + expect(() => parseCliDuration("tomorrow")).to.throw("Invalid duration: tomorrow"); + expect(() => parseCliDuration("1fortnight")).to.throw("Unsupported duration unit: fortnight"); + }); +}); + suite('Iceberg Config Validation', () => { suite('validateIcebergConfigBucketName', () => { test('valid bucket names do not throw errors', () => { diff --git a/package.json b/package.json index fa51523ca..510fc3e40 100644 --- a/package.json +++ b/package.json @@ -36,6 +36,7 @@ "estraverse": "^5.1.0", "fs-extra": "^9.0.0", "glob": "^10.5.0", + "google-auth-library": "~8.9.0", "google-sql-syntax-ts": "^1.0.3", "js-beautify": "^1.10.2", "js-yaml": "^4.1.1", @@ -45,7 +46,6 @@ "minimist": "^1.2.6", "moo": "^0.5.0", "object-sizeof": "^1.6.1", - "parse-duration": "^1.0.0", "prettier": "^1.14.2", "promise-pool-executor": "^1.1.1", "protobufjs": "^7.2.5", diff --git a/packages/@dataform/cli/BUILD b/packages/@dataform/cli/BUILD index 7a1d7085c..60ca46549 100644 --- a/packages/@dataform/cli/BUILD +++ b/packages/@dataform/cli/BUILD @@ -33,12 +33,12 @@ externals = [ "deepmerge", "fs-extra", "glob", + "google-auth-library", "google-sql-syntax-ts", "js-beautify", "js-yaml", "moo", "object-sizeof", - "parse-duration", "promise-pool-executor", "protobufjs", "readline-sync", diff --git a/protos/profiles.proto b/protos/profiles.proto index 09eabcdb8..37b982ac9 100644 --- a/protos/profiles.proto +++ b/protos/profiles.proto @@ -11,6 +11,8 @@ message BigQuery { string credentials = 3; // Options are listed here: https://cloud.google.com/bigquery/docs/locations string location = 4; + // Service account email to impersonate during authentication + string impersonate_service_account = 5; reserved 2; } diff --git a/yarn.lock b/yarn.lock index 160442c8f..66ccf1b49 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2090,7 +2090,7 @@ google-auth-library@^7.0.0, google-auth-library@^7.0.2: jws "^4.0.0" lru-cache "^6.0.0" -google-auth-library@^8.0.2: +google-auth-library@^8.0.2, google-auth-library@~8.9.0: version "8.9.0" resolved "https://registry.yarnpkg.com/google-auth-library/-/google-auth-library-8.9.0.tgz#15a271eb2ec35d43b81deb72211bd61b1ef14dd0" integrity sha512-f7aQCJODJFmYWN6PeNKzgvy9LI2tYmXnzpNDHEjG5sDNPgGb2FXQyTBnXeSH+PAtpKESFD+LmHw3Ox3mN7e1Fg== @@ -3219,11 +3219,6 @@ package-json-from-dist@^1.0.0: resolved "https://registry.yarnpkg.com/package-json-from-dist/-/package-json-from-dist-1.0.1.tgz#4f1471a010827a86f94cfd9b0727e36d267de505" integrity sha512-UEZIS3/by4OC8vL3P2dTXRETpebLI2NiI5vIrjaD/5UtrkFX/tNbwjTSRAGC/+7CAo2pIcBaRgWmcBBHcsaCIw== -parse-duration@^1.0.0: - version "1.0.0" - resolved "https://registry.yarnpkg.com/parse-duration/-/parse-duration-1.0.0.tgz#8605651745f61088f6fb14045c887526c291858c" - integrity "sha1-hgVlF0X2EIj2+xQEXIh1JsKRhYw= sha512-X4kUkCTHU1N/kEbwK9FpUJ0UZQa90VzeczfS704frR30gljxDG0pSziws06XlK+CGRSo/1wtG1mFIdBFQTMQNw==" - parse-semver@^1.1.1: version "1.1.1" resolved "https://registry.yarnpkg.com/parse-semver/-/parse-semver-1.1.1.tgz#9a4afd6df063dc4826f93fba4a99cf223f666cb8"