Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import io.getstream.chat.android.client.channel.ChannelMessagesUpdateLogic
import io.getstream.chat.android.client.errors.isPermanent
import io.getstream.chat.android.client.events.ChatEvent
import io.getstream.chat.android.client.extensions.cidToTypeAndId
import io.getstream.chat.android.client.extensions.getCreatedAtOrDefault
import io.getstream.chat.android.client.extensions.getCreatedAtOrNull
import io.getstream.chat.android.client.extensions.internal.NEVER
import io.getstream.chat.android.client.internal.state.model.querychannels.pagination.internal.QueryChannelPaginationRequest
import io.getstream.chat.android.client.internal.state.model.querychannels.pagination.internal.toAnyChannelPaginationRequest
import io.getstream.chat.android.client.internal.state.plugin.state.channel.internal.ChannelStateImpl
Expand Down Expand Up @@ -91,8 +93,8 @@ internal class ChannelLogicImpl(
updateDataForChannel(
channel = channel,
messageLimit = query.messagesLimit(),
shouldRefreshMessages = true,
// Note: The following arguments are NOT used. But they are kept for backwards compatibility.
shouldRefreshMessages = query.shouldRefresh,
scrollUpdate = false,
isNotificationUpdate = query.isNotificationUpdate,
isChannelsStateUpdate = true,
Expand Down Expand Up @@ -302,13 +304,39 @@ internal class ChannelLogicImpl(
state.setChannelConfig(channel.config)
// Set pending messages
state.setPendingMessages(channel.pendingMessages.map(PendingMessage::message))
// Reset messages (ensure they are sorted - when coming from DB)
// Update messages based on the relationship between the incoming page and existing state.
if (messageLimit > 0) {
val sortedMessages = withContext(Dispatchers.Default) {
channel.messages.sortedBy { it.getCreatedAtOrNull() }
}
state.setMessages(sortedMessages)
state.setEndOfOlderMessages(channel.messages.size < messageLimit)
val currentMessages = state.messages.value
when {
shouldRefreshMessages || currentMessages.isEmpty() -> {
// Initial load (DB seed or first fetch) or explicit refresh — full replace
state.setMessages(sortedMessages)
state.setEndOfOlderMessages(channel.messages.size < messageLimit)
}
state.insideSearch.value -> {
// User's window was already trimmed away from the latest (insideSearch set by
// trimNewestMessages, or a prior jump-to-message). Stay at current position;
// refresh the "jump to latest" cache with the server's current latest page.
state.upsertCachedLatestMessages(sortedMessages)
}
hasGap(currentMessages, sortedMessages) -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for me to understand: this check for a gap between current and incoming, in that order, right?. What if there's a gap the other way around, i.e. all incoming are older than all current, is that a valid case?

Copy link
Contributor Author

@VelikovPetar VelikovPetar Mar 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes this is intentional: This method should never be called with messages other than the latest (known) messages. It is called in the following scenarios:

  1. After the initial channel load (QueryChannels) - latest messages
  2. When pre-populating with data from DB - latest (known) messages
  3. Re-watch after reconnecting (again QueryChannels)
  4. When a new channel is added to the channel list via an event (ex. via notification.message_new) - again, we should have the latest messages in the response.

The other scenario should not be possible (when this method is invoked)

// Incoming page is newer than the current window with no overlap. Inserting the
// incoming messages would create a fragmented list. Instead, treat the user's
// position as a mid-page: store the incoming as the "latest" cache and signal the UI.
state.upsertCachedLatestMessages(sortedMessages)
state.setInsideSearch(true)
state.setEndOfNewerMessages(false)
}
else -> {
// Incoming messages are contiguous with (or overlap) the current window.
// Upsert preserves the user's scroll position while adding/updating messages.
state.upsertMessages(sortedMessages)
state.setEndOfOlderMessages(channel.messages.size < messageLimit)
}
}
}
// Add pinned messages
state.addPinnedMessages(channel.pinnedMessages)
Expand Down Expand Up @@ -428,4 +456,14 @@ internal class ChannelLogicImpl(
// Enrich the channel with messages
return channel.copy(messages = messages)
}

private fun hasGap(currentMessages: List<Message>, incomingMessages: List<Message>): Boolean {
val currentNewest = currentMessages.maxByOrNull { it.getCreatedAtOrDefault(NEVER) }
val incomingOldest = incomingMessages.firstOrNull()
return currentMessages.isNotEmpty() &&
currentNewest != null &&
incomingOldest != null &&
currentMessages.none { it.id == incomingOldest.id } &&
incomingOldest.getCreatedAtOrDefault(NEVER).after(currentNewest.getCreatedAtOrDefault(NEVER))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1411,6 +1411,26 @@ internal class ChannelStateImpl(
_cachedLatestMessages.value = emptyList()
}

/**
* Merges [messages] into the cached latest messages, replacing any existing entry
* with the same id and capping the list at [CACHED_LATEST_MESSAGES_LIMIT].
*
* Called during reconnection to refresh the "jump to latest" cache with the server's
* current latest page without disturbing the user's active scroll position.
*/
fun upsertCachedLatestMessages(messages: List<Message>) {
if (messages.isEmpty()) return
val messagesToUpsert = messages.filterNot { shouldIgnoreUpsertion(it) }
if (messagesToUpsert.isEmpty()) return
_cachedLatestMessages.update { current ->
current.mergeSorted(
other = messagesToUpsert,
idSelector = Message::id,
comparator = MESSAGE_COMPARATOR,
).takeLast(CACHED_LATEST_MESSAGES_LIMIT)
}
}

// endregion

// region Destroy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1384,6 +1384,97 @@ internal class ChannelLogicImplTest {
// Then
verify(stateImpl).setPendingMessages(listOf(pendingMsg))
}

@Test
fun `should upsert messages when state has messages and incoming are contiguous`() = runTest {
val existingMsg = randomMessage(id = "existing", createdAt = Date(1000L), createdLocallyAt = null)
whenever(stateImpl.messages).thenReturn(MutableStateFlow(listOf(existingMsg)))
val incomingMsg = randomMessage(id = "new", createdAt = Date(500L), createdLocallyAt = null)
val channel = randomChannel(
id = "123",
type = "messaging",
messages = listOf(incomingMsg),
members = emptyList(),
watchers = emptyList(),
read = emptyList(),
memberCount = 0,
watcherCount = 0,
)
sut.updateDataForChannel(channel = channel, messageLimit = 30)
verify(stateImpl).upsertMessages(listOf(incomingMsg))
verify(stateImpl, never()).setMessages(any())
verify(stateImpl, never()).upsertCachedLatestMessages(any())
verify(stateImpl, never()).setEndOfNewerMessages(any())
}

@Test
fun `should cache incoming and signal newer messages when gap is detected`() = runTest {
val existingMsg = randomMessage(id = "old", createdAt = Date(1000L), createdLocallyAt = null)
whenever(stateImpl.messages).thenReturn(MutableStateFlow(listOf(existingMsg)))
val incomingMsg = randomMessage(id = "new", createdAt = Date(5000L), createdLocallyAt = null)
val channel = randomChannel(
id = "123",
type = "messaging",
messages = listOf(incomingMsg),
members = emptyList(),
watchers = emptyList(),
read = emptyList(),
memberCount = 0,
watcherCount = 0,
)
sut.updateDataForChannel(channel = channel, messageLimit = 30)
verify(stateImpl).upsertCachedLatestMessages(listOf(incomingMsg))
verify(stateImpl).setInsideSearch(true)
verify(stateImpl).setEndOfNewerMessages(false)
verify(stateImpl, never()).setMessages(any())
verify(stateImpl, never()).upsertMessages(any())
verify(stateImpl, never()).setEndOfOlderMessages(any())
}

@Test
fun `should refresh cached latest messages when already inside search`() = runTest {
val existingMsg = randomMessage(id = "mid", createdAt = Date(1000L), createdLocallyAt = null)
whenever(stateImpl.messages).thenReturn(MutableStateFlow(listOf(existingMsg)))
whenever(stateImpl.insideSearch).thenReturn(MutableStateFlow(true))
val incomingMsg = randomMessage(id = "latest", createdAt = Date(5000L), createdLocallyAt = null)
val channel = randomChannel(
id = "123",
type = "messaging",
messages = listOf(incomingMsg),
members = emptyList(),
watchers = emptyList(),
read = emptyList(),
memberCount = 0,
watcherCount = 0,
)
sut.updateDataForChannel(channel = channel, messageLimit = 30)
verify(stateImpl).upsertCachedLatestMessages(listOf(incomingMsg))
verify(stateImpl, never()).setMessages(any())
verify(stateImpl, never()).upsertMessages(any())
verify(stateImpl, never()).setInsideSearch(any())
verify(stateImpl, never()).setEndOfNewerMessages(any())
}

@Test
fun `should replace messages when shouldRefreshMessages is true regardless of existing state`() = runTest {
val existingMsg = randomMessage(id = "old", createdAt = Date(1000L), createdLocallyAt = null)
whenever(stateImpl.messages).thenReturn(MutableStateFlow(listOf(existingMsg)))
val incomingMsg = randomMessage(id = "new", createdAt = Date(5000L), createdLocallyAt = null)
val channel = randomChannel(
id = "123",
type = "messaging",
messages = listOf(incomingMsg),
members = emptyList(),
watchers = emptyList(),
read = emptyList(),
memberCount = 0,
watcherCount = 0,
)
sut.updateDataForChannel(channel = channel, messageLimit = 30, shouldRefreshMessages = true)
verify(stateImpl).setMessages(listOf(incomingMsg))
verify(stateImpl, never()).upsertMessages(any())
verify(stateImpl, never()).upsertCachedLatestMessages(any())
}
}

// endregion
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,56 @@ internal class ChannelStateImplMessagesTest {
}
}

@Nested
inner class UpsertCachedLatestMessages {

@Test
fun `upsertCachedLatestMessages with empty list should not change cache`() = runTest {
// given
val messages = createMessages(3)
channelState.setMessages(messages)
channelState.cacheLatestMessages()
channelState.setMessages(emptyList())
val before = channelState.toChannel().cachedLatestMessages
// when
channelState.upsertCachedLatestMessages(emptyList())
// then
assertEquals(before, channelState.toChannel().cachedLatestMessages)
}

@Test
fun `upsertCachedLatestMessages with all filtered messages should not change cache`() = runTest {
// given
val regularMsg = createMessage(1, timestamp = 1000)
channelState.setMessages(listOf(regularMsg))
channelState.cacheLatestMessages()
channelState.setMessages(emptyList())
val before = channelState.toChannel().cachedLatestMessages
// when — thread reply not shown in channel is always filtered out
val threadReply = createMessage(2, parentId = "parent1", showInChannel = false)
channelState.upsertCachedLatestMessages(listOf(threadReply))
// then
assertEquals(before, channelState.toChannel().cachedLatestMessages)
}

@Test
fun `upsertCachedLatestMessages should merge incoming messages into the cache`() = runTest {
// given
val msg1 = createMessage(1, timestamp = 1000)
val msg5 = createMessage(5, timestamp = 5000)
channelState.setMessages(listOf(msg1, msg5))
channelState.cacheLatestMessages()
channelState.setMessages(emptyList())
// when
val msg3 = createMessage(3, timestamp = 3000)
channelState.upsertCachedLatestMessages(listOf(msg3))
// then
val cachedMessages = channelState.toChannel().cachedLatestMessages
assertEquals(3, cachedMessages.size)
assertEquals(listOf("message_1", "message_3", "message_5"), cachedMessages.map { it.id })
}
}

@Nested
inner class GetMessageById {

Expand Down
Loading