def check_load(df, num_records, num_columns):
    # Takes a dataframe and compares record and column counts to input
    # Message to return if the criteria below aren't met
    message = 'Validation Failed'
    # Check number of records
    if num_records == df.count():
        # Check number of columns
        if num_columns == len(df.columns):
            # Success message
            message = 'Validation Passed'
    return message
# Print the data validation message
print(check_load(df, 5000, 74))
# create list of actual dtypes to check
actual_dtypes_list = df.dtypes
print(actual_dtypes_list)
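The loop below assumes a validation_dict mapping column names to expected dtype strings. A minimal, hypothetical sketch of what that dictionary might look like (column names and types are illustrative, not taken from the actual dataset):
# Hypothetical dictionary of expected dtypes, keyed by column name
validation_dict = {'NO': 'bigint',
                   'LISTPRICE': 'double',
                   'SALESCLOSEPRICE': 'double',
                   'LISTDATE': 'string'}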
# Iterate through the list of actual dtypes tuples
for attribute_tuple in actual_dtypes_list:
    # Check if column name is in the dictionary of expected dtypes
    col_name = attribute_tuple[0]
    if col_name in validation_dict:
        # Compare attribute types
        col_type = attribute_tuple[1]
        if col_type == validation_dict[col_name]:
            print(col_name + ' has expected dtype.')
#Mean
pyspark.sql.functions.mean(col)
df.agg({'SALESCLOSEPRICE': 'mean'}).collect()
from pyspark.sql.functions import mean, stddev
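The mean and stddev functions imported above can also be passed directly to agg() instead of the dict syntax; a minimal sketch:
# Aggregate using the imported column functions
df.agg(mean('SALESCLOSEPRICE'), stddev('SALESCLOSEPRICE')).show()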
#Skewness
pyspark.sql.functions.skewness(col)
from pyspark.sql.functions import skewness
print(df.agg({'LISTPRICE': 'skewness'}).collect())
#Minimum
pyspark.sql.functions.min(col)
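For completeness, a quick usage sketch of the minimum aggregation in the same dict style used above:
# Minimum sale price in the dataset
df.agg({'SALESCLOSEPRICE': 'min'}).collect()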
# Covariance (DataFrame method)
DataFrame.cov(col1, col2)
df.cov('SALESCLOSEPRICE', 'YEARBUILT')
# Correlation (DataFrame method)
DataFrame.corr(col1, col2)
# Name and value of col with max corr
corr_max = 0
corr_max_col = columns[0]  # 'columns' is a pre-defined list of numeric column names
# Loop to check all columns contained in list
for col in columns:
    # Check the correlation of a pair of columns
    corr_val = df.corr(col, 'SALESCLOSEPRICE')
    # Logic to compare corr_max with current corr_val
    if corr_max < corr_val:
        # Update the column name and corr value
        corr_max = corr_val
        corr_max_col = col
print(corr_max_col)
If you want to plot with a package like seaborn, you have to convert to a pandas DataFrame, so be sure to sample the data down first (the first argument of sample() controls whether sampling is with replacement).
# Sample 50% of the PySpark DataFrame and count rows
df.sample(False, 0.5, 42).count()
import seaborn as sns
import matplotlib.pyplot as plt
# Sample the dataframe
sample_df = df.select(['SALESCLOSEPRICE']).sample(False, 0.5, 42)
# Convert the sample to a Pandas DataFrame
pandas_df = sample_df.toPandas()
# Plot it
sns.distplot(pandas_df)
You can also use lmplot() to draw a linear model plot of two columns. If the two features trend together, they are good candidates to include in our analysis. If they don't, it doesn't mean we should throw them out; it means we may have to process or wrangle them before they can be used.
# Create new feature by adding two features together
df = df.withColumn('Total_SQFT', df['SQFTBELOWGROUND'] + df['SQFTABOVEGROUND'])
# Create additional new feature using previously created feature
df = df.withColumn('BATHS_PER_1000SQFT', df['BATHSTOTAL'] / (df['Total_SQFT'] / 1000))
df[['BATHS_PER_1000SQFT']].describe().show()
# Pandas dataframe
pandas_df = df.sample(False, 0.5, 0).toPandas()
# Linear model plots (r2 is a pre-defined statistic function; stat_func requires an older seaborn version)
sns.jointplot(x='Total_SQFT', y='SALESCLOSEPRICE', data=pandas_df, kind="reg", stat_func=r2)
plt.show()
sns.jointplot(x='BATHS_PER_1000SQFT', y='SALESCLOSEPRICE', data=pandas_df, kind="reg", stat_func=r2)
plt.show()
There is also the kind of plot shown in the figure above, called a count plot; for example, counting listings by the day of the week they were listed:
# Import needed functions
from pyspark.sql.functions import to_date, dayofweek
# Convert to date type
df = df.withColumn('LISTDATE', to_date('LISTDATE'))
# Get the day of the week
df = df.withColumn('List_Day_of_Week', dayofweek('LISTDATE'))
# Sample and convert to pandas dataframe
sample_df = df.sample(False, 0.5, 42).toPandas()
# Plot count plot of of day of week
sns.countplot(x="List_Day_of_Week", data=sample_df)
plt.show()
Dropping Data
Where can data go bad?
- Recorded wrong
- Unique events
- Formatted incorrectly
- Duplications
- Missing
- Not relevant
For this dataset: 'NO' is an auto-generated record number, 'UNITNUMBER' is irrelevant data, and 'CLASS' is constant for all records. Spark's drop() is very similar to the pandas version, except for the *, which unpacks the list so drop() is applied to every column in it.
# List of columns to drop
cols_to_drop = ['NO', 'UNITNUMBER', 'CLASS']
# Drop the columns
df = df.drop(*cols_to_drop)
You can also pass the column names to drop() directly, e.g. df.drop('NO', 'UNITNUMBER', 'CLASS').
Text Filtering
isin() is similar to like() but allows us to pass a list of values to use as a filter rather than a single one.
# Inspect unique values in the column 'ASSUMABLEMORTGAGE'
df.select(['ASSUMABLEMORTGAGE']).distinct().show()
# List of possible values containing 'yes'
yes_values = ['Yes w/ Qualifying', 'Yes w/No Qualifying']
# Filter the text values out of df but keep null values
text_filter = ~df['ASSUMABLEMORTGAGE'].isin(yes_values) | df['ASSUMABLEMORTGAGE'].isNull()
df = df.where(text_filter)
# Print count of remaining records
print(df.count())
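For contrast, a minimal sketch of the analogous like() filter, which takes a single SQL-style pattern rather than a list (shown purely for syntax; on the already-filtered df above it would match nothing):
# like() matches one pattern at a time; here, any value containing 'Yes'
df.where(df['ASSUMABLEMORTGAGE'].like('%Yes%')).count()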
Outlier Filtering
from pyspark.sql.functions import mean, stddev
# Calculate values used for filtering
std_val = df.agg({'SALESCLOSEPRICE': 'stddev'}).collect()[0][0]
mean_val = df.agg({'SALESCLOSEPRICE': 'mean'}).collect()[0][0]
# Create three standard deviation (μ ± 3σ) upper and lower bounds for data
hi_bound = mean_val + (3 * std_val)
low_bound = mean_val - (3 * std_val)
# Use where() to filter the DataFrame between values
df = df.where((df['SALESCLOSEPRICE'] < hi_bound) & (df['SALESCLOSEPRICE'] > low_bound))
Drop NA / NULL
DataFrame.dropna(how='any', thresh=None, subset=None)
how: 'any' or 'all'. If 'any', drop a record if it contains any nulls. If 'all', drop a record only if all its values are null.
thresh: int, default None. If specified, drop records that have fewer than thresh non-null values. This overrides the how parameter.
subset: optional list of column names to consider.
# Drop any records with NULL values
df = df.dropna()
# Drop records only if both LISTPRICE and SALESCLOSEPRICE are NULL
df = df.dropna(how='all', subset=['LISTPRICE', 'SALESCLOSEPRICE'])
# Drop records that have fewer than 2 non-null values
df = df.dropna(thresh=2)
Drop Duplicates
Because Spark stores data in a distributed fashion, this operation is not guaranteed to keep the first occurrence of a duplicate.
# Entire DataFrame
df.dropDuplicates()
# Check only a column list
df.dropDuplicates(['streetaddress'])
Adjusting Data
"Data does not give up its secrets easily, it must be tortured to confess."——Jeff Hooper, Bell Labs.
Min-Max Scaling
x^*_{i,j} = \frac{x_{i,j} - x_j^{\min}}{x_j^{\max} - x_j^{\min}}
# define min and max values and collect them
max_days = df.agg({'DAYSONMARKET': 'max'}).collect()[0][0]
min_days = df.agg({'DAYSONMARKET': 'min'}).collect()[0][0]
# create a new column based off the scaled data
df = df.withColumn("scaled_days",
(df['DAYSONMARKET'] - min_days) / (max_days - min_days))
df[['scaled_days']].show(5)
Here is a wrapper function:
def min_max_scaler(df, cols_to_scale):
    # Takes a dataframe and list of columns to minmax scale. Returns a dataframe.
    for col in cols_to_scale:
        # Define min and max values and collect them
        max_days = df.agg({col: 'max'}).collect()[0][0]
        min_days = df.agg({col: 'min'}).collect()[0][0]
        new_column_name = 'scaled_' + col
        # Create a new column based off the scaled data
        df = df.withColumn(new_column_name,
                           (df[col] - min_days) / (max_days - min_days))
    return df
# List of columns to scale (here just one, matching the output below)
cols_to_scale = ['DAYSONMARKET']
df = min_max_scaler(df, cols_to_scale)
# Show that our data is now between 0 and 1
df[['DAYSONMARKET', 'scaled_DAYSONMARKET']].show()
Standardization - Z transform
# Define mean and standard deviation values and collect them
mean_days = df.agg({'DAYSONMARKET': 'mean'}).collect()[0][0]
stddev_days = df.agg({'DAYSONMARKET': 'stddev'}).collect()[0][0]
# Create a new column with the scaled data
df = df.withColumn("ztrans_days",
(df['DAYSONMARKET'] - mean_days) / stddev_days)
# Verify the transformed mean is ~0 and the stddev is ~1
df.agg({'ztrans_days': 'mean'}).collect()
df.agg({'ztrans_days': 'stddev'}).collect()
Log Scaling
The figure below shows a typical positively skewed distribution.
# import the log function
from pyspark.sql.functions import log
# Recalculate log of SALESCLOSEPRICE
df = df.withColumn('log_SalesClosePrice', log(df['SALESCLOSEPRICE']))
User Defined Scaling
For example, with housing data you can also express a home's time on the market as a percentage of the observed range.
# Define max and min values and collect them
max_days = df.agg({'DAYSONMARKET': 'max'}).collect()[0][0]
min_days = df.agg({'DAYSONMARKET': 'min'}).collect()[0][0]
# Create a new column based off the scaled data (round here is pyspark.sql.functions.round)
from pyspark.sql.functions import round
df = df.withColumn('percentage_scaled_days',
                   round((df['DAYSONMARKET'] - min_days) / (max_days - min_days) * 100))
# Calc max and min for new column
df.agg({'percentage_scaled_days': 'max'}).show()
df.agg({'percentage_scaled_days': 'min'}).show()
For the year built, which is negatively skewed, reflect the values and then take the log:
from pyspark.sql.functions import log
# Compute the skewness
print(df.agg({'YEARBUILT': 'skewness'}).collect())
# Calculate the max year
max_year = df.agg({'YEARBUILT': 'max'}).collect()[0][0]
# Create a new column of reflected data
df = df.withColumn('Reflect_YearBuilt', (max_year + 1) - df['YEARBUILT'])
# Create a new column based on the reflected data
df = df.withColumn('adj_yearbuilt', 1 / log(df['Reflect_YearBuilt']))
Missing Value
How does data go missing in the digital age?
Data Collection
- Broken Sensors
Data Storage Rules
- 2017-01-01 vs January 1st, 2017
Joining Disparate Data
- Monthly to Weekly
Intentionally Missing
- Privacy Concerns
Types of Missing
Missing completely at random
- Missing data is just a completely random subset
Missing at random
- Missing conditionally at random based on another observation
Missing not at random
- Data is missing because of how it is collected
When to drop rows with missing data?
- Missing values are rare
- Missing Completely at Random
isNull(): True if the current expression is null.
# Count records where ROOF is null
df.where(df['ROOF'].isNull()).count()
Drop a column when its share of missing values exceeds a threshold:
def column_dropper(df, threshold):
    # Takes a dataframe and threshold for missing values. Returns a dataframe.
    total_records = df.count()
    for col in df.columns:
        # Calculate the percentage of missing values
        missing = df.where(df[col].isNull()).count()
        missing_percent = missing / total_records
        # Drop column if percent of missing is more than threshold
        if missing_percent > threshold:
            df = df.drop(col)
    return df
# Drop columns that are more than 60% missing
df = column_dropper(df, 0.6)
Plotting Missing Values
# Import library
import seaborn as sns
# Subset the dataframe
sub_df = df.select(['ROOMAREA1'])
# Sample the dataframe
sample_df = sub_df.sample(False, .5, 4)
# Convert to Pandas DataFrame
pandas_df = sample_df.toPandas()
# Plot it; isnull() converts the DataFrame to True/False values
sns.heatmap(data=pandas_df.isnull())
In this figure, for example, you can see that the second column is entirely missing.
Imputation of Missing Value
Process of replacing missing values
Rule Based: Value based on business logic
Statistics Based: Using mean, median, etc
Model Based: Use model to predict value
fillna(value, subset=None)
value: the value to replace missing values with
subset: optional list of column names in which to replace missing values
# Replacing missing values with zero
df.fillna(0, subset=['DAYSONMARKET'])
# Replacing with the mean value for that column
col_mean = df.agg({'DAYSONMARKET': 'mean'}).collect()[0][0]
df.fillna(col_mean, subset=['DAYSONMARKET'])
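A hedged sketch of statistics-based imputation across several columns, plus a median-based alternative using approxQuantile() (the column choices are illustrative, not from the course):
# Mean-impute a couple of numeric columns (illustrative column list)
for c in ['DAYSONMARKET', 'SQFTABOVEGROUND']:
    c_mean = df.agg({c: 'mean'}).collect()[0][0]
    df = df.fillna(c_mean, subset=[c])
# Median-based alternative using an approximate quantile (relative error 0.01)
median_val = df.approxQuantile('SQFTBELOWGROUND', [0.5], 0.01)[0]
df = df.fillna(median_val, subset=['SQFTBELOWGROUND'])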
Getting more data
External DataSource
Thoughts on External Data Sets
Pros:
- Add important predictors
- Supplement/replace values
- Cheap or easy to obtain
Cons:
- May 'bog' analysis down
- Easy to induce data leakage
- Become data set subject matter expert
Join - PySpark DataFrame
DataFrame.join(other, on=None, how=None)
other: the other DataFrame to merge
on: the keys to join on
how: the type of join to perform (default is 'inner')
# Inspect dataframe head
hdf.show(2)
# Specify join condition
cond = [df['OFFMARKETDATE'] == hdf['dt']]
# Left join hdf onto df
df = df.join(hdf, on=cond, how='left')
# How many sales occurred on bank holidays?
df.where(~df['nm'].isNull()).count()
Join - SparkSQL Join
# Register the dataframe as a temp table
df.createOrReplaceTempView("df")
hdf.createOrReplaceTempView("hdf")
# Write a SQL Statement
sql_df = spark.sql("""
    SELECT *
    FROM df
    LEFT JOIN hdf
    ON df.OFFMARKETDATE = hdf.dt
""")
Please keep in mind that PySpark's week starts on Sunday, with a value of 1 and ends on Saturday, a value of 7.
from pyspark.sql.functions import to_date
# Cast the data type to Date
df = df.withColumn('LISTDATE', to_date('LISTDATE'))
# Inspect the field
df[['LISTDATE']].show(2)
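To confirm the Sunday = 1 through Saturday = 7 convention mentioned above, a quick sketch (assuming date_format's 'E' pattern returns the abbreviated day name):
from pyspark.sql.functions import dayofweek, date_format
# Pair each day number with its abbreviated name to verify the convention
df.select(dayofweek('LISTDATE').alias('day_num'),
          date_format('LISTDATE', 'E').alias('day_name')).distinct().show()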
from pyspark.sql.functions import year, month
# Create a new column of year number
df = df.withColumn('LIST_YEAR', year('LISTDATE'))
# Create a new column of month number
df = df.withColumn('LIST_MONTH', month('LISTDATE'))
from pyspark.sql.functions import dayofmonth, weekofyear
# Create new columns of the day number within the month
df = df.withColumn('LIST_DAYOFMONTH', dayofmonth('LISTDATE'))
# Create new columns of the week number within the year
df = df.withColumn('LIST_WEEKOFYEAR', weekofyear('LISTDATE'))
from pyspark.sql.functions import datediff
# Calculate difference between two date fields
df = df.withColumn('DAYSONMARKET', datediff('OFFMARKETDATE', 'LISTDATE'))
Lagging Features
Window(): defines a window (group of records) over which to compute
lag(col, count=1): returns the value that is offset by count rows before the current row
from pyspark.sql.functions import lag
from pyspark.sql.window import Window
# Create Window
w = Window().orderBy(m_df['DATE'])
# Create lagged column
m_df = m_df.withColumn('MORTGAGE-1wk', lag('MORTGAGE', count=1).over(w))
# Inspect results
m_df.show(3)
Extract Features
Extract with Text Match
from pyspark.sql.functions import when
# Create boolean filters
find_under_8 = df['ROOF'].like('%Age 8 Years or Less%')
find_over_8 = df['ROOF'].like('%Age Over 8 Years%')
# Apply filters using when() and otherwise()
df = df.withColumn('old_roof', (when(find_over_8, 1)
.when(find_under_8, 0)
.otherwise(None)))
df[['ROOF', 'old_roof']].show(3, truncate=100)
Split Columns
from pyspark.sql.functions import split
# Split the column on commas into a list
split_col = split(df['ROOF'], ',')
# Put the first value of the list into a new column
df = df.withColumn('Roof_Material', split_col.getItem(0))
# Inspect results
df[['ROOF', 'Roof_Material']].show(5, truncate=100)
Explode
Pivot
# Explode & Pivot!
from pyspark.sql.functions import split, explode, lit, coalesce, first
# Split the column on commas into a list
df = df.withColumn('roof_list', split(df['ROOF'], ', '))
# Explode list into new records for each value
ex_df = df.withColumn('ex_roof_list', explode(df['roof_list']))
# Create a dummy column of constant value
ex_df = ex_df.withColumn('constant_val', lit(1))
# Pivot the values into boolean columns
piv_df = ex_df.groupBy('NO').pivot('ex_roof_list')\
.agg(coalesce(first('constant_val')))
Next, join the pivoted result back onto the original df:
# Pivot (same as above)
piv_df = ex_df.groupBy('NO').pivot('ex_roof_list').agg(coalesce(first('constant_val')))
# Join the dataframes together and fill null
joined_df = df.join(piv_df, on='NO', how='left')
# Columns to zero fill
zfill_cols = piv_df.columns
# Zero fill the pivoted values
zfilled_df = joined_df.fillna(0, subset=zfill_cols)
from pyspark.ml.feature import Binarizer
# Cast the data type to double
df = df.withColumn('FIREPLACES', df['FIREPLACES'].cast('double'))
# Create binarizing transformer
bin = Binarizer(threshold=0.0, inputCol='FIREPLACES', outputCol='FireplaceT')
# Apply the transformer
df = bin.transform(df)
# Inspect the results
df[['FIREPLACES','FireplaceT']].show(3)
Bucketing
Define a set of split points and bin the values together. For example, when the distribution plot shows only a few observations in the trailing bins, consider merging them into a single bucket.
from pyspark.ml.feature import Bucketizer
# Define how to split data
splits = [0, 1, 2, 3, 4, float('Inf')]
# Create bucketing transformer
buck = Bucketizer(splits=splits, inputCol='BATHSTOTAL', outputCol='baths')
# Apply transformer
df = buck.transform(df)
# Inspect results
df[['BATHSTOTAL', 'baths']].show(4)
# Create variables for max and min dates in our dataset
max_date = df.agg({'OFFMKTDATE': 'max'}).collect()[0][0]
min_date = df.agg({'OFFMKTDATE': 'min'}).collect()[0][0]
# Find how many days our data spans (the collected values are Python datetimes)
range_in_days = (max_date - min_date).days
# Find the date to split the dataset on
from datetime import timedelta
split_in_days = round(range_in_days * 0.8)
split_date = min_date + timedelta(days=split_in_days)
# Split the data into 80% train, 20% test
train_df = df.where(df['OFFMKTDATE'] < split_date)
test_df = df.where(df['OFFMKTDATE'] >= split_date)\
.where(df['LISTDATE'] >= split_date)
Here is a wrapper:
def train_test_split_date(df, split_col, test_days=45):
    """Calculate the date to split test and training sets"""
    # Find the first and last dates in the dataset
    max_date = df.agg({split_col: 'max'}).collect()[0][0]
    min_date = df.agg({split_col: 'min'}).collect()[0][0]
    # Subtract an integer number of days from the last date in dataset
    split_date = max_date - timedelta(days=test_days)
    return split_date
# Find the date to use in splitting test and train
split_date = train_test_split_date(df, 'OFFMKTDATE')
# Create Sequential Test and Training Sets
train_df = df.where(df['OFFMKTDATE'] < split_date)
test_df = df.where(df['OFFMKTDATE'] >= split_date).where(df['LISTDATE'] <= split_date)
Time Series Data Leakage
Data leakage will cause your model to have very optimistic metrics for accuracy but once real data is run through it the results are often very disappointing.
DAYSONMARKET only reflects what information we have at the time of predicting the value. I.e., if the house is still on the market, we don't know how many more days it will stay on the market. We need to adjust our test_df to reflect what information we currently have as of 2017-12-10.
NOTE: This example will use the lit() function. This function is used to allow single values where an entire column is expected in a function call.
Thinking critically about what information would be available at the time of prediction is crucial in having accurate model metrics and saves a lot of embarrassment down the road if decisions are being made based off your results!
from pyspark.sql.functions import lit, datediff, to_date
split_date = to_date(lit('2017-12-10'))
# Create Sequential Test set
test_df = df.where(df['OFFMKTDATE'] >= split_date).where(df['LISTDATE'] <= split_date)
# Create a copy of DAYSONMARKET to review later
test_df = test_df.withColumn('DAYSONMARKET_Original', test_df['DAYSONMARKET'])
# Recalculate DAYSONMARKET from what we know on our split date
test_df = test_df.withColumn('DAYSONMARKET', datediff(split_date, 'LISTDATE'))
# Review the difference
test_df[['LISTDATE', 'OFFMKTDATE', 'DAYSONMARKET_Original', 'DAYSONMARKET']].show()
Dataframe Columns to Feature Vectors
from pyspark.ml.feature import VectorAssembler
# Replace Missing values
df = df.fillna(-1)
# Define the columns to be converted to vectors
features_cols = list(df.columns)
# Remove the dependent variable from the list
features_cols.remove('SALESCLOSEPRICE')
# Create the vector assembler transformer
vec = VectorAssembler(inputCols=features_cols, outputCol='features')
# Apply the vector transformer to data
df = vec.transform(df)
# Select only the feature vectors and the dependent variable
ml_ready_df = df.select(['SALESCLOSEPRICE', 'features'])
# Inspect Results
ml_ready_df.show(5)
Drop Columns with Low Observations
Generally, drop binary features with fewer than about 30 positive observations, since they carry little statistical significance. Removing low observation features is helpful in many ways. It can improve processing speed of model training, prevent overfitting by coincidence and help interpretability by reducing the number of things to consider.
obs_threshold = 30
cols_to_remove = list()
# Inspect first 10 binary columns in list (binary_cols is a pre-defined list of 0/1 feature columns)
for col in binary_cols[0:10]:
    # Count the number of 1 values in the binary column
    obs_count = df.agg({col: 'sum'}).collect()[0][0]
    # If less than our observation threshold, remove
    if obs_count <= obs_threshold:
        cols_to_remove.append(col)
# Drop columns and print starting and ending dataframe shapes
new_df = df.drop(*cols_to_remove)
print('Rows: ' + str(df.count()) + ' Columns: ' + str(len(df.columns)))
print('Rows: ' + str(new_df.count()) + ' Columns: ' + str(len(new_df.columns)))
Random Forest, Naively Handling Missing and Categorical Values
Random forests are friendly to missing values, don't need min-max scaling, are insensitive to skewness, and don't require one-hot encoding. Missing values are handled by Random Forests internally, where they partition on missing values. As long as you replace them with something outside of the range of normal values, they will be handled correctly.
Likewise, categorical features only need to be mapped to numbers; they are fine to stay all in one column by using a StringIndexer, as we saw in chapter 3. OneHot encoding, which converts each possible value to its own boolean feature, is not needed.
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
# Replace missing values
df = df.fillna(-1, subset=['WALKSCORE', 'BIKESCORE'])
# Create list of StringIndexers using list comprehension
# (categorical_cols is a pre-defined list of categorical column names)
indexers = [StringIndexer(inputCol=column, outputCol=column+"_IDX")\
            .setHandleInvalid("keep") for column in categorical_cols]
# Create pipeline of indexers
indexer_pipeline = Pipeline(stages=indexers)
# Fit and Transform the pipeline to the original data
df_indexed = indexer_pipeline.fit(df).transform(df)
# Clean up redundant columns
df_indexed = df_indexed.drop(*categorical_cols)
# Inspect data transformations
print(df_indexed.dtypes)
Building Model
Training /Predicting with a Random Forest
from pyspark.ml.regression import RandomForestRegressor
# Initialize model with columns to utilize
rf = RandomForestRegressor(featuresCol="features",
labelCol="SALESCLOSEPRICE",
predictionCol="Prediction_Price",
seed=42
)
# Train model
model = rf.fit(train_df)
# Make predictions
predictions = model.transform(test_df)
# Inspect results
predictions.select("Prediction_Price", "SALESCLOSEPRICE").show(5)
from pyspark.ml.regression import GBTRegressor
# Train a Gradient Boosted Trees (GBT) model.
gbt = GBTRegressor(featuresCol='features',
labelCol="SALESCLOSEPRICE",
predictionCol="Prediction_Price",
seed=42
)
# Train model.
model = gbt.fit(train_df)
from pyspark.ml.evaluation import RegressionEvaluator
# Select columns to compute test error
evaluator = RegressionEvaluator(labelCol='SALESCLOSEPRICE',
predictionCol='Prediction_Price')
# Dictionary of model predictions (the transform() outputs of each model) to loop over
models = {'Gradient Boosted Trees': gbt_predictions, 'Random Forest Regression': rfr_predictions}
for key, preds in models.items():
    # Create evaluation metrics
    rmse = evaluator.evaluate(preds, {evaluator.metricName: 'rmse'})
    r2 = evaluator.evaluate(preds, {evaluator.metricName: 'r2'})
    # Print model metrics
    print(key + ' RMSE: ' + str(rmse))
    print(key + ' R^2: ' + str(r2))
R^2 is comparable across predictions regardless of dependent variable.
RMSE is comparable across predictions looking at the same dependent variable.
RMSE can be read as the typical size of the unexplained error in the dependent variable, expressed in that variable's own units.
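For reference, the standard definitions behind these metrics (where y_i are the actual values, \hat{y}_i the predictions, and \bar{y} the mean of the actuals):
RMSE = \sqrt{\tfrac{1}{n}\sum_{i=1}^{n}(y_i - \hat{y}_i)^2}
R^2 = 1 - \frac{\sum_{i}(y_i - \hat{y}_i)^2}{\sum_{i}(y_i - \bar{y})^2}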
Interpret a Model
import pandas as pd
# Convert feature importances to a pandas column
fi_df = pd.DataFrame(model.featureImportances.toArray(), columns=['importance'])
# Convert list of feature names to pandas column
fi_df['feature'] = pd.Series(feature_cols)
# Sort the data based on feature importance
fi_df.sort_values(by=['importance'], ascending=False, inplace=True)
# Interpret results
fi_df.head(9)
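A hedged sketch of visualizing the top importances with seaborn, consistent with the plotting done earlier (column names follow the fi_df built above):
import seaborn as sns
import matplotlib.pyplot as plt
# Plot the nine most important features
sns.barplot(x='importance', y='feature', data=fi_df.head(9))
plt.show()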
Save and Load the Model
from pyspark.ml.regression import RandomForestRegressionModel
# Save model
model.save('rfr_no_listprice')
# Load model
loaded_model = RandomForestRegressionModel.load('rfr_no_listprice')
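The loaded model can then be used exactly like the original; a minimal sketch, assuming the same test_df as above:
# Generate predictions with the reloaded model
predictions = loaded_model.transform(test_df)
predictions.select('Prediction_Price', 'SALESCLOSEPRICE').show(5)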