# File: edge_cloud_configurator.py
# Copyright (c) 2024 - Zizhe Wang
# https://zizhe.wang
import json
import os
import numpy as np
import re
import shutil
import tempfile
from OMPython import OMCSessionZMQ
from edge_cloud_wrapper import MOO4ModelicaWrapper
# Configuration parameters
# Modelica source file that init_omc_session loads into each OMC session.
model_path = 'ITSystem.mo'
# Model name used in the buildModel / setParameterValue / simulate expressions.
model_name = 'ITSystem'
# Simulation horizon handed to the optimizer config and used as the simulate()
# stopTime by the control loop (log messages report it in seconds).
simulation_time = 100
# Input file parsed by read_data_from_file in the __main__ entry point.
data_file = 'energy_available_and_user_demand_data.txt'
# adaptive_control_loop only processes rows with start_hour <= hour < end_hour.
start_hour = 8
end_hour = 13
def init_omc_session(temp_dir):
    """Start an OMC session that works in *temp_dir* and has the model loaded.

    Args:
        temp_dir: Directory the OMC process is switched into via ``cd``.

    Returns:
        The ``OMCSessionZMQ`` instance after ``cd`` and ``loadFile`` were sent.

    Raises:
        RuntimeError: If OMC reports that the model file could not be loaded.
    """
    # Modelica string literals treat "\" as an escape character, so paths are
    # sent with forward slashes (relevant for Windows temp directories).
    work_dir = str(temp_dir).replace('\\', '/')
    # Resolve the model file against the Python working directory *before*
    # OMC changes directory; a relative path would otherwise be looked up
    # inside the freshly created temporary directory.
    model_file = os.path.abspath(model_path).replace('\\', '/')

    omc = OMCSessionZMQ()
    omc.sendExpression(f'cd("{work_dir}")')
    loaded = omc.sendExpression(f'loadFile("{model_file}")')
    if not loaded:
        # Ask OMC for the reason so the failure is diagnosable at the source
        # instead of surfacing later as an unexplained build/simulation error.
        details = omc.sendExpression('getErrorString()')
        raise RuntimeError(f"Failed to load model file {model_file}: {details}")
    return omc
def build_model(omc):
    """Build the configured model and raise if the build did not succeed.

    Args:
        omc: Session object exposing ``sendExpression``.

    Raises:
        Exception: If the ``buildModel`` result indicates a failure. The
            message starts with ``"Model build failed: "``.
    """
    build_result = omc.sendExpression(f'buildModel({model_name})')

    if isinstance(build_result, str):
        # Raw string reply: keep the original substring check.
        failed = 'error' in build_result
    elif isinstance(build_result, (list, tuple)):
        # A sequence reply is expected to hold the generated file names; an
        # empty sequence or empty first entry means nothing was built.
        # (A plain `'error' in sequence` test only compares whole elements
        # and therefore never detected this case.)
        failed = not build_result or not build_result[0]
    else:
        failed = not build_result

    if failed:
        details = omc.sendExpression('getErrorString()')
        raise Exception(f"Model build failed: {build_result} {details}")
def simulate_and_evaluate(parameters, simulation_time, available_energy, user_demand):
    """Simulate the model with one parameter set and check the user demand.

    Args:
        parameters: Mapping of model parameter name to value; every entry is
            applied with ``setParameterValue`` before simulating.
        simulation_time: Stop time passed to ``simulate``.
        available_energy: Value written to the model's ``availableEnergy``.
        user_demand: Value written to the model's ``userDemand`` and the
            threshold the simulated ``performance`` is compared against.

    Returns:
        A tuple ``(remaining_energy, user_demand_satisfied, parameters,
        depletion_time)``.

    Raises:
        ValueError: If *parameters* is ``None``.
    """
    # Validate first so an invalid call does not pay for a temp directory,
    # an OMC session and a model build.
    if parameters is None:
        raise ValueError("Parameters must not be None")

    # Create a temporary directory to avoid GUID mismatch issues
    temp_dir = tempfile.mkdtemp()
    try:
        omc = init_omc_session(temp_dir)
        build_model(omc)  # Ensure the model is built

        for param, value in parameters.items():
            response = omc.sendExpression(f'setParameterValue({model_name}, {param}, {value})')
            print(f"Set parameter {param} to {value}: {response}")

        # Set the additional parameters availableEnergy and userDemand
        omc.sendExpression(f'setParameterValue({model_name}, availableEnergy, {available_energy})')
        omc.sendExpression(f'setParameterValue({model_name}, userDemand, {user_demand})')

        result = omc.sendExpression(f'simulate({model_name}, stopTime={simulation_time})')
        # Fall back to "" when the key is absent *or* present with None, so
        # the regex search below never receives a non-string.
        termination_message = result.get('messages') or ""
        print("Termination message:", termination_message)

        # An early terminate() in the model shows up in the simulation
        # messages; its timestamp is when the run actually stopped.
        match = re.search(r'Simulation call terminate\(\) at time ([\d\.]+)', termination_message)
        if match:
            depletion_time = float(match.group(1))
            print(f"The IT system can run for {depletion_time} seconds until the remaining energy is depleted.")
        else:
            depletion_time = simulation_time
            print(f"The IT system ran until the end of the simulation time ({simulation_time} seconds).")

        performance = omc.sendExpression(f"val(performance, {depletion_time})")
        print(f"Performance: {performance}")

        remaining_energy = omc.sendExpression(f"val(remainingEnergy, {depletion_time})")
        print(f"Remaining energy at time {depletion_time}: {remaining_energy}")

        energy_consumption = omc.sendExpression(f"val(energyConsumption, {depletion_time})")
        print(f"Energy Consumption: {energy_consumption}")

        # Compare only real numbers: a missing/non-numeric value counts as
        # "not satisfied", while a legitimate 0 is still compared (a plain
        # truthiness test would have discarded it).
        user_demand_satisfied = (
            isinstance(performance, (int, float)) and performance >= user_demand
        )
        if user_demand_satisfied:
            print("User demand satisfied.")
        else:
            print("User demand not satisfied.")

        return remaining_energy, user_demand_satisfied, parameters, depletion_time
    finally:
        shutil.rmtree(temp_dir)  # Clean up the temporary directory
