chore: remove tons of warnings
louis030195 committed Jul 3, 2024
1 parent a59c727 commit 8336f44
Showing 7 changed files with 32 additions and 116 deletions.
17 changes: 5 additions & 12 deletions screenpipe-audio/src/bin/screenpipe-audio.rs
@@ -1,7 +1,6 @@
use anyhow::Result;
use env_logger::Env;
use log::info;
use screenpipe_audio::{continuous_audio_capture, ControlMessage};
use screenpipe_audio::continuous_audio_capture;
use std::thread;
use std::time::Duration;

@@ -10,27 +9,21 @@ fn main() -> Result<()> {
use log::LevelFilter;
use std::sync::mpsc;

let _ = Builder::new()
Builder::new()
.filter(None, LevelFilter::Info)
.filter_module("tokenizers", LevelFilter::Error)
.init();


let (control_tx, control_rx) = mpsc::channel();
let (_control_tx, control_rx) = mpsc::channel();
let (result_tx, result_rx) = mpsc::channel();
let chunk_duration = Duration::from_secs(5);

let capture_thread =
let _capture_thread =
thread::spawn(move || continuous_audio_capture(control_rx, result_tx, chunk_duration));

while true {
loop {
if let Ok(result) = result_rx.recv_timeout(Duration::from_secs(5)) {
info!("Transcription: {}", result.text);
}
}

control_tx.send(ControlMessage::Stop)?;
capture_thread.join().unwrap()?;

Ok(())
}
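Two of the fixes above are generic Rust warning-hygiene patterns: `loop` replaces `while true` (which clippy's `while_true` lint flags, and which hid the fact that the shutdown code after the loop could never run — hence the deleted `ControlMessage::Stop` / `join` lines), and never-used bindings get an underscore prefix. A minimal sketch of the resulting shape, with illustrative names rather than the crate's real API:

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel::<String>();

    // The handle is never joined; the underscore prefix keeps the binding
    // (documenting that this is deliberate) without an unused-variable warning.
    let _capture_thread = thread::spawn(move || loop {
        let _ = tx.send("transcription chunk".to_string());
        thread::sleep(Duration::from_secs(1));
    });

    // `loop` instead of `while true`: identical behaviour, but the compiler
    // knows it never exits, so no dead shutdown code can accumulate after it.
    loop {
        if let Ok(text) = rx.recv_timeout(Duration::from_secs(5)) {
            println!("Transcription: {text}");
        }
    }
}
```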
6 changes: 3 additions & 3 deletions screenpipe-audio/src/stt.rs
@@ -11,7 +11,7 @@ use rubato::{
Resampler, SincFixedIn, SincInterpolationParameters, SincInterpolationType, WindowFunction,
};

use crate::{multilingual, pcm_decode::pcm_decode};
use crate::{pcm_decode::pcm_decode};

// TODO: improve model loading strategy

@@ -458,10 +458,10 @@ pub fn stt(input: &str) -> Result<String> {
// info!("loaded mel: {:?}", mel.dims());

let vb = candle_transformers::quantized_var_builder::VarBuilder::from_gguf(
&weights_filename,
weights_filename,
&device,
)?;
let mut model = Model::Quantized(m::quantized_model::Whisper::load(&vb, config)?);
let model = Model::Quantized(m::quantized_model::Whisper::load(&vb, config)?);
// info!("detecting language");
// TODO: disabled seems slow as fuck
// let language_token = Some(multilingual::detect_language(&mut model, &tokenizer, &mel)?);
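The stt.rs hunks are pure lint cleanup: the unused `multilingual` import goes away, `mut` is dropped from a binding that is never mutated (rustc's `unused_mut`), and an `&` is removed from an argument where the callee only needs something path-like (clippy's needless-borrow family of lints). A self-contained illustration — `load_weights` below is a stand-in, not the candle API:

```rust
use std::path::{Path, PathBuf};

// Stand-in for an API such as `VarBuilder::from_gguf` that accepts anything
// path-like; the real candle signature is not reproduced here.
fn load_weights(path: impl AsRef<Path>) -> usize {
    path.as_ref().as_os_str().len()
}

fn main() {
    let weights_filename = PathBuf::from("whisper-q4.gguf");

    // `load_weights(&weights_filename)` also compiles, but the borrow adds
    // nothing when the value is not needed afterwards, and clippy will
    // usually suggest passing it directly.
    let size = load_weights(weights_filename);

    // Writing `let mut model = size;` without ever mutating `model` would
    // trigger rustc's `unused_mut` warning, so the binding stays plain.
    let model = size;
    println!("pretend model size: {model}");
}
```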
16 changes: 5 additions & 11 deletions screenpipe-server/src/bin/screenpipe-server.rs
@@ -1,24 +1,20 @@
use std::{
fs,
net::SocketAddr,
sync::{
mpsc::{channel, Sender},
Arc, Mutex,
},
sync::{mpsc::channel, Arc},
};

use tokio::sync::oneshot;
use tokio::time::Duration;

use screenpipe_server::{start_continuous_recording, DatabaseManager, RecorderControl, Server};
use screenpipe_server::{start_continuous_recording, DatabaseManager, Server};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Initialize logging
use env_logger::Builder;
use log::LevelFilter;

let _ = Builder::new()
Builder::new()
.filter(None, LevelFilter::Info)
.filter_module("tokenizers", LevelFilter::Error)
.filter_module("rusty_tesseract", LevelFilter::Error)
@@ -32,11 +28,11 @@ async fn main() -> anyhow::Result<()> {
);
let db_server = db.clone();
// Channel for controlling the recorder
let (control_tx, control_rx) = channel();
let (_control_tx, control_rx) = channel();

// Start continuous recording in a separate task
let local_data_dir_clone = local_data_dir.clone();
let recording_task = tokio::spawn(async move {
let _recording_task = tokio::spawn(async move {
let fps = 10.0;
let audio_chunk_duration = Duration::from_secs(5);

@@ -71,8 +67,6 @@ async fn main() -> anyhow::Result<()> {
// This part will never be reached in the current implementation
// control_tx.send(RecorderControl::Stop).await?;
// recording_task.await??;

Ok(())
}

fn ensure_local_data_dir() -> anyhow::Result<String> {
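A detail worth noting about the `(control_tx, control_rx)` → `(_control_tx, control_rx)` renames here and in the audio binary: an underscore-prefixed name is not the same as `_`. The named binding silences the unused-variable warning but keeps the sender alive for the rest of `main`, so the receiving side never sees a disconnect; destructuring into a bare `_` would drop the sender immediately. A quick demonstration:

```rust
use std::sync::mpsc;
use std::time::Duration;

fn main() {
    // Underscore-prefixed: no warning, but the sender stays alive, so the
    // empty channel times out rather than reporting a disconnect.
    let (_tx, rx) = mpsc::channel::<u32>();
    assert_eq!(
        rx.recv_timeout(Duration::from_millis(10)),
        Err(mpsc::RecvTimeoutError::Timeout)
    );

    // Bare `_`: the sender is dropped on the spot, and the receiver sees a
    // disconnected channel instead.
    let (_, rx) = mpsc::channel::<u32>();
    assert_eq!(
        rx.recv_timeout(Duration::from_millis(10)),
        Err(mpsc::RecvTimeoutError::Disconnected)
    );

    println!("both assertions passed");
}
```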
6 changes: 3 additions & 3 deletions screenpipe-server/src/bin/screenpipe-video.rs
@@ -35,8 +35,8 @@ fn main() {
}
};

let video_capture = VideoCapture::new(&output_path, fps, new_chunk_callback);
let (tx, rx): (Sender<()>, Receiver<()>) = channel();
let video_capture = VideoCapture::new(output_path, fps, new_chunk_callback);
let (_tx, rx): (Sender<()>, Receiver<()>) = channel();
let rx = Arc::new(Mutex::new(rx));
let rx_thread = rx.clone();

@@ -51,7 +51,7 @@

let json_file = OpenOptions::new()
.create(true)
.write(true)
.append(true)
.open(&json_output_path)
.expect("Failed to create JSON file");
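The `OpenOptions` change in screenpipe-video.rs drops `.write(true)` but keeps `.append(true)`. Per the standard library documentation, `append(true)` already implies write access, so the shorter chain opens the file exactly the same way. Roughly (the file name here is illustrative):

```rust
use std::fs::OpenOptions;
use std::io::Write;

fn main() -> std::io::Result<()> {
    // `.append(true)` implies write access, so an explicit `.write(true)`
    // is redundant: create the file if missing, then always write at the end.
    let mut json_file = OpenOptions::new()
        .create(true)
        .append(true)
        .open("frames.json")?;

    writeln!(json_file, "{{\"frame\": 1}}")?;
    Ok(())
}
```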
9 changes: 4 additions & 5 deletions screenpipe-server/src/core.rs
@@ -3,7 +3,6 @@ use anyhow::Result;
use chrono::Utc;
use log::{debug, error, info};
use screenpipe_audio::{continuous_audio_capture, save_audio_to_file, ControlMessage};
use std::future::IntoFuture;
use std::sync::mpsc::{self, Receiver};
use std::sync::{Arc, Mutex};
use std::thread;
@@ -37,7 +36,7 @@ pub async fn start_continuous_recording(
error!("Failed to insert new video chunk: {}", e);
}
};
let video_capture = VideoCapture::new(&output_path, fps, new_chunk_callback);
let video_capture = VideoCapture::new(output_path, fps, new_chunk_callback);
let control_rx = Arc::new(Mutex::new(control_rx));
let control_rx_video = Arc::clone(&control_rx);
let control_rx_audio = Arc::clone(&control_rx);
@@ -52,7 +51,7 @@
// TODO: too muhc nesting 🤦‍♂️
let video_thread = thread::spawn(move || {
let runtime = tokio::runtime::Runtime::new().unwrap();
runtime.block_on(async {
let _ = runtime.block_on(async {
info!("Starting video capture thread");
let mut is_paused = false;
loop {
@@ -102,7 +101,7 @@

let audio_processing_thread = thread::spawn(move || {
let runtime = tokio::runtime::Runtime::new().unwrap();
runtime.block_on(async {
let _ = runtime.block_on(async {
info!("Starting audio processing thread");
let mut is_paused = false;
loop {
@@ -170,7 +169,7 @@ pub async fn start_continuous_recording(
video_thread.join().unwrap();
audio_processing_thread.join().unwrap();
audio_control_tx.send(ControlMessage::Stop).unwrap();
audio_thread.join().unwrap();
let _ = audio_thread.join().unwrap();

info!("Continuous recording stopped");
Ok(())
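The core.rs hunks wrap `runtime.block_on(async { ... })` and `audio_thread.join().unwrap()` in `let _ = ...`. When those expressions evaluate to a `Result`, which is `#[must_use]`, ignoring them as bare statements raises the `unused_must_use` warning; `let _ =` discards the value explicitly. A small sketch of the same pattern using plain std types instead of tokio:

```rust
use std::thread;

// Stand-in for the body that `runtime.block_on(async { ... })` executes.
fn run_capture_loop() -> Result<(), String> {
    Ok(())
}

fn main() {
    // A bare `run_capture_loop();` statement would trip `unused_must_use`
    // because `Result` must be used; `let _ =` drops the value on purpose.
    let _ = run_capture_loop();

    // Same idea for a thread whose closure returns a `Result`: `join()`
    // yields that value, and `let _ =` acknowledges we are ignoring it.
    let handle = thread::spawn(|| -> Result<(), String> { Ok(()) });
    let _ = handle.join().unwrap();
}
```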
25 changes: 6 additions & 19 deletions screenpipe-server/src/server.rs
@@ -1,21 +1,16 @@
use axum::{
extract::{Path, Query, State},
extract::{Query, State},
http::StatusCode,
response::Json as JsonResponse,
routing::{get, post},
serve, Json, Router,
routing::get,
serve, Router,
};

use chrono::{DateTime, NaiveDateTime, Utc};
use image::{ImageBuffer, Rgb};
use chrono::{DateTime, Utc};
use log::info;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::{
io::Cursor,
net::SocketAddr,
sync::{Arc, Mutex},
};
use std::{net::SocketAddr, sync::Arc};
use tokio::net::TcpListener;
use tower_http::cors::CorsLayer;

@@ -61,12 +56,6 @@ struct DateRangeQuery {
pagination: PaginationQuery,
}

#[derive(Deserialize)]
struct FrameQuery {
#[serde(default)]
thumbnail: bool,
}

// Response structs
#[derive(Serialize)]
struct PaginatedResponse<T> {
@@ -231,9 +220,7 @@ impl Server {
}

pub async fn start(self) -> Result<(), std::io::Error> {
let app_state = Arc::new(AppState {
db: self.db,
});
let app_state = Arc::new(AppState { db: self.db });

let app = Router::new()
.route("/search", get(search))
69 changes: 6 additions & 63 deletions screenpipe-server/src/video.rs
@@ -2,7 +2,7 @@ use chrono::Utc;
use ffmpeg_next as ffmpeg;
use ffmpeg_next::sys::AVSEEK_FLAG_FRAME;
use ffmpeg_next::{format, format::Pixel, media, software::scaling, util::frame::video::Video};
use image::ImageFormat::{self, Png};
use image::ImageFormat::{self};
use image::{DynamicImage, ImageBuffer, Rgb};
use log::{debug, error, info, warn};
use screenpipe_vision::{continuous_capture, CaptureResult, ControlMessage};
@@ -56,7 +56,7 @@ pub fn extract_frames_from_video(

'frames: while let Some(&target_frame_number) = sorted_frame_numbers.iter().next() {
// Seek to the nearest keyframe
seek_to_frame(&mut ictx, target_frame_number as i64)?;
seek_to_frame(&mut ictx, target_frame_number)?;

while frame_index <= target_frame_number {
for (stream, packet) in ictx.packets() {
@@ -73,7 +73,7 @@
decoder.height() as u32,
frame_data.to_vec(),
)
.ok_or_else(|| ffmpeg::Error::InvalidData)?;
.ok_or(ffmpeg::Error::InvalidData)?;
images.push(DynamicImage::ImageRgb8(img));
sorted_frame_numbers.remove(&target_frame_number);
if sorted_frame_numbers.is_empty() {
@@ -92,58 +92,6 @@
Ok(images)
}

pub fn extract_all_frames_from_video(video_path: &str) -> Result<Vec<DynamicImage>, ffmpeg::Error> {
ffmpeg::init()?;

let mut images = Vec::new();
let mut ictx = format::input(&video_path)?;
let input_stream = ictx
.streams()
.best(media::Type::Video)
.ok_or(ffmpeg::Error::StreamNotFound)?;
let video_stream_index = input_stream.index();

let context_decoder =
ffmpeg::codec::context::Context::from_parameters(input_stream.parameters())?;
let mut decoder = context_decoder.decoder().video()?;

let mut scaler = scaling::Context::get(
decoder.format(),
decoder.width(),
decoder.height(),
Pixel::RGB24,
decoder.width(),
decoder.height(),
scaling::Flags::BILINEAR,
)?;

let now = std::time::Instant::now();
println!("All frames: Starting at {}ms", now.elapsed().as_millis());

for (stream, packet) in ictx.packets() {
if stream.index() == video_stream_index {
decoder.send_packet(&packet)?;
let mut decoded = Video::empty();
while decoder.receive_frame(&mut decoded).is_ok() {
let mut rgb_frame = Video::empty();
scaler.run(&decoded, &mut rgb_frame)?;
let frame_data = rgb_frame.data(0);
let img = ImageBuffer::<Rgb<u8>, Vec<u8>>::from_raw(
decoder.width() as u32,
decoder.height() as u32,
frame_data.to_vec(),
)
.ok_or_else(|| ffmpeg::Error::InvalidData)?;
images.push(DynamicImage::ImageRgb8(img));
}
}
}

println!("All frames: Done in {}ms", now.elapsed().as_millis());

Ok(images)
}

fn seek_to_frame(
ictx: &mut format::context::Input,
frame_number: i64,
@@ -171,7 +119,6 @@ pub struct VideoCapture {
frame_queue: Arc<Mutex<VecDeque<CaptureResult>>>,
ffmpeg_handle: Arc<Mutex<Option<std::process::Child>>>,
is_running: Arc<Mutex<bool>>,
new_chunk_callback: Arc<dyn Fn(String) + Send + Sync>,
}

impl VideoCapture {
@@ -191,7 +138,7 @@
let capture_frame_queue = frame_queue.clone();
let capture_thread_is_running = is_running.clone();
let (result_sender, result_receiver) = channel();
let capture_thread = thread::spawn(move || {
let _capture_thread = thread::spawn(move || {
continuous_capture(
control_rx,
result_sender,
@@ -202,7 +149,7 @@
info!("Started capture thread");

// Spawn another thread to handle receiving and queueing the results
let queue_thread = thread::spawn(move || {
let _queue_thread = thread::spawn(move || {
while *capture_thread_is_running.lock().unwrap() {
if let Ok(result) = result_receiver.recv() {
capture_frame_queue.lock().unwrap().push_back(result);
@@ -212,15 +159,13 @@

let video_frame_queue = frame_queue.clone();
let video_thread_is_running = is_running.clone();
let video_thread_ffmpeg_handle = ffmpeg_handle.clone();
let output_path = output_path.to_string();
let video_thread = thread::spawn(move || {
let _video_thread = thread::spawn(move || {
save_frames_as_video(
&video_frame_queue,
&output_path,
fps,
video_thread_is_running,
video_thread_ffmpeg_handle,
new_chunk_callback_clone.as_ref(),
);
});
@@ -230,7 +175,6 @@
frame_queue,
ffmpeg_handle,
is_running,
new_chunk_callback,
}
}

@@ -259,7 +203,6 @@
output_path: &str,
fps: f64,
is_running: Arc<Mutex<bool>>,
ffmpeg_handle: Arc<Mutex<Option<std::process::Child>>>,
new_chunk_callback: &dyn Fn(String),
) {
let frames_per_video = 30; // Adjust this value as needed
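video.rs takes the other route to a quiet build: instead of underscore-prefixing, it deletes dead code outright — the unused `extract_all_frames_from_video` function, the `new_chunk_callback` field that was stored but never read, and the `ffmpeg_handle` parameter that `save_frames_as_video` no longer needs. A struct field that is only ever written still trips rustc's `dead_code` lint, so removal (rather than `#[allow(dead_code)]`) is the cleaner fix. For instance:

```rust
// A pared-down stand-in for the `VideoCapture` struct, not the real type.
struct CaptureSketch {
    fps: f64,
    // A field that is assigned in the constructor but never read afterwards,
    // e.g. `new_chunk_callback: Box<dyn Fn(String) + Send + Sync>`, would be
    // reported by rustc as "field is never read" (the `dead_code` lint).
    // Deleting it removes both the warning and the wasted storage;
    // `#[allow(dead_code)]` would only hide the warning.
}

fn main() {
    let capture = CaptureSketch { fps: 10.0 };
    println!("recording at {} fps", capture.fps);
}
```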
