Skip to content

Instantly share code, notes, and snippets.

@dyusupov
Created June 12, 2019 17:04
Show Gist options
  • Save dyusupov/98bb74895caef56ddf90943c4bb7d6e0 to your computer and use it in GitHub Desktop.
Save dyusupov/98bb74895caef56ddf90943c4bb7d6e0 to your computer and use it in GitHub Desktop.
var assert = require('assert');
var async = require('async');
var debug = require('debug')('edgex-stream-test');
var http = require('http');
var Agent = require('agentkeepalive');
var keepaliveAgent = new Agent({
maxSockets: 1,
maxFreeSockets: 10,
timeout: 60000,
keepAliveTimeout: 30000 // free socket keepalive for 30 seconds
});
/*jslint bitwise: true */
var OBJ = 'obj2';
var KVOBJ = 'kvobj';
var CHUNKSIZE = 4096;
var HOST = 'localhost';
var PORT = 3000;
var PATH = '/svcbk1';
var FULLPATH_SS = PATH + '/' + OBJ + '?comp=streamsession';
var FULLPATH_APPEND = PATH + '/' + OBJ + '?comp=appendblock';
var FULLPATH_RANDWR = PATH + '/' + OBJ + '?comp=randwrblock';
var FULLPATH_KV = PATH + '/' + KVOBJ + '?comp=kv';
var SS_CONT = 0x00;
var SS_FIN = 0x01;
var SS_APPEND = 0x02;
var SS_RANDWR = 0x04;
var SS_KV = 0x08;
var SS_STAT = 0x10;
var CCOW_O_REPLACE = 0x01;
var CCOW_O_CREATE = 0x02;
process.on('uncaughtException', function (err) {
console.log('Got Exception:', err.toString());
});
var randomString = function (len, bits)
{
bits = bits || 36;
var outStr = "", newStr;
while (outStr.length < len) {
newStr = Math.random().toString(bits).slice(2);
outStr += newStr.slice(0, Math.min(newStr.length, (len - outStr.length)));
}
return outStr.toUpperCase();
};
var resCheck = function (res, cb)
{
cb((res.statusCode === 200 || res.statusCode === 206) ? null : res.statusCode);
};
var iovarSrc = [];
for (var i = 0; i < 256; i++) {
iovarSrc.push(new Buffer(randomString(CHUNKSIZE)));
}
var g_opts = {
host: HOST,
port: PORT,
path: FULLPATH_SS,
method: 'POST',
agent: keepaliveAgent,
headers: {
'Content-Type': 'application/octet-stream'
}
};
function g_opts_copy()
{
var opts = {};
Object.keys(g_opts).forEach(function (k) {
if (k === 'headers') {
opts[k] = {};
Object.keys(g_opts.headers).forEach(function (h) {
opts.headers[h] = g_opts.headers[h];
});
} else
opts[k] = g_opts[k];
});
return opts;
}
function write(arr, off, mode, sid, cb)
{
var i, totlen = 0;
var opts = g_opts_copy();
opts.headers['x-ccow-offset'] = off;
if (mode & SS_APPEND) {
opts.path = FULLPATH_APPEND;
opts.method = 'PUT';
} else if (mode & SS_RANDWR) {
opts.path = FULLPATH_RANDWR;
opts.method = 'PUT';
} else {
opts.path = FULLPATH_SS + (mode & SS_FIN ? '&finalize' : '');
opts.method = 'POST';
}
if (sid)
opts.headers['x-session-id'] = sid;
for (i = 0; i < arr.length; i++)
totlen += arr[i].length;
opts.headers['x-ccow-length'] = totlen;
opts.headers['Content-Length'] = totlen;
if (totlen === 0) {
opts.headers['x-ccow-object-oflags'] |= CCOW_O_CREATE | CCOW_O_REPLACE;
opts.headers['x-ccow-chunkmap-btree-order'] = 4;
opts.headers['x-ccow-chunkmap-chunk-size'] = CHUNKSIZE;
}
// console.log("opts.h", opts.headers);
var req = http.request(opts, function (res) {
var h = res.headers;
res.on('data', function (d) {
});
res.on('end', function () {
resCheck(res, function (err) {
if (err)
return cb(err);
cb(null, h);
});
});
});
var off_i = off;
for (i = 0; i < arr.length; i++) {
debug("sent", arr[i].length, "offset", off_i, "0:8c", arr[i].slice(0, 8).toString('base64'));
off_i += arr[i].length;
req.write(arr[i]);
}
req.end();
}
function read(off, len, mode, sid, cb)
{
var opts = g_opts_copy();
opts.method = (mode & SS_STAT) ? 'HEAD' : 'GET';
opts.headers['x-ccow-offset'] = off;
opts.headers['x-ccow-length'] = len;
opts.path = FULLPATH_SS + (mode & SS_FIN ? '&finalize' : '');
if (sid)
opts.headers['x-session-id'] = sid;
opts.headers['Content-Length'] = 0;
// console.log('opts.h', opts.headers);
var req = http.request(opts, function (res) {
var h = res.headers;
var content = [];
res.on('data', function (d) {
debug("received", d.length, "0:8c", d.slice(0, 8).toString('base64'));
content.push(d);
});
res.on('end', function () {
// console.log('h', h);
var buf = Buffer.concat(content);
debug("received total", buf.length);
resCheck(res, function (err) {
assert.ifError(err);
cb(null, buf, h);
});
});
res.on('error', function (err) {
console.log('read error', err);
cb(err);
});
});
req.end();
}
function kvlist(path, key, count, values, fmt, cb)
{
var opts = g_opts_copy();
opts.method = 'GET';
opts.path = path ? path : FULLPATH_KV;
opts.path += '&key=' + (key ? key : "");
if (count)
opts.path += '&maxresults=' + count;
if (values)
opts.path += '&values=' + values;
opts.headers['Content-Type'] = fmt;
opts.headers['Content-Length'] = 0;
var req = http.request(opts, function (res) {
var h = res.headers;
var content = [];
res.on('data', function (d) {
debug("received", d.length, "0:8c", d.slice(0, 8).toString('base64'));
content.push(d);
});
res.on('end', function () {
// DEBUG: console.log('h', h);
var buf = Buffer.concat(content);
debug("received total", buf.length);
resCheck(res, function (err) {
if (err == 404)
return cb(err);
assert.ifError(err);
cb(null, buf, h);
});
});
res.on('error', function (err) {
cb(err);
});
});
req.end();
}
function kvop(path, method, key, values, fmt, mode, sid, meta, cb)
{
var opts = g_opts_copy();
opts.method = method;
opts.path = path ? path : FULLPATH_KV;
opts.path += (mode & SS_FIN ? '&finalize' : '');
if (key)
opts.path += '&key=' + (key ? key : "");
opts.headers['Content-Type'] = fmt;
if (sid)
opts.headers['x-session-id'] = sid;
opts.headers['Content-Length'] = values.length;
if (method === 'POST' && values.length === 0) {
opts.headers['x-ccow-object-oflags'] |= CCOW_O_CREATE | CCOW_O_REPLACE;
opts.headers['x-ccow-chunkmap-btree-order'] = 4;
opts.headers['x-ccow-chunkmap-chunk-size'] = CHUNKSIZE;
if (meta) {
Object.keys(meta).forEach(function (key) {
opts.headers['x-amz-meta-' + key] = meta[key];
});
}
}
// console.log('opts', opts);
var req = http.request(opts, function (res) {
var h = res.headers;
debug("headers", h);
res.on('data', function (d) {
});
res.on('end', function () {
resCheck(res, function (err) {
if (err)
return cb(err);
cb(null, h);
});
});
});
if (values)
req.write(values);
req.end();
}
function bufsEqual(bufA, bufB) {
var len = bufA.length;
if (len !== bufB.length) {
console.log("buf len", len, "!==", bufB.length);
return false;
}
for (var i = 0; i < len; i++) {
if (bufA.readUInt8(i) !== bufB.readUInt8(i)) {
console.log("byte", i, "is different", bufA.readUInt8(i), bufB.readUInt8(i));
console.log('bufA', bufA);
console.log('bufB', bufB);
return false;
}
}
return true;
}
function writeVerifyStream(arr, off, mode, sid, cb)
{
var i, buf = Buffer.concat(arr);
write(arr, off, (sid && mode !== SS_FIN ? SS_CONT : mode), sid, function (err, wh) {
assert.ifError(err);
if (sid)
assert.equal(sid, wh['x-session-id']);
else if (mode === SS_CONT)
sid = wh['x-session-id'];
read(off, buf.length, mode, sid, function (err, content, rh) {
assert.ifError(err);
if (mode === SS_CONT && sid)
assert.equal(sid, rh['x-session-id']);
assert.equal(buf.length, content.length);
assert.ok(bufsEqual(buf, content));
// DEBUG: console.log(content.toString());
cb(null, sid);
});
});
}
function writeVerify(arr, off, cb)
{
writeVerifyStream(arr, off, SS_FIN, null, function (err) {
cb(err);
});
}
describe('SS_FIN', function () {
it('can create new empty object', function (done) {
write([], 0, SS_FIN, null, done);
});
it('can overwrite with 3 chunks at offset', function (done) {
writeVerify(iovarSrc.slice(0, 3), 0, done);
});
it('can append with 4 chars at the end', function (done) {
writeVerify([new Buffer('abcd')], 3 * CHUNKSIZE, done);
});
it('can read second chunk of object', function (done) {
read(CHUNKSIZE, CHUNKSIZE, SS_FIN, null, function (err, content, rh) {
assert.ifError(err);
assert.equal(content.length, CHUNKSIZE);
assert.ok(bufsEqual(content, iovarSrc[1]));
done();
});
});
it('can overwrite at unalined offset same chunk', function (done) {
writeVerify(iovarSrc.slice(1, 2), 2048, done);
});
it('can overwrite at unalined offset last chunk', function (done) {
writeVerify(iovarSrc.slice(0, 1), 10240, done);
});
it('can overwrite at unalined offset affecting 3 chunks', function (done) {
writeVerify(iovarSrc.slice(0, 2), 2048, done);
});
it('can write to a new offset', function (done) {
writeVerify(iovarSrc.slice(3, 8), 16384, done);
});
it('can overwrite with more then 8 chunks', function (done) {
writeVerify(iovarSrc.slice(0, 16), 0, done);
});
it('can overwrite with more then 8 chunks in the middle', function (done) {
writeVerify(iovarSrc.slice(2, 14), 8192, done);
});
it('can overwrite with more then 8 chunks in the middle and unalinged start', function (done) {
writeVerify(iovarSrc.slice(2, 14), 10240, done);
});
it('can overwrite with more then 8 chunks in the middle and unalinged end', function (done) {
var buf = iovarSrc.slice(2, 14);
buf[11] = buf[11].slice(0, 2048);
writeVerify(buf, 8192, done);
});
it('can normal overwrite 128 chunks', function (done) {
var bufs = iovarSrc.slice(0, 128);
async.forEachSeries(bufs, function (buf, next) {
writeVerifyStream([buf], CHUNKSIZE, SS_FIN, null, function (err, retSID) {
assert.ifError(err);
next();
});
}, done);
});
it('can normal read 128 chunks', function (done) {
var bufs = iovarSrc.slice(0, 128);
var i = 0;
async.forEachSeries(bufs, function (buf, next) {
read(i * CHUNKSIZE, CHUNKSIZE, SS_FIN, null, function (err, content, rh) {
assert.ifError(err);
assert.equal(content.length, CHUNKSIZE);
assert.ok(bufsEqual(content, iovarSrc[i]));
i++;
done();
});
});
});
});
describe('SS_CONT', function () {
var sid = null;
it('can create new empty object', function (done) {
write([], 0, SS_FIN, null, done);
});
it('initiate empty object and continue', function (done) {
write([], 0, SS_CONT, sid, function (err, rh) {
assert.ifError(err);
sid = rh['x-session-id'];
console.log("sid: " + sid);
done();
});
});
it('write in the middle of the first chunk and continue', function (done) {
var buf = iovarSrc.slice(0, 1)[0].slice(2144, 2244);
write([buf], 2144, SS_CONT, sid, function (err, rh) {
assert.ifError(err);
read(2144, 100, SS_CONT, sid, function (err, content, rh) {
assert.ifError(err);
assert.ok(bufsEqual(content, buf));
done();
});
});
});
it('write close to the beginning of the first chunk and continue', function (done) {
var buf = iovarSrc.slice(0, 1)[0].slice(100, 200);
write([buf], 100, SS_CONT, sid, function (err, rh) {
assert.ifError(err);
read(100, 100, SS_CONT, sid, function (err, content, rh) {
assert.ifError(err);
assert.ok(bufsEqual(content, buf));
done();
});
});
});
it('verify logical size and continue', function (done) {
read(0, 0, SS_STAT, sid, function (err, content, rh) {
assert.ifError(err);
assert.equal(rh['x-ccow-logical-size'], 2244);
done();
});
});
it('can stat 50 times while not finalized', function (done) {
var range = [], i = 50;
while (i--) range[i] = i;
async.forEachSeries(range, function (i, next) {
read(0, 0, SS_STAT, sid, function (err, content, rh) {
assert.ifError(err);
next();
});
}, done);
});
it('can write few buffers to an open sid and continue', function (done) {
var bufs = iovarSrc.slice(0, 4);
var i = 0;
async.forEachSeries(bufs, function (buf, next) {
write([buf], i * CHUNKSIZE, SS_CONT, sid, function (err, rh) {
assert.ifError(err);
assert.equal(sid, rh['x-session-id']);
i++;
next();
});
}, function () {
done();
});
});
it('can read 50 times while not finalized', function (done) {
var range = [], i = 50;
while (i--) range[i] = i;
async.forEachSeries(range, function (i, next) {
read(0, 4 * CHUNKSIZE, SS_CONT, sid, function (err, content, rh) {
assert.ifError(err);
assert.equal(content.length, 4 * CHUNKSIZE);
for (var j = 0; j < 4; j++)
assert.ok(bufsEqual(content.slice(j * CHUNKSIZE, (j + 1) * CHUNKSIZE), iovarSrc[j]));
next();
});
}, done);
});
it('can stream overwrite 64 chunks', function (done) {
var bufs = iovarSrc.slice(0, 64);
var i = 0;
async.forEachSeries(bufs, function (buf, next) {
writeVerifyStream([buf], i * CHUNKSIZE, i === 63 ? SS_FIN : SS_CONT, sid, function (err, retSID) {
assert.ifError(err);
sid = retSID;
i++;
next();
});
}, function (err) {
// node fires 'end' event too fast in finalize then read case
setTimeout(function () {
done(err);
}, 100);
});
});
it('can open new stream and read 8 chunks then re-read 3 in the middle', function (done) {
var buf = Buffer.concat(iovarSrc.slice(0, 8));
sid = null;
read(0, 8 * CHUNKSIZE, SS_CONT, sid, function (err, content, rh) {
assert.ifError(err);
assert.ok(bufsEqual(content, buf));
buf = Buffer.concat(iovarSrc.slice(3, 6));
sid = rh['x-session-id'];
console.log("sid: " + sid);
read(3 * CHUNKSIZE, 3 * CHUNKSIZE, SS_CONT, sid, function (err, content, rh) {
assert.ifError(err);
assert.ok(bufsEqual(content, buf));
buf = Buffer.concat(iovarSrc.slice(6, 9));
read(6 * CHUNKSIZE, 3 * CHUNKSIZE, SS_CONT, sid, function (err, content, rh) {
assert.ifError(err);
assert.ok(bufsEqual(content, buf));
done();
});
});
});
});
it('can stream read 64 chunks one by one', function (done) {
var bufs = [];
for (var j = 0; j < 64; j++) bufs.push(j);
var i = 0;
sid = null;
async.forEachSeries(bufs, function (buf, next) {
read(i * CHUNKSIZE, CHUNKSIZE, i === 63 ? SS_FIN : SS_CONT, sid, function (err, content, rh) {
assert.ifError(err);
sid = rh['x-session-id'];
if (i == 0)
console.log("sid: " + sid);
assert.equal(content.length, CHUNKSIZE);
assert.ok(bufsEqual(content, iovarSrc[i]));
i++;
next();
});
}, done);
});
it('can stream read 64 chunks all in one op', function (done) {
read(0, 64 * CHUNKSIZE, SS_FIN, null, function (err, content, rh) {
assert.ifError(err);
assert.equal(content.length, 64 * CHUNKSIZE);
for (var j = 0; j < 64; j++)
assert.ok(bufsEqual(content.slice(j * CHUNKSIZE, (j + 1) * CHUNKSIZE), iovarSrc[j]));
done();
});
});
});
describe('SS_APPEND', function () {
it('can create new empty object', function (done) {
write([], 0, SS_FIN, null, done);
});
it('can append 1 chunk to empty object', function (done) {
var bufs = iovarSrc.slice(0, 1);
write(bufs, 0, SS_APPEND, null, function (err) {
assert.ifError(err);
read(0, CHUNKSIZE, SS_FIN, null, function (err, content) {
assert.ifError(err);
assert.equal(content.length, CHUNKSIZE);
assert.ok(bufsEqual(content, iovarSrc[0]));
done();
});
});
});
it('can overwrite with 3 chunks at offset 0', function (done) {
writeVerify(iovarSrc.slice(0, 3), 0, done);
});
it('can overwrite with 3 chunks again at offset 0', function (done) {
writeVerify(iovarSrc.slice(0, 3), 0, done);
});
it('can append 1 chunk', function (done) {
var bufs = iovarSrc.slice(3, 4);
write(bufs, 0, SS_APPEND, null, function (err) {
assert.ifError(err);
read(3 * CHUNKSIZE, CHUNKSIZE, SS_FIN, null, function (err, content) {
assert.ifError(err);
assert.equal(content.length, CHUNKSIZE);
assert.ok(bufsEqual(content, iovarSrc[3]));
done();
});
});
});
it('can append 4 chunks', function (done) {
var bufs = iovarSrc.slice(4, 8);
var i = 4;
async.forEachSeries(bufs, function (buf, next) {
write([buf], 0, SS_APPEND, null, function (err) {
assert.ifError(err);
read(i * CHUNKSIZE, CHUNKSIZE, SS_FIN, null, function (err, content) {
assert.ifError(err);
assert.equal(content.length, CHUNKSIZE);
assert.ok(bufsEqual(content, iovarSrc[i]));
next();
});
});
}, done);
});
it('can append unaligned 1k buffer', function (done) {
var buf = iovarSrc[8].slice(0, 1024);
write([buf], 0, SS_APPEND, null, function (err) {
assert.ifError(err);
read(6 * CHUNKSIZE, 2 * CHUNKSIZE + 1024, SS_FIN, null, function (err, content) {
assert.ifError(err);
assert.equal(content.length, 2 * CHUNKSIZE + 1024);
assert.ok(bufsEqual(content.slice(2 * CHUNKSIZE, 2 * CHUNKSIZE + 1024), buf));
done();
});
});
});
it('can append one more unaligned 1k buffer', function (done) {
var buf = iovarSrc[8].slice(1024, 2048);
write([buf], 0, SS_APPEND, null, function (err) {
assert.ifError(err);
read(6 * CHUNKSIZE, 2 * CHUNKSIZE + 2048, SS_FIN, null, function (err, content) {
assert.ifError(err);
assert.equal(content.length, 2 * CHUNKSIZE + 2048);
assert.ok(bufsEqual(content.slice(2 * CHUNKSIZE + 1024, 2 * CHUNKSIZE + 2048), buf));
done();
});
});
});
it('can append one more unaligned chunk crossing two regions', function (done) {
var buf1 = iovarSrc[8].slice(2048, 4096);
var buf2 = iovarSrc[9].slice(0, 4096);
var buf3 = iovarSrc[10].slice(0, 2048);
var buf = Buffer.concat([buf1, buf2, buf3]);
write([buf], 0, SS_APPEND, null, function (err) {
assert.ifError(err);
read(9 * CHUNKSIZE - 2048, 2 * CHUNKSIZE, SS_FIN, null, function (err, content) {
assert.ifError(err);
assert.equal(content.length, 2 * CHUNKSIZE);
assert.ok(bufsEqual(content, buf));
done();
});
});
});
});
describe('SS_RANDWR', function () {
it('can create new empty object', function (done) {
write([], 0, SS_FIN, null, done);
});
it('can write to middle of empty object', function (done) {
writeVerify(iovarSrc.slice(1, 2), 4096, done);
});
it('can write to start of empty object', function (done) {
writeVerify(iovarSrc.slice(0, 1), 0, done);
});
it('can overwrite with 3 chunks at offset 0', function (done) {
writeVerify(iovarSrc.slice(0, 3), 0, done);
});
it('can overwrite with more then 8 chunks in the middle and unalinged start', function (done) {
writeVerify(iovarSrc.slice(2, 14), 10240, done);
});
it('can overwrite with more then 8 chunks in the middle and unalinged end', function (done) {
var buf = iovarSrc.slice(2, 14);
buf[11] = buf[11].slice(0, 2048);
writeVerify(buf, 8192, done);
});
});
describe('SS_KV', function () {
var sid = null;
it('can create new empty KV object', function (done) {
kvop("", "POST", "", "", "application/json", SS_FIN | SS_KV, null,
{ "key1": "value1", "key2": "value2" }, done);
});
it('can read headers', function (done) {
kvop("", "HEAD", "", "", "application/octet-stream", SS_FIN, null, null, function (err, h) {
assert.ifError(err);
assert.equal(h['x-amz-meta-key1'], 'value1');
assert.equal(h['x-amz-meta-key2'], 'value2');
done();
});
});
it('can list empty', function (done) {
kvlist("", "", 100, 0, "text/csv", function (err) {
assert.ifError(err);
done();
});
});
it('can insert one as json', function (done) {
var v = { k1: "v1" };
kvop("", "POST", "", JSON.stringify(v), "application/json", SS_CONT, sid, null, function (err, h) {
assert.ifError(err);
sid = h['x-session-id'];
done();
});
});
it('can insert two as json', function (done) {
var v = { k2: "v2", k3: "v3" };
kvop("", "POST", "", JSON.stringify(v), "application/json", SS_CONT, sid, null, done);
});
it('can insert one as binary and finalize', function (done) {
kvop("", "POST", "k4", "my-data-xxxx", "application/octet-stream", SS_FIN, sid, null, done);
});
it('can get k1', function (done) {
kvlist("", "k1", 1, 1, "text/csv", function (err, buf, hdrs) {
assert.ifError(err);
console.log(buf.toString());
done();
});
});
it('can get k2 and k3', function (done) {
kvlist("", "k2", 2, 1, "text/csv", function (err, buf, hdrs) {
assert.ifError(err);
console.log(buf.toString());
done();
});
});
it('can get k4', function (done) {
kvlist("", "k4", 1, 1, "application/octet-stream", function (err, buf, hdrs) {
assert.ifError(err);
assert.equal(buf.toString(), "my-data-xxxx");
done();
});
});
it('can list four', function (done) {
kvlist("", "", 100, 1, "text/csv", function (err, buf, hdrs) {
console.log(buf.toString());
done(err);
});
});
it('can delete two as json', function (done) {
var v = { k1: "", k2: "" };
kvop("", "DELETE", "", JSON.stringify(v), "application/json", SS_FIN, null, null, done);
});
it('can list two', function (done) {
kvlist("", "", 100, 1, "application/json", function (err, buf, hdrs) {
var json = JSON.parse(buf.toString());
console.log("JSON: " , json);
assert.equal(Object.keys(json).length, 2);
done(err);
});
});
it('can delete one', function (done) {
kvop("", "DELETE", "k4", "", "application/octet-stream", SS_FIN, null, null, done);
});
it('can list bucket as text/csv', function (done) {
kvlist("", "", 100, 1, "text/csv", function (err, buf, hdrs) {
console.log(buf.toString());
done(err);
});
});
it('can list one as JSON', function (done) {
kvlist("", "", 100, 1, "application/json", function (err, buf, hdrs) {
var json = JSON.parse(buf.toString());
console.log("JSON: " , json);
assert.equal(Object.keys(json).length, 1);
done(err);
});
});
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment