Skip to main content

Three-Worker Architecture

The Condor Coordinator uses a three-worker architecture that separates concerns for reliability, performance, and maintainability. Each worker is an independent process with specific responsibilities.

Worker 1: Predict Worker

The most critical component: it must never block or fail.

Responsibilities

  • Connect to Model Orchestrator via ModelRunner client
  • Maintain synchronized list of active models
  • Fetch or receive market data in real-time
  • Send tick events to all models with latest data
  • Request predictions from all models
  • Store predictions with metadata for later scoring

Key Characteristics

Must be:
  • Fast (no blocking operations)
  • Reliable (graceful error handling)
  • Asynchronous (handle many models concurrently)
Must not:
  • Perform heavy computations
  • Block on slow operations
  • Fail due to individual model errors

Implementation Example

# predict_worker.py
import asyncio
from model_runner_client import ModelRunnerClient, Argument, Variant, VariantType
from repositories import PredictionRepository, ModelRepository

class PredictWorker:
    """Real-time model orchestration and prediction collection.

    Each cycle fetches market data, sends it to every model as a ``tick``,
    requests one ``predict`` call per asset/horizon pair and stores the
    resulting predictions through the prediction repository.
    """

    def __init__(
        self,
        prediction_repo: PredictionRepository,
        model_repo: ModelRepository,
        crunch_id: str = "condor"
    ):
        """Store the repositories and build the ModelRunner client.

        Args:
            prediction_repo: Storage used to persist collected predictions.
            model_repo: Storage for model metadata.
            crunch_id: Identifier passed to the ModelRunner client.
        """
        self.prediction_repo = prediction_repo
        self.model_repo = model_repo

        # Initialize ModelRunner client
        self.runner = ModelRunnerClient(
            timeout=50,  # Max seconds to wait for all models
            crunch_id=crunch_id,
            host="model-orchestrator",  # Or localhost for local dev
            port=9091,
            max_consecutive_failures=10,
            max_consecutive_timeouts=10
        )

    async def start(self):
        """Initialize the client, start the model sync and run the loop."""
        # Connect to orchestrator
        await self.runner.init()

        # Run the model-list sync in the background. Awaiting it inline would
        # keep start() parked here until sync() returns, so the prediction
        # loop below could not begin while the sync is still running.
        # NOTE(review): assumes sync() is a long-running coroutine - confirm
        # against the ModelRunner client documentation.
        sync_task = asyncio.create_task(self.runner.sync())

        try:
            # Start main prediction loop
            await self.run_prediction_loop()
        finally:
            # Do not leave the sync task orphaned when the loop is cancelled.
            sync_task.cancel()

    async def run_prediction_loop(self):
        """Main loop: fetch data, tick models, collect predictions.

        Runs forever. Any exception raised inside a cycle is reported and the
        cycle is retried after a short pause, so a single failure does not
        stop the worker.
        """
        while True:
            try:
                # 1. Fetch latest market data
                market_data = await self.fetch_market_data()

                # 2. Send tick to all models
                await self.send_tick(market_data)

                # 3. Request predictions for active assets/horizons
                predictions = await self.collect_predictions(
                    assets=["BTC-USD", "ETH-USD"],
                    horizons=[1, 4, 24]
                )

                # 4. Store predictions
                await self.prediction_repo.save_all(predictions)

                # 5. Update model metadata if new models joined
                await self.update_models()

                # Wait before next cycle
                await asyncio.sleep(60)  # 1 minute cycle

            except Exception as e:
                print(f"Error in prediction loop: {e}")
                await asyncio.sleep(5)  # Brief pause before retry

    @staticmethod
    def _argument(position: int, variant_type, value):
        """Wrap *value* as a positional ModelRunner argument of *variant_type*."""
        return Argument(
            position=position,
            data=Variant(
                type=variant_type,
                value=encode_data(variant_type, value)
            )
        )

    async def send_tick(self, market_data: dict):
        """Send tick event to all models.

        Args:
            market_data: Latest market data, sent JSON-encoded as the single
                positional argument of ``tick``.
        """
        await self.runner.call(
            method="tick",
            arguments=[self._argument(1, VariantType.JSON, market_data)]
        )

    async def collect_predictions(self, assets: list[str], horizons: list[int]):
        """Collect predictions from all models.

        Calls ``predict(asset, horizon, step)`` once per asset/horizon pair
        and converts every returned result into a Prediction entity.

        Args:
            assets: Asset symbols to request predictions for.
            horizons: Horizons, in hours, to request for each asset.

        Returns:
            A list of Prediction entities, one per result returned by the
            runner.
        """
        predictions = []
        # The same step is used for every call made in this cycle.
        step = self.get_current_step()

        for asset in assets:
            for horizon in horizons:
                # Call predict on all models
                results = await self.runner.call(
                    method="predict",
                    arguments=[
                        self._argument(1, VariantType.STRING, asset),
                        self._argument(2, VariantType.INT, horizon),
                        self._argument(3, VariantType.INT, step),
                    ]
                )

                # Process results
                predictions.extend(
                    self.create_prediction_entity(result, asset, horizon, step)
                    for result in results
                )

        return predictions

    def create_prediction_entity(self, result, asset, horizon, step):
        """Transform ModelRunner result into Prediction entity.

        Args:
            result: Runner result exposing ``model_id``, ``status``,
                ``payload`` and ``execution_time_us``.
            asset: Asset symbol the prediction was requested for.
            horizon: Horizon in hours; the prediction becomes resolvable that
                many hours after creation.
            step: Step value that was sent to the model.

        Returns:
            A new, not yet scored Prediction.
        """
        # Read the clock once so resolvable_at is exactly created_at + horizon.
        now = datetime.utcnow()
        succeeded = result.status == "SUCCESS"

        return Prediction(
            id=generate_id(),
            model_id=result.model_id,
            asset=asset,
            horizon=horizon,
            step=step,
            status=result.status,  # SUCCESS, TIMEOUT, or FAILURE
            distribution=result.payload if succeeded else None,
            # Prediction.score is a required field; it stays empty until the
            # score worker fills it in.
            score=None,
            prediction_time_us=result.execution_time_us,
            created_at=now,
            resolvable_at=now + timedelta(hours=horizon)
        )

Data Flow

1. Fetch market data (from API, WebSocket, database)

2. Send tick(data) to all models

3. Models update internal state

4. Request predict(asset, horizon, step) from all models

5. Collect results (SUCCESS, TIMEOUT, or FAILURE)

6. Store predictions in database with metadata

Worker 2: Score Worker

CPU-intensive worker that computes scores and rankings.

Responsibilities

  • Fetch predictions that are ready to score
  • Retrieve realized market data for comparison
  • Compute distribution-based scores
  • Calculate rolling window metrics (24h, 72h, 7d)
  • Update model rankings and leaderboard
  • Handle score aggregation across horizons

Key Characteristics

Can be:
  • Slow (CPU intensive operations allowed)
  • Delayed (scoring 5-10 minutes later is fine)
  • Restarted (safe to stop and resume)
  • Re-run (idempotent scoring)
Benefits of isolation:
  • Doesn’t block prediction collection
  • Can run on more powerful machines
  • Easy to update scoring algorithms

Implementation Example

# score_worker.py
from repositories import PredictionRepository, ScoreRepository
from scoring import DistributionScorer

class ScoreWorker:
    """Batch scoring of predictions with rolling windows."""

    def __init__(
        self,
        prediction_repo: PredictionRepository,
        score_repo: ScoreRepository,
        scorer: DistributionScorer
    ):
        """Store the collaborators used by the scoring loop.

        Args:
            prediction_repo: Source of predictions that are ready to score,
                and sink for the scored predictions.
            score_repo: Storage for scores and per-model window aggregates.
            scorer: Object whose ``score_prediction`` computes a single score.
        """
        self.prediction_repo = prediction_repo
        self.score_repo = score_repo
        self.scorer = scorer

    async def run(self):
        """Main scoring loop.

        Runs forever: one scoring cycle, then a 60 second pause. An exception
        inside a cycle is reported and the loop resumes after 10 seconds.
        """
        while True:
            try:
                await self._run_cycle()
                await asyncio.sleep(60)

            except Exception as e:
                print(f"Error in scoring: {e}")
                await asyncio.sleep(10)

    async def _run_cycle(self):
        """Score one batch of ready predictions and refresh the rankings."""
        # 1. Find predictions ready to score
        ready_predictions = self.prediction_repo.fetch_ready_to_score(
            limit=1000
        )

        if not ready_predictions:
            return

        # 2. Fetch realized market data
        market_data = await self.fetch_realized_data(ready_predictions)

        # 3. Score each prediction
        for prediction in ready_predictions:
            # Predictions that timed out or failed carry no distribution.
            # Passing None to the scorer would raise and abort the whole
            # batch on every cycle, so they are skipped instead.
            # NOTE(review): skipped predictions are not marked in any way
            # here; confirm the repository stops returning them eventually.
            if prediction.distribution is None:
                continue

            score = self.scorer.score_prediction(
                prediction.distribution,
                market_data[prediction.asset],
                prediction.horizon
            )

            # Update prediction with score
            prediction.score = score
            self.prediction_repo.update(prediction)

        # 4. Update rolling window scores
        await self.update_rolling_scores()

        # 5. Update leaderboard
        await self.update_leaderboard()

    async def update_rolling_scores(self):
        """Calculate scores across different time windows.

        For each window (24h, 72h, 168h) the scores since the cutoff are
        fetched, aggregated per model and written back under the window name.
        """
        windows = [
            {"name": "recent", "hours": 24},
            {"name": "steady", "hours": 72},
            {"name": "anchor", "hours": 168}  # 7 days
        ]

        # Read the clock once so all windows end at the same instant.
        now = datetime.utcnow()

        for window in windows:
            cutoff = now - timedelta(hours=window["hours"])

            # Get all scores in window
            scores = self.score_repo.fetch_scores_since(cutoff)

            # Aggregate by model
            model_scores = self.aggregate_scores_by_model(scores)

            # Update model rankings
            for model_id, aggregate_score in model_scores.items():
                self.score_repo.update_model_score(
                    model_id,
                    window["name"],
                    aggregate_score
                )

Distribution Scoring Example

class DistributionScorer:
    """Score probability distributions using various metrics."""

    def score_prediction(
        self,
        predicted_dist: dict,
        realized_return: float,
        horizon: int
    ) -> float:
        """
        Score a distribution prediction.

        Uses Continuous Ranked Probability Score (CRPS) or similar.

        Args:
            predicted_dist: Mapping of bucket value (a key convertible with
                ``float``) to the probability assigned to that bucket.
            realized_return: The outcome that was actually observed.
            horizon: Prediction horizon, used to weight the raw score.

        Returns:
            The CRPS multiplied by ``self.horizon_weight(horizon)``.
        """
        # Bucket values in ascending order, as required for a cumulative sum
        buckets = sorted(float(key) for key in predicted_dist)

        # Calculate CRPS or custom metric
        score = self.calculate_crps(predicted_dist, realized_return, buckets)

        # Adjust for horizon
        return score * self.horizon_weight(horizon)

    def calculate_crps(self, distribution, realized, buckets):
        """Continuous Ranked Probability Score.

        Simplified, discrete form: the sum over buckets of the squared
        difference between the predicted cumulative probability and the
        step function of the realized outcome. Lower is better; a forecast
        that puts all mass on the realized bucket scores 0.

        Args:
            distribution: Mapping of bucket key to probability.
            realized: The observed outcome.
            buckets: Bucket values in ascending order.

        Returns:
            The accumulated squared difference.

        Raises:
            KeyError: If a value in ``buckets`` has no entry in
                ``distribution``.
        """
        # Index probabilities by numeric bucket value. Looking them up with
        # str(bucket) fails for keys such as "1" or "0.10", which do not
        # round-trip through float -> str.
        probabilities = {
            float(key): prob for key, prob in distribution.items()
        }

        cumulative_prob = 0
        crps = 0

        for bucket in buckets:
            cumulative_prob += probabilities[float(bucket)]

            # Step function of the outcome: 0 below the realized value,
            # 1 from the realized value upwards.
            indicator = 1 if bucket >= realized else 0

            # Accumulate squared difference
            crps += (cumulative_prob - indicator) ** 2

        return crps

Worker 3: Report Worker

FastAPI service exposing game state and metrics.

Responsibilities

  • Expose HTTP API for leaderboards
  • Provide per-model performance metrics
  • Serve aggregated statistics
  • Enable frontend visualization
  • Fully decentralized (no CrunchDAO storage)

Implementation Example

# report_worker.py
from fastapi import FastAPI, Query
from repositories import ModelRepository, ScoreRepository

# FastAPI application object; the route handlers below register on it via @app.get.
app = FastAPI(title="Condor Report API")

@app.get("/leaderboard")
async def get_leaderboard(
    window: str = Query("recent", enum=["recent", "steady", "anchor"]),
    limit: int = 100
):
    """
    Return the leaderboard for one scoring window.

    Models are ordered by their ``<window>_score`` in descending order and
    truncated to ``limit`` entries. A model without that score counts as 0.
    """
    # NOTE(review): descending order treats higher scores as better - confirm
    # this matches the direction of the stored window scores.
    score_attr = f"{window}_score"

    def window_score(model):
        return getattr(model.overall_score, score_attr, 0)

    models = model_repo.fetch_all()
    ranked = sorted(models.values(), key=window_score, reverse=True)
    top_models = ranked[:limit]

    entries = []
    for rank, model in enumerate(top_models, start=1):
        entries.append(
            {
                "rank": rank,
                "model_id": model.id,
                "model_name": model.name,
                "cruncher_name": model.player.name,
                "score": window_score(model),
                "prediction_count": model.prediction_count
            }
        )

    return {
        "window": window,
        "count": len(top_models),
        "models": entries
    }

@app.get("/model/{model_id}/metrics")
async def get_model_metrics(model_id: str):
    """Get detailed metrics for a specific model.

    Returns the model's window scores, per-horizon metrics computed from its
    latest 1000 scores, and the number of those scores created in the last
    day. Window scores are ``None`` for a model that has no overall score
    yet.
    """
    # NOTE(review): assumes fetch_by_id always returns a model - confirm how
    # an unknown model_id is reported.
    model = model_repo.fetch_by_id(model_id)
    scores = score_repo.fetch_model_scores(model_id, limit=1000)

    # overall_score is optional on the Model entity; reading its attributes
    # unguarded raises AttributeError for a model without scores.
    overall = model.overall_score
    has_scores = overall is not None

    # Fix the cutoff once instead of re-reading the clock for every score.
    cutoff = datetime.utcnow() - timedelta(days=1)

    return {
        "model_id": model.id,
        "model_name": model.name,
        "overall_scores": {
            "recent_24h": overall.recent_score if has_scores else None,
            "steady_72h": overall.steady_score if has_scores else None,
            "anchor_7d": overall.anchor_score if has_scores else None
        },
        "by_horizon": {
            "1h": calculate_horizon_metrics(scores, horizon=1),
            "4h": calculate_horizon_metrics(scores, horizon=4),
            "24h": calculate_horizon_metrics(scores, horizon=24)
        },
        "recent_predictions": sum(1 for s in scores if s.created_at > cutoff)
    }

@app.get("/statistics")
async def get_game_statistics():
    """Return model and prediction counts for the whole game."""
    stats = {}

    # Model counts
    stats["total_models"] = model_repo.count()
    stats["active_models"] = model_repo.count_active()

    # Prediction counts, overall and for the last 24 hours
    stats["total_predictions"] = prediction_repo.count()
    stats["predictions_last_24h"] = prediction_repo.count_recent(hours=24)

    return stats

Entity & Repository Pattern

Clean separation between domain logic and infrastructure:

Entities

# entities/prediction.py
@dataclass
class Prediction:
    """Domain entity for a prediction.

    One record per result returned by a model for an asset/horizon/step
    request. All fields are required when constructing an instance, including
    ``score``, which is ``None`` until the prediction has been scored.
    """
    id: str
    model_id: str  # model the result came from
    asset: str  # asset symbol, e.g. "BTC-USD"
    horizon: int  # hours between created_at and resolvable_at
    step: int  # step value that was sent with the predict request
    status: PredictionStatus  # SUCCESS, TIMEOUT, FAILURE
    distribution: dict | None  # model payload; None unless status is SUCCESS
    score: float | None  # assigned by the score worker; None before scoring
    created_at: datetime
    resolvable_at: datetime  # earliest time the prediction can be scored
    prediction_time_us: int  # execution time reported by the runner, in microseconds

# entities/model.py
@dataclass
class Model:
    """Domain entity for a model.

    ``overall_score`` is optional, so readers must handle ``None`` before
    accessing the per-window scores on it.
    """
    id: str
    name: str
    player: Player  # owner of the model; the report API exposes player.name
    deployment_id: str
    overall_score: ModelScore | None  # presumably None until scores exist - TODO confirm
    created_at: datetime
    updated_at: datetime

Repositories

# services/interfaces/prediction_repository.py
from abc import ABC, abstractmethod

class PredictionRepository(ABC):
    """Interface for prediction storage.

    Concrete repositories must implement all three methods; a subclass that
    leaves one out cannot be instantiated. Note that ``save_all`` is a
    coroutine while ``fetch_ready_to_score`` and ``update`` are plain
    synchronous methods.
    """

    @abstractmethod
    async def save_all(self, predictions: Iterable[Prediction]):
        """Store multiple predictions.

        Args:
            predictions: The predictions to persist.
        """
        ...

    @abstractmethod
    def fetch_ready_to_score(self, limit: int = 1000) -> list[Prediction]:
        """Fetch predictions where resolvable_at <= now.

        Args:
            limit: Maximum number of predictions to return.

        Returns:
            The predictions that can be scored.
        """
        ...

    @abstractmethod
    def update(self, prediction: Prediction):
        """Update existing prediction with score.

        Args:
            prediction: An already stored prediction whose score was set.
        """
        ...

Implementation

# infrastructure/db/prediction_repository.py
class DbPredictionRepository(PredictionRepository):
    """PostgreSQL implementation.

    Implements every abstract method of PredictionRepository; omitting one
    (``update`` was previously missing) makes the class impossible to
    instantiate.
    """

    def __init__(self, db_session):
        """Keep the database session used by all queries."""
        self.db = db_session

    async def save_all(self, predictions):
        """Batch insert predictions."""
        # SQL INSERT with proper error handling
        ...

    def fetch_ready_to_score(self, limit=1000):
        """Query predictions ready for scoring."""
        # SQL SELECT with filtering
        ...

    def update(self, prediction):
        """Persist the score of an already stored prediction."""
        # SQL UPDATE of the matching row
        ...

Next Steps

Now let’s see how to run this entire stack locally for development and testing.

Local Development Setup

Learn how to run the complete Condor stack locally with Docker.