The High-Altitude Balloon Flight Simulator is a comprehensive physics-based simulation system designed for accurate trajectory prediction and real-time tracking. The implementation integrates multiple physical models into a cohesive framework that balances accuracy, performance, and usability.
┌─────────────────┐ ┌────────────────┐ ┌─────────────────┐ │ Configuration │────▶│ Initializer │────▶│ Simulator │ └─────────────────┘ └────────────────┘ └─────────────────┘ │ │ ▼ ▼ ┌────────────────┐ ┌─────────────────┐ │ Physics Models │ │ Integrator │ └────────────────┘ └─────────────────┘ │ │ ▼ ▼ ┌────────────────┐ ┌─────────────────┐ │ Data Sources │ │ Output │ └────────────────┘ └─────────────────┘
# Current Project Structure
balloon-sim/
├── src/ # All source code
│ ├── api/v1/ # REST API endpoints
│ │ ├── middleware/ # Auth & error handling
│ │ ├── routes/ # API route handlers
│ │ └── schemas/ # Request validation
│ ├── core/ # Core business logic
│ │ ├── config/ # Configuration management
│ │ ├── models/ # Database models
│ │ └── services/ # Business services
│ ├── simulation/ # Physics engine
│ │ ├── atmosphere/ # Atmospheric models
│ │ ├── balloons/ # Balloon physics
│ │ ├── engine/ # Simulation controller
│ │ ├── physics/ # Physics calculations
│ │ └── utils/ # Common utilities
│ ├── web/ # Web application
│ │ ├── static/ # Frontend assets
│ │ ├── templates/ # HTML templates
│ │ └── app.py # Flask application
│ └── workers/ # Background processors
├── data/ # Data files
│ ├── balloon_configs/ # Balloon specifications
│ └── atmospheric_data/ # Weather data
├── scripts/ # Utility scripts
└── run_server.py # Main entry point
class BalloonSimulator:
"""Main orchestrator for balloon flight simulation"""
def __init__(self, config: SimulationConfig):
# Initialize components with dependency injection
self.atmosphere = AtmosphereModel(config.atmosphere)
self.coordinate_system = CoordinateSystem(config.geodetic)
# Create balloon based on type
self.balloon = BalloonFactory.create(
config.balloon_type,
config.balloon_params
)
# Physics models
self.thermodynamics = ThermalModel(config.thermal)
self.aerodynamics = AerodynamicsModel(config.aero)
# Numerical integrator
self.integrator = DormandPrince87(
tolerance=config.numerical.tolerance,
max_step=config.numerical.max_timestep
)
# Data sources
self.weather_data = WeatherDataManager(config.weather)
self.terrain_data = TerrainDataManager(config.terrain)
def simulate(self, initial_state: State, duration: float) -> Trajectory:
"""Run simulation from initial state"""
# Simulation implementation
pass
@dataclass
class State:
"""Complete state vector for simulation"""
# Position and motion
time: float
position: np.ndarray # ECEF coordinates [x, y, z]
velocity: np.ndarray # ECEF velocity [vx, vy, vz]
# Thermodynamic state
temperature_gas: float
temperature_film: float
pressure_gas: float
# Mass properties
mass_gas: float
mass_payload: float
mass_balloon: float
# Balloon geometry
volume: float
radius: float
# Derived properties (computed, not integrated)
altitude: float = field(init=False)
latitude: float = field(init=False)
longitude: float = field(init=False)
def __post_init__(self):
"""Compute derived properties"""
self.update_geodetic_position()
def to_integration_vector(self) -> np.ndarray:
"""Convert to vector for numerical integration"""
return np.concatenate([
self.position,
self.velocity,
[self.temperature_gas],
[self.temperature_film],
[self.mass_gas]
])
@classmethod
def from_integration_vector(cls, vec: np.ndarray, time: float,
static_params: dict) -> 'State':
"""Reconstruct state from integration vector"""
return cls(
time=time,
position=vec[0:3],
velocity=vec[3:6],
temperature_gas=vec[6],
temperature_film=vec[7],
mass_gas=vec[8],
**static_params
)
class PhysicsModel(ABC):
"""Abstract base class for physics models"""
@abstractmethod
def compute_derivatives(self, state: State,
environment: Environment) -> Derivatives:
"""Compute time derivatives of state variables"""
pass
@abstractmethod
def validate_state(self, state: State) -> bool:
"""Check if state is physically valid"""
pass
@abstractmethod
def get_diagnostic_data(self, state: State) -> dict:
"""Return diagnostic information for analysis"""
pass
class BalloonPhysics(PhysicsModel):
"""Complete balloon physics implementation"""
def __init__(self, balloon_params: BalloonParameters):
self.params = balloon_params
self.gas_model = GasThermodynamics()
self.material_model = BalloonMaterial(balloon_params.material)
def compute_derivatives(self, state: State,
environment: Environment) -> Derivatives:
# Get environmental conditions
atm = environment.atmosphere.get_conditions(state.altitude)
# Compute forces
forces = self.compute_forces(state, atm)
# Thermal dynamics
heat_rates = self.compute_heat_transfer(state, environment)
# Mass dynamics (venting/leakage)
mass_rate = self.compute_mass_flow(state, atm)
# Build derivatives
return Derivatives(
position=state.velocity,
velocity=forces.total / self.total_mass(state),
temperature_gas=heat_rates.gas / self.gas_heat_capacity(state),
temperature_film=heat_rates.film / self.film_heat_capacity(state),
mass_gas=mass_rate
)
Component | Responsibility | Key Interfaces | Configuration |
---|---|---|---|
AtmosphereModel | Atmospheric conditions | get_conditions(altitude) get_wind(position, time) |
Data source hierarchy Update frequency |
BalloonPhysics | Balloon dynamics | compute_forces() check_burst() compute_volume() |
Balloon type Material properties |
ThermalModel | Heat transfer | compute_heat_rates() solar_radiation() thermal_radiation() |
Emissivity values Heat transfer coeffs |
Integrator | Numerical integration | step() integrate() set_tolerances() |
Error tolerances Max timestep |
DataManager | External data access | fetch_weather() get_terrain() cache_management() |
API credentials Cache policy |
┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │Initial State │────▶│ Integrator │────▶│ Time Step n │ └──────────────┘ └──────────────┘ └──────────────┘ │ │ ▼ ▼ ┌──────────────┐ ┌──────────────┐ │Physics Models│ │ Environment │ └──────────────┘ │ Update │ │ └──────────────┘ ▼ │ ┌──────────────┐ │ │ Derivatives │◀────────────┘ └──────────────┘ │ ▼ ┌──────────────┐ │State Update │ └──────────────┘ │ ▼ ┌──────────────┐ │ Output & │ │ Diagnostics │ └──────────────┘
class RealtimeDataManager:
"""Manages real-time data updates during simulation"""
def __init__(self, config: DataConfig):
self.weather_fetcher = WeatherDataFetcher(config.weather)
self.cache = DataCache(config.cache)
self.update_interval = config.update_interval
self.last_update = {}
async def get_current_conditions(self, position: Position,
time: datetime) -> Environment:
"""Get current environmental conditions with caching"""
# Check if update needed
cache_key = self._get_cache_key(position, time)
if self._is_cache_valid(cache_key, time):
return self.cache.get(cache_key)
# Fetch new data
weather_data = await self.weather_fetcher.fetch_async(
position, time
)
# Process and cache
environment = self._process_weather_data(weather_data)
self.cache.set(cache_key, environment, ttl=self.update_interval)
return environment
def _process_weather_data(self, raw_data: dict) -> Environment:
"""Process raw weather data into environment model"""
# Extract wind components
wind_profile = WindProfile(
altitudes=raw_data['levels']['altitude'],
u_wind=raw_data['levels']['u_wind'],
v_wind=raw_data['levels']['v_wind'],
interpolation='cubic'
)
# Temperature profile
temp_profile = TemperatureProfile(
altitudes=raw_data['levels']['altitude'],
temperatures=raw_data['levels']['temperature']
)
# Create environment
return Environment(
wind=wind_profile,
temperature=temp_profile,
pressure_surface=raw_data['surface']['pressure'],
timestamp=raw_data['timestamp']
)
class TrajectoryOutput:
"""Manages simulation output and data export"""
def __init__(self, config: OutputConfig):
self.format = config.format
self.compression = config.compression
self.include_diagnostics = config.include_diagnostics
self.output_interval = config.output_interval
self.trajectory_points = []
self.diagnostic_data = []
def add_point(self, state: State, diagnostics: dict = None):
"""Add trajectory point with optional diagnostics"""
point = TrajectoryPoint(
time=state.time,
position=state.position.copy(),
velocity=state.velocity.copy(),
altitude=state.altitude,
latitude=state.latitude,
longitude=state.longitude,
temperature_gas=state.temperature_gas,
temperature_film=state.temperature_film,
volume=state.volume,
ascent_rate=state.velocity[2]
)
self.trajectory_points.append(point)
if self.include_diagnostics and diagnostics:
self.diagnostic_data.append({
'time': state.time,
**diagnostics
})
def export(self, filename: str):
"""Export trajectory in specified format"""
if self.format == 'csv':
self._export_csv(filename)
elif self.format == 'json':
self._export_json(filename)
elif self.format == 'kml':
self._export_kml(filename)
elif self.format == 'netcdf':
self._export_netcdf(filename)
else:
raise ValueError(f"Unknown format: {self.format}")
def _export_kml(self, filename: str):
"""Export trajectory as KML for Google Earth"""
kml = simplekml.Kml()
# Flight path
path = kml.newlinestring(name="Balloon Trajectory")
path.coords = [(p.longitude, p.latitude, p.altitude)
for p in self.trajectory_points]
path.altitudemode = simplekml.AltitudeMode.absolute
path.style.linestyle.width = 3
path.style.linestyle.color = simplekml.Color.red
# Key points
launch = self.trajectory_points[0]
burst = max(self.trajectory_points, key=lambda p: p.altitude)
landing = self.trajectory_points[-1]
for point, name, icon in [
(launch, "Launch", "http://maps.google.com/mapfiles/kml/paddle/grn-circle.png"),
(burst, "Burst", "http://maps.google.com/mapfiles/kml/paddle/red-circle.png"),
(landing, "Landing", "http://maps.google.com/mapfiles/kml/paddle/blu-circle.png")
]:
pm = kml.newpoint(name=name)
pm.coords = [(point.longitude, point.latitude, point.altitude)]
pm.style.iconstyle.icon.href = icon
kml.save(filename)
@dataclass
class SimulationConfig:
"""Complete simulation configuration"""
# Balloon configuration
balloon_type: str = 'HAB-1200'
balloon_params: dict = field(default_factory=dict)
payload_mass: float = 2.0 # kg
# Launch configuration
launch_site: LaunchSite = field(default_factory=LaunchSite)
launch_time: datetime = field(default_factory=datetime.utcnow)
# Numerical configuration
numerical: NumericalConfig = field(default_factory=NumericalConfig)
# Data sources
weather_source: str = 'gfs'
terrain_enabled: bool = True
# Output configuration
output: OutputConfig = field(default_factory=OutputConfig)
@classmethod
def from_file(cls, filename: str) -> 'SimulationConfig':
"""Load configuration from YAML/JSON file"""
with open(filename, 'r') as f:
if filename.endswith('.yaml') or filename.endswith('.yml'):
data = yaml.safe_load(f)
else:
data = json.load(f)
return cls(**data)
def validate(self):
"""Validate configuration consistency"""
# Check balloon type
if self.balloon_type not in AVAILABLE_BALLOONS:
raise ValueError(f"Unknown balloon type: {self.balloon_type}")
# Validate numerical parameters
if self.numerical.max_timestep < self.numerical.min_timestep:
raise ValueError("Max timestep must be >= min timestep")
# Check data source availability
if self.weather_source not in WEATHER_SOURCES:
raise ValueError(f"Unknown weather source: {self.weather_source}")
class SimulationInitializer:
"""Initialize simulation with proper state setup"""
def __init__(self, config: SimulationConfig):
self.config = config
def create_initial_state(self) -> State:
"""Create initial state from configuration"""
# Get launch site coordinates
launch_pos = self.config.launch_site.to_ecef()
# Initialize balloon
balloon = BalloonFactory.create(
self.config.balloon_type,
self.config.balloon_params
)
# Get initial atmospheric conditions
atmosphere = AtmosphereModel()
conditions = atmosphere.get_conditions(
self.config.launch_site.altitude
)
# Calculate initial gas mass for neutral buoyancy + free lift
total_mass = (
balloon.mass_envelope +
self.config.payload_mass +
balloon.mass_fittings
)
target_lift = self.config.balloon_params.get('target_lift',
total_mass * 0.3)
gas_mass = balloon.calculate_gas_mass_for_lift(
target_lift + total_mass,
conditions['pressure'],
conditions['temperature']
)
# Create initial state
return State(
time=0.0,
position=launch_pos,
velocity=np.zeros(3),
temperature_gas=conditions['temperature'],
temperature_film=conditions['temperature'],
pressure_gas=conditions['pressure'],
mass_gas=gas_mass,
mass_payload=self.config.payload_mass,
mass_balloon=balloon.mass_envelope,
volume=balloon.calculate_volume(gas_mass,
conditions['pressure'],
conditions['temperature']),
radius=balloon.calculate_radius(volume)
)
class SimulatorFactory:
"""Factory for creating configured simulator instances"""
@staticmethod
def create_simulator(config: SimulationConfig) -> BalloonSimulator:
"""Create fully configured simulator"""
# Create data managers
weather_manager = WeatherDataManager(
source=config.weather_source,
cache_dir=config.cache_dir,
api_key=config.weather_api_key
)
terrain_manager = None
if config.terrain_enabled:
terrain_manager = TerrainDataManager(
cache_dir=config.cache_dir
)
# Create physics models
atmosphere = AtmosphereModel(
weather_data=weather_manager,
model_type=config.atmosphere_model
)
balloon = BalloonFactory.create(
config.balloon_type,
config.balloon_params
)
thermal = ThermalModel(
config.thermal_params
)
aero = AerodynamicsModel(
config.aero_params
)
# Create integrator
integrator = IntegratorFactory.create(
method=config.numerical.integration_method,
tolerance=config.numerical.tolerance,
max_step=config.numerical.max_timestep
)
# Assemble simulator
return BalloonSimulator(
atmosphere=atmosphere,
balloon=balloon,
thermal=thermal,
aerodynamics=aero,
integrator=integrator,
terrain=terrain_manager,
config=config
)
class BalloonSimulator:
"""Main simulation orchestrator"""
def simulate(self, initial_state: State,
duration: float = None,
termination_conditions: list = None) -> SimulationResult:
"""Run complete simulation"""
# Initialize
state = initial_state.copy()
trajectory = Trajectory()
trajectory.add_point(state)
# Set up termination conditions
if termination_conditions is None:
termination_conditions = [
AltitudeTermination(min_altitude=state.altitude - 100),
BurstTermination(self.balloon),
TimeoutTermination(duration or 4*3600) # 4 hours default
]
# Main integration loop
try:
while not any(tc.should_terminate(state) for tc in termination_conditions):
# Get environment at current state
environment = self.get_environment(state)
# Compute time step
dt = self.compute_timestep(state, environment)
# Integration step
new_state = self.integrate_step(state, environment, dt)
# Check for special events
events = self.check_events(state, new_state)
if events:
new_state = self.handle_events(events, new_state)
# Update state
state = new_state
# Record trajectory
if self.should_record(state):
trajectory.add_point(state)
# Progress callback
if self.progress_callback:
self.progress_callback(state, trajectory)
except SimulationError as e:
# Handle simulation errors gracefully
trajectory.set_error(str(e))
# Finalize results
return SimulationResult(
trajectory=trajectory,
termination_reason=self.get_termination_reason(state, termination_conditions),
final_state=state,
statistics=self.compute_statistics(trajectory),
metadata=self.get_metadata()
)
def integrate_step(self, state: State, environment: Environment,
dt: float) -> State:
"""Perform one integration step"""
# Define dynamics function for integrator
def dynamics(t, y):
# Reconstruct state from vector
current_state = State.from_integration_vector(
y, state.time + t, self.get_static_params(state)
)
# Compute all physics
derivatives = self.compute_derivatives(current_state, environment)
return derivatives.to_vector()
# Integrate
t_span = (0, dt)
y0 = state.to_integration_vector()
result = self.integrator.step(dynamics, t_span, y0)
# Create new state
new_state = State.from_integration_vector(
result.y,
state.time + dt,
self.get_static_params(state)
)
# Update derived quantities
new_state.update_derived_properties(self.balloon, environment)
return new_state
def compute_derivatives(self, state: State,
environment: Environment) -> Derivatives:
"""Compute all state derivatives"""
# Get atmospheric conditions
atmosphere = environment.get_conditions_at(state.position)
# Compute forces
gravity = self.compute_gravity(state)
buoyancy = self.compute_buoyancy(state, atmosphere)
drag = self.aerodynamics.compute_drag(state, atmosphere)
total_force = gravity + buoyancy + drag
# Get total mass
total_mass = (state.mass_gas + state.mass_balloon +
state.mass_payload)
# Acceleration
acceleration = total_force / total_mass
# Thermal dynamics
heat_rates = self.thermal.compute_heat_transfer(
state, environment
)
# Gas dynamics
mass_flow = self.balloon.compute_mass_flow(state, atmosphere)
# Build derivatives
return Derivatives(
position=state.velocity,
velocity=acceleration,
temperature_gas=heat_rates.gas_rate,
temperature_film=heat_rates.film_rate,
mass_gas=mass_flow
)
class EventDetector:
"""Detect special events during simulation"""
def __init__(self):
self.event_handlers = {
'burst': BurstHandler(),
'float': FloatAltitudeHandler(),
'termination': TerminationHandler()
}
def check_events(self, state_old: State, state_new: State) -> list:
"""Check for events between states"""
events = []
# Burst detection
if self.detect_burst(state_old, state_new):
events.append(Event('burst', self.find_burst_point(state_old, state_new)))
# Float altitude reached
if self.detect_float(state_old, state_new):
events.append(Event('float', state_new))
# Ground impact
if state_new.altitude <= 0:
events.append(Event('landing', self.interpolate_landing(state_old, state_new)))
return events
def detect_burst(self, state_old: State, state_new: State) -> bool:
"""Detect balloon burst"""
# Check multiple criteria
diameter_old = 2 * state_old.radius
diameter_new = 2 * state_new.radius
# Diameter exceeded burst diameter
if (diameter_old < self.burst_diameter and
diameter_new >= self.burst_diameter):
return True
# Sudden volume decrease (catastrophic failure)
if state_new.volume < 0.5 * state_old.volume:
return True
# Strain limit exceeded
strain = self.calculate_strain(state_new)
if strain > self.material.max_strain:
return True
return False
def interpolate_burst_point(self, state_old: State,
state_new: State) -> State:
"""Find exact burst point via interpolation"""
# Binary search for burst point
low, high = 0.0, 1.0
for _ in range(10): # 10 iterations gives ~0.1% accuracy
mid = (low + high) / 2
state_mid = self.interpolate_states(state_old, state_new, mid)
if self.is_burst(state_mid):
high = mid
else:
low = mid
return self.interpolate_states(state_old, state_new, high)
from flask import Flask, request, jsonify
from flask_restful import Api, Resource
app = Flask(__name__)
api = Api(app)
class SimulationResource(Resource):
"""Main simulation endpoint"""
def post(self):
"""Create new simulation"""
# Parse request
data = request.get_json()
# Validate input
try:
config = SimulationConfig(**data)
config.validate()
except ValueError as e:
return {'error': str(e)}, 400
# Create simulation
sim_id = str(uuid.uuid4())
# Run async
task = run_simulation_async.delay(sim_id, config)
return {
'simulation_id': sim_id,
'task_id': task.id,
'status': 'queued',
'links': {
'self': f'/api/simulations/{sim_id}',
'status': f'/api/simulations/{sim_id}/status',
'results': f'/api/simulations/{sim_id}/results',
'websocket': f'/ws/simulations/{sim_id}'
}
}, 202
class SimulationStatus(Resource):
"""Check simulation status"""
def get(self, sim_id):
"""Get current status"""
status = get_simulation_status(sim_id)
if not status:
return {'error': 'Simulation not found'}, 404
return {
'simulation_id': sim_id,
'status': status.state,
'progress': status.progress,
'current_altitude': status.current_altitude,
'elapsed_time': status.elapsed_time,
'estimated_remaining': status.estimated_remaining
}
# API Routes
api.add_resource(SimulationResource, '/api/simulations')
api.add_resource(SimulationStatus, '/api/simulations//status')
# WebSocket for real-time updates
from flask_socketio import SocketIO, emit, join_room
socketio = SocketIO(app, cors_allowed_origins="*")
@socketio.on('subscribe')
def handle_subscription(data):
"""Subscribe to simulation updates"""
sim_id = data['simulation_id']
join_room(sim_id)
emit('subscribed', {'simulation_id': sim_id})
def broadcast_update(sim_id, update):
"""Broadcast update to subscribed clients"""
socketio.emit('update', update, room=sim_id)
class BalloonSimulatorClient:
"""Python client for balloon simulator API"""
def __init__(self, base_url='http://localhost:5000'):
self.base_url = base_url
self.session = requests.Session()
def run_simulation(self, config: dict, wait=True,
callback=None) -> SimulationResult:
"""Run simulation with optional real-time updates"""
# Submit simulation
response = self.session.post(
f'{self.base_url}/api/simulations',
json=config
)
response.raise_for_status()
sim_data = response.json()
sim_id = sim_data['simulation_id']
if callback:
# Connect WebSocket for updates
self._connect_websocket(sim_id, callback)
if wait:
# Poll until complete
return self._wait_for_completion(sim_id)
else:
# Return immediately
return SimulationHandle(sim_id, self)
def _connect_websocket(self, sim_id, callback):
"""Connect to WebSocket for real-time updates"""
import socketio
sio = socketio.Client()
@sio.on('update')
def on_update(data):
callback(data)
sio.connect(self.base_url)
sio.emit('subscribe', {'simulation_id': sim_id})
return sio
# Inefficient
for i in range(len(positions)):
distances[i] = np.sqrt(np.sum((positions[i] - reference)**2))
# Efficient
distances = np.linalg.norm(positions - reference, axis=1)
@lru_cache(maxsize=1000)
def get_atmosphere_conditions(altitude_rounded: float):
"""Cache atmosphere lookups at 100m intervals"""
return compute_atmosphere(altitude_rounded)
def get_conditions(altitude: float):
# Round to nearest 100m for cache efficiency
altitude_rounded = round(altitude / 100) * 100
return get_atmosphere_conditions(altitude_rounded)
def run_ensemble(configs: list, n_workers=4):
"""Run multiple simulations in parallel"""
with ProcessPoolExecutor(max_workers=n_workers) as executor:
futures = []
for config in configs:
future = executor.submit(run_single_simulation, config)
futures.append(future)
results = []
for future in as_completed(futures):
results.append(future.result())
return results
Operation | Target Time | Optimization |
---|---|---|
Single timestep (1s) | < 1 ms | Vectorized operations |
Atmosphere lookup | < 0.1 ms | LRU cache |
3-hour trajectory | < 5 s | Adaptive timestep |
Weather data fetch | < 2 s | Async + caching |
100-member ensemble | < 30 s | Multiprocessing |
class MemoryEfficientTrajectory:
"""Memory-efficient trajectory storage"""
def __init__(self, decimation_factor=10):
self.decimation_factor = decimation_factor
self.key_points = [] # Launch, burst, landing
self.decimated_points = [] # Every Nth point
self.full_trajectory = None # Only if requested
self.point_count = 0
def add_point(self, state: State):
"""Add point with intelligent decimation"""
# Always keep key points
if self._is_key_point(state):
self.key_points.append(state.to_compact())
# Decimate regular points
if self.point_count % self.decimation_factor == 0:
self.decimated_points.append(state.to_compact())
self.point_count += 1
def get_full_trajectory(self):
"""Reconstruct full trajectory if needed"""
if self.full_trajectory is None:
raise ValueError("Full trajectory not stored")
return self.full_trajectory
import pytest
import numpy as np
from numpy.testing import assert_allclose
class TestBalloonPhysics:
"""Unit tests for balloon physics"""
@pytest.fixture
def balloon(self):
"""Standard test balloon"""
return LatexBalloon(
manufacturer='standard',
model='HAB-1200',
burst_diameter=8.0
)
def test_buoyancy_calculation(self, balloon):
"""Test buoyancy force calculation"""
state = State(
volume=4.5, # m³
temperature_gas=250, # K
pressure_gas=50000, # Pa
)
atmosphere = AtmosphereConditions(
pressure=50000,
temperature=250,
density=0.7
)
buoyancy = balloon.calculate_buoyancy(state, atmosphere)
# F_b = ρ_air * V * g
expected = 0.7 * 4.5 * 9.81
assert_allclose(buoyancy, expected, rtol=1e-6)
def test_volume_pressure_relationship(self, balloon):
"""Test ideal gas law implementation"""
# Known conditions
mass_gas = 0.5 # kg
pressure = 101325 # Pa
temperature = 288 # K
volume = balloon.calculate_volume(mass_gas, pressure, temperature)
# Verify with ideal gas law
R_specific = 2077 # J/(kg·K) for helium
expected_volume = mass_gas * R_specific * temperature / pressure
assert_allclose(volume, expected_volume, rtol=1e-3)
@pytest.mark.parametrize("altitude,expected_g", [
(0, 9.81),
(10000, 9.78),
(30000, 9.71),
])
def test_gravity_model(self, altitude, expected_g):
"""Test gravity variation with altitude"""
g = calculate_gravity(altitude, latitude=45.0)
assert_allclose(g, expected_g, atol=0.01)
class TestSimulationIntegration:
"""Integration tests for complete simulation"""
def test_standard_flight_profile(self):
"""Test typical flight matches expected profile"""
config = SimulationConfig(
balloon_type='HAB-1200',
payload_mass=2.0,
launch_site=LaunchSite(lat=40.0, lon=-105.0, alt=1600),
)
simulator = SimulatorFactory.create_simulator(config)
initial_state = create_initial_state(config)
result = simulator.simulate(initial_state)
# Verify key metrics
assert result.max_altitude > 25000 # meters
assert result.max_altitude < 35000
assert result.flight_time > 7000 # seconds
assert result.flight_time < 12000
assert result.range_distance < 200000 # meters
def test_burst_detection(self):
"""Test burst is detected correctly"""
# Configure for guaranteed burst
config = SimulationConfig(
balloon_type='HAB-1200',
payload_mass=0.5, # Light payload = higher burst
)
simulator = SimulatorFactory.create_simulator(config)
result = simulator.simulate(create_initial_state(config))
assert result.termination_reason == 'burst'
assert result.burst_altitude > 30000
import time
from contextlib import contextmanager
@contextmanager
def timer(name):
"""Simple performance timer"""
start = time.perf_counter()
yield
elapsed = time.perf_counter() - start
print(f"{name}: {elapsed:.3f}s")
def test_simulation_performance():
"""Benchmark simulation performance"""
config = standard_config()
simulator = create_simulator(config)
# Warm up
simulator.simulate(create_initial_state(config))
# Benchmark
n_runs = 10
times = []
for _ in range(n_runs):
with timer("Single simulation") as t:
result = simulator.simulate(create_initial_state(config))
times.append(t)
avg_time = np.mean(times)
std_time = np.std(times)
print(f"Average: {avg_time:.3f}s ± {std_time:.3f}s")
assert avg_time < 5.0 # Should complete in < 5 seconds
# Dockerfile
FROM python:3.9-slim
# Install system dependencies
RUN apt-get update && apt-get install -y \
gcc \
gfortran \
libgeos-dev \
libproj-dev \
&& rm -rf /var/lib/apt/lists/*
# Set working directory
WORKDIR /app
# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application
COPY . .
# Create non-root user
RUN useradd -m -u 1000 simulator && \
chown -R simulator:simulator /app
USER simulator
# Configure environment
ENV PYTHONUNBUFFERED=1
ENV SIMULATOR_CONFIG=/app/config/production.yaml
# Expose ports
EXPOSE 5000 8000
# Run application
CMD ["gunicorn", "--bind", "0.0.0.0:5000", \
"--workers", "4", "--threads", "2", \
"--timeout", "300", "app:application"]
# docker-compose.yml
version: '3.8'
services:
simulator:
build: .
ports:
- "5000:5000"
environment:
- REDIS_URL=redis://redis:6379
- DATABASE_URL=postgresql://user:pass@postgres:5432/simulator
- WEATHER_API_KEY=${WEATHER_API_KEY}
depends_on:
- redis
- postgres
volumes:
- ./data:/app/data
- ./cache:/app/cache
restart: unless-stopped
redis:
image: redis:6-alpine
volumes:
- redis_data:/data
restart: unless-stopped
postgres:
image: postgres:13
environment:
- POSTGRES_DB=simulator
- POSTGRES_USER=user
- POSTGRES_PASSWORD=pass
volumes:
- postgres_data:/var/lib/postgresql/data
restart: unless-stopped
nginx:
image: nginx:alpine
ports:
- "80:80"
- "443:443"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf
- ./ssl:/etc/nginx/ssl
depends_on:
- simulator
restart: unless-stopped
volumes:
redis_data:
postgres_data:
import logging
from prometheus_client import Counter, Histogram, Gauge
# Metrics
simulation_counter = Counter('simulations_total',
'Total simulations run')
simulation_duration = Histogram('simulation_duration_seconds',
'Simulation execution time')
active_simulations = Gauge('active_simulations',
'Currently running simulations')
# Structured logging
logger = logging.getLogger(__name__)
class SimulationMonitor:
"""Monitor simulation performance and health"""
def __init__(self):
self.setup_logging()
def setup_logging(self):
"""Configure structured logging"""
handler = logging.StreamHandler()
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
@contextmanager
def monitor_simulation(self, sim_id):
"""Monitor single simulation execution"""
simulation_counter.inc()
active_simulations.inc()
logger.info(f"Starting simulation {sim_id}")
start_time = time.time()
try:
yield
except Exception as e:
logger.error(f"Simulation {sim_id} failed: {e}")
raise
finally:
duration = time.time() - start_time
simulation_duration.observe(duration)
active_simulations.dec()
logger.info(f"Completed simulation {sim_id} in {duration:.2f}s")