fix(server): sync files to disk (#26881)

Ensure that all files are flushed after they've been written.

At current, files are not explicitly flushed to disk, which can cause
data corruption. In extreme circumstances, it's possible that uploaded
files may not ever be persisted at all.
This commit is contained in:
Thomas
2026-03-17 11:33:43 +00:00
committed by GitHub
parent bba4a00eb1
commit 16749ff8ba
2 changed files with 49 additions and 61 deletions

View File

@@ -3,13 +3,16 @@ import { PATH_METADATA } from '@nestjs/common/constants';
import { Reflector } from '@nestjs/core'; import { Reflector } from '@nestjs/core';
import { transformException } from '@nestjs/platform-express/multer/multer/multer.utils'; import { transformException } from '@nestjs/platform-express/multer/multer/multer.utils';
import { NextFunction, RequestHandler } from 'express'; import { NextFunction, RequestHandler } from 'express';
import multer, { StorageEngine, diskStorage } from 'multer'; import multer from 'multer';
import { createHash, randomUUID } from 'node:crypto'; import { createHash, randomUUID } from 'node:crypto';
import { join } from 'node:path';
import { pipeline } from 'node:stream';
import { Observable } from 'rxjs'; import { Observable } from 'rxjs';
import { UploadFieldName } from 'src/dtos/asset-media.dto'; import { UploadFieldName } from 'src/dtos/asset-media.dto';
import { RouteKey } from 'src/enum'; import { RouteKey } from 'src/enum';
import { AuthRequest } from 'src/middleware/auth.guard'; import { AuthRequest } from 'src/middleware/auth.guard';
import { LoggingRepository } from 'src/repositories/logging.repository'; import { LoggingRepository } from 'src/repositories/logging.repository';
import { StorageRepository } from 'src/repositories/storage.repository';
import { AssetMediaService } from 'src/services/asset-media.service'; import { AssetMediaService } from 'src/services/asset-media.service';
import { ImmichFile, UploadFile, UploadFiles } from 'src/types'; import { ImmichFile, UploadFile, UploadFiles } from 'src/types';
import { asUploadRequest, mapToUploadFile } from 'src/utils/asset.util'; import { asUploadRequest, mapToUploadFile } from 'src/utils/asset.util';
@@ -26,8 +29,6 @@ export function getFiles(files: UploadFiles) {
}; };
} }
type DiskStorageCallback = (error: Error | null, result: string) => void;
type ImmichMulterFile = Express.Multer.File & { uuid: string }; type ImmichMulterFile = Express.Multer.File & { uuid: string };
interface Callback<T> { interface Callback<T> {
@@ -35,34 +36,21 @@ interface Callback<T> {
(error: null, result: T): void; (error: null, result: T): void;
} }
const callbackify = <T>(target: (...arguments_: any[]) => T, callback: Callback<T>) => {
try {
return callback(null, target());
} catch (error: Error | any) {
return callback(error);
}
};
@Injectable() @Injectable()
export class FileUploadInterceptor implements NestInterceptor { export class FileUploadInterceptor implements NestInterceptor {
private handlers: { private handlers: {
userProfile: RequestHandler; userProfile: RequestHandler;
assetUpload: RequestHandler; assetUpload: RequestHandler;
}; };
private defaultStorage: StorageEngine;
constructor( constructor(
private reflect: Reflector, private reflect: Reflector,
private assetService: AssetMediaService, private assetService: AssetMediaService,
private storageRepository: StorageRepository,
private logger: LoggingRepository, private logger: LoggingRepository,
) { ) {
this.logger.setContext(FileUploadInterceptor.name); this.logger.setContext(FileUploadInterceptor.name);
this.defaultStorage = diskStorage({
filename: this.filename.bind(this),
destination: this.destination.bind(this),
});
const instance = multer({ const instance = multer({
fileFilter: this.fileFilter.bind(this), fileFilter: this.fileFilter.bind(this),
storage: { storage: {
@@ -99,60 +87,60 @@ export class FileUploadInterceptor implements NestInterceptor {
} }
private fileFilter(request: AuthRequest, file: Express.Multer.File, callback: multer.FileFilterCallback) { private fileFilter(request: AuthRequest, file: Express.Multer.File, callback: multer.FileFilterCallback) {
return callbackify(() => this.assetService.canUploadFile(asUploadRequest(request, file)), callback); try {
} callback(null, this.assetService.canUploadFile(asUploadRequest(request, file)));
} catch (error: Error | any) {
private filename(request: AuthRequest, file: Express.Multer.File, callback: DiskStorageCallback) { callback(error);
return callbackify( }
() => this.assetService.getUploadFilename(asUploadRequest(request, file)),
callback as Callback<string>,
);
}
private destination(request: AuthRequest, file: Express.Multer.File, callback: DiskStorageCallback) {
return callbackify(
() => this.assetService.getUploadFolder(asUploadRequest(request, file)),
callback as Callback<string>,
);
} }
private handleFile(request: AuthRequest, file: Express.Multer.File, callback: Callback<Partial<ImmichFile>>) { private handleFile(request: AuthRequest, file: Express.Multer.File, callback: Callback<Partial<ImmichFile>>) {
(file as ImmichMulterFile).uuid = randomUUID();
request.on('error', (error) => { request.on('error', (error) => {
this.logger.warn('Request error while uploading file, cleaning up', error); this.logger.warn('Request error while uploading file, cleaning up', error);
this.assetService.onUploadError(request, file).catch(this.logger.error); this.assetService.onUploadError(request, file).catch(this.logger.error);
}); });
if (!this.isAssetUploadFile(file)) { try {
this.defaultStorage._handleFile(request, file, callback); (file as ImmichMulterFile).uuid = randomUUID();
return;
}
const hash = createHash('sha1'); const uploadRequest = asUploadRequest(request, file);
file.stream.on('data', (chunk) => hash.update(chunk));
this.defaultStorage._handleFile(request, file, (error, info) => { const path = join(
if (error) { this.assetService.getUploadFolder(uploadRequest),
hash.destroy(); this.assetService.getUploadFilename(uploadRequest),
callback(error); );
} else {
callback(null, { ...info, checksum: hash.digest() }); const writeStream = this.storageRepository.createWriteStream(path);
} const hash = file.fieldname === UploadFieldName.ASSET_DATA ? createHash('sha1') : null;
});
let size = 0;
file.stream.on('data', (chunk) => {
hash?.update(chunk);
size += chunk.length;
});
pipeline(file.stream, writeStream, (error) => {
if (error) {
hash?.destroy();
return callback(error);
}
callback(null, {
path,
size,
checksum: hash?.digest(),
});
});
} catch (error: Error | any) {
callback(error);
}
} }
private removeFile(request: AuthRequest, file: Express.Multer.File, callback: (error: Error | null) => void) { private removeFile(_request: AuthRequest, file: Express.Multer.File, callback: (error: Error | null) => void) {
this.defaultStorage._removeFile(request, file, callback); this.storageRepository
} .unlink(file.path)
.then(() => callback(null))
private isAssetUploadFile(file: Express.Multer.File) { .catch(callback);
switch (file.fieldname as UploadFieldName) {
case UploadFieldName.ASSET_DATA: {
return true;
}
}
return false;
} }
private getHandler(route: RouteKey) { private getHandler(route: RouteKey) {

View File

@@ -63,7 +63,7 @@ export class StorageRepository {
} }
createWriteStream(filepath: string): Writable { createWriteStream(filepath: string): Writable {
return createWriteStream(filepath, { flags: 'w' }); return createWriteStream(filepath, { flags: 'w', flush: true });
} }
createOrOverwriteFile(filepath: string, buffer: Buffer) { createOrOverwriteFile(filepath: string, buffer: Buffer) {