Skip to content
Snippets Groups Projects
Commit 0c64a379 authored by Zizhe Wang's avatar Zizhe Wang
Browse files

feat, fix, refactor: abstract and generalize the orchestration process

parent bfe4aeab
No related branches found
No related tags found
No related merge requests found
# Copyright (c) 2024 - Zizhe Wang
# https://zizhe.wang
import json
import os
import numpy as np
import re
import shutil
import tempfile
from OMPython import OMCSessionZMQ
from edge_cloud_wrapper import MOO4ModelicaWrapper
# Configuration parameters
model_path = 'ITSystem.mo'
model_name = 'ITSystem'
simulation_time = 100
data_file = 'energy_available_and_user_demand_data.txt'
start_hour = 8
end_hour = 13
def init_omc_session(temp_dir):
omc = OMCSessionZMQ()
omc.sendExpression(f'cd("{temp_dir}")')
omc.sendExpression(f'loadFile("{model_path}")')
return omc
def build_model(omc):
build_result = omc.sendExpression(f'buildModel({model_name})')
if 'error' in build_result:
raise Exception(f"Model build failed: {build_result}")
def simulate_and_evaluate(parameters, simulation_time, available_energy, user_demand):
# Create a temporary directory to avoid GUID mismatch issues
temp_dir = tempfile.mkdtemp()
try:
omc = init_omc_session(temp_dir)
build_model(omc) # Ensure the model is built
if parameters is None:
raise ValueError("Parameters must not be None")
for param, value in parameters.items():
response = omc.sendExpression(f'setParameterValue({model_name}, {param}, {value})')
print(f"Set parameter {param} to {value}: {response}")
# Set the additional parameters availableEnergy and userDemand
omc.sendExpression(f'setParameterValue({model_name}, availableEnergy, {available_energy})')
omc.sendExpression(f'setParameterValue({model_name}, userDemand, {user_demand})')
result = omc.sendExpression(f'simulate({model_name}, stopTime={simulation_time})')
termination_message = result.get('messages', "")
print("Termination message:", termination_message)
match = re.search(r'Simulation call terminate\(\) at time ([\d\.]+)', termination_message)
if match:
depletion_time = float(match.group(1))
print(f"The IT system can run for {depletion_time} seconds until the remaining energy is depleted.")
else:
depletion_time = simulation_time
print(f"The IT system ran until the end of the simulation time ({simulation_time} seconds).")
performance = omc.sendExpression(f"val(performance, {depletion_time})")
print(f"Performance: {performance}")
remaining_energy = omc.sendExpression(f"val(remainingEnergy, {depletion_time})")
print(f"Remaining energy at time {depletion_time}: {remaining_energy}")
energy_consumption = omc.sendExpression(f"val(energyConsumption, {depletion_time})")
print(f"Energy Consumption: {energy_consumption}")
user_demand_satisfied = False
if performance and performance >= user_demand:
print("User demand satisfied.")
user_demand_satisfied = True
else:
print("User demand not satisfied.")
return remaining_energy, user_demand_satisfied, parameters, depletion_time
finally:
shutil.rmtree(temp_dir) # Clean up the temporary directory
def read_data_from_file(filename):
data = []
with open(filename, 'r') as file:
for line in file:
time_str, energy, demand = line.split()
hour = int(time_str.split(':')[0])
data.append((hour, int(energy), int(demand)))
return data
def adaptive_control_loop(data, moo_wrapper):
report = []
for hour, energy_available, user_demand in data:
if start_hour <= hour < end_hour:
print(f"\nProcessing hour: {hour}")
# Update the config for the optimization framework
moo_wrapper.update_config(energy_available, user_demand, simulation_time)
# Run the optimization using the MOO4Modelica wrapper
moo_wrapper.run_optimization()
# Get the list of best parameters from the optimization results
parameter_list = moo_wrapper.get_parameters()
# Try each parameter set in order until user demand is satisfied or options are exhausted
user_demand_satisfied = False
for best_parameters in parameter_list:
remaining_energy, user_demand_satisfied, parameters, depletion_time = simulate_and_evaluate(best_parameters, simulation_time, energy_available, user_demand)
if user_demand_satisfied:
break
# Generate the report for each hour
if user_demand_satisfied:
report.append(f"Hour {hour}: User demand satisfied with configuration {parameters}. Simulation ran for {depletion_time} seconds out of {simulation_time} seconds.")
else:
report.append(f"Hour {hour}: No sufficient configuration found. Simulation ran for {depletion_time} seconds out of {simulation_time} seconds.")
# Print the final report
print("\nFinal Report:")
for line in report:
print(line)
if __name__ == "__main__":
data = read_data_from_file(data_file)
moo_wrapper = MOO4ModelicaWrapper(config_path='config.json')
adaptive_control_loop(data, moo_wrapper)
\ No newline at end of file
{
"DATA_FILE_PATH": "energy_available_and_user_demand_data.txt",
"CONFIG_PATH": "config.json",
"MODEL_FILE": "ITSystem.mo",
"MODEL_NAME": "ITSystem",
"SIMULATION_TIME": 100,
"START_TIME": 8,
"END_TIME": 13,
"TIME_UNIT": "hour",
"OPTIMIZATION_PARAMETERS": {
"activeCores": {
"bounds": [1, 4],
"type": "int"
},
"cpuFrequency": {
"bounds": [1.0, 3.0],
"type": "float"
}
},
"EVALUATION_PARAMETERS": {
"performance": "performance",
"remaining_energy": "remainingEnergy",
"energy_consumption": "energyConsumption"
},
"INPUT_PARAMETERS": {
"available_energy": "availableEnergy",
"user_demand": "userDemand"
},
"CRITERIA": {
"GOAL_EXPRESSION": "evaluation_results['performance'] >= simulation_inputs['user_demand']"
}
}
\ No newline at end of file
# Copyright (c) 2024 - Zizhe Wang
# https://zizhe.wang
import json
import os
import re
import shutil
import tempfile
from OMPython import OMCSessionZMQ
def init_omc_session(temp_dir, model_file):
omc = OMCSessionZMQ()
omc.sendExpression(f'cd("{temp_dir}")')
load_result = omc.sendExpression(f'loadFile("{model_file}")')
return omc
def build_model(omc, model_name):
build_result = omc.sendExpression(f'buildModel({model_name})')
return build_result
def simulate_and_evaluate(parameters, simulation_time, simulation_inputs, global_config):
# General setup and configuration
model_file = global_config['MODEL_FILE']
model_name = global_config['MODEL_NAME']
param_types = {param: details['type'] for param, details in global_config['OPTIMIZATION_PARAMETERS'].items()}
# Create a temporary directory to avoid GUID mismatch issues
temp_dir = tempfile.mkdtemp()
evaluation_parameters = global_config['EVALUATION_PARAMETERS']
input_parameters = global_config['INPUT_PARAMETERS']
goal_expression = global_config['CRITERIA']['GOAL_EXPRESSION']
try:
omc = init_omc_session(temp_dir, model_file)
build_result = build_model(omc, model_name)
print(f"Model build result: {build_result}")
# Set model parameters
for param, value in parameters.items():
param_type = param_types[param]
if param_type == "int":
value = int(value)
response = omc.sendExpression(f'setParameterValue({model_name}, {param}, {value})')
print(f"Set parameter {param} to {value}: {response}")
# Set input parameters
for input_param, value in simulation_inputs.items():
omc.sendExpression(f'setParameterValue({model_name}, {input_parameters[input_param]}, {value})')
# Run simulation
result = omc.sendExpression(f'simulate({model_name}, stopTime={simulation_time})')
termination_message = result.get('messages', "")
# Determine depletion time if simulation terminates early
match = re.search(r'Simulation call terminate\(\) at time ([\d\.]+)', termination_message)
depletion_time = float(match.group(1)) if match else simulation_time
# Collect evaluation results
evaluation_results = {}
for criterion, expression in evaluation_parameters.items():
result_value = omc.sendExpression(f"val({expression}, {depletion_time})")
evaluation_results[criterion] = result_value
print(f"{criterion}: {result_value}")
# Evaluate the goal expression dynamically
goal_satisfied = eval(goal_expression)
print(f"Goal satisfied: {goal_satisfied}")
return evaluation_results, goal_satisfied, parameters, depletion_time
finally:
shutil.rmtree(temp_dir) # Clean up the temporary directory
def read_data_from_file(filename):
data = []
with open(filename, 'r') as file:
for line in file:
time_str, *values = line.split()
time_value = int(time_str.split(':')[0]) # Extract time value from the string
data.append((time_value, *map(int, values))) # Combine time value with other data
return data
def adaptive_control_loop(data, moo_wrapper, global_config):
start_time = global_config['START_TIME']
end_time = global_config['END_TIME']
simulation_time = global_config['SIMULATION_TIME']
input_keys = list(global_config['INPUT_PARAMETERS'].keys()) # List of input parameter keys
report = []
for entry in data:
time_value, *input_values = entry
if start_time <= time_value < end_time:
print(f"Processing {global_config['TIME_UNIT']} {time_value}")
# Create a dictionary of inputs
simulation_inputs = dict(zip(input_keys, input_values))
# Update the config for the optimization framework
moo_wrapper.update_config(simulation_inputs, simulation_time)
# Run the optimization using the wrapper
moo_wrapper.run_optimization()
# Get the list of best parameters from the optimization results
parameter_list = moo_wrapper.get_parameters()
# Try each parameter set in order until goal is satisfied or options are exhausted
goal_satisfied = False
for best_parameters in parameter_list:
try:
evaluation_results, goal_satisfied, parameters, depletion_time = simulate_and_evaluate(
best_parameters, simulation_time, simulation_inputs, global_config)
except Exception as e:
print(f"Simulation error: {e}")
continue
if goal_satisfied:
break
# Generate the report for each hour
if goal_satisfied:
report.append(f"{global_config['TIME_UNIT'].capitalize()} {time_value}: Goal satisfid with configuration {parameters}. Simulation ran for {depletion_time} seconds out of {simulation_time} seconds.")
else:
report.append(f"{global_config['TIME_UNIT'].capitalize()} {time_value}: No sufficient configuration found. Simulation ran for {depletion_time} seconds out of {simulation_time} seconds.")
# Print the final report
print("\nFinal Report:")
for line in report:
print(line)
if __name__ == "__main__":
# Load the orchestration configuration from orchestration_config.json
with open(orchestration_config_path, 'r') as f:
orchestration_config = json.load(f)
# Read data from the specified data file
data = read_data_from_file(orchestration_config['DATA_FILE_PATH'])
# Initialize the MOO4ModelicaWrapper with the path to the MOO4Modelica config and the orchestration configuration
moo_wrapper = MOO4ModelicaWrapper(config_path=orchestration_config['CONFIG_PATH'], global_config=orchestration_config)
# Run the adaptive control loop with the loaded data and configuration
adaptive_control_loop(data, moo_wrapper, orchestration_config)
\ No newline at end of file
......@@ -6,32 +6,24 @@ import os
from optimize_main import run_optimization
class MOO4ModelicaWrapper:
def __init__(self, config_path):
def __init__(self, config_path, global_config):
self.config_path = config_path
self.global_config = global_config
self.load_config()
self.model_path = self.config['MODEL_FILE']
self.model_name = self.config['MODEL_NAME']
self.model_file = global_config['MODEL_FILE']
self.model_name = global_config['MODEL_NAME']
def load_config(self):
with open(self.config_path, 'r') as f:
self.config = json.load(f)
def update_config(self, available_energy, user_demand, simulation_time):
def update_config(self, simulation_inputs, simulation_time):
# Modify the configuration with new parameters
self.config['PARAMETERS'] = ["activeCores", "cpuFrequency"]
self.config['PARAM_BOUNDS'] = {
"activeCores": {
"bounds": [1, 4],
"type": "int"
},
"cpuFrequency": {
"bounds": [1.0, 3.0],
"type": "float"
}
}
self.config['PARAMETERS'] = list(self.global_config['OPTIMIZATION_PARAMETERS'].keys())
self.config['PARAM_BOUNDS'] = self.global_config['OPTIMIZATION_PARAMETERS']
self.config['SIMULATION_STOP_TIME'] = simulation_time
self.config['available_energy'] = available_energy
self.config['user_demand'] = user_demand
for input_param, value in simulation_inputs.items():
self.config[self.global_config['INPUT_PARAMETERS'][input_param]] = value
# Save the updated configuration to a file
with open(self.config_path, 'w') as f:
......@@ -50,9 +42,6 @@ class MOO4ModelicaWrapper:
parameters = optimization_results["parameters"]
return [
{
"activeCores": int(params[0]),
"cpuFrequency": float(params[1])
}
{param: value for param, value in zip(self.config['PARAMETERS'], params)}
for params in parameters
]
\ No newline at end of file
......@@ -4,17 +4,29 @@
import json
import os
from optimize_main import run_optimization
from edge_cloud_wrapper import MOO4ModelicaWrapper
from edge_cloud_configurator import adaptive_control_loop, read_data_from_file
from orchestration_wrapper import MOO4ModelicaWrapper
from orchestration_configurator import adaptive_control_loop, read_data_from_file
base_path = os.path.dirname(__file__)
data_file_path = os.path.join(base_path, 'energy_available_and_user_demand_data.txt')
config_path = os.path.join(base_path, 'config.json')
orchestration_config_path = os.path.join(base_path, 'orchestration_config.json')
def main():
# Load the orchestration configuration
with open(orchestration_config_path, 'r') as f:
global_config = json.load(f)
# Get paths from the orchestration configuration
data_file_path = os.path.join(base_path, global_config['DATA_FILE_PATH'])
config_path = os.path.join(base_path, global_config['CONFIG_PATH'])
# Read data from the specified data file
data = read_data_from_file(data_file_path)
moo_wrapper = MOO4ModelicaWrapper(config_path=config_path)
adaptive_control_loop(data, moo_wrapper)
# Initialize the MOO4ModelicaWrapper with the path to the MOO4Modelica config and the orchestration configuration
moo_wrapper = MOO4ModelicaWrapper(config_path=config_path, global_config=global_config)
# Run the adaptive control loop with the loaded data and configuration
adaptive_control_loop(data, moo_wrapper, global_config)
if __name__ == "__main__":
main()
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment