Created
February 25, 2020 20:40
-
-
Save KhasMek/8932edcff248f835435d071a05b8d1c6 to your computer and use it in GitHub Desktop.
Been slacking on adding all your ebooks to Calibre. This should help.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
"""
Bulk import new books, or existing books in new formats, into your
calibre database.
Should this use native code instead of subprocess? Of course, but
I'm lazy, knew the commands and didn't want to work more than I
needed to. Maybe someday I'll update it to be prettier.
"""
import json | |
import logging | |
import os | |
import re | |
import subprocess | |
import sys | |
from colorama import Fore, Style | |
# Threshold handed to the root logger's setLevel(); a standard logging level name.
loglevel = 'INFO'

# File-name endings (no leading dot) that build_ingest_list() accepts as ebooks.
ebook_extensions = ['epub', 'pdf', 'mobi', 'prc']
class MyFormatter(logging.Formatter):
    """
    Log formatter that picks a colour and prefix based on the record's level.

    CRITICAL, ERROR, WARNING, INFO and DEBUG each have their own format
    string; any other level falls back to the plain "<levelno>: <message>"
    format passed to the base class.
    """

    # Use %(message)s (the fully interpolated text) rather than %(msg)s
    # (the raw template), so calls such as logging.info("x=%s", x) render
    # their arguments instead of silently dropping them.
    crt_fmt = Style.BRIGHT + Fore.RED + " [!] %(message)s" + Style.RESET_ALL
    err_fmt = Fore.MAGENTA + " [!] %(message)s" + Style.RESET_ALL
    wrn_fmt = Fore.YELLOW + " [!] %(message)s" + Style.RESET_ALL
    inf_fmt = Fore.CYAN + " [+] %(message)s" + Style.RESET_ALL
    dbg_fmt = Fore.GREEN + " [ ] %(message)s" + Style.RESET_ALL

    def __init__(self):
        super().__init__(fmt="%(levelno)d: %(message)s", datefmt=None, style="%")
        # One ready-made formatter per level. This avoids swapping the
        # private ``_style._fmt`` attribute on every call, which relied on
        # logging internals and was not safe if two records were formatted
        # concurrently.
        self._level_formatters = {
            logging.CRITICAL: logging.Formatter(self.crt_fmt),
            logging.ERROR: logging.Formatter(self.err_fmt),
            logging.WARNING: logging.Formatter(self.wrn_fmt),
            logging.INFO: logging.Formatter(self.inf_fmt),
            logging.DEBUG: logging.Formatter(self.dbg_fmt),
        }

    def format(self, record):
        """Return ``record`` rendered with the format string for its level."""
        formatter = self._level_formatters.get(record.levelno)
        if formatter is None:
            # Custom / non-standard level: use the base-class format.
            return super().format(record)
        return formatter.format(record)
# Send every record to stdout through the level-aware formatter, and apply
# the configured threshold to the root logger.
fmt = MyFormatter()
hdlr = logging.StreamHandler(stream=sys.stdout)
hdlr.setFormatter(fmt)
logging.getLogger().addHandler(hdlr)
logging.getLogger().setLevel(loglevel)
def build_existing_library():
    """
    Fetch the current calibre library by running
    ``calibredb list --for-machine -f all`` and parsing its JSON output.

    This function is currently unused. But I'm leaving it jic.

    returns: the parsed JSON (presumably a list with one dict per book --
    confirm against calibredb's output), or an empty list when calibredb
    printed nothing or printed something that is not valid JSON.
    """
    list_cmd = [
        'calibredb',
        'list',
        '--for-machine',
        '-f',
        'all',
    ]
    # Log the command that is actually run; the previous code referenced an
    # undefined name (``cmd``) here and raised NameError before running it.
    logging.debug('Command: %s' % ' '.join(list_cmd))
    p = subprocess.run(list_cmd, stdout=subprocess.PIPE)
    if not p.stdout:
        logging.error("calibredb returned no output while listing the library.")
        return []
    try:
        return json.loads(p.stdout)
    except ValueError:
        # json.JSONDecodeError is a subclass of ValueError.
        logging.error("Could not parse calibredb output while listing the library.")
        return []
def build_ingest_list(path_to_add='.', extensions=None):
    """
    Build the collection of ebook files to be ingested into the calibre database.

    Walks ``path_to_add`` recursively and keeps every file whose name ends
    with "." followed by one of the allowed extensions. Files that do not
    match are reported through ``logging.error`` and skipped.

    path_to_add: directory to scan (defaults to the current directory).
    extensions: iterable of extensions, with or without the leading dot;
        defaults to the module-level ``ebook_extensions``.
    returns: set of normalised file paths. Each path includes the directory
        it was found in, so books inside sub-directories can be opened later;
        for the default ``path_to_add='.'`` top-level files are still
        returned as bare file names.
    """
    if extensions is None:
        extensions = ebook_extensions
    # Require the dot so that e.g. "notepub" is not mistaken for an epub.
    # Built once, outside the walk.
    suffixes = tuple('.' + ext.lstrip('.') for ext in extensions)

    ingest_list = set()
    for root, _, files in os.walk(path_to_add):
        logging.debug("Length of files: %s" % len(files))
        for name in files:
            if name.endswith(suffixes):
                # Keep the directory: a bare file name from a sub-directory
                # cannot be resolved from the working directory.
                ingest_list.add(os.path.normpath(os.path.join(root, name)))
            else:
                logging.error("File: %s does not end with an allowed extension!!!" % name)
    logging.debug("Length of ingest_list: %s" % len(ingest_list))
    return ingest_list
def check_book_in_library(book, existing_library=None):
    """
    Report whether a book title is already present in the calibre library.

    This function is currently unused. But I'm leaving it jic.

    book: title to look for (compared for exact equality).
    existing_library: iterable of book records, each a dict that may carry
        a 'title' key. When omitted, the library is fetched with
        build_existing_library(). NOTE(review): assumes that function
        returns a list of per-book dicts -- confirm against calibredb output.
    returns: True if a record with that title exists, otherwise False.
    """
    # The previous code read an undefined global named ``existing_library``
    # and indexed it with 'title'; take the library as an argument instead.
    if existing_library is None:
        existing_library = build_existing_library()
    known_titles = {entry.get('title') for entry in existing_library}
    in_library = book in known_titles
    if in_library:
        logging.debug("%s already in library!" % book)
    else:
        logging.debug("%s is not in library" % book)
    return in_library
def get_book_formats(book_title):
    """
    Get a list of all formats of specified book already in the calibre database.

    Runs ``calibredb list -s title:"<book_title>" -f formats --for-machine``
    and reads the 'formats' entry of the first record in the JSON output.

    book_title: title used in the calibre search expression.
    return: list of file extensions as produced by os.path.splitext (i.e.
        including the leading dot); empty when nothing could be retrieved.
    """
    formats = []
    cmd = [
        'calibredb',
        'list',
        '-s',
        'title:"' + book_title + '"',
        '-f', 'formats',
        '--for-machine',
    ]
    logging.debug('Command: %s' % ' '.join(cmd))
    p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if p.stderr:
        # Decode so the log shows readable text rather than a bytes repr.
        logging.error(p.stderr.decode('utf-8', errors='replace'))

    not_found_msg = ("Could not retrieve book formats for book %s, you may need to add manually."
                     % book_title)
    if not p.stdout:
        logging.error(not_found_msg)
        return formats

    try:
        records = json.loads(p.stdout)
        logging.debug(records)
        files = records[0]['formats']
    except (ValueError, IndexError, KeyError, TypeError):
        # ValueError: output was not JSON; IndexError: no matching book;
        # KeyError/TypeError: record without a usable 'formats' entry.
        logging.error(not_found_msg)
        return formats

    for f in files:
        extension = os.path.splitext(f)[1]
        logging.debug("Filetype already in db: %s" % extension)
        formats.append(extension)
    return formats
def get_book_id(book_title):
    """
    Get the id of a book already in the calibre database.

    Runs ``calibredb search title:<book_title>``.

    book_title: title used in the calibre search expression.
    return: string containing the id as printed by calibredb, with
        surrounding whitespace removed, or None when calibredb wrote
        anything to stderr. Exits the process with status 1 when calibredb
        produced neither an id nor an error message.
    """
    cmd = [
        'calibredb',
        'search',
        'title:' + book_title,
    ]
    logging.debug('Command: %s' % ' '.join(cmd))
    p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if p.stderr:
        logging.error(p.stderr.decode('utf-8', errors='replace'))
        return None

    # Strip the trailing newline so the id can be passed on as a clean
    # command-line argument.
    book_id = p.stdout.decode('utf-8').strip()
    if book_id:
        if ',' in book_id:
            # NOTE(review): a comma presumably means several books matched;
            # the value is returned unchanged so the caller's command fails
            # loudly rather than guessing which book was meant.
            logging.warning("Multiple ids matched '%s': %s" % (book_title, book_id))
        logging.debug("Book ID: %s" % book_id)
        return book_id

    logging.error("UNKNOWN ERROR")
    # Non-zero status: this is a failure path, not a clean exit.
    sys.exit(1)
