Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions server/repositories/contactRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,23 @@ import type { HydratedContact, ChildRecordsPayload } from "./types.ts";
// Re-export types for consumers
export type { HydratedContact, ChildRecordsPayload };

// Central registry of contact child relations for OCP extensibility
export const RELATION_REGISTRY = {
emails: { table: schema.contactEmails, dbName: "contact_emails" },
phones: { table: schema.contactPhones, dbName: "contact_phones" },
socialLinks: {
table: schema.contactSocialLinks,
dbName: "contact_social_links",
},
tags: { table: schema.contactTags, dbName: "contact_tags" },
interests: { table: schema.contactInterests, dbName: "contact_interests" },
addresses: { table: schema.contactAddresses, dbName: "contact_addresses" },
attributes: { table: schema.contactAttributes, dbName: "contact_attributes" },
education: { table: schema.contactEducation, dbName: "contact_education" },
experience: { table: schema.contactExperience, dbName: "contact_experience" },
sources: { table: schema.contactSources, dbName: "contact_sources" },
} as const;

// =============================================================================
// URL Utilities (used by social link insertion)
// =============================================================================
Expand Down
32 changes: 20 additions & 12 deletions server/routes/contacts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import {
isEmbeddingAvailable,
} from "../services/dedupe/embeddings.ts";
import { dedupeService } from "../services/dedupe/index.ts";
import { ParallelQueue } from "../ai/routing/ParallelQueue.ts";
import {
normalizeContactById,
normalizeContacts,
Expand Down Expand Up @@ -476,20 +477,27 @@ router.post(
`Background bulk embedding failed: ${getErrorMessage(err)}`,
),
);
// Schedule incremental dedupe for each imported contact
for (const cid of createdIds) {
setTimeout(() => {
// Process incremental dedupe sequentially in the background to prevent lock saturation and CPU spikes
(async () => {
// Wait 3 seconds to let bulk inserts and embedding tasks settle
await new Promise((resolve) => setTimeout(resolve, 3000));
await ParallelQueue.process(createdIds, 1, async (cid) => {
const irid = `imp-${cid.slice(0, 8)}`;
dedupeService
.incrementalDedupeCheck(cid, irid)
.catch((err) =>
log.warn(
"API",
`Incremental dedupe for ${cid} failed: ${getErrorMessage(err)}`,
),
try {
await dedupeService.incrementalDedupeCheck(cid, irid);
} catch (err) {
log.warn(
"API",
`Incremental dedupe for ${cid} failed: ${getErrorMessage(err)}`,
);
}, 3_000);
}
}
});
})().catch((err) =>
log.error(
"API",
`Bulk background dedupe queue crashed: ${getErrorMessage(err)}`,
),
);
}
res.status(201).json({ success: true, count });
}
Expand Down
27 changes: 9 additions & 18 deletions server/services/contactService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ import path from "path";
import { db, sqlite } from "../db.ts";
import * as schema from "../../src/db/schema.ts";
import { eq } from "drizzle-orm";
import { contactRepo } from "../repositories/contactRepository.ts";
import {
contactRepo,
RELATION_REGISTRY,
} from "../repositories/contactRepository.ts";
import { queueGeocode } from "./geocoding/index.ts";
import {
processBase64Avatar,
Expand Down Expand Up @@ -318,25 +321,13 @@ export const contactService = {
.where(eq(schema.contacts.id, id))
.run();

const childMappings: [keyof typeof body, string][] = [
["emails", "contact_emails"],
["phones", "contact_phones"],
["socialLinks", "contact_social_links"],
["tags", "contact_tags"],
["interests", "contact_interests"],
["addresses", "contact_addresses"],
["attributes", "contact_attributes"],
["education", "contact_education"],
["experience", "contact_experience"],
["sources", "contact_sources"],
];

for (const [bodyKey, tableName] of childMappings) {
if (body[bodyKey] !== undefined && Array.isArray(body[bodyKey])) {
for (const [bodyKey, config] of Object.entries(RELATION_REGISTRY)) {
const key = bodyKey as keyof typeof RELATION_REGISTRY;
if (body[key] !== undefined && Array.isArray(body[key])) {
sqlite
.prepare(`DELETE FROM ${tableName} WHERE contactId = ?`)
.prepare(`DELETE FROM ${config.dbName} WHERE contactId = ?`)
.run(id);
contactRepo.insertChildRecords(id, { [bodyKey]: body[bodyKey] });
contactRepo.insertChildRecords(id, { [key]: body[key] } as any);
}
}
});
Expand Down
6 changes: 6 additions & 0 deletions server/services/dedupe/passes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -354,8 +354,14 @@ export async function runFunnelPass(
}[] = [];
let autoCount = 0;
let discardCount = 0;
let scoredCount = 0;

for (const candidate of allCandidates) {
if (scoredCount > 0 && scoredCount % 100 === 0) {
await new Promise<void>((resolve) => setImmediate(resolve));
}
scoredCount++;

const nA = normalizedMap.get(candidate.idA);
const nB = normalizedMap.get(candidate.idB);
if (!nA || !nB) continue;
Expand Down
39 changes: 39 additions & 0 deletions tests/unit/relationRegistry.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import { describe, it, expect } from "vitest";
import { RELATION_REGISTRY } from "../../server/repositories/contactRepository.ts";

describe("Central Child Relation Registry (OCP)", () => {
it("defines mapping configuration for all expected child tables", () => {
const keys = Object.keys(RELATION_REGISTRY) as Array<
keyof typeof RELATION_REGISTRY
>;

// Core child properties we expect to be registered
const expectedKeys = [
"emails",
"phones",
"socialLinks",
"tags",
"interests",
"addresses",
"attributes",
"education",
"experience",
"sources",
];

for (const key of expectedKeys) {
expect(keys).toContain(key);
const config = RELATION_REGISTRY[key as keyof typeof RELATION_REGISTRY];
expect(config).toBeDefined();
expect(config.dbName).toBeTypeOf("string");
expect(config.dbName.length).toBeGreaterThan(0);
expect(config.table).toBeDefined();
}
});

it("does not have duplicate database names", () => {
const dbNames = Object.values(RELATION_REGISTRY).map((cfg) => cfg.dbName);
const uniqueDbNames = new Set(dbNames);
expect(dbNames.length).toBe(uniqueDbNames.size);
});
});
Loading