mirror of
https://github.com/immich-app/immich.git
synced 2026-03-01 18:19:10 +03:00
refactor: migration tag repository to kysely (#16398)
This commit is contained in:
@@ -1,209 +1,188 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { InjectDataSource, InjectRepository } from '@nestjs/typeorm';
|
||||
import { Insertable, Kysely, sql, Updateable } from 'kysely';
|
||||
import { InjectKysely } from 'nestjs-kysely';
|
||||
import { columns } from 'src/database';
|
||||
import { DB, TagAsset, Tags } from 'src/db';
|
||||
import { Chunked, ChunkedSet, DummyValue, GenerateSql } from 'src/decorators';
|
||||
import { TagEntity } from 'src/entities/tag.entity';
|
||||
import { LoggingRepository } from 'src/repositories/logging.repository';
|
||||
import { DataSource, In, Repository } from 'typeorm';
|
||||
|
||||
export type AssetTagItem = { assetId: string; tagId: string };
|
||||
|
||||
@Injectable()
|
||||
export class TagRepository {
|
||||
constructor(
|
||||
@InjectDataSource() private dataSource: DataSource,
|
||||
@InjectRepository(TagEntity) private repository: Repository<TagEntity>,
|
||||
@InjectKysely() private db: Kysely<DB>,
|
||||
private logger: LoggingRepository,
|
||||
) {
|
||||
this.logger.setContext(TagRepository.name);
|
||||
}
|
||||
|
||||
get(id: string): Promise<TagEntity | null> {
|
||||
return this.repository.findOne({ where: { id } });
|
||||
@GenerateSql({ params: [DummyValue.UUID] })
|
||||
get(id: string) {
|
||||
return this.db.selectFrom('tags').select(columns.tagDto).where('id', '=', id).executeTakeFirst();
|
||||
}
|
||||
|
||||
getByValue(userId: string, value: string): Promise<TagEntity | null> {
|
||||
return this.repository.findOne({ where: { userId, value } });
|
||||
@GenerateSql({ params: [DummyValue.UUID, DummyValue.STRING] })
|
||||
getByValue(userId: string, value: string) {
|
||||
return this.db
|
||||
.selectFrom('tags')
|
||||
.select(columns.tagDto)
|
||||
.where('userId', '=', userId)
|
||||
.where('value', '=', value)
|
||||
.executeTakeFirst();
|
||||
}
|
||||
|
||||
async upsertValue({
|
||||
userId,
|
||||
value,
|
||||
parent,
|
||||
}: {
|
||||
userId: string;
|
||||
value: string;
|
||||
parent?: TagEntity;
|
||||
}): Promise<TagEntity> {
|
||||
return this.dataSource.transaction(async (manager) => {
|
||||
// upsert tag
|
||||
const { identifiers } = await manager.upsert(
|
||||
TagEntity,
|
||||
{ userId, value, parentId: parent?.id },
|
||||
{ conflictPaths: { userId: true, value: true } },
|
||||
);
|
||||
const id = identifiers[0]?.id;
|
||||
if (!id) {
|
||||
throw new Error('Failed to upsert tag');
|
||||
}
|
||||
@GenerateSql({ params: [{ userId: DummyValue.UUID, value: DummyValue.STRING, parentId: DummyValue.UUID }] })
|
||||
async upsertValue({ userId, value, parentId: _parentId }: { userId: string; value: string; parentId?: string }) {
|
||||
const parentId = _parentId ?? null;
|
||||
return this.db.transaction().execute(async (tx) => {
|
||||
const tag = await this.db
|
||||
.insertInto('tags')
|
||||
.values({ userId, value, parentId })
|
||||
.onConflict((oc) => oc.columns(['userId', 'value']).doUpdateSet({ parentId }))
|
||||
.returningAll()
|
||||
.executeTakeFirstOrThrow();
|
||||
|
||||
// update closure table
|
||||
await manager.query(
|
||||
`INSERT INTO tags_closure (id_ancestor, id_descendant)
|
||||
VALUES ($1, $1)
|
||||
ON CONFLICT DO NOTHING;`,
|
||||
[id],
|
||||
);
|
||||
await tx
|
||||
.insertInto('tags_closure')
|
||||
.values({ id_ancestor: tag.id, id_descendant: tag.id })
|
||||
.onConflict((oc) => oc.doNothing())
|
||||
.execute();
|
||||
|
||||
if (parent) {
|
||||
await manager.query(
|
||||
`INSERT INTO tags_closure (id_ancestor, id_descendant)
|
||||
SELECT id_ancestor, '${id}' as id_descendant FROM tags_closure WHERE id_descendant = $1
|
||||
ON CONFLICT DO NOTHING`,
|
||||
[parent.id],
|
||||
);
|
||||
if (parentId) {
|
||||
await tx
|
||||
.insertInto('tags_closure')
|
||||
.columns(['id_ancestor', 'id_descendant'])
|
||||
.expression(
|
||||
this.db
|
||||
.selectFrom('tags_closure')
|
||||
.select(['id_ancestor', sql.raw<string>(`'${tag.id}'`).as('id_descendant')])
|
||||
.where('id_descendant', '=', parentId),
|
||||
)
|
||||
.onConflict((oc) => oc.doNothing())
|
||||
.execute();
|
||||
}
|
||||
|
||||
return manager.findOneOrFail(TagEntity, { where: { id } });
|
||||
return tag;
|
||||
});
|
||||
}
|
||||
|
||||
async getAll(userId: string): Promise<TagEntity[]> {
|
||||
const tags = await this.repository.find({
|
||||
where: { userId },
|
||||
order: {
|
||||
value: 'ASC',
|
||||
},
|
||||
});
|
||||
|
||||
return tags;
|
||||
@GenerateSql({ params: [DummyValue.UUID] })
|
||||
getAll(userId: string) {
|
||||
return this.db
|
||||
.selectFrom('tags')
|
||||
.select(columns.tagDto)
|
||||
.where('userId', '=', userId)
|
||||
.orderBy('value asc')
|
||||
.execute();
|
||||
}
|
||||
|
||||
create(tag: Partial<TagEntity>): Promise<TagEntity> {
|
||||
return this.save(tag);
|
||||
@GenerateSql({ params: [{ userId: DummyValue.UUID, color: DummyValue.STRING, value: DummyValue.STRING }] })
|
||||
create(tag: Insertable<Tags>) {
|
||||
return this.db.insertInto('tags').values(tag).returningAll().executeTakeFirstOrThrow();
|
||||
}
|
||||
|
||||
update(tag: Partial<TagEntity>): Promise<TagEntity> {
|
||||
return this.save(tag);
|
||||
@GenerateSql({ params: [DummyValue.UUID, { color: DummyValue.STRING }] })
|
||||
update(id: string, dto: Updateable<Tags>) {
|
||||
return this.db.updateTable('tags').set(dto).where('id', '=', id).returningAll().executeTakeFirstOrThrow();
|
||||
}
|
||||
|
||||
async delete(id: string): Promise<void> {
|
||||
await this.repository.delete(id);
|
||||
@GenerateSql({ params: [DummyValue.UUID] })
|
||||
async delete(id: string) {
|
||||
await this.db.deleteFrom('tags').where('id', '=', id).execute();
|
||||
}
|
||||
|
||||
@GenerateSql({ params: [DummyValue.UUID, [DummyValue.UUID]] })
|
||||
@ChunkedSet({ paramIndex: 1 })
|
||||
@GenerateSql({ params: [DummyValue.UUID, [DummyValue.UUID]] })
|
||||
async getAssetIds(tagId: string, assetIds: string[]): Promise<Set<string>> {
|
||||
if (assetIds.length === 0) {
|
||||
return new Set();
|
||||
}
|
||||
|
||||
const results = await this.dataSource
|
||||
.createQueryBuilder()
|
||||
.select('tag_asset.assetsId', 'assetId')
|
||||
.from('tag_asset', 'tag_asset')
|
||||
.where('"tag_asset"."tagsId" = :tagId', { tagId })
|
||||
.andWhere('"tag_asset"."assetsId" IN (:...assetIds)', { assetIds })
|
||||
.getRawMany<{ assetId: string }>();
|
||||
const results = await this.db
|
||||
.selectFrom('tag_asset')
|
||||
.select(['assetsId as assetId'])
|
||||
.where('tagsId', '=', tagId)
|
||||
.where('assetsId', 'in', assetIds)
|
||||
.execute();
|
||||
|
||||
return new Set(results.map(({ assetId }) => assetId));
|
||||
}
|
||||
|
||||
@GenerateSql({ params: [DummyValue.UUID, [DummyValue.UUID]] })
|
||||
@Chunked({ paramIndex: 1 })
|
||||
async addAssetIds(tagId: string, assetIds: string[]): Promise<void> {
|
||||
if (assetIds.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
await this.dataSource.manager
|
||||
.createQueryBuilder()
|
||||
.insert()
|
||||
.into('tag_asset', ['tagsId', 'assetsId'])
|
||||
await this.db
|
||||
.insertInto('tag_asset')
|
||||
.values(assetIds.map((assetId) => ({ tagsId: tagId, assetsId: assetId })))
|
||||
.execute();
|
||||
}
|
||||
|
||||
@GenerateSql({ params: [DummyValue.UUID, [DummyValue.UUID]] })
|
||||
@Chunked({ paramIndex: 1 })
|
||||
async removeAssetIds(tagId: string, assetIds: string[]): Promise<void> {
|
||||
if (assetIds.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
await this.dataSource
|
||||
.createQueryBuilder()
|
||||
.delete()
|
||||
.from('tag_asset')
|
||||
.where({
|
||||
tagsId: tagId,
|
||||
assetsId: In(assetIds),
|
||||
})
|
||||
.execute();
|
||||
await this.db.deleteFrom('tag_asset').where('tagsId', '=', tagId).where('assetsId', 'in', assetIds).execute();
|
||||
}
|
||||
|
||||
@GenerateSql({ params: [{ assetId: DummyValue.UUID, tagsIds: [DummyValue.UUID] }] })
|
||||
@Chunked()
|
||||
async upsertAssetIds(items: AssetTagItem[]): Promise<AssetTagItem[]> {
|
||||
upsertAssetIds(items: Insertable<TagAsset>[]) {
|
||||
if (items.length === 0) {
|
||||
return [];
|
||||
return Promise.resolve([]);
|
||||
}
|
||||
|
||||
const { identifiers } = await this.dataSource
|
||||
.createQueryBuilder()
|
||||
.insert()
|
||||
.into('tag_asset', ['assetsId', 'tagsId'])
|
||||
.values(items.map(({ assetId, tagId }) => ({ assetsId: assetId, tagsId: tagId })))
|
||||
return this.db
|
||||
.insertInto('tag_asset')
|
||||
.values(items)
|
||||
.onConflict((oc) => oc.doNothing())
|
||||
.returningAll()
|
||||
.execute();
|
||||
|
||||
return (identifiers as Array<{ assetsId: string; tagsId: string }>).map(({ assetsId, tagsId }) => ({
|
||||
assetId: assetsId,
|
||||
tagId: tagsId,
|
||||
}));
|
||||
}
|
||||
|
||||
async upsertAssetTags({ assetId, tagIds }: { assetId: string; tagIds: string[] }) {
|
||||
await this.dataSource.transaction(async (manager) => {
|
||||
await manager.createQueryBuilder().delete().from('tag_asset').where({ assetsId: assetId }).execute();
|
||||
@GenerateSql({ params: [DummyValue.UUID, [DummyValue.UUID]] })
|
||||
@Chunked({ paramIndex: 1 })
|
||||
replaceAssetTags(assetId: string, tagIds: string[]) {
|
||||
return this.db.transaction().execute(async (tx) => {
|
||||
await tx.deleteFrom('tag_asset').where('assetsId', '=', assetId).execute();
|
||||
|
||||
if (tagIds.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
await manager
|
||||
.createQueryBuilder()
|
||||
.insert()
|
||||
.into('tag_asset', ['tagsId', 'assetsId'])
|
||||
return tx
|
||||
.insertInto('tag_asset')
|
||||
.values(tagIds.map((tagId) => ({ tagsId: tagId, assetsId: assetId })))
|
||||
.onConflict((oc) => oc.doNothing())
|
||||
.returningAll()
|
||||
.execute();
|
||||
});
|
||||
}
|
||||
|
||||
@GenerateSql()
|
||||
async deleteEmptyTags() {
|
||||
await this.dataSource.transaction(async (manager) => {
|
||||
const ids = new Set<string>();
|
||||
const tags = await manager.find(TagEntity);
|
||||
for (const tag of tags) {
|
||||
const count = await manager
|
||||
.createQueryBuilder('assets', 'asset')
|
||||
.innerJoin(
|
||||
'asset.tags',
|
||||
'asset_tags',
|
||||
'asset_tags.id IN (SELECT id_descendant FROM tags_closure WHERE id_ancestor = :tagId)',
|
||||
{ tagId: tag.id },
|
||||
)
|
||||
.getCount();
|
||||
// TODO rewrite as a single statement
|
||||
await this.db.transaction().execute(async (tx) => {
|
||||
const result = await tx
|
||||
.selectFrom('assets')
|
||||
.innerJoin('tag_asset', 'tag_asset.assetsId', 'assets.id')
|
||||
.innerJoin('tags_closure', 'tags_closure.id_descendant', 'tag_asset.tagsId')
|
||||
.innerJoin('tags', 'tags.id', 'tags_closure.id_descendant')
|
||||
.select((eb) => ['tags.id', eb.fn.count<number>('assets.id').as('count')])
|
||||
.groupBy('tags.id')
|
||||
.execute();
|
||||
|
||||
if (count === 0) {
|
||||
this.logger.debug(`Found empty tag: ${tag.id} - ${tag.value}`);
|
||||
ids.add(tag.id);
|
||||
}
|
||||
}
|
||||
|
||||
if (ids.size > 0) {
|
||||
await manager.delete(TagEntity, { id: In([...ids]) });
|
||||
this.logger.log(`Deleted ${ids.size} empty tags`);
|
||||
const ids = result.filter(({ count }) => count === 0).map(({ id }) => id);
|
||||
if (ids.length > 0) {
|
||||
await this.db.deleteFrom('tags').where('id', 'in', ids).execute();
|
||||
this.logger.log(`Deleted ${ids.length} empty tags`);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private async save(partial: Partial<TagEntity>): Promise<TagEntity> {
|
||||
const { id } = await this.repository.save(partial);
|
||||
return this.repository.findOneOrFail({ where: { id } });
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user