konoebml/src/decoder.ts

295 lines
7.8 KiB
TypeScript

import { Queue } from 'mnemonist';
import type { FileDataViewController } from './adapters';
import { decodeEbmlContent } from './decode-utils';
import { StreamFlushReason, UnreachableOrLogicError } from './errors';
import type { EbmlTagType } from './models/tag-union';
import type {
DecodeContentCollectChildPredicate,
EbmlTagTrait,
} from './models/tag-trait';
import { dataViewSlice } from './tools';
/**
 * Binary chunk shapes accepted on the writable side of the decode stream.
 *
 * Values that are not already a `Uint8Array` are wrapped in one by the
 * transformer before being appended to its internal buffer.
 */
export type EbmlStreamDecoderChunkType =
  | Uint8Array
  | ArrayBuffer
  | ArrayBufferLike;
/**
 * Tuning knobs for how decoded tags are pushed into the readable side.
 */
export interface EbmlDecodeStreamTransformerBackpressure {
  /**
   * When enabled, the transformer holds a decoded tag back while the
   * controller's `desiredSize` is below the value it had at `start()`,
   * awaiting `eventLoop()` between checks. When disabled, tags are enqueued
   * immediately.
   *
   * @default true
   */
  enabled?: boolean;
  /**
   * Awaited between two consecutive `desiredSize` checks while the
   * transformer is waiting for the consumer to drain the queue.
   *
   * @default () => Promise.resolve()
   */
  eventLoop?: () => Promise<void>;
  /**
   * Selects which per-tag size the stream decoder reports to the readable
   * queue: `'count'` reads the tag's `countQueuingSize`, `'byte-length'`
   * reads its `byteLengthQueuingSize`.
   *
   * @default 'byte-length'
   */
  queuingStrategy?: 'byte-length' | 'count';
}
/**
 * Construction options for the decode transformer.
 */
export interface EbmlDecodeStreamTransformerOptions {
  /** Forwarded unchanged to `decodeEbmlContent` as its `collectChild` option. */
  collectChild?: DecodeContentCollectChildPredicate;
  /**
   * Offset the transformer assigns to the first byte it receives; used as the
   * initial read position when the stream starts. Treated as `0` when omitted.
   */
  streamStartOffset?: number;
  /** Backpressure settings; omitted fields fall back to their defaults. */
  backpressure?: EbmlDecodeStreamTransformerBackpressure;
}
/**
 * Stream transformer that incrementally decodes EBML bytes into tags.
 *
 * Incoming chunks are appended to an internal buffer. A single long-running
 * decode task (`decodeEbmlContent`) pulls data out of that buffer through the
 * `FileDataViewController` methods implemented here (`read`, `peek`, `seek`).
 * Reads that cannot be satisfied yet are parked in a queue and resolved once
 * enough bytes have arrived, or rejected with `StreamFlushReason` when the
 * stream is flushed.
 */
export class EbmlDecodeStreamTransformer<
  E extends EbmlTagType | EbmlTagTrait = EbmlTagType,
> implements Transformer<EbmlStreamDecoderChunkType, E>, FileDataViewController
{
  /** Absolute stream offset of the first byte held in `_buffer`. */
  private _offset = 0;
  /** Bytes received but not yet skipped over by `seek`. */
  private _buffer: Uint8Array = new Uint8Array(0);
  /** Parked reads: `[start, end, resolve, reject]` in absolute offsets. */
  private _requests: Queue<
    [number, number, (view: DataView) => void, (err: Error) => void]
  > = new Queue();
  /** Resolves the current tick's "decoder is idle" promise, if one is armed. */
  private _tickIdleCallback: VoidFunction | undefined;
  /** The running decode task, or `undefined` when none is active. */
  private _currentTask: Promise<void> | undefined;
  /** `desiredSize` observed at `start()`; threshold used for backpressure. */
  private _initWatermark = 0;
  /**
   * Bytes that still have to be discarded from upcoming chunks because
   * `seek` moved past the end of the buffered data.
   */
  private _pendingSkip = 0;
  /** Set once `flush` has run; no further input will ever arrive. */
  private _flushed = false;

  public backpressure: Required<EbmlDecodeStreamTransformerBackpressure>;
  public readonly options: EbmlDecodeStreamTransformerOptions;

  /**
   * @param options decoder options; missing (or explicitly `undefined`)
   *   backpressure fields fall back to their documented defaults.
   */
  constructor(options: EbmlDecodeStreamTransformerOptions = {}) {
    this.options = options;
    const overrides = options.backpressure ?? {};
    // Resolve each field with `??` so that an explicit `undefined` in the
    // caller's object cannot wipe out a default.
    this.backpressure = {
      enabled: overrides.enabled ?? true,
      eventLoop: overrides.eventLoop ?? (() => Promise.resolve()),
      queuingStrategy: overrides.queuingStrategy ?? 'byte-length',
    };
  }

  /** Returns the bytes currently buffered (starting at `getOffset()`). */
  public getBuffer(): Uint8Array {
    return this._buffer;
  }

  /** Returns the absolute stream offset of the first buffered byte. */
  public getOffset(): number {
    return this._offset;
  }

  /**
   * Advances the read position to `nextOffset`, discarding every byte before
   * it.
   *
   * @param nextOffset absolute offset to move to; must not be smaller than
   *   the current offset.
   * @returns `nextOffset`.
   * @throws UnreachableOrLogicError when reads are still parked or when
   *   asked to move backwards.
   */
  public seek(nextOffset: number): number {
    const oldOffset = this._offset;
    if (this._requests.size > 0) {
      throw new UnreachableOrLogicError(
        'sequential transformer should not seek before all read requests done'
      );
    }
    if (nextOffset < oldOffset) {
      throw new UnreachableOrLogicError(
        'sequential transformer should not seek to previous offset'
      );
    }
    const advance = nextOffset - oldOffset;
    const buffered = this._buffer.byteLength;
    if (advance > buffered) {
      // The target lies beyond what has arrived so far. Remember how many
      // not-yet-received bytes must be dropped so that `_offset` keeps
      // matching the true stream position of the next buffered byte.
      this._pendingSkip += advance - buffered;
    }
    this._buffer = this._buffer.slice(advance);
    this._offset = nextOffset;
    return nextOffset;
  }

  /**
   * Obtains a view over the stream starting at `reqStart`.
   *
   * Resolves immediately when `reqLength` bytes are already buffered;
   * otherwise waits until later chunks provide them.
   *
   * @param reqStart absolute offset of the first requested byte.
   * @param reqLength minimum number of bytes that must be available.
   * @param exactLength when true the returned view is trimmed to exactly
   *   `reqLength` bytes; otherwise it extends to the end of the buffer.
   * @throws UnreachableOrLogicError when `reqStart` precedes the current
   *   offset.
   * @throws StreamFlushReason when the stream is (or gets) flushed before
   *   the requested bytes are available.
   */
  public async read(
    reqStart: number,
    reqLength = 0,
    exactLength = false
  ): Promise<DataView> {
    const bufferStart = this._offset;
    const bufferEnd = bufferStart + this._buffer.byteLength;
    const reqEnd = reqStart + reqLength;
    if (reqStart < bufferStart) {
      throw new UnreachableOrLogicError(
        'sequential transformer should not read before current offset'
      );
    }
    let view: DataView;
    if (bufferEnd >= reqEnd) {
      view = this.viewFrom(reqStart);
    } else if (this._flushed) {
      // No more input will arrive: parking the request would leave the
      // caller waiting forever, so fail it the same way flush does.
      throw new StreamFlushReason();
    } else {
      view = await new Promise<DataView>((resolve, reject) => {
        this._requests.enqueue([reqStart, reqEnd, resolve, reject]);
        // Tell the current tick that the decoder is blocked on more input.
        this.notifyIdle();
      });
    }
    return exactLength ? dataViewSlice(view, 0, reqLength) : view;
  }

  /**
   * Like `read(offset, 1)`, but yields `null` instead of failing when the
   * stream is flushed before the byte becomes available.
   */
  public async peek(offset: number): Promise<DataView | null> {
    try {
      return await this.read(offset, 1);
    } catch (e) {
      if (e instanceof StreamFlushReason) {
        return null;
      }
      throw e;
    }
  }

  /** Builds a view over the buffer from absolute offset `reqStart` to its end. */
  private viewFrom(reqStart: number): DataView {
    const relative = reqStart - this._offset;
    const source = this._buffer;
    // Honour byteOffset/byteLength so the view stays correct even if the
    // buffer is ever a window into a larger ArrayBuffer.
    return new DataView(
      source.buffer,
      source.byteOffset + relative,
      source.byteLength - relative
    );
  }

  /**
   * Makes one pass over the parked reads, resolving those the buffer can now
   * satisfy and re-parking the rest.
   */
  private handleRequests() {
    const bufferStart = this._offset;
    const bufferEnd = bufferStart + this._buffer.byteLength;
    // Only visit the requests present on entry; re-parked ones go to the
    // back of the queue and must not be examined again in this pass.
    let handleCounts = this._requests.size;
    while (handleCounts > 0) {
      const req = this._requests.dequeue();
      if (!req) {
        break;
      }
      const [reqStart, reqEnd, resolve] = req;
      if (reqStart < bufferStart) {
        throw new UnreachableOrLogicError(
          'sequential transformer should not read before current offset'
        );
      }
      if (bufferEnd >= reqEnd) {
        resolve(this.viewFrom(reqStart));
      } else {
        this.notifyIdle();
        this._requests.enqueue(req);
      }
      handleCounts--;
    }
  }

  /** Rejects every parked read with `StreamFlushReason`. */
  private cancelRequests() {
    while (true) {
      const req = this._requests.dequeue();
      if (!req) {
        break;
      }
      const [_reqStart, _reqEnd, _resolve, reject] = req;
      reject(new StreamFlushReason());
    }
  }

  /** Fires the idle callback armed by the current tick, if any. */
  private notifyIdle() {
    if (this._tickIdleCallback) {
      this._tickIdleCallback();
    }
  }

  /**
   * Enqueues `item` on the readable side, first waiting for the consumer to
   * drain the queue when backpressure is enabled.
   */
  private async tryEnqueueToController(
    ctrl: TransformStreamDefaultController<E>,
    item: EbmlTagTrait
  ) {
    if (this.backpressure.enabled) {
      const eventLoop = this.backpressure.eventLoop;
      while (true) {
        const desired = ctrl.desiredSize;
        // `desiredSize` is null once the readable side has errored. Stop
        // waiting in that case (a null would otherwise compare as 0 and
        // could spin forever) and let `enqueue` report the failure.
        if (desired === null || desired >= this._initWatermark) {
          break;
        }
        await eventLoop();
      }
    }
    ctrl.enqueue(item as unknown as E);
  }

  /**
   * Runs one scheduling step: settles parked reads, starts the decode task
   * if none is running, then waits until that task either finishes or
   * reports that it is blocked waiting for more input.
   *
   * @param ctrl controller decoded tags are enqueued on.
   * @param isFlush true when called from `flush`; parked reads are rejected
   *   instead of served and no new decode task is started.
   */
  private async tick(
    ctrl: TransformStreamDefaultController<E>,
    isFlush: boolean
  ) {
    const waitIdle = new Promise<void>((resolve) => {
      this._tickIdleCallback = () => {
        resolve();
        this._tickIdleCallback = undefined;
      };
    });
    if (isFlush) {
      // Mark end-of-input before cancelling so reads issued afterwards are
      // rejected too instead of being parked with nobody left to serve them.
      this._flushed = true;
      this.cancelRequests();
    } else {
      this.handleRequests();
    }
    if (!this._currentTask && !isFlush) {
      const decode = async () => {
        try {
          for await (const tag of decodeEbmlContent({
            collectChild: this.options.collectChild,
            dataViewController: this,
          })) {
            await this.tryEnqueueToController(ctrl, tag);
          }
          this._currentTask = undefined;
        } catch (err) {
          // A flush-induced abort is the normal way for decoding to end.
          if (!(err instanceof StreamFlushReason)) {
            ctrl.error(err);
          }
        }
      };
      this._currentTask = decode();
    }
    await Promise.race([this._currentTask, waitIdle]);
  }

  /** Resets all decoding state and launches the decode task. */
  async start(ctrl: TransformStreamDefaultController<E>) {
    this._offset = this.options.streamStartOffset ?? 0;
    this._buffer = new Uint8Array(0);
    this._requests.clear();
    this._tickIdleCallback = undefined;
    this._currentTask = undefined;
    this._pendingSkip = 0;
    this._flushed = false;
    this._initWatermark = ctrl.desiredSize ?? 0;
    await this.tick(ctrl, false);
  }

  /**
   * Appends `chunk` to the buffer (after dropping any bytes a previous
   * `seek` skipped over) and lets the decoder make progress. Empty chunks
   * are ignored.
   */
  async transform(
    chunk: EbmlStreamDecoderChunkType,
    ctrl: TransformStreamDefaultController<E>
  ): Promise<void> {
    if (chunk.byteLength === 0) {
      return;
    }
    let incoming = chunk instanceof Uint8Array ? chunk : new Uint8Array(chunk);
    if (this._pendingSkip > 0) {
      const dropped = Math.min(this._pendingSkip, incoming.byteLength);
      this._pendingSkip -= dropped;
      incoming = incoming.subarray(dropped);
      if (incoming.byteLength === 0) {
        // The whole chunk was skipped; nothing new for the decoder.
        return;
      }
    }
    const newBuffer = new Uint8Array(
      this._buffer.byteLength + incoming.byteLength
    );
    newBuffer.set(this._buffer, 0);
    newBuffer.set(incoming, this._buffer.byteLength);
    this._buffer = newBuffer;
    await this.tick(ctrl, false);
  }

  /** Signals end of input: outstanding reads are rejected and decoding winds down. */
  async flush(ctrl: TransformStreamDefaultController<E>) {
    await this.tick(ctrl, true);
  }
}
/**
 * Options accepted by the stream decoder's constructor; currently the same
 * set as the underlying transformer's options.
 */
export interface EbmlStreamDecoderOptions
  extends EbmlDecodeStreamTransformerOptions {}
/**
 * `TransformStream` that turns binary chunks into decoded EBML tags by
 * wiring up an `EbmlDecodeStreamTransformer`.
 */
export class EbmlStreamDecoder<
  E extends EbmlTagType | EbmlTagTrait = EbmlTagType,
> extends TransformStream<EbmlStreamDecoderChunkType, E> {
  /** The transformer instance driving this stream. */
  public readonly transformer: EbmlDecodeStreamTransformer<E>;

  /**
   * @param options forwarded to the transformer; its resolved
   *   `backpressure.queuingStrategy` decides how queued tags are sized.
   */
  constructor(options: EbmlStreamDecoderOptions = {}) {
    const transformer = new EbmlDecodeStreamTransformer<E>(options);
    const sizeByCount = transformer.backpressure.queuingStrategy === 'count';
    // Size reported to the readable queue for each tag: the tag's own
    // count or byte-length figure, or 1 when that figure is not >= 0.
    const measure = (tag: E) => {
      const declared = sizeByCount
        ? tag?.countQueuingSize
        : tag?.byteLengthQueuingSize;
      return declared >= 0 ? declared : 1;
    };
    super(transformer, undefined, { size: measure });
    this.transformer = transformer;
  }
}