📖 Lesson ⏱️ 90 minutes

Enterprise ML Patterns

Best practices for enterprise ML applications

Java Enterprise Patterns - Scalable ML Architecture

This tutorial covers enterprise-grade architectural patterns for machine learning systems using SuperML Java 2.1.0. Learn how to build scalable, maintainable, and resilient ML architectures that can handle enterprise-level workloads.

What You’ll Learn

  • Microservices Architecture - ML service decomposition and communication
  • Event-Driven Architecture - Asynchronous ML pipelines and event sourcing
  • CQRS Pattern - Command Query Responsibility Segregation for ML systems
  • Saga Pattern - Distributed transaction management
  • Circuit Breaker - Fault tolerance and resilience
  • API Gateway - Unified entry point and routing
  • Service Discovery - Dynamic service registration and discovery
  • Distributed Caching - Multi-level caching strategies

Prerequisites

  • Completion of the “Java Model Deployment” tutorial
  • Spring Boot and Spring Cloud experience
  • Microservices architecture knowledge
  • Message queue systems (RabbitMQ, Kafka)
  • Distributed systems understanding

Microservices Architecture

ML Service Decomposition

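This example shows the model service, one of the services obtained by decomposing a monolithic ML system along domain-driven design boundaries. It owns model training, evaluation, and lifecycle management, and delegates data access and metrics recording to separate services.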

package com.company.enterprise.microservices;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.annotation.Bean;
import org.springframework.web.client.RestTemplate;

/**
 * ML Model Service - Handles model training, evaluation, and management
 * This service is responsible for the ML lifecycle management
 */
@SpringBootApplication
@EnableEurekaClient
@EnableFeignClients
public class MLModelService {
    
    public static void main(String[] args) {
        SpringApplication.run(MLModelService.class, args);
    }
    
    @Bean
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }
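}

package com.company.enterprise.microservices.model;

// Clients and events live in the other packages shown in this tutorial
import com.company.enterprise.events.*;
import com.company.enterprise.microservices.clients.DataServiceClient;
import com.company.enterprise.microservices.clients.MetricsServiceClient;
import com.superml.core.model.Model;
import com.superml.core.model.ModelManager;
import com.superml.core.data.Dataset;
import com.superml.core.evaluation.Evaluator;
import com.superml.core.training.TrainingConfig;
import com.superml.core.training.Trainer;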

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
 * ML Model Management Controller
 * Handles model training, evaluation, and lifecycle management
 */
@RestController
@RequestMapping("/api/v1/models")
@RefreshScope
public class MLModelController {
    
    private static final Logger logger = LoggerFactory.getLogger(MLModelController.class);
    
    @Autowired
    private ModelManager modelManager;
    
    @Autowired
    private DataServiceClient dataServiceClient;
    
    @Autowired
    private MetricsServiceClient metricsServiceClient;
    
    @Autowired
    private EventPublisher eventPublisher;
    
    /**
     * Train a new model asynchronously
     */
    @PostMapping("/train")
    public CompletableFuture<ResponseEntity<TrainingResponse>> trainModel(
            @RequestBody TrainingRequest request) {
        
        logger.info("Starting model training for: {}", request.getModelName());
        
        return CompletableFuture.supplyAsync(() -> {
            try {
                // Publish training started event
                eventPublisher.publishEvent(new ModelTrainingStartedEvent(
                    request.getModelName(), request.getDatasetId()));
                
                // Get dataset from data service
                Dataset dataset = dataServiceClient.getDataset(request.getDatasetId());
                
                // Configure training
                TrainingConfig config = new TrainingConfig()
                    .setAlgorithm(request.getAlgorithm())
                    .setHyperparameters(request.getHyperparameters())
                    .setValidationSplit(0.2)
                    .setEarlyStoppingPatience(10)
                    .setMetricsToTrack(request.getMetrics());
                
                // Train model
                Trainer trainer = new Trainer(config);
                Model model = trainer.train(dataset);
                
                // Evaluate model
                Evaluator evaluator = new Evaluator();
                Map<String, Double> metrics = evaluator.evaluate(model, dataset);
                
                // Save model
                String modelId = modelManager.saveModel(model, request.getModelName());
                
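                // Send metrics to metrics service. The client accepts a single TrainingMetrics
                // body; its constructor is not shown in this tutorial and is assumed here.
                metricsServiceClient.recordTrainingMetrics(new TrainingMetrics(modelId, metrics));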
                
                // Publish training completed event
                eventPublisher.publishEvent(new ModelTrainingCompletedEvent(
                    request.getModelName(), modelId, metrics));
                
                TrainingResponse response = new TrainingResponse(
                    modelId, request.getModelName(), metrics, "COMPLETED");
                
                logger.info("Model training completed: {}", modelId);
                return ResponseEntity.ok(response);
                
            } catch (Exception e) {
                logger.error("Model training failed: {}", request.getModelName(), e);
                
                // Publish training failed event
                eventPublisher.publishEvent(new ModelTrainingFailedEvent(
                    request.getModelName(), e.getMessage()));
                
                TrainingResponse response = new TrainingResponse(
                    null, request.getModelName(), null, "FAILED");
                // An unexpected training failure is a server-side error, so report 500 rather than 400
                return ResponseEntity.internalServerError().body(response);
            }
        });
    }
    
    /**
     * Get model information
     */
    @GetMapping("/{modelId}")
    public ResponseEntity<ModelInfo> getModel(@PathVariable String modelId) {
        try {
            Model model = modelManager.getModel(modelId);
            
            if (model == null) {
                return ResponseEntity.notFound().build();
            }
            
            ModelInfo info = new ModelInfo(
                modelId,
                model.getName(),
                model.getAlgorithm(),
                model.getCreatedAt(),
                model.getVersion(),
                model.getMetrics()
            );
            
            return ResponseEntity.ok(info);
            
        } catch (Exception e) {
            logger.error("Error retrieving model: {}", modelId, e);
            return ResponseEntity.internalServerError().build();
        }
    }
    
    /**
     * List all models
     */
    @GetMapping
    public ResponseEntity<List<ModelInfo>> listModels(
            @RequestParam(defaultValue = "0") int page,
            @RequestParam(defaultValue = "20") int size) {
        
        try {
            List<ModelInfo> models = modelManager.listModels(page, size);
            return ResponseEntity.ok(models);
            
        } catch (Exception e) {
            logger.error("Error listing models", e);
            return ResponseEntity.internalServerError().build();
        }
    }
    
    /**
     * Delete model
     */
    @DeleteMapping("/{modelId}")
    public ResponseEntity<Void> deleteModel(@PathVariable String modelId) {
        try {
            modelManager.deleteModel(modelId);
            
            // Publish model deleted event
            eventPublisher.publishEvent(new ModelDeletedEvent(modelId));
            
            return ResponseEntity.noContent().build();
            
        } catch (Exception e) {
            logger.error("Error deleting model: {}", modelId, e);
            return ResponseEntity.internalServerError().build();
        }
    }
    
    /**
     * Update model metadata
     */
    @PutMapping("/{modelId}")
    public ResponseEntity<ModelInfo> updateModel(
            @PathVariable String modelId,
            @RequestBody ModelUpdateRequest request) {
        
        try {
            Model model = modelManager.getModel(modelId);
            if (model == null) {
                return ResponseEntity.notFound().build();
            }
            
            // Update model metadata
            model.setName(request.getName());
            model.setDescription(request.getDescription());
            model.setTags(request.getTags());
            
            modelManager.updateModel(model);
            
            // Publish model updated event
            eventPublisher.publishEvent(new ModelUpdatedEvent(modelId));
            
            ModelInfo info = new ModelInfo(
                modelId,
                model.getName(),
                model.getAlgorithm(),
                model.getCreatedAt(),
                model.getVersion(),
                model.getMetrics()
            );
            
            return ResponseEntity.ok(info);
            
        } catch (Exception e) {
            logger.error("Error updating model: {}", modelId, e);
            return ResponseEntity.internalServerError().build();
        }
    }
    
    // Request/Response DTOs
    public static class TrainingRequest {
        private String modelName;
        private String datasetId;
        private String algorithm;
        private Map<String, Object> hyperparameters;
        private List<String> metrics;
        
        // Getters and setters
        public String getModelName() { return modelName; }
        public void setModelName(String modelName) { this.modelName = modelName; }
        
        public String getDatasetId() { return datasetId; }
        public void setDatasetId(String datasetId) { this.datasetId = datasetId; }
        
        public String getAlgorithm() { return algorithm; }
        public void setAlgorithm(String algorithm) { this.algorithm = algorithm; }
        
        public Map<String, Object> getHyperparameters() { return hyperparameters; }
        public void setHyperparameters(Map<String, Object> hyperparameters) { this.hyperparameters = hyperparameters; }
        
        public List<String> getMetrics() { return metrics; }
        public void setMetrics(List<String> metrics) { this.metrics = metrics; }
    }
    
    public static class TrainingResponse {
        private String modelId;
        private String modelName;
        private Map<String, Double> metrics;
        private String status;
        
        public TrainingResponse(String modelId, String modelName, 
                               Map<String, Double> metrics, String status) {
            this.modelId = modelId;
            this.modelName = modelName;
            this.metrics = metrics;
            this.status = status;
        }
        
        // Getters and setters
        public String getModelId() { return modelId; }
        public void setModelId(String modelId) { this.modelId = modelId; }
        
        public String getModelName() { return modelName; }
        public void setModelName(String modelName) { this.modelName = modelName; }
        
        public Map<String, Double> getMetrics() { return metrics; }
        public void setMetrics(Map<String, Double> metrics) { this.metrics = metrics; }
        
        public String getStatus() { return status; }
        public void setStatus(String status) { this.status = status; }
    }
    
    public static class ModelInfo {
        private String modelId;
        private String name;
        private String algorithm;
        private String createdAt;
        private String version;
        private Map<String, Double> metrics;
        
        public ModelInfo(String modelId, String name, String algorithm, 
                        String createdAt, String version, Map<String, Double> metrics) {
            this.modelId = modelId;
            this.name = name;
            this.algorithm = algorithm;
            this.createdAt = createdAt;
            this.version = version;
            this.metrics = metrics;
        }
        
        // Getters and setters
        public String getModelId() { return modelId; }
        public void setModelId(String modelId) { this.modelId = modelId; }
        
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        
        public String getAlgorithm() { return algorithm; }
        public void setAlgorithm(String algorithm) { this.algorithm = algorithm; }
        
        public String getCreatedAt() { return createdAt; }
        public void setCreatedAt(String createdAt) { this.createdAt = createdAt; }
        
        public String getVersion() { return version; }
        public void setVersion(String version) { this.version = version; }
        
        public Map<String, Double> getMetrics() { return metrics; }
        public void setMetrics(Map<String, Double> metrics) { this.metrics = metrics; }
    }
    
    public static class ModelUpdateRequest {
        private String name;
        private String description;
        private List<String> tags;
        
        // Getters and setters
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        
        public String getDescription() { return description; }
        public void setDescription(String description) { this.description = description; }
        
        public List<String> getTags() { return tags; }
        public void setTags(List<String> tags) { this.tags = tags; }
    }
}
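
One detail in `trainModel` deserves attention: `CompletableFuture.supplyAsync` called without an executor runs on the shared `ForkJoinPool.commonPool()`, which long-running training jobs can saturate. The following is a minimal plain-Java sketch of one alternative, assuming a hypothetical `TrainingJobRunner` helper (not part of SuperML or Spring) that owns a bounded pool and maps each outcome to the same `COMPLETED` and `FAILED` status strings used by `TrainingResponse`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

/** Hypothetical helper: runs training jobs on a dedicated, bounded thread pool. */
class TrainingJobRunner implements AutoCloseable {

    private final ExecutorService pool;

    TrainingJobRunner(int maxConcurrentJobs) {
        this.pool = Executors.newFixedThreadPool(maxConcurrentJobs, runnable -> {
            Thread thread = new Thread(runnable, "training-job");
            thread.setDaemon(true); // in this sketch, abandoned jobs must not keep the JVM alive
            return thread;
        });
    }

    /**
     * Runs the job on the dedicated pool and maps the outcome to a status string.
     * The job returns a model ID on success; any exception yields "FAILED".
     */
    CompletableFuture<String> submit(Supplier<String> trainingJob) {
        return CompletableFuture.supplyAsync(trainingJob, pool)
            .thenApply(modelId -> "COMPLETED")
            .exceptionally(error -> "FAILED");
    }

    @Override
    public void close() {
        pool.shutdown(); // stop accepting new jobs; running jobs finish normally
    }
}
```

In a Spring application the pool would more likely be a managed executor bean, and the controller would pass it as the second argument to `supplyAsync`.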

Service Communication with Feign

package com.company.enterprise.microservices.clients;

import com.superml.core.data.Dataset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Data Service Client for inter-service communication
 * Handles data retrieval and preprocessing operations
 */
@FeignClient(name = "data-service", fallback = DataServiceClientFallback.class)
public interface DataServiceClient {
    
    // Path variable names are given explicitly; Feign may fail to infer them
    // unless the code is compiled with the -parameters flag
    @GetMapping("/api/v1/datasets/{datasetId}")
    Dataset getDataset(@PathVariable("datasetId") String datasetId);
    
    @PostMapping("/api/v1/datasets/preprocess")
    Dataset preprocessDataset(@RequestBody PreprocessingRequest request);
    
    @GetMapping("/api/v1/datasets/{datasetId}/features")
    List<String> getFeatures(@PathVariable("datasetId") String datasetId);
    
    @GetMapping("/api/v1/datasets/{datasetId}/statistics")
    Map<String, Object> getStatistics(@PathVariable("datasetId") String datasetId);
}

/**
 * Fallback implementation for data service client
 * Provides graceful degradation when data service is unavailable
 */
@Component
public class DataServiceClientFallback implements DataServiceClient {
    
    private static final Logger logger = LoggerFactory.getLogger(DataServiceClientFallback.class);
    
    @Override
    public Dataset getDataset(String datasetId) {
        logger.warn("Data service unavailable, returning empty dataset for ID: {}", datasetId);
        // Callers should check for an empty dataset before training on it;
        // a cached copy could be returned here instead
        return new Dataset();
    }
    
    @Override
    public Dataset preprocessDataset(PreprocessingRequest request) {
        logger.warn("Data service unavailable, skipping preprocessing");
        return new Dataset(); // Return unprocessed dataset
    }
    
    @Override
    public List<String> getFeatures(String datasetId) {
        logger.warn("Data service unavailable, returning empty features list");
        return new ArrayList<>();
    }
    
    @Override
    public Map<String, Object> getStatistics(String datasetId) {
        logger.warn("Data service unavailable, returning empty statistics");
        return new HashMap<>();
    }
}

/**
 * Metrics Service Client for tracking ML metrics
 */
@FeignClient(name = "metrics-service", fallback = MetricsServiceClientFallback.class)
public interface MetricsServiceClient {
    
    @PostMapping("/api/v1/metrics/training")
    void recordTrainingMetrics(@RequestBody TrainingMetrics metrics);
    
    @PostMapping("/api/v1/metrics/inference")
    void recordInferenceMetrics(@RequestBody InferenceMetrics metrics);
    
    @GetMapping("/api/v1/metrics/model/{modelId}")
    List<MetricPoint> getModelMetrics(@PathVariable("modelId") String modelId);
}

@Component
public class MetricsServiceClientFallback implements MetricsServiceClient {
    
    private static final Logger logger = LoggerFactory.getLogger(MetricsServiceClientFallback.class);
    
    @Override
    public void recordTrainingMetrics(TrainingMetrics metrics) {
        logger.warn("Metrics service unavailable, metrics not recorded");
    }
    
    @Override
    public void recordInferenceMetrics(InferenceMetrics metrics) {
        logger.warn("Metrics service unavailable, metrics not recorded");
    }
    
    @Override
    public List<MetricPoint> getModelMetrics(String modelId) {
        logger.warn("Metrics service unavailable, returning empty metrics");
        return new ArrayList<>();
    }
}
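
The behavior that the `fallback` attribute provides can be illustrated without any framework. The sketch below is not how OpenFeign implements fallbacks; it is a plain-Java decorator under stated assumptions, where `FeatureClient` is a hypothetical stand-in for a remote client interface such as `DataServiceClient` and `FallbackFeatureClient` is an illustrative name.

```java
import java.util.List;

/** Hypothetical stand-in for a remote client interface such as DataServiceClient. */
interface FeatureClient {
    List<String> getFeatures(String datasetId);
}

/** Delegates to the primary client and switches to the fallback when the call throws. */
class FallbackFeatureClient implements FeatureClient {

    private final FeatureClient primary;
    private final FeatureClient fallback;

    FallbackFeatureClient(FeatureClient primary, FeatureClient fallback) {
        this.primary = primary;
        this.fallback = fallback;
    }

    @Override
    public List<String> getFeatures(String datasetId) {
        try {
            return primary.getFeatures(datasetId);
        } catch (RuntimeException e) {
            // Graceful degradation: the caller receives a safe default instead of an exception
            return fallback.getFeatures(datasetId);
        }
    }
}
```

In Spring Cloud OpenFeign, fallback classes generally take effect only when circuit breaker support is enabled; the property that enables it depends on the Spring Cloud version, so check the documentation for the release in use.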

Event-Driven Architecture

Event Publisher and Handlers

package com.company.enterprise.events;

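import com.fasterxml.jackson.databind.ObjectMapper;
import com.superml.core.model.ModelManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;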

/**
 * Event Publisher for ML system events
 * Publishes events to message queue for asynchronous processing
 */
@Component
public class EventPublisher {
    
    private static final Logger logger = LoggerFactory.getLogger(EventPublisher.class);
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @Autowired
    private ObjectMapper objectMapper;
    
    private static final String EXCHANGE_NAME = "ml.events";
    
    /**
     * Publish ML event to message queue
     */
    public void publishEvent(MLEvent event) {
        try {
            String eventJson = objectMapper.writeValueAsString(event);
            
            String routingKey = event.getEventType().toLowerCase() + "." + event.getSource();
            
            rabbitTemplate.convertAndSend(EXCHANGE_NAME, routingKey, eventJson);
            
            logger.info("Published event: {} with routing key: {}", event.getEventType(), routingKey);
            
        } catch (Exception e) {
            logger.error("Failed to publish event: {}", event, e);
        }
    }
}

/**
 * Base ML Event class
 */
public abstract class MLEvent {
    private String eventId;
    private String eventType;
    private String source;
    private String timestamp;
    private Map<String, Object> metadata;
    
    public MLEvent(String eventType, String source) {
        this.eventId = UUID.randomUUID().toString();
        this.eventType = eventType;
        this.source = source;
        this.timestamp = Instant.now().toString();
        this.metadata = new HashMap<>();
    }
    
    // Getters and setters
    public String getEventId() { return eventId; }
    public void setEventId(String eventId) { this.eventId = eventId; }
    
    public String getEventType() { return eventType; }
    public void setEventType(String eventType) { this.eventType = eventType; }
    
    public String getSource() { return source; }
    public void setSource(String source) { this.source = source; }
    
    public String getTimestamp() { return timestamp; }
    public void setTimestamp(String timestamp) { this.timestamp = timestamp; }
    
    public Map<String, Object> getMetadata() { return metadata; }
    public void setMetadata(Map<String, Object> metadata) { this.metadata = metadata; }
}

/**
 * Model Training Started Event
 */
public class ModelTrainingStartedEvent extends MLEvent {
    private String modelName;
    private String datasetId;
    
    public ModelTrainingStartedEvent(String modelName, String datasetId) {
        super("MODEL_TRAINING_STARTED", "model-service");
        this.modelName = modelName;
        this.datasetId = datasetId;
    }
    
    // Getters and setters
    public String getModelName() { return modelName; }
    public void setModelName(String modelName) { this.modelName = modelName; }
    
    public String getDatasetId() { return datasetId; }
    public void setDatasetId(String datasetId) { this.datasetId = datasetId; }
}

/**
 * Model Training Completed Event
 */
public class ModelTrainingCompletedEvent extends MLEvent {
    private String modelName;
    private String modelId;
    private Map<String, Double> metrics;
    
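    // No-argument constructor so Jackson can deserialize the event in the listener;
    // the setters then overwrite the generated ID and timestamp with the published values
    public ModelTrainingCompletedEvent() {
        super("MODEL_TRAINING_COMPLETED", "model-service");
    }
    
    public ModelTrainingCompletedEvent(String modelName, String modelId, Map<String, Double> metrics) {
        super("MODEL_TRAINING_COMPLETED", "model-service");
        this.modelName = modelName;
        this.modelId = modelId;
        this.metrics = metrics;
    }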
    
    // Getters and setters
    public String getModelName() { return modelName; }
    public void setModelName(String modelName) { this.modelName = modelName; }
    
    public String getModelId() { return modelId; }
    public void setModelId(String modelId) { this.modelId = modelId; }
    
    public Map<String, Double> getMetrics() { return metrics; }
    public void setMetrics(Map<String, Double> metrics) { this.metrics = metrics; }
}

/**
 * Event Handlers for ML system events
 */
@Component
public class MLEventHandlers {
    
    private static final Logger logger = LoggerFactory.getLogger(MLEventHandlers.class);
    
    @Autowired
    private ModelManager modelManager;
    
    @Autowired
    private NotificationService notificationService;
    
    @Autowired
    private ModelVersioningService versioningService;
    
    /**
     * Handle model training completed event
     */
    @RabbitListener(queues = "ml.model.training.completed")
    public void handleModelTrainingCompleted(String eventJson) {
        try {
            ObjectMapper mapper = new ObjectMapper();
            ModelTrainingCompletedEvent event = mapper.readValue(eventJson, ModelTrainingCompletedEvent.class);
            
            logger.info("Handling model training completed event: {}", event.getModelId());
            
            // Create model version
            versioningService.createVersion(event.getModelId(), event.getMetrics());
            
            // Send notification
            notificationService.sendTrainingCompletedNotification(
                event.getModelName(), event.getModelId(), event.getMetrics());
            
            // Check if model meets deployment criteria
            if (meetsDeploymentCriteria(event.getMetrics())) {
                // Trigger deployment process
                triggerModelDeployment(event.getModelId());
            }
            
        } catch (Exception e) {
            logger.error("Error handling model training completed event", e);
        }
    }
    
    /**
     * Handle model training failed event
     */
    @RabbitListener(queues = "ml.model.training.failed")
    public void handleModelTrainingFailed(String eventJson) {
        try {
            ObjectMapper mapper = new ObjectMapper();
            ModelTrainingFailedEvent event = mapper.readValue(eventJson, ModelTrainingFailedEvent.class);
            
            logger.info("Handling model training failed event: {}", event.getModelName());
            
            // Send failure notification
            notificationService.sendTrainingFailedNotification(
                event.getModelName(), event.getErrorMessage());
            
            // Record failure metrics
            recordTrainingFailure(event.getModelName(), event.getErrorMessage());
            
        } catch (Exception e) {
            logger.error("Error handling model training failed event", e);
        }
    }
    
    /**
     * Handle data drift detection event
     */
    @RabbitListener(queues = "ml.data.drift.detected")
    public void handleDataDriftDetected(String eventJson) {
        try {
            ObjectMapper mapper = new ObjectMapper();
            DataDriftDetectedEvent event = mapper.readValue(eventJson, DataDriftDetectedEvent.class);
            
            logger.info("Handling data drift detected event: {}", event.getModelId());
            
            // Send alert
            notificationService.sendDataDriftAlert(event.getModelId(), event.getDriftScore());
            
            // Trigger model retraining if drift is severe
            if (event.getDriftScore() > 0.8) {
                triggerModelRetraining(event.getModelId());
            }
            
        } catch (Exception e) {
            logger.error("Error handling data drift detected event", e);
        }
    }
    
    private boolean meetsDeploymentCriteria(Map<String, Double> metrics) {
        // Check if model meets minimum performance criteria
        Double accuracy = metrics.get("accuracy");
        Double precision = metrics.get("precision");
        Double recall = metrics.get("recall");
        
        return accuracy != null && accuracy > 0.8 && 
               precision != null && precision > 0.75 && 
               recall != null && recall > 0.75;
    }
    
    private void triggerModelDeployment(String modelId) {
        logger.info("Triggering model deployment for: {}", modelId);
        // Implementation would trigger deployment pipeline
    }
    
    private void recordTrainingFailure(String modelName, String errorMessage) {
        logger.info("Recording training failure for: {}", modelName);
        // Implementation would record failure metrics
    }
    
    private void triggerModelRetraining(String modelId) {
        logger.info("Triggering model retraining for: {}", modelId);
        // Implementation would trigger retraining pipeline
    }
}
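
The publisher builds routing keys such as `model_training_completed.model-service`, while the listeners consume from named queues such as `ml.model.training.completed`. The bindings that connect the two are not shown in this tutorial. Assuming `ml.events` is a topic exchange, the sketch below reproduces the routing-key construction and AMQP topic-pattern matching (`*` matches exactly one word, `#` matches zero or more words) in plain Java, so candidate binding patterns can be checked offline. `TopicRouting` is a hypothetical helper, not a Spring AMQP class.

```java
import java.util.Locale;

/** Hypothetical helper for reasoning about routing keys and topic bindings. */
class TopicRouting {

    /** Builds the key the same way EventPublisher does: lower-cased event type, dot, source. */
    static String routingKey(String eventType, String source) {
        return eventType.toLowerCase(Locale.ROOT) + "." + source;
    }

    /** Returns true if a topic-exchange binding pattern would match the routing key. */
    static boolean matches(String bindingPattern, String routingKey) {
        return match(bindingPattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] pattern, int p, String[] key, int k) {
        if (p == pattern.length) {
            return k == key.length; // pattern exhausted: match only if the key is exhausted too
        }
        if (pattern[p].equals("#")) {
            // '#' may consume zero or more words; try every possible split point
            for (int next = k; next <= key.length; next++) {
                if (match(pattern, p + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (k == key.length) {
            return false; // pattern still expects a word, but the key has none left
        }
        boolean wordMatches = pattern[p].equals("*") || pattern[p].equals(key[k]);
        return wordMatches && match(pattern, p + 1, key, k + 1);
    }
}
```

For example, under these assumptions a queue bound with the pattern `model_training_completed.*` would receive completed-training events from any source service.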

CQRS Pattern Implementation

Command and Query Separation
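
Before the Spring-based handlers, the core idea can be sketched in plain Java: the write side owns the authoritative state and emits an event after each change, and the read side maintains a separate, query-optimized view that is updated only from those events. All names here (`ModelRenamed`, `CommandSide`, `QuerySide`) are hypothetical, and synchronous in-memory delivery stands in for the message broker, so this sketch does not show the eventual consistency a real deployment would have.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical event emitted by the write side after a rename is stored. */
final class ModelRenamed {
    final String modelId;
    final String newName;

    ModelRenamed(String modelId, String newName) {
        this.modelId = modelId;
        this.newName = newName;
    }
}

/** Write side: validates commands, stores the change, then notifies subscribers. */
class CommandSide {
    private final Map<String, String> writeStore = new HashMap<>();
    private final List<Consumer<ModelRenamed>> subscribers = new ArrayList<>();

    void subscribe(Consumer<ModelRenamed> subscriber) {
        subscribers.add(subscriber);
    }

    void rename(String modelId, String newName) {
        if (newName == null || newName.isEmpty()) {
            throw new IllegalArgumentException("name must not be empty");
        }
        writeStore.put(modelId, newName);
        ModelRenamed event = new ModelRenamed(modelId, newName);
        subscribers.forEach(subscriber -> subscriber.accept(event));
    }
}

/** Read side: a denormalized view that changes only in response to events. */
class QuerySide {
    private final Map<String, String> readModel = new HashMap<>();

    void on(ModelRenamed event) {
        readModel.put(event.modelId, "Model[" + event.modelId + "] " + event.newName);
    }

    /** Returns the display string for a model, or null if no event has been seen for it. */
    String describe(String modelId) {
        return readModel.get(modelId);
    }
}
```

Wiring the two sides together is a single subscription, for example `commands.subscribe(queries::on)`; queries never touch the write store.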

package com.company.enterprise.cqrs;

// Event publisher and event classes from the events package shown earlier
import com.company.enterprise.events.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
 * Command Handler for ML Model operations
 * Handles write operations and state changes
 */
@Component
public class MLModelCommandHandler {
    
    private static final Logger logger = LoggerFactory.getLogger(MLModelCommandHandler.class);
    
    @Autowired
    private ModelWriteRepository modelWriteRepository;
    
    @Autowired
    private EventPublisher eventPublisher;
    
    @Autowired
    private CommandValidator commandValidator;
    
    /**
     * Handle train model command
     */
    public CompletableFuture<String> handle(TrainModelCommand command) {
        logger.info("Handling train model command: {}", command.getModelName());
        
        return CompletableFuture.supplyAsync(() -> {
            try {
                // Validate command
                commandValidator.validate(command);
                
                // Create model entity
                ModelEntity model = new ModelEntity(
                    command.getModelName(),
                    command.getAlgorithm(),
                    command.getDatasetId(),
                    command.getHyperparameters()
                );
                
                // Save to write store
                String modelId = modelWriteRepository.save(model);
                
                // Publish domain event
                eventPublisher.publishEvent(new ModelTrainingStartedEvent(
                    command.getModelName(), command.getDatasetId()));
                
                logger.info("Model training command handled successfully: {}", modelId);
                return modelId;
                
            } catch (Exception e) {
                logger.error("Error handling train model command", e);
                throw new RuntimeException("Command handling failed", e);
            }
        });
    }
    
    /**
     * Handle update model command
     */
    public CompletableFuture<Void> handle(UpdateModelCommand command) {
        logger.info("Handling update model command: {}", command.getModelId());
        
        return CompletableFuture.runAsync(() -> {
            try {
                // Validate command
                commandValidator.validate(command);
                
                // Get existing model
                ModelEntity model = modelWriteRepository.findById(command.getModelId());
                if (model == null) {
                    throw new IllegalArgumentException("Model not found: " + command.getModelId());
                }
                
                // Update model
                model.setName(command.getName());
                model.setDescription(command.getDescription());
                model.setTags(command.getTags());
                model.setUpdatedAt(java.time.Instant.now());
                
                // Save updated model
                modelWriteRepository.update(model);
                
                // Publish domain event
                eventPublisher.publishEvent(new ModelUpdatedEvent(command.getModelId()));
                
                logger.info("Model update command handled successfully: {}", command.getModelId());
                
            } catch (Exception e) {
                logger.error("Error handling update model command", e);
                throw new RuntimeException("Command handling failed", e);
            }
        });
    }
    
    /**
     * Handle delete model command
     */
    public CompletableFuture<Void> handle(DeleteModelCommand command) {
        logger.info("Handling delete model command: {}", command.getModelId());
        
        return CompletableFuture.runAsync(() -> {
            try {
                // Validate command
                commandValidator.validate(command);
                
                // Check if model exists
                ModelEntity model = modelWriteRepository.findById(command.getModelId());
                if (model == null) {
                    throw new IllegalArgumentException("Model not found: " + command.getModelId());
                }
                
                // Soft delete model
                model.setDeleted(true);
                model.setDeletedAt(java.time.Instant.now());
                modelWriteRepository.update(model);
                
                // Publish domain event
                eventPublisher.publishEvent(new ModelDeletedEvent(command.getModelId()));
                
                logger.info("Model delete command handled successfully: {}", command.getModelId());
                
            } catch (Exception e) {
                logger.error("Error handling delete model command", e);
                throw new RuntimeException("Command handling failed", e);
            }
        });
    }
}

/**
 * Query Handler for ML Model operations
 * Handles read operations and data retrieval
 */
@Component
public class MLModelQueryHandler {
    
    private static final Logger logger = LoggerFactory.getLogger(MLModelQueryHandler.class);
    
    @Autowired
    private ModelReadRepository modelReadRepository;
    
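    // Application-specific cache abstraction (get/put with a TTL in seconds), not Spring's
    // org.springframework.cache.CacheManager, which exposes getCache(String) instead
    @Autowired
    private CacheManager cacheManager;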
    
    /**
     * Handle get model query
     */
    public ModelReadModel handle(GetModelQuery query) {
        logger.debug("Handling get model query: {}", query.getModelId());
        
        // Check cache first
        ModelReadModel cachedModel = cacheManager.get(query.getModelId(), ModelReadModel.class);
        if (cachedModel != null) {
            logger.debug("Returning cached model: {}", query.getModelId());
            return cachedModel;
        }
        
        // Get from read repository
        ModelReadModel model = modelReadRepository.findById(query.getModelId());
        
        if (model != null) {
            // Cache the result
            cacheManager.put(query.getModelId(), model, 300); // 5 minutes TTL
        }
        
        return model;
    }
    
    /**
     * Handle search models query
     */
    public List<ModelReadModel> handle(SearchModelsQuery query) {
        logger.debug("Handling search models query: {}", query.getSearchCriteria());
        
        return modelReadRepository.search(
            query.getSearchCriteria(),
            query.getPage(),
            query.getSize(),
            query.getSortBy(),
            query.getSortOrder()
        );
    }
    
    /**
     * Handle get model metrics query
     */
    public List<MetricPoint> handle(GetModelMetricsQuery query) {
        logger.debug("Handling get model metrics query: {}", query.getModelId());
        
        return modelReadRepository.getMetrics(
            query.getModelId(),
            query.getStartTime(),
            query.getEndTime()
        );
    }
    
    /**
     * Handle get model performance query
     */
    public ModelPerformanceReadModel handle(GetModelPerformanceQuery query) {
        logger.debug("Handling get model performance query: {}", query.getModelId());
        
        // Check cache first
        String cacheKey = "performance_" + query.getModelId();
        ModelPerformanceReadModel cachedPerformance = cacheManager.get(cacheKey, ModelPerformanceReadModel.class);
        if (cachedPerformance != null) {
            return cachedPerformance;
        }
        
        // Get from read repository
        ModelPerformanceReadModel performance = modelReadRepository.getPerformance(query.getModelId());
        
        if (performance != null) {
            // Cache the result
            cacheManager.put(cacheKey, performance, 600); // 10 minutes TTL
        }
        
        return performance;
    }
}

/**
 * Command classes for ML operations
 */
public class TrainModelCommand {
    private String modelName;
    private String algorithm;
    private String datasetId;
    private Map<String, Object> hyperparameters;
    
    public TrainModelCommand(String modelName, String algorithm, String datasetId, Map<String, Object> hyperparameters) {
        this.modelName = modelName;
        this.algorithm = algorithm;
        this.datasetId = datasetId;
        this.hyperparameters = hyperparameters;
    }
    
    // Getters and setters
    public String getModelName() { return modelName; }
    public void setModelName(String modelName) { this.modelName = modelName; }
    
    public String getAlgorithm() { return algorithm; }
    public void setAlgorithm(String algorithm) { this.algorithm = algorithm; }
    
    public String getDatasetId() { return datasetId; }
    public void setDatasetId(String datasetId) { this.datasetId = datasetId; }
    
    public Map<String, Object> getHyperparameters() { return hyperparameters; }
    public void setHyperparameters(Map<String, Object> hyperparameters) { this.hyperparameters = hyperparameters; }
}

public class UpdateModelCommand {
    private String modelId;
    private String name;
    private String description;
    private List<String> tags;
    
    public UpdateModelCommand(String modelId, String name, String description, List<String> tags) {
        this.modelId = modelId;
        this.name = name;
        this.description = description;
        this.tags = tags;
    }
    
    // Getters and setters
    public String getModelId() { return modelId; }
    public void setModelId(String modelId) { this.modelId = modelId; }
    
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    
    public String getDescription() { return description; }
    public void setDescription(String description) { this.description = description; }
    
    public List<String> getTags() { return tags; }
    public void setTags(List<String> tags) { this.tags = tags; }
}

public class DeleteModelCommand {
    private String modelId;
    
    public DeleteModelCommand(String modelId) {
        this.modelId = modelId;
    }
    
    // Getters and setters
    public String getModelId() { return modelId; }
    public void setModelId(String modelId) { this.modelId = modelId; }
}

/**
 * Query classes for ML operations
 */
public class GetModelQuery {
    private String modelId;
    
    public GetModelQuery(String modelId) {
        this.modelId = modelId;
    }
    
    // Getters and setters
    public String getModelId() { return modelId; }
    public void setModelId(String modelId) { this.modelId = modelId; }
}

public class SearchModelsQuery {
    private Map<String, Object> searchCriteria;
    private int page;
    private int size;
    private String sortBy;
    private String sortOrder;
    
    public SearchModelsQuery(Map<String, Object> searchCriteria, int page, int size, String sortBy, String sortOrder) {
        this.searchCriteria = searchCriteria;
        this.page = page;
        this.size = size;
        this.sortBy = sortBy;
        this.sortOrder = sortOrder;
    }
    
    // Getters and setters
    public Map<String, Object> getSearchCriteria() { return searchCriteria; }
    public void setSearchCriteria(Map<String, Object> searchCriteria) { this.searchCriteria = searchCriteria; }
    
    public int getPage() { return page; }
    public void setPage(int page) { this.page = page; }
    
    public int getSize() { return size; }
    public void setSize(int size) { this.size = size; }
    
    public String getSortBy() { return sortBy; }
    public void setSortBy(String sortBy) { this.sortBy = sortBy; }
    
    public String getSortOrder() { return sortOrder; }
    public void setSortOrder(String sortOrder) { this.sortOrder = sortOrder; }
}

public class GetModelMetricsQuery {
    private String modelId;
    private java.time.Instant startTime;
    private java.time.Instant endTime;
    
    public GetModelMetricsQuery(String modelId, java.time.Instant startTime, java.time.Instant endTime) {
        this.modelId = modelId;
        this.startTime = startTime;
        this.endTime = endTime;
    }
    
    // Getters and setters
    public String getModelId() { return modelId; }
    public void setModelId(String modelId) { this.modelId = modelId; }
    
    public java.time.Instant getStartTime() { return startTime; }
    public void setStartTime(java.time.Instant startTime) { this.startTime = startTime; }
    
    public java.time.Instant getEndTime() { return endTime; }
    public void setEndTime(java.time.Instant endTime) { this.endTime = endTime; }
}

public class GetModelPerformanceQuery {
    private String modelId;
    
    public GetModelPerformanceQuery(String modelId) {
        this.modelId = modelId;
    }
    
    // Getters and setters
    public String getModelId() { return modelId; }
    public void setModelId(String modelId) { this.modelId = modelId; }
}
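
The query handlers above follow a cache-aside read: check the cache, fall back to the read repository, then store the result with a TTL. The `CacheManager` they call is not shown in this section, so the following is only a minimal in-memory sketch of that contract. The class name `TtlCache`, the `getOrLoad` helper, and the injectable clock are assumptions made for illustration, not part of the tutorial's code or of any library.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Hypothetical stand-in for the CacheManager the handlers call; not the tutorial's class.
class TtlCache {

    // A cached value together with its absolute expiry time.
    private static final class Entry {
        final Object value;
        final long expiresAtMillis;

        Entry(Object value, long expiresAtMillis) {
            this.value = value;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Map<String, Entry> entries = new ConcurrentHashMap<>();
    private final LongSupplier clock; // injectable so expiry can be tested without sleeping

    TtlCache() {
        this(System::currentTimeMillis);
    }

    TtlCache(LongSupplier clock) {
        this.clock = clock;
    }

    // Stores a value that expires ttlSeconds from now.
    void put(String key, Object value, long ttlSeconds) {
        entries.put(key, new Entry(value, clock.getAsLong() + ttlSeconds * 1000L));
    }

    // Returns the cached value, or null if it is absent, expired, or of another type.
    <T> T get(String key, Class<T> type) {
        Entry entry = entries.get(key);
        if (entry == null) {
            return null;
        }
        if (clock.getAsLong() >= entry.expiresAtMillis) {
            entries.remove(key, entry); // evict only the stale entry we observed
            return null;
        }
        return type.isInstance(entry.value) ? type.cast(entry.value) : null;
    }

    // Cache-aside read: serve from cache, otherwise load, cache non-null results, and return.
    <T> T getOrLoad(String key, Class<T> type, long ttlSeconds, Function<String, T> loader) {
        T cached = get(key, type);
        if (cached != null) {
            return cached;
        }
        T loaded = loader.apply(key);
        if (loaded != null) {
            put(key, loaded, ttlSeconds);
        }
        return loaded;
    }
}
```

With a helper like this, the body of the `GetModelQuery` handler could shrink to one call such as `getOrLoad(query.getModelId(), ModelReadModel.class, 300, modelReadRepository::findById)`. A production system would more likely delegate to a shared cache so that all service instances see the same entries.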

Circuit Breaker Pattern

Fault Tolerance Implementation

package com.company.enterprise.resilience;

import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;
import io.github.resilience4j.retry.annotation.Retry;
import io.github.resilience4j.timelimiter.annotation.TimeLimiter;
import io.github.resilience4j.bulkhead.annotation.Bulkhead;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.concurrent.CompletableFuture;

/**
 * Resilient ML Service with Circuit Breaker pattern
 * Provides fault tolerance and graceful degradation
 */
@Service
public class ResilientMLService {
    
    private static final Logger logger = LoggerFactory.getLogger(ResilientMLService.class);
    
    @Autowired
    private MLModelService modelService;
    
    @Autowired
    private CacheManager cacheManager;
    
    @Autowired
    private FallbackMLService fallbackService;
    
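    /**
     * Resilient model prediction with circuit breaker.
     * Note: with Resilience4j's default aspect order, Retry wraps CircuitBreaker,
     * so a fallback declared on @CircuitBreaker recovers the call before @Retry
     * sees a failure. Declare fallbackMethod on @Retry if retries should run first.
     */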
    @CircuitBreaker(name = "model-inference", fallbackMethod = "fallbackPredict")
    @Retry(name = "model-inference")
    @TimeLimiter(name = "model-inference")
    @Bulkhead(name = "model-inference")
    public CompletableFuture<PredictionResult> predict(String modelId, PredictionRequest request) {
        logger.debug("Making prediction with model: {}", modelId);
        
        return CompletableFuture.supplyAsync(() -> {
            try {
                // Check cache first
                String cacheKey = generateCacheKey(modelId, request);
                PredictionResult cachedResult = cacheManager.get(cacheKey, PredictionResult.class);
                if (cachedResult != null) {
                    logger.debug("Returning cached prediction for model: {}", modelId);
                    return cachedResult;
                }
                
                // Make prediction
                PredictionResult result = modelService.predict(modelId, request);
                
                // Cache the result
                cacheManager.put(cacheKey, result, 300); // 5 minutes TTL
                
                logger.debug("Prediction completed for model: {}", modelId);
                return result;
                
            } catch (Exception e) {
                logger.error("Error making prediction with model: {}", modelId, e);
                throw new RuntimeException("Prediction failed", e);
            }
        });
    }
    
    /**
     * Fallback method for predict operation
     */
    public CompletableFuture<PredictionResult> fallbackPredict(String modelId, PredictionRequest request, Exception ex) {
        logger.warn("Prediction failed or circuit is open for model: {}, using fallback", modelId, ex);
        
        return CompletableFuture.supplyAsync(() -> {
            try {
                // Try to get cached result
                String cacheKey = generateCacheKey(modelId, request);
                PredictionResult cachedResult = cacheManager.get(cacheKey, PredictionResult.class);
                if (cachedResult != null) {
                    logger.info("Returning cached prediction as fallback for model: {}", modelId);
                    return cachedResult;
                }
                
                // Use fallback service
                PredictionResult fallbackResult = fallbackService.predict(modelId, request);
                fallbackResult.setFallback(true);
                
                logger.info("Returning fallback prediction for model: {}", modelId);
                return fallbackResult;
                
            } catch (Exception e) {
                logger.error("Fallback prediction failed for model: {}", modelId, e);
                
                // Return default prediction
                PredictionResult defaultResult = new PredictionResult();
                defaultResult.setModelId(modelId);
                defaultResult.setFallback(true);
                defaultResult.setError("Service temporarily unavailable");
                
                return defaultResult;
            }
        });
    }
    
    /**
     * Resilient model training with circuit breaker
     */
    @CircuitBreaker(name = "model-training", fallbackMethod = "fallbackTraining")
    @Retry(name = "model-training")
    @TimeLimiter(name = "model-training")
    @Bulkhead(name = "model-training")
    public CompletableFuture<TrainingResult> trainModel(TrainingRequest request) {
        logger.info("Starting model training: {}", request.getModelName());
        
        return CompletableFuture.supplyAsync(() -> {
            try {
                // Validate training request
                validateTrainingRequest(request);
                
                // Start training
                TrainingResult result = modelService.train(request);
                
                logger.info("Model training completed: {}", request.getModelName());
                return result;
                
            } catch (Exception e) {
                logger.error("Error training model: {}", request.getModelName(), e);
                throw new RuntimeException("Training failed", e);
            }
        });
    }
    
    /**
     * Fallback method for training operation
     */
    public CompletableFuture<TrainingResult> fallbackTraining(TrainingRequest request, Exception ex) {
        logger.warn("Training failed or circuit is open for: {}, using fallback", request.getModelName(), ex);
        
        return CompletableFuture.supplyAsync(() -> {
            try {
                // Queue training request for later processing
                fallbackService.queueTrainingRequest(request);
                
                TrainingResult result = new TrainingResult();
                result.setModelName(request.getModelName());
                result.setStatus("QUEUED");
                result.setMessage("Training queued for later processing due to system unavailability");
                
                logger.info("Training request queued for: {}", request.getModelName());
                return result;
                
            } catch (Exception e) {
                logger.error("Fallback training failed for: {}", request.getModelName(), e);
                
                TrainingResult result = new TrainingResult();
                result.setModelName(request.getModelName());
                result.setStatus("FAILED");
                result.setMessage("Training failed due to system unavailability");
                
                return result;
            }
        });
    }
    
    /**
     * Resilient model deployment with circuit breaker
     */
    @CircuitBreaker(name = "model-deployment", fallbackMethod = "fallbackDeployment")
    @Retry(name = "model-deployment")
    @TimeLimiter(name = "model-deployment")
    @Bulkhead(name = "model-deployment")
    public CompletableFuture<DeploymentResult> deployModel(String modelId, DeploymentConfig config) {
        logger.info("Starting model deployment: {}", modelId);
        
        return CompletableFuture.supplyAsync(() -> {
            try {
                // Validate deployment config
                validateDeploymentConfig(config);
                
                // Deploy model
                DeploymentResult result = modelService.deploy(modelId, config);
                
                logger.info("Model deployment completed: {}", modelId);
                return result;
                
            } catch (Exception e) {
                logger.error("Error deploying model: {}", modelId, e);
                throw new RuntimeException("Deployment failed", e);
            }
        });
    }
    
    /**
     * Fallback method for deployment operation
     */
    public CompletableFuture<DeploymentResult> fallbackDeployment(String modelId, DeploymentConfig config, Exception ex) {
        logger.warn("Deployment failed or circuit is open for: {}, using fallback", modelId, ex);
        
        return CompletableFuture.supplyAsync(() -> {
            try {
                // Queue deployment request for later processing
                fallbackService.queueDeploymentRequest(modelId, config);
                
                DeploymentResult result = new DeploymentResult();
                result.setModelId(modelId);
                result.setStatus("QUEUED");
                result.setMessage("Deployment queued for later processing due to system unavailability");
                
                logger.info("Deployment request queued for: {}", modelId);
                return result;
                
            } catch (Exception e) {
                logger.error("Fallback deployment failed for: {}", modelId, e);
                
                DeploymentResult result = new DeploymentResult();
                result.setModelId(modelId);
                result.setStatus("FAILED");
                result.setMessage("Deployment failed due to system unavailability");
                
                return result;
            }
        });
    }
    
    /**
     * Circuit breaker health check
     */
    public CircuitBreakerHealthInfo getCircuitBreakerHealth() {
        CircuitBreakerHealthInfo health = new CircuitBreakerHealthInfo();
        
        // Get circuit breaker states
        health.setInferenceState(getCircuitBreakerState("model-inference"));
        health.setTrainingState(getCircuitBreakerState("model-training"));
        health.setDeploymentState(getCircuitBreakerState("model-deployment"));
        
        return health;
    }
    
    private void validateTrainingRequest(TrainingRequest request) {
        if (request.getModelName() == null || request.getModelName().isEmpty()) {
            throw new IllegalArgumentException("Model name is required");
        }
        
        if (request.getDatasetId() == null || request.getDatasetId().isEmpty()) {
            throw new IllegalArgumentException("Dataset ID is required");
        }
        
        if (request.getAlgorithm() == null || request.getAlgorithm().isEmpty()) {
            throw new IllegalArgumentException("Algorithm is required");
        }
    }
    
    private void validateDeploymentConfig(DeploymentConfig config) {
        if (config.getEnvironment() == null || config.getEnvironment().isEmpty()) {
            throw new IllegalArgumentException("Deployment environment is required");
        }
        
        if (config.getResourceLimits() == null || config.getResourceLimits().isEmpty()) {
            throw new IllegalArgumentException("Resource limits are required");
        }
    }
    
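    private String generateCacheKey(String modelId, PredictionRequest request) {
        // Assumes PredictionRequest overrides equals() and hashCode() based on its inputs.
        // With the default identity hash, equal requests would never share a key, and a
        // hash collision between different requests could return the wrong prediction.
        return String.format("prediction_%s_%s", modelId, request.hashCode());
    }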
    
    private String getCircuitBreakerState(String name) {
        // Placeholder: always reports CLOSED. A real implementation could inject
        // Resilience4j's CircuitBreakerRegistry and return
        // registry.circuitBreaker(name).getState().name().
        return "CLOSED";
    }
    
    /**
     * Circuit breaker health information
     */
    public static class CircuitBreakerHealthInfo {
        private String inferenceState;
        private String trainingState;
        private String deploymentState;
        
        // Getters and setters
        public String getInferenceState() { return inferenceState; }
        public void setInferenceState(String inferenceState) { this.inferenceState = inferenceState; }
        
        public String getTrainingState() { return trainingState; }
        public void setTrainingState(String trainingState) { this.trainingState = trainingState; }
        
        public String getDeploymentState() { return deploymentState; }
        public void setDeploymentState(String deploymentState) { this.deploymentState = deploymentState; }
    }
}
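
The annotations above hide the state machine that makes a circuit breaker work. As a rough illustration only, the sketch below implements the three states (CLOSED, OPEN, HALF_OPEN) in plain Java. It is deliberately simpler than Resilience4j: it counts consecutive failures instead of tracking a failure rate over a sliding window, and it does not limit the number of trial calls in the half-open state. The class name `SimpleCircuitBreaker` and its parameters are hypothetical.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Simplified illustration of circuit breaker states; not the Resilience4j implementation.
class SimpleCircuitBreaker {

    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int failureThreshold; // consecutive failures that open the circuit
    private final long openMillis;      // how long the circuit stays open before a trial call
    private final LongSupplier clock;   // injectable so timing can be tested without sleeping

    private State state = State.CLOSED;
    private int consecutiveFailures = 0;
    private long openedAtMillis = 0L;

    SimpleCircuitBreaker(int failureThreshold, long openMillis, LongSupplier clock) {
        this.failureThreshold = failureThreshold;
        this.openMillis = openMillis;
        this.clock = clock;
    }

    // Returns the current state, moving OPEN to HALF_OPEN once the wait time has elapsed.
    synchronized State getState() {
        if (state == State.OPEN && clock.getAsLong() - openedAtMillis >= openMillis) {
            state = State.HALF_OPEN;
        }
        return state;
    }

    // Runs the action unless the circuit is open; uses the fallback on rejection or failure.
    <T> T call(Supplier<T> action, Supplier<T> fallback) {
        if (getState() == State.OPEN) {
            return fallback.get(); // short-circuit: the failing dependency is not called
        }
        try {
            T result = action.get();
            onSuccess();
            return result;
        } catch (RuntimeException e) {
            onFailure();
            return fallback.get();
        }
    }

    // Any success closes the circuit and resets the failure count.
    private synchronized void onSuccess() {
        consecutiveFailures = 0;
        state = State.CLOSED;
    }

    // A failed trial call, or too many consecutive failures, opens the circuit.
    private synchronized void onFailure() {
        consecutiveFailures++;
        if (state == State.HALF_OPEN || consecutiveFailures >= failureThreshold) {
            state = State.OPEN;
            openedAtMillis = clock.getAsLong();
        }
    }
}
```

The important behavior is the short-circuit in `call`: while the circuit is open, the protected dependency receives no traffic at all, which gives it time to recover. The fallback is also used for ordinary failures while the circuit is still closed, which mirrors how the fallback methods in `ResilientMLService` are invoked.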

Key Learning Points:

  • Microservices Decomposition: Breaking down ML systems into focused, independent services
  • Service Communication: Using Feign clients with fallback mechanisms for resilient communication
  • Event-Driven Architecture: Asynchronous processing and loose coupling through events
  • CQRS Pattern: Separating read and write operations for better scalability
  • Circuit Breaker Pattern: Implementing fault tolerance and graceful degradation
  • Resilience Patterns: Retry, timeout, and bulkhead patterns for robust systems
  • Distributed Caching: Multi-level caching strategies for performance optimization
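
For distributed caching in particular, cache keys need to be identical on every service instance. The `generateCacheKey` method in `ResilientMLService` derives its key from `request.hashCode()`, which is only stable across JVMs if `PredictionRequest` defines a content-based hash. One possible alternative, sketched below, hashes a canonical form of the request inputs. The `features` map is an assumption, since the fields of `PredictionRequest` are not shown, and the class name `PredictionCacheKeys` is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: builds content-based cache keys that are stable across JVMs.
class PredictionCacheKeys {

    // Returns "prediction_<modelId>_<SHA-256 hex of the sorted feature entries>".
    static String keyFor(String modelId, Map<String, Object> features) {
        // Sort by feature name so that insertion order does not change the key.
        StringBuilder canonical = new StringBuilder();
        for (Map.Entry<String, Object> entry : new TreeMap<String, Object>(features).entrySet()) {
            // Simplification: assumes names and values do not contain '=' or ';'.
            canonical.append(entry.getKey()).append('=').append(entry.getValue()).append(';');
        }
        return "prediction_" + modelId + "_" + sha256Hex(canonical.toString());
    }

    // Hashes the text with SHA-256 and renders the digest as lowercase hex.
    private static String sha256Hex(String text) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest(text.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform is required to provide SHA-256.
            throw new IllegalStateException("SHA-256 not available", e);
        }
    }
}
```

Two requests with the same model ID and the same feature values then map to the same key regardless of which instance builds it, while the 256-bit digest makes accidental collisions between different requests extremely unlikely.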

This tutorial covered the core architectural patterns for enterprise-grade ML systems: service decomposition, event-driven pipelines, CQRS, and resilience with circuit breakers. The final tutorial will cover ML optimization techniques and performance tuning strategies.