diff --git a/e2e/src/api/specs/asset-upload.e2e-spec.ts b/e2e/src/api/specs/asset-upload.e2e-spec.ts index 082b1a6599..b6e6839599 100644 --- a/e2e/src/api/specs/asset-upload.e2e-spec.ts +++ b/e2e/src/api/specs/asset-upload.e2e-spec.ts @@ -99,6 +99,9 @@ describe('/upload', () => { visibility: 'timeline', }), ); + const downloaded = await utils.downloadAsset(admin.accessToken, body.id); + expect(downloaded.size).toBe(content.byteLength); + expect(content.compare(await downloaded.bytes())).toBe(0); }); it('should create a complete upload with Upload-Incomplete: ?0 if version is 3', async () => { @@ -136,6 +139,99 @@ describe('/upload', () => { ); }); + it('should support conventional upload', async () => { + const content = randomBytes(1024); + + const checksum = createHash('sha1').update(content).digest('base64'); + const { status, headers, body } = await request(app) + .post('/upload') + .set('Authorization', `Bearer ${user.accessToken}`) + .set('X-Immich-Asset-Data', assetData) + .set('Repr-Digest', `sha=:${checksum}:`) + .set('Content-Type', 'image/jpeg') + .set('Upload-Length', '1024') + .send(content); + + expect(status).toBe(200); + expect(headers['upload-complete']).toBeUndefined(); + expect(headers['upload-incomplete']).toBeUndefined(); + expect(headers['location']).toBeUndefined(); + expect(body).toEqual(expect.objectContaining({ id: expect.any(String) })); + + const asset = await utils.getAssetInfo(user.accessToken, body.id); + expect(asset).toEqual( + expect.objectContaining({ + id: body.id, + checksum, + ownerId: user.userId, + exifInfo: expect.objectContaining({ fileSizeInByte: content.byteLength }), + originalFileName: 'test-image.jpg', + deviceAssetId: 'rufh', + deviceId: 'test', + isFavorite: true, + visibility: 'timeline', + }), + ); + }); + + it('overwrite partial duplicate if conventional upload', { timeout: 1000 }, async () => { + const content = randomBytes(10240); + + const checksum = createHash('sha1').update(content).digest('base64'); + + // simulate interrupted upload by starting a request and not completing it + const req = httpRequest({ + hostname: 'localhost', + port: 2285, + path: '/upload', + method: 'POST', + headers: { + Authorization: `Bearer ${user.accessToken}`, + 'X-Immich-Asset-Data': assetData, + 'Repr-Digest': `sha=:${checksum}:`, + 'Upload-Length': '1024', + 'Content-Length': '1024', + 'Content-Type': 'image/jpeg', + }, + }); + req.write(content.subarray(0, 512)); + + await setTimeout(50); + + const { status, headers, body } = await request(app) + .post('/upload') + .set('Authorization', `Bearer ${user.accessToken}`) + .set('X-Immich-Asset-Data', assetData) + .set('Repr-Digest', `sha=:${checksum}:`) + .set('Content-Type', 'image/jpeg') + .set('Upload-Length', '10240') + .send(content); + + expect(status).toBe(200); + expect(headers['upload-complete']).toBeUndefined(); + expect(headers['upload-incomplete']).toBeUndefined(); + expect(headers['location']).toBeUndefined(); + expect(body).toEqual(expect.objectContaining({ id: expect.any(String) })); + + const asset = await utils.getAssetInfo(user.accessToken, body.id); + expect(asset).toEqual( + expect.objectContaining({ + id: body.id, + checksum, + ownerId: user.userId, + exifInfo: expect.objectContaining({ fileSizeInByte: content.byteLength }), + originalFileName: 'test-image.jpg', + deviceAssetId: 'rufh', + deviceId: 'test', + isFavorite: true, + visibility: 'timeline', + }), + ); + const downloaded = await utils.downloadAsset(user.accessToken, body.id); + expect(downloaded.size).toBe(content.byteLength); + expect(content.compare(await downloaded.bytes())).toBe(0); + }); + it('should reject when Upload-Complete: ?1 with mismatching Content-Length and Upload-Length', async () => { const content = randomBytes(1000); @@ -789,6 +885,12 @@ describe('/upload', () => { .send(content.subarray(2000)); expect(secondResponse.status).toBe(200); + expect(secondResponse.headers['upload-complete']).toBe('?1'); + expect(secondResponse.body).toEqual(expect.objectContaining({ id: expect.any(String) })); + + const downloaded = await utils.downloadAsset(user.accessToken, secondResponse.body.id); + expect(downloaded.size).toBe(content.byteLength); + expect(content.compare(await downloaded.bytes())).toBe(0); }); }); diff --git a/e2e/src/utils.ts b/e2e/src/utils.ts index b33d6cb190..7ce271d9b0 100644 --- a/e2e/src/utils.ts +++ b/e2e/src/utils.ts @@ -561,6 +561,16 @@ export const utils = { await utils.waitForQueueFinish(accessToken, 'sidecar'); await utils.waitForQueueFinish(accessToken, 'metadataExtraction'); }, + + downloadAsset: async (accessToken: string, id: string) => { + const downloadedRes = await fetch(`${baseUrl}/api/assets/${id}/original`, { + headers: asBearerAuth(accessToken), + }); + if (!downloadedRes.ok) { + throw new Error(`Failed to download asset ${id}: ${downloadedRes.status} ${await downloadedRes.text()}`); + } + return await downloadedRes.blob(); + }, }; utils.initSdk(); diff --git a/server/src/controllers/asset-upload.controller.spec.ts b/server/src/controllers/asset-upload.controller.spec.ts index 10ad929561..9a9ed29cfe 100644 --- a/server/src/controllers/asset-upload.controller.spec.ts +++ b/server/src/controllers/asset-upload.controller.spec.ts @@ -59,10 +59,11 @@ describe(AssetUploadController.name, () => { expect(ctx.authenticate).toHaveBeenCalled(); }); - it('should require Upload-Draft-Interop-Version header', async () => { + it('should require at least version 3 of Upload-Draft-Interop-Version header if provided', async () => { const { status, body } = await request(ctx.getHttpServer()) .post('/upload') .set('X-Immich-Asset-Data', makeAssetData()) + .set('Upload-Draft-Interop-Version', '2') .set('Repr-Digest', checksum) .set('Upload-Complete', '?1') .set('Upload-Length', '1024') @@ -71,7 +72,7 @@ describe(AssetUploadController.name, () => { expect(status).toBe(400); expect(body).toEqual( expect.objectContaining({ - message: expect.arrayContaining(['version must be an integer number', 'version must not be less than 3']), + message: expect.arrayContaining(['version must not be less than 3']), }), ); }); @@ -102,17 +103,15 @@ describe(AssetUploadController.name, () => { expect(body).toEqual(expect.objectContaining({ message: 'Missing repr-digest header' })); }); - it('should require Upload-Complete header', async () => { + it('should allow conventional upload without Upload-Complete header', async () => { const { status, body } = await request(ctx.getHttpServer()) .post('/upload') - .set('Upload-Draft-Interop-Version', '8') .set('X-Immich-Asset-Data', makeAssetData()) .set('Repr-Digest', checksum) .set('Upload-Length', '1024') .send(buffer); - expect(status).toBe(400); - expect(body).toEqual(expect.objectContaining({ message: 'Expected valid upload-complete header' })); + expect(status).toBe(201); }); it('should require Upload-Length header for incomplete upload', async () => { @@ -255,7 +254,7 @@ describe(AssetUploadController.name, () => { .send(buffer); expect(status).toBe(400); - expect(body).toEqual(expect.objectContaining({ message: 'Expected valid upload-complete header' })); + expect(body).toEqual(expect.objectContaining({ message: 'upload-complete must be a structured boolean value' })); }); it('should validate Upload-Length is a positive integer', async () => { @@ -323,10 +322,11 @@ describe(AssetUploadController.name, () => { .patch(`/upload/${uploadId}`) .set('Upload-Draft-Interop-Version', '8') .set('Upload-Offset', '0') + .set('Content-Type', 'application/partial-upload') .send(Buffer.from('test')); expect(status).toBe(400); - expect(body).toEqual(expect.objectContaining({ message: 'Expected valid upload-complete header' })); + expect(body).toEqual(expect.objectContaining({ message: ['uploadComplete must be a boolean value'] })); }); it('should validate UUID parameter', async () => { diff --git a/server/src/dtos/asset-upload.dto.ts b/server/src/dtos/asset-upload.dto.ts index bab4d1e8a9..9f702e62a2 100644 --- a/server/src/dtos/asset-upload.dto.ts +++ b/server/src/dtos/asset-upload.dto.ts @@ -1,7 +1,7 @@ import { BadRequestException } from '@nestjs/common'; import { ApiProperty } from '@nestjs/swagger'; import { Expose, plainToInstance, Transform, Type } from 'class-transformer'; -import { Equals, IsInt, IsNotEmpty, IsString, Min, ValidateIf, ValidateNested } from 'class-validator'; +import { Equals, IsBoolean, IsInt, IsNotEmpty, IsString, Min, ValidateIf, ValidateNested } from 'class-validator'; import { ImmichHeader } from 'src/enum'; import { Optional, ValidateBoolean, ValidateDate } from 'src/validation'; import { parseDictionary } from 'structured-headers'; @@ -50,27 +50,22 @@ export class UploadAssetDataDto { iCloudId!: string; } -class BaseRufhHeadersDto { - @Expose({ name: Header.InteropVersion }) - @Min(3) - @IsInt() - @Type(() => Number) - version!: number; -} - -export class BaseUploadHeadersDto extends BaseRufhHeadersDto { +export class BaseUploadHeadersDto { @Expose({ name: Header.ContentLength }) @Min(0) @IsInt() @Type(() => Number) contentLength!: number; - - @Expose() - @Transform(({ obj }) => isUploadComplete(obj)) - uploadComplete!: boolean; } export class StartUploadDto extends BaseUploadHeadersDto { + @Expose({ name: Header.InteropVersion }) + @Optional() + @Min(3) + @IsInt() + @Type(() => Number) + version?: number; + @Expose({ name: ImmichHeader.AssetData }) @ValidateNested() @Transform(({ value }) => { @@ -121,15 +116,25 @@ export class StartUploadDto extends BaseUploadHeadersDto { } const contentLength = obj[Header.ContentLength]; - if (contentLength && isUploadComplete(obj)) { + if (contentLength && isUploadComplete(obj) !== false) { return Number(contentLength); } throw new BadRequestException(`Missing ${Header.UploadLength} header`); }) uploadLength!: number; + + @Expose() + @Transform(({ obj }) => isUploadComplete(obj)) + uploadComplete?: boolean; } export class ResumeUploadDto extends BaseUploadHeadersDto { + @Expose({ name: Header.InteropVersion }) + @Min(3) + @IsInt() + @Type(() => Number) + version!: number; + @Expose({ name: Header.ContentType }) @ValidateIf((o) => o.version && o.version >= 6) @Equals('application/partial-upload') @@ -147,9 +152,20 @@ export class ResumeUploadDto extends BaseUploadHeadersDto { @IsInt() @Type(() => Number) uploadOffset!: number; + + @Expose() + @IsBoolean() + @Transform(({ obj }) => isUploadComplete(obj)) + uploadComplete!: boolean; } -export class GetUploadStatusDto extends BaseRufhHeadersDto {} +export class GetUploadStatusDto { + @Expose({ name: Header.InteropVersion }) + @Min(3) + @IsInt() + @Type(() => Number) + version!: number; +} export class UploadOkDto { @ApiProperty() @@ -159,12 +175,14 @@ export class UploadOkDto { const STRUCTURED_TRUE = '?1'; const STRUCTURED_FALSE = '?0'; -function isUploadComplete(obj: any): boolean { +function isUploadComplete(obj: any) { const uploadComplete = obj[Header.UploadComplete]; if (uploadComplete === STRUCTURED_TRUE) { return true; } else if (uploadComplete === STRUCTURED_FALSE) { return false; + } else if (uploadComplete !== undefined) { + throw new BadRequestException('upload-complete must be a structured boolean value'); } const uploadIncomplete = obj[Header.UploadIncomplete]; @@ -172,6 +190,7 @@ function isUploadComplete(obj: any): boolean { return false; } else if (uploadIncomplete === STRUCTURED_FALSE) { return true; + } else if (uploadComplete !== undefined) { + throw new BadRequestException('upload-incomplete must be a structured boolean value'); } - throw new BadRequestException(`Expected valid ${Header.UploadComplete} header`); } diff --git a/server/src/services/asset-upload.service.spec.ts b/server/src/services/asset-upload.service.spec.ts index f2c146755a..0b0fd6499f 100644 --- a/server/src/services/asset-upload.service.spec.ts +++ b/server/src/services/asset-upload.service.spec.ts @@ -64,8 +64,6 @@ describe(AssetUploadService.name, () => { 1024, undefined, ); - - expect(mocks.storage.mkdir).toHaveBeenCalledWith(expect.stringContaining(authStub.user1.user.id)); }); it('should determine asset type from filename extension', async () => { diff --git a/server/src/services/asset-upload.service.ts b/server/src/services/asset-upload.service.ts index 71fe3487b8..b184a29c1b 100644 --- a/server/src/services/asset-upload.service.ts +++ b/server/src/services/asset-upload.service.ts @@ -2,8 +2,8 @@ import { BadRequestException, Injectable, InternalServerErrorException } from '@ import { Response } from 'express'; import { DateTime } from 'luxon'; import { createHash } from 'node:crypto'; -import { extname, join } from 'node:path'; -import { Readable } from 'node:stream'; +import { dirname, extname, join } from 'node:path'; +import { Readable, Writable } from 'node:stream'; import { SystemConfig } from 'src/config'; import { JOBS_ASSET_PAGINATION_SIZE } from 'src/constants'; import { StorageCore } from 'src/cores/storage.core'; @@ -55,6 +55,8 @@ export class AssetUploadService extends BaseService { async startUpload(auth: AuthDto, req: Readable, res: Response, dto: StartUploadDto): Promise { this.logger.verboseFn(() => `Starting upload: ${JSON.stringify(dto)}`); const { uploadComplete, assetData, uploadLength, contentLength, version } = dto; + const isComplete = uploadComplete !== false; + const isResumable = version && version >= 3 && uploadComplete !== undefined; const { backup } = await this.getConfig({ withCache: true }); const asset = await this.onStart(auth, dto); @@ -64,35 +66,49 @@ export class AssetUploadService extends BaseService { } const location = `/api/upload/${asset.id}`; - if (version <= MAX_RUFH_INTEROP_VERSION) { + if (isResumable) { this.sendInterimResponse(res, location, version, this.getUploadLimits(backup)); + // this is a 5xx to indicate the client should do offset retrieval and resume + res.status(500).send('Incomplete asset already exists'); + return; } - // this is a 5xx to indicate the client should do offset retrieval and resume - res.status(500).send('Incomplete asset already exists'); - return; } - if (uploadComplete && uploadLength !== contentLength) { + if (isComplete && uploadLength !== contentLength) { return this.sendInconsistentLength(res); } const location = `/api/upload/${asset.id}`; - if (version <= MAX_RUFH_INTEROP_VERSION) { + if (isResumable) { this.sendInterimResponse(res, location, version, this.getUploadLimits(backup)); } this.addRequest(asset.id, req); await this.databaseRepository.withUuidLock(asset.id, async () => { + // conventional upload, check status again with lock acquired before overwriting + if (asset.isDuplicate) { + const existingAsset = await this.assetRepository.getCompletionMetadata(asset.id, auth.user.id); + if (existingAsset?.status !== AssetStatus.Partial) { + return this.sendAlreadyCompleted(res); + } + } + await this.storageRepository.mkdir(dirname(asset.path)); + let checksumBuffer: Buffer | undefined; - const writeStream = this.pipe(req, asset.path, contentLength); - if (uploadComplete) { + const writeStream = asset.isDuplicate + ? this.storageRepository.createWriteStream(asset.path) + : this.storageRepository.createOrAppendWriteStream(asset.path); + this.pipe(req, writeStream, contentLength); + if (isComplete) { const hash = createHash('sha1'); req.on('data', (data: Buffer) => hash.update(data)); writeStream.on('finish', () => (checksumBuffer = hash.digest())); } await new Promise((resolve, reject) => writeStream.on('close', resolve).on('error', reject)); - this.setCompleteHeader(res, dto.version, uploadComplete); - if (!uploadComplete) { + if (isResumable) { + this.setCompleteHeader(res, version, uploadComplete); + } + if (!isComplete) { res.status(201).set('Location', location).setHeader('Upload-Limit', this.getUploadLimits(backup)).send(); return; } @@ -142,7 +158,8 @@ export class AssetUploadService extends BaseService { return; } - const writeStream = this.pipe(req, path, contentLength); + const writeStream = this.storageRepository.createOrAppendWriteStream(path); + this.pipe(req, writeStream, contentLength); await new Promise((resolve, reject) => writeStream.on('close', resolve).on('error', reject)); this.setCompleteHeader(res, version, uploadComplete); if (!uploadComplete) { @@ -300,7 +317,6 @@ export class AssetUploadService extends BaseService { return { id: duplicate.id, path, status: duplicate.status, isDuplicate: true }; } - await this.storageRepository.mkdir(folder); return { id: assetId, path, status: AssetStatus.Partial, isDuplicate: false }; } @@ -342,8 +358,7 @@ export class AssetUploadService extends BaseService { } } - private pipe(req: Readable, path: string, size: number) { - const writeStream = this.storageRepository.createOrAppendWriteStream(path); + private pipe(req: Readable, writeStream: Writable, size: number) { let receivedLength = 0; req.on('data', (data: Buffer) => { receivedLength += data.length; @@ -359,8 +374,6 @@ export class AssetUploadService extends BaseService { } writeStream.end(); }); - - return writeStream; } private sendInterimResponse({ socket }: Response, location: string, interopVersion: number, limits: string): void { @@ -427,12 +440,8 @@ export class AssetUploadService extends BaseService { } } - private setCompleteHeader(res: Response, interopVersion: number | null, isComplete: boolean): void { - if (!interopVersion) { - return; - } - - if (interopVersion > 3) { + private setCompleteHeader(res: Response, interopVersion: number | undefined, isComplete: boolean): void { + if (interopVersion === undefined || interopVersion > 3) { res.setHeader('Upload-Complete', isComplete ? '?1' : '?0'); } else { res.setHeader('Upload-Incomplete', isComplete ? '?0' : '?1');