chore: worker-v2 cleanup (#12019)

Co-authored-by: Mo AbuAboud <moabuaboud@gmail.com>
This commit is contained in:
Chaker Atallah
2026-03-24 14:16:19 +01:00
committed by GitHub
parent 8a94869800
commit 8ec4c008cd
4 changed files with 33 additions and 6 deletions

View File

@@ -1,12 +1,10 @@
#!/bin/sh
export AP_CONTAINER_TYPE="${AP_CONTAINER_TYPE:-WORKER_AND_APP}"
export AP_WORKERS="${AP_WORKERS:-1}"
export AP_PORT="${AP_PORT:-80}"
export AP_PM2_INSTANCES="${AP_PM2_INSTANCES:-1}"
echo "AP_CONTAINER_TYPE: $AP_CONTAINER_TYPE"
echo "AP_WORKERS: $AP_WORKERS"
echo "AP_PORT: $AP_PORT"
echo "AP_PM2_INSTANCES: $AP_PM2_INSTANCES"
@@ -53,7 +51,7 @@ if [ "$AP_CONTAINER_TYPE" = "WORKER" ] || [ "$AP_CONTAINER_TYPE" = "WORKER_AND_A
name: 'activepieces-worker',
script: 'packages/server/worker/dist/src/bootstrap.js',
node_args: '--enable-source-maps',
instances: ${AP_WORKERS},
instances: 1,
exec_mode: 'fork'
},"
fi

View File

@@ -91,7 +91,6 @@ it will produce these values. </Tip>
| `AP_ISSUE_ARCHIVE_DAYS` | Controls the automatic archival of issues in the system. Issues that have not been updated for this many days will be automatically moved to an archived state.| `14` | `1`
| `AP_LOAD_TRANSLATIONS_FOR_DEV_PIECES` | Load translations for dev pieces (configured via `AP_DEV_PIECES`). When disabled, dev pieces are loaded without translations. This only affects development mode.| `false` | `true`
| `AP_CONTAINER_TYPE` | Controls which services to run in the Docker container. `APP` starts only the API server, `WORKER` starts only the worker, `WORKER_AND_APP` starts both. | `WORKER_AND_APP` | `APP`
| `AP_WORKERS` | Number of worker PM2 instances to run. Only affects WORKER containers. | `1` | `3`
<Warning>
The frontend URL is essential for webhooks and app triggers to work. It must

View File

@@ -5,7 +5,8 @@ import { worker } from './worker'
const workerToken = system.getOrThrow(WorkerSystemProp.WORKER_TOKEN)
async function main(): Promise<void> {
await worker.start({ apiUrl: getApiUrl(), socketUrl: getSocketUrl(), workerToken })
const containerType = system.get(WorkerSystemProp.CONTAINER_TYPE) ?? 'WORKER_AND_APP'
await worker.start({ apiUrl: getApiUrl(), socketUrl: getSocketUrl(), workerToken, withHealthServer: containerType === 'WORKER' })
const shutdown = async () => {
const timeout = setTimeout(() => {
@@ -24,3 +25,4 @@ main().catch((err) => {
logger.error({ error: err }, 'Worker crashed')
process.exit(1)
})

View File

@@ -1,3 +1,4 @@
import { createServer } from 'http'
import os from 'os'
import { systemUsage } from '@activepieces/server-utils'
import {
@@ -33,10 +34,12 @@ const workerId = `worker-${nanoid()}`
const workerHostname = os.hostname()
let healthServerInstance: ReturnType<typeof createServer> | null = null
let sandboxManagers: SandboxManager[] = []
export const worker = {
async start({ apiUrl, socketUrl, workerToken }: WorkerStartParams): Promise<void> {
async start({ apiUrl, socketUrl, workerToken, withHealthServer = false }: WorkerStartParams): Promise<void> {
const platformIdForDedicatedWorker = system.get(WorkerSystemProp.PLATFORM_ID_FOR_DEDICATED_WORKER)
socket = io(socketUrl.url, {
auth: { token: workerToken, workerId, platformIdForDedicatedWorker },
@@ -66,6 +69,9 @@ export const worker = {
logger.error({ error: error.message }, 'Socket.IO connection error')
})
if (withHealthServer) {
healthServerInstance = startHealthServer()
}
logger.info({ apiUrl, socketUrl }, 'Worker started, polling for jobs...')
},
@@ -75,6 +81,8 @@ export const worker = {
sandboxManagers = []
socket?.disconnect()
socket = null
healthServerInstance?.close()
healthServerInstance = null
logger.info('Worker stopped')
},
}
@@ -274,8 +282,28 @@ function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms))
}
function startHealthServer(): ReturnType<typeof createServer> {
const port = Number(system.get(WorkerSystemProp.PORT))
const server = createServer((req, res) => {
if (req.method === 'GET' && req.url === '/worker/health') {
res.writeHead(200, { 'Content-Type': 'application/json' })
res.end(JSON.stringify({ status: 'ok' }))
}
else {
res.writeHead(404)
res.end()
}
})
server.listen(port, () => {
logger.info({ port }, 'Health server listening')
})
return server
}
type WorkerStartParams = {
apiUrl: string
socketUrl: { url: string, path: string }
workerToken: string
withHealthServer?: boolean
}