1use std::collections::HashMap;
7use std::sync::Arc;
8use std::time::Duration;
9
10use futures::future::join_all;
11use tokio::sync::RwLock;
12use tokio::time::interval;
13use tokio_util::sync::CancellationToken;
14use tracing::{debug, info, warn};
15
16use aranet_types::{CurrentReading, DeviceInfo, DeviceType};
17
18use crate::device::Device;
19use crate::error::{Error, Result};
20use crate::events::{DeviceEvent, DeviceId, DisconnectReason, EventDispatcher};
21use crate::reconnect::ReconnectOptions;
22use crate::scan::{DiscoveredDevice, ScanOptions, scan_with_options};
23
24#[derive(Debug)]
26pub struct ManagedDevice {
27 pub id: String,
29 pub name: Option<String>,
31 pub device_type: Option<DeviceType>,
33 device: Option<Arc<Device>>,
36 pub auto_reconnect: bool,
38 pub last_reading: Option<CurrentReading>,
40 pub info: Option<DeviceInfo>,
42 pub reconnect_options: ReconnectOptions,
44}
45
46impl ManagedDevice {
47 pub fn new(id: &str) -> Self {
49 Self {
50 id: id.to_string(),
51 name: None,
52 device_type: None,
53 device: None,
54 auto_reconnect: true,
55 last_reading: None,
56 info: None,
57 reconnect_options: ReconnectOptions::default(),
58 }
59 }
60
61 pub fn with_reconnect_options(id: &str, options: ReconnectOptions) -> Self {
63 Self {
64 reconnect_options: options,
65 ..Self::new(id)
66 }
67 }
68
69 pub fn has_device(&self) -> bool {
71 self.device.is_some()
72 }
73
74 pub async fn is_connected(&self) -> bool {
76 if let Some(device) = &self.device {
77 device.is_connected().await
78 } else {
79 false
80 }
81 }
82
83 pub fn device(&self) -> Option<&Arc<Device>> {
85 self.device.as_ref()
86 }
87
88 pub fn device_arc(&self) -> Option<Arc<Device>> {
90 self.device.clone()
91 }
92}
93
94#[derive(Debug, Clone)]
96pub struct ManagerConfig {
97 pub scan_options: ScanOptions,
99 pub default_reconnect_options: ReconnectOptions,
101 pub event_capacity: usize,
103 pub health_check_interval: Duration,
105}
106
107impl Default for ManagerConfig {
108 fn default() -> Self {
109 Self {
110 scan_options: ScanOptions::default(),
111 default_reconnect_options: ReconnectOptions::default(),
112 event_capacity: 100,
113 health_check_interval: Duration::from_secs(30),
114 }
115 }
116}
117
118pub struct DeviceManager {
120 devices: RwLock<HashMap<String, ManagedDevice>>,
122 events: EventDispatcher,
124 config: ManagerConfig,
126}
127
128impl DeviceManager {
129 pub fn new() -> Self {
131 Self::with_config(ManagerConfig::default())
132 }
133
134 pub fn with_event_capacity(capacity: usize) -> Self {
136 Self::with_config(ManagerConfig {
137 event_capacity: capacity,
138 ..Default::default()
139 })
140 }
141
142 pub fn with_config(config: ManagerConfig) -> Self {
144 Self {
145 devices: RwLock::new(HashMap::new()),
146 events: EventDispatcher::new(config.event_capacity),
147 config,
148 }
149 }
150
151 pub fn events(&self) -> &EventDispatcher {
153 &self.events
154 }
155
156 pub fn config(&self) -> &ManagerConfig {
158 &self.config
159 }
160
161 pub async fn scan(&self) -> Result<Vec<DiscoveredDevice>> {
163 scan_with_options(self.config.scan_options.clone()).await
164 }
165
166 pub async fn scan_with_options(&self, options: ScanOptions) -> Result<Vec<DiscoveredDevice>> {
168 let devices = scan_with_options(options).await?;
169
170 for device in &devices {
172 self.events.send(DeviceEvent::Discovered {
173 device: DeviceId {
174 id: device.identifier.clone(),
175 name: device.name.clone(),
176 device_type: device.device_type,
177 },
178 rssi: device.rssi,
179 });
180 }
181
182 Ok(devices)
183 }
184
185 pub async fn add_device(&self, identifier: &str) -> Result<()> {
187 self.add_device_with_options(identifier, self.config.default_reconnect_options.clone())
188 .await
189 }
190
191 pub async fn add_device_with_options(
193 &self,
194 identifier: &str,
195 reconnect_options: ReconnectOptions,
196 ) -> Result<()> {
197 let mut devices = self.devices.write().await;
198
199 if devices.contains_key(identifier) {
200 return Ok(()); }
202
203 let managed = ManagedDevice::with_reconnect_options(identifier, reconnect_options);
204 devices.insert(identifier.to_string(), managed);
205
206 info!("Added device to manager: {}", identifier);
207 Ok(())
208 }
209
210 pub async fn connect(&self, identifier: &str) -> Result<()> {
220 let reconnect_options = {
222 let mut devices = self.devices.write().await;
223
224 let managed = devices.entry(identifier.to_string()).or_insert_with(|| {
226 info!("Adding device to manager: {}", identifier);
227 ManagedDevice::with_reconnect_options(
228 identifier,
229 self.config.default_reconnect_options.clone(),
230 )
231 });
232
233 if managed.device.is_some() {
235 debug!("Device {} already has a connection handle", identifier);
236 return Ok(());
237 }
238
239 managed.reconnect_options.clone()
241 };
242 let _ = reconnect_options;
247 let device = Arc::new(Device::connect(identifier).await?);
248 let info = device.read_device_info().await.ok();
249 let device_type = device.device_type();
250 let name = device.name().map(|s| s.to_string());
251
252 {
254 let mut devices = self.devices.write().await;
255 if let Some(managed) = devices.get_mut(identifier) {
256 if managed.device.is_some() {
258 debug!(
260 "Another task connected {} while we were connecting, discarding our connection",
261 identifier
262 );
263 drop(devices); let _ = device.disconnect().await;
265 return Ok(());
266 }
267
268 managed.device = Some(device);
269 managed.info = info.clone();
270 managed.device_type = device_type;
271 managed.name = name.clone();
272 } else {
273 let mut managed = ManagedDevice::new(identifier);
275 managed.device = Some(device);
276 managed.info = info.clone();
277 managed.device_type = device_type;
278 managed.name = name.clone();
279 devices.insert(identifier.to_string(), managed);
280 }
281 }
282
283 self.events.send(DeviceEvent::Connected {
285 device: DeviceId {
286 id: identifier.to_string(),
287 name,
288 device_type,
289 },
290 info,
291 });
292
293 info!("Connected to device: {}", identifier);
294 Ok(())
295 }
296
297 pub async fn disconnect(&self, identifier: &str) -> Result<()> {
299 let device_arc = {
300 let mut devices = self.devices.write().await;
301 if let Some(managed) = devices.get_mut(identifier) {
302 managed.device.take()
303 } else {
304 None
305 }
306 };
307
308 if let Some(device) = device_arc {
310 device.disconnect().await?;
311 self.events.send(DeviceEvent::Disconnected {
312 device: DeviceId::new(identifier),
313 reason: DisconnectReason::UserRequested,
314 });
315 }
316
317 Ok(())
318 }
319
320 pub async fn remove_device(&self, identifier: &str) -> Result<()> {
322 self.disconnect(identifier).await?;
323 self.devices.write().await.remove(identifier);
324 info!("Removed device from manager: {}", identifier);
325 Ok(())
326 }
327
328 pub async fn device_ids(&self) -> Vec<String> {
330 self.devices.read().await.keys().cloned().collect()
331 }
332
333 pub async fn device_count(&self) -> usize {
335 self.devices.read().await.len()
336 }
337
338 pub async fn connected_count(&self) -> usize {
344 let devices = self.devices.read().await;
345 devices.values().filter(|m| m.has_device()).count()
346 }
347
348 pub async fn connected_count_verified(&self) -> usize {
353 let device_arcs: Vec<Arc<Device>> = {
355 let devices = self.devices.read().await;
356 devices.values().filter_map(|m| m.device_arc()).collect()
357 };
358 let futures = device_arcs.iter().map(|d| d.is_connected());
362 let results = join_all(futures).await;
363
364 results.into_iter().filter(|&connected| connected).count()
365 }
366
367 pub async fn read_current(&self, identifier: &str) -> Result<CurrentReading> {
369 let device = {
371 let devices = self.devices.read().await;
372 let managed = devices
373 .get(identifier)
374 .ok_or_else(|| Error::device_not_found(identifier))?;
375 managed.device_arc().ok_or(Error::NotConnected)?
376 };
377 let reading = device.read_current().await?;
380
381 self.events.send(DeviceEvent::Reading {
383 device: DeviceId::new(identifier),
384 reading,
385 });
386
387 {
389 let mut devices = self.devices.write().await;
390 if let Some(managed) = devices.get_mut(identifier) {
391 managed.last_reading = Some(reading);
392 }
393 }
394
395 Ok(reading)
396 }
397
398 pub async fn read_all(&self) -> HashMap<String, Result<CurrentReading>> {
404 let devices_to_read: Vec<(String, Arc<Device>)> = {
406 let devices = self.devices.read().await;
407 devices
408 .iter()
409 .filter_map(|(id, managed)| managed.device_arc().map(|d| (id.clone(), d)))
410 .collect()
411 };
412 let read_futures = devices_to_read.iter().map(|(id, device)| {
416 let id = id.clone();
417 let device = Arc::clone(device);
418 async move {
419 let result = device.read_current().await;
420 (id, result)
421 }
422 });
423
424 let read_results: Vec<(String, Result<CurrentReading>)> = join_all(read_futures).await;
425
426 for (id, result) in &read_results {
428 if let Ok(reading) = result {
429 self.events.send(DeviceEvent::Reading {
430 device: DeviceId::new(id),
431 reading: *reading,
432 });
433 }
434 }
435
436 {
438 let mut devices = self.devices.write().await;
439 for (id, result) in &read_results {
440 if let Ok(reading) = result
441 && let Some(managed) = devices.get_mut(id)
442 {
443 managed.last_reading = Some(*reading);
444 }
445 }
446 }
447
448 read_results.into_iter().collect()
449 }
450
451 pub async fn connect_all(&self) -> HashMap<String, Result<()>> {
455 let ids: Vec<_> = self.devices.read().await.keys().cloned().collect();
456
457 let connect_futures = ids.iter().map(|id| {
460 let id = id.clone();
461 async move {
462 let result = self.connect(&id).await;
463 (id, result)
464 }
465 });
466
467 join_all(connect_futures).await.into_iter().collect()
468 }
469
470 pub async fn disconnect_all(&self) -> HashMap<String, Result<()>> {
474 let devices_to_disconnect: Vec<(String, Arc<Device>)> = {
476 let mut devices = self.devices.write().await;
477 devices
478 .iter_mut()
479 .filter_map(|(id, managed)| managed.device.take().map(|d| (id.clone(), d)))
480 .collect()
481 };
482
483 let disconnect_futures = devices_to_disconnect.iter().map(|(id, device)| {
485 let id = id.clone();
486 let device = Arc::clone(device);
487 async move {
488 let result = device.disconnect().await;
489 (id, result)
490 }
491 });
492
493 let results: Vec<(String, Result<()>)> = join_all(disconnect_futures).await;
494
495 for (id, result) in &results {
497 if result.is_ok() {
498 self.events.send(DeviceEvent::Disconnected {
499 device: DeviceId::new(id),
500 reason: DisconnectReason::UserRequested,
501 });
502 }
503 }
504
505 results.into_iter().collect()
506 }
507
508 pub fn try_is_connected(&self, identifier: &str) -> Option<bool> {
518 match self.devices.try_read() {
520 Ok(devices) => Some(
521 devices
522 .get(identifier)
523 .map(|m| m.has_device())
524 .unwrap_or(false),
525 ),
526 Err(_) => None, }
528 }
529
530 pub async fn is_connected(&self, identifier: &str) -> bool {
534 let device = {
535 let devices = self.devices.read().await;
536 devices.get(identifier).and_then(|m| m.device_arc())
537 };
538
539 if let Some(device) = device {
540 device.is_connected().await
541 } else {
542 false
543 }
544 }
545
546 pub async fn get_device_info(&self, identifier: &str) -> Option<DeviceInfo> {
548 let devices = self.devices.read().await;
549 devices.get(identifier).and_then(|m| m.info.clone())
550 }
551
552 pub async fn get_last_reading(&self, identifier: &str) -> Option<CurrentReading> {
554 let devices = self.devices.read().await;
555 devices.get(identifier).and_then(|m| m.last_reading)
556 }
557
558 pub fn start_health_monitor(
579 self: &Arc<Self>,
580 cancel_token: CancellationToken,
581 ) -> tokio::task::JoinHandle<()> {
582 let manager = Arc::clone(self);
583 let interval_duration = manager.config.health_check_interval;
584
585 tokio::spawn(async move {
586 let mut check_interval = interval(interval_duration);
587
588 loop {
589 tokio::select! {
590 _ = cancel_token.cancelled() => {
591 info!("Health monitor cancelled, shutting down");
592 break;
593 }
594 _ = check_interval.tick() => {
595 let devices_to_check: Vec<(String, Option<Arc<Device>>, bool, ReconnectOptions)> = {
597 let devices = manager.devices.read().await;
598 devices
599 .iter()
600 .map(|(id, m)| {
601 (
602 id.clone(),
603 m.device_arc(),
604 m.auto_reconnect,
605 m.reconnect_options.clone(),
606 )
607 })
608 .collect()
609 };
610
611 for (id, device_opt, auto_reconnect, _options) in devices_to_check {
612 let should_reconnect = match device_opt {
613 Some(device) => !device.is_connected().await,
614 None => true,
615 };
616
617 if should_reconnect && auto_reconnect {
618 debug!("Health monitor: attempting reconnect for {}", id);
619 if let Err(e) = manager.connect(&id).await {
620 warn!("Health monitor: reconnect failed for {}: {}", id, e);
621 }
622 }
623 }
624 }
625 }
626 }
627 })
628 }
629}
630
631impl Default for DeviceManager {
632 fn default() -> Self {
633 Self::new()
634 }
635}
636
#[cfg(test)]
mod tests {
    use super::*;

    /// Adding a device registers it under its identifier.
    #[tokio::test]
    async fn test_manager_add_device() {
        let mgr = DeviceManager::new();
        mgr.add_device("test-device").await.unwrap();

        assert_eq!(mgr.device_count().await, 1);

        let ids = mgr.device_ids().await;
        assert!(ids.iter().any(|id| id == "test-device"));
    }

    /// Removing a registered device leaves the manager empty.
    #[tokio::test]
    async fn test_manager_remove_device() {
        let mgr = DeviceManager::new();
        mgr.add_device("test-device").await.unwrap();
        mgr.remove_device("test-device").await.unwrap();

        let remaining = mgr.device_count().await;
        assert_eq!(remaining, 0);
    }

    /// A freshly added device has no connection.
    #[tokio::test]
    async fn test_manager_not_connected_by_default() {
        let mgr = DeviceManager::new();
        mgr.add_device("test-device").await.unwrap();

        let connected = mgr.is_connected("test-device").await;
        assert!(!connected);
        assert_eq!(mgr.connected_count().await, 0);
    }

    /// Subscribing to the dispatcher registers exactly one receiver.
    #[tokio::test]
    async fn test_manager_events() {
        let mgr = DeviceManager::new();
        // Keep the receiver alive so it is still counted below.
        let _subscription = mgr.events().subscribe();

        mgr.add_device("test-device").await.unwrap();

        let receivers = mgr.events().receiver_count();
        assert_eq!(receivers, 1);
    }
}