From 8336f44b6791186527731160f855d15b5e4dd513 Mon Sep 17 00:00:00 2001
From: Louis Beaumont
Date: Wed, 3 Jul 2024 15:36:22 +0200
Subject: [PATCH] chore: remove tons of warnings

---
 screenpipe-audio/src/bin/screenpipe-audio.rs  | 17 ++---
 screenpipe-audio/src/stt.rs                   |  6 +-
 .../src/bin/screenpipe-server.rs              | 16 ++---
 screenpipe-server/src/bin/screenpipe-video.rs |  6 +-
 screenpipe-server/src/core.rs                 |  9 ++-
 screenpipe-server/src/server.rs               | 25 ++-----
 screenpipe-server/src/video.rs                | 69 ++-----------------
 7 files changed, 32 insertions(+), 116 deletions(-)

diff --git a/screenpipe-audio/src/bin/screenpipe-audio.rs b/screenpipe-audio/src/bin/screenpipe-audio.rs
index 518acaa1..76f65c06 100644
--- a/screenpipe-audio/src/bin/screenpipe-audio.rs
+++ b/screenpipe-audio/src/bin/screenpipe-audio.rs
@@ -1,7 +1,6 @@
 use anyhow::Result;
-use env_logger::Env;
 use log::info;
-use screenpipe_audio::{continuous_audio_capture, ControlMessage};
+use screenpipe_audio::continuous_audio_capture;
 use std::thread;
 use std::time::Duration;

@@ -10,27 +9,21 @@ fn main() -> Result<()> {
     use log::LevelFilter;
     use std::sync::mpsc;

-    let _ = Builder::new()
+    Builder::new()
         .filter(None, LevelFilter::Info)
         .filter_module("tokenizers", LevelFilter::Error)
         .init();
-
-    let (control_tx, control_rx) = mpsc::channel();
+    let (_control_tx, control_rx) = mpsc::channel();
     let (result_tx, result_rx) = mpsc::channel();

     let chunk_duration = Duration::from_secs(5);
-    let capture_thread =
+    let _capture_thread =
         thread::spawn(move || continuous_audio_capture(control_rx, result_tx, chunk_duration));

-    while true {
+    loop {
         if let Ok(result) = result_rx.recv_timeout(Duration::from_secs(5)) {
             info!("Transcription: {}", result.text);
         }
     }
-
-    control_tx.send(ControlMessage::Stop)?;
-    capture_thread.join().unwrap()?;
-
-    Ok(())
 }

diff --git a/screenpipe-audio/src/stt.rs b/screenpipe-audio/src/stt.rs
index d6dd582f..e7a52dc2 100644
--- a/screenpipe-audio/src/stt.rs
+++ b/screenpipe-audio/src/stt.rs
@@ -11,7 +11,7 @@ use rubato::{
     Resampler, SincFixedIn, SincInterpolationParameters, SincInterpolationType, WindowFunction,
 };

-use crate::{multilingual, pcm_decode::pcm_decode};
+use crate::{pcm_decode::pcm_decode};

 // TODO: improve model loading strategy

@@ -458,10 +458,10 @@ pub fn stt(input: &str) -> Result<String> {
     // info!("loaded mel: {:?}", mel.dims());
     let vb = candle_transformers::quantized_var_builder::VarBuilder::from_gguf(
-        &weights_filename,
+        weights_filename,
         &device,
     )?;
-    let mut model = Model::Quantized(m::quantized_model::Whisper::load(&vb, config)?);
+    let model = Model::Quantized(m::quantized_model::Whisper::load(&vb, config)?);

     // info!("detecting language");
     // TODO: disabled seems slow as fuck
     // let language_token = Some(multilingual::detect_language(&mut model, &tokenizer, &mel)?);
diff --git a/screenpipe-server/src/bin/screenpipe-server.rs b/screenpipe-server/src/bin/screenpipe-server.rs
index cd0ab1b8..454ad199 100644
--- a/screenpipe-server/src/bin/screenpipe-server.rs
+++ b/screenpipe-server/src/bin/screenpipe-server.rs
@@ -1,16 +1,12 @@
 use std::{
     fs,
     net::SocketAddr,
-    sync::{
-        mpsc::{channel, Sender},
-        Arc, Mutex,
-    },
+    sync::{mpsc::channel, Arc},
 };

-use tokio::sync::oneshot;
 use tokio::time::Duration;

-use screenpipe_server::{start_continuous_recording, DatabaseManager, RecorderControl, Server};
+use screenpipe_server::{start_continuous_recording, DatabaseManager, Server};

 #[tokio::main]
 async fn main() -> anyhow::Result<()> {
@@ -18,7 +14,7 @@
     use env_logger::Builder;
     use log::LevelFilter;

-    let _ = Builder::new()
+    Builder::new()
         .filter(None, LevelFilter::Info)
         .filter_module("tokenizers", LevelFilter::Error)
         .filter_module("rusty_tesseract", LevelFilter::Error)
@@ -32,11 +28,11 @@ async fn main() -> anyhow::Result<()> {
     );
     let db_server = db.clone();

     // Channel for controlling the recorder
-    let (control_tx, control_rx) = channel();
+    let (_control_tx, control_rx) = channel();

     // Start continuous recording in a separate task
     let local_data_dir_clone = local_data_dir.clone();
-    let recording_task = tokio::spawn(async move {
+    let _recording_task = tokio::spawn(async move {
         let fps = 10.0;
         let audio_chunk_duration = Duration::from_secs(5);
@@ -71,8 +67,6 @@ async fn main() -> anyhow::Result<()> {
     // This part will never be reached in the current implementation
     // control_tx.send(RecorderControl::Stop).await?;
     // recording_task.await??;
-
-    Ok(())
 }

 fn ensure_local_data_dir() -> anyhow::Result<String> {
diff --git a/screenpipe-server/src/bin/screenpipe-video.rs b/screenpipe-server/src/bin/screenpipe-video.rs
index 13894568..59cfa6a0 100644
--- a/screenpipe-server/src/bin/screenpipe-video.rs
+++ b/screenpipe-server/src/bin/screenpipe-video.rs
@@ -35,8 +35,8 @@ fn main() {
         }
     };

-    let video_capture = VideoCapture::new(&output_path, fps, new_chunk_callback);
-    let (tx, rx): (Sender<()>, Receiver<()>) = channel();
+    let video_capture = VideoCapture::new(output_path, fps, new_chunk_callback);
+    let (_tx, rx): (Sender<()>, Receiver<()>) = channel();
     let rx = Arc::new(Mutex::new(rx));
     let rx_thread = rx.clone();

@@ -51,7 +51,7 @@ fn main() {

     let json_file = OpenOptions::new()
         .create(true)
-        .write(true)
+        .append(true)
         .open(&json_output_path)
         .expect("Failed to create JSON file");

diff --git a/screenpipe-server/src/core.rs b/screenpipe-server/src/core.rs
index 74bc9556..5a75483a 100644
--- a/screenpipe-server/src/core.rs
+++ b/screenpipe-server/src/core.rs
@@ -3,7 +3,6 @@ use anyhow::Result;
 use chrono::Utc;
 use log::{debug, error, info};
 use screenpipe_audio::{continuous_audio_capture, save_audio_to_file, ControlMessage};
-use std::future::IntoFuture;
 use std::sync::mpsc::{self, Receiver};
 use std::sync::{Arc, Mutex};
 use std::thread;
@@ -37,7 +36,7 @@ pub async fn start_continuous_recording(
             error!("Failed to insert new video chunk: {}", e);
         }
     };
-    let video_capture = VideoCapture::new(&output_path, fps, new_chunk_callback);
+    let video_capture = VideoCapture::new(output_path, fps, new_chunk_callback);
     let control_rx = Arc::new(Mutex::new(control_rx));
     let control_rx_video = Arc::clone(&control_rx);
     let control_rx_audio = Arc::clone(&control_rx);
@@ -52,7 +51,7 @@ pub async fn start_continuous_recording(
     // TODO: too muhc nesting 🤦‍♂️
     let video_thread = thread::spawn(move || {
         let runtime = tokio::runtime::Runtime::new().unwrap();
-        runtime.block_on(async {
+        let _ = runtime.block_on(async {
             info!("Starting video capture thread");
             let mut is_paused = false;
             loop {
@@ -102,7 +101,7 @@ pub async fn start_continuous_recording(

     let audio_processing_thread = thread::spawn(move || {
         let runtime = tokio::runtime::Runtime::new().unwrap();
-        runtime.block_on(async {
+        let _ = runtime.block_on(async {
             info!("Starting audio processing thread");
             let mut is_paused = false;
             loop {
@@ -170,7 +169,7 @@ pub async fn start_continuous_recording(
     video_thread.join().unwrap();
     audio_processing_thread.join().unwrap();
     audio_control_tx.send(ControlMessage::Stop).unwrap();
-    audio_thread.join().unwrap();
+    let _ = audio_thread.join().unwrap();

     info!("Continuous recording stopped");
     Ok(())
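Most of the hunks up to this point silence compiler warnings rather than change behavior: bindings that are intentionally unused get a leading underscore, ignored `#[must_use]` results are discarded with `let _ =`, and `while true` becomes `loop`. Below is a minimal, self-contained sketch of those patterns, using illustrative names only (none of this is code from the patch):

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    // Underscore prefix: the sender is kept alive so the channel stays open,
    // but it is never used here, so `_control_tx` avoids `unused_variables`.
    let (_control_tx, control_rx) = mpsc::channel::<()>();

    // The JoinHandle is intentionally not joined; binding it to `_worker`
    // avoids the warning without changing how the thread runs.
    let _worker = thread::spawn(move || {
        // `loop` instead of `while true`, which trips the `while_true` lint.
        loop {
            if control_rx.recv_timeout(Duration::from_millis(50)).is_err() {
                break; // nothing received in time (or channel closed): stop
            }
        }
    });

    // `let _ =` discards a #[must_use] Result that is deliberately ignored.
    let _ = "not a number".parse::<i32>();

    thread::sleep(Duration::from_millis(100)); // let the worker hit the timeout and exit
}
```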
diff --git a/screenpipe-server/src/server.rs b/screenpipe-server/src/server.rs
index 854529fd..e1f57262 100644
--- a/screenpipe-server/src/server.rs
+++ b/screenpipe-server/src/server.rs
@@ -1,21 +1,16 @@
 use axum::{
-    extract::{Path, Query, State},
+    extract::{Query, State},
     http::StatusCode,
     response::Json as JsonResponse,
-    routing::{get, post},
-    serve, Json, Router,
+    routing::get,
+    serve, Router,
 };
-use chrono::{DateTime, NaiveDateTime, Utc};
-use image::{ImageBuffer, Rgb};
+use chrono::{DateTime, Utc};
 use log::info;
 use serde::{Deserialize, Serialize};
 use serde_json::json;
-use std::{
-    io::Cursor,
-    net::SocketAddr,
-    sync::{Arc, Mutex},
-};
+use std::{net::SocketAddr, sync::Arc};
 use tokio::net::TcpListener;
 use tower_http::cors::CorsLayer;

@@ -61,12 +56,6 @@ struct DateRangeQuery {
     pagination: PaginationQuery,
 }

-#[derive(Deserialize)]
-struct FrameQuery {
-    #[serde(default)]
-    thumbnail: bool,
-}
-
 // Response structs
 #[derive(Serialize)]
 struct PaginatedResponse<T> {
@@ -231,9 +220,7 @@ impl Server {
     }

     pub async fn start(self) -> Result<(), std::io::Error> {
-        let app_state = Arc::new(AppState {
-            db: self.db,
-        });
+        let app_state = Arc::new(AppState { db: self.db });

         let app = Router::new()
             .route("/search", get(search))
diff --git a/screenpipe-server/src/video.rs b/screenpipe-server/src/video.rs
index ef786121..c261b690 100644
--- a/screenpipe-server/src/video.rs
+++ b/screenpipe-server/src/video.rs
@@ -2,7 +2,7 @@ use chrono::Utc;
 use ffmpeg_next as ffmpeg;
 use ffmpeg_next::sys::AVSEEK_FLAG_FRAME;
 use ffmpeg_next::{format, format::Pixel, media, software::scaling, util::frame::video::Video};
-use image::ImageFormat::{self, Png};
+use image::ImageFormat::{self};
 use image::{DynamicImage, ImageBuffer, Rgb};
 use log::{debug, error, info, warn};
 use screenpipe_vision::{continuous_capture, CaptureResult, ControlMessage};
@@ -56,7 +56,7 @@ pub fn extract_frames_from_video(

     'frames: while let Some(&target_frame_number) = sorted_frame_numbers.iter().next() {
         // Seek to the nearest keyframe
-        seek_to_frame(&mut ictx, target_frame_number as i64)?;
+        seek_to_frame(&mut ictx, target_frame_number)?;

         while frame_index <= target_frame_number {
             for (stream, packet) in ictx.packets() {
@@ -73,7 +73,7 @@ pub fn extract_frames_from_video(
                         decoder.height() as u32,
                         frame_data.to_vec(),
                     )
-                    .ok_or_else(|| ffmpeg::Error::InvalidData)?;
+                    .ok_or(ffmpeg::Error::InvalidData)?;
                     images.push(DynamicImage::ImageRgb8(img));
                     sorted_frame_numbers.remove(&target_frame_number);
                     if sorted_frame_numbers.is_empty() {
@@ -92,58 +92,6 @@ pub fn extract_frames_from_video(
     Ok(images)
 }

-pub fn extract_all_frames_from_video(video_path: &str) -> Result<Vec<DynamicImage>, ffmpeg::Error> {
-    ffmpeg::init()?;
-
-    let mut images = Vec::new();
-    let mut ictx = format::input(&video_path)?;
-    let input_stream = ictx
-        .streams()
-        .best(media::Type::Video)
-        .ok_or(ffmpeg::Error::StreamNotFound)?;
-    let video_stream_index = input_stream.index();
-
-    let context_decoder =
-        ffmpeg::codec::context::Context::from_parameters(input_stream.parameters())?;
-    let mut decoder = context_decoder.decoder().video()?;
-
-    let mut scaler = scaling::Context::get(
-        decoder.format(),
-        decoder.width(),
-        decoder.height(),
-        Pixel::RGB24,
-        decoder.width(),
-        decoder.height(),
-        scaling::Flags::BILINEAR,
-    )?;
-
-    let now = std::time::Instant::now();
-    println!("All frames: Starting at {}ms", now.elapsed().as_millis());
-
-    for (stream, packet) in ictx.packets() {
-        if stream.index() == video_stream_index {
-            decoder.send_packet(&packet)?;
-            let mut decoded = Video::empty();
-            while decoder.receive_frame(&mut decoded).is_ok() {
-                let mut rgb_frame = Video::empty();
-                scaler.run(&decoded, &mut rgb_frame)?;
-                let frame_data = rgb_frame.data(0);
-                let img = ImageBuffer::<Rgb<u8>, Vec<u8>>::from_raw(
-                    decoder.width() as u32,
-                    decoder.height() as u32,
-                    frame_data.to_vec(),
-                )
-                .ok_or_else(|| ffmpeg::Error::InvalidData)?;
-                images.push(DynamicImage::ImageRgb8(img));
-            }
-        }
-    }
-
-    println!("All frames: Done in {}ms", now.elapsed().as_millis());
-
-    Ok(images)
-}
-
 fn seek_to_frame(
     ictx: &mut format::context::Input,
     frame_number: i64,
@@ -171,7 +119,6 @@ pub struct VideoCapture {
     frame_queue: Arc<Mutex<VecDeque<CaptureResult>>>,
     ffmpeg_handle: Arc<Mutex<Option<Child>>>,
     is_running: Arc<Mutex<bool>>,
-    new_chunk_callback: Arc<dyn Fn(String) + Send + Sync>,
 }

 impl VideoCapture {
@@ -191,7 +138,7 @@ impl VideoCapture {
         let capture_frame_queue = frame_queue.clone();
         let capture_thread_is_running = is_running.clone();
         let (result_sender, result_receiver) = channel();
-        let capture_thread = thread::spawn(move || {
+        let _capture_thread = thread::spawn(move || {
             continuous_capture(
                 control_rx,
                 result_sender,
@@ -202,7 +149,7 @@ impl VideoCapture {
         info!("Started capture thread");

         // Spawn another thread to handle receiving and queueing the results
-        let queue_thread = thread::spawn(move || {
+        let _queue_thread = thread::spawn(move || {
             while *capture_thread_is_running.lock().unwrap() {
                 if let Ok(result) = result_receiver.recv() {
                     capture_frame_queue.lock().unwrap().push_back(result);
@@ -212,15 +159,13 @@ impl VideoCapture {
         let video_frame_queue = frame_queue.clone();
         let video_thread_is_running = is_running.clone();
-        let video_thread_ffmpeg_handle = ffmpeg_handle.clone();
         let output_path = output_path.to_string();
-        let video_thread = thread::spawn(move || {
+        let _video_thread = thread::spawn(move || {
             save_frames_as_video(
                 &video_frame_queue,
                 &output_path,
                 fps,
                 video_thread_is_running,
-                video_thread_ffmpeg_handle,
                 new_chunk_callback_clone.as_ref(),
             );
         });

@@ -230,7 +175,6 @@ impl VideoCapture {
             frame_queue,
             ffmpeg_handle,
             is_running,
-            new_chunk_callback,
         }
     }

@@ -259,7 +203,6 @@ fn save_frames_as_video(
     output_path: &str,
     fps: f64,
     is_running: Arc<Mutex<bool>>,
-    ffmpeg_handle: Arc<Mutex<Option<Child>>>,
     new_chunk_callback: &dyn Fn(String),
 ) {
     let frames_per_video = 30; // Adjust this value as needed
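One earlier hunk, in screenpipe-server/src/bin/screenpipe-video.rs, is a behavior fix rather than pure warning cleanup: swapping `.write(true)` for `.append(true)` changes file-open semantics. With `write`, the cursor starts at offset 0 and new bytes overwrite old ones in place; with `append`, every write lands at the end, which is what a growing JSON log needs. A small sketch of the difference, using a hypothetical file name rather than anything from this patch:

```rust
use std::fs::OpenOptions;
use std::io::Write;

fn main() -> std::io::Result<()> {
    let path = "frames.json"; // hypothetical output file, not from the patch

    // Post-patch pattern: create the file if missing and append, so every
    // write is added after the existing contents.
    let mut appending = OpenOptions::new().create(true).append(true).open(path)?;
    writeln!(appending, r#"{{"frame": 1, "text": "hello"}}"#)?;

    // Pre-patch pattern: `.write(true)` without `.truncate(true)` starts at
    // offset 0, so new bytes overwrite old ones in place and can leave a
    // partially mixed, invalid file.
    let mut overwriting = OpenOptions::new().create(true).write(true).open(path)?;
    writeln!(overwriting, r#"{{"frame": 2}}"#)?;

    Ok(())
}
```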