Created
September 13, 2022 14:48
-
-
Save u8sand/e6e04ae31ff4524494616c1bc07a9c12 to your computer and use it in GitHub Desktop.
Streaming downloads from https://api.archs4.maayanlab.cloud/
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/python3
# python archs4-dl.py series-samples GSE53655 | python archs4-dl.py fetch > GSE53655.tsv
import codecs
import json
import re
import urllib.error
import urllib.parse
import urllib.request

import click
from tqdm import tqdm
# Root of the ARCHS4 HTTP API; endpoint paths are appended to this.
base_url = 'https://api.archs4.maayanlab.cloud'
# Number of bytes read from the HTTP response per iteration when streaming.
chunk_size = 8192
@click.group()
def cli():
    """Streaming downloads from the ARCHS4 API.

    Sub-commands are registered on this group with ``@cli.command()``.
    """
# Matches a "<start>-<stop>/<count>" content-range header value.
_CONTENT_RANGE_RE = re.compile(r'^(\d+)-(\d+)/(\d+)$')


def _fetch_sample_page(series_id, skip, limit=100):
    """Fetch one page of sample accessions for ``series_id``.

    Args:
        series_id: Series identifier sent as the ``series_id`` query parameter.
        skip: Number of samples to skip (offset of the page).
        limit: Maximum number of samples requested for the page.

    Returns:
        A ``(start, stop, count, samples)`` tuple, where the three integers
        are parsed from the response's ``content-range`` header and
        ``samples`` is the decoded JSON body.

    Raises:
        click.ClickException: If the request fails with an HTTP error or the
            ``content-range`` header is missing or malformed.
    """
    query = urllib.parse.urlencode(dict(series_id=series_id, skip=skip, limit=limit))
    req = urllib.request.Request(f"{base_url}/meta/samples/geo_accession?{query}")
    try:
        res = urllib.request.urlopen(req)
    except urllib.error.HTTPError as e:
        raise click.ClickException(message=f"Series ID ({series_id}) Not Found") from e
    # Close the connection as soon as the page has been read.
    with res:
        content_range = res.info()['content-range']
        match = _CONTENT_RANGE_RE.match(content_range or '')
        if match is None:
            raise click.ClickException(
                message=f"Unexpected content-range header: {content_range!r}"
            )
        start, stop, count = map(int, match.groups())
        samples = json.load(res)
    return start, stop, count, samples


@cli.command()
@click.option('-o', '--output-file', type=click.File('w'), default='-')
@click.argument('series_id')
def series_samples(series_id, output_file):
    """Write the sample accessions of SERIES_ID, one per line.

    Pages through the API 100 samples at a time, using the ``content-range``
    header of each response to track progress.
    """
    with tqdm(unit=' samples') as t:
        t.set_description('Resolving samples')
        start, stop, count, samples = _fetch_sample_page(series_id, skip=0)
        # The total is only known once the first response has arrived.
        t.reset(count)
        while True:
            # print(*[]) would emit a lone blank line, so skip empty pages.
            if samples:
                print(*samples, sep='\n', file=output_file)
            t.update(stop - start)
            if stop >= count:
                break
            previous_stop = stop
            start, stop, count, samples = _fetch_sample_page(series_id, skip=stop)
            # Guard against looping forever if the server stops advancing.
            if stop <= previous_stop:
                raise click.ClickException(
                    message=f"Pagination made no progress at offset {previous_stop}"
                )
@cli.command()
@click.option('-i', '--samples-from', type=click.File('r'), default='-')
@click.option('-o', '--output-file', type=click.File('w'), default='-')
def fetch(samples_from, output_file):
    """Stream the expression table for a list of sample accessions.

    Reads one sample accession per line from the input (blank lines are
    ignored), posts them as JSON to the ``/data/expression/T`` endpoint
    requesting ``text/tsv``, and streams the response to the output.

    Raises:
        click.ClickException: If the request fails with an HTTP error.
        UnicodeDecodeError: If the response is not valid UTF-8.
    """
    with tqdm(unit=' sample') as t:
        t.set_description('Loading samples')
        # A plain loop keeps the progress update out of the filter condition:
        # tqdm documents update() as returning True when a display is
        # triggered, so filtering on `update(1) is None` would drop samples.
        geo_accession = []
        for sample_id in map(str.strip, samples_from):
            if sample_id:
                geo_accession.append(sample_id)
                t.update(1)
        # One extra unit on top of the sample count; progress below is
        # counted in newlines written.
        # NOTE(review): presumably the extra line is a header row - confirm.
        t.reset(total=len(geo_accession) + 1)
        t.set_description('Fetching samples')
        req = urllib.request.Request(
            f"{base_url}/data/expression/T",
            headers={
                'Accept': 'text/tsv',
                'Content-Type': 'application/json',
            },
            data=json.dumps(dict(
                geo_accession=geo_accession,
            )).encode('utf-8'),
        )
        try:
            res = urllib.request.urlopen(req)
        except urllib.error.HTTPError as e:
            raise click.ClickException(
                message=f"Failed to fetch expression data (HTTP {e.code})"
            ) from e
        # Decode incrementally so a multi-byte UTF-8 character that straddles
        # a chunk boundary does not raise UnicodeDecodeError.
        decoder = codecs.getincrementaldecoder('utf-8')()
        with res:
            while chunk := res.read(chunk_size):
                text = decoder.decode(chunk)
                output_file.write(text)
                t.update(text.count('\n'))
            # Flush the decoder; this raises if the stream ended mid-character.
            output_file.write(decoder.decode(b'', final=True))
# Run the click command group only when executed as a script.
if __name__ == '__main__':
    cli()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment