|
|
|
|
@@ -1,10 +1,23 @@
|
|
|
|
|
import amqp, { type Channel, type ConsumeMessage } from "amqplib"
|
|
|
|
|
import { parseGenerateVariantJobBuffer, type QueueTopology } from "@image-platform/queue"
|
|
|
|
|
import { and, eq, sql } from "drizzle-orm"
|
|
|
|
|
import {
|
|
|
|
|
createDatabase,
|
|
|
|
|
createDatabasePool,
|
|
|
|
|
imageAssets,
|
|
|
|
|
imageAssetVersions,
|
|
|
|
|
imageVariants,
|
|
|
|
|
} from "@image-platform/database"
|
|
|
|
|
import { assertQueueTopology, parseGenerateVariantJobBuffer } from "@image-platform/queue"
|
|
|
|
|
import type { ResizeMode } from "@image-platform/image-config"
|
|
|
|
|
import { createS3Client, putObjectBuffer } from "@image-platform/storage"
|
|
|
|
|
|
|
|
|
|
import { loadWorkerConfig } from "./config.js"
|
|
|
|
|
|
|
|
|
|
async function bootstrap() {
|
|
|
|
|
const config = loadWorkerConfig()
|
|
|
|
|
const pool = createDatabasePool()
|
|
|
|
|
const db = createDatabase(pool)
|
|
|
|
|
const s3 = createS3Client(config.storage)
|
|
|
|
|
const connection = await amqp.connect(config.rabbitmqUrl)
|
|
|
|
|
const channel = await connection.createChannel()
|
|
|
|
|
|
|
|
|
|
@@ -12,7 +25,7 @@ async function bootstrap() {
|
|
|
|
|
await channel.prefetch(config.prefetch)
|
|
|
|
|
await channel.consume(
|
|
|
|
|
config.queueTopology.generateVariantQueue,
|
|
|
|
|
(message) => void handleGenerateVariantMessage(channel, message),
|
|
|
|
|
(message) => void handleGenerateVariantMessage({ channel, config, db, message, s3 }),
|
|
|
|
|
{ noAck: false },
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
@@ -22,6 +35,7 @@ async function bootstrap() {
|
|
|
|
|
console.log("worker shutting down")
|
|
|
|
|
await channel.close().catch((error: unknown) => console.error("failed to close RabbitMQ channel", error))
|
|
|
|
|
await connection.close().catch((error: unknown) => console.error("failed to close RabbitMQ connection", error))
|
|
|
|
|
await pool.end().catch((error: unknown) => console.error("failed to close PostgreSQL pool", error))
|
|
|
|
|
process.exit(0)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -29,38 +43,179 @@ async function bootstrap() {
|
|
|
|
|
process.once("SIGTERM", () => void shutdown())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async function assertQueueTopology(channel: Channel, topology: QueueTopology) {
|
|
|
|
|
await channel.assertExchange(topology.jobsExchange, "direct", { durable: true })
|
|
|
|
|
await channel.assertExchange(topology.jobsDeadLetterExchange, "direct", { durable: true })
|
|
|
|
|
await channel.assertQueue(topology.generateVariantQueue, {
|
|
|
|
|
deadLetterExchange: topology.jobsDeadLetterExchange,
|
|
|
|
|
deadLetterRoutingKey: topology.generateVariantDeadLetterRoutingKey,
|
|
|
|
|
durable: true,
|
|
|
|
|
})
|
|
|
|
|
await channel.assertQueue(topology.generateVariantDeadLetterQueue, { durable: true })
|
|
|
|
|
await channel.bindQueue(topology.generateVariantQueue, topology.jobsExchange, topology.generateVariantRoutingKey)
|
|
|
|
|
await channel.bindQueue(
|
|
|
|
|
topology.generateVariantDeadLetterQueue,
|
|
|
|
|
topology.jobsDeadLetterExchange,
|
|
|
|
|
topology.generateVariantDeadLetterRoutingKey,
|
|
|
|
|
)
|
|
|
|
|
type GenerateVariantContext = {
|
|
|
|
|
channel: Channel
|
|
|
|
|
config: ReturnType<typeof loadWorkerConfig>
|
|
|
|
|
db: ReturnType<typeof createDatabase>
|
|
|
|
|
message: ConsumeMessage | null
|
|
|
|
|
s3: ReturnType<typeof createS3Client>
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async function handleGenerateVariantMessage(channel: Channel, message: ConsumeMessage | null) {
|
|
|
|
|
async function handleGenerateVariantMessage({ channel, config, db, message, s3 }: GenerateVariantContext) {
|
|
|
|
|
if (message === null) {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
const job = parseGenerateVariantJobBuffer(message.content)
|
|
|
|
|
console.log("generate variant job received, handler not implemented yet", job)
|
|
|
|
|
channel.nack(message, false, false)
|
|
|
|
|
const variant = await loadVariantForGeneration(db, job.variantId)
|
|
|
|
|
|
|
|
|
|
if (!variant) {
|
|
|
|
|
console.error("generate variant job references missing variant", job)
|
|
|
|
|
channel.nack(message, false, false)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (variant.status === "ready") {
|
|
|
|
|
channel.ack(message)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
await db
|
|
|
|
|
.update(imageVariants)
|
|
|
|
|
.set({
|
|
|
|
|
attemptCount: sql`${imageVariants.attemptCount} + 1`,
|
|
|
|
|
error: null,
|
|
|
|
|
status: "processing",
|
|
|
|
|
updatedAt: new Date(),
|
|
|
|
|
})
|
|
|
|
|
.where(and(eq(imageVariants.id, variant.id), eq(imageVariants.status, variant.status)))
|
|
|
|
|
|
|
|
|
|
const imgproxyUrl = buildImgproxyUrl(config.imgproxyUpstream, variant.sourceUrl, {
|
|
|
|
|
format: variant.format,
|
|
|
|
|
height: variant.height ?? 0,
|
|
|
|
|
quality: variant.quality,
|
|
|
|
|
resize: variant.resizeMode,
|
|
|
|
|
width: variant.width,
|
|
|
|
|
})
|
|
|
|
|
const response = await fetch(imgproxyUrl)
|
|
|
|
|
|
|
|
|
|
if (!response.ok) {
|
|
|
|
|
throw new Error(`imgproxy returned ${response.status} for variant ${variant.id}`)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const body = Buffer.from(await response.arrayBuffer())
|
|
|
|
|
const contentType = response.headers.get("content-type") ?? contentTypeForFormat(variant.format)
|
|
|
|
|
const putResult = await putObjectBuffer({
|
|
|
|
|
body,
|
|
|
|
|
bucket: config.storage.bucket,
|
|
|
|
|
cacheControl: "public, max-age=31536000, immutable",
|
|
|
|
|
client: s3,
|
|
|
|
|
contentType,
|
|
|
|
|
key: variant.s3Key,
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
await db
|
|
|
|
|
.update(imageVariants)
|
|
|
|
|
.set({
|
|
|
|
|
contentType,
|
|
|
|
|
error: null,
|
|
|
|
|
etag: putResult.ETag ?? null,
|
|
|
|
|
sizeBytes: body.length,
|
|
|
|
|
status: "ready",
|
|
|
|
|
updatedAt: new Date(),
|
|
|
|
|
})
|
|
|
|
|
.where(eq(imageVariants.id, variant.id))
|
|
|
|
|
|
|
|
|
|
console.log("generated image variant", { id: variant.id, key: variant.s3Key, sizeBytes: body.length })
|
|
|
|
|
channel.ack(message)
|
|
|
|
|
} catch (error) {
|
|
|
|
|
console.error("invalid generate variant job", error)
|
|
|
|
|
const variantId = getVariantIdFromMessage(message)
|
|
|
|
|
|
|
|
|
|
if (variantId) {
|
|
|
|
|
await db
|
|
|
|
|
.update(imageVariants)
|
|
|
|
|
.set({ error: formatError(error), status: "failed", updatedAt: new Date() })
|
|
|
|
|
.where(eq(imageVariants.id, variantId))
|
|
|
|
|
.catch((updateError: unknown) => console.error("failed to mark variant as failed", updateError))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
channel.nack(message, false, false)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async function loadVariantForGeneration(db: ReturnType<typeof createDatabase>, variantId: string) {
|
|
|
|
|
const [row] = await db
|
|
|
|
|
.select({
|
|
|
|
|
format: imageVariants.format,
|
|
|
|
|
height: imageVariants.height,
|
|
|
|
|
id: imageVariants.id,
|
|
|
|
|
quality: imageVariants.quality,
|
|
|
|
|
resizeMode: imageVariants.resizeMode,
|
|
|
|
|
s3Key: imageVariants.s3Key,
|
|
|
|
|
sourceUrl: imageAssetVersions.sourceUrl,
|
|
|
|
|
status: imageVariants.status,
|
|
|
|
|
width: imageVariants.width,
|
|
|
|
|
})
|
|
|
|
|
.from(imageVariants)
|
|
|
|
|
.innerJoin(imageAssets, eq(imageVariants.assetId, imageAssets.id))
|
|
|
|
|
.innerJoin(imageAssetVersions, eq(imageVariants.assetVersionId, imageAssetVersions.id))
|
|
|
|
|
.where(eq(imageVariants.id, variantId))
|
|
|
|
|
.limit(1)
|
|
|
|
|
|
|
|
|
|
return row ?? null
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
function buildImgproxyUrl(
|
|
|
|
|
upstream: URL,
|
|
|
|
|
sourceUrl: string,
|
|
|
|
|
options: {
|
|
|
|
|
format: "avif" | "jpg" | "png" | "webp"
|
|
|
|
|
height: number
|
|
|
|
|
quality: number
|
|
|
|
|
resize: ResizeMode
|
|
|
|
|
width: number
|
|
|
|
|
},
|
|
|
|
|
) {
|
|
|
|
|
const url = new URL(upstream)
|
|
|
|
|
const encodedSource = Buffer.from(sourceUrl).toString("base64url")
|
|
|
|
|
url.pathname = joinUrlPath(
|
|
|
|
|
url.pathname,
|
|
|
|
|
"insecure",
|
|
|
|
|
`rs:${options.resize}:${options.width}:${options.height}`,
|
|
|
|
|
`q:${options.quality}`,
|
|
|
|
|
`${encodedSource}.${options.format}`,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
return url
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
function joinUrlPath(...segments: string[]) {
|
|
|
|
|
return segments
|
|
|
|
|
.flatMap((segment) => segment.split("/"))
|
|
|
|
|
.filter(Boolean)
|
|
|
|
|
.map(encodePathSegment)
|
|
|
|
|
.join("/")
|
|
|
|
|
.replace(/^/, "/")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
function encodePathSegment(segment: string) {
|
|
|
|
|
return segment.includes(":") ? segment : encodeURIComponent(segment)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
function contentTypeForFormat(format: "avif" | "jpg" | "png" | "webp") {
|
|
|
|
|
if (format === "jpg") {
|
|
|
|
|
return "image/jpeg"
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return `image/${format}`
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
function getVariantIdFromMessage(message: ConsumeMessage) {
|
|
|
|
|
try {
|
|
|
|
|
return parseGenerateVariantJobBuffer(message.content).variantId
|
|
|
|
|
} catch {
|
|
|
|
|
return null
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
function formatError(error: unknown) {
|
|
|
|
|
const message = error instanceof Error ? error.message : String(error)
|
|
|
|
|
|
|
|
|
|
return message.slice(0, 2000)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void bootstrap().catch((error: unknown) => {
|
|
|
|
|
console.error("worker failed to start", error)
|
|
|
|
|
process.exit(1)
|
|
|
|
|
|