Skip to content

Instantly share code, notes, and snippets.

@terrettaz
Created May 2, 2020 15:36
Show Gist options
  • Save terrettaz/53528e26cf9f7b19125af286e780dc9a to your computer and use it in GitHub Desktop.
Save terrettaz/53528e26cf9f7b19125af286e780dc9a to your computer and use it in GitHub Desktop.
#!/usr/bin/env python2.7
import sys
import os
import popen2
import re
import shutil
import pytz
import imghdr
from datetime import datetime
from dateutil import parser
import json
import pprint
# Root directory that load_new_paths() walks to discover image files.
ALBUMS_DIR = '/Users/pik/Pictures/Photos/AlbumsJournaliers'
# Root of the <year>/<month>/<yy.mm.dd> tree that copy_files() copies into.
RESULT_DIR = '/Users/pik/Pictures/Photos/DailyPictures'
# JSON file read by load_data() and written by save_data().
DATA = '/Users/pik/.daily_pictures.json'
# Timezone that parse_tz_datetime() converts timezone-aware dates into.
LOCAL_TZ = pytz.timezone ("Europe/Zurich")
def test_image(path):
    """Return the image type imghdr detects for *path*, or None.

    Any error raised while probing the file is reported on stdout as
    'skipping <path>' and treated as "not an image".
    """
    try:
        kind = imghdr.what(path)
    except Exception:
        print('skipping ' + path)
        kind = None
    return kind
def parse_tz_datetime(text, message, path):
    """Parse a date string, converting timezone-aware results to LOCAL_TZ.

    text    -- the raw date string to parse
    message -- label of the date being parsed, only used in the error output
    path    -- file the date belongs to, only used in the error output

    Returns None for the placeholder values '0000:00:00 00:00:00' and '-',
    and also when parsing fails (after printing a diagnostic).  A parsed
    value without timezone information is returned unchanged.
    """
    placeholders = ('0000:00:00 00:00:00', '-')
    if text in placeholders:
        return None
    try:
        parsed = parser.parse(text)
        return parsed.astimezone(LOCAL_TZ) if parsed.tzinfo else parsed
    except Exception:
        print('Cannot parse "%s" date from path %s: %s' % (text, path, message))
    return None
def get_dates(path):
    """Ask exiftool for the FileModifyDate and DateTimeOriginal of *path*.

    Returns a list of two pairs, [('file_date', value), ('exif_date', value)],
    where each value is whatever parse_tz_datetime() makes of the matching
    exiftool output line (a datetime or None).

    Raises whatever starting exiftool raises (after printing a diagnostic),
    and RuntimeError when exiftool prints fewer than the two expected lines.
    """
    # Local import: subprocess replaces the deprecated popen2 module.
    import subprocess

    # Hand the path over as bytes so non-ASCII unicode paths are not pushed
    # through the default codec; byte strings are passed through untouched.
    target = path if isinstance(path, bytes) else path.encode('utf-8')

    # An argument list (no shell, no xargs) means spaces, quotes and
    # backslashes in the path need no escaping at all.
    command = [
        'exiftool', '-f', '-s', '-s', '-s',
        '-d', '%Y%m%d%H%M%S%z',
        '-FileModifyDate', '-DateTimeOriginal',
        target,
    ]
    try:
        proc = subprocess.Popen(
            command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        # communicate() drains and closes both pipes and reaps the process.
        out, err = proc.communicate()
    except Exception as e:
        print('Error while executing exiftool ' + str(e))
        raise

    lines = out.decode('utf-8', 'replace').splitlines()
    if len(lines) < 2:
        raise RuntimeError('exiftool returned %d line(s) for %r: %s' % (
            len(lines), path, err.decode('utf-8', 'replace').strip()))
    return [
        ('file_date', parse_tz_datetime(lines[0], 'file date', path)),
        ('exif_date', parse_tz_datetime(lines[1], 'exif date', path))
    ]
def guess_date(path):
    """Guess a date from a 'dd.mm.yy' fragment in the directory part of *path*.

    Returns a datetime at midnight, or None when the path has no directory
    part or the directory does not contain such a fragment.  Because the
    pattern starts with a greedy '.*', the last fragment in the directory
    name wins.

    Raises the error from datetime() (after printing a warning) when the
    matched digits do not form a valid calendar date.
    """
    dirname = os.path.dirname(path)
    if dirname == '':
        return None
    m = re.match(
        r'^.*(?P<day>[0-3][0-9])\.(?P<month>[0-1][0-9])\.(?P<year>[0-1][0-9]).*$',
        dirname)
    if m is None:
        return None

    year = int(m.group('year'))
    # NOTE: the pattern only admits years 00-19, so the 1900 branch cannot
    # trigger today; it is kept in case the pattern is ever widened.
    if year < 80:
        year = 2000 + year
    else:
        year = 1900 + year
    try:
        return datetime(year, int(m.group('month')), int(m.group('day')))
    except Exception as e:
        print('Warning: cannot create date with %s-%s-%s' % (
            m.group('year'), m.group('month'), m.group('day')))
        # '%s' instead of the former '%u': the integer format raised a
        # TypeError here and hid the real error from the caller.
        print(' for file %(path)s %(e)s ' % {'path': path, 'e': e})
        raise
def get_file_info(path):
    """Build the record for one picture.

    The record always holds 'path' and 'filename'; 'guess_date', 'file_date'
    and 'exif_date' are added only when the corresponding lookup produced a
    truthy value.  Any failure is printed and then re-raised.
    """
    info = {'path': path, 'filename': os.path.basename(path)}
    try:
        candidates = [('guess_date', guess_date(path))]
        candidates.extend(get_dates(path))
        record = dict(info)
        for name, value in candidates:
            if value:
                record[name] = value
        return record
    except Exception as e:
        print('Error while processing "%(path)s" %(info)s %(e)s' % {
            'path': path, 'info': info, 'e': e})
        raise e
def select_best_date(info):
    """Return *info* extended with a 'selected_date', asking the user if needed.

    Rules, in order:
      * a record that already has 'selected_date' is returned as is;
      * EXIF and guessed dates on the same calendar day -> EXIF date;
      * EXIF and guessed dates on different days -> the user picks one of
        exif / guess / file, skips ('s') or quits ('q', exits with status 1);
      * neither EXIF nor guessed date -> the user types yyyy-mm-dd or 's';
      * otherwise whichever of the two exists.

    A skipped record is returned unchanged (without 'selected_date').  End of
    input on stdin is treated as a skip.  The input dict is never mutated.
    """
    if 'selected_date' in info:
        return info

    path = info['path']
    exif = info.get('exif_date')
    file_date = info.get('file_date')
    guess = info.get('guess_date')

    def with_selected(date):
        # Copy so the caller's record is left untouched.
        chosen = dict(info)
        chosen['selected_date'] = date
        return chosen

    if exif and guess:
        # Compare whole calendar days: the former year/month/day test joined
        # with 'and' only fired when all three components differed.
        if exif.date() == guess.date():
            return with_selected(exif)
        candidates = (exif, guess, file_date)
        fields = {'path': path, 'exif': exif, 'guess': guess, 'file': file_date}
        while True:
            print('%(path)s as two diffrent dates, choose:' % fields)
            print(' [1] exif %(exif)s' % fields)
            print(' [2] guess %(guess)s' % fields)
            print(' [3] file %(file)s' % fields)
            print(' [s] Skip')
            print(' [q] Quit')
            line = sys.stdin.readline()
            if not line:
                # EOF: nobody can answer, so behave like a skip.
                return info
            choice = line.strip().lower()
            if choice == 'q':
                sys.exit(1)
            if choice == 's':
                return info
            if choice not in ('1', '2', '3'):
                continue
            picked = candidates[int(choice) - 1]
            if picked is None:
                # The file date may be missing; ask again rather than
                # store None as the selection.
                continue
            return with_selected(picked)

    if exif is None and guess is None:
        while True:
            print('%(path)s cannot find a valid date, choose one: yyyy-mm-dd or "s" to skip' % {'path': path})
            line = sys.stdin.readline()
            if not line:
                # EOF: the former bare 'except' looped forever here.
                return info
            strdate = line.strip().lower()
            if strdate == 's':
                return info
            try:
                return with_selected(datetime.strptime(strdate, '%Y-%m-%d'))
            except ValueError:
                # Not a valid yyyy-mm-dd date: ask again.
                continue

    if exif is None:
        return with_selected(guess)
    return with_selected(exif)
def load_data():
    """Load the saved picture records from the DATA file.

    Returns a list of records with their date fields run through
    parse_dates().  Returns an empty list when the file does not exist, or
    when reading/decoding fails (the error is printed first).
    """
    if not os.path.exists(DATA):
        return []
    with open(DATA) as handle:
        try:
            records = json.load(handle)
            return [parse_dates(record) for record in records]
        except Exception as e:
            print(e)
            print('Cannot load data from %s' % DATA)
            return []
def format_dates(dict_from):
    """Return a copy of *dict_from* with its date fields turned into strings.

    The keys 'exif_date', 'guess_date', 'file_date' and 'selected_date' are
    formatted as 'YYYYMMDDHHMMSS' when present.  A key that is missing or
    holds None is left as it is (previously a None value raised
    AttributeError).  The input dict is not modified.
    """
    d = dict(dict_from)
    for key in ('exif_date', 'guess_date', 'file_date', 'selected_date'):
        value = d.get(key)
        if value is not None:
            d[key] = value.strftime('%Y%m%d%H%M%S')
    return d
def parse_dates(dict_from):
    """Return a copy of *dict_from* with its date strings turned into datetimes.

    The keys 'exif_date', 'guess_date', 'file_date' and 'selected_date' are
    parsed from 'YYYYMMDDHHMMSS' when present.  A key that is missing or
    holds None is left as it is (previously a None value raised TypeError).
    The input dict is not modified.

    Raises ValueError when a present value does not match the format.
    """
    d = dict(dict_from)
    for key in ('exif_date', 'guess_date', 'file_date', 'selected_date'):
        value = d.get(key)
        if value is not None:
            d[key] = datetime.strptime(value, '%Y%m%d%H%M%S')
    return d
def save_data(data):
    """Write the picture records to the DATA file as JSON.

    data -- iterable of record dicts; their date fields are converted with
            format_dates() before serialisation.

    Prints 'saving data .. ' before and 'ok' after a successful write.
    """
    sys.stdout.write('saving data .. ')
    # Serialise first: opening with 'w' truncates the file, so a failure in
    # format_dates()/json.dumps() must happen before the file is touched.
    payload = json.dumps([format_dates(record) for record in data])
    # 'with' closes the file; the former 'f.close' lacked the call parentheses.
    with open(DATA, 'w') as f:
        f.write(payload)
    # The original had a bare 'ok' string here, which printed nothing.
    print('ok')
def find_by_path(path, file_infos):
    """Return the first record in *file_infos* whose 'path' equals *path*.

    Returns None when no record matches.
    """
    matches = [entry for entry in file_infos if entry['path'] == path]
    return matches[0] if matches else None
def load_new_paths(file_infos):
    """Return the image paths under ALBUMS_DIR not yet present in *file_infos*.

    file_infos -- list of known records; any file whose path equals a
                  record's 'path' is skipped, whether or not that record
                  carries an 'error' flag.

    Files that test_image() does not recognise as images are skipped too.
    """
    sys.stdout.write('loading files .. ')
    # One set lookup per file instead of a linear scan of every record.
    known_paths = set(info['path'] for info in file_infos)
    paths = []
    for root, dirs, files in os.walk(ALBUMS_DIR):
        for name in files:
            path = os.path.join(root, name)
            # os.walk yields byte strings on Python 2; records are compared
            # as text, so decode them.
            if isinstance(path, bytes):
                path = path.decode('utf-8')
            if path in known_paths:
                continue
            if not test_image(path):
                continue
            paths.append(path)
    print('ok')
    return paths
def load_file_infos(paths, processes=16):
    """Collect get_file_info() records for *paths* using a process pool.

    paths     -- list of file paths to inspect
    processes -- number of worker processes (default 16, the former
                 hard-coded value)

    Returns the truthy records in the order of *paths*.  No pool is started
    when *paths* is empty.  An exception raised by a worker propagates after
    the pool has been shut down.
    """
    from multiprocessing import Pool
    sys.stdout.write('loading infos .. ')
    files_info = []
    if paths:
        pool = Pool(processes)
        try:
            for value in pool.imap(get_file_info, paths):
                if value:  # Can be None
                    files_info.append(value)
        finally:
            pool.terminate()
            pool.join()
    print('ok')
    return files_info
def select_dates(files_info):
    """Run select_best_date() over every record and return the new list."""
    print('Selecting dates ..')
    return [select_best_date(record) for record in files_info]
def copy_files(files_info, dryRun):
    """Copy every dated picture into RESULT_DIR/<year>/<month>/<yy.mm.dd>/.

    files_info -- list of records; those with an 'error' key are ignored,
                  those with no usable 'selected_date' are reported as
                  skipped and flagged with info['error'] = True
    dryRun     -- when true, only print what would happen; nothing is
                  created or copied

    A picture is not copied when a file with the same name already exists in
    the target directory.

    Returns (count_dir, count_files): directories created and files copied,
    or, in a dry run, the numbers that would have been.
    """
    count_dir = 0
    count_files = 0
    # Directories already handled in this run.  In a dry run nothing is
    # created, so os.path.exists() alone would announce and count the same
    # directory once per file.
    handled_dirs = set()
    for info in files_info:
        if 'error' in info:
            continue
        # .get(): records the user skipped have no 'selected_date' key.
        date = info.get('selected_date')
        if date is None:
            print('Skipped %s' % info['filename'])
            info['error'] = True
            continue
        new_dir = os.path.join(
            RESULT_DIR,
            str(date.year),
            date.strftime('%m'),
            date.strftime('%y.%m.%d'))
        if new_dir not in handled_dirs and not os.path.exists(new_dir):
            print('creating dir %s' % new_dir)
            if not dryRun:
                os.makedirs(new_dir)
            handled_dirs.add(new_dir)
            count_dir += 1
        if not os.path.exists(os.path.join(new_dir, os.path.basename(info['path']))):
            print('copying %s' % info['path'])
            if not dryRun:
                shutil.copy2(info['path'], new_dir)
            count_files += 1
    return (count_dir, count_files)
def main(argv):
    """Run the whole import: load, scan, date, copy, and always save.

    argv -- command-line arguments; the presence of '--dryRun' turns the
            copy step into a report-only run.

    A KeyboardInterrupt ends the run quietly; the records are saved in
    every case before returning.
    """
    files_info = load_data()
    try:
        fresh_paths = load_new_paths(files_info)
        files_info += load_file_infos(fresh_paths)
        save_data(files_info)
        files_info = select_dates(files_info)
        save_data(files_info)
        dry_run = '--dryRun' in argv
        count_dir, count_files = copy_files(files_info, dry_run)
        summary = 'finished, %(count_dir)d directories created, %(count_files)d files copied' % {
            'count_dir': count_dir, 'count_files': count_files}
        print(summary)
    except KeyboardInterrupt:
        pass
    finally:
        save_data(files_info)
# Script entry point: pass the raw command line to main(), which checks it
# for the '--dryRun' flag.
if __name__ == '__main__':
    main(sys.argv)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment