Course Content
Enterprise ML Patterns
Best practices for enterprise ML applications
Java Enterprise Patterns - Scalable ML Architecture
This tutorial covers enterprise-grade architectural patterns for machine learning systems using SuperML Java 2.1.0. Learn how to build scalable, maintainable, and resilient ML architectures that can handle enterprise-level workloads.
What You'll Learn
- Microservices Architecture - ML service decomposition and communication
- Event-Driven Architecture - Asynchronous ML pipelines and event sourcing
- CQRS Pattern - Command Query Responsibility Segregation for ML systems
- Saga Pattern - Distributed transaction management
- Circuit Breaker - Fault tolerance and resilience
- API Gateway - Unified entry point and routing
- Service Discovery - Dynamic service registration and discovery
- Distributed Caching - Multi-level caching strategies
Prerequisites
- Completion of "Java Model Deployment" tutorial
- Spring Boot and Spring Cloud experience
- Microservices architecture knowledge
- Message queue systems (RabbitMQ, Kafka)
- Distributed systems understanding
Microservices Architecture
ML Service Decomposition
This example demonstrates how to decompose a monolithic ML system into microservices using domain-driven design principles.
package com.company.enterprise.microservices;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.annotation.Bean;
import org.springframework.web.client.RestTemplate;
/**
 * Spring Boot entry point for the ML model microservice, which owns model training,
 * evaluation and lifecycle management.
 *
 * <p>The class enables the Eureka client and Feign client scanning, and contributes a
 * {@link RestTemplate} bean to the application context.
 *
 * <p>NOTE(review): {@code @EnableEurekaClient} appears to be absent from newer Spring Cloud
 * releases — confirm against the Spring Cloud version this project targets.
 */
@SpringBootApplication
@EnableEurekaClient
@EnableFeignClients
public class MLModelService {

    /**
     * Exposes a plain {@link RestTemplate} for injection into other beans.
     *
     * @return a new, unconfigured {@code RestTemplate}
     */
    @Bean
    public RestTemplate restTemplate() {
        RestTemplate template = new RestTemplate();
        return template;
    }

    /**
     * Boots the Spring application.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(MLModelService.class, args);
    }
}
package com.company.enterprise.microservices.model;
import com.superml.core.model.Model;
import com.superml.core.model.ModelManager;
import com.superml.core.data.Dataset;
import com.superml.core.evaluation.Evaluator;
import com.superml.core.training.TrainingConfig;
import com.superml.core.training.Trainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
/**
* ML Model Management Controller
* Handles model training, evaluation, and lifecycle management
*/
@RestController
@RequestMapping("/api/v1/models")
@RefreshScope
public class MLModelController {
private static final Logger logger = LoggerFactory.getLogger(MLModelController.class);
@Autowired
private ModelManager modelManager;
@Autowired
private DataServiceClient dataServiceClient;
@Autowired
private MetricsServiceClient metricsServiceClient;
@Autowired
private EventPublisher eventPublisher;
/**
 * Trains a new model asynchronously and reports the outcome.
 *
 * <p>Requests without a model name or dataset id are rejected immediately with
 * {@code 400 Bad Request}. Otherwise the work runs on a background thread: a
 * training-started event is published, the dataset is fetched from the data service, the
 * model is trained, evaluated and saved, its metrics are recorded, and a
 * training-completed event is published.
 *
 * @param request training parameters supplied by the caller
 * @return a future resolving to {@code 200 OK} with status {@code "COMPLETED"} on success,
 *         {@code 400 Bad Request} when the request is incomplete or a step rejects its
 *         arguments, or {@code 500 Internal Server Error} for any other failure; failure
 *         bodies carry status {@code "FAILED"}
 */
@PostMapping("/train")
public CompletableFuture<ResponseEntity<TrainingResponse>> trainModel(
        @RequestBody TrainingRequest request) {
    // Fail fast: without these two values neither the dataset lookup nor the save can work,
    // and the caller (not the server) is at fault.
    if (request.getModelName() == null || request.getModelName().trim().isEmpty()
            || request.getDatasetId() == null || request.getDatasetId().trim().isEmpty()) {
        logger.warn("Rejecting training request without model name or dataset id");
        return CompletableFuture.completedFuture(ResponseEntity.badRequest().body(
                new TrainingResponse(null, request.getModelName(), null, "FAILED")));
    }

    logger.info("Starting model training for: {}", request.getModelName());

    // No executor is supplied, so the task runs on the JDK's default async pool.
    // NOTE(review): training is long-running; consider a dedicated executor.
    return CompletableFuture.supplyAsync(() -> {
        try {
            // Publish training started event
            eventPublisher.publishEvent(new ModelTrainingStartedEvent(
                    request.getModelName(), request.getDatasetId()));

            // Get dataset from data service
            Dataset dataset = dataServiceClient.getDataset(request.getDatasetId());

            // Configure training
            TrainingConfig config = new TrainingConfig()
                    .setAlgorithm(request.getAlgorithm())
                    .setHyperparameters(request.getHyperparameters())
                    .setValidationSplit(0.2)
                    .setEarlyStoppingPatience(10)
                    .setMetricsToTrack(request.getMetrics());

            // Train model
            Trainer trainer = new Trainer(config);
            Model model = trainer.train(dataset);

            // Evaluate model
            // NOTE(review): this evaluates on the same dataset the model was trained on;
            // confirm whether a held-out split should be used instead.
            Evaluator evaluator = new Evaluator();
            Map<String, Double> metrics = evaluator.evaluate(model, dataset);

            // Save model
            String modelId = modelManager.saveModel(model, request.getModelName());

            // Send metrics to metrics service
            // NOTE(review): MetricsServiceClient in this tutorial declares
            // recordTrainingMetrics(TrainingMetrics); confirm this two-argument call
            // matches the client actually in use.
            metricsServiceClient.recordTrainingMetrics(modelId, metrics);

            // Publish training completed event
            eventPublisher.publishEvent(new ModelTrainingCompletedEvent(
                    request.getModelName(), modelId, metrics));

            TrainingResponse response = new TrainingResponse(
                    modelId, request.getModelName(), metrics, "COMPLETED");
            logger.info("Model training completed: {}", modelId);
            return ResponseEntity.ok(response);
        } catch (IllegalArgumentException e) {
            // A collaborator rejected the supplied values: still the caller's problem.
            return trainingFailed(request, e, org.springframework.http.HttpStatus.BAD_REQUEST);
        } catch (Exception e) {
            // Anything else is a server-side failure and must not be reported as 400.
            return trainingFailed(
                    request, e, org.springframework.http.HttpStatus.INTERNAL_SERVER_ERROR);
        }
    });
}

/**
 * Logs a training failure, publishes the training-failed event and builds the
 * {@code "FAILED"} response with the given HTTP status.
 */
private ResponseEntity<TrainingResponse> trainingFailed(
        TrainingRequest request, Exception cause,
        org.springframework.http.HttpStatus status) {
    logger.error("Model training failed: {}", request.getModelName(), cause);
    // Publish training failed event
    eventPublisher.publishEvent(new ModelTrainingFailedEvent(
            request.getModelName(), cause.getMessage()));
    TrainingResponse response = new TrainingResponse(
            null, request.getModelName(), null, "FAILED");
    return ResponseEntity.status(status).body(response);
}
/**
 * Returns the metadata of a single model.
 *
 * @param modelId identifier taken from the request path
 * @return {@code 200 OK} with the model's {@link ModelInfo}, {@code 404 Not Found} when the
 *         model manager returns {@code null}, or {@code 500 Internal Server Error} when the
 *         lookup itself fails
 */
@GetMapping("/{modelId}")
public ResponseEntity<ModelInfo> getModel(@PathVariable String modelId) {
    try {
        Model model = modelManager.getModel(modelId);
        if (model == null) {
            return ResponseEntity.notFound().build();
        }
        ModelInfo info = new ModelInfo(
                modelId,
                model.getName(),
                model.getAlgorithm(),
                model.getCreatedAt(),
                model.getVersion(),
                model.getMetrics()
        );
        return ResponseEntity.ok(info);
    } catch (Exception e) {
        logger.error("Error retrieving model: {}", modelId, e);
        // A failing lookup is a server-side problem, not a malformed request.
        return ResponseEntity
                .status(org.springframework.http.HttpStatus.INTERNAL_SERVER_ERROR)
                .build();
    }
}
/**
 * Lists models one page at a time.
 *
 * @param page zero-based page index; defaults to 0
 * @param size number of entries per page; defaults to 20
 * @return {@code 200 OK} with the requested page, {@code 400 Bad Request} when
 *         {@code page} is negative or {@code size} is not positive, or
 *         {@code 500 Internal Server Error} when the listing fails
 */
@GetMapping
public ResponseEntity<List<ModelInfo>> listModels(
        @RequestParam(defaultValue = "0") int page,
        @RequestParam(defaultValue = "20") int size) {
    // Reject impossible paging values here so they surface as a client error
    // instead of whatever the model manager would do with them.
    if (page < 0 || size <= 0) {
        logger.warn("Rejecting model listing with page={} size={}", page, size);
        return ResponseEntity.badRequest().build();
    }
    try {
        List<ModelInfo> models = modelManager.listModels(page, size);
        return ResponseEntity.ok(models);
    } catch (Exception e) {
        logger.error("Error listing models", e);
        // The parameters were valid, so a failure here is server-side.
        return ResponseEntity
                .status(org.springframework.http.HttpStatus.INTERNAL_SERVER_ERROR)
                .build();
    }
}
/**
 * Deletes a model and announces the deletion.
 *
 * @param modelId identifier taken from the request path
 * @return {@code 204 No Content} after the model was deleted and a model-deleted event was
 *         published, {@code 404 Not Found} when the model manager does not know the id, or
 *         {@code 500 Internal Server Error} when the deletion fails
 */
@DeleteMapping("/{modelId}")
public ResponseEntity<Void> deleteModel(@PathVariable String modelId) {
    try {
        // Same existence check the read and update endpoints use; it also prevents a
        // deleted event from being published for a model that never existed.
        if (modelManager.getModel(modelId) == null) {
            return ResponseEntity.notFound().build();
        }
        modelManager.deleteModel(modelId);
        // Publish model deleted event
        eventPublisher.publishEvent(new ModelDeletedEvent(modelId));
        return ResponseEntity.noContent().build();
    } catch (Exception e) {
        logger.error("Error deleting model: {}", modelId, e);
        // A failing delete is a server-side problem, not a malformed request.
        return ResponseEntity
                .status(org.springframework.http.HttpStatus.INTERNAL_SERVER_ERROR)
                .build();
    }
}
/**
 * Replaces a model's name, description and tags.
 *
 * <p>All three values are taken from the request as-is, including {@code null}.
 *
 * @param modelId identifier taken from the request path
 * @param request new metadata values
 * @return {@code 200 OK} with the updated {@link ModelInfo}, {@code 404 Not Found} when the
 *         model manager returns {@code null} for the id, or
 *         {@code 500 Internal Server Error} when the update fails
 */
@PutMapping("/{modelId}")
public ResponseEntity<ModelInfo> updateModel(
        @PathVariable String modelId,
        @RequestBody ModelUpdateRequest request) {
    try {
        Model model = modelManager.getModel(modelId);
        if (model == null) {
            return ResponseEntity.notFound().build();
        }

        // Update model metadata
        model.setName(request.getName());
        model.setDescription(request.getDescription());
        model.setTags(request.getTags());
        modelManager.updateModel(model);

        // Publish model updated event
        eventPublisher.publishEvent(new ModelUpdatedEvent(modelId));

        ModelInfo info = new ModelInfo(
                modelId,
                model.getName(),
                model.getAlgorithm(),
                model.getCreatedAt(),
                model.getVersion(),
                model.getMetrics()
        );
        return ResponseEntity.ok(info);
    } catch (Exception e) {
        logger.error("Error updating model: {}", modelId, e);
        // A failing update is a server-side problem, not a malformed request.
        return ResponseEntity
                .status(org.springframework.http.HttpStatus.INTERNAL_SERVER_ERROR)
                .build();
    }
}
// Request/Response DTOs
/**
 * Request body of the training endpoint: which model to create, from which dataset, with
 * which algorithm, hyperparameters and tracked metrics.
 */
public static class TrainingRequest {
    private String modelName;
    private String datasetId;
    private String algorithm;
    private Map<String, Object> hyperparameters;
    private List<String> metrics;

    // --- accessors ---

    public String getModelName() {
        return modelName;
    }

    public String getDatasetId() {
        return datasetId;
    }

    public String getAlgorithm() {
        return algorithm;
    }

    public Map<String, Object> getHyperparameters() {
        return hyperparameters;
    }

    public List<String> getMetrics() {
        return metrics;
    }

    // --- mutators ---

    public void setModelName(String modelName) {
        this.modelName = modelName;
    }

    public void setDatasetId(String datasetId) {
        this.datasetId = datasetId;
    }

    public void setAlgorithm(String algorithm) {
        this.algorithm = algorithm;
    }

    public void setHyperparameters(Map<String, Object> hyperparameters) {
        this.hyperparameters = hyperparameters;
    }

    public void setMetrics(List<String> metrics) {
        this.metrics = metrics;
    }
}
/**
 * Response body of the training endpoint: the resulting model id, its name, the evaluation
 * metrics and a status string.
 */
public static class TrainingResponse {
    private String modelId;
    private String modelName;
    private Map<String, Double> metrics;
    private String status;

    /**
     * @param modelId   identifier of the saved model
     * @param modelName name the model was requested under
     * @param metrics   evaluation metrics keyed by metric name
     * @param status    outcome label for the training run
     */
    public TrainingResponse(String modelId, String modelName,
            Map<String, Double> metrics, String status) {
        this.modelId = modelId;
        this.modelName = modelName;
        this.metrics = metrics;
        this.status = status;
    }

    // --- accessors ---

    public String getModelId() {
        return modelId;
    }

    public String getModelName() {
        return modelName;
    }

    public Map<String, Double> getMetrics() {
        return metrics;
    }

    public String getStatus() {
        return status;
    }

    // --- mutators ---

    public void setModelId(String modelId) {
        this.modelId = modelId;
    }

    public void setModelName(String modelName) {
        this.modelName = modelName;
    }

    public void setMetrics(Map<String, Double> metrics) {
        this.metrics = metrics;
    }

    public void setStatus(String status) {
        this.status = status;
    }
}
/**
 * Read-only style summary of a stored model as returned by the model endpoints.
 */
public static class ModelInfo {
    private String modelId;
    private String name;
    private String algorithm;
    private String createdAt;
    private String version;
    private Map<String, Double> metrics;

    /**
     * @param modelId   identifier of the model
     * @param name      model name
     * @param algorithm algorithm the model was built with
     * @param createdAt creation time as a string
     * @param version   model version label
     * @param metrics   model metrics keyed by metric name
     */
    public ModelInfo(String modelId, String name, String algorithm,
            String createdAt, String version, Map<String, Double> metrics) {
        this.modelId = modelId;
        this.name = name;
        this.algorithm = algorithm;
        this.createdAt = createdAt;
        this.version = version;
        this.metrics = metrics;
    }

    // --- accessors ---

    public String getModelId() {
        return modelId;
    }

    public String getName() {
        return name;
    }

    public String getAlgorithm() {
        return algorithm;
    }

    public String getCreatedAt() {
        return createdAt;
    }

    public String getVersion() {
        return version;
    }

    public Map<String, Double> getMetrics() {
        return metrics;
    }

    // --- mutators ---

    public void setModelId(String modelId) {
        this.modelId = modelId;
    }

    public void setName(String name) {
        this.name = name;
    }

    public void setAlgorithm(String algorithm) {
        this.algorithm = algorithm;
    }

    public void setCreatedAt(String createdAt) {
        this.createdAt = createdAt;
    }

    public void setVersion(String version) {
        this.version = version;
    }

    public void setMetrics(Map<String, Double> metrics) {
        this.metrics = metrics;
    }
}
/**
 * Request body of the model update endpoint: the new name, description and tags.
 */
public static class ModelUpdateRequest {
    private String name;
    private String description;
    private List<String> tags;

    // --- accessors ---

    public String getName() {
        return name;
    }

    public String getDescription() {
        return description;
    }

    public List<String> getTags() {
        return tags;
    }

    // --- mutators ---

    public void setName(String name) {
        this.name = name;
    }

    public void setDescription(String description) {
        this.description = description;
    }

    public void setTags(List<String> tags) {
        this.tags = tags;
    }
}
}
Service Communication with Feign
package com.company.enterprise.microservices.clients;
import com.superml.core.data.Dataset;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import java.util.List;
import java.util.Map;
/**
 * Feign client for the {@code data-service}, used for inter-service dataset access.
 *
 * <p>Calls fall back to {@link DataServiceClientFallback} as configured on the annotation.
 * Every {@code @PathVariable} names its URI template variable explicitly so that binding
 * does not depend on parameter names being retained in the compiled class.
 */
@FeignClient(name = "data-service", fallback = DataServiceClientFallback.class)
public interface DataServiceClient {

    /** Fetches the dataset with the given id. */
    @GetMapping("/api/v1/datasets/{datasetId}")
    Dataset getDataset(@PathVariable("datasetId") String datasetId);

    /** Submits a preprocessing request and returns the resulting dataset. */
    @PostMapping("/api/v1/datasets/preprocess")
    Dataset preprocessDataset(@RequestBody PreprocessingRequest request);

    /** Returns the feature names of the given dataset. */
    @GetMapping("/api/v1/datasets/{datasetId}/features")
    List<String> getFeatures(@PathVariable("datasetId") String datasetId);

    /** Returns the statistics of the given dataset. */
    @GetMapping("/api/v1/datasets/{datasetId}/statistics")
    Map<String, Object> getStatistics(@PathVariable("datasetId") String datasetId);
}
/**
 * Fallback registered on {@link DataServiceClient}.
 *
 * <p>Every method logs a warning and returns a newly created empty value instead of
 * failing, so callers keep running while the data service is unreachable.
 */
@Component
public class DataServiceClientFallback implements DataServiceClient {

    private static final Logger log = LoggerFactory.getLogger(DataServiceClientFallback.class);

    /** Logs the outage and returns a new, empty {@link Dataset}. */
    @Override
    public Dataset getDataset(String datasetId) {
        log.warn("Data service unavailable, returning empty dataset for ID: {}", datasetId);
        Dataset placeholder = new Dataset();
        return placeholder;
    }

    /** Logs the outage and returns a new, empty {@link Dataset}; the request is not used. */
    @Override
    public Dataset preprocessDataset(PreprocessingRequest request) {
        log.warn("Data service unavailable, skipping preprocessing");
        Dataset placeholder = new Dataset();
        return placeholder;
    }

    /** Logs the outage and returns a new, empty, mutable list. */
    @Override
    public List<String> getFeatures(String datasetId) {
        log.warn("Data service unavailable, returning empty features list");
        List<String> noFeatures = new ArrayList<>();
        return noFeatures;
    }

    /** Logs the outage and returns a new, empty, mutable map. */
    @Override
    public Map<String, Object> getStatistics(String datasetId) {
        log.warn("Data service unavailable, returning empty statistics");
        Map<String, Object> noStatistics = new HashMap<>();
        return noStatistics;
    }
}
/**
 * Feign client for the {@code metrics-service}, used to record and read ML metrics.
 *
 * <p>Calls fall back to {@link MetricsServiceClientFallback} as configured on the
 * annotation. The {@code @PathVariable} names its URI template variable explicitly so that
 * binding does not depend on parameter names being retained in the compiled class.
 *
 * <p>NOTE(review): {@code MLModelController.trainModel} in this tutorial calls
 * {@code recordTrainingMetrics(modelId, metrics)} with two arguments, which does not match
 * the single-argument method declared here — confirm which signature is intended.
 */
@FeignClient(name = "metrics-service", fallback = MetricsServiceClientFallback.class)
public interface MetricsServiceClient {

    /** Records the metrics of a training run. */
    @PostMapping("/api/v1/metrics/training")
    void recordTrainingMetrics(@RequestBody TrainingMetrics metrics);

    /** Records the metrics of an inference call. */
    @PostMapping("/api/v1/metrics/inference")
    void recordInferenceMetrics(@RequestBody InferenceMetrics metrics);

    /** Returns the recorded metric points of the given model. */
    @GetMapping("/api/v1/metrics/model/{modelId}")
    List<MetricPoint> getModelMetrics(@PathVariable("modelId") String modelId);
}
/**
 * Fallback registered on {@link MetricsServiceClient}.
 *
 * <p>Recording calls only log a warning; the read call logs a warning and returns a new
 * empty list.
 */
@Component
public class MetricsServiceClientFallback implements MetricsServiceClient {

    private static final Logger log =
            LoggerFactory.getLogger(MetricsServiceClientFallback.class);

    /** Logs that the training metrics were not recorded. */
    @Override
    public void recordTrainingMetrics(TrainingMetrics metrics) {
        log.warn("Metrics service unavailable, metrics not recorded");
    }

    /** Logs that the inference metrics were not recorded. */
    @Override
    public void recordInferenceMetrics(InferenceMetrics metrics) {
        log.warn("Metrics service unavailable, metrics not recorded");
    }

    /** Logs the outage and returns a new, empty, mutable list. */
    @Override
    public List<MetricPoint> getModelMetrics(String modelId) {
        log.warn("Metrics service unavailable, returning empty metrics");
        List<MetricPoint> noMetrics = new ArrayList<>();
        return noMetrics;
    }
}
Event-Driven Architecture
Event Publisher and Handlers
package com.company.enterprise.events;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
 * Publishes ML system events to the {@code ml.events} exchange as JSON strings.
 *
 * <p>Publishing is best-effort: any failure is logged and not propagated to the caller.
 */
@Component
public class EventPublisher {
    private static final Logger logger = LoggerFactory.getLogger(EventPublisher.class);

    private static final String EXCHANGE_NAME = "ml.events";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private ObjectMapper objectMapper;

    /**
     * Serializes the event to JSON and sends it to the exchange.
     *
     * <p>The routing key is {@code <event type in lower case>.<source>}.
     *
     * @param event the event to publish
     */
    public void publishEvent(MLEvent event) {
        try {
            String eventJson = objectMapper.writeValueAsString(event);
            // Lower-case with a fixed locale: the default-locale variant can map letters
            // differently on some systems (e.g. "I" under a Turkish locale), which would
            // silently change the routing key.
            String routingKey = event.getEventType().toLowerCase(java.util.Locale.ROOT)
                    + "." + event.getSource();
            rabbitTemplate.convertAndSend(EXCHANGE_NAME, routingKey, eventJson);
            logger.info("Published event: {} with routing key: {}",
                    event.getEventType(), routingKey);
        } catch (Exception e) {
            // Deliberately swallowed after logging so a broker problem does not break
            // the calling operation.
            logger.error("Failed to publish event: {}", event, e);
        }
    }
}
/**
 * Common base of all ML events.
 *
 * <p>Every new event receives a random UUID as id, the current instant (ISO-8601 string)
 * as timestamp and an empty, mutable metadata map; the event type and source are supplied
 * by the subclass.
 */
public abstract class MLEvent {
    private String eventId = UUID.randomUUID().toString();
    private String eventType;
    private String source;
    private String timestamp = Instant.now().toString();
    private Map<String, Object> metadata = new HashMap<>();

    /**
     * @param eventType type label of the event
     * @param source    name of the component emitting the event
     */
    public MLEvent(String eventType, String source) {
        this.eventType = eventType;
        this.source = source;
    }

    // --- accessors ---

    public String getEventId() {
        return eventId;
    }

    public String getEventType() {
        return eventType;
    }

    public String getSource() {
        return source;
    }

    public String getTimestamp() {
        return timestamp;
    }

    public Map<String, Object> getMetadata() {
        return metadata;
    }

    // --- mutators ---

    public void setEventId(String eventId) {
        this.eventId = eventId;
    }

    public void setEventType(String eventType) {
        this.eventType = eventType;
    }

    public void setSource(String source) {
        this.source = source;
    }

    public void setTimestamp(String timestamp) {
        this.timestamp = timestamp;
    }

    public void setMetadata(Map<String, Object> metadata) {
        this.metadata = metadata;
    }
}
/**
 * Event announcing that training of a model has started.
 *
 * <p>Always carries event type {@code MODEL_TRAINING_STARTED} and source
 * {@code model-service}.
 */
public class ModelTrainingStartedEvent extends MLEvent {
    private String modelName;
    private String datasetId;

    /**
     * @param modelName name of the model being trained
     * @param datasetId identifier of the dataset used for training
     */
    public ModelTrainingStartedEvent(String modelName, String datasetId) {
        super("MODEL_TRAINING_STARTED", "model-service");
        this.modelName = modelName;
        this.datasetId = datasetId;
    }

    // --- accessors ---

    public String getModelName() {
        return modelName;
    }

    public String getDatasetId() {
        return datasetId;
    }

    // --- mutators ---

    public void setModelName(String modelName) {
        this.modelName = modelName;
    }

    public void setDatasetId(String datasetId) {
        this.datasetId = datasetId;
    }
}
/**
 * Event announcing that training of a model finished successfully.
 *
 * <p>Always carries event type {@code MODEL_TRAINING_COMPLETED} and source
 * {@code model-service}. Consumers in this tutorial rebuild the event from JSON, so the
 * class offers a no-argument constructor in addition to the public one.
 */
public class ModelTrainingCompletedEvent extends MLEvent {
    private static final String EVENT_TYPE = "MODEL_TRAINING_COMPLETED";
    private static final String EVENT_SOURCE = "model-service";

    private String modelName;
    private String modelId;
    private Map<String, Double> metrics;

    /**
     * Creates an empty event whose fields are filled in through the setters.
     *
     * <p>Exists for JSON deserialization, which needs a way to instantiate the class before
     * populating it; application code should use the three-argument constructor.
     */
    protected ModelTrainingCompletedEvent() {
        super(EVENT_TYPE, EVENT_SOURCE);
    }

    /**
     * @param modelName name of the trained model
     * @param modelId   identifier the model was saved under
     * @param metrics   evaluation metrics keyed by metric name
     */
    public ModelTrainingCompletedEvent(String modelName, String modelId,
            Map<String, Double> metrics) {
        super(EVENT_TYPE, EVENT_SOURCE);
        this.modelName = modelName;
        this.modelId = modelId;
        this.metrics = metrics;
    }

    // Getters and setters
    public String getModelName() { return modelName; }
    public void setModelName(String modelName) { this.modelName = modelName; }
    public String getModelId() { return modelId; }
    public void setModelId(String modelId) { this.modelId = modelId; }
    public Map<String, Double> getMetrics() { return metrics; }
    public void setMetrics(Map<String, Double> metrics) { this.metrics = metrics; }
}
/**
 * Message-queue listeners reacting to ML system events.
 *
 * <p>Each listener receives the event as a JSON string, deserializes it with the
 * application's shared {@link ObjectMapper} and triggers follow-up actions. Failures are
 * logged and not rethrown.
 */
@Component
public class MLEventHandlers {
    private static final Logger logger = LoggerFactory.getLogger(MLEventHandlers.class);

    /** Minimum accuracy (exclusive) a model needs before deployment is triggered. */
    private static final double MIN_ACCURACY = 0.8;
    /** Minimum precision (exclusive) a model needs before deployment is triggered. */
    private static final double MIN_PRECISION = 0.75;
    /** Minimum recall (exclusive) a model needs before deployment is triggered. */
    private static final double MIN_RECALL = 0.75;
    /** Drift score above which retraining is triggered. */
    private static final double SEVERE_DRIFT_SCORE = 0.8;

    @Autowired
    private ModelManager modelManager;

    @Autowired
    private NotificationService notificationService;

    @Autowired
    private ModelVersioningService versioningService;

    // Injected instead of building a new mapper for every message: mapper construction is
    // comparatively expensive, and this keeps the consumer on the same mapper bean that
    // EventPublisher serializes with.
    @Autowired
    private ObjectMapper objectMapper;

    /**
     * Handles a training-completed event: creates a model version, sends a notification and
     * triggers deployment when the metrics meet the deployment criteria.
     *
     * @param eventJson JSON form of a {@code ModelTrainingCompletedEvent}
     */
    @RabbitListener(queues = "ml.model.training.completed")
    public void handleModelTrainingCompleted(String eventJson) {
        try {
            ModelTrainingCompletedEvent event =
                    objectMapper.readValue(eventJson, ModelTrainingCompletedEvent.class);
            logger.info("Handling model training completed event: {}", event.getModelId());

            // Create model version
            versioningService.createVersion(event.getModelId(), event.getMetrics());

            // Send notification
            notificationService.sendTrainingCompletedNotification(
                    event.getModelName(), event.getModelId(), event.getMetrics());

            // Check if model meets deployment criteria
            if (meetsDeploymentCriteria(event.getMetrics())) {
                // Trigger deployment process
                triggerModelDeployment(event.getModelId());
            }
        } catch (Exception e) {
            logger.error("Error handling model training completed event", e);
        }
    }

    /**
     * Handles a training-failed event: sends a failure notification and records the failure.
     *
     * @param eventJson JSON form of a {@code ModelTrainingFailedEvent}
     */
    @RabbitListener(queues = "ml.model.training.failed")
    public void handleModelTrainingFailed(String eventJson) {
        try {
            ModelTrainingFailedEvent event =
                    objectMapper.readValue(eventJson, ModelTrainingFailedEvent.class);
            logger.info("Handling model training failed event: {}", event.getModelName());

            // Send failure notification
            notificationService.sendTrainingFailedNotification(
                    event.getModelName(), event.getErrorMessage());

            // Record failure metrics
            recordTrainingFailure(event.getModelName(), event.getErrorMessage());
        } catch (Exception e) {
            logger.error("Error handling model training failed event", e);
        }
    }

    /**
     * Handles a data-drift event: sends an alert and triggers retraining when the drift
     * score exceeds {@link #SEVERE_DRIFT_SCORE}.
     *
     * @param eventJson JSON form of a {@code DataDriftDetectedEvent}
     */
    @RabbitListener(queues = "ml.data.drift.detected")
    public void handleDataDriftDetected(String eventJson) {
        try {
            DataDriftDetectedEvent event =
                    objectMapper.readValue(eventJson, DataDriftDetectedEvent.class);
            logger.info("Handling data drift detected event: {}", event.getModelId());

            // Send alert
            notificationService.sendDataDriftAlert(event.getModelId(), event.getDriftScore());

            // Trigger model retraining if drift is severe
            if (event.getDriftScore() > SEVERE_DRIFT_SCORE) {
                triggerModelRetraining(event.getModelId());
            }
        } catch (Exception e) {
            logger.error("Error handling data drift detected event", e);
        }
    }

    /**
     * Returns whether accuracy, precision and recall are all present and above their
     * minimums. A missing metrics map counts as not meeting the criteria.
     */
    private boolean meetsDeploymentCriteria(Map<String, Double> metrics) {
        if (metrics == null) {
            // An event without metrics cannot justify a deployment.
            return false;
        }
        return exceeds(metrics.get("accuracy"), MIN_ACCURACY)
                && exceeds(metrics.get("precision"), MIN_PRECISION)
                && exceeds(metrics.get("recall"), MIN_RECALL);
    }

    /** Returns whether the value is present and strictly greater than the threshold. */
    private static boolean exceeds(Double value, double threshold) {
        return value != null && value > threshold;
    }

    private void triggerModelDeployment(String modelId) {
        logger.info("Triggering model deployment for: {}", modelId);
        // Implementation would trigger deployment pipeline
    }

    private void recordTrainingFailure(String modelName, String errorMessage) {
        logger.info("Recording training failure for: {}", modelName);
        // Implementation would record failure metrics
    }

    private void triggerModelRetraining(String modelId) {
        logger.info("Triggering model retraining for: {}", modelId);
        // Implementation would trigger retraining pipeline
    }
}
CQRS Pattern Implementation
Command and Query Separation
package com.company.enterprise.cqrs;
import org.springframework.stereotype.Component;
import org.springframework.beans.factory.annotation.Autowired;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
/**
 * Write side of the CQRS split for ML models.
 *
 * <p>Each {@code handle} overload logs the incoming command and then runs validation,
 * persistence and event publication asynchronously. Any failure inside the asynchronous
 * part is logged and rethrown as a {@link RuntimeException} with the message
 * {@code "Command handling failed"} and the original exception as cause.
 */
@Component
public class MLModelCommandHandler {
    private static final Logger logger = LoggerFactory.getLogger(MLModelCommandHandler.class);

    @Autowired
    private ModelWriteRepository modelWriteRepository;

    @Autowired
    private EventPublisher eventPublisher;

    @Autowired
    private CommandValidator commandValidator;

    /**
     * Registers a new model and announces that its training has started.
     *
     * @param command model name, algorithm, dataset and hyperparameters
     * @return a future holding the id returned by the write repository
     */
    public CompletableFuture<String> handle(TrainModelCommand command) {
        logger.info("Handling train model command: {}", command.getModelName());
        return CompletableFuture.supplyAsync(() -> executeTrain(command));
    }

    /**
     * Updates name, description and tags of an existing model.
     *
     * @param command target model id and the new values
     * @return a future completing when the update has been stored and announced
     */
    public CompletableFuture<Void> handle(UpdateModelCommand command) {
        logger.info("Handling update model command: {}", command.getModelId());
        return CompletableFuture.runAsync(() -> executeUpdate(command));
    }

    /**
     * Soft-deletes an existing model.
     *
     * @param command target model id
     * @return a future completing when the deletion flag has been stored and announced
     */
    public CompletableFuture<Void> handle(DeleteModelCommand command) {
        logger.info("Handling delete model command: {}", command.getModelId());
        return CompletableFuture.runAsync(() -> executeDelete(command));
    }

    /** Validates the command, stores a new entity and publishes the started event. */
    private String executeTrain(TrainModelCommand command) {
        try {
            commandValidator.validate(command);

            ModelEntity entity = new ModelEntity(
                    command.getModelName(),
                    command.getAlgorithm(),
                    command.getDatasetId(),
                    command.getHyperparameters()
            );
            String modelId = modelWriteRepository.save(entity);

            eventPublisher.publishEvent(new ModelTrainingStartedEvent(
                    command.getModelName(), command.getDatasetId()));

            logger.info("Model training command handled successfully: {}", modelId);
            return modelId;
        } catch (Exception e) {
            logger.error("Error handling train model command", e);
            throw new RuntimeException("Command handling failed", e);
        }
    }

    /** Validates the command, applies the new metadata and publishes the updated event. */
    private void executeUpdate(UpdateModelCommand command) {
        try {
            commandValidator.validate(command);

            ModelEntity entity = modelWriteRepository.findById(command.getModelId());
            if (entity == null) {
                throw new IllegalArgumentException("Model not found: " + command.getModelId());
            }

            entity.setName(command.getName());
            entity.setDescription(command.getDescription());
            entity.setTags(command.getTags());
            entity.setUpdatedAt(java.time.Instant.now());
            modelWriteRepository.update(entity);

            eventPublisher.publishEvent(new ModelUpdatedEvent(command.getModelId()));
            logger.info("Model update command handled successfully: {}", command.getModelId());
        } catch (Exception e) {
            logger.error("Error handling update model command", e);
            throw new RuntimeException("Command handling failed", e);
        }
    }

    /** Validates the command, flags the entity as deleted and publishes the deleted event. */
    private void executeDelete(DeleteModelCommand command) {
        try {
            commandValidator.validate(command);

            ModelEntity entity = modelWriteRepository.findById(command.getModelId());
            if (entity == null) {
                throw new IllegalArgumentException("Model not found: " + command.getModelId());
            }

            // Soft delete: the row stays, only the flag and timestamp change.
            entity.setDeleted(true);
            entity.setDeletedAt(java.time.Instant.now());
            modelWriteRepository.update(entity);

            eventPublisher.publishEvent(new ModelDeletedEvent(command.getModelId()));
            logger.info("Model delete command handled successfully: {}", command.getModelId());
        } catch (Exception e) {
            logger.error("Error handling delete model command", e);
            throw new RuntimeException("Command handling failed", e);
        }
    }
}
/**
 * Read side of the CQRS split for ML models.
 *
 * <p>Single-model and performance lookups consult the cache first and store repository
 * results in it; search and metrics queries go straight to the read repository.
 */
@Component
public class MLModelQueryHandler {
    private static final Logger logger = LoggerFactory.getLogger(MLModelQueryHandler.class);

    @Autowired
    private ModelReadRepository modelReadRepository;

    @Autowired
    private CacheManager cacheManager;

    /**
     * Looks up one model, preferring the cached copy.
     *
     * @param query carries the model id
     * @return the read model, or {@code null} when neither cache nor repository has it
     */
    public ModelReadModel handle(GetModelQuery query) {
        logger.debug("Handling get model query: {}", query.getModelId());

        ModelReadModel fromCache = cacheManager.get(query.getModelId(), ModelReadModel.class);
        if (fromCache == null) {
            ModelReadModel loaded = modelReadRepository.findById(query.getModelId());
            if (loaded != null) {
                cacheManager.put(query.getModelId(), loaded, 300); // 5 minutes TTL
            }
            return loaded;
        }

        logger.debug("Returning cached model: {}", query.getModelId());
        return fromCache;
    }

    /**
     * Searches models by the query's criteria, paging and sort settings.
     *
     * @param query search criteria, page, size, sort field and sort order
     * @return whatever the read repository returns for the search
     */
    public List<ModelReadModel> handle(SearchModelsQuery query) {
        logger.debug("Handling search models query: {}", query.getSearchCriteria());
        List<ModelReadModel> matches = modelReadRepository.search(
                query.getSearchCriteria(),
                query.getPage(),
                query.getSize(),
                query.getSortBy(),
                query.getSortOrder()
        );
        return matches;
    }

    /**
     * Loads a model's metric points for the query's time window.
     *
     * @param query model id, start time and end time
     * @return whatever the read repository returns for that window
     */
    public List<MetricPoint> handle(GetModelMetricsQuery query) {
        logger.debug("Handling get model metrics query: {}", query.getModelId());
        List<MetricPoint> points = modelReadRepository.getMetrics(
                query.getModelId(),
                query.getStartTime(),
                query.getEndTime()
        );
        return points;
    }

