From cd83c7be48bf0802b982d5d08456a13443d708b3 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 21 Feb 2022 20:13:09 +0000 Subject: [PATCH] rebase: bump github.com/ceph/go-ceph from 0.13.0 to 0.14.0 Bumps [github.com/ceph/go-ceph](https://github.com/ceph/go-ceph) from 0.13.0 to 0.14.0. - [Release notes](https://github.com/ceph/go-ceph/releases) - [Changelog](https://github.com/ceph/go-ceph/blob/master/docs/release-process.md) - [Commits](https://github.com/ceph/go-ceph/compare/v0.13.0...v0.14.0) --- updated-dependencies: - dependency-name: github.com/ceph/go-ceph dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] --- go.mod | 2 +- go.sum | 4 +- .../ceph/go-ceph/cephfs/admin/mgrmodule.go | 62 +-- .../go-ceph/common/admin/manager/admin.go | 17 + .../ceph/go-ceph/common/admin/manager/doc.go | 5 + .../go-ceph/common/admin/manager/module.go | 75 ++++ .../go-ceph/internal/cutil/sync_buffer.go | 2 +- .../rados/rados_read_op_assert_version.go | 22 + .../rados/rados_write_op_assert_version.go | 22 + .../go-ceph/rados/rados_write_op_remove.go | 19 + .../go-ceph/rados/rados_write_op_setxattr.go | 34 ++ .../rados/read_op_omap_get_vals_by_keys.go | 120 ++++++ .../ceph/go-ceph/rados/read_op_read.go | 75 ++++ .../ceph/go-ceph/rados/read_step.go | 31 ++ .../github.com/ceph/go-ceph/rados/watcher.go | 379 ++++++++++++++++++ ...write_op_preview.go => write_op_cmpext.go} | 4 - vendor/github.com/ceph/go-ceph/rbd/rbd.go | 24 +- vendor/modules.txt | 3 +- 18 files changed, 832 insertions(+), 68 deletions(-) create mode 100644 vendor/github.com/ceph/go-ceph/common/admin/manager/admin.go create mode 100644 vendor/github.com/ceph/go-ceph/common/admin/manager/doc.go create mode 100644 vendor/github.com/ceph/go-ceph/common/admin/manager/module.go create mode 100644 vendor/github.com/ceph/go-ceph/rados/rados_read_op_assert_version.go create mode 100644 vendor/github.com/ceph/go-ceph/rados/rados_write_op_assert_version.go create mode 100644 vendor/github.com/ceph/go-ceph/rados/rados_write_op_remove.go create mode 100644 vendor/github.com/ceph/go-ceph/rados/rados_write_op_setxattr.go create mode 100644 vendor/github.com/ceph/go-ceph/rados/read_op_omap_get_vals_by_keys.go create mode 100644 vendor/github.com/ceph/go-ceph/rados/read_op_read.go create mode 100644 vendor/github.com/ceph/go-ceph/rados/read_step.go create mode 100644 vendor/github.com/ceph/go-ceph/rados/watcher.go rename vendor/github.com/ceph/go-ceph/rados/{write_op_preview.go => write_op_cmpext.go} (95%) diff --git a/go.mod b/go.mod index bd7f898a6..372a7ffe9 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/IBM/keyprotect-go-client v0.7.0 github.com/aws/aws-sdk-go v1.42.53 github.com/ceph/ceph-csi/api v0.0.0-00010101000000-000000000000 - github.com/ceph/go-ceph v0.13.0 + github.com/ceph/go-ceph v0.14.0 github.com/container-storage-interface/spec v1.5.0 github.com/csi-addons/replication-lib-utils v0.2.0 github.com/csi-addons/spec v0.1.2-0.20211220115741-32fa508dadbe diff --git a/go.sum b/go.sum index 13887ed6c..887bdadc2 100644 --- a/go.sum +++ b/go.sum @@ -168,8 +168,8 @@ github.com/cenkalti/backoff/v3 v3.0.0 h1:ske+9nBpD9qZsTBoF41nW5L+AIuFBKMeze18XQ3 github.com/cenkalti/backoff/v3 v3.0.0/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/centrify/cloud-golang-sdk v0.0.0-20190214225812-119110094d0f/go.mod h1:C0rtzmGXgN78pYR0tGJFhtHgkbAs0lIbHwkB81VxDQE= -github.com/ceph/go-ceph v0.13.0 h1:69dgIPlNHD2OCz98T0benI4++vcnShGcpQK4RIALjw4= -github.com/ceph/go-ceph v0.13.0/go.mod h1:mafFpf5Vg8Ai8Bd+FAMvKBHLmtdpTXdRP/TNq8XWegY= +github.com/ceph/go-ceph v0.14.0 h1:sJoT0au7NT3TPmDWf5W9w6tZy0U/5xZrIXVVauZR+Xo= +github.com/ceph/go-ceph v0.14.0/go.mod h1:mafFpf5Vg8Ai8Bd+FAMvKBHLmtdpTXdRP/TNq8XWegY= github.com/certifi/gocertifi v0.0.0-20191021191039-0944d244cd40/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA= github.com/certifi/gocertifi v0.0.0-20200922220541-2c3bb06c6054/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= diff --git a/vendor/github.com/ceph/go-ceph/cephfs/admin/mgrmodule.go b/vendor/github.com/ceph/go-ceph/cephfs/admin/mgrmodule.go index 7efcd0ce4..21b8dbfbc 100644 --- a/vendor/github.com/ceph/go-ceph/cephfs/admin/mgrmodule.go +++ b/vendor/github.com/ceph/go-ceph/cephfs/admin/mgrmodule.go @@ -1,40 +1,31 @@ package admin import ( - "github.com/ceph/go-ceph/internal/commands" + "github.com/ceph/go-ceph/common/admin/manager" ) const mirroring = "mirroring" // EnableModule will enable the specified manager module. // +// Deprecated: use the equivalent function in cluster/admin/manager. +// // Similar To: // ceph mgr module enable [--force] func (fsa *FSAdmin) EnableModule(module string, force bool) error { - m := map[string]string{ - "prefix": "mgr module enable", - "module": module, - "format": "json", - } - if force { - m["force"] = "--force" - } - // Why is this _only_ part of the mon command json? You'd think a mgr - // command would be available as a MgrCommand but I couldn't figure it out. - return commands.MarshalMonCommand(fsa.conn, m).NoData().End() + mgradmin := manager.NewFromConn(fsa.conn) + return mgradmin.EnableModule(module, force) } // DisableModule will disable the specified manager module. // +// Deprecated: use the equivalent function in cluster/admin/manager. +// // Similar To: // ceph mgr module disable func (fsa *FSAdmin) DisableModule(module string) error { - m := map[string]string{ - "prefix": "mgr module disable", - "module": module, - "format": "json", - } - return commands.MarshalMonCommand(fsa.conn, m).NoData().End() + mgradmin := manager.NewFromConn(fsa.conn) + return mgradmin.DisableModule(module) } // EnableMirroringModule will enable the mirroring module for cephfs. @@ -42,7 +33,8 @@ func (fsa *FSAdmin) DisableModule(module string) error { // Similar To: // ceph mgr module enable mirroring [--force] func (fsa *FSAdmin) EnableMirroringModule(force bool) error { - return fsa.EnableModule(mirroring, force) + mgradmin := manager.NewFromConn(fsa.conn) + return mgradmin.EnableModule(mirroring, force) } // DisableMirroringModule will disable the mirroring module for cephfs. @@ -50,34 +42,6 @@ func (fsa *FSAdmin) EnableMirroringModule(force bool) error { // Similar To: // ceph mgr module disable mirroring func (fsa *FSAdmin) DisableMirroringModule() error { - return fsa.DisableModule(mirroring) -} - -type moduleInfo struct { - EnabledModules []string `json:"enabled_modules"` - //DisabledModules []string `json:"disabled_modules"` - // DisabledModules is documented in ceph as a list of string - // but that's not what comes back from the server (on pacific). - // Since we don't need this today, we're just going to ignore - // it, but if we ever want to support this for external consumers - // we'll need to figure out the real structure of this. -} - -func parseModuleInfo(res response) (*moduleInfo, error) { - m := &moduleInfo{} - if err := res.NoStatus().Unmarshal(m).End(); err != nil { - return nil, err - } - return m, nil -} - -// listModules returns moduleInfo or error. it is not exported because -// this is really not a cephfs specific thing but we needed it -// for cephfs tests. maybe lift it somewhere else someday. -func (fsa *FSAdmin) listModules() (*moduleInfo, error) { - m := map[string]string{ - "prefix": "mgr module ls", - "format": "json", - } - return parseModuleInfo(commands.MarshalMonCommand(fsa.conn, m)) + mgradmin := manager.NewFromConn(fsa.conn) + return mgradmin.DisableModule(mirroring) } diff --git a/vendor/github.com/ceph/go-ceph/common/admin/manager/admin.go b/vendor/github.com/ceph/go-ceph/common/admin/manager/admin.go new file mode 100644 index 000000000..912172b48 --- /dev/null +++ b/vendor/github.com/ceph/go-ceph/common/admin/manager/admin.go @@ -0,0 +1,17 @@ +package manager + +import ( + ccom "github.com/ceph/go-ceph/common/commands" +) + +// MgrAdmin is used to administrate ceph's manager (mgr). +type MgrAdmin struct { + conn ccom.RadosCommander +} + +// NewFromConn creates an new management object from a preexisting +// rados connection. The existing connection can be rados.Conn or any +// type implementing the RadosCommander interface. +func NewFromConn(conn ccom.RadosCommander) *MgrAdmin { + return &MgrAdmin{conn} +} diff --git a/vendor/github.com/ceph/go-ceph/common/admin/manager/doc.go b/vendor/github.com/ceph/go-ceph/common/admin/manager/doc.go new file mode 100644 index 000000000..26283df6e --- /dev/null +++ b/vendor/github.com/ceph/go-ceph/common/admin/manager/doc.go @@ -0,0 +1,5 @@ +/* +Package manager from common/admin contains a set of APIs used to interact +with and administer the Ceph manager (mgr). +*/ +package manager diff --git a/vendor/github.com/ceph/go-ceph/common/admin/manager/module.go b/vendor/github.com/ceph/go-ceph/common/admin/manager/module.go new file mode 100644 index 000000000..925b87321 --- /dev/null +++ b/vendor/github.com/ceph/go-ceph/common/admin/manager/module.go @@ -0,0 +1,75 @@ +package manager + +import ( + "github.com/ceph/go-ceph/internal/commands" +) + +// EnableModule will enable the specified manager module. +// +// Similar To: +// ceph mgr module enable [--force] +func (fsa *MgrAdmin) EnableModule(module string, force bool) error { + m := map[string]string{ + "prefix": "mgr module enable", + "module": module, + "format": "json", + } + if force { + m["force"] = "--force" + } + // Why is this _only_ part of the mon command json? You'd think a mgr + // command would be available as a MgrCommand but I couldn't figure it out. + return commands.MarshalMonCommand(fsa.conn, m).NoData().End() +} + +// DisableModule will disable the specified manager module. +// +// Similar To: +// ceph mgr module disable +func (fsa *MgrAdmin) DisableModule(module string) error { + m := map[string]string{ + "prefix": "mgr module disable", + "module": module, + "format": "json", + } + return commands.MarshalMonCommand(fsa.conn, m).NoData().End() +} + +// DisabledModule describes a disabled Ceph mgr module. +// The Ceph JSON structure contains a complex module_options +// substructure that go-ceph does not currently implement. +type DisabledModule struct { + Name string `json:"name"` + CanRun bool `json:"can_run"` + ErrorString string `json:"error_string"` +} + +// ModuleInfo contains fields that report the status of modules within the +// ceph mgr. +type ModuleInfo struct { + // EnabledModules lists the names of the enabled modules. + EnabledModules []string `json:"enabled_modules"` + // AlwaysOnModules lists the names of the always-on modules. + AlwaysOnModules []string `json:"always_on_modules"` + // DisabledModules lists structures describing modules that are + // not currently enabled. + DisabledModules []DisabledModule `json:"disabled_modules"` +} + +func parseModuleInfo(res commands.Response) (*ModuleInfo, error) { + m := &ModuleInfo{} + if err := res.NoStatus().Unmarshal(m).End(); err != nil { + return nil, err + } + return m, nil +} + +// ListModules returns a module info struct reporting the lists of +// enabled, disabled, and always-on modules in the Ceph mgr. +func (fsa *MgrAdmin) ListModules() (*ModuleInfo, error) { + m := map[string]string{ + "prefix": "mgr module ls", + "format": "json", + } + return parseModuleInfo(commands.MarshalMonCommand(fsa.conn, m)) +} diff --git a/vendor/github.com/ceph/go-ceph/internal/cutil/sync_buffer.go b/vendor/github.com/ceph/go-ceph/internal/cutil/sync_buffer.go index c3f763b6d..b7cbf9b6d 100644 --- a/vendor/github.com/ceph/go-ceph/internal/cutil/sync_buffer.go +++ b/vendor/github.com/ceph/go-ceph/internal/cutil/sync_buffer.go @@ -26,4 +26,4 @@ func (v *SyncBuffer) Release() { // Sync asserts that changes in the C buffer are available in the data // slice -func (v *SyncBuffer) Sync() {} +func (*SyncBuffer) Sync() {} diff --git a/vendor/github.com/ceph/go-ceph/rados/rados_read_op_assert_version.go b/vendor/github.com/ceph/go-ceph/rados/rados_read_op_assert_version.go new file mode 100644 index 000000000..ce67c6ae3 --- /dev/null +++ b/vendor/github.com/ceph/go-ceph/rados/rados_read_op_assert_version.go @@ -0,0 +1,22 @@ +//go:build ceph_preview +// +build ceph_preview + +package rados + +// #cgo LDFLAGS: -lrados +// #include +// #include +// +import "C" + +// AssertVersion ensures that the object exists and that its internal version +// number is equal to "ver" before reading. "ver" should be a version number +// previously obtained with IOContext.GetLastVersion(). +// PREVIEW +// +// Implements: +// void rados_read_op_assert_version(rados_read_op_t read_op, +// uint64_t ver) +func (r *ReadOp) AssertVersion(ver uint64) { + C.rados_read_op_assert_version(r.op, C.uint64_t(ver)) +} diff --git a/vendor/github.com/ceph/go-ceph/rados/rados_write_op_assert_version.go b/vendor/github.com/ceph/go-ceph/rados/rados_write_op_assert_version.go new file mode 100644 index 000000000..f9400a35c --- /dev/null +++ b/vendor/github.com/ceph/go-ceph/rados/rados_write_op_assert_version.go @@ -0,0 +1,22 @@ +//go:build ceph_preview +// +build ceph_preview + +package rados + +// #cgo LDFLAGS: -lrados +// #include +// #include +// +import "C" + +// AssertVersion ensures that the object exists and that its internal version +// number is equal to "ver" before writing. "ver" should be a version number +// previously obtained with IOContext.GetLastVersion(). +// PREVIEW +// +// Implements: +// void rados_read_op_assert_version(rados_read_op_t read_op, +// uint64_t ver) +func (w *WriteOp) AssertVersion(ver uint64) { + C.rados_write_op_assert_version(w.op, C.uint64_t(ver)) +} diff --git a/vendor/github.com/ceph/go-ceph/rados/rados_write_op_remove.go b/vendor/github.com/ceph/go-ceph/rados/rados_write_op_remove.go new file mode 100644 index 000000000..103433e24 --- /dev/null +++ b/vendor/github.com/ceph/go-ceph/rados/rados_write_op_remove.go @@ -0,0 +1,19 @@ +//go:build ceph_preview +// +build ceph_preview + +package rados + +// #cgo LDFLAGS: -lrados +// #include +// #include +// +import "C" + +// Remove object. +// PREVIEW +// +// Implements: +// void rados_write_op_remove(rados_write_op_t write_op) +func (w *WriteOp) Remove() { + C.rados_write_op_remove(w.op) +} diff --git a/vendor/github.com/ceph/go-ceph/rados/rados_write_op_setxattr.go b/vendor/github.com/ceph/go-ceph/rados/rados_write_op_setxattr.go new file mode 100644 index 000000000..ebacb714c --- /dev/null +++ b/vendor/github.com/ceph/go-ceph/rados/rados_write_op_setxattr.go @@ -0,0 +1,34 @@ +//go:build ceph_preview +// +build ceph_preview + +package rados + +// #cgo LDFLAGS: -lrados +// #include +// #include +// +import "C" + +import ( + "unsafe" +) + +// SetXattr sets an xattr. +// PREVIEW +// +// Implements: +// void rados_write_op_setxattr(rados_write_op_t write_op, +// const char * name, +// const char * value, +// size_t value_len) +func (w *WriteOp) SetXattr(name string, value []byte) { + cName := C.CString(name) + defer C.free(unsafe.Pointer(cName)) + + C.rados_write_op_setxattr( + w.op, + cName, + (*C.char)(unsafe.Pointer(&value[0])), + C.size_t(len(value)), + ) +} diff --git a/vendor/github.com/ceph/go-ceph/rados/read_op_omap_get_vals_by_keys.go b/vendor/github.com/ceph/go-ceph/rados/read_op_omap_get_vals_by_keys.go new file mode 100644 index 000000000..c3289073b --- /dev/null +++ b/vendor/github.com/ceph/go-ceph/rados/read_op_omap_get_vals_by_keys.go @@ -0,0 +1,120 @@ +//go:build ceph_preview +// +build ceph_preview + +package rados + +// #cgo LDFLAGS: -lrados +// #include +// #include +// +import "C" + +import ( + "unsafe" +) + +// ReadOpOmapGetValsByKeysStep holds the result of the +// GetOmapValuesByKeys read operation. +// Result is valid only after Operate() was called. +type ReadOpOmapGetValsByKeysStep struct { + // C arguments + + iter C.rados_omap_iter_t + prval *C.int + + // Internal state + + // canIterate is only set after the operation is performed and is + // intended to prevent premature fetching of data. + canIterate bool +} + +func newReadOpOmapGetValsByKeysStep() *ReadOpOmapGetValsByKeysStep { + s := &ReadOpOmapGetValsByKeysStep{ + prval: (*C.int)(C.malloc(C.sizeof_int)), + } + + return s +} + +func (s *ReadOpOmapGetValsByKeysStep) free() { + s.canIterate = false + C.rados_omap_get_end(s.iter) + + C.free(unsafe.Pointer(s.prval)) + s.prval = nil +} + +func (s *ReadOpOmapGetValsByKeysStep) update() error { + err := getError(*s.prval) + s.canIterate = (err == nil) + + return err +} + +// Next gets the next omap key/value pair referenced by +// ReadOpOmapGetValsByKeysStep's internal iterator. +// If there are no more elements to retrieve, (nil, nil) is returned. +// May be called only after Operate() finished. +// PREVIEW +func (s *ReadOpOmapGetValsByKeysStep) Next() (*OmapKeyValue, error) { + if !s.canIterate { + return nil, ErrOperationIncomplete + } + + var ( + cKey *C.char + cVal *C.char + cValLen C.size_t + ) + + ret := C.rados_omap_get_next(s.iter, &cKey, &cVal, &cValLen) + if ret != 0 { + return nil, getError(ret) + } + + if cKey == nil { + // Iterator has reached the end of the list. + return nil, nil + } + + return &OmapKeyValue{ + Key: C.GoString(cKey), + Value: C.GoBytes(unsafe.Pointer(cVal), C.int(cValLen)), + }, nil +} + +// GetOmapValuesByKeys starts iterating over specific key/value pairs. +// PREVIEW +// +// Implements: +// void rados_read_op_omap_get_vals_by_keys(rados_read_op_t read_op, +// char const * const * keys, +// size_t keys_len, +// rados_omap_iter_t * iter, +// int * prval) +func (r *ReadOp) GetOmapValuesByKeys(keys []string) *ReadOpOmapGetValsByKeysStep { + s := newReadOpOmapGetValsByKeysStep() + r.steps = append(r.steps, s) + + cKeys := make([]*C.char, len(keys)) + defer func() { + for _, cKeyPtr := range cKeys { + C.free(unsafe.Pointer(cKeyPtr)) + } + }() + + for i, key := range keys { + cKeys[i] = C.CString(key) + } + + C.rados_read_op_omap_get_vals_by_keys( + r.op, + &cKeys[0], + C.size_t(len(keys)), + &s.iter, + s.prval, + ) + + return s +} diff --git a/vendor/github.com/ceph/go-ceph/rados/read_op_read.go b/vendor/github.com/ceph/go-ceph/rados/read_op_read.go new file mode 100644 index 000000000..0c4809360 --- /dev/null +++ b/vendor/github.com/ceph/go-ceph/rados/read_op_read.go @@ -0,0 +1,75 @@ +//go:build ceph_preview +// +build ceph_preview + +package rados + +// #cgo LDFLAGS: -lrados +// #include +// #include +// +import "C" + +import ( + "unsafe" +) + +// ReadOpReadStep holds the result of the Read read operation. +// Result is valid only after Operate() was called. +type ReadOpReadStep struct { + // C returned data: + bytesRead *C.size_t + prval *C.int + + BytesRead int64 // Bytes read by this action. + Result int // Result of this action. +} + +func (s *ReadOpReadStep) update() error { + s.BytesRead = (int64)(*s.bytesRead) + s.Result = (int)(*s.prval) + + return nil +} + +func (s *ReadOpReadStep) free() { + C.free(unsafe.Pointer(s.bytesRead)) + C.free(unsafe.Pointer(s.prval)) + + s.bytesRead = nil + s.prval = nil +} + +func newReadOpReadStep() *ReadOpReadStep { + return &ReadOpReadStep{ + bytesRead: (*C.size_t)(C.malloc(C.sizeof_size_t)), + prval: (*C.int)(C.malloc(C.sizeof_int)), + } +} + +// Read bytes from offset into buffer. +// len(buffer) is the maximum number of bytes read from the object. +// buffer[:ReadOpReadStep.BytesRead] then contains object data. +// PREVIEW +// +// Implements: +// void rados_read_op_read(rados_read_op_t read_op, +// uint64_t offset, +// size_t len, +// char * buffer, +// size_t * bytes_read, +// int * prval) +func (r *ReadOp) Read(offset uint64, buffer []byte) *ReadOpReadStep { + oe := newReadStep(buffer, offset) + readStep := newReadOpReadStep() + r.steps = append(r.steps, oe, readStep) + C.rados_read_op_read( + r.op, + oe.cOffset, + oe.cReadLen, + oe.cBuffer, + readStep.bytesRead, + readStep.prval, + ) + + return readStep +} diff --git a/vendor/github.com/ceph/go-ceph/rados/read_step.go b/vendor/github.com/ceph/go-ceph/rados/read_step.go new file mode 100644 index 000000000..732f37b00 --- /dev/null +++ b/vendor/github.com/ceph/go-ceph/rados/read_step.go @@ -0,0 +1,31 @@ +package rados + +// #include +import "C" + +import ( + "unsafe" +) + +type readStep struct { + withoutUpdate + withoutFree + // the c pointer utilizes the Go byteslice data and no free is needed + + // inputs: + b []byte + + // arguments: + cBuffer *C.char + cReadLen C.size_t + cOffset C.uint64_t +} + +func newReadStep(b []byte, offset uint64) *readStep { + return &readStep{ + b: b, + cBuffer: (*C.char)(unsafe.Pointer(&b[0])), // TODO: must be pinned + cReadLen: C.size_t(len(b)), + cOffset: C.uint64_t(offset), + } +} diff --git a/vendor/github.com/ceph/go-ceph/rados/watcher.go b/vendor/github.com/ceph/go-ceph/rados/watcher.go new file mode 100644 index 000000000..4569c6849 --- /dev/null +++ b/vendor/github.com/ceph/go-ceph/rados/watcher.go @@ -0,0 +1,379 @@ +//go:build ceph_preview +// +build ceph_preview + +package rados + +/* +#cgo LDFLAGS: -lrados +#include +#include +extern void watchNotifyCb(void*, uint64_t, uint64_t, uint64_t, void*, size_t); +extern void watchErrorCb(void*, uint64_t, int); +*/ +import "C" + +import ( + "encoding/binary" + "fmt" + "math" + "sync" + "time" + "unsafe" +) + +type ( + // WatcherID is the unique id of a Watcher. + WatcherID uint64 + // NotifyID is the unique id of a NotifyEvent. + NotifyID uint64 + // NotifierID is the unique id of a notifying client. + NotifierID uint64 +) + +// NotifyEvent is received by a watcher for each notification. +type NotifyEvent struct { + ID NotifyID + WatcherID WatcherID + NotifierID NotifierID + Data []byte +} + +// NotifyAck represents an acknowleged notification. +type NotifyAck struct { + WatcherID WatcherID + NotifierID NotifierID + Response []byte +} + +// NotifyTimeout represents an unacknowleged notification. +type NotifyTimeout struct { + WatcherID WatcherID + NotifierID NotifierID +} + +// Watcher receives all notifications for certain object. +type Watcher struct { + id WatcherID + oid string + ioctx *IOContext + events chan NotifyEvent + errors chan error + done chan struct{} +} + +var ( + watchers = map[WatcherID]*Watcher{} + watchersMtx sync.RWMutex +) + +// Watch creates a Watcher for the specified object. +// PREVIEW +// +// A Watcher receives all notifications that are sent to the object on which it +// has been created. It exposes two read-only channels: Events() receives all +// the NotifyEvents and Errors() receives all occuring errors. A typical code +// creating a Watcher could look like this: +// +// watcher, err := ioctx.Watch(oid) +// go func() { // event handler +// for ne := range watcher.Events() { +// ... +// ne.Ack([]byte("response data...")) +// ... +// } +// }() +// go func() { // error handler +// for err := range watcher.Errors() { +// ... handle err ... +// } +// }() +// +// CAUTION: the Watcher references the IOContext in which it has been created. +// Therefore all watchers must be deleted with the Delete() method before the +// IOContext is being destroyed. +// +// Implements: +// int rados_watch2(rados_ioctx_t io, const char* o, uint64_t* cookie, +// rados_watchcb2_t watchcb, rados_watcherrcb_t watcherrcb, void* arg) +func (ioctx *IOContext) Watch(obj string) (*Watcher, error) { + return ioctx.WatchWithTimeout(obj, 0) +} + +// WatchWithTimeout creates a watcher on an object. Same as Watcher(), but +// different timeout than the default can be specified. +// PREVIEW +// +// Implements: +// int rados_watch3(rados_ioctx_t io, const char *o, uint64_t *cookie, +// rados_watchcb2_t watchcb, rados_watcherrcb_t watcherrcb, uint32_t timeout, +// void *arg); +func (ioctx *IOContext) WatchWithTimeout(oid string, timeout time.Duration) (*Watcher, error) { + cObj := C.CString(oid) + defer C.free(unsafe.Pointer(cObj)) + var id C.uint64_t + watchersMtx.Lock() + defer watchersMtx.Unlock() + ret := C.rados_watch3( + ioctx.ioctx, + cObj, + &id, + (C.rados_watchcb2_t)(C.watchNotifyCb), + (C.rados_watcherrcb_t)(C.watchErrorCb), + C.uint32_t(timeout.Milliseconds()/1000), + nil, + ) + if err := getError(ret); err != nil { + return nil, err + } + evCh := make(chan NotifyEvent) + errCh := make(chan error) + w := &Watcher{ + id: WatcherID(id), + ioctx: ioctx, + oid: oid, + events: evCh, + errors: errCh, + done: make(chan struct{}), + } + watchers[WatcherID(id)] = w + return w, nil +} + +// ID returns the WatcherId of the Watcher +// PREVIEW +func (w *Watcher) ID() WatcherID { + return w.id +} + +// Events returns a read-only channel, that receives all notifications that are +// sent to the object of the Watcher. +// PREVIEW +func (w *Watcher) Events() <-chan NotifyEvent { + return w.events +} + +// Errors returns a read-only channel, that receives all errors for the Watcher. +// PREVIEW +func (w *Watcher) Errors() <-chan error { + return w.errors +} + +// Check on the status of a Watcher. +// PREVIEW +// +// Returns the time since it was last confirmed. If there is an error, the +// Watcher is no longer valid, and should be destroyed with the Delete() method. +// +// Implements: +// int rados_watch_check(rados_ioctx_t io, uint64_t cookie) +func (w *Watcher) Check() (time.Duration, error) { + ret := C.rados_watch_check(w.ioctx.ioctx, C.uint64_t(w.id)) + if ret < 0 { + return 0, getError(ret) + } + return time.Millisecond * time.Duration(ret), nil +} + +// Delete the watcher. This closes both the event and error channel. +// PREVIEW +// +// Implements: +// int rados_unwatch2(rados_ioctx_t io, uint64_t cookie) +func (w *Watcher) Delete() error { + watchersMtx.Lock() + _, ok := watchers[w.id] + if ok { + delete(watchers, w.id) + } + watchersMtx.Unlock() + if !ok { + return nil + } + ret := C.rados_unwatch2(w.ioctx.ioctx, C.uint64_t(w.id)) + if ret != 0 { + return getError(ret) + } + close(w.done) // unblock blocked callbacks + close(w.events) + close(w.errors) + return nil +} + +// Notify sends a notification with the provided data to all Watchers of the +// specified object. +// PREVIEW +// +// CAUTION: even if the error is not nil. the returned slices +// might still contain data. +func (ioctx *IOContext) Notify(obj string, data []byte) ([]NotifyAck, []NotifyTimeout, error) { + return ioctx.NotifyWithTimeout(obj, data, 0) +} + +// NotifyWithTimeout is like Notify() but with a different timeout than the +// default. +// PREVIEW +// +// Implements: +// int rados_notify2(rados_ioctx_t io, const char* o, const char* buf, int buf_len, +// uint64_t timeout_ms, char** reply_buffer, size_t* reply_buffer_len) +func (ioctx *IOContext) NotifyWithTimeout(obj string, data []byte, timeout time.Duration) ([]NotifyAck, + []NotifyTimeout, error) { + cObj := C.CString(obj) + defer C.free(unsafe.Pointer(cObj)) + var cResponse *C.char + defer C.rados_buffer_free(cResponse) + var responseLen C.size_t + var dataPtr *C.char + if len(data) > 0 { + dataPtr = (*C.char)(unsafe.Pointer(&data[0])) + } + ret := C.rados_notify2( + ioctx.ioctx, + cObj, + dataPtr, + C.int(len(data)), + C.uint64_t(timeout.Milliseconds()), + &cResponse, + &responseLen, + ) + // cResponse has been set even if an error is returned, so we decode it anyway + acks, timeouts := decodeNotifyResponse(cResponse, responseLen) + return acks, timeouts, getError(ret) +} + +// Ack sends an acknowledgement with the specified response data to the notfier +// of the NotifyEvent. If a notify is not ack'ed, the originating Notify() call +// blocks and eventiually times out. +// PREVIEW +// +// Implements: +// int rados_notify_ack(rados_ioctx_t io, const char *o, uint64_t notify_id, +// uint64_t cookie, const char *buf, int buf_len) +func (ne *NotifyEvent) Ack(response []byte) error { + watchersMtx.RLock() + w, ok := watchers[ne.WatcherID] + watchersMtx.RUnlock() + if !ok { + return fmt.Errorf("can't ack on deleted watcher %v", ne.WatcherID) + } + cOID := C.CString(w.oid) + defer C.free(unsafe.Pointer(cOID)) + var respPtr *C.char + if len(response) > 0 { + respPtr = (*C.char)(unsafe.Pointer(&response[0])) + } + ret := C.rados_notify_ack( + w.ioctx.ioctx, + cOID, + C.uint64_t(ne.ID), + C.uint64_t(ne.WatcherID), + respPtr, + C.int(len(response)), + ) + return getError(ret) +} + +// WatcherFlush flushes all pending notifications of the cluster. +// PREVIEW +// +// Implements: +// int rados_watch_flush(rados_t cluster) +func (c *Conn) WatcherFlush() error { + if !c.connected { + return ErrNotConnected + } + ret := C.rados_watch_flush(c.cluster) + return getError(ret) +} + +// decoder for this notify response format: +// le32 num_acks +// { +// le64 gid global id for the client (for client.1234 that's 1234) +// le64 cookie cookie for the client +// le32 buflen length of reply message buffer +// u8 buflen payload +// } num_acks +// le32 num_timeouts +// { +// le64 gid global id for the client +// le64 cookie cookie for the client +// } num_timeouts +// +// NOTE: starting with pacific this is implemented as a C function and this can +// be replaced later +func decodeNotifyResponse(response *C.char, len C.size_t) ([]NotifyAck, []NotifyTimeout) { + if len == 0 || response == nil { + return nil, nil + } + b := (*[math.MaxInt32]byte)(unsafe.Pointer(response))[:len:len] + pos := 0 + + num := binary.LittleEndian.Uint32(b[pos:]) + pos += 4 + acks := make([]NotifyAck, num) + for i := range acks { + acks[i].NotifierID = NotifierID(binary.LittleEndian.Uint64(b[pos:])) + pos += 8 + acks[i].WatcherID = WatcherID(binary.LittleEndian.Uint64(b[pos:])) + pos += 8 + dataLen := binary.LittleEndian.Uint32(b[pos:]) + pos += 4 + if dataLen > 0 { + acks[i].Response = C.GoBytes(unsafe.Pointer(&b[pos]), C.int(dataLen)) + pos += int(dataLen) + } + } + + num = binary.LittleEndian.Uint32(b[pos:]) + pos += 4 + timeouts := make([]NotifyTimeout, num) + for i := range timeouts { + timeouts[i].NotifierID = NotifierID(binary.LittleEndian.Uint64(b[pos:])) + pos += 8 + timeouts[i].WatcherID = WatcherID(binary.LittleEndian.Uint64(b[pos:])) + pos += 8 + } + return acks, timeouts +} + +//export watchNotifyCb +func watchNotifyCb(_ unsafe.Pointer, notifyID C.uint64_t, id C.uint64_t, + notifierID C.uint64_t, cData unsafe.Pointer, dataLen C.size_t) { + watchersMtx.RLock() + w, ok := watchers[WatcherID(id)] + watchersMtx.RUnlock() + if !ok { + // usually this should not happen, but who knows + // TODO: some log message (once we have logging) + return + } + ev := NotifyEvent{ + ID: NotifyID(notifyID), + WatcherID: WatcherID(id), + NotifierID: NotifierID(notifierID), + } + if dataLen > 0 { + ev.Data = C.GoBytes(cData, C.int(dataLen)) + } + select { + case <-w.done: // unblock when deleted + case w.events <- ev: + } +} + +//export watchErrorCb +func watchErrorCb(_ unsafe.Pointer, id C.uint64_t, err C.int) { + watchersMtx.RLock() + w, ok := watchers[WatcherID(id)] + watchersMtx.RUnlock() + if !ok { + // usually this should not happen, but who knows + // TODO: some log message (once we have logging) + return + } + select { + case <-w.done: // unblock when deleted + case w.errors <- getError(err): + } +} diff --git a/vendor/github.com/ceph/go-ceph/rados/write_op_preview.go b/vendor/github.com/ceph/go-ceph/rados/write_op_cmpext.go similarity index 95% rename from vendor/github.com/ceph/go-ceph/rados/write_op_preview.go rename to vendor/github.com/ceph/go-ceph/rados/write_op_cmpext.go index 2a01411a6..7286fdd19 100644 --- a/vendor/github.com/ceph/go-ceph/rados/write_op_preview.go +++ b/vendor/github.com/ceph/go-ceph/rados/write_op_cmpext.go @@ -1,6 +1,3 @@ -//go:build ceph_preview -// +build ceph_preview - package rados // #cgo LDFLAGS: -lrados @@ -40,7 +37,6 @@ func newWriteOpCmpExtStep() *WriteOpCmpExtStep { } // CmpExt ensures that given object range (extent) satisfies comparison. -// PREVIEW // // Implements: // void rados_write_op_cmpext(rados_write_op_t write_op, diff --git a/vendor/github.com/ceph/go-ceph/rbd/rbd.go b/vendor/github.com/ceph/go-ceph/rbd/rbd.go index e7cd6c7cf..5f27bb7d2 100644 --- a/vendor/github.com/ceph/go-ceph/rbd/rbd.go +++ b/vendor/github.com/ceph/go-ceph/rbd/rbd.go @@ -706,15 +706,13 @@ func (image *Image) BreakLock(client string, cookie string) error { return getError(C.rbd_break_lock(image.image, cClient, cCookie)) } -// ssize_t rbd_read(rbd_image_t image, uint64_t ofs, size_t len, char *buf); -// TODO: int64_t rbd_read_iterate(rbd_image_t image, uint64_t ofs, size_t len, -// int (*cb)(uint64_t, size_t, const char *, void *), void *arg); -// TODO: int rbd_read_iterate2(rbd_image_t image, uint64_t ofs, uint64_t len, -// int (*cb)(uint64_t, size_t, const char *, void *), void *arg); -// TODO: int rbd_diff_iterate(rbd_image_t image, -// const char *fromsnapname, -// uint64_t ofs, uint64_t len, -// int (*cb)(uint64_t, size_t, int, void *), void *arg); +// Read data from the image. The length of the read is determined by the length +// of the buffer slice. The position of the read is determined by an internal +// offset which is not safe in concurrent code. Prefer ReadAt when possible. +// +// Implements: +// ssize_t rbd_read(rbd_image_t image, uint64_t ofs, size_t len, +// char *buf); func (image *Image) Read(data []byte) (int, error) { if err := image.validate(imageIsOpen); err != nil { return 0, err @@ -742,7 +740,13 @@ func (image *Image) Read(data []byte) (int, error) { return ret, nil } -// ssize_t rbd_write(rbd_image_t image, uint64_t ofs, size_t len, const char *buf); +// Write data to an image. The length of the write is determined by the length of +// the buffer slice. The position of the write is determined by an internal +// offset which is not safe in concurrent code. Prefer WriteAt when possible. +// +// Implements: +// ssize_t rbd_write(rbd_image_t image, uint64_t ofs, size_t len, +// const char *buf); func (image *Image) Write(data []byte) (n int, err error) { if err := image.validate(imageIsOpen); err != nil { return 0, err diff --git a/vendor/modules.txt b/vendor/modules.txt index 0d6c46af9..d7a6c0b35 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -69,9 +69,10 @@ github.com/cenkalti/backoff/v3 ## explicit; go 1.16 github.com/ceph/ceph-csi/api/deploy/kubernetes/rbd github.com/ceph/ceph-csi/api/deploy/ocp -# github.com/ceph/go-ceph v0.13.0 +# github.com/ceph/go-ceph v0.14.0 ## explicit; go 1.12 github.com/ceph/go-ceph/cephfs/admin +github.com/ceph/go-ceph/common/admin/manager github.com/ceph/go-ceph/common/commands github.com/ceph/go-ceph/internal/callbacks github.com/ceph/go-ceph/internal/commands