From 6282fb2b27205a0933ed8dcceb43571dbd971e14 Mon Sep 17 00:00:00 2001 From: nk7up Date: Tue, 5 Aug 2025 13:05:54 -0400 Subject: [PATCH 1/6] verifying stream records --- tests/test_mongodb_incremental.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/test_mongodb_incremental.py b/tests/test_mongodb_incremental.py index 2a6f2ea..f048d8e 100644 --- a/tests/test_mongodb_incremental.py +++ b/tests/test_mongodb_incremental.py @@ -476,4 +476,6 @@ def test_run(self): self.expected_sync_streams(), self.expected_pks()) + print("Record count by stream: ", record_count_by_stream) + print("Expected last sync row counts: ", self.expected_last_sync_row_counts()) self.assertDictEqual(record_count_by_stream, self.expected_last_sync_row_counts()) From 236f085c03786e530b52bba4c6108d5bca9b9f2f Mon Sep 17 00:00:00 2001 From: nk7up Date: Thu, 7 Aug 2025 11:42:14 -0400 Subject: [PATCH 2/6] removing print statements --- tests/test_mongodb_incremental.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/tests/test_mongodb_incremental.py b/tests/test_mongodb_incremental.py index f048d8e..71e9ab1 100644 --- a/tests/test_mongodb_incremental.py +++ b/tests/test_mongodb_incremental.py @@ -475,7 +475,4 @@ def test_run(self): conn_id, self.expected_sync_streams(), self.expected_pks()) - - print("Record count by stream: ", record_count_by_stream) - print("Expected last sync row counts: ", self.expected_last_sync_row_counts()) self.assertDictEqual(record_count_by_stream, self.expected_last_sync_row_counts()) From 6e73d8501199cd38c3cb836b04b9a96e1b95e2eb Mon Sep 17 00:00:00 2001 From: nk7up Date: Mon, 11 Aug 2025 10:51:29 -0400 Subject: [PATCH 3/6] adding markers and sync job to fix the flaky sync jobs for assertions --- tests/test_mongodb_oplog_bookmarks.py | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/tests/test_mongodb_oplog_bookmarks.py b/tests/test_mongodb_oplog_bookmarks.py index 12a3eed..4ddc173 100644 --- a/tests/test_mongodb_oplog_bookmarks.py +++ b/tests/test_mongodb_oplog_bookmarks.py @@ -193,13 +193,22 @@ def test_run(self): client["simple_db"]["simple_coll_2"].insert_one({"int_field": 101, "string_field": random_string_generator()}) changed_ids.add(client['simple_db']['simple_coll_2'].find({'int_field': 101})[0]['_id']) + #Running a Sync before the marker 102 + sync_job_name = runner.run_sync_mode(self, conn_id) + exit_status = menagerie.get_exit_status(conn_id, sync_job_name) + menagerie.verify_sync_exit_status(self, exit_status, sync_job_name) # ----------------------------------- # ----------- Subsequent Oplog Sync --------- # ----------------------------------- + # Inserting Marker 102 + with get_test_connection() as client: + marker_point = {"int_field": 102, "string_field": "marker_" + random_string_generator()} + result = client["simple_db"]["simple_coll_1"].insert_one(marker_point) + oplog_entry_point = client.local.oplog.rs.find_one({"o._id": result.inserted_id}) + marker_ts = oplog_entry_point["ts"] # Run sync sync_job_name = runner.run_sync_mode(self, conn_id) - exit_status = menagerie.get_exit_status(conn_id, sync_job_name) menagerie.verify_sync_exit_status(self, exit_status, sync_job_name) @@ -222,12 +231,12 @@ def test_run(self): final_state = menagerie.get_state(conn_id) - with get_test_connection() as client: - row = client.local.oplog.rs.find_one(sort=[('$natural', pymongo.DESCENDING)]) - latest_oplog_ts = row.get('ts') + # with get_test_connection() as client: + # row = client.local.oplog.rs.find_one(sort=[('$natural', pymongo.DESCENDING)]) + # latest_oplog_ts = row.get('ts') self.assertEqual( - (latest_oplog_ts.time, latest_oplog_ts.inc), + (marker_ts.time, marker_ts.inc), (final_state['bookmarks']['simple_db-simple_coll_1']['oplog_ts_time'], final_state['bookmarks']['simple_db-simple_coll_1']['oplog_ts_inc']) ) From 73aeba849cc429a251b4a54e3390a7bd1b4eed49 Mon Sep 17 00:00:00 2001 From: nk7up Date: Mon, 11 Aug 2025 11:52:03 -0400 Subject: [PATCH 4/6] setting the coll_1 value to 52 records for expected value --- tests/test_mongodb_incremental.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_mongodb_incremental.py b/tests/test_mongodb_incremental.py index 71e9ab1..adfb1f6 100644 --- a/tests/test_mongodb_incremental.py +++ b/tests/test_mongodb_incremental.py @@ -97,7 +97,7 @@ def expected_row_counts(self): def expected_last_sync_row_counts(self): return { - 'simple_coll_1': 53, # 50 documents for initial insert, 2 documents inserted at the start of sync2 , 1 document which was updated at the end of sync 2. This is the last update before sync 3 so as part of historical sync this data is captured as well as in the log for log based replication + 'simple_coll_1': 52, # 50 documents for initial insert, 2 documents inserted at the start of sync2 , 1 document which was updated at the end of sync 2. This is the last update before sync 3 so as part of historical sync this data is captured as well as in the log for log based replication 'simple_coll_2': 102, **{"simple_coll_{}".format(k): 1 for k in self.key_names()} } From 11735e36dd4f32496fd004a1227dace81dceefd6 Mon Sep 17 00:00:00 2001 From: nk7up Date: Mon, 11 Aug 2025 12:15:30 -0400 Subject: [PATCH 5/6] reverting the value change --- tests/test_mongodb_incremental.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_mongodb_incremental.py b/tests/test_mongodb_incremental.py index adfb1f6..71e9ab1 100644 --- a/tests/test_mongodb_incremental.py +++ b/tests/test_mongodb_incremental.py @@ -97,7 +97,7 @@ def expected_row_counts(self): def expected_last_sync_row_counts(self): return { - 'simple_coll_1': 52, # 50 documents for initial insert, 2 documents inserted at the start of sync2 , 1 document which was updated at the end of sync 2. This is the last update before sync 3 so as part of historical sync this data is captured as well as in the log for log based replication + 'simple_coll_1': 53, # 50 documents for initial insert, 2 documents inserted at the start of sync2 , 1 document which was updated at the end of sync 2. This is the last update before sync 3 so as part of historical sync this data is captured as well as in the log for log based replication 'simple_coll_2': 102, **{"simple_coll_{}".format(k): 1 for k in self.key_names()} } From a5792021c7babf2ddd0011f21d800325f715da3a Mon Sep 17 00:00:00 2001 From: nk7up Date: Mon, 11 Aug 2025 13:10:09 -0400 Subject: [PATCH 6/6] handling assertions to accept record writes after marker point --- tests/test_mongodb_oplog_bookmarks.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/tests/test_mongodb_oplog_bookmarks.py b/tests/test_mongodb_oplog_bookmarks.py index 4ddc173..b3d4bc2 100644 --- a/tests/test_mongodb_oplog_bookmarks.py +++ b/tests/test_mongodb_oplog_bookmarks.py @@ -235,8 +235,9 @@ def test_run(self): # row = client.local.oplog.rs.find_one(sort=[('$natural', pymongo.DESCENDING)]) # latest_oplog_ts = row.get('ts') - self.assertEqual( - (marker_ts.time, marker_ts.inc), - (final_state['bookmarks']['simple_db-simple_coll_1']['oplog_ts_time'], - final_state['bookmarks']['simple_db-simple_coll_1']['oplog_ts_inc']) + tap_oplog_bookmark = ( + final_state['bookmarks']['simple_db-simple_coll_1']['oplog_ts_time'], + final_state['bookmarks']['simple_db-simple_coll_1']['oplog_ts_inc'] ) + marker_tuples = (marker_ts.time, marker_ts.inc) + self.assertGreaterEqual(tap_oplog_bookmark, marker_tuples)