feat: add backpressure support

This commit is contained in:
master 2025-03-19 01:05:24 +08:00
parent 9a402b0921
commit 6cc8dfab7c
8 changed files with 167 additions and 64 deletions

View File

@ -1,6 +1,6 @@
{
"name": "konoebml",
"version": "0.1.0-rc.6",
"version": "0.1.0-rc.7",
"description": "A modern JavaScript implementation of EBML RFC8794",
"main": "./dist/index.cjs",
"module": "./dist/index.js",

View File

@ -14,9 +14,25 @@ export type EbmlStreamDecoderChunkType =
| ArrayBuffer
| ArrayBufferLike;
export interface EbmlDecodeStreamTransformerBackpressure {
/**
* @default true
*/
enabled?: boolean;
/**
* @default () => Promise.resolve()
*/
eventLoop?: () => Promise<void>;
/**
* @default 'byte-length'
*/
queuingStrategy?: 'byte-length' | 'count';
}
export interface EbmlDecodeStreamTransformerOptions {
collectChild?: DecodeContentCollectChildPredicate;
streamStartOffset?: number;
backpressure?: EbmlDecodeStreamTransformerBackpressure;
}
export class EbmlDecodeStreamTransformer<
@ -30,11 +46,20 @@ export class EbmlDecodeStreamTransformer<
> = new Queue();
private _tickIdleCallback: VoidFunction | undefined;
private _currentTask: Promise<void> | undefined;
private _writeBuffer = new Queue<EbmlTagTrait>();
private _initWatermark = 0;
public backpressure: Required<EbmlDecodeStreamTransformerBackpressure>;
public readonly options: EbmlDecodeStreamTransformerOptions;
constructor(options: EbmlDecodeStreamTransformerOptions = {}) {
this.options = options;
this.backpressure = Object.assign(
{
enabled: true,
eventLoop: () => Promise.resolve(),
queuingStrategy: 'byte-length',
},
options.backpressure ?? {}
);
}
public getBuffer(): Uint8Array {
@ -148,19 +173,22 @@ export class EbmlDecodeStreamTransformer<
}
}
private tryEnqueueToBuffer(item: EbmlTagTrait) {
this._writeBuffer.enqueue(item);
}
private waitBufferRelease(
private async tryEnqueueToController(
ctrl: TransformStreamDefaultController<E>,
isFlush: boolean
item: EbmlTagTrait
) {
while (this._writeBuffer.size) {
if (ctrl.desiredSize! <= 0 && !isFlush) {
if (this.backpressure.enabled) {
const eventLoop = this.backpressure.eventLoop;
while (true) {
if (ctrl.desiredSize! < this._initWatermark) {
await eventLoop();
} else {
ctrl.enqueue(item as unknown as E);
break;
}
ctrl.enqueue(this._writeBuffer.dequeue() as unknown as E);
}
} else {
ctrl.enqueue(item as unknown as E);
}
}
@ -188,7 +216,7 @@ export class EbmlDecodeStreamTransformer<
collectChild: this.options.collectChild,
dataViewController: this,
})) {
this.tryEnqueueToBuffer(tag);
await this.tryEnqueueToController(ctrl, tag);
}
this._currentTask = undefined;
} catch (err) {
@ -201,7 +229,6 @@ export class EbmlDecodeStreamTransformer<
}
await Promise.race([this._currentTask, waitIdle]);
this.waitBufferRelease(ctrl, isFlush);
}
async start(ctrl: TransformStreamDefaultController<E>) {
@ -210,6 +237,7 @@ export class EbmlDecodeStreamTransformer<
this._requests.clear();
this._tickIdleCallback = undefined;
this._currentTask = undefined;
this._initWatermark = ctrl.desiredSize ?? 0;
await this.tick(ctrl, false);
}
@ -249,7 +277,18 @@ export class EbmlStreamDecoder<
constructor(options: EbmlStreamDecoderOptions = {}) {
const transformer = new EbmlDecodeStreamTransformer<E>(options);
super(transformer);
const queuingStrategy = transformer.backpressure.queuingStrategy;
const outputQueuingStrategySize =
queuingStrategy === 'count'
? (a: E) => {
const s = a?.countQueuingSize;
return s >= 0 ? s : 1;
}
: (a: E) => {
const s = a?.byteLengthQueuingSize;
return s >= 0 ? s : 1;
};
super(transformer, undefined, { size: outputQueuingStrategySize });
this.transformer = transformer;
}
}

View File

@ -1,40 +1,79 @@
import { Queue, Stack } from 'mnemonist';
import { Stack } from 'mnemonist';
import { EbmlTreeMasterNotMatchError, UnreachableOrLogicError } from './errors';
import { EbmlTagPosition } from './models/enums';
import type { EbmlTagType } from './models/tag';
import { EbmlMasterTag } from './models/tag-master';
import { EbmlTagTrait } from './models/tag-trait';
export interface EbmlEncodeStreamTransformerBackpressure {
/**
* @default true
*/
enabled?: boolean;
/**
* @default () => Promise.resolve()
*/
eventLoop?: () => Promise<void>;
/**
* @default 'byte-length'
*/
queuingStrategy?: 'byte-length' | 'count';
}
export interface EbmlEncodeStreamTransformerOptions {
backpressure?: EbmlEncodeStreamTransformerBackpressure;
}
export class EbmlEncodeStreamTransformer
implements Transformer<EbmlTagTrait | EbmlTagType, Uint8Array>
{
stack = new Stack<[EbmlMasterTag, Uint8Array[]]>();
_writeBuffer = new Queue<Uint8Array>();
_writeBufferTask: Promise<void> | undefined;
closed = false;
private _initWatermark = 0;
public backpressure: Required<EbmlEncodeStreamTransformerBackpressure>;
public readonly options: EbmlEncodeStreamTransformerOptions;
tryEnqueueToBuffer(...frag: Uint8Array[]) {
constructor(options: EbmlEncodeStreamTransformerOptions = {}) {
this.options = options;
this.backpressure = Object.assign(
{
enabled: true,
eventLoop: () => Promise.resolve(),
queuingStrategy: 'byte-length',
},
options.backpressure ?? {}
);
}
async tryEnqueueToController(
ctrl: TransformStreamDefaultController<Uint8Array>,
...frag: Uint8Array[]
) {
const top = this.stack.peek();
if (top) {
top[1].push(...frag);
} else if (this.backpressure.enabled) {
const eventLoop = this.backpressure.eventLoop;
let i = 0;
while (i < frag.length) {
if (ctrl.desiredSize! < this._initWatermark) {
await eventLoop();
} else {
for (const f of frag) {
this._writeBuffer.enqueue(f);
ctrl.enqueue(frag[i]);
i++;
}
}
} else {
let i = 0;
while (i < frag.length) {
ctrl.enqueue(frag[i]);
i++;
}
}
}
waitBufferRelease(
ctrl: TransformStreamDefaultController<Uint8Array>,
isFlush: boolean
) {
while (this._writeBuffer.size) {
if (ctrl.desiredSize! <= 0 && !isFlush) {
break;
}
const pop = this._writeBuffer.dequeue();
ctrl.enqueue(pop);
}
start(ctrl: TransformStreamDefaultController<Uint8Array>) {
this._initWatermark = ctrl.desiredSize ?? 0;
}
// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: <explanation>
@ -49,7 +88,7 @@ export class EbmlEncodeStreamTransformer
if (tag instanceof EbmlMasterTag) {
if (tag.contentLength === Number.POSITIVE_INFINITY) {
if (tag.position === EbmlTagPosition.Start) {
this.tryEnqueueToBuffer(...tag.encodeHeader());
await this.tryEnqueueToController(ctrl, ...tag.encodeHeader());
}
} else {
// biome-ignore lint/style/useCollapsedElseIf: <explanation>
@ -66,30 +105,35 @@ export class EbmlEncodeStreamTransformer
0
);
startTag.contentLength = size;
this.tryEnqueueToBuffer(...startTag.encodeHeader());
this.tryEnqueueToBuffer(...fragments);
await this.tryEnqueueToController(ctrl, ...startTag.encodeHeader());
await this.tryEnqueueToController(ctrl, ...fragments);
}
}
} else {
this.tryEnqueueToBuffer(...tag.encode());
await this.tryEnqueueToController(ctrl, ...tag.encode());
}
this.waitBufferRelease(ctrl, false);
}
flush(ctrl: TransformStreamDefaultController<Uint8Array>) {
this.waitBufferRelease(ctrl, true);
}
}
export interface EbmlStreamEncoderOptions
extends EbmlEncodeStreamTransformerOptions {}
export class EbmlStreamEncoder extends TransformStream<
EbmlTagTrait | EbmlTagType,
Uint8Array
> {
public readonly transformer: EbmlEncodeStreamTransformer;
constructor() {
const transformer = new EbmlEncodeStreamTransformer();
super(transformer);
constructor(options: EbmlStreamEncoderOptions = {}) {
const transformer = new EbmlEncodeStreamTransformer(options);
const queuingStrategy = transformer.backpressure.queuingStrategy;
const inputQueuingStrategySize =
queuingStrategy === 'count'
? (a: EbmlTagTrait | EbmlTagType) =>
a?.countQueuingSize >= 0 ? a.countQueuingSize : 1
: (a: EbmlTagTrait | EbmlTagType) =>
a?.byteLengthQueuingSize >= 0 ? a.byteLengthQueuingSize : 1;
super(transformer, { size: inputQueuingStrategySize });
this.transformer = transformer;
}
}

View File

@ -31,6 +31,10 @@ export class EbmlDataTag extends EbmlTagTrait {
});
}
override get byteLengthQueuingSize(): number {
return this.totalLength;
}
// biome-ignore lint/correctness/useYield: <explanation>
override async *decodeContentImpl(options: DecodeContentOptions) {
const controller = options.dataViewController;

View File

@ -16,14 +16,6 @@ export interface CreateEbmlMasterTagOptions
export class EbmlMasterTag extends EbmlTagTrait {
private _children: EbmlTagTrait[] = [];
get children(): EbmlTagTrait[] {
return this._children;
}
set children(value: EbmlTagTrait[]) {
this._children = value;
}
constructor(options: CreateEbmlMasterTagOptions) {
super({
...options,
@ -32,6 +24,21 @@ export class EbmlMasterTag extends EbmlTagTrait {
});
}
override get byteLengthQueuingSize(): number {
if (this.position === EbmlTagPosition.Start) {
return this.headerLength;
}
return 0;
}
get children(): EbmlTagTrait[] {
return this._children;
}
set children(value: EbmlTagTrait[]) {
this._children = value;
}
*encodeContent(): Generator<Uint8Array, void, unknown> {
for (const child of this.children) {
yield* child.encode();

View File

@ -76,6 +76,11 @@ export abstract class EbmlTagTrait {
this._endOffset = options.endOffset;
}
public abstract get byteLengthQueuingSize(): number;
public get countQueuingSize(): number {
return 1;
}
public set contentLength(value: number) {
this._contentLength = value;
}

View File

@ -28,7 +28,7 @@ async function collectTags(decoder: Decoder): Promise<EbmlTagType[]> {
return tags;
}
describe('EbmlStreamDecoder', () => {
describe('Ebml Decoder', () => {
it('should wait for more data if a tag is longer than the buffer', async () => {
const decoder = getDecoderWithNullSink();
const writer = decoder.writable.getWriter();

View File

@ -56,7 +56,9 @@ const makeEncoderTest = async (tags: EbmlTagTrait[]) => {
controller.close();
},
});
const encoder = new EbmlStreamEncoder();
const chunks: ArrayBuffer[] = [];
await new Promise<void>((resolve, reject) => {
@ -70,6 +72,9 @@ const makeEncoderTest = async (tags: EbmlTagTrait[]) => {
close() {
resolve();
},
abort: (e) => {
reject(e);
},
})
)
.catch(reject);
@ -106,16 +111,15 @@ describe('EBML Encoder', () => {
]);
});
describe('#writeTag', () => {
it('throws with an incomplete tag data', async () => {
await expect(() => makeEncoderTest([incompleteTag])).rejects.toThrow(
/should only accept embl tag but not/
);
});
it('throws with an invalid tag id', async () => {
await expect(() => makeEncoderTest([invalidTag])).rejects.toThrow(
/should only accept embl tag but not/
);
});
});
});