-
Notifications
You must be signed in to change notification settings - Fork 2.9k
NIFI-15077 : Added conflict resolution strategies for file move operations in FetchFileTransfer #10405
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
NIFI-15077 : Added conflict resolution strategies for file move operations in FetchFileTransfer #10405
Changes from 9 commits
b92bdda
abe0f06
7b7c898
5bcaf61
1350efb
f73dfa9
e99ee17
66fb517
66d7dd0
88fb177
ba2a476
27b584f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -106,14 +106,33 @@ public abstract class FetchFileTransfer extends AbstractProcessor { | |
| .required(false) | ||
| .build(); | ||
| public static final PropertyDescriptor MOVE_DESTINATION_DIR = new PropertyDescriptor.Builder() | ||
| .name("Move Destination Directory") | ||
| .description(String.format("The directory on the remote server to move the original file to once it has been ingested into NiFi. " | ||
| + "This property is ignored unless the %s is set to '%s'. The specified directory must already exist on " | ||
| + "the remote system if '%s' is disabled, or the rename will fail.", | ||
| COMPLETION_STRATEGY.getDisplayName(), COMPLETION_MOVE.getDisplayName(), MOVE_CREATE_DIRECTORY.getDisplayName())) | ||
| .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) | ||
| .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) | ||
| .required(false) | ||
| .name("Move Destination Directory") | ||
| .description(""" | ||
| The directory on the remote server to move the original file to once it has been ingested into NiFi. | ||
| This property is ignored unless the %s is set to '%s'. | ||
| The specified directory must already exist on the remote system if '%s' is disabled, or the rename will fail. | ||
| """.formatted( | ||
| COMPLETION_STRATEGY.getDisplayName(), | ||
| COMPLETION_MOVE.getDisplayName(), | ||
| MOVE_CREATE_DIRECTORY.getDisplayName())) | ||
| .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) | ||
| .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) | ||
| .required(false) | ||
| .build(); | ||
|
|
||
| public static final PropertyDescriptor MOVE_CONFLICT_RESOLUTION = new PropertyDescriptor.Builder() | ||
| .name("Move Conflict Resolution") | ||
| .description(String.format("Determines how to handle filename collisions when '%s' is '%s'. " + | ||
| "This setting controls behavior when the target file exists in the %s.", | ||
| COMPLETION_STRATEGY.getDisplayName(), COMPLETION_MOVE.getDisplayName(), MOVE_DESTINATION_DIR.getDisplayName())) | ||
| .required(true) | ||
| .dependsOn(COMPLETION_STRATEGY, COMPLETION_MOVE.getValue()) | ||
| .allowableValues(FileTransfer.CONFLICT_RESOLUTION_REPLACE_ALLOWABLE, | ||
| FileTransfer.CONFLICT_RESOLUTION_IGNORE_ALLOWABLE, | ||
| FileTransfer.CONFLICT_RESOLUTION_RENAME_ALLOWABLE, | ||
| FileTransfer.CONFLICT_RESOLUTION_REJECT_ALLOWABLE, | ||
| FileTransfer.CONFLICT_RESOLUTION_FAIL_ALLOWABLE) | ||
| .defaultValue(FileTransfer.CONFLICT_RESOLUTION_IGNORE_ALLOWABLE) | ||
| .build(); | ||
|
|
||
| public static final PropertyDescriptor FILE_NOT_FOUND_LOG_LEVEL = new PropertyDescriptor.Builder() | ||
|
|
@@ -147,7 +166,8 @@ public abstract class FetchFileTransfer extends AbstractProcessor { | |
| REMOTE_FILENAME, | ||
| COMPLETION_STRATEGY, | ||
| MOVE_DESTINATION_DIR, | ||
| MOVE_CREATE_DIRECTORY | ||
| MOVE_CREATE_DIRECTORY, | ||
| MOVE_CONFLICT_RESOLUTION | ||
| ); | ||
|
|
||
| private static final Set<Relationship> RELATIONSHIPS = Set.of( | ||
|
|
@@ -363,7 +383,40 @@ private void performCompletionStrategy(final FileTransfer transfer, final Proces | |
| transfer.ensureDirectoryExists(flowFile, new File(absoluteTargetDirPath)); | ||
| } | ||
|
|
||
| final String destinationPath = String.format("%s/%s", absoluteTargetDirPath, simpleFilename); | ||
| String destinationFileName = simpleFilename; | ||
| final FileInfo remoteFileInfo = transfer.getRemoteFileInfo(flowFile, absoluteTargetDirPath, destinationFileName); | ||
| if (remoteFileInfo != null) { | ||
| final String strategy = context.getProperty(MOVE_CONFLICT_RESOLUTION).getValue(); | ||
| switch (strategy.toUpperCase()) { | ||
| case FileTransfer.CONFLICT_RESOLUTION_REPLACE: | ||
| try { | ||
| transfer.deleteFile(flowFile, absoluteTargetDirPath, destinationFileName); | ||
| } catch (final IOException deleteEx) { | ||
| getLogger().warn("Failed to delete existing destination file {} on {}:{}", | ||
| destinationFileName, host, port, deleteEx); | ||
| } | ||
| break; | ||
| case FileTransfer.CONFLICT_RESOLUTION_RENAME: | ||
|
|
||
| destinationFileName = FileTransferConflictUtil.generateUniqueFilename(transfer, absoluteTargetDirPath, destinationFileName, flowFile, getLogger()); | ||
| getLogger().info("Generated filename [{}] to resolve conflict with initial filename [{}] for {}", destinationFileName, simpleFilename, flowFile); | ||
| break; | ||
| case FileTransfer.CONFLICT_RESOLUTION_IGNORE: | ||
| getLogger().debug("Ignored conflicting destination filename [{}] for {}", destinationFilename, flowFile); | ||
| return; | ||
| case FileTransfer.CONFLICT_RESOLUTION_REJECT: | ||
| case FileTransfer.CONFLICT_RESOLUTION_FAIL: | ||
| getLogger().warn("Configured to {} on move conflict for {}. Original remote file will be left in place.", strategy, flowFile); | ||
| return; | ||
|
||
| case FileTransfer.CONFLICT_RESOLUTION_NONE: | ||
| default: | ||
| // Treat as IGNORE for move | ||
| getLogger().info("Ignoring conflicting destination filename [{}] for {}", destinationFilename, flowFile); | ||
| return; | ||
| } | ||
| } | ||
|
|
||
| final String destinationPath = String.format("%s/%s", absoluteTargetDirPath, destinationFileName); | ||
| transfer.rename(flowFile, filename, destinationPath); | ||
|
|
||
| } catch (final IOException ioe) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,44 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.nifi.processor.util.file.transfer; | ||
|
|
||
| import org.apache.nifi.flowfile.FlowFile; | ||
| import org.apache.nifi.logging.ComponentLog; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.UUID; | ||
|
|
||
| public final class FileTransferConflictUtil { | ||
| private FileTransferConflictUtil() { | ||
| } | ||
|
|
||
| /** | ||
| * Generates a unique filename by using a UUID prefix. | ||
| * This approach virtually eliminates the possibility of name collisions without requiring multiple remote file checks. | ||
| */ | ||
| public static String generateUniqueFilename(final FileTransfer transfer, | ||
ravinarayansingh marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| final String path, | ||
| final String baseFileName, | ||
| final FlowFile flowFile, | ||
| final ComponentLog logger) throws IOException { | ||
| String uuid = UUID.randomUUID().toString(); | ||
| String uniqueFilename = uuid + "." + baseFileName; | ||
|
|
||
| logger.info("Attempting to resolve filename conflict for {} on the remote server by using a newly generated filename of: {}", flowFile, uniqueFilename); | ||
| return uniqueFilename; | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A multiline string with
"""description""".formatted()can also work and make this a bit easier to read.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This description can be reformatted as mentioned.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@exceptionfactory code already has been updated. i do not know why it's not showing here