Commit 3070cec4 authored by Kirill Smelkov's avatar Kirill Smelkov

xnet/virtnet: SubNetwork += AutoClose

When a virtnet network - e.g. lonet - is explicitly used in tests, the
open and close sequence is easy to implement:

    network = lonet.Join(...)
    defer network.Close()

    hostα = network.NewHost("α")
    defer hostα.Close()
    hostβ = network.NewHost("β")
    defer hostβ.Close()
    ...

However in other scenarios - for example when the code wants to call
xnet.Networker factory that creates a networker out of several potential
choices according to given parameters, only interface for network
access-point (e.g. Host in lonet/virtnet case) is returned. The network
itself (virtnet.SubNetwork) is not returned to the user. As it is this
makes it impossible to close the network and release its resources.

Instead of adding complexity to such "create-networker" functions to
e.g. change Close of returned networker to also close subnetwork, or
return both networker and a separate close func, let's teach virtnet to
close SubNetwork automatically when last host of this subnetwork is
closed. Make it opt-in since this behaviour is not universally wanted.

This way a factory that is asked to connect to network as e.g. a node on
lonet, could do the following:

    func neonet.Join(...) xnet.Networker {
        ...
        // join lonet "<net>" as host "<node>"
        network = lonet.Join("<net>")
        host = network.NewHost("<node>")
        network.AutoClose() // host.Close will close network
        return host
    }

and network will be closed on host.Close() call.

-> Add SubNetwork.AutoClose to schedule Close to be called after last host on the subnetwork is closed.
-> Mirror this change in virtnet part of lonet.py
parent 01a77bb9
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# Copyright (C) 2018 Nexedi SA and Contributors. # Copyright (C) 2018-2020 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com> # Kirill Smelkov <kirr@nexedi.com>
# #
# This program is free software: you can Use, Study, Modify and Redistribute # This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your # it under the terms of the GNU General Public License version 3, or (at your
...@@ -33,7 +33,8 @@ import functools ...@@ -33,7 +33,8 @@ import functools
import threading import threading
import logging as log import logging as log
from golang import func, go, chan, select, default, panic, gimport from golang import func, defer, go, chan, select, default, panic, gimport
from golang import sync
from golang.gcompat import qq from golang.gcompat import qq
xerr = gimport('lab.nexedi.com/kirr/go123/xerr') xerr = gimport('lab.nexedi.com/kirr/go123/xerr')
...@@ -54,6 +55,8 @@ errcause= xerr.cause ...@@ -54,6 +55,8 @@ errcause= xerr.cause
# downOnce.Do(...) # downOnce.Do(...)
# #
# in Go. # in Go.
#
# TODO just use sync.Once from pygolang.
_oncemu = threading.Lock() _oncemu = threading.Lock()
def set_once(event): def set_once(event):
with _oncemu: with _oncemu:
...@@ -115,16 +118,20 @@ class VirtSubNetwork(object): ...@@ -115,16 +118,20 @@ class VirtSubNetwork(object):
# ._registry Registry # ._registry Registry
# ._hostmu μ # ._hostmu μ
# ._hostmap {} hostname -> Host # ._hostmap {} hostname -> Host
# ._nopenhosts int
# ._autoclose bool
# ._down chan ø # ._down chan ø
# ._down_once threading.Event # ._down_once threading.Event
def __init__(self, network, registry): def __init__(self, network, registry):
self._network = network self._network = network
self._registry = registry self._registry = registry
self._hostmu = threading.Lock() self._hostmu = threading.Lock()
self._hostmap = {} self._hostmap = {}
self._down = chan() self._nopenhosts = 0
self._down_once = threading.Event() self._autoclose = False
self._down = chan()
self._down_once = threading.Event()
# must be implemented in particular virtnet implementation # must be implemented in particular virtnet implementation
def _vnet_newhost(self, hostname, registry): raise NotImplementedError() def _vnet_newhost(self, hostname, registry): raise NotImplementedError()
...@@ -140,14 +147,16 @@ class Host(object): ...@@ -140,14 +147,16 @@ class Host(object):
# ._socketv []socket ; port -> listener | conn ; [0] is always None # ._socketv []socket ; port -> listener | conn ; [0] is always None
# ._down chan ø # ._down chan ø
# ._down_once threading.Event # ._down_once threading.Event
# ._close_once sync.Once
def __init__(self, subnet, name): def __init__(self, subnet, name):
self._subnet = subnet self._subnet = subnet
self._name = name self._name = name
self._sockmu = threading.Lock() self._sockmu = threading.Lock()
self._socketv = [] self._socketv = []
self._down = chan() self._down = chan()
self._down_once = threading.Event() self._down_once = threading.Event()
self._close_once = sync.Once()
# socket represents one endpoint entry on Host. # socket represents one endpoint entry on Host.
...@@ -222,14 +231,18 @@ class Accept(object): ...@@ -222,14 +231,18 @@ class Accept(object):
# _shutdown is worker for close and _vnet_down. # _shutdown is worker for close and _vnet_down.
@func(VirtSubNetwork) @func(VirtSubNetwork)
def _shutdown(n, exc): def _shutdown(n, exc):
n.__shutdown(exc, True)
@func(VirtSubNetwork)
def __shutdown(n, exc, withHosts):
if not set_once(n._down_once): if not set_once(n._down_once):
return return
n._down.close() n._down.close()
with n._hostmu: if withHosts:
for host in n._hostmap.values(): with n._hostmu:
host._shutdown() for host in n._hostmap.values():
host._shutdown()
# XXX py: we don't collect / remember .downErr # XXX py: we don't collect / remember .downErr
if exc is not None: if exc is not None:
...@@ -241,8 +254,14 @@ def _shutdown(n, exc): ...@@ -241,8 +254,14 @@ def _shutdown(n, exc):
# close shutdowns subnetwork. # close shutdowns subnetwork.
@func(VirtSubNetwork) @func(VirtSubNetwork)
def close(n): def close(n):
n.__close(True)
@func(VirtSubNetwork)
def _closeWithoutHosts(n):
n.__close(False)
@func(VirtSubNetwork)
def __close(n, withHosts):
with errctx("virtnet %s: close" % qq(n._network)): with errctx("virtnet %s: close" % qq(n._network)):
n._shutdown(None) n.__shutdown(None, withHosts)
# _vnet_down shutdowns subnetwork upon engine error. # _vnet_down shutdowns subnetwork upon engine error.
@func(VirtSubNetwork) @func(VirtSubNetwork)
...@@ -265,6 +284,7 @@ def new_host(n, name): ...@@ -265,6 +284,7 @@ def new_host(n, name):
host = Host(n, name) host = Host(n, name)
n._hostmap[name] = host n._hostmap[name] = host
n._nopenhosts += 1
return host return host
...@@ -295,9 +315,29 @@ def _shutdown(h): ...@@ -295,9 +315,29 @@ def _shutdown(h):
# close shutdowns host. # close shutdowns host.
@func(Host) @func(Host)
def close(h): def close(h):
def autoclose():
def _():
n = h._subnet
with n._hostmu:
n._nopenHosts -= 1
if n._nopenHosts < 0:
panic("SubNetwork._nopenHosts < 0")
if n._autoclose and n._nopenHosts == 0:
n._closeWithoutHosts()
h._close_once.do(_)
defer(autoclose)
with errctx("virtnet %s: host %s: close" % (qq(h._subnet._network), qq(h._name))): with errctx("virtnet %s: host %s: close" % (qq(h._subnet._network), qq(h._name))):
h._shutdown() h._shutdown()
# autoclose schedules close to be called after last host on this subnetwork is closed.
@func(VirtSubNetwork)
def autoclose(n):
with n._hostmu:
if n._nopenHosts == 0:
panic("BUG: no opened hosts")
n._autoclose = True
# listen starts new listener on the host. # listen starts new listener on the host.
@func(Host) @func(Host)
......
...@@ -117,8 +117,10 @@ type SubNetwork struct { ...@@ -117,8 +117,10 @@ type SubNetwork struct {
registry Registry registry Registry
// {} hostname -> Host // {} hostname -> Host
hostMu sync.Mutex hostMu sync.Mutex
hostMap map[string]*Host hostMap map[string]*Host
nopenHosts int // #(hosts-in-open-state) in hostMap
autoClose bool // close SubNetwork when last host is Closed
down chan struct{} // closed when no longer operational down chan struct{} // closed when no longer operational
downErr error downErr error
...@@ -140,8 +142,9 @@ type Host struct { ...@@ -140,8 +142,9 @@ type Host struct {
sockMu sync.Mutex sockMu sync.Mutex
socketv []*socket socketv []*socket
down chan struct{} // closed when no longer operational down chan struct{} // closed when no longer operational
downOnce sync.Once downOnce sync.Once
closeOnce sync.Once
} }
var _ xnet.Networker = (*Host)(nil) var _ xnet.Networker = (*Host)(nil)
...@@ -236,16 +239,19 @@ func NewSubNetwork(network string, engine Engine, registry Registry) (*SubNetwor ...@@ -236,16 +239,19 @@ func NewSubNetwork(network string, engine Engine, registry Registry) (*SubNetwor
// //
// The error returned is cumulative shutdown error - the cause + any error from // The error returned is cumulative shutdown error - the cause + any error from
// closing engine and registry for the call when shutdown was actually performed. // closing engine and registry for the call when shutdown was actually performed.
func (n *SubNetwork) shutdown(err error) error { func (n *SubNetwork) shutdown(err error) error { return n._shutdown(err, true) }
func (n *SubNetwork) _shutdown(err error, withHosts bool) error {
n.downOnce.Do(func() { n.downOnce.Do(func() {
close(n.down) close(n.down)
// shutdown hosts // shutdown hosts
n.hostMu.Lock() if withHosts {
for _, host := range n.hostMap { n.hostMu.Lock()
host.shutdown() for _, host := range n.hostMap {
host.shutdown()
}
n.hostMu.Unlock()
} }
n.hostMu.Unlock()
var errv xerr.Errorv var errv xerr.Errorv
errv.Appendif( err ) errv.Appendif( err )
...@@ -262,9 +268,11 @@ func (n *SubNetwork) shutdown(err error) error { ...@@ -262,9 +268,11 @@ func (n *SubNetwork) shutdown(err error) error {
// //
// It recursively interrupts all blocking operations on the subnetwork and // It recursively interrupts all blocking operations on the subnetwork and
// shutdowns all subnetwork's hosts and connections. // shutdowns all subnetwork's hosts and connections.
func (n *SubNetwork) Close() (err error) { func (n *SubNetwork) Close() (err error) { return n._close(true) }
func (n *SubNetwork) closeWithoutHosts() (err error) { return n._close(false) }
func (n *SubNetwork) _close(withHosts bool) (err error) {
defer xerr.Contextf(&err, "virtnet %q: close", n.network) defer xerr.Contextf(&err, "virtnet %q: close", n.network)
return n.shutdown(nil) return n._shutdown(nil, withHosts)
} }
// VNetDown implements Notifier by shutting subnetwork down upon engine error. // VNetDown implements Notifier by shutting subnetwork down upon engine error.
...@@ -306,6 +314,7 @@ func (n *SubNetwork) NewHost(ctx context.Context, name string) (_ *Host, err err ...@@ -306,6 +314,7 @@ func (n *SubNetwork) NewHost(ctx context.Context, name string) (_ *Host, err err
host := &Host{subnet: n, name: name, down: make(chan struct{})} host := &Host{subnet: n, name: name, down: make(chan struct{})}
n.hostMap[name] = host n.hostMap[name] = host
n.nopenHosts++
return host, nil return host, nil
} }
...@@ -353,9 +362,37 @@ func (h *Host) shutdown() { ...@@ -353,9 +362,37 @@ func (h *Host) shutdown() {
func (h *Host) Close() (err error) { func (h *Host) Close() (err error) {
defer xerr.Contextf(&err, "virtnet %q: host %q: close", h.subnet.network, h.name) defer xerr.Contextf(&err, "virtnet %q: host %q: close", h.subnet.network, h.name)
h.shutdown() h.shutdown()
return nil
// close subnet if autoclose=y and we were the last open host
h.closeOnce.Do(func() {
n := h.subnet
n.hostMu.Lock()
defer n.hostMu.Unlock()
n.nopenHosts--
if n.nopenHosts < 0 {
panic("SubNetwork.nopenHosts < 0")
}
if n.autoClose && n.nopenHosts == 0 {
err = n.closeWithoutHosts()
}
})
return err
}
// AutoClose schedules Close to be called after last host on this subnetwork is closed.
//
// It is an error to call AutoClose with no opened hosts - this will panic.
func (n *SubNetwork) AutoClose() {
n.hostMu.Lock()
defer n.hostMu.Unlock()
if n.nopenHosts == 0 {
panic("BUG: no opened hosts")
}
n.autoClose = true
} }
// Listen starts new listener on the host. // Listen starts new listener on the host.
// //
// It either allocates free port if laddr is "" or with 0 port, or binds to laddr. // It either allocates free port if laddr is "" or with 0 port, or binds to laddr.
......
...@@ -382,3 +382,23 @@ func TestVNetDown(t0 *testing.T) { ...@@ -382,3 +382,23 @@ func TestVNetDown(t0 *testing.T) {
assert.Eq(errors.Cause(err), errSomeProblem) assert.Eq(errors.Cause(err), errSomeProblem)
assert.Eq(err.Error(), "virtnet \"pipet\": close: some problem") assert.Eq(err.Error(), "virtnet \"pipet\": close: some problem")
} }
// TestAutoClose verifies that subnet.AutoClose() leads to subnet.Close() after
// its last host is closed.
func TestAutoClose(t0 *testing.T) {
t := newTestNet(t0)
X := exc.Raiseif
assert := xtesting.Assert(t0)
t.net.AutoClose()
, err := t.net.NewHost(context.Background(), "γ"); X(err)
err = t..Close(); X(err)
err = t..Close(); X(err)
err = .Close(); X(err)
, err := t.net.NewHost(context.Background(), "δ")
assert.Eq(, (*Host)(nil))
assert.Eq(errors.Cause(err), ErrNetDown)
assert.Eq(err.Error(), "virtnet \"pipet\": new host \"δ\": network is down")
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment