1use std::sync::atomic::{AtomicBool, Ordering};
7use std::time::Duration;
8
9use async_trait::async_trait;
10use btleplug::api::{Characteristic, Peripheral as _, WriteType};
11use btleplug::platform::{Adapter, Peripheral};
12use tokio::time::timeout;
13use tracing::{debug, info, warn};
14use uuid::Uuid;
15
16use crate::error::{Error, Result};
17use crate::scan::{ScanOptions, find_device};
18use crate::traits::AranetDevice;
19use crate::util::{create_identifier, format_peripheral_id};
20use crate::uuid::{
21 BATTERY_LEVEL, BATTERY_SERVICE, CURRENT_READINGS_DETAIL, CURRENT_READINGS_DETAIL_ALT,
22 DEVICE_INFO_SERVICE, DEVICE_NAME, FIRMWARE_REVISION, GAP_SERVICE, HARDWARE_REVISION,
23 MANUFACTURER_NAME, MODEL_NUMBER, SAF_TEHNIKA_SERVICE_NEW, SAF_TEHNIKA_SERVICE_OLD,
24 SERIAL_NUMBER, SOFTWARE_REVISION,
25};
26use aranet_types::{CurrentReading, DeviceInfo, DeviceType};
27
/// Handle to a BLE peripheral that has been connected and had its services discovered.
///
/// Instances are built by `Device::from_peripheral` (directly or through
/// `Device::connect` / `Device::connect_with_timeout`). Call `disconnect().await`
/// before dropping; the `Drop` impl only logs a warning when that was skipped.
pub struct Device {
    /// Adapter the peripheral was obtained from. It is stored but not read in this
    /// part of the file, hence the `dead_code` allowance.
    /// NOTE(review): presumably kept so the adapter outlives the peripheral — confirm.
    #[allow(dead_code)]
    adapter: Adapter,
    /// Underlying btleplug peripheral used for every GATT operation.
    peripheral: Peripheral,
    /// Local name taken from the peripheral properties at connect time, if any.
    name: Option<String>,
    /// Identifier string computed at connect time from the peripheral's address and id.
    address: String,
    /// Device model derived from `name` via `DeviceType::from_name`; `None` when there
    /// is no name or the name is not recognised.
    device_type: Option<DeviceType>,
    /// Set to `true` by `from_peripheral` once service discovery has completed.
    services_discovered: bool,
    /// Join handles of the tasks spawned by `subscribe_to_notifications`; they are
    /// aborted in `disconnect`.
    notification_handles: tokio::sync::Mutex<Vec<tokio::task::JoinHandle<()>>>,
    /// Set once `disconnect` has been called; inspected by `Drop` to decide whether
    /// to emit a warning.
    disconnected: AtomicBool,
}
66
67impl std::fmt::Debug for Device {
68 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69 f.debug_struct("Device")
73 .field("name", &self.name)
74 .field("address", &self.address)
75 .field("device_type", &self.device_type)
76 .field("services_discovered", &self.services_discovered)
77 .finish_non_exhaustive()
78 }
79}
80
/// Upper bound on how long a single characteristic read may take before it is
/// reported as `Error::Timeout`.
const READ_TIMEOUT: Duration = Duration::from_secs(10);

/// Upper bound on how long a single characteristic write may take before it is
/// reported as `Error::Timeout`.
const WRITE_TIMEOUT: Duration = Duration::from_secs(10);
86
87impl Device {
88 #[tracing::instrument(level = "info", skip_all, fields(identifier = %identifier))]
103 pub async fn connect(identifier: &str) -> Result<Self> {
104 Self::connect_with_timeout(identifier, Duration::from_secs(15)).await
105 }
106
107 #[tracing::instrument(level = "info", skip_all, fields(identifier = %identifier, timeout_secs = timeout.as_secs()))]
109 pub async fn connect_with_timeout(identifier: &str, timeout: Duration) -> Result<Self> {
110 let options = ScanOptions {
111 duration: timeout,
112 filter_aranet_only: false, };
114
115 let (adapter, peripheral) = match find_device(identifier).await {
117 Ok(result) => result,
118 Err(_) => crate::scan::find_device_with_options(identifier, options).await?,
119 };
120
121 Self::from_peripheral(adapter, peripheral).await
122 }
123
124 #[tracing::instrument(level = "info", skip_all)]
126 pub async fn from_peripheral(adapter: Adapter, peripheral: Peripheral) -> Result<Self> {
127 info!("Connecting to device...");
129 peripheral.connect().await?;
130 info!("Connected!");
131
132 info!("Discovering services...");
134 peripheral.discover_services().await?;
135
136 let services = peripheral.services();
137 debug!("Found {} services", services.len());
138 for service in &services {
139 debug!(" Service: {}", service.uuid);
140 for char in &service.characteristics {
141 debug!(" Characteristic: {}", char.uuid);
142 }
143 }
144
145 let properties = peripheral.properties().await?;
147 let name = properties.as_ref().and_then(|p| p.local_name.clone());
148
149 let address = properties
151 .as_ref()
152 .map(|p| create_identifier(&p.address.to_string(), &peripheral.id()))
153 .unwrap_or_else(|| format_peripheral_id(&peripheral.id()));
154
155 let device_type = name.as_ref().and_then(|n| DeviceType::from_name(n));
157
158 Ok(Self {
159 adapter,
160 peripheral,
161 name,
162 address,
163 device_type,
164 services_discovered: true,
165 notification_handles: tokio::sync::Mutex::new(Vec::new()),
166 disconnected: AtomicBool::new(false),
167 })
168 }
169
170 pub async fn is_connected(&self) -> bool {
172 self.peripheral.is_connected().await.unwrap_or(false)
173 }
174
175 #[tracing::instrument(level = "info", skip(self), fields(device_name = ?self.name))]
184 pub async fn disconnect(&self) -> Result<()> {
185 info!("Disconnecting from device...");
186 self.disconnected.store(true, Ordering::SeqCst);
187
188 {
190 let mut handles = self.notification_handles.lock().await;
191 for handle in handles.drain(..) {
192 handle.abort();
193 }
194 }
195
196 self.peripheral.disconnect().await?;
197 Ok(())
198 }
199
200 pub fn name(&self) -> Option<&str> {
202 self.name.as_deref()
203 }
204
205 pub fn address(&self) -> &str {
210 &self.address
211 }
212
213 pub fn device_type(&self) -> Option<DeviceType> {
215 self.device_type
216 }
217
218 pub async fn read_rssi(&self) -> Result<i16> {
223 let properties = self.peripheral.properties().await?;
224 properties
225 .and_then(|p| p.rssi)
226 .ok_or_else(|| Error::InvalidData("RSSI not available".to_string()))
227 }
228
229 fn find_characteristic(&self, uuid: Uuid) -> Result<Characteristic> {
231 let services = self.peripheral.services();
232 let service_count = services.len();
233
234 for service in &services {
236 if service.uuid == SAF_TEHNIKA_SERVICE_NEW || service.uuid == SAF_TEHNIKA_SERVICE_OLD {
237 for char in &service.characteristics {
238 if char.uuid == uuid {
239 return Ok(char.clone());
240 }
241 }
242 }
243 }
244
245 for service in &services {
247 if service.uuid == GAP_SERVICE
248 || service.uuid == DEVICE_INFO_SERVICE
249 || service.uuid == BATTERY_SERVICE
250 {
251 for char in &service.characteristics {
252 if char.uuid == uuid {
253 return Ok(char.clone());
254 }
255 }
256 }
257 }
258
259 for service in &services {
261 for char in &service.characteristics {
262 if char.uuid == uuid {
263 return Ok(char.clone());
264 }
265 }
266 }
267
268 Err(Error::characteristic_not_found(
269 uuid.to_string(),
270 service_count,
271 ))
272 }
273
274 pub async fn read_characteristic(&self, uuid: Uuid) -> Result<Vec<u8>> {
279 let characteristic = self.find_characteristic(uuid)?;
280 let data = timeout(READ_TIMEOUT, self.peripheral.read(&characteristic))
281 .await
282 .map_err(|_| Error::Timeout {
283 operation: format!("read characteristic {}", uuid),
284 duration: READ_TIMEOUT,
285 })??;
286 Ok(data)
287 }
288
289 pub async fn write_characteristic(&self, uuid: Uuid, data: &[u8]) -> Result<()> {
294 let characteristic = self.find_characteristic(uuid)?;
295 timeout(
296 WRITE_TIMEOUT,
297 self.peripheral
298 .write(&characteristic, data, WriteType::WithResponse),
299 )
300 .await
301 .map_err(|_| Error::Timeout {
302 operation: format!("write characteristic {}", uuid),
303 duration: WRITE_TIMEOUT,
304 })??;
305 Ok(())
306 }
307
308 #[tracing::instrument(level = "debug", skip(self), fields(device_name = ?self.name, device_type = ?self.device_type))]
314 pub async fn read_current(&self) -> Result<CurrentReading> {
315 let data = match self.read_characteristic(CURRENT_READINGS_DETAIL).await {
317 Ok(data) => data,
318 Err(Error::CharacteristicNotFound { .. }) => {
319 debug!("Primary reading characteristic not found, trying alternative");
321 self.read_characteristic(CURRENT_READINGS_DETAIL_ALT)
322 .await?
323 }
324 Err(e) => return Err(e),
325 };
326
327 match self.device_type {
329 Some(DeviceType::Aranet4) | None => {
330 Ok(CurrentReading::from_bytes(&data)?)
332 }
333 Some(DeviceType::Aranet2) => crate::readings::parse_aranet2_reading(&data),
334 Some(DeviceType::AranetRadon) => crate::readings::parse_aranet_radon_gatt(&data),
335 Some(DeviceType::AranetRadiation) => {
336 crate::readings::parse_aranet_radiation_gatt(&data).map(|ext| ext.reading)
338 }
339 Some(_) => Ok(CurrentReading::from_bytes(&data)?),
341 }
342 }
343
344 #[tracing::instrument(level = "debug", skip(self))]
346 pub async fn read_battery(&self) -> Result<u8> {
347 let data = self.read_characteristic(BATTERY_LEVEL).await?;
348 if data.is_empty() {
349 return Err(Error::InvalidData("Empty battery data".to_string()));
350 }
351 Ok(data[0])
352 }
353
354 #[tracing::instrument(level = "debug", skip(self))]
358 pub async fn read_device_info(&self) -> Result<DeviceInfo> {
359 fn read_string(data: Vec<u8>) -> String {
360 String::from_utf8(data)
361 .unwrap_or_default()
362 .trim_end_matches('\0')
363 .to_string()
364 }
365
366 let (
368 name_result,
369 model_result,
370 serial_result,
371 firmware_result,
372 hardware_result,
373 software_result,
374 manufacturer_result,
375 ) = tokio::join!(
376 self.read_characteristic(DEVICE_NAME),
377 self.read_characteristic(MODEL_NUMBER),
378 self.read_characteristic(SERIAL_NUMBER),
379 self.read_characteristic(FIRMWARE_REVISION),
380 self.read_characteristic(HARDWARE_REVISION),
381 self.read_characteristic(SOFTWARE_REVISION),
382 self.read_characteristic(MANUFACTURER_NAME),
383 );
384
385 let name = name_result
386 .map(read_string)
387 .unwrap_or_else(|_| self.name.clone().unwrap_or_default());
388
389 let model = model_result.map(read_string).unwrap_or_default();
390 let serial = serial_result.map(read_string).unwrap_or_default();
391 let firmware = firmware_result.map(read_string).unwrap_or_default();
392 let hardware = hardware_result.map(read_string).unwrap_or_default();
393 let software = software_result.map(read_string).unwrap_or_default();
394 let manufacturer = manufacturer_result.map(read_string).unwrap_or_default();
395
396 Ok(DeviceInfo {
397 name,
398 model,
399 serial,
400 firmware,
401 hardware,
402 software,
403 manufacturer,
404 })
405 }
406
407 pub async fn subscribe_to_notifications<F>(&self, uuid: Uuid, callback: F) -> Result<()>
413 where
414 F: Fn(&[u8]) + Send + Sync + 'static,
415 {
416 let characteristic = self.find_characteristic(uuid)?;
417
418 self.peripheral.subscribe(&characteristic).await?;
419
420 let mut stream = self.peripheral.notifications().await?;
422 let char_uuid = characteristic.uuid;
423
424 let handle = tokio::spawn(async move {
425 use futures::StreamExt;
426 while let Some(notification) = stream.next().await {
427 if notification.uuid == char_uuid {
428 callback(¬ification.value);
429 }
430 }
431 });
432
433 self.notification_handles.lock().await.push(handle);
435
436 Ok(())
437 }
438
439 pub async fn unsubscribe_from_notifications(&self, uuid: Uuid) -> Result<()> {
441 let characteristic = self.find_characteristic(uuid)?;
442 self.peripheral.unsubscribe(&characteristic).await?;
443 Ok(())
444 }
445}
446
447impl Drop for Device {
457 fn drop(&mut self) {
458 if !self.disconnected.load(Ordering::SeqCst) {
459 warn!(
460 device_name = ?self.name,
461 device_address = %self.address,
462 "Device dropped without calling disconnect() - BLE resources may not be properly released. \
463 Call device.disconnect().await before dropping."
464 );
465 }
466 }
467}
468
469#[async_trait]
470impl AranetDevice for Device {
471 async fn is_connected(&self) -> bool {
474 Device::is_connected(self).await
475 }
476
477 async fn disconnect(&self) -> Result<()> {
478 Device::disconnect(self).await
479 }
480
481 fn name(&self) -> Option<&str> {
484 Device::name(self)
485 }
486
487 fn address(&self) -> &str {
488 Device::address(self)
489 }
490
491 fn device_type(&self) -> Option<DeviceType> {
492 Device::device_type(self)
493 }
494
495 async fn read_current(&self) -> Result<CurrentReading> {
498 Device::read_current(self).await
499 }
500
501 async fn read_device_info(&self) -> Result<DeviceInfo> {
502 Device::read_device_info(self).await
503 }
504
505 async fn read_rssi(&self) -> Result<i16> {
506 Device::read_rssi(self).await
507 }
508
509 async fn read_battery(&self) -> Result<u8> {
512 Device::read_battery(self).await
513 }
514
515 async fn get_history_info(&self) -> Result<crate::history::HistoryInfo> {
518 Device::get_history_info(self).await
519 }
520
521 async fn download_history(&self) -> Result<Vec<aranet_types::HistoryRecord>> {
522 Device::download_history(self).await
523 }
524
525 async fn download_history_with_options(
526 &self,
527 options: crate::history::HistoryOptions,
528 ) -> Result<Vec<aranet_types::HistoryRecord>> {
529 Device::download_history_with_options(self, options).await
530 }
531
532 async fn get_interval(&self) -> Result<crate::settings::MeasurementInterval> {
535 Device::get_interval(self).await
536 }
537
538 async fn set_interval(&self, interval: crate::settings::MeasurementInterval) -> Result<()> {
539 Device::set_interval(self, interval).await
540 }
541
542 async fn get_calibration(&self) -> Result<crate::settings::CalibrationData> {
543 Device::get_calibration(self).await
544 }
545}