feat: temp save

This commit is contained in:
master 2025-03-25 05:53:32 +08:00
parent 39a4cf2773
commit 3c317627e7
14 changed files with 980 additions and 548 deletions

View File

@ -15,18 +15,18 @@ import {
take,
tap,
distinctUntilChanged,
fromEvent,
filter,
fromEvent, withLatestFrom, share, delay, delayWhen, from, of,
} from 'rxjs';
import { createEbmlController } from '@konoplayer/matroska/reactive';
import {
TrackTypeRestrictionEnum,
type ClusterType,
} from '@konoplayer/matroska/schema';
import type { SegmentComponent } from '@konoplayer/matroska/model';
import { createRef, ref, type Ref } from 'lit/directives/ref.js';
import { Queue } from 'mnemonist';
import type {SegmentComponent, AudioTrackContext, VideoTrackContext} from "@konoplayer/matroska/systems";
export class VideoPipelineDemo extends LitElement {
static styles = css``;
@ -43,6 +43,7 @@ export class VideoPipelineDemo extends LitElement {
audioContext = new AudioContext();
seek$ = new ReplaySubject<number>(1);
cluster$ = new Subject<SegmentComponent<ClusterType>>();
videoFrameBuffer$ = new BehaviorSubject(new Queue<VideoFrame>());
audioFrameBuffer$ = new BehaviorSubject(new Queue<AudioData>());
@ -62,37 +63,47 @@ export class VideoPipelineDemo extends LitElement {
url: src,
});
const segment$ = controller$.pipe(
const segmentContext$ = controller$.pipe(
switchMap(({ segments$ }) => segments$.pipe(take(1)))
);
const cluster$ = combineLatest({
seekTime: this.seek$,
segment: segment$,
}).pipe(switchMap(({ seekTime, segment }) => segment.seek(seekTime)));
const videoTrack$ = segmentContext$.pipe(
const decode$ = segment$.pipe(
)
const currentCluster$ = combineLatest({
seekTime: this.seek$,
segmentContext: segmentContext$,
}).pipe(
delayWhen(({ segmentContext: { segment } }) => from(segment.track.flushContexts())),
switchMap(({ seekTime, segmentContext }) => combineLatest({
segmentContext: of(segmentContext),
cluster: segmentContext.seek(seekTime),
})),
share()
);
const decodeVideo$ = currentCluster$.pipe(
)
const decode$ = segmentContext$.pipe(
switchMap(({ withMeta$ }) => withMeta$),
map((segment) => {
const trackSystem = segment.track;
const infoSystem = segment.info;
const tracks = {
video: trackSystem.getTrackEntry({
predicate: (c) =>
c.TrackType === TrackTypeRestrictionEnum.VIDEO &&
c.FlagEnabled !== 0,
}),
audio: trackSystem.getTrackEntry({
const videoTrack = trackSystem.getTrackContext<VideoTrackContext>({
predicate: (c) =>
c.TrackType === TrackTypeRestrictionEnum.VIDEO &&
c.FlagEnabled !== 0,
});
const audioTrack = trackSystem.getTrackContext({
predicate: (c) =>
c.TrackType === TrackTypeRestrictionEnum.AUDIO &&
c.FlagEnabled !== 0,
}),
subtitle: trackSystem.getTrackEntry({
predicate: (c) =>
c.TrackType === TrackTypeRestrictionEnum.SUBTITLE &&
c.FlagEnabled !== 0,
}),
};
});
const videoDecode$ = track
const videoDecode$ = tracks.video
? new Observable<VideoFrame>((subscriber) => {
@ -354,7 +365,7 @@ export class VideoPipelineDemo extends LitElement {
this.pipeline$$.add(video$.subscribe());
this.pipeline$$.add(addToVideoFrameBuffer$.subscribe());
this.pipeline$$.add(addToAudioFrameBuffer$.subscribe());
this.pipeline$$.add(cluster$.subscribe(this.cluster$));
this.pipeline$$.add(currentCluster$.subscribe(this.cluster$));
this.pipeline$$.add(
fromEvent(document.body, 'click').subscribe(() => {
this.audioContext.resume();

View File

@ -0,0 +1,356 @@
import {
BehaviorSubject,
distinctUntilChanged,
filter,
interval,
map,
merge, Observable,
Subject,
type Subscription, switchMap, takeUntil, tap,
} from 'rxjs';
import {NetworkState, ReadyState} from "./state.ts";
export interface Metadata {
duration: number
}
export abstract class VideoElementTrait {
private playbackTimer: Subscription | undefined;
_src$ = new BehaviorSubject<string>('');
_currentTime$ = new BehaviorSubject<number>(0);
_duration$ = new BehaviorSubject<number>(Number.NaN);
_paused$ = new BehaviorSubject<boolean>(true);
_ended$ = new BehaviorSubject<boolean>(false);
_volume$ = new BehaviorSubject<number>(1.0);
_muted$ = new BehaviorSubject<boolean>(false);
_playbackRate$ = new BehaviorSubject<number>(1.0);
_readyState$ = new BehaviorSubject<number>(0); // HAVE_NOTHING
_networkState$ = new BehaviorSubject<number>(0); // NETWORK_EMPTY
_width$ = new BehaviorSubject<number>(0);
_height$ = new BehaviorSubject<number>(0);
_videoWidth$ = new BehaviorSubject<number>(0); // 只读,视频内在宽度
_videoHeight$ = new BehaviorSubject<number>(0); // 只读,视频内在高度
_poster$ = new BehaviorSubject<string>('');
_destroyRef$ = new Subject<void>();
_progress$ = new Subject<Event>();
_error$ = new Subject<Event>();
_abort$ = new Subject<Event>();
_emptied$ = new Subject<Event>();
_stalled$ = new Subject<Event>();
_loadeddata$ = new Subject<Event>();
_playing$ = new Subject<Event>();
_waiting$ = new Subject<Event>();
_seeked$ = new Subject<Event>();
_timeupdate$ = new Subject<Event>();
_play$ = new Subject<Event>();
_resize$ = new Subject<Event>();
_setCurrentTime$ = new Subject<number>();
_setSrc$ = new Subject<string>();
_callLoadMetadataStart$ = new Subject<void>();
_callLoadMetadataEnd$ = new Subject<Metadata>();
_callLoadDataStart$ = new Subject<Metadata>();
_callLoadDataEnd$ = new Subject<void>();
protected constructor() {
this._setCurrentTime$.pipe(
takeUntil(this._destroyRef$))
.subscribe(this._currentTime$)
this.seeking$.pipe(
takeUntil(this._destroyRef$),
switchMap(() => this._seek()),
map(() => new Event("seeked"))
).subscribe(this._seeked$)
this._setSrc$.pipe(
takeUntil(this._destroyRef$),
).subscribe(this._src$)
this._setSrc$.pipe(
takeUntil(this._destroyRef$),
switchMap(() => this._load())
).subscribe();
this._readyState$.pipe(
takeUntil(this._destroyRef$),
filter((r) => r === ReadyState.HAVE_NOTHING),
map(() => 0)
).subscribe(this._currentTime$);
this._readyState$.pipe(
takeUntil(this._destroyRef$),
filter((r) => r === ReadyState.HAVE_NOTHING),
map(() => true),
).subscribe(this._paused$);
this._readyState$.pipe(
takeUntil(this._destroyRef$),
filter((r) => r === ReadyState.HAVE_NOTHING),
map(() => false)
).subscribe(this._ended$)
this._callLoadMetadataStart$.pipe(
takeUntil(this._destroyRef$),
map(() => NetworkState.NETWORK_LOADING)
).subscribe(
this._networkState$
);
this._callLoadDataEnd$.pipe(
takeUntil(this._destroyRef$),
map(() => NetworkState.NETWORK_IDLE)
).subscribe(this._networkState$);
this._callLoadMetadataEnd$.pipe(
takeUntil(this._destroyRef$),
map(() => ReadyState.HAVE_METADATA)
).subscribe(this._readyState$)
this._callLoadMetadataEnd$.pipe(
takeUntil(this._destroyRef$),
map(meta => meta.duration)
).subscribe(this._duration$);
this._callLoadDataEnd$.pipe(
takeUntil(this._destroyRef$),
map(() => ReadyState.HAVE_CURRENT_DATA)
).subscribe(this._readyState$);
}
get canplay$ () {
return this._readyState$.pipe(
filter((s) => {
return s >= ReadyState.HAVE_CURRENT_DATA
}),
distinctUntilChanged(),
map(() => new Event('canplay')),
)
}
get canplaythrough$ () {
return this._readyState$.pipe(
filter((s) => s >= ReadyState.HAVE_ENOUGH_DATA),
distinctUntilChanged(),
map(() => new Event('canplaythrough')),
)
}
get seeked$ () {
return this._seeked$.asObservable();
}
get loadstart$() {
return this._readyState$.pipe(
filter((s) => s === ReadyState.HAVE_ENOUGH_DATA),
distinctUntilChanged(),
map(() => new Event('loadstart'))
)
}
get loadedmetadata$() {
return this._readyState$.pipe(
filter((r) => r >= ReadyState.HAVE_METADATA),
distinctUntilChanged(),
map(() => new Event('loadedmetadata'))
);
}
get pause$() {
return this._paused$.pipe(
distinctUntilChanged(),
filter(s => s),
map(() => new Event('pause'))
)
}
get volumechange$() {
return merge(
this._volume$,
this._muted$,
).pipe(
map(() => new Event('volumechange'))
)
}
get ratechange$() {
return this._playbackRate$.pipe(
map(() => new Event('ratechange'))
)
}
get durationchange$() {
return this._duration$.pipe(
map(() => new Event('durationchange'))
)
}
get ended$() {
return this._ended$.pipe(
distinctUntilChanged(),
filter(s => s),
map(() => new Event('ended'))
)
}
get seeking$() {
return this._setCurrentTime$.pipe(
map(() => new Event('seeking'))
)
}
// 属性 getter/setter
get src(): string {
return this._src$.value;
}
set src(value: string) {
this._setSrc$.next(value);
}
get currentTime(): number {
return this._currentTime$.value;
}
set currentTime(value: number) {
if (value < 0 || value > this.duration) {
return
}
this._setCurrentTime$.next(
value
)
this._seeked$.next(new Event('seeked'));
this._timeupdate$.next(new Event('timeupdate'));
}
get duration(): number {
return this._duration$.value;
}
get paused(): boolean {
return this._paused$.value;
}
get ended(): boolean {
return this._ended$.value;
}
get volume(): number {
return this._volume$.value;
}
set volume(value: number) {
if (value < 0 || value > 1) {
return
}
this._volume$.next(value);
}
get muted(): boolean {
return this._muted$.value;
}
set muted(value: boolean) {
this._muted$.next(value);
}
get playbackRate(): number {
return this._playbackRate$.value;
}
set playbackRate(value: number) {
if (value <= 0) {
return;
}
this._playbackRate$.next(value);
}
get readyState(): number {
return this._readyState$.value;
}
get networkState(): number {
return this._networkState$.value;
}
load(): void {
this._load()
}
// 方法
_load(): Observable<void> {
this._callLoadMetadataStart$.next(undefined);
return this._loadMetadata()
.pipe(
tap((metadata) => this._callLoadMetadataEnd$.next(metadata)),
tap((metadata) => this._callLoadDataStart$.next(metadata)),
switchMap((metadata) => this._loadData(metadata)),
tap(() => this._callLoadDataEnd$)
)
}
play(): Promise<void> {
if (!this._paused$.value) {
return Promise.resolve()
}
if (this._readyState$.value < ReadyState.HAVE_FUTURE_DATA) {
this._waiting$.next(new Event('waiting'));
return Promise.reject(new Error('Not enough data'));
}
this._paused$.next(false);
this._play$.next(new Event('play'));
this._playing$.next(new Event('playing'));
// 模拟播放进度
this.playbackTimer = this._playbackRate$.pipe(
switchMap(playbackRate => interval(1000 / playbackRate)),
takeUntil(
merge(
this._paused$,
this._destroyRef$,
this._ended$
)
)
).subscribe(() => {
const newTime = this.currentTime + 1;
if (newTime >= this.duration) {
this._currentTime$.next(this.duration);
this._paused$.next(true);
this._ended$.next(true);
} else {
this._currentTime$.next(newTime);
this._timeupdate$.next(new Event('timeupdate'));
}
});
return Promise.resolve();
}
pause(): void {
if (this._paused$.value) {
return;
}
this._paused$.next(true);
}
canPlayType(type: string): string {
// 简化的实现,实际需要根据 MIME 类型检查支持情况
return type.includes('video/mp4') ? 'probably' : '';
}
addTextTrack(kind: string, label: string, language: string): void {
// 实现文本轨道逻辑(此处简化为占位符)
console.log(`Added text track: ${kind}, ${label}, ${language}`);
}
abstract _seek (): Observable<void>
abstract _loadMetadata (): Observable<Metadata>
abstract _loadData(metadata: Metadata): Observable<void>
[Symbol.dispose]() {
this._destroyRef$.next(undefined)
}
}

View File

@ -0,0 +1,14 @@
export enum NetworkState {
NETWORK_EMPTY = 0,
NETWORK_IDLE = 1,
NETWORK_LOADING = 2,
NETWORK_NO_SOURCE = 3,
}
export enum ReadyState {
HAVE_NOTHING = 0,
HAVE_METADATA = 1,
HAVE_CURRENT_DATA = 2,
HAVE_FUTURE_DATA = 3,
HAVE_ENOUGH_DATA = 4
}

View File

@ -1,4 +1,4 @@
import { UnsupportedCodecError } from '@konoplayer/core/errors';
import {ParseCodecError, UnsupportedCodecError} from '@konoplayer/core/errors';
import { VideoCodec, AudioCodec } from '@konoplayer/core/codecs';
import type { TrackEntryType } from '../schema';
import {
@ -19,7 +19,7 @@ import {
} from './hevc.ts';
import {
genCodecStringByVP9DecoderConfigurationRecord,
parseVP9DecoderConfigurationRecord,
parseVP9DecoderConfigurationRecord, VP9_CODEC_TYPE,
} from './vp9.ts';
export const VideoCodecId = {
@ -122,9 +122,13 @@ export interface VideoDecoderConfigExt extends VideoDecoderConfig {
codecType: VideoCodec;
}
export function videoCodecIdRequirePeekingKeyframe(codecId: VideoCodecIdType) {
return codecId === VideoCodecId.VP9
}
export function videoCodecIdToWebCodecs(
track: TrackEntryType,
keyframe: Uint8Array
keyframe: Uint8Array | undefined
): VideoDecoderConfigExt {
const codecId = track.CodecID;
const codecPrivate = track.CodecPrivate;
@ -141,6 +145,9 @@ export function videoCodecIdToWebCodecs(
),
};
case VideoCodecId.VP9:
if (!keyframe) {
throw new ParseCodecError(VP9_CODEC_TYPE, 'keyframe is required to parse VP9 codec')
}
return {
...shareOptions,
codecType: VideoCodec.VP9,
@ -195,8 +202,15 @@ export interface AudioDecoderConfigExt extends AudioDecoderConfig {
codecType: AudioCodec;
}
export function isAudioCodecIdRequirePeekingKeyframe (
_track: TrackEntryType,
) {
return false;
}
export function audioCodecIdToWebCodecs(
track: TrackEntryType
track: TrackEntryType,
_keyframe: Uint8Array | undefined
): AudioDecoderConfigExt {
const codecId = track.CodecID;
const codecPrivate = track.CodecPrivate;

View File

@ -1,469 +0,0 @@
import {
type EbmlClusterTagType,
type EbmlCuePointTagType,
type EbmlCuesTagType,
type EbmlInfoTagType,
type EbmlMasterTagType,
type EbmlSeekHeadTagType,
type EbmlSegmentTagType,
EbmlTagIdEnum,
EbmlTagPosition,
type EbmlTagsTagType,
type EbmlTagTagType,
type EbmlTagType,
type EbmlTrackEntryTagType,
type EbmlTracksTagType,
} from 'konoebml';
import { convertEbmlTagToComponent, type InferType } from './util';
import { isEqual, maxBy } from 'lodash-es';
import { ArkErrors, type Type } from 'arktype';
import {
ClusterSchema,
type ClusterType,
CuePointSchema,
type CuePointType,
type CueTrackPositionsType,
InfoSchema,
type InfoType,
SeekHeadSchema,
type SeekHeadType,
TagSchema,
type TagType,
TrackEntrySchema,
type TrackEntryType,
TrackTypeRestrictionEnum,
} from './schema';
import { concatBufs } from 'konoebml/lib/tools';
import {
ParseCodecErrors,
UnreachableOrLogicError,
UnsupportedCodecError,
} from '@konoplayer/core/errors';
import { audioCodecIdToWebCodecs, videoCodecIdToWebCodecs } from './codecs';
import { Queue } from 'mnemonist';
import { BehaviorSubject } from 'rxjs';
export const SEEK_ID_KAX_INFO = new Uint8Array([0x15, 0x49, 0xa9, 0x66]);
export const SEEK_ID_KAX_TRACKS = new Uint8Array([0x16, 0x54, 0xae, 0x6b]);
export const SEEK_ID_KAX_CUES = new Uint8Array([0x1c, 0x53, 0xbb, 0x6b]);
export const SEEK_ID_KAX_TAGS = new Uint8Array([0x12, 0x54, 0xc3, 0x67]);
export class SegmentSystem {
startTag: EbmlSegmentTagType;
headTags: EbmlTagType[] = [];
firstCluster: EbmlClusterTagType | undefined;
cue: CueSystem;
cluster: ClusterSystem;
seek: SeekSystem;
info: InfoSystem;
track: TrackSystem;
tag: TagSystem;
constructor(startNode: EbmlSegmentTagType) {
this.startTag = startNode;
this.cue = new CueSystem(this);
this.cluster = new ClusterSystem(this);
this.seek = new SeekSystem(this);
this.info = new InfoSystem(this);
this.track = new TrackSystem(this);
this.tag = new TagSystem(this);
}
get contentStartOffset() {
return this.startTag.startOffset + this.startTag.headerLength;
}
private seekLocal() {
const infoTag = this.seek.seekTagBySeekId(SEEK_ID_KAX_INFO);
const tracksTag = this.seek.seekTagBySeekId(SEEK_ID_KAX_TRACKS);
const cuesTag = this.seek.seekTagBySeekId(SEEK_ID_KAX_CUES);
const tagsTag = this.seek.seekTagBySeekId(SEEK_ID_KAX_TAGS);
if (cuesTag?.id === EbmlTagIdEnum.Cues) {
this.cue.prepareCuesWithTag(cuesTag);
}
if (infoTag?.id === EbmlTagIdEnum.Info) {
this.info.prepareWithInfoTag(infoTag);
}
if (tracksTag?.id === EbmlTagIdEnum.Tracks) {
this.track.prepareTracksWithTag(tracksTag);
}
if (tagsTag?.id === EbmlTagIdEnum.Tags) {
this.tag.prepareTagsWithTag(tagsTag);
}
}
scanMeta(tag: EbmlTagType) {
if (
tag.id === EbmlTagIdEnum.SeekHead &&
tag.position === EbmlTagPosition.End
) {
this.seek.addSeekHeadTag(tag);
}
this.headTags.push(tag);
this.seek.memoTag(tag);
if (tag.id === EbmlTagIdEnum.Cluster && !this.firstCluster) {
this.firstCluster = tag;
this.seekLocal();
}
return this;
}
async completeMeta() {
this.seekLocal();
await this.parseCodes();
return this;
}
async parseCodes() {
const candidates = this.track.tracks.filter(
(c) =>
c.TrackType === TrackTypeRestrictionEnum.AUDIO ||
c.TrackType === TrackTypeRestrictionEnum.VIDEO
);
const parseErrors = new ParseCodecErrors();
for (const t of candidates) {
try {
await this.track.initTrack(t, this.);
} catch (e) {
parseErrors.cause.push(e as Error);
}
}
if (parseErrors.cause.length > 0) {
console.error(parseErrors);
}
}
}
export type SegmentComponent<T> = T & {
get segment(): SegmentSystem;
};
export function withSegment<T extends object>(
component: T,
segment: SegmentSystem
): SegmentComponent<T> {
const component_ = component as T & { segment: SegmentSystem };
component_.segment = segment;
return component_;
}
export class SegmentComponentSystemTrait<
E extends EbmlMasterTagType,
S extends Type<any>,
> {
segment: SegmentSystem;
get schema(): S {
throw new Error('unimplemented!');
}
constructor(segment: SegmentSystem) {
this.segment = segment;
}
componentFromTag(tag: E): SegmentComponent<InferType<S>> {
const extracted = convertEbmlTagToComponent(tag);
const result = this.schema(extracted) as
| (InferType<S> & { segment: SegmentSystem })
| ArkErrors;
if (result instanceof ArkErrors) {
const errors = result;
console.error(
'Parse component from tag error:',
tag.toDebugRecord(),
errors.flatProblemsByPath
);
throw errors;
}
result.segment = this.segment;
return result;
}
}
export class SeekSystem extends SegmentComponentSystemTrait<
EbmlSeekHeadTagType,
typeof SeekHeadSchema
> {
override get schema() {
return SeekHeadSchema;
}
seekHeads: SeekHeadType[] = [];
private offsetToTagMemo: Map<number, EbmlTagType> = new Map();
memoTag(tag: EbmlTagType) {
this.offsetToTagMemo.set(tag.startOffset, tag);
}
addSeekHeadTag(tag: EbmlSeekHeadTagType) {
const seekHead = this.componentFromTag(tag);
this.seekHeads.push(seekHead);
return seekHead;
}
offsetFromSeekPosition(position: number): number {
return position + this.segment.contentStartOffset;
}
seekTagByStartOffset(
startOffset: number | undefined
): EbmlTagType | undefined {
return startOffset! >= 0
? this.offsetToTagMemo.get(startOffset!)
: undefined;
}
seekOffsetBySeekId(seekId: Uint8Array): number | undefined {
const seekPosition = this.seekHeads[0]?.Seek?.find((c) =>
isEqual(c.SeekID, seekId)
)?.SeekPosition;
return seekPosition! >= 0
? this.offsetFromSeekPosition(seekPosition! as number)
: undefined;
}
seekTagBySeekId(seekId: Uint8Array): EbmlTagType | undefined {
return this.seekTagByStartOffset(this.seekOffsetBySeekId(seekId));
}
get firstClusterOffset() {
if (!this.segment.firstCluster) {
throw new UnreachableOrLogicError('first cluster not found');
}
return this.segment.firstCluster.startOffset;
}
}
export class InfoSystem extends SegmentComponentSystemTrait<
EbmlInfoTagType,
typeof InfoSchema
> {
override get schema() {
return InfoSchema;
}
info!: SegmentComponent<InfoType>;
prepareWithInfoTag(tag: EbmlInfoTagType) {
this.info = this.componentFromTag(tag);
return this;
}
}
export class ClusterSystem extends SegmentComponentSystemTrait<
EbmlClusterTagType,
typeof ClusterSchema
> {
override get schema() {
return ClusterSchema;
}
clustersBuffer: SegmentComponent<ClusterType>[] = [];
addClusterWithTag(tag: EbmlClusterTagType) {
const cluster = this.componentFromTag(tag);
this.clustersBuffer.push(cluster);
return cluster;
}
}
export interface GetTrackEntryOptions {
priority?: (v: SegmentComponent<TrackEntryType>) => number;
predicate?: (v: SegmentComponent<TrackEntryType>) => boolean;
}
export interface TrackState<Decoder, Config, Frame> {
decoder: Decoder;
configuration?: Config;
frameBuffer$: BehaviorSubject<Queue<Frame>>;
}
export class TrackSystem extends SegmentComponentSystemTrait<
EbmlTrackEntryTagType,
typeof TrackEntrySchema
> {
override get schema() {
return TrackEntrySchema;
}
tracks: SegmentComponent<TrackEntryType>[] = [];
videoTrackState = new WeakMap<
TrackEntryType,
TrackState<VideoDecoder, VideoDecoderConfig, VideoFrame>
>();
audioTrackState = new WeakMap<
TrackEntryType,
TrackState<AudioDecoder, AudioDecoderConfig, AudioData>
>();
getTrackEntry({
priority = (track) =>
(Number(!!track.FlagForced) << 4) + Number(!!track.FlagDefault),
predicate = (track) => track.FlagEnabled !== 0,
}: GetTrackEntryOptions) {
return this.tracks
.filter(predicate)
.toSorted((a, b) => priority(b) - priority(a))
.at(0);
}
prepareTracksWithTag(tag: EbmlTracksTagType) {
this.tracks = tag.children
.filter((c) => c.id === EbmlTagIdEnum.TrackEntry)
.map((c) => this.componentFromTag(c));
return this;
}
async initTrack(track: TrackEntryType) {
if (track.TrackType === TrackTypeRestrictionEnum.AUDIO) {
const configuration = audioCodecIdToWebCodecs(track);
if (await AudioDecoder.isConfigSupported(configuration)) {
throw new UnsupportedCodecError(configuration.codec, 'audio decoder');
}
const queue$ = new BehaviorSubject(new Queue<AudioData>());
this.audioTrackState.set(track, {
configuration,
decoder: new AudioDecoder({
output: (audioData) => {
const queue = queue$.getValue();
queue.enqueue(audioData);
queue$.next(queue);
},
error: (e) => {
queue$.error(e);
},
}),
frameBuffer$: queue$,
});
} else if (track.TrackType === TrackTypeRestrictionEnum.VIDEO) {
const configuration = videoCodecIdToWebCodecs(track, this.keyframe);
if (await VideoDecoder.isConfigSupported(configuration)) {
throw new UnsupportedCodecError(configuration.codec, 'audio decoder');
}
const queue$ = new BehaviorSubject(new Queue<VideoFrame>());
this.videoTrackState.set(track, {
configuration,
decoder: new VideoDecoder({
output: (audioData) => {
const queue = queue$.getValue();
queue.enqueue(audioData);
queue$.next(queue);
},
error: (e) => {
queue$.error(e);
},
}),
frameBuffer$: queue$,
});
}
}
}
export class CueSystem extends SegmentComponentSystemTrait<
EbmlCuePointTagType,
typeof CuePointSchema
> {
override get schema() {
return CuePointSchema;
}
cues: SegmentComponent<CuePointType>[] = [];
prepareCuesWithTag(tag: EbmlCuesTagType) {
this.cues = tag.children
.filter((c) => c.id === EbmlTagIdEnum.CuePoint)
.map(this.componentFromTag.bind(this));
return this;
}
findClosestCue(seekTime: number): CuePointType | undefined {
const cues = this.cues;
if (!cues || cues.length === 0) {
return undefined;
}
let left = 0;
let right = cues.length - 1;
if (seekTime <= cues[0].CueTime) {
return cues[0];
}
if (seekTime >= cues[right].CueTime) {
return cues[right];
}
while (left <= right) {
const mid = Math.floor((left + right) / 2);
if (cues[mid].CueTime === seekTime) {
return cues[mid];
}
if (cues[mid].CueTime < seekTime) {
left = mid + 1;
} else {
right = mid - 1;
}
}
const before = cues[right];
const after = cues[left];
return Math.abs((before.CueTime as number) - seekTime) <
Math.abs((after.CueTime as number) - seekTime)
? before
: after;
}
getCueTrackPositions(
cuePoint: CuePointType,
track?: number
): CueTrackPositionsType {
let cueTrackPositions: CueTrackPositionsType | undefined;
if (track! >= 0) {
cueTrackPositions = cuePoint.CueTrackPositions.find(
(c) => c.CueTrack === track
);
}
if (!cueTrackPositions) {
cueTrackPositions = maxBy(
cuePoint.CueTrackPositions,
(c) => c.CueClusterPosition
)!;
}
return cueTrackPositions;
}
get prepared(): boolean {
return this.cues.length > 0;
}
}
export class TagSystem extends SegmentComponentSystemTrait<
EbmlTagTagType,
typeof TagSchema
> {
override get schema() {
return TagSchema;
}
tags: SegmentComponent<TagType>[] = [];
prepareTagsWithTag(tag: EbmlTagsTagType) {
this.tags = tag.children
.filter((c) => c.id === EbmlTagIdEnum.Tag)
.map((c) => this.componentFromTag(c));
return this;
}
get prepared(): boolean {
return this.tags.length > 0;
}
}

View File

@ -10,12 +10,11 @@ import {
filter,
finalize,
from,
isEmpty,
isEmpty, last,
map,
merge,
Observable,
of,
reduce,
scan,
share,
shareReplay,
@ -28,51 +27,33 @@ import {
createRangedStream,
type CreateRangedStreamOptions,
} from '@konoplayer/core/data';
import {
type CueSystem,
SEEK_ID_KAX_CUES,
SEEK_ID_KAX_TAGS,
type SegmentComponent,
SegmentSystem,
} from './model';
import { isTagIdPos, waitTick } from './util';
import type { ClusterType } from './schema';
import {SEEK_ID_KAX_CUES, SEEK_ID_KAX_TAGS, type CueSystem, type SegmentComponent, SegmentSystem} from "./systems";
export interface CreateRangedEbmlStreamOptions
extends CreateRangedStreamOptions {
tee?: boolean;
}
export function createRangedEbmlStream({
url,
byteStart = 0,
byteEnd,
tee = false,
}: CreateRangedEbmlStreamOptions): Observable<{
ebml$: Observable<EbmlTagType>;
totalSize?: number;
response: Response;
body: ReadableStream<Uint8Array>;
controller: AbortController;
teeBody: ReadableStream<Uint8Array> | undefined;
}> {
const stream$ = from(createRangedStream({ url, byteStart, byteEnd }));
return stream$.pipe(
switchMap(({ controller, body, totalSize, response }) => {
let requestCompleted = false;
let teeStream: ReadableStream<Uint8Array> | undefined;
let stream: ReadableStream<Uint8Array>;
if (tee) {
[stream, teeStream] = body.tee();
} else {
stream = body;
}
const originRequest$ = new Observable<EbmlTagType>((subscriber) => {
stream
body
.pipeThrough(
new EbmlStreamDecoder({
streamStartOffset: byteStart,
@ -130,8 +111,7 @@ export function createRangedEbmlStream({
ebml$,
totalSize,
response,
body: stream,
teeBody: teeStream,
body,
controller,
});
})
@ -149,11 +129,10 @@ export function createEbmlController({
...options,
url,
byteStart: 0,
tee: true,
});
const controller$ = metaRequest$.pipe(
map(({ totalSize, ebml$, response, controller, teeBody }) => {
map(({ totalSize, ebml$, response, controller }) => {
const head$ = ebml$.pipe(
filter(isTagIdPos(EbmlTagIdEnum.EBML, EbmlTagPosition.End)),
take(1),
@ -174,36 +153,23 @@ export function createEbmlController({
*/
const segments$ = segmentStart$.pipe(
map((startTag) => {
const segment = new SegmentSystem(startTag, teeBody!);
const segment = new SegmentSystem(startTag);
const clusterSystem = segment.cluster;
const seekSystem = segment.seek;
const meta$ = ebml$.pipe(
const metaScan$ = ebml$.pipe(
scan(
(acc, tag) => {
// avoid object recreation
acc.hasKeyframe =
acc.hasKeyframe ||
(tag.id === EbmlTagIdEnum.SimpleBlock && tag.keyframe) ||
(tag.id === EbmlTagIdEnum.BlockGroup &&
tag.children.every(
(c) => c.id !== EbmlTagIdEnum.ReferenceBlock
));
acc.segment.scanMeta(tag);
acc.tag = tag;
return acc;
},
{ hasKeyframe: false, tag: undefined as unknown as EbmlTagType }
{
segment,
tag: undefined as unknown as EbmlTagType,
}
),
takeWhile(({ tag, hasKeyframe }) => {
return (
!isTagIdPos(EbmlTagIdEnum.Segment, EbmlTagPosition.End)(tag) &&
!(
isTagIdPos(EbmlTagIdEnum.Cluster, EbmlTagPosition.End)(tag) &&
hasKeyframe
)
);
}, true),
map(({ tag }) => tag),
takeWhile((acc) => acc.segment.canCompleteMeta(), true),
share({
resetOnComplete: false,
resetOnError: false,
@ -211,10 +177,13 @@ export function createEbmlController({
})
);
const withMeta$ = meta$.pipe(
reduce((segment, meta) => segment.scanMeta(meta), segment),
switchMap(() => segment.completeMeta()),
take(1),
const meta$ = metaScan$.pipe(
map(({ tag }) => tag)
);
const withMeta$ = metaScan$.pipe(
last(),
switchMap(({ segment }) => segment.completeMeta()),
shareReplay(1)
);

View File

@ -0,0 +1,20 @@
import type {EbmlClusterTagType} from "konoebml";
import {ClusterSchema, type ClusterType} from "../schema";
import {type SegmentComponent, SegmentComponentSystemTrait} from "./segment";
export class ClusterSystem extends SegmentComponentSystemTrait<
EbmlClusterTagType,
typeof ClusterSchema
> {
override get schema() {
return ClusterSchema;
}
clustersBuffer: SegmentComponent<ClusterType>[] = [];
addClusterWithTag(tag: EbmlClusterTagType) {
const cluster = this.componentFromTag(tag);
this.clustersBuffer.push(cluster);
return cluster;
}
}

View File

@ -0,0 +1,84 @@
import {type EbmlCuePointTagType, type EbmlCuesTagType, EbmlTagIdEnum} from "konoebml";
import {CuePointSchema, type CuePointType, type CueTrackPositionsType} from "../schema.ts";
import {maxBy} from "lodash-es";
import {type SegmentComponent, SegmentComponentSystemTrait} from "./segment.ts";
export class CueSystem extends SegmentComponentSystemTrait<
EbmlCuePointTagType,
typeof CuePointSchema
> {
override get schema() {
return CuePointSchema;
}
cues: SegmentComponent<CuePointType>[] = [];
prepareCuesWithTag(tag: EbmlCuesTagType) {
this.cues = tag.children
.filter((c) => c.id === EbmlTagIdEnum.CuePoint)
.map(this.componentFromTag.bind(this));
return this;
}
findClosestCue(seekTime: number): CuePointType | undefined {
const cues = this.cues;
if (!cues || cues.length === 0) {
return undefined;
}
let left = 0;
let right = cues.length - 1;
if (seekTime <= cues[0].CueTime) {
return cues[0];
}
if (seekTime >= cues[right].CueTime) {
return cues[right];
}
while (left <= right) {
const mid = Math.floor((left + right) / 2);
if (cues[mid].CueTime === seekTime) {
return cues[mid];
}
if (cues[mid].CueTime < seekTime) {
left = mid + 1;
} else {
right = mid - 1;
}
}
const before = cues[right];
const after = cues[left];
return Math.abs((before.CueTime as number) - seekTime) <
Math.abs((after.CueTime as number) - seekTime)
? before
: after;
}
getCueTrackPositions(
cuePoint: CuePointType,
track?: number
): CueTrackPositionsType {
let cueTrackPositions: CueTrackPositionsType | undefined;
if (track! >= 0) {
cueTrackPositions = cuePoint.CueTrackPositions.find(
(c) => c.CueTrack === track
);
}
if (!cueTrackPositions) {
cueTrackPositions = maxBy(
cuePoint.CueTrackPositions,
(c) => c.CueClusterPosition
)!;
}
return cueTrackPositions;
}
get prepared(): boolean {
return this.cues.length > 0;
}
}

View File

@ -0,0 +1,7 @@
export { TrackContext, AudioTrackContext, VideoTrackContext, DefaultTrackContext, type GetTrackEntryOptions, TrackSystem } from './track';
export { CueSystem } from './cue';
export { TagSystem } from './tag';
export { ClusterSystem } from './cluster';
export { InfoSystem } from './info';
export { type SegmentComponent, SegmentSystem, SegmentComponentSystemTrait, withSegment } from './segment';
export { SeekSystem, SEEK_ID_KAX_CUES, SEEK_ID_KAX_INFO, SEEK_ID_KAX_TAGS, SEEK_ID_KAX_TRACKS } from './seek';

View File

@ -0,0 +1,19 @@
import type {EbmlInfoTagType} from "konoebml";
import {InfoSchema, type InfoType} from "../schema.ts";
import {type SegmentComponent, SegmentComponentSystemTrait} from "./segment.ts";
export class InfoSystem extends SegmentComponentSystemTrait<
EbmlInfoTagType,
typeof InfoSchema
> {
override get schema() {
return InfoSchema;
}
info!: SegmentComponent<InfoType>;
prepareWithInfoTag(tag: EbmlInfoTagType) {
this.info = this.componentFromTag(tag);
return this;
}
}

View File

@ -0,0 +1,65 @@
import type {EbmlSeekHeadTagType, EbmlTagType} from "konoebml";
import {SeekHeadSchema, type SeekHeadType} from "../schema.ts";
import {isEqual} from "lodash-es";
import {UnreachableOrLogicError} from "@konoplayer/core/errors.ts";
import {SegmentComponentSystemTrait} from "./segment.ts";
export const SEEK_ID_KAX_INFO = new Uint8Array([0x15, 0x49, 0xa9, 0x66]);
export const SEEK_ID_KAX_TRACKS = new Uint8Array([0x16, 0x54, 0xae, 0x6b]);
export const SEEK_ID_KAX_CUES = new Uint8Array([0x1c, 0x53, 0xbb, 0x6b]);
export const SEEK_ID_KAX_TAGS = new Uint8Array([0x12, 0x54, 0xc3, 0x67]);
export class SeekSystem extends SegmentComponentSystemTrait<
EbmlSeekHeadTagType,
typeof SeekHeadSchema
> {
override get schema() {
return SeekHeadSchema;
}
seekHeads: SeekHeadType[] = [];
private offsetToTagMemo: Map<number, EbmlTagType> = new Map();
memoOffset(tag: EbmlTagType) {
this.offsetToTagMemo.set(tag.startOffset, tag);
}
addSeekHeadTag(tag: EbmlSeekHeadTagType) {
const seekHead = this.componentFromTag(tag);
this.seekHeads.push(seekHead);
return seekHead;
}
offsetFromSeekPosition(position: number): number {
return position + this.segment.contentStartOffset;
}
seekTagByStartOffset(
startOffset: number | undefined
): EbmlTagType | undefined {
return startOffset! >= 0
? this.offsetToTagMemo.get(startOffset!)
: undefined;
}
seekOffsetBySeekId(seekId: Uint8Array): number | undefined {
const seekPosition = this.seekHeads[0]?.Seek?.find((c) =>
isEqual(c.SeekID, seekId)
)?.SeekPosition;
return seekPosition! >= 0
? this.offsetFromSeekPosition(seekPosition! as number)
: undefined;
}
seekTagBySeekId(seekId: Uint8Array): EbmlTagType | undefined {
return this.seekTagByStartOffset(this.seekOffsetBySeekId(seekId));
}
get firstClusterOffset() {
if (!this.segment.firstCluster) {
throw new UnreachableOrLogicError('first cluster not found');
}
return this.segment.firstCluster.startOffset;
}
}

View File

@ -0,0 +1,156 @@
import {
type EbmlClusterTagType,
type EbmlMasterTagType,
type EbmlSegmentTagType,
EbmlTagIdEnum,
EbmlTagPosition,
type EbmlTagType
} from "konoebml";
import {ArkErrors, type Type} from "arktype";
import {convertEbmlTagToComponent, type InferType} from "../util.ts";
import {CueSystem} from "./cue.ts";
import {ClusterSystem} from "./cluster.ts";
import {SEEK_ID_KAX_CUES, SEEK_ID_KAX_INFO, SEEK_ID_KAX_TAGS, SEEK_ID_KAX_TRACKS, SeekSystem} from "./seek.ts";
import {InfoSystem} from "./info.ts";
import {TrackSystem} from "./track.ts";
import {TagSystem} from "./tag.ts";
import type {BlockGroupType} from "../schema.ts";
export class SegmentSystem {
startTag: EbmlSegmentTagType;
metaTags: EbmlTagType[] = [];
firstCluster: EbmlClusterTagType | undefined;
cue: CueSystem;
cluster: ClusterSystem;
seek: SeekSystem;
info: InfoSystem;
track: TrackSystem;
tag: TagSystem;
constructor(startNode: EbmlSegmentTagType) {
this.startTag = startNode;
this.cue = new CueSystem(this);
this.cluster = new ClusterSystem(this);
this.seek = new SeekSystem(this);
this.info = new InfoSystem(this);
this.track = new TrackSystem(this);
this.tag = new TagSystem(this);
}
get contentStartOffset() {
return this.startTag.startOffset + this.startTag.headerLength;
}
private seekLocal() {
const infoTag = this.seek.seekTagBySeekId(SEEK_ID_KAX_INFO);
const tracksTag = this.seek.seekTagBySeekId(SEEK_ID_KAX_TRACKS);
const cuesTag = this.seek.seekTagBySeekId(SEEK_ID_KAX_CUES);
const tagsTag = this.seek.seekTagBySeekId(SEEK_ID_KAX_TAGS);
if (cuesTag?.id === EbmlTagIdEnum.Cues) {
this.cue.prepareCuesWithTag(cuesTag);
}
if (infoTag?.id === EbmlTagIdEnum.Info) {
this.info.prepareWithInfoTag(infoTag);
}
if (tracksTag?.id === EbmlTagIdEnum.Tracks) {
this.track.prepareTracksWithTag(tracksTag);
}
if (tagsTag?.id === EbmlTagIdEnum.Tags) {
this.tag.prepareTagsWithTag(tagsTag);
}
}
scanMeta(tag: EbmlTagType) {
if (
tag.id === EbmlTagIdEnum.SeekHead &&
tag.position === EbmlTagPosition.End
) {
this.seek.addSeekHeadTag(tag);
}
this.metaTags.push(tag);
this.seek.memoOffset(tag);
if (tag.id === EbmlTagIdEnum.Cluster && !this.firstCluster) {
this.firstCluster = tag;
this.seekLocal();
}
if (this.firstCluster) {
if (tag.id === EbmlTagIdEnum.SimpleBlock && tag.keyframe) {
this.track.tryPeekKeyframe(tag);
} else if (tag.id === EbmlTagIdEnum.BlockGroup) {
const blockGroup = convertEbmlTagToComponent(tag) as BlockGroupType;
// keep frame
if (blockGroup && !blockGroup.ReferenceBlock && blockGroup.Block) {
this.track.tryPeekKeyframe(blockGroup.Block);
}
}
}
return this;
}
canCompleteMeta() {
const lastTag = this.metaTags.at(-1);
if (!lastTag) {
return false;
}
if (lastTag.id === EbmlTagIdEnum.Segment && lastTag.position === EbmlTagPosition.End) {
return true;
}
return !!(this.firstCluster && this.track.preparedToConfigureTracks());
}
async completeMeta() {
this.seekLocal();
await this.track.buildTracksConfiguration();
return this;
}
}
export type SegmentComponent<T> = T & {
get segment(): SegmentSystem;
};
export function withSegment<T extends object>(
component: T,
segment: SegmentSystem
): SegmentComponent<T> {
const component_ = component as T & { segment: SegmentSystem };
component_.segment = segment;
return component_;
}
export class SegmentComponentSystemTrait<
E extends EbmlMasterTagType,
S extends Type<any>,
> {
segment: SegmentSystem;
get schema(): S {
throw new Error('unimplemented!');
}
constructor(segment: SegmentSystem) {
this.segment = segment;
}
componentFromTag(tag: E): SegmentComponent<InferType<S>> {
const extracted = convertEbmlTagToComponent(tag);
const result = this.schema(extracted) as
| (InferType<S> & { segment: SegmentSystem })
| ArkErrors;
if (result instanceof ArkErrors) {
const errors = result;
console.error(
'Parse component from tag error:',
tag.toDebugRecord(),
errors.flatProblemsByPath
);
throw errors;
}
result.segment = this.segment;
return result;
}
}

View File

@ -0,0 +1,26 @@
import {EbmlTagIdEnum, type EbmlTagsTagType, type EbmlTagTagType} from "konoebml";
import {TagSchema, type TagType} from "../schema.ts";
import {type SegmentComponent, SegmentComponentSystemTrait} from "./segment.ts";
export class TagSystem extends SegmentComponentSystemTrait<
EbmlTagTagType,
typeof TagSchema
> {
override get schema() {
return TagSchema;
}
tags: SegmentComponent<TagType>[] = [];
prepareTagsWithTag(tag: EbmlTagsTagType) {
this.tags = tag.children
.filter((c) => c.id === EbmlTagIdEnum.Tag)
.map((c) => this.componentFromTag(c));
return this;
}
get prepared(): boolean {
return this.tags.length > 0;
}
}

View File

@ -0,0 +1,160 @@
import {ParseCodecErrors, UnsupportedCodecError} from "@konoplayer/core/errors.ts";
import {
EbmlTagIdEnum,
type EbmlTrackEntryTagType,
type EbmlTracksTagType
} from "konoebml";
import {
audioCodecIdToWebCodecs,
videoCodecIdRequirePeekingKeyframe,
videoCodecIdToWebCodecs, type AudioDecoderConfigExt, type VideoDecoderConfigExt
} from "../codecs";
import {TrackEntrySchema, type TrackEntryType, TrackTypeRestrictionEnum} from "../schema";
import {type SegmentComponent, SegmentComponentSystemTrait} from "./segment";
export interface GetTrackEntryOptions {
priority?: (v: SegmentComponent<TrackEntryType>) => number;
predicate?: (v: SegmentComponent<TrackEntryType>) => boolean;
}
export abstract class TrackContext {
peekingKeyframe?: Uint8Array;
trackEntry: TrackEntryType
constructor(trackEntry: TrackEntryType) {
this.trackEntry = trackEntry;
}
peekKeyframe (payload: Uint8Array) {
this.peekingKeyframe = payload;
}
preparedToConfigure () {
if (this.requirePeekKeyframe()) {
return !!this.peekingKeyframe;
}
return true;
}
abstract requirePeekKeyframe (): boolean;
abstract buildConfiguration (): Promise<void>;
}
export class DefaultTrackContext extends TrackContext {
override requirePeekKeyframe(): boolean {
return false;
}
override async buildConfiguration(): Promise<void> {}
}
export class VideoTrackContext extends TrackContext {
configuration!: VideoDecoderConfigExt;
override requirePeekKeyframe (): boolean {
return videoCodecIdRequirePeekingKeyframe(this.trackEntry.CodecID);
}
async buildConfiguration () {
const configuration = videoCodecIdToWebCodecs(this.trackEntry, this.peekingKeyframe);
if (await VideoDecoder.isConfigSupported(configuration)) {
throw new UnsupportedCodecError(configuration.codec, 'video decoder');
}
this.configuration = configuration;
}
}
export class AudioTrackContext extends TrackContext {
configuration!: AudioDecoderConfigExt;
override requirePeekKeyframe (): boolean {
return videoCodecIdRequirePeekingKeyframe(this.trackEntry.CodecID);
}
async buildConfiguration () {
const configuration = audioCodecIdToWebCodecs(this.trackEntry, this.peekingKeyframe);
if (await AudioDecoder.isConfigSupported(configuration)) {
throw new UnsupportedCodecError(configuration.codec, 'audio decoder');
}
this.configuration = configuration;
}
}
export class TrackSystem extends SegmentComponentSystemTrait<
EbmlTrackEntryTagType,
typeof TrackEntrySchema
> {
override get schema() {
return TrackEntrySchema;
}
tracks: SegmentComponent<TrackEntryType>[] = [];
trackContexts: Map<number | bigint, TrackContext> = new Map();
getTrackEntry({
priority = (track) =>
(Number(!!track.FlagForced) << 4) + Number(!!track.FlagDefault),
predicate = (track) => track.FlagEnabled !== 0,
}: GetTrackEntryOptions) {
return this.tracks
.filter(predicate)
.toSorted((a, b) => priority(b) - priority(a))
.at(0);
}
getTrackContext <T extends TrackContext>(options: GetTrackEntryOptions): T | undefined {
const trackEntry = this.getTrackEntry(options);
const trackNum = trackEntry?.TrackNumber!;
return this.trackContexts.get(trackNum) as T | undefined;
}
prepareTracksWithTag(tag: EbmlTracksTagType) {
this.tracks = tag.children
.filter((c) => c.id === EbmlTagIdEnum.TrackEntry)
.map((c) => this.componentFromTag(c));
for (const track of this.tracks) {
if (track.TrackType === TrackTypeRestrictionEnum.VIDEO) {
this.trackContexts.set(track.TrackNumber, new VideoTrackContext(track))
} else if (track.TrackType === TrackTypeRestrictionEnum.AUDIO) {
this.trackContexts.set(track.TrackNumber, new AudioTrackContext(track))
}
}
return this;
}
async buildTracksConfiguration () {
const parseErrors = new ParseCodecErrors();
for (const context of this.trackContexts.values()) {
try {
await context.buildConfiguration();
} catch (e) {
parseErrors.cause.push(e as Error);
}
}
if (parseErrors.cause.length > 0) {
console.error(parseErrors);
}
}
tryPeekKeyframe (tag: { track: number | bigint, frames: Uint8Array[] }) {
for (const c of this.trackContexts.values()) {
if (c.trackEntry.TrackNumber === tag.track) {
c.peekKeyframe(tag.frames?.[0])
}
}
}
preparedToConfigureTracks (): boolean {
for (const c of this.trackContexts.values()) {
if (!c.preparedToConfigure()) {
return false;
}
}
return true;
}
}