reject on variant switch

This commit is contained in:
mertalev
2026-05-09 02:01:42 -04:00
parent 5fcaa6ed6b
commit 13f08a034f
2 changed files with 33 additions and 11 deletions

View File

@@ -303,6 +303,16 @@ describe(HlsService.name, () => {
});
});
it('rejects pending waiters for the previous variant on variant change', async () => {
mocks.storage.checkFileExists.mockResolvedValueOnce(false);
const pending = sut.getSegment(auth, assetId, sessionId, 0, 'seg_1.m4s');
await new Promise((resolve) => setImmediate(resolve));
await sut.getSegment(auth, assetId, sessionId, 1, 'seg_1.m4s');
await expect(pending).rejects.toThrow('Variant changed');
});
it('throws NotFoundException when the session does not exist', async () => {
mocks.videoStream.getSession.mockReset();
await expect(sut.getSegment(auth, assetId, sessionId, variantIndex, 'init.mp4')).rejects.toBeInstanceOf(

View File

@@ -21,12 +21,13 @@ import { ImmichFileResponse } from 'src/utils/file';
import { getOutputSize } from 'src/utils/media';
type AssetWithStreamInfo = { videoStream: VideoStreamInfo & { timeBase: number }; packets: VideoPacketInfo };
type ApiSession = { lastRequestedSegment: number | null; lastVariantIndex: number | null };
@Injectable()
export class HlsService extends BaseService {
private pendingSegments = new PendingEvents<'HlsSegmentResult'>({ timeoutMs: 15_000 });
private pendingSessions = new PendingEvents<'HlsSessionResult'>({ timeoutMs: 5000 });
private sessions = new Map<string, { lastRequestedSegment: number | null }>();
private sessions = new Map<string, ApiSession>();
@OnEvent({ name: 'HlsSessionResult', server: true, workers: [ImmichWorker.Api] })
onSessionResult(event: ArgOf<'HlsSessionResult'>) {
@@ -65,7 +66,7 @@ export class HlsService extends BaseService {
const sessionId = this.cryptoRepository.randomUUID();
this.websocketRepository.serverSend('HlsSessionRequest', { sessionId, assetId, ownerId: auth.user.id });
await this.pendingSessions.wait(sessionId);
this.sessions.set(sessionId, { lastRequestedSegment: null });
this.trackSession(sessionId);
return this.generateMainPlaylist(sessionId, ffmpeg, asset);
}
@@ -97,7 +98,8 @@ export class HlsService extends BaseService {
cacheControl: CacheControl.PrivateWithCache,
});
const segmentIndex = this.getSegmentIndex(sessionId, filename);
const apiSession = this.trackSession(sessionId, variantIndex);
const segmentIndex = this.getSegmentIndex(apiSession, filename);
this.websocketRepository.serverSend('HlsHeartbeat', { sessionId, variantIndex, segmentIndex });
if (await this.storageRepository.checkFileExists(path, constants.R_OK)) {
@@ -170,17 +172,27 @@ export class HlsService extends BaseService {
return `${sessionId}:${variantIndex}:${segmentIndex}`;
}
private getSegmentIndex(sessionId: string, filename: string) {
const existing = this.sessions.get(sessionId);
private getSegmentIndex(session: ApiSession, filename: string) {
if (filename.endsWith('.mp4')) {
return (existing?.lastRequestedSegment ?? -1) + 1;
return (session.lastRequestedSegment ?? -1) + 1;
}
const segmentIndex = Number.parseInt(HLS_SEGMENT_FILENAME_REGEX.exec(filename)![1]);
if (existing) {
existing.lastRequestedSegment = segmentIndex;
} else {
this.sessions.set(sessionId, { lastRequestedSegment: segmentIndex });
}
session.lastRequestedSegment = segmentIndex;
return segmentIndex;
}
private trackSession(id: string, variantIndex: number | null = null) {
const session = this.sessions.get(id);
if (!session) {
const newSession = { lastRequestedSegment: null, lastVariantIndex: variantIndex };
this.sessions.set(id, newSession);
return newSession;
}
if (session.lastVariantIndex !== null && session.lastVariantIndex !== variantIndex) {
this.pendingSegments.rejectByPrefix(`${id}:${session.lastVariantIndex}:`, 'Variant changed');
}
session.lastVariantIndex = variantIndex;
return session;
}
}