Skip to content

Instantly share code, notes, and snippets.

@txbm
Last active February 1, 2018 19:27
Show Gist options
  • Save txbm/288d71cdcaea696fb27ef42fc405420c to your computer and use it in GitHub Desktop.
Save txbm/288d71cdcaea696fb27ef42fc405420c to your computer and use it in GitHub Desktop.
noderdkafka state test harness
'use strict';
const kafka = require('node-rdkafka');
// Error codes that runConsumer's rebalance callback compares `err.code` against.
const { ERR__ASSIGN_PARTITIONS, ERR__REVOKE_PARTITIONS } = kafka.CODES.ERRORS;

// Identifiers shared by every client created by this harness.
const clientId = 'pelias-test-client';
const groupId = 'pelias-test-cg';
const testTopic = 'pelias-test-topic';

// Selects an entry of `config`; an unset or empty ENV falls back to 'local'.
const ENV = process.env.ENV ? process.env.ENV : 'local';

// How long a consumer is left running before shutdown begins.
const DURATION_MS = 5000;

// Producer settings; not referenced by the consumer code visible in this file.
const producerOpts = {
  'socket.keepalive.enable': true,
  'dr_cb': true
};

// Consumer settings merged on top of the per-environment connection settings.
const consumerOpts = {
  'group.id': groupId,
  'enable.auto.commit': false
};

// Per-environment connection settings, keyed by the value of ENV.
const config = {
  local: {
    'metadata.broker.list': '0.0.0.0:9092',
    'client.id': clientId
  },
  nonprod: {
    'metadata.broker.list': 'internal-core-kafka-bootstrap-stg-787583943.us-east-1.elb.amazonaws.com',
    'client.id': clientId
  }
};

// Pretty-prints a value as JSON with two-space indentation for log output.
const j = (value) => JSON.stringify(value, null, 2);

// Report what the loaded node-rdkafka module exposes before running anything.
console.log(kafka.features);
console.log(kafka.librdkafkaVersion);
/**
 * Runs a single KafkaConsumer session against `testTopic`, logging every
 * callback and event it observes, and starts shutting the consumer down
 * DURATION_MS after it reports ready.
 *
 * @param {boolean} [sub=false] When true, the consumer calls subscribe() on
 *   the test topic; otherwise it assigns partitions 0-2 of the topic
 *   explicitly, each at offset 4.
 * @param {boolean} [rb_cb=false] When true, a custom `rebalance_cb` is added
 *   to the consumer configuration that assigns/unassigns partitions itself.
 * @returns {Promise<void>} Resolves when the disconnect callback or the
 *   'disconnected' event fires, or when the rebalance callback hits an error
 *   or an unrecognised code. The promise is never explicitly rejected.
 */
function runConsumer(sub = false, rb_cb = false) {
  console.log(`subscribe: ${sub}`);
  console.log(`rebalance_cb: ${rb_cb}`);
  return new Promise((resolve) => {
    const cfg = Object.assign({}, config[ENV], consumerOpts);
    // Declared up front so the callbacks below can close over it; it is
    // assigned once the configuration object is complete.
    let consumer;
    cfg.event_cb = (evt) => {
      console.log(`librdkafka event: ${evt}`);
    };
    if (rb_cb) {
      cfg.rebalance_cb = (err, ass) => {
        console.log(`rb_cb: ${j(err)}`);
        if (err.code === ERR__ASSIGN_PARTITIONS) {
          try {
            consumer.assign(ass);
          } catch (e) {
            console.log(`rb_cb ass err: ${e.message}`);
            resolve();
          }
          console.log(`rb_cb assign called`);
        } else if (err.code === ERR__REVOKE_PARTITIONS) {
          try {
            consumer.unassign();
          } catch (e) {
            console.log(`rb_cb unass err: ${e.message}`);
            // unassign() failed: try assign() with no arguments as a fallback
            // before giving up on this session.
            try {
              consumer.assign();
            } catch (assignErr) {
              console.log(`rb_cb assnull err: ${assignErr.message}`);
            }
            resolve();
          }
          console.log(`rb_cb unassign called`);
        } else {
          console.error(`rb_cb error: ${j(err)}`);
          resolve();
        }
      };
    }
    consumer = new kafka.KafkaConsumer(cfg);
    consumer.on('ready', () => {
      console.log(`rdy_cb: ready`);
      if (sub) {
        consumer.subscribe([testTopic]);
        console.log(`rdy_cb: sub called`);
      } else {
        consumer.assign([
          { topic: testTopic, partition: 0, offset: 4 },
          { topic: testTopic, partition: 1, offset: 4 },
          { topic: testTopic, partition: 2, offset: 4 }
        ]);
        console.log(`rdy_cb: ass called`);
      }
      consumer.consume();
      console.log(`rdy_cb: consume called`);
      // Let the consumer run for a fixed window, then undo the
      // subscription/assignment and disconnect.
      setTimeout(() => {
        console.log('shutdown: starting');
        if (sub) {
          consumer.unsubscribe();
          console.log('shutdown: unsub called');
        } else {
          consumer.unassign();
          console.log('shutdown: unass called');
        }
        consumer.disconnect((err) => {
          console.error(`disconn_cb: ${j(err)}`);
          resolve();
        });
        console.log('shutdown: disconnect called');
      }, DURATION_MS);
    });
    consumer.on('data', (data) => {
      console.log(`message: ${data.partition}-${data.offset}`);
      // Guard against a message without a payload so logging cannot throw.
      const payload = data.value == null ? data.value : data.value.toString();
      console.log(`data: ${payload}`);
    });
    consumer.on('rebalance', (data) => {
      console.log(`rb_evt: ${j(data)}`);
    });
    consumer.on('event', (evt) => {
      // Log the handler's own argument; the previous code referenced an
      // undefined `data` and threw a ReferenceError on the first event.
      console.log(`librd_evt: ${j(evt)}`);
    });
    consumer.on('disconnected', (err) => {
      console.log(`dc_evt: ${j(err)}`);
      resolve();
    });
    consumer.on('unsubscribed', (evt) => {
      console.log(`unsub_evt: ${j(evt)}`);
    });
    consumer.connect();
  });
}
/**
 * Entry point of the harness: runs the selected consumer scenario and waits
 * for it to finish. The commented-out calls are the alternative scenarios
 * (subscribe with / without a custom rebalance callback); enable one at a time.
 *
 * @returns {Promise<void>} Settles when the awaited scenario settles.
 */
async function run() {
  // await runConsumer(true, true);
  // await runConsumer(true, false);
  await runConsumer(false, false);
}

// Do not leave the promise floating: report a failed scenario explicitly and
// mark the process as failed instead of relying on an unhandled rejection.
run().catch((err) => {
  console.error(`run failed: ${err && err.stack ? err.stack : err}`);
  process.exitCode = 1;
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment