Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,33 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
.required(false)
.build();
public static final PropertyDescriptor MOVE_DESTINATION_DIR = new PropertyDescriptor.Builder()
.name("Move Destination Directory")
.description(String.format("The directory on the remote server to move the original file to once it has been ingested into NiFi. "
+ "This property is ignored unless the %s is set to '%s'. The specified directory must already exist on "
+ "the remote system if '%s' is disabled, or the rename will fail.",
COMPLETION_STRATEGY.getDisplayName(), COMPLETION_MOVE.getDisplayName(), MOVE_CREATE_DIRECTORY.getDisplayName()))
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false)
.name("Move Destination Directory")
.description("""
The directory on the remote server to move the original file to once it has been ingested into NiFi.
This property is ignored unless the %s is set to '%s'.
The specified directory must already exist on the remote system if '%s' is disabled, or the rename will fail.
""".formatted(
COMPLETION_STRATEGY.getDisplayName(),
COMPLETION_MOVE.getDisplayName(),
MOVE_CREATE_DIRECTORY.getDisplayName()))
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false)
.build();

public static final PropertyDescriptor MOVE_CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
.name("Move Conflict Resolution")
.description(String.format("Determines how to handle filename collisions when '%s' is '%s'. " +
"This setting controls behavior when the target file exists in the %s.",
Comment on lines +124 to +125
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A multiline string with """description""".formatted() can also work and make this a bit easier to read.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This description can be reformatted as mentioned.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@exceptionfactory code already has been updated. i do not know why it's not showing here

COMPLETION_STRATEGY.getDisplayName(), COMPLETION_MOVE.getDisplayName(), MOVE_DESTINATION_DIR.getDisplayName()))
.required(true)
.dependsOn(COMPLETION_STRATEGY, COMPLETION_MOVE.getValue())
.allowableValues(FileTransfer.CONFLICT_RESOLUTION_REPLACE_ALLOWABLE,
FileTransfer.CONFLICT_RESOLUTION_IGNORE_ALLOWABLE,
FileTransfer.CONFLICT_RESOLUTION_RENAME_ALLOWABLE,
FileTransfer.CONFLICT_RESOLUTION_REJECT_ALLOWABLE,
FileTransfer.CONFLICT_RESOLUTION_FAIL_ALLOWABLE)
.defaultValue(FileTransfer.CONFLICT_RESOLUTION_IGNORE_ALLOWABLE)
.build();

public static final PropertyDescriptor FILE_NOT_FOUND_LOG_LEVEL = new PropertyDescriptor.Builder()
Expand Down Expand Up @@ -147,7 +166,8 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
REMOTE_FILENAME,
COMPLETION_STRATEGY,
MOVE_DESTINATION_DIR,
MOVE_CREATE_DIRECTORY
MOVE_CREATE_DIRECTORY,
MOVE_CONFLICT_RESOLUTION
);

private static final Set<Relationship> RELATIONSHIPS = Set.of(
Expand Down Expand Up @@ -363,7 +383,40 @@ private void performCompletionStrategy(final FileTransfer transfer, final Proces
transfer.ensureDirectoryExists(flowFile, new File(absoluteTargetDirPath));
}

final String destinationPath = String.format("%s/%s", absoluteTargetDirPath, simpleFilename);
String destinationFileName = simpleFilename;
final FileInfo remoteFileInfo = transfer.getRemoteFileInfo(flowFile, absoluteTargetDirPath, destinationFileName);
if (remoteFileInfo != null) {
final String strategy = context.getProperty(MOVE_CONFLICT_RESOLUTION).getValue();
switch (strategy.toUpperCase()) {
case FileTransfer.CONFLICT_RESOLUTION_REPLACE:
try {
transfer.deleteFile(flowFile, absoluteTargetDirPath, destinationFileName);
} catch (final IOException deleteEx) {
getLogger().warn("Failed to delete existing destination file {} on {}:{}",
destinationFileName, host, port, deleteEx);
}
break;
case FileTransfer.CONFLICT_RESOLUTION_RENAME:

destinationFileName = FileTransferConflictUtil.generateUniqueFilename(transfer, absoluteTargetDirPath, destinationFileName, flowFile, getLogger());
getLogger().info("Generated filename [{}] to resolve conflict with initial filename [{}] for {}", destinationFileName, simpleFilename, flowFile);
break;
case FileTransfer.CONFLICT_RESOLUTION_IGNORE:
getLogger().debug("Ignored conflicting destination filename [{}] for {}", destinationFilename, flowFile);
return;
case FileTransfer.CONFLICT_RESOLUTION_REJECT:
case FileTransfer.CONFLICT_RESOLUTION_FAIL:
getLogger().warn("Configured to {} on move conflict for {}. Original remote file will be left in place.", strategy, flowFile);
return;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This appears to behave the same as the ignore strategy, does it fail elsewhere during processing despite the return?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@exceptionfactory i have updated the code please have a look

case FileTransfer.CONFLICT_RESOLUTION_NONE:
default:
// Treat as IGNORE for move
getLogger().info("Ignoring conflicting destination filename [{}] for {}", destinationFilename, flowFile);
return;
}
}

final String destinationPath = String.format("%s/%s", absoluteTargetDirPath, destinationFileName);
transfer.rename(flowFile, filename, destinationPath);

} catch (final IOException ioe) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processor.util.file.transfer;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;

import java.io.IOException;
import java.util.UUID;

public final class FileTransferConflictUtil {
private FileTransferConflictUtil() {
}

/**
* Generates a unique filename by using a UUID prefix.
* This approach virtually eliminates the possibility of name collisions without requiring multiple remote file checks.
*/
public static String generateUniqueFilename(final FileTransfer transfer,
final String path,
final String baseFileName,
final FlowFile flowFile,
final ComponentLog logger) throws IOException {
String uuid = UUID.randomUUID().toString();
String uniqueFilename = uuid + "." + baseFileName;

logger.info("Attempting to resolve filename conflict for {} on the remote server by using a newly generated filename of: {}", flowFile, uniqueFilename);
return uniqueFilename;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public class FetchFTP extends FetchFileTransfer {
COMPLETION_STRATEGY,
MOVE_DESTINATION_DIR,
MOVE_CREATE_DIRECTORY,
MOVE_CONFLICT_RESOLUTION,
FTPTransfer.CONNECTION_TIMEOUT,
FTPTransfer.DATA_TIMEOUT,
FTPTransfer.USE_COMPRESSION,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public class FetchSFTP extends FetchFileTransfer {
COMPLETION_STRATEGY,
MOVE_DESTINATION_DIR,
MOVE_CREATE_DIRECTORY,
MOVE_CONFLICT_RESOLUTION,
DISABLE_DIRECTORY_LISTING,
SFTPTransfer.CONNECTION_TIMEOUT,
SFTPTransfer.DATA_TIMEOUT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.nifi.processor.util.file.transfer.FetchFileTransfer;
import org.apache.nifi.processors.standard.util.FTPTransfer;
import org.apache.nifi.processor.util.file.transfer.FileTransfer;
import org.apache.nifi.processor.util.file.transfer.FileInfo;
import org.apache.nifi.processor.util.file.transfer.PermissionDeniedException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext;
Expand Down Expand Up @@ -71,7 +72,7 @@ public void setUp() throws Exception {

MockProcessContext ctx = (MockProcessContext) runner.getProcessContext();
setDefaultValues(ctx, FTPTransfer.BUFFER_SIZE, FTPTransfer.DATA_TIMEOUT, FTPTransfer.CONNECTION_TIMEOUT,
FTPTransfer.CONNECTION_MODE, FTPTransfer.TRANSFER_MODE);
FTPTransfer.CONNECTION_MODE, FTPTransfer.TRANSFER_MODE);
ctx.setProperty(FTPTransfer.USERNAME, "foo");
ctx.setProperty(FTPTransfer.PASSWORD, "bar");
}
Expand Down Expand Up @@ -241,6 +242,24 @@ public void testRenameFails() {
assertTrue(proc.fileContents.containsKey("hello.txt"));
}

@Test
public void testMoveConflictReplace() {
runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_MOVE.getValue());
runner.setProperty(FetchFileTransfer.MOVE_DESTINATION_DIR, "/moved/");
runner.setProperty(FetchFileTransfer.MOVE_CONFLICT_RESOLUTION, FileTransfer.CONFLICT_RESOLUTION_REPLACE);

// Destination exists
proc.addContent("/moved/hello.txt", "old".getBytes());
addFileAndEnqueue("hello.txt");

runner.run(1, false, false);
runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_SUCCESS, 1);

assertFalse(proc.fileContents.containsKey("hello.txt"));
assertTrue(proc.fileContents.containsKey("/moved/hello.txt"));
}


@Test
public void testCreateDirFails() {
runner.setProperty(FetchFileTransfer.COMPLETION_STRATEGY, FetchFileTransfer.COMPLETION_MOVE.getValue());
Expand Down Expand Up @@ -319,11 +338,19 @@ public void deleteFile(FlowFile flowFile, String path, String remoteFileName) th
throw new PermissionDeniedException("test permission denied");
}

if (!fileContents.containsKey(remoteFileName)) {
String key;
if (path == null) {
key = remoteFileName;
} else {
key = (path.endsWith("/") ? path.substring(0, path.length() - 1) : path) + "/" + remoteFileName;
}
key = key.replaceAll("/+", "/");

if (!fileContents.containsKey(key)) {
throw new FileNotFoundException();
}

fileContents.remove(remoteFileName);
fileContents.remove(key);
}

@Override
Expand All @@ -332,12 +359,15 @@ public void rename(FlowFile flowFile, String source, String target) throws IOExc
throw new PermissionDeniedException("test permission denied");
}

if (!fileContents.containsKey(source)) {
final String normalizedSource = source.replaceAll("/+", "/");
final String normalizedTarget = target.replaceAll("/+", "/");

if (!fileContents.containsKey(normalizedSource)) {
throw new FileNotFoundException();
}

final byte[] content = fileContents.remove(source);
fileContents.put(target, content);
final byte[] content = fileContents.remove(normalizedSource);
fileContents.put(normalizedTarget, content);
}

@Override
Expand All @@ -347,11 +377,49 @@ public void ensureDirectoryExists(FlowFile flowFile, File remoteDirectory) throw
}
}

@Override
public String getAbsolutePath(FlowFile flowFile, String remotePath) throws IOException {
final String abs;
if (!remotePath.startsWith("/") && !remotePath.startsWith("\\")) {
abs = new File(getHomeDirectory(flowFile), remotePath).getPath();
} else {
abs = remotePath;
}
String norm = abs.replace("\\", "/");
norm = norm.replaceAll("/+", "/");
if (norm.endsWith("/") && norm.length() > 1) {
norm = norm.substring(0, norm.length() - 1);
}
return norm;
}

@Override
public void close() throws IOException {
super.close();
isClosed = true;
}

@Override
public String getHomeDirectory(FlowFile flowFile) throws IOException {
return "/";
}

@Override
public FileInfo getRemoteFileInfo(FlowFile flowFile, String path, String remoteFileName) throws IOException {
final String dir = path == null ? "/" : path;
String key = (dir.endsWith("/") ? dir.substring(0, dir.length() - 1) : dir) + "/" + remoteFileName;
key = key.replaceAll("/+", "/");
final byte[] content = fileContents.get(key);
if (content == null) {
return null;
}
return new FileInfo.Builder()
.filename(remoteFileName)
.fullPathFileName(key)
.directory(false)
.size(content.length)
.build();
}
};
}
}
Expand Down
Loading