-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathSelectIndicator.py
More file actions
141 lines (112 loc) · 4.32 KB
/
SelectIndicator.py
File metadata and controls
141 lines (112 loc) · 4.32 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import streamlit as st
import pandas as pd
import sqlite3
# 数据库路径
db_path = "soil_indicators.db"
# **连接数据库并获取数据**
def load_data():
conn = sqlite3.connect(db_path)
df = pd.read_sql_query("SELECT * FROM indicators", conn)
conn.close()
return df
# **确保数据库和表存在**
def ensure_database():
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS indicators (
id INTEGER PRIMARY KEY AUTOINCREMENT,
Category TEXT,
Indicator TEXT,
Unit_of_Measurement TEXT,
Description TEXT,
NL TEXT,
SP TEXT,
IR TEXT,
FR TEXT
)
''')
conn.commit()
conn.close()
ensure_database()
# **加载数据**
data = load_data()
# **自动检测国家列**
def separate_columns(df):
fixed_columns = {"id", "Category", "Indicator", "Unit of Measurement", "Description"}
country_columns = [col for col in df.columns if col not in fixed_columns]
return country_columns, list(fixed_columns)
# 获取国家列 & 其他列
country_columns, fixed_columns = separate_columns(data)
# **用户选择国家**
countries = st.multiselect("Select Countries", country_columns, default=country_columns)
# **选择指标分类**
categories = ["All"] + list(data["Category"].dropna().unique())
category = st.selectbox("Select Category", categories)
# **搜索栏**
search_query = st.text_input("Search Indicator")
# **数据筛选**
def filter_data(df, countries, category, search_query):
filtered_df = df.copy()
# **确保所有数据都是字符串,并转换为小写,忽略 "X" 和 "x" 的大小写差异**
df[countries] = df[countries].fillna("").astype(str).apply(lambda x: x.str.lower())
# **按国家筛选**
if countries:
if len(countries) == 1:
country_filter = df[countries[0]] == "x"
else:
country_filter = df[countries].eq("x").all(axis=1)
filtered_df = filtered_df[country_filter]
# **按类别筛选**
if category and category != "All":
filtered_df = filtered_df[filtered_df["Category"] == category]
# **按关键词搜索**
if search_query:
filtered_df = filtered_df[filtered_df["Indicator"].str.contains(search_query, case=False, na=False)]
return filtered_df
# **应用筛选**
filtered_data = filter_data(data, countries, category, search_query)
# **显示数据(可编辑)**
st.subheader("Edit Data Directly")
edited_data = st.data_editor(filtered_data, num_rows="dynamic")
# **保存修改**
if st.button("Save Changes"):
conn = sqlite3.connect(db_path)
edited_data.to_sql("indicators", conn, if_exists="replace", index=False)
conn.close()
st.success("Database updated successfully!")
# **数据导出**
st.download_button("Download CSV", filtered_data.to_csv(index=False), "filtered_data.csv", "text/csv")
# **新增国家列**
st.subheader("Add New Country")
new_country = st.text_input("Enter New Country Code (e.g., DE, IT)")
if st.button("Add Country"):
if new_country and new_country.isidentifier() and new_country not in country_columns:
conn = sqlite3.connect(db_path)
try:
# **添加新国家列**
conn.execute(f"ALTER TABLE indicators ADD COLUMN '{new_country}' TEXT;")
conn.commit()
# **为所有现有数据填充默认值**
data[new_country] = "" # 默认值为空(国家不能做此指标)
data.to_sql("indicators", conn, if_exists="replace", index=False)
st.success(f"Country '{new_country}' added successfully!")
except Exception as e:
st.error(f"Failed to add country: {e}")
finally:
conn.close()
st.experimental_rerun()
else:
st.warning("Invalid country code or country already exists.")
# **新增数据**
st.subheader("Add New Data")
new_data = {}
for col in fixed_columns + country_columns:
new_data[col] = st.text_input(f"Enter {col}")
if st.button("Add Row"):
conn = sqlite3.connect(db_path)
placeholders = ", ".join(["?" for _ in new_data])
conn.execute(f"INSERT INTO indicators ({', '.join(new_data.keys())}) VALUES ({placeholders})", tuple(new_data.values()))
conn.commit()
conn.close()
st.success("New row added!")