Skip to content
21 changes: 16 additions & 5 deletions assets/js/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,15 @@ import {hooks as colocatedHooks} from "phoenix-colocated/pulse"
import topbar from "../vendor/topbar"
import { domToBlob } from "modern-screenshot"

async function capturePortfolio(target) {
function todayStamp() {
const d = new Date()
const yyyy = d.getFullYear()
const mm = String(d.getMonth() + 1).padStart(2, "0")
const dd = String(d.getDate()).padStart(2, "0")
return `${yyyy}${mm}${dd}`
}

async function capturePortfolio(target, basename) {
target.classList.add("capture-mode")

// Wait one frame for layout to settle
Expand All @@ -35,7 +43,8 @@ async function capturePortfolio(target) {
try {
const bg = getComputedStyle(target).backgroundColor
const blob = await domToBlob(target, { scale: 2, backgroundColor: bg })
return new File([blob], "portfolio.png", { type: "image/png" })
const filename = `${basename || "pulse"}-${todayStamp()}.png`
return new File([blob], filename, { type: "image/png" })
} finally {
target.classList.remove("capture-mode")
}
Expand All @@ -52,11 +61,12 @@ const SaveImage = {
const setLabel = (msg) => { if (label) label.textContent = msg }
const target = document.getElementById("portfolio-capture")
if (!target) return
const basename = this.el.dataset.filename || "pulse"

setLabel(capturingLabel)

try {
const file = await capturePortfolio(target)
const file = await capturePortfolio(target, basename)

// Try native share with image (works on most mobile browsers)
if (navigator.canShare && navigator.canShare({ files: [file] })) {
Expand All @@ -68,7 +78,7 @@ const SaveImage = {
// Desktop fallback: download via anchor click
const a = document.createElement("a")
a.href = URL.createObjectURL(file)
a.download = "portfolio.png"
a.download = file.name
a.click()
URL.revokeObjectURL(a.href)
setLabel(savedLabel)
Expand All @@ -95,11 +105,12 @@ const ShareLink = {
const copiedLabel = this.el.dataset.labelCopied || "Copied!"
const setLabel = (msg) => { if (label) label.textContent = msg }
const target = document.getElementById("portfolio-capture")
const basename = this.el.dataset.filename || "pulse"

if (target) {
setLabel(capturingLabel)
try {
const file = await capturePortfolio(target)
const file = await capturePortfolio(target, basename)

if (navigator.canShare && navigator.canShare({ files: [file] })) {
await navigator.share({ files: [file], title, text, url })
Expand Down
15 changes: 12 additions & 3 deletions lib/pulse/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,20 @@ defmodule Pulse.Application do
{Pulse.Store, []},
{Pulse.Analytics, []},

# Dashboard aggregator (must start before NATS consumer)
# Radar OTP tree (mirror of the portfolio tree)
{Registry, keys: :unique, name: Pulse.RadarRegistry},
{Pulse.RadarSupervisor, []},
{Pulse.RadarStore, []},
{Pulse.RadarAnalytics, []},

# Aggregators (must start before NATS consumer so they're subscribed)
Pulse.DashboardAggregator,
Pulse.RadarAggregator,

# Restore persisted portfolios before NATS events arrive
{Task, fn -> Pulse.Store.restore_all() end},
# Restore persisted state before NATS events arrive. Two tasks need
# unique child IDs (the default id is just `Task`, which would collide).
Supervisor.child_spec({Task, fn -> Pulse.Store.restore_all() end}, id: :portfolio_restore),
Supervisor.child_spec({Task, fn -> Pulse.RadarStore.restore_all() end}, id: :radar_restore),

# NATS connection and consumer
Pulse.Nats.Connection,
Expand Down
45 changes: 44 additions & 1 deletion lib/pulse/nats/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@ defmodule Pulse.Nats.Consumer do

require Logger

@subjects ~w(portfolio.updated portfolio.opted_in portfolio.opted_out stock.price_updated)
@subjects ~w(
portfolio.updated portfolio.opted_in portfolio.opted_out
radar.updated radar.opted_in radar.opted_out
stock.price_updated
)

def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
Expand Down Expand Up @@ -123,6 +127,45 @@ defmodule Pulse.Nats.Consumer do
Phoenix.PubSub.broadcast(Pulse.PubSub, "stocks", {:price_updated, payload})
end

# ── Radar events (mirror of the portfolio path) ──────────────────────

defp handle_event("radar.opted_in", %{"slug" => slug} = payload) do
Logger.info("Radar opted in: #{slug}")

case Pulse.RadarSupervisor.start_worker(slug) do
{:ok, _pid} ->
if payload["stocks"], do: Pulse.RadarWorker.update_stocks(slug, payload)

{:error, {:already_started, _pid}} ->
Logger.info("Radar worker already exists for #{slug}, updating stocks")
if payload["stocks"], do: Pulse.RadarWorker.update_stocks(slug, payload)

{:error, reason} ->
Logger.error("Failed to start radar worker for #{slug}: #{inspect(reason)}")
end
end

defp handle_event("radar.opted_out", %{"slug" => slug}) do
Logger.info("Radar opted out: #{slug}")
Pulse.RadarStore.delete(slug)
Pulse.RadarAnalytics.delete(slug)
Pulse.RadarSupervisor.stop_worker(slug)
end

defp handle_event("radar.updated", %{"slug" => slug, "stocks" => _} = payload) do
Logger.info("Radar updated: #{slug}")

case Registry.lookup(Pulse.RadarRegistry, slug) do
[{_pid, _}] ->
Pulse.RadarWorker.update_stocks(slug, payload)

[] ->
Logger.warning("No radar worker for #{slug}, starting one")
Pulse.RadarSupervisor.start_worker(slug)
Pulse.RadarWorker.update_stocks(slug, payload)
end
end

defp handle_event(topic, _payload) do
Logger.warning("Unhandled NATS event: #{topic}")
end
Expand Down
155 changes: 155 additions & 0 deletions lib/pulse/radar_aggregator.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
defmodule Pulse.RadarAggregator do
@moduledoc """
GenServer that aggregates community-wide radar stats across all active
`Pulse.RadarWorker` processes — mirror of `Pulse.DashboardAggregator`.

Powers the "Community watchlist" card on the Pulse dashboard:
- `most_watched` — top symbols by count of radars they appear in.
Includes average target price when the cohort has at least
`@min_cohort` users (avoids exposing a single user's preference).
- `below_community_target` — same list filtered to current_price <
avg_target, sorted by largest gap (consensus buy signals).
"""
use GenServer

require Logger

@recompute_debounce 500
@min_cohort 3
@top_n 10

def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end

def get_stats do
GenServer.call(__MODULE__, :get_stats)
end

def refresh do
GenServer.call(__MODULE__, :refresh)
end

@impl true
def init(_opts) do
Phoenix.PubSub.subscribe(Pulse.PubSub, "radars")
{:ok, %{stats: compute_stats(), debounce_ref: nil}}
end

@impl true
def handle_call(:get_stats, _from, state) do
{:reply, state.stats, state}
end

def handle_call(:refresh, _from, state) do
stats = compute_stats()
{:reply, stats, %{state | stats: stats}}
end

@impl true
def handle_info(:recompute, state) do
stats = compute_stats()
Phoenix.PubSub.broadcast(Pulse.PubSub, "dashboard", {:radar_dashboard_updated, stats})
{:noreply, %{state | stats: stats, debounce_ref: nil}}
end

def handle_info({:radar_changed, _slug}, state) do
if state.debounce_ref, do: Process.cancel_timer(state.debounce_ref)
ref = Process.send_after(self(), :recompute, @recompute_debounce)
{:noreply, %{state | debounce_ref: ref}}
end

def handle_info(_msg, state) do
{:noreply, state}
end

defp compute_stats do
children = DynamicSupervisor.which_children(Pulse.RadarSupervisor)

radars =
children
|> Enum.filter(fn {_, pid, _, _} -> is_pid(pid) end)
|> Enum.map(fn {_, pid, _, _} ->
try do
GenServer.call(pid, :get_radar, 2_000)
catch
:exit, _ -> nil
end
end)
|> Enum.reject(&is_nil/1)

all_stocks = Enum.flat_map(radars, & &1.stocks)

# Group by symbol so we can compute count + average target across users.
by_symbol = Enum.group_by(all_stocks, & &1["symbol"])

most_watched =
by_symbol
|> Enum.map(fn {symbol, stocks} ->
targets = stocks |> Enum.map(& &1["target_price"]) |> Enum.reject(&is_nil/1)
current_price = stocks |> Enum.find_value(& &1["price"])
avg_target = if length(targets) >= @min_cohort, do: avg(targets), else: nil

%{
symbol: symbol,
name: stocks |> Enum.find_value(& &1["name"]),
currency: stocks |> Enum.find_value(& &1["currency"]),
watchers: length(stocks),
current_price: current_price,
avg_target_price: avg_target
}
end)
|> Enum.sort_by(&(-&1.watchers))
|> Enum.take(@top_n)

below_community_target =
most_watched
|> Enum.filter(fn s ->
is_number(s.current_price) and is_number(s.avg_target_price) and
s.current_price < s.avg_target_price
end)
|> Enum.map(fn s ->
Map.put(s, :percent_below, percent_below(s.current_price, s.avg_target_price))
end)
|> Enum.sort_by(&(-&1.percent_below))
|> Enum.take(@top_n)

# Most-recently-updated radars. Workers without a updated_at (shouldn't
# happen but be defensive) sort to the end via a tuple-shaped key.
recent_radars =
radars
|> Enum.sort_by(
fn r ->
{if(r.updated_at, do: 0, else: 1), r.updated_at}
end,
fn
{0, a}, {0, b} -> DateTime.compare(a, b) == :gt
a, b -> a <= b
end
)
|> Enum.take(@top_n)
|> Enum.map(fn r ->
%{
slug: r.slug,
updated_at: r.updated_at,
stock_count: length(r.stocks),
below_target_count: r.metrics[:below_target_count] || 0
}
end)

%{
radar_count: length(radars),
total_targeted: by_symbol |> Map.keys() |> length(),
most_watched: most_watched,
below_community_target: below_community_target,
recent_radars: recent_radars
}
end

defp avg([]), do: nil
defp avg(list), do: Float.round(Enum.sum(list) / length(list), 2)

defp percent_below(price, target) do
Float.round((target - price) / target * 100, 1)
end
end
Loading
Loading