
Overview

The Coordinator directory contains the core implementation of the Iris Classification Coordinator, the system that ties all the pieces of the Crunch together. This directory demonstrates the three fundamental responsibilities that every Coordinator must implement.

Architecture

The Coordinator follows a modular architecture where each component has a single, well-defined purpose:
coordinator/
├── coordinator.py          # Main Coordinator orchestration logic
├── data_manager.py         # Data Management (Responsibility 1)
├── model_orchestrator.py   # Model Orchestration (Responsibility 2)
├── scoring_engine.py       # Scoring & Rewards (Responsibility 3)
└── main.py                 # Entry point for running the coordinator

The Three Core Responsibilities

1. Data Management

Manage all data operations for the iris classification challenge. Key Functions:
  • Load the iris dataset from scikit-learn
  • Split data into training and prediction sets (stratified sampling)
  • Save datasets locally as CSV files
  • Select which data to use when calling the models' train and infer functions
  • Handle data transformations and formatting
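The steps above can be sketched in a few lines, assuming scikit-learn and pandas are available. Note that the function name prepare_datasets, the split fraction, and the file names are illustrative, not the actual data_manager.py API:

```python
import pandas as pd
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split

def prepare_datasets(prediction_fraction: float = 0.3, seed: int = 42):
    """Load iris and split it with stratified sampling."""
    iris = load_iris(as_frame=True)
    features, target = iris.data, iris.target
    x_train, x_pred, y_train, y_pred = train_test_split(
        features, target,
        test_size=prediction_fraction,
        stratify=target,          # preserve class proportions in both splits
        random_state=seed,
    )
    # Keep the label alongside the features in each frame
    train_df = x_train.assign(target=y_train)
    predict_df = x_pred.assign(target=y_pred)
    return train_df, predict_df

train_df, predict_df = prepare_datasets()
train_df.to_csv("train.csv", index=False)       # saved locally as CSV
predict_df.to_csv("predict.csv", index=False)
```

With stratification, each of the three iris classes keeps the same proportion in both splits.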

2. Model Orchestration

Administer game rules and coordinate communication with the distributed model cluster. Key Functions:
  • Initialize connections to model cluster via gRPC
  • Send training data to all available models
  • Collect predictions from all models
  • Handle cluster availability and failover scenarios
  • Manage model lifecycle (start, train, predict, cleanup)
"""
Model Orchestration Module

Handles all model interaction with the model cluster:
training data distribution and prediction collection.
"""

from typing import Dict, Optional

import pandas as pd
import numpy as np
from model_runner_client.model_concurrent_runners.dynamic_subclass_model_concurrent_runner import DynamicSubclassModelConcurrentRunner
from model_runner_client.grpc.generated.commons_pb2 import VariantType, Variant, Argument
from model_runner_client.utils.datatype_transformer import encode_data

class ModelOrchestrator:
    """Orchestrates model operations in the distributed cluster"""

    def __init__(self,
                 crunch_id: str = "crunch_name",
                 host: str = "localhost",
                 port: int = 9091,
                 timeout: int = 30):
        self.crunch_id = crunch_id
        self.host = host
        self.port = port
        self.timeout = timeout
        self.model_runner: Optional[DynamicSubclassModelConcurrentRunner] = None

    async def initialize_cluster(self):
        """Initialize connection to model cluster"""
        self.model_runner = DynamicSubclassModelConcurrentRunner(
            timeout=self.timeout,
            crunch_id=self.crunch_id,
            host=self.host,
            port=self.port,
            base_classname='crunchdao.crunch_example.iris.IrisModelBase'
        )

        await self.model_runner.init()

    async def send_training_data(self, train_data: pd.DataFrame):
        """Send training data to all models in the cluster"""
        payload: bytes = encode_data(VariantType.PARQUET, train_data)
        # Call train method on all models concurrently
        await self.model_runner.call(
            method_name='train',
            args=[
                Argument(position=1, data=Variant(type=VariantType.PARQUET, value=payload))
            ],
            kwargs=None
        )

    async def collect_predictions(self, predict_data: pd.DataFrame) -> Optional[Dict[str, np.ndarray]]:
        """Collect predictions from all models in the cluster"""

        # Call infer method on all models concurrently, encode as binary and send
        payload: bytes = encode_data(VariantType.PARQUET, predict_data)
        return await self.model_runner.call(
            method_name='infer',
            args=[
                Argument(position=1, data=Variant(type=VariantType.PARQUET, value=payload))
            ],
            kwargs=None
        )

3. Scoring & Rewards

Evaluate model performance and distribute prizes. Key Functions:
  • Score predictions against ground truth
  • Calculate multiple performance metrics (accuracy, precision, recall, F1)
  • Generate prize distributions based on rankings
  • Create leaderboards and save results
  • Handle tie-breaking and ranking logic
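The metric computation in the list above can be sketched with scikit-learn. The function name score_model and the macro averaging choice are illustrative assumptions, not the actual scoring_engine.py API:

```python
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

def score_model(y_true: np.ndarray, y_pred: np.ndarray) -> dict:
    """Compute the four performance metrics for one model's predictions."""
    return {
        "accuracy": accuracy_score(y_true, y_pred),
        # Macro averaging weights the three iris classes equally
        "precision": precision_score(y_true, y_pred, average="macro", zero_division=0),
        "recall": recall_score(y_true, y_pred, average="macro", zero_division=0),
        "f1": f1_score(y_true, y_pred, average="macro", zero_division=0),
    }

y_true = np.array([0, 1, 2, 0, 1, 2])
y_pred = np.array([0, 1, 2, 0, 2, 2])   # one class-1 sample misclassified
scores = score_model(y_true, y_pred)
```

A dictionary of these scores per model is enough to drive the ranking and prize-distribution steps.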

Main Coordinator

The main Coordinator class ties all three responsibilities together.

Running the Coordinator

Basic Usage

cd coordinator/
uv run python main.py

Output Files

The Coordinator generates output files to publish the prizes and the leaderboard:
  • prizes.json - Prize distribution results
  • leaderboard.csv - Final ranked leaderboard
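As a minimal sketch of how these two files could be produced, here is one way to rank scores and split a pool by rank. The prize pool, payout weights, and model names are made-up example values, and the ranking rule is an assumption:

```python
import json
import pandas as pd

scores = {"model_a": 0.97, "model_b": 0.93, "model_c": 0.91}  # example scores
pool = 1000.0
weights = [0.5, 0.3, 0.2]          # example payout fraction by rank

# Rank models by score, best first
leaderboard = pd.DataFrame(
    sorted(scores.items(), key=lambda kv: kv[1], reverse=True),
    columns=["model_id", "score"],
)
leaderboard["rank"] = range(1, len(leaderboard) + 1)
leaderboard.to_csv("leaderboard.csv", index=False)

# Pay each model its rank's share of the pool
prizes = {
    row.model_id: pool * weights[row.rank - 1]
    for row in leaderboard.itertuples()
}
with open("prizes.json", "w") as f:
    json.dump(prizes, f, indent=2)
```

The real scoring engine also handles ties; a simple sorted() like this one breaks ties by insertion order only.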

Local Testing

The local testing setup will help you run the complete infrastructure in a containerized environment with blockchain integration.