Sensor Fusion Implementation Specification
This document provides detailed implementation specifications for the Mocopi + MediaPipe sensor fusion system. It includes code scaffolds, API contracts, and integration points with existing Comp-Core infrastructure.
Full Public Reader
Sensor Fusion Implementation Specification
Version 1.0 | December 2024
Status: Ready for Implementation
---
Overview
This document provides detailed implementation specifications for the Mocopi + MediaPipe sensor fusion system. It includes code scaffolds, API contracts, and integration points with existing Comp-Core infrastructure.
---
1. Module Structure
core/cc-core/cc_core/fusion/
├── __init__.py
├── unified_receiver.py # Combined Mocopi + MediaPipe ingestion
├── temporal_aligner.py # Clock synchronization
├── kalman_fusion.py # Per-limb Kalman filter
├── occlusion_handler.py # Dropout recovery
├── skeleton_builder.py # Unified skeleton output
├── transforms/
│ ├── __init__.py
│ ├── to_25d.py # Standard CC-MotionGen format
│ ├── to_63d.py # Extended format with face/hands
│ └── mediapipe_to_bones.py # MediaPipe landmarks → bone transforms
└── capture/
├── __init__.py
├── session_manager.py # Recording sessions
├── audio_sync.py # Audio-motion synchronization
├── take_manager.py # Multiple takes per phrase
└── validator.py # Quality validation---
2. Core Classes
2.1 UnifiedReceiver
# core/cc-core/cc_core/fusion/unified_receiver.py
"""
Unified receiver for Mocopi and MediaPipe data streams.
This module provides a single entry point for receiving motion data
from both IMU sensors (Mocopi) and camera-based tracking (MediaPipe),
outputting a combined stream ready for sensor fusion.
"""
from __future__ import annotations
import asyncio
from dataclasses import dataclass, field
from typing import AsyncIterator, Callable, Optional
from enum import Enum
import numpy as np
from cc_core.integrations.mocopi_bridge import MocopiReceiver, PoseFrame
from cc_core.skeleton.mapping import MocopiToCC
class StreamSource(Enum):
MOCOPI = "mocopi"
MEDIAPIPE = "mediapipe"
FUSED = "fused"
@dataclass
class MediaPipeFrame:
"""
MediaPipe holistic result in a format compatible with fusion.
"""
timestamp_ms: float
frame_number: int
# Pose landmarks (33 points)
pose_landmarks: Optional[np.ndarray] = None # [33, 4] (x, y, z, vis)
pose_world_landmarks: Optional[np.ndarray] = None # [33, 3] metric
# Face landmarks (468 points)
face_landmarks: Optional[np.ndarray] = None # [468, 2] (x, y)
# Hand landmarks (21 points each)
left_hand_landmarks: Optional[np.ndarray] = None # [21, 4]
right_hand_landmarks: Optional[np.ndarray] = None # [21, 4]
# Confidence scores
confidence: dict = field(default_factory=lambda: {
'pose': 0.0, 'face': 0.0, 'left_hand': 0.0, 'right_hand': 0.0
})
@dataclass
class UnifiedFrame:
"""
Combined frame from both sensors ready for fusion.
"""
timestamp_ms: float
frame_number: int
session_id: str
# Raw data from each source
mocopi_frame: Optional[PoseFrame] = None
mediapipe_frame: Optional[MediaPipeFrame] = None
# Availability flags
has_mocopi: bool = False
has_mediapipe: bool = False
# Temporal alignment info
mocopi_lag_ms: float = 0.0
mediapipe_lag_ms: float = 0.0
class UnifiedReceiver:
"""
Unified receiver that combines Mocopi and MediaPipe streams.
Usage:
async with UnifiedReceiver() as receiver:
async for frame in receiver.stream():
# frame contains both Mocopi and MediaPipe data
fused = fusion_engine.fuse(frame)
"""
def __init__(
self,
mocopi_port: int = 12351,
mediapipe_ws_url: str = "ws://localhost:8766/mediapipe",
session_id: str = None,
target_fps: float = 60.0,
):
self.mocopi_port = mocopi_port
self.mediapipe_ws_url = mediapipe_ws_url
self.session_id = session_id or self._generate_session_id()
self.target_fps = target_fps
# Internal receivers
self._mocopi_receiver: Optional[MocopiReceiver] = None
self._mediapipe_ws = None
# Buffers for alignment
self._mocopi_buffer: asyncio.Queue = asyncio.Queue(maxsize=10)
self._mediapipe_buffer: asyncio.Queue = asyncio.Queue(maxsize=5)
# Timing
self._session_start: Optional[float] = None
self._frame_counter = 0
# Callbacks
self._on_mocopi_frame: list[Callable] = []
self._on_mediapipe_frame: list[Callable] = []
self._on_unified_frame: list[Callable] = []
async def __aenter__(self):
await self.start()
return self
async def __aexit__(self, *args):
await self.stop()
async def start(self):
"""Start receiving from both sources."""
import time
self._session_start = time.perf_counter()
# Start Mocopi receiver
self._mocopi_receiver = MocopiReceiver(port=self.mocopi_port)
asyncio.create_task(self._receive_mocopi())
# Start MediaPipe WebSocket connection
asyncio.create_task(self._receive_mediapipe())
async def stop(self):
"""Stop all receivers."""
if self._mocopi_receiver:
await self._mocopi_receiver.close()
if self._mediapipe_ws:
await self._mediapipe_ws.close()
async def stream(self) -> AsyncIterator[UnifiedFrame]:
"""
Yield unified frames at target FPS.
Aligns Mocopi (60Hz) and MediaPipe (30Hz) streams
to produce unified frames at the target rate.
"""
import time
frame_interval = 1.0 / self.target_fps
while True:
# Calculate target timestamp
elapsed = time.perf_counter() - self._session_start
target_ts = elapsed * 1000 # ms
# Get latest frames from each source
mocopi_frame = await self._get_nearest_mocopi(target_ts)
mediapipe_frame = await self._get_nearest_mediapipe(target_ts)
# Build unified frame
unified = UnifiedFrame(
timestamp_ms=target_ts,
frame_number=self._frame_counter,
session_id=self.session_id,
mocopi_frame=mocopi_frame,
mediapipe_frame=mediapipe_frame,
has_mocopi=mocopi_frame is not None,
has_mediapipe=mediapipe_frame is not None,
mocopi_lag_ms=abs(target_ts - mocopi_frame.t_device) if mocopi_frame else 0,
mediapipe_lag_ms=abs(target_ts - mediapipe_frame.timestamp_ms) if mediapipe_frame else 0,
)
self._frame_counter += 1
# Fire callbacks
for cb in self._on_unified_frame:
cb(unified)
yield unified
# Wait for next frame
await asyncio.sleep(frame_interval)
async def _receive_mocopi(self):
"""Background task to receive Mocopi frames."""
async for frame in self._mocopi_receiver.stream():
try:
self._mocopi_buffer.put_nowait(frame)
except asyncio.QueueFull:
# Drop oldest frame
try:
self._mocopi_buffer.get_nowait()
self._mocopi_buffer.put_nowait(frame)
except:
pass
for cb in self._on_mocopi_frame:
cb(frame)
async def _receive_mediapipe(self):
"""Background task to receive MediaPipe frames via WebSocket."""
import websockets
import json
while True:
try:
async with websockets.connect(self.mediapipe_ws_url) as ws:
self._mediapipe_ws = ws
async for message in ws:
frame = self._parse_mediapipe_message(message)
try:
self._mediapipe_buffer.put_nowait(frame)
except asyncio.QueueFull:
try:
self._mediapipe_buffer.get_nowait()
self._mediapipe_buffer.put_nowait(frame)
except:
pass
for cb in self._on_mediapipe_frame:
cb(frame)
except Exception as e:
print(f"MediaPipe connection error: {e}, reconnecting...")
await asyncio.sleep(2.0)
def _parse_mediapipe_message(self, message: str | bytes) -> MediaPipeFrame:
"""Parse MediaPipe WebSocket message to MediaPipeFrame."""
import json
if isinstance(message, bytes):
return self._parse_mediapipe_binary(message)
data = json.loads(message)
return MediaPipeFrame(
timestamp_ms=data.get('timestamp_ms', 0),
frame_number=data.get('frame_number', 0),
pose_landmarks=np.array(data['poseLandmarks']) if data.get('poseLandmarks') else None,
pose_world_landmarks=np.array(data['poseWorldLandmarks']) if data.get('poseWorldLandmarks') else None,
face_landmarks=np.array(data['faceLandmarks']) if data.get('faceLandmarks') else None,
left_hand_landmarks=np.array(data['leftHandLandmarks']) if data.get('leftHandLandmarks') else None,
right_hand_landmarks=np.array(data['rightHandLandmarks']) if data.get('rightHandLandmarks') else None,
confidence=data.get('confidence', {}),
)
async def _get_nearest_mocopi(self, target_ts: float) -> Optional[PoseFrame]:
"""Get the Mocopi frame nearest to target timestamp."""
best_frame = None
best_diff = float('inf')
# Drain buffer and find nearest
frames = []
while not self._mocopi_buffer.empty():
try:
frame = self._mocopi_buffer.get_nowait()
frames.append(frame)
diff = abs(frame.t_device - target_ts)
if diff < best_diff:
best_diff = diff
best_frame = frame
except:
break
# Put back frames that are still useful (future frames)
for f in frames:
if f.t_device > target_ts - 50: # Keep frames within 50ms
try:
self._mocopi_buffer.put_nowait(f)
except:
pass
return best_frame
async def _get_nearest_mediapipe(self, target_ts: float) -> Optional[MediaPipeFrame]:
"""Get the MediaPipe frame nearest to target timestamp."""
# Similar logic to _get_nearest_mocopi
best_frame = None
best_diff = float('inf')
frames = []
while not self._mediapipe_buffer.empty():
try:
frame = self._mediapipe_buffer.get_nowait()
frames.append(frame)
diff = abs(frame.timestamp_ms - target_ts)
if diff < best_diff:
best_diff = diff
best_frame = frame
except:
break
for f in frames:
if f.timestamp_ms > target_ts - 100:
try:
self._mediapipe_buffer.put_nowait(f)
except:
pass
return best_frame
@staticmethod
def _generate_session_id() -> str:
import uuid
from datetime import datetime
return f"fusion_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:8]}"2.2 KalmanFusion
# core/cc-core/cc_core/fusion/kalman_fusion.py
"""
Kalman filter-based sensor fusion for combining Mocopi and MediaPipe data.
This module implements an Extended Kalman Filter (EKF) for each limb,
fusing IMU orientation data from Mocopi with position data from MediaPipe.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Dict, Optional, Tuple
import numpy as np
from scipy.spatial.transform import Rotation
@dataclass
class FusionConfig:
"""Configuration for Kalman fusion."""
# Mocopi measurement weights
mocopi_orientation_confidence: float = 0.95
mocopi_position_confidence: float = 0.70
# MediaPipe measurement weights
mediapipe_position_confidence: float = 0.80
mediapipe_orientation_confidence: float = 0.40
# Process noise (state transition uncertainty)
position_process_noise: float = 0.01
orientation_process_noise: float = 0.001
velocity_process_noise: float = 0.1
# Smoothing factor for output (0 = no smoothing, 1 = max smoothing)
output_smoothing: float = 0.3
@dataclass
class LimbState:
"""State of a single limb after fusion."""
position: np.ndarray # [x, y, z]
quaternion: np.ndarray # [qw, qx, qy, qz]
linear_velocity: np.ndarray # [vx, vy, vz]
angular_velocity: np.ndarray # [wx, wy, wz]
# Source attribution
orientation_source: str # "mocopi" | "mediapipe" | "fused"
position_source: str
# Uncertainty
position_covariance: np.ndarray # [3, 3]
orientation_covariance: np.ndarray # [4, 4]
confidence: float
class LimbKalmanFilter:
"""
Extended Kalman Filter for a single limb.
State vector (13D):
[0:3] - Position (x, y, z)
[3:7] - Orientation quaternion (qw, qx, qy, qz)
[7:10] - Linear velocity (vx, vy, vz)
[10:13] - Angular velocity (wx, wy, wz)
"""
def __init__(self, limb_name: str, config: FusionConfig):
self.limb_name = limb_name
self.config = config
# State vector
self.x = np.zeros(13)
self.x[3] = 1.0 # Identity quaternion (w=1)
# State covariance
self.P = np.eye(13) * 0.1
# Process noise covariance
self.Q = np.diag([
config.position_process_noise,
config.position_process_noise,
config.position_process_noise,
config.orientation_process_noise,
config.orientation_process_noise,
config.orientation_process_noise,
config.orientation_process_noise,
config.velocity_process_noise,
config.velocity_process_noise,
config.velocity_process_noise,
config.velocity_process_noise * 0.5,
config.velocity_process_noise * 0.5,
config.velocity_process_noise * 0.5,
])
# Measurement noise for Mocopi (high orientation confidence)
self.R_mocopi = np.diag([
0.02, 0.02, 0.02, # Position (medium)
0.001, 0.001, 0.001, 0.001, # Quaternion (very low - IMU accurate)
])
# Measurement noise for MediaPipe (high position confidence)
self.R_mediapipe = np.diag([
0.01, 0.01, 0.05, # Position (good XY, weaker Z)
0.1, 0.1, 0.1, 0.1, # Quaternion (derived, less reliable)
])
# Last update time
self.last_update_time: Optional[float] = None
# Smoothed output
self._smoothed_state: Optional[LimbState] = None
def predict(self, dt: float):
"""
Predict state forward by dt seconds.
"""
# State transition: position += velocity * dt
self.x[0:3] += self.x[7:10] * dt
# Quaternion integration from angular velocity
omega = self.x[10:13]
omega_quat = np.array([0, omega[0], omega[1], omega[2]]) * dt / 2
q = self.x[3:7]
q_new = q + self._quaternion_multiply(q, omega_quat)
self.x[3:7] = q_new / np.linalg.norm(q_new) # Normalize
# State transition Jacobian (linearized)
F = np.eye(13)
F[0:3, 7:10] = np.eye(3) * dt
# Quaternion Jacobian is more complex, approximated
# Update covariance
self.P = F @ self.P @ F.T + self.Q
def update_mocopi(
self,
position: np.ndarray,
quaternion: np.ndarray,
timestamp_ms: float,
):
"""
Update with Mocopi measurement (trust orientation highly).
"""
# Measurement vector
z = np.concatenate([position, quaternion])
# Measurement matrix (direct observation)
H = np.zeros((7, 13))
H[0:3, 0:3] = np.eye(3) # Position
H[3:7, 3:7] = np.eye(4) # Quaternion
# Kalman gain
S = H @ self.P @ H.T + self.R_mocopi
K = self.P @ H.T @ np.linalg.inv(S)
# State update
y = z - H @ self.x # Innovation
# Handle quaternion sign ambiguity
if np.dot(z[3:7], self.x[3:7]) < 0:
y[3:7] = -z[3:7] - self.x[3:7]
self.x = self.x + K @ y
# Normalize quaternion
self.x[3:7] /= np.linalg.norm(self.x[3:7])
# Covariance update
self.P = (np.eye(13) - K @ H) @ self.P
self.last_update_time = timestamp_ms
def update_mediapipe(
self,
position: np.ndarray,
quaternion: Optional[np.ndarray],
timestamp_ms: float,
confidence: float,
):
"""
Update with MediaPipe measurement (trust position, less so orientation).
"""
if confidence < 0.3:
return # Skip low-confidence observations
# Scale measurement noise by inverse confidence
R = self.R_mediapipe.copy()
R *= (1.0 / max(confidence, 0.1))
# Build measurement
if quaternion is not None:
z = np.concatenate([position, quaternion])
H = np.zeros((7, 13))
H[0:3, 0:3] = np.eye(3)
H[3:7, 3:7] = np.eye(4)
else:
# Position only
z = position
H = np.zeros((3, 13))
H[0:3, 0:3] = np.eye(3)
R = R[0:3, 0:3]
# Kalman update
S = H @ self.P @ H.T + R
K = self.P @ H.T @ np.linalg.inv(S)
y = z - H @ self.x
self.x = self.x + K @ y
self.x[3:7] /= np.linalg.norm(self.x[3:7])
self.P = (np.eye(13) - K @ H) @ self.P
self.last_update_time = timestamp_ms
def get_state(self) -> LimbState:
"""Get current fused state."""
state = LimbState(
position=self.x[0:3].copy(),
quaternion=self.x[3:7].copy(),
linear_velocity=self.x[7:10].copy(),
angular_velocity=self.x[10:13].copy(),
orientation_source="fused",
position_source="fused",
position_covariance=self.P[0:3, 0:3].copy(),
orientation_covariance=self.P[3:7, 3:7].copy(),
confidence=self._compute_confidence(),
)
# Apply smoothing
if self._smoothed_state is not None:
alpha = self.config.output_smoothing
state.position = (1 - alpha) * state.position + alpha * self._smoothed_state.position
state.quaternion = self._slerp(
self._smoothed_state.quaternion,
state.quaternion,
1 - alpha
)
self._smoothed_state = state
return state
def _compute_confidence(self) -> float:
"""Compute confidence from state covariance."""
pos_var = np.trace(self.P[0:3, 0:3])
orient_var = np.trace(self.P[3:7, 3:7])
# Lower variance = higher confidence
confidence = 1.0 / (1.0 + pos_var + orient_var)
return float(np.clip(confidence, 0, 1))
@staticmethod
def _quaternion_multiply(q1: np.ndarray, q2: np.ndarray) -> np.ndarray:
"""Multiply two quaternions."""
w1, x1, y1, z1 = q1
w2, x2, y2, z2 = q2
return np.array([
w1*w2 - x1*x2 - y1*y2 - z1*z2,
w1*x2 + x1*w2 + y1*z2 - z1*y2,
w1*y2 - x1*z2 + y1*w2 + z1*x2,
w1*z2 + x1*y2 - y1*x2 + z1*w2,
])
@staticmethod
def _slerp(q1: np.ndarray, q2: np.ndarray, t: float) -> np.ndarray:
"""Spherical linear interpolation between quaternions."""
dot = np.dot(q1, q2)
if dot < 0:
q2 = -q2
dot = -dot
if dot > 0.9995:
result = q1 + t * (q2 - q1)
return result / np.linalg.norm(result)
theta = np.arccos(dot)
sin_theta = np.sin(theta)
s1 = np.sin((1 - t) * theta) / sin_theta
s2 = np.sin(t * theta) / sin_theta
return s1 * q1 + s2 * q2
class SensorFusionEngine:
"""
Main fusion engine coordinating all limb filters.
Usage:
fusion = SensorFusionEngine()
# For each frame:
fusion.update(unified_frame)
fused_skeleton = fusion.get_fused_skeleton()
"""
LIMB_NAMES = [
'hip', 'head',
'left_shoulder', 'right_shoulder',
'left_elbow', 'right_elbow',
'left_wrist', 'right_wrist',
'left_hip', 'right_hip',
'left_knee', 'right_knee',
'left_ankle', 'right_ankle',
]
def __init__(self, config: Optional[FusionConfig] = None):
self.config = config or FusionConfig()
# Create Kalman filter per limb
self.filters: Dict[str, LimbKalmanFilter] = {
name: LimbKalmanFilter(name, self.config)
for name in self.LIMB_NAMES
}
# Limb-specific fusion rules
self.fusion_rules = self._build_fusion_rules()
# Last frame time for dt calculation
self._last_frame_time: Optional[float] = None
def _build_fusion_rules(self) -> Dict[str, dict]:
"""
Build per-limb fusion rules.
Some limbs trust IMU more (orientation), others trust vision more (position).
"""
return {
# Core body - IMU primary for orientation
'hip': {'orientation': 'mocopi', 'position': 'mocopi'},
'head': {'orientation': 'mocopi', 'position': 'fused'},
# Shoulders - blend
'left_shoulder': {'orientation': 'fused', 'position': 'mediapipe'},
'right_shoulder': {'orientation': 'fused', 'position': 'mediapipe'},
# Elbows - vision primary (no IMU here in base kit)
'left_elbow': {'orientation': 'mediapipe', 'position': 'mediapipe'},
'right_elbow': {'orientation': 'mediapipe', 'position': 'mediapipe'},
# Wrists - IMU orientation, fused position
'left_wrist': {'orientation': 'mocopi', 'position': 'fused'},
'right_wrist': {'orientation': 'mocopi', 'position': 'fused'},
# Hips (leg) - blend
'left_hip': {'orientation': 'fused', 'position': 'mediapipe'},
'right_hip': {'orientation': 'fused', 'position': 'mediapipe'},
# Knees - vision primary
'left_knee': {'orientation': 'mediapipe', 'position': 'mediapipe'},
'right_knee': {'orientation': 'mediapipe', 'position': 'mediapipe'},
# Ankles - IMU orientation
'left_ankle': {'orientation': 'mocopi', 'position': 'fused'},
'right_ankle': {'orientation': 'mocopi', 'position': 'fused'},
}
def update(self, frame: 'UnifiedFrame'):
"""
Update all filters with new frame data.
"""
# Compute dt
dt = 1/60.0 # Default
if self._last_frame_time is not None:
dt = (frame.timestamp_ms - self._last_frame_time) / 1000.0
dt = np.clip(dt, 0.001, 0.1) # Clamp to reasonable range
self._last_frame_time = frame.timestamp_ms
# Predict all filters forward
for kf in self.filters.values():
kf.predict(dt)
# Update with Mocopi data
if frame.has_mocopi and frame.mocopi_frame:
self._update_from_mocopi(frame.mocopi_frame, frame.timestamp_ms)
# Update with MediaPipe data
if frame.has_mediapipe and frame.mediapipe_frame:
self._update_from_mediapipe(frame.mediapipe_frame, frame.timestamp_ms)
def _update_from_mocopi(self, mocopi: 'PoseFrame', timestamp_ms: float):
"""Extract limb data from Mocopi and update filters."""
from cc_core.skeleton.mapping import MOCOPI_LIMB_BONE_MAP
for limb_name, bone_id in MOCOPI_LIMB_BONE_MAP.items():
if limb_name not in self.filters:
continue
if bone_id not in mocopi.bones:
continue
bone = mocopi.bones[bone_id]
self.filters[limb_name].update_mocopi(
position=bone.position,
quaternion=bone.quaternion,
timestamp_ms=timestamp_ms,
)
def _update_from_mediapipe(self, mp: 'MediaPipeFrame', timestamp_ms: float):
"""Extract limb data from MediaPipe and update filters."""
from cc_core.fusion.transforms.mediapipe_to_bones import MediaPipeToBones
converter = MediaPipeToBones()
limb_data = converter.extract_limbs(mp)
for limb_name, data in limb_data.items():
if limb_name not in self.filters:
continue
self.filters[limb_name].update_mediapipe(
position=data['position'],
quaternion=data.get('quaternion'),
timestamp_ms=timestamp_ms,
confidence=data.get('confidence', 0.5),
)
def get_fused_skeleton(self) -> Dict[str, LimbState]:
"""Get the current fused skeleton state."""
return {
name: kf.get_state()
for name, kf in self.filters.items()
}2.3 MediaPipe to Bones Transform
# core/cc-core/cc_core/fusion/transforms/mediapipe_to_bones.py
"""
Transform MediaPipe landmarks to bone-compatible format.
Converts MediaPipe's 33 pose landmarks into position and derived
orientation data that can be fed into the fusion engine.
"""
from typing import Dict, Optional
import numpy as np
# MediaPipe pose landmark indices
class PoseLandmark:
NOSE = 0
LEFT_EYE_INNER = 1
LEFT_EYE = 2
LEFT_EYE_OUTER = 3
RIGHT_EYE_INNER = 4
RIGHT_EYE = 5
RIGHT_EYE_OUTER = 6
LEFT_EAR = 7
RIGHT_EAR = 8
MOUTH_LEFT = 9
MOUTH_RIGHT = 10
LEFT_SHOULDER = 11
RIGHT_SHOULDER = 12
LEFT_ELBOW = 13
RIGHT_ELBOW = 14
LEFT_WRIST = 15
RIGHT_WRIST = 16
LEFT_PINKY = 17
RIGHT_PINKY = 18
LEFT_INDEX = 19
RIGHT_INDEX = 20
LEFT_THUMB = 21
RIGHT_THUMB = 22
LEFT_HIP = 23
RIGHT_HIP = 24
LEFT_KNEE = 25
RIGHT_KNEE = 26
LEFT_ANKLE = 27
RIGHT_ANKLE = 28
LEFT_HEEL = 29
RIGHT_HEEL = 30
LEFT_FOOT_INDEX = 31
RIGHT_FOOT_INDEX = 32
class MediaPipeToBones:
"""
Convert MediaPipe landmarks to bone transforms.
"""
# Mapping: fusion limb name -> (primary landmark, secondary for orientation)
LIMB_LANDMARKS = {
'hip': (PoseLandmark.LEFT_HIP, PoseLandmark.RIGHT_HIP),
'head': (PoseLandmark.NOSE, PoseLandmark.LEFT_EAR),
'left_shoulder': (PoseLandmark.LEFT_SHOULDER, PoseLandmark.LEFT_ELBOW),
'right_shoulder': (PoseLandmark.RIGHT_SHOULDER, PoseLandmark.RIGHT_ELBOW),
'left_elbow': (PoseLandmark.LEFT_ELBOW, PoseLandmark.LEFT_WRIST),
'right_elbow': (PoseLandmark.RIGHT_ELBOW, PoseLandmark.RIGHT_WRIST),
'left_wrist': (PoseLandmark.LEFT_WRIST, PoseLandmark.LEFT_INDEX),
'right_wrist': (PoseLandmark.RIGHT_WRIST, PoseLandmark.RIGHT_INDEX),
'left_hip': (PoseLandmark.LEFT_HIP, PoseLandmark.LEFT_KNEE),
'right_hip': (PoseLandmark.RIGHT_HIP, PoseLandmark.RIGHT_KNEE),
'left_knee': (PoseLandmark.LEFT_KNEE, PoseLandmark.LEFT_ANKLE),
'right_knee': (PoseLandmark.RIGHT_KNEE, PoseLandmark.RIGHT_ANKLE),
'left_ankle': (PoseLandmark.LEFT_ANKLE, PoseLandmark.LEFT_FOOT_INDEX),
'right_ankle': (PoseLandmark.RIGHT_ANKLE, PoseLandmark.RIGHT_FOOT_INDEX),
}
def __init__(self, use_world_landmarks: bool = True):
"""
Args:
use_world_landmarks: Use metric 3D coordinates if available
"""
self.use_world_landmarks = use_world_landmarks
def extract_limbs(self, frame: 'MediaPipeFrame') -> Dict[str, dict]:
"""
Extract limb data from MediaPipe frame.
Returns:
Dict mapping limb names to {position, quaternion, confidence}
"""
# Select landmark source
if self.use_world_landmarks and frame.pose_world_landmarks is not None:
landmarks = frame.pose_world_landmarks
confidence_source = frame.pose_landmarks # Get visibility from normalized
else:
landmarks = frame.pose_landmarks
confidence_source = landmarks
if landmarks is None:
return {}
result = {}
for limb_name, (primary_idx, secondary_idx) in self.LIMB_LANDMARKS.items():
# Get primary landmark position
if limb_name == 'hip':
# Hip is midpoint of left/right
left_hip = landmarks[PoseLandmark.LEFT_HIP]
right_hip = landmarks[PoseLandmark.RIGHT_HIP]
position = (left_hip[:3] + right_hip[:3]) / 2
confidence = min(
self._get_visibility(confidence_source, PoseLandmark.LEFT_HIP),
self._get_visibility(confidence_source, PoseLandmark.RIGHT_HIP)
)
else:
position = landmarks[primary_idx][:3]
confidence = self._get_visibility(confidence_source, primary_idx)
# Derive orientation from limb direction
quaternion = self._derive_quaternion(
landmarks, primary_idx, secondary_idx
)
result[limb_name] = {
'position': np.array(position, dtype=np.float32),
'quaternion': quaternion,
'confidence': confidence,
}
return result
def _get_visibility(self, landmarks: np.ndarray, idx: int) -> float:
"""Get landmark visibility/confidence."""
if landmarks is None or idx >= len(landmarks):
return 0.0
if landmarks.shape[1] > 3:
return float(landmarks[idx, 3])
return 0.5 # Default if no visibility data
def _derive_quaternion(
self,
landmarks: np.ndarray,
from_idx: int,
to_idx: int,
) -> Optional[np.ndarray]:
"""
Derive orientation quaternion from limb direction.
This creates a quaternion that rotates the Y-axis to align
with the direction from `from_idx` to `to_idx`.
"""
from_pos = landmarks[from_idx][:3]
to_pos = landmarks[to_idx][:3]
direction = to_pos - from_pos
length = np.linalg.norm(direction)
if length < 0.001:
return None # Undefined orientation
direction = direction / length
# Create rotation from Y-axis to direction
y_axis = np.array([0, 1, 0])
# Handle parallel case
dot = np.dot(y_axis, direction)
if abs(dot) > 0.9999:
if dot > 0:
return np.array([1, 0, 0, 0]) # Identity
else:
return np.array([0, 1, 0, 0]) # 180 deg around X
# Rotation axis and angle
axis = np.cross(y_axis, direction)
axis = axis / np.linalg.norm(axis)
angle = np.arccos(np.clip(dot, -1, 1))
# Axis-angle to quaternion
half_angle = angle / 2
w = np.cos(half_angle)
xyz = axis * np.sin(half_angle)
return np.array([w, xyz[0], xyz[1], xyz[2]], dtype=np.float32)---
3. Frontend Integration
3.1 WebSocket Bridge for MediaPipe
To enable the Python fusion engine to receive MediaPipe data from the web dashboard, we need a WebSocket bridge.
// apps/web/cc-dashboard/src/lib/mediapipe/MediaPipeBridge.ts
/**
* WebSocket bridge to stream MediaPipe data to Python backend.
*
* This enables the fusion engine to receive camera-based landmarks
* alongside Mocopi IMU data.
*/
import { HolisticPipeline } from './pipeline'
import type { HolisticResult } from './types'
export interface BridgeConfig {
wsUrl: string
reconnectInterval: number
binaryMode: boolean
}
export class MediaPipeBridge {
private pipeline: HolisticPipeline
private ws: WebSocket | null = null
private config: BridgeConfig
private reconnectTimer: number | null = null
private frameCounter = 0
constructor(pipeline: HolisticPipeline, config: Partial<BridgeConfig> = {}) {
this.pipeline = pipeline
this.config = {
wsUrl: config.wsUrl ?? 'ws://localhost:8766/mediapipe',
reconnectInterval: config.reconnectInterval ?? 2000,
binaryMode: config.binaryMode ?? false,
}
// Listen to pipeline frames
this.pipeline.on('frame', this.handleFrame.bind(this))
}
connect(): void {
if (this.ws?.readyState === WebSocket.OPEN) return
try {
this.ws = new WebSocket(this.config.wsUrl)
this.ws.onopen = () => {
console.log('[MediaPipeBridge] Connected to fusion backend')
this.sendHandshake()
}
this.ws.onclose = () => {
console.log('[MediaPipeBridge] Disconnected, reconnecting...')
this.scheduleReconnect()
}
this.ws.onerror = (error) => {
console.error('[MediaPipeBridge] WebSocket error:', error)
}
} catch (error) {
console.error('[MediaPipeBridge] Connection failed:', error)
this.scheduleReconnect()
}
}
disconnect(): void {
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer)
this.reconnectTimer = null
}
if (this.ws) {
this.ws.close()
this.ws = null
}
}
private sendHandshake(): void {
this.send({
type: 'handshake',
source: 'mediapipe',
version: '1.0',
capabilities: {
pose: true,
face: true,
hands: true,
worldLandmarks: true,
},
})
}
private handleFrame(result: HolisticResult): void {
if (!this.ws || this.ws.readyState !== WebSocket.OPEN) return
this.frameCounter++
if (this.config.binaryMode) {
this.sendBinary(result)
} else {
this.sendJSON(result)
}
}
private sendJSON(result: HolisticResult): void {
const message = {
type: 'frame',
timestamp_ms: result.timestamp_ms,
frame_number: this.frameCounter,
poseLandmarks: result.poseLandmarks,
poseWorldLandmarks: result.poseWorldLandmarks,
faceLandmarks: result.faceLandmarks,
leftHandLandmarks: result.leftHandLandmarks,
rightHandLandmarks: result.rightHandLandmarks,
confidence: result.confidence,
}
this.send(message)
}
private sendBinary(result: HolisticResult): void {
// Binary format for lower latency
// Header: 8 bytes (timestamp_ms as float64)
// Frame number: 4 bytes (uint32)
// Pose landmarks: 33 * 4 * 4 = 528 bytes
// Face landmarks: 468 * 2 * 4 = 3744 bytes (optional)
// Hands: 21 * 4 * 4 * 2 = 672 bytes (optional)
const buffer = new ArrayBuffer(8 + 4 + 528)
const view = new DataView(buffer)
// Header
view.setFloat64(0, result.timestamp_ms, true)
view.setUint32(8, this.frameCounter, true)
// Pose landmarks
if (result.poseLandmarks) {
let offset = 12
for (const lm of result.poseLandmarks) {
view.setFloat32(offset, lm.x, true)
view.setFloat32(offset + 4, lm.y, true)
view.setFloat32(offset + 8, lm.z, true)
view.setFloat32(offset + 12, lm.visibility ?? 0, true)
offset += 16
}
}
this.ws!.send(buffer)
}
private send(data: unknown): void {
if (this.ws?.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(data))
}
}
private scheduleReconnect(): void {
if (this.reconnectTimer) return
this.reconnectTimer = window.setTimeout(() => {
this.reconnectTimer = null
this.connect()
}, this.config.reconnectInterval)
}
}3.2 Zustand Store Update
// apps/web/cc-dashboard/src/store/fusionStore.ts
/**
* Zustand store for sensor fusion state.
*/
import { create } from 'zustand'
interface FusionState {
// Connection status
mocopiConnected: boolean
mediapipeConnected: boolean
fusionActive: boolean
// Per-limb status
limbStatus: Record<string, {
source: 'mocopi' | 'mediapipe' | 'fused'
confidence: number
occluded: boolean
}>
// Quality metrics
fusionQuality: number
mocopiLatency: number
mediapipeLatency: number
syncOffset: number
// Actions
setMocopiConnected: (connected: boolean) => void
setMediapipeConnected: (connected: boolean) => void
setLimbStatus: (limb: string, status: FusionState['limbStatus'][string]) => void
updateMetrics: (metrics: Partial<Pick<FusionState,
'fusionQuality' | 'mocopiLatency' | 'mediapipeLatency' | 'syncOffset'
>>) => void
}
export const useFusionStore = create<FusionState>((set) => ({
mocopiConnected: false,
mediapipeConnected: false,
fusionActive: false,
limbStatus: {},
fusionQuality: 0,
mocopiLatency: 0,
mediapipeLatency: 0,
syncOffset: 0,
setMocopiConnected: (connected) => set({
mocopiConnected: connected,
fusionActive: connected && useFusionStore.getState().mediapipeConnected,
}),
setMediapipeConnected: (connected) => set({
mediapipeConnected: connected,
fusionActive: connected && useFusionStore.getState().mocopiConnected,
}),
setLimbStatus: (limb, status) => set((state) => ({
limbStatus: { ...state.limbStatus, [limb]: status },
})),
updateMetrics: (metrics) => set(metrics),
}))---
4. Capture Session API
4.1 Session Manager
# core/cc-core/cc_core/fusion/capture/session_manager.py
"""
Manage dance capture sessions with audio synchronization.
"""
from __future__ import annotations
import asyncio
import json
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import List, Optional
import numpy as np
from cc_core.fusion.unified_receiver import UnifiedReceiver, UnifiedFrame
from cc_core.fusion.kalman_fusion import SensorFusionEngine
@dataclass
class CaptureSession:
"""A recording session capturing motion for audio phrases."""
session_id: str
performer_id: str
created_at: datetime = field(default_factory=datetime.now)
# Configuration
output_dir: Path = field(default_factory=lambda: Path("sessions"))
target_fps: float = 60.0
# State
is_recording: bool = False
current_phrase_id: Optional[str] = None
current_take: int = 0
# Captured data
frames: List[UnifiedFrame] = field(default_factory=list)
fused_skeletons: List[dict] = field(default_factory=list)
# Metadata
phrases_recorded: List[str] = field(default_factory=list)
takes_per_phrase: dict = field(default_factory=dict)
class SessionManager:
"""
Manage capture sessions for the dance data collection pipeline.
Usage:
manager = SessionManager()
session = await manager.create_session("performer_1")
# For each phrase:
await manager.start_recording(session, "phrase_007", audio_duration=8.2)
# ... performer dances ...
take = await manager.stop_recording(session)
if take.is_valid:
await manager.accept_take(session, take)
else:
await manager.reject_take(session, take)
"""
def __init__(
self,
mocopi_port: int = 12351,
mediapipe_ws_url: str = "ws://localhost:8766/mediapipe",
output_base: Path = Path("data/captures"),
):
self.mocopi_port = mocopi_port
self.mediapipe_ws_url = mediapipe_ws_url
self.output_base = output_base
# Active components
self._receiver: Optional[UnifiedReceiver] = None
self._fusion: Optional[SensorFusionEngine] = None
self._recording_task: Optional[asyncio.Task] = None
async def create_session(
self,
performer_id: str,
session_id: Optional[str] = None,
) -> CaptureSession:
"""Create a new capture session."""
if session_id is None:
session_id = f"session_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
session = CaptureSession(
session_id=session_id,
performer_id=performer_id,
output_dir=self.output_base / session_id,
)
# Create output directory
session.output_dir.mkdir(parents=True, exist_ok=True)
# Initialize receiver and fusion engine
self._receiver = UnifiedReceiver(
mocopi_port=self.mocopi_port,
mediapipe_ws_url=self.mediapipe_ws_url,
session_id=session_id,
target_fps=session.target_fps,
)
self._fusion = SensorFusionEngine()
await self._receiver.start()
return session
async def start_recording(
self,
session: CaptureSession,
phrase_id: str,
audio_duration: float,
countdown_seconds: float = 3.0,
) -> None:
"""Start recording motion for a phrase."""
if session.is_recording:
raise RuntimeError("Already recording")
session.current_phrase_id = phrase_id
session.current_take = session.takes_per_phrase.get(phrase_id, 0) + 1
session.frames = []
session.fused_skeletons = []
session.is_recording = True
# Calculate total recording time
total_duration = countdown_seconds + audio_duration + 1.0 # 1s buffer
# Start recording task
self._recording_task = asyncio.create_task(
self._record_frames(session, total_duration)
)
async def stop_recording(self, session: CaptureSession) -> 'Take':
"""Stop recording and return the captured take."""
session.is_recording = False
if self._recording_task:
self._recording_task.cancel()
try:
await self._recording_task
except asyncio.CancelledError:
pass
# Build take from captured frames
take = Take(
phrase_id=session.current_phrase_id,
take_number=session.current_take,
frames=session.frames.copy(),
fused_skeletons=session.fused_skeletons.copy(),
session_id=session.session_id,
)
# Validate take
take.validate()
return take
async def accept_take(self, session: CaptureSession, take: 'Take') -> Path:
"""Accept a take and save it to disk."""
# Update metadata
session.phrases_recorded.append(take.phrase_id)
session.takes_per_phrase[take.phrase_id] = take.take_number
# Save take
output_path = session.output_dir / f"{take.phrase_id}_take_{take.take_number}.npz"
take.save(output_path)
return output_path
async def reject_take(self, session: CaptureSession, take: 'Take') -> None:
"""Reject a take (don't save)."""
# Just clear - take is garbage collected
pass
async def end_session(self, session: CaptureSession) -> dict:
"""End the session and return summary."""
await self._receiver.stop()
# Save session metadata
metadata = {
'session_id': session.session_id,
'performer_id': session.performer_id,
'created_at': session.created_at.isoformat(),
'phrases_recorded': session.phrases_recorded,
'takes_per_phrase': session.takes_per_phrase,
'total_phrases': len(session.phrases_recorded),
}
with open(session.output_dir / 'session_metadata.json', 'w') as f:
json.dump(metadata, f, indent=2)
return metadata
async def _record_frames(self, session: CaptureSession, duration: float):
"""Background task to record frames."""
import time
start_time = time.perf_counter()
async for frame in self._receiver.stream():
if not session.is_recording:
break
# Check duration
elapsed = time.perf_counter() - start_time
if elapsed > duration:
break
# Store raw frame
session.frames.append(frame)
# Fuse and store
self._fusion.update(frame)
fused = self._fusion.get_fused_skeleton()
session.fused_skeletons.append(fused)
@dataclass
class Take:
"""A single recording take of a phrase."""
phrase_id: str
take_number: int
frames: List[UnifiedFrame]
fused_skeletons: List[dict]
session_id: str
# Validation results
is_valid: bool = False
validation_report: dict = field(default_factory=dict)
def validate(self):
"""Run validation checks on the take."""
from cc_core.fusion.capture.validator import TakeValidator
validator = TakeValidator()
self.is_valid, self.validation_report = validator.validate(self)
def save(self, path: Path):
"""Save take to disk."""
# Convert to numpy arrays
fused_array = self._skeletons_to_array()
np.savez_compressed(
path,
fused_motion=fused_array,
frame_count=len(self.frames),
phrase_id=self.phrase_id,
take_number=self.take_number,
is_valid=self.is_valid,
)
def _skeletons_to_array(self) -> np.ndarray:
"""Convert list of skeleton dicts to numpy array."""
from cc_core.fusion.transforms.to_63d import Extended63DTransformer
transformer = Extended63DTransformer()
frames = []
for skeleton in self.fused_skeletons:
frame_63d = transformer.transform_skeleton(skeleton)
frames.append(frame_63d)
return np.array(frames, dtype=np.float32)---
5. Next Steps
Immediate Implementation Tasks
1. Create fusion module structure
mkdir -p core/cc-core/cc_core/fusion/{transforms,capture}
touch core/cc-core/cc_core/fusion/__init__.py2. Implement UnifiedReceiver (Week 1)
- Port the code from this spec
- Add unit tests
- Verify Mocopi + simulated MediaPipe
3. Implement KalmanFusion (Week 2)
- Port the EKF code
- Tune parameters with real sensor data
- Add visualization for debugging
4. Build WebSocket bridge (Week 2)
- Add MediaPipeBridge to dashboard
- Test end-to-end streaming
5. Implement SessionManager (Week 3)
- Port capture session code
- Build minimal UI (Streamlit)
- Record first test sessions
6. Training integration (Week 4)
- Implement 63D transformer
- Update CC-MotionGen dataset class
- Run comparison training
---
Document Version: 1.0
Last Updated: December 2024
Author: Comp-Core Team
Promotion Decision
Attach run IDs, datasets, metrics, and reproduction commands.
Source Anchor
projects/Documentation/03-guides/development/FUSION_IMPLEMENTATION_SPEC.md
Detected Structure
Method · Evaluation · Code Anchors · Architecture