Use IDBFS and NODEFS to process big files

This commit is contained in:
Jerome Wu
2019-12-03 22:06:44 +08:00
parent 72a2ff6e84
commit 3ab760b5bb
20 changed files with 215 additions and 162 deletions

View File

@@ -8,8 +8,9 @@ const {
spawnWorker,
terminateWorker,
onMessage,
loadMedia,
send,
fetchFile,
fs,
} = require('./worker/node');
let workerCounter = 0;
@@ -58,57 +59,72 @@ module.exports = (_options = {}) => {
}))
);
const write = async (path, data, jobId) => (
const syncfs = (populate, jobId) => (
startJob(createJob({
id: jobId,
action: 'write',
payload: {
path,
data: await loadMedia(data),
},
id: jobId, action: 'syncfs', payload: { populate },
}))
);
const writeText = async (path, text, jobId) => (
/**
 * Store media at `path` through the platform `fs` adapter.
 *
 * The input is first resolved with `fetchFile`, then written with
 * `fs.writeFile`. A `syncfs` job is issued before the write (no populate
 * flag) and after it (populate = true).
 *
 * @param {string} path - File name handed to the adapter.
 * @param {*} data - Anything `fetchFile` accepts.
 * @returns {Promise<{path: string}>} The `/data/`-prefixed location of the file.
 */
const write = async (path, data) => {
  // First sync: called without arguments, so `populate` is left undefined.
  await syncfs();
  const contents = await fetchFile(data);
  await fs.writeFile(path, contents);
  // Second sync: populate = true, issued once the adapter write has finished.
  await syncfs(true);
  const location = `/data/${path}`;
  return { path: location };
};
/**
 * Store a text payload at `path` through the platform `fs` adapter.
 *
 * A `syncfs(true)` job is issued both before and after the adapter write.
 * NOTE(review): the sibling `write` calls `syncfs()` (no populate flag)
 * before writing, whereas this function passes `true` — confirm which of
 * the two is intended.
 *
 * @param {string} path - File name handed to the adapter.
 * @param {string} text - Text to store.
 * @returns {Promise<{path: string}>} The `/data/`-prefixed location of the file.
 */
const writeText = async (path, text) => {
  const populate = true;
  await syncfs(populate);
  await fs.writeFile(path, text);
  await syncfs(populate);
  const location = `/data/${path}`;
  return { path: location };
};
/**
 * Read a file through the platform `fs` adapter, optionally deleting it
 * afterwards.
 *
 * @param {string} path - File name handed to the adapter.
 * @param {boolean} [del=true] - When truthy, the file is deleted after it
 *   has been read.
 * @returns {Promise<{data: *}>} Whatever `fs.readFile` resolved with,
 *   wrapped in a `data` property.
 */
const read = async (path, del = true) => {
  const contents = await fs.readFile(path);
  if (!del) {
    return { data: contents };
  }
  // Deletion happens only after the read has completed.
  await fs.deleteFile(path);
  return { data: contents };
};
/**
 * Delete a file through the platform `fs` adapter.
 *
 * @param {string} path - File name handed to the adapter.
 * @returns {Promise<{path: string}>} The `/data/`-prefixed location that
 *   was removed.
 */
const remove = async (path) => {
  await fs.deleteFile(path);
  const location = `/data/${path}`;
  return { path: location };
};
const run = (args, opts = {}, jobId) => (
startJob(createJob({
id: jobId,
action: 'writeText',
payload: {
path,
text,
},
id: jobId, action: 'run', payload: { args, options: opts },
}))
);
const run = (args, jobId) => (
/**
 * Convert `inputPath` into `outputPath` by delegating to `run`.
 *
 * The command line is `<opts> -i /data/<inputPath> <outputPath>`; only the
 * input path receives the `/data/` prefix. The raw paths and the `del` flag
 * are forwarded to `run` as its options object.
 *
 * @param {string} inputPath - Input file name (without the `/data/` prefix).
 * @param {string} outputPath - Output file name.
 * @param {string} [opts=''] - Extra arguments placed before `-i`.
 * @param {boolean} [del=true] - Forwarded to `run` as `options.del`.
 * @param {string} [jobId] - Optional job identifier forwarded to `run`.
 * @returns {*} Whatever `run` returns.
 */
const transcode = (inputPath, outputPath, opts = '', del = true, jobId) => {
  const command = `${opts} -i /data/${inputPath} ${outputPath}`;
  const options = { inputPath, outputPath, del };
  return run(command, options, jobId);
};
/**
 * Cut a section out of `inputPath` into `outputPath` by delegating to `run`.
 *
 * The command line is
 * `<opts> -ss <from> -i /data/<inputPath> -t <to> -c copy <outputPath>`.
 * NOTE(review): `to` is passed to ffmpeg's `-t` option, which ffmpeg treats
 * as a duration rather than an end position — confirm the intended meaning.
 *
 * @param {string} inputPath - Input file name (without the `/data/` prefix).
 * @param {string} outputPath - Output file name.
 * @param {string|number} from - Value given to `-ss`.
 * @param {string|number} to - Value given to `-t`.
 * @param {string} [opts=''] - Extra arguments placed before `-ss`.
 * @param {boolean} [del=true] - Forwarded to `run` as `options.del`.
 * @param {string} [jobId] - Optional job identifier forwarded to `run`.
 * @returns {*} Whatever `run` returns.
 */
const trim = (inputPath, outputPath, from, to, opts = '', del = true, jobId) => {
  const command = `${opts} -ss ${from} -i /data/${inputPath} -t ${to} -c copy ${outputPath}`;
  const options = { inputPath, outputPath, del };
  return run(command, options, jobId);
};
const ls = (path, jobId) => (
startJob(createJob({
id: jobId, action: 'run', payload: { args },
}))
);
const transcode = (inputPath, outputPath, opts = '', jobId) => (
run(`${opts} -i ${inputPath} ${outputPath}`, jobId)
);
const trim = (inputPath, outputPath, from, to, opts = '', jobId) => (
run(`${opts} -ss ${from} -i ${inputPath} -t ${to} -c copy ${outputPath}`, jobId)
);
const read = (path, jobId) => (
startJob(createJob({
id: jobId, action: 'read', payload: { path },
}))
);
const remove = (path, jobId) => (
startJob(createJob({
id: jobId, action: 'remove', payload: { path },
}))
);
const mkdir = (path, jobId) => (
startJob(createJob({
id: jobId, action: 'mkdir', payload: { path },
id: jobId, action: 'ls', payload: { path },
}))
);
@@ -151,14 +167,15 @@ module.exports = (_options = {}) => {
setResolve,
setReject,
load,
syncfs,
write,
writeText,
transcode,
trim,
read,
remove,
mkdir,
run,
transcode,
trim,
ls,
terminate,
};
};

View File

@@ -6,17 +6,19 @@ const ts2sec = (ts) => {
};
module.exports = ({ message }, progress) => {
if (message.startsWith(' Duration')) {
const ts = message.split(', ')[0].split(': ')[1];
const d = ts2sec(ts);
if (duration === 0 || duration > d) {
duration = d;
if (typeof message === 'string') {
if (message.startsWith(' Duration')) {
const ts = message.split(', ')[0].split(': ')[1];
const d = ts2sec(ts);
if (duration === 0 || duration > d) {
duration = d;
}
} else if (message.startsWith('frame')) {
const ts = message.split('time=')[1].split(' ')[0];
const t = ts2sec(ts);
progress({ ratio: t / duration });
} else if (message.startsWith('video:')) {
progress({ ratio: 1 });
}
} else if (message.startsWith('frame')) {
const ts = message.split('time=')[1].split(' ')[0];
const t = ts2sec(ts);
progress({ ratio: t / duration });
} else if (message.startsWith('video:')) {
progress({ ratio: 1 });
}
};

View File

@@ -1,5 +1,6 @@
const worker = require('../');
const getCore = require('./getCore');
const fs = require('../../worker/browser/fs');
global.addEventListener('message', ({ data }) => {
worker.dispatchHandlers(data, (obj) => postMessage(obj));
@@ -7,4 +8,5 @@ global.addEventListener('message', ({ data }) => {
worker.setAdapter({
getCore,
fs,
});

View File

@@ -30,7 +30,7 @@ const load = ({ workerId, payload: { options: { corePath } } }, res) => {
if (Module == null) {
const Core = adapter.getCore(corePath);
Core()
.then((_Module) => {
.then(async (_Module) => {
Module = _Module;
Module.setLogger((message, type) => {
res.progress({
@@ -45,60 +45,37 @@ const load = ({ workerId, payload: { options: { corePath } } }, res) => {
}
};
const write = ({
const syncfs = async ({
payload: {
path,
data,
populate = false,
},
}, res) => {
const d = Uint8Array.from({ ...data, length: Object.keys(data).length });
Module.FS.writeFile(path, d);
res.resolve({ message: `Write ${path} (${d.length} bytes)` });
await Module.syncfs(populate);
res.resolve({ message: `Sync file system with populate=${populate}` });
};
const writeText = ({
payload: {
path,
text,
},
}, res) => {
Module.FS.writeFile(path, text);
res.resolve({ message: `Write ${path} (${text.length} bytes)` });
};
const read = ({
const ls = ({
payload: {
path,
},
}, res) => {
res.resolve(Module.FS.readFile(path));
const dirs = Module.FS.readdir(path);
res.resolve({ message: `List path ${path}`, dirs });
};
const remove = ({
payload: {
path,
},
}, res) => {
Module.FS.unlink(path);
res.resolve({ message: `Delete ${path}` });
};
const mkdir = ({
payload: {
path,
},
}, res) => {
Module.FS.mkdir(path);
res.resolve({ message: `Create Directory ${path}` });
};
const run = ({
const run = async ({
payload: {
args: _args,
options: { inputPath, outputPath, del },
},
}, res) => {
const args = [...defaultArgs, ..._args.trim().split(' ')];
ffmpeg(args.length, strList2ptr(args));
await adapter.fs.writeFile(outputPath, Module.FS.readFile(outputPath));
Module.FS.unlink(outputPath);
if (del && typeof inputPath === 'string') {
await adapter.fs.deleteFile(inputPath);
}
res.resolve({ message: `Complete ${args.join(' ')}` });
};
@@ -118,11 +95,8 @@ exports.dispatchHandlers = (packet, send) => {
try {
({
load,
write,
writeText,
read,
remove,
mkdir,
ls,
syncfs,
run,
})[packet.action](packet, res);
} catch (err) {

View File

@@ -1,5 +1,6 @@
const worker = require('../');
const getCore = require('./getCore');
const fs = require('../../worker/node/fs');
process.on('message', (packet) => {
worker.dispatchHandlers(packet, (obj) => process.send(obj));
@@ -7,4 +8,5 @@ process.on('message', (packet) => {
worker.setAdapter({
getCore,
fs,
});

View File

@@ -20,27 +20,25 @@ const readFromBlobOrFile = (blob) => (
})
);
const loadMedia = async (image) => {
let data = image;
if (typeof image === 'undefined') {
module.exports = async (_data) => {
let data = _data;
if (typeof _data === 'undefined') {
return 'undefined';
}
if (typeof image === 'string') {
// Base64 Media
if (/data:image\/([a-zA-Z]*);base64,([^"]*)/.test(image)) {
data = atob(image.split(',')[1])
if (typeof _data === 'string') {
// Base64 _data
if (/data:_data\/([a-zA-Z]*);base64,([^"]*)/.test(_data)) {
data = atob(_data.split(',')[1])
.split('')
.map((c) => c.charCodeAt(0));
} else {
const res = await fetch(resolveURL(image));
const res = await fetch(resolveURL(_data));
data = await res.arrayBuffer();
}
} else if (image instanceof File || image instanceof Blob) {
data = await readFromBlobOrFile(image);
} else if (_data instanceof File || _data instanceof Blob) {
data = await readFromBlobOrFile(_data);
}
return new Uint8Array(data);
};
module.exports = loadMedia;

34
src/worker/browser/fs.js Normal file
View File

@@ -0,0 +1,34 @@
const { openDB } = require('idb');
// Open the IndexedDB database named '/data' at version 21 and return the
// promise produced by idb's `openDB`.
// NOTE(review): name and version presumably match the database that
// Emscripten's IDBFS creates for a `/data` mount — confirm.
const getDB = () => {
  const name = '/data';
  const version = 21;
  return openDB(name, version);
};
/**
 * Inspect the `/data/.DUMMY` record of the `FILE_DATA` store to learn how
 * file records are shaped.
 *
 * The record's `mode` is returned as-is. The data key is the last own key
 * of the record that is neither `mode` nor `timestamp`.
 * NOTE(review): presumably `.DUMMY` is a placeholder file written by the
 * worker so that its record can be used as a template — confirm.
 *
 * @param {object} db - Database handle exposing `get(storeName, key)`.
 * @returns {Promise<{dataKey: (string|undefined), mode: *}>}
 * @throws {Error} When the `/data/.DUMMY` record does not exist.
 */
const getDataKeyAndMode = async (db) => {
  const dummy = await db.get('FILE_DATA', '/data/.DUMMY');
  if (dummy == null) {
    // Without this guard Object.keys() would throw an opaque TypeError.
    throw new Error('Template record "/data/.DUMMY" was not found in the FILE_DATA store');
  }
  const metadataKeys = ['mode', 'timestamp'];
  const dataKey = Object.keys(dummy).filter((key) => !metadataKeys.includes(key)).pop();
  return { dataKey, mode: dummy.mode };
};
module.exports = {
readFile: async (path) => {
const db = await getDB();
const { dataKey } = await getDataKeyAndMode(db);
return (await db.get('FILE_DATA', `/data/${path}`))[dataKey];
},
writeFile: async (path, data) => {
const db = await getDB();
const { dataKey, mode } = await getDataKeyAndMode(db);
await db.put(
'FILE_DATA',
{
[dataKey]: data,
mode,
timestamp: new Date(),
},
`/data/${path}`,
);
},
deleteFile: async (path) => {
const db = await getDB();
await db.delete('FILE_DATA', `/data/${path}`);
},
};

View File

@@ -12,7 +12,8 @@ const spawnWorker = require('./spawnWorker');
const terminateWorker = require('./terminateWorker');
const onMessage = require('./onMessage');
const send = require('./send');
const loadMedia = require('./loadMedia');
const fetchFile = require('./fetchFile');
const fs = require('./fs');
module.exports = {
defaultOptions,
@@ -20,5 +21,6 @@ module.exports = {
terminateWorker,
onMessage,
send,
loadMedia,
fetchFile,
fs,
};

View File

@@ -0,0 +1,26 @@
const util = require('util');
const fs = require('fs');
const fetch = require('node-fetch');
const isURL = require('is-url');
module.exports = async (_data) => {
let data = _data;
if (typeof _data === 'undefined') {
return _data;
}
if (typeof _data === 'string') {
if (isURL(_data) || _data.startsWith('chrome-extension://') || _data.startsWith('file://')) {
const res = await fetch(_data);
data = await res.arrayBuffer();
} else if (/data:_data\/([a-zA-Z]*);base64,([^"]*)/.test(_data)) {
data = Buffer.from(_data.split(',')[1], 'base64');
} else {
data = await util.promisify(fs.readFile)(_data);
}
} else if (Buffer.isBuffer(_data)) {
data = _data;
}
return data;
};

16
src/worker/node/fs.js Normal file
View File

@@ -0,0 +1,16 @@
const util = require('util');
const fs = require('fs');
const readFile = util.promisify(fs.readFile);
const writeFile = util.promisify(fs.writeFile);
const deleteFile = util.promisify(fs.unlink);
module.exports = (path) => (
readFile(`./data/${path}`)
);
module.exports = {
readFile: (path) => readFile(`./data/${path}`),
writeFile: (path, data) => writeFile(`./data/${path}`, data),
deleteFile: (path) => deleteFile(`./data/${path}`),
};

View File

@@ -1,18 +1,10 @@
/**
*
* Tesseract Worker impl. for node (using child_process)
*
* @fileoverview Tesseract Worker impl. for node
* @author Kevin Kwok <antimatter15@gmail.com>
* @author Guillermo Webster <gui@mit.edu>
* @author Jerome Wu <jeromewus@gmail.com>
*/
const defaultOptions = require('./defaultOptions');
const spawnWorker = require('./spawnWorker');
const terminateWorker = require('./terminateWorker');
const onMessage = require('./onMessage');
const send = require('./send');
const loadMedia = require('./loadMedia');
const fetchFile = require('./fetchFile');
const fs = require('./fs');
module.exports = {
defaultOptions,
@@ -20,5 +12,6 @@ module.exports = {
terminateWorker,
onMessage,
send,
loadMedia,
fetchFile,
fs,
};

View File

@@ -1,28 +0,0 @@
const util = require('util');
const fs = require('fs');
const fetch = require('node-fetch');
const isURL = require('is-url');
const readFile = util.promisify(fs.readFile);
module.exports = async (media) => {
let data = media;
if (typeof media === 'undefined') {
return media;
}
if (typeof media === 'string') {
if (isURL(media) || media.startsWith('chrome-extension://') || media.startsWith('file://')) {
const res = await fetch(media);
data = await res.arrayBuffer();
} else if (/data:media\/([a-zA-Z]*);base64,([^"]*)/.test(media)) {
data = Buffer.from(media.split(',')[1], 'base64');
} else {
data = await readFile(media);
}
} else if (Buffer.isBuffer(media)) {
data = media;
}
return new Uint8Array(data);
};