Grand Diomande Research · Full HTML Reader

Sensor Fusion Implementation Specification

This document provides detailed implementation specifications for the Mocopi + MediaPipe sensor fusion system. It includes code scaffolds, API contracts, and integration points with existing Comp-Core infrastructure.

Embodied Trajectory Systems research note experiment writeup candidate score 32 .md

Full Public Reader

Sensor Fusion Implementation Specification

Version 1.0 | December 2024
Status: Ready for Implementation

---

Overview

This document provides detailed implementation specifications for the Mocopi + MediaPipe sensor fusion system. It includes code scaffolds, API contracts, and integration points with existing Comp-Core infrastructure.

---

1. Module Structure

core/cc-core/cc_core/fusion/
├── __init__.py
├── unified_receiver.py      # Combined Mocopi + MediaPipe ingestion
├── temporal_aligner.py      # Clock synchronization
├── kalman_fusion.py         # Per-limb Kalman filter
├── occlusion_handler.py     # Dropout recovery
├── skeleton_builder.py      # Unified skeleton output
├── transforms/
│   ├── __init__.py
│   ├── to_25d.py            # Standard CC-MotionGen format
│   ├── to_63d.py            # Extended format with face/hands
│   └── mediapipe_to_bones.py # MediaPipe landmarks → bone transforms
└── capture/
    ├── __init__.py
    ├── session_manager.py   # Recording sessions
    ├── audio_sync.py        # Audio-motion synchronization
    ├── take_manager.py      # Multiple takes per phrase
    └── validator.py         # Quality validation

---

2. Core Classes

2.1 UnifiedReceiver

python
# core/cc-core/cc_core/fusion/unified_receiver.py

"""
Unified receiver for Mocopi and MediaPipe data streams.

This module provides a single entry point for receiving motion data
from both IMU sensors (Mocopi) and camera-based tracking (MediaPipe),
outputting a combined stream ready for sensor fusion.
"""

from __future__ import annotations

import asyncio
from dataclasses import dataclass, field
from typing import AsyncIterator, Callable, Optional
from enum import Enum

import numpy as np

from cc_core.integrations.mocopi_bridge import MocopiReceiver, PoseFrame
from cc_core.skeleton.mapping import MocopiToCC


class StreamSource(Enum):
    MOCOPI = "mocopi"
    MEDIAPIPE = "mediapipe"
    FUSED = "fused"


@dataclass
class MediaPipeFrame:
    """
    MediaPipe holistic result in a format compatible with fusion.
    """
    timestamp_ms: float
    frame_number: int

    # Pose landmarks (33 points)
    pose_landmarks: Optional[np.ndarray] = None  # [33, 4] (x, y, z, vis)
    pose_world_landmarks: Optional[np.ndarray] = None  # [33, 3] metric

    # Face landmarks (468 points)
    face_landmarks: Optional[np.ndarray] = None  # [468, 2] (x, y)

    # Hand landmarks (21 points each)
    left_hand_landmarks: Optional[np.ndarray] = None  # [21, 4]
    right_hand_landmarks: Optional[np.ndarray] = None  # [21, 4]

    # Confidence scores
    confidence: dict = field(default_factory=lambda: {
        'pose': 0.0, 'face': 0.0, 'left_hand': 0.0, 'right_hand': 0.0
    })


@dataclass
class UnifiedFrame:
    """
    Combined frame from both sensors ready for fusion.
    """
    timestamp_ms: float
    frame_number: int
    session_id: str

    # Raw data from each source
    mocopi_frame: Optional[PoseFrame] = None
    mediapipe_frame: Optional[MediaPipeFrame] = None

    # Availability flags
    has_mocopi: bool = False
    has_mediapipe: bool = False

    # Temporal alignment info
    mocopi_lag_ms: float = 0.0
    mediapipe_lag_ms: float = 0.0


class UnifiedReceiver:
    """
    Unified receiver that combines Mocopi and MediaPipe streams.

    Usage:
        async with UnifiedReceiver() as receiver:
            async for frame in receiver.stream():
                # frame contains both Mocopi and MediaPipe data
                fused = fusion_engine.fuse(frame)
    """

    def __init__(
        self,
        mocopi_port: int = 12351,
        mediapipe_ws_url: str = "ws://localhost:8766/mediapipe",
        session_id: str = None,
        target_fps: float = 60.0,
    ):
        self.mocopi_port = mocopi_port
        self.mediapipe_ws_url = mediapipe_ws_url
        self.session_id = session_id or self._generate_session_id()
        self.target_fps = target_fps

        # Internal receivers
        self._mocopi_receiver: Optional[MocopiReceiver] = None
        self._mediapipe_ws = None

        # Buffers for alignment
        self._mocopi_buffer: asyncio.Queue = asyncio.Queue(maxsize=10)
        self._mediapipe_buffer: asyncio.Queue = asyncio.Queue(maxsize=5)

        # Timing
        self._session_start: Optional[float] = None
        self._frame_counter = 0

        # Callbacks
        self._on_mocopi_frame: list[Callable] = []
        self._on_mediapipe_frame: list[Callable] = []
        self._on_unified_frame: list[Callable] = []

    async def __aenter__(self):
        await self.start()
        return self

    async def __aexit__(self, *args):
        await self.stop()

    async def start(self):
        """Start receiving from both sources."""
        import time
        self._session_start = time.perf_counter()

        # Start Mocopi receiver
        self._mocopi_receiver = MocopiReceiver(port=self.mocopi_port)
        asyncio.create_task(self._receive_mocopi())

        # Start MediaPipe WebSocket connection
        asyncio.create_task(self._receive_mediapipe())

    async def stop(self):
        """Stop all receivers."""
        if self._mocopi_receiver:
            await self._mocopi_receiver.close()
        if self._mediapipe_ws:
            await self._mediapipe_ws.close()

    async def stream(self) -> AsyncIterator[UnifiedFrame]:
        """
        Yield unified frames at target FPS.

        Aligns Mocopi (60Hz) and MediaPipe (30Hz) streams
        to produce unified frames at the target rate.
        """
        import time
        frame_interval = 1.0 / self.target_fps

        while True:
            # Calculate target timestamp
            elapsed = time.perf_counter() - self._session_start
            target_ts = elapsed * 1000  # ms

            # Get latest frames from each source
            mocopi_frame = await self._get_nearest_mocopi(target_ts)
            mediapipe_frame = await self._get_nearest_mediapipe(target_ts)

            # Build unified frame
            unified = UnifiedFrame(
                timestamp_ms=target_ts,
                frame_number=self._frame_counter,
                session_id=self.session_id,
                mocopi_frame=mocopi_frame,
                mediapipe_frame=mediapipe_frame,
                has_mocopi=mocopi_frame is not None,
                has_mediapipe=mediapipe_frame is not None,
                mocopi_lag_ms=abs(target_ts - mocopi_frame.t_device) if mocopi_frame else 0,
                mediapipe_lag_ms=abs(target_ts - mediapipe_frame.timestamp_ms) if mediapipe_frame else 0,
            )

            self._frame_counter += 1

            # Fire callbacks
            for cb in self._on_unified_frame:
                cb(unified)

            yield unified

            # Wait for next frame
            await asyncio.sleep(frame_interval)

    async def _receive_mocopi(self):
        """Background task to receive Mocopi frames."""
        async for frame in self._mocopi_receiver.stream():
            try:
                self._mocopi_buffer.put_nowait(frame)
            except asyncio.QueueFull:
                # Drop oldest frame
                try:
                    self._mocopi_buffer.get_nowait()
                    self._mocopi_buffer.put_nowait(frame)
                except:
                    pass

            for cb in self._on_mocopi_frame:
                cb(frame)

    async def _receive_mediapipe(self):
        """Background task to receive MediaPipe frames via WebSocket."""
        import websockets
        import json

        while True:
            try:
                async with websockets.connect(self.mediapipe_ws_url) as ws:
                    self._mediapipe_ws = ws
                    async for message in ws:
                        frame = self._parse_mediapipe_message(message)
                        try:
                            self._mediapipe_buffer.put_nowait(frame)
                        except asyncio.QueueFull:
                            try:
                                self._mediapipe_buffer.get_nowait()
                                self._mediapipe_buffer.put_nowait(frame)
                            except:
                                pass

                        for cb in self._on_mediapipe_frame:
                            cb(frame)

            except Exception as e:
                print(f"MediaPipe connection error: {e}, reconnecting...")
                await asyncio.sleep(2.0)

    def _parse_mediapipe_message(self, message: str | bytes) -> MediaPipeFrame:
        """Parse MediaPipe WebSocket message to MediaPipeFrame."""
        import json

        if isinstance(message, bytes):
            return self._parse_mediapipe_binary(message)

        data = json.loads(message)
        return MediaPipeFrame(
            timestamp_ms=data.get('timestamp_ms', 0),
            frame_number=data.get('frame_number', 0),
            pose_landmarks=np.array(data['poseLandmarks']) if data.get('poseLandmarks') else None,
            pose_world_landmarks=np.array(data['poseWorldLandmarks']) if data.get('poseWorldLandmarks') else None,
            face_landmarks=np.array(data['faceLandmarks']) if data.get('faceLandmarks') else None,
            left_hand_landmarks=np.array(data['leftHandLandmarks']) if data.get('leftHandLandmarks') else None,
            right_hand_landmarks=np.array(data['rightHandLandmarks']) if data.get('rightHandLandmarks') else None,
            confidence=data.get('confidence', {}),
        )

    async def _get_nearest_mocopi(self, target_ts: float) -> Optional[PoseFrame]:
        """Get the Mocopi frame nearest to target timestamp."""
        best_frame = None
        best_diff = float('inf')

        # Drain buffer and find nearest
        frames = []
        while not self._mocopi_buffer.empty():
            try:
                frame = self._mocopi_buffer.get_nowait()
                frames.append(frame)
                diff = abs(frame.t_device - target_ts)
                if diff < best_diff:
                    best_diff = diff
                    best_frame = frame
            except:
                break

        # Put back frames that are still useful (future frames)
        for f in frames:
            if f.t_device > target_ts - 50:  # Keep frames within 50ms
                try:
                    self._mocopi_buffer.put_nowait(f)
                except:
                    pass

        return best_frame

    async def _get_nearest_mediapipe(self, target_ts: float) -> Optional[MediaPipeFrame]:
        """Get the MediaPipe frame nearest to target timestamp."""
        # Similar logic to _get_nearest_mocopi
        best_frame = None
        best_diff = float('inf')

        frames = []
        while not self._mediapipe_buffer.empty():
            try:
                frame = self._mediapipe_buffer.get_nowait()
                frames.append(frame)
                diff = abs(frame.timestamp_ms - target_ts)
                if diff < best_diff:
                    best_diff = diff
                    best_frame = frame
            except:
                break

        for f in frames:
            if f.timestamp_ms > target_ts - 100:
                try:
                    self._mediapipe_buffer.put_nowait(f)
                except:
                    pass

        return best_frame

    @staticmethod
    def _generate_session_id() -> str:
        import uuid
        from datetime import datetime
        return f"fusion_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:8]}"

2.2 KalmanFusion

python
# core/cc-core/cc_core/fusion/kalman_fusion.py

"""
Kalman filter-based sensor fusion for combining Mocopi and MediaPipe data.

This module implements an Extended Kalman Filter (EKF) for each limb,
fusing IMU orientation data from Mocopi with position data from MediaPipe.
"""

from __future__ import annotations

from dataclasses import dataclass
from typing import Dict, Optional, Tuple

import numpy as np
from scipy.spatial.transform import Rotation


@dataclass
class FusionConfig:
    """Configuration for Kalman fusion."""

    # Mocopi measurement weights
    mocopi_orientation_confidence: float = 0.95
    mocopi_position_confidence: float = 0.70

    # MediaPipe measurement weights
    mediapipe_position_confidence: float = 0.80
    mediapipe_orientation_confidence: float = 0.40

    # Process noise (state transition uncertainty)
    position_process_noise: float = 0.01
    orientation_process_noise: float = 0.001
    velocity_process_noise: float = 0.1

    # Smoothing factor for output (0 = no smoothing, 1 = max smoothing)
    output_smoothing: float = 0.3


@dataclass
class LimbState:
    """State of a single limb after fusion."""

    position: np.ndarray  # [x, y, z]
    quaternion: np.ndarray  # [qw, qx, qy, qz]
    linear_velocity: np.ndarray  # [vx, vy, vz]
    angular_velocity: np.ndarray  # [wx, wy, wz]

    # Source attribution
    orientation_source: str  # "mocopi" | "mediapipe" | "fused"
    position_source: str

    # Uncertainty
    position_covariance: np.ndarray  # [3, 3]
    orientation_covariance: np.ndarray  # [4, 4]

    confidence: float


class LimbKalmanFilter:
    """
    Extended Kalman Filter for a single limb.

    State vector (13D):
    [0:3]  - Position (x, y, z)
    [3:7]  - Orientation quaternion (qw, qx, qy, qz)
    [7:10] - Linear velocity (vx, vy, vz)
    [10:13] - Angular velocity (wx, wy, wz)
    """

    def __init__(self, limb_name: str, config: FusionConfig):
        self.limb_name = limb_name
        self.config = config

        # State vector
        self.x = np.zeros(13)
        self.x[3] = 1.0  # Identity quaternion (w=1)

        # State covariance
        self.P = np.eye(13) * 0.1

        # Process noise covariance
        self.Q = np.diag([
            config.position_process_noise,
            config.position_process_noise,
            config.position_process_noise,
            config.orientation_process_noise,
            config.orientation_process_noise,
            config.orientation_process_noise,
            config.orientation_process_noise,
            config.velocity_process_noise,
            config.velocity_process_noise,
            config.velocity_process_noise,
            config.velocity_process_noise * 0.5,
            config.velocity_process_noise * 0.5,
            config.velocity_process_noise * 0.5,
        ])

        # Measurement noise for Mocopi (high orientation confidence)
        self.R_mocopi = np.diag([
            0.02, 0.02, 0.02,  # Position (medium)
            0.001, 0.001, 0.001, 0.001,  # Quaternion (very low - IMU accurate)
        ])

        # Measurement noise for MediaPipe (high position confidence)
        self.R_mediapipe = np.diag([
            0.01, 0.01, 0.05,  # Position (good XY, weaker Z)
            0.1, 0.1, 0.1, 0.1,  # Quaternion (derived, less reliable)
        ])

        # Last update time
        self.last_update_time: Optional[float] = None

        # Smoothed output
        self._smoothed_state: Optional[LimbState] = None

    def predict(self, dt: float):
        """
        Predict state forward by dt seconds.
        """
        # State transition: position += velocity * dt
        self.x[0:3] += self.x[7:10] * dt

        # Quaternion integration from angular velocity
        omega = self.x[10:13]
        omega_quat = np.array([0, omega[0], omega[1], omega[2]]) * dt / 2
        q = self.x[3:7]
        q_new = q + self._quaternion_multiply(q, omega_quat)
        self.x[3:7] = q_new / np.linalg.norm(q_new)  # Normalize

        # State transition Jacobian (linearized)
        F = np.eye(13)
        F[0:3, 7:10] = np.eye(3) * dt
        # Quaternion Jacobian is more complex, approximated

        # Update covariance
        self.P = F @ self.P @ F.T + self.Q

    def update_mocopi(
        self,
        position: np.ndarray,
        quaternion: np.ndarray,
        timestamp_ms: float,
    ):
        """
        Update with Mocopi measurement (trust orientation highly).
        """
        # Measurement vector
        z = np.concatenate([position, quaternion])

        # Measurement matrix (direct observation)
        H = np.zeros((7, 13))
        H[0:3, 0:3] = np.eye(3)  # Position
        H[3:7, 3:7] = np.eye(4)  # Quaternion

        # Kalman gain
        S = H @ self.P @ H.T + self.R_mocopi
        K = self.P @ H.T @ np.linalg.inv(S)

        # State update
        y = z - H @ self.x  # Innovation

        # Handle quaternion sign ambiguity
        if np.dot(z[3:7], self.x[3:7]) < 0:
            y[3:7] = -z[3:7] - self.x[3:7]

        self.x = self.x + K @ y

        # Normalize quaternion
        self.x[3:7] /= np.linalg.norm(self.x[3:7])

        # Covariance update
        self.P = (np.eye(13) - K @ H) @ self.P

        self.last_update_time = timestamp_ms

    def update_mediapipe(
        self,
        position: np.ndarray,
        quaternion: Optional[np.ndarray],
        timestamp_ms: float,
        confidence: float,
    ):
        """
        Update with MediaPipe measurement (trust position, less so orientation).
        """
        if confidence < 0.3:
            return  # Skip low-confidence observations

        # Scale measurement noise by inverse confidence
        R = self.R_mediapipe.copy()
        R *= (1.0 / max(confidence, 0.1))

        # Build measurement
        if quaternion is not None:
            z = np.concatenate([position, quaternion])
            H = np.zeros((7, 13))
            H[0:3, 0:3] = np.eye(3)
            H[3:7, 3:7] = np.eye(4)
        else:
            # Position only
            z = position
            H = np.zeros((3, 13))
            H[0:3, 0:3] = np.eye(3)
            R = R[0:3, 0:3]

        # Kalman update
        S = H @ self.P @ H.T + R
        K = self.P @ H.T @ np.linalg.inv(S)
        y = z - H @ self.x

        self.x = self.x + K @ y
        self.x[3:7] /= np.linalg.norm(self.x[3:7])
        self.P = (np.eye(13) - K @ H) @ self.P

        self.last_update_time = timestamp_ms

    def get_state(self) -> LimbState:
        """Get current fused state."""
        state = LimbState(
            position=self.x[0:3].copy(),
            quaternion=self.x[3:7].copy(),
            linear_velocity=self.x[7:10].copy(),
            angular_velocity=self.x[10:13].copy(),
            orientation_source="fused",
            position_source="fused",
            position_covariance=self.P[0:3, 0:3].copy(),
            orientation_covariance=self.P[3:7, 3:7].copy(),
            confidence=self._compute_confidence(),
        )

        # Apply smoothing
        if self._smoothed_state is not None:
            alpha = self.config.output_smoothing
            state.position = (1 - alpha) * state.position + alpha * self._smoothed_state.position
            state.quaternion = self._slerp(
                self._smoothed_state.quaternion,
                state.quaternion,
                1 - alpha
            )

        self._smoothed_state = state
        return state

    def _compute_confidence(self) -> float:
        """Compute confidence from state covariance."""
        pos_var = np.trace(self.P[0:3, 0:3])
        orient_var = np.trace(self.P[3:7, 3:7])

        # Lower variance = higher confidence
        confidence = 1.0 / (1.0 + pos_var + orient_var)
        return float(np.clip(confidence, 0, 1))

    @staticmethod
    def _quaternion_multiply(q1: np.ndarray, q2: np.ndarray) -> np.ndarray:
        """Multiply two quaternions."""
        w1, x1, y1, z1 = q1
        w2, x2, y2, z2 = q2
        return np.array([
            w1*w2 - x1*x2 - y1*y2 - z1*z2,
            w1*x2 + x1*w2 + y1*z2 - z1*y2,
            w1*y2 - x1*z2 + y1*w2 + z1*x2,
            w1*z2 + x1*y2 - y1*x2 + z1*w2,
        ])

    @staticmethod
    def _slerp(q1: np.ndarray, q2: np.ndarray, t: float) -> np.ndarray:
        """Spherical linear interpolation between quaternions."""
        dot = np.dot(q1, q2)
        if dot < 0:
            q2 = -q2
            dot = -dot

        if dot > 0.9995:
            result = q1 + t * (q2 - q1)
            return result / np.linalg.norm(result)

        theta = np.arccos(dot)
        sin_theta = np.sin(theta)

        s1 = np.sin((1 - t) * theta) / sin_theta
        s2 = np.sin(t * theta) / sin_theta

        return s1 * q1 + s2 * q2


