feat: enhance mkv model type

This commit is contained in:
master 2025-03-20 03:20:44 +08:00
parent c0d4de4d28
commit 4537190096
17 changed files with 739 additions and 334 deletions

10
.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,10 @@
{
// allow autocomplete for ArkType expressions like "string | num"
"editor.quickSuggestions": {
"strings": "on"
},
// prioritize ArkType's "type" for autoimports
"typescript.preferences.autoImportSpecifierExcludeRegexes": [
"^(node:)?os$"
]
}

1
apps/mock/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
public/video-sample/huge/*

View File

@ -9,7 +9,7 @@
"preview": "rsbuild preview" "preview": "rsbuild preview"
}, },
"dependencies": { "dependencies": {
"konoebml": "0.1.0-rc.6", "konoebml": "0.1.0-rc.8",
"lit": "^3.2.1" "lit": "^3.2.1"
}, },
"devDependencies": { "devDependencies": {

View File

@ -1,11 +1,11 @@
export interface RangedVideoStream { export interface RangedStream {
controller: AbortController; controller: AbortController;
response: Response; response: Response;
stream: ReadableStream; body: ReadableStream;
totalSize?: number; totalSize?: number;
} }
export async function createRangedVideoStream( export async function createRangedStream(
url: string, url: string,
byteStart = 0, byteStart = 0,
byteEnd?: number byteEnd?: number
@ -53,7 +53,7 @@ export async function createRangedVideoStream(
return { return {
controller, controller,
response, response,
stream: body, body,
totalSize, totalSize,
}; };
} }

View File

@ -0,0 +1,60 @@
export interface RangedStream {
controller: AbortController;
response: Response;
body: ReadableStream<Uint8Array>;
totalSize?: number;
}
export async function createRangedStream(
url: string,
byteStart = 0,
byteEnd?: number
) {
const controller = new AbortController();
const signal = controller.signal;
const headers = new Headers();
headers.append(
'Range',
typeof byteEnd === 'number'
? `bytes=${byteStart}-${byteEnd}`
: `bytes=${byteStart}-`
);
const response = await fetch(url, { signal, headers });
if (!response.ok) {
throw new Error('fetch video stream failed');
}
const acceptRanges = response.headers.get('Accept-Ranges');
if (acceptRanges !== 'bytes') {
throw new Error('video server does not support byte ranges');
}
const body = response.body;
if (!(body instanceof ReadableStream)) {
throw new Error('can not get readable stream from response.body');
}
const contentRange = response.headers.get('Content-Range');
//
// Content-Range Header Syntax:
// Content-Range: <unit> <range-start>-<range-end>/<size>
// Content-Range: <unit> <range-start>-<range-end>/*
// Content-Range: <unit> */<size>
//
const totalSize = contentRange
? Number.parseInt(contentRange.split('/')[1], 10)
: undefined;
return {
controller,
response,
body,
totalSize,
};
}

View File

@ -5,4 +5,5 @@
<body> <body>
<my-element /> <my-element />
<video-pipeline-demo src="/api/static/video-sample/test.webm" /> <video-pipeline-demo src="/api/static/video-sample/test.webm" />
<!-- <video-pipeline-demo src="/api/static/video-sample/huge/animation.mkv" /> -->
</body> </body>

View File

@ -8,10 +8,11 @@ import {
type EbmlCuesTagType, type EbmlCuesTagType,
type EbmlSeekHeadTagType, type EbmlSeekHeadTagType,
type EbmlSegmentTagType, type EbmlSegmentTagType,
type EbmlClusterTagType,
} from 'konoebml'; } from 'konoebml';
import { isTagEnd } from './util'; import { isTagIdPos, simpleMasterExtractor } from './util';
import { isEqual } from 'lodash-es'; import { isEqual } from 'lodash-es';
import { type } from 'arktype';
import { TagWithArktype } from './util';
export const SEEK_ID_KAX_INFO = new Uint8Array([0x15, 0x49, 0xa9, 0x66]); export const SEEK_ID_KAX_INFO = new Uint8Array([0x15, 0x49, 0xa9, 0x66]);
export const SEEK_ID_KAX_TRACKS = new Uint8Array([0x16, 0x54, 0xae, 0x6b]); export const SEEK_ID_KAX_TRACKS = new Uint8Array([0x16, 0x54, 0xae, 0x6b]);
@ -40,8 +41,7 @@ export class EbmlSegment {
private addSeekHead(node: EbmlSeekHeadTagType) { private addSeekHead(node: EbmlSeekHeadTagType) {
this.seekHeadNode = node; this.seekHeadNode = node;
this.seekEntries = this.seekHeadNode.children this.seekEntries = this.seekHeadNode.children
.filter(isTagEnd) .filter(isTagIdPos(EbmlTagIdEnum.Seek, EbmlTagPosition.End))
.filter((c) => c.id === EbmlTagIdEnum.Seek)
.map((c) => { .map((c) => {
const seekId = c.children.find( const seekId = c.children.find(
(item) => item.id === EbmlTagIdEnum.SeekID (item) => item.id === EbmlTagIdEnum.SeekID
@ -74,7 +74,7 @@ export class EbmlSegment {
findLocalNodeBySeekPosition( findLocalNodeBySeekPosition(
seekPosition: number | undefined seekPosition: number | undefined
): EbmlTagType | undefined { ): EbmlTagType | undefined {
return Number.isSafeInteger(seekPosition) return seekPosition! >= 0
? this.metaOffsets.get(seekPosition as number) ? this.metaOffsets.get(seekPosition as number)
: undefined; : undefined;
} }
@ -104,6 +104,45 @@ export class EbmlSegment {
} }
} }
export class TrackEntry extends TagWithArktype({
id: EbmlTagIdEnum.TrackEntry,
schema: type({
trackNumber: 'number',
trackType: 'number',
trackUID: 'number',
}),
extract: simpleMasterExtractor({
[EbmlTagIdEnum.TrackNumber]: {
key: 'trackNumber',
extract: (t) => t.data as number,
},
[EbmlTagIdEnum.TrackType]: {
key: 'trackType',
extract: (t) => t.data as number,
},
[EbmlTagIdEnum.TrackUID]: {
key: 'trackUID',
extract: (t) => t.data as number,
},
}),
}) {}
const TracksSchema = type({
tracks: type.instanceOf(TrackEntry).array(),
});
export class Tracks extends TagWithArktype({
id: EbmlTagIdEnum.Tracks,
schema: TracksSchema,
extract: simpleMasterExtractor({
[EbmlTagIdEnum.TrackEntry]: {
key: 'tracks',
multi: true,
extract: TrackEntry.fromTag.bind(TrackEntry),
},
}),
}) {}
export interface EbmlSeekEntry { export interface EbmlSeekEntry {
seekId: Uint8Array; seekId: Uint8Array;
seekPosition: number; seekPosition: number;
@ -117,33 +156,59 @@ export class EbmlHead {
} }
} }
export class EbmlCluster { export class SimpleBlock extends TagWithArktype({
cluster: EbmlClusterTagType; id: EbmlTagIdEnum.SimpleBlock,
_timestamp: number; schema: type({
frame: type.instanceOf(Uint8Array),
}),
extract: (tag) => ({
frame: tag.payload,
}),
}) {}
constructor(cluster: EbmlClusterTagType) { export class Cluster extends TagWithArktype({
this.cluster = cluster; id: EbmlTagIdEnum.Cluster,
this._timestamp = cluster.children.find( schema: type({
(c) => c.id === EbmlTagIdEnum.Timecode timestamp: 'number',
)?.data as number; position: 'number?',
} prevSize: 'number?',
simpleBlock: type.instanceOf(SimpleBlock).array(),
}),
extract: simpleMasterExtractor({
[EbmlTagIdEnum.Timecode]: {
key: 'timestamp',
extract: (t) => t.data as number,
},
[EbmlTagIdEnum.PrevSize]: {
key: 'prevSize',
extract: (t) => t.data as number,
},
[EbmlTagIdEnum.SimpleBlock]: {
key: 'simpleBlock',
multi: true,
extract: SimpleBlock.fromTag.bind(SimpleBlock),
},
}),
}) {}
get timestamp(): number { export interface TrackPositions {
return this._timestamp; track: number;
} clusterPosition: number;
relativePosition?: number;
duration?: number;
} }
export class EbmlCue { export class CuePoint {
node: EbmlCuePointTagType; node: EbmlCuePointTagType;
_timestamp: number; _timestamp: number;
trackPositions: { track: number; position: number }[]; trackPositions: TrackPositions[];
get timestamp(): number { get timestamp(): number {
return this._timestamp; return this._timestamp;
} }
get position(): number { get position(): number {
return Math.max(...this.trackPositions.map((t) => t.position)); return Math.max(...this.trackPositions.map((t) => t.clusterPosition));
} }
constructor(node: EbmlCuePointTagType) { constructor(node: EbmlCuePointTagType) {
@ -151,38 +216,64 @@ export class EbmlCue {
this._timestamp = node.children.find((c) => c.id === EbmlTagIdEnum.CueTime) this._timestamp = node.children.find((c) => c.id === EbmlTagIdEnum.CueTime)
?.data as number; ?.data as number;
this.trackPositions = node.children this.trackPositions = node.children
// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: <explanation>
.map((t) => { .map((t) => {
if ( if (
t.id === EbmlTagIdEnum.CueTrackPositions && t.id === EbmlTagIdEnum.CueTrackPositions &&
t.position === EbmlTagPosition.End t.position === EbmlTagPosition.End
) { ) {
const track = t.children.find((t) => t.id === EbmlTagIdEnum.CueTrack) let track!: number;
?.data as number; let clusterPosition!: number;
const position = t.children.find( let relativePosition: number | undefined;
(t) => t.id === EbmlTagIdEnum.CueClusterPosition let duration: number | undefined;
)?.data as number;
return track! >= 0 && position! >= 0 ? { track, position } : null; for (const c of t.children) {
if (c.id === EbmlTagIdEnum.CueTrack) {
track = c.data as number;
}
if (c.id === EbmlTagIdEnum.CueClusterPosition) {
clusterPosition = c.data as number;
}
if (c.id === EbmlTagIdEnum.CueRelativePosition) {
relativePosition = c.data as number;
}
if (c.id === EbmlTagIdEnum.CueDuration) {
duration = c.data as number;
}
}
if (track! >= 0 && clusterPosition! >= 0) {
return {
track: track!,
clusterPosition: clusterPosition!,
relativePosition,
duration,
} as TrackPositions;
}
throw new Error(
`Tracking positions missing track of cluster position at ${t.startOffset}`
);
} }
return null; return null;
}) })
.filter((a): a is { track: number; position: number } => !!a); .filter((a): a is TrackPositions => !!a);
} }
} }
export class EbmlCues { export class Cues extends TagWithArktype({
node: EbmlCuesTagType; id: EbmlTagIdEnum.Cues,
cues: EbmlCue[]; schema: type({
cues: type.instanceOf(CuePoint).array(),
constructor(node: EbmlCuesTagType) { }),
this.node = node; extract: simpleMasterExtractor({
this.cues = node.children [EbmlTagIdEnum.CuePoint]: {
.filter(isTagEnd) key: 'cues',
.filter((c) => c.id === EbmlTagIdEnum.CuePoint) multi: true,
.map((c) => new EbmlCue(c)); extract: (t) => new CuePoint(t),
} },
}),
findClosestCue(seekTime: number): EbmlCue | null { }) {
findClosestCue(seekTime: number): CuePoint | null {
const cues = this.cues; const cues = this.cues;
if (!cues || cues.length === 0) { if (!cues || cues.length === 0) {
return null; return null;

View File

@ -0,0 +1,327 @@
import {
type EbmlTagType,
EbmlStreamDecoder,
EbmlTagIdEnum,
EbmlTagPosition,
} from 'konoebml';
import {
Observable,
from,
switchMap,
share,
defer,
EMPTY,
of,
filter,
finalize,
isEmpty,
map,
merge,
raceWith,
reduce,
scan,
shareReplay,
take,
takeUntil,
withLatestFrom,
} from 'rxjs';
import { createRangedStream } from '@/fetch';
import { EbmlSegment, Cluster, SEEK_ID_KAX_CUES, Cues } from './model';
import { isTagIdPos } from './util';
export function createRangedEbmlStream(
url: string,
byteStart = 0,
byteEnd?: number
): Observable<{
ebml$: Observable<EbmlTagType>;
totalSize?: number;
response: Response;
body: ReadableStream<Uint8Array>;
controller: AbortController;
}> {
const stream$ = from(createRangedStream(url, byteStart, byteEnd));
return stream$.pipe(
switchMap(({ controller, body, totalSize, response }) => {
let requestCompleted = false;
const originRequest$ = new Observable<EbmlTagType>((subscriber) => {
body
.pipeThrough(
new EbmlStreamDecoder({
streamStartOffset: byteStart,
collectChild: (child) => child.id !== EbmlTagIdEnum.Cluster,
})
)
.pipeTo(
new WritableStream({
write: (tag) => subscriber.next(tag),
close: () => {
if (!requestCompleted) {
subscriber.complete();
}
},
})
)
.catch((error) => {
if (requestCompleted && error?.name === 'AbortError') {
return;
}
subscriber.error(error);
});
return () => {
requestCompleted = true;
controller.abort();
};
}).pipe(
share({
resetOnComplete: false,
resetOnError: false,
resetOnRefCountZero: true,
})
);
const ebml$ = defer(() =>
requestCompleted ? EMPTY : originRequest$
).pipe(
share({
resetOnError: false,
resetOnComplete: true,
resetOnRefCountZero: true,
})
);
return of({
ebml$,
totalSize,
response,
body,
controller,
});
})
);
}
export function createEbmlController(src: string) {
const request$ = createRangedEbmlStream(src, 0);
const controller$ = request$.pipe(
map(({ totalSize, ebml$, response, controller }) => {
const head$ = ebml$.pipe(
filter(isTagIdPos(EbmlTagIdEnum.EBML, EbmlTagPosition.End)),
take(1),
shareReplay(1)
);
console.debug(
`stream of video "${src}" created, total size is ${totalSize ?? 'unknown'}`
);
const segmentStart$ = ebml$.pipe(
filter((s) => s.position === EbmlTagPosition.Start),
filter((tag) => tag.id === EbmlTagIdEnum.Segment)
);
const segments$ = segmentStart$.pipe(
map((startTag) => {
const segment = new EbmlSegment(startTag);
const continuousReusedCluster$ = ebml$.pipe(
filter(isTagIdPos(EbmlTagIdEnum.Cluster, EbmlTagPosition.End)),
filter((s) => s.id === EbmlTagIdEnum.Cluster),
map(Cluster.fromTag.bind(Cluster))
);
const segmentEnd$ = ebml$.pipe(
filter(isTagIdPos(EbmlTagIdEnum.Segment, EbmlTagPosition.End)),
filter((tag) => tag.id === EbmlTagIdEnum.Segment),
take(1)
);
const clusterStart$ = ebml$.pipe(
filter(isTagIdPos(EbmlTagIdEnum.Cluster, EbmlTagPosition.Start)),
take(1),
shareReplay(1)
);
const meta$ = ebml$.pipe(
takeUntil(clusterStart$.pipe(raceWith(segmentEnd$))),
share({
resetOnComplete: false,
resetOnError: false,
resetOnRefCountZero: true,
})
);
const withMeta$ = meta$.pipe(
reduce((segment, meta) => {
segment.scanMeta(meta);
return segment;
}, segment),
map((segment) => {
segment.markMetaEnd();
return segment;
}),
take(1),
shareReplay(1)
);
const withRemoteCues$ = withMeta$.pipe(
switchMap((s) => {
if (s.cuesNode) {
return EMPTY;
}
const cuesStartOffset =
s.dataOffset +
(s.findSeekPositionBySeekId(SEEK_ID_KAX_CUES) ?? Number.NaN);
if (cuesStartOffset >= 0) {
return createRangedEbmlStream(src, cuesStartOffset).pipe(
switchMap((req) => req.ebml$),
filter(isTagIdPos(EbmlTagIdEnum.Cues, EbmlTagPosition.End)),
withLatestFrom(withMeta$),
map(([cues, withMeta]) => {
withMeta.cuesNode = cues;
return withMeta;
})
);
}
return EMPTY;
}),
take(1),
shareReplay(1)
);
const withLocalCues$ = withMeta$.pipe(
switchMap((s) => {
if (s.cuesNode) {
return of(s);
}
return EMPTY;
}),
shareReplay(1)
);
const withCues$ = merge(withLocalCues$, withRemoteCues$).pipe(
take(1)
);
const withoutCues$ = withCues$.pipe(
isEmpty(),
switchMap((empty) => (empty ? withMeta$ : EMPTY))
);
const seekWithoutCues = (seekTime: number): Observable<Cluster> => {
const cluster$ = continuousReusedCluster$.pipe(
isEmpty(),
switchMap((empty) => {
return empty
? clusterStart$.pipe(
switchMap((startTag) =>
createRangedEbmlStream(src, startTag.startOffset)
),
switchMap((req) => req.ebml$),
filter(
isTagIdPos(EbmlTagIdEnum.Cluster, EbmlTagPosition.End)
),
map(Cluster.fromTag.bind(Cluster))
)
: continuousReusedCluster$;
})
);
if (seekTime === 0) {
return cluster$;
}
return cluster$.pipe(
scan(
(prev, curr) =>
[prev?.[1], curr] as [
Cluster | undefined,
Cluster | undefined,
],
[undefined, undefined] as [
Cluster | undefined,
Cluster | undefined,
]
),
filter((c) => c[1]?.timestamp! > seekTime),
map((c) => c[0] ?? c[1]!)
);
};
const seekWithCues = (
cues: Cues,
seekTime: number
): Observable<Cluster> => {
if (seekTime === 0) {
return seekWithoutCues(seekTime);
}
const cuePoint = cues.findClosestCue(seekTime);
if (!cuePoint) {
return seekWithoutCues(seekTime);
}
return createRangedEbmlStream(
src,
cuePoint.position + segment.dataOffset
).pipe(
switchMap((req) => req.ebml$),
filter(isTagIdPos(EbmlTagIdEnum.Cluster, EbmlTagPosition.End)),
map(Cluster.fromTag.bind(Cluster))
);
};
const seek = (seekTime: number): Observable<Cluster> => {
if (seekTime === 0) {
const subscripton = merge(withCues$, withoutCues$).subscribe();
// if seekTime equals to 0 at start, reuse the initialize stream
return seekWithoutCues(seekTime).pipe(
finalize(() => {
subscripton.unsubscribe();
})
);
}
return merge(
withCues$.pipe(
switchMap((s) =>
seekWithCues(Cues.fromTag(s.cuesNode!), seekTime)
)
),
withoutCues$.pipe(switchMap((_) => seekWithoutCues(seekTime)))
);
};
return {
startTag,
head$,
segment,
meta$,
withMeta$,
withCues$,
withoutCues$,
seekWithCues,
seekWithoutCues,
seek,
};
})
);
return {
segments$,
head$,
totalSize,
ebml$,
controller,
response,
};
})
);
return {
controller$,
request$,
};
}

View File

@ -1,5 +1,150 @@
import { EbmlTagPosition, type EbmlTagType } from 'konoebml'; import type { Type } from 'arktype';
import type { EbmlMasterTagType, EbmlTagIdEnum, EbmlTagType } from 'konoebml';
export function isTagEnd(tag: EbmlTagType): boolean { export type InferType<T> = T extends Type<infer U> ? U : never;
return tag.position === EbmlTagPosition.End;
export interface TagWithArktypeOptions<
I extends EbmlTagType['id'],
S extends Type<any>,
> {
id: I;
schema: S;
extract: (tag: Extract<EbmlTagType, { id: I }>, schema: S) => InferType<S>;
}
export type TagWithArktypeClassInstance<
I extends EbmlTagType['id'],
S extends Type<any>,
> = InferType<S> & {
tag: Extract<EbmlTagType, { id: I }>;
};
export interface TagWithArktypeClass<
I extends EbmlTagType['id'],
S extends Type<any>,
> {
new (
tag: Extract<EbmlTagType, { id: I }>,
validatedTag: InferType<S>
): TagWithArktypeClassInstance<I, S>;
fromTag<R extends TagWithArktypeClassInstance<I, S>>(
this: new (
tag: Extract<EbmlTagType, { id: I }>,
validatedTag: InferType<S>
) => TagWithArktypeClassInstance<I, S>,
tag: Extract<EbmlTagType, { id: I }>
): R;
id: I;
schema: S;
}
export function TagWithArktype<
I extends EbmlTagType['id'],
S extends Type<any>,
>({
id,
schema,
extract,
}: TagWithArktypeOptions<I, S>): TagWithArktypeClass<I, S> {
const tagWithArktypeImpl = class TagWithArktypeImpl {
static id = id;
static schema = schema;
tag: Extract<EbmlTagType, { id: I }>;
constructor(
tag: Extract<EbmlTagType, { id: I }>,
validatedTag: InferType<S>
) {
Object.assign(this, validatedTag);
this.tag = tag;
}
static fromTag(tag: Extract<EbmlTagType, { id: I }>) {
const extractedData = extract(tag, schema);
const validatedExtractedData = schema(extractedData);
// biome-ignore lint/complexity/noThisInStatic: <explanation>
return new this(tag, validatedExtractedData);
}
};
return tagWithArktypeImpl as unknown as TagWithArktypeClass<I, S>;
}
export type PredicateIdExtract<T, K> = Extract<T, { id: K }>;
export type PredicatePositionExtract<
T extends { position: string },
P,
> = P extends T['position'] ? T : never;
export function isTagIdPos<
I extends EbmlTagIdEnum,
P extends PredicateIdExtract<EbmlTagType, I>['position'] | '*' = '*',
>(id: I, pos?: P) {
return (tag: EbmlTagType): tag is PredicateIdExtract<EbmlTagType, I> =>
tag.id === id && (pos === '*' || pos === tag.position);
}
export function isTagPos<
T extends { position: string },
P extends T['position'],
>(pos: P | '*' = '*') {
return (tag: T): tag is PredicatePositionExtract<T, P> =>
pos === '*' || pos === tag.position;
}
export type MasterChildExtractMap<T, K> = {
[id in EbmlTagIdEnum]?: K extends keyof T
?
| {
key: K;
multi: true;
extract: (
tag: Extract<EbmlTagType, { id: id }>
) => T[K] extends Array<infer U> ? U : never;
}
| {
key: K;
multi?: false;
extract: (tag: Extract<EbmlTagType, { id: id }>) => T[K];
}
: never;
};
export function simpleMasterExtractor<
T extends EbmlMasterTagType,
S extends Type<any>,
EM extends MasterChildExtractMap<InferType<S>, keyof InferType<S>>,
>(map: EM) {
// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: <explanation>
return (tag: T, _schema: S): InferType<S> => {
if (!tag?.children?.length) {
return {} as unknown as InferType<S>;
}
const value = {} as Record<string, any>;
for (const c of tag.children) {
const entry = (
map as unknown as Record<
string,
{ id: number; multi: boolean; extract: (tag: any) => any }
>
)[c.id as number] as any;
if (entry?.key) {
const key = entry.key;
const item = entry.extract ? entry.extract(c) : c.data;
if (entry.multi) {
if (value[key]) {
value[key].push(item);
} else {
value[key] = [item];
}
} else {
value[key] = item;
}
}
}
return value as unknown as InferType<S>;
};
} }

View File

@ -1 +0,0 @@
export { createRangedVideoStream, type RangedVideoStream } from './fetch';

View File

@ -1,99 +1,7 @@
import { html, css, LitElement } from 'lit'; import { html, css, LitElement } from 'lit';
import { property } from 'lit/decorators.js'; import { property } from 'lit/decorators.js';
import { import { type Subscription, switchMap, take } from 'rxjs';
EbmlStreamDecoder, import { createEbmlController } from './media/mkv/reactive';
EbmlTagIdEnum,
EbmlTagPosition,
type EbmlTagType,
} from 'konoebml';
import {
EMPTY,
filter,
from,
isEmpty,
map,
merge,
mergeMap,
Observable,
of,
reduce,
scan,
share,
Subject,
type Subscription,
switchMap,
take,
takeUntil,
withLatestFrom,
} from 'rxjs';
import { createRangedVideoStream } from './media/shared';
import {
EbmlCluster,
EbmlCues,
EbmlSegment,
SEEK_ID_KAX_CUES,
} from './media/mkv/model';
import { isTagEnd } from './media/mkv/util';
export function createRangedEbmlStream(
url: string,
byteStart = 0,
byteEnd?: number
): Observable<{
ebml$: Observable<EbmlTagType>;
totalSize?: number;
response: Response;
stream: ReadableStream;
controller: AbortController;
}> {
const stream$ = from(createRangedVideoStream(url, byteStart, byteEnd));
return stream$.pipe(
mergeMap(({ controller, stream, totalSize, response }) => {
const ebml$ = new Observable<EbmlTagType>((subscriber) => {
stream
.pipeThrough(
new EbmlStreamDecoder({
streamStartOffset: byteStart,
collectChild: (child) => child.id !== EbmlTagIdEnum.Cluster,
})
)
.pipeTo(
new WritableStream({
write: (tag) => {
subscriber.next(tag);
},
close: () => {
subscriber.complete();
},
abort: (err: any) => {
subscriber.error(err);
},
})
);
return () => {
controller.abort();
};
}).pipe(
share({
connector: () => new Subject(),
resetOnComplete: false,
resetOnError: false,
resetOnRefCountZero: false,
})
);
return of({
ebml$,
totalSize,
response,
stream,
controller,
});
})
);
}
export class VideoPipelineDemo extends LitElement { export class VideoPipelineDemo extends LitElement {
@property() @property()
@ -108,182 +16,21 @@ export class VideoPipelineDemo extends LitElement {
return; return;
} }
const ebmlRequest$ = createRangedEbmlStream(this.src, 0); const { controller$ } = createEbmlController(this.src);
const ebmlInit$ = ebmlRequest$.pipe( this.subscripton = controller$
map(({ totalSize, ebml$, response, controller }) => {
const head = 1;
console.debug(
`stream of video "${this.src}" created, total size is ${totalSize ?? 'unknown'}`
);
const segmentStart$ = ebml$.pipe(
filter((s) => s.position === EbmlTagPosition.Start),
filter((tag) => tag.id === EbmlTagIdEnum.Segment)
);
const segmentEnd$ = ebml$.pipe(
filter(
(tag) =>
tag.id === EbmlTagIdEnum.Segment &&
tag.position === EbmlTagPosition.End
)
);
const segments$ = segmentStart$.pipe(
map((startTag) => {
const segment = new EbmlSegment(startTag);
const tag$ = ebml$.pipe(takeUntil(segmentEnd$));
const cluster$ = tag$.pipe(
filter(isTagEnd),
filter((tag) => tag.id === EbmlTagIdEnum.Cluster),
map((tag) => new EbmlCluster(tag))
);
const meta$ = tag$.pipe(takeUntil(cluster$));
const withMeta$ = meta$.pipe(
reduce((segment, meta) => {
segment.scanMeta(meta);
return segment;
}, segment),
map((segment) => {
segment.markMetaEnd();
return segment;
})
);
const withRemoteCues$ = withMeta$.pipe(
map((s) =>
s.cuesNode
? Number.NaN
: s.dataOffset +
(s.findSeekPositionBySeekId(SEEK_ID_KAX_CUES) ?? Number.NaN)
),
filter((cuesStartOffset) => cuesStartOffset >= 0),
switchMap((cuesStartOffset) =>
createRangedEbmlStream(this.src, cuesStartOffset).pipe(
switchMap((req) => req.ebml$)
)
),
filter(isTagEnd),
filter((tag) => tag?.id === EbmlTagIdEnum.Cues),
take(1),
withLatestFrom(withMeta$),
map(([cues, withMeta]) => {
withMeta.cuesNode = cues;
return withMeta;
}),
share()
);
const withLocalCues$ = withMeta$.pipe(filter((s) => !!s.cuesNode));
const withCues$ = merge(withRemoteCues$, withLocalCues$);
const withoutCues$ = withCues$.pipe(
isEmpty(),
switchMap((empty) => (empty ? withMeta$ : EMPTY))
);
const seekWithoutCues = (
cluster$: Observable<EbmlCluster>,
seekTime: number
): Observable<EbmlCluster> => {
if (seekTime === 0) {
return cluster$;
}
return cluster$.pipe(
scan(
(prev, curr) =>
[prev?.[1], curr] as [
EbmlCluster | undefined,
EbmlCluster | undefined,
],
[undefined, undefined] as [
EbmlCluster | undefined,
EbmlCluster | undefined,
]
),
filter((c) => c[1]?.timestamp! > seekTime),
map((c) => c[0] ?? c[1]!)
);
};
const seekWithCues = (
cues: EbmlCues,
cluster$: Observable<EbmlCluster>,
seekTime: number
): Observable<EbmlCluster> => {
if (seekTime === 0) {
return cluster$;
}
const cuePoint = cues.findClosestCue(seekTime);
if (!cuePoint) {
return seekWithoutCues(cluster$, seekTime);
}
return createRangedEbmlStream(
this.src,
cuePoint.position + segment.dataOffset
).pipe(
switchMap((req) => req.ebml$),
filter(isTagEnd),
filter((tag) => tag.id === EbmlTagIdEnum.Cluster),
map((c) => new EbmlCluster(c))
);
};
const seek = (seekTime: number): Observable<EbmlCluster> => {
return merge(
withCues$.pipe(
switchMap((s) =>
seekWithCues(new EbmlCues(s.cuesNode!), cluster$, seekTime)
)
),
withoutCues$.pipe(
switchMap((_) => seekWithoutCues(cluster$, seekTime))
)
);
};
return {
startTag,
head,
segment,
tag$,
meta$,
cluster$,
withMeta$,
withCues$,
withoutCues$,
seekWithCues,
seekWithoutCues,
seek,
};
})
);
return {
segments$,
head,
totalSize,
ebml$,
controller,
response,
};
})
);
this.subscripton = ebmlInit$
.pipe( .pipe(
switchMap(({ segments$ }) => segments$), switchMap(({ segments$ }) => segments$.pipe(take(1))),
take(1), switchMap(({ seek }) => seek(0))
switchMap(({ seek }) => seek(2000))
) )
.subscribe(console.log); .subscribe((cluster) => console.log(cluster));
const videoDecoder = new VideoDecoder({
output: (frame) => {},
error: (e) => {
e;
},
});
} }
connectedCallback(): void { connectedCallback(): void {
@ -293,6 +40,7 @@ export class VideoPipelineDemo extends LitElement {
disconnectedCallback(): void { disconnectedCallback(): void {
super.disconnectedCallback(); super.disconnectedCallback();
this.subscripton?.unsubscribe();
} }
render() { render() {

View File

@ -7,7 +7,12 @@
"experimentalDecorators": true, "experimentalDecorators": true,
"module": "ESNext", "module": "ESNext",
"moduleResolution": "bundler", "moduleResolution": "bundler",
"useDefineForClassFields": false "useDefineForClassFields": false,
"paths": {
"@/*": [
"./src/*"
]
}
}, },
"include": [ "include": [
"src" "src"

View File

@ -5,6 +5,7 @@
} }
``` ```
# ^https://konoplayer.com/api/static/*** resSpeed://1024K
^https://konoplayer.com/api*** reqHeaders://{x-forwarded.json} http://127.0.0.1:5001/api$1 ^https://konoplayer.com/api*** reqHeaders://{x-forwarded.json} http://127.0.0.1:5001/api$1
^https://konoplayer.com/*** reqHeaders://{x-forwarded.json} http://127.0.0.1:5000/$1 excludeFilter://^https://konoplayer.com/api ^https://konoplayer.com/*** reqHeaders://{x-forwarded.json} http://127.0.0.1:5000/$1 excludeFilter://^https://konoplayer.com/api
^wss://konoplayer.com/*** reqHeaders://{x-forwarded.json} ws://127.0.0.1:5000/$1 excludeFilter://^wss://konoplayer.com/api ^wss://konoplayer.com/*** reqHeaders://{x-forwarded.json} ws://127.0.0.1:5000/$1 excludeFilter://^wss://konoplayer.com/api

View File

@ -3,11 +3,6 @@
"extends": [ "extends": [
"ultracite" "ultracite"
], ],
"javascript": {
"globals": [
"Liveblocks"
]
},
"linter": { "linter": {
"rules": { "rules": {
"style": { "style": {
@ -26,12 +21,6 @@
}, },
"files": { "files": {
"ignore": [ "ignore": [
"packages/design-system/components/ui/**",
"packages/design-system/lib/**",
"packages/design-system/hooks/**",
"packages/collaboration/config.ts",
"apps/docs/**/*.json",
"apps/email/.react-email/**",
".vscode/*.json" ".vscode/*.json"
] ]
}, },

View File

@ -19,8 +19,10 @@
}, },
"dependencies": { "dependencies": {
"@types/lodash-es": "^4.17.12", "@types/lodash-es": "^4.17.12",
"arktype": "^2.1.10",
"lodash-es": "^4.17.21", "lodash-es": "^4.17.21",
"mnemonist": "^0.40.3", "mnemonist": "^0.40.3",
"rxjs": "^7.8.2" "rxjs": "^7.8.2",
"type-fest": "^4.37.0"
} }
} }

36
pnpm-lock.yaml generated
View File

@ -11,6 +11,9 @@ importers:
'@types/lodash-es': '@types/lodash-es':
specifier: ^4.17.12 specifier: ^4.17.12
version: 4.17.12 version: 4.17.12
arktype:
specifier: ^2.1.10
version: 2.1.10
lodash-es: lodash-es:
specifier: ^4.17.21 specifier: ^4.17.21
version: 4.17.21 version: 4.17.21
@ -20,6 +23,9 @@ importers:
rxjs: rxjs:
specifier: ^7.8.2 specifier: ^7.8.2
version: 7.8.2 version: 7.8.2
type-fest:
specifier: ^4.37.0
version: 4.37.0
devDependencies: devDependencies:
'@biomejs/biome': '@biomejs/biome':
specifier: 1.9.4 specifier: 1.9.4
@ -74,8 +80,8 @@ importers:
apps/playground: apps/playground:
dependencies: dependencies:
konoebml: konoebml:
specifier: 0.1.0-rc.6 specifier: 0.1.0-rc.8
version: 0.1.0-rc.6 version: 0.1.0-rc.8
lit: lit:
specifier: ^3.2.1 specifier: ^3.2.1
version: 3.2.1 version: 3.2.1
@ -129,6 +135,12 @@ packages:
resolution: {integrity: sha512-cGGqUGqBXIGJkeL65l70y0BflDAu/0Zi/ohbYat3hvadFfumRJnVElVfJ59JtWO7FfKQjxcwCVTyuQ/tevX/9A==} resolution: {integrity: sha512-cGGqUGqBXIGJkeL65l70y0BflDAu/0Zi/ohbYat3hvadFfumRJnVElVfJ59JtWO7FfKQjxcwCVTyuQ/tevX/9A==}
engines: {node: ^18.19.1 || ^20.11.1 || >=22.0.0, npm: ^6.11.0 || ^7.5.6 || >=8.0.0, yarn: '>= 1.13.0'} engines: {node: ^18.19.1 || ^20.11.1 || >=22.0.0, npm: ^6.11.0 || ^7.5.6 || >=8.0.0, yarn: '>= 1.13.0'}
'@ark/schema@0.45.0':
resolution: {integrity: sha512-3XlMWkZbEjh0YsF92vnnRNCWNRNhRKDTf6XhugyCXH0YRFuM+w1vFLDbB2JLfZloEd7i5cbqsLaDLzyBZbPrSg==}
'@ark/util@0.45.0':
resolution: {integrity: sha512-Z1gHEGbpPzLtPmYb932t2B++6YonlUi1Fa14IQ4vhsGMWhd81Mi1miUmdZXW4fNI/wg1saT7H2/5cAuONgTXhg==}
'@babel/code-frame@7.26.2': '@babel/code-frame@7.26.2':
resolution: {integrity: sha512-RJlIHRueQgwWitWgF8OdFYGZX328Ax5BCemNGlqHfplnRT9ESi8JkFlvaVYbS+UubVY6dpv87Fs2u5M29iNFVQ==} resolution: {integrity: sha512-RJlIHRueQgwWitWgF8OdFYGZX328Ax5BCemNGlqHfplnRT9ESi8JkFlvaVYbS+UubVY6dpv87Fs2u5M29iNFVQ==}
engines: {node: '>=6.9.0'} engines: {node: '>=6.9.0'}
@ -1095,6 +1107,9 @@ packages:
argparse@2.0.1: argparse@2.0.1:
resolution: {integrity: sha512-8+9WqebbFzpX9OR+Wa6O29asIogeRMzcGtAINdpMHHyAg10f05aSFVBbcEqGf/PXw1EjAZ+q2/bEBg3DvurK3Q==} resolution: {integrity: sha512-8+9WqebbFzpX9OR+Wa6O29asIogeRMzcGtAINdpMHHyAg10f05aSFVBbcEqGf/PXw1EjAZ+q2/bEBg3DvurK3Q==}
arktype@2.1.10:
resolution: {integrity: sha512-KqbrzI9qIGrQUClifyS1HpUp/oTSRtGDvnMKzwg2TAvxRpynY1mn/ubXaxAAdGPOM8V3pBqwb01Z6TcXqhBxzQ==}
array-flatten@1.1.1: array-flatten@1.1.1:
resolution: {integrity: sha512-PCVAQswWemu6UdxsDFFX/+gVeYqKAod3D3UVm91jHwynguOwAvYPhx8nNlM++NqRcK6CxxpUafjmhIdKiHibqg==} resolution: {integrity: sha512-PCVAQswWemu6UdxsDFFX/+gVeYqKAod3D3UVm91jHwynguOwAvYPhx8nNlM++NqRcK6CxxpUafjmhIdKiHibqg==}
@ -1838,8 +1853,8 @@ packages:
resolution: {integrity: sha512-dcS1ul+9tmeD95T+x28/ehLgd9mENa3LsvDTtzm3vyBEO7RPptvAD+t44WVXaUjTBRcrpFeFlC8WCruUR456hw==} resolution: {integrity: sha512-dcS1ul+9tmeD95T+x28/ehLgd9mENa3LsvDTtzm3vyBEO7RPptvAD+t44WVXaUjTBRcrpFeFlC8WCruUR456hw==}
engines: {node: '>=0.10.0'} engines: {node: '>=0.10.0'}
konoebml@0.1.0-rc.6: konoebml@0.1.0-rc.8:
resolution: {integrity: sha512-PM81HV0OVG0eGo+195azreUtp08FO9vLLrIvrUHbsSQHL/wq7TySU/6AaLm31e3ABKsJ65gpsQr6lcJ5xd8y3Q==} resolution: {integrity: sha512-fR4DZqCskLKxGBMc58gpOOzajFrfu9hQC7WZd8yGiIxLVhDkzBnihXqlsWJU6Qw77ukMOGGkbeM2uqQyv5dO3w==}
engines: {node: '>= 18.0.0'} engines: {node: '>= 18.0.0'}
lines-and-columns@1.2.4: lines-and-columns@1.2.4:
@ -2750,6 +2765,12 @@ snapshots:
transitivePeerDependencies: transitivePeerDependencies:
- chokidar - chokidar
'@ark/schema@0.45.0':
dependencies:
'@ark/util': 0.45.0
'@ark/util@0.45.0': {}
'@babel/code-frame@7.26.2': '@babel/code-frame@7.26.2':
dependencies: dependencies:
'@babel/helper-validator-identifier': 7.25.9 '@babel/helper-validator-identifier': 7.25.9
@ -3628,6 +3649,11 @@ snapshots:
argparse@2.0.1: {} argparse@2.0.1: {}
arktype@2.1.10:
dependencies:
'@ark/schema': 0.45.0
'@ark/util': 0.45.0
array-flatten@1.1.1: {} array-flatten@1.1.1: {}
array-timsort@1.0.3: {} array-timsort@1.0.3: {}
@ -4436,7 +4462,7 @@ snapshots:
kind-of@6.0.3: {} kind-of@6.0.3: {}
konoebml@0.1.0-rc.6: konoebml@0.1.0-rc.8:
dependencies: dependencies:
mnemonist: 0.40.3 mnemonist: 0.40.3
type-fest: 4.37.0 type-fest: 4.37.0