Vision-Language Model (VLM) Training Example
This example demonstrates how to train Vision-Language Models (VLMs) using rLLM's workflow framework and VeRL's backend. We use the Geometry3K dataset to train a multimodal agent that can solve geometry problems by reasoning over both images and text.
Overview
The VLM training example demonstrates:
- How to implement multimodal workflows that process both images and text
- How to integrate VLMs with rLLM's training pipeline
- How to evaluate multimodal reasoning performance on mathematical tasks
Quick Start
Model Hosting
Start a vLLM or SGLang server for a vision-language model:
# Serve Qwen/Qwen3-VL-2B-Instruct with SGLang on GPU 0, listening on all interfaces at port 30000.
CUDA_VISIBLE_DEVICES=0 python -m sglang.launch_server \
--model-path Qwen/Qwen3-VL-2B-Instruct \
--host 0.0.0.0 \
--port 30000
The server should be accessible at http://localhost:30000/v1
Dataset Preparation
Prepare the Geometry3K dataset:
This will:
- Download the hiyouga/geometry3k dataset from HuggingFace
- Process geometry problems with images and text
- Register the dataset with rLLM's DatasetRegistry
- Save processed data for training and evaluation
Run VLM Agent
Execute the VLM agent on geometry problems:
This will:
- Load the Geometry3K test dataset
- Run the VLM workflow on each problem
- Evaluate performance using pass@1 and pass@k metrics
- Save results to logs/geo3k.json
Train VLM Agent
Train your own VLM agent using reinforcement learning:
Training Configuration:
- Base model: Qwen/Qwen3-VL-2B-Instruct
- Algorithm: GRPO (Group Relative Policy Optimization)
- Training batch size: 32
- Response length: Up to 2048 tokens
- Number of GPUs: 8 (configurable)
- Training epochs: 3
Results:

Code Reference
VLM Workflow Implementation
The multimodal workflow that handles image and text inputs:
import base64
from io import BytesIO
from PIL import Image
from rllm.agents.agent import Action, Episode, Step, Trajectory
from rllm.engine import ModelOutput, RolloutEngine
from rllm.rewards.reward_fn import RewardFunction, math_reward_fn
from rllm.workflows.simple_workflow import SimpleAgent
from rllm.workflows.workflow import TerminationEvent, TerminationReason, Workflow
class Geo3KWorkflow(Workflow):
    """Single-turn multimodal workflow for Geometry3K-style tasks.

    Each run sends one user message (the question plus an optional image) to
    the rollout engine, scores the response with the reward function, and
    records the exchange as a single step on the agent's trajectory.
    """

    def __init__(self, rollout_engine: RolloutEngine, reward_function: RewardFunction = None, encode_as_base64: bool = False, **kwargs):
        """Create the workflow.

        Args:
            rollout_engine: Engine used to obtain the model response.
            reward_function: Callable invoked as ``reward_function(task, action)``;
                falls back to ``math_reward_fn`` when omitted.
            encode_as_base64: When True, images are embedded in the message as
                OpenAI-compatible base64 ``image_url`` parts; otherwise the PIL
                image is attached under an ``"images"`` key of the message.
            **kwargs: Forwarded unchanged to ``Workflow.__init__``.
        """
        super().__init__(rollout_engine, **kwargs)
        self.agent = SimpleAgent()
        self.reward_fn: RewardFunction = reward_function or math_reward_fn
        self.encode_as_base64 = encode_as_base64

    @staticmethod
    def _load_image(task: dict) -> "Image.Image | None":
        """Return the task's image as a PIL image, or None when the task has none.

        The image is read from ``task["image"]``, falling back to
        ``task["images"]`` when the ``"image"`` key is absent. A list is reduced
        to its first element (an empty list means "no image"), and a dict with
        a ``"bytes"`` entry is decoded with PIL.

        Raises:
            TypeError: If the resolved value is neither None nor a PIL image.
        """
        image = task.get("image", task.get("images", None))
        if isinstance(image, list):
            # Only the first image is used; an empty list is treated as no image.
            image = image[0] if image else None
        if isinstance(image, dict) and "bytes" in image:
            image = Image.open(BytesIO(image["bytes"]))
        # Explicit raise instead of ``assert`` so the check survives ``python -O``.
        if image is not None and not isinstance(image, Image.Image):
            raise TypeError(f"Image must be a PIL.Image.Image, but got {type(image)}")
        return image

    def _build_messages(self, question, image) -> list:
        """Build the single-message chat prompt for ``question`` and the optional ``image``."""
        if image is None:
            return [{"role": "user", "content": question}]

        if not self.encode_as_base64:
            return [{"role": "user", "content": question, "images": [image]}]

        # format as openai compatible base64 encoded image
        buffer = BytesIO()
        # JPEG has no alpha channel, so convert to RGB before saving.
        image.convert("RGB").save(buffer, format="JPEG")
        image_base64 = base64.b64encode(buffer.getvalue()).decode("utf-8")
        return [
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": question},
                    {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_base64}"}},
                ],
            }
        ]

    async def run(self, task: dict, uid: str, **kwargs) -> Episode:
        """Run one question/answer rollout for ``task``.

        Args:
            task: Task dict; reads ``"question"`` and ``"image"``/``"images"``.
                The whole dict is also passed to the reward function.
            uid: Identifier passed to ``reset`` and forwarded to the rollout
                engine as ``application_id``.
            **kwargs: Forwarded to ``rollout_engine.get_model_response``.

        Raises:
            TypeError: If the task's image is not a PIL image (or loadable as one).
            TerminationEvent: Always raised after the step is committed, with
                ``MAX_RESPONSE_LENGTH_EXCEEDED`` when the model stopped because
                of length and ``ENV_DONE`` otherwise.
        """
        # NOTE(review): this method never returns normally; presumably the base
        # Workflow converts the TerminationEvent into the Episode - confirm.
        self.reset(task, uid)

        question = task.get("question")
        image = self._load_image(task)
        messages = self._build_messages(question, image)

        output: ModelOutput = await self.rollout_engine.get_model_response(messages, application_id=uid, **kwargs)
        action = Action(output.content)
        reward_result = self.reward_fn(task, action)

        trajectory: Trajectory = self.agent.trajectory
        trajectory.steps.append(
            Step(
                chat_completions=messages + [{"role": "assistant", "content": output.content, "reasoning": output.reasoning}],
                thought=output.reasoning,
                action=action,
                reward=reward_result.reward,
                model_output=output,
            )
        )

        self.commit(agent=self.agent, reset=True)

        if output.finish_reason == "length":
            raise TerminationEvent(TerminationReason.MAX_RESPONSE_LENGTH_EXCEEDED)

        raise TerminationEvent(TerminationReason.ENV_DONE)
Dataset Preprocessing
Script for preparing the Geometry3K dataset:
from datasets import load_dataset
from rllm.data.dataset import DatasetRegistry
def prepare_geo3k_data():
    """Download, preprocess and register the Geometry3K train and test splits.

    Each example is converted to a dict with the keys ``idx``, ``data_source``,
    ``image``, ``question`` and ``ground_truth``, where ``question`` is the
    original problem followed by a step-by-step / ``\\boxed{}`` instruction.

    Returns:
        A ``(train_dataset, test_dataset)`` tuple holding the values returned
        by ``DatasetRegistry.register_dataset`` for the two splits.
    """
    # Load dataset
    dataset = load_dataset("hiyouga/geometry3k")
    train_dataset = dataset["train"]
    test_dataset = dataset["test"]

    # Alternative instruction that asks for <think>-tagged reasoning:
    # instruction_following = (
    #     r"You FIRST think about the reasoning process as an internal monologue and then provide the final answer. "
    #     r"The reasoning process MUST BE enclosed within <think> </think> tags. "
    #     r"The final answer MUST BE put in \boxed{}."
    # )
    instruction_following = "Let's think step by step and output your final answer in \\boxed{}."

    def process_fn(example, idx):
        """Convert one raw example into the record layout described above."""
        problem = example.pop("problem")
        # Separate the problem from the instruction with a space; plain
        # concatenation fuses the two sentences ("...Find x.Let's think...").
        prompt = problem + " " + instruction_following
        answer = example.pop("answer")
        image = example.pop("images")
        return {
            "idx": idx,
            "data_source": "geo3k",
            "image": image,
            "question": prompt,
            "ground_truth": answer,
        }

    # Preprocess datasets
    train_dataset = train_dataset.map(function=process_fn, with_indices=True, num_proc=8)
    test_dataset = test_dataset.map(function=process_fn, with_indices=True, num_proc=8)

    # Register datasets
    train_dataset = DatasetRegistry.register_dataset("geo3k", train_dataset, "train")
    test_dataset = DatasetRegistry.register_dataset("geo3k", test_dataset, "test")
    return train_dataset, test_dataset
if __name__ == "__main__":
    # Prepare both splits, then report where each registered dataset is stored.
    train_split, test_split = prepare_geo3k_data()
    for registered_split in (train_split, test_split):
        print(registered_split.get_data_path())
Evaluation Script
Main script for running the VLM workflow:
import asyncio
import json
import os
from copy import deepcopy
from geo3k_workflow import Geo3KWorkflow
from rllm.data.dataset import DatasetRegistry
from rllm.engine import AgentWorkflowEngine, OpenAIEngine
from rllm.rewards.reward_fn import math_reward_fn
def load_data(n=1):
    """Load the geo3k test split, repeating every example ``n`` times.

    If the split is not registered yet, the dataset is prepared first via the
    ``prepare_geo3k_data`` script.

    Args:
        n: Number of independent copies to emit per example (the evaluation
            script uses this as the number of samples per problem).

    Returns:
        A list of deep-copied examples; copies of the same example are adjacent.
    """
    dataset = DatasetRegistry.load_dataset("geo3k", "test")
    if dataset is None:
        print("Dataset not found, preparing dataset...")
        # The preparation script exposes ``prepare_geo3k_data``; importing the
        # non-existent ``preprocess_geo3k_data`` would raise ImportError here.
        from prepare_geo3k_data import prepare_geo3k_data

        _, dataset = prepare_geo3k_data()

    # Deep copies keep the repeated tasks from sharing mutable state.
    return [deepcopy(example) for example in dataset for _ in range(n)]
def evaluate_results(results):
    """Print pass@1 and pass@k accuracy for a collection of episodes.

    Episodes are grouped by ``episode.task["idx"]``. Pass@1 is the fraction of
    all episodes whose ``is_correct`` flag is truthy; pass@k is the fraction of
    problems with at least one correct episode, where ``k`` is the largest
    number of episodes recorded for any single problem.

    Args:
        results: Iterable of episodes exposing ``task`` and ``is_correct``.
    """
    from collections import Counter

    attempts_per_problem = Counter()
    solved_per_problem = Counter()

    # Tally attempts and successes per problem id.
    for episode in results:
        problem_id = episode.task["idx"]
        # Use the episode-level is_correct flag set by the workflow
        solved = int(episode.is_correct)
        solved_per_problem[problem_id] += solved
        attempts_per_problem[problem_id] += 1

    k = max(attempts_per_problem.values(), default=1)
    total_problems = len(solved_per_problem)

    pass_at_1 = 0.0
    pass_at_k = 0.0
    if total_problems > 0:
        pass_at_1 = sum(solved_per_problem.values()) / sum(attempts_per_problem.values())
        problems_solved = sum(1 for hits in solved_per_problem.values() if hits > 0)
        pass_at_k = problems_solved / total_problems

    print("Total unique problems:", total_problems)
    print("Average Pass@1 Accuracy:", pass_at_1)
    print(f"Average Pass@{k} Accuracy:", pass_at_k)
if __name__ == "__main__":
    # Set TOKENIZERS_PARALLELISM before any engine is constructed.
    os.environ["TOKENIZERS_PARALLELISM"] = "true"

    served_model = "Qwen/Qwen3-VL-2B-Instruct"
    max_concurrent_tasks = 128
    results_path = "logs/geo3k.json"

    # Client for the OpenAI-compatible server started in "Model Hosting".
    openai_engine = OpenAIEngine(
        model=served_model,
        max_prompt_length=1024,
        max_response_length=2048,
        base_url="http://localhost:30000/v1",
        api_key="None",
        sampling_params={"temperature": 0.6, "top_p": 0.95},
    )

    # Images are sent base64-encoded because the model is reached over HTTP.
    workflow_engine = AgentWorkflowEngine(
        workflow_cls=Geo3KWorkflow,
        workflow_args={"reward_function": math_reward_fn, "encode_as_base64": True},
        rollout_engine=openai_engine,
        config=None,
        n_parallel_tasks=max_concurrent_tasks,
        retry_limit=1,
    )

    # Four copies of every test problem, so pass@4 can be reported.
    geo3k_tasks = load_data(n=4)
    print(f"Loaded {len(geo3k_tasks)} geo3k tasks")

    episodes = asyncio.run(workflow_engine.execute_tasks(geo3k_tasks))

    # Evaluate results (rewards are already assigned in the workflow)
    print("Evaluating results...")
    evaluate_results(episodes)

    # Save results
    os.makedirs(os.path.dirname(results_path), exist_ok=True)
    with open(results_path, "w") as f:
        json.dump([episode.to_dict() for episode in episodes], f, indent=4)

    print(f"\nResults saved to {results_path}")
Training Script
Training configuration using the VLM workflow:
import hydra
from examples.geo3k.geo3k_workflow import Geo3KWorkflow
from rllm.data.dataset import DatasetRegistry
from rllm.rewards.reward_fn import math_reward_fn
from rllm.trainer.agent_trainer import AgentTrainer
@hydra.main(config_path="pkg://rllm.trainer.config", config_name="agent_ppo_trainer", version_base=None)
def main(config):
    """Train an agent with the Geo3K workflow on the registered geo3k splits.

    Args:
        config: Hydra configuration loaded from ``agent_ppo_trainer``.
    """
    # Load the "train" split first, then "test", from the dataset registry.
    geo3k_splits = {split: DatasetRegistry.load_dataset("geo3k", split) for split in ("train", "test")}

    # The test split doubles as the validation set during training.
    AgentTrainer(
        workflow_class=Geo3KWorkflow,
        workflow_args={"reward_function": math_reward_fn},
        config=config,
        train_dataset=geo3k_splits["train"],
        val_dataset=geo3k_splits["test"],
    ).train()
# Script entry point: the hydra-decorated main() is called without arguments.
if __name__ == "__main__":
    main()