Source code for rl_coach.data_stores.nfs_data_store

#
# Copyright (c) 2017 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


import uuid

from rl_coach.data_stores.data_store import DataStoreParameters
from rl_coach.data_stores.checkpoint_data_store import CheckpointDataStore


class NFSDataStoreParameters(DataStoreParameters):
    def __init__(self, ds_params, deployed=False, server=None, path=None, checkpoint_dir: str = ""):
        super().__init__(ds_params.store_type, ds_params.orchestrator_type, ds_params.orchestrator_params)
        self.namespace = "default"
        if "namespace" in ds_params.orchestrator_params:
            self.namespace = ds_params.orchestrator_params["namespace"]
        self.checkpoint_dir = checkpoint_dir
        self.name = None
        self.pvc_name = None
        self.pv_name = None
        self.svc_name = None
        self.server = None
        self.path = "/"
        self.deployed = deployed
        if deployed:
            self.server = server
            self.path = path

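# --- Usage sketch (illustrative, not part of the original module) -------------
# One plausible way to build NFSDataStoreParameters on top of a generic
# DataStoreParameters object, assuming DataStoreParameters takes
# (store_type, orchestrator_type, orchestrator_params) positionally, as the
# super() call above suggests. The namespace, server address and export path
# below are assumed placeholder values, not Coach defaults.
def _example_nfs_params() -> NFSDataStoreParameters:
    base_params = DataStoreParameters("nfs",                    # store_type
                                      "kubernetes",             # orchestrator_type
                                      {"namespace": "coach"})   # orchestrator_params
    # deployed=True marks the NFS server as pre-existing, so deploy() will only
    # create the PV/PVC instead of launching an nfs-server Deployment.
    return NFSDataStoreParameters(base_params,
                                  deployed=True,
                                  server="10.0.0.5",             # assumed cluster-internal NFS address
                                  path="/exports/checkpoints",   # assumed NFS export path
                                  checkpoint_dir="/checkpoint")  # assumed checkpoint mount point
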

class NFSDataStore(CheckpointDataStore):
    """
    An implementation of a data store which uses NFS for storing policy checkpoints when
    using Coach in distributed mode. The policy checkpoints are written by the trainer
    and read by the rollout worker.
    """

    def __init__(self, params: NFSDataStoreParameters):
        """
        :param params: The parameters required to use the NFS data store.
        """
        self.params = params

    def deploy(self) -> bool:
        """
        Deploy the NFS server in an orchestrator if/when required.
        """
        if self.params.orchestrator_type == "kubernetes":
            if not self.params.deployed:
                if not self.deploy_k8s_nfs():
                    return False
            if not self.create_k8s_nfs_resources():
                return False
        return True

    def get_info(self):
        """
        Return the PVC volume source that pods mount in order to access the checkpoints.
        """
        from kubernetes import client as k8sclient

        return k8sclient.V1PersistentVolumeClaimVolumeSource(
            claim_name=self.params.pvc_name
        )

    def undeploy(self) -> bool:
        """
        Undeploy the NFS server and resources from an orchestrator.
        """
        if self.params.orchestrator_type == "kubernetes":
            if not self.params.deployed:
                if not self.undeploy_k8s_nfs():
                    return False
            if not self.delete_k8s_nfs_resources():
                return False
        return True

    def save_to_store(self):
        pass

    def load_from_store(self):
        pass

    def deploy_k8s_nfs(self) -> bool:
        """
        Deploy the NFS server in the Kubernetes orchestrator.
        """
        from kubernetes import client as k8sclient

        # NFS server pod exposing the nfs, rpcbind and mountd ports; the exported
        # directory is backed by a hostPath volume on the node.
        name = "nfs-server-{}".format(uuid.uuid4())
        container = k8sclient.V1Container(
            name=name,
            image="k8s.gcr.io/volume-nfs:0.8",
            ports=[
                k8sclient.V1ContainerPort(
                    name="nfs",
                    container_port=2049,
                    protocol="TCP"
                ),
                k8sclient.V1ContainerPort(
                    name="rpcbind",
                    container_port=111
                ),
                k8sclient.V1ContainerPort(
                    name="mountd",
                    container_port=20048
                ),
            ],
            volume_mounts=[k8sclient.V1VolumeMount(
                name='nfs-host-path',
                mount_path='/exports'
            )],
            security_context=k8sclient.V1SecurityContext(privileged=True)
        )
        template = k8sclient.V1PodTemplateSpec(
            metadata=k8sclient.V1ObjectMeta(labels={'app': name}),
            spec=k8sclient.V1PodSpec(
                containers=[container],
                volumes=[k8sclient.V1Volume(
                    name="nfs-host-path",
                    host_path=k8sclient.V1HostPathVolumeSource(
                        path='/tmp/nfsexports-{}'.format(uuid.uuid4())
                    )
                )]
            )
        )
        deployment_spec = k8sclient.V1DeploymentSpec(
            replicas=1,
            template=template,
            selector=k8sclient.V1LabelSelector(
                match_labels={'app': name}
            )
        )
        deployment = k8sclient.V1Deployment(
            api_version='apps/v1',
            kind='Deployment',
            metadata=k8sclient.V1ObjectMeta(name=name, labels={'app': name}),
            spec=deployment_spec
        )

        k8s_apps_v1_api_client = k8sclient.AppsV1Api()
        try:
            k8s_apps_v1_api_client.create_namespaced_deployment(self.params.namespace, deployment)
            self.params.name = name
        except k8sclient.rest.ApiException as e:
            print("Got exception: %s\n while creating nfs-server" % e)
            return False

        # ClusterIP service in front of the NFS server; its cluster IP becomes the
        # server address used by the persistent volume.
        k8s_core_v1_api_client = k8sclient.CoreV1Api()
        svc_name = "nfs-service-{}".format(uuid.uuid4())
        service = k8sclient.V1Service(
            api_version='v1',
            kind='Service',
            metadata=k8sclient.V1ObjectMeta(
                name=svc_name
            ),
            spec=k8sclient.V1ServiceSpec(
                selector={'app': self.params.name},
                ports=[k8sclient.V1ServicePort(
                    protocol='TCP',
                    port=2049,
                    target_port=2049
                )]
            )
        )
        try:
            svc_response = k8s_core_v1_api_client.create_namespaced_service(self.params.namespace, service)
            self.params.svc_name = svc_name
            self.params.server = svc_response.spec.cluster_ip
        except k8sclient.rest.ApiException as e:
            print("Got exception: %s\n while creating a service for nfs-server" % e)
            return False

        return True

    def create_k8s_nfs_resources(self) -> bool:
        """
        Create NFS resources such as PV and PVC in Kubernetes.
        """
        from kubernetes import client as k8sclient

        # Persistent volume backed by the NFS export.
        pv_name = "nfs-ckpt-pv-{}".format(uuid.uuid4())
        persistent_volume = k8sclient.V1PersistentVolume(
            api_version="v1",
            kind="PersistentVolume",
            metadata=k8sclient.V1ObjectMeta(
                name=pv_name,
                labels={'app': pv_name}
            ),
            spec=k8sclient.V1PersistentVolumeSpec(
                access_modes=["ReadWriteMany"],
                nfs=k8sclient.V1NFSVolumeSource(
                    path=self.params.path,
                    server=self.params.server
                ),
                capacity={'storage': '10Gi'},
                storage_class_name=""
            )
        )
        k8s_api_client = k8sclient.CoreV1Api()
        try:
            k8s_api_client.create_persistent_volume(persistent_volume)
            self.params.pv_name = pv_name
        except k8sclient.rest.ApiException as e:
            print("Got exception: %s\n while creating the NFS PV" % e)
            return False

        # Persistent volume claim bound to the PV above through the matching 'app' label.
        pvc_name = "nfs-ckpt-pvc-{}".format(uuid.uuid4())
        persistent_volume_claim = k8sclient.V1PersistentVolumeClaim(
            api_version="v1",
            kind="PersistentVolumeClaim",
            metadata=k8sclient.V1ObjectMeta(
                name=pvc_name
            ),
            spec=k8sclient.V1PersistentVolumeClaimSpec(
                access_modes=["ReadWriteMany"],
                resources=k8sclient.V1ResourceRequirements(
                    requests={'storage': '10Gi'}
                ),
                selector=k8sclient.V1LabelSelector(
                    match_labels={'app': self.params.pv_name}
                ),
                storage_class_name=""
            )
        )
        try:
            k8s_api_client.create_namespaced_persistent_volume_claim(self.params.namespace, persistent_volume_claim)
            self.params.pvc_name = pvc_name
        except k8sclient.rest.ApiException as e:
            print("Got exception: %s\n while creating the NFS PVC" % e)
            return False

        return True

    def undeploy_k8s_nfs(self) -> bool:
        """
        Undeploy the NFS server from the Kubernetes orchestrator.
        """
        from kubernetes import client as k8sclient

        del_options = k8sclient.V1DeleteOptions()
        k8s_apps_v1_api_client = k8sclient.AppsV1Api()
        try:
            k8s_apps_v1_api_client.delete_namespaced_deployment(self.params.name, self.params.namespace, del_options)
        except k8sclient.rest.ApiException as e:
            print("Got exception: %s\n while deleting nfs-server" % e)
            return False

        k8s_core_v1_api_client = k8sclient.CoreV1Api()
        try:
            k8s_core_v1_api_client.delete_namespaced_service(self.params.svc_name, self.params.namespace, del_options)
        except k8sclient.rest.ApiException as e:
            print("Got exception: %s\n while deleting the service for nfs-server" % e)
            return False

        return True

    def delete_k8s_nfs_resources(self) -> bool:
        """
        Delete NFS resources such as PV and PVC from the Kubernetes orchestrator.
        """
        from kubernetes import client as k8sclient

        del_options = k8sclient.V1DeleteOptions()
        k8s_api_client = k8sclient.CoreV1Api()
        try:
            k8s_api_client.delete_persistent_volume(self.params.pv_name, del_options)
        except k8sclient.rest.ApiException as e:
            print("Got exception: %s\n while deleting NFS PV" % e)
            return False

        try:
            k8s_api_client.delete_namespaced_persistent_volume_claim(self.params.pvc_name, self.params.namespace, del_options)
        except k8sclient.rest.ApiException as e:
            print("Got exception: %s\n while deleting NFS PVC" % e)
            return False

        return True

    def setup_checkpoint_dir(self, crd=None):
        if crd:
            # TODO: find a way to upload this to the deployed nfs store.
            pass