In this tutorial, we take a deep dive into the TaskTrove dataset on Hugging Face and build a complete, practical workflow to efficiently explore it. Instead of downloading the full multi-gigabyte dataset, we stream it directly and work with individual samples in real time. We begin by setting up the environment and inspecting the raw structure of the dataset, focusing on how each task is stored as a compressed binary blob. We then implement robust parsing logic to decode these binaries into meaningful formats such as tar archives, zip files, JSON, or plain text. Along the way, we analyze file structures, inspect metadata, and build utilities to better understand the contents of each task.
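Before setting anything up, it helps to see the core trick in miniature: container formats announce themselves through leading magic bytes, so a blob can be routed to the right decoder without any metadata. The sketch below is our own illustrative helper (sniff_format is not part of the dataset or any library); the full parser later in the tutorial follows the same idea.

def sniff_format(blob: bytes) -> str:
    """Illustrative magic-byte sniffer (our own helper, not a library API)."""
    if blob[:2] == b"\x1f\x8b":
        return "gzip"      # gzip streams always start with 0x1f 0x8b
    if blob[:4] == b"PK\x03\x04":
        return "zip"       # zip local-file header
    if blob[:1] in (b"{", b"["):
        return "json-ish"  # likely JSON or JSONL text
    return "unknown"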
import subprocess, sys
subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "-U",
"datasets", "huggingface_hub", "polars", "pandas",
"matplotlib", "seaborn", "tqdm", "pyarrow"])
import os, io, gzip, json, tarfile, zipfile, base64, re, warnings
from pathlib import Path
from collections import Counter, defaultdict
from typing import Any, Dict, Iterator, List, Optional, Union
import numpy as np
import pandas as pd
import polars as pl
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm.auto import tqdm
from datasets import load_dataset
from huggingface_hub import HfApi
warnings.filterwarnings("ignore")
plt.rcParams["figure.dpi"] = 110
sns.set_style("whitegrid")
sns.set_palette("mako_r")
DATASET_ID = "open-thoughts/TaskTrove"
print("✓ environment ready")
ds_test = load_dataset(DATASET_ID, split="test", streaming=True)
ds_validation = load_dataset(DATASET_ID, split="validation", streaming=True)
first = next(iter(ds_test))
print("Keys :", list(first.keys()))
print("path :", first["path"])
print("task_binary type :", type(first["task_binary"]).__name__)
print("task_binary length:", len(first["task_binary"]), "bytes")
We set up the environment by installing the required libraries and importing the necessary modules. We configure the visualization settings and open both splits in streaming mode, so no multi-gigabyte download is needed. We also inspect the first sample to understand the dataset's structure and key fields.
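As a quick sanity check, we can preview a few more rows without disturbing the main stream; IterableDataset.take(n) in the datasets library returns a new, independent stream of the first n examples. A minimal sketch:

for row in ds_test.take(3):
    print(row["path"], "|", len(row["task_binary"]), "bytes")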
def to_bytes(blob) -> bytes:
"""Coerce whatever `datasets` gives us into raw bytes."""
if isinstance(blob, (bytes, bytearray)):
return bytes(blob)
if isinstance(blob, list):
return bytes(blob)
if isinstance(blob, str):
try:
return base64.b64decode(blob)
except Exception:
return blob.encode("utf-8", errors="replace")
return bytes(blob)
def parse_task(blob) -> Dict[str, Any]:
"""gunzip + auto-detect tar / zip / json / jsonl / text / binary."""
raw = to_bytes(blob)
compressed_size = len(raw)
data = gzip.decompress(raw) if raw[:2] == b"\x1f\x8b" else raw
raw_size = len(data)
bio = io.BytesIO(data)
try:
with tarfile.open(fileobj=bio) as tar:
files: Dict[str, Union[str, bytes]] = {}
for m in tar.getmembers():
if not m.isfile():
continue
f = tar.extractfile(m)
if f is None:
continue
content = f.read()
try:
files[m.name] = content.decode("utf-8")
except UnicodeDecodeError:
files[m.name] = content
if files:
return {"format": "tar", "files": files,
"raw_size": raw_size, "compressed_size": compressed_size}
except tarfile.TarError:
pass
bio.seek(0)
try:
with zipfile.ZipFile(bio) as zf:
files = {}
for name in zf.namelist():
if name.endswith("/"):
continue
with zf.open(name) as zh:
content = zh.read()
try:
files[name] = content.decode("utf-8")
except UnicodeDecodeError:
files[name] = content
return {"format": "zip", "files": files,
"raw_size": raw_size, "compressed_size": compressed_size}
except zipfile.BadZipFile:
pass
try:
text = data.decode("utf-8")
try:
return {"format": "json", "content": json.loads(text),
"raw_size": raw_size, "compressed_size": compressed_size}
except json.JSONDecodeError:
try:
items = [json.loads(l) for l in text.splitlines() if l.strip()]
return {"format": "jsonl", "content": items,
"raw_size": raw_size, "compressed_size": compressed_size}
except json.JSONDecodeError:
return {"format": "text", "content": text,
"raw_size": raw_size, "compressed_size": compressed_size}
except UnicodeDecodeError:
return {"format": "binary", "content": data,
"raw_size": raw_size, "compressed_size": compressed_size}
raw = to_bytes(first["task_binary"])
print("First 16 bytes (hex):", raw[:16].hex(" "))
task = parse_task(first["task_binary"])
print(f"Format : {task['format']}")
print(f"Compressed size : {task['compressed_size']:>10,} bytes")
print(f"Decompressed size : {task['raw_size']:>10,} bytes")
if task["format"] in ("tar", "zip"):
print(f"Members : {len(task['files'])}")
for name in list(task["files"])[:10]:
body = task["files"][name]
size = len(body) if isinstance(body, (str, bytes)) else 0
print(f" {name:<60} {size:>8} bytes")
We build robust utilities that coerce raw task blobs into bytes and parse them into structured results. A single function handles tar, zip, JSON, JSONL, and plain text, falling back to raw binary when decoding fails. We then decode a sample task and inspect its structure and size characteristics.
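Because parse_task will run over thousands of opaque blobs, it is worth a synthetic round-trip test before trusting it. The sketch below builds a tiny gzipped tar in memory (our own test fixture, not a dataset sample) and checks that the parser recovers it.

import io, gzip, json, tarfile

buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode="w") as tar:
    payload = json.dumps({"hello": "world"}).encode("utf-8")
    info = tarfile.TarInfo("task.json")
    info.size = len(payload)
    tar.addfile(info, io.BytesIO(payload))  # add one small JSON member
fake_blob = gzip.compress(buf.getvalue())

parsed = parse_task(fake_blob)
assert parsed["format"] == "tar" and "task.json" in parsed["files"]
print("✓ parse_task round-trip OK:", parsed["files"]["task.json"])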
def show_task(task: Dict[str, Any], json_chars: int = 1500, code_chars: int = 600) -> None:
print("═" * 70)
print(f"FORMAT: {task['format']} | compressed {task['compressed_size']:,} → "
f"raw {task['raw_size']:,} bytes")
print("═" * 70)
if task["format"] not in ("tar", "zip"):
print(task.get("content", "<binary>"))
return
files = task["files"]
by_ext: Dict[str, List[str]] = defaultdict(list)
for name in files:
by_ext[Path(name).suffix.lower() or "<no-ext>"].append(name)
print("nFile-type breakdown:")
for ext, names in sorted(by_ext.items(), key=lambda x: -len(x[1])):
print(f" {ext:<10} {len(names):>4} file(s)")
meta = [n for n in files if n.lower().endswith((".json", ".yaml", ".yml", ".toml"))]
code = [n for n in files if n.endswith(".py")]
for name in meta[:3]:
print(f"n--- {name} ---")
body = files[name]
if isinstance(body, str):
try:
pretty = json.dumps(json.loads(body), indent=2)[:json_chars]
except json.JSONDecodeError:
pretty = body[:json_chars]
print(pretty)
if len(body) > json_chars:
print(f"… ({len(body)-json_chars:,} more chars)")
for name in code[:2]:
print(f"n--- {name} ---")
body = files[name]
if isinstance(body, str):
print(body[:code_chars])
if len(body) > code_chars:
print(f"… ({len(body)-code_chars:,} more chars)")
show_task(task)
def source_of(path: str) -> str:
return path.rsplit("-", 1)[0] if "-" in path else path
source_counts: Counter = Counter()
compressed_sizes: List[int] = []
for row in tqdm(ds_test, desc="counting paths"):
source_counts[source_of(row["path"])] += 1
compressed_sizes.append(len(row["task_binary"]))
print(f"nUnique source prefixes: {len(source_counts)}")
print("Top 15 sources:")
for src, n in source_counts.most_common(15):
print(f" {n:>6} {src}")
fig, axes = plt.subplots(1, 2, figsize=(14, 6))
TOP_N = 15
top = source_counts.most_common(TOP_N)
labels = [s for s, _ in top]
values = [n for _, n in top]
axes[0].barh(range(len(labels)), values, color=sns.color_palette("mako_r", len(labels)))
axes[0].set_yticks(range(len(labels)))
axes[0].set_yticklabels(labels, fontsize=9)
axes[0].invert_yaxis()
axes[0].set_xlabel("number of tasks")
axes[0].set_title(f"Top {TOP_N} sources in test split", fontweight="bold")
for i, v in enumerate(values):
axes[0].text(v, i, f" {v:,}", va="center", fontsize=8)
axes[1].hist(np.array(compressed_sizes) / 1024, bins=50,
color=sns.color_palette("mako_r")[2], edgecolor="white")
axes[1].set_xscale("log")
axes[1].set_xlabel("compressed size (KB, log scale)")
axes[1].set_ylabel("# tasks")
axes[1].set_title("Distribution of compressed task sizes", fontweight="bold")
p50 = np.median(compressed_sizes) / 1024
p95 = np.percentile(compressed_sizes, 95) / 1024
axes[1].axvline(p50, color="crimson", linestyle="--", alpha=0.7, label=f"median = {p50:.1f} KB")
axes[1].axvline(p95, color="orange", linestyle="--", alpha=0.7, label=f"p95 = {p95:.1f} KB")
axes[1].legend()
plt.tight_layout()
plt.show()
We create a detailed visualization of each task by printing structured file breakdowns and previews. We analyze the dataset distribution by counting source prefixes and measuring compressed task sizes. We also generate plots to better understand the dataset composition and size distribution.
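If we want the same distribution as plain numbers rather than a histogram, we can summarize the compressed_sizes list collected above directly; a small follow-on sketch:

# Numeric summary of the compressed-size distribution collected above
sizes_kb = np.array(compressed_sizes) / 1024
for label, value in [("mean", sizes_kb.mean()),
                     ("median", np.median(sizes_kb)),
                     ("p95", np.percentile(sizes_kb, 95)),
                     ("max", sizes_kb.max())]:
    print(f"{label:>7}: {value:10.1f} KB")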
filename_counter: Counter = Counter()
all_json_keys: Counter = Counter()
samples_for_show: List = []
for i, row in enumerate(tqdm(ds_test, desc="inspecting structure", total=200)):
if i >= 200:
break
p = parse_task(row["task_binary"])
if p["format"] in ("tar", "zip"):
for name, body in p["files"].items():
filename_counter[name] += 1
if name.endswith(".json") and isinstance(body, str):
try:
obj = json.loads(body)
if isinstance(obj, dict):
for k in obj.keys():
all_json_keys[k] += 1
except Exception:
pass
if len(samples_for_show) < 2:
samples_for_show.append((row["path"], p))
print("nMost common filenames inside task archives:")
for name, n in filename_counter.most_common(15):
print(f" {n:>4} {name}")
print("nMost common top-level JSON keys (across any *.json):")
for k, n in all_json_keys.most_common(20):
print(f" {n:>4} {k}")
if samples_for_show:
print(f"nFull file listing for one sample task ({samples_for_show[0][0]}):")
for name, body in samples_for_show[0][1]["files"].items():
sz = len(body) if isinstance(body, (str, bytes)) else 0
print(f" {name} ({sz:,} B)")
VERIFIER_FILE_PATTERNS = ("verifier", "verify", "grader", "judge", "score", "eval")
VERIFIER_JSON_KEYS = ("verifier", "verifier_config", "judge", "grader",
"rubric", "test_patch", "FAIL_TO_PASS", "tests")
def has_verifier(parsed: Dict[str, Any]) -> bool:
"""Detect verifiers via filename, JSON content, or both."""
if parsed["format"] not in ("tar", "zip"):
c = parsed.get("content")
if isinstance(c, dict):
return any(k in c for k in VERIFIER_JSON_KEYS)
return False
files = parsed["files"]
for name in files:
low = name.lower()
if any(pat in low for pat in VERIFIER_FILE_PATTERNS):
return True
for name, body in files.items():
if name.endswith((".json", ".yaml", ".yml")) and isinstance(body, str):
try:
obj = json.loads(body)
if isinstance(obj, dict) and any(k in obj for k in VERIFIER_JSON_KEYS):
return True
except Exception:
pass
low = body.lower()
if "verifier" in low or "test_patch" in low:
return True
return False
class TaskTroveExplorer:
"""High-level interface to the open-thoughts/TaskTrove dataset."""
def __init__(self, split: str = "test", dataset_id: str = DATASET_ID):
self.dataset_id = dataset_id
self.split = split
self._ds = load_dataset(dataset_id, split=split, streaming=True)
def iter(self, limit: Optional[int] = None,
source_filter: Optional[str] = None) -> Iterator[Dict[str, Any]]:
rx = re.compile(source_filter) if source_filter else None
n = 0
for row in self._ds:
if rx and not rx.search(source_of(row["path"])):
continue
yield row
n += 1
if limit is not None and n >= limit:
return
def sample(self, n: int = 5,
source_filter: Optional[str] = None) -> List[Dict[str, Any]]:
out = []
for row in self.iter(limit=n, source_filter=source_filter):
parsed = parse_task(row["task_binary"])
parsed["path"] = row["path"]
parsed["source"] = source_of(row["path"])
out.append(parsed)
return out
def summary(self, limit: int = 1000,
source_filter: Optional[str] = None) -> pd.DataFrame:
rows = []
for row in self.iter(limit=limit, source_filter=source_filter):
parsed = parse_task(row["task_binary"])
rows.append({
"source": source_of(row["path"]),
"compressed": parsed["compressed_size"],
"raw": parsed["raw_size"],
"format": parsed["format"],
"n_files": len(parsed.get("files", {})),
"has_verifier": has_verifier(parsed),
})
df = pd.DataFrame(rows)
if df.empty:
return df
return (df.groupby("source")
.agg(n=("compressed", "count"),
mean_compressed_kb=("compressed", lambda s: s.mean()/1024),
mean_raw_kb=("raw", lambda s: s.mean()/1024),
mean_n_files=("n_files", "mean"),
verifier_rate=("has_verifier", "mean"))
.round(2)
.sort_values("n", ascending=False))
@staticmethod
def has_verifier(parsed: Dict[str, Any]) -> bool:
return has_verifier(parsed)
def export(self, output_dir: Union[str, Path], n: int = 10,
source_filter: Optional[str] = None) -> Path:
output_dir = Path(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
for parsed in self.sample(n=n, source_filter=source_filter):
slug = parsed["path"].replace("/", "_")
tdir = output_dir / slug
tdir.mkdir(exist_ok=True)
if parsed["format"] in ("tar", "zip"):
for name, body in parsed["files"].items():
out = tdir / name
out.parent.mkdir(parents=True, exist_ok=True)
if isinstance(body, str):
out.write_text(body, encoding="utf-8")
else:
out.write_bytes(body)
else:
content = parsed.get("content", b"")
if isinstance(content, (dict, list)):
(tdir / "task.json").write_text(json.dumps(content, indent=2))
elif isinstance(content, str):
(tdir / "task.txt").write_text(content)
else:
(tdir / "task.bin").write_bytes(content)
print(f"✓ exported tasks to {output_dir.resolve()}")
return output_dir
explorer = TaskTroveExplorer(split="test")
print("nSample of 3 parsed tasks:")
for s in explorer.sample(n=3):
print(f"path: {s['path']} | source: {s['source']} | format: {s['format']} | "
f"files: {len(s.get('files', {}))} | verifier: {has_verifier(s)}")
We deeply inspect the internal structure of tasks by analyzing filenames and extracting common JSON keys. We implement a multi-signal verifier detection system to identify tasks suitable for evaluation or RL workflows. We also build a reusable explorer class that allows us to sample, summarize, and export tasks efficiently.
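As a usage sketch, the source_filter argument accepts any regular expression over the source prefix. The pattern below is an assumption for illustration — substitute a prefix that actually appears in the source_counts listing above.

# The pattern "swe" is a hypothetical example; pick a prefix from
# the source_counts output to match real sources.
subset = explorer.sample(n=2, source_filter=r"swe")
for s in subset:
    print(s["source"], "|", s["format"], "|", len(s.get("files", {})), "files")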
summary = explorer.summary(limit=1000)
print(f"nSummary across {len(summary)} sources (1000 sampled rows):")
print(summary.head(20))
if not summary.empty:
top_sources = summary.head(12)
fig, ax = plt.subplots(figsize=(11, 6))
x = np.arange(len(top_sources))
w = 0.4
ax.bar(x - w/2, top_sources["mean_compressed_kb"], w, label="compressed (KB)",
color=sns.color_palette("mako_r")[2])
ax.bar(x + w/2, top_sources["mean_raw_kb"], w, label="decompressed (KB)",
color=sns.color_palette("mako_r")[5])
ax.set_xticks(x)
ax.set_xticklabels(top_sources.index, rotation=40, ha="right", fontsize=9)
ax.set_ylabel("size (KB)")
ax.set_yscale("log")
ax.set_title("Mean task size by source (top 12 by row count)", fontweight="bold")
ax.legend()
plt.tight_layout()
plt.show()
fig, ax = plt.subplots(figsize=(11, 5))
vs = summary.head(15)["verifier_rate"].sort_values()
colors = sns.color_palette("RdYlGn", as_cmap=True)(vs.values)
ax.barh(range(len(vs)), vs.values, color=colors)
ax.set_yticks(range(len(vs)))
ax.set_yticklabels(vs.index, fontsize=9)
ax.set_xlabel("fraction of tasks with verifier signal")
ax.set_xlim(0, 1)
ax.set_title("Verifier presence by sourcen(green = verified ⇒ usable for RL)",
fontweight="bold")
for i, v in enumerate(vs.values):
ax.text(min(v + 0.01, 0.97), i, f"{v:.0%}", va="center", fontsize=9)
plt.tight_layout()
plt.show()
verified_task = None
for row in tqdm(ds_test, desc="hunting for a verified task"):
parsed = parse_task(row["task_binary"])
if has_verifier(parsed):
parsed["path"] = row["path"]
parsed["source"] = source_of(row["path"])
verified_task = parsed
break
if verified_task is None:
print("No verified task found in test split — try the validation split.")
else:
print(f"Found verified task: {verified_task['path']}")
print(f"Source : {verified_task['source']}")
if verified_task["format"] in ("tar", "zip"):
candidates = []
for n in verified_task["files"]:
low = n.lower()
score = sum(p in low for p in VERIFIER_FILE_PATTERNS)
if n.endswith((".json", ".yaml", ".yml", ".py")):
score += 1
candidates.append((score, n))
candidates.sort(reverse=True)
for _, name in candidates[:2]:
body = verified_task["files"][name]
if isinstance(body, str):
print(f"n--- {name} ({len(body):,} chars) ---")
print(body[:2000])
if len(body) > 2000:
print(f"… ({len(body)-2000:,} more chars)")
EXPORT_DIR = Path("/content/tasktrove_export") if Path("/content").exists()
else Path("./tasktrove_export")
EXPORT_DIR.mkdir(exist_ok=True)
explorer.export(EXPORT_DIR, n=5)
for task_dir in sorted(EXPORT_DIR.iterdir())[:3]:
print("─" * 60)
print(task_dir.name)
for sub in sorted(task_dir.rglob("*"))[:8]:
if sub.is_file():
print(f" {sub.relative_to(task_dir)} ({sub.stat().st_size:,} B)")
rows: List[Dict[str, Any]] = []
MAX_TASKS = 500
n_seen = 0
for row in tqdm(ds_test, desc="building slice", total=MAX_TASKS):
parsed = parse_task(row["task_binary"])
n_seen += 1
src = source_of(row["path"])
is_verified = has_verifier(parsed) or "verifier" in src.lower()
files = parsed.get("files", {})
instruction = ""
for name in files:
if name.endswith((".json", ".md", ".txt")) and isinstance(files[name], str):
if len(files[name]) > len(instruction):
instruction = files[name]
rows.append({
"path": row["path"],
"source": src,
"is_verified": bool(is_verified),
"n_files": len(files),
"compressed_kb": parsed["compressed_size"] / 1024,
"raw_kb": parsed["raw_size"] / 1024,
"instruction_preview": instruction[:300],
})
if len(rows) >= MAX_TASKS:
break
df = pl.DataFrame(rows)
print(f"nInspected {n_seen} rows, kept {len(df)} total "
f"({df['is_verified'].sum() if len(df) else 0} flagged verified)")
if len(df):
print(df.head(5))
if len(df) == 0:
print("Empty slice — nothing to aggregate or save.")
else:
grouped = (df.group_by("source")
.agg([pl.len().alias("n"),
pl.col("is_verified").sum().alias("n_verified"),
pl.col("raw_kb").mean().round(1).alias("mean_raw_kb"),
pl.col("n_files").mean().round(1).alias("mean_n_files")])
.sort("n", descending=True))
print("nSlice composition by source:")
print(grouped)
out_path = (Path("/content") if Path("/content").exists() else Path("."))
/ "tasktrove_slice.parquet"
df.write_parquet(out_path)
print(f"n✓ wrote {len(df)} rows to {out_path} "
f"({out_path.stat().st_size/1024:.1f} KB)")
api = HfApi()
files = api.list_repo_files(repo_id=DATASET_ID, repo_type="dataset")
subdirs = sorted({f.split("/", 1)[0] for f in files
if "/" in f and "__" in f.split("/", 1)[0]})
print(f"nFound {len(subdirs)} source-dataset subdirectories. First 25:")
for s in subdirs[:25]:
print(" ", s)
We aggregate statistics across sources and visualize key metrics, such as task size and verifier presence. We identify and inspect a verified task to understand how evaluation signals are structured. Finally, we build a clean dataset slice, export it, and prepare it for downstream analysis or modeling workflows.
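As a final usage note, the exported parquet slice is self-contained and can be reloaded in a later session. The sketch below assumes the file was written to the current directory (on Colab it lands in /content instead).

# Reload the exported slice and keep only tasks flagged as verified
slice_df = pl.read_parquet("tasktrove_slice.parquet")
verified_only = slice_df.filter(pl.col("is_verified"))
print(f"{len(verified_only)} of {len(slice_df)} tasks flagged verified")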
In conclusion, we constructed a comprehensive pipeline to explore, analyze, and extract value from the TaskTrove dataset. We generated insights into source distributions, task sizes, and internal file patterns, and built mechanisms to detect verifier signals that indicate high-quality, evaluation-ready tasks. We also created reusable tools, such as the TaskTroveExplorer class, to sample, summarize, and export tasks for downstream use. Finally, we produced a clean, structured dataset slice that can be used directly for research, benchmarking, or reinforcement learning workflows. Along the way, we learned how to handle complex dataset formats efficiently and established a scalable approach to working with large, structured AI datasets in real-world scenarios.
