Queue System

Intellicon CRM uses Bull (backed by Redis) for background job processing. This enables long-running operations like CSV imports, report generation, and email sending to run asynchronously without blocking API responses.

Architecture

┌──────────────┐     Add Job     ┌──────────────┐     Process     ┌──────────────┐
│  Controller  │────────────────▶│ Redis Queue  │────────────────▶│  Processor   │
│ (API Layer)  │                 │    (Bull)    │                 │   (Worker)   │
│              │◀──── Job ID ────│              │                 │              │
└──────┬───────┘                 └──────────────┘                 └──────┬───────┘
       │                                                                 │
       │ Poll status                                                     │
       ▼                                                                 ▼
GET /jobs/:id                                                   Write results to DB
{ status, progress }                                            Update job status
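
The "Poll status" leg of the diagram could look roughly like this on the client side. This is a minimal sketch, not code from the Intellicon codebase: `pollUntilDone`, `fetchStatus`, and the interval and poll limits are hypothetical names and values, and the status strings are the ones used elsewhere on this page.

```typescript
// Hypothetical client-side polling helper (illustrative only).
type ImportStatus = { status: string; progress?: number };

// fetchStatus is injected so the sketch does not assume a particular HTTP client.
// In practice it would wrap a call to GET /jobs/:id.
async function pollUntilDone(
  fetchStatus: () => Promise<ImportStatus>,
  intervalMs = 2000,
  maxPolls = 300,
): Promise<ImportStatus> {
  for (let i = 0; i < maxPolls; i++) {
    const current = await fetchStatus();
    // Stop on 'completed' or 'failed'. A failed job may still be retried by Bull,
    // so callers that care about retries can keep polling instead.
    if (current.status === 'completed' || current.status === 'failed') {
      return current;
    }
    // Wait before asking again.
    await new Promise<void>((resolve) => setTimeout(resolve, intervalMs));
  }
  throw new Error(`Job did not finish after ${maxPolls} polls`);
}
```

Injecting `fetchStatus` keeps the loop independent of the transport, so the same helper works with `fetch`, Axios, or a test stub.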

Lead Import Processor

The primary queue use case is bulk CSV/XLSX import for leads.

Files

File                                                          Purpose
apps/api/src/modules/lead-import/lead-import.module.ts        Module + Bull queue registration
apps/api/src/modules/lead-import/lead-import.service.ts       Job creation and status tracking
apps/api/src/modules/lead-import/lead-import.controller.ts    Upload + status endpoints
apps/api/src/modules/lead-import/lead-import.processor.ts     Bull queue worker

Module Registration

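// lead-import.module.ts
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';
import { LeadImportController } from './lead-import.controller';
import { LeadImportService } from './lead-import.service';
import { LeadImportProcessor } from './lead-import.processor';

@Module({
  imports: [
    // Registers the 'lead-import' queue so it can be injected with @InjectQueue
    BullModule.registerQueue({
      name: 'lead-import',
    }),
  ],
  controllers: [LeadImportController],
  // The processor must be listed as a provider so Nest instantiates the worker
  providers: [LeadImportService, LeadImportProcessor],
})
export class LeadImportModule {}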

Queue Configuration

Bull connects to Redis using the REDIS_HOST, REDIS_PORT, and REDIS_PASSWORD environment variables:

// In app.module.ts or config
BullModule.forRoot({
  redis: {
    host: process.env.REDIS_HOST || 'localhost',
    // Always pass the radix so the port is parsed as a decimal number
    port: parseInt(process.env.REDIS_PORT || '6379', 10),
    password: process.env.REDIS_PASSWORD,
  },
});
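
If you want the configuration to fail fast on a malformed port, the environment lookup can be pulled into a small helper. The sketch below is an illustration under assumptions: `redisOptionsFromEnv` and the `RedisOptions` interface are hypothetical names, not part of the codebase or of Bull.

```typescript
// Hypothetical helper: builds the `redis` option object from environment variables.
interface RedisOptions {
  host: string;
  port: number;
  password?: string;
}

function redisOptionsFromEnv(env: Record<string, string | undefined>): RedisOptions {
  const port = Number.parseInt(env.REDIS_PORT ?? '6379', 10);
  // Reject values that are not usable TCP ports instead of connecting to NaN.
  if (!Number.isInteger(port) || port < 1 || port > 65535) {
    throw new Error(`Invalid REDIS_PORT: ${env.REDIS_PORT}`);
  }
  const options: RedisOptions = {
    host: env.REDIS_HOST || 'localhost',
    port,
  };
  // Only set the password when one is actually provided.
  if (env.REDIS_PASSWORD) {
    options.password = env.REDIS_PASSWORD;
  }
  return options;
}
```

It would then be used as `BullModule.forRoot({ redis: redisOptionsFromEnv(process.env) })`.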

Job Lifecycle

created → waiting → active → completed
                      └──→ failed → (retry) → active → completed

Status       Description
created      Job added to queue
waiting      In queue, waiting for a worker
active       Currently being processed
completed    Successfully processed all rows
failed       Processing failed (may be retried)
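
The lifecycle above can be written down as a transition table, which is handy when validating status updates in tests. This is only a sketch of the diagram as documented here; `JobState`, `ALLOWED_TRANSITIONS`, and `canTransition` are hypothetical names, not Bull APIs.

```typescript
// States as listed in the table above.
type JobState = 'created' | 'waiting' | 'active' | 'completed' | 'failed';

// Each state maps to the states it may move to next.
const ALLOWED_TRANSITIONS: Record<JobState, JobState[]> = {
  created: ['waiting'],
  waiting: ['active'],
  active: ['completed', 'failed'],
  failed: ['active'], // a retry puts the job back into processing
  completed: [], // terminal
};

function canTransition(from: JobState, to: JobState): boolean {
  return ALLOWED_TRANSITIONS[from].includes(to);
}
```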

Worker Implementation

// lead-import.processor.ts
import { Processor, Process } from '@nestjs/bull';
import { Job } from 'bull';
import { DataSource } from 'typeorm'; // assumes TypeORM's DataSource

// Payload placed on the queue by LeadImportService.startImport()
interface LeadImportJobData {
  schemaName: string;
  userId: string;
  filePath: string;
  mapping: Record<string, string>;
  importJobId: string;
}

@Processor('lead-import')
export class LeadImportProcessor {
  constructor(private readonly dataSource: DataSource) {}

  // parseFile, applyMapping, createLead and updateJobStatus are helper
  // methods of this class, not shown here.

  @Process()
  async handleImport(job: Job<LeadImportJobData>) {
    const { schemaName, userId, filePath, mapping, importJobId } = job.data;

    try {
      // 1. Parse CSV/XLSX file
      const rows = await this.parseFile(filePath);
      const totalRows = rows.length;

      // 2. Update job status to 'processing'
      await this.updateJobStatus(schemaName, importJobId, 'processing', {
        totalRows,
      });

      let processed = 0;
      let succeeded = 0;
      let failed = 0;
      const errors: { row: number; error: string }[] = [];

      // 3. Process each row; a bad row is recorded and does not abort the import
      for (const row of rows) {
        try {
          const mappedData = this.applyMapping(row, mapping);
          await this.createLead(schemaName, userId, mappedData);
          succeeded++;
        } catch (err) {
          failed++;
          errors.push({ row: processed + 1, error: (err as Error).message });
        }

        processed++;

        // 4. Report progress (0-100) to Bull
        await job.progress(Math.round((processed / totalRows) * 100));

        // 5. Update job record every 100 rows
        if (processed % 100 === 0) {
          await this.updateJobStatus(schemaName, importJobId, 'processing', {
            totalRows, processed, succeeded, failed,
          });
        }
      }

      // 6. Finalize
      await this.updateJobStatus(schemaName, importJobId, 'completed', {
        totalRows, processed, succeeded, failed, errors,
      });

      return { totalRows, succeeded, failed };
    } catch (err) {
      // Mark the job record as failed, then rethrow so Bull applies its retry policy
      await this.updateJobStatus(schemaName, importJobId, 'failed', {
        error: (err as Error).message,
      });
      throw err;
    }
  }
}
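
The worker calls `applyMapping`, which is not shown on this page. One plausible shape is sketched below, assuming the mapping's keys are source column headers and its values are lead field names; that direction, and the choice to skip blank cells, are assumptions for illustration rather than the actual implementation.

```typescript
// Hypothetical stand-in for the processor's applyMapping helper.
// Assumption: mapping = { '<source column header>': '<lead field name>' }.
function applyMapping(
  row: Record<string, unknown>,
  mapping: Record<string, string>,
): Record<string, unknown> {
  const mapped: Record<string, unknown> = {};
  for (const [sourceColumn, targetField] of Object.entries(mapping)) {
    const raw = row[sourceColumn];
    // Trim string cells so stray whitespace from spreadsheets is not stored.
    const value = typeof raw === 'string' ? raw.trim() : raw;
    // Skip missing or blank cells; columns absent from the mapping are ignored.
    if (value === undefined || value === null || value === '') continue;
    mapped[targetField] = value;
  }
  return mapped;
}
```

For example, a row `{ 'First Name': ' Ada ', Email: 'ada@example.com', Phone: '' }` with the mapping `{ 'First Name': 'firstName', Email: 'email', Phone: 'phone' }` yields `{ firstName: 'Ada', email: 'ada@example.com' }`.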

Job Progress Tracking

Creating a Job

// lead-import.service.ts
import { Injectable } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bull';
import { Queue } from 'bull';
import { DataSource } from 'typeorm'; // assumes TypeORM's DataSource

@Injectable()
export class LeadImportService {
  constructor(
    @InjectQueue('lead-import') private importQueue: Queue,
    private readonly dataSource: DataSource,
  ) {}

  async startImport(schemaName: string, userId: string, file: any, mapping: any) {
    // 1. Create import job record in DB.
    //    schemaName is interpolated into the SQL, so it must come from a trusted source.
    const [jobRecord] = await this.dataSource.query(
      `INSERT INTO "${schemaName}".import_jobs
         (file_name, status, created_by, created_at)
       VALUES ($1, 'pending', $2, NOW())
       RETURNING *`,
      [file.originalname, userId],
    );

    // 2. Add to Bull queue
    const job = await this.importQueue.add({
      schemaName,
      userId,
      filePath: file.path,
      mapping,
      importJobId: jobRecord.id,
    }, {
      attempts: 3,             // Up to 3 attempts in total (first run + 2 retries)
      backoff: {
        type: 'exponential',
        delay: 5000,           // Base delay in ms; the wait grows with each retry
      },
      removeOnComplete: true,  // Remove the Bull job from Redis once it succeeds
      removeOnFail: false,     // Keep failed jobs in Redis for inspection
    });

    return { jobId: jobRecord.id, queueJobId: job.id };
  }
}

Checking Job Status

// lead-import.controller.ts
@Get('jobs/:id')
@RequirePermission('leads', 'import')
async getJobStatus(
  @Request() req: { user: JwtPayload },
  @Param('id') id: string,
) {
  return this.leadImportService.getJobStatus(req.user.tenantSchema, id);
}

// Response
{
  "id": "job-uuid",
  "fileName": "leads-2025.csv",
  "status": "processing",
  "totalRows": 5000,
  "processed": 2340,
  "succeeded": 2300,
  "failed": 40,
  "progress": 47,
  "createdAt": "2025-01-15T10:00:00Z"
}
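
The `progress` value in the example response is consistent with rounding `processed / totalRows` to a whole percentage, the same formula the worker passes to `job.progress`. A small sketch of that calculation with guards for edge cases follows; `progressPercent` and `ImportCounters` are hypothetical names, and whether `getJobStatus` derives the field this way is an assumption.

```typescript
interface ImportCounters {
  totalRows: number;
  processed: number;
}

// Returns a whole-number percentage in the range 0-100.
function progressPercent({ totalRows, processed }: ImportCounters): number {
  // An empty or not-yet-parsed file has no meaningful progress; avoid dividing by zero.
  if (totalRows <= 0) return 0;
  const pct = Math.round((processed / totalRows) * 100);
  // Clamp in case the counters are briefly inconsistent.
  return Math.min(100, Math.max(0, pct));
}
```

With the numbers from the response above, `progressPercent({ totalRows: 5000, processed: 2340 })` gives 47.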

Error Handling and Retry

Bull supports automatic retry with configurable backoff:

await this.importQueue.add(jobData, {
  attempts: 3, // Total attempts, including the first run
  backoff: {
    type: 'exponential', // 'fixed' or 'exponential'
    delay: 5000, // Base delay in ms
  },
});
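
To get a feel for how long a job may wait between attempts, the schedule can be computed ahead of time. The sketch below assumes Bull's built-in exponential strategy computes `(2^attemptsMade - 1) * delay`; check this against the Bull version you run, since BullMQ uses a different formula. `retryDelays` is a hypothetical helper, not a Bull API.

```typescript
type BackoffType = 'fixed' | 'exponential';

// Returns the wait (in ms) before each retry for the given job options.
function retryDelays(attempts: number, type: BackoffType, delay: number): number[] {
  const delays: number[] = [];
  // `attempts` counts the first run as well, so there are attempts - 1 retries.
  for (let attemptsMade = 1; attemptsMade < attempts; attemptsMade++) {
    delays.push(
      type === 'fixed'
        ? delay
        : Math.round((Math.pow(2, attemptsMade) - 1) * delay), // assumed Bull formula
    );
  }
  return delays;
}
```

Under that assumption, `retryDelays(3, 'exponential', 5000)` returns `[5000, 15000]`, and `retryDelays(3, 'fixed', 5000)` returns `[5000, 5000]`.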

Failed Job Handling

// Inside the processor class; decorators are imported from '@nestjs/bull'
@OnQueueFailed()
handleFailed(job: Job, error: Error) {
  console.error(`Job ${job.id} failed:`, error.message);
  // Update import_jobs table with error details
}

@OnQueueCompleted()
handleCompleted(job: Job, result: any) {
  console.log(`Job ${job.id} completed:`, result);
}

Monitoring Queue Health

Queue Events

@OnQueueActive()
handleActive(job: Job) {
  console.log(`Processing job ${job.id}...`);
}

@OnQueueStalled()
handleStalled(job: Job) {
  console.warn(`Job ${job.id} stalled and will be retried`);
}

Queue Metrics

// Get queue statistics
const queue = this.importQueue;
const waiting = await queue.getWaitingCount();
const active = await queue.getActiveCount();
const completed = await queue.getCompletedCount();
const failed = await queue.getFailedCount();
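
These counters can be gathered in one call and turned into a simple health flag, for example for a health-check endpoint. The sketch below is illustrative: `snapshotQueue`, `CountableQueue`, and the `maxWaiting` threshold are hypothetical, and the interface lists only the four Bull `Queue` methods used above so the function can be exercised with a stub.

```typescript
// Structural subset of Bull's Queue used by this sketch.
interface CountableQueue {
  getWaitingCount(): Promise<number>;
  getActiveCount(): Promise<number>;
  getCompletedCount(): Promise<number>;
  getFailedCount(): Promise<number>;
}

interface QueueSnapshot {
  waiting: number;
  active: number;
  completed: number;
  failed: number;
  healthy: boolean;
}

async function snapshotQueue(queue: CountableQueue, maxWaiting = 1000): Promise<QueueSnapshot> {
  // Fetch all four counters concurrently rather than one after another.
  const [waiting, active, completed, failed] = await Promise.all([
    queue.getWaitingCount(),
    queue.getActiveCount(),
    queue.getCompletedCount(),
    queue.getFailedCount(),
  ]);
  // Hypothetical rule: a backlog above maxWaiting is treated as unhealthy.
  return { waiting, active, completed, failed, healthy: waiting <= maxWaiting };
}
```

Note that jobs added with `removeOnComplete: true`, as the lead import jobs are, disappear from Redis on success, so the completed count will not reflect the number of finished imports.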

Adding New Queue Processors

To add a new background job type:

1. Register the Queue

// In your module's `imports` array
BullModule.registerQueue({
  name: 'report-export',
})

2. Create the Processor

@Processor('report-export')
export class ReportExportProcessor {
  @Process()
  async handleExport(job: Job<{ schemaName: string; reportId: string }>) {
    // Process the job
  }
}

3. Inject and Use in Service

@Injectable()
export class ReportsService {
  constructor(
    @InjectQueue('report-export') private exportQueue: Queue,
  ) {}

  async exportReport(schemaName: string, reportId: string) {
    const job = await this.exportQueue.add({ schemaName, reportId });
    return { jobId: job.id };
  }
}

Redis Dependency

The queue system requires a running Redis instance. If Redis is unavailable:

  • Queue operations will throw connection errors
  • The rest of the application will continue to function
  • Import/export features will be unavailable until Redis is restored
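
One way to keep a Redis outage from surfacing as an unhandled error is to wrap the enqueue call and report the failure to the caller. The sketch below is an assumption-laden illustration: `tryEnqueue`, `AddableQueue`, and the timeout are hypothetical, and the timeout is there because, depending on client settings, a call may wait on the connection instead of rejecting right away.

```typescript
// Structural subset of a queue: anything with an add() that resolves to a job with an id.
interface AddableQueue<T> {
  add(data: T): Promise<{ id: string | number }>;
}

type EnqueueResult =
  | { ok: true; jobId: string | number }
  | { ok: false; reason: string };

async function tryEnqueue<T>(
  queue: AddableQueue<T>,
  data: T,
  timeoutMs = 5000,
): Promise<EnqueueResult> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  // Rejects if the queue has not answered within timeoutMs.
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(
      () => reject(new Error(`Queue did not respond within ${timeoutMs} ms`)),
      timeoutMs,
    );
  });
  try {
    const job = await Promise.race([queue.add(data), timeout]);
    return { ok: true, jobId: job.id };
  } catch (err) {
    // Connection errors and timeouts both end up here.
    return { ok: false, reason: err instanceof Error ? err.message : String(err) };
  } finally {
    clearTimeout(timer);
  }
}
```

A controller could map `{ ok: false }` to a 503 response so that clients know the import feature is temporarily unavailable while the rest of the API keeps working.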

Development

For local development, you can monitor Bull queues with Bull Board or Arena. Add either one as a dev dependency to get a web-based queue dashboard.