mirror of
https://github.com/immich-app/immich.git
synced 2026-02-12 20:08:25 +03:00
dynamically batch asset requests
This commit is contained in:
@@ -3,13 +3,45 @@ import Flutter
|
||||
import MobileCoreServices
|
||||
import Photos
|
||||
|
||||
class Request {
|
||||
weak var workItem: DispatchWorkItem?
|
||||
class CancellationToken {
|
||||
var isCancelled = false
|
||||
let callback: (Result<[String: Int64], any Error>) -> Void
|
||||
}
|
||||
|
||||
class Request {
|
||||
let cancellationToken: CancellationToken
|
||||
|
||||
init(cancellationToken: CancellationToken) {
|
||||
self.cancellationToken = cancellationToken
|
||||
}
|
||||
|
||||
init(callback: @escaping (Result<[String: Int64], any Error>) -> Void) {
|
||||
self.callback = callback
|
||||
var isCancelled: Bool {
|
||||
get {
|
||||
return cancellationToken.isCancelled
|
||||
}
|
||||
set(newValue) {
|
||||
cancellationToken.isCancelled = newValue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class AssetRequest: Request {
|
||||
let assetId: String
|
||||
var completion: (PHAsset?) -> Void
|
||||
|
||||
init(cancellationToken: CancellationToken, assetId: String, completion: @escaping (PHAsset?) -> Void) {
|
||||
self.assetId = assetId
|
||||
self.completion = completion
|
||||
super.init(cancellationToken: cancellationToken)
|
||||
}
|
||||
}
|
||||
|
||||
class ThumbnailRequest: Request {
|
||||
weak var workItem: DispatchWorkItem?
|
||||
let completion: (Result<[String: Int64], any Error>) -> Void
|
||||
|
||||
init(cancellationToken: CancellationToken, completion: @escaping (Result<[String: Int64], any Error>) -> Void) {
|
||||
self.completion = completion
|
||||
super.init(cancellationToken: cancellationToken)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,7 +49,6 @@ class ThumbnailApiImpl: ThumbnailApi {
|
||||
private static let imageManager = PHImageManager.default()
|
||||
private static let fetchOptions = {
|
||||
let fetchOptions = PHFetchOptions()
|
||||
fetchOptions.fetchLimit = 1
|
||||
fetchOptions.wantsIncrementalChangeDetails = false
|
||||
return fetchOptions
|
||||
}()
|
||||
@@ -30,17 +61,17 @@ class ThumbnailApiImpl: ThumbnailApi {
|
||||
requestOptions.version = .current
|
||||
return requestOptions
|
||||
}()
|
||||
|
||||
|
||||
private static let assetQueue = DispatchQueue(label: "thumbnail.assets", qos: .userInitiated)
|
||||
private static let requestQueue = DispatchQueue(label: "thumbnail.requests", qos: .userInitiated)
|
||||
private static let cancelQueue = DispatchQueue(label: "thumbnail.cancellation", qos: .default)
|
||||
private static let processingQueue = DispatchQueue(label: "thumbnail.processing", qos: .userInteractive, attributes: .concurrent)
|
||||
|
||||
private static let batchQueue = DispatchQueue(label: "thumbnail.batching", qos: .userInitiated)
|
||||
|
||||
private static let rgbColorSpace = CGColorSpaceCreateDeviceRGB()
|
||||
private static let bitmapInfo = CGBitmapInfo(rawValue: CGImageAlphaInfo.premultipliedLast.rawValue).rawValue
|
||||
private static var requests = [Int64: Request]()
|
||||
private static var requests = [Int64: ThumbnailRequest]()
|
||||
private static let cancelledResult = Result<[String: Int64], any Error>.success([:])
|
||||
private static let assetConcurrencySemaphore = DispatchSemaphore(value: ProcessInfo.processInfo.activeProcessorCount * 2)
|
||||
private static let thumbnailConcurrencySemaphore = DispatchSemaphore(value: ProcessInfo.processInfo.activeProcessorCount / 2 + 1)
|
||||
private static let assetCache = {
|
||||
let assetCache = NSCache<NSString, PHAsset>()
|
||||
@@ -48,6 +79,12 @@ class ThumbnailApiImpl: ThumbnailApi {
|
||||
return assetCache
|
||||
}()
|
||||
private static let activitySemaphore = DispatchSemaphore(value: 1)
|
||||
|
||||
private static var assetRequests = [AssetRequest]()
|
||||
private static var batchTimer: DispatchWorkItem?
|
||||
private static let batchLock = NSLock()
|
||||
private static let batchTimeout: TimeInterval = 0.001 // 1ms
|
||||
|
||||
private static let willResignActiveObserver = NotificationCenter.default.addObserver(
|
||||
forName: UIApplication.willResignActiveNotification,
|
||||
object: nil,
|
||||
@@ -64,7 +101,7 @@ class ThumbnailApiImpl: ThumbnailApi {
|
||||
processingQueue.resume()
|
||||
activitySemaphore.signal()
|
||||
}
|
||||
|
||||
|
||||
func getThumbhash(thumbhash: String, completion: @escaping (Result<[String : Int64], any Error>) -> Void) {
|
||||
Self.processingQueue.async {
|
||||
guard let data = Data(base64Encoded: thumbhash)
|
||||
@@ -75,111 +112,113 @@ class ThumbnailApiImpl: ThumbnailApi {
|
||||
completion(.success(["pointer": Int64(Int(bitPattern: pointer.baseAddress)), "width": Int64(width), "height": Int64(height)]))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
func requestImage(assetId: String, requestId: Int64, width: Int64, height: Int64, isVideo: Bool, completion: @escaping (Result<[String: Int64], any Error>) -> Void) {
|
||||
let request = Request(callback: completion)
|
||||
let item = DispatchWorkItem {
|
||||
if request.isCancelled {
|
||||
return completion(Self.cancelledResult)
|
||||
}
|
||||
|
||||
guard let asset = Self.requestAsset(request: request, assetId: assetId)
|
||||
else {
|
||||
if request.isCancelled {
|
||||
let cancellationToken = CancellationToken()
|
||||
let thumbnailRequest = ThumbnailRequest(cancellationToken: cancellationToken, completion: completion)
|
||||
Self.requestAsset(request: AssetRequest(cancellationToken: cancellationToken, assetId: assetId) { asset in
|
||||
let item = DispatchWorkItem {
|
||||
if cancellationToken.isCancelled {
|
||||
return completion(Self.cancelledResult)
|
||||
}
|
||||
Self.removeRequest(requestId: requestId)
|
||||
completion(.failure(PigeonError(code: "", message: "Could not get asset data for \(assetId)", details: nil)))
|
||||
return
|
||||
}
|
||||
|
||||
Self.thumbnailConcurrencySemaphore.wait()
|
||||
defer { Self.thumbnailConcurrencySemaphore.signal() }
|
||||
|
||||
if request.isCancelled {
|
||||
return completion(Self.cancelledResult)
|
||||
}
|
||||
|
||||
var image: UIImage?
|
||||
Self.imageManager.requestImage(
|
||||
for: asset,
|
||||
targetSize: width > 0 && height > 0 ? CGSize(width: Double(width), height: Double(height)) : PHImageManagerMaximumSize,
|
||||
contentMode: .aspectFill,
|
||||
options: Self.requestOptions,
|
||||
resultHandler: { (_image, info) -> Void in
|
||||
image = _image
|
||||
|
||||
guard let asset = asset else {
|
||||
if cancellationToken.isCancelled {
|
||||
return completion(Self.cancelledResult)
|
||||
}
|
||||
Self.removeRequest(requestId: requestId)
|
||||
completion(.failure(PigeonError(code: "", message: "Could not get asset data for \(assetId)", details: nil)))
|
||||
return
|
||||
}
|
||||
)
|
||||
|
||||
if request.isCancelled {
|
||||
return completion(Self.cancelledResult)
|
||||
}
|
||||
|
||||
guard let image = image,
|
||||
let cgImage = image.cgImage else {
|
||||
|
||||
Self.thumbnailConcurrencySemaphore.wait()
|
||||
defer { Self.thumbnailConcurrencySemaphore.signal() }
|
||||
|
||||
if cancellationToken.isCancelled {
|
||||
return completion(Self.cancelledResult)
|
||||
}
|
||||
|
||||
var image: UIImage?
|
||||
Self.imageManager.requestImage(
|
||||
for: asset,
|
||||
targetSize: width > 0 && height > 0 ? CGSize(width: Double(width), height: Double(height)) : PHImageManagerMaximumSize,
|
||||
contentMode: .aspectFill,
|
||||
options: Self.requestOptions,
|
||||
resultHandler: { (_image, info) -> Void in
|
||||
image = _image
|
||||
}
|
||||
)
|
||||
|
||||
if cancellationToken.isCancelled {
|
||||
return completion(Self.cancelledResult)
|
||||
}
|
||||
|
||||
guard let image = image,
|
||||
let cgImage = image.cgImage else {
|
||||
Self.removeRequest(requestId: requestId)
|
||||
return completion(.failure(PigeonError(code: "", message: "Could not get pixel data for \(assetId)", details: nil)))
|
||||
}
|
||||
|
||||
let pointer = UnsafeMutableRawPointer.allocate(
|
||||
byteCount: Int(cgImage.width) * Int(cgImage.height) * 4,
|
||||
alignment: MemoryLayout<UInt8>.alignment
|
||||
)
|
||||
|
||||
if cancellationToken.isCancelled {
|
||||
pointer.deallocate()
|
||||
return completion(Self.cancelledResult)
|
||||
}
|
||||
|
||||
guard let context = CGContext(
|
||||
data: pointer,
|
||||
width: cgImage.width,
|
||||
height: cgImage.height,
|
||||
bitsPerComponent: 8,
|
||||
bytesPerRow: cgImage.width * 4,
|
||||
space: Self.rgbColorSpace,
|
||||
bitmapInfo: Self.bitmapInfo
|
||||
) else {
|
||||
pointer.deallocate()
|
||||
Self.removeRequest(requestId: requestId)
|
||||
return completion(.failure(PigeonError(code: "", message: "Could not create context for \(assetId)", details: nil)))
|
||||
}
|
||||
|
||||
if cancellationToken.isCancelled {
|
||||
pointer.deallocate()
|
||||
return completion(Self.cancelledResult)
|
||||
}
|
||||
|
||||
context.interpolationQuality = .none
|
||||
context.draw(cgImage, in: CGRect(x: 0, y: 0, width: cgImage.width, height: cgImage.height))
|
||||
|
||||
if cancellationToken.isCancelled {
|
||||
pointer.deallocate()
|
||||
return completion(Self.cancelledResult)
|
||||
}
|
||||
|
||||
self.waitForActiveState()
|
||||
completion(.success(["pointer": Int64(Int(bitPattern: pointer)), "width": Int64(cgImage.width), "height": Int64(cgImage.height)]))
|
||||
Self.removeRequest(requestId: requestId)
|
||||
return completion(.failure(PigeonError(code: "", message: "Could not get pixel data for \(assetId)", details: nil)))
|
||||
}
|
||||
|
||||
let pointer = UnsafeMutableRawPointer.allocate(
|
||||
byteCount: Int(cgImage.width) * Int(cgImage.height) * 4,
|
||||
alignment: MemoryLayout<UInt8>.alignment
|
||||
)
|
||||
|
||||
if request.isCancelled {
|
||||
pointer.deallocate()
|
||||
return completion(Self.cancelledResult)
|
||||
}
|
||||
|
||||
guard let context = CGContext(
|
||||
data: pointer,
|
||||
width: cgImage.width,
|
||||
height: cgImage.height,
|
||||
bitsPerComponent: 8,
|
||||
bytesPerRow: cgImage.width * 4,
|
||||
space: Self.rgbColorSpace,
|
||||
bitmapInfo: Self.bitmapInfo
|
||||
) else {
|
||||
pointer.deallocate()
|
||||
Self.removeRequest(requestId: requestId)
|
||||
return completion(.failure(PigeonError(code: "", message: "Could not create context for \(assetId)", details: nil)))
|
||||
}
|
||||
|
||||
if request.isCancelled {
|
||||
pointer.deallocate()
|
||||
return completion(Self.cancelledResult)
|
||||
}
|
||||
|
||||
context.interpolationQuality = .none
|
||||
context.draw(cgImage, in: CGRect(x: 0, y: 0, width: cgImage.width, height: cgImage.height))
|
||||
|
||||
if request.isCancelled {
|
||||
pointer.deallocate()
|
||||
return completion(Self.cancelledResult)
|
||||
}
|
||||
|
||||
self.waitForActiveState()
|
||||
completion(.success(["pointer": Int64(Int(bitPattern: pointer)), "width": Int64(cgImage.width), "height": Int64(cgImage.height)]))
|
||||
Self.removeRequest(requestId: requestId)
|
||||
}
|
||||
|
||||
request.workItem = item
|
||||
Self.addRequest(requestId: requestId, request: request)
|
||||
Self.processingQueue.async(execute: item)
|
||||
thumbnailRequest.workItem = item
|
||||
Self.processingQueue.async(execute: item)
|
||||
})
|
||||
|
||||
Self.addRequest(requestId: requestId, request: thumbnailRequest)
|
||||
}
|
||||
|
||||
|
||||
func cancelImageRequest(requestId: Int64) {
|
||||
Self.cancelRequest(requestId: requestId)
|
||||
}
|
||||
|
||||
private static func addRequest(requestId: Int64, request: Request) -> Void {
|
||||
|
||||
private static func addRequest(requestId: Int64, request: ThumbnailRequest) -> Void {
|
||||
requestQueue.sync { requests[requestId] = request }
|
||||
}
|
||||
|
||||
|
||||
private static func removeRequest(requestId: Int64) -> Void {
|
||||
requestQueue.sync { requests[requestId] = nil }
|
||||
}
|
||||
|
||||
|
||||
private static func cancelRequest(requestId: Int64) -> Void {
|
||||
requestQueue.async {
|
||||
guard let request = requests.removeValue(forKey: requestId) else { return }
|
||||
@@ -187,29 +226,75 @@ class ThumbnailApiImpl: ThumbnailApi {
|
||||
guard let item = request.workItem else { return }
|
||||
item.cancel()
|
||||
if item.isCancelled {
|
||||
cancelQueue.async { request.callback(Self.cancelledResult) }
|
||||
cancelQueue.async { request.completion(Self.cancelledResult) }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static func requestAsset(request: Request, assetId: String) -> PHAsset? {
|
||||
var asset: PHAsset?
|
||||
assetQueue.sync { asset = assetCache.object(forKey: assetId as NSString) }
|
||||
if asset != nil { return asset }
|
||||
|
||||
Self.assetConcurrencySemaphore.wait()
|
||||
defer { Self.assetConcurrencySemaphore.signal() }
|
||||
|
||||
if request.isCancelled {
|
||||
return nil
|
||||
|
||||
private static func requestAsset(request: AssetRequest) {
|
||||
assetQueue.async {
|
||||
if (request.isCancelled) {
|
||||
request.completion(nil)
|
||||
}
|
||||
|
||||
if let cachedAsset = assetCache.object(forKey: request.assetId as NSString) {
|
||||
request.completion(cachedAsset)
|
||||
return
|
||||
}
|
||||
|
||||
batchLock.lock()
|
||||
if (request.isCancelled) {
|
||||
batchLock.unlock()
|
||||
request.completion(nil)
|
||||
return
|
||||
}
|
||||
|
||||
assetRequests.append(request)
|
||||
|
||||
batchTimer?.cancel()
|
||||
let timer = DispatchWorkItem(block: processBatch)
|
||||
batchTimer = timer
|
||||
batchLock.unlock()
|
||||
|
||||
batchQueue.asyncAfter(deadline: .now() + batchTimeout, execute: timer)
|
||||
}
|
||||
|
||||
guard let asset = PHAsset.fetchAssets(withLocalIdentifiers: [assetId], options: Self.fetchOptions).firstObject
|
||||
else { return nil }
|
||||
assetQueue.async { assetCache.setObject(asset, forKey: assetId as NSString) }
|
||||
return asset
|
||||
}
|
||||
|
||||
|
||||
private static func processBatch() {
|
||||
batchLock.lock()
|
||||
var completionMap = [String: [(PHAsset?) -> Void]]()
|
||||
var activeAssetIds = [String]()
|
||||
completionMap.reserveCapacity(assetRequests.count)
|
||||
activeAssetIds.reserveCapacity(assetRequests.count)
|
||||
for request in assetRequests {
|
||||
if (request.isCancelled) {
|
||||
request.completion(nil)
|
||||
continue
|
||||
}
|
||||
|
||||
if var completions = completionMap[request.assetId] {
|
||||
completions.append(request.completion)
|
||||
} else {
|
||||
activeAssetIds.append(request.assetId)
|
||||
completionMap[request.assetId] = [request.completion]
|
||||
}
|
||||
}
|
||||
assetRequests.removeAll(keepingCapacity: true)
|
||||
batchTimer = nil
|
||||
batchLock.unlock()
|
||||
|
||||
guard !requests.isEmpty else { return }
|
||||
|
||||
let assets = PHAsset.fetchAssets(withLocalIdentifiers: activeAssetIds, options: Self.fetchOptions)
|
||||
assets.enumerateObjects { asset, _, _ in
|
||||
let assetId = asset.localIdentifier
|
||||
for completion in completionMap[assetId]! {
|
||||
completion(asset)
|
||||
}
|
||||
assetQueue.async { assetCache.setObject(asset, forKey: assetId as NSString) }
|
||||
}
|
||||
}
|
||||
|
||||
func waitForActiveState() {
|
||||
Self.activitySemaphore.wait()
|
||||
Self.activitySemaphore.signal()
|
||||
|
||||
Reference in New Issue
Block a user