Machine Learning Generation Systems
CC-MotionGen + RAG++ Documentation
Version: 2.0.0
Last Updated: December 26, 2024
---
Table of Contents
1. [Overview](#overview)
2. [CC-MotionGen](#cc-motiongen)
3. [RAG++ Policy](#rag-policy)
4. [MotionPhrase System](#motionphrase-system)
5. [Training Pipeline](#training-pipeline)
6. [Inference API](#inference-api)
7. [Evaluation Metrics](#evaluation-metrics)
---
1. Overview
The ML Generation Systems provide music-conditioned motion generation through diffusion models enhanced with retrieval-augmented priors.
┌─────────────────────────────────────────────────────────────────────────────┐
│ ML GENERATION ARCHITECTURE │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌───────────────────────────────────────────────────────────────────────┐ │
│ │ INPUT CONDITIONING │ │
│ │ │ │
│ │ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────────┐ │ │
│ │ │ Music Audio │ │ Text Prompt │ │ Motion Seed │ │ │
│ │ │ (WAV/MP3) │ │ (Natural │ │ (Optional │ │ │
│ │ │ │ │ Language) │ │ Starting Pose) │ │ │
│ │ └────────┬────────┘ └────────┬────────┘ └──────────┬──────────┘ │ │
│ │ │ │ │ │ │
│ │ ▼ ▼ │ │ │
│ │ ┌─────────────────┐ ┌─────────────────┐ │ │ │
│ │ │ Jukebox │ │ CLIP/T5 │ │ │ │
│ │ │ Encoder │ │ Encoder │ │ │ │
│ │ │ (4800-dim) │ │ (768-dim) │ │ │ │
│ │ └────────┬────────┘ └────────┬────────┘ │ │ │
│ │ │ │ │ │ │
│ │ └────────────┬───────┘ │ │ │
│ │ ▼ │ │ │
│ │ ┌─────────────────────────────────────────────────┐ │ │ │
│ │ │ Conditioning Vector (5568-dim) │ │ │ │
│ │ └─────────────────────────┬───────────────────────┘ │ │ │
│ └────────────────────────────┼─────────────────────────┼───────────────┘ │
│ │ │ │
│ ▼ │ │
│ ┌───────────────────────────────────────────────────────────────────────┐ │
│ │ RAG++ RETRIEVAL │ │
│ │ │ │
│ │ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────────┐ │ │
│ │ │ FAISS Index │ │ Cross-Encoder │ │ Prior Builder │ │ │
│ │ │ (Dense │──▶│ Reranker │──▶│ │ │ │
│ │ │ Retrieval) │ │ │ │ Top-K Blend │ │ │
│ │ └─────────────────┘ └─────────────────┘ └──────────┬──────────┘ │ │
│ │ │ │ │
│ │ ┌────────────────────────────────────────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌─────────────────────────────────────────────────────────────────┐ │ │
│ │ │ Motion Prior (T×J×3) │ │ │
│ │ └──────────────────────────────┬──────────────────────────────────┘ │ │
│ └─────────────────────────────────┼─────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────────────────────┐ │
│ │ CC-MotionGen DIFFUSION │ │
│ │ │ │
│ │ ┌─────────────────────────────────────────────────────────────────┐ │ │
│ │ │ U-Net Architecture │ │ │
│ │ │ │ │ │
│ │ │ Input: x_t (noisy motion) │ │ │
│ │ │ t (timestep embedding) │ │ │
│ │ │ c (conditioning) │ │ │
│ │ │ p (motion prior) │ │ │
│ │ │ │ │ │
│ │ │ ┌─────────────────────────────────────────────────────────┐ │ │ │
│ │ │ │ Down Blocks Middle Block Up Blocks │ │ │ │
│ │ │ │ ┌─────┐ ┌─────────┐ ┌─────┐ │ │ │ │
│ │ │ │ │Conv │──┬───▶│ Self │────▶│Conv │ │ │ │ │
│ │ │ │ │+Attn│ │ │ +Cross │ │+Attn│ │ │ │ │
│ │ │ │ └─────┘ │ │ Attn │ └─────┘ │ │ │ │
│ │ │ │ │ └─────────┘ ▲ │ │ │ │
│ │ │ │ └────────────────────────┘ (skip connections)│ │ │ │
│ │ │ └─────────────────────────────────────────────────────────┘ │ │ │
│ │ │ │ │ │
│ │ │ Output: ε_θ (predicted noise) │ │ │
│ │ │ │ │ │
│ │ └──────────────────────────────┬──────────────────────────────────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌─────────────────────────────────────────────────────────────────┐ │ │
│ │ │ DDPM/DDIM Sampling Loop │ │ │
│ │ │ │ │ │
│ │ │ for t = T, T-1, ..., 1: │ │ │
│ │ │ ε = UNet(x_t, t, c, p) │ │ │
│ │ │ x_{t-1} = denoise(x_t, ε, t) │ │ │
│ │ │ │ │ │
│ │ └──────────────────────────────┬──────────────────────────────────┘ │ │
│ └─────────────────────────────────┼─────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────────────────────┐ │
│ │ POST-PROCESSING │ │
│ │ │ │
│ │ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────────┐ │ │
│ │ │ Motion │ │ Temporal │ │ Foot Contact │ │ │
│ │ │ Decoder │──▶│ Smoothing │──▶│ Correction │ │ │
│ │ │ │ │ (Gaussian) │ │ (IK) │ │ │
│ │ └─────────────────┘ └─────────────────┘ └──────────┬──────────┘ │ │
│ └───────────────────────────────────────────────────────┼───────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────────────────────┐ │
│ │ OUTPUT │ │
│ │ │ │
│ │ Motion Sequence: (T=196, J=22, D=3) │ │
│ │ - 196 frames (~6.5 seconds at 30fps) │ │
│ │ - 22 joints (SMPL skeleton) │ │
│ │ - 3D positions per joint │ │
│ │ │ │
│ └───────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
---
2. CC-MotionGen
2.1 Model Architecture
Location: `core/cc-ml/cc_motiongen/`
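The class that follows calls `_cosine_beta_schedule` and `_ddim_step` without showing them. As a minimal sketch of what helpers of this kind usually compute, the scalar example here builds a cosine schedule, applies one deterministic DDIM update (eta = 0), and combines two noise predictions with classifier-free guidance. The function names, the offset `s = 0.008` and the clipping value `0.999` are assumptions taken from common practice, not from this codebase.

```python
import math


def cosine_alphas_cumprod(num_timesteps: int, s: float = 0.008) -> list:
    """Cumulative signal fraction alpha_bar[t] for t = 0..num_timesteps-1."""
    def f(step: int) -> float:
        return math.cos(((step / num_timesteps) + s) / (1 + s) * math.pi / 2) ** 2

    return [f(t + 1) / f(0) for t in range(num_timesteps)]


def betas_from_alphas_cumprod(alphas_cumprod: list, max_beta: float = 0.999) -> list:
    """Per-step noise variances implied by the cumulative schedule, clipped at max_beta."""
    betas, prev = [], 1.0
    for a in alphas_cumprod:
        betas.append(min(1.0 - a / prev, max_beta))
        prev = a
    return betas


def ddim_step(x_t: float, eps: float, alpha_bar_t: float, alpha_bar_prev: float) -> float:
    """One deterministic DDIM update (eta = 0) for a scalar sample."""
    # Estimate the clean sample from the current noisy sample and predicted noise
    x0_pred = (x_t - math.sqrt(1.0 - alpha_bar_t) * eps) / math.sqrt(alpha_bar_t)
    # Re-noise that estimate to the (lower) noise level of the previous timestep
    return math.sqrt(alpha_bar_prev) * x0_pred + math.sqrt(1.0 - alpha_bar_prev) * eps


def guided_noise(eps_uncond: float, eps_cond: float, guidance_scale: float) -> float:
    """Classifier-free guidance: extrapolate from the unconditional prediction."""
    return eps_uncond + guidance_scale * (eps_cond - eps_uncond)
```

With the true noise as the prediction and `alpha_bar_prev = 1.0`, `ddim_step` returns the clean sample, which is a convenient sanity check for any real implementation.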
# core/cc-ml/cc_motiongen/model/diffusion.py
class MotionDiffusion(nn.Module):
"""
Denoising Diffusion Probabilistic Model for motion generation.
"""
def __init__(self, config: DiffusionConfig):
super().__init__()
self.config = config
self.num_timesteps = config.num_timesteps
# U-Net denoiser. Its input is the noisy motion concatenated with the motion
# prior along the feature axis, so it is twice as wide as its output.
self.unet = MotionUNet(
in_channels=config.motion_dim * 2,
out_channels=config.motion_dim,
model_channels=config.model_channels,
num_res_blocks=config.num_res_blocks,
attention_resolutions=config.attention_resolutions,
dropout=config.dropout,
channel_mult=config.channel_mult,
num_heads=config.num_heads,
context_dim=config.context_dim
)
# Noise schedule
betas = self._cosine_beta_schedule(
config.num_timesteps,
config.beta_start,
config.beta_end
)
# Precompute diffusion parameters and register them as buffers so that they
# follow the module across devices and are stored in checkpoints
alphas_cumprod = torch.cumprod(1.0 - betas, dim=0)
self.register_buffer("betas", betas)
self.register_buffer("alphas", 1.0 - betas)
self.register_buffer("alphas_cumprod", alphas_cumprod)
self.register_buffer("sqrt_alphas_cumprod", torch.sqrt(alphas_cumprod))
self.register_buffer("sqrt_one_minus_alphas_cumprod", torch.sqrt(1.0 - alphas_cumprod))
def forward(
self,
x: torch.Tensor, # (B, T, J, 3) motion
t: torch.Tensor, # (B,) timesteps
context: torch.Tensor, # (B, D) conditioning
prior: torch.Tensor = None # (B, T, J, 3) motion prior
) -> torch.Tensor:
"""
Predict noise given noisy input and conditioning.
"""
# Flatten motion: (B, T, J, 3) -> (B, T, J*3)
B, T, J, _ = x.shape
x_flat = x.reshape(B, T, J * 3)
# Always concatenate a prior so the U-Net input width stays fixed; a missing
# prior (for example in the unconditional guidance pass) becomes zeros
if prior is None:
prior = torch.zeros_like(x)
prior_flat = prior.reshape(B, T, J * 3)
x_flat = torch.cat([x_flat, prior_flat], dim=-1)
# Get timestep embeddings
t_emb = self._timestep_embedding(t)
# U-Net forward
noise_pred = self.unet(x_flat, t_emb, context)
# Reshape back: (B, T, J*3) -> (B, T, J, 3)
return noise_pred.reshape(B, T, J, 3)
@torch.no_grad()
def sample(
self,
context: torch.Tensor,
prior: torch.Tensor = None,
num_steps: int = 50,
guidance_scale: float = 7.5
) -> torch.Tensor:
"""
Generate motion using DDIM sampling.
"""
device = context.device
B = context.shape[0]
# Initialize with noise
x = torch.randn(
B,
self.config.seq_length,
self.config.num_joints,
3,
device=device
)
# DDIM timesteps
timesteps = torch.linspace(
self.num_timesteps - 1,
0,
num_steps,
dtype=torch.long,
device=device
)
for i, t in enumerate(timesteps):
t_batch = t.expand(B)
# Classifier-free guidance
if guidance_scale > 1.0:
# Conditional prediction
noise_cond = self(x, t_batch, context, prior)
# Unconditional prediction
noise_uncond = self(x, t_batch, torch.zeros_like(context), None)
# Guided prediction
noise = noise_uncond + guidance_scale * (noise_cond - noise_uncond)
else:
noise = self(x, t_batch, context, prior)
# DDIM update step
x = self._ddim_step(x, noise, t, timesteps[i + 1] if i < len(timesteps) - 1 else 0)
return x
2.2 U-Net Architecture
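`MotionDiffusion.forward` feeds `_timestep_embedding(t)` into the U-Net, whose `time_embed` MLP takes a `model_channels`-wide vector per timestep. That helper is not shown in this document. A minimal sketch of the usual sinusoidal form, assuming a `max_period` of 10000 and a sine-then-cosine layout (both assumptions), might look like this:

```python
import math


def timestep_embedding(t: int, dim: int, max_period: float = 10000.0) -> list:
    """Sinusoidal embedding of one integer timestep into `dim` values (dim must be even)."""
    if dim % 2 != 0:
        raise ValueError("dim must be even")
    half = dim // 2
    # Geometrically spaced frequencies from 1 down towards 1 / max_period
    freqs = [math.exp(-math.log(max_period) * i / half) for i in range(half)]
    # First half holds sines, second half holds cosines
    return [math.sin(t * f) for f in freqs] + [math.cos(t * f) for f in freqs]
```

For the default `model_channels = 256`, each timestep would map to a 256-value vector before the MLP expands it to `model_channels * 4`.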
# core/cc-ml/cc_motiongen/model/unet.py
import torch
import torch.nn as nn

# ResBlock, CrossAttentionBlock, Downsample and Upsample are assumed to be
# defined elsewhere in this package.


class MotionUNet(nn.Module):
    """
    U-Net architecture for motion denoising.
    """

    def __init__(
        self,
        in_channels: int,
        out_channels: int,
        model_channels: int = 256,
        num_res_blocks: int = 2,
        attention_resolutions: tuple = (4, 2, 1),
        dropout: float = 0.1,
        channel_mult: tuple = (1, 2, 4, 8),
        num_heads: int = 8,
        context_dim: int = 768
    ):
        super().__init__()
        self.in_channels = in_channels
        self.model_channels = model_channels
        time_channels = model_channels * 4
        # Time embedding
        self.time_embed = nn.Sequential(
            nn.Linear(model_channels, time_channels),
            nn.SiLU(),
            nn.Linear(time_channels, time_channels)
        )
        # Input projection
        self.input_proj = nn.Linear(in_channels, model_channels)
        # Down path. Every stage pushes one skip tensor in forward();
        # skip_chans records the channel count of each pushed skip.
        self.down_stages = nn.ModuleList()
        ch = model_channels
        skip_chans = [ch]  # skip taken right after input_proj
        ds = 1  # current temporal downsampling factor
        for level, mult in enumerate(channel_mult):
            out_ch = model_channels * mult
            for _ in range(num_res_blocks):
                stage = [ResBlock(ch, out_ch, dropout, time_channels=time_channels)]
                ch = out_ch
                # attention_resolutions lists downsampling factors, not level indices
                if ds in attention_resolutions:
                    stage.append(CrossAttentionBlock(ch, context_dim, num_heads))
                self.down_stages.append(nn.ModuleList(stage))
                skip_chans.append(ch)
            if level < len(channel_mult) - 1:
                self.down_stages.append(nn.ModuleList([Downsample(ch)]))
                skip_chans.append(ch)
                ds *= 2
        # Middle blocks
        self.middle_block = nn.Sequential(
            ResBlock(ch, ch, dropout, time_channels=time_channels),
            CrossAttentionBlock(ch, context_dim, num_heads),
            ResBlock(ch, ch, dropout, time_channels=time_channels)
        )
        # Up path. Each stage consumes exactly one skip, so its first ResBlock
        # is sized for the current channels plus that skip's channels.
        self.up_stages = nn.ModuleList()
        for level, mult in reversed(list(enumerate(channel_mult))):
            out_ch = model_channels * mult
            for i in range(num_res_blocks + 1):
                stage = [ResBlock(ch + skip_chans.pop(), out_ch, dropout, time_channels=time_channels)]
                ch = out_ch
                if ds in attention_resolutions:
                    stage.append(CrossAttentionBlock(ch, context_dim, num_heads))
                if level > 0 and i == num_res_blocks:
                    stage.append(Upsample(ch))
                    ds //= 2
                self.up_stages.append(nn.ModuleList(stage))
        # Output projection. GroupNorm expects (B, C, T), so forward() transposes around it.
        self.out_norm = nn.GroupNorm(32, ch)
        self.output_proj = nn.Sequential(
            nn.SiLU(),
            nn.Linear(ch, out_channels)
        )

    @staticmethod
    def _apply(layer, h, t_emb, context):
        """Call a layer with the arguments its type expects."""
        if isinstance(layer, (ResBlock, CrossAttentionBlock)):
            return layer(h, t_emb, context)
        return layer(h)

    def forward(
        self,
        x: torch.Tensor,       # (B, T, D)
        t_emb: torch.Tensor,   # (B, D)
        context: torch.Tensor  # (B, L, C)
    ) -> torch.Tensor:
        # Project input
        h = self.input_proj(x)
        # Time embedding
        t_emb = self.time_embed(t_emb)
        # Encoder: one skip per stage. Assumes Downsample/Upsample produce
        # matching temporal lengths (e.g. T padded to a multiple of 8 here).
        skips = [h]
        for stage in self.down_stages:
            for layer in stage:
                h = self._apply(layer, h, t_emb, context)
            skips.append(h)
        # Middle
        for layer in self.middle_block:
            h = layer(h, t_emb, context)
        # Decoder: concatenate one skip per stage along the channel axis
        for stage in self.up_stages:
            h = torch.cat([h, skips.pop()], dim=-1)
            for layer in stage:
                h = self._apply(layer, h, t_emb, context)
        # Output
        h = self.out_norm(h.transpose(1, 2)).transpose(1, 2)
        return self.output_proj(h)
2.3 Motion Decoder
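`ForwardKinematics` in this section relies on `_topological_sort`, which is not shown. The pure-Python sketch here illustrates the same idea on a hypothetical four-joint chain that stands in for `SKELETON_PARENTS`: order the joints so that parents come first, then accumulate parent-relative offsets into global positions. The joint names and offsets are invented for the illustration.

```python
def topological_order(parents: dict) -> list:
    """Order joints so that every parent appears before its children."""
    order, placed = [], set()
    remaining = list(parents)
    while remaining:
        progressed = False
        for joint in list(remaining):
            parent = parents[joint]
            if parent is None or parent in placed:
                order.append(joint)
                placed.add(joint)
                remaining.remove(joint)
                progressed = True
        if not progressed:
            raise ValueError("cycle or missing parent in skeleton")
    return order


def forward_kinematics(parents: dict, offsets: dict) -> dict:
    """Accumulate parent-relative offsets into global positions."""
    positions = {}
    for joint in topological_order(parents):
        parent = parents[joint]
        # The root has no parent, so its offset is already a global position
        base = (0.0, 0.0, 0.0) if parent is None else positions[parent]
        positions[joint] = tuple(b + o for b, o in zip(base, offsets[joint]))
    return positions


# Hypothetical mini-skeleton, deliberately listed child-first
PARENTS = {"foot": "knee", "knee": "hip", "hip": "pelvis", "pelvis": None}
OFFSETS = {
    "pelvis": (0.0, 1.0, 0.0),
    "hip": (0.1, -0.1, 0.0),
    "knee": (0.0, -0.4, 0.0),
    "foot": (0.0, -0.4, 0.1),
}
```

Calling `forward_kinematics(PARENTS, OFFSETS)` places the foot at roughly `(0.1, 0.1, 0.1)`, the sum of the offsets along the chain.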
# core/cc-ml/cc_motiongen/model/decoder.py
class MotionDecoder(nn.Module):
"""
Decodes latent motion representation to joint positions.
"""
def __init__(
self,
latent_dim: int = 512,
hidden_dim: int = 1024,
num_joints: int = 22,
num_layers: int = 4
):
super().__init__()
self.num_joints = num_joints
# MLP decoder
layers = []
in_dim = latent_dim
for i in range(num_layers - 1):
out_dim = hidden_dim
layers.extend([
nn.Linear(in_dim, out_dim),
nn.LayerNorm(out_dim),
nn.GELU(),
nn.Dropout(0.1)
])
in_dim = out_dim
layers.append(nn.Linear(in_dim, num_joints * 3))
self.decoder = nn.Sequential(*layers)
# FK layer for converting local to global positions
self.forward_kinematics = ForwardKinematics(SKELETON_PARENTS)
def forward(self, z: torch.Tensor) -> torch.Tensor:
"""
Decode latent to joint positions.
Args:
z: (B, T, latent_dim) latent motion
Returns:
positions: (B, T, J, 3) joint positions
"""
B, T, _ = z.shape
# Decode to local positions
local = self.decoder(z) # (B, T, J*3)
local = local.reshape(B, T, self.num_joints, 3)
# Apply forward kinematics
global_pos = self.forward_kinematics(local)
return global_pos
class ForwardKinematics(nn.Module):
"""
Convert local joint positions/rotations to global.
"""
def __init__(self, parents: dict):
super().__init__()
self.parents = parents
self.joint_order = self._topological_sort()
def forward(self, local: torch.Tensor) -> torch.Tensor:
"""
Apply forward kinematics.
Args:
local: (B, T, J, 3) local positions
Returns:
global_pos: (B, T, J, 3) global positions
"""
B, T, J, _ = local.shape
global_pos = torch.zeros_like(local)
for joint_idx, joint_name in enumerate(self.joint_order):
parent_name = self.parents.get(joint_name)
if parent_name is None:
# Root joint
global_pos[:, :, joint_idx] = local[:, :, joint_idx]
else:
parent_idx = self.joint_order.index(parent_name)
global_pos[:, :, joint_idx] = (
global_pos[:, :, parent_idx] + local[:, :, joint_idx]
)
return global_pos
2.4 Configuration
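Several fields in the configuration are derived from others: `motion_dim` from `num_joints`, and `context_dim` from the two encoder widths quoted in the overview diagram. A small check along these lines makes those relationships explicit. It is written against a trimmed stand-in dataclass rather than the real `DiffusionConfig`, and the `music_dim`, `text_dim` and `fps` fields are assumptions added for the illustration.

```python
from dataclasses import dataclass


@dataclass
class MiniConfig:
    seq_length: int = 196
    num_joints: int = 22
    motion_dim: int = 66
    context_dim: int = 5568
    music_dim: int = 4800  # assumed field: Jukebox feature width
    text_dim: int = 768    # assumed field: text encoder width
    fps: int = 30          # assumed field: playback frame rate


def validate(cfg: MiniConfig) -> float:
    """Raise on inconsistent dimensions; return the clip duration in seconds."""
    if cfg.motion_dim != cfg.num_joints * 3:
        raise ValueError("motion_dim must equal num_joints * 3")
    if cfg.context_dim != cfg.music_dim + cfg.text_dim:
        raise ValueError("context_dim must equal music_dim + text_dim")
    return cfg.seq_length / cfg.fps
```

With the defaults, `validate(MiniConfig())` returns about 6.53, matching the "~6.5 seconds at 30fps" comment on `seq_length`.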
# core/cc-ml/cc_motiongen/config.py
from dataclasses import dataclass
@dataclass
class DiffusionConfig:
"""Configuration for CC-MotionGen model."""
# Motion dimensions
seq_length: int = 196 # ~6.5 seconds at 30fps
num_joints: int = 22 # SMPL skeleton
motion_dim: int = 66 # 22 joints × 3 coordinates
# Diffusion parameters
num_timesteps: int = 1000 # Training timesteps
beta_start: float = 0.0001
beta_end: float = 0.02
# U-Net architecture
model_channels: int = 256
num_res_blocks: int = 2
attention_resolutions: tuple = (4, 2, 1)
dropout: float = 0.1
channel_mult: tuple = (1, 2, 4, 8)
num_heads: int = 8
# Conditioning
context_dim: int = 5568 # Jukebox (4800) + CLIP (768)
# Training
batch_size: int = 64
learning_rate: float = 1e-4
weight_decay: float = 0.01
num_epochs: int = 500
warmup_steps: int = 1000
# Inference
inference_steps: int = 50 # DDIM steps
guidance_scale: float = 7.5
@dataclass
class TrainingConfig:
"""Training configuration."""
# Data
train_data_path: str = "gs://comp-core-data/motion/train"
val_data_path: str = "gs://comp-core-data/motion/val"
# Checkpoints
checkpoint_dir: str = "gs://comp-core-models/cc_motiongen"
save_every: int = 1000
eval_every: int = 500
# Hardware
num_gpus: int = 4
fp16: bool = True
gradient_accumulation: int = 4
# Logging
wandb_project: str = "cc-motiongen"
log_every: int = 100
---
3. RAG++ Policy
3.1 Overview
RAG++ (Retrieval-Augmented Generation++) enhances motion generation by retrieving relevant motion phrases from a curated database, reranking them, and blending the best matches into a motion prior for the diffusion model.
Location: `core/cc-core/cc_core/policy/rag_motionphrase/`
3.2 Architecture
# core/cc-core/cc_core/policy/rag_motionphrase/service.py
from dataclasses import dataclass
from typing import List, Optional
import numpy as np
@dataclass
class RetrievalResult:
phrase_id: str
motion_data: np.ndarray # (T, J, 3)
similarity_score: float
rerank_score: float
metadata: dict
class RAGPlusPlusService:
"""
Retrieval-Augmented Generation service for motion priors.
"""
def __init__(self, config: RAGConfig):
self.config = config
# Components
self.retriever = DenseRetriever(config.index_path)
self.reranker = CrossEncoderReranker(config.reranker_model)
self.prior_builder = MotionPriorBuilder(config.blend_config)
self.cache = LRUCache(maxsize=config.cache_size)
async def get_prior(
self,
query_embedding: np.ndarray,
text_query: Optional[str] = None,
top_k: int = 10,
rerank_top_n: int = 5
) -> np.ndarray:
"""
Retrieve and blend motion phrases into a prior.
Args:
query_embedding: Conditioning vector from music/text encoders
text_query: Optional text description for reranking
top_k: Number of candidates to retrieve
rerank_top_n: Number of top candidates after reranking
Returns:
Motion prior tensor (T, J, 3)
"""
# Check cache (the key covers every argument that changes the result)
cache_key = self._compute_cache_key(query_embedding, text_query, top_k, rerank_top_n)
if cache_key in self.cache:
return self.cache[cache_key]
# Stage 1: Dense retrieval
candidates = await self.retriever.search(
query_embedding,
k=top_k
)
# Stage 2: Cross-encoder reranking
if text_query and len(candidates) > rerank_top_n:
candidates = await self.reranker.rerank(
query=text_query,
candidates=candidates,
top_n=rerank_top_n
)
# Stage 3: Build motion prior
prior = self.prior_builder.build(candidates)
# Cache result
self.cache[cache_key] = prior
return prior
3.3 Dense Retriever
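The index in section 4.3 is built with an inner-product metric over L2-normalized vectors, and the retriever normalizes the query as well, so the scores a search returns are cosine similarities where larger means closer. A brute-force sketch of that ranking in plain Python (no FAISS; the function names are illustrative only):

```python
import math


def l2_normalize(v: list) -> list:
    """Scale a vector to unit length."""
    norm = math.sqrt(sum(x * x for x in v))
    if norm == 0.0:
        raise ValueError("cannot normalize a zero vector")
    return [x / norm for x in v]


def top_k_inner_product(query: list, vectors: list, k: int) -> list:
    """Return (index, score) pairs for the k highest inner products, best first."""
    q = l2_normalize(query)
    scored = [
        (i, sum(a * b for a, b in zip(q, l2_normalize(v))))
        for i, v in enumerate(vectors)
    ]
    # Inner product of unit vectors is cosine similarity, so sort descending
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]
```

For example, querying `[2.0, 0.0]` against `[[1, 0], [0, 1], [1, 1], [-1, 0]]` ranks index 0 first with score 1.0 and index 2 second with score about 0.707.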
# core/cc-core/cc_core/policy/rag_motionphrase/retriever.py
from typing import List
import faiss
import numpy as np
class DenseRetriever:
"""
FAISS-based dense retrieval for motion phrases.
"""
def __init__(self, index_path: str):
self.index = faiss.read_index(index_path)
self.metadata = self._load_metadata(index_path)
async def search(
self,
query: np.ndarray,
k: int = 10
) -> List[RetrievalResult]:
"""
Search for similar motion phrases.
"""
# Normalize query
query = query / np.linalg.norm(query)
query = query.reshape(1, -1).astype('float32')
# FAISS search
distances, indices = self.index.search(query, k)
# Build results
results = []
for i, (dist, idx) in enumerate(zip(distances[0], indices[0])):
if idx == -1:
continue
meta = self.metadata[idx]
results.append(RetrievalResult(
phrase_id=meta['id'],
motion_data=self._load_motion(meta['path']),
similarity_score=float(dist), # Inner-product index on unit vectors: the score is already a cosine similarity
rerank_score=0.0,
metadata=meta
))
return results
def _load_motion(self, path: str) -> np.ndarray:
"""Load motion data from storage."""
return np.load(path)
3.4 Cross-Encoder Reranker
# core/cc-core/cc_core/policy/rag_motionphrase/reranker.py
from typing import List
import torch
from transformers import AutoModelForSequenceClassification, AutoTokenizer
class CrossEncoderReranker:
"""
Cross-encoder model for reranking retrieved motion phrases.
"""
def __init__(self, model_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"):
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
self.model = AutoModelForSequenceClassification.from_pretrained(model_name)
self.model.eval()
async def rerank(
self,
query: str,
candidates: List[RetrievalResult],
top_n: int = 5
) -> List[RetrievalResult]:
"""
Rerank candidates using cross-encoder.
"""
if not candidates:
return []
# Prepare inputs
pairs = [
(query, self._motion_to_text(c.metadata))
for c in candidates
]
# Tokenize
inputs = self.tokenizer(
pairs,
padding=True,
truncation=True,
return_tensors="pt"
)
# Score
with torch.no_grad():
scores = self.model(**inputs).logits.squeeze(-1)
# Update rerank scores and sort
for candidate, score in zip(candidates, scores):
candidate.rerank_score = score.item()
candidates.sort(key=lambda x: x.rerank_score, reverse=True)
return candidates[:top_n]
def _motion_to_text(self, metadata: dict) -> str:
"""Convert motion metadata to text description."""
return f"{metadata.get('genre', 'dance')} {metadata.get('style', '')} {metadata.get('energy', 'medium')} energy"
3.5 Prior Builder
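The builder in this section turns candidate scores into blend weights with a temperature-scaled softmax and resamples each phrase to a common length before averaging. The sketch here walks through both steps on toy data. The helper names are illustrative, and a one-dimensional list stands in for a `(T, J, 3)` motion array.

```python
import math


def softmax(scores: list, temperature: float = 1.0) -> list:
    """Temperature-scaled softmax; lower temperature concentrates weight on the top score."""
    scaled = [s / temperature for s in scores]
    m = max(scaled)  # subtract the max for numerical stability
    exps = [math.exp(s - m) for s in scaled]
    total = sum(exps)
    return [e / total for e in exps]


def resample_linear(values: list, target_length: int) -> list:
    """Linearly interpolate a sequence to target_length samples."""
    n = len(values)
    if n == target_length:
        return list(values)
    if target_length == 1:
        return [values[0]]
    out = []
    for i in range(target_length):
        pos = i * (n - 1) / (target_length - 1)
        low = int(pos)
        high = min(low + 1, n - 1)
        frac = pos - low
        out.append((1 - frac) * values[low] + frac * values[high])
    return out


def blend(sequences: list, scores: list, target_length: int, temperature: float = 1.0) -> list:
    """Weighted average of sequences after aligning them to a common length."""
    weights = softmax(scores, temperature)
    aligned = [resample_linear(seq, target_length) for seq in sequences]
    return [sum(w * seq[i] for w, seq in zip(weights, aligned)) for i in range(target_length)]
```

Two equally scored phrases that move in opposite directions blend to a flat line, which shows why averaging dissimilar phrases can wash out motion and why the temperature matters.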
# core/cc-core/cc_core/policy/rag_motionphrase/prior_builder.py
class MotionPriorBuilder:
"""
Builds motion priors from retrieved phrases.
"""
def __init__(self, config: BlendConfig):
self.config = config
def build(self, candidates: List[RetrievalResult]) -> np.ndarray:
"""
Build weighted blend of motion phrases.
"""
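if not candidates:
return np.zeros((self.config.sequence_length, 22, 3)) # Default zero prior
# Compute weights from scores. Cross-encoder logits can be negative, so check once
# whether reranking ran instead of testing the sign of each score.
reranked = any(c.rerank_score != 0.0 for c in candidates)
scores = np.array([
c.rerank_score if reranked else c.similarity_score
for c in candidates
])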
weights = self._softmax(scores / self.config.temperature)
# Align temporal lengths
target_length = self.config.sequence_length
aligned_motions = [
self._temporal_align(c.motion_data, target_length)
for c in candidates
]
# Weighted blend
prior = np.zeros((target_length, 22, 3))
for motion, weight in zip(aligned_motions, weights):
prior += weight * motion
# Apply temporal smoothing
prior = self._gaussian_smooth(prior, sigma=self.config.smooth_sigma)
return prior
def _temporal_align(self, motion: np.ndarray, target_length: int) -> np.ndarray:
"""Resample motion to target length."""
current_length = motion.shape[0]
if current_length == target_length:
return motion
# Linear interpolation
indices = np.linspace(0, current_length - 1, target_length)
aligned = np.zeros((target_length, motion.shape[1], motion.shape[2]))
for i, idx in enumerate(indices):
low = int(idx)
high = min(low + 1, current_length - 1)
t = idx - low
aligned[i] = (1 - t) * motion[low] + t * motion[high]
return aligned
def _softmax(self, x: np.ndarray) -> np.ndarray:
"""Compute softmax."""
exp_x = np.exp(x - np.max(x))
return exp_x / exp_x.sum()
def _gaussian_smooth(self, motion: np.ndarray, sigma: float) -> np.ndarray:
"""Apply Gaussian smoothing along time axis."""
from scipy.ndimage import gaussian_filter1d
return gaussian_filter1d(motion, sigma, axis=0)
---
4. MotionPhrase System
4.1 Overview
MotionPhrase is the data curation and indexing system for RAG++.
Location: `core/cc-ml/motionphrase/`
4.2 Phrase Extraction
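`PhraseExtractor` calls `_beat_to_frame`, which is not shown in this document. One plausible reading, assuming phrase boundaries are given as times in seconds and motion is sampled at 30 fps (both assumptions), is sketched here together with the minimum-length filter:

```python
def beat_to_frame(beat_time_s: float, fps: int = 30) -> int:
    """Convert a time in seconds to the nearest motion frame index."""
    return int(round(beat_time_s * fps))


def phrase_frame_ranges(boundaries: list, fps: int = 30, min_phrase_length: int = 30) -> list:
    """Map (start, end) times to frame ranges, dropping phrases that are too short."""
    ranges = []
    for start_s, end_s in boundaries:
        start, end = beat_to_frame(start_s, fps), beat_to_frame(end_s, fps)
        if end - start < min_phrase_length:
            continue
        ranges.append((start, end))
    return ranges
```

With boundaries `[(0.0, 2.0), (2.0, 2.5), (2.5, 6.5)]`, the half-second phrase is dropped and the other two map to frames `(0, 60)` and `(75, 195)`.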
# core/cc-ml/motionphrase/extract_phrases.py
class PhraseExtractor:
"""
Extract motion phrases from full sequences.
"""
def __init__(self, config: ExtractionConfig):
self.config = config
self.beat_detector = BeatDetector()
self.phrase_detector = PhraseDetector()
def extract(self, motion: np.ndarray, audio: np.ndarray) -> List[MotionPhrase]:
"""
Extract phrases aligned to music structure.
"""
# Detect beats
beats = self.beat_detector.detect(audio, sr=self.config.sample_rate)
# Detect phrase boundaries (usually 4 or 8 bars)
boundaries = self.phrase_detector.detect(audio, beats)
# Extract motion segments
phrases = []
for start_beat, end_beat in boundaries:
start_frame = self._beat_to_frame(start_beat)
end_frame = self._beat_to_frame(end_beat)
if end_frame - start_frame < self.config.min_phrase_length:
continue
phrase_motion = motion[start_frame:end_frame]
# Compute features
features = self._compute_features(phrase_motion, audio, start_beat, end_beat)
phrases.append(MotionPhrase(
motion=phrase_motion,
start_beat=start_beat,
end_beat=end_beat,
features=features
))
return phrases
def _compute_features(
self,
motion: np.ndarray,
audio: np.ndarray,
start_beat: int,
end_beat: int
) -> PhraseFeatures:
"""Compute phrase-level features for retrieval."""
return PhraseFeatures(
energy=self._compute_energy(motion),
tempo=self._estimate_tempo(start_beat, end_beat),
style=self._classify_style(motion),
genre=self._classify_genre(audio)
)
4.3 Index Building
# core/cc-ml/motionphrase/build_indexes.py
from typing import List
import faiss
import numpy as np
from sentence_transformers import SentenceTransformer
class IndexBuilder:
"""
Build FAISS indexes for motion phrase retrieval.
"""
def __init__(self, config: IndexConfig):
self.config = config
self.encoder = SentenceTransformer(config.encoder_model)
def build(self, phrases: List[MotionPhrase]) -> faiss.Index:
"""
Build FAISS index from phrases.
"""
# Compute embeddings
embeddings = []
for phrase in phrases:
# Combine motion and metadata features
emb = self._embed_phrase(phrase)
embeddings.append(emb)
embeddings = np.array(embeddings).astype('float32')
# Normalize
faiss.normalize_L2(embeddings)
# Build index
dimension = embeddings.shape[1]
if self.config.index_type == 'flat':
index = faiss.IndexFlatIP(dimension)
elif self.config.index_type == 'ivf':
quantizer = faiss.IndexFlatIP(dimension)
index = faiss.IndexIVFFlat(
quantizer,
dimension,
self.config.nlist,
faiss.METRIC_INNER_PRODUCT
)
index.train(embeddings)
else:
raise ValueError(f"Unknown index type: {self.config.index_type}")
index.add(embeddings)
return index
def _embed_phrase(self, phrase: MotionPhrase) -> np.ndarray:
"""Create embedding for a phrase."""
# Text description from features
text = f"{phrase.features.genre} {phrase.features.style} dance with {phrase.features.energy} energy at {phrase.features.tempo} bpm"
# Encode text
text_emb = self.encoder.encode(text)
# Motion statistics embedding
motion_stats = self._compute_motion_stats(phrase.motion)
# Concatenate
return np.concatenate([text_emb, motion_stats])
---
5. Training Pipeline
5.1 Training Script
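The training loop calls `model.q_sample`, which is not defined in this document. In DDPM the forward process has the closed form x_t = sqrt(alpha_bar_t) * x_0 + sqrt(1 - alpha_bar_t) * eps, and the network is trained to recover eps with a mean-squared error. The scalar sketch here shows that relationship; the function names and the toy values are illustrative, not the project's implementation.

```python
import math
import random


def q_sample(x0: list, alpha_bar_t: float, noise: list) -> list:
    """Closed-form forward diffusion: mix clean data and noise at one noise level."""
    a, b = math.sqrt(alpha_bar_t), math.sqrt(1.0 - alpha_bar_t)
    return [a * x + b * n for x, n in zip(x0, noise)]


def mse(pred: list, target: list) -> float:
    """Mean squared error between predicted and true noise."""
    return sum((p - t) ** 2 for p, t in zip(pred, target)) / len(pred)


rng = random.Random(0)
x0 = [0.2, -0.5, 1.0]                      # toy "motion" values
noise = [rng.gauss(0.0, 1.0) for _ in x0]  # the target the network should predict
x_t = q_sample(x0, alpha_bar_t=0.25, noise=noise)
```

At `alpha_bar_t = 1.0` the sample is the clean data and at `alpha_bar_t = 0.0` it is pure noise. A perfect noise prediction gives a loss of zero.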
# core/cc-ml/cc_motiongen/scripts/train.py
import torch
import torch.nn.functional as F
from torch.utils.data import DataLoader
from accelerate import Accelerator
from transformers import get_cosine_schedule_with_warmup
import wandb

# MotionDiffusion, MotionDataset, DiffusionConfig, TrainingConfig and evaluate
# are assumed to be importable from the package.


def train(config: TrainingConfig, model_config: DiffusionConfig):
    """Main training loop.

    Optimisation hyperparameters (batch size, learning rate, epochs, warmup)
    live on DiffusionConfig; paths, hardware and logging live on TrainingConfig.
    """
    # Initialize accelerator
    accelerator = Accelerator(
        mixed_precision='fp16' if config.fp16 else 'no',
        gradient_accumulation_steps=config.gradient_accumulation
    )
    if accelerator.is_main_process:
        wandb.init(project=config.wandb_project)
    # Model
    model = MotionDiffusion(model_config)
    # Data
    train_dataset = MotionDataset(config.train_data_path)
    train_loader = DataLoader(
        train_dataset,
        batch_size=model_config.batch_size,
        shuffle=True,
        num_workers=8
    )
    # Optimizer
    optimizer = torch.optim.AdamW(
        model.parameters(),
        lr=model_config.learning_rate,
        weight_decay=model_config.weight_decay
    )
    # Learning rate scheduler
    scheduler = get_cosine_schedule_with_warmup(
        optimizer,
        num_warmup_steps=model_config.warmup_steps,
        num_training_steps=len(train_loader) * model_config.num_epochs
    )
    # Prepare with accelerator
    model, optimizer, train_loader, scheduler = accelerator.prepare(
        model, optimizer, train_loader, scheduler
    )
    # Training loop
    global_step = 0
    for epoch in range(model_config.num_epochs):
        model.train()
        for batch in train_loader:
            with accelerator.accumulate(model):
                motion = batch['motion']
                context = batch['context']
                # Sample timesteps
                t = torch.randint(
                    0,
                    model_config.num_timesteps,
                    (motion.shape[0],),
                    device=motion.device
                )
                # Add noise (q_sample lives on the unwrapped module)
                noise = torch.randn_like(motion)
                noisy_motion = accelerator.unwrap_model(model).q_sample(motion, t, noise)
                # Predict noise
                noise_pred = model(noisy_motion, t, context)
                # Loss
                loss = F.mse_loss(noise_pred, noise)
                accelerator.backward(loss)
                optimizer.step()
                scheduler.step()
                optimizer.zero_grad()
            global_step += 1
            # Logging
            if global_step % config.log_every == 0 and accelerator.is_main_process:
                wandb.log({
                    'loss': loss.item(),
                    'lr': scheduler.get_last_lr()[0],
                    'epoch': epoch
                }, step=global_step)
            # Checkpointing
            if global_step % config.save_every == 0:
                accelerator.save_state(f"{config.checkpoint_dir}/step_{global_step}")
            # Evaluation
            if global_step % config.eval_every == 0:
                evaluate(model, config)
5.2 GCP Deployment
# core/cc-ml/cc_motiongen/cloudbuild-training.yaml
steps:
  # Build training image
  - name: 'gcr.io/cloud-builders/docker'
    args: ['build', '-t', 'gcr.io/$PROJECT_ID/cc-motiongen-train', '.']
  # Push image
  - name: 'gcr.io/cloud-builders/docker'
    args: ['push', 'gcr.io/$PROJECT_ID/cc-motiongen-train']
  # Submit Vertex AI training job
  - name: 'gcr.io/cloud-builders/gcloud'
    args:
      - 'ai'
      - 'custom-jobs'
      - 'create'
      - '--region=us-central1'
      - '--display-name=cc-motiongen-training'
      - '--worker-pool-spec=machine-type=n1-standard-8,accelerator-type=NVIDIA_TESLA_V100,accelerator-count=4,container-image-uri=gcr.io/$PROJECT_ID/cc-motiongen-train'
---
6. Inference API
6.1 REST API
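# Inference endpoint
from typing import Optional

import numpy as np
import torch
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

# model, music_encoder, text_encoder, rag_service, load_audio and postprocess
# are assumed to be initialised elsewhere at application startup.
app = FastAPI()

MUSIC_DIM = 4800  # Jukebox feature width
TEXT_DIM = 768    # CLIP/T5 feature width


class GenerationRequest(BaseModel):
    audio_url: Optional[str] = None
    text_prompt: Optional[str] = None
    duration: float = 6.5
    guidance_scale: float = 7.5
    use_rag: bool = True


class GenerationResponse(BaseModel):
    motion: list  # (T, J, 3) as nested list
    fps: int = 30
    num_frames: int
    num_joints: int = 22


@app.post("/generate", response_model=GenerationResponse)
async def generate_motion(request: GenerationRequest):
    """Generate motion from music/text conditioning."""
    if not request.audio_url and not request.text_prompt:
        raise HTTPException(400, "Must provide audio_url or text_prompt")
    # Encode conditioning. The model expects the concatenated 5568-dim vector,
    # so a missing modality is zero-filled here (an assumption) to keep
    # audio-only and text-only requests compatible with context_dim.
    music_emb = np.zeros(MUSIC_DIM, dtype=np.float32)
    text_emb = np.zeros(TEXT_DIM, dtype=np.float32)
    if request.audio_url:
        audio = await load_audio(request.audio_url)
        music_emb = np.asarray(music_encoder.encode(audio), dtype=np.float32)
    if request.text_prompt:
        text_emb = np.asarray(text_encoder.encode(request.text_prompt), dtype=np.float32)
    context = np.concatenate([music_emb, text_emb])
    # Get RAG++ prior
    prior = None
    if request.use_rag:
        prior = await rag_service.get_prior(context, request.text_prompt)
    # Generate motion
    motion = model.sample(
        context=torch.from_numpy(context).unsqueeze(0),
        prior=torch.as_tensor(prior, dtype=torch.float32).unsqueeze(0) if prior is not None else None,
        guidance_scale=request.guidance_scale
    )
    # Post-process (move to CPU before converting to NumPy)
    motion = postprocess(motion.squeeze(0).cpu().numpy())
    return GenerationResponse(
        motion=motion.tolist(),
        num_frames=motion.shape[0]
    )
6.2 Python Client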
# Client usage
from typing import Optional

import numpy as np
import requests


def generate_motion(
    audio_url: Optional[str] = None,
    text: Optional[str] = None,
    api_url: str = "http://localhost:8000"
) -> np.ndarray:
    """Generate motion using the inference API."""
    response = requests.post(
        f"{api_url}/generate",
        json={
            "audio_url": audio_url,  # must be a URL the server can fetch, not a local path
            "text_prompt": text,
            "use_rag": True
        },
        timeout=120  # generation can be slow; the value is an arbitrary choice
    )
    response.raise_for_status()
    data = response.json()
    return np.array(data['motion'])


# Example usage
if __name__ == "__main__":
    motion = generate_motion(text="energetic hip-hop dance with arm waves")
    print(f"Generated {motion.shape[0]} frames of motion")
---
7. Evaluation Metrics
7.1 Quality Metrics
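`frechet_inception_distance` delegates to `_compute_fid`, which is not shown. The Fréchet distance between two Gaussians is ||mu1 - mu2||^2 + Tr(S1 + S2 - 2 (S1 S2)^(1/2)). The general case needs a matrix square root (for example `scipy.linalg.sqrtm`). The sketch here covers only the special case of diagonal covariances, where the trace term reduces to a per-dimension sum. Treating the covariances as diagonal is a simplifying assumption for the illustration, not a statement about what the evaluation code does.

```python
import math


def frechet_distance_diag(mu1: list, var1: list, mu2: list, var2: list) -> float:
    """Fréchet distance between two Gaussians with diagonal covariances."""
    # Squared Euclidean distance between the means
    mean_term = sum((a - b) ** 2 for a, b in zip(mu1, mu2))
    # Tr(S1 + S2 - 2 sqrt(S1 S2)) collapses to a sum over dimensions
    trace_term = sum(v1 + v2 - 2.0 * math.sqrt(v1 * v2) for v1, v2 in zip(var1, var2))
    return mean_term + trace_term
```

Identical distributions give a distance of zero, and shifting the mean by `(3, 4)` with equal variances gives 25, the squared length of the shift.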
# core/cc-ml/cc_motiongen/evaluation/benchmarks.py
from typing import List
import numpy as np
class MotionQualityMetrics:
"""Evaluation metrics for generated motion."""
def __init__(self):
self.fid_model = load_fid_model()
def frechet_inception_distance(
self,
generated: np.ndarray,
real: np.ndarray
) -> float:
"""Compute FID between generated and real motion distributions."""
gen_features = self.fid_model.extract_features(generated)
real_features = self.fid_model.extract_features(real)
mu_gen, sigma_gen = gen_features.mean(0), np.cov(gen_features, rowvar=False)
mu_real, sigma_real = real_features.mean(0), np.cov(real_features, rowvar=False)
return self._compute_fid(mu_gen, sigma_gen, mu_real, sigma_real)
def diversity(self, motions: np.ndarray) -> float:
"""Compute diversity of generated motions."""
n = len(motions)
distances = []
for i in range(n):
for j in range(i + 1, n):
dist = np.mean(np.abs(motions[i] - motions[j]))
distances.append(dist)
return np.mean(distances)
def multimodality(
self,
motions_per_condition: List[np.ndarray]
) -> float:
"""Compute multimodality (diversity given same condition)."""
multimod_scores = []
for motions in motions_per_condition:
if len(motions) < 2:
continue
multimod_scores.append(self.diversity(motions))
return np.mean(multimod_scores)
def foot_skating(self, motion: np.ndarray) -> float:
"""Compute foot skating metric (lower is better)."""
foot_joints = [7, 8, 10, 11] # Left/right ankle and foot in the standard 22-joint SMPL ordering (16, 17, 20, 21 are shoulders and wrists there)
velocities = np.diff(motion[:, foot_joints], axis=0)
skating_frames = 0
for t in range(len(velocities)):
for joint in range(4):
# Check if foot is on ground (y < threshold)
if motion[t, foot_joints[joint], 1] < 0.05:
# Check if moving (skating)
vel_mag = np.linalg.norm(velocities[t, joint])
if vel_mag > 0.01: # Velocity threshold
skating_frames += 1
return skating_frames / (len(velocities) * 4)
def beat_alignment(
self,
motion: np.ndarray,
beats: np.ndarray,
fps: int = 30
) -> float:
"""Compute alignment between motion peaks and music beats."""
# Compute motion velocity
velocity = np.linalg.norm(np.diff(motion, axis=0), axis=(1, 2))
# Find motion peaks
from scipy.signal import find_peaks
motion_peaks, _ = find_peaks(velocity, height=np.mean(velocity))
motion_peak_times = motion_peaks / fps
# Compute alignment score
alignments = []
for beat_time in beats:
if len(motion_peak_times) == 0:
continue
closest_peak = motion_peak_times[
np.argmin(np.abs(motion_peak_times - beat_time))
]
alignments.append(1.0 - min(abs(closest_peak - beat_time), 0.2) / 0.2)
return np.mean(alignments) if alignments else 0.0
7.2 Evaluation Harness
# core/cc-ml/cc_motiongen/evaluation/harness.py
import numpy as np
import torch


class EvaluationHarness:
    """Complete evaluation pipeline."""

    def __init__(self, model, test_dataset):
        self.model = model
        self.test_dataset = test_dataset
        self.metrics = MotionQualityMetrics()

    def run_evaluation(self, num_samples: int = 1000, samples_per_condition: int = 2) -> dict:
        """Run full evaluation suite."""
        generated_motions = []
        real_motions = []
        per_condition = []
        beats = []
        for i, item in enumerate(self.test_dataset):
            if i >= num_samples:
                break
            # Each item is assumed to be one example:
            # context (D,), motion (T, J, 3), beats (N,) in seconds
            context = torch.as_tensor(item['context'], dtype=torch.float32).unsqueeze(0)
            # Generate several samples for the same condition so that
            # multimodality compares motions that share a condition
            samples = self.model.sample(
                context.expand(samples_per_condition, -1)
            ).cpu().numpy()
            generated_motions.append(samples[0])
            per_condition.append(samples)
            real_motions.append(np.asarray(item['motion']))
            beats.append(np.asarray(item['beats']))
        generated = np.stack(generated_motions)
        real = np.stack(real_motions)
        results = {
            'fid': self.metrics.frechet_inception_distance(generated, real),
            'diversity': self.metrics.diversity(generated),
            'multimodality': self.metrics.multimodality(per_condition),
            'foot_skating': float(np.mean([
                self.metrics.foot_skating(m) for m in generated
            ])),
            'beat_alignment': float(np.mean([
                self.metrics.beat_alignment(g, b)
                for g, b in zip(generated, beats)
            ]))
        }
        return results
---
Document Version: 2.0.0
Generated: December 26, 2024