Created
March 14, 2022 13:10
-
-
Save forkfork/56a0e0db2ce20b10ba4016e1034cb228 to your computer and use it in GitHub Desktop.
simplequeue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const Redis = require("ioredis");
const { v4: uuidv4 } = require('uuid');

// Shared Redis client, created with the library's default connection settings.
const redis = new Redis();

// NOTE(review): LIFETIME_MS is not referenced anywhere in the visible code; the
// shutdown timer and the lock TTL both use a literal 10000 — confirm intent.
const LIFETIME_MS = 1000;

// Loop-control flag: the main loop keeps polling while this is 'running'.
let status = 'running';

// Unique id for this process; passed to `atomicread` as the lock-owner value.
const processId = uuidv4();
/**
 * Returns a promise that resolves (with no value) once the given delay elapses.
 *
 * @param {number} milliseconds - Delay handed to `setTimeout`.
 * @returns {Promise<void>} Settles when the timer fires; never rejects.
 */
const sleep = (milliseconds) =>
  new Promise((resolve) => setTimeout(resolve, milliseconds));
/**
 * Requests shutdown by flipping the module-level `status` flag away from
 * 'running'. It does not interrupt work in progress; whoever polls `status`
 * decides when to stop.
 */
function terminate() {
  status = 'terminating';
}
// Request shutdown 10 seconds after startup. `terminate` takes no parameters,
// so no extra arguments are forwarded to it.
setTimeout(terminate, 10000);
// Registers `redis.atomicread(lockKey, streamKey, ownerId)` as a Lua script so
// that lock handling and the stream read happen in a single atomic step.
//
//   KEYS[1]  lock key holding the id of the process that owns the topic
//   KEYS[2]  stream key to read from (consumer group 'grp', consumer 'consumer')
//   ARGV[1]  id of the calling process
//
// Behaviour of the script:
//   - If nobody holds the lock, the caller becomes the owner.
//   - If the caller is (or just became) the owner, the lock TTL is reset to
//     10000 ms. Entries idle for at least 1000 ms are then re-claimed via
//     XAUTOCLAIM and returned if there are any; otherwise up to 100 new
//     entries are read with XREADGROUP. An empty table is returned when there
//     is nothing to deliver.
//   - If another process owns the lock, the script returns nil (the caller
//     checks for this to back off).
redis.defineCommand('atomicread', {
  numberOfKeys: 2,
  lua: `
    local lockowner = redis.call('GET', KEYS[1])
    if not lockowner then
      redis.call('SET', KEYS[1], ARGV[1])
    end
    if not lockowner or lockowner == ARGV[1] then
      redis.call('PEXPIRE', KEYS[1], 10000)
      local pending = redis.call('XAUTOCLAIM', KEYS[2], 'grp', 'consumer', 1000, '0-0')
      if #pending[2] > 0 then
        return pending[2]
      end
      local result = redis.call('XREADGROUP', 'GROUP', 'grp', 'consumer', 'COUNT', 100, 'STREAMS', KEYS[2], '>')
      if result then
        return result[1][2]
      end
      return {}
    end
    return nil
  `,
});
/**
 * Polls every topic in the Redis set `streams` while `status` is 'running'.
 *
 * For each topic it calls the `atomicread` script; a null reply means another
 * process holds that topic, so the topic is skipped for this pass. Otherwise
 * each returned entry is fanned out to every member of the `connections` set
 * (currently only logged) and then acknowledged on the stream in one XACK.
 *
 * The Redis connection is closed when the loop ends, whether it ends because
 * `status` changed or because a Redis command rejected.
 *
 * @returns {Promise<void>} Resolves after the loop stops; rejects with the
 *   first Redis error encountered.
 */
async function mainloop() {
  try {
    while (status === 'running') {
      // The two set reads are independent, so issue them together.
      const [topics, connections] = await Promise.all([
        redis.smembers('streams'),
        redis.smembers('connections'),
      ]);

      for (const topic of topics) {
        // The {topic} hash tag is shared by the lock key and the stream key.
        const streamKey = `stream:{${topic}}`;
        const entries = await redis.atomicread(`consumer:{${topic}}`, streamKey, processId);
        if (entries == null) {
          console.log("another process is active on topic", topic, ", backing off");
          continue;
        }

        const acks = [];
        for (const [entryId, fields] of entries) {
          // FILTER HERE
          for (const connectionId of connections) {
            const params = {
              ConnectionId: connectionId,
              Data: JSON.stringify(fields),
            };
            // TODO: deliver for real, e.g.
            //   await apigwManagementApi.postToConnection(params).promise();
            console.log("writing data", params);
          }
          acks.push(entryId);
        }

        if (acks.length > 0) {
          console.log("acking", acks.length, "messages on topic", topic);
          await redis.xack(streamKey, 'grp', ...acks);
        }
      }

      await sleep(50);
    }
  } finally {
    // Always release the connection so a failed command cannot leave the
    // socket open and keep the process alive.
    redis.disconnect();
  }
}

// Start the consumer; report a failure explicitly instead of leaving the
// returned promise unhandled.
mainloop().catch((err) => {
  console.error("mainloop failed:", err);
  process.exitCode = 1;
});
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment