From 0df8415067a021793dce689c64708c6a0f85c86c Mon Sep 17 00:00:00 2001 From: gman Date: Tue, 20 Mar 2018 15:46:31 +0100 Subject: [PATCH] cephfs: cleaning/renaming --- deploy/cephfs/docker/cephfs_provisioner.py | 332 --------------------- pkg/cephfs/{cephfs.go => driver.go} | 16 +- 2 files changed, 5 insertions(+), 343 deletions(-) delete mode 100644 deploy/cephfs/docker/cephfs_provisioner.py rename pkg/cephfs/{cephfs.go => driver.go} (84%) diff --git a/deploy/cephfs/docker/cephfs_provisioner.py b/deploy/cephfs/docker/cephfs_provisioner.py deleted file mode 100644 index fff4637e9..000000000 --- a/deploy/cephfs/docker/cephfs_provisioner.py +++ /dev/null @@ -1,332 +0,0 @@ -#!/usr/bin/env python - -# Copyright 2017 The Kubernetes Authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import rados -import getopt -import sys -import json - -""" -CEPH_CLUSTER_NAME=test CEPH_MON=172.24.0.4 CEPH_AUTH_ID=admin CEPH_AUTH_KEY=AQCMpH9YM4Q1BhAAXGNQyyOne8ZsXqWGon/dIQ== cephfs_provisioner.py -n foo -u bar -""" -try: - import ceph_volume_client - ceph_module_found = True -except ImportError as e: - ceph_volume_client = None - ceph_module_found = False - -VOlUME_GROUP="kubernetes" -CONF_PATH="/etc/ceph/" - -class CephFSNativeDriver(object): - """Driver for the Ceph Filesystem. - - This driver is 'native' in the sense that it exposes a CephFS filesystem - for use directly by guests, with no intermediate layer like NFS. - """ - - def __init__(self, *args, **kwargs): - self._volume_client = None - - - def _create_conf(self, cluster_name, mons): - """ Create conf using monitors - Create a minimal ceph conf with monitors and cephx - """ - conf_path = CONF_PATH + cluster_name + ".conf" - if not os.path.isfile(conf_path) or os.access(conf_path, os.W_OK): - conf = open(conf_path, 'w') - conf.write("[global]\n") - conf.write("mon_host = " + mons + "\n") - conf.write("auth_cluster_required = cephx\nauth_service_required = cephx\nauth_client_required = cephx\n") - conf.close() - return conf_path - - def _create_keyring(self, cluster_name, id, key): - """ Create client keyring using id and key - """ - keyring_path = CONF_PATH + cluster_name + "." + "client." + id + ".keyring" - if not os.path.isfile(keyring_path) or os.access(keyring_path, os.W_OK): - keyring = open(keyring_path, 'w') - keyring.write("[client." + id + "]\n") - keyring.write("key = " + key + "\n") - keyring.write("caps mds = \"allow *\"\n") - keyring.write("caps mon = \"allow *\"\n") - keyring.write("caps osd = \"allow *\"\n") - keyring.close() - - @property - def volume_client(self): - if self._volume_client: - return self._volume_client - - if not ceph_module_found: - raise ValueError("Ceph client libraries not found.") - - try: - cluster_name = os.environ["CEPH_CLUSTER_NAME"] - except KeyError: - cluster_name = "ceph" - try: - mons = os.environ["CEPH_MON"] - except KeyError: - raise ValueError("Missing CEPH_MON env") - try: - auth_id = os.environ["CEPH_AUTH_ID"] - except KeyError: - raise ValueError("Missing CEPH_AUTH_ID") - try: - auth_key = os.environ["CEPH_AUTH_KEY"] - except: - raise ValueError("Missing CEPH_AUTH_KEY") - - conf_path = self._create_conf(cluster_name, mons) - self._create_keyring(cluster_name, auth_id, auth_key) - - self._volume_client = ceph_volume_client.CephFSVolumeClient( - auth_id, conf_path, cluster_name) - try: - self._volume_client.connect(None) - except Exception: - self._volume_client = None - raise - - return self._volume_client - - def _authorize_ceph(self, volume_path, auth_id, readonly): - path = self._volume_client._get_path(volume_path) - - # First I need to work out what the data pool is for this share: - # read the layout - pool_name = self._volume_client._get_ancestor_xattr(path, "ceph.dir.layout.pool") - namespace = self._volume_client.fs.getxattr(path, "ceph.dir.layout.pool_namespace") - - # Now construct auth capabilities that give the guest just enough - # permissions to access the share - client_entity = "client.{0}".format(auth_id) - want_access_level = 'r' if readonly else 'rw' - want_mds_cap = 'allow r,allow {0} path={1}'.format(want_access_level, path) - want_osd_cap = 'allow {0} pool={1} namespace={2}'.format( - want_access_level, pool_name, namespace) - - try: - existing = self._volume_client._rados_command( - 'auth get', - { - 'entity': client_entity - } - ) - # FIXME: rados raising Error instead of ObjectNotFound in auth get failure - except rados.Error: - caps = self._volume_client._rados_command( - 'auth get-or-create', - { - 'entity': client_entity, - 'caps': [ - 'mds', want_mds_cap, - 'osd', want_osd_cap, - 'mon', 'allow r'] - }) - else: - # entity exists, update it - cap = existing[0] - - # Construct auth caps that if present might conflict with the desired - # auth caps. - unwanted_access_level = 'r' if want_access_level is 'rw' else 'rw' - unwanted_mds_cap = 'allow {0} path={1}'.format(unwanted_access_level, path) - unwanted_osd_cap = 'allow {0} pool={1} namespace={2}'.format( - unwanted_access_level, pool_name, namespace) - - def cap_update(orig, want, unwanted): - # Updates the existing auth caps such that there is a single - # occurrence of wanted auth caps and no occurrence of - # conflicting auth caps. - - cap_tokens = set(orig.split(",")) - - cap_tokens.discard(unwanted) - cap_tokens.add(want) - - return ",".join(cap_tokens) - - osd_cap_str = cap_update(cap['caps'].get('osd', ""), want_osd_cap, unwanted_osd_cap) - mds_cap_str = cap_update(cap['caps'].get('mds', ""), want_mds_cap, unwanted_mds_cap) - - caps = self._volume_client._rados_command( - 'auth caps', - { - 'entity': client_entity, - 'caps': [ - 'mds', mds_cap_str, - 'osd', osd_cap_str, - 'mon', cap['caps'].get('mon')] - }) - caps = self._volume_client._rados_command( - 'auth get', - { - 'entity': client_entity - } - ) - - # Result expected like this: - # [ - # { - # "entity": "client.foobar", - # "key": "AQBY0\/pViX\/wBBAAUpPs9swy7rey1qPhzmDVGQ==", - # "caps": { - # "mds": "allow *", - # "mon": "allow *" - # } - # } - # ] - assert len(caps) == 1 - assert caps[0]['entity'] == client_entity - return caps[0] - - def create_share(self, path, user_id, size=None): - """Create a CephFS volume. - """ - volume_path = ceph_volume_client.VolumePath(VOlUME_GROUP, path) - - # Create the CephFS volume - volume = self.volume_client.create_volume(volume_path, size=size) - - # To mount this you need to know the mon IPs and the path to the volume - mon_addrs = self.volume_client.get_mon_addrs() - - export_location = "{addrs}:{path}".format( - addrs=",".join(mon_addrs), - path=volume['mount_path']) - - """TODO - restrict to user_id - """ - auth_result = self._authorize_ceph(volume_path, user_id, False) - ret = { - 'path': volume['mount_path'], - 'user': auth_result['entity'], - 'key': auth_result['key'] - } - - self._create_keyring(self.volume_client.cluster_name, user_id, auth_result['key']) - - return json.dumps(ret) - - def _deauthorize(self, volume_path, auth_id): - """ - The volume must still exist. - NOTE: In our `_authorize_ceph` method we give user extra mds `allow r` - cap to work around a kernel cephfs issue. So we need a customized - `_deauthorize` method to remove caps instead of using - `volume_client._deauthorize`. - This methid is modified from - https://github.com/ceph/ceph/blob/v13.0.0/src/pybind/ceph_volume_client.py#L1181. - """ - client_entity = "client.{0}".format(auth_id) - path = self.volume_client._get_path(volume_path) - path = self.volume_client._get_path(volume_path) - pool_name = self.volume_client._get_ancestor_xattr(path, "ceph.dir.layout.pool") - namespace = self.volume_client.fs.getxattr(path, "ceph.dir.layout.pool_namespace") - - # The auth_id might have read-only or read-write mount access for the - # volume path. - access_levels = ('r', 'rw') - want_mds_caps = {'allow {0} path={1}'.format(access_level, path) - for access_level in access_levels} - want_osd_caps = {'allow {0} pool={1} namespace={2}'.format( - access_level, pool_name, namespace) - for access_level in access_levels} - - try: - existing = self.volume_client._rados_command( - 'auth get', - { - 'entity': client_entity - } - ) - - def cap_remove(orig, want): - cap_tokens = set(orig.split(",")) - return ",".join(cap_tokens.difference(want)) - - cap = existing[0] - osd_cap_str = cap_remove(cap['caps'].get('osd', ""), want_osd_caps) - mds_cap_str = cap_remove(cap['caps'].get('mds', ""), want_mds_caps) - - if (not osd_cap_str) and (not osd_cap_str or mds_cap_str == "allow r"): - # If osd caps are removed and mds caps are removed or only have "allow r", we can remove entity safely. - self.volume_client._rados_command('auth del', {'entity': client_entity}, decode=False) - else: - self.volume_client._rados_command( - 'auth caps', - { - 'entity': client_entity, - 'caps': [ - 'mds', mds_cap_str, - 'osd', osd_cap_str, - 'mon', cap['caps'].get('mon', 'allow r')] - }) - - # FIXME: rados raising Error instead of ObjectNotFound in auth get failure - except rados.Error: - # Already gone, great. - return - - def delete_share(self, path, user_id): - volume_path = ceph_volume_client.VolumePath(VOlUME_GROUP, path) - self._deauthorize(volume_path, user_id) - self.volume_client.delete_volume(volume_path) - self.volume_client.purge_volume(volume_path) - - def __del__(self): - if self._volume_client: - self._volume_client.disconnect() - self._volume_client = None - -def main(): - create = True - share = "" - user = "" - cephfs = CephFSNativeDriver() - try: - opts, args = getopt.getopt(sys.argv[1:], "rn:u:", ["remove"]) - except getopt.GetoptError: - print "Usage: " + sys.argv[0] + " --remove -n share_name -u ceph_user_id" - sys.exit(1) - - for opt, arg in opts: - if opt == '-n': - share = arg - elif opt == '-u': - user = arg - elif opt in ("-r", "--remove"): - create = False - - if share == "" or user == "": - print "Usage: " + sys.argv[0] + " --remove -n share_name -u ceph_user_id" - sys.exit(1) - - if create == True: - print cephfs.create_share(share, user) - else: - cephfs.delete_share(share, user) - - -if __name__ == "__main__": - main() diff --git a/pkg/cephfs/cephfs.go b/pkg/cephfs/driver.go similarity index 84% rename from pkg/cephfs/cephfs.go rename to pkg/cephfs/driver.go index 29e3a96d0..d280423a4 100644 --- a/pkg/cephfs/cephfs.go +++ b/pkg/cephfs/driver.go @@ -19,12 +19,12 @@ package cephfs import ( "github.com/golang/glog" - "github.com/container-storage-interface/spec/lib/go/csi" + "github.com/container-storage-interface/spec/lib/go/csi/v0" "github.com/kubernetes-csi/drivers/pkg/csi-common" ) const ( - PluginFolder = "/var/lib/kubelet/plugins/cephfsplugin" + PluginFolder = "/var/lib/kubelet/plugins/csi-cephfsplugin" ) type cephfsDriver struct { @@ -40,15 +40,9 @@ type cephfsDriver struct { var ( driver *cephfsDriver - version = csi.Version{ - Minor: 2, - } + version = "0.2.0" ) -func GetSupportedVersions() []*csi.Version { - return []*csi.Version{&version} -} - func NewCephFSDriver() *cephfsDriver { return &cephfsDriver{} } @@ -72,11 +66,11 @@ func NewNodeServer(d *csicommon.CSIDriver) *nodeServer { } func (fs *cephfsDriver) Run(driverName, nodeId, endpoint string) { - glog.Infof("Driver: %v version: %v", driverName, GetVersionString(&version)) + glog.Infof("Driver: %v version: %v", driverName, version) // Initialize default library driver - fs.driver = csicommon.NewCSIDriver(driverName, &version, GetSupportedVersions(), nodeId) + fs.driver = csicommon.NewCSIDriver(driverName, version, nodeId) if fs.driver == nil { glog.Fatalln("Failed to initialize CSI driver") }