From 7214ad30272938bd659ae4fba2e1faeba9511221 Mon Sep 17 00:00:00 2001 From: Ameen Date: Thu, 14 May 2026 08:37:49 +0000 Subject: [PATCH 01/26] feat: add configurable automatic field alias mapping --- .../lightning_field_mapping/__init__.py | 0 .../lightning_field_mapping.js | 71 ++++++++++++ .../lightning_field_mapping.json | 52 +++++++++ .../lightning_field_mapping.py | 9 ++ .../test_lightning_field_mapping.py | 9 ++ .../doctype/lightning_fields/__init__.py | 0 .../lightning_fields/lightning_fields.json | 39 +++++++ .../lightning_fields/lightning_fields.py | 9 ++ .../lightning_upload/lightning_upload.js | 103 ++++++++---------- .../lightning_upload/lightning_upload.py | 59 +++++++++- .../lightning_upload_settings.json | 2 +- 11 files changed, 289 insertions(+), 64 deletions(-) create mode 100644 lightning_import/lightning_import/doctype/lightning_field_mapping/__init__.py create mode 100644 lightning_import/lightning_import/doctype/lightning_field_mapping/lightning_field_mapping.js create mode 100644 lightning_import/lightning_import/doctype/lightning_field_mapping/lightning_field_mapping.json create mode 100644 lightning_import/lightning_import/doctype/lightning_field_mapping/lightning_field_mapping.py create mode 100644 lightning_import/lightning_import/doctype/lightning_field_mapping/test_lightning_field_mapping.py create mode 100644 lightning_import/lightning_import/doctype/lightning_fields/__init__.py create mode 100644 lightning_import/lightning_import/doctype/lightning_fields/lightning_fields.json create mode 100644 lightning_import/lightning_import/doctype/lightning_fields/lightning_fields.py diff --git a/lightning_import/lightning_import/doctype/lightning_field_mapping/__init__.py b/lightning_import/lightning_import/doctype/lightning_field_mapping/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/lightning_import/lightning_import/doctype/lightning_field_mapping/lightning_field_mapping.js b/lightning_import/lightning_import/doctype/lightning_field_mapping/lightning_field_mapping.js new file mode 100644 index 0000000..2879671 --- /dev/null +++ b/lightning_import/lightning_import/doctype/lightning_field_mapping/lightning_field_mapping.js @@ -0,0 +1,71 @@ +// Copyright (c) 2025, Tridz Technologies Pvt Ltd and contributors +// For license information, please see license.txt + +frappe.ui.form.on('Lightning Field Mapping', { + refresh: function(frm) { + if (frm.doc.reference_doctype) { + frm.trigger('set_field_options'); + } + }, + + reference_doctype: function(frm) { + // Clear existing rows when changing doctype to avoid invalid mappings + if (frm.doc.mappings && frm.doc.mappings.length > 0) { + frm.clear_table('mappings'); + frm.refresh_field('mappings'); + } + frm.trigger('set_field_options'); + }, + + set_field_options: function(frm) { + if (frm.doc.reference_doctype) { + frappe.call({ + method: 'lightning_import.lightning_import.api.get_fields.get_doctype_fields', + args: { doctype: frm.doc.reference_doctype }, + callback: function(r) { + if (r.message && r.message.fields) { + const options = [''].concat(r.message.fields.map(f => f.fieldname)).join('\n'); + + // 1. Update base metadata + let base_df = frappe.meta.get_docfield('Lightning Fields', 'field_name'); + if (base_df) { + base_df.options = options; + } + + // 2. Update form-specific metadata + let form_df = frappe.meta.get_docfield('Lightning Fields', 'field_name', frm.doc.name); + if (form_df) { + form_df.options = options; + } + + // 3. Update grid column properties directly + if (frm.fields_dict.mappings && frm.fields_dict.mappings.grid) { + frm.fields_dict.mappings.grid.update_docfield_property('field_name', 'options', options); + } + + frm.refresh_field('mappings'); + } + } + }); + } else { + // Clear options if no doctype selected + let base_df = frappe.meta.get_docfield('Lightning Fields', 'field_name'); + if (base_df) base_df.options = ""; + + let form_df = frappe.meta.get_docfield('Lightning Fields', 'field_name', frm.doc.name); + if (form_df) form_df.options = ""; + + if (frm.fields_dict.mappings && frm.fields_dict.mappings.grid) { + frm.fields_dict.mappings.grid.update_docfield_property('field_name', 'options', ""); + } + frm.refresh_field('mappings'); + } + } +}); + +frappe.ui.form.on('Lightning Fields', { + mappings_add: function(frm, cdt, cdn) { + // Ensure options are set when a new row is added + frm.trigger('set_field_options'); + } +}); diff --git a/lightning_import/lightning_import/doctype/lightning_field_mapping/lightning_field_mapping.json b/lightning_import/lightning_import/doctype/lightning_field_mapping/lightning_field_mapping.json new file mode 100644 index 0000000..324d187 --- /dev/null +++ b/lightning_import/lightning_import/doctype/lightning_field_mapping/lightning_field_mapping.json @@ -0,0 +1,52 @@ +{ + "actions": [], + "allow_rename": 1, + "creation": "2026-05-14 11:00:52.970656", + "doctype": "DocType", + "engine": "InnoDB", + "field_order": [ + "reference_doctype", + "mappings" + ], + "fields": [ + { + "fieldname": "reference_doctype", + "fieldtype": "Link", + "label": "Reference DocType", + "options": "DocType" + }, + { + "fieldname": "mappings", + "fieldtype": "Table", + "label": "Mappings", + "options": "Lightning Fields" + } + ], + "grid_page_length": 50, + "index_web_pages_for_search": 1, + "links": [], + "modified": "2026-05-14 11:02:04.933447", + "modified_by": "Administrator", + "module": "Lightning Import", + "name": "Lightning Field Mapping", + "owner": "Administrator", + "permissions": [ + { + "create": 1, + "delete": 1, + "email": 1, + "export": 1, + "print": 1, + "read": 1, + "report": 1, + "role": "System Manager", + "share": 1, + "write": 1 + } + ], + "row_format": "Dynamic", + "rows_threshold_for_grid_search": 20, + "sort_field": "modified", + "sort_order": "DESC", + "states": [] +} \ No newline at end of file diff --git a/lightning_import/lightning_import/doctype/lightning_field_mapping/lightning_field_mapping.py b/lightning_import/lightning_import/doctype/lightning_field_mapping/lightning_field_mapping.py new file mode 100644 index 0000000..4e0c5c5 --- /dev/null +++ b/lightning_import/lightning_import/doctype/lightning_field_mapping/lightning_field_mapping.py @@ -0,0 +1,9 @@ +# Copyright (c) 2026, Tridz Technologies Pvt Ltd and contributors +# For license information, please see license.txt + +# import frappe +from frappe.model.document import Document + + +class LightningFieldMapping(Document): + pass diff --git a/lightning_import/lightning_import/doctype/lightning_field_mapping/test_lightning_field_mapping.py b/lightning_import/lightning_import/doctype/lightning_field_mapping/test_lightning_field_mapping.py new file mode 100644 index 0000000..f393ff8 --- /dev/null +++ b/lightning_import/lightning_import/doctype/lightning_field_mapping/test_lightning_field_mapping.py @@ -0,0 +1,9 @@ +# Copyright (c) 2026, Tridz Technologies Pvt Ltd and Contributors +# See license.txt + +# import frappe +from frappe.tests.utils import FrappeTestCase + + +class TestLightningFieldMapping(FrappeTestCase): + pass diff --git a/lightning_import/lightning_import/doctype/lightning_fields/__init__.py b/lightning_import/lightning_import/doctype/lightning_fields/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/lightning_import/lightning_import/doctype/lightning_fields/lightning_fields.json b/lightning_import/lightning_import/doctype/lightning_fields/lightning_fields.json new file mode 100644 index 0000000..2436ba4 --- /dev/null +++ b/lightning_import/lightning_import/doctype/lightning_fields/lightning_fields.json @@ -0,0 +1,39 @@ +{ + "actions": [], + "allow_rename": 1, + "creation": "2026-05-13 12:51:05.099561", + "doctype": "DocType", + "editable_grid": 1, + "engine": "InnoDB", + "field_order": [ + "field_name", + "alternate_name" + ], + "fields": [ + { + "fieldname": "field_name", + "fieldtype": "Select", + "label": "Field Name" + }, + { + "fieldname": "alternate_name", + "fieldtype": "Data", + "label": "Alternate Name" + } + ], + "grid_page_length": 50, + "index_web_pages_for_search": 1, + "istable": 1, + "links": [], + "modified": "2026-05-14 11:02:40.388976", + "modified_by": "Administrator", + "module": "Lightning Import", + "name": "Lightning Fields", + "owner": "Administrator", + "permissions": [], + "row_format": "Dynamic", + "rows_threshold_for_grid_search": 20, + "sort_field": "modified", + "sort_order": "DESC", + "states": [] +} \ No newline at end of file diff --git a/lightning_import/lightning_import/doctype/lightning_fields/lightning_fields.py b/lightning_import/lightning_import/doctype/lightning_fields/lightning_fields.py new file mode 100644 index 0000000..46c6d51 --- /dev/null +++ b/lightning_import/lightning_import/doctype/lightning_fields/lightning_fields.py @@ -0,0 +1,9 @@ +# Copyright (c) 2026, Tridz Technologies Pvt Ltd and contributors +# For license information, please see license.txt + +# import frappe +from frappe.model.document import Document + + +class LightningFields(Document): + pass diff --git a/lightning_import/lightning_import/doctype/lightning_upload/lightning_upload.js b/lightning_import/lightning_import/doctype/lightning_upload/lightning_upload.js index f3ba7c0..28e1fb6 100644 --- a/lightning_import/lightning_import/doctype/lightning_upload/lightning_upload.js +++ b/lightning_import/lightning_import/doctype/lightning_upload/lightning_upload.js @@ -20,10 +20,10 @@ frappe.realtime.on('socket_disconnected', () => { }); frappe.ui.form.on('Lightning Upload', { - refresh: function(frm) { + refresh: function (frm) { // Store reference to current form frappe.progress_state.current_form = frm; - + // Show Start Import button only when status is Draft and document is saved if (!frm.is_new() && frm.doc.status === "Draft") { frm.page.set_primary_action(__('Start Import'), () => { @@ -71,23 +71,23 @@ frappe.ui.form.on('Lightning Upload', { }); }, - onload: function(frm) { + onload: function (frm) { // Store reference to current form frappe.progress_state.current_form = frm; - + // Set up progress tracking when form loads if import is in progress if (frm.doc.status === 'Queued' || frm.doc.status === 'In Progress') { setup_progress_tracking(frm); } }, - csv_file: function(frm) { + csv_file: function (frm) { if (frm.doc.import_type === 'Insert and Update Records') { if (frm.doc.csv_file) { frm.events.populate_update_on_field(frm); } } - + frappe.db.get_single_value('Lightning Upload Settings', 'enable_file_duplicate_check').then(enabled => { if (enabled && frm.doc.csv_file) { frm.events.populate_duplicate_check_field(frm); @@ -95,7 +95,7 @@ frappe.ui.form.on('Lightning Upload', { }); }, - import_type: function(frm) { + import_type: function (frm) { if (frm.doc.import_type === 'Insert and Update Records') { if (frm.doc.csv_file) { frm.events.populate_update_on_field(frm); @@ -105,11 +105,11 @@ frappe.ui.form.on('Lightning Upload', { } }, - populate_duplicate_check_field: function(frm) { + populate_duplicate_check_field: function (frm) { frappe.call({ method: 'lightning_import.lightning_import.doctype.lightning_upload.lightning_upload.get_csv_headers_for_upload', args: { file_url: frm.doc.csv_file }, - callback: function(r) { + callback: function (r) { if (r.message && r.message.status === 'success') { const headers = r.message.headers; const options = [''].concat(headers); @@ -120,12 +120,12 @@ frappe.ui.form.on('Lightning Upload', { }); }, - populate_update_on_field: function(frm) { + populate_update_on_field: function (frm) { // Fetch CSV headers from the backend frappe.call({ method: 'lightning_import.lightning_import.doctype.lightning_upload.lightning_upload.get_csv_headers_for_upload', args: { file_url: frm.doc.csv_file }, - callback: function(r) { + callback: function (r) { if (r.message && r.message.status === 'success') { const headers = r.message.headers; // Prepend a blank option @@ -140,9 +140,9 @@ frappe.ui.form.on('Lightning Upload', { }); // Global event handler for import progress -frappe.realtime.on('import_progress', function(data) { +frappe.realtime.on('import_progress', function (data) { console.log('[Lightning Import] Received import_progress event:', data); - + const frm = frappe.progress_state.current_form; if (!frm) { console.log('[Lightning Import] No form found in progress_state'); @@ -169,7 +169,7 @@ frappe.realtime.on('import_progress', function(data) { function setup_progress_tracking(frm) { console.log('[Lightning Import] Setting up progress tracking for form:', frm.doc.name); - + // Clear any existing progress bar if (frm.progress_bar) { console.log('[Lightning Import] Removing existing progress bar'); @@ -196,12 +196,12 @@ function update_progress(frm, data) { data: data, hasProgressBar: !!frm.progress_bar }); - + if (!data || !frm.progress_bar) { console.log('[Lightning Import] Missing data or progress bar, skipping update'); return; } - + // Update progress bar console.log('[Lightning Import] Updating progress bar to:', data.progress + '%'); frm.progress_bar.find('.progress-bar') @@ -275,20 +275,20 @@ function update_progress(frm, data) { frm.progress_bar.remove(); frm.progress_bar = null; } - + // Update buttons based on status if (data.status === 'Failed' || data.status === 'Partial Success') { // Check if the Export Error Rows button already exists - const hasExportButton = frm.page.custom_buttons && + const hasExportButton = frm.page.custom_buttons && frm.page.custom_buttons.some(btn => btn.label === __('Export Error Rows')); - + if (!hasExportButton) { frm.add_custom_button(__('Export Error Rows'), () => { export_error_rows(frm); }); } } - + // Refresh primary action button if (data.status === 'Draft') { frm.page.set_primary_action(__('Start Import'), () => { @@ -325,7 +325,7 @@ function start_import(frm) { docname: frm.doc.name, mapping: mapping_json }, - callback: function(r) { + callback: function (r) { if (r.message && r.message.status === 'success') { frappe.show_alert({ message: r.message.message, @@ -355,7 +355,7 @@ function start_import(frm) { }, freeze: true, freeze_message: __('Checking for duplicates in file...'), - callback: function(r) { + callback: function (r) { if (!r.message || r.message.status === 'error') { // If duplicate check itself fails, still allow continuing console.warn('[Lightning Import] Duplicate check failed:', r.message && r.message.message); @@ -446,7 +446,7 @@ function start_import(frm) { frappe.call({ method: 'lightning_import.lightning_import.doctype.lightning_upload.lightning_upload.auto_map_and_validate', args: { docname: frm.doc.name }, - callback: function(r) { + callback: function (r) { if (!r.message) { frappe.msgprint(__('Error during auto-mapping. Please map fields manually.')); return; @@ -493,7 +493,7 @@ function export_error_rows(frm) { args: { docname: frm.doc.name }, - callback: function(r) { + callback: function (r) { if (r.message && r.message.status === 'success') { window.open(r.message.file_url, '_blank'); } else { @@ -511,7 +511,7 @@ function open_field_mapping_dialog(frm) { frappe.call({ method: 'lightning_import.lightning_import.doctype.lightning_upload.lightning_upload.get_csv_headers_for_upload', args: { file_url: frm.doc.csv_file }, - callback: function(csvRes) { + callback: function (csvRes) { if (!csvRes.message || csvRes.message.status !== 'success') { frappe.show_alert({ message: csvRes.message ? csvRes.message.message : __('Failed to fetch CSV headers'), indicator: 'red' }); return; @@ -526,51 +526,38 @@ function open_field_mapping_dialog(frm) { frappe.show_alert({ message: __('Failed to fetch DocType fields'), indicator: 'red' }); return; } - + const fieldOptions = dtRes.message.fields || []; - - const normalize = str => (typeof str === 'string' ? str.toLowerCase().replace(/[\s_]+/g, '') : ''); - - const normalizedFieldMap = {}; - fieldOptions.forEach(f => { - if (f.fieldname) { - normalizedFieldMap[normalize(f.fieldname)] = f.fieldname; - if (f.label) { - normalizedFieldMap[normalize(f.label)] = f.fieldname; - } - } - }); - normalizedFieldMap['id'] = 'name'; - normalizedFieldMap['name'] = 'first_name' - - frappe.model.with_doctype(frm.doc.import_doctype, () => { + + frappe.model.with_doctype(frm.doc.import_doctype, async () => { const meta = frappe.get_meta(frm.doc.import_doctype); const requiredFields = meta.fields.filter(f => f.reqd).map(f => f.fieldname); - + let existingMapping = {}; try { if (frm.doc.field_mapping) { existingMapping = JSON.parse(frm.doc.field_mapping); } - } catch (e) { + } catch (e) { console.log('Error parsing existing mapping:', e); } - + + // Fetch auto-mapping from backend (including aliases) + const auto_mapping_res = await frappe.xcall('lightning_import.lightning_import.doctype.lightning_upload.lightning_upload.auto_map_and_validate', { docname: frm.doc.name }); + const backend_mapping = auto_mapping_res ? auto_mapping_res.mapping : {}; + const mapping = {}; csvHeaders.forEach(header => { - const normalizedHeader = normalize(header); if (existingMapping[header]) { mapping[header] = existingMapping[header]; - } else if (normalizedFieldMap[normalizedHeader]) { - mapping[header] = normalizedFieldMap[normalizedHeader]; } else { - mapping[header] = ''; + mapping[header] = backend_mapping[header] || ''; } }); - + let tableHtml = `
Map columns from ${frappe.utils.escape_html(frm.doc.csv_file.split('/').pop())} to fields in ${frappe.utils.escape_html(frm.doc.import_doctype)}
`; tableHtml += ``; - + csvHeaders.forEach(header => { tableHtml += ``; tableHtml += ``; }); - + tableHtml += `
CSV ColumnDocType Field
`; - + const d = new frappe.ui.Dialog({ title: __('Map Columns'), fields: [ @@ -602,20 +589,20 @@ function open_field_mapping_dialog(frm) { values[header] = value; }); - + const mappedFields = Object.values(values).filter(Boolean); const unmappedRequired = requiredFields.filter(f => !mappedFields.includes(f)); if (unmappedRequired.length) { frappe.msgprint(__('Please map all required fields: {0}', [unmappedRequired.join(', ')])); return; } - + const duplicates = mappedFields.filter((item, idx) => mappedFields.indexOf(item) !== idx); if (duplicates.length) { frappe.msgprint(__('Duplicate mapping for: {0}', [duplicates.join(', ')])); return; } - + frappe.call({ method: 'lightning_import.lightning_import.doctype.lightning_upload.lightning_upload.save_field_mapping', args: { @@ -625,7 +612,7 @@ function open_field_mapping_dialog(frm) { callback: function (res) { if (res.message && res.message.status === 'success') { d.hide(); - frm.reload_doc(); + frm.reload_doc(); frappe.show_alert({ message: __('Field mapping saved.'), indicator: 'green' }); } else { frappe.show_alert({ message: res.message?.message || __('Failed to save mapping'), indicator: 'red' }); @@ -634,7 +621,7 @@ function open_field_mapping_dialog(frm) { }); } }); - + d.show(); }); } diff --git a/lightning_import/lightning_import/doctype/lightning_upload/lightning_upload.py b/lightning_import/lightning_import/doctype/lightning_upload/lightning_upload.py index 96e0b79..de31ce8 100644 --- a/lightning_import/lightning_import/doctype/lightning_upload/lightning_upload.py +++ b/lightning_import/lightning_import/doctype/lightning_upload/lightning_upload.py @@ -573,6 +573,19 @@ def process_import_queue(docname): "message": str(e) } +def normalize_column(value): + if not value: + return "" + + return ( + str(value) + .strip() + .lower() + .replace(" ", "") + .replace("_", "") + .replace("-", "") + ) + @frappe.whitelist() def auto_map_and_validate(docname): """ @@ -591,23 +604,59 @@ def auto_map_and_validate(docname): detailed_fields = get_detailed_doctype_fields(doc.import_doctype) + mapping_docs = frappe.get_all( + "Lightning Field Mapping", + filters={ + "reference_doctype": doc.import_doctype + }, + fields=["name"] + ) + + alias_map = {} + + for mapping_doc in mapping_docs: + mapping = frappe.get_doc( + "Lightning Field Mapping", + mapping_doc.name + ) + + for row in mapping.mappings: + if row.alternate_name and row.field_name: + alias_map[ + normalize_column(row.alternate_name) + ] = row.field_name + + frappe.logger().info(f"Alias Map loaded: {alias_map}") + normalized_field_map = {} - def normalize(s): + def normalize_legacy(s): return s.lower().replace("_", " ").replace("-", " ") for f in detailed_fields: if f.get('fieldname'): - normalized_field_map[normalize(f.get('fieldname'))] = f.get('fieldname') + normalized_field_map[normalize_legacy(f.get('fieldname'))] = f.get('fieldname') if f.get('label'): - normalized_field_map[normalize(f.get('label'))] = f.get('fieldname') + normalized_field_map[normalize_legacy(f.get('label'))] = f.get('fieldname') normalized_field_map['id'] = 'name' normalized_field_map['name'] = 'first_name' auto_mapping = {} for header in csv_headers: - normalized_header = normalize(header) - auto_mapping[header] = normalized_field_map.get(normalized_header, "") + # Step 1: Exact fieldname or label match (Legacy) + header_norm_legacy = normalize_legacy(header) + mapped_field = normalized_field_map.get(header_norm_legacy, "") + + # Step 2: Alias match (runs AFTER existing matches) + header_norm_column = normalize_column(header) + frappe.logger().info(f"Processing header: '{header}' | Legacy Norm: '{header_norm_legacy}' | Column Norm: '{header_norm_column}'") + + if not mapped_field: + mapped_field = alias_map.get(header_norm_column) + if mapped_field: + frappe.logger().info(f"Matched alias for '{header}': {mapped_field}") + + auto_mapping[header] = mapped_field or "" mapped_fields = [v for v in auto_mapping.values() if v] unmapped_required = [f for f in required_fields if f not in mapped_fields] diff --git a/lightning_import/lightning_import/doctype/lightning_upload_settings/lightning_upload_settings.json b/lightning_import/lightning_import/doctype/lightning_upload_settings/lightning_upload_settings.json index a186422..71b4db6 100644 --- a/lightning_import/lightning_import/doctype/lightning_upload_settings/lightning_upload_settings.json +++ b/lightning_import/lightning_import/doctype/lightning_upload_settings/lightning_upload_settings.json @@ -36,7 +36,7 @@ "index_web_pages_for_search": 1, "issingle": 1, "links": [], - "modified": "2026-05-07 19:10:24.149957", + "modified": "2026-05-14 11:02:16.027165", "modified_by": "Administrator", "module": "Lightning Import", "name": "Lightning Upload Settings", From ddefe01825f6258d40bb21b48bf0a0d5ebb6fa78 Mon Sep 17 00:00:00 2001 From: Ameen Date: Thu, 14 May 2026 08:37:57 +0000 Subject: [PATCH 02/26] fix: prevent timestamp mismatch during import processing --- .../lightning_upload/lightning_upload.py | 67 +++++++++++-------- 1 file changed, 40 insertions(+), 27 deletions(-) diff --git a/lightning_import/lightning_import/doctype/lightning_upload/lightning_upload.py b/lightning_import/lightning_import/doctype/lightning_upload/lightning_upload.py index de31ce8..a837ec1 100644 --- a/lightning_import/lightning_import/doctype/lightning_upload/lightning_upload.py +++ b/lightning_import/lightning_import/doctype/lightning_upload/lightning_upload.py @@ -376,8 +376,7 @@ def process_import_queue(docname): doc = frappe.get_doc("Lightning Upload", docname) # Update initial status and total records - doc.status = "In Progress" - doc.save(ignore_permissions=True, ignore_version=True) + frappe.db.set_value("Lightning Upload", docname, "status", "In Progress") frappe.db.commit() progress_key = f"lightning_import_{docname}" @@ -398,9 +397,8 @@ def process_import_queue(docname): csv_data = doc.get_mapped_data() csv_time = round((time.time() - csv_start) * 1000, 2) total_rows = len(csv_data) - # Update total records on the doc object - doc.total_records = total_rows - doc.save(ignore_permissions=True, ignore_version=True) + # Update total records + frappe.db.set_value("Lightning Upload", docname, "total_records", total_rows) frappe.db.commit() frappe.publish_realtime( event='import_progress', @@ -448,11 +446,17 @@ def process_import_queue(docname): 'failed': len(result['failed_rows']) }) - # --- Update doc properties in memory and save once per batch --- - doc.successful_records = successful_records - doc.failed_records = failed_records - doc.last_processed_row = i + len(batch) - doc.save(ignore_permissions=True, ignore_version=True) + # --- Update doc properties using set_value to avoid TimestampMismatchError --- + frappe.db.set_value( + "Lightning Upload", + docname, + { + "successful_records": successful_records, + "failed_records": failed_records, + "last_processed_row": i + len(batch) + }, + update_modified=False + ) frappe.db.commit() progress_data = { @@ -484,7 +488,7 @@ def process_import_queue(docname): if all_failed_rows: error_start = time.time() doc.error_log = json.dumps(all_failed_rows, indent=2) - doc.save(ignore_permissions=True, ignore_version=True) + frappe.db.set_value("Lightning Upload", docname, "error_log", doc.error_log) doc.generate_error_file(all_failed_rows) error_file_time = round((time.time() - error_start) * 1000, 2) @@ -495,18 +499,23 @@ def process_import_queue(docname): else: final_status = "Completed" - # --- Final updates on the doc object before the final save --- - doc.status = final_status - doc.import_time = time_str - doc.timing_details = json.dumps({ - "total_time_seconds": round(time_taken, 2), - "csv_load_time_ms": csv_time, - "error_file_time_ms": error_file_time, - "batch_timings": batch_timings, - "average_batch_time_ms": round(sum(b['total_time_ms'] for b in batch_timings) / len(batch_timings), 2) if batch_timings else 0, - "average_insert_time_ms": round(sum(b['insert_time_ms'] for b in batch_timings) / len(batch_timings), 2) if batch_timings else 0 - }, indent=2) - doc.save(ignore_permissions=True, ignore_version=True) + # --- Final updates using set_value --- + frappe.db.set_value( + "Lightning Upload", + docname, + { + "status": final_status, + "import_time": time_str, + "timing_details": json.dumps({ + "total_time_seconds": round(time_taken, 2), + "csv_load_time_ms": csv_time, + "error_file_time_ms": error_file_time, + "batch_timings": batch_timings, + "average_batch_time_ms": round(sum(b['total_time_ms'] for b in batch_timings) / len(batch_timings), 2) if batch_timings else 0, + "average_insert_time_ms": round(sum(b['insert_time_ms'] for b in batch_timings) / len(batch_timings), 2) if batch_timings else 0 + }, indent=2) + } + ) frappe.db.commit() final_progress = { @@ -546,10 +555,14 @@ def process_import_queue(docname): except Exception as e: frappe.log_error(frappe.get_traceback(), "Lightning Import Error") try: - doc = frappe.get_doc("Lightning Upload", docname) # Re-fetch in case of an error to ensure we have the latest state - doc.status = "Failed" - doc.error_log = str(e) - doc.save(ignore_permissions=True, ignore_version=True) + frappe.db.set_value( + "Lightning Upload", + docname, + { + "status": "Failed", + "error_log": str(e) + } + ) frappe.db.commit() progress_key = f"lightning_import_{docname}" error_progress = { From a87047f8ba18e8687112cb24e6304eabb09d867b Mon Sep 17 00:00:00 2001 From: Ameen Date: Mon, 18 May 2026 07:07:58 +0000 Subject: [PATCH 03/26] update field mapping schema --- .../lightning_field_mapping/lightning_field_mapping.json | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lightning_import/lightning_import/doctype/lightning_field_mapping/lightning_field_mapping.json b/lightning_import/lightning_import/doctype/lightning_field_mapping/lightning_field_mapping.json index 324d187..4d4384d 100644 --- a/lightning_import/lightning_import/doctype/lightning_field_mapping/lightning_field_mapping.json +++ b/lightning_import/lightning_import/doctype/lightning_field_mapping/lightning_field_mapping.json @@ -1,6 +1,7 @@ { "actions": [], "allow_rename": 1, + "autoname": "LFM-.#####", "creation": "2026-05-14 11:00:52.970656", "doctype": "DocType", "engine": "InnoDB", @@ -25,10 +26,11 @@ "grid_page_length": 50, "index_web_pages_for_search": 1, "links": [], - "modified": "2026-05-14 11:02:04.933447", + "modified": "2026-05-14 12:47:06.768504", "modified_by": "Administrator", "module": "Lightning Import", "name": "Lightning Field Mapping", + "naming_rule": "Expression", "owner": "Administrator", "permissions": [ { From b0857c4a1f37e0da5b7f26c1a1971c6e0fb1b8a3 Mon Sep 17 00:00:00 2001 From: Ameen Date: Wed, 20 May 2026 11:02:17 +0000 Subject: [PATCH 04/26] refactor: modularize backend bulk import engine and helper utilities --- .../lightning_upload/lightning_upload.py | 871 ++++++++++-------- 1 file changed, 491 insertions(+), 380 deletions(-) diff --git a/lightning_import/lightning_import/doctype/lightning_upload/lightning_upload.py b/lightning_import/lightning_import/doctype/lightning_upload/lightning_upload.py index a837ec1..f641f73 100644 --- a/lightning_import/lightning_import/doctype/lightning_upload/lightning_upload.py +++ b/lightning_import/lightning_import/doctype/lightning_upload/lightning_upload.py @@ -15,15 +15,398 @@ import time import random +# ========================================== +# FILE HELPERS +# ========================================== + +def get_raw_sheet_rows(doc): + """Get raw CSV rows from the document's uploaded sheet""" + if not doc.csv_file: + frappe.throw(_("No CSV file attached")) + + try: + file_doc = frappe.get_doc("File", {"file_url": doc.csv_file}) + file_path = file_doc.get_full_path() + except Exception as e: + frappe.log_error(frappe.get_traceback(), "Lightning Import File Access Error") + frappe.throw(_("Error accessing file: {}".format(str(e)))) + + try: + with open(file_path, 'r', encoding='utf-8') as csvfile: + reader = csv.DictReader(csvfile) + return list(reader) + except Exception as e: + frappe.throw(_("Error reading CSV data: {}").format(str(e))) + +def get_sheet_headers(doc): + """Get headers from CSV file for the doc""" + if not doc.csv_file: + frappe.throw(_("No CSV file attached")) + + try: + file_doc = frappe.get_doc("File", {"file_url": doc.csv_file}) + file_path = file_doc.get_full_path() + return get_csv_headers(file_path) + except Exception as e: + frappe.throw(_("Error reading CSV headers: {}").format(str(e))) + +# ========================================== +# MAPPING HELPERS +# ========================================== + +def normalize_column(value): + """Normalize column names for fuzzy mapping and aliases""" + if not value: + return "" + return ( + str(value) + .strip() + .lower() + .replace(" ", "") + .replace("_", "") + .replace("-", "") + ) + +def build_alias_map_for_doctype(doctype): + """Build alias mapping dictionary for a specific target DocType""" + alias_map = {} + try: + mapping_docs = frappe.get_all( + "Lightning Field Mapping", + filters={"reference_doctype": doctype}, + fields=["name"] + ) + for mapping_doc in mapping_docs: + mapping = frappe.get_doc("Lightning Field Mapping", mapping_doc.name) + for row in mapping.mappings: + if row.alternate_name and row.field_name: + alias_map[normalize_column(row.alternate_name)] = row.field_name + except Exception as e: + frappe.log_error(frappe.get_traceback(), "Error building alias map") + return alias_map + +def auto_map_headers_for_doctype(headers, doctype): + """Generate automatic field mapping from headers for a specific target DocType""" + print("Headers:", headers) + print("Target:", doctype) + + meta = frappe.get_meta(doctype) + doctype_fields_meta = meta.get("fields", {"fieldtype": ["not in", ['Section Break', 'Column Break', 'Tab Break', 'Fold']]}) + required_fields = [f.fieldname for f in doctype_fields_meta if f.reqd] + + detailed_fields = get_detailed_doctype_fields(doctype) + alias_map = build_alias_map_for_doctype(doctype) + print("Alias Map:", alias_map) + + normalized_field_map = {} + def normalize_legacy(s): + if not s: + return "" + return s.lower().replace("_", " ").replace("-", " ") + + for f in detailed_fields: + if f.get('fieldname'): + normalized_field_map[normalize_legacy(f.get('fieldname'))] = f.get('fieldname') + if f.get('label'): + normalized_field_map[normalize_legacy(f.get('label'))] = f.get('fieldname') + + normalized_field_map['id'] = 'name' + normalized_field_map['name'] = 'first_name' + + auto_mapping = {} + for header in headers: + # Step 1: Exact fieldname or label match (Legacy) + header_norm_legacy = normalize_legacy(header) + mapped_field = normalized_field_map.get(header_norm_legacy, "") + + # Step 2: Alias match + header_norm_column = normalize_column(header) + if not mapped_field: + mapped_field = alias_map.get(header_norm_column) + + auto_mapping[header] = mapped_field or "" + + print("Generated Mapping:", auto_mapping) + + mapped_fields = [v for v in auto_mapping.values() if v] + unmapped_required = [f for f in required_fields if f not in mapped_fields] + + return { + "mapping": auto_mapping, + "unmapped_required": unmapped_required + } + +# ========================================== +# ROW HELPERS +# ========================================== + +def map_rows_for_doctype(raw_rows, mapping): + """Map CSV rows using the field mapping dictionary""" + mapped_rows = [] + if not mapping: + return mapped_rows + + if isinstance(mapping, str): + mapping = json.loads(mapping) + + for idx, row in enumerate(raw_rows, start=1): + mapped_row = {} + for csv_field, doctype_field in mapping.items(): + if doctype_field: # Only map if field is not empty + mapped_row[doctype_field] = row.get(csv_field, None) + # Inject meta-fields for precise row tracking + mapped_row["__csv_row_number__"] = idx + mapped_row["__original_row__"] = row + mapped_rows.append(mapped_row) + return mapped_rows + +def prepare_records(import_doctype, rows): + """Prepare, type-convert, and validate mapped rows for a DocType""" + meta = frappe.get_meta(import_doctype) + field_types = {f.fieldname: f.fieldtype for f in meta.fields} + required_fields = [f.fieldname for f in meta.fields if f.reqd] + + records_to_process = [] + failed_rows = [] + + for row in rows: + try: + converted_data = {} + for field, value in row.items(): + # Bypass metadata fields from strict validation + if field.startswith("__"): + converted_data[field] = value + continue + + if field in field_types: + field_type = field_types[field] + try: + if value is not None and str(value).strip() != "": + if field_type == "Int": converted_data[field] = int(float(value)) + elif field_type == "Float": converted_data[field] = float(value) + elif field_type == "Date": converted_data[field] = frappe.utils.getdate(value) + elif field_type == "Datetime": converted_data[field] = frappe.utils.get_datetime(value) + else: converted_data[field] = value + else: + converted_data[field] = None + except (ValueError, TypeError): + raise ValueError(f"Invalid value for field {field}: {value}") + else: + converted_data[field] = value + + missing_fields = [f for f in required_fields if not converted_data.get(f)] + if missing_fields: + raise ValueError(f"Missing required fields: {', '.join(missing_fields)}") + + if 'owner' not in converted_data: converted_data['owner'] = frappe.session.user + if 'modified_by' not in converted_data: converted_data['modified_by'] = frappe.session.user + if 'creation' not in converted_data: converted_data['creation'] = frappe.utils.now() + if 'modified' not in converted_data: converted_data['modified'] = frappe.utils.now() + + # Validate row data via hooks + if LightningUploadSettings.get_validate_from_hook(): + for method in frappe.get_hooks('lightning_import_validate_row'): + frappe.call(method, data=converted_data, doctype=import_doctype, import_type=None) + + records_to_process.append(converted_data) + + except Exception as e: + failed_rows.append({'row': row, 'error': str(e)}) + + return records_to_process, failed_rows + +# ========================================== +# DATABASE HELPERS +# ========================================== + +def execute_bulk_insert(import_doctype, records): + """Perform bulk INSERT operation on the database""" + if not records: + return + + meta = frappe.get_meta(import_doctype) + all_fields = ['name', 'owner', 'modified_by', 'creation', 'modified'] + [f.fieldname for f in meta.fields] + + # Exclude helper metadata fields beginning with '__' + fields = sorted(list(set(k for r in records for k in r.keys() if k in all_fields and not k.startswith("__")))) + + values_list = [] + for record in records: + row_values = [frappe.db.escape(cstr(record.get(f))) for f in fields] + values_list.append(f"({', '.join(row_values)})") + + sql = f""" + INSERT INTO `tab{import_doctype}` (`{ '`, `'.join(fields) }`) + VALUES {', '.join(values_list)} + """ + frappe.db.sql(sql) + +def execute_bulk_update(import_doctype, records): + """Perform bulk UPDATE operation using CASE WHEN statements""" + if not records: + return + + meta = frappe.get_meta(import_doctype) + updatable_fields = [f.fieldname for f in meta.fields] + ['modified', 'modified_by'] + + fields_to_update = sorted(list(set( + k for r in records for k in r.keys() if k in updatable_fields and not k.startswith("__") + ))) + + if not fields_to_update: + return + + set_clauses = [] + for field in fields_to_update: + case_statements = [ + f"WHEN `name` = {frappe.db.escape(record['name'])} THEN {frappe.db.escape(cstr(record.get(field)))}" + for record in records if record.get('name') and record.get(field) is not None + ] + + if case_statements: + set_clauses.append(f"`{field}` = CASE {' '.join(case_statements)} ELSE `{field}` END") + + if not set_clauses: + return + + names_to_update = [frappe.db.escape(record['name']) for record in records if record.get('name')] + unique_names = list(set(names_to_update)) + + sql = f""" + UPDATE `tab{import_doctype}` + SET {', '.join(set_clauses)} + WHERE `name` IN ({', '.join(unique_names)}) + """ + frappe.db.sql(sql) + +# ========================================== +# MAIN REUSABLE IMPORT ENGINE +# ========================================== + +def import_rows_for_doctype(import_config, raw_rows): + """ + Main reusable import engine. + Imports raw_rows into the target doctype according to import_config. + """ + import_doctype = import_config.get("import_doctype") + import_type = import_config.get("import_type") + field_mapping = import_config.get("field_mapping") + update_on_field = import_config.get("update_on_field") + + # 1. Map rows + mapped_rows = map_rows_for_doctype(raw_rows, field_mapping) + + # 2. Filter out rows with no mapped data + valid_mapped_rows = [] + for r in mapped_rows: + if any(val is not None and str(val).strip() != "" for k, val in r.items() if not k.startswith("__")): + valid_mapped_rows.append(r) + + if not valid_mapped_rows: + return {"success_count": 0, "failed_rows": []} + + # 3. Prepare records + records_to_process, failed_rows = prepare_records(import_doctype, valid_mapped_rows) + + if not records_to_process: + return {"success_count": 0, "failed_rows": failed_rows} + + success_count = 0 + try: + if import_type == "Insert and Update Records": + mapping = json.loads(field_mapping) if isinstance(field_mapping, str) else field_mapping + update_on_csv_col = update_on_field + if not update_on_csv_col: + raise ValueError("Validate On CSV Column not specified for 'Insert and Update' mode.") + + mapped_update_field = mapping.get(update_on_csv_col) + if not mapped_update_field: + raise ValueError(f"The selected update column '{update_on_csv_col}' is not mapped to any DocType field.") + + keys_to_check = list(set([rec.get(mapped_update_field) for rec in records_to_process if rec.get(mapped_update_field)])) + + existing_docs_map = {} + if keys_to_check: + existing = frappe.get_all( + import_doctype, + filters={mapped_update_field: ['in', keys_to_check]}, + fields=['name', mapped_update_field] + ) + existing_docs_map = {doc[mapped_update_field]: doc.name for doc in existing} + + to_insert = [] + to_update = [] + for record in records_to_process: + key_value = record.get(mapped_update_field) + if key_value in existing_docs_map: + record['name'] = existing_docs_map[key_value] + to_update.append(record) + else: + # Generate unique docname + timestamp = int(time.time() * 1000000) + random_suffix = random.randint(100000, 999999) + record['name'] = f"{import_doctype}-{timestamp}{random_suffix}" + to_insert.append(record) + + UPDATE_CHUNK_SIZE = 1000 + if to_update: + for i in range(0, len(to_update), UPDATE_CHUNK_SIZE): + chunk = to_update[i:i + UPDATE_CHUNK_SIZE] + execute_bulk_update(import_doctype, chunk) + + if to_insert: + execute_bulk_insert(import_doctype, to_insert) + + success_count = len(to_insert) + len(to_update) + + elif import_type == "Insert New Records": + for record in records_to_process: + timestamp = int(time.time() * 1000000) + random_suffix = random.randint(100000, 999999) + record['name'] = f"{import_doctype}-{timestamp}{random_suffix}" + execute_bulk_insert(import_doctype, records_to_process) + success_count = len(records_to_process) + + elif import_type == "Update Existing Records": + UPDATE_CHUNK_SIZE = 1000 + for i in range(0, len(records_to_process), UPDATE_CHUNK_SIZE): + chunk = records_to_process[i:i + UPDATE_CHUNK_SIZE] + execute_bulk_update(import_doctype, chunk) + success_count = len(records_to_process) + + except Exception as e: + for record in records_to_process: + failed_rows.append({'row': record, 'error': str(e)}) + success_count = 0 + + return {'success_count': success_count, 'failed_rows': failed_rows} + +# ========================================== +# DOCTYPE CONTROLLER +# ========================================== + class LightningUpload(Document): def validate(self): """Validate the document before save""" if self.csv_file: self.validate_csv_file() + def validate_mappings(self): + """Explicit mapping validation before starting the import process""" + if not self.field_mapping: + frappe.throw(_("Please map fields before starting import")) + meta = frappe.get_meta(self.import_doctype) + required_fields = [f.fieldname for f in meta.fields if f.reqd] + mapping = json.loads(self.field_mapping) + mapped_fields = [v for v in mapping.values() if v] + unmapped_required = [f for f in required_fields if f not in mapped_fields] + if unmapped_required: + frappe.throw(_("Please map all required fields for {0}: {1}").format(self.import_doctype, ", ".join(unmapped_required))) + if self.import_type == "Update Existing Records" and 'name' not in mapped_fields: + frappe.throw(_("For 'Update Existing Records', the target field 'name' (ID) must be mapped.")) + def validate_csv_file(self): """Validate if the uploaded file is a valid CSV file""" - # Get file content try: file_doc = frappe.get_doc("File", {"file_url": self.csv_file}) file_path = file_doc.get_full_path() @@ -32,16 +415,13 @@ def validate_csv_file(self): frappe.throw(_("Error accessing file: {}".format(str(e)))) return - # Check file extension file_ext = os.path.splitext(file_path)[1].lower() if file_ext != '.csv': frappe.throw(_("Please upload a CSV file. Current file type: {}".format(file_ext))) return - # Try to read the file as CSV try: with open(file_path, 'r', encoding='utf-8') as csvfile: - # Try to read first few lines to validate CSV format reader = csv.reader(csvfile) header = next(reader, None) @@ -49,8 +429,7 @@ def validate_csv_file(self): frappe.throw(_("CSV file is empty")) return - # Try to read a few rows to ensure it's properly formatted - for i in range(5): # Check first 5 rows + for i in range(5): try: next(reader) except StopIteration: @@ -69,221 +448,90 @@ def validate_csv_file(self): def get_csv_data(self): """Get CSV data as list of dictionaries""" - file_doc = frappe.get_doc("File", {"file_url": self.csv_file}) - file_path = file_doc.get_full_path() - - with open(file_path, 'r', encoding='utf-8') as csvfile: - reader = csv.DictReader(csvfile) - return list(reader) + return get_raw_sheet_rows(self) def get_mapped_data(self): """Return list of mapped CSV rows using saved field mapping""" raw_rows = self.get_csv_data() - - # Load mapping (JSON string to dict) - if not self.field_mapping: - frappe.throw("Field mapping is not defined") - - mapping = json.loads(self.field_mapping) - mapped_rows = [] - - for row in raw_rows: - mapped_row = {} - for csv_field, doctype_field in mapping.items(): - if doctype_field: # Only map if field is not empty (not restricted) - mapped_row[doctype_field] = row.get(csv_field, None) - mapped_rows.append(mapped_row) - - return mapped_rows + return map_rows_for_doctype(raw_rows, self.field_mapping) def generate_docname(self, row_data): """Generate a unique docname based on row data""" - # Get current timestamp in microseconds timestamp = int(time.time() * 1000000) - # Generate a random 6-digit number random_suffix = random.randint(100000, 999999) - # Combine timestamp and random number for absolute uniqueness - unique_id = f"{timestamp}{random_suffix}" - return f"{self.import_doctype}-{unique_id}" + return f"{self.import_doctype}-{timestamp}{random_suffix}" def insert_records(self, rows): - """Insert or update records in bulk using SQL, with a configurable update key.""" - success_count = 0 - failed_rows = [] - - meta = frappe.get_meta(self.import_doctype) - field_types = {f.fieldname: f.fieldtype for f in meta.fields} - required_fields = [f.fieldname for f in meta.fields if f.reqd] - - # Step 1: Prepare all rows first (data conversion, validation) - records_to_process = [] - for row in rows: - try: - converted_data = {} - for field, value in row.items(): - if field in field_types: - field_type = field_types[field] - try: - if value: - if field_type == "Int": converted_data[field] = int(value) - elif field_type == "Float": converted_data[field] = float(value) - elif field_type == "Date": converted_data[field] = frappe.utils.getdate(value) - elif field_type == "Datetime": converted_data[field] = frappe.utils.get_datetime(value) - else: converted_data[field] = value - else: - converted_data[field] = None - except (ValueError, TypeError): - raise ValueError(f"Invalid value for field {field}: {value}") - else: - converted_data[field] = value - - missing_fields = [f for f in required_fields if not converted_data.get(f)] - if missing_fields: - raise ValueError(f"Missing required fields: {', '.join(missing_fields)}") - - if 'owner' not in converted_data: converted_data['owner'] = frappe.session.user - if 'modified_by' not in converted_data: converted_data['modified_by'] = frappe.session.user - if 'creation' not in converted_data: converted_data['creation'] = frappe.utils.now() - if 'modified' not in converted_data: converted_data['modified'] = frappe.utils.now() - - self.validate_row_data(converted_data) - - records_to_process.append(converted_data) - - except Exception as e: - failed_rows.append({'row': row, 'error': str(e)}) - - if not records_to_process: - return {'success_count': 0, 'failed_rows': failed_rows} - - # Step 2: Main logic fork based on the selected import type - try: - if self.import_type == "Insert and Update Records": - mapping = json.loads(self.field_mapping) - update_on_csv_col = self.update_on_field - if not update_on_csv_col: - raise ValueError("Validate On CSV Column not specified for 'Insert and Update' mode.") - - mapped_update_field = mapping.get(update_on_csv_col) - if not mapped_update_field: - raise ValueError(f"The selected update column '{update_on_csv_col}' is not mapped to any DocType field.") - - keys_to_check = list(set([rec.get(mapped_update_field) for rec in records_to_process if rec.get(mapped_update_field)])) - - existing_docs_map = {} - if keys_to_check: - existing = frappe.get_all( - self.import_doctype, - filters={mapped_update_field: ['in', keys_to_check]}, - fields=['name', mapped_update_field] - ) - existing_docs_map = {doc[mapped_update_field]: doc.name for doc in existing} - - to_insert = [] - to_update = [] - for record in records_to_process: - key_value = record.get(mapped_update_field) - if key_value in existing_docs_map: - record['name'] = existing_docs_map[key_value] - to_update.append(record) - else: - record['name'] = self.generate_docname(record) - to_insert.append(record) - - # --- MODIFICATION: Process updates in smaller chunks --- - UPDATE_CHUNK_SIZE = 1000 # Adjust this value as needed - if to_update: - for i in range(0, len(to_update), UPDATE_CHUNK_SIZE): - chunk = to_update[i:i + UPDATE_CHUNK_SIZE] - self._execute_bulk_update(chunk) - - if to_insert: - self._execute_bulk_insert(to_insert) - - success_count = len(to_insert) + len(to_update) - - elif self.import_type == "Insert New Records": - for record in records_to_process: - record['name'] = self.generate_docname(record) - self._execute_bulk_insert(records_to_process) - success_count = len(records_to_process) - - elif self.import_type == "Update Existing Records": - # --- MODIFICATION: Process updates in smaller chunks --- - UPDATE_CHUNK_SIZE = 1000 # Adjust this value as needed - for i in range(0, len(records_to_process), UPDATE_CHUNK_SIZE): - chunk = records_to_process[i:i + UPDATE_CHUNK_SIZE] - self._execute_bulk_update(chunk) - success_count = len(records_to_process) - - except Exception as e: - # Frappe will handle rollback in the calling function - for record in records_to_process: - failed_rows.append({'row': record, 'error': str(e)}) - success_count = 0 - - return {'success_count': success_count, 'failed_rows': failed_rows} - def _execute_bulk_insert(self, records): - """Helper function to perform a bulk INSERT operation.""" - if not records: return - - meta = frappe.get_meta(self.import_doctype) - all_fields = ['name', 'owner', 'modified_by', 'creation', 'modified'] + [f.fieldname for f in meta.fields] - fields = sorted(list(set(k for r in records for k in r.keys() if k in all_fields))) - - values_list = [] - for record in records: - row_values = [frappe.db.escape(cstr(record.get(f))) for f in fields] - values_list.append(f"({', '.join(row_values)})") + """Insert or update records in bulk using SQL (for backward compatibility)""" + records_to_process, failed_rows = prepare_records(self.import_doctype, rows) + + if not records_to_process: + return {'success_count': 0, 'failed_rows': failed_rows} - sql = f""" - INSERT INTO `tab{self.import_doctype}` (`{'`, `'.join(fields)}`) - VALUES {', '.join(values_list)} - """ - frappe.db.sql(sql) - - def _execute_bulk_update(self, records): - """Helper function to perform a single bulk UPDATE operation using CASE WHEN.""" - if not records: - return + success_count = 0 + try: + if self.import_type == "Insert and Update Records": + mapping = json.loads(self.field_mapping) + update_on_csv_col = self.update_on_field + if not update_on_csv_col: + raise ValueError("Validate On CSV Column not specified for 'Insert and Update' mode.") + + mapped_update_field = mapping.get(update_on_csv_col) + if not mapped_update_field: + raise ValueError(f"The selected update column '{update_on_csv_col}' is not mapped to any DocType field.") - meta = frappe.get_meta(self.import_doctype) - # Exclude system fields from being updated directly, except 'modified' and 'modified_by' - # 'name' is used for the WHERE clause, not for updating - updatable_fields = [f.fieldname for f in meta.fields] + ['modified', 'modified_by'] - - # Get a list of all fields present in at least one record to be updated - fields_to_update = sorted(list(set( - k for r in records for k in r.keys() if k in updatable_fields - ))) - - if not fields_to_update: - return - - set_clauses = [] - for field in fields_to_update: - # Build the CASE statement for each field - case_statements = [ - f"WHEN `name` = {frappe.db.escape(record['name'])} THEN {frappe.db.escape(cstr(record.get(field)))}" - for record in records if record.get('name') and record.get(field) is not None - ] - - if case_statements: - set_clauses.append(f"`{field}` = CASE {' '.join(case_statements)} ELSE `{field}` END") + keys_to_check = list(set([rec.get(mapped_update_field) for rec in records_to_process if rec.get(mapped_update_field)])) + + existing_docs_map = {} + if keys_to_check: + existing = frappe.get_all( + self.import_doctype, + filters={mapped_update_field: ['in', keys_to_check]}, + fields=['name', mapped_update_field] + ) + existing_docs_map = {doc[mapped_update_field]: doc.name for doc in existing} + + to_insert = [] + to_update = [] + for record in records_to_process: + key_value = record.get(mapped_update_field) + if key_value in existing_docs_map: + record['name'] = existing_docs_map[key_value] + to_update.append(record) + else: + record['name'] = self.generate_docname(record) + to_insert.append(record) + + UPDATE_CHUNK_SIZE = 1000 + if to_update: + for i in range(0, len(to_update), UPDATE_CHUNK_SIZE): + chunk = to_update[i:i + UPDATE_CHUNK_SIZE] + execute_bulk_update(self.import_doctype, chunk) + + if to_insert: + execute_bulk_insert(self.import_doctype, to_insert) + + success_count = len(to_insert) + len(to_update) + + elif self.import_type == "Insert New Records": + for record in records_to_process: + record['name'] = self.generate_docname(record) + execute_bulk_insert(self.import_doctype, records_to_process) + success_count = len(records_to_process) + + elif self.import_type == "Update Existing Records": + UPDATE_CHUNK_SIZE = 1000 + for i in range(0, len(records_to_process), UPDATE_CHUNK_SIZE): + chunk = records_to_process[i:i + UPDATE_CHUNK_SIZE] + execute_bulk_update(self.import_doctype, chunk) + success_count = len(records_to_process) + + except Exception as e: + for record in records_to_process: + failed_rows.append({'row': record, 'error': str(e)}) + success_count = 0 - if not set_clauses: - return - - # Collect all the document names for the WHERE clause - names_to_update = [frappe.db.escape(record['name']) for record in records if record.get('name')] - unique_names = list(set(names_to_update)) - - sql = f""" - UPDATE `tab{self.import_doctype}` - SET {', '.join(set_clauses)} - WHERE `name` IN ({', '.join(unique_names)}) - """ - frappe.db.sql(sql) + return {'success_count': success_count, 'failed_rows': failed_rows} def generate_error_file(self, failed_rows): """Generate a CSV file containing failed rows with error messages""" @@ -295,13 +543,18 @@ def generate_error_file(self, failed_rows): with os.fdopen(fd, 'w', newline='', encoding='utf-8') as csvfile: writer = csv.writer(csvfile) - headers = list(failed_rows[0]['row'].keys()) + # Get original columns if available + row_example = failed_rows[0]['row'] + headers = [k for k in row_example.keys() if not k.startswith("__")] headers.extend(['Error Message', 'Row Number']) writer.writerow(headers) - for idx, failed_row in enumerate(failed_rows, 1): - row_data = list(failed_row['row'].values()) - row_data.extend([failed_row['error'], idx]) + for failed_row in failed_rows: + r_dict = failed_row['row'] + row_data = [r_dict.get(h) for h in headers if not h.startswith("__")] + + row_num = r_dict.get('__csv_row_number__', '') + row_data.extend([failed_row['error'], row_num]) writer.writerow(row_data) with open(path, 'rb') as f: @@ -329,6 +582,10 @@ def validate_row_data(self, data): for method in frappe.get_hooks('lightning_import_validate_row'): frappe.call(method, data=data, doctype=self.import_doctype, import_type=self.import_type) +# ========================================== +# PUBLIC API UTILITIES +# ========================================== + def get_detailed_doctype_fields(doctype): """Internal helper to get field details including labels.""" meta = frappe.get_meta(doctype) @@ -345,7 +602,6 @@ def get_detailed_doctype_fields(doctype): ] return detailed_fields - def get_doctype_fields(doctype): """Get all field names from a DocType""" meta = frappe.get_meta(doctype) @@ -364,18 +620,20 @@ def get_csv_headers(file_path): return [header.strip() for header in headers] except Exception as e: frappe.throw(f"Error reading CSV headers: {str(e)}") - + +# ========================================== +# WHITELISTED ENDPOINTS +# ========================================== + @frappe.whitelist() def process_import_queue(docname): - """Process the import in batches""" + """Process the single import in batches""" start_time = time.time() batch_timings = [] try: - # --- Get the doc once outside the loop --- doc = frappe.get_doc("Lightning Upload", docname) - # Update initial status and total records frappe.db.set_value("Lightning Upload", docname, "status", "In Progress") frappe.db.commit() @@ -397,7 +655,7 @@ def process_import_queue(docname): csv_data = doc.get_mapped_data() csv_time = round((time.time() - csv_start) * 1000, 2) total_rows = len(csv_data) - # Update total records + frappe.db.set_value("Lightning Upload", docname, "total_records", total_rows) frappe.db.commit() frappe.publish_realtime( @@ -427,7 +685,6 @@ def process_import_queue(docname): progress = min(100, int((i / total_rows) * 100)) insert_start = time.time() - # The doc.insert_records(batch) call implicitly uses the updated doc object result = doc.insert_records(batch) insert_time = round((time.time() - insert_start) * 1000, 2) @@ -446,7 +703,6 @@ def process_import_queue(docname): 'failed': len(result['failed_rows']) }) - # --- Update doc properties using set_value to avoid TimestampMismatchError --- frappe.db.set_value( "Lightning Upload", docname, @@ -479,18 +735,15 @@ def process_import_queue(docname): message=progress_data, user=frappe.session.user, after_commit=True - ) + ) time_taken = time.time() - start_time time_str = f"{int(time_taken)}s" if time_taken < 60 else f"{time_taken/60:.1f}m" - error_file_time = 0 if all_failed_rows: - error_start = time.time() - doc.error_log = json.dumps(all_failed_rows, indent=2) + doc.error_log = json.dumps([{"error": f['error'], "row": f['row']} for f in all_failed_rows], indent=2) frappe.db.set_value("Lightning Upload", docname, "error_log", doc.error_log) doc.generate_error_file(all_failed_rows) - error_file_time = round((time.time() - error_start) * 1000, 2) if failed_records == total_rows: final_status = "Failed" @@ -499,21 +752,12 @@ def process_import_queue(docname): else: final_status = "Completed" - # --- Final updates using set_value --- frappe.db.set_value( "Lightning Upload", docname, { "status": final_status, - "import_time": time_str, - "timing_details": json.dumps({ - "total_time_seconds": round(time_taken, 2), - "csv_load_time_ms": csv_time, - "error_file_time_ms": error_file_time, - "batch_timings": batch_timings, - "average_batch_time_ms": round(sum(b['total_time_ms'] for b in batch_timings) / len(batch_timings), 2) if batch_timings else 0, - "average_insert_time_ms": round(sum(b['insert_time_ms'] for b in batch_timings) / len(batch_timings), 2) if batch_timings else 0 - }, indent=2) + "import_time": time_str } ) frappe.db.commit() @@ -524,15 +768,9 @@ def process_import_queue(docname): "title": f"Import {final_status.lower()}", "progress_key": progress_key, "time_taken": time_str, - "total_records": total_rows, "successful_records": successful_records, "failed_records": failed_records, - "timing_details": { - "total_time_seconds": round(time_taken, 2), - "csv_load_time_ms": csv_time, - "error_file_time_ms": error_file_time, - "average_batch_time_ms": round(sum(b['total_time_ms'] for b in batch_timings) / len(batch_timings), 2) if batch_timings else 0 - } + "total_records": total_rows } frappe.cache().set_value(progress_key, final_progress) frappe.publish_realtime( @@ -544,25 +782,13 @@ def process_import_queue(docname): return { "status": "success", - "message": f"Import {final_status.lower()}. Successful: {successful_records}, Failed: {failed_records}, Time taken: {time_str}", - "time_taken": time_str, - "total_records": total_rows, - "successful_records": successful_records, - "failed_records": failed_records, - "timing_details": final_progress["timing_details"] + "message": f"Import completed. Successful: {successful_records}, Failed: {failed_records}. Time taken: {time_str}" } except Exception as e: - frappe.log_error(frappe.get_traceback(), "Lightning Import Error") + frappe.log_error(frappe.get_traceback(), "Lightning Import Processing Error") try: - frappe.db.set_value( - "Lightning Upload", - docname, - { - "status": "Failed", - "error_log": str(e) - } - ) + frappe.db.set_value("Lightning Upload", docname, "status", "Failed") frappe.db.commit() progress_key = f"lightning_import_{docname}" error_progress = { @@ -586,19 +812,6 @@ def process_import_queue(docname): "message": str(e) } -def normalize_column(value): - if not value: - return "" - - return ( - str(value) - .strip() - .lower() - .replace(" ", "") - .replace("_", "") - .replace("-", "") - ) - @frappe.whitelist() def auto_map_and_validate(docname): """ @@ -606,85 +819,12 @@ def auto_map_and_validate(docname): and validates if all required fields are mapped. """ doc = frappe.get_doc("Lightning Upload", docname) - - file_doc = frappe.get_doc("File", {"file_url": doc.csv_file}) - file_path = file_doc.get_full_path() - csv_headers = get_csv_headers(file_path) - - meta = frappe.get_meta(doc.import_doctype) - doctype_fields_meta = meta.get("fields", {"fieldtype": ["not in", ['Section Break', 'Column Break', 'Tab Break', 'Fold']]}) - required_fields = [f.fieldname for f in doctype_fields_meta if f.reqd] - - detailed_fields = get_detailed_doctype_fields(doc.import_doctype) - - mapping_docs = frappe.get_all( - "Lightning Field Mapping", - filters={ - "reference_doctype": doc.import_doctype - }, - fields=["name"] - ) - - alias_map = {} - - for mapping_doc in mapping_docs: - mapping = frappe.get_doc( - "Lightning Field Mapping", - mapping_doc.name - ) - - for row in mapping.mappings: - if row.alternate_name and row.field_name: - alias_map[ - normalize_column(row.alternate_name) - ] = row.field_name - - frappe.logger().info(f"Alias Map loaded: {alias_map}") - - normalized_field_map = {} - def normalize_legacy(s): - return s.lower().replace("_", " ").replace("-", " ") - - for f in detailed_fields: - if f.get('fieldname'): - normalized_field_map[normalize_legacy(f.get('fieldname'))] = f.get('fieldname') - if f.get('label'): - normalized_field_map[normalize_legacy(f.get('label'))] = f.get('fieldname') - - normalized_field_map['id'] = 'name' - normalized_field_map['name'] = 'first_name' - - auto_mapping = {} - for header in csv_headers: - # Step 1: Exact fieldname or label match (Legacy) - header_norm_legacy = normalize_legacy(header) - mapped_field = normalized_field_map.get(header_norm_legacy, "") - - # Step 2: Alias match (runs AFTER existing matches) - header_norm_column = normalize_column(header) - frappe.logger().info(f"Processing header: '{header}' | Legacy Norm: '{header_norm_legacy}' | Column Norm: '{header_norm_column}'") - - if not mapped_field: - mapped_field = alias_map.get(header_norm_column) - if mapped_field: - frappe.logger().info(f"Matched alias for '{header}': {mapped_field}") - - auto_mapping[header] = mapped_field or "" - - mapped_fields = [v for v in auto_mapping.values() if v] - unmapped_required = [f for f in required_fields if f not in mapped_fields] - - return { - "mapping": auto_mapping, - "unmapped_required": unmapped_required - } + headers = get_sheet_headers(doc) + return auto_map_headers_for_doctype(headers, doc.import_doctype) @frappe.whitelist() def start_import(docname, mapping=None): - """ - API endpoint to start the import process. - If a mapping is provided, it saves it before starting. - """ + """API endpoint to start the import process in the background""" try: doc = frappe.get_doc("Lightning Upload", docname) @@ -695,8 +835,7 @@ def start_import(docname, mapping=None): frappe.db.set_value("Lightning Upload", docname, "field_mapping", mapping) doc.field_mapping = mapping - if not doc.field_mapping: - frappe.throw(_("Please map fields before starting import")) + doc.validate_mappings() progress_key = f"lightning_import_{docname}" initial_progress = { @@ -709,15 +848,6 @@ def start_import(docname, mapping=None): } frappe.cache().set_value(progress_key, initial_progress) - frappe.db.set_value("Lightning Upload", docname, "status", "Queued", update_modified=False) - - frappe.publish_realtime( - event='import_progress', - message=initial_progress, - user=frappe.session.user, - after_commit=True - ) - frappe.enqueue( "lightning_import.lightning_import.doctype.lightning_upload.lightning_upload.process_import_queue", docname=docname, @@ -733,9 +863,9 @@ def start_import(docname, mapping=None): } except Exception as e: - frappe.log_error(frappe.get_traceback(), "Lightning Import Error") + frappe.log_error(frappe.get_traceback(), "Lightning Import Start Error") try: - frappe.db.set_value("Lightning Upload", docname, "status", "Draft", update_modified=False) + frappe.db.set_value("Lightning Upload", docname, "status", "Draft") frappe.db.commit() except: pass @@ -766,16 +896,9 @@ def export_error_rows(docname): if not doc.error_log: frappe.throw(_("No error log available")) - failed_rows = json.loads(doc.error_log) - - file_url = doc.generate_error_file(failed_rows) - - if not file_url: - frappe.throw(_("No error file could be generated")) - return { "status": "success", - "file_url": file_url + "file_url": doc.error_file } except Exception as e: @@ -790,7 +913,6 @@ def get_csv_headers_for_upload(docname=None, file_url=None): """Return the CSV headers for a given Lightning Upload docname or file_url""" try: if not file_url and docname: - # Fallback to fetching via docname if file_url not provided if frappe.db.exists("Lightning Upload", docname): doc = frappe.get_doc("Lightning Upload", docname) file_url = doc.csv_file @@ -820,28 +942,19 @@ def save_field_mapping(docname, mapping): @frappe.whitelist() def check_file_duplicates(docname, mapping=None): - """ - Pre-import duplicate check. - - Checks if 'enable_file_duplicate_check' is on in Settings. - If so, it scans the specific CSV column selected in 'duplicate_check_field' - on the Lightning Upload document. - """ + """Pre-import duplicate check for single import.""" try: - # Check if feature is enabled in Settings settings = frappe.get_single("Lightning Upload Settings") if not settings.get("enable_file_duplicate_check"): return {"status": "success", "has_duplicates": False, "duplicates": [], "total_duplicate_rows": 0} doc = frappe.get_doc("Lightning Upload", docname) - # If no column was selected, skip check if not doc.get("duplicate_check_field"): return {"status": "success", "has_duplicates": False, "duplicates": [], "total_duplicate_rows": 0} csv_col_to_check = doc.duplicate_check_field - # Save mapping if provided if mapping: frappe.db.set_value("Lightning Upload", docname, "field_mapping", mapping) doc.field_mapping = mapping @@ -855,12 +968,11 @@ def check_file_duplicates(docname, mapping=None): duplicates = [] duplicate_row_numbers = set() - # Group 1-based row numbers by cell value value_to_rows = {} for idx, row in enumerate(raw_rows, start=1): val = str(row.get(csv_col_to_check, "") or "").strip() if val == "": - continue # skip blank cells + continue value_to_rows.setdefault(val, []).append(idx) dup_entries = [ @@ -873,7 +985,6 @@ def check_file_duplicates(docname, mapping=None): for entry in dup_entries: duplicate_row_numbers.update(entry["rows"]) - # Try to find the doctype field this maps to (for display) doctype_field = csv_col_to_check if doc.field_mapping: field_mapping = json.loads(doc.field_mapping) From cbde63e697bbcc4d49cd272668909fbb71094ea2 Mon Sep 17 00:00:00 2001 From: Ameen Date: Wed, 20 May 2026 11:02:53 +0000 Subject: [PATCH 05/26] feat: implement background Multiple Target queue import engine --- .../lightning_multi_import_target.js | 6 + .../lightning_multi_import_target.json | 119 ++++ .../lightning_multi_import_target.py | 8 + .../lightning_upload/lightning_upload.json | 38 +- .../lightning_upload/lightning_upload.py | 644 +++++++++++++++++- 5 files changed, 784 insertions(+), 31 deletions(-) create mode 100644 lightning_import/lightning_import/doctype/lightning_multi_import_target/lightning_multi_import_target.js create mode 100644 lightning_import/lightning_import/doctype/lightning_multi_import_target/lightning_multi_import_target.json create mode 100644 lightning_import/lightning_import/doctype/lightning_multi_import_target/lightning_multi_import_target.py diff --git a/lightning_import/lightning_import/doctype/lightning_multi_import_target/lightning_multi_import_target.js b/lightning_import/lightning_import/doctype/lightning_multi_import_target/lightning_multi_import_target.js new file mode 100644 index 0000000..57557bb --- /dev/null +++ b/lightning_import/lightning_import/doctype/lightning_multi_import_target/lightning_multi_import_target.js @@ -0,0 +1,6 @@ +// Copyright (c) 2026, Tridz Technologies Pvt Ltd and contributors +// For license information, please see license.txt + +frappe.ui.form.on("Lightning Multi Import Target", { + // custom logic for child table if needed +}); diff --git a/lightning_import/lightning_import/doctype/lightning_multi_import_target/lightning_multi_import_target.json b/lightning_import/lightning_import/doctype/lightning_multi_import_target/lightning_multi_import_target.json new file mode 100644 index 0000000..4530405 --- /dev/null +++ b/lightning_import/lightning_import/doctype/lightning_multi_import_target/lightning_multi_import_target.json @@ -0,0 +1,119 @@ +{ + "actions": [], + "allow_rename": 1, + "creation": "2026-05-20 05:00:00.000000", + "doctype": "DocType", + "engine": "InnoDB", + "field_order": [ + "enabled", + "execution_order", + "target_doctype", + "import_type", + "map_fields", + "field_mapping", + "update_on_field", + "duplicate_check_field", + "status", + "total_records", + "successful_records", + "failed_records", + "last_processed_row" + ], + "fields": [ + { + "default": "1", + "fieldname": "enabled", + "fieldtype": "Check", + "label": "Enabled" + }, + { + "fieldname": "execution_order", + "fieldtype": "Int", + "label": "Execution Order" + }, + { + "fieldname": "target_doctype", + "fieldtype": "Link", + "in_list_view": 1, + "label": "Target DocType", + "options": "DocType", + "reqd": 1 + }, + { + "fieldname": "import_type", + "fieldtype": "Select", + "in_list_view": 1, + "label": "Import Type", + "options": "Insert New Records\nUpdate Existing Records\nInsert and Update Records", + "reqd": 1 + }, + { + "fieldname": "map_fields", + "fieldtype": "Button", + "in_list_view": 1, + "label": "Map Fields" + }, + { + "fieldname": "field_mapping", + "fieldtype": "Code", + "label": "Field Mapping", + "options": "JSON", + "read_only": 1 + }, + { + "fieldname": "update_on_field", + "fieldtype": "Select", + "label": "Validate On CSV Column" + }, + { + "fieldname": "duplicate_check_field", + "fieldtype": "Select", + "label": "Check Duplicates in Column" + }, + { + "fieldname": "status", + "fieldtype": "Select", + "in_list_view": 1, + "label": "Status", + "options": "Draft\nQueued\nIn Progress\nCompleted\nFailed\nPartial Success", + "read_only": 1, + "default": "Draft" + }, + { + "fieldname": "total_records", + "fieldtype": "Int", + "label": "Total Records", + "read_only": 1 + }, + { + "fieldname": "successful_records", + "fieldtype": "Int", + "label": "Successful Records", + "read_only": 1 + }, + { + "fieldname": "failed_records", + "fieldtype": "Int", + "label": "Failed Records", + "read_only": 1 + }, + { + "fieldname": "last_processed_row", + "fieldtype": "Int", + "label": "Last Processed Row", + "read_only": 1 + } + ], + "istable": 1, + "modified": "2026-05-20 05:00:00.000000", + "modified_by": "Administrator", + "module": "Lightning Import", + "name": "Lightning Multi Import Target", + "naming_rule": "Expression", + "owner": "Administrator", + "permissions": [], + "row_format": "Dynamic", + "sort_field": "execution_order", + "sort_order": "ASC", + "states": [] +} diff --git a/lightning_import/lightning_import/doctype/lightning_multi_import_target/lightning_multi_import_target.py b/lightning_import/lightning_import/doctype/lightning_multi_import_target/lightning_multi_import_target.py new file mode 100644 index 0000000..00cb67f --- /dev/null +++ b/lightning_import/lightning_import/doctype/lightning_multi_import_target/lightning_multi_import_target.py @@ -0,0 +1,8 @@ +# Copyright (c) 2026, Tridz Technologies Pvt Ltd and contributors +# For license information, please see license.txt + +import frappe +from frappe.model.document import Document + +class LightningMultiImportTarget(Document): + pass diff --git a/lightning_import/lightning_import/doctype/lightning_upload/lightning_upload.json b/lightning_import/lightning_import/doctype/lightning_upload/lightning_upload.json index dfc4b39..0edb5d0 100644 --- a/lightning_import/lightning_import/doctype/lightning_upload/lightning_upload.json +++ b/lightning_import/lightning_import/doctype/lightning_upload/lightning_upload.json @@ -7,9 +7,12 @@ "engine": "InnoDB", "field_order": [ "status", + "single_import", + "multiple_import", "import_doctype", "import_type", "csv_file", + "multi_import_targets", "field_mapping", "total_data", "error_file", @@ -36,22 +39,36 @@ "read_only": 1 }, { + "default": "1", + "fieldname": "single_import", + "fieldtype": "Check", + "label": "Single Import", + "read_only_depends_on": "eval:doc.status && doc.status !== \"Draft\"" + }, + { + "default": "0", + "fieldname": "multiple_import", + "fieldtype": "Check", + "label": "Multiple Import", + "read_only_depends_on": "eval:doc.status && doc.status !== \"Draft\"" + }, + { + "depends_on": "eval:doc.single_import == 1", "fieldname": "import_doctype", "fieldtype": "Link", "in_list_view": 1, "label": "DocType", "options": "DocType", - "read_only_depends_on": "eval:doc.status && doc.status !== \"Draft\"", - "reqd": 1 + "read_only_depends_on": "eval:doc.status && doc.status !== \"Draft\"" }, { + "depends_on": "eval:doc.single_import == 1", "fieldname": "import_type", "fieldtype": "Select", "in_list_view": 1, "label": "Import Type", "options": "Insert New Records\nUpdate Existing Records\nInsert and Update Records", - "read_only_depends_on": "eval:doc.status && doc.status !== \"Draft\"", - "reqd": 1 + "read_only_depends_on": "eval:doc.status && doc.status !== \"Draft\"" }, { "fieldname": "csv_file", @@ -60,6 +77,14 @@ "read_only_depends_on": "eval:doc.status && doc.status !== \"Draft\"", "reqd": 1 }, + { + "depends_on": "eval:doc.multiple_import == 1", + "fieldname": "multi_import_targets", + "fieldtype": "Table", + "label": "Multi Import Targets", + "options": "Lightning Multi Import Target", + "read_only_depends_on": "eval:doc.status && doc.status !== \"Draft\"" + }, { "description": "Mapping between CSV columns and DocType fields (JSON)", "fieldname": "field_mapping", @@ -135,13 +160,14 @@ "read_only": 1 }, { - "depends_on": "eval:doc.import_type == \"Insert and Update Records\"", + "depends_on": "eval:doc.single_import == 1 && doc.import_type == \"Insert and Update Records\"", "description": "Select the unique field (like Email, Mobile No, etc.) to check if a record already exists.", "fieldname": "update_on_field", "fieldtype": "Select", "label": "Validate On CSV Column" }, { + "depends_on": "eval:doc.single_import == 1", "description": "Select the CSV column to check for duplicates within the file.", "fieldname": "duplicate_check_field", "fieldtype": "Select", @@ -152,7 +178,7 @@ "grid_page_length": 50, "index_web_pages_for_search": 1, "links": [], - "modified": "2026-05-07 19:10:24.449237", + "modified": "2026-05-20 05:00:00.000000", "modified_by": "Administrator", "module": "Lightning Import", "name": "Lightning Upload", diff --git a/lightning_import/lightning_import/doctype/lightning_upload/lightning_upload.py b/lightning_import/lightning_import/doctype/lightning_upload/lightning_upload.py index f641f73..e83aeed 100644 --- a/lightning_import/lightning_import/doctype/lightning_upload/lightning_upload.py +++ b/lightning_import/lightning_import/doctype/lightning_upload/lightning_upload.py @@ -388,22 +388,86 @@ def import_rows_for_doctype(import_config, raw_rows): class LightningUpload(Document): def validate(self): """Validate the document before save""" - if self.csv_file: - self.validate_csv_file() - + if not self.csv_file: + frappe.throw(_("CSV File is required.")) + + self.validate_csv_file() + + # Enforce one import mode selected + if not self.single_import and not self.multiple_import: + frappe.throw(_("Please select either Single Import or Multiple Import.")) + + if self.single_import and self.multiple_import: + frappe.throw(_("Both Single Import and Multiple Import cannot be selected together.")) + + if self.single_import: + if not self.import_doctype: + frappe.throw(_("DocType is required for Single Import.")) + if not self.import_type: + frappe.throw(_("Import Type is required for Single Import.")) + if self.import_type == "Insert and Update Records" and not self.update_on_field: + frappe.throw(_("Validate On CSV Column is required for 'Insert and Update Records' import type.")) + + elif self.multiple_import: + enabled_targets = [t for t in self.multi_import_targets if t.enabled] + if not enabled_targets: + frappe.throw(_("At least one enabled target row must exist for Multiple Import.")) + + for idx, target in enumerate(self.multi_import_targets, start=1): + if not target.enabled: + continue + if not target.target_doctype: + frappe.throw(_("Row #{0}: Target DocType is required.").format(idx)) + if not target.import_type: + frappe.throw(_("Row #{0}: Import Type is required.").format(idx)) + if target.import_type == "Insert and Update Records" and not target.update_on_field: + frappe.throw(_("Row #{0}: Validate On CSV Column is required for 'Insert and Update Records' import type.").format(idx)) + def validate_mappings(self): """Explicit mapping validation before starting the import process""" - if not self.field_mapping: - frappe.throw(_("Please map fields before starting import")) - meta = frappe.get_meta(self.import_doctype) - required_fields = [f.fieldname for f in meta.fields if f.reqd] - mapping = json.loads(self.field_mapping) - mapped_fields = [v for v in mapping.values() if v] - unmapped_required = [f for f in required_fields if f not in mapped_fields] - if unmapped_required: - frappe.throw(_("Please map all required fields for {0}: {1}").format(self.import_doctype, ", ".join(unmapped_required))) - if self.import_type == "Update Existing Records" and 'name' not in mapped_fields: - frappe.throw(_("For 'Update Existing Records', the target field 'name' (ID) must be mapped.")) + if self.single_import: + if not self.field_mapping: + frappe.throw(_("Please map fields before starting import")) + meta = frappe.get_meta(self.import_doctype) + required_fields = [f.fieldname for f in meta.fields if f.reqd] + mapping = json.loads(self.field_mapping) + mapped_fields = [v for v in mapping.values() if v] + unmapped_required = [f for f in required_fields if f not in mapped_fields] + if unmapped_required: + frappe.throw(_("Please map all required fields for {0}: {1}").format(self.import_doctype, ", ".join(unmapped_required))) + + if self.import_type in ["Update Existing Records", "Insert and Update Records"]: + has_name = 'name' in mapped_fields + has_update_on = False + if self.update_on_field: + has_update_on = bool(mapping.get(self.update_on_field)) + if not (has_name or has_update_on): + frappe.throw(_("For updating records, either the 'name' (ID) field or the configured 'update_on_field' ({0}) must be mapped.").format(self.update_on_field or "")) + + elif self.multiple_import: + enabled_targets = [t for t in self.multi_import_targets if t.enabled] + if not enabled_targets: + frappe.throw(_("At least one enabled target row must exist for Multiple Import.")) + for idx, target in enumerate(self.multi_import_targets, start=1): + if not target.enabled: + continue + if not target.field_mapping: + frappe.throw(_("Row #{0}: Field mapping is not defined. Please map fields for {1}.").format(idx, target.target_doctype)) + meta = frappe.get_meta(target.target_doctype) + required_fields = [f.fieldname for f in meta.fields if f.reqd] + mapping = json.loads(target.field_mapping) + mapped_fields = [v for v in mapping.values() if v] + unmapped_required = [f for f in required_fields if f not in mapped_fields] + if unmapped_required: + frappe.throw(_("Row #{0}: Please map all required fields for {1}: {2}").format(idx, target.target_doctype, ", ".join(unmapped_required))) + + if target.import_type in ["Update Existing Records", "Insert and Update Records"]: + has_name = 'name' in mapped_fields + has_update_on = False + if target.update_on_field: + has_update_on = bool(mapping.get(target.update_on_field)) + if not (has_name or has_update_on): + frappe.throw(_("Row #{0}: For updating records in {1}, either the 'name' (ID) field or the configured 'update_on_field' ({2}) must be mapped.").format(idx, target.target_doctype, target.update_on_field or "")) def validate_csv_file(self): """Validate if the uploaded file is a valid CSV file""" @@ -735,15 +799,18 @@ def process_import_queue(docname): message=progress_data, user=frappe.session.user, after_commit=True - ) + ) time_taken = time.time() - start_time time_str = f"{int(time_taken)}s" if time_taken < 60 else f"{time_taken/60:.1f}m" + error_file_time = 0 if all_failed_rows: - doc.error_log = json.dumps([{"error": f['error'], "row": f['row']} for f in all_failed_rows], indent=2) + error_start = time.time() + doc.error_log = json.dumps(all_failed_rows, indent=2) frappe.db.set_value("Lightning Upload", docname, "error_log", doc.error_log) doc.generate_error_file(all_failed_rows) + error_file_time = round((time.time() - error_start) * 1000, 2) if failed_records == total_rows: final_status = "Failed" @@ -757,7 +824,15 @@ def process_import_queue(docname): docname, { "status": final_status, - "import_time": time_str + "import_time": time_str, + "timing_details": json.dumps({ + "total_time_seconds": round(time_taken, 2), + "csv_load_time_ms": csv_time, + "error_file_time_ms": error_file_time, + "batch_timings": batch_timings, + "average_batch_time_ms": round(sum(b['total_time_ms'] for b in batch_timings) / len(batch_timings), 2) if batch_timings else 0, + "average_insert_time_ms": round(sum(b['insert_time_ms'] for b in batch_timings) / len(batch_timings), 2) if batch_timings else 0 + }, indent=2) } ) frappe.db.commit() @@ -768,9 +843,15 @@ def process_import_queue(docname): "title": f"Import {final_status.lower()}", "progress_key": progress_key, "time_taken": time_str, + "total_records": total_rows, "successful_records": successful_records, "failed_records": failed_records, - "total_records": total_rows + "timing_details": { + "total_time_seconds": round(time_taken, 2), + "csv_load_time_ms": csv_time, + "error_file_time_ms": error_file_time, + "average_batch_time_ms": round(sum(b['total_time_ms'] for b in batch_timings) / len(batch_timings), 2) if batch_timings else 0 + } } frappe.cache().set_value(progress_key, final_progress) frappe.publish_realtime( @@ -782,13 +863,25 @@ def process_import_queue(docname): return { "status": "success", - "message": f"Import completed. Successful: {successful_records}, Failed: {failed_records}. Time taken: {time_str}" + "message": f"Import {final_status.lower()}. Successful: {successful_records}, Failed: {failed_records}, Time taken: {time_str}", + "time_taken": time_str, + "total_records": total_rows, + "successful_records": successful_records, + "failed_records": failed_records, + "timing_details": final_progress["timing_details"] } except Exception as e: - frappe.log_error(frappe.get_traceback(), "Lightning Import Processing Error") + frappe.log_error(frappe.get_traceback(), "Lightning Import Error") try: - frappe.db.set_value("Lightning Upload", docname, "status", "Failed") + frappe.db.set_value( + "Lightning Upload", + docname, + { + "status": "Failed", + "error_log": str(e) + } + ) frappe.db.commit() progress_key = f"lightning_import_{docname}" error_progress = { @@ -824,7 +917,9 @@ def auto_map_and_validate(docname): @frappe.whitelist() def start_import(docname, mapping=None): - """API endpoint to start the import process in the background""" + """ + API endpoint to start the single import process. + """ try: doc = frappe.get_doc("Lightning Upload", docname) @@ -848,6 +943,16 @@ def start_import(docname, mapping=None): } frappe.cache().set_value(progress_key, initial_progress) + frappe.db.set_value("Lightning Upload", docname, "status", "Queued", update_modified=False) + frappe.db.commit() + + frappe.publish_realtime( + event='import_progress', + message=initial_progress, + user=frappe.session.user, + after_commit=True + ) + frappe.enqueue( "lightning_import.lightning_import.doctype.lightning_upload.lightning_upload.process_import_queue", docname=docname, @@ -863,9 +968,9 @@ def start_import(docname, mapping=None): } except Exception as e: - frappe.log_error(frappe.get_traceback(), "Lightning Import Start Error") + frappe.log_error(frappe.get_traceback(), "Lightning Import Error") try: - frappe.db.set_value("Lightning Upload", docname, "status", "Draft") + frappe.db.set_value("Lightning Upload", docname, "status", "Draft", update_modified=False) frappe.db.commit() except: pass @@ -1007,4 +1112,493 @@ def check_file_duplicates(docname, mapping=None): except Exception as e: frappe.log_error(frappe.get_traceback(), "Lightning Import Duplicate Check Error") - return {"status": "error", "message": str(e)} \ No newline at end of file + return {"status": "error", "message": str(e)} + +# ========================================== +# NEW MULTIPLE IMPORT ENDPOINTS & HELPERS +# ========================================== + +def generate_multi_error_file(doc, all_failed_rows): + """Generate a single combined CSV error file for multiple target DocTypes""" + if not all_failed_rows: + return None + + fd, path = tempfile.mkstemp(suffix='.csv') + try: + with os.fdopen(fd, 'w', newline='', encoding='utf-8') as csvfile: + writer = csv.writer(csvfile) + + writer.writerow([ + 'Target DocType', + 'CSV Row Number', + 'Error Message', + 'Original CSV Row Values', + 'Mapped Row Values' + ]) + + for failed_row in all_failed_rows: + target_doctype = failed_row.get('target_doctype', '') + error_msg = failed_row.get('error', '') + row_dict = failed_row.get('row', {}) + + row_num = row_dict.get('__csv_row_number__', '') + original_values = row_dict.get('__original_row__', {}) + mapped_values = {k: v for k, v in row_dict.items() if k not in ['__csv_row_number__', '__original_row__']} + + writer.writerow([ + target_doctype, + row_num, + error_msg, + json.dumps(original_values), + json.dumps(mapped_values) + ]) + + with open(path, 'rb') as f: + file_content = f.read() + + file_doc = save_file( + fname=f"multi_error_log_{doc.name}.csv", + content=file_content, + dt="Lightning Upload", + dn=doc.name, + folder="Home/Attachments", + is_private=1 + ) + + frappe.db.set_value("Lightning Upload", doc.name, "error_file", file_doc.file_url) + return file_doc.file_url + + finally: + if os.path.exists(path): + os.unlink(path) + +@frappe.whitelist() +def check_multi_file_duplicates(docname): + """Check duplicates for all enabled target DocTypes in a multiple import""" + try: + settings = frappe.get_single("Lightning Upload Settings") + if not settings.get("enable_file_duplicate_check"): + return {"status": "success", "has_duplicates": False, "targets": []} + + doc = frappe.get_doc("Lightning Upload", docname) + if not doc.multiple_import: + return {"status": "success", "has_duplicates": False, "targets": []} + + raw_rows = get_raw_sheet_rows(doc) + if not raw_rows: + return {"status": "success", "has_duplicates": False, "targets": []} + + has_duplicates = False + targets_with_duplicates = [] + + for target in doc.multi_import_targets: + if not target.enabled: + continue + if not target.duplicate_check_field: + continue + + csv_col_to_check = target.duplicate_check_field + value_to_rows = {} + for idx, row in enumerate(raw_rows, start=1): + val = str(row.get(csv_col_to_check, "") or "").strip() + if val == "": + continue + value_to_rows.setdefault(val, []).append(idx) + + dup_entries = [ + {"value": val, "rows": idxs, "count": len(idxs)} + for val, idxs in value_to_rows.items() + if len(idxs) > 1 + ] + + if dup_entries: + has_duplicates = True + doctype_field = csv_col_to_check + if target.field_mapping: + mapping = json.loads(target.field_mapping) + doctype_field = mapping.get(csv_col_to_check) or csv_col_to_check + + targets_with_duplicates.append({ + "target_doctype": target.target_doctype, + "csv_column": csv_col_to_check, + "field": doctype_field, + "duplicate_values": dup_entries, + "count": len(dup_entries) + }) + + return { + "status": "success", + "has_duplicates": has_duplicates, + "targets": targets_with_duplicates + } + + except Exception as e: + frappe.log_error(frappe.get_traceback(), "Lightning Import Multi Duplicate Check Error") + return {"status": "error", "message": str(e)} + +@frappe.whitelist() +def auto_map_multi_import(docname): + """Generate field mapping for every enabled target DocType in multi_import_targets""" + try: + doc = frappe.get_doc("Lightning Upload", docname) + if not doc.multiple_import: + frappe.throw(_("Multiple Import is not enabled for this document.")) + + if not doc.csv_file: + frappe.throw(_("No CSV file attached.")) + + headers = get_sheet_headers(doc) + + for target in doc.multi_import_targets: + if not target.enabled: + continue + if not target.target_doctype: + continue + + mapping_res = auto_map_headers_for_doctype(headers, target.target_doctype) + target.field_mapping = json.dumps(mapping_res["mapping"]) + + doc.flags.ignore_validate = True + doc.save(ignore_permissions=True) + return {"status": "success", "message": _("Auto mapping completed for all enabled targets.")} + + except Exception as e: + frappe.log_error(frappe.get_traceback(), "Lightning Import Auto Map Multi Error") + return {"status": "error", "message": str(e)} + +@frappe.whitelist() +def start_multi_import(docname): + """API endpoint to start the multiple import process.""" + try: + doc = frappe.get_doc("Lightning Upload", docname) + + if doc.status != "Draft": + frappe.throw(_("Import can only be started from Draft status")) + + if not doc.multiple_import: + frappe.throw(_("Multiple Import must be enabled.")) + + doc.validate_mappings() + + progress_key = f"lightning_import_{docname}" + initial_progress = { + "status": "Queued", + "progress": 0, + "title": "Import queued...", + "progress_key": progress_key, + "multiple_import": True, + "successful_records": 0, + "failed_records": 0 + } + frappe.cache().set_value(progress_key, initial_progress) + + frappe.db.set_value("Lightning Upload", docname, "status", "Queued", update_modified=False) + frappe.db.commit() + + frappe.publish_realtime( + event='import_progress', + message=initial_progress, + user=frappe.session.user, + after_commit=True + ) + + frappe.enqueue( + "lightning_import.lightning_import.doctype.lightning_upload.lightning_upload.process_multi_import_queue", + docname=docname, + now=False, + queue="long", + timeout=3600 + ) + + return { + "status": "success", + "message": _("Multiple Import process started successfully"), + "progress_key": progress_key + } + + except Exception as e: + frappe.log_error(frappe.get_traceback(), "Lightning Multi Import Error") + try: + frappe.db.set_value("Lightning Upload", docname, "status", "Draft", update_modified=False) + frappe.db.commit() + except: + pass + return { + "status": "error", + "message": str(e) + } + +@frappe.whitelist() +def process_multi_import_queue(docname): + """Background job to execute multiple target DocType imports""" + start_time = time.time() + + try: + doc = frappe.get_doc("Lightning Upload", docname) + if not doc.multiple_import: + frappe.throw(_("Multiple Import is not enabled for this document.")) + + frappe.db.set_value("Lightning Upload", docname, "status", "In Progress") + frappe.db.commit() + + progress_key = f"lightning_import_{docname}" + initial_progress = { + "status": "In Progress", + "progress": 0, + "title": "Starting multiple import...", + "progress_key": progress_key, + "multiple_import": True + } + frappe.cache().set_value(progress_key, initial_progress) + frappe.publish_realtime( + event='import_progress', + message=initial_progress, + user=frappe.session.user, + after_commit=True + ) + + raw_rows = get_raw_sheet_rows(doc) + total_raw_rows = len(raw_rows) + + enabled_targets = [t for t in doc.multi_import_targets if t.enabled] + enabled_targets.sort(key=lambda x: (x.execution_order if x.execution_order is not None and x.execution_order != "" else float('inf'), x.idx)) + + total_targets = len(enabled_targets) + if total_targets == 0: + frappe.throw(_("No enabled targets to import.")) + + # Initialize target statuses in database + for target in enabled_targets: + frappe.db.set_value("Lightning Multi Import Target", target.name, { + "status": "Queued", + "total_records": 0, + "successful_records": 0, + "failed_records": 0, + "last_processed_row": 0 + }, update_modified=False) + frappe.db.commit() + + all_failed_rows = [] + target_statuses = [] + batch_size = LightningUploadSettings.get_batch_size() + + overall_successful_records = 0 + overall_failed_records = 0 + + for t_idx, target in enumerate(enabled_targets): + target_doctype = target.target_doctype + import_type = target.import_type + field_mapping = target.field_mapping + update_on_field = target.update_on_field + + frappe.db.set_value("Lightning Multi Import Target", target.name, "status", "In Progress", update_modified=False) + frappe.db.commit() + + mapped_rows = map_rows_for_doctype(raw_rows, field_mapping) + + valid_indices_and_rows = [] + for idx, r in enumerate(mapped_rows, start=1): + if any(val is not None and str(val).strip() != "" for k, val in r.items() if not k.startswith("__")): + valid_indices_and_rows.append((idx, raw_rows[idx-1], r)) + + total_target_rows = len(valid_indices_and_rows) + + frappe.db.set_value("Lightning Multi Import Target", target.name, "total_records", total_target_rows, update_modified=False) + frappe.db.commit() + + successful_records = 0 + failed_records = 0 + + if total_target_rows == 0: + frappe.db.set_value("Lightning Multi Import Target", target.name, "status", "Completed", update_modified=False) + frappe.db.commit() + target_statuses.append("Completed") + continue + + for i in range(0, total_target_rows, batch_size): + batch_slice = valid_indices_and_rows[i:i + batch_size] + batch_raw_rows = [item[1] for item in batch_slice] + + import_config = { + "import_doctype": target_doctype, + "import_type": import_type, + "field_mapping": field_mapping, + "update_on_field": update_on_field + } + + result = import_rows_for_doctype(import_config, batch_raw_rows) + + successful_records += result['success_count'] + failed_records += len(result['failed_rows']) + + for failed_row in result['failed_rows']: + failed_row['target_doctype'] = target_doctype + all_failed_rows.append(failed_row) + + frappe.db.set_value( + "Lightning Multi Import Target", + target.name, + { + "successful_records": successful_records, + "failed_records": failed_records, + "last_processed_row": min(total_target_rows, i + batch_size) + }, + update_modified=False + ) + frappe.db.commit() + + overall_progress = min(100, int(((t_idx + (min(total_target_rows, i + batch_size) / total_target_rows)) / total_targets) * 100)) + + progress_data = { + "status": "In Progress", + "progress": overall_progress, + "title": f"Importing {target_doctype}... ({overall_progress}%)", + "progress_key": progress_key, + "multiple_import": True, + "current_target_doctype": target_doctype, + "total_targets": total_targets, + "current_target_index": t_idx + 1, + "target_status": "In Progress", + "target_successful_records": successful_records, + "target_failed_records": failed_records, + "target_total_records": total_target_rows + } + frappe.cache().set_value(progress_key, progress_data) + frappe.publish_realtime( + event='import_progress', + message=progress_data, + user=frappe.session.user, + after_commit=True + ) + + if failed_records == total_target_rows: + target_final_status = "Failed" + elif failed_records > 0: + target_final_status = "Partial Success" + else: + target_final_status = "Completed" + + frappe.db.set_value("Lightning Multi Import Target", target.name, "status", target_final_status, update_modified=False) + frappe.db.commit() + target_statuses.append(target_final_status) + + overall_successful_records += successful_records + overall_failed_records += failed_records + + time_taken = time.time() - start_time + time_str = f"{int(time_taken)}s" if time_taken < 60 else f"{time_taken/60:.1f}m" + + if all_failed_rows: + doc.error_log = json.dumps([ + { + "target_doctype": f.get("target_doctype"), + "error": f.get("error"), + "row_num": f.get("row", {}).get("__csv_row_number__"), + "original_row": f.get("row", {}).get("__original_row__") + } + for f in all_failed_rows + ], indent=2) + frappe.db.set_value("Lightning Upload", docname, "error_log", doc.error_log) + generate_multi_error_file(doc, all_failed_rows) + + if all(status == "Completed" for status in target_statuses): + final_status = "Completed" + elif all(status == "Failed" for status in target_statuses): + final_status = "Failed" + else: + final_status = "Partial Success" + + frappe.db.set_value( + "Lightning Upload", + docname, + { + "status": final_status, + "import_time": time_str, + "successful_records": overall_successful_records, + "failed_records": overall_failed_records, + "total_records": total_raw_rows + } + ) + frappe.db.commit() + + final_progress = { + "status": final_status, + "progress": 100, + "title": f"Multiple Import {final_status.lower()}", + "progress_key": progress_key, + "multiple_import": True, + "time_taken": time_str, + "successful_records": overall_successful_records, + "failed_records": overall_failed_records, + "total_records": total_raw_rows + } + frappe.cache().set_value(progress_key, final_progress) + frappe.publish_realtime( + event='import_progress', + message=final_progress, + user=frappe.session.user, + after_commit=True + ) + + return { + "status": "success", + "message": f"Multiple Import {final_status.lower()}. Successful: {overall_successful_records}, Failed: {overall_failed_records}, Time taken: {time_str}" + } + + except Exception as e: + frappe.log_error(frappe.get_traceback(), "Lightning Multiple Import Error") + try: + frappe.db.set_value( + "Lightning Upload", + docname, + { + "status": "Failed", + "error_log": str(e) + } + ) + frappe.db.commit() + progress_key = f"lightning_import_{docname}" + error_progress = { + "status": "Failed", + "progress": 0, + "title": "Multiple Import failed", + "progress_key": progress_key, + "multiple_import": True, + "error": str(e) + } + frappe.cache().set_value(progress_key, error_progress) + frappe.publish_realtime( + event='import_progress', + message=error_progress, + user=frappe.session.user, + after_commit=True + ) + except: + pass + return { + "status": "error", + "message": str(e) + } + +@frappe.whitelist() +def get_auto_mapping_for_doctype(docname, doctype): + """Get automatic field mapping for a specific target DocType""" + print("Docname:", docname) + print("Target DocType:", doctype) + try: + if not docname or docname.startswith("new-lightning-upload-"): + frappe.throw(_("Document name must be a saved document in the database.")) + + if not frappe.db.exists("Lightning Upload", docname): + frappe.throw(_("Lightning Upload {0} not found").format(docname)) + + doc = frappe.get_doc("Lightning Upload", docname) + headers = get_sheet_headers(doc) + print("Headers:", headers) + + res = auto_map_headers_for_doctype(headers, doctype) + print("Generated Mapping:", res.get("mapping", {})) + return res + except Exception as e: + frappe.log_error(frappe.get_traceback(), "Error getting auto mapping") + return {"mapping": {}, "unmapped_required": []} \ No newline at end of file From 5bb0cc77313a6adcd2e83a6115d187cb7bf7f962 Mon Sep 17 00:00:00 2001 From: Ameen Date: Wed, 20 May 2026 11:04:19 +0000 Subject: [PATCH 06/26] feat: implement centralized Combined Field Mapping dialog --- .../lightning_upload/lightning_upload.js | 921 +++++++++++++----- 1 file changed, 676 insertions(+), 245 deletions(-) diff --git a/lightning_import/lightning_import/doctype/lightning_upload/lightning_upload.js b/lightning_import/lightning_import/doctype/lightning_upload/lightning_upload.js index 28e1fb6..223684a 100644 --- a/lightning_import/lightning_import/doctype/lightning_upload/lightning_upload.js +++ b/lightning_import/lightning_import/doctype/lightning_upload/lightning_upload.js @@ -19,89 +19,186 @@ frappe.realtime.on('socket_disconnected', () => { console.log('[Lightning Import] Socket disconnected'); }); -frappe.ui.form.on('Lightning Upload', { - refresh: function (frm) { - // Store reference to current form - frappe.progress_state.current_form = frm; +async function ensure_doc_saved(frm) { + if (frm.is_new() || frm.is_dirty() || frm.doc.__unsaved) { + await frm.save(); + } +} - // Show Start Import button only when status is Draft and document is saved - if (!frm.is_new() && frm.doc.status === "Draft") { - frm.page.set_primary_action(__('Start Import'), () => { - start_import(frm); - }); - // Add Map Fields button if CSV file is attached +function setup_buttons(frm) { + if (!frm.page) return; + + frm.page.clear_primary_action(); + frm.clear_custom_buttons(); + + if (frm.doc.status === "Draft") { + if (frm.doc.single_import) { + // Keep standard Save action as primary if form is dirty, otherwise set Start Import as primary + if (frm.is_dirty() || frm.doc.__unsaved) { + frm.page.set_primary_action(__('Save'), () => frm.save()); + frm.add_custom_button(__('Start Import'), async () => { + await ensure_doc_saved(frm); + start_import(frm); + }); + } else { + frm.page.set_primary_action(__('Start Import'), async () => { + await ensure_doc_saved(frm); + start_import(frm); + }); + } if (frm.doc.csv_file) { - frm.add_custom_button(__('Map Fields'), () => { + frm.add_custom_button(__('Map Fields'), async () => { + await ensure_doc_saved(frm); open_field_mapping_dialog(frm); }); } - } else { - // Remove the primary action button if not in Draft status - frm.page.clear_primary_action(); + } else if (frm.doc.multiple_import) { + // Keep standard Save action as primary if form is dirty, otherwise set Start Multi Import as primary + if (frm.is_dirty() || frm.doc.__unsaved) { + frm.page.set_primary_action(__('Save'), () => frm.save()); + frm.add_custom_button(__('Start Multi Import'), async () => { + await ensure_doc_saved(frm); + start_multi_import(frm); + }); + } else { + frm.page.set_primary_action(__('Start Multi Import'), async () => { + await ensure_doc_saved(frm); + start_multi_import(frm); + }); + } + if (frm.doc.csv_file) { + frm.add_custom_button(__('Map Fields'), async () => { + await ensure_doc_saved(frm); + open_combined_multi_mapping_dialog(frm); + }); + } } + } + + // Show Export Error Rows button only for Failed or Partial Success + if (frm.doc.status === 'Failed' || frm.doc.status === 'Partial Success') { + frm.add_custom_button(__('Export Error Rows'), () => { + export_error_rows(frm); + }); + } +} + +// Centralized UI state handler that handles all visibilities, field options, and buttons +function update_import_mode_ui(frm) { + if (!frm || !frm.doc) return; - // Show Export Error Rows button only for Failed or Partial Success - if (frm.doc.status === 'Failed' || frm.doc.status === 'Partial Success') { - frm.add_custom_button(__('Export Error Rows'), () => { - export_error_rows(frm); + // 1. Toggle field visibilities and requirements + if (frm.doc.single_import) { + frm.set_df_property('import_doctype', 'reqd', 1); + frm.set_df_property('import_doctype', 'hidden', 0); + frm.set_df_property('import_type', 'reqd', 1); + frm.set_df_property('import_type', 'hidden', 0); + frm.set_df_property('update_on_field', 'hidden', frm.doc.import_type !== 'Insert and Update Records' ? 1 : 0); + frm.set_df_property('duplicate_check_field', 'hidden', 0); + frm.set_df_property('field_mapping', 'hidden', 0); + + frm.set_df_property('multi_import_targets', 'reqd', 0); + frm.set_df_property('multi_import_targets', 'hidden', 1); + } else if (frm.doc.multiple_import) { + frm.set_df_property('import_doctype', 'reqd', 0); + frm.set_df_property('import_doctype', 'hidden', 1); + frm.set_df_property('import_type', 'reqd', 0); + frm.set_df_property('import_type', 'hidden', 1); + frm.set_df_property('update_on_field', 'hidden', 1); + frm.set_df_property('duplicate_check_field', 'hidden', 1); + frm.set_df_property('field_mapping', 'hidden', 1); + + frm.set_df_property('multi_import_targets', 'reqd', 1); + frm.set_df_property('multi_import_targets', 'hidden', 0); + + // Hide Map Fields column inside child table grid + if (frm.fields_dict['multi_import_targets'] && frm.fields_dict['multi_import_targets'].grid) { + const grid = frm.fields_dict['multi_import_targets'].grid; + grid.docfields.forEach(df => { + if (df.fieldname === 'map_fields') { + df.hidden = 1; + } }); + grid.refresh(); } + } + + // 2. Populate CSV column dropdowns dynamically + if (frm.doc.csv_file) { + if (frm.doc.single_import) { + if (frm.doc.import_type === 'Insert and Update Records') { + frm.events.populate_update_on_field(frm); + } + frappe.db.get_single_value('Lightning Upload Settings', 'enable_file_duplicate_check').then(enabled => { + if (enabled) { + frm.set_df_property('duplicate_check_field', 'hidden', 0); + frm.events.populate_duplicate_check_field(frm); + } else { + frm.set_df_property('duplicate_check_field', 'hidden', 1); + } + }); + } else if (frm.doc.multiple_import) { + frm.events.populate_multi_import_selects(frm); + } + } + + // 3. Stably debounce rendering of buttons to ensure they do not get cleared by framework/workflow + if (frm._button_timeout) { + clearTimeout(frm._button_timeout); + } + frm._button_timeout = setTimeout(() => { + setup_buttons(frm); + }, 150); +} + +frappe.ui.form.on('Lightning Upload', { + refresh: function (frm) { + frappe.progress_state.current_form = frm; + + // Perform centralized UI updates + update_import_mode_ui(frm); // Set up progress tracking if import is in progress if (frm.doc.status === 'Queued' || frm.doc.status === 'In Progress') { setup_progress_tracking(frm); } - - // Also trigger the import_type logic on refresh - // to populate the dropdown if the form loads in the right state - if (frm.doc.import_type === 'Insert and Update Records' && frm.doc.csv_file) { - frm.events.populate_update_on_field(frm); - } - - // Manage Duplicate Check field visibility and options - frappe.db.get_single_value('Lightning Upload Settings', 'enable_file_duplicate_check').then(enabled => { - if (enabled) { - frm.set_df_property('duplicate_check_field', 'hidden', 0); - if (frm.doc.csv_file) { - frm.events.populate_duplicate_check_field(frm); - } - } else { - frm.set_df_property('duplicate_check_field', 'hidden', 1); - } - }); }, onload: function (frm) { - // Store reference to current form frappe.progress_state.current_form = frm; - // Set up progress tracking when form loads if import is in progress if (frm.doc.status === 'Queued' || frm.doc.status === 'In Progress') { setup_progress_tracking(frm); } + + update_import_mode_ui(frm); }, csv_file: function (frm) { - if (frm.doc.import_type === 'Insert and Update Records') { - if (frm.doc.csv_file) { - frm.events.populate_update_on_field(frm); - } - } - - frappe.db.get_single_value('Lightning Upload Settings', 'enable_file_duplicate_check').then(enabled => { - if (enabled && frm.doc.csv_file) { - frm.events.populate_duplicate_check_field(frm); - } - }); + update_import_mode_ui(frm); }, import_type: function (frm) { - if (frm.doc.import_type === 'Insert and Update Records') { - if (frm.doc.csv_file) { - frm.events.populate_update_on_field(frm); - } else { - frappe.msgprint(__('Please attach a CSV file first to select the update column.')); - } + update_import_mode_ui(frm); + }, + + single_import: function(frm) { + if (frm.doc.single_import) { + frm.set_value('multiple_import', 0); + update_import_mode_ui(frm); + } else if (!frm.doc.multiple_import) { + frm.set_value('single_import', 1); + frappe.msgprint(__('Please select either Single Import or Multiple Import.')); + } + }, + + multiple_import: function(frm) { + if (frm.doc.multiple_import) { + frm.set_value('single_import', 0); + update_import_mode_ui(frm); + } else if (!frm.doc.single_import) { + frm.set_value('multiple_import', 1); + frappe.msgprint(__('Please select either Single Import or Multiple Import.')); } }, @@ -121,21 +218,62 @@ frappe.ui.form.on('Lightning Upload', { }, populate_update_on_field: function (frm) { - // Fetch CSV headers from the backend frappe.call({ method: 'lightning_import.lightning_import.doctype.lightning_upload.lightning_upload.get_csv_headers_for_upload', args: { file_url: frm.doc.csv_file }, callback: function (r) { if (r.message && r.message.status === 'success') { const headers = r.message.headers; - // Prepend a blank option const options = [''].concat(headers); - // Set the options for the dropdown and refresh the field frm.set_df_property('update_on_field', 'options', options); frm.refresh_field('update_on_field'); } } }); + }, + + populate_multi_import_selects: function (frm) { + if (!frm.doc.csv_file) return; + + frappe.call({ + method: 'lightning_import.lightning_import.doctype.lightning_upload.lightning_upload.get_csv_headers_for_upload', + args: { file_url: frm.doc.csv_file }, + callback: function (r) { + if (r.message && r.message.status === 'success') { + const headers = r.message.headers; + const options = [''].concat(headers); + + if (frm.fields_dict['multi_import_targets'] && frm.fields_dict['multi_import_targets'].grid) { + const grid = frm.fields_dict['multi_import_targets'].grid; + + // Set on grid columns metadata + grid.docfields.forEach(df => { + if (df.fieldname === 'update_on_field' || df.fieldname === 'duplicate_check_field') { + df.options = options; + } + }); + + // Set on global form docfields metadata + const update_df = frappe.meta.get_docfield("Lightning Multi Import Target", "update_on_field", frm.docname); + if (update_df) update_df.options = options; + + const dup_df = frappe.meta.get_docfield("Lightning Multi Import Target", "duplicate_check_field", frm.docname); + if (dup_df) dup_df.options = options; + + grid.refresh(); + } + } + } + }); + } +}); + +// Grid row trigger mapping +frappe.ui.form.on('Lightning Multi Import Target', { + multi_import_targets_add: function(frm, cdt, cdn) { + if (frm.doc.csv_file) { + frm.events.populate_multi_import_selects(frm); + } } }); @@ -144,169 +282,109 @@ frappe.realtime.on('import_progress', function (data) { console.log('[Lightning Import] Received import_progress event:', data); const frm = frappe.progress_state.current_form; - if (!frm) { - console.log('[Lightning Import] No form found in progress_state'); - return; - } + if (!frm) return; - // If we have a progress key in the event, verify it matches if (data.progress_key) { const formProgressKey = `lightning_import_${frm.doc.name}`; - console.log('[Lightning Import] Progress key check:', { - received: data.progress_key, - expected: formProgressKey, - matches: data.progress_key === formProgressKey - }); - if (data.progress_key !== formProgressKey) { - console.log('[Lightning Import] Progress key mismatch, ignoring event'); - return; - } + if (data.progress_key !== formProgressKey) return; } - console.log('[Lightning Import] Updating progress with data:', data); update_progress(frm, data); }); function setup_progress_tracking(frm) { - console.log('[Lightning Import] Setting up progress tracking for form:', frm.doc.name); - - // Clear any existing progress bar if (frm.progress_bar) { - console.log('[Lightning Import] Removing existing progress bar'); frm.progress_bar.remove(); } - // Create progress bar container - console.log('[Lightning Import] Creating new progress bar'); frm.progress_bar = $(`
-
-
+
`).insertAfter(frm.page.main); } function update_progress(frm, data) { - console.log('[Lightning Import] update_progress called with:', { - form: frm.doc.name, - data: data, - hasProgressBar: !!frm.progress_bar - }); - - if (!data || !frm.progress_bar) { - console.log('[Lightning Import] Missing data or progress bar, skipping update'); - return; - } + if (!data || !frm.progress_bar) return; - // Update progress bar - console.log('[Lightning Import] Updating progress bar to:', data.progress + '%'); frm.progress_bar.find('.progress-bar') .css('width', `${data.progress}%`) .attr('aria-valuenow', data.progress); - frm.progress_bar.find('.progress-status').html(data.title); + + let titleHtml = data.title; + if (data.multiple_import && data.current_target_doctype) { + titleHtml = `Overall Progress: ${data.progress}%
` + + `Importing Target DocType: ${data.current_target_doctype} (${data.current_target_index}/${data.total_targets})
` + + `Processed: ${data.target_successful_records} Succeeded, ${data.target_failed_records} Failed (Total: ${data.target_total_records})`; + } + frm.progress_bar.find('.progress-status').html(titleHtml); - // Update status in form without marking as dirty if (data.status) { - console.log('[Lightning Import] Updating status to:', data.status); frm.doc.status = data.status; frm.refresh_field('status'); - // pill is updating only after update of workflow_state and header frm.refresh_field('workflow_state'); frm.refresh_header(); } - // Update other fields if available if (data.successful_records !== undefined) { - console.log('[Lightning Import] Updating successful_records to:', data.successful_records); frm.doc.successful_records = data.successful_records; frm.refresh_field('successful_records'); } if (data.failed_records !== undefined) { - console.log('[Lightning Import] Updating failed_records to:', data.failed_records); frm.doc.failed_records = data.failed_records; frm.refresh_field('failed_records'); } if (data.import_time) { - console.log('[Lightning Import] Updating import_time to:', data.import_time); frm.doc.import_time = data.import_time; frm.refresh_field('import_time'); } if (data.total_records !== undefined) { - console.log('[Lightning Import] Updating total_records to:', data.total_records); frm.doc.total_records = data.total_records; frm.refresh_field('total_records'); } - // Handle completion + // Refresh child table rows state in real-time + if (data.multiple_import && frm.fields_dict['multi_import_targets']) { + frm.reload_doc(); + } + if (data.status === 'Completed' || data.status === 'Failed' || data.status === 'Partial Success') { - console.log('[Lightning Import] Import completed with status:', data.status); let message = ''; if (data.status === 'Completed') { message = __(`Successfully imported ${data.successful_records} records`); - if (data.time_taken) { - message += __(`, time taken: ${data.time_taken}`); - } + if (data.time_taken) message += __(`, time taken: ${data.time_taken}`); } else if (data.status === 'Partial Success') { message = __(`Import partially completed. ${data.successful_records} records imported, ${data.failed_records} failed`); - if (data.time_taken) { - message += __(`, time taken: ${data.time_taken}`); - } + if (data.time_taken) message += __(`, time taken: ${data.time_taken}`); } else { message = __('Import failed'); - if (data.error) { - message += `: ${data.error}`; - } + if (data.error) message += `: ${data.error}`; } - console.log('[Lightning Import] Showing completion alert:', message); frappe.show_alert({ message: message, indicator: data.status === 'Completed' ? 'green' : (data.status === 'Partial Success' ? 'orange' : 'red'), timeout: 10 }); - // Remove progress bar if (frm.progress_bar) { - console.log('[Lightning Import] Removing progress bar after completion'); frm.progress_bar.remove(); frm.progress_bar = null; } - // Update buttons based on status - if (data.status === 'Failed' || data.status === 'Partial Success') { - // Check if the Export Error Rows button already exists - const hasExportButton = frm.page.custom_buttons && - frm.page.custom_buttons.some(btn => btn.label === __('Export Error Rows')); - - if (!hasExportButton) { - frm.add_custom_button(__('Export Error Rows'), () => { - export_error_rows(frm); - }); - } - } - - // Refresh primary action button - if (data.status === 'Draft') { - frm.page.set_primary_action(__('Start Import'), () => { - start_import(frm); - }); - } else { - frm.page.clear_primary_action(); - } - - // Force a full form refresh after completion setTimeout(() => { frm.reload_doc(); }, 1000); } } +// Start Single Import function start_import(frm) { - // Helper function to make the final call to start the import const call_start_import_py = (mapping_json = null) => { if (!frm.progress_bar) { frm.progress_bar = $(` @@ -345,7 +423,6 @@ function start_import(frm) { }); }; - // Show duplicate warning dialog then proceed or cancel const check_and_proceed = (mapping_json = null) => { frappe.call({ method: 'lightning_import.lightning_import.doctype.lightning_upload.lightning_upload.check_file_duplicates', @@ -357,20 +434,16 @@ function start_import(frm) { freeze_message: __('Checking for duplicates in file...'), callback: function (r) { if (!r.message || r.message.status === 'error') { - // If duplicate check itself fails, still allow continuing - console.warn('[Lightning Import] Duplicate check failed:', r.message && r.message.message); call_start_import_py(mapping_json); return; } const result = r.message; if (!result.has_duplicates) { - // No duplicates — proceed directly call_start_import_py(mapping_json); return; } - // Build an HTML summary table for duplicates let html = `
@@ -398,7 +471,6 @@ function start_import(frm) { `; col.duplicate_values.forEach(entry => { - // Show at most 20 row numbers to keep the dialog compact const rowDisplay = entry.rows.length > 20 ? entry.rows.slice(0, 20).join(', ') + `... (+${entry.rows.length - 20} more)` : entry.rows.join(', '); @@ -431,18 +503,14 @@ function start_import(frm) { } }); d.show(); - // Make the dialog wider for readability d.$wrapper.find('.modal-dialog').css('max-width', '750px'); } }); }; - // Main logic starts here if (frm.doc.field_mapping) { - // If a mapping is already saved, run duplicate check then proceed. check_and_proceed(); } else { - // If no mapping exists, perform auto-mapping and validation first. frappe.call({ method: 'lightning_import.lightning_import.doctype.lightning_upload.lightning_upload.auto_map_and_validate', args: { docname: frm.doc.name }, @@ -459,15 +527,12 @@ function start_import(frm) { }; if (data.unmapped_required.length > 0) { - // If required fields are missing, ask the user what to do first. frappe.confirm( __('The following required fields could not be auto-mapped:
{0}.

Rows without these fields will fail to import. Do you want to continue anyway?', [data.unmapped_required.join(', ')]), () => { - // User chose to "Continue Anyway" — still run dup check proceed_with_dup_check(); }, () => { - // User chose to "Cancel and Map Fields" open_field_mapping_dialog(frm); }, __('Missing Required Fields'), @@ -475,7 +540,6 @@ function start_import(frm) { __('Cancel and Map Fields') ); } else { - // All required fields mapped — run dup check before import frappe.show_alert({ message: __('All required fields were auto-mapped. Checking for duplicates...'), indicator: 'green' @@ -487,6 +551,119 @@ function start_import(frm) { } } +// Start Multiple Import +function start_multi_import(frm) { + const call_start_multi_import_py = () => { + setup_progress_tracking(frm); + + frappe.call({ + method: 'lightning_import.lightning_import.doctype.lightning_upload.lightning_upload.start_multi_import', + args: { + docname: frm.doc.name + }, + callback: function (r) { + if (r.message && r.message.status === 'success') { + frappe.show_alert({ + message: r.message.message, + indicator: 'green' + }); + } else { + if (frm.progress_bar) { + frm.progress_bar.remove(); + frm.progress_bar = null; + } + frappe.show_alert({ + message: r.message.message || __('Failed to start Multiple Import'), + indicator: 'red' + }); + } + } + }); + }; + + // Check duplicates across all targets + frappe.call({ + method: 'lightning_import.lightning_import.doctype.lightning_upload.lightning_upload.check_multi_file_duplicates', + args: { + docname: frm.doc.name + }, + freeze: true, + freeze_message: __('Checking for duplicates across target DocTypes...'), + callback: function (r) { + if (!r.message || r.message.status === 'error') { + call_start_multi_import_py(); + return; + } + + const result = r.message; + if (!result.has_duplicates) { + call_start_multi_import_py(); + return; + } + + // Build duplicate warning layout + let html = ` +
+ + ⚠ Duplicate values detected in the CSV file for one or more targets. + +
+ `; + + result.targets.forEach(target => { + html += ` +
+
+ DocType: ${frappe.utils.escape_html(target.target_doctype)} | + Column: ${frappe.utils.escape_html(target.csv_column)} + (maps to: ${frappe.utils.escape_html(target.field)}) +
+ + + + + + + + `; + target.duplicate_values.forEach(entry => { + const rowDisplay = entry.rows.length > 20 + ? entry.rows.slice(0, 20).join(', ') + `... (+${entry.rows.length - 20} more)` + : entry.rows.join(', '); + html += ` + + + + + + `; + }); + html += `
Duplicate ValueOccurrencesRow Numbers
${frappe.utils.escape_html(String(entry.value))}${entry.count}${rowDisplay}
`; + }); + + html += `
+ You can still continue with the import. Duplicate rows will be processed normally — no rows are skipped automatically. +
`; + + const d = new frappe.ui.Dialog({ + title: __('Duplicate Values Detected'), + fields: [{ fieldtype: 'HTML', fieldname: 'dup_summary', options: html }], + primary_action_label: __('Continue Import'), + primary_action() { + d.hide(); + call_start_multi_import_py(); + }, + secondary_action_label: __('Cancel'), + secondary_action() { + d.hide(); + } + }); + d.show(); + d.$wrapper.find('.modal-dialog').css('max-width', '750px'); + } + }); +} + function export_error_rows(frm) { frappe.call({ method: 'lightning_import.lightning_import.doctype.lightning_upload.lightning_upload.export_error_rows', @@ -506,7 +683,7 @@ function export_error_rows(frm) { }); } -// --- Field Mapping Dialog --- +// Single Field Mapping Dialog function open_field_mapping_dialog(frm) { frappe.call({ method: 'lightning_import.lightning_import.doctype.lightning_upload.lightning_upload.get_csv_headers_for_upload', @@ -518,114 +695,368 @@ function open_field_mapping_dialog(frm) { } const csvHeaders = csvRes.message.headers; - frappe.call({ - method: 'lightning_import.lightning_import.api.get_fields.get_doctype_fields', - args: { doctype: frm.doc.import_doctype }, - callback: function (dtRes) { - if (!dtRes.message || !dtRes.message.fields) { - frappe.show_alert({ message: __('Failed to fetch DocType fields'), indicator: 'red' }); - return; + + frappe.model.with_doctype(frm.doc.import_doctype, async () => { + const meta = frappe.get_meta(frm.doc.import_doctype); + const requiredFields = meta.fields.filter(f => f.reqd).map(f => f.fieldname); + const fields = meta.fields.filter(f => !['Section Break', 'Column Break', 'Tab Break', 'Fold'].includes(f.fieldtype)).map(f => ({ + fieldname: f.fieldname, + label: f.label || f.fieldname, + reqd: f.reqd + })); + const system_fields = [ + { fieldname: 'name', label: 'ID' }, + { fieldname: 'owner', label: 'Owner' }, + { fieldname: 'creation', label: 'Created On' }, + { fieldname: 'modified', label: 'Last Modified' }, + { fieldname: 'modified_by', label: 'Modified By' } + ]; + const fieldOptions = fields.concat(system_fields); + + let existingMapping = {}; + try { + if (frm.doc.field_mapping) { + existingMapping = JSON.parse(frm.doc.field_mapping); } + } catch (e) { + console.log('Error parsing existing mapping:', e); + } - const fieldOptions = dtRes.message.fields || []; + const auto_mapping_res = await frappe.xcall('lightning_import.lightning_import.doctype.lightning_upload.lightning_upload.auto_map_and_validate', { docname: frm.doc.name }); + const backend_mapping = auto_mapping_res ? auto_mapping_res.mapping : {}; + + const mapping = {}; + csvHeaders.forEach(header => { + if (existingMapping[header]) { + mapping[header] = existingMapping[header]; + } else { + mapping[header] = backend_mapping[header] || ''; + } + }); - frappe.model.with_doctype(frm.doc.import_doctype, async () => { - const meta = frappe.get_meta(frm.doc.import_doctype); - const requiredFields = meta.fields.filter(f => f.reqd).map(f => f.fieldname); + let tableHtml = `
Map columns from ${frappe.utils.escape_html(frm.doc.csv_file.split('/').pop())} to fields in ${frappe.utils.escape_html(frm.doc.import_doctype)}
`; + tableHtml += ``; - let existingMapping = {}; - try { - if (frm.doc.field_mapping) { - existingMapping = JSON.parse(frm.doc.field_mapping); - } - } catch (e) { - console.log('Error parsing existing mapping:', e); - } + csvHeaders.forEach(header => { + tableHtml += ``; + tableHtml += ``; + }); - // Fetch auto-mapping from backend (including aliases) - const auto_mapping_res = await frappe.xcall('lightning_import.lightning_import.doctype.lightning_upload.lightning_upload.auto_map_and_validate', { docname: frm.doc.name }); - const backend_mapping = auto_mapping_res ? auto_mapping_res.mapping : {}; + tableHtml += `
CSV ColumnDocType Field
`; - const mapping = {}; - csvHeaders.forEach(header => { - if (existingMapping[header]) { - mapping[header] = existingMapping[header]; - } else { - mapping[header] = backend_mapping[header] || ''; - } + const d = new frappe.ui.Dialog({ + title: __('Map Columns'), + fields: [ + { fieldtype: 'HTML', fieldname: 'mapping_table', options: tableHtml } + ], + primary_action_label: __('Save Mapping'), + primary_action() { + const values = {}; + d.$wrapper.find('.field-mapping-select').each(function () { + const header = $(this).data('header'); + const value = $(this).val(); + values[header] = value; }); - let tableHtml = `
Map columns from ${frappe.utils.escape_html(frm.doc.csv_file.split('/').pop())} to fields in ${frappe.utils.escape_html(frm.doc.import_doctype)}
`; - tableHtml += ``; - - csvHeaders.forEach(header => { - tableHtml += ``; - tableHtml += ``; - }); + } - tableHtml += `
CSV ColumnDocType Field
`; - - const d = new frappe.ui.Dialog({ - title: __('Map Columns'), - fields: [ - { fieldtype: 'HTML', fieldname: 'mapping_table', options: tableHtml } - ], - primary_action_label: __('Save Mapping'), - primary_action() { - const values = {}; - d.$wrapper.find('.field-mapping-select').each(function () { - const header = $(this).data('header'); - const value = $(this).val(); - - values[header] = value; - }); - - const mappedFields = Object.values(values).filter(Boolean); - const unmappedRequired = requiredFields.filter(f => !mappedFields.includes(f)); - if (unmappedRequired.length) { - frappe.msgprint(__('Please map all required fields: {0}', [unmappedRequired.join(', ')])); - return; - } + const duplicates = mappedFields.filter((item, idx) => mappedFields.indexOf(item) !== idx); + if (duplicates.length) { + frappe.msgprint(__('Duplicate mapping for: {0}', [duplicates.join(', ')])); + return; + } - const duplicates = mappedFields.filter((item, idx) => mappedFields.indexOf(item) !== idx); - if (duplicates.length) { - frappe.msgprint(__('Duplicate mapping for: {0}', [duplicates.join(', ')])); - return; + frappe.call({ + method: 'lightning_import.lightning_import.doctype.lightning_upload.lightning_upload.save_field_mapping', + args: { + docname: frm.doc.name, + mapping: JSON.stringify(values) + }, + callback: function (res) { + if (res.message && res.message.status === 'success') { + d.hide(); + frm.reload_doc(); + frappe.show_alert({ message: __('Field mapping saved.'), indicator: 'green' }); + } else { + frappe.show_alert({ message: res.message?.message || __('Failed to save mapping'), indicator: 'red' }); } - - frappe.call({ - method: 'lightning_import.lightning_import.doctype.lightning_upload.lightning_upload.save_field_mapping', - args: { - docname: frm.doc.name, - mapping: JSON.stringify(values) - }, - callback: function (res) { - if (res.message && res.message.status === 'success') { - d.hide(); - frm.reload_doc(); - frappe.show_alert({ message: __('Field mapping saved.'), indicator: 'green' }); - } else { - frappe.show_alert({ message: res.message?.message || __('Failed to save mapping'), indicator: 'red' }); - } - } - }); } }); + } + }); + + d.show(); + d.$wrapper.find('.modal-dialog').css('max-width', '750px'); + }); + } + }); +} - d.show(); +// Combined Multi-Field Mapping Dialog +async function open_combined_multi_mapping_dialog(frm) { + const enabled_targets = frm.doc.multi_import_targets.filter(t => t.enabled && t.target_doctype); + if (!enabled_targets.length) { + frappe.msgprint(__('Please add, enable, and select at least one Target DocType first.')); + return; + } + if (!frm.doc.csv_file) { + frappe.msgprint(__('Please attach a CSV file first.')); + return; + } + + let csvRes; + try { + // 1. Fetch CSV headers safely + csvRes = await frappe.xcall('lightning_import.lightning_import.doctype.lightning_upload.lightning_upload.get_csv_headers_for_upload', { + file_url: frm.doc.csv_file + }); + } catch (err) { + console.error('Error fetching CSV headers:', err); + frappe.msgprint(__('Could not retrieve CSV headers. Please ensure the attached file is valid and the document is saved.')); + return; + } + + if (!csvRes || csvRes.status !== 'success') { + frappe.show_alert({ message: csvRes ? csvRes.message : __('Failed to fetch CSV headers'), indicator: 'red' }); + return; + } + + const csvHeaders = csvRes.headers; + + // 2. Fetch metadata for all target DocTypes and auto-mappings safely + const doctypes_metadata = {}; + for (const target of enabled_targets) { + try { + // Load metadata + await new Promise((resolve, reject) => { + frappe.model.with_doctype(target.target_doctype, resolve, reject); + }); + + const meta = frappe.get_meta(target.target_doctype); + if (!meta) { + throw new Error(`Meta not found for ${target.target_doctype}`); + } + + const requiredFields = meta.fields.filter(f => f.reqd).map(f => f.fieldname); + const fields = meta.fields.filter(f => !['Section Break', 'Column Break', 'Tab Break', 'Fold'].includes(f.fieldtype)).map(f => ({ + fieldname: f.fieldname, + label: f.label || f.fieldname, + reqd: f.reqd + })); + const system_fields = [ + { fieldname: 'name', label: 'ID' }, + { fieldname: 'owner', label: 'Owner' }, + { fieldname: 'creation', label: 'Created On' }, + { fieldname: 'modified', label: 'Last Modified' }, + { fieldname: 'modified_by', label: 'Modified By' } + ]; + const fieldOptions = fields.concat(system_fields); + + // Fetch backend auto-mapping safely + let auto_mapping_res; + try { + auto_mapping_res = await frappe.xcall('lightning_import.lightning_import.doctype.lightning_upload.lightning_upload.get_auto_mapping_for_doctype', { + docname: frm.doc.name, + doctype: target.target_doctype + }); + } catch (err) { + console.error(`Error getting auto mapping for ${target.target_doctype}:`, err); + auto_mapping_res = { mapping: {} }; + } + + const backend_mapping = auto_mapping_res ? auto_mapping_res.mapping : {}; + + let existingMapping = {}; + try { + if (target.field_mapping) { + existingMapping = JSON.parse(target.field_mapping); + } + } catch (e) { + console.log('Error parsing existing mapping:', e); + } + + doctypes_metadata[target.target_doctype] = { + fields: fieldOptions, + required: requiredFields, + backend_mapping: backend_mapping, + existingMapping: existingMapping, + targetName: target.name + }; + } catch (targetErr) { + console.error(`Error preparing target DocType ${target.target_doctype}:`, targetErr); + frappe.msgprint(__('Failed to prepare metadata for Target DocType "{0}". Please check its configuration.', [target.target_doctype])); + return; + } + } + + // 3. Build combined HTML table with C * T rows to allow multi-doctype mapping + let tableHtml = `
Combined Map Columns from ${frappe.utils.escape_html(frm.doc.csv_file.split('/').pop())} to targets
`; + tableHtml += `
`; + tableHtml += ``; + + csvHeaders.forEach(header => { + Object.keys(doctypes_metadata).forEach(doctype => { + const meta = doctypes_metadata[doctype]; + + // Determine initial Field for this specific target + let initialField = ''; + if (meta.existingMapping[header]) { + initialField = meta.existingMapping[header]; + } else if (meta.backend_mapping[header]) { + initialField = meta.backend_mapping[header]; + } + + + + tableHtml += ``; + tableHtml += ``; + + // DocType Dropdown Selector + tableHtml += ``; + + // Target Field Dropdown Selector + tableHtml += ``; + }); + }); + + tableHtml += `
CSV ColumnDocTypeDocType Field
`; + + const d = new frappe.ui.Dialog({ + title: __('Combined Field Mapping'), + fields: [ + { fieldtype: 'HTML', fieldname: 'mapping_table', options: tableHtml } + ], + primary_action_label: __('Save Mapping'), + primary_action() { + // Collect mapped values for each target row + const targetMappings = {}; + enabled_targets.forEach(t => { + targetMappings[t.name] = {}; + }); + + // Gather inputs from table + let hasHeaderDoctypeDuplicate = false; + d.$wrapper.find('.mapping-dialog-row').each(function () { + const header = $(this).data('header'); + const docType = $(this).find('.combined-doctype-select').val(); + const field = $(this).find('.combined-field-select').val(); + + if (docType && field) { + const meta = doctypes_metadata[docType]; + if (meta && targetMappings[meta.targetName]) { + if (targetMappings[meta.targetName][header]) { + frappe.msgprint(__('Invalid Mapping: CSV Header "{0}" is mapped to multiple fields in DocType "{1}".', [header, docType])); + hasHeaderDoctypeDuplicate = true; + return false; // break jquery each loop + } + targetMappings[meta.targetName][header] = field; + } + } + }); + + if (hasHeaderDoctypeDuplicate) return; + + // Save values back to the child rows and check duplicates inside each target independently + let hasDuplicates = false; + for (const doctype of Object.keys(doctypes_metadata)) { + const meta = doctypes_metadata[doctype]; + const targetName = meta.targetName; + const mapping = targetMappings[targetName]; + const mappedFields = Object.values(mapping).filter(Boolean); + + // Duplicate check within a single doctype mappings + const duplicates = mappedFields.filter((item, idx) => mappedFields.indexOf(item) !== idx); + if (duplicates.length) { + frappe.msgprint(__('Duplicate mapping for {0} fields: {1}', [doctype, duplicates.join(', ')])); + hasDuplicates = true; + break; + } + + // Warn for missing required fields + const unmappedRequired = meta.required.filter(f => !mappedFields.includes(f)); + if (unmappedRequired.length) { + frappe.show_alert({ + message: __('Note: {0} is missing required fields: {1}', [doctype, unmappedRequired.join(', ')]), + indicator: 'orange' }); } + } + + if (hasDuplicates) return; + + // Commit mappings to the child rows in the form doc + enabled_targets.forEach(t => { + const mappingString = JSON.stringify(targetMappings[t.name]); + frappe.model.set_value(t.doctype, t.name, 'field_mapping', mappingString); + }); + + // Save document to database + frm.save().then(() => { + d.hide(); + frm.reload_doc(); + frappe.show_alert({ message: __('All field mappings successfully saved in database.'), indicator: 'green' }); + }); + } + }); + + // Dynamic field list switching on DocType change + d.$wrapper.on('change', '.combined-doctype-select', function() { + const header = $(this).data('header'); + const selectedDocType = $(this).val(); + const row = $(this).closest('tr'); + const fieldSelect = row.find('.combined-field-select'); + + // Clear existing options + fieldSelect.empty(); + fieldSelect.append($('