Platform Overview
SkyLink Pro is a multi-user balloon simulation platform built with Flask, SQLite, and modern web technologies. The system provides authentication, organization management, asynchronous simulation processing, and comprehensive API endpoints.
Key Components
- Flask Web Application - Main server with route handling and template rendering
- Authentication System - JWT-based sessions with admin/user roles
- Background Workers - Asynchronous simulation processing with automatic startup
- SQLite Database - User management, organization grouping, and simulation history
- REST API - Comprehensive endpoints for all platform functionality
- Physics Engine - Advanced balloon simulation with validation
Authentication System
Dual Authentication Architecture
The platform uses a dual authentication system for maximum flexibility:
- JWT Authentication - For API access with Bearer tokens in Authorization headers
- Session Authentication - For web UI with secure HTTP-only cookies
- Username-based Login - Users authenticate with username (not email)
JWT Token Generation
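# JWT token generation for API access
from datetime import datetime, timedelta, timezone

import jwt  # PyJWT

def generate_token(user_id: int) -> str:
    now = datetime.now(timezone.utc)  # timezone-aware; datetime.utcnow() is deprecated
    payload = {
        'user_id': user_id,
        'exp': now + timedelta(days=7),  # token expires after 7 days
        'iat': now,
    }
    return jwt.encode(payload, settings.SECRET_KEY, algorithm='HS256')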
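The counterpart to token generation is verification on every API request. The sketch below is a stdlib-only illustration of what an HS256 check involves (recomputing the signature and checking expiry); it is not the platform's code. In practice PyJWT's `jwt.decode(token, key, algorithms=['HS256'])` performs these checks. The helper names `sign_hs256` and `verify_hs256` and the secret value are hypothetical, and the sketch does not inspect the header's `alg` field.

```python
import base64
import hashlib
import hmac
import json
import time


def _b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWTs do."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    """Decode base64url, restoring the stripped padding."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign_hs256(payload: dict, secret: str) -> str:
    """Build header.payload.signature using HMAC-SHA256 (hypothetical helper)."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode())
        for part in (header, payload)
    )
    signature = hmac.new(secret.encode(), signing_input.encode("ascii"),
                         hashlib.sha256).digest()
    return f"{signing_input}.{_b64url(signature)}"


def verify_hs256(token: str, secret: str, now=None) -> dict:
    """Return the payload if the signature matches and 'exp' is in the future."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
    except ValueError:
        raise ValueError("malformed token")
    expected = hmac.new(secret.encode(),
                        f"{header_b64}.{payload_b64}".encode("ascii"),
                        hashlib.sha256).digest()
    # Constant-time comparison avoids leaking how many bytes matched
    if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
        raise ValueError("bad signature")
    payload = json.loads(_b64url_decode(payload_b64))
    # A token without 'exp' is treated as expired in this sketch
    if payload.get("exp", 0) <= (time.time() if now is None else now):
        raise ValueError("token expired")
    return payload
```

A middleware such as `require_auth` would typically run a check like this on the Bearer token and load the user from the `user_id` claim.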
Authentication Middleware
# API route protection
@require_auth
def protected_api_route():
    user = g.current_user  # populated by the authentication middleware
    return jsonify({'user': user.username})

# Admin-only routes
@require_admin
def admin_only_route():
    # Only accessible to admin users
    return jsonify({'message': 'Admin access granted'})
User Roles & Permissions
- Admin Users:
- Create and manage user accounts
- Activate/deactivate users
- Create and manage organizations
- View all simulations across organizations
- Access admin panel in UI
- Standard Users:
- Run simulations
- View personal and organization-shared history
- Access simulation analytics
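The visibility rules in the role list can be expressed as one predicate. This is a minimal sketch, assuming that "organization-shared" means any simulation owned by a member of the same organization; the dataclasses and the function name `can_view_simulation` are hypothetical stand-ins for the real models.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class UserInfo:
    """Hypothetical stand-in for the fields of User that matter here."""
    id: int
    is_admin: bool = False
    organization_id: Optional[int] = None


@dataclass
class SimulationInfo:
    """Hypothetical view of a simulation: its owner and the owner's organization."""
    owner_id: int
    owner_organization_id: Optional[int] = None


def can_view_simulation(user: UserInfo, sim: SimulationInfo) -> bool:
    """Admins see everything; others see their own and their organization's runs."""
    if user.is_admin:
        return True
    if sim.owner_id == user.id:
        return True
    # Users without an organization never match on organization
    return (user.organization_id is not None
            and user.organization_id == sim.owner_organization_id)
```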
Database Schema
User Model
class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    username = Column(String(50), unique=True, nullable=False, index=True)
    email = Column(String(255), unique=True, nullable=False, index=True)
    name = Column(String(100), nullable=True)
    password_hash = Column(String(255), nullable=False)
    created_at = Column(DateTime, nullable=False, default=datetime.utcnow, index=True)
    updated_at = Column(DateTime, nullable=False, default=datetime.utcnow, onupdate=datetime.utcnow)
    last_login = Column(DateTime, nullable=True)
    is_active = Column(Boolean, nullable=False, default=True)
    is_admin = Column(Boolean, nullable=False, default=False)
    organization_id = Column(Integer, ForeignKey('organizations.id'), nullable=True, index=True)
    # Counterpart of Organization.users, which declares back_populates="organization"
    organization = relationship("Organization", back_populates="users")
Organization Model
class Organization(Base):
    __tablename__ = 'organizations'
    id = Column(Integer, primary_key=True)
    name = Column(String(100), nullable=False, unique=True, index=True)
    description = Column(String(500), nullable=True)
    created_at = Column(DateTime, nullable=False, default=datetime.utcnow)
    is_active = Column(Boolean, nullable=False, default=True)
    # Relationships
    users = relationship("User", back_populates="organization")
Simulation Model
class Simulation(Base):
    __tablename__ = 'simulations'
    id = Column(String(36), primary_key=True)  # UUID
    user_id = Column(Integer, ForeignKey('users.id'))
    simulation_type = Column(Enum(SimulationType))
    name = Column(String(200))
    description = Column(Text)
    config = Column(JSON)  # Simulation parameters
    status = Column(Enum(SimulationStatus), default=SimulationStatus.PENDING)
    results = Column(JSON)  # Simulation outputs
    error_message = Column(Text)  # Set by the worker when a simulation fails
    created_at = Column(DateTime, default=datetime.utcnow)
    completed_at = Column(DateTime)
Asynchronous Processing
Worker Architecture
Simulations are processed asynchronously by background workers that start automatically with the application:
# Automatic worker startup in app.py
import threading

def start_worker():
    from workers.simulation_worker import SimulationWorker
    worker = SimulationWorker()
    logger.info("Starting simulation worker...")
    worker.run()

# Daemon thread: it stops when the main process exits
worker_thread = threading.Thread(target=start_worker, daemon=True)
worker_thread.start()
Simulation Queue
The worker polls the database for pending simulations and processes them sequentially:
def get_next_job(self):
    """Get next pending simulation from queue"""
    # Oldest pending simulation first (FIFO by creation time)
    simulation = self.db.query(Simulation).filter_by(
        status=SimulationStatus.PENDING
    ).order_by(Simulation.created_at).first()
    if simulation:
        # Mark as claimed before handing the job to the processor
        simulation.status = SimulationStatus.PROCESSING
        self.db.commit()
        return {
            'simulation_id': str(simulation.id),
            'config': simulation.config
        }
    return None
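With a single worker, selecting a row and then updating it is safe. If more than one worker polls the same table (the Queue Architecture section lists two forward workers), two of them could read the same pending row before either commits. One common way to guard against that is to make the claim conditional on the row still being pending. The sketch below uses the stdlib `sqlite3` module rather than the platform's ORM session; the function name `claim_next_job` and the stored status strings `'PENDING'` and `'PROCESSING'` are assumptions.

```python
import sqlite3


def claim_next_job(conn: sqlite3.Connection):
    """Claim the oldest pending simulation; return its id, or None if none was claimed."""
    with conn:  # commits on success, rolls back on error
        row = conn.execute(
            "SELECT id FROM simulations WHERE status = 'PENDING' "
            "ORDER BY created_at LIMIT 1"
        ).fetchone()
        if row is None:
            return None
        # The status guard turns the update into a no-op if another
        # worker claimed the row between the SELECT and the UPDATE.
        cursor = conn.execute(
            "UPDATE simulations SET status = 'PROCESSING' "
            "WHERE id = ? AND status = 'PENDING'",
            (row[0],),
        )
        return row[0] if cursor.rowcount == 1 else None
```

A worker that gets `None` back simply sleeps and polls again, exactly as it would for an empty queue.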
Status Polling
The frontend polls simulation status using JavaScript intervals:
// pollingInterval is the handle returned by setInterval() when polling starts
async function pollSimulationStatus(simulationId) {
    const response = await fetch(`/api/v1/simulations/${simulationId}`);
    const result = await response.json();
    if (result.status === 'completed') {
        handleSimulationSuccess(result);
        clearInterval(pollingInterval);
    } else if (result.status === 'failed') {
        handleSimulationError(result.error_message);
        clearInterval(pollingInterval);
    }
}
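An API client written in Python can follow the same pattern: fetch the status, stop on a terminal state, otherwise wait and retry. The sketch below is illustrative only; `poll_until_done` and its parameters are hypothetical, and `fetch_status` stands for whatever function performs the authenticated GET and returns the decoded JSON. A timeout is added so a stuck job does not keep the client polling forever.

```python
import time
from typing import Callable

# Status values taken from the frontend snippet above
TERMINAL_STATES = {"completed", "failed"}


def poll_until_done(fetch_status: Callable[[], dict],
                    interval_s: float = 2.0,
                    timeout_s: float = 300.0,
                    sleep: Callable[[float], None] = time.sleep,
                    clock: Callable[[], float] = time.monotonic) -> dict:
    """Call fetch_status() until it reports a terminal state or the timeout passes."""
    deadline = clock() + timeout_s
    while True:
        result = fetch_status()
        if result.get("status") in TERMINAL_STATES:
            return result
        if clock() >= deadline:
            raise TimeoutError("simulation did not finish in time")
        sleep(interval_s)
```

Passing `sleep` and `clock` as parameters keeps the loop easy to test without real delays.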
REST API Endpoints (v1)
All API endpoints are now under /api/v1/, and protected routes require JWT authentication.
Authentication
POST /api/v1/auth/register
Register new user account
Requires: username, email, password
POST /api/v1/auth/login
Login and receive JWT token
Requires: username, password
Returns: user object, JWT token
POST /api/v1/auth/logout
Logout and clear session
Requires: Authorization header
GET /api/v1/auth/me
Get current user information
Requires: Authorization header
Admin Endpoints
GET /api/v1/auth/admin/users
List all users with pagination
Requires: Admin role
Query params: page, per_page, search
PUT /api/v1/auth/admin/users/{user_id}
Update user properties
Requires: Admin role
Fields: is_admin, is_active, email
DELETE /api/v1/auth/admin/users/{user_id}
Delete user account
Requires: Admin role
GET /api/v1/auth/admin/organizations
List all organizations
Requires: Admin role
POST /api/v1/auth/admin/organizations
Create new organization
Requires: Admin role
Fields: name, description (optional)
DELETE /api/v1/auth/admin/organizations/{org_id}
Delete organization
Requires: Admin role
Simulation Management
POST /api/v1/simulations/submit
Submit new simulation for processing
Requires: Authorization header
Fields: launch_lat, launch_lon, payload_mass, balloon_material, fill_volume
GET /api/v1/simulations/{id}
Get simulation status and results
Returns: status, progress, results (if completed)
GET /api/v1/simulations/
List simulations with filtering
Query params: page, per_page, status, user_id
POST /api/v1/simulations/{id}/cancel
Cancel pending simulation
Requires: Authorization header
GET /api/v1/simulations/queue/stats
Get queue statistics
Returns: queue depth, processing stats
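As an illustration of calling the submit endpoint, the sketch below builds (but does not send) an authenticated request with the stdlib `urllib`. The base URL, the token, the JSON request body, and all field values are assumptions made for the example; only the path and the field names come from the endpoint list above.

```python
import json
import urllib.request


def build_submit_request(base_url: str, token: str, params: dict) -> urllib.request.Request:
    """Build a POST to /api/v1/simulations/submit with a Bearer token."""
    body = json.dumps(params).encode("utf-8")  # JSON body is an assumption
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/api/v1/simulations/submit",
        data=body,
        method="POST",
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
    )


# Hypothetical values for the documented fields
example_params = {
    "launch_lat": 40.0,
    "launch_lon": -105.0,
    "payload_mass": 1.5,
    "balloon_material": "latex",
    "fill_volume": 3.0,
}
request = build_submit_request("http://localhost:5000", "<jwt-token>", example_params)
```

The request can then be sent with `urllib.request.urlopen(request)` against a running instance.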
User History & Analytics
GET /api/v1/history/user
Get user's simulation history
Requires: Authorization header
GET /api/v1/history/organization
Get organization's simulation history
Requires: Authorization header
GET /api/v1/history/stats
Get user statistics
Requires: Authorization header
Returns: total simulations, storage used, computation time
Balloon Specifications
POST /api/v1/balloon-specs
Calculate balloon specifications and lift capacity
Fields: material, volume, payload_mass, lift_gas_type
Physics Validation
Pre-Flight Checks
The system validates balloon configurations before processing to prevent impossible simulations:
def validate_buoyancy(params):
    """Validate that balloon configuration can achieve positive lift"""
    fill_volume = params['fill_volume']    # m³
    payload_mass = params['payload_mass']  # kg
    # Assumption: envelope mass (kg) is supplied with the parameters
    balloon_mass = params.get('balloon_mass', 0.0)
    gravity = 9.81        # m/s²
    air_density = 1.225   # kg/m³ at sea level
    gas_density = 0.1785  # kg/m³, helium
    gas_mass = fill_volume * gas_density
    # Buoyancy (weight of displaced air) vs total weight
    buoyancy_force = fill_volume * air_density * gravity
    total_weight = (payload_mass + gas_mass + balloon_mass) * gravity
    net_lift = buoyancy_force - total_weight
    if net_lift <= 0:
        raise ValueError(f"Configuration cannot achieve lift: {net_lift:.1f} N")
Parameter Validation
- Coordinate bounds - Latitude [-90, 90], Longitude [-180, 180]
- Physical limits - Positive mass, volume, and time values
- Buoyancy check - Ensure balloon can lift payload
- Material validation - Verify balloon material exists in database
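The coordinate and physical-limit checks in the list above could look roughly like the following. This is a sketch, not the platform's validator: the function name `validate_parameters` is hypothetical, the field names are taken from the submit endpoint, and collecting all errors instead of raising on the first one is a design choice made here for illustration.

```python
def _is_number(value) -> bool:
    """True for ints and floats, but not for booleans."""
    return isinstance(value, (int, float)) and not isinstance(value, bool)


def validate_parameters(params: dict) -> list:
    """Return a list of human-readable problems; an empty list means valid."""
    errors = []

    lat = params.get("launch_lat")
    if not _is_number(lat) or not -90 <= lat <= 90:
        errors.append("launch_lat must be a number in [-90, 90]")

    lon = params.get("launch_lon")
    if not _is_number(lon) or not -180 <= lon <= 180:
        errors.append("launch_lon must be a number in [-180, 180]")

    # Mass and volume must be strictly positive
    for field in ("payload_mass", "fill_volume"):
        value = params.get(field)
        if not _is_number(value) or value <= 0:
            errors.append(f"{field} must be a positive number")

    return errors
```

Returning every problem at once lets the API report all invalid fields in a single response.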
Reverse Planning System
Overview
The reverse planning feature allows users to specify a target landing zone and find optimal launch locations and times. The system performs a grid search across launch sites and time windows.
How the Algorithm Works
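The reverse planning algorithm is a brute-force grid search: every candidate combination of launch site, launch time, and balloon configuration is simulated forward, and the resulting trajectory is checked against the target zone.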
1. Search Space Definition
- Spatial Grid: Generate points at regular intervals within search radius
- Temporal Grid: Create time steps (every 3-24 hours) within window
- Configuration Space: Test different balloon parameters if specified
2. Forward Simulation Process
For each combination of (launch_location, launch_time, balloon_config):
- Initialize balloon at launch point with full atmospheric data
- Run physics simulation using Dormand-Prince integration
- Track trajectory points at each timestep
- Monitor distance to target throughout flight
3. Target Intersection Detection
# Check if trajectory passes through target zone
hit_info = {'hit': False}
for i, point in enumerate(trajectory):
    distance = haversine_distance(point.lat, point.lon, target_lat, target_lon)
    altitude = point.altitude
    if distance <= target_radius and target_alt_min <= altitude <= target_alt_max:
        # Success! Record hit details
        hit_info = {
            'hit': True,
            'hit_time': point.time,
            'hit_altitude': altitude,
            'closest_distance': distance
        }
        break  # stop at the first point inside the target zone
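The intersection check relies on a `haversine_distance` helper whose body is not shown. A minimal sketch of the standard haversine formula follows, assuming inputs in degrees, a result in meters (to match the meter-based grid parameters), and a spherical Earth of mean radius 6,371 km.

```python
import math

EARTH_RADIUS_M = 6_371_000.0  # mean Earth radius; spherical approximation


def haversine_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance in meters between two points given in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlambda = math.radians(lon2 - lon1)
    a = (math.sin(dphi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(dlambda / 2) ** 2)
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))
```

One degree of longitude along the equator comes out at roughly 111.2 km with this radius.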
4. Mission Type Logic
- Launch Window: Check if balloon ever passes through target
- Target Window: Check position at specific arrival time only
Implementation
def _process_reverse_planning(self, simulation: Simulation, config: Dict):
    """Process reverse trajectory planning"""
    target_lat, target_lon = config['target_lat'], config['target_lon']
    # Target zone handed to the intersection and scoring helpers (assumed shape)
    target_zone = {key: config[key] for key in (
        'target_lat', 'target_lon', 'target_radius',
        'target_altitude_min', 'target_altitude_max')}
    # Generate search grid around target
    launch_sites = self._generate_search_grid(
        target_lat, target_lon,
        config['search_radius'],
        config['search_grid_spacing']
    )
    # Generate time steps within window
    time_steps = self._generate_time_steps(
        config['time_window_start'],
        config['time_window_end'],
        config['time_step_hours']
    )
    # Test each combination
    successful_trajectories = []
    for launch_site in launch_sites:  # (lat, lon) tuples
        for launch_time in time_steps:
            for balloon_config in config['balloon_configs']:
                sim_config = {
                    'launch_lat': launch_site[0],
                    'launch_lon': launch_site[1],
                    'launch_datetime': launch_time,
                    **balloon_config
                }
                # Run forward simulation
                result = run_simulation(sim_config)
                # Check if trajectory passes through target zone
                if self._check_target_intersection(result, target_zone):
                    successful_trajectories.append({
                        'launch_site': launch_site,
                        'launch_time': launch_time,
                        'score': self._calculate_score(result, target_zone)
                    })
    return successful_trajectories
API Endpoints
POST /api/v1/reverse/plan
Create reverse planning job
Requires: Authorization header
Fields: target_lat, target_lon, target_radius, target_altitude_min, target_altitude_max
GET /api/v1/reverse/plan/{id}/results
Get reverse planning results
Requires: Authorization header
Returns: launch_sites array with scores and optimal times
Configuration Parameters
- Target Zone - Latitude, longitude, radius, and altitude window
- Search Area - Maximum distance from target to search for launch sites
- Grid Spacing - Distance between tested launch points (affects accuracy vs runtime)
- Time Window - Start and end times for potential launches
- Time Step - Interval between tested launch times (default: 6 hours)
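The Time Window and Time Step parameters together define the list of candidate launch times. The sketch below shows one way to generate them; the function name `generate_time_steps` is hypothetical, and treating the window as half-open (start included, end excluded) is an assumption. With that convention a 7-day window at 6-hour steps yields 28 launch times.

```python
from datetime import datetime, timedelta


def generate_time_steps(start: datetime, end: datetime, step_hours: float) -> list:
    """Launch times from start (inclusive) to end (exclusive), step_hours apart."""
    if step_hours <= 0:
        raise ValueError("step_hours must be positive")
    step = timedelta(hours=step_hours)
    steps = []
    current = start
    while current < end:
        steps.append(current)
        current += step
    return steps


window_start = datetime(2024, 1, 8, 15, 0)
window_end = window_start + timedelta(days=7)
launch_times = generate_time_steps(window_start, window_end, step_hours=6)
```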
Reverse Planning Deep Dive
Search Algorithm Implementation
The reverse planning system performs an exhaustive search through space and time:
def _process_reverse_planning(simulation, config):
    # Extract target parameters
    target_lat = config['target_lat']
    target_lon = config['target_lon']
    target_radius = config['target_radius']
    target_alt_min = config['target_altitude_min']
    target_alt_max = config['target_altitude_max']
    # The 'mission_type' key and its default are assumed here
    mission_type = config.get('mission_type', 'launch_window')
    # Generate search grid
    launch_sites = _generate_search_grid(
        target_lat, target_lon,
        config['search_radius'],
        config['search_grid_spacing'],
        config.get('search_shape', 'circle')
    )
    # Generate launch times within the window
    time_steps = _generate_time_steps(
        config['time_window_start'],
        config['time_window_end'],
        config['time_step_hours']
    )
    # Test each combination
    successful_trajectories = []
    for launch_site in launch_sites:
        for launch_time in time_steps:
            for balloon_config in config['balloon_configs']:
                # Run full physics simulation
                sim_config = {
                    'launch_lat': launch_site[0],
                    'launch_lon': launch_site[1],
                    'launch_datetime': launch_time.isoformat(),
                    **balloon_config
                }
                results_df = run_simulation(sim_config)
                # Check if trajectory meets criteria
                if mission_type == 'target_window':
                    hit_info = check_target_hit_at_time(...)
                else:
                    hit_info = check_target_hit(...)
                if hit_info['hit']:
                    successful_trajectories.append({
                        'launch_lat': launch_site[0],
                        'launch_lon': launch_site[1],
                        'launch_time': launch_time.isoformat(),
                        'hit_time': hit_info['hit_time'],
                        'hit_altitude': hit_info['hit_altitude'],
                        'closest_distance': hit_info['closest_distance']
                    })
    return successful_trajectories
Grid Generation Algorithms
The system supports both circular and square search patterns:
import numpy as np

def _generate_search_grid(center_lat, center_lon, radius_m, spacing_m, shape='circle'):
    # Meters-to-degrees conversion factors at the grid center
    lat_deg_per_m = 1 / 111320
    lon_deg_per_m = 1 / (111320 * np.cos(np.radians(center_lat)))
    points = []
    lat = center_lat - radius_m * lat_deg_per_m
    while lat <= center_lat + radius_m * lat_deg_per_m:
        lon = center_lon - radius_m * lon_deg_per_m
        while lon <= center_lon + radius_m * lon_deg_per_m:
            if shape == 'square':
                # All points within square bounds
                points.append((lat, lon))
            else:
                # Only points within circle. Offsets are compared in meters,
                # because a degree of longitude is shorter than a degree of latitude.
                dist_lat_m = (lat - center_lat) / lat_deg_per_m
                dist_lon_m = (lon - center_lon) / lon_deg_per_m
                if np.hypot(dist_lat_m, dist_lon_m) <= radius_m:
                    points.append((lat, lon))
            lon += spacing_m * lon_deg_per_m
        lat += spacing_m * lat_deg_per_m
    return points
Queue Architecture
Forward and reverse jobs run in separate queues, so long-running reverse planning searches do not hold up short forward simulations:
Queue Type | Purpose | Workers | Typical Runtime
Forward Queue | Regular trajectory simulations | 2 workers | 10-30 seconds
Reverse Queue | Reverse planning searches | 1 worker | 1-40 hours
Mission Type: Target Window
For missions requiring arrival at a specific time, the system works backward:
# User specifies:
# - Target arrival time: "2024-01-15T15:00:00+00:00" (ISO 8601, UTC)
# - Search days before: 7
# System converts to launch window:
target_dt = datetime.fromisoformat(target_arrival_time)
time_window_end = target_dt
time_window_start = target_dt - timedelta(days=search_days_before)
# During simulation, check position at exact target time:
for launch_time in time_steps:
    # Elapsed flight time at the moment of target arrival
    time_since_launch = (target_dt - launch_time).total_seconds()
    # Check if balloon is at target at that specific moment
    # (trajectory is the forward simulation result for this launch_time)
    hit_info = check_target_hit_at_time(
        trajectory, time_since_launch,
        target_lat, target_lon, target_radius,
        alt_min, alt_max
    )
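The body of `check_target_hit_at_time` is not shown, so the following is only a sketch of how such a check could work. It assumes the trajectory is a time-sorted list of `(seconds_since_launch, lat, lon, altitude_m)` tuples, interpolates linearly between the two samples around the requested time, and measures distance in meters on a spherical Earth. The longitude interpolation ignores date-line wrap-around.

```python
import math
from bisect import bisect_left


def _haversine_m(lat1, lon1, lat2, lon2):
    """Great-circle distance in meters (spherical Earth, radius 6,371 km)."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    a = (math.sin((phi2 - phi1) / 2) ** 2
         + math.cos(phi1) * math.cos(phi2)
         * math.sin(math.radians(lon2 - lon1) / 2) ** 2)
    return 2 * 6_371_000.0 * math.asin(math.sqrt(a))


def check_target_hit_at_time(trajectory, time_since_launch,
                             target_lat, target_lon, target_radius,
                             alt_min, alt_max):
    """Report whether the balloon is inside the target zone at one instant."""
    miss = {"hit": False, "hit_time": None,
            "hit_altitude": None, "closest_distance": None}
    if not trajectory:
        return miss
    times = [point[0] for point in trajectory]
    # The balloon must be airborne at the requested time
    if not times[0] <= time_since_launch <= times[-1]:
        return miss
    i = bisect_left(times, time_since_launch)
    if times[i] == time_since_launch:
        _, lat, lon, alt = trajectory[i]
    else:
        # Linear interpolation between the bracketing samples
        t0, lat0, lon0, alt0 = trajectory[i - 1]
        t1, lat1, lon1, alt1 = trajectory[i]
        w = (time_since_launch - t0) / (t1 - t0)
        lat = lat0 + w * (lat1 - lat0)
        lon = lon0 + w * (lon1 - lon0)
        alt = alt0 + w * (alt1 - alt0)
    distance = _haversine_m(lat, lon, target_lat, target_lon)
    hit = distance <= target_radius and alt_min <= alt <= alt_max
    return {"hit": hit, "hit_time": time_since_launch,
            "hit_altitude": alt, "closest_distance": distance}
```

Launch times later than the target time give a negative `time_since_launch` and are reported as misses by the range check.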
Performance Metrics
Typical simulation counts and runtimes:
Search Radius | Grid Spacing | Time Window | Time Step | Total Sims | Est. Runtime
100 km | 10 km | 7 days | 6 hours | ~8,792 | ~37 hours
100 km | 20 km | 7 days | 6 hours | ~2,198 | ~9 hours
50 km | 10 km | 7 days | 12 hours | ~1,099 | ~4.5 hours
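The figures in the table appear consistent with a simple estimate: the number of grid points in a circular search area (circle area divided by spacing squared) multiplied by the number of launch times, at roughly 15 seconds per forward simulation on a single worker. That relationship is inferred from the numbers rather than stated by the platform, so the sketch below is a planning aid under those assumptions; the function name and the `seconds_per_sim` default are hypothetical.

```python
import math


def estimate_reverse_search(radius_km: float, spacing_km: float,
                            window_days: float, step_hours: float,
                            seconds_per_sim: float = 15.0,
                            n_configs: int = 1):
    """Return (approximate simulation count, approximate runtime in hours)."""
    # Grid points inside a circle: area / spacing^2
    sites = math.pi * (radius_km / spacing_km) ** 2
    # Launch times within the window
    time_steps = window_days * 24 / step_hours
    total = sites * time_steps * n_configs
    return total, total * seconds_per_sim / 3600


sims, hours = estimate_reverse_search(100, 10, 7, 6)
```

Halving the grid spacing quadruples the number of launch sites, which is why spacing dominates the runtime.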
Error Handling & Recovery
Simulation Failures
Failed simulations are handled gracefully with error logging and user notification:
try:
    results_df = run_simulation(config)
    simulation.status = SimulationStatus.COMPLETED
    simulation.results = results_df.to_dict('records')
except Exception as e:
    # Record the failure so the API can report it to the user
    simulation.status = SimulationStatus.FAILED
    simulation.error_message = str(e)
    logger.error(f"Simulation failed: {e}")
finally:
    # Runs on both success and failure
    simulation.completed_at = datetime.utcnow()
    self.db.commit()
Worker Recovery
Workers automatically handle database connection issues and continue processing:
def run(self):
    """Main worker loop with error recovery"""
    while True:
        try:
            job = self.get_next_job()
            if job:
                self._process_simulation(job)
            else:
                time.sleep(5)  # Wait for new jobs
        except Exception as e:
            logger.error(f"Worker error: {e}")
            self.db.rollback()  # Reset the session after a failed database operation
            time.sleep(10)  # Backoff on errors