Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -106,14 +107,30 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
.required(false)
.build();
public static final PropertyDescriptor MOVE_DESTINATION_DIR = new PropertyDescriptor.Builder()
.name("Move Destination Directory")
.description(String.format("The directory on the remote server to move the original file to once it has been ingested into NiFi. "
+ "This property is ignored unless the %s is set to '%s'. The specified directory must already exist on "
+ "the remote system if '%s' is disabled, or the rename will fail.",
COMPLETION_STRATEGY.getDisplayName(), COMPLETION_MOVE.getDisplayName(), MOVE_CREATE_DIRECTORY.getDisplayName()))
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false)
.name("Move Destination Directory")
.description("""
The directory on the remote server to move the original file to once it has been ingested into NiFi.
This property is ignored unless the %s is set to '%s'.
The specified directory must already exist on the remote system if '%s' is disabled, or the rename will fail.
""".formatted( COMPLETION_STRATEGY.getDisplayName(), COMPLETION_MOVE.getDisplayName(), MOVE_CREATE_DIRECTORY.getDisplayName())
)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false)
.build();

public static final PropertyDescriptor MOVE_CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
.name("Move Conflict Resolution")
.description(String.format("Determines how to handle filename collisions when '%s' is '%s'. " +
"This setting controls behavior when the target file exists in the %s.",
Comment on lines +124 to +125
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A multiline string with """description""".formatted() can also work and make this a bit easier to read.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This description can be reformatted as mentioned.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@exceptionfactory The code has already been updated; I do not know why the change is not showing here.

COMPLETION_STRATEGY.getDisplayName(), COMPLETION_MOVE.getDisplayName(), MOVE_DESTINATION_DIR.getDisplayName()))
.required(true)
.dependsOn(COMPLETION_STRATEGY, COMPLETION_MOVE.getValue())
.allowableValues(FileTransfer.CONFLICT_RESOLUTION_REPLACE_ALLOWABLE,
FileTransfer.CONFLICT_RESOLUTION_IGNORE_ALLOWABLE,
FileTransfer.CONFLICT_RESOLUTION_RENAME_ALLOWABLE
)
.defaultValue(FileTransfer.CONFLICT_RESOLUTION_IGNORE_ALLOWABLE)
.build();

public static final PropertyDescriptor FILE_NOT_FOUND_LOG_LEVEL = new PropertyDescriptor.Builder()
Expand Down Expand Up @@ -147,7 +164,8 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
REMOTE_FILENAME,
COMPLETION_STRATEGY,
MOVE_DESTINATION_DIR,
MOVE_CREATE_DIRECTORY
MOVE_CREATE_DIRECTORY,
MOVE_CONFLICT_RESOLUTION
);

private static final Set<Relationship> RELATIONSHIPS = Set.of(
Expand Down Expand Up @@ -256,6 +274,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
}

Relationship failureRelationship = null;
String failureReason = null;
boolean closeConnOnFailure = false;

try {
Expand All @@ -268,6 +287,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
flowFile, filename, host, failureRelationship.getName());
} catch (final PermissionDeniedException e) {
failureRelationship = REL_PERMISSION_DENIED;
failureReason = "permission.denied.read";
getLogger().error("Failed to fetch content for {} from filename {} on remote host {} due to insufficient permissions; routing to {}",
flowFile, filename, host, failureRelationship.getName());
} catch (final ProcessException | IOException e) {
Expand Down Expand Up @@ -296,7 +316,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
}

if (failureRelationship != null) {
attributes.put(FAILURE_REASON_ATTRIBUTE, failureRelationship.getName());
attributes.put(FAILURE_REASON_ATTRIBUTE, failureReason != null ? failureReason : failureRelationship.getName());
flowFile = session.putAllAttributes(flowFile, attributes);
session.transfer(session.penalize(flowFile), failureRelationship);
session.getProvenanceReporter().route(flowFile, failureRelationship);
Expand All @@ -306,13 +326,16 @@ public void onTrigger(final ProcessContext context, final ProcessSession session

flowFile = session.putAllAttributes(flowFile, attributes);

// Centralized pre-commit completion handling for MOVE/DELETE. This method may route and cleanup.
if (handleCompletionPreCommit(transfer, context, session, flowFile, attributes, filename, host, port, transferQueue)) {
return;
}

// emit provenance event and transfer FlowFile
session.getProvenanceReporter().fetch(flowFile, protocolName + "://" + host + ":" + port + "/" + filename, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(flowFile, REL_SUCCESS);

// it is critical that we commit the session before moving/deleting the remote file. Otherwise, we could have a situation where
// we ingest the data, delete/move the remote file, and then NiFi dies/is shut down before the session is committed. This would
// result in data loss! If we commit the session first, we are safe.
// Commit and perform any post-commit completion actions (DELETE only). Move is already handled pre-commit above.
final BlockingQueue<FileTransferIdleWrapper> queue = transferQueue;
final Runnable cleanupTask = () -> cleanupTransfer(transfer, false, queue, host, port);

Expand All @@ -327,6 +350,101 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
}
}

/**
 * Carries out the part of the Completion Strategy that runs before the session is committed.
 * <p>
 * For the MOVE strategy the remote source file is renamed into the configured destination directory,
 * optionally creating that directory first and applying the Move Conflict Resolution when a file of the
 * same name is already present. For the DELETE strategy nothing is done here beyond a debug log entry;
 * the delete itself is performed by {@code performCompletionStrategy}.
 * <p>
 * Failures of the rename itself that are neither permission problems nor communication problems are
 * logged and do not prevent the FlowFile from continuing to success.
 *
 * @param transfer       the FileTransfer used to talk to the remote system
 * @param context        the process context supplying the property values
 * @param session        the session owning {@code flowFile}
 * @param flowFile       the FlowFile whose content has been fetched
 * @param baseAttributes attributes to apply to the FlowFile if it is routed from here
 * @param filename       the remote path of the fetched file
 * @param host           the remote host, used for cleanup
 * @param port           the remote port, used for cleanup
 * @param transferQueue  the queue the FileTransfer is handed to during cleanup
 * @return {@code true} if the FlowFile was routed to a failure relationship and the transfer cleaned up,
 *         in which case the caller must stop processing; {@code false} if the caller should continue
 */
private boolean handleCompletionPreCommit(final FileTransfer transfer,
                                          final ProcessContext context,
                                          final ProcessSession session,
                                          final FlowFile flowFile,
                                          final Map<String, String> baseAttributes,
                                          final String filename,
                                          final String host,
                                          final int port,
                                          final BlockingQueue<FileTransferIdleWrapper> transferQueue) {
    final String completionStrategy = context.getProperty(COMPLETION_STRATEGY).getValue();

    try {
        if (COMPLETION_MOVE.getValue().equalsIgnoreCase(completionStrategy)) {
            // Attempt remote MOVE
            final String targetDir = context.getProperty(MOVE_DESTINATION_DIR).evaluateAttributeExpressions(flowFile).getValue();
            final String simpleFilename = getSimpleFilename(filename);

            try {
                final String absoluteTargetDirPath = transfer.getAbsolutePath(flowFile, targetDir);
                if (context.getProperty(MOVE_CREATE_DIRECTORY).asBoolean()) {
                    transfer.ensureDirectoryExists(flowFile, new File(absoluteTargetDirPath));
                }

                // A null name means the conflict resolution decided that the move must be skipped.
                final String destinationFileName = resolveMoveDestinationName(transfer, context, flowFile, absoluteTargetDirPath, simpleFilename);

                if (destinationFileName != null) {
                    final String destinationPath = String.format("%s/%s", absoluteTargetDirPath, destinationFileName);
                    try {
                        transfer.rename(flowFile, filename, destinationPath);
                    } catch (final PermissionDeniedException pde) {
                        // Permission denied on rename should route to permission.denied to maintain legacy behavior and transparency
                        getLogger().error("Permission denied renaming remote file from [{}] to [{}] for {}; routing to {}",
                                filename, destinationPath, flowFile, REL_PERMISSION_DENIED.getName(), pde);
                        return routeWithCleanupReason(session, flowFile, baseAttributes, REL_PERMISSION_DENIED, false, transfer, false, transferQueue, host, port, "permission.denied.rename");
                    } catch (final FileNotFoundException fnfe) {
                        getLogger().info("Source file not found during remote move for {}. Proceeding with success since content has been fetched.", flowFile);
                    } catch (final IOException ioe) {
                        // Do not fail the FlowFile on completion action issues; log and proceed without moving
                        getLogger().warn("I/O error renaming remote file from [{}] to [{}] for {}: {}. Skipping move but proceeding with success.",
                                filename, destinationPath, flowFile, ioe.toString(), ioe);
                    }
                }
            } catch (final PermissionDeniedException pde) {
                // Raised while resolving/creating the destination directory or checking for an existing destination file.
                getLogger().error("Permission denied preparing move destination [{}] for {}; routing to {}",
                        targetDir, flowFile, REL_PERMISSION_DENIED.getName(), pde);
                return routeWithCleanupReason(session, flowFile, baseAttributes, REL_PERMISSION_DENIED, false, transfer, false, transferQueue, host, port, "permission.denied.ensure-dir");
            } catch (final IOException ioe) {
                getLogger().error("I/O error preparing move destination [{}] for {}; routing to {}",
                        targetDir, flowFile, REL_COMMS_FAILURE.getName(), ioe);
                return routeWithCleanup(session, flowFile, baseAttributes, REL_COMMS_FAILURE, true, transfer, true, transferQueue, host, port);
            }
        } else if (COMPLETION_DELETE.getValue().equalsIgnoreCase(completionStrategy)) {
            // DELETE will be performed post-commit in performCompletionStrategy to avoid data loss if session commit fails.
            getLogger().debug("Completion Strategy DELETE will be executed post-commit for {}", flowFile);
        }
    } catch (final RuntimeException rte) {
        // Safety net: do not swallow unexpected runtime exceptions; log the cause and route to comms.failure to be safe
        getLogger().error("Unexpected failure while applying Completion Strategy [{}] for {}; routing to {}",
                completionStrategy, flowFile, REL_COMMS_FAILURE.getName(), rte);
        return routeWithCleanup(session, flowFile, baseAttributes, REL_COMMS_FAILURE, true, transfer, true, transferQueue, host, port);
    }

    // No routing occurred; continue to success
    return false;
}

/**
 * Chooses the file name to use inside the move destination directory, applying the configured
 * Move Conflict Resolution when a file with the same name is already present there.
 *
 * @param transfer              the FileTransfer used to talk to the remote system
 * @param context               the process context supplying the conflict resolution property
 * @param flowFile              the FlowFile being processed
 * @param absoluteTargetDirPath the absolute path of the destination directory
 * @param simpleFilename        the name of the source file without its directory part
 * @return the name the source file should be renamed to, or {@code null} if the move must be skipped
 * @throws IOException if looking up the existing destination file fails
 */
private String resolveMoveDestinationName(final FileTransfer transfer,
                                          final ProcessContext context,
                                          final FlowFile flowFile,
                                          final String absoluteTargetDirPath,
                                          final String simpleFilename) throws IOException {
    final FileInfo remoteFileInfo = transfer.getRemoteFileInfo(flowFile, absoluteTargetDirPath, simpleFilename);
    if (remoteFileInfo == null) {
        // Nothing with that name in the destination: no conflict to resolve.
        return simpleFilename;
    }

    final String strategy = context.getProperty(MOVE_CONFLICT_RESOLUTION).getValue();
    // Locale.ROOT keeps the comparison stable regardless of the JVM default locale; a missing value
    // falls through to the default branch instead of raising a NullPointerException.
    final String normalizedStrategy = strategy == null ? "" : strategy.toUpperCase(java.util.Locale.ROOT);

    switch (normalizedStrategy) {
        case FileTransfer.CONFLICT_RESOLUTION_REPLACE: {
            try {
                transfer.deleteFile(flowFile, absoluteTargetDirPath, simpleFilename);
            } catch (final PermissionDeniedException pde) {
                // Per design: Completion Strategy failures should not prevent success. Log and skip move.
                getLogger().warn("Permission denied deleting existing destination [{}] in [{}] for {}; skipping move but proceeding with success.",
                        simpleFilename, absoluteTargetDirPath, flowFile, pde);
                return null;
            } catch (final FileNotFoundException fnfe) {
                // The conflicting file disappeared between the existence check and the delete, so the move can go ahead.
                getLogger().debug("Existing destination [{}] in [{}] was already removed before it could be replaced for {}; proceeding with move.",
                        simpleFilename, absoluteTargetDirPath, flowFile);
            } catch (final IOException ioe) {
                getLogger().warn("I/O error deleting existing destination [{}] in [{}] for {}: {}. Skipping move but proceeding with success.",
                        simpleFilename, absoluteTargetDirPath, flowFile, ioe.toString(), ioe);
                return null;
            }
            return simpleFilename;
        }
        case FileTransfer.CONFLICT_RESOLUTION_RENAME: {
            final String uuid = UUID.randomUUID().toString();
            final String renamed = uuid + "." + simpleFilename;
            getLogger().info("Generated filename [{}] to resolve conflict with initial filename [{}] for {}", renamed, simpleFilename, flowFile);
            return renamed;
        }
        case FileTransfer.CONFLICT_RESOLUTION_IGNORE:
        case FileTransfer.CONFLICT_RESOLUTION_NONE:
            getLogger().info("Configured to {} on move conflict for {}. Original remote file will be left in place.", strategy, flowFile);
            return null;
        default:
            // Default branch handles unexpected/unknown conflict strategies. For safety, behave like IGNORE/NONE and skip the move.
            getLogger().warn("Unrecognized Move Conflict Resolution strategy [{}]; behaving like IGNORE/NONE and leaving original remote file in place for {}", strategy, flowFile);
            return null;
    }
}

private void cleanupTransfer(final FileTransfer transfer, final boolean closeConnection, final BlockingQueue<FileTransferIdleWrapper> transferQueue, final String host, final int port) {
if (closeConnection) {
getLogger().debug("Closing FileTransfer...");
Expand All @@ -344,33 +462,33 @@ private void cleanupTransfer(final FileTransfer transfer, final boolean closeCon
private void performCompletionStrategy(final FileTransfer transfer, final ProcessContext context, final FlowFile flowFile, final String filename, final String host, final int port) {
final String completionStrategy = context.getProperty(COMPLETION_STRATEGY).getValue();
if (COMPLETION_DELETE.getValue().equalsIgnoreCase(completionStrategy)) {
// Perform DELETE post-commit to avoid data loss if the session commit fails. No routing is performed here.
try {
transfer.deleteFile(flowFile, null, filename);
} catch (final FileNotFoundException ignored) {
// file doesn't exist -- effectively the same as removing it. Move on.
// Already fetched content; treat as success
getLogger().debug("Source file not found during post-commit delete for {}. Nothing to delete.", flowFile);
} catch (final PermissionDeniedException pde) {
getLogger().warn("Post-commit delete failed due to insufficient permissions for {} on {}:{}; remote source remains. Error: {}", flowFile, host, port, pde.toString());
} catch (final IOException ioe) {
getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to remove the remote file due to {}",
flowFile, host, port, filename, ioe, ioe);
getLogger().error("Post-commit delete failed for {} on {}:{} due to {}; remote source remains.", flowFile, host, port, ioe.toString(), ioe);
}
} else if (COMPLETION_MOVE.getValue().equalsIgnoreCase(completionStrategy)) {
final String targetDir = context.getProperty(MOVE_DESTINATION_DIR).evaluateAttributeExpressions(flowFile).getValue();
final String simpleFilename = StringUtils.substringAfterLast(filename, "/");

try {
final String absoluteTargetDirPath = transfer.getAbsolutePath(flowFile, targetDir);
if (context.getProperty(MOVE_CREATE_DIRECTORY).asBoolean()) {
// Create the target directory if necessary.
transfer.ensureDirectoryExists(flowFile, new File(absoluteTargetDirPath));
}

final String destinationPath = String.format("%s/%s", absoluteTargetDirPath, simpleFilename);
transfer.rename(flowFile, filename, destinationPath);
} else if (COMPLETION_MOVE.getValue().equalsIgnoreCase(completionStrategy)) {
// MOVE completion is handled pre-commit in onTrigger to allow routing on server-side rejection or communication failures.
// No remote-side move/delete is attempted here to avoid double-actions post-commit.
getLogger().debug("Completion Strategy MOVE already handled pre-commit for {}. No action taken in performCompletionStrategy.", flowFile);
}
}

} catch (final IOException ioe) {
getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to rename the remote file due to {}",
flowFile, host, port, filename, ioe, ioe);
}
/**
 * Returns the portion of {@code path} that follows its last {@code '/'} or {@code '\\'} separator.
 *
 * @param path a remote path, possibly {@code null}
 * @return {@code null} for a {@code null} path, the path itself when it contains no separator,
 *         otherwise everything after the last separator (which may be empty)
 */
private static String getSimpleFilename(final String path) {
    if (path == null) {
        return null;
    }

    // Walk backwards until a separator of either flavour is found; -1 means there is none.
    int separatorIndex = path.length() - 1;
    while (separatorIndex >= 0) {
        final char current = path.charAt(separatorIndex);
        if (current == '/' || current == '\\') {
            break;
        }
        separatorIndex--;
    }

    return path.substring(separatorIndex + 1);
}


Expand Down Expand Up @@ -403,4 +521,47 @@ public long getLastUsed() {
return this.lastUsed;
}
}


/**
 * Routes a FlowFile to the given relationship using the relationship's own name as the failure reason.
 * This is a convenience form of {@code routeWithCleanupReason}: the failure reason attribute is set,
 * the FlowFile is optionally penalized, a provenance route event is recorded, the FileTransfer is
 * cleaned up, and {@code true} is returned to signal that routing took place.
 */
private boolean routeWithCleanup(final ProcessSession session,
                                 final FlowFile flowFile,
                                 final Map<String, String> baseAttributes,
                                 final Relationship relationship,
                                 final boolean penalize,
                                 final FileTransfer transfer,
                                 final boolean closeConnection,
                                 final BlockingQueue<FileTransferIdleWrapper> transferQueue,
                                 final String host,
                                 final int port) {
    // With no more specific reason available, the relationship name doubles as the failure reason.
    final String defaultReason = relationship.getName();
    return routeWithCleanupReason(session, flowFile, baseAttributes, relationship, penalize,
            transfer, closeConnection, transferQueue, host, port, defaultReason);
}

/**
 * Routes a FlowFile to the given relationship with an explicit failure reason and then cleans up the transfer.
 * <p>
 * The FlowFile receives a copy of {@code baseAttributes} plus the failure reason attribute, is penalized
 * when requested, is transferred to {@code relationship}, and a provenance route event is recorded for it.
 * Finally the FileTransfer is passed to {@code cleanupTransfer}.
 *
 * @param session         the session owning {@code flowFile}
 * @param flowFile        the FlowFile to route
 * @param baseAttributes  attributes to apply; the map itself is not modified
 * @param relationship    the relationship to route to
 * @param penalize        whether to penalize the FlowFile before transferring it
 * @param transfer        the FileTransfer to clean up
 * @param closeConnection passed through to {@code cleanupTransfer}
 * @param transferQueue   passed through to {@code cleanupTransfer}
 * @param host            passed through to {@code cleanupTransfer}
 * @param port            passed through to {@code cleanupTransfer}
 * @param failureReason   value stored under the failure reason attribute
 * @return always {@code true}, indicating that routing occurred
 */
private boolean routeWithCleanupReason(final ProcessSession session,
                                       final FlowFile flowFile,
                                       final Map<String, String> baseAttributes,
                                       final Relationship relationship,
                                       final boolean penalize,
                                       final FileTransfer transfer,
                                       final boolean closeConnection,
                                       final BlockingQueue<FileTransferIdleWrapper> transferQueue,
                                       final String host,
                                       final int port,
                                       final String failureReason) {
    // Work on a copy so the caller's attribute map is left untouched.
    final Map<String, String> routedAttributes = new HashMap<>(baseAttributes);
    routedAttributes.put(FAILURE_REASON_ATTRIBUTE, failureReason);

    final FlowFile updated = session.putAllAttributes(flowFile, routedAttributes);
    final FlowFile routed = penalize ? session.penalize(updated) : updated;

    session.transfer(routed, relationship);
    session.getProvenanceReporter().route(routed, relationship);

    cleanupTransfer(transfer, closeConnection, transferQueue, host, port);
    return true;
}
}
Loading