diff --git a/internal/rbd/rbd_util.go b/internal/rbd/rbd_util.go index e5608b3e2..987191a94 100644 --- a/internal/rbd/rbd_util.go +++ b/internal/rbd/rbd_util.go @@ -32,6 +32,7 @@ import ( "github.com/ceph/go-ceph/rados" librbd "github.com/ceph/go-ceph/rbd" + "github.com/ceph/go-ceph/rbd/admin" "github.com/container-storage-interface/spec/lib/go/csi" "github.com/golang/protobuf/ptypes" "github.com/golang/protobuf/ptypes/timestamp" @@ -1733,3 +1734,41 @@ func (rs *rbdSnapshot) isCompatibleThickProvision(dst *rbdVolume) error { return nil } + +func (ri *rbdImage) addSnapshotScheduling( + interval admin.Interval, + startTime admin.StartTime) error { + ls := admin.NewLevelSpec(ri.Pool, ri.RadosNamespace, ri.RbdImageName) + ra, err := ri.conn.GetRBDAdmin() + if err != nil { + return err + } + adminConn := ra.MirrorSnashotSchedule() + // list all the snapshot scheduling and check at least one image scheduling + // exists with specified interval. + ssList, err := adminConn.List(ls) + if err != nil { + return err + } + + for _, ss := range ssList { + // make sure we are matching image level scheduling. The + // `adminConn.List` lists the global level scheduling also. + if ss.Name == ri.String() { + for _, s := range ss.Schedule { + // TODO: Add support to check start time also. + // The start time is currently stored with different format + // in ceph. Comparison is not possible unless we know in + // which format ceph is storing it. + if s.Interval == interval { + return err + } + } + } + } + err = adminConn.Add(ls, interval, startTime) + if err != nil { + return err + } + return nil +} diff --git a/internal/rbd/replicationcontrollerserver.go b/internal/rbd/replicationcontrollerserver.go index 8ce695056..294e0e897 100644 --- a/internal/rbd/replicationcontrollerserver.go +++ b/internal/rbd/replicationcontrollerserver.go @@ -19,12 +19,14 @@ package rbd import ( "context" "errors" + "regexp" "strconv" "strings" "github.com/ceph/ceph-csi/internal/util" librbd "github.com/ceph/go-ceph/rbd" + "github.com/ceph/go-ceph/rbd/admin" "github.com/csi-addons/spec/lib/go/replication" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -56,6 +58,18 @@ const ( imageMirroringKey = "mirroringMode" // forceKey + key to get the force option from parameters. forceKey = "force" + + // schedulingIntervalKey to get the schedulingInterval from the + // parameters. + // Interval of time between scheduled snapshots. Typically in the form + // . + schedulingIntervalKey = "schedulingInterval" + + // schedulingStartTimeKey to get the schedulingStartTime from the + // parameters. + // (optional) StartTime is the time the snapshot schedule + // begins, can be specified using the ISO 8601 time format. + schedulingStartTimeKey = "schedulingStartTime" ) // ReplicationServer struct of rbd CSI driver with supported methods of Replication @@ -108,6 +122,56 @@ func getMirroringMode(ctx context.Context, parameters map[string]string) (librbd return mirroringMode, nil } +// getSchedulingDetails gets the mirroring mode and scheduling details from the +// input GRPC request parameters and validates the scheduling is only supported +// for mirroring mode. +func getSchedulingDetails(parameters map[string]string) (admin.Interval, admin.StartTime, error) { + admInt := admin.NoInterval + adminStartTime := admin.NoStartTime + var err error + + val := parameters[imageMirroringKey] + + switch imageMirroringMode(val) { + case imageMirrorModeSnapshot: + default: + return admInt, adminStartTime, status.Error(codes.InvalidArgument, "scheduling is only supported for snapshot mode") + } + + // validate mandatory interval field + interval, ok := parameters[schedulingIntervalKey] + if ok && interval == "" { + return admInt, adminStartTime, status.Error(codes.InvalidArgument, "scheduling interval cannot be empty") + } + adminStartTime = admin.StartTime(parameters[schedulingStartTimeKey]) + if !ok { + // startTime is alone not supported it has to be present with interval + if adminStartTime != "" { + return admInt, admin.NoStartTime, status.Errorf(codes.InvalidArgument, + "%q parameter is supported only with %q", + schedulingStartTimeKey, + schedulingIntervalKey) + } + } + if interval != "" { + admInt, err = validateSchedulingInterval(interval) + if err != nil { + return admInt, admin.NoStartTime, status.Error(codes.InvalidArgument, err.Error()) + } + } + return admInt, adminStartTime, nil +} + +// validateSchedulingInterval return the interval as it is if its ending with +// `m|h|d` or else it will return error. +func validateSchedulingInterval(interval string) (admin.Interval, error) { + var re = regexp.MustCompile(`^[0-9]+[mhd]$`) + if re.MatchString(interval) { + return admin.Interval(interval), nil + } + return "", errors.New("interval specified without d, h, m suffix") +} + // EnableVolumeReplication extracts the RBD volume information from the // volumeID, If the image is present it will enable the mirroring based on the // user provided information. @@ -124,6 +188,11 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context, } defer cr.DeleteCredentials() + interval, startTime, err := getSchedulingDetails(req.GetParameters()) + if err != nil { + return nil, err + } + if acquired := rs.VolumeLocks.TryAcquire(volumeID); !acquired { util.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID) return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) @@ -162,6 +231,20 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context, return nil, status.Error(codes.Internal, err.Error()) } } + + if interval != "" { + err = rbdVol.addSnapshotScheduling(interval, startTime) + if err != nil { + return nil, err + } + util.DebugLog( + ctx, + "Added scheduling at interval %s, start time %s for volume %s", + interval, + startTime, + rbdVol) + } + return &replication.EnableVolumeReplicationResponse{}, nil } diff --git a/internal/rbd/replicationcontrollerserver_test.go b/internal/rbd/replicationcontrollerserver_test.go new file mode 100644 index 000000000..e5b2aada3 --- /dev/null +++ b/internal/rbd/replicationcontrollerserver_test.go @@ -0,0 +1,155 @@ +/* +Copyright 2021 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rbd + +import ( + "reflect" + "testing" + + "github.com/ceph/go-ceph/rbd/admin" +) + +func TestValidateSchedulingInterval(t *testing.T) { + t.Parallel() + tests := []struct { + name string + interval string + want admin.Interval + wantErr bool + }{ + { + "valid interval in minutes", + "3m", + admin.Interval("3m"), + false, + }, + { + "valid interval in hour", + "22h", + admin.Interval("22h"), + false, + }, + { + "valid interval in days", + "13d", + admin.Interval("13d"), + false, + }, + { + "invalid interval without number", + "d", + admin.Interval(""), + true, + }, + { + "invalid interval without (m|h|d) suffix", + "12", + admin.Interval(""), + true, + }, + } + for _, tt := range tests { + st := tt + t.Run(tt.name, func(t *testing.T) { + got, err := validateSchedulingInterval(st.interval) + if (err != nil) != st.wantErr { + t.Errorf("validateSchedulingInterval() error = %v, wantErr %v", err, st.wantErr) + return + } + if !reflect.DeepEqual(got, st.want) { + t.Errorf("validateSchedulingInterval() = %v, want %v", got, st.want) + } + }) + } +} + +func TestGetSchedulingDetails(t *testing.T) { + tests := []struct { + name string + parameters map[string]string + wantInterval admin.Interval + wantStartTime admin.StartTime + wantErr bool + }{ + { + "valid parameters", + map[string]string{ + imageMirroringKey: string(imageMirrorModeSnapshot), + schedulingIntervalKey: "1h", + schedulingStartTimeKey: "14:00:00-05:00", + }, + admin.Interval("1h"), + admin.StartTime("14:00:00-05:00"), + false, + }, + { + "valid parameters when optional startTime is missing", + map[string]string{ + imageMirroringKey: string(imageMirrorModeSnapshot), + schedulingIntervalKey: "1h", + }, + admin.Interval("1h"), + admin.NoStartTime, + false, + }, + { + "when mirroring mode is journal", + map[string]string{ + imageMirroringKey: "journal", + schedulingIntervalKey: "1h", + }, + admin.NoInterval, + admin.NoStartTime, + true, + }, + { + "when startTime is specified without interval", + map[string]string{ + imageMirroringKey: string(imageMirrorModeSnapshot), + schedulingStartTimeKey: "14:00:00-05:00", + }, + admin.NoInterval, + admin.NoStartTime, + true, + }, + { + "when no scheduling is specified", + map[string]string{ + imageMirroringKey: string(imageMirrorModeSnapshot), + }, + admin.NoInterval, + admin.NoStartTime, + false, + }, + } + for _, tt := range tests { + st := tt + t.Run(tt.name, func(t *testing.T) { + interval, startTime, err := getSchedulingDetails(st.parameters) + if (err != nil) != st.wantErr { + t.Errorf("getSchedulingDetails() error = %v, wantErr %v", err, st.wantErr) + return + } + if !reflect.DeepEqual(interval, st.wantInterval) { + t.Errorf("getSchedulingDetails() interval = %v, want %v", interval, st.wantInterval) + } + if !reflect.DeepEqual(startTime, st.wantStartTime) { + t.Errorf("getSchedulingDetails() startTime = %v, want %v", startTime, st.wantStartTime) + } + }) + } +}