SI Project Architecture
A centralised pipeline management platform that coordinates data copy operations from SQL databases to Azure storage targets. It was built in response to the limitations of Azure Data Factory's Self-hosted Integration Runtime (SHIR) and gives full control over data extraction and transformation patterns. The same pipelines that drove the SHIR VMs to 80-100% CPU barely reached 20% after moving to si_app, and ingestion was eight times faster on average.
System Overview
graph TB
subgraph "SI Project Architecture"
subgraph "Control Plane"
SA[si_app Web Interface & API]
DB[(SQLite Database)]
SCHED[gocron Scheduler]
end
subgraph "Message Broker"
OPQ[OPQ Broker Message Queue]
end
subgraph "Worker Fleet"
W1[si_worker 1]
W2[si_worker 2]
W3[si_worker N]
end
subgraph "Data Sources and Targets"
SQL[(SQL Databases Source Systems)]
AZURE[Azure Storage Target Systems]
end
subgraph "User Interface"
WEB[Web Browser Dashboard]
end
end
WEB --> SA
SA --> DB
SA --> SCHED
SA --> OPQ
SCHED --> OPQ
OPQ --> W1
OPQ --> W2
OPQ --> W3
W1 --> SQL
W1 --> AZURE
W2 --> SQL
W2 --> AZURE
W3 --> SQL
W3 --> AZURE
W1 --> SA
W2 --> SA
W3 --> SA
Data Flow Pipeline
sequenceDiagram
participant User
participant SA as si_app
participant SCHED as gocron Scheduler
participant DB as SQLite
participant OPQ as OPQ Broker
participant Worker as si_worker
participant SQL as SQL DB
participant Azure as Azure Storage
User->>SA: Create Pipeline (Web UI)
SA->>DB: Store Pipeline Config
SA->>User: Success Response
alt Manual trigger
User->>SA: Trigger Pipeline
SA->>OPQ: Enqueue Job (manual)
else Scheduled trigger
SCHED->>SA: Cron Trigger
SA->>OPQ: Enqueue Job (scheduled)
end
OPQ->>Worker: Assign Job
Worker->>SA: Status: running
SA->>User: WebSocket: Job Started
Worker->>SQL: Execute Query / Read Table
Worker->>Azure: Upload Data
alt Job Succeeds
Worker->>SA: Status: success
SA->>DB: Store History
SA->>User: WebSocket: Job Complete
else Job Fails
Worker->>SA: Status: error
SA->>DB: Store Error History
SA->>User: WebSocket: Job Failed
end
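The status path in the sequence above (the worker reports, si_app stores history and notifies the browser) can be sketched as a small HTTP handler. The `/status` route, the `StatusUpdate` JSON shape and the in-memory history standing in for SQLite are all hypothetical; only the three status values come from the diagram.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
	"sync"
)

// StatusUpdate is a hypothetical body a worker POSTs to si_app.
type StatusUpdate struct {
	JobID  string `json:"job_id"`
	Status string `json:"status"` // "running", "success" or "error"
	Error  string `json:"error,omitempty"`
}

// statusServer stands in for si_app: it records history (SQLite in the real
// system) and pushes each update to live listeners (the WebSocket hub).
type statusServer struct {
	mu      sync.Mutex
	history []StatusUpdate
	live    chan StatusUpdate
}

func (s *statusServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}
	var u StatusUpdate
	if err := json.NewDecoder(r.Body).Decode(&u); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	switch u.Status {
	case "running", "success", "error":
	default:
		http.Error(w, "unknown status", http.StatusBadRequest)
		return
	}
	s.mu.Lock()
	s.history = append(s.history, u)
	s.mu.Unlock()
	select {
	case s.live <- u: // notify dashboard listeners
	default: // never block a worker on a slow listener
	}
	w.WriteHeader(http.StatusNoContent)
}

// post sends one JSON body to the handler and returns the HTTP status code.
func post(h http.Handler, body string) int {
	req := httptest.NewRequest(http.MethodPost, "/status", strings.NewReader(body))
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, req)
	return rec.Code
}

func main() {
	s := &statusServer{live: make(chan StatusUpdate, 8)}
	fmt.Println(post(s, `{"job_id":"j1","status":"running"}`)) // 204
	fmt.Println(post(s, `{"job_id":"j1","status":"success"}`)) // 204
	fmt.Println(post(s, `{"job_id":"j1","status":"bogus"}`))   // 400
	fmt.Println(len(s.history))                                // 2
}
```

The non-blocking send is a deliberate choice in this sketch: a stalled dashboard should never hold up a worker's status report.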
Web Interface Architecture
graph LR
subgraph "Web Application"
subgraph "Pages"
DASH["Dashboard (/)"]
PIPE["Pipelines (/pipelines)"]
CREATE["Create Pipeline (/create_pipeline)"]
EDIT["Edit Pipeline (/edit_pipeline)"]
HIST["History (/history)"]
end
subgraph "API Endpoints"
REST[REST Pipeline Management]
WS[WebSocket Live Updates]
STATUS[Status Worker Jobs]
end
subgraph "Backend Services"
HAND[HTTP Handlers]
SCHEDULER[gocron Engine]
BROADCAST[WebSocket Hub]
end
end
USER[User] --> DASH
USER --> PIPE
USER --> CREATE
USER --> EDIT
USER --> HIST
DASH --> REST
PIPE --> REST
CREATE --> REST
EDIT --> REST
HIST --> REST
DASH --> WS
PIPE --> WS
REST --> HAND
WS --> BROADCAST
STATUS --> HAND
HAND --> SCHEDULER
BROADCAST --> STATUS
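The WebSocket Hub box follows the common hub pattern: one registry of connected clients and a broadcast that fans each update out to all of them. The document does not say which WebSocket library si_app uses, so this sketch models each connected browser as a buffered channel; `Hub`, `Register`, `Unregister` and `Broadcast` are illustrative names.

```go
package main

import (
	"fmt"
	"sync"
)

// Hub fans each message out to every registered client. In si_app the
// clients would be WebSocket connections; here each one is a buffered
// channel so the pattern runs with the standard library only.
type Hub struct {
	mu      sync.Mutex
	clients map[chan []byte]struct{}
}

func NewHub() *Hub { return &Hub{clients: make(map[chan []byte]struct{})} }

// Register adds a client and returns the channel it will receive on.
func (h *Hub) Register(buffer int) chan []byte {
	c := make(chan []byte, buffer)
	h.mu.Lock()
	h.clients[c] = struct{}{}
	h.mu.Unlock()
	return c
}

// Unregister removes a client and closes its channel.
func (h *Hub) Unregister(c chan []byte) {
	h.mu.Lock()
	defer h.mu.Unlock()
	if _, ok := h.clients[c]; ok {
		delete(h.clients, c)
		close(c)
	}
}

// Broadcast delivers msg to every client. A client whose buffer is full is
// dropped instead of stalling the hub. It returns how many clients got msg.
func (h *Hub) Broadcast(msg []byte) int {
	h.mu.Lock()
	defer h.mu.Unlock()
	delivered := 0
	for c := range h.clients {
		select {
		case c <- msg:
			delivered++
		default:
			delete(h.clients, c)
			close(c)
		}
	}
	return delivered
}

func main() {
	hub := NewHub()
	a, b := hub.Register(1), hub.Register(1)
	fmt.Println(hub.Broadcast([]byte(`{"job":"j1","status":"running"}`))) // 2
	fmt.Println(string(<-a), string(<-b))
}
```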
Worker Processing Flow
flowchart TD
START([Worker Start]) --> INIT[Load Configuration]
INIT --> HEARTBEAT[Send Heartbeat to si_app]
HEARTBEAT --> POLL[Poll OPQ for Jobs]
POLL --> JOB{Job Available}
JOB -->|No| WAIT[Wait 5 seconds]
WAIT --> HEARTBEAT
JOB -->|Yes| FETCH[Fetch Job Details]
FETCH --> STATUS_RUNNING[Send Status running]
STATUS_RUNNING --> TYPE{Pipeline Type}
TYPE -->|Query| EXEC_QUERY[Execute SQL Query]
TYPE -->|Table| READ_TABLE[Read Full Table]
EXEC_QUERY --> PROCESS[Process Data in Chunks]
READ_TABLE --> PROCESS
PROCESS --> UPLOAD[Upload to Azure]
UPLOAD --> STATUS_SUCCESS[Send Status success]
STATUS_SUCCESS --> HEARTBEAT
FETCH -->|Error| STATUS_ERROR[Send Status error]
EXEC_QUERY -->|Error| STATUS_ERROR
READ_TABLE -->|Error| STATUS_ERROR
PROCESS -->|Error| STATUS_ERROR
UPLOAD -->|Error| STATUS_ERROR
STATUS_ERROR --> HEARTBEAT
SQL Operations
flowchart TD
START([Data Collection Start]) --> MODE{Collection Mode}
MODE -->|Full Load| FULL["Full Table Load: SELECT * FROM table"]
MODE -->|Partial| PARTIAL["Partial Collection: WHERE conditions"]
FULL --> CHUNK["Process in Chunks (configurable size)"]
PARTIAL --> CHUNK
CHUNK --> TRANSFORM[Data Transformation]
TRANSFORM --> UPSERT{Upsert Strategy}
UPSERT -->|Insert New| INSERT[INSERT new records]
UPSERT -->|Update Existing| UPDATE[UPDATE existing records]
UPSERT -->|Skip Duplicates| SKIP[SKIP duplicate records]
INSERT --> BATCH[Batch Processing]
UPDATE --> BATCH
SKIP --> BATCH
BATCH --> READY[Data Ready for Azure Upload]
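The full-load versus partial branch and the chunking step can be sketched with `database/sql` in mind. `readInChunks` accepts anything with `Next`, `Scan` and `Err`, which `*sql.Rows` provides, so it would also work on a real result set. The single-column rows, the `buildQuery` helper and the fake row source are simplifications for illustration, and the upsert branch is not shown.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// rowSource is the subset of *sql.Rows used here.
type rowSource interface {
	Next() bool
	Scan(dest ...any) error
	Err() error
}

// buildQuery returns a full-load or partial query. table and where are
// assumed to come from trusted pipeline config; user-supplied values should
// be validated or bound as parameters rather than concatenated.
func buildQuery(table, where string) string {
	q := "SELECT * FROM " + table
	if strings.TrimSpace(where) != "" {
		q += " WHERE " + where
	}
	return q
}

// readInChunks scans single-column rows and passes them to flush in batches
// of at most size, including the final partial batch.
func readInChunks(rows rowSource, size int, flush func([]string) error) error {
	if size <= 0 {
		return errors.New("chunk size must be positive")
	}
	batch := make([]string, 0, size)
	for rows.Next() {
		var v string
		if err := rows.Scan(&v); err != nil {
			return err
		}
		batch = append(batch, v)
		if len(batch) == size {
			if err := flush(batch); err != nil {
				return err
			}
			batch = make([]string, 0, size) // fresh slice so flush may keep the old one
		}
	}
	if err := rows.Err(); err != nil {
		return err
	}
	if len(batch) > 0 {
		return flush(batch)
	}
	return nil
}

// fakeRows mimics *sql.Rows over an in-memory slice.
type fakeRows struct {
	vals []string
	i    int
}

func (f *fakeRows) Next() bool { f.i++; return f.i <= len(f.vals) }

func (f *fakeRows) Scan(dest ...any) error {
	*(dest[0].(*string)) = f.vals[f.i-1]
	return nil
}

func (f *fakeRows) Err() error { return nil }

func main() {
	fmt.Println(buildQuery("dbo.Orders", "OrderDate >= '2024-01-01'"))
	var sizes []int
	_ = readInChunks(&fakeRows{vals: []string{"a", "b", "c", "d", "e"}}, 2,
		func(b []string) error { sizes = append(sizes, len(b)); return nil })
	fmt.Println(sizes) // [2 2 1]
}
```

Flushing per chunk keeps memory bounded by the chunk size rather than the table size, which is the point of the "Process in Chunks" step.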
Azure Storage Patterns
graph TB
subgraph "Azure Storage Options"
subgraph "Flat Storage"
BLOB["Azure Blob Storage (CSV, JSON, Parquet)"]
FILES["Single Files (large datasets)"]
end
subgraph "Hierarchical Storage"
DELTA["Delta Lake (partitioned data)"]
FOLDERS["Hierarchical Folder Structure"]
end
end
subgraph "Storage Patterns"
subgraph "By Date"
DAILY[Daily Folders]
MONTHLY[Monthly Folders]
end
subgraph "By Type"
TYPE_FOLDERS[Type-Based]
SCHEMA[Schema-Based]
end
subgraph "By Source"
SOURCE_FOLDERS[Source System]
PIPELINE[Pipeline-Based]
end
end
BLOB --> DAILY
BLOB --> MONTHLY
BLOB --> TYPE_FOLDERS
BLOB --> SOURCE_FOLDERS
DELTA --> DAILY
DELTA --> MONTHLY
DELTA --> TYPE_FOLDERS
DELTA --> SOURCE_FOLDERS
FILES --> DAILY
FILES --> MONTHLY
FILES --> TYPE_FOLDERS
FILES --> SOURCE_FOLDERS
FOLDERS --> DAILY
FOLDERS --> MONTHLY
FOLDERS --> TYPE_FOLDERS
FOLDERS --> SOURCE_FOLDERS
Message Flow with OPQ
graph TB
subgraph "Message Flow"
subgraph "Producers"
SA[si_app Manual Triggers]
SCHED[gocron Scheduled Triggers]
end
subgraph "Message Broker"
OPQ[OPQ Broker]
QUEUE[Job Queue]
PERSIST[Persistent Storage]
end
subgraph "Consumers"
W1[Worker 1]
W2[Worker 2]
W3[Worker N]
end
end
SA --> OPQ
SCHED --> OPQ
OPQ --> QUEUE
QUEUE --> PERSIST
QUEUE --> W1
QUEUE --> W2
QUEUE --> W3
W1 --> OPQ
W2 --> OPQ
W3 --> OPQ
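The arrows from the workers back to OPQ are read here as acknowledgements, which is an assumption about OPQ's semantics. Under that reading, a job stays in flight until a worker acks it, and a nack returns it for redelivery. The in-memory `ackQueue` below illustrates the idea; it is not OPQ's API and does not model the persistent storage layer.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ackQueue keeps dequeued jobs "in flight" until they are acked or nacked.
type ackQueue struct {
	mu       sync.Mutex
	pending  []string
	inFlight map[string]bool
}

func newAckQueue() *ackQueue { return &ackQueue{inFlight: map[string]bool{}} }

// Enqueue is called by producers (manual or scheduled triggers).
func (q *ackQueue) Enqueue(id string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.pending = append(q.pending, id)
}

// Dequeue hands the oldest pending job to a consumer and marks it in flight.
func (q *ackQueue) Dequeue() (string, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.pending) == 0 {
		return "", false
	}
	id := q.pending[0]
	q.pending = q.pending[1:]
	q.inFlight[id] = true
	return id, true
}

// Ack removes a finished job for good.
func (q *ackQueue) Ack(id string) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	if !q.inFlight[id] {
		return errors.New("unknown job " + id)
	}
	delete(q.inFlight, id)
	return nil
}

// Nack puts a failed job back at the head of the queue for redelivery.
func (q *ackQueue) Nack(id string) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	if !q.inFlight[id] {
		return errors.New("unknown job " + id)
	}
	delete(q.inFlight, id)
	q.pending = append([]string{id}, q.pending...)
	return nil
}

func main() {
	q := newAckQueue()
	q.Enqueue("manual-1")
	q.Enqueue("cron-1")
	id, _ := q.Dequeue()
	_ = q.Nack(id) // worker failed; job goes back to the front
	id, _ = q.Dequeue()
	fmt.Println(id) // manual-1
	_ = q.Ack(id)
	fmt.Println(len(q.pending), len(q.inFlight)) // 1 0
}
```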
Error Handling
stateDiagram-v2
[*] --> Ready
Ready --> Processing: Job Assigned
Processing --> Running: Start Execution
Running --> Success: Complete
Running --> Error: Failure
Error --> Ready: Retry / Recover
Success --> Ready: Next Job
Running --> Timeout: No Heartbeat
Timeout --> Ready: Worker Restart
Ready --> Connected: Register with si_app
Connected --> Ready: Heartbeat Sent
Connected --> Disconnected: Connection Lost
Disconnected --> Connected: Reconnect
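One way to keep the worker lifecycle consistent with this diagram is to encode its arrows as a transition table and reject anything else. The event names are paraphrased from the diagram labels; the table and the `next` function are a sketch, not si_app code.

```go
package main

import "fmt"

// State mirrors the states in the diagram above.
type State string

const (
	Ready        State = "Ready"
	Processing   State = "Processing"
	Running      State = "Running"
	Success      State = "Success"
	Error        State = "Error"
	Timeout      State = "Timeout"
	Connected    State = "Connected"
	Disconnected State = "Disconnected"
)

// transitions maps state -> event -> next state, one entry per arrow.
var transitions = map[State]map[string]State{
	Ready:        {"job_assigned": Processing, "register": Connected},
	Processing:   {"start": Running},
	Running:      {"complete": Success, "failure": Error, "no_heartbeat": Timeout},
	Success:      {"next_job": Ready},
	Error:        {"retry": Ready},
	Timeout:      {"restart": Ready},
	Connected:    {"heartbeat_sent": Ready, "connection_lost": Disconnected},
	Disconnected: {"reconnect": Connected},
}

// next returns the state after event, or an error for a transition the
// diagram does not allow (for example Ready -> Success).
func next(s State, event string) (State, error) {
	if to, ok := transitions[s][event]; ok {
		return to, nil
	}
	return s, fmt.Errorf("invalid event %q in state %s", event, s)
}

func main() {
	s := Ready
	for _, ev := range []string{"job_assigned", "start", "failure", "retry"} {
		var err error
		if s, err = next(s, ev); err != nil {
			panic(err)
		}
		fmt.Println(ev, "->", s)
	}
	_, err := next(Ready, "complete")
	fmt.Println(err)
}
```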
Horizontal Scaling
graph TB
subgraph "Horizontal Scaling"
subgraph "Load Balancing"
LB[OPQ Broker Load Balancer]
end
subgraph "Worker Pool"
W1[Worker 1]
W2[Worker 2]
W3[Worker 3]
WN[Worker N]
end
subgraph "Data Sources and Targets"
DB1[(Database 1)]
DB2[(Database 2)]
AZ1[Azure Container 1]
AZ2[Azure Container 2]
end
end
LB --> W1
LB --> W2
LB --> W3
LB --> WN
W1 --> DB1
W1 --> AZ1
W2 --> DB1
W2 --> AZ2
W3 --> DB2
W3 --> AZ1
WN --> DB2
WN --> AZ2
Configuration Management
graph LR
subgraph "Configuration Layers"
ENV[Environment Variables]
JSON[Config Files]
DB[Database Settings]
end
subgraph "Components"
SA[si_app Config]
OPQ[OPQ Config]
WORKER[Worker Config]
end
ENV --> SA
JSON --> SA
JSON --> OPQ
JSON --> WORKER
DB --> SA
subgraph "Config Types"
AZURE[Azure Credentials]
SQL[SQL Connection]
SCHED[Cron Schedules]
WORKER_SETTINGS[Worker Settings]
end
AZURE --> WORKER
SQL --> SA
SCHED --> SA
WORKER_SETTINGS --> WORKER
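A common way to combine configuration layers is defaults, then a config file, then environment variables, with each later layer overriding the earlier one; that precedence is an assumption here, not something the diagram states. The sketch follows that order for a worker. The `WorkerConfig` fields, the default broker address and the `SI_*` variable names are hypothetical, and database-held settings are left out.

```go
package main

import (
	"encoding/json"
	"fmt"
	"os"
	"strconv"
)

// WorkerConfig holds a few illustrative settings (hypothetical names).
type WorkerConfig struct {
	BrokerAddr   string `json:"broker_addr"`
	Concurrency  int    `json:"concurrency"`
	PollSeconds  int    `json:"poll_seconds"`
	AzureAccount string `json:"azure_account"`
}

// load applies defaults, then the JSON file, then environment variables.
// lookup is os.LookupEnv in production and a map-backed func in tests.
func load(fileJSON []byte, lookup func(string) (string, bool)) (WorkerConfig, error) {
	// Hypothetical defaults; PollSeconds matches the 5-second wait above.
	cfg := WorkerConfig{BrokerAddr: "localhost:7000", Concurrency: 1, PollSeconds: 5}
	if len(fileJSON) > 0 {
		// Unmarshal only overwrites fields present in the file.
		if err := json.Unmarshal(fileJSON, &cfg); err != nil {
			return cfg, fmt.Errorf("config file: %w", err)
		}
	}
	if v, ok := lookup("SI_BROKER_ADDR"); ok {
		cfg.BrokerAddr = v
	}
	if v, ok := lookup("SI_CONCURRENCY"); ok {
		n, err := strconv.Atoi(v)
		if err != nil {
			return cfg, fmt.Errorf("SI_CONCURRENCY: %w", err)
		}
		cfg.Concurrency = n
	}
	if v, ok := lookup("SI_AZURE_ACCOUNT"); ok {
		cfg.AzureAccount = v
	}
	return cfg, nil
}

func main() {
	file := []byte(`{"broker_addr":"opq:9000","concurrency":4}`)
	cfg, err := load(file, os.LookupEnv)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg)
}
```

Passing `lookup` in rather than calling `os.Getenv` directly keeps the precedence rules testable without touching the real environment.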
Monitoring
graph TB
subgraph "Monitoring Stack"
subgraph "Metrics Collection"
HEART[Worker Heartbeats]
STATUS[Job Status Updates]
PERF[Performance Metrics]
end
subgraph "Real-time Updates"
WS[WebSocket Hub]
DASH[Live Dashboard]
end
subgraph "Historical Data"
HIST_DB[(History Database)]
LOGS[Execution Logs]
end
end
HEART --> WS
STATUS --> WS
PERF --> WS
WS --> DASH
STATUS --> HIST_DB
PERF --> LOGS
subgraph "Alerting"
ERRORS[Error Tracking]
TIMEOUT[Timeout Detection]
end
ERRORS --> DASH
TIMEOUT --> DASH
Key Architectural Principles
- Microservices-Inspired: Clear separation between control plane, message broker, and workers
- Event-Driven: Asynchronous job processing via message queues
- Scalable: Horizontal scaling through stateless workers
- Fault-Tolerant: Persistent job storage and worker recovery
- Real-time: WebSocket-based live updates
- Configurable: Environment-based configuration management
Impact
- Performance: 8x faster ingestion on average; CPU usage dropped from 80-100% to 20% or less
- Scalability: Handles hundreds of concurrent pipelines, unconstrained by the SHIR 1.5 ratio
- Reliability: Zero data loss through persistent queuing
- Size: Distribution component 18 MB, agent 40 MB, query module 8 MB