From 1ec3f16fb164c53c7400d99782c37a023aa20c9e Mon Sep 17 00:00:00 2001 From: hyochan Date: Tue, 19 May 2026 22:04:31 +0900 Subject: [PATCH] fix(google): guard billing callbacks against double resume Add a shared continuation resume guard for BillingClient callback bridges and apply it across Play and Horizon purchase/query flows. Guard duplicate product-detail callbacks that can trigger repeated launch side effects, and add a race regression test for duplicate callbacks. --- .../java/dev/hyo/openiap/OpenIapModule.kt | 77 ++++-- .../java/dev/hyo/openiap/helpers/Helpers.kt | 10 +- .../dev/hyo/openiap/helpers/ProductManager.kt | 17 +- .../dev/hyo/openiap/helpers/CommonHelpers.kt | 19 +- .../helpers/ContinuationResumeGuard.kt | 44 ++++ .../java/dev/hyo/openiap/OpenIapModule.kt | 113 ++++---- .../java/dev/hyo/openiap/helpers/Helpers.kt | 11 +- .../dev/hyo/openiap/helpers/ProductManager.kt | 21 +- .../dev/hyo/openiap/QueryPurchasesRaceTest.kt | 244 ++++++++++++++++++ 9 files changed, 446 insertions(+), 110 deletions(-) create mode 100644 packages/google/openiap/src/main/java/dev/hyo/openiap/helpers/ContinuationResumeGuard.kt create mode 100644 packages/google/openiap/src/testPlay/java/dev/hyo/openiap/QueryPurchasesRaceTest.kt diff --git a/packages/google/openiap/src/horizon/java/dev/hyo/openiap/OpenIapModule.kt b/packages/google/openiap/src/horizon/java/dev/hyo/openiap/OpenIapModule.kt index 8cea7ea4..8b67acea 100644 --- a/packages/google/openiap/src/horizon/java/dev/hyo/openiap/OpenIapModule.kt +++ b/packages/google/openiap/src/horizon/java/dev/hyo/openiap/OpenIapModule.kt @@ -28,6 +28,7 @@ import dev.hyo.openiap.helpers.restorePurchasesHorizon import dev.hyo.openiap.helpers.queryPurchasesHorizon import dev.hyo.openiap.helpers.ProductManager import dev.hyo.openiap.helpers.queryProductDetailsHorizon +import dev.hyo.openiap.helpers.resumeGuard import dev.hyo.openiap.utils.HorizonBillingConverters.toActiveSubscription import dev.hyo.openiap.utils.HorizonBillingConverters.toInAppProduct import dev.hyo.openiap.utils.HorizonBillingConverters.toPurchase @@ -48,9 +49,8 @@ import kotlinx.coroutines.launch import kotlinx.coroutines.suspendCancellableCoroutine import kotlinx.coroutines.withContext import java.lang.ref.WeakReference +import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicReference -import kotlin.coroutines.resume -import kotlin.coroutines.resumeWithException private const val TAG = "OpenIapModule" @@ -129,6 +129,7 @@ class OpenIapModule( override val initConnection: MutationInitConnectionHandler = { withContext(Dispatchers.IO) { suspendCancellableCoroutine { continuation -> + val resumer = continuation.resumeGuard() OpenIapLog.i("=== INIT CONNECTION ===", TAG) // CRITICAL FIX: Rebuild BillingClient if it was destroyed by endConnection @@ -141,7 +142,7 @@ class OpenIapModule( val client = billingClient ?: run { OpenIapLog.w("Failed to build BillingClient", TAG) - if (continuation.isActive) continuation.resume(false) + resumer.resume(false) return@suspendCancellableCoroutine } @@ -153,7 +154,7 @@ class OpenIapModule( } else { OpenIapLog.i("Horizon billing connected successfully", TAG) } - if (continuation.isActive) continuation.resume(ok) + resumer.resume(ok) } override fun onBillingServiceDisconnected() { @@ -379,15 +380,23 @@ class OpenIapModule( } suspendCancellableCoroutine> { continuation -> + var callbackRef: ((Result>) -> Unit)? = null + val resumer = continuation.resumeGuard { + callbackRef?.let { currentPurchaseCallback.compareAndSet(it, null) } + } val callback: (Result>) -> Unit = { result -> - if (continuation.isActive) continuation.resume(result.getOrDefault(emptyList())) + resumer.resume(result.getOrDefault(emptyList())) } + callbackRef = callback if (!currentPurchaseCallback.compareAndSet(null, callback)) { OpenIapLog.w("requestPurchase rejected: another purchase is already in progress", TAG) - if (continuation.isActive) continuation.resumeWithException(OpenIapError.DeveloperError()) + resumer.resumeWithException(OpenIapError.DeveloperError()) + return@suspendCancellableCoroutine + } + if (!continuation.isActive) { + currentPurchaseCallback.compareAndSet(callback, null) return@suspendCancellableCoroutine } - continuation.invokeOnCancellation { currentPurchaseCallback.compareAndSet(callback, null) } val desiredType = if (androidArgs.type == ProductQueryType.Subs) BillingClient.ProductType.SUBS else BillingClient.ProductType.INAPP @@ -570,7 +579,10 @@ class OpenIapModule( .setProductList(productList) .build() + val didHandleProductDetails = AtomicBoolean(false) client.queryProductDetailsAsync(params) { billingResult, productDetailsList -> + if (!didHandleProductDetails.compareAndSet(false, true)) return@queryProductDetailsAsync + if (billingResult.responseCode != BillingClient.BillingResponseCode.OK) { val err = OpenIapError.QueryProduct.withDiagnostics( responseCode = billingResult.responseCode, @@ -618,21 +630,23 @@ class OpenIapModule( if (isConsumable == true) { val params = ConsumeParams.newBuilder().setPurchaseToken(token).build() suspendCancellableCoroutine { continuation -> + val resumer = continuation.resumeGuard() client.consumeAsync(params) { result, _ -> if (result.responseCode != BillingClient.BillingResponseCode.OK) { OpenIapLog.w("Failed to consume Horizon purchase: ${result.debugMessage}", TAG) } - if (continuation.isActive) continuation.resume(Unit) + resumer.resume(Unit) } } } else { val params = AcknowledgePurchaseParams.newBuilder().setPurchaseToken(token).build() suspendCancellableCoroutine { continuation -> + val resumer = continuation.resumeGuard() client.acknowledgePurchase(params) { result -> if (result.responseCode != BillingClient.BillingResponseCode.OK) { OpenIapLog.w("Failed to acknowledge Horizon purchase: ${result.debugMessage}", TAG) } - if (continuation.isActive) continuation.resume(Unit) + resumer.resume(Unit) } } } @@ -644,12 +658,13 @@ class OpenIapModule( val client = billingClient ?: throw OpenIapError.NotPrepared val params = AcknowledgePurchaseParams.newBuilder().setPurchaseToken(purchaseToken).build() suspendCancellableCoroutine { continuation -> + val resumer = continuation.resumeGuard() client.acknowledgePurchase(params) { result -> val success = result.responseCode == BillingClient.BillingResponseCode.OK if (!success) { OpenIapLog.w("Horizon acknowledge failed: ${result.debugMessage}", TAG) } - if (continuation.isActive) continuation.resume(success) + resumer.resume(success) } } } @@ -660,12 +675,13 @@ class OpenIapModule( val client = billingClient ?: throw OpenIapError.NotPrepared val params = ConsumeParams.newBuilder().setPurchaseToken(purchaseToken).build() suspendCancellableCoroutine { continuation -> + val resumer = continuation.resumeGuard() client.consumeAsync(params) { result, _ -> val success = result.responseCode == BillingClient.BillingResponseCode.OK if (!success) { OpenIapLog.w("Horizon consume failed: ${result.debugMessage}", TAG) } - if (continuation.isActive) continuation.resume(success) + resumer.resume(success) } } } @@ -800,20 +816,21 @@ class OpenIapModule( suspend fun getStorefront(): String = withContext(Dispatchers.IO) { val client = billingClient ?: return@withContext "" suspendCancellableCoroutine { continuation -> + val resumer = continuation.resumeGuard() runCatching { client.getBillingConfigAsync( GetBillingConfigParams.newBuilder().build() ) { result, config -> - if (continuation.isActive) { - val code = if (result.responseCode == BillingClient.BillingResponseCode.OK) { - config?.countryCode.orEmpty() - } else "" - continuation.resume(code) + val code = if (result.responseCode == BillingClient.BillingResponseCode.OK) { + config?.countryCode.orEmpty() + } else { + "" } + resumer.resume(code) } }.onFailure { error -> OpenIapLog.w("Horizon getStorefront failed: ${error.message}", TAG) - if (continuation.isActive) continuation.resume("") + resumer.resume("") } } } @@ -931,6 +948,7 @@ class OpenIapModule( skus: List, productType: String ): List = suspendCancellableCoroutine { continuation -> + val resumer = continuation.resumeGuard() val products = skus.map { sku -> QueryProductDetailsParams.Product.newBuilder() .setProductId(sku) @@ -940,10 +958,10 @@ class OpenIapModule( val params = QueryProductDetailsParams.newBuilder().setProductList(products).build() client.queryProductDetailsAsync(params) { result, details -> if (result.responseCode == BillingClient.BillingResponseCode.OK) { - if (continuation.isActive) continuation.resume(details ?: emptyList()) + resumer.resume(details ?: emptyList()) } else { OpenIapLog.w("Horizon queryProductDetails failed: ${result.debugMessage}", TAG) - if (continuation.isActive) continuation.resume(emptyList()) + resumer.resume(emptyList()) } } } @@ -992,17 +1010,18 @@ class OpenIapModule( // Try to call the alternative billing method val result = suspendCancellableCoroutine { cont -> + val resumer = cont.resumeGuard() try { client.isAlternativeBillingOnlyAvailableAsync { billingResult -> - cont.resume(billingResult) + resumer.resume(billingResult) } } catch (e: NoSuchMethodError) { // Method doesn't exist in Horizon library OpenIapLog.w("Alternative Billing not supported by Horizon library", TAG) - cont.resumeWithException(Exception("Feature not supported")) + resumer.resumeWithException(Exception("Feature not supported")) } catch (e: Exception) { OpenIapLog.e("Error checking alternative billing: ${e.message}", e, TAG) - cont.resumeWithException(e) + resumer.resumeWithException(e) } } @@ -1025,9 +1044,10 @@ class OpenIapModule( val currentActivity = activityRef.get() ?: throw Exception("Activity not available") val result = suspendCancellableCoroutine { cont -> + val resumer = cont.resumeGuard() try { val listener = AlternativeBillingOnlyInformationDialogListener { billingResult -> - cont.resume(billingResult) + resumer.resume(billingResult) } currentActivity.runOnUiThread { client.showAlternativeBillingOnlyInformationDialog( @@ -1037,10 +1057,10 @@ class OpenIapModule( } } catch (e: NoSuchMethodError) { OpenIapLog.w("showAlternativeBillingOnlyInformationDialog not supported", TAG) - cont.resumeWithException(Exception("Feature not supported")) + resumer.resumeWithException(Exception("Feature not supported")) } catch (e: Exception) { OpenIapLog.e("Error showing alternative billing dialog: ${e.message}", e, TAG) - cont.resumeWithException(e) + resumer.resumeWithException(e) } } @@ -1060,16 +1080,17 @@ class OpenIapModule( val client = billingClient ?: throw Exception("Not connected") val result = suspendCancellableCoroutine> { cont -> + val resumer = cont.resumeGuard() try { client.createAlternativeBillingOnlyReportingDetailsAsync { billingResult, details -> - cont.resume(Pair(billingResult, details)) + resumer.resume(Pair(billingResult, details)) } } catch (e: NoSuchMethodError) { OpenIapLog.w("createAlternativeBillingOnlyReportingDetails not supported", TAG) - cont.resumeWithException(Exception("Feature not supported")) + resumer.resumeWithException(Exception("Feature not supported")) } catch (e: Exception) { OpenIapLog.e("Error creating alternative billing token: ${e.message}", e, TAG) - cont.resumeWithException(e) + resumer.resumeWithException(e) } } diff --git a/packages/google/openiap/src/horizon/java/dev/hyo/openiap/helpers/Helpers.kt b/packages/google/openiap/src/horizon/java/dev/hyo/openiap/helpers/Helpers.kt index dd09dfeb..6d4a1b53 100644 --- a/packages/google/openiap/src/horizon/java/dev/hyo/openiap/helpers/Helpers.kt +++ b/packages/google/openiap/src/horizon/java/dev/hyo/openiap/helpers/Helpers.kt @@ -9,7 +9,6 @@ import dev.hyo.openiap.OpenIapLog import dev.hyo.openiap.Purchase import dev.hyo.openiap.utils.HorizonBillingConverters.toPurchase import kotlinx.coroutines.suspendCancellableCoroutine -import kotlin.coroutines.resume private const val TAG = "Helpers" @@ -43,18 +42,19 @@ internal suspend fun queryPurchasesHorizon( client: BillingClient?, productType: String ): List = suspendCancellableCoroutine { continuation -> + val resumer = continuation.resumeGuard() OpenIapLog.d("queryPurchasesHorizon: type=$productType", TAG) val billingClient = client ?: run { OpenIapLog.w("queryPurchasesHorizon: BillingClient is null", TAG) - continuation.resume(emptyList()) + resumer.resume(emptyList()) return@suspendCancellableCoroutine } // CRITICAL FIX: Check if BillingClient is ready before querying if (!billingClient.isReady()) { OpenIapLog.w("queryPurchasesHorizon: BillingClient is not ready", TAG) - continuation.resume(emptyList()) + resumer.resume(emptyList()) return@suspendCancellableCoroutine } @@ -73,10 +73,10 @@ internal suspend fun queryPurchasesHorizon( it.toPurchase() } ?: emptyList() OpenIapLog.d("queryPurchasesHorizon: Returning ${mapped.size} mapped purchases", TAG) - continuation.resume(mapped) + resumer.resume(mapped) } else { OpenIapLog.w("queryPurchasesHorizon: Failed with code=${result.responseCode}", TAG) - continuation.resume(emptyList()) + resumer.resume(emptyList()) } } } diff --git a/packages/google/openiap/src/horizon/java/dev/hyo/openiap/helpers/ProductManager.kt b/packages/google/openiap/src/horizon/java/dev/hyo/openiap/helpers/ProductManager.kt index 2b868328..74ca7efe 100644 --- a/packages/google/openiap/src/horizon/java/dev/hyo/openiap/helpers/ProductManager.kt +++ b/packages/google/openiap/src/horizon/java/dev/hyo/openiap/helpers/ProductManager.kt @@ -6,7 +6,7 @@ import com.meta.horizon.billingclient.api.ProductDetails as HorizonProductDetail import dev.hyo.openiap.OpenIapLog import kotlinx.coroutines.suspendCancellableCoroutine import java.util.concurrent.ConcurrentHashMap -import kotlin.coroutines.resume +import java.util.concurrent.atomic.AtomicBoolean private const val TAG = "ProductManager" @@ -50,7 +50,7 @@ internal class ProductManager { // Check which products are missing or have incomplete data val needsQuery = mutableListOf() - val validCached = mutableListOf() + var validCachedCount = 0 productIds.distinct().forEach { productId -> val cached = cache[CacheKey(productId, productType)] @@ -69,7 +69,7 @@ internal class ProductManager { } if (isComplete) { - validCached.add(cached) + validCachedCount += 1 } else { OpenIapLog.w("Cached ProductDetails for '$productId' has incomplete data, will re-query", TAG) needsQuery.add(productId) @@ -78,7 +78,7 @@ internal class ProductManager { } } - OpenIapLog.d("getOrQuery: needsQuery=$needsQuery, validCached=${validCached.size}", TAG) + OpenIapLog.d("getOrQuery: needsQuery=$needsQuery, validCached=$validCachedCount", TAG) if (needsQuery.isEmpty()) { val cached = productIds.mapNotNull { cache[CacheKey(it, productType)] } @@ -99,8 +99,11 @@ internal class ProductManager { OpenIapLog.d("getOrQuery: Querying ${needsQuery.size} products from Horizon API", TAG) return suspendCancellableCoroutine { cont -> - cont.invokeOnCancellation { OpenIapLog.d("getOrQuery: cancelled", TAG) } + val resumer = cont.resumeGuard { OpenIapLog.d("getOrQuery: cancelled", TAG) } + val didHandleProductDetails = AtomicBoolean(false) client.queryProductDetailsAsync(params) { billingResult, result -> + if (!didHandleProductDetails.compareAndSet(false, true)) return@queryProductDetailsAsync + OpenIapLog.d( "getOrQuery: Response code=${billingResult.responseCode}, " + "message=${billingResult.debugMessage}, " + @@ -116,7 +119,7 @@ internal class ProductManager { ) // Return whatever we have in cache instead of crashing val cached = productIds.mapNotNull { cache[CacheKey(it, productType)] } - if (cont.isActive) cont.resume(cached) + resumer.resume(cached) return@queryProductDetailsAsync } @@ -150,7 +153,7 @@ internal class ProductManager { // Preserve requested order and include cached + newly-fetched val finalList = productIds.mapNotNull { cache[CacheKey(it, productType)] } OpenIapLog.d("getOrQuery: Returning ${finalList.size} total products", TAG) - if (cont.isActive) cont.resume(finalList) + resumer.resume(finalList) } } } diff --git a/packages/google/openiap/src/main/java/dev/hyo/openiap/helpers/CommonHelpers.kt b/packages/google/openiap/src/main/java/dev/hyo/openiap/helpers/CommonHelpers.kt index 34661892..56c83e45 100644 --- a/packages/google/openiap/src/main/java/dev/hyo/openiap/helpers/CommonHelpers.kt +++ b/packages/google/openiap/src/main/java/dev/hyo/openiap/helpers/CommonHelpers.kt @@ -13,7 +13,6 @@ import dev.hyo.openiap.listener.OpenIapPurchaseErrorListener import dev.hyo.openiap.listener.OpenIapPurchaseUpdateListener import dev.hyo.openiap.listener.OpenIapSubscriptionBillingIssueListener import kotlinx.coroutines.suspendCancellableCoroutine -import kotlin.coroutines.resume /** * Suspend function to wait for a purchase update via listener. @@ -23,14 +22,16 @@ internal suspend fun onPurchaseUpdated( addListener: (OpenIapPurchaseUpdateListener) -> Unit, removeListener: (OpenIapPurchaseUpdateListener) -> Unit ): Purchase = suspendCancellableCoroutine { continuation -> + lateinit var resumer: ContinuationResumeGuard val listener = object : OpenIapPurchaseUpdateListener { override fun onPurchaseUpdated(purchase: Purchase) { removeListener(this) - if (continuation.isActive) continuation.resume(purchase) + resumer.resume(purchase) } } + resumer = continuation.resumeGuard { removeListener(listener) } addListener(listener) - continuation.invokeOnCancellation { removeListener(listener) } + if (!continuation.isActive) removeListener(listener) } /** @@ -41,14 +42,16 @@ internal suspend fun onPurchaseError( addListener: (OpenIapPurchaseErrorListener) -> Unit, removeListener: (OpenIapPurchaseErrorListener) -> Unit ): PurchaseError = suspendCancellableCoroutine { continuation -> + lateinit var resumer: ContinuationResumeGuard val listener = object : OpenIapPurchaseErrorListener { override fun onPurchaseError(error: OpenIapError) { removeListener(this) - if (continuation.isActive) continuation.resume(error.toPurchaseError()) + resumer.resume(error.toPurchaseError()) } } + resumer = continuation.resumeGuard { removeListener(listener) } addListener(listener) - continuation.invokeOnCancellation { removeListener(listener) } + if (!continuation.isActive) removeListener(listener) } /** @@ -59,14 +62,16 @@ internal suspend fun onSubscriptionBillingIssue( addListener: (OpenIapSubscriptionBillingIssueListener) -> Unit, removeListener: (OpenIapSubscriptionBillingIssueListener) -> Unit ): Purchase = suspendCancellableCoroutine { continuation -> + lateinit var resumer: ContinuationResumeGuard val listener = object : OpenIapSubscriptionBillingIssueListener { override fun onSubscriptionBillingIssue(purchase: Purchase) { removeListener(this) - if (continuation.isActive) continuation.resume(purchase) + resumer.resume(purchase) } } + resumer = continuation.resumeGuard { removeListener(listener) } addListener(listener) - continuation.invokeOnCancellation { removeListener(listener) } + if (!continuation.isActive) removeListener(listener) } /** diff --git a/packages/google/openiap/src/main/java/dev/hyo/openiap/helpers/ContinuationResumeGuard.kt b/packages/google/openiap/src/main/java/dev/hyo/openiap/helpers/ContinuationResumeGuard.kt new file mode 100644 index 00000000..498ddac9 --- /dev/null +++ b/packages/google/openiap/src/main/java/dev/hyo/openiap/helpers/ContinuationResumeGuard.kt @@ -0,0 +1,44 @@ +package dev.hyo.openiap.helpers + +import kotlinx.coroutines.CancellableContinuation +import java.util.concurrent.atomic.AtomicBoolean +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException + +/** + * Atomically completes a cancellable continuation at most once. + * + * BillingClient and listener callbacks can legally arrive late, more than once, + * or from different threads. `isActive` followed by `resume` is not atomic, so + * callback bridges should use this guard instead. + */ +internal class ContinuationResumeGuard( + private val continuation: CancellableContinuation, + onCancellation: (() -> Unit)? = null +) { + private val didComplete = AtomicBoolean(false) + + init { + continuation.invokeOnCancellation { + if (didComplete.compareAndSet(false, true)) { + onCancellation?.invoke() + } + } + } + + fun resume(value: T) { + if (didComplete.compareAndSet(false, true)) { + continuation.resume(value) + } + } + + fun resumeWithException(error: Throwable) { + if (didComplete.compareAndSet(false, true)) { + continuation.resumeWithException(error) + } + } +} + +internal fun CancellableContinuation.resumeGuard( + onCancellation: (() -> Unit)? = null +): ContinuationResumeGuard = ContinuationResumeGuard(this, onCancellation) diff --git a/packages/google/openiap/src/play/java/dev/hyo/openiap/OpenIapModule.kt b/packages/google/openiap/src/play/java/dev/hyo/openiap/OpenIapModule.kt index cd059dbb..316a4339 100644 --- a/packages/google/openiap/src/play/java/dev/hyo/openiap/OpenIapModule.kt +++ b/packages/google/openiap/src/play/java/dev/hyo/openiap/OpenIapModule.kt @@ -53,6 +53,7 @@ import dev.hyo.openiap.helpers.onPurchaseUpdated import dev.hyo.openiap.helpers.onSubscriptionBillingIssue import dev.hyo.openiap.helpers.queryProductDetails import dev.hyo.openiap.helpers.queryPurchases +import dev.hyo.openiap.helpers.resumeGuard import dev.hyo.openiap.helpers.restorePurchases as restorePurchasesHelper import dev.hyo.openiap.helpers.toAndroidPurchaseArgs import dev.hyo.openiap.listener.OpenIapDeveloperProvidedBillingListener @@ -69,9 +70,8 @@ import dev.hyo.openiap.utils.verifyPurchaseWithIapkit import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.suspendCancellableCoroutine import kotlinx.coroutines.withContext -import kotlin.coroutines.resume -import kotlin.coroutines.resumeWithException import java.lang.ref.WeakReference +import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicReference // AlternativeBillingMode moved to main source set (shared between Play and Horizon) @@ -130,6 +130,12 @@ class OpenIapModule( currentPurchaseCallback.getAndSet(null)?.invoke(result) } + private fun billingResultError(message: String): BillingResult = + BillingResult.newBuilder() + .setResponseCode(BillingClient.BillingResponseCode.ERROR) + .setDebugMessage(message) + .build() + // Billing programs enabled via enableBillingProgram (8.2.0+, EXTERNAL_PAYMENTS in 8.3.0+) private val enabledBillingPrograms = mutableSetOf() @@ -163,11 +169,12 @@ class OpenIapModule( withContext(Dispatchers.IO) { suspendCancellableCoroutine { continuation -> + val resumer = continuation.resumeGuard() initBillingClient( - onSuccess = { continuation.resume(true) }, + onSuccess = { resumer.resume(true) }, onFailure = { err -> OpenIapLog.w("Billing set up failed: ${err?.message}", TAG) - continuation.resume(false) + resumer.resume(false) } ) } @@ -352,6 +359,7 @@ class OpenIapModule( ) suspendCancellableCoroutine { continuation -> + val resumer = continuation.resumeGuard() val listenerClass = Class.forName("com.android.billingclient.api.AlternativeBillingOnlyAvailabilityListener") val availabilityListener = java.lang.reflect.Proxy.newProxyInstance( listenerClass.classLoader, @@ -363,10 +371,10 @@ class OpenIapModule( if (result?.responseCode == BillingClient.BillingResponseCode.OK) { OpenIapLog.d("✓ Alternative billing is available", TAG) - if (continuation.isActive) continuation.resume(true) + resumer.resume(true) } else { OpenIapLog.e("✗ Alternative billing not available: ${result?.debugMessage}", tag = TAG) - if (continuation.isActive) continuation.resume(false) + resumer.resume(false) } } null @@ -394,6 +402,7 @@ class OpenIapModule( ) val dialogResult = suspendCancellableCoroutine { continuation -> + val resumer = continuation.resumeGuard() val listenerClass = Class.forName("com.android.billingclient.api.AlternativeBillingOnlyInformationDialogListener") val dialogListener = java.lang.reflect.Proxy.newProxyInstance( listenerClass.classLoader, @@ -402,9 +411,7 @@ class OpenIapModule( if (method.name == "onAlternativeBillingOnlyInformationDialogResponse") { val result = args?.get(0) as? BillingResult OpenIapLog.d("Dialog result: ${result?.responseCode} - ${result?.debugMessage}", TAG) - if (continuation.isActive && result != null) { - continuation.resume(result) - } + resumer.resume(result ?: billingResultError("Missing alternative billing dialog result")) } null } @@ -443,6 +450,7 @@ class OpenIapModule( ) suspendCancellableCoroutine { continuation -> + val resumer = continuation.resumeGuard() val listenerClass = Class.forName("com.android.billingclient.api.AlternativeBillingOnlyReportingDetailsListener") val tokenListener = java.lang.reflect.Proxy.newProxyInstance( listenerClass.classLoader, @@ -457,14 +465,14 @@ class OpenIapModule( val tokenMethod = details.javaClass.getMethod("getExternalTransactionToken") val token = tokenMethod.invoke(details) as? String OpenIapLog.d("✓ External transaction token created: $token", TAG) - if (continuation.isActive) continuation.resume(token) + resumer.resume(token) } catch (e: Exception) { OpenIapLog.e("Failed to extract token: ${e.message}", e, TAG) - if (continuation.isActive) continuation.resume(null) + resumer.resume(null) } } else { OpenIapLog.e("Token creation failed: ${result?.debugMessage}", tag = TAG) - if (continuation.isActive) continuation.resume(null) + resumer.resume(null) } } null @@ -496,6 +504,7 @@ class OpenIapModule( } suspendCancellableCoroutine { continuation -> + val resumer = continuation.resumeGuard() try { // Use reflection to call isBillingProgramAvailableAsync (8.2.0+) val listenerClass = Class.forName("com.android.billingclient.api.BillingProgramAvailabilityListener") @@ -508,12 +517,10 @@ class OpenIapModule( OpenIapLog.d("Billing program availability result: ${result?.responseCode} - ${result?.debugMessage}", TAG) val isAvailable = result?.responseCode == BillingClient.BillingResponseCode.OK - if (continuation.isActive) { - continuation.resume(BillingProgramAvailabilityResultAndroid( + resumer.resume(BillingProgramAvailabilityResultAndroid( billingProgram = program, isAvailable = isAvailable - )) - } + )) } null } @@ -526,20 +533,16 @@ class OpenIapModule( method.invoke(client, billingProgramConstant, listener) } catch (e: NoSuchMethodException) { OpenIapLog.e("isBillingProgramAvailableAsync not found. Requires Billing Library 8.2.0+", e, TAG) - if (continuation.isActive) { - continuation.resume(BillingProgramAvailabilityResultAndroid( + resumer.resume(BillingProgramAvailabilityResultAndroid( billingProgram = program, isAvailable = false - )) - } + )) } catch (e: Exception) { OpenIapLog.e("Failed to check billing program availability: ${e.message}", e, TAG) - if (continuation.isActive) { - continuation.resume(BillingProgramAvailabilityResultAndroid( + resumer.resume(BillingProgramAvailabilityResultAndroid( billingProgram = program, isAvailable = false - )) - } + )) } } } @@ -568,6 +571,7 @@ class OpenIapModule( } suspendCancellableCoroutine { continuation -> + val resumer = continuation.resumeGuard() try { val listenerClass = Class.forName("com.android.billingclient.api.BillingProgramReportingDetailsListener") val listener = java.lang.reflect.Proxy.newProxyInstance( @@ -585,25 +589,25 @@ class OpenIapModule( val token = tokenMethod.invoke(details) as? String OpenIapLog.d("Billing program reporting token created: $token", TAG) - if (continuation.isActive && token != null) { - continuation.resume(BillingProgramReportingDetailsAndroid( + if (token != null) { + resumer.resume(BillingProgramReportingDetailsAndroid( billingProgram = program, externalTransactionToken = token )) - } else if (continuation.isActive) { - continuation.resumeWithException( + } else { + resumer.resumeWithException( OpenIapError.PurchaseFailed("Missing external transaction token") ) } } catch (e: Exception) { OpenIapLog.e("Failed to extract token: ${e.message}", e, TAG) - if (continuation.isActive) continuation.resumeWithException( + resumer.resumeWithException( OpenIapError.PurchaseFailed(e.message ?: e.javaClass.simpleName) ) } } else { OpenIapLog.e("Reporting details creation failed: ${result?.debugMessage}", tag = TAG) - if (continuation.isActive) continuation.resumeWithException( + resumer.resumeWithException( OpenIapError.PurchaseFailed(result?.debugMessage) ) } @@ -682,6 +686,7 @@ class OpenIapModule( } suspendCancellableCoroutine { continuation -> + val resumer = continuation.resumeGuard() try { // Build LaunchExternalLinkParams using reflection val paramsClass = Class.forName("com.android.billingclient.api.LaunchExternalLinkParams") @@ -721,7 +726,7 @@ class OpenIapModule( OpenIapLog.d("External link launch result: ${result?.responseCode} - ${result?.debugMessage}", TAG) val success = result?.responseCode == BillingClient.BillingResponseCode.OK - if (continuation.isActive) continuation.resume(success) + resumer.resume(success) } null } @@ -736,10 +741,10 @@ class OpenIapModule( launchMethod.invoke(client, activity, launchParams, listener) } catch (e: NoSuchMethodException) { OpenIapLog.e("launchExternalLink not found. Requires Billing Library 8.2.0+", e, TAG) - if (continuation.isActive) continuation.resume(false) + resumer.resume(false) } catch (e: Exception) { OpenIapLog.e("Failed to launch external link: ${e.message}", e, TAG) - if (continuation.isActive) continuation.resume(false) + resumer.resume(false) } } } @@ -897,17 +902,25 @@ class OpenIapModule( } suspendCancellableCoroutine> { continuation -> + var callbackRef: ((Result>) -> Unit)? = null + val resumer = continuation.resumeGuard { + callbackRef?.let { currentPurchaseCallback.compareAndSet(it, null) } + } val callback: (Result>) -> Unit = { result -> - if (continuation.isActive) continuation.resume(result.getOrDefault(emptyList())) + resumer.resume(result.getOrDefault(emptyList())) } + callbackRef = callback if (!currentPurchaseCallback.compareAndSet(null, callback)) { OpenIapLog.w("requestPurchase rejected: another purchase is already in progress", TAG) - if (continuation.isActive) continuation.resumeWithException( + resumer.resumeWithException( OpenIapError.DeveloperError("Another purchase is already in progress") ) return@suspendCancellableCoroutine } - continuation.invokeOnCancellation { currentPurchaseCallback.compareAndSet(callback, null) } + if (!continuation.isActive) { + currentPurchaseCallback.compareAndSet(callback, null) + return@suspendCancellableCoroutine + } val desiredType = if (androidArgs.type == ProductQueryType.Subs) BillingClient.ProductType.SUBS else BillingClient.ProductType.INAPP @@ -1115,7 +1128,10 @@ class OpenIapModule( .setProductList(productList) .build() + val didHandleProductDetails = AtomicBoolean(false) client.queryProductDetailsAsync(queryParams) { billingResult: BillingResult, result: QueryProductDetailsResult -> + if (!didHandleProductDetails.compareAndSet(false, true)) return@queryProductDetailsAsync + val productDetailsList = result.productDetailsList if (billingResult.responseCode == BillingClient.BillingResponseCode.OK && !productDetailsList.isNullOrEmpty()) { productManager.putAll(productDetailsList) @@ -1164,15 +1180,17 @@ class OpenIapModule( val result = if (isConsumable == true) { val params = ConsumeParams.newBuilder().setPurchaseToken(token).build() suspendCancellableCoroutine { continuation -> + val resumer = continuation.resumeGuard() client.consumeAsync(params) { outcome, _ -> - if (continuation.isActive) continuation.resume(outcome) + resumer.resume(outcome) } } } else { val params = AcknowledgePurchaseParams.newBuilder().setPurchaseToken(token).build() suspendCancellableCoroutine { continuation -> + val resumer = continuation.resumeGuard() client.acknowledgePurchase(params) { outcome -> - if (continuation.isActive) continuation.resume(outcome) + resumer.resume(outcome) } } } @@ -1188,12 +1206,13 @@ class OpenIapModule( val client = billingClient ?: throw OpenIapError.NotPrepared val params = AcknowledgePurchaseParams.newBuilder().setPurchaseToken(purchaseToken).build() suspendCancellableCoroutine { continuation -> + val resumer = continuation.resumeGuard() client.acknowledgePurchase(params) { result -> if (result.responseCode != BillingClient.BillingResponseCode.OK) { OpenIapLog.w("Failed to acknowledge purchase: ${result.debugMessage}", TAG) - if (continuation.isActive) continuation.resume(false) - } else if (continuation.isActive) { - continuation.resume(true) + resumer.resume(false) + } else { + resumer.resume(true) } } } @@ -1205,12 +1224,13 @@ class OpenIapModule( val client = billingClient ?: throw OpenIapError.NotPrepared val params = ConsumeParams.newBuilder().setPurchaseToken(purchaseToken).build() suspendCancellableCoroutine { continuation -> + val resumer = continuation.resumeGuard() client.consumeAsync(params) { result, _ -> if (result.responseCode != BillingClient.BillingResponseCode.OK) { OpenIapLog.w("Failed to consume purchase: ${result.debugMessage}", TAG) - if (continuation.isActive) continuation.resume(false) - } else if (continuation.isActive) { - continuation.resume(true) + resumer.resume(false) + } else { + resumer.resume(true) } } } @@ -1328,6 +1348,7 @@ class OpenIapModule( suspend fun getStorefront() = withContext(Dispatchers.IO) { val client = billingClient ?: return@withContext "" suspendCancellableCoroutine { continuation -> + val resumer = continuation.resumeGuard() runCatching { client.getBillingConfigAsync( GetBillingConfigParams.newBuilder().build(), @@ -1335,12 +1356,12 @@ class OpenIapModule( val code = if (result.responseCode == BillingClient.BillingResponseCode.OK) { config?.countryCode.orEmpty() } else "" - if (continuation.isActive) continuation.resume(code) + resumer.resume(code) } ) }.onFailure { error -> OpenIapLog.w("getStorefront failed: ${error.message}", TAG) - if (continuation.isActive) continuation.resume("") + resumer.resume("") } } } diff --git a/packages/google/openiap/src/play/java/dev/hyo/openiap/helpers/Helpers.kt b/packages/google/openiap/src/play/java/dev/hyo/openiap/helpers/Helpers.kt index a1b1ee75..2adc36ef 100644 --- a/packages/google/openiap/src/play/java/dev/hyo/openiap/helpers/Helpers.kt +++ b/packages/google/openiap/src/play/java/dev/hyo/openiap/helpers/Helpers.kt @@ -7,7 +7,6 @@ import dev.hyo.openiap.OpenIapError import dev.hyo.openiap.Purchase import dev.hyo.openiap.utils.BillingConverters.toPurchase import kotlinx.coroutines.suspendCancellableCoroutine -import kotlin.coroutines.resume // Common helpers (onPurchaseUpdated, onPurchaseError, AndroidPurchaseArgs, // toAndroidPurchaseArgs, toPurchaseError) are in main/helpers/CommonHelpers.kt @@ -28,8 +27,10 @@ internal suspend fun queryPurchases( productType: String, includeSuspended: Boolean = false ): List = suspendCancellableCoroutine { continuation -> + val resumer = continuation.resumeGuard() + val billingClient = client ?: run { - continuation.resume(emptyList()) + resumer.resume(emptyList()) return@suspendCancellableCoroutine } val paramsBuilder = QueryPurchasesParams.newBuilder().setProductType(productType) @@ -50,8 +51,6 @@ internal suspend fun queryPurchases( val params = paramsBuilder.build() billingClient.queryPurchasesAsync(params) { result, purchaseList -> - if (!continuation.isActive) return@queryPurchasesAsync - if (result.responseCode == BillingClient.BillingResponseCode.OK) { val mapped = purchaseList.map { billingPurchase -> // IMPORTANT: Google Play Billing Library does not include basePlanId in the Purchase object @@ -63,9 +62,9 @@ internal suspend fun queryPurchases( // This is a known limitation of the client-side Billing Library billingPurchase.toPurchase(productType, null) } - continuation.resume(mapped) + resumer.resume(mapped) } else { - continuation.resume(emptyList()) + resumer.resume(emptyList()) } } } diff --git a/packages/google/openiap/src/play/java/dev/hyo/openiap/helpers/ProductManager.kt b/packages/google/openiap/src/play/java/dev/hyo/openiap/helpers/ProductManager.kt index 51a70cd3..62aa76fb 100644 --- a/packages/google/openiap/src/play/java/dev/hyo/openiap/helpers/ProductManager.kt +++ b/packages/google/openiap/src/play/java/dev/hyo/openiap/helpers/ProductManager.kt @@ -6,8 +6,8 @@ import com.android.billingclient.api.ProductDetails import dev.hyo.openiap.OpenIapError import dev.hyo.openiap.OpenIapLog import kotlinx.coroutines.suspendCancellableCoroutine -import kotlin.coroutines.resume -import kotlin.coroutines.resumeWithException +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicBoolean /** * Manages ProductDetails caching and queries. @@ -18,7 +18,7 @@ import kotlin.coroutines.resumeWithException * appears to have incomplete data (defensive programming). */ internal class ProductManager { - private val cache = mutableMapOf() + private val cache = ConcurrentHashMap() fun get(productId: String): ProductDetails? = cache[productId] @@ -47,7 +47,6 @@ internal class ProductManager { // Check which products are missing or have incomplete data val needsQuery = mutableListOf() - val validCached = mutableListOf() for (productId in productIds.distinct()) { val cached = cache[productId] @@ -65,9 +64,7 @@ internal class ProductManager { else -> true } - if (isComplete) { - validCached.add(cached) - } else { + if (!isComplete) { OpenIapLog.w("Cached ProductDetails for '$productId' has incomplete data, will re-query", "ProductManager") needsQuery.add(productId) cache.remove(productId) @@ -90,17 +87,19 @@ internal class ProductManager { .build() return suspendCancellableCoroutine { cont -> + val resumer = cont.resumeGuard() + val didHandleProductDetails = AtomicBoolean(false) client.queryProductDetailsAsync(params) { billingResult, result -> + if (!didHandleProductDetails.compareAndSet(false, true)) return@queryProductDetailsAsync + // Always update cache even if coroutine was cancelled if (billingResult.responseCode == BillingClient.BillingResponseCode.OK) { val list = result.productDetailsList ?: emptyList() putAll(list) } - if (!cont.isActive) return@queryProductDetailsAsync - if (billingResult.responseCode != BillingClient.BillingResponseCode.OK) { - cont.resumeWithException( + resumer.resumeWithException( OpenIapError.QueryProduct.withDiagnostics( responseCode = billingResult.responseCode, debugMessage = billingResult.debugMessage, @@ -112,7 +111,7 @@ internal class ProductManager { return@queryProductDetailsAsync } // Preserve requested order and include cached + newly-fetched - cont.resume(productIds.mapNotNull { cache[it] }) + resumer.resume(productIds.mapNotNull { cache[it] }) } } } diff --git a/packages/google/openiap/src/testPlay/java/dev/hyo/openiap/QueryPurchasesRaceTest.kt b/packages/google/openiap/src/testPlay/java/dev/hyo/openiap/QueryPurchasesRaceTest.kt new file mode 100644 index 00000000..118a0008 --- /dev/null +++ b/packages/google/openiap/src/testPlay/java/dev/hyo/openiap/QueryPurchasesRaceTest.kt @@ -0,0 +1,244 @@ +@file:Suppress("DEPRECATION", "OVERRIDE_DEPRECATION") + +package dev.hyo.openiap + +import android.app.Activity +import com.android.billingclient.api.AcknowledgePurchaseParams +import com.android.billingclient.api.AcknowledgePurchaseResponseListener +import com.android.billingclient.api.AlternativeBillingOnlyAvailabilityListener +import com.android.billingclient.api.AlternativeBillingOnlyInformationDialogListener +import com.android.billingclient.api.AlternativeBillingOnlyReportingDetailsListener +import com.android.billingclient.api.BillingClient +import com.android.billingclient.api.BillingClientStateListener +import com.android.billingclient.api.BillingConfigResponseListener +import com.android.billingclient.api.BillingFlowParams +import com.android.billingclient.api.BillingProgramAvailabilityListener +import com.android.billingclient.api.BillingProgramReportingDetailsListener +import com.android.billingclient.api.BillingProgramReportingDetailsParams +import com.android.billingclient.api.BillingResult +import com.android.billingclient.api.ConsumeParams +import com.android.billingclient.api.ConsumeResponseListener +import com.android.billingclient.api.ExternalOfferAvailabilityListener +import com.android.billingclient.api.ExternalOfferInformationDialogListener +import com.android.billingclient.api.ExternalOfferReportingDetailsListener +import com.android.billingclient.api.GetBillingConfigParams +import com.android.billingclient.api.InAppMessageParams +import com.android.billingclient.api.InAppMessageResponseListener +import com.android.billingclient.api.LaunchExternalLinkParams +import com.android.billingclient.api.LaunchExternalLinkResponseListener +import com.android.billingclient.api.ProductDetailsResponseListener +import com.android.billingclient.api.Purchase +import com.android.billingclient.api.PurchasesResponseListener +import com.android.billingclient.api.QueryProductDetailsParams +import com.android.billingclient.api.QueryProductDetailsResult +import com.android.billingclient.api.QueryPurchasesParams +import dev.hyo.openiap.helpers.ProductManager +import dev.hyo.openiap.helpers.queryPurchases +import java.util.Collections +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit +import kotlin.concurrent.thread +import kotlinx.coroutines.test.runTest +import org.junit.Assert.assertTrue +import org.junit.Test + +class QueryPurchasesRaceTest { + + @Test + fun `queryPurchases tolerates duplicate concurrent callbacks`() = runTest { + val client = DuplicateBillingClient() + + queryPurchases(client, BillingClient.ProductType.INAPP) + + assertTrue( + "queryPurchases must ignore duplicate concurrent callbacks without double-resuming: " + + client.callbackFailures.joinToString { it::class.java.simpleName }, + client.callbackFailures.isEmpty() + ) + } + + @Test + fun `ProductManager getOrQuery tolerates duplicate concurrent callbacks`() = runTest { + val client = DuplicateBillingClient() + val productManager = ProductManager() + + productManager.getOrQuery(client, listOf("product-id"), BillingClient.ProductType.INAPP) + + assertTrue( + "getOrQuery must ignore duplicate concurrent callbacks without double-resuming: " + + client.callbackFailures.joinToString { it::class.java.simpleName }, + client.callbackFailures.isEmpty() + ) + } + + private class DuplicateBillingClient : BillingClient() { + val callbackFailures = Collections.synchronizedList(mutableListOf()) + + override fun queryPurchasesAsync( + params: QueryPurchasesParams, + listener: PurchasesResponseListener + ) { + val result = BillingResult.newBuilder() + .setResponseCode(BillingResponseCode.OK) + .build() + val purchaseList = CallbackBarrierList(callbackCount = 2) + runDuplicateCallbacks { + listener.onQueryPurchasesResponse(result, purchaseList) + } + } + + override fun queryProductDetailsAsync( + params: QueryProductDetailsParams, + listener: ProductDetailsResponseListener + ) { + val billingResult = BillingResult.newBuilder() + .setResponseCode(BillingResponseCode.OK) + .build() + val productDetails = CallbackBarrierList( + callbackCount = 2 + ) + val result = QueryProductDetailsResult.create(productDetails, emptyList()) + runDuplicateCallbacks { + listener.onProductDetailsResponse(billingResult, result) + } + } + + /** + * Blocks callback threads inside purchase mapping, then releases them + * together. The old implementation entered this block after checking + * isActive, making both callbacks race into continuation.resume(). + */ + private class CallbackBarrierList( + private val callbackCount: Int + ) : AbstractList() { + private val ready = CountDownLatch(callbackCount) + + override val size: Int = 0 + + override fun get(index: Int): T { + throw IndexOutOfBoundsException(index) + } + + override fun iterator(): Iterator = object : Iterator { + override fun hasNext(): Boolean { + ready.countDown() + ready.await(250, TimeUnit.MILLISECONDS) + return false + } + + override fun next(): T { + throw NoSuchElementException() + } + } + } + + private fun runDuplicateCallbacks(callback: () -> Unit) { + val callbacksReady = CountDownLatch(2) + val startCallbacks = CountDownLatch(1) + val callbacks = List(2) { + thread(start = false) { + runCatching { + callbacksReady.countDown() + check(startCallbacks.await(5, TimeUnit.SECONDS)) { + "timed out waiting to start duplicate callbacks" + } + callback() + }.onFailure { callbackFailures += it } + } + } + + callbacks.forEach { it.start() } + check(callbacksReady.await(5, TimeUnit.SECONDS)) { + "timed out waiting for duplicate callback threads" + } + startCallbacks.countDown() + callbacks.forEach { it.join() } + } + + override fun isReady(): Boolean = true + + override fun getConnectionState(): Int = ConnectionState.CONNECTED + + override fun isFeatureSupported(feature: String): BillingResult = + unsupported() + + override fun launchBillingFlow( + activity: Activity, + params: BillingFlowParams + ): BillingResult = unsupported() + + override fun showAlternativeBillingOnlyInformationDialog( + activity: Activity, + listener: AlternativeBillingOnlyInformationDialogListener + ): BillingResult = unsupported() + + override fun showExternalOfferInformationDialog( + activity: Activity, + listener: ExternalOfferInformationDialogListener + ): BillingResult = unsupported() + + override fun showInAppMessages( + activity: Activity, + params: InAppMessageParams, + listener: InAppMessageResponseListener + ): BillingResult = unsupported() + + override fun acknowledgePurchase( + params: AcknowledgePurchaseParams, + listener: AcknowledgePurchaseResponseListener + ) = unsupportedUnit() + + override fun consumeAsync( + params: ConsumeParams, + listener: ConsumeResponseListener + ) = unsupportedUnit() + + override fun createAlternativeBillingOnlyReportingDetailsAsync( + listener: AlternativeBillingOnlyReportingDetailsListener + ) = unsupportedUnit() + + override fun createBillingProgramReportingDetailsAsync( + params: BillingProgramReportingDetailsParams, + listener: BillingProgramReportingDetailsListener + ) = unsupportedUnit() + + override fun createExternalOfferReportingDetailsAsync( + listener: ExternalOfferReportingDetailsListener + ) = unsupportedUnit() + + override fun endConnection() = Unit + + override fun getBillingConfigAsync( + params: GetBillingConfigParams, + listener: BillingConfigResponseListener + ) = unsupportedUnit() + + override fun isAlternativeBillingOnlyAvailableAsync( + listener: AlternativeBillingOnlyAvailabilityListener + ) = unsupportedUnit() + + override fun isBillingProgramAvailableAsync( + billingProgram: Int, + listener: BillingProgramAvailabilityListener + ) = unsupportedUnit() + + override fun isExternalOfferAvailableAsync( + listener: ExternalOfferAvailabilityListener + ) = unsupportedUnit() + + override fun launchExternalLink( + activity: Activity, + params: LaunchExternalLinkParams, + listener: LaunchExternalLinkResponseListener + ) = unsupportedUnit() + + override fun startConnection(listener: BillingClientStateListener) = + unsupportedUnit() + + private fun unsupported(): BillingResult = BillingResult.newBuilder() + .setResponseCode(BillingResponseCode.FEATURE_NOT_SUPPORTED) + .build() + + private fun unsupportedUnit() = Unit + } +}