Custom Stream evaluators

Evaluators are responsible for calculating metrics. To help you get started quickly, Steel Thread provides a built-in LLMJudgeEvaluator for stream-based evaluation using LLM-as-Judge. This is explained in the previous section on basic usage.

You can also add your own custom Stream evaluators, whether LLM-as-Judge or deterministic. A custom evaluator subclasses StreamEvaluator and can implement two methods:

from steelthread.streams import (
    PlanStreamItem,
    PlanRunStreamItem,
    StreamEvaluator,
    StreamMetric,
)


class MyStreamEvaluator(StreamEvaluator):
    def process_plan(self, stream_item: PlanStreamItem) -> list[StreamMetric] | StreamMetric:
        ...

    def process_plan_run(self, stream_item: PlanRunStreamItem) -> list[StreamMetric] | StreamMetric | None:
        ...

Below are two examples of custom evaluators: one using LLM-as-Judge and one deterministic.

You can use an LLM to score plan runs automatically. When you subclass StreamEvaluator, first initialise an LLMScorer, then define how the evaluator should process plans and plan runs.

from portia import Config
from steelthread.steelthread import SteelThread
from steelthread.streams import (
    StreamConfig,
    PlanRunStreamItem,
    StreamEvaluator,
    StreamMetric,
)
from steelthread.utils.llm import LLMScorer, MetricOnly


class LLMVerbosityJudge(StreamEvaluator):
    def __init__(self, config):
        self.scorer = LLMScorer(config)

    def process_plan_run(self, stream_item: PlanRunStreamItem):
        # The stream_item object holds the underlying plan / plan run being evaluated.
        task_data = stream_item.plan_run.model_dump_json()

        # The description is used to inform the LLM on how to score the metric.
        metrics = self.scorer.score(
            task_data=[task_data],
            metrics_to_score=[
                MetricOnly(
                    name="verbosity",
                    description="Scores 0 if the answer is too verbose, 1 otherwise.",
                ),
            ],
        )

        return [
            StreamMetric.from_stream_item(
                stream_item=stream_item,
                score=m.score,
                name=m.name,
                description=m.description,
                explanation=m.explanation,
            )
            for m in metrics
        ]

# Setup config + Steel Thread
config = Config.from_default()

# To use your evaluator, pass it to the runner
SteelThread().process_stream(
    StreamConfig(
        eval_dataset_name="your-stream-name-here",
        config=config,
        evaluators=[LLMVerbosityJudge(config)],
    ),
)