|
#!/usr/bin/env python3 |
|
# Copyright (c) 2017 Yu-Jie Lin |
|
# |
|
# Permission is hereby granted, free of charge, to any person obtaining a copy |
|
# of this software and associated documentation files (the "Software"), to deal |
|
# in the Software without restriction, including without limitation the rights |
|
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
|
# copies of the Software, and to permit persons to whom the Software is |
|
# furnished to do so, subject to the following conditions: |
|
# |
|
# The above copyright notice and this permission notice shall be included in |
|
# all copies or substantial portions of the Software. |
|
# |
|
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
|
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
|
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
|
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
|
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
|
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE |
|
# SOFTWARE. |
|
|
|
import argparse |
|
import csv |
|
import logging as log |
|
from itertools import groupby |
|
|
|
# Root-logger configuration used by the whole script.  Every record shows the
# time of day with milliseconds, the level name right-aligned to eight
# columns, the emitting function and line number, and finally the message.
LOG_DATEFMT = '%H:%M:%S'
LOG_FORMAT = (
    '%(asctime)s.%(msecs)03d '
    '%(levelname)8s '
    '%(funcName)s:%(lineno)d: %(message)s'
)
log.basicConfig(level='DEBUG', datefmt=LOG_DATEFMT, format=LOG_FORMAT)
|
|
|
|
|
def write(basename, records, r_dates, n_books):
    """Write one ranking as a plain-text report plus a CSV companion.

    Two files are produced: ``basename`` (text) and ``basename + '.csv'``.
    The text report starts with the ``r_dates`` and ``n_books`` summary
    lines and a blank line, followed by one line per record.  The CSV file
    holds a header row followed by the same records, every field quoted.

    Args:
        basename: Path of the text report; the CSV path is derived from it.
        records: Iterable of ``((title, author), weeks, ranking)``.
            ``weeks`` must be a non-empty sequence (its first and last items
            are reported as start and end) and ``ranking`` an int.
        r_dates: Summary line written first in the text report.
        n_books: Summary line written second in the text report.
    """
    csv_name = basename + '.csv'
    # The report line contains an en dash, so the encoding is pinned instead
    # of relying on the locale default, which may be unable to encode it.
    # newline='' is the csv module's documented way to open its output file.
    with open(basename, 'w', encoding='utf-8') as f, \
            open(csv_name, 'w', encoding='utf-8', newline='') as fcsv:
        f.write(r_dates + '\n')
        f.write(n_books + '\n')
        f.write('\n')

        # csv.writer quotes every field and doubles embedded quotes in all
        # of them, not only in title and author.  Rows still end in '\n'.
        cw = csv.writer(fcsv, quoting=csv.QUOTE_ALL, lineterminator='\n')
        cw.writerow(['Ranking', 'Consecutive Weeks', 'Start', 'End',
                     'Title', 'Author'])
        for (title, author), weeks, ranking in records:
            row = [ranking, len(weeks), weeks[0], weeks[-1], title, author]
            f.write('{:5,d} {:2d} weeks ({}–{}): {} by {}\n'.format(*row))
            cw.writerow(row)
    log.info('%s written', basename)
    log.info('%s written', csv_name)
|
|
|
|
|
def add_ranking(records):
    """Attach a rank to each ``(key, weeks)`` pair, sharing ranks on ties.

    ``records`` is walked in the order given.  An entry receives its 1-based
    position as rank unless it has the same number of weeks as the entry
    right before it, in which case it inherits that entry's rank.

    Returns:
        A new list of ``(key, weeks, rank)`` tuples in the input order.
    """
    ranked = []
    current_rank = 0
    previous_count = None
    for position, (key, weeks) in enumerate(records, start=1):
        count = len(weeks)
        if count != previous_count:
            # The week count changed: this entry opens a new rank.
            current_rank, previous_count = position, count
        ranked.append((key, weeks, current_rank))
    return ranked
|
|
|
|
|
def calc_consecutive(fiction, r_dates, n_books):
    """Report each book's longest unbroken run of adjacent rows.

    A run is a maximal stretch of neighbouring rows in ``fiction`` that
    share the same ``(title, author)``.  For every book only its longest run
    is kept; when two runs are equally long the earlier one wins.

    A warning is logged for every title that was recorded under more than
    one author.

    Two rankings are then handed to ``write``:

    * ``'c_most_weeks'`` -- longest runs first, ranked by ``add_ranking``;
    * ``'c_chron'`` -- the same ranked entries ordered by the first week of
      each run.

    Args:
        fiction: Sequence of ``(week, title, author)`` rows, in the order in
            which runs should be detected.
        r_dates: Summary line passed through to ``write``.
        n_books: Summary line passed through to ``write``.
    """
    # groupby on the *unsorted* rows yields exactly the consecutive runs,
    # which removes the need for a "previous key" sentinel and for a
    # separate flush of the final run.
    keyed = [((title, author), week) for week, title, author in fiction]
    records = {}
    for key, run in groupby(keyed, key=lambda pair: pair[0]):
        weeks = [week for _, week in run]
        # Strictly longer only, so an earlier run survives a tie.
        if len(weeks) > len(records.get(key, [])):
            records[key] = weeks

    # checking title with different author
    by_title = sorted(records, key=lambda key: key[0])
    for title, keys in groupby(by_title, key=lambda key: key[0]):
        authors = [author for _, author in keys]
        if len(authors) == 1:
            continue
        msg = '"{}" has been topped with different authors:'
        log.warning(msg.format(title))
        for author in authors:
            log.warning(' {}'.format(author))

    longest_first = sorted(records.items(),
                           key=lambda item: len(item[1]), reverse=True)
    c_most_weeks = add_ranking(longest_first)
    write('c_most_weeks', c_most_weeks, r_dates, n_books)

    # Ranks computed above are kept; only the row order changes here.
    c_chron = sorted(c_most_weeks, key=lambda item: item[1][0])
    write('c_chron', c_chron, r_dates, n_books)
