feat: replace bun with pnpm+esbuild, add logs field, and clean up trigger/flow status (#12310)

This commit is contained in:
Mo AbuAboud
2026-03-31 20:15:34 +03:00
committed by GitHub
parent dfbe5ea986
commit 23c1de864a
40 changed files with 828 additions and 13941 deletions

View File

@@ -31,7 +31,7 @@ ENV LC_ALL en_US.UTF-8
COPY default.cf /usr/local/etc/isolate
RUN npm i -g bun@1.3.1 cross-env@7.0.3
RUN npm i -g bun@1.3.1 cross-env@7.0.3 pnpm@10.33.0 esbuild@0.25.0
# install isolated-vm in a parent directory to avoid linking the package in every sandbox

View File

@@ -24,6 +24,8 @@ jobs:
with:
bun-version: latest
- run: npm install -g pnpm@latest
- run: bun install
- name: Turbo Cache

View File

@@ -27,35 +27,32 @@ RUN --mount=type=cache,target=/var/cache/apt,sharing=locked \
sed -i '/en_US.UTF-8/s/^# //g' /etc/locale.gen && \
locale-gen en_US.UTF-8
RUN export ARCH=$(uname -m) && \
if [ "$ARCH" = "x86_64" ]; then \
curl -fSL https://github.com/oven-sh/bun/releases/download/bun-v1.3.1/bun-linux-x64-baseline.zip -o bun.zip; \
elif [ "$ARCH" = "aarch64" ]; then \
curl -fSL https://github.com/oven-sh/bun/releases/download/bun-v1.3.1/bun-linux-aarch64.zip -o bun.zip; \
fi
RUN unzip bun.zip \
&& mv bun-*/bun /usr/local/bin/bun \
&& chmod +x /usr/local/bin/bun \
&& rm -rf bun.zip bun-*
RUN bun --version
# Install global npm packages in a single layer
RUN --mount=type=cache,target=/root/.npm \
npm install -g --no-fund --no-audit \
node-gyp \
npm@11.11.0 \
pm2@6.0.10 \
typescript@4.9.4
typescript@4.9.4 \
pnpm@10.33.0 \
esbuild@0.25.0
# Install isolated-vm globally (needed for sandboxes)
RUN --mount=type=cache,target=/root/.bun/install/cache \
cd /usr/src && bun install isolated-vm@6.0.2
RUN --mount=type=cache,target=/root/.npm \
cd /usr/src && npm install --no-fund --no-audit isolated-vm@6.0.2
### STAGE 1: Build ###
FROM base AS build
# Install bun for monorepo build (build-time only, not needed at runtime)
RUN export ARCH=$(uname -m) && \
if [ "$ARCH" = "x86_64" ]; then \
curl -fSL https://github.com/oven-sh/bun/releases/download/bun-v1.3.1/bun-linux-x64-baseline.zip -o bun.zip; \
elif [ "$ARCH" = "aarch64" ]; then \
curl -fSL https://github.com/oven-sh/bun/releases/download/bun-v1.3.1/bun-linux-aarch64.zip -o bun.zip; \
fi
RUN unzip bun.zip && mv bun-*/bun /usr/local/bin/bun && chmod +x /usr/local/bin/bun && rm -rf bun.zip bun-*
WORKDIR /usr/src/app
# Copy dependency files and workspace package.json files for resolution
@@ -104,13 +101,16 @@ COPY --from=build /usr/src/app/bun.lock ./
COPY --from=build /usr/src/app/bunfig.toml ./
COPY --from=build /usr/src/app/LICENSE .
# Copy workspace package.json files (needed for bun workspace resolution)
# Copy workspace package.json files (needed for workspace resolution)
COPY --from=build /usr/src/app/packages ./packages
# Copy built engine
COPY --from=build /usr/src/app/dist/packages/engine/ ./dist/packages/engine/
# Regenerate lockfile and install production dependencies (pieces were trimmed from workspace)
# Copy bun from build stage (needed to resolve workspace:* protocol in package.json files)
COPY --from=build /usr/local/bin/bun /usr/local/bin/bun
# Install production dependencies
RUN --mount=type=cache,target=/root/.bun/install/cache \
bun install --production

View File

@@ -1,5 +1,6 @@
{
"lockfileVersion": 1,
"configVersion": 0,
"workspaces": {
"": {
"name": "activepieces",
@@ -226,6 +227,7 @@
"pino": "10.1.0",
"pino-loki": "2.1.3",
"pino-opentelemetry-transport": "3.0.0",
"pnpm": "10.33.0",
"posthog-js": "1.195.0",
"priority-queue-typescript": "1.0.1",
"prismjs": "1.30.0",
@@ -7919,7 +7921,7 @@
},
"packages/shared": {
"name": "@activepieces/shared",
"version": "0.50.0",
"version": "0.50.1",
"dependencies": {
"dayjs": "1.11.9",
"deepmerge-ts": "7.1.0",
@@ -13832,6 +13834,8 @@
"pngjs": ["pngjs@5.0.0", "", {}, "sha512-40QW5YalBNfQo5yRYmiw7Yz6TKKVr3h6970B2YE+3fQpsWcrbj1PzJgxeJ19DRQjhMbKPIuMY8rFaXc8moolVw=="],
"pnpm": ["pnpm@10.33.0", "", { "bin": { "pnpm": "bin/pnpm.cjs", "pnpx": "bin/pnpx.cjs" } }, "sha512-EFaLtKavtYyes2MNqQzJUWQXq+vT+rvmc58K55VyjaFJHp21pUTHatjrdXD1xLs9bGN7LLQb/c20f6gjyGSTGQ=="],
"possible-typed-array-names": ["possible-typed-array-names@1.1.0", "", {}, "sha512-/+5VFTchJDoVj3bhoqi6UeymcD00DAwb1nJwamzPvHEszJ4FpF6SNNbUbOS8yI56qHzdV8eK0qEfOSiodkTdxg=="],
"postcss": ["postcss@8.5.8", "", { "dependencies": { "nanoid": "^3.3.11", "picocolors": "^1.1.1", "source-map-js": "^1.2.1" } }, "sha512-OW/rX8O/jXnm82Ey1k44pObPtdblfiuWnrd8X7GJ7emImCOstunGbXUpp7HdBrFQX6rJzn3sPT397Wp5aCwCHg=="],

View File

@@ -262,6 +262,7 @@
"pino": "10.1.0",
"pino-loki": "2.1.3",
"pino-opentelemetry-transport": "3.0.0",
"pnpm": "10.33.0",
"posthog-js": "1.195.0",
"priority-queue-typescript": "1.0.1",
"prismjs": "1.30.0",

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -20,15 +20,20 @@ async function buildPieces(app: FastifyInstance, piecesInfo: PieceInfo[]): Promi
}
}
const pieceFilters = piecesInfo.map(p => `--filter=@activepieces/piece-${p.pieceName}`).join(' ')
const sharedFilters = '--filter=@activepieces/pieces-framework --filter=@activepieces/pieces-common --filter=@activepieces/shared'
const filterArgs = `${sharedFilters} ${pieceFilters} --force`
const pieceFilters = piecesInfo.map(p => `--filter=@activepieces/piece-${p.pieceName}`)
const filterArgs = [
'--filter=@activepieces/pieces-framework',
'--filter=@activepieces/pieces-common',
'--filter=@activepieces/shared',
...pieceFilters,
'--force',
]
app.log.info(`Building ${piecesInfo.length} piece(s): ${piecesInfo.map(p => p.pieceName).join(',')}...`)
const lock = await memoryLock.acquire(PIECES_BUILDER_MUTEX_KEY)
try {
const startTime = performance.now()
await spawnAndWait(`npx turbo run build ${filterArgs}`)
await spawnAndWait('npx', ['turbo', 'run', 'build', ...filterArgs])
const buildTime = (performance.now() - startTime) / 1000
app.log.info(`Build completed in ${buildTime.toFixed(2)} seconds`)
@@ -132,12 +137,12 @@ export async function startDevPieceWatcher(app: FastifyInstance): Promise<void>
process.once('SIGTERM', () => void cleanup())
}
function spawnAndWait(cmd: string): Promise<void> {
function spawnAndWait(cmd: string, args: string[]): Promise<void> {
return new Promise((resolve, reject) => {
const child = spawn(cmd, {
const child = spawn(cmd, args, {
cwd: process.cwd(),
stdio: 'inherit',
shell: true,
shell: false,
})
child.on('close', (code) => {
if (code === 0) {

View File

@@ -351,7 +351,7 @@ describe('Execute Flow E2E', () => {
})
// Poll until flow run completes
const maxWaitMs = 60_000
const maxWaitMs = 120_000
const pollIntervalMs = 500
const start = Date.now()
let result = await flowRunService(app.log).getOnePopulatedOrThrow({

View File

@@ -16,12 +16,12 @@ export type CodeSandbox = {
type RunCodeModuleParams = {
/**
* The {@link CodeModule} to execute.
* Path to the compiled index.js file to execute.
*/
codeModule: CodeModule
codeFilePath: string
/**
* The inputs that are passed to the {@link CodeModule}.
* The inputs that are passed to the code module.
*/
inputs: Record<string, unknown>
}

View File

@@ -1,10 +1,12 @@
import { CodeSandbox } from '../../core/code/code-sandbox-common'
import importFresh from '@activepieces/import-fresh-webpack'
import { CodeModule, CodeSandbox } from '../../core/code/code-sandbox-common'
/**
* Runs code without a sandbox.
*/
export const noOpCodeSandbox: CodeSandbox = {
async runCodeModule({ codeModule, inputs }) {
async runCodeModule({ codeFilePath, inputs }) {
const codeModule: CodeModule = await importFresh(codeFilePath)
return codeModule.code(inputs)
},

View File

@@ -1,5 +1,6 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { CodeModule, CodeSandbox } from '../../core/code/code-sandbox-common'
import { readFile } from 'node:fs/promises'
import { CodeSandbox } from '../../core/code/code-sandbox-common'
const ONE_HUNDRED_TWENTY_EIGHT_MEGABYTES = 128
@@ -17,7 +18,7 @@ const getIvm = () => {
* Runs code in a V8 Isolate sandbox
*/
export const v8IsolateCodeSandbox: CodeSandbox = {
async runCodeModule({ codeModule, inputs }) {
async runCodeModule({ codeFilePath, inputs }) {
const ivm = getIvm()
const isolate = new ivm.Isolate({ memoryLimit: ONE_HUNDRED_TWENTY_EIGHT_MEGABYTES })
@@ -29,12 +30,12 @@ export const v8IsolateCodeSandbox: CodeSandbox = {
},
})
const serializedCodeModule = serializeCodeModule(codeModule)
const source = await readFile(codeFilePath, 'utf8')
return await executeIsolate({
isolate,
isolateContext,
code: serializedCodeModule,
code: wrapCjsModule(source),
})
}
finally {
@@ -89,14 +90,16 @@ const executeIsolate = async ({ isolate, isolateContext, code }: ExecuteIsolateP
return outRef.copy()
}
const serializeCodeModule = (codeModule: CodeModule): string => {
const serializedCodeFunction = Object.keys(codeModule)
.reduce((acc, key) =>
acc + `const ${key} = ${(codeModule as any)[key].toString()};`,
'')
// replace the exports.function_name with function_name
return serializedCodeFunction.replace(/\(0, exports\.(\w+)\)/g, '$1') + 'code(inputs);'
// Wrap CJS source so `exports`/`module` are defined but `require` is NOT,
// blocking all Node.js built-in access inside the isolate.
// `inputs` is already injected as a global by initIsolateContext.
/**
 * Wraps compiled CommonJS `source` in a self-invoking function expression for
 * evaluation inside the V8 isolate.
 *
 * The wrapper supplies `exports` and `module` as plain locals (with a
 * null-prototype exports object) but deliberately does NOT define `require`,
 * so any attempt to load Node.js built-ins from user code throws a
 * ReferenceError inside the sandbox. The wrapped expression evaluates to
 * `module.exports.code(inputs)`; `inputs` is already injected as a global by
 * initIsolateContext.
 */
function wrapCjsModule(source: string): string {
return `(function() {
const exports = Object.create(null);
const module = { exports };
${source}
return module.exports.code(inputs);
})()`
}
type InitContextParams = {

View File

@@ -1,9 +1,7 @@
import path from 'path'
import importFresh from '@activepieces/import-fresh-webpack'
import { LATEST_CONTEXT_VERSION } from '@activepieces/pieces-framework'
import { CodeAction, EngineGenericError, FlowActionType, FlowRunStatus, GenericStepOutput, isNil, StepOutputStatus } from '@activepieces/shared'
import { initCodeSandbox } from '../core/code/code-sandbox'
import { CodeModule } from '../core/code/code-sandbox-common'
import { continueIfFailureHandler, runWithExponentialBackoff } from '../helper/error-handling'
import { progressService } from '../services/progress.service'
import { utils } from '../utils'
@@ -48,11 +46,10 @@ const executeAction: ActionHandler<CodeAction> = async ({ action, executionState
}
const artifactPath = path.resolve(`${constants.baseCodeDirectory}/${constants.flowVersionId}/${action.name}/index.js`)
const codeModule: CodeModule = await importFresh(artifactPath)
const codeSandbox = await initCodeSandbox()
const output = await codeSandbox.runCodeModule({
codeModule,
codeFilePath: artifactPath,
inputs: resolvedInput,
})

View File

@@ -0,0 +1,71 @@
import { writeFile, unlink } from 'node:fs/promises'
import os from 'node:os'
import path from 'node:path'
import { v8IsolateCodeSandbox } from '../../../src/lib/core/code/v8-isolate-code-sandbox'
describe('v8IsolateCodeSandbox', () => {
    describe('runCodeModule', () => {
        let sourceFile: string

        beforeEach(() => {
            sourceFile = path.join(os.tmpdir(), `v8-sandbox-test-${Date.now()}.js`)
        })

        afterEach(async () => {
            await unlink(sourceFile).catch(() => undefined)
        })

        // Persists `source` to the per-test temp file, then executes it
        // through the isolate sandbox with the given inputs.
        const runWithSource = async (source: string, inputs: Record<string, unknown> = {}): Promise<unknown> => {
            await writeFile(sourceFile, source, 'utf8')
            return v8IsolateCodeSandbox.runCodeModule({ codeFilePath: sourceFile, inputs })
        }

        // --- correct execution ---

        it('executes module.exports.code and returns inputs', async () => {
            const output = await runWithSource(`module.exports = { code: async (inputs) => inputs }`, { key: 'value' })
            expect(output).toEqual({ key: 'value' })
        })

        it('executes exports.code pattern (esbuild assignment style)', async () => {
            const output = await runWithSource(`exports.code = async (inputs) => ({ doubled: inputs.n * 2 })`, { n: 3 })
            expect(output).toEqual({ doubled: 6 })
        })

        it('makes inputs accessible inside the code function', async () => {
            const output = await runWithSource(`module.exports = { code: async (inputs) => inputs.greeting }`, { greeting: 'hello' })
            expect(output).toBe('hello')
        })

        it('supports async/await inside the code function', async () => {
            const output = await runWithSource(`module.exports = { code: async () => { const v = await Promise.resolve(42); return v } }`)
            expect(output).toBe(42)
        })

        it('propagates user errors thrown inside code', async () => {
            await expect(runWithSource(`module.exports = { code: async () => { throw new Error('user error') } }`)).rejects.toThrow('user error')
        })

        // --- sandbox boundary ---

        it('blocks top-level require (ReferenceError)', async () => {
            await expect(runWithSource(`const cp = require('child_process'); module.exports = { code: async () => null }`)).rejects.toThrow(/require/)
        })

        it('blocks require inside the code function (ReferenceError)', async () => {
            await expect(runWithSource(`module.exports = { code: async () => { require('fs'); return null } }`)).rejects.toThrow(/require/)
        })

        it('blocks access to the process global (ReferenceError)', async () => {
            await expect(runWithSource(`module.exports = { code: async () => process.env }`)).rejects.toThrow(/process/)
        })
    })
})

View File

@@ -1,60 +0,0 @@
import { apDayjsDuration, fileSystemUtils } from '@activepieces/server-utils'
import { tryCatch } from '@activepieces/shared'
import { Logger } from 'pino'
import { CommandOutput, execPromise, spawnWithKill } from '../../utils/exec'
/**
 * Wraps the bun CLI for sandbox workspaces: dependency installation via
 * `bun install` and bundling of user code via `bun build`.
 */
export const bunRunner = (log: Logger) => ({
    /**
     * Runs `bun install` in `path` (directory is created if missing),
     * optionally restricted to validated workspace filters.
     * Logs and rethrows when the install fails; returns the command
     * output on success.
     */
    async install({ path, filtersPath }: InstallParams): Promise<CommandOutput> {
        const installFlags = [
            '--ignore-scripts',
        ]
        // Validate every filter path before it reaches the command line.
        const workspaceFilters: string[] = filtersPath
            .map(sanitizeFilterPath)
            .map((filterPath) => `--filter ./${filterPath}`)
        await fileSystemUtils.threadSafeMkdir(path)
        log.debug({ path, args: installFlags, filters: workspaceFilters }, '[bunRunner#install]')
        const { error, data } = await tryCatch(async () => spawnWithKill({
            cmd: `bun install ${installFlags.join(' ')} ${workspaceFilters.join(' ')}`,
            options: {
                cwd: path,
            },
            printOutput: false,
            timeoutMs: apDayjsDuration(10, 'minutes').asMilliseconds(),
        }))
        if (!error) {
            return data
        }
        log.error({ error }, '[bunRunner#install] Failed to install dependencies')
        throw error
    },
    /**
     * Bundles `entryFile` into a single production CommonJS `outputFile`
     * targeting node, using `bun build` executed in `path`.
     */
    async build({ path, entryFile, outputFile }: BuildParams): Promise<CommandOutput> {
        const buildFlags = [
            `${entryFile}`,
            '--target node',
            '--production',
            '--format cjs',
            `--outfile ${outputFile}`,
        ]
        log.debug({ path, entryFile, outputFile, config: buildFlags }, '[bunRunner#build]')
        return execPromise(`bun build ${buildFlags.join(' ')}`, { cwd: path })
    },
})
/**
 * Validates a workspace filter path before it is interpolated into a
 * `bun install --filter ./<path>` command line.
 *
 * Rejects anything outside a conservative character whitelist, paths that
 * start with a dot, and any `..` path segment. The original regex only
 * blocked a LEADING dot, so a path such as `a/../../escape` passed
 * validation and could point the filter outside the workspace root
 * (path traversal).
 *
 * @param filterPath workspace-relative path to validate
 * @returns the unchanged `filterPath` when it is safe
 * @throws Error when the path contains disallowed characters or a `..` segment
 */
function sanitizeFilterPath(filterPath: string): string {
    const allowed = /^(?![.])[a-zA-Z0-9\-_.@/]+$/
    const hasTraversalSegment = filterPath.split('/').some((segment) => segment === '..')
    if (!allowed.test(filterPath) || hasTraversalSegment) {
        throw new Error(`Invalid filter path ${filterPath}`)
    }
    return filterPath
}
// Arguments for bunRunner.install.
type InstallParams = {
// Directory in which `bun install` runs; created if missing.
path: string
// Workspace-relative paths turned into `--filter ./<path>` flags; may be empty.
filtersPath: string[]
}
// Arguments for bunRunner.build.
type BuildParams = {
// Working directory for the `bun build` invocation.
path: string
// Source entry point passed to the bundler.
entryFile: string
// Destination for the bundled CJS output (`--outfile`).
outputFile: string
}

View File

@@ -6,7 +6,7 @@ import { trace } from '@opentelemetry/api'
import { Logger } from 'pino'
import { workerSettings } from '../../config/worker-settings'
import { cacheState, NO_SAVE_GUARD } from '../cache-state'
import { bunRunner } from './bun-runner'
import { packageRunner } from './package-runner'
const tracer = trace.getTracer('code-builder')
@@ -114,7 +114,7 @@ export const codeBuilder = (log: Logger) => ({
}
})
// node_modules is no longer needed after bun build bundles everything into index.js
// node_modules is no longer needed after esbuild bundles everything into index.js
await tryCatch(() => rm(path.join(codePath, 'node_modules'), { recursive: true }))
return currentHash
},
@@ -156,7 +156,7 @@ async function installDependencies({ path, packageJson }: InstallDependenciesPar
await fs.writeFile(`${path}/package.json`, packageJson, 'utf8')
const deps = Object.entries(JSON.parse(packageJson).dependencies ?? {})
if (deps.length > 0) {
await bunRunner(log).install({ path, filtersPath: [] })
await packageRunner(log).install({ path })
}
}
@@ -167,7 +167,7 @@ async function compileCode({ path, code }: CompileCodeParams, log: Logger): Prom
})
await fs.writeFile(`${path}/index.ts`, code, { encoding: 'utf8', flag: 'w' })
await bunRunner(log).build({
await packageRunner(log).build({
path,
entryFile: `${path}/index.ts`,
outputFile: `${path}/index.js`,

View File

@@ -0,0 +1,43 @@
import { apDayjsDuration, fileSystemUtils } from '@activepieces/server-utils'
import { Logger } from 'pino'
import { CommandOutput, spawnWithKill } from '../../utils/exec'
/**
 * Runs package-manager commands for user-code sandboxes: `pnpm install` for
 * dependencies and `esbuild` for bundling the entry file into a single
 * CommonJS output file.
 */
export const packageRunner = (log: Logger) => ({
    /**
     * Installs dependencies in `path` with pnpm (directory created if
     * missing; lifecycle scripts disabled, cache preferred over network).
     */
    async install({ path }: InstallParams): Promise<CommandOutput> {
        await fileSystemUtils.threadSafeMkdir(path)
        log.debug({ path }, '[packageRunner#install]')
        const installTimeoutMs = apDayjsDuration(10, 'minutes').asMilliseconds()
        return spawnWithKill({
            cmd: 'pnpm install --prefer-offline --ignore-scripts',
            options: { cwd: path },
            printOutput: false,
            timeoutMs: installTimeoutMs,
        })
    },
    /**
     * Bundles `entryFile` into `outputFile` as a node-targeted CommonJS
     * bundle using the esbuild CLI, executed in `path`.
     */
    async build({ path, entryFile, outputFile }: BuildParams): Promise<CommandOutput> {
        const esbuildArgs = [
            entryFile,
            '--bundle',
            '--platform=node',
            '--format=cjs',
            `--outfile=${outputFile}`,
        ]
        log.debug({ path, entryFile, outputFile, args: esbuildArgs }, '[packageRunner#build]')
        const buildTimeoutMs = apDayjsDuration(5, 'minutes').asMilliseconds()
        return spawnWithKill({
            cmd: 'esbuild',
            args: esbuildArgs,
            options: { cwd: path },
            printOutput: false,
            timeoutMs: buildTimeoutMs,
        })
    },
})

// Arguments for packageRunner.install.
type InstallParams = {
    path: string
}

// Arguments for packageRunner.build.
type BuildParams = {
    path: string
    entryFile: string
    outputFile: string
}

View File

@@ -19,19 +19,18 @@ import { Logger } from 'pino'
import writeFileAtomic from 'write-file-atomic'
import { workerSettings } from '../../config/worker-settings'
import { getGlobalCacheCommonPath, getGlobalCachePathLatestVersion } from '../cache-paths'
import { bunRunner } from '../code/bun-runner'
import { packageRunner } from '../code/package-runner'
const tracer = trace.getTracer('piece-installer')
const usedPiecesMemoryCache: Record<string, boolean> = {}
const relativePiecePath = (piece: PiecePackage) => join('./', 'pieces', `${piece.pieceName}-${piece.pieceVersion}`)
const piecePath = (rootWorkspace: string, piece: PiecePackage) => join(rootWorkspace, 'pieces', `${piece.pieceName}-${piece.pieceVersion}`)
export const pieceInstaller = (log: Logger, apiClient: WorkerToApiContract) => ({
async install({ pieces, includeFilters }: InstallParams): Promise<void> {
async install({ pieces }: InstallParams): Promise<void> {
const groupedPieces = groupPiecesByPackagePath(pieces)
const installPromises = Object.entries(groupedPieces).map(async ([packagePath, piecesInGroup]) => {
await installPieces(packagePath, piecesInGroup, includeFilters, log, apiClient)
await installPieces(packagePath, piecesInGroup, log, apiClient)
})
await Promise.all(installPromises)
},
@@ -52,7 +51,7 @@ function getCustomPiecesPath(platformId: string): string {
}
}
async function installPieces(rootWorkspace: string, pieces: PiecePackage[], includeFilters: boolean, log: Logger, apiClient: WorkerToApiContract): Promise<void> {
async function installPieces(rootWorkspace: string, pieces: PiecePackage[], log: Logger, apiClient: WorkerToApiContract): Promise<void> {
const devPieces = workerSettings.getSettings().DEV_PIECES
const nonDevPieces = pieces.filter(piece => !devPieces.includes(getPieceNameFromAlias(piece.pieceName)))
const { piecesToInstall } = await partitionPiecesToInstall(rootWorkspace, nonDevPieces)
@@ -79,44 +78,46 @@ async function installPieces(rootWorkspace: string, pieces: PiecePackage[], incl
pieces: piecesToInstall.map(piece => `${piece.pieceName}-${piece.pieceVersion}`),
}, '[pieceInstaller] acquired lock and starting to install pieces')
await createRootPackageJson({
path: rootWorkspace,
})
await savePackageArchivesToDiskIfNotCached(rootWorkspace, piecesToInstall, apiClient)
await createRootPackageJson({ path: rootWorkspace })
await createPnpmWorkspaceYaml({ path: rootWorkspace })
await Promise.all(piecesToInstall.map(piece => createPiecePackageJson({ rootWorkspace, piecePackage: piece })))
await Promise.all(piecesToInstall.map(piece => createPiecePackageJson({
rootWorkspace,
piecePackage: piece,
})))
await tracer.startActiveSpan('pieceInstaller.bunInstall', async (span) => {
await tracer.startActiveSpan('pieceInstaller.install', async (span) => {
try {
span.setAttribute('pieces.count', piecesToInstall.length)
span.setAttribute('pieces.rootWorkspace', rootWorkspace)
const { error: installError } = await tryCatch(async () => bunRunner(log).install({
path: rootWorkspace,
filtersPath: includeFilters ? piecesToInstall.map(relativePiecePath) : [],
}))
const { error: batchError } = await tryCatch(() => packageRunner(log).install({ path: rootWorkspace }))
if (!isNil(installError)) {
log.error({
rootWorkspace,
pieces: piecesToInstall.map(piece => `${piece.pieceName}-${piece.pieceVersion}`),
error: installError,
}, '[pieceInstaller] Piece installation failed, rolling back')
span.recordException(installError instanceof Error ? installError : new Error(String(installError)))
await rollbackInstallation(rootWorkspace, piecesToInstall)
throw installError
if (!batchError) {
await markPiecesAsUsed(rootWorkspace, piecesToInstall)
log.info({ rootWorkspace, piecesCount: piecesToInstall.length }, '[pieceInstaller] Installed pieces using pnpm workspace')
return
}
await markPiecesAsUsed(rootWorkspace, piecesToInstall)
log.warn({ error: batchError, pieces: piecesToInstall.map(p => `${p.pieceName}@${p.pieceVersion}`) },
'[pieceInstaller] Batch install failed, retrying pieces individually')
await rm(join(rootWorkspace, 'pnpm-workspace.yaml'), { force: true })
log.info({
rootWorkspace,
piecesCount: piecesToInstall.length,
}, '[pieceInstaller] Installed registry pieces using bun')
const failures: PiecePackage[] = []
await Promise.all(piecesToInstall.map(async (piece) => {
const { error } = await tryCatch(() => packageRunner(log).install({ path: piecePath(rootWorkspace, piece) }))
if (error) {
span.recordException(error instanceof Error ? error : new Error(String(error)))
log.error({ piece: `${piece.pieceName}@${piece.pieceVersion}`, error }, '[pieceInstaller] Individual piece failed, rolling back')
await rm(piecePath(rootWorkspace, piece), { recursive: true, force: true })
failures.push(piece)
}
else {
await markPiecesAsUsed(rootWorkspace, [piece])
}
}))
if (failures.length > 0) {
const names = failures.map(p => `${p.pieceName}@${p.pieceVersion}`).join(', ')
throw new Error(`[pieceInstaller] Failed to install: ${names}`)
}
}
finally {
span.end()
@@ -126,12 +127,6 @@ async function installPieces(rootWorkspace: string, pieces: PiecePackage[], incl
})
}
async function rollbackInstallation(rootWorkspace: string, pieces: PiecePackage[]): Promise<void> {
await Promise.all(pieces.map(piece => rm(path.resolve(rootWorkspace, relativePiecePath(piece)), {
recursive: true,
})))
}
function groupPiecesByPackagePath(pieces: PiecePackage[]): Record<string, PiecePackage[]> {
return groupBy(pieces, (piece) => {
switch (piece.packageType) {
@@ -169,18 +164,6 @@ async function savePackageArchivesToDiskIfNotCached(
await Promise.all(saveToDiskJobs)
}
async function createRootPackageJson({ path }: { path: string }): Promise<void> {
const packageJsonPath = join(path, 'package.json')
await fileSystemUtils.threadSafeMkdir(dirname(packageJsonPath))
await writeFileAtomic(packageJsonPath, JSON.stringify({
'name': 'fast-workspace',
'version': '1.0.0',
'workspaces': [
'pieces/**',
],
}, null, 2), 'utf8')
}
async function createPiecePackageJson({ rootWorkspace, piecePackage }: {
rootWorkspace: string
piecePackage: PiecePackage
@@ -198,6 +181,21 @@ async function createPiecePackageJson({ rootWorkspace, piecePackage }: {
await writeFile(packageJsonPath, JSON.stringify(packageJson, null, 2), 'utf8')
}
/**
 * Atomically writes the root package.json for the throwaway piece workspace
 * ("fast-workspace", with a `pieces/**` workspaces glob), creating the parent
 * directory first if needed. Atomic write avoids a concurrent installer
 * reading a half-written file.
 */
async function createRootPackageJson({ path }: { path: string }): Promise<void> {
const packageJsonPath = join(path, 'package.json')
await fileSystemUtils.threadSafeMkdir(dirname(packageJsonPath))
await writeFileAtomic(packageJsonPath, JSON.stringify({
name: 'fast-workspace',
version: '1.0.0',
workspaces: ['pieces/**'],
}, null, 2), 'utf8')
}
/**
 * Atomically writes a pnpm-workspace.yaml declaring `pieces/**` as workspace
 * packages, so a single `pnpm install` at the workspace root covers every
 * piece package beneath it.
 */
async function createPnpmWorkspaceYaml({ path }: { path: string }): Promise<void> {
const workspacePath = join(path, 'pnpm-workspace.yaml')
await writeFileAtomic(workspacePath, 'packages:\n - \'pieces/**\'\n', 'utf8')
}
async function partitionPiecesToInstall(rootWorkspace: string, pieces: PiecePackage[]): Promise<PieceInstallationResult> {
const piecesWithCheck = await Promise.all(
pieces.map(async (piece) => {
@@ -240,7 +238,6 @@ function getPackageArchivePathForPiece(rootWorkspace: string, piecePackage: Priv
type InstallParams = {
pieces: PiecePackage[]
includeFilters: boolean
}
type PieceInstallationResult = {

View File

@@ -60,7 +60,6 @@ export const provisioner = (log: Logger, apiClient: WorkerToApiContract) => ({
piecesSpan.setAttribute('pieces.count', uniquePieces.length)
await pieceInstaller(log, apiClient).install({
pieces: uniquePieces,
includeFilters: true,
})
void tryCatch(() => apiClient.markPieceAsUsed({ pieces: uniquePieces }))
log.info({

View File

@@ -12,6 +12,7 @@ import {
isNil,
ResumeExecuteFlowOperation,
ResumePayload,
tryCatch,
WorkerJobType,
WorkerToApiContract,
} from '@activepieces/shared'
@@ -33,7 +34,11 @@ export const executeFlowJob: JobHandler<ExecuteFlowJobData, FireAndForgetJobResu
return { kind: JobResultKind.FIRE_AND_FORGET }
}
const provisioned = await provisionFlowPieces({ flowVersion, platformId: data.platformId, flowId: data.flowId, projectId: data.projectId, log: ctx.log, apiClient: ctx.apiClient })
const { data: provisioned, error: provisionError } = await tryCatch(() => provisionFlowPieces({ flowVersion, platformId: data.platformId, flowId: data.flowId, projectId: data.projectId, log: ctx.log, apiClient: ctx.apiClient }))
if (provisionError) {
await reportFlowStatus(ctx, data, FlowRunStatus.INTERNAL_ERROR)
throw provisionError
}
if (!provisioned) {
await reportFlowStatus(ctx, data, FlowRunStatus.FAILED)
return { kind: JobResultKind.FIRE_AND_FORGET }

View File

@@ -39,7 +39,7 @@ export function isolateProcess(log: SandboxLogger, enginePath: string, _codeDire
}
const envArgs = Object.entries(sandboxEnv)
.map(([key, value]) => `--env=${key}='${value}'`)
.map(([key, value]) => `--env=${key}=${value}`)
const dirArgs = mounts.map((m) => {
const suffix = m.optional ? ':maybe' : ''
@@ -66,7 +66,7 @@ export function isolateProcess(log: SandboxLogger, enginePath: string, _codeDire
log.debug({ sandboxId, command: `${isolateBinaryPath} ${args.join(' ')}` }, 'Spawning isolate process')
const child = spawn(isolateBinaryPath, args, {
shell: true,
shell: false,
})
child.stdout?.on('data', (data: Buffer) => {

View File

@@ -7,16 +7,20 @@ export const execPromise = promisify(execCallback)
export async function spawnWithKill({
cmd,
args: explicitArgs,
options = {},
printOutput,
timeoutMs,
}: SpawnWithKillParams): Promise<CommandOutput> {
return new Promise((resolve, reject) => {
const [command, ...args] = cmd.split(' ')
// When explicit args are provided, skip shell splitting and disable shell
// to prevent command injection via user-controlled path components.
const [command, ...splitArgs] = explicitArgs === undefined ? cmd.split(' ') : [cmd]
const args = explicitArgs ?? splitArgs
const cp = spawn(command, args, {
detached: true,
shell: true,
shell: explicitArgs === undefined,
...options,
})
@@ -83,6 +87,7 @@ export async function spawnWithKill({
type SpawnWithKillParams = {
cmd: string
args?: string[]
options?: SpawnOptions
printOutput?: boolean
timeoutMs?: number

View File

@@ -273,7 +273,7 @@ async function warmupPiecesOnStartup(apiClient: WorkerToApiContract): Promise<vo
}
logger.info({ count: pieces.length }, 'Starting piece cache warmup')
const { error: installError } = await tryCatch(() =>
pieceInstaller(logger, apiClient).install({ pieces, includeFilters: false }),
pieceInstaller(logger, apiClient).install({ pieces }),
)
if (installError) {
logger.error({ error: installError }, 'Failed to install pieces during startup warmup')

View File

@@ -0,0 +1,163 @@
import { rm, writeFile, mkdir } from 'node:fs/promises'
import { readFileSync } from 'node:fs'
import { tmpdir } from 'node:os'
import { join } from 'node:path'
import { randomUUID } from 'node:crypto'
import { createRequire } from 'module'
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
import type { Logger } from 'pino'
import { packageRunner } from '../../../../src/lib/cache/code/package-runner'
// Minimal pino-compatible logger stub: every level is a vi.fn() no-op, and
// child() returns the stub itself, so packageRunner's log calls are inert.
const fakeLog = {
debug: vi.fn(),
info: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
child: vi.fn().mockReturnThis(),
} as unknown as Logger
// Fresh scratch directory per test; created in beforeEach, removed in afterEach.
let tempDir: string
beforeEach(async () => {
tempDir = join(tmpdir(), `esbuild-test-${randomUUID()}`)
await mkdir(tempDir, { recursive: true })
})
afterEach(async () => {
await rm(tempDir, { recursive: true, force: true })
})
// CJS-style require bound to this ESM module's URL, used to load the
// esbuild output files produced by the tests below.
const requireDynamic = createRequire(import.meta.url)
describe('packageRunner.build (integration)', () => {
it('basic AP format compiles and is callable', async () => {
const entryFile = join(tempDir, 'index.ts')
const outputFile = join(tempDir, 'index.js')
await writeFile(entryFile, `export const code = async (inputs) => { return true; };`)
await packageRunner(fakeLog).build({ path: tempDir, entryFile, outputFile })
const mod = requireDynamic(outputFile)
expect(typeof mod.code).toBe('function')
expect(await mod.code({})).toBe(true)
})
it('TypeScript types are stripped', async () => {
const entryFile = join(tempDir, 'index.ts')
const outputFile = join(tempDir, 'index.js')
await writeFile(entryFile, `export const code = async (inputs: { x: number }) => { return inputs.x * 2; };`)
await packageRunner(fakeLog).build({ path: tempDir, entryFile, outputFile })
const mod = requireDynamic(outputFile)
expect(await mod.code({ x: 21 })).toBe(42)
})
it('output is CommonJS — no ESM syntax', async () => {
const entryFile = join(tempDir, 'index.ts')
const outputFile = join(tempDir, 'index.js')
await writeFile(entryFile, `export const code = async (inputs) => { return true; };`)
await packageRunner(fakeLog).build({ path: tempDir, entryFile, outputFile })
const content = readFileSync(outputFile, 'utf-8')
expect(content).not.toMatch(/^export /m)
expect(content).toMatch(/exports\.|module\.exports/)
})
it('invalid syntax → build rejects', async () => {
const entryFile = join(tempDir, 'index.ts')
const outputFile = join(tempDir, 'index.js')
await writeFile(entryFile, `export const code = async (inputs) => { return @@@; };`)
await expect(packageRunner(fakeLog).build({ path: tempDir, entryFile, outputFile })).rejects.toThrow()
})
it('code using lodash — full install + build pipeline', { timeout: 30_000 }, async () => {
const packageJsonFile = join(tempDir, 'package.json')
const entryFile = join(tempDir, 'index.ts')
const outputFile = join(tempDir, 'index.js')
await writeFile(packageJsonFile, JSON.stringify({ dependencies: { lodash: '4.17.21' } }))
await packageRunner(fakeLog).install({ path: tempDir })
await writeFile(entryFile, [
`import _ from 'lodash';`,
`export const code = async (inputs: { arr: number[] }) => {`,
` return _.sum(inputs.arr);`,
`};`,
].join('\n'))
await packageRunner(fakeLog).build({ path: tempDir, entryFile, outputFile })
const mod = requireDynamic(outputFile)
expect(await mod.code({ arr: [1, 2, 3] })).toBe(6)
})
it('code using hello-world-npm — full install + build pipeline', { timeout: 30_000 }, async () => {
    // End-to-end: declare the dependency, install, bundle code importing it, execute.
    await writeFile(join(tempDir, 'package.json'), JSON.stringify({ dependencies: { 'hello-world-npm': '1.1.1' } }))
    await packageRunner(fakeLog).install({ path: tempDir })
    const entry = join(tempDir, 'index.ts')
    const bundled = join(tempDir, 'index.js')
    await writeFile(entry, [
        `import helloWorldNpm from 'hello-world-npm';`,
        `export const code = async (inputs) => {`,
        ` return helloWorldNpm();`,
        `};`,
    ].join('\n'))
    await packageRunner(fakeLog).build({ path: tempDir, entryFile: entry, outputFile: bundled })
    const compiled = requireDynamic(bundled)
    expect(await compiled.code({})).toBe('Hello World NPM')
})
// Regression guard: a dependency with no resolvable entry point must fail at
// build (bundling) time, not at install time.
it('hello-world@0.0.2 has no main entry — install succeeds but build rejects', { timeout: 30_000 }, async () => {
    // hello-world@0.0.2 has no "main" field and no index.js — no bundler can resolve it.
    // This was always broken (verified: bun build fails identically). Not a regression.
    const packageJsonFile = join(tempDir, 'package.json')
    const entryFile = join(tempDir, 'index.ts')
    const outputFile = join(tempDir, 'index.js')
    await writeFile(packageJsonFile, JSON.stringify({ dependencies: { 'hello-world': '0.0.2' } }))
    // Install is expected to succeed: package managers don't validate entry points.
    await packageRunner(fakeLog).install({ path: tempDir })
    await writeFile(entryFile, [
        `import helloWorld from 'hello-world';`,
        `export const code = async (inputs) => {`,
        ` const result = helloWorld();`,
        ` return result;`,
        `};`,
    ].join('\n'))
    // Bundling cannot resolve 'hello-world', so build must reject.
    await expect(packageRunner(fakeLog).build({ path: tempDir, entryFile, outputFile })).rejects.toThrow()
})
it('code using dayjs — full install + build pipeline', { timeout: 30_000 }, async () => {
    // Declare dayjs, install it, then bundle and run code that parses an ISO date.
    await writeFile(join(tempDir, 'package.json'), JSON.stringify({ dependencies: { dayjs: '1.11.0' } }))
    await packageRunner(fakeLog).install({ path: tempDir })
    const entry = join(tempDir, 'index.ts')
    const bundled = join(tempDir, 'index.js')
    await writeFile(entry, [
        `import dayjs from 'dayjs';`,
        `export const code = async (inputs: { iso: string }) => {`,
        ` return dayjs(inputs.iso).year();`,
        `};`,
    ].join('\n'))
    await packageRunner(fakeLog).build({ path: tempDir, entryFile: entry, outputFile: bundled })
    const compiled = requireDynamic(bundled)
    expect(await compiled.code({ iso: '2024-06-15' })).toBe(2024)
})
})

View File

@@ -0,0 +1,93 @@
import { describe, expect, it, vi, beforeEach } from 'vitest'
import type { Logger } from 'pino'
// Spies are named with a 'mock' prefix so the (hoisted) vi.mock factories may
// reference them; the factories run lazily at first import, which happens only
// after these consts are initialized.
const mockSpawnWithKill = vi.fn()
const mockThreadSafeMkdir = vi.fn()
vi.mock('../../../../src/lib/utils/exec', () => ({
    spawnWithKill: mockSpawnWithKill,
}))
vi.mock('@activepieces/server-utils', () => ({
    fileSystemUtils: {
        threadSafeMkdir: mockThreadSafeMkdir,
    },
    // Fixed 10-minute duration regardless of arguments — these tests never
    // exercise the timeout path.
    apDayjsDuration: (_val: number, _unit: string) => ({ asMilliseconds: () => 600000 }),
}))
// Import the module under test AFTER the mocks are registered so it binds to
// the mocked dependencies.
const { packageRunner } = await import('../../../../src/lib/cache/code/package-runner')
// Minimal pino-compatible logger stub; child() returns the same object so
// chained loggers still hit these spies.
const fakeLog = {
    debug: vi.fn(),
    info: vi.fn(),
    warn: vi.fn(),
    error: vi.fn(),
    child: vi.fn().mockReturnThis(),
} as unknown as Logger
beforeEach(() => {
    // Reset call history between tests and restore the mkdir happy path.
    vi.clearAllMocks()
    mockThreadSafeMkdir.mockResolvedValue(undefined)
})
describe('packageRunner.install', () => {
    // install() must run pnpm inside the target directory with flags that keep
    // installs reproducible (--prefer-offline) and safe (--ignore-scripts).
    it('runs pnpm install with --prefer-offline and --ignore-scripts in the given path', async () => {
        mockSpawnWithKill.mockResolvedValueOnce({ stdout: '', stderr: '' })
        const piecePath = '/workspace/pieces/my-piece-1.0.0'
        await packageRunner(fakeLog).install({ path: piecePath })
        expect(mockSpawnWithKill).toHaveBeenCalledOnce()
        const [call] = mockSpawnWithKill.mock.calls[0]
        expect(call.cmd).toBe('pnpm install --prefer-offline --ignore-scripts')
        expect(call.options.cwd).toBe(piecePath)
    })
    // Failures from the spawned process must not be swallowed.
    it('spawnWithKill rejects — error propagates to caller', async () => {
        mockSpawnWithKill.mockRejectedValueOnce(new Error('spawn error'))
        const attempt = packageRunner(fakeLog).install({ path: '/workspace' })
        await expect(attempt).rejects.toThrow('spawn error')
    })
})
describe('packageRunner.build', () => {
    // build() must hand esbuild an argv array via spawnWithKill — never a shell
    // string — so user-controlled paths cannot be interpreted by a shell.
    it('calls esbuild via spawnWithKill with explicit args — no shell interpolation', async () => {
        mockSpawnWithKill.mockResolvedValueOnce({ stdout: '', stderr: '' })
        const request = { path: '/path', entryFile: '/path/index.ts', outputFile: '/path/index.js' }
        await packageRunner(fakeLog).build(request)
        expect(mockSpawnWithKill).toHaveBeenCalledTimes(1)
        const call = mockSpawnWithKill.mock.calls[0][0]
        expect(call.cmd).toBe('esbuild')
        expect(call.args).toEqual([
            '/path/index.ts',
            '--bundle',
            '--platform=node',
            '--format=cjs',
            '--outfile=/path/index.js',
        ])
        expect(call.options.cwd).toBe('/path')
    })
    it('shell metacharacters in path are passed as literal args, not interpreted', async () => {
        mockSpawnWithKill.mockResolvedValueOnce({ stdout: '', stderr: '' })
        const maliciousPath = '/codes/abc/; touch /tmp/rce; #'
        await packageRunner(fakeLog).build({
            path: maliciousPath,
            entryFile: `${maliciousPath}/index.ts`,
            outputFile: `${maliciousPath}/index.js`,
        })
        const call = mockSpawnWithKill.mock.calls[0][0]
        // The malicious string must appear verbatim in the args array, never split by shell
        expect(call.cmd).toBe('esbuild')
        expect(call.args[0]).toBe(`${maliciousPath}/index.ts`)
    })
})

View File

@@ -179,7 +179,7 @@ describe('createSandbox', () => {
{ timeoutInSeconds: 10 },
)
expect(result).toEqual({ ...engineResponse, stdOut: '', stdError: '' })
expect(result).toEqual({ ...engineResponse, logs: undefined })
})
it('recovers after engine returns INTERNAL_ERROR and handles next job', async () => {
@@ -242,7 +242,7 @@ describe('createSandbox', () => {
{ timeoutInSeconds: 10 },
)
expect(result).toEqual({ ...engineResponse, stdOut: 'line1\nline2\n', stdError: 'err1\n' })
expect(result).toEqual({ ...engineResponse, logs: 'stdout:\nline1\nline2\n\nstderr:\nerr1\n' })
})
it('delegates worker contract calls to handlers', async () => {

View File

@@ -0,0 +1,163 @@
import { access, mkdir, writeFile } from 'node:fs/promises'
import { tmpdir } from 'node:os'
import { join } from 'node:path'
import { randomUUID } from 'node:crypto'
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
import { PackageType, PieceType } from '@activepieces/shared'
import type { OfficialPiecePackage } from '@activepieces/shared'
import type { Logger } from 'pino'
// Module-level variable updated per test so the vi.mock factory can reference it
// (the factory closes over the binding; beforeEach reassigns it before each test).
let testWorkspace = ''
const mockInstall = vi.fn()
// Replace the real package runner so no pnpm process is ever spawned.
vi.mock('../src/lib/cache/code/package-runner', () => ({
    packageRunner: () => ({
        install: mockInstall,
    }),
}))
vi.mock('../src/lib/config/worker-settings', () => ({
    workerSettings: {
        getSettings: () => ({
            EXECUTION_MODE: 'UNSANDBOXED',
            DEV_PIECES: [],
        }),
    },
}))
// Point both cache-path lookups at the per-test temp workspace.
vi.mock('../src/lib/cache/cache-paths', () => ({
    getGlobalCacheCommonPath: () => testWorkspace,
    getGlobalCachePathLatestVersion: () => testWorkspace,
}))
// Import after mocks are registered
const { pieceInstaller } = await import('../src/lib/cache/pieces/piece-installer')
// Builds a registry-hosted official piece descriptor for the given name/version.
function makePiece(name: string, version = '1.0.0'): OfficialPiecePackage {
    const piece: OfficialPiecePackage = {
        pieceName: name,
        pieceVersion: version,
        packageType: PackageType.REGISTRY,
        pieceType: PieceType.OFFICIAL,
    }
    return piece
}
// Directory where a given piece is installed inside the test workspace.
function pieceDirPath(piece: OfficialPiecePackage): string {
    const dirName = `${piece.pieceName}-${piece.pieceVersion}`
    return join(testWorkspace, 'pieces', dirName)
}
// The 'ready' marker file that flags a fully-installed piece directory.
function readyFilePath(piece: OfficialPiecePackage): string {
    const dir = pieceDirPath(piece)
    return join(dir, 'ready')
}
// True iff the path is accessible; fs.access rejects for missing paths, which
// we map to false instead of letting it throw.
async function pathExists(p: string): Promise<boolean> {
    try {
        await access(p)
        return true
    }
    catch {
        return false
    }
}
// Minimal pino-compatible logger stub; child() returns the same stub so
// chained loggers still hit these spies.
const fakeLog = {
    debug: vi.fn(),
    info: vi.fn(),
    warn: vi.fn(),
    error: vi.fn(),
    child: vi.fn().mockReturnThis(),
} as unknown as Logger
// REGISTRY pieces don't call apiClient.getPieceArchive so an empty object suffices
const fakeApiClient = {} as never
beforeEach(async () => {
    // Fresh, uniquely-named workspace per test keeps filesystem state isolated.
    testWorkspace = join(tmpdir(), `piece-installer-test-${randomUUID()}`)
    await mkdir(testWorkspace, { recursive: true })
    vi.clearAllMocks()
})
afterEach(async () => {
    // rm is imported lazily; it is not needed anywhere else in this file.
    const { rm } = await import('node:fs/promises')
    await rm(testWorkspace, { recursive: true, force: true })
})
describe('pieceInstaller', () => {
    it('two pieces succeed — both marked ready, install called once (batch)', async () => {
        const piece1 = makePiece('@activepieces/piece-a')
        const piece2 = makePiece('@activepieces/piece-b')
        const installer = pieceInstaller(fakeLog, fakeApiClient)
        mockInstall.mockResolvedValue({ output: '' })
        await installer.install({ pieces: [piece1, piece2] })
        // A single batch install at the workspace root must cover both pieces.
        expect(mockInstall).toHaveBeenCalledTimes(1)
        expect(mockInstall).toHaveBeenCalledWith({ path: testWorkspace })
        expect(await pathExists(readyFilePath(piece1))).toBe(true)
        expect(await pathExists(readyFilePath(piece2))).toBe(true)
    })
    it('good piece succeeds, bad piece fails — good marked ready, bad rolled back, error names bad', async () => {
        const good = makePiece('@activepieces/piece-good')
        const bad = makePiece('@activepieces/piece-bad')
        const installer = pieceInstaller(fakeLog, fakeApiClient)
        // The workspace-root (batch) install fails, after which the installer
        // presumably retries per piece directory — only 'piece-bad' keeps failing.
        // NOTE(review): the per-piece fallback is inferred from this mock's shape;
        // confirm against piece-installer's implementation.
        mockInstall.mockImplementation(({ path }: { path: string }) => {
            if (path === testWorkspace) {
                return Promise.reject(new Error('batch install failure'))
            }
            if (path.includes('piece-bad')) {
                return Promise.reject(new Error('install failure'))
            }
            return Promise.resolve({ output: '' })
        })
        const error = await installer.install({ pieces: [good, bad] }).catch(e => e as Error)
        expect(error).toBeInstanceOf(Error)
        // The aggregated error names only the failing piece…
        expect(error.message).toContain('@activepieces/piece-bad@1.0.0')
        expect(error.message).not.toContain('@activepieces/piece-good@1.0.0')
        // …the good piece stays installed; the bad piece's directory is removed.
        expect(await pathExists(readyFilePath(good))).toBe(true)
        expect(await pathExists(pieceDirPath(bad))).toBe(false)
    })
    it('both pieces fail — both rolled back, error names both', async () => {
        const piece1 = makePiece('@activepieces/piece-x')
        const piece2 = makePiece('@activepieces/piece-y')
        const installer = pieceInstaller(fakeLog, fakeApiClient)
        mockInstall.mockRejectedValue(new Error('install failure'))
        const error = await installer.install({ pieces: [piece1, piece2] }).catch(e => e as Error)
        expect(error).toBeInstanceOf(Error)
        expect(error.message).toContain('@activepieces/piece-x@1.0.0')
        expect(error.message).toContain('@activepieces/piece-y@1.0.0')
        expect(await pathExists(pieceDirPath(piece1))).toBe(false)
        expect(await pathExists(pieceDirPath(piece2))).toBe(false)
    })
    it('single piece fails — rolled back, error thrown', async () => {
        const piece = makePiece('@activepieces/piece-solo')
        const installer = pieceInstaller(fakeLog, fakeApiClient)
        mockInstall.mockRejectedValue(new Error('install failure'))
        await expect(installer.install({ pieces: [piece] })).rejects.toThrow('@activepieces/piece-solo@1.0.0')
        // Two calls expected: the batch attempt plus one per-piece retry
        // (NOTE(review): inferred — verify against the installer's fallback logic).
        expect(mockInstall).toHaveBeenCalledTimes(2)
        expect(await pathExists(pieceDirPath(piece))).toBe(false)
    })
    it('piece already installed — pnpm install never called', async () => {
        const piece = makePiece('@activepieces/piece-cached')
        const pieceDir = pieceDirPath(piece)
        // Pre-create the piece directory with its 'ready' marker to simulate a cache hit.
        await mkdir(pieceDir, { recursive: true })
        await writeFile(join(pieceDir, 'ready'), 'true')
        const installer = pieceInstaller(fakeLog, fakeApiClient)
        await installer.install({ pieces: [piece] })
        expect(mockInstall).not.toHaveBeenCalled()
    })
})

View File

@@ -1,6 +1,6 @@
{
"name": "@activepieces/shared",
"version": "0.50.0",
"version": "0.50.1",
"type": "commonjs",
"sideEffects": false,
"main": "./dist/src/index.js",

View File

@@ -1,4 +1,5 @@
import { z } from 'zod'
import { STEP_NAME_REGEX } from '../../../core/common'
import { VersionType } from '../../pieces'
import { PropertySettings } from '../properties'
import { SampleDataSetting } from '../sample-data'
@@ -21,7 +22,7 @@ export enum BranchExecutionType {
}
const commonActionProps = {
name: z.string(),
name: z.string().regex(STEP_NAME_REGEX),
valid: z.boolean(),
displayName: z.string(),
skip: z.boolean().optional(),

View File

@@ -1,4 +1,5 @@
import { z } from 'zod'
import { STEP_NAME_REGEX } from '../../../core/common'
import { VersionType } from '../../pieces'
import { CodeActionSettings, LoopOnItemsActionSettings, PieceActionSettings, RouterActionSettings } from '../actions/action'
import { PropertySettings } from '../properties'
@@ -26,7 +27,7 @@ export enum FlowTriggerType {
}
const commonProps = {
name: z.string(),
name: z.string().regex(STEP_NAME_REGEX),
valid: z.boolean(),
displayName: z.string(),
nextAction: z.any().optional(),

View File

@@ -1,2 +1,3 @@
export * from './permission'
export const SAFE_STRING_PATTERN = '^[^./]+$'
export const SAFE_STRING_PATTERN = '^[^./]+$'
export const STEP_NAME_REGEX = /^[a-zA-Z_][a-zA-Z0-9_]*$/

View File

@@ -0,0 +1,56 @@
import { test, expect } from '../../../fixtures';
/**
* Warmup resilience smoke test.
*
* Triggers a piece sync (which may cause warmup for some pieces) and then
* verifies the worker remains healthy by successfully running a Webhook flow.
* A broken piece published with workspace:* dependencies must not prevent other
* flows from executing.
*/
test.describe('Piece isolation — CE', () => {
    test('worker stays healthy after piece sync and can execute a webhook flow', async ({ page, automationsPage, builderPage, request }) => {
        test.setTimeout(120000);

        // Kick off a piece sync, which exercises the warmup path on the worker.
        const authToken = await page.evaluate(() => localStorage.getItem('token'));
        await request.post('/api/v1/pieces/sync', {
            headers: { Authorization: `Bearer ${authToken}` },
        });

        // Build a minimal webhook-triggered flow.
        await automationsPage.waitFor();
        await automationsPage.newFlowFromScratch();
        await builderPage.selectInitialTrigger({
            piece: 'Webhook',
            trigger: 'Catch Webhook'
        });
        const urlField = page.locator('input.grow.bg-background');
        const webhookUrl = await urlField.inputValue();
        await builderPage.testTrigger();

        // Hit the webhook with a random marker so this run can be identified.
        const marker = Math.floor(Math.random() * 100000);
        await page.context().request.get(`${webhookUrl}?runVersion=${marker}`);
        await page.waitForTimeout(3000);

        // Echo the marker back through a Return Response step.
        await builderPage.addAction({
            piece: 'Webhook',
            action: 'Return Response'
        });
        const editorLine = page.locator('div.cm-activeLine.cm-line');
        await editorLine.fill('');
        await editorLine.fill(
            '{"runVersion": "{{trigger[\'queryParams\'][\'runVersion\']}}"}'
        );
        await page.waitForTimeout(1000);
        await builderPage.publishFlow();

        // A synchronous call must return the marker — proof the worker is healthy.
        const response = await page.context().request.get(`${webhookUrl}/sync?runVersion=${marker}`);
        const body = await response.json();
        expect(body.runVersion).toBe(marker.toString());
    });
});

View File

@@ -0,0 +1,96 @@
import { test, expect } from '../../../fixtures';
/**
* EE: Bad piece isolation test.
*
* Uploads a custom piece whose package.json contains a workspace:* dependency
* (simulating an accidentally mis-published piece). Verifies that the bad piece's
* install failure does NOT prevent a standard Webhook flow from succeeding.
*
* Requires a platform admin token (EE only).
*/
test.describe('Piece isolation — EE', () => {
    test('broken custom piece does not prevent webhook flow from running', async ({ page, automationsPage, builderPage, request }) => {
        test.setTimeout(180000);
        const token = await page.evaluate(() => localStorage.getItem('token'));
        const authHeaders = { Authorization: `Bearer ${token}` };
        // Upload a bad custom piece that has workspace:* in its dependencies.
        // The archive is a minimal tarball whose package.json contains a
        // workspace-protocol dependency, which a package manager cannot resolve
        // outside a monorepo workspace, so the piece's install is expected to fail.
        const badPackageJson = JSON.stringify({
            name: '@test/broken-piece',
            version: '0.0.1',
            dependencies: {
                'some-internal-package': 'workspace:*',
            },
        });
        // Build package/package.json inside a gzipped tar, entirely in memory.
        // (Fix: dropped the previous `const { Readable } = await import('node:stream')`
        // — the binding was never used and fails noUnusedLocals.)
        const tar = await import('tar-stream');
        const zlib = await import('node:zlib');
        const pack = tar.pack();
        pack.entry({ name: 'package/package.json' }, badPackageJson);
        pack.finalize();
        const chunks: Buffer[] = [];
        for await (const chunk of pack.pipe(zlib.createGzip())) {
            chunks.push(chunk as Buffer);
        }
        const tarball = Buffer.concat(chunks);
        // POST the broken piece archive to the platform pieces endpoint
        const uploadResponse = await request.post('/api/v1/pieces', {
            headers: authHeaders,
            multipart: {
                pieceArchive: {
                    name: 'broken-piece.tgz',
                    mimeType: 'application/gzip',
                    buffer: tarball,
                },
            },
        });
        // Upload endpoint missing (CE build) — skip gracefully.
        // NOTE(review): only 404 is treated as "not available"; other 4xx statuses
        // let the test proceed — confirm that is intended.
        test.skip(uploadResponse.status() === 404, 'Piece upload endpoint not available in this build');
        // Even if the upload succeeds and the worker fails to install the broken
        // piece, a regular Webhook flow must still execute successfully.
        await automationsPage.waitFor();
        await automationsPage.newFlowFromScratch();
        await builderPage.selectInitialTrigger({
            piece: 'Webhook',
            trigger: 'Catch Webhook'
        });
        const webhookInput = page.locator('input.grow.bg-background');
        const webhookUrl = await webhookInput.inputValue();
        await builderPage.testTrigger();
        const runVersion = Math.floor(Math.random() * 100000);
        await page.context().request.get(`${webhookUrl}?runVersion=${runVersion}`);
        await page.waitForTimeout(3000);
        await builderPage.addAction({
            piece: 'Webhook',
            action: 'Return Response'
        });
        await page.locator('div.cm-activeLine.cm-line').fill('');
        await page.locator('div.cm-activeLine.cm-line').fill(
            '{"runVersion": "{{trigger[\'queryParams\'][\'runVersion\']}}"}'
        );
        await page.waitForTimeout(1000);
        await builderPage.publishFlow();
        // The flow must answer synchronously with the marker we sent.
        const response = await page.context().request.get(`${webhookUrl}/sync?runVersion=${runVersion}`);
        const body = await response.json();
        expect(body.runVersion).toBe(runVersion.toString());
    });
});