DataFrames & Spark SQL
Apache Spark: DataFrames & Spark SQL Apache Spark is a unified analytics engine for large-scale data processing. DataFrames are distributed collections of data …
Apache Spark: DataFrames & Spark SQL
Apache Spark is a unified analytics engine for large-scale data processing. DataFrames are distributed collections of data organized into named columns — like a database table or a pandas DataFrame but distributed across a cluster.
SparkSession & Loading Data
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
# Build the session (getOrCreate returns an already-running one if present),
# naming the app and switching on adaptive query execution.
spark = (
    SparkSession.builder
    .appName("MyApp")
    .config("spark.sql.adaptive.enabled", "true")
    .getOrCreate()
)
# Load DataFrames from several storage systems and file formats.
# Every assignment rebinds `df`, so only the most recent read stays bound.
df = spark.read.csv(
    "s3://bucket/data/*.csv",
    header=True,
    inferSchema=True,
)
df = spark.read.parquet("hdfs:///data/orders/")
df = spark.read.json("s3://bucket/events/")
df = spark.read.orc("/data/sales/")

# Declare the schema up front instead of inferring it
# (faster than inferSchema for large files).
schema = (
    StructType()
    .add("order_id", IntegerType(), nullable=False)
    .add("user_id", StringType(), nullable=True)
    .add("amount", DoubleType(), nullable=True)
    .add("status", StringType(), nullable=True)
)
df = spark.read.csv("orders.csv", header=True, schema=schema)
# Read the `orders` table from PostgreSQL over JDBC.
# The credentials below are placeholder values.
df = spark.read.jdbc(
    url="jdbc:postgresql://host:5432/mydb",
    table="orders",
    properties={
        "user": "user",
        "password": "pass",
        "driver": "org.postgresql.Driver",
    },
)

# Write as Parquet: first replacing the target, then appending into
# year/month partition directories.
df.write.mode("overwrite").parquet("output/orders/")
(
    df.write
    .mode("append")
    .partitionBy("year", "month")
    .parquet("output/partitioned/")
)
df.write.format("delta").mode("overwrite").save("output/delta/")
DataFrame Operations
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# NOTE: none of the results below are assigned; each call returns a new
# DataFrame and leaves `df` itself unchanged.

# Projection: pick columns by name, or alias / cast them.
df.select("order_id", "amount", "status")
df.select(
    F.col("amount").alias("total"),
    F.col("status").cast("string"),
)

# Row filtering: a single condition, a combined condition, and a SQL string.
df.filter(F.col("amount") > 100)
df.filter(
    (F.col("status") == "completed")
    & (F.col("amount") > 50)
)
df.where("status = 'completed' AND amount > 50")

# Derived columns and a column rename.
df.withColumn("tax", F.col("amount") * 0.1)
df.withColumn("year", F.year(F.col("created_at")))
df.withColumn("log_amount", F.log(F.col("amount")))
df.withColumn("status_upper", F.upper(F.col("status")))
df.withColumnRenamed("user_id", "userId")
# Per-status summary: row count, sum / mean / max of amount, the list of
# order ids, and the number of distinct users.
df.groupBy("status").agg(
    F.count("*").alias("count"),
    F.sum("amount").alias("total"),
    F.avg("amount").alias("avg"),
    F.max("amount").alias("max_amount"),
    F.collect_list("order_id").alias("order_ids"),
    F.countDistinct("user_id").alias("unique_users"),
)

# Joins. `orders`, `users` and `small_table` are not defined in this
# section; presumably DataFrames loaded elsewhere.
# Explicit join condition:
orders.join(users, on=orders.user_id == users.id, how="inner")
# Shorthand when both sides share the column name:
orders.join(users, on="user_id", how="left")
# Broadcast hint on the small side of the join:
orders.join(F.broadcast(small_table), on="key", how="left")
# Window functions: one window per user, ordered newest-first.
# NOTE(review): because the ordering is descending on created_at, the lag
# value comes from the row with the later created_at, and the running sum
# accumulates from newest to oldest. Confirm that is the intended meaning
# of "prev_amount" / "cumsum".
window = Window.partitionBy("user_id").orderBy(F.col("created_at").desc())
(
    df
    .withColumn("rank", F.rank().over(window))
    .withColumn(
        "cumsum",
        F.sum("amount").over(
            window.rowsBetween(Window.unboundedPreceding, Window.currentRow)
        ),
    )
    .withColumn("prev_amount", F.lag("amount", 1).over(window))
)
# Ordering, row limits and de-duplication.
df.orderBy(F.col("amount").desc())
df.orderBy("status", F.col("amount").asc())
df.limit(100)
df.distinct()
df.dropDuplicates(["user_id"])

# Inspection helpers: print rows without truncating values, print the
# schema, show basic statistics, and count rows.
df.show(20, truncate=False)
df.printSchema()
df.describe().show()
df.count()
Spark SQL
# Register both DataFrames as temporary views so the SQL below can refer to
# them by name. `users` is not defined in this section; presumably a
# DataFrame loaded elsewhere.
df.createOrReplaceTempView("orders")
users.createOrReplaceTempView("users")
# Per-user order count and total spend over completed orders from the last
# 30 days, keeping users whose total exceeds 500, ranked by total spend and
# capped at 100 rows.
result = spark.sql("""
SELECT
u.name,
COUNT(o.order_id) AS order_count,
SUM(o.amount) AS total_spent,
RANK() OVER (ORDER BY SUM(o.amount) DESC) AS spending_rank
FROM orders o
JOIN users u ON o.user_id = u.id
WHERE o.status = 'completed'
AND o.created_at >= DATE_SUB(CURRENT_DATE(), 30)
GROUP BY u.id, u.name
HAVING SUM(o.amount) > 500
ORDER BY total_spent DESC
LIMIT 100
""")
result.show()
# Use catalog for persistent tables (Hive metastore):
# save `df` as table `orders` in database `mydb`, then query it by its
# qualified name.
df.write.saveAsTable("mydb.orders")
spark.sql("SELECT COUNT(*) FROM mydb.orders").show()