Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions qracer/conversation/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from qracer.memory.memory_searcher import MemorySearcher
from qracer.models import ToolResult, TradeThesis
from qracer.risk.calculator import RiskCalculator
from qracer.risk.models import RebalanceAction
from qracer.tools import pipeline

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -69,7 +70,16 @@ async def handle(self, intent: Intent) -> HandlerResult:

calculator = RiskCalculator(self._portfolio_config)
snapshot = calculator.build_snapshot(prices)
exposure = calculator.build_exposure(snapshot)
breached = calculator.check_limits(snapshot, exposure)

text = format_portfolio(snapshot, language=self._language)

if breached:
suggestions = calculator.suggest_rebalance(snapshot, exposure)
if suggestions:
text += "\n\n" + _format_rebalance_suggestions(suggestions)

return HandlerResult(text=text, analysis=AnalysisResult(confidence=1.0, iterations=0))


Expand Down Expand Up @@ -213,3 +223,14 @@ async def handle(self, intent: Intent) -> HandlerResult:
# Synthesize response.
text = await self._synthesizer.synthesize(intent, analysis)
return HandlerResult(text=text, analysis=analysis)


def _format_rebalance_suggestions(suggestions: list[RebalanceAction]) -> str:
"""Format rebalancing suggestions for display."""
lines = ["Rebalancing Suggestions:"]
for s in suggestions:
if s.action == "reduce":
lines.append(f" REDUCE {s.ticker}: sell {abs(s.shares_delta):.0f} shares — {s.reason}")
else:
lines.append(f" ADD {s.ticker} — {s.reason}")
return "\n".join(lines)
2 changes: 2 additions & 0 deletions qracer/risk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
ExposureBreakdown,
HoldingSnapshot,
PortfolioSnapshot,
RebalanceAction,
RiskAssessment,
)

Expand All @@ -13,6 +14,7 @@
"ExposureBreakdown",
"HoldingSnapshot",
"PortfolioSnapshot",
"RebalanceAction",
"RiskAssessment",
"RiskCalculator",
"SectorResolver",
Expand Down
144 changes: 144 additions & 0 deletions qracer/risk/calculator.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
ExposureBreakdown,
HoldingSnapshot,
PortfolioSnapshot,
RebalanceAction,
RiskAssessment,
)

Expand Down Expand Up @@ -311,6 +312,149 @@ def _compute_sector_weight(self, sector: str, snapshot: PortfolioSnapshot) -> fl
total += h.weight_pct
return total

# ------------------------------------------------------------------
# Rebalancing suggestions
# ------------------------------------------------------------------

_MIN_POSITION_WEIGHT_PCT = 5.0 # floor when reducing sector positions

def suggest_rebalance(
self,
snapshot: PortfolioSnapshot,
exposure: ExposureBreakdown,
) -> list[RebalanceAction]:
"""Suggest specific rebalancing actions for breached portfolio limits.

Handles two breach scenarios:
1. **Single-position breaches** — holdings exceeding the individual
position limit are reduced to that limit.
2. **Sector breaches** — for sectors over the sector limit, the
largest positions are progressively reduced (maintaining a 5%
minimum per position) until the sector is within limits.
"""
if not snapshot.holdings or snapshot.total_value <= 0:
return []

actions: list[RebalanceAction] = []
limits = self._config.limits

# 1. Single-position breaches
for h in snapshot.holdings:
if h.weight_pct > limits.max_single_position_pct:
target_value = snapshot.total_value * limits.max_single_position_pct / 100.0
excess_value = h.market_value - target_value
shares_to_sell = excess_value / h.current_price if h.current_price > 0 else 0.0
if shares_to_sell > 0:
actions.append(
RebalanceAction(
ticker=h.ticker,
action="reduce",
shares_delta=-round(shares_to_sell, 2),
reason=(
f"Position weight {h.weight_pct:.1f}% exceeds "
f"limit {limits.max_single_position_pct:.1f}%"
),
)
)

# 2. Sector breaches
for sector, sector_pct in exposure.sector_weights.items():
if sector_pct <= limits.max_sector_pct:
continue

excess_pct = sector_pct - limits.max_sector_pct
# Holdings in this sector, sorted largest-weight-first.
sector_holdings = sorted(
[h for h in snapshot.holdings if self._sectors.get_sector(h.ticker) == sector],
key=lambda h: h.weight_pct,
reverse=True,
)

remaining_excess = excess_pct
for h in sector_holdings:
if remaining_excess <= 0:
break
# Don't reduce a position below the floor.
reducible = max(0.0, h.weight_pct - self._MIN_POSITION_WEIGHT_PCT)
if reducible <= 0:
continue
cut_pct = min(reducible, remaining_excess)
cut_value = snapshot.total_value * cut_pct / 100.0
shares_to_sell = cut_value / h.current_price if h.current_price > 0 else 0.0
if shares_to_sell > 0:
# Avoid duplicate if already covered by single-position action.
already = any(a.ticker == h.ticker for a in actions)
if not already:
actions.append(
RebalanceAction(
ticker=h.ticker,
action="reduce",
shares_delta=-round(shares_to_sell, 2),
reason=(
f"Sector '{sector}' weight {sector_pct:.1f}% exceeds "
f"limit {limits.max_sector_pct:.1f}%"
),
)
)
remaining_excess -= cut_pct

return actions

def suggest_additions(
self,
candidates: list[str],
snapshot: PortfolioSnapshot,
corr_result: CorrelationResult | None = None,
) -> list[RebalanceAction]:
"""Suggest additions preferring tickers with low correlation to existing holdings.

Ranks *candidates* by their average absolute correlation against
current portfolio positions (lower is better) and returns the top
three as "add" suggestions.
"""
effective_corr = corr_result or self._last_correlation
if effective_corr is None or not snapshot.holdings:
# Without correlation data, return candidates as-is (up to 3).
return [
RebalanceAction(
ticker=t,
action="add",
shares_delta=0,
reason="No correlation data available — candidate for diversification",
)
for t in candidates[:3]
]

matrix = effective_corr.correlation_matrix
held_tickers = {h.ticker for h in snapshot.holdings}

scored: list[tuple[str, float]] = []
for candidate in candidates:
if candidate in held_tickers:
continue
candidate_corrs = matrix.get(candidate, {})
if not candidate_corrs:
# Unknown correlation — treat as moderately uncorrelated.
scored.append((candidate, 0.5))
continue
avg_corr = sum(
abs(candidate_corrs.get(h.ticker, 0.0)) for h in snapshot.holdings
) / len(snapshot.holdings)
scored.append((candidate, avg_corr))

# Sort by average correlation ascending (lowest = most diversifying).
scored.sort(key=lambda x: x[1])

return [
RebalanceAction(
ticker=ticker,
action="add",
shares_delta=0,
reason=f"Low avg correlation ({avg_corr:.2f}) with existing holdings",
)
for ticker, avg_corr in scored[:3]
]

def update_peak(self, current_value: float) -> float:
"""Update and return the peak portfolio value."""
if current_value > self._peak_value:
Expand Down
11 changes: 11 additions & 0 deletions qracer/risk/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,17 @@ class ExposureBreakdown(BaseModel):
correlation_data_unavailable: bool = False


class RebalanceAction(BaseModel):
"""A single suggested rebalancing action."""

model_config = ConfigDict(frozen=True)

ticker: str
action: str # "reduce" or "add"
shares_delta: float # negative for sells, positive for buys
reason: str


class RiskAssessment(BaseModel):
"""Full risk assessment combining snapshot, exposure, and limit checks."""

Expand Down
Loading
Loading