Commit 89d653c0 authored by Kirill Smelkov's avatar Kirill Smelkov

wcfs: Implement protection against faulty client + related fixes and improvements

The WCFS documentation specifies [1]:

- - - 8> - - - 8> - - -

If a client, on purpose or due to a bug or being stopped, is slow to respond
with ack to file invalidation notification, it creates a problem because the
server will become blocked waiting for pin acknowledgments, and thus all
other clients, that try to work with the same file, will get stuck.

[...]

Lacking OS primitives to change address space of another process and not
being able to work it around with ptrace in userspace, wcfs takes approach
to kill a slow client on 30 seconds timeout by default.

- - - <8 - - - <8 - - -

But before, this protection wasn't implemented yet: one
faulty client could therefore freeze the whole system. With this work
this protection is implemented now: faulty clients are killed after the
timeout or any other misbehaviour in their pin handlers.

Working on this topic also resulted in several fixes and improvements
around isolation protocol implementation on the server side.

See individual patches for details.

[1] https://lab.nexedi.com/nexedi/wendelin.core/blob/38dde766/wcfs/wcfs.go#L186-208Co-authored-by: Levin Zimmermann's avatarLevin Zimmermann <levin.zimmermann@nexedi.com>

/reviewed-on nexedi/wendelin.core!18
parents 79e6f7b9 1fcef9c9
module lab.nexedi.com/nexedi/wendelin.core/wcfs
go 1.14
go 1.19
require (
github.com/golang/glog v1.0.0
......@@ -8,11 +8,34 @@ require (
github.com/johncgriffin/overflow v0.0.0-20211019200055-46fa312c352c
github.com/kisielk/og-rek v1.2.0
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.8.4
github.com/shirou/gopsutil/v4 v4.24.8
github.com/stretchr/testify v1.9.0
lab.nexedi.com/kirr/go123 v0.0.0-20230822135329-95433de34faf
lab.nexedi.com/kirr/neo/go v0.0.0-20240723085959-839ee634bd66
)
require (
crawshaw.io/sqlite v0.3.2 // indirect
github.com/DataDog/czlib v0.0.0-20210322182103-8087f4e14ae7 // indirect
github.com/cznic/strutil v0.0.0-20181122101858-275e90344537 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.5.1 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/philhofer/fwd v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/shamaton/msgpack v1.2.1 // indirect
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/someonegg/gocontainer v1.0.0 // indirect
github.com/tinylib/msgp v1.1.6 // indirect
github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
golang.org/x/sys v0.24.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
// we use kirr/go-fuse@y/nodefs-cancel
// see https://github.com/hanwen/go-fuse/pull/343 for details
replace github.com/hanwen/go-fuse/v2 v2.4.2 => lab.nexedi.com/kirr/go-fuse/v2 v2.4.2-0.20231211215333-9f9ad4a1c7cc
......@@ -7,7 +7,6 @@ crawshaw.io/sqlite v0.3.2/go.mod h1:igAO5JulrQ1DbdZdtVq48mnZUBAPOeFzer7VhDWNtW4=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/DataDog/czlib v0.0.0-20210322182103-8087f4e14ae7 h1:6ZJZdzkbvKb6HRXmZ12ICZ0IbqfR+0Cd2C0IutWHHIA=
github.com/DataDog/czlib v0.0.0-20210322182103-8087f4e14ae7/go.mod h1:ROY4muaTWpoeQAx/oUkvxe9zKCmgU5xDGXsfEbA+omc=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
......@@ -28,6 +27,8 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI=
github.com/fsnotify/fsnotify v1.5.1/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU=
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/glog v1.0.0 h1:nfP3RFugxnNRyKgeWd4oI1nYvXpxrx8ck8ZrcizshdQ=
github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4=
......@@ -49,15 +50,14 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gwenn/gosqlite v0.0.0-20211101095637-b18efb2e44c8 h1:sWkgaGez8CNa2KHGBTTop16/mC03VP6MDqPKfvhEmCU=
github.com/gwenn/gosqlite v0.0.0-20211101095637-b18efb2e44c8/go.mod h1:WBYs9HfQGOYDCz7rFwMk7aHkbTTB0cUkQe3pZQARvIg=
github.com/gwenn/yacr v0.0.0-20200110180258-a66d8c42d0ff/go.mod h1:5SNcBGxZ5OaJAMJCSI/x3V7SGsvXqbwnwP/sHZLgYsw=
github.com/gwenn/yacr v0.0.0-20211101095056-492fb0c571bc h1:AUv494HF3D9ht26o89DuJjqM9QDHwZeYCNU/JSU5jqI=
github.com/gwenn/yacr v0.0.0-20211101095056-492fb0c571bc/go.mod h1:Ps/gikIXcn2rRmeP0HQ9EvUYJrfrjAi51Wg8acsrkP0=
github.com/hanwen/go-fuse/v2 v2.2.0 h1:jo5QZYmBLNcl9ovypWaQ5yXMSSV+Ch68xoC3rtZvvBM=
github.com/hanwen/go-fuse/v2 v2.2.0/go.mod h1:B1nGE/6RBFyBRC1RRnf23UpwCdyJ31eukw34oAKukAc=
github.com/johncgriffin/overflow v0.0.0-20211019200055-46fa312c352c h1:2n/HCxBM7oa5PNCPKIhV26EtJkaPXFfcVojPAT3ujTU=
github.com/johncgriffin/overflow v0.0.0-20211019200055-46fa312c352c/go.mod h1:B9OPZOhZ3FIi6bu54lAgCMzXLh11Z7ilr3rOr/ClP+E=
github.com/kisielk/og-rek v1.2.0 h1:CTvDIin+YnetsSQAYbe+QNAxXU3B50C5hseEz8xEoJw=
......@@ -72,6 +72,9 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v0.0.0-20170820004349-d65d576e9348/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
github.com/moby/sys/mountinfo v0.6.2 h1:BzJjoreD5BMFNmD9Rus6gdd1pLuecOFPt8wC+Vygl78=
github.com/moby/sys/mountinfo v0.6.2/go.mod h1:IJb6JQeOklcdMU9F5xQ8ZALD+CUr5VlGpwtX+VE0rpI=
github.com/philhofer/fwd v1.1.1 h1:GdGcTjf5RNAxwS4QLsiMzJYj5KEvPJD3Abr261yRQXQ=
github.com/philhofer/fwd v1.1.1/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU=
......@@ -79,32 +82,37 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw=
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 h1:OdAsTTz6OkFY5QxjkYwrChwuRruF69c169dPK26NUlk=
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/shamaton/msgpack v1.2.1 h1:40cwW7YAEdOIxcxIsUkAxSMUyYWZUyNiazI5AyiBntI=
github.com/shamaton/msgpack v1.2.1/go.mod h1:ibiaNQRTCUISAYkkyOpaSCEBiCAxXe6u6Mu1sQ6945U=
github.com/shirou/gopsutil/v4 v4.24.8 h1:pVQjIenQkIhqO81mwTaXjTzOMT7d3TZkf43PlVFHENI=
github.com/shirou/gopsutil/v4 v4.24.8/go.mod h1:wE0OrJtj4dG+hYkxqDH3QiBICdKSf04/npcvLLc/oRg=
github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM=
github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ=
github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU=
github.com/soheilhy/cmux v0.1.5/go.mod h1:T7TcVDs9LWfQgPlPsdngu6I6QIoyIFZDDC6sNE1GqG0=
github.com/someonegg/gocontainer v1.0.0 h1:9MMUFbQf7g+g9sMG4ggBHPDS1+Iz+wd9Ee/O4BNRdw0=
github.com/someonegg/gocontainer v1.0.0/go.mod h1:zGJcXRK0ikzEYPFKTaFXi6UU/ulNuJypfADX4UQGtMw=
github.com/someonegg/gox v1.0.0/go.mod h1:pngAcWxBFnyYM4oY+h9Rgv0WaikLkSfY5dBYuabUnBE=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tinylib/msgp v1.1.6 h1:i+SbKraHhnrf9M5MYmvQhFnbLhAXSDWF8WWsuyRdocw=
github.com/tinylib/msgp v1.1.6/go.mod h1:75BAfg2hauQhs3qedfdDZmWAPcFMAvJE5b9rGOMufyw=
github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU=
github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI=
github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk=
github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY=
github.com/ttacon/chalk v0.0.0-20160626202418-22c06c80ed31/go.mod h1:onvgF043R+lC5RZ8IT9rBXDaEDnpnw/Cl+HFiw+v/7Q=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0=
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
......@@ -137,16 +145,20 @@ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210301091718-77cc2087c03b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211111213525-f221eed1c01e h1:zeJt6jBtVDK23XK9QXcmG0FvO0elikp0dYZQZOeL1y0=
golang.org/x/sys v0.0.0-20211111213525-f221eed1c01e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a h1:dGzPydgVsqGcTRVwiLJ1jVbufYwmzD3LfVPLKsKg+0k=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg=
golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
......@@ -193,40 +205,15 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
lab.nexedi.com/kirr/go-fuse/v2 v2.0.0-20210910085851-e6ee85fd0a1e h1:QP8PhLssUs3SEoM+UfQLxfDke7uQtyte4FNu6cw00L4=
lab.nexedi.com/kirr/go-fuse/v2 v2.0.0-20210910085851-e6ee85fd0a1e/go.mod h1:B1nGE/6RBFyBRC1RRnf23UpwCdyJ31eukw34oAKukAc=
lab.nexedi.com/kirr/go-fuse/v2 v2.1.1-0.20221221150221-643585d735fa h1:fd1Vl/L1Y65P39U/Oi2kGzo4MOmra1v2wjfbNsH6fns=
lab.nexedi.com/kirr/go-fuse/v2 v2.1.1-0.20221221150221-643585d735fa/go.mod h1:B1nGE/6RBFyBRC1RRnf23UpwCdyJ31eukw34oAKukAc=
lab.nexedi.com/kirr/go-fuse/v2 v2.3.1-0.20230614151023-eb4d413d568b h1:RoJ56sezXJW76DDZMZ4S5y8SX0VjEOG7VpRB45Om2NI=
lab.nexedi.com/kirr/go-fuse/v2 v2.3.1-0.20230614151023-eb4d413d568b/go.mod h1:xKwi1cF7nXAOBCXujD5ie0ZKsxc8GGSA1rlMJc+8IJs=
lab.nexedi.com/kirr/go-fuse/v2 v2.3.1-0.20230724151956-aebdd447543b h1:yNmHPgNG3yyBNQE5j0uGkf+kmMKG2CFx3cHB48RIvX4=
lab.nexedi.com/kirr/go-fuse/v2 v2.3.1-0.20230724151956-aebdd447543b/go.mod h1:xKwi1cF7nXAOBCXujD5ie0ZKsxc8GGSA1rlMJc+8IJs=
lab.nexedi.com/kirr/go-fuse/v2 v2.4.2-0.20231211215333-9f9ad4a1c7cc h1:OBs7UB6zsBEdZJs1/Bou+5yeBAZlga2IcoZVa1M0/hE=
lab.nexedi.com/kirr/go-fuse/v2 v2.4.2-0.20231211215333-9f9ad4a1c7cc/go.mod h1:xKwi1cF7nXAOBCXujD5ie0ZKsxc8GGSA1rlMJc+8IJs=
lab.nexedi.com/kirr/go123 v0.0.0-20211124154638-01e8697d1901/go.mod h1:pwDpdCuvtz0QxisDzV/z9eUb9zc/rMQec520h4i8VWQ=
lab.nexedi.com/kirr/go123 v0.0.0-20221005052354-179529a731c0 h1:WdmGIknGmIc+R0dcrJlCf0Oy7SQ5UuS94eg8VSH6N1E=
lab.nexedi.com/kirr/go123 v0.0.0-20221005052354-179529a731c0/go.mod h1:pwDpdCuvtz0QxisDzV/z9eUb9zc/rMQec520h4i8VWQ=
lab.nexedi.com/kirr/go123 v0.0.0-20221221144149-4def45d2dd95 h1:BEgSdlZD7lS3tWmBF+TLwWl29N37gXvHrhlwrGuvEFM=
lab.nexedi.com/kirr/go123 v0.0.0-20221221144149-4def45d2dd95/go.mod h1:pwDpdCuvtz0QxisDzV/z9eUb9zc/rMQec520h4i8VWQ=
lab.nexedi.com/kirr/go123 v0.0.0-20230205112549-0399d7ad8db9 h1:shGiikNVHWS0+4+Pnq+5bpVLZ5LywT70Ubq+qvn6B2w=
lab.nexedi.com/kirr/go123 v0.0.0-20230205112549-0399d7ad8db9/go.mod h1:pwDpdCuvtz0QxisDzV/z9eUb9zc/rMQec520h4i8VWQ=
lab.nexedi.com/kirr/go123 v0.0.0-20230714134036-155506a9880e h1:EB5l7YM2WxDXMeCtYR4SHw30GYcE8lWwu+a0bgrSJVs=
lab.nexedi.com/kirr/go123 v0.0.0-20230714134036-155506a9880e/go.mod h1:pwDpdCuvtz0QxisDzV/z9eUb9zc/rMQec520h4i8VWQ=
lab.nexedi.com/kirr/go123 v0.0.0-20230822135329-95433de34faf h1:UwAEraoydFHxDqudGYtxBpyXbt9SqInI657OKf9VMJc=
lab.nexedi.com/kirr/go123 v0.0.0-20230822135329-95433de34faf/go.mod h1:pwDpdCuvtz0QxisDzV/z9eUb9zc/rMQec520h4i8VWQ=
lab.nexedi.com/kirr/neo/go v0.0.0-20221104095432-3e13fa061a13 h1:Cz9Iiz5oA05lG6udEd7FMKfADeFd9K0dmdpj7aTib0A=
lab.nexedi.com/kirr/neo/go v0.0.0-20221104095432-3e13fa061a13/go.mod h1:0Wk1qKrdjMWr8njsuBIbgPMBNjPlNFozuBDe15Lqq6A=
lab.nexedi.com/kirr/neo/go v0.0.0-20230524100036-4c9414ea9a03 h1:p2+Affqxn6i7MHLY3/r3CudQkq77mg7x9RPiBwRrGF4=
lab.nexedi.com/kirr/neo/go v0.0.0-20230524100036-4c9414ea9a03/go.mod h1:0Wk1qKrdjMWr8njsuBIbgPMBNjPlNFozuBDe15Lqq6A=
lab.nexedi.com/kirr/neo/go v0.0.0-20230802174919-db81e0de796a h1:2V+p3nvFyW5l46sdCS/nWzy4/ZqVjTZiuKKTJU5apxs=
lab.nexedi.com/kirr/neo/go v0.0.0-20230802174919-db81e0de796a/go.mod h1:0Wk1qKrdjMWr8njsuBIbgPMBNjPlNFozuBDe15Lqq6A=
lab.nexedi.com/kirr/neo/go v0.0.0-20240721193236-95572d6ade90 h1:FG8pBViB6I8pLqVhL5y+IZ+mmehfmNK47PjyYmG2sRs=
lab.nexedi.com/kirr/neo/go v0.0.0-20240721193236-95572d6ade90/go.mod h1:0Wk1qKrdjMWr8njsuBIbgPMBNjPlNFozuBDe15Lqq6A=
lab.nexedi.com/kirr/neo/go v0.0.0-20240723085959-839ee634bd66 h1:xAQcab3p0CiJ36aLqNGB8kH6hpsalgOnEL9D5CZgoN0=
lab.nexedi.com/kirr/neo/go v0.0.0-20240723085959-839ee634bd66/go.mod h1:0Wk1qKrdjMWr8njsuBIbgPMBNjPlNFozuBDe15Lqq6A=
# Copyright (C) 2019-2021 Nexedi SA and Contributors.
# Copyright (C) 2019-2024 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
......@@ -33,7 +33,7 @@ from posix.types cimport off_t
from cpython.exc cimport PyErr_SetFromErrno
from golang cimport chan, pychan, select, panic, topyexc, cbool
from golang cimport chan, pychan, select, panic, topyexc, cbool, structZ
from golang cimport sync, time
# _tWCFS is pyx part of tWCFS.
......@@ -53,16 +53,15 @@ cdef class _tWCFS:
# but pin handler is failing one way or another - select will wake-up
# but, if _abort_ontimeout uses GIL, won't continue to run trying to lock
# GIL -> deadlock.
def _abort_ontimeout(_tWCFS t, int fdabort, double dt, pychan nogilready not None):
cdef chan[double] timeoutch = time.after(dt)
def _abort_ontimeout(_tWCFS t, int fdabort, double dt, pychan timeoutch not None, pychan nogilready not None):
emsg1 = "\nC: test timed out after %.1fs\n" % (dt / time.second)
cdef char *_emsg1 = emsg1
with nogil:
# tell main thread that we entered nogil world
nogilready.chan_structZ().close()
t.__abort_ontimeout(dt, timeoutch, fdabort, _emsg1)
t.__abort_ontimeout(timeoutch.chan_structZ(), fdabort, _emsg1)
cdef void __abort_ontimeout(_tWCFS t, double dt, chan[double] timeoutch,
cdef void __abort_ontimeout(_tWCFS t, chan[structZ] timeoutch,
int fdabort, const char *emsg1) nogil except +topyexc:
_ = select([
timeoutch.recvs(), # 0
......
// Copyright (C) 2018-2021 Nexedi SA and Contributors.
// Copyright (C) 2018-2024 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
......@@ -25,17 +25,21 @@ import (
"fmt"
"io"
"math"
"os"
"strconv"
"strings"
"sync/atomic"
"syscall"
"time"
log "github.com/golang/glog"
"github.com/shirou/gopsutil/v4/process"
"github.com/hanwen/go-fuse/v2/fuse"
"github.com/hanwen/go-fuse/v2/fuse/nodefs"
"github.com/pkg/errors"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/go123/xio"
"lab.nexedi.com/kirr/neo/go/zodb"
......@@ -321,11 +325,16 @@ func NewFileSock() *FileSock {
// The handle should be given to kernel as result of a file open, for that file
// to be connected to the socket.
func (sk *FileSock) File() nodefs.File {
return WithOpenStreamFlags(sk.file)
}
// WithOpenStreamFlags wraps file handle with FUSE flags needed when opening stream IO.
func WithOpenStreamFlags(file nodefs.File) nodefs.File {
// nonseekable & directio for opened file to have streaming semantic as
// if it was a socket. FOPEN_STREAM is used so that both read and write
// could be run simultaneously: git.kernel.org/linus/10dce8af3422
return &nodefs.WithFlags{
File: sk.file,
File: file,
FuseFlags: fuse.FOPEN_STREAM | fuse.FOPEN_NONSEEKABLE | fuse.FOPEN_DIRECT_IO,
}
}
......@@ -428,7 +437,14 @@ func (f *skFile) Release() {
}
// ---- parsing ----
// fatalEIO switches filesystem into EIO mode and terminates the program.
func fatalEIO() {
// log.Fatal terminates the program and so any attempt to access
// was-mounted filesystem starts to return ENOTCONN
log.Fatal("switching filesystem to EIO mode")
}
// ---- parsing / formatting ----
// parseWatchFrame parses line going through /head/watch into (stream, msg)
//
......@@ -489,6 +505,17 @@ func parseWatch(msg string) (oid zodb.Oid, at zodb.Tid, err error) {
return oid, at, nil
}
// isoRevstr returns string form of revision as used in isolation protocol.
//
// It is almost the same as standard string form of ZODB revision except that
// zodb.TidMax is represented as "head".
func isoRevstr(rev zodb.Tid) string {
if rev == zodb.TidMax {
return "head"
}
return rev.String()
}
// ---- make df happy (else it complains "function not supported") ----
func (root *Root) StatFs() *fuse.StatfsOut {
......@@ -515,3 +542,87 @@ func (root *Root) StatFs() *fuse.StatfsOut {
func panicf(format string, argv ...interface{}) {
panic(fmt.Sprintf(format, argv...))
}
// findAliveProces lookups process by pid and makes sure it is alive.
//
// NOTE: starting from go1.23 it, via os.FindProcess, uses pidfd which avoids potential
// race of later signalling to pid of already long-gone and replaced process.
func findAliveProcess(pid int) (_ *os.Process, err error) {
defer xerr.Contextf(&err, "findAlive pid%d", pid)
proc, err := os.FindProcess(pid)
if err != nil {
return nil, err
}
// verify that found process is actually good because
// os.FindProcess returns "done" stub instead of an error
alive, err := isProcessAlive(proc)
if err != nil {
return nil, err
}
if !alive {
proc.Release()
return nil, syscall.ESRCH
}
return proc, nil
}
// isProcessAlive returns whether process is alive or not.
func isProcessAlive(proc *os.Process) (_ bool, err error) {
defer xerr.Contextf(&err, "isAlive pid%d", proc.Pid)
// verify that proc's pid exists
// proc.Signal(0) returns ok even for zombie, but zombie is not alive
err = proc.Signal(syscall.Signal(0))
if err != nil {
var e syscall.Errno
if errors.As(err, &e) && e == syscall.EPERM {
return false, err
}
return false, nil
}
// pid exists. Check if proc is not zombie
gproc, err := process.NewProcess(int32(proc.Pid))
if err != nil {
return false, err
}
statusv, err := gproc.Status()
if err != nil {
return false, err
}
for _, status := range statusv {
if status == process.Zombie {
return false, nil
}
}
return true, nil
}
// waitProcessEnd waits for process to end.
//
// Contrary to os.Process.Wait it does not require the caller to be a parent of proc.
func waitProcessEnd(ctx context.Context, proc *os.Process) (_ bool, err error) {
defer xerr.Contextf(&err, "waitEnd pid%d", proc.Pid)
tick := time.NewTicker(100*time.Millisecond)
defer tick.Stop()
for {
alive, err := isProcessAlive(proc)
if err != nil {
return false, err
}
if !alive {
return true, nil
}
select {
case <-ctx.Done():
return false, ctx.Err()
case <-tick.C:
// ok
}
}
}
// Copyright (C) 2018-2022 Nexedi SA and Contributors.
// Copyright (C) 2018-2024 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
......@@ -205,7 +205,16 @@
//
// Lacking OS primitives to change address space of another process and not
// being able to work it around with ptrace in userspace, wcfs takes approach
// to kill a slow client on 30 seconds timeout by default.
// to kill a slow or faulty client on 30 seconds timeout or on any other pin
// handling error. This way wcfs achieves progress and safety properties:
// processing does not get stuck even if there is a hung client, and there is
// no corruption in the data that is provided to all live and well-behaving
// clients.
//
// Killing a client with SIGBUS is similar to how OS kernel sends SIGBUS when
// a memory-mapped file is accessed and loading file data results in EIO. It is
// also similar to wendelin.core 1 where SIGBUS is raised if loading file block
// results in an error.
//
//
// Writes
......@@ -542,6 +551,12 @@ type Root struct {
// directories + ZODB connections for @<rev>/
revMu sync.Mutex
revTab map[zodb.Tid]*Head
// time budget for a client to handle pin notification
pinTimeout time.Duration
// collected statistics
stats *Stats
}
// /(head|<rev>)/ - served by Head.
......@@ -674,6 +689,15 @@ type WatchLink struct {
txMu sync.Mutex
rxMu sync.Mutex
rxTab map[/*stream*/uint64]chan string // client replies go via here
// serve operates under .serveCtx and can be requested to stop via serveCancel
serveCtx context.Context
serveCancel context.CancelFunc
down1 sync.Once
down chan struct{} // ready after shutdown completes
pinWG sync.WaitGroup // all pin handlers are accounted here
client *os.Process // client that opened the WatchLink
}
// Watch represents watching for changes to 1 BigFile over particular watch link.
......@@ -681,6 +705,12 @@ type Watch struct {
link *WatchLink // link to client
file *BigFile // watching this file
// setupMu is used to allow only 1 watch request for particular file to
// be handled simultaneously for particular client. It complements atMu
// by continuing to protect setupWatch from another setupWatch when
// setupWatch non-atomically downgrades atMu.W to atMu.R .
setupMu sync.Mutex
// atMu, similarly to zheadMu, protects watch.at and pins associated with Watch.
// atMu.R guarantees that watch.at is not changing, but multiple
// simultaneous pins could be running (used e.g. by readPinWatchers).
......@@ -704,6 +734,15 @@ type blkPinState struct {
err error
}
// Stats keeps collected statistics.
//
// The statistics is accessible via .wcfs/stats file served by _wcfs_Stats.
type Stats struct {
pin atomic.Int64 // # of times wcfs issued pin request
pinkill atomic.Int64 // # of times a client was killed due to badly handling pin
}
// -------- ZODB cache control --------
// zodbCacheControl implements zodb.LiveCacheControl to tune ZODB to never evict
......@@ -960,6 +999,7 @@ retry:
}
// notify .wcfs/zhead
gdebug.zheadSockTabMu.Lock()
for sk := range gdebug.zheadSockTab {
_, err := fmt.Fprintf(xio.BindCtxW(sk, ctx), "%s\n", δZ.Tid)
if err != nil {
......@@ -968,15 +1008,15 @@ retry:
delete(gdebug.zheadSockTab, sk)
}
}
gdebug.zheadSockTabMu.Unlock()
// shrink δFtail not to grow indefinitely.
// cover history for at least 1 minute, but including all watches.
// No need to lock anything because we are holding zheadMu and
// setupWatch too runs with zheadMu locked.
//
// TODO shrink δFtail only once in a while - there is no need to compute
// revCut and cut δFtail on every transaction.
revCut := zodb.TidFromTime(zhead.At().Time().Add(-1*time.Minute))
head.wlinkMu.Lock()
for wlink := range head.wlinkTab {
for _, w := range wlink.byfile {
if w.at < revCut {
......@@ -984,6 +1024,7 @@ retry:
}
}
}
head.wlinkMu.Unlock()
bfdir.δFtail.ForgetPast(revCut)
// notify zhead.At waiters
......@@ -1277,7 +1318,6 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err erro
δFtail.Track(f.zfile, blk, treepath, blkcov, zblk)
// we have the data - it can be used after watchers are updated
// XXX should we use ctx here? (see readPinWatchers comments)
err = f.readPinWatchers(ctx, blk, blkrevMax)
if err != nil {
blkdata = nil
......@@ -1410,21 +1450,65 @@ func traceIso(format string, argv ...interface{}) {
// rev = zodb.TidMax means @head; otherwise rev must be ≤ w.at and there must
// be no rev_next changing file[blk]: rev < rev_next ≤ w.at.
//
// must be called with atMu rlocked.
// Pinning works under WatchLink.serveCtx + pinTimeout instead of explicitly
// specified context because pinning is critical operation whose failure leads
// to client being SIGBUS'ed and so pinning should not be interrupted arbitrarily.
//
// Corresponding watchlink is shutdown on any error.
//
// TODO close watch on any error
func (w *Watch) pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
defer xerr.Contextf(&err, "wlink%d: f<%s>", w.link.id, w.file.zfile.POid())
return w._pin(ctx, blk, rev)
// No error is returned as the only error that pin cannot handle itself inside
// is considered to be fatal and the filesystem is switched to EIO mode on that.
// See badPinKill documentation for details.
//
// pin is invoked by BigFile.readPinWatchers . It is called with atMu rlocked.
func (w *Watch) pin(blk int64, rev zodb.Tid) {
w._pin(w.link.serveCtx, blk, rev)
}
func (w *Watch) _pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
foid := w.file.zfile.POid()
revstr := rev.String()
if rev == zodb.TidMax {
revstr = "head"
// _pin serves pin and is also invoked directly by WatchLink.setupWatch .
//
// It is invoked with ctx being either WatchLink.serveCtx or descendant of it.
// In all cases it is called with atMu rlocked.
func (w *Watch) _pin(ctx context.Context, blk int64, rev zodb.Tid) {
if ctx.Err() != nil {
return // don't enter pinWG if watchlink is down
}
defer xerr.Contextf(&err, "pin #%d @%s", blk, revstr)
w.link.pinWG.Add(1)
defer w.link.pinWG.Done()
ctx, cancel := context.WithTimeout(ctx, groot.pinTimeout)
defer cancel()
err := w.__pin(ctx, blk, rev)
if err != nil {
w.link.shutdown(err)
}
}
// PinError indicates to WatchLink shutdown that pinning a block failed and so
// badPinKill needs to be run.
type PinError struct {
blk int64
rev zodb.Tid
err error
}
func (e *PinError) Error() string {
return fmt.Sprintf("pin #%d @%s: %s", e.blk, isoRevstr(e.rev), e.err)
}
func (e *PinError) Unwrap() error {
return e.err
}
func (w *Watch) __pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
defer func() {
if err != nil {
err = &PinError{blk, rev, err}
}
}()
foid := w.file.zfile.POid()
if !(rev == zodb.TidMax || rev <= w.at) {
panicf("f<%s>: wlink%d: pin #%d @%s: watch.at (%s) < rev",
......@@ -1470,7 +1554,8 @@ func (w *Watch) _pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
// perform IO without w.pinnedMu
w.pinnedMu.Unlock()
ack, err := w.link.sendReq(ctx, fmt.Sprintf("pin %s #%d @%s", foid, blk, revstr))
groot.stats.pin.Add(1)
ack, err := w.link.sendReq(ctx, fmt.Sprintf("pin %s #%d @%s", foid, blk, isoRevstr(rev)))
w.pinnedMu.Lock()
// check IO reply & verify/signal blkpin is ready
......@@ -1502,6 +1587,92 @@ func (w *Watch) _pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
return nil
}
// badPinKill is invoked by shutdown to kill client that did not handle pin
// notification correctly and in time.
//
// Because proper pin handling is critical for safety it is considered to be a
// fatal error if the client could not be killed as wcfs no longer can
// continue to provide correct uncorrupted data to it. The filesystem is
// switched to EIO mode in such case.
func (wlink *WatchLink) badPinKill(reason error) {
pid := wlink.client.Pid
logf := func(format string, argv ...any) {
emsg := fmt.Sprintf("pid%d: ", pid)
emsg += fmt.Sprintf(format, argv...)
log.Error(emsg)
}
logf("client failed to handle pin notification correctly and timely in %s: %s", groot.pinTimeout, reason)
logf("-> killing it because else 1) all other clients will remain stuck, and 2) we no longer can provide correct data to the faulty client.")
logf(` (see "Protection against slow or faulty clients" in wcfs description for details)`)
err := wlink._badPinKill()
if err != nil {
logf("failed to kill it: %s", err)
logf("this is major unexpected event.")
fatalEIO()
}
logf("terminated")
groot.stats.pinkill.Add(1)
}
func (wlink *WatchLink) _badPinKill() error {
client := wlink.client
pid := client.Pid
// time budget for pin + wait + fatal-notify + kill = pinTimeout + 1 + 1/3·pinTimeout
// < 2 ·pinTimeout if pinTimeout > 3/2
//
// NOTE wcfs_faultyprot_test.py waits for 2·pinTimeout to reliably
// detect whether client was killed or not.
timeout := groot.pinTimeout/3
ctx := context.Background()
ctx1, cancel := context.WithTimeout(ctx, timeout*1/2)
defer cancel()
ctx2, cancel := context.WithTimeout(ctx, timeout*2/2)
defer cancel()
// SIGBUS => wait for some time; if still alive => SIGKILL
// TODO kirr: "The kernel then sends SIGBUS on such case with the details about
// access to which address generated this error going in si_addr field of
// siginfo structure. It would be good if we can mimic that behaviour to a
// reasonable extent if possible."
log.Errorf("pid%d: <- SIGBUS", pid)
err := client.Signal(syscall.SIGBUS)
if err != nil {
return err
}
ok, err := waitProcessEnd(ctx1, client)
if err != nil && !errors.Is(err, ctx1.Err()) {
return err
}
if ok {
return nil
}
log.Errorf("pid%d: is still alive after SIGBUS", pid)
log.Errorf("pid%d: <- SIGKILL", pid)
err = client.Signal(syscall.SIGKILL)
if err != nil {
return err
}
ok, err = waitProcessEnd(ctx2, client)
if err != nil && !errors.Is(err, ctx2.Err()) {
return err
}
if ok {
return nil
}
err = fmt.Errorf("is still alive after SIGKILL")
log.Errorf("pid%d: %s", pid, err)
return err
}
// readPinWatchers complements readBlk: it sends `pin blk` for watchers of the file
// after a block was loaded from ZODB but before block data is returned to kernel.
//
......@@ -1509,10 +1680,6 @@ func (w *Watch) _pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
//
// Must be called only for f under head/
// Must be called with f.head.zheadMu rlocked.
//
// XXX do we really need to use/propagate caller context here? ideally update
// watchers should be synchronous, and in practice we just use 30s timeout (TODO).
// Should a READ interrupt cause watch update failure? -> probably no
func (f *BigFile) readPinWatchers(ctx context.Context, blk int64, blkrevMax zodb.Tid) (err error) {
defer xerr.Context(&err, "pin watchers") // f.path and blk is already put into context by readBlk
......@@ -1534,8 +1701,15 @@ func (f *BigFile) readPinWatchers(ctx context.Context, blk int64, blkrevMax zodb
blkrevRough := true
wg := xsync.NewWorkGroup(ctx)
defer func() {
err2 := wg.Wait()
if err == nil {
err = err2
}
}()
f.watchMu.RLock()
defer f.watchMu.RUnlock()
for w := range f.watchTab {
w := w
......@@ -1582,13 +1756,16 @@ func (f *BigFile) readPinWatchers(ctx context.Context, blk int64, blkrevMax zodb
return err
}
//fmt.Printf("S: read #%d: watch @%s: pin -> @%s\n", blk, w.at, pinrev)
// TODO close watcher on any error
return w.pin(ctx, blk, pinrev)
// NOTE we do not propagate context to pin. Ideally update
// watchers should be synchronous, and in practice we just use 30s timeout.
// A READ interrupt should not cause watch update failure.
w.pin(blk, pinrev) // only fatal error
return nil
})
}
f.watchMu.RUnlock()
return wg.Wait()
return nil
}
// setupWatch sets up or updates a Watch when client sends `watch <file> @<at>` request.
......@@ -1645,6 +1822,10 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
}
}
// allow only 1 setupWatch to run simultaneously for particular file
w.setupMu.Lock()
defer w.setupMu.Unlock()
f := w.file
f.watchMu.Lock()
......@@ -1781,8 +1962,16 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
// downgrade atMu.W -> atMu.R to let other clients to access the file.
// NOTE there is no primitive to do Wlock->Rlock atomically, but we are
// ok with that since we prepared everything to handle simultaneous pins
// from other reads.
// ok with that since:
//
// * wrt readPinWatchers we prepared everything to handle
// simultaneous pins from other reads.
// * wrt setupWatch we can still be sure that no another setupWatch
// started to run simultaneously during atMu.Unlock -> atMu.RLock
// because we still hold setupMu.
//
// ( for the reference: pygolang provides RWMutex.UnlockToRLock while go
// rejected it in golang.org/issues/38891 )
w.atMu.Unlock()
w.atMu.RLock()
defer w.atMu.RUnlock()
......@@ -1792,12 +1981,13 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
blk := blk
rev := rev
wg.Go(func(ctx context.Context) error {
return w._pin(ctx, blk, rev)
w._pin(ctx, blk, rev) // only fatal error
return nil
})
}
err = wg.Wait()
if err != nil {
return err
return err // should not fail
}
return nil
......@@ -1805,62 +1995,102 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
// Open serves /head/watch opens.
func (wnode *WatchNode) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fuse.Status) {
node, err := wnode.open(flags, fctx)
return node, err2LogStatus(err)
}
func (wnode *WatchNode) open(flags uint32, fctx *fuse.Context) (_ nodefs.File, err error) {
defer xerr.Contextf(&err, "/head/watch: open")
// TODO(?) check flags
head := wnode.head
// remember our client who opened the watchlink.
// We will need to kill the client if it will be e.g. slow to respond to pin notifications.
client, err := findAliveProcess(int(fctx.Caller.Pid))
if err != nil {
return nil, err
}
serveCtx, serveCancel := context.WithCancel(context.TODO() /*TODO ctx of wcfs running*/)
wlink := &WatchLink{
sk: NewFileSock(),
id: atomic.AddInt32(&wnode.idNext, +1),
head: head,
byfile: make(map[zodb.Oid]*Watch),
rxTab: make(map[uint64]chan string),
serveCtx: serveCtx,
serveCancel: serveCancel,
down: make(chan struct{}),
client: client,
}
head.wlinkMu.Lock()
// XXX del wlinkTab[w] on w.sk.File.Release
head.wlinkTab[wlink] = struct{}{}
head.wlinkMu.Unlock()
go wlink.serve()
return wlink.sk.File(), fuse.OK
go wlink.serve(serveCtx)
return wlink.sk.File(), nil
}
// shutdown shuts down communication over watchlink due to specified reason and
// marks the watchlink as no longer active.
//
// The client is killed if the reason is due to "failed to pin".
// Only the first shutdown call has the effect, but all calls wait for the
// actual shutdown to complete.
//
// NOTE shutdown can be invoked under atMu.R from pin.
func (wlink *WatchLink) shutdown(reason error) {
wlink.down1.Do(func() {
// mark wlink as down; this signals serve loop to exit and cancels all in-progress pins
wlink.serveCancel()
// give client a chance to be notified if shutdown was due to some logical error
kill := false
if reason != nil {
_, kill = reason.(*PinError)
emsg := "error: "
if kill {
emsg = "fatal: "
}
emsg += reason.Error()
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
_ = wlink.send(ctx, 0, emsg)
}
// kill client if shutdown is due to faulty pin handling
if kill {
wlink.badPinKill(reason) // only fatal error
}
// NOTE unregistering watches and wlink itself is done on serve exit, not
// here, to avoid AB-BA deadlock on atMu and e.g. WatchLink.byfileMu .
// It is ok to leave some watches still present in BigFile.watchTab
// until final cleanup because pin becomes noop on down watchlink.
close(wlink.down)
})
<-wlink.down
}
// serve serves client initiated watch requests and routes client replies to
// wcfs initiated pin requests.
func (wlink *WatchLink) serve() {
err := wlink._serve()
func (wlink *WatchLink) serve(ctx context.Context) {
err := wlink._serve(ctx)
if err != nil {
log.Error(err)
}
head := wlink.head
head.wlinkMu.Lock()
delete(head.wlinkTab, wlink)
head.wlinkMu.Unlock()
}
func (wlink *WatchLink) _serve() (err error) {
func (wlink *WatchLink) _serve(ctx context.Context) (err error) {
defer xerr.Contextf(&err, "wlink %d: serve rx", wlink.id)
ctx0 := context.TODO() // TODO ctx = merge(ctx of wcfs running, ctx of wlink timeout)
ctx, cancel := context.WithCancel(ctx0)
wg := xsync.NewWorkGroup(ctx)
r := bufio.NewReader(xio.BindCtxR(wlink.sk, ctx))
// final watchlink cleanup is done on serve exit
defer func() {
// cancel all handlers on both error and ok return.
// ( ok return is e.g. when we received "bye", so if client
// sends "bye" and some pin handlers are in progress - they
// anyway don't need to wait for client replies anymore )
cancel()
err2 := wg.Wait()
if err == nil {
err = err2
}
// unregister all watches created on this wlink
wlink.byfileMu.Lock()
for _, w := range wlink.byfile {
......@@ -1871,45 +2101,83 @@ func (wlink *WatchLink) _serve() (err error) {
wlink.byfile = nil
wlink.byfileMu.Unlock()
// write to peer if it was logical error on client side
// unregister wlink itself
head := wlink.head
head.wlinkMu.Lock()
delete(head.wlinkTab, wlink)
head.wlinkMu.Unlock()
// close .sk
// closing .sk.tx wakes up rx on client side.
err2 := wlink.sk.Close()
if err == nil {
err = err2
}
// release client process
wlink.client.Release()
}()
// watch handlers are spawned in dedicated workgroup
//
// Pin handlers are run either inside - for pins run from setupWatch, or,
// for pins run from readPinWatchers, under wlink.pinWG.
// Upon serve exit we cancel them all and wait for their completion.
wg := xsync.NewWorkGroup(ctx)
defer func() {
// cancel all watch and pin handlers on both error and ok return.
//
// For ok return, when we received "bye", we want to cancel
// in-progress pin handlers without killing clients. That's why
// we call shutdown ourselves.
//
// For error return, we want any in-progress, and so will
// become failed, pin handler to result in corresponding client
// to become killed. That's why we trigger only cancel
// ourselves and let failed pin handlers to invoke shutdown
// with their specific reason.
//
// NOTE this affects pin handlers invoked by both setupWatch and readPinWatchers.
if err != nil {
_ = wlink.send(ctx0, 0, fmt.Sprintf("error: %s", err))
wlink.serveCancel()
} else {
wlink.shutdown(nil)
}
// close .sk.tx : this wakes up rx on client side.
err2 = wlink.sk.CloseWrite()
// wait for setupWatch and pin handlers spawned from it to complete
err2 := wg.Wait()
if err == nil {
err = err2
}
// wait for all other pin handlers to complete
wlink.pinWG.Wait()
// make sure that shutdown is actually invoked if it was an
// error and there were no in-progress pin handlers
wlink.shutdown(err)
}()
// close .sk.rx on error/wcfs stopping or return: this wakes up read(sk).
retq := make(chan struct{})
defer close(retq)
// cancel main thread on any watch handler error
ctx, mainCancel := context.WithCancel(ctx)
defer mainCancel()
wg.Go(func(ctx context.Context) error {
// monitor is always canceled - either at parent ctx cancel, or
// upon return from serve (see "cancel all handlers ..." ^^^).
// If it was return - report returned error to wg.Wait, not "canceled".
// monitor is always canceled - either due to parent ctx cancel, error in workgroup,
// or return from serve and running "cancel all watch handlers ..." above.
<-ctx.Done()
e := ctx.Err()
select {
default:
case <-retq:
e = err // returned error
}
e2 := wlink.sk.CloseRead()
if e == nil {
e = e2
}
return e
mainCancel()
return nil
})
r := bufio.NewReader(xio.BindCtxR(wlink.sk, ctx))
for {
// NOTE r.Read is woken up by ctx cancel because wlink.sk implements xio.Reader natively
l, err := r.ReadString('\n') // TODO limit accepted line len to prevent DOS
if err != nil {
// r.Read is woken up by sk.CloseRead when serve decides to exit
if err == io.ErrClosedPipe || err == io.EOF {
if err == io.EOF {
err = io.ErrUnexpectedEOF
}
if errors.Is(err, ctx.Err()) {
err = nil
}
return err
......@@ -2331,10 +2599,10 @@ var gfsconn *nodefs.FileSystemConnector
// so we still have to reference the root via path.
var gmntpt string
// debugging (protected by zhead.W)
// debugging
var gdebug = struct {
// .wcfs/zhead opens
// protected by groot.head.zheadMu
zheadSockTabMu sync.Mutex
zheadSockTab map[*FileSock]struct{}
}{}
......@@ -2342,22 +2610,108 @@ func init() {
gdebug.zheadSockTab = make(map[*FileSock]struct{})
}
// _wcfs_Zhead serves .wcfs/zhead opens.
// _wcfs_Zhead serves .wcfs/zhead .
type _wcfs_Zhead struct {
fsNode
}
func (zh *_wcfs_Zhead) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fuse.Status) {
// _wcfs_ZheadH serves .wcfs/zhead opens.
type _wcfs_ZheadH struct {
nodefs.File // = .sk.file
sk *FileSock
}
func (*_wcfs_Zhead) Open(flags uint32, fctx *fuse.Context) (nodefs.File, fuse.Status) {
// TODO(?) check flags
sk := NewFileSock()
sk.CloseRead()
zh := &_wcfs_ZheadH{
File: sk.file,
sk: sk,
}
groot.head.zheadMu.Lock() // TODO +fctx -> cancel
defer groot.head.zheadMu.Unlock()
// TODO del zheadSockTab[sk] on sk.File.Release (= client drops opened handle)
gdebug.zheadSockTabMu.Lock() // TODO +fctx -> cancel
gdebug.zheadSockTab[sk] = struct{}{}
return sk.File(), fuse.OK
gdebug.zheadSockTabMu.Unlock()
return WithOpenStreamFlags(zh), fuse.OK
}
func (zh *_wcfs_ZheadH) Release() {
gdebug.zheadSockTabMu.Lock()
delete(gdebug.zheadSockTab, zh.sk)
gdebug.zheadSockTabMu.Unlock()
zh.File.Release()
}
// _wcfs_Stats serves .wcfs/stats reads.
//
// In the output:
//
// - entries that start with capital letter, e.g. "Watch", indicate current
// number of named instances. This numbers can go up and down.
//
// - entries that start with lowercase letter, e.g. "pin", indicate number of
// named event occurrences. This numbers are cumulative counters and should
// never go down.
func _wcfs_Stats(fctx *fuse.Context) ([]byte, error) {
stats := ""
num := func(name string, value any) {
stats += fmt.Sprintf("%s\t: %d\n", name, value)
}
root := groot
head := root.head
bfdir := head.bfdir
// dump information detected at runtime
root.revMu.Lock()
lenRevTab := len(root.revTab)
root.revMu.Unlock()
head.wlinkMu.Lock()
ΣWatch := 0
ΣPinnedBlk := 0
lenWLinkTab := len(head.wlinkTab)
for wlink := range head.wlinkTab {
wlink.byfileMu.Lock()
ΣWatch += len(wlink.byfile)
for _, w := range wlink.byfile {
w.atMu.RLock()
w.pinnedMu.Lock()
ΣPinnedBlk += len(w.pinned)
w.pinnedMu.Unlock()
w.atMu.RUnlock()
}
wlink.byfileMu.Unlock()
}
head.wlinkMu.Unlock()
head.zheadMu.RLock()
bfdir.fileMu.Lock()
lenFileTab := len(bfdir.fileTab)
bfdir.fileMu.Unlock()
head.zheadMu.RUnlock()
gdebug.zheadSockTabMu.Lock()
lenZHeadSockTab := len(gdebug.zheadSockTab)
gdebug.zheadSockTabMu.Unlock()
num("BigFile", lenFileTab) // # of head/BigFile
num("RevHead", lenRevTab) // # of @revX/ directories
num("ZHeadLink", lenZHeadSockTab) // # of open .wcfs/zhead handles
num("WatchLink", lenWLinkTab) // # of open watchlinks
num("Watch", ΣWatch) // # of setup watches
num("PinnedBlk", ΣPinnedBlk) // # of currently on-client pinned blocks
// dump information collected in root.stats
s := root.stats
num("pin", s.pin.Load())
num("pinkill", s.pinkill.Load())
return []byte(stats), nil
}
// TODO -> enable/disable fuse debugging dynamically (by write to .wcfs/debug ?)
......@@ -2376,6 +2730,7 @@ func main() {
func _main() (err error) {
debug := flag.Bool("d", false, "debug")
autoexit := flag.Bool("autoexit", false, "automatically stop service when there is no client activity")
pintimeout := flag.Duration("pintimeout", 30*time.Second, "clients are killed if they do not handle pin notification in pintimeout time")
flag.Parse()
if len(flag.Args()) != 2 {
......@@ -2463,6 +2818,8 @@ func _main() (err error) {
zdb: zdb,
head: head,
revTab: make(map[zodb.Tid]*Head),
pinTimeout: *pintimeout,
stats: &Stats{},
}
opts := &fuse.MountOptions{
......@@ -2470,8 +2827,6 @@ func _main() (err error) {
Name: "wcfs",
// We retrieve kernel cache in ZBlk.blksize chunks, which are 2MB in size.
// XXX currently go-fuse caps MaxWrite to 128KB.
// TODO -> teach go-fuse to handle Init.MaxPages (Linux 4.20+).
MaxWrite: 2*1024*1024,
// TODO(?) tune MaxReadAhead? MaxBackground?
......@@ -2520,6 +2875,7 @@ func _main() (err error) {
_wcfs := newFSNode(fSticky)
mkdir(root, ".wcfs", &_wcfs)
mkfile(&_wcfs, "zurl", NewStaticFile([]byte(zurl)))
mkfile(&_wcfs, "pintimeout", NewStaticFile([]byte(fmt.Sprintf("%.1f", float64(root.pinTimeout) / float64(time.Second)))))
// .wcfs/zhead - special file channel that sends zhead.at.
//
......@@ -2532,6 +2888,10 @@ func _main() (err error) {
fsNode: newFSNode(fSticky),
})
// .wcfs/stats - special file with collected statistics.
mkfile(&_wcfs, "stats", NewSmallFile(_wcfs_Stats))
// TODO handle autoexit
// (exit when kernel forgets all our inodes - wcfs.py keeps .wcfs/zurl
// opened, so when all inodes has been forgotten - we know all wcfs.py clients exited)
......@@ -2562,8 +2922,8 @@ func _main() (err error) {
err = root.zwatcher(serveCtx, zwatchq)
if errors.Cause(err) != context.Canceled {
log.Error(err)
log.Errorf("zwatcher failed -> switching filesystem to EIO mode (TODO)")
// TODO: switch fs to EIO mode
log.Error("zwatcher failed")
fatalEIO()
}
// wait for unmount
......
# -*- coding: utf-8 -*-
# Copyright (C) 2018-2024 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
"""wcfs_faultyprot_test.py complements wcfs_test.py with tests that exercise
protection against slow/faulty clients in isolation protocol."""
from __future__ import print_function, absolute_import
from wendelin.lib.zodb import zstor_2zurl
from wendelin import wcfs
import sys, os, subprocess, traceback
import six
from golang import select, func, defer
from golang import context, sync, time
from pytest import mark, fixture
from wendelin.wcfs.wcfs_test import tDB, h, tAt, eprint, \
setup_module, teardown_module, setup_function, teardown_function
if six.PY2:
from _multiprocessing import Connection as MPConnection
else:
from multiprocessing.connection import Connection as MPConnection
# tests in this module require WCFS to promptly react to pin handler
# timeouts so that verifying WCFS killing logic does not take a lot of time.
@fixture
def with_prompt_pintimeout(monkeypatch):
tkill = 3*time.second
return monkeypatch.setenv("WENDELIN_CORE_WCFS_OPTIONS", "-pintimeout %.1fs" % tkill, prepend=" ")
# tSubProcess provides infrastructure to run a function in separate process.
#
# It runs f(cin, cout, *argv, **kw) in subprocess with cin and cout
# connected to parent via multiprocessing.Connection .
#
# It is similar to multiprocessing.Process in spawn mode that is available on py3.
# We need to use spawn mode - not fork - because fork does not work well when
# parent process is multithreaded, as many things, that are relying on the
# additional threads in the original process, stop to function in the forked
# child without additional care. For example pygolang timers and signals
# currently stop to work after the fork, and in general it is believed that in
# multithreaded programs the only safe thing to do after the fork is exec.
# Please see section "NOTES" in
#
# https://man7.org/linux/man-pages/man3/pthread_atfork.3.html
#
# for details about this issue.
class tSubProcess(object):
def __init__(proc, f, *argv, **kw):
exev = [sys.executable, '-c', 'from wendelin.wcfs import wcfs_faultyprot_test as t; '
't.tSubProcess._start(%r)' % f.__name__]
proc.popen = subprocess.Popen(exev, stdin=subprocess.PIPE, stdout=subprocess.PIPE, close_fds=True)
try:
proc.cin = MPConnection(proc.popen.stdin.fileno(), readable=False)
proc.cout = MPConnection(proc.popen.stdout.fileno(), writable=False)
proc.send(argv)
proc.send(kw)
except:
proc.popen.kill()
raise
# _start is trampoline ran in the subprocess to launch to user function.
@staticmethod
def _start(funcname):
cin = MPConnection(sys.stdin.fileno(), writable=False)
cout = MPConnection(sys.stdout.fileno(), readable=False)
argv = cin.recv()
kw = cin.recv()
f = globals()[funcname]
procname = kw.pop('_procname', f.__name__)
try:
f(cin, cout, *argv, **kw)
_ = 'END'
except BaseException as exc:
# dump traceback so it appears in the log because Traceback objects are not picklable
eprint("\nException in subprocess %s (pid%d):" % (procname, os.getpid()))
traceback.print_exc()
_ = exc
cout.send(_)
cout.close()
# close releases resources associated with subprocess.
def close(proc):
if proc.popen.returncode is None:
proc.popen.kill()
# exitcode returns subprocess exit code or None if subprocess has not yet terminated.
@property
def exitcode(proc):
return proc.popen.returncode
# join waits for the subprocess to end.
def join(proc, ctx):
gotend = False
goteof = False
joined = False
while not (goteof and joined):
if ctx.err() is not None:
raise ctx.err()
if not joined:
joined = (proc.popen.poll() is not None)
# recv from proc to see if it was END or exception
# make sure to recv at least once after joined to read buffered messages / exception
if goteof:
time.sleep(0.1*time.second)
else:
try:
_, ok = proc.tryrecv()
except EOFError:
goteof = True
else:
if ok:
if not gotend:
assert _ == 'END'
gotend = True
else:
raise AssertionError("got %r after END" % (_,))
# send sends object to subprocess input.
def send(proc, obj):
proc.cin.send(obj)
# recv receives object/exception from subprocess output.
def recv(proc, ctx): # -> obj | raise exception | EOFError
while 1:
if ctx.err() is not None:
raise ctx.err()
_, ok = proc.tryrecv()
if ok:
return _
# tryrecv tries to receive an object/exception from subprocess output.
# It does so without blocking.
def tryrecv(proc): # -> (obj, ok) | raise exception | EOFError
_ = proc.cout.poll(0.1*time.second)
if not _:
return None, False
_ = proc.cout.recv()
if isinstance(_, BaseException):
raise _
return _, True
# tFaultySubProcess runs f(tFaultyClient, *argv, *kw) in subprocess.
# It's a small convenience wrapper over tSubProcess - please see its documentation for details.
class tFaultySubProcess(tSubProcess):
def __init__(fproc, t, f, *argv, **kw):
kw.setdefault('zurl', zstor_2zurl(t.root._p_jar.db().storage))
kw.setdefault('zfile_oid', t.zfile._p_oid)
kw.setdefault('_procname', f.__name__)
kw.setdefault('pintimeout', t.pintimeout)
tremain = t.ctx.deadline() - time.now()
assert t.pintimeout < tremain/3 # 2·pintimeout is needed to reliably detect wcfs kill reaction
for k,v in list(kw.items()):
if isinstance(v, tAt): # tAt is not picklable
kw[k] = v.raw
super(tFaultySubProcess, fproc).__init__(_tFaultySubProcess_start, f.__name__, *argv, **kw)
assert fproc.cout.recv() == "f: start"
@func
def _tFaultySubProcess_start(cin, cout, funcname, **kw):
f = tFaultyClient()
f.cin = cin
f.cout = cout
f.zurl = kw.pop('zurl')
f.zfile_oid = kw.pop('zfile_oid')
f.pintimeout = kw.pop('pintimeout')
f.wc = wcfs.join(f.zurl, autostart=False); defer(f.wc.close)
# we do not need to implement timeouts precisely in the child process
# because parent will kill us on its timeout anyway.
ctx = context.background()
f.cout.send("f: start")
testf = globals()[funcname]
testf(ctx, f, **kw)
# tFaultyClient is placeholder for arguments + WCFS connection for running test
# function inside tFaultySubProcess.
class tFaultyClient:
# .cin
# .cout
# .zurl
# .zfile_oid
# .wc
# .pintimeout
pass
# ---- tests ----
# verify that wcfs kills slow/faulty client who does not handle pin
# notifications correctly and in time during watch setup.
#
# This verifies setupWatch codepath.
@func # faulty client that does not read pin notifications during watch setup.
def _bad_watch_no_pin_read(ctx, f, at):
wlf = f.wc._open("head/watch", mode='r+b') ; defer(wlf.close)
# wait for command to start watching
_ = f.cin.recv()
assert _ == "start watch", _
# send watch; the write should go ok.
wlf.write(b"1 watch %s @%s\n" % (h(f.zfile_oid), h(at)))
# there is no pin handler, because noone reads pin notifications
# -> wcfs must kill us after timing out with sending pin request
f.assertKilled(ctx, "wcfs did not kill client that does not read pin requests")
@func # faulty client that terminates connnection abruptly after receiving pin during watch setup.
def _bad_watch_eof_pin_reply(ctx, f, at):
wlf = f.wc._open("head/watch", mode='r+b') ; defer(wlf.close)
# wait for command to start watching
_ = f.cin.recv()
assert _ == "start watch", _
# send watch; the write should go ok.
wlf.write(b"1 watch %s @%s\n" % (h(f.zfile_oid), h(at)))
# pin notification must be coming
_ = wlf.readline()
assert _.startswith(b"2 pin "), _
f.cout.send(_[2:].rstrip()) # received message without sequence number and trailing \n
# we don't reply to pin notification and just close the connection instead
# NOTE it is different from WatchLink.closeWrite which sends "bye" before doing OS-level close
wlf.close()
# wcfs must kill us right after receiving EOF
f.assertKilled(ctx, "wcfs did not kill client that replied EOF to pin")
@func # faulty client that behaves in problematic way in its pin handler during watch setup.
def __bad_watch_pinh(ctx, f, at, pinh, pinhFailReason):
wl = wcfs.WatchLink(f.wc) ; defer(wl.close)
# wait for command to start watching
_ = f.cin.recv()
assert _ == "start watch", _
wg = sync.WorkGroup(ctx)
def _(ctx):
# send watch. The pin handler either won't be replying or will reply with an error
# -> we should never get reply here.
_ = wl.sendReq(ctx, b"watch %s @%s" % (h(f.zfile_oid), h(at)))
raise AssertionError("watch request completed (should not as pin handler %s); reply: %r" % (pinhFailReason, _))
wg.go(_)
def _(ctx):
pinh(ctx, wl)
wg.go(_)
wg.wait()
def _bad_watch_no_pin_reply (ctx, f, at): __bad_watch_pinh(ctx, f, at, f._pinner_no_pin_reply, "is stuck")
def _bad_watch_nak_pin_reply(ctx, f, at): __bad_watch_pinh(ctx, f, at, f._pinner_nak_pin_reply, "replies nak")
@mark.parametrize('faulty', [
_bad_watch_no_pin_read,
_bad_watch_no_pin_reply,
_bad_watch_eof_pin_reply,
_bad_watch_nak_pin_reply,
])
@func
def test_wcfs_pinhfaulty_kill_on_watch(faulty, with_prompt_pintimeout):
t = tDB(multiproc=True); zf = t.zfile
defer(t.close)
at1 = t.commit(zf, {2:'c1'})
at2 = t.commit(zf, {2:'c2'})
f = t.open(zf)
f.assertData(['','','c2'])
# launch faulty process that should be killed by wcfs on problematic pin during watch setup
p = tFaultySubProcess(t, faulty, at=at1)
defer(p.close)
t.assertStats({'pinkill': 0})
# wait till faulty client issues its watch, receives pin and pauses/misbehaves
p.send("start watch")
if faulty != _bad_watch_no_pin_read:
assert p.recv(t.ctx) == b"pin %s #%d @%s" % (h(zf._p_oid), 2, h(at1))
# issue our watch request - it should be served well and without any delay
wl = t.openwatch()
wl.watch(zf, at1, {2:at1})
# the faulty client must become killed by wcfs
p.join(t.ctx)
assert p.exitcode is not None
t.assertStats({'pinkill': 1})
# verify that wcfs kills slow/faulty client who does not handle pin
# notifications correctly and in time caused by asynchronous read access.
#
# This verifies readPinWatchers codepath.
@func # faulty client that does not read pin notifications triggered by read.
def _bad_pinh_no_pin_read(ctx, f, at):
wlf = f.wc._open("head/watch", mode='r+b') ; defer(wlf.close)
# initial watch setup goes ok
wlf.write(b"1 watch %s @%s\n" % (h(f.zfile_oid), h(at)))
_ = wlf.readline()
assert _ == b"1 ok\n", _
f.cout.send("f: watch setup ok")
# sleep > wcfs pin timeout - wcfs must kill us
f.assertKilled(ctx, "wcfs did not kill client that does not read pin requests")
@func # faulty client that terminates connnection abruptly after receiving pin triggered by read.
def _bad_pinh_eof_pin_reply(ctx, f, at):
wlf = f.wc._open("head/watch", mode='r+b') ; defer(wlf.close)
# initial watch setup goes ok
wlf.write(b"1 watch %s @%s\n" % (h(f.zfile_oid), h(at)))
_ = wlf.readline()
assert _ == b"1 ok\n", _
f.cout.send("f: watch setup ok")
# wait for "pin ..." due to read access in the parent
_ = wlf.readline()
assert _.startswith(b"2 pin "), _
f.cout.send(_[2:].rstrip())
# close connection abruptly.
# NOTE it is different from WatchLink.closeWrite which sends "bye" before doing OS-level close
wlf.close()
# wcfs must kill us right after receiving EOF
f.assertKilled(ctx, "wcfs did not kill client that replied EOF to pin")
@func # faulty client that behaves in problematic way in its pin notifications triggered by read.
def __bad_pinh(ctx, f, at, pinh):
wl = wcfs.WatchLink(f.wc) ; defer(wl.close)
# initial watch setup goes ok
_ = wl.sendReq(ctx, b"watch %s @%s" % (h(f.zfile_oid), h(at)))
assert _ == b"ok", _
f.cout.send("f: watch setup ok")
# wait for "pin ..." due to read access in the parent
pinh(ctx, wl)
def _bad_pinh_no_pin_reply (ctx, f, at): __bad_pinh(ctx, f, at, f._pinner_no_pin_reply)
def _bad_pinh_nak_pin_reply(ctx, f, at): __bad_pinh(ctx, f, at, f._pinner_nak_pin_reply)
@mark.parametrize('faulty', [
_bad_pinh_no_pin_read,
_bad_pinh_no_pin_reply,
_bad_pinh_eof_pin_reply,
_bad_pinh_nak_pin_reply,
])
@func
def test_wcfs_pinhfaulty_kill_on_access(faulty, with_prompt_pintimeout):
t = tDB(multiproc=True); zf = t.zfile; at0=t.at0
defer(t.close)
at1 = t.commit(zf, {2:'c1'})
at2 = t.commit(zf, {2:'c2'})
f = t.open(zf)
f.assertData(['','','c2'])
# issue our watch request - it should be served well
wl = t.openwatch()
wl.watch(zf, at1, {2:at1})
# spawn faulty client and wait until it setups its watch
p = tFaultySubProcess(t, faulty, at=at2)
defer(p.close)
assert p.recv(t.ctx) == "f: watch setup ok"
t.assertStats({'pinkill': 0})
# commit new transaction and issue read access to modified block
# our read should be served well even though faulty client is either stuck
# or behaves in problematic way in its pin handler.
# As the result the faulty client should be killed by wcfs.
at3 = t.commit(zf, {1:'b3'})
wg = sync.WorkGroup(t.ctx)
def _(ctx):
f.assertBlk(1, 'b3', {wl: {1:at0}}, timeout=2*t.pintimeout)
wg.go(_)
def _(ctx):
if faulty != _bad_pinh_no_pin_read:
assert p.recv(ctx) == b"pin %s #%d @%s" % (h(zf._p_oid), 1, h(at0))
wg.go(_)
wg.wait()
p.join(t.ctx)
assert p.exitcode is not None
t.assertStats({'pinkill': 1})
# _pinner_<problem> simulates faulty pinner inside client that behaves in
# problematic way in its pin notification handler.
@func(tFaultyClient)
def _pinner_no_pin_reply(f, ctx, wl):
req = wl.recvReq(ctx)
assert req is not None
f.cout.send(req.msg)
# sleep > wcfs pin timeout - wcfs must kill us
f.assertKilled(ctx, "wcfs did not kill stuck client")
@func(tFaultyClient)
def _pinner_nak_pin_reply(f, ctx, wl):
req = wl.recvReq(ctx)
assert req is not None
f.cout.send(req.msg)
wl.replyReq(ctx, req, b"nak")
# wcfs must kill us right after receiving the nak
f.assertKilled(ctx, "wcfs did not kill client that replied nak to pin")
# assertKilled assert that the current process becomes killed after time goes after pintimeout.
@func(tFaultyClient)
def assertKilled(f, ctx, failmsg):
# sleep > wcfs pin timeout - wcfs must kill us
_, _rx = select(
ctx.done().recv, # 0
time.after(2*f.pintimeout).recv, # 1
)
if _ == 0:
raise ctx.err()
raise AssertionError(failmsg)
# -*- coding: utf-8 -*-
# Copyright (C) 2018-2022 Nexedi SA and Contributors.
# Copyright (C) 2018-2024 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
......@@ -22,6 +22,9 @@
Virtmem layer provided by wcfs client package is unit-tested by
wcfs/client/client_test.py .
Protection from slow or faulty clients is unit-tested by
wcfs/wcfs_faultyprot_test.py .
At functional level, the whole wendelin.core test suite is used to verify
wcfs.py/wcfs.go while running tox tests in wcfs mode.
"""
......@@ -49,12 +52,11 @@ from resource import setrlimit, getrlimit, RLIMIT_MEMLOCK
from golang import go, chan, select, func, defer, error, b
from golang import context, errors, sync, time
from zodbtools.util import ashex as h, fromhex
import pytest; xfail = pytest.mark.xfail
from pytest import raises, fail
from wendelin.wcfs.internal import io, mm
from wendelin.wcfs.internal.wcfs_test import _tWCFS, read_exfault_nogil, SegmentationFault, install_sigbus_trap, fadvise_dontneed
from wendelin.wcfs.client._wcfs import _tpywlinkwrite as _twlinkwrite
from wendelin.wcfs import _is_mountpoint as is_mountpoint, _procwait as procwait, _ready as ready, _rmdir_ifexists as rmdir_ifexists
from wendelin.wcfs import _is_mountpoint as is_mountpoint, _procwait as procwait, _waitfor as waitfor, _ready as ready, _rmdir_ifexists as rmdir_ifexists
# setup:
......@@ -298,11 +300,13 @@ def start_and_crash_wcfs(zurl, mntpt): # -> WCFS
# many tests need to be run with some reasonable timeout to detect lack of wcfs
# response. with_timeout and timeout provide syntactic shortcuts to do so.
def with_timeout(parent=context.background()): # -> ctx, cancel
return context.with_timeout(parent, 3*time.second)
def with_timeout(parent=context.background(), dt=None): # -> ctx, cancel
if dt is None:
dt = 3*time.second
return context.with_timeout(parent, dt)
def timeout(parent=context.background()): # -> ctx
ctx, _ = with_timeout()
def timeout(parent=context.background(), dt=None): # -> ctx
ctx, _ = with_timeout(parent, dt)
return ctx
# tdelay is used in places where we need to delay a bit in order to e.g.
......@@ -348,14 +352,27 @@ class DFile:
# TODO(?) print -> t.trace/debug() + t.verbose depending on py.test -v -v ?
class tWCFS(_tWCFS):
@func
def __init__(t):
def __init__(t, multiproc=False):
assert not os.path.exists(testmntpt)
wc = wcfs.join(testzurl, autostart=True)
assert wc.mountpoint == testmntpt
assert os.path.exists(wc.mountpoint)
assert is_mountpoint(wc.mountpoint)
t.wc = wc
t.pintimeout = float(t.wc._read(".wcfs/pintimeout"))
# multiproc=True indicates that wcfs server will be used by multiple client processes
t.multiproc=multiproc
# the whole test is limited in time to detect deadlocks
# NOTE with_timeout must be << timeout
# NOTE pintimeout can be either
# * >> timeout (most of the test), or
# * << timeout (faulty protection tests)
timeout = 10*time.second
t.ctx, t._ctx_cancel = context.with_timeout(context.background(), timeout)
# make sure any stuck FUSE request is aborted. To do so
# force-unmount wcfs on timeout to unstuck current test and let it fail.
# Force-unmount can be done reliably only by writing into
# /sys/fs/fuse/connections/<X>/abort. For everything else there are
......@@ -364,15 +381,21 @@ class tWCFS(_tWCFS):
# still wait for request completion even after fatal signal )
nogilready = chan(dtype='C.structZ')
t._wcfuseabort = os.dup(wc._wcsrv._fuseabort.fileno())
go(t._abort_ontimeout, t._wcfuseabort, 10*time.second, nogilready) # NOTE must be: with_timeout << · << wcfs_pin_timeout
go(t._abort_ontimeout, t._wcfuseabort, timeout, t.ctx.done(), nogilready)
nogilready.recv() # wait till _abort_ontimeout enters nogil
t._stats_prev = None
t.assertStats({'BigFile': 0, 'RevHead': 0, 'ZHeadLink': 0,
'WatchLink': 0, 'Watch': 0, 'PinnedBlk': 0,
'pin': 0, 'pinkill': 0})
# _abort_ontimeout is in wcfs_test.pyx
# close closes connection to wcfs, unmounts the filesystem and makes sure
# that wcfs server exits.
@func
def close(t):
defer(t._ctx_cancel)
def _():
os.close(t._wcfuseabort)
defer(_)
......@@ -394,6 +417,74 @@ class tWCFS(_tWCFS):
defer(t.wc.close)
assert is_mountpoint(t.wc.mountpoint)
# assertStats asserts that content of .wcfs/stats eventually reaches expected state.
#
# For all keys k from kvok it verifies that eventually stats[k] == kvok[k]
# and that it stays that way.
#
# The state is asserted eventually instead of immediately - for both
# counters and instance values - because wcfs increments a counter
# _after_ corresponding event happened, for example pinkill after actually
# killing client process, and the tests can start to observe that state
# before wcfs actually does counter increment. For the similar reason we
# need to assert that the counters stay in expected state to make sure that
# no extra event happened. For instance values we need to assert
# eventually as well, because in many cases OS kernel sends events to wcfs
# asynchronously after client triggers an action. For example for ZHeadLink
# after client closes corresponding file handle, the kernel sends RELEASE
# to wcfs asynchronously, and it is only after that final RELEASE when wcfs
# removes corresponding entry from zheadSockTab. So if we would assert on
# instance values immediately after close, it could happen before wcfs
# received corresponding RELEASE and the assertion would fail.
#
# Note that the set of keys in kvok can be smaller than the full set of keys in stats.
def assertStats(t, kvok):
# kstats loads stats subset with kvok keys.
def kstats():
stats = t._loadStats()
kstats = {}
for k in kvok.keys():
kstats[k] = stats.get(k, None)
return kstats
# wait till stats reaches expected state
ctx = timeout()
while 1:
kv = kstats()
if kv == kvok:
break
if ctx.err() is not None:
assert kv == kvok, "stats did not reach expected state"
tdelay()
# make sure that it stays that way for some time
# we do not want to make the assertion time big because it will results
# in slowing down all tests
for _ in range(3):
tdelay()
kv = kstats()
assert kv == kvok, "stats did not stay at expected state"
# _loadStats loads content of .wcfs/stats .
def _loadStats(t): # -> {}
stats = {}
for l in t.wc._read(".wcfs/stats").splitlines():
# key : value
k, v = l.split(':')
k = k.strip()
v = v.strip()
stats[k] = int(v)
# verify that keys remains the same and that cumulative counters do not decrease
if t._stats_prev is not None:
assert stats.keys() == t._stats_prev.keys()
for k in stats.keys():
if k[0].islower():
assert stats[k] >= t._stats_prev[k], k
t._stats_prev = stats
return stats
class tDB(tWCFS):
# __init__ initializes test database and wcfs.
......@@ -402,7 +493,7 @@ class tDB(tWCFS):
# create before wcfs startup. old_data is []changeDelta - see .commit
# and .change for details.
@func
def __init__(t, old_data=[]):
def __init__(t, old_data=[], **kw):
t.root = testdb.dbopen()
def _(): # close/unlock db if __init__ fails
exc = sys.exc_info()[1]
......@@ -432,7 +523,7 @@ class tDB(tWCFS):
t._commit(t.zfile, changeDelta)
# start wcfs after testdb is created and initial data is committed
super(tDB, t).__init__()
super(tDB, t).__init__(**kw)
# fh(.wcfs/zhead) + history of zhead read from there
t._wc_zheadfh = open(t.wc.mountpoint + "/.wcfs/zhead")
......@@ -445,6 +536,9 @@ class tDB(tWCFS):
t._files = set()
t._wlinks = set()
t.assertStats({'ZHeadLink': 1})
@property
def head(t):
return t.dFtail[-1].rev
......@@ -465,6 +559,11 @@ class tDB(tWCFS):
assert len(t._wlinks) == 0
t._wc_zheadfh.close()
zstats = {'WatchLink': 0, 'Watch': 0, 'PinnedBlk': 0, 'ZHeadLink': 0}
if not t.multiproc:
zstats['pinkill'] = 0
t.assertStats(zstats)
# open opens wcfs file corresponding to zf@at and starts to track it.
# see returned tFile for details.
def open(t, zf, at=None): # -> tFile
......@@ -704,8 +803,11 @@ class tFile:
#
# The automatic computation of pinokByWLink is verified against explicitly
# provided pinokByWLink when it is present.
#
# The whole read operation must complete in specified time.
# The default timeout is used if timeout is not explicitly given.
@func
def assertBlk(t, blk, dataok, pinokByWLink=None):
def assertBlk(t, blk, dataok, pinokByWLink=None, timeout=None):
# TODO -> assertCtx('blk #%d' % blk)
def _():
assertCtx = 'blk #%d' % blk
......@@ -718,10 +820,10 @@ class tFile:
dataok = b(dataok)
blkdata, _ = t.tdb._blkDataAt(t.zf, blk, t.at)
assert blkdata == dataok, "computed vs explicit data"
t._assertBlk(blk, dataok, pinokByWLink)
t._assertBlk(blk, dataok, pinokByWLink, timeout=timeout)
@func
def _assertBlk(t, blk, dataok, pinokByWLink=None, pinfunc=None):
def _assertBlk(t, blk, dataok, pinokByWLink=None, pinfunc=None, timeout=None):
assert len(dataok) <= t.blksize
dataok += b'\0'*(t.blksize - len(dataok)) # tailing zeros
assert blk < t._sizeinblk()
......@@ -769,6 +871,9 @@ class tFile:
pinokByWLink[wlink] = (t.zf, pinok)
# access 1 byte on the block and verify that wcfs sends us correct pins
ctx, cancel = with_timeout(t.tdb.ctx, timeout)
defer(cancel)
blkview = t._blk(blk)
assert t.cached()[blk] == cached
......@@ -807,7 +912,7 @@ class tFile:
b = _rx
ev.append('read ' + b)
ev = doCheckingPin(_, pinokByWLink, pinfunc)
ev = doCheckingPin(ctx, _, pinokByWLink, pinfunc)
# XXX hack - wlinks are notified and emit events simultaneously - we
# check only that events begin and end with read pre/post and that pins
......@@ -877,8 +982,12 @@ class tWatchLink(wcfs.WatchLink):
# this tWatchLink currently watches the following files at particular state.
t._watching = {} # {} foid -> tWatch
if not tdb.multiproc:
tdb.assertStats({'WatchLink': len(tdb._wlinks)})
def close(t):
t.tdb._wlinks.remove(t)
tdb = t.tdb
tdb._wlinks.remove(t)
super(tWatchLink, t).close()
# disable all established watches
......@@ -887,6 +996,9 @@ class tWatchLink(wcfs.WatchLink):
w.pinned = {}
t._watching = {}
if not tdb.multiproc:
tdb.assertStats({'WatchLink': len(tdb._wlinks)})
# ---- infrastructure: watch setup/adjust ----
......@@ -1018,7 +1130,7 @@ def _watch(twlink, zf, at, pinok, replyok):
else:
assert reply == replyok
doCheckingPin(_, {twlink: (zf, pinok)})
doCheckingPin(timeout(twlink.tdb.ctx), _, {twlink: (zf, pinok)})
# doCheckingPin calls f and verifies that wcfs sends expected pins during the
......@@ -1030,14 +1142,14 @@ def _watch(twlink, zf, at, pinok, replyok):
#
# pinfunc is called after pin request is received from wcfs, but before pin ack
# is replied back. Pinfunc must not block.
def doCheckingPin(f, pinokByWLink, pinfunc=None): # -> []event(str)
def doCheckingPin(ctx, f, pinokByWLink, pinfunc=None): # -> []event(str)
# call f and check that we receive pins as specified.
# Use timeout to detect wcfs replying less pins than expected.
#
# XXX detect not sent pins via ack'ing previous pins as they come in (not
# waiting for all of them) and then seeing that we did not received expected
# pin when f completes?
ctx, cancel = with_timeout()
ctx, cancel = context.with_cancel(ctx)
wg = sync.WorkGroup(ctx)
ev = []
......@@ -1350,7 +1462,8 @@ def test_wcfs_watch_robust():
"file not yet known to wcfs or is not a ZBigFile"
wl.close()
# closeTX/bye cancels blocked pin handlers
# closeTX gently with "bye" cancels blocked pin handlers without killing client
# (closing abruptly is verified in wcfs_faultyprot_test.py)
f = t.open(zf)
f.assertBlk(2, 'c2')
f.assertCache([0,0,1])
......@@ -1358,23 +1471,15 @@ def test_wcfs_watch_robust():
wl = t.openwatch()
wg = sync.WorkGroup(timeout())
def _(ctx):
# TODO clarify what wcfs should do if pin handler closes wlink TX:
# - reply error + close, or
# - just close
# t = when reviewing WatchLink.serve in wcfs.go
#assert wl.sendReq(ctx, b"watch %s @%s" % (h(zf._p_oid), h(at1))) == \
# "error setup watch f<%s> @%s: " % (h(zf._p_oid), h(at1)) + \
# "pin #%d @%s: context canceled" % (2, h(at1))
#with raises(error, match="unexpected EOF"):
with raises(error, match="recvReply: link is down"):
wl.sendReq(ctx, b"watch %s @%s" % (h(zf._p_oid), h(at1)))
wg.go(_)
def _(ctx):
req = wl.recvReq(ctx)
assert req is not None
assert req.msg == b"pin %s #%d @%s" % (h(zf._p_oid), 2, h(at1))
# don't reply to req - close instead
# NOTE this closes watchlink gently with first sending "bye" message
wl.closeWrite()
wg.go(_)
wg.wait()
......@@ -1450,45 +1555,7 @@ def test_wcfs_watch_going_back():
wl.close()
# verify that wcfs kills slow/faulty client who does not reply to pin in time.
@xfail # protection against faulty/slow clients
@func
def test_wcfs_pintimeout_kill():
# adjusted wcfs timeout to kill client who is stuck not providing pin reply
tkill = 3*time.second
t = tDB(); zf = t.zfile # XXX wcfs args += tkill=<small>
defer(t.close)
at1 = t.commit(zf, {2:'c1'})
at2 = t.commit(zf, {2:'c2'})
f = t.open(zf)
f.assertData(['','','c2'])
# XXX move into subprocess not to kill whole testing
ctx, _ = context.with_timeout(context.background(), 2*tkill)
wl = t.openwatch()
wg = sync.WorkGroup(ctx)
def _(ctx):
# send watch. The pin handler won't be replying -> we should never get reply here.
wl.sendReq(ctx, b"watch %s @%s" % (h(zf._p_oid), h(at1)))
fail("watch request completed (should not as pin handler is stuck)")
wg.go(_)
def _(ctx):
req = wl.recvReq(ctx)
assert req is not None
assert req.msg == b"pin %s #%d @%s" % (h(zf._p_oid), 2, h(at1))
# sleep > wcfs pin timeout - wcfs must kill us
_, _rx = select(
ctx.done().recv, # 0
time.after(tkill).recv, # 1
)
if _ == 0:
raise ctx.err()
fail("wcfs did not killed stuck client")
wg.go(_)
wg.wait()
# tests for "Protection against slow or faulty clients" are in wcfs_faultyprot_test.py
# watch with @at > head - must wait for head to become >= at.
......@@ -1824,6 +1891,68 @@ def test_wcfs_watch_2files():
# ----------------------------------------
# verify that wcfs switches to EIO mode after zwatcher failure.
# in EIO mode accessing anything on the filesystem returns ENOTCONN error.
@func
def test_wcfs_eio_after_zwatcher_fail(capfd):
# we will use low-level tWCFS instead of tDB for precise control of access
# to the filesystem. For example tDB keeps open connection to .wcfs/zhead
# and inspects it during commit which can break in various ways on switch
# to EIO mode. Do all needed actions by hand to avoid unneeded uncertainty.
# create ZBigFile
root = testdb.dbopen()
def _():
dbclose(root)
defer(_)
root['zfile'] = zf = ZBigFile(blksize)
transaction.commit()
# start wcfs
t = tWCFS()
def _():
with raises(IOError) as exc:
t.close()
assert exc.value.errno == ENOTCONN
defer(_)
t.wc._stat("head/bigfile/%s" % h(zf._p_oid)) # wcfs starts to track zf
# instead of simulating e.g. ZODB server failure we utilize the fact that
# currently zwatcher fails when there is ZBigFile epoch
zf.blksize += 1
transaction.commit()
# on new transaction with ZBigFile epoch wcfs should switch to EIO when it
# learns about that transaction
def _():
try:
t.wc._stat("head")
except:
return True
else:
return False
waitfor(timeout(), _)
# verify it was indeed switch to EIO
_ = capfd.readouterr()
assert not ready(t._wcfuseaborted) # wcfs might have been killed on overall test timeout
assert "test timed out" not in _.err
assert "aborting wcfs fuse connection to unblock" not in _.err
assert "zwatcher failed" in _.err
assert "switching filesystem to EIO mode" in _.err
# verify that accessing any file returns ENOTCONN after the switch
def checkeio(path):
with raises(IOError) as exc:
t.wc._read(path)
assert exc.value.errno == ENOTCONN
checkeio(".wcfs/zurl")
checkeio("head/at")
checkeio("head/bigfile/%s" % h(zf._p_oid))
checkeio("anything")
# verify that wcfs does not panic with "no current transaction" / "at out of
# bounds" on read/invalidate/watch codepaths.
@func
......@@ -1937,6 +2066,13 @@ class tAt(bytes):
return "@" + h(at)
__str__ = __repr__
# raw returns raw bytes form of at.
# It should be used in contexts where at needs to be pickled, because tAt
# is unpicklable due to .tdb being unpicklable.
@property
def raw(at):
return fromhex(h(at))
# hpin returns human-readable representation for {}blk->rev.
@func(tDB)
def hpin(t, pin):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment