// Copyright (C) 2018-2019 Nexedi SA and Contributors. // Kirill Smelkov <kirr@nexedi.com> // // This program is free software: you can Use, Study, Modify and Redistribute // it under the terms of the GNU General Public License version 3, or (at your // option) any later version, as published by the Free Software Foundation. // // You can also Link and Combine this program with other software covered by // the terms of any of the Free Software licenses or any of the Open Source // Initiative approved licenses and Convey the resulting work. Corresponding // source of such a combination shall include the source code for all other // software used. // // This program is distributed WITHOUT ANY WARRANTY; without even the implied // warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. // // See COPYING file for full licensing terms. // See https://www.nexedi.com/licensing for rationale and options. package main // ZBlk* + ZBigFile loading // module: "wendelin.bigfile.file_zodb" // // ZBigFile // .blksize xint // .blktab LOBtree{} blk -> ZBlk*(blkdata) // // ZBlk0 (aliased as ZBlk) // str with trailing '\0' removed. // // ZBlk1 // .chunktab IOBtree{} offset -> ZData(chunk) // // ZData // str (chunk) import ( "context" "fmt" "log" "reflect" "sort" "sync" "syscall" "golang.org/x/sync/errgroup" "lab.nexedi.com/kirr/go123/mem" "lab.nexedi.com/kirr/go123/xerr" "lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb/btree" pickle "github.com/kisielk/og-rek" "./internal/pycompat" ) // zBlk is the interface that every ZBlk* block implements internally. type zBlk interface { // loadBlkData loads from database and returns data block stored by this ZBlk. // // If returned data size is less than the block size of containing ZBigFile, // the block trailing is assumed to be trailing \0. loadBlkData(ctx context.Context) ([]byte, error) // bindZFile associates ZBlk as being used by zfile to store block #blk. // // A ZBlk may be bound to several blocks inside one file, and to // several files. // // The information is preserved even when ZBlk comes to ghost // state, but is lost if ZBlk is garbage collected. // // it is safe to call multiple bindZFile simultaneously. // it is not safe to call bindZFile and boundTo simultaneously. // // XXX link to overview. bindZFile(zfile *ZBigFile, blk int64) // XXX unbindZFile // XXX zfile -> bind map for it // blkBoundTo returns ZBlk association with zfile(s)/#blk(s). // // The association returned is that was previously set by bindZFile. // // blkBoundTo must not be called simultaneously wrt bindZFile. blkBoundTo() map[*ZBigFile]SetI64 } // module of Wendelin ZODB py objects const zwendelin = "wendelin.bigfile.file_zodb" // ---- zBlkBase ---- // zBlkBase provides common functionality to implement ZBlk* -> zfile, #blk binding. // // The data stored by zBlkBase is transient - it is _not_ included into // persistent state. type zBlkBase struct { bindMu sync.Mutex // used only for binding to support multiple loaders inzfile map[*ZBigFile]SetI64 // {} zfile -> set(#blk) } // bindZFile implements zBlk. func (zb *zBlkBase) bindZFile(zfile *ZBigFile, blk int64) { zb.bindMu.Lock() defer zb.bindMu.Unlock() blkmap, ok := zb.inzfile[zfile] if !ok { blkmap = make(SetI64, 1) if zb.inzfile == nil { zb.inzfile = make(map[*ZBigFile]SetI64) } zb.inzfile[zfile] = blkmap } blkmap.Add(blk) } // blkBoundTo implementss zBlk. func (zb *zBlkBase) blkBoundTo() map[*ZBigFile]SetI64 { return zb.inzfile } // ---- ZBlk0 ---- // ZBlk0 mimics ZBlk0 from python. type ZBlk0 struct { zBlkBase zodb.Persistent // XXX py source uses bytes(buf) but on python2 it still results in str blkdata string } type zBlk0State ZBlk0 // hide state methods from public API func (zb *zBlk0State) DropState() { zb.blkdata = "" } func (zb *zBlk0State) PySetState(pystate interface{}) error { blkdata, ok := pystate.(string) if !ok { return fmt.Errorf("expect str; got %s", typeOf(pystate)) } log.Printf("ZBlk0.PySetState #%d", len(blkdata)) zb.blkdata = blkdata return nil } func (zb *ZBlk0) loadBlkData(ctx context.Context) ([]byte, error) { // XXX err ctx err := zb.PActivate(ctx) if err != nil { return nil, err } defer zb.PDeactivate() return mem.Bytes(zb.blkdata), nil } // ---- ZBlk1 --- // ZData mimics ZData from python. type ZData struct { zBlkBase zodb.Persistent // XXX py source uses bytes(buf) but on python2 it still results in str data string } type zDataState ZData // hide state methods from public API func (zd *zDataState) DropState() { zd.data = "" } func (zd *zDataState) PySetState(pystate interface{}) error { //log.Printf("ZData.PySetState") data, ok := pystate.(string) if !ok { return fmt.Errorf("expect str; got %s", typeOf(pystate)) } zd.data = data return nil } // ZBlk1 mimics ZBlk1 from python. type ZBlk1 struct { zodb.Persistent chunktab *btree.IOBTree // {} offset -> ZData(chunk) } type zBlk1State ZBlk1 // hide state methods from public API func (zb *zBlk1State) DropState() { zb.chunktab = nil } func (zb *zBlk1State) PySetState(pystate interface{}) error { log.Printf("ZBlk1.PySetState") chunktab, ok := pystate.(*btree.IOBTree) if !ok { return fmt.Errorf("expect IOBTree; got %s", typeOf(pystate)) } zb.chunktab = chunktab return nil } func (zb *ZBlk1) loadBlkData(ctx context.Context) ([]byte, error) { // XXX errctx err := zb.PActivate(ctx) if err != nil { return nil, err } defer zb.PDeactivate() // get to all ZData objects; activate them and build // // {} offset -> ZData // // with all ZData being live. var mu sync.Mutex chunktab := make(map[int32]*ZData) // on return deactivate all ZData objects loaded in chunktab defer func() { for _, zd := range chunktab { zd.PDeactivate() } }() wg, ctx := errgroup.WithContext(ctx) // loadZData loads 1 ZData object into chunktab and leaves it activated. loadZData := func(offset int32, zd *ZData) error { err := zd.PActivate(ctx) if err != nil { return err } // no PDeactivate, zd remains live //fmt.Printf("@%d -> zdata #%s (%d)\n", offset, zd.POid(), len(zd.data)) mu.Lock() defer mu.Unlock() chunktab[offset] = zd return nil } // loadBucket loads all ZData objects from leaf BTree bucket. loadBucket := func(b *btree.IOBucket) error { err := b.PActivate(ctx) if err != nil { return err } defer b.PDeactivate() // go through all bucket key/v -> chunktab // XXX off < 0 !ok // XXX off + len > blksize !ok //fmt.Printf("\nbucket: %v\n\n", b.Entryv()) for _, e := range b.Entryv() { zd, ok := e.Value().(*ZData) if !ok { return fmt.Errorf("!ZData (%s)", typeOf(e.Value())) } offset := e.Key() wg.Go(func() error { return loadZData(offset, zd) }) } return nil } // loadBTree spawns loading of all BTree children. var loadBTree func(t *btree.IOBTree) error loadBTree = func(t *btree.IOBTree) error { err := t.PActivate(ctx) if err != nil { return err } defer t.PDeactivate() //fmt.Printf("\nbtree: %v\n\n", t.Entryv()) for _, e := range t.Entryv() { switch child := e.Child().(type) { case *btree.IOBTree: wg.Go(func() error { return loadBTree(child) }) case *btree.IOBucket: wg.Go(func() error { return loadBucket(child) }) default: panic(fmt.Sprintf("IOBTree has %s child", typeOf(child))) } } return nil } wg.Go(func() error { return loadBTree(zb.chunktab) }) err = wg.Wait() if err != nil { return nil, err // XXX err ctx } // empty .chunktab -> ø if len(chunktab) == 0 { return nil, nil } // glue all chunks from chunktab offv := make([]int32, 0, len(chunktab)) // ↑ for off := range(chunktab) { offv = append(offv, off) } sort.Slice(offv, func(i, j int) bool { return offv[i] < offv[j] }) //fmt.Printf("#chunktab: %d\n", len(chunktab)) //fmt.Printf("offv: %v\n", offv) // find out whole blk len via inspecting tail chunk tailStart := offv[len(offv)-1] tailChunk := chunktab[tailStart] blklen := tailStart + int32(len(tailChunk.data)) // XXX overflow? // whole buffer initialized as 0 + tail_chunk blkdata := make([]byte, blklen) copy(blkdata[tailStart:], tailChunk.data) // go through all chunks besides tail and extract them stop := int32(0) for _, start := range offv[:len(offv)-1] { chunk := chunktab[start] if !(start >= stop) { // verify chunks don't overlap return nil, fmt.Errorf("!(start >= stop)") // XXX } if !(start + int32(len(chunk.data)) <= int32(len(blkdata))) { // XXX overflow? return nil, fmt.Errorf("blkdata overrun") // XXX } stop = start + int32(len(chunk.data)) // XXX overflow? copy(blkdata[start:], chunk.data) } return blkdata, nil } // ---------------------------------------- // ZBigFile mimics ZBigFile from python. type ZBigFile struct { zodb.Persistent blksize int64 blktab *btree.LOBTree // {} blk -> ZBlk*(blkdata) } type zBigFileState ZBigFile // hide state methods from public API // DropState implements zodb.Stateful. func (bf *zBigFileState) DropState() { bf.blksize = 0 bf.blktab = nil } // PySetState implements zodb.PyStateful. func (bf *zBigFileState) PySetState(pystate interface{}) (err error) { // ZBigFile // .blksize xint // .blktab LOBtree{} blk -> ZBlk*(blkdata) // // state: (.blksize, .blktab) t, ok := pystate.(pickle.Tuple) if !ok { return fmt.Errorf("expect [2](); got %s", typeOf(pystate)) } if len(t) != 2 { return fmt.Errorf("expect [2](); got [%d]()", len(t)) } blksize, ok := pycompat.Int64(t[0]) if !ok { return fmt.Errorf("blksize: expect integer; got %s", typeOf(t[0])) } if blksize <= 0 { return fmt.Errorf("blksize: must be > 0; got %d", blksize) } blktab, ok := t[1].(*btree.LOBTree) if !ok { return fmt.Errorf("blktab: expect LOBTree; got %s", typeOf(t[1])) } bf.blksize = blksize bf.blktab = blktab return nil } // LoadBlk loads data for file block #blk. // // XXX better load into user-provided buf? func (bf *ZBigFile) LoadBlk(ctx context.Context, blk int64) (_ []byte, err error) { defer xerr.Contextf(&err, "bigfile %s: loadblk %d", bf.POid(), blk) err = bf.PActivate(ctx) if err != nil { return nil, err } defer bf.PDeactivate() xzblk, ok, err := bf.blktab.Get(ctx, blk) if err != nil { return nil, err } if !ok { return make([]byte, bf.blksize), nil } zblk, ok := xzblk.(zBlk) if !ok { return nil, fmt.Errorf("expect ZBlk*; got %s", typeOf(xzblk)) } blkdata, err := zblk.loadBlkData(ctx) if err != nil { return nil, err } l := int64(len(blkdata)) if l > bf.blksize { return nil, fmt.Errorf("invalid blk: size = %d (> blksize = %d)", l, bf.blksize) } // append trailing \0 to data to reach .blksize if l < bf.blksize { d := make([]byte, bf.blksize) copy(d, blkdata) blkdata = d } zblk.bindZFile(bf, blk) //log.Printf("ZBigFile.loadblk(%d) -> %dB", blk, len(blkdata)) return blkdata, nil } // Size returns whole file size. func (bf *ZBigFile) Size(ctx context.Context) (_ int64, err error) { defer xerr.Contextf(&err, "bigfile %s: size", bf.POid()) err = bf.PActivate(ctx) if err != nil { return 0, err } defer bf.PDeactivate() tailblk, ok, err := bf.blktab.MaxKey(ctx) if err != nil { return 0, err } if !ok { return 0, nil } size := (tailblk + 1) * bf.blksize if size / bf.blksize != tailblk + 1 { return 0, syscall.EFBIG // overflow } return size, nil } // ---------------------------------------- func init() { t := reflect.TypeOf zodb.RegisterClass(zwendelin + ".ZBlk0", t(ZBlk0{}), t(zBlk0State{})) zodb.RegisterClass(zwendelin + ".ZBlk1", t(ZBlk1{}), t(zBlk1State{})) zodb.RegisterClass(zwendelin + ".ZData", t(ZData{}), t(zDataState{})) zodb.RegisterClass(zwendelin + ".ZBigFile", t(ZBigFile{}), t(zBigFileState{})) // backward compatibility zodb.RegisterClassAlias(zwendelin + ".ZBlk", zwendelin + ".ZBlk0") }