From 1ace00a099141d63a787074015f84a44a7c60ce5 Mon Sep 17 00:00:00 2001 From: muji Date: Tue, 28 Nov 2023 13:30:26 +0800 Subject: [PATCH] Improve network account modules. --- workspace/net/Cargo.toml | 10 +- workspace/net/src/client/account/archive.rs | 45 ++ workspace/net/src/client/account/contacts.rs | 49 ++ workspace/net/src/client/account/devices.rs | 22 +- workspace/net/src/client/account/listen.rs | 110 ++++ workspace/net/src/client/account/migrate.rs | 33 ++ workspace/net/src/client/account/mod.rs | 11 + .../net/src/client/account/network_account.rs | 526 ++---------------- .../net/src/client/account/security_report.rs | 19 + workspace/net/src/client/account/sync.rs | 209 +++++++ 10 files changed, 532 insertions(+), 502 deletions(-) create mode 100644 workspace/net/src/client/account/archive.rs create mode 100644 workspace/net/src/client/account/contacts.rs create mode 100644 workspace/net/src/client/account/listen.rs create mode 100644 workspace/net/src/client/account/migrate.rs create mode 100644 workspace/net/src/client/account/security_report.rs create mode 100644 workspace/net/src/client/account/sync.rs diff --git a/workspace/net/Cargo.toml b/workspace/net/Cargo.toml index 6db45743a8..e4d5028b0a 100644 --- a/workspace/net/Cargo.toml +++ b/workspace/net/Cargo.toml @@ -14,14 +14,16 @@ full = [ "client", "server", "hashcheck", + "listen", "migrate", "contacts", "device", "security-report", ] client = [ - "reqwest", + "dep:reqwest", ] +listen = ["dep:tokio-tungstenite"] server = [ "toml", "axum", @@ -37,8 +39,8 @@ archive = ["sos-sdk/archive"] contacts = ["sos-sdk/contacts"] migrate = ["sos-migrate"] device = [ - "if-addrs", - "whoami", + "dep:if-addrs", + "dep:whoami", ] security-report = ["sos-sdk/security-report"] mem-fs = ["sos-sdk/mem-fs"] @@ -81,6 +83,7 @@ tower = { version = "0.4", optional = true } tower-http = { version = "0.3", features = ["cors", "trace"], optional = true } async-stream = { version = "0.3", optional = true } tokio-stream = { version = "0.1", optional = true } +tokio-tungstenite = { version = "0.20", features = ["rustls-tls-native-roots"] , optional = true} # device whoami = { version = "1.4", optional = true } @@ -103,7 +106,6 @@ features = ["async"] [target.'cfg(not(target_arch="wasm32"))'.dependencies] file-guard = "0.1" tokio = { version = "1", features = ["rt", "rt-multi-thread", "time", "sync", "macros"] } -tokio-tungstenite = { version = "0.20", features = ["rustls-tls-native-roots"] } [target.'cfg(target_arch="wasm32")'.dependencies] tokio = { version = "1", features = ["rt", "time", "sync"] } diff --git a/workspace/net/src/client/account/archive.rs b/workspace/net/src/client/account/archive.rs new file mode 100644 index 0000000000..59884a6e94 --- /dev/null +++ b/workspace/net/src/client/account/archive.rs @@ -0,0 +1,45 @@ +//! Adds backup archive functions to network account. +use super::network_account::LocalAccount; +use crate::client::{NetworkAccount, Result}; +use sos_sdk::account::{ + archive::{Inventory, RestoreOptions}, + AccountInfo, +}; +use std::path::{Path, PathBuf}; +use tokio::io::{AsyncRead, AsyncSeek}; + +impl NetworkAccount { + /// Create a backup archive containing the + /// encrypted data for the account. + pub async fn export_backup_archive>( + &self, + path: P, + ) -> Result<()> { + Ok(self.account.export_backup_archive(path).await?) + } + + /// Read the inventory from an archive. + pub async fn restore_archive_inventory< + R: AsyncRead + AsyncSeek + Unpin, + >( + buffer: R, + ) -> Result { + Ok(LocalAccount::restore_archive_inventory(buffer).await?) + } + + /// Import from an archive file. + pub async fn restore_backup_archive>( + owner: Option<&mut NetworkAccount>, + path: P, + options: RestoreOptions, + data_dir: Option, + ) -> Result { + Ok(LocalAccount::restore_backup_archive( + owner.map(|o| &mut o.account), + path, + options, + data_dir, + ) + .await?) + } +} diff --git a/workspace/net/src/client/account/contacts.rs b/workspace/net/src/client/account/contacts.rs new file mode 100644 index 0000000000..05f348afcb --- /dev/null +++ b/workspace/net/src/client/account/contacts.rs @@ -0,0 +1,49 @@ +//! Adds contacts functions to network account. +use crate::client::{NetworkAccount, Result}; +use sos_sdk::{ + account::contacts::ContactImportProgress, + vault::{secret::SecretId, Summary}, +}; +use std::path::Path; + +impl NetworkAccount { + /// Get an avatar JPEG image for a contact in the current + /// open folder. + pub async fn load_avatar( + &mut self, + secret_id: &SecretId, + folder: Option, + ) -> Result>> { + Ok(self.account.load_avatar(secret_id, folder).await?) + } + + /// Export a contact secret to vCard file. + pub async fn export_vcard_file>( + &mut self, + path: P, + secret_id: &SecretId, + folder: Option, + ) -> Result<()> { + Ok(self + .account + .export_vcard_file(path, secret_id, folder) + .await?) + } + + /// Export all contacts to a single vCard. + pub async fn export_all_vcards>( + &mut self, + path: P, + ) -> Result<()> { + Ok(self.account.export_all_vcards(path).await?) + } + + /// Import vCards from a string buffer. + pub async fn import_vcard( + &mut self, + content: &str, + progress: impl Fn(ContactImportProgress), + ) -> Result<()> { + Ok(self.account.import_vcard(content, progress).await?) + } +} diff --git a/workspace/net/src/client/account/devices.rs b/workspace/net/src/client/account/devices.rs index d85ba4a525..12bd758fbe 100644 --- a/workspace/net/src/client/account/devices.rs +++ b/workspace/net/src/client/account/devices.rs @@ -1,17 +1,17 @@ -//! User device manager. +//! Account device manager. use crate::client::Result; use std::path::PathBuf; -#[cfg(feature = "device")] -use crate::device::{self, TrustedDevice}; +use crate::{ + client::NetworkAccount, + device::{self, TrustedDevice}, +}; /// Manages the devices for a user. -#[cfg(feature = "device")] pub struct DeviceManager { device_dir: PathBuf, } -#[cfg(feature = "device")] impl DeviceManager { /// Create a new devices manager. pub(super) fn new(device_dir: PathBuf) -> Result { @@ -42,3 +42,15 @@ impl DeviceManager { Ok(()) } } + +impl NetworkAccount { + /// Account devices reference. + pub fn devices(&self) -> &DeviceManager { + &self.devices + } + + /// Account devices mutable reference. + pub fn devices_mut(&mut self) -> &mut DeviceManager { + &mut self.devices + } +} diff --git a/workspace/net/src/client/account/listen.rs b/workspace/net/src/client/account/listen.rs new file mode 100644 index 0000000000..be071798d2 --- /dev/null +++ b/workspace/net/src/client/account/listen.rs @@ -0,0 +1,110 @@ +//! Adds functions for listening to change notifications using +//! a websocket connection. +use crate::client::{ + account::remote::{NetworkAccountReceiver, NetworkAccountSender}, + Error, ListenOptions, NetworkAccount, Origin, RemoteBridge, Result, + WebSocketHandle, +}; +use futures::{select, FutureExt}; +use sos_sdk::prelude::SecureAccessKey; +use std::sync::Arc; + +use super::network_account::LocalAccount; + +impl NetworkAccount { + /// Listen for changes on a remote origin. + pub async fn listen( + &self, + origin: &Origin, + options: ListenOptions, + ) -> Result { + let remotes = self.remotes.read().await; + if let Some(remote) = remotes.get(origin) { + if let Some(remote) = + remote.as_any().downcast_ref::() + { + let remote = Arc::new(remote.clone()); + let (handle, rx, tx) = RemoteBridge::listen(remote, options); + self.spawn_remote_bridge_channels(rx, tx); + + // Store the listeners so we can + // close the connections on sign out + let mut listeners = self.listeners.lock().await; + listeners.push(handle.clone()); + + Ok(handle) + } else { + unreachable!(); + } + } else { + Err(Error::OriginNotFound(origin.clone())) + } + } + + fn spawn_remote_bridge_channels( + &self, + mut rx: NetworkAccountReceiver, + tx: NetworkAccountSender, + ) { + if self.account.is_authenticated() { + let user = self.user().unwrap(); + let keeper = user.identity().keeper(); + let secret_key = user.identity().signer().to_bytes(); + + // TODO: needs shutdown hook so this loop exits + // TODO: when the websocket connection is closed + tokio::task::spawn(async move { + loop { + select!( + event = rx + .secure_access_key_rx + .recv() + .fuse() => { + if let Some((folder_id, secure_key)) = event { + + // Decrypt the secure access key received + // when creating or importing a folder, + // must be done here as the remote bridge + // does not have access to the private key + // (account signing key) + let access_key = SecureAccessKey::decrypt( + &secure_key, + secret_key.clone(), + ) + .await?; + + // Save the access key for the synced folder + let identity = Arc::clone(&keeper); + LocalAccount::save_folder_password( + identity, + &folder_id, + access_key.clone(), + ) + .await?; + + tx.access_key_tx.send(access_key).await?; + } + } + event = rx + .remove_vault_rx + .recv() + .fuse() => { + if let Some(folder_id) = event { + // When a folder is removed via remote + // bridge changes we need to clean up the + // passphrase + let identity = Arc::clone(&keeper); + LocalAccount::remove_folder_password( + identity, + &folder_id, + ) + .await?; + } + } + ) + } + Ok::<(), Error>(()) + }); + } + } +} diff --git a/workspace/net/src/client/account/migrate.rs b/workspace/net/src/client/account/migrate.rs new file mode 100644 index 0000000000..c9e71553e3 --- /dev/null +++ b/workspace/net/src/client/account/migrate.rs @@ -0,0 +1,33 @@ +//! Adds migration functions to network account. +use crate::client::{NetworkAccount, Result}; +use sos_sdk::vault::Summary; +use std::path::Path; + +#[cfg(feature = "migrate")] +use sos_migrate::{import::ImportTarget, AccountExport, AccountImport}; + +#[cfg(feature = "migrate")] +impl NetworkAccount { + /// Write a zip archive containing all the secrets + /// for the account unencrypted. + /// + /// Used to migrate an account to another app. + pub async fn export_unsafe_archive>( + &self, + path: P, + ) -> Result<()> { + let migration = AccountExport::new(&self.account); + Ok(migration.export_unsafe_archive(path).await?) + } + + /// Import secrets from another app. + #[cfg(feature = "migrate")] + pub async fn import_file( + &mut self, + target: ImportTarget, + ) -> Result { + let _ = self.sync_lock.lock().await; + let mut migration = AccountImport::new(&mut self.account); + Ok(migration.import_file(target).await?) + } +} diff --git a/workspace/net/src/client/account/mod.rs b/workspace/net/src/client/account/mod.rs index e2a59392f7..84d95e9357 100644 --- a/workspace/net/src/client/account/mod.rs +++ b/workspace/net/src/client/account/mod.rs @@ -1,10 +1,21 @@ //! Network aware account storage. +#[cfg(feature = "archive")] +mod archive; +#[cfg(feature = "contacts")] +mod contacts; #[cfg(feature = "device")] mod devices; +#[cfg(feature = "listen")] +mod listen; mod macros; +#[cfg(feature = "migrate")] +mod migrate; mod network_account; mod remote; +#[cfg(feature = "security-report")] +mod security_report; +mod sync; #[cfg(feature = "device")] pub use devices::DeviceManager; diff --git a/workspace/net/src/client/account/network_account.rs b/workspace/net/src/client/account/network_account.rs index bd113d3e89..79f2f833d0 100644 --- a/workspace/net/src/client/account/network_account.rs +++ b/workspace/net/src/client/account/network_account.rs @@ -1,19 +1,13 @@ //! Network aware account. -use std::{ - any::Any, - path::{Path, PathBuf}, - sync::Arc, -}; - +use secrecy::SecretString; use sos_sdk::{ account::{ - archive::{Inventory, RestoreOptions}, search::{AccountSearch, AccountStatistics, DocumentCount}, - AccessOptions, Account, AccountBuilder, AccountData, AccountHandler, - AccountInfo, AuthenticatedUser, DetachedView, FolderStorage, - NewAccount, UserPaths, + AccessOptions, Account, AccountBuilder, AccountData, + AuthenticatedUser, DetachedView, FolderStorage, NewAccount, + UserPaths, }, - commit::{CommitHash, CommitState}, + commit::CommitHash, crypto::AccessKey, events::{Event, ReadEvent}, mpc::generate_keypair, @@ -24,119 +18,35 @@ use sos_sdk::{ }, vfs, }; - -#[cfg(feature = "contacts")] -use sos_sdk::account::contacts::ContactImportProgress; - -#[cfg(feature = "security-report")] -pub use sos_sdk::account::security_report::{ - SecurityReport, SecurityReportOptions, +use std::{ + path::{Path, PathBuf}, + sync::Arc, }; - +use tokio::sync::{Mutex, RwLock}; use tracing::{span, Level}; -use secrecy::SecretString; -use tokio::{ - io::{AsyncRead, AsyncSeek}, - sync::{Mutex, RwLock}, -}; - -#[cfg(not(target_arch = "wasm32"))] +#[cfg(feature = "listen")] use crate::client::WebSocketHandle; -use crate::client::{ - sync::SyncData, Error, Origin, Remote, RemoteBridge, RemoteSync, Remotes, - Result, SyncError, SyncOptions, -}; - -use async_trait::async_trait; #[cfg(feature = "device")] use super::devices::DeviceManager; -type SyncHandlerData = Arc>; -type LocalAccount = Account; - -struct SyncHandler { - remotes: Arc>, -} - -impl SyncHandler { - /// Try to sync the target folder against all remotes. - async fn try_sync_folder( - &self, - storage: Arc>, - folder: &Summary, - commit_state: &CommitState, - ) -> Result> { - let mut changed = false; - let (last_commit, commit_proof) = commit_state; - let mut last_commit = last_commit.clone(); - let mut commit_proof = commit_proof.clone(); - - let remotes = self.remotes.read().await; - for remote in remotes.values() { - let local_changed = remote - .sync_folder(folder, commit_state, None, &Default::default()) - .await?; - - // If a remote changes were applied to local - // we need to recompute the last commit and client proof - if local_changed { - let reader = storage.read().await; - let event_log = reader - .cache() - .get(folder.id()) - .ok_or(Error::CacheNotAvailable(*folder.id()))?; - last_commit = event_log - .last_commit() - .await? - .ok_or(Error::NoRootCommit)?; - commit_proof = event_log.tree().head()?; - } - - changed = changed || local_changed; - } - - Ok(if changed { - Some((last_commit, commit_proof)) - } else { - None - }) - } -} - -#[async_trait::async_trait] -impl AccountHandler for SyncHandler { - type Data = SyncHandlerData; +use super::sync::{SyncHandler, SyncHandlerData}; +use crate::client::{ + sync::SyncData, Origin, Remote, RemoteBridge, RemoteSync, Remotes, + Result, SyncError, +}; - fn data(&self) -> &Self::Data { - &self.remotes - } +pub(super) type LocalAccount = Account; - async fn before_change( - &self, - storage: Arc>, - folder: &Summary, - commit_state: &CommitState, - ) -> Option { - match self.try_sync_folder(storage, folder, commit_state).await { - Ok(commit_state) => commit_state, - Err(e) => { - tracing::error!(error = ?e, "failed to sync before change"); - None - } - } - } -} - -/// Adds networking capability to a local account. +/// Account with networking capability. pub struct NetworkAccount { /// Local account. - account: LocalAccount, + pub(super) account: LocalAccount, /// Devices for this user. #[cfg(feature = "device")] - devices: DeviceManager, + pub(super) devices: DeviceManager, /// Remote targets for synchronization. pub(super) remotes: Arc>, @@ -147,7 +57,7 @@ pub struct NetworkAccount { /// Websocket change listeners. #[cfg(not(target_arch = "wasm32"))] - listeners: Mutex>, + pub(super) listeners: Mutex>, } impl NetworkAccount { @@ -352,15 +262,11 @@ impl NetworkAccount { /// Delete the account for this user and sign out. pub async fn delete_account(&mut self) -> Result<()> { - Ok(self.account.delete_account().await?) - - /* - let event = self.user.delete_account(&self.paths).await?; - let audit_event: AuditEvent = (self.address(), &event).into(); - self.append_audit_logs(vec![audit_event]).await?; - self.sign_out().await; + // Delete the account and sign out + self.account.delete_account().await?; + // Shutdown any change listeners + self.shutdown_listeners().await; Ok(()) - */ } /// Rename this account. @@ -371,18 +277,6 @@ impl NetworkAccount { Ok(self.account.rename_account(account_name).await?) } - /// Users devices reference. - #[cfg(feature = "device")] - pub fn devices(&self) -> &DeviceManager { - &self.devices - } - - /// Users devices mutable reference. - #[cfg(feature = "device")] - pub fn devices_mut(&mut self) -> &mut DeviceManager { - &mut self.devices - } - /// Try to find a folder using a predicate. pub async fn find

(&self, predicate: P) -> Option

where @@ -427,17 +321,17 @@ impl NetworkAccount { let _enter = span.enter(); tracing::debug!(address = %self.address()); + self.shutdown_listeners().await; + Ok(self.account.sign_out().await?) + } - // Close all the websocket connections - { - let mut listeners = self.listeners.lock().await; - for handle in listeners.drain(..) { - tracing::debug!("close websocket"); - handle.close(); - } + /// Close all the websocket connections + async fn shutdown_listeners(&self) { + let mut listeners = self.listeners.lock().await; + for handle in listeners.drain(..) { + tracing::debug!("close websocket"); + handle.close(); } - - Ok(self.account.sign_out().await?) } /// Create a folder. @@ -767,358 +661,4 @@ impl NetworkAccount { .download_file(vault_id, secret_id, file_name) .await?) } - - /// Generate a security report. - #[cfg(feature = "security-report")] - pub async fn generate_security_report( - &mut self, - options: SecurityReportOptions, - ) -> Result> - where - D: Fn(Vec) -> R, - R: std::future::Future>, - { - Ok(self.account.generate_security_report(options).await?) - } -} - -#[cfg(feature = "migrate")] -use sos_migrate::{import::ImportTarget, AccountExport, AccountImport}; - -#[cfg(feature = "migrate")] -impl NetworkAccount { - /// Write a zip archive containing all the secrets - /// for the account unencrypted. - /// - /// Used to migrate an account to another provider. - pub async fn export_unsafe_archive>( - &self, - path: P, - ) -> Result<()> { - let migration = AccountExport::new(&self.account); - Ok(migration.export_unsafe_archive(path).await?) - } - - /// Import secrets from another app. - #[cfg(feature = "migrate")] - pub async fn import_file( - &mut self, - target: ImportTarget, - ) -> Result { - let _ = self.sync_lock.lock().await; - let mut migration = AccountImport::new(&mut self.account); - Ok(migration.import_file(target).await?) - } -} - -#[cfg(feature = "archive")] -impl NetworkAccount { - /// Create a backup archive containing the - /// encrypted data for the account. - pub async fn export_backup_archive>( - &self, - path: P, - ) -> Result<()> { - Ok(self.account.export_backup_archive(path).await?) - } - - /// Read the inventory from an archive. - pub async fn restore_archive_inventory< - R: AsyncRead + AsyncSeek + Unpin, - >( - buffer: R, - ) -> Result { - Ok(LocalAccount::restore_archive_inventory(buffer).await?) - } - - /// Import from an archive file. - pub async fn restore_backup_archive>( - owner: Option<&mut NetworkAccount>, - path: P, - options: RestoreOptions, - data_dir: Option, - ) -> Result { - Ok(LocalAccount::restore_backup_archive( - owner.map(|o| &mut o.account), - path, - options, - data_dir, - ) - .await?) - } -} - -#[cfg(feature = "contacts")] -impl NetworkAccount { - /// Get an avatar JPEG image for a contact in the current - /// open folder. - pub async fn load_avatar( - &mut self, - secret_id: &SecretId, - folder: Option, - ) -> Result>> { - Ok(self.account.load_avatar(secret_id, folder).await?) - } - - /// Export a contact secret to vCard file. - pub async fn export_vcard_file>( - &mut self, - path: P, - secret_id: &SecretId, - folder: Option, - ) -> Result<()> { - Ok(self - .account - .export_vcard_file(path, secret_id, folder) - .await?) - } - - /// Export all contacts to a single vCard. - pub async fn export_all_vcards>( - &mut self, - path: P, - ) -> Result<()> { - Ok(self.account.export_all_vcards(path).await?) - } - - /// Import vCards from a string buffer. - pub async fn import_vcard( - &mut self, - content: &str, - progress: impl Fn(ContactImportProgress), - ) -> Result<()> { - Ok(self.account.import_vcard(content, progress).await?) - } -} - -#[async_trait] -impl RemoteSync for NetworkAccount { - async fn sync(&self) -> Option { - self.sync_with_options(&Default::default()).await - } - - async fn sync_with_options( - &self, - options: &SyncOptions, - ) -> Option { - let _ = self.sync_lock.lock().await; - let mut errors = Vec::new(); - let remotes = self.remotes.read().await; - for (origin, remote) in &*remotes { - let sync_remote = options.origins.is_empty() - || options.origins.contains(origin); - - if sync_remote { - if let Some(e) = remote.sync_with_options(options).await { - match e { - SyncError::One(e) => errors.push((origin.clone(), e)), - SyncError::Multiple(mut errs) => { - errors.append(&mut errs) - } - } - } - } - } - if errors.is_empty() { - None - } else { - for error in &errors { - tracing::error!(error = ?error); - } - Some(SyncError::Multiple(errors)) - } - } - - async fn sync_folder( - &self, - folder: &Summary, - commit_state: &CommitState, - remote_state: Option, - options: &SyncOptions, - ) -> std::result::Result { - let _ = self.sync_lock.lock().await; - let mut errors = Vec::new(); - let mut changed = false; - let remotes = self.remotes.read().await; - for (origin, remote) in &*remotes { - let sync_remote = options.origins.is_empty() - || options.origins.contains(origin); - - if sync_remote { - match remote - .sync_folder( - folder, - commit_state, - remote_state.clone(), - options, - ) - .await - { - Ok(changes) => {} - Err(e) => match e { - SyncError::One(e) => errors.push((origin.clone(), e)), - SyncError::Multiple(mut errs) => { - errors.append(&mut errs) - } - }, - } - } - } - if errors.is_empty() { - Ok(changed) - } else { - for error in &errors { - tracing::error!(error = ?error); - } - Err(SyncError::Multiple(errors)) - } - } - - async fn sync_send_events( - &self, - folder: &Summary, - commit_state: &CommitState, - events: &[Event], - data: &[SyncData], - ) -> std::result::Result<(), SyncError> { - let _ = self.sync_lock.lock().await; - let mut errors = Vec::new(); - let remotes = self.remotes.read().await; - for (origin, remote) in &*remotes { - if let Err(e) = remote - .sync_send_events(folder, commit_state, events, data) - .await - { - match e { - SyncError::One(e) => errors.push((origin.clone(), e)), - SyncError::Multiple(mut errs) => errors.append(&mut errs), - } - } - } - if errors.is_empty() { - Ok(()) - } else { - Err(SyncError::Multiple(errors)) - } - } - - fn as_any(&self) -> &(dyn Any + Send + Sync) { - self - } - - fn as_any_mut(&mut self) -> &mut (dyn Any + Send + Sync) { - self - } -} - -#[cfg(not(target_arch = "wasm32"))] -mod listen { - use super::LocalAccount; - use crate::client::{ - account::remote::{NetworkAccountReceiver, NetworkAccountSender}, - Error, ListenOptions, NetworkAccount, Origin, RemoteBridge, Result, - WebSocketHandle, - }; - use futures::{select, FutureExt}; - use sos_sdk::prelude::SecureAccessKey; - use std::sync::Arc; - - impl NetworkAccount { - /// Listen for changes on a remote origin. - pub async fn listen( - &self, - origin: &Origin, - options: ListenOptions, - ) -> Result { - let remotes = self.remotes.read().await; - if let Some(remote) = remotes.get(origin) { - if let Some(remote) = - remote.as_any().downcast_ref::() - { - let remote = Arc::new(remote.clone()); - let (handle, rx, tx) = - RemoteBridge::listen(remote, options); - self.spawn_remote_bridge_channels(rx, tx); - - // Store the listeners so we can - // close the connections on sign out - let mut listeners = self.listeners.lock().await; - listeners.push(handle.clone()); - - Ok(handle) - } else { - unreachable!(); - } - } else { - Err(Error::OriginNotFound(origin.clone())) - } - } - - fn spawn_remote_bridge_channels( - &self, - mut rx: NetworkAccountReceiver, - tx: NetworkAccountSender, - ) { - if self.account.is_authenticated() { - let user = self.user().unwrap(); - let keeper = user.identity().keeper(); - let secret_key = user.identity().signer().to_bytes(); - - // TODO: needs shutdown hook so this loop exits - // TODO: when the websocket connection is closed - tokio::task::spawn(async move { - loop { - select!( - event = rx - .secure_access_key_rx - .recv() - .fuse() => { - if let Some((folder_id, secure_key)) = event { - - // Decrypt the secure access key received - // when creating or importing a folder, - // must be done here as the remote bridge - // does not have access to the private key - // (account signing key) - let access_key = SecureAccessKey::decrypt( - &secure_key, - secret_key.clone(), - ) - .await?; - - // Save the access key for the synced folder - let identity = Arc::clone(&keeper); - LocalAccount::save_folder_password( - identity, - &folder_id, - access_key.clone(), - ) - .await?; - - tx.access_key_tx.send(access_key).await?; - } - } - event = rx - .remove_vault_rx - .recv() - .fuse() => { - if let Some(folder_id) = event { - // When a folder is removed via remote - // bridge changes we need to clean up the - // passphrase - let identity = Arc::clone(&keeper); - LocalAccount::remove_folder_password( - identity, - &folder_id, - ) - .await?; - } - } - ) - } - Ok::<(), Error>(()) - }); - } - } - } } diff --git a/workspace/net/src/client/account/security_report.rs b/workspace/net/src/client/account/security_report.rs new file mode 100644 index 0000000000..be63e94254 --- /dev/null +++ b/workspace/net/src/client/account/security_report.rs @@ -0,0 +1,19 @@ +//! Adds security report functions to network account. +use crate::client::{NetworkAccount, Result}; +use sos_sdk::account::security_report::{ + SecurityReport, SecurityReportOptions, +}; + +impl NetworkAccount { + /// Generate a security report. + pub async fn generate_security_report( + &mut self, + options: SecurityReportOptions, + ) -> Result> + where + D: Fn(Vec) -> R, + R: std::future::Future>, + { + Ok(self.account.generate_security_report(options).await?) + } +} diff --git a/workspace/net/src/client/account/sync.rs b/workspace/net/src/client/account/sync.rs new file mode 100644 index 0000000000..7df894e63d --- /dev/null +++ b/workspace/net/src/client/account/sync.rs @@ -0,0 +1,209 @@ +//! Adds sync capability to network account. +use crate::client::{ + sync::SyncData, Error, NetworkAccount, RemoteSync, Remotes, Result, + SyncError, SyncOptions, +}; +use async_trait::async_trait; +use sos_sdk::{ + account::{AccountHandler, FolderStorage}, + commit::CommitState, + events::Event, + vault::Summary, +}; +use std::{any::Any, sync::Arc}; +use tokio::sync::RwLock; + +pub(super) type SyncHandlerData = Arc>; + +pub(super) struct SyncHandler { + pub(super) remotes: Arc>, +} + +impl SyncHandler { + /// Try to sync the target folder against all remotes. + async fn try_sync_folder( + &self, + storage: Arc>, + folder: &Summary, + commit_state: &CommitState, + ) -> Result> { + let mut changed = false; + let (last_commit, commit_proof) = commit_state; + let mut last_commit = last_commit.clone(); + let mut commit_proof = commit_proof.clone(); + + let remotes = self.remotes.read().await; + for remote in remotes.values() { + let local_changed = remote + .sync_folder(folder, commit_state, None, &Default::default()) + .await?; + + // If a remote changes were applied to local + // we need to recompute the last commit and client proof + if local_changed { + let reader = storage.read().await; + let event_log = reader + .cache() + .get(folder.id()) + .ok_or(Error::CacheNotAvailable(*folder.id()))?; + last_commit = event_log + .last_commit() + .await? + .ok_or(Error::NoRootCommit)?; + commit_proof = event_log.tree().head()?; + } + + changed = changed || local_changed; + } + + Ok(if changed { + Some((last_commit, commit_proof)) + } else { + None + }) + } +} + +#[async_trait::async_trait] +impl AccountHandler for SyncHandler { + type Data = SyncHandlerData; + + fn data(&self) -> &Self::Data { + &self.remotes + } + + async fn before_change( + &self, + storage: Arc>, + folder: &Summary, + commit_state: &CommitState, + ) -> Option { + match self.try_sync_folder(storage, folder, commit_state).await { + Ok(commit_state) => commit_state, + Err(e) => { + tracing::error!(error = ?e, "failed to sync before change"); + None + } + } + } +} + +#[async_trait] +impl RemoteSync for NetworkAccount { + async fn sync(&self) -> Option { + self.sync_with_options(&Default::default()).await + } + + async fn sync_with_options( + &self, + options: &SyncOptions, + ) -> Option { + let _ = self.sync_lock.lock().await; + let mut errors = Vec::new(); + let remotes = self.remotes.read().await; + for (origin, remote) in &*remotes { + let sync_remote = options.origins.is_empty() + || options.origins.contains(origin); + + if sync_remote { + if let Some(e) = remote.sync_with_options(options).await { + match e { + SyncError::One(e) => errors.push((origin.clone(), e)), + SyncError::Multiple(mut errs) => { + errors.append(&mut errs) + } + } + } + } + } + if errors.is_empty() { + None + } else { + for error in &errors { + tracing::error!(error = ?error); + } + Some(SyncError::Multiple(errors)) + } + } + + async fn sync_folder( + &self, + folder: &Summary, + commit_state: &CommitState, + remote_state: Option, + options: &SyncOptions, + ) -> std::result::Result { + let _ = self.sync_lock.lock().await; + let mut errors = Vec::new(); + let mut changed = false; + let remotes = self.remotes.read().await; + for (origin, remote) in &*remotes { + let sync_remote = options.origins.is_empty() + || options.origins.contains(origin); + + if sync_remote { + match remote + .sync_folder( + folder, + commit_state, + remote_state.clone(), + options, + ) + .await + { + Ok(changes) => {} + Err(e) => match e { + SyncError::One(e) => errors.push((origin.clone(), e)), + SyncError::Multiple(mut errs) => { + errors.append(&mut errs) + } + }, + } + } + } + if errors.is_empty() { + Ok(changed) + } else { + for error in &errors { + tracing::error!(error = ?error); + } + Err(SyncError::Multiple(errors)) + } + } + + async fn sync_send_events( + &self, + folder: &Summary, + commit_state: &CommitState, + events: &[Event], + data: &[SyncData], + ) -> std::result::Result<(), SyncError> { + let _ = self.sync_lock.lock().await; + let mut errors = Vec::new(); + let remotes = self.remotes.read().await; + for (origin, remote) in &*remotes { + if let Err(e) = remote + .sync_send_events(folder, commit_state, events, data) + .await + { + match e { + SyncError::One(e) => errors.push((origin.clone(), e)), + SyncError::Multiple(mut errs) => errors.append(&mut errs), + } + } + } + if errors.is_empty() { + Ok(()) + } else { + Err(SyncError::Multiple(errors)) + } + } + + fn as_any(&self) -> &(dyn Any + Send + Sync) { + self + } + + fn as_any_mut(&mut self) -> &mut (dyn Any + Send + Sync) { + self + } +}