@pirhoo · Last active August 27, 2024
A simple Python script to recursively find and extract all archives in the current directory (or a directory passed as an argument).
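For example, assuming the file is saved as extract_archives.py (the gist does not name it, so the filename here is hypothetical), it could be run like this to extract every supported archive larger than 50 MB found up to two directory levels below ~/Downloads, using four workers and a progress bar:

python3 extract_archives.py --size 50 --level 2 --jobs 4 --progress-bar ~/Downloads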
#!/usr/bin/env python3
import os
import shutil
import argparse
import subprocess
from pathlib import Path
import multiprocessing
import logging
from logging import handlers
from datetime import datetime
from contextlib import redirect_stderr, redirect_stdout
import json
import time
# Constants for supported archive extensions
SUPPORTED_EXTENSIONS = {"7z", "zip", "tar", "gz", "xz", "rar"}
# ANSI color codes for terminal output
GREEN = "\033[32m"
MAGENTA = "\033[35m"
GRAY = "\033[90m"
RESET = "\033[0m"
BOLD = "\033[1m"


class JsonFormatter(logging.Formatter):
    """Custom logging formatter to output JSON-formatted logs."""

    def format(self, record):
        # Directly format the log record as a JSON object
        log_record = {
            "timestamp": self.formatTime(record, self.datefmt),
            "level": record.levelname,
            "message": record.msg,  # Log record is expected to be a JSON-serializable dict
            "function": record.funcName,
            "line": record.lineno,
        }
        if record.exc_info:
            log_record["exception"] = self.formatException(record.exc_info)
        return json.dumps(log_record)


def setup_logging() -> None:
    """Sets up JSON logging to a file with the current timestamp."""
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    log_filename = f"archive.{timestamp}.log"
    handler = handlers.RotatingFileHandler(
        log_filename, maxBytes=10485760, backupCount=5
    )
    handler.setFormatter(JsonFormatter())
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    logger.addHandler(handler)


def log_configuration(args) -> None:
    """Logs the configuration used to run the script."""
    config = {
        "action": "configuration",
        "directory": args.directory,
        "min_size_mb": args.size,
        "confirm_extract": args.confirm_extract,
        "confirm_remove": args.confirm_remove,
        "skip_remove": args.skip_remove,
        "parallel_jobs": args.jobs,
        "max_depth": args.level,
        "progress_bar": args.progress_bar,
        "throttle": args.throttle,
    }
    logging.info(config)  # Pass the config dictionary directly


def display_progress_bar(progress: int, total: int, width: int = 40) -> None:
    """Displays a Typer-style progress bar in the terminal with the percentage at the end."""
    percent = int(progress * 100 / total) if total > 0 else 0
    fill = int(width * progress / total) if total > 0 else 0
    filled_bar = "━" * fill
    empty_bar = "━" * (width - fill - 1)
    glyph = "╸" if fill < width else ""
    total_len = len(str(total))
    progress_total = f"{progress:{total_len}}/{total}"
    filled_color = MAGENTA if percent < 100 else GREEN
    # Print the progress counter, the colored bar, and the percentage on a single refreshed line
    display = f"\rExtracting {progress_total} {filled_color}{filled_bar}{glyph}{GRAY}{empty_bar}{RESET} {BOLD}{percent}%{RESET}"
    print(display, end="", flush=True)


def prompt_yes_no(question: str) -> bool:
    """Prompts the user for a yes/no response."""
    while True:
        answer = input(f"{question} (Y/n): ").strip().lower()
        if answer in {"y", "yes", ""}:
            return True
        if answer in {"n", "no"}:
            return False
        print("Invalid input. Please enter 'Y' or 'n'.")


def create_unique_foldername(filepath: Path) -> Path:
    """Creates a unique folder name based on the file path."""
    foldername = filepath.with_suffix("")
    counter = 0
    new_foldername = foldername
    while new_foldername.exists():
        counter += 1
        new_foldername = foldername.parent / f"{foldername.name} ({counter})"
    return new_foldername


def extract_archive(filepath: Path, foldername: Path) -> None:
    """Extracts an archive based on its file extension."""
    try:
        if filepath.suffix[1:] == "rar":
            subprocess.run(
                ["unrar", "x", "-y", str(filepath), str(foldername)],
                check=True,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.STDOUT,
            )
        elif filepath.suffix[1:] == "7z":
            subprocess.run(
                ["7z", "x", "-y", str(filepath), f"-o{foldername}"],
                check=True,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.STDOUT,
            )
        else:
            shutil.unpack_archive(str(filepath), str(foldername))
        logging.info(
            {
                "action": "extract",
                "file": str(filepath),
                "status": "success",
                "output_folder": str(foldername),
            }
        )  # Log directly as a JSON object
    except Exception as e:
        logging.error(
            {
                "action": "extract",
                "file": str(filepath),
                "status": "error",
                "error": str(e),
            }
        )  # Log directly as a JSON object


def extract_file(filepath: Path, confirm_remove: bool, skip_remove: bool) -> None:
    """Extracts an archive using the appropriate tool and handles removal."""
    new_foldername = create_unique_foldername(filepath)
    new_foldername.mkdir(parents=True, exist_ok=True)
    extract_archive(filepath, new_foldername)
    if not skip_remove and (
        not confirm_remove
        or prompt_yes_no(f"Do you want to remove the original archive {filepath}?")
    ):
        os.remove(filepath)
        logging.info(
            {"action": "remove", "file": str(filepath), "status": "success"}
        )  # Log directly as a JSON object
    else:
        logging.info(
            {"action": "remove", "file": str(filepath), "status": "skipped"}
        )  # Log directly as a JSON object
    logging.info(
        {
            "action": "extraction_complete",
            "file": str(filepath),
            "output_folder": str(new_foldername),
            "status": "success",
        }
    )  # Log directly as a JSON object


def process_file(
    filepath: Path,
    min_size: int,
    confirm_extract: bool,
    confirm_remove: bool,
    skip_remove: bool,
    throttle: float,
) -> int:
    """Processes a single file to check if it should be extracted."""
    filesize = filepath.stat().st_size
    if filepath.suffix[1:] in SUPPORTED_EXTENSIONS and filesize > min_size:
        logging.info(
            {
                "action": "found_large_archive",
                "file": str(filepath),
                "size_mb": filesize / (1024 * 1024),
            }
        )  # Log directly as a JSON object
        if not confirm_extract or prompt_yes_no("Do you want to extract this archive?"):
            extract_file(filepath, confirm_remove, skip_remove)
    else:
        logging.info(
            {
                "action": "skip_file",
                "file": str(filepath),
                "reason": (
                    "unsupported_format_or_small_size"
                    if filepath.suffix[1:] not in SUPPORTED_EXTENSIONS
                    else "small_size"
                ),
            }
        )  # Log directly as a JSON object
    if throttle > 0:
        time.sleep(throttle)
    return 1


def find_files(
    directory: Path, current_depth: int, max_depth: int, files: list
) -> None:
    """Recursively finds files up to a specified directory depth level."""
    if current_depth > max_depth:
        return
    for entry in directory.iterdir():
        if entry.is_file():
            files.append(entry)
        elif entry.is_dir():
            find_files(entry, current_depth + 1, max_depth, files)


def log_final_result(total_files: int, processed_files: int) -> None:
    """Logs the final result of the script execution."""
    result = {
        "action": "final_result",
        "total_files": total_files,
        "processed_files": processed_files,
    }
    logging.info(result)  # Log directly as a JSON object


def initialize_file_processing(args) -> tuple:
    """Initialize logging and find files to process."""
    setup_logging()
    log_configuration(args)
    files = []
    find_files(Path(args.directory), 0, args.level, files)
    total_files = len(files)
    return files, total_files


def process_file_parallel_init(filepath):
    """Helper function to unpack the argument tuple for parallel processing."""
    return process_file(*filepath)


def process_files_serially(files: list, args) -> int:
    """Process files serially."""
    processed_files = 0
    for filepath in files:
        processed_files += process_file(
            filepath,
            args.size * 1024 * 1024,
            args.confirm_extract,
            args.confirm_remove,
            args.skip_remove,
            args.throttle,
        )
        if args.progress_bar:
            display_progress_bar(processed_files, len(files))
    return processed_files


def process_files_parallel(files: list, args) -> int:
    """Process files using multiprocessing."""
    processed_files = 0
    pool_args = [
        (
            filepath,
            args.size * 1024 * 1024,
            args.confirm_extract,
            args.confirm_remove,
            args.skip_remove,
            args.throttle,
        )
        for filepath in files
    ]
    with multiprocessing.Pool(args.jobs) as pool:
        results = pool.imap(process_file_parallel_init, pool_args)
        for result in results:
            processed_files += result
            if args.progress_bar:
                display_progress_bar(processed_files, len(files))
    return processed_files


def run_main(args) -> None:
    """Runs the main process of extracting files."""
    files, total_files = initialize_file_processing(args)
    if args.confirm_extract or args.confirm_remove:
        args.jobs = 1  # Force serial processing if user confirmation is needed
    if args.jobs > 1:
        processed_files = process_files_parallel(files, args)
    else:
        processed_files = process_files_serially(files, args)
    if args.progress_bar:
        print()  # Move to the next line after the progress bar
    # Log final result
    log_final_result(total_files, processed_files)
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Recursively find and extract archives."
)
parser.add_argument(
"-s",
"--size",
type=int,
default=10,
help="Minimum size in MB to process (default: 10 MB)",
)
parser.add_argument(
"-e",
"--confirm-extract",
action="store_true",
help="Confirm extraction for each archive (default: False)",
)
parser.add_argument(
"-r",
"--confirm-remove",
action="store_true",
help="Confirm removal of original archive after extraction (default: False)",
)
parser.add_argument(
"--skip-remove",
action="store_true",
help="Skip deleting the original archive after extraction",
)
parser.add_argument(
"-j",
"--jobs",
type=int,
default=multiprocessing.cpu_count(),
help="Number of parallel jobs (default: number of CPU cores)",
)
parser.add_argument(
"-l",
"--level",
type=int,
default=float("inf"),
help="Maximum depth level to search for files (default: infinite)",
)
parser.add_argument(
"-p",
"--progress-bar",
action="store_true",
help="Display progress bar",
)
parser.add_argument(
"--throttle",
type=float,
default=0,
help="Throttle duration in seconds between processing each file (default: 0)",
)
parser.add_argument(
"directory",
nargs="?",
default=".",
help="Directory to start searching from (default: current directory)",
)
args = parser.parse_args()
# Redirect stderr to hide unwanted error output, keep stdout open for progress bar
with open(os.devnull, "w") as fnull:
if args.progress_bar:
with redirect_stderr(fnull):
run_main(args)
else:
with redirect_stdout(fnull), redirect_stderr(fnull):
run_main(args)
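Note that the rar and 7z branches shell out to the unrar and 7z command-line tools, which therefore need to be installed and on the PATH; every other supported extension is passed to shutil.unpack_archive. Each run also writes a rotating archive.<timestamp>.log file with one JSON object per line, shaped by JsonFormatter. A sketch of what a successful extraction record might look like (the values below are illustrative, not taken from a real run):

{"timestamp": "2024-08-27 15:41:00,123", "level": "INFO", "message": {"action": "extract", "file": "./photos.zip", "status": "success", "output_folder": "./photos"}, "function": "extract_archive", "line": 58}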