# Python for Data Pipelines

## 1. Memory Management: Generators vs Lists

```
File Size            Approach
Small (<100MB)       --> pd.read_csv() or open().read()
Medium (100MB-1GB)   --> pd.read_csv(chunksize=N)
Large (>1GB)         --> Generator: for line in open()
Compressed (.gz)     --> gzip.open('file.gz', 'rt')
```

### List vs Generator Comparison

| Approach | Memory | Syntax | Use When |
| :--- | :--- | :--- | :--- |
| List comprehension | All in RAM at once | `[x for x in f]` | Small data, need indexing |
| Generator expression | One item at a time | `(x for x in f)` | Large files, sequential processing |
| `readlines()` | All in RAM ❌ | `f.readlines()` | Never for large files |

### Generator with `yield`

```python
def process_logs(filepath):
    """Generator: yields one parsed entry at a time - O(1) memory"""
    with open(filepath, 'r') as f:
        for line in f:
            line = line.strip()
            if line:
                yield parse_line(line)  # one at a time, never loads all

# Usage - works for files of ANY size
for entry in process_logs('access.log'):
    if entry['status'] == 404:
        print(entry['url'])

# Memory: ~few KB regardless of file size ✅
# readlines(): proportional to file size, crashes on 10GB ❌
```

---

## 2. `pd.read_csv()` Large File Options

```python
import pandas as pd

# Chunked processing - most memory efficient for large CSVs
total = 0
for chunk in pd.read_csv('large.csv', chunksize=10000):
    total += chunk['revenue'].sum()

# Useful parameters
pd.read_csv(
    'large.csv',
    chunksize=10000,          # rows per chunk ✅
    usecols=['a', 'b', 'c'],  # load only needed columns (saves memory)
    dtype={'id': str},        # prevent auto-conversion of IDs to int
    nrows=100,                # load only first N rows (for testing)
    encoding='utf-8',
    sep='\t',                 # tab-separated
    low_memory=False,         # process whole file at once for consistent dtype inference
    skiprows=range(1, 10),    # skip specific rows
    parse_dates=['date_col']  # auto-parse datetime columns
)
```

---
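The chunked-aggregation idea works the same way without pandas — a minimal stdlib sketch (the `revenue` column and the chunk size are made up for illustration; an in-memory `StringIO` stands in for a large file on disk):

```python
import csv
import io

def sum_column(file_obj, column, chunk_size=2):
    """Stream a CSV and sum one column chunk by chunk - never holds all rows."""
    reader = csv.DictReader(file_obj)
    total = 0.0
    chunk = []
    for row in reader:
        chunk.append(float(row[column]))
        if len(chunk) >= chunk_size:   # aggregate the chunk, then discard it
            total += sum(chunk)
            chunk = []
    return total + sum(chunk)          # leftover partial chunk

data = io.StringIO("id,revenue\n1,10.0\n2,20.0\n3,5.5\n")
print(sum_column(data, 'revenue'))  # 35.5
```

The same accumulate-per-chunk pattern is what the `pd.read_csv(chunksize=...)` loop above does; only the running aggregate lives in memory.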
## 3. File I/O: `gzip`, `open`, `pathlib`

```python
import gzip
from pathlib import Path

# Read gzipped text file (generator pattern) ✅
with gzip.open('logfile.gz', 'rt') as f:  # 'rt' = read text
    for line in f:
        process(line)

# pandas auto-detects .gz extension
df = pd.read_csv('data.csv.gz')

# File modes
open('file.txt', 'r')   # read text (default)
open('file.txt', 'w')   # write (creates or overwrites)
open('file.txt', 'a')   # append
open('file.txt', 'x')   # exclusive create (fails if exists)
open('file.txt', 'rb')  # read binary
open('file.txt', 'wb')  # write binary

# pathlib - OS-independent path handling ✅
p = Path('data') / 'subfolder' / 'file.txt'  # path joining
p.exists()         # True/False
p.is_file()
p.is_dir()
p.name             # 'file.txt'
p.stem             # 'file'
p.suffix           # '.txt'
p.parent           # Path('data/subfolder')
p.read_text()      # read file content as string
p.write_text("x")  # write string to file
p.mkdir(parents=True, exist_ok=True)  # create directory
Path('.').glob('**/*.csv')  # find all CSV files recursively ✅
```

---

## 4. Atomic Writes & `shutil`

```python
import tempfile, os, shutil

# ✅ Atomic write pattern - prevents partial/corrupt output files
def atomic_write(data, final_path):
    """Write to temp first, then atomic move to final"""
    fd, temp_path = tempfile.mkstemp(dir=os.path.dirname(final_path))
    try:
        with os.fdopen(fd, 'w') as tmp:
            tmp.write(data)
        # os.replace is atomic at OS level
        os.replace(temp_path, final_path)  # ✅ all-or-nothing
    except Exception:
        os.remove(temp_path)  # clean up on failure
        raise

# shutil operations
shutil.copy('src.txt', 'dst.txt')        # copy file
shutil.copy2('src.txt', 'dst.txt')       # copy with metadata
shutil.copytree('src_dir/', 'dst_dir/')  # copy directory
shutil.move('temp.csv', 'final.csv')     # rename on same filesystem; falls back to copy+delete (NOT atomic) across filesystems
shutil.rmtree('directory/')              # delete directory + contents
```

---
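The atomic-write pattern from section 4 can be exercised end to end — a self-contained sketch using throwaway temp paths (`report.txt` is a made-up target name):

```python
import os
import tempfile

def atomic_write(data, final_path):
    """Write to a temp file in the SAME directory, then atomically replace the target."""
    fd, temp_path = tempfile.mkstemp(dir=os.path.dirname(final_path) or '.')
    try:
        with os.fdopen(fd, 'w') as tmp:
            tmp.write(data)
        os.replace(temp_path, final_path)  # atomic on the same filesystem
    except Exception:
        os.remove(temp_path)  # no half-written target left behind
        raise

# Readers only ever see the old content or the new - never a partial file
workdir = tempfile.mkdtemp()
target = os.path.join(workdir, 'report.txt')
atomic_write("v1", target)
atomic_write("v2", target)
with open(target) as f:
    result = f.read()
print(result)  # v2
```

The temp file lives in the same directory as the target because `os.replace` is only atomic within one filesystem.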
## 5. `try-except-finally` Execution Order (Exam Critical)

```python
import logging

# Flow with NO exception:
#   try -> else (if present) -> finally ✅
# Flow WITH exception:
#   try -> except -> finally ✅
# finally ALWAYS runs, even if there is a return in try/except ✅

try:
    result = risky_operation()
except FileNotFoundError:
    logging.error("File missing")       # specific exception ✅
except ValueError as e:
    logging.error(f"Bad value: {e}")
except Exception as e:                  # catch-all (use sparingly)
    logging.error(f"Unexpected: {e}")
else:
    print("Success!")                   # only if NO exception
finally:
    connection.close()                  # ALWAYS runs ✅ (DB/file cleanup)
```

### Exception Types

| Exception | When |
| :--- | :--- |
| `ValueError` | Right type, but wrong value or format |
| `FileNotFoundError` | File doesn't exist |
| `KeyError` | Dictionary key missing |
| `TypeError` | Wrong data type |
| `ZeroDivisionError` | Division by zero |
| `IndexError` | List index out of range |
| `AttributeError` | Object has no such attribute |
| `ImportError` | Module not found |

- ❌ `try-catch-finally` is Java/JavaScript, NOT Python
- ❌ Generic `except Exception` hides bugs; use specific types first

---

## 6. `logging` vs `print`

```python
import logging

# Configure once at start
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filename='pipeline.log'  # log to file
)

# Log levels (low to high)
logging.debug("Debug detail")     # development only
logging.info("Pipeline started")  # general flow ✅
logging.warning("Missing data")   # something unexpected
logging.error("API call failed")  # serious problem ✅
logging.critical("DB crashed")    # system failure

# In data pipelines:
try:
    df = pd.read_csv('data.csv')
except FileNotFoundError:
    logging.error("Data file missing")
except pd.errors.EmptyDataError:
    logging.error("File is empty")
finally:
    logging.info("Pipeline step complete")
```

---
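The execution-order rules from section 5 can be verified directly — a small harness that records which clauses actually run (`run`, `ok`, and `bad` are made-up names for the demo):

```python
def run(op):
    """Call op() inside try/except/else/finally and record what runs, in order."""
    order = []
    try:
        order.append('try')
        op()
    except ValueError:
        order.append('except')
    else:
        order.append('else')
    finally:
        order.append('finally')
    return order

def ok():
    pass

def bad():
    raise ValueError("boom")

print(run(ok))   # ['try', 'else', 'finally']
print(run(bad))  # ['try', 'except', 'finally']

# finally runs even when try returns
def with_return(trace):
    try:
        return 'from try'
    finally:
        trace.append('finally ran')

trace = []
value = with_return(trace)
print(value, trace)  # from try ['finally ran']
```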
## 7. `os` Module Reference

```python
import os

# Existence checks
os.path.exists('file.txt')  # True if exists (file or dir)
os.path.isfile('file.txt')  # True if file
os.path.isdir('folder/')    # True if directory

# Path manipulation
os.path.join('folder', 'file.txt')  # 'folder/file.txt' (OS-safe) ✅
os.path.basename('/path/file.txt')  # 'file.txt'
os.path.dirname('/path/file.txt')   # '/path'
os.path.splitext('file.txt')        # ('file', '.txt')
os.path.abspath('file.txt')         # full absolute path

# File info
os.path.getsize('file.txt')   # size in bytes
os.path.getmtime('file.txt')  # last modified time

# Directory ops
os.listdir('.')                            # list directory
os.makedirs('path/to/dir', exist_ok=True)  # create dirs
os.remove('file.txt')                      # delete file
os.rename('old.txt', 'new.txt')            # rename file

# Environment
os.getcwd()                      # current working directory
os.environ.get('API_KEY')        # get env variable ✅
os.getenv('API_KEY', 'default')  # with default value
```

---

## 8. `uv` - Modern Package Management

```bash
# Project setup
uv init project_name --python 3.12  # create new project
uv add pandas requests openai       # add dependencies
uv remove requests                  # remove dependency
uv run script.py                    # run script in venv
uv lock                             # create uv.lock (commit this!) ✅
uv sync                             # install from uv.lock

# Equivalent to pip
uv pip install requests
uv pip install -r requirements.txt
```

- `uv.lock` ensures reproducibility (like `package-lock.json` in Node) ✅

---
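Two of the `os` calls in section 7 have edge cases worth checking — a quick sketch (`DEMO_API_KEY` and `MISSING_KEY_XYZ` are made-up variable names for this demo):

```python
import os

# getenv with a default: set a demo variable, then read it back
os.environ['DEMO_API_KEY'] = 'secret-123'
print(os.getenv('DEMO_API_KEY', 'default'))     # secret-123
print(os.getenv('MISSING_KEY_XYZ', 'default'))  # default (key not set)

# splitext separates only the LAST extension - watch out for .tar.gz
print(os.path.splitext('archive.tar.gz'))  # ('archive.tar', '.gz')
print(os.path.splitext('file.txt'))        # ('file', '.txt')
```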
## 9. Datetime & Log Parsing (Exam Critical)

```python
from datetime import datetime

# Parse Apache log timestamp ✅ (most exam-relevant)
ts = datetime.strptime("14/Dec/2024:16:45:11 -0500", "%d/%b/%Y:%H:%M:%S %z")

# Extract components
ts.hour       # 16
ts.minute     # 45
ts.second     # 11
ts.weekday()  # 0=Mon, 1=Tue, 2=Wed, 3=Thu, 4=Fri, 5=Sat, 6=Sun ✅
ts.date()     # date part
ts.time()     # time part
```

### Format Code Reference

| Code | Meaning | Example |
| :--- | :--- | :--- |
| `%d` | Day (zero-padded) | 01-31 |
| `%m` | Month as number | 01-12 |
| `%b` | Abbreviated month | Jan, Dec |
| `%B` | Full month name | January |
| `%Y` | 4-digit year | 2024 |
| `%H` | Hour 24h | 00-23 |
| `%M` | Minutes | 00-59 |
| `%S` | Seconds | 00-59 |
| `%z` | UTC offset ✅ | -0500, +0530 |
| `%Z` | Timezone name | EST, UTC |
| `%A` | Full weekday | Monday |
| `%a` | Short weekday | Mon |

### Time Range Filtering (Exam Critical)

```python
# 16:00 to 18:59 -> 16 <= hour < 19 ✅ (same as 16 <= hour <= 18; both include 18:59)
# 14:00 to 16:59 -> 14 <= hour < 17 ✅

# ❌ Common wrong answers:
# 16 <= hour <= 16       # only catches hour 16
# 16 < hour < 19         # misses 16:xx entries
# hour in range(16, 20)  # includes 19:xx
```

---
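Parsing plus hour filtering from section 9 in one runnable check — the boundary timestamps below are made up to probe the edges of a 16:00-18:59 window:

```python
from datetime import datetime

stamps = [
    "14/Dec/2024:15:59:59 -0500",  # just before the window
    "14/Dec/2024:16:00:00 -0500",  # first second inside
    "14/Dec/2024:18:59:59 -0500",  # last second inside
    "14/Dec/2024:19:00:00 -0500",  # just after the window
]
parsed = [datetime.strptime(s, "%d/%b/%Y:%H:%M:%S %z") for s in stamps]

# Keep 16:00-18:59 only: 16 <= hour < 19
in_window = [t for t in parsed if 16 <= t.hour < 19]
print(len(in_window))  # 2 (the 16:00:00 and 18:59:59 entries)

print(parsed[0].weekday())  # 5 -> 14 Dec 2024 was a Saturday
```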
## 10. Apache Log Parsing

```python
import gzip, shlex
from datetime import datetime

def parse_log_line(line):
    """Parse Apache combined log using shlex to handle quoted fields correctly"""
    try:
        parts = shlex.split(line)  # ✅ quoted request/referer/User-Agent each become ONE token
        # parts[3]='[14/Dec/2024:16:45:11', parts[4]='-0500]' -> rejoin for %z parsing
        timestamp = datetime.strptime(
            parts[3].lstrip('[') + ' ' + parts[4].rstrip(']'),
            '%d/%b/%Y:%H:%M:%S %z')
        method, url, protocol = parts[5].split(' ')  # 'GET /url HTTP/1.1' is a single token
        return {
            'ip': parts[0],
            'timestamp': timestamp,
            'method': method,
            'url': url,            # ✅
            'protocol': protocol,  # ✅
            'status': int(parts[6]),
            'size': parts[7],
            'user_agent': parts[9],
        }
    except (ValueError, IndexError):
        return None  # skip malformed lines

# Handle both plain and gzipped logs
def analyze_log(filepath):
    opener = gzip.open if filepath.endswith('.gz') else open
    with opener(filepath, 'rt') as f:
        for line in f:  # generator pattern ✅
            entry = parse_log_line(line.strip())
            if entry:
                yield entry
```

### Request Field Splitting

```python
request = "GET /checkout/payment HTTP/1.1"
parts = request.split(' ')
method = parts[0]    # 'GET'
url = parts[1]       # '/checkout/payment' ✅
protocol = parts[2]  # 'HTTP/1.1' ✅

# URL matching
url.startswith('/tamilmp3/')  # ✅ prefix match (exam answer)
'/tamilmp3/' in url           # ❌ matches anywhere, not just start
```

---

## 11. String Operations

```python
s.lower()                # 'HELLO' -> 'hello'
s.upper()                # 'hello' -> 'HELLO'
s.title()                # 'hello world' -> 'Hello World'
s.strip()                # remove leading + trailing whitespace ✅
s.lstrip() / s.rstrip()  # left or right only
s.startswith('pre')      # True/False ✅
s.endswith('suf')        # True/False
'sub' in s               # True/False
s.find('sub')            # index or -1
s.count('sub')           # occurrences
s.replace('old', 'new')  # replace all
s.split(',')             # split to list
','.join(['a', 'b'])     # join list to string
s.zfill(5)               # '42' -> '00042'
f"Hello {name}"          # f-string ✅
```

---
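A sanity check of the `shlex` token positions from section 10 on a sample combined-log line (the IP, URL, and user agent below are made up):

```python
import shlex
from datetime import datetime

line = ('203.0.113.7 - - [14/Dec/2024:16:45:11 -0500] '
        '"GET /tamilmp3/song.mp3 HTTP/1.1" 200 4096 "-" "Mozilla/5.0"')

parts = shlex.split(line)
# Quoted fields collapse to single tokens, so the whole request is parts[5]
method, url, protocol = parts[5].split(' ')
ts = datetime.strptime(parts[3].lstrip('[') + ' ' + parts[4].rstrip(']'),
                       '%d/%b/%Y:%H:%M:%S %z')
status = int(parts[6])
user_agent = parts[9]

print(method, url, status, ts.hour)  # GET /tamilmp3/song.mp3 200 16
print(url.startswith('/tamilmp3/'))  # True
```

Note how the timestamp is split across `parts[3]` and `parts[4]` because `shlex` splits on the space inside the brackets; rejoining them lets `%z` parse the offset.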
## 12. Regular Expressions

```python
import re

re.search(pattern, string)            # find first match anywhere
re.match(pattern, string)             # match at START only
re.findall(pattern, string)           # return all matches as list ✅
re.sub(pattern, replacement, string)  # replace all matches
re.split(pattern, string)             # split by pattern

# Common patterns
# \d      digit (0-9)
# \d+     one or more digits
# \w      word char (letter/digit/underscore)
# \s      whitespace
# .       any char except newline
# ^       start of string
# $       end of string
# [A-Z]   any uppercase letter
# (a|b)   a or b (alternation)
# *       zero or more
# +       one or more
# ?       zero or one
# {3}     exactly 3
# {2,5}   between 2 and 5

# Log-relevant patterns
re.findall(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', line)  # extract IP
re.search(r'"(GET|POST|PUT|DELETE)', line)               # HTTP method
re.search(r'" (\d{3}) ', line)                           # status code
re.compile(r'(0[89]|1[0-5]):[0-5][0-9]')                 # time 08:xx-15:xx
```

---

## 13. Haversine & Geospatial

```python
import math

EARTH_RADIUS_KM = 6371  # ✅ TDS standard constant

def haversine(lat1, lon1, lat2, lon2):
    """Returns distance in km between two coordinates"""
    lat1, lon1, lat2, lon2 = map(math.radians, [lat1, lon1, lat2, lon2])  # ✅ must convert to radians
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = math.sin(dlat/2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))

# Find nearest warehouse
def nearest(lat, lon, warehouses):
    return min(warehouses, key=lambda w: haversine(lat, lon, w['lat'], w['lon']))
```

---
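The haversine formula in section 13 has an easy sanity check: one degree of longitude at the equator is an arc of `6371 * pi / 180 ≈ 111.19` km. A runnable sketch (warehouse names and coordinates are made up):

```python
import math

EARTH_RADIUS_KM = 6371

def haversine(lat1, lon1, lat2, lon2):
    """Great-circle distance in km between two (lat, lon) points in degrees."""
    lat1, lon1, lat2, lon2 = map(math.radians, [lat1, lon1, lat2, lon2])
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = math.sin(dlat/2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon/2)**2
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))

# One degree of longitude at the equator
print(round(haversine(0, 0, 0, 1), 2))  # 111.19

# Nearest-warehouse pattern from the cheat sheet
warehouses = [
    {'name': 'A', 'lat': 0.0, 'lon': 0.0},
    {'name': 'B', 'lat': 10.0, 'lon': 10.0},
]
closest = min(warehouses, key=lambda w: haversine(1, 1, w['lat'], w['lon']))
print(closest['name'])  # A
```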
## 14. Common Python Traps (Exam Critical)

| Trap | Wrong | Correct |
| :--- | :--- | :--- |
| Mutable default arg | `def f(a=[])` | `def f(a=None): if a is None: a=[]` |
| `is` vs `==` | `x is "hello"` (identity check) | `x == "hello"` (value check) ✅ |
| `range` in Python 3 | Assumes it's a list | It's a lazy sequence; values computed on demand |
| `finally` with `return` | Thinks `return` skips `finally` | `finally` ALWAYS runs ✅ |
| `readlines()` on large file | `f.readlines()` | `for line in f:` ✅ |
| `collections.Counter` | Manual dict counting | `Counter(items).most_common(5)` ✅ |

---

## 15. `collections` Module

```python
from collections import Counter, defaultdict, OrderedDict

# Counter - frequency counting ✅
c = Counter(['a', 'b', 'a', 'c', 'a'])
c.most_common(2)  # [('a', 3), ('b', 1)]

# Count URL segments in logs
Counter(
    url.split('/')[2]
    for url in urls
    if len(url.split('/')) > 2
)

# defaultdict - no KeyError on missing keys
dd = defaultdict(int)
dd['missing_key'] += 1  # no error, starts at 0

# OrderedDict (less needed in Python 3.7+ where dicts preserve order)
od = OrderedDict()
```

---

## 16. Quick Reference

```python
# JSON: string vs file
data = json.loads(str_data)  # from string ✅
data = json.load(file_obj)   # from file object ✅
json.dumps(data)             # to string
json.dump(data, file_obj)    # to file

# Environment variables (never hardcode keys!)
import os
key = os.getenv('API_KEY')  # ✅
key = os.environ.get('API_KEY', 'default')

# List all CSV files recursively
from pathlib import Path
csv_files = list(Path('data').glob('**/*.csv'))

# Memory profiling
import sys
sys.getsizeof([1, 2, 3])  # size of object
del large_data            # free memory
import gc; gc.collect()   # force garbage collection
```
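Tying sections 15 and 16 together — `Counter.most_common` and the `loads`/`dumps` string round trip in one runnable sketch (the URL list and payload are made up):

```python
import json
from collections import Counter

# Top URLs from a request list
urls = ['/home', '/cart', '/home', '/checkout', '/home', '/cart']
top2 = Counter(urls).most_common(2)
print(top2)  # [('/home', 3), ('/cart', 2)]

# json round trip: dumps -> string, loads -> back to Python objects
payload = {'status': 'ok', 'count': 3}
restored = json.loads(json.dumps(payload))
print(restored == payload)  # True
```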