Last active
September 11, 2024 20:53
-
-
Save saolsen/881eb0c9962c266db7d3962b9f71c4df to your computer and use it in GitHub Desktop.
pgboss with postgres.js
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// wow, this is awesome | |
// you really don't need drizzle, this is much better | |
// I really kinda hate that copilot isn't working though... | |
import postgres from "npm:postgres"; | |
import PgBoss from "npm:pg-boss"; | |
interface World { | |
world: string; | |
} | |
if (import.meta.main) { | |
const sql = postgres({}); | |
const boss_connection = await sql.reserve(); | |
const boss = new PgBoss({ | |
db: { | |
// deno-lint-ignore no-explicit-any | |
executeSql: async (text: string, values: any[] | undefined) => { | |
const processed_values = []; | |
if (values) { | |
for (const value of values) { | |
if (value === undefined) { | |
processed_values.push(null); | |
} else if ( | |
typeof value === "string" && | |
(value.startsWith("[") || value.startsWith("{")) | |
) { | |
processed_values.push(JSON.parse(value)); | |
} else { | |
processed_values.push(value); | |
} | |
} | |
} | |
if (text.includes("json_to_recordset")) { | |
console.log("query", text); | |
console.log("values", values); | |
console.log("processed_values", processed_values); | |
} | |
const results = await boss_connection.unsafe(text, processed_values); | |
return { rows: [...results.values()] }; | |
}, | |
}, | |
}); | |
boss.on("error", console.error); | |
await boss.start(); | |
const queue = "test-queue"; | |
await boss.createQueue(queue); | |
const id = await boss.send(queue, { arg1: "read me" }); | |
console.log(`created job ${id} in queue ${queue}`); | |
const worker = await boss.work(queue, async ([job]) => { | |
console.log(`received job ${job.id} with data ${JSON.stringify(job.data)}`); | |
}); | |
console.log(await boss.getQueueSize(queue)); | |
const result = await sql<World[]>` | |
select ${"Hello"} as world; | |
`; | |
console.log(); | |
for (const [i, entry] of result.entries()) { | |
console.log(i); | |
console.log(entry); | |
} | |
console.log("stopping worker"); | |
//await boss.offWork(worker); | |
console.log("stopping boss"); | |
//await boss.stop(); | |
console.log("releasing connection"); | |
//boss_connection.release(); | |
console.log("ending sql"); | |
//await sql.end(); | |
//Deno.exit(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment