Avoid large Dask graph when checkpointing flat views#49
Conversation
There was a problem hiding this comment.
Pull request overview
This PR updates store_flat_views to avoid building a large Dask task graph by saving pandas “flat view” DataFrames directly (locally) instead of submitting per-view Dask tasks.
Changes:
- Replaced
dask_client.submit(...)calls with direct_save_flat_view(...)execution. - Removed the collection/return of Dask futures; method now returns an empty list.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| def store_flat_views(self, flat_views: Dict[ViewKey, pd.DataFrame]): | ||
| store_flat_view_tasks = [] | ||
| for view_key in flat_views: | ||
| flat_view_checkpoint_name = self.get_checkpoint_name(CHECKPOINT_FLAT_VIEW, *list(view_key)) | ||
| flat_view_checkpoint_path = self.get_checkpoint_path(name=flat_view_checkpoint_name) | ||
| if self.has_checkpoint(name=flat_view_checkpoint_name): | ||
| continue | ||
| store_flat_view_tasks.append( | ||
| self.dask_client.submit( | ||
| self._save_flat_view, | ||
| view=flat_views[view_key], | ||
| view_path=flat_view_checkpoint_path, | ||
| ) | ||
| # Save local pandas flat views directly to avoid shipping large payloads | ||
| # through Dask task graphs. | ||
| self._save_flat_view( | ||
| view=flat_views[view_key], | ||
| view_path=flat_view_checkpoint_path, | ||
| ) | ||
| return store_flat_view_tasks | ||
| return [] |
There was a problem hiding this comment.
This changes the method’s externally observable behavior: previously it returned a list of Dask futures (allowing async execution and error handling via futures), but it now always returns an empty list even though work was performed. If callers rely on the returned tasks to wait/collect exceptions, they’ll silently lose that behavior. Consider either (a) changing the return type to None and updating callers, or (b) returning a meaningful result (e.g., paths written / count written) so callers can still coordinate.
| self._save_flat_view( | ||
| view=flat_views[view_key], | ||
| view_path=flat_view_checkpoint_path, | ||
| ) |
There was a problem hiding this comment.
Saving views is now fully sequential on the caller thread. If flat_views can be large, this may noticeably increase wall-clock time compared to parallel Dask submission. If parallelism is still desired without shipping DataFrames through Dask, consider using a local thread/process pool (or a configurable strategy) that runs _save_flat_view concurrently while keeping data local.
hariharan-devarajan
left a comment
There was a problem hiding this comment.
Looks good to me.
This pull request simplifies the
store_flat_viewsmethod inanalyzer.pyby changing how flat views are saved. Instead of submitting tasks to Dask for saving each view, the method now saves pandas DataFrames directly, which avoids sending large data through Dask and improves performance for local data.Flat view storage optimization:
store_flat_viewsto save pandas flat views directly instead of using Dask tasks, reducing overhead and avoiding large payload transfers.