SkyLink Pro System Architecture

Multi-User Balloon Simulation Platform Technical Documentation

Platform Overview

SkyLink Pro is a multi-user balloon simulation platform built with Flask, SQLite, and modern web technologies. The system provides authentication, organization management, asynchronous simulation processing, and comprehensive API endpoints.

Key Components

Authentication System

Dual Authentication Architecture

The platform uses a dual authentication system for maximum flexibility:

JWT Token Generation

# JWT Token Generation for API Access def generate_token(user_id: int) -> str: payload = { 'user_id': user_id, 'exp': datetime.utcnow() + timedelta(days=7), 'iat': datetime.utcnow() } return jwt.encode(payload, settings.SECRET_KEY, algorithm='HS256')

Authentication Middleware

# API Route Protection @require_auth def protected_api_route(): user = g.current_user # Automatically populated by middleware return jsonify({'user': user.username}) # Admin-only Routes @require_admin def admin_only_route(): # Only accessible by admin users return jsonify({'message': 'Admin access granted'})

User Roles & Permissions

Database Schema

User Model

class User(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True) username = Column(String(50), unique=True, nullable=False, index=True) email = Column(String(255), unique=True, nullable=False, index=True) name = Column(String(100), nullable=True) password_hash = Column(String(255), nullable=False) created_at = Column(DateTime, nullable=False, default=datetime.utcnow, index=True) updated_at = Column(DateTime, nullable=False, default=datetime.utcnow, onupdate=datetime.utcnow) last_login = Column(DateTime, nullable=True) is_active = Column(Boolean, nullable=False, default=True) is_admin = Column(Boolean, nullable=False, default=False) organization_id = Column(Integer, ForeignKey('organizations.id'), nullable=True, index=True)

Organization Model

class Organization(Base): __tablename__ = 'organizations' id = Column(Integer, primary_key=True) name = Column(String(100), nullable=False, unique=True, index=True) description = Column(String(500), nullable=True) created_at = Column(DateTime, nullable=False, default=datetime.utcnow) is_active = Column(Boolean, nullable=False, default=True) # Relationships users = relationship("User", back_populates="organization")

Simulation Model

class Simulation(Base): __tablename__ = 'simulations' id = Column(String(36), primary_key=True) # UUID user_id = Column(Integer, ForeignKey('users.id')) simulation_type = Column(Enum(SimulationType)) name = Column(String(200)) description = Column(Text) config = Column(JSON) # Simulation parameters status = Column(Enum(SimulationStatus), default=SimulationStatus.PENDING) results = Column(JSON) # Simulation outputs created_at = Column(DateTime, default=datetime.utcnow) completed_at = Column(DateTime)

Asynchronous Processing

Worker Architecture

Simulations are processed asynchronously by background workers that start automatically with the application:

# Automatic worker startup in app.py def start_worker(): from workers.simulation_worker import SimulationWorker worker = SimulationWorker() logger.info("Starting simulation worker...") worker.run() worker_thread = threading.Thread(target=start_worker, daemon=True) worker_thread.start()

Simulation Queue

The worker polls the database for pending simulations and processes them sequentially:

def get_next_job(self): """Get next pending simulation from queue""" simulation = self.db.query(Simulation).filter_by( status=SimulationStatus.PENDING ).order_by(Simulation.created_at).first() if simulation: simulation.status = SimulationStatus.PROCESSING self.db.commit() return { 'simulation_id': str(simulation.id), 'config': simulation.config } return None

Status Polling

The frontend polls simulation status using JavaScript intervals:

async function pollSimulationStatus(simulationId) { const response = await fetch(`/api/simulations/${simulationId}`); const result = await response.json(); if (result.status === 'completed') { handleSimulationSuccess(result); clearInterval(pollingInterval); } else if (result.status === 'failed') { handleSimulationError(result.error_message); clearInterval(pollingInterval); } }

REST API Endpoints (v1)

All API endpoints are now under /api/v1/ and require JWT authentication for protected routes.

Authentication

POST /api/v1/auth/register
Register new user account
Requires: username, email, password
POST /api/v1/auth/login
Login and receive JWT token
Requires: username, password
Returns: user object, JWT token
POST /api/v1/auth/logout
Logout and clear session
Requires: Authorization header
GET /api/v1/auth/me
Get current user information
Requires: Authorization header

Admin Endpoints

GET /api/v1/auth/admin/users
List all users with pagination
Requires: Admin role
Query params: page, per_page, search
PUT /api/v1/auth/admin/users/{user_id}
Update user properties
Requires: Admin role
Fields: is_admin, is_active, email
DELETE /api/v1/auth/admin/users/{user_id}
Delete user account
Requires: Admin role
GET /api/v1/auth/admin/organizations
List all organizations
Requires: Admin role
POST /api/v1/auth/admin/organizations
Create new organization
Requires: Admin role
Fields: name, description (optional)
DELETE /api/v1/auth/admin/organizations/{org_id}
Delete organization
Requires: Admin role

Simulation Management

POST /api/v1/simulations/submit
Submit new simulation for processing
Requires: Authorization header
Fields: launch_lat, launch_lon, payload_mass, balloon_material, fill_volume
GET /api/v1/simulations/{id}
Get simulation status and results
Returns: status, progress, results (if completed)
GET /api/v1/simulations/
List simulations with filtering
Query params: page, per_page, status, user_id
POST /api/v1/simulations/{id}/cancel
Cancel pending simulation
Requires: Authorization header
GET /api/v1/simulations/queue/stats
Get queue statistics
Returns: queue depth, processing stats

User History & Analytics

GET /api/v1/history/user
Get user's simulation history
Requires: Authorization header
GET /api/v1/history/organization
Get organization's simulation history
Requires: Authorization header
GET /api/v1/history/stats
Get user statistics
Requires: Authorization header
Returns: total simulations, storage used, computation time

Balloon Specifications

POST /api/v1/balloon-specs
Calculate balloon specifications and lift capacity
Fields: material, volume, payload_mass, lift_gas_type

Physics Validation

Pre-Flight Checks

The system validates balloon configurations before processing to prevent impossible simulations:

def validate_buoyancy(params): """Validate that balloon configuration can achieve positive lift""" fill_volume = params['fill_volume'] payload_mass = params['payload_mass'] # Calculate buoyancy vs weight air_density = 1.225 # kg/m³ at sea level gas_density = 0.1785 # helium density buoyancy_force = fill_volume * air_density * 9.81 total_weight = (payload_mass + gas_mass + balloon_mass) * 9.81 net_lift = buoyancy_force - total_weight if net_lift <= 0: raise ValueError(f"Configuration cannot achieve lift: {net_lift:.1f}N")

Parameter Validation

Reverse Planning System

Overview

The reverse planning feature allows users to specify a target landing zone and find optimal launch locations and times. The system performs a grid search across launch sites and time windows.

How the Algorithm Works

The reverse planning algorithm is essentially a brute-force search with intelligent optimization:

1. Search Space Definition

2. Forward Simulation Process

For each combination of (launch_location, launch_time, balloon_config):

3. Target Intersection Detection

# Check if trajectory passes through target zone for i, point in enumerate(trajectory): distance = haversine_distance(point.lat, point.lon, target_lat, target_lon) altitude = point.altitude if distance <= target_radius and target_alt_min <= altitude <= target_alt_max: # Success! Record hit details hit_info = { 'hit': True, 'hit_time': point.time, 'hit_altitude': altitude, 'closest_distance': distance }

4. Mission Type Logic

Implementation

def _process_reverse_planning(self, simulation: Simulation, config: Dict): """Process reverse trajectory planning""" # Generate search grid around target launch_sites = self._generate_search_grid( target_lat, target_lon, config['search_radius'], config['search_grid_spacing'] ) # Generate time steps within window time_steps = self._generate_time_steps( config['time_window_start'], config['time_window_end'], config['time_step_hours'] ) # Test each combination for launch_site in launch_sites: for launch_time in time_steps: for balloon_config in config['balloon_configs']: sim_config = { 'launch_latitude': launch_site['lat'], 'launch_longitude': launch_site['lon'], 'launch_datetime': launch_time, **balloon_config } # Run forward simulation result = run_simulation(sim_config) # Check if trajectory passes through target zone if self._check_target_intersection(result, target_zone): successful_trajectories.append({ 'launch_site': launch_site, 'launch_time': launch_time, 'score': self._calculate_score(result, target_zone) })

API Endpoints

POST /api/v1/reverse/plan
Create reverse planning job
Requires: Authorization header
Fields: target_lat, target_lon, target_radius, target_altitude_min, target_altitude_max
GET /api/v1/reverse/plan/{id}/results
Get reverse planning results
Requires: Authorization header
Returns: launch_sites array with scores and optimal times

Configuration Parameters

Reverse Planning Deep Dive

Search Algorithm Implementation

The reverse planning system performs an exhaustive search through space and time:

def _process_reverse_planning(simulation, config): # Extract target parameters target_lat = config['target_lat'] target_lon = config['target_lon'] target_radius = config['target_radius'] target_alt_min = config['target_altitude_min'] target_alt_max = config['target_altitude_max'] # Generate search grid launch_sites = _generate_search_grid( target_lat, target_lon, config['search_radius'], config['search_grid_spacing'], config.get('search_shape', 'circle') ) # Test each combination successful_trajectories = [] for launch_site in launch_sites: for launch_time in time_steps: for balloon_config in config['balloon_configs']: # Run full physics simulation sim_config = { 'launch_lat': launch_site[0], 'launch_lon': launch_site[1], 'launch_datetime': launch_time.isoformat(), **balloon_config } results_df = run_simulation(sim_config) # Check if trajectory meets criteria if mission_type == 'target_window': hit_info = check_target_hit_at_time(...) else: hit_info = check_target_hit(...) if hit_info['hit']: successful_trajectories.append({ 'launch_lat': launch_site[0], 'launch_lon': launch_site[1], 'launch_time': launch_time.isoformat(), 'hit_time': hit_info['hit_time'], 'hit_altitude': hit_info['hit_altitude'], 'closest_distance': hit_info['closest_distance'] })

Grid Generation Algorithms

The system supports both circular and square search patterns:

def _generate_search_grid(center_lat, center_lon, radius_m, spacing_m, shape='circle'): # Convert to degrees lat_deg_per_m = 1 / 111320 lon_deg_per_m = 1 / (111320 * np.cos(np.radians(center_lat))) points = [] lat = center_lat - radius_m * lat_deg_per_m while lat <= center_lat + radius_m * lat_deg_per_m: lon = center_lon - radius_m * lon_deg_per_m while lon <= center_lon + radius_m * lon_deg_per_m: if shape == 'square': # All points within square bounds points.append((lat, lon)) else: # Only points within circle dist_lat = lat - center_lat dist_lon = lon - center_lon if np.sqrt(dist_lat**2 + dist_lon**2) <= radius_lat: points.append((lat, lon)) lon += spacing_m * lon_deg_per_m lat += spacing_m * lat_deg_per_m return points

Queue Architecture

The dual-queue system ensures optimal resource utilization:

Queue Type Purpose Workers Typical Runtime
Forward Queue Regular trajectory simulations 2 workers 10-30 seconds
Reverse Queue Reverse planning searches 1 worker 1-40 hours

Mission Type: Target Window

For missions requiring arrival at a specific time, the system works backward:

# User specifies: # - Target arrival time: "2024-01-15 15:00:00 UTC" # - Search days before: 7 # System converts to launch window: target_dt = datetime.fromisoformat(target_arrival_time) time_window_end = target_dt time_window_start = target_dt - timedelta(days=search_days_before) # During simulation, check position at exact target time: for launch_time in time_steps: # Calculate when balloon would be at target time time_since_launch = (target_dt - launch_time).total_seconds() # Check if balloon is at target at that specific moment hit_info = check_target_hit_at_time( trajectory, time_since_launch, target_lat, target_lon, target_radius, alt_min, alt_max )

Performance Metrics

Typical simulation counts and runtimes:

Search Radius Grid Spacing Time Window Time Step Total Sims Est. Runtime
100 km 10 km 7 days 6 hours ~8,792 ~37 hours
100 km 20 km 7 days 6 hours ~2,198 ~9 hours
50 km 10 km 7 days 12 hours ~1,099 ~4.5 hours

Error Handling & Recovery

Simulation Failures

Failed simulations are handled gracefully with error logging and user notification:

try: results_df = run_simulation(config) simulation.status = SimulationStatus.COMPLETED simulation.results = results_df.to_dict('records') except Exception as e: simulation.status = SimulationStatus.FAILED simulation.error_message = str(e) logger.error(f"Simulation failed: {e}") finally: simulation.completed_at = datetime.utcnow() self.db.commit()

Worker Recovery

Workers automatically handle database connection issues and continue processing:

def run(self): """Main worker loop with error recovery""" while True: try: job = self.get_next_job() if job: self._process_simulation(job) else: time.sleep(5) # Wait for new jobs except Exception as e: logger.error(f"Worker error: {e}") time.sleep(10) # Backoff on errors