Detailed CDP Migration Guides

Overview

A complete technical guide to migrating your customer data to Customer Data Platforms (CDPs), covering planning, implementation, and post-migration optimization.

🎯 What Is a CDP?

A Customer Data Platform centralizes customer data from disparate sources to build a unified, actionable customer profile.

Benefits of a CDP

  • 360° customer view: A unified profile across all touchpoints
  • Advanced segmentation: Dynamic, behavior-based audiences
  • Personalization: Tailored experiences in real time
  • Multichannel activation: Synchronization with all your marketing tools
  • Compliance: Centralized consent and privacy management
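
The "unified profile" idea can be sketched in a few lines of Python: records from several sources are keyed on a shared identifier and merged, with earlier sources taking precedence for conflicting fields. This is a deliberately simplified stand-in for real identity resolution; all field and source names are illustrative.

```python
from collections import defaultdict

def unify_profiles(records):
    """Merge raw records from several sources into one profile per email.

    Later sources only fill in fields earlier sources left empty,
    and every contributing source is kept for lineage.
    """
    profiles = defaultdict(lambda: {"sources": []})
    for record in records:
        profile = profiles[record["email"].lower()]
        profile["sources"].append(record["source"])
        for key, value in record.items():
            if key == "source":
                continue
            profile.setdefault(key, value)  # first source wins on conflicts
    return dict(profiles)

# Example: the same customer seen by the CRM and the email platform
records = [
    {"source": "crm", "email": "Jane@example.com", "first_name": "Jane"},
    {"source": "email_platform", "email": "jane@example.com", "opens_30d": 4},
]
unified = unify_profiles(records)
```

A production CDP resolves identities across many identifiers (email, device IDs, CRM IDs), but the merge-by-key principle is the same.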

🏗️ Planning the Migration

Auditing the Existing Stack

// Audit script for existing data sources
class DataSourceAuditor {
  constructor() {
    this.sources = [];
    this.dataSchema = new Map();
    this.volumes = new Map();
  }

  async auditExistingSources() {
    // Audit Google Analytics
    const gaData = await this.auditGoogleAnalytics();
    this.sources.push({
      name: 'Google Analytics',
      type: 'web_analytics',
      volume: gaData.sessions_per_month,
      schema: gaData.events_structure,
      integration_complexity: 'medium'
    });

    // Audit CRM/ERP
    const crmData = await this.auditCRM();
    this.sources.push({
      name: 'CRM System',
      type: 'customer_records',
      volume: crmData.contacts_count,
      schema: crmData.fields_mapping,
      integration_complexity: 'high'
    });

    // Audit Email Marketing
    const emailData = await this.auditEmailPlatform();
    this.sources.push({
      name: 'Email Platform',
      type: 'email_engagement',
      volume: emailData.subscribers_count,
      schema: emailData.engagement_events,
      integration_complexity: 'low'
    });

    return this.generateAuditReport();
  }

  generateMigrationPlan() {
    return {
      phases: [
        {
          name: 'Phase 1: Core Implementation',
          duration: '4-6 weeks',
          sources: this.sources.filter(s => s.integration_complexity === 'low'),
          risks: 'Low - Standard integrations'
        },
        {
          name: 'Phase 2: Advanced Integrations',
          duration: '6-8 weeks', 
          sources: this.sources.filter(s => s.integration_complexity === 'medium'),
          risks: 'Medium - Custom mapping required'
        },
        {
          name: 'Phase 3: Complex Systems',
          duration: '8-12 weeks',
          sources: this.sources.filter(s => s.integration_complexity === 'high'),
          risks: 'High - Deep technical integration'
        }
      ]
    };
  }
}

Data Mapping

# Example schema mapping for the migration
data_mapping:
  customer_profile:
    source_fields:
      - crm.contact_id -> customer_id (primary_key)
      - crm.email -> email (identifier)
      - crm.first_name -> first_name
      - crm.last_name -> last_name
      - ga.client_id -> web_session_id
      - email.subscriber_id -> email_id

  events_mapping:
    web_events:
      - ga.page_view -> page_viewed
      - ga.purchase -> order_completed
      - ga.add_to_cart -> product_added_to_cart
    
    email_events:
      - email.opened -> email_opened
      - email.clicked -> email_clicked
      - email.unsubscribed -> email_unsubscribed

  custom_attributes:
    behavioral:
      - purchase_frequency: "Orders per 90 days"
      - avg_order_value: "Average transaction amount"
      - preferred_category: "Most purchased product category"
    
    demographic:
      - age_group: "Calculated from birth_date"
      - location_tier: "City size classification"
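
The YAML mapping above can be applied mechanically at migration time. A minimal Python sketch follows; the mapping dicts mirror a few of the rows above, and `apply_mapping` is a hypothetical helper, not part of any CDP SDK.

```python
# Field mapping mirroring the YAML schema above (source -> destination)
CUSTOMER_PROFILE_MAPPING = {
    "crm.contact_id": "customer_id",
    "crm.email": "email",
    "crm.first_name": "first_name",
    "crm.last_name": "last_name",
}

EVENT_NAME_MAPPING = {
    "ga.page_view": "page_viewed",
    "ga.purchase": "order_completed",
    "ga.add_to_cart": "product_added_to_cart",
}

def apply_mapping(source_record, mapping):
    """Rename '<source>.<field>' keys to their destination names."""
    return {
        dest: source_record[src]
        for src, dest in mapping.items()
        if src in source_record
    }

crm_row = {"crm.contact_id": "C-42", "crm.email": "jane@example.com"}
profile = apply_mapping(crm_row, CUSTOMER_PROFILE_MAPPING)
```

Keeping the mapping as data rather than code makes it reviewable by non-developers and easy to diff as the schema evolves.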

🚀 Migrating to Segment

Installation and Configuration

// Initial Segment configuration
import { Analytics } from '@segment/analytics-node';

class SegmentMigration {
  constructor(writeKey) {
    this.analytics = new Analytics({ writeKey });
    this.batchSize = 100;
    this.rateLimits = {
      track: 500, // events per second
      identify: 100 // identify calls per second
    };
  }

  // Migrate user profiles
  async migrateUserProfiles(users) {
    const batches = this.createBatches(users, this.batchSize);
    
    for (const batch of batches) {
      await Promise.all(batch.map(user => this.migrateUser(user)));
      
      // Honor the rate limits
      await this.sleep(1000 / this.rateLimits.identify);
    }
  }

  async migrateUser(user) {
    try {
      await this.analytics.identify({
        userId: user.id,
        traits: {
          email: user.email,
          firstName: user.firstName,
          lastName: user.lastName,
          phone: user.phone,
          // Computed attributes
          totalOrders: user.orderHistory.length,
          lifetimeValue: user.orderHistory.reduce((sum, order) => sum + order.total, 0),
          lastPurchaseDate: user.lastOrder?.date,
          preferredCategory: this.calculatePreferredCategory(user.orderHistory),
          // Segmentation
          customerTier: this.calculateCustomerTier(user),
          churnRisk: this.calculateChurnRisk(user)
        },
        context: {
          ip: user.lastKnownIP,
          userAgent: user.lastUserAgent,
          timezone: user.timezone
        }
      });
      
      console.log(`✅ Migrated user ${user.id}`);
      
    } catch (error) {
      console.error(`❌ Failed to migrate user ${user.id}:`, error);
      await this.logFailedMigration(user, error);
    }
  }

  // Migrate historical events
  async migrateHistoricalEvents(events) {
    const sortedEvents = events.sort((a, b) => new Date(a.timestamp) - new Date(b.timestamp));
    
    for (const event of sortedEvents) {
      await this.migrateEvent(event);
    }
  }

  async migrateEvent(event) {
    const segmentEvent = {
      userId: event.userId,
      event: this.mapEventName(event.type),
      properties: this.mapEventProperties(event.properties),
      timestamp: event.timestamp,
      context: {
        page: event.page,
        referrer: event.referrer,
        campaign: event.campaign
      }
    };

    await this.analytics.track(segmentEvent);
  }

  // Configure destinations
  setupDestinations() {
    const destinations = {
      // Marketing automation
      klaviyo: {
        enabled: true,
        settings: {
          apiKey: process.env.KLAVIYO_API_KEY,
          listId: process.env.KLAVIYO_LIST_ID
        }
      },
      
      // Analytics
      googleAnalytics: {
        enabled: true,
        settings: {
          trackingId: process.env.GA_TRACKING_ID,
          enhancedEcommerce: true
        }
      },
      
      // Advertising
      facebookPixel: {
        enabled: true,
        settings: {
          pixelId: process.env.FB_PIXEL_ID,
          standardEvents: true
        }
      }
    };

    return destinations;
  }
}

Server-Side Configuration

// Backend configuration for Segment
const express = require('express');
const { Analytics } = require('@segment/analytics-node');

class SegmentBackendIntegration {
  constructor() {
    this.analytics = new Analytics({
      writeKey: process.env.SEGMENT_WRITE_KEY,
      flushAt: 20, // Batch size
      flushInterval: 10000 // 10 seconds
    });
    
    this.setupRoutes();
  }

  setupRoutes() {
    const router = express.Router();
    
    // Endpoint for e-commerce events
    router.post('/track/ecommerce', async (req, res) => {
      try {
        const { userId, event, properties } = req.body;
        
        // Validation and enrichment
        const enrichedProperties = await this.enrichEcommerceEvent(properties);
        
        await this.analytics.track({
          userId,
          event,
          properties: enrichedProperties,
          context: {
            ip: req.ip,
            userAgent: req.get('User-Agent')
          }
        });
        
        res.json({ success: true });
        
      } catch (error) {
        console.error('Segment tracking error:', error);
        res.status(500).json({ error: 'Tracking failed' });
      }
    });

    // Webhook for bidirectional synchronization
    router.post('/webhook/segment', (req, res) => {
      const { type, userId, properties } = req.body;
      
      switch (type) {
        case 'identify':
          this.handleUserUpdate(userId, properties);
          break;
        case 'track':
          this.handleEventReceived(userId, properties);
          break;
      }
      
      res.status(200).send('OK');
    });
    
    return router;
  }

  async enrichEcommerceEvent(properties) {
    return {
      ...properties,
      // Automatic enrichment
      timestamp: new Date().toISOString(),
      currency: properties.currency || 'EUR',
      // Automatic classification
      category_hierarchy: await this.getCategoryHierarchy(properties.category),
      // Performance calculations
      profit_margin: await this.calculateProfitMargin(properties.products),
      // Marketing attribution
      attribution_data: await this.getAttributionData(properties.orderId)
    };
  }
}

📊 Migrating to Klaviyo

Klaviyo CDP Configuration

# Migrating to Klaviyo with Python
import klaviyo_api
from klaviyo_api.rest import ApiException
import pandas as pd
from datetime import datetime

class KlaviyoMigration:
    def __init__(self, api_key):
        configuration = klaviyo_api.Configuration(
            host="https://a.klaviyo.com",
            api_key={'Klaviyo-API-Key': api_key}
        )
        self.profiles_api = klaviyo_api.ProfilesApi(klaviyo_api.ApiClient(configuration))
        self.events_api = klaviyo_api.EventsApi(klaviyo_api.ApiClient(configuration))
        
    def migrate_customer_profiles(self, customers_df):
        """Migrate customer profiles with enriched data"""
        
        for index, customer in customers_df.iterrows():
            try:
                # Compute behavioral metrics
                behavioral_data = self.calculate_behavioral_metrics(customer)
                
                # Build the enriched profile
                profile_data = {
                    "type": "profile",
                    "attributes": {
                        "email": customer['email'],
                        "first_name": customer['first_name'],
                        "last_name": customer['last_name'],
                        "phone_number": customer['phone'],
                        
                        # Computed behavioral data
                        "total_orders": behavioral_data['total_orders'],
                        "lifetime_value": behavioral_data['lifetime_value'],
                        "average_order_value": behavioral_data['avg_order_value'],
                        "last_order_date": behavioral_data['last_order_date'],
                        
                        # Automatic segmentation
                        "customer_tier": behavioral_data['customer_tier'],
                        "purchase_frequency": behavioral_data['purchase_frequency'],
                        "preferred_categories": behavioral_data['preferred_categories'],
                        
                        # Predictive scoring
                        "churn_probability": behavioral_data['churn_score'],
                        "next_purchase_prediction": behavioral_data['next_purchase_days'],
                        "product_affinity": behavioral_data['product_affinity_scores']
                    },
                    "properties": {
                        "migration_date": datetime.now().isoformat(),
                        "data_source": "crm_migration",
                        "profile_completeness": behavioral_data['profile_completeness']
                    }
                }
                
                # Create the profile in Klaviyo
                response = self.profiles_api.create_profile(profile_data)
                print(f"✅ Profile created: {customer['email']}")
                
            except ApiException as e:
                print(f"❌ Error creating profile for {customer['email']}: {e}")
                
    def migrate_historical_events(self, events_df):
        """Migrate historical events"""
        
        # Group events by user to optimize the run
        user_events = events_df.groupby('user_id')
        
        for user_id, user_events_df in user_events:
            try:
                # Sort events chronologically
                sorted_events = user_events_df.sort_values('timestamp')
                
                for _, event in sorted_events.iterrows():
                    klaviyo_event = self.map_to_klaviyo_event(event)
                    
                    response = self.events_api.create_event(klaviyo_event)
                    
                print(f"✅ Migrated {len(sorted_events)} events for user {user_id}")
                
            except ApiException as e:
                print(f"❌ Error migrating events for user {user_id}: {e}")
    
    def calculate_behavioral_metrics(self, customer):
        """Compute advanced behavioral metrics"""
        
        # Fetch the customer's order history
        orders = self.get_customer_orders(customer['id'])
        
        if orders.empty:
            return self.default_behavioral_metrics()
        
        # Base calculations
        total_orders = len(orders)
        lifetime_value = orders['total'].sum()
        avg_order_value = orders['total'].mean()
        
        # Purchase frequency
        date_range = (orders['date'].max() - orders['date'].min()).days
        purchase_frequency = total_orders / max(date_range / 30, 1)  # per month
        
        # Preferred categories
        category_counts = orders['category'].value_counts()
        preferred_categories = category_counts.head(3).index.tolist()
        
        # Churn-risk scoring
        days_since_last_order = (datetime.now() - orders['date'].max()).days
        churn_score = self.calculate_churn_probability(
            days_since_last_order, purchase_frequency, avg_order_value
        )
        
        # Predict the next order
        avg_days_between_orders = orders['date'].diff().dt.days.mean()
        next_purchase_days = max(avg_days_between_orders - days_since_last_order, 0)
        
        # Classify the customer
        customer_tier = self.classify_customer_tier(lifetime_value, total_orders)
        
        return {
            'total_orders': total_orders,
            'lifetime_value': round(lifetime_value, 2),
            'avg_order_value': round(avg_order_value, 2),
            'last_order_date': orders['date'].max().isoformat(),
            'purchase_frequency': round(purchase_frequency, 2),
            'preferred_categories': preferred_categories,
            'churn_score': round(churn_score, 3),
            'next_purchase_days': int(next_purchase_days),
            'customer_tier': customer_tier,
            'product_affinity_scores': self.calculate_product_affinity(orders),
            'profile_completeness': self.calculate_profile_completeness(customer)
        }

    def setup_advanced_segmentation(self):
        """Configure advanced segments in Klaviyo"""
        
        segments_config = [
            {
                'name': 'High Value Customers',
                'definition': {
                    'conditions': [
                        {'field': 'lifetime_value', 'operator': '>', 'value': 1000},
                        {'field': 'total_orders', 'operator': '>=', 'value': 5}
                    ]
                }
            },
            {
                'name': 'At Risk Customers',
                'definition': {
                    'conditions': [
                        {'field': 'churn_probability', 'operator': '>', 'value': 0.7},
                        {'field': 'last_order_date', 'operator': '<', 'value': '30_days_ago'}
                    ]
                }
            },
            {
                'name': 'Frequent Buyers',
                'definition': {
                    'conditions': [
                        {'field': 'purchase_frequency', 'operator': '>', 'value': 2},
                        {'field': 'customer_tier', 'operator': 'in', 'value': ['gold', 'platinum']}
                    ]
                }
            }
        ]
        
        for segment in segments_config:
            self.create_klaviyo_segment(segment)
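
The `calculate_churn_probability` and `classify_customer_tier` helpers called above are not shown in the listing. A minimal heuristic version might look like this; the thresholds are illustrative assumptions, not Klaviyo logic, and a real deployment would typically replace them with a trained model.

```python
def calculate_churn_probability(days_since_last_order, purchase_frequency, avg_order_value):
    """Heuristic churn score in [0, 1]: recency dominates; frequency and basket size soften it."""
    recency_risk = min(days_since_last_order / 180, 1.0)       # maxes out after ~6 months
    frequency_shield = min(purchase_frequency / 4, 1.0) * 0.3  # up to -0.3 for frequent buyers
    value_shield = min(avg_order_value / 500, 1.0) * 0.1       # up to -0.1 for large baskets
    return round(max(recency_risk - frequency_shield - value_shield, 0.0), 3)

def classify_customer_tier(lifetime_value, total_orders):
    """Simple LTV / order-count ladder used for segmentation."""
    if lifetime_value >= 5000 and total_orders >= 10:
        return "platinum"
    if lifetime_value >= 1000 and total_orders >= 5:
        return "gold"
    if total_orders >= 2:
        return "silver"
    return "bronze"
```

Whatever the scoring function, keeping it pure (inputs in, score out) makes it easy to unit-test and to swap for an ML model later.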

Flows and Automations

def setup_klaviyo_flows(self):
    """Configure automated flows after the migration"""
    
    flows_config = [
        {
            'name': 'Post-Migration Welcome Series',
            'trigger': {
                'type': 'profile_property_changed',
                'property': 'migration_date'
            },
            'steps': [
                {
                    'type': 'email',
                    'template': 'welcome_back_email',
                    'delay': '1_hour',
                    'personalization': {
                        'use_historical_data': True,
                        'recommend_products': True
                    }
                },
                {
                    'type': 'sms',
                    'template': 'exclusive_offer_sms',
                    'delay': '3_days',
                    'conditions': {
                        'customer_tier': ['gold', 'platinum']
                    }
                }
            ]
        },
        {
            'name': 'Churn Prevention Flow',
            'trigger': {
                'type': 'profile_property_update',
                'property': 'churn_probability',
                'condition': '> 0.6'
            },
            'steps': [
                {
                    'type': 'email',
                    'template': 'we_miss_you_email',
                    'delay': 'immediate',
                    'personalization': {
                        'discount_percentage': 'dynamic_based_on_tier',
                        'product_recommendations': 'last_purchased_category'
                    }
                }
            ]
        }
    ]
    
    for flow in flows_config:
        self.create_klaviyo_flow(flow)

🏢 Migrating to Adobe CDP

Adobe Experience Platform Configuration

// Adobe Experience Platform Web SDK (published as @adobe/alloy)
import { createInstance } from '@adobe/alloy';

class AdobeCDPMigration {
  constructor(config) {
    // createInstance returns the "alloy" command function; configure it once
    const alloy = createInstance({ name: 'alloy' });
    alloy('configure', {
      edgeConfigId: config.datastreamId,
      orgId: config.orgId,
      debugEnabled: config.debug || false
    });

    // Thin wrapper so the rest of the class can keep calling this.edge.sendEvent(...)
    this.edge = { sendEvent: (payload) => alloy('sendEvent', payload) };

    this.schemas = {
      profile: config.profileSchemaId,
      behavioralEvent: config.behavioralSchemaId,
      purchaseEvent: config.purchaseSchemaId
    };
  }

  async migrateCustomerProfiles(profiles) {
    for (const profile of profiles) {
      try {
        const xdmProfile = this.transformToXDM(profile);
        
        await this.edge.sendEvent({
          xdm: {
            ...xdmProfile,
            _schema: {
              name: this.schemas.profile
            }
          },
          identityMap: {
            email: [{
              id: profile.email,
              primary: true,
              authenticatedState: 'authenticated'
            }],
            crmId: [{
              id: profile.crmId,
              primary: false
            }]
          }
        });
        
        console.log(`✅ Adobe CDP: Migrated profile ${profile.email}`);
        
      } catch (error) {
        console.error(`❌ Adobe CDP migration failed for ${profile.email}:`, error);
        await this.handleMigrationError(profile, error);
      }
    }
  }

  transformToXDM(profile) {
    return {
      person: {
        name: {
          firstName: profile.firstName,
          lastName: profile.lastName,
          fullName: `${profile.firstName} ${profile.lastName}`
        },
        birthDate: profile.birthDate,
        gender: profile.gender
      },
      personalEmail: {
        primary: true,
        address: profile.email,
        status: 'active',
        type: 'personal'
      },
      mobilePhone: {
        primary: true,
        number: profile.phone,
        status: 'active'
      },
      homeAddress: {
        street1: profile.address?.street,
        city: profile.address?.city,
        postalCode: profile.address?.postalCode,
        country: profile.address?.country
      },
      // Enriched behavioral data
      commerce: {
        purchases: {
          value: profile.lifetimeValue || 0
        },
        order: {
          purchaseOrderNumber: profile.lastOrderId,
          currencyCode: 'EUR'
        }
      },
      // Computed custom attributes
      _tenant: {
        customerTier: profile.customerTier,
        churnProbability: profile.churnProbability,
        preferredCategories: profile.preferredCategories,
        avgOrderValue: profile.avgOrderValue,
        totalOrders: profile.totalOrders
      }
    };
  }

  // Configure real-time audiences
  async setupRealtimeAudiences() {
    const audiences = [
      {
        name: 'High Value Customers',
        description: 'Customers with high lifetime value and engagement',
        expression: `
          _tenant.customerTier = "platinum" OR 
          (_tenant.lifetimeValue > 1000 AND _tenant.totalOrders >= 5)
        `,
        evaluationType: 'streaming'
      },
      {
        name: 'At Risk Segment',
        description: 'Customers with high churn probability',
        expression: `
          _tenant.churnProbability > 0.7 AND
          commerce.purchases.value > 100
        `,
        evaluationType: 'batch',
        schedule: 'daily'
      }
    ];

    for (const audience of audiences) {
      await this.createAdobeAudience(audience);
    }
  }
}

🛠️ Custom CDP Solutions

Microservices Architecture

# Custom CDP with FastAPI and Apache Kafka
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
from kafka import KafkaProducer, KafkaConsumer
import redis
import asyncpg
from typing import List, Dict, Any
import json
from datetime import datetime

# Pydantic models referenced by the endpoints below
class CustomerProfile(BaseModel):
    id: str
    email: str
    attributes: Dict[str, Any] = {}

class CustomerEvent(BaseModel):
    user_id: str
    event_type: str
    properties: Dict[str, Any] = {}
    timestamp: str

class CustomCDP:
    def __init__(self):
        self.app = FastAPI(title="Custom CDP", version="1.0.0")
        self.kafka_producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
        self.setup_routes()
    
    def setup_routes(self):
        
        @self.app.post("/profiles/ingest")
        async def ingest_profile(profile: CustomerProfile, background_tasks: BackgroundTasks):
            """Real-time ingestion of profile data"""
            
            # Validation and enrichment
            enriched_profile = await self.enrich_profile(profile)
            
            # Persist to the database
            background_tasks.add_task(self.save_profile_to_db, enriched_profile)
            
            # Publish to Kafka for real-time processing
            self.kafka_producer.send('profile_updates', {
                'profile_id': profile.id,
                'data': enriched_profile.dict(),
                'timestamp': datetime.utcnow().isoformat()
            })
            
            # Cache in Redis for fast access
            await self.cache_profile(enriched_profile)
            
            return {"status": "success", "profile_id": profile.id}

        @self.app.post("/events/track")
        async def track_event(event: CustomerEvent):
            """Track behavioral events"""
            
            # Publish immediately to Kafka
            self.kafka_producer.send('behavioral_events', {
                'user_id': event.user_id,
                'event_type': event.event_type,
                'properties': event.properties,
                'timestamp': event.timestamp
            })
            
            # Update the profile in real time
            await self.update_profile_from_event(event)
            
            return {"status": "tracked"}

        @self.app.get("/profiles/{profile_id}/unified")
        async def get_unified_profile(profile_id: str):
            """Fetch the unified, real-time profile"""
            
            # Try the cache first
            cached_profile = await self.get_cached_profile(profile_id)
            if cached_profile:
                return cached_profile
            
            # Rebuild from the underlying data
            unified_profile = await self.build_unified_profile(profile_id)
            
            # Cache the result
            await self.cache_profile(unified_profile)
            
            return unified_profile

    async def real_time_segmentation_engine(self):
        """Real-time segmentation engine"""
        
        consumer = KafkaConsumer(
            'profile_updates', 'behavioral_events',
            bootstrap_servers=['localhost:9092'],
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )
        
        for message in consumer:
            try:
                # Evaluate the segmentation rules
                affected_segments = await self.evaluate_segmentation_rules(message.value)
                
                # Update segment memberships
                for segment_id in affected_segments:
                    await self.update_segment_membership(
                        message.value['profile_id'], 
                        segment_id
                    )
                
                # Trigger automated actions
                await self.trigger_segment_actions(message.value, affected_segments)
                
            except Exception as e:
                print(f"Segmentation error: {e}")

    async def setup_data_destinations(self):
        """Configure data destinations"""
        
        destinations = {
            'email_platform': {
                'type': 'webhook',
                'url': 'https://api.klaviyo.com/api/track',
                'auth': {'api_key': 'klaviyo_key'},
                'mapping': self.klaviyo_mapping,
                'batch_size': 100,
                'frequency': '5min'
            },
            'advertising_platform': {
                'type': 'api',
                'platform': 'facebook',
                'auth': {'access_token': 'fb_token'},
                'mapping': self.facebook_mapping,
                'sync_type': 'real_time'
            },
            'analytics_warehouse': {
                'type': 'database',
                'connection': 'postgresql://user:pass@host:5432/analytics',
                'table': 'customer_events',
                'batch_size': 1000,
                'frequency': '1hour'
            }
        }
        
        return destinations

Batch Data Pipeline

# Apache Spark pipeline for batch processing
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import *
from pyspark.sql.types import *
import great_expectations as ge

class CDPDataPipeline:
    def __init__(self):
        self.spark = SparkSession.builder \
            .appName("CDP Data Pipeline") \
            .config("spark.sql.adaptive.enabled", "true") \
            .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
            .getOrCreate()
    
    def process_daily_batch(self, date: str):
        """Daily batch job for heavier computations"""
        
        # Read the day's events
        events_df = self.spark.read \
            .option("multiline", "true") \
            .json(f"s3://cdp-data/events/date={date}")
        
        # Compute behavioral metrics
        user_metrics = self.calculate_behavioral_metrics(events_df)
        
        # ML scoring for predictions
        scored_profiles = self.apply_ml_scoring(user_metrics)
        
        # Anomaly detection
        anomaly_detection = self.detect_behavioral_anomalies(scored_profiles)
        
        # Persist the results
        self.save_processed_data(scored_profiles, date)
        
        return {
            'processed_profiles': scored_profiles.count(),
            'anomalies_detected': anomaly_detection.count()
        }
    
    def calculate_behavioral_metrics(self, events_df):
        """Advanced behavioral-metric calculations"""
        
        # Rolling time windows (range expressed in seconds over a unix timestamp)
        window_30d = Window.partitionBy("user_id") \
                           .orderBy(col("timestamp").cast("long")) \
                           .rangeBetween(-30*24*3600, 0)
        
        window_7d = Window.partitionBy("user_id") \
                          .orderBy(col("timestamp").cast("long")) \
                          .rangeBetween(-7*24*3600, 0)
        
        # Window functions cannot run inside groupBy().agg():
        # compute the rolling columns per event row first
        windowed_df = events_df \
            .withColumn("revenue_30d",
                        sum(when(col("event_type") == "purchase", col("value")).otherwise(0))
                        .over(window_30d)) \
            .withColumn("events_30d", count("event_type").over(window_30d)) \
            .withColumn("events_7d", count("event_type").over(window_7d))
        
        metrics_df = windowed_df.groupBy("user_id").agg(
            # Base metrics
            count("*").alias("total_events"),
            countDistinct("session_id").alias("unique_sessions"),
            sum(when(col("event_type") == "purchase", col("value")).otherwise(0)).alias("total_revenue"),
            
            # Peak values of the rolling metrics
            max("revenue_30d").alias("revenue_30d"),
            max("events_30d").alias("events_30d"),
            max("events_7d").alias("events_7d"),
            
            # Frequency and recency
            max("timestamp").alias("last_activity"),
            min("timestamp").alias("first_activity"),
            
            # Behavioral sequence analysis (sorted chronologically via the struct's first field)
            sort_array(collect_list(struct(col("timestamp"), col("event_type")))).alias("event_sequence")
        )
        
        # Derived metrics (guarded against division by zero)
        enhanced_metrics = metrics_df.withColumn(
            "avg_session_value", 
            col("total_revenue") / greatest(col("unique_sessions"), lit(1))
        ).withColumn(
            "activity_frequency",
            col("total_events") / greatest(datediff(col("last_activity"), col("first_activity")), lit(1))
        ).withColumn(
            "engagement_trend",
            col("events_7d") / greatest(col("events_30d"), lit(1)) * 4  # normalize 7d activity to a 30-day scale
        )
        
        return enhanced_metrics

    def setup_data_quality_monitoring(self):
        """Set up data-quality monitoring"""
        
        # Great Expectations setup
        context = ge.get_context()
        
        # Quality expectations
        expectations = [
            {
                'expectation_type': 'expect_column_values_to_not_be_null',
                'column': 'user_id'
            },
            {
                'expectation_type': 'expect_column_values_to_be_unique',
                'column': 'user_id'
            },
            {
                'expectation_type': 'expect_column_values_to_be_between',
                'column': 'total_revenue',
                'min_value': 0,
                'max_value': 10000
            },
            {
                'expectation_type': 'expect_column_values_to_match_regex',
                'column': 'user_id',
                'regex': r'^[a-f0-9-]{36}$'  # UUID format
            }
        ]
        
        return expectations

✅ Migration Checklist

Preparation Phase

  • Complete audit of existing data sources
  • Schema mapping and transformation definitions
  • CDP platform selection and configuration
  • Connectivity tests against the data sources
  • Backup and rollback plan

Implementation Phase

  • Migration of core customer profiles
  • Configuration of real-time data sources
  • Migration of historical events
  • Quality tests on the migrated data
  • Segment and audience configuration
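
The quality-testing step typically starts with simple reconciliation between the source system and the CDP. A minimal sketch, under the assumption that both sides expose records keyed by `id` (all names hypothetical):

```python
def reconcile_migration(source_records, migrated_records, spot_check_fields=("email",)):
    """Compare source vs CDP record counts and spot-check key fields by id."""
    migrated_by_id = {r["id"]: r for r in migrated_records}
    report = {
        "source_count": len(source_records),
        "migrated_count": len(migrated_records),
        "missing_ids": [],
        "field_mismatches": [],
    }
    for record in source_records:
        migrated = migrated_by_id.get(record["id"])
        if migrated is None:
            report["missing_ids"].append(record["id"])
            continue
        for field in spot_check_fields:
            if record.get(field) != migrated.get(field):
                report["field_mismatches"].append((record["id"], field))
    report["ok"] = not report["missing_ids"] and not report["field_mismatches"]
    return report

# Example: one record failed to migrate
source = [{"id": "1", "email": "a@x.com"}, {"id": "2", "email": "b@x.com"}]
migrated = [{"id": "1", "email": "a@x.com"}]
report = reconcile_migration(source, migrated)
```

Running such a reconciliation after each migration batch surfaces silent drops before segments and flows start acting on incomplete data.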

Activation Phase

  • Marketing destination configuration
  • Automated journey testing
  • Training for the teams using the platform
  • Performance monitoring and alerting
  • Complete technical documentation

Optimization Phase

  • Post-migration performance analysis
  • Cost and throughput optimization
  • Continuous segment refinement
  • Extension to new sources and destinations

📈 ROI and Success Metrics

  • Integration cost reduction: 60-80%
  • Segment accuracy improvement: 40-60%
  • Faster time-to-market: 3x
  • Conversion rate uplift: 25-35%
  • Data fragmentation reduction: 90%

Migrating to a CDP is a major strategic investment: it unifies your customer data and multiplies the effectiveness of your marketing programs.