Files
2026-01-03 14:16:16 -08:00

416 lines
11 KiB
Go

package service
import (
"context"
"fmt"
"log/slog"
"sync"
"time"
"github.com/scottyah/paragliding/internal/client"
"github.com/scottyah/paragliding/internal/model"
"github.com/scottyah/paragliding/internal/repository"
)
// Cache keys, cache lifetimes, and freshness / rate-limit thresholds used by
// WeatherService.
const (
// Cache keys
cacheKeyCurrentWeather = "weather:current"
cacheKeyForecast = "weather:forecast"
// Format string: the %s verb is filled with the requested date formatted
// as "2006-01-02" (see GetHistorical).
cacheKeyHistorical = "weather:historical:%s"
// Not referenced by the code shown in this file; the last fetch time is kept
// in WeatherService.lastAPIFetch instead.
cacheKeyLastAPIFetch = "weather:last_api_fetch"
// Cache TTLs
cacheTTLCurrent = 5 * time.Minute
cacheTTLForecast = 10 * time.Minute
cacheTTLHistorical = 24 * time.Hour
// API rate limiting - minimum time between API calls
// (enforced by canFetchFromAPI; FetchFromAPI does not consult it)
minAPIFetchInterval = 15 * time.Minute
// Data staleness threshold - if DB data is older than this, consider fetching fresh
// (applied by GetCurrentWeather to the DB point closest to now)
dataStaleThreshold = 30 * time.Minute
)
// WeatherData holds weather data with metadata
type WeatherData struct {
// Points is the payload: a single point for current weather, or a list of
// points for forecast and historical results. It may be empty.
Points []model.WeatherPoint
// FetchedAt is the time at which the service assembled this result
// (time.Now() at request time), not the timestamp of the weather points.
FetchedAt time.Time
// Source describes where the points came from. Values assigned in this file:
// "database", "api", "database (stale)",
// "database (stale, API rate limited)" and "none". A result served from the
// in-memory cache keeps the Source it was stored with.
Source string
}
// WeatherService provides weather data with DB-first access and rate-limited API fallback
//
// It contains mutexes, so it should be used through the pointer returned by
// NewWeatherService and not copied.
type WeatherService struct {
// client fetches forecasts from the upstream weather API.
client *client.OpenMeteoClient
// repo reads and writes weather points in the database.
repo *repository.WeatherRepository
// logger receives all diagnostic output of the service.
logger *slog.Logger
// In-memory cache
// cache maps a cache key to a value with an expiry time; guarded by cacheMu.
cache map[string]cacheEntry
cacheMu sync.RWMutex
// API rate limiting
// lastAPIFetch is the time of the last successful API fetch (zero if none
// yet); guarded by lastAPIFetchMu.
lastAPIFetch time.Time
lastAPIFetchMu sync.RWMutex
}
// cacheEntry is one value stored in the WeatherService in-memory cache.
type cacheEntry struct {
// data is the cached value; the getters in this file store *WeatherData.
data interface{}
// expiresAt is the instant after which getFromCache treats the entry as absent.
expiresAt time.Time
}
// WeatherServiceConfig contains configuration for the weather service
// All fields are copied as-is into the WeatherService by NewWeatherService.
type WeatherServiceConfig struct {
// Client is the upstream weather API client.
Client *client.OpenMeteoClient
// Repo is the database repository for weather points.
Repo *repository.WeatherRepository
// Logger is the structured logger used by the service.
Logger *slog.Logger
}
// NewWeatherService creates a new weather service from config.
//
// Client and Repo are stored as given. If config.Logger is nil the service
// falls back to slog.Default(), so that logging calls inside the service never
// dereference a nil logger. The in-memory cache starts empty.
func NewWeatherService(config WeatherServiceConfig) *WeatherService {
	logger := config.Logger
	if logger == nil {
		logger = slog.Default()
	}
	return &WeatherService{
		client: config.Client,
		repo:   config.Repo,
		logger: logger,
		cache:  make(map[string]cacheEntry),
	}
}
// GetCurrentWeather returns current weather conditions as a single-point
// WeatherData.
// Priority: Cache → DB → API (rate-limited)
//
//  1. A cached result is returned as-is.
//  2. The database is queried for points within one hour either side of now;
//     the point closest to now is used if it is less than dataStaleThreshold
//     old. A DB error is logged and treated as "no DB data".
//  3. Otherwise, if canFetchFromAPI allows it, fresh data is fetched from the
//     API and the point closest to now is returned and cached.
//  4. If the API is rate limited, fails, or returns no points, the stale DB
//     point is returned when one exists (it is not cached); otherwise an error
//     is returned.
func (s *WeatherService) GetCurrentWeather(ctx context.Context) (*WeatherData, error) {
	// 1. Try cache first
	if cached := s.getFromCache(cacheKeyCurrentWeather); cached != nil {
		if data, ok := cached.(*WeatherData); ok {
			s.logger.Debug("current weather cache hit")
			return data, nil
		}
	}

	// 2. Try database
	now := time.Now()
	points, err := s.repo.GetForecast(ctx, now.Add(-1*time.Hour), now.Add(1*time.Hour))
	if err != nil {
		s.logger.Warn("failed to get current weather from DB", "error", err)
	}

	// single wraps one point into a result stamped with this request's time.
	single := func(p model.WeatherPoint, source string) *WeatherData {
		return &WeatherData{
			Points:    []model.WeatherPoint{p},
			FetchedAt: now,
			Source:    source,
		}
	}

	// Resolve the closest DB point once; it is reused by every fallback below.
	hasDBPoint := len(points) > 0
	var dbPoint model.WeatherPoint
	if hasDBPoint {
		dbPoint = s.findClosestPoint(points, now)
		// Check if data is fresh enough
		if now.Sub(dbPoint.Time) < dataStaleThreshold {
			data := single(dbPoint, "database")
			s.setCache(cacheKeyCurrentWeather, data, cacheTTLCurrent)
			s.logger.Debug("current weather from DB", "time", dbPoint.Time)
			return data, nil
		}
		s.logger.Debug("DB data is stale", "data_time", dbPoint.Time, "threshold", dataStaleThreshold)
	}

	// 3. API is rate limited: return stale data if available, or error
	if !s.canFetchFromAPI() {
		if hasDBPoint {
			s.logger.Warn("returning stale data (API rate limited)", "data_time", dbPoint.Time)
			return single(dbPoint, "database (stale, API rate limited)"), nil
		}
		return nil, fmt.Errorf("no weather data available and API rate limited")
	}

	// 4. Try API
	s.logger.Info("fetching current weather from API (DB data stale or missing)")
	apiPoints, err := s.fetchAndStoreFromAPI(ctx)
	if err != nil {
		s.logger.Error("failed to fetch from API", "error", err)
		// If we have stale DB data, return it
		if hasDBPoint {
			return single(dbPoint, "database (stale)"), nil
		}
		return nil, fmt.Errorf("no weather data available: %w", err)
	}

	// An empty API response must not be turned into a zero-valued point and
	// cached as if it were real data.
	if len(apiPoints) == 0 {
		s.logger.Warn("API returned no weather points")
		if hasDBPoint {
			return single(dbPoint, "database (stale)"), nil
		}
		return nil, fmt.Errorf("no weather data available: API returned no points")
	}

	data := single(s.findClosestPoint(apiPoints, now), "api")
	s.setCache(cacheKeyCurrentWeather, data, cacheTTLCurrent)
	return data, nil
}
// GetForecast returns weather forecast data.
// Priority: Cache → DB → API (rate-limited)
//
// The database is queried for the next seven days. DB data is accepted (and
// cached) only if at least one point lies strictly between now and six hours
// from now. Otherwise the API is tried when canFetchFromAPI allows it, and only
// points strictly after now are kept from its response. When the API is rate
// limited or fails, whatever DB points exist are returned uncached with a
// "stale" source; with no DB points an error is returned. A DB error is logged
// and treated as "no DB data".
func (s *WeatherService) GetForecast(ctx context.Context) (*WeatherData, error) {
	// Cache
	if hit, ok := s.getFromCache(cacheKeyForecast).(*WeatherData); ok {
		s.logger.Debug("forecast cache hit")
		return hit, nil
	}

	// Database: now .. now + 7 days
	now := time.Now()
	dbPoints, dbErr := s.repo.GetForecast(ctx, now, now.Add(7*24*time.Hour))
	if dbErr != nil {
		s.logger.Warn("failed to get forecast from DB", "error", dbErr)
	}
	haveDB := len(dbPoints) > 0

	if haveDB {
		// The DB forecast counts as usable only if it covers the near term.
		horizon := now.Add(6 * time.Hour)
		nearTerm := false
		for i := range dbPoints {
			if t := dbPoints[i].Time; t.After(now) && t.Before(horizon) {
				nearTerm = true
				break
			}
		}
		if nearTerm {
			result := &WeatherData{
				Points:    dbPoints,
				FetchedAt: now,
				Source:    "database",
			}
			s.setCache(cacheKeyForecast, result, cacheTTLForecast)
			s.logger.Debug("forecast from DB", "points", len(dbPoints))
			return result, nil
		}
	}

	// Rate limited: fall back to whatever the DB had, without caching it.
	if !s.canFetchFromAPI() {
		if !haveDB {
			return nil, fmt.Errorf("no forecast data available and API rate limited")
		}
		s.logger.Warn("returning stale forecast (API rate limited)", "points", len(dbPoints))
		return &WeatherData{
			Points:    dbPoints,
			FetchedAt: now,
			Source:    "database (stale, API rate limited)",
		}, nil
	}

	// API
	s.logger.Info("fetching forecast from API (DB data stale or missing)")
	fetched, fetchErr := s.fetchAndStoreFromAPI(ctx)
	if fetchErr != nil {
		s.logger.Error("failed to fetch from API", "error", fetchErr)
		if !haveDB {
			return nil, fmt.Errorf("no forecast data available: %w", fetchErr)
		}
		return &WeatherData{
			Points:    dbPoints,
			FetchedAt: now,
			Source:    "database (stale)",
		}, nil
	}

	// Keep only points that lie in the future.
	upcoming := make([]model.WeatherPoint, 0, len(fetched))
	for i := range fetched {
		if fetched[i].Time.After(now) {
			upcoming = append(upcoming, fetched[i])
		}
	}

	result := &WeatherData{
		Points:    upcoming,
		FetchedAt: now,
		Source:    "api",
	}
	s.setCache(cacheKeyForecast, result, cacheTTLForecast)
	return result, nil
}
// GetHistorical returns historical weather data for a specific date.
//
// Lookup order is cache, then database; the API is never consulted for past
// dates. A non-empty DB result is cached under a per-date key for
// cacheTTLHistorical. When the DB has no points, or the DB query fails (the
// error is only logged), an empty, uncached result with Source "none" is
// returned. The returned error is always nil.
func (s *WeatherService) GetHistorical(ctx context.Context, date time.Time) (*WeatherData, error) {
	day := date.Format("2006-01-02")
	key := fmt.Sprintf(cacheKeyHistorical, day)

	// Cache
	if hit, ok := s.getFromCache(key).(*WeatherData); ok {
		s.logger.Debug("historical cache hit", "date", day)
		return hit, nil
	}

	// Database is the only source of historical data.
	rows, err := s.repo.GetHistorical(ctx, date)
	if err != nil {
		s.logger.Warn("failed to get historical data from DB", "error", err, "date", day)
	}

	// Nothing stored for this date: report an explicit empty result rather
	// than reaching out to the API.
	if len(rows) == 0 {
		return &WeatherData{
			Points:    []model.WeatherPoint{},
			FetchedAt: time.Now(),
			Source:    "none",
		}, nil
	}

	result := &WeatherData{
		Points:    rows,
		FetchedAt: time.Now(),
		Source:    "database",
	}
	s.setCache(key, result, cacheTTLHistorical)
	return result, nil
}
// GetAllPoints returns the points of the current forecast (for assessment).
//
// It is a thin wrapper around GetForecast and therefore follows the same
// Cache → DB → API order: when neither the cache nor the database can serve
// the request and the rate limit allows it, this call DOES trigger an API
// fetch. The points cover the forecast window GetForecast uses (from now
// onward), not past observations.
//
// The returned slice is the Points slice of the GetForecast result, which may
// also be held by the in-memory cache, so callers should treat it as
// read-only.
func (s *WeatherService) GetAllPoints(ctx context.Context) ([]model.WeatherPoint, error) {
	forecast, err := s.GetForecast(ctx)
	if err != nil {
		return nil, err
	}
	return forecast.Points, nil
}
// FetchFromAPI forces an API fetch (used by background fetcher).
//
// It does not consult canFetchFromAPI, so it bypasses rate limiting; it is
// meant to be called on a schedule. The fetch, the best-effort save to the
// database, and the update of the last-fetch time are delegated to
// fetchAndStoreFromAPI so both API paths stay in sync. On success the whole
// in-memory cache is cleared so the next request is served from fresh data.
//
// It returns the fetched points, or the wrapped API error if the fetch failed
// (in which case the cache and the last-fetch time are left untouched).
func (s *WeatherService) FetchFromAPI(ctx context.Context) ([]model.WeatherPoint, error) {
	points, err := s.fetchAndStoreFromAPI(ctx)
	if err != nil {
		return nil, err
	}

	// Clear caches so next request gets fresh data
	s.clearCache()
	s.logger.Info("API fetch complete", "points", len(points))
	return points, nil
}
// canFetchFromAPI reports whether an on-demand API call is currently allowed:
// either no fetch has been recorded yet, or at least minAPIFetchInterval has
// elapsed since the last recorded one.
//
// The check does not reserve the slot; the timestamp is only advanced by a
// later successful fetch.
func (s *WeatherService) canFetchFromAPI() bool {
	s.lastAPIFetchMu.RLock()
	last := s.lastAPIFetch
	s.lastAPIFetchMu.RUnlock()

	return last.IsZero() || time.Since(last) >= minAPIFetchInterval
}
// fetchAndStoreFromAPI pulls the forecast from the API client, saves it to the
// database, and records the time of the fetch.
//
// A failed fetch returns a wrapped error and leaves the last-fetch time
// unchanged. A failed save is only logged: the fetched points are still
// returned and the last-fetch time is still updated.
func (s *WeatherService) fetchAndStoreFromAPI(ctx context.Context) ([]model.WeatherPoint, error) {
	fetched, fetchErr := s.client.GetWeatherForecast(ctx)
	if fetchErr != nil {
		return nil, fmt.Errorf("failed to fetch from API: %w", fetchErr)
	}

	// Persisting is best-effort; the caller still gets the data on failure.
	saveErr := s.repo.SaveObservations(ctx, fetched)
	if saveErr != nil {
		s.logger.Error("failed to save observations to DB", "error", saveErr)
	}

	// Record the fetch so canFetchFromAPI can enforce the minimum interval.
	s.lastAPIFetchMu.Lock()
	s.lastAPIFetch = time.Now()
	s.lastAPIFetchMu.Unlock()

	return fetched, nil
}
// findClosestPoint returns the element of points whose Time is nearest to
// target, in either direction. On a tie the earliest element in the slice
// wins. An empty slice yields the zero WeatherPoint.
func (s *WeatherService) findClosestPoint(points []model.WeatherPoint, target time.Time) model.WeatherPoint {
	var none model.WeatherPoint
	if len(points) == 0 {
		return none
	}

	bestIdx := 0
	bestGap := absDuration(points[0].Time.Sub(target))
	for i := 1; i < len(points); i++ {
		// Strict comparison keeps the first of several equally close points.
		if gap := absDuration(points[i].Time.Sub(target)); gap < bestGap {
			bestIdx, bestGap = i, gap
		}
	}
	return points[bestIdx]
}
// Cache helpers

// getFromCache returns the value stored under key, or nil when the key is
// absent or its entry has passed its expiry time. Expired entries are left in
// the map; they are only dropped when overwritten or when the cache is cleared.
func (s *WeatherService) getFromCache(key string) interface{} {
	s.cacheMu.RLock()
	entry, found := s.cache[key]
	s.cacheMu.RUnlock()

	if found && !time.Now().After(entry.expiresAt) {
		return entry.data
	}
	return nil
}
// setCache stores data under key, replacing any previous entry, with an expiry
// of ttl from the moment of the call.
func (s *WeatherService) setCache(key string, data interface{}, ttl time.Duration) {
	s.cacheMu.Lock()
	defer s.cacheMu.Unlock()

	var entry cacheEntry
	entry.data = data
	entry.expiresAt = time.Now().Add(ttl)
	s.cache[key] = entry
}
// clearCache drops every cached entry by swapping in a fresh, empty map.
func (s *WeatherService) clearCache() {
	empty := make(map[string]cacheEntry)

	s.cacheMu.Lock()
	defer s.cacheMu.Unlock()
	s.cache = empty
}
// absDuration returns the absolute value of a duration.
//
// It defers to time.Duration.Abs, which saturates instead of overflowing: the
// minimum representable duration maps to the maximum one rather than staying
// negative, as plain negation would leave it.
func absDuration(d time.Duration) time.Duration {
	return d.Abs()
}