def check_load(df, num_records, num_columns):
    # Takes a dataframe and compares record and column counts to input
    # Message to return if the criteria below aren't met
    message = 'Validation Failed'
    # Check number of records
    if num_records == df.count():
        # Check number of columns
        if num_columns == len(df.columns):
            # Success message
            message = 'Validation Passed'
    return message

# Print the data validation message
print(check_load(df, 5000, 74))

# Create a list of actual dtypes to check
actual_dtypes_list = df.dtypes
print(actual_dtypes_list)

# Iterate through the list of actual dtypes tuples
for attribute_tuple in actual_dtypes_list:
    # Check if column name is in the dictionary of expected dtypes
    col_name = attribute_tuple[0]
    if col_name in validation_dict:
        # Compare attribute types
        col_type = attribute_tuple[1]
        if col_type == validation_dict[col_name]:
            print(col_name + ' has expected dtype.')
# Mean: pyspark.sql.functions.mean(col)
from pyspark.sql.functions import mean, stddev, skewness
df.agg({'SALESCLOSEPRICE': 'mean'}).collect()

# Skewness: pyspark.sql.functions.skewness(col)
print(df.agg({'LISTPRICE': 'skewness'}).collect())

# Minimum: pyspark.sql.functions.min(col)

# Covariance: cov(col1, col2)
df.cov('SALESCLOSEPRICE', 'YEARBUILT')

# Correlation: corr(col1, col2)
# Name and value of col with max corr
corr_max = 0
corr_max_col = columns[0]

# Loop to check all columns contained in list
for col in columns:
    # Check the correlation of a pair of columns
    corr_val = df.corr(col, 'SALESCLOSEPRICE')
    # Logic to compare corr_max with current corr_val
    if corr_max < corr_val:
        # Update the column name and corr value
        corr_max = corr_val
        corr_max_col = col

print(corr_max_col)
To plot with packages like seaborn, the PySpark DataFrame has to be converted to a pandas DataFrame, so remember to sample first (the first argument of sample() controls whether to sample with replacement).
# Sample 50% of the PySpark DataFrame and count rows
df.sample(False, 0.5, 42).count()

import seaborn as sns

# Sample the dataframe
sample_df = df.select(['SALESCLOSEPRICE']).sample(False, 0.5, 42)

# Convert the sample to a Pandas DataFrame
pandas_df = sample_df.toPandas()

# Plot it
sns.distplot(pandas_df)
You can also use lmplot() (or jointplot() with kind='reg', as below) to draw a linear model plot of two columns; a quick lmplot() sketch follows the code block below. If the two columns show a linear relationship, they are good candidates to include in our analysis. If they don't, it doesn't mean we should throw them out; it means we may have to process or wrangle them before they can be used.
# Create new feature by adding two features together
df = df.withColumn('Total_SQFT', df['SQFTBELOWGROUND'] + df['SQFTABOVEGROUND'])

# Create additional new feature using previously created feature
df = df.withColumn('BATHS_PER_1000SQFT', df['BATHSTOTAL'] / (df['Total_SQFT'] / 1000))
df[['BATHS_PER_1000SQFT']].describe().show()

import matplotlib.pyplot as plt

# Sample and convert to a pandas dataframe
pandas_df = df.sample(False, 0.5, 0).toPandas()

# Linear model plots (r2 is the R^2 statistic function defined in the course exercise)
sns.jointplot(x='Total_SQFT', y='SALESCLOSEPRICE', data=pandas_df, kind="reg", stat_func=r2)
plt.show()

sns.jointplot(x='BATHS_PER_1000SQFT', y='SALESCLOSEPRICE', data=pandas_df, kind="reg", stat_func=r2)
plt.show()
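The prose above mentions lmplot(); with the pandas_df sampled in the block above (it keeps all columns), an equivalent sketch looks like this, assuming LISTPRICE is still present at this point:

# Linear model plot of list price against sales close price
sns.lmplot(x='LISTPRICE', y='SALESCLOSEPRICE', data=pandas_df)
plt.show()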
Another chart type is the countplot, which counts records per category (here, listings per day of the week):
# Import needed functions
from pyspark.sql.functions import to_date, dayofweek
import matplotlib.pyplot as plt

# Convert to date type
df = df.withColumn('LISTDATE', to_date('LISTDATE'))

# Get the day of the week
df = df.withColumn('List_Day_of_Week', dayofweek('LISTDATE'))

# Sample and convert to a pandas dataframe
sample_df = df.sample(False, 0.5, 42).toPandas()

# Plot a count plot of day of week
sns.countplot(x="List_Day_of_Week", data=sample_df)
plt.show()
Dropping Data
Where can data go bad?
Recorded wrong
Unique events
Formatted incorrectly
Duplications
Missing
Not relevant
For this dataset: 'NO' is an auto-generated record number, 'UNITNUMBER' holds irrelevant data, and 'CLASS' is constant for every record. Spark's drop() works much like the pandas DataFrame version, except for the * operator, which unpacks the list so that drop() is applied to every column in it.
# List of columns to drop
cols_to_drop = ['NO', 'UNITNUMBER', 'CLASS']

# Drop the columns
df = df.drop(*cols_to_drop)
You can also drop rows based on their values:
Text Filtering
isin() is similar to like() but allows us to pass a list of values to use as a filter rather than a single one.
# Inspect unique values in the column 'ASSUMABLEMORTGAGE'
df.select(['ASSUMABLEMORTGAGE']).distinct().show()

# List of possible values containing 'yes'
yes_values = ['Yes w/ Qualifying', 'Yes w/No Qualifying']

# Filter the text values out of df but keep null values
text_filter = ~df['ASSUMABLEMORTGAGE'].isin(yes_values) | df['ASSUMABLEMORTGAGE'].isNull()
df = df.where(text_filter)

# Print count of remaining records
print(df.count())
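For comparison, like() takes a single SQL-style pattern instead of a list; a minimal sketch of the syntax (the '%Yes%' pattern is illustrative, and it would need to run before the filter above to return matches):

# Rows whose ASSUMABLEMORTGAGE value contains 'Yes'
yes_like_df = df.where(df['ASSUMABLEMORTGAGE'].like('%Yes%'))
yes_like_df.select('ASSUMABLEMORTGAGE').distinct().show()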
Outlier Filtering
from pyspark.sql.functions import mean, stddev

# Calculate values used for filtering
std_val = df.agg({'SALESCLOSEPRICE': 'stddev'}).collect()[0][0]
mean_val = df.agg({'SALESCLOSEPRICE': 'mean'}).collect()[0][0]

# Create three standard deviation (mean ± 3 * stddev) upper and lower bounds for the data
hi_bound = mean_val + (3 * std_val)
low_bound = mean_val - (3 * std_val)

# Use where() to filter the DataFrame between the bounds
df = df.where((df['SALESCLOSEPRICE'] < hi_bound) & (df['SALESCLOSEPRICE'] > low_bound))
Drop NA / NULL
DataFrame.dropna(how='any', thresh=None, subset=None)
how: 'any' or 'all'. If 'any', drop a record if it contains any nulls. If 'all', drop a record only if all of its values are null.
thresh: int, default None. If specified, drop records that have fewer than thresh non-null values. This overrides the how parameter.
subset: optional list of column names to consider.
# Drop any records with NULL values
df = df.dropna()

# Drop records only if both LISTPRICE and SALESCLOSEPRICE are NULL
df = df.dropna(how='all', subset=['LISTPRICE', 'SALESCLOSEPRICE'])

# Drop records that have fewer than two non-null values
df = df.dropna(thresh=2)
Drop Duplicates
Because Spark stores data in a distributed fashion, this operation is not guaranteed to keep the first occurrence of a duplicate.
# Entire DataFrame
df.dropDuplicates()

# Check only a list of columns
df.dropDuplicates(['streetaddress'])
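If a deterministic "keep the first occurrence" behavior is needed, one common workaround (not from the course; the ordering column is an assumption) is to rank rows inside each duplicate group with a window and keep rank 1:

from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

# Keep the earliest listing per street address (ordering by LISTDATE is an assumption)
w = Window.partitionBy('streetaddress').orderBy('LISTDATE')
deduped_df = df.withColumn('rn', row_number().over(w)) \
               .where('rn = 1') \
               .drop('rn')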
Adjusting Data
"Data does not give up its secrets easily, it must be tortured to confess."——Jeff Hooper, Bell Labs.
Min-Max Scaling
$x^{*}_{i,j} = \dfrac{x_{i,j} - x_{j}^{\min}}{x_{j}^{\max} - x_{j}^{\min}}$
# Define min and max values and collect them
max_days = df.agg({'DAYSONMARKET': 'max'}).collect()[0][0]
min_days = df.agg({'DAYSONMARKET': 'min'}).collect()[0][0]

# Create a new column based off the scaled data
df = df.withColumn("scaled_days", (df['DAYSONMARKET'] - min_days) / (max_days - min_days))
df[['scaled_days']].show(5)
Write a wrapper:
def min_max_scaler(df, cols_to_scale):
    # Takes a dataframe and list of columns to minmax scale. Returns a dataframe.
    for col in cols_to_scale:
        # Define min and max values and collect them
        max_days = df.agg({col: 'max'}).collect()[0][0]
        min_days = df.agg({col: 'min'}).collect()[0][0]
        new_column_name = 'scaled_' + col
        # Create a new column based off the scaled data
        df = df.withColumn(new_column_name, (df[col] - min_days) / (max_days - min_days))
    return df

df = min_max_scaler(df, cols_to_scale)

# Show that our data is now between 0 and 1
df[['DAYSONMARKET', 'scaled_DAYSONMARKET']].show()
Standardization - Z transform
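For reference, the z-transform computed below is the standard formula, written to mirror the min-max notation above, where $\bar{x}_j$ and $\sigma_j$ are the mean and standard deviation of column $j$:

$z_{i,j} = \dfrac{x_{i,j} - \bar{x}_j}{\sigma_j}$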
# Calculate values used for standardization
mean_days = df.agg({'DAYSONMARKET': 'mean'}).collect()[0][0]
stddev_days = df.agg({'DAYSONMARKET': 'stddev'}).collect()[0][0]

# Create a new column with the scaled data
df = df.withColumn("ztrans_days", (df['DAYSONMARKET'] - mean_days) / stddev_days)

# Verify: the mean should be ~0 and the stddev ~1
df.agg({'ztrans_days': 'mean'}).collect()
df.agg({'ztrans_days': 'stddev'}).collect()
Log Scaling
The figure below (omitted here) showed a typical positively skewed distribution; a log transform pulls it toward normal.
# Import the log function
from pyspark.sql.functions import log

# Recalculate the log of SALESCLOSEPRICE
df = df.withColumn('log_SalesClosePrice', log(df['SALESCLOSEPRICE']))
User Defined Scaling
For example, a home's days on market can also be expressed as a percentage of the longest time on market.
from pyspark.sql.functions import round

# Define max and min values and collect them
max_days = df.agg({'DAYSONMARKET': 'max'}).collect()[0][0]
min_days = df.agg({'DAYSONMARKET': 'min'}).collect()[0][0]

# Create a new column based off the scaled data
df = df.withColumn('percentagesscaleddays',
                   round((df['DAYSONMARKET'] - min_days) / (max_days - min_days) * 100))

# Calc max and min for new column
print(df.agg({'percentagesscaleddays': 'max'}).show())
print(df.agg({'percentagesscaleddays': 'min'}).show())
YEARBUILT is negatively skewed, so reflect it around the maximum year and then apply a log transform:
from pyspark.sql.functions import log

# Compute the skewness
print(df.agg({'YEARBUILT': 'skewness'}).collect())

# Calculate the max year
max_year = df.agg({'YEARBUILT': 'max'}).collect()[0][0]

# Create a new column of reflected data
df = df.withColumn('Reflect_YearBuilt', (max_year + 1) - df['YEARBUILT'])

# Create a new column based on the reflected data
df = df.withColumn('adj_yearbuilt', 1 / log(df['Reflect_YearBuilt']))
Missing Value
How does data go missing in the digital age?
Data Collection
- Broken Sensors
Data Storage Rules
- 2017-01-01 vs January 1st, 2017
Joining Disparate Data
- Monthly to Weekly
Intentionally Missing
- Privacy Concerns
Types of Missing
Missing completely at random
Missing Data is just a completely random subset
Missing at random
Missing conditionally at random based on another observation
Missing not at random
Data is missing because of how it is collected
When to drop rows with missing data?
Missing values are rare
Missing Completely at Random
isNull()
df.where(df['ROOF'].isNull()).count()
True if the current expression is null.
Drop a column when its share of missing values exceeds a threshold:
def column_dropper(df, threshold):
    # Takes a dataframe and threshold for missing values. Returns a dataframe.
    total_records = df.count()
    for col in df.columns:
        # Calculate the percentage of missing values
        missing = df.where(df[col].isNull()).count()
        missing_percent = missing / total_records
        # Drop column if percent of missing is more than threshold
        if missing_percent > threshold:
            df = df.drop(col)
    return df

# Drop columns that are more than 60% missing
df = column_dropper(df, 0.6)
Plotting Missing Values
# Import library
import seaborn as sns

# Subset the dataframe
sub_df = df.select(['ROOMAREA1'])

# Sample the dataframe
sample_df = sub_df.sample(False, .5, 4)

# Convert to a Pandas DataFrame
pandas_df = sample_df.toPandas()

# Plot it; isnull() converts the DataFrame into True/False values
sns.heatmap(data=pandas_df.isnull())
In a heatmap like this, for example, you can see that the second column is entirely missing.
Imputation of Missing Values
Process of replacing missing values
Rule Based: Value based on business logic
Statistics Based: Using mean, median, etc
Model Based: Use model to predict value
fillna(value, subset=None)
value: the value to replace missing values with
subset: the list of column names in which to replace missing values
# Replace missing values with zero
df.fillna(0, subset=['DAYSONMARKET'])

# Replace with the mean value for that column
col_mean = df.agg({'DAYSONMARKET': 'mean'}).collect()[0][0]
df.fillna(col_mean, subset=['DAYSONMARKET'])
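The mean is sensitive to the same outliers filtered earlier; a median-based variant is a small extension (a sketch, not from the course) using approxQuantile():

# Approximate median of DAYSONMARKET (0.5 quantile, 1% relative error)
col_median = df.approxQuantile('DAYSONMARKET', [0.5], 0.01)[0]
df = df.fillna(col_median, subset=['DAYSONMARKET'])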
Getting more data
External Data Sources
Thoughts on External Data Sets
Pros
- Add important predictors
- Supplement/replace values
- Cheap or easy to obtain

Cons
- May 'bog' analysis down
- Easy to induce data leakage
- Become data set subject matter expert
Join - PySpark DataFrame
DataFrame.join(
    other,      # Other DataFrame to merge
    on=None,    # The keys to join on
    how=None)   # Type of join to perform (default is 'inner')

# Inspect dataframe head
hdf.show(2)

# Specify the join condition
cond = [df['OFFMARKETDATE'] == hdf['dt']]

# Join hdf onto df
df = df.join(hdf, on=cond, how='left')

# How many sales occurred on bank holidays?
df.where(~df['nm'].isNull()).count()
Join - SparkSQL Join
# Register the dataframes as temp tables
df.createOrReplaceTempView("df")
hdf.createOrReplaceTempView("hdf")

# Write a SQL statement
sql_df = spark.sql("""
    SELECT *
    FROM df
    LEFT JOIN hdf
    ON df.OFFMARKETDATE = hdf.dt
    """)
Please keep in mind that PySpark's week starts on Sunday, with a value of 1 and ends on Saturday, a value of 7.
from pyspark.sql.functions import to_date

# Cast the data type to Date
df = df.withColumn('LISTDATE', to_date('LISTDATE'))

# Inspect the field
df[['LISTDATE']].show(2)

from pyspark.sql.functions import year, month

# Create a new column of year number
df = df.withColumn('LIST_YEAR', year('LISTDATE'))

# Create a new column of month number
df = df.withColumn('LIST_MONTH', month('LISTDATE'))

from pyspark.sql.functions import dayofmonth, weekofyear

# Create a new column of the day number within the month
df = df.withColumn('LIST_DAYOFMONTH', dayofmonth('LISTDATE'))

# Create a new column of the week number within the year
df = df.withColumn('LIST_WEEKOFYEAR', weekofyear('LISTDATE'))

from pyspark.sql.functions import datediff

# Calculate difference between two date fields
df = df.withColumn('DAYSONMARKET', datediff('OFFMARKETDATE', 'LISTDATE'))
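Because Sunday is 1 and Saturday is 7, a weekend indicator can be derived from dayofweek(); a minimal sketch (the column name 'List_Weekend' is an assumption):

from pyspark.sql.functions import dayofweek, when

# Flag listings that hit the market on Sunday (1) or Saturday (7)
df = df.withColumn('List_Weekend',
                   when(dayofweek('LISTDATE').isin([1, 7]), 1).otherwise(0))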
Lagging Features
Window(): returns a record based off a group of records
lag(col, count=1): returns the value that is offset by count rows before the current row
from pyspark.sql.functions import lag
from pyspark.sql.window import Window

# Create a window ordered by date
w = Window().orderBy(m_df['DATE'])

# Create a lagged column, offset by one row (week)
m_df = m_df.withColumn('MORTGAGE-1wk', lag('MORTGAGE', 1).over(w))

# Inspect results
m_df.show(3)
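A typical use of the lagged column is a week-over-week change feature; a small sketch (the column name 'MORTGAGE_wk_change' is an assumption):

# Difference between the current and previous week's mortgage rate
m_df = m_df.withColumn('MORTGAGE_wk_change', m_df['MORTGAGE'] - m_df['MORTGAGE-1wk'])
m_df.show(3)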
Extract Features
Extract with Text Match
from pyspark.sql.functions import when

# Create boolean filters
find_under_8 = df['ROOF'].like('%Age 8 Years or Less%')
find_over_8 = df['ROOF'].like('%Age Over 8 Years%')

# Apply filters using when() and otherwise()
df = df.withColumn('old_roof', (when(find_over_8, 1)
                                .when(find_under_8, 0)
                                .otherwise(None)))

df[['ROOF', 'old_roof']].show(3, truncate=100)
Split Columns
from pyspark.sql.functions import split

# Split the column on commas into a list
split_col = split(df['ROOF'], ',')

# Put the first value of the list into a new column
df = df.withColumn('Roof_Material', split_col.getItem(0))

# Inspect results
df[['ROOF', 'Roof_Material']].show(5, truncate=100)
Explode
Pivot
# Explode & Pivot!
from pyspark.sql.functions import split, explode, lit, coalesce, first

# Split the column on commas into a list
df = df.withColumn('roof_list', split(df['ROOF'], ', '))

# Explode list into new records for each value
ex_df = df.withColumn('ex_roof_list', explode(df['roof_list']))

# Create a dummy column of constant value
ex_df = ex_df.withColumn('constant_val', lit(1))

# Pivot the values into boolean columns
piv_df = ex_df.groupBy('NO').pivot('ex_roof_list') \
              .agg(coalesce(first('constant_val')))
Next, join the pivoted result back onto the original df:
# Pivot
piv_df = ex_df.groupBy('NO').pivot('ex_garage_list').agg(coalesce(first('constant_val')))

# Join the dataframes together and fill null
joined_df = df.join(piv_df, on='NO', how='left')

# Columns to zero fill
zfill_cols = piv_df.columns

# Zero fill the pivoted values
zfilled_df = joined_df.fillna(0, subset=zfill_cols)
from pyspark.ml.feature import Binarizer

# Cast the data type to double
df = df.withColumn('FIREPLACES', df['FIREPLACES'].cast('double'))

# Create binarizing transformer
binarizer = Binarizer(threshold=0.0, inputCol='FIREPLACES', outputCol='FireplaceT')

# Apply the transformer
df = binarizer.transform(df)

# Inspect the results
df[['FIREPLACES', 'FireplaceT']].show(3)
Bucketing
Define a set of split points and bin the values together. For example, if a distplot shows only a few records in the upper bins, consider merging them into a single bucket.
from pyspark.ml.feature import Bucketizer

# Define how to split data
splits = [0, 1, 2, 3, 4, float('Inf')]

# Create bucketing transformer
buck = Bucketizer(splits=splits, inputCol='BATHSTOTAL', outputCol='baths')

# Apply transformer
df = buck.transform(df)

# Inspect results
df[['BATHSTOTAL', 'baths']].show(4)
# Create variables for max and min dates in our dataset
max_date = df.agg({'OFFMKTDATE': 'max'}).collect()[0][0]
min_date = df.agg({'OFFMKTDATE': 'min'}).collect()[0][0]

# Find how many days our data spans (the collected values are Python dates)
range_in_days = (max_date - min_date).days

# Find the date to split the dataset on
from datetime import timedelta
split_in_days = round(range_in_days * 0.8)
split_date = min_date + timedelta(days=split_in_days)

# Split the data into 80% train, 20% test
train_df = df.where(df['OFFMKTDATE'] < split_date)
test_df = df.where(df['OFFMKTDATE'] >= split_date) \
            .where(df['LISTDATE'] >= split_date)
Write a wrapper:
from datetime import timedelta

def train_test_split_date(df, split_col, test_days=45):
    """Calculate the date to split test and training sets"""
    # Find the max and min dates in the data
    max_date = df.agg({split_col: 'max'}).collect()[0][0]
    min_date = df.agg({split_col: 'min'}).collect()[0][0]
    # Subtract an integer number of days from the last date in the dataset
    split_date = max_date - timedelta(days=test_days)
    return split_date

# Find the date to use in splitting test and train
split_date = train_test_split_date(df, 'OFFMKTDATE')

# Create sequential test and training sets
train_df = df.where(df['OFFMKTDATE'] < split_date)
test_df = df.where(df['OFFMKTDATE'] >= split_date).where(df['LISTDATE'] <= split_date)
Time Series Data Leakage
Data leakage will cause your model to have very optimistic accuracy metrics, but once real data is run through it, the results are often very disappointing.
DAYSONMARKET only reflects what information we have at the time of predicting the value. I.e., if the house is still on the market, we don't know how many more days it will stay on the market. We need to adjust our test_df to reflect what information we currently have as of 2017-12-10.
NOTE: This example will use the lit() function. This function is used to allow single values where an entire column is expected in a function call.
Thinking critically about what information would be available at the time of prediction is crucial in having accurate model metrics and saves a lot of embarrassment down the road if decisions are being made based off your results!
from pyspark.sql.functions import lit, datediff, to_date

split_date = to_date(lit('2017-12-10'))

# Create sequential test set
test_df = df.where(df['OFFMKTDATE'] >= split_date).where(df['LISTDATE'] <= split_date)

# Create a copy of DAYSONMARKET to review later
test_df = test_df.withColumn('DAYSONMARKET_Original', test_df['DAYSONMARKET'])

# Recalculate DAYSONMARKET from what we know on our split date
test_df = test_df.withColumn('DAYSONMARKET', datediff(split_date, 'LISTDATE'))

# Review the difference
test_df[['LISTDATE', 'OFFMKTDATE', 'DAYSONMARKET_Original', 'DAYSONMARKET']].show()
Dataframe Columns to Feature Vectors
from pyspark.ml.feature import VectorAssembler

# Replace missing values
df = df.fillna(-1)

# Define the columns to be converted to vectors
features_cols = list(df.columns)

# Remove the dependent variable from the list
features_cols.remove('SALESCLOSEPRICE')

# Create the vector assembler transformer
vec = VectorAssembler(inputCols=features_cols, outputCol='features')

# Apply the vector transformer to data
df = vec.transform(df)

# Select only the feature vectors and the dependent variable
ml_ready_df = df.select(['SALESCLOSEPRICE', 'features'])

# Inspect results
ml_ready_df.show(5)
Drop Columns with Low Observations
A common rule of thumb is to drop binary features with fewer than about 30 positive observations, since they carry little statistical significance. Removing low observation features is helpful in many ways: it can improve the processing speed of model training, prevent overfitting by coincidence, and help interpretability by reducing the number of things to consider.
obs_threshold = 30
cols_to_remove = list()

# Inspect first 10 binary columns in list
for col in binary_cols[0:10]:
    # Count the number of 1 values in the binary column
    obs_count = df.agg({col: 'sum'}).collect()[0][0]
    # If less than our observation threshold, remove
    if obs_count <= obs_threshold:
        cols_to_remove.append(col)

# Drop columns and print starting and ending dataframe shapes
new_df = df.drop(*cols_to_remove)

print('Rows: ' + str(df.count()) + ' Columns: ' + str(len(df.columns)))
print('Rows: ' + str(new_df.count()) + ' Columns: ' + str(len(new_df.columns)))
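The binary_cols list above comes from the exercise environment; one way to build it yourself, under the assumption that a binary column is one whose distinct values fall within {0, 1}, is sketched below (it scans every column, so it can be slow on wide data):

# Collect names of columns whose distinct values are only 0/1
binary_cols = []
for col_name in df.columns:
    distinct_vals = [row[0] for row in df.select(col_name).distinct().collect()]
    if set(distinct_vals) <= {0, 1}:
        binary_cols.append(col_name)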
Random Forest, Naively Handling Missing and Categorical Values
Random Forests are friendly to missing values, do not need min-max scaling, are insensitive to skewness, and do not require one-hot encoding. Missing values are handled by Random Forests internally, where they partition on missing values. As long as you replace them with something outside of the range of normal values, they will be handled correctly.
Likewise, categorical features only need to be mapped to numbers; they are fine to stay all in one column by using a StringIndexer, as we saw in chapter 3. One-hot encoding, which converts each possible value to its own boolean feature, is not needed.
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

# Replace missing values
df = df.fillna(-1, subset=['WALKSCORE', 'BIKESCORE'])

# Create list of StringIndexers using list comprehension
indexers = [StringIndexer(inputCol=column, outputCol=column + "_IDX")
            .setHandleInvalid("keep") for column in categorical_cols]

# Create pipeline of indexers
indexer_pipeline = Pipeline(stages=indexers)

# Fit and transform the pipeline on the original data
df_indexed = indexer_pipeline.fit(df).transform(df)

# Clean up redundant columns
df_indexed = df_indexed.drop(*categorical_cols)

# Inspect data transformations
print(df_indexed.dtypes)
Building Model
Training / Predicting with a Random Forest
from pyspark.ml.regression import RandomForestRegressor

# Initialize model with columns to utilize
rf = RandomForestRegressor(featuresCol="features",
                           labelCol="SALESCLOSEPRICE",
                           predictionCol="Prediction_Price",
                           seed=42)

# Train model
model = rf.fit(train_df)

# Make predictions
predictions = model.transform(test_df)

# Inspect results
predictions.select("Prediction_Price", "SALESCLOSEPRICE").show(5)
from pyspark.ml.regression import GBTRegressor

# Initialize a Gradient Boosted Trees (GBT) model
gbt = GBTRegressor(featuresCol='features',
                   labelCol="SALESCLOSEPRICE",
                   predictionCol="Prediction_Price",
                   seed=42)

# Train model
model = gbt.fit(train_df)
Evaluate a Model
from pyspark.ml.evaluation import RegressionEvaluator

# Select columns to compute test error
evaluator = RegressionEvaluator(labelCol="SALESCLOSEPRICE",
                                predictionCol="Prediction_Price")

# Create evaluation metrics
rmse = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})

# Print model metrics
print('RMSE: ' + str(rmse))
print('R^2: ' + str(r2))
Write a wrapper:
from pyspark.ml.evaluation import RegressionEvaluator

# Select columns to compute test error
evaluator = RegressionEvaluator(labelCol='SALESCLOSEPRICE',
                                predictionCol='Prediction_Price')

# Dictionary of model predictions to loop over
models = {'Gradient Boosted Trees': gbt_predictions,
          'Random Forest Regression': rfr_predictions}

for key, preds in models.items():
    # Create evaluation metrics
    rmse = evaluator.evaluate(preds, {evaluator.metricName: 'rmse'})
    r2 = evaluator.evaluate(preds, {evaluator.metricName: 'r2'})
    # Print model metrics
    print(key + ' RMSE: ' + str(rmse))
    print(key + ' R^2: ' + str(r2))
R^2 is comparable across predictions regardless of dependent variable.
RMSE is comparable across predictions looking at the same dependent variable.
RMSE is a measure of unexplained variance in the dependent variable.
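For reference, over n test records with actual values $y_i$ and predictions $\hat{y}_i$, RMSE has the standard definition:

$\mathrm{RMSE} = \sqrt{\dfrac{1}{n}\sum_{i=1}^{n}\left(y_i - \hat{y}_i\right)^2}$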
Interpret a Model
import pandas as pd

# Convert feature importances to a pandas column
fi_df = pd.DataFrame(model.featureImportances.toArray(), columns=['importance'])

# Convert list of feature names to pandas column
fi_df['feature'] = pd.Series(feature_cols)

# Sort the data based on feature importance
fi_df.sort_values(by=['importance'], ascending=False, inplace=True)

# Inspect results
fi_df.head(9)
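To visualize the ranking, a quick seaborn barplot over the pandas frame built above works (a sketch; it relies on the 'importance' and 'feature' columns of fi_df):

import seaborn as sns
import matplotlib.pyplot as plt

# Plot the top 9 features by importance
sns.barplot(x='importance', y='feature', data=fi_df.head(9))
plt.show()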
Save and Load the Model
from pyspark.ml.regression import RandomForestRegressionModel

# Save model
model.save('rfr_no_listprice')

# Load model
loaded_model = RandomForestRegressionModel.load('rfr_no_listprice')