pacsea/sources/feeds/rate_limit.rs
1//! Rate limiting and circuit breaker for network requests.
2use std::collections::HashMap;
3use std::sync::{LazyLock, Mutex};
4use std::time::{Duration, Instant};
5
6use rand::Rng;
7use tracing::{debug, warn};
8
9use super::Result;
10
/// Rate limiter for news feed network requests.
///
/// Stores the timestamp of the most recent request; `rate_limit` compares the
/// elapsed time against [`RATE_LIMIT_DELAY_MS`] to enforce a minimum spacing
/// between requests. Guarded by a `Mutex` so concurrent tasks serialize access.
static RATE_LIMITER: LazyLock<Mutex<Instant>> = LazyLock::new(|| Mutex::new(Instant::now()));
/// Minimum delay between news feed network requests (500ms).
const RATE_LIMIT_DELAY_MS: u64 = 500;
16
/// Rate limiter state for archlinux.org with exponential backoff.
///
/// Mutated by `rate_limit_archlinux` (delay enforcement), `increase_archlinux_backoff`
/// (failure path), and `reset_archlinux_backoff` (success path).
struct ArchLinuxRateLimiter {
    /// Timestamp of the last request (updated just before each request proceeds).
    last_request: Instant,
    /// Current backoff delay in milliseconds (starts at the base delay, grows
    /// exponentially on failures, capped at `ARCHLINUX_MAX_BACKOFF_MS`).
    current_backoff_ms: u64,
    /// Number of consecutive failures/rate limits (reset to 0 on success).
    consecutive_failures: u32,
}
26
27/// Rate limiter for archlinux.org requests with exponential backoff.
28/// Tracks last request time and implements progressive delays on failures.
29static ARCHLINUX_RATE_LIMITER: LazyLock<Mutex<ArchLinuxRateLimiter>> = LazyLock::new(|| {
30 Mutex::new(ArchLinuxRateLimiter {
31 last_request: Instant::now(),
32 current_backoff_ms: 500, // Start with 500ms base delay (reduced from 2s for faster initial requests)
33 consecutive_failures: 0,
34 })
35});
36
/// Semaphore to serialize archlinux.org requests (only 1 concurrent request allowed).
/// This prevents multiple async tasks from overwhelming the server even when rate limiting
/// is applied, because the rate limiter alone doesn't prevent concurrent requests that
/// start at nearly the same time from all proceeding simultaneously.
static ARCHLINUX_REQUEST_SEMAPHORE: LazyLock<std::sync::Arc<tokio::sync::Semaphore>> =
    LazyLock::new(|| std::sync::Arc::new(tokio::sync::Semaphore::new(1)));

/// Base delay for archlinux.org requests (500ms; reduced from the original
/// 2 seconds for faster initial requests).
const ARCHLINUX_BASE_DELAY_MS: u64 = 500;
/// Maximum backoff delay (60 seconds).
const ARCHLINUX_MAX_BACKOFF_MS: u64 = 60000;
48
/// Circuit breaker state for tracking failures per endpoint type.
///
/// Standard three-state breaker: Closed → Open (on high failure rate) →
/// HalfOpen (after cooldown) → Closed (on successful probe) or back to Open.
#[derive(Debug, Clone)]
enum CircuitState {
    /// Circuit is closed - normal operation, requests proceed.
    Closed,
    /// Circuit is open - blocking requests due to failures.
    Open {
        /// Timestamp when circuit was opened (for cooldown calculation).
        opened_at: Instant,
    },
    /// Circuit is half-open - allowing one test request to probe recovery.
    HalfOpen,
}
62
/// Circuit breaker state tracking failures per endpoint pattern.
struct CircuitBreakerState {
    /// Current circuit state.
    state: CircuitState,
    /// Recent request outcomes (true = success, false = failure).
    /// Tracks the last `CIRCUIT_BREAKER_HISTORY_SIZE` (10) requests to
    /// calculate the failure rate.
    recent_outcomes: Vec<bool>,
    /// Endpoint pattern this breaker tracks (e.g., "/feeds/news/", "/packages/*/json/").
    /// Stored for debugging/logging purposes; not read by the breaker logic itself.
    #[allow(dead_code)]
    endpoint_pattern: String,
}
75
/// Circuit breakers per endpoint pattern.
/// Key: endpoint pattern (e.g., "/feeds/news/", "/packages/*/json/") as
/// produced by `extract_endpoint_pattern`.
static CIRCUIT_BREAKERS: LazyLock<Mutex<HashMap<String, CircuitBreakerState>>> =
    LazyLock::new(|| Mutex::new(HashMap::new()));

/// Maximum number of recent outcomes to track for failure rate calculation.
const CIRCUIT_BREAKER_HISTORY_SIZE: usize = 10;
/// Failure rate threshold to open circuit (50% = 5 failures out of 10).
/// Kept for documentation only; the live check uses integer math
/// (`failure_count * 2 >= total_count`) to avoid floating-point comparison.
#[allow(dead_code)]
const CIRCUIT_BREAKER_FAILURE_THRESHOLD: f64 = 0.5;
/// Cooldown period before transitioning from `Open` to `HalfOpen` (60 seconds).
const CIRCUIT_BREAKER_COOLDOWN_SECS: u64 = 60;
89
/// Flag indicating a network error occurred during the last news fetch.
/// Set by `set_network_error`, consumed (read-and-clear) by `take_network_error`;
/// the UI polls it to show a toast message.
static NETWORK_ERROR_FLAG: std::sync::atomic::AtomicBool =
    std::sync::atomic::AtomicBool::new(false);
94
95/// What: Check and clear the network error flag.
96///
97/// Inputs: None
98///
99/// Output: `true` if a network error occurred since the last check, `false` otherwise.
100///
101/// Details:
102/// - Atomically loads and clears the flag.
103/// - Used by the UI to show a toast when news fetch had network issues.
104#[must_use]
105pub fn take_network_error() -> bool {
106 NETWORK_ERROR_FLAG.swap(false, std::sync::atomic::Ordering::SeqCst)
107}
108
109/// What: Set the network error flag.
110///
111/// Inputs: None
112///
113/// Output: None
114///
115/// Details:
116/// - Called when a network error occurs during news fetching.
117pub(super) fn set_network_error() {
118 NETWORK_ERROR_FLAG.store(true, std::sync::atomic::Ordering::SeqCst);
119}
120
121/// What: Retry a network operation with exponential backoff on failure.
122///
123/// Inputs:
124/// - `operation`: Async closure that returns a Result
125/// - `max_retries`: Maximum number of retry attempts
126///
127/// Output:
128/// - Result from the operation, or error if all retries fail
129///
130/// Details:
131/// - On failure, waits with exponential backoff: 1s, 2s, 4s...
132/// - Stops retrying after `max_retries` attempts
133pub(super) async fn retry_with_backoff<T, E, F, Fut>(
134 mut operation: F,
135 max_retries: usize,
136) -> std::result::Result<T, E>
137where
138 F: FnMut() -> Fut,
139 Fut: std::future::Future<Output = std::result::Result<T, E>>,
140{
141 let mut attempt = 0;
142 loop {
143 match operation().await {
144 Ok(result) => return Ok(result),
145 Err(e) => {
146 if attempt >= max_retries {
147 return Err(e);
148 }
149 attempt += 1;
150 let backoff_secs = 1u64 << (attempt - 1); // Exponential: 1, 2, 4, 8...
151 warn!(
152 attempt,
153 max_retries,
154 backoff_secs,
155 "network request failed, retrying with exponential backoff"
156 );
157 tokio::time::sleep(Duration::from_secs(backoff_secs)).await;
158 }
159 }
160 }
161}
162
163/// What: Apply rate limiting before making a network request.
164///
165/// Inputs: None
166///
167/// Output: None (async sleep if needed)
168///
169/// Details:
170/// - Ensures minimum delay between network requests to avoid overwhelming servers.
171/// - Thread-safe via mutex guarding the last request timestamp.
172pub(super) async fn rate_limit() {
173 let delay_needed = {
174 let mut last_request = match RATE_LIMITER.lock() {
175 Ok(guard) => guard,
176 Err(poisoned) => poisoned.into_inner(),
177 };
178 let elapsed = last_request.elapsed();
179 let min_delay = Duration::from_millis(RATE_LIMIT_DELAY_MS);
180 let delay = if elapsed < min_delay {
181 // Safe to unwrap because we checked elapsed < min_delay above
182 #[allow(clippy::unwrap_used)]
183 min_delay.checked_sub(elapsed).unwrap()
184 } else {
185 Duration::ZERO
186 };
187 *last_request = Instant::now();
188 delay
189 };
190 if !delay_needed.is_zero() {
191 tokio::time::sleep(delay_needed).await;
192 }
193}
194
/// Maximum jitter in milliseconds added to rate-limiting delays (prevents a
/// thundering herd when many clients retry at the same moment).
const JITTER_MAX_MS: u64 = 500;
197
198/// What: Apply rate limiting specifically for archlinux.org requests with exponential backoff.
199///
200/// Inputs: None
201///
202/// Output: `OwnedSemaphorePermit` that the caller MUST hold during the request.
203///
204/// # Panics
205/// - Panics if the archlinux.org request semaphore is closed (should never happen in practice).
206///
207/// Details:
208/// - Acquires a semaphore permit to serialize archlinux.org requests (only 1 at a time).
209/// - Uses longer base delay (2 seconds) for archlinux.org to reduce request frequency.
210/// - Implements exponential backoff: increases delay on consecutive failures (2s → 4s → 8s → 16s, max 60s).
211/// - Adds random jitter (0-500ms) to prevent thundering herd when multiple clients retry simultaneously.
212/// - Resets backoff after successful requests.
213/// - Thread-safe via mutex guarding the rate limiter state.
214/// - The returned permit MUST be held until the HTTP request completes to ensure serialization.
215/// - If the permit is dropped before the HTTP request completes, another request may start concurrently,
216/// defeating the serialization and potentially causing race conditions or overwhelming the server.
217pub async fn rate_limit_archlinux() -> tokio::sync::OwnedSemaphorePermit {
218 // 1. Acquire semaphore to serialize requests (waits if another request is in progress)
219 // This is the key change - ensures only one archlinux.org request at a time
220 let permit = ARCHLINUX_REQUEST_SEMAPHORE
221 .clone()
222 .acquire_owned()
223 .await
224 // Semaphore is never closed, so this cannot fail in practice
225 .expect("archlinux.org request semaphore should never be closed");
226
227 // 2. Now that we have exclusive access, compute and apply the rate limiting delay
228 let delay_needed = {
229 let mut limiter = match ARCHLINUX_RATE_LIMITER.lock() {
230 Ok(guard) => guard,
231 Err(poisoned) => poisoned.into_inner(),
232 };
233 let elapsed = limiter.last_request.elapsed();
234 let min_delay = Duration::from_millis(limiter.current_backoff_ms);
235 let delay = if elapsed < min_delay {
236 // Safe to unwrap because we checked elapsed < min_delay above
237 #[allow(clippy::unwrap_used)]
238 min_delay.checked_sub(elapsed).unwrap()
239 } else {
240 Duration::ZERO
241 };
242 limiter.last_request = Instant::now();
243 delay
244 };
245
246 if !delay_needed.is_zero() {
247 // Add random jitter to prevent thundering herd when multiple clients retry simultaneously
248 let jitter_ms = rand::rng().random_range(0..=JITTER_MAX_MS);
249 let delay_with_jitter = delay_needed + Duration::from_millis(jitter_ms);
250 // Safe to unwrap: delay_ms will be small (max 60s = 60000ms, well within u64)
251 #[allow(clippy::cast_possible_truncation)]
252 let delay_ms = delay_needed.as_millis() as u64;
253 debug!(
254 delay_ms,
255 jitter_ms,
256 total_ms = delay_with_jitter.as_millis(),
257 "rate limiting archlinux.org request with jitter"
258 );
259 tokio::time::sleep(delay_with_jitter).await;
260 }
261
262 // 3. Return the permit - caller MUST hold it during the request
263 permit
264}
265
/// What: Extract endpoint pattern from URL for circuit breaker tracking.
///
/// Inputs:
/// - `url`: Full URL to extract pattern from
///
/// Output:
/// - Endpoint pattern string (e.g., "/feeds/news/", "/packages/{repo}/{arch}/*/json/", "/news/*")
///
/// Details:
/// - Normalizes URLs to endpoint patterns for grouping similar requests.
/// - Replaces the specific package name with "*" for JSON endpoints (repo and
///   arch segments are kept).
/// - Feed paths are returned verbatim; individual news article paths collapse
///   to ".../news/*".
/// - URLs with no scheme or no path are returned unchanged.
#[must_use]
pub fn extract_endpoint_pattern(url: &str) -> String {
    // Locate the path component: skip "scheme://" then the host up to the first '/'.
    let scheme_end = match url.find("://") {
        Some(pos) => pos + 3,
        None => return url.to_string(),
    };
    let path_pos = match url[scheme_end..].find('/') {
        Some(pos) => pos,
        None => return url.to_string(),
    };
    let path = &url[scheme_end + path_pos..];

    // Normalize package-specific endpoints:
    // /packages/{repo}/{arch}/{name}/json/ -> /packages/{repo}/{arch}/*/json/
    if path.contains("/packages/") && path.contains("/json/") {
        if let Some(json_pos) = path.find("/json/") {
            let base = &path[..json_pos];
            if let Some(last_slash) = base.rfind('/') {
                // Exclusive slice: `base[..last_slash]` has no trailing '/',
                // so the joined pattern contains a single separator (the
                // previous inclusive slice produced a double slash, e.g.
                // "/packages/core/x86_64//*/json/").
                return format!("{}/*/json/", &base[..last_slash]);
            }
        }
    }
    // For feeds, use the full path
    if path.starts_with("/feeds/") {
        return path.to_string();
    }
    // For individual news articles, collapse to the ".../news/*" pattern
    if path.contains("/news/") && !path.ends_with('/') {
        if let Some(news_pos) = path.find("/news/") {
            // The slice already ends with '/', so append only "*"
            // (appending "/*" produced "/news//*").
            return format!("{}*", &path[..news_pos + "/news/".len()]);
        }
    }
    path.to_string()
}
309
310/// What: Check circuit breaker state before making a request.
311///
312/// Inputs:
313/// - `endpoint_pattern`: Endpoint pattern to check circuit breaker for
314///
315/// Output:
316/// - `Ok(())` if request should proceed, `Err` with cached error if circuit is open
317///
318/// # Errors
319/// - Returns `Err` if the circuit breaker is open and cooldown period has not expired.
320///
321/// Details:
322/// - Returns error immediately if circuit is Open and cooldown not expired.
323/// - Allows request if circuit is Closed or `HalfOpen`.
324/// - Automatically transitions `Open` → `HalfOpen` after cooldown period.
325#[allow(clippy::significant_drop_tightening)]
326pub fn check_circuit_breaker(endpoint_pattern: &str) -> Result<()> {
327 // MutexGuard must be held for entire function to modify breaker state
328 let mut breakers = match CIRCUIT_BREAKERS.lock() {
329 Ok(guard) => guard,
330 Err(poisoned) => poisoned.into_inner(),
331 };
332
333 let breaker = breakers
334 .entry(endpoint_pattern.to_string())
335 .or_insert_with(|| CircuitBreakerState {
336 state: CircuitState::Closed,
337 recent_outcomes: Vec::new(),
338 endpoint_pattern: endpoint_pattern.to_string(),
339 });
340
341 match &breaker.state {
342 CircuitState::Open { opened_at } => {
343 let elapsed = opened_at.elapsed();
344 if elapsed.as_secs() >= CIRCUIT_BREAKER_COOLDOWN_SECS {
345 // Transition to HalfOpen after cooldown
346 breaker.state = CircuitState::HalfOpen;
347 debug!(
348 endpoint_pattern,
349 "circuit breaker transitioning Open → HalfOpen after cooldown"
350 );
351 Ok(())
352 } else {
353 // Still in cooldown, block request
354 let remaining = CIRCUIT_BREAKER_COOLDOWN_SECS - elapsed.as_secs();
355 warn!(
356 endpoint_pattern,
357 remaining_secs = remaining,
358 "circuit breaker is Open, blocking request"
359 );
360 Err(format!(
361 "Circuit breaker is Open for {endpoint_pattern} (cooldown: {remaining}s remaining)"
362 )
363 .into())
364 }
365 }
366 CircuitState::HalfOpen | CircuitState::Closed => Ok(()),
367 }
368}
369
370/// What: Record request outcome in circuit breaker.
371///
372/// Inputs:
373/// - `endpoint_pattern`: Endpoint pattern for this request
374/// - `success`: `true` if request succeeded, `false` if it failed
375///
376/// Output: None
377///
378/// Details:
379/// - Records outcome in recent history (max 10 entries).
380/// - On success: resets failure count, moves to Closed.
381/// - On failure: increments failure count, opens circuit if >50% failure rate.
382#[allow(clippy::significant_drop_tightening)]
383pub fn record_circuit_breaker_outcome(endpoint_pattern: &str, success: bool) {
384 // MutexGuard must be held for entire function to modify breaker state
385 let mut breakers = match CIRCUIT_BREAKERS.lock() {
386 Ok(guard) => guard,
387 Err(poisoned) => poisoned.into_inner(),
388 };
389
390 let breaker = breakers
391 .entry(endpoint_pattern.to_string())
392 .or_insert_with(|| CircuitBreakerState {
393 state: CircuitState::Closed,
394 recent_outcomes: Vec::new(),
395 endpoint_pattern: endpoint_pattern.to_string(),
396 });
397
398 // Add outcome to history (keep last N)
399 breaker.recent_outcomes.push(success);
400 if breaker.recent_outcomes.len() > CIRCUIT_BREAKER_HISTORY_SIZE {
401 breaker.recent_outcomes.remove(0);
402 }
403
404 if success {
405 // On success, reset to Closed
406 breaker.state = CircuitState::Closed;
407 if !breaker.recent_outcomes.iter().all(|&x| x) {
408 debug!(
409 endpoint_pattern,
410 "circuit breaker: request succeeded, resetting to Closed"
411 );
412 }
413 } else {
414 // On failure, check if we should open circuit
415 let failure_count = breaker
416 .recent_outcomes
417 .iter()
418 .filter(|&&outcome| !outcome)
419 .count();
420 // Calculate failure rate using integer comparison to avoid precision loss
421 // Threshold is 0.5 (50%), so we check: failure_count * 2 >= total_count
422 let total_count = breaker.recent_outcomes.len();
423
424 if failure_count * 2 >= total_count && total_count >= CIRCUIT_BREAKER_HISTORY_SIZE {
425 // Open circuit
426 breaker.state = CircuitState::Open {
427 opened_at: Instant::now(),
428 };
429 warn!(
430 endpoint_pattern,
431 failure_count,
432 total = breaker.recent_outcomes.len(),
433 failure_percentage = (failure_count * 100) / total_count,
434 "circuit breaker opened due to high failure rate"
435 );
436 } else if matches!(breaker.state, CircuitState::HalfOpen) {
437 // HalfOpen test failed, go back to Open
438 breaker.state = CircuitState::Open {
439 opened_at: Instant::now(),
440 };
441 warn!(
442 endpoint_pattern,
443 "circuit breaker: HalfOpen test failed, reopening"
444 );
445 }
446 }
447}
448
/// What: Extract Retry-After value from error message string.
///
/// Inputs:
/// - `error_msg`: Error message that may contain Retry-After information
///
/// Output:
/// - `Some(seconds)` if Retry-After found in error message, `None` otherwise
///
/// Details:
/// - Parses format: "error message (Retry-After: Ns)" where N is seconds.
/// - The value is taken between the marker and the first following 's'.
#[must_use]
pub fn extract_retry_after_from_error(error_msg: &str) -> Option<u64> {
    const MARKER: &str = "Retry-After: ";
    let marker_pos = error_msg.find(MARKER)?;
    let tail = &error_msg[marker_pos + MARKER.len()..];
    // The seconds value ends at the first 's' after the marker.
    let digits_end = tail.find('s')?;
    tail[..digits_end].trim().parse().ok()
}
471
472/// What: Increase backoff delay for archlinux.org after a failure or rate limit.
473///
474/// Inputs:
475/// - `retry_after_seconds`: Optional Retry-After value from server (in seconds)
476///
477/// Output: None
478///
479/// Details:
480/// - If Retry-After is provided, uses that value (capped at maximum delay).
481/// - Otherwise, doubles the current backoff delay (exponential backoff).
482/// - Caps at maximum delay (60 seconds).
483/// - Increments consecutive failure counter.
484pub fn increase_archlinux_backoff(retry_after_seconds: Option<u64>) {
485 let mut limiter = match ARCHLINUX_RATE_LIMITER.lock() {
486 Ok(guard) => guard,
487 Err(poisoned) => poisoned.into_inner(),
488 };
489 limiter.consecutive_failures += 1;
490 // Use Retry-After value if provided, otherwise use exponential backoff
491 if let Some(retry_after) = retry_after_seconds {
492 // Convert seconds to milliseconds, cap at maximum
493 let retry_after_ms = (retry_after * 1000).min(ARCHLINUX_MAX_BACKOFF_MS);
494 limiter.current_backoff_ms = retry_after_ms;
495 warn!(
496 consecutive_failures = limiter.consecutive_failures,
497 retry_after_seconds = retry_after,
498 backoff_ms = limiter.current_backoff_ms,
499 "increased archlinux.org backoff delay using Retry-After header"
500 );
501 } else {
502 // Double the backoff delay, capped at maximum
503 limiter.current_backoff_ms = (limiter.current_backoff_ms * 2).min(ARCHLINUX_MAX_BACKOFF_MS);
504 warn!(
505 consecutive_failures = limiter.consecutive_failures,
506 backoff_ms = limiter.current_backoff_ms,
507 "increased archlinux.org backoff delay"
508 );
509 }
510}
511
512/// What: Reset backoff delay for archlinux.org after a successful request.
513///
514/// Inputs: None
515///
516/// Output: None
517///
518/// Details:
519/// - Resets backoff to base delay (2 seconds).
520/// - Resets consecutive failure counter.
521pub fn reset_archlinux_backoff() {
522 let mut limiter = match ARCHLINUX_RATE_LIMITER.lock() {
523 Ok(guard) => guard,
524 Err(poisoned) => poisoned.into_inner(),
525 };
526 if limiter.consecutive_failures > 0 {
527 debug!(
528 previous_failures = limiter.consecutive_failures,
529 previous_backoff_ms = limiter.current_backoff_ms,
530 "resetting archlinux.org backoff after successful request"
531 );
532 }
533 limiter.current_backoff_ms = ARCHLINUX_BASE_DELAY_MS;
534 limiter.consecutive_failures = 0;
535}