class SensorFusionEngine:
    """
    Main fusion engine coordinating all limb filters.

    Usage:
        fusion = SensorFusionEngine()

        # For each frame:
        fusion.update(unified_frame)
        fused_skeleton = fusion.get_fused_skeleton()
    """

    LIMB_NAMES = [
        'hip', 'head',
        'left_shoulder', 'right_shoulder',
        'left_elbow', 'right_elbow',
        'left_wrist', 'right_wrist',
        'left_hip', 'right_hip',
        'left_knee', 'right_knee',
        'left_ankle', 'right_ankle',
    ]

    def __init__(self, config: Optional[FusionConfig] = None):
        self.config = config or FusionConfig()

        # Create Kalman filter per limb
        self.filters: Dict[str, LimbKalmanFilter] = {
            name: LimbKalmanFilter(name, self.config)
            for name in self.LIMB_NAMES
        }

        # Limb-specific fusion rules
        self.fusion_rules = self._build_fusion_rules()

        # Last frame time for dt calculation
        self._last_frame_time: Optional[float] = None

    def _build_fusion_rules(self) -> Dict[str, dict]:
        """
        Build per-limb fusion rules.

        Some limbs trust IMU more (orientation), others trust vision more (position).
        """
        return {
            # Core body - IMU primary for orientation
            'hip': {'orientation': 'mocopi', 'position': 'mocopi'},
            'head': {'orientation': 'mocopi', 'position': 'fused'},

            # Shoulders - blend
            'left_shoulder': {'orientation': 'fused', 'position': 'mediapipe'},
            'right_shoulder': {'orientation': 'fused', 'position': 'mediapipe'},

            # Elbows - vision primary (no IMU here in base kit)
            'left_elbow': {'orientation': 'mediapipe', 'position': 'mediapipe'},
            'right_elbow': {'orientation': 'mediapipe', 'position': 'mediapipe'},

            # Wrists - IMU orientation, fused position
            'left_wrist': {'orientation': 'mocopi', 'position': 'fused'},
            'right_wrist': {'orientation': 'mocopi', 'position': 'fused'},

            # Hips (leg) - blend
            'left_hip': {'orientation': 'fused', 'position': 'mediapipe'},
            'right_hip': {'orientation': 'fused', 'position': 'mediapipe'},

            # Knees - vision primary
            'left_knee': {'orientation': 'mediapipe', 'position': 'mediapipe'},
            'right_knee': {'orientation': 'mediapipe', 'position': 'mediapipe'},

            # Ankles - IMU orientation
            'left_ankle': {'orientation': 'mocopi', 'position': 'fused'},
            'right_ankle': {'orientation': 'mocopi', 'position': 'fused'},
        }

    def update(self, frame: 'UnifiedFrame'):
        """
        Update all filters with new frame data.
        """
        # Compute dt
        dt = 1/60.0  # Default
        if self._last_frame_time is not None:
            dt = (frame.timestamp_ms - self._last_frame_time) / 1000.0
            dt = np.clip(dt, 0.001, 0.1)  # Clamp to reasonable range
        self._last_frame_time = frame.timestamp_ms

        # Predict all filters forward
        for kf in self.filters.values():
            kf.predict(dt)

        # Update with Mocopi data
        if frame.has_mocopi and frame.mocopi_frame:
            self._update_from_mocopi(frame.mocopi_frame, frame.timestamp_ms)

        # Update with MediaPipe data
        if frame.has_mediapipe and frame.mediapipe_frame:
            self._update_from_mediapipe(frame.mediapipe_frame, frame.timestamp_ms)

    def _update_from_mocopi(self, mocopi: 'PoseFrame', timestamp_ms: float):
        """Extract limb data from Mocopi and update filters."""
        from cc_core.skeleton.mapping import MOCOPI_LIMB_BONE_MAP

        for limb_name, bone_id in MOCOPI_LIMB_BONE_MAP.items():
            if limb_name not in self.filters:
                continue
            if bone_id not in mocopi.bones:
                continue

            bone = mocopi.bones[bone_id]
            self.filters[limb_name].update_mocopi(
                position=bone.position,
                quaternion=bone.quaternion,
                timestamp_ms=timestamp_ms,
            )

    def _update_from_mediapipe(self, mp: 'MediaPipeFrame', timestamp_ms: float):
        """Extract limb data from MediaPipe and update filters."""
        from cc_core.fusion.transforms.mediapipe_to_bones import MediaPipeToBones

        converter = MediaPipeToBones()
        limb_data = converter.extract_limbs(mp)

        for limb_name, data in limb_data.items():
            if limb_name not in self.filters:
                continue

            self.filters[limb_name].update_mediapipe(
                position=data['position'],
                quaternion=data.get('quaternion'),
                timestamp_ms=timestamp_ms,
                confidence=data.get('confidence', 0.5),
            )

    def get_fused_skeleton(self) -> Dict[str, LimbState]:
        """Get the current fused skeleton state."""
        return {
            name: kf.get_state()
            for name, kf in self.filters.items()
        }

2.3 MediaPipe to Bones Transform

python
# core/cc-core/cc_core/fusion/transforms/mediapipe_to_bones.py

"""
Transform MediaPipe landmarks to bone-compatible format.

Converts MediaPipe's 33 pose landmarks into position and derived
orientation data that can be fed into the fusion engine.
"""

from typing import Dict, Optional
import numpy as np


# MediaPipe pose landmark indices
class PoseLandmark:
    NOSE = 0
    LEFT_EYE_INNER = 1
    LEFT_EYE = 2
    LEFT_EYE_OUTER = 3
    RIGHT_EYE_INNER = 4
    RIGHT_EYE = 5
    RIGHT_EYE_OUTER = 6
    LEFT_EAR = 7
    RIGHT_EAR = 8
    MOUTH_LEFT = 9
    MOUTH_RIGHT = 10
    LEFT_SHOULDER = 11
    RIGHT_SHOULDER = 12
    LEFT_ELBOW = 13
    RIGHT_ELBOW = 14
    LEFT_WRIST = 15
    RIGHT_WRIST = 16
    LEFT_PINKY = 17
    RIGHT_PINKY = 18
    LEFT_INDEX = 19
    RIGHT_INDEX = 20
    LEFT_THUMB = 21
    RIGHT_THUMB = 22
    LEFT_HIP = 23
    RIGHT_HIP = 24
    LEFT_KNEE = 25
    RIGHT_KNEE = 26
    LEFT_ANKLE = 27
    RIGHT_ANKLE = 28
    LEFT_HEEL = 29
    RIGHT_HEEL = 30
    LEFT_FOOT_INDEX = 31
    RIGHT_FOOT_INDEX = 32


