Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 184 additions & 0 deletions app/api/messages/read/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
import { createClient } from "@/lib/supabase/server"
import { type NextRequest, NextResponse } from "next/server"

/**
* POST /api/messages/read
* Mark one or more messages as read by the authenticated user.
* Body: { message_ids: string[], room_id: string }
*/
export async function POST(request: NextRequest) {
try {
const supabase = await createClient()

const {
data: { user },
} = await supabase.auth.getUser()

if (!user) {
return NextResponse.json({ error: "Unauthorized" }, { status: 401 })
}

const body = await request.json()
const { message_ids, room_id } = body

if (!message_ids || !Array.isArray(message_ids) || message_ids.length === 0) {
return NextResponse.json(
{ error: "message_ids must be a non-empty array" },
{ status: 400 },
)
}

if (!room_id) {
return NextResponse.json(
{ error: "room_id is required" },
{ status: 400 },
)
}

// Verify user is an active member of this room
const { data: membership, error: memberErr } = await supabase
.from("room_members")
.select("id, removed_at")
.eq("room_id", room_id)
.eq("user_id", user.id)
.maybeSingle()

if (memberErr) throw memberErr

if (!membership || membership.removed_at) {
return NextResponse.json(
{ error: "Forbidden. You are not an active member of this room." },
{ status: 403 },
)
}

// Upsert read receipts (ON CONFLICT DO NOTHING — don't overwrite earlier reads)
const readRows = message_ids.map((message_id: string) => ({
message_id,
user_id: user.id,
}))

const { data, error } = await supabase
.from("message_reads")
.upsert(readRows, {
onConflict: "message_id,user_id",
ignoreDuplicates: true,
})
.select()

if (error) throw error

return NextResponse.json({
success: true,
reads: data,
})
} catch (error) {
console.error("[v0] POST /api/messages/read error:", error)
return NextResponse.json(
{ error: "Failed to mark messages as read" },
{ status: 500 },
)
}
}

/**
* GET /api/messages/read
* Fetch read receipts for messages in a room.
* Query: ?room_id=...&message_ids=id1,id2,id3
*/
export async function GET(request: NextRequest) {
try {
const supabase = await createClient()

const {
data: { user },
} = await supabase.auth.getUser()

if (!user) {
return NextResponse.json({ error: "Unauthorized" }, { status: 401 })
}

const { searchParams } = new URL(request.url)
const roomId = searchParams.get("room_id")
const messageIdsParam = searchParams.get("message_ids")

if (!roomId) {
return NextResponse.json(
{ error: "room_id is required" },
{ status: 400 },
)
}

// Verify room membership
const { data: membership, error: memberErr } = await supabase
.from("room_members")
.select("id, removed_at")
.eq("room_id", roomId)
.eq("user_id", user.id)
.maybeSingle()

if (memberErr) throw memberErr

if (!membership || membership.removed_at) {
return NextResponse.json(
{ error: "Forbidden. You are not an active member of this room." },
{ status: 403 },
)
}

// Build query — optionally filter by specific message IDs
let query = supabase
.from("message_reads")
.select("message_id, user_id, read_at")

if (messageIdsParam) {
const messageIds = messageIdsParam.split(",").filter(Boolean)
if (messageIds.length > 0) {
query = query.in("message_id", messageIds)
}
}

// Only return reads for messages in the specified room
// We need to join through messages to filter by room_id
// Since Supabase doesn't support filtering through foreign keys in .select,
// we query the message IDs in this room first, then filter reads
const { data: roomMessages, error: roomMsgErr } = await supabase
.from("messages")
.select("id")
.eq("room_id", roomId)

if (roomMsgErr) throw roomMsgErr

const roomMessageIds = roomMessages?.map((m) => m.id) || []

if (roomMessageIds.length === 0) {
return NextResponse.json({ reads: [] })
}

// If specific message_ids were requested, intersect with room messages
let targetIds = roomMessageIds
if (messageIdsParam) {
const requestedIds = new Set(messageIdsParam.split(",").filter(Boolean))
targetIds = roomMessageIds.filter((id) => requestedIds.has(id))
}

if (targetIds.length === 0) {
return NextResponse.json({ reads: [] })
}

const { data: reads, error: readsErr } = await supabase
.from("message_reads")
.select("message_id, user_id, read_at")
.in("message_id", targetIds)

if (readsErr) throw readsErr

return NextResponse.json({ reads: reads || [] })
} catch (error) {
console.error("[v0] GET /api/messages/read error:", error)
return NextResponse.json(
{ error: "Failed to fetch read receipts" },
{ status: 500 },
)
}
}
10 changes: 8 additions & 2 deletions components/MessageItem.tsx
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
import React from 'react';
import { Message } from '@/src/types/message';
import { Message, ReadReceipt } from '@/src/types/message';
import { EncryptionBadge } from './EncryptionBadge';
import { ReadReceiptIndicator } from '@/src/components/ReadReceiptIndicator';

interface Props {
message: Message;
readReceipts?: ReadReceipt[];
}

function formatTimestamp(date: Date): string {
return date.toLocaleTimeString([], { hour: '2-digit', minute: '2-digit' });
}

export const MessageItem: React.FC<Props> = ({ message }) => {
export const MessageItem: React.FC<Props> = ({ message, readReceipts = [] }) => {
return (
<div className={message.isOwn ? 'flex flex-col items-end mb-3' : 'flex flex-col items-start mb-3'}>
<div className={message.isOwn ? 'max-w-xs md:max-w-md px-4 py-2 rounded-2xl text-sm break-words bg-blue-600 text-white' : 'max-w-xs md:max-w-md px-4 py-2 rounded-2xl text-sm break-words bg-gray-100 text-gray-900'}>
Expand All @@ -22,6 +24,10 @@ export const MessageItem: React.FC<Props> = ({ message }) => {
<div className="flex items-center gap-1 mt-1 px-1">
<span className='text-xs text-gray-500'>{formatTimestamp(message.timestamp)}</span>
{message.isEncrypted && <EncryptionBadge />}
<ReadReceiptIndicator
isOwn={message.isOwn}
readReceipts={readReceipts}
/>
</div>
</div>
);
Expand Down
2 changes: 1 addition & 1 deletion lib/websocket/chat-hooks.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ interface RealtimeMessageUpdate {
displayName: string
content: string
createdAt: number
status: "sending" | "sent" | "delivered"
status: "sending" | "sent" | "delivered" | "read"
}

export interface TypingIndicator {
Expand Down
11 changes: 11 additions & 0 deletions lib/websocket/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,17 @@ export class WebSocketClient {
payload: { messageId, roomId },
timestamp: Date.now(),
});

/**
* Mark one or more messages as read in a room.
* Sends a batched read receipt via WebSocket for real-time propagation.
*/
markAsRead = (roomId: string, messageIds: string[]) =>
this.send({
type: "mark_read",
payload: { roomId, messageIds },
timestamp: Date.now(),
});
}

let instance: WebSocketClient | null = null;
Expand Down
7 changes: 7 additions & 0 deletions lib/websocket/hooks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,5 +175,12 @@ export function useWebSocketSend() {
markAsDelivered: useCallback((messageId: string, roomId: string) => {
client.current.markAsDelivered(messageId, roomId);
}, []),
/**
* Mark one or more messages as read in a room.
* Batched for efficiency — accepts an array of message IDs.
*/
markAsRead: useCallback((roomId: string, messageIds: string[]) => {
client.current.markAsRead(roomId, messageIds);
}, []),
};
}
30 changes: 30 additions & 0 deletions lib/websocket/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,36 @@ export function createWebSocketServer(port: number = 3001) {
break
}

case "mark_read": {
const readRoomId = message.payload.roomId
const readMessageIds = message.payload.messageIds
const readUserId = connection.userId

if (!readUserId) {
ws.send(
JSON.stringify({
type: "error",
payload: { message: "Not authenticated" },
timestamp: Date.now(),
}),
)
break
}

// Broadcast read receipt to all room members (except sender)
broadcastToRoom(readRoomId, {
type: "message_read",
payload: {
roomId: readRoomId,
messageIds: readMessageIds,
userId: readUserId,
readAt: Date.now(),
},
timestamp: Date.now(),
}, clientId)
break
}

case "typing": {
const typingRoomId = message.payload.roomId

Expand Down
36 changes: 36 additions & 0 deletions scripts/migrations/add_message_reads_table.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
-- Migration: Add message_reads table for per-user read receipts
-- Run this against your Supabase project via the SQL editor or CLI

CREATE TABLE IF NOT EXISTS message_reads (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
message_id UUID NOT NULL REFERENCES messages(id) ON DELETE CASCADE,
user_id UUID NOT NULL,
read_at TIMESTAMPTZ NOT NULL DEFAULT now(),
UNIQUE (message_id, user_id)
);

-- Enable Row Level Security
ALTER TABLE message_reads ENABLE ROW LEVEL SECURITY;

-- Policy: users can insert their own read receipts
CREATE POLICY "users_insert_own_reads"
ON message_reads FOR INSERT
WITH CHECK (auth.uid() = user_id);

-- Policy: room members can view read receipts for messages in their rooms
CREATE POLICY "room_members_select_reads"
ON message_reads FOR SELECT
USING (
EXISTS (
SELECT 1 FROM messages m
JOIN room_members rm ON rm.room_id = m.room_id
WHERE m.id = message_reads.message_id
AND rm.user_id = auth.uid()
AND rm.removed_at IS NULL
)
);

-- Indexes for performant lookups
CREATE INDEX IF NOT EXISTS idx_message_reads_message_id ON message_reads(message_id);
CREATE INDEX IF NOT EXISTS idx_message_reads_user_id ON message_reads(user_id);
CREATE INDEX IF NOT EXISTS idx_message_reads_message_user ON message_reads(message_id, user_id);
9 changes: 8 additions & 1 deletion src/components/ChatWindow.tsx
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import React, { useCallback, useState } from 'react';
import React, { useCallback, useState, useEffect } from 'react';
import { MessageList } from './MessageList';
import { MessageInput } from './MessageInput';
import { useMessages } from '../hooks/useMessages';
Expand Down Expand Up @@ -33,6 +33,13 @@ export const ChatWindow: React.FC<Props> = ({

useChatSubscription(sdk, addMessage);

// Read receipts — auto-tracks visibility and syncs in real-time
const { readReceipts } = useReadReceipts({
roomId: roomId || "",
messages,
currentUserId,
});

const handleSend = useCallback(async (text: string) => {
addMessage({ text, sender: walletAddress, isOwn: true, isEncrypted: true });
try {
Expand Down
6 changes: 4 additions & 2 deletions src/components/MessageList.tsx
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import React, { useEffect, useRef, useCallback, useState } from 'react';
import { Message } from '../types/message';
import { Message, ReadReceipt } from '../types/message';
import { MessageItem } from '@/components/MessageItem';

interface Props {
Expand All @@ -9,6 +9,7 @@ interface Props {
isLoadingMore?: boolean;
hasMore?: boolean;
firstMessageId?: string | null;
readReceipts?: Map<string, ReadReceipt[]>;
}

export const MessageList: React.FC<Props> = ({
Expand All @@ -18,6 +19,7 @@ export const MessageList: React.FC<Props> = ({
isLoadingMore = false,
hasMore = false,
firstMessageId,
readReceipts,
}) => {
const bottomRef = useRef<HTMLDivElement>(null);
const scrollContainerRef = useRef<HTMLDivElement>(null);
Expand Down Expand Up @@ -128,7 +130,7 @@ export const MessageList: React.FC<Props> = ({
ref={index === 0 ? firstMessageRef : undefined}
data-message-id={msg.id}
>
<MessageItem message={msg} />
<MessageItem message={msg} readReceipts={readReceipts?.get(msg.id)} />
</div>
))}

Expand Down
Loading