# Lockness Queue

Background job processing system with automatic retries, delayed execution, and multiple driver support.

## Overview

@lockness/queue provides a simple yet powerful job queue system with Memory and Deno KV drivers. Features include automatic retries, delayed jobs, multiple queues, and full TypeScript type safety.

## Installation

```typescript
import { configureQueue, dispatch, QueueWorker } from '@lockness/queue'
```

## Configuration

```typescript
configureQueue({
    driver: 'memory', // 'memory' | 'deno-kv'
    defaultQueue: 'default',
    kvPath: './data/kv', // optional, for deno-kv driver
    retryDelay: 3000, // milliseconds between retries (3 seconds)
})
```

## Basic Usage

### 1. Define a Job

```typescript
import type { Job, JobPayload } from '@lockness/queue'

interface SendEmailPayload extends JobPayload {
    userId: number
    email: string
    subject: string
    body: string
}

// sendEmail and logError are application-specific helpers
class SendEmailJob implements Job<SendEmailPayload> {
    name = 'send-email'
    maxAttempts = 3

    constructor(public payload: SendEmailPayload) {}

    async handle(payload: SendEmailPayload): Promise<void> {
        console.log(`Sending email to ${payload.email}`)
        await sendEmail(payload.email, payload.subject, payload.body)
    }

    async failed(payload: SendEmailPayload, error: Error): Promise<void> {
        // Called after all retries failed
        console.error(
            `Failed to send email after ${this.maxAttempts} attempts: ${error.message}`,
        )
        await logError(error)
    }
}
```

### 2. Dispatch Jobs

```typescript
import { dispatch } from '@lockness/queue'

const job = new SendEmailJob({
    userId: 1,
    email: 'user@example.com',
    subject: 'Welcome!',
    body: 'Welcome to our service',
})

// Simple dispatch
await dispatch(job)

// With delay (1 minute, in milliseconds)
await dispatch(job, { delay: 60000 })

// To specific queue
await dispatch(job, { queue: 'emails' })

// Combined options
await dispatch(job, { queue: 'notifications', delay: 5000 })
```

### 3. Process Jobs with Worker

```typescript
import { QueueWorker } from '@lockness/queue'

const worker = new QueueWorker({
    queues: ['default', 'emails', 'notifications'],
    sleep: 1000, // Check for jobs every second
    maxJobs: 0, // Unlimited (0 = continuous)
    stopWhenEmpty: false, // Keep running
})

await worker.start()
```

## Decorator Alternative

Use the @Queueable decorator for cleaner job definitions:

```typescript
import { dispatch, Queueable } from '@lockness/queue'
import type { Job } from '@lockness/queue'

@Queueable('send-welcome-email', 5) // name, maxAttempts
class SendWelcomeEmailJob implements Job<{ userId: number }> {
    constructor(public payload: { userId: number }) {}

    async handle(payload: { userId: number }): Promise<void> {
        const user = await getUserById(payload.userId)
        await sendWelcomeEmail(user)
    }

    async failed(payload: { userId: number }, error: Error): Promise<void> {
        await notifyAdmin(
            `Welcome email failed for user ${payload.userId}`,
            error,
        )
    }
}

// Job is automatically registered
await dispatch(new SendWelcomeEmailJob({ userId: 1 }))
```

## Queue Management

```typescript
import { clearQueue, queueSize } from '@lockness/queue'

// Get queue size
const size = await queueSize('default')
console.log(`Default queue has ${size} jobs`)

// Clear queue
await clearQueue('emails')
```

## Drivers

### Memory Driver (Default)

In-memory queue for development and testing. Jobs are lost on restart.

```typescript
configureQueue({ driver: 'memory' })
```

**Pros:**

- Fast
- Simple setup
- Good for development

**Cons:**

- Jobs lost on restart
- Not shared across instances
- Memory grows with queue size

### Deno KV Driver

Persistent queue using Deno KV storage. Jobs survive restarts.

```typescript
configureQueue({
    driver: 'deno-kv',
    kvPath: './data/kv', // optional, defaults to Deno.openKv()
})
```

**Pros:**

- Persistent across restarts
- Shared across instances
- Production-ready

**Cons:**

- Requires Deno KV access
- Slight performance overhead

## Common Use Cases

### Email Notifications

```typescript
@Queueable('send-notification', 3)
class SendNotificationJob implements Job<{ userId: number; message: string }> {
    constructor(public payload: { userId: number; message: string }) {}

    async handle({ userId, message }: { userId: number; message: string }) {
        const user = await getUserById(userId)
        await mail()
            .to(user.email)
            .subject('Notification')
            .html(message)
            .send()
    }
}

// Dispatch from a controller method
@Post('/notify')
async notify(c: Context) {
    const { userId, message } = await c.req.json()
    await dispatch(new SendNotificationJob({ userId, message }))
    return c.json({ queued: true })
}
```

### Image Processing

```typescript
@Queueable('process-image', 5)
class ProcessImageJob implements Job<{ imageId: number }> {
    constructor(public payload: { imageId: number }) {}

    async handle({ imageId }: { imageId: number }) {
        const image = await getImage(imageId)

        // Generate thumbnails
        const thumbnail = await resizeImage(image, 200, 200)
        await storage().put(`thumbnails/${imageId}.jpg`, thumbnail)

        // Generate medium size
        const medium = await resizeImage(image, 800, 800)
        await storage().put(`medium/${imageId}.jpg`, medium)

        // Update database
        await markImageProcessed(imageId)
    }

    async failed({ imageId }: { imageId: number }, error: Error) {
        await markImageFailed(imageId, error.message)
    }
}
```

### Payment Processing with Chaining

```typescript
@Queueable('process-payment', 5)
class ProcessPaymentJob implements Job<{ orderId: number }> {
    constructor(public payload: { orderId: number }) {}

    async handle({ orderId }: { orderId: number }) {
        const order = await getOrder(orderId)

        // Process payment
        const payment = await processPayment(order)
        await updateOrderStatus(orderId, 'paid')

        // Chain next job: send receipt
        await dispatch(
            new SendReceiptJob({ orderId, paymentId: payment.id }),
            { delay: 2000 },
        )

        // Chain another job: update inventory
        await dispatch(new UpdateInventoryJob({ orderId }))
    }

    async failed({ orderId }: { orderId: number }, error: Error) {
        await updateOrderStatus(orderId, 'failed')
        await notifyAdmin(`Payment failed for order ${orderId}`, error)
    }
}

@Queueable('send-receipt', 3)
class SendReceiptJob implements Job<{ orderId: number; paymentId: string }> {
    constructor(public payload: { orderId: number; paymentId: string }) {}

    async handle(
        { orderId, paymentId }: { orderId: number; paymentId: string },
    ) {
        const order = await getOrder(orderId)
        const receipt = await generateReceipt(order, paymentId)
        await sendReceiptEmail(order.user.email, receipt)
    }
}
```

### Scheduled Reports

```typescript
@Queueable('generate-report', 3)
class GenerateReportJob implements Job<{ reportType: string; userId: number }> {
    constructor(public payload: { reportType: string; userId: number }) {}

    async handle(
        { reportType, userId }: { reportType: string; userId: number },
    ) {
        const data = await fetchReportData(reportType, userId)
        const report = await generatePDF(data)

        const filename = `reports/${userId}/${reportType}-${Date.now()}.pdf`
        await storage().put(filename, report)

        const url = await storage().signedUrl(filename, 86400) // 24 hours
        await notifyUserReportReady(userId, url)
    }
}

// Schedule daily reports (every day at midnight)
Deno.cron('daily-reports', '0 0 * * *', async () => {
    const users = await getActiveUsers()
    for (const user of users) {
        await dispatch(
            new GenerateReportJob({ reportType: 'daily', userId: user.id }),
        )
    }
})
```

## Worker Configuration

### Basic Worker

```typescript
const worker = new QueueWorker({
    queues: ['default'],
    sleep: 1000,
})

await worker.start()
```

### Multi-Queue Worker with Priority

```typescript
// Process high-priority queues first
const worker = new QueueWorker({
    queues: ['critical', 'high', 'normal', 'low'],
    sleep: 500, // Check more frequently
    maxJobs: 100, // Stop after 100 jobs
})

await worker.start()
```

### Worker for Specific Job Types

```typescript
const emailWorker = new QueueWorker({
    queues: ['emails'],
    sleep: 2000,
    stopWhenEmpty: false,
})

const imageWorker = new QueueWorker({
    queues: ['images', 'thumbnails'],
    sleep: 1000,
    stopWhenEmpty: false,
})

await Promise.all([
    emailWorker.start(),
    imageWorker.start(),
])
```

## Best Practices

- Use meaningful job names (e.g., 'send-welcome-email', not 'job1')
- Set appropriate maxAttempts based on job criticality
- Implement failed() handler for alerting and logging
- Use specific queues for different job types
- Add delays for rate-limited operations
- Chain related jobs instead of doing everything in one job
- Use Deno KV driver in production for persistence
- Monitor queue sizes to detect processing issues
- Implement idempotent job handlers (safe to retry)
- Log job execution for debugging
- Use payload interfaces for type safety
- Keep payload data small and serializable

## Testing

```typescript
import { assertEquals } from '@std/assert'
import { configureQueue, dispatch, Queueable, QueueWorker } from '@lockness/queue'
import type { Job } from '@lockness/queue'

Deno.test('job processing', async () => {
    // Use memory driver for tests
    configureQueue({ driver: 'memory' })

    let executed = false

    @Queueable('test-job', 1)
    class TestJob implements Job<{ value: string }> {
        constructor(public payload: { value: string }) {}

        async handle({ value }: { value: string }) {
            executed = true
            assertEquals(value, 'test')
        }
    }

    await dispatch(new TestJob({ value: 'test' }))

    // Process exactly one job, then stop
    const worker = new QueueWorker({
        queues: ['default'],
        maxJobs: 1,
        stopWhenEmpty: true,
    })

    await worker.start()

    assertEquals(executed, true)
})
```
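
### Testing Idempotent Handlers

The recommendation to keep job handlers idempotent can be checked without running a worker at all, by calling the handler twice with the same payload and counting the side effects. The following is a minimal sketch under stated assumptions: `ChargePayload`, `processedOrders`, `charges`, `handleCharge`, and `runTwice` are hypothetical names for illustration and are not part of @lockness/queue, and the in-memory `Set` stands in for whatever durable uniqueness check (for example, a database constraint) a real application would use.

```typescript
// Hypothetical payload and stores; none of these names come from @lockness/queue.
interface ChargePayload {
    orderId: number
    amountCents: number
}

// Stands in for a durable record of already-processed orders.
const processedOrders = new Set<number>()

// Records every side effect so the number of charges can be inspected.
const charges: ChargePayload[] = []

// Idempotent handler: a repeated run for the same order is a no-op.
async function handleCharge(payload: ChargePayload): Promise<void> {
    if (processedOrders.has(payload.orderId)) {
        return // Already handled by an earlier attempt
    }
    charges.push(payload) // The side effect (e.g. charging a card)
    processedOrders.add(payload.orderId) // Mark as done only after success
}

// Simulate a retry by invoking the handler twice with the same payload.
async function runTwice(payload: ChargePayload): Promise<number> {
    await handleCharge(payload)
    await handleCharge(payload)
    return charges.length
}
```

In a job class, the body of `handle()` would follow the same check-then-act shape. An in-memory set is only suitable for a single process; with several workers, the check and the side effect would likely need to share one atomic operation in persistent storage.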