Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# Created by https://www.toptal.com/developers/gitignore/api/python
# Edit at https://www.toptal.com/developers/gitignore?templates=python

## Ignore DB files ##
*.db

### Python ###
# Byte-compiled / optimized / DLL files
__pycache__/
Expand Down Expand Up @@ -176,4 +179,4 @@ pyrightconfig.json
# End of https://www.toptal.com/developers/gitignore/api/python

# VSCode
.vscode
.vscode
1 change: 1 addition & 0 deletions .python-version
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
gptpenv
2 changes: 1 addition & 1 deletion chatgpt_to_sqlite/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,5 @@ def export(filename, db_path, num):

# Build messages tables
messages_table = db.table("messages")
messages_table.upsert_all(messages, pk="message_id")
messages_table.upsert_all(all_messages, pk="message_id")
messages_table.add_foreign_key("chat_id", "conversations", "chat_id", ignore=True)
91 changes: 56 additions & 35 deletions chatgpt_to_sqlite/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,65 +13,86 @@ def concatenate_rows(message: dict, chat_id: str) -> Optional[dict]:
if not message:
return None

# sender (user, assistant, and system)
sender = message["author"]["role"] if message["author"] else "unknown"
sender = message.get("author", {}).get("role", "unknown")
content = message.get("content", {})
parts = content.get("parts")

if "parts" not in message["content"]:
# Make sure parts is a list with at least one valid string
if not isinstance(parts, list) or not parts:
print(f"SKIPPED: NO PARTS: {chat_id}")
return None

# Grab the first non-empty string part
first_valid_part = next(
(p for p in parts if isinstance(p, str) and p.strip()), None
)
if not first_valid_part:
return None

metadata = message.get("metadata", {})
is_user_system_message = metadata.get("is_user_system_message", False)

# System message
if is_user_system_message is True:
user_about_message = metadata.get("user_context_message_data", "").get(
"about_user_message", ""
)
about_model_message = metadata.get("user_context_message_data", "").get(
"about_model_message", ""
if is_user_system_message:
context_data = metadata.get("user_context_message_data", {})
user_about_message = context_data.get("about_user_message", "")
about_model_message = context_data.get("about_model_message", "")
total_system_message = (
f"ABOUT YOU:\n{about_model_message}\n\n"
f"ABOUT YOUR USER:\n{user_about_message}\n\n"
f"FIRST MESSAGE FROM THE USER:\n\n"
)
total_system_message = f"ABOUT YOU:\n{about_model_message}\n\nABOUT YOUR USER:\n{user_about_message}\n\nFIRST MESSAGE FROM THE USER:\n\n"
return {
"message_id": message["id"],
"message_id": message.get("id"),
"sender": "system",
"create_time": convert_timestamp(message["create_time"]),
"status": message["status"],
"weight": message["weight"],
"create_time": convert_timestamp(message.get("create_time")),
"status": message.get("status"),
"weight": message.get("weight"),
"text": total_system_message,
"model": None,
"chat_id": chat_id,
}

# User and assistant messages
text = message["content"]["parts"][0]

if text == "":
return None

model = message["metadata"].get("model_slug")
model = metadata.get("model_slug")

return {
"message_id": message["id"],
"message_id": message.get("id"),
"sender": sender,
"create_time": convert_timestamp(message["create_time"]),
"status": message["status"],
"weight": message["weight"],
"text": text,
"create_time": convert_timestamp(message.get("create_time")),
"status": message.get("status"),
"weight": message.get("weight"),
"text": first_valid_part.strip(),
"model": model,
"chat_id": chat_id,
}


# source: https://github.com/duarteocarmo/mistral-doc/blob/1f909bb4e23c4ae487890072192722c2e86da1f3/process_gpt_export.py#L44
def load_documents(data: dict) -> list[list[dict]]:
def walk_conversation(
    mapping: dict, current_node_id: str
) -> list[tuple[dict, str]]:
    """Walk the conversation tree from *current_node_id* back to the root.

    Follows each node's ``parent`` pointer through *mapping*, collecting a
    ``(message, node_id)`` pair for every node that carries a message.

    Args:
        mapping: node_id -> node dict; each node has "message" and "parent".
        current_node_id: id of the leaf node to start walking from.

    Returns:
        List of ``(message, node_id)`` tuples ordered oldest -> newest.
        Empty when the start id is falsy or not present in the mapping.
    """
    ordered_nodes = []
    seen = set()  # guard against cyclic parent links in malformed exports

    while current_node_id:
        if current_node_id in seen:
            break  # cycle detected; stop rather than loop forever
        seen.add(current_node_id)

        node = mapping.get(current_node_id)
        if not node:
            break
        message = node.get("message")
        if message:
            ordered_nodes.append((message, current_node_id))
        current_node_id = node.get("parent")

    # We collected leaf -> root; reverse for chronological order.
    return list(reversed(ordered_nodes))


def load_documents(data: dict) -> list[dict]:
documents = []
for d in data:
messages = d["mapping"]
messages = [
concatenate_rows(messages[key]["message"], d["id"])
for _, key in enumerate(messages)
]
messages = [message for message in messages if message]
mapping = d["mapping"]
current_node = d.get("current_node")
path = walk_conversation(mapping, current_node)

messages = [concatenate_rows(message, d["id"]) for message, _ in path]
messages = [m for m in messages if m]

document = {
"chat_id": d["id"],
"title": d["title"],
Expand Down
48 changes: 48 additions & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
[pytest-html]
title = Test Report
show_summary = True
show_results_table = True
show_test_summary = True
show_logs = True

[pytest]
# Set the test discovery directory
# NOTE(review): the test files added in this change live under tests/, not chatgpt_to_sqlite/tests
testpaths = tests

# Default command-line options applied to every pytest run
addopts =
--strict-markers
--disable-warnings
--tb=short
--maxfail=300
--durations=5
--capture=no
--log-level=INFO
--log-format="%(asctime)s - %(levelname)s - %(message)s"
--log-date-format="%Y-%m-%d %H:%M:%S"

# Markers for test categorization
markers =
slow: Tests that take a long time to run
integration: Integration tests requiring services
unit: Fast, self-contained tests
regression: Tests for past bugs

# Filter warnings (modify as needed)
filterwarnings =
ignore::DeprecationWarning
ignore::PendingDeprecationWarning
ignore::ResourceWarning

# Set log CLI output
log_cli = true
log_cli_level = INFO

# Configure pytest cache
cache_dir = .pytest_cache

# Enable capturing logs
log_file = pytest.log
log_file_level = INFO
log_file_format = %(asctime)s [%(levelname)s] %(message)s
log_file_date_format = %Y-%m-%d %H:%M:%S
25 changes: 25 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from __future__ import annotations

import shutil
from pathlib import Path

import pytest


@pytest.fixture(scope="session")
def chatgpt_db(tmp_path_factory) -> str:
    """Session-wide temporary copy of the repository's chatgpt.db.

    The copy lives in a pytest-managed temp directory, so tests can open it
    without depending on the current working directory and without mutating
    the checked-in database.

    Returns the filesystem path of the copy as a string.

    Raises:
        FileNotFoundError: when chatgpt.db is missing from the repo root.
    """
    source_db = Path(__file__).resolve().parents[1] / "chatgpt.db"
    if not source_db.exists():
        raise FileNotFoundError(f"Expected database not found: {source_db}")

    copy_path = tmp_path_factory.mktemp("chatgpt_db") / "chatgpt.db"
    shutil.copyfile(source_db, copy_path)
    return str(copy_path)

113 changes: 113 additions & 0 deletions tests/test_post_import_sanity_on_db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import sqlite3
from datetime import datetime

import pytest

# -- FIXTURE: Load conversations and messages into usable Python dicts --


@pytest.fixture
def get_most_active_conversation_with_messages(chatgpt_db, message_limit=10):
    """Return the conversation with the most messages as a plain dict.

    The dict carries chat_id/title/create_time/update_time plus up to
    *message_limit* of the conversation's messages (oldest first). Returns
    None when the database contains no conversations.
    """
    conn = sqlite3.connect(chatgpt_db)
    try:
        cursor = conn.cursor()

        # Get the chat_id of the conversation with most messages
        cursor.execute(
            """
            SELECT c.chat_id, c.title, c.create_time, c.update_time, COUNT(m.message_id) as msg_count
            FROM conversations c
            JOIN messages m ON c.chat_id = m.chat_id
            GROUP BY c.chat_id
            ORDER BY msg_count DESC
            LIMIT 1
            """
        )
        top_convo = cursor.fetchone()

        if not top_convo:
            print("No conversations found.")
            return None

        chat_id = top_convo[0]
        conversation = {
            "chat_id": chat_id,
            "title": top_convo[1],
            "create_time": top_convo[2],
            "update_time": top_convo[3],
            "messages": [],
        }

        # Get messages for that conversation
        cursor.execute(
            """
            SELECT message_id, sender, create_time, status, weight, text, model
            FROM messages
            WHERE chat_id = ?
            ORDER BY create_time
            LIMIT ?
            """,
            (chat_id, message_limit),
        )

        for msg in cursor.fetchall():
            conversation["messages"].append(
                {
                    "message_id": msg[0],
                    "sender": msg[1],
                    "create_time": msg[2],
                    "status": msg[3],
                    "weight": msg[4],
                    "text": msg[5],
                    "model": msg[6],
                }
            )

        return conversation
    finally:
        # Close even on early return or query failure; the original leaked
        # the connection when no conversations existed.
        conn.close()


def test_preview_documents(get_most_active_conversation_with_messages):
documents = get_most_active_conversation_with_messages
if not documents:
print("No conversations found.")
return

convo = documents
print(f"\n📘 Conversation: {convo['title']}")
print("-" * 60)
for msg in convo["messages"]:
sender = msg["sender"]
text = msg["text"]
print(f"[{sender.upper()}]: {text}\n")


def test_sanity_check_db(chatgpt_db):
conn = sqlite3.connect(chatgpt_db)
cursor = conn.cursor()

cursor.execute("SELECT COUNT(*) FROM conversations")
conv_count = cursor.fetchone()[0]
print(f"🗂️ Total conversations: {conv_count}")

cursor.execute("SELECT COUNT(*) FROM messages")
msg_count = cursor.fetchone()[0]
print(f"💬 Total messages: {msg_count}")

cursor.execute(
"""
SELECT c.title, c.chat_id, COUNT(m.message_id) as message_count
FROM conversations c
LEFT JOIN messages m ON c.chat_id = m.chat_id
GROUP BY c.chat_id
ORDER BY message_count DESC
LIMIT 20
"""
)
rows = cursor.fetchall()
print("\n📊 Top 20 conversations by message count:")
for title, chat_id, count in rows:
print(f" - {title[:40]:40} | ID: {chat_id[:8]}... | {count} messages")

conn.close()
Loading