416 lines
11 KiB
Go
416 lines
11 KiB
Go
package service
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/scottyah/paragliding/internal/client"
|
|
"github.com/scottyah/paragliding/internal/model"
|
|
"github.com/scottyah/paragliding/internal/repository"
|
|
)
|
|
|
|
const (
|
|
// Cache keys
|
|
cacheKeyCurrentWeather = "weather:current"
|
|
cacheKeyForecast = "weather:forecast"
|
|
cacheKeyHistorical = "weather:historical:%s"
|
|
cacheKeyLastAPIFetch = "weather:last_api_fetch"
|
|
|
|
// Cache TTLs
|
|
cacheTTLCurrent = 5 * time.Minute
|
|
cacheTTLForecast = 10 * time.Minute
|
|
cacheTTLHistorical = 24 * time.Hour
|
|
|
|
// API rate limiting - minimum time between API calls
|
|
minAPIFetchInterval = 15 * time.Minute
|
|
|
|
// Data staleness threshold - if DB data is older than this, consider fetching fresh
|
|
dataStaleThreshold = 30 * time.Minute
|
|
)
|
|
|
|
// WeatherData holds weather data with metadata
|
|
type WeatherData struct {
|
|
Points []model.WeatherPoint
|
|
FetchedAt time.Time
|
|
Source string // "cache", "database", "api"
|
|
}
|
|
|
|
// WeatherService provides weather data with DB-first access and rate-limited API fallback
|
|
type WeatherService struct {
|
|
client *client.OpenMeteoClient
|
|
repo *repository.WeatherRepository
|
|
logger *slog.Logger
|
|
|
|
// In-memory cache
|
|
cache map[string]cacheEntry
|
|
cacheMu sync.RWMutex
|
|
|
|
// API rate limiting
|
|
lastAPIFetch time.Time
|
|
lastAPIFetchMu sync.RWMutex
|
|
}
|
|
|
|
type cacheEntry struct {
|
|
data interface{}
|
|
expiresAt time.Time
|
|
}
|
|
|
|
// WeatherServiceConfig contains configuration for the weather service
|
|
type WeatherServiceConfig struct {
|
|
Client *client.OpenMeteoClient
|
|
Repo *repository.WeatherRepository
|
|
Logger *slog.Logger
|
|
}
|
|
|
|
// NewWeatherService creates a new weather service
|
|
func NewWeatherService(config WeatherServiceConfig) *WeatherService {
|
|
return &WeatherService{
|
|
client: config.Client,
|
|
repo: config.Repo,
|
|
logger: config.Logger,
|
|
cache: make(map[string]cacheEntry),
|
|
}
|
|
}
|
|
|
|
// GetCurrentWeather returns current weather conditions
|
|
// Priority: Cache → DB → API (rate-limited)
|
|
func (s *WeatherService) GetCurrentWeather(ctx context.Context) (*WeatherData, error) {
|
|
// 1. Try cache first
|
|
if cached := s.getFromCache(cacheKeyCurrentWeather); cached != nil {
|
|
if data, ok := cached.(*WeatherData); ok {
|
|
s.logger.Debug("current weather cache hit")
|
|
return data, nil
|
|
}
|
|
}
|
|
|
|
// 2. Try database
|
|
now := time.Now()
|
|
start := now.Add(-1 * time.Hour)
|
|
end := now.Add(1 * time.Hour)
|
|
|
|
points, err := s.repo.GetForecast(ctx, start, end)
|
|
if err != nil {
|
|
s.logger.Warn("failed to get current weather from DB", "error", err)
|
|
}
|
|
|
|
// Find closest point to now
|
|
if len(points) > 0 {
|
|
current := s.findClosestPoint(points, now)
|
|
|
|
// Check if data is fresh enough
|
|
if now.Sub(current.Time) < dataStaleThreshold {
|
|
data := &WeatherData{
|
|
Points: []model.WeatherPoint{current},
|
|
FetchedAt: now,
|
|
Source: "database",
|
|
}
|
|
s.setCache(cacheKeyCurrentWeather, data, cacheTTLCurrent)
|
|
s.logger.Debug("current weather from DB", "time", current.Time)
|
|
return data, nil
|
|
}
|
|
s.logger.Debug("DB data is stale", "data_time", current.Time, "threshold", dataStaleThreshold)
|
|
}
|
|
|
|
// 3. Try API (rate-limited)
|
|
if s.canFetchFromAPI() {
|
|
s.logger.Info("fetching current weather from API (DB data stale or missing)")
|
|
apiPoints, err := s.fetchAndStoreFromAPI(ctx)
|
|
if err != nil {
|
|
s.logger.Error("failed to fetch from API", "error", err)
|
|
// If we have stale DB data, return it
|
|
if len(points) > 0 {
|
|
current := s.findClosestPoint(points, now)
|
|
return &WeatherData{
|
|
Points: []model.WeatherPoint{current},
|
|
FetchedAt: now,
|
|
Source: "database (stale)",
|
|
}, nil
|
|
}
|
|
return nil, fmt.Errorf("no weather data available: %w", err)
|
|
}
|
|
|
|
current := s.findClosestPoint(apiPoints, now)
|
|
data := &WeatherData{
|
|
Points: []model.WeatherPoint{current},
|
|
FetchedAt: now,
|
|
Source: "api",
|
|
}
|
|
s.setCache(cacheKeyCurrentWeather, data, cacheTTLCurrent)
|
|
return data, nil
|
|
}
|
|
|
|
// 4. Return stale data if available, or error
|
|
if len(points) > 0 {
|
|
current := s.findClosestPoint(points, now)
|
|
s.logger.Warn("returning stale data (API rate limited)", "data_time", current.Time)
|
|
return &WeatherData{
|
|
Points: []model.WeatherPoint{current},
|
|
FetchedAt: now,
|
|
Source: "database (stale, API rate limited)",
|
|
}, nil
|
|
}
|
|
|
|
return nil, fmt.Errorf("no weather data available and API rate limited")
|
|
}
|
|
|
|
// GetForecast returns weather forecast data
|
|
// Priority: Cache → DB → API (rate-limited)
|
|
func (s *WeatherService) GetForecast(ctx context.Context) (*WeatherData, error) {
|
|
// 1. Try cache first
|
|
if cached := s.getFromCache(cacheKeyForecast); cached != nil {
|
|
if data, ok := cached.(*WeatherData); ok {
|
|
s.logger.Debug("forecast cache hit")
|
|
return data, nil
|
|
}
|
|
}
|
|
|
|
// 2. Try database
|
|
now := time.Now()
|
|
end := now.Add(7 * 24 * time.Hour) // 7 days ahead
|
|
|
|
points, err := s.repo.GetForecast(ctx, now, end)
|
|
if err != nil {
|
|
s.logger.Warn("failed to get forecast from DB", "error", err)
|
|
}
|
|
|
|
if len(points) > 0 {
|
|
// Check if we have recent enough data (at least some points in the next few hours)
|
|
hasRecentData := false
|
|
for _, p := range points {
|
|
if p.Time.After(now) && p.Time.Before(now.Add(6*time.Hour)) {
|
|
hasRecentData = true
|
|
break
|
|
}
|
|
}
|
|
|
|
if hasRecentData {
|
|
data := &WeatherData{
|
|
Points: points,
|
|
FetchedAt: now,
|
|
Source: "database",
|
|
}
|
|
s.setCache(cacheKeyForecast, data, cacheTTLForecast)
|
|
s.logger.Debug("forecast from DB", "points", len(points))
|
|
return data, nil
|
|
}
|
|
}
|
|
|
|
// 3. Try API (rate-limited)
|
|
if s.canFetchFromAPI() {
|
|
s.logger.Info("fetching forecast from API (DB data stale or missing)")
|
|
apiPoints, err := s.fetchAndStoreFromAPI(ctx)
|
|
if err != nil {
|
|
s.logger.Error("failed to fetch from API", "error", err)
|
|
if len(points) > 0 {
|
|
return &WeatherData{
|
|
Points: points,
|
|
FetchedAt: now,
|
|
Source: "database (stale)",
|
|
}, nil
|
|
}
|
|
return nil, fmt.Errorf("no forecast data available: %w", err)
|
|
}
|
|
|
|
// Filter for future points
|
|
forecast := make([]model.WeatherPoint, 0, len(apiPoints))
|
|
for _, p := range apiPoints {
|
|
if p.Time.After(now) {
|
|
forecast = append(forecast, p)
|
|
}
|
|
}
|
|
|
|
data := &WeatherData{
|
|
Points: forecast,
|
|
FetchedAt: now,
|
|
Source: "api",
|
|
}
|
|
s.setCache(cacheKeyForecast, data, cacheTTLForecast)
|
|
return data, nil
|
|
}
|
|
|
|
// 4. Return stale data if available
|
|
if len(points) > 0 {
|
|
s.logger.Warn("returning stale forecast (API rate limited)", "points", len(points))
|
|
return &WeatherData{
|
|
Points: points,
|
|
FetchedAt: now,
|
|
Source: "database (stale, API rate limited)",
|
|
}, nil
|
|
}
|
|
|
|
return nil, fmt.Errorf("no forecast data available and API rate limited")
|
|
}
|
|
|
|
// GetHistorical returns historical weather data for a specific date
|
|
// Historical data is primarily from DB (populated by background fetcher)
|
|
func (s *WeatherService) GetHistorical(ctx context.Context, date time.Time) (*WeatherData, error) {
|
|
cacheKey := fmt.Sprintf(cacheKeyHistorical, date.Format("2006-01-02"))
|
|
|
|
// 1. Try cache first
|
|
if cached := s.getFromCache(cacheKey); cached != nil {
|
|
if data, ok := cached.(*WeatherData); ok {
|
|
s.logger.Debug("historical cache hit", "date", date.Format("2006-01-02"))
|
|
return data, nil
|
|
}
|
|
}
|
|
|
|
// 2. Get from database (historical data should always be from DB)
|
|
points, err := s.repo.GetHistorical(ctx, date)
|
|
if err != nil {
|
|
s.logger.Warn("failed to get historical data from DB", "error", err, "date", date.Format("2006-01-02"))
|
|
}
|
|
|
|
if len(points) > 0 {
|
|
data := &WeatherData{
|
|
Points: points,
|
|
FetchedAt: time.Now(),
|
|
Source: "database",
|
|
}
|
|
s.setCache(cacheKey, data, cacheTTLHistorical)
|
|
return data, nil
|
|
}
|
|
|
|
// Historical data not available - don't try API for past dates
|
|
// The background fetcher should have this data
|
|
return &WeatherData{
|
|
Points: []model.WeatherPoint{},
|
|
FetchedAt: time.Now(),
|
|
Source: "none",
|
|
}, nil
|
|
}
|
|
|
|
// GetAllPoints returns all available weather points (for assessment)
|
|
// This reads from DB/cache only, never triggers API calls
|
|
func (s *WeatherService) GetAllPoints(ctx context.Context) ([]model.WeatherPoint, error) {
|
|
// Try to get forecast data which includes recent + future points
|
|
data, err := s.GetForecast(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return data.Points, nil
|
|
}
|
|
|
|
// FetchFromAPI forces an API fetch (used by background fetcher)
|
|
// This bypasses rate limiting as it's called on a schedule
|
|
func (s *WeatherService) FetchFromAPI(ctx context.Context) ([]model.WeatherPoint, error) {
|
|
points, err := s.client.GetWeatherForecast(ctx)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to fetch from API: %w", err)
|
|
}
|
|
|
|
// Store in database
|
|
if err := s.repo.SaveObservations(ctx, points); err != nil {
|
|
s.logger.Error("failed to save observations to DB", "error", err)
|
|
// Continue anyway - return the data
|
|
}
|
|
|
|
// Update last fetch time
|
|
s.lastAPIFetchMu.Lock()
|
|
s.lastAPIFetch = time.Now()
|
|
s.lastAPIFetchMu.Unlock()
|
|
|
|
// Clear caches so next request gets fresh data
|
|
s.clearCache()
|
|
|
|
s.logger.Info("API fetch complete", "points", len(points))
|
|
return points, nil
|
|
}
|
|
|
|
// canFetchFromAPI checks if enough time has passed since last API fetch
|
|
func (s *WeatherService) canFetchFromAPI() bool {
|
|
s.lastAPIFetchMu.RLock()
|
|
defer s.lastAPIFetchMu.RUnlock()
|
|
|
|
if s.lastAPIFetch.IsZero() {
|
|
return true
|
|
}
|
|
return time.Since(s.lastAPIFetch) >= minAPIFetchInterval
|
|
}
|
|
|
|
// fetchAndStoreFromAPI fetches from API and stores in DB
|
|
func (s *WeatherService) fetchAndStoreFromAPI(ctx context.Context) ([]model.WeatherPoint, error) {
|
|
points, err := s.client.GetWeatherForecast(ctx)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to fetch from API: %w", err)
|
|
}
|
|
|
|
// Store in database
|
|
if err := s.repo.SaveObservations(ctx, points); err != nil {
|
|
s.logger.Error("failed to save observations to DB", "error", err)
|
|
// Continue anyway
|
|
}
|
|
|
|
// Update last fetch time
|
|
s.lastAPIFetchMu.Lock()
|
|
s.lastAPIFetch = time.Now()
|
|
s.lastAPIFetchMu.Unlock()
|
|
|
|
return points, nil
|
|
}
|
|
|
|
// findClosestPoint finds the weather point closest to the target time
|
|
func (s *WeatherService) findClosestPoint(points []model.WeatherPoint, target time.Time) model.WeatherPoint {
|
|
if len(points) == 0 {
|
|
return model.WeatherPoint{}
|
|
}
|
|
|
|
closest := points[0]
|
|
minDiff := absDuration(points[0].Time.Sub(target))
|
|
|
|
for _, point := range points[1:] {
|
|
diff := absDuration(point.Time.Sub(target))
|
|
if diff < minDiff {
|
|
minDiff = diff
|
|
closest = point
|
|
}
|
|
}
|
|
|
|
return closest
|
|
}
|
|
|
|
// Cache helpers
|
|
|
|
func (s *WeatherService) getFromCache(key string) interface{} {
|
|
s.cacheMu.RLock()
|
|
defer s.cacheMu.RUnlock()
|
|
|
|
entry, exists := s.cache[key]
|
|
if !exists {
|
|
return nil
|
|
}
|
|
|
|
if time.Now().After(entry.expiresAt) {
|
|
return nil
|
|
}
|
|
|
|
return entry.data
|
|
}
|
|
|
|
func (s *WeatherService) setCache(key string, data interface{}, ttl time.Duration) {
|
|
s.cacheMu.Lock()
|
|
defer s.cacheMu.Unlock()
|
|
|
|
s.cache[key] = cacheEntry{
|
|
data: data,
|
|
expiresAt: time.Now().Add(ttl),
|
|
}
|
|
}
|
|
|
|
func (s *WeatherService) clearCache() {
|
|
s.cacheMu.Lock()
|
|
defer s.cacheMu.Unlock()
|
|
|
|
s.cache = make(map[string]cacheEntry)
|
|
}
|
|
|
|
// absDuration returns the absolute value of a duration
|
|
func absDuration(d time.Duration) time.Duration {
|
|
if d < 0 {
|
|
return -d
|
|
}
|
|
return d
|
|
}
|