Created
December 8, 2018 01:33
-
-
Save markgajdosik/a4504907caa1228c132aaf59dc42339c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
async def import_regular_records(
        self, role: VOSSRole, direction: Optional[str]):
    """Using list API, imports records for a non-relation role.

    For every hierarchy pkid in ``self.options['hierarchy_pkids']`` (or once
    without a hierarchy filter when that option is empty), all pages of
    records are fetched concurrently. Records whose pkid is already stored
    for the role are updated; the others are bulk-created in batches of at
    most ``self.MAX_RECORD_BULK_SIZE``. Afterwards, the pkids listed under
    every parent hierarchy seen during pagination are compared with the pkids
    actually received, and any records the pagination missed are fetched one
    by one and stored the same way.

    :param role: Selected role to import records for.
    :param direction: Hierarchy traversal direction or `None` (or
        ``'default'``) if the default traversal behaviour is required.
    """
    if direction == 'default':
        direction = None
    # Install 'voss' additional fields.
    role.install_additional_fields(common=False)
    existing_pkids = set(role.records.values_list('uid', flat=True))
    received_pkids = set()
    received_hierarchy_pkids = set()

    def store_rows(rows, progress):
        """Update already-stored records; bulk-create the new ones."""
        batch = []
        for row in rows:
            if row['pkid'] in existing_pkids:
                self.update_api_record(role, row)
                continue
            batch.append(row)
            if len(batch) >= self.MAX_RECORD_BULK_SIZE:
                self.create_api_records(role, batch, progress=progress)
                batch = []
        if batch:
            # Create any remaining records.
            self.create_api_records(role, batch, progress=progress)

    def accept_rows(pages):
        """Yield well-formed, not-yet-seen rows, recording their pkids."""
        for page in pages:
            for row in page.get('resources', ()):
                # Read every required field *before* marking the record as
                # received: otherwise a malformed record would count as
                # received, never be stored, and never be re-fetched below.
                try:
                    pkid = row['pkid']
                    references = row['meta']['references']
                    parent_pkid = references['parent'][0]['pkid']
                except KeyError as field:
                    self.error(f'Got a record with missing {field} field.',
                               detail=json_as_html(row))
                    continue
                except IndexError:
                    # An empty 'parent' list used to abort the whole import.
                    self.error('Got a record with no parent reference.',
                               detail=json_as_html(row))
                    continue
                if pkid in received_pkids:
                    continue  # (already got this; next record)
                received_pkids.add(pkid)
                received_hierarchy_pkids.add(parent_pkid)
                yield row

    for hierarchy_pkid in self.options['hierarchy_pkids'] or [None]:
        hierarchy = self.hierarchy_map.get(hierarchy_pkid)
        url_params = dict(count='true', summary='false', limit=1)
        if hierarchy_pkid:
            # We're importing from a specific hierarchy.
            url_params['hierarchy'] = hierarchy_pkid
        if direction:
            # We're overriding the default traversal.
            url_params['traversal'] = direction
        # Get first page with 1 record (0 not possible) to get the max.
        # page size limit and the total number of records.
        first_page = await self.fetch_page(role, url_params, 0)
        try:
            limit = first_page['pagination'].get('maximum_limit')
            if not limit:
                # Also guards against 0, which would make range() below fail.
                limit = 50  # (fall back to a sane default)
                self.warning(f'Falling back to page size of {limit} '
                             f'records for <b>{role}</b>.')
            url_params['limit'] = limit
            total = first_page['pagination']['total']
        except KeyError:
            self.error(f'Cannot determine page size limit or the total '
                       f'number of records for <b>{role}</b>. Skipping.')
            return
        if not hierarchy_pkid:
            source = 'all hierarchies'
        elif hierarchy is None:
            # Unknown to hierarchy_map; show the raw pkid instead of crashing.
            source = hierarchy_pkid
        else:
            source = f'{hierarchy[HFP]}.{hierarchy["name"]}'
        self.info(f'Importing {total} <b>{role}</b> record/s from '
                  f'<u>%s</u> (page size: {limit} records).', source)
        import_progress = self.log.append_progress(
            f'Importing <b>{role}</b> records', total)
        save_progress = self.log.append_progress(
            f'Saving <b>{role}</b> records', total)
        pages = await asyncio.gather(*(
            self.fetch_page(role, url_params, offset, import_progress)
            for offset in range(0, total, limit)))
        # accept_rows is lazy, so validation and storing stay interleaved
        # in page order.
        store_rows(accept_rows(pages), save_progress)
        import_progress.finish()
        save_progress.finish()

    # Retrieve records we missed during the pagination.
    self.info(f'Checking for missing <b>{role}</b> records.')
    pkid_lists = await asyncio.gather(*(
        self.fetch_pkids(role, parent_pkid)
        for parent_pkid in received_hierarchy_pkids))
    expected_pkids = {pkid for pkids in pkid_lists for pkid in pkids}
    missed_pkids = expected_pkids - received_pkids
    if not missed_pkids:
        self.info(f'No <b>{role}</b> records were missed.')
        return
    self.warning(f'Missed {len(missed_pkids)} <b>{role}</b> record/s.')
    import_progress = self.log.append_progress(
        'Importing missed records', len(missed_pkids))
    save_progress = self.log.append_progress(
        'Saving missed records', len(missed_pkids))
    missed_rows = await asyncio.gather(*(
        self.fetch_record(role, pkid, import_progress)
        for pkid in missed_pkids))
    store_rows(missed_rows, save_progress)
    # Previously these two progress bars were left unfinished.
    import_progress.finish()
    save_progress.finish()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment