Skip to content

Vision-Language Model (VLM) Training Example

This example demonstrates how to train Vision-Language Models (VLMs) using rLLM's workflow framework and VeRL's backend. We use the Geometry3K dataset to train a multimodal agent that can solve geometry problems by reasoning over both images and text.

Overview

The VLM training example demonstrates:

  • How to implement multimodal workflows that process both images and text
  • How to integrate VLMs with rLLM's training pipeline
  • How to evaluate multimodal reasoning performance on mathematical tasks

Quick Start

Model Hosting

Start a vLLM or SGLang server for a vision-language model:

CUDA_VISIBLE_DEVICES=0 python -m sglang.launch_server \
    --model-path Qwen/Qwen3-VL-2B-Instruct \
    --host 0.0.0.0 \
    --port 30000

The server should be accessible at http://localhost:30000/v1

Dataset Preparation

Prepare the Geometry3K dataset:

cd examples/geo3k
python preprocess_geo3k.py

This will:

  • Download the hiyouga/geometry3k dataset from HuggingFace
  • Process geometry problems with images and text
  • Register the dataset with rLLM's DatasetRegistry
  • Save processed data for training and evaluation

Run VLM Agent

Execute the VLM agent on geometry problems:

cd examples/geo3k
python run_geo3k.py

This will:

  • Load the Geometry3K test dataset
  • Run the VLM workflow on each problem
  • Evaluate performance using pass@1 and pass@k metrics
  • Save results to logs/geo3k.json

Train VLM Agent

Train your own VLM agent using reinforcement learning:

cd examples/geo3k
bash train_geo3k.sh

Training Configuration:

  • Base model: Qwen/Qwen3-VL-2B-Instruct
  • Algorithm: GRPO (Group Relative Policy Optimization)
  • Training batch size: 32
  • Response length: Up to 2048 tokens
  • Number of GPUs: 8 (configurable)
  • Training epochs: 3

Results: vlm accuracy

Code Reference

VLM Workflow Implementation

The multimodal workflow that handles image and text inputs:

examples/geo3k/geo3k_workflow.py
import base64
from io import BytesIO

from PIL import Image

from rllm.agents.agent import Action, Episode, Step, Trajectory
from rllm.engine import ModelOutput, RolloutEngine
from rllm.rewards.reward_fn import RewardFunction, math_reward_fn
from rllm.workflows.simple_workflow import SimpleAgent
from rllm.workflows.workflow import TerminationEvent, TerminationReason, Workflow


class Geo3KWorkflow(Workflow):
    def __init__(self, rollout_engine: RolloutEngine, reward_function: RewardFunction = None, encode_as_base64: bool = False, **kwargs):
        super().__init__(rollout_engine, **kwargs)
        self.agent = SimpleAgent()
        self.reward_fn: RewardFunction = reward_function or math_reward_fn
        self.encode_as_base64 = encode_as_base64

    async def run(self, task: dict, uid: str, **kwargs) -> Episode:
        self.reset(task, uid)

        question = task.get("question")
        image = task.get("image", task.get("images", None))
        if isinstance(image, list) and len(image) > 0:
            image = image[0]
        if isinstance(image, dict) and "bytes" in image:
            image = Image.open(BytesIO(image["bytes"]))
        assert isinstance(image, Image.Image) or image is None, f"Image must be a PIL.Image.Image, but got {type(image)}"

        if self.encode_as_base64 and image is not None:
            # format as openai compatible base64 encoded image
            image = image.convert("RGB")
            buffer = BytesIO()
            image.save(buffer, format="JPEG")
            image_base64 = base64.b64encode(buffer.getvalue()).decode("utf-8")
            messages = [
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": question},
                        {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_base64}"}},
                    ],
                }
            ]
        elif image is not None:
            messages = [{"role": "user", "content": question, "images": [image]}]
        else:
            messages = [{"role": "user", "content": question}]

        output: ModelOutput = await self.rollout_engine.get_model_response(messages, application_id=uid, **kwargs)
        action = Action(output.content)
        reward_result = self.reward_fn(task, action)

        trajectory: Trajectory = self.agent.trajectory
        trajectory.steps.append(
            Step(
                chat_completions=messages + [{"role": "assistant", "content": output.content, "reasoning": output.reasoning}],
                thought=output.reasoning,
                action=action,
                reward=reward_result.reward,
                model_output=output,
            )
        )

        self.commit(agent=self.agent, reset=True)

        if output.finish_reason == "length":
            raise TerminationEvent(TerminationReason.MAX_RESPONSE_LENGTH_EXCEEDED)

        raise TerminationEvent(TerminationReason.ENV_DONE)

Dataset Preprocessing

Script for preparing the Geometry3K dataset:

examples/geo3k/preprocess_geo3k.py
from datasets import load_dataset

from rllm.data.dataset import DatasetRegistry


def prepare_geo3k_data():
    # Load dataset
    dataset = load_dataset("hiyouga/geometry3k")
    train_dataset = dataset["train"]
    test_dataset = dataset["test"]

    # instruction_following = (
    #     r"You FIRST think about the reasoning process as an internal monologue and then provide the final answer. "
    #     r"The reasoning process MUST BE enclosed within <think> </think> tags. "
    #     r"The final answer MUST BE put in \boxed{}."
    # )

    instruction_following = "Let's think step by step and output your final answer in \\boxed{}."

    def process_fn(example, idx):
        problem = example.pop("problem")
        prompt = problem + instruction_following
        answer = example.pop("answer")
        image = example.pop("images")

        data = {
            "idx": idx,
            "data_source": "geo3k",
            "image": image,
            "question": prompt,
            "ground_truth": answer,
        }
        return data

    # Preprocess datasets
    train_dataset = train_dataset.map(function=process_fn, with_indices=True, num_proc=8)
    test_dataset = test_dataset.map(function=process_fn, with_indices=True, num_proc=8)

    # Register datasets
    train_dataset = DatasetRegistry.register_dataset("geo3k", train_dataset, "train")
    test_dataset = DatasetRegistry.register_dataset("geo3k", test_dataset, "test")

    return train_dataset, test_dataset


if __name__ == "__main__":
    train_dataset, test_dataset = prepare_geo3k_data()
    print(train_dataset.get_data_path())
    print(test_dataset.get_data_path())

Evaluation Script

Main script for running the VLM workflow:

examples/geo3k/run_geo3k.py
import asyncio
import json
import os
from copy import deepcopy

from geo3k_workflow import Geo3KWorkflow

from rllm.data.dataset import DatasetRegistry
from rllm.engine import AgentWorkflowEngine, OpenAIEngine
from rllm.rewards.reward_fn import math_reward_fn


def load_data(n=1):
    """Load geo3k data using the Dataset interface."""
    dataset = DatasetRegistry.load_dataset("geo3k", "test")
    if dataset is None:
        print("Dataset not found, preparing dataset...")
        from prepare_geo3k_data import preprocess_geo3k_data

        _, dataset = preprocess_geo3k_data()

    data = []
    for idx, example in enumerate(dataset):
        for i in range(n):
            data.append(deepcopy(example))
    return data


def evaluate_results(results):
    """Evaluate the results and compute pass@k metrics."""
    from collections import defaultdict

    # Create a map to store correct answers per problem
    problem_correct_map = defaultdict(int)
    problem_total_map = defaultdict(int)

    # Count correct answers for each problem
    for episode in results:
        idx = episode.task["idx"]

        # Use the episode-level is_correct flag set by the workflow
        is_correct = episode.is_correct

        problem_correct_map[idx] += int(is_correct)
        problem_total_map[idx] += 1

    # Calculate pass@1 and pass@k
    k = max(problem_total_map.values()) if problem_total_map else 1
    total_problems = len(problem_correct_map)

    if total_problems > 0:
        pass_at_1 = sum(problem_correct_map.values()) / sum(problem_total_map.values())
        pass_at_k = sum(1 for idx, correct in problem_correct_map.items() if correct > 0) / total_problems
    else:
        pass_at_1 = 0.0
        pass_at_k = 0.0

    print("Total unique problems:", total_problems)
    print("Average Pass@1 Accuracy:", pass_at_1)
    print(f"Average Pass@{k} Accuracy:", pass_at_k)


if __name__ == "__main__":
    import os

    os.environ["TOKENIZERS_PARALLELISM"] = "true"

    n_parallel_tasks = 128
    model_name = "Qwen/Qwen3-VL-2B-Instruct"

    rollout_engine = OpenAIEngine(
        model=model_name,
        max_prompt_length=1024,
        max_response_length=2048,
        base_url="http://localhost:30000/v1",
        api_key="None",
        sampling_params={"temperature": 0.6, "top_p": 0.95},
    )

    engine = AgentWorkflowEngine(
        workflow_cls=Geo3KWorkflow,
        workflow_args={
            "reward_function": math_reward_fn,
            "encode_as_base64": True,
        },
        rollout_engine=rollout_engine,
        config=None,
        n_parallel_tasks=n_parallel_tasks,
        retry_limit=1,
    )

    tasks = load_data(n=4)
    print(f"Loaded {len(tasks)} geo3k tasks")

    results = asyncio.run(engine.execute_tasks(tasks))

    # Evaluate results (rewards are already assigned in the workflow)
    print("Evaluating results...")
    evaluate_results(results)

    # Save results
    os.makedirs("logs", exist_ok=True)
    with open("logs/geo3k.json", "w") as f:
        json.dump([episode.to_dict() for episode in results], f, indent=4)

    print("\nResults saved to logs/geo3k.json")

Training Script

Training configuration using the VLM workflow:

examples/geo3k/train_geo3k.py
import hydra

from examples.geo3k.geo3k_workflow import Geo3KWorkflow
from rllm.data.dataset import DatasetRegistry
from rllm.rewards.reward_fn import math_reward_fn
from rllm.trainer.agent_trainer import AgentTrainer


@hydra.main(config_path="pkg://rllm.trainer.config", config_name="agent_ppo_trainer", version_base=None)
def main(config):
    train_dataset = DatasetRegistry.load_dataset("geo3k", "train")
    test_dataset = DatasetRegistry.load_dataset("geo3k", "test")

    trainer = AgentTrainer(
        workflow_class=Geo3KWorkflow,
        workflow_args={
            "reward_function": math_reward_fn,
        },
        config=config,
        train_dataset=train_dataset,
        val_dataset=test_dataset,
    )
    trainer.train()


if __name__ == "__main__":
    main()