From 45371900960598a4798072e2c9cd3aa56e7075e8 Mon Sep 17 00:00:00 2001 From: lonelyhentxi Date: Thu, 20 Mar 2025 03:20:44 +0800 Subject: [PATCH] feat: enhance mkv model type --- .vscode/settings.json | 10 + apps/mock/.gitignore | 1 + apps/playground/package.json | 2 +- .../src/{media/shared => }/fetch.ts | 8 +- apps/playground/src/fetch/index.ts | 60 ++++ apps/playground/src/index.html | 1 + apps/playground/src/media/mkv/model.ts | 171 ++++++--- apps/playground/src/media/mkv/reactive.ts | 327 ++++++++++++++++++ apps/playground/src/media/mkv/util.ts | 151 +++++++- apps/playground/src/media/shared/index.ts | 1 - apps/playground/src/utils/types.ts | 0 apps/playground/src/video-pipeline-demo.ts | 282 +-------------- apps/playground/tsconfig.json | 7 +- apps/proxy/.whistle/rules/files/0.konoplayer | 1 + biome.jsonc | 11 - package.json | 4 +- pnpm-lock.yaml | 36 +- 17 files changed, 739 insertions(+), 334 deletions(-) create mode 100644 .vscode/settings.json create mode 100644 apps/mock/.gitignore rename apps/playground/src/{media/shared => }/fetch.ts (90%) create mode 100644 apps/playground/src/fetch/index.ts create mode 100644 apps/playground/src/media/mkv/reactive.ts delete mode 100644 apps/playground/src/media/shared/index.ts delete mode 100644 apps/playground/src/utils/types.ts diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..b2539e9 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,10 @@ +{ + // allow autocomplete for ArkType expressions like "string | num" + "editor.quickSuggestions": { + "strings": "on" + }, + // prioritize ArkType's "type" for autoimports + "typescript.preferences.autoImportSpecifierExcludeRegexes": [ + "^(node:)?os$" + ] +} \ No newline at end of file diff --git a/apps/mock/.gitignore b/apps/mock/.gitignore new file mode 100644 index 0000000..1eed71a --- /dev/null +++ b/apps/mock/.gitignore @@ -0,0 +1 @@ +public/video-sample/huge/* \ No newline at end of file diff --git a/apps/playground/package.json b/apps/playground/package.json index 730bed2..9200978 100644 --- a/apps/playground/package.json +++ b/apps/playground/package.json @@ -9,7 +9,7 @@ "preview": "rsbuild preview" }, "dependencies": { - "konoebml": "0.1.0-rc.6", + "konoebml": "0.1.0-rc.8", "lit": "^3.2.1" }, "devDependencies": { diff --git a/apps/playground/src/media/shared/fetch.ts b/apps/playground/src/fetch.ts similarity index 90% rename from apps/playground/src/media/shared/fetch.ts rename to apps/playground/src/fetch.ts index 43dc653..4570150 100644 --- a/apps/playground/src/media/shared/fetch.ts +++ b/apps/playground/src/fetch.ts @@ -1,11 +1,11 @@ -export interface RangedVideoStream { +export interface RangedStream { controller: AbortController; response: Response; - stream: ReadableStream; + body: ReadableStream; totalSize?: number; } -export async function createRangedVideoStream( +export async function createRangedStream( url: string, byteStart = 0, byteEnd?: number @@ -53,7 +53,7 @@ export async function createRangedVideoStream( return { controller, response, - stream: body, + body, totalSize, }; } diff --git a/apps/playground/src/fetch/index.ts b/apps/playground/src/fetch/index.ts new file mode 100644 index 0000000..ebe4c51 --- /dev/null +++ b/apps/playground/src/fetch/index.ts @@ -0,0 +1,60 @@ +export interface RangedStream { + controller: AbortController; + response: Response; + body: ReadableStream; + totalSize?: number; + } + + export async function createRangedStream( + url: string, + byteStart = 0, + byteEnd?: number + ) { + const controller = new AbortController(); + const signal = controller.signal; + const headers = new Headers(); + headers.append( + 'Range', + typeof byteEnd === 'number' + ? `bytes=${byteStart}-${byteEnd}` + : `bytes=${byteStart}-` + ); + + const response = await fetch(url, { signal, headers }); + + if (!response.ok) { + throw new Error('fetch video stream failed'); + } + + const acceptRanges = response.headers.get('Accept-Ranges'); + + if (acceptRanges !== 'bytes') { + throw new Error('video server does not support byte ranges'); + } + + const body = response.body; + + if (!(body instanceof ReadableStream)) { + throw new Error('can not get readable stream from response.body'); + } + + const contentRange = response.headers.get('Content-Range'); + + // + // Content-Range Header Syntax: + // Content-Range: -/ + // Content-Range: -/* + // Content-Range: */ + // + const totalSize = contentRange + ? Number.parseInt(contentRange.split('/')[1], 10) + : undefined; + + return { + controller, + response, + body, + totalSize, + }; + } + \ No newline at end of file diff --git a/apps/playground/src/index.html b/apps/playground/src/index.html index 819ba76..8a54361 100644 --- a/apps/playground/src/index.html +++ b/apps/playground/src/index.html @@ -5,4 +5,5 @@ + \ No newline at end of file diff --git a/apps/playground/src/media/mkv/model.ts b/apps/playground/src/media/mkv/model.ts index c78106a..417ad8d 100644 --- a/apps/playground/src/media/mkv/model.ts +++ b/apps/playground/src/media/mkv/model.ts @@ -8,10 +8,11 @@ import { type EbmlCuesTagType, type EbmlSeekHeadTagType, type EbmlSegmentTagType, - type EbmlClusterTagType, } from 'konoebml'; -import { isTagEnd } from './util'; +import { isTagIdPos, simpleMasterExtractor } from './util'; import { isEqual } from 'lodash-es'; +import { type } from 'arktype'; +import { TagWithArktype } from './util'; export const SEEK_ID_KAX_INFO = new Uint8Array([0x15, 0x49, 0xa9, 0x66]); export const SEEK_ID_KAX_TRACKS = new Uint8Array([0x16, 0x54, 0xae, 0x6b]); @@ -40,8 +41,7 @@ export class EbmlSegment { private addSeekHead(node: EbmlSeekHeadTagType) { this.seekHeadNode = node; this.seekEntries = this.seekHeadNode.children - .filter(isTagEnd) - .filter((c) => c.id === EbmlTagIdEnum.Seek) + .filter(isTagIdPos(EbmlTagIdEnum.Seek, EbmlTagPosition.End)) .map((c) => { const seekId = c.children.find( (item) => item.id === EbmlTagIdEnum.SeekID @@ -74,7 +74,7 @@ export class EbmlSegment { findLocalNodeBySeekPosition( seekPosition: number | undefined ): EbmlTagType | undefined { - return Number.isSafeInteger(seekPosition) + return seekPosition! >= 0 ? this.metaOffsets.get(seekPosition as number) : undefined; } @@ -104,6 +104,45 @@ export class EbmlSegment { } } +export class TrackEntry extends TagWithArktype({ + id: EbmlTagIdEnum.TrackEntry, + schema: type({ + trackNumber: 'number', + trackType: 'number', + trackUID: 'number', + }), + extract: simpleMasterExtractor({ + [EbmlTagIdEnum.TrackNumber]: { + key: 'trackNumber', + extract: (t) => t.data as number, + }, + [EbmlTagIdEnum.TrackType]: { + key: 'trackType', + extract: (t) => t.data as number, + }, + [EbmlTagIdEnum.TrackUID]: { + key: 'trackUID', + extract: (t) => t.data as number, + }, + }), +}) {} + +const TracksSchema = type({ + tracks: type.instanceOf(TrackEntry).array(), +}); + +export class Tracks extends TagWithArktype({ + id: EbmlTagIdEnum.Tracks, + schema: TracksSchema, + extract: simpleMasterExtractor({ + [EbmlTagIdEnum.TrackEntry]: { + key: 'tracks', + multi: true, + extract: TrackEntry.fromTag.bind(TrackEntry), + }, + }), +}) {} + export interface EbmlSeekEntry { seekId: Uint8Array; seekPosition: number; @@ -117,33 +156,59 @@ export class EbmlHead { } } -export class EbmlCluster { - cluster: EbmlClusterTagType; - _timestamp: number; +export class SimpleBlock extends TagWithArktype({ + id: EbmlTagIdEnum.SimpleBlock, + schema: type({ + frame: type.instanceOf(Uint8Array), + }), + extract: (tag) => ({ + frame: tag.payload, + }), +}) {} - constructor(cluster: EbmlClusterTagType) { - this.cluster = cluster; - this._timestamp = cluster.children.find( - (c) => c.id === EbmlTagIdEnum.Timecode - )?.data as number; - } +export class Cluster extends TagWithArktype({ + id: EbmlTagIdEnum.Cluster, + schema: type({ + timestamp: 'number', + position: 'number?', + prevSize: 'number?', + simpleBlock: type.instanceOf(SimpleBlock).array(), + }), + extract: simpleMasterExtractor({ + [EbmlTagIdEnum.Timecode]: { + key: 'timestamp', + extract: (t) => t.data as number, + }, + [EbmlTagIdEnum.PrevSize]: { + key: 'prevSize', + extract: (t) => t.data as number, + }, + [EbmlTagIdEnum.SimpleBlock]: { + key: 'simpleBlock', + multi: true, + extract: SimpleBlock.fromTag.bind(SimpleBlock), + }, + }), +}) {} - get timestamp(): number { - return this._timestamp; - } +export interface TrackPositions { + track: number; + clusterPosition: number; + relativePosition?: number; + duration?: number; } -export class EbmlCue { +export class CuePoint { node: EbmlCuePointTagType; _timestamp: number; - trackPositions: { track: number; position: number }[]; + trackPositions: TrackPositions[]; get timestamp(): number { return this._timestamp; } get position(): number { - return Math.max(...this.trackPositions.map((t) => t.position)); + return Math.max(...this.trackPositions.map((t) => t.clusterPosition)); } constructor(node: EbmlCuePointTagType) { @@ -151,38 +216,64 @@ export class EbmlCue { this._timestamp = node.children.find((c) => c.id === EbmlTagIdEnum.CueTime) ?.data as number; this.trackPositions = node.children + // biome-ignore lint/complexity/noExcessiveCognitiveComplexity: .map((t) => { if ( t.id === EbmlTagIdEnum.CueTrackPositions && t.position === EbmlTagPosition.End ) { - const track = t.children.find((t) => t.id === EbmlTagIdEnum.CueTrack) - ?.data as number; - const position = t.children.find( - (t) => t.id === EbmlTagIdEnum.CueClusterPosition - )?.data as number; + let track!: number; + let clusterPosition!: number; + let relativePosition: number | undefined; + let duration: number | undefined; - return track! >= 0 && position! >= 0 ? { track, position } : null; + for (const c of t.children) { + if (c.id === EbmlTagIdEnum.CueTrack) { + track = c.data as number; + } + if (c.id === EbmlTagIdEnum.CueClusterPosition) { + clusterPosition = c.data as number; + } + if (c.id === EbmlTagIdEnum.CueRelativePosition) { + relativePosition = c.data as number; + } + if (c.id === EbmlTagIdEnum.CueDuration) { + duration = c.data as number; + } + } + + if (track! >= 0 && clusterPosition! >= 0) { + return { + track: track!, + clusterPosition: clusterPosition!, + relativePosition, + duration, + } as TrackPositions; + } + throw new Error( + `Tracking positions missing track of cluster position at ${t.startOffset}` + ); } return null; }) - .filter((a): a is { track: number; position: number } => !!a); + .filter((a): a is TrackPositions => !!a); } } -export class EbmlCues { - node: EbmlCuesTagType; - cues: EbmlCue[]; - - constructor(node: EbmlCuesTagType) { - this.node = node; - this.cues = node.children - .filter(isTagEnd) - .filter((c) => c.id === EbmlTagIdEnum.CuePoint) - .map((c) => new EbmlCue(c)); - } - - findClosestCue(seekTime: number): EbmlCue | null { +export class Cues extends TagWithArktype({ + id: EbmlTagIdEnum.Cues, + schema: type({ + cues: type.instanceOf(CuePoint).array(), + }), + extract: simpleMasterExtractor({ + [EbmlTagIdEnum.CuePoint]: { + key: 'cues', + multi: true, + extract: (t) => new CuePoint(t), + }, + }), +}) { + findClosestCue(seekTime: number): CuePoint | null { const cues = this.cues; if (!cues || cues.length === 0) { return null; diff --git a/apps/playground/src/media/mkv/reactive.ts b/apps/playground/src/media/mkv/reactive.ts new file mode 100644 index 0000000..53805fd --- /dev/null +++ b/apps/playground/src/media/mkv/reactive.ts @@ -0,0 +1,327 @@ +import { + type EbmlTagType, + EbmlStreamDecoder, + EbmlTagIdEnum, + EbmlTagPosition, +} from 'konoebml'; +import { + Observable, + from, + switchMap, + share, + defer, + EMPTY, + of, + filter, + finalize, + isEmpty, + map, + merge, + raceWith, + reduce, + scan, + shareReplay, + take, + takeUntil, + withLatestFrom, +} from 'rxjs'; +import { createRangedStream } from '@/fetch'; +import { EbmlSegment, Cluster, SEEK_ID_KAX_CUES, Cues } from './model'; +import { isTagIdPos } from './util'; + +export function createRangedEbmlStream( + url: string, + byteStart = 0, + byteEnd?: number +): Observable<{ + ebml$: Observable; + totalSize?: number; + response: Response; + body: ReadableStream; + controller: AbortController; +}> { + const stream$ = from(createRangedStream(url, byteStart, byteEnd)); + + return stream$.pipe( + switchMap(({ controller, body, totalSize, response }) => { + let requestCompleted = false; + const originRequest$ = new Observable((subscriber) => { + body + .pipeThrough( + new EbmlStreamDecoder({ + streamStartOffset: byteStart, + collectChild: (child) => child.id !== EbmlTagIdEnum.Cluster, + }) + ) + .pipeTo( + new WritableStream({ + write: (tag) => subscriber.next(tag), + close: () => { + if (!requestCompleted) { + subscriber.complete(); + } + }, + }) + ) + .catch((error) => { + if (requestCompleted && error?.name === 'AbortError') { + return; + } + subscriber.error(error); + }); + + return () => { + requestCompleted = true; + controller.abort(); + }; + }).pipe( + share({ + resetOnComplete: false, + resetOnError: false, + resetOnRefCountZero: true, + }) + ); + + const ebml$ = defer(() => + requestCompleted ? EMPTY : originRequest$ + ).pipe( + share({ + resetOnError: false, + resetOnComplete: true, + resetOnRefCountZero: true, + }) + ); + + return of({ + ebml$, + totalSize, + response, + body, + controller, + }); + }) + ); +} +export function createEbmlController(src: string) { + const request$ = createRangedEbmlStream(src, 0); + + const controller$ = request$.pipe( + map(({ totalSize, ebml$, response, controller }) => { + const head$ = ebml$.pipe( + filter(isTagIdPos(EbmlTagIdEnum.EBML, EbmlTagPosition.End)), + take(1), + shareReplay(1) + ); + + console.debug( + `stream of video "${src}" created, total size is ${totalSize ?? 'unknown'}` + ); + + const segmentStart$ = ebml$.pipe( + filter((s) => s.position === EbmlTagPosition.Start), + filter((tag) => tag.id === EbmlTagIdEnum.Segment) + ); + + const segments$ = segmentStart$.pipe( + map((startTag) => { + const segment = new EbmlSegment(startTag); + + const continuousReusedCluster$ = ebml$.pipe( + filter(isTagIdPos(EbmlTagIdEnum.Cluster, EbmlTagPosition.End)), + filter((s) => s.id === EbmlTagIdEnum.Cluster), + map(Cluster.fromTag.bind(Cluster)) + ); + + const segmentEnd$ = ebml$.pipe( + filter(isTagIdPos(EbmlTagIdEnum.Segment, EbmlTagPosition.End)), + filter((tag) => tag.id === EbmlTagIdEnum.Segment), + take(1) + ); + + const clusterStart$ = ebml$.pipe( + filter(isTagIdPos(EbmlTagIdEnum.Cluster, EbmlTagPosition.Start)), + take(1), + shareReplay(1) + ); + + const meta$ = ebml$.pipe( + takeUntil(clusterStart$.pipe(raceWith(segmentEnd$))), + share({ + resetOnComplete: false, + resetOnError: false, + resetOnRefCountZero: true, + }) + ); + + const withMeta$ = meta$.pipe( + reduce((segment, meta) => { + segment.scanMeta(meta); + return segment; + }, segment), + map((segment) => { + segment.markMetaEnd(); + return segment; + }), + take(1), + shareReplay(1) + ); + + const withRemoteCues$ = withMeta$.pipe( + switchMap((s) => { + if (s.cuesNode) { + return EMPTY; + } + const cuesStartOffset = + s.dataOffset + + (s.findSeekPositionBySeekId(SEEK_ID_KAX_CUES) ?? Number.NaN); + if (cuesStartOffset >= 0) { + return createRangedEbmlStream(src, cuesStartOffset).pipe( + switchMap((req) => req.ebml$), + filter(isTagIdPos(EbmlTagIdEnum.Cues, EbmlTagPosition.End)), + withLatestFrom(withMeta$), + map(([cues, withMeta]) => { + withMeta.cuesNode = cues; + return withMeta; + }) + ); + } + return EMPTY; + }), + take(1), + shareReplay(1) + ); + + const withLocalCues$ = withMeta$.pipe( + switchMap((s) => { + if (s.cuesNode) { + return of(s); + } + return EMPTY; + }), + shareReplay(1) + ); + + const withCues$ = merge(withLocalCues$, withRemoteCues$).pipe( + take(1) + ); + + const withoutCues$ = withCues$.pipe( + isEmpty(), + switchMap((empty) => (empty ? withMeta$ : EMPTY)) + ); + + const seekWithoutCues = (seekTime: number): Observable => { + const cluster$ = continuousReusedCluster$.pipe( + isEmpty(), + switchMap((empty) => { + return empty + ? clusterStart$.pipe( + switchMap((startTag) => + createRangedEbmlStream(src, startTag.startOffset) + ), + switchMap((req) => req.ebml$), + filter( + isTagIdPos(EbmlTagIdEnum.Cluster, EbmlTagPosition.End) + ), + map(Cluster.fromTag.bind(Cluster)) + ) + : continuousReusedCluster$; + }) + ); + if (seekTime === 0) { + return cluster$; + } + + return cluster$.pipe( + scan( + (prev, curr) => + [prev?.[1], curr] as [ + Cluster | undefined, + Cluster | undefined, + ], + [undefined, undefined] as [ + Cluster | undefined, + Cluster | undefined, + ] + ), + filter((c) => c[1]?.timestamp! > seekTime), + map((c) => c[0] ?? c[1]!) + ); + }; + + const seekWithCues = ( + cues: Cues, + seekTime: number + ): Observable => { + if (seekTime === 0) { + return seekWithoutCues(seekTime); + } + + const cuePoint = cues.findClosestCue(seekTime); + + if (!cuePoint) { + return seekWithoutCues(seekTime); + } + + return createRangedEbmlStream( + src, + cuePoint.position + segment.dataOffset + ).pipe( + switchMap((req) => req.ebml$), + filter(isTagIdPos(EbmlTagIdEnum.Cluster, EbmlTagPosition.End)), + map(Cluster.fromTag.bind(Cluster)) + ); + }; + + const seek = (seekTime: number): Observable => { + if (seekTime === 0) { + const subscripton = merge(withCues$, withoutCues$).subscribe(); + + // if seekTime equals to 0 at start, reuse the initialize stream + return seekWithoutCues(seekTime).pipe( + finalize(() => { + subscripton.unsubscribe(); + }) + ); + } + return merge( + withCues$.pipe( + switchMap((s) => + seekWithCues(Cues.fromTag(s.cuesNode!), seekTime) + ) + ), + withoutCues$.pipe(switchMap((_) => seekWithoutCues(seekTime))) + ); + }; + + return { + startTag, + head$, + segment, + meta$, + withMeta$, + withCues$, + withoutCues$, + seekWithCues, + seekWithoutCues, + seek, + }; + }) + ); + + return { + segments$, + head$, + totalSize, + ebml$, + controller, + response, + }; + }) + ); + + return { + controller$, + request$, + }; +} diff --git a/apps/playground/src/media/mkv/util.ts b/apps/playground/src/media/mkv/util.ts index aebf5d6..6a53acf 100644 --- a/apps/playground/src/media/mkv/util.ts +++ b/apps/playground/src/media/mkv/util.ts @@ -1,5 +1,150 @@ -import { EbmlTagPosition, type EbmlTagType } from 'konoebml'; +import type { Type } from 'arktype'; +import type { EbmlMasterTagType, EbmlTagIdEnum, EbmlTagType } from 'konoebml'; -export function isTagEnd(tag: EbmlTagType): boolean { - return tag.position === EbmlTagPosition.End; +export type InferType = T extends Type ? U : never; + +export interface TagWithArktypeOptions< + I extends EbmlTagType['id'], + S extends Type, +> { + id: I; + schema: S; + extract: (tag: Extract, schema: S) => InferType; +} + +export type TagWithArktypeClassInstance< + I extends EbmlTagType['id'], + S extends Type, +> = InferType & { + tag: Extract; +}; + +export interface TagWithArktypeClass< + I extends EbmlTagType['id'], + S extends Type, +> { + new ( + tag: Extract, + validatedTag: InferType + ): TagWithArktypeClassInstance; + + fromTag>( + this: new ( + tag: Extract, + validatedTag: InferType + ) => TagWithArktypeClassInstance, + tag: Extract + ): R; + + id: I; + schema: S; +} + +export function TagWithArktype< + I extends EbmlTagType['id'], + S extends Type, +>({ + id, + schema, + extract, +}: TagWithArktypeOptions): TagWithArktypeClass { + const tagWithArktypeImpl = class TagWithArktypeImpl { + static id = id; + static schema = schema; + + tag: Extract; + + constructor( + tag: Extract, + validatedTag: InferType + ) { + Object.assign(this, validatedTag); + this.tag = tag; + } + + static fromTag(tag: Extract) { + const extractedData = extract(tag, schema); + const validatedExtractedData = schema(extractedData); + // biome-ignore lint/complexity/noThisInStatic: + return new this(tag, validatedExtractedData); + } + }; + + return tagWithArktypeImpl as unknown as TagWithArktypeClass; +} + +export type PredicateIdExtract = Extract; + +export type PredicatePositionExtract< + T extends { position: string }, + P, +> = P extends T['position'] ? T : never; + +export function isTagIdPos< + I extends EbmlTagIdEnum, + P extends PredicateIdExtract['position'] | '*' = '*', +>(id: I, pos?: P) { + return (tag: EbmlTagType): tag is PredicateIdExtract => + tag.id === id && (pos === '*' || pos === tag.position); +} + +export function isTagPos< + T extends { position: string }, + P extends T['position'], +>(pos: P | '*' = '*') { + return (tag: T): tag is PredicatePositionExtract => + pos === '*' || pos === tag.position; +} +export type MasterChildExtractMap = { + [id in EbmlTagIdEnum]?: K extends keyof T + ? + | { + key: K; + multi: true; + extract: ( + tag: Extract + ) => T[K] extends Array ? U : never; + } + | { + key: K; + multi?: false; + extract: (tag: Extract) => T[K]; + } + : never; +}; + +export function simpleMasterExtractor< + T extends EbmlMasterTagType, + S extends Type, + EM extends MasterChildExtractMap, keyof InferType>, +>(map: EM) { + // biome-ignore lint/complexity/noExcessiveCognitiveComplexity: + return (tag: T, _schema: S): InferType => { + if (!tag?.children?.length) { + return {} as unknown as InferType; + } + const value = {} as Record; + for (const c of tag.children) { + const entry = ( + map as unknown as Record< + string, + { id: number; multi: boolean; extract: (tag: any) => any } + > + )[c.id as number] as any; + if (entry?.key) { + const key = entry.key; + const item = entry.extract ? entry.extract(c) : c.data; + if (entry.multi) { + if (value[key]) { + value[key].push(item); + } else { + value[key] = [item]; + } + } else { + value[key] = item; + } + } + } + return value as unknown as InferType; + }; } diff --git a/apps/playground/src/media/shared/index.ts b/apps/playground/src/media/shared/index.ts deleted file mode 100644 index 29469df..0000000 --- a/apps/playground/src/media/shared/index.ts +++ /dev/null @@ -1 +0,0 @@ -export { createRangedVideoStream, type RangedVideoStream } from './fetch'; diff --git a/apps/playground/src/utils/types.ts b/apps/playground/src/utils/types.ts deleted file mode 100644 index e69de29..0000000 diff --git a/apps/playground/src/video-pipeline-demo.ts b/apps/playground/src/video-pipeline-demo.ts index 0209649..e6e2bb5 100644 --- a/apps/playground/src/video-pipeline-demo.ts +++ b/apps/playground/src/video-pipeline-demo.ts @@ -1,99 +1,7 @@ import { html, css, LitElement } from 'lit'; import { property } from 'lit/decorators.js'; -import { - EbmlStreamDecoder, - EbmlTagIdEnum, - EbmlTagPosition, - type EbmlTagType, -} from 'konoebml'; -import { - EMPTY, - filter, - from, - isEmpty, - map, - merge, - mergeMap, - Observable, - of, - reduce, - scan, - share, - Subject, - type Subscription, - switchMap, - take, - takeUntil, - withLatestFrom, -} from 'rxjs'; -import { createRangedVideoStream } from './media/shared'; -import { - EbmlCluster, - EbmlCues, - EbmlSegment, - SEEK_ID_KAX_CUES, -} from './media/mkv/model'; -import { isTagEnd } from './media/mkv/util'; - -export function createRangedEbmlStream( - url: string, - byteStart = 0, - byteEnd?: number -): Observable<{ - ebml$: Observable; - totalSize?: number; - response: Response; - stream: ReadableStream; - controller: AbortController; -}> { - const stream$ = from(createRangedVideoStream(url, byteStart, byteEnd)); - - return stream$.pipe( - mergeMap(({ controller, stream, totalSize, response }) => { - const ebml$ = new Observable((subscriber) => { - stream - .pipeThrough( - new EbmlStreamDecoder({ - streamStartOffset: byteStart, - collectChild: (child) => child.id !== EbmlTagIdEnum.Cluster, - }) - ) - .pipeTo( - new WritableStream({ - write: (tag) => { - subscriber.next(tag); - }, - close: () => { - subscriber.complete(); - }, - abort: (err: any) => { - subscriber.error(err); - }, - }) - ); - - return () => { - controller.abort(); - }; - }).pipe( - share({ - connector: () => new Subject(), - resetOnComplete: false, - resetOnError: false, - resetOnRefCountZero: false, - }) - ); - - return of({ - ebml$, - totalSize, - response, - stream, - controller, - }); - }) - ); -} +import { type Subscription, switchMap, take } from 'rxjs'; +import { createEbmlController } from './media/mkv/reactive'; export class VideoPipelineDemo extends LitElement { @property() @@ -108,182 +16,21 @@ export class VideoPipelineDemo extends LitElement { return; } - const ebmlRequest$ = createRangedEbmlStream(this.src, 0); + const { controller$ } = createEbmlController(this.src); - const ebmlInit$ = ebmlRequest$.pipe( - map(({ totalSize, ebml$, response, controller }) => { - const head = 1; - console.debug( - `stream of video "${this.src}" created, total size is ${totalSize ?? 'unknown'}` - ); - - const segmentStart$ = ebml$.pipe( - filter((s) => s.position === EbmlTagPosition.Start), - filter((tag) => tag.id === EbmlTagIdEnum.Segment) - ); - - const segmentEnd$ = ebml$.pipe( - filter( - (tag) => - tag.id === EbmlTagIdEnum.Segment && - tag.position === EbmlTagPosition.End - ) - ); - - const segments$ = segmentStart$.pipe( - map((startTag) => { - const segment = new EbmlSegment(startTag); - const tag$ = ebml$.pipe(takeUntil(segmentEnd$)); - const cluster$ = tag$.pipe( - filter(isTagEnd), - filter((tag) => tag.id === EbmlTagIdEnum.Cluster), - map((tag) => new EbmlCluster(tag)) - ); - const meta$ = tag$.pipe(takeUntil(cluster$)); - - const withMeta$ = meta$.pipe( - reduce((segment, meta) => { - segment.scanMeta(meta); - return segment; - }, segment), - map((segment) => { - segment.markMetaEnd(); - return segment; - }) - ); - - const withRemoteCues$ = withMeta$.pipe( - map((s) => - s.cuesNode - ? Number.NaN - : s.dataOffset + - (s.findSeekPositionBySeekId(SEEK_ID_KAX_CUES) ?? Number.NaN) - ), - filter((cuesStartOffset) => cuesStartOffset >= 0), - switchMap((cuesStartOffset) => - createRangedEbmlStream(this.src, cuesStartOffset).pipe( - switchMap((req) => req.ebml$) - ) - ), - filter(isTagEnd), - filter((tag) => tag?.id === EbmlTagIdEnum.Cues), - take(1), - withLatestFrom(withMeta$), - map(([cues, withMeta]) => { - withMeta.cuesNode = cues; - return withMeta; - }), - share() - ); - - const withLocalCues$ = withMeta$.pipe(filter((s) => !!s.cuesNode)); - - const withCues$ = merge(withRemoteCues$, withLocalCues$); - - const withoutCues$ = withCues$.pipe( - isEmpty(), - switchMap((empty) => (empty ? withMeta$ : EMPTY)) - ); - - const seekWithoutCues = ( - cluster$: Observable, - seekTime: number - ): Observable => { - if (seekTime === 0) { - return cluster$; - } - - return cluster$.pipe( - scan( - (prev, curr) => - [prev?.[1], curr] as [ - EbmlCluster | undefined, - EbmlCluster | undefined, - ], - [undefined, undefined] as [ - EbmlCluster | undefined, - EbmlCluster | undefined, - ] - ), - filter((c) => c[1]?.timestamp! > seekTime), - map((c) => c[0] ?? c[1]!) - ); - }; - - const seekWithCues = ( - cues: EbmlCues, - cluster$: Observable, - seekTime: number - ): Observable => { - if (seekTime === 0) { - return cluster$; - } - - const cuePoint = cues.findClosestCue(seekTime); - - if (!cuePoint) { - return seekWithoutCues(cluster$, seekTime); - } - - return createRangedEbmlStream( - this.src, - cuePoint.position + segment.dataOffset - ).pipe( - switchMap((req) => req.ebml$), - filter(isTagEnd), - filter((tag) => tag.id === EbmlTagIdEnum.Cluster), - map((c) => new EbmlCluster(c)) - ); - }; - - const seek = (seekTime: number): Observable => { - return merge( - withCues$.pipe( - switchMap((s) => - seekWithCues(new EbmlCues(s.cuesNode!), cluster$, seekTime) - ) - ), - withoutCues$.pipe( - switchMap((_) => seekWithoutCues(cluster$, seekTime)) - ) - ); - }; - - return { - startTag, - head, - segment, - tag$, - meta$, - cluster$, - withMeta$, - withCues$, - withoutCues$, - seekWithCues, - seekWithoutCues, - seek, - }; - }) - ); - - return { - segments$, - head, - totalSize, - ebml$, - controller, - response, - }; - }) - ); - - this.subscripton = ebmlInit$ + this.subscripton = controller$ .pipe( - switchMap(({ segments$ }) => segments$), - take(1), - switchMap(({ seek }) => seek(2000)) + switchMap(({ segments$ }) => segments$.pipe(take(1))), + switchMap(({ seek }) => seek(0)) ) - .subscribe(console.log); + .subscribe((cluster) => console.log(cluster)); + + const videoDecoder = new VideoDecoder({ + output: (frame) => {}, + error: (e) => { + e; + }, + }); } connectedCallback(): void { @@ -293,6 +40,7 @@ export class VideoPipelineDemo extends LitElement { disconnectedCallback(): void { super.disconnectedCallback(); + this.subscripton?.unsubscribe(); } render() { diff --git a/apps/playground/tsconfig.json b/apps/playground/tsconfig.json index 8d8a430..c37b5e6 100644 --- a/apps/playground/tsconfig.json +++ b/apps/playground/tsconfig.json @@ -7,7 +7,12 @@ "experimentalDecorators": true, "module": "ESNext", "moduleResolution": "bundler", - "useDefineForClassFields": false + "useDefineForClassFields": false, + "paths": { + "@/*": [ + "./src/*" + ] + } }, "include": [ "src" diff --git a/apps/proxy/.whistle/rules/files/0.konoplayer b/apps/proxy/.whistle/rules/files/0.konoplayer index 1fdb25a..7564a08 100644 --- a/apps/proxy/.whistle/rules/files/0.konoplayer +++ b/apps/proxy/.whistle/rules/files/0.konoplayer @@ -5,6 +5,7 @@ } ``` +# ^https://konoplayer.com/api/static/*** resSpeed://1024K ^https://konoplayer.com/api*** reqHeaders://{x-forwarded.json} http://127.0.0.1:5001/api$1 ^https://konoplayer.com/*** reqHeaders://{x-forwarded.json} http://127.0.0.1:5000/$1 excludeFilter://^https://konoplayer.com/api ^wss://konoplayer.com/*** reqHeaders://{x-forwarded.json} ws://127.0.0.1:5000/$1 excludeFilter://^wss://konoplayer.com/api \ No newline at end of file diff --git a/biome.jsonc b/biome.jsonc index 4652af4..b2553b5 100644 --- a/biome.jsonc +++ b/biome.jsonc @@ -3,11 +3,6 @@ "extends": [ "ultracite" ], - "javascript": { - "globals": [ - "Liveblocks" - ] - }, "linter": { "rules": { "style": { @@ -26,12 +21,6 @@ }, "files": { "ignore": [ - "packages/design-system/components/ui/**", - "packages/design-system/lib/**", - "packages/design-system/hooks/**", - "packages/collaboration/config.ts", - "apps/docs/**/*.json", - "apps/email/.react-email/**", ".vscode/*.json" ] }, diff --git a/package.json b/package.json index df36e2b..b491de8 100644 --- a/package.json +++ b/package.json @@ -19,8 +19,10 @@ }, "dependencies": { "@types/lodash-es": "^4.17.12", + "arktype": "^2.1.10", "lodash-es": "^4.17.21", "mnemonist": "^0.40.3", - "rxjs": "^7.8.2" + "rxjs": "^7.8.2", + "type-fest": "^4.37.0" } } \ No newline at end of file diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index a96fd04..089e7f8 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -11,6 +11,9 @@ importers: '@types/lodash-es': specifier: ^4.17.12 version: 4.17.12 + arktype: + specifier: ^2.1.10 + version: 2.1.10 lodash-es: specifier: ^4.17.21 version: 4.17.21 @@ -20,6 +23,9 @@ importers: rxjs: specifier: ^7.8.2 version: 7.8.2 + type-fest: + specifier: ^4.37.0 + version: 4.37.0 devDependencies: '@biomejs/biome': specifier: 1.9.4 @@ -74,8 +80,8 @@ importers: apps/playground: dependencies: konoebml: - specifier: 0.1.0-rc.6 - version: 0.1.0-rc.6 + specifier: 0.1.0-rc.8 + version: 0.1.0-rc.8 lit: specifier: ^3.2.1 version: 3.2.1 @@ -129,6 +135,12 @@ packages: resolution: {integrity: sha512-cGGqUGqBXIGJkeL65l70y0BflDAu/0Zi/ohbYat3hvadFfumRJnVElVfJ59JtWO7FfKQjxcwCVTyuQ/tevX/9A==} engines: {node: ^18.19.1 || ^20.11.1 || >=22.0.0, npm: ^6.11.0 || ^7.5.6 || >=8.0.0, yarn: '>= 1.13.0'} + '@ark/schema@0.45.0': + resolution: {integrity: sha512-3XlMWkZbEjh0YsF92vnnRNCWNRNhRKDTf6XhugyCXH0YRFuM+w1vFLDbB2JLfZloEd7i5cbqsLaDLzyBZbPrSg==} + + '@ark/util@0.45.0': + resolution: {integrity: sha512-Z1gHEGbpPzLtPmYb932t2B++6YonlUi1Fa14IQ4vhsGMWhd81Mi1miUmdZXW4fNI/wg1saT7H2/5cAuONgTXhg==} + '@babel/code-frame@7.26.2': resolution: {integrity: sha512-RJlIHRueQgwWitWgF8OdFYGZX328Ax5BCemNGlqHfplnRT9ESi8JkFlvaVYbS+UubVY6dpv87Fs2u5M29iNFVQ==} engines: {node: '>=6.9.0'} @@ -1095,6 +1107,9 @@ packages: argparse@2.0.1: resolution: {integrity: sha512-8+9WqebbFzpX9OR+Wa6O29asIogeRMzcGtAINdpMHHyAg10f05aSFVBbcEqGf/PXw1EjAZ+q2/bEBg3DvurK3Q==} + arktype@2.1.10: + resolution: {integrity: sha512-KqbrzI9qIGrQUClifyS1HpUp/oTSRtGDvnMKzwg2TAvxRpynY1mn/ubXaxAAdGPOM8V3pBqwb01Z6TcXqhBxzQ==} + array-flatten@1.1.1: resolution: {integrity: sha512-PCVAQswWemu6UdxsDFFX/+gVeYqKAod3D3UVm91jHwynguOwAvYPhx8nNlM++NqRcK6CxxpUafjmhIdKiHibqg==} @@ -1838,8 +1853,8 @@ packages: resolution: {integrity: sha512-dcS1ul+9tmeD95T+x28/ehLgd9mENa3LsvDTtzm3vyBEO7RPptvAD+t44WVXaUjTBRcrpFeFlC8WCruUR456hw==} engines: {node: '>=0.10.0'} - konoebml@0.1.0-rc.6: - resolution: {integrity: sha512-PM81HV0OVG0eGo+195azreUtp08FO9vLLrIvrUHbsSQHL/wq7TySU/6AaLm31e3ABKsJ65gpsQr6lcJ5xd8y3Q==} + konoebml@0.1.0-rc.8: + resolution: {integrity: sha512-fR4DZqCskLKxGBMc58gpOOzajFrfu9hQC7WZd8yGiIxLVhDkzBnihXqlsWJU6Qw77ukMOGGkbeM2uqQyv5dO3w==} engines: {node: '>= 18.0.0'} lines-and-columns@1.2.4: @@ -2750,6 +2765,12 @@ snapshots: transitivePeerDependencies: - chokidar + '@ark/schema@0.45.0': + dependencies: + '@ark/util': 0.45.0 + + '@ark/util@0.45.0': {} + '@babel/code-frame@7.26.2': dependencies: '@babel/helper-validator-identifier': 7.25.9 @@ -3628,6 +3649,11 @@ snapshots: argparse@2.0.1: {} + arktype@2.1.10: + dependencies: + '@ark/schema': 0.45.0 + '@ark/util': 0.45.0 + array-flatten@1.1.1: {} array-timsort@1.0.3: {} @@ -4436,7 +4462,7 @@ snapshots: kind-of@6.0.3: {} - konoebml@0.1.0-rc.6: + konoebml@0.1.0-rc.8: dependencies: mnemonist: 0.40.3 type-fest: 4.37.0