Skip to content

Instantly share code, notes, and snippets.

@dyusupov
Last active May 29, 2019 18:12
Show Gist options
  • Save dyusupov/725d083d6f043d53abf8d61507c7bba3 to your computer and use it in GitHub Desktop.
Save dyusupov/725d083d6f043d53abf8d61507c7bba3 to your computer and use it in GitHub Desktop.
AWS S3 and EdgeFS synchronization via Lambda function
var assert = require('assert');
var qs = require('querystring');
var AWS = require('aws-sdk');
var url = require('url');
var dns = require('dns');
// ======= config section =======
var AWS_ACCESS = "aws_key";
var AWS_SECRET = "aws_secret";
var EDGEFS_PROVIDER = "aws-s3"; // Has to match EdgeFS supported Cloud Provider name
var EDGEFS_ACCESS = "efs_key"; // EdgeFS tenant user S3 access key
var EDGEFS_SECRET = "efs_secret"; // EdgeFS tenant user S3 secret key
var EDGEFS_REGION = "cltest"; // EdgeFS cluster namespace or cofigured S3 origin
var EDGEFS_ENDPOINT = "http://IPADDR:9982"; // EdgeFS S3 service endpoint
var EDGEFS_BUCKET = "bk1"; // EdgeFS configured tenant's bucket
var EDGEFS_MDONLY = "true"; // If enabled then fetch data on demand
// ==============================
function uploadFromStream(s3e, awsRegion, awsBucket, key, headData, context) {
var stream = require('stream');
var pass = new stream.PassThrough();
var meta = {
"cloud-provider": EDGEFS_PROVIDER,
"cloud-region": awsRegion,
"cloud-origin": "https://" + awsBucket + ".s3.amazonaws.com",
"cloud-key": AWS_ACCESS,
"cloud-mdonly": EDGEFS_MDONLY
};
// merge in custom metadata if any
if ("Metadata" in headData) {
Object.keys(headData["Metadata"]).forEach(function (key) {
meta[key] = headData["Metadata"][key];
});
}
s3e.upload({Bucket: EDGEFS_BUCKET, Key: key, Body: pass,
ContentType: headData["ContentType"], Metadata: meta}, {partSize: 5 * 1024 * 1024 * 1024}, function(err, data) {
if (err)
return context.fail(err);
context.succeed('OK');
});
return pass;
}
function syncCreate(region, bucket, key, context) {
var s3 = new AWS.S3({apiVersion: '2006-03-01'});
var s3e = new AWS.S3({apiVersion: '2006-03-01'});
// Read from AWS S3 bucket
s3.config.update({
accessKeyId: AWS_ACCESS,
secretAccessKey: AWS_SECRET,
region: region,
computeChecksums: false
});
// Write to EdgeFS S3 endpoint
s3e.config.update({
httpOptions: { agent: new https.Agent({ rejectUnauthorized: false })},
endpoint: new AWS.Endpoint(EDGEFS_ENDPOINT),
s3BucketEndpoint: false,
s3ForcePathStyle: true,
accessKeyId: EDGEFS_ACCESS,
secretAccessKey: EDGEFS_SECRET,
region: EDGEFS_REGION
});
var params = {Bucket: bucket, Key: key};
s3.headObject(params, function(err, headData) {
if (err)
return context.fail(err);
var readStream = s3.getObject(params).createReadStream();
readStream.pipe(uploadFromStream(s3e, region, bucket, key, headData, context));
});
}
function syncRemove(key, context) {
var s3e = new AWS.S3({apiVersion: '2006-03-01'});
// Remove via EdgeFS S3 endpoint
s3e.config.update({
httpOptions: { agent: new https.Agent({ rejectUnauthorized: false })},
s3BucketEndpoint: false,
s3ForcePathStyle: true,
accessKeyId: EDGEFS_ACCESS,
secretAccessKey: EDGEFS_SECRET,
region: EDGEFS_REGION
});
s3e.endpoint = new AWS.Endpoint(EDGEFS_ENDPOINT);
var req = s3e.deleteObject({Bucket: EDGEFS_BUCKET, Key: key});
req.on('build', function () {
req.httpRequest.headers["x-cloud-delete"] = EDGEFS_PROVIDER;
});
req.send(function(err, data) {
if (err)
return context.fail(err);
context.succeed('OK');
});
}
exports.handler = function(event, context) {
console.log("event %j", event);
// Get the object from the event and show its content type
var bucket = event.Records[0].s3.bucket.name;
// see https://forums.aws.amazon.com/thread.jspa?threadID=215813
var key = Object.keys(qs.decode(event.Records[0].s3.object.key))[0];
// sanity checks
assert(bucket !== undefined, "bucket not present in s3 event");
assert(key !== undefined, "key not present in s3 event");
var sourceIp, endpointHostname;
try {
sourceIp = event.Records[0].requestParameters.sourceIPAddress;
endpointHostname = (new url.URL(EDGEFS_ENDPOINT)).hostname;
} catch (e) {
return context.fail(e);
}
dns.lookup(endpointHostname, function(err, result) {
if (err)
return context.fail(err);
if (sourceIp == result.address) {
console.log("Skip synchronization due to same source update detected");
return context.succeed('OK');
}
console.log("syncing event", event.Records[0].eventName, bucket + "/" + key, "to", EDGEFS_ENDPOINT, EDGEFS_BUCKET);
if (event.Records[0].eventName.match(/ObjectCreated/)) {
var region = event.Records[0].awsRegion;
syncCreate(region, bucket, key, context);
} else if (event.Records[0].eventName.match(/ObjectRemoved/)) {
syncRemove(key, context);
} else {
console.log("Not handling event", event.Records[0].eventName);
return context.succeed('OK');
}
})
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment