Introduction to PySpark

Connect to Python

Creating the connection is as simple as creating an instance of the SparkContext class. The class constructor takes a few optional arguments that allow you to specify the attributes of the cluster you're connecting to.

To start working with Spark DataFrames, you first have to create a SparkSession object from your SparkContext. You can think of the SparkContext as your connection to the cluster and the SparkSession as your interface with that connection.
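A minimal sketch (the master URL, app name, and variable names here are placeholders, not requirements):

```python
from pyspark import SparkContext
from pyspark.sql import SparkSession

# Connect to a cluster; master and appName are optional constructor arguments
sc = SparkContext(master="local[*]", appName="intro_to_pyspark")

# Get an existing SparkSession or, if there isn't one, create it
spark = SparkSession.builder.getOrCreate()

print(sc.version)
print(spark)
```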

Create a SparkSession

SparkSession has an attribute called catalog which lists all the data inside the cluster. This attribute has a few methods for extracting different pieces of information.

One of the most useful is the .listTables() method, which returns the names of all the tables in your cluster as a list.
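For instance, assuming the SparkSession from above is named spark:

```python
# Print all tables registered in the catalog
print(spark.catalog.listTables())
```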

Pandas DataFrame <-> Spark DataFrame

The .createDataFrame() method takes a pandas DataFrame and returns a Spark DataFrame. .toPandas() does the opposite.

The contents of a Spark DataFrame created this way are stored locally, not in the SparkSession catalog. This means that you can use all the Spark DataFrame methods on it, but you can't access the data in other contexts. For example, you can't query it with spark.sql() directly; you first have to register it with .createTempView() or .createOrReplaceTempView().
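A sketch of the round trip, using placeholder data and the view name "temp":

```python
import numpy as np
import pandas as pd

# A small pandas DataFrame with placeholder data
pd_temp = pd.DataFrame(np.random.random(10))

# pandas DataFrame -> Spark DataFrame
spark_temp = spark.createDataFrame(pd_temp)

# The new DataFrame is stored locally, so the catalog is still empty
print(spark.catalog.listTables())

# Register it as a temporary view named "temp"
spark_temp.createOrReplaceTempView("temp")

# Now the view appears in the catalog
print(spark.catalog.listTables())

# Spark DataFrame -> pandas DataFrame
pd_again = spark_temp.toPandas()
```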

In the code above, for example, the first .listTables() call returns [], while the second returns [Table(name='temp', database=None, description=None, tableType='TEMPORARY', isTemporary=True)].

withColumn

A Spark DataFrame is immutable. This means that it can't be changed, so columns can't be updated in place.

Thus, all these methods return a new DataFrame. To overwrite the original DataFrame, you must reassign the returned DataFrame, like so:
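A minimal sketch, assuming a hypothetical flights DataFrame df with an air_time column in minutes:

```python
# .withColumn() returns a new DataFrame; reassign it to keep the new column
df = df.withColumn("duration_hrs", df.air_time / 60)
```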

Filter

The .filter() method takes either a string containing an expression that would follow the WHERE clause of a SQL query, or a Spark Column of boolean (True/False) values.

For example, the following two expressions will produce the same output:
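Assuming the same hypothetical df with a distance column:

```python
# A SQL-style string expression...
long_flights1 = df.filter("distance > 1000")

# ...and a Spark Column of booleans select the same rows
long_flights2 = df.filter(df.distance > 1000)
```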

Select

The .select() method takes one argument for each column you want to select. These arguments can either be the column name as a string or a column object (using the df.colName syntax).

The difference between the .select() and .withColumn() methods is that .select() returns only the columns you specify, while .withColumn() returns all the columns of the DataFrame in addition to the one you defined.

When you're selecting a column using the df.colName notation, you can perform any column operation and the .select() method will return the transformed column.
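For example, a sketch with hypothetical origin, dest, distance, and air_time columns:

```python
# Define a derived column and rename it with .alias()
avg_speed = (df.distance / (df.air_time / 60)).alias("avg_speed")

# .select() returns only the listed columns, including the transformed one
speed1 = df.select("origin", "dest", avg_speed)
```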

The equivalent Spark DataFrame method .selectExpr() takes SQL expressions as a string:
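A sketch of the same calculation as above:

```python
# The same derived column, written as a SQL expression string
speed2 = df.selectExpr("origin", "dest", "distance/(air_time/60) as avg_speed")
```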

Here the SQL AS keyword is equivalent to the .alias() method. To select multiple columns, you can pass multiple strings.

Groupby

Calling .groupBy() creates a GroupedData object (so you can use the .min() method), which then finds the minimum value in col and returns it as a DataFrame.

You can also apply aggregate functions from the pyspark.sql.functions module, such as standard deviation, on top of the grouped data via .agg().
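A sketch, again with hypothetical flight columns:

```python
import pyspark.sql.functions as F

# Minimum distance per origin airport, returned as a DataFrame
df.groupBy("origin").min("distance").show()

# Aggregate functions from pyspark.sql.functions, applied with .agg()
df.groupBy("month", "dest").agg(F.stddev("dep_delay")).show()
```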

Join

Cast

Spark only accepts numeric data, so columns have to be cast to either double or integer. To convert the type of a column using the .cast() method, you can write code like this:
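A sketch, assuming hypothetical arr_delay and air_time columns stored as strings:

```python
# .cast() operates on a Column; .withColumn() swaps the converted column back in
df = df.withColumn("arr_delay", df.arr_delay.cast("integer"))
df = df.withColumn("air_time", df.air_time.cast("double"))
```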

For example, to make a boolean column:
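A sketch, using the hypothetical arr_delay column from above:

```python
# True when the flight arrived late, then cast to 0/1 for modeling
df = df.withColumn("is_late", df.arr_delay > 0)
df = df.withColumn("label", df.is_late.cast("integer"))
```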

OneHotEncoder, StringIndexer

VectorAssembler

Pipeline

Split the data

Cross validation

Tuning
