Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add plaintext client as alternative to SSH. #198

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 25 additions & 21 deletions rally_ovs/plugins/ovs/ovsclients_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,14 @@
import pipes
from io import StringIO
from rally_ovs.plugins.ovs.ovsclients import *
from rally_ovs.plugins.ovs.utils import get_ssh_from_credential
from rally_ovs.plugins.ovs.utils import get_client_connection

@configure("ssh")
class SshClient(OvsClient):


def create_client(self):
print("********* call OvnNbctl.create_client")
return get_ssh_from_credential(self.credential)
return get_client_connection(self.credential)


@configure("ovn-nbctl")
Expand All @@ -34,7 +33,7 @@ class OvnNbctl(OvsClient):

class _OvnNbctl(DdCtlMixin):
def __init__(self, credential):
self.ssh = get_ssh_from_credential(credential)
self.client = get_client_connection(credential)
self.context = {}
self.sandbox = None
self.batch_mode = False
Expand Down Expand Up @@ -85,8 +84,9 @@ def run(self, cmd, opts=[], args=[], stdout=sys.stdout,
cmd = itertools.chain(cmd_prefix, [ovn_cmd], opts, [cmd], args)
self.cmds.append(" ".join(cmd))

self.ssh.run("\n".join(self.cmds),
stdout=stdout, stderr=stderr, raise_on_error=raise_on_error)
self.client.run("\n".join(self.cmds),
stdout=stdout, stderr=stderr,
raise_on_error=raise_on_error)

self.cmds = None

Expand All @@ -110,8 +110,8 @@ def flush(self):

run_cmds.append(cmd_prefix + " ".join(self.cmds))

self.ssh.run("\n".join(run_cmds),
stdout=sys.stdout, stderr=sys.stderr)
self.client.run("\n".join(run_cmds),
stdout=sys.stdout, stderr=sys.stderr)

self.cmds = None

Expand Down Expand Up @@ -294,7 +294,7 @@ class OvnSbctl(OvsClient):

class _OvnSbctl(DdCtlMixin):
def __init__(self, credential):
self.ssh = get_ssh_from_credential(credential)
self.client = get_client_connection(credential)
self.context = {}
self.sandbox = None
self.batch_mode = False
Expand Down Expand Up @@ -332,7 +332,7 @@ def run(self, cmd, opts=[], args=[], stdout=sys.stdout, stderr=sys.stderr):
cmd = itertools.chain(cmd_prefix, ["ovn-sbctl"], opts, [cmd], args)
self.cmds.append(" ".join(cmd))

self.ssh.run("\n".join(self.cmds),
self.client.run("\n".join(self.cmds),
stdout=stdout, stderr=stderr)

self.cmds = None
Expand All @@ -355,7 +355,7 @@ def flush(self):
else:
run_cmds.append("sudo ovn-sbctl" + " ".join(self.cmds))

self.ssh.run("\n".join(run_cmds),
self.client.run("\n".join(run_cmds),
stdout=sys.stdout, stderr=sys.stderr)

self.cmds = None
Expand All @@ -368,13 +368,13 @@ def db_set(self, table, record, *col_values):

def count_igmp_flows(self, lswitch, network_prefix='239'):
stdout = StringIO()
self.ssh.run(
self.client.run(
"ovn-sbctl list datapath_binding | grep {sw} -B 1 | "
"grep uuid | cut -f 2 -d ':'".format(sw=lswitch),
stdout=stdout)
uuid = stdout.getvalue().rstrip()
stdout = StringIO()
self.ssh.run(
self.client.run(
"ovn-sbctl list logical_flow | grep 'dst == {nw}' -B 1 | "
"grep {uuid} -B 1 | wc -l".format(
uuid=uuid, nw=network_prefix),
Expand Down Expand Up @@ -405,6 +405,7 @@ def chassis_bound(self, chassis_name):
self.batch_mode = batch_mode
return len(stdout.getvalue().splitlines()) == 1


def create_client(self):
print("********* call OvnSbctl.create_client")

Expand All @@ -417,7 +418,7 @@ class OvsSsh(OvsClient):

class _OvsSsh(object):
def __init__(self, credential):
self.ssh = get_ssh_from_credential(credential)
self.client = get_client_connection(credential)
self.batch_mode = False
self.cmds = None

Expand All @@ -444,7 +445,7 @@ def run(self, cmd):
self.flush()

def run_immediate(self, cmd, stdout=sys.stdout, stderr=sys.stderr):
self.ssh.run(cmd, stdout)
self.client.run(cmd, stdout)

def flush(self):
if self.cmds == None:
Expand All @@ -453,7 +454,8 @@ def flush(self):
cmds = "\n".join(self.cmds)
self.cmds = None

self.ssh.run(cmds, stdout=sys.stdout, stderr=sys.stderr)
self.client.run(cmds, stdout=sys.stdout, stderr=sys.stderr)


def create_client(self):
print("********* call OvsSsh.create_client")
Expand All @@ -467,7 +469,7 @@ class OvsVsctl(OvsClient):
class _OvsVsctl(object):

def __init__(self, credential):
self.ssh = get_ssh_from_credential(credential)
self.client = get_client_connection(credential)
self.context = {}
self.batch_mode = False
self.sandbox = None
Expand Down Expand Up @@ -509,7 +511,7 @@ def run(self, cmd, opts=[], args=[], extras=[], stdout=sys.stdout, stderr=sys.st
if self.batch_mode:
return

self.ssh.run("\n".join(self.cmds), stdout=stdout, stderr=stderr)
self.client.run("\n".join(self.cmds), stdout=stdout, stderr=stderr)

self.cmds = None

Expand All @@ -521,7 +523,7 @@ def flush(self):
if self.install_method == "sandbox":
self.cmds.insert(0, ". %s/sandbox.rc" % self.sandbox)

self.ssh.run("\n".join(self.cmds),
self.client.run("\n".join(self.cmds),
stdout=sys.stdout, stderr=sys.stderr)

self.cmds = None
Expand All @@ -540,6 +542,7 @@ def db_set(self, table, record, *col_values):
args += set_colval_args(*col_values)
self.run("set", args=args)


def create_client(self):
print("********* call OvsVsctl.create_client")
client = self._OvsVsctl(self.credential)
Expand All @@ -553,7 +556,7 @@ class OvsOfctl(OvsClient):
class _OvsOfctl(object):

def __init__(self, credential):
self.ssh = get_ssh_from_credential(credential)
self.client = get_client_connection(credential)
self.context = {}
self.sandbox = None

Expand All @@ -577,7 +580,7 @@ def run(self, cmd, opts=[], args=[], stdout=sys.stdout, stderr=sys.stderr):
cmd_prefix = ["ovs-ofctl"]
cmd = itertools.chain(cmd_prefix, opts, [cmd], args)
cmds.append(" ".join(cmd))
self.ssh.run("\n".join(cmds),
self.client.run("\n".join(cmds),
stdout=stdout, stderr=stderr)

def dump_flows(self, bridge):
Expand All @@ -588,6 +591,7 @@ def dump_flows(self, bridge):
oflow_data = oflow_data.split('\n')
return len(oflow_data)


def create_client(self):
print("********* call OvsOfctl.create_client")
client = self._OvsOfctl(self.credential)
Expand Down
94 changes: 87 additions & 7 deletions rally_ovs/plugins/ovs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@

from rally.common import db

import socket
import selectors
import time

cidr_incr = utils.RAMInt()


'''
Find credential resource from DB by deployment uuid, and return
info as a dict.
Expand Down Expand Up @@ -147,9 +149,87 @@ def get_sandboxes(deploy_uuid, farm="", tag=""):
return sandboxes








class NCatError(Exception):
def __init__(self, details):
self.details = details


class NCatClient(object):
def __init__(self, server):
self.server = server
self.sock = socket.create_connection((server, 8000))
self.sel = selectors.DefaultSelector()
self.sel.register(self.sock, selectors.EVENT_READ)

def run(self, cmd, stdin=None, stdout=None, stderr=None,
raise_on_error=True, timeout=3600):
start = time.clock_gettime(time.CLOCK_MONOTONIC)
end = time.clock_gettime(time.CLOCK_MONOTONIC) + timeout
to = end - start
# We have to doctor the command a bit for three reasons:
# 1. We need to add a newline to ensure that the command
# gets sent to the server and doesn't just get put in
# the socket's write buffer.
# 2. We need to pipe stderr to stdout so that stderr gets
# returned over the client connection.
# 3. We need to add some marker text so our client knows
# that it has received all output from the command. This
# marker text let's us know if the command completed
# successfully or not.
good = "SUCCESS"
bad = "FAIL"
result = f"&& echo -n {good} || echo -n {bad}"
self.sock.send(f"({cmd}) 2>&1 {result}\n".encode('utf-8'))
out = ""
stream = None
error = False
while True:
events = self.sel.select(to)
for key, mask in events:
buf = key.fileobj.recv(4096).decode('utf-8')
if buf.endswith(good):
out += buf[:-len(good)]
stream = stdout
break
elif buf.endswith(bad):
out += buf[:-len(bad)]
# We assume that if the command errored, then everything
# that was output was stderr. This isn't necessarily
# accurate but it hopefully won't ruffle too many feathers.
stream = stderr
error = True
break
else:
out += buf
to = end - time.clock_gettime(time.CLOCK_MONOTONIC)

if stream is not None:
stream.write(out)

if error and raise_on_error:
details = (f"Error running command {cmd}\n"
f"Last stderr output is {out}\n")
raise NCatError(details)

def close(self):
# Test scenarios call close after every operation because with SSH,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my understanding this is not true for SSH. We don't close connections for every operation. SSH connection is cached for every farm node, which models a physical server. For each new operation, it doesn't recreate new SSH connection, but just reusing existing connection to the node. Connections are closed when the test is done and the test program exits.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I have an explanation for this :)

This patch was originally made against https://github.com/dceara/ovn-scale-test/tree/ovn-switch-per-node-devel . When I discussed with Dumitru about pushing this change, we agreed to open a PR here first and then we could port it to that particular branch after.

On that branch, we have added close() calls in a lot of places. We ran into issues where tests would fail after a while. We could no longer open new SSH connections because the servers would reject incoming connections due to having too many open. We added close() calls after basically every SSH command. My comment here makes sense in the context of how that branch operates, but here it's just wrong. I'll correct it.

By the way, if you're wondering why we are running into this problem of too many SSH connections, it likely has to do with our different understanding of farm nodes. As I said in another comment, we configure our tests to have multiple farm nodes per physical machine. So the number of SSH connections can pile up more quickly than if each farm node maps to a single physical machine.

Copy link
Collaborator

@dceara dceara Jun 3, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@putnopvut sorry for the confusion with the branches in my fork. Branch ovn-switch-per-node-devel is stale. What we use downstream is ovn-switch-per-node and in that branch I removed all the close() calls because we addressed the issues we had, i.e., use runner type serial instead of a single runner that would connect to all farm nodes. Like this only the connections used in one of the iteration of the runner will be open (and will get cleaned up when the iteration ends).

I'll try to open PRs for all the stuff that's only in my fork as soon as I get the chance. I hope that will clear things out.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explain @putnopvut and @dceara

# this is necessary to ensure that we do not open too many
# connections. Our ncat client cache is keyed on hostname rather than
# on the controller node. This means that we open far fewer connections
# than SSH does. Therefore, there is no reason to close connections
# as frequently as we do with SSH. We can afford to leave the
# connection open and reuse the clients instead. This is why we "pass"
# in this method.
pass


NCAT_CLIENT_CACHE = {}


def get_client_connection(cred):
try:
global NCAT_CLIENT_CACHE
server = cred["host"]
return NCAT_CLIENT_CACHE.setdefault(server, NCatClient(server))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add a option for selecting client? It's better to keep ssh client as default instead of a fallback. There could be some security concerns about using plain text client in corp network.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Absolutely, I can do that.

except socket.error:
return get_ssh_from_credential(cred)