Grand Diomande Research · Full HTML Reader

Machine Learning Generation Systems

CC-MotionGen + RAG++ Documentation

Version: 2.0.0
Last Updated: December 26, 2024

---

Table of Contents

1. [Overview](#overview)
2. [CC-MotionGen](#cc-motiongen)
3. [RAG++ Policy](#rag-policy)
4. [MotionPhrase System](#motionphrase-system)
5. [Training Pipeline](#training-pipeline)
6. [Inference API](#inference-api)
7. [Evaluation Metrics](#evaluation-metrics)

---

1. Overview

The ML Generation Systems provide music-conditioned motion generation through diffusion models enhanced with retrieval-augmented priors.

┌─────────────────────────────────────────────────────────────────────────────┐
│                     ML GENERATION ARCHITECTURE                               │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                              │
│  ┌───────────────────────────────────────────────────────────────────────┐ │
│  │                         INPUT CONDITIONING                             │ │
│  │                                                                        │ │
│  │  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────────┐   │ │
│  │  │   Music Audio   │  │   Text Prompt   │  │   Motion Seed       │   │ │
│  │  │   (WAV/MP3)     │  │   (Natural      │  │   (Optional         │   │ │
│  │  │                 │  │    Language)    │  │    Starting Pose)   │   │ │
│  │  └────────┬────────┘  └────────┬────────┘  └──────────┬──────────┘   │ │
│  │           │                    │                      │               │ │
│  │           ▼                    ▼                      │               │ │
│  │  ┌─────────────────┐  ┌─────────────────┐            │               │ │
│  │  │   Jukebox       │  │   CLIP/T5       │            │               │ │
│  │  │   Encoder       │  │   Encoder       │            │               │ │
│  │  │   (4800-dim)    │  │   (768-dim)     │            │               │ │
│  │  └────────┬────────┘  └────────┬────────┘            │               │ │
│  │           │                    │                      │               │ │
│  │           └────────────┬───────┘                      │               │ │
│  │                        ▼                              │               │ │
│  │  ┌─────────────────────────────────────────────────┐ │               │ │
│  │  │         Conditioning Vector (5568-dim)          │ │               │ │
│  │  └─────────────────────────┬───────────────────────┘ │               │ │
│  └────────────────────────────┼─────────────────────────┼───────────────┘ │
│                               │                         │                  │
│                               ▼                         │                  │
│  ┌───────────────────────────────────────────────────────────────────────┐ │
│  │                         RAG++ RETRIEVAL                                │ │
│  │                                                                        │ │
│  │  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────────┐   │ │
│  │  │   FAISS Index   │  │  Cross-Encoder  │  │   Prior Builder     │   │ │
│  │  │   (Dense        │──▶│  Reranker       │──▶│                     │   │ │
│  │  │    Retrieval)   │  │                 │  │   Top-K Blend       │   │ │
│  │  └─────────────────┘  └─────────────────┘  └──────────┬──────────┘   │ │
│  │                                                       │               │ │
│  │              ┌────────────────────────────────────────┘               │ │
│  │              │                                                        │ │
│  │              ▼                                                        │ │
│  │  ┌─────────────────────────────────────────────────────────────────┐ │ │
│  │  │                 Motion Prior (T×J×3)                             │ │ │
│  │  └──────────────────────────────┬──────────────────────────────────┘ │ │
│  └─────────────────────────────────┼─────────────────────────────────────┘ │
│                                    │                                        │
│                                    ▼                                        │
│  ┌───────────────────────────────────────────────────────────────────────┐ │
│  │                     CC-MotionGen DIFFUSION                             │ │
│  │                                                                        │ │
│  │  ┌─────────────────────────────────────────────────────────────────┐ │ │
│  │  │                        U-Net Architecture                        │ │ │
│  │  │                                                                  │ │ │
│  │  │   Input: x_t (noisy motion)                                     │ │ │
│  │  │          t (timestep embedding)                                  │ │ │
│  │  │          c (conditioning)                                        │ │ │
│  │  │          p (motion prior)                                        │ │ │
│  │  │                                                                  │ │ │
│  │  │   ┌─────────────────────────────────────────────────────────┐   │ │ │
│  │  │   │  Down Blocks    Middle Block    Up Blocks               │   │ │ │
│  │  │   │  ┌─────┐       ┌─────────┐     ┌─────┐                 │   │ │ │
│  │  │   │  │Conv │──┬───▶│  Self   │────▶│Conv │                 │   │ │ │
│  │  │   │  │+Attn│  │    │  +Cross │     │+Attn│                 │   │ │ │
│  │  │   │  └─────┘  │    │  Attn   │     └─────┘                 │   │ │ │
│  │  │   │           │    └─────────┘         ▲                   │   │ │ │
│  │  │   │           └────────────────────────┘ (skip connections)│   │ │ │
│  │  │   └─────────────────────────────────────────────────────────┘   │ │ │
│  │  │                                                                  │ │ │
│  │  │   Output: ε_θ (predicted noise)                                 │ │ │
│  │  │                                                                  │ │ │
│  │  └──────────────────────────────┬──────────────────────────────────┘ │ │
│  │                                 │                                     │ │
│  │                                 ▼                                     │ │
│  │  ┌─────────────────────────────────────────────────────────────────┐ │ │
│  │  │               DDPM/DDIM Sampling Loop                            │ │ │
│  │  │                                                                  │ │ │
│  │  │   for t = T, T-1, ..., 1:                                       │ │ │
│  │  │       ε = UNet(x_t, t, c, p)                                    │ │ │
│  │  │       x_{t-1} = denoise(x_t, ε, t)                              │ │ │
│  │  │                                                                  │ │ │
│  │  └──────────────────────────────┬──────────────────────────────────┘ │ │
│  └─────────────────────────────────┼─────────────────────────────────────┘ │
│                                    │                                        │
│                                    ▼                                        │
│  ┌───────────────────────────────────────────────────────────────────────┐ │
│  │                       POST-PROCESSING                                  │ │
│  │                                                                        │ │
│  │  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────────┐   │ │
│  │  │   Motion        │  │   Temporal      │  │   Foot Contact      │   │ │
│  │  │   Decoder       │──▶│   Smoothing     │──▶│   Correction        │   │ │
│  │  │                 │  │   (Gaussian)    │  │   (IK)              │   │ │
│  │  └─────────────────┘  └─────────────────┘  └──────────┬──────────┘   │ │
│  └───────────────────────────────────────────────────────┼───────────────┘ │
│                                                          │                  │
│                                                          ▼                  │
│  ┌───────────────────────────────────────────────────────────────────────┐ │
│  │                         OUTPUT                                         │ │
│  │                                                                        │ │
│  │   Motion Sequence: (T=196, J=22, D=3)                                │ │
│  │   - 196 frames (~6.5 seconds at 30fps)                               │ │
│  │   - 22 joints (SMPL skeleton)                                         │ │
│  │   - 3D positions per joint                                            │ │
│  │                                                                        │ │
│  └───────────────────────────────────────────────────────────────────────┘ │
│                                                                              │
└─────────────────────────────────────────────────────────────────────────────┘
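
The conditioning path in the diagram reduces to concatenating the two encoder outputs. A minimal NumPy sketch, assuming one pooled embedding per clip and a zero vector when no text prompt is given (both assumptions; `build_conditioning` is an illustrative name, not part of the codebase):

```python
import numpy as np

JUKEBOX_DIM = 4800  # music embedding width, from the diagram
TEXT_DIM = 768      # CLIP/T5 embedding width, from the diagram


def build_conditioning(music_emb, text_emb=None):
    """Concatenate music and text embeddings into one conditioning vector."""
    music_emb = np.asarray(music_emb, dtype=np.float32)
    if music_emb.shape != (JUKEBOX_DIM,):
        raise ValueError(f"expected music embedding of shape ({JUKEBOX_DIM},)")

    if text_emb is None:
        # Assumption: a missing text prompt is represented by zeros
        text_emb = np.zeros(TEXT_DIM, dtype=np.float32)
    text_emb = np.asarray(text_emb, dtype=np.float32)
    if text_emb.shape != (TEXT_DIM,):
        raise ValueError(f"expected text embedding of shape ({TEXT_DIM},)")

    return np.concatenate([music_emb, text_emb])


cond = build_conditioning(np.ones(JUKEBOX_DIM), np.ones(TEXT_DIM))
print(cond.shape)  # (5568,)
```

The 5568-dim width is simply 4800 + 768, which is why `context_dim` must change whenever either encoder is swapped.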

---

2. CC-MotionGen

2.1 Model Architecture

Location: `core/cc-ml/cc_motiongen/`

python
# core/cc-ml/cc_motiongen/model/diffusion.py

import torch
import torch.nn as nn

class MotionDiffusion(nn.Module):
    """
    Denoising Diffusion Probabilistic Model for motion generation.
    """

    def __init__(self, config: DiffusionConfig):
        super().__init__()

        self.config = config
        self.num_timesteps = config.num_timesteps

        # U-Net denoiser. The input is the noisy motion concatenated with the
        # motion prior along the feature axis, hence 2 * motion_dim channels.
        self.unet = MotionUNet(
            in_channels=config.motion_dim * 2,
            out_channels=config.motion_dim,
            model_channels=config.model_channels,
            num_res_blocks=config.num_res_blocks,
            attention_resolutions=config.attention_resolutions,
            dropout=config.dropout,
            channel_mult=config.channel_mult,
            num_heads=config.num_heads,
            context_dim=config.context_dim
        )

        # Noise schedule
        betas = self._cosine_beta_schedule(
            config.num_timesteps,
            config.beta_start,
            config.beta_end
        )

        # Precompute diffusion parameters. Registering them as buffers keeps
        # them on the same device as the model and in the state dict.
        alphas = 1.0 - betas
        alphas_cumprod = torch.cumprod(alphas, dim=0)
        self.register_buffer("betas", betas)
        self.register_buffer("alphas", alphas)
        self.register_buffer("alphas_cumprod", alphas_cumprod)
        self.register_buffer("sqrt_alphas_cumprod", torch.sqrt(alphas_cumprod))
        self.register_buffer(
            "sqrt_one_minus_alphas_cumprod", torch.sqrt(1.0 - alphas_cumprod)
        )

    def forward(
        self,
        x: torch.Tensor,          # (B, T, J, 3) motion
        t: torch.Tensor,          # (B,) timesteps
        context: torch.Tensor,    # (B, D) conditioning
        prior: torch.Tensor = None # (B, T, J, 3) motion prior
    ) -> torch.Tensor:
        """
        Predict noise given noisy input and conditioning.
        """
        # Flatten motion: (B, T, J, 3) -> (B, T, J*3)
        B, T, J, _ = x.shape
        x_flat = x.reshape(B, T, J * 3)

        # Concatenate the prior along the feature axis. A zero prior stands in
        # when none is given, so the U-Net input width stays at 2 * J * 3.
        if prior is None:
            prior = torch.zeros_like(x)
        prior_flat = prior.reshape(B, T, J * 3)
        x_flat = torch.cat([x_flat, prior_flat], dim=-1)

        # Get timestep embeddings
        t_emb = self._timestep_embedding(t)

        # Cross-attention expects a token sequence: (B, D) -> (B, 1, D)
        if context.dim() == 2:
            context = context.unsqueeze(1)

        # U-Net forward
        noise_pred = self.unet(x_flat, t_emb, context)

        # Reshape back: (B, T, J*3) -> (B, T, J, 3)
        return noise_pred.reshape(B, T, J, 3)

    @torch.no_grad()
    def sample(
        self,
        context: torch.Tensor,
        prior: torch.Tensor = None,
        num_steps: int = 50,
        guidance_scale: float = 7.5
    ) -> torch.Tensor:
        """
        Generate motion using DDIM sampling.
        """
        device = context.device
        B = context.shape[0]

        # Initialize with noise
        x = torch.randn(
            B,
            self.config.seq_length,
            self.config.num_joints,
            3,
            device=device
        )

        # DDIM timesteps
        timesteps = torch.linspace(
            self.num_timesteps - 1,
            0,
            num_steps,
            dtype=torch.long,
            device=device
        )

        for i, t in enumerate(timesteps):
            t_batch = t.expand(B)

            # Classifier-free guidance
            if guidance_scale > 1.0:
                # Conditional prediction
                noise_cond = self(x, t_batch, context, prior)

                # Unconditional prediction
                noise_uncond = self(x, t_batch, torch.zeros_like(context), None)

                # Guided prediction
                noise = noise_uncond + guidance_scale * (noise_cond - noise_uncond)
            else:
                noise = self(x, t_batch, context, prior)

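            # DDIM update step. The final step passes t_prev = -1, which
            # _ddim_step is assumed to read as alpha_cumprod_prev = 1 (a fully
            # denoised sample); stepping from t = 0 to 0 would be a no-op.
            t_prev = timesteps[i + 1] if i < len(timesteps) - 1 else torch.full_like(t, -1)
            x = self._ddim_step(x, noise, t, t_prev)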

        return x
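
`_cosine_beta_schedule` and `_ddim_step` are called above but not shown. The sketch below is one plausible reading in NumPy, assuming the standard cosine schedule with offset `s = 0.008` and a deterministic DDIM update (eta = 0). How `beta_start` and `beta_end` enter the schedule is not stated in the source, so they are left out. The function names and the `t_prev = -1` convention for the last step are illustrative.

```python
import numpy as np


def cosine_beta_schedule(num_timesteps, s=0.008, max_beta=0.999):
    """Betas derived from a squared-cosine cumulative alpha curve."""
    steps = np.arange(num_timesteps + 1, dtype=np.float64) / num_timesteps
    alpha_bar = np.cos((steps + s) / (1 + s) * np.pi / 2) ** 2
    betas = 1.0 - alpha_bar[1:] / alpha_bar[:-1]
    return np.clip(betas, 0.0, max_beta)


def ddim_step(x_t, eps, t, t_prev, alphas_cumprod):
    """Deterministic DDIM update (eta = 0) from timestep t to t_prev."""
    a_t = alphas_cumprod[t]
    # t_prev < 0 marks the final step: the target is the clean sample
    a_prev = alphas_cumprod[t_prev] if t_prev >= 0 else 1.0
    # Estimate the clean sample implied by the predicted noise
    x0_pred = (x_t - np.sqrt(1.0 - a_t) * eps) / np.sqrt(a_t)
    # Re-noise that estimate to the level of t_prev
    return np.sqrt(a_prev) * x0_pred + np.sqrt(1.0 - a_prev) * eps


betas = cosine_beta_schedule(1000)
alphas_cumprod = np.cumprod(1.0 - betas)

# Sanity check: noise a known sample, then step back using the true noise
rng = np.random.default_rng(0)
x0 = rng.standard_normal((196, 22, 3))
eps = rng.standard_normal(x0.shape)
t = 500
x_t = np.sqrt(alphas_cumprod[t]) * x0 + np.sqrt(1.0 - alphas_cumprod[t]) * eps
x_rec = ddim_step(x_t, eps, t, -1, alphas_cumprod)
print(np.allclose(x_rec, x0))  # True
```

With a perfect noise prediction a single step recovers the clean motion. In practice the U-Net estimate is imperfect, which is why sampling runs 50 steps rather than one.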

2.2 U-Net Architecture

python
# core/cc-ml/cc_motiongen/model/unet.py

import torch
import torch.nn as nn

class MotionUNet(nn.Module):
    """
    U-Net architecture for motion denoising.
    """

    def __init__(
        self,
        in_channels: int,
        out_channels: int,
        model_channels: int = 256,
        num_res_blocks: int = 2,
        attention_resolutions: tuple = (4, 2, 1),
        dropout: float = 0.1,
        channel_mult: tuple = (1, 2, 4, 8),
        num_heads: int = 8,
        context_dim: int = 768
    ):
        super().__init__()

        self.in_channels = in_channels
        self.model_channels = model_channels

        # Time embedding
        self.time_embed = nn.Sequential(
            nn.Linear(model_channels, model_channels * 4),
            nn.SiLU(),
            nn.Linear(model_channels * 4, model_channels * 4)
        )

        # Input projection
        self.input_proj = nn.Linear(in_channels, model_channels)

        # Down blocks. Each entry is one stage whose output is saved as a skip;
        # skip_channels records the width of every saved skip, in order.
        # attention_resolutions is read as temporal downsampling factors (ds).
        self.down_blocks = nn.ModuleList()
        ch = model_channels
        ds = 1
        skip_channels = [ch]  # output of the input projection
        for level, mult in enumerate(channel_mult):
            out_ch = model_channels * mult

            for _ in range(num_res_blocks):
                stage = [ResBlock(ch, out_ch, dropout, time_channels=model_channels * 4)]
                ch = out_ch
                if ds in attention_resolutions:
                    stage.append(CrossAttentionBlock(ch, context_dim, num_heads))
                self.down_blocks.append(nn.ModuleList(stage))
                skip_channels.append(ch)

            if level < len(channel_mult) - 1:
                self.down_blocks.append(nn.ModuleList([Downsample(ch)]))
                skip_channels.append(ch)
                ds *= 2

        # Middle blocks
        self.middle_block = nn.ModuleList([
            ResBlock(ch, ch, dropout, time_channels=model_channels * 4),
            CrossAttentionBlock(ch, context_dim, num_heads),
            ResBlock(ch, ch, dropout, time_channels=model_channels * 4)
        ])

        # Up blocks: every stage consumes exactly one saved skip
        self.up_blocks = nn.ModuleList()
        for level, mult in reversed(list(enumerate(channel_mult))):
            out_ch = model_channels * mult

            for i in range(num_res_blocks + 1):
                skip_ch = skip_channels.pop()
                stage = [ResBlock(ch + skip_ch, out_ch, dropout, time_channels=model_channels * 4)]
                ch = out_ch
                if ds in attention_resolutions:
                    stage.append(CrossAttentionBlock(ch, context_dim, num_heads))
                if level > 0 and i == num_res_blocks:
                    stage.append(Upsample(ch))
                    ds //= 2
                self.up_blocks.append(nn.ModuleList(stage))

        # Output projection. LayerNorm rather than GroupNorm because features
        # are channel-last (B, T, C); GroupNorm expects channels on dim 1.
        self.output_proj = nn.Sequential(
            nn.LayerNorm(ch),
            nn.SiLU(),
            nn.Linear(ch, out_channels)
        )

    def _run(self, blocks, h, t_emb, context):
        """Apply a stage; only ResBlock/CrossAttentionBlock take conditioning."""
        for block in blocks:
            if isinstance(block, (ResBlock, CrossAttentionBlock)):
                h = block(h, t_emb, context)
            else:
                h = block(h)
        return h

    def forward(
        self,
        x: torch.Tensor,      # (B, T, D)
        t_emb: torch.Tensor,  # (B, D)
        context: torch.Tensor # (B, L, C)
    ) -> torch.Tensor:
        # Project input
        h = self.input_proj(x)

        # Time embedding
        t_emb = self.time_embed(t_emb)

        # Encoder: save one skip per stage
        skips = [h]
        for stage in self.down_blocks:
            h = self._run(stage, h, t_emb, context)
            skips.append(h)

        # Middle
        h = self._run(self.middle_block, h, t_emb, context)

        # Decoder: concatenate one skip per stage (sequence lengths must match)
        for stage in self.up_blocks:
            h = torch.cat([h, skips.pop()], dim=-1)
            h = self._run(stage, h, t_emb, context)

        # Output
        return self.output_proj(h)
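
Skip connections only work if every decoder block is built with the width of the feature map plus the width of the skip it receives. The pure-Python sketch below computes that bookkeeping for the default hyperparameters. It assumes the common layout in which the encoder saves one skip per residual block and per downsample, and the decoder consumes one skip per residual block; the source does not confirm this layout, and `unet_channel_plan` is an illustrative helper.

```python
def unet_channel_plan(model_channels=256, channel_mult=(1, 2, 4, 8), num_res_blocks=2):
    """Return (skip_widths, up_inputs) for an encoder/decoder with paired skips."""
    ch = model_channels
    skips = [ch]  # output of the input projection
    for level, mult in enumerate(channel_mult):
        for _ in range(num_res_blocks):
            ch = model_channels * mult
            skips.append(ch)
        if level < len(channel_mult) - 1:
            skips.append(ch)  # downsampling keeps the channel width

    skip_widths = list(skips)

    up_inputs = []
    for mult in reversed(channel_mult):
        for _ in range(num_res_blocks + 1):
            # Input width = current features + the most recent unused skip
            up_inputs.append(ch + skips.pop())
            ch = model_channels * mult

    assert not skips, "every skip must be consumed exactly once"
    return skip_widths, up_inputs


skip_widths, up_inputs = unet_channel_plan()
print(len(skip_widths), len(up_inputs))  # 12 12
print(up_inputs[0], up_inputs[-1])       # 4096 512
```

The encoder and decoder counts match only because the decoder uses `num_res_blocks + 1` blocks per level; with any other count some skips would be left over or missing.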

2.3 Motion Decoder

python
# core/cc-ml/cc_motiongen/model/decoder.py

import torch
import torch.nn as nn

class MotionDecoder(nn.Module):
    """
    Decodes latent motion representation to joint positions.
    """

    def __init__(
        self,
        latent_dim: int = 512,
        hidden_dim: int = 1024,
        num_joints: int = 22,
        num_layers: int = 4
    ):
        super().__init__()

        self.num_joints = num_joints

        # MLP decoder
        layers = []
        in_dim = latent_dim

        for i in range(num_layers - 1):
            out_dim = hidden_dim
            layers.extend([
                nn.Linear(in_dim, out_dim),
                nn.LayerNorm(out_dim),
                nn.GELU(),
                nn.Dropout(0.1)
            ])
            in_dim = out_dim

        layers.append(nn.Linear(in_dim, num_joints * 3))

        self.decoder = nn.Sequential(*layers)

        # FK layer for converting local to global positions
        self.forward_kinematics = ForwardKinematics(SKELETON_PARENTS)

    def forward(self, z: torch.Tensor) -> torch.Tensor:
        """
        Decode latent to joint positions.

        Args:
            z: (B, T, latent_dim) latent motion

        Returns:
            positions: (B, T, J, 3) joint positions
        """
        B, T, _ = z.shape

        # Decode to local positions
        local = self.decoder(z)  # (B, T, J*3)
        local = local.reshape(B, T, self.num_joints, 3)

        # Apply forward kinematics
        global_pos = self.forward_kinematics(local)

        return global_pos


class ForwardKinematics(nn.Module):
    """
    Convert local joint positions (offsets from the parent) to global positions.
    """

    def __init__(self, parents: dict):
        super().__init__()
        # parents maps joint name -> parent name (None for the root).
        # Assumption: the dict's key order matches the joint axis of the tensors.
        self.parents = parents
        self.joint_index = {name: i for i, name in enumerate(parents)}
        self.joint_order = self._topological_sort()  # parents before children

    def forward(self, local: torch.Tensor) -> torch.Tensor:
        """
        Apply forward kinematics.

        Args:
            local: (B, T, J, 3) local positions

        Returns:
            global_pos: (B, T, J, 3) global positions
        """
        global_pos = [None] * len(self.joint_index)

        for joint_name in self.joint_order:
            idx = self.joint_index[joint_name]
            parent_name = self.parents.get(joint_name)

            if parent_name is None:
                # Root joint: the local position is already global
                global_pos[idx] = local[:, :, idx]
            else:
                # Child joint: parent's global position plus the local offset
                parent_idx = self.joint_index[parent_name]
                global_pos[idx] = global_pos[parent_idx] + local[:, :, idx]

        return torch.stack(global_pos, dim=2)
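
`_topological_sort` is referenced above but not shown. A minimal NumPy sketch of the idea on a hypothetical three-joint chain (the real `SKELETON_PARENTS` table is not given in the source, and the function names here are illustrative):

```python
import numpy as np

# Hypothetical skeleton: joint name -> parent name (None marks the root)
TOY_PARENTS = {"pelvis": None, "spine": "pelvis", "head": "spine"}


def topological_sort(parents):
    """Order joints so that every parent precedes its children."""
    order, visited = [], set()

    def visit(name):
        if name in visited:
            return
        parent = parents[name]
        if parent is not None:
            visit(parent)  # make sure the parent is placed first
        visited.add(name)
        order.append(name)

    for name in parents:
        visit(name)
    return order


def forward_kinematics(local, parents):
    """local: (T, J, 3) offsets, joint axis ordered like the keys of parents."""
    index = {name: i for i, name in enumerate(parents)}
    out = np.zeros_like(local)
    for name in topological_sort(parents):
        i, parent = index[name], parents[name]
        if parent is None:
            out[:, i] = local[:, i]
        else:
            out[:, i] = out[:, index[parent]] + local[:, i]
    return out


local = np.zeros((1, 3, 3))
local[0, :, 1] = [1.0, 0.5, 0.25]  # vertical offsets: pelvis, spine, head
heights = forward_kinematics(local, TOY_PARENTS)[0, :, 1].tolist()
print(heights)  # [1.0, 1.5, 1.75]
```

Each joint's global position is the running sum of offsets along its chain to the root, which is why parents must be processed before children.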

2.4 Configuration

python
# core/cc-ml/cc_motiongen/config.py

from dataclasses import dataclass

@dataclass
class DiffusionConfig:
    """Configuration for CC-MotionGen model."""

    # Motion dimensions
    seq_length: int = 196           # ~6.5 seconds at 30fps
    num_joints: int = 22            # SMPL skeleton
    motion_dim: int = 66            # 22 joints × 3 coordinates

    # Diffusion parameters
    num_timesteps: int = 1000       # Training timesteps
    beta_start: float = 0.0001
    beta_end: float = 0.02

    # U-Net architecture
    model_channels: int = 256
    num_res_blocks: int = 2
    attention_resolutions: tuple = (4, 2, 1)
    dropout: float = 0.1
    channel_mult: tuple = (1, 2, 4, 8)
    num_heads: int = 8

    # Conditioning
    context_dim: int = 5568         # Jukebox (4800) + CLIP (768)

    # Training
    batch_size: int = 64
    learning_rate: float = 1e-4
    weight_decay: float = 0.01
    num_epochs: int = 500
    warmup_steps: int = 1000

    # Inference
    inference_steps: int = 50       # DDIM steps
    guidance_scale: float = 7.5

@dataclass
class TrainingConfig:
    """Training configuration."""

    # Data
    train_data_path: str = "gs://comp-core-data/motion/train"
    val_data_path: str = "gs://comp-core-data/motion/val"

    # Checkpoints
    checkpoint_dir: str = "gs://comp-core-models/cc_motiongen"
    save_every: int = 1000
    eval_every: int = 500

    # Hardware
    num_gpus: int = 4
    fp16: bool = True
    gradient_accumulation: int = 4

    # Logging
    wandb_project: str = "cc-motiongen"
    log_every: int = 100
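
Several of the defaults above are tied together arithmetically. The sketch below checks those relationships and derives two quantities the configs imply. It assumes `batch_size` is per GPU, which is how a `DataLoader` behaves under Accelerate but is not stated in the source; `summarize` and `ShapeSummary` are illustrative names.

```python
from dataclasses import dataclass


@dataclass
class ShapeSummary:
    duration_s: float      # clip length in seconds
    effective_batch: int   # samples per optimizer step


def summarize(seq_length=196, num_joints=22, motion_dim=66, context_dim=5568,
              batch_size=64, num_gpus=4, gradient_accumulation=4, fps=30.0):
    """Validate linked config values and derive clip duration and batch size."""
    if motion_dim != num_joints * 3:
        raise ValueError("motion_dim must equal num_joints * 3")
    if context_dim != 4800 + 768:
        raise ValueError("context_dim must equal Jukebox (4800) + CLIP (768)")
    return ShapeSummary(
        duration_s=seq_length / fps,
        # Assumption: batch_size is per GPU
        effective_batch=batch_size * num_gpus * gradient_accumulation,
    )


summary = summarize()
print(round(summary.duration_s, 2), summary.effective_batch)  # 6.53 1024
```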

---

3. RAG++ Policy

3.1 Overview

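RAG++ (Retrieval-Augmented Generation++) supplies the diffusion model with a motion prior. It retrieves candidate motion phrases from a curated database with FAISS dense search, optionally reranks them with a cross-encoder, and blends the top results into a single prior of shape (T, J, 3).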

Location: `core/cc-core/cc_core/policy/rag_motionphrase/`

3.2 Architecture

python
# core/cc-core/cc_core/policy/rag_motionphrase/service.py

from typing import List, Optional
from dataclasses import dataclass

import numpy as np

@dataclass
class RetrievalResult:
    phrase_id: str
    motion_data: np.ndarray      # (T, J, 3)
    similarity_score: float
    rerank_score: float
    metadata: dict

class RAGPlusPlusService:
    """
    Retrieval-Augmented Generation service for motion priors.
    """

    def __init__(self, config: RAGConfig):
        self.config = config

        # Components
        self.retriever = DenseRetriever(config.index_path)
        self.reranker = CrossEncoderReranker(config.reranker_model)
        self.prior_builder = MotionPriorBuilder(config.blend_config)
        self.cache = LRUCache(maxsize=config.cache_size)

    async def get_prior(
        self,
        query_embedding: np.ndarray,
        text_query: Optional[str] = None,
        top_k: int = 10,
        rerank_top_n: int = 5
    ) -> np.ndarray:
        """
        Retrieve and blend motion phrases into a prior.

        Args:
            query_embedding: Conditioning vector from music/text encoders
            text_query: Optional text description for reranking
            top_k: Number of candidates to retrieve
            rerank_top_n: Number of top candidates after reranking

        Returns:
            Motion prior tensor (T, J, 3)
        """
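        # Check cache. The key covers every argument that affects the result,
        # so the same embedding with a different text query is not served stale.
        cache_key = self._compute_cache_key(
            query_embedding, text_query, top_k, rerank_top_n
        )
        if cache_key in self.cache:
            return self.cache[cache_key]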

        # Stage 1: Dense retrieval
        candidates = await self.retriever.search(
            query_embedding,
            k=top_k
        )

        # Stage 2: Cross-encoder reranking
        if text_query and len(candidates) > rerank_top_n:
            candidates = await self.reranker.rerank(
                query=text_query,
                candidates=candidates,
                top_n=rerank_top_n
            )

        # Stage 3: Build motion prior
        prior = self.prior_builder.build(candidates)

        # Cache result
        self.cache[cache_key] = prior

        return prior
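
`_compute_cache_key` is not shown. One way to build a stable key is to hash a rounded copy of the embedding together with the other arguments that change the result. This is a sketch under assumptions: the rounding precision, the use of SHA-256, and the function name are all illustrative choices.

```python
import hashlib

import numpy as np


def compute_cache_key(query_embedding, text_query=None, top_k=10,
                      rerank_top_n=5, decimals=4):
    """Hash the request so near-identical embeddings share a cache entry."""
    rounded = np.round(np.asarray(query_embedding, dtype=np.float32), decimals)
    rounded = rounded + 0.0  # turn -0.0 into 0.0 so both hash identically

    digest = hashlib.sha256()
    digest.update(rounded.tobytes())
    digest.update(repr((text_query, top_k, rerank_top_n)).encode("utf-8"))
    return digest.hexdigest()


emb = np.array([0.5, 0.25, -0.0])
key_a = compute_cache_key(emb, "slow waltz")
key_b = compute_cache_key(emb + 1e-7, "slow waltz")  # float jitter
key_c = compute_cache_key(emb, "fast waltz")         # different text query
print(key_a == key_b, key_a == key_c)  # True False
```

Rounding trades a small risk of collisions between very close queries for a much higher hit rate when embeddings differ only by floating-point noise.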

3.3 Dense Retriever

python
# core/cc-core/cc_core/policy/rag_motionphrase/retriever.py

from typing import List

import faiss
import numpy as np

class DenseRetriever:
    """
    FAISS-based dense retrieval for motion phrases.
    """

    def __init__(self, index_path: str):
        self.index = faiss.read_index(index_path)
        self.metadata = self._load_metadata(index_path)

    async def search(
        self,
        query: np.ndarray,
        k: int = 10
    ) -> List[RetrievalResult]:
        """
        Search for similar motion phrases.
        """
        # Normalize query (guard against a zero vector)
        norm = np.linalg.norm(query)
        if norm > 0:
            query = query / norm
        query = query.reshape(1, -1).astype('float32')

        # FAISS search. The index uses inner product on L2-normalized vectors,
        # so the returned scores are cosine similarities (higher is better).
        scores, indices = self.index.search(query, k)

        # Build results
        results = []
        for score, idx in zip(scores[0], indices[0]):
            if idx == -1:  # FAISS pads with -1 when fewer than k hits exist
                continue

            meta = self.metadata[idx]
            results.append(RetrievalResult(
                phrase_id=meta['id'],
                motion_data=self._load_motion(meta['path']),
                similarity_score=float(score),
                rerank_score=0.0,
                metadata=meta
            ))

        return results

    def _load_motion(self, path: str) -> np.ndarray:
        """Load motion data from storage."""
        return np.load(path)
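
To make the scoring convention concrete, the sketch below reproduces an exact inner-product search in plain NumPy. It is a stand-in for illustration, not the FAISS API: on L2-normalized vectors the inner product equals cosine similarity, so larger scores mean closer matches. The function names are illustrative.

```python
import numpy as np


def l2_normalize(x, eps=1e-12):
    """Scale vectors to unit length along the last axis."""
    x = np.asarray(x, dtype=np.float32)
    norms = np.linalg.norm(x, axis=-1, keepdims=True)
    return x / np.maximum(norms, eps)


def inner_product_search(index_vectors, query, k):
    """Return the top-k cosine similarities and their row indices."""
    scores = l2_normalize(index_vectors) @ l2_normalize(query)
    top = np.argsort(-scores)[:k]  # descending: best match first
    return scores[top], top


vectors = np.array([[1, 0], [0, 1], [1, 1]])
scores, ids = inner_product_search(vectors, np.array([2.0, 0.0]), k=2)
print(ids.tolist())  # [0, 2]
```

Row 0 points in the same direction as the query and scores 1.0; row 2 sits at 45 degrees and scores about 0.707.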

3.4 Cross-Encoder Reranker

python
# core/cc-core/cc_core/policy/rag_motionphrase/reranker.py

from typing import List

import torch
from transformers import AutoModelForSequenceClassification, AutoTokenizer

class CrossEncoderReranker:
    """
    Cross-encoder model for reranking retrieved motion phrases.
    """

    def __init__(self, model_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"):
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForSequenceClassification.from_pretrained(model_name)
        self.model.eval()

    async def rerank(
        self,
        query: str,
        candidates: List[RetrievalResult],
        top_n: int = 5
    ) -> List[RetrievalResult]:
        """
        Rerank candidates using cross-encoder.
        """
        if not candidates:
            return []

        # Prepare inputs
        pairs = [
            (query, self._motion_to_text(c.metadata))
            for c in candidates
        ]

        # Tokenize
        inputs = self.tokenizer(
            pairs,
            padding=True,
            truncation=True,
            return_tensors="pt"
        )

        # Score
        with torch.no_grad():
            scores = self.model(**inputs).logits.squeeze(-1)

        # Update rerank scores and sort
        for candidate, score in zip(candidates, scores):
            candidate.rerank_score = score.item()

        candidates.sort(key=lambda x: x.rerank_score, reverse=True)

        return candidates[:top_n]

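    def _motion_to_text(self, metadata: dict) -> str:
        """Convert motion metadata to text description."""
        parts = [
            metadata.get('genre', 'dance'),
            metadata.get('style', ''),
            f"{metadata.get('energy', 'medium')} energy",
        ]
        # Skip empty fields so a missing style leaves no double space
        return " ".join(str(p) for p in parts if p)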

3.5 Prior Builder

python
# core/cc-core/cc_core/policy/rag_motionphrase/prior_builder.py

from typing import List

import numpy as np

class MotionPriorBuilder:
    """
    Builds motion priors from retrieved phrases.
    """

    def __init__(self, config: BlendConfig):
        self.config = config

    def build(self, candidates: List[RetrievalResult]) -> np.ndarray:
        """
        Build weighted blend of motion phrases.
        """
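        if not candidates:
            # Default zero prior, sized from the config rather than hard-coded
            return np.zeros((self.config.sequence_length, 22, 3))

        # Compute weights from scores. Use rerank scores only if reranking ran:
        # cross-encoder logits can be negative, so their sign cannot mark
        # "not reranked", and the two score scales must never be mixed.
        reranked = any(c.rerank_score != 0.0 for c in candidates)
        scores = np.array([
            c.rerank_score if reranked else c.similarity_score
            for c in candidates
        ])
        weights = self._softmax(scores / self.config.temperature)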

        # Align temporal lengths
        target_length = self.config.sequence_length
        aligned_motions = [
            self._temporal_align(c.motion_data, target_length)
            for c in candidates
        ]

        # Weighted blend
        prior = np.zeros((target_length, 22, 3))
        for motion, weight in zip(aligned_motions, weights):
            prior += weight * motion

        # Apply temporal smoothing
        prior = self._gaussian_smooth(prior, sigma=self.config.smooth_sigma)

        return prior

    def _temporal_align(self, motion: np.ndarray, target_length: int) -> np.ndarray:
        """Resample motion to target length."""
        current_length = motion.shape[0]

        if current_length == target_length:
            return motion

        # Linear interpolation
        indices = np.linspace(0, current_length - 1, target_length)
        aligned = np.zeros((target_length, motion.shape[1], motion.shape[2]))

        for i, idx in enumerate(indices):
            low = int(idx)
            high = min(low + 1, current_length - 1)
            t = idx - low
            aligned[i] = (1 - t) * motion[low] + t * motion[high]

        return aligned

    def _softmax(self, x: np.ndarray) -> np.ndarray:
        """Compute softmax."""
        exp_x = np.exp(x - np.max(x))
        return exp_x / exp_x.sum()

    def _gaussian_smooth(self, motion: np.ndarray, sigma: float) -> np.ndarray:
        """Apply Gaussian smoothing along time axis."""
        from scipy.ndimage import gaussian_filter1d
        return gaussian_filter1d(motion, sigma, axis=0)
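
A small worked example shows how temperature-scaled softmax weights turn scores into a blend. The sketch uses NumPy only and a vectorized form of the same linear resampling; the two constant "motions" are made up so the result is easy to check by hand.

```python
import numpy as np


def softmax(x):
    e = np.exp(x - np.max(x))  # subtract the max for numerical stability
    return e / e.sum()


def resample(motion, target_length):
    """Linearly interpolate a (T, J, 3) motion to target_length frames."""
    src = np.linspace(0, motion.shape[0] - 1, target_length)
    low = np.floor(src).astype(int)
    high = np.minimum(low + 1, motion.shape[0] - 1)
    frac = (src - low)[:, None, None]
    return (1 - frac) * motion[low] + frac * motion[high]


def blend(motions, scores, temperature, target_length):
    """Weighted average of resampled motions; returns (prior, weights)."""
    weights = softmax(np.asarray(scores, dtype=np.float64) / temperature)
    aligned = np.stack([resample(m, target_length) for m in motions])
    return np.tensordot(weights, aligned, axes=1), weights


motions = [np.zeros((4, 2, 3)), np.ones((8, 2, 3))]
scores = [0.0, np.log(3.0)]  # second phrase is three times as likely

prior, weights = blend(motions, scores, temperature=1.0, target_length=6)
print(np.round(weights, 2).tolist())  # [0.25, 0.75]

_, sharp = blend(motions, scores, temperature=0.5, target_length=6)
print(np.round(sharp, 2).tolist())    # [0.1, 0.9]
```

Lowering the temperature concentrates weight on the best-scoring phrase, while raising it moves the blend toward a plain average.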

---

4. MotionPhrase System

4.1 Overview

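MotionPhrase is the data curation and indexing system for RAG++. It cuts full motion sequences into phrases aligned to musical phrase boundaries and builds the FAISS indexes that the retriever searches.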

Location: `core/cc-ml/motionphrase/`

4.2 Phrase Extraction

python
# core/cc-ml/motionphrase/extract_phrases.py

from typing import List

import numpy as np

class PhraseExtractor:
    """
    Extract motion phrases from full sequences.
    """

    def __init__(self, config: ExtractionConfig):
        self.config = config
        self.beat_detector = BeatDetector()
        self.phrase_detector = PhraseDetector()

    def extract(self, motion: np.ndarray, audio: np.ndarray) -> List[MotionPhrase]:
        """
        Extract phrases aligned to music structure.
        """
        # Detect beats
        beats = self.beat_detector.detect(audio, sr=self.config.sample_rate)

        # Detect phrase boundaries (usually 4 or 8 bars)
        boundaries = self.phrase_detector.detect(audio, beats)

        # Extract motion segments
        phrases = []
        for start_beat, end_beat in boundaries:
            start_frame = self._beat_to_frame(start_beat)
            end_frame = self._beat_to_frame(end_beat)

            if end_frame - start_frame < self.config.min_phrase_length:
                continue

            phrase_motion = motion[start_frame:end_frame]

            # Compute features
            features = self._compute_features(phrase_motion, audio, start_beat, end_beat)

            phrases.append(MotionPhrase(
                motion=phrase_motion,
                start_beat=start_beat,
                end_beat=end_beat,
                features=features
            ))

        return phrases

    def _compute_features(
        self,
        motion: np.ndarray,
        audio: np.ndarray,
        start_beat: int,
        end_beat: int
    ) -> PhraseFeatures:
        """Compute phrase-level features for retrieval."""
        return PhraseFeatures(
            energy=self._compute_energy(motion),
            tempo=self._estimate_tempo(start_beat, end_beat),
            style=self._classify_style(motion),
            genre=self._classify_genre(audio)
        )
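
`_beat_to_frame` is not shown. A plausible version maps a beat's timestamp to the nearest motion frame. The sketch below assumes the beat detector returns beat times in seconds, that phrase boundaries are pairs of indices into that list, and that motion is sampled at 30 fps; none of this is confirmed by the source, and the function names are illustrative.

```python
def beat_to_frame(beat_time_s, fps=30.0):
    """Nearest motion frame for a beat at the given time in seconds."""
    return int(round(beat_time_s * fps))


def phrase_frame_ranges(beat_times, boundaries, fps=30.0, min_phrase_length=0):
    """Convert (start_beat, end_beat) index pairs to (start, end) frame ranges."""
    ranges = []
    for start_beat, end_beat in boundaries:
        start = beat_to_frame(beat_times[start_beat], fps)
        end = beat_to_frame(beat_times[end_beat], fps)
        if end - start >= min_phrase_length:  # drop phrases that are too short
            ranges.append((start, end))
    return ranges


# 120 BPM: one beat every 0.5 s; two 4-bar phrases of 16 beats each
beat_times = [i * 0.5 for i in range(33)]
ranges = phrase_frame_ranges(beat_times, [(0, 16), (16, 32)], min_phrase_length=60)
print(ranges)  # [(0, 240), (240, 480)]
```

At 120 BPM a four-bar phrase lasts 8 seconds, or 240 frames, which is longer than the 196-frame model window. This is why retrieved phrases are resampled before blending.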

4.3 Index Building

python
# core/cc-ml/motionphrase/build_indexes.py

from typing import List

import faiss
import numpy as np
from sentence_transformers import SentenceTransformer

class IndexBuilder:
    """
    Build FAISS indexes for motion phrase retrieval.
    """

    def __init__(self, config: IndexConfig):
        self.config = config
        self.encoder = SentenceTransformer(config.encoder_model)

    def build(self, phrases: List[MotionPhrase]) -> faiss.Index:
        """
        Build FAISS index from phrases.
        """
        # Compute embeddings
        embeddings = []
        for phrase in phrases:
            # Combine motion and metadata features
            emb = self._embed_phrase(phrase)
            embeddings.append(emb)

        embeddings = np.array(embeddings).astype('float32')

        # Normalize
        faiss.normalize_L2(embeddings)

        # Build index
        dimension = embeddings.shape[1]

        if self.config.index_type == 'flat':
            index = faiss.IndexFlatIP(dimension)
        elif self.config.index_type == 'ivf':
            quantizer = faiss.IndexFlatIP(dimension)
            index = faiss.IndexIVFFlat(
                quantizer,
                dimension,
                self.config.nlist,
                faiss.METRIC_INNER_PRODUCT
            )
            index.train(embeddings)
        else:
            raise ValueError(f"Unknown index type: {self.config.index_type}")

        index.add(embeddings)

        return index

    def _embed_phrase(self, phrase: MotionPhrase) -> np.ndarray:
        """Create embedding for a phrase."""
        # Text description from features
        text = f"{phrase.features.genre} {phrase.features.style} dance with {phrase.features.energy} energy at {phrase.features.tempo} bpm"

        # Encode text
        text_emb = self.encoder.encode(text)

        # Motion statistics embedding
        motion_stats = self._compute_motion_stats(phrase.motion)

        # Concatenate
        return np.concatenate([text_emb, motion_stats])
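
Because the embeddings are L2-normalized before they are added, an inner-product index ranks phrases by cosine similarity. The sketch below is a brute-force, standard-library stand-in for what a flat inner-product search returns. It is not the FAISS API, and `normalize` and `search` are hypothetical helper names.

```python
import math


def normalize(vec):
    """Scale a vector to unit L2 norm (the per-row effect of faiss.normalize_L2)."""
    norm = math.sqrt(sum(x * x for x in vec))
    return [x / norm for x in vec] if norm > 0 else list(vec)


def search(index_vectors, query, k=2):
    """Return (score, row) pairs for the k largest inner products."""
    q = normalize(query)
    scores = [
        (sum(a * b for a, b in zip(row, q)), i)
        for i, row in enumerate(index_vectors)
    ]
    return sorted(scores, reverse=True)[:k]


# Three toy phrase embeddings; the query points almost along the first one.
phrases = [normalize(v) for v in ([1.0, 0.0], [1.0, 1.0], [0.0, 2.0])]
top = search(phrases, [3.0, 0.1], k=2)
print([row for _, row in top])  # [0, 1]
```

The query must be normalized the same way as the indexed vectors. Since `_embed_phrase` concatenates the text embedding and the motion statistics before normalization, whichever part has the larger magnitude will dominate the similarity, so it may be worth scaling the two parts to comparable norms first.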

---

5. Training Pipeline

5.1 Training Script

python
# core/cc-ml/cc_motiongen/scripts/train.py

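import torch
import torch.nn.functional as F
from torch.utils.data import DataLoader
from accelerate import Accelerator
from transformers import get_cosine_schedule_with_warmup  # assumed source of this helper
import wandb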

def train(config: TrainingConfig):
    """Main training loop."""

    # Initialize accelerator
    accelerator = Accelerator(
        mixed_precision='fp16' if config.fp16 else 'no',
        gradient_accumulation_steps=config.gradient_accumulation
    )

    # Model
    model = MotionDiffusion(config.model)

    # Data
    train_dataset = MotionDataset(config.train_data_path)
    train_loader = DataLoader(
        train_dataset,
        batch_size=config.batch_size,
        shuffle=True,
        num_workers=8
    )

    # Optimizer
    optimizer = torch.optim.AdamW(
        model.parameters(),
        lr=config.learning_rate,
        weight_decay=config.weight_decay
    )

    # Learning rate scheduler
    scheduler = get_cosine_schedule_with_warmup(
        optimizer,
        num_warmup_steps=config.warmup_steps,
        num_training_steps=len(train_loader) * config.num_epochs
    )

    # Prepare with accelerator
    model, optimizer, train_loader, scheduler = accelerator.prepare(
        model, optimizer, train_loader, scheduler
    )

    # Training loop
    global_step = 0
    for epoch in range(config.num_epochs):
        model.train()

        for batch in train_loader:
            with accelerator.accumulate(model):
                motion = batch['motion']
                context = batch['context']

                # Sample timesteps
                t = torch.randint(
                    0,
                    config.model.num_timesteps,
                    (motion.shape[0],),
                    device=motion.device
                )

                # Add noise
                noise = torch.randn_like(motion)
                noisy_motion = model.q_sample(motion, t, noise)

                # Predict noise
                noise_pred = model(noisy_motion, t, context)

                # Loss
                loss = F.mse_loss(noise_pred, noise)

                accelerator.backward(loss)
                optimizer.step()
                scheduler.step()
                optimizer.zero_grad()

            global_step += 1

            # Logging (main process only; assumes wandb.init() was called at startup)
            if accelerator.is_main_process and global_step % config.log_every == 0:
                wandb.log({
                    'loss': loss.item(),
                    'lr': scheduler.get_last_lr()[0],
                    'epoch': epoch
                }, step=global_step)

            # Checkpointing
            if global_step % config.save_every == 0:
                accelerator.save_state(f"{config.checkpoint_dir}/step_{global_step}")

            # Evaluation
            if global_step % config.eval_every == 0:
                evaluate(model, config)
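
The loop relies on `model.q_sample`, which is defined outside this section. In the standard DDPM formulation the forward process is `x_t = sqrt(a_bar_t) * x_0 + sqrt(1 - a_bar_t) * noise`, where `a_bar_t` is the cumulative product of `1 - beta_t`. The sketch below illustrates that formula on plain Python lists. The linear beta schedule and its endpoints are assumptions, since the source does not state which schedule `MotionDiffusion` uses.

```python
import math


def linear_alpha_bars(num_timesteps, beta_start=1e-4, beta_end=0.02):
    """Cumulative products of (1 - beta_t) for a linear beta schedule (assumed)."""
    alpha_bars, running = [], 1.0
    for t in range(num_timesteps):
        beta = beta_start + (beta_end - beta_start) * t / max(num_timesteps - 1, 1)
        running *= 1.0 - beta
        alpha_bars.append(running)
    return alpha_bars


def q_sample(x0, t, noise, alpha_bars):
    """Noise a clean sample: sqrt(a_bar_t) * x0 + sqrt(1 - a_bar_t) * noise."""
    a_bar = alpha_bars[t]
    signal_scale, noise_scale = math.sqrt(a_bar), math.sqrt(1.0 - a_bar)
    return [signal_scale * x + noise_scale * n for x, n in zip(x0, noise)]


alpha_bars = linear_alpha_bars(1000)
x0, noise = [1.0, -0.5], [0.3, 0.8]
early = q_sample(x0, 0, noise, alpha_bars)    # stays close to the clean signal
late = q_sample(x0, 999, noise, alpha_bars)   # close to the noise itself
```

Training on the noise-prediction MSE, as the loop does, teaches the network to recover `noise` from the noisy sample at a uniformly sampled timestep.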

5.2 GCP Deployment

yaml
# core/cc-ml/cc_motiongen/cloudbuild-training.yaml

steps:
  # Build training image
  - name: 'gcr.io/cloud-builders/docker'
    args: ['build', '-t', 'gcr.io/$PROJECT_ID/cc-motiongen-train', '.']

  # Push image
  - name: 'gcr.io/cloud-builders/docker'
    args: ['push', 'gcr.io/$PROJECT_ID/cc-motiongen-train']

  # Submit Vertex AI training job
  - name: 'gcr.io/cloud-builders/gcloud'
    args:
      - 'ai'
      - 'custom-jobs'
      - 'create'
      - '--region=us-central1'
      - '--display-name=cc-motiongen-training'
      - '--worker-pool-spec=machine-type=n1-standard-8,accelerator-type=NVIDIA_TESLA_V100,accelerator-count=4,container-image-uri=gcr.io/$PROJECT_ID/cc-motiongen-train'

---

6. Inference API

6.1 REST API

python
# Inference endpoint

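from typing import Optional

import torch
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()

class GenerationRequest(BaseModel):
    # Optional[...] lets clients omit a field or send an explicit null
    audio_url: Optional[str] = None
    text_prompt: Optional[str] = None
    duration: float = 6.5
    guidance_scale: float = 7.5
    use_rag: bool = True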

class GenerationResponse(BaseModel):
    motion: list  # (T, J, 3) as nested list
    fps: int = 30
    num_frames: int
    num_joints: int = 22

@app.post("/generate", response_model=GenerationResponse)
async def generate_motion(request: GenerationRequest):
    """Generate motion from music/text conditioning."""

    # Encode conditioning
    if request.audio_url:
        audio = await load_audio(request.audio_url)
        context = music_encoder.encode(audio)
    elif request.text_prompt:
        context = text_encoder.encode(request.text_prompt)
    else:
        raise HTTPException(400, "Must provide audio_url or text_prompt")

    # Get RAG++ prior
    prior = None
    if request.use_rag:
        prior = await rag_service.get_prior(context, request.text_prompt)

    # Generate motion
    motion = model.sample(
        context=torch.tensor(context).unsqueeze(0),
        prior=torch.tensor(prior).unsqueeze(0) if prior is not None else None,
        guidance_scale=request.guidance_scale
    )

    # Post-process
    # Detach and move to CPU first; .numpy() fails on GPU or grad-tracking tensors
    motion = postprocess(motion.squeeze(0).detach().cpu().numpy())

    return GenerationResponse(
        motion=motion.tolist(),
        num_frames=motion.shape[0]
    )
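
The request model accepts `duration`, but the handler as shown never forwards it to `model.sample`. A minimal sketch of how a duration in seconds could be turned into a frame count at the response's 30 fps. The helper name and the `max_duration` bound are hypothetical, and how the frame count would be passed to the sampler is not specified in the source.

```python
def frames_for_duration(duration: float, fps: int = 30, max_duration: float = 30.0) -> int:
    """Convert a requested clip length in seconds to a whole number of frames."""
    if not 0.0 < duration <= max_duration:
        raise ValueError(f"duration must be in (0, {max_duration}] seconds")
    return int(round(duration * fps))


default_frames = frames_for_duration(6.5)  # the request default at 30 fps
print(default_frames)  # 195
```

Rejecting out-of-range durations before sampling keeps an oversized request from tying up the GPU.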

6.2 Python Client

python
# Client usage

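from typing import Optional

import numpy as np
import requests

def generate_motion(
    audio_path: Optional[str] = None,
    text: Optional[str] = None,
    api_url: str = "http://localhost:8000",
    timeout: float = 120.0  # seconds; illustrative default, tune to generation time
) -> np.ndarray:
    """Generate motion using the inference API."""

    response = requests.post(
        f"{api_url}/generate",
        json={
            "audio_url": audio_path,
            "text_prompt": text,
            "use_rag": True
        },
        timeout=timeout
    )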

    response.raise_for_status()
    data = response.json()

    return np.array(data['motion'])

# Example usage
motion = generate_motion(text="energetic hip-hop dance with arm waves")
print(f"Generated {motion.shape[0]} frames of motion")
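
The response carries the motion as a nested list alongside `num_frames` and `num_joints`, so a client can check that the three agree before converting to an array. A small sketch of such a check; `validate_motion_payload` is a hypothetical helper, and the payload below is hand-built rather than real API output.

```python
def validate_motion_payload(data: dict) -> tuple:
    """Check that a /generate response is a consistent (T, J, 3) nested list."""
    motion = data["motion"]
    num_frames, num_joints = data["num_frames"], data["num_joints"]
    if len(motion) != num_frames:
        raise ValueError(f"expected {num_frames} frames, got {len(motion)}")
    for t, frame in enumerate(motion):
        if len(frame) != num_joints:
            raise ValueError(f"frame {t}: expected {num_joints} joints, got {len(frame)}")
        if any(len(joint) != 3 for joint in frame):
            raise ValueError(f"frame {t}: every joint needs 3 coordinates")
    return (num_frames, num_joints, 3)


# A tiny hand-built payload standing in for response.json().
payload = {
    "motion": [[[0.0, 0.0, 0.0] for _ in range(22)] for _ in range(2)],
    "fps": 30,
    "num_frames": 2,
    "num_joints": 22,
}
shape = validate_motion_payload(payload)
print(shape)  # (2, 22, 3)
```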

---

7. Evaluation Metrics

7.1 Quality Metrics

python
# core/cc-ml/cc_motiongen/evaluation/benchmarks.py

from typing import List

import numpy as np

class MotionQualityMetrics:
    """Evaluation metrics for generated motion."""

    def __init__(self):
        self.fid_model = load_fid_model()

    def frechet_inception_distance(
        self,
        generated: np.ndarray,
        real: np.ndarray
    ) -> float:
        """Compute FID between generated and real motion distributions."""
        gen_features = self.fid_model.extract_features(generated)
        real_features = self.fid_model.extract_features(real)

        mu_gen, sigma_gen = gen_features.mean(0), np.cov(gen_features, rowvar=False)
        mu_real, sigma_real = real_features.mean(0), np.cov(real_features, rowvar=False)

        return self._compute_fid(mu_gen, sigma_gen, mu_real, sigma_real)

    def diversity(self, motions: np.ndarray) -> float:
        """Compute diversity of generated motions."""
        n = len(motions)
        distances = []

        for i in range(n):
            for j in range(i + 1, n):
                dist = np.mean(np.abs(motions[i] - motions[j]))
                distances.append(dist)

        # Fewer than two motions gives no pairs; return 0.0 instead of NaN
        return float(np.mean(distances)) if distances else 0.0

    def multimodality(
        self,
        motions_per_condition: List[np.ndarray]
    ) -> float:
        """Compute multimodality (diversity given same condition)."""
        multimod_scores = []

        for motions in motions_per_condition:
            if len(motions) < 2:
                continue
            multimod_scores.append(self.diversity(motions))

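        # No condition with at least two samples; return 0.0 instead of NaN
        return float(np.mean(multimod_scores)) if multimod_scores else 0.0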

    def foot_skating(self, motion: np.ndarray) -> float:
        """Compute foot skating metric (lower is better)."""
        foot_joints = [16, 17, 20, 21]  # Left/right foot indices

        velocities = np.diff(motion[:, foot_joints], axis=0)
        skating_frames = 0

        for t in range(len(velocities)):
            for joint in range(4):
                # Check if foot is on ground (y < threshold)
                if motion[t, foot_joints[joint], 1] < 0.05:
                    # Check if moving (skating)
                    vel_mag = np.linalg.norm(velocities[t, joint])
                    if vel_mag > 0.01:  # Velocity threshold
                        skating_frames += 1

        return skating_frames / (len(velocities) * 4)

    def beat_alignment(
        self,
        motion: np.ndarray,
        beats: np.ndarray,
        fps: int = 30
    ) -> float:
        """Compute alignment between motion peaks and music beats."""
        # Compute motion velocity
        velocity = np.linalg.norm(np.diff(motion, axis=0), axis=(1, 2))

        # Find motion peaks
        from scipy.signal import find_peaks
        motion_peaks, _ = find_peaks(velocity, height=np.mean(velocity))
        motion_peak_times = motion_peaks / fps

        # Compute alignment score
        alignments = []
        for beat_time in beats:
            if len(motion_peak_times) == 0:
                continue
            closest_peak = motion_peak_times[
                np.argmin(np.abs(motion_peak_times - beat_time))
            ]
            alignments.append(1.0 - min(abs(closest_peak - beat_time), 0.2) / 0.2)

        return np.mean(alignments) if alignments else 0.0
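
The beat alignment score gives each beat a value between 0 and 1: 1 when a motion peak lands exactly on the beat, falling linearly to 0 once the nearest peak is 0.2 s or more away. The worked example below restates that scoring rule in standard-library Python, with peak times supplied directly instead of detected from velocity. The function name and the `tolerance` parameter are illustrative.

```python
def beat_alignment_score(peak_times, beat_times, tolerance=0.2):
    """Mean per-beat score: 1 at a perfect hit, 0 at or beyond `tolerance` seconds."""
    if not peak_times or not beat_times:
        return 0.0
    scores = []
    for beat in beat_times:
        offset = min(abs(peak - beat) for peak in peak_times)
        scores.append(1.0 - min(offset, tolerance) / tolerance)
    return sum(scores) / len(scores)


# Beats at 120 bpm. Peaks hit the first beat, trail the second by 0.1 s,
# and miss the third by 0.4 s, giving per-beat scores of 1.0, 0.5 and 0.0.
score = beat_alignment_score(peak_times=[0.5, 1.1], beat_times=[0.5, 1.0, 1.5])
```

Here the mean comes out at about 0.5. Because every beat is scored against its nearest peak only, a motion with very dense peaks can score well without being musically aligned, so the metric is best read together with the others.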

7.2 Evaluation Harness

python
# core/cc-ml/cc_motiongen/evaluation/harness.py

import numpy as np

from .benchmarks import MotionQualityMetrics  # sibling module shown in 7.1

class EvaluationHarness:
    """Complete evaluation pipeline."""

    def __init__(self, model, test_dataset):
        self.model = model
        self.test_dataset = test_dataset
        self.metrics = MotionQualityMetrics()

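    def run_evaluation(self, num_samples: int = 1000, samples_per_condition: int = 2) -> dict:
        """Run full evaluation suite.

        Assumes model.sample returns a CPU array-like of shape (T, J, 3).
        samples_per_condition is an added parameter: multimodality needs
        several motions generated from the same conditioning input.
        """

        generated_motions = []
        real_motions = []
        beats = []
        repeats_per_condition = []

        for i, batch in enumerate(self.test_dataset):
            if i >= num_samples:
                break

            # Generate several samples for this condition
            repeats = [
                np.asarray(self.model.sample(batch['context']))
                for _ in range(samples_per_condition)
            ]
            repeats_per_condition.append(np.stack(repeats))
            generated_motions.append(repeats[0])
            real_motions.append(np.asarray(batch['motion']))
            beats.append(batch['beats'])

        generated = np.stack(generated_motions)
        real = np.stack(real_motions)

        results = {
            'fid': self.metrics.frechet_inception_distance(generated, real),
            'diversity': self.metrics.diversity(generated),
            # Diversity within each condition, not across the whole test set
            'multimodality': self.metrics.multimodality(repeats_per_condition),
            'foot_skating': np.mean([
                self.metrics.foot_skating(m) for m in generated
            ]),
            # Reuse the beats collected above instead of re-iterating the dataset
            'beat_alignment': np.mean([
                self.metrics.beat_alignment(g, b)
                for g, b in zip(generated, beats)
            ])
        }

        return results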
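
The `fid` entry depends on `_compute_fid`, which is not shown in this document. For reference, the standard Fréchet distance between two Gaussians fitted to the generated and real feature sets is given below, using the same means and covariances that `frechet_inception_distance` computes. Whether the project's implementation adds numerical safeguards (for example a small diagonal offset before the matrix square root) is not stated in the source.

```latex
\mathrm{FID}
  = \lVert \mu_{\mathrm{gen}} - \mu_{\mathrm{real}} \rVert_2^2
  + \operatorname{Tr}\!\left(
      \Sigma_{\mathrm{gen}} + \Sigma_{\mathrm{real}}
      - 2\left(\Sigma_{\mathrm{gen}}\,\Sigma_{\mathrm{real}}\right)^{1/2}
    \right)
```

Lower values mean the generated feature distribution is closer to the real one. The covariance estimates need many more samples than feature dimensions to be stable, which is one reason to run the harness with a large `num_samples`.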

---

Document Version: 2.0.0
Generated: December 26, 2024

Promotion Decision

Promote into a technical note or architecture paper with implementation anchors.

Source Anchor

projects/Documentation/01-architecture/systems/ML_GENERATION_SYSTEMS.md

Detected Structure

Method · Evaluation · Code Anchors · Architecture