Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cephfs support #66

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 169 additions & 0 deletions benchmark/cephfsfio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
import subprocess
import common
import settings
import monitoring
import os
import time
import logging

from benchmark import Benchmark

logger = logging.getLogger("cbt")

class CephFsFio(Benchmark):
    """Run fio against a kernel-mounted CephFS filesystem.

    Creates dedicated data/metadata pools, builds a CephFS ('testfs'),
    kernel-mounts it on every client node, pre-populates the test files,
    and then drives fio in the configured mode while the usual cbt
    monitoring (collectl etc.) runs.
    """

    def __init__(self, cluster, config):
        super(CephFsFio, self).__init__(cluster, config)

        # FIXME there are too many permutations, need to put results in SQLITE3
        self.cmd_path = config.get('cmd_path', '/usr/bin/fio')
        self.pool_profile = config.get('pool_profile', 'default')

        # Mon address(es) + path used as the source of the kernel mount,
        # e.g. 'mon1,mon2,mon3:/'.
        self.monaddr_mountpoint = config.get('monaddr_mountpoint')
        self.concurrent_procs = config.get('concurrent_procs', 1)
        self.total_procs = self.concurrent_procs * len(settings.getnodes('clients').split(','))

        # 'time'/'ramp' are optional; only stringify them when present.
        # (str(None) would yield the string "None", which is not None, so the
        # later `is not None` checks in run() would always pass and fio would
        # be invoked with '--runtime=None'.)
        self.time = config.get('time', None)
        if self.time is not None:
            self.time = str(self.time)
        self.ramp = config.get('ramp', None)
        if self.ramp is not None:
            self.ramp = str(self.ramp)

        self.iodepth = config.get('iodepth', 16)
        self.numjobs = config.get('numjobs', 1)
        self.end_fsync = str(config.get('end_fsync', 0))
        self.mode = config.get('mode', 'write')
        self.rwmixread = config.get('rwmixread', 50)
        self.rwmixwrite = 100 - self.rwmixread
        self.log_avg_msec = config.get('log_avg_msec', None)
        self.ioengine = config.get('ioengine', 'libaio')
        self.op_size = config.get('op_size', 4194304)
        self.vol_size = config.get('vol_size', 65536)
        self.vol_order = config.get('vol_order', 22)
        self.random_distribution = config.get('random_distribution', None)
        self.rbdadd_mons = config.get('rbdadd_mons')
        self.rbdadd_options = config.get('rbdadd_options', 'share')
        self.client_ra = config.get('client_ra', 128)
        self.datapoolname = "cbt-kernelcephfsfiodata"
        self.metadatapoolname = "cbt-kernelcephfsfiometadata"

        self.run_dir = '%s/cephfsfio/osd_ra-%08d/client_ra-%08d/op_size-%08d/concurrent_procs-%03d/iodepth-%03d/%s' % (self.run_dir, int(self.osd_ra), int(self.client_ra), int(self.op_size), int(self.concurrent_procs), int(self.iodepth), self.mode)
        self.out_dir = '%s/cephfsfio/osd_ra-%08d/client_ra-%08d/op_size-%08d/concurrent_procs-%03d/iodepth-%03d/%s' % (self.archive_dir, int(self.osd_ra), int(self.client_ra), int(self.op_size), int(self.concurrent_procs), int(self.iodepth), self.mode)

        # Build the fio --name arguments: one file per concurrent process,
        # under each client's private mount directory.
        self.names = ''
        for i in xrange(self.concurrent_procs):
            self.names += '--name=%s/cbt-kernelcephfsfio-`hostname -s`/cbt-kernelcephfsfio-%d ' % (self.cluster.mnt_dir, i)

    def exists(self):
        """Return True (and skip the run) if results already exist in out_dir."""
        if os.path.exists(self.out_dir):
            logger.info('Skipping existing test in %s.', self.out_dir)
            return True
        return False

    def initialize(self):
        """Wait for scrub, record idle stats, create the fs, and lay out files."""
        super(CephFsFio, self).initialize()

        logger.info('Running scrub monitoring.')
        monitoring.start("%s/scrub_monitoring" % self.run_dir)
        self.cluster.check_scrub()
        monitoring.stop()

        logger.info('Pausing for 60s for idle monitoring.')
        monitoring.start("%s/idle_monitoring" % self.run_dir)
        time.sleep(60)
        monitoring.stop()

        common.sync_files('%s/*' % self.run_dir, self.out_dir)

        self.mkimages()

        # Create the run directory
        common.make_remote_dir(self.run_dir)

        # Pre-populate the fio files with a sequential write pass so that
        # read tests have real data to read.
        logger.info('Attempting to populate fio files...')
        pre_cmd = 'sudo %s --ioengine=%s --rw=write --numjobs=%s --bs=4M --size %dM %s > /dev/null' % (self.cmd_path, self.ioengine, self.numjobs, self.vol_size*0.9, self.names)
        common.pdsh(settings.getnodes('clients'), pre_cmd).communicate()

        return True


    def run(self):
        """Assemble the fio command line and run it on all clients."""
        super(CephFsFio, self).run()

        # Set client readahead
        #self.set_client_param('read_ahead_kb', self.client_ra)

        # We'll always drop caches for rados bench
        self.dropcaches()

        monitoring.start(self.run_dir)

        # Run the backfill testing thread if requested
        if 'recovery_test' in self.cluster.config:
            recovery_callback = self.recovery_callback
            self.cluster.create_recovery_test(self.run_dir, recovery_callback)

        time.sleep(5)
        out_file = '%s/output' % self.run_dir
        fio_cmd = 'sudo %s' % (self.cmd_path_full)
        fio_cmd += ' --rw=%s' % self.mode
        if (self.mode == 'readwrite' or self.mode == 'randrw'):
            fio_cmd += ' --rwmixread=%s --rwmixwrite=%s' % (self.rwmixread, self.rwmixwrite)
        fio_cmd += ' --ioengine=%s' % self.ioengine
        if self.time is not None:
            fio_cmd += ' --runtime=%s' % self.time
        if self.ramp is not None:
            fio_cmd += ' --ramp_time=%s' % self.ramp
        fio_cmd += ' --numjobs=%s' % self.numjobs
        fio_cmd += ' --direct=1'
        fio_cmd += ' --bs=%dB' % self.op_size
        fio_cmd += ' --iodepth=%d' % self.iodepth
        if self.vol_size:
            # Cap the per-file size at 90% of the volume size.
            fio_cmd += ' --size=%dM' % (int(self.vol_size) * 0.9)
        fio_cmd += ' --write_iops_log=%s' % out_file
        fio_cmd += ' --write_bw_log=%s' % out_file
        fio_cmd += ' --write_lat_log=%s' % out_file
        if 'recovery_test' in self.cluster.config:
            fio_cmd += ' --time_based'
        if self.random_distribution is not None:
            fio_cmd += ' --random_distribution=%s' % self.random_distribution
        if self.log_avg_msec is not None:
            fio_cmd += ' --log_avg_msec=%s' % self.log_avg_msec
        # The file list and output redirect go last so all fio options
        # precede them on the command line.
        fio_cmd += ' %s > %s' % (self.names, out_file)
        logger.info('Running cephfs fio %s test.', self.mode)
        common.pdsh(settings.getnodes('clients'), fio_cmd).communicate()

        # If we were doing recovery, wait until it's done.
        if 'recovery_test' in self.cluster.config:
            self.cluster.wait_recovery_done()

        monitoring.stop(self.run_dir)

        # Finally, get the historic ops
        self.cluster.dump_historic_ops(self.run_dir)
        common.sync_files('%s/*' % self.run_dir, self.out_dir)

    def cleanup(self):
        """Unmount the per-client CephFS mount after the run."""
        super(CephFsFio, self).cleanup()
        common.pdsh(settings.getnodes('clients'), 'sudo umount %s/cbt-kernelcephfsfio-`hostname -s`' % self.cluster.mnt_dir).communicate()

    #def set_client_param(self, param, value):
        #Not needed because this is no longer a rbd benchmark
        #common.pdsh(settings.getnodes('clients'), 'find /sys/block/rbd* -exec sudo sh -c "echo %s > {}/queue/%s" \;' % (value, param)).communicate()

    def __str__(self):
        return "%s\n%s\n%s" % (self.run_dir, self.out_dir, super(CephFsFio, self).__str__())

    def mkimages(self):
        """Create the data/metadata pools, the CephFS, and mount it on clients."""
        monitoring.start("%s/pool_monitoring" % self.run_dir)
        self.cluster.rmpool(self.datapoolname, self.pool_profile)
        self.cluster.rmpool(self.metadatapoolname, self.pool_profile)
        self.cluster.mkpool(self.datapoolname, self.pool_profile)
        self.cluster.mkpool(self.metadatapoolname, self.pool_profile)
        # Extract the admin secret from the keyring ('key: <secret>' output).
        stdout, self.adminkeyerror = common.pdsh(settings.getnodes('head'), 'ceph-authtool /tmp/cbt/ceph/keyring -p').communicate()
        self.adminkey = stdout.split(':')[1]
        self.adminkey = self.adminkey.strip()
        common.pdsh(settings.getnodes('head'), 'ceph -c /tmp/cbt/ceph/ceph.conf fs new testfs %s %s' % (self.metadatapoolname, self.datapoolname)).communicate()
        common.pdsh(settings.getnodes('clients'), 'sudo mkdir -p -m0755 -- %s/cbt-kernelcephfsfio-`hostname -s`' % self.cluster.mnt_dir).communicate()
        common.pdsh(settings.getnodes('clients'), 'sudo mount -t ceph %s %s/cbt-kernelcephfsfio-`hostname -s` -o name=admin,secret=%s' % (self.monaddr_mountpoint, self.cluster.mnt_dir, self.adminkey)).communicate()
        monitoring.stop()

    def recovery_callback(self):
        """Kill fio on the clients when a recovery test interrupts the run."""
        common.pdsh(settings.getnodes('clients'), 'sudo killall -9 fio').communicate()
12 changes: 12 additions & 0 deletions benchmark/cosbench.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ def __init__(self, cluster, config):
self.objects = config["objects_max"]
self.mode = config["mode"]
self.user = settings.cluster.get('user')
self.rgw = settings.cluster.get('rgws')[0]
self.use_existing = settings.cluster.get('use_existing')

self.run_dir = '%s/osd_ra-%08d/op_size-%s/concurrent_procs-%03d/containers-%05d/objects-%05d/%s' % (self.run_dir, int(self.osd_ra), self.op_size, int(self.total_procs), int(self.containers),int(self.objects), self.mode)
self.out_dir = '%s/osd_ra-%08d/op_size-%s/concurrent_procs-%03d/containers-%05d/objects-%05d/%s' % (self.archive_dir, int(self.osd_ra), self.op_size, int(self.total_procs), int(self.containers),int(self.objects), self.mode)
Expand All @@ -46,7 +48,17 @@ def prerun_check(self):
pass
logger.debug("%s", cosconf)
if "username" in cosconf and "password" in cosconf and "url" in cosconf:

if not self.use_existing:
user, subuser = cosconf["username"].split(':')
stdout, stderr = common.pdsh("%s@%s" % (self.user, self.rgw),"radosgw-admin user create --uid='%s' --display-name='%s'" % (user, user)).communicate()
stdout, stderr = common.pdsh("%s@%s" % (self.user, self.rgw),"radosgw-admin subuser create --uid=%s --subuser=%s --access=full" % (user, cosconf["username"])).communicate()
stdout, stderr = common.pdsh("%s@%s" % (self.user, self.rgw),"radosgw-admin key create --uid=%s --subuser=%s --key-type=swift" % (user, cosconf["username"])).communicate()
stdout, stderr = common.pdsh("%s@%s" % (self.user, self.rgw),"radosgw-admin user modify --uid=%s --max-buckets=100000" % (user)).communicate()
stdout, stderr = common.pdsh("%s@%s" % (self.user, self.rgw),"radosgw-admin subuser modify --uid=%s --subuser=%s --secret=%s --key-type=swift" % (user, cosconf["username"], cosconf["password"])).communicate()

stdout, stderr = common.pdsh("%s@%s" % (self.user, self.config["controller"]),"curl -D - -H 'X-Auth-User: %s' -H 'X-Auth-Key: %s' %s" % (cosconf["username"], cosconf["password"], cosconf["url"])).communicate()

else:
logger.error("Auth Configuration in Yaml file is not in correct format")
sys.exit()
Expand Down
3 changes: 3 additions & 0 deletions benchmarkfactory.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from benchmark.nullbench import Nullbench
from benchmark.cosbench import Cosbench
from benchmark.cephtestrados import CephTestRados
from benchmark.cephfsfio import CephFsFio


def get_all(cluster, iteration):
Expand Down Expand Up @@ -59,3 +60,5 @@ def get_object(cluster, benchmark, bconfig):
return Cosbench(cluster, bconfig)
if benchmark == 'cephtestrados':
return CephTestRados(cluster, bconfig)
if benchmark == 'cephfsfio':
return CephFsFio(cluster, bconfig)
39 changes: 39 additions & 0 deletions cluster/ceph.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ def __init__(self, config):
self.ceph_mon_cmd = config.get('ceph-mon_cmd', '/usr/bin/ceph-mon')
self.ceph_run_cmd = config.get('ceph-run_cmd', '/usr/bin/ceph-run')
self.ceph_rgw_cmd = config.get('ceph-rgw_cmd', '/usr/bin/radosgw')
self.ceph_mds_cmd = config.get('ceph-mds_cmd', '/usr/bin/ceph-mds')
self.log_dir = config.get('log_dir', "%s/log" % self.tmp_dir)
self.pid_dir = config.get('pid_dir', "%s/pid" % self.tmp_dir)
self.core_dir = config.get('core_dir', "%s/core" % self.tmp_dir)
Expand All @@ -39,6 +40,7 @@ def __init__(self, config):
self.osd_valgrind = config.get('osd_valgrind', None)
self.mon_valgrind = config.get('mon_valgrind', None)
self.rgw_valgrind = config.get('rgw_valgrind', None)
self.mds_valgrind = config.get('mds_valgrind', None)
self.tiering = config.get('tiering', False)
self.ruleset_map = {}
self.cur_ruleset = 1
Expand Down Expand Up @@ -82,6 +84,7 @@ def initialize(self):
self.make_mons()
self.make_osds()
self.start_rgw()
self.make_mdss()
monitoring.stop()

# Check Health
Expand Down Expand Up @@ -210,6 +213,42 @@ def make_mons(self):
cmd = '%s %s' % (self.ceph_run_cmd, cmd)
common.pdsh(monhost, 'sudo %s' % cmd).communicate()

def make_mdss(self):
    """Create mds keyrings and data dirs, then start a ceph-mds on each mds host.

    Mirrors make_mons()/make_osds(): builds the keyring on the head node,
    distributes it to all daemon nodes, creates a clean per-daemon data
    directory, and launches each ceph-mds (optionally under valgrind).
    """
    # Build and distribute the keyring
    common.pdsh(settings.getnodes('head'), 'ceph-authtool --create-keyring --gen-key --name=mds. %s --cap mds \'allow *\'' % self.keyring_fn).communicate()
    common.pdsh(settings.getnodes('head'), 'ceph-authtool --gen-key --name=client.admin --set-uid=0 --cap mon \'allow *\' --cap osd \'allow *\' --cap mds allow %s' % self.keyring_fn).communicate()
    common.rscp(settings.getnodes('head'), self.keyring_fn, '%s.tmp' % self.keyring_fn).communicate()
    # The settings key for mds nodes is 'mdss' (not 'mds'); see the cluster
    # yaml. Using 'mds' here would skip the mds hosts during distribution.
    common.pdcp(settings.getnodes('mons', 'osds', 'rgws', 'mdss'), '', '%s.tmp' % self.keyring_fn, self.keyring_fn).communicate()

    # Mapping of host -> {mds id: addr}, straight from the cluster yaml.
    mdshosts = settings.cluster.get('mdss')
    logger.info(mdshosts)

    user = settings.cluster.get('user')

    # Create a clean data directory (with the keyring) for every mds daemon.
    for mdshost, mdss in mdshosts.iteritems():
        if user:
            mdshost = '%s@%s' % (user, mdshost)
        for mds, addr in mdss.iteritems():
            common.pdsh(mdshost, 'sudo rm -rf %s/mds.%s' % (self.tmp_dir, mds)).communicate()
            common.pdsh(mdshost, 'mkdir -p %s/mds.%s' % (self.tmp_dir, mds)).communicate()
            common.pdsh(mdshost, 'cp %s %s/mds.%s/keyring' % (self.keyring_fn, self.tmp_dir, mds)).communicate()

    # Start the mdss
    for mdshost, mdss in mdshosts.iteritems():
        if user:
            mdshost = '%s@%s' % (user, mdshost)
        for mds, addr in mdss.iteritems():
            # Key the pid file on the mds id, not the host: at this point
            # mdshost already carries the 'user@' prefix, and a host running
            # more than one mds daemon would otherwise clobber its own pidfile.
            pidfile = "%s/mds.%s.pid" % (self.pid_dir, mds)
            cmd = 'sudo sh -c "ulimit -n 16384 && ulimit -c unlimited && exec %s -c %s -i %s --keyring=%s --pid-file=%s"' % (self.ceph_mds_cmd, self.tmp_conf, mds, self.keyring_fn, pidfile)
            if self.mds_valgrind:
                # Name the valgrind instance per daemon as well.
                cmd = "%s %s" % (common.setup_valgrind(self.mds_valgrind, 'mds.%s' % mds, self.tmp_dir), cmd)
            else:
                cmd = '%s %s' % (self.ceph_run_cmd, cmd)
            common.pdsh(mdshost, 'sudo %s' % cmd).communicate()


def make_osds(self):
osdnum = 0
osdhosts = settings.cluster.get('osds')
Expand Down
63 changes: 63 additions & 0 deletions example/cephfsfio/test5.ceph.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
[global]
osd pool default size = 1
auth cluster required = none
auth service required = none
auth client required = none
keyring = /tmp/cbt/ceph/keyring
osd pg bits = 8
osd pgp bits = 8
log to syslog = false
log file = /tmp/cbt/ceph/log/$name.log
public network = 192.168.110.0/24
cluster network = 192.168.110.0/24
rbd cache = true
osd scrub load threshold = 0.01
osd scrub min interval = 137438953472
osd scrub max interval = 137438953472
osd deep scrub interval = 137438953472
osd max scrubs = 16
filestore merge threshold = 40
filestore split multiple = 8
osd op threads = 8
mon pg warn max object skew = 100000
mon pg warn min per osd = 0
mon pg warn max per osd = 32768

[mon]
mon data = /tmp/cbt/ceph/mon.$id

[mon.a]
host = inf1
mon addr = 192.168.110.51:6789

[mon.b]
host = inf2
mon addr = 192.168.110.52:6789

[mon.c]
host = inf3
mon addr = 192.168.110.53:6789

[mds.a]
host = inf1

[mds.b]
host = inf2

[mds.c]
host = inf3

[osd.0]
host = inf1
osd data = /tmp/cbt/mnt/osd-device-0-data
osd journal = /dev/disk/by-partlabel/osd-device-0-journal

[osd.1]
host = inf2
osd data = /tmp/cbt/mnt/osd-device-0-data
osd journal = /dev/disk/by-partlabel/osd-device-0-journal

[osd.2]
host = inf3
osd data = /tmp/cbt/mnt/osd-device-0-data
osd journal = /dev/disk/by-partlabel/osd-device-0-journal
49 changes: 49 additions & 0 deletions example/cephfsfio/test5.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
cluster:
user: 'cbt'
head: "cadmin"
clients: ["cadmin"]
osds: ["inf1", "inf2", "inf3"]
mons:
inf1:
a: "192.168.110.51:6789"
inf2:
b: "192.168.110.52:6789"
inf3:
c: "192.168.110.53:6789"
mdss:
inf1:
a: "192.168.110.51"
inf2:
b: "192.168.110.52"
inf3:
c: "192.168.110.53"
rgws: ["inf1", "inf2", "inf3"]
osds_per_node: 1
fs: 'xfs'
mkfs_opts: '-f -i size=2048 -n size=64k'
mount_opts: '-o inode64,noatime,logbsize=256k'
conf_file: '/home/cbt/cbt/runs/test5.ceph.conf'
iterations: 1
#rebuild_every_test: False
use_existing: False
clusterid: "ceph"
tmp_dir: "/tmp/cbt"
pool_profiles:
rbd:
pg_size: 64
pgp_size: 64
replication: 2
benchmarks:
cephfsfio:
monaddr_mountpoint: '192.168.110.51,192.168.110.52,192.168.110.53:/'
time: 10
vol_size: 1024
mode: ['read', 'write']
rwmixread: 50
op_size: [4096]
concurrent_procs: [1]
iodepth: [32]
osd_ra: [4096]
cmd_path: '/usr/local/bin/fio'
pool_profile: 'rbd'
log_avg_msec: 100
Loading