-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpreprocessing
More file actions
281 lines (237 loc) · 8.54 KB
/
Copy pathpreprocessing
File metadata and controls
281 lines (237 loc) · 8.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
from pathlib import Path

import pandas as pd
# Load the five raw CSV exports and clean each one into a module-level DataFrame:
# Exchange_rates_df, Stores_df, Products_df, Customers_df and Sales_df.

# Single place to point the notebook at the folder containing the CSV files.
DATA_DIR = Path(r"C:\Users\saina\OneDrive\Desktop\PROJECT 2\csv files")

# Exchange rates: parse 'Date' into datetime64.
Exchange_rates_df = pd.read_csv(DATA_DIR / "Exchange_Rates.csv")
Exchange_rates_df['Date'] = pd.to_datetime(Exchange_rates_df['Date'])

# Stores: parse 'Open Date', then replace every NaN in the frame with 0.
# Author's note: online stores have no square-meter value, hence the 0 fill.
Stores_df = pd.read_csv(DATA_DIR / "Stores.csv")
Stores_df['Open Date'] = pd.to_datetime(Stores_df['Open Date'])
Stores_df = Stores_df.fillna(0)

# Products: the price columns arrive as text containing '$' and ',', so strip
# both and convert to float. regex=False is explicit because '$' is a regex
# anchor and the default of `regex` differs between pandas versions.
Products_df = pd.read_csv(DATA_DIR / "Products.csv")
for price_column in ('Unit Cost USD', 'Unit Price USD'):
    Products_df[price_column] = (
        Products_df[price_column]
        .str.replace('$', '', regex=False)
        .str.replace(',', '', regex=False)
        .astype(float)
    )

# Customers: the file is read as ISO-8859-1; missing state codes are filled
# with the literal 'Napoli' and 'Birthday' is parsed into datetime64.
# NOTE(review): presumably the missing codes belong to Napoli customers whose
# code was read as NaN - confirm against the raw data.
Customers_df = pd.read_csv(DATA_DIR / "Customers.csv", encoding='ISO-8859-1')
Customers_df['State Code'] = Customers_df['State Code'].fillna('Napoli')
Customers_df['Birthday'] = pd.to_datetime(Customers_df['Birthday'])

# Sales: parse both date columns, then give rows without a delivery date
# an estimated one of order date + 5 days (the author's average delivery time).
Sales_df = pd.read_csv(DATA_DIR / "Sales.csv")
Sales_df['Order Date'] = pd.to_datetime(Sales_df['Order Date'])
Sales_df['Delivery Date'] = pd.to_datetime(Sales_df['Delivery Date'])
Sales_df.loc[Sales_df['Delivery Date'].isna(), 'Delivery Date'] = (
    Sales_df['Order Date'] + pd.Timedelta(days=5)
)

# The original cell left `df` bound to the sales frame; keep that name available.
df = Sales_df

# Reference only (kept commented out): how the average delivery duration used
# for the fill above was measured on the rows that do have a delivery date.
# not_null_df = df[df['Delivery Date'].notna()]
# not_null_df['Order Date'] = pd.to_datetime(not_null_df['Order Date'])
# not_null_df['Delivery Date'] = pd.to_datetime(not_null_df['Delivery Date'])
# not_null_df['Duration'] = (not_null_df['Delivery Date'] - not_null_df['Order Date']).dt.days
# average_duration = not_null_df['Duration'].mean()
# print("Average Duration overall:", average_duration) # Output of the average duration
Merging
# Every join below is a left join, so all rows of the left-hand frame are kept.

# Sales rows enriched with customer attributes (joined on 'CustomerKey').
sales_customers_df = Sales_df.merge(Customers_df, how='left', on='CustomerKey')

# ...and then additionally with product attributes (joined on 'ProductKey').
sales_customers_products_df = sales_customers_df.merge(
    Products_df, how='left', on='ProductKey'
)

# Sales rows enriched with product attributes only.
sales_products_df = Sales_df.merge(Products_df, how='left', on='ProductKey')

# Sales rows enriched with store attributes (joined on 'StoreKey').
sales_stores_df = Sales_df.merge(Stores_df, how='left', on='StoreKey')

# Attach the exchange rate for each order: match the order's date and currency
# code against the rate table's 'Date' and 'Currency' columns. Only the three
# rate columns that are needed are passed to the join.
Sales_products_Exchangerates_df = sales_products_df.merge(
    Exchange_rates_df[['Date', 'Currency', 'Exchange']],
    how='left',
    left_on=['Order Date', 'Currency Code'],
    right_on=['Date', 'Currency'],
)

# Note: The merged DataFrames are not used in any further analyses or
# visualizations in Power BI or SQL.
Storing in MySQL database
Sales table
import mysql.connector

# Create the DataSpark database and its `sales` table if they do not exist,
# then bulk-insert every row of Sales_df.
#
# Rows are passed positionally, so the DataFrame's column order must match the
# column list in the INSERT statement. Re-running this cell appends the same
# rows again: the table is only created when missing and is never cleared.

query = """CREATE TABLE if not exists sales (
Order_Number INT,
Line_Item INT,
Order_Date DATE,
Delivery_Date DATE,
Customer_Key INT,
Store_Key INT,
Product_Key INT,
Quantity INT,
Currency_Code VARCHAR(255)
)"""
insert_query = """
INSERT INTO sales (Order_Number, Line_Item, Order_Date, Delivery_Date, Customer_Key, Store_Key, Product_Key, Quantity, Currency_Code)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
# One list per DataFrame row, in column order, ready for executemany().
data_to_insert = Sales_df.values.tolist()

# NOTE(review): credentials are hard-coded; consider loading them from the
# environment or a config file before sharing this notebook.
con = mysql.connector.connect(
    host="localhost",
    user="root",
    password="12345678"
)
try:
    cursor = con.cursor()
    try:
        # No database is selected at connect time because it may not exist yet.
        cursor.execute("create database if not exists DataSpark")
        cursor.execute("use DataSpark")
        cursor.execute(query)
        cursor.executemany(insert_query, data_to_insert)
        con.commit()
    finally:
        # Release the cursor even when a statement above raised.
        cursor.close()
finally:
    # Always close the connection so a failed run does not leak it.
    con.close()
Products table
import mysql.connector

# Create the `products` table in the DataSpark database if it does not exist,
# then bulk-insert every row of Products_df.
#
# Rows are passed positionally, so the DataFrame's column order must match the
# column list in the INSERT statement. Re-running this cell appends the same
# rows again: the table is only created when missing and is never cleared.

query = """CREATE TABLE if not exists products (
Product_Key INT,
Product_Name VARCHAR(255),
Brand VARCHAR(255),
Color VARCHAR(255),
Unit_Cost_USD DECIMAL(10, 2),
Unit_Price_USD DECIMAL(10, 2),
Subcategory_Key INT,
Subcategory VARCHAR(255),
CategoryKey INT,
Category VARCHAR(255)
)"""
insert_query = """
INSERT INTO products (Product_Key, Product_Name, Brand, Color, Unit_Cost_USD, Unit_Price_USD, Subcategory_Key, Subcategory, CategoryKey, Category)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
# One list per DataFrame row, in column order, ready for executemany().
data_to_insert = Products_df.values.tolist()

# NOTE(review): credentials are hard-coded; consider loading them from the
# environment or a config file before sharing this notebook.
con = mysql.connector.connect(
    host="localhost",
    user="root",
    password="12345678",
    database="DataSpark"
)
try:
    cursor = con.cursor()
    try:
        cursor.execute(query)
        cursor.executemany(insert_query, data_to_insert)
        con.commit()
    finally:
        # Release the cursor even when a statement above raised.
        cursor.close()
finally:
    # Always close the connection so a failed run does not leak it.
    con.close()
Customers table
import mysql.connector

# Create the `customers` table in the DataSpark database if it does not exist,
# then bulk-insert every row of Customers_df.
#
# Rows are passed positionally, so the DataFrame's column order must match the
# column list in the INSERT statement. Re-running this cell appends the same
# rows again: the table is only created when missing and is never cleared.

query = """CREATE TABLE IF NOT EXISTS customers (
Customer_Key INT,
Gender VARCHAR(50),
Name VARCHAR(255),
City VARCHAR(255),
State_Code VARCHAR(50),
State VARCHAR(255),
Zip_Code VARCHAR(50),
Country VARCHAR(255),
Continent VARCHAR(255),
Birthday DATE
)"""
insert_query = """
INSERT INTO customers (Customer_Key, Gender, Name, City, State_Code, State, Zip_Code, Country, Continent, Birthday)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
# One list per DataFrame row, in column order, ready for executemany().
data_to_insert = Customers_df.values.tolist()

# NOTE(review): credentials are hard-coded; consider loading them from the
# environment or a config file before sharing this notebook.
con = mysql.connector.connect(
    host="localhost",
    user="root",
    password="12345678",
    database="DataSpark"
)
try:
    cursor = con.cursor()
    try:
        cursor.execute(query)
        cursor.executemany(insert_query, data_to_insert)
        con.commit()
    finally:
        # Release the cursor even when a statement above raised.
        cursor.close()
finally:
    # Always close the connection so a failed run does not leak it.
    con.close()
Stores table
import mysql.connector

# Create the `stores` table in the DataSpark database if it does not exist,
# then bulk-insert every row of Stores_df.
#
# Rows are passed positionally, so the DataFrame's column order must match the
# column list in the INSERT statement. Re-running this cell appends the same
# rows again: the table is only created when missing and is never cleared.

query = """CREATE TABLE IF NOT EXISTS stores (
Store_Key INT,
Country VARCHAR(255),
State VARCHAR(255),
Square_Meters FLOAT,
Open_Date DATE
)"""
insert_query = """
INSERT INTO stores (Store_Key, Country, State, Square_Meters, Open_Date)
VALUES (%s, %s, %s, %s, %s)
"""
# One list per DataFrame row, in column order, ready for executemany().
data_to_insert = Stores_df.values.tolist()

# NOTE(review): credentials are hard-coded; consider loading them from the
# environment or a config file before sharing this notebook.
con = mysql.connector.connect(
    host="localhost",
    user="root",
    password="12345678",
    database="DataSpark"
)
try:
    cursor = con.cursor()
    try:
        cursor.execute(query)
        cursor.executemany(insert_query, data_to_insert)
        con.commit()
    finally:
        # Release the cursor even when a statement above raised.
        cursor.close()
finally:
    # Always close the connection so a failed run does not leak it.
    con.close()
Exchange_rates table
import mysql.connector

# Create the `exchange_rates` table in the DataSpark database if it does not
# exist, then bulk-insert every row of Exchange_rates_df.
#
# Rows are passed positionally, so the DataFrame's column order must match the
# column list in the INSERT statement. Re-running this cell appends the same
# rows again: the table is only created when missing and is never cleared.

query = """CREATE TABLE IF NOT EXISTS exchange_rates (
Date DATE,
Currency VARCHAR(10),
Exchange FLOAT
)"""
insert_query = """
INSERT INTO exchange_rates (Date, Currency, Exchange)
VALUES (%s, %s, %s)
"""
# One list per DataFrame row, in column order, ready for executemany().
data_to_insert = Exchange_rates_df.values.tolist()

# NOTE(review): credentials are hard-coded; consider loading them from the
# environment or a config file before sharing this notebook.
con = mysql.connector.connect(
    host="localhost",
    user="root",
    password="12345678",
    database="DataSpark"
)
try:
    cursor = con.cursor()
    try:
        cursor.execute(query)
        cursor.executemany(insert_query, data_to_insert)
        con.commit()
    finally:
        # Release the cursor even when a statement above raised.
        cursor.close()
finally:
    # Always close the connection so a failed run does not leak it.
    con.close()
Check whether all the tables were created
import mysql.connector

# List the tables in the DataSpark database and print one result row per table,
# as a quick check that the cells above created everything.

# NOTE(review): credentials are hard-coded; consider loading them from the
# environment or a config file before sharing this notebook.
con = mysql.connector.connect(
    host="localhost",
    user="root",
    password="12345678",
    database="DataSpark"
)
try:
    cursor = con.cursor()
    try:
        cursor.execute("show tables")
        # Iterating the cursor yields each result row as a tuple.
        for i in cursor:
            print(i)
    finally:
        # The original cell never closed the cursor.
        cursor.close()
finally:
    # The original cell never closed the connection either.
    con.close()
('customers',)
('exchange_rates',)
('products',)
('sales',)
('stores',)