class MediaPipeToBones:
    """
    Convert MediaPipe landmarks to bone transforms.
    """

    # Mapping: fusion limb name -> (primary landmark, secondary for orientation)
    LIMB_LANDMARKS = {
        'hip': (PoseLandmark.LEFT_HIP, PoseLandmark.RIGHT_HIP),
        'head': (PoseLandmark.NOSE, PoseLandmark.LEFT_EAR),
        'left_shoulder': (PoseLandmark.LEFT_SHOULDER, PoseLandmark.LEFT_ELBOW),
        'right_shoulder': (PoseLandmark.RIGHT_SHOULDER, PoseLandmark.RIGHT_ELBOW),
        'left_elbow': (PoseLandmark.LEFT_ELBOW, PoseLandmark.LEFT_WRIST),
        'right_elbow': (PoseLandmark.RIGHT_ELBOW, PoseLandmark.RIGHT_WRIST),
        'left_wrist': (PoseLandmark.LEFT_WRIST, PoseLandmark.LEFT_INDEX),
        'right_wrist': (PoseLandmark.RIGHT_WRIST, PoseLandmark.RIGHT_INDEX),
        'left_hip': (PoseLandmark.LEFT_HIP, PoseLandmark.LEFT_KNEE),
        'right_hip': (PoseLandmark.RIGHT_HIP, PoseLandmark.RIGHT_KNEE),
        'left_knee': (PoseLandmark.LEFT_KNEE, PoseLandmark.LEFT_ANKLE),
        'right_knee': (PoseLandmark.RIGHT_KNEE, PoseLandmark.RIGHT_ANKLE),
        'left_ankle': (PoseLandmark.LEFT_ANKLE, PoseLandmark.LEFT_FOOT_INDEX),
        'right_ankle': (PoseLandmark.RIGHT_ANKLE, PoseLandmark.RIGHT_FOOT_INDEX),
    }

    def __init__(self, use_world_landmarks: bool = True):
        """
        Args:
            use_world_landmarks: Use metric 3D coordinates if available
        """
        self.use_world_landmarks = use_world_landmarks

    def extract_limbs(self, frame: 'MediaPipeFrame') -> Dict[str, dict]:
        """
        Extract limb data from MediaPipe frame.

        Returns:
            Dict mapping limb names to {position, quaternion, confidence}
        """
        # Select landmark source
        if self.use_world_landmarks and frame.pose_world_landmarks is not None:
            landmarks = frame.pose_world_landmarks
            confidence_source = frame.pose_landmarks  # Get visibility from normalized
        else:
            landmarks = frame.pose_landmarks
            confidence_source = landmarks

        if landmarks is None:
            return {}

        result = {}

        for limb_name, (primary_idx, secondary_idx) in self.LIMB_LANDMARKS.items():
            # Get primary landmark position
            if limb_name == 'hip':
                # Hip is midpoint of left/right
                left_hip = landmarks[PoseLandmark.LEFT_HIP]
                right_hip = landmarks[PoseLandmark.RIGHT_HIP]
                position = (left_hip[:3] + right_hip[:3]) / 2
                confidence = min(
                    self._get_visibility(confidence_source, PoseLandmark.LEFT_HIP),
                    self._get_visibility(confidence_source, PoseLandmark.RIGHT_HIP)
                )
            else:
                position = landmarks[primary_idx][:3]
                confidence = self._get_visibility(confidence_source, primary_idx)

            # Derive orientation from limb direction
            quaternion = self._derive_quaternion(
                landmarks, primary_idx, secondary_idx
            )

            result[limb_name] = {
                'position': np.array(position, dtype=np.float32),
                'quaternion': quaternion,
                'confidence': confidence,
            }

        return result

    def _get_visibility(self, landmarks: np.ndarray, idx: int) -> float:
        """Get landmark visibility/confidence."""
        if landmarks is None or idx >= len(landmarks):
            return 0.0
        if landmarks.shape[1] > 3:
            return float(landmarks[idx, 3])
        return 0.5  # Default if no visibility data

    def _derive_quaternion(
        self,
        landmarks: np.ndarray,
        from_idx: int,
        to_idx: int,
    ) -> Optional[np.ndarray]:
        """
        Derive orientation quaternion from limb direction.

        This creates a quaternion that rotates the Y-axis to align
        with the direction from `from_idx` to `to_idx`.
        """
        from_pos = landmarks[from_idx][:3]
        to_pos = landmarks[to_idx][:3]

        direction = to_pos - from_pos
        length = np.linalg.norm(direction)

        if length < 0.001:
            return None  # Undefined orientation

        direction = direction / length

        # Create rotation from Y-axis to direction
        y_axis = np.array([0, 1, 0])

        # Handle parallel case
        dot = np.dot(y_axis, direction)
        if abs(dot) > 0.9999:
            if dot > 0:
                return np.array([1, 0, 0, 0])  # Identity
            else:
                return np.array([0, 1, 0, 0])  # 180 deg around X

        # Rotation axis and angle
        axis = np.cross(y_axis, direction)
        axis = axis / np.linalg.norm(axis)
        angle = np.arccos(np.clip(dot, -1, 1))

        # Axis-angle to quaternion
        half_angle = angle / 2
        w = np.cos(half_angle)
        xyz = axis * np.sin(half_angle)

        return np.array([w, xyz[0], xyz[1], xyz[2]], dtype=np.float32)

---

3. Frontend Integration

3.1 WebSocket Bridge for MediaPipe

To enable the Python fusion engine to receive MediaPipe data from the web dashboard, we need a WebSocket bridge.

typescript
// apps/web/cc-dashboard/src/lib/mediapipe/MediaPipeBridge.ts

/**
 * WebSocket bridge to stream MediaPipe data to Python backend.
 *
 * This enables the fusion engine to receive camera-based landmarks
 * alongside Mocopi IMU data.
 */

import { HolisticPipeline } from './pipeline'
import type { HolisticResult } from './types'

export interface BridgeConfig {
  wsUrl: string
  reconnectInterval: number
  binaryMode: boolean
}

export class MediaPipeBridge {
  private pipeline: HolisticPipeline
  private ws: WebSocket | null = null
  private config: BridgeConfig
  private reconnectTimer: number | null = null
  private frameCounter = 0

  constructor(pipeline: HolisticPipeline, config: Partial<BridgeConfig> = {}) {
    this.pipeline = pipeline
    this.config = {
      wsUrl: config.wsUrl ?? 'ws://localhost:8766/mediapipe',
      reconnectInterval: config.reconnectInterval ?? 2000,
      binaryMode: config.binaryMode ?? false,
    }

    // Listen to pipeline frames
    this.pipeline.on('frame', this.handleFrame.bind(this))
  }

  connect(): void {
    if (this.ws?.readyState === WebSocket.OPEN) return

    try {
      this.ws = new WebSocket(this.config.wsUrl)

      this.ws.onopen = () => {
        console.log('[MediaPipeBridge] Connected to fusion backend')
        this.sendHandshake()
      }

      this.ws.onclose = () => {
        console.log('[MediaPipeBridge] Disconnected, reconnecting...')
        this.scheduleReconnect()
      }

      this.ws.onerror = (error) => {
        console.error('[MediaPipeBridge] WebSocket error:', error)
      }
    } catch (error) {
      console.error('[MediaPipeBridge] Connection failed:', error)
      this.scheduleReconnect()
    }
  }

