|
import re |
|
import csv |
|
import json |
|
|
|
|
|
# Load every row of the tab-separated, UTF-16 encoded crash export.
with open("Location_Full Data_data.csv", newline="", encoding="utf-16") as f:
    reader = csv.DictReader(f, delimiter="\t")
    rows = list(reader)

# ⚠️ A (location, year, crash type) triple can appear on several rows
# (mid-block crashes), so the counts are accumulated rather than assigned.
data = {}
for row in rows:
    key = (row["Location"], int(row["Year"]), row["Crash Type"])
    count = int(row["CrashCount"])
    if key in data:
        data[key] += count
    else:
        data[key] = count

# Distinct values found in each position of the aggregated keys.
locations = {key[0] for key in data}
years = {key[1] for key in data}
crash_types = {key[2] for key in data}
|
|
|
|
|
# Collapse the crash-type dimension: for each location, add up the crashes of
# every type within each year. The resulting data looks like
# { "<Location>": { <N_Year>: <N_Crashes> } }
# Every location gets an entry for every known year; a (location, year, type)
# combination that is absent from `data` contributes 0.
total_crashes_in_location_by_year = {}
for location in locations:
    yearly_totals = {}
    for year in years:
        yearly_totals[year] = sum(
            data.get((location, year, crash_type), 0) for crash_type in crash_types
        )
    total_crashes_in_location_by_year[location] = yearly_totals
|
|
|
# Produce a CSV file with one row per location: the location name, its
# average number of crashes per year over the whole period, and its crash
# count for each individual year.
#
# The file is opened with newline="" as the csv module requires; otherwise the
# writer's own "\r\n" terminators are translated again on platforms that
# rewrite "\n", producing blank lines between rows.
with open("data.csv", "wt", newline="") as f:
    ordered_years = sorted(years)
    headers = [
        "Location",
        f"Average Crashes {min(years)}-{max(years)}",
        *(f"Crashes in {year}" for year in ordered_years),
    ]
    writer = csv.DictWriter(f, fieldnames=headers)
    writer.writeheader()
    writer.writerows(
        {
            headers[0]: location,
            # Average over every known year, including years with 0 crashes.
            headers[1]: sum(crashes_by_year.values()) / len(years),
            # headers[2:] were generated from ordered_years, so zipping the
            # two pairs each "Crashes in <year>" column with its year.
            **{
                header: crashes_by_year.get(year)
                for header, year in zip(headers[2:], ordered_years)
            },
        }
        # Sort by location so the row order is reproducible between runs
        # instead of following the arbitrary iteration order of a set.
        for location, crashes_by_year in sorted(
            total_crashes_in_location_by_year.items()
        )
    )
|
|
|
|
|
# Produce a JSON file mapping each road to its average yearly crash counts per intersection
|
|
|
|
|
def normalize(key: str):
    """Return *key* with each whitespace run collapsed to one space and the ends trimmed."""
    collapsed = re.sub(r"\s+", " ", key)
    return collapsed.strip()
|
|
|
|
|
def split_normalize(key: str) -> set:
    """Split a location string into the set of normalized road names it mentions.

    Both " AND " (upper case, surrounded by single spaces) and "&" are treated
    as separators; each piece is passed through ``normalize``.
    """
    unified = key.replace(" AND ", " & ")
    return {normalize(piece) for piece in unified.split("&")}
|
|
|
|
|
# Every distinct normalized road name mentioned by any location.
roads = set().union(*(split_normalize(location) for location in locations))
|
|
|
|
|
def get_avg_crashes_by_intersections(road):
    """Yield ``(label, average_crashes)`` for every location that lies on *road*.

    A location belongs to *road* when *road* is one of the names returned by
    ``split_normalize(location)``. For each such location:

    * ``average_crashes`` is the location's total crash count divided by the
      number of known years (``len(years)``), as a float.
    * ``label`` is ``"at <other roads>"`` when the location names other roads
      besides *road*, or ``"(mid-block crashes)"`` when *road* is the only
      name in the location.

    Reads the module-level ``total_crashes_in_location_by_year`` and ``years``.
    """
    for location, crashes_by_year in total_crashes_in_location_by_year.items():
        # Parse the location once and reuse the result for both the
        # membership test and the cross-street computation.
        location_roads = split_normalize(location)
        if road not in location_roads:
            continue

        crashes = sum(crashes_by_year.values()) / len(years)
        cross_streets = location_roads - {road}
        if cross_streets:
            # Sort before joining: set iteration order of strings can change
            # between interpreter runs, which would make the label for a
            # location with several cross streets non-reproducible.
            yield ("at " + " & ".join(sorted(cross_streets)), crashes)
        else:
            yield ("(mid-block crashes)", crashes)
|
|
|
|
|
# Map each road (in alphabetical order) to its sorted list of
# (label, average crashes) pairs.
avg_crashes_in_road_by_intersection = {}
for road in sorted(roads):
    avg_crashes_in_road_by_intersection[road] = sorted(
        get_avg_crashes_by_intersections(road)
    )
|
|
|
|
|
# Render each (label, average) pair as "<rounded average, width 3> <label>",
# dropping entries whose average rounds to 0.
formatted_lines = {
    road: [
        f"{round(crashes):3d} {intersection}"
        for intersection, crashes in crashes_by_intersection
        if round(crashes) > 0
    ]
    for road, crashes_by_intersection in avg_crashes_in_road_by_intersection.items()
}

# Only roads that still have at least one line are written to the file.
mapped_streets = {road: lines for road, lines in formatted_lines.items() if lines}

with open("data.json", "w") as f:
    json.dump(mapped_streets, f)

# Report the number of roads actually present in data.json, not the number of
# candidate roads (which includes those filtered out for having no lines).
print(f"{len(mapped_streets)} streets mapped")