aranet_core/
manager.rs

1//! Multi-device management.
2//!
3//! This module provides a manager for handling multiple Aranet devices
4//! simultaneously, with connection pooling and concurrent operations.
5
6use std::collections::HashMap;
7use std::sync::Arc;
8use std::time::Duration;
9
10use futures::future::join_all;
11use tokio::sync::RwLock;
12use tokio::time::interval;
13use tokio_util::sync::CancellationToken;
14use tracing::{debug, info, warn};
15
16use aranet_types::{CurrentReading, DeviceInfo, DeviceType};
17
18use crate::device::Device;
19use crate::error::{Error, Result};
20use crate::events::{DeviceEvent, DeviceId, DisconnectReason, EventDispatcher};
21use crate::reconnect::ReconnectOptions;
22use crate::scan::{DiscoveredDevice, ScanOptions, scan_with_options};
23
24/// Information about a managed device.
25#[derive(Debug)]
26pub struct ManagedDevice {
27    /// Device identifier.
28    pub id: String,
29    /// Device name.
30    pub name: Option<String>,
31    /// Device type.
32    pub device_type: Option<DeviceType>,
33    /// The connected device (if connected).
34    /// Wrapped in Arc to allow concurrent access without holding the manager lock.
35    device: Option<Arc<Device>>,
36    /// Whether auto-reconnect is enabled.
37    pub auto_reconnect: bool,
38    /// Last known reading.
39    pub last_reading: Option<CurrentReading>,
40    /// Device info.
41    pub info: Option<DeviceInfo>,
42    /// Reconnection options (if auto-reconnect is enabled).
43    pub reconnect_options: ReconnectOptions,
44}
45
46impl ManagedDevice {
47    /// Create a new managed device entry.
48    pub fn new(id: &str) -> Self {
49        Self {
50            id: id.to_string(),
51            name: None,
52            device_type: None,
53            device: None,
54            auto_reconnect: true,
55            last_reading: None,
56            info: None,
57            reconnect_options: ReconnectOptions::default(),
58        }
59    }
60
61    /// Create a managed device with custom reconnect options.
62    pub fn with_reconnect_options(id: &str, options: ReconnectOptions) -> Self {
63        Self {
64            reconnect_options: options,
65            ..Self::new(id)
66        }
67    }
68
69    /// Check if the device is connected (sync check, doesn't query BLE).
70    pub fn has_device(&self) -> bool {
71        self.device.is_some()
72    }
73
74    /// Check if the device is connected (async, queries BLE).
75    pub async fn is_connected(&self) -> bool {
76        if let Some(device) = &self.device {
77            device.is_connected().await
78        } else {
79            false
80        }
81    }
82
83    /// Get a reference to the underlying device.
84    pub fn device(&self) -> Option<&Arc<Device>> {
85        self.device.as_ref()
86    }
87
88    /// Get a clone of the device Arc.
89    pub fn device_arc(&self) -> Option<Arc<Device>> {
90        self.device.clone()
91    }
92}
93
94/// Configuration for the device manager.
95#[derive(Debug, Clone)]
96pub struct ManagerConfig {
97    /// Default scan options.
98    pub scan_options: ScanOptions,
99    /// Default reconnect options for new devices.
100    pub default_reconnect_options: ReconnectOptions,
101    /// Event channel capacity.
102    pub event_capacity: usize,
103    /// Health check interval for auto-reconnect.
104    pub health_check_interval: Duration,
105}
106
107impl Default for ManagerConfig {
108    fn default() -> Self {
109        Self {
110            scan_options: ScanOptions::default(),
111            default_reconnect_options: ReconnectOptions::default(),
112            event_capacity: 100,
113            health_check_interval: Duration::from_secs(30),
114        }
115    }
116}
117
118/// Manager for multiple Aranet devices.
119pub struct DeviceManager {
120    /// Map of device ID to managed device.
121    devices: RwLock<HashMap<String, ManagedDevice>>,
122    /// Event dispatcher.
123    events: EventDispatcher,
124    /// Manager configuration.
125    config: ManagerConfig,
126}
127
128impl DeviceManager {
129    /// Create a new device manager.
130    pub fn new() -> Self {
131        Self::with_config(ManagerConfig::default())
132    }
133
134    /// Create a manager with custom event capacity.
135    pub fn with_event_capacity(capacity: usize) -> Self {
136        Self::with_config(ManagerConfig {
137            event_capacity: capacity,
138            ..Default::default()
139        })
140    }
141
142    /// Create a manager with full configuration.
143    pub fn with_config(config: ManagerConfig) -> Self {
144        Self {
145            devices: RwLock::new(HashMap::new()),
146            events: EventDispatcher::new(config.event_capacity),
147            config,
148        }
149    }
150
151    /// Get the event dispatcher for subscribing to events.
152    pub fn events(&self) -> &EventDispatcher {
153        &self.events
154    }
155
156    /// Get the manager configuration.
157    pub fn config(&self) -> &ManagerConfig {
158        &self.config
159    }
160
161    /// Scan for available devices.
162    pub async fn scan(&self) -> Result<Vec<DiscoveredDevice>> {
163        scan_with_options(self.config.scan_options.clone()).await
164    }
165
166    /// Scan with custom options.
167    pub async fn scan_with_options(&self, options: ScanOptions) -> Result<Vec<DiscoveredDevice>> {
168        let devices = scan_with_options(options).await?;
169
170        // Emit discovery events
171        for device in &devices {
172            self.events.send(DeviceEvent::Discovered {
173                device: DeviceId {
174                    id: device.identifier.clone(),
175                    name: device.name.clone(),
176                    device_type: device.device_type,
177                },
178                rssi: device.rssi,
179            });
180        }
181
182        Ok(devices)
183    }
184
185    /// Add a device to the manager by identifier.
186    pub async fn add_device(&self, identifier: &str) -> Result<()> {
187        self.add_device_with_options(identifier, self.config.default_reconnect_options.clone())
188            .await
189    }
190
191    /// Add a device with custom reconnect options.
192    pub async fn add_device_with_options(
193        &self,
194        identifier: &str,
195        reconnect_options: ReconnectOptions,
196    ) -> Result<()> {
197        let mut devices = self.devices.write().await;
198
199        if devices.contains_key(identifier) {
200            return Ok(()); // Already exists
201        }
202
203        let managed = ManagedDevice::with_reconnect_options(identifier, reconnect_options);
204        devices.insert(identifier.to_string(), managed);
205
206        info!("Added device to manager: {}", identifier);
207        Ok(())
208    }
209
210    /// Connect to a device.
211    ///
212    /// This method performs an atomic connect-or-skip operation:
213    /// - If the device doesn't exist, it's added and connected
214    /// - If the device exists but is not connected, it's connected
215    /// - If the device is already connected, this is a no-op
216    ///
217    /// The lock is held during the device entry update to prevent race conditions,
218    /// but released during the actual BLE connection to avoid blocking other operations.
219    pub async fn connect(&self, identifier: &str) -> Result<()> {
220        // Check if we need to connect (atomically check and mark as pending)
221        let reconnect_options = {
222            let mut devices = self.devices.write().await;
223
224            // Get or create the managed device entry
225            let managed = devices.entry(identifier.to_string()).or_insert_with(|| {
226                info!("Adding device to manager: {}", identifier);
227                ManagedDevice::with_reconnect_options(
228                    identifier,
229                    self.config.default_reconnect_options.clone(),
230                )
231            });
232
233            // If already connected, nothing to do
234            if managed.device.is_some() {
235                debug!("Device {} already has a connection handle", identifier);
236                return Ok(());
237            }
238
239            // Clone the reconnect options for use after releasing lock
240            managed.reconnect_options.clone()
241        };
242        // Lock is released here - other tasks can now access the device map
243
244        // Perform BLE connection (this may take time)
245        // Use the cloned reconnect_options if needed in the future
246        let _ = reconnect_options;
247        let device = Arc::new(Device::connect(identifier).await?);
248        let info = device.read_device_info().await.ok();
249        let device_type = device.device_type();
250        let name = device.name().map(|s| s.to_string());
251
252        // Update the managed device atomically
253        {
254            let mut devices = self.devices.write().await;
255            if let Some(managed) = devices.get_mut(identifier) {
256                // Check if another task connected while we were connecting
257                if managed.device.is_some() {
258                    // Another task beat us to it - disconnect our connection
259                    debug!(
260                        "Another task connected {} while we were connecting, discarding our connection",
261                        identifier
262                    );
263                    drop(devices); // Release lock before async disconnect
264                    let _ = device.disconnect().await;
265                    return Ok(());
266                }
267
268                managed.device = Some(device);
269                managed.info = info.clone();
270                managed.device_type = device_type;
271                managed.name = name.clone();
272            } else {
273                // Device was removed while we were connecting - still connect but add it back
274                let mut managed = ManagedDevice::new(identifier);
275                managed.device = Some(device);
276                managed.info = info.clone();
277                managed.device_type = device_type;
278                managed.name = name.clone();
279                devices.insert(identifier.to_string(), managed);
280            }
281        }
282
283        // Emit event
284        self.events.send(DeviceEvent::Connected {
285            device: DeviceId {
286                id: identifier.to_string(),
287                name,
288                device_type,
289            },
290            info,
291        });
292
293        info!("Connected to device: {}", identifier);
294        Ok(())
295    }
296
297    /// Disconnect from a device.
298    pub async fn disconnect(&self, identifier: &str) -> Result<()> {
299        let device_arc = {
300            let mut devices = self.devices.write().await;
301            if let Some(managed) = devices.get_mut(identifier) {
302                managed.device.take()
303            } else {
304                None
305            }
306        };
307
308        // Disconnect outside the lock
309        if let Some(device) = device_arc {
310            device.disconnect().await?;
311            self.events.send(DeviceEvent::Disconnected {
312                device: DeviceId::new(identifier),
313                reason: DisconnectReason::UserRequested,
314            });
315        }
316
317        Ok(())
318    }
319
320    /// Remove a device from the manager.
321    pub async fn remove_device(&self, identifier: &str) -> Result<()> {
322        self.disconnect(identifier).await?;
323        self.devices.write().await.remove(identifier);
324        info!("Removed device from manager: {}", identifier);
325        Ok(())
326    }
327
328    /// Get a list of all managed device IDs.
329    pub async fn device_ids(&self) -> Vec<String> {
330        self.devices.read().await.keys().cloned().collect()
331    }
332
333    /// Get the number of managed devices.
334    pub async fn device_count(&self) -> usize {
335        self.devices.read().await.len()
336    }
337
338    /// Get the number of connected devices (fast, doesn't query BLE).
339    ///
340    /// This returns the number of devices that have an active device handle,
341    /// without querying the BLE stack. Use `connected_count_verified` for
342    /// an accurate count that queries each device.
343    pub async fn connected_count(&self) -> usize {
344        let devices = self.devices.read().await;
345        devices.values().filter(|m| m.has_device()).count()
346    }
347
348    /// Get the number of connected devices (verified via BLE).
349    ///
350    /// This method queries each device to verify its connection status.
351    /// The lock is released before making BLE calls to avoid contention.
352    pub async fn connected_count_verified(&self) -> usize {
353        // Collect device handles while holding the lock briefly
354        let device_arcs: Vec<Arc<Device>> = {
355            let devices = self.devices.read().await;
356            devices.values().filter_map(|m| m.device_arc()).collect()
357        };
358        // Lock is released here
359
360        // Check connection status in parallel
361        let futures = device_arcs.iter().map(|d| d.is_connected());
362        let results = join_all(futures).await;
363
364        results.into_iter().filter(|&connected| connected).count()
365    }
366
367    /// Read current values from a specific device.
368    pub async fn read_current(&self, identifier: &str) -> Result<CurrentReading> {
369        // Get device Arc while holding the lock briefly
370        let device = {
371            let devices = self.devices.read().await;
372            let managed = devices
373                .get(identifier)
374                .ok_or_else(|| Error::device_not_found(identifier))?;
375            managed.device_arc().ok_or(Error::NotConnected)?
376        };
377        // Lock is released here
378
379        let reading = device.read_current().await?;
380
381        // Emit reading event
382        self.events.send(DeviceEvent::Reading {
383            device: DeviceId::new(identifier),
384            reading,
385        });
386
387        // Update cached reading
388        {
389            let mut devices = self.devices.write().await;
390            if let Some(managed) = devices.get_mut(identifier) {
391                managed.last_reading = Some(reading);
392            }
393        }
394
395        Ok(reading)
396    }
397
398    /// Read current values from all connected devices (in parallel).
399    ///
400    /// This method releases the lock before performing async BLE operations,
401    /// allowing other tasks to add/remove devices while reads are in progress.
402    /// All reads are performed in parallel for maximum performance.
403    pub async fn read_all(&self) -> HashMap<String, Result<CurrentReading>> {
404        // Collect device handles while holding the lock briefly
405        let devices_to_read: Vec<(String, Arc<Device>)> = {
406            let devices = self.devices.read().await;
407            devices
408                .iter()
409                .filter_map(|(id, managed)| managed.device_arc().map(|d| (id.clone(), d)))
410                .collect()
411        };
412        // Lock is released here
413
414        // Perform all reads in parallel
415        let read_futures = devices_to_read.iter().map(|(id, device)| {
416            let id = id.clone();
417            let device = Arc::clone(device);
418            async move {
419                let result = device.read_current().await;
420                (id, result)
421            }
422        });
423
424        let read_results: Vec<(String, Result<CurrentReading>)> = join_all(read_futures).await;
425
426        // Emit events and update cache
427        for (id, result) in &read_results {
428            if let Ok(reading) = result {
429                self.events.send(DeviceEvent::Reading {
430                    device: DeviceId::new(id),
431                    reading: *reading,
432                });
433            }
434        }
435
436        // Update cached readings
437        {
438            let mut devices = self.devices.write().await;
439            for (id, result) in &read_results {
440                if let Ok(reading) = result
441                    && let Some(managed) = devices.get_mut(id)
442                {
443                    managed.last_reading = Some(*reading);
444                }
445            }
446        }
447
448        read_results.into_iter().collect()
449    }
450
451    /// Connect to all known devices (in parallel).
452    ///
453    /// Returns a map of device IDs to connection results.
454    pub async fn connect_all(&self) -> HashMap<String, Result<()>> {
455        let ids: Vec<_> = self.devices.read().await.keys().cloned().collect();
456
457        // Note: We can't fully parallelize connect because it modifies state,
458        // but we can at least attempt connections concurrently
459        let connect_futures = ids.iter().map(|id| {
460            let id = id.clone();
461            async move {
462                let result = self.connect(&id).await;
463                (id, result)
464            }
465        });
466
467        join_all(connect_futures).await.into_iter().collect()
468    }
469
470    /// Disconnect from all devices (in parallel).
471    ///
472    /// Returns a map of device IDs to disconnection results.
473    pub async fn disconnect_all(&self) -> HashMap<String, Result<()>> {
474        // Collect all device arcs first
475        let devices_to_disconnect: Vec<(String, Arc<Device>)> = {
476            let mut devices = self.devices.write().await;
477            devices
478                .iter_mut()
479                .filter_map(|(id, managed)| managed.device.take().map(|d| (id.clone(), d)))
480                .collect()
481        };
482
483        // Disconnect all in parallel
484        let disconnect_futures = devices_to_disconnect.iter().map(|(id, device)| {
485            let id = id.clone();
486            let device = Arc::clone(device);
487            async move {
488                let result = device.disconnect().await;
489                (id, result)
490            }
491        });
492
493        let results: Vec<(String, Result<()>)> = join_all(disconnect_futures).await;
494
495        // Emit disconnection events
496        for (id, result) in &results {
497            if result.is_ok() {
498                self.events.send(DeviceEvent::Disconnected {
499                    device: DeviceId::new(id),
500                    reason: DisconnectReason::UserRequested,
501                });
502            }
503        }
504
505        results.into_iter().collect()
506    }
507
508    /// Check if a specific device is connected (fast, doesn't query BLE).
509    ///
510    /// This method attempts to check if a device has an active connection handle
511    /// without blocking. Returns `None` if the lock couldn't be acquired immediately,
512    /// or `Some(bool)` indicating whether the device has a connection handle.
513    ///
514    /// Note: This only checks if we have a device handle, not whether the actual
515    /// BLE connection is still alive. Use [`is_connected`](Self::is_connected) for
516    /// a verified check.
517    pub fn try_is_connected(&self, identifier: &str) -> Option<bool> {
518        // Try to acquire the lock without blocking
519        match self.devices.try_read() {
520            Ok(devices) => Some(
521                devices
522                    .get(identifier)
523                    .map(|m| m.has_device())
524                    .unwrap_or(false),
525            ),
526            Err(_) => None, // Lock was held, couldn't check
527        }
528    }
529
530    /// Check if a specific device is connected (verified via BLE).
531    ///
532    /// The lock is released before making the BLE call.
533    pub async fn is_connected(&self, identifier: &str) -> bool {
534        let device = {
535            let devices = self.devices.read().await;
536            devices.get(identifier).and_then(|m| m.device_arc())
537        };
538
539        if let Some(device) = device {
540            device.is_connected().await
541        } else {
542            false
543        }
544    }
545
546    /// Get device info for a specific device.
547    pub async fn get_device_info(&self, identifier: &str) -> Option<DeviceInfo> {
548        let devices = self.devices.read().await;
549        devices.get(identifier).and_then(|m| m.info.clone())
550    }
551
552    /// Get the last cached reading for a device.
553    pub async fn get_last_reading(&self, identifier: &str) -> Option<CurrentReading> {
554        let devices = self.devices.read().await;
555        devices.get(identifier).and_then(|m| m.last_reading)
556    }
557
558    /// Start a background health check task that monitors connection status.
559    ///
560    /// This spawns a task that periodically checks device connections and
561    /// attempts to reconnect devices that have auto_reconnect enabled.
562    ///
563    /// The task will run until the provided cancellation token is cancelled.
564    ///
565    /// # Example
566    ///
567    /// ```ignore
568    /// use tokio_util::sync::CancellationToken;
569    ///
570    /// let manager = Arc::new(DeviceManager::new());
571    /// let cancel = CancellationToken::new();
572    /// let handle = manager.start_health_monitor(cancel.clone());
573    ///
574    /// // Later, to stop the health monitor:
575    /// cancel.cancel();
576    /// handle.await.unwrap();
577    /// ```
578    pub fn start_health_monitor(
579        self: &Arc<Self>,
580        cancel_token: CancellationToken,
581    ) -> tokio::task::JoinHandle<()> {
582        let manager = Arc::clone(self);
583        let interval_duration = manager.config.health_check_interval;
584
585        tokio::spawn(async move {
586            let mut check_interval = interval(interval_duration);
587
588            loop {
589                tokio::select! {
590                    _ = cancel_token.cancelled() => {
591                        info!("Health monitor cancelled, shutting down");
592                        break;
593                    }
594                    _ = check_interval.tick() => {
595                        // Get devices that need checking
596                        let devices_to_check: Vec<(String, Option<Arc<Device>>, bool, ReconnectOptions)> = {
597                            let devices = manager.devices.read().await;
598                            devices
599                                .iter()
600                                .map(|(id, m)| {
601                                    (
602                                        id.clone(),
603                                        m.device_arc(),
604                                        m.auto_reconnect,
605                                        m.reconnect_options.clone(),
606                                    )
607                                })
608                                .collect()
609                        };
610
611                        for (id, device_opt, auto_reconnect, _options) in devices_to_check {
612                            let should_reconnect = match device_opt {
613                                Some(device) => !device.is_connected().await,
614                                None => true,
615                            };
616
617                            if should_reconnect && auto_reconnect {
618                                debug!("Health monitor: attempting reconnect for {}", id);
619                                if let Err(e) = manager.connect(&id).await {
620                                    warn!("Health monitor: reconnect failed for {}: {}", id, e);
621                                }
622                            }
623                        }
624                    }
625                }
626            }
627        })
628    }
629}
630
631impl Default for DeviceManager {
632    fn default() -> Self {
633        Self::new()
634    }
635}
636
637#[cfg(test)]
638mod tests {
639    use super::*;
640
641    #[tokio::test]
642    async fn test_manager_add_device() {
643        let manager = DeviceManager::new();
644        manager.add_device("test-device").await.unwrap();
645
646        assert_eq!(manager.device_count().await, 1);
647        assert!(
648            manager
649                .device_ids()
650                .await
651                .contains(&"test-device".to_string())
652        );
653    }
654
655    #[tokio::test]
656    async fn test_manager_remove_device() {
657        let manager = DeviceManager::new();
658        manager.add_device("test-device").await.unwrap();
659        manager.remove_device("test-device").await.unwrap();
660
661        assert_eq!(manager.device_count().await, 0);
662    }
663
664    #[tokio::test]
665    async fn test_manager_not_connected_by_default() {
666        let manager = DeviceManager::new();
667        manager.add_device("test-device").await.unwrap();
668
669        assert!(!manager.is_connected("test-device").await);
670        assert_eq!(manager.connected_count().await, 0);
671    }
672
673    #[tokio::test]
674    async fn test_manager_events() {
675        let manager = DeviceManager::new();
676        let _rx = manager.events().subscribe();
677
678        manager.add_device("test-device").await.unwrap();
679
680        // Events are only emitted for actual device operations
681        assert_eq!(manager.events().receiver_count(), 1);
682    }
683}