    /**
     * Looks up a model's performance summary, preferring the cached copy stored under
     * {@code "performance_" + modelId}.
     *
     * @param query carries the model id
     * @return the performance read model, or {@code null} when none is available
     */
    public ModelPerformanceReadModel handle(GetModelPerformanceQuery query) {
        logger.debug("Handling get model performance query: {}", query.getModelId());

        String cacheKey = "performance_" + query.getModelId();
        ModelPerformanceReadModel fromCache =
                cacheManager.get(cacheKey, ModelPerformanceReadModel.class);
        if (fromCache == null) {
            ModelPerformanceReadModel loaded =
                    modelReadRepository.getPerformance(query.getModelId());
            if (loaded != null) {
                cacheManager.put(cacheKey, loaded, 600); // 10 minutes TTL
            }
            return loaded;
        }
        return fromCache;
    }
}
/**
 * Command asking for a new model to be trained.
 *
 * <p>Holds the model name, algorithm, dataset id and hyperparameters exactly as given; no
 * validation or copying is performed.
 */
public class TrainModelCommand {
    private String modelName;
    private String algorithm;
    private String datasetId;
    private Map<String, Object> hyperparameters;

    /**
     * @param modelName       name for the new model
     * @param algorithm       algorithm to train with
     * @param datasetId       identifier of the training dataset
     * @param hyperparameters hyperparameter values keyed by name
     */
    public TrainModelCommand(String modelName, String algorithm, String datasetId,
            Map<String, Object> hyperparameters) {
        this.modelName = modelName;
        this.algorithm = algorithm;
        this.datasetId = datasetId;
        this.hyperparameters = hyperparameters;
    }

    // --- accessors ---

    public String getModelName() {
        return modelName;
    }

    public String getAlgorithm() {
        return algorithm;
    }

    public String getDatasetId() {
        return datasetId;
    }

    public Map<String, Object> getHyperparameters() {
        return hyperparameters;
    }

    // --- mutators ---

    public void setModelName(String modelName) {
        this.modelName = modelName;
    }

    public void setAlgorithm(String algorithm) {
        this.algorithm = algorithm;
    }

    public void setDatasetId(String datasetId) {
        this.datasetId = datasetId;
    }

    public void setHyperparameters(Map<String, Object> hyperparameters) {
        this.hyperparameters = hyperparameters;
    }
}
/**
 * Command asking for a model's name, description and tags to be replaced.
 *
 * <p>Values are stored exactly as given; no validation or copying is performed.
 */
public class UpdateModelCommand {
    private String modelId;
    private String name;
    private String description;
    private List<String> tags;

    /**
     * @param modelId     identifier of the model to update
     * @param name        new model name
     * @param description new model description
     * @param tags        new tag list
     */
    public UpdateModelCommand(String modelId, String name, String description,
            List<String> tags) {
        this.modelId = modelId;
        this.name = name;
        this.description = description;
        this.tags = tags;
    }

    // --- accessors ---

    public String getModelId() {
        return modelId;
    }

    public String getName() {
        return name;
    }

    public String getDescription() {
        return description;
    }

    public List<String> getTags() {
        return tags;
    }

    // --- mutators ---

    public void setModelId(String modelId) {
        this.modelId = modelId;
    }

    public void setName(String name) {
        this.name = name;
    }

    public void setDescription(String description) {
        this.description = description;
    }

    public void setTags(List<String> tags) {
        this.tags = tags;
    }
}
/**
 * Command asking for a model to be deleted; carries only the model id.
 */
public class DeleteModelCommand {
    private String modelId;

    /**
     * @param modelId identifier of the model to delete
     */
    public DeleteModelCommand(String modelId) {
        this.modelId = modelId;
    }

    public String getModelId() {
        return modelId;
    }

    public void setModelId(String modelId) {
        this.modelId = modelId;
    }
}
/**
 * Query for a single model; carries only the model id.
 */
public class GetModelQuery {
    private String modelId;

    /**
     * @param modelId identifier of the model to look up
     */
    public GetModelQuery(String modelId) {
        this.modelId = modelId;
    }

    public String getModelId() {
        return modelId;
    }

    public void setModelId(String modelId) {
        this.modelId = modelId;
    }
}
/**
 * Query for a page of models matching a set of criteria, with sort settings.
 *
 * <p>Values are stored exactly as given; no validation or copying is performed.
 */
public class SearchModelsQuery {
    private Map<String, Object> searchCriteria;
    private int page;
    private int size;
    private String sortBy;
    private String sortOrder;

    /**
     * @param searchCriteria criteria keyed by name
     * @param page           page index
     * @param size           page size
     * @param sortBy         field to sort by
     * @param sortOrder      sort direction
     */
    public SearchModelsQuery(Map<String, Object> searchCriteria, int page, int size,
            String sortBy, String sortOrder) {
        this.searchCriteria = searchCriteria;
        this.page = page;
        this.size = size;
        this.sortBy = sortBy;
        this.sortOrder = sortOrder;
    }

    // --- accessors ---

    public Map<String, Object> getSearchCriteria() {
        return searchCriteria;
    }

    public int getPage() {
        return page;
    }

    public int getSize() {
        return size;
    }

    public String getSortBy() {
        return sortBy;
    }

    public String getSortOrder() {
        return sortOrder;
    }

    // --- mutators ---

    public void setSearchCriteria(Map<String, Object> searchCriteria) {
        this.searchCriteria = searchCriteria;
    }

    public void setPage(int page) {
        this.page = page;
    }

    public void setSize(int size) {
        this.size = size;
    }

    public void setSortBy(String sortBy) {
        this.sortBy = sortBy;
    }

    public void setSortOrder(String sortOrder) {
        this.sortOrder = sortOrder;
    }
}
/**
 * Query for a model's metrics between a start and an end instant.
 *
 * <p>Values are stored exactly as given; the class does not check that the start precedes
 * the end.
 */
public class GetModelMetricsQuery {
    private String modelId;
    private java.time.Instant startTime;
    private java.time.Instant endTime;

    /**
     * @param modelId   identifier of the model
     * @param startTime start of the time window
     * @param endTime   end of the time window
     */
    public GetModelMetricsQuery(String modelId, java.time.Instant startTime,
            java.time.Instant endTime) {
        this.modelId = modelId;
        this.startTime = startTime;
        this.endTime = endTime;
    }

    // --- accessors ---

    public String getModelId() {
        return modelId;
    }

    public java.time.Instant getStartTime() {
        return startTime;
    }

    public java.time.Instant getEndTime() {
        return endTime;
    }

    // --- mutators ---

    public void setModelId(String modelId) {
        this.modelId = modelId;
    }

    public void setStartTime(java.time.Instant startTime) {
        this.startTime = startTime;
    }

    public void setEndTime(java.time.Instant endTime) {
        this.endTime = endTime;
    }
}
/**
 * Query for a model's performance summary; carries only the model id.
 */
public class GetModelPerformanceQuery {
    private String modelId;

    /**
     * @param modelId identifier of the model whose performance is requested
     */
    public GetModelPerformanceQuery(String modelId) {
        this.modelId = modelId;
    }

    public String getModelId() {
        return modelId;
    }