# Captures the title line calibredb prints right after its
# "(see --duplicates option):" notice. Compiled once rather than per book.
_DUPLICATE_TITLE_RE = re.compile(
    r'\(see --duplicates option\):\r?\n\s+(.*?)\r?(?:\n|$)'
)


def _extract_duplicate_title(stderr_text):
    """Return the title reported as a duplicate in ``stderr_text``, or None."""
    match = _DUPLICATE_TITLE_RE.search(stderr_text)
    if match and match.group(1):
        return match.group(1)
    return None


def main():
    """
    Main entrypoint for this script.

    Collects the ebooks found by build_ingest_list() and runs
    ``calibredb add`` on each one. When calibredb reports the book as a
    duplicate, the file is added as a new format of the existing entry
    unless that format is already present.
    """
    logging.info("Building list of books to add to library...")
    ingest_list = build_ingest_list()
    logging.info("Books to be added: %s" % len(ingest_list))
    logging.debug("List of books: %s" % ', '.join(ingest_list))

    for book in ingest_list:
        logging.info("Filename: %s" % book)
        cmd = [
            'calibredb',
            'add',
            book,
        ]
        logging.debug('Command: %s' % ' '.join(cmd))
        upload_status = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        # Work on decoded text: matching against str(bytes) mangled any
        # non-ASCII title into escape sequences.
        stderr_text = upload_status.stderr.decode('utf-8', errors='replace')
        book_title = _extract_duplicate_title(stderr_text)

        if not book_title:
            if upload_status.returncode != 0:
                # Previously every non-duplicate outcome was logged as success.
                logging.error("Failed to add '%s': %s" % (book, stderr_text.strip()))
            else:
                logging.info("'%s' successfully added to the database!" % book)
            continue

        logging.warning("Warning: %s" % stderr_text)
        logging.warning("%s is already in the library DB!" % book_title)
        current_book_formats = get_book_formats(book_title)
        book_format = os.path.splitext(book)[1]
        if book_format in current_book_formats:
            logging.warning("Book with same format (%s) is already in the database [%s]!"
                            % (book_format, ', '.join(current_book_formats)))
            continue

        logging.info("%s is not in current formats of %s"
                     % (book_format, ', '.join(current_book_formats)))
        book_id = get_book_id(book_title)
        if book_id is None:
            # get_book_id() returns None when the lookup failed; there is
            # nothing to attach the new format to.
            logging.error("Could not determine the id of '%s', skipping '%s'." % (book_title, book))
            continue
        if upload_new_format(book_id, book):
            logging.info("%s successfully added to '%s' formats!" % (book_format, book_title))
def upload_book(book):
    """
    Placeholder, not implemented yet.

    Open question from the author: should part of main() be refactored
    into this function -- read in a book and return True, or the book
    title if it already exists? Probably.
    """
    return None
def upload_new_format(book_id, book):
    """
    Upload a book that already exists in the calibre database, but in a new format.

    Runs ``calibredb add_format <book_id> <book>``.

    book_id: id of the existing calibre entry; None or empty means the id
        lookup failed and nothing is run.
    book: path of the file to attach as a new format.
    return: True if successful (calibredb exited with status 0 and wrote
        nothing to stderr), otherwise False.
    """
    if not book_id:
        # A None id would otherwise blow up in ' '.join(cmd) below.
        logging.error("No book id available for '%s', cannot add new format." % book)
        return False

    cmd = [
        'calibredb',
        'add_format',
        str(book_id).strip(),
        book,
    ]
    logging.debug('Command: %s' % ' '.join(cmd))
    p = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    # Log stdout before deciding the result; previously the early
    # ``return True`` meant stdout was only ever logged on failure.
    if p.stdout:
        logging.info(p.stdout.decode('utf-8', errors='replace').strip())
    if p.stderr:
        logging.error(p.stderr.decode('utf-8', errors='replace').strip())
    return p.returncode == 0 and not p.stderr
# Run the import only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment