Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ FBKit centralizes mutation safety in `agent/services/safety_gate.py`.
| `WS_AUTH_ENABLED` | follows `API_AUTH_ENABLED` | Must be `true` before creating live arms or approving live tasks |
| `FBKIT_NODE_ID` | `hostname:pid` | Optional worker identity for lease/status visibility; must be unique per worker process sharing one DB |
| `LIVE_ACCOUNT_LEASE_TTL_SECONDS` | `900` | Live account lease TTL for live mutating tasks; clamped to `60`-`3600` seconds |
| `LIVE_ACCOUNT_LEASE_HEARTBEAT_SECONDS` | `60` | Live account lease refresh interval while a live mutating task is processing; clamped to `5`-`300` seconds |

Mutating task types include posting, messaging, liking, commenting, sharing, friend actions, group actions, page follow/unfollow, and video reup tasks.

Expand All @@ -144,6 +145,7 @@ Additional protections:
- live quota is reserved before live dispatch, skipped for dry-run tasks, and not reserved until live auth, arm, and extension guard readiness all pass
- live quota reservation is date-scoped and idempotent for the same task retry; dry-run tasks remain exempt from live quota reservation
- the worker uses a SQLite-backed live account lease to block same-account live mutating non-dry-run claims across workers sharing one DB; same-account dry-run/read-only work remains exempt
- the worker refreshes the matching SQLite live account lease during processing so long live mutating tasks do not expire while still running
- the worker still reports process-local `active_live_account_ids` as telemetry/defense-in-depth, but the DB lease is the cross-worker guard
- `FBClient` requires exact `fb_uid` routing when a task targets a specific Facebook account
- `FBClient` marks sessions stale by heartbeat age and prefers the freshest duplicate session for exact `fb_uid` routing
Expand Down
6 changes: 6 additions & 0 deletions agent/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ def _clamped_int(value: str | None, default: int, minimum: int, maximum: int) ->
minimum=60,
maximum=3600,
)
LIVE_ACCOUNT_LEASE_HEARTBEAT_SECONDS = _clamped_int(
os.environ.get("LIVE_ACCOUNT_LEASE_HEARTBEAT_SECONDS"),
default=60,
minimum=5,
maximum=300,
)

# ─── Safety Gate ──────────────────────────────────────────────
# Defaults protect personal accounts from accidental live mutations.
Expand Down
38 changes: 38 additions & 0 deletions agent/db/crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,17 @@ async def get_task(task_id: str) -> dict | None:
return _row_to_dict(await cur.fetchone())


async def get_task_by_ref_id(ref_id: str) -> dict | None:
db = await get_db()
cur = await db.execute("SELECT * FROM task WHERE ref_id = ?", (ref_id,))
return _row_to_dict(await cur.fetchone())


async def rollback():
db = await get_db()
await db.rollback()


async def list_tasks(status: str = None, task_type: str = None, account_id: str = None) -> list[dict]:
db = await get_db()
conditions, params = [], []
Expand Down Expand Up @@ -672,6 +683,33 @@ async def release_live_account_lease(account_id: str, task_id: str, node_id: str
return cur.rowcount == 1


async def refresh_live_account_lease(
account_id: str,
task_id: str,
node_id: str,
ttl_seconds: int | None = None,
) -> dict | None:
"""Extend the lease held by the matching account/task/node tuple."""
if not account_id or not task_id or not node_id:
return None
db = await get_db()
now = utc_now_iso()
expires_at = _lease_expires_at(ttl_seconds)
cur = await db.execute(
"UPDATE live_account_lease SET heartbeat_at = ?, expires_at = ? "
"WHERE account_id = ? AND task_id = ? AND node_id = ? AND expires_at > ?",
(now, expires_at, account_id, task_id, node_id, now),
)
await db.commit()
if cur.rowcount != 1:
return None
cur = await db.execute(
"SELECT * FROM live_account_lease WHERE account_id = ? AND task_id = ? AND node_id = ?",
(account_id, task_id, node_id),
)
return _row_to_dict(await cur.fetchone())


async def list_active_live_account_leases() -> list[dict]:
db = await get_db()
cur = await db.execute(
Expand Down
1 change: 1 addition & 0 deletions agent/db/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
CREATE INDEX IF NOT EXISTS idx_task_priority ON task(priority DESC);
CREATE INDEX IF NOT EXISTS idx_task_status_scheduled_priority ON task(status, scheduled_at, priority DESC);
CREATE INDEX IF NOT EXISTS idx_task_account_status ON task(account_id, status);
CREATE UNIQUE INDEX IF NOT EXISTS idx_task_zoopost_ref ON task(ref_id) WHERE ref_id LIKE 'zoopost:%';

-- ─── FB Group ───────────────────────────────────────────────
CREATE TABLE IF NOT EXISTS fb_group (
Expand Down
Loading
Loading