Skip to content

Instantly share code, notes, and snippets.

@saolsen
Last active September 11, 2024 20:53
Show Gist options
  • Save saolsen/881eb0c9962c266db7d3962b9f71c4df to your computer and use it in GitHub Desktop.
Save saolsen/881eb0c9962c266db7d3962b9f71c4df to your computer and use it in GitHub Desktop.
pgboss with postgres.js
// wow, this is awesome
// you really don't need drizzle, this is much better
// I really kinda hate that copilot isn't working though...
import postgres from "npm:postgres";
import PgBoss from "npm:pg-boss";
interface World {
world: string;
}
if (import.meta.main) {
const sql = postgres({});
const boss_connection = await sql.reserve();
const boss = new PgBoss({
db: {
// deno-lint-ignore no-explicit-any
executeSql: async (text: string, values: any[] | undefined) => {
const processed_values = [];
if (values) {
for (const value of values) {
if (value === undefined) {
processed_values.push(null);
} else if (
typeof value === "string" &&
(value.startsWith("[") || value.startsWith("{"))
) {
processed_values.push(JSON.parse(value));
} else {
processed_values.push(value);
}
}
}
if (text.includes("json_to_recordset")) {
console.log("query", text);
console.log("values", values);
console.log("processed_values", processed_values);
}
const results = await boss_connection.unsafe(text, processed_values);
return { rows: [...results.values()] };
},
},
});
boss.on("error", console.error);
await boss.start();
const queue = "test-queue";
await boss.createQueue(queue);
const id = await boss.send(queue, { arg1: "read me" });
console.log(`created job ${id} in queue ${queue}`);
const worker = await boss.work(queue, async ([job]) => {
console.log(`received job ${job.id} with data ${JSON.stringify(job.data)}`);
});
console.log(await boss.getQueueSize(queue));
const result = await sql<World[]>`
select ${"Hello"} as world;
`;
console.log();
for (const [i, entry] of result.entries()) {
console.log(i);
console.log(entry);
}
console.log("stopping worker");
//await boss.offWork(worker);
console.log("stopping boss");
//await boss.stop();
console.log("releasing connection");
//boss_connection.release();
console.log("ending sql");
//await sql.end();
//Deno.exit();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment