ceph_key: support using different keyring

Currently the `ceph_key` module doesn't support using a different
keyring than `client.admin`.
This commit adds the possibility to use a different keyring.

Usage:
```
      ceph_key:
        name: "client.rgw.myrgw-node.rgw123"
        cluster: "ceph"
        user: "client.bootstrap-rgw"
        user_key: /var/lib/ceph/bootstrap-rgw/ceph.keyring
        dest: "/var/lib/ceph/radosgw/ceph-rgw.myrgw-node.rgw123/keyring"
        caps:
          osd: 'allow rwx'
          mon: 'allow rw'
          import_key: False
        owner: "ceph"
        group: "ceph"
        mode: "0400"
```

Where:
`user` corresponds to `-n (--name)`
`user_key` corresponds to `-k (--keyring)`

Signed-off-by: Guillaume Abrioux <gabrioux@redhat.com>
pull/5783/head
Guillaume Abrioux 2020-10-03 06:56:06 +02:00
parent a802fa2810
commit 12e6260266
2 changed files with 103 additions and 76 deletions

View File

@ -47,6 +47,16 @@ options:
description: description:
- name of the CephX key - name of the CephX key
required: true required: true
user:
description:
- entity used to perform operation.
It corresponds to the -n option (--name)
required: false
user_key:
description:
- the path to the keyring corresponding to the
user being used.
It corresponds to the -k option (--keyring)
state: state:
description: description:
- If 'present' is used, the module creates a keyring - If 'present' is used, the module creates a keyring
@ -131,6 +141,8 @@ caps:
- name: create cephx key - name: create cephx key
ceph_key: ceph_key:
name: "{{ keys_to_create }}" name: "{{ keys_to_create }}"
user: client.bootstrap-rgw
user_key: /var/lib/ceph/bootstrap-rgw/ceph.keyring
state: present state: present
caps: "{{ caps }}" caps: "{{ caps }}"
@ -244,24 +256,26 @@ def generate_secret():
return secret return secret
def generate_caps(cmd, _type, caps): def generate_caps(_type, caps):
''' '''
Generate CephX capabilities list Generate CephX capabilities list
''' '''
caps_cli = []
for k, v in caps.items(): for k, v in caps.items():
# makes sure someone didn't pass an empty var, # makes sure someone didn't pass an empty var,
# we don't want to add an empty cap # we don't want to add an empty cap
if len(k) == 0: if len(k) == 0:
continue continue
if _type == "ceph-authtool": if _type == "ceph-authtool":
cmd.extend(["--cap"]) caps_cli.extend(["--cap"])
cmd.extend([k, v]) caps_cli.extend([k, v])
return cmd return caps_cli
def generate_ceph_cmd(cluster, args, user, user_key, container_image=None): def generate_ceph_cmd(cluster, args, user, user_key_path, container_image=None):
''' '''
Generate 'ceph' command line to execute Generate 'ceph' command line to execute
''' '''
@ -278,7 +292,7 @@ def generate_ceph_cmd(cluster, args, user, user_key, container_image=None):
'-n', '-n',
user, user,
'-k', '-k',
user_key, user_key_path,
'--cluster', '--cluster',
cluster, cluster,
'auth', 'auth',
@ -312,40 +326,38 @@ def generate_ceph_authtool_cmd(cluster, name, secret, caps, dest, container_imag
] ]
cmd.extend(base_cmd) cmd.extend(base_cmd)
cmd = generate_caps(cmd, "ceph-authtool", caps) cmd.extend(generate_caps("ceph-authtool", caps))
return cmd return cmd
def create_key(module, result, cluster, name, secret, caps, import_key, dest, container_image=None): # noqa E501 def create_key(module, result, cluster, user, user_key_path, name, secret, caps, import_key, dest, container_image=None): # noqa E501
''' '''
Create a CephX key Create a CephX key
''' '''
args = [
'import',
'-i',
dest,
]
cmd_list = [] cmd_list = []
if not secret: if not secret:
secret = generate_secret() secret = generate_secret()
if user == 'client.admin':
args = ['import', '-i', dest]
else:
args = ['get-or-create', name]
args.extend(generate_caps(None, caps))
args.extend(['-o', dest])
cmd_list.append(generate_ceph_authtool_cmd( cmd_list.append(generate_ceph_authtool_cmd(
cluster, name, secret, caps, dest, container_image)) cluster, name, secret, caps, dest, container_image))
if import_key: if import_key or user != 'client.admin':
user = "client.admin"
keyring_filename = cluster + "." + user + ".keyring"
user_key = os.path.join("/etc/ceph/", keyring_filename)
cmd_list.append(generate_ceph_cmd( cmd_list.append(generate_ceph_cmd(
cluster, args, user, user_key, container_image)) cluster, args, user, user_key_path, container_image))
return cmd_list return cmd_list
def delete_key(cluster, name, container_image=None): def delete_key(cluster, user, user_key_path, name, container_image=None):
''' '''
Delete a CephX key Delete a CephX key
''' '''
@ -357,16 +369,13 @@ def delete_key(cluster, name, container_image=None):
name, name,
] ]
user = "client.admin"
keyring_filename = cluster + "." + user + ".keyring"
user_key = os.path.join("/etc/ceph/", keyring_filename)
cmd_list.append(generate_ceph_cmd( cmd_list.append(generate_ceph_cmd(
cluster, args, user, user_key, container_image)) cluster, args, user, user_key_path, container_image))
return cmd_list return cmd_list
def get_key(cluster, name, dest, container_image=None): def get_key(cluster, user, user_key_path, name, dest, container_image=None):
''' '''
Get a CephX key (write on the filesystem) Get a CephX key (write on the filesystem)
''' '''
@ -380,16 +389,13 @@ def get_key(cluster, name, dest, container_image=None):
dest, dest,
] ]
user = "client.admin"
keyring_filename = cluster + "." + user + ".keyring"
user_key = os.path.join("/etc/ceph/", keyring_filename)
cmd_list.append(generate_ceph_cmd( cmd_list.append(generate_ceph_cmd(
cluster, args, user, user_key, container_image)) cluster, args, user, user_key_path, container_image))
return cmd_list return cmd_list
def info_key(cluster, name, user, user_key, output_format, container_image=None): # noqa E501 def info_key(cluster, name, user, user_key_path, output_format, container_image=None): # noqa E501
''' '''
Get information about a CephX key Get information about a CephX key
''' '''
@ -404,12 +410,12 @@ def info_key(cluster, name, user, user_key, output_format, container_image=None)
] ]
cmd_list.append(generate_ceph_cmd( cmd_list.append(generate_ceph_cmd(
cluster, args, user, user_key, container_image)) cluster, args, user, user_key_path, container_image))
return cmd_list return cmd_list
def list_keys(cluster, user, user_key, container_image=None): def list_keys(cluster, user, user_key_path, container_image=None):
''' '''
List all CephX keys List all CephX keys
''' '''
@ -423,7 +429,7 @@ def list_keys(cluster, user, user_key, container_image=None):
] ]
cmd_list.append(generate_ceph_cmd( cmd_list.append(generate_ceph_cmd(
cluster, args, user, user_key, container_image)) cluster, args, user, user_key_path, container_image))
return cmd_list return cmd_list
@ -505,6 +511,8 @@ def run_module():
secret=dict(type='str', required=False, default=None, no_log=True), secret=dict(type='str', required=False, default=None, no_log=True),
import_key=dict(type='bool', required=False, default=True), import_key=dict(type='bool', required=False, default=True),
dest=dict(type='str', required=False, default='/etc/ceph/'), dest=dict(type='str', required=False, default='/etc/ceph/'),
user=dict(type='str', required=False, default='client.admin'),
user_key=dict(type='str', required=False, default=None)
) )
module = AnsibleModule( module = AnsibleModule(
@ -523,6 +531,8 @@ def run_module():
secret = module.params.get('secret') secret = module.params.get('secret')
import_key = module.params.get('import_key') import_key = module.params.get('import_key')
dest = module.params.get('dest') dest = module.params.get('dest')
user = module.params.get('user')
user_key = module.params.get('user_key')
changed = False changed = False
@ -551,9 +561,13 @@ def run_module():
_caps = caps _caps = caps
key_exist = 1 key_exist = 1
user = "client.admin" if not user_key:
keyring_filename = cluster + '.' + user + '.keyring' user_key_filename = '{}.{}.keyring'.format(cluster, user)
user_key = os.path.join("/etc/ceph/", keyring_filename) user_key_dir = '/etc/ceph'
user_key_path = os.path.join(user_key_dir, user_key_filename)
else:
user_key_path = user_key
output_format = "json" output_format = "json"
if (state in ["present", "update"]): if (state in ["present", "update"]):
@ -575,7 +589,7 @@ def run_module():
if import_key: if import_key:
_info_key = [] _info_key = []
rc, cmd, out, err = exec_commands( rc, cmd, out, err = exec_commands(
module, info_key(cluster, name, user, user_key, output_format, container_image)) # noqa E501 module, info_key(cluster, name, user, user_key_path, output_format, container_image)) # noqa E501
key_exist = rc key_exist = rc
if not caps and key_exist != 0: if not caps and key_exist != 0:
fatal("Capabilities must be provided when state is 'present'", module) # noqa E501 fatal("Capabilities must be provided when state is 'present'", module) # noqa E501
@ -591,7 +605,7 @@ def run_module():
_caps = _info_key[0]['caps'] _caps = _info_key[0]['caps']
if secret == _secret and caps == _caps: if secret == _secret and caps == _caps:
if not os.path.isfile(file_path): if not os.path.isfile(file_path):
rc, cmd, out, err = exec_commands(module, get_key(cluster, name, file_path, container_image)) # noqa E501 rc, cmd, out, err = exec_commands(module, get_key(cluster, user, user_key_path, name, file_path, container_image)) # noqa E501
result["rc"] = rc result["rc"] = rc
if rc != 0: if rc != 0:
result["stdout"] = "Couldn't fetch the key {0} at {1}.".format(name, file_path) # noqa E501 result["stdout"] = "Couldn't fetch the key {0} at {1}.".format(name, file_path) # noqa E501
@ -609,7 +623,7 @@ def run_module():
module.exit_json(**result) module.exit_json(**result)
if (key_exist == 0 and (secret != _secret or caps != _caps)) or key_exist != 0: # noqa E501 if (key_exist == 0 and (secret != _secret or caps != _caps)) or key_exist != 0: # noqa E501
rc, cmd, out, err = exec_commands(module, create_key( rc, cmd, out, err = exec_commands(module, create_key(
module, result, cluster, name, secret, caps, import_key, file_path, container_image)) # noqa E501 module, result, cluster, user, user_key_path, name, secret, caps, import_key, file_path, container_image)) # noqa E501
if rc != 0: if rc != 0:
result["stdout"] = "Couldn't create or update {0}".format(name) result["stdout"] = "Couldn't create or update {0}".format(name)
result["stderr"] = err result["stderr"] = err
@ -620,7 +634,7 @@ def run_module():
elif state == "absent": elif state == "absent":
if key_exist == 0: if key_exist == 0:
rc, cmd, out, err = exec_commands( rc, cmd, out, err = exec_commands(
module, delete_key(cluster, name, container_image)) module, delete_key(cluster, user, user_key_path, name, container_image))
if rc == 0: if rc == 0:
changed = True changed = True
else: else:
@ -628,7 +642,7 @@ def run_module():
elif state == "info": elif state == "info":
rc, cmd, out, err = exec_commands( rc, cmd, out, err = exec_commands(
module, info_key(cluster, name, user, user_key, output_format, container_image)) # noqa E501 module, info_key(cluster, name, user, user_key_path, output_format, container_image)) # noqa E501
if rc != 0: if rc != 0:
result["stdout"] = "skipped, since {0} does not exist".format(name) result["stdout"] = "skipped, since {0} does not exist".format(name)
result['rc'] = 0 result['rc'] = 0
@ -636,15 +650,15 @@ def run_module():
elif state == "list": elif state == "list":
rc, cmd, out, err = exec_commands( rc, cmd, out, err = exec_commands(
module, list_keys(cluster, user, user_key, container_image)) module, list_keys(cluster, user, user_key_path, container_image))
elif state == "fetch_initial_keys": elif state == "fetch_initial_keys":
hostname = socket.gethostname().split('.', 1)[0] hostname = socket.gethostname().split('.', 1)[0]
user = "mon." user = "mon."
keyring_filename = cluster + "-" + hostname + "/keyring" keyring_filename = cluster + "-" + hostname + "/keyring"
user_key = os.path.join("/var/lib/ceph/mon/", keyring_filename) user_key_path = os.path.join("/var/lib/ceph/mon/", keyring_filename)
rc, cmd, out, err = exec_commands( rc, cmd, out, err = exec_commands(
module, list_keys(cluster, user, user_key, container_image)) module, list_keys(cluster, user, user_key_path, container_image))
if rc != 0: if rc != 0:
result["stdout"] = "failed to retrieve ceph keys" result["stdout"] = "failed to retrieve ceph keys"
result["sdterr"] = err result["sdterr"] = err
@ -669,7 +683,7 @@ def run_module():
] ]
info_cmd = info_key(cluster, entity, user, info_cmd = info_key(cluster, entity, user,
user_key, output_format, container_image) user_key_path, output_format, container_image)
# we use info_cmd[0] because info_cmd is an array made of an array # we use info_cmd[0] because info_cmd is an array made of an array
info_cmd[0].extend(extra_args) info_cmd[0].extend(extra_args)
rc, cmd, out, err = exec_commands( rc, cmd, out, err = exec_commands(

View File

@ -1,13 +1,13 @@
import json import json
import os import os
import sys import sys
import ceph_key
import mock import mock
import pytest import pytest
from ansible.module_utils import basic from ansible.module_utils import basic
from ansible.module_utils._text import to_bytes from ansible.module_utils._text import to_bytes
sys.path.append('./library') sys.path.append('./library')
import ceph_key # noqa: E402
# From ceph-ansible documentation # From ceph-ansible documentation
@ -42,10 +42,8 @@ class TestCephKeyModule(object):
'mon': 'allow *', 'mon': 'allow *',
'osd': 'allow rwx', 'osd': 'allow rwx',
} }
fake_cmd = ['ceph']
fake_type = "ceph-authtool" fake_type = "ceph-authtool"
expected_command_list = [ expected_command_list = [
'ceph',
'--cap', '--cap',
'mon', 'mon',
'allow *', 'allow *',
@ -53,7 +51,7 @@ class TestCephKeyModule(object):
'osd', 'osd',
'allow rwx' 'allow rwx'
] ]
result = ceph_key.generate_caps(fake_cmd, fake_type, fake_caps) result = ceph_key.generate_caps(fake_type, fake_caps)
assert result == expected_command_list assert result == expected_command_list
def test_generate_caps_not_ceph_authtool(self): def test_generate_caps_not_ceph_authtool(self):
@ -61,16 +59,14 @@ class TestCephKeyModule(object):
'mon': 'allow *', 'mon': 'allow *',
'osd': 'allow rwx', 'osd': 'allow rwx',
} }
fake_cmd = ['ceph']
fake_type = "" fake_type = ""
expected_command_list = [ expected_command_list = [
'ceph',
'mon', 'mon',
'allow *', 'allow *',
'osd', 'osd',
'allow rwx' 'allow rwx'
] ]
result = ceph_key.generate_caps(fake_cmd, fake_type, fake_caps) result = ceph_key.generate_caps(fake_type, fake_caps)
assert result == expected_command_list assert result == expected_command_list
def test_generate_ceph_cmd_list_non_container(self): def test_generate_ceph_cmd_list_non_container(self):
@ -189,6 +185,8 @@ class TestCephKeyModule(object):
def test_create_key_non_container(self): def test_create_key_non_container(self):
fake_module = "fake" fake_module = "fake"
fake_user = 'client.admin'
fake_user_key = '/etc/ceph/fake.client.admin.keyring'
fake_result = " fake" fake_result = " fake"
fake_cluster = "fake" fake_cluster = "fake"
fake_name = "client.fake" fake_name = "client.fake"
@ -204,15 +202,17 @@ class TestCephKeyModule(object):
expected_command_list = [ expected_command_list = [
['ceph-authtool', '--create-keyring', fake_file_destination, '--name', fake_name, ['ceph-authtool', '--create-keyring', fake_file_destination, '--name', fake_name,
'--add-key', fake_secret, '--cap', 'mon', 'allow *', '--cap', 'osd', 'allow rwx'], '--add-key', fake_secret, '--cap', 'mon', 'allow *', '--cap', 'osd', 'allow rwx'],
['ceph', '-n', 'client.admin', '-k', '/etc/ceph/fake.client.admin.keyring', '--cluster', fake_cluster, 'auth', ['ceph', '-n', fake_user, '-k', fake_user_key, '--cluster', fake_cluster, 'auth',
'import', '-i', fake_file_destination], 'import', '-i', fake_file_destination],
] ]
result = ceph_key.create_key(fake_module, fake_result, fake_cluster, result = ceph_key.create_key(fake_module, fake_result, fake_cluster, fake_user, fake_user_key,
fake_name, fake_secret, fake_caps, fake_import_key, fake_file_destination) fake_name, fake_secret, fake_caps, fake_import_key, fake_file_destination)
assert result == expected_command_list assert result == expected_command_list
def test_create_key_container(self): def test_create_key_container(self):
fake_module = "fake" fake_module = "fake"
fake_user = 'client.admin'
fake_user_key = '/etc/ceph/fake.client.admin.keyring'
fake_result = "fake" fake_result = "fake"
fake_cluster = "fake" fake_cluster = "fake"
fake_name = "client.fake" fake_name = "client.fake"
@ -255,12 +255,14 @@ class TestCephKeyModule(object):
'--cluster', fake_cluster, '--cluster', fake_cluster,
'auth', 'import', 'auth', 'import',
'-i', fake_file_destination]] '-i', fake_file_destination]]
result = ceph_key.create_key(fake_module, fake_result, fake_cluster, fake_name, result = ceph_key.create_key(fake_module, fake_result, fake_cluster, fake_user, fake_user_key, fake_name,
fake_secret, fake_caps, fake_import_key, fake_file_destination, fake_container_image) fake_secret, fake_caps, fake_import_key, fake_file_destination, fake_container_image)
assert result == expected_command_list assert result == expected_command_list
def test_create_key_non_container_no_import(self): def test_create_key_non_container_no_import(self):
fake_module = "fake" fake_module = "fake"
fake_user = 'client.admin'
fake_user_key = '/etc/ceph/fake.client.admin.keyring'
fake_result = "fake" fake_result = "fake"
fake_cluster = "fake" fake_cluster = "fake"
fake_name = "client.fake" fake_name = "client.fake"
@ -289,12 +291,14 @@ class TestCephKeyModule(object):
'osd', 'osd',
'allow rwx', ] 'allow rwx', ]
] ]
result = ceph_key.create_key(fake_module, fake_result, fake_cluster, result = ceph_key.create_key(fake_module, fake_result, fake_cluster, fake_user, fake_user_key,
fake_name, fake_secret, fake_caps, fake_import_key, fake_file_destination) # noqa E501 fake_name, fake_secret, fake_caps, fake_import_key, fake_file_destination) # noqa E501
assert result == expected_command_list assert result == expected_command_list
def test_create_key_container_no_import(self): def test_create_key_container_no_import(self):
fake_module = "fake" fake_module = "fake"
fake_user = 'client.admin'
fake_user_key = '/etc/ceph/fake.client.admin.keyring'
fake_result = "fake" fake_result = "fake"
fake_cluster = "fake" fake_cluster = "fake"
fake_name = "client.fake" fake_name = "client.fake"
@ -309,7 +313,7 @@ class TestCephKeyModule(object):
fake_file_destination = os.path.join(fake_dest, fake_keyring_filename) fake_file_destination = os.path.join(fake_dest, fake_keyring_filename)
# create_key passes (one for ceph-authtool and one for itself) itw own array so the expected result is an array within an array # noqa E501 # create_key passes (one for ceph-authtool and one for itself) itw own array so the expected result is an array within an array # noqa E501
fake_container_image = "quay.ceph.io/ceph-ci/daemon:latest-luminous" fake_container_image = "quay.ceph.io/ceph-ci/daemon:latest-luminous"
expected_command_list = [['docker', expected_command_list = [['docker', # noqa E128
'run', 'run',
'--rm', '--rm',
'--net=host', '--net=host',
@ -330,21 +334,25 @@ class TestCephKeyModule(object):
'--cap', '--cap',
'osd', 'osd',
'allow rwx']] 'allow rwx']]
result = ceph_key.create_key(fake_module, fake_result, fake_cluster, fake_name, result = ceph_key.create_key(fake_module, fake_result, fake_cluster, fake_user, fake_user_key, fake_name,
fake_secret, fake_caps, fake_import_key, fake_file_destination, fake_container_image) fake_secret, fake_caps, fake_import_key, fake_file_destination, fake_container_image)
assert result == expected_command_list assert result == expected_command_list
def test_delete_key_non_container(self): def test_delete_key_non_container(self):
fake_user = 'client.admin'
fake_user_key = '/etc/ceph/fake.client.admin.keyring'
fake_cluster = "fake" fake_cluster = "fake"
fake_name = "client.fake" fake_name = "client.fake"
expected_command_list = [ expected_command_list = [
['ceph', '-n', 'client.admin', '-k', '/etc/ceph/fake.client.admin.keyring', ['ceph', '-n', 'client.admin', '-k', '/etc/ceph/fake.client.admin.keyring',
'--cluster', fake_cluster, 'auth', 'del', fake_name], '--cluster', fake_cluster, 'auth', 'del', fake_name],
] ]
result = ceph_key.delete_key(fake_cluster, fake_name) result = ceph_key.delete_key(fake_cluster, fake_user, fake_user_key, fake_name)
assert result == expected_command_list assert result == expected_command_list
def test_delete_key_container(self): def test_delete_key_container(self):
fake_user = 'client.admin'
fake_user_key = '/etc/ceph/fake.client.admin.keyring'
fake_cluster = "fake" fake_cluster = "fake"
fake_name = "client.fake" fake_name = "client.fake"
fake_container_image = "quay.ceph.io/ceph-ci/daemon:latest-luminous" fake_container_image = "quay.ceph.io/ceph-ci/daemon:latest-luminous"
@ -362,31 +370,32 @@ class TestCephKeyModule(object):
'--cluster', fake_cluster, '--cluster', fake_cluster,
'auth', 'del', fake_name]] 'auth', 'del', fake_name]]
result = ceph_key.delete_key( result = ceph_key.delete_key(
fake_cluster, fake_name, fake_container_image) fake_cluster, fake_user, fake_user_key, fake_name, fake_container_image)
assert result == expected_command_list assert result == expected_command_list
def test_info_key_non_container(self): def test_info_key_non_container(self):
fake_user = 'client.admin'
fake_user_key = '/etc/ceph/fake.client.admin.keyring'
fake_cluster = "fake" fake_cluster = "fake"
fake_name = "client.fake" fake_name = "client.fake"
fake_user = "fake-user" fake_user = "fake-user"
fake_key = "/tmp/my-key"
fake_output_format = "json" fake_output_format = "json"
expected_command_list = [ expected_command_list = [
['ceph', '-n', "fake-user", '-k', "/tmp/my-key", '--cluster', fake_cluster, 'auth', ['ceph', '-n', fake_user, '-k', fake_user_key, '--cluster', fake_cluster, 'auth',
'get', fake_name, '-f', 'json'], 'get', fake_name, '-f', 'json'],
] ]
result = ceph_key.info_key( result = ceph_key.info_key(
fake_cluster, fake_name, fake_user, fake_key, fake_output_format) fake_cluster, fake_name, fake_user, fake_user_key, fake_output_format)
assert result == expected_command_list assert result == expected_command_list
def test_info_key_container(self): def test_info_key_container(self):
fake_cluster = "fake" fake_cluster = "fake"
fake_name = "client.fake" fake_name = "client.fake"
fake_user = "fake-user" fake_user = 'client.admin'
fake_key = "/tmp/my-key" fake_user_key = '/etc/ceph/fake.client.admin.keyring'
fake_output_format = "json" fake_output_format = "json"
fake_container_image = "quay.ceph.io/ceph-ci/daemon:latest-luminous" fake_container_image = "quay.ceph.io/ceph-ci/daemon:latest-luminous"
expected_command_list = [['docker', expected_command_list = [['docker', # noqa E128
'run', 'run',
'--rm', '--rm',
'--net=host', '--net=host',
@ -395,13 +404,13 @@ class TestCephKeyModule(object):
'-v', '/var/log/ceph/:/var/log/ceph/:z', '-v', '/var/log/ceph/:/var/log/ceph/:z',
'--entrypoint=ceph', '--entrypoint=ceph',
'quay.ceph.io/ceph-ci/daemon:latest-luminous', 'quay.ceph.io/ceph-ci/daemon:latest-luminous',
'-n', "fake-user", '-n', fake_user,
'-k', "/tmp/my-key", '-k', fake_user_key,
'--cluster', fake_cluster, '--cluster', fake_cluster,
'auth', 'get', fake_name, 'auth', 'get', fake_name,
'-f', 'json']] '-f', 'json']]
result = ceph_key.info_key( result = ceph_key.info_key(
fake_cluster, fake_name, fake_user, fake_key, fake_output_format, fake_container_image) fake_cluster, fake_name, fake_user, fake_user_key, fake_output_format, fake_container_image) # noqa E501
assert result == expected_command_list assert result == expected_command_list
def test_list_key_non_container(self): def test_list_key_non_container(self):
@ -417,12 +426,14 @@ class TestCephKeyModule(object):
def test_get_key_container(self): def test_get_key_container(self):
fake_cluster = "fake" fake_cluster = "fake"
fake_user = 'client.admin'
fake_user_key = '/etc/ceph/fake.client.admin.keyring'
fake_name = "client.fake" fake_name = "client.fake"
fake_container_image = "quay.ceph.io/ceph-ci/daemon:latest-luminous" fake_container_image = "quay.ceph.io/ceph-ci/daemon:latest-luminous"
fake_dest = "/fake/ceph" fake_dest = "/fake/ceph"
fake_keyring_filename = fake_cluster + "." + fake_name + ".keyring" fake_keyring_filename = fake_cluster + "." + fake_name + ".keyring"
fake_file_destination = os.path.join(fake_dest, fake_keyring_filename) fake_file_destination = os.path.join(fake_dest, fake_keyring_filename)
expected_command_list = [['docker', expected_command_list = [['docker', # noqa E128
'run', 'run',
'--rm', '--rm',
'--net=host', '--net=host',
@ -431,27 +442,29 @@ class TestCephKeyModule(object):
'-v', '/var/log/ceph/:/var/log/ceph/:z', '-v', '/var/log/ceph/:/var/log/ceph/:z',
'--entrypoint=ceph', '--entrypoint=ceph',
'quay.ceph.io/ceph-ci/daemon:latest-luminous', 'quay.ceph.io/ceph-ci/daemon:latest-luminous',
'-n', "client.admin", '-n', fake_user,
'-k', "/etc/ceph/fake.client.admin.keyring", '-k', fake_user_key,
'--cluster', fake_cluster, '--cluster', fake_cluster,
'auth', 'get', 'auth', 'get',
fake_name, '-o', fake_file_destination], ] fake_name, '-o', fake_file_destination]]
result = ceph_key.get_key( result = ceph_key.get_key(
fake_cluster, fake_name, fake_file_destination, fake_container_image) fake_cluster, fake_user, fake_user_key, fake_name, fake_file_destination, fake_container_image)
assert result == expected_command_list assert result == expected_command_list
def test_get_key_non_container(self): def test_get_key_non_container(self):
fake_cluster = "fake" fake_cluster = "fake"
fake_user = 'client.admin'
fake_user_key = '/etc/ceph/fake.client.admin.keyring'
fake_dest = "/fake/ceph" fake_dest = "/fake/ceph"
fake_name = "client.fake" fake_name = "client.fake"
fake_keyring_filename = fake_cluster + "." + fake_name + ".keyring" fake_keyring_filename = fake_cluster + "." + fake_name + ".keyring"
fake_file_destination = os.path.join(fake_dest, fake_keyring_filename) fake_file_destination = os.path.join(fake_dest, fake_keyring_filename)
expected_command_list = [ expected_command_list = [
['ceph', '-n', "client.admin", '-k', "/etc/ceph/fake.client.admin.keyring", ['ceph', '-n', fake_user, '-k', fake_user_key,
'--cluster', fake_cluster, 'auth', 'get', fake_name, '-o', fake_file_destination], '--cluster', fake_cluster, 'auth', 'get', fake_name, '-o', fake_file_destination],
] ]
result = ceph_key.get_key( result = ceph_key.get_key(
fake_cluster, fake_name, fake_file_destination) fake_cluster, fake_user, fake_user_key, fake_name, fake_file_destination)
assert result == expected_command_list assert result == expected_command_list
def test_list_key_non_container_with_mon_key(self): def test_list_key_non_container_with_mon_key(self):