
SI Project Architecture

The SI Project is a centralised pipeline management platform that coordinates data copy operations between SQL databases and Azure targets. It was built in response to the limitations of Azure Data Factory’s Self-hosted Integration Runtime (SHIR) and gives full control over data extraction and transformation patterns. Pipelines that drove VMs to 80-100% CPU on SHIR barely reach 20% CPU under si_app, and ingestion is eight times faster on average.


System Overview

graph TB
    subgraph "SI Project Architecture"
        subgraph "Control Plane"
            SA[si_app Web Interface & API]
            DB[(SQLite Database)]
            SCHED[gocron Scheduler]
        end
        subgraph "Message Broker"
            OPQ[OPQ Broker Message Queue]
        end
        subgraph "Worker Fleet"
            W1[si_worker 1]
            W2[si_worker 2]
            W3[si_worker N]
        end
        subgraph "Data Sources and Targets"
            SQL[(SQL Databases Source Systems)]
            AZURE[Azure Storage Target Systems]
        end
        subgraph "User Interface"
            WEB[Web Browser Dashboard]
        end
    end
    WEB --> SA
    SA --> DB
    SA --> SCHED
    SA --> OPQ
    SCHED --> OPQ
    OPQ --> W1
    OPQ --> W2
    OPQ --> W3
    W1 --> SQL
    W1 --> AZURE
    W2 --> SQL
    W2 --> AZURE
    W3 --> SQL
    W3 --> AZURE
    W1 --> SA
    W2 --> SA
    W3 --> SA

Data Flow Pipeline

sequenceDiagram
    participant User
    participant SA as si_app
    participant DB as SQLite
    participant OPQ as OPQ Broker
    participant Worker as si_worker
    participant SQL as SQL DB
    participant Azure as Azure Storage
    User->>SA: Create Pipeline (Web UI)
    SA->>DB: Store Pipeline Config
    SA->>User: Success Response
    User->>SA: Trigger Pipeline
    SA->>OPQ: Enqueue Job (manual)
    Note over SA,SCHED: Scheduled Execution
    SCHED->>SA: Cron Trigger
    SA->>OPQ: Enqueue Job (scheduled)
    OPQ->>Worker: Assign Job
    Worker->>SA: Status: running
    SA->>User: WebSocket: Job Started
    Worker->>SQL: Execute Query / Read Table
    Worker->>Azure: Upload Data
    Worker->>SA: Status: success
    SA->>DB: Store History
    SA->>User: WebSocket: Job Complete
    alt Job Fails
        Worker->>SA: Status: error
        SA->>DB: Store Error History
        SA->>User: WebSocket: Job Failed
    end
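The status-handling half of the sequence above (worker reports a status, si_app stores history for finished jobs and pushes a WebSocket message) can be sketched as follows. This is an illustration only, not si_app's code: the `history` table layout, the `handle_status` function, and the event names are all hypothetical, and the function returns the message text instead of sending it over a real WebSocket.

```python
import json
import sqlite3

# Hypothetical history table; the real si_app schema is not shown in this document.
HISTORY_SCHEMA = "CREATE TABLE IF NOT EXISTS history (job_id TEXT, status TEXT, detail TEXT)"

# Assumed mapping from worker status to the dashboard event name.
EVENTS = {"running": "job_started", "success": "job_complete", "error": "job_failed"}
TERMINAL = {"success", "error"}


def handle_status(conn: sqlite3.Connection, job_id: str, status: str, detail: str = "") -> str:
    """Record finished jobs in history and return the message to broadcast."""
    if status not in EVENTS:
        raise ValueError(f"unknown status: {status!r}")
    if status in TERMINAL:
        # Only success and error rows are stored, matching the diagram.
        conn.execute(
            "INSERT INTO history (job_id, status, detail) VALUES (?, ?, ?)",
            (job_id, status, detail),
        )
        conn.commit()
    return json.dumps({"event": EVENTS[status], "job_id": job_id, "detail": detail})
```

A caller would hand the returned JSON string to whatever component owns the open WebSocket connections.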

Web Interface Architecture

graph LR
    subgraph "Web Application"
        subgraph "Pages"
            DASH[Dashboard /]
            PIPE[Pipelines /pipelines]
            CREATE[Create Pipeline /create_pipeline]
            EDIT[Edit Pipeline /edit_pipeline]
            HIST[History /history]
        end
        subgraph "API Endpoints"
            REST[REST Pipeline Management]
            WS[WebSocket Live Updates]
            STATUS[Status Worker Jobs]
        end
        subgraph "Backend Services"
            HAND[HTTP Handlers]
            SCHEDULER[gocron Engine]
            BROADCAST[WebSocket Hub]
        end
    end
    USER[User] --> DASH
    USER --> PIPE
    USER --> CREATE
    USER --> EDIT
    USER --> HIST
    DASH --> REST
    PIPE --> REST
    CREATE --> REST
    EDIT --> REST
    HIST --> REST
    DASH --> WS
    PIPE --> WS
    REST --> HAND
    WS --> BROADCAST
    STATUS --> HAND
    HAND --> SCHEDULER
    BROADCAST --> STATUS

Worker Processing Flow

flowchart TD
    START([Worker Start]) --> INIT[Load Configuration]
    INIT --> HEARTBEAT[Send Heartbeat to si_app]
    HEARTBEAT --> POLL[Poll OPQ for Jobs]
    POLL --> JOB{Job Available}
    JOB -->|No| WAIT[Wait 5 seconds]
    WAIT --> HEARTBEAT
    JOB -->|Yes| FETCH[Fetch Job Details]
    FETCH --> STATUS_RUNNING[Send Status running]
    STATUS_RUNNING --> TYPE{Pipeline Type}
    TYPE -->|Query| EXEC_QUERY[Execute SQL Query]
    TYPE -->|Table| READ_TABLE[Read Full Table]
    EXEC_QUERY --> PROCESS[Process Data in Chunks]
    READ_TABLE --> PROCESS
    PROCESS --> UPLOAD[Upload to Azure]
    UPLOAD --> STATUS_SUCCESS[Send Status success]
    STATUS_SUCCESS --> HEARTBEAT
    FETCH -->|Error| STATUS_ERROR[Send Status error]
    EXEC_QUERY -->|Error| STATUS_ERROR
    READ_TABLE -->|Error| STATUS_ERROR
    PROCESS -->|Error| STATUS_ERROR
    UPLOAD -->|Error| STATUS_ERROR
    STATUS_ERROR --> HEARTBEAT
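The loop in the flow above (heartbeat, poll, run, report, repeat) can be sketched as follows. It is a simplified illustration under stated assumptions, not the si_worker implementation: an in-memory `queue.Queue` stands in for OPQ, `report` and `execute` are hypothetical callbacks, and `max_polls` exists only so the example terminates.

```python
import queue
from typing import Callable


def run_worker(
    jobs: "queue.Queue[dict]",
    report: Callable[[str, str], None],
    execute: Callable[[dict], None],
    max_polls: int,
    wait_seconds: float = 5.0,
) -> None:
    """Run the heartbeat/poll/execute cycle a fixed number of times."""
    for _ in range(max_polls):
        report("worker", "heartbeat")
        try:
            # Blocking for up to wait_seconds plays the role of the
            # "Wait 5 seconds" step when no job is available.
            job = jobs.get(timeout=wait_seconds)
        except queue.Empty:
            continue
        report(job["id"], "running")
        try:
            execute(job)  # query or table read, chunking, upload
        except Exception as exc:
            # Any failing step funnels into a single error status.
            report(job["id"], f"error: {exc}")
        else:
            report(job["id"], "success")
```

Catching every exception in one place mirrors the diagram, where each step has an error edge into the same "Send Status error" node before the worker returns to the heartbeat.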

SQL Operations

flowchart TD
    START([Data Collection Start]) --> MODE{Collection Mode}
    MODE -->|Full Load| FULL[Full Table Load SELECT star FROM table]
    MODE -->|Partial| PARTIAL[Partial Collection WHERE conditions]
    FULL --> CHUNK[Process in Chunks Configurable Size]
    PARTIAL --> CHUNK
    CHUNK --> TRANSFORM[Data Transformation]
    TRANSFORM --> UPSERT{Upsert Strategy}
    UPSERT -->|Insert New| INSERT[INSERT new records]
    UPSERT -->|Update Existing| UPDATE[UPDATE existing records]
    UPSERT -->|Skip Duplicates| SKIP[SKIP duplicate records]
    INSERT --> BATCH[Batch Processing]
    UPDATE --> BATCH
    SKIP --> BATCH
    BATCH --> READY[Data Ready for Azure Upload]
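The "Process in Chunks" step is what keeps memory use flat regardless of table size. A minimal sketch of that step, using Python's standard `sqlite3` module as a stand-in for the real source database driver (the `read_in_chunks` name and the default chunk size are assumptions, and the upsert stage is not covered):

```python
import sqlite3
from typing import Iterator, List, Tuple


def read_in_chunks(
    conn: sqlite3.Connection,
    query: str,
    chunk_size: int = 1000,
    params: Tuple = (),
) -> Iterator[List[tuple]]:
    """Yield query results in lists of at most chunk_size rows."""
    cursor = conn.execute(query, params)
    while True:
        rows = cursor.fetchmany(chunk_size)
        if not rows:
            break
        yield rows
```

A full load would pass a plain `SELECT` over the table, while a partial collection would pass a query with a `WHERE` clause and bind its values through `params`. Because the function is a generator, each chunk can be transformed and uploaded before the next one is fetched.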

Azure Storage Patterns

graph TB
    subgraph "Azure Storage Options"
        subgraph "Flat Storage"
            BLOB[Azure Blob Storage CSV JSON Parquet]
            FILES[Single Files Large Datasets]
        end
        subgraph "Hierarchical Storage"
            DELTA[Delta Lake Partitioned Data]
            FOLDERS[Folder Structure Hierarchical]
        end
    end
    subgraph "Storage Patterns"
        subgraph "By Date"
            DAILY[Daily Folders]
            MONTHLY[Monthly Folders]
        end
        subgraph "By Type"
            TYPE_FOLDERS[Type-Based]
            SCHEMA[Schema-Based]
        end
        subgraph "By Source"
            SOURCE_FOLDERS[Source System]
            PIPELINE[Pipeline-Based]
        end
    end
    BLOB --> DAILY
    BLOB --> MONTHLY
    BLOB --> TYPE_FOLDERS
    BLOB --> SOURCE_FOLDERS
    DELTA --> DAILY
    DELTA --> MONTHLY
    DELTA --> TYPE_FOLDERS
    DELTA --> SOURCE_FOLDERS
    FILES --> DAILY
    FILES --> MONTHLY
    FILES --> TYPE_FOLDERS
    FILES --> SOURCE_FOLDERS
    FOLDERS --> DAILY
    FOLDERS --> MONTHLY
    FOLDERS --> TYPE_FOLDERS
    FOLDERS --> SOURCE_FOLDERS
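To make the folder patterns concrete, here is one way a blob path could be derived from the source system, the pipeline, and the run date. The layout itself (`source/pipeline/date/file`), the `blob_path` function, and the file naming are assumptions chosen for illustration; the document does not specify the exact paths si_app writes.

```python
from datetime import date


def blob_path(
    source: str,
    pipeline: str,
    run_date: date,
    pattern: str = "daily",
    ext: str = "parquet",
) -> str:
    """Build a blob name combining the by-source and by-date patterns."""
    if pattern == "daily":
        date_part = f"{run_date:%Y/%m/%d}"
    elif pattern == "monthly":
        date_part = f"{run_date:%Y/%m}"
    else:
        raise ValueError(f"unsupported pattern: {pattern!r}")
    file_name = f"{pipeline}_{run_date:%Y%m%d}.{ext}"
    return f"{source}/{pipeline}/{date_part}/{file_name}"
```

For example, `blob_path("erp", "orders", date(2024, 3, 7))` gives `erp/orders/2024/03/07/orders_20240307.parquet`.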

Message Flow with OPQ

graph TB
    subgraph "Message Flow"
        subgraph "Producers"
            SA[si_app Manual Triggers]
            SCHED[gocron Scheduled Triggers]
        end
        subgraph "Message Broker"
            OPQ[OPQ Broker]
            QUEUE[Job Queue]
            PERSIST[Persistent Storage]
        end
        subgraph "Consumers"
            W1[Worker 1]
            W2[Worker 2]
            W3[Worker N]
        end
    end
    SA --> OPQ
    SCHED --> OPQ
    OPQ --> QUEUE
    QUEUE --> PERSIST
    QUEUE --> W1
    QUEUE --> W2
    QUEUE --> W3
    W1 --> OPQ
    W2 --> OPQ
    W3 --> OPQ
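The producer/consumer arrangement above is a competing-consumers pattern: several workers read from one queue and each job goes to exactly one of them. The sketch below demonstrates that property with threads and Python's in-process `queue.Queue`. It does not use or imitate OPQ's actual API, it leaves out persistence, and `dispatch` and `consume` are hypothetical names.

```python
import queue
import threading
from typing import List, Tuple


def consume(name: str, jobs: queue.Queue, results: list, lock: threading.Lock) -> None:
    """Take jobs until a None sentinel arrives, acknowledging each one."""
    while True:
        job = jobs.get()
        if job is None:
            jobs.task_done()
            return
        with lock:
            results.append((name, job))
        jobs.task_done()  # acknowledgement back to the queue


def dispatch(job_ids: List[str], worker_count: int) -> List[Tuple[str, str]]:
    """Fan jobs out to worker_count consumers and collect who ran what."""
    jobs: queue.Queue = queue.Queue()
    results: list = []
    lock = threading.Lock()
    workers = [
        threading.Thread(target=consume, args=(f"worker-{i + 1}", jobs, results, lock))
        for i in range(worker_count)
    ]
    for worker in workers:
        worker.start()
    for job_id in job_ids:
        jobs.put(job_id)
    for _ in workers:
        jobs.put(None)  # one shutdown sentinel per worker
    jobs.join()
    for worker in workers:
        worker.join()
    return results
```

Which worker picks up which job varies from run to run; what stays fixed is that every job appears in the results exactly once.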

Error Handling

stateDiagram-v2
    [*] --> Ready
    Ready --> Processing: Job Assigned
    Processing --> Running: Start Execution
    Running --> Success: Complete
    Running --> Error: Failure
    Error --> Ready: Retry Recover
    Success --> Ready: Next Job
    Running --> Timeout: No Heartbeat
    Timeout --> Ready: Worker Restart
    Ready --> Connected: Register with si_app
    Connected --> Ready: Heartbeat Sent
    Connected --> Disconnected: Connection Lost
    Disconnected --> Connected: Reconnect
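The state diagram above translates directly into a transition table. In this sketch the state names come from the diagram, while the snake_case event names and the `next_state` helper are assumptions introduced for illustration.

```python
# (current state, event) -> next state, one entry per arrow in the diagram.
TRANSITIONS = {
    ("Ready", "job_assigned"): "Processing",
    ("Processing", "start_execution"): "Running",
    ("Running", "complete"): "Success",
    ("Running", "failure"): "Error",
    ("Error", "retry"): "Ready",
    ("Success", "next_job"): "Ready",
    ("Running", "no_heartbeat"): "Timeout",
    ("Timeout", "worker_restart"): "Ready",
    ("Ready", "register"): "Connected",
    ("Connected", "heartbeat_sent"): "Ready",
    ("Connected", "connection_lost"): "Disconnected",
    ("Disconnected", "reconnect"): "Connected",
}


def next_state(state: str, event: str) -> str:
    """Return the state reached from `state` on `event`, or raise ValueError."""
    try:
        return TRANSITIONS[(state, event)]
    except KeyError:
        raise ValueError(f"no transition from {state!r} on {event!r}") from None
```

Rejecting unknown pairs makes illegal moves, such as completing a job that never started running, fail loudly instead of silently corrupting worker state.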

Horizontal Scaling

graph TB
    subgraph "Horizontal Scaling"
        subgraph "Load Balancing"
            LB[OPQ Broker Load Balancer]
        end
        subgraph "Worker Pool"
            W1[Worker 1]
            W2[Worker 2]
            W3[Worker 3]
            WN[Worker N]
        end
        subgraph "Data Sources"
            DB1[(Database 1)]
            DB2[(Database 2)]
            AZ1[Azure Container 1]
            AZ2[Azure Container 2]
        end
    end
    LB --> W1
    LB --> W2
    LB --> W3
    LB --> WN
    W1 --> DB1
    W1 --> AZ1
    W2 --> DB1
    W2 --> AZ2
    W3 --> DB2
    W3 --> AZ1
    WN --> DB2
    WN --> AZ2

Configuration Management

graph LR
    subgraph "Configuration Layers"
        ENV[Environment Variables]
        JSON[Config Files]
        DB[Database Settings]
    end
    subgraph "Components"
        SA[si_app Config]
        OPQ[OPQ Config]
        WORKER[Worker Config]
    end
    ENV --> SA
    JSON --> SA
    JSON --> OPQ
    JSON --> WORKER
    DB --> SA
    subgraph "Config Types"
        AZURE[Azure Credentials]
        SQL[SQL Connection]
        SCHED[Cron Schedules]
        WORKER_SETTINGS[Worker Settings]
    end
    AZURE --> WORKER
    SQL --> SA
    SCHED --> SA
    WORKER_SETTINGS --> WORKER
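The diagram shows three layers feeding the si_app configuration but not which one wins when they disagree. The sketch below assumes one plausible order (config file, then database settings, then environment variables, with later layers overriding earlier ones). That order, the `SI_APP_` prefix, and the `load_config` function are all assumptions rather than documented behaviour.

```python
import json
from typing import Mapping


def load_config(
    json_text: str,
    db_settings: Mapping[str, str],
    env: Mapping[str, str],
    prefix: str = "SI_APP_",
) -> dict:
    """Merge the three layers; later layers override earlier ones."""
    config = dict(json.loads(json_text))  # lowest priority: config file
    config.update(db_settings)            # then settings stored in the database
    for key, value in env.items():        # highest priority: environment
        if key.startswith(prefix):
            config[key[len(prefix):].lower()] = value
    return config
```

In practice the last argument would be `os.environ`. Values that arrive from the environment are strings, so a caller would still need to convert numeric settings.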

Monitoring

graph TB
    subgraph "Monitoring Stack"
        subgraph "Metrics Collection"
            HEART[Worker Heartbeats]
            STATUS[Job Status Updates]
            PERF[Performance Metrics]
        end
        subgraph "Real-time Updates"
            WS[WebSocket Hub]
            DASH[Live Dashboard]
        end
        subgraph "Historical Data"
            HIST_DB[(History Database)]
            LOGS[Execution Logs]
        end
    end
    HEART --> WS
    STATUS --> WS
    PERF --> WS
    WS --> DASH
    STATUS --> HIST_DB
    PERF --> LOGS
    subgraph "Alerting"
        ERRORS[Error Tracking]
        TIMEOUT[Timeout Detection]
    end
    ERRORS --> DASH
    TIMEOUT --> DASH
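Timeout detection can be derived from the heartbeats alone: a worker whose last heartbeat is older than some threshold is treated as timed out. A minimal sketch, where the 30-second threshold and the `find_timed_out` function are assumptions (the document only states that workers send heartbeats):

```python
from datetime import datetime, timedelta
from typing import Dict, List


def find_timed_out(
    last_heartbeat: Dict[str, datetime],
    now: datetime,
    timeout: timedelta = timedelta(seconds=30),
) -> List[str]:
    """Return the sorted names of workers silent for longer than `timeout`."""
    return sorted(
        worker for worker, seen in last_heartbeat.items() if now - seen > timeout
    )
```

Passing `now` in as an argument, instead of reading the clock inside the function, keeps the check deterministic and easy to test.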

Key Architectural Principles

Impact
