mirror of
https://github.com/immich-app/immich.git
synced 2026-02-09 03:08:53 +03:00
fixed locking
This commit is contained in:
@@ -24,10 +24,8 @@ import java.nio.ByteBuffer
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import javax.net.ssl.SSLSocketFactory
|
||||
import javax.net.ssl.X509TrustManager
|
||||
import kotlin.concurrent.Volatile
|
||||
|
||||
|
||||
private const val USER_AGENT = "Immich_Android_${BuildConfig.VERSION_NAME}"
|
||||
@@ -172,8 +170,8 @@ private class CronetImageFetcher(
|
||||
) : ImageFetcher {
|
||||
private val engine: CronetEngine
|
||||
private val executor = Executors.newSingleThreadExecutor()
|
||||
val active = AtomicInteger(0)
|
||||
@Volatile
|
||||
private val stateLock = Any()
|
||||
private var activeCount = 0
|
||||
private var draining = false
|
||||
|
||||
init {
|
||||
@@ -195,10 +193,12 @@ private class CronetImageFetcher(
|
||||
onSuccess: (NativeByteBuffer) -> Unit,
|
||||
onFailure: (Exception) -> Unit,
|
||||
) {
|
||||
active.incrementAndGet()
|
||||
if (draining) {
|
||||
active.decrementAndGet()
|
||||
throw IllegalStateException("Engine is draining")
|
||||
synchronized(stateLock) {
|
||||
if (draining) {
|
||||
onFailure(IllegalStateException("Engine is draining"))
|
||||
return
|
||||
}
|
||||
activeCount++
|
||||
}
|
||||
|
||||
val callback = FetchCallback(onSuccess, onFailure, ::onComplete)
|
||||
@@ -210,23 +210,25 @@ private class CronetImageFetcher(
|
||||
}
|
||||
|
||||
private fun onComplete() {
|
||||
// Runs on executor thread
|
||||
active.decrementAndGet()
|
||||
tryShutdown()
|
||||
}
|
||||
|
||||
private fun tryShutdown() {
|
||||
// Also on executor thread—no race with onRequestDone
|
||||
if (draining && active.get() == 0) {
|
||||
val shouldShutdown = synchronized(stateLock) {
|
||||
activeCount--
|
||||
draining && activeCount == 0
|
||||
}
|
||||
if (shouldShutdown) {
|
||||
engine.shutdown()
|
||||
executor.shutdown()
|
||||
}
|
||||
}
|
||||
|
||||
override fun drain() {
|
||||
draining = true
|
||||
// Post check to executor—runs after any already-queued callbacks
|
||||
executor.execute(this::tryShutdown)
|
||||
val shouldShutdown = synchronized(stateLock) {
|
||||
draining = true
|
||||
activeCount == 0
|
||||
}
|
||||
if (shouldShutdown) {
|
||||
engine.shutdown()
|
||||
executor.shutdown()
|
||||
}
|
||||
}
|
||||
|
||||
private class FetchCallback(
|
||||
@@ -284,8 +286,9 @@ private class CronetImageFetcher(
|
||||
private class OkHttpImageFetcher private constructor(
|
||||
private val client: OkHttpClient,
|
||||
) : ImageFetcher {
|
||||
private val active = AtomicInteger(0)
|
||||
@Volatile private var draining = false
|
||||
private val stateLock = Any()
|
||||
private var activeCount = 0
|
||||
private var draining = false
|
||||
|
||||
companion object {
|
||||
fun create(
|
||||
@@ -334,7 +337,11 @@ private class OkHttpImageFetcher private constructor(
|
||||
}
|
||||
|
||||
private fun onComplete() {
|
||||
if (active.decrementAndGet() == 0 && draining) {
|
||||
val shouldClose = synchronized(stateLock) {
|
||||
activeCount--
|
||||
draining && activeCount == 0
|
||||
}
|
||||
if (shouldClose) {
|
||||
client.cache?.close()
|
||||
}
|
||||
}
|
||||
@@ -346,10 +353,12 @@ private class OkHttpImageFetcher private constructor(
|
||||
onSuccess: (NativeByteBuffer) -> Unit,
|
||||
onFailure: (Exception) -> Unit,
|
||||
) {
|
||||
active.incrementAndGet()
|
||||
if (draining) {
|
||||
active.decrementAndGet()
|
||||
throw IllegalStateException("Client is draining")
|
||||
synchronized(stateLock) {
|
||||
if (draining) {
|
||||
onFailure(IllegalStateException("Client is draining"))
|
||||
return
|
||||
}
|
||||
activeCount++
|
||||
}
|
||||
|
||||
val requestBuilder = Request.Builder().url(url)
|
||||
@@ -409,9 +418,12 @@ private class OkHttpImageFetcher private constructor(
|
||||
}
|
||||
|
||||
override fun drain() {
|
||||
draining = true
|
||||
val shouldClose = synchronized(stateLock) {
|
||||
draining = true
|
||||
activeCount == 0
|
||||
}
|
||||
client.connectionPool.evictAll()
|
||||
if (active.get() == 0) {
|
||||
if (shouldClose) {
|
||||
client.cache?.close()
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user