1use std::sync::Arc;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::time::{Duration, Instant};
9
10use serde::{Deserialize, Serialize};
11
/// Plain, serializable snapshot of the counters kept for one kind of operation.
///
/// Instances are produced by `AtomicOperationMetrics::snapshot`; the field
/// notes below describe the values that method fills in.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct OperationMetrics {
    /// Total number of recorded operations (successes plus failures).
    pub count: u64,
    /// Number of operations recorded as successful.
    pub success_count: u64,
    /// Number of operations recorded as failed.
    pub failure_count: u64,
    /// Sum of all recorded durations, in milliseconds.
    pub total_duration_ms: u64,
    /// Shortest recorded duration in milliseconds; `None` when no sample has been recorded.
    pub min_duration_ms: Option<u64>,
    /// Longest recorded duration in milliseconds; `None` when nothing has been recorded.
    pub max_duration_ms: Option<u64>,
    /// `total_duration_ms / count` as a float; `None` when `count` is zero.
    pub avg_duration_ms: Option<f64>,
}
30
31#[derive(Debug, Default)]
33pub struct AtomicOperationMetrics {
34 count: AtomicU64,
35 success_count: AtomicU64,
36 failure_count: AtomicU64,
37 total_duration_ms: AtomicU64,
38 min_duration_ms: AtomicU64,
39 max_duration_ms: AtomicU64,
40}
41
42impl AtomicOperationMetrics {
43 pub fn new() -> Self {
45 Self {
46 count: AtomicU64::new(0),
47 success_count: AtomicU64::new(0),
48 failure_count: AtomicU64::new(0),
49 total_duration_ms: AtomicU64::new(0),
50 min_duration_ms: AtomicU64::new(u64::MAX),
51 max_duration_ms: AtomicU64::new(0),
52 }
53 }
54
55 pub fn record_success(&self, duration: Duration) {
57 let ms = duration.as_millis() as u64;
58 self.count.fetch_add(1, Ordering::Relaxed);
59 self.success_count.fetch_add(1, Ordering::Relaxed);
60 self.total_duration_ms.fetch_add(ms, Ordering::Relaxed);
61 self.update_min_max(ms);
62 }
63
64 pub fn record_failure(&self, duration: Duration) {
66 let ms = duration.as_millis() as u64;
67 self.count.fetch_add(1, Ordering::Relaxed);
68 self.failure_count.fetch_add(1, Ordering::Relaxed);
69 self.total_duration_ms.fetch_add(ms, Ordering::Relaxed);
70 self.update_min_max(ms);
71 }
72
73 fn update_min_max(&self, ms: u64) {
74 let mut current = self.min_duration_ms.load(Ordering::Relaxed);
76 while ms < current {
77 match self.min_duration_ms.compare_exchange_weak(
78 current,
79 ms,
80 Ordering::Relaxed,
81 Ordering::Relaxed,
82 ) {
83 Ok(_) => break,
84 Err(c) => current = c,
85 }
86 }
87
88 let mut current = self.max_duration_ms.load(Ordering::Relaxed);
90 while ms > current {
91 match self.max_duration_ms.compare_exchange_weak(
92 current,
93 ms,
94 Ordering::Relaxed,
95 Ordering::Relaxed,
96 ) {
97 Ok(_) => break,
98 Err(c) => current = c,
99 }
100 }
101 }
102
103 pub fn snapshot(&self) -> OperationMetrics {
105 let count = self.count.load(Ordering::Relaxed);
106 let success_count = self.success_count.load(Ordering::Relaxed);
107 let failure_count = self.failure_count.load(Ordering::Relaxed);
108 let total_duration_ms = self.total_duration_ms.load(Ordering::Relaxed);
109 let min = self.min_duration_ms.load(Ordering::Relaxed);
110 let max = self.max_duration_ms.load(Ordering::Relaxed);
111
112 let min_duration_ms = if min == u64::MAX { None } else { Some(min) };
113 let max_duration_ms = if max == 0 && count == 0 {
114 None
115 } else {
116 Some(max)
117 };
118 let avg_duration_ms = if count > 0 {
119 Some(total_duration_ms as f64 / count as f64)
120 } else {
121 None
122 };
123
124 OperationMetrics {
125 count,
126 success_count,
127 failure_count,
128 total_duration_ms,
129 min_duration_ms,
130 max_duration_ms,
131 avg_duration_ms,
132 }
133 }
134
135 pub fn reset(&self) {
137 self.count.store(0, Ordering::Relaxed);
138 self.success_count.store(0, Ordering::Relaxed);
139 self.failure_count.store(0, Ordering::Relaxed);
140 self.total_duration_ms.store(0, Ordering::Relaxed);
141 self.min_duration_ms.store(u64::MAX, Ordering::Relaxed);
142 self.max_duration_ms.store(0, Ordering::Relaxed);
143 }
144}
145
146#[derive(Debug)]
148pub struct ConnectionMetrics {
149 connected_at: Option<Instant>,
151 pub connect: AtomicOperationMetrics,
153 pub disconnect: AtomicOperationMetrics,
155 pub reads: AtomicOperationMetrics,
157 pub writes: AtomicOperationMetrics,
159 pub reconnects: AtomicOperationMetrics,
161 bytes_read: AtomicU64,
163 bytes_written: AtomicU64,
165}
166
167impl Default for ConnectionMetrics {
168 fn default() -> Self {
169 Self::new()
170 }
171}
172
173impl ConnectionMetrics {
174 pub fn new() -> Self {
176 Self {
177 connected_at: None,
178 connect: AtomicOperationMetrics::new(),
179 disconnect: AtomicOperationMetrics::new(),
180 reads: AtomicOperationMetrics::new(),
181 writes: AtomicOperationMetrics::new(),
182 reconnects: AtomicOperationMetrics::new(),
183 bytes_read: AtomicU64::new(0),
184 bytes_written: AtomicU64::new(0),
185 }
186 }
187
188 pub fn shared() -> Arc<Self> {
190 Arc::new(Self::new())
191 }
192
193 pub fn mark_connected(&mut self) {
195 self.connected_at = Some(Instant::now());
196 }
197
198 pub fn uptime(&self) -> Option<Duration> {
200 self.connected_at.map(|t| t.elapsed())
201 }
202
203 pub fn record_bytes_read(&self, bytes: u64) {
205 self.bytes_read.fetch_add(bytes, Ordering::Relaxed);
206 }
207
208 pub fn record_bytes_written(&self, bytes: u64) {
210 self.bytes_written.fetch_add(bytes, Ordering::Relaxed);
211 }
212
213 pub fn total_bytes_read(&self) -> u64 {
215 self.bytes_read.load(Ordering::Relaxed)
216 }
217
218 pub fn total_bytes_written(&self) -> u64 {
220 self.bytes_written.load(Ordering::Relaxed)
221 }
222
223 pub fn summary(&self) -> ConnectionMetricsSummary {
225 ConnectionMetricsSummary {
226 uptime_ms: self.uptime().map(|d| d.as_millis() as u64),
227 connect: self.connect.snapshot(),
228 disconnect: self.disconnect.snapshot(),
229 reads: self.reads.snapshot(),
230 writes: self.writes.snapshot(),
231 reconnects: self.reconnects.snapshot(),
232 bytes_read: self.total_bytes_read(),
233 bytes_written: self.total_bytes_written(),
234 }
235 }
236
237 pub fn reset(&mut self) {
239 self.connected_at = None;
240 self.connect.reset();
241 self.disconnect.reset();
242 self.reads.reset();
243 self.writes.reset();
244 self.reconnects.reset();
245 self.bytes_read.store(0, Ordering::Relaxed);
246 self.bytes_written.store(0, Ordering::Relaxed);
247 }
248}
249
/// Plain, serializable summary of a `ConnectionMetrics`, as built by
/// `ConnectionMetrics::summary`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConnectionMetricsSummary {
    /// Milliseconds since the connection was marked as connected; `None` when
    /// no connection time is recorded.
    pub uptime_ms: Option<u64>,
    /// Snapshot of the `connect` operation metrics.
    pub connect: OperationMetrics,
    /// Snapshot of the `disconnect` operation metrics.
    pub disconnect: OperationMetrics,
    /// Snapshot of the `reads` operation metrics.
    pub reads: OperationMetrics,
    /// Snapshot of the `writes` operation metrics.
    pub writes: OperationMetrics,
    /// Snapshot of the `reconnects` operation metrics.
    pub reconnects: OperationMetrics,
    /// Total bytes read at the time the summary was taken.
    pub bytes_read: u64,
    /// Total bytes written at the time the summary was taken.
    pub bytes_written: u64,
}
270
#[cfg(test)]
mod tests {
    use super::*;

