# Data Processing Workflows
This document describes the end-to-end data processing workflows that keep the SG Cars Trends platform updated with the latest vehicle registration and COE bidding data.

## Workflow Sequence Diagram

*(Sequence diagram not reproduced here.)*
## Workflow Components
### 1. Cars Data Processing Workflow
**Purpose**: Process monthly vehicle registration data from LTA DataMall

**Trigger**:
- Scheduled: Every hour during business hours (9 AM - 6 PM, Mon-Fri)
- Manual: `POST /workflows/trigger` endpoint (see the sketch below)
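For reference, a manual run is a plain authenticated HTTP call. A minimal TypeScript sketch; the host and bearer-token auth scheme are assumptions, and only the `POST /workflows/trigger` path comes from this document:

```typescript
// Hypothetical manual trigger. Host and auth scheme are assumptions;
// only the POST /workflows/trigger path is documented here.
const res = await fetch("https://api.example.com/workflows/trigger", {
  method: "POST",
  headers: { Authorization: `Bearer ${process.env.API_TOKEN}` },
});

if (!res.ok) throw new Error(`Trigger failed with HTTP ${res.status}`);
```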
**Processing Steps**:
1. **Timestamp Check**: Query Redis for `last_updated:cars` to determine if processing is needed (see the guard sketch after this list)
2. **File Download**: Fetch "Monthly New Registration of Cars by Make.zip" from LTA DataMall
3. **Checksum Validation**: Compare file checksum with previous version to detect changes
4. **Data Extraction**: Extract and parse CSV files if changes detected
5. **Data Transformation** (see the transformation sketch below):
   - Clean special characters from make names
   - Normalize vehicle type classifications
   - Convert string numbers to integers
6. **Database Insert**: Batch insert new records with conflict resolution
7. **Cache Update**: Update Redis timestamp for successful processing
8. **Blog Generation**: Create AI-powered blog post analyzing latest data
9. **Social Media**: Announce new blog post across configured platforms
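Steps 1 and 3 can be expressed as a single guard that decides whether a run should proceed. A minimal sketch using `@upstash/redis` and Node's `crypto`; the `checksum:cars` key name, the stored timestamp format, and the exact staleness rule are assumptions:

```typescript
import { createHash } from "node:crypto";
import { Redis } from "@upstash/redis";

const redis = Redis.fromEnv(); // reads the UPSTASH_REDIS_REST_* variables

export async function shouldProcessCars(file: Buffer): Promise<boolean> {
  // Step 1: timestamp check against last_updated:cars (assumes an ISO string).
  const lastUpdated = await redis.get<string>("last_updated:cars");
  if (lastUpdated && new Date(lastUpdated).getTime() > Date.now() - 86_400_000) {
    return false; // processed within the last 24 hours (assumed staleness rule)
  }

  // Step 3: checksum comparison against the previously stored value.
  const checksum = createHash("sha256").update(file).digest("hex");
  const previous = await redis.get<string>("checksum:cars"); // assumed key name
  if (previous === checksum) return false; // file unchanged, skip processing

  await redis.set("checksum:cars", checksum);
  return true;
}
```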
**Data Source**: `Monthly New Registration of Cars by Make.zip`
**Key Fields Processed**:
- `month`: YYYY-MM format
- `make`: Vehicle manufacturer (normalized)
- `fuel_type`: Petrol, Diesel, Electric, Hybrid
- `vehicle_type`: Cars, Motorcycles, Buses, etc.
- `number`: Registration count
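The transformation step (step 5) is a pure function over parsed CSV rows. A sketch under assumed cleaning rules; the field names mirror the list above:

```typescript
interface RawCarRow {
  month: string;        // YYYY-MM
  make: string;
  fuel_type: string;
  vehicle_type: string;
  number: string;       // CSV values arrive as strings
}

interface CarRecord extends Omit<RawCarRow, "number"> {
  number: number;
}

export function transformCarRow(row: RawCarRow): CarRecord {
  return {
    ...row,
    make: row.make.replace(/[^\w\s.-]/g, "").trim(), // strip special characters
    vehicle_type: row.vehicle_type.trim(),           // normalize classification label
    number: Number.parseInt(row.number, 10) || 0,    // string -> integer
  };
}
```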
### 2. COE Data Processing Workflow
**Purpose**: Process Certificate of Entitlement bidding results

**Trigger**:
- Scheduled: Every hour during business hours
- Manual: `POST /workflows/trigger` endpoint
**Processing Steps**:
1. **Timestamp Check**: Query Redis for `last_updated:coe`
2. **File Download**: Fetch COE bidding results from LTA DataMall
3. **Checksum Validation**: Detect file changes
4. **Data Processing**: Parse COE bidding data and PQP rates
5. **Database Updates**: Insert bidding results and premium rates
6. **Conditional Blog Generation**: Generate blog posts only when both bidding exercises are complete (`bidding_no = 2`), providing a comprehensive monthly COE analysis (see the sketch below)
7. **Social Media**: Announce completed bidding results and blog posts
**Data Sources**:
- COE bidding results CSV
- Prevailing Quota Premium (PQP) rates
**Key Fields Processed**:
- `month`: Bidding month
- `bidding_no`: Exercise number (1 or 2)
- `vehicle_class`: COE category
- `quota`: Available certificates
- `bids_received`: Total bids submitted
- `premium`: Winning premium (SGD)
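The conditional blog generation step (step 6) amounts to a guard on the exercise number, so each month yields exactly one post once both exercises have closed. A sketch with hypothetical names (`CoeResult`, `generateCoeBlogPost`):

```typescript
interface CoeResult {
  month: string;
  bidding_no: number; // 1 or 2
}

// Hypothetical downstream call into the blog generation workflow.
declare function generateCoeBlogPost(month: string): Promise<void>;

export async function maybeGenerateBlogPost(latest: CoeResult): Promise<void> {
  if (latest.bidding_no !== 2) {
    return; // first exercise only: wait until the month is complete
  }
  await generateCoeBlogPost(latest.month); // both exercises done: full analysis
}
```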
### 3. Blog Generation Workflow
**Purpose**: Create AI-powered market insights from processed data

**AI Integration**: Google Gemini API for content generation (see the sketch at the end of this section)

**Content Creation Process**:
1. **Data Analysis**: Analyze latest month's data for trends and insights
2. **Content Generation**: Use LLM to create comprehensive blog posts
3. **Structured Output**: Generate title, content, excerpt, and metadata
4. **SEO Optimization**: Create SEO-friendly slugs and descriptions
5. **Storage**: Save to database with duplicate prevention
6. **Social Distribution**: Announce new content across platforms
**Content Features**:
- Executive summaries with key insights
- Data tables for fuel type and vehicle type breakdowns
- Market trend analysis and implications
- Professional market commentary
- Reading time estimation
- AI attribution and model versioning
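A minimal sketch of the generation call using the `@google/generative-ai` SDK; the model name, prompt, and JSON field names are assumptions, while the Gemini integration and structured output come from this document:

```typescript
import { GoogleGenerativeAI } from "@google/generative-ai";

const genAI = new GoogleGenerativeAI(process.env.GEMINI_API_KEY!);
const model = genAI.getGenerativeModel({
  model: "gemini-1.5-flash", // assumed model version
  generationConfig: { responseMimeType: "application/json" },
});

export async function generateBlogPost(dataSummary: string) {
  const result = await model.generateContent(
    `You are a market analyst for Singapore vehicle registrations.
     Write a blog post analysing the data below. Respond as JSON with
     "title", "content", "excerpt" and "slug" fields.
     Data: ${dataSummary}`,
  );
  // Structured output: title, content, excerpt and metadata in one response.
  return JSON.parse(result.response.text());
}
```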
### 4. Social Media Integration Workflow
**Purpose**: Distribute updates across multiple social platforms

**Supported Platforms**:
- **Discord**: Webhook-based notifications
- **LinkedIn**: Business-focused updates
- **Telegram**: Channel messaging
- **Twitter**: Social media engagement
**Publishing Process**:
1. **Platform Validation**: Check configuration for each enabled platform
2. **Parallel Publishing**: Simultaneously post to all configured platforms (see the sketch after this list)
3. **Error Handling**: Graceful degradation with Discord error notifications
4. **Result Aggregation**: Collect success/failure statistics
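Parallel publishing with graceful degradation maps naturally onto `Promise.allSettled`, which never rejects and therefore lets one platform fail without blocking the rest. A sketch with a hypothetical `Publisher` shape:

```typescript
interface Publisher {
  name: string;
  publish: (message: string) => Promise<void>;
}

export async function announce(message: string, publishers: Publisher[]) {
  // Post to all configured platforms simultaneously.
  const results = await Promise.allSettled(
    publishers.map((p) => p.publish(message)),
  );

  // Result aggregation: collect per-platform success/failure statistics.
  return results.map((result, i) => ({
    platform: publishers[i].name,
    ok: result.status === "fulfilled",
    error: result.status === "rejected" ? String(result.reason) : undefined,
  }));
}
```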
## Error Handling and Reliability
### Redundancy Prevention
- **Checksum Validation**: Files are processed only if their checksums differ from previous versions
- **Timestamp Tracking**: Redis timestamps prevent duplicate processing
- **Database Constraints**: Unique key constraints prevent duplicate records

### Error Recovery
- **Workflow State**: QStash maintains workflow state and enables retries
- **Discord Notifications**: Failed workflows trigger Discord alerts with error details (see the sketch below)
- **Graceful Degradation**: Social media failures don't block data processing
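Discord alerts only need the webhook URL from the environment. A minimal sketch; everything beyond the `content` field of the webhook payload is an assumption:

```typescript
export async function notifyFailure(workflow: string, error: Error): Promise<void> {
  // Discord webhooks accept a plain JSON body with a `content` field.
  await fetch(process.env.DISCORD_WEBHOOK_URL!, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({
      content: `Workflow \`${workflow}\` failed: ${error.message}`,
    }),
  });
}
```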
### Monitoring and Logging

- **CloudWatch Logs**: Comprehensive logging for all workflow steps
- **Structured Logging**: JSON-formatted logs for easy parsing
- **Error Tracking**: Detailed error messages with context

## Configuration
### Environment Variables
**Core Workflow**:
- `QSTASH_TOKEN`: QStash authentication
- `LTA_DATAMALL_API_KEY`: LTA DataMall access
- `DATABASE_URL`: PostgreSQL connection
- `UPSTASH_REDIS_REST_*`: Redis configuration

**Blog Generation**:
- `GEMINI_API_KEY`: Google Gemini AI access

**Social Media**:
- `DISCORD_WEBHOOK_URL`: Error notifications
- `LINKEDIN_*`: LinkedIn API credentials
- `TELEGRAM_*`: Bot configuration
- `TWITTER_*`: Twitter API credentials
### Scheduling Configuration
**Cron Schedule**: `*/60 0-10 * * 1-5` (hours in UTC)
- Every hour from 9 AM to 6 PM
- Monday through Friday (Singapore business hours)
- Aligned with LTA DataMall update patterns
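If the schedule is registered through QStash (consistent with `QSTASH_TOKEN` above), it can be created programmatically. A sketch; the destination URL is a placeholder:

```typescript
import { Client } from "@upstash/qstash";

const qstash = new Client({ token: process.env.QSTASH_TOKEN! });

// Register the recurring trigger; QStash evaluates cron expressions in UTC.
await qstash.schedules.create({
  destination: "https://api.example.com/workflows/trigger", // placeholder URL
  cron: "*/60 0-10 * * 1-5",
});
```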
## Performance Considerations
### Batch Processing
- Database inserts use batch operations for efficiency
- Large datasets processed in chunks to avoid timeouts
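Chunking is a small generic helper; the 1,000-row default chunk size is an assumption:

```typescript
// Insert rows in fixed-size chunks so no single statement risks the
// Lambda timeout. `insertBatch` stands in for the real batch insert.
export async function insertInChunks<T>(
  rows: T[],
  insertBatch: (chunk: T[]) => Promise<void>,
  chunkSize = 1000,
): Promise<void> {
  for (let i = 0; i < rows.length; i += chunkSize) {
    await insertBatch(rows.slice(i, i + chunkSize));
  }
}
```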
### Caching Strategy
- Redis caching for API responses
- Timestamp-based cache invalidation
- CDN caching for static content
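The API-response cache is a standard cache-aside pattern over Upstash Redis. A TTL-based sketch; the timestamp-based invalidation mentioned above could replace or supplement the `ex` expiry, and the key scheme is an assumption:

```typescript
import { Redis } from "@upstash/redis";

const redis = Redis.fromEnv();

// Cache-aside: return a cached API response if present, otherwise load,
// store with an expiry, and return the fresh value.
export async function cachedResponse<T>(
  key: string,
  ttlSeconds: number,
  load: () => Promise<T>,
): Promise<T> {
  const hit = await redis.get<T>(key);
  if (hit !== null) return hit;

  const fresh = await load();
  await redis.set(key, fresh, { ex: ttlSeconds });
  return fresh;
}
```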
### Resource Optimization
- ARM64 Lambda functions for cost efficiency
- 120-second timeout for long-running processes
- Memory allocation optimized for CSV processing