Commit 171781c3 authored by Mitchell Hashimoto's avatar Mitchell Hashimoto

packer/rpc: work-in-progress commit

parent 4ba5c2ef
...@@ -11,9 +11,11 @@ func TestArtifactRPC(t *testing.T) { ...@@ -11,9 +11,11 @@ func TestArtifactRPC(t *testing.T) {
a := new(packer.MockArtifact) a := new(packer.MockArtifact)
// Start the server // Start the server
server := NewServer() client, server := testClientServer(t)
defer client.Close()
defer server.Close()
server.RegisterArtifact(a) server.RegisterArtifact(a)
client := testClient(t, server)
aClient := client.Artifact() aClient := client.Artifact()
// Test // Test
......
...@@ -51,10 +51,11 @@ func TestCacheRPC(t *testing.T) { ...@@ -51,10 +51,11 @@ func TestCacheRPC(t *testing.T) {
c := new(testCache) c := new(testCache)
// Start the server // Start the server
server := NewServer() client, server := testClientServer(t)
server.RegisterCache(c)
client := testClient(t, server)
defer client.Close() defer client.Close()
defer server.Close()
server.RegisterCache(c)
cacheClient := client.Cache() cacheClient := client.Cache()
// Test Lock // Test Lock
......
...@@ -12,7 +12,6 @@ import ( ...@@ -12,7 +12,6 @@ import (
type Client struct { type Client struct {
mux *MuxConn mux *MuxConn
client *rpc.Client client *rpc.Client
server *rpc.Server
} }
func NewClient(rwc io.ReadWriteCloser) (*Client, error) { func NewClient(rwc io.ReadWriteCloser) (*Client, error) {
...@@ -27,22 +26,9 @@ func NewClient(rwc io.ReadWriteCloser) (*Client, error) { ...@@ -27,22 +26,9 @@ func NewClient(rwc io.ReadWriteCloser) (*Client, error) {
return nil, err return nil, err
} }
// Accept connection ID 1 which is what the remote end uses to
// be an RPC client back to us so we can even serve some objects.
serverConn, err := mux.Accept(1)
if err != nil {
mux.Close()
return nil, err
}
// Start our RPC server on this end
server := rpc.NewServer()
go server.ServeConn(serverConn)
return &Client{ return &Client{
mux: mux, mux: mux,
client: rpc.NewClient(clientConn), client: rpc.NewClient(clientConn),
server: server,
}, nil }, nil
} }
...@@ -70,6 +56,12 @@ func (c *Client) Cache() packer.Cache { ...@@ -70,6 +56,12 @@ func (c *Client) Cache() packer.Cache {
func (c *Client) PostProcessor() packer.PostProcessor { func (c *Client) PostProcessor() packer.PostProcessor {
return &postProcessor{ return &postProcessor{
client: c.client, client: c.client,
server: c.server, }
}
func (c *Client) Ui() packer.Ui {
return &Ui{
client: c.client,
endpoint: DefaultUiEndpoint,
} }
} }
...@@ -5,29 +5,44 @@ import ( ...@@ -5,29 +5,44 @@ import (
"testing" "testing"
) )
func testClient(t *testing.T, server *Server) *Client { func testConn(t *testing.T) (net.Conn, net.Conn) {
l, err := net.Listen("tcp", ":0") l, err := net.Listen("tcp", ":0")
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
var serverConn net.Conn
doneCh := make(chan struct{})
go func() { go func() {
conn, err := l.Accept() defer close(doneCh)
defer l.Close()
var err error
serverConn, err = l.Accept()
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
server.ServeConn(conn)
}() }()
clientConn, err := net.Dial("tcp", l.Addr().String()) clientConn, err := net.Dial("tcp", l.Addr().String())
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
<-doneCh
return clientConn, serverConn
}
func testClientServer(t *testing.T) (*Client, *Server) {
clientConn, serverConn := testConn(t)
server := NewServer(serverConn)
go server.Serve()
client, err := NewClient(clientConn) client, err := NewClient(clientConn)
if err != nil { if err != nil {
server.Close()
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
return client return client, server
} }
...@@ -34,10 +34,11 @@ func TestPostProcessorRPC(t *testing.T) { ...@@ -34,10 +34,11 @@ func TestPostProcessorRPC(t *testing.T) {
p := new(TestPostProcessor) p := new(TestPostProcessor)
// Start the server // Start the server
server := NewServer() client, server := testClientServer(t)
server.RegisterPostProcessor(p)
client := testClient(t, server)
defer client.Close() defer client.Close()
defer server.Close()
server.RegisterPostProcessor(p)
ppClient := client.PostProcessor() ppClient := client.PostProcessor()
// Test Configure // Test Configure
......
...@@ -12,84 +12,68 @@ import ( ...@@ -12,84 +12,68 @@ import (
var endpointId uint64 var endpointId uint64
const ( const (
DefaultArtifactEndpoint string = "Artifact" DefaultArtifactEndpoint string = "Artifact"
DefaultCacheEndpoint = "Cache"
DefaultPostProcessorEndpoint = "PostProcessor"
DefaultUiEndpoint = "Ui"
) )
// Server represents an RPC server for Packer. This must be paired on // Server represents an RPC server for Packer. This must be paired on
// the other side with a Client. // the other side with a Client.
type Server struct { type Server struct {
components map[string]interface{} mux *MuxConn
server *rpc.Server
} }
// NewServer returns a new Packer RPC server. // NewServer returns a new Packer RPC server.
func NewServer() *Server { func NewServer(conn io.ReadWriteCloser) *Server {
return &Server{ return &Server{
components: make(map[string]interface{}), mux: NewMuxConn(conn),
server: rpc.NewServer(),
} }
} }
func (s *Server) Close() error {
return s.mux.Close()
}
func (s *Server) RegisterArtifact(a packer.Artifact) { func (s *Server) RegisterArtifact(a packer.Artifact) {
s.components[DefaultArtifactEndpoint] = a s.server.RegisterName(DefaultArtifactEndpoint, &ArtifactServer{
artifact: a,
})
} }
func (s *Server) RegisterCache(c packer.Cache) { func (s *Server) RegisterCache(c packer.Cache) {
s.components["Cache"] = c s.server.RegisterName(DefaultCacheEndpoint, &CacheServer{
cache: c,
})
} }
func (s *Server) RegisterPostProcessor(p packer.PostProcessor) { func (s *Server) RegisterPostProcessor(p packer.PostProcessor) {
s.components["PostProcessor"] = p s.server.RegisterName(DefaultPostProcessorEndpoint, &PostProcessorServer{
p: p,
})
}
func (s *Server) RegisterUi(ui packer.Ui) {
s.server.RegisterName(DefaultUiEndpoint, &UiServer{
ui: ui,
})
} }
// ServeConn serves a single connection over the RPC server. It is up // ServeConn serves a single connection over the RPC server. It is up
// to the caller to obtain a proper io.ReadWriteCloser. // to the caller to obtain a proper io.ReadWriteCloser.
func (s *Server) ServeConn(conn io.ReadWriteCloser) { func (s *Server) Serve() {
mux := NewMuxConn(conn)
defer mux.Close()
// Accept a connection on stream ID 0, which is always used for // Accept a connection on stream ID 0, which is always used for
// normal client to server connections. // normal client to server connections.
stream, err := mux.Accept(0) stream, err := s.mux.Accept(0)
defer stream.Close()
if err != nil { if err != nil {
log.Printf("[ERR] Error retrieving stream for serving: %s", err) log.Printf("[ERR] Error retrieving stream for serving: %s", err)
return return
} }
clientConn, err := mux.Dial(1) s.server.ServeConn(stream)
if err != nil {
log.Printf("[ERR] Error connecting to client stream: %s", err)
return
}
client := rpc.NewClient(clientConn)
// Create the RPC server
server := rpc.NewServer()
for endpoint, iface := range s.components {
var endpointVal interface{}
switch v := iface.(type) {
case packer.Artifact:
endpointVal = &ArtifactServer{
artifact: v,
}
case packer.Cache:
endpointVal = &CacheServer{
cache: v,
}
case packer.PostProcessor:
endpointVal = &PostProcessorServer{
client: client,
server: server,
p: v,
}
default:
log.Printf("[ERR] Unknown component for endpoint: %s", endpoint)
return
}
registerComponent(server, endpoint, endpointVal, false)
}
server.ServeConn(stream)
} }
// registerComponent registers a single Packer RPC component onto // registerComponent registers a single Packer RPC component onto
......
package rpc package rpc
import ( import (
"net/rpc"
"reflect" "reflect"
"testing" "testing"
) )
...@@ -52,17 +51,12 @@ func TestUiRPC(t *testing.T) { ...@@ -52,17 +51,12 @@ func TestUiRPC(t *testing.T) {
ui := new(testUi) ui := new(testUi)
// Start the RPC server // Start the RPC server
server := rpc.NewServer() client, server := testClientServer(t)
RegisterUi(server, ui) defer client.Close()
address := serveSingleConn(server) defer server.Close()
server.RegisterUi(ui)
// Create the client over RPC and run some methods to verify it works uiClient := client.Ui()
client, err := rpc.Dial("tcp", address)
if err != nil {
panic(err)
}
uiClient := &Ui{client: client}
// Basic error and say tests // Basic error and say tests
result, err := uiClient.Ask("query") result, err := uiClient.Ask("query")
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment