!pip install -q trafilatura datatrove tokenizers transformers datasets

Pre-training LLMs from Scratch
A hands-on guide to web scraping, data cleaning, deduplication, tokenizer training, and dataset preparation for LLM pretraining
Table of Contents
1. Setup & Installation
Install the required packages for data collection, cleaning, and tokenization.
2. Web Scraping with Trafilatura
Trafilatura extracts clean text content from web pages, stripping boilerplate (navigation, ads, footers). It is the go-to tool for building text corpora from the web.
import trafilatura

# Fetch and extract content from a single URL.
url = "https://en.wikipedia.org/wiki/Large_language_model"
downloaded = trafilatura.fetch_url(url)
if downloaded is None:
    # fetch_url signals network/HTTP failure by returning None, not raising.
    raise RuntimeError(f"Failed to fetch {url}")

# Extract clean text, stripping boilerplate. extract() returns None when it
# finds no usable content, so fall back to "" to keep len() and slicing safe.
text = trafilatura.extract(
    downloaded,
    include_comments=False,  # drop reader-comment sections
    include_tables=True,     # keep text inside tables
    output_format="txt",
) or ""
print(f"Extracted {len(text)} characters")
print(text[:500])

3. Scraping at Scale
For large-scale data collection, use multithreading and save results in JSONL format for downstream processing.
import json
from concurrent.futures import ThreadPoolExecutor, as_completed
def scrape_url(url):
    """Fetch one page and pull out its main text content.

    Returns a {"url": ..., "text": ...} record on success, or None when the
    download fails, extraction yields nothing, or the text is too short to
    be worth keeping.
    """
    try:
        html = trafilatura.fetch_url(url)
        if html is not None:
            extracted = trafilatura.extract(
                html,
                include_comments=False,
                include_tables=True,
            )
            # Keep only documents with a meaningful amount of text.
            if extracted and len(extracted) > 100:
                return {"url": url, "text": extracted}
    except Exception as e:
        print(f"Error scraping {url}: {e}")
    return None
# Example URLs
urls = [
    "https://en.wikipedia.org/wiki/Large_language_model",
    "https://en.wikipedia.org/wiki/Transformer_(deep_learning_architecture)",
]

# Scrape in parallel; as_completed yields each future as its worker finishes,
# so slow pages don't block collection of the fast ones.
results = []
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = {executor.submit(scrape_url, url): url for url in urls}
    for future in as_completed(futures):
        result = future.result()
        if result:
            results.append(result)

# Save to JSONL. Scraped pages contain arbitrary Unicode, so write UTF-8
# explicitly (the platform default codec can raise UnicodeEncodeError) and
# keep characters readable instead of \uXXXX escapes.
output_path = "scraped_data.jsonl"
with open(output_path, "w", encoding="utf-8") as f:
    for item in results:
        f.write(json.dumps(item, ensure_ascii=False) + "\n")
print(f"Saved {len(results)} documents to {output_path}")

4. Using Common Crawl with DataTrove
DataTrove provides scalable pipelines for processing Common Crawl WARC files with built-in extractors and writers.
from datatrove.pipeline.readers import WarcReader
from datatrove.pipeline.writers import JsonlWriter
from datatrove.pipeline.extractors import Trafilatura

# Pipeline: read WARC records -> extract clean text -> write JSONL.
# Each stage is built separately, then chained in order.
warc_reader = WarcReader(
    data_folder="s3://commoncrawl/crawl-data/CC-MAIN-2024-10",
    glob_pattern="*/warc/*",
    limit=1000,  # Limit for demo
)
html_extractor = Trafilatura()  # Extract clean text from HTML
jsonl_writer = JsonlWriter(
    output_folder="./common_crawl_output",
    output_filename="extracted.jsonl",
)
pipeline = [warc_reader, html_extractor, jsonl_writer]
print("Pipeline defined with", len(pipeline), "stages")
print("Stages:", [type(stage).__name__ for stage in pipeline])

5. Data Cleaning & Filtering
DataTrove includes production-quality filters inspired by the Gopher, C4, and FineWeb pipelines.
from datatrove.pipeline.filters import (
LanguageFilter,
URLFilter,
GopherQualityFilter,
GopherRepetitionFilter,
C4QualityFilter,
)
from datatrove.pipeline.readers import JsonlReader
# Build the cleaning & filtering pipeline stage by stage, then chain them.
reader = JsonlReader(
    data_folder="./common_crawl_output",
    glob_pattern="*.jsonl",
)
# Keep only English text
language_filter = LanguageFilter(language_threshold=0.65)
# Remove known bad URL patterns
url_filter = URLFilter()
# Gopher-style quality checks: word count, symbol ratio, etc.
gopher_quality = GopherQualityFilter(
    min_doc_words=50,
    max_doc_words=100_000,
)
# Gopher-style repetition filter
gopher_repetition = GopherRepetitionFilter(
    top_n_grams=(2, 3, 4),
    dup_n_grams=(5, 6, 7, 8, 9, 10),
)
# C4-style quality filter
c4_quality = C4QualityFilter()
# Write cleaned output
writer = JsonlWriter(
    output_folder="./cleaned_output",
    output_filename="cleaned.jsonl",
)

cleaning_pipeline = [
    reader,
    language_filter,
    url_filter,
    gopher_quality,
    gopher_repetition,
    c4_quality,
    writer,
]
print("Cleaning pipeline defined with", len(cleaning_pipeline), "stages")
for stage in cleaning_pipeline:
print(f" - {type(stage).__name__}")

6. Educational Quality Classifier
The FineWeb-Edu classifier scores text on educational quality (0–5). High-scoring documents produce better pretraining data.
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
# Load the FineWeb-Edu classifier: a sequence-classification model that
# scores text for educational value (used to curate the FineWeb-Edu set).
# NOTE: downloads the weights from the Hugging Face Hub on first use.
model_name = "HuggingFaceFW/fineweb-edu-classifier"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSequenceClassification.from_pretrained(model_name)
def score_educational_quality(text, max_length=512):
    """Return the FineWeb-Edu educational-quality score for *text*.

    The classifier head emits a single logit that is used directly as the
    score (roughly 0-5); it is rounded to two decimals for readability.
    Inputs longer than max_length tokens are truncated. Uses the
    module-level `tokenizer` and `model` loaded above.
    """
    encoded = tokenizer(
        text,
        return_tensors="pt",
        padding="longest",
        truncation=True,
        max_length=max_length,
    )
    # Inference only -- no gradients needed.
    with torch.no_grad():
        logits = model(**encoded).logits
    return round(logits.squeeze().item(), 2)
# Test with example texts
# Three texts of obviously different educational quality: a biology fact,
# clickbait spam, and an ML definition. The spam line should score lowest.
examples = [
"The mitochondria is the powerhouse of the cell. It produces ATP through oxidative phosphorylation.",
"lol check out this crazy video!! click here now!!!",
"Gradient descent is an optimization algorithm that iteratively adjusts parameters to minimize a loss function.",
]
# NOTE(review): source indentation was lost in extraction -- the statement(s)
# following this `for` header are its loop body and should be indented.
for text in examples:
score = score_educational_quality(text)
print(f"Score: {score:.2f} | {text[:80]}...")

7. MinHash Deduplication
Deduplication removes near-duplicate documents using MinHash locality-sensitive hashing. DataTrove implements this as a 4-stage pipeline.
from datatrove.pipeline.dedup import (
MinhashDedupSignature,
MinhashDedupBuckets,
MinhashDedupCluster,
MinhashDedupFilter,
)
# MinHash dedup is a chain of four stages; each one reads the folder the
# previous stage wrote, ending with a filtered corpus.

# Stage 1: fingerprint every document with MinHash signatures.
stage1_signatures = MinhashDedupSignature(
    num_hashes=128,
    n_grams=5,
    output_folder="./minhash/signatures",
)

# Stage 2: LSH -- hash similar signatures into shared buckets.
stage2_buckets = MinhashDedupBuckets(
    input_folder="./minhash/signatures",
    output_folder="./minhash/buckets",
)

# Stage 3: merge buckets into clusters of near-duplicate documents.
stage3_cluster = MinhashDedupCluster(
    input_folder="./minhash/buckets",
    output_folder="./minhash/clusters",
)

# Stage 4: drop duplicates, retaining a single representative per cluster.
stage4_filter = MinhashDedupFilter(
    input_folder="./minhash/clusters",
    output_folder="./deduplicated_output",
)

dedup_stages = [
    stage1_signatures,
    stage2_buckets,
    stage3_cluster,
    stage4_filter,
]
print("MinHash dedup pipeline:")
for i, stage in enumerate(dedup_stages, 1):
print(f" Stage {i}: {type(stage).__name__}")

8. Training a Custom Tokenizer
Training a domain-specific BPE tokenizer ensures better compression and representation for your corpus.
from tokenizers import Tokenizer, models, trainers, pre_tokenizers, decoders
# Initialize a byte-level BPE tokenizer.
# NOTE: named `bpe_tokenizer` (not `tokenizer`) so it does not clobber the
# transformers tokenizer loaded for the FineWeb-Edu classifier above --
# rebinding `tokenizer` here would silently break score_educational_quality.
bpe_tokenizer = Tokenizer(models.BPE())
bpe_tokenizer.pre_tokenizer = pre_tokenizers.ByteLevel(add_prefix_space=False)
bpe_tokenizer.decoder = decoders.ByteLevel()

# Configure the BPE trainer.
trainer = trainers.BpeTrainer(
    vocab_size=32000,   # target vocabulary size
    min_frequency=2,    # ignore merges seen fewer than twice
    special_tokens=["<|endoftext|>", "<|padding|>", "<|unknown|>"],
    show_progress=True,
)

# Train on text files (use your cleaned corpus)
# bpe_tokenizer.train(files=["cleaned_corpus.txt"], trainer=trainer)

# For demo: train on a small sample
sample_texts = [
    "Large language models are neural networks trained on vast text corpora.",
    "Tokenization splits text into subword units for processing.",
    "BPE iteratively merges the most frequent character pairs.",
]
bpe_tokenizer.train_from_iterator(sample_texts, trainer=trainer)

# Test the tokenizer
encoded = bpe_tokenizer.encode("Large language models use tokenization.")
print(f"Vocab size: {bpe_tokenizer.get_vocab_size()}")
print(f"Tokens: {encoded.tokens}")
print(f"IDs: {encoded.ids}")

# Save the tokenizer
# tokenizer.save("custom_tokenizer.json")

9. Tokenizing a Dataset for Pretraining
DataTrove’s DocumentTokenizer converts cleaned text into token sequences ready for pretraining.
from datatrove.pipeline.tokens import DocumentTokenizer
# Tokenization pipeline: read the deduplicated JSONL corpus and emit
# pretraining-ready token files.
dedup_reader = JsonlReader(
    data_folder="./deduplicated_output",
    glob_pattern="*.jsonl",
)
document_tokenizer = DocumentTokenizer(
    output_folder="./tokenized_output",
    tokenizer_name_or_path="custom_tokenizer.json",  # Your trained tokenizer
    eos_token="<|endoftext|>",
    max_tokens_per_file=1e9,  # ~1B tokens per output file
    shuffle=True,
)
tokenization_pipeline = [dedup_reader, document_tokenizer]

print("Tokenization pipeline:")
for stage in tokenization_pipeline:
    print(f" - {type(stage).__name__}")
# Run with: executor = LocalPipelineExecutor(pipeline=tokenization_pipeline, tasks=8)
# executor.run()