Data Cleaning with Apache Spark
Spark Schema
.printSchema()
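A minimal sketch of inspecting a schema (the file name and inferSchema option are assumptions for illustration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.read.csv('people.csv', header=True, inferSchema=True)
df.printSchema()  # prints each column's name, data type, and nullability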
Immutability and Lazy Processing
A component of functional programming:
Defined once
Unable to be directly modified
Re-created if reassigned
Able to be shared efficiently
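A quick sketch of what immutability means in practice (the DataFrame df and its name column are assumptions):

from pyspark.sql import functions as F

df2 = df.withColumn('name_upper', F.upper(df['name']))  # returns a brand-new DataFrame
# df itself is unchanged; "modifying" df really means re-binding the name to a new DataFrame
df = df.withColumn('name_upper', F.upper(df['name']))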
Understanding Parquet
Difficulties with CSV files
No defined schema (no data types, no column names beyond an optional header row)
Nested data requires special handling (content containing a comma requires escaping, and using the escape character within content requires even further escaping)
Limited encoding formats
For Spark: slow to parse, and the file cannot be shared between workers during the import process; if no schema is defined, all data must be read before a schema can be inferred, forcing the code to read the file twice.
For Spark: files cannot be filtered before processing (no 'predicate pushdown'). Ordering tasks to do the least amount of work, such as filtering data prior to processing, is one of the primary optimizations predicate pushdown provides.
For Spark: any intermediate use requires redefining the schema. Spark processes are often multi-step and may use an intermediate file representation; these representations allow data to be reused later without regenerating it from source.
The Parquet Format
columnar data format
supported in spark and other data processing frameworks
supports predicate pushdown
automatically stores schema information
binary file format
Parquet is a compressed columnar data format developed for use in any Hadoop-based system, including Spark, Hadoop, and Apache Impala. It is ideal as an intermediary or on-disk representation of processed data.
Predicate pushdown means Spark will only process the data necessary to complete the operations you define, rather than reading the entire dataset.
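A hedged sketch of predicate pushdown with Parquet (the file name and year column are assumptions):

df = spark.read.parquet('events.parquet')
recent_df = df.filter(df['year'] >= 2020)  # the filter is pushed down to the Parquet reader
recent_df.explain()  # the plan typically lists the condition under PushedFilters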
Working with Parquet
Parquet with SQL
parquet - dataframe - table
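A short sketch of the parquet - dataframe - table flow with SQL (the file name, view name, and duration column are assumptions):

df = spark.read.parquet('data.parquet')
df.createOrReplaceTempView('flights')  # register the DataFrame as a temporary table
short_flights_df = spark.sql('SELECT * FROM flights WHERE duration < 100')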
DataFrame column operations
DataFrames:
Made up of rows & columns
Immutable
Use various transformation operations to modify data
Common DataFrame transformations
Filtering data
Remove nulls
Remove odd entries
Split data from combined sources
Negate with ~
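A small sketch of these filtering patterns (the column names are assumptions):

df = df.filter(df['name'].isNotNull())      # remove rows with null names
df = df.where(~ df['_c0'].startswith('#'))  # negate a condition with ~ to drop odd / commented entries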
Column string transformations
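A hedged sketch of common string transformations (DataFrame and column names are assumptions):

from pyspark.sql import functions as F

df = df.withColumn('name', F.lower(F.col('name')))                 # lower-case the column
df = df.withColumn('name', F.regexp_replace('name', r'\s+', ' '))  # collapse repeated whitespace
df = df.withColumn('name', F.trim(F.col('name')))                  # strip leading/trailing spaces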
ArrayType() column functions
Various utility functions / transformations to interact with ArrayType() columns:
.size()
- returns the length of an ArrayType() column
.getItem()
- used to retrieve a specific item at an index of a list column
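A sketch combining these ArrayType() helpers (column names are assumptions):

from pyspark.sql import functions as F

df = df.withColumn('splits', F.split(F.col('name'), ' '))      # string -> ArrayType column
df = df.withColumn('name_length', F.size('splits'))            # length of each array
df = df.withColumn('first_name', F.col('splits').getItem(0))   # item at index 0
df = df.withColumn('last_name', F.col('splits').getItem(F.size('splits') - 1))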
Conditional DataFrame column operations
Conditional clauses
Conditional Clauses are:
Inline version of if / then / else
.when()
.otherwise()
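A brief sketch of .when() / .otherwise() chained together (the TITLE column and its values are assumptions):

from pyspark.sql import functions as F

df = df.withColumn('random_val',
                   F.when(df['TITLE'] == 'Councilmember', F.rand())
                    .when(df['TITLE'] == 'Mayor', 2)
                    .otherwise(0))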
User defined functions
A Python method
Wrapped via the pyspark.sql.functions.udf method
Stored as a variable
Called like a normal Spark function
Reverse string UDF
Argument-less example
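A sketch of both UDF styles (DataFrame and column names are assumptions):

from pyspark.sql import functions as F
from pyspark.sql.types import StringType

def reverse_string(s):
    return s[::-1] if s else s

udf_reverse = F.udf(reverse_string, StringType())
df = df.withColumn('name_reversed', udf_reverse(df['name']))

# Argument-less example: the wrapped function takes no columns at all
import random

def random_letter():
    return random.choice(['A', 'B', 'C'])

udf_random_letter = F.udf(random_letter, StringType())
df = df.withColumn('group', udf_random_letter())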
Partitioning and lazy processing
Partitioning
DataFrames are broken up into partitions
Partition size can vary
Each partition is handled independently
To check the number of partitions, use the method .rdd.getNumPartitions() on a DataFrame.
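For example (the file name is an assumption):

df = spark.read.csv('datafile.csv.gz')
print(df.rdd.getNumPartitions())  # number of partitions backing the DataFrame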
Lazy processing
Transformations are lazy: .withColumn(...), .select(...), .cache()
Nothing is actually done until an action is performed: .count(), .write(...), .show()
Transformations can be re-ordered for best performance
Sometimes causes unexpected behavior
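A tiny sketch of lazy evaluation in action (file and column names are assumptions):

from pyspark.sql import functions as F

df = spark.read.csv('datafile.csv', header=True)                    # mostly builds a plan
df = df.withColumn('full_name', F.concat_ws(' ', 'first', 'last'))  # still lazy
df.show()                                                           # action: the plan actually executes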
Adding IDs
Normal ID fields:
Common in relational databases
Usually an integer that is increasing, sequential, and unique
Not very parallel
Monotonically increasing IDs
pyspark.sql.functions.monotonically_increasing_id()
Integer (64-bit), increases in value, unique
Not necessarily sequential (gaps exist)
Completely parallel
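For example (the DataFrame name is an assumption):

from pyspark.sql import functions as F

df = df.withColumn('ROW_ID', F.monotonically_increasing_id())
df.select('ROW_ID').show(5)  # 64-bit IDs, unique and increasing, but with gaps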
More ID Tricks
Depending on your needs, you may want to start your IDs at a certain value so there isn't overlap with previous runs of the Spark task. This behavior is similar to how IDs would behave in a relational database. For example, make sure that the IDs output from a monthly Spark task start after the highest value from the previous month (see the sketch below).
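One way to apply this offset, sketched with assumed DataFrame and column names:

from pyspark.sql import functions as F

# highest ID written by the previous month's run
previous_max_id = march_df.select(F.max('ROW_ID')).collect()[0][0]

# start this month's IDs above that value
april_df = april_df.withColumn('ROW_ID',
                               F.monotonically_increasing_id() + previous_max_id + 1)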
Caching
Caching in Spark:
Stores DataFrames in memory or on disk
Improves speed on later transformations / actions
Reduces resource usage
Disadvantages of caching
Very large data sets may not fit in memory
Local disk based caching may not be a performance improvement
Cached objects may not be available
Caching tips
When developing Spark tasks:
Cache only if you need it
Try caching DataFrames at various points and determine if your performance improves
Cache in memory or to fast SSD / NVMe storage
Cache to slow local disk if needed
Use intermediate files!
Stop caching objects when finished
Eviction policy: Least Recently Used (LRU)
Caching is a lazy operation. It requires an action to trigger it, e.g.:
spark.sql("select count(*) from text").show()
partitioned_df.count()
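A short caching sketch (the file name is an assumption):

df = spark.read.csv('datafile.csv.gz', header=True)
df.cache()            # lazy: nothing is stored yet
print(df.count())     # the action populates the cache
print(df.is_cached)   # True
df.unpersist()        # stop caching the object when finished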
Improve import performance
Spark clusters
Spark Clusters are made of two types of processes
Driver process
Worker processes
Import performance
Important parameters:
Number of objects (Files, Network locations, etc)
More objects is better than fewer, larger ones
Can import via wildcard
airport_df = spark.read.csv('airports-*.txt.gz')
General size of objects
Spark performs better if objects are of similar size
It's safe to assume the more import objects available, the better the cluster can divvy up the job.
Schemas
A well-defined schema will drastically improve import performance
Avoids reading the data multiple times
Provides validation on import
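A sketch of supplying a schema at import time (file, column names, and types are assumptions):

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField('name', StringType(), nullable=True),
    StructField('age', IntegerType(), nullable=True),
])

df = spark.read.csv('people.csv', header=True, schema=schema)  # no extra pass to infer types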
How to split objects
Use OS utilities / scripts (split, cut, awk)
split -l 10000 -d largefile chunk-
Splits largefile into chunks of 10,000 lines each; -d gives the output files numeric suffixes, producing chunk-00, chunk-01, and so on.
Use custom scripts
Write out to Parquet
df_csv = spark.read.csv('singlelargefile.csv')
df_csv.write.parquet('data.parquet')
df = spark.read.parquet('data.parquet')
Explaining the Spark execution plan
== Physical Plan ==
*(2) HashAggregate(keys=[VOTER NAME#15], functions=[])
+- Exchange hashpartitioning(VOTER NAME#15, 200)
   +- *(1) HashAggregate(keys=[VOTER NAME#15], functions=[])
      +- *(1) FileScan csv [VOTER NAME#15] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/DallasCouncilVotes.csv.gz], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<VOTER NAME:string>
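A plan like the one above might be produced along these lines (the file and column come from the plan; the exact query is an assumption):

voter_df = spark.read.csv('DallasCouncilVotes.csv.gz', header=True)
voter_df.select('VOTER NAME').distinct().explain()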
Shuffling
Shuffling refers to moving data around to various workers to complete a task
Hides complexity from the user
Can be slow to complete
Lowers overall throughput
Is often necessary, but try to minimize
How to limit shuffling
Limit use of .repartition(num_partitions)
Use .coalesce(num_partitions) instead
Use care when calling .join()
Use .broadcast()
May not need to limit it
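A small sketch of the repartition / coalesce distinction (the partition counts are arbitrary):

df = df.repartition(200)  # full shuffle of all data across the cluster
df = df.coalesce(50)      # merges existing partitions, avoiding a full shuffle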
Broadcasting
Provides a copy of an object to each worker
Prevents undue / excess communication between nodes
Can drastically speed up .join() operations
A couple tips:
Broadcast the smaller DataFrame. The larger the DataFrame, the more time required to transfer to the worker nodes.
On small DataFrames, it may be better to skip broadcasting and let Spark figure out any optimization on its own.
If you look at the query execution plan, a BroadcastHashJoin indicates you've successfully configured broadcasting.
Use the .broadcast(<DataFrame>) method
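A hedged broadcast-join sketch (DataFrame and column names are assumptions):

from pyspark.sql.functions import broadcast

# flights_df is assumed large; airports_df is small enough to copy to every worker
joined_df = flights_df.join(broadcast(airports_df),
                            flights_df['dest'] == airports_df['faa'])
joined_df.explain()  # look for BroadcastHashJoin in the plan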
Cluster Configurations
Configuration options
Spark contains many configuration settings
These can be modified to match needs
Reading configuration settings:
spark.conf.get(<configuration name>)
Writing configuration settings:
spark.conf.set(<configuration name>, <value>)
Configure Spark to use 500 partitions
spark.conf.set('spark.sql.shuffle.partitions', 500)
Cluster Types
Spark deployment options:
Single node (deploying all components on a single system, can be physical/VM/container)
Standalone (dedicated machines as the driver and workers)
Managed (cluster components are handled by a third party cluster manager)
YARN
Mesos
Kubernetes
Driver
Task assignment
Result consolidation
Shared data access
Tips:
Driver node should have double the memory of the worker
Fast local storage helpful
Worker
Runs actual tasks
Ideally has all code, data, and resources for a given task
Recommendations:
More worker nodes is often better than larger workers
Test to find the balance
Fast local storage extremely useful
Data Pipelines
Input(s): CSV, JSON, web services, databases
Transformations: .withColumn(), .filter(), .drop()
Output(s): CSV, Parquet, database
Validation
Analysis
Pipeline details
Not formally defined in Spark
Typically all normal Spark code required for task
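A minimal end-to-end pipeline sketch (file names, columns, and rules are assumptions):

from pyspark.sql import functions as F

departures_df = spark.read.csv('departures.csv.gz', header=True)                  # input
departures_df = departures_df.filter(F.col('elapsed_time').cast('int') > 0)       # transformation / validation
departures_df = departures_df.withColumn('id', F.monotonically_increasing_id())
departures_df.write.parquet('departures.parquet', mode='overwrite')               # output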
Data Handling
Parsing
Incorrect data: empty rows, commented lines, headers
Nested structures: multiple delimiters
Non-regular data: differing numbers of columns per row
Removing blank lines, headers, and comments
Spark's CSV parser:
Automatically removes blank lines
Can remove comments using an optional argument
df1 = spark.read.csv('datafile.csv.gz', comment='#')
Handles header fields
Defined via argument
Ignored if a schema is defined
df1 = spark.read.csv('datafile.csv.gz', header=True)
Count the number of rows beginning with '#'
comment_count = annotations_df.where(col('_c0').startswith('#')).count()
Automatic column creation
Spark will:
Automatically create columns in a DataFrame based on sep argument
df1 = spark.read.csv('datafile.csv.gz', sep=',')
Defaults to using ,
Can still successfully parse if sep is not present in the data (all content then goes into a single column)
df1 = spark.read.csv('datafile.csv.gz', sep='*')
Stores data in a column defaulting to _c0
This allows you to properly handle nested separators yourself (see the sketch below)
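A sketch of the nested-separator trick (file name, the '*' separator, and column meanings are assumptions):

from pyspark.sql import functions as F

# Use a separator that never occurs in the data so each row lands in a single _c0 column
df = spark.read.csv('datafile.csv.gz', sep='*')

# Split _c0 on the real delimiter yourself, so nested/quoted commas can be handled explicitly
df = df.withColumn('_c0_split', F.split(df['_c0'], ','))
df = df.withColumn('folder', df['_c0_split'].getItem(0))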
Data Validation
Validation is:
Verifying that a dataset complies with the expected format
Number of rows / columns
Data types
Complex validation rules
Validating via joins
Compares data against known values
Easy to find data in a given set
Comparatively fast
This automatically removes any rows with a company not in valid_df! (See the sketch below.)
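A sketch of the join this refers to (DataFrame and column names are assumptions):

# valid_df holds the known-good company values
verified_df = parsed_df.join(valid_df, on='company', how='inner')
# rows whose company does not appear in valid_df are dropped by the inner join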
Complex rule validation
Using Spark components to validate logic:
Calculations
Verifying against external source
Likely uses a UDF to modify / verify the DataFrame
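A hedged sketch of a rule-checking UDF (DataFrame, columns, and the rule itself are assumptions):

from pyspark.sql import functions as F
from pyspark.sql.types import BooleanType

def total_is_consistent(quantity, unit_price, total):
    # hypothetical rule: stored total must equal quantity * unit_price (assumes numeric columns)
    return abs(quantity * unit_price - total) < 0.01

udf_total_ok = F.udf(total_is_consistent, BooleanType())
orders_df = orders_df.withColumn('total_ok', udf_total_ok('quantity', 'unit_price', 'total'))
orders_df = orders_df.filter(F.col('total_ok'))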