b1babo
timeline
keywords
articles
targets
projects
about
b1babo

2026 All Rights Reserved.

  • 关于本站
  • 所有文章
  • 站点地图
  • RSS Feed

Powered by Next.js & Trilium

  • GEPA算法

GEPA Pareto 前沿机制

b1babo
2026年4月19日
2026年4月19日

GEPA Pareto 前沿机制

Pareto 前沿是 GEPA 的核心机制之一,用于维护多个在不同任务子集上表现最优的候选者。

什么是 Pareto 前沿?

Pareto 前沿包含一组非支配的候选者:

  • 非支配:没有其他候选者在所有任务上都更好
  • 多样性:每个候选者在不同子集上表现优秀
  • 灵活性:可以从不同起点探索优化空间

重要:分数 vs Loss

神经网络: Loss 越低越好  ↓
         ──────────────────
GEPA:     分数越高越好  ↑
系统指标目标范围
神经网络Loss最小化0 ~ ∞
GEPAScore最大化0.0 ~ 1.0

Pareto 前沿维护的是:在每个验证样本上分数最高的候选者集合。

# 例如:样本0上的 Pareto 前沿
pareto_front_valset["0"] = 0.9      # 最高分数
program_at_pareto_front_valset["0"] = {1, 3}  # 候选者1和3都达到0.9分

# 分数说明:
# 0.9 = 90% 的任务正确完成(例如:9/10样本正确)
# 1.0 = 全部正确
# 0.0 = 全部错误
          分数 (Score)
              ↑
              │    ● Candidate B
              │   (0.9, 高准确率)
              │
              │         ● Candidate A
              │       (0.7, 较快)
              │
              └─────────────────→ 成本/时间

Pareto 前沿 = {A, B}
- A 在样本集合 S₁ 上分数最高 (例如数学题)
- B 在样本集合 S₂ 上分数最高 (例如语文题)

Pareto 前沿类型

GEPA 支持四种 Pareto 前沿类型:

1. Instance (默认)

frontier_type = "instance"
  • 按验证样本分组
  • 每个样本维护最优候选者
  • 适合:样本难度差异大的任务
# 例如:
{
    "sample_1": {candidate_2, candidate_5},  # 在样本1上最好
    "sample_2": {candidate_1, candidate_3},  # 在样本2上最好
    "sample_3": {candidate_2},               # 在样本3上最好
}

2. Objective

frontier_type = "objective"
  • 按目标指标分组
  • 每个目标维护最优候选者
  • 适合:多目标优化
# 例如:
{
    "accuracy": {candidate_1, candidate_2},
    "latency": {candidate_3},
    "cost": {candidate_2, candidate_4}
}

3. Hybrid

frontier_type = "hybrid"
  • 结合 instance 和 objective
  • 按样本或目标分组

4. Cartesian

frontier_type = "cartesian"
  • 按**(样本, 目标)**对分组
  • 最细粒度的跟踪

候选者选择策略

Pareto 选择 (默认)

from gepa.strategies import ParetoCandidateSelector

selector = ParetoCandidateSelector(rng=random.Random(0))
candidate_idx = selector.select_candidate_idx(state)

从 Pareto 前沿中随机选择一个候选者。

当前最佳选择

from gepa.strategies import CurrentBestCandidateSelector

selector = CurrentBestCandidateSelector()
candidate_idx = selector.select_candidate_idx(state)

选择在验证集上平均分数最高的候选者。

Epsilon-Greedy 选择

from gepa.strategies import EpsilonGreedyCandidateSelector

selector = EpsilonGreedyCandidateSelector(
    epsilon=0.1,  # 10% 概率随机选择
    rng=random.Random(0)
)

以 ε 概率随机选择,否则选择最佳候选者。

Top-K Pareto 选择

from gepa.strategies import TopKParetoCandidateSelector

selector = TopKParetoCandidateSelector(
    k=5,  # 只从前 5 名中选择
    rng=random.Random(0)
)

从 Pareto 前沿的前 K 名中选择。

如何选择"最好"的候选者?

Pareto 前沿确实可能包含多个候选者,但没有单一的"最好",因为每个候选者在不同样本上表现不同。选择策略取决于你的目标:

策略 1: 平均分数最高(推荐用于通用场景)

# 计算每个候选者在验证集上的平均分数
best_idx = None
best_avg_score = -1

for idx in range(len(state.all_candidates)):
    avg_score = state.get_avg_val_score(idx)
    if avg_score > best_avg_score:
        best_avg_score = avg_score
        best_idx = idx

best_candidate = state.all_candidates[best_idx]

使用场景:需要一个在所有任务上平均表现最好的候选者

示例:

候选者 A: [0.9, 0.9, 0.9, 0.9] → 平均 0.9
候选者 B: [1.0, 1.0, 0.5, 0.5] → 平均 0.75

选择: A(虽然 B 在某些样本上更好,但 A 更稳定)

策略 2: 特定任务专用

# 选择在你关心的任务类型上表现最好的
task_type = "math"  # 例如:只关心数学题
best_idx = None
best_score = -1

for idx in range(len(state.all_candidates)):
    # 只看数学样本的分数
    math_scores = [state.val_scores[idx][str(i)]
                   for i in math_sample_indices]
    avg_math_score = sum(math_scores) / len(math_scores)
    if avg_math_score > best_score:
        best_score = avg_math_score
        best_idx = idx

使用场景:你的系统主要处理特定类型的任务

策略 3: 鲁棒性优先

# 选择最小分数最高的(最坏情况下的最好表现)
best_idx = None
best_min_score = -1

for idx in range(len(state.all_candidates)):
    min_score = min(state.val_scores[idx].values())
    if min_score > best_min_score:
        best_min_score = min_score
        best_idx = idx

示例:

候选者 A: [0.8, 0.8, 0.8, 0.8] → 最小 0.8
候选者 B: [1.0, 1.0, 0.3, 0.3] → 最小 0.3

选择: A(更鲁棒,不会出现特别差的情况)

策略 4: 集成使用多个候选者

# 保留整个 Pareto 前沿,运行时动态选择
pareto_front = state.get_pareto_front_mapping()

def select_candidate_for_input(input_text):
    # 根据输入特征选择最合适的候选者
    if is_math_problem(input_text):
        return pareto_front["math_sample_0"].pop()
    else:
        return pareto_front["other_sample_0"].pop()

使用场景:可以在运行时根据输入动态选择候选者

优化完成后的最终选择

import gepa

result = gepa.optimize(
    seed_candidate={"prompt": "..."},
    trainset=train_data,
    valset=val_data,
    max_metric_calls=150
)

# 方式1: 直接使用最佳候选者(默认,基于平均分数)
best_candidate = result.best_program

# 方式2: 从状态中手动选择
state = result.state

# 找平均分数最高的
best_idx = max(
    range(len(state.all_candidates)),
    key=lambda i: state.get_avg_val_score(i)
)
best_candidate = state.all_candidates[best_idx]

# 方式3: 探索整个 Pareto 前沿
pareto_mapping = state.get_pareto_front_mapping()
# 根据你的需求选择合适的候选者

候选者数量的实际情况

虽然理论上前沿可能很大,但实际情况中:

验证集大小: 100 个样本
前沿类型: instance

理论最大值: 每个样本有不同的最优者 → 100 个候选者
实际情况: 通常 5-20 个候选者就能覆盖前沿

原因:

  1. 优秀候选者往往在多个样本上都表现好
  2. 接受标准(必须比父代更好)限制了增长速度
  3. 合并策略会整合不同候选者的优势

推荐做法

场景推荐策略
通用系统平均分数最高
专用系统特定任务分数最高
关键应用最小分数最高(最鲁棒)
研究/分析保留整个前沿,分析差异

Pareto 前沿更新流程

┌─────────────────────────────────────────────────────────────────────────┐
│                       Pareto 前沿更新流程                                │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  1. 新候选者被接受                                                      │
│     └→ 在训练集 minibatch 上表现更好                                    │
│                                                                         │
│  2. 在验证集上完整评估                                                   │
│     └→ 计算每个样本的分数                                               │
│                                                                         │
│  3. 更新 Pareto 前沿                                                    │
│     └→ 比较新候选者与现有前沿                                           │
│                                                                         │
│  4. 被替代的候选者                                                      │
│     └→ 从前沿移除,但保留在候选池中                                     │
│                                                                         │
│  5. 下一轮选择                                                          │
│     └→ 从更新后的 Pareto 前沿选择                                       │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

Pareto 前沿更新详解

数据结构

Pareto 前沿使用两个核心数据结构:

# src/gepa/core/state.py

# 1. 记录每个验证样本上的最高分数
pareto_front_valset: dict[str, float] = {}
# 例如: {0: 0.8, 1: 1.0, 2: 0.6}
#      key: 验证样本索引, value: 该样本上的最高分数

# 2. 记录每个验证样本上达到最高分数的候选者集合
program_at_pareto_front_valset: dict[str, set[int]] = {}
# 例如: {0: {1, 3}, 1: {2}, 2: {0, 1, 3}}
#      key: 验证样本索引, value: 候选者索引集合

更新函数实现

def _update_pareto_front_for_val_id(
    self,
    val_id: str,        # 验证样本索引
    score: float,       # 新候选者在该样本上的分数
    program_idx: int    # 新候选者的索引
) -> tuple[bool, bool]:
    """
    更新单个验证样本的 Pareto 前沿。

    返回: (is_pareto_updated, is_new_pareto)
        - is_pareto_updated: 前沿是否发生变化
        - is_new_pareto: 新候选者是否进入前沿
    """
    # 获取该样本当前的最高分数
    prev_score = self.pareto_front_valset.get(val_id, float("-inf"))

    if score > prev_score:
        # 情况1: 新候选者表现更好,完全替换
        self.pareto_front_valset[val_id] = score
        self.program_at_pareto_front_valset[val_id] = {program_idx}
        return True, True

    elif score == prev_score:
        # 情况2: 与现有最高分数持平,加入集合
        pareto_front = self.program_at_pareto_front_valset.setdefault(val_id, set())
        is_new = program_idx not in pareto_front
        pareto_front.add(program_idx)
        return True, is_new

    else:
        # 情况3: 分数更低,不更新前沿
        return False, False

完整更新流程

当一个新候选者(索引为 5)通过接受测试后,在验证集上进行评估:

# 1. 在验证集上评估新候选者
val_scores = [0.9, 0.7, 0.8, 0.6, 0.9]  # 候选者5在5个验证样本上的分数

# 2. 逐个更新 Pareto 前沿
for val_id, score in enumerate(val_scores):
    is_updated, is_new = state._update_pareto_front_for_val_id(
        val_id=str(val_id),
        score=score,
        program_idx=5
    )

具体示例

假设当前 Pareto 前沿状态:

# 当前状态
pareto_front_valset = {
    "0": 0.8,   # 样本0最高分: 0.8
    "1": 1.0,   # 样本1最高分: 1.0
    "2": 0.6    # 样本2最高分: 0.6
}

program_at_pareto_front_valset = {
    "0": {1, 3},  # 样本0: 候选者1和3达到0.8
    "1": {2},     # 样本1: 候选者2达到1.0
    "2": {0}      # 样本2: 候选者0达到0.6
}

新候选者(索引5)在验证集上的分数:[0.9, 0.7, 0.6]

更新样本0 (score=0.9 > 0.8):

# 新分数更高,替换
pareto_front_valset["0"] = 0.9
program_at_pareto_front_valset["0"] = {5}
# 候选者1和3被从前沿移除

更新样本1 (score=0.7 < 1.0):

# 新分数更低,不更新
# 前沿保持不变: {2}

更新样本2 (score=0.6 == 0.6):

# 新分数持平,加入集合
program_at_pareto_front_valset["2"] = {0, 5}
# 候选者0和5并列最高

最终状态:

pareto_front_valset = {
    "0": 0.9,   # 提高了!
    "1": 1.0,   # 不变
    "2": 0.6    # 不变
}

program_at_pareto_front_valset = {
    "0": {5},     # 被替换
    "1": {2},     # 不变
    "2": {0, 5}   # 扩展
}

验证集评估的完整代码

# src/gepa/core/engine.py

def _evaluate_and_update_pareto_front_for_new_program(
    self,
    new_candidate: dict[str, str],
    new_candidate_idx: int,
    state: GEPAState
) -> None:
    """评估新候选者并更新 Pareto 前沿"""

    # 1. 在验证集上评估
    val_batch = self.valset  # 完整验证集
    val_eval_result = self.adapter.evaluate(
        batch=val_batch,
        candidate=new_candidate
    )

    # 2. 更新每个验证样本的分数记录
    for val_id, score in enumerate(val_eval_result.scores):
        state.update_val_score(
            program_idx=new_candidate_idx,
            val_id=str(val_id),
            score=score
        )

    # 3. 更新 Pareto 前沿
    for val_id, score in enumerate(val_eval_result.scores):
        state._update_pareto_front_for_val_id(
            val_id=str(val_id),
            score=score,
            program_idx=new_candidate_idx
        )

关键要点

  1. 逐样本更新:Pareto 前沿是针对每个验证样本独立更新的,不是基于平均分数
  2. 更高则替换,相等则加入:
    • 新分数 > 旧最高分:完全替换
    • 新分数 == 旧最高分:加入集合
    • 新分数 < 旧最高分:不更新
  3. 被替换的候选者:从前沿移除但保留在候选池中,可能在其他样本上仍然是 Pareto 最优
  4. 多候选者前沿:同一个样本可以有多个候选者并列最高分

代码示例

使用 Pareto 前沿

import gepa

result = gepa.optimize(
    seed_candidate={"prompt": "You are helpful."},
    trainset=train_data,
    valset=val_data,

    # Pareto 配置
    frontier_type="instance",           # 前沿类型
    candidate_selection_strategy="pareto",  # 选择策略

    # 其他配置
    max_metric_calls=150
)

查看前沿

# 获取 Pareto 前沿
pareto_front = result.state.get_pareto_front_mapping()

print("Pareto 前沿:")
for key, candidates in pareto_front.items():
    print(f"  {key}: {candidates}")

为什么需要 Pareto 前沿?

1. 避免局部最优

没有 Pareto 前沿:
Candidate A (平均 0.7) → Candidate B (平均 0.75) → Candidate C (平均 0.8)
                                        ↑
                              陷入局部最优

有 Pareto 前沿:
Pareto 前沿 = {A, B, C, D, E}
  → 从多个方向探索
  → 更可能找到全局最优

2. 利用互补优势

Candidate A: 擅长数学题,不擅长语文题
Candidate B: 擅长语文题,不擅长数学题

Pareto 前沿 = {A, B}

通过合并 A 和 B 的优势 → Candidate C (两者都擅长)

3. 处理异构数据

# 不同类型的样本
samples = [
    {"type": "math", "input": "1+1=?", ...},
    {"type": "coding", "input": "Write code...", ...},
    {"type": "writing", "input": "Write essay...", ...}
]

# Pareto 前沿
{
    "math_sample_1": {candidate_3},     # 数学专家
    "coding_sample_2": {candidate_1},   # 编程专家
    "writing_sample_3": {candidate_5}   # 写作专家
}

合并策略

当两个候选者在不同任务上表现优秀时,可以合并它们:

result = gepa.optimize(
    seed_candidate={"prompt": "..."},
    trainset=train_data,
    valset=val_data,

    # 启用合并
    use_merge=True,
    max_merge_invocations=5,     # 最多合并 5 次
    merge_val_overlap_floor=5,  # 至少 5 个共享样本

    max_metric_calls=150
)

关键优势

特性单一候选者Pareto 前沿
探索能力局限全局
鲁棒性低高
多样性单一多样
合并机会无有

类比神经网络

概念神经网络GEPA
模型选择单一模型候选者集合
优化目标单点最优Pareto 最优
集成学习Model EnsemblePareto 前沿 + 合并
过拟合容易通过多样性缓解

监控 Pareto 前沿

from gepa.core.callbacks import GEPACallback

class ParetoMonitor(GEPACallback):
    def on_pareto_front_updated(self, event):
        print(f"Iteration {event['iteration']}:")
        print(f"  New front: {event['new_front']}")
        print(f"  Displaced: {event['displaced_candidates']}")

# 使用
result = gepa.optimize(
    ...,
    callbacks=[ParetoMonitor()]
)

最佳实践

  1. 选择合适的前沿类型
    • 小数据集:instance
    • 多目标:objective
    • 复杂任务:hybrid
  2. 平衡探索与利用
    • 探索:pareto 选择
    • 利用:current_best 选择
    • 混合:epsilon_greedy 选择
  3. 合理配置合并
    • 确保有足够的共享样本
    • 避免过早合并
    • 监控合并效果

下一步

  • 与神经网络对比 - 深入理解 GEPA 的设计理念
  • 候选者选择策略 - 详细了解选择策略

本页目录

  • GEPA Pareto 前沿机制
  • 什么是 Pareto 前沿?
    • 重要:分数 vs Loss
  • Pareto 前沿类型
    • 1. Instance (默认)
    • 2. Objective
    • 3. Hybrid
    • 4. Cartesian
  • 候选者选择策略
    • Pareto 选择 (默认)
    • 当前最佳选择
    • Epsilon-Greedy 选择
    • Top-K Pareto 选择
  • 如何选择"最好"的候选者?
    • 策略 1: 平均分数最高(推荐用于通用场景)
    • 策略 2: 特定任务专用
    • 策略 3: 鲁棒性优先
    • 策略 4: 集成使用多个候选者
    • 优化完成后的最终选择
    • 候选者数量的实际情况
    • 推荐做法
  • Pareto 前沿更新流程
  • Pareto 前沿更新详解
    • 数据结构
    • 更新函数实现
    • 完整更新流程
    • 具体示例
    • 验证集评估的完整代码
    • 关键要点
  • 代码示例
    • 使用 Pareto 前沿
    • 查看前沿
  • 为什么需要 Pareto 前沿?
    • 1. 避免局部最优
    • 2. 利用互补优势
    • 3. 处理异构数据
  • 合并策略
  • 关键优势
  • 类比神经网络
  • 监控 Pareto 前沿
  • 最佳实践
  • 下一步

评论