Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 115 additions & 39 deletions relink.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,18 @@
import pwd
import argparse
import logging
import time

DEFAULT_SOURCE_ROOT = '/glade/campaign/cesm/cesmdata/cseg/inputdata/'
DEFAULT_TARGET_ROOT = '/glade/campaign/collections/gdex/data/d651077/cesmdata/inputdata/'
DEFAULT_SOURCE_ROOT = "/glade/campaign/cesm/cesmdata/cseg/inputdata/"
DEFAULT_TARGET_ROOT = (
"/glade/campaign/collections/gdex/data/d651077/cesmdata/inputdata/"
)

# Set up logger
logger = logging.getLogger(__name__)

def find_and_replace_owned_files(source_dir, target_dir, username):

def find_and_replace_owned_files(source_dir, target_dir, username, dry_run=False):
"""
Finds files owned by a specific user in a source directory tree,
deletes them, and replaces them with symbolic links to the same
Expand All @@ -26,6 +30,7 @@ def find_and_replace_owned_files(source_dir, target_dir, username):
source_dir (str): The root of the directory tree to search for files.
target_dir (str): The root of the directory tree containing the new files.
username (str): The name of the user whose files will be processed.
dry_run (bool): If True, only show what would be done without making changes.
"""
source_dir = os.path.abspath(source_dir)
target_dir = os.path.abspath(target_dir)
Expand All @@ -37,11 +42,14 @@ def find_and_replace_owned_files(source_dir, target_dir, username):
logger.error("Error: User '%s' not found. Exiting.", username)
return

if dry_run:
logger.info("DRY RUN MODE - No changes will be made")

logger.info(
"Searching for files owned by '%s' (UID: %s) in '%s'...",
username,
user_uid,
source_dir
source_dir,
)

for dirpath, _, filenames in os.walk(source_dir):
Expand All @@ -56,7 +64,7 @@ def find_and_replace_owned_files(source_dir, target_dir, username):

file_uid = os.stat(file_path).st_uid
except FileNotFoundError:
continue # Skip if file was deleted during traversal
continue # Skip if file was deleted during traversal

if file_uid == user_uid:
logger.info("Found owned file: %s", file_path)
Expand All @@ -71,16 +79,24 @@ def find_and_replace_owned_files(source_dir, target_dir, username):
"Warning: Corresponding file not found in '%s' "
"for '%s'. Skipping.",
target_dir,
file_path
file_path,
)
continue

# Get the link name
link_name = file_path

if dry_run:
logger.info(
"[DRY RUN] Would create symbolic link: %s -> %s",
link_name,
link_target,
)
continue

# Remove the original file
try:
os.rename(link_name, link_name+".tmp")
os.rename(link_name, link_name + ".tmp")
logger.info("Deleted original file: %s", link_name)
except OSError as e:
logger.error("Error deleting file %s: %s. Skipping.", link_name, e)
Expand All @@ -91,11 +107,36 @@ def find_and_replace_owned_files(source_dir, target_dir, username):
# Create parent directories for the link if they don't exist
os.makedirs(os.path.dirname(link_name), exist_ok=True)
os.symlink(link_target, link_name)
os.remove(link_name+".tmp")
logger.info("Created symbolic link: %s -> %s", link_name, link_target)
os.remove(link_name + ".tmp")
logger.info(
"Created symbolic link: %s -> %s", link_name, link_target
)
except OSError as e:
os.rename(link_name+".tmp", link_name)
logger.error("Error creating symlink for %s: %s. Skipping.", link_name, e)
os.rename(link_name + ".tmp", link_name)
logger.error(
"Error creating symlink for %s: %s. Skipping.", link_name, e
)


def validate_directory(path):
"""
Validate that the path exists and is a directory.

Args:
path (str): The path to validate.

Returns:
str: The absolute path if valid.

Raises:
argparse.ArgumentTypeError: If path doesn't exist or is not a directory.
"""
if not os.path.exists(path):
raise argparse.ArgumentTypeError(f"Directory '{path}' does not exist")
if not os.path.isdir(path):
raise argparse.ArgumentTypeError(f"'{path}' is not a directory")
return os.path.abspath(path)


def parse_arguments():
"""
Expand All @@ -107,59 +148,94 @@ def parse_arguments():
"""
parser = argparse.ArgumentParser(
description=(
'Find files owned by a user and replace them with symbolic links to a target directory.'
"Find files owned by a user and replace them with symbolic links to a target directory."
)
)
parser.add_argument(
'--source-root',
"--source-root",
type=validate_directory,
default=DEFAULT_SOURCE_ROOT,
help=(
f'The root of the directory tree to search for files (default: {DEFAULT_SOURCE_ROOT})'
)
f"The root of the directory tree to search for files (default: {DEFAULT_SOURCE_ROOT})"
),
)
parser.add_argument(
'--target-root',
"--target-root",
type=validate_directory,
default=DEFAULT_TARGET_ROOT,
help=(
f'The root of the directory tree where files should be moved to '
f'(default: {DEFAULT_TARGET_ROOT})'
)
f"The root of the directory tree where files should be moved to "
f"(default: {DEFAULT_TARGET_ROOT})"
),
)

# Verbosity options (mutually exclusive)
verbosity_group = parser.add_mutually_exclusive_group()
verbosity_group.add_argument(
'-v', '--verbose',
action='store_true',
help='Enable verbose output'
"-v", "--verbose", action="store_true", help="Enable verbose output"
)
verbosity_group.add_argument(
'-q', '--quiet',
action='store_true',
help='Quiet mode (show only warnings and errors)'
"-q",
"--quiet",
action="store_true",
help="Quiet mode (show only warnings and errors)",
)

parser.add_argument(
"--dry-run",
action="store_true",
help="Show what would be done without making any changes",
)
parser.add_argument(
"--timing",
action="store_true",
help="Measure and display the execution time",
)

return parser.parse_args()
args = parser.parse_args()

if __name__ == '__main__':
process_args(args)

return args

args = parse_arguments()

def process_args(args):
"""
Process parsed arguments and set derived attributes.

Sets the log_level attribute on args based on verbosity flags.

Args:
args (argparse.Namespace): Parsed command-line arguments.
"""
# Configure logging based on verbosity flags
if args.quiet:
LOG_LEVEL = logging.WARNING
args.log_level = logging.WARNING
elif args.verbose:
LOG_LEVEL = logging.DEBUG
args.log_level = logging.DEBUG
else:
LOG_LEVEL = logging.INFO
args.log_level = logging.INFO

logging.basicConfig(
level=LOG_LEVEL,
format='%(message)s',
stream=sys.stdout
)

my_username = os.environ['USER']

def main():

args = parse_arguments()

logging.basicConfig(level=args.log_level, format="%(message)s", stream=sys.stdout)

my_username = os.environ["USER"]

start_time = time.time()

# --- Execution ---
find_and_replace_owned_files(args.source_root, args.target_root, my_username)
find_and_replace_owned_files(
args.source_root, args.target_root, my_username, dry_run=args.dry_run
)

if args.timing:
elapsed_time = time.time() - start_time
logger.info("Execution time: %.2f seconds", elapsed_time)


if __name__ == "__main__":
main()
Loading