mirror of
https://github.com/immich-app/immich.git
synced 2026-03-01 18:19:10 +03:00
chore(server): split album update notifications into multiple jobs (#17879)
We would like to move away from the concept of finding and removing pending jobs. The only place this is used is for album update notifications, and this is done so that users who initially uploaded assets to an album will also receive a notification if someone else then adds assets to the same album. This can also be achieved with a job for each recipient. Multiple jobs also has the advantage that it will scale better for albums with many users, it's possible to send notifications concurrently, retries are possible without sending duplicate notifications, and it's clear what recipient a job failed for.
This commit is contained in:
@@ -48,7 +48,7 @@ type EventMap = {
|
||||
'config.validate': [{ newConfig: SystemConfig; oldConfig: SystemConfig }];
|
||||
|
||||
// album events
|
||||
'album.update': [{ id: string; recipientIds: string[] }];
|
||||
'album.update': [{ id: string; recipientId: string }];
|
||||
'album.invite': [{ id: string; userId: string }];
|
||||
|
||||
// asset events
|
||||
|
||||
@@ -9,7 +9,7 @@ import { JobName, JobStatus, MetadataKey, QueueCleanType, QueueName } from 'src/
|
||||
import { ConfigRepository } from 'src/repositories/config.repository';
|
||||
import { EventRepository } from 'src/repositories/event.repository';
|
||||
import { LoggingRepository } from 'src/repositories/logging.repository';
|
||||
import { IEntityJob, JobCounts, JobItem, JobOf, QueueStatus } from 'src/types';
|
||||
import { JobCounts, JobItem, JobOf, QueueStatus } from 'src/types';
|
||||
import { getKeyByValue, getMethodNames, ImmichStartupError } from 'src/utils/misc';
|
||||
|
||||
type JobMapItem = {
|
||||
@@ -206,7 +206,10 @@ export class JobRepository {
|
||||
private getJobOptions(item: JobItem): JobsOptions | null {
|
||||
switch (item.name) {
|
||||
case JobName.NOTIFY_ALBUM_UPDATE: {
|
||||
return { jobId: item.data.id, delay: item.data?.delay };
|
||||
return {
|
||||
jobId: `${item.data.id}/${item.data.recipientId}`,
|
||||
delay: item.data?.delay,
|
||||
};
|
||||
}
|
||||
case JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE: {
|
||||
return { jobId: item.data.id };
|
||||
@@ -227,19 +230,12 @@ export class JobRepository {
|
||||
return this.moduleRef.get<Queue>(getQueueToken(queue), { strict: false });
|
||||
}
|
||||
|
||||
public async removeJob(jobId: string, name: JobName): Promise<IEntityJob | undefined> {
|
||||
const existingJob = await this.getQueue(this.getQueueName(name)).getJob(jobId);
|
||||
if (!existingJob) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
/** @deprecated */
|
||||
// todo: remove this when asset notifications no longer need it.
|
||||
public async removeJob(name: JobName, jobID: string): Promise<void> {
|
||||
const existingJob = await this.getQueue(this.getQueueName(name)).getJob(jobID);
|
||||
if (existingJob) {
|
||||
await existingJob.remove();
|
||||
} catch (error: any) {
|
||||
if (error.message?.includes('Missing key for job')) {
|
||||
return;
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
return existingJob.data;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user