How to Build a Lead Enrichment Pipeline with APIs (2026)
Step-by-step guide to building a lead enrichment pipeline using APIs. Architecture, code examples, best practices, and common pitfalls.
What is a Lead Enrichment Pipeline?
A lead enrichment pipeline automatically enhances your lead data with additional information from external sources. Instead of manually researching each lead, the pipeline fetches company data, professional profiles, and contact information through APIs, then stores the enriched data in your CRM or database.
This guide will show you how to build a production-ready enrichment pipeline that processes thousands of leads automatically.
Pipeline Architecture Overview
A typical lead enrichment pipeline consists of:
- Data Input: Leads enter from forms, CSV uploads, or CRM webhooks
- Queue System: Leads are queued for processing
- Enrichment Worker: Fetches data from enrichment APIs
- Data Validation: Validates and cleans enriched data
- Storage: Saves enriched data to database/CRM
- Notifications: Alerts the sales team to high-value leads
Step 1: Set Up Your Environment
Required Tools
- Node.js 18+ or Python 3.9+
- Database: PostgreSQL or MongoDB
- Queue System: Bull (Redis) or AWS SQS
- Enrichment API: Netrows account with API key
Install Dependencies
# Node.js
npm install bull redis pg axios

# Python
pip install celery redis psycopg2 requests
Step 2: Create Database Schema
CREATE TABLE leads (
  id SERIAL PRIMARY KEY,
  email VARCHAR(255) UNIQUE NOT NULL,
  first_name VARCHAR(100),
  last_name VARCHAR(100),
  company_domain VARCHAR(255),
  linkedin_url VARCHAR(500),
  enrichment_status VARCHAR(50) DEFAULT 'pending',
  enriched_at TIMESTAMP,
  created_at TIMESTAMP DEFAULT NOW()
);

CREATE TABLE enriched_data (
  id SERIAL PRIMARY KEY,
  lead_id INTEGER REFERENCES leads(id),
  job_title VARCHAR(255),
  company_name VARCHAR(255),
  company_size VARCHAR(50),
  industry VARCHAR(100),
  location VARCHAR(255),
  skills TEXT[],
  raw_data JSONB,
  created_at TIMESTAMP DEFAULT NOW()
);
Step 3: Set Up Queue System
// queue.js
const Queue = require('bull');

// Bull manages its own Redis connection from these options
const enrichmentQueue = new Queue('lead-enrichment', {
  redis: {
    host: process.env.REDIS_HOST,
    port: process.env.REDIS_PORT
  }
});

// Add a lead to the queue; Bull tracks retries itself, so the job data
// only needs the lead ID while the options configure up to 3 attempts
// with exponential backoff (2s, 4s, 8s)
async function queueLeadForEnrichment(leadId) {
  await enrichmentQueue.add(
    { leadId },
    {
      attempts: 3,
      backoff: {
        type: 'exponential',
        delay: 2000
      }
    }
  );
}

module.exports = { enrichmentQueue, queueLeadForEnrichment };

Step 4: Build Enrichment Worker
// worker.js
const { enrichmentQueue } = require('./queue');
const axios = require('axios');
const db = require('./database');

enrichmentQueue.process(async (job) => {
  const { leadId } = job.data;

  try {
    // Fetch the lead from the database
    const lead = await db.query(
      'SELECT * FROM leads WHERE id = $1',
      [leadId]
    );

    if (!lead.rows[0]) {
      throw new Error('Lead not found');
    }

    const leadData = lead.rows[0];

    // Enrich with the Netrows API
    const enrichedData = await enrichLead(leadData);

    // Save enriched data
    await saveEnrichedData(leadId, enrichedData);

    // Update lead status
    await db.query(
      'UPDATE leads SET enrichment_status = $1, enriched_at = NOW() WHERE id = $2',
      ['completed', leadId]
    );

    return { success: true, leadId };
  } catch (error) {
    console.error('Enrichment failed:', error);
    await db.query(
      'UPDATE leads SET enrichment_status = $1 WHERE id = $2',
      ['failed', leadId]
    );
    throw error;
  }
});

async function enrichLead(lead) {
  const response = await axios.post(
    'https://api.netrows.com/v1/people/profile',
    {
      linkedin_url: lead.linkedin_url || undefined,
      email: lead.email || undefined
    },
    {
      headers: {
        'Authorization': `Bearer ${process.env.NETROWS_API_KEY}`,
        'Content-Type': 'application/json'
      }
    }
  );

  return response.data;
}

async function saveEnrichedData(leadId, data) {
  await db.query(
    `INSERT INTO enriched_data
     (lead_id, job_title, company_name, company_size, industry, location, skills, raw_data)
     VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`,
    [
      leadId,
      data.job_title,
      data.company_name,
      data.company_size,
      data.industry,
      data.location,
      data.skills,
      JSON.stringify(data)
    ]
  );
}

// Exported so rate-limiter.js (Step 6) can wrap the same call
module.exports = { enrichLead };

Step 5: Create API Endpoint
// api.js
const express = require('express');
const { queueLeadForEnrichment } = require('./queue');
const db = require('./database');

const app = express();
app.use(express.json());

// Add a new lead
app.post('/api/leads', async (req, res) => {
  try {
    const { email, first_name, last_name, company_domain, linkedin_url } = req.body;

    // Validate input
    if (!email && !linkedin_url) {
      return res.status(400).json({
        error: 'Email or LinkedIn URL required'
      });
    }

    // Insert lead
    const result = await db.query(
      `INSERT INTO leads (email, first_name, last_name, company_domain, linkedin_url)
       VALUES ($1, $2, $3, $4, $5)
       ON CONFLICT (email) DO UPDATE SET
         first_name = EXCLUDED.first_name,
         last_name = EXCLUDED.last_name
       RETURNING id`,
      [email, first_name, last_name, company_domain, linkedin_url]
    );

    const leadId = result.rows[0].id;

    // Queue for enrichment
    await queueLeadForEnrichment(leadId);

    res.json({
      success: true,
      leadId,
      message: 'Lead queued for enrichment'
    });
  } catch (error) {
    console.error('Error adding lead:', error);
    res.status(500).json({ error: 'Failed to add lead' });
  }
});

// Get an enriched lead
app.get('/api/leads/:id', async (req, res) => {
  try {
    const { id } = req.params;

    const result = await db.query(
      `SELECT l.*, e.* FROM leads l
       LEFT JOIN enriched_data e ON l.id = e.lead_id
       WHERE l.id = $1`,
      [id]
    );

    if (result.rows.length === 0) {
      return res.status(404).json({ error: 'Lead not found' });
    }

    res.json(result.rows[0]);
  } catch (error) {
    console.error('Error fetching lead:', error);
    res.status(500).json({ error: 'Failed to fetch lead' });
  }
});

app.listen(3000, () => {
  console.log('API server running on port 3000');
});

Step 6: Add Rate Limiting
Implement rate limiting to respect API limits and avoid throttling:
// rate-limiter.js
const Bottleneck = require('bottleneck');
const { enrichLead } = require('./worker');

// Netrows allows 10-100 req/min depending on plan
const limiter = new Bottleneck({
  minTime: 600,    // at most one request every 600ms (~100 requests/min)
  maxConcurrent: 5
});

async function enrichWithRateLimit(lead) {
  return limiter.schedule(() => enrichLead(lead));
}

module.exports = { enrichWithRateLimit };

Step 7: Implement Error Handling
// error-handler.js
const axios = require('axios');

class EnrichmentError extends Error {
  constructor(message, code, retryable = false) {
    super(message);
    this.code = code;
    this.retryable = retryable;
  }
}

async function enrichWithErrorHandling(lead) {
  try {
    const response = await axios.post(
      'https://api.netrows.com/v1/people/profile',
      { linkedin_url: lead.linkedin_url },
      {
        headers: {
          'Authorization': `Bearer ${process.env.NETROWS_API_KEY}`,
          'Content-Type': 'application/json'
        },
        timeout: 10000
      }
    );

    return response.data;
  } catch (error) {
    if (error.response) {
      const status = error.response.status;

      if (status === 429) {
        throw new EnrichmentError('Rate limit exceeded', 'RATE_LIMIT', true);
      } else if (status === 404) {
        throw new EnrichmentError('Profile not found', 'NOT_FOUND', false);
      } else if (status >= 500) {
        throw new EnrichmentError('API server error', 'SERVER_ERROR', true);
      }
    }

    throw new EnrichmentError('Enrichment failed', 'UNKNOWN_ERROR', true);
  }
}

module.exports = { EnrichmentError, enrichWithErrorHandling };

Step 8: Add Monitoring & Logging
// monitoring.js
const winston = require('winston');

const logger = winston.createLogger({
  level: 'info',
  format: winston.format.json(),
  transports: [
    new winston.transports.File({ filename: 'error.log', level: 'error' }),
    new winston.transports.File({ filename: 'combined.log' })
  ]
});

// Track metrics
const metrics = {
  totalProcessed: 0,
  successful: 0,
  failed: 0,
  avgProcessingTime: 0
};

function logEnrichment(leadId, success, duration, error = null) {
  metrics.totalProcessed++;

  if (success) {
    metrics.successful++;
  } else {
    metrics.failed++;
  }

  // Running average of processing time across all jobs
  metrics.avgProcessingTime =
    (metrics.avgProcessingTime * (metrics.totalProcessed - 1) + duration) /
    metrics.totalProcessed;

  logger.info({
    event: 'enrichment_completed',
    leadId,
    success,
    duration,
    error: error?.message,
    metrics
  });
}

module.exports = { logger, logEnrichment, metrics };

Best Practices
1. Implement Caching
Cache enriched data to avoid redundant API calls (see the sketch after this list):
- Cache by email or LinkedIn URL
- Set TTL to 30-90 days
- Use Redis for fast lookups
- Invalidate cache on manual refresh
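Here's a minimal cache-check sketch, assuming node-redis v4 and the enrichLead() function exported from worker.js; the key scheme and the 30-day TTL are illustrative defaults, not fixed requirements:

// cache.js
// Minimal caching sketch -- key scheme and TTL are assumptions
const { createClient } = require('redis');
const { enrichLead } = require('./worker');

const cache = createClient({ url: process.env.REDIS_URL });
const TTL_SECONDS = 60 * 60 * 24 * 30; // 30 days

async function enrichWithCache(lead) {
  if (!cache.isOpen) await cache.connect();

  const key = `enrichment:${lead.email || lead.linkedin_url}`;

  // Serve from cache if this lead was enriched recently
  const cached = await cache.get(key);
  if (cached) return JSON.parse(cached);

  // Otherwise spend an API credit and cache the result with a TTL
  const data = await enrichLead(lead);
  await cache.set(key, JSON.stringify(data), { EX: TTL_SECONDS });
  return data;
}

module.exports = { enrichWithCache };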
2. Batch Processing
Process leads in batches for better efficiency (sketch below):
- Group leads by priority
- Process high-value leads first
- Use parallel processing (5-10 concurrent requests)
- Implement backpressure handling
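A minimal sketch of chunked processing, assuming the enrichWithRateLimit() wrapper from Step 6; the score field used for prioritization is hypothetical:

// batch.js
const { enrichWithRateLimit } = require('./rate-limiter');

const CHUNK_SIZE = 5; // 5-10 concurrent requests is a sensible default

async function enrichBatch(leads) {
  // Hypothetical score field: process high-value leads first
  const ordered = [...leads].sort((a, b) => (b.score || 0) - (a.score || 0));
  const results = [];

  for (let i = 0; i < ordered.length; i += CHUNK_SIZE) {
    const chunk = ordered.slice(i, i + CHUNK_SIZE);
    // allSettled keeps one failed lead from aborting the whole chunk,
    // and awaiting per chunk gives you natural backpressure
    results.push(...await Promise.allSettled(chunk.map((l) => enrichWithRateLimit(l))));
  }

  return results;
}

module.exports = { enrichBatch };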
3. Data Validation
Validate enriched data before saving (see the sketch after this list):
- Check for required fields
- Validate email format
- Verify company domain
- Flag suspicious data
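A sketch of what that validation might look like; the required fields and suspicious-data checks here are assumptions, so tailor them to the fields your pipeline actually relies on:

// validate.js
const EMAIL_RE = /^[^\s@]+@[^\s@]+\.[^\s@]+$/;

function validateEnrichedData(data) {
  const errors = [];

  // Required fields the rest of the pipeline depends on
  if (!data.job_title) errors.push('missing job_title');
  if (!data.company_name) errors.push('missing company_name');

  // Basic format checks
  if (data.email && !EMAIL_RE.test(data.email)) {
    errors.push('invalid email format');
  }

  // Flag suspicious values for manual review rather than silently saving them
  if (data.company_name && data.company_name.length < 2) {
    errors.push('suspicious company_name');
  }

  return { valid: errors.length === 0, errors };
}

module.exports = { validateEnrichedData };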
4. Retry Strategy
Implement smart retries for failed enrichments (sketch below):
- Exponential backoff (2s, 4s, 8s)
- Maximum 3 retry attempts
- Don't retry 404 errors
- Retry 429 and 5xx errors
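One way to wire these rules into the Bull worker, using the retryable flag from the EnrichmentError class in Step 7 (job.discard() is Bull's mechanism for cancelling a job's remaining attempts):

// retry-policy.js
// Bull already performs the exponential backoff configured in Step 3;
// this wrapper just decides which failures deserve those retries.
async function processWithRetryPolicy(job, handler) {
  try {
    return await handler(job);
  } catch (error) {
    if (error && error.retryable === false) {
      job.discard(); // permanent failure (e.g. a 404): don't burn retry attempts
    }
    throw error; // retryable errors (429, 5xx) re-enter the backoff cycle
  }
}

module.exports = { processWithRetryPolicy };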
5. Cost Optimization
Minimize API costs (see the gate sketch after this list):
- Check cache before API call
- Deduplicate leads before enrichment
- Only enrich qualified leads
- Use webhooks for real-time updates
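A hypothetical qualification gate illustrating the "only enrich qualified leads" idea; the source field and the criteria are placeholders for your own scoring rules:

// qualify.js
// Hypothetical gate deciding whether a lead is worth an API credit
function shouldEnrich(lead) {
  // Nothing to enrich on
  if (!lead.email && !lead.linkedin_url) return false;

  // Already enriched -- the cache or enriched_data table has it
  if (lead.enrichment_status === 'completed') return false;

  // Placeholder criteria: replace with your own qualification rules
  const qualifiedSources = ['demo_request', 'pricing_page'];
  return qualifiedSources.includes(lead.source);
}

module.exports = { shouldEnrich };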
Common Pitfalls to Avoid
1. Not Handling Rate Limits
Always implement rate limiting to avoid API throttling. Use libraries like Bottleneck or implement custom rate limiters.
2. Synchronous Processing
Never enrich leads synchronously in API requests. Always use a queue system for background processing.
3. No Error Recovery
Implement proper error handling and retry logic. Failed enrichments should be retried automatically.
4. Ignoring Data Quality
Validate and clean data before and after enrichment. Bad input data leads to poor enrichment results.
5. No Monitoring
Track success rates, processing times, and error rates. Set up alerts for anomalies.
Scaling Your Pipeline
Horizontal Scaling
Run multiple worker instances (see the concurrency sketch after this list):
- Deploy workers on multiple servers
- Use container orchestration (Kubernetes)
- Auto-scale based on queue depth
- Load balance across workers
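Bull distributes jobs across every process that attaches a processor to the same queue, so scaling out is mostly a deployment concern. Within each instance, the first argument to process() controls parallelism; a sketch, assuming the Step 4 handler has been extracted into an enrichJob function:

// Each worker instance can also run jobs in parallel; Bull accepts a
// concurrency value as the first argument to process()
enrichmentQueue.process(10, enrichJob); // up to 10 jobs at once per instance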
Database Optimization
Optimize database for high throughput (index examples below):
- Add indexes on frequently queried columns
- Use connection pooling
- Partition large tables
- Archive old enrichment data
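For the Step 2 schema, the columns worth indexing first are the ones the worker and API filter on (email already has an index via its UNIQUE constraint):

-- Speeds up "find pending leads" scans and status dashboards
CREATE INDEX idx_leads_status ON leads (enrichment_status);

-- Speeds up the JOIN in the GET /api/leads/:id endpoint
CREATE INDEX idx_enriched_lead_id ON enriched_data (lead_id);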
Queue Management
Optimize queue performance (see the sketch after this list):
- Use priority queues for important leads
- Implement dead letter queues
- Monitor queue depth
- Set appropriate concurrency limits
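A sketch of two of these ideas with Bull: per-job priorities (lower number = processed sooner) and the job counts you can poll to watch queue depth:

// queue-admin.js
const { enrichmentQueue } = require('./queue');

async function queueHighPriorityLead(leadId) {
  // Bull convention: lower priority number = processed sooner
  await enrichmentQueue.add({ leadId }, { priority: 1, attempts: 3 });
}

async function reportQueueDepth() {
  const counts = await enrichmentQueue.getJobCounts();
  console.log('queue depth:', counts); // { waiting, active, completed, failed, delayed }
  return counts;
}

module.exports = { queueHighPriorityLead, reportQueueDepth };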
Frequently Asked Questions
How many leads can I enrich per hour?
At Netrows' 100 req/min rate limit, you can enrich up to 6,000 leads per hour. Use multiple API keys or upgrade to an enterprise plan for higher limits.
Should I use a queue system or process leads directly?
Always use a queue system for production. Queues provide retry logic, rate limiting, and prevent API timeouts. Direct processing is only suitable for small-scale testing.
How do I handle duplicate leads?
Use database constraints (UNIQUE on email) and check cache before enriching. Implement deduplication logic in your API endpoint.
What's the best way to prioritize leads?
Use priority queues with scoring based on lead source, company size, or engagement level. Process high-priority leads first.
How long should I cache enriched data?
Cache for 30-90 days depending on your use case. Sales teams need fresher data (30 days), while market research can use longer cache periods (90 days).
Should I enrich all leads or only qualified ones?
Only enrich qualified leads to save costs. Implement lead scoring and only enrich leads that meet your criteria (e.g., company size, industry, engagement level).
How do I handle API failures?
Implement retry logic with exponential backoff. Log failures for manual review. Set up alerts for high failure rates.
Can I enrich leads in real-time?
Yes, but use webhooks or async processing. Never block user requests waiting for enrichment. Show a loading state and update the UI when enrichment completes.
Ready to Build Your Enrichment Pipeline?
Netrows provides fast, reliable APIs for building production-grade enrichment pipelines. Get started with 100 free credits today.