From caeba5063bd951d6929c4cc9cc3382d48d3b5baf Mon Sep 17 00:00:00 2001 From: Paul Makles Date: Tue, 10 Feb 2026 17:12:27 +0000 Subject: [PATCH] refactor(server): move database restores code into a service (#25918) * fix(server): use provided database name/username for restore & ensure name is not mangled fixes #25633 Signed-off-by: izzy * chore: add db switch back but with comments Signed-off-by: izzy * refactor: no need to restore database since it's not technically possible chore: late fallback for username in parameter builder Signed-off-by: izzy * chore: type fix Signed-off-by: izzy * refactor: move db backup code into service * test: check SQL sent to psql * chore: remove todo Signed-off-by: izzy --------- Signed-off-by: izzy --- server/src/app.module.ts | 2 + .../maintenance-worker.controller.ts | 10 +- .../maintenance-worker.service.spec.ts | 203 +---- .../maintenance/maintenance-worker.service.ts | 42 +- server/src/services/backup.service.spec.ts | 270 ------ server/src/services/backup.service.ts | 99 --- .../services/database-backup.service.spec.ts | 816 +++++++++++++++++- .../src/services/database-backup.service.ts | 565 +++++++++++- server/src/services/index.ts | 2 - server/src/utils/database-backups.ts | 460 +--------- server/test/utils.ts | 9 +- 11 files changed, 1379 insertions(+), 1099 deletions(-) delete mode 100644 server/src/services/backup.service.spec.ts delete mode 100644 server/src/services/backup.service.ts diff --git a/server/src/app.module.ts b/server/src/app.module.ts index 7d622ea23d..49b779ca18 100644 --- a/server/src/app.module.ts +++ b/server/src/app.module.ts @@ -33,6 +33,7 @@ import { WebsocketRepository } from 'src/repositories/websocket.repository'; import { services } from 'src/services'; import { AuthService } from 'src/services/auth.service'; import { CliService } from 'src/services/cli.service'; +import { DatabaseBackupService } from 'src/services/database-backup.service'; import { QueueService } from 'src/services/queue.service'; import { getKyselyConfig } from 'src/utils/database'; @@ -114,6 +115,7 @@ export class ApiModule extends BaseModule {} AppRepository, MaintenanceHealthRepository, MaintenanceWebsocketRepository, + DatabaseBackupService, MaintenanceWorkerService, ...commonMiddleware, { provide: APP_GUARD, useClass: MaintenanceAuthGuard }, diff --git a/server/src/maintenance/maintenance-worker.controller.ts b/server/src/maintenance/maintenance-worker.controller.ts index 72527e27c0..162fa27257 100644 --- a/server/src/maintenance/maintenance-worker.controller.ts +++ b/server/src/maintenance/maintenance-worker.controller.ts @@ -34,12 +34,14 @@ import { FilenameParamDto } from 'src/validation'; import type { DatabaseBackupController as _DatabaseBackupController } from 'src/controllers/database-backup.controller'; import type { ServerController as _ServerController } from 'src/controllers/server.controller'; import { DatabaseBackupDeleteDto, DatabaseBackupListResponseDto } from 'src/dtos/database-backup.dto'; +import { DatabaseBackupService } from 'src/services/database-backup.service'; @Controller() export class MaintenanceWorkerController { constructor( private logger: LoggingRepository, private service: MaintenanceWorkerService, + private databaseBackupService: DatabaseBackupService, ) {} /** @@ -61,7 +63,7 @@ export class MaintenanceWorkerController { @Get('admin/database-backups') @MaintenanceRoute() listDatabaseBackups(): Promise { - return this.service.listBackups(); + return this.databaseBackupService.listBackups(); } /** @@ -74,7 +76,7 @@ export class MaintenanceWorkerController { @Res() res: Response, @Next() next: NextFunction, ) { - await sendFile(res, next, () => this.service.downloadBackup(filename), this.logger); + await sendFile(res, next, () => this.databaseBackupService.downloadBackup(filename), this.logger); } /** @@ -83,7 +85,7 @@ export class MaintenanceWorkerController { @Delete('admin/database-backups') @MaintenanceRoute() async deleteDatabaseBackup(@Body() dto: DatabaseBackupDeleteDto): Promise { - return this.service.deleteBackup(dto.backups); + return this.databaseBackupService.deleteBackup(dto.backups); } /** @@ -96,7 +98,7 @@ export class MaintenanceWorkerController { @UploadedFile() file: Express.Multer.File, ): Promise { - return this.service.uploadBackup(file); + return this.databaseBackupService.uploadBackup(file); } @Get('admin/maintenance/status') diff --git a/server/src/maintenance/maintenance-worker.service.spec.ts b/server/src/maintenance/maintenance-worker.service.spec.ts index 9fd8f38fcb..1d5bee62b0 100644 --- a/server/src/maintenance/maintenance-worker.service.spec.ts +++ b/server/src/maintenance/maintenance-worker.service.spec.ts @@ -1,23 +1,18 @@ -import { BadRequestException, UnauthorizedException } from '@nestjs/common'; +import { UnauthorizedException } from '@nestjs/common'; import { SignJWT } from 'jose'; -import { DateTime } from 'luxon'; -import { PassThrough, Readable } from 'node:stream'; -import { StorageCore } from 'src/cores/storage.core'; -import { MaintenanceAction, StorageFolder, SystemMetadataKey } from 'src/enum'; +import { MaintenanceAction, SystemMetadataKey } from 'src/enum'; import { MaintenanceHealthRepository } from 'src/maintenance/maintenance-health.repository'; import { MaintenanceWebsocketRepository } from 'src/maintenance/maintenance-websocket.repository'; import { MaintenanceWorkerService } from 'src/maintenance/maintenance-worker.service'; -import { automock, AutoMocked, getMocks, mockDuplex, mockSpawn, ServiceMocks } from 'test/utils'; - -function* mockData() { - yield ''; -} +import { DatabaseBackupService } from 'src/services/database-backup.service'; +import { automock, AutoMocked, getMocks, ServiceMocks } from 'test/utils'; describe(MaintenanceWorkerService.name, () => { let sut: MaintenanceWorkerService; let mocks: ServiceMocks; let maintenanceWebsocketRepositoryMock: AutoMocked; let maintenanceHealthRepositoryMock: AutoMocked; + let databaseBackupServiceMock: AutoMocked; beforeEach(() => { mocks = getMocks(); @@ -29,6 +24,20 @@ describe(MaintenanceWorkerService.name, () => { args: [mocks.logger], strict: false, }); + databaseBackupServiceMock = automock(DatabaseBackupService, { + args: [ + mocks.logger, + mocks.storage, + mocks.config, + mocks.systemMetadata, + mocks.process, + mocks.database, + mocks.cron, + mocks.job, + maintenanceHealthRepositoryMock, + ], + strict: false, + }); sut = new MaintenanceWorkerService( mocks.logger as never, @@ -40,6 +49,7 @@ describe(MaintenanceWorkerService.name, () => { mocks.storage as never, mocks.process, mocks.database as never, + databaseBackupServiceMock, ); sut.mock({ @@ -310,17 +320,6 @@ describe(MaintenanceWorkerService.name, () => { describe('action: restore database', () => { beforeEach(() => { mocks.database.tryLock.mockResolvedValueOnce(true); - - mocks.storage.readdir.mockResolvedValue([]); - mocks.process.spawn.mockReturnValue(mockSpawn(0, 'data', '')); - mocks.process.spawnDuplexStream.mockImplementation(() => mockDuplex('command', 0, 'data', '')); - mocks.process.fork.mockImplementation(() => mockSpawn(0, 'Immich Server is listening', '')); - mocks.storage.rename.mockResolvedValue(); - mocks.storage.unlink.mockResolvedValue(); - mocks.storage.createPlainReadStream.mockReturnValue(Readable.from(mockData())); - mocks.storage.createWriteStream.mockReturnValue(new PassThrough()); - mocks.storage.createGzip.mockReturnValue(new PassThrough()); - mocks.storage.createGunzip.mockReturnValue(new PassThrough()); }); it('should update maintenance mode state', async () => { @@ -341,21 +340,7 @@ describe(MaintenanceWorkerService.name, () => { }); }); - it('should fail to restore invalid backup', async () => { - await sut.runAction({ - action: MaintenanceAction.RestoreDatabase, - restoreBackupFilename: 'filename', - }); - - expect(maintenanceWebsocketRepositoryMock.clientSend).toHaveBeenCalledWith('MaintenanceStatusV1', 'private', { - active: true, - action: MaintenanceAction.RestoreDatabase, - error: 'Error: Invalid backup file format!', - task: 'error', - }); - }); - - it('should successfully run a backup', async () => { + it('should defer to database backup service', async () => { await sut.runAction({ action: MaintenanceAction.RestoreDatabase, restoreBackupFilename: 'development-filename.sql', @@ -380,13 +365,10 @@ describe(MaintenanceWorkerService.name, () => { action: 'end', }, ); - - expect(maintenanceHealthRepositoryMock.checkApiHealth).toHaveBeenCalled(); - expect(mocks.process.spawnDuplexStream).toHaveBeenCalledTimes(3); }); - it('should fail if backup creation fails', async () => { - mocks.process.spawnDuplexStream.mockReturnValueOnce(mockDuplex('pg_dump', 1, '', 'error')); + it('should forward errors from database backup service', async () => { + databaseBackupServiceMock.restoreDatabaseBackup.mockRejectedValue('Sample error'); await sut.runAction({ action: MaintenanceAction.RestoreDatabase, @@ -396,149 +378,16 @@ describe(MaintenanceWorkerService.name, () => { expect(maintenanceWebsocketRepositoryMock.clientSend).toHaveBeenCalledWith('MaintenanceStatusV1', 'private', { active: true, action: MaintenanceAction.RestoreDatabase, - error: 'Error: pg_dump non-zero exit code (1)\nerror', + error: 'Sample error', task: 'error', }); - expect(maintenanceWebsocketRepositoryMock.clientSend).toHaveBeenLastCalledWith( - 'MaintenanceStatusV1', - expect.any(String), - expect.objectContaining({ - task: 'error', - }), - ); - }); - - it('should fail if restore itself fails', async () => { - mocks.process.spawnDuplexStream - .mockReturnValueOnce(mockDuplex('pg_dump', 0, 'data', '')) - .mockReturnValueOnce(mockDuplex('gzip', 0, 'data', '')) - .mockReturnValueOnce(mockDuplex('psql', 1, '', 'error')); - - await sut.runAction({ - action: MaintenanceAction.RestoreDatabase, - restoreBackupFilename: 'development-filename.sql', - }); - - expect(maintenanceWebsocketRepositoryMock.clientSend).toHaveBeenCalledWith('MaintenanceStatusV1', 'private', { + expect(maintenanceWebsocketRepositoryMock.clientSend).toHaveBeenCalledWith('MaintenanceStatusV1', 'public', { active: true, action: MaintenanceAction.RestoreDatabase, - error: 'Error: psql non-zero exit code (1)\nerror', + error: 'Something went wrong, see logs!', task: 'error', }); - - expect(maintenanceWebsocketRepositoryMock.clientSend).toHaveBeenLastCalledWith( - 'MaintenanceStatusV1', - expect.any(String), - expect.objectContaining({ - task: 'error', - }), - ); - }); - - it('should rollback if database migrations fail', async () => { - mocks.database.runMigrations.mockRejectedValue(new Error('Migrations Error')); - - await sut.runAction({ - action: MaintenanceAction.RestoreDatabase, - restoreBackupFilename: 'development-filename.sql', - }); - - expect(maintenanceWebsocketRepositoryMock.clientSend).toHaveBeenCalledWith('MaintenanceStatusV1', 'private', { - active: true, - action: MaintenanceAction.RestoreDatabase, - error: 'Error: Migrations Error', - task: 'error', - }); - - expect(maintenanceHealthRepositoryMock.checkApiHealth).toHaveBeenCalledTimes(0); - expect(mocks.process.spawnDuplexStream).toHaveBeenCalledTimes(4); - }); - - it('should rollback if API healthcheck fails', async () => { - maintenanceHealthRepositoryMock.checkApiHealth.mockRejectedValue(new Error('Health Error')); - - await sut.runAction({ - action: MaintenanceAction.RestoreDatabase, - restoreBackupFilename: 'development-filename.sql', - }); - - expect(maintenanceWebsocketRepositoryMock.clientSend).toHaveBeenCalledWith('MaintenanceStatusV1', 'private', { - active: true, - action: MaintenanceAction.RestoreDatabase, - error: 'Error: Health Error', - task: 'error', - }); - - expect(maintenanceHealthRepositoryMock.checkApiHealth).toHaveBeenCalled(); - expect(mocks.process.spawnDuplexStream).toHaveBeenCalledTimes(4); - }); - }); - - /** - * Backups - */ - - describe('listBackups', () => { - it('should give us all backups', async () => { - mocks.storage.readdir.mockResolvedValue([ - `immich-db-backup-${DateTime.fromISO('2025-07-25T11:02:16Z').toFormat("yyyyLLdd'T'HHmmss")}-v1.234.5-pg14.5.sql.gz.tmp`, - `immich-db-backup-${DateTime.fromISO('2025-07-27T11:01:16Z').toFormat("yyyyLLdd'T'HHmmss")}-v1.234.5-pg14.5.sql.gz`, - 'immich-db-backup-1753789649000.sql.gz', - `immich-db-backup-${DateTime.fromISO('2025-07-29T11:01:16Z').toFormat("yyyyLLdd'T'HHmmss")}-v1.234.5-pg14.5.sql.gz`, - ]); - mocks.storage.stat.mockResolvedValue({ size: 1024 } as any); - - await expect(sut.listBackups()).resolves.toMatchObject({ - backups: [ - { filename: 'immich-db-backup-20250729T110116-v1.234.5-pg14.5.sql.gz', filesize: 1024 }, - { filename: 'immich-db-backup-20250727T110116-v1.234.5-pg14.5.sql.gz', filesize: 1024 }, - { filename: 'immich-db-backup-1753789649000.sql.gz', filesize: 1024 }, - ], - }); - }); - }); - - describe('deleteBackup', () => { - it('should reject invalid file names', async () => { - await expect(sut.deleteBackup(['filename'])).rejects.toThrowError( - new BadRequestException('Invalid backup name!'), - ); - }); - - it('should unlink the target file', async () => { - await sut.deleteBackup(['filename.sql']); - expect(mocks.storage.unlink).toHaveBeenCalledTimes(1); - expect(mocks.storage.unlink).toHaveBeenCalledWith( - `${StorageCore.getBaseFolder(StorageFolder.Backups)}/filename.sql`, - ); - }); - }); - - describe('uploadBackup', () => { - it('should reject invalid file names', async () => { - await expect(sut.uploadBackup({ originalname: 'invalid backup' } as never)).rejects.toThrowError( - new BadRequestException('Invalid backup name!'), - ); - }); - - it('should write file', async () => { - await sut.uploadBackup({ originalname: 'path.sql.gz', buffer: 'buffer' } as never); - expect(mocks.storage.createOrOverwriteFile).toBeCalledWith('/data/backups/uploaded-path.sql.gz', 'buffer'); - }); - }); - - describe('downloadBackup', () => { - it('should reject invalid file names', () => { - expect(() => sut.downloadBackup('invalid backup')).toThrowError(new BadRequestException('Invalid backup name!')); - }); - - it('should get backup path', () => { - expect(sut.downloadBackup('hello.sql.gz')).toEqual( - expect.objectContaining({ - path: '/data/backups/hello.sql.gz', - }), - ); }); }); }); diff --git a/server/src/maintenance/maintenance-worker.service.ts b/server/src/maintenance/maintenance-worker.service.ts index 6415693733..9ceb3caa43 100644 --- a/server/src/maintenance/maintenance-worker.service.ts +++ b/server/src/maintenance/maintenance-worker.service.ts @@ -25,19 +25,11 @@ import { StorageRepository } from 'src/repositories/storage.repository'; import { SystemMetadataRepository } from 'src/repositories/system-metadata.repository'; import { type ApiService as _ApiService } from 'src/services/api.service'; import { type BaseService as _BaseService } from 'src/services/base.service'; -import { type DatabaseBackupService as _DatabaseBackupService } from 'src/services/database-backup.service'; +import { DatabaseBackupService } from 'src/services/database-backup.service'; import { type ServerService as _ServerService } from 'src/services/server.service'; import { type VersionService as _VersionService } from 'src/services/version.service'; import { MaintenanceModeState } from 'src/types'; import { getConfig } from 'src/utils/config'; -import { - deleteDatabaseBackup, - downloadDatabaseBackup, - listDatabaseBackups, - restoreDatabaseBackup, - uploadDatabaseBackup, -} from 'src/utils/database-backups'; -import { ImmichFileResponse } from 'src/utils/file'; import { createMaintenanceLoginUrl, detectPriorInstall } from 'src/utils/maintenance'; import { getExternalDomain } from 'src/utils/misc'; @@ -62,6 +54,7 @@ export class MaintenanceWorkerService { private storageRepository: StorageRepository, private processRepository: ProcessRepository, private databaseRepository: DatabaseRepository, + private databaseBackupService: DatabaseBackupService, ) { this.logger.setContext(this.constructor.name); } @@ -187,35 +180,6 @@ export class MaintenanceWorkerService { return '/usr/src/app/upload'; } - /** - * {@link _DatabaseBackupService.listBackups} - */ - async listBackups(): Promise<{ backups: { filename: string; filesize: number }[] }> { - const backups = await listDatabaseBackups(this.backupRepos); - return { backups }; - } - - /** - * {@link _DatabaseBackupService.deleteBackup} - */ - async deleteBackup(files: string[]): Promise { - return deleteDatabaseBackup(this.backupRepos, files); - } - - /** - * {@link _DatabaseBackupService.uploadBackup} - */ - async uploadBackup(file: Express.Multer.File): Promise { - return uploadDatabaseBackup(this.backupRepos, file); - } - - /** - * {@link _DatabaseBackupService.downloadBackup} - */ - downloadBackup(fileName: string): ImmichFileResponse { - return downloadDatabaseBackup(fileName); - } - private get secret() { if (!this.#secret) { throw new Error('Secret is not initialised yet.'); @@ -364,7 +328,7 @@ export class MaintenanceWorkerService { progress: 0, }); - await restoreDatabaseBackup(this.backupRepos, filename, (task, progress) => + await this.databaseBackupService.restoreDatabaseBackup(filename, (task, progress) => this.setStatus({ active: true, action: MaintenanceAction.RestoreDatabase, diff --git a/server/src/services/backup.service.spec.ts b/server/src/services/backup.service.spec.ts deleted file mode 100644 index ea80dd5759..0000000000 --- a/server/src/services/backup.service.spec.ts +++ /dev/null @@ -1,270 +0,0 @@ -import { DateTime } from 'luxon'; -import { PassThrough } from 'node:stream'; -import { defaults, SystemConfig } from 'src/config'; -import { StorageCore } from 'src/cores/storage.core'; -import { ImmichWorker, JobStatus, StorageFolder } from 'src/enum'; -import { BackupService } from 'src/services/backup.service'; -import { systemConfigStub } from 'test/fixtures/system-config.stub'; -import { mockDuplex, mockSpawn, newTestService, ServiceMocks } from 'test/utils'; -import { describe } from 'vitest'; - -describe(BackupService.name, () => { - let sut: BackupService; - let mocks: ServiceMocks; - - beforeEach(() => { - ({ sut, mocks } = newTestService(BackupService)); - }); - - it('should work', () => { - expect(sut).toBeDefined(); - }); - - describe('onBootstrapEvent', () => { - it('should init cron job and handle config changes', async () => { - mocks.database.tryLock.mockResolvedValue(true); - mocks.cron.create.mockResolvedValue(); - - await sut.onConfigInit({ newConfig: systemConfigStub.backupEnabled as SystemConfig }); - - expect(mocks.cron.create).toHaveBeenCalled(); - }); - - it('should not initialize backup database cron job when lock is taken', async () => { - mocks.database.tryLock.mockResolvedValue(false); - - await sut.onConfigInit({ newConfig: systemConfigStub.backupEnabled as SystemConfig }); - - expect(mocks.cron.create).not.toHaveBeenCalled(); - }); - - it('should not initialise backup database job when running on microservices', async () => { - mocks.config.getWorker.mockReturnValue(ImmichWorker.Microservices); - await sut.onConfigInit({ newConfig: systemConfigStub.backupEnabled as SystemConfig }); - - expect(mocks.cron.create).not.toHaveBeenCalled(); - }); - }); - - describe('onConfigUpdateEvent', () => { - beforeEach(async () => { - mocks.database.tryLock.mockResolvedValue(true); - mocks.cron.create.mockResolvedValue(); - - await sut.onConfigInit({ newConfig: defaults }); - }); - - it('should update cron job if backup is enabled', () => { - mocks.cron.update.mockResolvedValue(); - - sut.onConfigUpdate({ - oldConfig: defaults, - newConfig: { - backup: { - database: { - enabled: true, - cronExpression: '0 1 * * *', - }, - }, - } as SystemConfig, - }); - - expect(mocks.cron.update).toHaveBeenCalledWith({ name: 'backupDatabase', expression: '0 1 * * *', start: true }); - expect(mocks.cron.update).toHaveBeenCalled(); - }); - - it('should do nothing if instance does not have the backup database lock', async () => { - mocks.database.tryLock.mockResolvedValue(false); - await sut.onConfigInit({ newConfig: defaults }); - sut.onConfigUpdate({ newConfig: systemConfigStub.backupEnabled as SystemConfig, oldConfig: defaults }); - expect(mocks.cron.update).not.toHaveBeenCalled(); - }); - }); - - describe('cleanupDatabaseBackups', () => { - it('should do nothing if not reached keepLastAmount', async () => { - mocks.systemMetadata.get.mockResolvedValue(systemConfigStub.backupEnabled); - mocks.storage.readdir.mockResolvedValue(['immich-db-backup-1.sql.gz']); - await sut.cleanupDatabaseBackups(); - expect(mocks.storage.unlink).not.toHaveBeenCalled(); - }); - - it('should remove failed backup files', async () => { - mocks.systemMetadata.get.mockResolvedValue(systemConfigStub.backupEnabled); - //`immich-db-backup-${DateTime.now().toFormat("yyyyLLdd'T'HHmmss")}-v${serverVersion.toString()}-pg${databaseVersion.split(' ')[0]}.sql.gz.tmp`, - mocks.storage.readdir.mockResolvedValue([ - 'immich-db-backup-123.sql.gz.tmp', - `immich-db-backup-${DateTime.fromISO('2025-07-25T11:02:16Z').toFormat("yyyyLLdd'T'HHmmss")}-v1.234.5-pg14.5.sql.gz.tmp`, - `immich-db-backup-${DateTime.fromISO('2025-07-27T11:01:16Z').toFormat("yyyyLLdd'T'HHmmss")}-v1.234.5-pg14.5.sql.gz`, - `immich-db-backup-${DateTime.fromISO('2025-07-29T11:01:16Z').toFormat("yyyyLLdd'T'HHmmss")}-v1.234.5-pg14.5.sql.gz.tmp`, - ]); - await sut.cleanupDatabaseBackups(); - expect(mocks.storage.unlink).toHaveBeenCalledTimes(3); - expect(mocks.storage.unlink).toHaveBeenCalledWith( - `${StorageCore.getBaseFolder(StorageFolder.Backups)}/immich-db-backup-123.sql.gz.tmp`, - ); - expect(mocks.storage.unlink).toHaveBeenCalledWith( - `${StorageCore.getBaseFolder(StorageFolder.Backups)}/immich-db-backup-20250725T110216-v1.234.5-pg14.5.sql.gz.tmp`, - ); - expect(mocks.storage.unlink).toHaveBeenCalledWith( - `${StorageCore.getBaseFolder(StorageFolder.Backups)}/immich-db-backup-20250729T110116-v1.234.5-pg14.5.sql.gz.tmp`, - ); - }); - - it('should remove old backup files over keepLastAmount', async () => { - mocks.systemMetadata.get.mockResolvedValue(systemConfigStub.backupEnabled); - mocks.storage.readdir.mockResolvedValue(['immich-db-backup-1.sql.gz', 'immich-db-backup-2.sql.gz']); - await sut.cleanupDatabaseBackups(); - expect(mocks.storage.unlink).toHaveBeenCalledTimes(1); - expect(mocks.storage.unlink).toHaveBeenCalledWith( - `${StorageCore.getBaseFolder(StorageFolder.Backups)}/immich-db-backup-1.sql.gz`, - ); - }); - - it('should remove old backup files over keepLastAmount and failed backups', async () => { - mocks.systemMetadata.get.mockResolvedValue(systemConfigStub.backupEnabled); - mocks.storage.readdir.mockResolvedValue([ - `immich-db-backup-${DateTime.fromISO('2025-07-25T11:02:16Z').toFormat("yyyyLLdd'T'HHmmss")}-v1.234.5-pg14.5.sql.gz.tmp`, - `immich-db-backup-${DateTime.fromISO('2025-07-27T11:01:16Z').toFormat("yyyyLLdd'T'HHmmss")}-v1.234.5-pg14.5.sql.gz`, - 'immich-db-backup-1753789649000.sql.gz', - `immich-db-backup-${DateTime.fromISO('2025-07-29T11:01:16Z').toFormat("yyyyLLdd'T'HHmmss")}-v1.234.5-pg14.5.sql.gz`, - ]); - await sut.cleanupDatabaseBackups(); - expect(mocks.storage.unlink).toHaveBeenCalledTimes(3); - expect(mocks.storage.unlink).toHaveBeenCalledWith( - `${StorageCore.getBaseFolder(StorageFolder.Backups)}/immich-db-backup-1753789649000.sql.gz`, - ); - expect(mocks.storage.unlink).toHaveBeenCalledWith( - `${StorageCore.getBaseFolder(StorageFolder.Backups)}/immich-db-backup-20250725T110216-v1.234.5-pg14.5.sql.gz.tmp`, - ); - expect(mocks.storage.unlink).toHaveBeenCalledWith( - `${StorageCore.getBaseFolder(StorageFolder.Backups)}/immich-db-backup-20250727T110116-v1.234.5-pg14.5.sql.gz`, - ); - }); - }); - - describe('handleBackupDatabase', () => { - beforeEach(() => { - mocks.storage.readdir.mockResolvedValue([]); - mocks.process.spawn.mockReturnValue(mockSpawn(0, 'data', '')); - mocks.process.spawnDuplexStream.mockImplementation(() => mockDuplex('command', 0, 'data', '')); - mocks.storage.rename.mockResolvedValue(); - mocks.storage.unlink.mockResolvedValue(); - mocks.systemMetadata.get.mockResolvedValue(systemConfigStub.backupEnabled); - mocks.storage.createWriteStream.mockReturnValue(new PassThrough()); - }); - - it('should sanitize DB_URL (remove uselibpqcompat) before calling pg_dumpall', async () => { - // create a service instance with a URL connection that includes libpqcompat - const dbUrl = 'postgresql://postgres:pwd@host:5432/immich?sslmode=require&uselibpqcompat=true'; - const configMock = { - getEnv: () => ({ database: { config: { connectionType: 'url', url: dbUrl }, skipMigrations: false } }), - getWorker: () => ImmichWorker.Api, - isDev: () => false, - } as unknown as any; - - ({ sut, mocks } = newTestService(BackupService, { config: configMock })); - - mocks.storage.readdir.mockResolvedValue([]); - mocks.process.spawnDuplexStream.mockImplementation(() => mockDuplex('command', 0, 'data', '')); - mocks.storage.rename.mockResolvedValue(); - mocks.storage.unlink.mockResolvedValue(); - mocks.systemMetadata.get.mockResolvedValue(systemConfigStub.backupEnabled); - mocks.storage.createWriteStream.mockReturnValue(new PassThrough()); - mocks.database.getPostgresVersion.mockResolvedValue('14.10'); - - await sut.handleBackupDatabase(); - - expect(mocks.process.spawnDuplexStream).toHaveBeenCalled(); - const call = mocks.process.spawnDuplexStream.mock.calls[0]; - const args = call[1] as string[]; - expect(args).toMatchInlineSnapshot(` - [ - "postgresql://postgres:pwd@host:5432/immich?sslmode=require", - "--clean", - "--if-exists", - ] - `); - }); - - it('should run a database backup successfully', async () => { - const result = await sut.handleBackupDatabase(); - expect(result).toBe(JobStatus.Success); - expect(mocks.storage.createWriteStream).toHaveBeenCalled(); - }); - - it('should rename file on success', async () => { - const result = await sut.handleBackupDatabase(); - expect(result).toBe(JobStatus.Success); - expect(mocks.storage.rename).toHaveBeenCalled(); - }); - - it('should fail if pg_dump fails', async () => { - mocks.process.spawnDuplexStream.mockReturnValueOnce(mockDuplex('pg_dump', 1, '', 'error')); - await expect(sut.handleBackupDatabase()).rejects.toThrow('pg_dump non-zero exit code (1)'); - }); - - it('should not rename file if pgdump fails and gzip succeeds', async () => { - mocks.process.spawnDuplexStream.mockReturnValueOnce(mockDuplex('pg_dump', 1, '', 'error')); - await expect(sut.handleBackupDatabase()).rejects.toThrow('pg_dump non-zero exit code (1)'); - expect(mocks.storage.rename).not.toHaveBeenCalled(); - }); - - it('should fail if gzip fails', async () => { - mocks.process.spawnDuplexStream.mockReturnValueOnce(mockDuplex('pg_dump', 0, 'data', '')); - mocks.process.spawnDuplexStream.mockReturnValueOnce(mockDuplex('gzip', 1, '', 'error')); - await expect(sut.handleBackupDatabase()).rejects.toThrow('gzip non-zero exit code (1)'); - }); - - it('should fail if write stream fails', async () => { - mocks.storage.createWriteStream.mockImplementation(() => { - throw new Error('error'); - }); - await expect(sut.handleBackupDatabase()).rejects.toThrow('error'); - }); - - it('should fail if rename fails', async () => { - mocks.storage.rename.mockRejectedValue(new Error('error')); - await expect(sut.handleBackupDatabase()).rejects.toThrow('error'); - }); - - it('should ignore unlink failing and still return failed job status', async () => { - mocks.process.spawnDuplexStream.mockReturnValueOnce(mockDuplex('pg_dump', 1, '', 'error')); - mocks.storage.unlink.mockRejectedValue(new Error('error')); - await expect(sut.handleBackupDatabase()).rejects.toThrow('pg_dump non-zero exit code (1)'); - expect(mocks.storage.unlink).toHaveBeenCalled(); - }); - - it.each` - postgresVersion | expectedVersion - ${'14.10'} | ${14} - ${'14.10.3'} | ${14} - ${'14.10 (Debian 14.10-1.pgdg120+1)'} | ${14} - ${'15.3.3'} | ${15} - ${'16.4.2'} | ${16} - ${'17.15.1'} | ${17} - ${'18.0.0'} | ${18} - `( - `should use pg_dump $expectedVersion with postgres version $postgresVersion`, - async ({ postgresVersion, expectedVersion }) => { - mocks.database.getPostgresVersion.mockResolvedValue(postgresVersion); - await sut.handleBackupDatabase(); - expect(mocks.process.spawnDuplexStream).toHaveBeenCalledWith( - `/usr/lib/postgresql/${expectedVersion}/bin/pg_dump`, - expect.any(Array), - expect.any(Object), - ); - }, - ); - it.each` - postgresVersion - ${'13.99.99'} - ${'19.0.0'} - `(`should fail if postgres version $postgresVersion is not supported`, async ({ postgresVersion }) => { - mocks.database.getPostgresVersion.mockResolvedValue(postgresVersion); - const result = await sut.handleBackupDatabase(); - expect(mocks.process.spawn).not.toHaveBeenCalled(); - expect(result).toBe(JobStatus.Failed); - }); - }); -}); diff --git a/server/src/services/backup.service.ts b/server/src/services/backup.service.ts deleted file mode 100644 index 637e968929..0000000000 --- a/server/src/services/backup.service.ts +++ /dev/null @@ -1,99 +0,0 @@ -import { Injectable } from '@nestjs/common'; -import path from 'node:path'; -import { StorageCore } from 'src/cores/storage.core'; -import { OnEvent, OnJob } from 'src/decorators'; -import { DatabaseLock, ImmichWorker, JobName, JobStatus, QueueName, StorageFolder } from 'src/enum'; -import { ArgOf } from 'src/repositories/event.repository'; -import { BaseService } from 'src/services/base.service'; -import { - createDatabaseBackup, - isFailedDatabaseBackupName, - isValidDatabaseRoutineBackupName, - UnsupportedPostgresError, -} from 'src/utils/database-backups'; -import { handlePromiseError } from 'src/utils/misc'; - -@Injectable() -export class BackupService extends BaseService { - private backupLock = false; - - @OnEvent({ name: 'ConfigInit', workers: [ImmichWorker.Microservices] }) - async onConfigInit({ - newConfig: { - backup: { database }, - }, - }: ArgOf<'ConfigInit'>) { - this.backupLock = await this.databaseRepository.tryLock(DatabaseLock.BackupDatabase); - - if (this.backupLock) { - this.cronRepository.create({ - name: 'backupDatabase', - expression: database.cronExpression, - onTick: () => handlePromiseError(this.jobRepository.queue({ name: JobName.DatabaseBackup }), this.logger), - start: database.enabled, - }); - } - } - - @OnEvent({ name: 'ConfigUpdate', server: true }) - onConfigUpdate({ newConfig: { backup } }: ArgOf<'ConfigUpdate'>) { - if (!this.backupLock) { - return; - } - - this.cronRepository.update({ - name: 'backupDatabase', - expression: backup.database.cronExpression, - start: backup.database.enabled, - }); - } - - async cleanupDatabaseBackups() { - this.logger.debug(`Database Backup Cleanup Started`); - const { - backup: { database: config }, - } = await this.getConfig({ withCache: false }); - - const backupsFolder = StorageCore.getBaseFolder(StorageFolder.Backups); - const files = await this.storageRepository.readdir(backupsFolder); - const backups = files - .filter((filename) => isValidDatabaseRoutineBackupName(filename)) - .toSorted() - .toReversed(); - const failedBackups = files.filter((filename) => isFailedDatabaseBackupName(filename)); - - const toDelete = backups.slice(config.keepLastAmount); - toDelete.push(...failedBackups); - - for (const file of toDelete) { - await this.storageRepository.unlink(path.join(backupsFolder, file)); - } - this.logger.debug(`Database Backup Cleanup Finished, deleted ${toDelete.length} backups`); - } - - @OnJob({ name: JobName.DatabaseBackup, queue: QueueName.BackupDatabase }) - async handleBackupDatabase(): Promise { - try { - await createDatabaseBackup(this.backupRepos); - } catch (error) { - if (error instanceof UnsupportedPostgresError) { - return JobStatus.Failed; - } - - throw error; - } - - await this.cleanupDatabaseBackups(); - return JobStatus.Success; - } - - private get backupRepos() { - return { - logger: this.logger, - storage: this.storageRepository, - config: this.configRepository, - process: this.processRepository, - database: this.databaseRepository, - }; - } -} diff --git a/server/src/services/database-backup.service.spec.ts b/server/src/services/database-backup.service.spec.ts index 4d68b02325..9ca37200b7 100644 --- a/server/src/services/database-backup.service.spec.ts +++ b/server/src/services/database-backup.service.spec.ts @@ -1,23 +1,594 @@ import { BadRequestException } from '@nestjs/common'; import { DateTime } from 'luxon'; +import { PassThrough, Readable } from 'node:stream'; +import { defaults, SystemConfig } from 'src/config'; import { StorageCore } from 'src/cores/storage.core'; -import { StorageFolder } from 'src/enum'; +import { ImmichWorker, JobStatus, StorageFolder } from 'src/enum'; +import { MaintenanceHealthRepository } from 'src/maintenance/maintenance-health.repository'; import { DatabaseBackupService } from 'src/services/database-backup.service'; -import { MaintenanceService } from 'src/services/maintenance.service'; -import { newTestService, ServiceMocks } from 'test/utils'; +import { systemConfigStub } from 'test/fixtures/system-config.stub'; +import { automock, AutoMocked, getMocks, mockDuplex, mockSpawn, ServiceMocks } from 'test/utils'; -describe(MaintenanceService.name, () => { +describe(DatabaseBackupService.name, () => { let sut: DatabaseBackupService; let mocks: ServiceMocks; + let maintenanceHealthRepositoryMock: AutoMocked; beforeEach(() => { - ({ sut, mocks } = newTestService(DatabaseBackupService)); + mocks = getMocks(); + maintenanceHealthRepositoryMock = automock(MaintenanceHealthRepository, { + args: [mocks.logger], + strict: false, + }); + sut = new DatabaseBackupService( + mocks.logger as never, + mocks.storage as never, + mocks.config, + mocks.systemMetadata as never, + mocks.process, + mocks.database as never, + mocks.cron as never, + mocks.job as never, + maintenanceHealthRepositoryMock as never, + ); }); it('should work', () => { expect(sut).toBeDefined(); }); + describe('onBootstrapEvent', () => { + it('should init cron job and handle config changes', async () => { + mocks.database.tryLock.mockResolvedValue(true); + mocks.cron.create.mockResolvedValue(); + + await sut.onConfigInit({ newConfig: systemConfigStub.backupEnabled as SystemConfig }); + + expect(mocks.cron.create).toHaveBeenCalled(); + }); + + it('should not initialize backup database cron job when lock is taken', async () => { + mocks.database.tryLock.mockResolvedValue(false); + + await sut.onConfigInit({ newConfig: systemConfigStub.backupEnabled as SystemConfig }); + + expect(mocks.cron.create).not.toHaveBeenCalled(); + }); + + it('should not initialise backup database job when running on microservices', async () => { + mocks.config.getWorker.mockReturnValue(ImmichWorker.Microservices); + await sut.onConfigInit({ newConfig: systemConfigStub.backupEnabled as SystemConfig }); + + expect(mocks.cron.create).not.toHaveBeenCalled(); + }); + }); + + describe('onConfigUpdateEvent', () => { + beforeEach(async () => { + mocks.database.tryLock.mockResolvedValue(true); + mocks.cron.create.mockResolvedValue(); + + await sut.onConfigInit({ newConfig: defaults }); + }); + + it('should update cron job if backup is enabled', () => { + mocks.cron.update.mockResolvedValue(); + + sut.onConfigUpdate({ + oldConfig: defaults, + newConfig: { + backup: { + database: { + enabled: true, + cronExpression: '0 1 * * *', + }, + }, + } as SystemConfig, + }); + + expect(mocks.cron.update).toHaveBeenCalledWith({ name: 'backupDatabase', expression: '0 1 * * *', start: true }); + expect(mocks.cron.update).toHaveBeenCalled(); + }); + + it('should do nothing if instance does not have the backup database lock', async () => { + mocks.database.tryLock.mockResolvedValue(false); + await sut.onConfigInit({ newConfig: defaults }); + sut.onConfigUpdate({ newConfig: systemConfigStub.backupEnabled as SystemConfig, oldConfig: defaults }); + expect(mocks.cron.update).not.toHaveBeenCalled(); + }); + }); + + describe('cleanupDatabaseBackups', () => { + it('should do nothing if not reached keepLastAmount', async () => { + mocks.systemMetadata.get.mockResolvedValue(systemConfigStub.backupEnabled); + mocks.storage.readdir.mockResolvedValue(['immich-db-backup-1.sql.gz']); + await sut.cleanupDatabaseBackups(); + expect(mocks.storage.unlink).not.toHaveBeenCalled(); + }); + + it('should remove failed backup files', async () => { + mocks.systemMetadata.get.mockResolvedValue(systemConfigStub.backupEnabled); + //`immich-db-backup-${DateTime.now().toFormat("yyyyLLdd'T'HHmmss")}-v${serverVersion.toString()}-pg${databaseVersion.split(' ')[0]}.sql.gz.tmp`, + mocks.storage.readdir.mockResolvedValue([ + 'immich-db-backup-123.sql.gz.tmp', + `immich-db-backup-${DateTime.fromISO('2025-07-25T11:02:16Z').toFormat("yyyyLLdd'T'HHmmss")}-v1.234.5-pg14.5.sql.gz.tmp`, + `immich-db-backup-${DateTime.fromISO('2025-07-27T11:01:16Z').toFormat("yyyyLLdd'T'HHmmss")}-v1.234.5-pg14.5.sql.gz`, + `immich-db-backup-${DateTime.fromISO('2025-07-29T11:01:16Z').toFormat("yyyyLLdd'T'HHmmss")}-v1.234.5-pg14.5.sql.gz.tmp`, + ]); + await sut.cleanupDatabaseBackups(); + expect(mocks.storage.unlink).toHaveBeenCalledTimes(3); + expect(mocks.storage.unlink).toHaveBeenCalledWith( + `${StorageCore.getBaseFolder(StorageFolder.Backups)}/immich-db-backup-123.sql.gz.tmp`, + ); + expect(mocks.storage.unlink).toHaveBeenCalledWith( + `${StorageCore.getBaseFolder(StorageFolder.Backups)}/immich-db-backup-20250725T110216-v1.234.5-pg14.5.sql.gz.tmp`, + ); + expect(mocks.storage.unlink).toHaveBeenCalledWith( + `${StorageCore.getBaseFolder(StorageFolder.Backups)}/immich-db-backup-20250729T110116-v1.234.5-pg14.5.sql.gz.tmp`, + ); + }); + + it('should remove old backup files over keepLastAmount', async () => { + mocks.systemMetadata.get.mockResolvedValue(systemConfigStub.backupEnabled); + mocks.storage.readdir.mockResolvedValue(['immich-db-backup-1.sql.gz', 'immich-db-backup-2.sql.gz']); + await sut.cleanupDatabaseBackups(); + expect(mocks.storage.unlink).toHaveBeenCalledTimes(1); + expect(mocks.storage.unlink).toHaveBeenCalledWith( + `${StorageCore.getBaseFolder(StorageFolder.Backups)}/immich-db-backup-1.sql.gz`, + ); + }); + + it('should remove old backup files over keepLastAmount and failed backups', async () => { + mocks.systemMetadata.get.mockResolvedValue(systemConfigStub.backupEnabled); + mocks.storage.readdir.mockResolvedValue([ + `immich-db-backup-${DateTime.fromISO('2025-07-25T11:02:16Z').toFormat("yyyyLLdd'T'HHmmss")}-v1.234.5-pg14.5.sql.gz.tmp`, + `immich-db-backup-${DateTime.fromISO('2025-07-27T11:01:16Z').toFormat("yyyyLLdd'T'HHmmss")}-v1.234.5-pg14.5.sql.gz`, + 'immich-db-backup-1753789649000.sql.gz', + `immich-db-backup-${DateTime.fromISO('2025-07-29T11:01:16Z').toFormat("yyyyLLdd'T'HHmmss")}-v1.234.5-pg14.5.sql.gz`, + ]); + await sut.cleanupDatabaseBackups(); + expect(mocks.storage.unlink).toHaveBeenCalledTimes(3); + expect(mocks.storage.unlink).toHaveBeenCalledWith( + `${StorageCore.getBaseFolder(StorageFolder.Backups)}/immich-db-backup-1753789649000.sql.gz`, + ); + expect(mocks.storage.unlink).toHaveBeenCalledWith( + `${StorageCore.getBaseFolder(StorageFolder.Backups)}/immich-db-backup-20250725T110216-v1.234.5-pg14.5.sql.gz.tmp`, + ); + expect(mocks.storage.unlink).toHaveBeenCalledWith( + `${StorageCore.getBaseFolder(StorageFolder.Backups)}/immich-db-backup-20250727T110116-v1.234.5-pg14.5.sql.gz`, + ); + }); + }); + + describe('handleBackupDatabase / createDatabaseBackup', () => { + beforeEach(() => { + mocks.storage.readdir.mockResolvedValue([]); + mocks.process.spawn.mockReturnValue(mockSpawn(0, 'data', '')); + mocks.process.spawnDuplexStream.mockImplementation(() => mockDuplex()('command', 0, 'data', '')); + mocks.storage.rename.mockResolvedValue(); + mocks.storage.unlink.mockResolvedValue(); + mocks.systemMetadata.get.mockResolvedValue(systemConfigStub.backupEnabled); + mocks.storage.createWriteStream.mockReturnValue(new PassThrough()); + }); + + it('should sanitize DB_URL (remove uselibpqcompat) before calling pg_dumpall', async () => { + // create a service instance with a URL connection that includes libpqcompat + const dbUrl = 'postgresql://postgres:pwd@host:5432/immich?sslmode=require&uselibpqcompat=true'; + const configMock = { + getEnv: () => ({ database: { config: { connectionType: 'url', url: dbUrl }, skipMigrations: false } }), + getWorker: () => ImmichWorker.Api, + isDev: () => false, + } as unknown as any; + + sut = new DatabaseBackupService( + mocks.logger as never, + mocks.storage as never, + configMock as never, + mocks.systemMetadata as never, + mocks.process, + mocks.database as never, + mocks.cron as never, + mocks.job as never, + void 0 as never, + ); + + mocks.storage.readdir.mockResolvedValue([]); + mocks.process.spawnDuplexStream.mockImplementation(() => mockDuplex()('command', 0, 'data', '')); + mocks.storage.rename.mockResolvedValue(); + mocks.storage.unlink.mockResolvedValue(); + mocks.systemMetadata.get.mockResolvedValue(systemConfigStub.backupEnabled); + mocks.storage.createWriteStream.mockReturnValue(new PassThrough()); + mocks.database.getPostgresVersion.mockResolvedValue('14.10'); + + await sut.handleBackupDatabase(); + + expect(mocks.process.spawnDuplexStream).toHaveBeenCalled(); + const call = mocks.process.spawnDuplexStream.mock.calls[0]; + const args = call[1] as string[]; + expect(args).toMatchInlineSnapshot(` + [ + "postgresql://postgres:pwd@host:5432/immich?sslmode=require", + "--clean", + "--if-exists", + ] + `); + }); + + it('should run a database backup successfully', async () => { + const result = await sut.handleBackupDatabase(); + expect(result).toBe(JobStatus.Success); + expect(mocks.storage.createWriteStream).toHaveBeenCalled(); + }); + + it('should rename file on success', async () => { + const result = await sut.handleBackupDatabase(); + expect(result).toBe(JobStatus.Success); + expect(mocks.storage.rename).toHaveBeenCalled(); + }); + + it('should fail if pg_dump fails', async () => { + mocks.process.spawnDuplexStream.mockReturnValueOnce(mockDuplex()('pg_dump', 1, '', 'error')); + await expect(sut.handleBackupDatabase()).rejects.toThrow('pg_dump non-zero exit code (1)'); + }); + + it('should not rename file if pgdump fails and gzip succeeds', async () => { + mocks.process.spawnDuplexStream.mockReturnValueOnce(mockDuplex()('pg_dump', 1, '', 'error')); + await expect(sut.handleBackupDatabase()).rejects.toThrow('pg_dump non-zero exit code (1)'); + expect(mocks.storage.rename).not.toHaveBeenCalled(); + }); + + it('should fail if gzip fails', async () => { + mocks.process.spawnDuplexStream.mockReturnValueOnce(mockDuplex()('pg_dump', 0, 'data', '')); + mocks.process.spawnDuplexStream.mockReturnValueOnce(mockDuplex()('gzip', 1, '', 'error')); + await expect(sut.handleBackupDatabase()).rejects.toThrow('gzip non-zero exit code (1)'); + }); + + it('should fail if write stream fails', async () => { + mocks.storage.createWriteStream.mockImplementation(() => { + throw new Error('error'); + }); + await expect(sut.handleBackupDatabase()).rejects.toThrow('error'); + }); + + it('should fail if rename fails', async () => { + mocks.storage.rename.mockRejectedValue(new Error('error')); + await expect(sut.handleBackupDatabase()).rejects.toThrow('error'); + }); + + it('should ignore unlink failing and still return failed job status', async () => { + mocks.process.spawnDuplexStream.mockReturnValueOnce(mockDuplex()('pg_dump', 1, '', 'error')); + mocks.storage.unlink.mockRejectedValue(new Error('error')); + await expect(sut.handleBackupDatabase()).rejects.toThrow('pg_dump non-zero exit code (1)'); + expect(mocks.storage.unlink).toHaveBeenCalled(); + }); + + it.each` + postgresVersion | expectedVersion + ${'14.10'} | ${14} + ${'14.10.3'} | ${14} + ${'14.10 (Debian 14.10-1.pgdg120+1)'} | ${14} + ${'15.3.3'} | ${15} + ${'16.4.2'} | ${16} + ${'17.15.1'} | ${17} + ${'18.0.0'} | ${18} + `( + `should use pg_dump $expectedVersion with postgres version $postgresVersion`, + async ({ postgresVersion, expectedVersion }) => { + mocks.database.getPostgresVersion.mockResolvedValue(postgresVersion); + await sut.handleBackupDatabase(); + expect(mocks.process.spawnDuplexStream).toHaveBeenCalledWith( + `/usr/lib/postgresql/${expectedVersion}/bin/pg_dump`, + expect.any(Array), + expect.any(Object), + ); + }, + ); + it.each` + postgresVersion + ${'13.99.99'} + ${'19.0.0'} + `(`should fail if postgres version $postgresVersion is not supported`, async ({ postgresVersion }) => { + mocks.database.getPostgresVersion.mockResolvedValue(postgresVersion); + const result = await sut.handleBackupDatabase(); + expect(mocks.process.spawn).not.toHaveBeenCalled(); + expect(result).toBe(JobStatus.Failed); + }); + }); + + describe('buildPostgresLaunchArguments', () => { + describe('default config', () => { + it('should generate pg_dump arguments', async () => { + await expect(sut.buildPostgresLaunchArguments('pg_dump')).resolves.toMatchInlineSnapshot(` + { + "args": [ + "--username", + "postgres", + "--host", + "database", + "--port", + "5432", + "immich", + "--clean", + "--if-exists", + ], + "bin": "/usr/lib/postgresql/14/bin/pg_dump", + "databaseMajorVersion": 14, + "databasePassword": "postgres", + "databaseUsername": "postgres", + "databaseVersion": "14.10 (Debian 14.10-1.pgdg120+1)", + } + `); + }); + + it('should generate psql arguments', async () => { + await expect(sut.buildPostgresLaunchArguments('psql')).resolves.toMatchInlineSnapshot(` + { + "args": [ + "--username", + "postgres", + "--host", + "database", + "--port", + "5432", + "--dbname", + "immich", + "--echo-all", + "--output=/dev/null", + ], + "bin": "/usr/lib/postgresql/14/bin/psql", + "databaseMajorVersion": 14, + "databasePassword": "postgres", + "databaseUsername": "postgres", + "databaseVersion": "14.10 (Debian 14.10-1.pgdg120+1)", + } + `); + }); + + it('should generate psql (single transaction) arguments', async () => { + await expect(sut.buildPostgresLaunchArguments('psql', { singleTransaction: true })).resolves + .toMatchInlineSnapshot(` + { + "args": [ + "--username", + "postgres", + "--host", + "database", + "--port", + "5432", + "--dbname", + "immich", + "--single-transaction", + "--set", + "ON_ERROR_STOP=on", + "--echo-all", + "--output=/dev/null", + ], + "bin": "/usr/lib/postgresql/14/bin/psql", + "databaseMajorVersion": 14, + "databasePassword": "postgres", + "databaseUsername": "postgres", + "databaseVersion": "14.10 (Debian 14.10-1.pgdg120+1)", + } + `); + }); + }); + + describe('using custom parts', () => { + beforeEach(() => { + const configMock = { + getEnv: () => ({ + database: { + config: { + connectionType: 'parts', + host: 'myhost', + port: 1234, + username: 'mypg', + password: 'mypwd', + database: 'myimmich', + }, + skipMigrations: false, + }, + }), + getWorker: () => ImmichWorker.Api, + isDev: () => false, + } as unknown as any; + + sut = new DatabaseBackupService( + mocks.logger as never, + mocks.storage as never, + configMock as never, + mocks.systemMetadata as never, + mocks.process, + mocks.database as never, + mocks.cron as never, + mocks.job as never, + void 0 as never, + ); + }); + + it('should generate pg_dump arguments', async () => { + await expect(sut.buildPostgresLaunchArguments('pg_dump')).resolves.toMatchInlineSnapshot(` + { + "args": [ + "--username", + "mypg", + "--host", + "myhost", + "--port", + "1234", + "myimmich", + "--clean", + "--if-exists", + ], + "bin": "/usr/lib/postgresql/14/bin/pg_dump", + "databaseMajorVersion": 14, + "databasePassword": "mypwd", + "databaseUsername": "mypg", + "databaseVersion": "14.10 (Debian 14.10-1.pgdg120+1)", + } + `); + }); + + it('should generate psql (single transaction) arguments', async () => { + await expect(sut.buildPostgresLaunchArguments('psql', { singleTransaction: true })).resolves + .toMatchInlineSnapshot(` + { + "args": [ + "--username", + "mypg", + "--host", + "myhost", + "--port", + "1234", + "--dbname", + "myimmich", + "--single-transaction", + "--set", + "ON_ERROR_STOP=on", + "--echo-all", + "--output=/dev/null", + ], + "bin": "/usr/lib/postgresql/14/bin/psql", + "databaseMajorVersion": 14, + "databasePassword": "mypwd", + "databaseUsername": "mypg", + "databaseVersion": "14.10 (Debian 14.10-1.pgdg120+1)", + } + `); + }); + }); + + describe('using URL', () => { + beforeEach(() => { + const dbUrl = 'postgresql://mypg:mypwd@myhost:1234/myimmich?sslmode=require&uselibpqcompat=true'; + const configMock = { + getEnv: () => ({ database: { config: { connectionType: 'url', url: dbUrl }, skipMigrations: false } }), + getWorker: () => ImmichWorker.Api, + isDev: () => false, + } as unknown as any; + + sut = new DatabaseBackupService( + mocks.logger as never, + mocks.storage as never, + configMock as never, + mocks.systemMetadata as never, + mocks.process, + mocks.database as never, + mocks.cron as never, + mocks.job as never, + void 0 as never, + ); + }); + + it('should generate pg_dump arguments', async () => { + await expect(sut.buildPostgresLaunchArguments('pg_dump')).resolves.toMatchInlineSnapshot(` + { + "args": [ + "postgresql://mypg:mypwd@myhost:1234/myimmich?sslmode=require", + "--clean", + "--if-exists", + ], + "bin": "/usr/lib/postgresql/14/bin/pg_dump", + "databaseMajorVersion": 14, + "databasePassword": "mypwd", + "databaseUsername": "mypg", + "databaseVersion": "14.10 (Debian 14.10-1.pgdg120+1)", + } + `); + }); + + it('should generate psql (single transaction) arguments', async () => { + await expect(sut.buildPostgresLaunchArguments('psql', { singleTransaction: true })).resolves + .toMatchInlineSnapshot(` + { + "args": [ + "--dbname", + "postgresql://mypg:mypwd@myhost:1234/myimmich?sslmode=require", + "--single-transaction", + "--set", + "ON_ERROR_STOP=on", + "--echo-all", + "--output=/dev/null", + ], + "bin": "/usr/lib/postgresql/14/bin/psql", + "databaseMajorVersion": 14, + "databasePassword": "mypwd", + "databaseUsername": "mypg", + "databaseVersion": "14.10 (Debian 14.10-1.pgdg120+1)", + } + `); + }); + }); + + describe('using bad URL', () => { + beforeEach(() => { + const dbUrl = 'post://gresql://mypg:myp@wd@myhos:t:1234/myimmich?sslmode=require&uselibpqcompat=true'; + const configMock = { + getEnv: () => ({ database: { config: { connectionType: 'url', url: dbUrl }, skipMigrations: false } }), + getWorker: () => ImmichWorker.Api, + isDev: () => false, + } as unknown as any; + + sut = new DatabaseBackupService( + mocks.logger as never, + mocks.storage as never, + configMock as never, + mocks.systemMetadata as never, + mocks.process, + mocks.database as never, + mocks.cron as never, + mocks.job as never, + void 0 as never, + ); + }); + + it('should fallback to reasonable defaults', async () => { + await expect(sut.buildPostgresLaunchArguments('psql')).resolves.toMatchInlineSnapshot(` + { + "args": [ + "--dbname", + "post://gresql//mypg:myp@wd@myhos:t:1234/myimmich?sslmode=require", + "--echo-all", + "--output=/dev/null", + ], + "bin": "/usr/lib/postgresql/14/bin/psql", + "databaseMajorVersion": 14, + "databasePassword": "", + "databaseUsername": "", + "databaseVersion": "14.10 (Debian 14.10-1.pgdg120+1)", + } + `); + }); + }); + }); + + describe('uploadBackup', () => { + it('should reject invalid file names', async () => { + await expect(sut.uploadBackup({ originalname: 'invalid backup' } as never)).rejects.toThrowError( + new BadRequestException('Invalid backup name!'), + ); + }); + + it('should write file', async () => { + await sut.uploadBackup({ originalname: 'path.sql.gz', buffer: 'buffer' } as never); + expect(mocks.storage.createOrOverwriteFile).toBeCalledWith('/data/backups/uploaded-path.sql.gz', 'buffer'); + }); + }); + + describe('downloadBackup', () => { + it('should reject invalid file names', () => { + expect(() => sut.downloadBackup('invalid backup')).toThrowError(new BadRequestException('Invalid backup name!')); + }); + + it('should get backup path', () => { + expect(sut.downloadBackup('hello.sql.gz')).toEqual( + expect.objectContaining({ + path: '/data/backups/hello.sql.gz', + }), + ); + }); + }); + describe('listBackups', () => { it('should give us all backups', async () => { mocks.storage.readdir.mockResolvedValue([ @@ -54,30 +625,233 @@ describe(MaintenanceService.name, () => { }); }); - describe('uploadBackup', () => { - it('should reject invalid file names', async () => { - await expect(sut.uploadBackup({ originalname: 'invalid backup' } as never)).rejects.toThrowError( - new BadRequestException('Invalid backup name!'), + describe('restoreDatabaseBackup', () => { + beforeEach(() => { + mocks.storage.readdir.mockResolvedValue([]); + mocks.process.spawn.mockReturnValue(mockSpawn(0, 'data', '')); + mocks.process.spawnDuplexStream.mockImplementation(() => mockDuplex()('command', 0, 'data', '')); + mocks.process.fork.mockImplementation(() => mockSpawn(0, 'Immich Server is listening', '')); + mocks.storage.rename.mockResolvedValue(); + mocks.storage.unlink.mockResolvedValue(); + mocks.storage.createPlainReadStream.mockReturnValue(Readable.from(mockData())); + mocks.storage.createWriteStream.mockReturnValue(new PassThrough()); + mocks.storage.createGzip.mockReturnValue(new PassThrough()); + mocks.storage.createGunzip.mockReturnValue(new PassThrough()); + + const configMock = { + getEnv: () => ({ + database: { + config: { + connectionType: 'parts', + host: 'myhost', + port: 1234, + username: 'mypg', + password: 'mypwd', + database: 'myimmich', + }, + skipMigrations: false, + }, + }), + getWorker: () => ImmichWorker.Api, + isDev: () => false, + } as unknown as any; + + sut = new DatabaseBackupService( + mocks.logger as never, + mocks.storage as never, + configMock as never, + mocks.systemMetadata as never, + mocks.process, + mocks.database as never, + mocks.cron as never, + mocks.job as never, + maintenanceHealthRepositoryMock, ); }); - it('should write file', async () => { - await sut.uploadBackup({ originalname: 'path.sql.gz', buffer: 'buffer' } as never); - expect(mocks.storage.createOrOverwriteFile).toBeCalledWith('/data/backups/uploaded-path.sql.gz', 'buffer'); - }); - }); - - describe('downloadBackup', () => { - it('should reject invalid file names', () => { - expect(() => sut.downloadBackup('invalid backup')).toThrowError(new BadRequestException('Invalid backup name!')); + it('should fail to restore invalid backup', async () => { + await expect(sut.restoreDatabaseBackup('filename')).rejects.toThrowErrorMatchingInlineSnapshot( + `[Error: Invalid backup file format!]`, + ); }); - it('should get backup path', () => { - expect(sut.downloadBackup('hello.sql.gz')).toEqual( + it('should successfully restore a backup', async () => { + let writtenToPsql = ''; + + mocks.process.spawnDuplexStream.mockImplementationOnce(() => mockDuplex()('command', 0, 'data', '')); + mocks.process.spawnDuplexStream.mockImplementationOnce(() => mockDuplex()('command', 0, 'data', '')); + mocks.process.spawnDuplexStream.mockImplementationOnce(() => { + return mockDuplex((chunk) => (writtenToPsql += chunk))('command', 0, 'data', ''); + }); + + const progress = vitest.fn(); + await sut.restoreDatabaseBackup('development-filename.sql', progress); + + expect(progress).toHaveBeenCalledWith('backup', 0.05); + expect(progress).toHaveBeenCalledWith('migrations', 0.9); + + expect(maintenanceHealthRepositoryMock.checkApiHealth).toHaveBeenCalled(); + expect(mocks.process.spawnDuplexStream).toHaveBeenCalledTimes(3); + + expect(mocks.process.spawnDuplexStream).toHaveBeenLastCalledWith( + expect.stringMatching('/bin/psql'), + [ + '--username', + 'mypg', + '--host', + 'myhost', + '--port', + '1234', + '--dbname', + 'myimmich', + '--single-transaction', + '--set', + 'ON_ERROR_STOP=on', + '--echo-all', + '--output=/dev/null', + ], expect.objectContaining({ - path: '/data/backups/hello.sql.gz', + env: expect.objectContaining({ + PATH: expect.any(String), + PGPASSWORD: 'mypwd', + }), }), ); + + expect(writtenToPsql).toMatchInlineSnapshot(` + " + -- drop all other database connections + SELECT pg_terminate_backend(pid) + FROM pg_stat_activity + WHERE datname = current_database() + AND pid <> pg_backend_pid(); + + -- re-create the default schema + DROP SCHEMA public CASCADE; + CREATE SCHEMA public; + + -- restore access to schema + GRANT ALL ON SCHEMA public TO "mypg"; + GRANT ALL ON SCHEMA public TO public; + SELECT 1;" + `); + }); + + it('should generate pg_dumpall specific SQL instructions', async () => { + let writtenToPsql = ''; + + mocks.process.spawnDuplexStream.mockImplementationOnce(() => mockDuplex()('command', 0, 'data', '')); + mocks.process.spawnDuplexStream.mockImplementationOnce(() => mockDuplex()('command', 0, 'data', '')); + mocks.process.spawnDuplexStream.mockImplementationOnce(() => { + return mockDuplex((chunk) => (writtenToPsql += chunk))('command', 0, 'data', ''); + }); + + const progress = vitest.fn(); + await sut.restoreDatabaseBackup('development-v2.4.0-.sql', progress); + + expect(progress).toHaveBeenCalledWith('backup', 0.05); + expect(progress).toHaveBeenCalledWith('migrations', 0.9); + + expect(maintenanceHealthRepositoryMock.checkApiHealth).toHaveBeenCalled(); + expect(mocks.process.spawnDuplexStream).toHaveBeenCalledTimes(3); + + expect(mocks.process.spawnDuplexStream).toHaveBeenLastCalledWith( + expect.stringMatching('/bin/psql'), + [ + '--username', + 'mypg', + '--host', + 'myhost', + '--port', + '1234', + '--dbname', + 'myimmich', + '--echo-all', + '--output=/dev/null', + ], + expect.objectContaining({ + env: expect.objectContaining({ + PATH: expect.any(String), + PGPASSWORD: 'mypwd', + }), + }), + ); + + expect(writtenToPsql).toMatchInlineSnapshot(String.raw` + " + -- drop all other database connections + SELECT pg_terminate_backend(pid) + FROM pg_stat_activity + WHERE datname = current_database() + AND pid <> pg_backend_pid(); + + \c postgres + SELECT 1;" + `); + }); + + it('should fail if backup creation fails', async () => { + mocks.process.spawnDuplexStream.mockReturnValueOnce(mockDuplex()('pg_dump', 1, '', 'error')); + + const progress = vitest.fn(); + await expect(sut.restoreDatabaseBackup('development-filename.sql', progress)).rejects + .toThrowErrorMatchingInlineSnapshot(` + [Error: pg_dump non-zero exit code (1) + error] + `); + + expect(progress).toHaveBeenCalledWith('backup', 0.05); + }); + + it('should fail if restore itself fails', async () => { + mocks.process.spawnDuplexStream + .mockReturnValueOnce(mockDuplex()('pg_dump', 0, 'data', '')) + .mockReturnValueOnce(mockDuplex()('gzip', 0, 'data', '')) + .mockReturnValueOnce(mockDuplex()('psql', 1, '', 'error')); + + const progress = vitest.fn(); + await expect(sut.restoreDatabaseBackup('development-filename.sql', progress)).rejects + .toThrowErrorMatchingInlineSnapshot(` + [Error: psql non-zero exit code (1) + error] + `); + + expect(progress).toHaveBeenCalledWith('backup', 0.05); + }); + + it('should rollback if database migrations fail', async () => { + mocks.database.runMigrations.mockRejectedValue(new Error('Migrations Error')); + + const progress = vitest.fn(); + await expect( + sut.restoreDatabaseBackup('development-filename.sql', progress), + ).rejects.toThrowErrorMatchingInlineSnapshot(`[Error: Migrations Error]`); + + expect(progress).toHaveBeenCalledWith('backup', 0.05); + expect(progress).toHaveBeenCalledWith('migrations', 0.9); + + expect(maintenanceHealthRepositoryMock.checkApiHealth).toHaveBeenCalledTimes(0); + expect(mocks.process.spawnDuplexStream).toHaveBeenCalledTimes(4); + }); + + it('should rollback if API healthcheck fails', async () => { + maintenanceHealthRepositoryMock.checkApiHealth.mockRejectedValue(new Error('Health Error')); + + const progress = vitest.fn(); + await expect( + sut.restoreDatabaseBackup('development-filename.sql', progress), + ).rejects.toThrowErrorMatchingInlineSnapshot(`[Error: Health Error]`); + + expect(progress).toHaveBeenCalledWith('backup', 0.05); + expect(progress).toHaveBeenCalledWith('migrations', 0.9); + expect(progress).toHaveBeenCalledWith('rollback', 0); + + expect(maintenanceHealthRepositoryMock.checkApiHealth).toHaveBeenCalled(); + expect(mocks.process.spawnDuplexStream).toHaveBeenCalledTimes(4); }); }); }); + +function* mockData() { + yield 'SELECT 1;'; +} diff --git a/server/src/services/database-backup.service.ts b/server/src/services/database-backup.service.ts index 542e961b43..de7090fa83 100644 --- a/server/src/services/database-backup.service.ts +++ b/server/src/services/database-backup.service.ts @@ -1,43 +1,560 @@ -import { Injectable } from '@nestjs/common'; +import { BadRequestException, Injectable, Optional } from '@nestjs/common'; +import { debounce } from 'lodash'; +import { DateTime } from 'luxon'; +import path, { basename } from 'node:path'; +import { PassThrough, Readable, Writable } from 'node:stream'; +import { pipeline } from 'node:stream/promises'; +import semver from 'semver'; +import { serverVersion } from 'src/constants'; +import { StorageCore } from 'src/cores/storage.core'; +import { OnEvent, OnJob } from 'src/decorators'; import { DatabaseBackupListResponseDto } from 'src/dtos/database-backup.dto'; -import { BaseService } from 'src/services/base.service'; +import { CacheControl, DatabaseLock, ImmichWorker, JobName, JobStatus, QueueName, StorageFolder } from 'src/enum'; +import { MaintenanceHealthRepository } from 'src/maintenance/maintenance-health.repository'; +import { ConfigRepository } from 'src/repositories/config.repository'; +import { CronRepository } from 'src/repositories/cron.repository'; +import { DatabaseRepository } from 'src/repositories/database.repository'; +import { ArgOf } from 'src/repositories/event.repository'; +import { JobRepository } from 'src/repositories/job.repository'; +import { LoggingRepository } from 'src/repositories/logging.repository'; +import { ProcessRepository } from 'src/repositories/process.repository'; +import { StorageRepository } from 'src/repositories/storage.repository'; +import { SystemMetadataRepository } from 'src/repositories/system-metadata.repository'; +import { getConfig } from 'src/utils/config'; import { - deleteDatabaseBackup, - downloadDatabaseBackup, - listDatabaseBackups, - uploadDatabaseBackup, + findDatabaseBackupVersion, + isFailedDatabaseBackupName, + isValidDatabaseBackupName, + isValidDatabaseRoutineBackupName, + UnsupportedPostgresError, } from 'src/utils/database-backups'; import { ImmichFileResponse } from 'src/utils/file'; +import { handlePromiseError } from 'src/utils/misc'; -/** - * This service is available outside of maintenance mode to manage maintenance mode - */ @Injectable() -export class DatabaseBackupService extends BaseService { - async listBackups(): Promise { - const backups = await listDatabaseBackups(this.backupRepos); - return { backups }; +export class DatabaseBackupService { + constructor( + private readonly logger: LoggingRepository, + private readonly storageRepository: StorageRepository, + private readonly configRepository: ConfigRepository, + private readonly systemMetadataRepository: SystemMetadataRepository, + private readonly processRepository: ProcessRepository, + private readonly databaseRepository: DatabaseRepository, + @Optional() + private readonly cronRepository: CronRepository, + @Optional() + private readonly jobRepository: JobRepository, + @Optional() + private readonly maintenanceHealthRepository: MaintenanceHealthRepository, + ) { + this.logger.setContext(this.constructor.name); } - deleteBackup(files: string[]): Promise { - return deleteDatabaseBackup(this.backupRepos, files); + private backupLock = false; + + @OnEvent({ name: 'ConfigInit', workers: [ImmichWorker.Microservices] }) + async onConfigInit({ + newConfig: { + backup: { database }, + }, + }: ArgOf<'ConfigInit'>) { + if (!this.cronRepository || !this.jobRepository) { + return; + } + + this.backupLock = await this.databaseRepository.tryLock(DatabaseLock.BackupDatabase); + + if (this.backupLock) { + this.cronRepository.create({ + name: 'backupDatabase', + expression: database.cronExpression, + onTick: () => handlePromiseError(this.jobRepository.queue({ name: JobName.DatabaseBackup }), this.logger), + start: database.enabled, + }); + } + } + + @OnEvent({ name: 'ConfigUpdate', server: true }) + onConfigUpdate({ newConfig: { backup } }: ArgOf<'ConfigUpdate'>) { + if (!this.cronRepository || !this.jobRepository || !this.backupLock) { + return; + } + + this.cronRepository.update({ + name: 'backupDatabase', + expression: backup.database.cronExpression, + start: backup.database.enabled, + }); + } + + @OnJob({ name: JobName.DatabaseBackup, queue: QueueName.BackupDatabase }) + async handleBackupDatabase(): Promise { + try { + await this.createDatabaseBackup(); + } catch (error) { + if (error instanceof UnsupportedPostgresError) { + return JobStatus.Failed; + } + + throw error; + } + + await this.cleanupDatabaseBackups(); + return JobStatus.Success; + } + + async buildPostgresLaunchArguments( + bin: 'pg_dump' | 'pg_dumpall' | 'psql', + options: { + singleTransaction?: boolean; + } = {}, + ): Promise<{ + bin: string; + args: string[]; + databaseUsername: string; + databasePassword: string; + databaseVersion: string; + databaseMajorVersion?: number; + }> { + const { + database: { config: databaseConfig }, + } = this.configRepository.getEnv(); + const isUrlConnection = databaseConfig.connectionType === 'url'; + + const databaseVersion = await this.databaseRepository.getPostgresVersion(); + const databaseSemver = semver.coerce(databaseVersion); + const databaseMajorVersion = databaseSemver?.major; + + const args: string[] = []; + let databaseUsername; + + if (isUrlConnection) { + if (bin !== 'pg_dump') { + args.push('--dbname'); + } + + let url = databaseConfig.url; + if (URL.canParse(databaseConfig.url)) { + const parsedUrl = new URL(databaseConfig.url); + // remove known bad parameters + parsedUrl.searchParams.delete('uselibpqcompat'); + + databaseUsername = parsedUrl.username; + url = parsedUrl.toString(); + } + + // assume typical values if we can't parse URL or not present + databaseUsername ??= 'postgres'; + + args.push(url); + } else { + databaseUsername = databaseConfig.username; + + args.push( + '--username', + databaseUsername, + '--host', + databaseConfig.host, + '--port', + databaseConfig.port.toString(), + ); + + switch (bin) { + case 'pg_dumpall': { + args.push('--database'); + break; + } + case 'psql': { + args.push('--dbname'); + break; + } + } + + args.push(databaseConfig.database); + } + + switch (bin) { + case 'pg_dump': + case 'pg_dumpall': { + args.push('--clean', '--if-exists'); + break; + } + case 'psql': { + if (options.singleTransaction) { + args.push( + // don't commit any transaction on failure + '--single-transaction', + // exit with non-zero code on error + '--set', + 'ON_ERROR_STOP=on', + ); + } + + args.push( + // used for progress monitoring + '--echo-all', + '--output=/dev/null', + ); + break; + } + } + + if (!databaseMajorVersion || !databaseSemver || !semver.satisfies(databaseSemver, '>=14.0.0 <19.0.0')) { + this.logger.error(`Database Restore Failure: Unsupported PostgreSQL version: ${databaseVersion}`); + throw new UnsupportedPostgresError(databaseVersion); + } + + return { + bin: `/usr/lib/postgresql/${databaseMajorVersion}/bin/${bin}`, + args, + databaseUsername, + databasePassword: isUrlConnection ? new URL(databaseConfig.url).password : databaseConfig.password, + databaseVersion, + databaseMajorVersion, + }; + } + + async createDatabaseBackup(filenamePrefix: string = ''): Promise { + this.logger.debug(`Database Backup Started`); + + const { bin, args, databasePassword, databaseVersion, databaseMajorVersion } = + await this.buildPostgresLaunchArguments('pg_dump'); + + this.logger.log(`Database Backup Starting. Database Version: ${databaseMajorVersion}`); + + const filename = `${filenamePrefix}immich-db-backup-${DateTime.now().toFormat("yyyyLLdd'T'HHmmss")}-v${serverVersion.toString()}-pg${databaseVersion.split(' ')[0]}.sql.gz`; + const backupFilePath = path.join(StorageCore.getBaseFolder(StorageFolder.Backups), filename); + const temporaryFilePath = `${backupFilePath}.tmp`; + + try { + const pgdump = this.processRepository.spawnDuplexStream(bin, args, { + env: { + PATH: process.env.PATH, + PGPASSWORD: databasePassword, + }, + }); + + const gzip = this.processRepository.spawnDuplexStream('gzip', ['--rsyncable']); + const fileStream = this.storageRepository.createWriteStream(temporaryFilePath); + + await pipeline(pgdump, gzip, fileStream); + await this.storageRepository.rename(temporaryFilePath, backupFilePath); + } catch (error) { + this.logger.error(`Database Backup Failure: ${error}`); + await this.storageRepository + .unlink(temporaryFilePath) + .catch((error) => this.logger.error(`Failed to delete failed backup file: ${error}`)); + throw error; + } + + this.logger.log(`Database Backup Success`); + return backupFilePath; } async uploadBackup(file: Express.Multer.File): Promise { - return uploadDatabaseBackup(this.backupRepos, file); + const backupsFolder = StorageCore.getBaseFolder(StorageFolder.Backups); + const fn = basename(file.originalname); + if (!isValidDatabaseBackupName(fn)) { + throw new BadRequestException('Invalid backup name!'); + } + + const filePath = path.join(backupsFolder, `uploaded-${fn}`); + await this.storageRepository.createOrOverwriteFile(filePath, file.buffer); } downloadBackup(fileName: string): ImmichFileResponse { - return downloadDatabaseBackup(fileName); - } + if (!isValidDatabaseBackupName(fileName)) { + throw new BadRequestException('Invalid backup name!'); + } + + const filePath = path.join(StorageCore.getBaseFolder(StorageFolder.Backups), fileName); - private get backupRepos() { return { - logger: this.logger, - storage: this.storageRepository, - config: this.configRepository, - process: this.processRepository, - database: this.databaseRepository, + path: filePath, + fileName, + cacheControl: CacheControl.PrivateWithoutCache, + contentType: fileName.endsWith('.gz') ? 'application/gzip' : 'application/sql', }; } + + async listBackups(): Promise { + const backupsFolder = StorageCore.getBaseFolder(StorageFolder.Backups); + const files = await this.storageRepository.readdir(backupsFolder); + + const validFiles = files + .filter((fn) => isValidDatabaseBackupName(fn)) + .toSorted((a, b) => (a.startsWith('uploaded-') === b.startsWith('uploaded-') ? a.localeCompare(b) : 1)) + .toReversed(); + + const backups = await Promise.all( + validFiles.map(async (filename) => { + const stats = await this.storageRepository.stat(path.join(backupsFolder, filename)); + return { filename, filesize: stats.size }; + }), + ); + + return { + backups, + }; + } + + async deleteBackup(files: string[]): Promise { + const backupsFolder = StorageCore.getBaseFolder(StorageFolder.Backups); + + if (files.some((filename) => !isValidDatabaseBackupName(filename))) { + throw new BadRequestException('Invalid backup name!'); + } + + await Promise.all(files.map((filename) => this.storageRepository.unlink(path.join(backupsFolder, filename)))); + } + + async cleanupDatabaseBackups() { + this.logger.debug(`Database Backup Cleanup Started`); + const { + backup: { database: config }, + } = await getConfig( + { + configRepo: this.configRepository, + metadataRepo: this.systemMetadataRepository, + logger: this.logger, + }, + { + withCache: false, + }, + ); + + const backupsFolder = StorageCore.getBaseFolder(StorageFolder.Backups); + const files = await this.storageRepository.readdir(backupsFolder); + const backups = files + .filter((filename) => isValidDatabaseRoutineBackupName(filename)) + .toSorted() + .toReversed(); + const failedBackups = files.filter((filename) => isFailedDatabaseBackupName(filename)); + + const toDelete = backups.slice(config.keepLastAmount); + toDelete.push(...failedBackups); + + for (const file of toDelete) { + await this.storageRepository.unlink(path.join(backupsFolder, file)); + } + + this.logger.debug(`Database Backup Cleanup Finished, deleted ${toDelete.length} backups`); + } + + async restoreDatabaseBackup( + filename: string, + progressCb?: (action: 'backup' | 'restore' | 'migrations' | 'rollback', progress: number) => void, + ): Promise { + this.logger.debug(`Database Restore Started`); + + let complete = false; + try { + if (!isValidDatabaseBackupName(filename)) { + throw new Error('Invalid backup file format!'); + } + + const backupFilePath = path.join(StorageCore.getBaseFolder(StorageFolder.Backups), filename); + await this.storageRepository.stat(backupFilePath); // => check file exists + + let isPgClusterDump = false; + const version = findDatabaseBackupVersion(filename); + if (version && semver.satisfies(version, '<= 2.4')) { + isPgClusterDump = true; + } + + const { bin, args, databaseUsername, databasePassword, databaseMajorVersion } = + await this.buildPostgresLaunchArguments('psql', { + singleTransaction: !isPgClusterDump, + }); + + progressCb?.('backup', 0.05); + + const restorePointFilePath = await this.createDatabaseBackup('restore-point-'); + + this.logger.log(`Database Restore Starting. Database Version: ${databaseMajorVersion}`); + + let inputStream: Readable; + if (backupFilePath.endsWith('.gz')) { + const fileStream = this.storageRepository.createPlainReadStream(backupFilePath); + const gunzip = this.storageRepository.createGunzip(); + fileStream.pipe(gunzip); + inputStream = gunzip; + } else { + inputStream = this.storageRepository.createPlainReadStream(backupFilePath); + } + + const sqlStream = Readable.from(sql(inputStream, databaseUsername, isPgClusterDump)); + const psql = this.processRepository.spawnDuplexStream(bin, args, { + env: { + PATH: process.env.PATH, + PGPASSWORD: databasePassword, + }, + }); + + const [progressSource, progressSink] = createSqlProgressStreams((progress) => { + if (complete) { + return; + } + + this.logger.log(`Restore progress ~ ${(progress * 100).toFixed(2)}%`); + progressCb?.('restore', progress); + }); + + await pipeline(sqlStream, progressSource, psql, progressSink); + + try { + progressCb?.('migrations', 0.9); + await this.databaseRepository.runMigrations(); + await this.maintenanceHealthRepository.checkApiHealth(); + } catch (error) { + progressCb?.('rollback', 0); + + const fileStream = this.storageRepository.createPlainReadStream(restorePointFilePath); + const gunzip = this.storageRepository.createGunzip(); + fileStream.pipe(gunzip); + inputStream = gunzip; + + const sqlStream = Readable.from(sqlRollback(inputStream, databaseUsername)); + const psql = this.processRepository.spawnDuplexStream(bin, args, { + env: { + PATH: process.env.PATH, + PGPASSWORD: databasePassword, + }, + }); + + const [progressSource, progressSink] = createSqlProgressStreams((progress) => { + if (complete) { + return; + } + + this.logger.log(`Rollback progress ~ ${(progress * 100).toFixed(2)}%`); + progressCb?.('rollback', progress); + }); + + await pipeline(sqlStream, progressSource, psql, progressSink); + + throw error; + } + } catch (error) { + this.logger.error(`Database Restore Failure: ${error}`); + throw error; + } finally { + complete = true; + } + + this.logger.log(`Database Restore Success`); + } +} + +const SQL_DROP_CONNECTIONS = ` + -- drop all other database connections + SELECT pg_terminate_backend(pid) + FROM pg_stat_activity + WHERE datname = current_database() + AND pid <> pg_backend_pid(); +`; + +const SQL_RESET_SCHEMA = (username: string) => ` + -- re-create the default schema + DROP SCHEMA public CASCADE; + CREATE SCHEMA public; + + -- restore access to schema + GRANT ALL ON SCHEMA public TO "${username}"; + GRANT ALL ON SCHEMA public TO public; +`; + +async function* sql(inputStream: Readable, databaseUsername: string, isPgClusterDump: boolean) { + yield SQL_DROP_CONNECTIONS; + yield isPgClusterDump + ? // it is likely the dump contains SQL to try to drop the currently active + // database to ensure we have a fresh slate; if the `postgres` database exists + // then prefer to switch before continuing otherwise this will just silently fail + String.raw` + \c postgres + ` + : SQL_RESET_SCHEMA(databaseUsername); + + for await (const chunk of inputStream) { + yield chunk; + } +} + +async function* sqlRollback(inputStream: Readable, databaseUsername: string) { + yield SQL_DROP_CONNECTIONS; + yield SQL_RESET_SCHEMA(databaseUsername); + + for await (const chunk of inputStream) { + yield chunk; + } +} + +function createSqlProgressStreams(cb: (progress: number) => void) { + const STDIN_START_MARKER = new TextEncoder().encode('FROM stdin'); + const STDIN_END_MARKER = new TextEncoder().encode(String.raw`\.`); + + let readingStdin = false; + let sequenceIdx = 0; + + let linesSent = 0; + let linesProcessed = 0; + + const startedAt = +Date.now(); + const cbDebounced = debounce( + () => { + const progress = source.writableEnded + ? Math.min(1, linesProcessed / linesSent) + : // progress simulation while we're in an indeterminate state + Math.min(0.3, 0.1 + (Date.now() - startedAt) / 1e4); + cb(progress); + }, + 100, + { + maxWait: 100, + }, + ); + + let lastByte = -1; + const source = new PassThrough({ + transform(chunk, _encoding, callback) { + for (const byte of chunk) { + if (!readingStdin && byte === 10 && lastByte !== 10) { + linesSent += 1; + } + + lastByte = byte; + + const sequence = readingStdin ? STDIN_END_MARKER : STDIN_START_MARKER; + if (sequence[sequenceIdx] === byte) { + sequenceIdx += 1; + + if (sequence.length === sequenceIdx) { + sequenceIdx = 0; + readingStdin = !readingStdin; + } + } else { + sequenceIdx = 0; + } + } + + cbDebounced(); + this.push(chunk); + callback(); + }, + }); + + const sink = new Writable({ + write(chunk, _encoding, callback) { + for (const byte of chunk) { + if (byte === 10) { + linesProcessed++; + } + } + + cbDebounced(); + callback(); + }, + }); + + return [source, sink]; } diff --git a/server/src/services/index.ts b/server/src/services/index.ts index 2c2fb995c8..ba54474b71 100644 --- a/server/src/services/index.ts +++ b/server/src/services/index.ts @@ -7,7 +7,6 @@ import { AssetService } from 'src/services/asset.service'; import { AuditService } from 'src/services/audit.service'; import { AuthAdminService } from 'src/services/auth-admin.service'; import { AuthService } from 'src/services/auth.service'; -import { BackupService } from 'src/services/backup.service'; import { CliService } from 'src/services/cli.service'; import { DatabaseBackupService } from 'src/services/database-backup.service'; import { DatabaseService } from 'src/services/database.service'; @@ -58,7 +57,6 @@ export const services = [ AuditService, AuthService, AuthAdminService, - BackupService, CliService, DatabaseBackupService, DatabaseService, diff --git a/server/src/utils/database-backups.ts b/server/src/utils/database-backups.ts index 1d508e2a7d..70bedb32b1 100644 --- a/server/src/utils/database-backups.ts +++ b/server/src/utils/database-backups.ts @@ -1,20 +1,3 @@ -import { BadRequestException } from '@nestjs/common'; -import { debounce } from 'lodash'; -import { DateTime } from 'luxon'; -import path, { basename, join } from 'node:path'; -import { PassThrough, Readable, Writable } from 'node:stream'; -import { pipeline } from 'node:stream/promises'; -import semver from 'semver'; -import { serverVersion } from 'src/constants'; -import { StorageCore } from 'src/cores/storage.core'; -import { CacheControl, StorageFolder } from 'src/enum'; -import { MaintenanceHealthRepository } from 'src/maintenance/maintenance-health.repository'; -import { ConfigRepository } from 'src/repositories/config.repository'; -import { DatabaseRepository } from 'src/repositories/database.repository'; -import { LoggingRepository } from 'src/repositories/logging.repository'; -import { ProcessRepository } from 'src/repositories/process.repository'; -import { StorageRepository } from 'src/repositories/storage.repository'; - export function isValidDatabaseBackupName(filename: string) { return filename.match(/^[\d\w-.]+\.sql(?:\.gz)?$/); } @@ -30,453 +13,12 @@ export function isFailedDatabaseBackupName(filename: string) { return filename.match(/^immich-db-backup-.*\.sql\.gz\.tmp$/); } -export function findVersion(filename: string) { +export function findDatabaseBackupVersion(filename: string) { return /-v(.*)-/.exec(filename)?.[1]; } -type BackupRepos = { - logger: LoggingRepository; - storage: StorageRepository; - config: ConfigRepository; - process: ProcessRepository; - database: DatabaseRepository; - health: MaintenanceHealthRepository; -}; - export class UnsupportedPostgresError extends Error { constructor(databaseVersion: string) { super(`Unsupported PostgreSQL version: ${databaseVersion}`); } } - -export async function buildPostgresLaunchArguments( - { logger, config, database }: Pick, - bin: 'pg_dump' | 'pg_dumpall' | 'psql', - options: { - singleTransaction?: boolean; - username?: string; - } = {}, -): Promise<{ - bin: string; - args: string[]; - databaseUsername: string; - databasePassword: string; - databaseVersion: string; - databaseMajorVersion?: number; -}> { - const { - database: { config: databaseConfig }, - } = config.getEnv(); - const isUrlConnection = databaseConfig.connectionType === 'url'; - - const databaseVersion = await database.getPostgresVersion(); - const databaseSemver = semver.coerce(databaseVersion); - const databaseMajorVersion = databaseSemver?.major; - - const args: string[] = []; - let databaseUsername; - - if (isUrlConnection) { - if (bin !== 'pg_dump') { - args.push('--dbname'); - } - - let url = databaseConfig.url; - if (URL.canParse(databaseConfig.url)) { - const parsedUrl = new URL(databaseConfig.url); - // remove known bad parameters - parsedUrl.searchParams.delete('uselibpqcompat'); - - databaseUsername = parsedUrl.username; - url = parsedUrl.toString(); - } - - // assume typical values if we can't parse URL or not present - databaseUsername ??= 'postgres'; - - args.push(url); - } else { - databaseUsername = databaseConfig.username; - - args.push('--username', databaseUsername, '--host', databaseConfig.host, '--port', databaseConfig.port.toString()); - - switch (bin) { - case 'pg_dumpall': { - args.push('--database'); - break; - } - case 'psql': { - args.push('--dbname'); - break; - } - } - - args.push(databaseConfig.database); - } - - switch (bin) { - case 'pg_dump': - case 'pg_dumpall': { - args.push('--clean', '--if-exists'); - break; - } - case 'psql': { - if (options.singleTransaction) { - args.push( - // don't commit any transaction on failure - '--single-transaction', - // exit with non-zero code on error - '--set', - 'ON_ERROR_STOP=on', - ); - } - - args.push( - // used for progress monitoring - '--echo-all', - '--output=/dev/null', - ); - break; - } - } - - if (!databaseMajorVersion || !databaseSemver || !semver.satisfies(databaseSemver, '>=14.0.0 <19.0.0')) { - logger.error(`Database Restore Failure: Unsupported PostgreSQL version: ${databaseVersion}`); - throw new UnsupportedPostgresError(databaseVersion); - } - - return { - bin: `/usr/lib/postgresql/${databaseMajorVersion}/bin/${bin}`, - args, - databaseUsername, - databasePassword: isUrlConnection ? new URL(databaseConfig.url).password : databaseConfig.password, - databaseVersion, - databaseMajorVersion, - }; -} - -export async function createDatabaseBackup( - { logger, storage, process: processRepository, ...pgRepos }: Omit, - filenamePrefix: string = '', -): Promise { - logger.debug(`Database Backup Started`); - - const { bin, args, databasePassword, databaseVersion, databaseMajorVersion } = await buildPostgresLaunchArguments( - { logger, ...pgRepos }, - 'pg_dump', - ); - - logger.log(`Database Backup Starting. Database Version: ${databaseMajorVersion}`); - - const filename = `${filenamePrefix}immich-db-backup-${DateTime.now().toFormat("yyyyLLdd'T'HHmmss")}-v${serverVersion.toString()}-pg${databaseVersion.split(' ')[0]}.sql.gz`; - const backupFilePath = join(StorageCore.getBaseFolder(StorageFolder.Backups), filename); - const temporaryFilePath = `${backupFilePath}.tmp`; - - try { - const pgdump = processRepository.spawnDuplexStream(bin, args, { - env: { - PATH: process.env.PATH, - PGPASSWORD: databasePassword, - }, - }); - - const gzip = processRepository.spawnDuplexStream('gzip', ['--rsyncable']); - const fileStream = storage.createWriteStream(temporaryFilePath); - - await pipeline(pgdump, gzip, fileStream); - await storage.rename(temporaryFilePath, backupFilePath); - } catch (error) { - logger.error(`Database Backup Failure: ${error}`); - await storage - .unlink(temporaryFilePath) - .catch((error) => logger.error(`Failed to delete failed backup file: ${error}`)); - throw error; - } - - logger.log(`Database Backup Success`); - return backupFilePath; -} - -const SQL_DROP_CONNECTIONS = ` - -- drop all other database connections - SELECT pg_terminate_backend(pid) - FROM pg_stat_activity - WHERE datname = current_database() - AND pid <> pg_backend_pid(); -`; - -const SQL_RESET_SCHEMA = (username: string) => ` - -- re-create the default schema - DROP SCHEMA public CASCADE; - CREATE SCHEMA public; - - -- restore access to schema - GRANT ALL ON SCHEMA public TO "${username}"; - GRANT ALL ON SCHEMA public TO public; -`; - -async function* sql(inputStream: Readable, databaseUsername: string, isPgClusterDump: boolean) { - yield SQL_DROP_CONNECTIONS; - yield isPgClusterDump - ? // it is likely the dump contains SQL to try to drop the currently active - // database to ensure we have a fresh slate; if the `postgres` database exists - // then prefer to switch before continuing otherwise this will just silently fail - String.raw` - \c postgres - ` - : SQL_RESET_SCHEMA(databaseUsername); - - for await (const chunk of inputStream) { - yield chunk; - } -} - -async function* sqlRollback(inputStream: Readable, databaseUsername: string) { - yield SQL_DROP_CONNECTIONS; - yield SQL_RESET_SCHEMA(databaseUsername); - - for await (const chunk of inputStream) { - yield chunk; - } -} - -export async function restoreDatabaseBackup( - { logger, storage, process: processRepository, database: databaseRepository, health, ...pgRepos }: BackupRepos, - filename: string, - progressCb?: (action: 'backup' | 'restore' | 'migrations' | 'rollback', progress: number) => void, -): Promise { - logger.debug(`Database Restore Started`); - - let complete = false; - try { - if (!isValidDatabaseBackupName(filename)) { - throw new Error('Invalid backup file format!'); - } - - const backupFilePath = path.join(StorageCore.getBaseFolder(StorageFolder.Backups), filename); - await storage.stat(backupFilePath); // => check file exists - - let isPgClusterDump = false; - const version = findVersion(filename); - if (version && semver.satisfies(version, '<= 2.4')) { - isPgClusterDump = true; - } - - const { bin, args, databaseUsername, databasePassword, databaseMajorVersion } = await buildPostgresLaunchArguments( - { logger, database: databaseRepository, ...pgRepos }, - 'psql', - { - singleTransaction: !isPgClusterDump, - }, - ); - - progressCb?.('backup', 0.05); - - const restorePointFilePath = await createDatabaseBackup( - { logger, storage, process: processRepository, database: databaseRepository, ...pgRepos }, - 'restore-point-', - ); - - logger.log(`Database Restore Starting. Database Version: ${databaseMajorVersion}`); - - let inputStream: Readable; - if (backupFilePath.endsWith('.gz')) { - const fileStream = storage.createPlainReadStream(backupFilePath); - const gunzip = storage.createGunzip(); - fileStream.pipe(gunzip); - inputStream = gunzip; - } else { - inputStream = storage.createPlainReadStream(backupFilePath); - } - - const sqlStream = Readable.from(sql(inputStream, databaseUsername, isPgClusterDump)); - const psql = processRepository.spawnDuplexStream(bin, args, { - env: { - PATH: process.env.PATH, - PGPASSWORD: databasePassword, - }, - }); - - const [progressSource, progressSink] = createSqlProgressStreams((progress) => { - if (complete) { - return; - } - - logger.log(`Restore progress ~ ${(progress * 100).toFixed(2)}%`); - progressCb?.('restore', progress); - }); - - await pipeline(sqlStream, progressSource, psql, progressSink); - - try { - progressCb?.('migrations', 0.9); - await databaseRepository.runMigrations(); - await health.checkApiHealth(); - } catch (error) { - progressCb?.('rollback', 0); - - const fileStream = storage.createPlainReadStream(restorePointFilePath); - const gunzip = storage.createGunzip(); - fileStream.pipe(gunzip); - inputStream = gunzip; - - const sqlStream = Readable.from(sqlRollback(inputStream, databaseUsername)); - const psql = processRepository.spawnDuplexStream(bin, args, { - env: { - PATH: process.env.PATH, - PGPASSWORD: databasePassword, - }, - }); - - const [progressSource, progressSink] = createSqlProgressStreams((progress) => { - if (complete) { - return; - } - - logger.log(`Rollback progress ~ ${(progress * 100).toFixed(2)}%`); - progressCb?.('rollback', progress); - }); - - await pipeline(sqlStream, progressSource, psql, progressSink); - - throw error; - } - } catch (error) { - logger.error(`Database Restore Failure: ${error}`); - throw error; - } finally { - complete = true; - } - - logger.log(`Database Restore Success`); -} - -export async function deleteDatabaseBackup({ storage }: Pick, files: string[]): Promise { - const backupsFolder = StorageCore.getBaseFolder(StorageFolder.Backups); - - if (files.some((filename) => !isValidDatabaseBackupName(filename))) { - throw new BadRequestException('Invalid backup name!'); - } - - await Promise.all(files.map((filename) => storage.unlink(path.join(backupsFolder, filename)))); -} - -export async function listDatabaseBackups({ - storage, -}: Pick): Promise<{ filename: string; filesize: number }[]> { - const backupsFolder = StorageCore.getBaseFolder(StorageFolder.Backups); - const files = await storage.readdir(backupsFolder); - - const validFiles = files - .filter((fn) => isValidDatabaseBackupName(fn)) - .toSorted((a, b) => (a.startsWith('uploaded-') === b.startsWith('uploaded-') ? a.localeCompare(b) : 1)) - .toReversed(); - - const backups = await Promise.all( - validFiles.map(async (filename) => { - const stats = await storage.stat(path.join(backupsFolder, filename)); - return { filename, filesize: stats.size }; - }), - ); - - return backups; -} - -export async function uploadDatabaseBackup( - { storage }: Pick, - file: Express.Multer.File, -): Promise { - const backupsFolder = StorageCore.getBaseFolder(StorageFolder.Backups); - const fn = basename(file.originalname); - if (!isValidDatabaseBackupName(fn)) { - throw new BadRequestException('Invalid backup name!'); - } - - const path = join(backupsFolder, `uploaded-${fn}`); - await storage.createOrOverwriteFile(path, file.buffer); -} - -export function downloadDatabaseBackup(fileName: string) { - if (!isValidDatabaseBackupName(fileName)) { - throw new BadRequestException('Invalid backup name!'); - } - - const path = join(StorageCore.getBaseFolder(StorageFolder.Backups), fileName); - - return { - path, - fileName, - cacheControl: CacheControl.PrivateWithoutCache, - contentType: fileName.endsWith('.gz') ? 'application/gzip' : 'application/sql', - }; -} - -function createSqlProgressStreams(cb: (progress: number) => void) { - const STDIN_START_MARKER = new TextEncoder().encode('FROM stdin'); - const STDIN_END_MARKER = new TextEncoder().encode(String.raw`\.`); - - let readingStdin = false; - let sequenceIdx = 0; - - let linesSent = 0; - let linesProcessed = 0; - - const startedAt = +Date.now(); - const cbDebounced = debounce( - () => { - const progress = source.writableEnded - ? Math.min(1, linesProcessed / linesSent) - : // progress simulation while we're in an indeterminate state - Math.min(0.3, 0.1 + (Date.now() - startedAt) / 1e4); - cb(progress); - }, - 100, - { - maxWait: 100, - }, - ); - - let lastByte = -1; - const source = new PassThrough({ - transform(chunk, _encoding, callback) { - for (const byte of chunk) { - if (!readingStdin && byte === 10 && lastByte !== 10) { - linesSent += 1; - } - - lastByte = byte; - - const sequence = readingStdin ? STDIN_END_MARKER : STDIN_START_MARKER; - if (sequence[sequenceIdx] === byte) { - sequenceIdx += 1; - - if (sequence.length === sequenceIdx) { - sequenceIdx = 0; - readingStdin = !readingStdin; - } - } else { - sequenceIdx = 0; - } - } - - cbDebounced(); - this.push(chunk); - callback(); - }, - }); - - const sink = new Writable({ - write(chunk, _encoding, callback) { - for (const byte of chunk) { - if (byte === 10) { - linesProcessed++; - } - } - - cbDebounced(); - callback(); - }, - }); - - return [source, sink]; -} diff --git a/server/test/utils.ts b/server/test/utils.ts index cd866994eb..ff913e018e 100644 --- a/server/test/utils.ts +++ b/server/test/utils.ts @@ -496,10 +496,12 @@ export const mockSpawn = vitest.fn((exitCode: number, stdout: string, stderr: st } as unknown as ChildProcessWithoutNullStreams; }); -export const mockDuplex = vitest.fn( +export const mockDuplex = + (chunkCb?: (chunk: Buffer) => void) => (command: string, exitCode: number, stdout: string, stderr: string, error?: unknown) => { const duplex = new Duplex({ - write(_chunk, _encoding, callback) { + write(chunk, _encoding, callback) { + chunkCb?.(chunk); callback(); }, @@ -524,8 +526,7 @@ export const mockDuplex = vitest.fn( }); return duplex; - }, -); + }; export const mockFork = vitest.fn((exitCode: number, stdout: string, stderr: string, error?: unknown) => { const stdoutStream = new Readable({