  disconnect(): void {
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer)
      this.reconnectTimer = null
    }
    if (this.ws) {
      this.ws.close()
      this.ws = null
    }
  }

  private sendHandshake(): void {
    this.send({
      type: 'handshake',
      source: 'mediapipe',
      version: '1.0',
      capabilities: {
        pose: true,
        face: true,
        hands: true,
        worldLandmarks: true,
      },
    })
  }

  private handleFrame(result: HolisticResult): void {
    if (!this.ws || this.ws.readyState !== WebSocket.OPEN) return

    this.frameCounter++

    if (this.config.binaryMode) {
      this.sendBinary(result)
    } else {
      this.sendJSON(result)
    }
  }

  private sendJSON(result: HolisticResult): void {
    const message = {
      type: 'frame',
      timestamp_ms: result.timestamp_ms,
      frame_number: this.frameCounter,
      poseLandmarks: result.poseLandmarks,
      poseWorldLandmarks: result.poseWorldLandmarks,
      faceLandmarks: result.faceLandmarks,
      leftHandLandmarks: result.leftHandLandmarks,
      rightHandLandmarks: result.rightHandLandmarks,
      confidence: result.confidence,
    }
    this.send(message)
  }

  private sendBinary(result: HolisticResult): void {
    // Binary format for lower latency
    // Header: 8 bytes (timestamp_ms as float64)
    // Frame number: 4 bytes (uint32)
    // Pose landmarks: 33 * 4 * 4 = 528 bytes
    // Face landmarks: 468 * 2 * 4 = 3744 bytes (optional)
    // Hands: 21 * 4 * 4 * 2 = 672 bytes (optional)

    const buffer = new ArrayBuffer(8 + 4 + 528)
    const view = new DataView(buffer)

    // Header
    view.setFloat64(0, result.timestamp_ms, true)
    view.setUint32(8, this.frameCounter, true)

    // Pose landmarks
    if (result.poseLandmarks) {
      let offset = 12
      for (const lm of result.poseLandmarks) {
        view.setFloat32(offset, lm.x, true)
        view.setFloat32(offset + 4, lm.y, true)
        view.setFloat32(offset + 8, lm.z, true)
        view.setFloat32(offset + 12, lm.visibility ?? 0, true)
        offset += 16
      }
    }

    this.ws!.send(buffer)
  }

  private send(data: unknown): void {
    if (this.ws?.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify(data))
    }
  }

  private scheduleReconnect(): void {
    if (this.reconnectTimer) return
    this.reconnectTimer = window.setTimeout(() => {
      this.reconnectTimer = null
      this.connect()
    }, this.config.reconnectInterval)
  }
}

3.2 Zustand Store Update

typescript
// apps/web/cc-dashboard/src/store/fusionStore.ts

/**
 * Zustand store for sensor fusion state.
 */

import { create } from 'zustand'

interface FusionState {
  // Connection status
  mocopiConnected: boolean
  mediapipeConnected: boolean
  fusionActive: boolean

  // Per-limb status
  limbStatus: Record<string, {
    source: 'mocopi' | 'mediapipe' | 'fused'
    confidence: number
    occluded: boolean
  }>

  // Quality metrics
  fusionQuality: number
  mocopiLatency: number
  mediapipeLatency: number
  syncOffset: number

  // Actions
  setMocopiConnected: (connected: boolean) => void
  setMediapipeConnected: (connected: boolean) => void
  setLimbStatus: (limb: string, status: FusionState['limbStatus'][string]) => void
  updateMetrics: (metrics: Partial<Pick<FusionState,
    'fusionQuality' | 'mocopiLatency' | 'mediapipeLatency' | 'syncOffset'
  >>) => void
}

export const useFusionStore = create<FusionState>((set) => ({
  mocopiConnected: false,
  mediapipeConnected: false,
  fusionActive: false,

  limbStatus: {},

  fusionQuality: 0,
  mocopiLatency: 0,
  mediapipeLatency: 0,
  syncOffset: 0,

  setMocopiConnected: (connected) => set({
    mocopiConnected: connected,
    fusionActive: connected && useFusionStore.getState().mediapipeConnected,
  }),

  setMediapipeConnected: (connected) => set({
    mediapipeConnected: connected,
    fusionActive: connected && useFusionStore.getState().mocopiConnected,
  }),

  setLimbStatus: (limb, status) => set((state) => ({
    limbStatus: { ...state.limbStatus, [limb]: status },
  })),

  updateMetrics: (metrics) => set(metrics),
}))

---

4. Capture Session API

4.1 Session Manager

python
# core/cc-core/cc_core/fusion/capture/session_manager.py

"""
Manage dance capture sessions with audio synchronization.
"""

from __future__ import annotations

import asyncio
import json
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import List, Optional

import numpy as np

from cc_core.fusion.unified_receiver import UnifiedReceiver, UnifiedFrame
from cc_core.fusion.kalman_fusion import SensorFusionEngine


@dataclass
class CaptureSession:
    """A recording session capturing motion for audio phrases."""

    session_id: str
    performer_id: str
    created_at: datetime = field(default_factory=datetime.now)

    # Configuration
    output_dir: Path = field(default_factory=lambda: Path("sessions"))
    target_fps: float = 60.0

    # State
    is_recording: bool = False
    current_phrase_id: Optional[str] = None
    current_take: int = 0

    # Captured data
    frames: List[UnifiedFrame] = field(default_factory=list)
    fused_skeletons: List[dict] = field(default_factory=list)

    # Metadata
    phrases_recorded: List[str] = field(default_factory=list)
    takes_per_phrase: dict = field(default_factory=dict)


