Last active
July 19, 2021 23:49
-
-
Save park-brian/e5b9ed2c6a12639b5b02fc8951d91908 to your computer and use it in GitHub Desktop.
Uses parallel batchWrite requests to quickly load data into DynamoDB.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Returns a promise that resolves (with no value) after a timer delay.
 *
 * @param {number} ms - Delay passed to `setTimeout`, in milliseconds.
 * @returns {Promise<void>} Promise that settles once the timer fires.
 */
function sleep(ms) {
  return new Promise((resolve) => {
    setTimeout(resolve, ms);
  });
}
/**
 * Writes one batch of items to a table with a single `batchWrite` call and
 * keeps re-submitting whatever the service reports as unprocessed, waiting
 * with exponential backoff (100ms, 200ms, 400ms, ...) between attempts.
 *
 * @param {object} documentClient - Client exposing
 *   `batchWrite(params).promise()` (e.g. an AWS SDK v2 DocumentClient).
 * @param {string} tableName - Table that receives the items.
 * @param {object[]} items - Items to put; each becomes one `PutRequest`.
 * @returns {Promise<object[]|undefined>} Every `batchWrite` response in call
 *   order (initial write first, then one per retry), or `undefined` when
 *   `items` is empty and nothing was sent.
 */
async function insertDynamoDBItems(documentClient, tableName, items) {
  if (!items.length) return;

  const responses = [];
  let response = await documentClient
    .batchWrite({
      RequestItems: {
        [tableName]: items.map((Item) => ({
          PutRequest: { Item },
        })),
      },
      ReturnConsumedCapacity: 'TOTAL',
    })
    .promise();
  responses.push(response);

  // UnprocessedItems is a map of table name -> request array, not an array,
  // so it has no `length`; look inside the per-table arrays instead.
  const hasUnprocessedItems = (unprocessed) =>
    Object.values(unprocessed ?? {}).some((requests) => requests.length > 0);

  // attempt to reinsert unprocessed items (exponential backoff)
  let backoff = 100;
  while (hasUnprocessedItems(response.UnprocessedItems)) {
    console.warn(`Batch write did not succeed, retrying in ${backoff}ms`);
    await sleep(backoff);
    // The unprocessed map already has the RequestItems shape, so it must be
    // passed under that key rather than as the whole params object.
    response = await documentClient
      .batchWrite({
        RequestItems: response.UnprocessedItems,
        ReturnConsumedCapacity: 'TOTAL',
      })
      .promise();
    responses.push(response);
    backoff *= 2;
  }

  return responses;
}
/**
 * Loads items into a table by grouping them into batches of 25 and sending
 * up to 100 batches (2500 items) in parallel per round.
 *
 * @param {object} documentClient - Client forwarded unchanged to
 *   `insertDynamoDBItems`.
 * @param {string} tableName - Table that receives the items.
 * @param {Iterable<object>|AsyncIterable<object>} items - Items to insert;
 *   consumed with `for await`, so both sync and async iterables work.
 * @returns {Promise<number>} Number of items handed to
 *   `insertDynamoDBItems` across all rounds.
 */
async function importDynamoDBTable(documentClient, tableName, items) {
  const batchSize = 25; // items per batchWrite request
  const maxBatches = 100; // insert 2500 records at a time
  let batch = [];
  let batches = [];
  let count = 0;

  // Sends every queued batch in parallel and empties the queue.
  async function insertBatches() {
    const queued = batches;
    batches = [];
    if (queued.length === 0) return [];
    const responses = await Promise.all(
      queued.map((entries) =>
        insertDynamoDBItems(documentClient, tableName, entries)
      )
    );
    count += queued.reduce((total, entries) => total + entries.length, 0);
    return responses;
  }

  // `for await` also accepts plain (synchronous) iterables such as arrays
  for await (const item of items) {
    batch.push(item);
    if (batch.length >= batchSize) {
      batches.push(batch);
      batch = [];
      if (batches.length >= maxBatches) {
        await insertBatches();
      }
    }
  }

  // insert remaining records; skip the trailing batch when it is empty so no
  // pointless call is made for inputs that are an exact multiple of batchSize
  if (batch.length > 0) {
    batches.push(batch);
  }
  await insertBatches();

  return count;
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment