mirror of
https://github.com/immich-app/immich.git
synced 2026-03-06 10:07:48 +03:00
refactor: database repository (#16593)
* refactor: database repository * fix error reindex check * chore: remove WIP code --------- Co-authored-by: mertalev <101130780+mertalev@users.noreply.github.com>
This commit is contained in:
@@ -1,18 +1,17 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { InjectDataSource } from '@nestjs/typeorm';
|
||||
import AsyncLock from 'async-lock';
|
||||
import { Kysely, sql } from 'kysely';
|
||||
import { Kysely, sql, Transaction } from 'kysely';
|
||||
import { InjectKysely } from 'nestjs-kysely';
|
||||
import semver from 'semver';
|
||||
import { EXTENSION_NAMES, POSTGRES_VERSION_RANGE, VECTOR_VERSION_RANGE, VECTORS_VERSION_RANGE } from 'src/constants';
|
||||
import { DB } from 'src/db';
|
||||
import { GenerateSql } from 'src/decorators';
|
||||
import { DatabaseExtension, DatabaseLock, VectorIndex } from 'src/enum';
|
||||
import { ConfigRepository } from 'src/repositories/config.repository';
|
||||
import { LoggingRepository } from 'src/repositories/logging.repository';
|
||||
import { ExtensionVersion, VectorExtension, VectorUpdateResult } from 'src/types';
|
||||
import { UPSERT_COLUMNS } from 'src/utils/database';
|
||||
import { isValidInteger } from 'src/validation';
|
||||
import { DataSource, EntityManager, EntityMetadata, QueryRunner } from 'typeorm';
|
||||
import { DataSource } from 'typeorm';
|
||||
|
||||
@Injectable()
|
||||
export class DatabaseRepository {
|
||||
@@ -21,9 +20,8 @@ export class DatabaseRepository {
|
||||
|
||||
constructor(
|
||||
@InjectKysely() private db: Kysely<DB>,
|
||||
@InjectDataSource() private dataSource: DataSource,
|
||||
private logger: LoggingRepository,
|
||||
configRepository: ConfigRepository,
|
||||
private configRepository: ConfigRepository,
|
||||
) {
|
||||
this.vectorExtension = configRepository.getEnv().database.vectorExtension;
|
||||
this.logger.setContext(DatabaseRepository.name);
|
||||
@@ -33,43 +31,24 @@ export class DatabaseRepository {
|
||||
await this.db.destroy();
|
||||
}
|
||||
|
||||
init() {
|
||||
for (const metadata of this.dataSource.entityMetadatas) {
|
||||
const table = metadata.tableName as keyof DB;
|
||||
UPSERT_COLUMNS[table] = this.getUpsertColumns(metadata);
|
||||
}
|
||||
}
|
||||
|
||||
async reconnect() {
|
||||
try {
|
||||
if (this.dataSource.isInitialized) {
|
||||
await this.dataSource.destroy();
|
||||
}
|
||||
const { isInitialized } = await this.dataSource.initialize();
|
||||
return isInitialized;
|
||||
} catch (error) {
|
||||
this.logger.error(`Database connection failed: ${error}`);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@GenerateSql({ params: [DatabaseExtension.VECTORS] })
|
||||
async getExtensionVersion(extension: DatabaseExtension): Promise<ExtensionVersion> {
|
||||
const [res]: ExtensionVersion[] = await this.dataSource.query(
|
||||
`SELECT default_version as "availableVersion", installed_version as "installedVersion"
|
||||
const { rows } = await sql<ExtensionVersion>`
|
||||
SELECT default_version as "availableVersion", installed_version as "installedVersion"
|
||||
FROM pg_available_extensions
|
||||
WHERE name = $1`,
|
||||
[extension],
|
||||
);
|
||||
return res ?? { availableVersion: null, installedVersion: null };
|
||||
WHERE name = ${extension}
|
||||
`.execute(this.db);
|
||||
return rows[0] ?? { availableVersion: null, installedVersion: null };
|
||||
}
|
||||
|
||||
getExtensionVersionRange(extension: VectorExtension): string {
|
||||
return extension === DatabaseExtension.VECTORS ? VECTORS_VERSION_RANGE : VECTOR_VERSION_RANGE;
|
||||
}
|
||||
|
||||
@GenerateSql()
|
||||
async getPostgresVersion(): Promise<string> {
|
||||
const [{ server_version: version }] = await this.dataSource.query(`SHOW server_version`);
|
||||
return version;
|
||||
const { rows } = await sql<{ server_version: string }>`SHOW server_version`.execute(this.db);
|
||||
return rows[0].server_version;
|
||||
}
|
||||
|
||||
getPostgresVersionRange(): string {
|
||||
@@ -77,7 +56,7 @@ export class DatabaseRepository {
|
||||
}
|
||||
|
||||
async createExtension(extension: DatabaseExtension): Promise<void> {
|
||||
await this.dataSource.query(`CREATE EXTENSION IF NOT EXISTS ${extension}`);
|
||||
await sql`CREATE EXTENSION IF NOT EXISTS ${sql.raw(extension)}`.execute(this.db);
|
||||
}
|
||||
|
||||
async updateVectorExtension(extension: VectorExtension, targetVersion?: string): Promise<VectorUpdateResult> {
|
||||
@@ -93,23 +72,23 @@ export class DatabaseRepository {
|
||||
|
||||
const isVectors = extension === DatabaseExtension.VECTORS;
|
||||
let restartRequired = false;
|
||||
await this.dataSource.manager.transaction(async (manager) => {
|
||||
await this.setSearchPath(manager);
|
||||
await this.db.transaction().execute(async (tx) => {
|
||||
await this.setSearchPath(tx);
|
||||
|
||||
if (isVectors && installedVersion === '0.1.1') {
|
||||
await this.setExtVersion(manager, DatabaseExtension.VECTORS, '0.1.11');
|
||||
await this.setExtVersion(tx, DatabaseExtension.VECTORS, '0.1.11');
|
||||
}
|
||||
|
||||
const isSchemaUpgrade = semver.satisfies(installedVersion, '0.1.1 || 0.1.11');
|
||||
if (isSchemaUpgrade && isVectors) {
|
||||
await this.updateVectorsSchema(manager);
|
||||
await this.updateVectorsSchema(tx);
|
||||
}
|
||||
|
||||
await manager.query(`ALTER EXTENSION ${extension} UPDATE TO '${targetVersion}'`);
|
||||
await sql`ALTER EXTENSION ${sql.raw(extension)} UPDATE TO ${sql.lit(targetVersion)}`.execute(tx);
|
||||
|
||||
const diff = semver.diff(installedVersion, targetVersion);
|
||||
if (isVectors && diff && ['minor', 'major'].includes(diff)) {
|
||||
await manager.query('SELECT pgvectors_upgrade()');
|
||||
await sql`SELECT pgvectors_upgrade()`.execute(tx);
|
||||
restartRequired = true;
|
||||
} else {
|
||||
await this.reindex(VectorIndex.CLIP);
|
||||
@@ -122,7 +101,7 @@ export class DatabaseRepository {
|
||||
|
||||
async reindex(index: VectorIndex): Promise<void> {
|
||||
try {
|
||||
await this.dataSource.query(`REINDEX INDEX ${index}`);
|
||||
await sql`REINDEX INDEX ${sql.raw(index)}`.execute(this.db);
|
||||
} catch (error) {
|
||||
if (this.vectorExtension !== DatabaseExtension.VECTORS) {
|
||||
throw error;
|
||||
@@ -131,29 +110,34 @@ export class DatabaseRepository {
|
||||
|
||||
const table = await this.getIndexTable(index);
|
||||
const dimSize = await this.getDimSize(table);
|
||||
await this.dataSource.manager.transaction(async (manager) => {
|
||||
await this.setSearchPath(manager);
|
||||
await manager.query(`DROP INDEX IF EXISTS ${index}`);
|
||||
await manager.query(`ALTER TABLE ${table} ALTER COLUMN embedding SET DATA TYPE real[]`);
|
||||
await manager.query(`ALTER TABLE ${table} ALTER COLUMN embedding SET DATA TYPE vector(${dimSize})`);
|
||||
await manager.query(`SET vectors.pgvector_compatibility=on`);
|
||||
await manager.query(`
|
||||
CREATE INDEX IF NOT EXISTS ${index} ON ${table}
|
||||
await this.db.transaction().execute(async (tx) => {
|
||||
await this.setSearchPath(tx);
|
||||
await sql`DROP INDEX IF EXISTS ${sql.raw(index)}`.execute(tx);
|
||||
await sql`ALTER TABLE ${sql.raw(table)} ALTER COLUMN embedding SET DATA TYPE real[]`.execute(tx);
|
||||
await sql`ALTER TABLE ${sql.raw(table)} ALTER COLUMN embedding SET DATA TYPE vector(${sql.raw(String(dimSize))})`.execute(
|
||||
tx,
|
||||
);
|
||||
await sql`SET vectors.pgvector_compatibility=on`.execute(tx);
|
||||
await sql`
|
||||
CREATE INDEX IF NOT EXISTS ${sql.raw(index)} ON ${sql.raw(table)}
|
||||
USING hnsw (embedding vector_cosine_ops)
|
||||
WITH (ef_construction = 300, m = 16)`);
|
||||
WITH (ef_construction = 300, m = 16)
|
||||
`.execute(tx);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@GenerateSql({ params: [VectorIndex.CLIP] })
|
||||
async shouldReindex(name: VectorIndex): Promise<boolean> {
|
||||
if (this.vectorExtension !== DatabaseExtension.VECTORS) {
|
||||
return false;
|
||||
}
|
||||
|
||||
try {
|
||||
const query = `SELECT idx_status FROM pg_vector_index_stat WHERE indexname = $1`;
|
||||
const res = await this.dataSource.query(query, [name]);
|
||||
return res[0]?.['idx_status'] === 'UPGRADE';
|
||||
const { rows } = await sql<{
|
||||
idx_status: string;
|
||||
}>`SELECT idx_status FROM pg_vector_index_stat WHERE indexname = ${name}`.execute(this.db);
|
||||
return rows[0]?.idx_status === 'UPGRADE';
|
||||
} catch (error) {
|
||||
const message: string = (error as any).message;
|
||||
if (message.includes('index is not existing')) {
|
||||
@@ -165,44 +149,45 @@ export class DatabaseRepository {
|
||||
}
|
||||
}
|
||||
|
||||
private async setSearchPath(manager: EntityManager): Promise<void> {
|
||||
await manager.query(`SET search_path TO "$user", public, vectors`);
|
||||
private async setSearchPath(tx: Transaction<DB>): Promise<void> {
|
||||
await sql`SET search_path TO "$user", public, vectors`.execute(tx);
|
||||
}
|
||||
|
||||
private async setExtVersion(manager: EntityManager, extName: DatabaseExtension, version: string): Promise<void> {
|
||||
const query = `UPDATE pg_catalog.pg_extension SET extversion = $1 WHERE extname = $2`;
|
||||
await manager.query(query, [version, extName]);
|
||||
private async setExtVersion(tx: Transaction<DB>, extName: DatabaseExtension, version: string): Promise<void> {
|
||||
await sql`UPDATE pg_catalog.pg_extension SET extversion = ${version} WHERE extname = ${extName}`.execute(tx);
|
||||
}
|
||||
|
||||
private async getIndexTable(index: VectorIndex): Promise<string> {
|
||||
const tableQuery = `SELECT relname FROM pg_stat_all_indexes WHERE indexrelname = $1`;
|
||||
const [res]: { relname: string | null }[] = await this.dataSource.manager.query(tableQuery, [index]);
|
||||
const table = res?.relname;
|
||||
const { rows } = await sql<{
|
||||
relname: string | null;
|
||||
}>`SELECT relname FROM pg_stat_all_indexes WHERE indexrelname = ${index}`.execute(this.db);
|
||||
const table = rows[0]?.relname;
|
||||
if (!table) {
|
||||
throw new Error(`Could not find table for index ${index}`);
|
||||
}
|
||||
return table;
|
||||
}
|
||||
|
||||
private async updateVectorsSchema(manager: EntityManager): Promise<void> {
|
||||
private async updateVectorsSchema(tx: Transaction<DB>): Promise<void> {
|
||||
const extension = DatabaseExtension.VECTORS;
|
||||
await manager.query(`CREATE SCHEMA IF NOT EXISTS ${extension}`);
|
||||
await manager.query('UPDATE pg_catalog.pg_extension SET extrelocatable = true WHERE extname = $1', [extension]);
|
||||
await manager.query('ALTER EXTENSION vectors SET SCHEMA vectors');
|
||||
await manager.query('UPDATE pg_catalog.pg_extension SET extrelocatable = false WHERE extname = $1', [extension]);
|
||||
await sql`CREATE SCHEMA IF NOT EXISTS ${extension}`.execute(tx);
|
||||
await sql`UPDATE pg_catalog.pg_extension SET extrelocatable = true WHERE extname = ${extension}`.execute(tx);
|
||||
await sql`ALTER EXTENSION vectors SET SCHEMA vectors`.execute(tx);
|
||||
await sql`UPDATE pg_catalog.pg_extension SET extrelocatable = false WHERE extname = ${extension}`.execute(tx);
|
||||
}
|
||||
|
||||
private async getDimSize(table: string, column = 'embedding'): Promise<number> {
|
||||
const res = await this.dataSource.query(`
|
||||
const { rows } = await sql<{ dimsize: number }>`
|
||||
SELECT atttypmod as dimsize
|
||||
FROM pg_attribute f
|
||||
JOIN pg_class c ON c.oid = f.attrelid
|
||||
WHERE c.relkind = 'r'::char
|
||||
AND f.attnum > 0
|
||||
AND c.relname = '${table}'
|
||||
AND f.attname = '${column}'`);
|
||||
AND c.relname = ${table}
|
||||
AND f.attname = '${column}'
|
||||
`.execute(this.db);
|
||||
|
||||
const dimSize = res[0]['dimsize'];
|
||||
const dimSize = rows[0]?.dimsize;
|
||||
if (!isValidInteger(dimSize, { min: 1, max: 2 ** 16 })) {
|
||||
throw new Error(`Could not retrieve dimension size`);
|
||||
}
|
||||
@@ -210,31 +195,32 @@ export class DatabaseRepository {
|
||||
}
|
||||
|
||||
async runMigrations(options?: { transaction?: 'all' | 'none' | 'each' }): Promise<void> {
|
||||
await this.dataSource.runMigrations(options);
|
||||
const { database } = this.configRepository.getEnv();
|
||||
const dataSource = new DataSource(database.config.typeorm);
|
||||
|
||||
await dataSource.initialize();
|
||||
await dataSource.runMigrations(options);
|
||||
await dataSource.destroy();
|
||||
}
|
||||
|
||||
async withLock<R>(lock: DatabaseLock, callback: () => Promise<R>): Promise<R> {
|
||||
let res;
|
||||
await this.asyncLock.acquire(DatabaseLock[lock], async () => {
|
||||
const queryRunner = this.dataSource.createQueryRunner();
|
||||
try {
|
||||
await this.acquireLock(lock, queryRunner);
|
||||
res = await callback();
|
||||
} finally {
|
||||
await this.db.connection().execute(async (connection) => {
|
||||
try {
|
||||
await this.releaseLock(lock, queryRunner);
|
||||
await this.acquireLock(lock, connection);
|
||||
res = await callback();
|
||||
} finally {
|
||||
await queryRunner.release();
|
||||
await this.releaseLock(lock, connection);
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
return res as R;
|
||||
}
|
||||
|
||||
async tryLock(lock: DatabaseLock): Promise<boolean> {
|
||||
const queryRunner = this.dataSource.createQueryRunner();
|
||||
return await this.acquireTryLock(lock, queryRunner);
|
||||
tryLock(lock: DatabaseLock): Promise<boolean> {
|
||||
return this.db.connection().execute(async (connection) => this.acquireTryLock(lock, connection));
|
||||
}
|
||||
|
||||
isBusy(lock: DatabaseLock): boolean {
|
||||
@@ -245,22 +231,18 @@ export class DatabaseRepository {
|
||||
await this.asyncLock.acquire(DatabaseLock[lock], () => {});
|
||||
}
|
||||
|
||||
private async acquireLock(lock: DatabaseLock, queryRunner: QueryRunner): Promise<void> {
|
||||
return queryRunner.query('SELECT pg_advisory_lock($1)', [lock]);
|
||||
private async acquireLock(lock: DatabaseLock, connection: Kysely<DB>): Promise<void> {
|
||||
await sql`SELECT pg_advisory_lock(${lock})`.execute(connection);
|
||||
}
|
||||
|
||||
private async acquireTryLock(lock: DatabaseLock, queryRunner: QueryRunner): Promise<boolean> {
|
||||
const lockResult = await queryRunner.query('SELECT pg_try_advisory_lock($1)', [lock]);
|
||||
return lockResult[0].pg_try_advisory_lock;
|
||||
private async acquireTryLock(lock: DatabaseLock, connection: Kysely<DB>): Promise<boolean> {
|
||||
const { rows } = await sql<{
|
||||
pg_try_advisory_lock: boolean;
|
||||
}>`SELECT pg_try_advisory_lock(${lock})`.execute(connection);
|
||||
return rows[0].pg_try_advisory_lock;
|
||||
}
|
||||
|
||||
private async releaseLock(lock: DatabaseLock, queryRunner: QueryRunner): Promise<void> {
|
||||
return queryRunner.query('SELECT pg_advisory_unlock($1)', [lock]);
|
||||
}
|
||||
|
||||
private getUpsertColumns(metadata: EntityMetadata) {
|
||||
return Object.fromEntries(
|
||||
metadata.ownColumns.map((column) => [column.propertyName, sql<string>`excluded.${sql.ref(column.propertyName)}`]),
|
||||
) as any;
|
||||
private async releaseLock(lock: DatabaseLock, connection: Kysely<DB>): Promise<void> {
|
||||
await sql`SELECT pg_advisory_unlock(${lock})`.execute(connection);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user