class SessionManager:
    """
    Manage capture sessions for the dance data collection pipeline.

    Usage:
        manager = SessionManager()
        session = await manager.create_session("performer_1")

        # For each phrase:
        await manager.start_recording(session, "phrase_007", audio_duration=8.2)
        # ... performer dances ...
        take = await manager.stop_recording(session)

        if take.is_valid:
            await manager.accept_take(session, take)
        else:
            await manager.reject_take(session, take)
    """

    def __init__(
        self,
        mocopi_port: int = 12351,
        mediapipe_ws_url: str = "ws://localhost:8766/mediapipe",
        output_base: Path = Path("data/captures"),
    ):
        self.mocopi_port = mocopi_port
        self.mediapipe_ws_url = mediapipe_ws_url
        self.output_base = output_base

        # Active components
        self._receiver: Optional[UnifiedReceiver] = None
        self._fusion: Optional[SensorFusionEngine] = None
        self._recording_task: Optional[asyncio.Task] = None

    async def create_session(
        self,
        performer_id: str,
        session_id: Optional[str] = None,
    ) -> CaptureSession:
        """Create a new capture session."""
        if session_id is None:
            session_id = f"session_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

        session = CaptureSession(
            session_id=session_id,
            performer_id=performer_id,
            output_dir=self.output_base / session_id,
        )

        # Create output directory
        session.output_dir.mkdir(parents=True, exist_ok=True)

        # Initialize receiver and fusion engine
        self._receiver = UnifiedReceiver(
            mocopi_port=self.mocopi_port,
            mediapipe_ws_url=self.mediapipe_ws_url,
            session_id=session_id,
            target_fps=session.target_fps,
        )
        self._fusion = SensorFusionEngine()

        await self._receiver.start()

        return session

    async def start_recording(
        self,
        session: CaptureSession,
        phrase_id: str,
        audio_duration: float,
        countdown_seconds: float = 3.0,
    ) -> None:
        """Start recording motion for a phrase."""
        if session.is_recording:
            raise RuntimeError("Already recording")

        session.current_phrase_id = phrase_id
        session.current_take = session.takes_per_phrase.get(phrase_id, 0) + 1
        session.frames = []
        session.fused_skeletons = []
        session.is_recording = True

        # Calculate total recording time
        total_duration = countdown_seconds + audio_duration + 1.0  # 1s buffer

        # Start recording task
        self._recording_task = asyncio.create_task(
            self._record_frames(session, total_duration)
        )

    async def stop_recording(self, session: CaptureSession) -> 'Take':
        """Stop recording and return the captured take."""
        session.is_recording = False

        if self._recording_task:
            self._recording_task.cancel()
            try:
                await self._recording_task
            except asyncio.CancelledError:
                pass

        # Build take from captured frames
        take = Take(
            phrase_id=session.current_phrase_id,
            take_number=session.current_take,
            frames=session.frames.copy(),
            fused_skeletons=session.fused_skeletons.copy(),
            session_id=session.session_id,
        )

        # Validate take
        take.validate()

        return take

    async def accept_take(self, session: CaptureSession, take: 'Take') -> Path:
        """Accept a take and save it to disk."""
        # Update metadata
        session.phrases_recorded.append(take.phrase_id)
        session.takes_per_phrase[take.phrase_id] = take.take_number

        # Save take
        output_path = session.output_dir / f"{take.phrase_id}_take_{take.take_number}.npz"
        take.save(output_path)

        return output_path

    async def reject_take(self, session: CaptureSession, take: 'Take') -> None:
        """Reject a take (don't save)."""
        # Just clear - take is garbage collected
        pass

    async def end_session(self, session: CaptureSession) -> dict:
        """End the session and return summary."""
        await self._receiver.stop()

        # Save session metadata
        metadata = {
            'session_id': session.session_id,
            'performer_id': session.performer_id,
            'created_at': session.created_at.isoformat(),
            'phrases_recorded': session.phrases_recorded,
            'takes_per_phrase': session.takes_per_phrase,
            'total_phrases': len(session.phrases_recorded),
        }

        with open(session.output_dir / 'session_metadata.json', 'w') as f:
            json.dump(metadata, f, indent=2)

        return metadata

    async def _record_frames(self, session: CaptureSession, duration: float):
        """Background task to record frames."""
        import time
        start_time = time.perf_counter()

        async for frame in self._receiver.stream():
            if not session.is_recording:
                break

            # Check duration
            elapsed = time.perf_counter() - start_time
            if elapsed > duration:
                break

            # Store raw frame
            session.frames.append(frame)

            # Fuse and store
            self._fusion.update(frame)
            fused = self._fusion.get_fused_skeleton()
            session.fused_skeletons.append(fused)


@dataclass
class Take:
    """A single recording take of a phrase."""

    phrase_id: str
    take_number: int
    frames: List[UnifiedFrame]
    fused_skeletons: List[dict]
    session_id: str

    # Validation results
    is_valid: bool = False
    validation_report: dict = field(default_factory=dict)

    def validate(self):
        """Run validation checks on the take."""
        from cc_core.fusion.capture.validator import TakeValidator

        validator = TakeValidator()
        self.is_valid, self.validation_report = validator.validate(self)

    def save(self, path: Path):
        """Save take to disk."""
        # Convert to numpy arrays
        fused_array = self._skeletons_to_array()

        np.savez_compressed(
            path,
            fused_motion=fused_array,
            frame_count=len(self.frames),
            phrase_id=self.phrase_id,
            take_number=self.take_number,
            is_valid=self.is_valid,
        )

    def _skeletons_to_array(self) -> np.ndarray:
        """Convert list of skeleton dicts to numpy array."""
        from cc_core.fusion.transforms.to_63d import Extended63DTransformer

        transformer = Extended63DTransformer()

        frames = []
        for skeleton in self.fused_skeletons:
            frame_63d = transformer.transform_skeleton(skeleton)
            frames.append(frame_63d)

        return np.array(frames, dtype=np.float32)

---

5. Next Steps

Immediate Implementation Tasks

1. Create fusion module structure

bash
   mkdir -p core/cc-core/cc_core/fusion/{transforms,capture}
   touch core/cc-core/cc_core/fusion/__init__.py

2. Implement UnifiedReceiver (Week 1)
- Port the code from this spec
- Add unit tests
- Verify Mocopi + simulated MediaPipe

3. Implement KalmanFusion (Week 2)
- Port the EKF code
- Tune parameters with real sensor data
- Add visualization for debugging

4. Build WebSocket bridge (Week 2)
- Add MediaPipeBridge to dashboard
- Test end-to-end streaming

5. Implement SessionManager (Week 3)
- Port capture session code
- Build minimal UI (Streamlit)
- Record first test sessions

6. Training integration (Week 4)
- Implement 63D transformer
- Update CC-MotionGen dataset class
- Run comparison training

---

Document Version: 1.0
Last Updated: December 2024
Author: Comp-Core Team

Promotion Decision

Attach run IDs, datasets, metrics, and reproduction commands.

Source Anchor

projects/Documentation/03-guides/development/FUSION_IMPLEMENTATION_SPEC.md

Detected Structure

Method · Evaluation · Code Anchors · Architecture