From c5c8fc56a5fecaa6a4dfd4b01a7d500a0191b504 Mon Sep 17 00:00:00 2001 From: mertalev <101130780+mertalev@users.noreply.github.com> Date: Thu, 12 Feb 2026 20:42:38 -0500 Subject: [PATCH] pg impl --- pnpm-lock.yaml | 296 +------ server/package.json | 2 - server/src/app.module.ts | 13 +- server/src/repositories/config.repository.ts | 21 - server/src/repositories/job.repository.ts | 344 ++++++-- server/src/repositories/job.worker.ts | 298 +++++++ server/src/repositories/job.write-buffer.ts | 121 +++ server/src/schema/index.ts | 60 ++ server/src/schema/tables/job.table.ts | 800 ++++++++++++++++++ .../repositories/config.repository.mock.ts | 7 - .../test/repositories/job.repository.mock.ts | 1 + 11 files changed, 1582 insertions(+), 381 deletions(-) create mode 100644 server/src/repositories/job.worker.ts create mode 100644 server/src/repositories/job.write-buffer.ts create mode 100644 server/src/schema/tables/job.table.ts diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index a435f3db6d..64c41e5003 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -67,7 +67,7 @@ importers: version: 24.10.13 '@vitest/coverage-v8': specifier: ^3.0.0 - version: 3.2.4(vitest@3.2.4(@types/debug@4.1.12)(@types/node@24.10.13)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2)) + version: 3.2.4(vitest@3.2.4(@types/debug@4.1.12)(@types/node@24.10.13)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2(encoding@0.1.13)))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2)) byte-size: specifier: ^9.0.0 version: 9.0.1 @@ -115,10 +115,10 @@ importers: version: 6.1.0(typescript@5.9.3)(vite@7.3.1(@types/node@24.10.13)(jiti@2.6.1)(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2)) vitest: specifier: ^3.0.0 - version: 3.2.4(@types/debug@4.1.12)(@types/node@24.10.13)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2) + version: 3.2.4(@types/debug@4.1.12)(@types/node@24.10.13)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2(encoding@0.1.13)))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2) vitest-fetch-mock: specifier: ^0.4.0 - version: 0.4.5(vitest@3.2.4(@types/debug@4.1.12)(@types/node@24.10.13)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2)) + version: 0.4.5(vitest@3.2.4(@types/debug@4.1.12)(@types/node@24.10.13)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2(encoding@0.1.13)))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2)) yaml: specifier: ^2.3.1 version: 2.8.2 @@ -343,9 +343,6 @@ importers: '@extism/extism': specifier: 2.0.0-rc13 version: 2.0.0-rc13 - '@nestjs/bullmq': - specifier: ^11.0.1 - version: 11.0.4(@nestjs/common@11.1.13(class-transformer@0.5.1)(class-validator@0.14.3)(reflect-metadata@0.2.2)(rxjs@7.8.2))(@nestjs/core@11.1.13)(bullmq@5.67.3) '@nestjs/common': specifier: ^11.0.4 version: 11.1.13(class-transformer@0.5.1)(class-validator@0.14.3)(reflect-metadata@0.2.2)(rxjs@7.8.2) @@ -424,9 +421,6 @@ importers: body-parser: specifier: ^2.2.0 version: 2.2.2 - bullmq: - specifier: ^5.51.0 - version: 5.67.3 chokidar: specifier: ^4.0.3 version: 4.0.3 @@ -670,7 +664,7 @@ importers: version: 13.15.10 '@vitest/coverage-v8': specifier: ^3.0.0 - version: 3.2.4(vitest@3.2.4(@types/debug@4.1.12)(@types/node@24.10.13)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2)) + version: 3.2.4(vitest@3.2.4(@types/debug@4.1.12)(@types/node@24.10.13)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2(encoding@0.1.13)))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2)) eslint: specifier: ^9.14.0 version: 9.39.2(jiti@2.6.1) @@ -727,7 +721,7 @@ importers: version: 6.1.0(typescript@5.9.3)(vite@7.3.1(@types/node@24.10.13)(jiti@2.6.1)(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2)) vitest: specifier: ^3.0.0 - version: 3.2.4(@types/debug@4.1.12)(@types/node@24.10.13)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2) + version: 3.2.4(@types/debug@4.1.12)(@types/node@24.10.13)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2(encoding@0.1.13)))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2) web: dependencies: @@ -781,7 +775,7 @@ importers: version: 2.6.0 fabric: specifier: ^6.5.4 - version: 6.9.1 + version: 6.9.1(encoding@0.1.13) geo-coordinates-parser: specifier: ^1.7.4 version: 1.7.4 @@ -881,7 +875,7 @@ importers: version: 6.9.1 '@testing-library/svelte': specifier: ^5.2.8 - version: 5.3.1(svelte@5.50.0)(vite@7.3.1(@types/node@25.2.3)(jiti@2.6.1)(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2))(vitest@3.2.4(@types/debug@4.1.12)(@types/node@25.2.3)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2)) + version: 5.3.1(svelte@5.50.0)(vite@7.3.1(@types/node@25.2.3)(jiti@2.6.1)(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2))(vitest@3.2.4(@types/debug@4.1.12)(@types/node@25.2.3)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2(encoding@0.1.13)))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2)) '@testing-library/user-event': specifier: ^14.5.2 version: 14.6.1(@testing-library/dom@10.4.1) @@ -905,7 +899,7 @@ importers: version: 1.5.6 '@vitest/coverage-v8': specifier: ^3.0.0 - version: 3.2.4(vitest@3.2.4(@types/debug@4.1.12)(@types/node@25.2.3)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2)) + version: 3.2.4(vitest@3.2.4(@types/debug@4.1.12)(@types/node@25.2.3)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2(encoding@0.1.13)))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2)) dotenv: specifier: ^17.0.0 version: 17.2.4 @@ -968,7 +962,7 @@ importers: version: 7.3.1(@types/node@25.2.3)(jiti@2.6.1)(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2) vitest: specifier: ^3.0.0 - version: 3.2.4(@types/debug@4.1.12)(@types/node@25.2.3)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2) + version: 3.2.4(@types/debug@4.1.12)(@types/node@25.2.3)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2(encoding@0.1.13)))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2) packages: @@ -3402,52 +3396,9 @@ packages: '@microsoft/tsdoc@0.16.0': resolution: {integrity: sha512-xgAyonlVVS+q7Vc7qLW0UrJU7rSFcETRWsqdXZtjzRU8dF+6CkozTK4V4y1LwOX7j8r/vHphjDeMeGI4tNGeGA==} - '@msgpackr-extract/msgpackr-extract-darwin-arm64@3.0.3': - resolution: {integrity: sha512-QZHtlVgbAdy2zAqNA9Gu1UpIuI8Xvsd1v8ic6B2pZmeFnFcMWiPLfWXh7TVw4eGEZ/C9TH281KwhVoeQUKbyjw==} - cpu: [arm64] - os: [darwin] - - '@msgpackr-extract/msgpackr-extract-darwin-x64@3.0.3': - resolution: {integrity: sha512-mdzd3AVzYKuUmiWOQ8GNhl64/IoFGol569zNRdkLReh6LRLHOXxU4U8eq0JwaD8iFHdVGqSy4IjFL4reoWCDFw==} - cpu: [x64] - os: [darwin] - - '@msgpackr-extract/msgpackr-extract-linux-arm64@3.0.3': - resolution: {integrity: sha512-YxQL+ax0XqBJDZiKimS2XQaf+2wDGVa1enVRGzEvLLVFeqa5kx2bWbtcSXgsxjQB7nRqqIGFIcLteF/sHeVtQg==} - cpu: [arm64] - os: [linux] - - '@msgpackr-extract/msgpackr-extract-linux-arm@3.0.3': - resolution: {integrity: sha512-fg0uy/dG/nZEXfYilKoRe7yALaNmHoYeIoJuJ7KJ+YyU2bvY8vPv27f7UKhGRpY6euFYqEVhxCFZgAUNQBM3nw==} - cpu: [arm] - os: [linux] - - '@msgpackr-extract/msgpackr-extract-linux-x64@3.0.3': - resolution: {integrity: sha512-cvwNfbP07pKUfq1uH+S6KJ7dT9K8WOE4ZiAcsrSes+UY55E/0jLYc+vq+DO7jlmqRb5zAggExKm0H7O/CBaesg==} - cpu: [x64] - os: [linux] - - '@msgpackr-extract/msgpackr-extract-win32-x64@3.0.3': - resolution: {integrity: sha512-x0fWaQtYp4E6sktbsdAqnehxDgEc/VwM7uLsRCYWaiGu0ykYdZPiS8zCWdnjHwyiumousxfBm4SO31eXqwEZhQ==} - cpu: [x64] - os: [win32] - '@namnode/store@0.1.0': resolution: {integrity: sha512-4NGTldxKcmY0UuZ7OEkvCjs8ZEoeYB6M2UwMu74pdLiFMKxXbj9HdNk1Qn213bxX1O7bY5h+PLh5DZsTURZkYA==} - '@nestjs/bull-shared@11.0.4': - resolution: {integrity: sha512-VBJcDHSAzxQnpcDfA0kt9MTGUD1XZzfByV70su0W0eDCQ9aqIEBlzWRW21tv9FG9dIut22ysgDidshdjlnczLw==} - peerDependencies: - '@nestjs/common': ^10.0.0 || ^11.0.0 - '@nestjs/core': ^10.0.0 || ^11.0.0 - - '@nestjs/bullmq@11.0.4': - resolution: {integrity: sha512-wBzK9raAVG0/6NTMdvLGM4/FQ1lsB35/pYS8L6a0SDgkTiLpd7mAjQ8R692oMx5s7IjvgntaZOuTUrKYLNfIkA==} - peerDependencies: - '@nestjs/common': ^10.0.0 || ^11.0.0 - '@nestjs/core': ^10.0.0 || ^11.0.0 - bullmq: ^3.0.0 || ^4.0.0 || ^5.0.0 - '@nestjs/cli@11.0.16': resolution: {integrity: sha512-P0H+Vcjki6P5160E5QnMt3Q0X5FTg4PZkP99Ig4lm/4JWqfw32j3EXv3YBTJ2DmxLwOQ/IS9F7dzKpMAgzKTGg==} engines: {node: '>= 20.11'} @@ -5744,9 +5695,6 @@ packages: resolution: {integrity: sha512-bkXY9WsVpY7CvMhKSR6pZilZu9Ln5WDrKVBUXf2S443etkmEO4V58heTecXcUIsNsi4Rx8JUO4NfX1IcQl4deg==} engines: {node: '>=18.20'} - bullmq@5.67.3: - resolution: {integrity: sha512-eeQobOJn8M0Rj8tcZCVFLrimZgJQallJH1JpclOoyut2nDNkDwTEPMVcZzLeSR2fGeIVbfJTjU96F563Qkge5A==} - bundle-name@4.1.0: resolution: {integrity: sha512-tjwM5exMg6BGRI+kNmTntNsvdZS1X8BFYS6tnJ2hdH0kVxM6/eVZ2xy+FqStSWvYmtfFMDLIxurorHwDKfDz5Q==} engines: {node: '>=18'} @@ -6241,10 +6189,6 @@ packages: crelt@1.0.6: resolution: {integrity: sha512-VQ2MBenTq1fWZUH9DJNGti7kKv6EeAuYr3cLwxUWhIu1baTaXh4Ib5W2CqHVqib4/MqbYGJqiL3Zb8GJZr3l4g==} - cron-parser@4.9.0: - resolution: {integrity: sha512-p0SaNjrHOnQeR8/VnfGbmg9te2kfyYSQ7Sc/j/6DtPL3JQvKxmjO9TSjNFpujqV3vEYYBvNNvXSxzyksBWAx1Q==} - engines: {node: '>=12.0.0'} - cron@4.4.0: resolution: {integrity: sha512-fkdfq+b+AHI4cKdhZlppHveI/mgz2qpiYxcm+t5E5TsxX7QrLS1VE0+7GENEk9z0EeGPcpSciGv6ez24duWhwQ==} engines: {node: '>=18.x'} @@ -9104,13 +9048,6 @@ packages: ms@2.1.3: resolution: {integrity: sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==} - msgpackr-extract@3.0.3: - resolution: {integrity: sha512-P0efT1C9jIdVRefqjzOQ9Xml57zpOXnIuS+csaB4MdZbTdmGDLo8XhzBG1N7aO11gKDDkJvBLULeFTo46wwreA==} - hasBin: true - - msgpackr@1.11.5: - resolution: {integrity: sha512-UjkUHN0yqp9RWKy0Lplhh+wlpdt9oQBYgULZOiFhV3VclSF1JnSQWZ5r9gORQlNYaUKQoR8itv7g7z1xDDuACA==} - multer@2.0.2: resolution: {integrity: sha512-u7f2xaZ/UG8oLXHvtF/oWTRvT44p9ecwBBqTwgJVq0+4BW1g8OW01TyMEGWBHbyMOYVHXslaut7qEQ1meATXgw==} engines: {node: '>= 10.16.0'} @@ -9240,10 +9177,6 @@ packages: resolution: {integrity: sha512-rLvcdSyRCyouf6jcOIPe/BgwG/d7hKjzMKOas33/pHEr6gbq18IK9zV7DiPvzsz0oBJPme6qr6H6kGZuI9/DZg==} engines: {node: '>= 6.13.0'} - node-gyp-build-optional-packages@5.2.2: - resolution: {integrity: sha512-s+w+rBWnpTMwSFbaE0UXsRlg7hU4FjekKU4eyAih5T8nJuNZT1nNsskXpxmeqSK9UzkBl6UgRlnKc8hz8IEqOw==} - hasBin: true - node-gyp-build@4.8.4: resolution: {integrity: sha512-LA4ZjwlnUblHVgq0oBF3Jl/6h/Nvs5fzBLwdEF4nuxnFdsfajde4WfxtJr3CaiH+F6ewcIB/q4jQ4UzPyid+CQ==} hasBin: true @@ -10753,11 +10686,6 @@ packages: resolution: {integrity: sha512-BR7VvDCVHO+q2xBEWskxS6DJE1qRnb7DxzUrogb71CWoSficBxYsiAGd+Kl0mmq/MprG9yArRkyrQxTO6XjMzA==} hasBin: true - semver@7.7.3: - resolution: {integrity: sha512-SdsKMrI9TdgjdweUSR9MweHA4EJ8YxHn8DFaDisvhVlUOe4BF1tLD7GAj0lIqWVl+dPb/rExr0Btby5loQm20Q==} - engines: {node: '>=10'} - hasBin: true - semver@7.7.4: resolution: {integrity: sha512-vFKC2IEtQnVhpT78h1Yp8wzwrf8CM+MzKMHGJZfBtzhZNycRFnXsHk6E5TxIkkMsgNS7mdX3AGB7x2QM2di4lA==} engines: {node: '>=10'} @@ -15291,22 +15219,6 @@ snapshots: dependencies: mapbox-gl: 1.13.3 - '@mapbox/node-pre-gyp@1.0.11': - dependencies: - detect-libc: 2.1.2 - https-proxy-agent: 5.0.1 - make-dir: 3.1.0 - node-fetch: 2.7.0 - nopt: 5.0.0 - npmlog: 5.0.1 - rimraf: 3.0.2 - semver: 7.7.4 - tar: 6.2.1 - transitivePeerDependencies: - - encoding - - supports-color - optional: true - '@mapbox/node-pre-gyp@1.0.11(encoding@0.1.13)': dependencies: detect-libc: 2.1.2 @@ -15426,40 +15338,8 @@ snapshots: '@microsoft/tsdoc@0.16.0': {} - '@msgpackr-extract/msgpackr-extract-darwin-arm64@3.0.3': - optional: true - - '@msgpackr-extract/msgpackr-extract-darwin-x64@3.0.3': - optional: true - - '@msgpackr-extract/msgpackr-extract-linux-arm64@3.0.3': - optional: true - - '@msgpackr-extract/msgpackr-extract-linux-arm@3.0.3': - optional: true - - '@msgpackr-extract/msgpackr-extract-linux-x64@3.0.3': - optional: true - - '@msgpackr-extract/msgpackr-extract-win32-x64@3.0.3': - optional: true - '@namnode/store@0.1.0': {} - '@nestjs/bull-shared@11.0.4(@nestjs/common@11.1.13(class-transformer@0.5.1)(class-validator@0.14.3)(reflect-metadata@0.2.2)(rxjs@7.8.2))(@nestjs/core@11.1.13)': - dependencies: - '@nestjs/common': 11.1.13(class-transformer@0.5.1)(class-validator@0.14.3)(reflect-metadata@0.2.2)(rxjs@7.8.2) - '@nestjs/core': 11.1.13(@nestjs/common@11.1.13(class-transformer@0.5.1)(class-validator@0.14.3)(reflect-metadata@0.2.2)(rxjs@7.8.2))(@nestjs/platform-express@11.1.13)(@nestjs/websockets@11.1.13)(reflect-metadata@0.2.2)(rxjs@7.8.2) - tslib: 2.8.1 - - '@nestjs/bullmq@11.0.4(@nestjs/common@11.1.13(class-transformer@0.5.1)(class-validator@0.14.3)(reflect-metadata@0.2.2)(rxjs@7.8.2))(@nestjs/core@11.1.13)(bullmq@5.67.3)': - dependencies: - '@nestjs/bull-shared': 11.0.4(@nestjs/common@11.1.13(class-transformer@0.5.1)(class-validator@0.14.3)(reflect-metadata@0.2.2)(rxjs@7.8.2))(@nestjs/core@11.1.13) - '@nestjs/common': 11.1.13(class-transformer@0.5.1)(class-validator@0.14.3)(reflect-metadata@0.2.2)(rxjs@7.8.2) - '@nestjs/core': 11.1.13(@nestjs/common@11.1.13(class-transformer@0.5.1)(class-validator@0.14.3)(reflect-metadata@0.2.2)(rxjs@7.8.2))(@nestjs/platform-express@11.1.13)(@nestjs/websockets@11.1.13)(reflect-metadata@0.2.2)(rxjs@7.8.2) - bullmq: 5.67.3 - tslib: 2.8.1 - '@nestjs/cli@11.0.16(@swc/core@1.15.11(@swc/helpers@0.5.17))(@types/node@24.10.13)': dependencies: '@angular-devkit/core': 19.2.19(chokidar@4.0.3) @@ -16627,14 +16507,14 @@ snapshots: dependencies: svelte: 5.50.0 - '@testing-library/svelte@5.3.1(svelte@5.50.0)(vite@7.3.1(@types/node@25.2.3)(jiti@2.6.1)(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2))(vitest@3.2.4(@types/debug@4.1.12)(@types/node@25.2.3)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2))': + '@testing-library/svelte@5.3.1(svelte@5.50.0)(vite@7.3.1(@types/node@25.2.3)(jiti@2.6.1)(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2))(vitest@3.2.4(@types/debug@4.1.12)(@types/node@25.2.3)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2(encoding@0.1.13)))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2))': dependencies: '@testing-library/dom': 10.4.1 '@testing-library/svelte-core': 1.0.0(svelte@5.50.0) svelte: 5.50.0 optionalDependencies: vite: 7.3.1(@types/node@25.2.3)(jiti@2.6.1)(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2) - vitest: 3.2.4(@types/debug@4.1.12)(@types/node@25.2.3)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2) + vitest: 3.2.4(@types/debug@4.1.12)(@types/node@25.2.3)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2(encoding@0.1.13)))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2) '@testing-library/user-event@14.6.1(@testing-library/dom@10.4.1)': dependencies: @@ -17330,7 +17210,7 @@ snapshots: '@vercel/oidc@3.0.5': {} - '@vitest/coverage-v8@3.2.4(vitest@3.2.4(@types/debug@4.1.12)(@types/node@24.10.13)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2))': + '@vitest/coverage-v8@3.2.4(vitest@3.2.4(@types/debug@4.1.12)(@types/node@24.10.13)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2(encoding@0.1.13)))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2))': dependencies: '@ampproject/remapping': 2.3.0 '@bcoe/v8-coverage': 1.0.2 @@ -17345,11 +17225,11 @@ snapshots: std-env: 3.10.0 test-exclude: 7.0.1 tinyrainbow: 2.0.0 - vitest: 3.2.4(@types/debug@4.1.12)(@types/node@24.10.13)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2) + vitest: 3.2.4(@types/debug@4.1.12)(@types/node@24.10.13)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2(encoding@0.1.13)))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2) transitivePeerDependencies: - supports-color - '@vitest/coverage-v8@3.2.4(vitest@3.2.4(@types/debug@4.1.12)(@types/node@25.2.3)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2))': + '@vitest/coverage-v8@3.2.4(vitest@3.2.4(@types/debug@4.1.12)(@types/node@25.2.3)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2(encoding@0.1.13)))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2))': dependencies: '@ampproject/remapping': 2.3.0 '@bcoe/v8-coverage': 1.0.2 @@ -17364,7 +17244,7 @@ snapshots: std-env: 3.10.0 test-exclude: 7.0.1 tinyrainbow: 2.0.0 - vitest: 3.2.4(@types/debug@4.1.12)(@types/node@25.2.3)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2) + vitest: 3.2.4(@types/debug@4.1.12)(@types/node@25.2.3)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2(encoding@0.1.13)))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2) transitivePeerDependencies: - supports-color @@ -17987,18 +17867,6 @@ snapshots: builtin-modules@5.0.0: {} - bullmq@5.67.3: - dependencies: - cron-parser: 4.9.0 - ioredis: 5.9.2 - msgpackr: 1.11.5 - node-abort-controller: 3.1.1 - semver: 7.7.3 - tslib: 2.8.1 - uuid: 11.1.0 - transitivePeerDependencies: - - supports-color - bundle-name@4.1.0: dependencies: run-applescript: 7.1.0 @@ -18084,16 +17952,6 @@ snapshots: caniuse-lite@1.0.30001769: {} - canvas@2.11.2: - dependencies: - '@mapbox/node-pre-gyp': 1.0.11 - nan: 2.24.0 - simple-get: 3.1.1 - transitivePeerDependencies: - - encoding - - supports-color - optional: true - canvas@2.11.2(encoding@0.1.13): dependencies: '@mapbox/node-pre-gyp': 1.0.11(encoding@0.1.13) @@ -18487,10 +18345,6 @@ snapshots: crelt@1.0.6: {} - cron-parser@4.9.0: - dependencies: - luxon: 3.7.2 - cron@4.4.0: dependencies: '@types/luxon': 3.7.1 @@ -19714,10 +19568,10 @@ snapshots: extend@3.0.2: {} - fabric@6.9.1: + fabric@6.9.1(encoding@0.1.13): optionalDependencies: - canvas: 2.11.2 - jsdom: 20.0.3(canvas@2.11.2) + canvas: 2.11.2(encoding@0.1.13) + jsdom: 20.0.3(canvas@2.11.2(encoding@0.1.13)) transitivePeerDependencies: - bufferutil - encoding @@ -20870,7 +20724,7 @@ snapshots: dependencies: argparse: 2.0.1 - jsdom@20.0.3(canvas@2.11.2): + jsdom@20.0.3(canvas@2.11.2(encoding@0.1.13)): dependencies: abab: 2.0.6 acorn: 8.15.0 @@ -20899,7 +20753,7 @@ snapshots: ws: 8.19.0 xml-name-validator: 4.0.0 optionalDependencies: - canvas: 2.11.2 + canvas: 2.11.2(encoding@0.1.13) transitivePeerDependencies: - bufferutil - supports-color @@ -20936,36 +20790,6 @@ snapshots: - utf-8-validate optional: true - jsdom@26.1.0(canvas@2.11.2): - dependencies: - cssstyle: 4.6.0 - data-urls: 5.0.0 - decimal.js: 10.6.0 - html-encoding-sniffer: 4.0.0 - http-proxy-agent: 7.0.2 - https-proxy-agent: 7.0.6 - is-potential-custom-element-name: 1.0.1 - nwsapi: 2.2.23 - parse5: 7.3.0 - rrweb-cssom: 0.8.0 - saxes: 6.0.0 - symbol-tree: 3.2.4 - tough-cookie: 5.1.2 - w3c-xmlserializer: 5.0.0 - webidl-conversions: 7.0.0 - whatwg-encoding: 3.1.1 - whatwg-mimetype: 4.0.0 - whatwg-url: 14.2.0 - ws: 8.19.0 - xml-name-validator: 5.0.0 - optionalDependencies: - canvas: 2.11.2 - transitivePeerDependencies: - - bufferutil - - supports-color - - utf-8-validate - optional: true - jsep@1.4.0: {} jsesc@3.1.0: {} @@ -22089,22 +21913,6 @@ snapshots: ms@2.1.3: {} - msgpackr-extract@3.0.3: - dependencies: - node-gyp-build-optional-packages: 5.2.2 - optionalDependencies: - '@msgpackr-extract/msgpackr-extract-darwin-arm64': 3.0.3 - '@msgpackr-extract/msgpackr-extract-darwin-x64': 3.0.3 - '@msgpackr-extract/msgpackr-extract-linux-arm': 3.0.3 - '@msgpackr-extract/msgpackr-extract-linux-arm64': 3.0.3 - '@msgpackr-extract/msgpackr-extract-linux-x64': 3.0.3 - '@msgpackr-extract/msgpackr-extract-win32-x64': 3.0.3 - optional: true - - msgpackr@1.11.5: - optionalDependencies: - msgpackr-extract: 3.0.3 - multer@2.0.2: dependencies: append-field: 1.0.0 @@ -22223,11 +22031,6 @@ snapshots: emojilib: 2.4.0 skin-tone: 2.0.0 - node-fetch@2.7.0: - dependencies: - whatwg-url: 5.0.0 - optional: true - node-fetch@2.7.0(encoding@0.1.13): dependencies: whatwg-url: 5.0.0 @@ -22236,11 +22039,6 @@ snapshots: node-forge@1.3.3: {} - node-gyp-build-optional-packages@5.2.2: - dependencies: - detect-libc: 2.1.2 - optional: true - node-gyp-build@4.8.4: {} node-gyp@12.2.0: @@ -23938,8 +23736,6 @@ snapshots: semver@6.3.1: {} - semver@7.7.3: {} - semver@7.7.4: {} send@0.19.2: @@ -25379,9 +25175,9 @@ snapshots: optionalDependencies: vite: 7.3.1(@types/node@25.2.3)(jiti@2.6.1)(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2) - vitest-fetch-mock@0.4.5(vitest@3.2.4(@types/debug@4.1.12)(@types/node@24.10.13)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2)): + vitest-fetch-mock@0.4.5(vitest@3.2.4(@types/debug@4.1.12)(@types/node@24.10.13)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2(encoding@0.1.13)))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2)): dependencies: - vitest: 3.2.4(@types/debug@4.1.12)(@types/node@24.10.13)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2) + vitest: 3.2.4(@types/debug@4.1.12)(@types/node@24.10.13)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2(encoding@0.1.13)))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2) vitest@3.2.4(@types/debug@4.1.12)(@types/node@24.10.13)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2(encoding@0.1.13)))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2): dependencies: @@ -25427,51 +25223,7 @@ snapshots: - tsx - yaml - vitest@3.2.4(@types/debug@4.1.12)(@types/node@24.10.13)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2): - dependencies: - '@types/chai': 5.2.3 - '@vitest/expect': 3.2.4 - '@vitest/mocker': 3.2.4(vite@7.3.1(@types/node@24.10.13)(jiti@2.6.1)(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2)) - '@vitest/pretty-format': 3.2.4 - '@vitest/runner': 3.2.4 - '@vitest/snapshot': 3.2.4 - '@vitest/spy': 3.2.4 - '@vitest/utils': 3.2.4 - chai: 5.3.3 - debug: 4.4.3 - expect-type: 1.3.0 - magic-string: 0.30.21 - pathe: 2.0.3 - picomatch: 4.0.3 - std-env: 3.10.0 - tinybench: 2.9.0 - tinyexec: 0.3.2 - tinyglobby: 0.2.15 - tinypool: 1.1.1 - tinyrainbow: 2.0.0 - vite: 7.3.1(@types/node@24.10.13)(jiti@2.6.1)(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2) - vite-node: 3.2.4(@types/node@24.10.13)(jiti@2.6.1)(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2) - why-is-node-running: 2.3.0 - optionalDependencies: - '@types/debug': 4.1.12 - '@types/node': 24.10.13 - happy-dom: 20.5.0 - jsdom: 26.1.0(canvas@2.11.2) - transitivePeerDependencies: - - jiti - - less - - lightningcss - - msw - - sass - - sass-embedded - - stylus - - sugarss - - supports-color - - terser - - tsx - - yaml - - vitest@3.2.4(@types/debug@4.1.12)(@types/node@25.2.3)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2): + vitest@3.2.4(@types/debug@4.1.12)(@types/node@25.2.3)(happy-dom@20.5.0)(jiti@2.6.1)(jsdom@26.1.0(canvas@2.11.2(encoding@0.1.13)))(lightningcss@1.30.2)(sass@1.97.1)(terser@5.44.1)(tsx@4.21.0)(yaml@2.8.2): dependencies: '@types/chai': 5.2.3 '@vitest/expect': 3.2.4 @@ -25500,7 +25252,7 @@ snapshots: '@types/debug': 4.1.12 '@types/node': 25.2.3 happy-dom: 20.5.0 - jsdom: 26.1.0(canvas@2.11.2) + jsdom: 26.1.0(canvas@2.11.2(encoding@0.1.13)) transitivePeerDependencies: - jiti - less diff --git a/server/package.json b/server/package.json index 80427642e5..dec72a096a 100644 --- a/server/package.json +++ b/server/package.json @@ -35,7 +35,6 @@ }, "dependencies": { "@extism/extism": "2.0.0-rc13", - "@nestjs/bullmq": "^11.0.1", "@nestjs/common": "^11.0.4", "@nestjs/core": "^11.0.4", "@nestjs/platform-express": "^11.0.4", @@ -62,7 +61,6 @@ "async-lock": "^1.4.0", "bcrypt": "^6.0.0", "body-parser": "^2.2.0", - "bullmq": "^5.51.0", "chokidar": "^4.0.3", "class-transformer": "^0.5.1", "class-validator": "^0.14.0", diff --git a/server/src/app.module.ts b/server/src/app.module.ts index 49b779ca18..5b907ccd2c 100644 --- a/server/src/app.module.ts +++ b/server/src/app.module.ts @@ -1,4 +1,3 @@ -import { BullModule } from '@nestjs/bullmq'; import { Inject, Module, OnModuleDestroy, OnModuleInit, ValidationPipe } from '@nestjs/common'; import { APP_FILTER, APP_GUARD, APP_INTERCEPTOR, APP_PIPE } from '@nestjs/core'; import { ScheduleModule, SchedulerRegistry } from '@nestjs/schedule'; @@ -22,6 +21,7 @@ import { LoggingInterceptor } from 'src/middleware/logging.interceptor'; import { repositories } from 'src/repositories'; import { AppRepository } from 'src/repositories/app.repository'; import { ConfigRepository } from 'src/repositories/config.repository'; +import { JobRepository } from 'src/repositories/job.repository'; import { DatabaseRepository } from 'src/repositories/database.repository'; import { EventRepository } from 'src/repositories/event.repository'; import { LoggingRepository } from 'src/repositories/logging.repository'; @@ -49,7 +49,7 @@ const commonMiddleware = [ const apiMiddleware = [FileUploadInterceptor, ...commonMiddleware, { provide: APP_GUARD, useClass: AuthGuard }]; const configRepository = new ConfigRepository(); -const { bull, cls, database, otel } = configRepository.getEnv(); +const { cls, database, otel } = configRepository.getEnv(); const commonImports = [ ClsModule.forRoot(cls.config), @@ -57,7 +57,6 @@ const commonImports = [ OpenTelemetryModule.forRoot(otel), ]; -const bullImports = [BullModule.forRoot(bull.config), BullModule.registerQueue(...bull.queues)]; export class BaseModule implements OnModuleInit, OnModuleDestroy { constructor( @@ -65,6 +64,7 @@ export class BaseModule implements OnModuleInit, OnModuleDestroy { logger: LoggingRepository, private authService: AuthService, private eventRepository: EventRepository, + private jobRepository: JobRepository, private queueService: QueueService, private telemetryRepository: TelemetryRepository, private websocketRepository: WebsocketRepository, @@ -91,12 +91,13 @@ export class BaseModule implements OnModuleInit, OnModuleDestroy { async onModuleDestroy() { await this.eventRepository.emit('AppShutdown'); + await this.jobRepository.onShutdown(); await teardownTelemetry(); } } @Module({ - imports: [...bullImports, ...commonImports, ScheduleModule.forRoot()], + imports: [...commonImports, ScheduleModule.forRoot()], controllers: [...controllers], providers: [...common, ...apiMiddleware, { provide: IWorker, useValue: ImmichWorker.Api }], }) @@ -137,13 +138,13 @@ export class MaintenanceModule { } @Module({ - imports: [...bullImports, ...commonImports], + imports: [...commonImports], providers: [...common, { provide: IWorker, useValue: ImmichWorker.Microservices }, SchedulerRegistry], }) export class MicroservicesModule extends BaseModule {} @Module({ - imports: [...bullImports, ...commonImports], + imports: [...commonImports], providers: [...common, ...commandsAndQuestions, SchedulerRegistry], }) export class ImmichAdminModule implements OnModuleDestroy { diff --git a/server/src/repositories/config.repository.ts b/server/src/repositories/config.repository.ts index 54a5d1987f..f73eceacd8 100644 --- a/server/src/repositories/config.repository.ts +++ b/server/src/repositories/config.repository.ts @@ -1,6 +1,4 @@ -import { RegisterQueueOptions } from '@nestjs/bullmq'; import { Inject, Injectable, Optional } from '@nestjs/common'; -import { QueueOptions } from 'bullmq'; import { plainToInstance } from 'class-transformer'; import { validateSync } from 'class-validator'; import { Request, Response } from 'express'; @@ -19,7 +17,6 @@ import { ImmichWorker, LogFormat, LogLevel, - QueueName, } from 'src/enum'; import { DatabaseConnectionParams, VectorExtension } from 'src/types'; import { setDifference } from 'src/utils/set'; @@ -48,11 +45,6 @@ export interface EnvData { thirdPartySupportUrl?: string; }; - bull: { - config: QueueOptions; - queues: RegisterQueueOptions[]; - }; - cls: { config: ClsModuleOptions; }; @@ -253,19 +245,6 @@ const getEnv = (): EnvData => { thirdPartySupportUrl: dto.IMMICH_THIRD_PARTY_SUPPORT_URL, }, - bull: { - config: { - prefix: 'immich_bull', - connection: { ...redisConfig }, - defaultJobOptions: { - attempts: 1, - removeOnComplete: true, - removeOnFail: false, - }, - }, - queues: Object.values(QueueName).map((name) => ({ name })), - }, - cls: { config: { middleware: { diff --git a/server/src/repositories/job.repository.ts b/server/src/repositories/job.repository.ts index b12accb68e..9bc7c40988 100644 --- a/server/src/repositories/job.repository.ts +++ b/server/src/repositories/job.repository.ts @@ -1,16 +1,21 @@ -import { getQueueToken } from '@nestjs/bullmq'; import { Injectable } from '@nestjs/common'; import { ModuleRef, Reflector } from '@nestjs/core'; -import { JobsOptions, Queue, Worker } from 'bullmq'; +import { Kysely, sql } from 'kysely'; import { ClassConstructor } from 'class-transformer'; import { setTimeout } from 'node:timers/promises'; +import { InjectKysely } from 'nestjs-kysely'; +import postgres from 'postgres'; import { JobConfig } from 'src/decorators'; import { QueueJobResponseDto, QueueJobSearchDto } from 'src/dtos/queue.dto'; import { JobName, JobStatus, MetadataKey, QueueCleanType, QueueJobStatus, QueueName } from 'src/enum'; import { ConfigRepository } from 'src/repositories/config.repository'; import { EventRepository } from 'src/repositories/event.repository'; +import { QUEUE_TABLE, WriteBuffer } from 'src/repositories/job.write-buffer'; +import { charToJobName, jobNameToChar, QueueWorker } from 'src/repositories/job.worker'; import { LoggingRepository } from 'src/repositories/logging.repository'; +import { DB } from 'src/schema'; import { JobCounts, JobItem, JobOf } from 'src/types'; +import { asPostgresConnectionConfig } from 'src/utils/database'; import { getKeyByValue, getMethodNames, ImmichStartupError } from 'src/utils/misc'; type JobMapItem = { @@ -20,16 +25,73 @@ type JobMapItem = { label: string; }; +// Status char codes +const STATUS_PENDING = 'p'; +const STATUS_ACTIVE = 'a'; +const STATUS_FAILED = 'f'; + +// Stall timeouts in milliseconds +const STALL_LONG = 60 * 60 * 1000; // 1 hour +const STALL_MEDIUM = 30 * 60 * 1000; // 30 min +const STALL_DEFAULT = 5 * 60 * 1000; // 5 min + +const getStallTimeout = (queueName: QueueName): number => { + switch (queueName) { + case QueueName.VideoConversion: + case QueueName.BackupDatabase: + case QueueName.Editor: { + return STALL_LONG; + } + case QueueName.Library: + case QueueName.StorageTemplateMigration: { + return STALL_MEDIUM; + } + default: { + return STALL_DEFAULT; + } + } +}; + +const getClaimBatch = (queueName: QueueName): number => { + switch (queueName) { + case QueueName.VideoConversion: + case QueueName.BackupDatabase: + case QueueName.StorageTemplateMigration: + case QueueName.Editor: + case QueueName.FacialRecognition: + case QueueName.DuplicateDetection: { + return 1; + } + default: { + return 100; // will be clamped to slotsAvailable by the worker + } + } +}; + +// Map QueueJobStatus to our "char" status codes +const STATUS_FILTER: Record = { + [QueueJobStatus.Active]: STATUS_ACTIVE, + [QueueJobStatus.Failed]: STATUS_FAILED, + [QueueJobStatus.Waiting]: STATUS_PENDING, + [QueueJobStatus.Complete]: null, // completed jobs are deleted + [QueueJobStatus.Delayed]: STATUS_PENDING, // delayed = pending with future run_after + [QueueJobStatus.Paused]: STATUS_PENDING, // paused queue has pending jobs +}; + @Injectable() export class JobRepository { - private workers: Partial> = {}; + private workers: Partial> = {}; private handlers: Partial> = {}; + private writeBuffer!: WriteBuffer; + private listenConn: postgres.Sql | null = null; + private pauseState: Partial> = {}; constructor( private moduleRef: ModuleRef, private configRepository: ConfigRepository, private eventRepository: EventRepository, private logger: LoggingRepository, + @InjectKysely() private db: Kysely, ) { this.logger.setContext(JobRepository.name); } @@ -85,15 +147,53 @@ export class JobRepository { } startWorkers() { - const { bull } = this.configRepository.getEnv(); - for (const queueName of Object.values(QueueName)) { - this.logger.debug(`Starting worker for queue: ${queueName}`); - this.workers[queueName] = new Worker( - queueName, - (job) => this.eventRepository.emit('JobRun', queueName, job as JobItem), - { ...bull.config, concurrency: 1 }, - ); - } + this.writeBuffer = new WriteBuffer(this.db, (queue) => this.notify(queue)); + + // Startup sweep: reset any active jobs from a previous crash + const startupPromises = Object.values(QueueName).map(async (queueName) => { + const tableName = QUEUE_TABLE[queueName]; + await sql` + UPDATE ${sql.table(tableName)} + SET "status" = ${STATUS_PENDING}::"char", "started_at" = NULL, "expires_at" = NULL + WHERE "status" = ${STATUS_ACTIVE}::"char" + `.execute(this.db); + }); + + // Load pause state and setup workers + void Promise.all(startupPromises).then(async () => { + // Load pause state from DB + const metaRows = await this.db.selectFrom('job_queue_meta').selectAll().execute(); + for (const row of metaRows) { + this.pauseState[row.queue_name as QueueName] = row.is_paused; + } + + // Create workers + for (const queueName of Object.values(QueueName)) { + const worker = new QueueWorker({ + queueName, + tableName: QUEUE_TABLE[queueName], + stallTimeout: getStallTimeout(queueName), + claimBatch: getClaimBatch(queueName), + concurrency: 1, + db: this.db, + onJob: (job) => this.eventRepository.emit('JobRun', queueName, job), + }); + + if (this.pauseState[queueName]) { + worker.pause(); + } + + this.workers[queueName] = worker; + } + + // Setup LISTEN/NOTIFY + await this.setupListen(); + + // Trigger initial fetch for all workers + for (const worker of Object.values(this.workers)) { + worker.onNotification(); + } + }); } async run({ name, data }: JobItem) { @@ -113,44 +213,92 @@ export class JobRepository { return; } - worker.concurrency = concurrency; + worker.setConcurrency(concurrency); } - async isActive(name: QueueName): Promise { - const queue = this.getQueue(name); - const count = await queue.getActiveCount(); - return count > 0; + isActive(name: QueueName): Promise { + const worker = this.workers[name]; + return Promise.resolve(worker ? worker.activeJobCount > 0 : false); } - async isPaused(name: QueueName): Promise { - return this.getQueue(name).isPaused(); + isPaused(name: QueueName): Promise { + return Promise.resolve(this.pauseState[name] ?? false); } - pause(name: QueueName) { - return this.getQueue(name).pause(); + async pause(name: QueueName) { + this.pauseState[name] = true; + await this.db + .insertInto('job_queue_meta') + .values({ queue_name: name, is_paused: true }) + .onConflict((oc) => oc.column('queue_name').doUpdateSet({ is_paused: true })) + .execute(); + this.workers[name]?.pause(); } - resume(name: QueueName) { - return this.getQueue(name).resume(); + async resume(name: QueueName) { + this.pauseState[name] = false; + await this.db + .insertInto('job_queue_meta') + .values({ queue_name: name, is_paused: false }) + .onConflict((oc) => oc.column('queue_name').doUpdateSet({ is_paused: false })) + .execute(); + this.workers[name]?.resume(); } - empty(name: QueueName) { - return this.getQueue(name).drain(); + async empty(name: QueueName) { + const tableName = QUEUE_TABLE[name]; + await sql`DELETE FROM ${sql.table(tableName)} WHERE "status" = ${STATUS_PENDING}::"char"`.execute(this.db); } - clear(name: QueueName, type: QueueCleanType) { - return this.getQueue(name).clean(0, 1000, type); + async clear(name: QueueName, _type: QueueCleanType) { + const tableName = QUEUE_TABLE[name]; + await sql`DELETE FROM ${sql.table(tableName)} WHERE "status" = ${STATUS_FAILED}::"char"`.execute(this.db); } - getJobCounts(name: QueueName): Promise { - return this.getQueue(name).getJobCounts( - 'active', - 'completed', - 'failed', - 'delayed', - 'waiting', - 'paused', - ) as unknown as Promise; + async getJobCounts(name: QueueName): Promise { + const tableName = QUEUE_TABLE[name]; + const result = await sql<{ status: string; count: string }>` + SELECT "status", count(*)::text as count FROM ${sql.table(tableName)} GROUP BY "status" + `.execute(this.db); + + const counts: JobCounts = { + active: 0, + completed: 0, + failed: 0, + delayed: 0, + waiting: 0, + paused: 0, + }; + + for (const row of result.rows) { + switch (row.status) { + case STATUS_PENDING: { + counts.waiting = Number(row.count); + break; + } + case STATUS_ACTIVE: { + counts.active = Number(row.count); + break; + } + case STATUS_FAILED: { + counts.failed = Number(row.count); + break; + } + } + } + + // In-memory active count may be more accurate than DB for in-flight jobs + const worker = this.workers[name]; + if (worker) { + counts.active = worker.activeJobCount; + } + + if (this.pauseState[name]) { + counts.paused = counts.waiting; + counts.waiting = 0; + } + + return counts; } private getQueueName(name: JobName) { @@ -162,31 +310,24 @@ export class JobRepository { return; } - const promises = []; - const itemsByQueue = {} as Record; + const bufferItems: { queue: QueueName; row: { name: string; data: unknown; priority: number; dedup_key: string | null; run_after: Date } }[] = []; + for (const item of items) { const queueName = this.getQueueName(item.name); - const job = { - name: item.name, - data: item.data || {}, - options: this.getJobOptions(item) || undefined, - } as JobItem & { data: any; options: JobsOptions | undefined }; - - if (job.options?.jobId) { - // need to use add() instead of addBulk() for jobId deduplication - promises.push(this.getQueue(queueName).add(item.name, item.data, job.options)); - } else { - itemsByQueue[queueName] = itemsByQueue[queueName] || []; - itemsByQueue[queueName].push(job); - } + const options = this.getJobOptions(item); + bufferItems.push({ + queue: queueName, + row: { + name: jobNameToChar(item.name), + data: item.data || {}, + priority: options?.priority ?? 0, + dedup_key: options?.dedupKey ?? null, + run_after: options?.delay ? new Date(Date.now() + options.delay) : new Date(), + }, + }); } - for (const [queueName, jobs] of Object.entries(itemsByQueue)) { - const queue = this.getQueue(queueName as QueueName); - promises.push(queue.addBulk(jobs)); - } - - await Promise.all(promises); + await this.writeBuffer.add(bufferItems); } async queue(item: JobItem): Promise { @@ -209,29 +350,50 @@ export class JobRepository { } async searchJobs(name: QueueName, dto: QueueJobSearchDto): Promise { - const jobs = await this.getQueue(name).getJobs(dto.status ?? Object.values(QueueJobStatus), 0, 1000); - return jobs.map((job) => { - const { id, name, timestamp, data } = job; - return { id, name: name as JobName, timestamp, data }; - }); + const tableName = QUEUE_TABLE[name]; + const statuses = dto.status ?? Object.values(QueueJobStatus); + const charStatuses = statuses + .map((s) => STATUS_FILTER[s]) + .filter((s): s is string => s !== null); + + if (charStatuses.length === 0) { + return []; + } + + const uniqueStatuses = [...new Set(charStatuses)]; + + const rows = await sql<{ id: number; name: string; data: unknown; run_after: Date }>` + SELECT "id", "name", "data", "run_after" + FROM ${sql.table(tableName)} + WHERE "status" = ANY(${sql.val(uniqueStatuses)}::"char"[]) + ORDER BY "id" DESC + LIMIT 1000 + `.execute(this.db); + + return rows.rows.map((row) => ({ + id: String(row.id), + name: charToJobName(row.name) ?? (row.name as unknown as JobName), + data: (row.data ?? {}) as object, + timestamp: new Date(row.run_after).getTime(), + })); } - private getJobOptions(item: JobItem): JobsOptions | null { + private getJobOptions(item: JobItem): { dedupKey?: string; priority?: number; delay?: number } | null { switch (item.name) { case JobName.NotifyAlbumUpdate: { return { - jobId: `${item.data.id}/${item.data.recipientId}`, + dedupKey: `${item.data.id}/${item.data.recipientId}`, delay: item.data?.delay, }; } case JobName.StorageTemplateMigrationSingle: { - return { jobId: item.data.id }; + return { dedupKey: item.data.id }; } case JobName.PersonGenerateThumbnail: { return { priority: 1 }; } case JobName.FacialRecognitionQueueAll: { - return { jobId: JobName.FacialRecognitionQueueAll }; + return { dedupKey: JobName.FacialRecognitionQueueAll }; } default: { return null; @@ -239,16 +401,52 @@ export class JobRepository { } } - private getQueue(queue: QueueName): Queue { - return this.moduleRef.get(getQueueToken(queue), { strict: false }); - } - /** @deprecated */ // todo: remove this when asset notifications no longer need it. public async removeJob(name: JobName, jobID: string): Promise { - const existingJob = await this.getQueue(this.getQueueName(name)).getJob(jobID); - if (existingJob) { - await existingJob.remove(); + const queueName = this.getQueueName(name); + const tableName = QUEUE_TABLE[queueName]; + await sql`DELETE FROM ${sql.table(tableName)} WHERE "id" = ${Number(jobID)}`.execute(this.db); + } + + private async setupListen(): Promise { + const { database } = this.configRepository.getEnv(); + const pgConfig = asPostgresConnectionConfig(database.config); + this.listenConn = postgres({ + host: pgConfig.host, + port: pgConfig.port, + username: pgConfig.username, + password: pgConfig.password as string | undefined, + database: pgConfig.database, + ssl: pgConfig.ssl as boolean | undefined, + max: 1, + }); + + for (const queueName of Object.values(QueueName)) { + await this.listenConn.listen(`jobs:${queueName}`, () => { + this.workers[queueName]?.onNotification(); + }); + } + } + + private async notify(queue: QueueName): Promise { + await sql`SELECT pg_notify(${`jobs:${queue}`}, '')`.execute(this.db); + } + + async onShutdown(): Promise { + // Stop workers + const shutdownPromises = Object.values(this.workers).map((worker) => worker.shutdown()); + await Promise.all(shutdownPromises); + + // Flush write buffer + if (this.writeBuffer) { + await this.writeBuffer.flush(); + } + + // Close LISTEN connection + if (this.listenConn) { + await this.listenConn.end(); + this.listenConn = null; } } } diff --git a/server/src/repositories/job.worker.ts b/server/src/repositories/job.worker.ts new file mode 100644 index 0000000000..4cb423a6d1 --- /dev/null +++ b/server/src/repositories/job.worker.ts @@ -0,0 +1,298 @@ +import { Kysely, sql } from 'kysely'; +import { JobName, QueueName } from 'src/enum'; +import { DB } from 'src/schema'; +import { JobItem } from 'src/types'; + +// Job status codes stored as "char" (single-byte PostgreSQL type) +const STATUS_PENDING = 'p'; +const STATUS_ACTIVE = 'a'; +const STATUS_FAILED = 'f'; + +// Bidirectional JobName <-> "char" mapping +const JOB_CHAR: Record = {}; +const CHAR_JOB: Record = {}; + +// Assign sequential character codes starting from 0x01 +let charCode = 1; +for (const jobName of Object.values(JobName)) { + const char = String.fromCodePoint(charCode++); + JOB_CHAR[jobName] = char; + CHAR_JOB[char] = jobName; +} + +export const jobNameToChar = (name: JobName): string => JOB_CHAR[name]; +export const charToJobName = (char: string): JobName | undefined => CHAR_JOB[char]; + +type JobRow = { + id: number; + name: string; + data: unknown; + priority: number; + status: string; + dedup_key: string | null; + run_after: Date; + started_at: Date | null; + expires_at: Date | null; + error: string | null; +}; + +export interface QueueWorkerOptions { + queueName: QueueName; + tableName: string; + stallTimeout: number; + claimBatch: number; + concurrency: number; + db: Kysely; + onJob: (job: JobItem) => Promise; +} + +export class QueueWorker { + private concurrency: number; + private activeCount = 0; + private activeJobs = new Map(); + private hasPending = true; + private fetching = false; + private paused = false; + private stopped = false; + private heartbeatTimer: ReturnType | null = null; + + private readonly queueName: QueueName; + private readonly tableName: string; + private readonly stallTimeout: number; + private readonly claimBatch: number; + private readonly db: Kysely; + private readonly onJobFn: (job: JobItem) => Promise; + + constructor(options: QueueWorkerOptions) { + this.queueName = options.queueName; + this.tableName = options.tableName; + this.stallTimeout = options.stallTimeout; + this.claimBatch = options.claimBatch; + this.concurrency = options.concurrency; + this.db = options.db; + this.onJobFn = options.onJob; + } + + get activeJobCount(): number { + return this.activeCount; + } + + onNotification(): void { + this.hasPending = true; + void this.tryFetch(); + } + + setConcurrency(n: number): void { + this.concurrency = n; + void this.tryFetch(); + } + + pause(): void { + this.paused = true; + } + + resume(): void { + this.paused = false; + this.hasPending = true; + void this.tryFetch(); + } + + async shutdown(): Promise { + this.stopped = true; + this.stopHeartbeat(); + + // Re-queue active jobs + if (this.activeJobs.size > 0) { + const ids = [...this.activeJobs.keys()]; + await sql` + UPDATE ${sql.table(this.tableName)} + SET "status" = ${STATUS_PENDING}::"char", "started_at" = NULL, "expires_at" = NULL + WHERE "id" = ANY(${sql.val(ids)}::bigint[]) + `.execute(this.db); + } + } + + private get slotsAvailable(): number { + return Math.max(0, this.concurrency - this.activeCount); + } + + private async tryFetch(): Promise { + if (this.fetching || this.paused || this.stopped) { + return; + } + this.fetching = true; + try { + while (this.slotsAvailable > 0 && this.hasPending && !this.stopped) { + const limit = Math.min(this.slotsAvailable, this.claimBatch); + const jobs = await this.claim(limit); + if (jobs.length === 0) { + const recovered = await this.recoverStalled(); + if (recovered === 0) { + this.hasPending = false; + break; + } + continue; + } + this.activeCount += jobs.length; + for (const job of jobs) { + void this.processJob(job); + } + } + } finally { + this.fetching = false; + } + } + + private async processJob(row: JobRow): Promise { + this.activeJobs.set(row.id, { startedAt: Date.now() }); + this.startHeartbeat(); + try { + const jobName = charToJobName(row.name); + if (!jobName) { + throw new Error(`Unknown job char code: ${row.name.codePointAt(0)}`); + } + await this.onJobFn({ name: jobName, data: row.data } as JobItem); + // Success: delete completed job and try to fetch next + const next = await this.completeAndFetch(row.id, true); + this.activeJobs.delete(row.id); + if (next) { + void this.processJob(next); + } else { + this.activeCount--; + this.hasPending = false; + } + } catch (error: unknown) { + // Failure: mark as failed and try to fetch next + const errorMsg = error instanceof Error ? error.message : String(error); + const next = await this.completeAndFetch(row.id, false, errorMsg); + this.activeJobs.delete(row.id); + if (next) { + void this.processJob(next); + } else { + this.activeCount--; + this.hasPending = false; + } + } finally { + if (this.activeJobs.size === 0) { + this.stopHeartbeat(); + } + } + } + + /** + * Claim up to `limit` pending jobs using FOR UPDATE SKIP LOCKED + */ + private async claim(limit: number): Promise { + const result = await sql` + UPDATE ${sql.table(this.tableName)} SET + "status" = ${STATUS_ACTIVE}::"char", + "started_at" = now(), + "expires_at" = now() + ${sql.lit(`'${this.stallTimeout} milliseconds'`)}::interval + WHERE "id" IN ( + SELECT "id" FROM ${sql.table(this.tableName)} + WHERE "status" = ${STATUS_PENDING}::"char" AND "run_after" <= now() + ORDER BY "priority" DESC, "id" ASC + FOR UPDATE SKIP LOCKED + LIMIT ${sql.lit(limit)} + ) + RETURNING * + `.execute(this.db); + return result.rows as JobRow[]; + } + + /** + * Atomically complete a job (delete on success, mark failed on failure) and claim the next one. + * Uses a CTE to combine operations in a single round-trip. + */ + private async completeAndFetch( + jobId: number, + success: boolean, + errorMsg?: string, + ): Promise { + if (success) { + const result = await sql` + WITH completed AS ( + DELETE FROM ${sql.table(this.tableName)} WHERE "id" = ${jobId} + ), + next AS ( + SELECT "id" FROM ${sql.table(this.tableName)} + WHERE "status" = ${STATUS_PENDING}::"char" AND "run_after" <= now() + ORDER BY "priority" DESC, "id" ASC + FOR UPDATE SKIP LOCKED + LIMIT 1 + ) + UPDATE ${sql.table(this.tableName)} SET + "status" = ${STATUS_ACTIVE}::"char", + "started_at" = now(), + "expires_at" = now() + ${sql.lit(`'${this.stallTimeout} milliseconds'`)}::interval + WHERE "id" = (SELECT "id" FROM next) + RETURNING * + `.execute(this.db); + return (result.rows as JobRow[])[0]; + } + + const result = await sql` + WITH failed AS ( + UPDATE ${sql.table(this.tableName)} + SET "status" = ${STATUS_FAILED}::"char", "error" = ${errorMsg ?? null} + WHERE "id" = ${jobId} + ), + next AS ( + SELECT "id" FROM ${sql.table(this.tableName)} + WHERE "status" = ${STATUS_PENDING}::"char" AND "run_after" <= now() + ORDER BY "priority" DESC, "id" ASC + FOR UPDATE SKIP LOCKED + LIMIT 1 + ) + UPDATE ${sql.table(this.tableName)} SET + "status" = ${STATUS_ACTIVE}::"char", + "started_at" = now(), + "expires_at" = now() + ${sql.lit(`'${this.stallTimeout} milliseconds'`)}::interval + WHERE "id" = (SELECT "id" FROM next) + RETURNING * + `.execute(this.db); + return (result.rows as JobRow[])[0]; + } + + /** + * Recover stalled jobs: reset jobs whose expires_at has passed + */ + private async recoverStalled(): Promise { + const result = await sql` + UPDATE ${sql.table(this.tableName)} + SET "status" = ${STATUS_PENDING}::"char", "started_at" = NULL, "expires_at" = NULL + WHERE "status" = ${STATUS_ACTIVE}::"char" AND "expires_at" < now() + `.execute(this.db); + return Number(result.numAffectedRows ?? 0); + } + + /** + * Extend expiry for all active jobs (heartbeat) + */ + private async extendExpiry(): Promise { + if (this.activeJobs.size === 0) { + return; + } + const ids = [...this.activeJobs.keys()]; + await sql` + UPDATE ${sql.table(this.tableName)} + SET "expires_at" = now() + ${sql.lit(`'${this.stallTimeout} milliseconds'`)}::interval + WHERE "id" = ANY(${sql.val(ids)}::bigint[]) + `.execute(this.db); + } + + private startHeartbeat(): void { + if (this.heartbeatTimer) { + return; + } + const interval = Math.max(1000, Math.floor(this.stallTimeout / 2)); + this.heartbeatTimer = setInterval(() => void this.extendExpiry(), interval); + } + + private stopHeartbeat(): void { + if (this.heartbeatTimer) { + clearInterval(this.heartbeatTimer); + this.heartbeatTimer = null; + } + } +} diff --git a/server/src/repositories/job.write-buffer.ts b/server/src/repositories/job.write-buffer.ts new file mode 100644 index 0000000000..3b10dd0e70 --- /dev/null +++ b/server/src/repositories/job.write-buffer.ts @@ -0,0 +1,121 @@ +import { Kysely, sql } from 'kysely'; +import { QueueName } from 'src/enum'; +import { DB } from 'src/schema'; + +export type InsertRow = { + name: string; + data: unknown; + priority: number; + dedup_key: string | null; + run_after: Date; +}; + +type QueueTableName = keyof DB & `jobs_${string}`; + +export const QUEUE_TABLE: Record = { + [QueueName.ThumbnailGeneration]: 'jobs_thumbnail_generation', + [QueueName.MetadataExtraction]: 'jobs_metadata_extraction', + [QueueName.VideoConversion]: 'jobs_video_conversion', + [QueueName.FaceDetection]: 'jobs_face_detection', + [QueueName.FacialRecognition]: 'jobs_facial_recognition', + [QueueName.SmartSearch]: 'jobs_smart_search', + [QueueName.DuplicateDetection]: 'jobs_duplicate_detection', + [QueueName.BackgroundTask]: 'jobs_background_task', + [QueueName.StorageTemplateMigration]: 'jobs_storage_template_migration', + [QueueName.Migration]: 'jobs_migration', + [QueueName.Search]: 'jobs_search', + [QueueName.Sidecar]: 'jobs_sidecar', + [QueueName.Library]: 'jobs_library', + [QueueName.Notification]: 'jobs_notification', + [QueueName.BackupDatabase]: 'jobs_backup_database', + [QueueName.Ocr]: 'jobs_ocr', + [QueueName.Workflow]: 'jobs_workflow', + [QueueName.Editor]: 'jobs_editor', +}; + +type Deferred = { promise: Promise; resolve: () => void }; + +const createDeferred = (): Deferred => { + let resolve!: () => void; + const promise = new Promise((r) => (resolve = r)); + return { promise, resolve }; +}; + +const CHUNK_SIZE = 5000; + +export class WriteBuffer { + private buffers = new Map(); + private pending: Deferred | null = null; + private timer: ReturnType | null = null; + + constructor( + private db: Kysely, + private notify: (queue: QueueName) => Promise, + ) {} + + async add(items: { queue: QueueName; row: InsertRow }[]): Promise { + for (const { queue, row } of items) { + let buf = this.buffers.get(queue); + if (!buf) { + buf = []; + this.buffers.set(queue, buf); + } + buf.push(row); + } + if (!this.timer) { + this.pending = createDeferred(); + this.timer = setTimeout(() => void this.flush(), 10); + } + return this.pending!.promise; + } + + async flush(): Promise { + const snapshot = this.buffers; + this.buffers = new Map(); + if (this.timer) { + clearTimeout(this.timer); + this.timer = null; + } + const deferred = this.pending; + this.pending = null; + + if (snapshot.size === 0) { + deferred?.resolve(); + return; + } + + try { + for (const [queue, rows] of snapshot) { + const tableName = QUEUE_TABLE[queue]; + for (let i = 0; i < rows.length; i += CHUNK_SIZE) { + const chunk = rows.slice(i, i + CHUNK_SIZE); + await this.insertChunk(tableName, chunk); + } + await this.notify(queue); + } + } finally { + deferred?.resolve(); + } + } + + private async insertChunk(tableName: string, rows: InsertRow[]): Promise { + const names = rows.map((r) => r.name); + const datas = rows.map((r) => JSON.stringify(r.data)); + const priorities = rows.map((r) => r.priority); + const dedupKeys = rows.map((r) => r.dedup_key); + const runAfters = rows.map((r) => r.run_after.toISOString()); + + await sql` + INSERT INTO ${sql.table(tableName)} ("name", "data", "priority", "dedup_key", "run_after") + SELECT * FROM unnest( + ${sql.val(names)}::"char"[], + ${sql.val(datas)}::jsonb[], + ${sql.val(priorities)}::smallint[], + ${sql.val(dedupKeys)}::text[], + ${sql.val(runAfters)}::timestamptz[] + ) + ON CONFLICT ("dedup_key") WHERE "dedup_key" IS NOT NULL AND "status" = 'p'::"char" + DO NOTHING + `.execute(this.db); + } +} diff --git a/server/src/schema/index.ts b/server/src/schema/index.ts index 4dc3d40312..283f09be13 100644 --- a/server/src/schema/index.ts +++ b/server/src/schema/index.ts @@ -41,6 +41,27 @@ import { AssetTable } from 'src/schema/tables/asset.table'; import { AuditTable } from 'src/schema/tables/audit.table'; import { FaceSearchTable } from 'src/schema/tables/face-search.table'; import { GeodataPlacesTable } from 'src/schema/tables/geodata-places.table'; +import { + JobQueueMetaTable, + JobsBackgroundTaskTable, + JobsBackupDatabaseTable, + JobsDuplicateDetectionTable, + JobsEditorTable, + JobsFaceDetectionTable, + JobsFacialRecognitionTable, + JobsLibraryTable, + JobsMetadataExtractionTable, + JobsMigrationTable, + JobsNotificationTable, + JobsOcrTable, + JobsSearchTable, + JobsSidecarTable, + JobsSmartSearchTable, + JobsStorageTemplateMigrationTable, + JobsThumbnailGenerationTable, + JobsVideoConversionTable, + JobsWorkflowTable, +} from 'src/schema/tables/job.table'; import { LibraryTable } from 'src/schema/tables/library.table'; import { MemoryAssetAuditTable } from 'src/schema/tables/memory-asset-audit.table'; import { MemoryAssetTable } from 'src/schema/tables/memory-asset.table'; @@ -135,6 +156,25 @@ export class ImmichDatabase { WorkflowTable, WorkflowFilterTable, WorkflowActionTable, + JobsThumbnailGenerationTable, + JobsMetadataExtractionTable, + JobsVideoConversionTable, + JobsFaceDetectionTable, + JobsFacialRecognitionTable, + JobsSmartSearchTable, + JobsDuplicateDetectionTable, + JobsBackgroundTaskTable, + JobsStorageTemplateMigrationTable, + JobsMigrationTable, + JobsSearchTable, + JobsSidecarTable, + JobsLibraryTable, + JobsNotificationTable, + JobsBackupDatabaseTable, + JobsOcrTable, + JobsWorkflowTable, + JobsEditorTable, + JobQueueMetaTable, ]; functions = [ @@ -252,4 +292,24 @@ export interface DB { workflow: WorkflowTable; workflow_filter: WorkflowFilterTable; workflow_action: WorkflowActionTable; + + jobs_thumbnail_generation: JobsThumbnailGenerationTable; + jobs_metadata_extraction: JobsMetadataExtractionTable; + jobs_video_conversion: JobsVideoConversionTable; + jobs_face_detection: JobsFaceDetectionTable; + jobs_facial_recognition: JobsFacialRecognitionTable; + jobs_smart_search: JobsSmartSearchTable; + jobs_duplicate_detection: JobsDuplicateDetectionTable; + jobs_background_task: JobsBackgroundTaskTable; + jobs_storage_template_migration: JobsStorageTemplateMigrationTable; + jobs_migration: JobsMigrationTable; + jobs_search: JobsSearchTable; + jobs_sidecar: JobsSidecarTable; + jobs_library: JobsLibraryTable; + jobs_notification: JobsNotificationTable; + jobs_backup_database: JobsBackupDatabaseTable; + jobs_ocr: JobsOcrTable; + jobs_workflow: JobsWorkflowTable; + jobs_editor: JobsEditorTable; + job_queue_meta: JobQueueMetaTable; } diff --git a/server/src/schema/tables/job.table.ts b/server/src/schema/tables/job.table.ts new file mode 100644 index 0000000000..43e367dd85 --- /dev/null +++ b/server/src/schema/tables/job.table.ts @@ -0,0 +1,800 @@ +import { Column, ConfigurationParameter, Generated, Index, PrimaryColumn, Table } from 'src/sql-tools'; + +// Job status values stored as "char" (single-byte PostgreSQL type): +// 'p' = pending, 'a' = active, 'c' = completed, 'f' = failed + +@Table('jobs_thumbnail_generation') +@Index({ name: 'IDX_jobs_thumbnail_generation_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` }) +@Index({ + name: 'IDX_jobs_thumbnail_generation_dedup', + columns: ['dedup_key'], + unique: true, + where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`, +}) +@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' }) +@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' }) +@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' }) +export class JobsThumbnailGenerationTable { + @PrimaryColumn({ type: 'bigint', identity: true }) + id!: Generated; + + @Column({ type: '"char"' }) + name!: string; + + @Column({ type: 'jsonb', nullable: true }) + data!: unknown; + + @Column({ type: 'smallint', default: 0 }) + priority!: Generated; + + @Column({ type: '"char"', default: 'p' }) + status!: Generated; + + @Column({ type: 'text', nullable: true }) + dedup_key!: string | null; + + @Column({ type: 'timestamp with time zone', default: () => 'now()' }) + run_after!: Generated; + + @Column({ type: 'timestamp with time zone', nullable: true }) + started_at!: Date | null; + + @Column({ type: 'timestamp with time zone', nullable: true }) + expires_at!: Date | null; + + @Column({ type: 'text', nullable: true }) + error!: string | null; +} + +@Table('jobs_metadata_extraction') +@Index({ name: 'IDX_jobs_metadata_extraction_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` }) +@Index({ + name: 'IDX_jobs_metadata_extraction_dedup', + columns: ['dedup_key'], + unique: true, + where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`, +}) +@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' }) +@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' }) +@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' }) +export class JobsMetadataExtractionTable { + @PrimaryColumn({ type: 'bigint', identity: true }) + id!: Generated; + + @Column({ type: '"char"' }) + name!: string; + + @Column({ type: 'jsonb', nullable: true }) + data!: unknown; + + @Column({ type: 'smallint', default: 0 }) + priority!: Generated; + + @Column({ type: '"char"', default: 'p' }) + status!: Generated; + + @Column({ type: 'text', nullable: true }) + dedup_key!: string | null; + + @Column({ type: 'timestamp with time zone', default: () => 'now()' }) + run_after!: Generated; + + @Column({ type: 'timestamp with time zone', nullable: true }) + started_at!: Date | null; + + @Column({ type: 'timestamp with time zone', nullable: true }) + expires_at!: Date | null; + + @Column({ type: 'text', nullable: true }) + error!: string | null; +} + +@Table('jobs_video_conversion') +@Index({ name: 'IDX_jobs_video_conversion_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` }) +@Index({ + name: 'IDX_jobs_video_conversion_dedup', + columns: ['dedup_key'], + unique: true, + where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`, +}) +@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' }) +@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' }) +@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' }) +export class JobsVideoConversionTable { + @PrimaryColumn({ type: 'bigint', identity: true }) + id!: Generated; + + @Column({ type: '"char"' }) + name!: string; + + @Column({ type: 'jsonb', nullable: true }) + data!: unknown; + + @Column({ type: 'smallint', default: 0 }) + priority!: Generated; + + @Column({ type: '"char"', default: 'p' }) + status!: Generated; + + @Column({ type: 'text', nullable: true }) + dedup_key!: string | null; + + @Column({ type: 'timestamp with time zone', default: () => 'now()' }) + run_after!: Generated; + + @Column({ type: 'timestamp with time zone', nullable: true }) + started_at!: Date | null; + + @Column({ type: 'timestamp with time zone', nullable: true }) + expires_at!: Date | null; + + @Column({ type: 'text', nullable: true }) + error!: string | null; +} + +@Table('jobs_face_detection') +@Index({ name: 'IDX_jobs_face_detection_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` }) +@Index({ + name: 'IDX_jobs_face_detection_dedup', + columns: ['dedup_key'], + unique: true, + where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`, +}) +@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' }) +@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' }) +@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' }) +export class JobsFaceDetectionTable { + @PrimaryColumn({ type: 'bigint', identity: true }) + id!: Generated; + + @Column({ type: '"char"' }) + name!: string; + + @Column({ type: 'jsonb', nullable: true }) + data!: unknown; + + @Column({ type: 'smallint', default: 0 }) + priority!: Generated; + + @Column({ type: '"char"', default: 'p' }) + status!: Generated; + + @Column({ type: 'text', nullable: true }) + dedup_key!: string | null; + + @Column({ type: 'timestamp with time zone', default: () => 'now()' }) + run_after!: Generated; + + @Column({ type: 'timestamp with time zone', nullable: true }) + started_at!: Date | null; + + @Column({ type: 'timestamp with time zone', nullable: true }) + expires_at!: Date | null; + + @Column({ type: 'text', nullable: true }) + error!: string | null; +} + +@Table('jobs_facial_recognition') +@Index({ + name: 'IDX_jobs_facial_recognition_pending', + columns: ['priority', 'id'], + where: `"status" = 'p'::"char"`, +}) +@Index({ + name: 'IDX_jobs_facial_recognition_dedup', + columns: ['dedup_key'], + unique: true, + where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`, +}) +@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' }) +@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' }) +@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' }) +export class JobsFacialRecognitionTable { + @PrimaryColumn({ type: 'bigint', identity: true }) + id!: Generated; + + @Column({ type: '"char"' }) + name!: string; + + @Column({ type: 'jsonb', nullable: true }) + data!: unknown; + + @Column({ type: 'smallint', default: 0 }) + priority!: Generated; + + @Column({ type: '"char"', default: 'p' }) + status!: Generated; + + @Column({ type: 'text', nullable: true }) + dedup_key!: string | null; + + @Column({ type: 'timestamp with time zone', default: () => 'now()' }) + run_after!: Generated; + + @Column({ type: 'timestamp with time zone', nullable: true }) + started_at!: Date | null; + + @Column({ type: 'timestamp with time zone', nullable: true }) + expires_at!: Date | null; + + @Column({ type: 'text', nullable: true }) + error!: string | null; +} + +@Table('jobs_smart_search') +@Index({ name: 'IDX_jobs_smart_search_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` }) +@Index({ + name: 'IDX_jobs_smart_search_dedup', + columns: ['dedup_key'], + unique: true, + where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`, +}) +@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' }) +@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' }) +@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' }) +export class JobsSmartSearchTable { + @PrimaryColumn({ type: 'bigint', identity: true }) + id!: Generated; + + @Column({ type: '"char"' }) + name!: string; + + @Column({ type: 'jsonb', nullable: true }) + data!: unknown; + + @Column({ type: 'smallint', default: 0 }) + priority!: Generated; + + @Column({ type: '"char"', default: 'p' }) + status!: Generated; + + @Column({ type: 'text', nullable: true }) + dedup_key!: string | null; + + @Column({ type: 'timestamp with time zone', default: () => 'now()' }) + run_after!: Generated; + + @Column({ type: 'timestamp with time zone', nullable: true }) + started_at!: Date | null; + + @Column({ type: 'timestamp with time zone', nullable: true }) + expires_at!: Date | null; + + @Column({ type: 'text', nullable: true }) + error!: string | null; +} + +@Table('jobs_duplicate_detection') +@Index({ + name: 'IDX_jobs_duplicate_detection_pending', + columns: ['priority', 'id'], + where: `"status" = 'p'::"char"`, +}) +@Index({ + name: 'IDX_jobs_duplicate_detection_dedup', + columns: ['dedup_key'], + unique: true, + where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`, +}) +@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' }) +@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' }) +@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' }) +export class JobsDuplicateDetectionTable { + @PrimaryColumn({ type: 'bigint', identity: true }) + id!: Generated; + + @Column({ type: '"char"' }) + name!: string; + + @Column({ type: 'jsonb', nullable: true }) + data!: unknown; + + @Column({ type: 'smallint', default: 0 }) + priority!: Generated; + + @Column({ type: '"char"', default: 'p' }) + status!: Generated; + + @Column({ type: 'text', nullable: true }) + dedup_key!: string | null; + + @Column({ type: 'timestamp with time zone', default: () => 'now()' }) + run_after!: Generated; + + @Column({ type: 'timestamp with time zone', nullable: true }) + started_at!: Date | null; + + @Column({ type: 'timestamp with time zone', nullable: true }) + expires_at!: Date | null; + + @Column({ type: 'text', nullable: true }) + error!: string | null; +} + +@Table('jobs_background_task') +@Index({ name: 'IDX_jobs_background_task_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` }) +@Index({ + name: 'IDX_jobs_background_task_dedup', + columns: ['dedup_key'], + unique: true, + where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`, +}) +@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' }) +@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' }) +@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' }) +export class JobsBackgroundTaskTable { + @PrimaryColumn({ type: 'bigint', identity: true }) + id!: Generated; + + @Column({ type: '"char"' }) + name!: string; + + @Column({ type: 'jsonb', nullable: true }) + data!: unknown; + + @Column({ type: 'smallint', default: 0 }) + priority!: Generated; + + @Column({ type: '"char"', default: 'p' }) + status!: Generated; + + @Column({ type: 'text', nullable: true }) + dedup_key!: string | null; + + @Column({ type: 'timestamp with time zone', default: () => 'now()' }) + run_after!: Generated; + + @Column({ type: 'timestamp with time zone', nullable: true }) + started_at!: Date | null; + + @Column({ type: 'timestamp with time zone', nullable: true }) + expires_at!: Date | null; + + @Column({ type: 'text', nullable: true }) + error!: string | null; +} + +@Table('jobs_storage_template_migration') +@Index({ + name: 'IDX_jobs_storage_template_migration_pending', + columns: ['priority', 'id'], + where: `"status" = 'p'::"char"`, +}) +@Index({ + name: 'IDX_jobs_storage_template_migration_dedup', + columns: ['dedup_key'], + unique: true, + where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`, +}) +@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' }) +@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' }) +@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' }) +export class JobsStorageTemplateMigrationTable { + @PrimaryColumn({ type: 'bigint', identity: true }) + id!: Generated; + + @Column({ type: '"char"' }) + name!: string; + + @Column({ type: 'jsonb', nullable: true }) + data!: unknown; + + @Column({ type: 'smallint', default: 0 }) + priority!: Generated; + + @Column({ type: '"char"', default: 'p' }) + status!: Generated; + + @Column({ type: 'text', nullable: true }) + dedup_key!: string | null; + + @Column({ type: 'timestamp with time zone', default: () => 'now()' }) + run_after!: Generated; + + @Column({ type: 'timestamp with time zone', nullable: true }) + started_at!: Date | null; + + @Column({ type: 'timestamp with time zone', nullable: true }) + expires_at!: Date | null; + + @Column({ type: 'text', nullable: true }) + error!: string | null; +} + +@Table('jobs_migration') +@Index({ name: 'IDX_jobs_migration_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` }) +@Index({ + name: 'IDX_jobs_migration_dedup', + columns: ['dedup_key'], + unique: true, + where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`, +}) +@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' }) +@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' }) +@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' }) +export class JobsMigrationTable { + @PrimaryColumn({ type: 'bigint', identity: true }) + id!: Generated; + + @Column({ type: '"char"' }) + name!: string; + + @Column({ type: 'jsonb', nullable: true }) + data!: unknown; + + @Column({ type: 'smallint', default: 0 }) + priority!: Generated; + + @Column({ type: '"char"', default: 'p' }) + status!: Generated; + + @Column({ type: 'text', nullable: true }) + dedup_key!: string | null; + + @Column({ type: 'timestamp with time zone', default: () => 'now()' }) + run_after!: Generated; + + @Column({ type: 'timestamp with time zone', nullable: true }) + started_at!: Date | null; + + @Column({ type: 'timestamp with time zone', nullable: true }) + expires_at!: Date | null; + + @Column({ type: 'text', nullable: true }) + error!: string | null; +} + +@Table('jobs_search') +@Index({ name: 'IDX_jobs_search_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` }) +@Index({ + name: 'IDX_jobs_search_dedup', + columns: ['dedup_key'], + unique: true, + where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`, +}) +@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' }) +@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' }) +@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' }) +export class JobsSearchTable { + @PrimaryColumn({ type: 'bigint', identity: true }) + id!: Generated; + + @Column({ type: '"char"' }) + name!: string; + + @Column({ type: 'jsonb', nullable: true }) + data!: unknown; + + @Column({ type: 'smallint', default: 0 }) + priority!: Generated; + + @Column({ type: '"char"', default: 'p' }) + status!: Generated; + + @Column({ type: 'text', nullable: true }) + dedup_key!: string | null; + + @Column({ type: 'timestamp with time zone', default: () => 'now()' }) + run_after!: Generated; + + @Column({ type: 'timestamp with time zone', nullable: true }) + started_at!: Date | null; + + @Column({ type: 'timestamp with time zone', nullable: true }) + expires_at!: Date | null; + + @Column({ type: 'text', nullable: true }) + error!: string | null; +} + +@Table('jobs_sidecar') +@Index({ name: 'IDX_jobs_sidecar_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` }) +@Index({ + name: 'IDX_jobs_sidecar_dedup', + columns: ['dedup_key'], + unique: true, + where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`, +}) +@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' }) +@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' }) +@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' }) +export class JobsSidecarTable { + @PrimaryColumn({ type: 'bigint', identity: true }) + id!: Generated; + + @Column({ type: '"char"' }) + name!: string; + + @Column({ type: 'jsonb', nullable: true }) + data!: unknown; + + @Column({ type: 'smallint', default: 0 }) + priority!: Generated; + + @Column({ type: '"char"', default: 'p' }) + status!: Generated; + + @Column({ type: 'text', nullable: true }) + dedup_key!: string | null; + + @Column({ type: 'timestamp with time zone', default: () => 'now()' }) + run_after!: Generated; + + @Column({ type: 'timestamp with time zone', nullable: true }) + started_at!: Date | null; + + @Column({ type: 'timestamp with time zone', nullable: true }) + expires_at!: Date | null; + + @Column({ type: 'text', nullable: true }) + error!: string | null; +} + +@Table('jobs_library') +@Index({ name: 'IDX_jobs_library_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` }) +@Index({ + name: 'IDX_jobs_library_dedup', + columns: ['dedup_key'], + unique: true, + where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`, +}) +@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' }) +@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' }) +@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' }) +export class JobsLibraryTable { + @PrimaryColumn({ type: 'bigint', identity: true }) + id!: Generated; + + @Column({ type: '"char"' }) + name!: string; + + @Column({ type: 'jsonb', nullable: true }) + data!: unknown; + + @Column({ type: 'smallint', default: 0 }) + priority!: Generated; + + @Column({ type: '"char"', default: 'p' }) + status!: Generated; + + @Column({ type: 'text', nullable: true }) + dedup_key!: string | null; + + @Column({ type: 'timestamp with time zone', default: () => 'now()' }) + run_after!: Generated; + + @Column({ type: 'timestamp with time zone', nullable: true }) + started_at!: Date | null; + + @Column({ type: 'timestamp with time zone', nullable: true }) + expires_at!: Date | null; + + @Column({ type: 'text', nullable: true }) + error!: string | null; +} + +@Table('jobs_notification') +@Index({ name: 'IDX_jobs_notification_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` }) +@Index({ + name: 'IDX_jobs_notification_dedup', + columns: ['dedup_key'], + unique: true, + where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`, +}) +@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' }) +@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' }) +@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' }) +export class JobsNotificationTable { + @PrimaryColumn({ type: 'bigint', identity: true }) + id!: Generated; + + @Column({ type: '"char"' }) + name!: string; + + @Column({ type: 'jsonb', nullable: true }) + data!: unknown; + + @Column({ type: 'smallint', default: 0 }) + priority!: Generated; + + @Column({ type: '"char"', default: 'p' }) + status!: Generated; + + @Column({ type: 'text', nullable: true }) + dedup_key!: string | null; + + @Column({ type: 'timestamp with time zone', default: () => 'now()' }) + run_after!: Generated; + + @Column({ type: 'timestamp with time zone', nullable: true }) + started_at!: Date | null; + + @Column({ type: 'timestamp with time zone', nullable: true }) + expires_at!: Date | null; + + @Column({ type: 'text', nullable: true }) + error!: string | null; +} + +@Table('jobs_backup_database') +@Index({ name: 'IDX_jobs_backup_database_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` }) +@Index({ + name: 'IDX_jobs_backup_database_dedup', + columns: ['dedup_key'], + unique: true, + where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`, +}) +@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' }) +@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' }) +@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' }) +export class JobsBackupDatabaseTable { + @PrimaryColumn({ type: 'bigint', identity: true }) + id!: Generated; + + @Column({ type: '"char"' }) + name!: string; + + @Column({ type: 'jsonb', nullable: true }) + data!: unknown; + + @Column({ type: 'smallint', default: 0 }) + priority!: Generated; + + @Column({ type: '"char"', default: 'p' }) + status!: Generated; + + @Column({ type: 'text', nullable: true }) + dedup_key!: string | null; + + @Column({ type: 'timestamp with time zone', default: () => 'now()' }) + run_after!: Generated; + + @Column({ type: 'timestamp with time zone', nullable: true }) + started_at!: Date | null; + + @Column({ type: 'timestamp with time zone', nullable: true }) + expires_at!: Date | null; + + @Column({ type: 'text', nullable: true }) + error!: string | null; +} + +@Table('jobs_ocr') +@Index({ name: 'IDX_jobs_ocr_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` }) +@Index({ + name: 'IDX_jobs_ocr_dedup', + columns: ['dedup_key'], + unique: true, + where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`, +}) +@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' }) +@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' }) +@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' }) +export class JobsOcrTable { + @PrimaryColumn({ type: 'bigint', identity: true }) + id!: Generated; + + @Column({ type: '"char"' }) + name!: string; + + @Column({ type: 'jsonb', nullable: true }) + data!: unknown; + + @Column({ type: 'smallint', default: 0 }) + priority!: Generated; + + @Column({ type: '"char"', default: 'p' }) + status!: Generated; + + @Column({ type: 'text', nullable: true }) + dedup_key!: string | null; + + @Column({ type: 'timestamp with time zone', default: () => 'now()' }) + run_after!: Generated; + + @Column({ type: 'timestamp with time zone', nullable: true }) + started_at!: Date | null; + + @Column({ type: 'timestamp with time zone', nullable: true }) + expires_at!: Date | null; + + @Column({ type: 'text', nullable: true }) + error!: string | null; +} + +@Table('jobs_workflow') +@Index({ name: 'IDX_jobs_workflow_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` }) +@Index({ + name: 'IDX_jobs_workflow_dedup', + columns: ['dedup_key'], + unique: true, + where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`, +}) +@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' }) +@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' }) +@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' }) +export class JobsWorkflowTable { + @PrimaryColumn({ type: 'bigint', identity: true }) + id!: Generated; + + @Column({ type: '"char"' }) + name!: string; + + @Column({ type: 'jsonb', nullable: true }) + data!: unknown; + + @Column({ type: 'smallint', default: 0 }) + priority!: Generated; + + @Column({ type: '"char"', default: 'p' }) + status!: Generated; + + @Column({ type: 'text', nullable: true }) + dedup_key!: string | null; + + @Column({ type: 'timestamp with time zone', default: () => 'now()' }) + run_after!: Generated; + + @Column({ type: 'timestamp with time zone', nullable: true }) + started_at!: Date | null; + + @Column({ type: 'timestamp with time zone', nullable: true }) + expires_at!: Date | null; + + @Column({ type: 'text', nullable: true }) + error!: string | null; +} + +@Table('jobs_editor') +@Index({ name: 'IDX_jobs_editor_pending', columns: ['priority', 'id'], where: `"status" = 'p'::"char"` }) +@Index({ + name: 'IDX_jobs_editor_dedup', + columns: ['dedup_key'], + unique: true, + where: `"dedup_key" IS NOT NULL AND "status" = 'p'::"char"`, +}) +@ConfigurationParameter({ name: 'autovacuum_vacuum_threshold', value: 100, scope: 'table' }) +@ConfigurationParameter({ name: 'autovacuum_vacuum_scale_factor', value: 0.01, scope: 'table' }) +@ConfigurationParameter({ name: 'autovacuum_vacuum_cost_delay', value: 0, scope: 'table' }) +export class JobsEditorTable { + @PrimaryColumn({ type: 'bigint', identity: true }) + id!: Generated; + + @Column({ type: '"char"' }) + name!: string; + + @Column({ type: 'jsonb', nullable: true }) + data!: unknown; + + @Column({ type: 'smallint', default: 0 }) + priority!: Generated; + + @Column({ type: '"char"', default: 'p' }) + status!: Generated; + + @Column({ type: 'text', nullable: true }) + dedup_key!: string | null; + + @Column({ type: 'timestamp with time zone', default: () => 'now()' }) + run_after!: Generated; + + @Column({ type: 'timestamp with time zone', nullable: true }) + started_at!: Date | null; + + @Column({ type: 'timestamp with time zone', nullable: true }) + expires_at!: Date | null; + + @Column({ type: 'text', nullable: true }) + error!: string | null; +} + +// Queue metadata table +@Table('job_queue_meta') +export class JobQueueMetaTable { + @PrimaryColumn({ type: 'text' }) + queue_name!: string; + + @Column({ type: 'boolean', default: false }) + is_paused!: Generated; +} diff --git a/server/test/repositories/config.repository.mock.ts b/server/test/repositories/config.repository.mock.ts index 62e498372e..c1ccefdf10 100644 --- a/server/test/repositories/config.repository.mock.ts +++ b/server/test/repositories/config.repository.mock.ts @@ -9,13 +9,6 @@ const envData: EnvData = { logFormat: LogFormat.Console, buildMetadata: {}, - bull: { - config: { - connection: {}, - prefix: 'immich_bull', - }, - queues: [{ name: 'queue-1' }], - }, cls: { config: {}, diff --git a/server/test/repositories/job.repository.mock.ts b/server/test/repositories/job.repository.mock.ts index 4fc5460c8a..1945d3b495 100644 --- a/server/test/repositories/job.repository.mock.ts +++ b/server/test/repositories/job.repository.mock.ts @@ -20,5 +20,6 @@ export const newJobRepositoryMock = (): Mocked