From 002c437cc4b1711fd2ae862827e5652677b93574 Mon Sep 17 00:00:00 2001 From: WeihuaSun Date: Tue, 8 Oct 2024 20:21:34 +0800 Subject: [PATCH 01/12] verify cycle for weak isolation --- src/dbtest/src/mda_detect.py | 143 +++++++++++++++++++++++++++++------ 1 file changed, 121 insertions(+), 22 deletions(-) diff --git a/src/dbtest/src/mda_detect.py b/src/dbtest/src/mda_detect.py index 982361ea..27528d8f 100644 --- a/src/dbtest/src/mda_detect.py +++ b/src/dbtest/src/mda_detect.py @@ -10,10 +10,11 @@ # */ -import Queue +import queue import os import time +MAX_TS = 99999999999999999999 class Edge: def __init__(self, type, out): @@ -32,7 +33,9 @@ def __init__(self, op_type, txn_num, op_time, value): class Txn: def __init__(self): self.begin_ts = -1 - self.end_ts = 99999999999999999999 + self.end_ts = MAX_TS + self.snapshot_ts = MAX_TS + """ Find the total variable number. @@ -115,23 +118,37 @@ def set_finish_time(op_time, data_op_list, query, txn, version_list): data_value += tmp tmp, tmp1 = "", "" data_value = int(data_value) - for t in txn: + for txn_num,t in enumerate(txn): if t.begin_ts == op_time: t.begin_ts = data_value if t.end_ts == op_time: t.end_ts = data_value + # update 'visible_ts' for versions installed by the committed transaction + if isolation_level != "ru": + for i, data_versions in enumerate(version_list): + for j, version in enumerate(data_versions): + if version[1] == txn_num: + version_list[i][j][2] = data_value for i, list1 in enumerate(data_op_list): for op in list1: if op.op_time == op_time: op.op_time = data_value - if op.op_type == "W": - version_list[i].append(op.value) + if isolation_level == "ru": + visible_ts = op.op_time + else: + visible_ts = MAX_TS + #update 'snapshot_ts' + if op.op_type == "R" or op.op_type == "P": + if snapshot and isolation_level in {"ser","rr"}: + txn[op.txn_num].snapshot_ts = min(op.op_time,txn[op.txn_num].snapshot_ts) + elif op.op_type == "W": + version_list[i].append([op.value,op.txn_num,visible_ts]) op.value = len(version_list[i]) - 1 elif op.op_type == "D": - version_list[i].append(-1) + version_list[i].append([-1,op.txn_num,visible_ts]) op.value = len(version_list[i]) - 1 elif op.op_type == "I": - version_list[i].append(op.value) + version_list[i].append([op.value,op.txn_num,visible_ts]) op.value = len(version_list[i]) - 1 @@ -257,7 +274,7 @@ def insert_edge(data1, data2, indegree, edge, txn): def init_record(query, version_list): key = find_data(query, "(") value = find_data(query, ",") - version_list[key].append(value) + version_list[key].append((value,-1,0)) """ @@ -285,14 +302,25 @@ def readVersion_record(query, op_time, data_op_list, version_list): for op in list1: if op.op_time == op_time: value = op.value - if len(version_list[value]) == 0: + snapshot_ts = txn[op.txn_num].snapshot_ts + versions = version_list[value] + if len(versions) == 0: op.value = -1 - else: - if -1 not in version_list[value]: - error_message = "Value exists, but did not successully read" - return error_message - pos = version_list[value].index(-1) - op.value = pos + else: + deleted = False + for i, version in enumerate(versions): + version_val = version[0] + install_txn = version[1] + visible_ts = version[2] + if (visible_ts < snapshot_ts and visible_ts < MAX_TS) or install_txn == op.txn_num : + if version_val== -1 : + deleted = True + op.value = i + else: + error_message = "Value exists, but did not successully read" + return error_message + if not deleted: + op.value = -1 else: for s in data: key = find_data(s, "(") @@ -301,14 +329,27 @@ def readVersion_record(query, op_time, data_op_list, version_list): for op in list1: if key == i and op.op_time == op_time: value1 = op.value - if len(version_list[value1]) == 0: + versions = version_list[value1] + snapshot_ts = txn[op.txn_num].snapshot_ts + if len(versions) == 0: op.value = -1 else: - if version_list[value1].count(value) == 0: + find = False + for i, version in enumerate(reversed(versions)): + version_val = version[0] + install_txn = version[1] + visible_ts = version[2] + if (visible_ts < snapshot_ts and visible_ts < MAX_TS) or install_txn == op.txn_num: + if version_val== value: + find = True + op.value = i + break + else: + error_message = "Read version that is not the latest version" + return error_message + if not find: error_message = "Read version that does not exist" - return error_message - pos = version_list[value1].index(value) - op.value = pos + return error_message return error_message # for i, list1 in enumerate(data_op_list): @@ -563,7 +604,7 @@ def remove_unfinished_operation(data_op_list): """ # toposort to determine whether there is a cycle def check_cycle(edge, indegree, total): - q = Queue.Queue() + q = queue.Queue() for i, degree in enumerate(indegree): if degree == 0: q.put(i) ans = [] @@ -612,10 +653,15 @@ def dfs(result_folder, ts_now, now, type): else: path.append(v.out) edge_type.append(v.type) + accept = verify_cycle(edge_type,isolation_level,snapshot) with open(result_folder + "/check_result" + ts_now + ".txt", "a+") as f: for i in range(0, len(path)): f.write(str(path[i])) if i != len(path) - 1: f.write("->" + edge_type[i+1] + "->") + if accept: + f.write(" : Accept") + else: + f.write(" : Reject") f.write("\n\n") path.pop() edge_type.pop() @@ -693,9 +739,62 @@ def print_error(result_folder, ts_now, error_message): f.write("\n\n") -run_result_folder = "pg/serializable" +""" +Determines the validity of a cycle based on the edge types and the isolation level. + +Args: +edge_type (list): List of edge types representing dependencies between operations. +isolation_level (str): The isolation level of the database ('ru', 'rc', 'rr', 'ser'). +snapshot (bool): Optional parameter to indicate if snapshot isolation is used. Default is False. + +Returns: +bool: True if the cycle is valid, False otherwise. +""" +def verify_cycle(edge_type, isolation_level, snapshot=False): + write_set = {"I", "D", "W"} # Write operations: Insert, Delete, Write + generalized_edge_type = [] # To store generalized edge types (RW, WR, WW, PW, WP) + + # Generalize edge types based on whether they are reads (R) or writes (W) + for t in edge_type: + if t != 'null': + g_type = "" + g_type += "W" if t[0] in write_set else t[0] + g_type += "W" if t[-1] in write_set else t[-1] + generalized_edge_type.append(g_type) + + # Check validity based on isolation level + if isolation_level == "ru": + # Under 'ru' (read uncommitted), cycles consisting only of write dependencies (G0) are not allowed + return generalized_edge_type.count("WW") != len(generalized_edge_type) + + elif isolation_level == "rc": + # Under 'rc' (read committed), cycles consisting only of write and read dependencies (G1c) are not allowed + return generalized_edge_type.count("WW") + generalized_edge_type.count("WR") + generalized_edge_type.count("WP") != len(generalized_edge_type) + + elif isolation_level == "rr": + if snapshot: + # Under 'rr' with snapshot, consecutive "RW" or "PW" edges indicate a cycle + anti_set = {"RW", "PW"} + for i in range(len(generalized_edge_type) - 1): + if generalized_edge_type[i] in anti_set and generalized_edge_type[i + 1] in anti_set: + return True # Cycle is valid + return False # No valid cycle found + else: + # Without snapshot, cycles consisting only of write, read and item-anti dependencies (G1c+G2-item) are not allowed + return generalized_edge_type.count("WW") + generalized_edge_type.count("WR") + generalized_edge_type.count("WP") + generalized_edge_type.count("RW") != len(generalized_edge_type) + + elif isolation_level == "ser": + # Under 'ser' (serializable), no cycles are allowed + return False + + return True + + +run_result_folder = "pg/read-committed" #read-committed/serializable/repeatable-read result_folder = "check_result/" + run_result_folder do_test_list = "do_test_list.txt" +isolation_level = "rc" #[ru,rc,rr,ser] +snapshot = True #ts_now = "_2param_3txn_insert" ts_now = time.strftime("%Y%m%d_%H%M%S", time.localtime()) if not os.path.exists(result_folder): From 0f119611dfff0514708c3995c17aae3354b7b123 Mon Sep 17 00:00:00 2001 From: WeihuaSun Date: Tue, 8 Oct 2024 20:23:44 +0800 Subject: [PATCH 02/12] verify cycle for weak isolation --- src/dbtest/src/mda_detect.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/dbtest/src/mda_detect.py b/src/dbtest/src/mda_detect.py index 27528d8f..90094446 100644 --- a/src/dbtest/src/mda_detect.py +++ b/src/dbtest/src/mda_detect.py @@ -790,9 +790,10 @@ def verify_cycle(edge_type, isolation_level, snapshot=False): return True -run_result_folder = "pg/read-committed" #read-committed/serializable/repeatable-read +run_result_folder = "pg/serializable" result_folder = "check_result/" + run_result_folder do_test_list = "do_test_list.txt" + isolation_level = "rc" #[ru,rc,rr,ser] snapshot = True #ts_now = "_2param_3txn_insert" From fe7198d4ff1b7b49fc572fb9a476087ce33043fe Mon Sep 17 00:00:00 2001 From: WeihuaSun Date: Wed, 9 Oct 2024 13:23:58 +0800 Subject: [PATCH 03/12] update some comment --- src/dbtest/src/mda_detect.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dbtest/src/mda_detect.py b/src/dbtest/src/mda_detect.py index 90094446..2af12f37 100644 --- a/src/dbtest/src/mda_detect.py +++ b/src/dbtest/src/mda_detect.py @@ -773,7 +773,7 @@ def verify_cycle(edge_type, isolation_level, snapshot=False): elif isolation_level == "rr": if snapshot: - # Under 'rr' with snapshot, consecutive "RW" or "PW" edges indicate a cycle + # Under 'rr' with snapshot, two consecutive anti-dependency edges are allowed. anti_set = {"RW", "PW"} for i in range(len(generalized_edge_type) - 1): if generalized_edge_type[i] in anti_set and generalized_edge_type[i + 1] in anti_set: From 4cea01d44b0245463ed3287618735a378a4af2dc Mon Sep 17 00:00:00 2001 From: WeihuaSun Date: Wed, 9 Oct 2024 21:37:58 +0800 Subject: [PATCH 04/12] fix-version-order --- src/dbtest/src/mda_detect.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/dbtest/src/mda_detect.py b/src/dbtest/src/mda_detect.py index 2af12f37..6732c979 100644 --- a/src/dbtest/src/mda_detect.py +++ b/src/dbtest/src/mda_detect.py @@ -335,6 +335,7 @@ def readVersion_record(query, op_time, data_op_list, version_list): op.value = -1 else: find = False + version_size = len(versions) for i, version in enumerate(reversed(versions)): version_val = version[0] install_txn = version[1] @@ -342,7 +343,7 @@ def readVersion_record(query, op_time, data_op_list, version_list): if (visible_ts < snapshot_ts and visible_ts < MAX_TS) or install_txn == op.txn_num: if version_val== value: find = True - op.value = i + op.value = version_size-i-1 break else: error_message = "Read version that is not the latest version" @@ -790,12 +791,12 @@ def verify_cycle(edge_type, isolation_level, snapshot=False): return True -run_result_folder = "pg/serializable" +run_result_folder = "mariadb/read-committed"#read-committed/repeatable-read result_folder = "check_result/" + run_result_folder do_test_list = "do_test_list.txt" isolation_level = "rc" #[ru,rc,rr,ser] -snapshot = True +snapshot = False #ts_now = "_2param_3txn_insert" ts_now = time.strftime("%Y%m%d_%H%M%S", time.localtime()) if not os.path.exists(result_folder): From e741dbd1e22e51f37a40232ea4c91f2362be97fd Mon Sep 17 00:00:00 2001 From: WeihuaSun Date: Wed, 9 Oct 2024 22:25:02 +0800 Subject: [PATCH 05/12] support for mariadb/mysql/pg --- src/dbtest/src/mda_detect.py | 43 ++++++++++++++++++++++++++---------- 1 file changed, 31 insertions(+), 12 deletions(-) diff --git a/src/dbtest/src/mda_detect.py b/src/dbtest/src/mda_detect.py index 6732c979..1943120d 100644 --- a/src/dbtest/src/mda_detect.py +++ b/src/dbtest/src/mda_detect.py @@ -13,6 +13,7 @@ import queue import os import time +import sys MAX_TS = 99999999999999999999 @@ -139,7 +140,7 @@ def set_finish_time(op_time, data_op_list, query, txn, version_list): visible_ts = MAX_TS #update 'snapshot_ts' if op.op_type == "R" or op.op_type == "P": - if snapshot and isolation_level in {"ser","rr"}: + if snapshot_read: txn[op.txn_num].snapshot_ts = min(op.op_time,txn[op.txn_num].snapshot_ts) elif op.op_type == "W": version_list[i].append([op.value,op.txn_num,visible_ts]) @@ -654,7 +655,7 @@ def dfs(result_folder, ts_now, now, type): else: path.append(v.out) edge_type.append(v.type) - accept = verify_cycle(edge_type,isolation_level,snapshot) + accept = verify_cycle(edge_type,isolation_level) with open(result_folder + "/check_result" + ts_now + ".txt", "a+") as f: for i in range(0, len(path)): f.write(str(path[i])) @@ -746,12 +747,11 @@ def print_error(result_folder, ts_now, error_message): Args: edge_type (list): List of edge types representing dependencies between operations. isolation_level (str): The isolation level of the database ('ru', 'rc', 'rr', 'ser'). -snapshot (bool): Optional parameter to indicate if snapshot isolation is used. Default is False. Returns: bool: True if the cycle is valid, False otherwise. """ -def verify_cycle(edge_type, isolation_level, snapshot=False): +def verify_cycle(edge_type, isolation_level): write_set = {"I", "D", "W"} # Write operations: Insert, Delete, Write generalized_edge_type = [] # To store generalized edge types (RW, WR, WW, PW, WP) @@ -771,15 +771,17 @@ def verify_cycle(edge_type, isolation_level, snapshot=False): elif isolation_level == "rc": # Under 'rc' (read committed), cycles consisting only of write and read dependencies (G1c) are not allowed return generalized_edge_type.count("WW") + generalized_edge_type.count("WR") + generalized_edge_type.count("WP") != len(generalized_edge_type) - + elif isolation_level == "rr": - if snapshot: - # Under 'rr' with snapshot, two consecutive anti-dependency edges are allowed. + if db_type in {"pg"}: # snapshot isolation + # Under snapshot isolation, two consecutive anti-dependency edges are allowed. anti_set = {"RW", "PW"} for i in range(len(generalized_edge_type) - 1): if generalized_edge_type[i] in anti_set and generalized_edge_type[i + 1] in anti_set: return True # Cycle is valid return False # No valid cycle found + elif db_type in {"mariadb","mysql"}: # MySQL's repeatable read is below the PL-2.99 level defined by Adya. Use PL-1 level instead. + return generalized_edge_type.count("WW") + generalized_edge_type.count("WR") + generalized_edge_type.count("WP") != len(generalized_edge_type) else: # Without snapshot, cycles consisting only of write, read and item-anti dependencies (G1c+G2-item) are not allowed return generalized_edge_type.count("WW") + generalized_edge_type.count("WR") + generalized_edge_type.count("WP") + generalized_edge_type.count("RW") != len(generalized_edge_type) @@ -789,14 +791,31 @@ def verify_cycle(edge_type, isolation_level, snapshot=False): return False return True - - -run_result_folder = "mariadb/read-committed"#read-committed/repeatable-read + + +db_type = sys.argv[1] #[mariadb,pg...] +isolation_level = sys.argv[2] #[ru,rc,rr,ser] + +isolations = { + "ru":"read-uncommitted", + "rc":"read-committed", + "rr":"repeatable-read", + "ser":"serializable" +} + +if db_type in {"pg"} and isolation_level in {"rr","ser"}: + snapshot_read = True +elif db_type in {"mysql","mariadb"} and isolation_level in {"rr"}: + snapshot_read = True +else: + snapshot_read = False + +run_result_folder = f"{db_type}/{isolations[isolation_level]}" result_folder = "check_result/" + run_result_folder do_test_list = "do_test_list.txt" -isolation_level = "rc" #[ru,rc,rr,ser] -snapshot = False + + #ts_now = "_2param_3txn_insert" ts_now = time.strftime("%Y%m%d_%H%M%S", time.localtime()) if not os.path.exists(result_folder): From 03e833be40b126a02ca2db06147202b44b32675f Mon Sep 17 00:00:00 2001 From: WeihuaSun Date: Wed, 9 Oct 2024 23:21:46 +0800 Subject: [PATCH 06/12] fix snapshot_ts for pg --- src/dbtest/src/mda_detect.py | 41 ++++++++++++++++++++++++------------ 1 file changed, 27 insertions(+), 14 deletions(-) diff --git a/src/dbtest/src/mda_detect.py b/src/dbtest/src/mda_detect.py index 1943120d..0f01871d 100644 --- a/src/dbtest/src/mda_detect.py +++ b/src/dbtest/src/mda_detect.py @@ -122,6 +122,8 @@ def set_finish_time(op_time, data_op_list, query, txn, version_list): for txn_num,t in enumerate(txn): if t.begin_ts == op_time: t.begin_ts = data_value + if snapshot_read and db_type in {"pg"}: + txn[txn_num].snapshot_ts = data_value if t.end_ts == op_time: t.end_ts = data_value # update 'visible_ts' for versions installed by the committed transaction @@ -140,7 +142,7 @@ def set_finish_time(op_time, data_op_list, query, txn, version_list): visible_ts = MAX_TS #update 'snapshot_ts' if op.op_type == "R" or op.op_type == "P": - if snapshot_read: + if snapshot_read and db_type in {"mysql","mariadb"}: txn[op.txn_num].snapshot_ts = min(op.op_time,txn[op.txn_num].snapshot_ts) elif op.op_type == "W": version_list[i].append([op.value,op.txn_num,visible_ts]) @@ -336,22 +338,24 @@ def readVersion_record(query, op_time, data_op_list, version_list): op.value = -1 else: find = False + latest = 1 version_size = len(versions) for i, version in enumerate(reversed(versions)): version_val = version[0] install_txn = version[1] visible_ts = version[2] if (visible_ts < snapshot_ts and visible_ts < MAX_TS) or install_txn == op.txn_num: + latest -= 1 if version_val== value: find = True op.value = version_size-i-1 break - else: - error_message = "Read version that is not the latest version" - return error_message if not find: error_message = "Read version that does not exist" - return error_message + return error_message + if latest < 0: + error_message = "Read version that is not the latest version" + return error_message return error_message # for i, list1 in enumerate(data_op_list): @@ -793,8 +797,11 @@ def verify_cycle(edge_type, isolation_level): return True -db_type = sys.argv[1] #[mariadb,pg...] -isolation_level = sys.argv[2] #[ru,rc,rr,ser] +# db_type = sys.argv[1] #[mariadb,pg...] +# isolation_level = sys.argv[2] #[ru,rc,rr,ser] + +db_type = "pg" +isolation_level = "rr" isolations = { "ru":"read-uncommitted", @@ -803,17 +810,23 @@ def verify_cycle(edge_type, isolation_level): "ser":"serializable" } -if db_type in {"pg"} and isolation_level in {"rr","ser"}: - snapshot_read = True -elif db_type in {"mysql","mariadb"} and isolation_level in {"rr"}: - snapshot_read = True -else: - snapshot_read = False - run_result_folder = f"{db_type}/{isolations[isolation_level]}" result_folder = "check_result/" + run_result_folder do_test_list = "do_test_list.txt" +snapshot_read = False +if db_type in {"pg"}: + if isolation_level == "ru": + isolation_level = "rc" + if isolation_level in {"rr","ser"}: + snapshot_read = True +elif db_type in {"mysql","mariadb"} and isolation_level in {"rr"}: + snapshot_read = True + + + +print(snapshot_read) + #ts_now = "_2param_3txn_insert" From f5006cd01ff751af87926d252c65d80afb72907f Mon Sep 17 00:00:00 2001 From: WeihuaSun Date: Wed, 9 Oct 2024 23:22:17 +0800 Subject: [PATCH 07/12] fix snapshot_ts for pg --- src/dbtest/src/mda_detect.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/dbtest/src/mda_detect.py b/src/dbtest/src/mda_detect.py index 0f01871d..f6bfd395 100644 --- a/src/dbtest/src/mda_detect.py +++ b/src/dbtest/src/mda_detect.py @@ -797,11 +797,11 @@ def verify_cycle(edge_type, isolation_level): return True -# db_type = sys.argv[1] #[mariadb,pg...] -# isolation_level = sys.argv[2] #[ru,rc,rr,ser] +db_type = sys.argv[1] #[mariadb,mysql,pg...] +isolation_level = sys.argv[2] #[ru,rc,rr,ser] -db_type = "pg" -isolation_level = "rr" +# db_type = "pg" +# isolation_level = "rr" isolations = { "ru":"read-uncommitted", From cf6cfa6b663e8ab47ff80c04442a159f1ada9fa5 Mon Sep 17 00:00:00 2001 From: WeihuaSun Date: Wed, 9 Oct 2024 23:26:19 +0800 Subject: [PATCH 08/12] fix snapshot_ts for pg --- src/dbtest/src/mda_detect.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/src/dbtest/src/mda_detect.py b/src/dbtest/src/mda_detect.py index f6bfd395..e513e0a7 100644 --- a/src/dbtest/src/mda_detect.py +++ b/src/dbtest/src/mda_detect.py @@ -796,13 +796,9 @@ def verify_cycle(edge_type, isolation_level): return True - db_type = sys.argv[1] #[mariadb,mysql,pg...] isolation_level = sys.argv[2] #[ru,rc,rr,ser] -# db_type = "pg" -# isolation_level = "rr" - isolations = { "ru":"read-uncommitted", "rc":"read-committed", @@ -823,12 +819,6 @@ def verify_cycle(edge_type, isolation_level): elif db_type in {"mysql","mariadb"} and isolation_level in {"rr"}: snapshot_read = True - - -print(snapshot_read) - - - #ts_now = "_2param_3txn_insert" ts_now = time.strftime("%Y%m%d_%H%M%S", time.localtime()) if not os.path.exists(result_folder): From 66e8d0106793ae2a1d38bd0e0005dfadf3040c5c Mon Sep 17 00:00:00 2001 From: WeihuaSun Date: Fri, 11 Oct 2024 12:55:58 +0800 Subject: [PATCH 09/12] disable concurrent transaction checks --- src/dbtest/src/mda_detect.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dbtest/src/mda_detect.py b/src/dbtest/src/mda_detect.py index e513e0a7..a26a9c6b 100644 --- a/src/dbtest/src/mda_detect.py +++ b/src/dbtest/src/mda_detect.py @@ -254,7 +254,7 @@ def build_graph(data_op_list, indegree, edge, txn): None """ def insert_edge(data1, data2, indegree, edge, txn): - if check_concurrency(data1, data2, txn): + # if check_concurrency(data1, data2, txn): edge_type, data1, data2 = get_edge_type(data1, data2, txn) if edge_type != "RR" and edge_type != "RCR" and data1.txn_num != data2.txn_num: indegree[data2.txn_num] += 1 From 357f36a5fa4b8efe8c4e0cab50971d45aa0fd77a Mon Sep 17 00:00:00 2001 From: WeihuaSun Date: Mon, 14 Oct 2024 19:51:32 +0800 Subject: [PATCH 10/12] Ignore parameters that are not pg and mariadb --- src/dbtest/src/mda_detect.py | 44 ++++++++++++++++++++++++------------ 1 file changed, 30 insertions(+), 14 deletions(-) diff --git a/src/dbtest/src/mda_detect.py b/src/dbtest/src/mda_detect.py index a26a9c6b..7aecde26 100644 --- a/src/dbtest/src/mda_detect.py +++ b/src/dbtest/src/mda_detect.py @@ -127,7 +127,7 @@ def set_finish_time(op_time, data_op_list, query, txn, version_list): if t.end_ts == op_time: t.end_ts = data_value # update 'visible_ts' for versions installed by the committed transaction - if isolation_level != "ru": + if isolation_level != "ru": # For unset, also update the visible timestamp for i, data_versions in enumerate(version_list): for j, version in enumerate(data_versions): if version[1] == txn_num: @@ -142,7 +142,7 @@ def set_finish_time(op_time, data_op_list, query, txn, version_list): visible_ts = MAX_TS #update 'snapshot_ts' if op.op_type == "R" or op.op_type == "P": - if snapshot_read and db_type in {"mysql","mariadb"}: + if snapshot_read and db_type in {"mariadb"}: txn[op.txn_num].snapshot_ts = min(op.op_time,txn[op.txn_num].snapshot_ts) elif op.op_type == "W": version_list[i].append([op.value,op.txn_num,visible_ts]) @@ -353,7 +353,7 @@ def readVersion_record(query, op_time, data_op_list, version_list): if not find: error_message = "Read version that does not exist" return error_message - if latest < 0: + if latest < 0 and db_type!= "unset": error_message = "Read version that is not the latest version" return error_message @@ -659,15 +659,15 @@ def dfs(result_folder, ts_now, now, type): else: path.append(v.out) edge_type.append(v.type) - accept = verify_cycle(edge_type,isolation_level) with open(result_folder + "/check_result" + ts_now + ".txt", "a+") as f: for i in range(0, len(path)): f.write(str(path[i])) if i != len(path) - 1: f.write("->" + edge_type[i+1] + "->") - if accept: - f.write(" : Accept") - else: - f.write(" : Reject") + if db_type != "unset": + if verify_cycle(edge_type,isolation_level): + f.write(" : Accept") + else: + f.write(" : Reject") f.write("\n\n") path.pop() edge_type.pop() @@ -784,11 +784,11 @@ def verify_cycle(edge_type, isolation_level): if generalized_edge_type[i] in anti_set and generalized_edge_type[i + 1] in anti_set: return True # Cycle is valid return False # No valid cycle found - elif db_type in {"mariadb","mysql"}: # MySQL's repeatable read is below the PL-2.99 level defined by Adya. Use PL-1 level instead. + elif db_type in {"mariadb"}: # maridb's repeatable read is below the PL-2.99 level defined by Adya. Use PL-1 level instead. return generalized_edge_type.count("WW") + generalized_edge_type.count("WR") + generalized_edge_type.count("WP") != len(generalized_edge_type) else: # Without snapshot, cycles consisting only of write, read and item-anti dependencies (G1c+G2-item) are not allowed - return generalized_edge_type.count("WW") + generalized_edge_type.count("WR") + generalized_edge_type.count("WP") + generalized_edge_type.count("RW") != len(generalized_edge_type) + return generalized_edge_type.count("WW") + generalized_edge_type.count("WR") + generalized_edge_type.count("WP") + generalized_edge_type.count("RW")+ generalized_edge_type.count("PW")!= len(generalized_edge_type) elif isolation_level == "ser": # Under 'ser' (serializable), no cycles are allowed @@ -796,8 +796,6 @@ def verify_cycle(edge_type, isolation_level): return True -db_type = sys.argv[1] #[mariadb,mysql,pg...] -isolation_level = sys.argv[2] #[ru,rc,rr,ser] isolations = { "ru":"read-uncommitted", @@ -806,17 +804,35 @@ def verify_cycle(edge_type, isolation_level): "ser":"serializable" } -run_result_folder = f"{db_type}/{isolations[isolation_level]}" +run_result_folder = "pg/repeatable-read" + +if (len(sys.argv) - 1) == 0: + db_type = "unset" + isolation_level = "unset" +else: + db_type = sys.argv[1] #[mariadb,pg] + isolation_level = sys.argv[2] #[ru,rc,rr,ser] + if db_type in {"mariadb","pg"}: + run_result_folder = f"{db_type}/{isolations[isolation_level]}" + else: + db_type = "unset" + isolation_level = "unset" + result_folder = "check_result/" + run_result_folder do_test_list = "do_test_list.txt" +print(result_folder) + snapshot_read = False +# If snapshot_read is False, snapshot_ts is always MAX_TS. This is equivalent to disabling snapshot reads. +# Once a version is visible, it will be read. + if db_type in {"pg"}: if isolation_level == "ru": isolation_level = "rc" if isolation_level in {"rr","ser"}: snapshot_read = True -elif db_type in {"mysql","mariadb"} and isolation_level in {"rr"}: +elif db_type in {"mariadb"} and isolation_level in {"rr"}: snapshot_read = True #ts_now = "_2param_3txn_insert" From c1040bba150f3b4612c8deb2f7ccf8b5b2b4db7e Mon Sep 17 00:00:00 2001 From: WeihuaSun Date: Mon, 14 Oct 2024 19:53:20 +0800 Subject: [PATCH 11/12] clean --- src/dbtest/src/mda_detect.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/dbtest/src/mda_detect.py b/src/dbtest/src/mda_detect.py index 7aecde26..a83471c7 100644 --- a/src/dbtest/src/mda_detect.py +++ b/src/dbtest/src/mda_detect.py @@ -821,7 +821,6 @@ def verify_cycle(edge_type, isolation_level): result_folder = "check_result/" + run_result_folder do_test_list = "do_test_list.txt" -print(result_folder) snapshot_read = False # If snapshot_read is False, snapshot_ts is always MAX_TS. This is equivalent to disabling snapshot reads. From a7ef306ad024a08677cf67b6035a7f9b94335f10 Mon Sep 17 00:00:00 2001 From: WeihuaSun Date: Mon, 14 Oct 2024 20:12:18 +0800 Subject: [PATCH 12/12] If dbtype is unset, set visible_ts to the write finish time. --- src/dbtest/src/mda_detect.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/dbtest/src/mda_detect.py b/src/dbtest/src/mda_detect.py index a83471c7..c466f3bf 100644 --- a/src/dbtest/src/mda_detect.py +++ b/src/dbtest/src/mda_detect.py @@ -127,7 +127,7 @@ def set_finish_time(op_time, data_op_list, query, txn, version_list): if t.end_ts == op_time: t.end_ts = data_value # update 'visible_ts' for versions installed by the committed transaction - if isolation_level != "ru": # For unset, also update the visible timestamp + if isolation_level != "ru" and isolation_level!="unset": for i, data_versions in enumerate(version_list): for j, version in enumerate(data_versions): if version[1] == txn_num: @@ -136,7 +136,7 @@ def set_finish_time(op_time, data_op_list, query, txn, version_list): for op in list1: if op.op_time == op_time: op.op_time = data_value - if isolation_level == "ru": + if isolation_level == "ru" or isolation_level == "unset": visible_ts = op.op_time else: visible_ts = MAX_TS @@ -804,7 +804,7 @@ def verify_cycle(edge_type, isolation_level): "ser":"serializable" } -run_result_folder = "pg/repeatable-read" +run_result_folder = "pg/serializable" if (len(sys.argv) - 1) == 0: db_type = "unset"