import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, GridSearchCV, cross_val_score
from sklearn.pipeline import Pipeline
from sklearn.ensemble import RandomForestRegressor
from sklearn.preprocessing import OneHotEncoder
from sklearn.base import BaseEstimator, TransformerMixin
# load train and test data
train_raw = pd.read_csv("./house-prices-advanced-regression-techniques/train.csv", index_col='Id')
test_raw = pd.read_csv("./house-prices-advanced-regression-techniques/test.csv", index_col='Id')
print("The shape of train: {}".format(train_raw.shape))
print("The shape of test: {}".format(test_raw.shape))
## getColInfo shows a table with column names, their dtypes, and their number of unique values.
## The dtype and cardinality_threshold parameters restrict which columns are shown.
## It is useful for checking how many categories a column has, as a reference for
## choosing how to encode categorical variables.
def getColInfo(train, dtype=None, cardinality_threshold=0):
train_dtypes = [train[dt].dtype for dt in train.columns]
nuniques = [train[col].nunique() for col in train.columns]
res = pd.DataFrame({'Columns Names': train.columns,
'Dtypes': train_dtypes,
'Unique Value Numbers': nuniques })
    if dtype is not None:
        res = res.loc[res['Dtypes'] == dtype]
    res = res.loc[res['Unique Value Numbers'] >= cardinality_threshold]
return(res.sort_values(by=['Unique Value Numbers'], axis=0))
## getMissingData shows, for each column that contains missing data, the percentage of missing values
def getMissingData(train):
all_data_na = (train.isnull().sum() / len(train)) * 100
all_data_na = all_data_na.drop(all_data_na[all_data_na == 0].index).sort_values(ascending=False)
all_data_na_dtypes = [train[dt].dtype for dt in all_data_na.index]
nuniques = train[all_data_na.index].nunique()
missing_data = pd.DataFrame({'Dtypes': all_data_na_dtypes,
'Missing Ratio' : all_data_na,
'Unique Value Numbers': nuniques })
if missing_data.shape[0] == 0:
return("no missing vlaue")
return(missing_data)
getColInfo(train_raw, dtype='object', cardinality_threshold=10)
ob = getMissingData(train_raw)
# the names of columns with more than 5% missing values come back as a pandas Index
type(ob.loc[ob['Missing Ratio'] > 5].index)
X_train, X_valid, y_train, y_valid = train_test_split(train_raw.drop('SalePrice', axis=1),
train_raw['SalePrice'],
test_size=0.1,
random_state=99)
X_train.shape
## define a transformer class that drops columns whose missing-value ratio reaches a threshold
class ColumnFilter(BaseEstimator, TransformerMixin):
def __init__(self, threshold=0):
super().__init__()
self.threshold = threshold
self.col_names_deleted = []
def fit(self, X, y=None):
missing_df = self.get_missing_data(X, self.threshold)
self.col_names_deleted = missing_df.index
return self
def transform(self, X, y=None):
return(X.drop(columns=self.col_names_deleted))
def get_missing_data(self, X, threshold):
all_data_na = (X.isnull().sum() / len(X)) * 100
all_data_na = all_data_na.drop(all_data_na[all_data_na == 0].index).sort_values(ascending=False)
all_data_na_dtypes = [X[dt].dtype for dt in all_data_na.index]
nuniques = X[all_data_na.index].nunique()
missing_data = pd.DataFrame({'Dtypes': all_data_na_dtypes,
'Missing Ratio' : all_data_na,
'Unique Value Numbers': nuniques })
return(missing_data.loc[missing_data['Missing Ratio'] >= self.threshold])
# test
# drop all the columns whose missing ratio is at least 5%
cf = ColumnFilter(5)
X_train_transformed = cf.fit_transform(X_train)
X_train_transformed.shape
X_train_transformed.head()
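## A quick sanity check: the columns to drop were learned from X_train during fit,
## so transform removes those same columns from any other frame (e.g. X_valid)
## without recomputing missing ratios.
X_valid_filtered = cf.transform(X_valid)
assert list(X_valid_filtered.columns) == list(X_train_transformed.columns)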
For numerical columns, impute the mean value if there is no more specific requirement. For categorical columns, impute the mode.
## Below is a class implementing this customized imputation
class MyImputer(BaseEstimator, TransformerMixin):
def __init__(self):
"""Impute missing values.
Columns of dtype object are imputed with the most frequent value
in column.
Columns of other types are imputed with mean of column.
"""
def fit(self, X, y=None):
self.fill = pd.Series([X[c].value_counts().index[0]
if X[c].dtype == np.dtype('O') else X[c].mean() for c in X],
index=X.columns)
return self
def transform(self, X, y=None):
return X.fillna(self.fill)
# test
# impute X_train
my_imputer = MyImputer()
X_train_imputed = my_imputer.fit_transform(X_train_transformed)
# check missing data after imputation
getMissingData(X_train_imputed)
X_train_imputed.shape
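## A minimal sanity check on the learned fill values (assuming the MyImputer class
## above): the numeric entries of the fill Series should equal the column means.
numeric_cols = X_train_transformed.select_dtypes(include=np.number).columns
assert (my_imputer.fill[numeric_cols] == X_train_transformed[numeric_cols].mean()).all()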
We will one-hot encode every categorical column whose cardinality exceeds a specified threshold; with the default threshold of 0, all categorical columns are encoded.
class MyEncoder(BaseEstimator, TransformerMixin):
def __init__(self, cardinality_threshold=0):
self.cardinality_threshold=cardinality_threshold
self.cat_cols = []
self.one_hot_encoder = OneHotEncoder(handle_unknown='ignore')
def fit(self, X, y=None):
# get all the categorial column names
self.cat_cols = list(X.columns[X.dtypes == 'O'])
self.cat_cols = list(X[self.cat_cols].columns[X[self.cat_cols].nunique() > self.cardinality_threshold])
self.one_hot_encoder.fit(X[self.cat_cols])
return(self)
def transform(self, X, y=None):
# one hot encoder transform
X_encoded = pd.DataFrame(self.one_hot_encoder.transform(X[self.cat_cols]).toarray(),
index=X.index,
columns=self.one_hot_encoder.get_feature_names_out())
#print('The shape of X_encoded is {}'.format(X_encoded.shape))
# delete original categorical columns
X_dropped = X.drop(self.cat_cols,axis=1)
#print('The shape of X_dropped is {}'.format(X_dropped.shape))
# concat
return(pd.concat([X_dropped,X_encoded],axis=1))
my_encoder = MyEncoder()
my_encoder.fit(X_train)
X_encoded = my_encoder.transform(X_train)
X_encoded.shape
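## Because the encoder uses handle_unknown='ignore', a category seen only at
## prediction time is encoded as an all-zero row instead of raising an error.
## A toy sketch (the 'Color' column is made up, not from this dataset):
toy_train = pd.DataFrame({'Color': ['red', 'blue', 'red']})
toy_encoder = MyEncoder()
toy_encoder.fit(toy_train)
print(toy_encoder.transform(pd.DataFrame({'Color': ['green']})))  # all-zero row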
pipe_ranf = Pipeline([('ColumnFilter_ranf', ColumnFilter()),
('MyImputer_ranf', MyImputer()),
('MyEncoder_ranf', MyEncoder()),
('model_ranf', RandomForestRegressor())])
grid_params_ranf = {
'ColumnFilter_ranf__threshold':[5],
'model_ranf__n_estimators':[500],
'model_ranf__max_depth':[12,14,16],
'model_ranf__criterion':['squared_error'],
'model_ranf__min_samples_leaf':[5,10,20],
'model_ranf__oob_score':[True],
'model_ranf__random_state':[2021]
}
ranf_cv_tune = GridSearchCV(estimator=pipe_ranf,
param_grid=grid_params_ranf,
scoring='neg_mean_squared_error',
cv=5)
# log transform y
ranf_cv_tune.fit(X_train, np.log1p(y_train))
ranf_best = ranf_cv_tune.best_estimator_
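## Inspect which hyperparameter combination the grid search selected:
print(ranf_cv_tune.best_params_)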
pred_valid = ranf_best.predict(X_valid)
def MSE(y_valid,y_pred):
res = np.square(np.subtract(y_valid,y_pred)).mean()
return(res)
MSE(np.log1p(y_valid), pred_valid)
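## Since the model was trained on np.log1p(SalePrice), the square root of this
## value is an RMSE on the log scale -- essentially the metric the Kaggle
## competition reports.
np.sqrt(MSE(np.log1p(y_valid), pred_valid))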
# test
pipe_ranf2 = Pipeline([('ColumnFilter_ranf', ColumnFilter(5)),
('MyImputer_ranf', MyImputer()),
('MyEncoder_ranf', MyEncoder()),])
X_transformed = pipe_ranf2.fit_transform(X_train)
ranf2 = RandomForestRegressor()
grid_params_ranf2 = {
'n_estimators':[500],
'max_depth':[4,6,10,14],
'criterion':['squared_error'],
'min_samples_leaf':[5,10,20],
'oob_score':[True],
'random_state':[2021]
}
grid2 = GridSearchCV(estimator=ranf2,
param_grid=grid_params_ranf2,
scoring='neg_mean_squared_error',
cv=5)
grid2.fit(X_transformed.values, np.log1p(y_train))
grid2.best_estimator_
# refit a fresh model with the hyperparameters selected by grid2
ranf_best = RandomForestRegressor(
n_estimators=500,
max_depth=14,
criterion='squared_error',
min_samples_leaf=5,
oob_score=True,
random_state=2021
)
ranf_best.fit(X_transformed.values, np.log1p(y_train))
X_valid_transformed = pipe_ranf2.transform(X_valid)
pred_value = ranf_best.predict(X_valid_transformed.values)
MSE_valid = MSE(np.log1p(y_valid), pred_value); MSE_valid
# define the inverse transform for y: np.expm1 undoes np.log1p
def trans_Y(Y):
    return(np.expm1(Y))
test_raw_transformed = pipe_ranf2.transform(test_raw)
pred_test = ranf_best.predict(test_raw_transformed.values)
pred_test_reversed = trans_Y(pred_test)
output = pd.DataFrame({'Id': test_raw.index,
'SalePrice': pred_test_reversed})
output.to_csv('submission.csv', index=False)
import xgboost as xgb
pipe_xgb = Pipeline([('ColumnFilter_ranf', ColumnFilter()),
('MyImputer_ranf', MyImputer()),
('MyEncoder_ranf', MyEncoder()),
('XGBoost', xgb.XGBRegressor())])
grid_params_xgb = {
'ColumnFilter_ranf__threshold':[5],
'XGBoost__n_estimators':[200,500],
'XGBoost__max_depth':[2,4],
'XGBoost__min_child_weight': [5,8],
'XGBoost__learning_rate': [0.1,0.05],
'XGBoost__gamma':[0.01,0.1],
'XGBoost__reg_lambda': [0.1,0.5,2,5],
'XGBoost__random_state': [2022]
}
xgb_cv_tune = GridSearchCV(estimator=pipe_xgb,
param_grid=grid_params_xgb,
scoring='neg_mean_squared_error',
cv=5)
xgb_cv_tune.fit(X_train,np.log1p(y_train))
xgb_best = xgb_cv_tune.best_estimator_;xgb_best
pred_xgb = xgb_best.predict(X_valid)
MSE_valid = MSE(np.log1p(y_valid), pred_xgb);MSE_valid
# xgb_best is the full fitted pipeline, so it takes the raw test data
pred_test = xgb_best.predict(test_raw)
pred_test_reversed = trans_Y(pred_test)
output = pd.DataFrame({'Id': test_raw.index,
'SalePrice': pred_test_reversed})
output.to_csv('submission_xgboost.csv', index=False)
from sklearn.preprocessing import RobustScaler
from sklearn.linear_model import Lasso
pipe_lasso = Pipeline([('ColumnFilter_ranf', ColumnFilter()),
('MyImputer_ranf', MyImputer()),
('MyEncoder_ranf', MyEncoder()),
('RobustScaler', RobustScaler()),
('Lasso', Lasso())
])
grid_params_lasso = {
'ColumnFilter_ranf__threshold':[5],
'Lasso__alpha':[0.01,0.05,0.1,1,2],
'Lasso__random_state':[2021],
'Lasso__selection':['random'],
'Lasso__max_iter': [1000,1500,2000]
}
lasso_cv_tune = GridSearchCV(estimator=pipe_lasso,
param_grid=grid_params_lasso,
scoring='neg_mean_squared_error',
cv=5)
lasso_cv_tune.fit(X_train, np.log1p(y_train))
lasso_best = lasso_cv_tune.best_estimator_
lasso_best.fit(X_train, np.log1p(y_train))  # redundant: GridSearchCV already refits best_estimator_ on the full training split
lasso_pred = lasso_best.predict(X_valid)
MSE(np.log1p(y_valid), lasso_pred)
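## One advantage of Lasso here: the L1 penalty zeroes out many of the one-hot
## columns. We can inspect this through the fitted pipeline step named 'Lasso':
lasso_coefs = lasso_best.named_steps['Lasso'].coef_
print('{} of {} features kept (nonzero coefficients)'.format(
    (lasso_coefs != 0).sum(), lasso_coefs.size))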
from sklearn.kernel_ridge import KernelRidge
pipe_ridge = Pipeline([('ColumnFilter_ranf', ColumnFilter()),
('MyImputer_ranf', MyImputer()),
('MyEncoder_ranf', MyEncoder()),
('RobustScaler', RobustScaler()),
('KernelRidge', KernelRidge())
])
grid_params_ridge = {
'ColumnFilter_ranf__threshold':[5],
'KernelRidge__alpha':[0.01,0.1,0.5],
'KernelRidge__kernel':['polynomial','linear'],
'KernelRidge__degree': [3],
'KernelRidge__coef0': [2,2.5,3]
}
ridge_cv_tune = GridSearchCV(estimator=pipe_ridge,
param_grid=grid_params_ridge,
scoring='neg_mean_squared_error',
cv=5)
ridge_cv_tune.fit(X_train, np.log1p(y_train))
ridge_best = ridge_cv_tune.best_estimator_
ridge_cv_tune.scorer_
ridge_pred = ridge_best.predict(X_valid)
MSE(np.log1p(y_valid), ridge_pred)
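## Check which kernel and regularization strength won the grid search:
print(ridge_cv_tune.best_params_)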
from sklearn.linear_model import ElasticNet
pipe_elastic = Pipeline([('ColumnFilter_ranf', ColumnFilter()),
('MyImputer_ranf', MyImputer()),
('MyEncoder_ranf', MyEncoder()),
('RobustScaler', RobustScaler()),
('ElasticNet', ElasticNet())
])
grid_params_elastic = {
'ColumnFilter_ranf__threshold':[5],
'ElasticNet__alpha':[0.01,0.1,0.5,0.6,0.8],
'ElasticNet__l1_ratio':[0.1,0.3,0.5,0.7,1]
}
elastic_cv_tune = GridSearchCV(estimator=pipe_elastic,
param_grid=grid_params_elastic,
scoring='neg_mean_squared_error',
cv=5)
elastic_cv_tune.fit(X_train, np.log1p(y_train))
elastic_best = elastic_cv_tune.best_estimator_
elastic_pred = elastic_best.predict(X_valid)
MSE(np.log1p(y_valid), elastic_pred)
pred_elastic = elastic_best.predict(test_raw)
pred_test_reversed = trans_Y(pred_elastic)
output = pd.DataFrame({'Id': test_raw.index,
'SalePrice': pred_test_reversed})
output.to_csv('submission_elastic.csv', index=False)
from sklearn.ensemble import VotingRegressor
votingR = VotingRegressor(estimators=[
('xgb', xgb_best),
('lasso',lasso_best),
('elastic', elastic_best)],
n_jobs=4)
votingR = votingR.fit(X_train, np.log1p(y_train))
voting_pred = votingR.predict(X_valid)
MSE(np.log1p(y_valid), voting_pred)
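## An optional sanity check using the cross_val_score imported at the top: a
## 5-fold CV estimate of the ensemble's log-scale MSE on the training split
## (this refits the whole ensemble five times, so it is slow).
cv_scores = cross_val_score(votingR, X_train, np.log1p(y_train),
                            scoring='neg_mean_squared_error', cv=5)
print(-cv_scores.mean())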
pred_test = votingR.predict(test_raw)
pred_test_reversed = trans_Y(pred_test)
output = pd.DataFrame({'Id': test_raw.index,
'SalePrice': pred_test_reversed})
output.to_csv('submission_ensemble_average.csv', index=False)