@@ -73,8 +73,8 @@ func (c *Client) Execute(ctx context.Context, tableID, queryFilePath string) err
7373 if err != nil {
7474 return errors .WithStack (err )
7575 }
76- // construct query with ordered columns
77- queryRaw = constructQueryWithOrderedColumns (queryRaw , columnNames )
76+ // construct query with ordered columns and BQ pseudo columns for ingestion time
77+ queryRaw = constructQueryWithOrderedColumnsWithBQIngestionTime (queryRaw , columnNames )
7878 }
7979
8080 if c .enablePartitionValue && ! c .enableAutoPartition {
@@ -111,6 +111,23 @@ func addPartitionValueColumn(rawQuery []byte) []byte {
111111 return []byte (fmt .Sprintf ("%s SELECT *, STRING(CURRENT_DATE()) as __partitionvalue FROM (%s)" , header , qr ))
112112}
113113
114+ // constructQueryWithOrderedColumnsWithBQIngestionTime constructs query with ordered columns and BQ pseudo columns for ingestion time
115+ // ref: https://cloud.google.com/bigquery/docs/querying-partitioned-tables#query_an_ingestion-time_partitioned_table
116+ func constructQueryWithOrderedColumnsWithBQIngestionTime (query []byte , orderedColumns []string ) []byte {
117+ var orderedColumnsWithBQIngestionTime []string
118+ for _ , col := range orderedColumns {
119+ val := col
120+ switch col {
121+ case "_partitiontime" :
122+ val = "CURRENT_TIMESTAMP() as _partitiontime"
123+ case "_partitiondate" :
124+ val = "CURRENT_DATE() as _partitiondate"
125+ }
126+ orderedColumnsWithBQIngestionTime = append (orderedColumnsWithBQIngestionTime , val )
127+ }
128+ return constructQueryWithOrderedColumns (query , orderedColumnsWithBQIngestionTime )
129+ }
130+
114131func constructQueryWithOrderedColumns (query []byte , orderedColumns []string ) []byte {
115132 header , qr := loader .SeparateHeadersAndQuery (string (query ))
116133 return []byte (fmt .Sprintf ("%s %s" , header , loader .ConstructQueryWithOrderedColumns (qr , orderedColumns )))
0 commit comments