pacsea/sources/feeds/
rate_limit.rs

1//! Rate limiting and circuit breaker for network requests.
2use std::collections::HashMap;
3use std::sync::{LazyLock, Mutex};
4use std::time::{Duration, Instant};
5
6use rand::Rng;
7use tracing::{debug, warn};
8
9use super::Result;
10
/// Rate limiter for news feed network requests.
/// Tracks the last request time to enforce minimum delay between requests.
/// Guarded by a `Mutex`; consumed by [`rate_limit`].
static RATE_LIMITER: LazyLock<Mutex<Instant>> = LazyLock::new(|| Mutex::new(Instant::now()));
/// Minimum delay between news feed network requests (500ms).
const RATE_LIMIT_DELAY_MS: u64 = 500;
16
/// Rate limiter state for archlinux.org with exponential backoff.
/// Mutated under the `ARCHLINUX_RATE_LIMITER` mutex by the rate-limit/backoff functions.
struct ArchLinuxRateLimiter {
    /// Last request timestamp.
    last_request: Instant,
    /// Current backoff delay in milliseconds (starts at the base delay, doubles on failures,
    /// capped at the maximum backoff).
    current_backoff_ms: u64,
    /// Number of consecutive failures/rate limits (reset to 0 on success).
    consecutive_failures: u32,
}
26
/// Rate limiter for archlinux.org requests with exponential backoff.
/// Tracks last request time and implements progressive delays on failures.
static ARCHLINUX_RATE_LIMITER: LazyLock<Mutex<ArchLinuxRateLimiter>> = LazyLock::new(|| {
    Mutex::new(ArchLinuxRateLimiter {
        last_request: Instant::now(),
        current_backoff_ms: 500, // Keep in sync with ARCHLINUX_BASE_DELAY_MS (500ms base, reduced from 2s for faster initial requests)
        consecutive_failures: 0,
    })
});

/// Semaphore to serialize archlinux.org requests (only 1 concurrent request allowed).
/// This prevents multiple async tasks from overwhelming the server even when rate limiting
/// is applied, because the rate limiter alone doesn't prevent concurrent requests that
/// start at nearly the same time from all proceeding simultaneously.
static ARCHLINUX_REQUEST_SEMAPHORE: LazyLock<std::sync::Arc<tokio::sync::Semaphore>> =
    LazyLock::new(|| std::sync::Arc::new(tokio::sync::Semaphore::new(1)));

/// Base delay between archlinux.org requests (500ms; reduced from the original
/// 2000ms for faster initial requests).
const ARCHLINUX_BASE_DELAY_MS: u64 = 500;
/// Maximum backoff delay for archlinux.org requests (60 seconds).
const ARCHLINUX_MAX_BACKOFF_MS: u64 = 60000;
48
/// Circuit breaker state for tracking failures per endpoint type.
///
/// Standard three-state breaker: `Closed` (normal) → `Open` (blocking) →
/// `HalfOpen` (probing) → back to `Closed` on success or `Open` on failure.
#[derive(Debug, Clone)]
enum CircuitState {
    /// Circuit is closed - normal operation, requests flow through.
    Closed,
    /// Circuit is open - blocking requests due to failures.
    Open {
        /// Timestamp when circuit was opened (for cooldown calculation).
        opened_at: Instant,
    },
    /// Circuit is half-open - allowing one test request to probe recovery.
    HalfOpen,
}
62
/// Circuit breaker state tracking failures per endpoint pattern.
struct CircuitBreakerState {
    /// Current circuit state.
    state: CircuitState,
    /// Recent request outcomes (true = success, false = failure).
    /// Holds up to `CIRCUIT_BREAKER_HISTORY_SIZE` (10) entries, oldest evicted first,
    /// used to calculate the failure rate.
    recent_outcomes: Vec<bool>,
    /// Endpoint pattern this breaker tracks (e.g., "/feeds/news/", "/packages/*/json/").
    /// Stored for debugging/logging purposes.
    #[allow(dead_code)] // never read at runtime; kept intentionally for debugger inspection
    endpoint_pattern: String,
}
75
/// Circuit breakers per endpoint pattern.
/// Key: endpoint pattern (e.g., "/feeds/news/", "/packages/*/json/")
static CIRCUIT_BREAKERS: LazyLock<Mutex<HashMap<String, CircuitBreakerState>>> =
    LazyLock::new(|| Mutex::new(HashMap::new()));

/// Maximum number of recent outcomes to track for failure rate calculation.
const CIRCUIT_BREAKER_HISTORY_SIZE: usize = 10;
/// Failure rate threshold to open circuit (50% = 5 failures out of 10).
/// The live check uses integer math instead: `failure_count * 2 >= total_count`
/// (equivalent to `>= 0.5`), so this constant exists for documentation only.
#[allow(dead_code)]
const CIRCUIT_BREAKER_FAILURE_THRESHOLD: f64 = 0.5;
/// Cooldown period before transitioning from `Open` to `HalfOpen` (60 seconds).
const CIRCUIT_BREAKER_COOLDOWN_SECS: u64 = 60;

/// Flag indicating a network error occurred during the last news fetch.
/// Set by [`set_network_error`], read-and-cleared by [`take_network_error`];
/// the UI polls it to show a toast message.
static NETWORK_ERROR_FLAG: std::sync::atomic::AtomicBool =
    std::sync::atomic::AtomicBool::new(false);
94
95/// What: Check and clear the network error flag.
96///
97/// Inputs: None
98///
99/// Output: `true` if a network error occurred since the last check, `false` otherwise.
100///
101/// Details:
102/// - Atomically loads and clears the flag.
103/// - Used by the UI to show a toast when news fetch had network issues.
104#[must_use]
105pub fn take_network_error() -> bool {
106    NETWORK_ERROR_FLAG.swap(false, std::sync::atomic::Ordering::SeqCst)
107}
108
109/// What: Set the network error flag.
110///
111/// Inputs: None
112///
113/// Output: None
114///
115/// Details:
116/// - Called when a network error occurs during news fetching.
117pub(super) fn set_network_error() {
118    NETWORK_ERROR_FLAG.store(true, std::sync::atomic::Ordering::SeqCst);
119}
120
121/// What: Retry a network operation with exponential backoff on failure.
122///
123/// Inputs:
124/// - `operation`: Async closure that returns a Result
125/// - `max_retries`: Maximum number of retry attempts
126///
127/// Output:
128/// - Result from the operation, or error if all retries fail
129///
130/// Details:
131/// - On failure, waits with exponential backoff: 1s, 2s, 4s...
132/// - Stops retrying after `max_retries` attempts
133pub(super) async fn retry_with_backoff<T, E, F, Fut>(
134    mut operation: F,
135    max_retries: usize,
136) -> std::result::Result<T, E>
137where
138    F: FnMut() -> Fut,
139    Fut: std::future::Future<Output = std::result::Result<T, E>>,
140{
141    let mut attempt = 0;
142    loop {
143        match operation().await {
144            Ok(result) => return Ok(result),
145            Err(e) => {
146                if attempt >= max_retries {
147                    return Err(e);
148                }
149                attempt += 1;
150                let backoff_secs = 1u64 << (attempt - 1); // Exponential: 1, 2, 4, 8...
151                warn!(
152                    attempt,
153                    max_retries,
154                    backoff_secs,
155                    "network request failed, retrying with exponential backoff"
156                );
157                tokio::time::sleep(Duration::from_secs(backoff_secs)).await;
158            }
159        }
160    }
161}
162
163/// What: Apply rate limiting before making a network request.
164///
165/// Inputs: None
166///
167/// Output: None (async sleep if needed)
168///
169/// Details:
170/// - Ensures minimum delay between network requests to avoid overwhelming servers.
171/// - Thread-safe via mutex guarding the last request timestamp.
172pub(super) async fn rate_limit() {
173    let delay_needed = {
174        let mut last_request = match RATE_LIMITER.lock() {
175            Ok(guard) => guard,
176            Err(poisoned) => poisoned.into_inner(),
177        };
178        let elapsed = last_request.elapsed();
179        let min_delay = Duration::from_millis(RATE_LIMIT_DELAY_MS);
180        let delay = if elapsed < min_delay {
181            // Safe to unwrap because we checked elapsed < min_delay above
182            #[allow(clippy::unwrap_used)]
183            min_delay.checked_sub(elapsed).unwrap()
184        } else {
185            Duration::ZERO
186        };
187        *last_request = Instant::now();
188        delay
189    };
190    if !delay_needed.is_zero() {
191        tokio::time::sleep(delay_needed).await;
192    }
193}
194
/// Maximum jitter in milliseconds added to archlinux.org rate-limiting delays
/// (prevents a thundering herd when multiple clients retry at the same moment).
const JITTER_MAX_MS: u64 = 500;
197
/// What: Apply rate limiting specifically for archlinux.org requests with exponential backoff.
///
/// Inputs: None
///
/// Output: `OwnedSemaphorePermit` that the caller MUST hold during the request.
///
/// # Panics
/// - Panics if the archlinux.org request semaphore is closed (should never happen in practice).
///
/// Details:
/// - Acquires a semaphore permit to serialize archlinux.org requests (only 1 at a time).
/// - Uses a dedicated base delay for archlinux.org (`ARCHLINUX_BASE_DELAY_MS`, currently 500ms).
/// - Implements exponential backoff: the delay doubles on consecutive failures
///   (500ms → 1s → 2s → ..., capped at `ARCHLINUX_MAX_BACKOFF_MS` = 60s).
/// - Adds random jitter (0-500ms) to prevent thundering herd when multiple clients retry simultaneously.
/// - Backoff is reset after successful requests via `reset_archlinux_backoff`.
/// - Thread-safe via mutex guarding the rate limiter state.
/// - The returned permit MUST be held until the HTTP request completes to ensure serialization.
/// - If the permit is dropped before the HTTP request completes, another request may start concurrently,
///   defeating the serialization and potentially causing race conditions or overwhelming the server.
pub async fn rate_limit_archlinux() -> tokio::sync::OwnedSemaphorePermit {
    // 1. Acquire semaphore to serialize requests (waits if another request is in progress).
    //    Rate limiting alone cannot stop near-simultaneous tasks from all proceeding at
    //    once; the single-permit semaphore guarantees one archlinux.org request at a time.
    let permit = ARCHLINUX_REQUEST_SEMAPHORE
        .clone()
        .acquire_owned()
        .await
        // Semaphore is never closed, so this cannot fail in practice
        .expect("archlinux.org request semaphore should never be closed");

    // 2. Now that we have exclusive access, compute and apply the rate limiting delay.
    //    The mutex is released at the end of this block, before sleeping.
    let delay_needed = {
        let mut limiter = match ARCHLINUX_RATE_LIMITER.lock() {
            Ok(guard) => guard,
            // Recover from a poisoned lock: the limiter state is still usable.
            Err(poisoned) => poisoned.into_inner(),
        };
        let elapsed = limiter.last_request.elapsed();
        let min_delay = Duration::from_millis(limiter.current_backoff_ms);
        let delay = if elapsed < min_delay {
            // Safe to unwrap because we checked elapsed < min_delay above
            #[allow(clippy::unwrap_used)]
            min_delay.checked_sub(elapsed).unwrap()
        } else {
            Duration::ZERO
        };
        limiter.last_request = Instant::now();
        delay
    };

    if !delay_needed.is_zero() {
        // Add random jitter to prevent thundering herd when multiple clients retry simultaneously
        let jitter_ms = rand::rng().random_range(0..=JITTER_MAX_MS);
        let delay_with_jitter = delay_needed + Duration::from_millis(jitter_ms);
        // Truncating cast is fine for logging: the delay is bounded by the 60s
        // maximum backoff (60_000 ms), far below u64::MAX.
        #[allow(clippy::cast_possible_truncation)]
        let delay_ms = delay_needed.as_millis() as u64;
        debug!(
            delay_ms,
            jitter_ms,
            total_ms = delay_with_jitter.as_millis(),
            "rate limiting archlinux.org request with jitter"
        );
        tokio::time::sleep(delay_with_jitter).await;
    }

    // 3. Return the permit - caller MUST hold it during the request
    permit
}
265
/// What: Extract endpoint pattern from URL for circuit breaker tracking.
///
/// Inputs:
/// - `url`: Full URL to extract pattern from
///
/// Output:
/// - Endpoint pattern string (e.g., "/feeds/news/", "/packages/core/x86_64/*/json/", "/news/*")
///
/// Details:
/// - Normalizes URLs to endpoint patterns for grouping similar requests.
/// - Replaces specific package names with "*" for JSON endpoints.
/// - Replaces specific article slugs with "*" for individual news articles.
/// - Returns the input unchanged when it has no scheme or no path component.
#[must_use]
pub fn extract_endpoint_pattern(url: &str) -> String {
    // Locate the path portion: everything from the first '/' after "scheme://".
    let host_start = match url.find("://") {
        Some(pos) => pos + 3,
        None => return url.to_string(),
    };
    let path_offset = match url[host_start..].find('/') {
        Some(pos) => pos,
        // No path component at all (e.g. "https://host"); return as-is.
        None => return url.to_string(),
    };
    let path = &url[host_start + path_offset..];

    // Normalize package-specific endpoints:
    // /packages/{repo}/{arch}/{name}/json/ -> /packages/{repo}/{arch}/*/json/
    if path.contains("/packages/") && path.contains("/json/") {
        if let Some(json_pos) = path.find("/json/") {
            let base = &path[..json_pos];
            if let Some(last_slash) = base.rfind('/') {
                // `base[..=last_slash]` already ends with '/', so append "*/json/"
                // directly. (Appending "/*/json/" produced a "//" double slash.)
                return format!("{}*/json/", &base[..=last_slash]);
            }
        }
    }
    // For feeds, use the full path as the pattern.
    if path.starts_with("/feeds/") {
        return path.to_string();
    }
    // For individual news articles, collapse the slug: /news/{slug} -> /news/*
    if path.contains("/news/") && !path.ends_with('/') {
        if let Some(news_pos) = path.find("/news/") {
            // The slice up to and including "/news/" already ends with '/', so
            // append "*" directly. (Appending "/*" produced "/news//*".)
            return format!("{}*", &path[..news_pos + "/news/".len()]);
        }
    }
    path.to_string()
}
309
310/// What: Check circuit breaker state before making a request.
311///
312/// Inputs:
313/// - `endpoint_pattern`: Endpoint pattern to check circuit breaker for
314///
315/// Output:
316/// - `Ok(())` if request should proceed, `Err` with cached error if circuit is open
317///
318/// # Errors
319/// - Returns `Err` if the circuit breaker is open and cooldown period has not expired.
320///
321/// Details:
322/// - Returns error immediately if circuit is Open and cooldown not expired.
323/// - Allows request if circuit is Closed or `HalfOpen`.
324/// - Automatically transitions `Open` → `HalfOpen` after cooldown period.
325#[allow(clippy::significant_drop_tightening)]
326pub fn check_circuit_breaker(endpoint_pattern: &str) -> Result<()> {
327    // MutexGuard must be held for entire function to modify breaker state
328    let mut breakers = match CIRCUIT_BREAKERS.lock() {
329        Ok(guard) => guard,
330        Err(poisoned) => poisoned.into_inner(),
331    };
332
333    let breaker = breakers
334        .entry(endpoint_pattern.to_string())
335        .or_insert_with(|| CircuitBreakerState {
336            state: CircuitState::Closed,
337            recent_outcomes: Vec::new(),
338            endpoint_pattern: endpoint_pattern.to_string(),
339        });
340
341    match &breaker.state {
342        CircuitState::Open { opened_at } => {
343            let elapsed = opened_at.elapsed();
344            if elapsed.as_secs() >= CIRCUIT_BREAKER_COOLDOWN_SECS {
345                // Transition to HalfOpen after cooldown
346                breaker.state = CircuitState::HalfOpen;
347                debug!(
348                    endpoint_pattern,
349                    "circuit breaker transitioning Open → HalfOpen after cooldown"
350                );
351                Ok(())
352            } else {
353                // Still in cooldown, block request
354                let remaining = CIRCUIT_BREAKER_COOLDOWN_SECS - elapsed.as_secs();
355                warn!(
356                    endpoint_pattern,
357                    remaining_secs = remaining,
358                    "circuit breaker is Open, blocking request"
359                );
360                Err(format!(
361                    "Circuit breaker is Open for {endpoint_pattern} (cooldown: {remaining}s remaining)"
362                )
363                .into())
364            }
365        }
366        CircuitState::HalfOpen | CircuitState::Closed => Ok(()),
367    }
368}
369
370/// What: Record request outcome in circuit breaker.
371///
372/// Inputs:
373/// - `endpoint_pattern`: Endpoint pattern for this request
374/// - `success`: `true` if request succeeded, `false` if it failed
375///
376/// Output: None
377///
378/// Details:
379/// - Records outcome in recent history (max 10 entries).
380/// - On success: resets failure count, moves to Closed.
381/// - On failure: increments failure count, opens circuit if >50% failure rate.
382#[allow(clippy::significant_drop_tightening)]
383pub fn record_circuit_breaker_outcome(endpoint_pattern: &str, success: bool) {
384    // MutexGuard must be held for entire function to modify breaker state
385    let mut breakers = match CIRCUIT_BREAKERS.lock() {
386        Ok(guard) => guard,
387        Err(poisoned) => poisoned.into_inner(),
388    };
389
390    let breaker = breakers
391        .entry(endpoint_pattern.to_string())
392        .or_insert_with(|| CircuitBreakerState {
393            state: CircuitState::Closed,
394            recent_outcomes: Vec::new(),
395            endpoint_pattern: endpoint_pattern.to_string(),
396        });
397
398    // Add outcome to history (keep last N)
399    breaker.recent_outcomes.push(success);
400    if breaker.recent_outcomes.len() > CIRCUIT_BREAKER_HISTORY_SIZE {
401        breaker.recent_outcomes.remove(0);
402    }
403
404    if success {
405        // On success, reset to Closed
406        breaker.state = CircuitState::Closed;
407        if !breaker.recent_outcomes.iter().all(|&x| x) {
408            debug!(
409                endpoint_pattern,
410                "circuit breaker: request succeeded, resetting to Closed"
411            );
412        }
413    } else {
414        // On failure, check if we should open circuit
415        let failure_count = breaker
416            .recent_outcomes
417            .iter()
418            .filter(|&&outcome| !outcome)
419            .count();
420        // Calculate failure rate using integer comparison to avoid precision loss
421        // Threshold is 0.5 (50%), so we check: failure_count * 2 >= total_count
422        let total_count = breaker.recent_outcomes.len();
423
424        if failure_count * 2 >= total_count && total_count >= CIRCUIT_BREAKER_HISTORY_SIZE {
425            // Open circuit
426            breaker.state = CircuitState::Open {
427                opened_at: Instant::now(),
428            };
429            warn!(
430                endpoint_pattern,
431                failure_count,
432                total = breaker.recent_outcomes.len(),
433                failure_percentage = (failure_count * 100) / total_count,
434                "circuit breaker opened due to high failure rate"
435            );
436        } else if matches!(breaker.state, CircuitState::HalfOpen) {
437            // HalfOpen test failed, go back to Open
438            breaker.state = CircuitState::Open {
439                opened_at: Instant::now(),
440            };
441            warn!(
442                endpoint_pattern,
443                "circuit breaker: HalfOpen test failed, reopening"
444            );
445        }
446    }
447}
448
/// What: Extract Retry-After value from error message string.
///
/// Inputs:
/// - `error_msg`: Error message that may contain Retry-After information
///
/// Output:
/// - `Some(seconds)` if Retry-After found in error message, `None` otherwise
///
/// Details:
/// - Parses format: "error message (Retry-After: Ns)" where N is seconds.
/// - The seconds value is everything between the marker and the first 's'.
#[must_use]
pub fn extract_retry_after_from_error(error_msg: &str) -> Option<u64> {
    const MARKER: &str = "Retry-After: ";
    let marker_pos = error_msg.find(MARKER)?;
    let tail = &error_msg[marker_pos + MARKER.len()..];
    let digits_end = tail.find('s')?;
    tail[..digits_end].trim().parse().ok()
}
471
472/// What: Increase backoff delay for archlinux.org after a failure or rate limit.
473///
474/// Inputs:
475/// - `retry_after_seconds`: Optional Retry-After value from server (in seconds)
476///
477/// Output: None
478///
479/// Details:
480/// - If Retry-After is provided, uses that value (capped at maximum delay).
481/// - Otherwise, doubles the current backoff delay (exponential backoff).
482/// - Caps at maximum delay (60 seconds).
483/// - Increments consecutive failure counter.
484pub fn increase_archlinux_backoff(retry_after_seconds: Option<u64>) {
485    let mut limiter = match ARCHLINUX_RATE_LIMITER.lock() {
486        Ok(guard) => guard,
487        Err(poisoned) => poisoned.into_inner(),
488    };
489    limiter.consecutive_failures += 1;
490    // Use Retry-After value if provided, otherwise use exponential backoff
491    if let Some(retry_after) = retry_after_seconds {
492        // Convert seconds to milliseconds, cap at maximum
493        let retry_after_ms = (retry_after * 1000).min(ARCHLINUX_MAX_BACKOFF_MS);
494        limiter.current_backoff_ms = retry_after_ms;
495        warn!(
496            consecutive_failures = limiter.consecutive_failures,
497            retry_after_seconds = retry_after,
498            backoff_ms = limiter.current_backoff_ms,
499            "increased archlinux.org backoff delay using Retry-After header"
500        );
501    } else {
502        // Double the backoff delay, capped at maximum
503        limiter.current_backoff_ms = (limiter.current_backoff_ms * 2).min(ARCHLINUX_MAX_BACKOFF_MS);
504        warn!(
505            consecutive_failures = limiter.consecutive_failures,
506            backoff_ms = limiter.current_backoff_ms,
507            "increased archlinux.org backoff delay"
508        );
509    }
510}
511
/// What: Reset backoff delay for archlinux.org after a successful request.
///
/// Inputs: None
///
/// Output: None
///
/// Details:
/// - Resets backoff to the base delay (`ARCHLINUX_BASE_DELAY_MS`, currently 500ms).
/// - Resets consecutive failure counter.
pub fn reset_archlinux_backoff() {
    let mut limiter = match ARCHLINUX_RATE_LIMITER.lock() {
        Ok(guard) => guard,
        // Recover from a poisoned lock: the limiter state is still usable.
        Err(poisoned) => poisoned.into_inner(),
    };
    // Only log when there was actually something to reset.
    if limiter.consecutive_failures > 0 {
        debug!(
            previous_failures = limiter.consecutive_failures,
            previous_backoff_ms = limiter.current_backoff_ms,
            "resetting archlinux.org backoff after successful request"
        );
    }
    limiter.current_backoff_ms = ARCHLINUX_BASE_DELAY_MS;
    limiter.consecutive_failures = 0;
}