diff --git a/package.json b/package.json index 064686d..791220f 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "konoebml", - "version": "0.1.0-rc.6", + "version": "0.1.0-rc.7", "description": "A modern JavaScript implementation of EBML RFC8794", "main": "./dist/index.cjs", "module": "./dist/index.js", diff --git a/src/decoder.ts b/src/decoder.ts index 851e1cf..d2caad1 100644 --- a/src/decoder.ts +++ b/src/decoder.ts @@ -14,9 +14,25 @@ export type EbmlStreamDecoderChunkType = | ArrayBuffer | ArrayBufferLike; +export interface EbmlDecodeStreamTransformerBackpressure { + /** + * @default true + */ + enabled?: boolean; + /** + * @default () => Promise.resolve() + */ + eventLoop?: () => Promise; + /** + * @default 'byte-length' + */ + queuingStrategy?: 'byte-length' | 'count'; +} + export interface EbmlDecodeStreamTransformerOptions { collectChild?: DecodeContentCollectChildPredicate; streamStartOffset?: number; + backpressure?: EbmlDecodeStreamTransformerBackpressure; } export class EbmlDecodeStreamTransformer< @@ -30,11 +46,20 @@ export class EbmlDecodeStreamTransformer< > = new Queue(); private _tickIdleCallback: VoidFunction | undefined; private _currentTask: Promise | undefined; - private _writeBuffer = new Queue(); + private _initWatermark = 0; + public backpressure: Required; public readonly options: EbmlDecodeStreamTransformerOptions; constructor(options: EbmlDecodeStreamTransformerOptions = {}) { this.options = options; + this.backpressure = Object.assign( + { + enabled: true, + eventLoop: () => Promise.resolve(), + queuingStrategy: 'byte-length', + }, + options.backpressure ?? {} + ); } public getBuffer(): Uint8Array { @@ -148,19 +173,22 @@ export class EbmlDecodeStreamTransformer< } } - private tryEnqueueToBuffer(item: EbmlTagTrait) { - this._writeBuffer.enqueue(item); - } - - private waitBufferRelease( + private async tryEnqueueToController( ctrl: TransformStreamDefaultController, - isFlush: boolean + item: EbmlTagTrait ) { - while (this._writeBuffer.size) { - if (ctrl.desiredSize! <= 0 && !isFlush) { - break; + if (this.backpressure.enabled) { + const eventLoop = this.backpressure.eventLoop; + while (true) { + if (ctrl.desiredSize! < this._initWatermark) { + await eventLoop(); + } else { + ctrl.enqueue(item as unknown as E); + break; + } } - ctrl.enqueue(this._writeBuffer.dequeue() as unknown as E); + } else { + ctrl.enqueue(item as unknown as E); } } @@ -188,7 +216,7 @@ export class EbmlDecodeStreamTransformer< collectChild: this.options.collectChild, dataViewController: this, })) { - this.tryEnqueueToBuffer(tag); + await this.tryEnqueueToController(ctrl, tag); } this._currentTask = undefined; } catch (err) { @@ -201,7 +229,6 @@ export class EbmlDecodeStreamTransformer< } await Promise.race([this._currentTask, waitIdle]); - this.waitBufferRelease(ctrl, isFlush); } async start(ctrl: TransformStreamDefaultController) { @@ -210,6 +237,7 @@ export class EbmlDecodeStreamTransformer< this._requests.clear(); this._tickIdleCallback = undefined; this._currentTask = undefined; + this._initWatermark = ctrl.desiredSize ?? 0; await this.tick(ctrl, false); } @@ -249,7 +277,18 @@ export class EbmlStreamDecoder< constructor(options: EbmlStreamDecoderOptions = {}) { const transformer = new EbmlDecodeStreamTransformer(options); - super(transformer); + const queuingStrategy = transformer.backpressure.queuingStrategy; + const outputQueuingStrategySize = + queuingStrategy === 'count' + ? (a: E) => { + const s = a?.countQueuingSize; + return s >= 0 ? s : 1; + } + : (a: E) => { + const s = a?.byteLengthQueuingSize; + return s >= 0 ? s : 1; + }; + super(transformer, undefined, { size: outputQueuingStrategySize }); this.transformer = transformer; } } diff --git a/src/encoder.ts b/src/encoder.ts index de88181..a4476e9 100644 --- a/src/encoder.ts +++ b/src/encoder.ts @@ -1,40 +1,79 @@ -import { Queue, Stack } from 'mnemonist'; +import { Stack } from 'mnemonist'; import { EbmlTreeMasterNotMatchError, UnreachableOrLogicError } from './errors'; import { EbmlTagPosition } from './models/enums'; import type { EbmlTagType } from './models/tag'; import { EbmlMasterTag } from './models/tag-master'; import { EbmlTagTrait } from './models/tag-trait'; +export interface EbmlEncodeStreamTransformerBackpressure { + /** + * @default true + */ + enabled?: boolean; + /** + * @default () => Promise.resolve() + */ + eventLoop?: () => Promise; + /** + * @default 'byte-length' + */ + queuingStrategy?: 'byte-length' | 'count'; +} + +export interface EbmlEncodeStreamTransformerOptions { + backpressure?: EbmlEncodeStreamTransformerBackpressure; +} + export class EbmlEncodeStreamTransformer implements Transformer { stack = new Stack<[EbmlMasterTag, Uint8Array[]]>(); - _writeBuffer = new Queue(); - _writeBufferTask: Promise | undefined; closed = false; + private _initWatermark = 0; + public backpressure: Required; + public readonly options: EbmlEncodeStreamTransformerOptions; - tryEnqueueToBuffer(...frag: Uint8Array[]) { + constructor(options: EbmlEncodeStreamTransformerOptions = {}) { + this.options = options; + this.backpressure = Object.assign( + { + enabled: true, + eventLoop: () => Promise.resolve(), + queuingStrategy: 'byte-length', + }, + options.backpressure ?? {} + ); + } + + async tryEnqueueToController( + ctrl: TransformStreamDefaultController, + ...frag: Uint8Array[] + ) { const top = this.stack.peek(); if (top) { top[1].push(...frag); + } else if (this.backpressure.enabled) { + const eventLoop = this.backpressure.eventLoop; + let i = 0; + while (i < frag.length) { + if (ctrl.desiredSize! < this._initWatermark) { + await eventLoop(); + } else { + ctrl.enqueue(frag[i]); + i++; + } + } } else { - for (const f of frag) { - this._writeBuffer.enqueue(f); + let i = 0; + while (i < frag.length) { + ctrl.enqueue(frag[i]); + i++; } } } - waitBufferRelease( - ctrl: TransformStreamDefaultController, - isFlush: boolean - ) { - while (this._writeBuffer.size) { - if (ctrl.desiredSize! <= 0 && !isFlush) { - break; - } - const pop = this._writeBuffer.dequeue(); - ctrl.enqueue(pop); - } + start(ctrl: TransformStreamDefaultController) { + this._initWatermark = ctrl.desiredSize ?? 0; } // biome-ignore lint/complexity/noExcessiveCognitiveComplexity: @@ -49,7 +88,7 @@ export class EbmlEncodeStreamTransformer if (tag instanceof EbmlMasterTag) { if (tag.contentLength === Number.POSITIVE_INFINITY) { if (tag.position === EbmlTagPosition.Start) { - this.tryEnqueueToBuffer(...tag.encodeHeader()); + await this.tryEnqueueToController(ctrl, ...tag.encodeHeader()); } } else { // biome-ignore lint/style/useCollapsedElseIf: @@ -66,30 +105,35 @@ export class EbmlEncodeStreamTransformer 0 ); startTag.contentLength = size; - this.tryEnqueueToBuffer(...startTag.encodeHeader()); - this.tryEnqueueToBuffer(...fragments); + await this.tryEnqueueToController(ctrl, ...startTag.encodeHeader()); + await this.tryEnqueueToController(ctrl, ...fragments); } } } else { - this.tryEnqueueToBuffer(...tag.encode()); + await this.tryEnqueueToController(ctrl, ...tag.encode()); } - this.waitBufferRelease(ctrl, false); - } - - flush(ctrl: TransformStreamDefaultController) { - this.waitBufferRelease(ctrl, true); } } +export interface EbmlStreamEncoderOptions + extends EbmlEncodeStreamTransformerOptions {} + export class EbmlStreamEncoder extends TransformStream< EbmlTagTrait | EbmlTagType, Uint8Array > { public readonly transformer: EbmlEncodeStreamTransformer; - constructor() { - const transformer = new EbmlEncodeStreamTransformer(); - super(transformer); + constructor(options: EbmlStreamEncoderOptions = {}) { + const transformer = new EbmlEncodeStreamTransformer(options); + const queuingStrategy = transformer.backpressure.queuingStrategy; + const inputQueuingStrategySize = + queuingStrategy === 'count' + ? (a: EbmlTagTrait | EbmlTagType) => + a?.countQueuingSize >= 0 ? a.countQueuingSize : 1 + : (a: EbmlTagTrait | EbmlTagType) => + a?.byteLengthQueuingSize >= 0 ? a.byteLengthQueuingSize : 1; + super(transformer, { size: inputQueuingStrategySize }); this.transformer = transformer; } } diff --git a/src/models/tag-data.ts b/src/models/tag-data.ts index 27edeef..5bf614f 100644 --- a/src/models/tag-data.ts +++ b/src/models/tag-data.ts @@ -31,6 +31,10 @@ export class EbmlDataTag extends EbmlTagTrait { }); } + override get byteLengthQueuingSize(): number { + return this.totalLength; + } + // biome-ignore lint/correctness/useYield: override async *decodeContentImpl(options: DecodeContentOptions) { const controller = options.dataViewController; diff --git a/src/models/tag-master.ts b/src/models/tag-master.ts index 9a94564..162c288 100644 --- a/src/models/tag-master.ts +++ b/src/models/tag-master.ts @@ -16,14 +16,6 @@ export interface CreateEbmlMasterTagOptions export class EbmlMasterTag extends EbmlTagTrait { private _children: EbmlTagTrait[] = []; - get children(): EbmlTagTrait[] { - return this._children; - } - - set children(value: EbmlTagTrait[]) { - this._children = value; - } - constructor(options: CreateEbmlMasterTagOptions) { super({ ...options, @@ -32,6 +24,21 @@ export class EbmlMasterTag extends EbmlTagTrait { }); } + override get byteLengthQueuingSize(): number { + if (this.position === EbmlTagPosition.Start) { + return this.headerLength; + } + return 0; + } + + get children(): EbmlTagTrait[] { + return this._children; + } + + set children(value: EbmlTagTrait[]) { + this._children = value; + } + *encodeContent(): Generator { for (const child of this.children) { yield* child.encode(); diff --git a/src/models/tag-trait.ts b/src/models/tag-trait.ts index e5d8007..3b9666e 100644 --- a/src/models/tag-trait.ts +++ b/src/models/tag-trait.ts @@ -76,6 +76,11 @@ export abstract class EbmlTagTrait { this._endOffset = options.endOffset; } + public abstract get byteLengthQueuingSize(): number; + public get countQueuingSize(): number { + return 1; + } + public set contentLength(value: number) { this._contentLength = value; } diff --git a/tests/decoder.spec.ts b/tests/decoder.spec.ts index d2dbc06..13ab856 100644 --- a/tests/decoder.spec.ts +++ b/tests/decoder.spec.ts @@ -28,7 +28,7 @@ async function collectTags(decoder: Decoder): Promise { return tags; } -describe('EbmlStreamDecoder', () => { +describe('Ebml Decoder', () => { it('should wait for more data if a tag is longer than the buffer', async () => { const decoder = getDecoderWithNullSink(); const writer = decoder.writable.getWriter(); diff --git a/tests/encoder.spec.ts b/tests/encoder.spec.ts index 82b3e64..8a26968 100644 --- a/tests/encoder.spec.ts +++ b/tests/encoder.spec.ts @@ -56,7 +56,9 @@ const makeEncoderTest = async (tags: EbmlTagTrait[]) => { controller.close(); }, }); + const encoder = new EbmlStreamEncoder(); + const chunks: ArrayBuffer[] = []; await new Promise((resolve, reject) => { @@ -70,6 +72,9 @@ const makeEncoderTest = async (tags: EbmlTagTrait[]) => { close() { resolve(); }, + abort: (e) => { + reject(e); + }, }) ) .catch(reject); @@ -106,16 +111,15 @@ describe('EBML Encoder', () => { ]); }); - describe('#writeTag', () => { - it('throws with an incomplete tag data', async () => { - await expect(() => makeEncoderTest([incompleteTag])).rejects.toThrow( - /should only accept embl tag but not/ - ); - }); - it('throws with an invalid tag id', async () => { - await expect(() => makeEncoderTest([invalidTag])).rejects.toThrow( - /should only accept embl tag but not/ - ); - }); + it('throws with an incomplete tag data', async () => { + await expect(() => makeEncoderTest([incompleteTag])).rejects.toThrow( + /should only accept embl tag but not/ + ); + }); + + it('throws with an invalid tag id', async () => { + await expect(() => makeEncoderTest([invalidTag])).rejects.toThrow( + /should only accept embl tag but not/ + ); }); });