Skip to content

Commit

Permalink
fix pipes issues
Browse files Browse the repository at this point in the history
  • Loading branch information
louis030195 committed Sep 26, 2024
1 parent 0aab75b commit 4f6baba
Show file tree
Hide file tree
Showing 13 changed files with 143 additions and 149 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ resolver = "2"


[workspace.package]
version = "0.1.92"
version = "0.1.93"
authors = ["louis030195 <hi@louis030195.com>"]
description = ""
repository = "https://github.com/mediar-ai/screenpipe"
Expand Down
68 changes: 42 additions & 26 deletions examples/typescript/pipe-email-daily-log/pipe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ async function queryScreenpipe(

const response = await fetch(url);
if (!response.ok) {
console.log("screenpipe response:", await response.text());
throw new Error(`http error! status: ${response.status}`);
}
return await response.json();
Expand Down Expand Up @@ -119,8 +120,9 @@ async function generateDailyLog(
}

async function saveDailyLog(logEntry: DailyLog): Promise<void> {
console.log("creating logs dir");
const logsDir = `${process.env.PIPE_DIR}/logs`;
fs.mkdirSync(logsDir);
console.log("logs dir:", logsDir);
console.log("saving log entry:", logEntry);
console.log("logs dir:", logsDir);
const timestamp = new Date()
Expand All @@ -129,7 +131,10 @@ async function saveDailyLog(logEntry: DailyLog): Promise<void> {
.replace(/\..+/, "");
const filename = `${timestamp}-${logEntry.category.replace("/", "-")}.json`;
console.log("filename:", filename);
fs.writeFileSync(`${logsDir}/${filename}`, JSON.stringify(logEntry, null, 2));
await fs.writeFile(
`${logsDir}/${filename}`,
JSON.stringify(logEntry, null, 2)
);
}

async function generateDailySummary(
Expand Down Expand Up @@ -220,14 +225,17 @@ async function sendEmail(
async function getTodayLogs(): Promise<DailyLog[]> {
try {
const logsDir = `${process.env.PIPE_DIR}/logs`;
const today = new Date().toISOString().split("T")[0]; // Get today's date in YYYY-MM-DD format
const today = new Date().toISOString().replace(/:/g, "-").split("T")[0]; // Get today's date in YYYY-MM-DD format

const files = fs.readdirSync(logsDir);
console.log("reading logs dir:", logsDir);
const files = await fs.readdir(logsDir);
console.log("files:", files);
const todayFiles = files.filter((file) => file.startsWith(today));
console.log("today's files:", todayFiles);

const logs: DailyLog[] = [];
for (const file of todayFiles) {
const content = fs.readFileSync(`${logsDir}/${file}`);
const content = await fs.readFile(`${logsDir}/${file}`);
logs.push(JSON.parse(content));
}

Expand Down Expand Up @@ -256,6 +264,13 @@ async function dailyLogPipeline(): Promise<void> {
const windowName = config.windowName || "";
const pageSize = config.pageSize;

console.log("creating logs dir");
const logsDir = `${process.env.PIPE_DIR}/logs`;
console.log("logs dir:", logsDir);
await fs.mkdir(logsDir).catch((error) => {
console.warn("error creating logs dir:", error);
});

let lastEmailSent = new Date(0); // Initialize to a past date

// send a welcome email to announce what will happen, when it will happen, and what it will do
Expand Down Expand Up @@ -321,31 +336,32 @@ async function dailyLogPipeline(): Promise<void> {
}

if (shouldSendSummary) {
await retry(async () => {
const todayLogs = await getTodayLogs();
console.log("today's logs:", todayLogs);

if (todayLogs.length > 0) {
const summary = await generateDailySummary(
todayLogs,
summaryPrompt,
ollamaModel,
ollamaApiUrl
);
console.log("summary:", summary);
await sendEmail(
emailAddress,
emailPassword,
"activity summary",
summary
);
lastEmailSent = now;
}
});
// await retry(async () => {
const todayLogs = await getTodayLogs();
console.log("today's logs:", todayLogs);

if (todayLogs.length > 0) {
const summary = await generateDailySummary(
todayLogs,
summaryPrompt,
ollamaModel,
ollamaApiUrl
);
console.log("summary:", summary);
await sendEmail(
emailAddress,
emailPassword,
"activity summary",
summary
);
lastEmailSent = now;
}
// });
}
} catch (error) {
console.warn("error in daily log pipeline:", error);
}
console.log("sleeping for", interval, "ms");
await new Promise((resolve) => setTimeout(resolve, interval));
}
}
Expand Down
8 changes: 4 additions & 4 deletions examples/typescript/pipe-email-daily-log/screenpipe.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ declare global {
};

const fs: {
readFileSync: (path: string) => string;
writeFileSync: (path: string, contents: string) => void;
readdirSync: (path: string) => string[];
mkdirSync: (path: string) => void;
readFile: (path: string) => Promise<string>;
writeFile: (path: string, contents: string) => Promise<void>;
readdir: (path: string) => Promise<string[]>;
mkdir: (path: string) => Promise<void>;
};

const path: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ declare global {
};

const fs: {
readFileSync: (path: string) => string;
writeFileSync: (path: string, contents: string) => void;
readFile: (path: string) => Promise<string>;
writeFile: (path: string, contents: string) => Promise<void>;
readdir: (path: string) => Promise<string[]>;
mkdir: (path: string) => Promise<void>;
};

const path: {
Expand Down
7 changes: 4 additions & 3 deletions examples/typescript/pipe-screen-to-crm/screenpipe.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ declare global {
};

const fs: {
readFileSync: (path: string) => string;
writeFileSync: (path: string, contents: string) => void;
readFile: (path: string) => Promise<string>;
writeFile: (path: string, contents: string) => Promise<void>;
readdir: (path: string) => Promise<string[]>;
mkdir: (path: string) => Promise<void>;
};

const path: {
join: (...paths: string[]) => string;
};
Expand Down
64 changes: 0 additions & 64 deletions examples/typescript/pipe-stream-ocr-text/pipe.js

This file was deleted.

2 changes: 1 addition & 1 deletion examples/typescript/pipe-stream-ocr-text/pipe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ async function writeToMarkdown(data: any) {

const dir = path.join(process.env.PIPE_DIR, fileName);
console.log("Writing to", dir);
fs.writeFileSync(dir, content);
await fs.writeFile(dir, content);
console.log(`Written OCR data to ${fileName}`);
}

Expand Down
6 changes: 4 additions & 2 deletions examples/typescript/pipe-stream-ocr-text/screenpipe.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ declare global {
};

const fs: {
readFileSync: (path: string) => string;
writeFileSync: (path: string, contents: string) => void;
readFile: (path: string) => Promise<string>;
writeFile: (path: string, contents: string) => Promise<void>;
readdir: (path: string) => Promise<string[]>;
mkdir: (path: string) => Promise<void>;
};

const path: {
Expand Down
6 changes: 6 additions & 0 deletions screenpipe-app-tauri/components/pipe-config-form.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ export const PipeConfigForm: React.FC<PipeConfigFormProps> = ({
onChange={(e) =>
handleInputChange(field.name, parseFloat(e.target.value) || 0)
}
autoCorrect="off"
spellCheck="false"
/>
<TooltipProvider>
<Tooltip>
Expand Down Expand Up @@ -147,6 +149,8 @@ export const PipeConfigForm: React.FC<PipeConfigFormProps> = ({
type="time"
value={value}
onChange={(e) => handleInputChange(field.name, e.target.value)}
autoCorrect="off"
spellCheck="false"
/>
<TooltipProvider>
<Tooltip>
Expand Down Expand Up @@ -206,6 +210,8 @@ export const PipeConfigForm: React.FC<PipeConfigFormProps> = ({
type="text"
value={value}
onChange={(e) => handleInputChange(field.name, e.target.value)}
autoCorrect="off"
spellCheck="false"
/>
<TooltipProvider>
<Tooltip>
Expand Down
2 changes: 1 addition & 1 deletion screenpipe-app-tauri/src-tauri/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "screenpipe-app"
version = "0.2.67"
version = "0.2.68"
description = ""
authors = ["you"]
license = ""
Expand Down
52 changes: 12 additions & 40 deletions screenpipe-core/src/deno/runtime.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ const sendLog = async (level, ...args) => {

const console = {
log: (...args) => {
core.print(`[js][info]: ${argsToMessage(...args)}\n`, false);
core.print(`[pipe][${globalThis.metadata.id}][info]: ${argsToMessage(...args)}\n`, false);
// sendLog("info", ...args);
},
error: (...args) => {
core.print(`[js][error]: ${argsToMessage(...args)}\n`, true);
core.print(`[pipe][${globalThis.metadata.id}][error]: ${argsToMessage(...args)}\n`, true);
// sendLog("error", ...args);
},
warn: (...args) => {
core.print(`[js][warn]: ${argsToMessage(...args)}\n`, true);
core.print(`[pipe][${globalThis.metadata.id}][warn]: ${argsToMessage(...args)}\n`, true);
// sendLog("warn", ...args);
}
};
Expand Down Expand Up @@ -160,46 +160,18 @@ const pipe = {
// }
};

const fs = { // TODO does not work?
readFileSync: (path) => {
// This is a synchronous wrapper around the async operation
// Note: This will block the event loop and should be used carefully
return new Promise((resolve, reject) => {
ops.op_read_file(path)
.then(resolve)
.catch(reject);
});
},
writeFileSync: (path, contents) => {
// Similarly, this is a synchronous wrapper
return new Promise((resolve, reject) => {
ops.op_write_file(path, contents)
.then(resolve)
.catch(reject);
});
const fs = {
readFile: async (path) => {
return await ops.op_read_file(path);
},
readdirSync: (path) => {
return new Promise((resolve, reject) => {
ops.op_readdir(path)
.then(resolve)
.catch(reject);
});
writeFile: async (path, contents) => {
return await ops.op_write_file(path, contents);
},
mkdirSync: (path) => {
// This is a synchronous wrapper around the async operation
return new Promise((resolve, reject) => {
ops.op_create_dir(path)
.then(resolve)
.catch(reject);
});
readdir: async (path) => {
return await ops.op_readdir(path);
},
statSync: (path) => {
// This is a synchronous wrapper around the async operation
return new Promise((resolve, reject) => {
ops.op_stat(path)
.then(resolve)
.catch(reject);
});
mkdir: async (path) => {
return await ops.op_create_dir(path);
}
};

Expand Down
Loading

0 comments on commit 4f6baba

Please sign in to comment.