Created
September 16, 2023 01:35
-
-
Save a10y/b8a5ee757052985d4347e728fcb3fdcd to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Read a saved SAM.gov opportunities JSON file, download every attachment it links to, and push each one to S3-compatible storage
import subprocess | |
import json | |
import os | |
from pathlib import Path | |
from multiprocessing import Pool | |
WASABI_ENDPOINT = "https://s3.wasabisys.com" | |
# Worker | |
def do_upload(
    solicitation_number: str,
    idx: int,
    doc_url: str,
    folder_path: Path,
    bucket: str,
    endpoint: str,
):
    """Download one attachment with ``wget`` and copy it to S3-compatible storage.

    The attachment is saved in the current working directory as
    ``{solicitation_number}_{idx}``, classified with the ``file`` utility to
    pick a file extension, uploaded with ``aws s3 cp`` and then deleted
    locally. Failures of ``wget`` or ``aws`` are printed, not raised, so one
    bad link does not abort the rest of a batch.

    Args:
        solicitation_number: Identifier used as the prefix of the file name.
        idx: Position of the link in the opportunity's ``resourceLinks`` list.
        doc_url: URL handed to ``wget``.
        folder_path: Folder inside the bucket; a leading root is stripped.
        bucket: Name of the destination bucket.
        endpoint: Value passed to ``aws`` as ``--endpoint-url``.
    """
    # Strip the leading root so folder_path forms a relative object key.
    if folder_path.is_absolute():
        folder_path = folder_path.relative_to(folder_path.anchor)

    # Localize the file based on solicitation number + idx in resourceLinks.
    local_path = f"{solicitation_number}_{idx}"

    try:
        try:
            # check=True: without it a failed download would still be
            # uploaded as an empty or partial file. "--" keeps a URL that
            # starts with "-" from being parsed as a wget option.
            subprocess.run(["wget", "-O", local_path, "--", doc_url], check=True)
        except subprocess.CalledProcessError as e:
            print(f"download failed for {doc_url}: {e}")
            return

        # -b (brief) omits the file name from the output; otherwise a
        # solicitation number containing e.g. "PDF" would match below.
        file_type = (
            subprocess.run(
                ["file", "-b", "--", local_path], stdout=subprocess.PIPE
            )
            .stdout.decode("utf-8", errors="replace")
            .strip()
        )

        # Checked in order; the first marker found in the description wins.
        known_types = (
            ("PDF", ".pdf"),
            ("Microsoft Word", ".docx"),
            ("Microsoft Excel", ".xlsx"),
            ("JPEG", ".jpeg"),
            ("PNG", ".png"),
        )
        extension = next(
            (ext for marker, ext in known_types if marker in file_type), ""
        )
        if not extension:
            # Unknown: don't include a file extension
            print(f"unknown file type for {local_path}: {file_type}")
        remote_path = folder_path / f"{local_path}{extension}"

        upload_command = [
            "aws",
            "s3",
            f"--endpoint-url={endpoint}",
            "cp",
            local_path,
            f"s3://{bucket}/{remote_path.as_posix()}",
        ]
        try:
            subprocess.run(upload_command, check=True, stderr=subprocess.PIPE)
        except subprocess.CalledProcessError as e:
            # stderr was captured, so include it; str(e) only has the exit code.
            details = (
                e.stderr.decode("utf-8", errors="replace").strip()
                if e.stderr
                else ""
            )
            print(f"s3 upload subprocess failed for {local_path}: {e} {details}")
        else:
            print(f"uploaded {remote_path}")
    finally:
        # Delete the temp input file on every path, not only after success;
        # wget -O may leave an empty file behind even when it fails.
        try:
            os.remove(local_path)
        except FileNotFoundError:
            pass
def worker_wrapper(args):
    """Call ``do_upload`` with the positional arguments packed in ``args``.

    Useful for pool APIs that hand each task to the worker as one object.
    Nothing in the visible part of this file calls it.
    """
    task_args = tuple(args)
    do_upload(*task_args)
def main():
    """Upload every attachment referenced by an opportunities JSON file.

    Command line: ``<script> opportunities.json bucket_name directory``.

    The JSON file must hold an ``opportunitiesData`` list. For each entry
    whose ``resourceLinks`` is a list, one ``do_upload`` task is queued per
    link, and all tasks run on a single pool of 16 worker processes.
    Entries whose ``resourceLinks`` is missing or not a list are skipped.
    """
    import sys

    # Exactly three arguments are unpacked below, so require exactly three.
    # An explicit check (unlike assert) also survives ``python -O``.
    if len(sys.argv) != 4:
        sys.exit(f"usage: {__file__} opportunities.json bucket_name directory")
    fname, bucket, directory = sys.argv[1:]
    directory = Path(directory)

    with open(fname, "r", encoding="utf-8") as f:
        opportunity_data = json.load(f)

    tasks = []
    for opportunity in opportunity_data["opportunitiesData"]:
        links = opportunity.get("resourceLinks")
        if not isinstance(links, list):
            continue
        solicitation_number = opportunity["solicitationNumber"]
        tasks.extend(
            (solicitation_number, idx, link, directory, bucket, WASABI_ENDPOINT)
            for idx, link in enumerate(links)
        )

    if not tasks:
        return

    # One pool for the whole run, so 16 processes are not spawned and torn
    # down again for every opportunity.
    with Pool(16) as pool:
        # Blocks until every task is done; re-raises a worker's exception.
        pool.starmap(do_upload, tasks)
# Run the uploader only when this file is executed as a script, so that
# importing the module does not start any downloads or uploads.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment