如何将超过25个项目/行写入DynamoDB的表格?
问题的出现的原因是DynamoDB在单个BatchWriteItem请求中只能发送最多25个项目,但可以同时发送多个BatchWriteItem请求。为了加快速度,可以将这20,000行数据分成多个线程/进程/主机,并以并行方式将其推送到数据库。此外,还可以使用AWS Data Pipeline从S3中摄取数据,通过创建Hadoop集群自动化地将数据从S3下载并以一系列并行的BatchWriteItem请求发送到DynamoDB。
解决方法是使用并行线程来提高写入速度,同时可以使用AWS Data Pipeline来自动化数据摄取和导入到DynamoDB。此外,还可以通过提前告知DynamoDB每秒需要执行的读写操作数量来设置适当的预配置吞吐量。
如果使用AWS Data Pipeline,需要将所有数据从应用程序输出到S3,然后使用Data Pipeline将数据导入DynamoDB。相对于直接将数据写入DynamoDB,将数据先上传到S3,然后再导入DynamoDB的速度可能会更慢。但如果数据已经存储在其他地方,并且该地方可以是S3,使用Data Pipeline可以避免编写并行传输代码的工作。
解决写入超过25个项目到DynamoDB的问题的方法是使用并行线程将数据分批推送到数据库,并可以使用AWS Data Pipeline自动化数据摄取和导入。预配置吞吐量是设置DynamoDB执行读写操作的速度的重要参数。使用Data Pipeline需要将数据先上传到S3,然后再导入到DynamoDB,相对于直接写入DynamoDB可能会更慢,但可以避免编写并行传输代码的工作。
问题的出现原因是在使用DynamoDB的Table时,需要将超过25个项/行写入Table,但是在给定的代码中没有提供解决方法。要解决这个问题,可以使用lodash库将数据数组切分成大小为25的块,然后使用async库的each/every方法对这些块进行批量写入操作。下面是解决方法的代码示例:
const _ = require('lodash'); const async = require('async'); function putInHistory(data, cb) { var arrayOfArray25 = _.chunk(data, 25); async.every(arrayOfArray25, function(arrayOf25, callback) { var params = { RequestItems: { [TABLES.historyTable]: [] } }; arrayOf25.forEach(function(item){ params.RequestItems[TABLES.historyTable].push({ PutRequest: { Item: item } }); }); docClient.batchWrite(params, function(err, data) { if (err) { console.log(err); callback(err); } else { console.log(data); callback(null, true); } }); }, function(err, result) { if (err) { cb(err); } else { if (result) { cb(null, {allWritten: true}); } else { cb(null, {allWritten: false}); } } }); }
通过使用lodash库的`_.chunk`方法将数据数组分割成大小为25的块,然后使用async库的`every`方法对每个块进行操作。在每个块的操作中,创建一个包含25个项的参数对象,并循环遍历每个项,将其添加到参数对象的`RequestItems`属性中。最后,使用DynamoDB的`batchWrite`方法将参数对象中的项批量写入Table。当所有块都处理完毕后,通过回调函数将结果返回给调用者。
如何将超过25个项目/行写入DynamoDB表?
当我在寻找使用JavaScript SDK完成此操作的代码时,我找不到,所以我自己组合了一下。希望这对其他人有所帮助!
function multiWrite(table, data, cb) { var AWS = require('aws-sdk'); var db = new AWS.DynamoDB.DocumentClient({region: 'us-east-1'}); // 构建批次 var batches = []; var current_batch = []; var item_count = 0; for(var x in data) { // 将项目添加到当前批次 item_count++; current_batch.push({ PutRequest: { Item: data[x] } }); // 如果添加了25个项目,则将当前批次添加到批次数组并重置它 if(item_count%25 == 0) { batches.push(current_batch); current_batch = []; } } // 如果最后一个批次有记录且不等于25,则添加最后一个批次 if(current_batch.length > 0 && current_batch.length != 25) batches.push(current_batch); // 数据库操作的处理程序 var completed_requests = 0; var errors = false; function handler(request) { return function(err, data) { // 增加已完成请求的数量 completed_requests++; // 设置错误标志 errors = (errors) ? true : err; // 如果出现错误,则记录错误 if(err) { console.error(JSON.stringify(err, null, 2)); console.error("导致数据库错误的请求:"); console.error(JSON.stringify(request, null, 2)); } // 如果完成了所有请求,则进行回调 if(completed_requests == batches.length) { cb(errors); } } } // 发出请求 var params; for(x in batches) { // 项目放在params.RequestItems.id数组中 // 项目的格式为{PutRequest: {Item: ITEM_OBJECT}} params = '{"RequestItems": {"' + table + '": []}}'; params = JSON.parse(params); params.RequestItems[table] = batches[x]; // 执行batchWrite操作 db.batchWrite(params, handler(params)); } }
请注意,此代码没有处理每个batchWrite中的错误。例如,您可能需要处理batchWrite结果返回的UnprocessedItems。有关更多信息,请参见docs.aws.amazon.com/amazondynamodb/latest/APIReference/...。