Skip to content
Snippets Groups Projects
Commit 65a8de7b authored by Zizhe Wang's avatar Zizhe Wang
Browse files

clean old scripts

parent ef2d1923
No related branches found
No related tags found
No related merge requests found
This is the one-script solution.
This one-script Python file has been developed first, because of better separate of concern and more readable code structure this script has been further developed into the following four scripts:
* config.py
* optimization_libraries.py
* optimize_main.py
* parallel_computing.py
\ No newline at end of file
# (c) Zizhe Wang
# https://zizhe.wang
model SimpleHeatingSystem
parameter Real Q_max = 5000 "Maximum heating power in watts";
parameter Real T_set = 293.15 "Setpoint temperature in Kelvin";
parameter Real efficiency = 0.9 "Heater efficiency";
parameter Real hysteresis = 0.5 "Hysteresis for temperature control";
parameter Real T_comfort = 293.15 "Comfortable room temperature in Kelvin";
Real Q_heat "Heating power";
Real T_room(start = 278.15) "Room temperature";
Real cost(start=0) "Cost of heating";
Real energy(start=0) "Energy consumed";
Real comfort(start=0) "Thermal comfort";
Boolean heater_on(start=false) "Heater status";
constant Real c_p = 1005 "Specific heat capacity of air (J/(kg·K))";
constant Real m_air = 50 "Mass of air in the room (kg)";
Real accumulated_discomfort(start=0) "Accumulated thermal discomfort";
equation
// Room temperature dynamics
der(T_room) = (Q_heat * efficiency - (T_room - 273.15)) / (m_air * c_p);
// Heater control logic with hysteresis
heater_on = if T_room < T_set - hysteresis then true else if T_room > T_set + hysteresis then false else pre(heater_on);
Q_heat = if heater_on then Q_max else 0;
// Calculations for energy and cost
der(energy) = Q_heat * efficiency;
der(cost) = 0.1 * Q_heat * efficiency;
// Calculations for accumulated discomfort (thermal discomfort over time)
der(accumulated_discomfort) = abs(T_room - T_comfort);
// Comfort calculation related to energy consumption
der(comfort) = if heater_on then efficiency * Q_heat / (accumulated_discomfort + 1) else 0;
end SimpleHeatingSystem;
# (c) Zizhe Wang
# https://zizhe.wang
######################################
# #
# OPTIMIZATION USING ONLY ONE SCRIPT #
# #
######################################
import os
import tempfile
import shutil
import numpy as np
from time import sleep
from OMPython import OMCSessionZMQ
from pymoo.algorithms.moo.nsga2 import NSGA2
from pymoo.algorithms.moo.nsga3 import NSGA3
from pymoo.algorithms.soo.nonconvex.cmaes import CMAES
from pymoo.optimize import minimize
from pymoo.core.problem import Problem
from scipy.optimize import differential_evolution, minimize as scipy_minimize
from joblib import Parallel, delayed # parallel processing
import matplotlib.pyplot as plt
"""
============================== GLOBAL SETTINGS ==============================
"""
# ====================================================
# !!! Be sure to only edit your setups in global settings!!!
# !!! Don't modify the code outside of it !!!
# ====================================================
# Simulation time and model path
SIMULATION_STOP_TIME = 2000 # in seconds
MODEL_NAME = "SimpleHeatingSystem"
MODEL_FILE = f"{MODEL_NAME}.mo"
current_directory = os.getcwd()
model_path = os.path.join(current_directory, MODEL_FILE)
# Parameters and result variables
PARAMETERS = ["Q_max", "T_set"]
RESULTS = ["energy", "cost", "comfort"]
# Parameter bounds
PARAM_BOUNDS = {
"Q_max": (1000, 5000),
"T_set": (280, 310),
}
# Optimization settings
POP_SIZE = 5 # Population size for NSGA2
N_GEN = 5 # Number of generations
# Algorithm selection
# Options: 'pymoo.NSGA2', 'pymoo.NSGA3', 'pymoo.CMAES', 'scipy.de', 'scipy.minimize'
OPTIMIZATION_LIBRARY = 'pymoo'
ALGORITHM_NAME = 'NSGA2'
def initialize_algorithm(): # Factory Function for Algorithm Initialization
if OPTIMIZATION_LIBRARY == 'pymoo':
if ALGORITHM_NAME == 'NSGA2':
return NSGA2(pop_size=POP_SIZE)
elif ALGORITHM_NAME == 'NSGA3':
return NSGA3(pop_size=POP_SIZE)
elif ALGORITHM_NAME == 'CMAES':
return CMAES(pop_size=POP_SIZE)
else:
raise ValueError(f"Unsupported algorithm: {ALGORITHM_NAME}")
elif OPTIMIZATION_LIBRARY == 'scipy':
if ALGORITHM_NAME == 'de':
return differential_evolution
elif ALGORITHM_NAME == 'minimize':
return scipy_minimize
else:
raise ValueError(f"Unsupported algorithm: {ALGORITHM_NAME}")
else:
raise ValueError(f"Unsupported optimization library: {OPTIMIZATION_LIBRARY}")
# Parallel processing
N_JOBS = -1 # Options: '-1', '1', 'n', 'None'
# ====================================================================
# -1 = use all available CPU cores
# 1 = disables parallel processing and runs the tasks sequentially
# n = specifies the exact number of CPU cores to use
# None = will run the tasks sequentially, equivalent to '1'
# ====================================================================
"""
=========================== END OF GLOBAL SETTINGS ===========================
"""
"""
============================== Pre-compilation Block==============================
"""
# """
# Pre-compile the Modelica model to ensure it can be loaded and built correctly.
# This helps to catch any errors early and verify the model setup.
# Uncomment to use this block if you need
# """
# # Pre-compile the model
# def precompile_model():
# try:
# omc = OMCSessionZMQ()
# load_model_result = omc.sendExpression(f"loadFile(\"{model_path}\")")
# print(f"Load model result: {load_model_result}")
# build_model_result = omc.sendExpression(f"buildModel({MODEL_NAME}.mo)")
# print(f"Build model result: {build_model_result}")
# if not build_model_result:
# raise RuntimeError(f"Failed to build model: {MODEL_NAME}.mo")
# return True
# except Exception as e:
# print(f"Error during model pre-compilation: {e}")
# return False
# # Pre-compile the model before running optimization
# if not precompile_model():
# raise RuntimeError("Model pre-compilation failed. Aborting optimization.")
"""
=========================== End of Pre-compilation Block ===========================
"""
temp_dirs = [] # List to store paths of temporary directories
# Define the optimization function
def optimization_function(param_values):
"""
Function to optimize the Modelica model given specific parameter values.
This function runs the simulation and retrieves the results.
"""
temp_dir = tempfile.mkdtemp() # Create a unique temporary directory for each worker
temp_dirs.append(temp_dir) # Store the path for later cleanup
try:
omc = OMCSessionZMQ() # Create a new OpenModelica session
temp_dir = temp_dir.replace('\\', '/')
omc.sendExpression(f'cd("{temp_dir}")')
# Copy model file to temporary directory
temp_model_path = os.path.join(temp_dir, MODEL_FILE).replace('\\', '/')
shutil.copy(model_path, temp_model_path)
load_temp_model_result = omc.sendExpression(f"loadFile(\"{temp_model_path}\")")
print(f"Load model result in worker: {load_temp_model_result}")
if load_temp_model_result is not True:
messages = omc.sendExpression("getErrorString()")
print(f"OpenModelica error messages: {messages}")
raise RuntimeError(f"Failed to load model: {MODEL_NAME}")
# Build the model in the worker session
build_model_result = omc.sendExpression(f"buildModel({MODEL_NAME})")
print(f"Build model result in worker: {build_model_result}")
if not (isinstance(build_model_result, tuple) and build_model_result[0]):
messages = omc.sendExpression("getErrorString()")
print(f"OpenModelica error messages: {messages}")
raise RuntimeError(f"Failed to build model: {MODEL_NAME}")
# Check if model files are created
expected_files = [
os.path.join(temp_dir, f"{MODEL_NAME}.exe").replace('\\', '/'),
os.path.join(temp_dir, f"{MODEL_NAME}_init.xml").replace('\\', '/')
]
for file in expected_files:
if not os.path.isfile(file):
raise RuntimeError(f"Expected file not found: {file}")
# Set model parameters
rounded_param_values = {param: round(value, 2) for param, value in param_values.items()} # two decimal places
for param, value in rounded_param_values.items():
set_param_result = omc.sendExpression(f"setParameterValue({MODEL_NAME}, {param}, {value})")
if not set_param_result:
raise RuntimeError(f"Failed to set model parameter: {param} = {value}")
print(f"Parameters set: {rounded_param_values}")
# Simulate the model
simulate_result = omc.sendExpression(f"simulate({MODEL_NAME}, stopTime={SIMULATION_STOP_TIME})")
print(f"Simulate result: {simulate_result}")
# Retrieve simulation results
result_values = {}
for result in RESULTS:
value_command = f"val({result}, {SIMULATION_STOP_TIME})"
print(f"Executing command: {value_command}")
value = omc.sendExpression(value_command)
print(f"Value for {result} at {SIMULATION_STOP_TIME}: {value}")
# Round the result to 2 decimal places
result_values[result] = round(value, 2)
if value is None:
raise ValueError(f"Simulation result returned None for {result}")
if not isinstance(value, (float, int)):
raise ValueError(f"Simulation result for {result} is not in expected format (float or int)")
print(f"Simulation results: {result_values}")
return list(result_values.values())
except Exception as e:
print(f"Error during simulation: {e}")
return [np.nan] * len(RESULTS)
# Define the optimization problem class for 'pymoo'
class OptimizationProblem(Problem):
def __init__(self):
self.param_names = list(PARAM_BOUNDS.keys())
n_var = len(self.param_names)
xl = np.array([PARAM_BOUNDS[param][0] for param in self.param_names])
xu = np.array([PARAM_BOUNDS[param][1] for param in self.param_names])
# Debugging statements to check dimensions and bounds
print(f"Number of variables: {n_var}")
print(f"Lower bounds: {xl}")
print(f"Upper bounds: {xu}")
super().__init__(n_var=n_var, n_obj=len(RESULTS), n_constr=0, xl=xl, xu=xu)
def _evaluate(self, X, out, *args, **kwargs):
param_values_list = [dict(zip(self.param_names, x)) for x in X]
results = Parallel(n_jobs=N_JOBS)(delayed(optimization_function)(param_values) for param_values in param_values_list)
out["F"] = np.array(results)
# Define the objective function for 'SciPy' directly
def scipy_objective_function(x):
param_values = dict(zip(PARAMETERS, x))
result_values = optimization_function(param_values)
return [result_values[RESULTS.index(res)] for res in RESULTS]
# Initialize the optimization algorithm
algorithm = initialize_algorithm()
# Define the global optimization problem and algorithm
problem = OptimizationProblem()
if OPTIMIZATION_LIBRARY == 'pymoo':
res = minimize(problem, algorithm, ("n_gen", N_GEN), verbose=True)
elif OPTIMIZATION_LIBRARY == 'scipy':
bounds = [(PARAM_BOUNDS[param][0], PARAM_BOUNDS[param][1]) for param in PARAMETERS]
if ALGORITHM_NAME == 'de':
result = algorithm(scipy_objective_function, bounds=bounds, strategy='best1bin', maxiter=N_GEN, popsize=POP_SIZE)
elif ALGORITHM_NAME == 'minimize':
x0 = np.mean(bounds, axis=1) # Initial guess: midpoint of bounds
result = algorithm(scipy_objective_function, x0=x0, bounds=bounds, method='L-BFGS-B', options={'maxiter': N_GEN})
# Extract results for plotting (assuming DE algorithm results for now)
res = {
'F': np.array([scipy_objective_function(ind) for ind in result.x])
}
# Cleanup temporary directories
for temp_dir in temp_dirs:
try:
shutil.rmtree(temp_dir)
except PermissionError:
print(f"PermissionError: Retrying to remove {temp_dir}")
sleep(2)
try:
shutil.rmtree(temp_dir)
except PermissionError:
print(f"Failed to remove {temp_dir} after multiple attempts")
# Print the results
print("Optimization Results:")
for i, result in enumerate(res.F):
print(f"Solution {i}: ", end="")
for name, value in zip(RESULTS, result):
print(f"{name.capitalize()} = {value:.2f}", end=", ")
print()
# Plot the results for Energy Consumption vs Comfort
plt.figure(figsize=(8, 6))
plt.scatter(res.F[:, 0], res.F[:, 2])
plt.xlabel('Energy Consumption')
plt.ylabel('Comfort')
plt.title('Pareto Front of Energy Consumption vs Comfort')
plt.grid(True)
plt.tight_layout()
plt.show()
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment