Refactor to worker version

This commit is contained in:
Jerome Wu
2019-10-24 07:47:11 +08:00
parent 99adf5138c
commit 9e5d0c5cc6
42 changed files with 579 additions and 109 deletions

View File

@@ -0,0 +1,3 @@
module.exports = {
logger: () => {},
};

21
src/createJob.js Normal file
View File

@@ -0,0 +1,21 @@
const getId = require('./utils/getId');
let jobCounter = 0;
module.exports = ({
id: _id,
action,
payload = {},
}) => {
let id = _id;
if (typeof id === 'undefined') {
id = getId('Job', jobCounter);
jobCounter += 1;
}
return {
id,
action,
payload,
};
};

109
src/createWorker.js Normal file
View File

@@ -0,0 +1,109 @@
const createJob = require('./createJob');
const { log } = require('./utils/log');
const getId = require('./utils/getId');
const {
defaultOptions,
spawnWorker,
terminateWorker,
onMessage,
loadMedia,
send,
} = require('./worker/node');
let workerCounter = 0;
module.exports = (_options = {}) => {
const id = getId('Worker', workerCounter);
const {
logger,
...options
} = {
...defaultOptions,
..._options,
};
const resolves = {};
const rejects = {};
let worker = spawnWorker(options);
workerCounter += 1;
const setResolve = (action, res) => {
resolves[action] = res;
};
const setReject = (action, rej) => {
rejects[action] = rej;
};
const startJob = ({ id: jobId, action, payload }) => (
new Promise((resolve, reject) => {
log(`[${id}]: Start ${jobId}, action=${action}`);
setResolve(action, resolve);
setReject(action, reject);
send(worker, {
workerId: id,
jobId,
action,
payload,
});
})
);
const load = (jobId) => (
startJob(createJob({
id: jobId, action: 'load', payload: { options },
}))
);
const transcode = async (media, outputExt, opts, jobId) => (
startJob(createJob({
id: jobId,
action: 'transcode',
payload: {
media: await loadMedia(media),
outputExt,
options: opts,
},
}))
);
const terminate = async (jobId) => {
if (worker !== null) {
await startJob(createJob({
id: jobId,
action: 'terminate',
}));
terminateWorker(worker);
worker = null;
}
return Promise.resolve();
};
onMessage(worker, ({
workerId, jobId, status, action, data,
}) => {
if (status === 'resolve') {
log(`[${workerId}]: Complete ${jobId}`);
let d = data;
if (action === 'transcode') {
d = Array.from({ ...data, length: Object.keys(data).length });
}
resolves[action]({ jobId, data: d });
} else if (status === 'reject') {
rejects[action](data);
throw Error(data);
} else if (status === 'progress') {
logger(data);
}
});
return {
id,
worker,
setResolve,
setReject,
load,
transcode,
terminate,
};
};

View File

@@ -1,7 +1,6 @@
const load = require('./load');
const transcode = require('./transcode');
require('regenerator-runtime/runtime');
const createWorker = require('./createWorker');
module.exports = {
load,
transcode,
createWorker,
};

View File

@@ -1,12 +0,0 @@
const FFmpegCore = require('@ffmpeg/core');
const { setModule } = require('./util/module');
module.exports = () => (
new Promise((resolve) => {
FFmpegCore()
.then((Module) => {
setModule(Module);
resolve();
});
})
);

View File

@@ -1,17 +0,0 @@
const fs = require('fs');
const { getModule } = require('./util/module');
const getFFmpeg = require('./util/getFFmpeg');
const strList2ptr = require('./util/strList2ptr');
const defaultArgs = require('./constants/defaultArgs');
module.exports = (inputPath, outputExt, options = '') => {
const Module = getModule();
const data = new Uint8Array(fs.readFileSync(inputPath));
const iPath = `file.${inputPath.split('.').pop()}`;
const oPath = `file.${outputExt}`;
const ffmpeg = getFFmpeg();
const args = [...defaultArgs, ...`${options} -i ${iPath} ${oPath}`.trim().split(' ')];
Module.FS.writeFile(iPath, data);
ffmpeg(args.length, strList2ptr(args));
return Buffer.from(Module.FS.readFile(oPath));
};

View File

@@ -1,6 +0,0 @@
const { getModule } = require('./module');
module.exports = () => {
const Module = getModule();
return Module.cwrap('ffmpeg', 'number', ['number', 'number']);
};

View File

@@ -1,7 +0,0 @@
let Module = null;
exports.setModule = (m) => {
Module = m;
};
exports.getModule = () => Module;

View File

@@ -1,11 +0,0 @@
const { getModule } = require('./module');
module.exports = (s) => {
const Module = getModule();
const ptr = Module._malloc((s.length + 1) * Uint8Array.BYTES_PER_ELEMENT);
for (let i = 0; i < s.length; i += 1) {
Module.setValue(ptr + i, s.charCodeAt(i), 'i8');
}
Module.setValue(ptr + s.length, 0, 'i8');
return ptr;
};

View File

@@ -1,14 +0,0 @@
const { getModule } = require('./module');
const str2ptr = require('./str2ptr');
module.exports = (strList) => {
const Module = getModule();
const listPtr = Module._malloc(strList.length * Uint32Array.BYTES_PER_ELEMENT);
strList.forEach((s, idx) => {
const strPtr = str2ptr(s);
Module.setValue(listPtr + (4 * idx), strPtr, 'i32');
});
return listPtr;
};

3
src/utils/getId.js Normal file
View File

@@ -0,0 +1,3 @@
module.exports = (prefix, cnt) => (
`${prefix}-${cnt}-${Math.random().toString(16).slice(3, 8)}`
);

9
src/utils/log.js Normal file
View File

@@ -0,0 +1,9 @@
let logging = false;
exports.logging = logging;
exports.setLogging = (_logging) => {
logging = _logging;
};
exports.log = (...args) => (logging ? console.log.apply(this, args) : null);

View File

@@ -0,0 +1,5 @@
{
"env": {
"worker": true
}
}

View File

@@ -0,0 +1,6 @@
module.exports = (corePath) => {
if (typeof global.Module === 'undefined') {
global.importScripts(corePath);
}
return global.Module;
};

View File

@@ -0,0 +1,10 @@
const worker = require('../');
const getCore = require('./getCore');
global.addEventListener('message', ({ data }) => {
worker.dispatchHandlers(data, (obj) => postMessage(obj));
});
worker.setAdapter({
getCore,
});

View File

@@ -0,0 +1,83 @@
require('regenerator-runtime/runtime');
const defaultArgs = require('./constants/defaultArgs');
let Module = null;
let adapter = null;
let ffmpeg = null;
const str2ptr = (s) => {
const ptr = Module._malloc((s.length + 1) * Uint8Array.BYTES_PER_ELEMENT);
for (let i = 0; i < s.length; i += 1) {
Module.setValue(ptr + i, s.charCodeAt(i), 'i8');
}
Module.setValue(ptr + s.length, 0, 'i8');
return ptr;
};
const strList2ptr = (strList) => {
const listPtr = Module._malloc(strList.length * Uint32Array.BYTES_PER_ELEMENT);
strList.forEach((s, idx) => {
const strPtr = str2ptr(s);
Module.setValue(listPtr + (4 * idx), strPtr, 'i32');
});
return listPtr;
};
const load = ({ payload: { options: { corePath } } }, res) => {
if (Module == null) {
const Core = adapter.getCore(corePath);
Core()
.then((_Module) => {
Module = _Module;
ffmpeg = Module.cwrap('ffmpeg', 'number', ['number', 'number']);
res.resolve(true);
});
} else {
res.resolve(true);
}
};
const transcode = ({
payload: {
media,
outputExt,
options = '',
},
}, res) => {
const data = Uint8Array.from({ ...media, length: Object.keys(media).length });
const iPath = 'media';
const oPath = `media.${outputExt}`;
const args = [...defaultArgs, ...`${options} -i ${iPath} ${oPath}`.trim().split(' ')];
Module.FS.writeFile(iPath, data);
ffmpeg(args.length, strList2ptr(args));
res.resolve(Module.FS.readFile(oPath));
};
exports.dispatchHandlers = (packet, send) => {
const res = (status, data) => {
send({
...packet,
status,
data,
});
};
res.resolve = res.bind(this, 'resolve');
res.reject = res.bind(this, 'reject');
res.progress = res.bind(this, 'progress');
try {
({
load,
transcode,
})[packet.action](packet, res);
} catch (err) {
/** Prepare exception to travel through postMessage */
res.reject(err.toString());
}
};
exports.setAdapter = (_adapter) => {
adapter = _adapter;
};

View File

@@ -0,0 +1,8 @@
let FFmpegCore = null;
module.exports = () => {
if (FFmpegCore === null) {
FFmpegCore = require('@ffmpeg/core');
}
return FFmpegCore;
};

View File

@@ -0,0 +1,10 @@
const worker = require('../');
const getCore = require('./getCore');
process.on('message', (packet) => {
worker.dispatchHandlers(packet, (obj) => process.send(obj));
});
worker.setAdapter({
getCore,
});

View File

@@ -0,0 +1,14 @@
const resolveURL = require('resolve-url');
const { version, dependencies } = require('../../../package.json');
const defaultOptions = require('../../constants/defaultOptions');
/*
* Default options for browser worker
*/
module.exports = {
...defaultOptions,
workerPath: (typeof process !== 'undefined' && process.env.FFMPEG_ENV === 'development')
? resolveURL(`/dist/worker.dev.js?nocache=${Math.random().toString(36).slice(3)}`)
: `https://unpkg.com/@ffmpeg/ffmpeg@v${version}/dist/worker.min.js`,
corePath: `https://unpkg.com/@ffmpeg/core@v${dependencies['@ffmpeg/core'].substring(1)}/ffmpeg-core.js`,
};

View File

@@ -0,0 +1,24 @@
/**
*
* Tesseract Worker adapter for browser
*
* @fileoverview Tesseract Worker adapter for browser
* @author Kevin Kwok <antimatter15@gmail.com>
* @author Guillermo Webster <gui@mit.edu>
* @author Jerome Wu <jeromewus@gmail.com>
*/
const defaultOptions = require('./defaultOptions');
const spawnWorker = require('./spawnWorker');
const terminateWorker = require('./terminateWorker');
const onMessage = require('./onMessage');
const send = require('./send');
const loadMedia = require('./loadMedia');
module.exports = {
defaultOptions,
spawnWorker,
terminateWorker,
onMessage,
send,
loadMedia,
};

View File

@@ -0,0 +1,46 @@
const resolveURL = require('resolve-url');
/**
* readFromBlobOrFile
*
* @name readFromBlobOrFile
* @function
* @access private
*/
const readFromBlobOrFile = (blob) => (
new Promise((resolve, reject) => {
const fileReader = new FileReader();
fileReader.onload = () => {
resolve(fileReader.result);
};
fileReader.onerror = ({ target: { error: { code } } }) => {
reject(Error(`File could not be read! Code=${code}`));
};
fileReader.readAsArrayBuffer(blob);
})
);
const loadMedia = async (image) => {
let data = image;
if (typeof image === 'undefined') {
return 'undefined';
}
if (typeof image === 'string') {
// Base64 Media
if (/data:image\/([a-zA-Z]*);base64,([^"]*)/.test(image)) {
data = atob(image.split(',')[1])
.split('')
.map((c) => c.charCodeAt(0));
} else {
const res = await fetch(resolveURL(image));
data = res.arrayBuffer();
}
} else if (image instanceof File || image instanceof Blob) {
data = await readFromBlobOrFile(image);
}
return new Uint8Array(data);
};
module.exports = loadMedia;

View File

@@ -0,0 +1,5 @@
module.exports = (worker, handler) => {
worker.onmessage = ({ data }) => { // eslint-disable-line
handler(data);
};
};

View File

@@ -0,0 +1,10 @@
/**
* send
*
* @name send
* @function send packet to worker and create a job
* @access public
*/
module.exports = async (worker, packet) => {
worker.postMessage(packet);
};

View File

@@ -0,0 +1,10 @@
/**
* spawnWorker
*
* @name spawnWorker
* @function create a new Worker in browser
* @access public
*/
module.exports = ({ workerPath }) => (
new Worker(workerPath)
);

View File

@@ -0,0 +1,10 @@
/**
* terminateWorker
*
* @name terminateWorker
* @function terminate worker
* @access public
*/
module.exports = (worker) => {
worker.terminate();
};

View File

@@ -0,0 +1,10 @@
const path = require('path');
const defaultOptions = require('../../constants/defaultOptions');
/*
* Default options for node worker
*/
module.exports = {
...defaultOptions,
workerPath: path.join(__dirname, '..', '..', 'worker-script', 'node', 'index.js'),
};

24
src/worker/node/index.js Normal file
View File

@@ -0,0 +1,24 @@
/**
*
* Tesseract Worker impl. for node (using child_process)
*
* @fileoverview Tesseract Worker impl. for node
* @author Kevin Kwok <antimatter15@gmail.com>
* @author Guillermo Webster <gui@mit.edu>
* @author Jerome Wu <jeromewus@gmail.com>
*/
const defaultOptions = require('./defaultOptions');
const spawnWorker = require('./spawnWorker');
const terminateWorker = require('./terminateWorker');
const onMessage = require('./onMessage');
const send = require('./send');
const loadMedia = require('./loadMedia');
module.exports = {
defaultOptions,
spawnWorker,
terminateWorker,
onMessage,
send,
loadMedia,
};

View File

@@ -0,0 +1,28 @@
const util = require('util');
const fs = require('fs');
const fetch = require('node-fetch');
const isURL = require('is-url');
const readFile = util.promisify(fs.readFile);
module.exports = async (media) => {
let data = media;
if (typeof media === 'undefined') {
return media;
}
if (typeof media === 'string') {
if (isURL(media) || media.startsWith('chrome-extension://') || media.startsWith('file://')) {
const res = await fetch(media);
data = res.arrayBuffer();
} else if (/data:media\/([a-zA-Z]*);base64,([^"]*)/.test(media)) {
data = Buffer.from(media.split(',')[1], 'base64');
} else {
data = await readFile(media);
}
} else if (Buffer.isBuffer(media)) {
data = media;
}
return new Uint8Array(data);
};

View File

@@ -0,0 +1,3 @@
module.exports = (worker, handler) => {
worker.on('message', handler);
};

10
src/worker/node/send.js Normal file
View File

@@ -0,0 +1,10 @@
/**
* send
*
* @name send
* @function send packet to worker and create a job
* @access public
*/
module.exports = (worker, packet) => {
worker.send(packet);
};

View File

@@ -0,0 +1,12 @@
const { fork } = require('child_process');
/**
* spawnWorker
*
* @name spawnWorker
* @function fork a new process in node
* @access public
*/
module.exports = ({ workerPath }) => (
fork(workerPath)
);

View File

@@ -0,0 +1,10 @@
/**
* terminateWorker
*
* @name terminateWorker
* @function kill worker
* @access public
*/
module.exports = (worker) => {
worker.kill();
};