Skip to content
9 changes: 9 additions & 0 deletions packages/p2p-media-loader-core/src/bandwidth-calculator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,13 @@ export class BandwidthCalculator {
this.loadingOnlyTimestamps.splice(0, samplesToRemove);
this.timestamps.splice(0, samplesToRemove);
}

clear() {
this.bytes.length = 0;
this.loadingOnlyTimestamps.length = 0;
this.timestamps.length = 0;
this.loadingsCount = 0;
this.noLoadingsTime = 0;
this.loadingsStoppedAt = 0;
}
}
24 changes: 22 additions & 2 deletions packages/p2p-media-loader-core/src/p2p/loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,30 @@ export class P2PLoader {
}

if (peersWithSegment.length === 0) return;
const peer = Utils.getRandomItem(peersWithSegment);

let selectedPeer: Peer;

if (peersWithSegment.length === 1) {
selectedPeer = peersWithSegment[0];
} else {
let maxSpeed = 0;
for (const peer of peersWithSegment) {
const speed = peer.downloadBandwidth;
if (speed > maxSpeed) maxSpeed = speed;
}

if (maxSpeed > 0) {
const baseSpeed = Math.max(1, maxSpeed * 0.1);
selectedPeer = Utils.getWeightedRandomItem(peersWithSegment, (peer) =>
Math.max(peer.downloadBandwidth, baseSpeed),
);
} else {
selectedPeer = Utils.getRandomItem(peersWithSegment);
}
}

const request = this.requests.getOrCreateRequest(segment);
peer.downloadSegment(request);
selectedPeer.downloadSegment(request);
}

isSegmentLoadingOrLoadedBySomeone(segment: SegmentWithStream): boolean {
Expand Down
29 changes: 29 additions & 0 deletions packages/p2p-media-loader-core/src/p2p/peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import * as Utils from "../utils/utils.js";
import * as Command from "./commands/index.js";
import { PeerProtocol, PeerConfig } from "./peer-protocol.js";
import { EventTarget } from "../utils/event-target.js";
import { BandwidthCalculator } from "../bandwidth-calculator.js";

const { PeerCommandType } = Command;
type PeerEventHandlers = {
Expand Down Expand Up @@ -40,6 +41,8 @@ export class Peer {
private downloadingErrors: RequestError<
PeerRequestErrorType | RequestAbortErrorType
>[] = [];
private readonly bandwidthCalculator = new BandwidthCalculator();
private cachedDownloadBandwidth = { value: 0, timestamp: 0 };
private logger = debug("p2pml-core:peer");
private readonly onPeerClosed: CoreEventMap["onPeerClose"];

Expand Down Expand Up @@ -78,6 +81,18 @@ export class Peer {
return this.downloadingContext?.request.segment;
}

get downloadBandwidth(): number {
const now = performance.now();
// Cache the array iteration math for 1000ms to preserve O(1) hot path efficiency during rapid queue segment evaluations
if (now - this.cachedDownloadBandwidth.timestamp > 1000) {
// Uses a 15-second tracking window to calculate a moving average of the peer's throughput speed
this.cachedDownloadBandwidth.value =
this.bandwidthCalculator.getBandwidthLoadingOnly(15);
this.cachedDownloadBandwidth.timestamp = now;
}
return this.cachedDownloadBandwidth.value;
}

getSegmentStatus(
segment: SegmentWithStream,
): "loaded" | "http-loading" | undefined {
Expand Down Expand Up @@ -178,6 +193,7 @@ export class Peer {

this.downloadingErrors = [];
controls.completeOnSuccess();
this.bandwidthCalculator.stopLoading();
this.downloadingContext = undefined;
break;
}
Expand Down Expand Up @@ -219,13 +235,16 @@ export class Peer {
return;
}

this.bandwidthCalculator.addBytes(chunk.byteLength);
this.cachedDownloadBandwidth.timestamp = 0; // invalidate cache
controls.addLoadedChunk(chunk);
};

downloadSegment(segmentRequest: Request) {
if (this.downloadingContext) {
throw new Error("Some segment already is downloading");
}
this.bandwidthCalculator.startLoading();
this.downloadingContext = {
request: segmentRequest,
requestId: Math.floor(Math.random() * 1000),
Expand All @@ -240,6 +259,12 @@ export class Peer {
const { request, requestId } = this.downloadingContext;
this.sendCancelSegmentRequestCommand(request.segment, requestId);
this.downloadingErrors.push(error);
this.bandwidthCalculator.stopLoading();
if (error.type !== "abort") {
this.bandwidthCalculator.clear();
this.cachedDownloadBandwidth.timestamp = 0;
this.logger(`cleared bandwidth history due to ${error.type}`);
}
this.downloadingContext = undefined;

const timeoutErrors = this.downloadingErrors.filter(
Expand Down Expand Up @@ -295,6 +320,10 @@ export class Peer {
this.logger(`cancel segment request ${segment.externalId} (${type})`);
const error = new RequestError(type);
controls.abortOnError(error);
this.bandwidthCalculator.stopLoading();
this.bandwidthCalculator.clear();
this.cachedDownloadBandwidth.timestamp = 0;
this.logger(`cleared bandwidth history due to ${error.type}`);
this.downloadingContext = undefined;
this.downloadingErrors.push(error);
}
Expand Down
23 changes: 23 additions & 0 deletions packages/p2p-media-loader-core/src/utils/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,29 @@ export function getRandomItem<T>(items: T[]): T {
return items[Math.floor(Math.random() * items.length)];
}

export function getWeightedRandomItem<T>(
items: T[],
weightAccessor: (item: T) => number,
): T {
if (items.length === 0) throw new Error("Cannot get item from empty array");
if (items.length === 1) return items[0];

let totalWeight = 0;
const weights = items.map((item) => {
const weight = weightAccessor(item);
totalWeight += weight;
return weight;
});

let randomWeight = Math.random() * totalWeight;
for (let i = 0; i < items.length; i++) {
randomWeight -= weights[i];
if (randomWeight <= 0) return items[i];
}

return items[items.length - 1];
}

export function utf8ToUintArray(utf8String: string): Uint8Array {
const encoder = new TextEncoder();
const bytes = new Uint8Array(utf8String.length);
Expand Down
Loading