Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions packages/cubejs-base-driver/src/BaseDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -786,7 +786,7 @@ export abstract class BaseDriver implements DriverInterface {
if (!list.Contents) {
return [];
} else {
const csvFile = await Promise.all(
const csvFiles = await Promise.all(
list.Contents.map(async (file) => {
const command = new GetObjectCommand({
Bucket: bucketName,
Expand All @@ -795,7 +795,7 @@ export abstract class BaseDriver implements DriverInterface {
return getSignedUrl(storage, command, { expiresIn: 3600 });
})
);
return csvFile;
return csvFiles;
}
}

Expand All @@ -818,16 +818,17 @@ export abstract class BaseDriver implements DriverInterface {
const bucket = storage.bucket(bucketName);
const [files] = await bucket.getFiles({ prefix: `${tableName}/` });
if (files.length) {
const csvFile = await Promise.all(files.map(async (file) => {
const csvFiles = await Promise.all(files.map(async (file) => {
const [url] = await file.getSignedUrl({
action: 'read',
expires: new Date(new Date().getTime() + 60 * 60 * 1000)
});
return url;
}));
return csvFile;

return csvFiles;
} else {
return [];
throw new Error('No CSV files were obtained from the bucket');
}
}

Expand Down Expand Up @@ -923,6 +924,10 @@ export abstract class BaseDriver implements DriverInterface {
}
}

if (csvFiles.length === 0) {
throw new Error('No CSV files were obtained from the bucket');
}

return csvFiles;
}
}
75 changes: 35 additions & 40 deletions packages/cubejs-snowflake-driver/src/SnowflakeDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -555,10 +555,13 @@ export class SnowflakeDriver extends BaseDriver implements DriverInterface {
}`);
}

const types = options.query
const { types, exportedCount } = options.query
? await this.unloadWithSql(tableName, options)
: await this.unloadWithTable(tableName, options);
const csvFile = await this.getCsvFiles(tableName);
// Snowflake doesn't produce csv files if no data is exported (no data rows)
// so it's important not to call getCsvFiles(), because it checks for empty files list
// and throws an error.
const csvFile = exportedCount > 0 ? await this.getCsvFiles(tableName) : [];

return {
exportBucketCsvEscapeSymbol: this.config.exportBucketCsvEscapeSymbol,
Expand All @@ -568,37 +571,42 @@ export class SnowflakeDriver extends BaseDriver implements DriverInterface {
};
}

private buildBucketUrl(tableName: string): string {
const { bucketType } = <SnowflakeDriverExportBucket> this.config.exportBucket;

let bucketName: string;
let exportPrefix: string;
let path: string;

if (bucketType === 'azure') {
({ bucketName, path } = this.parseBucketUrl(this.config.exportBucket!.bucketName));
const pathArr = path.split('/');
bucketName = `${bucketName}/${pathArr[0]}`;
exportPrefix = pathArr.length > 1 ? `${pathArr.slice(1).join('/')}/${tableName}` : tableName;
} else {
({ bucketName, path } = this.parseBucketUrl(this.config.exportBucket!.bucketName));
exportPrefix = path ? `${path}/${tableName}` : tableName;
}

return `${bucketType}://${bucketName}/${exportPrefix}/`;
}

/**
* Unload data from a SQL query to an export bucket.
*/
private async unloadWithSql(
tableName: string,
options: UnloadOptions,
): Promise<TableStructure> {
): Promise<{ types: TableStructure, exportedCount: number }> {
if (!options.query) {
throw new Error('Unload query is missed.');
} else {
const types = await this.queryColumnTypes(options.query.sql, options.query.params);
const connection = await this.getConnection();
const { bucketType } =
<SnowflakeDriverExportBucket> this.config.exportBucket;

let bucketName: string;
let exportPrefix: string;
let path: string;

if (bucketType === 'azure') {
({ bucketName, path } = this.parseBucketUrl(this.config.exportBucket!.bucketName));
const pathArr = path.split('/');
bucketName = `${bucketName}/${pathArr[0]}`;
exportPrefix = pathArr.length > 1 ? `${pathArr.slice(1).join('/')}/${tableName}` : tableName;
} else {
({ bucketName, path } = this.parseBucketUrl(this.config.exportBucket!.bucketName));
exportPrefix = path ? `${path}/${tableName}` : tableName;
}
const bucketUrl = this.buildBucketUrl(tableName);

const unloadSql = `
COPY INTO '${bucketType}://${bucketName}/${exportPrefix}/'
COPY INTO '${bucketUrl}'
FROM (${options.query.sql})
${this.exportOptionsClause(options)}`;
const result = await this.execute<UnloadResponse[]>(
Expand All @@ -610,7 +618,7 @@ export class SnowflakeDriver extends BaseDriver implements DriverInterface {
if (!result) {
throw new Error('Missing `COPY INTO` query result.');
}
return types;
return { types, exportedCount: parseInt(result[0].rows_unloaded, 10) };
}
}

Expand Down Expand Up @@ -641,28 +649,13 @@ export class SnowflakeDriver extends BaseDriver implements DriverInterface {
private async unloadWithTable(
tableName: string,
options: UnloadOptions,
): Promise<TableStructure> {
): Promise<{ types: TableStructure, exportedCount: number }> {
const types = await this.tableColumnTypes(tableName);
const connection = await this.getConnection();
const { bucketType } =
<SnowflakeDriverExportBucket> this.config.exportBucket;

let bucketName: string;
let exportPrefix: string;
let path: string;

if (bucketType === 'azure') {
({ bucketName, path } = this.parseBucketUrl(this.config.exportBucket!.bucketName));
const pathArr = path.split('/');
bucketName = `${bucketName}/${pathArr[0]}`;
exportPrefix = pathArr.length > 1 ? `${pathArr.slice(1).join('/')}/${tableName}` : tableName;
} else {
({ bucketName, path } = this.parseBucketUrl(this.config.exportBucket!.bucketName));
exportPrefix = path ? `${path}/${tableName}` : tableName;
}
const bucketUrl = this.buildBucketUrl(tableName);

const unloadSql = `
COPY INTO '${bucketType}://${bucketName}/${exportPrefix}/'
COPY INTO '${bucketUrl}'
FROM ${tableName}
${this.exportOptionsClause(options)}`;
const result = await this.execute<UnloadResponse[]>(
Expand All @@ -671,10 +664,12 @@ export class SnowflakeDriver extends BaseDriver implements DriverInterface {
[],
false,
);

if (!result) {
throw new Error('Missing `COPY INTO` query result.');
}
return types;

return { types, exportedCount: parseInt(result[0].rows_unloaded, 10) };
}

/**
Expand Down
Loading