Merge pull request #1781 from ceph/ceph-volume-tests-cli

ceph-volume executable tests and testinfra API update
Andrew Schoen 2017-08-22 10:50:32 -05:00 committed by GitHub
commit d5c0ff35ac
6 changed files with 75 additions and 58 deletions

View File

@ -3,25 +3,29 @@ import os
def node(Ansible, Interface, Command, request):
def node(host, request):
This fixture represents a single node in the ceph cluster. Using the
Ansible fixture provided by testinfra it can access all the ansible variables
host.ansible fixture provided by testinfra it can access all the ansible variables
provided to it by the specific test scenario being ran.
You must include this fixture on any tests that operate on specific type of node
because it contains the logic to manage which tests a node should run.
ansible_vars = Ansible.get_variables()
ansible_vars = host.ansible.get_variables()
# tox will pass in this environment variable. we need to do it this way
# because testinfra does not collect and provide ansible config passed in
# from using --extra-vars
ceph_stable_release = os.environ.get("CEPH_STABLE_RELEASE", "kraken")
node_type = ansible_vars["group_names"][0]
docker = ansible_vars.get("docker")
lvm_scenario = ansible_vars.get("osd_scenario") == 'lvm'
if not request.node.get_marker(node_type) and not request.node.get_marker('all'):
pytest.skip("Not a valid test for node type: %s" % node_type)
if not lvm_scenario and request.node.get_marker("lvm_scenario"):
pytest.skip("Not a valid test for non-lvm scenarios")
if request.node.get_marker("no_docker") and docker:
pytest.skip("Not a valid test for containerized deployments or atomic hosts")
@ -40,7 +44,7 @@ def node(Ansible, Interface, Command, request):
cluster_address = ""
# I can assume eth1 because I know all the vagrant
# boxes we test with use that interface
address = Interface("eth1").addresses[0]
address = host.interface("eth1").addresses[0]
subnet = ".".join(ansible_vars["public_network"].split(".")[0:-1])
num_mons = len(ansible_vars["groups"]["mons"])
num_devices = len(ansible_vars.get("devices", []))
@ -54,8 +58,8 @@ def node(Ansible, Interface, Command, request):
# I can assume eth2 because I know all the vagrant
# boxes we test with use that interface. OSDs are the only
# nodes that have this interface.
cluster_address = Interface("eth2").addresses[0]
cmd = Command('sudo ls /var/lib/ceph/osd/ | sed "s/.*-//"')
cluster_address = host.interface("eth2").addresses[0]
cmd ='sudo ls /var/lib/ceph/osd/ | sed "s/.*-//"')
if cmd.rc == 0:
osd_ids = cmd.stdout.rstrip("\n").split("\n")
osds = osd_ids

View File

@ -4,46 +4,46 @@ import pytest
class TestMons(object):
def test_ceph_mon_package_is_installed(self, node, Package):
assert Package("ceph-mon").is_installed
def test_ceph_mon_package_is_installed(self, node, host):
assert host.package("ceph-mon").is_installed
def test_mon_listens_on_6789(self, node, Socket):
assert Socket("tcp://%s:6789" % node["address"]).is_listening
def test_mon_listens_on_6789(self, node, host):
assert host.socket("tcp://%s:6789" % node["address"]).is_listening
def test_mon_service_is_running(self, node, Service):
def test_mon_service_is_running(self, node, host):
service_name = "ceph-mon@ceph-{hostname}".format(
assert Service(service_name).is_running
assert host.service(service_name).is_running
def test_mon_service_is_enabled(self, node, Service):
def test_mon_service_is_enabled(self, node, host):
service_name = "ceph-mon@ceph-{hostname}".format(
assert Service(service_name).is_enabled
assert host.service(service_name).is_enabled
def test_can_get_cluster_health(self, node, Command):
def test_can_get_cluster_health(self, node, host):
cmd = "sudo ceph --cluster={} --connect-timeout 5 -s".format(node["cluster_name"])
output = Command.check_output(cmd)
output = host.check_output(cmd)
assert output.strip().startswith("cluster")
class TestOSDs(object):
def test_all_osds_are_up_and_in(self, node, Command):
def test_all_osds_are_up_and_in(self, node, host):
cmd = "sudo ceph --cluster={} --connect-timeout 5 -s".format(node["cluster_name"])
output = Command.check_output(cmd)
output = host.check_output(cmd)
phrase = "{num_osds} osds: {num_osds} up, {num_osds} in".format(num_osds=node["total_osds"])
assert phrase in output
def test_all_docker_osds_are_up_and_in(self, node, Command):
def test_all_docker_osds_are_up_and_in(self, node, host):
cmd = "sudo docker exec ceph-mon-ceph-{} ceph --cluster={} --connect-timeout 5 -s".format(
output = Command.check_output(cmd)
output = host.check_output(cmd)
phrase = "{num_osds} osds: {num_osds} up, {num_osds} in".format(num_osds=node["total_osds"])
assert phrase in output

View File

@ -1,7 +1,7 @@
class TestOSD(object):
def test_osds_are_all_collocated(self, node, Command):
def test_osds_are_all_collocated(self, node, host):
# TODO: figure out way to paramaterize node['vars']['devices'] for this test
for device in node["vars"]["devices"]:
assert Command.check_output("sudo blkid -s PARTLABEL -o value %s2" % device) in ["ceph journal", "ceph block"]
assert host.check_output("sudo blkid -s PARTLABEL -o value %s2" % device) in ["ceph journal", "ceph block"]

View File

@ -4,30 +4,43 @@ import pytest
class TestOSDs(object):
def test_ceph_osd_package_is_installed(self, node, Package):
assert Package("ceph-osd").is_installed
def test_ceph_osd_package_is_installed(self, node, host):
assert host.package("ceph-osd").is_installed
def test_osds_listen_on_public_network(self, node, Command):
def test_osds_listen_on_public_network(self, node, host):
# TODO: figure out way to paramaterize this test
nb_port = (node["num_devices"] * 2)
assert Command.check_output("netstat -lntp | grep ceph-osd | grep %s | wc -l" % (node["address"])) == str(nb_port)
assert host.check_output("netstat -lntp | grep ceph-osd | grep %s | wc -l" % (node["address"])) == str(nb_port)
def test_osds_listen_on_cluster_network(self, node, Command):
def test_osds_listen_on_cluster_network(self, node, host):
# TODO: figure out way to paramaterize this test
nb_port = (node["num_devices"] * 2)
assert Command.check_output("netstat -lntp | grep ceph-osd | grep %s | wc -l" % (node["cluster_address"])) == str(nb_port)
assert host.check_output("netstat -lntp | grep ceph-osd | grep %s | wc -l" % (node["cluster_address"])) == str(nb_port)
def test_osd_services_are_running(self, node, Service):
def test_osd_services_are_running(self, node, host):
# TODO: figure out way to paramaterize node['osds'] for this test
for osd in node["osds"]:
assert Service("ceph-osd@%s" % osd).is_running
assert host.service("ceph-osd@%s" % osd).is_running
def test_osd_services_are_running(self, node, host):
# TODO: figure out way to paramaterize node['osds'] for this test
for osd in node["osds"]:
assert host.service("ceph-osd@%s" % osd).is_running
def test_osd_are_mounted(self, node, MountPoint):
def test_osd_are_mounted(self, node, host):
# TODO: figure out way to paramaterize node['osd_ids'] for this test
for osd_id in node["osd_ids"]:
osd_path = "/var/lib/ceph/osd/{cluster}-{osd_id}".format(
assert MountPoint(osd_path).exists
assert host.mount_point(osd_path).exists
def test_ceph_volume_is_installed(self, node, host):
def test_ceph_volume_systemd_is_installed(self, node, host):

View File

@ -5,24 +5,24 @@ import json
class TestRGWs(object):
def test_rgw_bucket_default_quota_is_set(self, node, File):
assert File(node["conf_path"]).contains("rgw override bucket index max shards")
assert File(node["conf_path"]).contains("rgw bucket default quota max objects")
def test_rgw_bucket_default_quota_is_set(self, node, host):
assert host.file(node["conf_path"]).contains("rgw override bucket index max shards")
assert host.file(node["conf_path"]).contains("rgw bucket default quota max objects")
def test_rgw_bucket_default_quota_is_applied(self, node, Command):
def test_rgw_bucket_default_quota_is_applied(self, node, host):
radosgw_admin_cmd = "sudo radosgw-admin --cluster={} user create --uid=test --display-name Test".format(node["cluster_name"])
radosgw_admin_output = Command.check_output(radosgw_admin_cmd)
radosgw_admin_output = host.check_output(radosgw_admin_cmd)
radosgw_admin_output_json = json.loads(radosgw_admin_output)
assert radosgw_admin_output_json["bucket_quota"]["enabled"] == True
assert radosgw_admin_output_json["bucket_quota"]["max_objects"] == 1638400
def test_rgw_tuning_pools_are_set(self, node, Command):
def test_rgw_tuning_pools_are_set(self, node, host):
cmd = "sudo ceph --cluster={} --connect-timeout 5 osd dump".format(node["cluster_name"])
output = Command.check_output(cmd)
output = host.check_output(cmd)
pools = node["vars"]["create_pools"]
for pool_name, pg_num in pools.items():
assert pool_name in output
pg_num_str = "pg_num {pg_num}".format(pg_num = pg_num["pg_num"])
pg_num_str = "pg_num {pg_num}".format(pg_num=pg_num["pg_num"])
assert pg_num_str in output

View File

@ -3,40 +3,40 @@ import pytest
class TestInstall(object):
def test_ceph_dir_exists(self, File, node):
assert File('/etc/ceph').exists
def test_ceph_dir_exists(self, host, node):
assert host.file('/etc/ceph').exists
def test_ceph_dir_is_a_directory(self, File, node):
assert File('/etc/ceph').is_directory
def test_ceph_dir_is_a_directory(self, host, node):
assert host.file('/etc/ceph').is_directory
def test_ceph_conf_exists(self, File, node):
assert File(node["conf_path"]).exists
def test_ceph_conf_exists(self, host, node):
assert host.file(node["conf_path"]).exists
def test_ceph_conf_is_a_file(self, File, node):
assert File(node["conf_path"]).is_file
def test_ceph_conf_is_a_file(self, host, node):
assert host.file(node["conf_path"]).is_file
def test_ceph_command_exists(self, Command, node):
assert Command.exists("ceph")
def test_ceph_command_exists(self, host, node):
assert host.exists("ceph")
class TestCephConf(object):
def test_ceph_config_has_inital_members_line(self, node, File):
assert File(node["conf_path"]).contains("^mon initial members = .*$")
def test_ceph_config_has_inital_members_line(self, node, host):
assert host.file(node["conf_path"]).contains("^mon initial members = .*$")
def test_initial_members_line_has_correct_value(self, node, File):
def test_initial_members_line_has_correct_value(self, node, host):
mons = ",".join("ceph-%s" % host
for host in node["vars"]["groups"]["mons"])
line = "mon initial members = {}".format(mons)
assert File(node["conf_path"]).contains(line)
assert host.file(node["conf_path"]).contains(line)
def test_ceph_config_has_mon_host_line(self, node, File):
assert File(node["conf_path"]).contains("^mon host = .*$")
def test_ceph_config_has_mon_host_line(self, node, host):
assert host.file(node["conf_path"]).contains("^mon host = .*$")
def test_mon_host_line_has_correct_value(self, node, File):
def test_mon_host_line_has_correct_value(self, node, host):
mon_ips = []
for x in range(0, node["num_mons"]):
mon_ips.append("{}.1{}".format(node["subnet"], x))
line = "mon host = {}".format(",".join(mon_ips))
assert File(node["conf_path"]).contains(line)
assert host.file(node["conf_path"]).contains(line)