Skip to content

Commit

Permalink
bacalhau: add restart
Browse files Browse the repository at this point in the history
  • Loading branch information
juliangruber committed Aug 21, 2023
1 parent 1049d97 commit d467583
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 34 deletions.
4 changes: 2 additions & 2 deletions commands/station.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,14 @@ export const station = async ({ json, experimental }) => {
]

if (experimental) {
modules.push(bacalhau.start({
modules.push(pRetry(() => bacalhau.run({
FIL_WALLET_ADDRESS,
storagePath: join(paths.moduleCache, 'bacalhau'),
onActivity: activity => {
activities.submit({ source: 'Bacalhau', ...activity })
},
onMetrics: m => metrics.submit('bacalhau', m)
}))
}), { retries: 1000 }))
}

await Promise.all(modules)
Expand Down
74 changes: 42 additions & 32 deletions lib/bacalhau.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { fetch } from 'undici'
import Sentry from '@sentry/node'
import { installBinaryModule, getBinaryModuleExecutable } from './modules.js'
import os from 'node:os'
import { once } from 'node:events'

const DIST_TAG = 'v1.0.3'
const { TARGET_ARCH = os.arch() } = process.env
Expand All @@ -25,7 +26,7 @@ export async function install () {
})
}

export async function start ({
export async function run ({
FIL_WALLET_ADDRESS,
storagePath,
onActivity,
Expand Down Expand Up @@ -75,50 +76,59 @@ export async function start ({

const apiMatch = output.match(/^API: (http.*)$/m)
if (apiMatch) {
const apiUrl = apiMatch[1]

childProcess.stdout.off('data', readyHandler)
onActivity({ type: 'info', message: 'Bacalhau module started.' })
setInterval(() => {
updateStats({ apiUrl, onMetrics })
.catch(err => {
console.error(
`Cannot fetch Bacalhau module stats. ${err.stack || err.message || err}`
)
})
}, 1000).unref()
resolve()
const apiUrl = apiMatch[1]
resolve(apiUrl)
}
}
childProcess.stdout.on('data', readyHandler)
childProcess.catch(reject)
})

childProcess.on('close', code => {
console.error(
`Bacalhau closed all stdio with code ${code ?? '<no code>'}`
)
childProcess.stderr.removeAllListeners()
childProcess.stdout.removeAllListeners()
Sentry.captureException('Bacalhau exited')
})

childProcess.on('exit', (code, signal) => {
const reason = signal ? `via signal ${signal}` : `with code: ${code}`
const msg = `Bacalhau exited ${reason}`
onActivity({ type: 'info', message: msg })
})

try {
await Promise.race([
readyPromise,
timers.setTimeout(500)
])
} catch (err) {
const errorMsg = err instanceof Error ? err.message : '' + err
const message = `Cannot start Bacalhau: ${errorMsg}`
onActivity({ type: 'error', message })
}
await Promise.all([
(async () => {
let apiUrl
try {
apiUrl = await readyPromise
} catch (err) {
const errorMsg = err instanceof Error ? err.message : '' + err
const message = `Cannot start Bacalhau: ${errorMsg}`
onActivity({ type: 'error', message })
throw err
}

onActivity({ type: 'info', message: 'Bacalhau module started.' })
while (true) {
if (
childProcess.exitCode !== null ||
childProcess.signalCode !== null
) {
break
}
try {
await updateStats({ apiUrl, onMetrics })
} catch (err) {
const errString = err.stack || err.message || err
console.error(`Cannot fetch Bacalhau module stats. ${errString}`)
}
await timers.setTimeout(1000)
}
})(),
(async () => {
const [code] = await once(childProcess, 'close')
console.error(`Bacalhau closed all stdio with code ${code ?? '<no code>'}`)
childProcess.stderr.removeAllListeners()
childProcess.stdout.removeAllListeners()
Sentry.captureException('Bacalhau exited')
throw new Error('Bacalhau exited')
})()
])
}

function handleActivityLogs ({ onActivity, text }) {
Expand Down

0 comments on commit d467583

Please sign in to comment.