Service setup¶
- Download the latest Spark release from here
- Extract in your home folder:
# replace with actual url
wget -c https://<cdn-url>/spark-<version>-bin-hadoop3.tgz
tar -zxvf spark-<version>-bin-hadoop3.tgz
- Rename the Spark folder
mv spark-<version>-bin-hadoop3 spark
- Set up environment variables
Add these lines to ~/.bashrc:
export SPARK_HOME=~/spark
export PATH=$PATH:$SPARK_HOME/bin
- Reload ~/.bashrc
source ~/.bashrc
- Start the Spark standalone cluster
$SPARK_HOME/sbin/start-all.sh
- Check the master UI at http://<hostname>:8080/ (replace <hostname> with your machine's hostname)
# !pip install pyspark
Accessing the Spark UI:
Once the SparkSession is created, Spark starts a web UI to monitor jobs, stages, tasks, and storage. By default, it is available at:
http://<hostname>:4040 (or 4041, 4042 if 4040 is in use). You can click the link provided in the SparkSession output.
!echo $JAVA_HOME
/usr/lib/jvm/java-21-openjdk-amd64
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, hour, broadcast
from pyspark.storagelevel import StorageLevel
# ⚡ Pro Tip: Using local[4] for 4 local cores.
spark = SparkSession.builder.master("local[4]").appName("NYC Taxi Spark Demo").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
# 👉 Open Spark UI now (typically http://localhost:4040)
spark
WARNING: Using incubator modules: jdk.incubator.vector Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties 26/03/10 00:06:12 WARN Utils: Your hostname, gamekeepers, resolves to a loopback address: 127.0.0.1; using 192.168.1.10 instead (on interface wlp4s0) 26/03/10 00:06:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 26/03/10 00:06:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 26/03/10 00:06:14 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
SparkSession - in-memory
Dataset¶
Description¶
(Who) VendorID
(When) tpep_pickup_datetime → tpep_dropoff_datetime
(Where) PULocationID → DOLocationID
(How far) trip_distance
(How many) passenger_count
(How paid) payment_type
(Rate type) RatecodeID
(Metadata) store_and_fwd_flag
(How much) fare_amount + extra + mta_tax + tip_amount +
tolls_amount + improvement_surcharge +
congestion_surcharge + Airport_fee +
cbd_congestion_fee = total_amount
Download¶
import pandas as pd
# !mkdir -p taxi_data
# !wget -c -P taxi_data https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet
# !wget -c -P taxi_data https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2025-01.parquet
# !wget -c -P taxi_data https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_2025-01.parquet
# !wget -c -P taxi_data https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2025-01.parquet
# !wget -c -P taxi_data https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv
df = pd.read_parquet("taxi_data/yellow_tripdata_2025-01.parquet")
df[:100].to_csv("taxi_data/yellow_tripdata_sample.csv", index=False)
1. Load Data (Lazy Evaluation Demo)¶
Assumes yellow_tripdata_2025-01.parquet and taxi_zone_lookup.csv are in the taxi_data/ folder.
# header/inferSchema are CSV options; Parquet files are self-describing
trips = spark.read.parquet("taxi_data/yellow_tripdata_2025-01.parquet")
zones = spark.read.csv("taxi_data/taxi_zone_lookup.csv",
header=True,
inferSchema=True
)
Nothing has executed yet; reads and transformations are lazy. Now trigger an action:
trips.count()
3475226
When did execution happen?
Check Spark UI → Jobs → Stages → Tasks
2. Inspect Partitions¶
print("Initial partitions:", trips.rdd.getNumPartitions())
# # Optional: Repartitioning
# trips = trips.repartition(8)
# print("Partitions after repartition(8):", trips.rdd.getNumPartitions())
Initial partitions: 4
Note:
- Partitions define the level of task parallelism
- Too few → underutilized cores
- Too many → scheduling overhead
3. Narrow Transformation (No Shuffle)¶
long_trips = trips.filter(col("trip_distance") > 5)
long_trips.count()
522785
Shuffle or not?
Check Spark UI:
- Single stage
- No shuffle read/write
4. Wide Transformation (Shuffle)¶
revenue_by_zone = trips.groupBy("PULocationID").agg(sum("total_amount").alias("total_revenue"))
revenue_by_zone.show(5)
+------------+------------------+ |PULocationID| total_revenue| +------------+------------------+ | 148| 848404.3599999994| | 243| 30711.29000000003| | 31|1023.6800000000001| | 137| 814154.0399999941| | 85|19080.979999999996| +------------+------------------+ only showing top 5 rows
👉 Open Spark UI
You should now see:
- 2 stages
- Shuffle write
- Shuffle read
Explanation:
- A wide transformation requires data exchange across partitions.
- It creates a stage boundary.
- Shuffle cost is an important performance factor.
5. Check DAG & Physical Plan (Catalyst)¶
revenue_by_zone.explain(True)
== Parsed Logical Plan ==
'Aggregate ['PULocationID], ['PULocationID, 'sum('total_amount) AS total_revenue#90]
+- Relation [VendorID#0,tpep_pickup_datetime#1,tpep_dropoff_datetime#2,passenger_count#3L,trip_distance#4,RatecodeID#5L,store_and_fwd_flag#6,PULocationID#7,DOLocationID#8,payment_type#9L,fare_amount#10,extra#11,mta_tax#12,tip_amount#13,tolls_amount#14,improvement_surcharge#15,total_amount#16,congestion_surcharge#17,Airport_fee#18,cbd_congestion_fee#19] parquet
== Analyzed Logical Plan ==
PULocationID: int, total_revenue: double
Aggregate [PULocationID#7], [PULocationID#7, sum(total_amount#16) AS total_revenue#90]
+- Relation [VendorID#0,tpep_pickup_datetime#1,tpep_dropoff_datetime#2,passenger_count#3L,trip_distance#4,RatecodeID#5L,store_and_fwd_flag#6,PULocationID#7,DOLocationID#8,payment_type#9L,fare_amount#10,extra#11,mta_tax#12,tip_amount#13,tolls_amount#14,improvement_surcharge#15,total_amount#16,congestion_surcharge#17,Airport_fee#18,cbd_congestion_fee#19] parquet
== Optimized Logical Plan ==
Aggregate [PULocationID#7], [PULocationID#7, sum(total_amount#16) AS total_revenue#90]
+- Project [PULocationID#7, total_amount#16]
+- Relation [VendorID#0,tpep_pickup_datetime#1,tpep_dropoff_datetime#2,passenger_count#3L,trip_distance#4,RatecodeID#5L,store_and_fwd_flag#6,PULocationID#7,DOLocationID#8,payment_type#9L,fare_amount#10,extra#11,mta_tax#12,tip_amount#13,tolls_amount#14,improvement_surcharge#15,total_amount#16,congestion_surcharge#17,Airport_fee#18,cbd_congestion_fee#19] parquet
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[PULocationID#7], functions=[sum(total_amount#16)], output=[PULocationID#7, total_revenue#90])
+- Exchange hashpartitioning(PULocationID#7, 200), ENSURE_REQUIREMENTS, [plan_id=176]
+- HashAggregate(keys=[PULocationID#7], functions=[partial_sum(total_amount#16)], output=[PULocationID#7, sum#115])
+- FileScan parquet [PULocationID#7,total_amount#16] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/ankush/workplace/da_colab/DS614/docs/notebooks/taxi_data/ye..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<PULocationID:int,total_amount:double>
Explanation:
- Parsed / Analyzed Logical Plan
- Optimized Logical Plan (note the inserted Project: only the two needed columns survive)
- Physical Plan
- HashAggregate (partial aggregation before the shuffle, final aggregation after)
This is the Catalyst Optimizer in action.
6. Join Without Broadcast (Shuffle Join)¶
joined = revenue_by_zone.join(
zones,
revenue_by_zone.PULocationID == zones.LocationID,
"inner"
)
joined.show(5)
+------------+------------------+----------+---------+--------------------+------------+ |PULocationID| total_revenue|LocationID| Borough| Zone|service_zone| +------------+------------------+----------+---------+--------------------+------------+ | 148| 848404.3599999994| 148|Manhattan| Lower East Side| Yellow Zone| | 243| 30711.29000000003| 243|Manhattan|Washington Height...| Boro Zone| | 31|1023.6800000000001| 31| Bronx| Bronx Park| Boro Zone| | 137| 814154.0399999941| 137|Manhattan| Kips Bay| Yellow Zone| | 85|19080.979999999996| 85| Brooklyn| Erasmus| Boro Zone| +------------+------------------+----------+---------+--------------------+------------+ only showing top 5 rows
Inspect plan:
joined.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [PULocationID#7], [LocationID#37], Inner, BuildRight, false
:- HashAggregate(keys=[PULocationID#7], functions=[sum(total_amount#16)])
: +- Exchange hashpartitioning(PULocationID#7, 200), ENSURE_REQUIREMENTS, [plan_id=334]
: +- HashAggregate(keys=[PULocationID#7], functions=[partial_sum(total_amount#16)])
: +- Filter isnotnull(PULocationID#7)
: +- FileScan parquet [PULocationID#7,total_amount#16] Batched: true, DataFilters: [isnotnull(PULocationID#7)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/ankush/workplace/da_colab/DS614/docs/notebooks/taxi_data/ye..., PartitionFilters: [], PushedFilters: [IsNotNull(PULocationID)], ReadSchema: struct<PULocationID:int,total_amount:double>
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=337]
+- Filter isnotnull(LocationID#37)
+- FileScan csv [LocationID#37,Borough#38,Zone#39,service_zone#40] Batched: false, DataFilters: [isnotnull(LocationID#37)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/ankush/workplace/da_colab/DS614/docs/notebooks/taxi_data/ta..., PartitionFilters: [], PushedFilters: [IsNotNull(LocationID)], ReadSchema: struct<LocationID:int,Borough:string,Zone:string,service_zone:string>
With larger tables you would see SortMergeJoin here; because zones is tiny (well under spark.sql.autoBroadcastJoinThreshold, 10 MB by default), Spark auto-broadcasts it and the plan already shows BroadcastHashJoin.
Explanation:
- In a shuffle (sort-merge) join, both sides are shuffled over the network.
- Computationally expensive for large tables.
7. Broadcast Join (Optimization)¶
joined_broadcast = revenue_by_zone.join(
broadcast(zones),
revenue_by_zone.PULocationID == zones.LocationID,
"inner"
)
joined_broadcast.show(5)
+------------+------------------+----------+---------+--------------------+------------+ |PULocationID| total_revenue|LocationID| Borough| Zone|service_zone| +------------+------------------+----------+---------+--------------------+------------+ | 148| 848404.3599999994| 148|Manhattan| Lower East Side| Yellow Zone| | 243| 30711.29000000003| 243|Manhattan|Washington Height...| Boro Zone| | 31|1023.6800000000001| 31| Bronx| Bronx Park| Boro Zone| | 137| 814154.0399999941| 137|Manhattan| Kips Bay| Yellow Zone| | 85|19080.979999999996| 85| Brooklyn| Erasmus| Boro Zone| +------------+------------------+----------+---------+--------------------+------------+ only showing top 5 rows
Check plan:
joined_broadcast.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [PULocationID#7], [LocationID#37], Inner, BuildRight, false
:- HashAggregate(keys=[PULocationID#7], functions=[sum(total_amount#16)])
: +- Exchange hashpartitioning(PULocationID#7, 200), ENSURE_REQUIREMENTS, [plan_id=495]
: +- HashAggregate(keys=[PULocationID#7], functions=[partial_sum(total_amount#16)])
: +- Filter isnotnull(PULocationID#7)
: +- FileScan parquet [PULocationID#7,total_amount#16] Batched: true, DataFilters: [isnotnull(PULocationID#7)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/ankush/workplace/da_colab/DS614/docs/notebooks/taxi_data/ye..., PartitionFilters: [], PushedFilters: [IsNotNull(PULocationID)], ReadSchema: struct<PULocationID:int,total_amount:double>
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=498]
+- Filter isnotnull(LocationID#37)
+- FileScan csv [LocationID#37,Borough#38,Zone#39,service_zone#40] Batched: false, DataFilters: [isnotnull(LocationID#37)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/ankush/workplace/da_colab/DS614/docs/notebooks/taxi_data/ta..., PartitionFilters: [], PushedFilters: [IsNotNull(LocationID)], ReadSchema: struct<LocationID:int,Borough:string,Zone:string,service_zone:string>
The broadcast() hint guarantees BroadcastHashJoin regardless of size thresholds.
Explanation:
- The small dimension table is replicated to every executor.
- No shuffle of the large dataset.
- A dramatic optimization moment!
8. Writing Parquet (Columnar Storage)¶
trips.write.mode("overwrite").parquet("taxi_parquet")
Read Parquet:
trips_parquet = spark.read.parquet("taxi_parquet")
Test column pruning:
trips_parquet.select("total_amount").count()
3475226
Explanation:
- Only the selected column is read from disk!
- Predicate pushdown is possible (filtering at the file level).
- A huge advantage of columnar storage (Apache Parquet).
9. Caching & Persistence¶
Run aggregation twice without cache:
%timeit trips_parquet.groupBy("PULocationID").sum("total_amount").count()
%timeit trips_parquet.groupBy("PULocationID").sum("total_amount").count()
131 ms ± 18.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each) 96.2 ms ± 4.97 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
Now cache it:
trips_parquet.persist(StorageLevel.MEMORY_ONLY)
# materialize cache
trips_parquet.count()
3475226
Run aggregation again:
%timeit trips_parquet.groupBy("PULocationID").sum("total_amount").count()
336 ms ± 33.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
👉 Open Spark UI → Storage tab
Explanation:
- Caching is lazy (data is materialized only when an action runs).
- Cached blocks compete with execution memory (storage vs execution pools).
- Reusing cached data avoids re-reading from disk and recomputing.
- Note the timings above: for a small file that scans quickly, the cached run can even be slower; caching pays off when the upstream computation is expensive.
10. RDD vs DataFrame Comparison (Quick Contrast)¶
RDD version (Manual & Unoptimized):
rdd = spark.sparkContext.textFile("taxi_data/yellow_tripdata_sample.csv")
header = rdd.first()
rdd_no_header = rdd.filter(lambda x: x != header)
rdd_parsed = rdd_no_header.map(lambda line: line.split(","))
# total_amount assumed column index 16 (adjust if needed based on your CSV)
revenue_rdd = rdd_parsed.map(lambda x: (x[7], float(x[16]) if len(x) > 16 and x[16] else 0.0)).reduceByKey(lambda a, b: a + b)
%timeit revenue_rdd.take(5)
150 ms ± 2.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
Explanation:
- No optimizer (Catalyst cannot see inside Python lambda functions).
- Manual parsing overhead.
- No column pruning (every row is read in full).
- Harder to maintain than the equivalent DataFrame code.
11. Structured Streaming Demo (Mini)¶
import os
os.makedirs("incoming_trips", exist_ok=True)
stream_df = spark.readStream.schema(trips.schema).csv("incoming_trips")
Simple aggregation:
stream_agg = stream_df.groupBy("PULocationID").sum("total_amount")
query = stream_agg.writeStream.outputMode("complete").format("console").start()
------------------------------------------- Batch: 0 ------------------------------------------- +------------+------------------+ |PULocationID| sum(total_amount)| +------------+------------------+ | 148| 49.33| | 137| 17.1| | 211| 57.83| | 236| 47.64| | 232| 24.8| | 246|107.64000000000001| | 224| 25.3| | 140| 72.58999999999999| | 132| 227.47| | NULL| NULL| | 142| 142.97| | 164| 43.55| | 48| 24.5| | 43| 19.2| | 107| 35.78| | 263| 79.56| | 231| 37.95| | 229| 73.2| | 114| 94.21000000000001| | 161| 63.55| +------------+------------------+ only showing top 20 rows
------------------------------------------- Batch: 1 ------------------------------------------- +------------+------------------+ |PULocationID| sum(total_amount)| +------------+------------------+ | 148| 98.66| | 137| 34.2| | 211| 115.66| | 236| 95.28| | 232| 49.6| | 246|215.28000000000003| | 224| 50.6| | 140|145.17999999999998| | 132| 454.94| | NULL| NULL| | 142| 285.94| | 164| 87.1| | 48| 49.0| | 43| 38.4| | 107| 71.56| | 263| 159.12| | 231| 75.9| | 229| 146.4| | 114|188.42000000000002| | 161| 127.1| +------------+------------------+ only showing top 20 rows
👉 Action: Now copy one CSV file into the incoming_trips/ folder.
Check:
- Micro-batch execution in real-time.
- Same DataFrame API used for batch and streaming!
- Different underlying engine (Spark Structured Streaming).
Stop the stream when done:
query.stop()
What we covered¶
✔ Driver vs Executor → Spark UI
✔ DAG → explain()
✔ Lazy Evaluation → transformations before action
✔ Narrow vs Wide → filter vs groupBy
✔ Shuffle → aggregation + join
✔ Partitions → repartition()
✔ RDD vs DataFrame → manual vs optimized
✔ Catalyst → optimized physical plan
✔ Parquet → column pruning
✔ Broadcast → join strategy change
✔ Caching → Storage tab
✔ Structured Streaming → same API, streaming mode
✔ Spark UI debugging → stages + skew