    public void setModelId(String modelId) {
        this.modelId = modelId;
    }
}
Circuit Breaker Pattern
Fault Tolerance Implementation
package com.company.enterprise.resilience;
import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker;
import io.github.resilience4j.retry.annotation.Retry;
import io.github.resilience4j.timelimiter.annotation.TimeLimiter;
import io.github.resilience4j.bulkhead.annotation.Bulkhead;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
/**
 * Resilient ML Service with Circuit Breaker pattern.
 *
 * <p>Wraps prediction, training and deployment calls to {@code MLModelService}.
 * Each operation runs asynchronously through
 * {@link CompletableFuture#supplyAsync(java.util.function.Supplier)} and is
 * annotated with the Resilience4j {@code @CircuitBreaker}, {@code @Retry},
 * {@code @TimeLimiter} and {@code @Bulkhead} annotations. The method named by
 * each {@code fallbackMethod} attribute provides the degraded behaviour.
 *
 * <p>Fallbacks distinguish two kinds of failure: a request that failed this
 * class's own validation is rejected with status {@code FAILED} (it cannot
 * succeed later), while any other failure is handled by queueing the work via
 * {@code FallbackMLService}.
 *
 * <p>NOTE(review): the asynchronous work is submitted without an explicit
 * executor, so it runs on the JVM-wide common pool; consider a dedicated
 * executor for these blocking calls.
 */
@Service
public class ResilientMLService {

    private static final Logger logger = LoggerFactory.getLogger(ResilientMLService.class);

    /** TTL passed to the cache for prediction results; 300 corresponds to the 5 minutes originally documented. */
    private static final int PREDICTION_CACHE_TTL = 300;

    @Autowired
    private MLModelService modelService;

    @Autowired
    private CacheManager cacheManager;

    @Autowired
    private FallbackMLService fallbackService;

    /**
     * Resilient model prediction with circuit breaker.
     *
     * <p>Looks the request up in the cache first; on a miss it delegates to the
     * model service and stores the result in the cache before returning it.
     *
     * @param modelId identifier of the model to run
     * @param request prediction input
     * @return a future completing with the cached or freshly computed result; it
     *         completes exceptionally with a {@link RuntimeException}
     *         ("Prediction failed") wrapping the original cause on any error
     */
    @CircuitBreaker(name = "model-inference", fallbackMethod = "fallbackPredict")
    @Retry(name = "model-inference")
    @TimeLimiter(name = "model-inference")
    @Bulkhead(name = "model-inference")
    public CompletableFuture<PredictionResult> predict(String modelId, PredictionRequest request) {
        logger.debug("Making prediction with model: {}", modelId);
        return CompletableFuture.supplyAsync(() -> {
            try {
                // Check cache first
                String cacheKey = generateCacheKey(modelId, request);
                PredictionResult cachedResult = cacheManager.get(cacheKey, PredictionResult.class);
                if (cachedResult != null) {
                    logger.debug("Returning cached prediction for model: {}", modelId);
                    return cachedResult;
                }
                // Make prediction
                PredictionResult result = modelService.predict(modelId, request);
                // Cache the result
                cacheManager.put(cacheKey, result, PREDICTION_CACHE_TTL);
                logger.debug("Prediction completed for model: {}", modelId);
                return result;
            } catch (Exception e) {
                logger.error("Error making prediction with model: {}", modelId, e);
                throw new RuntimeException("Prediction failed", e);
            }
        });
    }

    /**
     * Fallback method for predict operation.
     *
     * <p>Tries, in order: a cached result for the same key, a prediction from
     * the fallback service (marked as fallback), and finally a default result
     * carrying the error text "Service temporarily unavailable".
     *
     * @param modelId identifier of the model that was requested
     * @param request the original prediction input
     * @param ex      the failure that triggered the fallback
     * @return a future that always completes normally with one of the results above
     */
    public CompletableFuture<PredictionResult> fallbackPredict(String modelId, PredictionRequest request, Exception ex) {
        logger.warn("Circuit breaker activated for model: {}, using fallback", modelId, ex);
        return CompletableFuture.supplyAsync(() -> {
            try {
                // Try to get cached result
                String cacheKey = generateCacheKey(modelId, request);
                PredictionResult cachedResult = cacheManager.get(cacheKey, PredictionResult.class);
                if (cachedResult != null) {
                    logger.info("Returning cached prediction as fallback for model: {}", modelId);
                    return cachedResult;
                }
                // Use fallback service
                PredictionResult fallbackResult = fallbackService.predict(modelId, request);
                fallbackResult.setFallback(true);
                logger.info("Returning fallback prediction for model: {}", modelId);
                return fallbackResult;
            } catch (Exception e) {
                logger.error("Fallback prediction failed for model: {}", modelId, e);
                // Return default prediction
                PredictionResult defaultResult = new PredictionResult();
                defaultResult.setModelId(modelId);
                defaultResult.setFallback(true);
                defaultResult.setError("Service temporarily unavailable");
                return defaultResult;
            }
        });
    }

    /**
     * Resilient model training with circuit breaker.
     *
     * <p>Validates the request (model name, dataset ID and algorithm must be
     * non-empty) and then delegates to the model service.
     *
     * @param request training parameters
     * @return a future completing with the training result; it completes
     *         exceptionally with a {@link RuntimeException} ("Training failed")
     *         wrapping the original cause on validation or training errors
     */
    @CircuitBreaker(name = "model-training", fallbackMethod = "fallbackTraining")
    @Retry(name = "model-training")
    @TimeLimiter(name = "model-training")
    @Bulkhead(name = "model-training")
    public CompletableFuture<TrainingResult> trainModel(TrainingRequest request) {
        logger.info("Starting model training: {}", request.getModelName());
        return CompletableFuture.supplyAsync(() -> {
            try {
                // Validate training request
                validateTrainingRequest(request);
                // Start training
                TrainingResult result = modelService.train(request);
                logger.info("Model training completed: {}", request.getModelName());
                return result;
            } catch (Exception e) {
                logger.error("Error training model: {}", request.getModelName(), e);
                throw new RuntimeException("Training failed", e);
            }
        });
    }

    /**
     * Fallback method for training operation.
     *
     * <p>If the failure was caused by request validation, the request is
     * rejected with status {@code FAILED} and the validation message, because
     * queueing an invalid request would only defer the same failure. Otherwise
     * the request is queued through the fallback service and reported as
     * {@code QUEUED}; if queueing itself fails the result is {@code FAILED}.
     *
     * @param request the original training request
     * @param ex      the failure that triggered the fallback
     * @return a future completing normally with a QUEUED or FAILED result
     */
    public CompletableFuture<TrainingResult> fallbackTraining(TrainingRequest request, Exception ex) {
        logger.warn("Circuit breaker activated for training: {}, using fallback", request.getModelName(), ex);
        RequestValidationException validationError = findValidationError(ex);
        if (validationError != null) {
            logger.warn("Rejecting invalid training request: {}", validationError.getMessage());
            return CompletableFuture.completedFuture(newTrainingResult(
                    request.getModelName(),
                    "FAILED",
                    "Invalid training request: " + validationError.getMessage()));
        }
        return CompletableFuture.supplyAsync(() -> {
            try {
                // Queue training request for later processing
                fallbackService.queueTrainingRequest(request);
                TrainingResult result = newTrainingResult(
                        request.getModelName(),
                        "QUEUED",
                        "Training queued for later processing due to system unavailability");
                logger.info("Training request queued for: {}", request.getModelName());
                return result;
            } catch (Exception e) {
                logger.error("Fallback training failed for: {}", request.getModelName(), e);
                return newTrainingResult(
                        request.getModelName(),
                        "FAILED",
                        "Training failed due to system unavailability");
            }
        });
    }

    /**
     * Resilient model deployment with circuit breaker.
     *
     * <p>Validates the deployment config (it must be present and specify a
     * non-empty environment and resource limits) and then delegates to the
     * model service.
     *
     * @param modelId identifier of the model to deploy
     * @param config  deployment settings
     * @return a future completing with the deployment result; it completes
     *         exceptionally with a {@link RuntimeException} ("Deployment failed")
     *         wrapping the original cause on validation or deployment errors
     */
    @CircuitBreaker(name = "model-deployment", fallbackMethod = "fallbackDeployment")
    @Retry(name = "model-deployment")
    @TimeLimiter(name = "model-deployment")
    @Bulkhead(name = "model-deployment")
    public CompletableFuture<DeploymentResult> deployModel(String modelId, DeploymentConfig config) {
        logger.info("Starting model deployment: {}", modelId);
        return CompletableFuture.supplyAsync(() -> {
            try {
                // Validate deployment config
                validateDeploymentConfig(config);
                // Deploy model
                DeploymentResult result = modelService.deploy(modelId, config);
                logger.info("Model deployment completed: {}", modelId);
                return result;
            } catch (Exception e) {
                logger.error("Error deploying model: {}", modelId, e);
                throw new RuntimeException("Deployment failed", e);
            }
        });
    }

