import dspy
from roma_dspy import Aggregator, Atomizer, Executor, Planner, Verifier, SubTask
from roma_dspy.types import TaskType
# Optional tool that the Executor may call
def get_weather(city: str) -> str:
    """Return a canned weather report for the city.

    Args:
        city: Name of the city to mention in the report.

    Returns:
        A fixed "sunny" sentence with ``city`` interpolated; no lookup is
        performed.
    """
    return f"The weather in {city} is sunny."
# ReAct-style executor running on a Fireworks-hosted model; get_weather is
# the only tool it is given.
executor_lm = dspy.LM(
    "fireworks_ai/accounts/fireworks/models/kimi-k2-instruct-0905",
    temperature=0.7,
    cache=True,
)
executor = Executor(
    lm=executor_lm,
    prediction_strategy="react",
    tools=[get_weather],
    context_defaults={"track_usage": True},
)
# The atomizer's verdict selects between direct execution and planning in
# run_pipeline. Chain-of-thought strategy, LM caching disabled.
atomizer = Atomizer(
    lm=dspy.LM(
        "openrouter/google/gemini-2.5-flash",
        temperature=0.6,
        cache=False,
    ),
    prediction_strategy="cot",
    context_defaults={"track_usage": True},
)
# The planner breaks a non-atomic goal into subtasks that the executor then
# runs one at a time.
planner = Planner(
    lm=dspy.LM(
        "openrouter/openai/gpt-4o-mini",
        temperature=0.85,
        cache=True,
    ),
    prediction_strategy="cot",
    context_defaults={"track_usage": True},
)
# The aggregator receives the goal plus the collected subtasks and produces
# the synthesized result used as the candidate answer.
aggregator = Aggregator(
    lm=dspy.LM(
        "openrouter/openai/gpt-4o-mini",
        temperature=0.65,
    ),
    prediction_strategy="cot",
)
# The verifier judges the candidate answer against the original goal; it is
# configured with temperature 0.0 and no explicit prediction strategy.
verifier = Verifier(
    lm=dspy.LM(
        "openrouter/openai/gpt-4o-mini",
        temperature=0.0,
    ),
)
def run_pipeline(goal: str) -> str:
    """Run the atomize -> (plan) -> execute -> aggregate -> verify flow.

    The atomizer is consulted first. When it reports the goal as atomic, or
    its node type as an execute node, the executor handles the goal directly.
    Otherwise the planner's subtasks are executed one by one and the
    aggregator synthesizes a single candidate answer. In both cases the
    verifier then judges the candidate against the goal.

    Args:
        goal: The task to solve.

    Returns:
        The candidate answer when the verifier's verdict is truthy; otherwise
        a message starting with "Verifier flagged the output: " followed by
        the verifier's feedback, or "no feedback returned" when the feedback
        is empty.
    """
    atomized = atomizer.forward(goal)
    if atomized.is_atomic or atomized.node_type.is_execute:
        candidate = executor.forward(goal).output
    else:
        plan = planner.forward(goal)
        results = []
        for subtask in plan.subtasks:
            # NOTE(review): the execution output is discarded here; the
            # SubTask passed to the aggregator carries only the planner's
            # fields. The call is kept so every subtask is still executed in
            # plan order. Confirm whether the output should be attached.
            executor.forward(subtask.goal)
            results.append(
                SubTask(
                    goal=subtask.goal,
                    task_type=subtask.task_type,
                    dependencies=subtask.dependencies,
                )
            )
        aggregated = aggregator.forward(goal, results)
        candidate = aggregated.synthesized_result

    verdict = verifier.forward(goal, candidate)
    if verdict.verdict:
        return candidate
    return f"Verifier flagged the output: {verdict.feedback or 'no feedback returned'}"
# Script entry: run the pipeline once on a sample goal and print the string it
# returns. This is a top-level statement, so it executes whenever the module
# is run or imported.
print(run_pipeline("Plan a weekend in Barcelona and include a packing list."))
Open AGI Summit
빌더, 창업자, 연구자들이 오픈소스 AI의 미래를 논의하는 공간입니다.
AGI는 모두에게 열려 있고 접근 가능해야 한다는 공동의 신념으로 연결된 연구자, 개발자, 커뮤니티를 한자리에 모으고 있습니다.