Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ An example consists of two pipelines:
* Writing into Snowflake
1. Reading files from provided by `inputFile` argument.
2. Counting words
3. Writing counts into Snowflake table provided by `tableName` argument.
3. Writing counts into Snowflake table provided by `table` argument.
* Reading from Snowflake
1. Reading counts from Snowflake table provided by `tableName` argument.
1. Reading counts from Snowflake table provided by `table` argument.
2. Writing counts into provided by `output` argument.

#### Executing:
Expand All @@ -70,7 +70,7 @@ An example consists of two pipelines:
--password=<SNOWFLAKE PASSWORD> \
--database=<SNOWFLAKE DATABASE> \
--schema=<SNOWFLAKE SCHEMA> \
--tableName=<SNOWFLAKE TABLE NAME> \
--table=<SNOWFLAKE TABLE NAME> \
--storageIntegrationName=<SNOWFLAKE STORAGE INTEGRATION NAME> \
--stagingBucketName=<GCS BUCKET NAME> \
--runner=<DirectRunner/DataflowRunner> \
Expand Down
Binary file not shown.
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ repositories {

dependencies {
testCompile group: 'junit', name: 'junit', version: '4.12'
compile files('beam-sdks-java-io-snowflake-2.22.0-SNAPSHOT.jar')
compile files('beam-sdks-java-io-snowflake-2.23.0-SNAPSHOT.jar')
compile group: 'org.apache.beam', name: 'beam-sdks-java-core', version: '2.22.0'
compile group: 'org.apache.beam', name: 'beam-runners-direct-java', version: '2.22.0'
compile group: 'org.apache.beam', name: 'beam-runners-google-cloud-dataflow-java', version: '2.22.0'
compile 'com.google.cloud:google-cloud-kms:1.20.0'
}

task execute (type:JavaExec) {
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/batching/SnowflakeWordCountOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.options.ValueProvider;

/**
* Supported PipelineOptions used in provided examples.
Expand All @@ -21,4 +22,10 @@ public interface SnowflakeWordCountOptions extends SnowflakePipelineOptions {
String getOutput();

void setOutput(String value);

@Description(
"KMS Encryption Key should be in the format projects/{gcp_project}/locations/{key_region}/keyRings/{key_ring}/cryptoKeys/{kms_key_name}")
ValueProvider<String> getKMSEncryptionKey();

void setKMSEncryptionKey(ValueProvider<String> keyName);
}
21 changes: 18 additions & 3 deletions src/main/java/batching/WordCountExample.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
Expand All @@ -23,10 +24,11 @@
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import util.KMSEncryptedNestedValueProvider;

/**
* An example that contains batch writing and reading from Snowflake. Inspired by Apache Beam/WordCount-example(https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/WordCount.java)
*
* <p>
* Check main README for more information.
*/
public class WordCountExample {
Expand Down Expand Up @@ -98,9 +100,16 @@ private static PTransform<PBegin, PCollection<WordCountRow>> createSnowflakeRead

public static SnowflakeIO.DataSourceConfiguration createSnowflakeConfiguration(SnowflakeWordCountOptions options) {
return SnowflakeIO.DataSourceConfiguration.create()
.withUsernamePasswordAuth(options.getUsername(), options.getPassword())
.withUsernamePasswordAuth(
maybeDecrypt(options.getUsername(), options.getKMSEncryptionKey()),
maybeDecrypt(options.getPassword(), options.getKMSEncryptionKey())
)
.withOAuth(options.getOauthToken())
.withKeyPairRawAuth(options.getUsername(), options.getRawPrivateKey(), options.getPrivateKeyPassphrase())
.withKeyPairRawAuth(
maybeDecrypt(options.getUsername(), options.getKMSEncryptionKey()),
maybeDecrypt(options.getRawPrivateKey(), options.getKMSEncryptionKey()),
maybeDecrypt(options.getPrivateKeyPassphrase(), options.getKMSEncryptionKey())
)
.withDatabase(options.getDatabase())
.withServerName(options.getServerName())
.withSchema(options.getSchema());
Expand Down Expand Up @@ -154,4 +163,10 @@ public void processElement(@Element String element, OutputReceiver<String> recei
}
}

private static ValueProvider<String> maybeDecrypt(
ValueProvider<String> unencryptedValue, ValueProvider<String> kmsKey) {

return new KMSEncryptedNestedValueProvider(unencryptedValue, kmsKey);
}

}
100 changes: 100 additions & 0 deletions src/main/java/util/DualInputNestedValueProvider.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright (C) 2018 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. SecondTou may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANSecondT KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package util;

import com.google.common.base.MoreObjects;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;

import java.io.Serializable;

/**
* {@link DualInputNestedValueProvider} is an implementation of {@link ValueProvider} that allows
* for wrapping two {@link ValueProvider} objects. It's inspired by {@link
* org.apache.beam.sdk.options.ValueProvider.NestedValueProvider} but it can accept two inputs
* rather than one.
*/
public class DualInputNestedValueProvider<T, FirstT, SecondT>
implements ValueProvider<T>, Serializable {

/** Pair like struct holding two values. */
public static class TranslatorInput<FirstT, SecondT> {
private final FirstT x;
private final SecondT y;

public TranslatorInput(FirstT x, SecondT y) {
this.x = x;
this.y = y;
}

public FirstT getX() {
return x;
}

public SecondT getY() {
return y;
}
}

private final ValueProvider<FirstT> valueX;
private final ValueProvider<SecondT> valueY;
private final SerializableFunction<TranslatorInput<FirstT, SecondT>, T> translator;
private transient volatile T cachedValue;

public DualInputNestedValueProvider(
ValueProvider<FirstT> valueX,
ValueProvider<SecondT> valueY,
SerializableFunction<TranslatorInput<FirstT, SecondT>, T> translator) {
this.valueX = valueX;
this.valueY = valueY;
this.translator = translator;
}

/** Creates a {@link NestedValueProvider} that wraps two provided values. */
public static <T, FirstT, SecondT> DualInputNestedValueProvider<T, FirstT, SecondT> of(
ValueProvider<FirstT> valueX,
ValueProvider<SecondT> valueY,
SerializableFunction<TranslatorInput<FirstT, SecondT>, T> translator) {
DualInputNestedValueProvider<T, FirstT, SecondT> factory =
new DualInputNestedValueProvider<>(valueX, valueY, translator);
return factory;
}

@Override
public T get() {
if (cachedValue == null) {
cachedValue = translator.apply(new TranslatorInput<>(valueX.get(), valueY.get()));
}
return cachedValue;
}

@Override
public boolean isAccessible() {
return valueX.isAccessible() && valueY.isAccessible();
}

@Override
public String toString() {
if (isAccessible()) {
return String.valueOf(get());
}
return MoreObjects.toStringHelper(this)
.add("valueX", valueX)
.add("valueY", valueY)
.add("translator", translator.getClass().getSimpleName())
.toString();
}
}
Loading