Creating the connection is as simple as creating an instance of the SparkContext class. The class constructor takes a few optional arguments that allow you to specify the attributes of the cluster you're connecting to.
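For example, a minimal sketch of building that connection yourself (the master URL "local[*]" and the app name are placeholder values, not part of the original exercises):

# Import SparkConf and SparkContext
from pyspark import SparkConf, SparkContext

# Describe the cluster to connect to (local mode here as a placeholder)
conf = SparkConf().setMaster("local[*]").setAppName("flights_notes")

# Create the connection to the cluster
sc = SparkContext(conf=conf)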
To start working with Spark DataFrames, you first have to create a SparkSession object from your SparkContext. You can think of the SparkContext as your connection to the cluster and the SparkSession as your interface with that connection.
Create a SparkSession
# Import SparkSession from pyspark.sql
from pyspark.sql import SparkSession

# Create my_spark (returns the existing session if one is already running)
my_spark = SparkSession.builder.getOrCreate()

# Print the tables in the catalog
print(my_spark.catalog.listTables())
SparkSession has an attribute called catalog which lists all the data inside the cluster. This attribute has a few methods for extracting different pieces of information.
One of the most useful is the .listTables() method, which returns the names of all the tables in your cluster as a list.
Pandas Dataframe <-> Spark Dataframe
The .createDataFrame() method takes a pandas DataFrame and returns a Spark DataFrame; .toPandas() does the opposite.
# Convert the results to a pandas DataFrame
pd_counts = flight_counts.toPandas()
The contents of a Spark DataFrame created this way are stored locally, not in the SparkSession catalog. This means that you can use all the Spark DataFrame methods on it, but you can't access the data in other contexts. For example, you can't query it with .sql() directly; you first have to register it as a temporary view with .createTempView() or .createOrReplaceTempView().
# Import pandas and numpy
import numpy as np
import pandas as pd

# Create pd_temp
pd_temp = pd.DataFrame(np.random.random(10))

# Create spark_temp from pd_temp
spark_temp = spark.createDataFrame(pd_temp)

# Examine the tables in the catalog
print(spark.catalog.listTables())

# Add spark_temp to the catalog
spark_temp.createOrReplaceTempView("temp")

# Examine the tables in the catalog again
print(spark.catalog.listTables())
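Once the temporary view is registered, .sql() queries can reference it by name. A minimal follow-up sketch (the query itself is just illustrative):

# Query the newly registered view with SQL
query_result = spark.sql("SELECT * FROM temp")

# Show the result
query_result.show()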
The .filter() method takes either a string containing an expression that could follow the WHERE clause of a SQL query, or a Spark Column of boolean (True/False) values.
For example, the following two expressions will produce the same output:
# Filter flights by passing a string
long_flights1 = flights.filter("distance > 1000")

# Filter flights by passing a column of boolean values
long_flights2 = flights.filter(flights.distance > 1000)
Select
The .select() method takes multiple arguments, one for each column you want to select. These arguments can either be the column name as a string or a column object (using the df.colName syntax).
The difference between the .select() and .withColumn() methods is that .select() returns only the columns you specify, while .withColumn() returns all the columns of the DataFrame in addition to the one you defined (a short .withColumn() sketch follows the select examples below).
# Select the first set of columns
selected1 = flights.select("tailnum", "origin", "dest")

# Select the second set of columns
temp = flights.select(flights.origin, flights.dest, flights.carrier)

# Define first filter
filterA = flights.origin == "SEA"

# Define second filter
filterB = flights.dest == "PDX"

# Filter the data, first by filterA then by filterB
selected2 = temp.filter(filterA).filter(filterB)
When you're selecting a column using the df.colName notation, you can perform any column operation, and .select() will return the transformed column. For example, flights.select((flights.air_time / 60).alias("duration_hrs")) returns the flight durations in hours instead of minutes, renamed with .alias(). The equivalent Spark DataFrame method, .selectExpr(), takes SQL expressions as a string:
flights.selectExpr("air_time/60 as duration_hrs")
Here the SQL as keyword is the equivalent of the .alias() method. To select multiple columns, you can pass multiple strings.
# Define avg_speed
avg_speed = (flights.distance / (flights.air_time / 60)).alias("avg_speed")

# Select the correct columns
speed1 = flights.select("origin", "dest", "tailnum", avg_speed)

# Create the same table using a SQL expression
speed2 = flights.selectExpr("origin", "dest", "tailnum", "distance/(air_time/60) as avg_speed")
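To contrast with .select(), here is a minimal .withColumn() sketch: it keeps every existing column of flights and appends the new one (the same duration_hrs column also shows up in the groupBy examples below).

# Add duration_hrs while keeping all the other columns
flights_hrs = flights.withColumn("duration_hrs", flights.air_time / 60)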
Groupby
df.groupBy().min("col").show()
This creates a GroupedData object (so you can use the .min() method), then finds the minimum value in col, and returns it as a DataFrame.
# Find the shortest flight from PDX in terms of distance
flights.filter(flights.origin == "PDX").groupBy().min("distance").show()

# Find the longest flight from SEA in terms of air time
flights.filter(flights.origin == "SEA").groupBy().max("air_time").show()

# Average duration of Delta flights
flights.filter(flights.carrier == "DL").filter(flights.origin == "SEA").groupBy().avg("air_time").show()

# Total hours in the air
flights.withColumn("duration_hrs", flights.air_time / 60).groupBy().sum("duration_hrs").show()

# Group by tailnum
by_plane = flights.groupBy("tailnum")

# Number of flights each plane made
by_plane.count().show()

# Group by origin
by_origin = flights.groupBy("origin")

# Average duration of flights from PDX and SEA
by_origin.avg("air_time").show()
You can also apply the aggregate functions from the pyspark.sql.functions submodule, such as F.stddev(), on top of a GroupedData object via the .agg() method.
# Import pyspark.sql.functions as F
import pyspark.sql.functions as F

# Group by month and dest
by_month_dest = flights.groupBy("month", "dest")

# Average departure delay by month and destination
by_month_dest.avg("dep_delay").show()

# Standard deviation of departure delay
by_month_dest.agg(F.stddev("dep_delay")).show()
Join
# Rename the faa column
airports = airports.withColumnRenamed("faa", "dest")

# Join the DataFrames
flights_with_airports = flights.join(airports, on="dest", how="leftouter")
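The model_data DataFrame and the plane_age feature used in the modeling steps below presumably come from a similar join against a planes table; a hypothetical sketch (the planes DataFrame, its year column, and the year column on flights are assumptions, not shown in these notes):

# Rename the year column on planes to avoid a clash with flights.year (assumed columns)
planes = planes.withColumnRenamed("year", "plane_year")

# Join the flights and planes data on the tail number
model_data = flights.join(planes, on="tailnum", how="leftouter")

# Derive the plane's age at the time of the flight
model_data = model_data.withColumn("plane_age", model_data.year - model_data.plane_year)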
Cast
Spark only accepts numeric data for modeling, so every column has to be a double or an integer. To convert the type of a column using the .cast() method, you can write code like this:
# Create is_late
model_data = model_data.withColumn("is_late", model_data.arr_delay > 0)

# Convert to an integer
model_data = model_data.withColumn("label", model_data.is_late.cast("integer"))
OneHotEncoder, StringIndexer
# Import the feature transformers
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# Create a StringIndexer
carr_indexer = StringIndexer(inputCol="carrier", outputCol="carrier_index")

# Create a OneHotEncoder
carr_encoder = OneHotEncoder(inputCol="carrier_index", outputCol="carrier_fact")
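The pipeline below also references dest_indexer and dest_encoder; a sketch following the same pattern for the dest column (the stage and output-column names mirror the carrier ones and are assumptions here):

# Create a StringIndexer for the destination airport
dest_indexer = StringIndexer(inputCol="dest", outputCol="dest_index")

# Create a OneHotEncoder for the indexed destination
dest_encoder = OneHotEncoder(inputCol="dest_index", outputCol="dest_fact")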
VectorAssembler
# Import VectorAssembler
from pyspark.ml.feature import VectorAssembler

# Make a VectorAssembler
vec_assembler = VectorAssembler(inputCols=["month", "air_time", "carrier_fact", "dest_fact", "plane_age"], outputCol="features")
Pipeline
# Import Pipeline
from pyspark.ml import Pipeline

# Make the pipeline
flights_pipe = Pipeline(stages=[dest_indexer, dest_encoder, carr_indexer, carr_encoder, vec_assembler])

# Fit and transform the data
piped_data = flights_pipe.fit(model_data).transform(model_data)
Split the data
# Split the data into training and test sets
training, test = piped_data.randomSplit([.6, .4])
Cross validation
# Import the evaluation submodule
import pyspark.ml.evaluation as evals

# Create a BinaryClassificationEvaluator
evaluator = evals.BinaryClassificationEvaluator(metricName="areaUnderROC")
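The tuning code below refers to lr, a logistic regression estimator that is never created in these notes; a minimal sketch, assuming the default label and features column names produced by the pipeline above:

# Import and create the logistic regression estimator
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression()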
Tuning
# Import numpy and the tuning submodule
import numpy as np
import pyspark.ml.tuning as tune

# Create the parameter grid
grid = tune.ParamGridBuilder()

# Add the hyperparameters
grid = grid.addGrid(lr.regParam, np.arange(0, .1, .01))
grid = grid.addGrid(lr.elasticNetParam, [0, 1])

# Build the grid
grid = grid.build()

# Create the CrossValidator
cv = tune.CrossValidator(estimator=lr,
                         estimatorParamMaps=grid,
                         evaluator=evaluator)

# Fit cross validation models
models = cv.fit(training)

# Extract the best model
best_lr = models.bestModel
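As a follow-up, the held-out test set and the evaluator can be used to check the selected model; a minimal sketch:

# Use the best model to generate predictions on the test set
test_results = best_lr.transform(test)

# Evaluate the predictions (area under the ROC curve)
print(evaluator.evaluate(test_results))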