Skip to content

Commit

Permalink
[CE-157] Model layer for vSphere agent.
Browse files Browse the repository at this point in the history
Add logic for vSphere type host/cluster CURD operations

Change-Id: Ibbd8885f144b1540ae98ab514e87d9a984e37796
Signed-off-by: hainingzhang <haininghenryzh@vmware.com>
  • Loading branch information
hainingzhang committed Oct 24, 2017
1 parent 4f0031c commit 6b4c8bd
Show file tree
Hide file tree
Showing 4 changed files with 192 additions and 23 deletions.
5 changes: 3 additions & 2 deletions src/common/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
CONSENSUS_PLUGINS_FABRIC_V1, CONSENSUS_PLUGIN_SOLO, \
CONSENSUS_MODES, CONSENSUS_TYPES_FABRIC_V1, \
WORKER_TYPES, WORKER_TYPE_DOCKER, WORKER_TYPE_SWARM, WORKER_TYPE_K8S, \
WORKER_TYPE_VSPHERE, CLUSTER_PORT_START, CLUSTER_PORT_STEP, \
WORKER_TYPE_VSPHERE, \
CLUSTER_PORT_START, CLUSTER_PORT_STEP, \
NETWORK_SIZE_FABRIC_PRE_V1, NETWORK_SIZE_FABRIC_V1, \
CLUSTER_NETWORK, \
CLUSTER_LOG_TYPES, CLUSTER_LOG_LEVEL, \
Expand All @@ -23,7 +24,7 @@
VCENTER, VMUUID, VMMEMORY, VMCPU, VMNAME, VMIP, VMNETMASK, VMDNS, \
VMGATEWAY, TEMPLATE, VC_DATACENTER, VC_CLUSTER, VC_DATASTORE, NETWORK, \
NIC_DEVICE_ADDRESS_TYPE, VCUSERNAME, VCPWD, VCPORT, VCIP, \
WORKER_API_PORT
WORKER_API_PORT, HOST_STATUS, HOST_STATUS_ACTIVE, HOST_STATUS_PENDING

from .fabric_network_config import \
FabricPreNetworkConfig, FabricV1NetworkConfig
Expand Down
5 changes: 5 additions & 0 deletions src/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@
# first port that can be assigned as cluster API
CLUSTER_PORT_START = int(os.getenv("CLUSTER_PORT_START", 7050))

# host status
HOST_STATUS = 'status'
HOST_STATUS_ACTIVE = 'active'
HOST_STATUS_PENDING = 'pending'

# number of port allocated to each cluster in case collision
CLUSTER_PORT_STEP = 100

Expand Down
22 changes: 19 additions & 3 deletions src/modules/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,16 @@
NETWORK_TYPE_FABRIC_PRE_V1, NETWORK_TYPE_FABRIC_V1, \
CONSENSUS_PLUGINS_FABRIC_V1, CONSENSUS_MODES, \
WORKER_TYPES, WORKER_TYPE_DOCKER, WORKER_TYPE_SWARM, WORKER_TYPE_K8S, \
SYS_CREATOR, SYS_DELETER, SYS_USER, SYS_RESETTING, \
WORKER_TYPE_VSPHERE, SYS_CREATOR, SYS_DELETER, SYS_USER, \
SYS_RESETTING, VMIP, \
NETWORK_SIZE_FABRIC_PRE_V1, \
PEER_SERVICE_PORTS, CA_SERVICE_PORTS

from common import FabricPreNetworkConfig, FabricV1NetworkConfig

from modules import host

from agent import ClusterOnDocker
from agent import ClusterOnDocker, ClusterOnVsphere

logger = logging.getLogger(__name__)
logger.setLevel(LOG_LEVEL)
Expand All @@ -49,7 +50,8 @@ def __init__(self):
self.host_handler = host.host_handler
self.cluster_agents = {
'docker': ClusterOnDocker(),
'swarm': ClusterOnDocker()
'swarm': ClusterOnDocker(),
'vsphere': ClusterOnVsphere()
}

def list(self, filter_data={}, col_name="active"):
Expand Down Expand Up @@ -114,6 +116,12 @@ def create(self, name, host_id, config, start_port=0,
if not worker:
return None

if worker.get("type") == WORKER_TYPE_VSPHERE:
vm_params = self.host_handler.get_vm_params_by_id(host_id)
docker_daemon = vm_params.get(VMIP) + ":2375"
worker.update({"worker_api": "tcp://" + docker_daemon})
logger.info(worker)

if len(worker.get("clusters")) >= worker.get("capacity"):
logger.warning("host {} is already full".format(host_id))
return None
Expand Down Expand Up @@ -173,6 +181,10 @@ def create(self, name, host_id, config, start_port=0,
# try to start one cluster at the host
worker = self.host_handler.db_update_one(
{"id": host_id}, {"$addToSet": {"clusters": cid}})
# worker get worker_api from host collection
if worker.get("type") == WORKER_TYPE_VSPHERE:
worker.update({"worker_api": worker_api})

if not worker or len(worker.get("clusters")) > worker.get("capacity"):
self.col_active.delete_one({"id": cid})
self.host_handler.db_update_one({"id": host_id},
Expand Down Expand Up @@ -622,6 +634,10 @@ def _get_service_ip(self, cluster_id, node='vp0'):
host_ip = get_swarm_node_ip(worker_api, "{}_{}".format(
cluster_id, node))
logger.debug("swarm host, ip = {}".format(host_ip))
elif host_type == WORKER_TYPE_VSPHERE:
vm_params = self.host_handler.get_vm_params_by_id(host_id)
host_ip = vm_params.get(VMIP)
logger.debug(" host, ip = {}".format(host_ip))
else:
logger.error("Unknown host type = {}".format(host_type))
host_ip = ""
Expand Down
183 changes: 165 additions & 18 deletions src/modules/host.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,14 @@
NETWORK_SIZE_FABRIC_V1, \
CLUSTER_PORT_START, CLUSTER_PORT_STEP, \
CONSENSUS_PLUGINS_FABRIC_V1, CONSENSUS_PLUGIN_SOLO, \
WORKER_TYPES

from agent import DockerHost, KubernetesHost

WORKER_TYPES, VCENTER, VCUSERNAME, VCPWD, \
VMNAME, VMMEMORY, VMCPU, VMNETMASK, VMGATEWAY, TEMPLATE, VMIP, \
VIRTUAL_MACHINE, VCIP, VCPORT, VMDNS, NETWORK, VMUUID,\
VC_DATACENTER, VC_DATASTORE, VC_CLUSTER, \
WORKER_TYPE_DOCKER, WORKER_TYPE_SWARM, WORKER_TYPE_VSPHERE, \
HOST_STATUS, HOST_STATUS_PENDING

from agent import DockerHost, VsphereHost, KubernetesHost
from modules import cluster

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -53,13 +57,14 @@ def __init__(self):
self.host_agents = {
'docker': DockerHost("docker"),
'swarm': DockerHost("swarm"),
'kubernetes': KubernetesHost()
'kubernetes': KubernetesHost(),
'vsphere': VsphereHost()
}

def create(self, name, worker_api, host_type, capacity=1,
log_level=CLUSTER_LOG_LEVEL[0],
log_type=CLUSTER_LOG_TYPES[0], log_server="", autofill="false",
schedulable="false", serialization=True):
schedulable="false", serialization=True, params=None):
""" Create a new docker host node
A docker host is potentially a single node or a swarm.
Expand All @@ -75,13 +80,16 @@ def create(self, name, worker_api, host_type, capacity=1,
:param autofill: Whether automatically fillup with chains
:param schedulable: Whether can schedule cluster request to it
:param serialization: whether to get serialized result or object
:param params: extra data for vSphere host type
:return: True or False
"""
logger.debug("Create host: name={}, worker_api={}, host_type={}, "
"capacity={}, log={}/{}, autofill={}, schedulable={}"
.format(name, worker_api, host_type, capacity, log_type,
log_server, autofill, schedulable))
if not worker_api.startswith("tcp://"):

if params is None and not worker_api.startswith("tcp://"):
# params is None when host_type is either docker or swarm.
worker_api = "tcp://" + worker_api

if self.col.find_one({"worker_api": worker_api}):
Expand All @@ -93,10 +101,73 @@ def create(self, name, worker_api, host_type, capacity=1,
if log_type == CLUSTER_LOG_TYPES[0]:
log_server = ""

if not host_type or not self.host_agents[host_type].create(worker_api):
if not host_type:
logger.warning("{} cannot be setup".format(name))
return {}

if (host_type == WORKER_TYPE_DOCKER or
host_type == WORKER_TYPE_SWARM):
if not self.host_agents[host_type].create(worker_api):
logger.warning("{} cannot be setup".format(name))
return {}

if host_type == WORKER_TYPE_VSPHERE:

vc = params.get(VCENTER)
vm = params.get(VIRTUAL_MACHINE)

worker_api = vc.get(VCIP)
vc_username = vc.get(VCUSERNAME)
vc_passwd = vc.get(VCPWD)
vc_port = vc.get(VCPORT)

# Get vm params
# vmname is generated by upstream api.
vmname = vm.get(VMNAME)
vmmem = vm.get(VMMEMORY)
vmcpunum = vm.get(VMCPU)
vmip = vm.get(VMIP)
vmnetmask = vm.get(VMNETMASK)
vmdns = vm.get(VMDNS)
vmgateway = vm.get(VMGATEWAY)

# Get vc params
template = vc.get(TEMPLATE)
vcdatacenter = vc.get(VC_DATACENTER)
vccluster = vc.get(VC_CLUSTER)
vcdatastore = vc.get(VC_DATASTORE)
vcnetwork = vc.get(NETWORK)

h_update = {
VMNAME: vmname,
VMMEMORY: vmmem,
VMCPU: vmcpunum,
VMIP: vmip,
VMNETMASK: vmnetmask,
VMDNS: vmdns,
VMGATEWAY: vmgateway,
TEMPLATE: template,
VC_DATACENTER: vcdatacenter,
VC_CLUSTER: vccluster,
VC_DATASTORE: vcdatastore,
NETWORK: vcnetwork,
VCUSERNAME: vc_username,
VCPWD: vc_passwd,
VCPORT: vc_port,
HOST_STATUS: HOST_STATUS_PENDING
}
try:
if self.host_agents[host_type].create(worker_api,
vc_username,
vc_passwd, vc_port,
params):
logger.info("Creating vSphere host{}".format(name))

except Exception as e: # Catch failure while connecting to vc.
logger.error("{} cannot be setup".format(name))
logger.error("{}".format(e))
return {"msg": "{}".format(e)}

h = {
'id': '',
'name': name,
Expand All @@ -117,6 +188,12 @@ def create(self, name, worker_api, host_type, capacity=1,
{"_id": hid},
{"$set": {"id": str(hid)}})

if params is not None: # If params is not none, update extra info.
host = self.db_update_one(
{"id": str(hid)},
{"$set": h_update})
logger.info(host)

if capacity > 0 and autofill == "true": # should autofill it
self.fillup(str(hid))

Expand All @@ -138,6 +215,33 @@ def get_by_id(self, id):
return {}
return self._serialize(ins)

def get_vc_params_by_id(self, id):
""" Get vCenter params while host type is vsphere
:param id: id of the doc
:return: serialized result or obj
"""
ins = self.col.find_one({"id": id})
if not ins:
logger.warning("No host found with id=" + id)
return {}
return self._serialize(ins, keys=[VCUSERNAME,
VCPWD, VCPORT])

def get_vm_params_by_id(self, id):
""" Get VM params while host type is vsphere
:param id: id of the doc
:return: serialized result or obj
"""
ins = self.col.find_one({"id": id})
if not ins:
logger.warning("No host found with id=" + id)
return {}
return self._serialize(ins, keys=[VMUUID,
VMIP,
VMNAME])

def update(self, id, d):
""" Update a host's property
Expand All @@ -153,6 +257,9 @@ def update(self, id, d):
logger.warning("No host found with id=" + id)
return {}

if h_old.get("status") == "pending":
return {}

if "worker_api" in d and not d["worker_api"].startswith("tcp://"):
d["worker_api"] = "tcp://" + d["worker_api"]

Expand All @@ -177,7 +284,7 @@ def list(self, filter_data={}):
hosts = self.col.find(filter_data)
return list(map(self._serialize, hosts))

def delete(self, id, host_type=WORKER_TYPES[0]):
def delete(self, id):
""" Delete a host instance
:param id: id of the host to delete
Expand All @@ -193,7 +300,27 @@ def delete(self, id, host_type=WORKER_TYPES[0]):
logger.warning("There are clusters on that host, cannot delete.")
return False

self.host_agents[host_type].delete(h.get("worker_api"))
host_type = h.get("type")

if host_type is None:
logger.warning("Host type not found.")
return False

elif (host_type == WORKER_TYPE_DOCKER or
host_type == WORKER_TYPE_SWARM):
self.host_agents[host_type].delete(h.get("worker_api"))

elif host_type == WORKER_TYPE_VSPHERE:
if h.get("status") == "pending":
return False
vc_params = self.get_vc_params_by_id(id)
vm_params = self.get_vm_params_by_id(id)
logger.info(vc_params)
self.host_agents[host_type].delete(vm_params.get(VMUUID),
h.get("worker_api"),
vc_params.get(VCUSERNAME),
vc_params.get(VCPWD),
vc_params.get(VCPORT))
self.col.delete_one({"id": id})
return True

Expand All @@ -209,6 +336,9 @@ def fillup(self, id):
host = self.get_by_id(id)
if not host:
return False
if host.get("status") != "active":
logger.warning("host {} is not active".format(id))
return False
num_new = host.get("capacity") - len(host.get("clusters"))
if num_new <= 0:
logger.warning("host {} already full".format(id))
Expand Down Expand Up @@ -253,6 +383,9 @@ def clean(self, id):
host = self.get_by_id(id)
if not host:
return False
if host.get("status") != "active":
return False

if len(host.get("clusters")) <= 0:
return True

Expand Down Expand Up @@ -280,9 +413,13 @@ def reset(self, id):
"""
logger.debug("clean host with id = {}".format(id))
host = self.get_by_id(id)
if not host or len(host.get("clusters")) > 0:
logger.info(host)
if (not host or len(host.get("clusters")) > 0 or
host.get("status") != "active"):
logger.warning("No find resettable host with id ={}".format(id))
return False
logger.info(host.get("type"))
logger.info(self.host_agents[host.get("type")])
return self.host_agents[host.get("type")].reset(
host_type=host.get("type"), worker_api=host.get("worker_api"))

Expand All @@ -297,14 +434,24 @@ def refresh_status(self, id):
if not host:
logger.warning("No host found with id=" + id)
return False
if not self.host_agents[host.get("type")]\
.refresh_status(host.get("worker_api")):
logger.warning("Host {} is inactive".format(id))
self.db_set_by_id(id, status="inactive")

host_type = host.get("type")
if (host_type == WORKER_TYPE_DOCKER or
host_type == WORKER_TYPE_SWARM):
if not self.host_agents[host_type]\
.refresh_status(host.get("worker_api")):
logger.warning("Host {} is inactive".format(id))
self.db_set_by_id(id, status="inactive")
return False
else:
self.db_set_by_id(id, status="active")
return True
elif host_type == WORKER_TYPE_VSPHERE:
if not self.host_agents[host_type]\
.refresh_status(host.get(VMIP) + ":2375"):
logger.warning("Host {} is inactive".format(id))
self.db_set_by_id(id, status="inactive")
return False

self.db_set_by_id(id, status="active")
return True

def is_active(self, host_id):
"""
Expand Down

0 comments on commit 6b4c8bd

Please sign in to comment.