Commit a0d53104 authored by Zizhe Wang

feat support single-objective optimization

parent e049784a
adaptive_instance_selection.py
# Copyright (c) 2024 - Zizhe Wang
# https://zizhe.wang

####################################
#                                  #
# AUTOMATIC SEARCH SPACE REDUCTION #
#                                  #
####################################

import numpy as np
from sklearn.cluster import KMeans
from scipy.stats.qmc import LatinHypercube as lhs
from config import PARAM_TYPES, PARAM_BOUNDS
# Initial Sampling
def initial_sampling(param_bounds, n_samples):
    dimensions = len(param_bounds)
    samples = lhs(d=dimensions).random(n=n_samples)  # Latin hypercube sampling (LHS)
    if samples.size == 0:
        raise ValueError("Initial sampling produced an empty set of samples.")
    # Scale samples to parameter bounds and respect parameter types
    for i, (param, bounds_info) in enumerate(param_bounds.items()):
        bounds = bounds_info["bounds"]
        samples[:, i] = bounds[0] + samples[:, i] * (bounds[1] - bounds[0])
        if bounds_info["type"] == 'int':
            samples[:, i] = np.round(samples[:, i]).astype(int)
    print(f"Initial samples shape: {samples.shape}")
    return samples
# Evaluate Samples
def evaluate_samples(samples, objective_function):
    results = []
    for sample in samples:
        result = objective_function(sample)
        results.append(result)
    return np.array(results)
# Advanced Clustering
def advanced_clustering_samples(samples, n_clusters):
    if len(samples) == 0:
        raise ValueError("Cannot cluster an empty set of samples.")
    kmeans = KMeans(n_clusters=n_clusters, random_state=0, n_init=10)
    kmeans.fit(samples)
    labels = kmeans.predict(samples)
    centers = kmeans.cluster_centers_
    return labels, centers
# Adaptive Selection with Adaptive Threshold
def adaptive_select_informative_instances(samples, results, initial_threshold=0.05, adapt_rate=0.01, desired_samples=None, max_iterations=100):
    if len(samples) == 0 or len(results) == 0:
        raise ValueError("Received empty samples or results for selection.")
    performance = np.nanmean(results, axis=1)  # Use np.nanmean to ignore NaN values
    threshold = initial_threshold
    iteration = 0
    while iteration < max_iterations:
        iteration += 1
        print(f"Iteration {iteration}: Current threshold: {threshold}")
        # Cap the threshold at 1.0
        effective_threshold = min(threshold, 1.0)
        cutoff = np.nanpercentile(performance, effective_threshold * 100)  # Use np.nanpercentile to ignore NaN values
        selected_samples = samples[performance <= cutoff]
        print(f"Iteration {iteration}: Number of selected samples: {len(selected_samples)}")
        if desired_samples is not None and len(selected_samples) >= desired_samples:
            print(f"Iteration {iteration}: Desired number of samples reached.")
            break
        if len(selected_samples) == len(samples):
            print(f"Iteration {iteration}: All samples selected.")
            break
        threshold += adapt_rate
    if iteration == max_iterations:
        print(f"Final threshold after max iterations: {threshold}")
        print(f"Performance values: {performance}")
        print(f"Number of selected samples: {len(selected_samples)}")
    if desired_samples is not None and len(selected_samples) < desired_samples:
        print("Falling back to the best available samples.")
        # Select the top desired_samples samples based on performance
        best_indices = np.argsort(performance)[:desired_samples]
        selected_samples = samples[best_indices]
    if selected_samples.size == 0:
        raise ValueError("Selection of informative instances resulted in an empty set.")
    print(f"Final selected samples shape: {selected_samples.shape}")
    return selected_samples[:desired_samples]  # Ensure the number of selected samples matches the desired number
# Iterative Refinement
def iterative_refinement(samples, results, objective_function, maximize_indices, n_iterations=2, initial_threshold=0.10, adapt_rate=0.03):
    for iteration in range(n_iterations):
        print(f"Iteration {iteration}: Starting with samples shape: {samples.shape}")
        # Evaluate current samples
        current_results = evaluate_samples(samples, objective_function)
        print(f"Iteration {iteration}: Current results: {current_results}")
        # Negate maximization objectives before selection so that lower values
        # are uniformly better when computing performance
        for i in range(len(current_results)):
            for idx in maximize_indices:
                if not np.isnan(current_results[i][idx]):
                    current_results[i][idx] = -current_results[i][idx]
        # Select informative instances with adaptive threshold
        selected_samples = adaptive_select_informative_instances(samples, current_results, initial_threshold, adapt_rate)
        # Ensure at least a minimum number of samples are selected to maintain diversity
        if len(selected_samples) < 3:
            selected_samples = samples[np.argsort(np.nanmean(current_results, axis=1))[:3]]
        # Re-cluster the selected samples (at least 1 cluster, at most 2)
        n_clusters = max(1, min(2, int(len(selected_samples) * 0.2)))
        labels, centers = advanced_clustering_samples(selected_samples, n_clusters)
        # Generate new samples around cluster centers
        new_samples = []
        for center in centers:
            for _ in range(max(1, (len(samples) - len(selected_samples)))):  # Control the number of new samples
                perturbations = np.random.uniform(-0.03, 0.03, center.shape)  # Small perturbations for finer adjustments
                new_samples.append(center + perturbations)
        # Combine selected samples with new samples, capping at the original sample count
        combined_samples = np.vstack((selected_samples, new_samples))
        samples = combined_samples[:len(samples)]
        print(f"Iteration {iteration}: Samples shape after selection and new sample generation: {samples.shape}")
    return samples
def generate_new_samples(existing_samples, pop_size, n_adaptive_samples):
    n_new_samples = pop_size - n_adaptive_samples
    new_samples = initial_sampling(PARAM_BOUNDS, n_new_samples)
    combined_samples = np.vstack((existing_samples, new_samples))
    return combined_samples
\ No newline at end of file
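
For orientation, here is a minimal usage sketch of the module above. It is not part of the commit: the objective function, bounds, and all values are hypothetical, chosen only to mirror the PARAM_BOUNDS structure the code expects.

# Hypothetical usage sketch -- toy objective and illustrative values only
import numpy as np
from adaptive_instance_selection import initial_sampling, evaluate_samples, iterative_refinement

param_bounds = {
    "activeCores": {"bounds": [1, 4], "type": "int"},
    "cpuFrequency": {"bounds": [1.0, 3.0], "type": "float"},
}

def toy_objective(sample):
    cores, freq = sample
    # Objective 0 is minimized as-is; objective 1 (index 1) is to be
    # maximized, which iterative_refinement handles via maximize_indices
    return [cores * freq ** 2, cores * freq]

samples = initial_sampling(param_bounds, n_samples=10)
results = evaluate_samples(samples, toy_objective)
refined = iterative_refinement(samples, results, toy_objective, maximize_indices=[1])
print(refined.shape)  # same number of rows as the initial samples
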
{
-  "MODEL_NAME": "SimpleHeatingSystem",
-  "MODEL_FILE": "SimpleHeatingSystem.mo",
-  "SIMULATION_STOP_TIME": 3000,
+  "MODEL_NAME": "ITSystem",
+  "MODEL_FILE": "ITSystem.mo",
+  "SIMULATION_STOP_TIME": 100,
   "PRECISION": 2,
-  "PARAMETERS": ["Q_max", "T_set"],
+  "PARAMETERS": ["activeCores", "cpuFrequency"],
   "OBJECTIVES": [
-    {"name": "energy", "maximize": false},
-    {"name": "comfort", "maximize": true}
+    {"name": "remainingEnergy", "maximize": true},
+    {"name": "performance", "maximize": true}
   ],
   "PARAM_BOUNDS": {
-    "Q_max": {
-      "bounds": [1000, 5000],
+    "activeCores": {
+      "bounds": [1, 4],
       "type": "int"
     },
-    "T_set": {
-      "bounds": [280, 310],
-      "type": "int"
+    "cpuFrequency": {
+      "bounds": [1.0, 3.0],
+      "type": "float"
     }
   },
   "OPTIMIZATION_CONFIG": {
     "USE_ADAPTIVE_INSTANCE_SELECTION": true,
     "ADAPTIVE_INSTANCE_SELECTION_FREQUENCY": 5,
-    "ALGORITHM_NAME": "NSGA2",
-    "POP_SIZE": 10,
-    "MIN_POP_SIZE": 1,
-    "N_GEN": 10
+    "USE_SINGLE_OBJECTIVE": false,
+    "ALGORITHM_NAME": "nsga2",
+    "POP_SIZE": 5,
+    "N_GEN": 1
   },
   "PLOT_CONFIG": {
     "PLOT_X": "Energy Consumption",
     "PLOT_Y": "Comfort",
-    "PLOT_TITLE": "Pareto Front of Energy Consumption vs Comfort"
+    "PLOT_TITLE": "Pareto Front of Energy Consumption vs Comfort",
+    "ENABLE_PLOT": false
   },
   "N_JOBS": -1
 }
\ No newline at end of file
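
The simulation code in the hunks below imports module-level constants from a config module. The following is a plausible sketch of how config.py might expose the JSON above under those names; the file name "config.json" and the derived fields (OBJECTIVE_NAMES, PARAM_TYPES, MODEL_PATH) are assumptions, not shown in this commit.

# Hypothetical sketch of config.py -- file name and derived fields assumed
import json

with open("config.json") as f:
    _cfg = json.load(f)

MODEL_NAME = _cfg["MODEL_NAME"]
MODEL_FILE = _cfg["MODEL_FILE"]
SIMULATION_STOP_TIME = _cfg["SIMULATION_STOP_TIME"]
PRECISION = _cfg["PRECISION"]
PARAMETERS = _cfg["PARAMETERS"]
OBJECTIVES = _cfg["OBJECTIVES"]
OBJECTIVE_NAMES = [obj["name"] for obj in OBJECTIVES]   # derived, assumed
PARAM_BOUNDS = _cfg["PARAM_BOUNDS"]
PARAM_TYPES = {name: info["type"] for name, info in PARAM_BOUNDS.items()}  # derived, assumed
OPTIMIZATION_CONFIG = _cfg["OPTIMIZATION_CONFIG"]
PLOT_CONFIG = _cfg["PLOT_CONFIG"]
N_JOBS = _cfg["N_JOBS"]
MODEL_PATH = _cfg.get("MODEL_PATH", ".")  # assumed; not present in the JSON above
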
@@ -24,6 +24,9 @@ def initialize_algorithm(algorithm_name, pop_size=None):
         if isinstance(attribute, type):  # Check if it's a class
             algorithm_modules[attribute_name.lower()] = attribute

+    # Print the available algorithms for debugging
+    # print("Available algorithms:", list(algorithm_modules.keys()))
+
     # Check if the algorithm name is in the imported modules
     if algorithm_name.lower() not in algorithm_modules:
         raise ValueError(f"Algorithm {algorithm_name} is not supported.")
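The hunk above looks algorithm classes up by lower-cased name, which is why the config can say "nsga2" rather than "NSGA2", and why a single-objective algorithm can be selected the same way. Below is a hypothetical sketch of that discovery pattern, assuming the classes come from pymoo; the module list and constructor call are assumptions, not the commit's actual code.

# Hypothetical sketch -- pymoo module choice and GA fallback are assumed
from pymoo.algorithms.moo import nsga2
from pymoo.algorithms.soo.nonconvex import ga

def initialize_algorithm(algorithm_name, pop_size=None):
    algorithm_modules = {}
    for module in (nsga2, ga):
        for attribute_name in dir(module):
            attribute = getattr(module, attribute_name)
            if isinstance(attribute, type):  # Check if it's a class
                algorithm_modules[attribute_name.lower()] = attribute
    if algorithm_name.lower() not in algorithm_modules:
        raise ValueError(f"Algorithm {algorithm_name} is not supported.")
    cls = algorithm_modules[algorithm_name.lower()]
    return cls(pop_size=pop_size) if pop_size else cls()
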
@@ -14,8 +14,7 @@ import numpy as np
 from time import sleep
 from joblib import Parallel, delayed
 from OMPython import OMCSessionZMQ
-from config import MODEL_FILE, MODEL_NAME, SIMULATION_STOP_TIME, PARAMETERS, OBJECTIVE_NAMES, PARAM_BOUNDS, PARAM_TYPES, MODEL_PATH, PRECISION, OPTIMIZATION_CONFIG, N_JOBS
-from adaptive_instance_selection import initial_sampling, evaluate_samples, advanced_clustering_samples, adaptive_select_informative_instances, iterative_refinement
+from config import MODEL_FILE, MODEL_NAME, SIMULATION_STOP_TIME, PARAMETERS, OBJECTIVE_NAMES, PARAM_BOUNDS, PARAM_TYPES, MODEL_PATH, PRECISION, N_JOBS

 temp_dirs = []  # List to store paths of temporary directories
@@ -98,7 +97,7 @@ def optimization_function(param_values, retries=3, delay=2):
             shutdown_omc(omc)

     # If all attempts fail, return NaNs
-    return [np.nan] * len(OBJECTIVES)
+    return [np.nan] * len(OBJECTIVE_NAMES)

 def shutdown_omc(omc):
     try:
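
The signature optimization_function(param_values, retries=3, delay=2), together with the sleep import and the NaN fallback above, implies a retry loop around the OpenModelica session. A hedged reconstruction of that pattern follows; run_simulation is an assumed helper standing in for the elided body of the function.

# Hypothetical sketch of the implied retry pattern -- run_simulation is assumed
def optimization_function(param_values, retries=3, delay=2):
    for attempt in range(retries):
        omc = None
        try:
            omc = OMCSessionZMQ()  # fresh OpenModelica session per attempt
            return run_simulation(omc, param_values)  # assumed helper
        except Exception as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            sleep(delay)  # back off before retrying
        finally:
            if omc is not None:
                shutdown_omc(omc)
    # If all attempts fail, return NaNs
    return [np.nan] * len(OBJECTIVE_NAMES)
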
@@ -125,28 +124,7 @@ def cleanup_temp_dirs():
             print(f"Error: {e}")
             break  # Exit the loop for non-permission errors

-def execute_parallel_tasks(tasks, use_adaptive_instance_selection, maximize_indices):
-    results = []
-    if use_adaptive_instance_selection:
-        # Initial sampling
-        initial_samples = initial_sampling(PARAM_BOUNDS, OPTIMIZATION_CONFIG['POP_SIZE'])
-        # Parallel evaluation of initial samples
-        initial_results = Parallel(n_jobs=N_JOBS)(delayed(optimization_function)(sample) for sample in initial_samples)
-        # Iterative refinement
-        refined_samples = iterative_refinement(initial_samples, initial_results, optimization_function, maximize_indices)
-        # Parallel evaluation of refined samples
-        refined_results = Parallel(n_jobs=N_JOBS)(delayed(optimization_function)(task) for task in refined_samples)
-        # Combine initial and refined results, ensuring the number matches the initial parameter sets
-        results = initial_results + refined_results
-        # Ensure only the first `len(tasks)` results are considered
-        results = results[:len(tasks)]
-    else:
-        results = Parallel(n_jobs=N_JOBS)(delayed(optimization_function)(task) for task in tasks)
+def execute_parallel_tasks(tasks):
+    results = Parallel(n_jobs=N_JOBS)(delayed(optimization_function)(task) for task in tasks)
     # Ensure results length matches tasks length by handling exceptions
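
The hunk ends at the comment about keeping results and tasks the same length; the remainder is collapsed in this view. One plausible, purely illustrative realization of that comment:

# Hypothetical continuation -- one way to keep one result row per task
def _pad_results(results, tasks, n_objectives):
    # Replace failed or missing evaluations with NaN rows so downstream
    # arrays keep exactly one row per task
    padded = [r if r is not None else [float("nan")] * n_objectives for r in results]
    padded += [[float("nan")] * n_objectives] * (len(tasks) - len(padded))
    return padded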