package repository import ( "context" "fmt" "time" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" "github.com/scottyah/paragliding/internal/model" ) // WeatherRepository handles database operations for weather observations type WeatherRepository struct { pool *pgxpool.Pool } // NewWeatherRepository creates a new weather repository func NewWeatherRepository(pool *pgxpool.Pool) *WeatherRepository { return &WeatherRepository{ pool: pool, } } // SaveObservations performs a bulk upsert of weather observations // Uses batch inserts for efficiency and ON CONFLICT to handle duplicates func (r *WeatherRepository) SaveObservations(ctx context.Context, observations []model.WeatherPoint) error { if len(observations) == 0 { return nil } // Use a batch for efficient bulk insert batch := &pgx.Batch{} query := ` INSERT INTO weather_observations (observed_at, wind_speed_mph, wind_direction, wind_gust_mph, source) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (observed_at, source) DO UPDATE SET wind_speed_mph = EXCLUDED.wind_speed_mph, wind_direction = EXCLUDED.wind_direction, wind_gust_mph = EXCLUDED.wind_gust_mph, created_at = NOW() ` for _, obs := range observations { // Normalize wind direction to 0-359 range windDir := obs.WindDirection for windDir < 0 { windDir += 360 } for windDir >= 360 { windDir -= 360 } batch.Queue(query, obs.Time, obs.WindSpeedMPH, windDir, obs.WindGustMPH, "open-meteo") } // Execute the batch br := r.pool.SendBatch(ctx, batch) defer br.Close() // Process all batch results to ensure they complete for i := 0; i < len(observations); i++ { _, err := br.Exec() if err != nil { return fmt.Errorf("failed to save observation %d: %w", i, err) } } return nil } // GetForecast retrieves weather observations within a time range // Results are ordered by time ascending for forecast display func (r *WeatherRepository) GetForecast(ctx context.Context, start, end time.Time) ([]model.WeatherPoint, error) { query := ` SELECT observed_at, wind_speed_mph, wind_direction, wind_gust_mph FROM weather_observations WHERE observed_at >= $1 AND observed_at <= $2 ORDER BY observed_at ASC ` rows, err := r.pool.Query(ctx, query, start, end) if err != nil { return nil, fmt.Errorf("failed to query forecast: %w", err) } defer rows.Close() var observations []model.WeatherPoint for rows.Next() { var obs model.WeatherPoint err := rows.Scan(&obs.Time, &obs.WindSpeedMPH, &obs.WindDirection, &obs.WindGustMPH) if err != nil { return nil, fmt.Errorf("failed to scan observation: %w", err) } observations = append(observations, obs) } if err := rows.Err(); err != nil { return nil, fmt.Errorf("error iterating rows: %w", err) } return observations, nil } // GetHistorical retrieves all weather observations for a specific day // Returns data for the entire day in the system's timezone func (r *WeatherRepository) GetHistorical(ctx context.Context, date time.Time) ([]model.WeatherPoint, error) { // Get start and end of the day start := time.Date(date.Year(), date.Month(), date.Day(), 0, 0, 0, 0, date.Location()) end := start.Add(24 * time.Hour) query := ` SELECT observed_at, wind_speed_mph, wind_direction, wind_gust_mph FROM weather_observations WHERE observed_at >= $1 AND observed_at < $2 ORDER BY observed_at ASC ` rows, err := r.pool.Query(ctx, query, start, end) if err != nil { return nil, fmt.Errorf("failed to query historical data: %w", err) } defer rows.Close() var observations []model.WeatherPoint for rows.Next() { var obs model.WeatherPoint err := rows.Scan(&obs.Time, &obs.WindSpeedMPH, &obs.WindDirection, &obs.WindGustMPH) if err != nil { return nil, fmt.Errorf("failed to scan observation: %w", err) } observations = append(observations, obs) } if err := rows.Err(); err != nil { return nil, fmt.Errorf("error iterating rows: %w", err) } return observations, nil } // Close closes the database pool func (r *WeatherRepository) Close() { r.pool.Close() }