Skip to main content

Pipeline System

The pipeline system is a shared architecture used across multiple modules: Leads, Opportunities, Deals, and Projects. Rather than duplicating pipeline logic per module, a single set of tables and endpoints serves all modules via a module column discriminator.

Architecture

┌──────────────────────────────────────────────────────────┐
│ pipelines │
│ id | name | module | is_default | ... │
│ 1 | Sales | leads | true | │
│ 2 | Enterprise | leads | false | │
│ 3 | Deal Flow | opportunities | true | │
│ 4 | Projects | projects | true | │
└──────────┬───────────────────────────────────────────────┘
│ 1:N
┌──────────▼───────────────────────────────────────────────┐
│ pipeline_stages │
│ id | pipeline_id | name | position | color | ... │
│ 1 | 1 | New | 1 | blue | │
│ 2 | 1 | Qualified | 2 | green | │
│ 3 | 1 | Proposal | 3 | amber | │
│ 4 | 1 | Closed Won | 4 | green | │
└──────────┬───────────────────────────────────────────────┘
│ 1:N
┌──────────▼───────────────────────────────────────────────┐
│ pipeline_stage_fields │
│ id | stage_id | field_name | is_required | field_type │
│ 1 | 3 | amount | true | number │
│ 2 | 3 | proposal | true | file │
│ 3 | 4 | close_date | true | date │
└──────────────────────────────────────────────────────────┘

Tables

pipelines

ColumnTypeDescription
idUUIDPrimary key
nameVARCHAR(255)Pipeline name
moduleVARCHAR(50)Module discriminator
is_defaultBOOLEANDefault pipeline for the module
is_activeBOOLEANWhether pipeline is active
created_byUUIDCreator user ID
created_atTIMESTAMPTZCreation timestamp
updated_atTIMESTAMPTZLast update timestamp
deleted_atTIMESTAMPTZSoft delete timestamp

pipeline_stages

ColumnTypeDescription
idUUIDPrimary key
pipeline_idUUIDFK to pipelines
nameVARCHAR(255)Stage name
positionINTEGERSort order
colorVARCHAR(20)Display color
probabilityINTEGERWin probability percentage
is_wonBOOLEANMarks as "won" stage
is_lostBOOLEANMarks as "lost" stage
stage_owner_typeVARCHAR(20)'user', 'team', or 'role'
stage_owner_user_idUUIDOwner user (if type = user)
stage_owner_team_idUUIDOwner team (if type = team)
stage_owner_role_idUUIDOwner role (if type = role)
field_visibilityJSONBPer-field visibility rules for this stage

pipeline_stage_fields

Required fields that must be filled before transitioning to a stage.

ColumnTypeDescription
idUUIDPrimary key
stage_idUUIDFK to pipeline_stages
field_nameVARCHAR(255)Field identifier
field_labelVARCHAR(255)Display label
field_typeVARCHAR(50)Input type (text, number, date, select, file)
is_requiredBOOLEANWhether field is mandatory
optionsJSONBOptions for select fields

record_stage_assignments

Tracks who is assigned to work on a record at each stage.

ColumnTypeDescription
idUUIDPrimary key
entity_typeVARCHAR(50)'leads', 'opportunities', etc.
entity_idUUIDRecord ID
stage_idUUIDCurrent stage
assigned_toUUIDAssigned user
assigned_atTIMESTAMPTZAssignment timestamp
completed_atTIMESTAMPTZWhen assignment was completed

Module Column Values

ValueModule
'leads'Leads pipeline
'opportunities'Opportunities pipeline
'deals'Deals pipeline (Sprint 5)
'projects'Projects pipeline (Sprint 8)

API Endpoints

All pipeline endpoints are consolidated under /lead-settings/:

Shared Endpoints

Despite being under /lead-settings/, these endpoints serve ALL modules. The module query parameter determines which module's data is returned.

Pipelines

GET    /lead-settings/pipelines?module=leads
POST /lead-settings/pipelines
PUT /lead-settings/pipelines/:id
DELETE /lead-settings/pipelines/:id

Stages

GET    /lead-settings/stages?module=leads&pipelineId=uuid
POST /lead-settings/stages
PUT /lead-settings/stages/:id
DELETE /lead-settings/stages/:id
PUT /lead-settings/stages/reorder (body: { stages: [{ id, position }] })

Stage Fields

