Real-Time Data Sync with N8N - PostgreSQL to MongoDB
A practical guide to building automated data synchronization between PostgreSQL and MongoDB using N8N workflows, webhooks, and Redis for real-time ETL processing.
The Problem
You have data in PostgreSQL that needs to be in MongoDB for analytics. Keeping them in sync manually is tedious and error-prone. Here's how to automate it with N8N.
Why N8N?
What you need:
- Capture changes from PostgreSQL
- Transform relational data to documents
- Write to MongoDB in real-time
- Handle errors and retries
Why N8N works:
- Visual workflow builder
- Webhook triggers for real-time processing
- Built-in PostgreSQL and MongoDB nodes
- Custom TypeScript for complex logic
- Self-hosted = full control
Architecture
Simple flow:
PostgreSQL → Webhook → N8N → Transform → Redis (dedupe) → MongoDB
Components:
- PostgreSQL triggers send webhooks on changes
- N8N receives and processes events
- Redis prevents duplicate processing
- MongoDB stores transformed documents
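Concretely, a change event arriving at N8N looks roughly like this (values are illustrative; the fields match the pg_notify payload built in Step 3):

// Shape of a change event as it arrives at N8N (values are illustrative;
// the fields match the pg_notify payload built in Step 3)
const event = {
  table: 'users',      // which table changed (TG_TABLE_NAME)
  action: 'INSERT',    // the operation (TG_OP): INSERT or UPDATE
  id: 42,              // primary key of the changed row
  data: {              // the full row as JSON
    id: 42,
    first_name: 'Ada',
    last_name: 'Lovelace',
    email: 'ada@example.com'
  }
};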
Step 1: Set Up N8N
Deploy with Docker:
# docker-compose.yml
version: '3.8'
services:
  n8n:
    image: n8nio/n8n
    ports:
      - "5678:5678"
    environment:
      - N8N_BASIC_AUTH_ACTIVE=true
      - N8N_BASIC_AUTH_USER=admin
      - N8N_BASIC_AUTH_PASSWORD=your_password
      - N8N_HOST=yourdomain.com
      - WEBHOOK_URL=https://yourdomain.com/
    volumes:
      - n8n_data:/home/node/.n8n
    depends_on:
      - redis
  redis:
    image: redis:alpine
    ports:
      - "6379:6379"
volumes:
  n8n_data:

Start it:
docker-compose up -d

Access N8N at http://localhost:5678
Step 2: Create the Sync Workflow
In N8N, create a workflow with these nodes:
1. Webhook Node (Trigger)
- Create webhook endpoint
- Listen for POST requests
- Receives PostgreSQL change events
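Once the webhook is live, you can smoke-test it from Node before wiring up PostgreSQL. This assumes the /webhook/data-sync path used later in this guide; run it in an ESM module or the Node REPL (top-level await):

// Smoke test for the webhook (Node 18+, which has fetch built in)
await fetch('http://localhost:5678/webhook/data-sync', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({
    table: 'users',
    action: 'INSERT',
    id: 1,   // illustrative values
    data: { id: 1, first_name: 'Test', last_name: 'User' }
  })
});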
2. Function Node (Validate)
// Validate incoming data
const data = $input.item.json;

if (!data.id || !data.table) {
  throw new Error('Invalid webhook payload');
}

return { json: data };

3. Redis Node (Check Duplicate)
// In a Code node: fingerprint the record so the next Redis node can check it
// (built-ins in a Code node may need NODE_FUNCTION_ALLOW_BUILTIN=crypto)
const crypto = require('crypto');

// Generate hash of record
const hash = crypto
  .createHash('md5')
  .update(JSON.stringify($input.item.json))
  .digest('hex');

// Check if already processed: a Redis Get on this key, then an IF node to stop early
const key = `processed:${hash}`;

return { json: { ...$input.item.json, dedupeKey: key } };

4. PostgreSQL Node (Fetch Full Record)
SELECT * FROM users WHERE id = {{ $json.id }}

(For untrusted input, prefer the node's query-parameters option over inlining expressions into the SQL.)

5. Function Node (Transform)
// Transform relational to document
const record = $input.item.json;

return {
  json: {
    _id: record.id,
    name: `${record.first_name} ${record.last_name}`,
    email: record.email,
    created_at: new Date(record.created_at),
    metadata: {
      synced_at: new Date(),
      source: 'postgresql'
    }
  }
};

6. MongoDB Node (Upsert)
- Operation: Update
- Update Key: _id
- Options: Upsert = true
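If you ever need to do the write in code instead of the MongoDB node, the equivalent upsert with the official Node.js driver looks roughly like this (the database and collection names are assumptions for this guide):

// Sketch: the same upsert via the official MongoDB Node.js driver
const { MongoClient } = require('mongodb');

const client = new MongoClient(process.env.MONGODB_URL);
await client.connect();

const doc = $input.item.json;

await client
  .db('analytics')               // assumed target database
  .collection('users')
  .updateOne(
    { _id: doc._id },            // match on the PostgreSQL primary key
    { $set: doc },               // replace fields with the latest version
    { upsert: true }             // insert if the document doesn't exist yet
  );

await client.close();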
7. Redis Node (Mark Processed)
- SET key with 5 minute TTL
- Prevents reprocessing
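In code form, the mark-processed step is a plain SET with an expiry. A sketch with the node-redis v4 client, using the dedupeKey produced by the hashing step above:

// Mark the record as processed with a 5-minute TTL (node-redis v4)
const { createClient } = require('redis');

const redis = createClient({ url: process.env.REDIS_URL });
await redis.connect();

// dedupeKey comes from the hashing step earlier in the workflow
await redis.set($json.dedupeKey, '1', { EX: 300 });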
Step 3: Set Up PostgreSQL Triggers
Send webhooks from PostgreSQL when data changes:
-- Create notification function
CREATE OR REPLACE FUNCTION notify_data_change()
RETURNS TRIGGER AS $$
DECLARE
  payload TEXT;
BEGIN
  payload := json_build_object(
    'table', TG_TABLE_NAME,
    'action', TG_OP,
    'id', NEW.id,
    'data', row_to_json(NEW)
  )::text;

  PERFORM pg_notify('data_changes', payload);
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Attach to table
CREATE TRIGGER users_change_trigger
AFTER INSERT OR UPDATE ON users
FOR EACH ROW
EXECUTE FUNCTION notify_data_change();

Note that pg_notify payloads are capped at roughly 8KB, so for wide rows send only the id and let the workflow fetch the full record (step 4 above).

Set up a listener to forward to N8N webhook:
// listener.js
const { Client } = require('pg');
const axios = require('axios');

// LISTEN/NOTIFY needs a dedicated, long-lived connection
const client = new Client({
  connectionString: process.env.DATABASE_URL
});

async function main() {
  await client.connect();
  await client.query('LISTEN data_changes');

  client.on('notification', async (msg) => {
    try {
      const payload = JSON.parse(msg.payload);
      // Forward the change event to the N8N webhook
      await axios.post('https://your-n8n.com/webhook/data-sync', payload);
    } catch (err) {
      // Don't let one bad event kill the listener
      console.error('Failed to forward change event:', err.message);
    }
  });
}

main();

Step 4: Add Custom TypeScript Node
For complex transformations, create a custom node (typically loaded by pointing N8N_CUSTOM_EXTENSIONS at the directory containing the compiled node):
// Custom transformer node
import {
  IExecuteFunctions,
  INodeExecutionData,
  INodeType,
  INodeTypeDescription,
} from 'n8n-workflow';

// Helper kept outside the class: inside execute(), `this` is bound to
// IExecuteFunctions, so class methods aren't reachable there
function transformFields(record: Record<string, any>) {
  return {
    name: `${record.first_name} ${record.last_name}`,
    email: record.email,
    created_at: new Date(record.created_at),
  };
}

export class PostgresqlToMongodbTransformer implements INodeType {
  description: INodeTypeDescription = {
    displayName: 'PostgreSQL to MongoDB Transformer',
    name: 'postgresqlToMongodbTransformer',
    group: ['transform'],
    version: 1,
    description: 'Transform PostgreSQL records to MongoDB documents',
    defaults: {
      name: 'Transform',
    },
    inputs: ['main'],
    outputs: ['main'],
    properties: [
      {
        displayName: 'Field Mappings',
        name: 'mappings',
        type: 'fixedCollection',
        typeOptions: {
          multipleValues: true,
        },
        default: {},
      },
    ],
  };

  async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
    const items = this.getInputData();
    const returnData: INodeExecutionData[] = [];

    for (let i = 0; i < items.length; i++) {
      const record = items[i].json;

      // Transform logic
      const transformed = {
        _id: record.id,
        ...transformFields(record),
        metadata: {
          synced_at: new Date(),
          source: 'postgresql',
        },
      };

      returnData.push({ json: transformed });
    }

    return [returnData];
  }
}

Step 5: Handle Errors
Add error handling to your workflow:
1. Use Error Workflow
- Click workflow settings
- Enable "Error Workflow"
- Route failed executions to error handler
2. Add Retry Logic
// In a Code node: a conceptual sketch. For simple cases, prefer the node's
// built-in "Retry On Fail" setting (node Settings tab: max tries, wait between tries).
// `writeToMongo` is a hypothetical stand-in for the call that might fail.
const maxRetries = 3;

for (let attempt = 0; attempt <= maxRetries; attempt++) {
  try {
    await writeToMongo($input.item.json);
    break; // success, stop retrying
  } catch (err) {
    if (attempt === maxRetries) {
      throw err; // give up and let the error workflow take over
    }
    // Exponential backoff: 1s, 2s, 4s...
    await new Promise(resolve =>
      setTimeout(resolve, Math.pow(2, attempt) * 1000)
    );
  }
}

return { json: $input.item.json };

3. Log Errors
// Send to Slack on error (runs in the error workflow)
// Note: using axios inside a Code node requires NODE_FUNCTION_ALLOW_EXTERNAL=axios
const axios = require('axios');

const error = {
  workflow: $workflow.name,
  execution: $execution.id,
  error: $json.error,
  timestamp: new Date()
};

// Post to a Slack incoming-webhook URL (here read from an env var via $env)
await axios.post($env.SLACK_WEBHOOK_URL, {
  text: `Sync failed: ${JSON.stringify(error)}`
});

return { json: error };

Step 6: Monitor Performance
Track what matters in N8N:
Built-in Metrics:
- Check execution history
- Monitor success/failure rates
- Track execution duration
- Review error patterns
Custom Monitoring Workflow:
// Health check workflow (runs every 5 min). A pseudocode sketch:
// `postgres`, `mongodb`, and `slack` stand in for the respective N8N nodes
const pgResult = await postgres.query('SELECT COUNT(*)::int AS count FROM users');
const pgCount = pgResult.rows[0].count;
const mongoCount = await mongodb.collection('users').countDocuments();

if (Math.abs(pgCount - mongoCount) > 100) {
  // Counts have drifted, alert
  await slack.send(`Sync drift detected: PG=${pgCount}, Mongo=${mongoCount}`);
}

Key metrics to watch:
- Sync latency (should be <1 second)
- Error rate (should be <1%)
- Queue depth (should be low)
- Duplicate rate (Redis hit rate)
Scaling Tips
When processing high volumes:
1. Batch Processing
// Group records into batches of 100
const BATCH_SIZE = 100;
const batches = [];

for (let i = 0; i < items.length; i += BATCH_SIZE) {
  batches.push(items.slice(i, i + BATCH_SIZE));
}

// Process batches. Note bulkWrite expects operations, not raw documents
// (`collection` stands for a connected MongoDB collection handle)
for (const batch of batches) {
  await collection.bulkWrite(
    batch.map((item) => ({
      updateOne: {
        filter: { _id: item.json._id },
        update: { $set: item.json },
        upsert: true,
      },
    }))
  );
}

2. Rate Limiting
- Set N8N execution concurrency limit
- Use queue mode for reliability
- Add delays between batches if needed
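For example, a small fixed delay between batches inside a Code node. The 200ms figure is a placeholder to tune, and `toOperations` is a hypothetical helper standing for the bulkWrite mapping in the batch-processing snippet above:

// Pause between batches so MongoDB isn't saturated by bulk writes
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

for (const batch of batches) {
  await collection.bulkWrite(toOperations(batch)); // toOperations: hypothetical mapping helper
  await sleep(200); // placeholder delay, tune to your write throughput
}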
3. Connection Pooling
- Reuse database connections
- Set appropriate pool sizes
- Monitor connection usage
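For the listener and any side scripts, node-postgres pooling looks like this (LISTEN/NOTIFY still needs its own dedicated Client; the pool size here is an assumption to size to your workload):

// Connection pooling with node-postgres, for queries outside the LISTEN client
const { Pool } = require('pg');

const pool = new Pool({
  connectionString: process.env.DATABASE_URL,
  max: 10,                   // assumption: cap at 10 connections
  idleTimeoutMillis: 30000,  // close idle clients after 30s
});

// pool.query checks a client out and returns it automatically
const { rows } = await pool.query('SELECT COUNT(*)::int AS count FROM users');
console.log(rows[0].count);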
4. Redis Deduplication
- Essential for preventing duplicate work
- Hash incoming records
- Check before processing
- Set 5-minute TTL
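The check and the mark can also be collapsed into a single atomic call (node-redis v4; assumes the connected `redis` client from the earlier snippet):

// Atomic check-and-mark: SET with NX sets the key only if it's new
// and returns null when it already exists (node-redis v4)
const result = await redis.set($json.dedupeKey, '1', { NX: true, EX: 300 });

if (result === null) {
  // Processed within the last 5 minutes, skip this record
  return [];
}

return { json: $json };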
5. Multiple Workers
# docker-compose.yml for scaling
services:
  n8n-worker-1:
    image: n8nio/n8n
    command: worker                    # run as a queue worker
    environment:
      - EXECUTIONS_MODE=queue
      - QUEUE_BULL_REDIS_HOST=redis    # queue mode uses Redis for the job queue
  n8n-worker-2:
    image: n8nio/n8n
    command: worker
    environment:
      - EXECUTIONS_MODE=queue
      - QUEUE_BULL_REDIS_HOST=redis

Real Results
What we achieved:
- 500K+ records synced daily
- Sub-second latency (50ms average)
- 99.9% success rate
- Real-time analytics instead of hours of delay
- Zero manual intervention
Cost:
- EC2 t3.medium: ~$50/month
- Redis: ~$30/month
- Total: ~$80/month
- vs. manual work: saved 40 hours/week
Common Issues and Fixes
Webhook floods:
- Problem: Bulk updates trigger thousands of webhooks
- Fix: Use queue mode, add rate limiting, batch similar changes
Duplicate processing:
- Problem: Same record processed multiple times
- Fix: Redis deduplication with hash + TTL
Connection timeouts:
- Problem: Database connections exhausted
- Fix: Connection pooling, proper cleanup
Transform errors:
- Problem: NULL values, type mismatches
- Fix: Add validation, handle NULLs gracefully
Slow performance:
- Problem: Processing 1 record at a time
- Fix: Batch writes, parallel processing
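For the transform errors above, a null-safe version of the Step 2 transform might look like this (field names match the earlier example):

// Null-safe transform: tolerate missing names and bad timestamps
const record = $input.item.json;

const fullName = [record.first_name, record.last_name]
  .filter(Boolean)   // drop NULL/empty parts instead of producing "null null"
  .join(' ');

return {
  json: {
    _id: record.id,
    name: fullName || 'unknown',
    email: record.email ?? null,
    created_at: record.created_at ? new Date(record.created_at) : null,
    metadata: { synced_at: new Date(), source: 'postgresql' }
  }
};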
Key Takeaways
What works:
- N8N perfect for visual ETL workflows
- Webhooks enable real-time sync
- Redis essential for deduplication
- Custom TypeScript nodes for complex logic
- Start simple, add complexity gradually
Remember:
- Always add error handling and retries
- Monitor from day one
- Test with production-like volumes
- Document your workflows
- Use Redis to prevent duplicate work
Quick Start Checklist
- Deploy N8N with Docker
- Create webhook endpoint
- Set up PostgreSQL triggers
- Build basic sync workflow
- Add Redis deduplication
- Implement error handling
- Add monitoring
- Test with sample data
- Deploy and monitor
Start with one table, get it working, then expand.