fix: album asset sync must sync new assets in a shared album (#20655)

This commit is contained in:
Zack Pollard
2025-08-05 17:53:51 +01:00
committed by GitHub
parent 09a5963eee
commit 02381343ff
19 changed files with 928 additions and 453 deletions

View File

@@ -1,5 +1,5 @@
import { Injectable } from '@nestjs/common';
import { Insertable, Kysely } from 'kysely';
import { Insertable, Kysely, sql } from 'kysely';
import { InjectKysely } from 'nestjs-kysely';
import { DummyValue, GenerateSql } from 'src/decorators';
import { SyncEntityType } from 'src/enum';
@@ -39,4 +39,13 @@ export class SyncCheckpointRepository {
.$if(!!types, (qb) => qb.where('type', 'in', types!))
.execute();
}
@GenerateSql()
getNow() {
return this.db
.selectNoFrom((eb) => [
eb.fn<string>('immich_uuid_v7', [sql.raw<Date>("now() - interval '1 millisecond'")]).as('nowId'),
])
.executeTakeFirstOrThrow();
}
}

View File

@@ -1,5 +1,5 @@
import { Injectable } from '@nestjs/common';
import { Kysely, SelectQueryBuilder, sql } from 'kysely';
import { Kysely, SelectQueryBuilder } from 'kysely';
import { InjectKysely } from 'nestjs-kysely';
import { columns } from 'src/database';
import { DummyValue, GenerateSql } from 'src/decorators';
@@ -33,6 +33,11 @@ type UpsertTables =
| 'user_metadata'
| 'asset_face';
export type SyncQueryOptions = {
nowId: string;
userId: string;
};
@Injectable()
export class SyncRepository {
album: AlbumSync;
@@ -81,21 +86,21 @@ export class SyncRepository {
class BaseSync {
constructor(protected db: Kysely<DB>) {}
protected auditTableFilters(ack?: SyncAck) {
protected auditTableFilters(nowId: string, ack?: SyncAck) {
return <T extends keyof Pick<DB, AuditTables>, D>(qb: SelectQueryBuilder<DB, T, D>) => {
const builder = qb as SelectQueryBuilder<DB, AuditTables, D>;
return builder
.where('deletedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.where('id', '<', nowId)
.$if(!!ack, (qb) => qb.where('id', '>', ack!.updateId))
.orderBy('id', 'asc') as SelectQueryBuilder<DB, T, D>;
};
}
protected upsertTableFilters(ack?: SyncAck) {
protected upsertTableFilters(nowId: string, ack?: SyncAck) {
return <T extends keyof Pick<DB, UpsertTables>, D>(qb: SelectQueryBuilder<DB, T, D>) => {
const builder = qb as SelectQueryBuilder<DB, UpsertTables, D>;
return builder
.where('updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.where('updateId', '<', nowId)
.$if(!!ack, (qb) => qb.where('updateId', '>', ack!.updateId))
.orderBy('updateId', 'asc') as SelectQueryBuilder<DB, T, D>;
};
@@ -103,34 +108,34 @@ class BaseSync {
}
class AlbumSync extends BaseSync {
@GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID] })
getCreatedAfter(userId: string, afterCreateId?: string) {
@GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }, DummyValue.UUID] })
getCreatedAfter({ nowId, userId }: SyncQueryOptions, afterCreateId?: string) {
return this.db
.selectFrom('album_user')
.select(['albumsId as id', 'createId'])
.where('usersId', '=', userId)
.$if(!!afterCreateId, (qb) => qb.where('createId', '>=', afterCreateId!))
.where('createdAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.where('createId', '<', nowId)
.orderBy('createId', 'asc')
.execute();
}
@GenerateSql({ params: [DummyValue.UUID], stream: true })
getDeletes(userId: string, ack?: SyncAck) {
@GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db
.selectFrom('album_audit')
.select(['id', 'albumId'])
.where('userId', '=', userId)
.$call(this.auditTableFilters(ack))
.$call(this.auditTableFilters(nowId, ack))
.stream();
}
@GenerateSql({ params: [DummyValue.UUID], stream: true })
getUpserts(userId: string, ack?: SyncAck) {
@GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db
.selectFrom('album')
.distinctOn(['album.id', 'album.updateId'])
.where('album.updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.where('album.updateId', '<', nowId)
.$if(!!ack, (qb) => qb.where('album.updateId', '>', ack!.updateId))
.orderBy('album.updateId', 'asc')
.leftJoin('album_user as album_users', 'album.id', 'album_users.albumsId')
@@ -152,29 +157,33 @@ class AlbumSync extends BaseSync {
}
class AlbumAssetSync extends BaseSync {
@GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], stream: true })
getBackfill(albumId: string, afterUpdateId: string | undefined, beforeUpdateId: string) {
@GenerateSql({
params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }, DummyValue.UUID, DummyValue.UUID, DummyValue.UUID],
stream: true,
})
getBackfill({ nowId }: SyncQueryOptions, albumId: string, afterUpdateId: string | undefined, beforeUpdateId: string) {
return this.db
.selectFrom('asset')
.innerJoin('album_asset', 'album_asset.assetsId', 'asset.id')
.selectFrom('album_asset')
.innerJoin('asset', 'asset.id', 'album_asset.assetsId')
.select(columns.syncAsset)
.select('asset.updateId')
.select('album_asset.updateId')
.where('album_asset.albumsId', '=', albumId)
.where('asset.updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.where('asset.updateId', '<=', beforeUpdateId)
.$if(!!afterUpdateId, (eb) => eb.where('asset.updateId', '>=', afterUpdateId!))
.orderBy('asset.updateId', 'asc')
.where('album_asset.updateId', '<', nowId)
.where('album_asset.updateId', '<=', beforeUpdateId)
.$if(!!afterUpdateId, (eb) => eb.where('album_asset.updateId', '>=', afterUpdateId!))
.orderBy('album_asset.updateId', 'asc')
.stream();
}
@GenerateSql({ params: [DummyValue.UUID], stream: true })
getUpserts(userId: string, ack?: SyncAck) {
@GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getUpdates({ nowId, userId }: SyncQueryOptions, albumToAssetAck: SyncAck, ack?: SyncAck) {
return this.db
.selectFrom('asset')
.innerJoin('album_asset', 'album_asset.assetsId', 'asset.id')
.select(columns.syncAsset)
.select('asset.updateId')
.where('asset.updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.where('asset.updateId', '<', nowId)
.where('album_asset.updateId', '<=', albumToAssetAck.updateId) // Ensure we only send updates for assets that the client already knows about
.$if(!!ack, (qb) => qb.where('asset.updateId', '>', ack!.updateId))
.orderBy('asset.updateId', 'asc')
.innerJoin('album', 'album.id', 'album_asset.albumsId')
@@ -182,32 +191,52 @@ class AlbumAssetSync extends BaseSync {
.where((eb) => eb.or([eb('album.ownerId', '=', userId), eb('album_user.usersId', '=', userId)]))
.stream();
}
@GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getCreates({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db
.selectFrom('album_asset')
.select('album_asset.updateId')
.innerJoin('asset', 'asset.id', 'album_asset.assetsId')
.select(columns.syncAsset)
.innerJoin('album', 'album.id', 'album_asset.albumsId')
.leftJoin('album_user', 'album_user.albumsId', 'album_asset.albumsId')
.where('album_asset.updateId', '<', nowId)
.where((eb) => eb.or([eb('album.ownerId', '=', userId), eb('album_user.usersId', '=', userId)]))
.$if(!!ack, (qb) => qb.where('album_asset.updateId', '>', ack!.updateId))
.orderBy('album_asset.updateId', 'asc')
.stream();
}
}
class AlbumAssetExifSync extends BaseSync {
@GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], stream: true })
getBackfill(albumId: string, afterUpdateId: string | undefined, beforeUpdateId: string) {
@GenerateSql({
params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }, DummyValue.UUID, DummyValue.UUID, DummyValue.UUID],
stream: true,
})
getBackfill({ nowId }: SyncQueryOptions, albumId: string, afterUpdateId: string | undefined, beforeUpdateId: string) {
return this.db
.selectFrom('asset_exif')
.innerJoin('album_asset', 'album_asset.assetsId', 'asset_exif.assetId')
.selectFrom('album_asset')
.innerJoin('asset_exif', 'asset_exif.assetId', 'album_asset.assetsId')
.select(columns.syncAssetExif)
.select('asset_exif.updateId')
.select('album_asset.updateId')
.where('album_asset.albumsId', '=', albumId)
.where('asset_exif.updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.where('asset_exif.updateId', '<=', beforeUpdateId)
.$if(!!afterUpdateId, (eb) => eb.where('asset_exif.updateId', '>=', afterUpdateId!))
.orderBy('asset_exif.updateId', 'asc')
.where('album_asset.updateId', '<', nowId)
.where('album_asset.updateId', '<=', beforeUpdateId)
.$if(!!afterUpdateId, (eb) => eb.where('album_asset.updateId', '>=', afterUpdateId!))
.orderBy('album_asset.updateId', 'asc')
.stream();
}
@GenerateSql({ params: [DummyValue.UUID], stream: true })
getUpserts(userId: string, ack?: SyncAck) {
@GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getUpdates({ nowId, userId }: SyncQueryOptions, albumToAssetAck: SyncAck, ack?: SyncAck) {
return this.db
.selectFrom('asset_exif')
.innerJoin('album_asset', 'album_asset.assetsId', 'asset_exif.assetId')
.select(columns.syncAssetExif)
.select('asset_exif.updateId')
.where('asset_exif.updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.where('album_asset.updateId', '<=', albumToAssetAck.updateId) // Ensure we only send exif updates for assets that the client already knows about
.where('asset_exif.updateId', '<', nowId)
.$if(!!ack, (qb) => qb.where('asset_exif.updateId', '>', ack!.updateId))
.orderBy('asset_exif.updateId', 'asc')
.innerJoin('album', 'album.id', 'album_asset.albumsId')
@@ -215,24 +244,43 @@ class AlbumAssetExifSync extends BaseSync {
.where((eb) => eb.or([eb('album.ownerId', '=', userId), eb('album_user.usersId', '=', userId)]))
.stream();
}
@GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getCreates({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db
.selectFrom('album_asset')
.select('album_asset.updateId')
.innerJoin('asset_exif', 'asset_exif.assetId', 'album_asset.assetsId')
.select(columns.syncAssetExif)
.innerJoin('album', 'album.id', 'album_asset.albumsId')
.leftJoin('album_user', 'album_user.albumsId', 'album_asset.albumsId')
.where('album_asset.updateId', '<', nowId)
.where((eb) => eb.or([eb('album.ownerId', '=', userId), eb('album_user.usersId', '=', userId)]))
.$if(!!ack, (qb) => qb.where('album_asset.updateId', '>', ack!.updateId))
.orderBy('album_asset.updateId', 'asc')
.stream();
}
}
class AlbumToAssetSync extends BaseSync {
@GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], stream: true })
getBackfill(albumId: string, afterUpdateId: string | undefined, beforeUpdateId: string) {
@GenerateSql({
params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }, DummyValue.UUID, DummyValue.UUID, DummyValue.UUID],
stream: true,
})
getBackfill({ nowId }: SyncQueryOptions, albumId: string, afterUpdateId: string | undefined, beforeUpdateId: string) {
return this.db
.selectFrom('album_asset as album_assets')
.select(['album_assets.assetsId as assetId', 'album_assets.albumsId as albumId', 'album_assets.updateId'])
.where('album_assets.albumsId', '=', albumId)
.where('album_assets.updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.where('album_assets.updateId', '<', nowId)
.where('album_assets.updateId', '<=', beforeUpdateId)
.$if(!!afterUpdateId, (eb) => eb.where('album_assets.updateId', '>=', afterUpdateId!))
.orderBy('album_assets.updateId', 'asc')
.stream();
}
@GenerateSql({ params: [DummyValue.UUID], stream: true })
getDeletes(userId: string, ack?: SyncAck) {
@GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db
.selectFrom('album_asset_audit')
.select(['id', 'assetId', 'albumId'])
@@ -254,16 +302,16 @@ class AlbumToAssetSync extends BaseSync {
),
),
)
.$call(this.auditTableFilters(ack))
.$call(this.auditTableFilters(nowId, ack))
.stream();
}
@GenerateSql({ params: [DummyValue.UUID], stream: true })
getUpserts(userId: string, ack?: SyncAck) {
@GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db
.selectFrom('album_asset')
.select(['album_asset.assetsId as assetId', 'album_asset.albumsId as albumId', 'album_asset.updateId'])
.where('album_asset.updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.where('album_asset.updateId', '<', nowId)
.$if(!!ack, (qb) => qb.where('album_asset.updateId', '>', ack!.updateId))
.orderBy('album_asset.updateId', 'asc')
.innerJoin('album', 'album.id', 'album_asset.albumsId')
@@ -274,22 +322,25 @@ class AlbumToAssetSync extends BaseSync {
}
class AlbumUserSync extends BaseSync {
@GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], stream: true })
getBackfill(albumId: string, afterUpdateId: string | undefined, beforeUpdateId: string) {
@GenerateSql({
params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }, DummyValue.UUID, DummyValue.UUID, DummyValue.UUID],
stream: true,
})
getBackfill({ nowId }: SyncQueryOptions, albumId: string, afterUpdateId: string | undefined, beforeUpdateId: string) {
return this.db
.selectFrom('album_user')
.select(columns.syncAlbumUser)
.select('album_user.updateId')
.where('albumsId', '=', albumId)
.where('updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.where('updateId', '<', nowId)
.where('updateId', '<=', beforeUpdateId)
.$if(!!afterUpdateId, (eb) => eb.where('updateId', '>=', afterUpdateId!))
.orderBy('updateId', 'asc')
.stream();
}
@GenerateSql({ params: [DummyValue.UUID], stream: true })
getDeletes(userId: string, ack?: SyncAck) {
@GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db
.selectFrom('album_user_audit')
.select(['id', 'userId', 'albumId'])
@@ -311,17 +362,17 @@ class AlbumUserSync extends BaseSync {
),
),
)
.$call(this.auditTableFilters(ack))
.$call(this.auditTableFilters(nowId, ack))
.stream();
}
@GenerateSql({ params: [DummyValue.UUID], stream: true })
getUpserts(userId: string, ack?: SyncAck) {
@GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db
.selectFrom('album_user')
.select(columns.syncAlbumUser)
.select('album_user.updateId')
.where('album_user.updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.where('album_user.updateId', '<', nowId)
.$if(!!ack, (qb) => qb.where('album_user.updateId', '>', ack!.updateId))
.orderBy('album_user.updateId', 'asc')
.where((eb) =>
@@ -347,53 +398,53 @@ class AlbumUserSync extends BaseSync {
}
class AssetSync extends BaseSync {
@GenerateSql({ params: [DummyValue.UUID], stream: true })
getDeletes(userId: string, ack?: SyncAck) {
@GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db
.selectFrom('asset_audit')
.select(['id', 'assetId'])
.where('ownerId', '=', userId)
.$call(this.auditTableFilters(ack))
.$call(this.auditTableFilters(nowId, ack))
.stream();
}
@GenerateSql({ params: [DummyValue.UUID], stream: true })
getUpserts(userId: string, ack?: SyncAck) {
@GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db
.selectFrom('asset')
.select(columns.syncAsset)
.select('asset.updateId')
.where('ownerId', '=', userId)
.$call(this.upsertTableFilters(ack))
.$call(this.upsertTableFilters(nowId, ack))
.stream();
}
}
class AuthUserSync extends BaseSync {
@GenerateSql({ params: [], stream: true })
getUpserts(ack?: SyncAck) {
getUpserts({ nowId }: SyncQueryOptions, ack?: SyncAck) {
return this.db
.selectFrom('user')
.select(columns.syncUser)
.select(['isAdmin', 'pinCode', 'oauthId', 'storageLabel', 'quotaSizeInBytes', 'quotaUsageInBytes'])
.$call(this.upsertTableFilters(ack))
.$call(this.upsertTableFilters(nowId, ack))
.stream();
}
}
class PersonSync extends BaseSync {
@GenerateSql({ params: [DummyValue.UUID], stream: true })
getDeletes(userId: string, ack?: SyncAck) {
@GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db
.selectFrom('person_audit')
.select(['id', 'personId'])
.where('ownerId', '=', userId)
.$call(this.auditTableFilters(ack))
.$call(this.auditTableFilters(nowId, ack))
.stream();
}
@GenerateSql({ params: [DummyValue.UUID], stream: true })
getUpserts(userId: string, ack?: SyncAck) {
@GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db
.selectFrom('person')
.select([
@@ -410,27 +461,27 @@ class PersonSync extends BaseSync {
'faceAssetId',
])
.where('ownerId', '=', userId)
.$call(this.upsertTableFilters(ack))
.$call(this.upsertTableFilters(nowId, ack))
.stream();
}
}
class AssetFaceSync extends BaseSync {
@GenerateSql({ params: [DummyValue.UUID], stream: true })
getDeletes(userId: string, ack?: SyncAck) {
@GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db
.selectFrom('asset_face_audit')
.select(['asset_face_audit.id', 'assetFaceId'])
.orderBy('asset_face_audit.id', 'asc')
.leftJoin('asset', 'asset.id', 'asset_face_audit.assetId')
.where('asset.ownerId', '=', userId)
.where('asset_face_audit.deletedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.where('asset_face_audit.id', '<', nowId)
.$if(!!ack, (qb) => qb.where('asset_face_audit.id', '>', ack!.updateId))
.stream();
}
@GenerateSql({ params: [DummyValue.UUID], stream: true })
getUpserts(userId: string, ack?: SyncAck) {
@GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db
.selectFrom('asset_face')
.select([
@@ -446,7 +497,7 @@ class AssetFaceSync extends BaseSync {
'sourceType',
'asset_face.updateId',
])
.where('asset_face.updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.where('asset_face.updateId', '<', nowId)
.$if(!!ack, (qb) => qb.where('asset_face.updateId', '>', ack!.updateId))
.orderBy('asset_face.updateId', 'asc')
.leftJoin('asset', 'asset.id', 'asset_face.assetId')
@@ -456,31 +507,31 @@ class AssetFaceSync extends BaseSync {
}
class AssetExifSync extends BaseSync {
@GenerateSql({ params: [DummyValue.UUID], stream: true })
getUpserts(userId: string, ack?: SyncAck) {
@GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db
.selectFrom('asset_exif')
.select(columns.syncAssetExif)
.select('asset_exif.updateId')
.where('assetId', 'in', (eb) => eb.selectFrom('asset').select('id').where('ownerId', '=', userId))
.$call(this.upsertTableFilters(ack))
.$call(this.upsertTableFilters(nowId, ack))
.stream();
}
}
class MemorySync extends BaseSync {
@GenerateSql({ params: [DummyValue.UUID], stream: true })
getDeletes(userId: string, ack?: SyncAck) {
@GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db
.selectFrom('memory_audit')
.select(['id', 'memoryId'])
.where('userId', '=', userId)
.$call(this.auditTableFilters(ack))
.$call(this.auditTableFilters(nowId, ack))
.stream();
}
@GenerateSql({ params: [DummyValue.UUID], stream: true })
getUpserts(userId: string, ack?: SyncAck) {
@GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db
.selectFrom('memory')
.select([
@@ -499,97 +550,105 @@ class MemorySync extends BaseSync {
])
.select('updateId')
.where('ownerId', '=', userId)
.$call(this.upsertTableFilters(ack))
.$call(this.upsertTableFilters(nowId, ack))
.stream();
}
}
class MemoryToAssetSync extends BaseSync {
@GenerateSql({ params: [DummyValue.UUID], stream: true })
getDeletes(userId: string, ack?: SyncAck) {
@GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db
.selectFrom('memory_asset_audit')
.select(['id', 'memoryId', 'assetId'])
.where('memoryId', 'in', (eb) => eb.selectFrom('memory').select('id').where('ownerId', '=', userId))
.$call(this.auditTableFilters(ack))
.$call(this.auditTableFilters(nowId, ack))
.stream();
}
@GenerateSql({ params: [DummyValue.UUID], stream: true })
getUpserts(userId: string, ack?: SyncAck) {
@GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db
.selectFrom('memory_asset')
.select(['memoriesId as memoryId', 'assetsId as assetId'])
.select('updateId')
.where('memoriesId', 'in', (eb) => eb.selectFrom('memory').select('id').where('ownerId', '=', userId))
.$call(this.upsertTableFilters(ack))
.$call(this.upsertTableFilters(nowId, ack))
.stream();
}
}
class PartnerSync extends BaseSync {
@GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID] })
getCreatedAfter(userId: string, afterCreateId?: string) {
@GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }] })
getCreatedAfter({ nowId, userId }: SyncQueryOptions, afterCreateId?: string) {
return this.db
.selectFrom('partner')
.select(['sharedById', 'createId'])
.where('sharedWithId', '=', userId)
.$if(!!afterCreateId, (qb) => qb.where('createId', '>=', afterCreateId!))
.where('createdAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.where('createId', '<', nowId)
.orderBy('partner.createId', 'asc')
.execute();
}
@GenerateSql({ params: [DummyValue.UUID], stream: true })
getDeletes(userId: string, ack?: SyncAck) {
@GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db
.selectFrom('partner_audit')
.select(['id', 'sharedById', 'sharedWithId'])
.where((eb) => eb.or([eb('sharedById', '=', userId), eb('sharedWithId', '=', userId)]))
.$call(this.auditTableFilters(ack))
.$call(this.auditTableFilters(nowId, ack))
.stream();
}
@GenerateSql({ params: [DummyValue.UUID], stream: true })
getUpserts(userId: string, ack?: SyncAck) {
@GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db
.selectFrom('partner')
.select(['sharedById', 'sharedWithId', 'inTimeline', 'updateId'])
.where((eb) => eb.or([eb('sharedById', '=', userId), eb('sharedWithId', '=', userId)]))
.$call(this.upsertTableFilters(ack))
.$call(this.upsertTableFilters(nowId, ack))
.stream();
}
}
class PartnerAssetsSync extends BaseSync {
@GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], stream: true })
getBackfill(partnerId: string, afterUpdateId: string | undefined, beforeUpdateId: string) {
@GenerateSql({
params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }, DummyValue.UUID, DummyValue.UUID, DummyValue.UUID],
stream: true,
})
getBackfill(
{ nowId }: SyncQueryOptions,
partnerId: string,
afterUpdateId: string | undefined,
beforeUpdateId: string,
) {
return this.db
.selectFrom('asset')
.select(columns.syncAsset)
.select('asset.updateId')
.where('ownerId', '=', partnerId)
.where('updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.where('updateId', '<', nowId)
.where('updateId', '<=', beforeUpdateId)
.$if(!!afterUpdateId, (eb) => eb.where('updateId', '>=', afterUpdateId!))
.orderBy('updateId', 'asc')
.stream();
}
@GenerateSql({ params: [DummyValue.UUID], stream: true })
getDeletes(userId: string, ack?: SyncAck) {
@GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db
.selectFrom('asset_audit')
.select(['id', 'assetId'])
.where('ownerId', 'in', (eb) =>
eb.selectFrom('partner').select(['sharedById']).where('sharedWithId', '=', userId),
)
.$call(this.auditTableFilters(ack))
.$call(this.auditTableFilters(nowId, ack))
.stream();
}
@GenerateSql({ params: [DummyValue.UUID], stream: true })
getUpserts(userId: string, ack?: SyncAck) {
@GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db
.selectFrom('asset')
.select(columns.syncAsset)
@@ -597,29 +656,37 @@ class PartnerAssetsSync extends BaseSync {
.where('ownerId', 'in', (eb) =>
eb.selectFrom('partner').select(['sharedById']).where('sharedWithId', '=', userId),
)
.$call(this.upsertTableFilters(ack))
.$call(this.upsertTableFilters(nowId, ack))
.stream();
}
}
class PartnerAssetExifsSync extends BaseSync {
@GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], stream: true })
getBackfill(partnerId: string, afterUpdateId: string | undefined, beforeUpdateId: string) {
@GenerateSql({
params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }, DummyValue.UUID, DummyValue.UUID, DummyValue.UUID],
stream: true,
})
getBackfill(
{ nowId }: SyncQueryOptions,
partnerId: string,
afterUpdateId: string | undefined,
beforeUpdateId: string,
) {
return this.db
.selectFrom('asset_exif')
.select(columns.syncAssetExif)
.select('asset_exif.updateId')
.innerJoin('asset', 'asset.id', 'asset_exif.assetId')
.where('asset.ownerId', '=', partnerId)
.where('asset_exif.updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.where('asset_exif.updateId', '<', nowId)
.where('asset_exif.updateId', '<=', beforeUpdateId)
.$if(!!afterUpdateId, (eb) => eb.where('asset_exif.updateId', '>=', afterUpdateId!))
.orderBy('asset_exif.updateId', 'asc')
.stream();
}
@GenerateSql({ params: [DummyValue.UUID], stream: true })
getUpserts(userId: string, ack?: SyncAck) {
@GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db
.selectFrom('asset_exif')
.select(columns.syncAssetExif)
@@ -632,61 +699,69 @@ class PartnerAssetExifsSync extends BaseSync {
eb.selectFrom('partner').select(['sharedById']).where('sharedWithId', '=', userId),
),
)
.$call(this.upsertTableFilters(ack))
.$call(this.upsertTableFilters(nowId, ack))
.stream();
}
}
class StackSync extends BaseSync {
@GenerateSql({ params: [DummyValue.UUID], stream: true })
getDeletes(userId: string, ack?: SyncAck) {
@GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db
.selectFrom('stack_audit')
.select(['id', 'stackId'])
.where('userId', '=', userId)
.$call(this.auditTableFilters(ack))
.$call(this.auditTableFilters(nowId, ack))
.stream();
}
@GenerateSql({ params: [DummyValue.UUID], stream: true })
getUpserts(userId: string, ack?: SyncAck) {
@GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db
.selectFrom('stack')
.select(columns.syncStack)
.select('updateId')
.where('ownerId', '=', userId)
.$call(this.upsertTableFilters(ack))
.$call(this.upsertTableFilters(nowId, ack))
.stream();
}
}
class PartnerStackSync extends BaseSync {
@GenerateSql({ params: [DummyValue.UUID], stream: true })
getDeletes(userId: string, ack?: SyncAck) {
@GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db
.selectFrom('stack_audit')
.select(['id', 'stackId'])
.where('userId', 'in', (eb) => eb.selectFrom('partner').select(['sharedById']).where('sharedWithId', '=', userId))
.$call(this.auditTableFilters(ack))
.$call(this.auditTableFilters(nowId, ack))
.stream();
}
@GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], stream: true })
getBackfill(partnerId: string, afterUpdateId: string | undefined, beforeUpdateId: string) {
@GenerateSql({
params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }, DummyValue.UUID, DummyValue.UUID, DummyValue.UUID],
stream: true,
})
getBackfill(
{ nowId }: SyncQueryOptions,
partnerId: string,
afterUpdateId: string | undefined,
beforeUpdateId: string,
) {
return this.db
.selectFrom('stack')
.select(columns.syncStack)
.select('updateId')
.where('ownerId', '=', partnerId)
.where('updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.where('updateId', '<', nowId)
.where('updateId', '<=', beforeUpdateId)
.$if(!!afterUpdateId, (eb) => eb.where('updateId', '>=', afterUpdateId!))
.orderBy('updateId', 'asc')
.stream();
}
@GenerateSql({ params: [DummyValue.UUID], stream: true })
getUpserts(userId: string, ack?: SyncAck) {
@GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db
.selectFrom('stack')
.select(columns.syncStack)
@@ -694,41 +769,41 @@ class PartnerStackSync extends BaseSync {
.where('ownerId', 'in', (eb) =>
eb.selectFrom('partner').select(['sharedById']).where('sharedWithId', '=', userId),
)
.$call(this.upsertTableFilters(ack))
.$call(this.upsertTableFilters(nowId, ack))
.stream();
}
}
class UserSync extends BaseSync {
@GenerateSql({ params: [], stream: true })
getDeletes(ack?: SyncAck) {
return this.db.selectFrom('user_audit').select(['id', 'userId']).$call(this.auditTableFilters(ack)).stream();
@GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getDeletes({ nowId }: SyncQueryOptions, ack?: SyncAck) {
return this.db.selectFrom('user_audit').select(['id', 'userId']).$call(this.auditTableFilters(nowId, ack)).stream();
}
@GenerateSql({ params: [], stream: true })
getUpserts(ack?: SyncAck) {
return this.db.selectFrom('user').select(columns.syncUser).$call(this.upsertTableFilters(ack)).stream();
@GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getUpserts({ nowId }: SyncQueryOptions, ack?: SyncAck) {
return this.db.selectFrom('user').select(columns.syncUser).$call(this.upsertTableFilters(nowId, ack)).stream();
}
}
class UserMetadataSync extends BaseSync {
@GenerateSql({ params: [DummyValue.UUID], stream: true })
getDeletes(userId: string, ack?: SyncAck) {
@GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getDeletes({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db
.selectFrom('user_metadata_audit')
.select(['id', 'userId', 'key'])
.where('userId', '=', userId)
.$call(this.auditTableFilters(ack))
.$call(this.auditTableFilters(nowId, ack))
.stream();
}
@GenerateSql({ params: [DummyValue.UUID], stream: true })
getUpserts(userId: string, ack?: SyncAck) {
@GenerateSql({ params: [{ nowId: DummyValue.UUID, userId: DummyValue.UUID }], stream: true })
getUpserts({ nowId, userId }: SyncQueryOptions, ack?: SyncAck) {
return this.db
.selectFrom('user_metadata')
.select(['userId', 'key', 'value', 'updateId'])
.where('userId', '=', userId)
.$call(this.upsertTableFilters(ack))
.$call(this.upsertTableFilters(nowId, ack))
.stream();
}
}