simple-pv-simulator/EnergySystem.py
2024-05-07 20:21:20 +02:00

139 lines
7.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from config import pv_config, ess_config, grid_config
import pandas as pd
class EnergySystem:
    """Step-by-step simulation of a factory supplied by PV, an ESS and the grid.

    The object combines three component configurations (PV, energy storage
    system, grid) and accumulates statistics while ``simulate`` walks over a
    table of time steps.

    Attributes written by ``simulate``:
        day_generated: PV energy accumulated between rows whose time is
            exactly '00:00' (one entry appended per such row).
        generated: PV energy accumulated since the last '00:00' row.
        hour_stored / hour_stored_2: ESS state of charge sampled at rows whose
            time ends with '14:00' / '08:00'.
        afford: set to False once grid + PV + ESS could not cover demand.
        overload_cnt: number of steps in which demand could not be covered.
        unmet: ``(index, time, demand, pv_power)`` tuples for those steps.
        spring_week_gen / spring_week_soc: PV power and state of charge for
            the rows whose index lies in the sampled spring week.
    """

    def __init__(self, pv_type: "pv_config", ess_type: "ess_config", grid_type: "grid_config"):
        """Store the component configurations and reset all statistics."""
        self.pv = pv_type
        self.ess = ess_type
        self.grid = grid_type

        self.day_generated = []
        self.generated = 0
        self.stored = 0
        self.hour_stored = []
        self.hour_stored_2 = []
        self.afford = True
        self.cost = self.get_cost()
        self.overload_cnt = 0

        # Per-season weekly samples; only the spring lists are filled today.
        self.spring_week_gen = []
        self.summer_week_gen = []
        self.autumn_week_gen = []
        self.winter_week_gen = []
        self.spring_week_soc = []
        self.summer_week_soc = []
        self.autumn_week_soc = []
        self.winter_week_soc = []

        # presumably samples per hour (4 -> 15-minute steps) - TODO confirm
        self.granularity = 4
        self.week_length = self.granularity * 24 * 7
        self.season_step = self.week_length * 12
        self.season_start = self.week_length * 2
        self.unmet = []

    def get_cost(self):
        """Return the combined cost of the ESS and the PV installation."""
        return self.ess.get_cost() + self.pv.get_cost()

    def _state_of_charge(self):
        """Return ESS storage / capacity, or 0.0 when the capacity is zero."""
        capacity = self.ess.capacity
        if capacity == 0:
            # A zero-capacity ESS (PV-only setup) has no meaningful ratio;
            # report it as empty instead of raising ZeroDivisionError.
            return 0.0
        return self.ess.storage / capacity

    def simulate(self, data, time_interval):
        """Run the dispatch over every row of ``data`` and return the benefit.

        Dispatch priority in each time step:
          1. PV supplies the factory first; surplus PV energy charges the ESS.
          2. If PV is not enough, the ESS discharges to cover the shortfall.
          3. If the ESS cannot cover it either, the rest is bought from the
             grid (limited by the grid capacity).

        Args:
            data: table whose ``iterrows()`` yields ``(index, row)`` pairs;
                every row provides 'time', 'sunlight', 'demand' and 'price'.
            time_interval: step length; power values are multiplied by it to
                obtain energy (hours, if powers are in kW as the original
                unit comments state).

        Returns:
            The accumulated benefit: for every step the self-supplied energy
            times the electricity price, minus the grid energy bought in that
            step times the price, plus income from PV energy sold to the grid.
        """
        total_benefit = 0

        # Index window of the sampled spring week (loop invariant).
        week_start = self.season_start
        spring_week = range(week_start, week_start + self.week_length)

        for index, row in data.iterrows():
            timestamp = row['time']
            sunlight_intensity = row['sunlight']
            factory_demand = row['demand']
            electricity_price = row['price']

            # NOTE(review): the day roll-over needs the time to be exactly
            # '00:00' while the two samples below use endswith() - confirm
            # the time format of the input data.
            if timestamp == '00:00':
                self.day_generated.append(self.generated)
                self.generated = 0
            if timestamp.endswith('14:00'):
                self.hour_stored.append(self._state_of_charge())
            if timestamp.endswith('08:00'):
                self.hour_stored_2.append(self._state_of_charge())

            # Generated power (kW) and energy (kWh) for this step.
            generated_pv_power = self.pv.capacity * sunlight_intensity
            generated_pv_energy = generated_pv_power * time_interval * self.pv.loss
            self.generated += generated_pv_energy

            demand_energy = factory_demand * time_interval
            # Grid energy bought in THIS step. It must start at zero every
            # step: only the "ESS cannot cover it" branch buys from the grid,
            # and a value left over from an earlier step must not be billed
            # again.
            net_grid = 0

            if generated_pv_energy >= demand_energy:
                # PV alone covers the demand.
                surplus_energy = generated_pv_energy - demand_energy
                # Charge is limited by the surplus, by what the charger can
                # deliver within the step, and by the free ESS capacity.
                charge_to_ess = min(
                    surplus_energy,
                    self.ess.charge_power * time_interval,
                    self.ess.capacity - self.ess.storage,
                )
                self.ess.storage += charge_to_ess
                surplus_after_ess = surplus_energy - charge_to_ess
                # Sell only if surplus remains and PV power exceeds the ESS
                # charge power plus the factory demand.
                if (surplus_after_ess > 0
                        and generated_pv_power > self.ess.charge_power + factory_demand):
                    total_benefit += surplus_after_ess * self.grid.sell_price
                # The whole demand was self-supplied.
                saved_energy = demand_energy
            else:
                # PV is not enough: energy still required after using PV.
                needed_from_ess = demand_energy - generated_pv_energy
                if self.ess.storage * self.ess.loss >= needed_from_ess:
                    # The ESS holds enough energy; discharge is limited by
                    # the discharge power.
                    # NOTE(review): when the discharge power is the limit,
                    # the remaining shortfall is not bought from the grid
                    # here - confirm this is intended.
                    max_discharge = self.ess.discharge_power * time_interval
                    if max_discharge * self.ess.loss < needed_from_ess:
                        discharged = max_discharge
                    else:
                        discharged = needed_from_ess / self.ess.loss
                    self.ess.storage -= discharged
                    # Self-supplied energy = PV energy + delivered ESS energy.
                    saved_energy = generated_pv_energy + discharged * self.ess.loss
                else:
                    # The ESS cannot cover the shortfall: drain it completely
                    # and buy the rest from the grid.
                    usable_from_ess = self.ess.storage * self.ess.loss
                    available = (self.grid.capacity * time_interval
                                 + generated_pv_energy + usable_from_ess)
                    if available < demand_energy:
                        # Even grid + PV + ESS cannot meet the demand.
                        self.afford = False
                        self.overload_cnt += 1
                        self.unmet.append(
                            (index, timestamp, factory_demand, generated_pv_power)
                        )
                    saved_energy = generated_pv_energy + usable_from_ess
                    self.ess.storage = 0
                    needed_from_grid = demand_energy - saved_energy
                    net_grid = min(self.grid.capacity * time_interval,
                                   needed_from_grid) * self.grid.loss

            benefit = saved_energy * electricity_price
            cost = net_grid * electricity_price
            total_benefit += benefit - cost

            # Spring week samples. Summer, autumn and winter sampling (the
            # same window shifted by season_step each time) is disabled, so
            # their lists stay empty.
            if index in spring_week:
                self.spring_week_gen.append(generated_pv_power)
                self.spring_week_soc.append(self._state_of_charge())

        return total_benefit