From 4f6baba6981f910a622bd63bea168975d676df8a Mon Sep 17 00:00:00 2001 From: Louis Beaumont Date: Thu, 26 Sep 2024 14:14:31 -0700 Subject: [PATCH] fix pipes issues --- Cargo.toml | 2 +- .../typescript/pipe-email-daily-log/pipe.ts | 68 ++++++++++++------- .../pipe-email-daily-log/screenpipe.d.ts | 8 +-- .../screenpipe.d.ts | 6 +- .../pipe-screen-to-crm/screenpipe.d.ts | 7 +- .../typescript/pipe-stream-ocr-text/pipe.js | 64 ----------------- .../typescript/pipe-stream-ocr-text/pipe.ts | 2 +- .../pipe-stream-ocr-text/screenpipe.d.ts | 6 +- .../components/pipe-config-form.tsx | 6 ++ screenpipe-app-tauri/src-tauri/Cargo.toml | 2 +- screenpipe-core/src/deno/runtime.js | 52 ++++---------- screenpipe-core/src/pipes.rs | 9 ++- screenpipe-core/tests/pipes_test.rs | 60 ++++++++++++++++ 13 files changed, 143 insertions(+), 149 deletions(-) delete mode 100644 examples/typescript/pipe-stream-ocr-text/pipe.js diff --git a/Cargo.toml b/Cargo.toml index 64ba7cd0..8e2bbfea 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ resolver = "2" [workspace.package] -version = "0.1.92" +version = "0.1.93" authors = ["louis030195 "] description = "" repository = "https://github.com/mediar-ai/screenpipe" diff --git a/examples/typescript/pipe-email-daily-log/pipe.ts b/examples/typescript/pipe-email-daily-log/pipe.ts index b7fa5a11..cafa8170 100644 --- a/examples/typescript/pipe-email-daily-log/pipe.ts +++ b/examples/typescript/pipe-email-daily-log/pipe.ts @@ -45,6 +45,7 @@ async function queryScreenpipe( const response = await fetch(url); if (!response.ok) { + console.log("screenpipe response:", await response.text()); throw new Error(`http error! status: ${response.status}`); } return await response.json(); @@ -119,8 +120,9 @@ async function generateDailyLog( } async function saveDailyLog(logEntry: DailyLog): Promise { + console.log("creating logs dir"); const logsDir = `${process.env.PIPE_DIR}/logs`; - fs.mkdirSync(logsDir); + console.log("logs dir:", logsDir); console.log("saving log entry:", logEntry); console.log("logs dir:", logsDir); const timestamp = new Date() @@ -129,7 +131,10 @@ async function saveDailyLog(logEntry: DailyLog): Promise { .replace(/\..+/, ""); const filename = `${timestamp}-${logEntry.category.replace("/", "-")}.json`; console.log("filename:", filename); - fs.writeFileSync(`${logsDir}/${filename}`, JSON.stringify(logEntry, null, 2)); + await fs.writeFile( + `${logsDir}/${filename}`, + JSON.stringify(logEntry, null, 2) + ); } async function generateDailySummary( @@ -220,14 +225,17 @@ async function sendEmail( async function getTodayLogs(): Promise { try { const logsDir = `${process.env.PIPE_DIR}/logs`; - const today = new Date().toISOString().split("T")[0]; // Get today's date in YYYY-MM-DD format + const today = new Date().toISOString().replace(/:/g, "-").split("T")[0]; // Get today's date in YYYY-MM-DD format - const files = fs.readdirSync(logsDir); + console.log("reading logs dir:", logsDir); + const files = await fs.readdir(logsDir); + console.log("files:", files); const todayFiles = files.filter((file) => file.startsWith(today)); + console.log("today's files:", todayFiles); const logs: DailyLog[] = []; for (const file of todayFiles) { - const content = fs.readFileSync(`${logsDir}/${file}`); + const content = await fs.readFile(`${logsDir}/${file}`); logs.push(JSON.parse(content)); } @@ -256,6 +264,13 @@ async function dailyLogPipeline(): Promise { const windowName = config.windowName || ""; const pageSize = config.pageSize; + console.log("creating logs dir"); + const logsDir = `${process.env.PIPE_DIR}/logs`; + console.log("logs dir:", logsDir); + await fs.mkdir(logsDir).catch((error) => { + console.warn("error creating logs dir:", error); + }); + let lastEmailSent = new Date(0); // Initialize to a past date // send a welcome email to announce what will happen, when it will happen, and what it will do @@ -321,31 +336,32 @@ async function dailyLogPipeline(): Promise { } if (shouldSendSummary) { - await retry(async () => { - const todayLogs = await getTodayLogs(); - console.log("today's logs:", todayLogs); - - if (todayLogs.length > 0) { - const summary = await generateDailySummary( - todayLogs, - summaryPrompt, - ollamaModel, - ollamaApiUrl - ); - console.log("summary:", summary); - await sendEmail( - emailAddress, - emailPassword, - "activity summary", - summary - ); - lastEmailSent = now; - } - }); + // await retry(async () => { + const todayLogs = await getTodayLogs(); + console.log("today's logs:", todayLogs); + + if (todayLogs.length > 0) { + const summary = await generateDailySummary( + todayLogs, + summaryPrompt, + ollamaModel, + ollamaApiUrl + ); + console.log("summary:", summary); + await sendEmail( + emailAddress, + emailPassword, + "activity summary", + summary + ); + lastEmailSent = now; + } + // }); } } catch (error) { console.warn("error in daily log pipeline:", error); } + console.log("sleeping for", interval, "ms"); await new Promise((resolve) => setTimeout(resolve, interval)); } } diff --git a/examples/typescript/pipe-email-daily-log/screenpipe.d.ts b/examples/typescript/pipe-email-daily-log/screenpipe.d.ts index 02482681..a52563c4 100644 --- a/examples/typescript/pipe-email-daily-log/screenpipe.d.ts +++ b/examples/typescript/pipe-email-daily-log/screenpipe.d.ts @@ -25,10 +25,10 @@ declare global { }; const fs: { - readFileSync: (path: string) => string; - writeFileSync: (path: string, contents: string) => void; - readdirSync: (path: string) => string[]; - mkdirSync: (path: string) => void; + readFile: (path: string) => Promise; + writeFile: (path: string, contents: string) => Promise; + readdir: (path: string) => Promise; + mkdir: (path: string) => Promise; }; const path: { diff --git a/examples/typescript/pipe-phi3.5-engineering-team-logs/screenpipe.d.ts b/examples/typescript/pipe-phi3.5-engineering-team-logs/screenpipe.d.ts index ed848b81..05f902db 100644 --- a/examples/typescript/pipe-phi3.5-engineering-team-logs/screenpipe.d.ts +++ b/examples/typescript/pipe-phi3.5-engineering-team-logs/screenpipe.d.ts @@ -18,8 +18,10 @@ declare global { }; const fs: { - readFileSync: (path: string) => string; - writeFileSync: (path: string, contents: string) => void; + readFile: (path: string) => Promise; + writeFile: (path: string, contents: string) => Promise; + readdir: (path: string) => Promise; + mkdir: (path: string) => Promise; }; const path: { diff --git a/examples/typescript/pipe-screen-to-crm/screenpipe.d.ts b/examples/typescript/pipe-screen-to-crm/screenpipe.d.ts index ed848b81..480eb00b 100644 --- a/examples/typescript/pipe-screen-to-crm/screenpipe.d.ts +++ b/examples/typescript/pipe-screen-to-crm/screenpipe.d.ts @@ -18,10 +18,11 @@ declare global { }; const fs: { - readFileSync: (path: string) => string; - writeFileSync: (path: string, contents: string) => void; + readFile: (path: string) => Promise; + writeFile: (path: string, contents: string) => Promise; + readdir: (path: string) => Promise; + mkdir: (path: string) => Promise; }; - const path: { join: (...paths: string[]) => string; }; diff --git a/examples/typescript/pipe-stream-ocr-text/pipe.js b/examples/typescript/pipe-stream-ocr-text/pipe.js deleted file mode 100644 index 109cd348..00000000 --- a/examples/typescript/pipe-stream-ocr-text/pipe.js +++ /dev/null @@ -1,64 +0,0 @@ -"use strict"; -let INTERVAL = 10 * 1000; // 10 seconds in milliseconds -async function queryScreenpipe() { - try { - const now = new Date(); - const oneMinuteAgo = new Date(now.getTime() - INTERVAL); - const queryParams = `start_time=${oneMinuteAgo.toISOString()}&end_time=${now.toISOString()}&limit=50&content_type=ocr`; - const result = await fetch(`http://localhost:3030/search?${queryParams}`).then((r) => r.json()); - console.log("Retrieved", result.data.length, "items from screenpipe"); - return result.data; - } - catch (error) { - console.error("Error querying screenpipe:", error); - return []; - } -} -async function writeToMarkdown(data) { - console.log("Writing to markdown", JSON.stringify(data)); - const fileName = `screen-ocr-${new Date() - .toISOString() - .replace(/[:.]/g, "-")}.md`; - const content = data - .map((item) => `## ${item.content.timestamp}\n\n${item.content.text}\n\n---\n`) - .join("\n"); - const dir = path.join(process.env.PIPE_DIR, fileName); - console.log("Writing to", dir); - fs.writeFileSync(dir, content); - console.log(`Written OCR data to ${fileName}`); -} -async function runOCRTracker() { - console.log("Starting OCR Tracker"); - // wait 2 seocnds - await new Promise((resolve) => setTimeout(resolve, 1000)); - console.log(pipe); - await pipe.loadConfig(); - INTERVAL = pipe.config.interval * 1000; - console.log("INTERVAL", INTERVAL); - while (true) { - try { - const screenData = await queryScreenpipe(); - await writeToMarkdown(screenData); - } - catch (error) { - console.error("Error in OCR tracking:", error); - } - finally { - await new Promise((resolve) => setTimeout(resolve, INTERVAL)); - } - // const isEnabled = await pipe.isEnabled(); - // if (!isEnabled) { - // console.log("pipe is disabled"); - // break; - // } - } -} -// Self-invoking async function to run the OCR tracker -(async () => { - try { - await runOCRTracker(); - } - catch (error) { - console.error("Fatal error in OCR Tracker:", error); - } -})(); diff --git a/examples/typescript/pipe-stream-ocr-text/pipe.ts b/examples/typescript/pipe-stream-ocr-text/pipe.ts index fdc9fbcc..379a0bff 100644 --- a/examples/typescript/pipe-stream-ocr-text/pipe.ts +++ b/examples/typescript/pipe-stream-ocr-text/pipe.ts @@ -32,7 +32,7 @@ async function writeToMarkdown(data: any) { const dir = path.join(process.env.PIPE_DIR, fileName); console.log("Writing to", dir); - fs.writeFileSync(dir, content); + await fs.writeFile(dir, content); console.log(`Written OCR data to ${fileName}`); } diff --git a/examples/typescript/pipe-stream-ocr-text/screenpipe.d.ts b/examples/typescript/pipe-stream-ocr-text/screenpipe.d.ts index 836e6881..e98bc8eb 100644 --- a/examples/typescript/pipe-stream-ocr-text/screenpipe.d.ts +++ b/examples/typescript/pipe-stream-ocr-text/screenpipe.d.ts @@ -18,8 +18,10 @@ declare global { }; const fs: { - readFileSync: (path: string) => string; - writeFileSync: (path: string, contents: string) => void; + readFile: (path: string) => Promise; + writeFile: (path: string, contents: string) => Promise; + readdir: (path: string) => Promise; + mkdir: (path: string) => Promise; }; const path: { diff --git a/screenpipe-app-tauri/components/pipe-config-form.tsx b/screenpipe-app-tauri/components/pipe-config-form.tsx index 24e6bf19..24331908 100644 --- a/screenpipe-app-tauri/components/pipe-config-form.tsx +++ b/screenpipe-app-tauri/components/pipe-config-form.tsx @@ -119,6 +119,8 @@ export const PipeConfigForm: React.FC = ({ onChange={(e) => handleInputChange(field.name, parseFloat(e.target.value) || 0) } + autoCorrect="off" + spellCheck="false" /> @@ -147,6 +149,8 @@ export const PipeConfigForm: React.FC = ({ type="time" value={value} onChange={(e) => handleInputChange(field.name, e.target.value)} + autoCorrect="off" + spellCheck="false" /> @@ -206,6 +210,8 @@ export const PipeConfigForm: React.FC = ({ type="text" value={value} onChange={(e) => handleInputChange(field.name, e.target.value)} + autoCorrect="off" + spellCheck="false" /> diff --git a/screenpipe-app-tauri/src-tauri/Cargo.toml b/screenpipe-app-tauri/src-tauri/Cargo.toml index fbc0c230..6759acf5 100644 --- a/screenpipe-app-tauri/src-tauri/Cargo.toml +++ b/screenpipe-app-tauri/src-tauri/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "screenpipe-app" -version = "0.2.67" +version = "0.2.68" description = "" authors = ["you"] license = "" diff --git a/screenpipe-core/src/deno/runtime.js b/screenpipe-core/src/deno/runtime.js index 98623ed8..8389c642 100644 --- a/screenpipe-core/src/deno/runtime.js +++ b/screenpipe-core/src/deno/runtime.js @@ -34,15 +34,15 @@ const sendLog = async (level, ...args) => { const console = { log: (...args) => { - core.print(`[js][info]: ${argsToMessage(...args)}\n`, false); + core.print(`[pipe][${globalThis.metadata.id}][info]: ${argsToMessage(...args)}\n`, false); // sendLog("info", ...args); }, error: (...args) => { - core.print(`[js][error]: ${argsToMessage(...args)}\n`, true); + core.print(`[pipe][${globalThis.metadata.id}][error]: ${argsToMessage(...args)}\n`, true); // sendLog("error", ...args); }, warn: (...args) => { - core.print(`[js][warn]: ${argsToMessage(...args)}\n`, true); + core.print(`[pipe][${globalThis.metadata.id}][warn]: ${argsToMessage(...args)}\n`, true); // sendLog("warn", ...args); } }; @@ -160,46 +160,18 @@ const pipe = { // } }; -const fs = { // TODO does not work? - readFileSync: (path) => { - // This is a synchronous wrapper around the async operation - // Note: This will block the event loop and should be used carefully - return new Promise((resolve, reject) => { - ops.op_read_file(path) - .then(resolve) - .catch(reject); - }); - }, - writeFileSync: (path, contents) => { - // Similarly, this is a synchronous wrapper - return new Promise((resolve, reject) => { - ops.op_write_file(path, contents) - .then(resolve) - .catch(reject); - }); +const fs = { + readFile: async (path) => { + return await ops.op_read_file(path); }, - readdirSync: (path) => { - return new Promise((resolve, reject) => { - ops.op_readdir(path) - .then(resolve) - .catch(reject); - }); + writeFile: async (path, contents) => { + return await ops.op_write_file(path, contents); }, - mkdirSync: (path) => { - // This is a synchronous wrapper around the async operation - return new Promise((resolve, reject) => { - ops.op_create_dir(path) - .then(resolve) - .catch(reject); - }); + readdir: async (path) => { + return await ops.op_readdir(path); }, - statSync: (path) => { - // This is a synchronous wrapper around the async operation - return new Promise((resolve, reject) => { - ops.op_stat(path) - .then(resolve) - .catch(reject); - }); + mkdir: async (path) => { + return await ops.op_create_dir(path); } }; diff --git a/screenpipe-core/src/pipes.rs b/screenpipe-core/src/pipes.rs index 95a7da3d..e03545e0 100644 --- a/screenpipe-core/src/pipes.rs +++ b/screenpipe-core/src/pipes.rs @@ -135,8 +135,7 @@ mod pipes { let mut entries = tokio::fs::read_dir(&path).await?; let mut file_names = Vec::new(); while let Some(entry) = entries.next_entry().await? { - let path = entry.path(); - if let Some(file_name) = path.file_name().and_then(|os_str| os_str.to_str()) { + if let Some(file_name) = entry.file_name().to_str() { file_names.push(file_name.to_string()); } } @@ -228,7 +227,6 @@ mod pipes { } } - // Add this new operation #[op2(async)] async fn op_create_dir(#[string] path: String) -> anyhow::Result<(), AnyError> { tokio::fs::create_dir_all(&path).await.map_err(|e| { @@ -317,14 +315,15 @@ mod pipes { op_read_file, op_write_file, op_remove_file, + op_readdir, + op_create_dir, + op_fetch_get, op_fetch_post, op_set_timeout, op_fetch, op_get_env, - op_readdir, op_send_email, - op_create_dir, ] } diff --git a/screenpipe-core/tests/pipes_test.rs b/screenpipe-core/tests/pipes_test.rs index c940799e..0a876601 100644 --- a/screenpipe-core/tests/pipes_test.rs +++ b/screenpipe-core/tests/pipes_test.rs @@ -289,4 +289,64 @@ generated by your screenpipe ai assistant (who's definitely not planning to take let result = run_pipe(pipe_dir.to_string_lossy().to_string(), screenpipe_dir).await; assert!(result.is_ok(), "Pipe execution failed: {:?}", result); } + + #[tokio::test] + async fn test_directory_functions() { + let temp_dir = TempDir::new().unwrap(); + let screenpipe_dir = temp_dir.path().to_path_buf(); + + let code = r#" + (async () => { + // Test mkdir + await fs.mkdir('test_dir'); + console.log('Directory created'); + + // Test writeFile + await fs.writeFile('test_dir/test_file.txt', 'Hello, World!'); + console.log('File written'); + + // Test readFile + const content = await fs.readFile('test_dir/test_file.txt'); + console.log('File content:', content); + if (content !== 'Hello, World!') { + throw new Error('File content mismatch'); + } + + // Test readdir + const files = await fs.readdir('test_dir'); + console.log('Directory contents:', files); + if (!files.includes('test_file.txt')) { + throw new Error('File not found in directory'); + } + + // Test path.join + const joinedPath = path.join('test_dir', 'nested', 'file.txt'); + console.log('Joined path:', joinedPath); + const expectedPath = process.env.OS === 'windows' ? 'test_dir\\nested\\file.txt' : 'test_dir/nested/file.txt'; + if (joinedPath !== expectedPath) { + throw new Error('Path join mismatch'); + } + + console.log('All directory function tests passed'); + })(); + "#; + + let pipe_dir = setup_test_pipe(&temp_dir, "directory_functions_test", code).await; + + // Change the working directory to the pipe directory + std::env::set_current_dir(&pipe_dir).unwrap(); + + let result = run_pipe(pipe_dir.to_string_lossy().to_string(), screenpipe_dir).await; + assert!(result.is_ok(), "Pipe execution failed: {:?}", result); + + // Additional checks + let test_dir = pipe_dir.join("test_dir"); + assert!(test_dir.exists(), "Test directory was not created"); + + let test_file = test_dir.join("test_file.txt"); + assert!(test_file.exists(), "Test file was not created"); + + let file_content = std::fs::read_to_string(test_file).unwrap(); + assert_eq!(file_content, "Hello, World!", "File content mismatch"); + } }