-
Notifications
You must be signed in to change notification settings - Fork 9
Expand file tree
/
Copy pathCommitManager.py
More file actions
179 lines (158 loc) · 7.93 KB
/
CommitManager.py
File metadata and controls
179 lines (158 loc) · 7.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# Copyright © 2025 Dr.-Ing. Paul Wilhelm <paul@wilhelm.dev>
# This file is part of Archive Agent. See LICENSE for details.
import asyncio
import time
import typer
from archive_agent.ai.AiManagerFactory import AiManagerFactory
from archive_agent.config.DecoderSettings import DecoderSettings
from archive_agent.core.CliManager import CliManager
from archive_agent.core.IngestionManager import IngestionManager
from archive_agent.core.lock import file_lock
from archive_agent.data.FileData import FileData
from archive_agent.db.QdrantManager import QdrantManager
from archive_agent.util.format import format_file
from archive_agent.watchlist.WatchlistManager import TrackedFiles, WatchlistManager
class CommitManager:
    """
    Commit manager.

    Resolves the watchlist diff against the Qdrant database:
    - added / changed files are ingested in parallel (IngestionManager) and then written to Qdrant,
    - removed files are deleted from Qdrant after confirmation (interactive unless auto-confirmed).
    """

    def __init__(
            self,
            cli: CliManager,
            watchlist: WatchlistManager,
            ai_factory: AiManagerFactory,
            decoder_settings: DecoderSettings,
            qdrant: QdrantManager,
            progress_manager,
            max_workers_ingest: int,
            max_workers_vision: int,
            max_workers_embed: int,
    ):
        """
        Initialize commit manager.
        :param cli: CLI manager.
        :param watchlist: Watchlist manager.
        :param ai_factory: AI manager factory.
        :param decoder_settings: Decoder settings.
        :param qdrant: Qdrant manager.
        :param progress_manager: Progress manager from ContextManager.
        :param max_workers_ingest: Max. workers for IngestionManager.
        :param max_workers_vision: Max. workers for VisionProcessor.
        :param max_workers_embed: Max. workers for EmbedProcessor.
        """
        self.cli = cli
        self.watchlist = watchlist
        self.ai_factory = ai_factory
        self.decoder_settings = decoder_settings
        self.qdrant = qdrant
        self.progress_manager = progress_manager
        self.ingestion = IngestionManager(cli, progress_manager, max_workers=max_workers_ingest)
        self.max_workers_vision = max_workers_vision
        self.max_workers_embed = max_workers_embed

    @file_lock("archive_agent_watchlist")
    def commit(self, confirm_delete: bool) -> None:
        """
        Commit all tracked files: added, then changed, then removed.
        :param confirm_delete: Automatically confirm deleting untracked files from the database.
        """
        added_files = self.watchlist.get_diff_files(self.watchlist.DIFF_ADDED)
        self._commit_ingest(added_files, "added")

        changed_files = self.watchlist.get_diff_files(self.watchlist.DIFF_CHANGED)
        self._commit_ingest(changed_files, "changed")

        removed_files = self.watchlist.get_diff_files(self.watchlist.DIFF_REMOVED)
        removal_done = self._commit_removed(removed_files, confirm_delete)

        if len(added_files) > 0 or len(changed_files) > 0 or len(removed_files) > 0:
            # If the user declined the deletion, nothing was removed; don't report otherwise.
            num_removed = len(removed_files) if removal_done else 0
            self.cli.logger.info("✅ Commit completed:")
            self.cli.logger.info(f"- ({len(added_files)}) file(s) added to Qdrant database")
            self.cli.logger.info(f"- ({len(changed_files)}) file(s) updated in Qdrant database")
            self.cli.logger.info(f"- ({num_removed}) file(s) removed from Qdrant database")

    def _commit_ingest(self, diff_files: TrackedFiles, label: str) -> None:
        """
        Log and commit one batch of files that need ingestion.
        :param diff_files: Tracked files of a single diff kind.
        :param label: Diff kind used in log messages ("added" or "changed").
        """
        if len(diff_files) == 0:
            self.cli.logger.info(f"No {label} files to commit")
            return

        self.cli.logger.info(f"Committing ({len(diff_files)}) {label} file(s)...")
        self.cli.logger.info("⌛ Preparing documents for ingestion...please stand by")
        self.commit_diff(diff_files)

    def _commit_removed(self, removed_files: TrackedFiles, confirm_delete: bool) -> bool:
        """
        List removed files, obtain confirmation, and remove them from the Qdrant database.
        :param removed_files: Tracked files marked as removed.
        :param confirm_delete: Skip the interactive prompt and confirm automatically.
        :return: True if the removal was carried out; False if there was nothing to remove or
            the user declined.
        """
        if len(removed_files) == 0:
            self.cli.logger.info("No removed files to commit")
            return False

        self.cli.logger.info(f"Committing ({len(removed_files)}) removed file(s)...")
        for file in removed_files.keys():
            self.cli.logger.info(f"- TO BE REMOVED {format_file(file)}")

        if confirm_delete:
            self.cli.logger.warning(
                f"Removing any data associated with "
                f"({len(removed_files)}) untracked file(s) "
                f"from the Qdrant database."
            )
            confirm = True
        else:
            self.cli.logger.warning(
                f"You are about to remove any data associated with "
                f"({len(removed_files)}) untracked file(s) "
                f"from the Qdrant database."
            )
            confirm = typer.confirm(
                "👉 Delete files from the Qdrant database?"
            )

        if not confirm:
            self.cli.logger.warning(f"({len(removed_files)}) untracked file(s) remain in the Qdrant database")
            return False

        self.commit_diff(removed_files)
        return True

    def commit_diff(self, tracked_files: TrackedFiles) -> None:
        """
        Commit tracked files.

        Unprocessable files are logged and marked resolved; if they were removed, their data is
        also removed from Qdrant. Processable added/changed files are ingested in parallel and then
        added to / changed in Qdrant. Processable removed files are removed from Qdrant one by one.
        A diff is marked resolved only when the corresponding Qdrant call returns a truthy result.
        :param tracked_files: Tracked files.
        """
        # Populate list of file data objects, each with AI factory for parallel processing.
        tracked_file_data = [
            FileData(
                ai_factory=self.ai_factory,
                decoder_settings=self.decoder_settings,
                file_path=file_path,
                file_meta=file_meta,
                max_workers_vision=self.max_workers_vision,
                max_workers_embed=self.max_workers_embed,
            )
            for file_path, file_meta in tracked_files.items()
        ]

        # Partition in a single pass so is_processable() runs once per file.
        tracked_processable = []
        tracked_unprocessable = []
        for file_data in tracked_file_data:
            if file_data.is_processable():
                tracked_processable.append(file_data)
            else:
                tracked_unprocessable.append(file_data)

        for file_data in tracked_unprocessable:
            self.cli.logger.warning(f"IGNORING unprocessable {format_file(file_data.file_path)}")
            if file_data.file_meta['diff'] == self.watchlist.DIFF_REMOVED:
                # The removal result is intentionally not checked; the diff is resolved either way.
                # NOTE(review): presumably because an unprocessable file may never have been stored
                # in Qdrant, so a "nothing removed" result must not keep it pending forever — confirm.
                asyncio.run(self.qdrant.remove(file_data))
            self.watchlist.diff_mark_resolved(file_data)

        files_to_process_in_parallel = [
            fd for fd in tracked_processable
            if fd.file_meta['diff'] in (self.watchlist.DIFF_ADDED, self.watchlist.DIFF_CHANGED)
        ]
        files_to_process_sequentially = [
            fd for fd in tracked_processable if fd.file_meta['diff'] == self.watchlist.DIFF_REMOVED
        ]

        ingest_t0 = time.monotonic()
        processed_results = self.ingestion.process_files_parallel(files_to_process_in_parallel)
        self.cli.logger.info(f"Ingestion completed in {time.monotonic() - ingest_t0:.1f}s")

        qdrant_t0 = time.monotonic()

        # Qdrant writes happen sequentially, after all ingestion has finished.
        for file_data, success in processed_results:
            if not success:
                # Leave the diff unresolved so the file is retried on the next commit.
                self.cli.logger.warning(f"Failed to process {format_file(file_data.file_path)}")
                continue

            db_t0 = time.monotonic()
            if file_data.file_meta['diff'] == self.watchlist.DIFF_ADDED:
                if asyncio.run(self.qdrant.add(file_data)):
                    self.watchlist.diff_mark_resolved(file_data)
            elif file_data.file_meta['diff'] == self.watchlist.DIFF_CHANGED:
                if asyncio.run(self.qdrant.change(file_data)):
                    self.watchlist.diff_mark_resolved(file_data)
            self.cli.logger.info(f"Qdrant update for {format_file(file_data.file_path)} in {time.monotonic() - db_t0:.1f}s")

        for file_data in files_to_process_sequentially:
            db_t0 = time.monotonic()
            if asyncio.run(self.qdrant.remove(file_data)):
                self.watchlist.diff_mark_resolved(file_data)
            self.cli.logger.info(f"Qdrant remove for {format_file(file_data.file_path)} in {time.monotonic() - db_t0:.1f}s")

        self.cli.logger.info(f"All Qdrant operations completed in {time.monotonic() - qdrant_t0:.1f}s")