diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 6396cd9..1f997f3 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -14,7 +14,7 @@ jobs: runs-on: ${{ matrix.os }} strategy: matrix: - node_version: ['10'] #['8', '10', '12'] + node_version: ['12'] #['8', '10', '12'] os: [ubuntu-latest] #[ubuntu-latest, windows-latest, macOS-latest] steps: @@ -28,3 +28,4 @@ jobs: run: | npm install npm run lint + npm test diff --git a/docs/api.md b/docs/api.md index 907eb0f..63355c6 100644 --- a/docs/api.md +++ b/docs/api.md @@ -58,7 +58,7 @@ Worker.load() loads ffmpeg-core.js script (download from remote if not presented -### Worker.write(path, data): Promise +### Worker.write(path, data, jobId): Promise Worker.write() writes data to specific path in Emscripten file system, it is an essential step before doing any other tasks. @@ -66,6 +66,7 @@ Worker.write() writes data to specific path in Emscripten file system, it is an - `path` path to write data to file system - `data` data to write, can be Uint8Array, URL or base64 format +- `jobId` @see Worker.load() **Examples:** @@ -80,7 +81,7 @@ Worker.write() writes data to specific path in Emscripten file system, it is an -### Worker.writeText(path, text): Promise +### Worker.writeText(path, text, jobId): Promise Worker.write() writes text data to specific path in Emscripten file system. @@ -88,6 +89,7 @@ Worker.write() writes text data to specific path in Emscripten file system. - `path` path to write data to file system - `text` string to write to file +- `jobId` @see Worker.load() **Examples:** @@ -99,14 +101,14 @@ Worker.write() writes text data to specific path in Emscripten file system. -### Worker.read(path, del): Promise +### Worker.read(path, jobId): Promise Worker.read() reads data from file system, often used to get output data after specific task. **Arguments:** - `path` path to read data from file system -- `del` whether to delete file in IDBFS or NODEFS, default: true +- `jobId` @see Worker.load() **Examples:** @@ -118,13 +120,14 @@ Worker.read() reads data from file system, often used to get output data after s -### Worker.remove(path): Promise +### Worker.remove(path, jobId): Promise Worker.remove() removes files in file system, it will be better to delete unused files if you need to run ffmpeg.js multiple times. **Arguments:** - `path` path for file to delete +- `jobId` @see Worker.load() **Examples:** @@ -136,7 +139,7 @@ Worker.remove() removes files in file system, it will be better to delete unused -### Worker.transcode(input, output, options, del, jobId): Promise +### Worker.transcode(input, output, options, jobId): Promise Worker.transcode() transcode a video file to another format. @@ -145,8 +148,7 @@ Worker.transcode() transcode a video file to another format. - `input` input file path, the input file should be written through Worker.write() - `output` output file path, can be read with Worker.read() later - `options` a string to add extra arguments to ffmpeg -- `del` a boolean to determine whether to delete input file after the task is done, default: true -- `jobId` check Worker.load() +- `jobId` @see Worker.load() **Examples:** @@ -158,7 +160,7 @@ Worker.transcode() transcode a video file to another format. -### Worker.trim(input, output, from, to, options, del, jobId): Promise +### Worker.trim(input, output, from, to, options, jobId): Promise Worker.trim() trims video to specific interval. @@ -169,8 +171,7 @@ Worker.trim() trims video to specific interval. - `from` start time, can be in time stamp (00:00:12.000) or seconds (12) - `to` end time, rule same as above - `options` a string to add extra arguments to ffmpeg -- `del` a boolean to determine whether to delete input file after the task is done, default: true -- `jobId` check Worker.load() +- `jobId` @see Worker.load() **Examples:** @@ -182,7 +183,7 @@ Worker.trim() trims video to specific interval. -### Worker.concatDemuxer(input, output, options, del, jobId): Promise +### Worker.concatDemuxer(input, output, options, jobId): Promise Worker.concatDemuxer() concatenates multiple videos using concatDemuxer. This method won't encode the videos again. But it has its limitations. See [Concat demuxer Wiki](https://trac.ffmpeg.org/wiki/Concatenate) @@ -191,7 +192,6 @@ Worker.concatDemuxer() concatenates multiple videos using concatDemuxer. This me - `input` input file paths as an Array, the input files should be written through Worker.write() - `output` output file path, can be read with Worker.read() later - `options` a string to add extra arguments to ffmpeg -- `del` a boolean to determine whether to delete input file after the task is done, default: true - `jobId` check Worker.load() **Examples:** @@ -204,26 +204,19 @@ Worker.concatDemuxer() concatenates multiple videos using concatDemuxer. This me -### Worker.run(args, options, jobId): Promise +### Worker.run(args, jobId): Promise Worker.run() is similar to FFmpeg cli tool, aims to provide maximum flexiblity for users. **Arguments:** - `args` a string to represent arguments, note: inputPath must start with `/data/` as worker.write write to this path by default. -- `options` a object to define the value for input, output and del. - - `input` a string or an array of strings to indicate input files, ffmpeg.js deletes these files for you. - - `output` a string or an array of strings to indicate output files, ffmpeg.js moves these files to `/data`, deletes them from MEMFS and you can read them with Worker.read() - - `del` a boolean to determine whether to delete input file after the task is done, default: true - `jobId` check Worker.load() **Examples:** ```javascript (async () => { - await worker.run("-i /data/flame.avi -s 1920x1080 output.mp4", { - input: "flame.avi", - output: "output.mp4" - }); + await worker.run("-i /data/flame.avi -s 1920x1080 output.mp4"); })(); ``` diff --git a/package-lock.json b/package-lock.json index ea467da..84952fc 100644 --- a/package-lock.json +++ b/package-lock.json @@ -811,9 +811,9 @@ } }, "@ffmpeg/core": { - "version": "0.5.0", - "resolved": "https://registry.npmjs.org/@ffmpeg/core/-/core-0.5.0.tgz", - "integrity": "sha512-IwccUO0ZchshMyYVfITT3c+X/S2721eiNuggxJcrC0oOyhWzUIGlule74vPDDcke6wTAVJ333WozuJgGsTYF2A==" + "version": "0.4.0", + "resolved": "https://registry.npmjs.org/@ffmpeg/core/-/core-0.4.0.tgz", + "integrity": "sha512-ZHSgNnyKcc7WoorlWXstKbtLMvh16jH4JaD823doG1JUoLfEWQmELTTphR9n/aeAM246XfjjAUgfa7ct9DIReQ==" }, "@hapi/address": { "version": "2.1.2", diff --git a/package.json b/package.json index ad88ec5..dd0c860 100644 --- a/package.json +++ b/package.json @@ -13,7 +13,7 @@ "lint": "eslint src", "wait": "rimraf dist && wait-on http://localhost:3000/dist/ffmpeg.dev.js", "test": "npm-run-all -p -r start test:all", - "test:all": "npm-run-all wait test:node:all", + "test:all": "npm-run-all wait test:browser:ffmpeg test:node:all", "test:node": "nyc mocha --exit --bail --require ./scripts/test-helper.js", "test:node:all": "npm run test:node -- ./tests/*.test.js", "test:browser": "mocha-headless-chrome -a incognito -a no-sandbox -a disable-setuid-sandbox -a disable-logging -t 300000", @@ -38,7 +38,7 @@ }, "homepage": "https://github.com/ffmpegjs/ffmpeg.js#readme", "dependencies": { - "@ffmpeg/core": "^0.5.0", + "@ffmpeg/core": "^0.4.0", "idb": "^4.0.5", "is-electron": "^2.2.0", "is-url": "^1.2.4", diff --git a/src/createWorker.js b/src/createWorker.js index 2f6947a..8068ffe 100644 --- a/src/createWorker.js +++ b/src/createWorker.js @@ -1,16 +1,14 @@ const createJob = require('./createJob'); const { log } = require('./utils/log'); const getId = require('./utils/getId'); -const extractProgress = require('./utils/extractProgress'); +const parseProgress = require('./utils/parseProgress'); const resolvePaths = require('./utils/resolvePaths'); +const getTransferables = require('./utils/getTransferables'); const { defaultOptions, spawnWorker, - terminateWorker, onMessage, - send, fetchFile, - fs, } = require('./worker/node'); let workerCounter = 0; @@ -41,15 +39,21 @@ module.exports = (_options = {}) => { const startJob = ({ id: jobId, action, payload }) => ( new Promise((resolve, reject) => { - log(`[${id}]: Start ${jobId}, action=${action}`); - setResolve(action, resolve); - setReject(action, reject); - send(worker, { + const packet = { workerId: id, jobId, action, payload, - }); + }; + log(`[${id}]: Start ${jobId}, action=${action}`); + setResolve(action, resolve); + setReject(action, reject); + /* + * By using Transferable in postMessage, we are able + * to send large files to worker + * @ref: https://github.com/ffmpegjs/ffmpeg.js/issues/8#issuecomment-572230128 + */ + worker.postMessage(packet, getTransferables(packet)); }) ); @@ -59,118 +63,121 @@ module.exports = (_options = {}) => { })) ); - const syncfs = (populate, jobId) => ( + const write = async (path, data, jobId) => ( startJob(createJob({ - id: jobId, action: 'syncfs', payload: { populate }, + id: jobId, + action: 'FS', + payload: { + method: 'writeFile', + args: [path, await fetchFile(data)], + }, })) ); - const write = async (path, data) => { - await syncfs(); - await fs.writeFile(path, await fetchFile(data)); - await syncfs(true); - return { - path: `/data/${path}`, - }; - }; - - const writeText = async (path, text) => { - await syncfs(true); - await fs.writeFile(path, text); - await syncfs(true); - return { - path: `/data/${path}`, - }; - }; - - const read = async (path, del = true) => { - const data = await fs.readFile(path); - if (del) { - await fs.deleteFile(path); - } - return { - data, - }; - }; - - const remove = async (path) => { - await fs.deleteFile(path); - return { - path: `/data/${path}`, - }; - }; - - const run = (args, opts = {}, jobId) => ( + const writeText = (path, text, jobId) => ( startJob(createJob({ - id: jobId, action: 'run', payload: { args, options: opts }, + id: jobId, + action: 'FS', + payload: { + method: 'writeFile', + args: [path, text], + }, })) ); - const transcode = (input, output, opts = '', del = true, jobId) => ( - run( - `-i /data/${input} ${opts} ${output}`, - { input, output, del }, - jobId, - ) + const read = (path, jobId) => ( + startJob(createJob({ + id: jobId, + action: 'FS', + payload: { + method: 'readFile', + args: [path], + }, + })) ); - const trim = (input, output, from, to, opts = '', del = true, jobId) => ( - run( - `-i /data/${input} -ss ${from} -to ${to} ${opts} ${output}`, - { input, output, del }, - jobId, - ) + const remove = (path, jobId) => ( + startJob(createJob({ + id: jobId, + action: 'FS', + payload: { + method: 'unlink', + args: [path], + }, + })) ); - const concatDemuxer = async (input, output, opts = '', del = true, jobId) => { - const text = input.reduce((acc, path) => `${acc}\nfile ${path}`, ''); - await writeText('concat_list.txt', text); - return run(`-f concat -safe 0 -i /data/concat_list.txt -c copy ${opts} ${output}`, - { del, output, input: [...input, 'concat_list.txt'] }, - jobId); - }; + const run = (args, jobId) => ( + startJob(createJob({ + id: jobId, + action: 'run', + payload: { + args, + }, + })) + ); const ls = (path, jobId) => ( startJob(createJob({ id: jobId, action: 'FS', - payload: { method: 'readdir', args: [path] }, + payload: { + method: 'readdir', + args: [path], + }, })) ); + const transcode = (input, output, opts = '', jobId) => ( + run( + `-i ${input} ${opts} ${output}`, + jobId, + ) + ); + + const trim = (input, output, from, to, opts = '', jobId) => ( + run( + `-i ${input} -ss ${from} -to ${to} ${opts} ${output}`, + jobId, + ) + ); + + const concatDemuxer = async (input, output, opts = '', jobId) => { + const text = input.reduce((acc, path) => `${acc}\nfile ${path}`, ''); + await writeText('concat_list.txt', text); + return run(`-f concat -safe 0 -i concat_list.txt -c copy ${opts} ${output}`, jobId); + }; + const terminate = async (jobId) => { if (worker !== null) { await startJob(createJob({ id: jobId, action: 'terminate', })); - terminateWorker(worker); + worker.terminate(); worker = null; } return Promise.resolve(); }; onMessage(worker, ({ - workerId, jobId, status, action, data, + workerId, jobId, action, status, payload, }) => { if (status === 'resolve') { + const { message, data } = payload; log(`[${workerId}]: Complete ${jobId}`); - let d = data; - if (action === 'FS') { - const { method, data: _data } = data; - if (method === 'readFile') { - d = Uint8Array.from({ ..._data, length: Object.keys(_data).length }); - } else { - d = _data; - } - } - resolves[action]({ jobId, data: d }); + resolves[action]({ + workerId, + jobId, + message, + data, + }); } else if (status === 'reject') { - rejects[action](data); - throw Error(data); + rejects[action](payload); + throw Error(payload); } else if (status === 'progress') { - extractProgress(data, progress); - logger(data); + parseProgress(payload, progress); + logger(payload); } }); @@ -180,16 +187,15 @@ module.exports = (_options = {}) => { setResolve, setReject, load, - syncfs, write, writeText, read, remove, + ls, run, transcode, trim, concatDemuxer, - ls, terminate, }; }; diff --git a/src/utils/getTransferables.js b/src/utils/getTransferables.js new file mode 100644 index 0000000..11c9112 --- /dev/null +++ b/src/utils/getTransferables.js @@ -0,0 +1,16 @@ +module.exports = (packet) => { + const transferables = []; + const check = (b) => { + if (b instanceof Uint8Array) { + transferables.push(b.buffer); + } else if (b instanceof ArrayBuffer) { + transferables.push(b); + } + }; + const { payload: { args, data } } = packet; + check(data); + if (Array.isArray(args)) { + args.forEach((arg) => check(arg)); + } + return transferables; +}; diff --git a/src/utils/extractProgress.js b/src/utils/parseProgress.js similarity index 100% rename from src/utils/extractProgress.js rename to src/utils/parseProgress.js diff --git a/src/worker-script/browser/index.js b/src/worker-script/browser/index.js index 83d244b..24a84b9 100644 --- a/src/worker-script/browser/index.js +++ b/src/worker-script/browser/index.js @@ -1,12 +1,10 @@ const worker = require('../'); const getCore = require('./getCore'); -const fs = require('../../worker/browser/fs'); global.addEventListener('message', ({ data }) => { - worker.dispatchHandlers(data, (obj) => postMessage(obj)); + worker.dispatchHandlers(data, postMessage); }); worker.setAdapter({ getCore, - fs, }); diff --git a/src/worker-script/index.js b/src/worker-script/index.js index 6f4f4c0..1e426fb 100644 --- a/src/worker-script/index.js +++ b/src/worker-script/index.js @@ -1,6 +1,7 @@ require('regenerator-runtime/runtime'); const defaultArgs = require('./constants/defaultArgs'); const strList2ptr = require('./utils/strList2ptr'); +const getTransferables = require('../utils/getTransferables'); let action = 'unknown'; let Module = null; @@ -26,89 +27,53 @@ const load = ({ workerId, payload: { options: { corePath } } }, res) => { } }; -const syncfs = async ({ - payload: { - populate = false, - }, -}, res) => { - await Module.syncfs(populate); - res.resolve({ message: `Sync file system with populate=${populate}` }); -}; - const FS = ({ payload: { method, args, }, }, res) => { - const data = Module.FS[method](...args); res.resolve({ - message: `${method} ${args.join(',')}`, - method, - data, + message: `Complete ${method}`, + data: Module.FS[method](...args), }); }; -const run = async ({ +const run = ({ payload: { args: _args, - options: { - input, output, del = true, - }, }, }, res) => { const args = [...defaultArgs, ..._args.trim().split(' ')].filter((s) => s.length !== 0); ffmpeg(args.length, strList2ptr(Module, args)); - - /* - * After executing the ffmpeg command, the data is saved in MEMFS, - * if `output` is specified in the options, here ffmpeg.js will move - * these files to IDBFS or NODEFS here. - */ - if (typeof output === 'string') { - await adapter.fs.writeFile(output, Module.FS.readFile(output)); - Module.FS.unlink(output); - } else if (Array.isArray(output)) { - await Promise.all(output.map(async (p) => { - await adapter.fs.writeFile(p, Module.FS.readFile(p)); - Module.FS.unlink(p); - })); - } - - /* - * To prevent input files occupy filesystem without notice, - * if `input` is specified in the options, ffmpeg.js cleans these - * files for you - */ - if (del && typeof input === 'string') { - await adapter.fs.deleteFile(input); - } else if (del && Array.isArray(input)) { - await Promise.all(input.map((p) => adapter.fs.deleteFile(p))); - } - - res.resolve({ message: `Complete ${args.join(' ')}` }); + res.resolve({ + message: `Complete ${args.join(' ')}`, + }); }; exports.dispatchHandlers = (packet, send) => { - const res = (status, data) => { - send({ - ...packet, + const { workerId, jobId, action: act } = packet; + const res = (status, payload) => { + const pkt = { + workerId, + jobId, + action: act, status, - data, - }); + payload, + }; + send(pkt, getTransferables(pkt)); }; res.resolve = res.bind(this, 'resolve'); res.reject = res.bind(this, 'reject'); res.progress = res.bind(this, 'progress'); - action = packet.action; + action = act; try { ({ load, - syncfs, FS, run, - })[packet.action](packet, res); + })[act](packet, res); } catch (err) { /** Prepare exception to travel through postMessage */ res.reject(err.toString()); diff --git a/src/worker-script/node/index.js b/src/worker-script/node/index.js index 0085455..6717a3d 100644 --- a/src/worker-script/node/index.js +++ b/src/worker-script/node/index.js @@ -1,12 +1,16 @@ +const { parentPort } = require('worker_threads'); const worker = require('../'); const getCore = require('./getCore'); -const fs = require('../../worker/node/fs'); -process.on('message', (packet) => { - worker.dispatchHandlers(packet, (obj) => process.send(obj)); +parentPort.on('message', (packet) => { + worker.dispatchHandlers( + packet, + (...args) => { + parentPort.postMessage(...args); + }, + ); }); worker.setAdapter({ getCore, - fs, }); diff --git a/src/worker/browser/fs.js b/src/worker/browser/fs.js deleted file mode 100644 index 319405e..0000000 --- a/src/worker/browser/fs.js +++ /dev/null @@ -1,34 +0,0 @@ -const { openDB } = require('idb'); - -const getDB = () => openDB('/data', 21); - -const getDataKeyAndMode = async (db) => { - const dummy = await db.get('FILE_DATA', '/data/.DUMMY'); - const dataKey = Object.keys(dummy).filter((k) => !['mode', 'timestamp'].includes(k)).pop(); - return { dataKey, mode: dummy.mode }; -}; - -module.exports = { - readFile: async (path) => { - const db = await getDB(); - const { dataKey } = await getDataKeyAndMode(db); - return (await db.get('FILE_DATA', `/data/${path}`))[dataKey]; - }, - writeFile: async (path, data) => { - const db = await getDB(); - const { dataKey, mode } = await getDataKeyAndMode(db); - await db.put( - 'FILE_DATA', - { - [dataKey]: data, - mode, - timestamp: new Date(), - }, - `/data/${path}`, - ); - }, - deleteFile: async (path) => { - const db = await getDB(); - await db.delete('FILE_DATA', `/data/${path}`); - }, -}; diff --git a/src/worker/browser/index.js b/src/worker/browser/index.js index 5289f98..467d4bb 100644 --- a/src/worker/browser/index.js +++ b/src/worker/browser/index.js @@ -9,18 +9,12 @@ */ const defaultOptions = require('./defaultOptions'); const spawnWorker = require('./spawnWorker'); -const terminateWorker = require('./terminateWorker'); const onMessage = require('./onMessage'); -const send = require('./send'); const fetchFile = require('./fetchFile'); -const fs = require('./fs'); module.exports = { defaultOptions, spawnWorker, - terminateWorker, onMessage, - send, fetchFile, - fs, }; diff --git a/src/worker/browser/send.js b/src/worker/browser/send.js deleted file mode 100644 index 88f8aaf..0000000 --- a/src/worker/browser/send.js +++ /dev/null @@ -1,10 +0,0 @@ -/** - * send - * - * @name send - * @function send packet to worker and create a job - * @access public - */ -module.exports = async (worker, packet) => { - worker.postMessage(packet); -}; diff --git a/src/worker/browser/terminateWorker.js b/src/worker/browser/terminateWorker.js deleted file mode 100644 index 753a3fd..0000000 --- a/src/worker/browser/terminateWorker.js +++ /dev/null @@ -1,10 +0,0 @@ -/** - * terminateWorker - * - * @name terminateWorker - * @function terminate worker - * @access public - */ -module.exports = (worker) => { - worker.terminate(); -}; diff --git a/src/worker/node/fetchFile.js b/src/worker/node/fetchFile.js index 8686465..ad94150 100644 --- a/src/worker/node/fetchFile.js +++ b/src/worker/node/fetchFile.js @@ -6,7 +6,7 @@ const isURL = require('is-url'); module.exports = async (_data) => { let data = _data; if (typeof _data === 'undefined') { - return _data; + return new Uint8Array(); } if (typeof _data === 'string') { @@ -22,5 +22,5 @@ module.exports = async (_data) => { data = _data; } - return data; + return new Uint8Array(data); }; diff --git a/src/worker/node/fs.js b/src/worker/node/fs.js deleted file mode 100644 index d5f3eb3..0000000 --- a/src/worker/node/fs.js +++ /dev/null @@ -1,16 +0,0 @@ -const util = require('util'); -const fs = require('fs'); - -const readFile = util.promisify(fs.readFile); -const writeFile = util.promisify(fs.writeFile); -const deleteFile = util.promisify(fs.unlink); - -module.exports = (path) => ( - readFile(`./data/${path}`) -); - -module.exports = { - readFile: (path) => readFile(`./data/${path}`), - writeFile: (path, data) => writeFile(`./data/${path}`, data), - deleteFile: (path) => deleteFile(`./data/${path}`), -}; diff --git a/src/worker/node/index.js b/src/worker/node/index.js index 3aa7147..997cb10 100644 --- a/src/worker/node/index.js +++ b/src/worker/node/index.js @@ -1,17 +1,11 @@ const defaultOptions = require('./defaultOptions'); const spawnWorker = require('./spawnWorker'); -const terminateWorker = require('./terminateWorker'); const onMessage = require('./onMessage'); -const send = require('./send'); const fetchFile = require('./fetchFile'); -const fs = require('./fs'); module.exports = { defaultOptions, spawnWorker, - terminateWorker, onMessage, - send, fetchFile, - fs, }; diff --git a/src/worker/node/send.js b/src/worker/node/send.js deleted file mode 100644 index 783c6e1..0000000 --- a/src/worker/node/send.js +++ /dev/null @@ -1,10 +0,0 @@ -/** - * send - * - * @name send - * @function send packet to worker and create a job - * @access public - */ -module.exports = (worker, packet) => { - worker.send(packet); -}; diff --git a/src/worker/node/spawnWorker.js b/src/worker/node/spawnWorker.js index e3c5d89..5bba13c 100644 --- a/src/worker/node/spawnWorker.js +++ b/src/worker/node/spawnWorker.js @@ -1,12 +1,12 @@ -const { fork } = require('child_process'); +const { Worker } = require('worker_threads'); /** * spawnWorker * * @name spawnWorker - * @function fork a new process in node + * @function fork a new worker thread in node * @access public */ module.exports = ({ workerPath }) => ( - fork(workerPath) + new Worker(workerPath) ); diff --git a/src/worker/node/terminateWorker.js b/src/worker/node/terminateWorker.js deleted file mode 100644 index 0e8b67e..0000000 --- a/src/worker/node/terminateWorker.js +++ /dev/null @@ -1,10 +0,0 @@ -/** - * terminateWorker - * - * @name terminateWorker - * @function kill worker - * @access public - */ -module.exports = (worker) => { - worker.kill(); -};