Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 36 additions & 29 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.InstantCoder;
Expand Down Expand Up @@ -308,21 +310,24 @@ public void splitRestriction(
}

@NewTracker
public RestrictionTracker<BoundedSourceT, TimestampedValue<T>[]> restrictionTracker(
public RestrictionTracker<BoundedSourceT, Supplier<TimestampedValue<T>>[]> restrictionTracker(
@Restriction BoundedSourceT restriction, PipelineOptions pipelineOptions) {
return new BoundedSourceAsSDFRestrictionTracker<>(restriction, pipelineOptions);
}

@ProcessElement
public void processElement(
RestrictionTracker<BoundedSourceT, TimestampedValue<T>[]> tracker,
RestrictionTracker<BoundedSourceT, Supplier<TimestampedValue<T>>[]> tracker,
OutputReceiver<T> receiver)
throws IOException {
@SuppressWarnings(
"rawtypes") // most straightforward way of creating array with type parameter
TimestampedValue<T>[] out = new TimestampedValue[1];
Supplier<TimestampedValue<T>>[] out = new Supplier[1];
while (tracker.tryClaim(out)) {
receiver.outputWithTimestamp(out[0].getValue(), out[0].getTimestamp());
TimestampedValue<T> currentValue = out[0].get();
if (currentValue != null) {
receiver.outputWithTimestamp(currentValue.getValue(), currentValue.getTimestamp());
}
}
}

Expand All @@ -337,53 +342,55 @@ public Coder<BoundedSourceT> restrictionCoder() {
*/
private static class BoundedSourceAsSDFRestrictionTracker<
BoundedSourceT extends BoundedSource<T>, T>
extends RestrictionTracker<BoundedSourceT, TimestampedValue<T>[]> implements HasProgress {
extends RestrictionTracker<BoundedSourceT, Supplier<TimestampedValue<T>>[]> implements HasProgress {
private final BoundedSourceT initialRestriction;
private final PipelineOptions pipelineOptions;
private BoundedSource.@Nullable BoundedReader<T> currentReader = null;
private boolean claimedAll;
private boolean splitActive;
private Supplier<TimestampedValue<T>> readNextElementSupplier;

BoundedSourceAsSDFRestrictionTracker(
BoundedSourceT initialRestriction, PipelineOptions pipelineOptions) {
this.initialRestriction = initialRestriction;
this.pipelineOptions = pipelineOptions;
this.readNextElementSupplier = new Supplier<@Nullable TimestampedValue<T>>() {
@Override
public @Nullable TimestampedValue<T> get() {
try {
readOrThrow();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}

@Override
public boolean tryClaim(TimestampedValue<T>[] position) {
public boolean tryClaim(Supplier<TimestampedValue<T>>[] position) {
if (splitActive) {
// notify split to continue
// wait for split to complete
}
if (claimedAll) {
return false;
}
try {
return tryClaimOrThrow(position);
} catch (IOException e) {
if (currentReader != null) {
try {
currentReader.close();
} catch (IOException closeException) {
e.addSuppressed(closeException);
} finally {
currentReader = null;
}
}
throw new RuntimeException(e);
}
position[0] = readNextElementSupplier;
return true;
}

private boolean tryClaimOrThrow(TimestampedValue<T>[] position) throws IOException {
private @Nullable TimestampedValue<T> readOrThrow() throws IOException {
BoundedSource.BoundedReader<T> currentReader = this.currentReader;
if (currentReader == null) {
BoundedSource.BoundedReader<T> newReader =
initialRestriction.createReader(pipelineOptions);
if (!newReader.start()) {
claimedAll = true;
newReader.close();
return false;
return null;
}
position[0] =
TimestampedValue.of(newReader.getCurrent(), newReader.getCurrentTimestamp());
this.currentReader = newReader;
return true;
return TimestampedValue.of(newReader.getCurrent(), newReader.getCurrentTimestamp());
}

if (!currentReader.advance()) {
Expand All @@ -393,12 +400,12 @@ private boolean tryClaimOrThrow(TimestampedValue<T>[] position) throws IOExcepti
} finally {
this.currentReader = null;
}
return false;
return null;
}

position[0] =
TimestampedValue.of(currentReader.getCurrent(), currentReader.getCurrentTimestamp());
return true;
return TimestampedValue.of(
currentReader.getCurrent(),
currentReader.getCurrentTimestamp());
}

@SuppressWarnings("Finalize")
Expand Down
Loading