Source code for network_gym_client.envs.qos_steer.adapter

#Copyright(C) 2023 Intel Corporation
#SPDX-License-Identifier: Apache-2.0
#File : adapter.py


import network_gym_client.adapter
import sys
from gymnasium import spaces
import numpy as np
from pathlib import Path
import json

[docs]class Adapter(network_gym_client.adapter.Adapter): """qos_steer environment adapter. Args: Adapter (network_gym_client.adapter.Adapter): base class. """ def __init__(self, config_json): """Initialize the adapter. Args: config_json (json): the configuration file """ super().__init__(config_json) self.env = Path(__file__).resolve().parent.name self.num_features = 3 self.size_per_feature = int(self.config_json['env_config']['num_users']) if config_json['env_config']['env'] != self.env: sys.exit("[ERROR] wrong environment Adapter. Configured environment: " + str(config_json['env_config']['env']) + " != Launched environment: " + str(self.env)) def get_action_space (self): """Get action space for qos_steer env. Returns: spaces: action spaces """ myarray = np.empty([self.size_per_feature,], dtype=int) myarray.fill(2) #print(myarray) return spaces.MultiDiscrete(myarray) #consistent with the get_observation function. def get_observation_space(self): """Get observation space for qos_steer env. Returns: spaces: observation spaces """ return spaces.Box(low=0, high=1000, shape=(self.num_features, self.size_per_feature), dtype=np.float32) def get_observation(self, df): """Prepare observation for qos_steer env. This function should return the same number of features defined in the :meth:`get_observation_space`. Args: df (pd.DataFrame): network stats measurement Returns: spaces: observation spaces """ #print (df) #data_recv_flat = df.explode(column=['user', 'value']) #print(data_recv_flat) df_rate = None df_wifi_traffic_ratio = None df_x_loc = None df_y_loc = None df_phy_wifi_max_rate = None df_phy_lte_max_rate = None rate_value = np.empty(self.size_per_feature, dtype=object) wifi_max_rate_value = np.empty(self.size_per_feature, dtype=object) lte_max_rate_value = np.empty(self.size_per_feature, dtype=object) for index, row in df.iterrows(): if row['source'] == 'gma': if row['name'] == 'dl::rate': df_rate = row rate_value = row['value'] self.action_data_format = row elif row['name'] == 'wifi::dl::traffic_ratio': df_wifi_traffic_ratio = row elif row['name'] == 'x_loc': df_x_loc = row elif row['name'] == 'y_loc': df_y_loc = row elif row['source'] == 'wifi': if row['name'] == 'dl::max_rate': df_phy_wifi_max_rate = row wifi_max_rate_value = row['value'] elif row['source'] == 'lte': if row['name'] == 'dl::max_rate': df_phy_lte_max_rate = row lte_max_rate_value = row['value'] #print(df_rate) #print(df_phy_lte_max_rate) #print(df_phy_wifi_max_rate) #print(df_wifi_split_ratio) #print(df_x_loc) #Print(df_y_loc) # if not empty and send to wanDB database self.wandb_log_buffer_append(self.df_to_dict(df_phy_wifi_max_rate)) self.wandb_log_buffer_append(self.df_to_dict(df_phy_lte_max_rate)) dict_rate = self.df_to_dict(df_rate) if df_rate is not None: dict_rate["sum_rate"] = sum(df_rate["value"]) self.wandb_log_buffer_append(dict_rate) self.wandb_log_buffer_append(self.df_to_dict(df_wifi_traffic_ratio)) self.wandb_log_buffer_append(self.df_to_dict(df_x_loc)) self.wandb_log_buffer_append(self.df_to_dict(df_y_loc)) observation = np.vstack([lte_max_rate_value, wifi_max_rate_value, rate_value]) print('Observation --> ' + str(observation)) return observation def get_policy(self, action): """Prepare network policy for qos_steer env. Args: action (spaces): action from RL agent Returns: json: network policy """ # you may also check other constraints for action... e.g., min, max. policy1 = json.loads(self.action_data_format.to_json()) policy1["name"] = "wifi::dl::split_ratio" policy1["value"] = action.tolist() print('Action --> ' + str(policy1)) return policy1 def get_reward(self, df): """Prepare reward for qos_steer env. Args: df (pd.DataFrame]): network stats measurement Returns: spaces: reward space """ df_wifi_qos_rate = None for index, row in df.iterrows(): if row['source'] == 'gma': if row['name'] == 'wifi::dl::qos_rate': df_wifi_qos_rate = row #print (df_wifi_qos_rate) reward = 0 if self.config_json["rl_config"]["reward_type"] == "wifi_qos_user_num": reward = self.calculate_wifi_qos_user_num(df_wifi_qos_rate[:]["value"]) else: sys.exit("[ERROR] reward type not supported yet") self.wandb_log_buffer_append(self.df_to_dict(df_wifi_qos_rate, "wifi-qos-rate")) self.wandb_log_buffer_append({"reward": reward}) return reward def calculate_wifi_qos_user_num(self, qos_rate): """Calculate the number of QoS users over Wi-Fi. Default reward function. Args: qos_rate (pandas.DataFrame): qos data rate per user Returns: double: reward """ #print(qos_rate) reward = 0 for r in qos_rate: if r > 0.1: #we assume the min qos rate is 0.1 mbps reward = reward + 1 return reward