Source code for network_gym_env.configure

#Copyright(C) 2023 Intel Corporation
#SPDX-License-Identifier: Apache-2.0
#File : dummy_worker.py

import zmq
import threading
import json
import traceback
import pathlib
import socket
FILE_PATH = pathlib.Path(__file__).parent
from network_gym_env.southbound_interface import *

"""
Flow Chart:
-------------------------------------------------
env_config.connect()
[every T seconds]:
    env_config -- "env-hello" --> server
env_config <-- "env-start" -- server
env_config.close()

env_sim.connect()
[every interval]:
    env_sim -- "env-measurement" --> server
    env_sim <-- "env-action" -- server
env_sim.close()

env_config.connect()
env_config -- "env-end" --> server
[every T seconds]:
    env_config -- "env-hello" --> server
...
-------------------------------------------------
"""

[docs]class Configure(threading.Thread): """Environment Configure Component. 1. The "env_config" establishes a connection with the server and periodically dispatches the message "env-hello." 2. Upon reception of an "env-start" signal, the "env_config" terminates its socket connection and initiates the "env_sim" simulator. 3. Upon launching, the "env_sim" establishes a connection with the server, utilizing the identical identification as the "env_config." Subsequently, it commences the exchange of measurement/action messages with the client. 4. Following the conclusion of the "env_sim" simulation, the socket is closed, and the "env_config" reconnects. From the server's point, the "env_config" and "env_sim" are perceived as identical entities. The division between the "env_config" and "env_sim" components provides the advantage of facilitating straightforward expansion of the "env_sim" to other simulators (e.g., ns-3) or test environments, all the while utilizing the same underlying "env_config" code. """ def __init__(self, id, NetworkGymSim, env_list=['custom']): """Initialize custom environment. Args: id (int): environment identity NetworkGymSim (simulator): network simulator env_list (list[str]): a list of supported environments, non-offical account can only use 'custom' as env name """ threading.Thread.__init__ (self) #common_config.json is shared by all environments f = open(FILE_PATH / 'common_config.json') self.config_json = json.load(f) self.identity = u'%s-%d-%s' % (self.config_json["session_name"], id, socket.gethostname()) self.env_list = env_list self.NetworkGymSim = NetworkGymSim self.context = zmq.Context() self.context.setsockopt(zmq.LINGER, 10000) def run(self): """Run the environement configure. """ # connect to server via southbound Interface identity = str(self.identity) env_config = southbound_connect(identity, self.config_json, self.context) print(identity + ': env_config socket connected.') poller = zmq.Poller() poller.register(env_config, flags=zmq.POLLIN) try: while True: # Send Hello msg. hello_msg = json.loads('{"type":"env-hello"}') hello_msg["env_list"] = self.env_list # add supported env_list to hello msg env_config.send_multipart([b'', json.dumps(hello_msg, indent=2).encode('utf-8')])# Hello msg does not include client print(identity + ': send env-hello msg.') print(hello_msg) # resend hello msg after 5 seconds if no msg is received. # [Warning] always use a poll with timeout before receiving a msg (recv_multipart)! otherwise, it may stuck forever! if poller.poll(timeout=5*1000): # received a new msg msg = env_config.recv_multipart() if len(msg) < 2: print ("[Error] Ignore msg with wrong size:" + str(len(msg))) print(msg) continue msg_json = json.loads(msg[1]) if "type" in msg_json and msg_json["type"] == "env-start": # In idle mode (periodic sending env-hello msg), the first msg should be "env-start" # check if the env is supported (in the env_list) if "env" not in msg_json or msg_json["env"] not in self.env_list: # not supported env print("Unkown Environment!") error_msg = json.loads('{"type":"env-error", "error_msg": "Unkown Environment!"}') msg[1]=json.dumps(error_msg, indent=2).encode('utf-8') env_config.send_multipart(msg) continue print(identity + ': Recv.') print(msg_json) poller.unregister(env_config) env_config.close() print(identity + ': env_config socket closed.') # start simulator -------------------> # The simulator will start a new socket to send measurement and receive action. # use try except such that if there is an error in the simulator, the thread will not stop and we can report the error to the client. sim_error_msg = '' try: self.NetworkGymSim(identity, self.config_json, msg[0].decode(), msg_json) # replace it with your own simulator. except Exception: traceback.print_exc() sim_error_msg = traceback.format_exc() # simulator terminated <------------------- # env_config reconnect the server. env_config = southbound_connect(identity, self.config_json, self.context) print(identity + ': env_config socket connected.') poller.register(env_config, flags=zmq.POLLIN) if sim_error_msg != '': # simualtor crashed with error msg, relay to error msg to client error_msg = json.loads('{"type":"env-error"}') error_msg["error_msg"] = sim_error_msg msg[1]=json.dumps(error_msg, indent=2).encode('utf-8') env_config.send_multipart(msg)# Env End msg else: # no error, send env-end to terminate client to env worker mapping end_msg = json.loads('{"type":"env-end"}') env_config.send_multipart([msg[0], json.dumps(end_msg, indent=2).encode('utf-8')])# Env End msg else: print("Unkown MSG type, Expecting env-start!") error_msg = json.loads('{"type":"env-error", "error_msg": "Unkown MSG type, Expecting env-start!"}') msg[1]=json.dumps(error_msg, indent=2).encode('utf-8') env_config.send_multipart(msg) except: poller.unregister(env_config) env_config.close() self.context.term()