aranet_core/
streaming.rs

1//! Real-time streaming of sensor readings via BLE notifications.
2//!
3//! This module provides functionality to subscribe to sensor readings
4//! and receive them as an async stream.
5//!
6//! The stream supports graceful shutdown via the [`ReadingStream::close`] method,
7//! which uses a cancellation token to cleanly stop the background polling task.
8
9use std::pin::Pin;
10use std::sync::Arc;
11use std::task::{Context, Poll};
12use std::time::Duration;
13
14use futures::stream::Stream;
15use tokio::sync::mpsc;
16use tokio::time::interval;
17use tokio_util::sync::CancellationToken;
18use tracing::{debug, warn};
19
20use aranet_types::CurrentReading;
21
22use crate::device::Device;
23use crate::error::Error;
24
/// Options for reading streams.
///
/// Use the builder pattern for convenient configuration:
///
/// ```ignore
/// let options = StreamOptions::builder()
///     .poll_interval(Duration::from_secs(5))
///     .include_errors(true)
///     .max_consecutive_failures(5)
///     .build();
/// ```
///
/// The type is plain data: it can be cloned and compared for equality, which
/// makes it easy to assert on configuration in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamOptions {
    /// How often the background task polls the device for a reading.
    /// Default: 1 second.
    ///
    /// Must be greater than zero (see [`validate`](Self::validate)).
    pub poll_interval: Duration,
    /// Buffer size for the reading channel.
    /// Default: 16 readings.
    ///
    /// Must be greater than zero (see [`validate`](Self::validate)).
    pub buffer_size: usize,
    /// Whether to include failed reads in the stream.
    ///
    /// When `false` (default), read errors are logged but not sent to the stream.
    /// When `true`, errors are sent as `Err(Error)` items, allowing the consumer
    /// to detect and handle connection issues.
    ///
    /// **Recommendation:** Set to `true` for applications that need to detect
    /// disconnections or errors in real-time.
    pub include_errors: bool,
    /// Maximum consecutive failures before auto-closing the stream.
    ///
    /// When set to `Some(n)`, the stream will automatically close after `n`
    /// consecutive read failures, indicating a likely disconnection.
    /// When `None` (default), the stream will continue indefinitely regardless
    /// of failures.
    ///
    /// **Recommendation:** Set to `Some(5)` or similar for production use to
    /// prevent indefinite polling of a disconnected device.
    pub max_consecutive_failures: Option<u32>,
}
64
65impl Default for StreamOptions {
66    fn default() -> Self {
67        Self {
68            poll_interval: Duration::from_secs(1),
69            buffer_size: 16,
70            include_errors: false,
71            max_consecutive_failures: None,
72        }
73    }
74}
75
76impl StreamOptions {
77    /// Create a new builder for StreamOptions.
78    pub fn builder() -> StreamOptionsBuilder {
79        StreamOptionsBuilder::default()
80    }
81
82    /// Create options with a specific poll interval.
83    pub fn with_interval(interval: Duration) -> Self {
84        Self {
85            poll_interval: interval,
86            ..Default::default()
87        }
88    }
89
90    /// Validate the options and return an error if invalid.
91    ///
92    /// Checks that:
93    /// - `buffer_size` is > 0
94    /// - `poll_interval` is > 0
95    pub fn validate(&self) -> crate::error::Result<()> {
96        if self.buffer_size == 0 {
97            return Err(crate::error::Error::InvalidConfig(
98                "buffer_size must be > 0".to_string(),
99            ));
100        }
101        if self.poll_interval.is_zero() {
102            return Err(crate::error::Error::InvalidConfig(
103                "poll_interval must be > 0".to_string(),
104            ));
105        }
106        Ok(())
107    }
108}
109
110/// Builder for StreamOptions.
111#[derive(Debug, Clone, Default)]
112pub struct StreamOptionsBuilder {
113    options: StreamOptions,
114}
115
116impl StreamOptionsBuilder {
117    /// Set the polling interval.
118    #[must_use]
119    pub fn poll_interval(mut self, interval: Duration) -> Self {
120        self.options.poll_interval = interval;
121        self
122    }
123
124    /// Set the buffer size.
125    #[must_use]
126    pub fn buffer_size(mut self, size: usize) -> Self {
127        self.options.buffer_size = size;
128        self
129    }
130
131    /// Set whether to include errors in the stream.
132    ///
133    /// When `true`, read errors are sent as `Err(Error)` items to the stream,
134    /// allowing consumers to detect disconnections and other issues.
135    #[must_use]
136    pub fn include_errors(mut self, include: bool) -> Self {
137        self.options.include_errors = include;
138        self
139    }
140
141    /// Set the maximum consecutive failures before auto-closing.
142    ///
143    /// When set, the stream will automatically close after this many
144    /// consecutive read failures, indicating a likely disconnection.
145    #[must_use]
146    pub fn max_consecutive_failures(mut self, max: u32) -> Self {
147        self.options.max_consecutive_failures = Some(max);
148        self
149    }
150
151    /// Build the StreamOptions.
152    #[must_use]
153    pub fn build(self) -> StreamOptions {
154        self.options
155    }
156}
157
/// A stream of sensor readings from a device.
///
/// The stream polls the device at a configured interval and sends readings
/// through a channel. It supports graceful shutdown via [`close`](Self::close).
///
/// Dropping the stream also cancels the background task, because the `Drop`
/// implementation triggers the same cancellation token.
pub struct ReadingStream {
    /// Receiving half of the channel that the background task sends results into.
    receiver: mpsc::Receiver<ReadingResult>,
    /// Handle of the spawned polling task; only used to query whether it has finished.
    handle: tokio::task::JoinHandle<()>,
    /// Token that signals the polling task to stop.
    cancel_token: CancellationToken,
}
167
/// Result type for stream items.
///
/// `Ok` carries a successful reading. `Err` carries a read error; errors are
/// only sent to the stream when `StreamOptions::include_errors` is `true`.
pub type ReadingResult = std::result::Result<CurrentReading, Error>;
170
171impl ReadingStream {
172    /// Create a new reading stream from a connected device (takes Arc).
173    ///
174    /// This spawns a background task that polls the device at the configured
175    /// interval and sends readings to the stream.
176    ///
177    /// If `max_consecutive_failures` is set, the stream will automatically
178    /// close after that many consecutive read failures.
179    pub fn new(device: Arc<Device>, options: StreamOptions) -> Self {
180        let (tx, rx) = mpsc::channel(options.buffer_size);
181        let cancel_token = CancellationToken::new();
182        let task_token = cancel_token.clone();
183        let max_failures = options.max_consecutive_failures;
184
185        let handle = tokio::spawn(async move {
186            let mut interval = interval(options.poll_interval);
187            let mut consecutive_failures: u32 = 0;
188
189            loop {
190                tokio::select! {
191                    _ = task_token.cancelled() => {
192                        debug!("Stream cancelled, stopping gracefully");
193                        break;
194                    }
195                    _ = interval.tick() => {
196                        match device.read_current().await {
197                            Ok(reading) => {
198                                // Reset failure counter on success
199                                consecutive_failures = 0;
200                                if tx.send(Ok(reading)).await.is_err() {
201                                    debug!("Stream receiver dropped, stopping");
202                                    break;
203                                }
204                            }
205                            Err(e) => {
206                                consecutive_failures += 1;
207                                warn!(
208                                    "Error reading from device (failure {}/{}): {}",
209                                    consecutive_failures,
210                                    max_failures.map_or("∞".to_string(), |n| n.to_string()),
211                                    e
212                                );
213
214                                // Check if we've exceeded max consecutive failures
215                                if let Some(max) = max_failures
216                                    && consecutive_failures >= max {
217                                        warn!(
218                                            "Max consecutive failures ({}) reached, auto-closing stream",
219                                            max
220                                        );
221                                        // Send final error if configured to include errors
222                                        if options.include_errors {
223                                            let _ = tx.send(Err(e)).await;
224                                        }
225                                        break;
226                                    }
227
228                                if options.include_errors && tx.send(Err(e)).await.is_err() {
229                                    debug!("Stream receiver dropped, stopping");
230                                    break;
231                                }
232                            }
233                        }
234                    }
235                }
236            }
237        });
238
239        Self {
240            receiver: rx,
241            handle,
242            cancel_token,
243        }
244    }
245
246    /// Close the stream and stop the background polling task gracefully.
247    ///
248    /// This signals the background task to stop via a cancellation token,
249    /// allowing it to complete any in-progress operations before exiting.
250    /// This is preferred over aborting the task, which may leave resources
251    /// in an inconsistent state.
252    pub fn close(self) {
253        self.cancel_token.cancel();
254        // The handle will complete on its own; we don't need to await it
255    }
256
257    /// Get a cancellation token that can be used to cancel the stream externally.
258    ///
259    /// This allows multiple places to trigger cancellation of the stream.
260    pub fn cancellation_token(&self) -> CancellationToken {
261        self.cancel_token.clone()
262    }
263
264    /// Check if the stream is still active (background task running).
265    pub fn is_active(&self) -> bool {
266        !self.handle.is_finished()
267    }
268
269    /// Check if the stream has been cancelled.
270    pub fn is_cancelled(&self) -> bool {
271        self.cancel_token.is_cancelled()
272    }
273
274    /// Check if the stream stopped unexpectedly.
275    ///
276    /// Returns `true` if the background task has finished but was not explicitly
277    /// cancelled via [`close()`](Self::close) or by dropping the stream.
278    ///
279    /// This can indicate:
280    /// - A panic in the background task
281    /// - The stream auto-closed due to reaching `max_consecutive_failures`
282    /// - The receiver was dropped unexpectedly
283    ///
284    /// This can be useful for detecting and handling unexpected stream termination:
285    ///
286    /// ```ignore
287    /// if stream.has_unexpectedly_stopped() {
288    ///     // Log the event and potentially restart the stream
289    ///     log::warn!("Stream stopped unexpectedly - may need restart");
290    /// }
291    /// ```
292    ///
293    /// Note: To distinguish between auto-close due to failures vs actual panics,
294    /// you may need additional monitoring of the stream's error output.
295    pub fn has_unexpectedly_stopped(&self) -> bool {
296        self.handle.is_finished() && !self.cancel_token.is_cancelled()
297    }
298
299    /// Check if the background task has panicked.
300    ///
301    /// **Deprecated:** Use [`has_unexpectedly_stopped()`](Self::has_unexpectedly_stopped) instead,
302    /// which has clearer semantics. This method may return `true` even when the stream
303    /// stopped due to `max_consecutive_failures` being reached, not just panics.
304    #[deprecated(
305        since = "0.2.0",
306        note = "Use has_unexpectedly_stopped() instead for clearer semantics"
307    )]
308    pub fn has_panicked(&self) -> bool {
309        self.has_unexpectedly_stopped()
310    }
311}
312
313impl Drop for ReadingStream {
314    fn drop(&mut self) {
315        // Ensure the background task is cancelled when the stream is dropped.
316        // This prevents resource leaks if the stream is dropped without calling close().
317        self.cancel_token.cancel();
318    }
319}
320
321impl Stream for ReadingStream {
322    type Item = ReadingResult;
323
324    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
325        Pin::new(&mut self.receiver).poll_recv(cx)
326    }
327}
328
/// Extension trait for Device to create reading streams.
///
/// **Note:** This trait requires `Arc<Self>` because the stream's background task
/// needs to hold a reference to the device that outlives the method call.
/// The `Arc` is consumed by the call; clone it first if the device is still
/// needed afterwards.
///
/// # Example
///
/// ```ignore
/// use std::sync::Arc;
/// use aranet_core::{Device, DeviceStreamExt};
/// use futures::StreamExt;
///
/// // Wrap device in Arc for streaming
/// let device = Arc::new(Device::connect("Aranet4 12345").await?);
///
/// // Create a stream and consume readings
/// let mut stream = device.stream();
/// while let Some(result) = stream.next().await {
///     match result {
///         Ok(reading) => println!("CO2: {} ppm", reading.co2),
///         Err(e) => eprintln!("Error: {}", e),
///     }
/// }
/// ```
pub trait DeviceStreamExt {
    /// Create a reading stream with default options.
    ///
    /// Polls the device every second and buffers up to 16 readings.
    /// With the default options read errors are logged but not yielded by
    /// the stream.
    fn stream(self: Arc<Self>) -> ReadingStream;

    /// Create a reading stream with custom options.
    ///
    /// `options` controls the poll interval, channel capacity, error
    /// forwarding and the consecutive-failure limit of the returned stream.
    fn stream_with_options(self: Arc<Self>, options: StreamOptions) -> ReadingStream;
}
362
363impl DeviceStreamExt for Device {
364    fn stream(self: Arc<Self>) -> ReadingStream {
365        ReadingStream::new(self, StreamOptions::default())
366    }
367
368    fn stream_with_options(self: Arc<Self>, options: StreamOptions) -> ReadingStream {
369        ReadingStream::new(self, options)
370    }
371}
372
373/// Create a stream from a device without needing the trait import.
374///
375/// This is a convenience function for creating a polling stream.
376///
377/// # Example
378///
379/// ```ignore
380/// use std::sync::Arc;
381/// use std::time::Duration;
382/// use aranet_core::{Device, streaming};
383///
384/// let device = Arc::new(Device::connect("Aranet4 12345").await?);
385/// let stream = streaming::from_device(device, Duration::from_secs(5));
386/// ```
387pub fn from_device(device: Arc<Device>, poll_interval: Duration) -> ReadingStream {
388    ReadingStream::new(device, StreamOptions::with_interval(poll_interval))
389}
390
391/// Create a stream with default options from a device.
392///
393/// Convenience function that wraps `from_device` with a 1-second interval.
394pub fn from_device_default(device: Arc<Device>) -> ReadingStream {
395    ReadingStream::new(device, StreamOptions::default())
396}
397
/// Unit tests for the option types. The stream itself needs a real device and
/// a Tokio runtime, so only the pure configuration logic is covered here.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_stream_options_default() {
        let opts = StreamOptions::default();
        assert_eq!(opts.poll_interval, Duration::from_secs(1));
        assert_eq!(opts.buffer_size, 16);
        assert!(!opts.include_errors);
        assert_eq!(opts.max_consecutive_failures, None);
    }

    #[test]
    fn test_stream_options_with_interval() {
        let opts = StreamOptions::with_interval(Duration::from_millis(500));
        assert_eq!(opts.poll_interval, Duration::from_millis(500));
        // Everything else must stay at its default.
        assert_eq!(opts.buffer_size, 16);
        assert!(!opts.include_errors);
        assert_eq!(opts.max_consecutive_failures, None);
    }

    #[test]
    fn test_stream_options_builder() {
        let opts = StreamOptions::builder()
            .poll_interval(Duration::from_secs(5))
            .buffer_size(32)
            .include_errors(true)
            .max_consecutive_failures(5)
            .build();

        assert_eq!(opts.poll_interval, Duration::from_secs(5));
        assert_eq!(opts.buffer_size, 32);
        assert!(opts.include_errors);
        assert_eq!(opts.max_consecutive_failures, Some(5));
    }

    #[test]
    fn test_stream_options_builder_partial() {
        // Only set some options, others should be defaults
        let opts = StreamOptions::builder().include_errors(true).build();

        assert_eq!(opts.poll_interval, Duration::from_secs(1)); // default
        assert_eq!(opts.buffer_size, 16); // default
        assert!(opts.include_errors); // set
        assert_eq!(opts.max_consecutive_failures, None); // default
    }

    #[test]
    fn test_stream_options_validate_accepts_defaults() {
        assert!(StreamOptions::default().validate().is_ok());
    }

    #[test]
    fn test_stream_options_validate_rejects_zero_buffer_size() {
        let opts = StreamOptions::builder().buffer_size(0).build();
        assert!(matches!(opts.validate(), Err(Error::InvalidConfig(_))));
    }

    #[test]
    fn test_stream_options_validate_rejects_zero_poll_interval() {
        let opts = StreamOptions::with_interval(Duration::ZERO);
        assert!(matches!(opts.validate(), Err(Error::InvalidConfig(_))));
    }
}