diff --git a/app/api/messages/read/route.ts b/app/api/messages/read/route.ts new file mode 100644 index 0000000..6ba7da9 --- /dev/null +++ b/app/api/messages/read/route.ts @@ -0,0 +1,184 @@ +import { createClient } from "@/lib/supabase/server" +import { type NextRequest, NextResponse } from "next/server" + +/** + * POST /api/messages/read + * Mark one or more messages as read by the authenticated user. + * Body: { message_ids: string[], room_id: string } + */ +export async function POST(request: NextRequest) { + try { + const supabase = await createClient() + + const { + data: { user }, + } = await supabase.auth.getUser() + + if (!user) { + return NextResponse.json({ error: "Unauthorized" }, { status: 401 }) + } + + const body = await request.json() + const { message_ids, room_id } = body + + if (!message_ids || !Array.isArray(message_ids) || message_ids.length === 0) { + return NextResponse.json( + { error: "message_ids must be a non-empty array" }, + { status: 400 }, + ) + } + + if (!room_id) { + return NextResponse.json( + { error: "room_id is required" }, + { status: 400 }, + ) + } + + // Verify user is an active member of this room + const { data: membership, error: memberErr } = await supabase + .from("room_members") + .select("id, removed_at") + .eq("room_id", room_id) + .eq("user_id", user.id) + .maybeSingle() + + if (memberErr) throw memberErr + + if (!membership || membership.removed_at) { + return NextResponse.json( + { error: "Forbidden. You are not an active member of this room." }, + { status: 403 }, + ) + } + + // Upsert read receipts (ON CONFLICT DO NOTHING — don't overwrite earlier reads) + const readRows = message_ids.map((message_id: string) => ({ + message_id, + user_id: user.id, + })) + + const { data, error } = await supabase + .from("message_reads") + .upsert(readRows, { + onConflict: "message_id,user_id", + ignoreDuplicates: true, + }) + .select() + + if (error) throw error + + return NextResponse.json({ + success: true, + reads: data, + }) + } catch (error) { + console.error("[v0] POST /api/messages/read error:", error) + return NextResponse.json( + { error: "Failed to mark messages as read" }, + { status: 500 }, + ) + } +} + +/** + * GET /api/messages/read + * Fetch read receipts for messages in a room. + * Query: ?room_id=...&message_ids=id1,id2,id3 + */ +export async function GET(request: NextRequest) { + try { + const supabase = await createClient() + + const { + data: { user }, + } = await supabase.auth.getUser() + + if (!user) { + return NextResponse.json({ error: "Unauthorized" }, { status: 401 }) + } + + const { searchParams } = new URL(request.url) + const roomId = searchParams.get("room_id") + const messageIdsParam = searchParams.get("message_ids") + + if (!roomId) { + return NextResponse.json( + { error: "room_id is required" }, + { status: 400 }, + ) + } + + // Verify room membership + const { data: membership, error: memberErr } = await supabase + .from("room_members") + .select("id, removed_at") + .eq("room_id", roomId) + .eq("user_id", user.id) + .maybeSingle() + + if (memberErr) throw memberErr + + if (!membership || membership.removed_at) { + return NextResponse.json( + { error: "Forbidden. You are not an active member of this room." }, + { status: 403 }, + ) + } + + // Build query — optionally filter by specific message IDs + let query = supabase + .from("message_reads") + .select("message_id, user_id, read_at") + + if (messageIdsParam) { + const messageIds = messageIdsParam.split(",").filter(Boolean) + if (messageIds.length > 0) { + query = query.in("message_id", messageIds) + } + } + + // Only return reads for messages in the specified room + // We need to join through messages to filter by room_id + // Since Supabase doesn't support filtering through foreign keys in .select, + // we query the message IDs in this room first, then filter reads + const { data: roomMessages, error: roomMsgErr } = await supabase + .from("messages") + .select("id") + .eq("room_id", roomId) + + if (roomMsgErr) throw roomMsgErr + + const roomMessageIds = roomMessages?.map((m) => m.id) || [] + + if (roomMessageIds.length === 0) { + return NextResponse.json({ reads: [] }) + } + + // If specific message_ids were requested, intersect with room messages + let targetIds = roomMessageIds + if (messageIdsParam) { + const requestedIds = new Set(messageIdsParam.split(",").filter(Boolean)) + targetIds = roomMessageIds.filter((id) => requestedIds.has(id)) + } + + if (targetIds.length === 0) { + return NextResponse.json({ reads: [] }) + } + + const { data: reads, error: readsErr } = await supabase + .from("message_reads") + .select("message_id, user_id, read_at") + .in("message_id", targetIds) + + if (readsErr) throw readsErr + + return NextResponse.json({ reads: reads || [] }) + } catch (error) { + console.error("[v0] GET /api/messages/read error:", error) + return NextResponse.json( + { error: "Failed to fetch read receipts" }, + { status: 500 }, + ) + } +} diff --git a/components/MessageItem.tsx b/components/MessageItem.tsx index d6719d6..ce0b7a1 100644 --- a/components/MessageItem.tsx +++ b/components/MessageItem.tsx @@ -1,16 +1,18 @@ import React from 'react'; -import { Message } from '@/src/types/message'; +import { Message, ReadReceipt } from '@/src/types/message'; import { EncryptionBadge } from './EncryptionBadge'; +import { ReadReceiptIndicator } from '@/src/components/ReadReceiptIndicator'; interface Props { message: Message; + readReceipts?: ReadReceipt[]; } function formatTimestamp(date: Date): string { return date.toLocaleTimeString([], { hour: '2-digit', minute: '2-digit' }); } -export const MessageItem: React.FC = ({ message }) => { +export const MessageItem: React.FC = ({ message, readReceipts = [] }) => { return (
@@ -22,6 +24,10 @@ export const MessageItem: React.FC = ({ message }) => {
{formatTimestamp(message.timestamp)} {message.isEncrypted && } +
); diff --git a/lib/websocket/chat-hooks.tsx b/lib/websocket/chat-hooks.tsx index 93fa60c..f81ef67 100644 --- a/lib/websocket/chat-hooks.tsx +++ b/lib/websocket/chat-hooks.tsx @@ -12,7 +12,7 @@ interface RealtimeMessageUpdate { displayName: string content: string createdAt: number - status: "sending" | "sent" | "delivered" + status: "sending" | "sent" | "delivered" | "read" } export interface TypingIndicator { diff --git a/lib/websocket/client.ts b/lib/websocket/client.ts index 80059ca..abdfc32 100644 --- a/lib/websocket/client.ts +++ b/lib/websocket/client.ts @@ -259,6 +259,17 @@ export class WebSocketClient { payload: { messageId, roomId }, timestamp: Date.now(), }); + + /** + * Mark one or more messages as read in a room. + * Sends a batched read receipt via WebSocket for real-time propagation. + */ + markAsRead = (roomId: string, messageIds: string[]) => + this.send({ + type: "mark_read", + payload: { roomId, messageIds }, + timestamp: Date.now(), + }); } let instance: WebSocketClient | null = null; diff --git a/lib/websocket/hooks.ts b/lib/websocket/hooks.ts index c5f1d34..6f2c072 100644 --- a/lib/websocket/hooks.ts +++ b/lib/websocket/hooks.ts @@ -175,5 +175,12 @@ export function useWebSocketSend() { markAsDelivered: useCallback((messageId: string, roomId: string) => { client.current.markAsDelivered(messageId, roomId); }, []), + /** + * Mark one or more messages as read in a room. + * Batched for efficiency — accepts an array of message IDs. + */ + markAsRead: useCallback((roomId: string, messageIds: string[]) => { + client.current.markAsRead(roomId, messageIds); + }, []), }; } diff --git a/lib/websocket/server.ts b/lib/websocket/server.ts index 872c90d..46cc5c3 100644 --- a/lib/websocket/server.ts +++ b/lib/websocket/server.ts @@ -267,6 +267,36 @@ export function createWebSocketServer(port: number = 3001) { break } + case "mark_read": { + const readRoomId = message.payload.roomId + const readMessageIds = message.payload.messageIds + const readUserId = connection.userId + + if (!readUserId) { + ws.send( + JSON.stringify({ + type: "error", + payload: { message: "Not authenticated" }, + timestamp: Date.now(), + }), + ) + break + } + + // Broadcast read receipt to all room members (except sender) + broadcastToRoom(readRoomId, { + type: "message_read", + payload: { + roomId: readRoomId, + messageIds: readMessageIds, + userId: readUserId, + readAt: Date.now(), + }, + timestamp: Date.now(), + }, clientId) + break + } + case "typing": { const typingRoomId = message.payload.roomId diff --git a/scripts/migrations/add_message_reads_table.sql b/scripts/migrations/add_message_reads_table.sql new file mode 100644 index 0000000..e2da308 --- /dev/null +++ b/scripts/migrations/add_message_reads_table.sql @@ -0,0 +1,36 @@ +-- Migration: Add message_reads table for per-user read receipts +-- Run this against your Supabase project via the SQL editor or CLI + +CREATE TABLE IF NOT EXISTS message_reads ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + message_id UUID NOT NULL REFERENCES messages(id) ON DELETE CASCADE, + user_id UUID NOT NULL, + read_at TIMESTAMPTZ NOT NULL DEFAULT now(), + UNIQUE (message_id, user_id) +); + +-- Enable Row Level Security +ALTER TABLE message_reads ENABLE ROW LEVEL SECURITY; + +-- Policy: users can insert their own read receipts +CREATE POLICY "users_insert_own_reads" + ON message_reads FOR INSERT + WITH CHECK (auth.uid() = user_id); + +-- Policy: room members can view read receipts for messages in their rooms +CREATE POLICY "room_members_select_reads" + ON message_reads FOR SELECT + USING ( + EXISTS ( + SELECT 1 FROM messages m + JOIN room_members rm ON rm.room_id = m.room_id + WHERE m.id = message_reads.message_id + AND rm.user_id = auth.uid() + AND rm.removed_at IS NULL + ) + ); + +-- Indexes for performant lookups +CREATE INDEX IF NOT EXISTS idx_message_reads_message_id ON message_reads(message_id); +CREATE INDEX IF NOT EXISTS idx_message_reads_user_id ON message_reads(user_id); +CREATE INDEX IF NOT EXISTS idx_message_reads_message_user ON message_reads(message_id, user_id); diff --git a/src/components/ChatWindow.tsx b/src/components/ChatWindow.tsx index 6955c84..00dfec8 100644 --- a/src/components/ChatWindow.tsx +++ b/src/components/ChatWindow.tsx @@ -1,4 +1,4 @@ -import React, { useCallback, useState } from 'react'; +import React, { useCallback, useState, useEffect } from 'react'; import { MessageList } from './MessageList'; import { MessageInput } from './MessageInput'; import { useMessages } from '../hooks/useMessages'; @@ -33,6 +33,13 @@ export const ChatWindow: React.FC = ({ useChatSubscription(sdk, addMessage); + // Read receipts — auto-tracks visibility and syncs in real-time + const { readReceipts } = useReadReceipts({ + roomId: roomId || "", + messages, + currentUserId, + }); + const handleSend = useCallback(async (text: string) => { addMessage({ text, sender: walletAddress, isOwn: true, isEncrypted: true }); try { diff --git a/src/components/MessageList.tsx b/src/components/MessageList.tsx index 709e9c0..650444d 100644 --- a/src/components/MessageList.tsx +++ b/src/components/MessageList.tsx @@ -1,5 +1,5 @@ import React, { useEffect, useRef, useCallback, useState } from 'react'; -import { Message } from '../types/message'; +import { Message, ReadReceipt } from '../types/message'; import { MessageItem } from '@/components/MessageItem'; interface Props { @@ -9,6 +9,7 @@ interface Props { isLoadingMore?: boolean; hasMore?: boolean; firstMessageId?: string | null; + readReceipts?: Map; } export const MessageList: React.FC = ({ @@ -18,6 +19,7 @@ export const MessageList: React.FC = ({ isLoadingMore = false, hasMore = false, firstMessageId, + readReceipts, }) => { const bottomRef = useRef(null); const scrollContainerRef = useRef(null); @@ -128,7 +130,7 @@ export const MessageList: React.FC = ({ ref={index === 0 ? firstMessageRef : undefined} data-message-id={msg.id} > - +
))} diff --git a/src/components/ReadReceiptIndicator.tsx b/src/components/ReadReceiptIndicator.tsx new file mode 100644 index 0000000..153c3e4 --- /dev/null +++ b/src/components/ReadReceiptIndicator.tsx @@ -0,0 +1,139 @@ +import React, { useMemo } from "react"; +import { ReadReceipt } from "@/src/types/message"; + +interface ReadReceiptIndicatorProps { + /** Whether this message was sent by the current user */ + isOwn: boolean; + /** Read receipts for this message */ + readReceipts: ReadReceipt[]; + /** Message delivery status */ + status?: "sending" | "sent" | "delivered" | "read"; +} + +/** + * Displays read receipt indicators for messages: + * - Single grey tick: sent + * - Double grey ticks: delivered + * - Double blue ticks: read (with count for groups) + * Only shown on the sender's own messages. + */ +export const ReadReceiptIndicator: React.FC = ({ + isOwn, + readReceipts, + status, +}) => { + // Only show indicators on own messages + if (!isOwn) return null; + + const readCount = readReceipts.length; + const isRead = readCount > 0; + + // Determine effective status + const effectiveStatus = isRead ? "read" : status || "sent"; + + // Format tooltip with reader info + const tooltip = useMemo(() => { + if (!isRead) return ""; + + if (readCount === 1) { + const readAt = readReceipts[0].readAt; + return `Read at ${readAt.toLocaleTimeString([], { hour: "2-digit", minute: "2-digit" })}`; + } + + return `Read by ${readCount} ${readCount === 1 ? "person" : "people"}`; + }, [readReceipts, readCount, isRead]); + + return ( + + {effectiveStatus === "sending" && ( + + )} + + {effectiveStatus === "sent" && ( + + )} + + {effectiveStatus === "delivered" && ( + + )} + + {effectiveStatus === "read" && ( + <> + + {readCount > 1 && ( + + {readCount} + + )} + + )} + + ); +}; + +// --- SVG Icon components --- + +function ClockIcon({ className }: { className?: string }) { + return ( + + + + + ); +} + +function SingleCheck({ className }: { className?: string }) { + return ( + + + + ); +} + +function DoubleCheck({ className }: { className?: string }) { + return ( + + + + + ); +} diff --git a/src/hooks/useReadReceipts.ts b/src/hooks/useReadReceipts.ts new file mode 100644 index 0000000..4106691 --- /dev/null +++ b/src/hooks/useReadReceipts.ts @@ -0,0 +1,255 @@ +"use client"; + +import { useEffect, useRef, useCallback, useState } from "react"; +import { Message, ReadReceipt } from "../types/message"; +import { useWebSocketSend, useWebSocketMessage } from "@/lib/websocket/hooks"; +import { WebSocketMessage } from "@/types/websocket"; + +interface UseReadReceiptsOptions { + roomId: string; + messages: Message[]; + currentUserId?: string; + /** CSS selector for the scroll container. Defaults to the first scrollable parent. */ + scrollContainerSelector?: string; +} + +interface UseReadReceiptsReturn { + /** Map of messageId → array of read receipts */ + readReceipts: Map; + /** Get the count of users who read a specific message */ + getReadCount: (messageId: string) => number; + /** Check if a specific user has read a specific message */ + hasUserRead: (messageId: string, userId: string) => boolean; +} + +const BATCH_DELAY_MS = 500; + +export function useReadReceipts( + options: UseReadReceiptsOptions, +): UseReadReceiptsReturn { + const { roomId, messages, currentUserId } = options; + + const [readReceipts, setReadReceipts] = useState>( + new Map(), + ); + + const { markAsRead } = useWebSocketSend(); + + // Batch pending message IDs to mark as read + const pendingReadIdsRef = useRef>(new Set()); + const batchTimerRef = useRef | null>(null); + const observerRef = useRef(null); + const observedElementsRef = useRef>(new Set()); + + // Flush batched read receipts to API + WebSocket + const flushReadBatch = useCallback(() => { + const ids = Array.from(pendingReadIdsRef.current); + if (ids.length === 0 || !roomId) return; + + pendingReadIdsRef.current.clear(); + + // Persist to database + fetch("/api/messages/read", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ message_ids: ids, room_id: roomId }), + }).catch((err) => + console.error("[useReadReceipts] Failed to persist reads:", err), + ); + + // Broadcast via WebSocket for real-time updates + markAsRead(roomId, ids); + + // Optimistically update local state + if (currentUserId) { + setReadReceipts((prev) => { + const updated = new Map(prev); + const now = new Date(); + for (const id of ids) { + const existing = updated.get(id) || []; + // Don't add duplicate + if (!existing.some((r) => r.userId === currentUserId)) { + updated.set(id, [...existing, { userId: currentUserId, readAt: now }]); + } + } + return updated; + }); + } + }, [roomId, markAsRead, currentUserId]); + + // Schedule a batched flush + const scheduleFlush = useCallback(() => { + if (batchTimerRef.current) { + clearTimeout(batchTimerRef.current); + } + batchTimerRef.current = setTimeout(flushReadBatch, BATCH_DELAY_MS); + }, [flushReadBatch]); + + // Mark a message as read (add to pending batch) + const markMessageRead = useCallback( + (messageId: string) => { + if (!currentUserId) return; + + // Skip if we already know this user read it + const existing = readReceipts.get(messageId); + if (existing?.some((r) => r.userId === currentUserId)) return; + + pendingReadIdsRef.current.add(messageId); + scheduleFlush(); + }, + [currentUserId, readReceipts, scheduleFlush], + ); + + // Setup IntersectionObserver for auto-detecting visible messages + useEffect(() => { + if (!currentUserId) return; + + observerRef.current = new IntersectionObserver( + (entries) => { + for (const entry of entries) { + if (entry.isIntersecting) { + const messageId = (entry.target as HTMLElement).dataset.messageId; + if (messageId) { + markMessageRead(messageId); + } + } + } + }, + { + threshold: 0.5, // At least 50% visible + rootMargin: "0px", + }, + ); + + return () => { + observerRef.current?.disconnect(); + observedElementsRef.current.clear(); + }; + }, [currentUserId, markMessageRead]); + + // Observe/unobserve message elements as messages change + useEffect(() => { + const observer = observerRef.current; + if (!observer || !currentUserId) return; + + // Find all message elements in the DOM + const messageElements = document.querySelectorAll("[data-message-id]"); + const currentElements = new Set(); + + messageElements.forEach((el) => { + currentElements.add(el); + const messageId = (el as HTMLElement).dataset.messageId; + if (!messageId) return; + + // Find the message to check if it's our own + const msg = messages.find((m) => m.id === messageId); + if (!msg || msg.isOwn) return; // Don't track reads on own messages + + // Only observe if not already observed + if (!observedElementsRef.current.has(el)) { + observer.observe(el); + observedElementsRef.current.add(el); + } + }); + + // Unobserve elements that are no longer in the DOM + observedElementsRef.current.forEach((el) => { + if (!currentElements.has(el)) { + observer.unobserve(el); + observedElementsRef.current.delete(el); + } + }); + }, [messages, currentUserId]); + + // Listen for incoming read receipts from other users + useWebSocketMessage("message_read", (msg: WebSocketMessage) => { + const payload = msg.payload as { + roomId: string; + messageIds: string[]; + userId: string; + readAt: number; + }; + + if (payload.roomId !== roomId) return; + + setReadReceipts((prev) => { + const updated = new Map(prev); + const readAt = new Date(payload.readAt); + + for (const messageId of payload.messageIds) { + const existing = updated.get(messageId) || []; + // Don't add duplicate + if (!existing.some((r) => r.userId === payload.userId)) { + updated.set(messageId, [ + ...existing, + { userId: payload.userId, readAt }, + ]); + } + } + + return updated; + }); + }); + + // Fetch existing read receipts when room changes or messages load + useEffect(() => { + if (!roomId || messages.length === 0) return; + + const messageIds = messages.map((m) => m.id).join(","); + + fetch( + `/api/messages/read?room_id=${encodeURIComponent(roomId)}&message_ids=${encodeURIComponent(messageIds)}`, + ) + .then((res) => res.json()) + .then((data) => { + if (data.reads && Array.isArray(data.reads)) { + const receiptsMap = new Map(); + for (const read of data.reads) { + const existing = receiptsMap.get(read.message_id) || []; + existing.push({ + userId: read.user_id, + readAt: new Date(read.read_at), + }); + receiptsMap.set(read.message_id, existing); + } + setReadReceipts(receiptsMap); + } + }) + .catch((err) => + console.error("[useReadReceipts] Failed to fetch reads:", err), + ); + }, [roomId, messages.length]); + + // Cleanup on unmount + useEffect(() => { + return () => { + if (batchTimerRef.current) { + clearTimeout(batchTimerRef.current); + } + // Flush any remaining pending reads + flushReadBatch(); + }; + }, [flushReadBatch]); + + const getReadCount = useCallback( + (messageId: string): number => { + return readReceipts.get(messageId)?.length || 0; + }, + [readReceipts], + ); + + const hasUserRead = useCallback( + (messageId: string, userId: string): boolean => { + return ( + readReceipts.get(messageId)?.some((r) => r.userId === userId) || false + ); + }, + [readReceipts], + ); + + return { + readReceipts, + getReadCount, + hasUserRead, + }; +} diff --git a/src/types/message.ts b/src/types/message.ts index 8e6a9a9..402db1c 100644 --- a/src/types/message.ts +++ b/src/types/message.ts @@ -1,3 +1,8 @@ +export interface ReadReceipt { + userId: string; + readAt: Date; +} + export interface Message { id: string; text: string; @@ -5,4 +10,5 @@ export interface Message { timestamp: Date; isOwn: boolean; isEncrypted: boolean; + readBy?: ReadReceipt[]; } diff --git a/types/websocket.ts b/types/websocket.ts index 352148d..8b3e1a6 100644 --- a/types/websocket.ts +++ b/types/websocket.ts @@ -2,6 +2,7 @@ export type WebSocketServerEventType = | "message" | "message_status_update" + | "message_read" | "room_join" | "room_leave" | "user_typing" @@ -19,6 +20,7 @@ export type WebSocketClientEventType = | "leave_room" | "send_message" | "message_delivered" + | "mark_read" | "typing" | "stop_typing" | "wallet_event" @@ -53,7 +55,14 @@ export interface ChatMessage { avatarUrl?: string content: string createdAt: number - status?: "sending" | "sent" | "delivered" + status?: "sending" | "sent" | "delivered" | "read" +} + +export interface ReadReceiptPayload { + roomId: string + messageIds: string[] + userId: string + readAt: number } export interface RoomMember {