Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
N
neoppod
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Levin Zimmermann
neoppod
Commits
abca1b3f
Commit
abca1b3f
authored
Aug 27, 2017
by
Kirill Smelkov
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
.
parent
8d3fa558
Changes
4
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
56 additions
and
11 deletions
+56
-11
go/neo/client/client.go
go/neo/client/client.go
+20
-8
go/neo/nodetab.go
go/neo/nodetab.go
+3
-2
go/neo/parttab.go
go/neo/parttab.go
+1
-1
go/xcommon/xcontext/xcontext.go
go/xcommon/xcontext/xcontext.go
+32
-0
No files found.
go/neo/client/client.go
View file @
abca1b3f
...
@@ -22,6 +22,7 @@ package client
...
@@ -22,6 +22,7 @@ package client
import
(
import
(
"context"
"context"
"math/rand"
"net/url"
"net/url"
"lab.nexedi.com/kirr/neo/go/neo"
"lab.nexedi.com/kirr/neo/go/neo"
...
@@ -43,8 +44,6 @@ func (c *Client) StorageName() string {
...
@@ -43,8 +44,6 @@ func (c *Client) StorageName() string {
return
"neo"
return
"neo"
}
}
// XXX loading cache (+ singleflight)
// NewClient creates new client node.
// NewClient creates new client node.
// it will connect to master @masterAddr and identify with sepcified cluster name
// it will connect to master @masterAddr and identify with sepcified cluster name
func
NewClient
(
clusterName
,
masterAddr
string
,
net
xnet
.
Networker
)
(
*
Client
,
error
)
{
func
NewClient
(
clusterName
,
masterAddr
string
,
net
xnet
.
Networker
)
(
*
Client
,
error
)
{
...
@@ -60,7 +59,7 @@ func NewClient(clusterName, masterAddr string, net xnet.Networker) (*Client, err
...
@@ -60,7 +59,7 @@ func NewClient(clusterName, masterAddr string, net xnet.Networker) (*Client, err
},
},
}
}
// XXX ->
background
// XXX ->
talkMaster
cli
.
node
.
Dial
(
context
.
TODO
(),
neo
.
MASTER
,
masterAddr
)
cli
.
node
.
Dial
(
context
.
TODO
(),
neo
.
MASTER
,
masterAddr
)
panic
(
"TODO"
)
panic
(
"TODO"
)
}
}
...
@@ -101,9 +100,23 @@ func (c *Client) LastOid() (zodb.Oid, error) {
...
@@ -101,9 +100,23 @@ func (c *Client) LastOid() (zodb.Oid, error) {
}
}
func
(
c
*
Client
)
Load
(
xid
zodb
.
Xid
)
(
data
[]
byte
,
tid
zodb
.
Tid
,
err
error
)
{
func
(
c
*
Client
)
Load
(
xid
zodb
.
Xid
)
(
data
[]
byte
,
tid
zodb
.
Tid
,
err
error
)
{
panic
(
"TODO"
)
// XXX check pt is operational first?
/*
cellv
:=
c
.
node
.
PartTab
.
Get
(
xid
.
Oid
)
// FIXME do not use global conn (see comment in openClientByURL)
// XXX cellv = filter(cellv, UP_TO_DATE)
cell
:=
cellv
[
rand
.
Intn
(
len
(
cellv
))]
stor
:=
c
.
node
.
NodeTab
.
Get
(
cell
.
NodeUUID
)
if
stor
==
nil
{
// XXX?
}
//Slink := c.Connect(stor) // single-flight Dial; puts result into stor.Link (XXX ok?)
Slink
:=
stor
.
Connect
()
// single-flight Dial; puts result into stor.Link (XXX ok?)
// TODO maintain conn pool so every new GetObject request does not
// spawn new goroutine on server
// Sconn = stor.GetConn()
// XXX defer if ok stor.PutConn(Sconn)
Sconn
:=
Slink
.
NewConn
()
req
:=
neo
.
GetObject
{
Oid
:
xid
.
Oid
}
req
:=
neo
.
GetObject
{
Oid
:
xid
.
Oid
}
if
xid
.
TidBefore
{
if
xid
.
TidBefore
{
req
.
Serial
=
neo
.
INVALID_TID
req
.
Serial
=
neo
.
INVALID_TID
...
@@ -114,7 +127,7 @@ func (c *Client) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) {
...
@@ -114,7 +127,7 @@ func (c *Client) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) {
}
}
resp
:=
neo
.
AnswerGetObject
{}
resp
:=
neo
.
AnswerGetObject
{}
err =
c.storC
onn.Ask(&req, &resp)
err
=
Sc
onn
.
Ask
(
&
req
,
&
resp
)
if
err
!=
nil
{
if
err
!=
nil
{
return
nil
,
0
,
err
// XXX err context
return
nil
,
0
,
err
// XXX err context
}
}
...
@@ -125,7 +138,6 @@ func (c *Client) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) {
...
@@ -125,7 +138,6 @@ func (c *Client) Load(xid zodb.Xid) (data []byte, tid zodb.Tid, err error) {
// reply.NextSerial
// reply.NextSerial
// reply.DataSerial
// reply.DataSerial
return
resp
.
Data
,
resp
.
Serial
,
nil
return
resp
.
Data
,
resp
.
Serial
,
nil
*/
}
}
func
(
c
*
Client
)
Iterate
(
tidMin
,
tidMax
zodb
.
Tid
)
zodb
.
IStorageIterator
{
func
(
c
*
Client
)
Iterate
(
tidMin
,
tidMax
zodb
.
Tid
)
zodb
.
IStorageIterator
{
...
...
go/neo/nodetab.go
View file @
abca1b3f
...
@@ -101,7 +101,7 @@ type Node struct {
...
@@ -101,7 +101,7 @@ type Node struct {
}
}
// Get finds node by uuid
// Get finds node by uuid
.
func
(
nt
*
NodeTable
)
Get
(
uuid
NodeUUID
)
*
Node
{
func
(
nt
*
NodeTable
)
Get
(
uuid
NodeUUID
)
*
Node
{
// FIXME linear scan
// FIXME linear scan
for
_
,
node
:=
range
nt
.
nodev
{
for
_
,
node
:=
range
nt
.
nodev
{
...
@@ -114,7 +114,8 @@ func (nt *NodeTable) Get(uuid NodeUUID) *Node {
...
@@ -114,7 +114,8 @@ func (nt *NodeTable) Get(uuid NodeUUID) *Node {
// XXX GetByAddress ?
// XXX GetByAddress ?
// Update updates information about a node
// Update updates information about a node.
//
// it returns corresponding node entry for convenience
// it returns corresponding node entry for convenience
func
(
nt
*
NodeTable
)
Update
(
nodeInfo
NodeInfo
,
conn
*
Conn
/*XXX better link *NodeLink*/
)
*
Node
{
func
(
nt
*
NodeTable
)
Update
(
nodeInfo
NodeInfo
,
conn
*
Conn
/*XXX better link *NodeLink*/
)
*
Node
{
node
:=
nt
.
Get
(
nodeInfo
.
UUID
)
node
:=
nt
.
Get
(
nodeInfo
.
UUID
)
...
...
go/neo/parttab.go
View file @
abca1b3f
...
@@ -136,7 +136,7 @@ type Cell struct {
...
@@ -136,7 +136,7 @@ type Cell struct {
//
//
}
}
// Get returns cells oid is associated with
// Get returns cells oid is associated with
.
func
(
pt
*
PartitionTable
)
Get
(
oid
zodb
.
Oid
)
[]
Cell
{
func
(
pt
*
PartitionTable
)
Get
(
oid
zodb
.
Oid
)
[]
Cell
{
if
len
(
pt
.
tab
)
==
0
{
if
len
(
pt
.
tab
)
==
0
{
return
nil
return
nil
...
...
go/xcommon/xcontext/xcontext.go
View file @
abca1b3f
...
@@ -45,6 +45,37 @@ func Merge(ctx1, ctx2 context.Context) (context.Context, context.CancelFunc) {
...
@@ -45,6 +45,37 @@ func Merge(ctx1, ctx2 context.Context) (context.Context, context.CancelFunc) {
done
:
make
(
chan
struct
{}),
done
:
make
(
chan
struct
{}),
cancelCh
:
make
(
chan
struct
{}),
cancelCh
:
make
(
chan
struct
{}),
}
}
/*
// ctx1 will never be canceled?
switch ctx1.Done() {
case nil, context.Background().Done():
bg1 = true
}
// ----//---- same for ctx2?
*/
/*
XXX do we need vvv?
// if src ctx is already cancelled - make mc cancelled right after creation
select {
case <-ctx1.Done():
mc.done = ctx1.Done()
mc.doneErr = ctx1.Err()
case <-ctx2.Done():
mc.done = ctx2.Done()
mc.doneErr = ctx2.Err()
// src ctx not canceled - spawn ctx{1,2}.done merger.
default:
done := make(chan struct{})
mc.done = done
go mc.wait(done)
}
*/
go
mc
.
wait
()
go
mc
.
wait
()
return
mc
,
mc
.
cancel
return
mc
,
mc
.
cancel
}
}
...
@@ -124,6 +155,7 @@ func (mc *mergeCtx) Value(key interface{}) interface{} {
...
@@ -124,6 +155,7 @@ func (mc *mergeCtx) Value(key interface{}) interface{} {
return
mc
.
ctx2
.
Value
(
key
)
return
mc
.
ctx2
.
Value
(
key
)
}
}
// Cancelled reports whether an error is due to a canceled context.
// Cancelled reports whether an error is due to a canceled context.
//
//
// Since both cancellation ways - explicit and due to exceeding context
// Since both cancellation ways - explicit and due to exceeding context
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment