feat(server): add websocket events for activity changes

- Add 'activity.change' event to event repository
- Emit event when new activity (reaction/comment) is created
- Add notification handler to broadcast activity changes to relevant users
- Update frontend websocket types to include on_activity_change event
- Update tests to mock album repository calls
This commit is contained in:
Min Idzelis
2025-06-15 02:23:09 +00:00
parent 6f2f295cf3
commit a7559f0691
6 changed files with 54 additions and 13 deletions

View File

@@ -47,6 +47,9 @@ type EventMap = {
];
'config.validate': [{ newConfig: SystemConfig; oldConfig: SystemConfig }];
// activity events
'activity.change': [{ recipientId: string[]; userId: string; albumId: string; assetId: string | null }];
// album events
'album.update': [
{ id: string; recipientId: string[]; assetId: string[]; userId: string; status: 'added' | 'removed' },
@@ -54,6 +57,7 @@ type EventMap = {
'album.invite': [{ id: string; userId: string }];
// asset events
'asset.update': [{ assetIds: string[]; userId: string }];
'asset.person': [
{ assetId: string; userId: string; personId: string | undefined; status: 'created' | 'removed' | 'removed_soft' },
];
@@ -102,11 +106,12 @@ export type ArgsOf<T extends EmitEvent> = EventMap[T];
export interface ClientEventMap {
on_upload_success: [AssetResponseDto];
on_user_delete: [string];
on_activity_change: [{ albumId: string; assetId: string | null }];
on_album_update: [{ albumId: string; assetId: string[]; status: 'added' | 'removed' }];
on_asset_person: [{ assetId: string; personId: string | undefined; status: 'created' | 'removed' | 'removed_soft' }];
on_asset_delete: [string];
on_asset_trash: [string[]];
on_asset_update: [AssetResponseDto];
on_asset_update: [string[]];
on_asset_hidden: [string];
on_asset_restore: [string[]];
on_asset_stack_update: string[];

View File

@@ -1,6 +1,7 @@
import { BadRequestException } from '@nestjs/common';
import { ReactionType } from 'src/dtos/activity.dto';
import { ActivityService } from 'src/services/activity.service';
import { albumStub } from 'test/fixtures/album.stub';
import { factory, newUuid, newUuids } from 'test/small.factory';
import { newTestService, ServiceMocks } from 'test/utils';
@@ -79,6 +80,7 @@ describe(ActivityService.name, () => {
mocks.access.activity.checkCreateAccess.mockResolvedValue(new Set([albumId]));
mocks.activity.create.mockResolvedValue(activity);
mocks.album.getById.mockResolvedValue({ ...albumStub.empty, owner: factory.user({ id: userId }), albumUsers: [] });
await sut.create(factory.auth({ user: { id: userId } }), {
albumId,
@@ -115,6 +117,7 @@ describe(ActivityService.name, () => {
mocks.access.activity.checkCreateAccess.mockResolvedValue(new Set([albumId]));
mocks.activity.create.mockResolvedValue(activity);
mocks.activity.search.mockResolvedValue([]);
mocks.album.getById.mockResolvedValue({ ...albumStub.empty, owner: factory.user({ id: userId }), albumUsers: [] });
await sut.create(factory.auth({ user: { id: userId } }), { albumId, assetId, type: ReactionType.LIKE });

View File

@@ -1,4 +1,4 @@
import { Injectable } from '@nestjs/common';
import { BadRequestException, Injectable } from '@nestjs/common';
import { Activity } from 'src/database';
import {
ActivityCreateDto,
@@ -58,11 +58,24 @@ export class ActivityService extends BaseService {
}
if (!activity) {
const album = await this.albumRepository.getById(common.albumId, { withAssets: false });
if (!album) {
throw new BadRequestException('Album not found');
}
activity = await this.activityRepository.create({
...common,
isLiked: dto.type === ReactionType.LIKE,
comment: dto.comment,
});
const allUsersExceptUs = [...album.albumUsers.map(({ user }) => user.id), album.owner.id].filter(
(userId) => userId !== auth.user.id,
);
await this.eventRepository.emit('activity.change', {
recipientId: allUsersExceptUs,
userId: common.userId,
albumId: activity.albumId,
assetId: activity.assetId,
});
}
return { duplicate, value: mapActivity(activity) };

View File

@@ -4,7 +4,6 @@ import { AlbumUser } from 'src/database';
import { SystemConfigDto } from 'src/dtos/system-config.dto';
import { AssetFileType, JobName, JobStatus, UserMetadataKey } from 'src/enum';
import { NotificationService } from 'src/services/notification.service';
import { INotifyAlbumUpdateJob } from 'src/types';
import { albumStub } from 'test/fixtures/album.stub';
import { assetStub } from 'test/fixtures/asset.stub';
import { userStub } from 'test/fixtures/user.stub';
@@ -154,7 +153,7 @@ describe(NotificationService.name, () => {
describe('onAlbumUpdateEvent', () => {
it('should queue notify album update event', async () => {
await sut.onAlbumUpdate({ id: 'album', recipientId: '42' });
await sut.onAlbumUpdate({ id: 'album', recipientId: ['42'], userId: '', assetId: [], status: 'added' });
expect(mocks.job.queue).toHaveBeenCalledWith({
name: JobName.NOTIFY_ALBUM_UPDATE,
data: { id: 'album', recipientId: '42', delay: 300_000 },
@@ -499,7 +498,13 @@ describe(NotificationService.name, () => {
});
it('should add new recipients for new images if job is already queued', async () => {
await sut.onAlbumUpdate({ id: '1', recipientId: '2' } as INotifyAlbumUpdateJob);
await sut.onAlbumUpdate({
id: '1',
recipientId: ['2'],
userId: '',
assetId: [],
status: 'added',
});
expect(mocks.job.removeJob).toHaveBeenCalledWith(JobName.NOTIFY_ALBUM_UPDATE, '1/2');
expect(mocks.job.queue).toHaveBeenCalledWith({
name: JobName.NOTIFY_ALBUM_UPDATE,

View File

@@ -1,6 +1,5 @@
import { BadRequestException, Injectable } from '@nestjs/common';
import { OnEvent, OnJob } from 'src/decorators';
import { mapAsset } from 'src/dtos/asset-response.dto';
import { AuthDto } from 'src/dtos/auth.dto';
import {
mapNotification,
@@ -128,6 +127,15 @@ export class NotificationService extends BaseService {
}
}
@OnEvent({ name: 'activity.change' })
onActivityChange({ recipientId, assetId, userId, albumId }: ArgOf<'activity.change'>) {
for (const recipient of recipientId) {
this.eventRepository.clientSend('on_activity_change', recipient, { albumId, assetId });
}
this.eventRepository.clientSend('on_activity_change', userId, { albumId, assetId });
}
@OnEvent({ name: 'asset.person' })
onAssetPerson({ assetId, userId, personId, status }: ArgOf<'asset.person'>) {
this.eventRepository.clientSend('on_asset_person', userId, { assetId, personId, status });
@@ -158,16 +166,17 @@ export class NotificationService extends BaseService {
this.eventRepository.clientSend('on_asset_trash', userId, assetIds);
}
@OnEvent({ name: 'asset.update' })
onAssetUpdate({ assetIds, userId }: ArgOf<'asset.update'>) {
this.eventRepository.clientSend('on_asset_update', userId, assetIds);
}
@OnEvent({ name: 'asset.metadataExtracted' })
async onAssetMetadataExtracted({ assetId, userId, source }: ArgOf<'asset.metadataExtracted'>) {
onAssetMetadataExtracted({ assetId, userId, source }: ArgOf<'asset.metadataExtracted'>) {
if (source !== 'sidecar-write') {
return;
}
const [asset] = await this.assetRepository.getByIdsWithAllRelationsButStacks([assetId]);
if (asset) {
this.eventRepository.clientSend('on_asset_update', userId, mapAsset(asset));
}
this.eventRepository.clientSend('on_asset_update', userId, [assetId]);
}
@OnEvent({ name: 'assets.restore' })
@@ -211,6 +220,11 @@ export class NotificationService extends BaseService {
name: JobName.NOTIFY_ALBUM_UPDATE,
data: { id, recipientId: recipient, delay: NotificationService.albumUpdateEmailDelayMs },
});
this.eventRepository.clientSend('on_album_update', recipient, { albumId: id, assetId, status });
}
} else if (status === 'removed') {
for (const recipient of recipientId) {
this.eventRepository.clientSend('on_album_update', recipient, { albumId: id, assetId, status });
}
}

View File

@@ -16,6 +16,7 @@ export interface ReleaseEvent {
export interface Events {
on_upload_success: (asset: AssetResponseDto) => void;
on_user_delete: (id: string) => void;
on_activity_change: (data: { albumId: string; assetId: string | null }) => void;
on_album_update: (data: { albumId: string; assetId: string[]; status: 'added' | 'removed' }) => void;
on_asset_person: ({
assetId,
@@ -27,7 +28,7 @@ export interface Events {
}) => void;
on_asset_delete: (assetId: string) => void;
on_asset_trash: (assetIds: string[]) => void;
on_asset_update: (asset: AssetResponseDto) => void;
on_asset_update: (assetIds: string[]) => void;
on_asset_hidden: (assetId: string) => void;
on_asset_restore: (assetIds: string[]) => void;
on_asset_stack_update: (assetIds: string[]) => void;