diff --git a/stream-chat-android-client/src/main/java/io/getstream/chat/android/client/ChatClient.kt b/stream-chat-android-client/src/main/java/io/getstream/chat/android/client/ChatClient.kt index 0428b52be2b..0bbbd86c1a9 100644 --- a/stream-chat-android-client/src/main/java/io/getstream/chat/android/client/ChatClient.kt +++ b/stream-chat-android-client/src/main/java/io/getstream/chat/android/client/ChatClient.kt @@ -101,6 +101,7 @@ import io.getstream.chat.android.client.extensions.ATTACHMENT_TYPE_FILE import io.getstream.chat.android.client.extensions.ATTACHMENT_TYPE_IMAGE import io.getstream.chat.android.client.extensions.cidToTypeAndId import io.getstream.chat.android.client.extensions.extractBaseUrl +import io.getstream.chat.android.client.extensions.getCreatedAtOrNull import io.getstream.chat.android.client.extensions.internal.isLaterThanDays import io.getstream.chat.android.client.header.VersionPrefixHeader import io.getstream.chat.android.client.helpers.AppSettingManager @@ -157,6 +158,7 @@ import io.getstream.chat.android.client.user.storage.SharedPreferencesCredential import io.getstream.chat.android.client.user.storage.UserCredentialStorage import io.getstream.chat.android.client.utils.ProgressCallback import io.getstream.chat.android.client.utils.TokenUtils +import io.getstream.chat.android.client.utils.internal.ServerClockOffset import io.getstream.chat.android.client.utils.mergePartially import io.getstream.chat.android.client.utils.message.ensureId import io.getstream.chat.android.client.utils.observable.ChatEventsObservable @@ -286,6 +288,7 @@ internal constructor( @InternalStreamChatApi public val audioPlayer: AudioPlayer, private val now: () -> Date = ::Date, + private val serverClockOffset: ServerClockOffset, private val repository: ChatClientRepository, private val messageReceiptReporter: MessageReceiptReporter, internal val messageReceiptManager: MessageReceiptManager, @@ -2588,16 +2591,34 @@ internal constructor( /** * Ensure the message has a [Message.createdLocallyAt] timestamp. - * If not, set it to the max of the channel's [Channel.lastMessageAt] + 1 millisecond and [now]. - * This ensures that the message appears in the correct order in the channel. + * If not, set it to the max of the channel's [Channel.lastMessageAt] + 1 millisecond and the + * estimated server time. Using estimated server time (instead of raw local clock) prevents + * cross-user ordering issues when the device clock is skewed. */ private suspend fun Message.ensureCreatedLocallyAt(cid: String): Message { - val lastMessageAt = repositoryFacade.selectChannel(cid = cid)?.lastMessageAt - val lastMessageAtPlusOneMillisecond = lastMessageAt?.let { - Date(it.time + 1) + val parentId = this.parentId + if (parentId != null) { + // Thread reply + val lastMessage = repositoryFacade.selectMessagesForThread(parentId, limit = 1).lastOrNull() + val lastMessageAt = lastMessage?.getCreatedAtOrNull() + val lastMessageAtPlusOneMillisecond = lastMessageAt?.let { + Date(it.time + 1) + } + val createdLocallyAt = max(lastMessageAtPlusOneMillisecond, serverClockOffset.estimatedServerTime()) + return copy(createdLocallyAt = this.createdLocallyAt ?: createdLocallyAt) + } else { + // Regular message + val (type, id) = cid.cidToTypeAndId() + // Fetch channel lastMessageAt from state, fallback to offline storage + val channelState = logicRegistry?.channelStateLogic(type, id)?.listenForChannelState() + val lastMessageAt = channelState?.channelData?.value?.lastMessageAt + ?: repositoryFacade.selectChannel(cid = cid)?.lastMessageAt + val lastMessageAtPlusOneMillisecond = lastMessageAt?.let { + Date(it.time + 1) + } + val createdLocallyAt = max(lastMessageAtPlusOneMillisecond, serverClockOffset.estimatedServerTime()) + return copy(createdLocallyAt = this.createdLocallyAt ?: createdLocallyAt) } - val createdLocallyAt = max(lastMessageAtPlusOneMillisecond, now()) - return copy(createdLocallyAt = this.createdLocallyAt ?: createdLocallyAt) } /** @@ -5037,6 +5058,8 @@ internal constructor( warmUpReflection() } + val serverClockOffset = ServerClockOffset() + val module = ChatModule( appContext = appContext, @@ -5055,6 +5078,7 @@ internal constructor( lifecycle = lifecycle, appName = this.appName, appVersion = this.appVersion, + serverClockOffset = serverClockOffset, ) val api = module.api() @@ -5091,6 +5115,7 @@ internal constructor( retryPolicy = retryPolicy, appSettingsManager = appSettingsManager, chatSocket = module.chatSocket, + serverClockOffset = serverClockOffset, pluginFactories = pluginFactories, repositoryFactoryProvider = repositoryFactoryProvider ?: pluginFactories diff --git a/stream-chat-android-client/src/main/java/io/getstream/chat/android/client/di/ChatModule.kt b/stream-chat-android-client/src/main/java/io/getstream/chat/android/client/di/ChatModule.kt index 0989553cca8..7575471ff5a 100644 --- a/stream-chat-android-client/src/main/java/io/getstream/chat/android/client/di/ChatModule.kt +++ b/stream-chat-android-client/src/main/java/io/getstream/chat/android/client/di/ChatModule.kt @@ -83,6 +83,7 @@ import io.getstream.chat.android.client.uploader.FileUploader import io.getstream.chat.android.client.uploader.StreamFileUploader import io.getstream.chat.android.client.user.CurrentUserFetcher import io.getstream.chat.android.client.utils.HeadersUtil +import io.getstream.chat.android.client.utils.internal.ServerClockOffset import io.getstream.chat.android.models.UserId import io.getstream.log.StreamLog import okhttp3.Interceptor @@ -116,6 +117,7 @@ import java.util.concurrent.TimeUnit * @param lifecycle Host [Lifecycle] used to observe app foreground/background and manage socket behavior. * @param appName Optional app name added to default headers for tracking. * @param appVersion Optional app version added to default headers for tracking. + * @param serverClockOffset Shared clock-offset tracker used by the socket layer for time synchronisation. */ @Suppress("TooManyFunctions") internal class ChatModule @@ -137,6 +139,7 @@ constructor( private val lifecycle: Lifecycle, private val appName: String?, private val appVersion: String?, + private val serverClockOffset: ServerClockOffset, ) { private val headersUtil = HeadersUtil(appContext, appName, appVersion) @@ -311,6 +314,7 @@ constructor( lifecycleObserver, networkStateProvider, clientDebugger, + serverClockOffset, ) private fun buildApi(chatConfig: ChatClientConfig): ChatApi = ProxyChatApi( diff --git a/stream-chat-android-client/src/main/java/io/getstream/chat/android/client/socket/ChatSocket.kt b/stream-chat-android-client/src/main/java/io/getstream/chat/android/client/socket/ChatSocket.kt index 71d80074d63..2cf9c4b7603 100644 --- a/stream-chat-android-client/src/main/java/io/getstream/chat/android/client/socket/ChatSocket.kt +++ b/stream-chat-android-client/src/main/java/io/getstream/chat/android/client/socket/ChatSocket.kt @@ -30,6 +30,7 @@ import io.getstream.chat.android.client.network.NetworkStateProvider import io.getstream.chat.android.client.scope.UserScope import io.getstream.chat.android.client.socket.ChatSocketStateService.State import io.getstream.chat.android.client.token.TokenManager +import io.getstream.chat.android.client.utils.internal.ServerClockOffset import io.getstream.chat.android.core.internal.coroutines.DispatcherProvider import io.getstream.chat.android.models.User import io.getstream.log.taggedLogger @@ -52,6 +53,7 @@ internal open class ChatSocket( private val lifecycleObserver: StreamLifecycleObserver, private val networkStateProvider: NetworkStateProvider, private val clientDebugger: ChatClientDebugger? = null, + private val serverClockOffset: ServerClockOffset, ) { private var streamWebSocket: StreamWebSocket? = null private val logger by taggedLogger(TAG) @@ -61,7 +63,13 @@ internal open class ChatSocket( private var socketStateObserverJob: Job? = null private val healthMonitor = HealthMonitor( userScope = userScope, - checkCallback = { (chatSocketStateService.currentState as? State.Connected)?.event?.let(::sendEvent) }, + checkCallback = { + (chatSocketStateService.currentState as? State.Connected)?.event?.let { + if (sendEvent(it)) { + serverClockOffset.onHealthCheckSent() + } + } + }, reconnectCallback = { chatSocketStateService.onWebSocketEventLost() }, ) private val lifecycleHandler = object : LifecycleHandler { @@ -84,6 +92,7 @@ internal open class ChatSocket( socketListenerJob?.cancel() when (networkStateProvider.isConnected()) { true -> { + serverClockOffset.onConnectionStarted() streamWebSocket = socketFactory.createSocket(connectionConf).apply { socketListenerJob = listen().onEach { when (it) { @@ -194,8 +203,14 @@ internal open class ChatSocket( private suspend fun handleEvent(chatEvent: ChatEvent) { when (chatEvent) { - is ConnectedEvent -> chatSocketStateService.onConnectionEstablished(chatEvent) - is HealthEvent -> healthMonitor.ack() + is ConnectedEvent -> { + serverClockOffset.onConnected(chatEvent.createdAt) + chatSocketStateService.onConnectionEstablished(chatEvent) + } + is HealthEvent -> { + serverClockOffset.onHealthCheck(chatEvent.createdAt) + healthMonitor.ack() + } else -> callListeners { listener -> listener.onEvent(chatEvent) } } } diff --git a/stream-chat-android-client/src/main/java/io/getstream/chat/android/client/utils/internal/ServerClockOffset.kt b/stream-chat-android-client/src/main/java/io/getstream/chat/android/client/utils/internal/ServerClockOffset.kt new file mode 100644 index 00000000000..7646c514447 --- /dev/null +++ b/stream-chat-android-client/src/main/java/io/getstream/chat/android/client/utils/internal/ServerClockOffset.kt @@ -0,0 +1,152 @@ +/* + * Copyright (c) 2014-2026 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-chat-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.chat.android.client.utils.internal + +import io.getstream.chat.android.client.events.ConnectedEvent +import io.getstream.chat.android.client.events.HealthEvent +import java.util.Date + +/** + * Tracks the offset between the local device clock and the server clock using + * NTP-style estimation from WebSocket health check round-trips. + * + * The algorithm keeps only the sample with the lowest observed RTT, since a + * smaller round-trip means less room for network asymmetry to distort the + * measurement. Under the assumption that clock skew is constant for the + * duration of a session, the estimate monotonically improves over time. + * + * Thread-safe: single-field writes use [Volatile] for visibility; compound + * read-modify-write sequences are guarded by [lock] for atomicity. + * + * @param localTimeMs Clock source for the local device time (injectable for tests). + * @param maxRttMs Upper bound on plausible RTT. Samples exceeding this are + * discarded as stale or mismatched. Defaults to the health check cycle + * interval (MONITOR_INTERVAL + HEALTH_CHECK_INTERVAL = 11 000 ms). + */ +internal class ServerClockOffset( + private val localTimeMs: () -> Long = { System.currentTimeMillis() }, + private val maxRttMs: Long = DEFAULT_MAX_RTT_MS, +) { + + private val lock = Any() + + @Volatile + private var offsetMs: Long = 0L + + @Volatile + private var bestRttMs: Long = Long.MAX_VALUE + + @Volatile + private var healthCheckSentAtMs: Long = 0L + + @Volatile + private var connectionStartedAtMs: Long = 0L + + /** + * Record the local time immediately before starting a WebSocket connection. + * When the next [ConnectedEvent] arrives, [onConnected] will pair with this + * timestamp to compute the offset using the NTP midpoint formula. + */ + internal fun onConnectionStarted() { + connectionStartedAtMs = localTimeMs() + } + + /** + * Record the local time immediately before sending a health check echo. + * The next [onHealthCheck] call will pair with this timestamp to compute RTT. + */ + internal fun onHealthCheckSent() { + healthCheckSentAtMs = localTimeMs() + } + + /** + * Calibration from a [ConnectedEvent]. + * + * If [onConnectionStarted] was called before this connection (e.g. right before + * opening the WebSocket), uses the NTP midpoint of (connectionStartedAt, receivedAt) + * and serverTime for a more accurate offset. Otherwise falls back to a naive + * `localTime - serverTime` estimate. + * + * Resets health check state, since a new connection means any in-flight health + * check from the previous connection is stale. + */ + internal fun onConnected(serverTime: Date) { + synchronized(lock) { + bestRttMs = Long.MAX_VALUE + healthCheckSentAtMs = 0L + + val receivedAtMs = localTimeMs() + val startedAtMs = connectionStartedAtMs + connectionStartedAtMs = 0L + + if (startedAtMs > 0L) { + val rtt = receivedAtMs - startedAtMs + if (rtt in 1..maxRttMs) { + offsetMs = (startedAtMs + receivedAtMs) / 2 - serverTime.time + bestRttMs = rtt + return + } + } + offsetMs = receivedAtMs - serverTime.time + } + } + + /** + * Refine the offset using a [HealthEvent] paired with [onHealthCheckSent]. + * + * Computes RTT from the stored send time and the current receive time, + * then applies the NTP midpoint formula: + * ``` + * offset = (sentAt + receivedAt) / 2 - serverTime + * ``` + * + * The sample is accepted only if: + * - There is a pending [onHealthCheckSent] timestamp. + * - RTT is positive (guards against clock anomalies). + * - RTT is below [maxRttMs] (rejects stale / mismatched pairs). + * - RTT is lower than any previous sample (min-RTT selection). + */ + internal fun onHealthCheck(serverTime: Date) { + synchronized(lock) { + val sentAtMs = healthCheckSentAtMs + if (sentAtMs <= 0L) return + healthCheckSentAtMs = 0L + + val receivedAtMs = localTimeMs() + val rtt = receivedAtMs - sentAtMs + if (rtt !in 1..maxRttMs) return + + if (rtt < bestRttMs) { + bestRttMs = rtt + offsetMs = (sentAtMs + receivedAtMs) / 2 - serverTime.time + } + } + } + + /** + * Returns the current time adjusted to the server timescale. + * + * Before the first [onConnected] call, this returns the raw local time + * (offset = 0). + */ + internal fun estimatedServerTime(): Date = + Date(localTimeMs() - offsetMs) + + internal companion object { + internal const val DEFAULT_MAX_RTT_MS = 11_000L + } +} diff --git a/stream-chat-android-client/src/test/java/io/getstream/chat/android/client/ChatClientConnectionTests.kt b/stream-chat-android-client/src/test/java/io/getstream/chat/android/client/ChatClientConnectionTests.kt index 48ad3744d4e..cdfc670a13a 100644 --- a/stream-chat-android-client/src/test/java/io/getstream/chat/android/client/ChatClientConnectionTests.kt +++ b/stream-chat-android-client/src/test/java/io/getstream/chat/android/client/ChatClientConnectionTests.kt @@ -34,6 +34,7 @@ import io.getstream.chat.android.client.token.FakeTokenManager import io.getstream.chat.android.client.user.CredentialConfig import io.getstream.chat.android.client.user.storage.UserCredentialStorage import io.getstream.chat.android.client.utils.TokenUtils +import io.getstream.chat.android.client.utils.internal.ServerClockOffset import io.getstream.chat.android.models.ConnectionData import io.getstream.chat.android.models.EventType import io.getstream.chat.android.models.GuestUser @@ -126,6 +127,7 @@ internal class ChatClientConnectionTests { retryPolicy = mock(), appSettingsManager = mock(), chatSocket = fakeChatSocket, + serverClockOffset = ServerClockOffset(), pluginFactories = emptyList(), repositoryFactoryProvider = NoOpRepositoryFactory.Provider, mutableClientState = mutableClientState, diff --git a/stream-chat-android-client/src/test/java/io/getstream/chat/android/client/ChatClientTest.kt b/stream-chat-android-client/src/test/java/io/getstream/chat/android/client/ChatClientTest.kt index 30519e2cc1b..e10c223f927 100644 --- a/stream-chat-android-client/src/test/java/io/getstream/chat/android/client/ChatClientTest.kt +++ b/stream-chat-android-client/src/test/java/io/getstream/chat/android/client/ChatClientTest.kt @@ -37,6 +37,7 @@ import io.getstream.chat.android.client.scope.UserTestScope import io.getstream.chat.android.client.socket.FakeChatSocket import io.getstream.chat.android.client.token.FakeTokenManager import io.getstream.chat.android.client.utils.TokenUtils +import io.getstream.chat.android.client.utils.internal.ServerClockOffset import io.getstream.chat.android.client.utils.retry.NoRetryPolicy import io.getstream.chat.android.models.ConnectionState import io.getstream.chat.android.models.EventType @@ -138,6 +139,7 @@ internal class ChatClientTest { retryPolicy = NoRetryPolicy(), appSettingsManager = mock(), chatSocket = fakeChatSocket, + serverClockOffset = ServerClockOffset(), pluginFactories = emptyList(), mutableClientState = Mother.mockedClientState(), repositoryFactoryProvider = NoOpRepositoryFactory.Provider, diff --git a/stream-chat-android-client/src/test/java/io/getstream/chat/android/client/DependencyResolverTest.kt b/stream-chat-android-client/src/test/java/io/getstream/chat/android/client/DependencyResolverTest.kt index c449bf03179..bfcba093e78 100644 --- a/stream-chat-android-client/src/test/java/io/getstream/chat/android/client/DependencyResolverTest.kt +++ b/stream-chat-android-client/src/test/java/io/getstream/chat/android/client/DependencyResolverTest.kt @@ -23,6 +23,7 @@ import io.getstream.chat.android.client.plugin.factory.PluginFactory import io.getstream.chat.android.client.scope.ClientTestScope import io.getstream.chat.android.client.scope.UserTestScope import io.getstream.chat.android.client.setup.state.internal.MutableClientState +import io.getstream.chat.android.client.utils.internal.ServerClockOffset import io.getstream.chat.android.core.internal.InternalStreamChatApi import io.getstream.chat.android.models.InitializationState import io.getstream.chat.android.models.NoOpMessageTransformer @@ -174,6 +175,7 @@ public class DependencyResolverTest { retryPolicy = mock(), appSettingsManager = mock(), chatSocket = mock(), + serverClockOffset = ServerClockOffset(), pluginFactories = pluginFactories, repositoryFactoryProvider = mock(), mutableClientState = mutableClientState, diff --git a/stream-chat-android-client/src/test/java/io/getstream/chat/android/client/MockClientBuilder.kt b/stream-chat-android-client/src/test/java/io/getstream/chat/android/client/MockClientBuilder.kt index eef087a4eb3..ef4b6ace915 100644 --- a/stream-chat-android-client/src/test/java/io/getstream/chat/android/client/MockClientBuilder.kt +++ b/stream-chat-android-client/src/test/java/io/getstream/chat/android/client/MockClientBuilder.kt @@ -33,6 +33,7 @@ import io.getstream.chat.android.client.setup.state.internal.MutableClientState import io.getstream.chat.android.client.token.FakeTokenManager import io.getstream.chat.android.client.uploader.FileUploader import io.getstream.chat.android.client.utils.TokenUtils +import io.getstream.chat.android.client.utils.internal.ServerClockOffset import io.getstream.chat.android.client.utils.retry.NoRetryPolicy import io.getstream.chat.android.models.EventType import io.getstream.chat.android.models.NoOpMessageTransformer @@ -121,6 +122,7 @@ internal class MockClientBuilder( retryPolicy = NoRetryPolicy(), appSettingsManager = mock(), chatSocket = mock(), + serverClockOffset = ServerClockOffset(), pluginFactories = emptyList(), repositoryFactoryProvider = NoOpRepositoryFactory.Provider, mutableClientState = mutableClientState, diff --git a/stream-chat-android-client/src/test/java/io/getstream/chat/android/client/chatclient/BaseChatClientTest.kt b/stream-chat-android-client/src/test/java/io/getstream/chat/android/client/chatclient/BaseChatClientTest.kt index 2b1174f1ab3..fa6ecda8edc 100644 --- a/stream-chat-android-client/src/test/java/io/getstream/chat/android/client/chatclient/BaseChatClientTest.kt +++ b/stream-chat-android-client/src/test/java/io/getstream/chat/android/client/chatclient/BaseChatClientTest.kt @@ -37,6 +37,7 @@ import io.getstream.chat.android.client.socket.FakeChatSocket import io.getstream.chat.android.client.token.TokenManager import io.getstream.chat.android.client.user.CurrentUserFetcher import io.getstream.chat.android.client.utils.TokenUtils +import io.getstream.chat.android.client.utils.internal.ServerClockOffset import io.getstream.chat.android.client.utils.retry.NoRetryPolicy import io.getstream.chat.android.models.NoOpMessageTransformer import io.getstream.chat.android.models.NoOpUserTransformer @@ -124,6 +125,7 @@ internal open class BaseChatClientTest { retryPolicy = NoRetryPolicy(), appSettingsManager = mock(), chatSocket = getChatSocket(), + serverClockOffset = ServerClockOffset(), pluginFactories = pluginFactories, repositoryFactoryProvider = NoOpRepositoryFactory.Provider, mutableClientState = mutableClientState, diff --git a/stream-chat-android-client/src/test/java/io/getstream/chat/android/client/debugger/ChatClientDebuggerTest.kt b/stream-chat-android-client/src/test/java/io/getstream/chat/android/client/debugger/ChatClientDebuggerTest.kt index 3570ac5fdbd..415b0a57b83 100644 --- a/stream-chat-android-client/src/test/java/io/getstream/chat/android/client/debugger/ChatClientDebuggerTest.kt +++ b/stream-chat-android-client/src/test/java/io/getstream/chat/android/client/debugger/ChatClientDebuggerTest.kt @@ -36,6 +36,7 @@ import io.getstream.chat.android.client.scope.UserTestScope import io.getstream.chat.android.client.socket.FakeChatSocket import io.getstream.chat.android.client.token.FakeTokenManager import io.getstream.chat.android.client.utils.TokenUtils +import io.getstream.chat.android.client.utils.internal.ServerClockOffset import io.getstream.chat.android.client.utils.retry.NoRetryPolicy import io.getstream.chat.android.models.Message import io.getstream.chat.android.models.NoOpMessageTransformer @@ -142,6 +143,7 @@ internal class ChatClientDebuggerTest { retryPolicy = NoRetryPolicy(), appSettingsManager = mock(), chatSocket = fakeChatSocket, + serverClockOffset = ServerClockOffset(), pluginFactories = pluginFactories, mutableClientState = Mother.mockedClientState(), repositoryFactoryProvider = NoOpRepositoryFactory.Provider, diff --git a/stream-chat-android-client/src/test/java/io/getstream/chat/android/client/socket/FakeChatSocket.kt b/stream-chat-android-client/src/test/java/io/getstream/chat/android/client/socket/FakeChatSocket.kt index e6178e185c3..83ebcb1f7e2 100644 --- a/stream-chat-android-client/src/test/java/io/getstream/chat/android/client/socket/FakeChatSocket.kt +++ b/stream-chat-android-client/src/test/java/io/getstream/chat/android/client/socket/FakeChatSocket.kt @@ -25,6 +25,7 @@ import io.getstream.chat.android.client.parser2.adapters.internal.StreamDateForm import io.getstream.chat.android.client.scope.UserScope import io.getstream.chat.android.client.token.FakeTokenManager import io.getstream.chat.android.client.token.TokenManager +import io.getstream.chat.android.client.utils.internal.ServerClockOffset import io.getstream.chat.android.models.EventType import io.getstream.chat.android.models.User import io.getstream.chat.android.randomString @@ -46,6 +47,7 @@ internal class FakeChatSocket private constructor( userScope: UserScope, lifecycleObserver: StreamLifecycleObserver, networkStateProvider: NetworkStateProvider, + serverClockOffset: ServerClockOffset, getWebSocketListener: () -> WebSocketListener, ) : ChatSocket( apiKey, @@ -55,6 +57,7 @@ internal class FakeChatSocket private constructor( userScope, lifecycleObserver, networkStateProvider, + serverClockOffset = serverClockOffset, ) { private val streamDateFormatter = StreamDateFormatter() private val webSocketListener: WebSocketListener by lazy { getWebSocketListener() } @@ -89,6 +92,7 @@ internal class FakeChatSocket private constructor( wssUrl: String = randomString(), tokenManager: TokenManager = FakeTokenManager(randomString()), networkStateProvider: NetworkStateProvider = mock(), + serverClockOffset: ServerClockOffset = ServerClockOffset(), ): FakeChatSocket { var webSocketListener: WebSocketListener? = null val parser: ChatParser = mock() @@ -107,6 +111,7 @@ internal class FakeChatSocket private constructor( userScope, lifecycleObserver, networkStateProvider, + serverClockOffset, ) { webSocketListener!! } } } diff --git a/stream-chat-android-client/src/test/java/io/getstream/chat/android/client/utils/internal/ServerClockOffsetTest.kt b/stream-chat-android-client/src/test/java/io/getstream/chat/android/client/utils/internal/ServerClockOffsetTest.kt new file mode 100644 index 00000000000..bc705f2b08e --- /dev/null +++ b/stream-chat-android-client/src/test/java/io/getstream/chat/android/client/utils/internal/ServerClockOffsetTest.kt @@ -0,0 +1,344 @@ +/* + * Copyright (c) 2014-2026 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-chat-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.chat.android.client.utils.internal + +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import java.util.Date + +internal class ServerClockOffsetTest { + + // ── estimatedServerTime before any calibration ────────────────────── + + @Test + fun `estimatedServerTime equals local time before any calibration`() { + val sut = ServerClockOffset(localTimeMs = { 1_000_000L }) + + assertEquals(Date(1_000_000L), sut.estimatedServerTime()) + } + + // ── onConnected (naive one-way estimate) ──────────────────────────── + + @Test + fun `onConnected calibrates when local clock is ahead`() { + val sut = ServerClockOffset(localTimeMs = { 10_000L }) + + sut.onConnected(serverTime = Date(7_000L)) + + assertEquals(Date(7_000L), sut.estimatedServerTime()) + } + + @Test + fun `onConnected calibrates when local clock is behind`() { + val sut = ServerClockOffset(localTimeMs = { 5_000L }) + + sut.onConnected(serverTime = Date(8_000L)) + + assertEquals(Date(8_000L), sut.estimatedServerTime()) + } + + @Test + fun `onConnected resets health check state from previous connection`() { + var localTime = 10_000L + val sut = ServerClockOffset(localTimeMs = { localTime }) + + sut.onHealthCheckSent() + + localTime = 10_200L + sut.onConnected(serverTime = Date(10_100L)) + + localTime = 10_400L + sut.onHealthCheck(serverTime = Date(10_300L)) + assertEquals(Date(10_100L + (10_400L - 10_200L)), sut.estimatedServerTime()) + } + + // ── onConnectionStarted + onConnected (NTP for initial connection) ─── + + @Test + fun `onConnected uses NTP midpoint when onConnectionStarted was called`() { + val skew = 3_000L + var localTime = 10_000L + val sut = ServerClockOffset(localTimeMs = { localTime }) + + sut.onConnectionStarted() + + localTime = 10_200L + val serverTimeAtMidpoint = (10_000L + 10_200L) / 2 - skew + sut.onConnected(serverTime = Date(serverTimeAtMidpoint)) + + // offset = (10_000 + 10_200) / 2 - serverTimeAtMidpoint = 3_000 + localTime = 15_000L + assertEquals(Date(15_000L - skew), sut.estimatedServerTime()) + } + + @Test + fun `onConnected falls back to naive when onConnectionStarted was not called`() { + val localTime = 10_000L + val sut = ServerClockOffset(localTimeMs = { localTime }) + + sut.onConnected(serverTime = Date(7_000L)) + + assertEquals(Date(7_000L), sut.estimatedServerTime()) + } + + @Test + fun `onConnected rejects connection pair when RTT exceeds maxRttMs and uses naive`() { + var localTime = 0L + val sut = ServerClockOffset(localTimeMs = { localTime }, maxRttMs = 100L) + + sut.onConnectionStarted() + + localTime = 500L + sut.onConnected(serverTime = Date(250L)) + + // RTT = 500 > maxRttMs = 100 → rejected, naive used: offset = 500 - 250 = 250 + assertEquals(Date(500L - 250L), sut.estimatedServerTime()) + } + + @Test + fun `onConnectionStarted is consumed so second onConnected uses naive`() { + var localTime = 0L + val sut = ServerClockOffset(localTimeMs = { localTime }) + + sut.onConnectionStarted() + localTime = 100L + sut.onConnected(serverTime = Date(50L)) + + localTime = 1_000L + sut.onConnected(serverTime = Date(999L)) + + // No connectionStartedAtMs (consumed), so naive: offset = 1000 - 999 = 1 + assertEquals(Date(1_000L - 1L), sut.estimatedServerTime()) + } + + // ── onHealthCheck (NTP midpoint with min-RTT selection) ───────────── + + @Test + fun `onHealthCheck computes NTP midpoint offset`() { + val skew = 3_000L + var localTime = 10_000L + val sut = ServerClockOffset(localTimeMs = { localTime }) + sut.onConnected(serverTime = Date(localTime - skew)) + + localTime = 20_000L + sut.onHealthCheckSent() + + localTime = 20_200L + sut.onHealthCheck(serverTime = Date(17_100L)) + + // offset = (20_000 + 20_200) / 2 - 17_100 = 3_000 + // estimatedServerTime = 20_200 - 3_000 = 17_200 + assertEquals(Date(17_200L), sut.estimatedServerTime()) + } + + @Test + fun `onHealthCheck keeps lowest RTT sample`() { + var localTime = 0L + val sut = ServerClockOffset(localTimeMs = { localTime }) + sut.onConnected(serverTime = Date(0L)) + + // First health check: RTT = 500 + localTime = 1_000L + sut.onHealthCheckSent() + localTime = 1_500L + sut.onHealthCheck(serverTime = Date(1_250L)) + val offsetAfterFirst = (1_000L + 1_500L) / 2 - 1_250L + + // Second health check: RTT = 100 (better) + localTime = 2_000L + sut.onHealthCheckSent() + localTime = 2_100L + sut.onHealthCheck(serverTime = Date(2_050L)) + val offsetAfterSecond = (2_000L + 2_100L) / 2 - 2_050L + + localTime = 5_000L + assertEquals(Date(5_000L - offsetAfterSecond), sut.estimatedServerTime()) + } + + @Test + fun `onHealthCheck ignores higher RTT sample`() { + var localTime = 0L + val sut = ServerClockOffset(localTimeMs = { localTime }) + sut.onConnected(serverTime = Date(0L)) + + // First health check: RTT = 100 (best) + localTime = 1_000L + sut.onHealthCheckSent() + localTime = 1_100L + sut.onHealthCheck(serverTime = Date(1_050L)) + val bestOffset = (1_000L + 1_100L) / 2 - 1_050L + + // Second health check: RTT = 500 (worse -- ignored) + localTime = 2_000L + sut.onHealthCheckSent() + localTime = 2_500L + sut.onHealthCheck(serverTime = Date(2_250L)) + + localTime = 5_000L + assertEquals(Date(5_000L - bestOffset), sut.estimatedServerTime()) + } + + @Test + fun `onHealthCheck overrides naive onConnected estimate`() { + val skew = 3_000L + var localTime = 10_000L + val sut = ServerClockOffset(localTimeMs = { localTime }) + sut.onConnected(serverTime = Date(localTime - skew)) + + // Naive estimate at localTime = 10_000: offset = 3_000 + assertEquals(Date(7_000L), sut.estimatedServerTime()) + + localTime = 20_000L + sut.onHealthCheckSent() + localTime = 20_200L + sut.onHealthCheck(serverTime = Date(17_100L)) + + // NTP offset = (20_000 + 20_200) / 2 - 17_100 = 3_000 + // At localTime = 20_200: estimated = 20_200 - 3_000 = 17_200 + assertEquals(Date(17_200L), sut.estimatedServerTime()) + } + + // ── Guards: mismatched / stale / implausible pairs ────────────────── + + @Test + fun `onHealthCheck is no-op without prior onHealthCheckSent`() { + var localTime = 10_000L + val sut = ServerClockOffset(localTimeMs = { localTime }) + sut.onConnected(serverTime = Date(7_000L)) + + localTime = 20_000L + sut.onHealthCheck(serverTime = Date(17_000L)) + + // Offset unchanged from onConnected: 10_000 - 7_000 = 3_000 + assertEquals(Date(20_000L - 3_000L), sut.estimatedServerTime()) + } + + @Test + fun `onHealthCheck consumes sentAt so second call is no-op`() { + var localTime = 0L + val sut = ServerClockOffset(localTimeMs = { localTime }) + sut.onConnected(serverTime = Date(0L)) + + localTime = 1_000L + sut.onHealthCheckSent() + + localTime = 1_100L + sut.onHealthCheck(serverTime = Date(1_050L)) + val offsetAfterFirst = (1_000L + 1_100L) / 2 - 1_050L + + localTime = 50_000L + sut.onHealthCheck(serverTime = Date(99_999L)) + + // Offset unchanged -- second call was a no-op (sentAtMs consumed) + assertEquals(Date(50_000L - offsetAfterFirst), sut.estimatedServerTime()) + } + + @Test + fun `onHealthCheck rejects RTT exceeding maxRttMs`() { + var localTime = 0L + val sut = ServerClockOffset(localTimeMs = { localTime }, maxRttMs = 500L) + sut.onConnected(serverTime = Date(0L)) + + localTime = 1_000L + sut.onHealthCheckSent() + localTime = 2_000L + sut.onHealthCheck(serverTime = Date(1_500L)) + + // RTT = 1_000 > maxRttMs = 500 → rejected, offset unchanged (= 0) + assertEquals(Date(2_000L), sut.estimatedServerTime()) + } + + @Test + fun `onHealthCheck rejects non-positive RTT`() { + val localTime = 1_000L + val sut = ServerClockOffset(localTimeMs = { localTime }) + sut.onConnected(serverTime = Date(1_000L)) + + sut.onHealthCheckSent() + // localTime hasn't advanced → RTT = 0 → rejected + sut.onHealthCheck(serverTime = Date(1_000L)) + + assertEquals(Date(1_000L), sut.estimatedServerTime()) + } + + // ── Reconnect resets ──────────────────────────────────────────────── + + @Test + fun `onConnected resets bestRtt so health checks re-converge`() { + var localTime = 0L + val sut = ServerClockOffset(localTimeMs = { localTime }) + sut.onConnected(serverTime = Date(0L)) + + // Excellent RTT on first connection + localTime = 1_000L + sut.onHealthCheckSent() + localTime = 1_050L + sut.onHealthCheck(serverTime = Date(1_025L)) + + // Reconnect resets bestRtt + localTime = 50_000L + sut.onConnected(serverTime = Date(50_000L)) + + // Worse RTT on new connection should still be accepted + localTime = 51_000L + sut.onHealthCheckSent() + localTime = 51_200L + sut.onHealthCheck(serverTime = Date(51_100L)) + + val expectedOffset = (51_000L + 51_200L) / 2 - 51_100L + localTime = 60_000L + assertEquals(Date(60_000L - expectedOffset), sut.estimatedServerTime()) + } + + // ── Clock directions with health check ────────────────────────────── + + @Test + fun `clock 1 hour ahead is corrected by health check`() { + val skew = 3_600_000L + var localTime = 36_000_000L + val sut = ServerClockOffset(localTimeMs = { localTime }) + sut.onConnected(serverTime = Date(localTime - skew)) + + localTime = 36_010_000L + sut.onHealthCheckSent() + localTime = 36_010_200L + val serverTimeAtMidpoint = (36_010_000L + 36_010_200L) / 2 - skew + sut.onHealthCheck(serverTime = Date(serverTimeAtMidpoint)) + + localTime = 36_020_000L + val expected = 36_020_000L - skew + assertEquals(Date(expected), sut.estimatedServerTime()) + } + + @Test + fun `clock 1 hour behind is corrected by health check`() { + val skew = -3_600_000L + var localTime = 28_800_000L + val sut = ServerClockOffset(localTimeMs = { localTime }) + sut.onConnected(serverTime = Date(localTime - skew)) + + localTime = 28_810_000L + sut.onHealthCheckSent() + localTime = 28_810_200L + val serverTimeAtMidpoint = (28_810_000L + 28_810_200L) / 2 - skew + sut.onHealthCheck(serverTime = Date(serverTimeAtMidpoint)) + + localTime = 28_820_000L + val expected = 28_820_000L - skew + assertEquals(Date(expected), sut.estimatedServerTime()) + } +}