Real-Time Data Sync with N8N - PostgreSQL to MongoDB

A practical guide to building automated data synchronization between PostgreSQL and MongoDB using N8N workflows, webhooks, and Redis for real-time ETL processing.

The Problem

You have data in PostgreSQL that needs to be in MongoDB for analytics. Keeping them in sync manually is tedious and error-prone. Here's how to automate it with N8N.

Why N8N?

What you need:

  • Capture changes from PostgreSQL
  • Transform relational data to documents
  • Write to MongoDB in real-time
  • Handle errors and retries

Why N8N works:

  • Visual workflow builder
  • Webhook triggers for real-time processing
  • Built-in PostgreSQL and MongoDB nodes
  • Custom TypeScript for complex logic
  • Self-hosted = full control

Architecture

Simple flow:

PostgreSQL → Webhook → N8N → Transform → Redis (dedupe) → MongoDB

Components:

  • PostgreSQL triggers send webhooks on changes
  • N8N receives and processes events
  • Redis prevents duplicate processing
  • MongoDB stores transformed documents
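
For reference, a change event moving through this pipeline looks like the following. The fields match the trigger payload built in Step 3; the TypeScript interface itself is just illustrative:

// Shape of a change event emitted by the PostgreSQL trigger
interface ChangeEvent {
  table: string;                  // source table, e.g. 'users'
  action: 'INSERT' | 'UPDATE';    // TG_OP from the trigger
  id: number;                     // primary key of the changed row
  data: Record<string, unknown>;  // full row serialized as JSON
}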

Step 1: Set Up N8N

Deploy with Docker:

# docker-compose.yml
version: '3.8'
services:
  n8n:
    image: n8nio/n8n
    ports:
      - "5678:5678"
    environment:
      - N8N_BASIC_AUTH_ACTIVE=true
      - N8N_BASIC_AUTH_USER=admin
      - N8N_BASIC_AUTH_PASSWORD=your_password
      - N8N_HOST=yourdomain.com
      - WEBHOOK_URL=https://yourdomain.com/
    volumes:
      - n8n_data:/home/node/.n8n
    depends_on:
      - redis
 
  redis:
    image: redis:alpine
    ports:
      - "6379:6379"
 
volumes:
  n8n_data:

Start it:

docker-compose up -d

Access N8N at http://localhost:5678

Step 2: Create the Sync Workflow

In N8N, create a workflow with these nodes:

1. Webhook Node (Trigger)

  • Create webhook endpoint
  • Listen for POST requests
  • Receives PostgreSQL change events
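
Once the webhook is active, you can smoke-test it before wiring up PostgreSQL. The URL path and payload below are placeholders that match the event shape used throughout this guide:

// Smoke test: post a fake change event to the webhook (Node 18+ fetch)
await fetch('http://localhost:5678/webhook/data-sync', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({ table: 'users', action: 'UPDATE', id: 42 })
});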

2. Function Node (Validate)

// Validate incoming data
const data = $input.item.json;
 
if (!data.id || !data.table) {
  throw new Error('Invalid webhook payload');
}
 
return { json: data };

3. Function + Redis Nodes (Check Duplicate)

// Function node: hash the record to build a dedupe key
const crypto = require('crypto');
const hash = crypto
  .createHash('md5')
  .update(JSON.stringify($input.item.json))
  .digest('hex');
 
// Pass the key along for the Redis node to check
return { json: { ...$input.item.json, dedupeKey: `processed:${hash}` } };

Feed dedupeKey into a Redis node with the Get operation and route on the result with an IF node: if the key already exists, the record was processed recently, so drop it.

4. PostgreSQL Node (Fetch Full Record)

SELECT * FROM users WHERE id = {{ $json.id }}
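
The expression above interpolates the id straight into the SQL. For trusted internal events that works, but parameterized queries are the safer habit; a standalone sketch of the equivalent with node-postgres:

// Parameterized fetch: the injection-safe equivalent of the node's query
const { Pool } = require('pg');
const pool = new Pool({ connectionString: process.env.DATABASE_URL });
 
async function fetchUser(id) {
  const { rows } = await pool.query('SELECT * FROM users WHERE id = $1', [id]);
  return rows[0];
}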

5. Function Node (Transform)

// Transform relational to document
const record = $input.item.json;
 
return {
  json: {
    _id: record.id,
    // Null-safe join: first_name or last_name may be NULL in PostgreSQL
    name: [record.first_name, record.last_name].filter(Boolean).join(' '),
    email: record.email,
    created_at: new Date(record.created_at),
    metadata: {
      synced_at: new Date(),
      source: 'postgresql'
    }
  }
};

6. MongoDB Node (Upsert)

  • Operation: Update
  • Update Key: _id
  • Options: Upsert = true
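
For reference, the node's upsert is roughly equivalent to this driver call (the database and collection names here are assumptions):

// What the MongoDB node's upsert amounts to, using the official driver
const { MongoClient } = require('mongodb');
 
const client = new MongoClient(process.env.MONGODB_URI);
const users = client.db('analytics').collection('users');
 
async function upsertUser(doc) {
  await users.updateOne(
    { _id: doc._id },   // match on the update key
    { $set: doc },      // overwrite fields with the transformed document
    { upsert: true }    // insert if no match exists
  );
}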

7. Redis Node (Mark Processed)

  • SET key with 5 minute TTL
  • Prevents reprocessing
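
If you would rather do this in a Code node than the Redis node, a sketch (ioredis must be whitelisted via NODE_FUNCTION_ALLOW_EXTERNAL=ioredis; host and port are placeholders):

// Mark the record processed so the dedupe check skips it next time
const Redis = require('ioredis');
const redis = new Redis({ host: 'redis', port: 6379 });
 
await redis.set($json.dedupeKey, '1', 'EX', 300); // 300 s = 5-minute TTL
await redis.quit();
 
return $input.item;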

Step 3: Set Up PostgreSQL Triggers

Send webhooks from PostgreSQL when data changes:

-- Create notification function
CREATE OR REPLACE FUNCTION notify_data_change()
RETURNS TRIGGER AS $$
DECLARE
  payload TEXT;
BEGIN
  payload := json_build_object(
    'table', TG_TABLE_NAME,
    'action', TG_OP,
    'id', NEW.id,
    'data', row_to_json(NEW)
  )::text;
  
  PERFORM pg_notify('data_changes', payload);
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;
 
-- Attach to table
CREATE TRIGGER users_change_trigger
AFTER INSERT OR UPDATE ON users
FOR EACH ROW
EXECUTE FUNCTION notify_data_change();

One caveat: pg_notify payloads are capped at roughly 8 KB, so for wide rows send only the id and action and let the workflow fetch the full record (which Step 2 already does).

Set up a listener to forward notifications to the N8N webhook:

// listener.js
const { Client } = require('pg');
const axios = require('axios');
 
const client = new Client({
  connectionString: process.env.DATABASE_URL
});
 
async function main() {
  await client.connect();
  await client.query('LISTEN data_changes');
 
  client.on('notification', async (msg) => {
    try {
      const payload = JSON.parse(msg.payload);
 
      // Forward to the N8N webhook
      await axios.post('https://your-n8n.com/webhook/data-sync', payload);
    } catch (err) {
      // Don't let one bad event kill the listener process
      console.error('Failed to forward change event:', err.message);
    }
  });
}
 
main();

Step 4: Add Custom TypeScript Node

For complex transformations, create a custom node:

// Custom transformer node
import {
  IExecuteFunctions,
  INodeExecutionData,
  INodeType,
  INodeTypeDescription,
} from 'n8n-workflow';
 
// Helper: copy every column except the primary key onto the document
function transformFields(record: Record<string, any>) {
  const { id, ...fields } = record;
  return fields;
}
 
export class PostgresqlToMongodbTransformer implements INodeType {
  description: INodeTypeDescription = {
    displayName: 'PostgreSQL to MongoDB Transformer',
    name: 'postgresqlToMongodbTransformer',
    group: ['transform'],
    version: 1,
    description: 'Transform PostgreSQL records to MongoDB documents',
    defaults: {
      name: 'Transform',
    },
    inputs: ['main'],
    outputs: ['main'],
    properties: [
      {
        displayName: 'Field Mappings',
        name: 'mappings',
        type: 'fixedCollection',
        typeOptions: {
          multipleValues: true,
        },
        default: {},
      },
    ],
  };
 
  async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
    const items = this.getInputData();
    const returnData: INodeExecutionData[] = [];
 
    for (let i = 0; i < items.length; i++) {
      const record = items[i].json;
 
      // Transform logic
      const transformed = {
        _id: record.id,
        ...transformFields(record),
        metadata: {
          synced_at: new Date(),
          source: 'postgresql',
        },
      };
 
      returnData.push({ json: transformed });
    }
 
    return [returnData];
  }
}

Step 5: Handle Errors

Add error handling to your workflow:

1. Use Error Workflow

  • Click workflow settings
  • Enable "Error Workflow"
  • Route failed executions to error handler
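
The error handler workflow starts with an Error Trigger node. A minimal sketch of the first Code node after it, using the Error Trigger's documented output fields to build an alert message:

// Shape a Slack-ready alert from the Error Trigger's output
return {
  json: {
    text: `Workflow "${$json.workflow.name}" failed at node ` +
      `"${$json.execution.lastNodeExecuted}": ${$json.execution.error.message}`,
  },
};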

2. Add Retry Logic

The simplest option is built in: open a node's settings and enable "Retry On Fail", then set "Max Tries" and "Wait Between Tries". For custom exponential backoff, a Function node can track the attempt count on the item itself:

// In Function node: exponential backoff, attempt count carried on the item
const maxRetries = 3;
const retryCount = $input.item.json.retryCount || 0;
 
if (retryCount >= maxRetries) {
  throw new Error(`Giving up after ${maxRetries} attempts`);
}
 
// Wait 1s, 2s, 4s... before handing the item back for another attempt
await new Promise(resolve =>
  setTimeout(resolve, Math.pow(2, retryCount) * 1000)
);
 
return { json: { ...$input.item.json, retryCount: retryCount + 1 } };

3. Log Errors

// Send to Slack on error. In a Code node, axios must be whitelisted via
// NODE_FUNCTION_ALLOW_EXTERNAL=axios; otherwise use an HTTP Request node.
const axios = require('axios');
 
const error = {
  workflow: $workflow.name,
  execution: $execution.id,
  error: $json.error,
  timestamp: new Date()
};
 
// Post to Slack webhook (URL kept in an environment variable)
await axios.post(process.env.SLACK_WEBHOOK_URL, {
  text: `Sync failed: ${JSON.stringify(error)}`
});

Step 6: Monitor Performance

Track what matters in N8N:

Built-in Metrics:

  • Check execution history
  • Monitor success/failure rates
  • Track execution duration
  • Review error patterns

Custom Monitoring Workflow:

Run it on a Schedule trigger every 5 minutes: a PostgreSQL node counts rows, a MongoDB node counts documents, and a Code node compares the two. The node names in this sketch are illustrative:

// Health check: compare row counts from the two upstream nodes
const pgCount = parseInt($('Postgres Count').first().json.count, 10);
const mongoCount = $('Mongo Count').first().json.count;
 
if (Math.abs(pgCount - mongoCount) > 100) {
  // Counts have drifted - pass an alert downstream to Slack
  return { json: { text: `Sync drift detected: PG=${pgCount}, Mongo=${mongoCount}` } };
}
return { json: { text: 'Sync healthy' } };

Key metrics to watch:

  • Sync latency (should be <1 second)
  • Error rate (should be <1%)
  • Queue depth (should be low)
  • Duplicate rate (Redis hit rate)

Scaling Tips

When processing high volumes:

1. Batch Processing

// Group records into batches of 100
const BATCH_SIZE = 100;
const batches = [];
 
for (let i = 0; i < items.length; i += BATCH_SIZE) {
  batches.push(items.slice(i, i + BATCH_SIZE));
}
 
// Write each batch in one round trip. The driver's bulkWrite takes
// operations, not raw documents, so map each item to an upsert.
for (const batch of batches) {
  await collection.bulkWrite(
    batch.map((item) => ({
      updateOne: {
        filter: { _id: item.json._id },
        update: { $set: item.json },
        upsert: true,
      },
    }))
  );
}

2. Rate Limiting

  • Set N8N execution concurrency limit
  • Use queue mode for reliability
  • Add delays between batches if needed

3. Connection Pooling

  • Reuse database connections
  • Set appropriate pool sizes
  • Monitor connection usage
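
For Node.js scripts like the listener, pooling with node-postgres is a small change. One caveat: LISTEN/NOTIFY requires a dedicated Client, not a pooled connection, so keep the listener's client separate and pool everything else (the pool size here is an illustrative starting point):

// Pooled queries for everything except the LISTEN client
const { Pool } = require('pg');
 
const pool = new Pool({
  connectionString: process.env.DATABASE_URL,
  max: 10,                  // illustrative pool size
  idleTimeoutMillis: 30000  // reclaim idle connections after 30 s
});
 
async function countUsers() {
  const { rows } = await pool.query('SELECT COUNT(*) FROM users');
  return parseInt(rows[0].count, 10);
}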

4. Redis Deduplication

  • Essential for preventing duplicate work
  • Hash incoming records
  • Check before processing
  • Set 5-minute TTL

5. Multiple Workers

# docker-compose.yml for scaling
services:
  n8n-worker-1:
    image: n8nio/n8n
    command: worker
    environment:
      - EXECUTIONS_MODE=queue
      - QUEUE_BULL_REDIS_HOST=redis
 
  n8n-worker-2:
    image: n8nio/n8n
    command: worker
    environment:
      - EXECUTIONS_MODE=queue
      - QUEUE_BULL_REDIS_HOST=redis

The main N8N instance needs EXECUTIONS_MODE=queue too, so it enqueues executions in Redis instead of running them itself.
Real Results

What we achieved:

  • 500K+ records synced daily
  • Sub-second latency (50ms average)
  • 99.9% success rate
  • Real-time analytics instead of hours of delay
  • Zero manual intervention

Cost:

  • EC2 t3.medium: ~$50/month
  • Redis: ~$30/month
  • Total: ~$80/month
  • vs. manual work: saved 40 hours/week

Common Issues and Fixes

Webhook floods:

  • Problem: Bulk updates trigger thousands of webhooks
  • Fix: Use queue mode, add rate limiting, batch similar changes

Duplicate processing:

  • Problem: Same record processed multiple times
  • Fix: Redis deduplication with hash + TTL

Connection timeouts:

  • Problem: Database connections exhausted
  • Fix: Connection pooling, proper cleanup

Transform errors:

  • Problem: NULL values, type mismatches
  • Fix: Add validation, handle NULLs gracefully

Slow performance:

  • Problem: Processing 1 record at a time
  • Fix: Batch writes and parallel processing (see the sketch below)
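
A sketch of that parallelism, reusing the batches array from the batching example and capping concurrency so the databases are not overwhelmed (the cap of 4 and the writeBatch helper are illustrative):

// Process batches with bounded parallelism
const CONCURRENCY = 4;
 
for (let i = 0; i < batches.length; i += CONCURRENCY) {
  // Run up to CONCURRENCY batch writes at once, then wait for the group
  await Promise.all(
    batches.slice(i, i + CONCURRENCY).map((batch) => writeBatch(batch))
  );
}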

Key Takeaways

What works:

  • N8N is a great fit for visual ETL workflows
  • Webhooks enable real-time sync
  • Redis essential for deduplication
  • Custom TypeScript nodes for complex logic
  • Start simple, add complexity gradually

Remember:

  • Always add error handling and retries
  • Monitor from day one
  • Test with production-like volumes
  • Document your workflows
  • Use Redis to prevent duplicate work

Quick Start Checklist

  • Deploy N8N with Docker
  • Create webhook endpoint
  • Set up PostgreSQL triggers
  • Build basic sync workflow
  • Add Redis deduplication
  • Implement error handling
  • Add monitoring
  • Test with sample data
  • Deploy and monitor

Start with one table, get it working, then expand.