GenAI Mastery Series · NLP · Databricks · LangChain
Categorizing Wikipedia at Scale with OpenAI, LangChain & Databricks
A complete walkthrough of a large-scale text classification pipeline built inside a Databricks notebook — from loading 10,000 Wikipedia articles to batch-classifying them into 50 categories using OpenAI’s language model via LangChain. Every step includes the real working code.
Overview
Pipeline architecture
The full pipeline runs end-to-end inside a single Databricks notebook. Wikipedia articles are loaded from HuggingFace, cleaned to first-line summaries, batched, and sent to an OpenAI chat model via LangChain's chain interface. Responses are parsed from JSON into a DataFrame.
Pipeline stages: HuggingFace (wikimedia/wikipedia dataset) → Clean (first-line extraction) → LangChain (prompt + ChatOpenAI) → Batch of 8 (rate-limit safe) → DataFrame (id + category)
Step 1
Install required packages
In a Databricks notebook, use %pip magic commands to install packages into the cluster. The %restart_python command refreshes the interpreter to pick up the new packages without restarting the whole cluster.
%pip install langchain_openai
%pip install --upgrade langchain_core langchain_openai
%restart_python
Step 2
Import libraries
Standard Python utilities (json, time, os) combined with LangChain for the LLM interface, HuggingFace Datasets for Wikipedia data loading, and tqdm for progress visibility during batch processing.
import json
import time
import os
import getpass
import pandas as pd
from datasets import Dataset, load_dataset
from tqdm import tqdm
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
Step 3
Load & clean the dataset
The HuggingFace wikimedia/wikipedia dataset is massive — we take a 10,000 article slice from the English November 2023 snapshot. The cleaning step extracts only the first line of each article (the summary sentence), which is sufficient for category classification and drastically reduces token usage.
# Load the Wikipedia English dataset (Nov 2023 snapshot)
dataset = load_dataset("wikimedia/wikipedia", "20231101.en")
# Take a 10k article sample
NUM_SAMPLES = 10000
articles = dataset["train"][:NUM_SAMPLES]["text"]
ids = dataset["train"][:NUM_SAMPLES]["id"]
# Clean: keep only the first line (article summary) to reduce tokens
articles = [x.split("\n")[0] for x in articles]
# Sanity check
print(len(articles)) # → 10000
print(articles[99])  # inspect a sample article
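If downloading the full snapshot is too heavy for your cluster, the same slice can be pulled lazily. This sketch is not part of the original pipeline; it assumes the datasets streaming mode with the same split and field names as above:
from itertools import islice
from datasets import load_dataset

# Stream the snapshot instead of downloading it entirely
stream = load_dataset("wikimedia/wikipedia", "20231101.en",
                      split="train", streaming=True)
sample = list(islice(stream, 10000))
articles = [row["text"].split("\n")[0] for row in sample]
ids = [row["id"] for row in sample]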
Step 4
Configure OpenAI + LangChain
Use getpass to securely prompt for the API key without echoing it to the notebook output. Then initialize ChatOpenAI — LangChain’s wrapper around the OpenAI Chat Completions API.
# Securely enter API key (won't echo to notebook output)
os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter your OpenAI API key: ")
# Initialize the LangChain ChatOpenAI wrapper
llm = ChatOpenAI()
print(llm.model_name)  # → "gpt-3.5-turbo" (default) or your configured model
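If you want a specific model rather than the default, ChatOpenAI takes standard constructor arguments; the model name below is only an example, not the one used in the original run:
# Pin the model and make outputs deterministic for classification
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)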
Step 5 — Core Logic
Define the prompt template
The ChatPromptTemplate structures the conversation: a system message sets the classification task with all 50 categories, and the human message carries the article payload. The double curly braces {{ }} in the JSON schema escape the literal braces so LangChain doesn’t treat them as template variables.
prompt = ChatPromptTemplate.from_messages([
    ("system", """Your task is to assess the article and categorize it
into one of the following predefined categories:
'History', 'Geography', 'Science', 'Technology', 'Mathematics',
'Literature', 'Art', 'Music', 'Film', 'Television', 'Sports',
'Politics', 'Philosophy', 'Religion', 'Sociology', 'Psychology',
'Economics', 'Business', 'Medicine', 'Biology', 'Chemistry',
'Physics', 'Astronomy', 'Environmental Science', 'Engineering',
'Computer Science', 'Linguistics', 'Anthropology', 'Archaeology',
'Education', 'Law', 'Military', 'Architecture', 'Fashion',
'Cuisine', 'Travel', 'Mythology', 'Folklore', 'Biography',
'Social Issues', 'Human Rights', 'Technology Ethics',
'Climate Change', 'Conservation', 'Urban Studies', 'Demographics',
'Journalism', 'Cryptocurrency', 'Artificial Intelligence'
Output ONLY a JSON object — no extra text:
{{
"id": string,
"category": string
}}"""),
    ("human", "{input}")
])

This strict "output only JSON" instruction combined with json.loads() parsing creates a simple but robust structured output pipeline.
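To confirm the escaped braces render as literal JSON braces, you can format the prompt locally before spending any tokens:
# Each {{ }} pair should appear as a single literal brace in the output
print(prompt.format_messages(input="test article")[0].content)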
Step 6
Build the chain & test it
LangChain’s pipe operator | composes the prompt template and the LLM into a reusable chain. One call to .invoke() with a single article validates the whole setup before committing to batch processing.
# Compose prompt → llm into a reusable chain
chain = prompt | llm
# Test with article[0] before running the full batch
content = json.dumps({"id": ids[0], "article": articles[0]})
response = chain.invoke(content)
print(response.content)
# → {"id": "1", "category": "History"}
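It is worth confirming the test response survives the same json.loads() round trip the batch step will rely on:
parsed = json.loads(response.content)
assert set(parsed.keys()) == {"id", "category"}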
Step 7 — Core Loop
Batch processing with rate-limit handling
Processing 1,000 articles one-by-one would quickly hit OpenAI’s requests-per-minute limit. The solution: accumulate inputs into batches of 8 and call .batch() with a 1.5-second sleep between each batch. tqdm wraps the loop to give live progress in the notebook.
results = []
BATCH_SIZE = 8
inputs = []

for index, article in tqdm(enumerate(articles[:1000])):
    inputs.append(
        json.dumps({"id": ids[index], "article": articles[index]})
    )
    if len(inputs) == BATCH_SIZE:
        time.sleep(1.5)  # respect rate limits
        response = chain.batch(inputs)
        results += response
        inputs = []  # reset buffer

# Flush any remaining articles in the last partial batch
if inputs:
    response = chain.batch(inputs)
    results += response

For production runs, the fixed sleep can be hardened further by retrying transient failures with exponential backoff via tenacity.
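One way to add that hardening, sketched with tenacity (the decorator parameters are illustrative, not from the original notebook):
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(stop=stop_after_attempt(5),
       wait=wait_exponential(multiplier=1, min=2, max=30))
def classify_batch(batch_inputs):
    # Retries transient API errors with exponential backoff
    return chain.batch(batch_inputs)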
Step 8
Parse results into a DataFrame
Not every LLM response will be valid JSON — network hiccups, model refusals, and malformed outputs all happen at scale. The pattern below separates successful parses from failures so you can inspect and retry the failures without losing the successful results.
success = []
failure = []

for output in results:
    content = output.content
    try:
        content = json.loads(content)
        success.append(content)
    except ValueError:
        failure.append(content)  # keep for retry / inspection

print(f"Success: {len(success)} | Failure: {len(failure)}")

# Convert to DataFrame for analysis / export
df = pd.DataFrame(success)
df.head(10)
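A quick way to sanity-check the output distribution before exporting:
# Most common categories in the classified sample
print(df["category"].value_counts().head(10))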
Sample Output
What the pipeline produces
Each article comes back as a row pairing its id with one of the 50 predefined categories; the full category list is the one given in the prompt template in Step 5.
Interview Prep
Cheat sheet — quick definitions to remember
What is LangChain and what problem does it solve?
A framework that standardizes how applications talk to LLMs: prompts, models, and parsers become composable components, so you can swap providers or chain steps without rewriting glue code. In this pipeline it supplies the prompt template, the ChatOpenAI wrapper, and the | composition operator.
What is a ChatPromptTemplate?
A template that structures a chat conversation into typed messages (system, human) with variables. The system message carries the fixed instructions, while the {input} placeholder gets filled at runtime. Separating instructions from data is a core prompt engineering best practice.
Why use .batch() instead of looping .invoke()?
.batch() runs multiple requests in parallel (a thread pool under the hood; chain.abatch() is the asyncio variant), while .invoke() is sequential. For 8 articles, batch is roughly 8x faster. The sleep between batches manages rate limits: you get concurrency within a batch, pacing across batches.
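If rate limits bite even within a batch, the per-call concurrency can be capped; a one-line sketch using the standard RunnableConfig option:
# Limit in-flight requests within a single batch call
response = chain.batch(inputs, config={"max_concurrency": 4})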
Why separate success and failure lists instead of crashing on parse error?
At scale some responses will be malformed, and a single bad payload should not destroy hundreds of successful classifications. Collecting failures separately preserves the good results and gives you a retry queue to inspect or reprocess later.
How do you get reliable structured JSON from an LLM?
(1) Constrain the prompt: demand ONLY a JSON object, show the exact schema, and escape literal braces. (2) Use a structured output parser (e.g., LangChain's JsonOutputParser) for automatic parsing and retry. (3) Validate with Pydantic: define a model and parse the JSON through it to catch type errors.
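A minimal sketch of option (3), assuming Pydantic v2; the Classification model and parse_safely helper are illustrative names, not from the article:
from pydantic import BaseModel, ValidationError

class Classification(BaseModel):
    id: str
    category: str

def parse_safely(raw: str) -> Classification | None:
    # Returns None instead of raising, so bad outputs go to the failure queue
    try:
        return Classification.model_validate_json(raw)
    except ValidationError:
        return None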
Why use Databricks for this pipeline?
Notebook-scoped %pip installs and %restart_python make dependency management painless, and the same code can later be distributed across Spark workers with a pandas_udf. It also integrates with Delta Lake for storing results, MLflow for experiment tracking, and Unity Catalog for data governance.
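A rough sketch of the pandas_udf route, assuming each executor can reach the OpenAI API, OPENAI_API_KEY is set cluster-wide, and the prompt object serializes to the workers; classify_udf and the column names are illustrative:
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType
from langchain_openai import ChatOpenAI

@pandas_udf(StringType())
def classify_udf(summaries: pd.Series) -> pd.Series:
    # Each executor builds its own chain from the shipped prompt template
    worker_chain = prompt | ChatOpenAI()
    responses = worker_chain.batch(summaries.tolist())
    return pd.Series([r.content for r in responses])

spark_df = spark.createDataFrame(pd.DataFrame({"summary": articles}))
result = spark_df.withColumn("raw_json", classify_udf("summary"))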
How would you scale this to 10 million articles?
(1) Distribute the work across Spark workers with a pandas_udf so each partition runs its own batch loop. (2) Replace the fixed time.sleep() with exponential backoff via tenacity. (3) Use LangChain's async batch with chain.abatch() and asyncio for maximum concurrency per node.
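A minimal async sketch of option (3), reusing the chain from Step 6; batch size and pacing are illustrative:
import asyncio

async def classify_async(all_inputs, batch_size=8, pause=1.5):
    # abatch fires each batch's requests concurrently on the event loop
    results = []
    for i in range(0, len(all_inputs), batch_size):
        results += await chain.abatch(all_inputs[i:i + batch_size])
        await asyncio.sleep(pause)  # pacing across batches
    return results

# Databricks notebooks support top-level await:
# results = await classify_async(inputs)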