Skip to content

Commit f7ff8e9

Browse files
committed
fix
1 parent 8e8335a commit f7ff8e9

1 file changed

Lines changed: 5 additions & 9 deletions

File tree

backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import org.apache.spark.sql.execution.unsafe.UnsafeColumnarBuildSideRelation
2828

2929
import com.github.benmanes.caffeine.cache.{Cache, Caffeine, RemovalCause, RemovalListener}
3030

31-
import java.util.concurrent.ConcurrentHashMap
3231
import java.util.concurrent.TimeUnit
3332

3433
case class BroadcastHashTable(pointer: Long, relation: BuildSideRelation)
@@ -48,9 +47,6 @@ object VeloxBroadcastBuildSideCache
4847
VeloxBackendSettings.GLUTEN_VELOX_BROADCAST_CACHE_EXPIRED_TIME_DEFAULT
4948
)
5049

51-
// Track released pointers to prevent double free
52-
private val releasedPointers = ConcurrentHashMap.newKeySet[Long]()
53-
5450
// Use for controlling to build bhj hash table once.
5551
// key: hashtable id, value is hashtable backend pointer(long to string).
5652
private val buildSideRelationCache: Cache[String, BroadcastHashTable] =
@@ -98,9 +94,9 @@ object VeloxBroadcastBuildSideCache
9894
def cleanAll(): Unit = buildSideRelationCache.invalidateAll()
9995

10096
override def onRemoval(key: String, value: BroadcastHashTable, cause: RemovalCause): Unit = {
101-
// Use ConcurrentHashMap.add() which returns false if already present
102-
// This ensures only one thread can successfully mark the pointer as released
103-
if (releasedPointers.add(value.pointer)) {
97+
// Synchronize on the value object to ensure only one thread can release this specific hash table
98+
// This prevents concurrent calls to clearHashTable which can corrupt memory pool state
99+
value.synchronized {
104100
logWarning(s"Remove bhj $key = ${value.pointer}")
105101
if (value.relation != null) {
106102
value.relation match {
@@ -111,9 +107,9 @@ object VeloxBroadcastBuildSideCache
111107
}
112108
}
113109

110+
// clearHashTable must be called under synchronization to prevent
111+
// concurrent access to the same hash table's memory pool
114112
HashJoinBuilder.clearHashTable(value.pointer)
115-
} else {
116-
logWarning(s"Skip already released bhj $key = ${value.pointer}")
117113
}
118114
}
119115
}

0 commit comments

Comments
 (0)