mirror of
https://github.com/immich-app/immich.git
synced 2026-03-04 09:57:33 +03:00
refactor(server): startup checks for vector extension (#11559)
* update update logic refactor * update tests * get version range through repo method, make tests more static * move "should work" test
This commit is contained in:
@@ -2,11 +2,13 @@ import { Inject, Injectable } from '@nestjs/common';
|
||||
import { InjectDataSource } from '@nestjs/typeorm';
|
||||
import AsyncLock from 'async-lock';
|
||||
import semver from 'semver';
|
||||
import { POSTGRES_VERSION_RANGE, VECTOR_VERSION_RANGE, VECTORS_VERSION_RANGE } from 'src/constants';
|
||||
import { getVectorExtension } from 'src/database.config';
|
||||
import {
|
||||
DatabaseExtension,
|
||||
DatabaseLock,
|
||||
EXTENSION_NAMES,
|
||||
ExtensionVersion,
|
||||
IDatabaseRepository,
|
||||
VectorExtension,
|
||||
VectorIndex,
|
||||
@@ -29,20 +31,18 @@ export class DatabaseRepository implements IDatabaseRepository {
|
||||
this.logger.setContext(DatabaseRepository.name);
|
||||
}
|
||||
|
||||
async getExtensionVersion(extension: DatabaseExtension): Promise<string | undefined> {
|
||||
const res = await this.dataSource.query(`SELECT extversion FROM pg_extension WHERE extname = $1`, [extension]);
|
||||
return res[0]?.['extversion'];
|
||||
}
|
||||
|
||||
async getAvailableExtensionVersion(extension: DatabaseExtension): Promise<string | undefined> {
|
||||
const res = await this.dataSource.query(
|
||||
`
|
||||
SELECT version FROM pg_available_extension_versions
|
||||
WHERE name = $1 AND installed = false
|
||||
ORDER BY version DESC`,
|
||||
async getExtensionVersion(extension: DatabaseExtension): Promise<ExtensionVersion> {
|
||||
const [res]: ExtensionVersion[] = await this.dataSource.query(
|
||||
`SELECT default_version as "availableVersion", installed_version as "installedVersion"
|
||||
FROM pg_available_extensions
|
||||
WHERE name = $1`,
|
||||
[extension],
|
||||
);
|
||||
return res[0]?.['version'];
|
||||
return res ?? { availableVersion: null, installedVersion: null };
|
||||
}
|
||||
|
||||
getExtensionVersionRange(extension: VectorExtension): string {
|
||||
return extension === DatabaseExtension.VECTORS ? VECTORS_VERSION_RANGE : VECTOR_VERSION_RANGE;
|
||||
}
|
||||
|
||||
async getPostgresVersion(): Promise<string> {
|
||||
@@ -50,6 +50,10 @@ export class DatabaseRepository implements IDatabaseRepository {
|
||||
return version;
|
||||
}
|
||||
|
||||
getPostgresVersionRange(): string {
|
||||
return POSTGRES_VERSION_RANGE;
|
||||
}
|
||||
|
||||
async createExtension(extension: DatabaseExtension): Promise<void> {
|
||||
await this.dataSource.query(`CREATE EXTENSION IF NOT EXISTS ${extension}`);
|
||||
}
|
||||
@@ -59,28 +63,34 @@ export class DatabaseRepository implements IDatabaseRepository {
|
||||
}
|
||||
|
||||
async updateVectorExtension(extension: VectorExtension, targetVersion?: string): Promise<VectorUpdateResult> {
|
||||
const currentVersion = await this.getExtensionVersion(extension);
|
||||
if (!currentVersion) {
|
||||
const { availableVersion, installedVersion } = await this.getExtensionVersion(extension);
|
||||
if (!installedVersion) {
|
||||
throw new Error(`${EXTENSION_NAMES[extension]} extension is not installed`);
|
||||
}
|
||||
|
||||
if (!availableVersion) {
|
||||
throw new Error(`No available version for ${EXTENSION_NAMES[extension]} extension`);
|
||||
}
|
||||
targetVersion ??= availableVersion;
|
||||
|
||||
const isVectors = extension === DatabaseExtension.VECTORS;
|
||||
let restartRequired = false;
|
||||
await this.dataSource.manager.transaction(async (manager) => {
|
||||
await this.setSearchPath(manager);
|
||||
|
||||
const isSchemaUpgrade = targetVersion && semver.satisfies(targetVersion, '0.1.1 || 0.1.11');
|
||||
if (isVectors && installedVersion === '0.1.1') {
|
||||
await this.setExtVersion(manager, DatabaseExtension.VECTORS, '0.1.11');
|
||||
}
|
||||
|
||||
const isSchemaUpgrade = semver.satisfies(installedVersion, '0.1.1 || 0.1.11');
|
||||
if (isSchemaUpgrade && isVectors) {
|
||||
await this.updateVectorsSchema(manager, currentVersion);
|
||||
await this.updateVectorsSchema(manager);
|
||||
}
|
||||
|
||||
await manager.query(`ALTER EXTENSION ${extension} UPDATE${targetVersion ? ` TO '${targetVersion}'` : ''}`);
|
||||
await manager.query(`ALTER EXTENSION ${extension} UPDATE TO '${targetVersion}'`);
|
||||
|
||||
if (!isSchemaUpgrade) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (isVectors) {
|
||||
const diff = semver.diff(installedVersion, targetVersion);
|
||||
if (isVectors && diff && ['minor', 'major'].includes(diff)) {
|
||||
await manager.query('SELECT pgvectors_upgrade()');
|
||||
restartRequired = true;
|
||||
} else {
|
||||
@@ -96,24 +106,24 @@ export class DatabaseRepository implements IDatabaseRepository {
|
||||
try {
|
||||
await this.dataSource.query(`REINDEX INDEX ${index}`);
|
||||
} catch (error) {
|
||||
if (getVectorExtension() === DatabaseExtension.VECTORS) {
|
||||
this.logger.warn(`Could not reindex index ${index}. Attempting to auto-fix.`);
|
||||
const table = index === VectorIndex.CLIP ? 'smart_search' : 'face_search';
|
||||
const dimSize = await this.getDimSize(table);
|
||||
await this.dataSource.manager.transaction(async (manager) => {
|
||||
await this.setSearchPath(manager);
|
||||
await manager.query(`DROP INDEX IF EXISTS ${index}`);
|
||||
await manager.query(`ALTER TABLE ${table} ALTER COLUMN embedding SET DATA TYPE real[]`);
|
||||
await manager.query(`ALTER TABLE ${table} ALTER COLUMN embedding SET DATA TYPE vector(${dimSize})`);
|
||||
await manager.query(`SET vectors.pgvector_compatibility=on`);
|
||||
await manager.query(`
|
||||
CREATE INDEX IF NOT EXISTS ${index} ON ${table}
|
||||
USING hnsw (embedding vector_cosine_ops)
|
||||
WITH (ef_construction = 300, m = 16)`);
|
||||
});
|
||||
} else {
|
||||
if (getVectorExtension() !== DatabaseExtension.VECTORS) {
|
||||
throw error;
|
||||
}
|
||||
this.logger.warn(`Could not reindex index ${index}. Attempting to auto-fix.`);
|
||||
|
||||
const table = await this.getIndexTable(index);
|
||||
const dimSize = await this.getDimSize(table);
|
||||
await this.dataSource.manager.transaction(async (manager) => {
|
||||
await this.setSearchPath(manager);
|
||||
await manager.query(`DROP INDEX IF EXISTS ${index}`);
|
||||
await manager.query(`ALTER TABLE ${table} ALTER COLUMN embedding SET DATA TYPE real[]`);
|
||||
await manager.query(`ALTER TABLE ${table} ALTER COLUMN embedding SET DATA TYPE vector(${dimSize})`);
|
||||
await manager.query(`SET vectors.pgvector_compatibility=on`);
|
||||
await manager.query(`
|
||||
CREATE INDEX IF NOT EXISTS ${index} ON ${table}
|
||||
USING hnsw (embedding vector_cosine_ops)
|
||||
WITH (ef_construction = 300, m = 16)`);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -123,13 +133,8 @@ export class DatabaseRepository implements IDatabaseRepository {
|
||||
}
|
||||
|
||||
try {
|
||||
const res = await this.dataSource.query(
|
||||
`
|
||||
SELECT idx_status
|
||||
FROM pg_vector_index_stat
|
||||
WHERE indexname = $1`,
|
||||
[name],
|
||||
);
|
||||
const query = `SELECT idx_status FROM pg_vector_index_stat WHERE indexname = $1`;
|
||||
const res = await this.dataSource.query(query, [name]);
|
||||
return res[0]?.['idx_status'] === 'UPGRADE';
|
||||
} catch (error) {
|
||||
const message: string = (error as any).message;
|
||||
@@ -146,19 +151,27 @@ export class DatabaseRepository implements IDatabaseRepository {
|
||||
await manager.query(`SET search_path TO "$user", public, vectors`);
|
||||
}
|
||||
|
||||
private async updateVectorsSchema(manager: EntityManager, currentVersion: string): Promise<void> {
|
||||
await manager.query('CREATE SCHEMA IF NOT EXISTS vectors');
|
||||
await manager.query(`UPDATE pg_catalog.pg_extension SET extversion = $1 WHERE extname = $2`, [
|
||||
currentVersion,
|
||||
DatabaseExtension.VECTORS,
|
||||
]);
|
||||
await manager.query('UPDATE pg_catalog.pg_extension SET extrelocatable = true WHERE extname = $1', [
|
||||
DatabaseExtension.VECTORS,
|
||||
]);
|
||||
private async setExtVersion(manager: EntityManager, extName: DatabaseExtension, version: string): Promise<void> {
|
||||
const query = `UPDATE pg_catalog.pg_extension SET extversion = $1 WHERE extname = $2`;
|
||||
await manager.query(query, [version, extName]);
|
||||
}
|
||||
|
||||
private async getIndexTable(index: VectorIndex): Promise<string> {
|
||||
const tableQuery = `SELECT relname FROM pg_stat_all_indexes WHERE indexrelname = $1`;
|
||||
const [res]: { relname: string | null }[] = await this.dataSource.manager.query(tableQuery, [index]);
|
||||
const table = res?.relname;
|
||||
if (!table) {
|
||||
throw new Error(`Could not find table for index ${index}`);
|
||||
}
|
||||
return table;
|
||||
}
|
||||
|
||||
private async updateVectorsSchema(manager: EntityManager): Promise<void> {
|
||||
const extension = DatabaseExtension.VECTORS;
|
||||
await manager.query(`CREATE SCHEMA IF NOT EXISTS ${extension}`);
|
||||
await manager.query('UPDATE pg_catalog.pg_extension SET extrelocatable = true WHERE extname = $1', [extension]);
|
||||
await manager.query('ALTER EXTENSION vectors SET SCHEMA vectors');
|
||||
await manager.query('UPDATE pg_catalog.pg_extension SET extrelocatable = false WHERE extname = $1', [
|
||||
DatabaseExtension.VECTORS,
|
||||
]);
|
||||
await manager.query('UPDATE pg_catalog.pg_extension SET extrelocatable = false WHERE extname = $1', [extension]);
|
||||
}
|
||||
|
||||
private async getDimSize(table: string, column = 'embedding'): Promise<number> {
|
||||
|
||||
Reference in New Issue
Block a user