db.go 8.02 KB
Newer Older
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
1 2
// Copyright (C) 2018-2019  Nexedi SA and Contributors.
//                          Kirill Smelkov <kirr@nexedi.com>
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
3
//
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
19 20

package zodb
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
21
// application-level database handle.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
22

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
23 24
import (
	"context"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
25
	"sort"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
26
	"sync"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
27
	"time"
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
28 29 30 31

	"lab.nexedi.com/kirr/neo/go/transaction"
)

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
32
// DB represents a handle to database at application level and contains pool
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
33 34 35 36
// of connections. DB.Open opens database connection. The connection will be
// automatically put back into DB pool for future reuse after corresponding
// transaction is complete. DB thus provides service to maintain live objects
// cache and reuse live objects from transaction to transaction.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
37 38 39 40 41 42 43 44 45
//
// Note that it is possible to have several DB handles to the same database.
// This might be useful if application accesses distinctly different sets of
// objects in different transactions and knows beforehand which set it will be
// next time. Then, to avoid huge cache misses, it makes sense to keep DB
// handles opened for every possible case of application access.
//
// DB is safe to access from multiple goroutines simultaneously.
type DB struct {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
46
	stor IStorage
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
47

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
48 49
	mu    sync.Mutex
	connv []*Connection // order by ↑= .at
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
50

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
51
	// XXX -> Storage. XXX or -> Cache? (so it is not duplicated many times for many DB case)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
52 53 54 55 56 57 58

	// δtail of database changes for invalidations
	// min(rev) = min(conn.at) for all conn ∈ db (opened and in the pool)
	δtail ΔTail // [](rev↑, []oid)

	// openers waiting for δtail.Head to become covering their at.
	δwait map[δwaiter]struct{} // set(at, ready)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
59 60
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
61 62 63 64 65
// δwaiter represents someone waiting for δtail.Head to become ≥ at.
// XXX place
type δwaiter struct {
	at    Tid
	ready chan struct{}
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
66 67 68 69 70
}


// NewDB creates new database handle.
func NewDB(stor IStorage) *DB {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
71
	// XXX db options?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
72 73 74 75 76 77
	db := &DB{stor: stor}
	watchq := make(chan CommitEvent)
	stor.AddWatch(watchq)
	// XXX DelWatch? in db.Close() ?
	go db.watcher(watchq)
	return db
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
78 79
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
80 81 82 83 84 85
// ConnOptions describes options to DB.Open .
type ConnOptions struct {
	// At, if !0, requests a Connection bound to the `at` view of the
	// database instead of the latest one.
	At Tid

	// NoSync requests not to sync with storage to get its last tid.
	NoSync bool
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
86 87 88 89 90
// String represents connection options in human-readable form.
//
// For example:
//
//	(@head, sync)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
func (opt *ConnOptions) String() string {
	s := "(@"
	if opt.At != 0 {
		s += opt.At.String()
	} else {
		s += "head"
	}

	s += ", "
	if opt.NoSync {
		s += "no"
	}
	s += "sync)"
	return s
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
// watcher receives events about new committed transactions and updates δtail.
//
// After appending each event it wakes up all δtail waiters whose at is ≤ the
// event tid. It returns when watchq is closed.
func (db *DB) watcher(watchq <-chan CommitEvent) { // XXX err ?
	for event := range watchq {
		var woken []chan struct{} // ready channels of waiters covered by event.Tid

		db.mu.Lock()
		db.δtail.Append(event.Tid, event.Changev)
		for w := range db.δwait {
			if event.Tid < w.at {
				continue // still not covered
			}
			woken = append(woken, w.ready)
			delete(db.δwait, w)
		}
		db.mu.Unlock()

		// signal waiters only after db.mu is released
		for _, ready := range woken {
			close(ready)
		}
	}

	// watchq closed
	// XXX wake up all waiters?
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
137
// Open opens new connection to the database.
//
// By default the connection is opened to current latest database state; opt.At
// can be specified to open connection bound to particular view of the database.
// A nil opt is treated the same as &ConnOptions{}.
//
// Open must be called under transaction.
// Opened connection must be used only under the same transaction and only
// until that transaction is complete.
//
// Before taking a connection from the pool, Open waits until db.δtail covers
// the connection's at. If ctx is canceled while waiting, Open gives up and
// returns ctx.Err().
//
// Any returned error is wrapped into *OpError.
func (db *DB) Open(ctx context.Context, opt *ConnOptions) (_ *Connection, err error) {
	if opt == nil {
		opt = &ConnOptions{}
	}

	defer func() {
		if err == nil {
			return
		}

		err = &OpError{
			URL:  db.stor.URL(),
			Op:   "open db",
			Args: opt,
			Err:  err,
		}
	}()

	txn := transaction.Current(ctx)

	at := opt.At
	if at == 0 {
		head := Tid(0)

		if opt.NoSync {
			// δtail is also modified by watcher -> read it under db.mu.
			// XXX prevent retrieved head to be removed from δtail
			db.mu.Lock()
			head = db.δtail.Head() // = 0 if empty
			db.mu.Unlock()
		}

		// !NoSync or δtail empty
		// sync storage for lastTid
		if head == 0 {
			// XXX stor.LastTid returns last_tid storage itself
			// received on server, not last_tid on server.
			// -> add stor.Sync() ?
			head, err = db.stor.LastTid(ctx)
			if err != nil {
				return nil, err
			}
		}

		at = head
	}

	// wait till .δtail.head is up to date covering ≥ at
	var δready chan struct{}
	db.mu.Lock()
	// XXX prevent head from going away?
	if db.δtail.Head() < at {
		δready = make(chan struct{})
		if db.δwait == nil {
			// assignment to a nil map panics
			db.δwait = make(map[δwaiter]struct{})
		}
		db.δwait[δwaiter{at: at, ready: δready}] = struct{}{}
	}
	db.mu.Unlock()

	if δready != nil {
		select {
		case <-ctx.Done():
			// Unregister so δwait does not accumulate abandoned waiters.
			// If watcher has already removed the entry, delete is a no-op.
			db.mu.Lock()
			delete(db.δwait, δwaiter{at: at, ready: δready})
			db.mu.Unlock()
			return nil, ctx.Err()

		case <-δready:
			// ok
		}
	}

	// now we have both at and invalidation data covering it -> proceed to
	// get connection from the pool.
	conn := db.get(at)
	conn.txn = txn
	txn.RegisterSync((*connTxnSync)(conn))

	return conn, nil
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
218 219 220 221
// get returns connection from db pool most close to at.
//
// It creates new one if there is no close-enough connection in the pool.
// A reused connection is removed from db.connv.
func (db *DB) get(at Tid) *Connection {
	db.mu.Lock()
	defer db.mu.Unlock()

	n := len(db.connv)

	// idx is the first pool position whose .at is > at:
	// connv[idx-1].at ≤ at < connv[idx].at
	idx := sort.Search(n, func(k int) bool {
		return db.connv[k].at > at
	})

	// search through window of X previous connections and find out the one
	// with minimal distance to get to state @at. If all connections are too
	// distant - create connection anew.
	//
	// XXX search not only previous, but future too? (we can get back to
	// past by invalidating what was later changed)
	const X = 10 // XXX hardcoded
	best := -1
	lo := idx - X
	if lo < 0 {
		lo = 0
	}
	for k := lo; k < idx; k++ {
		// TODO search for max N(live) - N(live, that will need to be invalidated)
		best = k // XXX stub (using rightmost k)
	}

	// nothing found or too distant
	const Tnear = 10 * time.Minute // XXX hardcoded
	if best == -1 || tabs(δtid(at, db.connv[best].at)) > Tnear {
		return newConnection(db, at)
	}

	// reuse the connection: take it out of the pool, preserving order
	conn := db.connv[best]
	copy(db.connv[best:], db.connv[best+1:])
	db.connv[n-1] = nil // don't keep a stale reference in the freed slot
	db.connv = db.connv[:n-1]

	switch {
	case conn.db != db:
		panic("DB.get: foreign connection in the pool")
	case conn.txn != nil:
		panic("DB.get: live connection in the pool")
	case conn.at != at:
		panic("DB.get: TODO: invalidations") // XXX
	}

	return conn
}

// put puts connection back into db pool.
func (db *DB) put(conn *Connection) {
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
278 279 280 281
	if conn.db != db {
		panic("DB.put: conn.db != db")
	}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
282 283 284 285 286 287
	conn.txn = nil

	db.mu.Lock()
	defer db.mu.Unlock()

	// XXX check if len(connv) > X, and drop conn if yes
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
288 289 290 291 292 293 294 295 296
	// [i-1].at ≤ at < [i].at
	i := sort.Search(len(db.connv), func(i int) bool {
		return conn.at < db.connv[i].at
	})

	//db.connv = append(db.connv[:i], conn, db.connv[i:]...)
	db.connv = append(db.connv, nil)
	copy(db.connv[i+1:], db.connv[i:])
	db.connv[i] = conn
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
297 298

	// XXX GC too idle connections here?
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
299 300 301 302 303 304
}

// ---- txn sync ----

// connTxnSync is Connection registered as transaction synchronizer.
//
// It is a distinct named type so that its BeforeCompletion/AfterCompletion
// methods are hidden from Connection's public API.
type connTxnSync Connection

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
305 306 307
func (csync *connTxnSync) BeforeCompletion(txn transaction.Transaction) {
	conn := (*Connection)(csync)
	conn.checkTxn(txn, "BeforeCompletion")
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
308 309 310
	// nothing
}

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
311
// AfterCompletion puts conn back into db pool after transaction is complete.
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
312 313 314
func (csync *connTxnSync) AfterCompletion(txn transaction.Transaction) {
	conn := (*Connection)(csync)
	conn.checkTxn(txn, "AfterCompletion")
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
315

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
316 317
	// XXX check that conn was explicitly closed by user?

Kirill Smelkov's avatar
.  
Kirill Smelkov committed
318
	conn.db.put(conn)
Kirill Smelkov's avatar
.  
Kirill Smelkov committed
319
}