    /**
     * Fallback method for deployment operation.
     *
     * <p>If the failure was caused by config validation, the deployment is
     * rejected with status {@code FAILED} and the validation message instead of
     * being queued. Otherwise the deployment is queued through the fallback
     * service and reported as {@code QUEUED}; if queueing itself fails the
     * result is {@code FAILED}.
     *
     * @param modelId identifier of the model that was to be deployed
     * @param config  the original deployment settings
     * @param ex      the failure that triggered the fallback
     * @return a future completing normally with a QUEUED or FAILED result
     */
    public CompletableFuture<DeploymentResult> fallbackDeployment(String modelId, DeploymentConfig config, Exception ex) {
        logger.warn("Circuit breaker activated for deployment: {}, using fallback", modelId, ex);
        RequestValidationException validationError = findValidationError(ex);
        if (validationError != null) {
            logger.warn("Rejecting invalid deployment request: {}", validationError.getMessage());
            return CompletableFuture.completedFuture(newDeploymentResult(
                    modelId,
                    "FAILED",
                    "Invalid deployment request: " + validationError.getMessage()));
        }
        return CompletableFuture.supplyAsync(() -> {
            try {
                // Queue deployment request for later processing
                fallbackService.queueDeploymentRequest(modelId, config);
                DeploymentResult result = newDeploymentResult(
                        modelId,
                        "QUEUED",
                        "Deployment queued for later processing due to system unavailability");
                logger.info("Deployment request queued for: {}", modelId);
                return result;
            } catch (Exception e) {
                logger.error("Fallback deployment failed for: {}", modelId, e);
                return newDeploymentResult(
                        modelId,
                        "FAILED",
                        "Deployment failed due to system unavailability");
            }
        });
    }

    /**
     * Circuit breaker health check.
     *
     * <p>Collects the state reported by {@code getCircuitBreakerState} for the
     * inference, training and deployment breakers. That helper is currently a
     * placeholder, so every state is reported as "CLOSED".
     *
     * @return a snapshot holding one state string per breaker
     */
    public CircuitBreakerHealthInfo getCircuitBreakerHealth() {
        CircuitBreakerHealthInfo health = new CircuitBreakerHealthInfo();
        // Get circuit breaker states
        health.setInferenceState(getCircuitBreakerState("model-inference"));
        health.setTrainingState(getCircuitBreakerState("model-training"));
        health.setDeploymentState(getCircuitBreakerState("model-deployment"));
        return health;
    }

    /**
     * Checks that the training request names a model, a dataset and an algorithm.
     *
     * @throws IllegalArgumentException if any of the three values is null or empty
     */
    private void validateTrainingRequest(TrainingRequest request) {
        if (request.getModelName() == null || request.getModelName().isEmpty()) {
            throw new RequestValidationException("Model name is required");
        }
        if (request.getDatasetId() == null || request.getDatasetId().isEmpty()) {
            throw new RequestValidationException("Dataset ID is required");
        }
        if (request.getAlgorithm() == null || request.getAlgorithm().isEmpty()) {
            throw new RequestValidationException("Algorithm is required");
        }
    }

    /**
     * Checks that a deployment config is present and specifies an environment
     * and resource limits.
     *
     * @throws IllegalArgumentException if the config is null or either value is
     *                                  null or empty
     */
    private void validateDeploymentConfig(DeploymentConfig config) {
        // Guard first: a missing config is a caller error, not a NullPointerException.
        if (config == null) {
            throw new RequestValidationException("Deployment config is required");
        }
        if (config.getEnvironment() == null || config.getEnvironment().isEmpty()) {
            throw new RequestValidationException("Deployment environment is required");
        }
        if (config.getResourceLimits() == null || config.getResourceLimits().isEmpty()) {
            throw new RequestValidationException("Resource limits are required");
        }
    }

    /**
     * Builds the cache key for a prediction from the model ID and the request's hash code.
     *
     * <p>NOTE(review): the key depends on {@code request.hashCode()}. Two
     * different requests with equal hash codes would share a cache entry, and
     * a request type without a content-based {@code hashCode} would never hit
     * the cache. Confirm how {@code PredictionRequest} implements it.
     */
    private String generateCacheKey(String modelId, PredictionRequest request) {
        return String.format("prediction_%s_%s", modelId, request.hashCode());
    }

    /**
     * Returns the state of the named circuit breaker.
     *
     * <p>Placeholder: the name is ignored and "CLOSED" is always returned.
     */
    private String getCircuitBreakerState(String name) {
        // Implementation would get actual circuit breaker state from Resilience4j
        return "CLOSED"; // Placeholder
    }

    /**
     * Searches the failure and its cause chain for a validation error raised by this class.
     *
     * @return the first validation error found, or {@code null} if the failure
     *         has a different origin
     */
    private static RequestValidationException findValidationError(Throwable failure) {
        for (Throwable current = failure; current != null; current = current.getCause()) {
            if (current instanceof RequestValidationException) {
                return (RequestValidationException) current;
            }
        }
        return null;
    }

    /** Creates a training result populated with the given model name, status and message. */
    private static TrainingResult newTrainingResult(String modelName, String status, String message) {
        TrainingResult result = new TrainingResult();
        result.setModelName(modelName);
        result.setStatus(status);
        result.setMessage(message);
        return result;
    }

    /** Creates a deployment result populated with the given model ID, status and message. */
    private static DeploymentResult newDeploymentResult(String modelId, String status, String message) {
        DeploymentResult result = new DeploymentResult();
        result.setModelId(modelId);
        result.setStatus(status);
        result.setMessage(message);
        return result;
    }

    /**
     * Raised by the validators in this class. It extends
     * {@link IllegalArgumentException} so existing handlers keep working, while
     * letting the fallbacks tell an invalid request apart from a downstream failure.
     */
    private static final class RequestValidationException extends IllegalArgumentException {
        private static final long serialVersionUID = 1L;

        RequestValidationException(String message) {
            super(message);
        }
    }

    /**
     * Circuit breaker health information.
     *
     * <p>Mutable holder for the state strings of the inference, training and
     * deployment circuit breakers.
     */
    public static class CircuitBreakerHealthInfo {
        private String inferenceState;
        private String trainingState;
        private String deploymentState;

        // Getters and setters
        public String getInferenceState() { return inferenceState; }
        public void setInferenceState(String inferenceState) { this.inferenceState = inferenceState; }
        public String getTrainingState() { return trainingState; }
        public void setTrainingState(String trainingState) { this.trainingState = trainingState; }
        public String getDeploymentState() { return deploymentState; }
        public void setDeploymentState(String deploymentState) { this.deploymentState = deploymentState; }
    }
}
Key Learning Points:
- Microservices Decomposition: Breaking down ML systems into focused, independent services
- Service Communication: Using Feign clients with fallback mechanisms for resilient communication
- Event-Driven Architecture: Asynchronous processing and loose coupling through events
- CQRS Pattern: Separating read and write operations for better scalability
- Circuit Breaker Pattern: Implementing fault tolerance and graceful degradation
- Resilience Patterns: Retry, timeout, and bulkhead patterns for robust systems
- Distributed Caching: Multi-level caching strategies for performance optimization
This tutorial provides a comprehensive foundation for building enterprise-grade ML systems with proven architectural patterns. The final tutorial will cover ML optimization techniques and performance tuning strategies.