Refactor to remove IDBFS & NODEFS, use Transferable to handle large files

Jerome Wu
2020-01-13 22:07:47 +08:00
parent 1bacf193d9
commit 0f15f58554
20 changed files with 162 additions and 281 deletions
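The core of the change is replacing filesystem syncing (IDBFS/NODEFS) with passing file data directly to the worker via postMessage, listing the underlying ArrayBuffers as transferables so they are moved rather than copied. A minimal sketch of that mechanism, outside the diff itself (variable and file names are illustrative):

// sketch: moving a large buffer to a worker via the transfer list
const data = new Uint8Array(100 * 1024 * 1024); // e.g. a large video file
const packet = {
  action: 'FS',
  payload: { method: 'writeFile', args: ['video.mp4', data] },
};
// the second argument lists buffers to transfer; ownership moves to the worker,
// so no copy is made and data.byteLength becomes 0 on this side afterwards
worker.postMessage(packet, [data.buffer]);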

View File

@@ -1,16 +1,14 @@
const createJob = require('./createJob');
const { log } = require('./utils/log');
const getId = require('./utils/getId');
const extractProgress = require('./utils/extractProgress');
const parseProgress = require('./utils/parseProgress');
const resolvePaths = require('./utils/resolvePaths');
const getTransferables = require('./utils/getTransferables');
const {
defaultOptions,
spawnWorker,
terminateWorker,
onMessage,
send,
fetchFile,
fs,
} = require('./worker/node');
let workerCounter = 0;
@@ -41,15 +39,21 @@ module.exports = (_options = {}) => {
const startJob = ({ id: jobId, action, payload }) => (
new Promise((resolve, reject) => {
log(`[${id}]: Start ${jobId}, action=${action}`);
setResolve(action, resolve);
setReject(action, reject);
send(worker, {
const packet = {
workerId: id,
jobId,
action,
payload,
});
};
log(`[${id}]: Start ${jobId}, action=${action}`);
setResolve(action, resolve);
setReject(action, reject);
/*
* By using Transferable in postMessage, we are able
* to send large files to the worker.
* @ref: https://github.com/ffmpegjs/ffmpeg.js/issues/8#issuecomment-572230128
*/
worker.postMessage(packet, getTransferables(packet));
})
);
@@ -59,118 +63,121 @@ module.exports = (_options = {}) => {
}))
);
const syncfs = (populate, jobId) => (
const write = async (path, data, jobId) => (
startJob(createJob({
id: jobId, action: 'syncfs', payload: { populate },
id: jobId,
action: 'FS',
payload: {
method: 'writeFile',
args: [path, await fetchFile(data)],
},
}))
);
const write = async (path, data) => {
await syncfs();
await fs.writeFile(path, await fetchFile(data));
await syncfs(true);
return {
path: `/data/${path}`,
};
};
const writeText = async (path, text) => {
await syncfs(true);
await fs.writeFile(path, text);
await syncfs(true);
return {
path: `/data/${path}`,
};
};
const read = async (path, del = true) => {
const data = await fs.readFile(path);
if (del) {
await fs.deleteFile(path);
}
return {
data,
};
};
const remove = async (path) => {
await fs.deleteFile(path);
return {
path: `/data/${path}`,
};
};
const run = (args, opts = {}, jobId) => (
const writeText = (path, text, jobId) => (
startJob(createJob({
id: jobId, action: 'run', payload: { args, options: opts },
id: jobId,
action: 'FS',
payload: {
method: 'writeFile',
args: [path, text],
},
}))
);
const transcode = (input, output, opts = '', del = true, jobId) => (
run(
`-i /data/${input} ${opts} ${output}`,
{ input, output, del },
jobId,
)
const read = (path, jobId) => (
startJob(createJob({
id: jobId,
action: 'FS',
payload: {
method: 'readFile',
args: [path],
},
}))
);
const trim = (input, output, from, to, opts = '', del = true, jobId) => (
run(
`-i /data/${input} -ss ${from} -to ${to} ${opts} ${output}`,
{ input, output, del },
jobId,
)
const remove = (path, jobId) => (
startJob(createJob({
id: jobId,
action: 'FS',
payload: {
method: 'unlink',
args: [path],
},
}))
);
const concatDemuxer = async (input, output, opts = '', del = true, jobId) => {
const text = input.reduce((acc, path) => `${acc}\nfile ${path}`, '');
await writeText('concat_list.txt', text);
return run(`-f concat -safe 0 -i /data/concat_list.txt -c copy ${opts} ${output}`,
{ del, output, input: [...input, 'concat_list.txt'] },
jobId);
};
const run = (args, jobId) => (
startJob(createJob({
id: jobId,
action: 'run',
payload: {
args,
},
}))
);
const ls = (path, jobId) => (
startJob(createJob({
id: jobId,
action: 'FS',
payload: { method: 'readdir', args: [path] },
payload: {
method: 'readdir',
args: [path],
},
}))
);
const transcode = (input, output, opts = '', jobId) => (
run(
`-i ${input} ${opts} ${output}`,
jobId,
)
);
const trim = (input, output, from, to, opts = '', jobId) => (
run(
`-i ${input} -ss ${from} -to ${to} ${opts} ${output}`,
jobId,
)
);
const concatDemuxer = async (input, output, opts = '', jobId) => {
const text = input.reduce((acc, path) => `${acc}\nfile ${path}`, '');
await writeText('concat_list.txt', text);
return run(`-f concat -safe 0 -i concat_list.txt -c copy ${opts} ${output}`, jobId);
};
const terminate = async (jobId) => {
if (worker !== null) {
await startJob(createJob({
id: jobId,
action: 'terminate',
}));
terminateWorker(worker);
worker.terminate();
worker = null;
}
return Promise.resolve();
};
onMessage(worker, ({
workerId, jobId, status, action, data,
workerId, jobId, action, status, payload,
}) => {
if (status === 'resolve') {
const { message, data } = payload;
log(`[${workerId}]: Complete ${jobId}`);
let d = data;
if (action === 'FS') {
const { method, data: _data } = data;
if (method === 'readFile') {
d = Uint8Array.from({ ..._data, length: Object.keys(_data).length });
} else {
d = _data;
}
}
resolves[action]({ jobId, data: d });
resolves[action]({
workerId,
jobId,
message,
data,
});
} else if (status === 'reject') {
rejects[action](data);
throw Error(data);
rejects[action](payload);
throw Error(payload);
} else if (status === 'progress') {
extractProgress(data, progress);
logger(data);
parseProgress(payload, progress);
logger(payload);
}
});
@@ -180,16 +187,15 @@ module.exports = (_options = {}) => {
setResolve,
setReject,
load,
syncfs,
write,
writeText,
read,
remove,
ls,
run,
transcode,
trim,
concatDemuxer,
ls,
terminate,
};
};
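With IDBFS/NODEFS gone, every file operation becomes a plain FS job against the worker's MEMFS and paths lose the old /data/ prefix. A hedged usage sketch of the refactored surface (the factory name createWorker is assumed, not shown in this diff; the URL form relies on fetchFile's is-url handling shown later):

// sketch: driving the refactored API; createWorker is an assumed export name
const worker = createWorker();
await worker.load();
await worker.write('input.mp4', 'https://example.com/input.mp4'); // fetchFile, then FS writeFile
await worker.transcode('input.mp4', 'output.mp4');
const { data } = await worker.read('output.mp4'); // Uint8Array read back from MEMFS
await worker.remove('input.mp4');
await worker.terminate();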

View File

@@ -0,0 +1,16 @@
module.exports = (packet) => {
const transferables = [];
const check = (b) => {
if (b instanceof Uint8Array) {
transferables.push(b.buffer);
} else if (b instanceof ArrayBuffer) {
transferables.push(b);
}
};
const { payload: { args, data } } = packet;
check(data);
if (Array.isArray(args)) {
args.forEach((arg) => check(arg));
}
return transferables;
};
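getTransferables walks a packet's payload.data and payload.args and collects the underlying buffers of any Uint8Array/ArrayBuffer values, so both the main thread (in startJob) and the worker script (in dispatchHandlers) can hand them to postMessage as the transfer list. A small sketch of what it produces:

// sketch: the transfer list collected for a writeFile packet
const data = new Uint8Array([1, 2, 3]);
const packet = {
  workerId: 1,
  jobId: 'job-1',
  action: 'FS',
  payload: { method: 'writeFile', args: ['file.bin', data] },
};
const transferables = getTransferables(packet); // => [data.buffer]
worker.postMessage(packet, transferables);      // the buffer is moved, not cloned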

View File

@@ -1,12 +1,10 @@
const worker = require('../');
const getCore = require('./getCore');
const fs = require('../../worker/browser/fs');
global.addEventListener('message', ({ data }) => {
worker.dispatchHandlers(data, (obj) => postMessage(obj));
worker.dispatchHandlers(data, postMessage);
});
worker.setAdapter({
getCore,
fs,
});

View File

@@ -1,6 +1,7 @@
require('regenerator-runtime/runtime');
const defaultArgs = require('./constants/defaultArgs');
const strList2ptr = require('./utils/strList2ptr');
const getTransferables = require('../utils/getTransferables');
let action = 'unknown';
let Module = null;
@@ -26,89 +27,53 @@ const load = ({ workerId, payload: { options: { corePath } } }, res) => {
}
};
const syncfs = async ({
payload: {
populate = false,
},
}, res) => {
await Module.syncfs(populate);
res.resolve({ message: `Sync file system with populate=${populate}` });
};
const FS = ({
payload: {
method,
args,
},
}, res) => {
const data = Module.FS[method](...args);
res.resolve({
message: `${method} ${args.join(',')}`,
method,
data,
message: `Complete ${method}`,
data: Module.FS[method](...args),
});
};
const run = async ({
const run = ({
payload: {
args: _args,
options: {
input, output, del = true,
},
},
}, res) => {
const args = [...defaultArgs, ..._args.trim().split(' ')].filter((s) => s.length !== 0);
ffmpeg(args.length, strList2ptr(Module, args));
/*
* After executing the ffmpeg command, the data is saved in MEMFS;
* if `output` is specified in the options, ffmpeg.js will move
* these files to IDBFS or NODEFS here.
*/
if (typeof output === 'string') {
await adapter.fs.writeFile(output, Module.FS.readFile(output));
Module.FS.unlink(output);
} else if (Array.isArray(output)) {
await Promise.all(output.map(async (p) => {
await adapter.fs.writeFile(p, Module.FS.readFile(p));
Module.FS.unlink(p);
}));
}
/*
* To prevent input files from occupying the filesystem without notice,
* if `input` is specified in the options, ffmpeg.js cleans up these
* files for you.
*/
if (del && typeof input === 'string') {
await adapter.fs.deleteFile(input);
} else if (del && Array.isArray(input)) {
await Promise.all(input.map((p) => adapter.fs.deleteFile(p)));
}
res.resolve({ message: `Complete ${args.join(' ')}` });
res.resolve({
message: `Complete ${args.join(' ')}`,
});
};
exports.dispatchHandlers = (packet, send) => {
const res = (status, data) => {
send({
...packet,
const { workerId, jobId, action: act } = packet;
const res = (status, payload) => {
const pkt = {
workerId,
jobId,
action: act,
status,
data,
});
payload,
};
send(pkt, getTransferables(pkt));
};
res.resolve = res.bind(this, 'resolve');
res.reject = res.bind(this, 'reject');
res.progress = res.bind(this, 'progress');
action = packet.action;
action = act;
try {
({
load,
syncfs,
FS,
run,
})[packet.action](packet, res);
})[act](packet, res);
} catch (err) {
/** Prepare exception to travel through postMessage */
res.reject(err.toString());

View File

@@ -1,12 +1,16 @@
const { parentPort } = require('worker_threads');
const worker = require('../');
const getCore = require('./getCore');
const fs = require('../../worker/node/fs');
process.on('message', (packet) => {
worker.dispatchHandlers(packet, (obj) => process.send(obj));
parentPort.on('message', (packet) => {
worker.dispatchHandlers(
packet,
(...args) => {
parentPort.postMessage(...args);
},
);
});
worker.setAdapter({
getCore,
fs,
});

View File

@@ -1,34 +0,0 @@
const { openDB } = require('idb');
const getDB = () => openDB('/data', 21);
const getDataKeyAndMode = async (db) => {
const dummy = await db.get('FILE_DATA', '/data/.DUMMY');
const dataKey = Object.keys(dummy).filter((k) => !['mode', 'timestamp'].includes(k)).pop();
return { dataKey, mode: dummy.mode };
};
module.exports = {
readFile: async (path) => {
const db = await getDB();
const { dataKey } = await getDataKeyAndMode(db);
return (await db.get('FILE_DATA', `/data/${path}`))[dataKey];
},
writeFile: async (path, data) => {
const db = await getDB();
const { dataKey, mode } = await getDataKeyAndMode(db);
await db.put(
'FILE_DATA',
{
[dataKey]: data,
mode,
timestamp: new Date(),
},
`/data/${path}`,
);
},
deleteFile: async (path) => {
const db = await getDB();
await db.delete('FILE_DATA', `/data/${path}`);
},
};

View File

@@ -9,18 +9,12 @@
*/
const defaultOptions = require('./defaultOptions');
const spawnWorker = require('./spawnWorker');
const terminateWorker = require('./terminateWorker');
const onMessage = require('./onMessage');
const send = require('./send');
const fetchFile = require('./fetchFile');
const fs = require('./fs');
module.exports = {
defaultOptions,
spawnWorker,
terminateWorker,
onMessage,
send,
fetchFile,
fs,
};

View File

@@ -1,10 +0,0 @@
/**
* send
*
* @name send
* @function send packet to worker and create a job
* @access public
*/
module.exports = async (worker, packet) => {
worker.postMessage(packet);
};

View File

@@ -1,10 +0,0 @@
/**
* terminateWorker
*
* @name terminateWorker
* @function terminate worker
* @access public
*/
module.exports = (worker) => {
worker.terminate();
};

View File

@@ -6,7 +6,7 @@ const isURL = require('is-url');
module.exports = async (_data) => {
let data = _data;
if (typeof _data === 'undefined') {
return _data;
return new Uint8Array();
}
if (typeof _data === 'string') {
@@ -22,5 +22,5 @@ module.exports = async (_data) => {
data = _data;
}
return data;
return new Uint8Array(data);
};
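fetchFile now always resolves to a Uint8Array (an empty one for undefined input), so write can pass the result straight into the FS/writeFile packet and getTransferables will pick up its buffer. A hedged sketch (the URL branch is outside this hunk, so its exact behavior is assumed):

// sketch: every input shape normalizes to a Uint8Array
const fromUrl = await fetchFile('https://example.com/video.mp4'); // assumed: fetched, then wrapped
const empty = await fetchFile(undefined); // new Uint8Array() of length 0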

View File

@@ -1,16 +0,0 @@
const util = require('util');
const fs = require('fs');
const readFile = util.promisify(fs.readFile);
const writeFile = util.promisify(fs.writeFile);
const deleteFile = util.promisify(fs.unlink);
module.exports = (path) => (
readFile(`./data/${path}`)
);
module.exports = {
readFile: (path) => readFile(`./data/${path}`),
writeFile: (path, data) => writeFile(`./data/${path}`, data),
deleteFile: (path) => deleteFile(`./data/${path}`),
};

View File

@@ -1,17 +1,11 @@
const defaultOptions = require('./defaultOptions');
const spawnWorker = require('./spawnWorker');
const terminateWorker = require('./terminateWorker');
const onMessage = require('./onMessage');
const send = require('./send');
const fetchFile = require('./fetchFile');
const fs = require('./fs');
module.exports = {
defaultOptions,
spawnWorker,
terminateWorker,
onMessage,
send,
fetchFile,
fs,
};

View File

@@ -1,10 +0,0 @@
/**
* send
*
* @name send
* @function send packet to worker and create a job
* @access public
*/
module.exports = (worker, packet) => {
worker.send(packet);
};

View File

@@ -1,12 +1,12 @@
const { fork } = require('child_process');
const { Worker } = require('worker_threads');
/**
* spawnWorker
*
* @name spawnWorker
* @function fork a new process in node
* @function spawn a new worker thread in node
* @access public
*/
module.exports = ({ workerPath }) => (
fork(workerPath)
new Worker(workerPath)
);
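This switch matters for the Transferable path: worker_threads exposes worker.postMessage(value, transferList), while a forked child process's process.send has no transfer list, so buffers would always be copied. A minimal sketch of the Node side:

// sketch: worker_threads accepts a transfer list, unlike child_process/process.send
const { Worker } = require('worker_threads');
const worker = new Worker('./worker.js');
const data = new Uint8Array(1024);
worker.postMessage(
  { action: 'FS', payload: { method: 'writeFile', args: ['file.bin', data] } },
  [data.buffer], // moved to the worker thread without copying
);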

View File

@@ -1,10 +0,0 @@
/**
* terminateWorker
*
* @name terminateWorker
* @function kill worker
* @access public
*/
module.exports = (worker) => {
worker.kill();
};