Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
N
neoppod
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Levin Zimmermann
neoppod
Commits
5ad44db5
Commit
5ad44db5
authored
8 years ago
by
Kirill Smelkov
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
.
parent
81e7018a
Changes
4
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
62 additions
and
36 deletions
+62
-36
neo/client/Storage.py
neo/client/Storage.py
+2
-1
neo/lib/logger.py
neo/lib/logger.py
+11
-1
neo/storage/handlers/client.py
neo/storage/handlers/client.py
+6
-0
neo/tests/threaded/testReplication.py
neo/tests/threaded/testReplication.py
+43
-34
No files found.
neo/client/Storage.py
View file @
5ad44db5
...
...
@@ -81,7 +81,8 @@ class Storage(BaseStorage.BaseStorage,
try
:
return
self
.
app
.
load
(
oid
)[:
2
]
except
NEOStorageNotFoundError
:
raise
POSException
.
POSKeyError
(
oid
)
raise
#raise POSException.POSKeyError(oid)
def
new_oid
(
self
):
return
self
.
app
.
new_oid
()
...
...
This diff is collapsed.
Click to expand it.
neo/lib/logger.py
View file @
5ad44db5
...
...
@@ -28,6 +28,7 @@ from logging import getLogger, Formatter, Logger, StreamHandler, \
from
time
import
time
from
traceback
import
format_exception
import
bz2
,
inspect
,
neo
,
os
,
signal
,
sqlite3
,
sys
,
threading
from
cStringIO
import
StringIO
# Stats for storage node of matrix test (py2.7:SQLite)
RECORD_SIZE
=
(
234360832
# extra memory used
...
...
@@ -37,6 +38,8 @@ RECORD_SIZE = ( 234360832 # extra memory used
FMT
=
(
'%(asctime)s %(levelname)-9s %(name)-10s'
' [%(module)14s:%(lineno)3d]
\
n
%(message)s'
)
from
.
import
protocol
class
_Formatter
(
Formatter
):
def
formatTime
(
self
,
record
,
datefmt
=
None
):
...
...
@@ -222,8 +225,14 @@ class NEOLogger(Logger):
peer
=
'%s %s (%s:%u)'
%
(
'>'
if
r
.
outgoing
else
'<'
,
uuid_str
(
r
.
uuid
),
ip
,
port
)
msg
=
r
.
msg
pktcls
=
protocol
.
StaticRegistry
[
r
.
code
]
#bmsg = StringIO(msg)
#hmsg = protocol.Packets.parse(bmsg, protocol.ParserState())
print
'PACKET %s %s
\
t
%s
\
t
%s
\
t
%s %s'
%
(
r
.
created
,
r
.
_name
,
r
.
msg_id
,
pktcls
.
__name__
,
peer
,
r
.
pkt
.
decode
())
if
msg
is
not
None
:
msg
=
buffer
(
msg
)
self
.
_db
.
execute
(
"INSERT INTO packet VALUES (NULL,?,?,?,?,?,?)"
,
(
r
.
created
,
r
.
_name
,
r
.
msg_id
,
r
.
code
,
peer
,
msg
))
else
:
...
...
@@ -264,11 +273,12 @@ class NEOLogger(Logger):
self
.
parent
.
callHandlers
(
record
)
def
packet
(
self
,
connection
,
packet
,
outgoing
):
if
self
.
_db
is
not
None
:
if
True
or
self
.
_db
is
not
None
:
body
=
packet
.
_body
if
self
.
_max_packet
and
self
.
_max_packet
<
len
(
body
):
body
=
None
self
.
_queue
(
PacketRecord
(
pkt
=
packet
,
created
=
time
(),
msg_id
=
packet
.
_id
,
code
=
packet
.
_code
,
...
...
This diff is collapsed.
Click to expand it.
neo/storage/handlers/client.py
View file @
5ad44db5
...
...
@@ -23,6 +23,8 @@ from ..transactions import ConflictError, DelayedError, NotRegisteredError
from
..exception
import
AlreadyPendingError
import
time
import
traceback
# Log stores taking (incl. lock delays) more than this many seconds.
# Set to None to disable.
SLOW_STORE
=
2
...
...
@@ -45,6 +47,7 @@ class ClientOperationHandler(EventHandler):
app
.
queueEvent
(
self
.
askObject
,
conn
,
(
oid
,
serial
,
tid
))
return
o
=
app
.
dm
.
getObject
(
oid
,
serial
,
tid
)
print
'AAA %r'
%
o
try
:
serial
,
next_serial
,
compression
,
checksum
,
data
,
data_serial
=
o
except
TypeError
:
...
...
@@ -252,6 +255,7 @@ class ClientROOperationHandler(ClientOperationHandler):
super
(
ClientROOperationHandler
,
self
).
askTransactionInformation
(
conn
,
tid
)
def
askObject
(
self
,
conn
,
oid
,
serial
,
tid
):
print
'
\
n
\
n
\
n
ASK OBJECT %r, %r, %r
\
n
\
n
\
n
'
%
(
oid
,
serial
,
tid
)
backup_tid
=
self
.
app
.
dm
.
getBackupTID
()
if
serial
and
serial
>
backup_tid
:
# obj lookup will find nothing, but return properly either
...
...
@@ -264,7 +268,9 @@ class ClientROOperationHandler(ClientOperationHandler):
if
not
serial
and
not
tid
:
tid
=
add64
(
backup_tid
,
1
)
print
'-> %r %r %r'
%
(
oid
,
serial
,
tid
)
super
(
ClientROOperationHandler
,
self
).
askObject
(
conn
,
oid
,
serial
,
tid
)
print
'XXX'
def
askTIDsFrom
(
self
,
conn
,
min_tid
,
max_tid
,
length
,
partition
):
backup_tid
=
self
.
app
.
dm
.
getBackupTID
()
...
...
This diff is collapsed.
Click to expand it.
neo/tests/threaded/testReplication.py
View file @
5ad44db5
...
...
@@ -14,6 +14,8 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from
logging
import
getLogger
,
INFO
,
DEBUG
import
random
import
time
import
transaction
...
...
@@ -32,6 +34,10 @@ from neo.lib.util import p64
from
..
import
Patch
from
.
import
ConnectionFilter
,
NEOCluster
,
NEOThreadedTest
,
predictable_random
# dump log to stderr
logging
.
backlog
(
max_size
=
None
)
del
logging
.
default_root_handler
.
handle
getLogger
().
setLevel
(
DEBUG
)
def
backup_test
(
partitions
=
1
,
upstream_kw
=
{},
backup_kw
=
{}):
def
decorator
(
wrapped
):
...
...
@@ -53,40 +59,6 @@ def backup_test(partitions=1, upstream_kw={}, backup_kw={}):
return
decorator
"""
# handy tool to get various ids of a cluster in tests
# XXX move to NEOCluster ?
class IDs:
def __init__(self, cluster):
self.cluster = cluster
def _recovery(self):
return self.cluster.neoctl.getRecovery()
@property
def ptid(self):
return self._recovery()[0]
@property
def backup_tid(self):
return self._recovery()[1]
@property
def truncated_tid(self):
return self._recovery()[2]
@property
def last_tid(self):
return self.cluster.master.getLastTransaction()
# XXX and attributes
@property
def cluster_state(self):
return self.cluster.neoctl.getClusterState()
"""
class
ReplicationTests
(
NEOThreadedTest
):
...
...
@@ -547,5 +519,42 @@ class ReplicationTests(NEOThreadedTest):
checker
.
CHECK_COUNT
=
CHECK_COUNT
cluster
.
stop
()
@
backup_test
()
def
testBackupReadAccess
(
self
,
backup
):
"""Check data can be read from backup cluster by clients"""
B
=
backup
U
=
B
.
upstream
S
=
U
.
getZODBStorage
()
Sb
=
B
.
getZODBStorage
()
oid_list
=
[]
tid_list
=
[]
for
i
in
xrange
(
10
):
# store new data to U
txn
=
transaction
.
Transaction
()
S
.
tpc_begin
(
txn
)
oid
=
S
.
new_oid
()
S
.
store
(
oid
,
None
,
'%s-%i'
%
(
oid
,
i
),
''
,
txn
)
S
.
tpc_vote
(
txn
)
tid
=
S
.
tpc_finish
(
txn
)
oid_list
.
append
(
oid
)
tid_list
.
append
(
tid
)
# make sure data propagated to B
self
.
tic
()
self
.
assertEqual
(
B
.
backup_tid
,
U
.
last_tid
)
self
.
assertEqual
(
B
.
last_tid
,
U
.
last_tid
)
self
.
assertEqual
(
1
,
self
.
checkBackup
(
B
))
# try to read data from B
Sb
.
_cache
.
clear
()
for
j
,
oid
in
enumerate
(
oid_list
):
data
=
Sb
.
load
(
oid
,
''
)
self
.
assertEqual
(
data
,
'%s-%s'
%
(
oid
,
j
))
#Sb.loadSerial(oid, tid)
#Sb.loadBefore(oid, tid)
if
__name__
==
"__main__"
:
unittest
.
main
()
This diff is collapsed.
Click to expand it.
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment