refactor: rewrite playground

This commit is contained in: master
2025-03-27 02:54:04 +08:00
parent 3c317627e7
commit 39e17eb6a5
19 changed files with 1161 additions and 1067 deletions

View File

@ -4,28 +4,30 @@ import {
animationFrames,
BehaviorSubject,
combineLatest,
ReplaySubject,
EMPTY,
map,
Observable,
shareReplay,
Subject,
Subscription,
switchMap,
take,
tap,
distinctUntilChanged,
fromEvent, withLatestFrom, share, delay, delayWhen, from, of,
fromEvent,
share,
takeUntil,
firstValueFrom,
} from 'rxjs';
import { createEbmlController } from '@konoplayer/matroska/reactive';
import {
TrackTypeRestrictionEnum,
type ClusterType,
} from '@konoplayer/matroska/schema';
import { createMatroska } from '@konoplayer/matroska/model';
import { createRef, ref, type Ref } from 'lit/directives/ref.js';
import { Queue } from 'mnemonist';
import type {SegmentComponent, AudioTrackContext, VideoTrackContext} from "@konoplayer/matroska/systems";
import type {
AudioTrackContext,
VideoTrackContext,
} from '@konoplayer/matroska/systems';
import {
captureCanvasAsVideoSrcObject,
createRenderingContext,
renderBitmapAtRenderingContext,
} from '@konoplayer/core/graphics';
export class VideoPipelineDemo extends LitElement {
static styles = css``;
@ -39,227 +41,116 @@ export class VideoPipelineDemo extends LitElement {
@property({ type: Number })
height = 720;
canvasRef: Ref<HTMLCanvasElement> = createRef();
destroyRef$ = new Subject<void>();
videoRef: Ref<HTMLVideoElement> = createRef();
renderingContext = createRenderingContext();
audioContext = new AudioContext();
canvasSource = new MediaSource();
seek$ = new ReplaySubject<number>(1);
seeked$ = new Subject<number>();
cluster$ = new Subject<SegmentComponent<ClusterType>>();
videoFrameBuffer$ = new BehaviorSubject(new Queue<VideoFrame>());
audioFrameBuffer$ = new BehaviorSubject(new Queue<AudioData>());
pipeline$$?: Subscription;
private startTime = 0;
paused$ = new BehaviorSubject<boolean>(false);
ended$ = new BehaviorSubject<boolean>(false);
private preparePipeline() {
currentTime$ = new BehaviorSubject<number>(0);
duration$ = new BehaviorSubject<number>(0);
frameRate$ = new BehaviorSubject<number>(30);
videoTrack$ = new BehaviorSubject<VideoTrackContext | undefined>(undefined);
audioTrack$ = new BehaviorSubject<AudioTrackContext | undefined>(undefined);
private async preparePipeline() {
const src = this.src;
const destroyRef$ = this.destroyRef$;
if (!src) {
return;
}
const { controller$ } = createEbmlController({
url: src,
});
const segmentContext$ = controller$.pipe(
switchMap(({ segments$ }) => segments$.pipe(take(1)))
const {
segment: {
seek,
defaultVideoTrack$,
defaultAudioTrack$,
videoTrackDecoder,
audioTrackDecoder,
},
} = await firstValueFrom(
createMatroska({
url: src,
})
);
const videoTrack$ = segmentContext$.pipe(
)
const currentCluster$ = combineLatest({
seekTime: this.seek$,
segmentContext: segmentContext$,
}).pipe(
delayWhen(({ segmentContext: { segment } }) => from(segment.track.flushContexts())),
switchMap(({ seekTime, segmentContext }) => combineLatest({
segmentContext: of(segmentContext),
cluster: segmentContext.seek(seekTime),
})),
const currentCluster$ = this.seeked$.pipe(
switchMap((seekTime) => seek(seekTime)),
share()
);
const decodeVideo$ = currentCluster$.pipe(
defaultVideoTrack$
.pipe(takeUntil(destroyRef$), take(1))
.subscribe(this.videoTrack$);
)
defaultAudioTrack$
.pipe(takeUntil(destroyRef$), take(1))
.subscribe(this.audioTrack$);
const decode$ = segmentContext$.pipe(
switchMap(({ withMeta$ }) => withMeta$),
map((segment) => {
const trackSystem = segment.track;
const infoSystem = segment.info;
const videoTrack = trackSystem.getTrackContext<VideoTrackContext>({
predicate: (c) =>
c.TrackType === TrackTypeRestrictionEnum.VIDEO &&
c.FlagEnabled !== 0,
});
const audioTrack = trackSystem.getTrackContext({
predicate: (c) =>
c.TrackType === TrackTypeRestrictionEnum.AUDIO &&
c.FlagEnabled !== 0,
});
const videoDecode$ = track
const videoDecode$ = tracks.video
? new Observable<VideoFrame>((subscriber) => {
let isFinalized = false;
const videoTrack = tracks.video!;
const decoder = new VideoDecoder({
output: (frame) => {
subscriber.next(frame);
},
error: (e) => {
if (!isFinalized) {
isFinalized = true;
subscriber.error(e);
}
},
});
decoder.configure({
codec: 'hev1.2.2.L93.B0', // codec: 'vp8',
hardwareAcceleration: 'prefer-hardware',
description: videoTrack.CodecPrivate, // Uint8Array containing VPS/SPS/PPS
});
const sub = this.cluster$.subscribe((c) => {
if (!isFinalized) {
for (const b of (c.SimpleBlock || []).filter(
(b) => b.track === videoTrack.TrackNumber
)) {
const chunk = new EncodedVideoChunk({
type: b.keyframe ? 'key' : 'delta',
timestamp:
((infoSystem.info.TimestampScale as number) / 1000) *
((c.Timestamp as number) + b.value),
data: b.payload,
});
decoder.decode(chunk);
}
}
});
return () => {
if (!isFinalized) {
isFinalized = true;
decoder.close();
}
sub.unsubscribe();
};
})
: EMPTY;
const audioDecode$ = tracks.audio
? new Observable<AudioData>((subscriber) => {
let isFinalized = false;
const decoder = new AudioDecoder({
output: (audioData) => {
subscriber.next(audioData);
},
error: (e) => {
if (!isFinalized) {
isFinalized = true;
subscriber.error(e);
}
},
});
const audioTrack = tracks.audio!;
const sampleRate = audioTrack.Audio?.SamplingFrequency || 44100;
const codec = 'mp4a.40.2';
const numberOfChannels =
(audioTrack.Audio?.Channels as number) || 2;
const duration =
Math.round(Number(audioTrack.DefaultDuration) / 1000) ||
Math.round((1024 / sampleRate) * 1000000);
decoder.configure({
codec: codec,
description: audioTrack.CodecPrivate,
numberOfChannels,
sampleRate,
});
// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: <explanation>
const sub = this.cluster$.subscribe((c) => {
if (!isFinalized) {
for (const b of (c.SimpleBlock || []).filter(
(b) => b.track === audioTrack.TrackNumber
)) {
const blockTime = (c.Timestamp as number) + b.value;
let n = 0;
for (const f of b.frames) {
const offsetTimeUs = (n + 1) * duration;
decoder.decode(
new EncodedAudioChunk({
type: b.keyframe ? 'key' : 'delta',
timestamp:
((infoSystem.info.TimestampScale as number) /
1000) *
blockTime +
offsetTimeUs,
data: f,
})
);
n += 1;
}
}
}
});
return () => {
if (!isFinalized) {
isFinalized = true;
}
sub.unsubscribe();
};
})
: EMPTY;
return {
video$: videoDecode$,
audio$: audioDecode$,
};
}),
shareReplay(1)
);
const addToVideoFrameBuffer$ = decode$.pipe(
switchMap((decode) => decode.video$),
tap((frame) => {
const buffer = this.videoFrameBuffer$.getValue();
this.videoTrack$
.pipe(
takeUntil(this.destroyRef$),
map((track) =>
track ? videoTrackDecoder(track, currentCluster$) : undefined
),
switchMap((decoder) => {
if (!decoder) {
return EMPTY;
}
return decoder.frame$;
})
)
.subscribe((frame) => {
const buffer = this.videoFrameBuffer$.value;
buffer.enqueue(frame);
this.videoFrameBuffer$.next(buffer);
})
);
});
const addToAudioFrameBuffer$ = decode$.pipe(
switchMap((decode) => decode.audio$),
tap((frame) => {
const buffer = this.audioFrameBuffer$.getValue();
this.audioTrack$
.pipe(
takeUntil(this.destroyRef$),
map((track) =>
track ? audioTrackDecoder(track, currentCluster$) : undefined
),
switchMap((decoder) => {
if (!decoder) {
return EMPTY;
}
return decoder.frame$;
})
)
.subscribe((frame) => {
const buffer = this.audioFrameBuffer$.value;
buffer.enqueue(frame);
this.audioFrameBuffer$.next(buffer);
})
);
});
const audio$ = combineLatest({
combineLatest({
paused: this.paused$,
ended: this.ended$,
buffered: this.audioFrameBuffer$.pipe(
map((q) => q.size >= 1),
distinctUntilChanged()
),
}).pipe(
map(({ ended, paused, buffered }) => !paused && !ended && !!buffered),
switchMap((enabled) => (enabled ? animationFrames() : EMPTY)),
// biome-ignore lint/complexity/noExcessiveCognitiveComplexity: <explanation>
tap(() => {
})
.pipe(
takeUntil(this.destroyRef$),
map(({ ended, paused, buffered }) => !paused && !ended && !!buffered),
switchMap((enabled) => (enabled ? animationFrames() : EMPTY))
)
.subscribe(() => {
const audioFrameBuffer = this.audioFrameBuffer$.getValue();
const nowTime = performance.now();
const accTime = nowTime - this.startTime;
@ -315,20 +206,22 @@ export class VideoPipelineDemo extends LitElement {
if (audioChanged) {
this.audioFrameBuffer$.next(this.audioFrameBuffer$.getValue());
}
})
);
});
const video$ = combineLatest({
combineLatest({
paused: this.paused$,
ended: this.ended$,
buffered: this.videoFrameBuffer$.pipe(
map((q) => q.size >= 1),
distinctUntilChanged()
),
}).pipe(
map(({ ended, paused, buffered }) => !paused && !ended && !!buffered),
switchMap((enabled) => (enabled ? animationFrames() : EMPTY)),
tap(() => {
})
.pipe(
takeUntil(this.destroyRef$),
map(({ ended, paused, buffered }) => !paused && !ended && !!buffered),
switchMap((enabled) => (enabled ? animationFrames() : EMPTY))
)
.subscribe(async () => {
const videoFrameBuffer = this.videoFrameBuffer$.getValue();
let videoChanged = false;
const nowTime = performance.now();
@ -337,16 +230,10 @@ export class VideoPipelineDemo extends LitElement {
const firstVideo = videoFrameBuffer.peek();
if (firstVideo && firstVideo.timestamp <= accTime * 1000) {
const videoFrame = videoFrameBuffer.dequeue()!;
const canvas = this.canvasRef.value;
const canvas2dContext = canvas?.getContext('2d');
if (canvas2dContext) {
canvas2dContext.drawImage(
videoFrame,
0,
0,
this.width,
this.height
);
const renderingContext = this.renderingContext;
if (renderingContext) {
const bitmap = await createImageBitmap(videoFrame);
renderBitmapAtRenderingContext(renderingContext, bitmap);
videoFrame.close();
videoChanged = true;
}
@ -357,49 +244,67 @@ export class VideoPipelineDemo extends LitElement {
if (videoChanged) {
this.videoFrameBuffer$.next(videoFrameBuffer);
}
})
);
});
this.pipeline$$ = new Subscription();
this.pipeline$$.add(audio$.subscribe());
this.pipeline$$.add(video$.subscribe());
this.pipeline$$.add(addToVideoFrameBuffer$.subscribe());
this.pipeline$$.add(addToAudioFrameBuffer$.subscribe());
this.pipeline$$.add(currentCluster$.subscribe(this.cluster$));
this.pipeline$$.add(
fromEvent(document.body, 'click').subscribe(() => {
fromEvent(document.body, 'click')
.pipe(takeUntil(this.destroyRef$))
.subscribe(() => {
this.audioContext.resume();
this.audioFrameBuffer$.next(this.audioFrameBuffer$.getValue());
})
);
});
}
connectedCallback(): void {
super.connectedCallback();
this.preparePipeline();
this.seek(0);
}
disconnectedCallback(): void {
super.disconnectedCallback();
this.pipeline$$?.unsubscribe();
}
seek(seekTime: number) {
this.seek$.next(seekTime);
}
play() {
this.paused$.next(false);
}
pause() {
this.paused$.next(true);
this.destroyRef$.next();
}
render() {
return html`
<canvas ${ref(this.canvasRef)} width=${this.width} height=${this.height}></canvas>
<video ${ref(this.videoRef)}></video>
`;
}
firstUpdated() {
const video = this.videoRef.value;
const context = this.renderingContext;
const frameRate$ = this.frameRate$;
const destroyRef$ = this.destroyRef$;
const currentTime$ = this.currentTime$;
const duration$ = this.duration$;
const seeked$ = this.seeked$;
if (!video) {
return;
}
const canvas = context.canvas as HTMLCanvasElement;
Object.defineProperty(video, 'duration', {
get: () => duration$.value,
set: (val: number) => {
duration$.next(val);
},
configurable: true,
});
Object.defineProperty(video, 'currentTime', {
get: () => currentTime$.value,
set: (val: number) => {
currentTime$.next(val);
seeked$.next(val);
},
configurable: true,
});
frameRate$
.pipe(takeUntil(destroyRef$), distinctUntilChanged())
.subscribe((frameRate) =>
captureCanvasAsVideoSrcObject(video, canvas, frameRate)
);
}
}

View File

@ -17,6 +17,7 @@
"devDependencies": {
"@biomejs/biome": "1.9.4",
"@types/node": "^22.13.11",
"@webgpu/types": "^0.1.59",
"change-case": "^5.4.4",
"happy-dom": "^17.4.4",
"tsx": "^4.19.3",

View File

@ -0,0 +1,35 @@
import { Observable } from 'rxjs';
// biome-ignore lint/correctness/noUndeclaredVariables: <explanation>
export function createAudioDecodeStream(configuration: AudioDecoderConfig): {
decoder: AudioDecoder;
frame$: Observable<AudioData>;
} {
let decoder!: AudioDecoder;
const frame$ = new Observable<AudioData>((subscriber) => {
let isFinalized = false;
decoder = new AudioDecoder({
output: (frame) => subscriber.next(frame),
error: (e) => {
if (!isFinalized) {
isFinalized = true;
subscriber.error(e);
}
},
});
decoder.configure(configuration);
return () => {
if (!isFinalized) {
isFinalized = true;
decoder.close();
}
};
});
return {
decoder,
frame$,
};
}
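A minimal consumption sketch for the stream above, assuming a demuxer that yields EncodedAudioChunk values and an illustrative AAC-LC configuration. Since the AudioDecoder is only instantiated inside the Observable's setup, subscribe to frame$ before decoding:

import { createAudioDecodeStream } from '@konoplayer/core/audition';

const { decoder, frame$ } = createAudioDecodeStream({
  codec: 'mp4a.40.2', // assumed AAC-LC
  sampleRate: 48000,
  numberOfChannels: 2,
});

// subscribe first: the decoder is created in the Observable's setup function
const subscription = frame$.subscribe((audioData) => {
  // hand the decoded AudioData to an AudioContext-based renderer, then release it
  audioData.close();
});

declare const chunk: EncodedAudioChunk; // hypothetical demuxer output
decoder.decode(chunk);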

View File

@ -1,356 +0,0 @@
import {
BehaviorSubject,
distinctUntilChanged,
filter,
interval,
map,
merge, Observable,
Subject,
type Subscription, switchMap, takeUntil, tap,
} from 'rxjs';
import {NetworkState, ReadyState} from "./state.ts";
export interface Metadata {
duration: number
}
export abstract class VideoElementTrait {
private playbackTimer: Subscription | undefined;
_src$ = new BehaviorSubject<string>('');
_currentTime$ = new BehaviorSubject<number>(0);
_duration$ = new BehaviorSubject<number>(Number.NaN);
_paused$ = new BehaviorSubject<boolean>(true);
_ended$ = new BehaviorSubject<boolean>(false);
_volume$ = new BehaviorSubject<number>(1.0);
_muted$ = new BehaviorSubject<boolean>(false);
_playbackRate$ = new BehaviorSubject<number>(1.0);
_readyState$ = new BehaviorSubject<number>(0); // HAVE_NOTHING
_networkState$ = new BehaviorSubject<number>(0); // NETWORK_EMPTY
_width$ = new BehaviorSubject<number>(0);
_height$ = new BehaviorSubject<number>(0);
_videoWidth$ = new BehaviorSubject<number>(0); // read-only, intrinsic video width
_videoHeight$ = new BehaviorSubject<number>(0); // read-only, intrinsic video height
_poster$ = new BehaviorSubject<string>('');
_destroyRef$ = new Subject<void>();
_progress$ = new Subject<Event>();
_error$ = new Subject<Event>();
_abort$ = new Subject<Event>();
_emptied$ = new Subject<Event>();
_stalled$ = new Subject<Event>();
_loadeddata$ = new Subject<Event>();
_playing$ = new Subject<Event>();
_waiting$ = new Subject<Event>();
_seeked$ = new Subject<Event>();
_timeupdate$ = new Subject<Event>();
_play$ = new Subject<Event>();
_resize$ = new Subject<Event>();
_setCurrentTime$ = new Subject<number>();
_setSrc$ = new Subject<string>();
_callLoadMetadataStart$ = new Subject<void>();
_callLoadMetadataEnd$ = new Subject<Metadata>();
_callLoadDataStart$ = new Subject<Metadata>();
_callLoadDataEnd$ = new Subject<void>();
protected constructor() {
this._setCurrentTime$.pipe(
takeUntil(this._destroyRef$))
.subscribe(this._currentTime$)
this.seeking$.pipe(
takeUntil(this._destroyRef$),
switchMap(() => this._seek()),
map(() => new Event("seeked"))
).subscribe(this._seeked$)
this._setSrc$.pipe(
takeUntil(this._destroyRef$),
).subscribe(this._src$)
this._setSrc$.pipe(
takeUntil(this._destroyRef$),
switchMap(() => this._load())
).subscribe();
this._readyState$.pipe(
takeUntil(this._destroyRef$),
filter((r) => r === ReadyState.HAVE_NOTHING),
map(() => 0)
).subscribe(this._currentTime$);
this._readyState$.pipe(
takeUntil(this._destroyRef$),
filter((r) => r === ReadyState.HAVE_NOTHING),
map(() => true),
).subscribe(this._paused$);
this._readyState$.pipe(
takeUntil(this._destroyRef$),
filter((r) => r === ReadyState.HAVE_NOTHING),
map(() => false)
).subscribe(this._ended$)
this._callLoadMetadataStart$.pipe(
takeUntil(this._destroyRef$),
map(() => NetworkState.NETWORK_LOADING)
).subscribe(
this._networkState$
);
this._callLoadDataEnd$.pipe(
takeUntil(this._destroyRef$),
map(() => NetworkState.NETWORK_IDLE)
).subscribe(this._networkState$);
this._callLoadMetadataEnd$.pipe(
takeUntil(this._destroyRef$),
map(() => ReadyState.HAVE_METADATA)
).subscribe(this._readyState$)
this._callLoadMetadataEnd$.pipe(
takeUntil(this._destroyRef$),
map(meta => meta.duration)
).subscribe(this._duration$);
this._callLoadDataEnd$.pipe(
takeUntil(this._destroyRef$),
map(() => ReadyState.HAVE_CURRENT_DATA)
).subscribe(this._readyState$);
}
get canplay$ () {
return this._readyState$.pipe(
filter((s) => {
return s >= ReadyState.HAVE_CURRENT_DATA
}),
distinctUntilChanged(),
map(() => new Event('canplay')),
)
}
get canplaythrough$ () {
return this._readyState$.pipe(
filter((s) => s >= ReadyState.HAVE_ENOUGH_DATA),
distinctUntilChanged(),
map(() => new Event('canplaythrough')),
)
}
get seeked$ () {
return this._seeked$.asObservable();
}
get loadstart$() {
return this._readyState$.pipe(
filter((s) => s === ReadyState.HAVE_ENOUGH_DATA),
distinctUntilChanged(),
map(() => new Event('loadstart'))
)
}
get loadedmetadata$() {
return this._readyState$.pipe(
filter((r) => r >= ReadyState.HAVE_METADATA),
distinctUntilChanged(),
map(() => new Event('loadedmetadata'))
);
}
get pause$() {
return this._paused$.pipe(
distinctUntilChanged(),
filter(s => s),
map(() => new Event('pause'))
)
}
get volumechange$() {
return merge(
this._volume$,
this._muted$,
).pipe(
map(() => new Event('volumechange'))
)
}
get ratechange$() {
return this._playbackRate$.pipe(
map(() => new Event('ratechange'))
)
}
get durationchange$() {
return this._duration$.pipe(
map(() => new Event('durationchange'))
)
}
get ended$() {
return this._ended$.pipe(
distinctUntilChanged(),
filter(s => s),
map(() => new Event('ended'))
)
}
get seeking$() {
return this._setCurrentTime$.pipe(
map(() => new Event('seeking'))
)
}
// property getters/setters
get src(): string {
return this._src$.value;
}
set src(value: string) {
this._setSrc$.next(value);
}
get currentTime(): number {
return this._currentTime$.value;
}
set currentTime(value: number) {
if (value < 0 || value > this.duration) {
return
}
this._setCurrentTime$.next(
value
)
this._seeked$.next(new Event('seeked'));
this._timeupdate$.next(new Event('timeupdate'));
}
get duration(): number {
return this._duration$.value;
}
get paused(): boolean {
return this._paused$.value;
}
get ended(): boolean {
return this._ended$.value;
}
get volume(): number {
return this._volume$.value;
}
set volume(value: number) {
if (value < 0 || value > 1) {
return
}
this._volume$.next(value);
}
get muted(): boolean {
return this._muted$.value;
}
set muted(value: boolean) {
this._muted$.next(value);
}
get playbackRate(): number {
return this._playbackRate$.value;
}
set playbackRate(value: number) {
if (value <= 0) {
return;
}
this._playbackRate$.next(value);
}
get readyState(): number {
return this._readyState$.value;
}
get networkState(): number {
return this._networkState$.value;
}
load(): void {
this._load()
}
// methods
_load(): Observable<void> {
this._callLoadMetadataStart$.next(undefined);
return this._loadMetadata()
.pipe(
tap((metadata) => this._callLoadMetadataEnd$.next(metadata)),
tap((metadata) => this._callLoadDataStart$.next(metadata)),
switchMap((metadata) => this._loadData(metadata)),
tap(() => this._callLoadDataEnd$)
)
}
play(): Promise<void> {
if (!this._paused$.value) {
return Promise.resolve()
}
if (this._readyState$.value < ReadyState.HAVE_FUTURE_DATA) {
this._waiting$.next(new Event('waiting'));
return Promise.reject(new Error('Not enough data'));
}
this._paused$.next(false);
this._play$.next(new Event('play'));
this._playing$.next(new Event('playing'));
// simulate playback progress
this.playbackTimer = this._playbackRate$.pipe(
switchMap(playbackRate => interval(1000 / playbackRate)),
takeUntil(
merge(
this._paused$,
this._destroyRef$,
this._ended$
)
)
).subscribe(() => {
const newTime = this.currentTime + 1;
if (newTime >= this.duration) {
this._currentTime$.next(this.duration);
this._paused$.next(true);
this._ended$.next(true);
} else {
this._currentTime$.next(newTime);
this._timeupdate$.next(new Event('timeupdate'));
}
});
return Promise.resolve();
}
pause(): void {
if (this._paused$.value) {
return;
}
this._paused$.next(true);
}
canPlayType(type: string): string {
// simplified implementation; a real one should check support by MIME type
return type.includes('video/mp4') ? 'probably' : '';
}
addTextTrack(kind: string, label: string, language: string): void {
// text-track logic (simplified to a placeholder here)
console.log(`Added text track: ${kind}, ${label}, ${language}`);
}
abstract _seek (): Observable<void>
abstract _loadMetadata (): Observable<Metadata>
abstract _loadData(metadata: Metadata): Observable<void>
[Symbol.dispose]() {
this._destroyRef$.next(undefined)
}
}

View File

@ -1,14 +0,0 @@
export enum NetworkState {
NETWORK_EMPTY = 0,
NETWORK_IDLE = 1,
NETWORK_LOADING = 2,
NETWORK_NO_SOURCE = 3,
}
export enum ReadyState {
HAVE_NOTHING = 0,
HAVE_METADATA = 1,
HAVE_CURRENT_DATA = 2,
HAVE_FUTURE_DATA = 3,
HAVE_ENOUGH_DATA = 4
}

View File

@ -0,0 +1,76 @@
import { Observable } from 'rxjs';
export type RenderingContext =
| ImageBitmapRenderingContext
| CanvasRenderingContext2D;
export function createRenderingContext(): RenderingContext {
const canvas = document.createElement('canvas');
const context =
canvas.getContext('bitmaprenderer') || canvas.getContext('2d');
if (!context) {
throw new DOMException(
'can not get rendering context of canvas',
'CanvasException'
);
}
return context;
}
export function renderBitmapAtRenderingContext(
context: RenderingContext,
bitmap: ImageBitmap
) {
const canvas = context.canvas;
if (bitmap.width !== canvas.width || bitmap.height !== canvas.height) {
canvas.width = bitmap.width;
canvas.height = bitmap.height;
}
if (context instanceof ImageBitmapRenderingContext) {
context.transferFromImageBitmap(bitmap);
} else {
context.drawImage(bitmap, 0, 0, bitmap.width, bitmap.height);
bitmap.close();
}
}
export function captureCanvasAsVideoSrcObject(
video: HTMLVideoElement,
canvas: HTMLCanvasElement,
frameRate: number
) {
video.srcObject = canvas.captureStream(frameRate);
}
export function createVideoDecodeStream(configuration: VideoDecoderConfig): {
decoder: VideoDecoder;
frame$: Observable<VideoFrame>;
} {
let decoder!: VideoDecoder;
const frame$ = new Observable<VideoFrame>((subscriber) => {
let isFinalized = false;
decoder = new VideoDecoder({
output: (frame) => subscriber.next(frame),
error: (e) => {
if (!isFinalized) {
isFinalized = true;
subscriber.error(e);
}
},
});
decoder.configure(configuration);
return () => {
if (!isFinalized) {
isFinalized = true;
decoder.close();
}
};
});
return {
decoder,
frame$,
};
}
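A wiring sketch for the helpers above, mirroring how the playground component uses them; the querySelector target and the 30 fps capture rate are assumptions:

import {
  captureCanvasAsVideoSrcObject,
  createRenderingContext,
  renderBitmapAtRenderingContext,
} from '@konoplayer/core/graphics';

const context = createRenderingContext();
const video = document.querySelector('video')!; // assumed target element

// mirror the backing canvas into the <video> element at an assumed 30 fps
captureCanvasAsVideoSrcObject(video, context.canvas as HTMLCanvasElement, 30);

// for each decoded VideoFrame produced by a decode stream:
async function paint(frame: VideoFrame) {
  const bitmap = await createImageBitmap(frame);
  renderBitmapAtRenderingContext(context, bitmap); // resizes the canvas to fit, then draws
  frame.close();
}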

View File

@ -3,9 +3,11 @@ import { ArkErrors, type } from 'arktype';
export const AAC_CODEC_TYPE = 'AAC';
export const AudioObjectTypeSchema = type('1 | 2 | 3 | 4 | 5 | 29 | 67');
export const AudioObjectTypeSchema = type('1 | 2 | 3 | 4 | 5 | 29 | 67 | 23');
export const SamplingFrequencyIndexSchema = type('1|2|3|4|5|6|7|8|9|10|11|12');
export const SamplingFrequencyIndexSchema = type(
'1 | 2 | 3 | 4 |5|6|7|8|9|10|11|12'
);
export const ChannelConfigurationSchema = type('1 | 2 | 3 | 4 | 5 | 6 | 7');
@ -108,3 +110,15 @@ export function genCodecIdByAudioSpecificConfig(
) {
return `mp4a.40.${config.audioObjectType}`;
}
export function samplesPerFrameByAACAudioObjectType(audioObjectType: number) {
switch (audioObjectType) {
case 5:
case 29:
return 2048;
case 23:
return 512;
default:
return 1024;
}
}
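These sample counts map directly to frame durations via duration = samplesPerFrame / sampleRate, which the audio track context later uses to predict block durations. A quick illustration with assumed numbers (import path assumed):

import { samplesPerFrameByAACAudioObjectType } from './aac'; // path assumed

const samples = samplesPerFrameByAACAudioObjectType(2); // AAC-LC -> 1024
const sampleRate = 48_000; // assumed
const frameDurationUs = (samples / sampleRate) * 1_000_000; // ~21333 microseconds per frame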

View File

@ -1,9 +1,13 @@
import {ParseCodecError, UnsupportedCodecError} from '@konoplayer/core/errors';
import {
ParseCodecError,
UnsupportedCodecError,
} from '@konoplayer/core/errors';
import { VideoCodec, AudioCodec } from '@konoplayer/core/codecs';
import type { TrackEntryType } from '../schema';
import {
genCodecIdByAudioSpecificConfig,
parseAudioSpecificConfig,
samplesPerFrameByAACAudioObjectType,
} from './aac';
import {
genCodecStringByAVCDecoderConfigurationRecord,
@ -19,7 +23,8 @@ import {
} from './hevc.ts';
import {
genCodecStringByVP9DecoderConfigurationRecord,
parseVP9DecoderConfigurationRecord, VP9_CODEC_TYPE,
parseVP9DecoderConfigurationRecord,
VP9_CODEC_TYPE,
} from './vp9.ts';
export const VideoCodecId = {
@ -123,7 +128,7 @@ export interface VideoDecoderConfigExt extends VideoDecoderConfig {
}
export function videoCodecIdRequirePeekingKeyframe(codecId: VideoCodecIdType) {
return codecId === VideoCodecId.VP9
return codecId === VideoCodecId.VP9;
}
export function videoCodecIdToWebCodecs(
@ -146,7 +151,10 @@ export function videoCodecIdToWebCodecs(
};
case VideoCodecId.VP9:
if (!keyframe) {
throw new ParseCodecError(VP9_CODEC_TYPE, 'keyframe is required to parse VP9 codec')
throw new ParseCodecError(
VP9_CODEC_TYPE,
'keyframe is required to parse VP9 codec'
);
}
return {
...shareOptions,
@ -200,11 +208,10 @@ export function videoCodecIdToWebCodecs(
export interface AudioDecoderConfigExt extends AudioDecoderConfig {
codecType: AudioCodec;
samplesPerFrame?: number;
}
export function isAudioCodecIdRequirePeekingKeyframe (
_track: TrackEntryType,
) {
export function isAudioCodecIdRequirePeekingKeyframe(_track: TrackEntryType) {
return false;
}
@ -231,6 +238,7 @@ export function audioCodecIdToWebCodecs(
...shareOptions,
codecType: AudioCodec.AAC,
codec: 'mp4a.40.1',
samplesPerFrame: 1024,
};
case AudioCodecId.AAC_MPEG2_LC:
case AudioCodecId.AAC_MPEG4_LC:
@ -238,6 +246,7 @@ export function audioCodecIdToWebCodecs(
...shareOptions,
codecType: AudioCodec.AAC,
codec: 'mp4a.40.2',
samplesPerFrame: 1024,
};
case AudioCodecId.AAC_MPEG2_SSR:
case AudioCodecId.AAC_MPEG4_SSR:
@ -245,12 +254,14 @@ export function audioCodecIdToWebCodecs(
...shareOptions,
codecType: AudioCodec.AAC,
codec: 'mp4a.40.3',
samplesPerFrame: 1024,
};
case AudioCodecId.AAC_MPEG4_LTP:
return {
...shareOptions,
codecType: AudioCodec.AAC,
codec: 'mp4a.40.4',
samplesPerFrame: 1024,
};
case AudioCodecId.AAC_MPEG2_LC_SBR:
case AudioCodecId.AAC_MPEG4_SBR:
@ -258,16 +269,25 @@ export function audioCodecIdToWebCodecs(
...shareOptions,
codecType: AudioCodec.AAC,
codec: 'mp4a.40.5',
samplesPerFrame: 2048,
};
case AudioCodecId.AAC:
if (codecPrivate) {
const config = parseAudioSpecificConfig(codecPrivate);
return {
...shareOptions,
codecType: AudioCodec.AAC,
codec: genCodecIdByAudioSpecificConfig(config),
samplesPerFrame: samplesPerFrameByAACAudioObjectType(
config.audioObjectType
),
};
}
return {
...shareOptions,
codecType: AudioCodec.AAC,
codec: codecPrivate
? genCodecIdByAudioSpecificConfig(
parseAudioSpecificConfig(codecPrivate)
)
: 'mp4a.40.2',
codec: 'mp4a.40.2',
samplesPerFrame: 1024,
};
case AudioCodecId.AC3:
case AudioCodecId.AC3_BSID9:
@ -275,6 +295,7 @@ export function audioCodecIdToWebCodecs(
...shareOptions,
codecType: AudioCodec.AC3,
codec: 'ac-3',
samplesPerFrame: 1536,
};
case AudioCodecId.EAC3:
case AudioCodecId.AC3_BSID10:
@ -282,21 +303,75 @@ export function audioCodecIdToWebCodecs(
...shareOptions,
codecType: AudioCodec.EAC3,
codec: 'ec-3',
// TODO: FIXME
// parse frame header
// samples per frame = numblkscod * 256
// most of the time numblkscod = 6
// samplesPerFrame: 1536,
};
case AudioCodecId.MPEG_L3:
return {
...shareOptions,
codecType: AudioCodec.MP3,
codec: 'mp3',
samplesPerFrame: 1152,
};
case AudioCodecId.VORBIS:
return { ...shareOptions, codecType: AudioCodec.Vorbis, codec: 'vorbis' };
return {
...shareOptions,
codecType: AudioCodec.Vorbis,
codec: 'vorbis',
/**
* TODO: FIXME
* read codec private
* parse setup header
* ShortBlockSize = 2 ^ blocksize_0
* LongBlockSize = 2 ^ blocksize_1
*/
samplesPerFrame: 2048,
};
case AudioCodecId.FLAC:
return { ...shareOptions, codecType: AudioCodec.FLAC, codec: 'flac' };
return {
...shareOptions,
codecType: AudioCodec.FLAC,
codec: 'flac',
/**
* TODO: FIXME
* read codec private
* get block size
*/
// samplesPerFrame: 4096,
};
case AudioCodecId.OPUS:
return { ...shareOptions, codecType: AudioCodec.Opus, codec: 'opus' };
return {
...shareOptions,
codecType: AudioCodec.Opus,
codec: 'opus',
/**
* TODO: FIXME
* Read TOC header from frame data
*/
// samplesPerFrame: 960,
};
case AudioCodecId.ALAC:
return { ...shareOptions, codecType: AudioCodec.ALAC, codec: 'alac' };
return {
...shareOptions,
codecType: AudioCodec.ALAC,
codec: 'alac',
/**
* TODO: FIXME
* parse private data and get frame length
* 00 00 10 00 // Frame Length (4096)
* 00 00 00 00 // Compatible Version (0)
* 00 10       // Bit Depth (16-bit)
* 40 00       // PB (like 40)
* 00 00       // MB (like 0)
* 00 00       // KB (like 0)
* 00 02       // Channels (2)
* 00 00 AC 44 // Sample Rate (44100Hz)
*/
// samplesPerFrame: 4096,
};
case AudioCodecId.PCM_INT_BIG:
if (bitDepth === 16) {
return {

View File

@ -0,0 +1,14 @@
import type { ClusterType } from '../schema';
export function* clusterBlocks(cluster: ClusterType) {
if (cluster.SimpleBlock) {
for (const simpleBlock of cluster.SimpleBlock) {
yield simpleBlock;
}
}
if (cluster.BlockGroup) {
for (const block of cluster.BlockGroup) {
yield block;
}
}
}
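A small usage sketch for the generator above (import path assumed):

import type { ClusterType } from '../schema';
import { clusterBlocks } from './blocks'; // path assumed

// counts every block in a cluster: SimpleBlock entries first, then BlockGroup
function countBlocks(cluster: ClusterType): number {
  let count = 0;
  for (const _block of clusterBlocks(cluster)) {
    count += 1;
  }
  return count;
}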

View File

@ -0,0 +1,69 @@
import type { CreateRangedStreamOptions } from '@konoplayer/core/data';
import { type EbmlEBMLTagType, EbmlTagIdEnum, EbmlTagPosition } from 'konoebml';
import {
switchMap,
filter,
take,
shareReplay,
map,
combineLatest,
of,
} from 'rxjs';
import { isTagIdPos } from '../util';
import { createRangedEbmlStream } from './resource';
import { type MatroskaSegmentModel, createMatroskaSegment } from './segment';
export type CreateMatroskaOptions = Omit<
CreateRangedStreamOptions,
'byteStart' | 'byteEnd'
>;
export interface MatroskaModel {
totalSize?: number;
initResponse: Response;
head: EbmlEBMLTagType;
segment: MatroskaSegmentModel;
}
export function createMatroska(options: CreateMatroskaOptions) {
const metadataRequest$ = createRangedEbmlStream({
...options,
byteStart: 0,
});
return metadataRequest$.pipe(
switchMap(({ totalSize, ebml$, response }) => {
const head$ = ebml$.pipe(
filter(isTagIdPos(EbmlTagIdEnum.EBML, EbmlTagPosition.End)),
take(1),
shareReplay(1)
);
const segmentStart$ = ebml$.pipe(
filter(isTagIdPos(EbmlTagIdEnum.Segment, EbmlTagPosition.Start))
);
/**
 * While the [matroska v4](https://www.matroska.org/technical/elements.html) doc says there is only one segment per file,
 * some mkv files generated by unusual tools emit several.
 */
const segments$ = segmentStart$.pipe(
map((startTag) =>
createMatroskaSegment({
startTag,
matroskaOptions: options,
ebml$,
})
)
);
return combineLatest({
segment: segments$.pipe(take(1)),
head: head$,
totalSize: of(totalSize),
initResponse: of(response),
});
}),
shareReplay(1)
);
}
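Entry-point usage mirrors the playground component above; the URL is a placeholder:

import { firstValueFrom } from 'rxjs';
import { createMatroska } from '@konoplayer/matroska/model';

const {
  head,
  totalSize,
  segment: { seek, defaultVideoTrack$, defaultAudioTrack$ },
} = await firstValueFrom(
  createMatroska({ url: 'https://example.com/video.mkv' })
);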

View File

@ -0,0 +1,90 @@
import {
type CreateRangedStreamOptions,
createRangedStream,
} from '@konoplayer/core/data';
import { type EbmlTagType, EbmlStreamDecoder, EbmlTagIdEnum } from 'konoebml';
import { Observable, from, switchMap, share, defer, EMPTY, of } from 'rxjs';
import { waitTick } from '../util';
export function createRangedEbmlStream({
url,
byteStart = 0,
byteEnd,
}: CreateRangedStreamOptions): Observable<{
ebml$: Observable<EbmlTagType>;
totalSize?: number;
response: Response;
body: ReadableStream<Uint8Array>;
controller: AbortController;
}> {
const stream$ = from(createRangedStream({ url, byteStart, byteEnd }));
return stream$.pipe(
switchMap(({ controller, body, totalSize, response }) => {
let requestCompleted = false;
const originRequest$ = new Observable<EbmlTagType>((subscriber) => {
body
.pipeThrough(
new EbmlStreamDecoder({
streamStartOffset: byteStart,
collectChild: (child) => child.id !== EbmlTagIdEnum.Cluster,
backpressure: {
eventLoop: waitTick,
},
})
)
.pipeTo(
new WritableStream({
write: async (tag) => {
await waitTick();
subscriber.next(tag);
},
close: () => {
if (!requestCompleted) {
requestCompleted = true;
subscriber.complete();
}
},
})
)
.catch((error) => {
if (requestCompleted && error?.name === 'AbortError') {
return;
}
requestCompleted = true;
subscriber.error(error);
});
return () => {
requestCompleted = true;
controller.abort();
};
}).pipe(
share({
resetOnComplete: false,
resetOnError: false,
resetOnRefCountZero: true,
})
);
const ebml$ = defer(() =>
requestCompleted ? EMPTY : originRequest$
).pipe(
share({
resetOnError: false,
resetOnComplete: true,
resetOnRefCountZero: true,
})
);
return of({
ebml$,
totalSize,
response,
body,
controller,
});
})
);
}
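A subscription sketch against the ranged stream, reusing the isTagIdPos helper the rest of the package uses; the URL is a placeholder and the import path for createRangedEbmlStream is assumed:

import { filter } from 'rxjs';
import { EbmlTagIdEnum, EbmlTagPosition } from 'konoebml';
import { isTagIdPos } from '../util';
import { createRangedEbmlStream } from './resource'; // path assumed

createRangedEbmlStream({
  url: 'https://example.com/video.mkv',
  byteStart: 0,
}).subscribe(({ ebml$, totalSize }) => {
  console.log('total size:', totalSize ?? 'unknown');
  ebml$
    .pipe(filter(isTagIdPos(EbmlTagIdEnum.Segment, EbmlTagPosition.Start)))
    .subscribe((tag) => console.log('segment start tag:', tag));
});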

View File

@ -0,0 +1,419 @@
import { createAudioDecodeStream } from '@konoplayer/core/audition';
import { createVideoDecodeStream } from '@konoplayer/core/graphics';
import {
type EbmlSegmentTagType,
type EbmlTagType,
EbmlTagIdEnum,
EbmlTagPosition,
} from 'konoebml';
import {
type Observable,
scan,
takeWhile,
share,
map,
last,
switchMap,
shareReplay,
EMPTY,
filter,
withLatestFrom,
take,
of,
merge,
isEmpty,
finalize,
} from 'rxjs';
import type { CreateMatroskaOptions } from '.';
import { type ClusterType, TrackTypeRestrictionEnum } from '../schema';
import {
SegmentSystem,
type SegmentComponent,
type VideoTrackContext,
type AudioTrackContext,
SEEK_ID_KAX_CUES,
SEEK_ID_KAX_TAGS,
type CueSystem,
} from '../systems';
import {
standardTrackPredicate,
standardTrackPriority,
} from '../systems/track';
import { isTagIdPos } from '../util';
import { createRangedEbmlStream } from './resource';
export interface CreateMatroskaSegmentOptions {
matroskaOptions: CreateMatroskaOptions;
startTag: EbmlSegmentTagType;
ebml$: Observable<EbmlTagType>;
}
export interface MatroskaSegmentModel {
startTag: EbmlSegmentTagType;
segment: SegmentSystem;
metadataTags$: Observable<EbmlTagType>;
loadedMetadata$: Observable<SegmentSystem>;
loadedTags$: Observable<SegmentSystem>;
loadedCues$: Observable<SegmentSystem>;
seek: (seekTime: number) => Observable<SegmentComponent<ClusterType>>;
videoTrackDecoder: (
track: VideoTrackContext,
cluster$: Observable<ClusterType>
) => {
track: VideoTrackContext;
decoder: VideoDecoder;
frame$: Observable<VideoFrame>;
};
audioTrackDecoder: (
track: AudioTrackContext,
cluster$: Observable<ClusterType>
) => {
track: AudioTrackContext;
decoder: AudioDecoder;
frame$: Observable<AudioData>;
};
defaultVideoTrack$: Observable<VideoTrackContext | undefined>;
defaultAudioTrack$: Observable<AudioTrackContext | undefined>;
}
export function createMatroskaSegment({
matroskaOptions,
startTag,
ebml$,
}: CreateMatroskaSegmentOptions): MatroskaSegmentModel {
const segment = new SegmentSystem(startTag);
const clusterSystem = segment.cluster;
const seekSystem = segment.seek;
const metaScan$ = ebml$.pipe(
scan(
(acc, tag) => {
acc.segment.scanMeta(tag);
acc.tag = tag;
return acc;
},
{
segment,
tag: undefined as unknown as EbmlTagType,
}
),
takeWhile((acc) => acc.segment.canCompleteMeta(), true),
share({
resetOnComplete: false,
resetOnError: false,
resetOnRefCountZero: true,
})
);
const metadataTags$ = metaScan$.pipe(map(({ tag }) => tag));
const loadedMetadata$ = metaScan$.pipe(
last(),
switchMap(({ segment }) => segment.completeMeta()),
shareReplay(1)
);
const loadedRemoteCues$ = loadedMetadata$.pipe(
switchMap((s) => {
const cueSystem = s.cue;
const seekSystem = s.seek;
if (cueSystem.prepared) {
return EMPTY;
}
const remoteCuesTagStartOffset =
seekSystem.seekOffsetBySeekId(SEEK_ID_KAX_CUES);
if (remoteCuesTagStartOffset! >= 0) {
return createRangedEbmlStream({
...matroskaOptions,
byteStart: remoteCuesTagStartOffset,
}).pipe(
switchMap((req) => req.ebml$),
filter(isTagIdPos(EbmlTagIdEnum.Cues, EbmlTagPosition.End)),
withLatestFrom(loadedMetadata$),
map(([cues, withMeta]) => {
withMeta.cue.prepareCuesWithTag(cues);
return withMeta;
})
);
}
return EMPTY;
}),
take(1),
shareReplay(1)
);
const loadedLocalCues$ = loadedMetadata$.pipe(
switchMap((s) => (s.cue.prepared ? of(s) : EMPTY)),
shareReplay(1)
);
const loadedEmptyCues$ = merge(loadedLocalCues$, loadedRemoteCues$).pipe(
isEmpty(),
switchMap((empty) => (empty ? loadedMetadata$ : EMPTY))
);
const loadedCues$ = merge(
loadedLocalCues$,
loadedRemoteCues$,
loadedEmptyCues$
).pipe(take(1));
const loadedRemoteTags$ = loadedMetadata$.pipe(
switchMap((s) => {
const tagSystem = s.tag;
const seekSystem = s.seek;
if (tagSystem.prepared) {
return EMPTY;
}
const remoteTagsTagStartOffset =
seekSystem.seekOffsetBySeekId(SEEK_ID_KAX_TAGS);
if (remoteTagsTagStartOffset! >= 0) {
return createRangedEbmlStream({
...matroskaOptions,
byteStart: remoteTagsTagStartOffset,
}).pipe(
switchMap((req) => req.ebml$),
filter(isTagIdPos(EbmlTagIdEnum.Tags, EbmlTagPosition.End)),
withLatestFrom(loadedMetadata$),
map(([tags, withMeta]) => {
withMeta.tag.prepareTagsWithTag(tags);
return withMeta;
})
);
}
return EMPTY;
}),
take(1),
shareReplay(1)
);
const loadedLocalTags$ = loadedMetadata$.pipe(
switchMap((s) => (s.tag.prepared ? of(s) : EMPTY)),
shareReplay(1)
);
const loadedEmptyTags$ = merge(loadedRemoteTags$, loadedLocalTags$).pipe(
isEmpty(),
switchMap((empty) => (empty ? loadedMetadata$ : EMPTY))
);
const loadedTags$ = merge(
loadedLocalTags$,
loadedRemoteTags$,
loadedEmptyTags$
).pipe(take(1));
const seekWithoutCues = (
seekTime: number
): Observable<SegmentComponent<ClusterType>> => {
const request$ = loadedMetadata$.pipe(
switchMap(() =>
createRangedEbmlStream({
...matroskaOptions,
byteStart: seekSystem.firstClusterOffset,
})
)
);
const cluster$ = request$.pipe(
switchMap((req) => req.ebml$),
filter(isTagIdPos(EbmlTagIdEnum.Cluster, EbmlTagPosition.End)),
map((tag) => clusterSystem.addClusterWithTag(tag))
);
if (seekTime === 0) {
return cluster$;
}
return cluster$.pipe(
scan(
(acc, curr) => {
// avoid object recreation
acc.prev = acc.next;
acc.next = curr;
return acc;
},
{
prev: undefined as SegmentComponent<ClusterType> | undefined,
next: undefined as SegmentComponent<ClusterType> | undefined,
}
),
filter((c) => c.next?.Timestamp! > seekTime),
map((c) => c.prev ?? c.next!)
);
};
const seekWithCues = (
cueSystem: CueSystem,
seekTime: number
): Observable<SegmentComponent<ClusterType>> => {
if (seekTime === 0) {
return seekWithoutCues(seekTime);
}
const cuePoint = cueSystem.findClosestCue(seekTime);
if (!cuePoint) {
return seekWithoutCues(seekTime);
}
return createRangedEbmlStream({
...matroskaOptions,
byteStart: seekSystem.offsetFromSeekPosition(
cueSystem.getCueTrackPositions(cuePoint).CueClusterPosition as number
),
}).pipe(
switchMap((req) => req.ebml$),
filter(isTagIdPos(EbmlTagIdEnum.Cluster, EbmlTagPosition.End)),
map(clusterSystem.addClusterWithTag.bind(clusterSystem))
);
};
const seek = (
seekTime: number
): Observable<SegmentComponent<ClusterType>> => {
if (seekTime === 0) {
const subscription = loadedCues$.subscribe();
// if seekTime equals 0 at start, reuse the initialization stream
return seekWithoutCues(seekTime).pipe(
finalize(() => {
subscription.unsubscribe();
})
);
}
return loadedCues$.pipe(
switchMap((segment) => {
const cueSystem = segment.cue;
if (cueSystem.prepared) {
return seekWithCues(cueSystem, seekTime);
}
return seekWithoutCues(seekTime);
})
);
};
const videoTrackDecoder = (
track: VideoTrackContext,
cluster$: Observable<ClusterType>
) => {
const { decoder, frame$ } = createVideoDecodeStream(track.configuration);
const clusterSystem = segment.cluster;
const decodeSubscription = cluster$.subscribe((cluster) => {
for (const block of clusterSystem.enumerateBlocks(
cluster,
track.trackEntry
)) {
const blockTime = Number(cluster.Timestamp) + block.relTime;
const blockDuration =
  block.frames.length > 1 ? track.predictBlockDuration(blockTime) : 0;
const perFrameDuration =
  block.frames.length > 1 && blockDuration
    ? blockDuration / block.frames.length
    : 0;
let i = 0;
for (const frame of block.frames) {
  const chunk = new EncodedVideoChunk({
    type: block.keyframe ? 'key' : 'delta',
    data: frame,
    // offset laced frames by their index, matching the audio path below
    timestamp: blockTime + perFrameDuration * i,
  });
  i++;
  decoder.decode(chunk);
}
}
});
return {
track,
decoder,
frame$: frame$
.pipe(
finalize(() => {
decodeSubscription.unsubscribe();
})
)
.pipe(share()),
};
};
const audioTrackDecoder = (
track: AudioTrackContext,
cluster$: Observable<ClusterType>
) => {
const { decoder, frame$ } = createAudioDecodeStream(track.configuration);
const clusterSystem = segment.cluster;
const decodeSubscription = cluster$.subscribe((cluster) => {
for (const block of clusterSystem.enumerateBlocks(
cluster,
track.trackEntry
)) {
const blockTime = Number(cluster.Timestamp) + block.relTime;
const blockDuration =
  block.frames.length > 1 ? track.predictBlockDuration(blockTime) : 0;
const perFrameDuration =
  block.frames.length > 1 && blockDuration
    ? blockDuration / block.frames.length
    : 0;
let i = 0;
for (const frame of block.frames) {
const chunk = new EncodedAudioChunk({
type: block.keyframe ? 'key' : 'delta',
data: frame,
timestamp: blockTime + perFrameDuration * i,
});
i++;
decoder.decode(chunk);
}
}
});
return {
track,
decoder,
frame$: frame$.pipe(finalize(() => decodeSubscription.unsubscribe())),
};
};
const defaultVideoTrack$ = loadedMetadata$.pipe(
map((segment) =>
segment.track.getTrackContext<VideoTrackContext>({
predicate: (track) =>
track.TrackType === TrackTypeRestrictionEnum.VIDEO &&
standardTrackPredicate(track),
priority: standardTrackPriority,
})
)
);
const defaultAudioTrack$ = loadedMetadata$.pipe(
map((segment) =>
segment.track.getTrackContext<AudioTrackContext>({
predicate: (track) =>
track.TrackType === TrackTypeRestrictionEnum.AUDIO &&
standardTrackPredicate(track),
priority: standardTrackPriority,
})
)
);
return {
startTag,
segment,
metadataTags$,
loadedMetadata$,
loadedTags$,
loadedCues$,
seek,
videoTrackDecoder,
audioTrackDecoder,
defaultVideoTrack$,
defaultAudioTrack$,
};
}
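Putting the segment model to work, along the lines of the playground component; `model` and `track` are assumed to come from createMatroska and defaultVideoTrack$ respectively:

// assumed: `model` is the MatroskaSegmentModel returned above and `track`
// a VideoTrackContext taken from defaultVideoTrack$
const cluster$ = model.seek(0);
const { frame$ } = model.videoTrackDecoder(track, cluster$);

const subscription = frame$.subscribe((frame) => {
  // enqueue the frame for rendering, then release it
  frame.close();
});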

View File

@ -1,399 +0,0 @@
import {
EbmlStreamDecoder,
EbmlTagIdEnum,
EbmlTagPosition,
type EbmlTagType,
} from 'konoebml';
import {
defer,
EMPTY,
filter,
finalize,
from,
isEmpty, last,
map,
merge,
Observable,
of,
scan,
share,
shareReplay,
switchMap,
take,
takeWhile,
withLatestFrom,
} from 'rxjs';
import {
createRangedStream,
type CreateRangedStreamOptions,
} from '@konoplayer/core/data';
import { isTagIdPos, waitTick } from './util';
import type { ClusterType } from './schema';
import {SEEK_ID_KAX_CUES, SEEK_ID_KAX_TAGS, type CueSystem, type SegmentComponent, SegmentSystem} from "./systems";
export interface CreateRangedEbmlStreamOptions
extends CreateRangedStreamOptions {
}
export function createRangedEbmlStream({
url,
byteStart = 0,
byteEnd,
}: CreateRangedEbmlStreamOptions): Observable<{
ebml$: Observable<EbmlTagType>;
totalSize?: number;
response: Response;
body: ReadableStream<Uint8Array>;
controller: AbortController;
}> {
const stream$ = from(createRangedStream({ url, byteStart, byteEnd }));
return stream$.pipe(
switchMap(({ controller, body, totalSize, response }) => {
let requestCompleted = false;
const originRequest$ = new Observable<EbmlTagType>((subscriber) => {
body
.pipeThrough(
new EbmlStreamDecoder({
streamStartOffset: byteStart,
collectChild: (child) => child.id !== EbmlTagIdEnum.Cluster,
backpressure: {
eventLoop: waitTick,
},
})
)
.pipeTo(
new WritableStream({
write: async (tag) => {
await waitTick();
subscriber.next(tag);
},
close: () => {
if (!requestCompleted) {
requestCompleted = true;
subscriber.complete();
}
},
})
)
.catch((error) => {
if (requestCompleted && error?.name === 'AbortError') {
return;
}
requestCompleted = true;
subscriber.error(error);
});
return () => {
requestCompleted = true;
controller.abort();
};
}).pipe(
share({
resetOnComplete: false,
resetOnError: false,
resetOnRefCountZero: true,
})
);
const ebml$ = defer(() =>
requestCompleted ? EMPTY : originRequest$
).pipe(
share({
resetOnError: false,
resetOnComplete: true,
resetOnRefCountZero: true,
})
);
return of({
ebml$,
totalSize,
response,
body,
controller,
});
})
);
}
export interface CreateEbmlControllerOptions
extends Omit<CreateRangedEbmlStreamOptions, 'byteStart' | 'byteEnd'> {}
export function createEbmlController({
url,
...options
}: CreateEbmlControllerOptions) {
const metaRequest$ = createRangedEbmlStream({
...options,
url,
byteStart: 0,
});
const controller$ = metaRequest$.pipe(
map(({ totalSize, ebml$, response, controller }) => {
const head$ = ebml$.pipe(
filter(isTagIdPos(EbmlTagIdEnum.EBML, EbmlTagPosition.End)),
take(1),
shareReplay(1)
);
console.debug(
`stream of video "${url}" created, total size is ${totalSize ?? 'unknown'}`
);
const segmentStart$ = ebml$.pipe(
filter(isTagIdPos(EbmlTagIdEnum.Segment, EbmlTagPosition.Start))
);
/**
 * While the [matroska v4](https://www.matroska.org/technical/elements.html) doc says there is only one segment per file,
 * some mkv files generated by unusual tools emit several.
 */
const segments$ = segmentStart$.pipe(
map((startTag) => {
const segment = new SegmentSystem(startTag);
const clusterSystem = segment.cluster;
const seekSystem = segment.seek;
const metaScan$ = ebml$.pipe(
scan(
(acc, tag) => {
acc.segment.scanMeta(tag);
acc.tag = tag;
return acc;
},
{
segment,
tag: undefined as unknown as EbmlTagType,
}
),
takeWhile((acc) => acc.segment.canCompleteMeta(), true),
share({
resetOnComplete: false,
resetOnError: false,
resetOnRefCountZero: true,
})
);
const meta$ = metaScan$.pipe(
map(({ tag }) => tag)
);
const withMeta$ = metaScan$.pipe(
last(),
switchMap(({ segment }) => segment.completeMeta()),
shareReplay(1)
);
const withRemoteCues$ = withMeta$.pipe(
switchMap((s) => {
const cueSystem = s.cue;
const seekSystem = s.seek;
if (cueSystem.prepared) {
return EMPTY;
}
const remoteCuesTagStartOffset =
seekSystem.seekOffsetBySeekId(SEEK_ID_KAX_CUES);
if (remoteCuesTagStartOffset! >= 0) {
return createRangedEbmlStream({
...options,
url,
byteStart: remoteCuesTagStartOffset,
}).pipe(
switchMap((req) => req.ebml$),
filter(isTagIdPos(EbmlTagIdEnum.Cues, EbmlTagPosition.End)),
withLatestFrom(withMeta$),
map(([cues, withMeta]) => {
withMeta.cue.prepareCuesWithTag(cues);
return withMeta;
})
);
}
return EMPTY;
}),
take(1),
shareReplay(1)
);
const withLocalCues$ = withMeta$.pipe(
switchMap((s) => (s.cue.prepared ? of(s) : EMPTY)),
shareReplay(1)
);
const withRemoteTags$ = withMeta$.pipe(
switchMap((s) => {
const tagSystem = s.tag;
const seekSystem = s.seek;
if (tagSystem.prepared) {
return EMPTY;
}
const remoteTagsTagStartOffset =
seekSystem.seekOffsetBySeekId(SEEK_ID_KAX_TAGS);
if (remoteTagsTagStartOffset! >= 0) {
return createRangedEbmlStream({
...options,
url,
byteStart: remoteTagsTagStartOffset,
}).pipe(
switchMap((req) => req.ebml$),
filter(isTagIdPos(EbmlTagIdEnum.Tags, EbmlTagPosition.End)),
withLatestFrom(withMeta$),
map(([tags, withMeta]) => {
withMeta.tag.prepareTagsWithTag(tags);
return withMeta;
})
);
}
return EMPTY;
}),
take(1),
shareReplay(1)
);
const withLocalTags$ = withMeta$.pipe(
switchMap((s) => (s.tag.prepared ? of(s) : EMPTY)),
shareReplay(1)
);
const withCues$ = merge(withLocalCues$, withRemoteCues$).pipe(
take(1)
);
const withoutCues$ = withCues$.pipe(
isEmpty(),
switchMap((empty) => (empty ? withMeta$ : EMPTY))
);
const withTags$ = merge(withLocalTags$, withRemoteTags$).pipe(
take(1)
);
const withoutTags$ = withTags$.pipe(
isEmpty(),
switchMap((empty) => (empty ? withMeta$ : EMPTY))
);
const seekWithoutCues = (
seekTime: number
): Observable<SegmentComponent<ClusterType>> => {
const request$ = withMeta$.pipe(
switchMap(() =>
createRangedEbmlStream({
...options,
url,
byteStart: seekSystem.firstClusterOffset,
})
)
);
const cluster$ = request$.pipe(
switchMap((req) => req.ebml$),
filter(isTagIdPos(EbmlTagIdEnum.Cluster, EbmlTagPosition.End)),
map((tag) => clusterSystem.addClusterWithTag(tag))
);
if (seekTime === 0) {
return cluster$;
}
return cluster$.pipe(
scan(
(acc, curr) => {
// avoid object recreation
acc.prev = acc.next;
acc.next = curr;
return acc;
},
{
prev: undefined as SegmentComponent<ClusterType> | undefined,
next: undefined as SegmentComponent<ClusterType> | undefined,
}
),
filter((c) => c.next?.Timestamp! > seekTime),
map((c) => c.prev ?? c.next!)
);
};
const seekWithCues = (
cueSystem: CueSystem,
seekTime: number
): Observable<SegmentComponent<ClusterType>> => {
if (seekTime === 0) {
return seekWithoutCues(seekTime);
}
const cuePoint = cueSystem.findClosestCue(seekTime);
if (!cuePoint) {
return seekWithoutCues(seekTime);
}
return createRangedEbmlStream({
...options,
url,
byteStart: seekSystem.offsetFromSeekPosition(
cueSystem.getCueTrackPositions(cuePoint)
.CueClusterPosition as number
),
}).pipe(
switchMap((req) => req.ebml$),
filter(isTagIdPos(EbmlTagIdEnum.Cluster, EbmlTagPosition.End)),
map(clusterSystem.addClusterWithTag.bind(clusterSystem))
);
};
const seek = (
seekTime: number
): Observable<SegmentComponent<ClusterType>> => {
if (seekTime === 0) {
const subscription = merge(withCues$, withoutCues$).subscribe();
// if seekTime equals 0 at start, reuse the initialization stream
return seekWithoutCues(seekTime).pipe(
finalize(() => {
subscription.unsubscribe();
})
);
}
return merge(
withCues$.pipe(switchMap((s) => seekWithCues(s.cue, seekTime))),
withoutCues$.pipe(switchMap((_) => seekWithoutCues(seekTime)))
);
};
return {
startTag,
head$,
segment,
meta$,
withMeta$,
withCues$,
withoutCues$,
withTags$,
withoutTags$,
seekWithCues,
seekWithoutCues,
seek,
};
})
);
return {
segments$,
head$,
totalSize,
ebml$,
controller,
response,
};
}),
shareReplay(1)
);
return {
controller$,
request$: metaRequest$,
};
}

View File

@ -2,8 +2,11 @@ import { type, match } from 'arktype';
import { EbmlTagIdEnum, EbmlSimpleBlockTag, EbmlBlockTag } from 'konoebml';
export const BinarySchema = type.instanceOf(Uint8Array);
export type BinaryType = typeof BinarySchema.infer;
export const SimpleBlockSchema = type.instanceOf(EbmlSimpleBlockTag);
export const BlockSchema = type.instanceOf(EbmlBlockTag);
export type SimpleBlockType = typeof SimpleBlockSchema.infer;
export type BlockType = typeof BlockSchema.infer;
export const DocTypeExtensionSchema = type({
DocTypeExtensionName: type.string,

View File

@ -1,6 +1,65 @@
import type {EbmlClusterTagType} from "konoebml";
import {ClusterSchema, type ClusterType} from "../schema";
import {type SegmentComponent, SegmentComponentSystemTrait} from "./segment";
import type { EbmlClusterTagType } from 'konoebml';
import {
ClusterSchema,
type SimpleBlockType,
type ClusterType,
type BlockGroupType,
type TrackEntryType,
} from '../schema';
import { type SegmentComponent, SegmentComponentSystemTrait } from './segment';
export abstract class BlockViewTrait {
abstract get keyframe(): boolean;
abstract get frames(): Uint8Array[];
abstract get trackNum(): number | bigint;
abstract get relTime(): number;
}
export class SimpleBlockView extends BlockViewTrait {
constructor(public readonly block: SimpleBlockType) {
super();
}
get keyframe() {
return !!this.block.keyframe;
}
get frames(): Uint8Array<ArrayBufferLike>[] {
return this.block.frames;
}
get trackNum() {
return this.block.track;
}
get relTime() {
return this.block.value;
}
}
export class BlockGroupView extends BlockViewTrait {
constructor(public readonly block: BlockGroupType) {
super();
}
get keyframe() {
return !this.block.ReferenceBlock;
}
get frames(): Uint8Array<ArrayBufferLike>[] {
return this.block.Block.frames;
}
get trackNum() {
return this.block.Block.track;
}
get relTime() {
return this.block.Block.value;
}
}
export class ClusterSystem extends SegmentComponentSystemTrait<
EbmlClusterTagType,
@ -14,7 +73,27 @@ export class ClusterSystem extends SegmentComponentSystemTrait<
addClusterWithTag(tag: EbmlClusterTagType) {
const cluster = this.componentFromTag(tag);
this.clustersBuffer.push(cluster);
// this.clustersBuffer.push(cluster);
return cluster;
}
}
*enumerateBlocks(
cluster: ClusterType,
track: TrackEntryType
): Generator<BlockViewTrait> {
if (cluster.SimpleBlock) {
for (const block of cluster.SimpleBlock) {
if (block.track === track.TrackNumber) {
yield new SimpleBlockView(block);
}
}
}
if (cluster.BlockGroup) {
for (const block of cluster.BlockGroup) {
if (block.Block.track === track.TrackNumber) {
yield new BlockGroupView(block);
}
}
}
}
}
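Consuming the block views looks like this; clusterSystem, cluster, and track are assumed to be in scope:

for (const block of clusterSystem.enumerateBlocks(cluster, track)) {
  console.log(
    block.keyframe, // SimpleBlock flag, or absence of ReferenceBlock for a BlockGroup
    block.trackNum,
    block.relTime, // timestamp relative to the cluster's Timestamp
    block.frames.length
  );
}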

View File

@ -1,44 +1,69 @@
import {ParseCodecErrors, UnsupportedCodecError} from "@konoplayer/core/errors.ts";
import {
ParseCodecErrors,
UnsupportedCodecError,
} from '@konoplayer/core/errors.ts';
import {
EbmlTagIdEnum,
type EbmlTrackEntryTagType,
type EbmlTracksTagType
} from "konoebml";
type EbmlTracksTagType,
} from 'konoebml';
import {
audioCodecIdToWebCodecs,
videoCodecIdRequirePeekingKeyframe,
videoCodecIdToWebCodecs, type AudioDecoderConfigExt, type VideoDecoderConfigExt
} from "../codecs";
import {TrackEntrySchema, type TrackEntryType, TrackTypeRestrictionEnum} from "../schema";
import {type SegmentComponent, SegmentComponentSystemTrait} from "./segment";
videoCodecIdToWebCodecs,
type AudioDecoderConfigExt,
type VideoDecoderConfigExt,
} from '../codecs';
import {
TrackEntrySchema,
type TrackEntryType,
TrackTypeRestrictionEnum,
} from '../schema';
import { type SegmentComponent, SegmentComponentSystemTrait } from './segment';
export interface GetTrackEntryOptions {
priority?: (v: SegmentComponent<TrackEntryType>) => number;
predicate?: (v: SegmentComponent<TrackEntryType>) => boolean;
predicate: (v: SegmentComponent<TrackEntryType>) => boolean;
}
export abstract class TrackContext {
peekingKeyframe?: Uint8Array;
trackEntry: TrackEntryType
trackEntry: TrackEntryType;
timecodeScale: number;
lastBlockTimestamp = Number.NaN;
averageBlockDuration = Number.NaN;
constructor(trackEntry: TrackEntryType) {
constructor(trackEntry: TrackEntryType, timecodeScale: number) {
this.trackEntry = trackEntry;
this.timecodeScale = timecodeScale;
}
peekKeyframe (payload: Uint8Array) {
peekKeyframe(payload: Uint8Array) {
this.peekingKeyframe = payload;
}
preparedToConfigure () {
preparedToConfigure() {
if (this.requirePeekKeyframe()) {
return !!this.peekingKeyframe;
}
return true;
}
abstract requirePeekKeyframe (): boolean;
abstract requirePeekKeyframe(): boolean;
abstract buildConfiguration (): Promise<void>;
abstract buildConfiguration(): Promise<void>;
predictBlockDuration(blockTimestamp: number): number {
if (this.trackEntry.DefaultDuration) {
return Number(this.trackEntry.DefaultDuration);
}
const delta = blockTimestamp - this.lastBlockTimestamp;
this.lastBlockTimestamp = blockTimestamp;
this.averageBlockDuration = this.averageBlockDuration
? this.averageBlockDuration * 0.5 + delta * 0.5
: delta;
return this.averageBlockDuration;
}
}
export class DefaultTrackContext extends TrackContext {
@ -46,18 +71,22 @@ export class DefaultTrackContext extends TrackContext {
return false;
}
// biome-ignore lint/suspicious/noEmptyBlockStatements: <explanation>
override async buildConfiguration(): Promise<void> {}
}
export class VideoTrackContext extends TrackContext {
configuration!: VideoDecoderConfigExt;
override requirePeekKeyframe (): boolean {
override requirePeekKeyframe(): boolean {
return videoCodecIdRequirePeekingKeyframe(this.trackEntry.CodecID);
}
async buildConfiguration () {
const configuration = videoCodecIdToWebCodecs(this.trackEntry, this.peekingKeyframe);
async buildConfiguration() {
const configuration = videoCodecIdToWebCodecs(
this.trackEntry,
this.peekingKeyframe
);
if (!(await VideoDecoder.isConfigSupported(configuration)).supported) {
throw new UnsupportedCodecError(configuration.codec, 'video decoder');
}
@ -68,21 +97,50 @@ export class VideoTrackContext extends TrackContext {
export class AudioTrackContext extends TrackContext {
configuration!: AudioDecoderConfigExt;
override requirePeekKeyframe (): boolean {
override requirePeekKeyframe(): boolean {
return isAudioCodecIdRequirePeekingKeyframe(this.trackEntry);
}
async buildConfiguration () {
const configuration = audioCodecIdToWebCodecs(this.trackEntry, this.peekingKeyframe);
async buildConfiguration() {
const configuration = audioCodecIdToWebCodecs(
this.trackEntry,
this.peekingKeyframe
);
if (!(await AudioDecoder.isConfigSupported(configuration)).supported) {
throw new UnsupportedCodecError(configuration.codec, 'audio decoder');
}
this.configuration = configuration;
}
override predictBlockDuration(blockTimestamp: number): number {
if (this.trackEntry.DefaultDuration) {
return Number(this.trackEntry.DefaultDuration);
}
if (this.configuration.samplesPerFrame) {
return (
Number(
this.configuration.samplesPerFrame / this.configuration.sampleRate
) *
(1_000_000_000 / Number(this.timecodeScale))
);
}
const delta = blockTimestamp - this.lastBlockTimestamp;
this.lastBlockTimestamp = blockTimestamp;
this.averageBlockDuration = this.averageBlockDuration
? this.averageBlockDuration * 0.5 + delta * 0.5
: delta;
return this.averageBlockDuration;
}
}
export function standardTrackPredicate(track: TrackEntryType) {
return track.FlagEnabled !== 0;
}
export function standardTrackPriority(track: TrackEntryType) {
return (Number(!!track.FlagForced) << 8) + (Number(!!track.FlagDefault) << 4);
}
export class TrackSystem extends SegmentComponentSystemTrait<
EbmlTrackEntryTagType,
@ -96,37 +154,45 @@ export class TrackSystem extends SegmentComponentSystemTrait<
trackContexts: Map<number | bigint, TrackContext> = new Map();
getTrackEntry({
priority = (track) =>
(Number(!!track.FlagForced) << 4) + Number(!!track.FlagDefault),
predicate = (track) => track.FlagEnabled !== 0,
}: GetTrackEntryOptions) {
priority = standardTrackPriority,
predicate,
}: GetTrackEntryOptions) {
return this.tracks
.filter(predicate)
.toSorted((a, b) => priority(b) - priority(a))
.at(0);
}
getTrackContext <T extends TrackContext>(options: GetTrackEntryOptions): T | undefined {
getTrackContext<T extends TrackContext>(
options: GetTrackEntryOptions
): T | undefined {
const trackEntry = this.getTrackEntry(options);
const trackNum = trackEntry?.TrackNumber!;
return this.trackContexts.get(trackNum) as T | undefined;
}
prepareTracksWithTag(tag: EbmlTracksTagType) {
const infoSystem = this.segment.info;
this.tracks = tag.children
.filter((c) => c.id === EbmlTagIdEnum.TrackEntry)
.map((c) => this.componentFromTag(c));
for (const track of this.tracks) {
if (track.TrackType === TrackTypeRestrictionEnum.VIDEO) {
this.trackContexts.set(track.TrackNumber, new VideoTrackContext(track))
this.trackContexts.set(
track.TrackNumber,
new VideoTrackContext(track, Number(infoSystem.info.TimestampScale))
);
} else if (track.TrackType === TrackTypeRestrictionEnum.AUDIO) {
this.trackContexts.set(track.TrackNumber, new AudioTrackContext(track))
this.trackContexts.set(
track.TrackNumber,
new AudioTrackContext(track, Number(infoSystem.info.TimestampScale))
);
}
}
return this;
}
async buildTracksConfiguration () {
async buildTracksConfiguration() {
const parseErrors = new ParseCodecErrors();
for (const context of this.trackContexts.values()) {
@ -141,15 +207,15 @@ export class TrackSystem extends SegmentComponentSystemTrait<
}
}
tryPeekKeyframe (tag: { track: number | bigint, frames: Uint8Array[] }) {
tryPeekKeyframe(tag: { track: number | bigint; frames: Uint8Array[] }) {
for (const c of this.trackContexts.values()) {
if (c.trackEntry.TrackNumber === tag.track) {
c.peekKeyframe(tag.frames?.[0])
c.peekKeyframe(tag.frames?.[0]);
}
}
}
preparedToConfigureTracks (): boolean {
preparedToConfigureTracks(): boolean {
for (const c of this.trackContexts.values()) {
if (!c.preparedToConfigure()) {
return false;
@ -157,4 +223,4 @@ export class TrackSystem extends SegmentComponentSystemTrait<
}
return true;
}
}
}
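Track selection then goes through getTrackContext with the standard helpers, exactly as the segment model does; a condensed sketch assuming `segment` is a loaded SegmentSystem:

import { TrackTypeRestrictionEnum } from '../schema';

const videoTrack = segment.track.getTrackContext<VideoTrackContext>({
  predicate: (track) =>
    track.TrackType === TrackTypeRestrictionEnum.VIDEO &&
    standardTrackPredicate(track),
  priority: standardTrackPriority,
});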

pnpm-lock.yaml generated
View File

@ -36,6 +36,9 @@ importers:
'@types/node':
specifier: ^22.13.11
version: 22.13.11
'@webgpu/types':
specifier: ^0.1.59
version: 0.1.59
change-case:
specifier: ^5.4.4
version: 5.4.4
@ -1151,6 +1154,9 @@ packages:
'@webassemblyjs/wast-printer@1.14.1':
resolution: {integrity: sha512-kPSSXE6De1XOR820C90RIo2ogvZG+c3KiHzqUoO/F34Y2shGzesfqv7o57xrxovZJH/MetF5UjroJ/R/3isoiw==}
'@webgpu/types@0.1.59':
resolution: {integrity: sha512-jZJ6ipNli+rn++/GAPqsZXfsgjx951wlCW7vNAg+oGdp0ZYidTOkbVTVeK2frzowuD5ch7MRz7leOEX1PMv43A==}
'@xhmikosr/archive-type@7.0.0':
resolution: {integrity: sha512-sIm84ZneCOJuiy3PpWR5bxkx3HaNt1pqaN+vncUBZIlPZCq8ASZH+hBVdu5H8znR7qYC6sKwx+ie2Q7qztJTxA==}
engines: {node: ^14.14.0 || >=16.0.0}
@ -4015,6 +4021,8 @@ snapshots:
'@webassemblyjs/ast': 1.14.1
'@xtuc/long': 4.2.2
'@webgpu/types@0.1.59': {}
'@xhmikosr/archive-type@7.0.0':
dependencies:
file-type: 19.6.0

View File

@ -342,10 +342,15 @@ function generateMkvSchemaHierarchy(elements_: EbmlElementType[]) {
const idMulti = new Set<string>();
const preDefs = [
'export const BinarySchema = type.instanceOf(Uint8Array);',
'export type BinaryType = typeof BinarySchema.infer;',
...Object.entries(AdHocType).map(
([name, meta]) =>
`export const ${meta.primitive()} = type.instanceOf(Ebml${name}Tag);`
),
...Object.entries(AdHocType).map(
([name, meta]) =>
`export type ${name}Type = typeof ${meta.primitive()}.infer;`
),
];
const generateAssociated = (el: EbmlElementType): string | undefined => {

View File

@ -14,6 +14,10 @@
"DOM.AsyncIterable",
"DOM.Iterable"
],
"types": [
"@webgpu/types",
"@types/node"
],
"module": "ESNext",
"moduleDetection": "force",
"moduleResolution": "bundler",