Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions src/dbtest/src/convert_sql2cypher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import os
import re

def sql_to_cypher(sql_line: str) -> str:
line = sql_line.strip()
if not line:
return line

# 保留 serializable 部分原样
if line.startswith("serializable") or line.startswith("}"):
return line
if re.match(r"^\d+-\d*,?\d*", line) and not any(kw in line.lower() for kw in ["drop", "create", "insert", "select", "update", "begin", "commit"]):
return line

# 提取前缀
if "-" in line:
prefix_str, stmt = line.split("-", 2)[0] + "-" + line.split("-", 2)[1], line.split("-", 2)[2]
else:
return line
stmt = stmt.strip().rstrip(";")

# 获取表名,作为标签(例如 t1 → :T1)
m_table = re.search(r"\bfrom\s+(\w+)|\binto\s+(\w+)|\bupdate\s+(\w+)|\btable\s+(\w+)", stmt, re.I)
label = "Test"
if m_table:
for g in m_table.groups():
if g:
label = g.capitalize()

# DROP TABLE
if stmt.lower().startswith("drop table"):
return f"{prefix_str}-MATCH (n:{label}) DETACH DELETE n;"

# CREATE TABLE
if stmt.lower().startswith("create table"):
return f"{prefix_str}-// CREATE TABLE ignored in Cypher"

# INSERT
m = re.match(r"insert into \w+ values\s*\((\d+),\s*(\d+)\)", stmt, re.I)
if m:
k, v = m.groups()
return f"{prefix_str}-CREATE (:{label} {{k: {k}, v: {v}}});"

# SELECT
if stmt.lower().startswith("select"):
if "where" in stmt.lower():
m = re.match(r"select \* from \w+ where k\s*=\s*(\d+)", stmt, re.I)
if m:
k = m.group(1)
return f"{prefix_str}-MATCH (n:{label} {{k: {k}}}) RETURN n;"
if "order by" in stmt.lower():
return f"{prefix_str}-MATCH (n:{label}) RETURN n ORDER BY n.k;"

# UPDATE
m = re.match(r"update \w+ set v\s*=\s*(\d+) where k\s*=\s*(\d+)", stmt, re.I)
if m:
v, k = m.groups()
return f"{prefix_str}-MATCH (n:{label} {{k: {k}}}) SET n.v = {v};"

# BEGIN
if stmt.lower() == "begin":
return f"{prefix_str}-:begin"

# COMMIT
if stmt.lower() == "commit":
return f"{prefix_str}-:commit"

# 默认保留
return f"{prefix_str}// [Unmapped] {stmt}"


def convert_folder(input_folder: str, output_folder: str):
if not os.path.exists(output_folder):
os.makedirs(output_folder)

for filename in os.listdir(input_folder):
if filename.endswith(".txt"):
input_path = os.path.join(input_folder, filename)
output_path = os.path.join(output_folder, filename)

with open(input_path, "r", encoding="utf-8") as infile:
lines = infile.readlines()

converted_lines = [sql_to_cypher(line) for line in lines]

with open(output_path, "w", encoding="utf-8") as outfile:
outfile.write("\n".join(converted_lines))

print(f"Converted {filename} → {output_path}")


if __name__ == "__main__":
input_folder = "./../t/pg"
output_folder = "./../t/neo4j"
convert_folder(input_folder, output_folder)
143 changes: 106 additions & 37 deletions src/dbtest/src/mda_generate.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ def __init__(self, op_type, txn_num, op_num):
def init_table(file_name, sql_count, txn_count, table_num, db_type, test_type):
data_num = 2
with open(file_name, "a+") as file_test:
if db_type == "neo4j":
# 图数据库无需建表,直接返回
file_test.write(f"# {sql_count}-{txn_count}-Neo4j: No table creation needed.\n")
return data_num
for i in range(1, table_num + 1):
drop_sql = str(sql_count) + "-" + str(txn_count) + "-" + "DROP TABLE IF EXISTS t" + str(i) + ";\n"
file_test.write(drop_sql)
Expand Down Expand Up @@ -252,18 +256,27 @@ def insert_data(file_name, sql_count, txn_count, cur_count, partition_num, inser
# if it is not initialization, we need to pay attention to whether the transaction should be started
if sql_count != 0 and txn[txn_count].begin_ts == -1:
txn[txn_count].begin_ts = sql_count
begin_sql = str(sql_count) + "-" + str(txn_count) + "-" + "BEGIN;\n"
file_test.write(begin_sql)
if db_type == "neo4j":
begin_cypher = f"{sql_count}-{txn_count}-:begin\n"
file_test.write(begin_cypher)
else:
begin_sql = str(sql_count) + "-" + str(txn_count) + "-" + "BEGIN;\n"
file_test.write(begin_sql)
sql_count += 1
exist[cur_count] = True
if data_num == 2:
if db_type == "neo4j":
# Cypher: 创建节点
insert_cypher = f"{sql_count}-{txn_count}-CREATE (n:Test {{k: {cur_count}, v: {cur_count}}});\n"
file_test.write(insert_cypher)
elif data_num == 2:
insert_sql = str(sql_count) + "-" + str(txn_count) + "-" + "INSERT INTO t" + \
str(insert_table) + " VALUES (" + str(cur_count) + "," + str(cur_count) + ");\n"
file_test.write(insert_sql)
else:
insert_sql = str(sql_count) + "-" + str(txn_count) + "-" + "INSERT INTO t" + \
str(insert_table) + " VALUES (" + str(cur_count) + "," + str(partition_num) + \
"," + str(cur_count) + "," + str(cur_count) + ");\n"
file_test.write(insert_sql)
file_test.write(insert_sql)
data_value[cur_count] = cur_count
except OptionException:
if data_num == 2:
Expand Down Expand Up @@ -308,17 +321,26 @@ def delete_data(file_name, sql_count, txn_count, cur_count, delete_table, data_n
else:
if txn[txn_count].begin_ts == -1:
txn[txn_count].begin_ts = sql_count
begin_sql = str(sql_count) + "-" + str(txn_count) + "-" + "BEGIN;\n"
file_test.write(begin_sql)
if db_type == "neo4j":
begin_cypher = f"{sql_count}-{txn_count}-:begin\n"
file_test.write(begin_cypher)
else:
begin_sql = str(sql_count) + "-" + str(txn_count) + "-" + "BEGIN;\n"
file_test.write(begin_sql)
sql_count += 1
exist[cur_count] = False
if data_num == 2:
if db_type == "neo4j":
# Cypher: 删除节点
delete_cypher = f"{sql_count}-{txn_count}-MATCH (n:Test {{k: {cur_count}}}) DELETE n;\n"
file_test.write(delete_cypher)
elif data_num == 2:
delete_sql = str(sql_count) + "-" + str(txn_count) + "-" + "DELETE FROM t" + \
str(delete_table) + " WHERE k=" + str(cur_count) + ";\n"
file_test.write(delete_sql)
else:
delete_sql = str(sql_count) + "-" + str(txn_count) + "-" + "DELETE FROM t" + \
str(delete_table) + " WHERE value1=" + str(cur_count) + ";\n"
file_test.write(delete_sql)
file_test.write(delete_sql)
data_op_list[cur_count].append(Operation("D", txn_count))
except OptionException:
file_test.write("the transaction has ended and cannot be read")
Expand Down Expand Up @@ -360,17 +382,26 @@ def write_data(file_name, sql_count, txn_count, op_num, data_num, txn, data_valu
else:
if txn[txn_count].begin_ts == -1:
txn[txn_count].begin_ts = sql_count
begin_sql = str(sql_count) + "-" + str(txn_count) + "-" + "BEGIN;\n"
file_test.write(begin_sql)
if db_type == "neo4j":
begin_cypher = f"{sql_count}-{txn_count}-:begin\n"
file_test.write(begin_cypher)
else:
begin_sql = str(sql_count) + "-" + str(txn_count) + "-" + "BEGIN;\n"
file_test.write(begin_sql)
sql_count += 1
if data_num == 2:
if db_type == "neo4j":
# Cypher: 更新节点属性
write_cypher = f"{sql_count}-{txn_count}-MATCH (n:Test {{k: {op_num}}}) SET n.v = {data_value[op_num] + 1};\n"
file_test.write(write_cypher)
elif data_num == 2:
write_sql = str(sql_count) + "-" + str(txn_count) + "-" + "UPDATE t1 SET v=" + \
str(data_value[op_num] + 1) + " WHERE k=" + str(op_num) + ";\n"
file_test.write(write_sql)
else:
write_sql = str(sql_count) + "-" + str(txn_count) + "-" + "UPDATE t" + str(txn_count) + \
" SET value2=" + str(data_value[op_num] + 1) + " WHERE value1=" + \
str(op_num) + ";\n"
file_test.write(write_sql)
file_test.write(write_sql)
data_op_list[op_num].append(Operation("W", txn_count))
data_value[op_num] += 1
except OptionException:
Expand Down Expand Up @@ -409,16 +440,25 @@ def read_data(file_name, sql_count, txn_count, op_num, data_num, txn, data_op_li
else:
if txn[txn_count].begin_ts == -1:
txn[txn_count].begin_ts = sql_count
begin_sql = str(sql_count) + "-" + str(txn_count) + "-" + "BEGIN;\n"
file_test.write(begin_sql)
if db_type == "neo4j":
begin_cypher = f"{sql_count}-{txn_count}-:begin\n"
file_test.write(begin_cypher)
else:
begin_sql = str(sql_count) + "-" + str(txn_count) + "-" + "BEGIN;\n"
file_test.write(begin_sql)
sql_count += 1
if data_num == 2:
if db_type == "neo4j":
# Cypher: 查询节点
read_cypher = f"{sql_count}-{txn_count}-MATCH (n:Test {{k: {op_num}}}) RETURN n;\n"
file_test.write(read_cypher)
elif data_num == 2:
read_sql = str(sql_count) + "-" + str(txn_count) + "-" + "SELECT * FROM t1 WHERE k=" + \
str(op_num) + ";\n"
file_test.write(read_sql)
else:
read_sql = str(sql_count) + "-" + str(txn_count) + "-" + "SELECT * FROM t" + str(txn_count) + \
" WHERE value1=" + str(op_num) + ";\n"
file_test.write(read_sql)
file_test.write(read_sql)
data_op_list[op_num].append(Operation("R", txn_count))
except OptionException:
file_test.write("the transaction has ended and cannot be read")
Expand Down Expand Up @@ -456,16 +496,25 @@ def read_data_predicate(file_name, sql_count, txn_count, op_num, data_num, txn,
else:
if txn[txn_count].begin_ts == -1:
txn[txn_count].begin_ts = sql_count
begin_sql = str(sql_count) + "-" + str(txn_count) + "-" + "BEGIN;\n"
file_test.write(begin_sql)
if db_type == "neo4j":
begin_cypher = f"{sql_count}-{txn_count}-:begin\n"
file_test.write(begin_cypher)
else:
begin_sql = str(sql_count) + "-" + str(txn_count) + "-" + "BEGIN;\n"
file_test.write(begin_sql)
sql_count += 1
if data_num == 2:
if db_type == "neo4j":
# Cypher: 范围查询
read_cypher = f"{sql_count}-{txn_count}-MATCH (n:Test) WHERE n.k > {op_num*2} AND n.k < {op_num*2+2} RETURN n;\n"
file_test.write(read_cypher)
elif data_num == 2:
read_sql = str(sql_count) + "-" + str(txn_count) + "-" + "SELECT * FROM t1 WHERE k>" + \
str(op_num*2) + " and k<" + str(op_num*2+2) + ";\n"
file_test.write(read_sql)
else:
read_sql = str(sql_count) + "-" + str(txn_count) + "-" + "SELECT * FROM t" + str(txn_count) + \
" WHERE value1>" + str(op_num*2) + " and value1<" + str(op_num*2+2) + ";\n"
file_test.write(read_sql)
file_test.write(read_sql)
data_op_list[op_num].append(Operation("P", txn_count))
except OptionException:
file_test.write("the transaction has ended and cannot be read")
Expand Down Expand Up @@ -499,8 +548,12 @@ def abort_txn(file_name, sql_count, txn_count, txn):
raise OptionException
else:
txn[txn_count].end_ts = sql_count
abort_sql = str(sql_count) + "-" + str(txn_count) + "-" + "ROLLBACK;\n"
file_test.write(abort_sql)
if db_type == "neo4j":
abort_cypher = f"{sql_count}-{txn_count}-:rollback\n"
file_test.write(abort_cypher)
else:
abort_sql = str(sql_count) + "-" + str(txn_count) + "-" + "ROLLBACK;\n"
file_test.write(abort_sql)
except OptionException:
file_test.write("transaction" + str(txn_count) + " ended and can't be rolled back again")
print("transaction" + str(txn_count) + " ended and can't be rolled back again")
Expand Down Expand Up @@ -533,8 +586,12 @@ def commit_txn(file_name, sql_count, txn_count, txn):
raise OptionException
else:
txn[txn_count].end_ts = sql_count
commit_sql = str(sql_count) + "-" + str(txn_count) + "-" + "COMMIT;\n"
file_test.write(commit_sql)
if db_type == "neo4j":
commit_cypher = f"{sql_count}-{txn_count}-:commit\n"
file_test.write(commit_cypher)
else:
commit_sql = str(sql_count) + "-" + str(txn_count) + "-" + "COMMIT;\n"
file_test.write(commit_sql)
except OptionException:
file_test.write("transaction" + str(txn_count) + " ended and can't be committed again")
print("transaction" + str(txn_count) + " ended and can't be committed again")
Expand All @@ -558,20 +615,32 @@ def commit_txn(file_name, sql_count, txn_count, txn):
"""
def execute_check(file_name, sql_count, txn_count, data_num, table_num):
with open(file_name, "a+") as file_test:
begin_sql = str(sql_count) + "-" + str(txn_count) + "-" + "BEGIN;\n"
file_test.write(begin_sql)
sql_count += 1
for i in range(1, table_num + 1):
if data_num == 2:
read_sql = str(sql_count) + "-" + str(txn_count) + "-" + "SELECT * FROM t1 ORDER BY k;\n"
else:
read_sql = str(sql_count) + "-" + str(txn_count) + "-" + "SELECT * FROM t" + str(i) + \
" ORDER BY k;\n"
file_test.write(read_sql)
if db_type == "neo4j":
begin_cypher = f"{sql_count}-{txn_count}-:begin\n"
file_test.write(begin_cypher)
sql_count += 1
# 只需一次全图查询
read_cypher = f"{sql_count}-{txn_count}-MATCH (n:Test) RETURN n ORDER BY n.k;\n"
file_test.write(read_cypher)
sql_count += 1
commit_cypher = f"{sql_count}-{txn_count}-:commit\n"
file_test.write(commit_cypher)
sql_count += 1
else:
begin_sql = str(sql_count) + "-" + str(txn_count) + "-" + "BEGIN;\n"
file_test.write(begin_sql)
sql_count += 1
for i in range(1, table_num + 1):
if data_num == 2:
read_sql = str(sql_count) + "-" + str(txn_count) + "-" + "SELECT * FROM t1 ORDER BY k;\n"
else:
read_sql = str(sql_count) + "-" + str(txn_count) + "-" + "SELECT * FROM t" + str(i) + \
" ORDER BY k;\n"
file_test.write(read_sql)
sql_count += 1
commit_sql = str(sql_count) + "-" + str(txn_count) + "-" + "COMMIT;\n"
file_test.write(commit_sql)
sql_count += 1
commit_sql = str(sql_count) + "-" + str(txn_count) + "-" + "COMMIT;\n"
file_test.write(commit_sql)
sql_count += 1

"""
Check the last operation before the current position in a list of operations.
Expand Down
26 changes: 26 additions & 0 deletions src/dbtest/t/neo4j/iat_dda_read_skew_committed.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
ParamNum:2
0-1-MATCH (n:If) DETACH DELETE n;
0-1-// CREATE TABLE ignored in Cypher
0-1-CREATE (:T1 {k: 0, v: 0});
0-1-CREATE (:T1 {k: 1, v: 0});
0-1-:commit
1-1-:begin
2-1-MATCH (n:T1 {k: 0}) RETURN n;
3-2-:begin
4-2-MATCH (n:T1 {k: 1}) SET n.v = 1;
5-2-MATCH (n:T1 {k: 0}) SET n.v = 1;
6-2-:commit
7-1-MATCH (n:T1 {k: 1}) RETURN n;
8-1-:commit
9-3-MATCH (n:T1) RETURN n ORDER BY n.k;
10-3-:commit

serializable {
2-0,0
7-1,0
9-0,1 1,1

2-0,1
7-1,1
9-0,1 1,1
}
24 changes: 24 additions & 0 deletions src/dbtest/t/neo4j/iat_dda_read_write_skew1_committed.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
ParamNum:2
0-1-MATCH (n:If) DETACH DELETE n;
0-1-// CREATE TABLE ignored in Cypher
0-1-CREATE (:T1 {k: 0, v: 0});
0-1-CREATE (:T1 {k: 1, v: 0});
0-1-:commit
1-1-:begin
2-1-MATCH (n:T1 {k: 0}) RETURN n;
3-2-:begin
4-2-MATCH (n:T1 {k: 1}) SET n.v = 2;
5-2-MATCH (n:T1 {k: 0}) SET n.v = 1;
6-2-:commit
7-1-MATCH (n:T1 {k: 1}) SET n.v = 1;
8-1-:commit
9-3-MATCH (n:T1) RETURN n ORDER BY n.k;
10-3-:commit

serializable {
2-0,0
9-0,1 1,2

2-0,1
9-0,1 1,1
}
Loading