Created
July 21, 2019 15:26
-
-
Save mitsuhiko/ee8f506445e06679b32934bc04c8d5b2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
import os | |
import io | |
import json | |
import lzma | |
import email | |
import pickle | |
import hashlib | |
import tarfile | |
import binascii | |
import arfile | |
import base64 | |
from functools import update_wrapper | |
from urllib.request import urlopen | |
from urllib.parse import urljoin | |
from itertools import chain | |
# Matches the path of a detached debug file inside a debug package's
# data archive: .../.build-id/<2 hex chars>/<remaining build id>.debug
debug_file_re = re.compile(r'/\.build-id/([a-f0-9]{2})/([a-f0-9]{10,})\.debug$')
# Matches a Debian "Source" field that embeds an explicit version,
# e.g. "glibc (2.28-10)" -> groups ("glibc", "2.28-10").
embedded_version_re = re.compile(r'^(.*?)\s+\((.*?)\)$')
def parse_package_file(f):
    """Yield one dict per paragraph of a Debian Packages index.

    *f* is an iterable of byte lines (RFC 822-style stanzas separated by
    blank lines). A trailing sentinel blank line is appended so the last
    stanza is flushed even without a terminating blank line.
    """
    pending = []
    for raw in chain(f, [b'']):
        stripped = raw.rstrip(b'\r\n')
        if stripped:
            pending.append(stripped)
            continue
        # Blank line: emit the buffered stanza, if any, as a header dict.
        if pending:
            message = email.message_from_bytes(b'\n'.join(pending))
            yield dict(message.items())
        pending = []
class DebugFile(object):
    """Holds the raw contents of one extracted .debug file."""

    def __init__(self, build_id, data):
        self.build_id = build_id  # hex build id recovered from the archive path
        self.data = data          # raw bytes of the .debug member

    def __repr__(self):
        size = len(self.data)
        return '<DebugFile build_id=%r (%d bytes)>' % (self.build_id, size)
class PackageRef(object):
    """Reference to one binary package and its detached-debug companion.

    Ties a package in a dist/component/arch back to its source package
    (via source_package_ptr) and to the .debug files shipped in the
    matching debug archive (via iter_debug_files).
    """

    def __init__(self, dist, component, name, arch, version, source, debug_archive_url):
        self.dist = dist
        self.component = component
        self.name = name
        self.arch = arch
        self.version = version
        # Raw "Source" field; may embed a version as "name (version)".
        self.source = source
        self.debug_archive_url = debug_archive_url

    @property
    def source_package_ptr(self):
        """Return the (arch, source_name, source_version) lookup key.

        The Source field may embed an explicit version ("name (version)");
        otherwise the binary package's own version applies.
        """
        match = embedded_version_re.match(self.source)
        if match is not None:
            # Bug fix: include the arch so the key has the same 3-tuple
            # shape as the all_packages keys built in Repository.build_index.
            # The bare 2-tuple match.groups() could never match those keys,
            # so every embedded-version source hit the "missing" path.
            return (self.arch,) + match.groups()
        return self.arch, self.source, self.version

    def iter_debug_files(self):
        """Download the debug .deb and yield a DebugFile per .debug member."""
        with urlopen(self.debug_archive_url) as f:
            # A .deb is an ar archive whose payload lives in data.tar.xz.
            ar = arfile.ArFile(fileobj=io.BytesIO(f.read()))
            zdata = ar.extractfile('data.tar.xz')
            archive = tarfile.open(fileobj=lzma.LZMAFile(zdata))
            for info in archive.getmembers():
                match = debug_file_re.search(info.name)
                if match is None:
                    continue
                # Build id = 2-char directory prefix + file name stem.
                build_id = match.group(1) + match.group(2)
                yield DebugFile(build_id, archive.extractfile(info).read())

    def iter_binaries(self):
        # NOTE(review): unfinished stub -- self.binary_archive_url is never
        # set anywhere in this file, so calling this raises AttributeError.
        # Left untouched pending a binary archive URL being threaded through.
        with urlopen(self.binary_archive_url) as f:
            pass

    def __repr__(self):
        return '<PackageRef dist=%r component=%r name=%r arch=%r version=%r>' % (
            self.dist,
            self.component,
            self.name,
            self.arch,
            self.version,
        )
