Source code for network_gym_client.northbound_interface
#Copyright(C) 2023 Intel Corporation
#SPDX-License-Identifier: Apache-2.0
#File : northbound_interface.py
import zmq
import sys
import threading
import time
from random import randint, random
import json
import pandas as pd
[docs]class NorthBoundClient():
"""NetworkGym northbound interface client.
Northbound interface connects the network gym client to the network gym server. Client sends the network policy to the Sever/Env.
Sever/Env replies the network stats to the Client.
"""
def __init__(self, id, config_json):
"""Initialize NorthBoundClient.
Args:
id (int): client ID
config_json (json): configuration file
"""
self.identity = u'%s-%d' % (config_json["session_name"], id)
self.config_json=config_json
self.socket = None
self.context = zmq.Context()
self.context.setsockopt(zmq.LINGER, 10000)
#connect to network gym server using ZMQ socket
def connect(self):
"""Connect to the network gym server. Send the Start Env request, where the configuration is loaded from json file.
"""
self.socket = self.context.socket(zmq.DEALER)
self.socket.plain_username = bytes(self.config_json["session_name"], 'utf-8')
self.socket.plain_password = bytes(self.config_json["session_key"], 'utf-8')
self.socket.identity = self.identity.encode('utf-8')
if self.config_json["connect_via_server_ip_and_server_port"]:
self.socket.connect('tcp://'+str(self.config_json["server_ip"])+':'+str(self.config_json["server_port"]))
print(self.identity + " send start request to server: " + str(self.config_json["server_ip"])+" via port: "+str(self.config_json["server_port"]) + ".")
print("[TIP]: Set connect_via_server_ip_and_server_port to false to connect via port forwarding.")
else:
self.socket.connect('tcp://localhost:'+str(self.config_json["local_fowarded_port"]))
print(self.identity + " send start request to localhost via port: "+str(self.config_json["local_fowarded_port"]) + ".")
print ("[TIP]: Set connect_via_server_ip_and_server_port to true to connect via server ip and port.")
self.gma_start_request = self.config_json["env_config"]
self.socket.send(json.dumps(self.gma_start_request, indent=2).encode('utf-8'))#send start simulation request
#send action to network gym server
def send (self, policy):
"""Send the Policy to the server and environment.
Args:
policy (json): network policy
"""
action_json = {}
action_json["type"] = "env-action"
action_json["action_list"] = policy
#print(action_json)
json_str = json.dumps(action_json, indent=2)
#print(identity +" Send: "+ json_str)
self.socket.send(json_str.encode('utf-8')) #send action
#receive a msg from network gym server
def recv (self):
"""Receive a message from the network gym server.
Returns:
pd.DataFrame: the network stats measurement from the environment
"""
# Set a timeout every time we receive from the server.
poller = zmq.Poller()
poller.register(self.socket, flags=zmq.POLLIN)
if poller.poll(timeout=60000):
poller.unregister(self.socket)
# recv will be called later
else:
if self.config_json["connect_via_server_ip_and_server_port"]:
raise IOError("Cannot connect to the server! Check the configure parameters in common_config.json. Make sure the server_ip and server_port is correct and you can ping server_ip.")
else:
raise IOError("Cannot connect to the server! Check the configure parameters in common_config.json. Make sure the port forwarding to external server is up.")
reply = self.socket.recv()
relay_json = json.loads(reply)
#print(relay_json)
if relay_json["type"] == "no-available-worker":
# no available network gym worker, retry the request later
print(self.identity+" Receive: "+reply.decode())
print(self.identity+" "+"retry later...")
self.socket.close()
self.context.term()
quit()
#elif relay_json["type"] == "env-end":
# # simulation end from the network gym server
# print(self.identity +" Receive: "+ reply.decode())
# print(self.identity+" "+"Simulation Completed.")
# #quit() quit the program in main function.
#
# return None
elif relay_json["type"] == "env-measurement":
return self.process_measurement(relay_json)
elif relay_json["type"] == "env-error":
# error happened. Check the error msg.
print(self.identity +" Receive: "+ reply.decode())
print(self.identity +" "+ "Simulation Stopped with ***[Error]***!")
self.socket.close()
self.context.term()
quit()
else:
# Unkown msg type, please check.This should not happen.
print(self.identity +" Receive: "+ reply.decode())
print(self.identity +" "+ "***[ERROR]*** unkown msg type!")
self.socket.close()
self.context.term()
quit()
def process_measurement (self, reply_json):
"""Process the measurement.
Args:
reply_json (json): the network stats measurement
Returns:
pd.DataFrame: the processed network stats measurement
"""
#print(reply_json)
if(not reply_json['network_stats']):
return None
network_stats = pd.json_normalize(reply_json['network_stats'])
if "workload_stats" in reply_json:
# workload measurement available
print('Env (workload_stats) --> ' + str(reply_json['workload_stats']))
#if "sim_time_lapse_ms" and "time_lapse_ms" in reply_json['workload_stats']:
# if reply_json['workload_stats']['time_lapse_ms']>0:
# print('Env Measurement --> Percentage of time spend on simulation: ' + str(int(100*reply_json['workload_stats']['sim_time_lapse_ms']/reply_json['workload_stats']['time_lapse_ms'])) + '%')
return network_stats
def close(self):
self.socket.close()
self.context.term()