|
|
|
|
@@ -6,7 +6,7 @@ import { extname, join } from 'node:path';
|
|
|
|
|
import { Readable } from 'node:stream';
|
|
|
|
|
import { JOBS_ASSET_PAGINATION_SIZE } from 'src/constants';
|
|
|
|
|
import { StorageCore } from 'src/cores/storage.core';
|
|
|
|
|
import { OnJob } from 'src/decorators';
|
|
|
|
|
import { OnEvent, OnJob } from 'src/decorators';
|
|
|
|
|
import { GetUploadStatusDto, ResumeUploadDto, StartUploadDto } from 'src/dtos/asset-upload';
|
|
|
|
|
import { AuthDto } from 'src/dtos/auth.dto';
|
|
|
|
|
import {
|
|
|
|
|
@@ -14,11 +14,13 @@ import {
|
|
|
|
|
AssetStatus,
|
|
|
|
|
AssetType,
|
|
|
|
|
AssetVisibility,
|
|
|
|
|
ImmichWorker,
|
|
|
|
|
JobName,
|
|
|
|
|
JobStatus,
|
|
|
|
|
QueueName,
|
|
|
|
|
StorageFolder,
|
|
|
|
|
} from 'src/enum';
|
|
|
|
|
import { ArgOf } from 'src/repositories/event.repository';
|
|
|
|
|
import { BaseService } from 'src/services/base.service';
|
|
|
|
|
import { JobItem, JobOf } from 'src/types';
|
|
|
|
|
import { isAssetChecksumConstraint } from 'src/utils/database';
|
|
|
|
|
@@ -29,6 +31,24 @@ export const MAX_RUFH_INTEROP_VERSION = 8;
|
|
|
|
|
|
|
|
|
|
@Injectable()
|
|
|
|
|
export class AssetUploadService extends BaseService {
|
|
|
|
|
// This is used to proactively abort previous requests for the same asset
|
|
|
|
|
// when a new one arrives. The previous request still holds the asset lock
|
|
|
|
|
// and will prevent the new request from proceeding until the previous one
|
|
|
|
|
// times out. As normal client behavior will not have concurrent requests, we
|
|
|
|
|
// we can assume the previous request has already failed on the client end.
|
|
|
|
|
private activeRequests = new Map<string, { req: Readable; startTime: Date }>();
|
|
|
|
|
|
|
|
|
|
@OnEvent({ name: 'UploadAbort', workers: [ImmichWorker.Api] })
|
|
|
|
|
onUploadAbort({ assetId, abortTime }: ArgOf<'UploadAbort'>) {
|
|
|
|
|
const entry = this.activeRequests.get(assetId);
|
|
|
|
|
if (entry && abortTime > entry.startTime) {
|
|
|
|
|
this.activeRequests.delete(assetId);
|
|
|
|
|
entry.req.destroy();
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async startUpload(auth: AuthDto, req: Readable, res: Response, dto: StartUploadDto): Promise<void> {
|
|
|
|
|
this.logger.verboseFn(() => `Starting upload: ${JSON.stringify(dto)}`);
|
|
|
|
|
const { isComplete, assetData, uploadLength, contentLength, version } = dto;
|
|
|
|
|
@@ -57,15 +77,15 @@ export class AssetUploadService extends BaseService {
|
|
|
|
|
this.sendInterimResponse(res, location, version);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
this.addRequest(asset.id, req);
|
|
|
|
|
let checksumBuffer: Buffer | undefined;
|
|
|
|
|
const metadata = { id: asset.id, path: asset.path, size: contentLength, fileModifiedAt: assetData.fileModifiedAt };
|
|
|
|
|
const writeStream = this.pipe(req, res, metadata);
|
|
|
|
|
|
|
|
|
|
const writeStream = this.storageRepository.createOrAppendWriteStream(asset.path);
|
|
|
|
|
if (isComplete) {
|
|
|
|
|
const hash = createHash('sha1');
|
|
|
|
|
req.on('data', (data: Buffer) => hash.update(data));
|
|
|
|
|
writeStream.on('finish', () => (checksumBuffer = hash.digest()));
|
|
|
|
|
}
|
|
|
|
|
req.pipe(writeStream);
|
|
|
|
|
await new Promise((resolve, reject) => writeStream.on('finish', resolve).on('close', reject));
|
|
|
|
|
this.setCompleteHeader(res, dto.version, isComplete);
|
|
|
|
|
if (!isComplete) {
|
|
|
|
|
@@ -77,7 +97,7 @@ export class AssetUploadService extends BaseService {
|
|
|
|
|
return await this.sendChecksumMismatch(res, asset.id, asset.path);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
await this.onComplete(metadata);
|
|
|
|
|
await this.onComplete({ id: asset.id, path: asset.path, fileModifiedAt: assetData.fileModifiedAt });
|
|
|
|
|
res.status(200).send({ id: asset.id });
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -85,6 +105,7 @@ export class AssetUploadService extends BaseService {
|
|
|
|
|
this.logger.verboseFn(() => `Resuming upload for ${id}: ${JSON.stringify(dto)}`);
|
|
|
|
|
const { isComplete, uploadLength, uploadOffset, contentLength, version } = dto;
|
|
|
|
|
this.setCompleteHeader(res, version, false);
|
|
|
|
|
this.addRequest(id, req);
|
|
|
|
|
return this.databaseRepository.withUuidLock(id, async () => {
|
|
|
|
|
const completionData = await this.assetRepository.getCompletionMetadata(id, auth.user.id);
|
|
|
|
|
if (!completionData) {
|
|
|
|
|
@@ -117,8 +138,8 @@ export class AssetUploadService extends BaseService {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const metadata = { id, path, size: contentLength, fileModifiedAt };
|
|
|
|
|
const writeStream = this.pipe(req, res, metadata);
|
|
|
|
|
const writeStream = this.storageRepository.createOrAppendWriteStream(path);
|
|
|
|
|
req.pipe(writeStream);
|
|
|
|
|
await new Promise((resolve, reject) => writeStream.on('finish', resolve).on('close', reject));
|
|
|
|
|
this.setCompleteHeader(res, version, isComplete);
|
|
|
|
|
if (!isComplete) {
|
|
|
|
|
@@ -138,12 +159,13 @@ export class AssetUploadService extends BaseService {
|
|
|
|
|
return await this.sendChecksumMismatch(res, id, path);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
await this.onComplete(metadata);
|
|
|
|
|
await this.onComplete({ id, path, fileModifiedAt });
|
|
|
|
|
res.status(200).send({ id });
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
cancelUpload(auth: AuthDto, assetId: string, res: Response): Promise<void> {
|
|
|
|
|
this.abortExistingRequest(assetId);
|
|
|
|
|
return this.databaseRepository.withUuidLock(assetId, async () => {
|
|
|
|
|
const asset = await this.assetRepository.getCompletionMetadata(assetId, auth.user.id);
|
|
|
|
|
if (!asset) {
|
|
|
|
|
@@ -160,6 +182,7 @@ export class AssetUploadService extends BaseService {
|
|
|
|
|
|
|
|
|
|
async getUploadStatus(auth: AuthDto, res: Response, id: string, { version }: GetUploadStatusDto): Promise<void> {
|
|
|
|
|
this.logger.verboseFn(() => `Getting upload status for ${id} with version ${version}`);
|
|
|
|
|
this.abortExistingRequest(id);
|
|
|
|
|
return this.databaseRepository.withUuidLock(id, async () => {
|
|
|
|
|
const asset = await this.assetRepository.getCompletionMetadata(id, auth.user.id);
|
|
|
|
|
if (!asset) {
|
|
|
|
|
@@ -290,45 +313,24 @@ export class AssetUploadService extends BaseService {
|
|
|
|
|
await withRetry(() => this.assetRepository.removeAndDecrementQuota(assetId));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private pipe(req: Readable, res: Response, { id, path, size }: { id: string; path: string; size: number }) {
|
|
|
|
|
const writeStream = this.storageRepository.createOrAppendWriteStream(path);
|
|
|
|
|
writeStream.on('error', (error) => {
|
|
|
|
|
this.logger.error(`Failed to write chunk to ${path}: ${error.message}`);
|
|
|
|
|
if (!res.headersSent) {
|
|
|
|
|
res.status(500).send();
|
|
|
|
|
private addRequest(assetId: string, req: Readable) {
|
|
|
|
|
const addTime = new Date();
|
|
|
|
|
const activeRequest = { req, startTime: addTime };
|
|
|
|
|
this.abortExistingRequest(assetId, addTime);
|
|
|
|
|
this.activeRequests.set(assetId, activeRequest);
|
|
|
|
|
req.on('close', () => {
|
|
|
|
|
if (this.activeRequests.get(assetId)?.req === req) {
|
|
|
|
|
this.activeRequests.delete(assetId);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
req.on('error', (error) => {
|
|
|
|
|
this.logger.error(`Failed to read request body: ${error.message}`);
|
|
|
|
|
if (!res.headersSent) {
|
|
|
|
|
res.status(500).send();
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
let receivedLength = 0;
|
|
|
|
|
req.on('data', (data: Buffer) => {
|
|
|
|
|
if (receivedLength + data.length > size) {
|
|
|
|
|
writeStream.destroy();
|
|
|
|
|
void this.onCancel(id, path).catch((error: any) =>
|
|
|
|
|
this.logger.error(`Failed to remove ${id} after too much data: ${error.message}`),
|
|
|
|
|
);
|
|
|
|
|
if (!res.headersSent) {
|
|
|
|
|
res.status(400).send('Received more data than specified in content-length');
|
|
|
|
|
}
|
|
|
|
|
res.on('finish', () => req.destroy());
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
receivedLength += data.length;
|
|
|
|
|
if (!writeStream.write(data)) {
|
|
|
|
|
req.pause();
|
|
|
|
|
writeStream.once('drain', () => req.resume());
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
req.on('end', () => writeStream.end());
|
|
|
|
|
|
|
|
|
|
return writeStream;
|
|
|
|
|
private abortExistingRequest(assetId: string, abortTime = new Date()) {
|
|
|
|
|
const abortEvent = { assetId, abortTime };
|
|
|
|
|
// only emit if we didn't just abort it ourselves
|
|
|
|
|
if (!this.onUploadAbort(abortEvent)) {
|
|
|
|
|
this.eventRepository.serverSend('UploadAbort', abortEvent);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private sendInterimResponse({ socket }: Response, location: string, interopVersion: number): void {
|
|
|
|
|
@@ -373,11 +375,12 @@ export class AssetUploadService extends BaseService {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private validateQuota(auth: AuthDto, size: number): void {
|
|
|
|
|
if (auth.user.quotaSizeInBytes === null) {
|
|
|
|
|
const { quotaSizeInBytes: quotaLimit, quotaUsageInBytes: currentUsage } = auth.user;
|
|
|
|
|
if (quotaLimit === null) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (auth.user.quotaSizeInBytes < auth.user.quotaUsageInBytes + size) {
|
|
|
|
|
if (quotaLimit < currentUsage + size) {
|
|
|
|
|
throw new BadRequestException('Quota has been exceeded!');
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|