GET    /lead-settings/stages/:stageId/fields
POST /lead-settings/stages/:stageId/fields
PUT /lead-settings/stages/:stageId/fields/:fieldId
DELETE /lead-settings/stages/:stageId/fields/:fieldId

Stage Ownership

GET    /lead-settings/stage-ownership/:stageId
PUT /lead-settings/stage-ownership/:stageId

Field Visibility

GET    /lead-settings/field-visibility/:stageId
PUT /lead-settings/field-visibility/:stageId

Frontend API Calls

// leads.api.ts
export const leadSettingsApi = {
// Pipelines
getPipelines: async (module: string = 'leads') => {
const { data } = await api.get('/lead-settings/pipelines', { params: { module } });
return data;
},

// Stages
getStages: async (pipelineId: string, module: string = 'leads') => {
const { data } = await api.get('/lead-settings/stages', {
params: { pipelineId, module },
});
return data;
},

// Stage fields
getStageFields: async (stageId: string) => {
const { data } = await api.get(`/lead-settings/stages/${stageId}/fields`);
return data;
},
};

// opportunities.api.ts — reuses the same endpoints
export const opportunitySettingsApi = {
getPipelines: () => leadSettingsApi.getPipelines('opportunities'),
getStages: (pipelineId: string) => leadSettingsApi.getStages(pipelineId, 'opportunities'),
};

Stage Change Flow

When a record moves between stages:

1. Client calls POST /leads/:id/change-stage
Body: { stageId, stageFieldValues? }

2. Service validates:
a. Record exists and belongs to user's access scope
b. Target stage exists in the record's pipeline
c. All required stage fields have values

3. If stage has ownership:
a. Check stage_owner_type
b. Create/update record_stage_assignments entry

4. Update record's stage_id

5. Log audit trail (AuditService)

6. Create activity entry (ActivityService)
"Stage changed from 'New' to 'Qualified'"

7. Return updated record
async changeStage(schemaName: string, userId: string, leadId: string, dto: ChangeStageDto) {
// 1. Validate required stage fields
const requiredFields = await this.dataSource.query(
`SELECT * FROM "${schemaName}".pipeline_stage_fields
WHERE stage_id = $1 AND is_required = true`,
[dto.stageId],
);

for (const field of requiredFields) {
if (!dto.stageFieldValues?.[field.field_name]) {
throw new BadRequestException(`Field "${field.field_label}" is required for this stage`);
}
}

// 2. Get current and target stage names for activity log
const [currentStage] = await this.dataSource.query(
`SELECT ps.name FROM "${schemaName}".pipeline_stages ps
JOIN "${schemaName}".leads l ON l.stage_id = ps.id
WHERE l.id = $1`,
[leadId],
);

const [targetStage] = await this.dataSource.query(
`SELECT name FROM "${schemaName}".pipeline_stages WHERE id = $1`,
[dto.stageId],
);

// 3. Update the record
await this.dataSource.query(
`UPDATE "${schemaName}".leads SET stage_id = $1, updated_at = NOW() WHERE id = $2`,
[dto.stageId, leadId],
);

// 4. Handle stage assignment
await this.dataSource.query(
`INSERT INTO "${schemaName}".record_stage_assignments
(entity_type, entity_id, stage_id, assigned_to, assigned_at)
VALUES ('leads', $1, $2, $3, NOW())`,
[leadId, dto.stageId, userId],
);

// 5. Audit + Activity
await this.auditService.log(schemaName, { /* ... */ });
await this.activityService.create(schemaName, {
entityType: 'leads',
entityId: leadId,
activityType: 'stage_change',
title: `Stage changed from "${currentStage.name}" to "${targetStage.name}"`,
performedBy: userId,
});
}

Stage Ordering and Reordering

Stages have a position column that determines display order. Reordering is done via bulk update:

// PUT /lead-settings/stages/reorder
// Body: { stages: [{ id: "uuid-1", position: 1 }, { id: "uuid-2", position: 2 }] }

async reorderStages(schemaName: string, stages: { id: string; position: number }[]) {
for (const stage of stages) {
await this.dataSource.query(
`UPDATE "${schemaName}".pipeline_stages SET position = $1 WHERE id = $2`,
[stage.position, stage.id],
);
}
}
tip

When creating a new stage, set its position to MAX(position) + 1 from the existing stages in the same pipeline to append it at the end.