@pirhoo
Last active September 19, 2024 21:11
A Python script that helps you clean up duplicate folders with names like foldername, foldername (1), foldername (2), and so on. It scans the specified directory, groups folders that share a base name, and prints a summary of each group before asking you to confirm an action for it.
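A minimal usage sketch (assuming the script is saved as clean_duplicate_folders.py; the filename and paths are illustrative):

    # Preview the changes in ~/Downloads, recursing at most 3 levels deep
    python3 clean_duplicate_folders.py ~/Downloads --recursive --level 3 --dry-run

    # Apply choice 2 (merge, then delete duplicates) to every group without prompting per group
    python3 clean_duplicate_folders.py ~/Downloads -r -c 2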
#!/usr/bin/env python3
import os
import re
import argparse
import shutil
import logging
from logging import handlers
from datetime import datetime
from pathlib import Path
import sqlite3
import json
from collections import defaultdict

class JsonFormatter(logging.Formatter):
    """Custom logging formatter to output JSON-formatted logs."""

    def format(self, record):
        log_record = {
            "timestamp": self.formatTime(record, self.datefmt),
            "level": record.levelname,
            "message": record.msg,  # Expected to be a dict
            "function": record.funcName,
            "line": record.lineno,
        }
        if record.exc_info:
            log_record["exception"] = self.formatException(record.exc_info)
        return json.dumps(log_record)

def setup_logging():
    """Sets up JSON logging to a file named with the current timestamp."""
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    log_filename = f"clean_duplicate_folders.{timestamp}.log"
    handler = handlers.RotatingFileHandler(log_filename, maxBytes=10485760, backupCount=5)
    handler.setFormatter(JsonFormatter())
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    logger.addHandler(handler)

def parse_arguments():
    parser = argparse.ArgumentParser(description='Clean duplicate sibling folders with "(n)" suffixes.')
    parser.add_argument('directory', nargs='?', default='.', help='Directory to start from (default: current directory)')
    parser.add_argument('-r', '--recursive', action='store_true', help='Recursively process subdirectories')
    parser.add_argument('-n', '--dry-run', action='store_true', help='Perform a dry run (do not make any changes)')
    parser.add_argument('-l', '--level', type=int, default=None, help='Maximum depth level for recursion (default: unlimited)')
    parser.add_argument('-c', '--default-choice', type=int, choices=[1, 2, 3],
                        help='Default choice to apply to all groups (1: delete duplicates, 2: merge and delete duplicates, 3: skip)')
    return parser.parse_args()

def log_configuration(args):
    """Logs the configuration used to run the script."""
    config = {
        "action": "configuration",
        "directory": args.directory,
        "recursive": args.recursive,
        "dry_run": args.dry_run,
        "level": args.level,
        "default_choice": args.default_choice,
    }
    logging.info(config)

def initialize_database(db_path='filesystem_index.db'):
    """Initializes the SQLite database and creates tables if they don't exist."""
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS directories (
            id INTEGER PRIMARY KEY,
            path TEXT UNIQUE,
            parent_path TEXT,
            mtime REAL
        )
    ''')
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS files (
            id INTEGER PRIMARY KEY,
            directory_path TEXT,
            name TEXT,
            size INTEGER,
            mtime REAL
        )
    ''')
    conn.commit()
    return conn

def prompt_use_existing_index():
    """Prompts the user to decide whether to use the existing index."""
    while True:
        choice = input("An index file was found. Do you want to use the existing index? (Y/n): ").strip().lower()
        if choice in {'y', 'yes', ''}:
            return True
        elif choice in {'n', 'no'}:
            return False
        print("Invalid input. Please enter 'Y' or 'n'.")

def collect_directories(conn, directory, recursive, level=None):
    """Scans the filesystem and populates the database with directory and file information."""
    dir_count = 0
    file_count = 0
    cursor = conn.cursor()
    # Start from a clean index so stale entries from a previous run never leak in
    cursor.execute('DELETE FROM directories')
    cursor.execute('DELETE FROM files')
    conn.commit()

    def report_progress():
        print(f"Scanning directories: {dir_count:,}, files: {file_count:,}", end='\r', flush=True)

    def scan_dir(current_dir, current_level):
        nonlocal dir_count, file_count
        try:
            with os.scandir(current_dir) as entries:
                dir_entries = []
                file_entries = []
                for entry in entries:
                    full_path = Path(entry.path)
                    if entry.is_dir(follow_symlinks=False):
                        mtime = entry.stat(follow_symlinks=False).st_mtime
                        dir_entries.append((str(full_path), str(current_dir), mtime))
                        dir_count += 1
                        report_progress()
                        if recursive and (level is None or current_level < level):
                            scan_dir(full_path, current_level + 1)
                    elif entry.is_file(follow_symlinks=False):
                        stat = entry.stat(follow_symlinks=False)
                        file_entries.append((str(current_dir), entry.name, stat.st_size, stat.st_mtime))
                        file_count += 1
                        report_progress()
                # Insert directory and file entries in batches
                if dir_entries:
                    cursor.executemany('INSERT OR IGNORE INTO directories (path, parent_path, mtime) VALUES (?, ?, ?)', dir_entries)
                if file_entries:
                    cursor.executemany('INSERT OR IGNORE INTO files (directory_path, name, size, mtime) VALUES (?, ?, ?, ?)', file_entries)
                conn.commit()
        except PermissionError as e:
            print(f"Permission denied: {current_dir}")
            logging.error({
                "action": "scan_error",
                "directory": str(current_dir),
                "error": str(e)
            })

    scan_dir(Path(directory), 1)
    print()  # Move to the next line after the progress output
    logging.info({
        "action": "scanning_complete",
        "total_directories_scanned": dir_count,
        "total_files_scanned": file_count
    })

def load_directories_from_index(conn):
    """Loads directory paths from the database."""
    cursor = conn.cursor()
    cursor.execute('SELECT path FROM directories')
    return [Path(row[0]) for row in cursor.fetchall()]

def group_directories(conn):
    """Groups duplicate sibling directories based on their base names and parent directories."""
    cursor = conn.cursor()
    # Matches "foldername" as well as "foldername (1)", "foldername (2)", ...
    pattern = re.compile(r'^(.*?)(?: \((\d+)\))?$')
    cursor.execute('SELECT path, parent_path FROM directories')
    groups = defaultdict(list)
    for path_str, parent_path_str in cursor.fetchall():
        dir_path = Path(path_str)
        parent_dir = Path(parent_path_str)
        match = pattern.match(dir_path.name)
        if match:
            base_name = match.group(1)
            group_key = (str(parent_dir), base_name)
            groups[group_key].append(dir_path)
    # Only keep groups with more than one directory
    return {k: v for k, v in groups.items() if len(v) > 1}

def get_directory_size(conn, dir_path):
    """Returns the total size and number of files directly inside a directory,
    using the index. Files in nested subdirectories are not counted."""
    cursor = conn.cursor()
    cursor.execute('SELECT size FROM files WHERE directory_path = ?', (str(dir_path),))
    sizes = cursor.fetchall()
    total_size = sum(size[0] for size in sizes)
    return total_size, len(sizes)

def summarize_group(group_key, dir_paths, conn):
    parent_dir, base_name = group_key
    print(f"\nFound duplicate directories in '{parent_dir}': '{base_name}'")
    for dir_path in sorted(dir_paths):
        size, num_files = get_directory_size(conn, dir_path)
        print(f"  - {dir_path.name} : {num_files:,} files, {size:,} bytes")

def prompt_user_action(default_choice=None):
    if default_choice:
        print(f"\nApplying default choice: {default_choice}")
        return str(default_choice)
    print("\nSelect an action:")
    print("1) Delete duplicate folders (keep only the base folder)")
    print("2) Merge contents into base folder, then delete duplicates")
    print("3) Skip (do nothing)")
    while True:
        choice = input("Enter your choice (1/2/3): ").strip()
        if choice in {'1', '2', '3'}:
            return choice
        print("Invalid input. Please enter 1, 2, or 3.")

def delete_duplicates(base_dir, duplicate_dirs, dry_run):
    for dup_dir in duplicate_dirs:
        if dry_run:
            print(f"Dry run: would delete {dup_dir}")
            logging.info({
                "action": "delete",
                "status": "dry_run",
                "directory": str(dup_dir)
            })
        else:
            try:
                print(f"Deleting {dup_dir}")
                shutil.rmtree(dup_dir)
                logging.info({
                    "action": "delete",
                    "status": "success",
                    "directory": str(dup_dir)
                })
            except Exception as e:
                print(f"Error deleting {dup_dir}: {e}")
                logging.error({
                    "action": "delete",
                    "status": "error",
                    "directory": str(dup_dir),
                    "error": str(e)
                })

def merge_contents(base_dir, duplicate_dirs, dry_run):
    for dup_dir in duplicate_dirs:
        conflicts = 0
        for item in os.listdir(dup_dir):
            src = dup_dir / item
            dst = base_dir / item
            if dst.exists():
                conflicts += 1
                print(f"Conflict: {dst} already exists.")
                print(f"Skipping {src}")
                logging.info({
                    "action": "merge",
                    "status": "conflict",
                    "source": str(src),
                    "destination": str(dst)
                })
            elif dry_run:
                print(f"Dry run: would move {src} to {dst}")
                logging.info({
                    "action": "move",
                    "status": "dry_run",
                    "source": str(src),
                    "destination": str(dst)
                })
            else:
                try:
                    print(f"Moving {src} to {dst}")
                    shutil.move(str(src), str(dst))
                    logging.info({
                        "action": "move",
                        "status": "success",
                        "source": str(src),
                        "destination": str(dst)
                    })
                except Exception as e:
                    conflicts += 1
                    print(f"Error moving {src} to {dst}: {e}")
                    logging.error({
                        "action": "move",
                        "status": "error",
                        "source": str(src),
                        "destination": str(dst),
                        "error": str(e)
                    })
        # Only delete the duplicate directory if nothing was left behind;
        # deleting after a skipped conflict would destroy the unmerged files
        if conflicts:
            print(f"Keeping {dup_dir}: {conflicts} item(s) could not be merged.")
            logging.info({
                "action": "delete",
                "status": "skipped_due_to_conflicts",
                "directory": str(dup_dir)
            })
        elif dry_run:
            print(f"Dry run: would delete {dup_dir}")
            logging.info({
                "action": "delete",
                "status": "dry_run",
                "directory": str(dup_dir)
            })
        else:
            try:
                print(f"Deleting {dup_dir}")
                shutil.rmtree(dup_dir)
                logging.info({
                    "action": "delete",
                    "status": "success",
                    "directory": str(dup_dir)
                })
            except Exception as e:
                print(f"Error deleting {dup_dir}: {e}")
                logging.error({
                    "action": "delete",
                    "status": "error",
                    "directory": str(dup_dir),
                    "error": str(e)
                })

def process_group(group_key, dir_paths, conn, dry_run, default_choice=None):
    parent_dir, base_name = group_key
    # Prefer the directory without a "(n)" suffix as the base
    base_dir = None
    suffix_pattern = re.compile(r'.* \(\d+\)$')
    for dir_path in dir_paths:
        if not suffix_pattern.match(dir_path.name):
            base_dir = dir_path
            break
    if base_dir is None:
        # No unsuffixed directory exists; keep the one with the lowest suffix number
        def get_suffix_num(dir_name):
            match = re.match(r'.* \((\d+)\)$', dir_name)
            return int(match.group(1)) if match else float('inf')
        base_dir = min(dir_paths, key=lambda d: get_suffix_num(d.name))
    duplicate_dirs = [d for d in dir_paths if d != base_dir]
    summarize_group(group_key, dir_paths, conn)
    logging.info({
        "action": "found_duplicate_group",
        "parent_directory": parent_dir,
        "base_name": base_name,
        "directories": [str(d) for d in dir_paths],
        "base_directory": str(base_dir),
        "duplicate_directories": [str(d) for d in duplicate_dirs]
    })
    action = prompt_user_action(default_choice)
    if action == '1':
        logging.info({
            "action": "process_group",
            "method": "delete_duplicates",
            "group": f"{parent_dir}/{base_name}"
        })
        delete_duplicates(base_dir, duplicate_dirs, dry_run)
    elif action == '2':
        logging.info({
            "action": "process_group",
            "method": "merge_contents",
            "group": f"{parent_dir}/{base_name}"
        })
        merge_contents(base_dir, duplicate_dirs, dry_run)
    elif action == '3':
        print("Skipping this group.")
        logging.info({
            "action": "process_group",
            "method": "skip",
            "group": f"{parent_dir}/{base_name}"
        })

def main():
    setup_logging()
    args = parse_arguments()
    log_configuration(args)
    db_path = 'filesystem_index.db'
    index_exists = os.path.exists(db_path)
    conn = initialize_database(db_path)
    if index_exists:
        if prompt_use_existing_index():
            print(f"Using existing index from {db_path}")
            logging.info({"action": "using_existing_index"})
        else:
            print("Rescanning the filesystem and rebuilding the index...")
            collect_directories(conn, args.directory, args.recursive, args.level)
    else:
        print("No index file found. Scanning the filesystem...")
        collect_directories(conn, args.directory, args.recursive, args.level)
    try:
        directories = load_directories_from_index(conn)
        print(f"Total directories indexed: {len(directories):,}")
        logging.info({
            "action": "directories_indexed",
            "total_directories": len(directories)
        })
        groups = group_directories(conn)
        if not groups:
            print("No duplicate directories found.")
            logging.info({"action": "no_duplicates_found"})
            return
        for group_key, dir_paths in groups.items():
            process_group(group_key, dir_paths, conn, args.dry_run, args.default_choice)
        logging.info({"action": "script_complete"})
    finally:
        conn.close()

if __name__ == '__main__':
    main()
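Each log line the script writes is a single JSON object produced by JsonFormatter, with the structured message nested inside. For reference, the record emitted by log_configuration would look roughly like this (values illustrative):

    {"timestamp": "2024-09-19 21:11:00,123", "level": "INFO", "message": {"action": "configuration", "directory": ".", "recursive": true, "dry_run": false, "level": null, "default_choice": null}, "function": "log_configuration", "line": 66}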