-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
147 lines (125 loc) · 4.89 KB
/
main.py
File metadata and controls
147 lines (125 loc) · 4.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import sys
try:
import readline
except ImportError:
# Readline not available (e.g., Windows)
pass
from minidb import MiniDB
def print_table(data):
"""Simple standard-library implementation to print results as a table."""
if not data or not isinstance(data, list):
if isinstance(data, dict):
# Special case for DESCRIBE output or single dictionary
print_dict_as_table(data)
else:
print(data)
return
if len(data) == 0:
print("Empty set (0 rows)")
return
# Get all unique columns across all rows
columns = []
for row in data:
for k in row.keys():
if k not in columns:
columns.append(k)
# Determine column widths
widths = {col: len(col) for col in columns}
for row in data:
for col in columns:
val = str(row.get(col, ""))
widths[col] = max(widths[col], len(val))
# Print header
header = " | ".join(col.ljust(widths[col]) for col in columns)
_print_divider(header)
print(header)
_print_divider(header)
# Print rows
for row in data:
line = " | ".join(str(row.get(col, "")).ljust(widths[col]) for col in columns)
print(line)
_print_divider(header)
print(f"({len(data)} rows in set)")
def print_dict_as_table(data):
"""Converts a dictionary (like DESCRIBE results) into a readable table."""
# Special handling for DESCRIBE results which have specific structure
if 'columns' in data and 'column_types' in data:
print(f"\nSchema Information for Table:")
rows = []
for col in data['columns']:
rows.append({
'Column': col,
'Type': data['column_types'].get(col, 'UNKNOWN'),
'Key': 'PRI' if col == data.get('primary_key') else 'UNI' if col in data.get('unique_columns', []) else 'MUL' if col in data.get('foreign_keys', {}) else '',
'Details': f"Ref: {data['foreign_keys'][col]}" if col in data.get('foreign_keys', {}) else '-'
})
print_table(rows)
else:
# Fallback for generic dict
for k, v in data.items():
print(f"{k}: {v}")
def _print_divider(header_line):
print("-" * len(header_line))
def main():
db = MiniDB()
print("Welcome to MiniDB CLI.")
print("Enter SQL commands. Use ';' to separate or terminate statements.")
print("Tip: If you forget the ';', just press Enter on an empty line to execute.")
print("Type 'exit' or 'quit' to logout.")
buffer = ""
while True:
try:
prompt = "minidb> " if not buffer else " -> "
line = input(prompt)
# Exit check (immediate)
if not buffer and line.strip().lower() in ('exit', 'quit', '.exit'):
print("Goodbye!")
break
# Handle empty line as execution trigger for existing buffer
if not line.strip() and buffer.strip():
# User pressed Enter on empty line - execute what we have
pass
elif not line.strip() and not buffer.strip():
continue
else:
buffer += " " + line
# If it doesn't end in a semicolon, keep buffering
if not buffer.strip().endswith(';'):
continue
# Split and execute statements
# We treat the buffer as a single block if no semicolon,
# or multiple blocks if semicolons exist.
raw_buffer = buffer.strip()
if not raw_buffer:
continue
statements = [s.strip() for s in raw_buffer.split(';') if s.strip()]
if not statements and raw_buffer:
statements = [raw_buffer] # Fallback for no semicolon execution
buffer = "" # Reset buffer
for stmt in statements:
if len(statements) > 1:
print(f"\nExecuting: {stmt}")
try:
results = db.execute_query(stmt)
if isinstance(results, list):
print_table(results)
elif isinstance(results, dict):
print_dict_as_table(results)
else:
print(results)
except Exception as stmt_error:
print(f"Statement Error: {stmt_error}")
except KeyboardInterrupt:
if buffer:
buffer = ""
print("\nBuffer cleared.")
continue
print("\nGoodbye!")
break
except EOFError:
print("\nGoodbye!")
break
except Exception as e:
print(f"CLI Error: {e}")
if __name__ == "__main__":
main()