    /// Two successes bump the counters and set the min/max latency.
    #[test]
    fn test_operation_metrics_success() {
        let op = AtomicOperationMetrics::new();
        for &millis in &[100u64, 200] {
            op.record_success(Duration::from_millis(millis));
        }

        let view = op.snapshot();
        assert_eq!(view.count, 2);
        assert_eq!(view.success_count, 2);
        assert_eq!(view.failure_count, 0);
        assert_eq!(view.min_duration_ms, Some(100));
        assert_eq!(view.max_duration_ms, Some(200));
    }

    /// A single failure is counted as a failure, not as a success.
    #[test]
    fn test_operation_metrics_failure() {
        let op = AtomicOperationMetrics::new();
        let elapsed = Duration::from_millis(50);
        op.record_failure(elapsed);

        let view = op.snapshot();
        assert_eq!(view.count, 1);
        assert_eq!(view.success_count, 0);
        assert_eq!(view.failure_count, 1);
    }

    /// `reset` wipes previously recorded operations.
    #[test]
    fn test_operation_metrics_reset() {
        let op = AtomicOperationMetrics::new();
        op.record_success(Duration::from_millis(100));
        op.reset();

        let view = op.snapshot();
        assert_eq!(view.count, 0);
        assert_eq!(view.success_count, 0);
    }

    /// Uptime, read metrics and byte totals all surface in the summary.
    #[test]
    fn test_connection_metrics() {
        let mut conn = ConnectionMetrics::new();
        conn.mark_connected();

        let read_latency = Duration::from_millis(10);
        conn.reads.record_success(read_latency);
        conn.record_bytes_read(100);

        assert!(conn.uptime().is_some());
        assert_eq!(conn.total_bytes_read(), 100);

        let report = conn.summary();
        assert_eq!(report.reads.count, 1);
        assert_eq!(report.bytes_read, 100);
    }
}