class Release(object):
    """One dist (e.g. 'stable') of a repository, with its Release metadata."""

    def __init__(self, repo, dist):
        self.repo = repo
        self.dist = dist
        # Fetch the Release metadata once up front; it advertises the
        # architectures and components available for this dist.
        with urlopen(self.base_url + '/Release') as f:
            self.release_info = dict(email.message_from_binary_file(f).items())

    @property
    def base_url(self):
        return '%sdists/%s/' % (self.repo.binary_index, self.dist)

    @property
    def debug_base_url(self):
        return '%sdists/%s-debug/' % (self.repo.debug_index, self.dist)

    @property
    def archs(self):
        # Only keep architectures the repository is configured to care about.
        wanted = self.repo.archs
        advertised = self.release_info['Architectures'].split()
        return frozenset(arch for arch in advertised if arch in wanted)

    @property
    def components(self):
        return frozenset(self.release_info['Components'].split())

    def iter_build_id_indexes(self):
        """Yield one build-id -> PackageRef mapping per (component, arch)."""
        for comp in self.components:
            for arch in self.archs:
                yield self._get_build_id_index(comp, arch)

    def iter_non_debug_packages(self):
        """Yield package dicts that are not build-id-carrying debug packages."""
        for comp in self.components:
            for arch in self.archs:
                suffix = '%s/binary-%s/Packages.xz' % (comp, arch)
                yield from self._iter_packages(urljoin(self.base_url, suffix))
                for pkg in self._iter_packages(urljoin(self.debug_base_url, suffix)):
                    # Entries in the debug index that carry Build-Ids are
                    # debug packages proper; pass everything else through.
                    if not pkg.get('Build-Ids'):
                        yield pkg

    def _iter_packages(self, url):
        # Packages indexes are served xz-compressed; decompress on the fly.
        with urlopen(url) as compressed:
            with lzma.LZMAFile(compressed) as fp:
                yield from parse_package_file(fp)

    def _get_build_id_index(self, component, arch):
        url = urljoin(self.debug_base_url, '%s/binary-%s/Packages.xz' % (
            component,
            arch,
        ))
        index = {}
        for pkg in self._iter_packages(url):
            ref = PackageRef(
                dist=self.dist,
                component=component,
                arch=arch,
                name=pkg['Package'],
                version=pkg['Version'],
                source=pkg['Source'],
                debug_archive_url=urljoin(self.repo.debug_index, pkg['Filename']),
            )
            # One debug package can cover several build ids.
            for build_id in pkg.get('Build-Ids', '').split():
                index[build_id] = ref
        return index

    def __repr__(self):
        return '<Release dist=%r archs=%r components=%r>' % (
            self.dist,
            sorted(self.archs),
            sorted(self.components),
        )
class Repository(object):
    """A Debian-style package repository plus its matching -debug mirror."""

    def __init__(self, binary_index, debug_index, dists, archs, cache_dir):
        self.binary_index = binary_index  # base URL of the regular package mirror
        self.debug_index = debug_index    # base URL of the debug-symbol mirror
        self.dists = dists                # distributions to index, e.g. ['stable']
        self.archs = archs                # architectures to consider
        # NOTE(review): stored but never read anywhere in this file.
        self.cache_dir = cache_dir

    def build_index(self, filename):
        """Build an on-disk index mapping build ids to packages.

        Writes one file per build id under index/build-id/ containing a
        short package key, and one JSON file per package key under
        index/pkg/ with the debug (and, when resolvable, binary) URLs.

        NOTE(review): the *filename* argument is accepted but never used;
        output paths are hard-coded to 'index/' -- TODO confirm intent.
        """
        build_ids = {}
        packages = {}
        for release in self.iter_releases():
            # Map (arch, name, version) -> binary package URL, also keyed
            # by the source package's embedded version when present.
            all_packages = {}
            for package in release.iter_non_debug_packages():
                url = urljoin(self.binary_index, package['Filename'])
                all_packages[package['Architecture'], package['Package'], package['Version']] = url
                if 'Source' in package:
                    match = embedded_version_re.match(package['Source'])
                    if match is not None:
                        all_packages[(package['Architecture'],) + match.groups()] = url
            for index in release.iter_build_id_indexes():
                for build_id, pkg_ref in index.items():
                    # Short, filesystem-safe key derived from the debug
                    # archive URL (md5 used only for naming, not security).
                    package_key = base64.urlsafe_b64encode(hashlib.md5(
                        pkg_ref.debug_archive_url.encode('utf-8')).digest()).decode('utf-8').rstrip('=')
                    build_ids[build_id] = package_key
                    if package_key not in packages:
                        packages[package_key] = {
                            'dbg': pkg_ref.debug_archive_url,
                        }
                        try:
                            packages[package_key]['bin'] = all_packages[
                                pkg_ref.source_package_ptr]
                        except LookupError:
                            # Best-effort: record when the binary cannot
                            # be resolved from the source pointer.
                            print('missing', pkg_ref, pkg_ref.source_package_ptr)
        # exist_ok=True tolerates only the pre-existing-directory case;
        # the old `except OSError: pass` also masked real failures such
        # as permission errors.
        os.makedirs('index/build-id', exist_ok=True)
        for build_id, pkg in build_ids.items():
            with open('index/build-id/%s' % build_id, 'w') as f:
                f.write(pkg)
        os.makedirs('index/pkg', exist_ok=True)
        for pkg, meta in packages.items():
            with open('index/pkg/%s' % pkg, 'w') as f:
                json.dump(meta, f)

    def iter_releases(self):
        """Yield a Release object for each configured dist."""
        for dist in self.dists:
            yield Release(self, dist)
# Official Debian mirrors: the regular package archive plus the
# debug-symbol mirror that carries the -dbgsym packages.
debian_official = Repository(
    binary_index='http://ftp.debian.org/debian/',
    debug_index='http://debug.mirrors.debian.org/debian-debug/',
    dists=['stable'],
    archs=['amd64', 'i386'],
    cache_dir='cache',
)
# NOTE(review): build_index ignores its filename argument and writes to
# the hard-coded 'index/' directory instead -- TODO confirm intent.
debian_official.build_index('index.json')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment