Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 49 additions & 2 deletions src/dbtest/src/mda_detect.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ class Txn:
def __init__(self):
self.begin_ts = -1
self.end_ts = 99999999999999999999
self.isolation_level = "SERIALIZABLE" # Default to SERIALIZABLE isolation level


"""
Find the total variable number.
Expand Down Expand Up @@ -150,6 +152,24 @@ def set_finish_time(op_time, data_op_list, query, txn, version_list):
# or the start time of the second transaction is less than the end time of the first transaction
# we think they are concurrent
def check_concurrency(data1, data2, txn):
# Get isolation levels of both transactions
level1 = txn[data1.txn_num].isolation_level
level2 = txn[data2.txn_num].isolation_level

# Relax checks for READ UNCOMMITTED level
if level1 == "READ UNCOMMITTED" or level2 == "READ UNCOMMITTED":
return True

# Partial relaxation for READ COMMITTED level
if level1 == "READ COMMITTED" or level2 == "READ COMMITTED":
if txn[data2.txn_num].begin_ts < txn[data1.txn_num].end_ts:
return True
elif txn[data1.txn_num].begin_ts < txn[data2.txn_num].end_ts:
return True
else:
return False

# Strict checks for REPEATABLE READ and SERIALIZABLE levels
if txn[data2.txn_num].begin_ts < txn[data1.txn_num].end_ts:
return True
elif txn[data1.txn_num].begin_ts < txn[data2.txn_num].end_ts:
Expand All @@ -175,6 +195,10 @@ def check_concurrency(data1, data2, txn):
# decide which operation comes first depending on the read or write version
# if later operation happened after the first txn commit time, edge type will add "C"
def get_edge_type(data1, data2, txn):
# Get transaction isolation levels
level1 = txn[data1.txn_num].isolation_level
level2 = txn[data2.txn_num].isolation_level

if data1.value <= data2.value:
before, after = data1, data2
else:
Expand All @@ -184,10 +208,15 @@ def get_edge_type(data1, data2, txn):
# before, after = data2, data1
# else:
# before, after = data1, data2
if data2.op_time > txn[data1.txn_num].end_ts:

# Adjust edge type based on isolation level
if level1 == "READ UNCOMMITTED" or level2 == "READ UNCOMMITTED":
state = ""
elif data2.op_time > txn[data1.txn_num].end_ts:
state = "C"
else:
state = ""

return before.op_type + state + after.op_type, before, after


Expand Down Expand Up @@ -236,7 +265,13 @@ def build_graph(data_op_list, indegree, edge, txn):
def insert_edge(data1, data2, indegree, edge, txn):
if check_concurrency(data1, data2, txn):
edge_type, data1, data2 = get_edge_type(data1, data2, txn)
if edge_type != "RR" and edge_type != "RCR" and data1.txn_num != data2.txn_num:
# Adjust edge insertion based on isolation level
level1 = txn[data1.txn_num].isolation_level
level2 = txn[data2.txn_num].isolation_level

if (edge_type != "RR" and edge_type != "RCR" and
data1.txn_num != data2.txn_num and
not (level1 == "READ UNCOMMITTED" and level2 == "READ UNCOMMITTED")):
indegree[data2.txn_num] += 1
edge[data1.txn_num].append(Edge(edge_type, data2.txn_num))

Expand Down Expand Up @@ -500,6 +535,18 @@ def operation_record(total_num, query, txn, data_op_list, version_list):
if op_time == 0 and query.find("INSERT") != -1:
init_record(query, version_list)
return error_message
# Set transaction isolation level
if query.find("SET TRANSACTION ISOLATION LEVEL") != -1:
if query.find("READ UNCOMMITTED") != -1:
txn[txn_num].isolation_level = "READ UNCOMMITTED"
elif query.find("READ COMMITTED") != -1:
txn[txn_num].isolation_level = "READ COMMITTED"
elif query.find("REPEATABLE READ") != -1:
txn[txn_num].isolation_level = "REPEATABLE READ"
elif query.find("SERIALIZABLE") != -1:
txn[txn_num].isolation_level = "SERIALIZABLE"
return error_message

if query.find("returnresult") != -1:
error_message = readVersion_record(query, op_time, data_op_list, version_list)
return error_message
Expand Down