mirror of
https://github.com/immich-app/immich.git
synced 2026-02-04 17:01:13 +03:00
feat(server,web): add websocket events for album updates
and person face changes
This commit is contained in:
@@ -48,10 +48,15 @@ type EventMap = {
|
||||
'config.validate': [{ newConfig: SystemConfig; oldConfig: SystemConfig }];
|
||||
|
||||
// album events
|
||||
'album.update': [{ id: string; recipientId: string }];
|
||||
'album.update': [
|
||||
{ id: string; recipientId: string[]; assetId: string[]; userId: string; status: 'added' | 'removed' },
|
||||
];
|
||||
'album.invite': [{ id: string; userId: string }];
|
||||
|
||||
// asset events
|
||||
'asset.person': [
|
||||
{ assetId: string; userId: string; personId: string | undefined; status: 'created' | 'removed' | 'removed_soft' },
|
||||
];
|
||||
'asset.tag': [{ assetId: string }];
|
||||
'asset.untag': [{ assetId: string }];
|
||||
'asset.hide': [{ assetId: string; userId: string }];
|
||||
@@ -97,6 +102,8 @@ export type ArgsOf<T extends EmitEvent> = EventMap[T];
|
||||
export interface ClientEventMap {
|
||||
on_upload_success: [AssetResponseDto];
|
||||
on_user_delete: [string];
|
||||
on_album_update: [{ albumId: string; assetId: string[]; status: 'added' | 'removed' }];
|
||||
on_asset_person: [{ assetId: string; personId: string | undefined; status: 'created' | 'removed' | 'removed_soft' }];
|
||||
on_asset_delete: [string];
|
||||
on_asset_trash: [string[]];
|
||||
on_asset_update: [AssetResponseDto];
|
||||
|
||||
@@ -484,6 +484,15 @@ export class PersonRepository {
|
||||
.executeTakeFirst();
|
||||
}
|
||||
|
||||
@GenerateSql({ params: [DummyValue.UUID] })
|
||||
async getAssetPersonByFaceId(id: string) {
|
||||
return this.db
|
||||
.selectFrom('asset_faces')
|
||||
.select(['asset_faces.assetId', 'asset_faces.personId'])
|
||||
.where('asset_faces.id', '=', id)
|
||||
.executeTakeFirst();
|
||||
}
|
||||
|
||||
@GenerateSql()
|
||||
async getLatestFaceDate(): Promise<string | undefined> {
|
||||
const result = (await this.db
|
||||
|
||||
@@ -178,9 +178,13 @@ export class AlbumService extends BaseService {
|
||||
(userId) => userId !== auth.user.id,
|
||||
);
|
||||
|
||||
for (const recipientId of allUsersExceptUs) {
|
||||
await this.eventRepository.emit('album.update', { id, recipientId });
|
||||
}
|
||||
await this.eventRepository.emit('album.update', {
|
||||
id,
|
||||
userId: auth.user.id,
|
||||
assetId: dto.ids,
|
||||
recipientId: allUsersExceptUs,
|
||||
status: 'added',
|
||||
});
|
||||
}
|
||||
|
||||
return results;
|
||||
@@ -200,7 +204,16 @@ export class AlbumService extends BaseService {
|
||||
if (removedIds.length > 0 && album.albumThumbnailAssetId && removedIds.includes(album.albumThumbnailAssetId)) {
|
||||
await this.albumRepository.updateThumbnails();
|
||||
}
|
||||
|
||||
const allUsersExceptUs = [...album.albumUsers.map(({ user }) => user.id), album.owner.id].filter(
|
||||
(userId) => userId !== auth.user.id,
|
||||
);
|
||||
await this.eventRepository.emit('album.update', {
|
||||
id,
|
||||
userId: auth.user.id,
|
||||
assetId: dto.ids,
|
||||
recipientId: allUsersExceptUs,
|
||||
status: 'removed',
|
||||
});
|
||||
return results;
|
||||
}
|
||||
|
||||
|
||||
@@ -128,6 +128,11 @@ export class NotificationService extends BaseService {
|
||||
}
|
||||
}
|
||||
|
||||
@OnEvent({ name: 'asset.person' })
|
||||
onAssetPerson({ assetId, userId, personId, status }: ArgOf<'asset.person'>) {
|
||||
this.eventRepository.clientSend('on_asset_person', userId, { assetId, personId, status });
|
||||
}
|
||||
|
||||
@OnEvent({ name: 'asset.hide' })
|
||||
onAssetHide({ assetId, userId }: ArgOf<'asset.hide'>) {
|
||||
this.eventRepository.clientSend('on_asset_hidden', userId, assetId);
|
||||
@@ -198,12 +203,18 @@ export class NotificationService extends BaseService {
|
||||
}
|
||||
|
||||
@OnEvent({ name: 'album.update' })
|
||||
async onAlbumUpdate({ id, recipientId }: ArgOf<'album.update'>) {
|
||||
await this.jobRepository.removeJob(JobName.NOTIFY_ALBUM_UPDATE, `${id}/${recipientId}`);
|
||||
await this.jobRepository.queue({
|
||||
name: JobName.NOTIFY_ALBUM_UPDATE,
|
||||
data: { id, recipientId, delay: NotificationService.albumUpdateEmailDelayMs },
|
||||
});
|
||||
async onAlbumUpdate({ id, recipientId, userId, assetId, status }: ArgOf<'album.update'>) {
|
||||
if (status === 'added') {
|
||||
for (const recipient of recipientId) {
|
||||
await this.jobRepository.removeJob(JobName.NOTIFY_ALBUM_UPDATE, `${id}/${recipientId}`);
|
||||
await this.jobRepository.queue({
|
||||
name: JobName.NOTIFY_ALBUM_UPDATE,
|
||||
data: { id, recipientId: recipient, delay: NotificationService.albumUpdateEmailDelayMs },
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
this.eventRepository.clientSend('on_album_update', userId, { albumId: id, assetId, status });
|
||||
}
|
||||
|
||||
@OnEvent({ name: 'album.invite' })
|
||||
|
||||
@@ -627,11 +627,28 @@ export class PersonService extends BaseService {
|
||||
boundingBoxY2: dto.y + dto.height,
|
||||
sourceType: SourceType.MANUAL,
|
||||
});
|
||||
|
||||
await this.eventRepository.emit('asset.person', {
|
||||
assetId: dto.assetId,
|
||||
userId: auth.user.id,
|
||||
personId: dto.personId,
|
||||
status: 'created',
|
||||
});
|
||||
}
|
||||
|
||||
async deleteFace(auth: AuthDto, id: string, dto: AssetFaceDeleteDto): Promise<void> {
|
||||
await this.requireAccess({ auth, permission: Permission.FACE_DELETE, ids: [id] });
|
||||
const assetPerson = await this.personRepository.getAssetPersonByFaceId(id);
|
||||
if (!assetPerson) {
|
||||
throw new NotFoundException('Asset face not found');
|
||||
}
|
||||
|
||||
return dto.force ? this.personRepository.deleteAssetFace(id) : this.personRepository.softDeleteAssetFaces(id);
|
||||
await (dto.force ? this.personRepository.deleteAssetFace(id) : this.personRepository.softDeleteAssetFaces(id));
|
||||
await this.eventRepository.emit('asset.person', {
|
||||
userId: auth.user.id,
|
||||
assetId: assetPerson.assetId,
|
||||
personId: assetPerson.personId ?? undefined,
|
||||
status: dto.force ? 'removed' : 'removed_soft',
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,7 +13,8 @@ export function updateObject(target: any, source: any): boolean {
|
||||
}
|
||||
const isDate = target[key] instanceof Date;
|
||||
if (typeof target[key] === 'object' && !isDate) {
|
||||
updated = updated || updateObject(target[key], source[key]);
|
||||
const updatedChild = updateObject(target[key], source[key]);
|
||||
updated = updated || updatedChild;
|
||||
} else {
|
||||
if (target[key] !== source[key]) {
|
||||
target[key] = source[key];
|
||||
|
||||
@@ -1,85 +1,219 @@
|
||||
import { authManager } from '$lib/managers/auth-manager.svelte';
|
||||
import type { TimelineManager } from '$lib/managers/timeline-manager/timeline-manager.svelte';
|
||||
import type { PendingChange, TimelineAsset } from '$lib/managers/timeline-manager/types';
|
||||
import { websocketEvents } from '$lib/stores/websocket';
|
||||
import { toTimelineAsset } from '$lib/utils/timeline-util';
|
||||
import { throttle } from 'lodash-es';
|
||||
import { getAllAlbums, getAssetInfo, type AssetResponseDto } from '@immich/sdk';
|
||||
import type { Unsubscriber } from 'svelte/store';
|
||||
|
||||
const PROCESS_DELAY_MS = 2500;
|
||||
|
||||
export class WebsocketSupport {
|
||||
#pendingChanges: PendingChange[] = [];
|
||||
readonly #timelineManager: TimelineManager;
|
||||
#unsubscribers: Unsubscriber[] = [];
|
||||
#timelineManager: TimelineManager;
|
||||
|
||||
#processPendingChanges = throttle(() => {
|
||||
const { add, update, remove } = this.#getPendingChangeBatches();
|
||||
if (add.length > 0) {
|
||||
this.#timelineManager.addAssets(add);
|
||||
}
|
||||
if (update.length > 0) {
|
||||
this.#timelineManager.updateAssets(update);
|
||||
}
|
||||
if (remove.length > 0) {
|
||||
this.#timelineManager.removeAssets(remove);
|
||||
}
|
||||
this.#pendingChanges = [];
|
||||
}, 2500);
|
||||
#pendingUpdates: {
|
||||
updated: AssetResponseDto[];
|
||||
trashed: string[];
|
||||
deleted: string[];
|
||||
personed: { assetId: string; personId: string | undefined; status: 'created' | 'removed' | 'removed_soft' }[];
|
||||
album: { albumId: string; assetId: string[]; status: 'added' | 'removed' }[];
|
||||
} = {
|
||||
updated: [],
|
||||
trashed: [],
|
||||
deleted: [],
|
||||
personed: [],
|
||||
album: [],
|
||||
};
|
||||
#pendingCount() {
|
||||
return (
|
||||
this.#pendingUpdates.updated.length +
|
||||
this.#pendingUpdates.trashed.length +
|
||||
this.#pendingUpdates.deleted.length +
|
||||
this.#pendingUpdates.personed.length +
|
||||
this.#pendingUpdates.album.length
|
||||
);
|
||||
}
|
||||
#processTimeoutId: ReturnType<typeof setTimeout> | undefined;
|
||||
#isProcessing = false;
|
||||
|
||||
constructor(timeineManager: TimelineManager) {
|
||||
this.#timelineManager = timeineManager;
|
||||
constructor(timelineManager: TimelineManager) {
|
||||
this.#timelineManager = timelineManager;
|
||||
}
|
||||
|
||||
connectWebsocketEvents() {
|
||||
this.#unsubscribers.push(
|
||||
websocketEvents.on('on_upload_success', (asset) =>
|
||||
this.#addPendingChanges({ type: 'add', values: [toTimelineAsset(asset)] }),
|
||||
),
|
||||
websocketEvents.on('on_asset_trash', (ids) => this.#addPendingChanges({ type: 'trash', values: ids })),
|
||||
websocketEvents.on('on_asset_update', (asset) =>
|
||||
this.#addPendingChanges({ type: 'update', values: [toTimelineAsset(asset)] }),
|
||||
),
|
||||
websocketEvents.on('on_asset_delete', (id: string) => this.#addPendingChanges({ type: 'delete', values: [id] })),
|
||||
websocketEvents.on('on_asset_trash', (ids) => {
|
||||
this.#pendingUpdates.trashed.push(...ids);
|
||||
this.#scheduleProcessing();
|
||||
}),
|
||||
websocketEvents.on('on_asset_person', (data) => {
|
||||
this.#pendingUpdates.personed.push(data);
|
||||
this.#scheduleProcessing();
|
||||
}),
|
||||
// uploads and tagging are handled by this event
|
||||
websocketEvents.on('on_asset_update', (asset) => {
|
||||
this.#pendingUpdates.updated.push(asset);
|
||||
this.#scheduleProcessing();
|
||||
}),
|
||||
websocketEvents.on('on_album_update', (data) => {
|
||||
this.#pendingUpdates.album.push(data);
|
||||
this.#scheduleProcessing();
|
||||
}),
|
||||
websocketEvents.on('on_asset_trash', (ids) => {
|
||||
this.#pendingUpdates.trashed.push(...ids);
|
||||
this.#scheduleProcessing();
|
||||
}),
|
||||
websocketEvents.on('on_asset_delete', (ids) => {
|
||||
this.#pendingUpdates.deleted.push(ids);
|
||||
this.#scheduleProcessing();
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
disconnectWebsocketEvents() {
|
||||
this.#cleanup();
|
||||
}
|
||||
|
||||
#cleanup() {
|
||||
for (const unsubscribe of this.#unsubscribers) {
|
||||
unsubscribe();
|
||||
}
|
||||
this.#unsubscribers = [];
|
||||
this.#cancelScheduledProcessing();
|
||||
}
|
||||
|
||||
#addPendingChanges(...changes: PendingChange[]) {
|
||||
this.#pendingChanges.push(...changes);
|
||||
this.#processPendingChanges();
|
||||
#cancelScheduledProcessing() {
|
||||
if (this.#processTimeoutId) {
|
||||
clearTimeout(this.#processTimeoutId);
|
||||
this.#processTimeoutId = undefined;
|
||||
}
|
||||
}
|
||||
|
||||
#getPendingChangeBatches() {
|
||||
const batch: {
|
||||
add: TimelineAsset[];
|
||||
update: TimelineAsset[];
|
||||
remove: string[];
|
||||
} = {
|
||||
add: [],
|
||||
update: [],
|
||||
remove: [],
|
||||
#scheduleProcessing() {
|
||||
if (this.#processTimeoutId) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.#processTimeoutId = setTimeout(() => {
|
||||
this.#processTimeoutId = undefined;
|
||||
void this.#processPendingChanges();
|
||||
}, PROCESS_DELAY_MS);
|
||||
}
|
||||
|
||||
async #processPendingChanges() {
|
||||
if (this.#isProcessing || this.#pendingCount() === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.#isProcessing = true;
|
||||
|
||||
try {
|
||||
await this.#process();
|
||||
} finally {
|
||||
this.#isProcessing = false;
|
||||
|
||||
if (this.#pendingCount() > 0) {
|
||||
this.#scheduleProcessing();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async #process() {
|
||||
const pendingUpdates = this.#pendingUpdates;
|
||||
this.#pendingUpdates = {
|
||||
updated: [],
|
||||
trashed: [],
|
||||
deleted: [],
|
||||
personed: [],
|
||||
album: [],
|
||||
};
|
||||
for (const { type, values } of this.#pendingChanges) {
|
||||
switch (type) {
|
||||
case 'add': {
|
||||
batch.add.push(...values);
|
||||
break;
|
||||
}
|
||||
case 'update': {
|
||||
batch.update.push(...values);
|
||||
break;
|
||||
}
|
||||
case 'delete':
|
||||
case 'trash': {
|
||||
batch.remove.push(...values);
|
||||
break;
|
||||
await this.#handleUpdatedAssets(pendingUpdates.updated);
|
||||
await this.#handleUpdatedAssetsPerson(pendingUpdates.personed);
|
||||
await this.#handleUpdatedAssetsAlbum(pendingUpdates.album);
|
||||
await this.#handleUpdatedAssetsTrashed(pendingUpdates.trashed);
|
||||
this.#timelineManager.removeAssets(pendingUpdates.deleted);
|
||||
}
|
||||
|
||||
async #handleUpdatedAssets(assets: AssetResponseDto[]) {
|
||||
const prefilteredAssets = assets.filter((asset) => !this.#timelineManager.isExcluded(toTimelineAsset(asset)));
|
||||
if (!this.#timelineManager.options.albumId) {
|
||||
// also check tags
|
||||
if (!this.#timelineManager.options.tagId) {
|
||||
return this.#timelineManager.addAssets(prefilteredAssets.map((asset) => toTimelineAsset(asset)));
|
||||
}
|
||||
for (const asset of prefilteredAssets) {
|
||||
if (asset.tags?.some((tag) => tag.id === this.#timelineManager.options.tagId)) {
|
||||
this.#timelineManager.addAssets([toTimelineAsset(asset)]);
|
||||
} else {
|
||||
this.#timelineManager.removeAssets([asset.id]);
|
||||
}
|
||||
}
|
||||
}
|
||||
return batch;
|
||||
const matchingAssets = [];
|
||||
for (const asset of prefilteredAssets) {
|
||||
const albums = await getAllAlbums({ assetId: asset.id });
|
||||
if (albums.some((album) => album.id === this.#timelineManager.options.albumId)) {
|
||||
if (this.#timelineManager.options.tagId) {
|
||||
if (asset.tags?.some((tag) => tag.id === this.#timelineManager.options.tagId)) {
|
||||
matchingAssets.push(asset);
|
||||
} else {
|
||||
this.#timelineManager.removeAssets([asset.id]);
|
||||
}
|
||||
} else {
|
||||
matchingAssets.push(asset);
|
||||
}
|
||||
}
|
||||
}
|
||||
return this.#timelineManager.addAssets(matchingAssets.map((asset) => toTimelineAsset(asset)));
|
||||
}
|
||||
|
||||
async #handleUpdatedAssetsPerson(
|
||||
data: { assetId: string; personId: string | undefined; status: 'created' | 'removed' | 'removed_soft' }[],
|
||||
) {
|
||||
if (!this.#timelineManager.options.personId) {
|
||||
for (const { assetId } of data) {
|
||||
const asset = await getAssetInfo({ id: assetId, key: authManager.key });
|
||||
this.#timelineManager.addAssets([toTimelineAsset(asset)]);
|
||||
}
|
||||
return;
|
||||
}
|
||||
for (const { assetId, personId, status } of data) {
|
||||
if (status === 'created') {
|
||||
if (personId !== this.#timelineManager.options.personId) {
|
||||
continue;
|
||||
}
|
||||
const asset = await getAssetInfo({ id: assetId, key: authManager.key });
|
||||
this.#timelineManager.addAssets([toTimelineAsset(asset)]);
|
||||
} else if (personId === this.#timelineManager.options.personId) {
|
||||
this.#timelineManager.removeAssets([assetId]);
|
||||
}
|
||||
}
|
||||
}
|
||||
async #handleUpdatedAssetsAlbum(data: { albumId: string; assetId: string[]; status: 'added' | 'removed' }[]) {
|
||||
if (!this.#timelineManager.options.albumId) {
|
||||
return;
|
||||
}
|
||||
for (const { albumId, assetId, status } of data) {
|
||||
if (albumId !== this.#timelineManager.options.albumId) {
|
||||
continue;
|
||||
}
|
||||
if (status === 'added') {
|
||||
const assets = await Promise.all(assetId.map((id) => getAssetInfo({ id, key: authManager.key })));
|
||||
this.#timelineManager.addAssets(assets.map((element) => toTimelineAsset(element)));
|
||||
} else if (status === 'removed') {
|
||||
this.#timelineManager.removeAssets(assetId);
|
||||
}
|
||||
}
|
||||
}
|
||||
async #handleUpdatedAssetsTrashed(trashed: string[]) {
|
||||
if (this.#timelineManager.options.isTrashed === undefined) {
|
||||
return;
|
||||
}
|
||||
if (this.#timelineManager.options.isTrashed) {
|
||||
const assets = await Promise.all(trashed.map((id) => getAssetInfo({ id, key: authManager.key })));
|
||||
this.#timelineManager.addAssets(assets.map((element) => toTimelineAsset(element)));
|
||||
} else {
|
||||
this.#timelineManager.removeAssets(trashed);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -59,9 +59,6 @@ export class TimelineManager {
|
||||
initTask = new CancellableTask(
|
||||
() => {
|
||||
this.isInitialized = true;
|
||||
if (this.#options.albumId || this.#options.personId) {
|
||||
return;
|
||||
}
|
||||
this.connect();
|
||||
},
|
||||
() => {
|
||||
@@ -189,6 +186,10 @@ export class TimelineManager {
|
||||
return this.#viewportHeight;
|
||||
}
|
||||
|
||||
get options() {
|
||||
return { ...this.#options };
|
||||
}
|
||||
|
||||
async *assetsIterator(options?: {
|
||||
startMonthGroup?: MonthGroup;
|
||||
startDayGroup?: DayGroup;
|
||||
|
||||
@@ -16,6 +16,15 @@ export interface ReleaseEvent {
|
||||
export interface Events {
|
||||
on_upload_success: (asset: AssetResponseDto) => void;
|
||||
on_user_delete: (id: string) => void;
|
||||
on_album_update: (data: { albumId: string; assetId: string[]; status: 'added' | 'removed' }) => void;
|
||||
on_asset_person: ({
|
||||
assetId,
|
||||
personId,
|
||||
}: {
|
||||
assetId: string;
|
||||
personId: string | undefined;
|
||||
status: 'created' | 'removed' | 'removed_soft';
|
||||
}) => void;
|
||||
on_asset_delete: (assetId: string) => void;
|
||||
on_asset_trash: (assetIds: string[]) => void;
|
||||
on_asset_update: (asset: AssetResponseDto) => void;
|
||||
|
||||
Reference in New Issue
Block a user