
Spark SQL

Window Function

# Load trainsched.txt
df = spark.read.csv("trainsched.txt", header=True)

# Create a temporary view called schedule so it can be queried with SQL
df.createOrReplaceTempView('schedule')

# Inspect the columns of the view
spark.sql("DESCRIBE schedule").show()

# Window functions
# LEAD lets a query look at following rows without a self join
query = """
SELECT train_id, station, time,
LEAD(time, 1) OVER (ORDER BY time) AS time_next
FROM schedule
WHERE train_id = 324 """
spark.sql(query).show()

# Add col running_total that sums diff_min col in each group
query = """
SELECT train_id, station, time, diff_min,
SUM(diff_min) OVER (PARTITION BY train_id ORDER BY time) AS running_total
FROM schedule
"""

query = """
SELECT *
ROW_NUMBER() OVER (PARTITION BY train_id ORDER BY time) AS id
FROM schedule
"""

# using dot notation
from pyspark.sql import Window
from pyspark.sql.functions import row_number

df.withColumn(
    'id',
    row_number().over(Window.partitionBy('train_id').orderBy('time'))
)

# using a WindowSpec
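A minimal sketch of the WindowSpec variant: name the window once and reuse it (same train_id/time columns as the schedule example above).

# assumes df with train_id and time columns, as in the schedule example
from pyspark.sql import Window
from pyspark.sql.functions import row_number

window = Window.partitionBy('train_id').orderBy('time')   # a named WindowSpec
df.withColumn('id', row_number().over(window)).show()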

Dot notation and SQL

# Three ways to select 2 columns
df.select('train_id', 'station')
df.select(df.train_id, df.station)
from pyspark.sql.functions import col
df.select(col('train_id'), col('station'))

# 2 ways to rename a column
df.select('train_id', 'station')\
    .withColumnRenamed('train_id', 'train')\
    .show(5)
df.select(col('train_id').alias('train'), 'station')

# SQL queries using dot notation
spark.sql('SELECT train_id AS train, station FROM schedule LIMIT 5')\
    .show()
df.select(col('train_id').alias('train'), 'station')\
    .limit(5)\
    .show()

# dot notation
from pyspark.sql.functions import min, max, col
expr = [min(col("time")).alias('start'), max(col("time")).alias('end')]
dot_df = df.groupBy("train_id").agg(*expr)
dot_df.show()

# SQL query 
query = "SELECT train_id, min(time) as start, max(time) as end from schedule GROUP BY train_id"
sql_df = spark.sql(query)
sql_df.show()

# The dict form of agg() only applies one aggregation at a time
spark.sql('SELECT train_id, MIN(time) AS start FROM schedule GROUP BY train_id').show()
df.groupBy('train_id').agg({'time':'min'}).withColumnRenamed('min(time)', 'start').show()

# UNIX_TIMESTAMP dot notation
from pyspark.sql.functions import unix_timestamp, lead

window = Window.partitionBy('train_id').orderBy('time')
dot_df = df.withColumn('diff_min',
                       (unix_timestamp(lead('time', 1).over(window), 'H:m')
                        - unix_timestamp('time', 'H:m')) / 60)
                     
# sparkSQL notation
query = """
SELECT *, 
(unix_timestamp(lead(time, 1) over (PARTITION BY train_id ORDER BY time),'H:m') 
 - unix_timestamp(time, 'H:m'))/60 AS diff_min 
FROM schedule 
"""
sql_df = spark.sql(query)
sql_df.show()

# Words sorted by frequency query
query = """
SELECT word, COUNT(*) AS count FROM df
GROUP BY word
ORDER BY count DESC
"""

# equivalent dot notation
from pyspark.sql.functions import desc
df.groupBy('word').count().sort(desc('count')).explain()

Load natural language text

# load text
df = spark.read.text("sherlock.txt")
print(df.first())
print(df.count())

# load parquet
df1 = spark.read.load('sherlock.parquet')  # Parquet is a columnar file format commonly used with Hadoop
df1.show(15, truncate=False)

# Lower case operation
from pyspark.sql.functions import lower, col
df = df1.select(lower(col('value')))
print(df.first())

# show column names
df.columns
# Alias operation
df = df1.select(lower(col('value')).alias('v'))

# Replacing text
from pyspark.sql.functions import regexp_replace
df = df1.select(regexp_replace('value', r'Mr\.', 'Mr').alias('v'))       # "Mr. Holmes." ==> "Mr Holmes."
df = df1.select(regexp_replace('value', "don't", 'do not').alias('v'))   # "don't know." ==> "do not know."

# Tokenizing text
from pyspark.sql.functions import split
df = df2.select(split('v', '[ ]').alias('words'))

# Split characters are discarded
punctuation = "_|.\?\!\",\'\[\]\*()"
df3 = df2.select(split('v', '[ %s]' % punctuation).alias('words'))

# Exploding an array: one output row per array element
from pyspark.sql.functions import explode
df4 = df3.select(explode('words').alias('word'))

# Removing empty rows
from pyspark.sql.functions import length, monotonically_increasing_id
nonblank_df = df.where(length('word') > 0)

# Adding a row id column
df2 = df.select('word', monotonically_increasing_id().alias('id'))

# Partitioning the data by ranges of the row id
from pyspark.sql.functions import when
df2 = df.withColumn('title', when(df.id < 25000, 'Preface')
                    .when(df.id < 50000, 'Chapter 1')
                    .when(df.id < 75000, 'Chapter 2').otherwise('Chapter 3'))
df2 = df2.withColumn('part', when(df2.id < 25000, 0).when(df2.id < 50000, 1)
                     .when(df2.id < 75000, 2).otherwise(3))
df2.show()

# Repartitioning on a column
repart_df = df2.repartition(4, 'part')

# To see which parts/chapters exist after repartitioning
df2.select('part', 'title').distinct().sort('part').show(truncate=False)
# or get the number of partitions directly
repart_df.rdd.getNumPartitions()

# Reading pre-partitioned text
df_parts = spark.read.text('sherlock_parts')  # reads every file in the sherlock_parts directory

# Word for each row, previous two and subsequent two words
query = """
SELECT
part,
LAG(word, 2) OVER(PARTITION BY part ORDER BY id) AS w1,
LAG(word, 1) OVER(PARTITION BY part ORDER BY id) AS w2,
word AS w3,
LEAD(word, 1) OVER(PARTITION BY part ORDER BY id) AS w4,
LEAD(word, 2) OVER(PARTITION BY part ORDER BY id) AS w5
FROM text
"""
spark.sql(query).where("part = 12").show(10)

# A subquery can be added on top of this to count each word tuple
# Find the top 10 sequences of five words
query = """
SELECT w1, w2, w3, w4, w5, COUNT(*) AS count FROM (
   SELECT word AS w1,
   LEAD(word, 1) OVER (PARTITION BY part ORDER BY id) AS w2,
   LEAD(word, 2) OVER (PARTITION BY part ORDER BY id)  AS w3,
   LEAD(word, 3) OVER (PARTITION BY part ORDER BY id)  AS w4,
   LEAD(word, 4) OVER (PARTITION BY part ORDER BY id)  AS w5
   FROM text
)
GROUP BY w1, w2, w3, w4, w5
ORDER BY count DESC
LIMIT 10 """
df = spark.sql(query)
df.show()

#   Most frequent 3-tuple per chapter
query = """
SELECT chapter, w1, w2, w3, count FROM
(
  SELECT
  chapter,
  ROW_NUMBER() OVER (PARTITION BY chapter ORDER BY count DESC) AS row,
  w1, w2, w3, count
  FROM ( %s )
)
WHERE row = 1
ORDER BY chapter ASC
""" % subquery
spark.sql(query).show()

# Longest 3-word sequences by total character length
query3agg = """
SELECT w1, w2, w3, length(w1)+length(w2)+length(w3) as length FROM (
SELECT
word AS w1,
LEAD(word,1) OVER(PARTITION BY part ORDER BY id ) AS w2, LEAD(word,2) OVER(PARTITION BY part ORDER BY id ) AS w3 FROM df
WHERE part <> 0 and part <> 13
)
GROUP BY w1, w2, w3 ORDER BY length DESC """
spark.sql(query3agg).show(truncate=False)

Caching:

Eviction policy: Least Recently Used (LRU). Caching is a lazy operation; it requires an action to trigger it, e.g. spark.sql("select count(*) from text").show() or partitioned_df.count().
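A quick way to see the laziness (a sketch; df is any dataframe already loaded, timings are illustrative):

import time

df.cache()                # lazy: nothing is materialized yet
t0 = time.time()
df.count()                # the first action populates the cache
print("first count: %.1f sec" % (time.time() - t0))
t0 = time.time()
df.count()                # the second action reads from the cache and is typically much faster
print("second count: %.1f sec" % (time.time() - t0))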

df.cache()    #df.persist() df.persist(storageLevel=pyspark.StorageLevel.MEMORY_AND_DISK)
df.unpersist()

# Determining whether a dataframe is cached
df.is_cached

# Storage level: useDisk, useMemory, useOffHeap, deserialized, replication
df.storageLevel

#Caching a table
df.createOrReplaceTempView('df') 
spark.catalog.cacheTable('df')
spark.catalog.isCached(tableName='df')
spark.catalog.dropTempView('table1')
# List the tables
print("Tables:\n", spark.catalog.listTables())

# Uncaching a table
spark.catalog.uncacheTable('df')
spark.catalog.clearCache()

Spark UI

- Spark Task: a unit of execution that runs on a single CPU.
- Spark Stage: a group of tasks that perform the same computation in parallel, each task typically running on a different subset of the data.
- Spark Job: a computation triggered by an action, sliced into one or more stages.
- UI tabs: Jobs, Stages, Storage, Environment, Executors, SQL.
- Storage: in memory or on disk, across the cluster, at a snapshot in time.

When running locally, the UI starts at http://[DRIVER_HOST]:4040; each additional SparkContext binds to the next port (4041, 4042, ...).
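Rather than guessing the port, the driver can report the UI address directly (a small sketch, assuming an active SparkSession named spark):

# ask the driver for the UI address instead of guessing the port
print(spark.sparkContext.uiWebUrl)   # e.g. http://driver-host:4040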

Logging primer

import sys
import logging
logging.basicConfig(stream=sys.stdout, level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')
logging.info("Hello %s", "world")
logging.debug("Hello, take %d", 2)

# OUTPUT
# 2019-03-14 15:32:05,359 - INFO - Hello world
# because level=INFO, the DEBUG message is not emitted

# Logging with DEBUG level
import sys
import logging
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG,
                    format='%(asctime)s - %(levelname)s - %(message)s')
logging.info("Hello %s", "world")
logging.debug("Hello, take %d", 2)
#OUTPUT
#2018-03-14 12:00:00,000 - INFO - Hello world
#2018-03-14 12:00:00,001 - DEBUG - Hello, take 2

# Log columns of text_df as debug message
logging.debug("text_df columns: %s", text_df.columns)

# Log whether table1 is cached as info message
logging.info("table1 is cached: %s", spark.catalog.isCached(tableName="table1"))

# Log first row of text_df as warning message
logging.warning("The first row of text_df:\n %s", text_df.first())

# Log selected columns of text_df as error message
logging.error("Selected columns: %s", text_df.select("id", "word"))

To time the logging calls with a simple timer:

import time

class timer:
    def __init__(self):
        self.start_time = time.time()
        self.step = 0

    def elapsed(self, reset=True):
        self.step += 1
        print("%d. elapsed: %.1f sec"
              % (self.step, time.time() - self.start_time))
        if reset:
            self.reset()

    def reset(self):
        self.start_time = time.time()

Note that even at the INFO level, the expression passed to a debug call is still evaluated, for example:

import logging
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')

# < create dataframe df here >

t = timer()
logging.info("No action here.")
t.elapsed()
logging.debug("df has %d rows.", df.count()) 
t.elapsed()

#2018-12-23 22:24:20,472 - INFO - No action here. 
#1. elapsed: 0.0 sec
#2. elapsed: 2.0 sec

A better approach is to disable the action explicitly:

ENABLED = False

t = timer()
logger.info("No action here.") t.elapsed()
if ENABLED:
    logger.info("df has %d rows.", df.count()) t.elapsed()

(Honestly, I didn't fully get this chapter.)

Query Plans

# In SQL, EXPLAIN SELECT * FROM table1 shows the physical plan / execution order

# Load dataframe and register it as a view
df = sqlContext.read.load('/temp/df.parquet')
df.registerTempTable('df')

# Running an EXPLAIN query
spark.sql('EXPLAIN SELECT * FROM df').first()

# equivalent to the following
df.explain()
spark.sql("SELECT * FROM df").explain()

# If the dataframe has been cached (df.cache()),
# read the execution order from the bottom up
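For example, after caching, the plan should show an in-memory scan (a sketch; exact operator names vary by Spark version):

df.cache()
df.count()        # an action materializes the cache
df.explain()      # expect an InMemoryTableScan / InMemoryRelation node in the output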

Extract, Transform, Select

# Importing the udf function
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType
from pyspark.sql.types import StringType, IntegerType, FloatType, ArrayType
# Creating a boolean UDF
short_udf = udf(lambda x: True if not x or len(x) < 10 else False,
                BooleanType())

df.select(short_udf('textdata')\
 .alias("is short"))\
 .show(3)
 
# Creating an array UDF that removes the last item in an array
from pyspark.sql.types import StringType, ArrayType

in_udf = udf(lambda x: x[0:len(x)-1] if x and len(x) > 1 else [],
             ArrayType(StringType()))

df3.select('word array', in_udf('word array').alias('without endword'))\
   .show(5, truncate=30)

# Sparse vector format Indices Values
# Array: [1.0, 0.0, 0.0, 3.0]
# Sparse vector: (4, [0, 3], [1.0, 3.0])

hasattr(x, "toArray") #ensures it's an array by checking it has an attribute 'toArray'
x.numNonzeros()
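A quick illustration of the sparse format with pyspark.ml.linalg (the arguments are size, indices, values):

# building the sparse vector from the example above
from pyspark.ml.linalg import Vectors

v = Vectors.sparse(4, [0, 3], [1.0, 3.0])   # size 4, non-zeros at indices 0 and 3
print(v.toArray())                          # [1. 0. 0. 3.]
print(v.numNonzeros())                      # 2
print(hasattr(v, "toArray"))                # True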

# Show the rows where doc contains the item '5'
from pyspark.sql.functions import array_contains
df_before.where(array_contains('doc', '5')).show()

# UDF removes items in TRIVIAL_TOKENS from array
rm_trivial_udf = udf(lambda x:
                     list(set(x) - TRIVIAL_TOKENS) if x
                     else x,
                     ArrayType(StringType()))

# Remove trivial tokens from 'in' and 'out' columns of df2
df_after = df_before.withColumn('in', rm_trivial_udf('in'))\
                    .withColumn('out', rm_trivial_udf('out'))

# Show the rows of df_after where doc contains the item '5'
df_after.where(array_contains('doc','5')).show()

Creating feature data for classification

# for debugging a UDF
try:
    df.select(bad_udf('outvec').alias('label')).first()
except Exception as e:
    print(e.__class__, e.errmsg)

# UDF return type must be properly cast
first_udf = udf(lambda x: int(x.indices[0])
                if (x and hasattr(x, "toArray") and x.numNonzeros())
                else 0,
                IntegerType())

df.withColumn('label', first_udf('outvec')).drop('outvec').show(3)
# Add label by applying the first_udf to output column
df_new = df.withColumn('label', first_udf('output'))

CountVectorizer is a feature extractor: its input is an array of strings and its output is a vector of term counts.

#Fitting the CountVectorizer
from pyspark.ml.feature import CountVectorizer 
cv = CountVectorizer(inputCol='words',
  outputCol="features") 
model = cv.fit(df) 
result = model.transform(df)
print(result)
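To see what the fitted model learned, inspect the vocabulary and the resulting feature vectors (a small sketch using the model and result from above):

print(model.vocabulary)                                   # terms; the index is the position in the vector
result.select('words', 'features').show(truncate=False)   # features is a sparse count vector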

Text Classification

# add labels
from pyspark.sql.functions import lit
df_true = df.where("endword in ('she', 'he', 'hers', 'his', 'her', 'him')")\
            .withColumn('label', lit(1))
df_false = df.where("endword not in ('she', 'he', 'hers', 'his', 'her', 'him')")\
             .withColumn('label', lit(0))

# Combining the positive and negative data
df_examples = df_true.union(df_false)

# Splitting the data into training and evaluation sets
df_train, df_eval = df_examples.randomSplit((0.60, 0.40), 42)

# Training
from pyspark.ml.classification import LogisticRegression
logistic = LogisticRegression(maxIter=50, regParam=0.6, elasticNetParam=0.3) 
model = logistic.fit(df_train)
print("Training iterations: ", model.summary.totalIterations)
# Print the number of test examples
print("Number test: ", df_testset.count())


# Import the lit function
from pyspark.sql.functions import lit

# Select the rows where endword is 'him' and label 1
df_pos = df.where("endword = 'him'")\
           .withColumn('label', lit(1))

# Select the rows where endword is not 'him' and label 0
df_neg = df.where("endword <> 'him'")\
           .withColumn('label', lit(0))

# Union pos and neg in equal number
df_examples = df_pos.union(df_neg.limit(df_pos.count()))
print("Number of examples: ", df_examples.count())
df_examples.where("endword <> 'him'").sample(False, .1, 42).show(5)

Predicting and evaluating

# Applying a model to evaluation data
predicted = df_trained.transform(df_test)  # the result is a DataFrame

# prediction column: double (0.0 or 1.0 in this context)
# probability column: a vector of length two whose values sum to 1;
#   the first element is the estimated probability of false, the second of true

x = predicted.first()
print("Right!" if x.label == int(x.prediction) else "Wrong")

# Evaluating classification performance
model_stats = model.evaluate(df_eval)
type(model_stats)  # pyspark.ml.classification.BinaryLogisticRegressionSummary
print("\nArea under ROC: %.2f" % model_stats.areaUnderROC)

# Apply the model to the test data
predictions = df_fitted.transform(df_testset).select(fields)

# Print incorrect if prediction does not match label
for x in predictions.take(8):
    print()
    if x.label != int(x.prediction):
        print("INCORRECT ==> ")
    for y in fields:
        print(y,":", x[y])

# Example output for one row:
# prediction : 1.0
# label : 1
# endword : him
# doc : ['and', 'pierre', 'felt', 'that', 'their', 'opinion', 'placed', 'responsibilities', 'upon', 'him']
# probability : [0.28537355252312796, 0.714626447476872]
