diff --git a/CLIENT.exe b/CLIENT.exe new file mode 100644 index 0000000..13ea5cd Binary files /dev/null and b/CLIENT.exe differ diff --git a/cmd/CLIENT/main.go b/cmd/CLIENT/main.go index 98dfab6..2069029 100644 --- a/cmd/CLIENT/main.go +++ b/cmd/CLIENT/main.go @@ -1,147 +1,21 @@ package main import ( - "bufio" "context" - "crypto/sha256" - "encoding/hex" - "encoding/json" - "fmt" - "io" "log" "os" "os/signal" - "strings" - "sync" "syscall" - "time" - webRTC "torrentium/internal/client" + "torrentium/internal/cli" + "torrentium/internal/core" db "torrentium/internal/db" p2p "torrentium/internal/p2p" - "github.com/dustin/go-humanize" - "github.com/ipfs/go-cid" "github.com/joho/godotenv" - dht "github.com/libp2p/go-libp2p-kad-dht" - "github.com/libp2p/go-libp2p/core/host" - "github.com/libp2p/go-libp2p/core/network" - "github.com/pion/webrtc/v3" - "github.com/schollz/progressbar/v3" - - "github.com/libp2p/go-libp2p/core/peer" - - "github.com/multiformats/go-multiaddr" - "github.com/multiformats/go-multihash" -) - -const ( - DefaultPieceSize = 1 << 20 // 1 MiB pieces - MaxProviders = 10 - MaxChunk = 16 * 1024 // 16KiB chunks - MaxParallelDownloads = 3 - PieceTimeout = 300 * time.Second // Timeout for downloading a single piece - RetransmissionTimeout = 5 * time.Second - KeepAliveInterval = 15 * time.Second - PingInterval = 10 * time.Second - MaxRTT = 500 * time.Millisecond - MinDelay = 0 - MaxDelay = 100 * time.Millisecond - ExponentialBackoffBase = 1 * time.Second - MaxBackoff = 32 * time.Second -) - -type Client struct { - host host.Host - dht *dht.IpfsDHT - webRTCPeers map[peer.ID]*webRTC.SimpleWebRTCPeer - peersMux sync.RWMutex - sharingFiles map[string]*FileInfo - activeDownloads map[string]*DownloadState - downloadsMux sync.RWMutex - db *db.Repository - unackedChunks map[string]map[int64]map[int]controlMessage - unackedChunksMux sync.RWMutex - congestionCtrl map[peer.ID]time.Duration - pingTimes map[peer.ID]time.Time - rttMeasurements map[peer.ID][]time.Duration - rttMux sync.Mutex -} - -type FileInfo struct { - FilePath string - Hash string - Size int64 - Name string - PieceSz int64 -} - -type controlMessage struct { - Command string `json:"command"` - CID string `json:"cid,omitempty"` - PieceSize int64 `json:"piece_size,omitempty"` - TotalSize int64 `json:"total_size,omitempty"` - HashHex string `json:"hash_hex,omitempty"` - NumPieces int64 `json:"num_pieces,omitempty"` - Pieces []db.Piece `json:"pieces,omitempty"` - PieceHash string `json:"piece_hash,omitempty"` - Index int64 `json:"index,omitempty"` - Filename string `json:"filename,omitempty"` - ChunkIndex int `json:"chunk_index,omitempty"` - TotalChunks int `json:"total_chunks,omitempty"` - Payload string `json:"payload,omitempty"` - Sequence int `json:"sequence,omitempty"` -} - -type DownloadState struct { - File *os.File - Manifest controlMessage - TotalPieces int - Pieces []db.Piece - Completed chan bool - Progress *progressbar.ProgressBar - PieceStatus []bool // true if piece is downloaded - PieceAssignees map[int]peer.ID - pieceBuffers map[int][][]byte // Buffer to reassemble chunks into pieces - mu sync.Mutex - completedPieces int - pieceTimers map[int]*time.Timer // Timers for each piece - retryCounts map[int]int // Retry counts for exponential backoff -} - -var ( - manifestWaiters = make(map[string]chan controlMessage) - manifestChMu sync.Mutex + host "github.com/libp2p/go-libp2p/core/host" ) -func setupGracefulShutdown(h host.Host) { - ch := make(chan os.Signal, 1) - signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) - go func() { - <-ch - log.Println("Shutting down gracefully...") - _ = h.Close() - os.Exit(0) - }() -} - -func NewClient(h host.Host, d *dht.IpfsDHT, repo *db.Repository) *Client { - c := &Client{ - host: h, - dht: d, - webRTCPeers: make(map[peer.ID]*webRTC.SimpleWebRTCPeer), - sharingFiles: make(map[string]*FileInfo), - activeDownloads: make(map[string]*DownloadState), - db: repo, - unackedChunks: make(map[string]map[int64]map[int]controlMessage), - congestionCtrl: make(map[peer.ID]time.Duration), - pingTimes: make(map[peer.ID]time.Time), - rttMeasurements: make(map[peer.ID][]time.Duration), - } - go c.monitorCongestion() - return c -} - func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -174,1198 +48,20 @@ func main() { setupGracefulShutdown(h) repo := db.NewRepository(DB) - client := NewClient(h, d, repo) - client.startDHTMaintenance() - p2p.RegisterSignalingProtocol(h, client.handleWebRTCOffer) - - client.commandLoop() -} - -func (c *Client) commandLoop() { - scanner := bufio.NewScanner(os.Stdin) - c.printInstructions() - for { - fmt.Print("> ") - if !scanner.Scan() { - break - } - parts := strings.Fields(scanner.Text()) - if len(parts) == 0 { - continue - } - cmd, args := parts[0], parts[1:] - var err error - switch cmd { - case "help": - c.printInstructions() - case "add": - if len(args) != 1 { - fmt.Println("Usage: add ") - } else { - err = c.addFile(args[0]) - } - case "list": - c.listLocalFiles() - case "search": - if len(args) != 1 { - fmt.Println("Usage: search ") - } else { - c.checkConnectionHealth() - if strings.HasPrefix(args[0], "bafy") || strings.HasPrefix(args[0], "Qm") { - err = c.enhancedSearchByCID(args[0]) - } else { - err = c.searchByText(args[0]) - } - } - case "download": - if len(args) != 1 { - fmt.Println("Usage: download ") - } else { - err = c.downloadFile(args[0]) - } - case "peers": - c.listConnectedPeers() - case "announce": - if len(args) != 1 { - fmt.Println("Usage: announce ") - } else { - err = c.announceFile(args[0]) - } - case "health": - c.checkConnectionHealth() - case "debug": - c.debugNetworkStatus() - case "exit": - return - default: - fmt.Println("Unknown command. Type 'help' for available commands.") - } - if err != nil { - log.Printf("Error: %v", err) - } - } -} + client := core.NewClient(h, d, repo) + client.StartDHTMaintenance() + p2p.RegisterSignalingProtocol(h, client.HandleWebRTCOffer) -func (c *Client) printInstructions() { - peerID := c.host.ID().String() - minWidth := len(" Your Peer ID: " + peerID) + 4 // 4 for border characters - width := 80 - if minWidth > width { - width = minWidth + 10 - } - - // Create the box - topBorder := "┌" + strings.Repeat("─", width-2) + "┐" - bottomBorder := "└" + strings.Repeat("─", width-2) + "┘" - - // Helper function to create a centered line - centerLine := func(text string) string { - if len(text) >= width-4 { - return "│ " + text[:width-6] + "... │" - } - padding := (width - 4 - len(text)) / 2 - leftPad := strings.Repeat(" ", padding) - rightPad := strings.Repeat(" ", width-4-len(text)-padding) - return "│ " + leftPad + text + rightPad + " │" - } - - // Helper function to create a left-aligned line - leftLine := func(text string) string { - if len(text) >= width-4 { - return "│ " + text[:width-6] + "... │" - } - rightPad := strings.Repeat(" ", width-4-len(text)) - return "│ " + text + rightPad + " │" - } - - // Print the fancy box - fmt.Println() - fmt.Println(topBorder) - fmt.Println(centerLine("DECENTRALIZED P2P FILE SHARING")) - fmt.Println("│" + strings.Repeat("─", width-2) + "│") - fmt.Println(centerLine("Available Commands")) - fmt.Println("│" + strings.Repeat(" ", width-2) + "│") - - // Commands with descriptions - commands := [][]string{ - {"add ", "Share a file on the network"}, - {"list", "List your shared files"}, - {"search ", "Search by CID or filename text"}, - {"download ", "Download a file by CID"}, - {"peers", "Show connected peers"}, - {"announce ", "Re-announce a file to DHT"}, - {"health", "Check connection health"}, - {"debug", "Show detailed network debug info"}, - {"help", "Show this help"}, - {"exit", "Exit the application"}, - } - - for _, cmd := range commands { - cmdText := fmt.Sprintf(" %-20s - %s", cmd[0], cmd[1]) - fmt.Println(leftLine(cmdText)) - } - - fmt.Println("│" + strings.Repeat(" ", width-2) + "│") - fmt.Println("│" + strings.Repeat("─", width-2) + "│") - - // Network info - peerID = c.host.ID().String() - fmt.Println(leftLine(" Your Peer ID: " + peerID)) - - // Show listening addresses - addrs := c.host.Addrs() - fmt.Println(leftLine(" Listening on:")) - - for i, addr := range addrs { - if i >= 3 { // Limit to 3 addresses to fit in box - moreAddrs := len(addrs) - 3 - fmt.Println(leftLine(fmt.Sprintf(" ... and %d more", moreAddrs))) - break - } - addrStr := addr.String() - if len(addrStr) > width-8 { - addrStr = addrStr[:width-11] + "..." - } - fmt.Println(leftLine(" " + addrStr)) - } - - fmt.Println(bottomBorder) - fmt.Println() + cli.Execute(client) } -func (c *Client) debugNetworkStatus() { - fmt.Println("\n=== Network Debug Info ===") - fmt.Printf("Our Peer ID: %s\n", c.host.ID()) - fmt.Printf("Our Addresses:\n") - for _, addr := range c.host.Addrs() { - fmt.Printf(" %s/p2p/%s\n", addr, c.host.ID()) - } - - peers := c.host.Network().Peers() - fmt.Printf("\nConnected Peers (%d):\n", len(peers)) - for i, peerID := range peers { - conn := c.host.Network().ConnsToPeer(peerID) - if len(conn) > 0 { - fmt.Printf(" %d. %s\n", i+1, peerID) - fmt.Printf(" Address: %s\n", conn[0].RemoteMultiaddr()) - } - } - - routingTableSize := c.dht.RoutingTable().Size() - fmt.Printf("\nDHT Routing Table Size: %d\n", routingTableSize) - - fmt.Printf("\nShared Files (%d):\n", len(c.sharingFiles)) - for cid, fileInfo := range c.sharingFiles { - fmt.Printf(" CID: %s\n", cid) - fmt.Printf(" File: %s\n", fileInfo.Name) - fmt.Printf(" ---\n") - } -} - -func (c *Client) announceFile(cidStr string) error { - fileCID, err := cid.Decode(cidStr) - if err != nil { - return fmt.Errorf("invalid CID: %w", err) - } - fmt.Printf("Re-announcing CID %s to DHT...\n", cidStr) - ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) - defer cancel() - if err := c.dht.Provide(ctx, fileCID, true); err != nil { - return fmt.Errorf("failed to announce: %w", err) - } - fmt.Println(" - Successfully announced to DHT") - return nil -} - -func (c *Client) startDHTMaintenance() { - go func() { - ticker := time.NewTicker(10 * time.Minute) - defer ticker.Stop() - for range ticker.C { - log.Println("Performing DHT maintenance...") - c.dht.RefreshRoutingTable() - peers := c.host.Network().Peers() - log.Printf("Connected to %d peers", len(peers)) - if len(peers) < 5 { - log.Println("Low peer count; re-bootstrapping...") - ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) - _ = p2p.Bootstrap(ctx, c.host, c.dht) - cancel() - } - } - }() -} - -func (c *Client) checkConnectionHealth() { - peers := c.host.Network().Peers() - fmt.Printf("\n=== Connection Health ===\n") - fmt.Printf("Connected peers: %d\n", len(peers)) - if len(peers) < 3 { - fmt.Println(" - Warning: Low peer count. Consider restarting or checking network connectivity.") - } else { - fmt.Println(" - Good peer connectivity") - } - - routingTableSize := c.dht.RoutingTable().Size() - fmt.Printf("DHT routing table size: %d\n", routingTableSize) - if routingTableSize < 10 { - fmt.Println(" - Warning: Small DHT routing table. File discovery may be limited.") - } else { - fmt.Println(" - Good DHT connectivity") - } -} - -func (c *Client) addFile(filePath string) error { - ctx := context.Background() - f, err := os.Open(filePath) - if err != nil { - return fmt.Errorf("failed to open file: %w", err) - } - defer f.Close() - - info, err := f.Stat() - if err != nil { - return fmt.Errorf("failed to get file info: %w", err) - } - - hasher := sha256.New() - if _, err := io.Copy(hasher, f); err != nil { - return fmt.Errorf("failed to calculate hash: %w", err) - } - - fileHashBytes := hasher.Sum(nil) - fileHashStr := hex.EncodeToString(fileHashBytes) - mhash, err := multihash.Encode(fileHashBytes, multihash.SHA2_256) - if err != nil { - return fmt.Errorf("failed to create multihash: %w", err) - } - - fileCID := cid.NewCidV1(cid.Raw, mhash) - - // Create pieces manifest - pieceSz := int64(DefaultPieceSize) - numPieces := (info.Size() + pieceSz - 1) / pieceSz - if _, err := f.Seek(0, io.SeekStart); err != nil { - return err - } - - for idx := int64(0); idx < numPieces; idx++ { - offset := idx * pieceSz - size := min64(pieceSz, info.Size()-offset) - h := sha256.New() - if _, err := io.CopyN(h, f, size); err != nil { - return err - } - ph := hex.EncodeToString(h.Sum(nil)) - if err := c.db.UpsertPiece(ctx, fileCID.String(), idx, offset, size, ph, true); err != nil { - return err - } - } - - if err := c.db.AddLocalFile(ctx, fileCID.String(), info.Name(), info.Size(), filePath, fileHashStr); err != nil { - return fmt.Errorf("failed to store file metadata: %w", err) - } - - c.sharingFiles[fileCID.String()] = &FileInfo{ - FilePath: filePath, - Hash: fileHashStr, - Size: info.Size(), - Name: info.Name(), - PieceSz: pieceSz, - } - - log.Printf("Announcing file %s with CID %s to DHT...", info.Name(), fileCID.String()) - provideCtx, cancel := context.WithTimeout(ctx, 60*time.Second) - defer cancel() - if err := c.dht.Provide(provideCtx, fileCID, true); err != nil { - log.Printf(" - Warning: Failed to announce to DHT: %v", err) - } else { - log.Println(" - Successfully announced file to DHT") - } - - fmt.Printf("✓ File '%s' is now being shared\n", info.Name()) - fmt.Printf(" CID: %s\n", fileCID.String()) - fmt.Printf(" Hash: %s\n", fileHashStr) - fmt.Printf(" Size: %s\n", humanize.Bytes(uint64(info.Size()))) - return nil -} - -func (c *Client) listLocalFiles() { - ctx := context.Background() - files, err := c.db.GetLocalFiles(ctx) - if err != nil { - log.Printf("Error retrieving files: %v", err) - return - } - if len(files) == 0 { - fmt.Println(" - No files being shared.") - return - } - fmt.Println("\n=== Your Shared Files ===") - for _, file := range files { - fmt.Printf("Name: %s\n", file.Filename) - fmt.Printf(" CID: %s\n", file.CID) - fmt.Printf(" Size: %s\n", humanize.Bytes(uint64(file.FileSize))) - fmt.Printf(" Path: %s\n", file.FilePath) - fmt.Println(" ---") - } -} - -func (c *Client) searchByText(q string) error { - ctx := context.Background() - matches, err := c.db.SearchByFilename(ctx, q) - if err != nil { - return err - } - if len(matches) == 0 { - fmt.Printf("Searching for files containing '%s'...\n", q) - fmt.Println("Note: Direct filename search requires content indexing.") - fmt.Println("Try using the CID if you have it, or check with known peers.") - return nil - } - fmt.Printf("Local index matches for '%s':\n", q) - for _, m := range matches { - fmt.Printf("- %s CID:%s\n", m.Filename, m.CID) - } - return nil -} - -func (c *Client) enhancedSearchByCID(cidStr string) error { - fileCID, err := cid.Decode(cidStr) - if err != nil { - return fmt.Errorf("invalid CID: %w", err) - } - fmt.Printf("Searching for CID: %s\n", fileCID.String()) - providers, err := c.findProvidersWithTimeout(fileCID, 60*time.Second, MaxProviders) - if err != nil { - return fmt.Errorf("provider search failed: %w", err) - } - - if len(providers) == 0 { - fmt.Println("No providers found for this CID") - fmt.Println("This could mean:") - fmt.Println(" - The file is not being shared") - fmt.Println(" - The provider is offline") - fmt.Println(" - Network connectivity issues") - fmt.Println(" - DHT routing problem") - return nil - } - - fmt.Printf("Found %d provider(s):\n", len(providers)) - for i, provider := range providers { - fmt.Printf(" %d. %s\n", i+1, provider.ID) - if c.host.Network().Connectedness(provider.ID) == network.Connected { - fmt.Printf(" - Already connected\n") - } else { - fmt.Printf(" - Not connected\n") - } - } - return nil -} - -func (c *Client) findProvidersWithTimeout(id cid.Cid, timeout time.Duration, maxProviders int) ([]peer.AddrInfo, error) { - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - providersChan := c.dht.FindProvidersAsync(ctx, id, maxProviders) - var providers []peer.AddrInfo - var totalFound int - - done := make(chan struct{}) +func setupGracefulShutdown(h host.Host) { + ch := make(chan os.Signal, 1) + signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) go func() { - defer close(done) - for provider := range providersChan { - totalFound++ - if provider.ID != c.host.ID() { - providers = append(providers, provider) - fmt.Printf(" - Found provider %d: %s\n", len(providers), provider.ID) - if len(providers) >= maxProviders { - break - } - } - } - }() - - select { - case <-done: - fmt.Printf("Provider search completed. Found %d total providers, %d unique external providers\n", - totalFound, len(providers)) - case <-time.After(timeout): - fmt.Printf("Provider search timed out. Found %d providers so far\n", len(providers)) - } - - return providers, nil -} - -func (c *Client) connectToPeer(multiaddrStr string) error { - addr, err := multiaddr.NewMultiaddr(multiaddrStr) - if err != nil { - return fmt.Errorf("invalid multiaddr: %w", err) - } - - peerInfo, err := peer.AddrInfoFromP2pAddr(addr) - if err != nil { - return fmt.Errorf("failed to parse peer info: %w", err) - } - - fmt.Printf("Attempting to connect to peer %s...\n", peerInfo.ID) - - ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) - defer cancel() - - if err := c.host.Connect(ctx, *peerInfo); err != nil { - return fmt.Errorf("failed to connect: %w", err) - } - - fmt.Printf(" - Successfully connected to peer %s\n", peerInfo.ID) - - c.host.Peerstore().AddAddrs(peerInfo.ID, peerInfo.Addrs, time.Hour) - - return nil -} - -func (c *Client) listConnectedPeers() { - peers := c.host.Network().Peers() - fmt.Printf("\n=== Connected Peers (%d) ===\n", len(peers)) - for _, peerID := range peers { - conn := c.host.Network().ConnsToPeer(peerID) - if len(conn) > 0 { - fmt.Printf("Peer: %s\n", peerID) - fmt.Printf(" Address: %s\n", conn[0].RemoteMultiaddr()) - } - } -} - -func (c *Client) downloadFile(cidStr string) error { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - fileCID, err := cid.Decode(cidStr) - if err != nil { - return fmt.Errorf("invalid CID: %w", err) - } - - fmt.Printf("Looking for providers of CID: %s\n", fileCID.String()) - providers, err := c.findProvidersWithTimeout(fileCID, 60*time.Second, MaxProviders) - if err != nil { - return fmt.Errorf("provider search failed: %w", err) - } - - if len(providers) == 0 { - return fmt.Errorf("no providers found") - } - - fmt.Printf("Found %d providers. Getting file manifest...\n", len(providers)) - relayAddrStr := "/dns4/relay-torrentium.onrender.com/tcp/443/wss/p2p/12D3KooWKsLZ7VmZTq7qBHj2cv4DczbEoNFLLDaLLk9ADVxDnqS6" - - var manifest controlMessage - var firstPeer *webRTC.SimpleWebRTCPeer - - for _, p := range providers { - // Try direct connection first - peerConn, err := c.initiateWebRTCConnectionWithRetry(p.ID, 1) - if err != nil { - // Fallback: relay connection - log.Println("🔁 Direct connection failed, trying relay...") - - // Build circuit address - circuitStr := fmt.Sprintf("%s/p2p-circuit/p2p/%s", relayAddrStr, p.ID.String()) - circuitMaddr, err := multiaddr.NewMultiaddr(circuitStr) - if err != nil { - log.Printf("Invalid circuit multiaddr: %v", err) - continue - } - targetInfo := peer.AddrInfo{ID: p.ID, Addrs: []multiaddr.Multiaddr{circuitMaddr}} - - if err := c.host.Connect(ctx, targetInfo); err != nil { - log.Printf("❌ Relay dial failed: %v", err) - continue - } - - log.Printf("✅ Relay dial to %s successful", p.ID) - - //perform WebRTC handshake - peerConn, err = c.initiateWebRTCConnectionWithRetry(p.ID, 1) - if err != nil { - log.Printf("⚠ WebRTC connection via relay failed: %v", err) - continue - } - } - - // If WebRTC connected, fetch manifest - if peerConn != nil { - manifest, err = c.requestManifest(peerConn, cidStr) - if err == nil { - firstPeer = peerConn - break - } - peerConn.Close() - } - } - - if firstPeer == nil { - return fmt.Errorf("failed to connect to any provider to get manifest") - } - - // Store pieces in the database - for _, piece := range manifest.Pieces { - if err := c.db.UpsertPiece(ctx, cidStr, piece.Index, piece.Offset, piece.Size, piece.Hash, false); err != nil { - log.Printf("Failed to store piece info for download: %v", err) - } - } - - downloadPath := fmt.Sprintf("%s.download", manifest.Filename) - finalPath := manifest.Filename - localFile, err := os.Create(downloadPath) - if err != nil { - firstPeer.Close() - return fmt.Errorf("failed to create file: %w", err) - } - - pieces, _ := c.db.GetPieces(ctx, cidStr) - if len(pieces) == 0 { - return fmt.Errorf("failed to retrieve piece information after receiving manifest") - } - - state := &DownloadState{ - File: localFile, - Manifest: manifest, - TotalPieces: int(manifest.NumPieces), - Pieces: pieces, - Completed: make(chan bool, 1), - Progress: progressbar.DefaultBytes(manifest.TotalSize, "downloading..."), - PieceStatus: make([]bool, int(manifest.NumPieces)), - PieceAssignees: make(map[int]peer.ID), - pieceBuffers: make(map[int][][]byte), - completedPieces: 0, - pieceTimers: make(map[int]*time.Timer), - retryCounts: make(map[int]int), - } - c.downloadsMux.Lock() - c.activeDownloads[cidStr] = state - c.downloadsMux.Unlock() - - // Parallel downloads - var wg sync.WaitGroup - peersToUse := providers - if len(peersToUse) > MaxParallelDownloads { - peersToUse = peersToUse[:MaxParallelDownloads] - } - chunksPerPeer := state.TotalPieces / len(peersToUse) - - for i, p := range peersToUse { - start := i * chunksPerPeer - end := start + chunksPerPeer - if i == len(peersToUse)-1 { - end = state.TotalPieces - } - - wg.Add(1) - go func(peerInfo peer.AddrInfo, startPiece, endPiece int) { - defer wg.Done() - var peerConn *webRTC.SimpleWebRTCPeer - if peerInfo.ID.String() == firstPeer.GetSignalingStream().Conn().RemotePeer().String() { - peerConn = firstPeer - } else { - var connErr error - peerConn, connErr = c.initiateWebRTCConnectionWithRetry(peerInfo.ID, 2) - if connErr != nil { - log.Printf("Chunk peer connect failed: %v", connErr) - return - } - defer peerConn.Close() - } - c.downloadChunksFromPeer(peerConn, state, startPiece, endPiece) - }(p, start, end) - } - - <-state.Completed - localFile.Close() - - if err := os.Rename(downloadPath, finalPath); err != nil { - return fmt.Errorf("failed to rename file: %w", err) - } - - fmt.Printf("\n✅ Download complete. File saved as %s\n", finalPath) - return nil -} - -func (c *Client) downloadChunksFromPeer(peer *webRTC.SimpleWebRTCPeer, state *DownloadState, startPiece, endPiece int) { - for i := startPiece; i < endPiece; i++ { - state.mu.Lock() - if state.PieceStatus[i] { - state.mu.Unlock() - continue - } - state.PieceAssignees[i] = peer.GetSignalingStream().Conn().RemotePeer() - state.mu.Unlock() - - req := controlMessage{ - Command: "REQUEST_PIECE", - CID: state.Manifest.CID, - Index: int64(i), - } - - state.mu.Lock() //piece timeout - state.pieceTimers[i] = time.AfterFunc(PieceTimeout, func() { - log.Printf("Piece %d timed out, re-requesting...", i) - c.reRequestPiece(state, i) - }) - state.mu.Unlock() - - if err := peer.SendJSONReliable(req); err != nil { - log.Printf("Failed to request piece %d from %s: %v", i, peer.GetSignalingStream().Conn().RemotePeer(), err) - return - } - } -} - -func (c *Client) reRequestPiece(state *DownloadState, pieceIndex int) { - // Re-assign to another peer with backoff - retryCount := state.retryCounts[pieceIndex] - backoff := ExponentialBackoffBase * time.Duration(1< MaxBackoff { - backoff = MaxBackoff - } - time.AfterFunc(backoff, func() { - // For simplicity, we'll just re-request from any connected peer. - // A more advanced implementation would select a new peer. - for _, p := range c.webRTCPeers { - req := controlMessage{ - Command: "REQUEST_PIECE", - CID: state.Manifest.CID, - Index: int64(pieceIndex), - } - if err := p.SendJSONReliable(req); err == nil { - log.Printf("Re-requested piece %d from a different peer.", pieceIndex) - return - } - } - log.Printf("Failed to re-request piece %d: no available peers.", pieceIndex) - }) - state.retryCounts[pieceIndex]++ -} - -func (c *Client) requestManifest(peer *webRTC.SimpleWebRTCPeer, cidStr string) (controlMessage, error) { - req := controlMessage{Command: "REQUEST_MANIFEST", CID: cidStr} - if err := peer.SendJSONReliable(req); err != nil { - return controlMessage{}, err - } - - manifestCh := make(chan controlMessage, 1) - manifestChMu.Lock() - manifestWaiters[cidStr] = manifestCh - manifestChMu.Unlock() - - defer func() { - manifestChMu.Lock() - delete(manifestWaiters, cidStr) - manifestChMu.Unlock() + <-ch + log.Println("Shutting down gracefully...") + _ = h.Close() + os.Exit(0) }() - - select { - case manifest := <-manifestCh: - return manifest, nil - case <-time.After(30 * time.Second): - return controlMessage{}, fmt.Errorf("timed out waiting for manifest") - } -} - -func (c *Client) initiateWebRTCConnectionWithRetry(targetPeerID peer.ID, maxRetries int) (*webRTC.SimpleWebRTCPeer, error) { - - // First, test ICE connectivity - log.Printf("Testing ICE connectivity before attempting WebRTC connection...") - if err := webRTC.TestICEConnectivity(); err != nil { - log.Printf("ICE connectivity test failed: %v", err) - log.Printf("Warning: WebRTC connections may fail due to network restrictions") - } else { - log.Printf("ICE connectivity test passed") - } - - var lastErr error - for attempt := 1; attempt <= maxRetries; attempt++ { - if attempt > 1 { - backoff := time.Duration(1< 0 { - info = pinfo - } else { - - lastErr = fmt.Errorf("dht lookup failed: %w", err) - continue - } - - if len(info.Addrs) == 0 { - lastErr = fmt.Errorf("peer %s has no known multiaddrs", targetPeerID) - continue - } - - c.host.Peerstore().AddAddrs(info.ID, info.Addrs, time.Hour) - fmt.Println(c.host.Peerstore()) - - if c.host.Network().Connectedness(info.ID) != network.Connected { - connectCtx, connectCancel := context.WithTimeout(context.Background(), 20*time.Second) - err := c.host.Connect(connectCtx, info) - connectCancel() - if err != nil { - log.Printf("failed to connect to peer %s: %v", info.ID, err) - fmt.Printf("DHT lookup failed: %v. This could be a network issue now trying connection using relays.\n", err) - return nil, err - } - - fmt.Printf("Successfully connected to peer %s\n", info.ID) - - time.Sleep(1 * time.Second) - } - - webrtcPeer, err := webRTC.NewSimpleWebRTCPeer(c.onDataChannelMessage, c.onWebRTCPeerClose) - if err != nil { - lastErr = err - return nil, err - } - - offer, err := webrtcPeer.CreateOffer() - if err != nil { - webrtcPeer.Close() - lastErr = err - return nil, err - } - - streamCtx, streamCancel := context.WithTimeout(context.Background(), 30*time.Second) - s, err := c.host.NewStream(streamCtx, targetPeerID, p2p.SignalingProtocolID) - streamCancel() - if err != nil { - webrtcPeer.Close() - lastErr = err - return nil, err - - } - - webrtcPeer.SetSignalingStream(s) - encoder := json.NewEncoder(s) - decoder := json.NewDecoder(s) - - offerMsg := map[string]string{"type": "offer", "data": offer} - if err := encoder.Encode(offerMsg); err != nil { - webrtcPeer.Close() - lastErr = err - continue - } - - var answerMsg map[string]string - if err := decoder.Decode(&answerMsg); err != nil { - webrtcPeer.Close() - lastErr = fmt.Errorf("failed to decode answer: %w", err) - continue - } - - if answerMsg["type"] == "error" { - webrtcPeer.Close() - lastErr = fmt.Errorf("peer returned error: %s", answerMsg["data"]) - continue - } - if answerMsg["type"] != "answer" { - webrtcPeer.Close() - lastErr = fmt.Errorf("expected answer, got: %s", answerMsg["type"]) - continue - } - - if err := webrtcPeer.HandleAnswer(answerMsg["data"]); err != nil { - webrtcPeer.Close() - lastErr = err - continue - } - - if err := webrtcPeer.WaitForConnection(90 * time.Second); err != nil { - webrtcPeer.Close() - lastErr = fmt.Errorf("WebRTC connection failed: %w", err) - continue - } - - //wait for the data channels to be ready - if err := webrtcPeer.WaitForDataChannels(10 * time.Second); err != nil { - webrtcPeer.Close() - lastErr = fmt.Errorf("data channels did not open in time: %w", err) - continue - } - - fmt.Printf("WebRTC connection established with %s\n", targetPeerID) - c.peersMux.Lock() - c.webRTCPeers[targetPeerID] = webrtcPeer - c.peersMux.Unlock() - - return webrtcPeer, nil - } - return nil, lastErr -} - -func (c *Client) handleWebRTCOffer(offer, remotePeerID string, s network.Stream) (string, error) { - peerID, err := peer.Decode(remotePeerID) - if err != nil { - return "", fmt.Errorf("invalid peer ID: %w", err) - } - - webrtcPeer, err := webRTC.NewSimpleWebRTCPeer(c.onDataChannelMessage, c.onWebRTCPeerClose) - if err != nil { - return "", err - } - - webrtcPeer.SetSignalingStream(s) - - answer, err := webrtcPeer.HandleOffer(offer) - if err != nil { - webrtcPeer.Close() - return "", err - } - - c.peersMux.Lock() - c.webRTCPeers[peerID] = webrtcPeer - c.peersMux.Unlock() - - return answer, nil -} - -func (c *Client) onDataChannelMessage(msg webrtc.DataChannelMessage, peer *webRTC.SimpleWebRTCPeer) { - if !msg.IsString { - log.Printf("Received unexpected binary message, expecting JSON.") - return - } - if len(msg.Data) == 0 { - return - } - var ctrl controlMessage - if err := json.Unmarshal(msg.Data, &ctrl); err != nil { - var ping map[string]string - if err2 := json.Unmarshal(msg.Data, &ping); err2 == nil { - if ping["type"] == "ping" { - // Respond to ping - pong := map[string]string{"type": "pong"} - peer.SendJSONReliable(pong) - return - } else if ping["type"] == "pong" { - c.handlePong(peer.GetSignalingStream().Conn().RemotePeer()) - return - } - } - log.Printf("Failed to unmarshal control message: %v. Raw message: %s", err, string(msg.Data)) - return - } - c.handleControlMessage(ctrl, peer) -} - -func (c *Client) handleControlMessage(ctrl controlMessage, peer *webRTC.SimpleWebRTCPeer) { - ctx := context.Background() - switch ctrl.Command { - case "REQUEST_MANIFEST": - c.handleManifestRequest(ctx, ctrl, peer) - case "MANIFEST": - manifestChMu.Lock() - if ch, ok := manifestWaiters[ctrl.CID]; ok { - ch <- ctrl - } - manifestChMu.Unlock() - case "REQUEST_PIECE": - go c.handlePieceRequest(ctx, ctrl, peer) - case "PIECE_CHUNK": - c.handlePieceChunk(ctrl, peer) - case "CHUNK_ACK": - c.handleChunkAck(ctrl) - default: - //do nothing - } -} - -func (c *Client) handlePieceChunk(ctrl controlMessage, peer *webRTC.SimpleWebRTCPeer) { - c.downloadsMux.RLock() - state, ok := c.activeDownloads[ctrl.CID] - c.downloadsMux.RUnlock() - if !ok { - return - } - - // Send an ACK back to the sender using reliable channel - ackMsg := controlMessage{ - Command: "CHUNK_ACK", - CID: ctrl.CID, - Index: ctrl.Index, - Sequence: ctrl.Sequence, - } - if err := peer.SendJSONReliable(ackMsg); err != nil { - log.Printf("Failed to send ACK for chunk %d of piece %d: %v", ctrl.Sequence, ctrl.Index, err) - } - - state.mu.Lock() - defer state.mu.Unlock() - - if state.PieceStatus[ctrl.Index] { - return // Already have this piece - } - - if state.pieceBuffers[int(ctrl.Index)] == nil { - state.pieceBuffers[int(ctrl.Index)] = make([][]byte, ctrl.TotalChunks) - } - - chunkData, err := hex.DecodeString(ctrl.Payload) - if err != nil { - log.Printf("Failed to decode chunk payload: %v", err) - return - } - - state.pieceBuffers[int(ctrl.Index)][ctrl.ChunkIndex] = chunkData - _ = state.Progress.Add(len(chunkData)) - - // Check if piece is complete - isComplete := true - var pieceSize int - for _, chunk := range state.pieceBuffers[int(ctrl.Index)] { - if chunk == nil { - isComplete = false - break - } - pieceSize += len(chunk) - } - - if isComplete { - //Stop the timer for this piece - if timer, ok := state.pieceTimers[int(ctrl.Index)]; ok { - timer.Stop() - delete(state.pieceTimers, int(ctrl.Index)) - } - - // Reassemble and write piece - pieceData := make([]byte, 0, pieceSize) - for _, chunk := range state.pieceBuffers[int(ctrl.Index)] { - pieceData = append(pieceData, chunk...) - } - - //Verify piece hash - h := sha256.New() - h.Write(pieceData) - hash := hex.EncodeToString(h.Sum(nil)) - - if hash != state.Pieces[ctrl.Index].Hash { - log.Printf("Piece %d hash mismatch", ctrl.Index) - state.pieceBuffers[int(ctrl.Index)] = nil // Clear buffer to retry - return - } - - if _, err := state.File.WriteAt(pieceData, state.Pieces[ctrl.Index].Offset); err != nil { - log.Printf("Failed to write piece %d to file: %v", ctrl.Index, err) - return - } - - state.PieceStatus[ctrl.Index] = true - state.completedPieces++ - delete(state.pieceBuffers, int(ctrl.Index)) - - if state.completedPieces == state.TotalPieces { - state.Completed <- true - } - } -} - -func (c *Client) handleChunkAck(ctrl controlMessage) { - c.unackedChunksMux.Lock() - defer c.unackedChunksMux.Unlock() - if _, ok := c.unackedChunks[ctrl.CID]; ok { - if _, ok := c.unackedChunks[ctrl.CID][ctrl.Index]; ok { - delete(c.unackedChunks[ctrl.CID][ctrl.Index], ctrl.Sequence) - if len(c.unackedChunks[ctrl.CID][ctrl.Index]) == 0 { - delete(c.unackedChunks[ctrl.CID], ctrl.Index) - } - } - if len(c.unackedChunks[ctrl.CID]) == 0 { - delete(c.unackedChunks, ctrl.CID) - } - } -} - -func (c *Client) handlePieceRequest(ctx context.Context, ctrl controlMessage, peer *webRTC.SimpleWebRTCPeer) { - pieces, err := c.db.GetPieces(ctx, ctrl.CID) - if err != nil || int(ctrl.Index) >= len(pieces) { - log.Printf("Invalid piece request for CID %s, index %d", ctrl.CID, ctrl.Index) - return - } - - fileInfo, err := c.db.GetLocalFileByCID(ctx, ctrl.CID) - if err != nil { - log.Printf("File not found for piece request: %s", ctrl.CID) - return - } - - file, err := os.Open(fileInfo.FilePath) - if err != nil { - log.Printf("Failed to open file for piece request: %v", err) - return - } - defer file.Close() - - piece := pieces[ctrl.Index] - pieceBuffer := make([]byte, piece.Size) - _, err = file.ReadAt(pieceBuffer, piece.Offset) - if err != nil { - log.Printf("Failed to read piece %d: %v", ctrl.Index, err) - return - } - - totalChunks := (len(pieceBuffer) + MaxChunk - 1) / MaxChunk - for i := 0; i < totalChunks; i++ { - start := i * MaxChunk - end := start + MaxChunk - if end > len(pieceBuffer) { - end = len(pieceBuffer) - } - chunk := pieceBuffer[start:end] - - chunkMsg := controlMessage{ - Command: "PIECE_CHUNK", - CID: ctrl.CID, - Index: ctrl.Index, - ChunkIndex: i, - TotalChunks: totalChunks, - Payload: hex.EncodeToString(chunk), - Sequence: i, - } - - // Store the sent chunk and start a retransmission timer - c.unackedChunksMux.Lock() - if c.unackedChunks[ctrl.CID] == nil { - c.unackedChunks[ctrl.CID] = make(map[int64]map[int]controlMessage) - } - if c.unackedChunks[ctrl.CID][ctrl.Index] == nil { - c.unackedChunks[ctrl.CID][ctrl.Index] = make(map[int]controlMessage) - } - c.unackedChunks[ctrl.CID][ctrl.Index][i] = chunkMsg - c.unackedChunksMux.Unlock() - time.AfterFunc(RetransmissionTimeout, func() { c.retransmitChunk(peer, chunkMsg) }) - - if err := peer.SendJSON(chunkMsg); err != nil { - log.Printf("Failed to send chunk %d of piece %d: %v", i, ctrl.Index, err) - return - } - delay := c.congestionCtrl[peer.GetSignalingStream().Conn().RemotePeer()] - time.Sleep(delay) - } -} - -func (c *Client) retransmitChunk(peer *webRTC.SimpleWebRTCPeer, chunkMsg controlMessage) { - c.unackedChunksMux.RLock() - defer c.unackedChunksMux.RUnlock() - if _, ok := c.unackedChunks[chunkMsg.CID]; ok { - if _, ok := c.unackedChunks[chunkMsg.CID][chunkMsg.Index]; ok { - if _, ok := c.unackedChunks[chunkMsg.CID][chunkMsg.Index][chunkMsg.Sequence]; ok { - log.Printf("Retransmitting chunk %d of piece %d", chunkMsg.Sequence, chunkMsg.Index) - if err := peer.SendJSON(chunkMsg); err != nil { - log.Printf("Failed to retransmit chunk %d of piece %d: %v", chunkMsg.Sequence, chunkMsg.Index, err) - } - // Reset timer - time.AfterFunc(RetransmissionTimeout, func() { c.retransmitChunk(peer, chunkMsg) }) - } - } - } -} - -func (c *Client) handleManifestRequest(ctx context.Context, ctrl controlMessage, peer *webRTC.SimpleWebRTCPeer) { - localFile, err := c.db.GetLocalFileByCID(ctx, ctrl.CID) - if err != nil { - log.Printf("File not found for manifest: %s", ctrl.CID) - return - } - - pieces, err := c.db.GetPieces(ctx, ctrl.CID) - if err != nil { - log.Printf("Error getting pieces: %v", err) - return - } - - manifest := controlMessage{ - Command: "MANIFEST", - CID: ctrl.CID, - TotalSize: localFile.FileSize, - HashHex: localFile.FileHash, - NumPieces: int64(len(pieces)), - Pieces: pieces, - Filename: localFile.Filename, - } - - if err := peer.SendJSONReliable(manifest); err != nil { - log.Printf("Error sending manifest: %v", err) - } -} - -func (c *Client) onWebRTCPeerClose(peerID peer.ID) { - log.Printf("WebRTC peer disconnected: %s", peerID) - c.peersMux.Lock() - delete(c.webRTCPeers, peerID) - c.peersMux.Unlock() - - // Handle download resumption logic - c.downloadsMux.Lock() - defer c.downloadsMux.Unlock() - - for cid, state := range c.activeDownloads { - for pieceIndex, assignee := range state.PieceAssignees { - if assignee == peerID { - log.Printf("Peer %s disconnected, re-requesting piece %d for download %s", peerID, pieceIndex, cid) - // Re-queue the piece for download - go c.reRequestPiece(state, pieceIndex) - } - } - } -} - -func min64(a, b int64) int64 { - if a < b { - return a - } - return b -} - -func (c *Client) monitorCongestion() { - ticker := time.NewTicker(PingInterval) - for range ticker.C { - c.peersMux.RLock() - for pid, peer := range c.webRTCPeers { - if connState := peer.GetConnectionState(); connState == webRTC.ConnectionStateConnected { - c.pingTimes[pid] = time.Now() - ping := map[string]string{"type": "ping"} - peer.SendJSONReliable(ping) - } - } - c.peersMux.RUnlock() - } -} - -func (c *Client) handlePong(pid peer.ID) { - if start, ok := c.pingTimes[pid]; ok { - rtt := time.Since(start) - c.rttMux.Lock() - if _, ok := c.rttMeasurements[pid]; !ok { - c.rttMeasurements[pid] = []time.Duration{} - } - c.rttMeasurements[pid] = append(c.rttMeasurements[pid], rtt) - if len(c.rttMeasurements[pid]) > 10 { - c.rttMeasurements[pid] = c.rttMeasurements[pid][1:] - } - avgRTT := time.Duration(0) - for _, d := range c.rttMeasurements[pid] { - avgRTT += d - } - avgRTT /= time.Duration(len(c.rttMeasurements[pid])) - var delay time.Duration = MinDelay - if avgRTT > MaxRTT { - delay = MaxDelay - } else if avgRTT > MaxRTT/2 { - delay = (MaxDelay - MinDelay) / 2 - } - c.congestionCtrl[pid] = delay - c.rttMux.Unlock() - delete(c.pingTimes, pid) - } } diff --git a/go.mod b/go.mod index 48c6a64..b7cdd0d 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/hashicorp/golang-lru v1.0.2 // indirect + github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/ipfs/boxo v0.33.1 // indirect github.com/ipfs/go-datastore v0.8.2 // indirect github.com/ipld/go-ipld-prime v0.21.0 // indirect @@ -24,6 +25,8 @@ require ( github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db // indirect github.com/polydawn/refmt v0.89.0 // indirect github.com/rivo/uniseg v0.4.7 // indirect + github.com/spf13/cobra v1.10.2 // indirect + github.com/spf13/pflag v1.0.9 // indirect github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/otel v1.37.0 // indirect diff --git a/go.sum b/go.sum index b73e4b0..5d4f8ad 100644 --- a/go.sum +++ b/go.sum @@ -23,6 +23,7 @@ github.com/chengxilo/virtualterm v1.0.4/go.mod h1:DyxxBZz/x1iqJjFxTFcr6/x+jSpqN0 github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/coreos/go-systemd v0.0.0-20181012123002-c6f51f82210d/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= +github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -87,6 +88,8 @@ github.com/hashicorp/golang-lru v1.0.2 h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iP github.com/hashicorp/golang-lru v1.0.2/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/huin/goupnp v1.3.0 h1:UvLUlWDNpoUdYzb2TCn+MuTWtcjXKSza2n6CBdQ0xXc= github.com/huin/goupnp v1.3.0/go.mod h1:gnGPsThkYa7bFi/KWmEysQRf48l2dvR5bxr2OFckNX8= +github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= +github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/ipfs/boxo v0.33.1 h1:89m+ksw+cYi0ecTNTJ71IRS5ZrLiovmO6XWHIOGhAEg= github.com/ipfs/boxo v0.33.1/go.mod h1:KwlJTzv5fb1GLlA9KyMqHQmvP+4mrFuiE3PnjdrPJHs= github.com/ipfs/go-block-format v0.2.2 h1:uecCTgRwDIXyZPgYspaLXoMiMmxQpSx2aq34eNc4YvQ= @@ -304,6 +307,7 @@ github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/schollz/progressbar/v3 v3.18.0 h1:uXdoHABRFmNIjUfte/Ex7WtuyVslrw2wVPQmCN62HpA= github.com/schollz/progressbar/v3 v3.18.0/go.mod h1:IsO3lpbaGuzh8zIMzgY3+J8l4C8GjO0Y9S69eFvNsec= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= @@ -338,6 +342,10 @@ github.com/sourcegraph/annotate v0.0.0-20160123013949-f4cad6c6324d/go.mod h1:Udh github.com/sourcegraph/syntaxhighlight v0.0.0-20170531221838-bd320f5d308e/go.mod h1:HuIsMU8RRBOtsCgI77wP899iHVBQpCmg4ErYMZB+2IA= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/spf13/cobra v1.10.2 h1:DMTTonx5m65Ic0GOoRY2c16WCbHxOOw6xxezuLaBpcU= +github.com/spf13/cobra v1.10.2/go.mod h1:7C1pvHqHw5A4vrJfjNwvOdzYu0Gml16OCs2GRiTUUS4= +github.com/spf13/pflag v1.0.9 h1:9exaQaMOCwffKiiiYk6/BndUBv+iRViNW+4lEMi0PvY= +github.com/spf13/pflag v1.0.9/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= @@ -382,6 +390,7 @@ go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= go4.org v0.0.0-20180809161055-417644f6feb5/go.mod h1:MkTOUMDaeVYJUOUsaDXIhWPZYa1yOyC1qaOBpL57BhE= golang.org/x/build v0.0.0-20190111050920-041ab4dc3f9d/go.mod h1:OWs+y06UdEOHN4y+MfF/py+xQ/tYqIWW03b70/CG9Rw= golang.org/x/crypto v0.0.0-20181030102418-4d3f4d9ffa16/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= diff --git a/internal/cli/add.go b/internal/cli/add.go new file mode 100644 index 0000000..cd64cf6 --- /dev/null +++ b/internal/cli/add.go @@ -0,0 +1,20 @@ +package cli + +import ( + "fmt" + + "github.com/spf13/cobra" +) + +var addCmd = &cobra.Command{ + Use: "add ", + Short: "Share a file on the network", + Long: `Add a file to the P2P network for sharing. The file will be announced to the DHT.`, + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + if client == nil { + return fmt.Errorf("client not initialized") + } + return client.AddFile(args[0]) + }, +} diff --git a/internal/cli/announce.go b/internal/cli/announce.go new file mode 100644 index 0000000..f11eb90 --- /dev/null +++ b/internal/cli/announce.go @@ -0,0 +1,20 @@ +package cli + +import ( + "fmt" + + "github.com/spf13/cobra" +) + +var announceCmd = &cobra.Command{ + Use: "announce ", + Short: "Re-announce a file to DHT", + Long: `Re-announce a file's CID to the DHT to refresh its availability.`, + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + if client == nil { + return fmt.Errorf("client not initialized") + } + return client.AnnounceFile(args[0]) + }, +} diff --git a/internal/cli/debug.go b/internal/cli/debug.go new file mode 100644 index 0000000..924d260 --- /dev/null +++ b/internal/cli/debug.go @@ -0,0 +1,20 @@ +package cli + +import ( + "fmt" + + "github.com/spf13/cobra" +) + +var debugCmd = &cobra.Command{ + Use: "debug", + Short: "Show detailed network debug info", + Long: `Display detailed network debugging information including peer IDs, addresses, and shared files.`, + RunE: func(cmd *cobra.Command, args []string) error { + if client == nil { + return fmt.Errorf("client not initialized") + } + client.DebugNetworkStatus() + return nil + }, +} diff --git a/internal/cli/download.go b/internal/cli/download.go new file mode 100644 index 0000000..86ff0cf --- /dev/null +++ b/internal/cli/download.go @@ -0,0 +1,20 @@ +package cli + +import ( + "fmt" + + "github.com/spf13/cobra" +) + +var downloadCmd = &cobra.Command{ + Use: "download ", + Short: "Download a file by CID", + Long: `Download a file from the P2P network using its Content Identifier (CID).`, + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + if client == nil { + return fmt.Errorf("client not initialized") + } + return client.DownloadFile(args[0]) + }, +} diff --git a/internal/cli/health.go b/internal/cli/health.go new file mode 100644 index 0000000..d00f640 --- /dev/null +++ b/internal/cli/health.go @@ -0,0 +1,20 @@ +package cli + +import ( + "fmt" + + "github.com/spf13/cobra" +) + +var healthCmd = &cobra.Command{ + Use: "health", + Short: "Check connection health", + Long: `Display the current health status of your P2P connections and DHT routing table.`, + RunE: func(cmd *cobra.Command, args []string) error { + if client == nil { + return fmt.Errorf("client not initialized") + } + client.CheckConnectionHealth() + return nil + }, +} diff --git a/internal/cli/interactive.go b/internal/cli/interactive.go new file mode 100644 index 0000000..f84c448 --- /dev/null +++ b/internal/cli/interactive.go @@ -0,0 +1,171 @@ +package cli + +import ( + "bufio" + "fmt" + "log" + "os" + "strings" + + "github.com/spf13/cobra" +) + +var interactiveCmd = &cobra.Command{ + Use: "interactive", + Short: "Start interactive CLI mode", + Long: `Start an interactive command-line interface for continuous operation.`, + RunE: func(cmd *cobra.Command, args []string) error { + if client == nil { + return fmt.Errorf("client not initialized") + } + CommandLoop() + return nil + }, +} + +// CommandLoop runs the interactive CLI mode +func CommandLoop() { + scanner := bufio.NewScanner(os.Stdin) + PrintInstructions() + for { + fmt.Print("> ") + if !scanner.Scan() { + break + } + parts := strings.Fields(scanner.Text()) + if len(parts) == 0 { + continue + } + cmd, args := parts[0], parts[1:] + var err error + switch cmd { + case "help": + PrintInstructions() + case "add": + if len(args) != 1 { + fmt.Println("Usage: add ") + } else { + err = client.AddFile(args[0]) + } + case "list": + client.ListLocalFiles() + case "search": + if len(args) != 1 { + fmt.Println("Usage: search ") + } else { + client.CheckConnectionHealth() + if strings.HasPrefix(args[0], "bafy") || strings.HasPrefix(args[0], "Qm") { + err = client.EnhancedSearchByCID(args[0]) + } else { + err = client.SearchByText(args[0]) + } + } + case "download": + if len(args) != 1 { + fmt.Println("Usage: download ") + } else { + err = client.DownloadFile(args[0]) + } + case "peers": + client.ListConnectedPeers() + case "announce": + if len(args) != 1 { + fmt.Println("Usage: announce ") + } else { + err = client.AnnounceFile(args[0]) + } + case "health": + client.CheckConnectionHealth() + case "debug": + client.DebugNetworkStatus() + case "exit": + return + default: + fmt.Println("Unknown command. Type 'help' for available commands.") + } + if err != nil { + log.Printf("Error: %v", err) + } + } +} + +// PrintInstructions displays the help banner +func PrintInstructions() { + peerID := client.Host.ID().String() + minWidth := len(" Your Peer ID: "+peerID) + 4 + width := 80 + if minWidth > width { + width = minWidth + 10 + } + + topBorder := "┌" + strings.Repeat("─", width-2) + "┐" + bottomBorder := "└" + strings.Repeat("─", width-2) + "┘" + + centerLine := func(text string) string { + if len(text) >= width-4 { + return "│ " + text[:width-6] + "... │" + } + padding := (width - 4 - len(text)) / 2 + leftPad := strings.Repeat(" ", padding) + rightPad := strings.Repeat(" ", width-4-len(text)-padding) + return "│ " + leftPad + text + rightPad + " │" + } + + leftLine := func(text string) string { + if len(text) >= width-4 { + return "│ " + text[:width-6] + "... │" + } + rightPad := strings.Repeat(" ", width-4-len(text)) + return "│ " + text + rightPad + " │" + } + + fmt.Println() + fmt.Println(topBorder) + fmt.Println(centerLine("DECENTRALIZED P2P FILE SHARING")) + fmt.Println("│" + strings.Repeat("─", width-2) + "│") + fmt.Println(centerLine("Available Commands")) + fmt.Println("│" + strings.Repeat(" ", width-2) + "│") + + commands := [][]string{ + {"add ", "Share a file on the network"}, + {"list", "List your shared files"}, + {"search ", "Search by CID or filename text"}, + {"download ", "Download a file by CID"}, + {"peers", "Show connected peers"}, + {"announce ", "Re-announce a file to DHT"}, + {"health", "Check connection health"}, + {"debug", "Show detailed network debug info"}, + {"help", "Show this help"}, + {"exit", "Exit the application"}, + } + + for _, cmd := range commands { + cmdText := fmt.Sprintf(" %-20s - %s", cmd[0], cmd[1]) + fmt.Println(leftLine(cmdText)) + } + + fmt.Println("│" + strings.Repeat(" ", width-2) + "│") + fmt.Println("│" + strings.Repeat("─", width-2) + "│") + + peerID = client.Host.ID().String() + fmt.Println(leftLine(" Your Peer ID: " + peerID)) + + addrs := client.Host.Addrs() + fmt.Println(leftLine(" Listening on:")) + + for i, addr := range addrs { + if i >= 3 { + moreAddrs := len(addrs) - 3 + fmt.Println(leftLine(fmt.Sprintf(" ... and %d more", moreAddrs))) + break + } + addrStr := addr.String() + if len(addrStr) > width-8 { + addrStr = addrStr[:width-11] + "..." + } + fmt.Println(leftLine(" " + addrStr)) + } + + fmt.Println(bottomBorder) + fmt.Println() +} diff --git a/internal/cli/list.go b/internal/cli/list.go new file mode 100644 index 0000000..92e783f --- /dev/null +++ b/internal/cli/list.go @@ -0,0 +1,20 @@ +package cli + +import ( + "fmt" + + "github.com/spf13/cobra" +) + +var listCmd = &cobra.Command{ + Use: "list", + Short: "List your shared files", + Long: `Display all files that you are currently sharing on the P2P network.`, + RunE: func(cmd *cobra.Command, args []string) error { + if client == nil { + return fmt.Errorf("client not initialized") + } + client.ListLocalFiles() + return nil + }, +} diff --git a/internal/cli/peers.go b/internal/cli/peers.go new file mode 100644 index 0000000..1ac00aa --- /dev/null +++ b/internal/cli/peers.go @@ -0,0 +1,20 @@ +package cli + +import ( + "fmt" + + "github.com/spf13/cobra" +) + +var peersCmd = &cobra.Command{ + Use: "peers", + Short: "Show connected peers", + Long: `Display all peers currently connected to your node.`, + RunE: func(cmd *cobra.Command, args []string) error { + if client == nil { + return fmt.Errorf("client not initialized") + } + client.ListConnectedPeers() + return nil + }, +} diff --git a/internal/cli/root.go b/internal/cli/root.go new file mode 100644 index 0000000..3e2ac27 --- /dev/null +++ b/internal/cli/root.go @@ -0,0 +1,61 @@ +package cli + +import ( + "fmt" + "os" + "os/signal" + "syscall" + + "torrentium/internal/core" + + "github.com/spf13/cobra" +) + +var client *core.Client + +var rootCmd = &cobra.Command{ + Use: "torrentium", + Short: "Decentralized P2P file sharing", + Long: ` +╔══════════════════════════════════════════════════════════════════════════════╗ +║ TORRENTIUM - Decentralized P2P File Sharing ║ +╠══════════════════════════════════════════════════════════════════════════════╣ +║ A peer-to-peer file sharing application using libp2p and WebRTC ║ +╚══════════════════════════════════════════════════════════════════════════════╝`, + PersistentPreRun: func(cmd *cobra.Command, args []string) { + // Skip client init for help commands + if cmd.Name() == "help" || cmd.Name() == "completion" { + return + } + }, +} + +func Execute(c *core.Client) { + client = c + + // Handle graceful shutdown + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + go func() { + <-sigChan + fmt.Println("\nShutting down...") + os.Exit(0) + }() + + if err := rootCmd.Execute(); err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } +} + +func init() { + rootCmd.AddCommand(addCmd) + rootCmd.AddCommand(listCmd) + rootCmd.AddCommand(searchCmd) + rootCmd.AddCommand(downloadCmd) + rootCmd.AddCommand(peersCmd) + rootCmd.AddCommand(announceCmd) + rootCmd.AddCommand(healthCmd) + rootCmd.AddCommand(debugCmd) + rootCmd.AddCommand(interactiveCmd) +} diff --git a/internal/cli/search.go b/internal/cli/search.go new file mode 100644 index 0000000..fbc54dc --- /dev/null +++ b/internal/cli/search.go @@ -0,0 +1,26 @@ +package cli + +import ( + "fmt" + "strings" + + "github.com/spf13/cobra" +) + +var searchCmd = &cobra.Command{ + Use: "search ", + Short: "Search for files by CID or filename", + Long: `Search for files on the network. Use a CID (starting with 'bafy' or 'Qm') for DHT lookup, or text for local filename search.`, + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + if client == nil { + return fmt.Errorf("client not initialized") + } + client.CheckConnectionHealth() + query := args[0] + if strings.HasPrefix(query, "bafy") || strings.HasPrefix(query, "Qm") { + return client.EnhancedSearchByCID(query) + } + return client.SearchByText(query) + }, +} diff --git a/internal/client/webrtc.go b/internal/client/webrtc.go index fb05e00..e210db6 100644 --- a/internal/client/webrtc.go +++ b/internal/client/webrtc.go @@ -13,6 +13,9 @@ import ( "github.com/pion/webrtc/v3" ) +// Re-export DataChannelMessage so it can be used by other packages +type DataChannelMessage = webrtc.DataChannelMessage + const ( maxICEGatheringTimeout = 15 * time.Second connectionTimeout = 30 * time.Second @@ -43,23 +46,23 @@ const ( ) type SimpleWebRTCPeer struct { - ID peer.ID - pc *webrtc.PeerConnection - dc *webrtc.DataChannel - reliableDC *webrtc.DataChannel - onMessage func(msg webrtc.DataChannelMessage, peer *SimpleWebRTCPeer) - onCloseCallback func(peerID peer.ID) - fileWriter io.WriteCloser - writerMutex sync.RWMutex - signalingStream network.Stream - streamMux sync.RWMutex - state ConnectionState - stateMux sync.RWMutex - closeOnce sync.Once - closeCh chan struct{} - keepAliveTick *time.Ticker - reliableDCOpen chan struct{} //To signal when the reliable channel is open - dcOpenWg sync.WaitGroup //To wait for all data channels + ID peer.ID + pc *webrtc.PeerConnection + dc *webrtc.DataChannel + reliableDC *webrtc.DataChannel + onMessage func(msg webrtc.DataChannelMessage, peer *SimpleWebRTCPeer) + onCloseCallback func(peerID peer.ID) + fileWriter io.WriteCloser + writerMutex sync.RWMutex + signalingStream network.Stream + streamMux sync.RWMutex + state ConnectionState + stateMux sync.RWMutex + closeOnce sync.Once + closeCh chan struct{} + keepAliveTick *time.Ticker + reliableDCOpen chan struct{} //To signal when the reliable channel is open + dcOpenWg sync.WaitGroup //To wait for all data channels } func NewSimpleWebRTCPeer(onMessage func(msg webrtc.DataChannelMessage, peer *SimpleWebRTCPeer), onClose func(peerID peer.ID)) (*SimpleWebRTCPeer, error) { @@ -69,12 +72,12 @@ func NewSimpleWebRTCPeer(onMessage func(msg webrtc.DataChannelMessage, peer *Sim } peer := &SimpleWebRTCPeer{ - pc: pc, - onMessage: onMessage, - onCloseCallback: onClose, - closeCh: make(chan struct{}), - state: ConnectionStateNew, - reliableDCOpen: make(chan struct{}), //Initialize the new channel + pc: pc, + onMessage: onMessage, + onCloseCallback: onClose, + closeCh: make(chan struct{}), + state: ConnectionStateNew, + reliableDCOpen: make(chan struct{}), //Initialize the new channel } //two data channels: "data" and "reliable" @@ -243,7 +246,7 @@ func (p *SimpleWebRTCPeer) SendJSON(v interface{}) error { return p.dc.SendText(string(data)) } -//SendJSONReliable waits for the channel to be ready +// SendJSONReliable waits for the channel to be ready func (p *SimpleWebRTCPeer) SendJSONReliable(v interface{}) error { select { case <-p.reliableDCOpen: @@ -332,7 +335,7 @@ func (p *SimpleWebRTCPeer) WaitForConnection(timeout time.Duration) error { } } -//function to wait for all data channels +// function to wait for all data channels func (p *SimpleWebRTCPeer) WaitForDataChannels(timeout time.Duration) error { done := make(chan struct{}) go func() { @@ -458,4 +461,4 @@ func TestICEConnectivity() error { log.Printf("ICE gathering timed out with %d candidates", candidateCount) return nil } -} \ No newline at end of file +} diff --git a/internal/core/client.go b/internal/core/client.go new file mode 100644 index 0000000..3ad7400 --- /dev/null +++ b/internal/core/client.go @@ -0,0 +1,59 @@ +package core + +import ( + "sync" + "time" + + webRTC "torrentium/internal/client" + db "torrentium/internal/db" + "torrentium/internal/types" + + dht "github.com/libp2p/go-libp2p-kad-dht" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" +) + +type Client struct { + Host host.Host + DHT *dht.IpfsDHT + WebRTCPeers map[peer.ID]*webRTC.SimpleWebRTCPeer + PeersMux sync.RWMutex + SharingFiles map[string]*types.FileInfo + ActiveDownloads map[string]*types.DownloadState + DownloadsMux sync.RWMutex + DB *db.Repository + UnackedChunks map[string]map[int64]map[int]types.ControlMessage + UnackedChunksMux sync.RWMutex + CongestionCtrl map[peer.ID]time.Duration + PingTimes map[peer.ID]time.Time + RTTMeasurements map[peer.ID][]time.Duration + RTTMux sync.Mutex + + ManifestWaiters map[string]chan types.ControlMessage + ManifestChMu sync.Mutex +} + +func NewClient(h host.Host, d *dht.IpfsDHT, repo *db.Repository) *Client { + c := &Client{ + Host: h, + DHT: d, + WebRTCPeers: make(map[peer.ID]*webRTC.SimpleWebRTCPeer), + SharingFiles: make(map[string]*types.FileInfo), + ActiveDownloads: make(map[string]*types.DownloadState), + DB: repo, + UnackedChunks: make(map[string]map[int64]map[int]types.ControlMessage), + CongestionCtrl: make(map[peer.ID]time.Duration), + PingTimes: make(map[peer.ID]time.Time), + RTTMeasurements: make(map[peer.ID][]time.Duration), + ManifestWaiters: make(map[string]chan types.ControlMessage), + } + go c.MonitorCongestion() + return c +} + +func min64(a, b int64) int64 { + if a < b { + return a + } + return b +} diff --git a/internal/core/fileops.go b/internal/core/fileops.go new file mode 100644 index 0000000..df64152 --- /dev/null +++ b/internal/core/fileops.go @@ -0,0 +1,356 @@ +package core + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "fmt" + "io" + "log" + "os" + "sync" + "time" + + webRTC "torrentium/internal/client" + "torrentium/internal/types" + + "github.com/dustin/go-humanize" + "github.com/ipfs/go-cid" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" + "github.com/multiformats/go-multihash" + "github.com/schollz/progressbar/v3" +) + +func (c *Client) AddFile(filePath string) error { + ctx := context.Background() + f, err := os.Open(filePath) + if err != nil { + return fmt.Errorf("failed to open file: %w", err) + } + defer f.Close() + + info, err := f.Stat() + if err != nil { + return fmt.Errorf("failed to get file info: %w", err) + } + + hasher := sha256.New() + if _, err := io.Copy(hasher, f); err != nil { + return fmt.Errorf("failed to calculate hash: %w", err) + } + + fileHashBytes := hasher.Sum(nil) + fileHashStr := hex.EncodeToString(fileHashBytes) + mhash, err := multihash.Encode(fileHashBytes, multihash.SHA2_256) + if err != nil { + return fmt.Errorf("failed to create multihash: %w", err) + } + + fileCID := cid.NewCidV1(cid.Raw, mhash) + + // Create pieces manifest + pieceSz := int64(types.DefaultPieceSize) + numPieces := (info.Size() + pieceSz - 1) / pieceSz + if _, err := f.Seek(0, io.SeekStart); err != nil { + return err + } + + for idx := int64(0); idx < numPieces; idx++ { + offset := idx * pieceSz + size := min64(pieceSz, info.Size()-offset) + h := sha256.New() + if _, err := io.CopyN(h, f, size); err != nil { + return err + } + ph := hex.EncodeToString(h.Sum(nil)) + if err := c.DB.UpsertPiece(ctx, fileCID.String(), idx, offset, size, ph, true); err != nil { + return err + } + } + + if err := c.DB.AddLocalFile(ctx, fileCID.String(), info.Name(), info.Size(), filePath, fileHashStr); err != nil { + return fmt.Errorf("failed to store file metadata: %w", err) + } + + c.SharingFiles[fileCID.String()] = &types.FileInfo{ + FilePath: filePath, + Hash: fileHashStr, + Size: info.Size(), + Name: info.Name(), + PieceSz: pieceSz, + } + + log.Printf("Announcing file %s with CID %s to DHT...", info.Name(), fileCID.String()) + provideCtx, cancel := context.WithTimeout(ctx, 60*time.Second) + defer cancel() + if err := c.DHT.Provide(provideCtx, fileCID, true); err != nil { + log.Printf(" - Warning: Failed to announce to DHT: %v", err) + } else { + log.Println(" - Successfully announced file to DHT") + } + + fmt.Printf("✓ File '%s' is now being shared\n", info.Name()) + fmt.Printf(" CID: %s\n", fileCID.String()) + fmt.Printf(" Hash: %s\n", fileHashStr) + fmt.Printf(" Size: %s\n", humanize.Bytes(uint64(info.Size()))) + return nil +} + +func (c *Client) ListLocalFiles() { + ctx := context.Background() + files, err := c.DB.GetLocalFiles(ctx) + if err != nil { + log.Printf("Error retrieving files: %v", err) + return + } + if len(files) == 0 { + fmt.Println(" - No files being shared.") + return + } + fmt.Println("\n=== Your Shared Files ===") + for _, file := range files { + fmt.Printf("Name: %s\n", file.Filename) + fmt.Printf(" CID: %s\n", file.CID) + fmt.Printf(" Size: %s\n", humanize.Bytes(uint64(file.FileSize))) + fmt.Printf(" Path: %s\n", file.FilePath) + fmt.Println(" ---") + } +} + +func (c *Client) SearchByText(q string) error { + ctx := context.Background() + matches, err := c.DB.SearchByFilename(ctx, q) + if err != nil { + return err + } + if len(matches) == 0 { + fmt.Printf("Searching for files containing '%s'...\n", q) + fmt.Println("Note: Direct filename search requires content indexing.") + fmt.Println("Try using the CID if you have it, or check with known peers.") + return nil + } + fmt.Printf("Local index matches for '%s':\n", q) + for _, m := range matches { + fmt.Printf("- %s CID:%s\n", m.Filename, m.CID) + } + return nil +} + +func (c *Client) EnhancedSearchByCID(cidStr string) error { + fileCID, err := cid.Decode(cidStr) + if err != nil { + return fmt.Errorf("invalid CID: %w", err) + } + fmt.Printf("Searching for CID: %s\n", fileCID.String()) + providers, err := c.FindProvidersWithTimeout(fileCID, 60*time.Second, types.MaxProviders) + if err != nil { + return fmt.Errorf("provider search failed: %w", err) + } + + if len(providers) == 0 { + fmt.Println("No providers found for this CID") + fmt.Println("This could mean:") + fmt.Println(" - The file is not being shared") + fmt.Println(" - The provider is offline") + fmt.Println(" - Network connectivity issues") + fmt.Println(" - DHT routing problem") + return nil + } + + fmt.Printf("Found %d provider(s):\n", len(providers)) + for i, provider := range providers { + fmt.Printf(" %d. %s\n", i+1, provider.ID) + if c.Host.Network().Connectedness(provider.ID) == network.Connected { + fmt.Printf(" - Already connected\n") + } else { + fmt.Printf(" - Not connected\n") + } + } + return nil +} + +func (c *Client) DownloadFile(cidStr string) error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + fileCID, err := cid.Decode(cidStr) + if err != nil { + return fmt.Errorf("invalid CID: %w", err) + } + + fmt.Printf("Looking for providers of CID: %s\n", fileCID.String()) + providers, err := c.FindProvidersWithTimeout(fileCID, 60*time.Second, types.MaxProviders) + if err != nil { + return fmt.Errorf("provider search failed: %w", err) + } + + if len(providers) == 0 { + return fmt.Errorf("no providers found") + } + + fmt.Printf("Found %d providers. Getting file manifest...\n", len(providers)) + relayAddrStr := "/dns4/relay-torrentium.onrender.com/tcp/443/wss/p2p/12D3KooWKsLZ7VmZTq7qBHj2cv4DczbEoNFLLDaLLk9ADVxDnqS6" + + var manifest types.ControlMessage + var firstPeer *webRTC.SimpleWebRTCPeer + + for _, p := range providers { + // Try direct connection first + peerConn, err := c.InitiateWebRTCConnectionWithRetry(p.ID, 1) + if err != nil { + // Fallback: relay connection + log.Println("🔁 Direct connection failed, trying relay...") + + // Build circuit address + circuitStr := fmt.Sprintf("%s/p2p-circuit/p2p/%s", relayAddrStr, p.ID.String()) + circuitMaddr, err := multiaddr.NewMultiaddr(circuitStr) + if err != nil { + log.Printf("Invalid circuit multiaddr: %v", err) + continue + } + targetInfo := peer.AddrInfo{ID: p.ID, Addrs: []multiaddr.Multiaddr{circuitMaddr}} + + if err := c.Host.Connect(ctx, targetInfo); err != nil { + log.Printf("❌ Relay dial failed: %v", err) + continue + } + + log.Printf("✅ Relay dial to %s successful", p.ID) + + //perform WebRTC handshake + peerConn, err = c.InitiateWebRTCConnectionWithRetry(p.ID, 1) + if err != nil { + log.Printf("⚠ WebRTC connection via relay failed: %v", err) + continue + } + } + + // If WebRTC connected, fetch manifest + if peerConn != nil { + manifest, err = c.RequestManifest(peerConn, cidStr) + if err == nil { + firstPeer = peerConn + break + } + peerConn.Close() + } + } + + if firstPeer == nil { + return fmt.Errorf("failed to connect to any provider to get manifest") + } + + // Store pieces in the database + for _, piece := range manifest.Pieces { + if err := c.DB.UpsertPiece(ctx, cidStr, piece.Index, piece.Offset, piece.Size, piece.Hash, false); err != nil { + log.Printf("Failed to store piece info for download: %v", err) + } + } + + downloadPath := fmt.Sprintf("%s.download", manifest.Filename) + finalPath := manifest.Filename + localFile, err := os.Create(downloadPath) + if err != nil { + firstPeer.Close() + return fmt.Errorf("failed to create file: %w", err) + } + + pieces, _ := c.DB.GetPieces(ctx, cidStr) + if len(pieces) == 0 { + return fmt.Errorf("failed to retrieve piece information after receiving manifest") + } + + state := &types.DownloadState{ + File: localFile, + Manifest: manifest, + TotalPieces: int(manifest.NumPieces), + Pieces: pieces, + Completed: make(chan bool, 1), + Progress: progressbar.DefaultBytes(manifest.TotalSize, "downloading..."), + PieceStatus: make([]bool, int(manifest.NumPieces)), + PieceAssignees: make(map[int]peer.ID), + PieceBuffers: make(map[int][][]byte), + CompletedPieces: 0, + PieceTimers: make(map[int]*time.Timer), + RetryCounts: make(map[int]int), + } + c.DownloadsMux.Lock() + c.ActiveDownloads[cidStr] = state + c.DownloadsMux.Unlock() + + // Parallel downloads + var wg sync.WaitGroup + peersToUse := providers + if len(peersToUse) > types.MaxParallelDownloads { + peersToUse = peersToUse[:types.MaxParallelDownloads] + } + chunksPerPeer := state.TotalPieces / len(peersToUse) + + for i, p := range peersToUse { + start := i * chunksPerPeer + end := start + chunksPerPeer + if i == len(peersToUse)-1 { + end = state.TotalPieces + } + + wg.Add(1) + go func(peerInfo peer.AddrInfo, startPiece, endPiece int) { + defer wg.Done() + var peerConn *webRTC.SimpleWebRTCPeer + if peerInfo.ID.String() == firstPeer.GetSignalingStream().Conn().RemotePeer().String() { + peerConn = firstPeer + } else { + var connErr error + peerConn, connErr = c.InitiateWebRTCConnectionWithRetry(peerInfo.ID, 2) + if connErr != nil { + log.Printf("Chunk peer connect failed: %v", connErr) + return + } + defer peerConn.Close() + } + c.DownloadChunksFromPeer(peerConn, state, startPiece, endPiece) + }(p, start, end) + } + + <-state.Completed + localFile.Close() + + if err := os.Rename(downloadPath, finalPath); err != nil { + return fmt.Errorf("failed to rename file: %w", err) + } + + fmt.Printf("\n✅ Download complete. File saved as %s\n", finalPath) + return nil +} + +func (c *Client) DownloadChunksFromPeer(peer *webRTC.SimpleWebRTCPeer, state *types.DownloadState, startPiece, endPiece int) { + for i := startPiece; i < endPiece; i++ { + state.Mu.Lock() + if state.PieceStatus[i] { + state.Mu.Unlock() + continue + } + state.PieceAssignees[i] = peer.GetSignalingStream().Conn().RemotePeer() + state.Mu.Unlock() + + req := types.ControlMessage{ + Command: "REQUEST_PIECE", + CID: state.Manifest.CID, + Index: int64(i), + } + + state.Mu.Lock() //piece timeout + state.PieceTimers[i] = time.AfterFunc(types.PieceTimeout, func() { + log.Printf("Piece %d timed out, re-requesting...", i) + c.ReRequestPiece(state, i) + }) + state.Mu.Unlock() + + if err := peer.SendJSONReliable(req); err != nil { + log.Printf("Failed to request piece %d from %s: %v", i, peer.GetSignalingStream().Conn().RemotePeer(), err) + return + } + } +} diff --git a/internal/core/network.go b/internal/core/network.go new file mode 100644 index 0000000..4a0d0f2 --- /dev/null +++ b/internal/core/network.go @@ -0,0 +1,717 @@ +package core + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "fmt" + "log" + "os" + "time" + + webRTC "torrentium/internal/client" + p2p "torrentium/internal/p2p" + "torrentium/internal/types" + + "github.com/ipfs/go-cid" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" +) + +func (c *Client) DebugNetworkStatus() { + fmt.Println("\n=== Network Debug Info ===") + fmt.Printf("Our Peer ID: %s\n", c.Host.ID()) + fmt.Printf("Our Addresses:\n") + for _, addr := range c.Host.Addrs() { + fmt.Printf(" %s/p2p/%s\n", addr, c.Host.ID()) + } + + peers := c.Host.Network().Peers() + fmt.Printf("\nConnected Peers (%d):\n", len(peers)) + for i, peerID := range peers { + conn := c.Host.Network().ConnsToPeer(peerID) + if len(conn) > 0 { + fmt.Printf(" %d. %s\n", i+1, peerID) + fmt.Printf(" Address: %s\n", conn[0].RemoteMultiaddr()) + } + } + + routingTableSize := c.DHT.RoutingTable().Size() + fmt.Printf("\nDHT Routing Table Size: %d\n", routingTableSize) + + fmt.Printf("\nShared Files (%d):\n", len(c.SharingFiles)) + for cid, fileInfo := range c.SharingFiles { + fmt.Printf(" CID: %s\n", cid) + fmt.Printf(" File: %s\n", fileInfo.Name) + fmt.Printf(" ---\n") + } +} + +func (c *Client) AnnounceFile(cidStr string) error { + fileCID, err := cid.Decode(cidStr) + if err != nil { + return fmt.Errorf("invalid CID: %w", err) + } + fmt.Printf("Re-announcing CID %s to DHT...\n", cidStr) + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + if err := c.DHT.Provide(ctx, fileCID, true); err != nil { + return fmt.Errorf("failed to announce: %w", err) + } + fmt.Println(" - Successfully announced to DHT") + return nil +} + +func (c *Client) StartDHTMaintenance() { + go func() { + ticker := time.NewTicker(10 * time.Minute) + defer ticker.Stop() + for range ticker.C { + log.Println("Performing DHT maintenance...") + c.DHT.RefreshRoutingTable() + peers := c.Host.Network().Peers() + log.Printf("Connected to %d peers", len(peers)) + if len(peers) < 5 { + log.Println("Low peer count; re-bootstrapping...") + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + _ = p2p.Bootstrap(ctx, c.Host, c.DHT) + cancel() + } + } + }() +} + +func (c *Client) CheckConnectionHealth() { + peers := c.Host.Network().Peers() + fmt.Printf("\n=== Connection Health ===\n") + fmt.Printf("Connected peers: %d\n", len(peers)) + if len(peers) < 3 { + fmt.Println(" - Warning: Low peer count. Consider restarting or checking network connectivity.") + } else { + fmt.Println(" - Good peer connectivity") + } + + routingTableSize := c.DHT.RoutingTable().Size() + fmt.Printf("DHT routing table size: %d\n", routingTableSize) + if routingTableSize < 10 { + fmt.Println(" - Warning: Small DHT routing table. File discovery may be limited.") + } else { + fmt.Println(" - Good DHT connectivity") + } +} + +func (c *Client) FindProvidersWithTimeout(id cid.Cid, timeout time.Duration, maxProviders int) ([]peer.AddrInfo, error) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + providersChan := c.DHT.FindProvidersAsync(ctx, id, maxProviders) + var providers []peer.AddrInfo + var totalFound int + + done := make(chan struct{}) + go func() { + defer close(done) + for provider := range providersChan { + totalFound++ + if provider.ID != c.Host.ID() { + providers = append(providers, provider) + fmt.Printf(" - Found provider %d: %s\n", len(providers), provider.ID) + if len(providers) >= maxProviders { + break + } + } + } + }() + + select { + case <-done: + fmt.Printf("Provider search completed. Found %d total providers, %d unique external providers\n", + totalFound, len(providers)) + case <-time.After(timeout): + fmt.Printf("Provider search timed out. Found %d providers so far\n", len(providers)) + } + + return providers, nil +} + +func (c *Client) ConnectToPeer(multiaddrStr string) error { + addr, err := multiaddr.NewMultiaddr(multiaddrStr) + if err != nil { + return fmt.Errorf("invalid multiaddr: %w", err) + } + + peerInfo, err := peer.AddrInfoFromP2pAddr(addr) + if err != nil { + return fmt.Errorf("failed to parse peer info: %w", err) + } + + fmt.Printf("Attempting to connect to peer %s...\n", peerInfo.ID) + + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + if err := c.Host.Connect(ctx, *peerInfo); err != nil { + return fmt.Errorf("failed to connect: %w", err) + } + + fmt.Printf(" - Successfully connected to peer %s\n", peerInfo.ID) + + c.Host.Peerstore().AddAddrs(peerInfo.ID, peerInfo.Addrs, time.Hour) + + return nil +} + +func (c *Client) ListConnectedPeers() { + peers := c.Host.Network().Peers() + fmt.Printf("\n=== Connected Peers (%d) ===\n", len(peers)) + for _, peerID := range peers { + conn := c.Host.Network().ConnsToPeer(peerID) + if len(conn) > 0 { + fmt.Printf("Peer: %s\n", peerID) + fmt.Printf(" Address: %s\n", conn[0].RemoteMultiaddr()) + } + } +} + +func (c *Client) InitiateWebRTCConnectionWithRetry(targetPeerID peer.ID, maxRetries int) (*webRTC.SimpleWebRTCPeer, error) { + + // First, test ICE connectivity + log.Printf("Testing ICE connectivity before attempting WebRTC connection...") + if err := webRTC.TestICEConnectivity(); err != nil { + log.Printf("ICE connectivity test failed: %v", err) + log.Printf("Warning: WebRTC connections may fail due to network restrictions") + } else { + log.Printf("ICE connectivity test passed") + } + + var lastErr error + for attempt := 1; attempt <= maxRetries; attempt++ { + if attempt > 1 { + backoff := time.Duration(1< 0 { + info = pinfo + } else { + lastErr = fmt.Errorf("dht lookup failed: %w", err) + continue + } + + if len(info.Addrs) == 0 { + lastErr = fmt.Errorf("peer %s has no known multiaddrs", targetPeerID) + continue + } + + c.Host.Peerstore().AddAddrs(info.ID, info.Addrs, time.Hour) + fmt.Println(c.Host.Peerstore()) + + if c.Host.Network().Connectedness(info.ID) != network.Connected { + connectCtx, connectCancel := context.WithTimeout(context.Background(), 20*time.Second) + err := c.Host.Connect(connectCtx, info) + connectCancel() + if err != nil { + log.Printf("failed to connect to peer %s: %v", info.ID, err) + fmt.Printf("DHT lookup failed: %v. This could be a network issue now trying connection using relays.\n", err) + return nil, err + } + + fmt.Printf("Successfully connected to peer %s\n", info.ID) + + time.Sleep(1 * time.Second) + } + + webrtcPeer, err := webRTC.NewSimpleWebRTCPeer(c.OnDataChannelMessage, c.OnWebRTCPeerClose) + if err != nil { + lastErr = err + return nil, err + } + + offer, err := webrtcPeer.CreateOffer() + if err != nil { + webrtcPeer.Close() + lastErr = err + return nil, err + } + + streamCtx, streamCancel := context.WithTimeout(context.Background(), 30*time.Second) + s, err := c.Host.NewStream(streamCtx, targetPeerID, p2p.SignalingProtocolID) + streamCancel() + if err != nil { + webrtcPeer.Close() + lastErr = err + return nil, err + } + + webrtcPeer.SetSignalingStream(s) + encoder := json.NewEncoder(s) + decoder := json.NewDecoder(s) + + offerMsg := map[string]string{"type": "offer", "data": offer} + if err := encoder.Encode(offerMsg); err != nil { + webrtcPeer.Close() + lastErr = err + continue + } + + var answerMsg map[string]string + if err := decoder.Decode(&answerMsg); err != nil { + webrtcPeer.Close() + lastErr = fmt.Errorf("failed to decode answer: %w", err) + continue + } + + if answerMsg["type"] == "error" { + webrtcPeer.Close() + lastErr = fmt.Errorf("peer returned error: %s", answerMsg["data"]) + continue + } + if answerMsg["type"] != "answer" { + webrtcPeer.Close() + lastErr = fmt.Errorf("expected answer, got: %s", answerMsg["type"]) + continue + } + + if err := webrtcPeer.HandleAnswer(answerMsg["data"]); err != nil { + webrtcPeer.Close() + lastErr = err + continue + } + + if err := webrtcPeer.WaitForConnection(90 * time.Second); err != nil { + webrtcPeer.Close() + lastErr = fmt.Errorf("WebRTC connection failed: %w", err) + continue + } + + //wait for the data channels to be ready + if err := webrtcPeer.WaitForDataChannels(10 * time.Second); err != nil { + webrtcPeer.Close() + lastErr = fmt.Errorf("data channels did not open in time: %w", err) + continue + } + + fmt.Printf("WebRTC connection established with %s\n", targetPeerID) + c.PeersMux.Lock() + c.WebRTCPeers[targetPeerID] = webrtcPeer + c.PeersMux.Unlock() + + return webrtcPeer, nil + } + return nil, lastErr +} + +func (c *Client) HandleWebRTCOffer(offer, remotePeerID string, s network.Stream) (string, error) { + peerID, err := peer.Decode(remotePeerID) + if err != nil { + return "", fmt.Errorf("invalid peer ID: %w", err) + } + + webrtcPeer, err := webRTC.NewSimpleWebRTCPeer(c.OnDataChannelMessage, c.OnWebRTCPeerClose) + if err != nil { + return "", err + } + + webrtcPeer.SetSignalingStream(s) + + answer, err := webrtcPeer.HandleOffer(offer) + if err != nil { + webrtcPeer.Close() + return "", err + } + + c.PeersMux.Lock() + c.WebRTCPeers[peerID] = webrtcPeer + c.PeersMux.Unlock() + + return answer, nil +} + +func (c *Client) OnDataChannelMessage(msg webRTC.DataChannelMessage, peer *webRTC.SimpleWebRTCPeer) { + if !msg.IsString { + log.Printf("Received unexpected binary message, expecting JSON.") + return + } + if len(msg.Data) == 0 { + return + } + var ctrl types.ControlMessage + if err := json.Unmarshal(msg.Data, &ctrl); err != nil { + var ping map[string]string + if err2 := json.Unmarshal(msg.Data, &ping); err2 == nil { + if ping["type"] == "ping" { + // Respond to ping + pong := map[string]string{"type": "pong"} + peer.SendJSONReliable(pong) + return + } else if ping["type"] == "pong" { + c.HandlePong(peer.GetSignalingStream().Conn().RemotePeer()) + return + } + } + log.Printf("Failed to unmarshal control message: %v. Raw message: %s", err, string(msg.Data)) + return + } + c.HandleControlMessage(ctrl, peer) +} + +func (c *Client) HandleControlMessage(ctrl types.ControlMessage, peer *webRTC.SimpleWebRTCPeer) { + ctx := context.Background() + switch ctrl.Command { + case "REQUEST_MANIFEST": + c.HandleManifestRequest(ctx, ctrl, peer) + case "MANIFEST": + c.ManifestChMu.Lock() + if ch, ok := c.ManifestWaiters[ctrl.CID]; ok { + ch <- ctrl + } + c.ManifestChMu.Unlock() + case "REQUEST_PIECE": + go c.HandlePieceRequest(ctx, ctrl, peer) + case "PIECE_CHUNK": + c.HandlePieceChunk(ctrl, peer) + case "CHUNK_ACK": + c.HandleChunkAck(ctrl) + default: + //do nothing + } +} + +func (c *Client) HandlePieceChunk(ctrl types.ControlMessage, peer *webRTC.SimpleWebRTCPeer) { + c.DownloadsMux.RLock() + state, ok := c.ActiveDownloads[ctrl.CID] + c.DownloadsMux.RUnlock() + if !ok { + return + } + + // Send an ACK back to the sender using reliable channel + ackMsg := types.ControlMessage{ + Command: "CHUNK_ACK", + CID: ctrl.CID, + Index: ctrl.Index, + Sequence: ctrl.Sequence, + } + if err := peer.SendJSONReliable(ackMsg); err != nil { + log.Printf("Failed to send ACK for chunk %d of piece %d: %v", ctrl.Sequence, ctrl.Index, err) + } + + state.Mu.Lock() + defer state.Mu.Unlock() + + if state.PieceStatus[ctrl.Index] { + return // Already have this piece + } + + if state.PieceBuffers[int(ctrl.Index)] == nil { + state.PieceBuffers[int(ctrl.Index)] = make([][]byte, ctrl.TotalChunks) + } + + chunkData, err := hex.DecodeString(ctrl.Payload) + if err != nil { + log.Printf("Failed to decode chunk payload: %v", err) + return + } + + state.PieceBuffers[int(ctrl.Index)][ctrl.ChunkIndex] = chunkData + _ = state.Progress.Add(len(chunkData)) + + // Check if piece is complete + isComplete := true + var pieceSize int + for _, chunk := range state.PieceBuffers[int(ctrl.Index)] { + if chunk == nil { + isComplete = false + break + } + pieceSize += len(chunk) + } + + if isComplete { + //Stop the timer for this piece + if timer, ok := state.PieceTimers[int(ctrl.Index)]; ok { + timer.Stop() + delete(state.PieceTimers, int(ctrl.Index)) + } + + // Reassemble and write piece + pieceData := make([]byte, 0, pieceSize) + for _, chunk := range state.PieceBuffers[int(ctrl.Index)] { + pieceData = append(pieceData, chunk...) + } + + //Verify piece hash + h := sha256.New() + h.Write(pieceData) + hash := hex.EncodeToString(h.Sum(nil)) + + if hash != state.Pieces[ctrl.Index].Hash { + log.Printf("Piece %d hash mismatch", ctrl.Index) + state.PieceBuffers[int(ctrl.Index)] = nil // Clear buffer to retry + return + } + + if _, err := state.File.WriteAt(pieceData, state.Pieces[ctrl.Index].Offset); err != nil { + log.Printf("Failed to write piece %d to file: %v", ctrl.Index, err) + return + } + + state.PieceStatus[ctrl.Index] = true + state.CompletedPieces++ + delete(state.PieceBuffers, int(ctrl.Index)) + + if state.CompletedPieces == state.TotalPieces { + state.Completed <- true + } + } +} + +func (c *Client) HandleChunkAck(ctrl types.ControlMessage) { + c.UnackedChunksMux.Lock() + defer c.UnackedChunksMux.Unlock() + if _, ok := c.UnackedChunks[ctrl.CID]; ok { + if _, ok := c.UnackedChunks[ctrl.CID][ctrl.Index]; ok { + delete(c.UnackedChunks[ctrl.CID][ctrl.Index], ctrl.Sequence) + if len(c.UnackedChunks[ctrl.CID][ctrl.Index]) == 0 { + delete(c.UnackedChunks[ctrl.CID], ctrl.Index) + } + } + if len(c.UnackedChunks[ctrl.CID]) == 0 { + delete(c.UnackedChunks, ctrl.CID) + } + } +} + +func (c *Client) HandlePieceRequest(ctx context.Context, ctrl types.ControlMessage, peer *webRTC.SimpleWebRTCPeer) { + pieces, err := c.DB.GetPieces(ctx, ctrl.CID) + if err != nil || int(ctrl.Index) >= len(pieces) { + log.Printf("Invalid piece request for CID %s, index %d", ctrl.CID, ctrl.Index) + return + } + + fileInfo, err := c.DB.GetLocalFileByCID(ctx, ctrl.CID) + if err != nil { + log.Printf("File not found for piece request: %s", ctrl.CID) + return + } + + file, err := os.Open(fileInfo.FilePath) + if err != nil { + log.Printf("Failed to open file for piece request: %v", err) + return + } + defer file.Close() + + piece := pieces[ctrl.Index] + pieceBuffer := make([]byte, piece.Size) + _, err = file.ReadAt(pieceBuffer, piece.Offset) + if err != nil { + log.Printf("Failed to read piece %d: %v", ctrl.Index, err) + return + } + + totalChunks := (len(pieceBuffer) + types.MaxChunk - 1) / types.MaxChunk + for i := 0; i < totalChunks; i++ { + start := i * types.MaxChunk + end := start + types.MaxChunk + if end > len(pieceBuffer) { + end = len(pieceBuffer) + } + chunk := pieceBuffer[start:end] + + chunkMsg := types.ControlMessage{ + Command: "PIECE_CHUNK", + CID: ctrl.CID, + Index: ctrl.Index, + ChunkIndex: i, + TotalChunks: totalChunks, + Payload: hex.EncodeToString(chunk), + Sequence: i, + } + + // Store the sent chunk and start a retransmission timer + c.UnackedChunksMux.Lock() + if c.UnackedChunks[ctrl.CID] == nil { + c.UnackedChunks[ctrl.CID] = make(map[int64]map[int]types.ControlMessage) + } + if c.UnackedChunks[ctrl.CID][ctrl.Index] == nil { + c.UnackedChunks[ctrl.CID][ctrl.Index] = make(map[int]types.ControlMessage) + } + c.UnackedChunks[ctrl.CID][ctrl.Index][i] = chunkMsg + c.UnackedChunksMux.Unlock() + time.AfterFunc(types.RetransmissionTimeout, func() { c.RetransmitChunk(peer, chunkMsg) }) + + if err := peer.SendJSON(chunkMsg); err != nil { + log.Printf("Failed to send chunk %d of piece %d: %v", i, ctrl.Index, err) + return + } + delay := c.CongestionCtrl[peer.GetSignalingStream().Conn().RemotePeer()] + time.Sleep(delay) + } +} + +func (c *Client) RetransmitChunk(peer *webRTC.SimpleWebRTCPeer, chunkMsg types.ControlMessage) { + c.UnackedChunksMux.RLock() + defer c.UnackedChunksMux.RUnlock() + if _, ok := c.UnackedChunks[chunkMsg.CID]; ok { + if _, ok := c.UnackedChunks[chunkMsg.CID][chunkMsg.Index]; ok { + if _, ok := c.UnackedChunks[chunkMsg.CID][chunkMsg.Index][chunkMsg.Sequence]; ok { + log.Printf("Retransmitting chunk %d of piece %d", chunkMsg.Sequence, chunkMsg.Index) + if err := peer.SendJSON(chunkMsg); err != nil { + log.Printf("Failed to retransmit chunk %d of piece %d: %v", chunkMsg.Sequence, chunkMsg.Index, err) + } + // Reset timer + time.AfterFunc(types.RetransmissionTimeout, func() { c.RetransmitChunk(peer, chunkMsg) }) + } + } + } +} + +func (c *Client) HandleManifestRequest(ctx context.Context, ctrl types.ControlMessage, peer *webRTC.SimpleWebRTCPeer) { + localFile, err := c.DB.GetLocalFileByCID(ctx, ctrl.CID) + if err != nil { + log.Printf("File not found for manifest: %s", ctrl.CID) + return + } + + pieces, err := c.DB.GetPieces(ctx, ctrl.CID) + if err != nil { + log.Printf("Error getting pieces: %v", err) + return + } + + manifest := types.ControlMessage{ + Command: "MANIFEST", + CID: ctrl.CID, + TotalSize: localFile.FileSize, + HashHex: localFile.FileHash, + NumPieces: int64(len(pieces)), + Pieces: pieces, + Filename: localFile.Filename, + } + + if err := peer.SendJSONReliable(manifest); err != nil { + log.Printf("Error sending manifest: %v", err) + } +} + +func (c *Client) OnWebRTCPeerClose(peerID peer.ID) { + log.Printf("WebRTC peer disconnected: %s", peerID) + c.PeersMux.Lock() + delete(c.WebRTCPeers, peerID) + c.PeersMux.Unlock() + + // Handle download resumption logic + c.DownloadsMux.Lock() + defer c.DownloadsMux.Unlock() + + for cid, state := range c.ActiveDownloads { + for pieceIndex, assignee := range state.PieceAssignees { + if assignee == peerID { + log.Printf("Peer %s disconnected, re-requesting piece %d for download %s", peerID, pieceIndex, cid) + // Re-queue the piece for download + go c.ReRequestPiece(state, pieceIndex) + } + } + } +} + +func (c *Client) MonitorCongestion() { + ticker := time.NewTicker(types.PingInterval) + for range ticker.C { + c.PeersMux.RLock() + for pid, peer := range c.WebRTCPeers { + if connState := peer.GetConnectionState(); connState == webRTC.ConnectionStateConnected { + c.PingTimes[pid] = time.Now() + ping := map[string]string{"type": "ping"} + peer.SendJSONReliable(ping) + } + } + c.PeersMux.RUnlock() + } +} + +func (c *Client) HandlePong(pid peer.ID) { + if start, ok := c.PingTimes[pid]; ok { + rtt := time.Since(start) + c.RTTMux.Lock() + if _, ok := c.RTTMeasurements[pid]; !ok { + c.RTTMeasurements[pid] = []time.Duration{} + } + c.RTTMeasurements[pid] = append(c.RTTMeasurements[pid], rtt) + if len(c.RTTMeasurements[pid]) > 10 { + c.RTTMeasurements[pid] = c.RTTMeasurements[pid][1:] + } + avgRTT := time.Duration(0) + for _, d := range c.RTTMeasurements[pid] { + avgRTT += d + } + avgRTT /= time.Duration(len(c.RTTMeasurements[pid])) + var delay time.Duration = types.MinDelay + if avgRTT > types.MaxRTT { + delay = types.MaxDelay + } else if avgRTT > types.MaxRTT/2 { + delay = (types.MaxDelay - types.MinDelay) / 2 + } + c.CongestionCtrl[pid] = delay + c.RTTMux.Unlock() + delete(c.PingTimes, pid) + } +} + +func (c *Client) ReRequestPiece(state *types.DownloadState, pieceIndex int) { + // Re-assign to another peer with backoff + retryCount := state.RetryCounts[pieceIndex] + backoff := types.ExponentialBackoffBase * time.Duration(1< types.MaxBackoff { + backoff = types.MaxBackoff + } + time.AfterFunc(backoff, func() { + // For simplicity, we'll just re-request from any connected peer. + // A more advanced implementation would select a new peer. + for _, p := range c.WebRTCPeers { + req := types.ControlMessage{ + Command: "REQUEST_PIECE", + CID: state.Manifest.CID, + Index: int64(pieceIndex), + } + if err := p.SendJSONReliable(req); err == nil { + log.Printf("Re-requested piece %d from a different peer.", pieceIndex) + return + } + } + log.Printf("Failed to re-request piece %d: no available peers.", pieceIndex) + }) + state.RetryCounts[pieceIndex]++ +} + +func (c *Client) RequestManifest(peer *webRTC.SimpleWebRTCPeer, cidStr string) (types.ControlMessage, error) { + req := types.ControlMessage{Command: "REQUEST_MANIFEST", CID: cidStr} + if err := peer.SendJSONReliable(req); err != nil { + return types.ControlMessage{}, err + } + + manifestCh := make(chan types.ControlMessage, 1) + c.ManifestChMu.Lock() + c.ManifestWaiters[cidStr] = manifestCh + c.ManifestChMu.Unlock() + + defer func() { + c.ManifestChMu.Lock() + delete(c.ManifestWaiters, cidStr) + c.ManifestChMu.Unlock() + }() + + select { + case manifest := <-manifestCh: + return manifest, nil + case <-time.After(30 * time.Second): + return types.ControlMessage{}, fmt.Errorf("timed out waiting for manifest") + } +} diff --git a/internal/p2p/host.go b/internal/p2p/host.go index b8397bb..c27310c 100644 --- a/internal/p2p/host.go +++ b/internal/p2p/host.go @@ -58,8 +58,7 @@ func NewHost( } // Relay config (static relay on Render) - relayAddrStr := "/dns4/relay-torrentium.onrender.com/tcp/443/wss/p2p/12D3KooWS7jchAU23xcSYasitheTvTyBpjSx4KuRgj5rv5GBBYoB" - + relayAddrStr := "/dns4/relay-torrentium.onrender.com/tcp/443/wss/p2p/12D3KooWMbTZL5taZH4CK9hCkTLkXaPadBoMR3KJZRFhbYBPrdkK" relayMaddr, err := ma.NewMultiaddr(relayAddrStr) if err != nil { return nil, nil, fmt.Errorf("invalid relay multiaddr: %w", err) diff --git a/internal/types/models.go b/internal/types/models.go new file mode 100644 index 0000000..b752a4a --- /dev/null +++ b/internal/types/models.go @@ -0,0 +1,69 @@ +package types + +import ( + "os" + "sync" + "time" + + db "torrentium/internal/db" + + "github.com/libp2p/go-libp2p/core/peer" + "github.com/schollz/progressbar/v3" +) + +const ( + DefaultPieceSize = 1 << 20 // 1 MiB pieces + MaxProviders = 10 + MaxChunk = 16 * 1024 // 16KiB chunks + MaxParallelDownloads = 3 + PieceTimeout = 300 * time.Second // Timeout for downloading a single piece + RetransmissionTimeout = 5 * time.Second + KeepAliveInterval = 15 * time.Second + PingInterval = 10 * time.Second + MaxRTT = 500 * time.Millisecond + MinDelay = 0 + MaxDelay = 100 * time.Millisecond + ExponentialBackoffBase = 1 * time.Second + MaxBackoff = 32 * time.Second +) + +type FileInfo struct { + FilePath string + Hash string + Size int64 + Name string + PieceSz int64 +} + +type ControlMessage struct { + Command string `json:"command"` + CID string `json:"cid,omitempty"` + PieceSize int64 `json:"piece_size,omitempty"` + TotalSize int64 `json:"total_size,omitempty"` + HashHex string `json:"hash_hex,omitempty"` + NumPieces int64 `json:"num_pieces,omitempty"` + Pieces []db.Piece `json:"pieces,omitempty"` + PieceHash string `json:"piece_hash,omitempty"` + Index int64 `json:"index,omitempty"` + Filename string `json:"filename,omitempty"` + ChunkIndex int `json:"chunk_index,omitempty"` + TotalChunks int `json:"total_chunks,omitempty"` + Payload string `json:"payload,omitempty"` + Sequence int `json:"sequence,omitempty"` +} + +type DownloadState struct { + File *os.File + Manifest ControlMessage + TotalPieces int + Pieces []db.Piece + Completed chan bool + Progress *progressbar.ProgressBar + PieceStatus []bool // true if piece is downloaded + PieceAssignees map[int]peer.ID + PieceBuffers map[int][][]byte // Buffer to reassemble chunks into pieces + Mu sync.Mutex + CompletedPieces int + PieceTimers map[int]*time.Timer // Timers for each piece + RetryCounts map[int]int // Retry counts for exponential backoff +}