|
|
|
|
|
def _decade(week):
    """Return the decade label (e.g. ``'1990s'``) for a week string.

    The first four characters of ``week`` are parsed as the year, so the
    string is assumed to start with a four-digit year.
    """
    return '{}s'.format(int(week[:4]) // 10 * 10)


def _write_quoted_csv(fn, header, rows):
    """Write ``header`` and ``rows`` to ``fn`` with every field quoted."""
    with open(fn, 'w', newline='') as f:
        cw = csv.writer(f, quoting=csv.QUOTE_ALL, lineterminator='\n')
        cw.writerow(header)
        cw.writerows(rows)
    log.info(fn + ' written')


def calc(fiction, r_dates, n_books):
    """Report total weeks per book and two per-decade summaries.

    Three outputs are produced:

    * ``'most_weeks'`` (through ``write``): every ``(title, author)`` with
      all of its weeks, most weeks first, ranked by ``add_ranking``.
    * ``most_weeks_heatmap.csv``: one row per decade; column *n* counts the
      books of that decade that have exactly *n* weeks.
    * ``weeks_books.csv``: per decade, the total weeks, the number of books
      and two averages.

    A book is assigned to the decade of the first week in its list.  When
    ``fiction`` is empty the CSV files contain only their header row.

    Args:
        fiction: Sequence of rows indexable as ``(week, title, author)``.
        r_dates: Summary line passed through to ``write``.
        n_books: Summary line passed through to ``write``.
    """
    #######################
    # most weeks by title #
    #######################

    def book_of(row):
        return (row[1], row[2])

    # sorted() is stable, so each book's weeks keep their input order.
    weeks_by_book = {
        book: [row[0] for row in rows]
        for book, rows in groupby(sorted(fiction, key=book_of), key=book_of)
    }
    most_weeks = add_ranking(sorted(weeks_by_book.items(),
                                    key=lambda item: len(item[1]),
                                    reverse=True))
    write('most_weeks', most_weeks, r_dates, n_books)

    #####################
    # heatmap by decade #
    #####################

    # most_weeks is ordered longest first, so its head fixes the number of
    # heatmap columns; with no books at all there are none.
    max_weeks = len(most_weeks[0][1]) if most_weeks else 0

    counts_by_dec = {}
    for _book, weeks, _rank in most_weeks:
        counts_by_dec.setdefault(_decade(weeks[0]), []).append(len(weeks))

    heatmap_rows = []
    summary_rows = []
    for dec in sorted(counts_by_dec):
        counts = counts_by_dec[dec]
        bins = [0] * max_weeks
        for n in counts:
            bins[n - 1] += 1
        heatmap_rows.append([dec] + bins)

        total_weeks, total_books = sum(counts), len(counts)
        # Books/Year always divides by ten, even if the data covers only
        # part of a decade.
        summary_rows.append([dec, total_weeks, total_books,
                             total_weeks / total_books, total_books / 10])

    _write_quoted_csv('most_weeks_heatmap.csv',
                      ['Decade'] + list(range(1, max_weeks + 1)),
                      heatmap_rows)
    _write_quoted_csv('weeks_books.csv',
                      ['Decade', 'Weeks', 'Books', 'Average Weeks/Book',
                       'Average Books/Year'],
                      summary_rows)
|
|
|
|
|
def main():
    """Read the CSV named on the command line and generate every report.

    Only rows whose first column is ``'Fiction'`` are used; that column is
    dropped and the remaining columns are passed on as the row.  Two summary
    lines (number of rows with the first and last week, and number of
    distinct ``(title, author)`` pairs) are logged and handed to
    ``calc_consecutive`` and ``calc``.

    Exits through ``parser.error`` when the file has no ``'Fiction'`` rows.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('CSV')
    args = parser.parse_args()

    # newline='' is the csv module's documented way to open its input file.
    with open(args.CSV, newline='') as f:
        # csv.reader returns [] for a blank line; skip those rows instead of
        # failing on r[0].
        fiction = [r[1:] for r in csv.reader(f) if r and r[0] == 'Fiction']

    if not fiction:
        # Without rows there is no first/last week to report.
        parser.error('no Fiction rows found in {}'.format(args.CSV))

    msg = '{:,} weekly lists ({}–{}) published'
    r_dates = msg.format(len(fiction), fiction[0][0], fiction[-1][0])
    log.info(r_dates)

    # Distinct (title, author) pairs, the same key calc() groups books on.
    books = len({(row[1], row[2]) for row in fiction})
    n_books = '{:5,} books topped the lists'.format(books)
    log.info(n_books)

    calc_consecutive(fiction, r_dates, n_books)
    calc(fiction, r_dates, n_books)


if __name__ == '__main__':
    main()