From 09129b436553fd01ecb7f818d66b2f21580f52b1 Mon Sep 17 00:00:00 2001 From: Mark Michelson Date: Tue, 7 Apr 2020 08:53:03 -0400 Subject: [PATCH] Add plaintext client as alternative to SSH. This offers an alternative to the SSH client currently used during scale testing. This instead connects to a server using plaintext and sends commands this way. It is assumed that a server is running on the server on port 8000 Because ncat is a good example of such a server, the client is called NCatClient in this change. The reason for this new method is to eliminate the overhead of connecting over SSH from the measurements in tests. For reasonably large-scale tests (~300 nodes) it results in a bout a 7% reduction in execution time. One notable difference between this new method and using SSH is that connections are created per host, rather than per sandbox. This allows for clients that connect to sandboxes hosted on the same machine to share the same connection. This has only been tested with tests that execute serially. Tests that run scenarios in parallel may encounter issues. If there is no server listening, and a connection cannot be created, we fall back to using SSH. --- rally_ovs/plugins/ovs/ovsclients_impl.py | 46 ++++++------ rally_ovs/plugins/ovs/utils.py | 94 ++++++++++++++++++++++-- 2 files changed, 112 insertions(+), 28 deletions(-) diff --git a/rally_ovs/plugins/ovs/ovsclients_impl.py b/rally_ovs/plugins/ovs/ovsclients_impl.py index 16bab27..1532ead 100644 --- a/rally_ovs/plugins/ovs/ovsclients_impl.py +++ b/rally_ovs/plugins/ovs/ovsclients_impl.py @@ -17,15 +17,14 @@ import pipes from io import StringIO from rally_ovs.plugins.ovs.ovsclients import * -from rally_ovs.plugins.ovs.utils import get_ssh_from_credential +from rally_ovs.plugins.ovs.utils import get_client_connection @configure("ssh") class SshClient(OvsClient): def create_client(self): - print("********* call OvnNbctl.create_client") - return get_ssh_from_credential(self.credential) + return get_client_connection(self.credential) @configure("ovn-nbctl") @@ -34,7 +33,7 @@ class OvnNbctl(OvsClient): class _OvnNbctl(DdCtlMixin): def __init__(self, credential): - self.ssh = get_ssh_from_credential(credential) + self.client = get_client_connection(credential) self.context = {} self.sandbox = None self.batch_mode = False @@ -85,8 +84,9 @@ def run(self, cmd, opts=[], args=[], stdout=sys.stdout, cmd = itertools.chain(cmd_prefix, [ovn_cmd], opts, [cmd], args) self.cmds.append(" ".join(cmd)) - self.ssh.run("\n".join(self.cmds), - stdout=stdout, stderr=stderr, raise_on_error=raise_on_error) + self.client.run("\n".join(self.cmds), + stdout=stdout, stderr=stderr, + raise_on_error=raise_on_error) self.cmds = None @@ -110,8 +110,8 @@ def flush(self): run_cmds.append(cmd_prefix + " ".join(self.cmds)) - self.ssh.run("\n".join(run_cmds), - stdout=sys.stdout, stderr=sys.stderr) + self.client.run("\n".join(run_cmds), + stdout=sys.stdout, stderr=sys.stderr) self.cmds = None @@ -294,7 +294,7 @@ class OvnSbctl(OvsClient): class _OvnSbctl(DdCtlMixin): def __init__(self, credential): - self.ssh = get_ssh_from_credential(credential) + self.client = get_client_connection(credential) self.context = {} self.sandbox = None self.batch_mode = False @@ -332,7 +332,7 @@ def run(self, cmd, opts=[], args=[], stdout=sys.stdout, stderr=sys.stderr): cmd = itertools.chain(cmd_prefix, ["ovn-sbctl"], opts, [cmd], args) self.cmds.append(" ".join(cmd)) - self.ssh.run("\n".join(self.cmds), + self.client.run("\n".join(self.cmds), stdout=stdout, stderr=stderr) self.cmds = None @@ -355,7 +355,7 @@ def flush(self): else: run_cmds.append("sudo ovn-sbctl" + " ".join(self.cmds)) - self.ssh.run("\n".join(run_cmds), + self.client.run("\n".join(run_cmds), stdout=sys.stdout, stderr=sys.stderr) self.cmds = None @@ -368,13 +368,13 @@ def db_set(self, table, record, *col_values): def count_igmp_flows(self, lswitch, network_prefix='239'): stdout = StringIO() - self.ssh.run( + self.client.run( "ovn-sbctl list datapath_binding | grep {sw} -B 1 | " "grep uuid | cut -f 2 -d ':'".format(sw=lswitch), stdout=stdout) uuid = stdout.getvalue().rstrip() stdout = StringIO() - self.ssh.run( + self.client.run( "ovn-sbctl list logical_flow | grep 'dst == {nw}' -B 1 | " "grep {uuid} -B 1 | wc -l".format( uuid=uuid, nw=network_prefix), @@ -405,6 +405,7 @@ def chassis_bound(self, chassis_name): self.batch_mode = batch_mode return len(stdout.getvalue().splitlines()) == 1 + def create_client(self): print("********* call OvnSbctl.create_client") @@ -417,7 +418,7 @@ class OvsSsh(OvsClient): class _OvsSsh(object): def __init__(self, credential): - self.ssh = get_ssh_from_credential(credential) + self.client = get_client_connection(credential) self.batch_mode = False self.cmds = None @@ -444,7 +445,7 @@ def run(self, cmd): self.flush() def run_immediate(self, cmd, stdout=sys.stdout, stderr=sys.stderr): - self.ssh.run(cmd, stdout) + self.client.run(cmd, stdout) def flush(self): if self.cmds == None: @@ -453,7 +454,8 @@ def flush(self): cmds = "\n".join(self.cmds) self.cmds = None - self.ssh.run(cmds, stdout=sys.stdout, stderr=sys.stderr) + self.client.run(cmds, stdout=sys.stdout, stderr=sys.stderr) + def create_client(self): print("********* call OvsSsh.create_client") @@ -467,7 +469,7 @@ class OvsVsctl(OvsClient): class _OvsVsctl(object): def __init__(self, credential): - self.ssh = get_ssh_from_credential(credential) + self.client = get_client_connection(credential) self.context = {} self.batch_mode = False self.sandbox = None @@ -509,7 +511,7 @@ def run(self, cmd, opts=[], args=[], extras=[], stdout=sys.stdout, stderr=sys.st if self.batch_mode: return - self.ssh.run("\n".join(self.cmds), stdout=stdout, stderr=stderr) + self.client.run("\n".join(self.cmds), stdout=stdout, stderr=stderr) self.cmds = None @@ -521,7 +523,7 @@ def flush(self): if self.install_method == "sandbox": self.cmds.insert(0, ". %s/sandbox.rc" % self.sandbox) - self.ssh.run("\n".join(self.cmds), + self.client.run("\n".join(self.cmds), stdout=sys.stdout, stderr=sys.stderr) self.cmds = None @@ -540,6 +542,7 @@ def db_set(self, table, record, *col_values): args += set_colval_args(*col_values) self.run("set", args=args) + def create_client(self): print("********* call OvsVsctl.create_client") client = self._OvsVsctl(self.credential) @@ -553,7 +556,7 @@ class OvsOfctl(OvsClient): class _OvsOfctl(object): def __init__(self, credential): - self.ssh = get_ssh_from_credential(credential) + self.client = get_client_connection(credential) self.context = {} self.sandbox = None @@ -577,7 +580,7 @@ def run(self, cmd, opts=[], args=[], stdout=sys.stdout, stderr=sys.stderr): cmd_prefix = ["ovs-ofctl"] cmd = itertools.chain(cmd_prefix, opts, [cmd], args) cmds.append(" ".join(cmd)) - self.ssh.run("\n".join(cmds), + self.client.run("\n".join(cmds), stdout=stdout, stderr=stderr) def dump_flows(self, bridge): @@ -588,6 +591,7 @@ def dump_flows(self, bridge): oflow_data = oflow_data.split('\n') return len(oflow_data) + def create_client(self): print("********* call OvsOfctl.create_client") client = self._OvsOfctl(self.credential) diff --git a/rally_ovs/plugins/ovs/utils.py b/rally_ovs/plugins/ovs/utils.py index e17661e..8e3d9be 100644 --- a/rally_ovs/plugins/ovs/utils.py +++ b/rally_ovs/plugins/ovs/utils.py @@ -23,10 +23,12 @@ from rally.common import db +import socket +import selectors +import time cidr_incr = utils.RAMInt() - ''' Find credential resource from DB by deployment uuid, and return info as a dict. @@ -147,9 +149,87 @@ def get_sandboxes(deploy_uuid, farm="", tag=""): return sandboxes - - - - - - +class NCatError(Exception): + def __init__(self, details): + self.details = details + + +class NCatClient(object): + def __init__(self, server): + self.server = server + self.sock = socket.create_connection((server, 8000)) + self.sel = selectors.DefaultSelector() + self.sel.register(self.sock, selectors.EVENT_READ) + + def run(self, cmd, stdin=None, stdout=None, stderr=None, + raise_on_error=True, timeout=3600): + start = time.clock_gettime(time.CLOCK_MONOTONIC) + end = time.clock_gettime(time.CLOCK_MONOTONIC) + timeout + to = end - start + # We have to doctor the command a bit for three reasons: + # 1. We need to add a newline to ensure that the command + # gets sent to the server and doesn't just get put in + # the socket's write buffer. + # 2. We need to pipe stderr to stdout so that stderr gets + # returned over the client connection. + # 3. We need to add some marker text so our client knows + # that it has received all output from the command. This + # marker text let's us know if the command completed + # successfully or not. + good = "SUCCESS" + bad = "FAIL" + result = f"&& echo -n {good} || echo -n {bad}" + self.sock.send(f"({cmd}) 2>&1 {result}\n".encode('utf-8')) + out = "" + stream = None + error = False + while True: + events = self.sel.select(to) + for key, mask in events: + buf = key.fileobj.recv(4096).decode('utf-8') + if buf.endswith(good): + out += buf[:-len(good)] + stream = stdout + break + elif buf.endswith(bad): + out += buf[:-len(bad)] + # We assume that if the command errored, then everything + # that was output was stderr. This isn't necessarily + # accurate but it hopefully won't ruffle too many feathers. + stream = stderr + error = True + break + else: + out += buf + to = end - time.clock_gettime(time.CLOCK_MONOTONIC) + + if stream is not None: + stream.write(out) + + if error and raise_on_error: + details = (f"Error running command {cmd}\n" + f"Last stderr output is {out}\n") + raise NCatError(details) + + def close(self): + # Test scenarios call close after every operation because with SSH, + # this is necessary to ensure that we do not open too many + # connections. Our ncat client cache is keyed on hostname rather than + # on the controller node. This means that we open far fewer connections + # than SSH does. Therefore, there is no reason to close connections + # as frequently as we do with SSH. We can afford to leave the + # connection open and reuse the clients instead. This is why we "pass" + # in this method. + pass + + +NCAT_CLIENT_CACHE = {} + + +def get_client_connection(cred): + try: + global NCAT_CLIENT_CACHE + server = cred["host"] + return NCAT_CLIENT_CACHE.setdefault(server, NCatClient(server)) + except socket.error: + return get_ssh_from_credential(cred)