GEPA Pareto 前沿机制
b1babo
2026年4月19日
2026年4月19日
GEPA Pareto 前沿机制
Pareto 前沿是 GEPA 的核心机制之一,用于维护多个在不同任务子集上表现最优的候选者。
什么是 Pareto 前沿?
Pareto 前沿包含一组非支配的候选者:
- 非支配:没有其他候选者在所有任务上都更好
- 多样性:每个候选者在不同子集上表现优秀
- 灵活性:可以从不同起点探索优化空间
重要:分数 vs Loss
神经网络: Loss 越低越好 ↓
──────────────────
GEPA: 分数越高越好 ↑| 系统 | 指标 | 目标 | 范围 |
|---|---|---|---|
| 神经网络 | Loss | 最小化 | 0 ~ ∞ |
| GEPA | Score | 最大化 | 0.0 ~ 1.0 |
Pareto 前沿维护的是:在每个验证样本上分数最高的候选者集合。
# 例如:样本0上的 Pareto 前沿
pareto_front_valset["0"] = 0.9 # 最高分数
program_at_pareto_front_valset["0"] = {1, 3} # 候选者1和3都达到0.9分
# 分数说明:
# 0.9 = 90% 的任务正确完成(例如:9/10样本正确)
# 1.0 = 全部正确
# 0.0 = 全部错误 分数 (Score)
↑
│ ● Candidate B
│ (0.9, 高准确率)
│
│ ● Candidate A
│ (0.7, 较快)
│
└─────────────────→ 成本/时间
Pareto 前沿 = {A, B}
- A 在样本集合 S₁ 上分数最高 (例如数学题)
- B 在样本集合 S₂ 上分数最高 (例如语文题)Pareto 前沿类型
GEPA 支持四种 Pareto 前沿类型:
1. Instance (默认)
frontier_type = "instance"- 按验证样本分组
- 每个样本维护最优候选者
- 适合:样本难度差异大的任务
# 例如:
{
"sample_1": {candidate_2, candidate_5}, # 在样本1上最好
"sample_2": {candidate_1, candidate_3}, # 在样本2上最好
"sample_3": {candidate_2}, # 在样本3上最好
}2. Objective
frontier_type = "objective"- 按目标指标分组
- 每个目标维护最优候选者
- 适合:多目标优化
# 例如:
{
"accuracy": {candidate_1, candidate_2},
"latency": {candidate_3},
"cost": {candidate_2, candidate_4}
}3. Hybrid
frontier_type = "hybrid"- 结合 instance 和 objective
- 按样本或目标分组
4. Cartesian
frontier_type = "cartesian"- 按**(样本, 目标)**对分组
- 最细粒度的跟踪
候选者选择策略
Pareto 选择 (默认)
from gepa.strategies import ParetoCandidateSelector
selector = ParetoCandidateSelector(rng=random.Random(0))
candidate_idx = selector.select_candidate_idx(state)从 Pareto 前沿中随机选择一个候选者。
当前最佳选择
from gepa.strategies import CurrentBestCandidateSelector
selector = CurrentBestCandidateSelector()
candidate_idx = selector.select_candidate_idx(state)选择在验证集上平均分数最高的候选者。
Epsilon-Greedy 选择
from gepa.strategies import EpsilonGreedyCandidateSelector
selector = EpsilonGreedyCandidateSelector(
epsilon=0.1, # 10% 概率随机选择
rng=random.Random(0)
)以 ε 概率随机选择,否则选择最佳候选者。
Top-K Pareto 选择
from gepa.strategies import TopKParetoCandidateSelector
selector = TopKParetoCandidateSelector(
k=5, # 只从前 5 名中选择
rng=random.Random(0)
)从 Pareto 前沿的前 K 名中选择。
如何选择"最好"的候选者?
Pareto 前沿确实可能包含多个候选者,但没有单一的"最好",因为每个候选者在不同样本上表现不同。选择策略取决于你的目标:
策略 1: 平均分数最高(推荐用于通用场景)
# 计算每个候选者在验证集上的平均分数
best_idx = None
best_avg_score = -1
for idx in range(len(state.all_candidates)):
avg_score = state.get_avg_val_score(idx)
if avg_score > best_avg_score:
best_avg_score = avg_score
best_idx = idx
best_candidate = state.all_candidates[best_idx]使用场景:需要一个在所有任务上平均表现最好的候选者
示例:
候选者 A: [0.9, 0.9, 0.9, 0.9] → 平均 0.9
候选者 B: [1.0, 1.0, 0.5, 0.5] → 平均 0.75
选择: A(虽然 B 在某些样本上更好,但 A 更稳定)策略 2: 特定任务专用
# 选择在你关心的任务类型上表现最好的
task_type = "math" # 例如:只关心数学题
best_idx = None
best_score = -1
for idx in range(len(state.all_candidates)):
# 只看数学样本的分数
math_scores = [state.val_scores[idx][str(i)]
for i in math_sample_indices]
avg_math_score = sum(math_scores) / len(math_scores)
if avg_math_score > best_score:
best_score = avg_math_score
best_idx = idx使用场景:你的系统主要处理特定类型的任务
策略 3: 鲁棒性优先
# 选择最小分数最高的(最坏情况下的最好表现)
best_idx = None
best_min_score = -1
for idx in range(len(state.all_candidates)):
min_score = min(state.val_scores[idx].values())
if min_score > best_min_score:
best_min_score = min_score
best_idx = idx示例:
候选者 A: [0.8, 0.8, 0.8, 0.8] → 最小 0.8
候选者 B: [1.0, 1.0, 0.3, 0.3] → 最小 0.3
选择: A(更鲁棒,不会出现特别差的情况)策略 4: 集成使用多个候选者
# 保留整个 Pareto 前沿,运行时动态选择
pareto_front = state.get_pareto_front_mapping()
def select_candidate_for_input(input_text):
# 根据输入特征选择最合适的候选者
if is_math_problem(input_text):
return pareto_front["math_sample_0"].pop()
else:
return pareto_front["other_sample_0"].pop()使用场景:可以在运行时根据输入动态选择候选者
优化完成后的最终选择
import gepa
result = gepa.optimize(
seed_candidate={"prompt": "..."},
trainset=train_data,
valset=val_data,
max_metric_calls=150
)
# 方式1: 直接使用最佳候选者(默认,基于平均分数)
best_candidate = result.best_program
# 方式2: 从状态中手动选择
state = result.state
# 找平均分数最高的
best_idx = max(
range(len(state.all_candidates)),
key=lambda i: state.get_avg_val_score(i)
)
best_candidate = state.all_candidates[best_idx]
# 方式3: 探索整个 Pareto 前沿
pareto_mapping = state.get_pareto_front_mapping()
# 根据你的需求选择合适的候选者候选者数量的实际情况
虽然理论上前沿可能很大,但实际情况中:
验证集大小: 100 个样本
前沿类型: instance
理论最大值: 每个样本有不同的最优者 → 100 个候选者
实际情况: 通常 5-20 个候选者就能覆盖前沿原因:
- 优秀候选者往往在多个样本上都表现好
- 接受标准(必须比父代更好)限制了增长速度
- 合并策略会整合不同候选者的优势
推荐做法
| 场景 | 推荐策略 |
|---|---|
| 通用系统 | 平均分数最高 |
| 专用系统 | 特定任务分数最高 |
| 关键应用 | 最小分数最高(最鲁棒) |
| 研究/分析 | 保留整个前沿,分析差异 |
Pareto 前沿更新流程
┌─────────────────────────────────────────────────────────────────────────┐
│ Pareto 前沿更新流程 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ 1. 新候选者被接受 │
│ └→ 在训练集 minibatch 上表现更好 │
│ │
│ 2. 在验证集上完整评估 │
│ └→ 计算每个样本的分数 │
│ │
│ 3. 更新 Pareto 前沿 │
│ └→ 比较新候选者与现有前沿 │
│ │
│ 4. 被替代的候选者 │
│ └→ 从前沿移除,但保留在候选池中 │
│ │
│ 5. 下一轮选择 │
│ └→ 从更新后的 Pareto 前沿选择 │
│ │
└─────────────────────────────────────────────────────────────────────────┘Pareto 前沿更新详解
数据结构
Pareto 前沿使用两个核心数据结构:
# src/gepa/core/state.py
# 1. 记录每个验证样本上的最高分数
pareto_front_valset: dict[str, float] = {}
# 例如: {0: 0.8, 1: 1.0, 2: 0.6}
# key: 验证样本索引, value: 该样本上的最高分数
# 2. 记录每个验证样本上达到最高分数的候选者集合
program_at_pareto_front_valset: dict[str, set[int]] = {}
# 例如: {0: {1, 3}, 1: {2}, 2: {0, 1, 3}}
# key: 验证样本索引, value: 候选者索引集合更新函数实现
def _update_pareto_front_for_val_id(
self,
val_id: str, # 验证样本索引
score: float, # 新候选者在该样本上的分数
program_idx: int # 新候选者的索引
) -> tuple[bool, bool]:
"""
更新单个验证样本的 Pareto 前沿。
返回: (is_pareto_updated, is_new_pareto)
- is_pareto_updated: 前沿是否发生变化
- is_new_pareto: 新候选者是否进入前沿
"""
# 获取该样本当前的最高分数
prev_score = self.pareto_front_valset.get(val_id, float("-inf"))
if score > prev_score:
# 情况1: 新候选者表现更好,完全替换
self.pareto_front_valset[val_id] = score
self.program_at_pareto_front_valset[val_id] = {program_idx}
return True, True
elif score == prev_score:
# 情况2: 与现有最高分数持平,加入集合
pareto_front = self.program_at_pareto_front_valset.setdefault(val_id, set())
is_new = program_idx not in pareto_front
pareto_front.add(program_idx)
return True, is_new
else:
# 情况3: 分数更低,不更新前沿
return False, False完整更新流程
当一个新候选者(索引为 5)通过接受测试后,在验证集上进行评估:
# 1. 在验证集上评估新候选者
val_scores = [0.9, 0.7, 0.8, 0.6, 0.9] # 候选者5在5个验证样本上的分数
# 2. 逐个更新 Pareto 前沿
for val_id, score in enumerate(val_scores):
is_updated, is_new = state._update_pareto_front_for_val_id(
val_id=str(val_id),
score=score,
program_idx=5
)具体示例
假设当前 Pareto 前沿状态:
# 当前状态
pareto_front_valset = {
"0": 0.8, # 样本0最高分: 0.8
"1": 1.0, # 样本1最高分: 1.0
"2": 0.6 # 样本2最高分: 0.6
}
program_at_pareto_front_valset = {
"0": {1, 3}, # 样本0: 候选者1和3达到0.8
"1": {2}, # 样本1: 候选者2达到1.0
"2": {0} # 样本2: 候选者0达到0.6
}新候选者(索引5)在验证集上的分数:[0.9, 0.7, 0.6]
更新样本0 (score=0.9 > 0.8):
# 新分数更高,替换
pareto_front_valset["0"] = 0.9
program_at_pareto_front_valset["0"] = {5}
# 候选者1和3被从前沿移除更新样本1 (score=0.7 < 1.0):
# 新分数更低,不更新
# 前沿保持不变: {2}更新样本2 (score=0.6 == 0.6):
# 新分数持平,加入集合
program_at_pareto_front_valset["2"] = {0, 5}
# 候选者0和5并列最高最终状态:
pareto_front_valset = {
"0": 0.9, # 提高了!
"1": 1.0, # 不变
"2": 0.6 # 不变
}
program_at_pareto_front_valset = {
"0": {5}, # 被替换
"1": {2}, # 不变
"2": {0, 5} # 扩展
}验证集评估的完整代码
# src/gepa/core/engine.py
def _evaluate_and_update_pareto_front_for_new_program(
self,
new_candidate: dict[str, str],
new_candidate_idx: int,
state: GEPAState
) -> None:
"""评估新候选者并更新 Pareto 前沿"""
# 1. 在验证集上评估
val_batch = self.valset # 完整验证集
val_eval_result = self.adapter.evaluate(
batch=val_batch,
candidate=new_candidate
)
# 2. 更新每个验证样本的分数记录
for val_id, score in enumerate(val_eval_result.scores):
state.update_val_score(
program_idx=new_candidate_idx,
val_id=str(val_id),
score=score
)
# 3. 更新 Pareto 前沿
for val_id, score in enumerate(val_eval_result.scores):
state._update_pareto_front_for_val_id(
val_id=str(val_id),
score=score,
program_idx=new_candidate_idx
)关键要点
- 逐样本更新:Pareto 前沿是针对每个验证样本独立更新的,不是基于平均分数
- 更高则替换,相等则加入:
- 新分数 > 旧最高分:完全替换
- 新分数 == 旧最高分:加入集合
- 新分数 < 旧最高分:不更新
- 被替换的候选者:从前沿移除但保留在候选池中,可能在其他样本上仍然是 Pareto 最优
- 多候选者前沿:同一个样本可以有多个候选者并列最高分
代码示例
使用 Pareto 前沿
import gepa
result = gepa.optimize(
seed_candidate={"prompt": "You are helpful."},
trainset=train_data,
valset=val_data,
# Pareto 配置
frontier_type="instance", # 前沿类型
candidate_selection_strategy="pareto", # 选择策略
# 其他配置
max_metric_calls=150
)查看前沿
# 获取 Pareto 前沿
pareto_front = result.state.get_pareto_front_mapping()
print("Pareto 前沿:")
for key, candidates in pareto_front.items():
print(f" {key}: {candidates}")为什么需要 Pareto 前沿?
1. 避免局部最优
没有 Pareto 前沿:
Candidate A (平均 0.7) → Candidate B (平均 0.75) → Candidate C (平均 0.8)
↑
陷入局部最优
有 Pareto 前沿:
Pareto 前沿 = {A, B, C, D, E}
→ 从多个方向探索
→ 更可能找到全局最优2. 利用互补优势
Candidate A: 擅长数学题,不擅长语文题
Candidate B: 擅长语文题,不擅长数学题
Pareto 前沿 = {A, B}
通过合并 A 和 B 的优势 → Candidate C (两者都擅长)3. 处理异构数据
# 不同类型的样本
samples = [
{"type": "math", "input": "1+1=?", ...},
{"type": "coding", "input": "Write code...", ...},
{"type": "writing", "input": "Write essay...", ...}
]
# Pareto 前沿
{
"math_sample_1": {candidate_3}, # 数学专家
"coding_sample_2": {candidate_1}, # 编程专家
"writing_sample_3": {candidate_5} # 写作专家
}合并策略
当两个候选者在不同任务上表现优秀时,可以合并它们:
result = gepa.optimize(
seed_candidate={"prompt": "..."},
trainset=train_data,
valset=val_data,
# 启用合并
use_merge=True,
max_merge_invocations=5, # 最多合并 5 次
merge_val_overlap_floor=5, # 至少 5 个共享样本
max_metric_calls=150
)关键优势
| 特性 | 单一候选者 | Pareto 前沿 |
|---|---|---|
| 探索能力 | 局限 | 全局 |
| 鲁棒性 | 低 | 高 |
| 多样性 | 单一 | 多样 |
| 合并机会 | 无 | 有 |
类比神经网络
| 概念 | 神经网络 | GEPA |
|---|---|---|
| 模型选择 | 单一模型 | 候选者集合 |
| 优化目标 | 单点最优 | Pareto 最优 |
| 集成学习 | Model Ensemble | Pareto 前沿 + 合并 |
| 过拟合 | 容易 | 通过多样性缓解 |
监控 Pareto 前沿
from gepa.core.callbacks import GEPACallback
class ParetoMonitor(GEPACallback):
def on_pareto_front_updated(self, event):
print(f"Iteration {event['iteration']}:")
print(f" New front: {event['new_front']}")
print(f" Displaced: {event['displaced_candidates']}")
# 使用
result = gepa.optimize(
...,
callbacks=[ParetoMonitor()]
)最佳实践
- 选择合适的前沿类型
- 小数据集:
instance - 多目标:
objective - 复杂任务:
hybrid
- 小数据集:
- 平衡探索与利用
- 探索:
pareto选择 - 利用:
current_best选择 - 混合:
epsilon_greedy选择
- 探索:
- 合理配置合并
- 确保有足够的共享样本
- 避免过早合并
- 监控合并效果
下一步
- 与神经网络对比 - 深入理解 GEPA 的设计理念
- 候选者选择策略 - 详细了解选择策略
评论