@aNNiMON
Last active July 29, 2023 15:00

PicSorter

.gitignore

__pycache__
.idea
input
library
logs
images.db

PicSorter

Finds an image on danbooru, writes tags as IPTC keywords, then places the image in the library.
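
Usage sketch (assumptions: the entry-point script, the file that defines PicSorter below, is saved as picsorter.py, a name not given in the gist; ImageMagick's magick and exiftool are on PATH; the paths and the post URL are example placeholders):

    python picsorter.py -c config.yml ./input                  # sort every .jpg/.png in a folder
    python picsorter.py ./input/some_image.jpg                 # sort a single file
    python picsorter.py https://danbooru.donmai.us/posts/1     # tag and file a post by URL

Files and folders are reverse-searched on iqdb.org first; matched images are tagged with exiftool and sorted into the library, unmatched ones are moved to the library's _orphan folder.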

config.py

from pathlib import Path

import yaml


class Config:
    @staticmethod
    def load(path='config.yml'):
        with open(path, 'rt', encoding='utf8') as f:
            config = yaml.load(f.read(), Loader=yaml.FullLoader)
        return Config(config)

    def __init__(self, config):
        dirs = config.get('dirs', {})
        self.dir_tmp = Path(dirs.get('tmp', '/tmp/'))
        self.dir_processed = Path(dirs.get('processed', './processed'))
        self.dir_logs = Path(dirs.get('logs', './logs'))
        self.dir_library = Path(dirs.get('library', './library'))
        self.__setup_folders()

    def __setup_folders(self):
        self.dir_tmp.mkdir(exist_ok=True)
        self.dir_logs.mkdir(exist_ok=True)
        self.dir_library.mkdir(exist_ok=True)

config.yml

dirs:
  tmp: /tmp/
  processed: ./processed
  logs: ./logs
  library: ./library

# Standalone helper script: looks up danbooru posts by the md5 hash
# embedded in downloaded file names (e.g. "...__sample-<md5>.jpg").
import os
import re
import time

import requests

files = [f for f in os.listdir('.') if os.path.isfile(f)]
for f in files:
    m = re.search(r'.*__(?:sample-)?(\w+)\.', f)
    if not m:
        print('Warning: ', f)
        continue
    md5 = m.group(1)
    try:
        data = requests.get('https://danbooru.donmai.us/posts.json?tags=md5%3A' + md5).json()
        time.sleep(1)
        if len(data) == 1 and "id" in data[0]:
            print("https://danbooru.donmai.us/posts/" + str(data[0]['id']))
        else:
            print(md5)
    except Exception:
        print(md5)

database.py

import sqlite3
from datetime import datetime


class Database:
    def __init__(self):
        self.db_name = 'images.db'
        self.__create_tables()

    def __create_tables(self):
        conn = sqlite3.connect(self.db_name)
        c = conn.cursor()
        c.executescript("""
            CREATE TABLE IF NOT EXISTS images (
                id INTEGER PRIMARY KEY NOT NULL,
                provider TEXT NOT NULL,
                tags TEXT NOT NULL,
                created_at TIMESTAMP,
                UNIQUE(id, provider) ON CONFLICT REPLACE
            )
        """)
        conn.commit()
        conn.close()

    def is_exists(self, provider, _id) -> bool:
        conn = sqlite3.connect(self.db_name)
        c = conn.cursor()
        c.execute("SELECT EXISTS(SELECT 1 FROM images WHERE id=? AND provider=?)", (_id, provider))
        result = c.fetchone()[0]
        conn.close()
        return bool(result)

    def add(self, _id, provider, tags):
        conn = sqlite3.connect(self.db_name)
        c = conn.cursor()
        sql = 'INSERT INTO images(id, provider, tags, created_at) VALUES (?,?,?,?)'
        c.execute(sql, (_id, provider, tags, datetime.now()))
        conn.commit()
        conn.close()

iqdb.py

import logging
from typing import Optional

import requests
from bs4 import BeautifulSoup


class Iqdb:
    @staticmethod
    def search(file: str) -> Optional[str]:
        logging.info('Searching %s', file)
        # Reverse-search the image on iqdb.org and return the first danbooru match
        with open(file, 'rb') as f:
            resp = requests.post('https://iqdb.org/', files={'file': f}, timeout=10)
        doc = BeautifulSoup(resp.text, 'html.parser')
        for tag in doc.select(".image a"):
            url = tag.get("href")
            if "danbooru.donmai.us/posts" in url:
                if url.startswith("//"):
                    url = "https:" + url
                return url
        return None

library.py

import logging
import os
import shutil
from pathlib import Path

from tags import Tags


class Library:
    def __init__(self, dir_root: Path):
        self.dir_root = dir_root
        self.dir_orphan = Path(dir_root, '_orphan')
        self.dir_orphan.mkdir(exist_ok=True, parents=True)

    def move_to_orphan(self, p: Path) -> None:
        logging.info("%s move to orphan", p)
        shutil.move(os.fspath(p), os.fspath(self.dir_orphan))

    def move(self, p: Path, tags: Tags) -> str:
        new_path = self.__compute_path(tags)
        new_path.mkdir(exist_ok=True, parents=True)
        logging.info("%s move to %s", p.name, new_path)
        shutil.move(os.fspath(p), os.fspath(new_path))
        return str(new_path).replace("\\", "/") + "/" + p.name

    def __compute_path(self, tags: Tags) -> Path:
        p = self.dir_root
        if tags.copyrights == 'original':
            # Originals are grouped by artist
            p = p / "_originals"
            if tags.artists != "":
                artist = tags.artists.split(" ")[0]
                artist = self.__sanitize(artist)
                if (p / artist).exists():
                    p = p / artist
            return p
        # Main section
        if tags.copyrights != "":
            _copyright = tags.copyrights.split(" ")[0]
            p = p / self.__sanitize(_copyright)
        if tags.characters == "":
            return p
        # Characters section
        characters = tags.characters_sanitized()
        num = len(characters)
        if num == 1:
            p = p / self.__sanitize(characters[0])
        elif num == 2 and characters[0] in characters[1]:
            p = p / self.__sanitize(characters[0])
        elif num == 2 and characters[1] in characters[0]:
            p = p / self.__sanitize(characters[1])
        else:
            p = p / "_multiple"
        return p

    @staticmethod
    def __sanitize(s: str) -> str:
        s = "".join(x for x in s if x.isalnum() or x in "._-()")
        return s.replace("_", " ").strip()

metadata.py

import logging
import re
import subprocess
from datetime import datetime
from pathlib import Path
from typing import Optional

import fluentpy as _
import requests

from tags import Tags


class Metadata:
    def __init__(self, dir_tmp: Path):
        self.dir_tmp = dir_tmp
        self.tmp_image_file = Path(self.dir_tmp, "tmp.jpg")
        self.tmp_fallback_download_file = Path(self.dir_tmp, "dl.jpg")

    def process(self, url: str) -> Optional[tuple[Path, Tags]]:
        logging.info("Retrieving metadata for %s", url)
        meta = self.__get_metadata(url)
        status = self.__download_file(meta)
        if not status:
            logging.warning("Download failed")
            return None
        return self.__write_tags(url, meta)

    @staticmethod
    def __get_metadata(url: str) -> dict:
        return requests.get(url + ".json").json()

    def __download_file(self, r: dict) -> bool:
        ext = r.get("file_ext", "")
        w = int(r.get("image_width", "0"))
        h = int(r.get("image_height", "0"))
        if (ext not in ["jpg", "jpeg", "png", "webp"]) or w == 0 or h == 0:
            logging.warning("Skipping due to unsupported extension: %s", ext)
            print("\033[93mSkipping due to unsupported extension:", ext, "\033[0m")
            return False
        file_url = r.get("file_url")
        if file_url is None:
            logging.warning("Skipping due to an empty file url")
            print("\033[93mSkipping due to an empty file url\033[0m")
            return False
        file_size_kb = int(r.get('file_size', "0")) / 1024
        logging.info("Downloading image")
        recompress = self.__need_recompress(ext, w, h, file_size_kb)
        return self.__download(file_url, recompress=recompress)

    @staticmethod
    def __need_recompress(ext, w, h, size_kb) -> bool:
        return ext == 'jpg' and size_kb > 1400 and w < 2500 and h < 2500

    def __download(self, img_url: str, recompress: bool = False) -> bool:
        opt_args = []
        if recompress:
            opt_args = ['-quality', "80"]
        ret = subprocess.call([
            'magick', img_url,
            '-resize', '2500x2500>',
            *opt_args, self.tmp_image_file
        ], stdout=subprocess.PIPE)
        return ret == 0

    # noinspection PyCallingNonCallable
    # noinspection PyProtectedMember
    def __write_tags(self, url: str, r: dict) -> tuple[Path, Tags]:
        tag_general = r.get('tag_string_general', "")
        tag_copyrights = r.get('tag_string_copyright', "")
        tag_characters = r.get('tag_string_character', "")
        tag_artists = r.get('tag_string_artist', "")
        tags = Tags(tag_general, tag_copyrights, tag_characters, tag_artists)
        tags_file = Path(self.dir_tmp, "tags.txt")
        with open(tags_file, "w") as f:
            content = _(tags.tags) \
                .map(lambda s: "-IPTC:keywords=" + s) \
                .join("\n") \
                ._
            content += "\n-Exif:ImageDescription=" + url
            content += "\n-Iptc:Caption-Abstract=" + url
            content += "\n-Xmp:Description=" + url
            f.write(content)
        logging.info("Writing tags")
        subprocess.call([
            'exiftool', '-q', '-overwrite_original',
            '-@', tags_file,
            self.tmp_image_file
        ], stdout=subprocess.PIPE)
        filename = self.__format_filename(tags)
        result_file = Path(self.tmp_image_file.parent, filename)
        self.tmp_image_file.rename(result_file)
        return result_file, tags

    @staticmethod
    def __format_filename(tags: Tags):
        filename = '{} {} by {} at {}.jpg'.format(
            tags.copyrights.split(" ")[0] or "",
            ", ".join(tags.characters_sanitized()[:2]),
            tags.artists.split(" ")[0] or "",
            datetime.now().strftime('%Y%m%d_%H%M%S')
        )
        filename = "".join(x for x in filename if x.isalnum() or x in " ._-()")
        return re.sub(r'\s+', ' ', filename).strip()
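
For illustration, with hypothetical tag values the tags.txt argument file that __write_tags hands to exiftool via -@ would contain lines like these (the tags and the post URL are made-up placeholders):

    -IPTC:keywords=1girl
    -IPTC:keywords=solo
    -IPTC:keywords=copyright_touhou
    -IPTC:keywords=character_hakurei_reimu_(touhou)
    -IPTC:keywords=artist_some_artist
    -Exif:ImageDescription=https://danbooru.donmai.us/posts/1
    -Iptc:Caption-Abstract=https://danbooru.donmai.us/posts/1
    -Xmp:Description=https://danbooru.donmai.us/posts/1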

import argparse
import logging
import os
import re
import shutil
import time
from datetime import datetime
from pathlib import Path
from typing import Optional

from config import Config
from database import Database
from iqdb import Iqdb
from library import Library
from metadata import Metadata


class PicSorter:
    @staticmethod
    def parse_args():
        parser = argparse.ArgumentParser(
            description='Finds an image on danbooru, writes tags as IPTC keywords, then places the image in the library'
        )
        parser.add_argument('-c', '--config',
                            type=Path,
                            default='config.yml',
                            help='config.yml file path')
        parser.add_argument('input', nargs=argparse.REMAINDER)
        args = parser.parse_args()
        if len(args.input) >= 1:
            PicSorter(args.config).process(args.input)

    def __init__(self, config_file='config.yml'):
        config = Config.load(config_file)
        self.config = config
        self.__setup_logging(config.dir_logs)
        self.library = Library(config.dir_library)
        self.metadata = Metadata(config.dir_tmp)
        self.db = Database()

    @staticmethod
    def __setup_logging(dir_logs: Path):
        filename = datetime.now().strftime('%Y-%m-%d.log')
        logfile = Path(dir_logs, filename)
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s %(levelname)s %(module)s: %(message)s',
            datefmt='%H:%M:%S',
            handlers=[
                logging.FileHandler(os.fspath(logfile))
            ]
        )

    def process(self, inputs: list[str]) -> None:
        for item in inputs:
            if item.startswith("http") or re.search(r"(\d{3,})", item):
                print("Processing url", item)
                self.__process_url(item)
            else:
                p = Path(item)
                if p.is_dir():
                    self.__process_folder(p)
                elif p.is_file():
                    print("Processing file", item)
                    self.__process_file(item)

    def __process_folder(self, dir_input: Path) -> None:
        files = {p for p in dir_input.iterdir()
                 if p.suffix in [".jpg", ".png"]}
        for filename in files:
            print("Processing", filename)
            try:
                self.__process_file(filename)
            except Exception:
                raise
            time.sleep(5)

    def __process_file(self, filename: str) -> bool:
        url = self.__search_iqdb(filename)
        if url is None:
            return False
        if self.__process_url(url):
            self.config.dir_processed.mkdir(exist_ok=True, parents=True)
            from_path = os.fspath(filename)
            to_path = os.fspath(self.config.dir_processed)
            shutil.move(from_path, to_path)
            self.__show_path(to_path)
            return True
        return False

    def __search_iqdb(self, filename: str) -> Optional[str]:
        url = Iqdb.search(filename)
        if url is None:
            logging.warning("%s not found", filename)
            self.library.move_to_orphan(Path(filename))
            return None
        return url

    def __process_url(self, url: str) -> bool:
        m = re.search(r"(https://((?:dan|ai)booru|yande).*?(?:post(?:s|/show)/)?(\d{3,}))", url)
        if not m:
            return False
        provider = m.group(2)
        post_id = int(m.group(3))
        if provider not in ['danbooru', 'aibooru']:
            return False
        if self.db.is_exists(provider, post_id):
            logging.info("Skipping existing post %s %d", provider, post_id)
            return False
        meta_result = self.metadata.process(m.group(1))
        if meta_result is None:
            return False
        image_path, tags = meta_result
        to_path = self.library.move(image_path, tags)
        self.db.add(post_id, provider, tags.tags_string)
        self.__show_path(to_path)
        return True

    def __show_path(self, p: str) -> None:
        print("\033[92mSaved to", 'file://' + p.replace(' ', '%20'), "\033[0m")


if __name__ == '__main__':
    PicSorter.parse_args()

requirements.txt

beautifulsoup4==4.9.3
fluentpy>=2.0
PyYAML==5.4.1
requests>=2.24

tags.py

from dataclasses import dataclass, field

import fluentpy as _


@dataclass
class Tags:
    general: str
    copyrights: str
    characters: str
    artists: str
    tags: list[str] = field(init=False)
    tags_string: str = field(init=False)

    def __post_init__(self):
        self.tags = self.__union_tags()
        self.tags_string = " ".join(self.tags)

    # noinspection PyCallingNonCallable
    # noinspection PyProtectedMember
    def characters_sanitized(self) -> list:
        if self.copyrights == "":
            # No need to sanitize tags
            return self.characters.split(" ")
        copyrights = self.copyrights.split(" ")
        return _(self.characters) \
            .split(" ") \
            .filter(lambda s: s != "") \
            .map(lambda s: self.__rename(s, copyrights)) \
            ._

    @staticmethod
    def __rename(s: str, substrings: list[str]) -> str:
        for substring in substrings:
            s = s.replace("_(" + substring + ")", "") \
                .replace("(" + substring + ")", "") \
                .strip()
        return s

    def __union_tags(self) -> list[str]:
        tags = self.general.split(" ")
        tags += self.__prefix_tags(self.copyrights, 'copyright_')
        tags += self.__prefix_tags(self.characters, 'character_')
        tags += self.__prefix_tags(self.artists, 'artist_')
        return tags

    # noinspection PyCallingNonCallable
    # noinspection PyProtectedMember
    @staticmethod
    def __prefix_tags(tags: str, prefix: str) -> list[str]:
        return _(tags) \
            .split(" ") \
            .filter(lambda s: s != "") \
            .map(lambda s: prefix + s.strip()) \
            ._
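
A short illustration (hypothetical tag values, not part of the gist) of how Tags combines the danbooru tag strings into prefixed keywords and strips the "(copyright)" suffix from character tags:

    from tags import Tags

    t = Tags(general='1girl solo',
             copyrights='touhou',
             characters='hakurei_reimu_(touhou)',
             artists='some_artist')
    print(t.tags)
    # -> 1girl, solo, copyright_touhou, character_hakurei_reimu_(touhou), artist_some_artist
    print(t.characters_sanitized())
    # -> hakurei_reimu (the "_(touhou)" suffix is removed; underscores become spaces later in Library)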