def read_data_from_file(filename):
    """Parse the hourly energy/demand data file.

    Every non-blank line must hold three whitespace-separated fields: a time
    whose part before the first ``:`` is the hour, an integer energy value
    and an integer demand value.

    Args:
        filename: Path of the text file to read.

    Returns:
        A list of ``(hour, energy, demand)`` integer tuples in file order.

    Raises:
        ValueError: If a non-blank line does not have exactly three fields or
            a field is not an integer.
    """
    data = []
    with open(filename, 'r') as file:
        for line in file:
            fields = line.split()
            if not fields:
                # Blank lines (e.g. a trailing newline at the end of the
                # file) carry no data and must not break the unpacking below.
                continue
            time_str, energy, demand = fields
            hour = int(time_str.split(':')[0])
            data.append((hour, int(energy), int(demand)))
    return data
def adaptive_control_loop(data, moo_wrapper):
    """Optimize and simulate each in-window hour, then print a report.

    For every row whose hour satisfies ``start_hour <= hour < end_hour`` the
    optimizer is reconfigured and run, and its candidate parameter sets are
    simulated in order until one satisfies the user demand.

    Args:
        data: Iterable of ``(hour, energy_available, user_demand)`` rows.
        moo_wrapper: Object providing ``update_config``, ``run_optimization``
            and ``get_parameters``.

    Returns:
        The list of report lines that was printed (one per processed hour).
    """
    report = []
    for hour, energy_available, user_demand in data:
        if not (start_hour <= hour < end_hour):
            continue
        print(f"\nProcessing hour: {hour}")

        # Update the config for the optimization framework
        moo_wrapper.update_config(energy_available, user_demand, simulation_time)

        # Run the optimization using the MOO4Modelica wrapper
        moo_wrapper.run_optimization()

        # Get the list of best parameters from the optimization results
        parameter_list = moo_wrapper.get_parameters()
        if parameter_list is None:
            parameter_list = []

        # Reset per hour so an empty candidate list can neither raise a
        # NameError nor leak the previous hour's values into this report line.
        user_demand_satisfied = False
        parameters = None
        depletion_time = None

        # Try each parameter set in order until user demand is satisfied or options are exhausted
        for best_parameters in parameter_list:
            _remaining_energy, user_demand_satisfied, parameters, depletion_time = simulate_and_evaluate(
                best_parameters, simulation_time, energy_available, user_demand
            )
            if user_demand_satisfied:
                break

        # Generate the report for each hour
        if user_demand_satisfied:
            report.append(f"Hour {hour}: User demand satisfied with configuration {parameters}. Simulation ran for {depletion_time} seconds out of {simulation_time} seconds.")
        elif depletion_time is None:
            # No candidate was simulated, so there is no run time to report.
            report.append(f"Hour {hour}: No sufficient configuration found. No candidate configurations were available to simulate.")
        else:
            report.append(f"Hour {hour}: No sufficient configuration found. Simulation ran for {depletion_time} seconds out of {simulation_time} seconds.")

    # Print the final report
    print("\nFinal Report:")
    for line in report:
        print(line)
    return report
if __name__ == "__main__":
    # Script entry point: read the hourly input rows, create the optimizer
    # wrapper from its JSON config, and run the control loop over the rows.
    hourly_rows = read_data_from_file(data_file)
    optimizer = MOO4ModelicaWrapper(config_path='config.json')
    adaptive_control_loop(data=hourly_rows, moo_wrapper=optimizer)