68 lines
2.5 KiB
TypeScript
68 lines
2.5 KiB
TypeScript
|
|
import amqp, { type Channel, type ConsumeMessage } from "amqplib"
|
||
|
|
import { parseGenerateVariantJobBuffer, type QueueTopology } from "@image-platform/queue"
|
||
|
|
|
||
|
|
import { loadWorkerConfig } from "./config.js"
|
||
|
|
|
||
|
|
async function bootstrap() {
|
||
|
|
const config = loadWorkerConfig()
|
||
|
|
const connection = await amqp.connect(config.rabbitmqUrl)
|
||
|
|
const channel = await connection.createChannel()
|
||
|
|
|
||
|
|
await assertQueueTopology(channel, config.queueTopology)
|
||
|
|
await channel.prefetch(config.prefetch)
|
||
|
|
await channel.consume(
|
||
|
|
config.queueTopology.generateVariantQueue,
|
||
|
|
(message) => void handleGenerateVariantMessage(channel, message),
|
||
|
|
{ noAck: false },
|
||
|
|
)
|
||
|
|
|
||
|
|
console.log(`worker consuming ${config.queueTopology.generateVariantQueue}`)
|
||
|
|
|
||
|
|
const shutdown = async () => {
|
||
|
|
console.log("worker shutting down")
|
||
|
|
await channel.close().catch((error: unknown) => console.error("failed to close RabbitMQ channel", error))
|
||
|
|
await connection.close().catch((error: unknown) => console.error("failed to close RabbitMQ connection", error))
|
||
|
|
process.exit(0)
|
||
|
|
}
|
||
|
|
|
||
|
|
process.once("SIGINT", () => void shutdown())
|
||
|
|
process.once("SIGTERM", () => void shutdown())
|
||
|
|
}
|
||
|
|
|
||
|
|
async function assertQueueTopology(channel: Channel, topology: QueueTopology) {
|
||
|
|
await channel.assertExchange(topology.jobsExchange, "direct", { durable: true })
|
||
|
|
await channel.assertExchange(topology.jobsDeadLetterExchange, "direct", { durable: true })
|
||
|
|
await channel.assertQueue(topology.generateVariantQueue, {
|
||
|
|
deadLetterExchange: topology.jobsDeadLetterExchange,
|
||
|
|
deadLetterRoutingKey: topology.generateVariantDeadLetterRoutingKey,
|
||
|
|
durable: true,
|
||
|
|
})
|
||
|
|
await channel.assertQueue(topology.generateVariantDeadLetterQueue, { durable: true })
|
||
|
|
await channel.bindQueue(topology.generateVariantQueue, topology.jobsExchange, topology.generateVariantRoutingKey)
|
||
|
|
await channel.bindQueue(
|
||
|
|
topology.generateVariantDeadLetterQueue,
|
||
|
|
topology.jobsDeadLetterExchange,
|
||
|
|
topology.generateVariantDeadLetterRoutingKey,
|
||
|
|
)
|
||
|
|
}
|
||
|
|
|
||
|
|
async function handleGenerateVariantMessage(channel: Channel, message: ConsumeMessage | null) {
|
||
|
|
if (message === null) {
|
||
|
|
return
|
||
|
|
}
|
||
|
|
|
||
|
|
try {
|
||
|
|
const job = parseGenerateVariantJobBuffer(message.content)
|
||
|
|
console.log("generate variant job received, handler not implemented yet", job)
|
||
|
|
channel.nack(message, false, false)
|
||
|
|
} catch (error) {
|
||
|
|
console.error("invalid generate variant job", error)
|
||
|
|
channel.nack(message, false, false)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
void bootstrap().catch((error: unknown) => {
|
||
|
|
console.error("worker failed to start", error)
|
||
|
|
process.exit(1)
|
||
|
|
})
|