aranet_core/streaming.rs
1//! Real-time streaming of sensor readings via BLE notifications.
2//!
3//! This module provides functionality to subscribe to sensor readings
4//! and receive them as an async stream.
5//!
6//! The stream supports graceful shutdown via the [`ReadingStream::close`] method,
7//! which uses a cancellation token to cleanly stop the background polling task.
8
9use std::pin::Pin;
10use std::sync::Arc;
11use std::task::{Context, Poll};
12use std::time::Duration;
13
14use futures::stream::Stream;
15use tokio::sync::mpsc;
16use tokio::time::interval;
17use tokio_util::sync::CancellationToken;
18use tracing::{debug, warn};
19
20use aranet_types::CurrentReading;
21
22use crate::device::Device;
23use crate::error::Error;
24
25/// Options for reading streams.
26///
27/// Use the builder pattern for convenient configuration:
28///
29/// ```ignore
30/// let options = StreamOptions::builder()
31/// .poll_interval(Duration::from_secs(5))
32/// .include_errors(true)
33/// .max_consecutive_failures(5)
34/// .build();
35/// ```
36#[derive(Debug, Clone)]
37pub struct StreamOptions {
38 /// Polling interval for devices that don't support notifications.
39 /// Default: 1 second.
40 pub poll_interval: Duration,
41 /// Buffer size for the reading channel.
42 /// Default: 16 readings.
43 pub buffer_size: usize,
44 /// Whether to include failed reads in the stream.
45 ///
46 /// When `false` (default), read errors are logged but not sent to the stream.
47 /// When `true`, errors are sent as `Err(Error)` items, allowing the consumer
48 /// to detect and handle connection issues.
49 ///
50 /// **Recommendation:** Set to `true` for applications that need to detect
51 /// disconnections or errors in real-time.
52 pub include_errors: bool,
53 /// Maximum consecutive failures before auto-closing the stream.
54 ///
55 /// When set to `Some(n)`, the stream will automatically close after `n`
56 /// consecutive read failures, indicating a likely disconnection.
57 /// When `None` (default), the stream will continue indefinitely regardless
58 /// of failures.
59 ///
60 /// **Recommendation:** Set to `Some(5)` or similar for production use to
61 /// prevent indefinite polling of a disconnected device.
62 pub max_consecutive_failures: Option<u32>,
63}
64
65impl Default for StreamOptions {
66 fn default() -> Self {
67 Self {
68 poll_interval: Duration::from_secs(1),
69 buffer_size: 16,
70 include_errors: false,
71 max_consecutive_failures: None,
72 }
73 }
74}
75
76impl StreamOptions {
77 /// Create a new builder for StreamOptions.
78 pub fn builder() -> StreamOptionsBuilder {
79 StreamOptionsBuilder::default()
80 }
81
82 /// Create options with a specific poll interval.
83 pub fn with_interval(interval: Duration) -> Self {
84 Self {
85 poll_interval: interval,
86 ..Default::default()
87 }
88 }
89
90 /// Validate the options and return an error if invalid.
91 ///
92 /// Checks that:
93 /// - `buffer_size` is > 0
94 /// - `poll_interval` is > 0
95 pub fn validate(&self) -> crate::error::Result<()> {
96 if self.buffer_size == 0 {
97 return Err(crate::error::Error::InvalidConfig(
98 "buffer_size must be > 0".to_string(),
99 ));
100 }
101 if self.poll_interval.is_zero() {
102 return Err(crate::error::Error::InvalidConfig(
103 "poll_interval must be > 0".to_string(),
104 ));
105 }
106 Ok(())
107 }
108}
109
110/// Builder for StreamOptions.
111#[derive(Debug, Clone, Default)]
112pub struct StreamOptionsBuilder {
113 options: StreamOptions,
114}
115
116impl StreamOptionsBuilder {
117 /// Set the polling interval.
118 #[must_use]
119 pub fn poll_interval(mut self, interval: Duration) -> Self {
120 self.options.poll_interval = interval;
121 self
122 }
123
124 /// Set the buffer size.
125 #[must_use]
126 pub fn buffer_size(mut self, size: usize) -> Self {
127 self.options.buffer_size = size;
128 self
129 }
130
131 /// Set whether to include errors in the stream.
132 ///
133 /// When `true`, read errors are sent as `Err(Error)` items to the stream,
134 /// allowing consumers to detect disconnections and other issues.
135 #[must_use]
136 pub fn include_errors(mut self, include: bool) -> Self {
137 self.options.include_errors = include;
138 self
139 }
140
141 /// Set the maximum consecutive failures before auto-closing.
142 ///
143 /// When set, the stream will automatically close after this many
144 /// consecutive read failures, indicating a likely disconnection.
145 #[must_use]
146 pub fn max_consecutive_failures(mut self, max: u32) -> Self {
147 self.options.max_consecutive_failures = Some(max);
148 self
149 }
150
151 /// Build the StreamOptions.
152 #[must_use]
153 pub fn build(self) -> StreamOptions {
154 self.options
155 }
156}
157
158/// A stream of sensor readings from a device.
159///
160/// The stream polls the device at a configured interval and sends readings
161/// through a channel. It supports graceful shutdown via [`close`](Self::close).
162pub struct ReadingStream {
163 receiver: mpsc::Receiver<ReadingResult>,
164 handle: tokio::task::JoinHandle<()>,
165 cancel_token: CancellationToken,
166}
167
168/// Result type for stream items.
169pub type ReadingResult = std::result::Result<CurrentReading, Error>;
170
171impl ReadingStream {
172 /// Create a new reading stream from a connected device (takes Arc).
173 ///
174 /// This spawns a background task that polls the device at the configured
175 /// interval and sends readings to the stream.
176 ///
177 /// If `max_consecutive_failures` is set, the stream will automatically
178 /// close after that many consecutive read failures.
179 pub fn new(device: Arc<Device>, options: StreamOptions) -> Self {
180 let (tx, rx) = mpsc::channel(options.buffer_size);
181 let cancel_token = CancellationToken::new();
182 let task_token = cancel_token.clone();
183 let max_failures = options.max_consecutive_failures;
184
185 let handle = tokio::spawn(async move {
186 let mut interval = interval(options.poll_interval);
187 let mut consecutive_failures: u32 = 0;
188
189 loop {
190 tokio::select! {
191 _ = task_token.cancelled() => {
192 debug!("Stream cancelled, stopping gracefully");
193 break;
194 }
195 _ = interval.tick() => {
196 match device.read_current().await {
197 Ok(reading) => {
198 // Reset failure counter on success
199 consecutive_failures = 0;
200 if tx.send(Ok(reading)).await.is_err() {
201 debug!("Stream receiver dropped, stopping");
202 break;
203 }
204 }
205 Err(e) => {
206 consecutive_failures += 1;
207 warn!(
208 "Error reading from device (failure {}/{}): {}",
209 consecutive_failures,
210 max_failures.map_or("∞".to_string(), |n| n.to_string()),
211 e
212 );
213
214 // Check if we've exceeded max consecutive failures
215 if let Some(max) = max_failures
216 && consecutive_failures >= max {
217 warn!(
218 "Max consecutive failures ({}) reached, auto-closing stream",
219 max
220 );
221 // Send final error if configured to include errors
222 if options.include_errors {
223 let _ = tx.send(Err(e)).await;
224 }
225 break;
226 }
227
228 if options.include_errors && tx.send(Err(e)).await.is_err() {
229 debug!("Stream receiver dropped, stopping");
230 break;
231 }
232 }
233 }
234 }
235 }
236 }
237 });
238
239 Self {
240 receiver: rx,
241 handle,
242 cancel_token,
243 }
244 }
245
246 /// Close the stream and stop the background polling task gracefully.
247 ///
248 /// This signals the background task to stop via a cancellation token,
249 /// allowing it to complete any in-progress operations before exiting.
250 /// This is preferred over aborting the task, which may leave resources
251 /// in an inconsistent state.
252 pub fn close(self) {
253 self.cancel_token.cancel();
254 // The handle will complete on its own; we don't need to await it
255 }
256
257 /// Get a cancellation token that can be used to cancel the stream externally.
258 ///
259 /// This allows multiple places to trigger cancellation of the stream.
260 pub fn cancellation_token(&self) -> CancellationToken {
261 self.cancel_token.clone()
262 }
263
264 /// Check if the stream is still active (background task running).
265 pub fn is_active(&self) -> bool {
266 !self.handle.is_finished()
267 }
268
269 /// Check if the stream has been cancelled.
270 pub fn is_cancelled(&self) -> bool {
271 self.cancel_token.is_cancelled()
272 }
273
274 /// Check if the stream stopped unexpectedly.
275 ///
276 /// Returns `true` if the background task has finished but was not explicitly
277 /// cancelled via [`close()`](Self::close) or by dropping the stream.
278 ///
279 /// This can indicate:
280 /// - A panic in the background task
281 /// - The stream auto-closed due to reaching `max_consecutive_failures`
282 /// - The receiver was dropped unexpectedly
283 ///
284 /// This can be useful for detecting and handling unexpected stream termination:
285 ///
286 /// ```ignore
287 /// if stream.has_unexpectedly_stopped() {
288 /// // Log the event and potentially restart the stream
289 /// log::warn!("Stream stopped unexpectedly - may need restart");
290 /// }
291 /// ```
292 ///
293 /// Note: To distinguish between auto-close due to failures vs actual panics,
294 /// you may need additional monitoring of the stream's error output.
295 pub fn has_unexpectedly_stopped(&self) -> bool {
296 self.handle.is_finished() && !self.cancel_token.is_cancelled()
297 }
298
299 /// Check if the background task has panicked.
300 ///
301 /// **Deprecated:** Use [`has_unexpectedly_stopped()`](Self::has_unexpectedly_stopped) instead,
302 /// which has clearer semantics. This method may return `true` even when the stream
303 /// stopped due to `max_consecutive_failures` being reached, not just panics.
304 #[deprecated(
305 since = "0.2.0",
306 note = "Use has_unexpectedly_stopped() instead for clearer semantics"
307 )]
308 pub fn has_panicked(&self) -> bool {
309 self.has_unexpectedly_stopped()
310 }
311}
312
313impl Drop for ReadingStream {
314 fn drop(&mut self) {
315 // Ensure the background task is cancelled when the stream is dropped.
316 // This prevents resource leaks if the stream is dropped without calling close().
317 self.cancel_token.cancel();
318 }
319}
320
321impl Stream for ReadingStream {
322 type Item = ReadingResult;
323
324 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
325 Pin::new(&mut self.receiver).poll_recv(cx)
326 }
327}
328
329/// Extension trait for Device to create reading streams.
330///
331/// **Note:** This trait requires `Arc<Self>` because the stream's background task
332/// needs to hold a reference to the device that outlives the method call.
333///
334/// # Example
335///
336/// ```ignore
337/// use std::sync::Arc;
338/// use aranet_core::{Device, DeviceStreamExt};
339/// use futures::StreamExt;
340///
341/// // Wrap device in Arc for streaming
342/// let device = Arc::new(Device::connect("Aranet4 12345").await?);
343///
344/// // Create a stream and consume readings
345/// let mut stream = device.stream();
346/// while let Some(result) = stream.next().await {
347/// match result {
348/// Ok(reading) => println!("CO2: {} ppm", reading.co2),
349/// Err(e) => eprintln!("Error: {}", e),
350/// }
351/// }
352/// ```
353pub trait DeviceStreamExt {
354 /// Create a reading stream with default options.
355 ///
356 /// Polls the device every second and buffers up to 16 readings.
357 fn stream(self: Arc<Self>) -> ReadingStream;
358
359 /// Create a reading stream with custom options.
360 fn stream_with_options(self: Arc<Self>, options: StreamOptions) -> ReadingStream;
361}
362
363impl DeviceStreamExt for Device {
364 fn stream(self: Arc<Self>) -> ReadingStream {
365 ReadingStream::new(self, StreamOptions::default())
366 }
367
368 fn stream_with_options(self: Arc<Self>, options: StreamOptions) -> ReadingStream {
369 ReadingStream::new(self, options)
370 }
371}
372
373/// Create a stream from a device without needing the trait import.
374///
375/// This is a convenience function for creating a polling stream.
376///
377/// # Example
378///
379/// ```ignore
380/// use std::sync::Arc;
381/// use std::time::Duration;
382/// use aranet_core::{Device, streaming};
383///
384/// let device = Arc::new(Device::connect("Aranet4 12345").await?);
385/// let stream = streaming::from_device(device, Duration::from_secs(5));
386/// ```
387pub fn from_device(device: Arc<Device>, poll_interval: Duration) -> ReadingStream {
388 ReadingStream::new(device, StreamOptions::with_interval(poll_interval))
389}
390
391/// Create a stream with default options from a device.
392///
393/// Convenience function that wraps `from_device` with a 1-second interval.
394pub fn from_device_default(device: Arc<Device>) -> ReadingStream {
395 ReadingStream::new(device, StreamOptions::default())
396}
397
398#[cfg(test)]
399mod tests {
400 use super::*;
401
402 #[test]
403 fn test_stream_options_default() {
404 let opts = StreamOptions::default();
405 assert_eq!(opts.poll_interval, Duration::from_secs(1));
406 assert_eq!(opts.buffer_size, 16);
407 assert!(!opts.include_errors);
408 }
409
410 #[test]
411 fn test_stream_options_with_interval() {
412 let opts = StreamOptions::with_interval(Duration::from_millis(500));
413 assert_eq!(opts.poll_interval, Duration::from_millis(500));
414 }
415
416 #[test]
417 fn test_stream_options_builder() {
418 let opts = StreamOptions::builder()
419 .poll_interval(Duration::from_secs(5))
420 .buffer_size(32)
421 .include_errors(true)
422 .build();
423
424 assert_eq!(opts.poll_interval, Duration::from_secs(5));
425 assert_eq!(opts.buffer_size, 32);
426 assert!(opts.include_errors);
427 }
428
429 #[test]
430 fn test_stream_options_builder_partial() {
431 // Only set some options, others should be defaults
432 let opts = StreamOptions::builder().include_errors(true).build();
433
434 assert_eq!(opts.poll_interval, Duration::from_secs(1)); // default
435 assert_eq!(opts.buffer_size, 16); // default
436 assert!(opts.include_errors); // set
437 }
438}