Last active
August 11, 2022 21:16
-
-
Save DashBarkHuss/10e45ca377292c5681ff227fbd8f628c to your computer and use it in GitHub Desktop.
bullmq worker and react example converted from heroku example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Run your React frontend on port 3000.
import React, { useState, useEffect, useRef } from "react"; | |
/**
 * React hook that calls `callback` every `delay` milliseconds.
 *
 * The most recent `callback` is kept in a ref, so passing a new function on
 * each render does not restart the timer; only a change of `delay` does.
 *
 * @param {Function} callback - Invoked on every tick.
 * @param {?number} delay - Interval in milliseconds. `null` or `undefined`
 *   pauses the interval.
 */
function useInterval(callback, delay) {
  const savedCallback = useRef(callback);

  // Remember the latest callback.
  useEffect(() => {
    savedCallback.current = callback;
  }, [callback]);

  // Set up the interval.
  useEffect(() => {
    // `== null` also matches `undefined`, which setInterval would otherwise
    // treat as a 0 ms delay and fire continuously.
    if (delay == null) {
      return undefined;
    }
    const id = setInterval(() => savedCallback.current(), delay);
    return () => clearInterval(id);
  }, [delay]);
}
export default function App() { | |
// Store for all of the jobs in progress | |
const [jobs, setJobs] = useState([]); | |
const [poll, setPoll] = useState(null); | |
useInterval(updateJobs, poll); | |
// Kick off a new job by POST-ing to the server | |
async function addJob() { | |
let res = await fetch("http://localhost:4000/job/", { method: "POST" }); | |
let job = await res.json(); | |
const jobsCopy = jobs; | |
jobsCopy.push({ id: job.id, state: "queued" }); | |
setJobs(jobsCopy); | |
if (!poll) setPoll(200); | |
render(); | |
} | |
const noJobsActiveOrQueued = (jobsCopy) => | |
poll && !jobsCopy.find((j) => j.state === "active" || j.state === "queued"); | |
async function updateJob(job) { | |
let res = await fetch(`http://localhost:4000/job/${job.id}`); | |
let result = await res.json(); | |
const jobExistsInJobsList = !!jobs.filter((j) => j.id === job.id); | |
if (jobExistsInJobsList) { | |
const jobsCopy = jobs; | |
const i = jobsCopy.findIndex((j) => j.id === result.id); | |
jobsCopy[i] = result; | |
setJobs(jobsCopy); | |
const noActiveOrQueuedJobs = noJobsActiveOrQueued(jobsCopy); | |
if (noActiveOrQueuedJobs) { | |
setPoll(null); | |
} | |
} | |
render(); | |
} | |
// Fetch updates for each job | |
async function updateJobs() { | |
jobs.forEach(async (job) => { | |
await updateJob(job); | |
}); | |
} | |
// Delete all stored jobs | |
function clear() { | |
setJobs([]); | |
} | |
// Update the UI colors | |
function render() { | |
const jobsCopy = jobs.map((job) => renderJob(job)); | |
setJobs(jobsCopy); | |
} | |
// Renders the HTML for each job object | |
function renderJob(job) { | |
let progress = job.progress || 0; | |
let color = "blue"; | |
if (job.state === "completed") { | |
color = "purple"; | |
progress = 100; | |
} else if (job.state === "failed") { | |
color = "red"; | |
progress = 100; | |
} | |
return { id: job.id, state: job.state, progress, color }; | |
} | |
return ( | |
<div> | |
<button onClick={addJob}>add job</button> | |
<button onClick={clear}>clear</button> | |
{jobs.map((job, i) => ( | |
<div key={i} style={{ color: job.color }}> | |
<div>id: {job.id}</div> | |
<div>progress: {job.progress}</div> | |
</div> | |
))} | |
</div> | |
); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Install Redis and start it first (tutorial: https://www.youtube.com/watch?v=lgWjGkdrExA),
// then run this file.
const throng = require('throng'); | |
const { Worker } = require('bullmq'); | |
const IORedis = require('ioredis'); | |
// Connect to a local Redis instance in development and to the Heroku-provided
// REDIS_URL in production. ioredis accepts a connection URL directly, so the
// host and port do not need to be extracted by hand.
// NOTE: this has not been tried in production yet.
const REDIS_URL = process.env.REDIS_URL || 'redis://127.0.0.1:6379';
// BullMQ workers require `maxRetriesPerRequest: null` on their connection.
const connection = new IORedis(REDIS_URL, { maxRetriesPerRequest: null });

// Spin up multiple processes to handle jobs to take advantage of more CPU cores.
// See: https://devcenter.heroku.com/articles/node-concurrency for more info.
// Environment variables are strings, so parse the value into a number.
const workers = Number.parseInt(process.env.WEB_CONCURRENCY, 10) || 2;

// The maximum number of jobs each worker should process at once. This will need
// to be tuned for your application. If each job is mostly waiting on network
// responses it can be much higher. If each job is CPU-intensive, it might need
// to be much lower.
// Heroku set their example to 50. It is 1 here to better visualize how the
// workers wait for the previous job to complete.
const maxJobsPerWorker = 1;
/**
 * Returns a promise that resolves (with no value) after `ms` milliseconds.
 *
 * @param {number} ms - Delay in milliseconds.
 * @returns {Promise<void>}
 */
function sleep(ms) {
  return new Promise((resolve) => {
    setTimeout(resolve, ms);
  });
}
/**
 * Entry point for each clustered process: creates a BullMQ Worker that
 * consumes jobs from the 'work' queue, handling up to `maxJobsPerWorker`
 * jobs at once over the shared Redis `connection`.
 */
function start() {
  const worker = new Worker(
    'work',
    async (job) => {
      // This is an example job that just slowly reports on progress
      // while doing no work. Replace this with your own job logic.
      let progress = 0;
      console.log('...job starting....', job.id);

      // Throw an error 20% of the time.
      if (Math.random() < 0.2) {
        console.log('a job failed');
        throw new Error('This job failed!');
      }

      while (progress < 100) {
        await sleep(50);
        progress += 1;
        // Await the write so progress updates land in order and a failed
        // update rejects the job instead of becoming an unhandled rejection.
        await job.updateProgress(progress);
      }

      // A job can return values that will be stored in Redis as JSON.
      // This return value is unused in this demo application.
      return { value: progress };
    },
    {
      connection,
      concurrency: maxJobsPerWorker,
    }
  );

  // Log worker-level errors (e.g. connection problems) instead of leaving the
  // 'error' event unhandled.
  worker.on('error', (err) => {
    console.error('worker error', err);
  });
}
// Initialize the clustered worker process: throng runs `start` in `workers`
// separate processes.
// See: https://devcenter.heroku.com/articles/node-concurrency for more info.
// NOTE(review): these option names match throng 4.x; throng 5 appears to use
// `count` / `worker` instead — confirm against the installed version.
throng({ workers, start });
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Run this server on port 4000.
const express = require('express');
const { Queue, QueueEvents } = require('bullmq');
const cors = require('cors');
const IORedis = require('ioredis');
// Connect to a local Redis instance in development and to the Heroku-provided
// REDIS_URL in production.
const REDIS_URL = process.env.REDIS_URL || 'redis://127.0.0.1:6379';
// Host and port are derived from the URL for code that needs them separately.
const redisUrl = new URL(REDIS_URL);
const host = redisUrl.hostname;
const port = redisUrl.port || '6379';
// ioredis accepts the connection URL directly.
const connection = new IORedis(REDIS_URL, { maxRetriesPerRequest: null });

// Serve on PORT on Heroku and on localhost:4000 locally.
const PORT = process.env.PORT || '4000';
const app = express();

// Allow the React dev server to call this API from the browser.
app.use(
  cors({
    origin: 'http://localhost:3000',
  })
);

// Create / connect to the named work queue, using the same Redis connection
// the rest of this file is configured with.
const workQueue = new Queue('work', { connection });
// Kick off a new job by adding it to the work queue.
// Responds with `{ id }` of the created job, or 500 if it could not be queued.
app.post('/job', async (req, res) => {
  try {
    // This is where you could pass arguments to the job,
    // e.g. workQueue.add('paint', { url: 'https://www.heroku.com' })
    // Docs: https://docs.bullmq.io
    const job = await workQueue.add('paint', { color: 'red' });
    res.json({ id: job.id });
  } catch (err) {
    // Express does not handle rejections from async handlers on its own, so
    // answer explicitly instead of leaving the request hanging.
    console.error('Failed to enqueue job', err);
    res.status(500).json({ error: 'Failed to enqueue job' });
  }
});
// Allows the client to query the state of a background job.
// Responds with `{ id, state, progress, reason }`, 404 for an unknown id, or
// 500 if the lookup itself fails.
app.get('/job/:id', async (req, res) => {
  const { id } = req.params;
  try {
    const job = await workQueue.getJob(id);
    // getJob resolves to undefined (not null) for an unknown id, so test for
    // any missing value.
    if (!job) {
      console.log('getting ', id, ' null');
      res.status(404).end();
      return;
    }
    const state = await job.getState();
    const { progress, failedReason: reason } = job;
    console.log('getting ', id, ' ', state, ' ', progress);
    res.json({ id, state, progress, reason });
  } catch (err) {
    // Express does not handle rejections from async handlers on its own, so
    // answer explicitly instead of leaving the request hanging.
    console.error('Failed to look up job', id, err);
    res.status(500).json({ error: 'Failed to look up job' });
  }
});
// Get notified when jobs are processed by any worker. In BullMQ these
// queue-wide events come from a QueueEvents instance rather than from
// 'global:*' events on the Queue itself.
const queueEvents = new QueueEvents('work', { connection });
queueEvents.on('completed', ({ returnvalue }) => {
  // Stringify so object results do not print as "[object Object]".
  console.log(`Job completed with result ${JSON.stringify(returnvalue)}`);
});
queueEvents.on('error', (err) => {
  console.error('queue events error', err);
});

app.listen(PORT, () => console.log('Server started!'));
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment