fix: release pooled DB connections after each operation to prevent MaxConnectionsExceeded #58
Conversation
fix: release pooled DB connections after each operation to prevent MaxConnectionsExceeded

Peewee's PooledMySQLDatabase does not automatically return connections to the pool after execute_sql(). Under high concurrency this causes pool exhaustion and MaxConnectionsExceeded errors.

- Add `manual_close()` to RetryingPooledMySQLDatabase to return connections to the pool
- Add `@release_connection` decorator to all OceanBaseRedisDb public methods
- Split set_object/set_obj/deleteIfExpired into internal (`_`-prefixed) and public versions to avoid releasing connections inside atomic() transactions
- Add `_release_connection()` to MysqlDistributedLock for doAcquire/delete_if_equal

Made-with: Cursor
Related Documentation

2 document(s) may need updating based on files changed in this PR:

**PowerRAG's Space Database Interaction and Query Optimization** (suggested changes):
- **Reusing SQLAlchemy Column objects:** Attempting to use the same `Column` instance in multiple table or column operations leads to assignment errors. Always copy `Column` objects before use in DDL.
- **Mutable object reuse:** In general, avoid passing mutable objects to multiple database operations unless their state is reset or copied.
- **Connection pool exhaustion:** Peewee's `PooledMySQLDatabase` does not automatically return connections to the pool after `execute_sql()` calls. Under high concurrency, this causes pool exhaustion and `MaxConnectionsExceeded` errors. Always ensure connections are released after database operations.
#### Best Practices
- Always use `.copy()` on SQLAlchemy `Column` objects before passing them to DDL methods.
- Review DDL-related code for inadvertent reuse of mutable objects.
- Use the `@release_connection` decorator on all public database methods to ensure connections are returned to the pool after execution.
- For operations inside `atomic()` transactions, split methods into internal (with `_` prefix) and public versions to avoid releasing connections prematurely within transactions.
- Call `manual_close()` on `RetryingPooledMySQLDatabase` instances explicitly when needed to return connections to the pool.

### Connection Pool Management

PowerRAG uses Peewee's `PooledMySQLDatabase` for database connection pooling. A critical implementation detail is that Peewee does not automatically return connections to the pool after `execute_sql()` operations. Under high concurrency, this can lead to connection pool exhaustion and `MaxConnectionsExceeded` errors.

#### The manual_close() Method

The `RetryingPooledMySQLDatabase` class includes a `manual_close()` method that explicitly returns the current thread's connection to the pool:

```python
def manual_close(self):
    """
    Close the current thread's connection and return it to the pool.

    In peewee's PooledMySQLDatabase, connections are not automatically returned
    to the pool after execute_sql(). They only get returned when close() is called.
    This method should be called after database operations to prevent connection
    pool exhaustion, especially in scenarios with many concurrent operations.
    """
    if not self.is_closed():
        self.close()
```
+
#### The @release_connection Decorator

To automate connection release, PowerRAG provides a `@release_connection` decorator that ensures connections are returned to the pool after method execution:

```python
def release_connection(func):
    """
    Decorator to ensure the database connection is returned to the pool
    after method execution.

    Peewee's PooledMySQLDatabase does not automatically return connections
    to the pool after execute_sql(). This decorator ensures connections are
    properly released to prevent connection pool exhaustion under high
    concurrency.
    """
    @wraps(func)
    def wrapper(self, *args, **kwargs):
        try:
            return func(self, *args, **kwargs)
        finally:
            if isinstance(self.db, RetryingPooledMySQLDatabase):
                self.db.manual_close()
    return wrapper
```
+
#### Usage Pattern

Apply the `@release_connection` decorator to all public database methods in classes like `OceanBaseRedisDb`:

```python
@release_connection
def get(self, k):
    if not self.db:
        return None
    try:
        cursor = self.db.execute_sql(
            'select cache_value from cache where cache_key = %s and expire_time > now()',
            (k,)
        )
        # ... processing logic ...
    except Exception as e:
        # ... error handling ...

@release_connection
def set(self, k, v, exp=3600):
    try:
        expire_time = datetime.now() + timedelta(seconds=exp)
        self.db.execute_sql(
            'replace into cache (cache_key, cache_value, expire_time) values (%s, %s, %s)',
            (k, v, expire_time)
        )
        return True
    except Exception as e:
        # ... error handling ...
```
+
#### Handling Atomic Transactions

For methods that use `db.atomic()` transactions, split them into internal (with `_` prefix) and public versions. The internal version performs the database operations without releasing the connection, while the public version applies the decorator:

```python
def _set_object(self, k, obj, exp=3600):
    """Internal version without connection release."""
    expire_time = datetime.now() + timedelta(seconds=exp)
    self.db.execute_sql(
        'replace into cache (cache_key, cache_value, expire_time) values (%s, %s, %s)',
        (k, json.dumps(obj, ensure_ascii=False), expire_time)
    )
    return True

@release_connection
def set_object(self, k, obj, exp=3600):
    """Public version with automatic connection release."""
    return self._set_object(k, obj, exp)

@release_connection
def zadd(self, key: str, member: str, score: float):
    try:
        with self.db.atomic():
            # ... transaction logic ...
            # Call internal _set_object() to avoid releasing connection mid-transaction
            return self._set_object(key, mp)
    except Exception as e:
        # ... error handling ...
```

This pattern prevents releasing connections inside transactions, which would cause errors, while ensuring connections are released only after the transaction completes.

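Why a mid-transaction release is fatal can be illustrated with a minimal stdlib `sqlite3` sketch. The in-memory connection here is just a stand-in for the pooled MySQL connection, not PowerRAG's actual code:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("create table cache (cache_key text, cache_value text)")

# Open a transaction, then "release" the connection mid-way: exactly what a
# @release_connection-style decorator would do if applied to a helper that
# runs inside atomic().
conn.execute("begin")
conn.execute("insert into cache values ('a', '1')")
conn.close()  # premature release tears down the open transaction

error = None
try:
    conn.execute("insert into cache values ('b', '2')")
except sqlite3.ProgrammingError as e:
    error = str(e)
print("error:", error)  # the connection is gone and the first insert is lost
```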
#### Lock Operations

For distributed lock operations in `MysqlDistributedLock`, release the connection in the `finally` block of the critical methods:

```python
def doAcquire(self):
    try:
        # ... lock acquisition logic ...
        return True
    except Exception as e:
        # ... error handling ...
        return False
    finally:
        self._release_connection()

def delete_if_equal(self):
    try:
        # ... lock deletion logic ...
        return True
    except Exception as e:
        # ... error handling ...
        return False
    finally:
        self._release_connection()

def _release_connection(self):
    """Release the current thread's database connection back to the pool."""
    if isinstance(self.db, RetryingPooledMySQLDatabase):
        self.db.manual_close()
```

This pattern is critical for preventing connection pool exhaustion in high-concurrency scenarios and should be applied consistently across all database interaction code.
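The whole lifecycle can be simulated without MySQL. `ToyPooledDb`, `Cache`, and the hard-coded limits below are illustrative stand-ins, not PowerRAG's classes; the sketch only shows why per-call release keeps the pool from running dry:

```python
from functools import wraps

class ToyPooledDb:
    """Stand-in for a pooled database: tracks checked-out connections."""
    def __init__(self, max_connections=2):
        self.max_connections = max_connections
        self.in_use = 0

    def execute_sql(self, sql):
        # Each query checks a connection out of the pool and, crucially,
        # never checks it back in on its own.
        if self.in_use >= self.max_connections:
            raise RuntimeError("MaxConnectionsExceeded")
        self.in_use += 1
        return f"rows for {sql!r}"

    def manual_close(self):
        # Return the connection to the pool.
        if self.in_use:
            self.in_use -= 1

def release_connection(func):
    @wraps(func)
    def wrapper(self, *args, **kwargs):
        try:
            return func(self, *args, **kwargs)
        finally:
            self.db.manual_close()
    return wrapper

class Cache:
    def __init__(self, db):
        self.db = db

    @release_connection
    def get(self, k):
        return self.db.execute_sql(f"select cache_value where cache_key = {k!r}")

db = ToyPooledDb(max_connections=2)
cache = Cache(db)
for i in range(10):   # without the decorator, the third call would raise
    cache.get(f"k{i}")
print(db.in_use)      # 0: every connection was returned to the pool
```

Removing the decorator makes the loop fail on the third call, which is the pool-exhaustion failure mode the section describes.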
### Caching Table Existence Checks
✅ Accepted

**Error Handling and Bug Fixes** (suggested changes):
See [PR #56](https://github.com/oceanbase/powerrag/pull/56) for the fix.
#### MaxConnectionsExceeded

Under high concurrency, database operations may fail with `MaxConnectionsExceeded` errors due to connection pool exhaustion. This occurs because Peewee's `PooledMySQLDatabase` does not automatically return connections to the pool after `execute_sql()` calls. When many concurrent database operations run, connections are never released back to the pool, and the pool eventually becomes exhausted.

**Resolution:**

PR #58 addressed this issue by implementing connection release mechanisms:

1. Added a `manual_close()` method to `RetryingPooledMySQLDatabase` (in `api/db/db_models.py`) to explicitly return connections to the pool.
2. Added a `@release_connection` decorator to all public methods in `OceanBaseRedisDb` (in `rag/utils/ob_redis_conn.py`) to ensure connections are released after each operation.
3. Split methods into internal (`_`-prefixed) and public versions to avoid releasing connections inside `atomic()` transactions, where connection state must be preserved.

**Troubleshooting:**

- If you encounter connection pool exhaustion, ensure all public database methods use the `@release_connection` decorator.
- For custom database operations using `execute_sql()`, call `manual_close()` after operations complete to return the connection to the pool.
- Avoid releasing connections inside `atomic()` transactions. Use internal `_`-prefixed methods within transactions to maintain connection state.

See [PR #58](https://github.com/oceanbase/powerrag/pull/58) for the complete fix.

### Connection Timeouts with PowerRAG Server
Connection timeouts and network anomalies can be caused by:

✅ Accepted
Pull request overview
This PR aims to prevent OceanBase/MySQL connection pool exhaustion (e.g., MaxConnectionsExceeded) by explicitly returning Peewee pooled connections to the pool after raw execute_sql() usage in the OceanBase-backed cache/lock implementation.
Changes:
- Add `manual_close()` to `RetryingPooledMySQLDatabase` to explicitly close/return the current connection to the pool.
- Add a `@release_connection` decorator across `OceanBaseRedisDb` public methods and split some methods into internal/public variants to avoid releasing inside `atomic()` blocks.
- Ensure `MysqlDistributedLock` releases the DB connection in `finally` for acquire/release-related operations.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| `rag/utils/ob_redis_conn.py` | Introduces connection-release decorator/context manager, updates `OceanBaseRedisDb` methods to release connections, and adds lock-level release in `finally`. |
| `api/db/db_models.py` | Adds `manual_close()` helper to `RetryingPooledMySQLDatabase` for returning connections to the pool. |
- Remove `@release_connection` from `get_unacked_iterator` since it is a generator; the decorator's `finally` block runs immediately on generator creation rather than after iteration, causing premature connection release
- Remove unused `db_connection` context manager
- Wrap `manual_close()` in try/except to avoid masking original exceptions when called from `finally` blocks

Made-with: Cursor
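The generator pitfall this commit fixes is easy to reproduce. The sketch below is illustrative (a toy decorator and event log, not PowerRAG's code): decorating a generator function means the wrapper's `finally` clause fires as soon as the generator object is created, before a single row is consumed:

```python
from functools import wraps

events = []

def release_connection(func):
    # Toy decorator: "releases the connection" in finally, like the real one.
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        finally:
            events.append("released")
    return wrapper

@release_connection
def get_unacked_iterator():
    # Calling the decorated function only *creates* the generator object;
    # the wrapper (and its finally block) returns before any row is yielded.
    for row in ["a", "b"]:
        events.append(f"yield {row}")
        yield row

gen = get_unacked_iterator()  # "released" is logged here, before iteration
rows = list(gen)              # rows are produced after the release
print(events)                 # ['released', 'yield a', 'yield b']
```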
Pull request overview
This PR aims to prevent OceanBase/MySQL pooled connection exhaustion (and resulting MaxConnectionsExceeded errors) by explicitly returning Peewee pooled connections to the pool after cache/lock operations.
Changes:
- Added `manual_close()` to `RetryingPooledMySQLDatabase` to explicitly return the current thread's connection to the pool.
- Introduced a `@release_connection` decorator and applied it to the OceanBase-backed "Redis" DB public methods to close connections after each operation.
- Split some methods into internal (`_`-prefixed) variants to avoid releasing connections mid-`atomic()` transaction; added explicit connection release in `MysqlDistributedLock`.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 11 comments.
| File | Description |
|---|---|
| `rag/utils/ob_redis_conn.py` | Adds `@release_connection`, introduces internal helper methods for transactional safety, and ensures lock operations release pooled connections. |
| `api/db/db_models.py` | Adds `manual_close()` to explicitly close/return pooled connections after DB operations. |
Comments suppressed due to low confidence (1)

`rag/utils/ob_redis_conn.py:362`

- This `execute_sql` call passes `key` directly as params. Wrap it as `(key,)` for correct single-parameter binding.

    cursor = self.db.execute_sql("select cache_value from cache where cache_key = %s and expire_time > "
                                 "now()", key)
    ret = cursor.fetchone()
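The fix the reviewer suggests is standard DB-API parameter binding: `params` must be a sequence, so a lone value gets wrapped in a one-element tuple. A self-contained sketch with the stdlib `sqlite3` module (qmark placeholders instead of MySQL's `%s`, purely for illustration):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("create table cache (cache_key text, cache_value text)")
conn.execute("insert into cache values (?, ?)", ("k1", "v1"))

key = "k1"
# Correct: wrap the single parameter in a one-element tuple.
row = conn.execute(
    "select cache_value from cache where cache_key = ?", (key,)
).fetchone()
print(row)  # ('v1',)

# Buggy: a bare string is itself a sequence (of characters), so the driver
# sees two parameters for one placeholder and raises.
try:
    conn.execute("select cache_value from cache where cache_key = ?", key)
except sqlite3.ProgrammingError as e:
    print("binding error:", e)
```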
- Log at debug level in `manual_close()` instead of silently swallowing exceptions, to aid troubleshooting connection pool issues
- Fix `queue_info()` comparing the `fetchone()` result directly with 0; `fetchone()` returns a tuple like `(count,)`, so use `ret[0] == 0` instead

Made-with: Cursor
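The `fetchone()` point is easy to verify with any DB-API driver: a row comes back as a tuple, so a COUNT(*) result must be unpacked before comparing. Illustrated here with stdlib `sqlite3` and a hypothetical `queue` table, not PowerRAG's schema:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("create table queue (id integer)")

ret = conn.execute("select count(*) from queue").fetchone()
print(ret)           # (0,): a one-element tuple, not the integer 0
print(ret == 0)      # False: the buggy comparison never matches
print(ret[0] == 0)   # True: unpack the count first
```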
Peewee's PooledMySQLDatabase does not automatically return connections to the pool after execute_sql(). Under high concurrency this causes pool exhaustion and MaxConnectionsExceeded errors.

- Add `manual_close()` to RetryingPooledMySQLDatabase to return connections to the pool
- Add `@release_connection` decorator to all OceanBaseRedisDb public methods
- Add `_release_connection()` to MysqlDistributedLock for doAcquire/delete_if_equal

Made-with: Cursor