Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
W
wendelin.core
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Joshua
wendelin.core
Commits
0ed611a3
Commit
0ed611a3
authored
Jul 01, 2019
by
Kirill Smelkov
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
.
parent
a384567c
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
95 additions
and
111 deletions
+95
-111
wcfs/__init__.py
wcfs/__init__.py
+70
-110
wcfs/wcfs_test.py
wcfs/wcfs_test.py
+25
-1
No files found.
wcfs/__init__.py
View file @
0ed611a3
...
...
@@ -38,7 +38,8 @@ import logging as log
from
os.path
import
dirname
from
errno
import
ENOENT
,
EEXIST
from
golang
import
go
,
chan
,
select
,
default
from
golang
import
chan
,
select
,
default
from
golang
import
sync
,
context
from
golang.gcompat
import
qq
from
ZODB.FileStorage
import
FileStorage
...
...
@@ -51,11 +52,16 @@ class Conn(object):
# .mountpoint path to wcfs mountpoint
# ._fwcfs /.wcfs/zurl opened to keep the server from going away (at least cleanly)
def
__init__
(
self
,
mountpoint
,
fwcfs
):
# XXX 4testing only?
# ._proc wcfs process if it was opened by this conn | None
def
__init__
(
self
,
mountpoint
,
fwcfs
,
proc
):
self
.
mountpoint
=
mountpoint
self
.
_fwcfs
=
fwcfs
self
.
_proc
=
proc
def
close
(
self
):
# XXX unmount wcfs as well?
self
.
_fwcfs
.
close
()
# open creates wcfs file handle, which can be mmaped to give data of ZBigFile.
...
...
@@ -171,7 +177,7 @@ def join(zurl, autostart=_default_autostart(), shared=False): # -> Conn
raise
else
:
# already have it
return
Conn
(
mntpt
,
f
)
return
Conn
(
mntpt
,
f
,
None
)
if
not
autostart
:
raise
RuntimeError
(
"wcfs: join %s: server not started"
%
zurl
)
...
...
@@ -190,113 +196,67 @@ def _start(zurl, *optv): # -> Conn
mntpt
=
_mntpt_4zurl
(
zurl
)
log
.
info
(
"wcfs: starting for %s ..."
,
zurl
)
cancel
=
chan
()
# cancels wcfs server running (and waitmounted in initialization phase)
startedok
=
chan
()
# indicates to wcfs server that whole startup was ok
# spawn spawns and monitors wcfs server. it is running until either wcfs
# server terminates, or cancel or startedok are ready.
ewcfs
=
chan
(
1
)
# err | None
def
spawn
():
err
=
None
try
:
argv
=
[
_wcfs_exe
()]
+
list
(
optv
)
+
[
zurl
,
mntpt
]
p
=
subprocess
.
Popen
(
argv
,
close_fds
=
True
)
while
1
:
ret
=
p
.
poll
()
if
ret
is
not
None
:
err
=
"spawn: exited with %s"
%
ret
break
_
,
_rx
=
select
(
cancel
.
recv
,
# 0
startedok
.
recv
,
# 1
default
,
# 2
)
if
_
==
0
:
p
.
terminate
()
break
if
_
==
1
:
# startup was ok - don't monitor spawned wcfs any longer
break
time
.
sleep
(
0.1
)
except
Exception
as
e
:
log
.
exception
(
"wcfs server"
)
err
=
"spawn: %s"
%
e
# XXX wrap with errctx
ewcfs
.
send
(
err
)
# waitmounted waits till wcfs mount is ready.
mounted
=
chan
(
1
)
# file | err
def
waitmounted
():
res
=
None
# XXX errctx
try
:
while
1
:
try
:
f
=
open
(
"%s/.wcfs/zurl"
%
mntpt
)
except
IOError
as
e
:
if
e
.
errno
!=
ENOENT
:
raise
else
:
res
=
f
dotwcfs
=
f
.
read
()
if
dotwcfs
!=
zurl
:
raise
RuntimeError
(
".wcfs/zurl != zurl (%s != %s)"
%
(
qq
(
dotwcfs
),
qq
(
zurl
)))
break
_
,
_rx
=
select
(
cancel
.
recv
,
# 0
default
,
# 1
)
if
_
==
0
:
res
=
"waitmounted: cancel"
break
time
.
sleep
(
0.1
)
except
Exception
as
e
:
res
=
"waitmounted: %s"
%
e
# XXX errctx
mounted
.
send
(
res
)
# spawn wcfs and wait till it is mounted.
go
(
spawn
)
go
(
waitmounted
)
_
,
_rx
=
select
(
ewcfs
.
recv
,
# 0
mounted
.
recv
,
# 1
)
if
_
==
0
:
# wcfs error
err
=
_rx
if
_
==
1
:
if
isinstance
(
_rx
,
file
):
# mounted ok - return Conn object.
#
# NOTE: we tell `spawn` thread to exit and stop monitoring spawned
# wcfs, because we want spawned wcfs to potentially overlive our
# process and to serve other processes. For the same reason we do
# not preserve cancel channel in returned Conn.
f
=
_rx
startedok
.
close
()
return
Conn
(
mntpt
,
f
)
# waitmounted error
err
=
_rx
cancel
.
close
()
raise
RuntimeError
(
"wcfs: start: %s"
%
err
)
# XXX errctx
# XXX errctx "wcfs: start"
# spawn wcfs and wait till filesystem-level access to it is ready
conn
=
Conn
(
mntpt
,
None
,
None
)
wg
=
sync
.
WorkGroup
(
context
.
background
())
fsready
=
chan
()
def
_
(
ctx
):
# XXX errctx "spawn"
argv
=
[
_wcfs_exe
()]
+
list
(
optv
)
+
[
zurl
,
mntpt
]
proc
=
subprocess
.
Popen
(
argv
,
close_fds
=
True
)
while
1
:
ret
=
proc
.
poll
()
if
ret
is
not
None
:
raise
"exited with %s"
%
ret
_
,
_rx
=
select
(
ctx
.
done
().
recv
,
# 0
fsready
.
recv
,
# 1
default
,
# 2
)
if
_
==
0
:
proc
.
terminate
()
raise
ctx
.
err
()
if
_
==
1
:
# startup was ok - don't monitor spawned wcfs any longer
conn
.
_proc
=
proc
return
time
.
sleep
(
0.1
)
wg
.
go
(
_
)
def
_
(
ctx
):
# XXX errctx "waitmount"
while
1
:
try
:
f
=
open
(
"%s/.wcfs/zurl"
%
mntpt
)
except
IOError
as
e
:
if
e
.
errno
!=
ENOENT
:
raise
else
:
dotwcfs
=
f
.
read
()
if
dotwcfs
!=
zurl
:
raise
RuntimeError
(
".wcfs/zurl != zurl (%s != %s)"
%
(
qq
(
dotwcfs
),
qq
(
zurl
)))
conn
.
_fwcfs
=
f
fsready
.
close
()
return
_
,
_rx
=
select
(
ctx
.
done
().
recv
,
# 0
default
,
# 1
)
if
_
==
0
:
raise
ctx
.
err
()
time
.
sleep
(
0.1
)
wg
.
go
(
_
)
wg
.
wait
()
return
conn
# _wcfs_exe returns path to wcfs executable.
...
...
wcfs/wcfs_test.py
View file @
0ed611a3
...
...
@@ -78,7 +78,7 @@ def setup_function(f):
# make sure we unmount wcfs after every test.
def
teardown_function
(
f
):
mounted
=
not
subprocess
.
call
([
"mountpoint"
,
"-q"
,
testmntpt
]
)
mounted
=
(
0
==
subprocess
.
call
([
"mountpoint"
,
"-q"
,
testmntpt
])
)
if
mounted
:
subprocess
.
check_call
([
"fusermount"
,
"-u"
,
testmntpt
])
if
os
.
path
.
exists
(
testmntpt
):
...
...
@@ -188,7 +188,10 @@ class DFile:
class
tDB
:
def
__init__
(
t
):
t
.
root
=
testdb
.
dbopen
()
assert
not
os
.
path
.
exists
(
testmntpt
)
t
.
wc
=
wcfs
.
join
(
testzurl
,
autostart
=
True
)
assert
os
.
path
.
exists
(
testmntpt
)
assert
0
==
subprocess
.
call
([
"mountpoint"
,
"-q"
,
testmntpt
])
# ZBigFile(s) scheduled for commit
t
.
_changed
=
{}
# ZBigFile -> {} blk -> data
...
...
@@ -227,6 +230,27 @@ class tDB:
# it also prints change history to help developer overview current testcase.
@
func
def
close
(
t
):
# unmount and wait for wcfs to exit
def
_
():
assert
0
==
subprocess
.
call
([
"mountpoint"
,
"-q"
,
testmntpt
])
subprocess
.
check_call
([
"fusermount"
,
"-u"
,
testmntpt
])
wg
=
sync
.
WorkGroup
(
timeout
())
def
_
(
ctx
):
while
1
:
if
ready
(
ctx
.
done
()):
raise
ctx
.
err
()
ret
=
t
.
wc
.
_proc
.
poll
()
if
ret
is
not
None
:
return
tdelay
()
wg
.
go
(
_
)
wg
.
wait
()
assert
0
!=
subprocess
.
call
([
"mountpoint"
,
"-q"
,
testmntpt
])
os
.
rmdir
(
testmntpt
)
defer
(
_
)
defer
(
t
.
dump_history
)
for
tf
in
t
.
_files
.
copy
():
tf
.
close
()
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment