# Modern Data Stack

> Parquet + DuckDB + Polars + PyArrow form the modern analytics stack replacing slow CSV workflows. This cheatsheet covers the full picture: columnar storage, in-process SQL, and blazing-fast DataFrames.

---

## 1. Why the Modern Stack?

```
Old workflow:  CSV → pandas.read_csv() → slow load → OOM on 10GB files
New workflow:  Parquet → DuckDB/Polars → lazy execution → 10x faster, 5x less memory

Key wins:
  Parquet → columnar storage, 5-10x smaller than CSV, typed, compressed
  DuckDB  → SQL on ANY file (CSV/Parquet/JSON) without loading it into RAM
  Polars  → DataFrame API, lazy evaluation, parallelized, Rust-backed
  PyArrow → shared in-memory format between all three (zero-copy)
```

| Tool | Role | Best For |
| :--- | :--- | :--- |
| **Parquet** | File format | Storing/sharing large datasets |
| **DuckDB** | In-process SQL engine | Ad-hoc queries, joins, aggregations on files |
| **Polars** | DataFrame library | Pandas replacement, transformations, pipelines |
| **PyArrow** | In-memory columnar format | Data interchange, fast I/O, Flight protocol |
| **Delta Lake** | ACID table format on Parquet | Mutable data lakes with history |

---

## 2. Parquet : Columnar File Format

### Why Parquet vs CSV?

| Feature | CSV | Parquet |
| :--- | :--- | :--- |
| **Storage** | Row-oriented | Columnar (reads only needed columns) ✅ |
| **Compression** | None | Snappy/Gzip/Zstd built-in (3-10x smaller) ✅ |
| **Schema** | None (inferred) | Typed schema embedded ✅ |
| **Read speed** | Full file always | Column pruning + predicate pushdown ✅ |
| **Nulls** | String "null" | Native null handling |
| **Splittable** | Yes | Yes (row groups) |
| **Human-readable** | Yes | No (binary) |

```
Columnar advantage example:
  SELECT avg(salary) FROM employees.csv     → reads ALL columns, ALL rows
  SELECT avg(salary) FROM employees.parquet → reads ONLY the salary column ✅
```

### Read/Write with Pandas

```python
import pandas as pd

# Read
df = pd.read_parquet("data.parquet")
df = pd.read_parquet("data.parquet", columns=["name", "salary"])  # column pruning ✅
df = pd.read_parquet("data.parquet", engine="pyarrow")            # default
df = pd.read_parquet("data.parquet", engine="fastparquet")

# Write
df.to_parquet("output.parquet")
df.to_parquet("output.parquet", compression="snappy")  # default: snappy ✅
df.to_parquet("output.parquet", compression="gzip")    # smaller, slower
df.to_parquet("output.parquet", compression="zstd")    # best ratio+speed ✅
df.to_parquet("output.parquet", index=False)           # don't write the row index

# Partitioned Parquet (Hive-style)
df.to_parquet("output/", partition_cols=["year", "country"])
# Creates: output/year=2024/country=India/part-0.parquet
#          output/year=2024/country=USA/part-0.parquet ...
pd.read_parquet("output/", filters=[("year", "==", 2024)])  # reads only matching partitions ✅
```
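The size and speed claims above are easy to verify on your own data. Below is a minimal sketch (synthetic data; the `demo.csv` / `demo.parquet` file names are placeholders) that writes the same DataFrame to both formats, compares file sizes, and then reads back a single column from the Parquet file:

```python
# Minimal CSV vs Parquet comparison sketch (synthetic data, illustrative file names)
import os
import numpy as np
import pandas as pd

df = pd.DataFrame({
    "id": np.arange(1_000_000),
    "country": np.random.choice(["India", "USA", "Germany"], 1_000_000),
    "salary": np.random.normal(80_000, 15_000, 1_000_000),
})

df.to_csv("demo.csv", index=False)
df.to_parquet("demo.parquet", compression="zstd", index=False)

print(f"CSV:     {os.path.getsize('demo.csv') / 1e6:.1f} MB")
print(f"Parquet: {os.path.getsize('demo.parquet') / 1e6:.1f} MB")

# Column pruning: only the 'salary' column is read back from the Parquet file
salary_only = pd.read_parquet("demo.parquet", columns=["salary"])
print(salary_only.mean())
```

The exact size ratio depends on the compression codec and on how repetitive the columns are, but mixed numeric/categorical data typically lands in the 3-10x range the table above quotes.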
pd.read_parquet("output/", filters=[("year", "==", 2024)]) # reads only matching partitions ✅ ``` ### Read/Write with PyArrow ```python import pyarrow as pa import pyarrow.parquet as pq # Read table = pq.read_table("data.parquet") table = pq.read_table("data.parquet", columns=["name", "age"]) # Convert to/from pandas df = table.to_pandas() table = pa.Table.from_pandas(df) # Write pq.write_table(table, "output.parquet", compression="snappy") # Write partitioned pq.write_to_dataset(table, root_path="output/", partition_cols=["country"]) # Read metadata without loading data meta = pq.read_metadata("data.parquet") print(meta.num_rows, meta.num_row_groups) schema = pq.read_schema("data.parquet") print(schema) # Streaming large files row-group by row-group pf = pq.ParquetFile("large.parquet") for batch in pf.iter_batches(batch_size=10000): df_chunk = batch.to_pandas() # process chunk ``` --- ## 3. DuckDB : SQL on Files DuckDB is an in-process OLAP database. No server, no setup : just SQL on any file. ### Core Usage ```python import duckdb # Query CSV directly : no loading! ✅ result = duckdb.sql("SELECT country, COUNT(*) as n FROM 'data.csv' GROUP BY country ORDER BY n DESC") df = result.df() # → pandas DataFrame arrow = result.arrow() # → PyArrow Table polars_df = result.pl() # → Polars DataFrame # Query Parquet result = duckdb.sql("SELECT avg(salary) FROM 'employees.parquet' WHERE department = 'Engineering'") # Query JSON duckdb.sql("SELECT * FROM 'data.json' LIMIT 10") duckdb.sql("SELECT * FROM 'data.jsonl' LIMIT 10") # line-delimited JSON # Query a folder of Parquet files (glob) duckdb.sql("SELECT * FROM 'data/year=2024/**/*.parquet' WHERE country = 'India'") # Query existing pandas DataFrame (magic variable substitution) import pandas as pd df = pd.read_csv("data.csv") result = duckdb.sql("SELECT * FROM df WHERE age > 30") # df is the variable ✅ ``` ### Persistent Database ```python # In-memory (default) : lost after session con = duckdb.connect() # Persistent : saves to file con = duckdb.connect("analytics.db") con.execute(""" CREATE TABLE IF NOT EXISTS sales AS SELECT * FROM 'sales_2024.parquet' """) con.execute("INSERT INTO sales VALUES (1, 'Product A', 150.0)") con.execute("UPDATE sales SET price = 200.0 WHERE id = 1") # Query rows = con.execute("SELECT * FROM sales LIMIT 5").fetchdf() con.close() ``` ### DuckDB Power Features ```python import duckdb # Aggregate multiple files duckdb.sql(""" SELECT year, month, SUM(revenue) FROM 'sales/year=*/month=*/*.parquet' GROUP BY year, month """) # Window functions duckdb.sql(""" SELECT name, salary, RANK() OVER (PARTITION BY dept ORDER BY salary DESC) as rank, AVG(salary) OVER (PARTITION BY dept) as dept_avg FROM 'employees.parquet' """) # Export results duckdb.sql("COPY (SELECT * FROM 'input.csv' WHERE year=2024) TO 'output.parquet' (FORMAT PARQUET)") duckdb.sql("COPY result_table TO 'output.csv' (HEADER, DELIMITER ',')") # Spatial extension duckdb.sql("INSTALL spatial; LOAD spatial;") duckdb.sql(""" SELECT ST_Distance( ST_Point(77.5946, 12.9716), -- Bangalore ST_Point(72.8777, 19.0760) -- Mumbai ) """) # HTTP: query remote files duckdb.sql("SELECT * FROM 'https://example.com/data.parquet' LIMIT 10") # S3 / GCS duckdb.sql("INSTALL httpfs; LOAD httpfs;") duckdb.sql("SET s3_region='us-east-1'; SET s3_access_key_id='...'") duckdb.sql("SELECT * FROM 's3://my-bucket/data/*.parquet'") ``` ### DuckDB vs SQLite | Feature | DuckDB | SQLite | | :--- | :--- | :--- | | **Optimized for** | Analytics (OLAP) | Transactional (OLTP) | | **Parallel queries** | 
### DuckDB vs SQLite

| Feature | DuckDB | SQLite |
| :--- | :--- | :--- |
| **Optimized for** | Analytics (OLAP) | Transactional (OLTP) |
| **Parallel queries** | Multi-core ✅ | Single-threaded |
| **Query CSV/Parquet** | Native ✅ | No |
| **Columnar execution** | Yes ✅ | No |
| **Best for** | Analytics, aggregations | Small apps, mobile |

---

## 4. Polars : Fast DataFrames

Polars is a Rust-backed DataFrame library with lazy evaluation and a clean expression API.

### Eager vs Lazy

```python
import polars as pl

# Eager : executes immediately (like pandas)
df = pl.read_csv("data.csv")
result = df.filter(pl.col("age") > 25).group_by("country").agg(pl.col("salary").mean())

# Lazy : builds a query plan, executes on .collect()
lf = pl.scan_csv("data.csv")   # scan_* = lazy ✅
result = (
    lf
    .filter(pl.col("age") > 25)
    .group_by("country")
    .agg(pl.col("salary").mean().alias("avg_salary"))
    .sort("avg_salary", descending=True)
    .limit(10)
    .collect()   # execute! ✅
)
```

### Read/Write

```python
import polars as pl

# Read
df = pl.read_csv("data.csv")
df = pl.read_parquet("data.parquet")
df = pl.read_json("data.json")
df = pl.read_excel("data.xlsx", sheet_name="Sheet1")

# Lazy reads (recommended for large files)
lf = pl.scan_csv("data.csv")
lf = pl.scan_parquet("data.parquet")
lf = pl.scan_parquet("data/year=2024/**/*.parquet")   # glob ✅

# Write
df.write_parquet("output.parquet", compression="snappy")
df.write_csv("output.csv")
df.write_json("output.json")
```

### Expressions : The Core Concept

```python
df = pl.read_csv("data.csv")

# Select columns
df.select(["name", "age"])
df.select(pl.col("age") * 2)          # transform
df.select(pl.col("^age.*$"))          # regex column selection
df.select(pl.all().exclude("id"))     # all except id

# Filter
df.filter(pl.col("age") > 25)
df.filter((pl.col("age") > 25) & (pl.col("country") == "India"))
df.filter(pl.col("name").str.starts_with("A"))

# With new/transformed columns
df.with_columns(
    (pl.col("salary") * 1.1).alias("salary_new"),            # 10% raise
    pl.col("name").str.to_uppercase().alias("name_upper"),
    pl.col("date").str.to_date("%Y-%m-%d"),
)

# GroupBy + aggregations
df.group_by("country").agg(
    pl.col("salary").mean().alias("avg_salary"),
    pl.col("salary").max().alias("max_salary"),
    pl.len().alias("count"),
    pl.col("name").n_unique().alias("unique_employees")
)

# Sort
df.sort("salary", descending=True)
df.sort(["country", "salary"], descending=[False, True])

# Join
orders = pl.read_parquet("orders.parquet")
df.join(orders, on="customer_id", how="left")   # inner, left, full (outer), cross, semi, anti

# Null handling
df.drop_nulls()
df.fill_null(0)
df.fill_null(strategy="forward")
df.filter(pl.col("salary").is_not_null())
```

### String Operations

```python
df.with_columns(
    pl.col("email").str.contains("@gmail"),
    pl.col("name").str.to_lowercase(),
    pl.col("date_str").str.to_date("%d/%m/%Y"),
    pl.col("text").str.replace_all(r"\s+", " "),    # regex replace
    pl.col("code").str.slice(0, 3),                 # substring
    pl.col("full_name").str.split(" ").list.get(0).alias("first_name")
)
```

### Window Functions

```python
df.with_columns(
    pl.col("salary").mean().over("department").alias("dept_avg"),
    pl.col("salary").rank(descending=True).over("department").alias("rank"),
    pl.col("salary").cum_sum().over("department").alias("cumulative"),
)
```
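To see what the lazy API from the start of this section actually buys you, you can print the optimized query plan before running anything. A minimal sketch (file and column names are illustrative); the plan should show the filter and the column selection pushed down into the Parquet scan:

```python
import polars as pl

# Build a lazy query but don't run it; .explain() returns the optimized plan as a string
plan = (
    pl.scan_parquet("employees.parquet")
    .filter(pl.col("age") > 25)
    .select(["name", "age", "salary"])
    .explain()
)
print(plan)   # look for the predicate and projection attached to the scan node
```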
### Polars vs Pandas

| Feature | Polars | Pandas |
| :--- | :--- | :--- |
| **Speed** | 5-50x faster ✅ | Baseline |
| **Memory** | Lower (columnar, no copy) ✅ | Higher |
| **Parallelism** | Multi-core by default ✅ | Single-threaded |
| **Lazy eval** | Yes (`scan_*` + `.collect()`) ✅ | No |
| **Null handling** | Native null (not NaN) | NaN (float-based) |
| **Index** | No index ✅ (simpler) | Row index |
| **API consistency** | Expression-based, uniform | Mixed, complex |
| **Ecosystem** | Growing | Mature |

---

## 5. PyArrow : The Glue Layer

PyArrow is the in-memory columnar format shared by Pandas, Polars, DuckDB, and Parquet.

```python
import pyarrow as pa
import pyarrow.compute as pc
import polars as pl

# Create an Arrow table
table = pa.table({
    "name": ["Alice", "Bob", "Carol"],
    "age": [30, 25, 35],
    "salary": [90000.0, 75000.0, 110000.0]
})

# Schema
print(table.schema)
# name: string
# age: int64
# salary: double

# Column operations (vectorized)
ages = table["age"]
filtered = pc.filter(table, pc.greater(ages, 27))   # rows where age > 27
doubled = pc.multiply(table["salary"], 2)

# Convert
df_pandas = table.to_pandas()
df_polars = pl.from_arrow(table)
table_from_pandas = pa.Table.from_pandas(df_pandas)   # zero-copy where possible ✅

# ChunkedArray aggregations (via pyarrow.compute)
chunked = table.column("salary")
print(pc.sum(chunked))
print(pc.mean(chunked))

# Cast types
table = table.set_column(
    table.schema.get_field_index("age"),
    "age",
    table["age"].cast(pa.int32())
)

# Append rows (Arrow tables are immutable → concatenate)
new_rows = pa.table({
    "name": ["Dave"],
    "age": pa.array([28], type=pa.int32()),   # match the int32 "age" so schemas are identical
    "salary": [80000.0]
})
combined = pa.concat_tables([table, new_rows])
```

---

## 6. Data Validation with Pandera

```python
import pandera as pa   # note: same alias as pyarrow above; keep these in separate scripts
import pandas as pd

# Define a schema
schema = pa.DataFrameSchema({
    "id": pa.Column(int, pa.Check.greater_than(0)),
    "name": pa.Column(str, pa.Check.str_length(min_value=1)),
    "age": pa.Column(int, pa.Check.in_range(0, 150)),
    "email": pa.Column(str, pa.Check.str_matches(r"^[\w.-]+@[\w.-]+\.\w+$")),
    "salary": pa.Column(float, pa.Check.greater_than_or_equal_to(0), nullable=True),
})

df = pd.read_csv("employees.csv")
validated = schema.validate(df)   # raises SchemaError if invalid ✅

# Coerce types automatically
schema = pa.DataFrameSchema({
    "age": pa.Column(int, coerce=True),   # "30" → 30
})
```

---

## 7. Complete Pipeline Example

```python
import duckdb
import pandas as pd
import polars as pl

# Step 1: DuckDB : aggregate raw CSVs to Parquet
duckdb.sql("""
    COPY (
        SELECT
            country,
            strftime(order_date, '%Y-%m') as month,
            SUM(amount) as total_revenue,
            COUNT(*) as num_orders
        FROM 'raw_orders/*.csv'
        WHERE order_date >= '2024-01-01'
        GROUP BY country, month
    ) TO 'aggregated.parquet' (FORMAT PARQUET, COMPRESSION 'snappy')
""")

# Step 2: Polars : transform and enrich
result = (
    pl.scan_parquet("aggregated.parquet")
    .with_columns(
        pl.col("total_revenue").round(2),
        (pl.col("total_revenue") / pl.col("num_orders")).alias("avg_order_value")
    )
    .filter(pl.col("total_revenue") > 1000)
    .sort("total_revenue", descending=True)
    .collect()
)
result.write_parquet("final_report.parquet")

# Step 3: Pandas : final analysis
df = pd.read_parquet("final_report.parquet")
df.groupby("country")["total_revenue"].sum().plot(kind="bar")
```

---
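The pipeline above hands data between steps through Parquet files, which is the right choice when intermediate results are large or need to persist. When they fit in memory, the Arrow layer lets the tools pass results to each other directly. A minimal sketch, reusing the illustrative `raw_orders/*.csv` input from Step 1:

```python
import duckdb
import polars as pl

# DuckDB aggregates the raw CSVs and hands the result over as a PyArrow Table
arrow_table = duckdb.sql("""
    SELECT country, SUM(amount) AS revenue
    FROM 'raw_orders/*.csv'
    GROUP BY country
""").arrow()

# Polars wraps the same Arrow data (zero-copy where possible) for further work
df = pl.from_arrow(arrow_table)
print(df.sort("revenue", descending=True).head())
```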
## 8. Quick Reference

```
File format decision tree:
  Sharing data?           → Parquet (typed, compressed, portable)
  Log/event stream?       → Parquet partitioned by date
  Ad-hoc SQL analytics?   → DuckDB on Parquet/CSV
  ETL pipeline in Python? → Polars (lazy, fast)
  Final analysis/plots?   → Pandas (rich ecosystem)
  Inter-tool exchange?    → PyArrow (zero-copy)
```

| Operation | Tool | Code |
| :--- | :--- | :--- |
| Read CSV fast | Polars | `pl.scan_csv("f.csv").collect()` |
| SQL on CSV | DuckDB | `duckdb.sql("SELECT * FROM 'f.csv'")` |
| Save compressed | Pandas | `df.to_parquet("f.parquet", compression="snappy")` |
| Column pruning | Pandas/Polars | `pd.read_parquet(f, columns=["a","b"])` |
| GroupBy aggregate | Polars | `.group_by("x").agg(pl.col("y").mean())` |
| Window function | Polars | `pl.col("y").mean().over("x")` |
| Join two tables | DuckDB | `SELECT * FROM a JOIN b ON a.id = b.id` |
| Convert Pandas↔Polars | Both | `pl.from_pandas(df)` / `df.to_pandas()` |
| Convert Pandas↔Arrow | PyArrow | `pa.Table.from_pandas(df)` / `.to_pandas()` |
| Validate schema | Pandera | `schema.validate(df)` |