Skip to content

Instantly share code, notes, and snippets.

@mgabeler-lee-6rs
Last active August 20, 2020 05:11
Show Gist options
  • Save mgabeler-lee-6rs/69a9b0e368d061b1935a7aa428e217b6 to your computer and use it in GitHub Desktop.
Save mgabeler-lee-6rs/69a9b0e368d061b1935a7aa428e217b6 to your computer and use it in GitHub Desktop.
Demo of applying retry policy to subscription
2020-08-20T05:08:10.567Z Creating resources for legacy
2020-08-20T05:08:10.573Z Creating topic
2020-08-20T05:08:18.934Z Getting topic metadata
2020-08-20T05:08:19.074Z Updating metadata
2020-08-20T05:08:19.608Z Creating subscription
2020-08-20T05:08:21.534Z Configuring resources for legacy
2020-08-20T05:08:21.668Z No retry policy set, adding it
2020-08-20T05:08:22.152Z Trying nack/ack for legacy
2020-08-20T05:08:22.153Z Activating subscription
2020-08-20T05:08:22.156Z Sending a message
2020-08-20T05:08:22.280Z Sent
2020-08-20T05:08:23.960Z { delay: 1807 } Got a message
2020-08-20T05:08:23.962Z ... Nacking it
2020-08-20T05:08:27.345Z { delay: 3385 } Got a message
2020-08-20T05:08:27.345Z ... Nacking it
2020-08-20T05:08:30.749Z { delay: 3404 } Got a message
2020-08-20T05:08:30.749Z ... Nacking it
2020-08-20T05:08:34.152Z { delay: 3403 } Got a message
2020-08-20T05:08:34.152Z ... Nacking it
2020-08-20T05:08:38.556Z { delay: 4404 } Got a message
2020-08-20T05:08:38.556Z ... Nacking it
2020-08-20T05:08:42.959Z { delay: 4403 } Got a message
2020-08-20T05:08:42.960Z ... Nacking it
2020-08-20T05:08:48.365Z { delay: 5406 } Got a message
2020-08-20T05:08:48.365Z ... Nacking it
2020-08-20T05:08:53.770Z { delay: 5405 } Got a message
2020-08-20T05:08:53.771Z ... Nacking it
2020-08-20T05:09:00.176Z { delay: 6406 } Got a message
2020-08-20T05:09:00.176Z ... Nacking it
2020-08-20T05:09:07.584Z { delay: 7408 } Got a message
2020-08-20T05:09:07.584Z ... Nacking it
2020-08-20T05:09:17.993Z { delay: 10409 } Got a message
2020-08-20T05:09:17.994Z ... Acking it
2020-08-20T05:09:17.995Z { numSeen: 11, nacks: -1, seen: 11, duration: 55715 } Got results
2020-08-20T05:09:17.995Z Deactivating subscription
2020-08-20T05:09:17.998Z Deleting resources for legacy
2020-08-20T05:09:19.225Z Done with legacy
2020-08-20T05:09:19.225Z Creating resources for modern
2020-08-20T05:09:19.225Z Creating topic
2020-08-20T05:09:54.754Z Getting topic metadata
2020-08-20T05:09:54.891Z Updating metadata
2020-08-20T05:09:55.404Z Creating subscription
2020-08-20T05:09:57.676Z Configuring resources for modern
2020-08-20T05:09:57.824Z Sub retry policy already set correctly
2020-08-20T05:09:57.824Z Trying nack/ack for modern
2020-08-20T05:09:57.825Z Activating subscription
2020-08-20T05:09:57.825Z Sending a message
2020-08-20T05:09:57.947Z Sent
2020-08-20T05:09:59.652Z { delay: 1827 } Got a message
2020-08-20T05:09:59.653Z ... Nacking it
2020-08-20T05:10:03.052Z { delay: 3400 } Got a message
2020-08-20T05:10:03.052Z ... Nacking it
2020-08-20T05:10:06.457Z { delay: 3405 } Got a message
2020-08-20T05:10:06.457Z ... Nacking it
2020-08-20T05:10:09.861Z { delay: 3404 } Got a message
2020-08-20T05:10:09.861Z ... Nacking it
2020-08-20T05:10:13.264Z { delay: 3403 } Got a message
2020-08-20T05:10:13.265Z ... Nacking it
2020-08-20T05:10:17.668Z { delay: 4404 } Got a message
2020-08-20T05:10:17.668Z ... Nacking it
2020-08-20T05:10:22.072Z { delay: 4404 } Got a message
2020-08-20T05:10:22.072Z ... Nacking it
2020-08-20T05:10:27.480Z { delay: 5408 } Got a message
2020-08-20T05:10:27.480Z ... Nacking it
2020-08-20T05:10:35.887Z { delay: 8407 } Got a message
2020-08-20T05:10:35.887Z ... Nacking it
2020-08-20T05:10:45.298Z { delay: 9411 } Got a message
2020-08-20T05:10:45.298Z ... Nacking it
2020-08-20T05:10:53.707Z { delay: 8409 } Got a message
2020-08-20T05:10:53.707Z ... Acking it
2020-08-20T05:10:53.707Z { numSeen: 11, nacks: -1, seen: 11, duration: 55760 } Got results
2020-08-20T05:10:53.707Z Deactivating subscription
2020-08-20T05:10:53.708Z Deleting resources for modern
2020-08-20T05:10:54.908Z Done with modern
/* eslint-disable no-await-in-loop */
import { PubSub, Topic, Subscription, Message, SubscriptionMetadata } from '@google-cloud/pubsub';
import _ from 'lodash';
type OptName = 'PUBSUB_PROJECT_ID' | 'TOPIC_NAME' | 'SUB_NAME';
type Opts = Record<OptName, string>;
function log(...args: any[]) {
// eslint-disable-next-line no-console
return console.log(new Date(), ...args);
}
function error(err: Error, ...args: any[]) {
// eslint-disable-next-line no-console
return console.error(new Date(), err, ...args);
}
function initEnvironment(): Opts {
return _.pick(
_.defaults(process.env, {
PUBSUB_PROJECT_ID: 'my-project',
TOPIC_NAME: 'my-topic',
SUB_NAME: 'my-subscription',
}),
'PUBSUB_PROJECT_ID',
'TOPIC_NAME',
'SUB_NAME',
);
}
async function initPubSub(opts: Opts) {
const pubSub = new PubSub({
// ...?
});
// cleanup after any prior run that failed
const topic = pubSub.topic(opts.TOPIC_NAME);
const sub = topic.subscription(opts.SUB_NAME);
if ((await sub.exists())[0]) {
log('Removing leftover subscription');
await sub.delete();
}
if ((await topic.exists())[0]) {
log('Removing leftover topic');
await topic.delete();
}
return pubSub;
}
const retryPolicy: SubscriptionMetadata['retryPolicy'] = {
// using strings for `seconds` props so we can compare this against
// the `getMetadata` return
minimumBackoff: {
seconds: '1',
nanos: 0,
},
maximumBackoff: {
seconds: '60',
nanos: 0,
},
};
async function createTopicAndSubscription({
opts,
pubSub,
mode,
}: {
opts: Opts;
pubSub: PubSub;
mode: 'legacy' | 'modern';
}) {
log('Creating topic');
const [topic] = await pubSub.topic(opts.TOPIC_NAME).create({
// why can't we put the labels metadata in here?
});
log('Getting topic metadata');
const [meta] = await topic.getMetadata();
// be a good citizen and make clear this is a curiosity hack
if (meta.labels?.['squad_owner'] !== 'curiosity') {
log('Updating metadata');
await topic.setMetadata({
// keys not present in this will be unmodified
labels: {
...(meta.labels || {}),
squad_owner: 'curiosity',
},
});
}
log('Creating subscription');
let sub;
switch (mode) {
case 'legacy':
// create the sub with default settings
[sub] = await topic.createSubscription(opts.SUB_NAME, {
labels: { squad_owner: 'curiosity' },
});
break;
case 'modern':
[sub] = await topic.createSubscription(opts.SUB_NAME, {
labels: { squad_owner: 'curiosity' },
retryPolicy,
});
break;
default:
throw new Error('wtf');
}
return { topic, sub };
}
async function reconfigureSubscription(sub: Subscription) {
const [meta] = await sub.getMetadata();
if (meta.retryPolicy) {
if (_.isEqual(meta.retryPolicy, retryPolicy)) {
log('Sub retry policy already set correctly');
return;
} else {
log({ retryPolicy: meta.retryPolicy }, 'Sub retry policy set, but wrong, fixing');
}
} else {
log('No retry policy set, adding it');
}
await sub.setMetadata({
retryPolicy,
});
}
async function deleteTopicAndSubscription({ topic, sub }: { topic: Topic; sub: Subscription }) {
await sub.close();
sub.removeAllListeners('message');
await sub.delete();
await topic.delete();
}
function manualPromise<T>() {
let resolve!: (value: T) => void;
let reject!: (err: any) => void;
const promise = new Promise((_res, _rej) => {
resolve = _res;
reject = _rej;
});
return { promise, resolve, reject };
}
async function tryNackAck({ topic, sub }: { topic: Topic; sub: Subscription }) {
let numSeen = 0;
let nacks = 10;
const { promise: completion, resolve: allReceived } = manualPromise<number>();
const { promise: timeout, reject: onTimeout } = manualPromise<number>();
let lastReceived = Date.now();
function handler(msg: Message) {
const now = Date.now();
log({ delay: now - lastReceived }, 'Got a message');
lastReceived = now;
++numSeen;
if (nacks-- > 0) {
log('... Nacking it');
msg.nack();
} else {
log('... Acking it');
msg.ack();
allReceived(numSeen);
}
}
log('Activating subscription');
sub.on('message', handler);
log('Sending a message');
await topic.publishJSON({});
log('Sent');
const start = Date.now();
const timer = setTimeout(onTimeout, 65_000, new Error('Timed out'));
try {
const seen = await Promise.race([completion, timeout]);
const duration = Date.now() - start;
log({ numSeen, nacks, seen, duration }, 'Got results');
} catch (err) {
const duration = Date.now() - start;
error(err, { numSeen, nacks, duration }, 'Failed');
} finally {
clearTimeout(timer);
}
log('Deactivating subscription');
sub.removeAllListeners('message');
}
async function main() {
try {
const opts = initEnvironment();
const pubSub = await initPubSub(opts);
for (const mode of ['legacy', 'modern'] as const) {
log(`Creating resources for ${mode}`);
const { topic, sub } = await createTopicAndSubscription({ opts, pubSub, mode });
log(`Configuring resources for ${mode}`);
await reconfigureSubscription(sub);
log(`Trying nack/ack for ${mode}`);
await tryNackAck({ topic, sub });
log(`Deleting resources for ${mode}`);
await deleteTopicAndSubscription({ topic, sub });
log(`Done with ${mode}`);
}
} catch (err) {
error(err);
process.exitCode = 1;
}
}
export = main;
if (require.main === module) {
main();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment