import amqp, { type Channel, type ConsumeMessage } from "amqplib"
import { and, eq, sql } from "drizzle-orm"
import {
  createDatabase,
  createDatabasePool,
  imageAssets,
  imageAssetVersions,
  imageVariants,
} from "@image-platform/database"
import { assertQueueTopology, parseGenerateVariantJobBuffer } from "@image-platform/queue"
import type { ResizeMode } from "@image-platform/image-config"
import { createS3Client, putObjectBuffer } from "@image-platform/storage"
import { loadWorkerConfig } from "./config.js"

/** Output formats a variant can be rendered to. */
type VariantFormat = "avif" | "jpg" | "png" | "webp"

/**
 * Wires up the worker: loads configuration, opens the PostgreSQL pool, the S3
 * client and the RabbitMQ channel, starts consuming the generate-variant queue
 * and installs SIGINT/SIGTERM handlers that close everything before exiting.
 */
async function bootstrap() {
  const config = loadWorkerConfig()
  const pool = createDatabasePool()
  const db = createDatabase(pool)
  const s3 = createS3Client(config.storage)

  const connection = await amqp.connect(config.rabbitmqUrl)
  const channel = await connection.createChannel()
  await assertQueueTopology(channel, config.queueTopology)
  await channel.prefetch(config.prefetch)

  await channel.consume(
    config.queueTopology.generateVariantQueue,
    (message) =>
      // The handler settles messages itself, but its own error path (e.g. nack on
      // a closed channel) can still throw; catch here so that never surfaces as an
      // unhandled promise rejection.
      void handleGenerateVariantMessage({ channel, config, db, message, s3 }).catch((error: unknown) =>
        console.error("unhandled error in generate variant handler", error),
      ),
    { noAck: false },
  )

  console.log(`worker consuming ${config.queueTopology.generateVariantQueue}`)

  // Guards against running the teardown twice when both signals arrive.
  let shuttingDown = false

  const shutdown = async () => {
    if (shuttingDown) {
      return
    }
    shuttingDown = true

    console.log("worker shutting down")
    // Each step logs and continues so one failing close does not block the rest.
    await channel.close().catch((error: unknown) => console.error("failed to close RabbitMQ channel", error))
    await connection.close().catch((error: unknown) => console.error("failed to close RabbitMQ connection", error))
    await pool.end().catch((error: unknown) => console.error("failed to close PostgreSQL pool", error))
    process.exit(0)
  }

  process.once("SIGINT", () => void shutdown())
  process.once("SIGTERM", () => void shutdown())
}

/** Everything a single generate-variant delivery needs to be processed. */
type GenerateVariantContext = {
  channel: Channel
  config: ReturnType<typeof loadWorkerConfig>
  db: ReturnType<typeof createDatabase>
  /** `null` when the consumer callback is invoked without a delivery. */
  message: ConsumeMessage | null
  s3: ReturnType<typeof createS3Client>
}

/**
 * Processes one generate-variant delivery end to end.
 *
 * Flow: parse the job, load the variant row, mark it `processing` (bumping
 * `attemptCount`), fetch the rendered image from imgproxy, upload it to S3,
 * mark the row `ready` and ack. A job whose variant row is missing is nacked
 * without requeue; a variant already `ready` is acked without any work. Any
 * thrown error marks the row `failed` (best effort) and nacks without requeue.
 */
async function handleGenerateVariantMessage({ channel, config, db, message, s3 }: GenerateVariantContext) {
  if (message === null) {
    // NOTE(review): amqplib documents a null message as "consumer cancelled by
    // the broker"; nothing to ack, so just make it visible in the logs.
    console.warn("generate variant consumer received a null message (consumer cancelled?)")
    return
  }

  // Set once the variant is known to be `ready`, so a late failure (e.g. ack
  // throwing on a closed channel) does not flip a finished variant to `failed`.
  let variantReady = false

  try {
    const job = parseGenerateVariantJobBuffer(message.content)
    const variant = await loadVariantForGeneration(db, job.variantId)

    if (!variant) {
      console.error("generate variant job references missing variant", job)
      channel.nack(message, false, false)
      return
    }

    if (variant.status === "ready") {
      variantReady = true
      channel.ack(message)
      return
    }

    // Conditional on the status we just read. NOTE(review): the update result is
    // not inspected, so a concurrent status change by another worker is not
    // detected here — confirm whether that matters for this queue.
    await db
      .update(imageVariants)
      .set({
        attemptCount: sql`${imageVariants.attemptCount} + 1`,
        error: null,
        status: "processing",
        updatedAt: new Date(),
      })
      .where(and(eq(imageVariants.id, variant.id), eq(imageVariants.status, variant.status)))

    const imgproxyUrl = buildImgproxyUrl(config.imgproxyUpstream, variant.sourceUrl, {
      format: variant.format,
      // A missing height is sent as 0. NOTE(review): presumably imgproxy derives
      // it from the width in that case — confirm against the imgproxy docs.
      height: variant.height ?? 0,
      quality: variant.quality,
      resize: variant.resizeMode,
      width: variant.width,
    })

    const response = await fetch(imgproxyUrl)
    if (!response.ok) {
      throw new Error(`imgproxy returned ${response.status} for variant ${variant.id}`)
    }

    const body = Buffer.from(await response.arrayBuffer())
    // Prefer what imgproxy reports; fall back to the type implied by the format.
    const contentType = response.headers.get("content-type") ?? contentTypeForFormat(variant.format)

    const putResult = await putObjectBuffer({
      body,
      bucket: config.storage.bucket,
      cacheControl: "public, max-age=31536000, immutable",
      client: s3,
      contentType,
      key: variant.s3Key,
    })

    await db
      .update(imageVariants)
      .set({
        contentType,
        error: null,
        etag: putResult.ETag ?? null,
        sizeBytes: body.length,
        status: "ready",
        updatedAt: new Date(),
      })
      .where(eq(imageVariants.id, variant.id))
    variantReady = true

    console.log("generated image variant", { id: variant.id, key: variant.s3Key, sizeBytes: body.length })
    channel.ack(message)
  } catch (error) {
    const variantId = getVariantIdFromMessage(message)

    // Only call the job "invalid" when the payload itself cannot be parsed;
    // everything else is a processing failure (imgproxy, S3, database, ...).
    if (variantId === null) {
      console.error("invalid generate variant job", error)
    } else {
      console.error("failed to generate image variant", { variantId }, error)
    }

    if (variantId && !variantReady) {
      await db
        .update(imageVariants)
        .set({ error: formatError(error), status: "failed", updatedAt: new Date() })
        .where(eq(imageVariants.id, variantId))
        .catch((updateError: unknown) => console.error("failed to mark variant as failed", updateError))
    }

    // requeue=false: the message is not put back on this queue.
    channel.nack(message, false, false)
  }
}

/**
 * Loads the columns needed to render a variant, joined with its asset and
 * asset version (the latter supplies `sourceUrl`).
 *
 * @returns the row, or `null` when no row matches — which, because of the
 * inner joins, includes a variant whose asset or asset version is missing.
 */
async function loadVariantForGeneration(db: ReturnType<typeof createDatabase>, variantId: string) {
  const [row] = await db
    .select({
      format: imageVariants.format,
      height: imageVariants.height,
      id: imageVariants.id,
      quality: imageVariants.quality,
      resizeMode: imageVariants.resizeMode,
      s3Key: imageVariants.s3Key,
      sourceUrl: imageAssetVersions.sourceUrl,
      status: imageVariants.status,
      width: imageVariants.width,
    })
    .from(imageVariants)
    .innerJoin(imageAssets, eq(imageVariants.assetId, imageAssets.id))
    .innerJoin(imageAssetVersions, eq(imageVariants.assetVersionId, imageAssetVersions.id))
    .where(eq(imageVariants.id, variantId))
    .limit(1)

  return row ?? null
}

/**
 * Builds the imgproxy request URL for one variant.
 *
 * The path appended to the upstream's own path is
 * `insecure/rs:<resize>:<width>:<height>/q:<quality>/<base64url(source)>.<format>`.
 *
 * @param upstream base URL of the imgproxy instance; it is copied, not mutated.
 * @param sourceUrl URL of the original image, embedded base64url-encoded.
 * @param options resize mode, dimensions, quality and output format.
 * @returns a new `URL` pointing at the rendered variant.
 */
function buildImgproxyUrl(
  upstream: URL,
  sourceUrl: string,
  options: {
    format: VariantFormat
    height: number
    quality: number
    resize: ResizeMode
    width: number
  },
) {
  const url = new URL(upstream)
  const encodedSource = Buffer.from(sourceUrl).toString("base64url")

  url.pathname = joinUrlPath(
    url.pathname,
    "insecure",
    `rs:${options.resize}:${options.width}:${options.height}`,
    `q:${options.quality}`,
    `${encodedSource}.${options.format}`,
  )

  return url
}

/**
 * Joins path fragments into one absolute path: every fragment is split on `/`,
 * empty pieces are dropped, each piece is encoded and the result is prefixed
 * with a single leading `/`. With no non-empty pieces the result is `/`.
 */
function joinUrlPath(...segments: string[]) {
  const joined = segments
    .flatMap((segment) => segment.split("/"))
    .filter(Boolean)
    .map(encodePathSegment)
    .join("/")

  return `/${joined}`
}

/**
 * Percent-encodes a single path segment, except segments containing `:`,
 * which are passed through untouched so option segments such as `rs:fit:1:2`
 * keep their colons.
 */
function encodePathSegment(segment: string) {
  return segment.includes(":") ? segment : encodeURIComponent(segment)
}

/** Maps a variant format to its MIME type (`jpg` becomes `image/jpeg`). */
function contentTypeForFormat(format: VariantFormat) {
  if (format === "jpg") {
    return "image/jpeg"
  }

  return `image/${format}`
}

/**
 * Extracts the variant id from a delivery without throwing.
 *
 * @returns the id, or `null` when the payload cannot be parsed.
 */
function getVariantIdFromMessage(message: ConsumeMessage) {
  try {
    return parseGenerateVariantJobBuffer(message.content).variantId
  } catch {
    return null
  }
}

/** Turns any thrown value into a message string capped at 2000 characters. */
function formatError(error: unknown) {
  const message = error instanceof Error ? error.message : String(error)
  return message.slice(0, 2000)
}

void bootstrap().catch((error: unknown) => {
  console.error("worker failed to start", error)
  process.exit(1)
})