Commit 4ba5c2ef authored by Mitchell Hashimoto's avatar Mitchell Hashimoto

packer/rpc: post-processors work on a single connection

parent a32cd59c
...@@ -8,7 +8,8 @@ import ( ...@@ -8,7 +8,8 @@ import (
// An implementation of packer.Artifact where the artifact is actually // An implementation of packer.Artifact where the artifact is actually
// available over an RPC connection. // available over an RPC connection.
type artifact struct { type artifact struct {
client *rpc.Client client *rpc.Client
endpoint string
} }
// ArtifactServer wraps a packer.Artifact implementation and makes it // ArtifactServer wraps a packer.Artifact implementation and makes it
...@@ -18,32 +19,32 @@ type ArtifactServer struct { ...@@ -18,32 +19,32 @@ type ArtifactServer struct {
} }
func Artifact(client *rpc.Client) *artifact { func Artifact(client *rpc.Client) *artifact {
return &artifact{client} return &artifact{client: client}
} }
func (a *artifact) BuilderId() (result string) { func (a *artifact) BuilderId() (result string) {
a.client.Call("Artifact.BuilderId", new(interface{}), &result) a.client.Call(a.endpoint+".BuilderId", new(interface{}), &result)
return return
} }
func (a *artifact) Files() (result []string) { func (a *artifact) Files() (result []string) {
a.client.Call("Artifact.Files", new(interface{}), &result) a.client.Call(a.endpoint+".Files", new(interface{}), &result)
return return
} }
func (a *artifact) Id() (result string) { func (a *artifact) Id() (result string) {
a.client.Call("Artifact.Id", new(interface{}), &result) a.client.Call(a.endpoint+".Id", new(interface{}), &result)
return return
} }
func (a *artifact) String() (result string) { func (a *artifact) String() (result string) {
a.client.Call("Artifact.String", new(interface{}), &result) a.client.Call(a.endpoint+".String", new(interface{}), &result)
return return
} }
func (a *artifact) Destroy() error { func (a *artifact) Destroy() error {
var result error var result error
if err := a.client.Call("Artifact.Destroy", new(interface{}), &result); err != nil { if err := a.client.Call(a.endpoint+".Destroy", new(interface{}), &result); err != nil {
return err return err
} }
......
...@@ -12,6 +12,7 @@ import ( ...@@ -12,6 +12,7 @@ import (
type Client struct { type Client struct {
mux *MuxConn mux *MuxConn
client *rpc.Client client *rpc.Client
server *rpc.Server
} }
func NewClient(rwc io.ReadWriteCloser) (*Client, error) { func NewClient(rwc io.ReadWriteCloser) (*Client, error) {
...@@ -20,14 +21,28 @@ func NewClient(rwc io.ReadWriteCloser) (*Client, error) { ...@@ -20,14 +21,28 @@ func NewClient(rwc io.ReadWriteCloser) (*Client, error) {
// remote RPC server. On the remote side Server.ServeConn also listens // remote RPC server. On the remote side Server.ServeConn also listens
// on this stream ID. // on this stream ID.
mux := NewMuxConn(rwc) mux := NewMuxConn(rwc)
stream, err := mux.Dial(0) clientConn, err := mux.Dial(0)
if err != nil { if err != nil {
mux.Close()
return nil, err return nil, err
} }
// Accept connection ID 1 which is what the remote end uses to
// be an RPC client back to us so we can even serve some objects.
serverConn, err := mux.Accept(1)
if err != nil {
mux.Close()
return nil, err
}
// Start our RPC server on this end
server := rpc.NewServer()
go server.ServeConn(serverConn)
return &Client{ return &Client{
mux: mux, mux: mux,
client: rpc.NewClient(stream), client: rpc.NewClient(clientConn),
server: server,
}, nil }, nil
} }
...@@ -41,7 +56,8 @@ func (c *Client) Close() error { ...@@ -41,7 +56,8 @@ func (c *Client) Close() error {
func (c *Client) Artifact() packer.Artifact { func (c *Client) Artifact() packer.Artifact {
return &artifact{ return &artifact{
client: c.client, client: c.client,
endpoint: DefaultArtifactEndpoint,
} }
} }
...@@ -54,5 +70,6 @@ func (c *Client) Cache() packer.Cache { ...@@ -54,5 +70,6 @@ func (c *Client) Cache() packer.Cache {
func (c *Client) PostProcessor() packer.PostProcessor { func (c *Client) PostProcessor() packer.PostProcessor {
return &postProcessor{ return &postProcessor{
client: c.client, client: c.client,
server: c.server,
} }
} }
...@@ -9,27 +9,36 @@ import ( ...@@ -9,27 +9,36 @@ import (
// executed over an RPC connection. // executed over an RPC connection.
type postProcessor struct { type postProcessor struct {
client *rpc.Client client *rpc.Client
server *rpc.Server
} }
// PostProcessorServer wraps a packer.PostProcessor implementation and makes it // PostProcessorServer wraps a packer.PostProcessor implementation and makes it
// exportable as part of a Golang RPC server. // exportable as part of a Golang RPC server.
type PostProcessorServer struct { type PostProcessorServer struct {
p packer.PostProcessor client *rpc.Client
server *rpc.Server
p packer.PostProcessor
} }
type PostProcessorConfigureArgs struct { type PostProcessorConfigureArgs struct {
Configs []interface{} Configs []interface{}
} }
type PostProcessorPostProcessArgs struct {
ArtifactEndpoint string
UiEndpoint string
}
type PostProcessorProcessResponse struct { type PostProcessorProcessResponse struct {
Err error Err error
Keep bool Keep bool
RPCAddress string ArtifactEndpoint string
} }
func PostProcessor(client *rpc.Client) *postProcessor { func PostProcessor(client *rpc.Client) *postProcessor {
return &postProcessor{client} return &postProcessor{client: client}
} }
func (p *postProcessor) Configure(raw ...interface{}) (err error) { func (p *postProcessor) Configure(raw ...interface{}) (err error) {
args := &PostProcessorConfigureArgs{Configs: raw} args := &PostProcessorConfigureArgs{Configs: raw}
if cerr := p.client.Call("PostProcessor.Configure", args, &err); cerr != nil { if cerr := p.client.Call("PostProcessor.Configure", args, &err); cerr != nil {
...@@ -40,12 +49,21 @@ func (p *postProcessor) Configure(raw ...interface{}) (err error) { ...@@ -40,12 +49,21 @@ func (p *postProcessor) Configure(raw ...interface{}) (err error) {
} }
func (p *postProcessor) PostProcess(ui packer.Ui, a packer.Artifact) (packer.Artifact, bool, error) { func (p *postProcessor) PostProcess(ui packer.Ui, a packer.Artifact) (packer.Artifact, bool, error) {
server := rpc.NewServer() artifactEndpoint := registerComponent(p.server, "Artifact", &ArtifactServer{
RegisterArtifact(server, a) artifact: a,
RegisterUi(server, ui) }, true)
uiEndpoint := registerComponent(p.server, "Ui", &UiServer{
ui: ui,
}, true)
args := PostProcessorPostProcessArgs{
ArtifactEndpoint: artifactEndpoint,
UiEndpoint: uiEndpoint,
}
var response PostProcessorProcessResponse var response PostProcessorProcessResponse
if err := p.client.Call("PostProcessor.PostProcess", serveSingleConn(server), &response); err != nil { if err := p.client.Call("PostProcessor.PostProcess", &args, &response); err != nil {
return nil, false, err return nil, false, err
} }
...@@ -53,16 +71,14 @@ func (p *postProcessor) PostProcess(ui packer.Ui, a packer.Artifact) (packer.Art ...@@ -53,16 +71,14 @@ func (p *postProcessor) PostProcess(ui packer.Ui, a packer.Artifact) (packer.Art
return nil, false, response.Err return nil, false, response.Err
} }
if response.RPCAddress == "" { if response.ArtifactEndpoint == "" {
return nil, false, nil return nil, false, nil
} }
client, err := rpcDial(response.RPCAddress) return &artifact{
if err != nil { client: p.client,
return nil, false, err endpoint: response.ArtifactEndpoint,
} }, response.Keep, nil
return Artifact(client), response.Keep, nil
} }
func (p *PostProcessorServer) Configure(args *PostProcessorConfigureArgs, reply *error) error { func (p *PostProcessorServer) Configure(args *PostProcessorConfigureArgs, reply *error) error {
...@@ -74,19 +90,23 @@ func (p *PostProcessorServer) Configure(args *PostProcessorConfigureArgs, reply ...@@ -74,19 +90,23 @@ func (p *PostProcessorServer) Configure(args *PostProcessorConfigureArgs, reply
return nil return nil
} }
func (p *PostProcessorServer) PostProcess(address string, reply *PostProcessorProcessResponse) error { func (p *PostProcessorServer) PostProcess(args *PostProcessorPostProcessArgs, reply *PostProcessorProcessResponse) error {
client, err := rpcDial(address) artifact := &artifact{
if err != nil { client: p.client,
return err endpoint: args.ArtifactEndpoint,
} }
responseAddress := "" ui := &Ui{
client: p.client,
endpoint: args.UiEndpoint,
}
artifact, keep, err := p.p.PostProcess(&Ui{client: client}, Artifact(client)) var artifactEndpoint string
if err == nil && artifact != nil { artifactResult, keep, err := p.p.PostProcess(ui, artifact)
server := rpc.NewServer() if err == nil && artifactResult != nil {
RegisterArtifact(server, artifact) artifactEndpoint = registerComponent(p.server, "Artifact", &ArtifactServer{
responseAddress = serveSingleConn(server) artifact: artifactResult,
}, true)
} }
if err != nil { if err != nil {
...@@ -94,9 +114,9 @@ func (p *PostProcessorServer) PostProcess(address string, reply *PostProcessorPr ...@@ -94,9 +114,9 @@ func (p *PostProcessorServer) PostProcess(address string, reply *PostProcessorPr
} }
*reply = PostProcessorProcessResponse{ *reply = PostProcessorProcessResponse{
Err: err, Err: err,
Keep: keep, Keep: keep,
RPCAddress: responseAddress, ArtifactEndpoint: artifactEndpoint,
} }
return nil return nil
......
...@@ -56,7 +56,9 @@ func TestPostProcessorRPC(t *testing.T) { ...@@ -56,7 +56,9 @@ func TestPostProcessorRPC(t *testing.T) {
} }
// Test PostProcess // Test PostProcess
a := new(packer.MockArtifact) a := &packer.MockArtifact{
IdValue: "ppTestId",
}
ui := new(testUi) ui := new(testUi)
artifact, _, err := ppClient.PostProcess(ui, a) artifact, _, err := ppClient.PostProcess(ui, a)
if err != nil { if err != nil {
...@@ -67,12 +69,12 @@ func TestPostProcessorRPC(t *testing.T) { ...@@ -67,12 +69,12 @@ func TestPostProcessorRPC(t *testing.T) {
t.Fatal("postprocess should be called") t.Fatal("postprocess should be called")
} }
if p.ppArtifact.BuilderId() != "bid" { if p.ppArtifact.Id() != "ppTestId" {
t.Fatal("unknown artifact") t.Fatal("unknown artifact")
} }
if artifact.BuilderId() != "bid" { if artifact.Id() != "id" {
t.Fatal("unknown result artifact") t.Fatalf("unknown artifact: %s", artifact.Id())
} }
} }
......
...@@ -56,7 +56,7 @@ func RegisterHook(s *rpc.Server, h packer.Hook) { ...@@ -56,7 +56,7 @@ func RegisterHook(s *rpc.Server, h packer.Hook) {
// Registers the appropriate endpoing on an RPC server to serve a // Registers the appropriate endpoing on an RPC server to serve a
// PostProcessor. // PostProcessor.
func RegisterPostProcessor(s *rpc.Server, p packer.PostProcessor) { func RegisterPostProcessor(s *rpc.Server, p packer.PostProcessor) {
registerComponent(s, "PostProcessor", &PostProcessorServer{p}, false) registerComponent(s, "PostProcessor", &PostProcessorServer{p: p}, false)
} }
// Registers the appropriate endpoint on an RPC server to serve a packer.Provisioner // Registers the appropriate endpoint on an RPC server to serve a packer.Provisioner
...@@ -70,17 +70,6 @@ func RegisterUi(s *rpc.Server, ui packer.Ui) { ...@@ -70,17 +70,6 @@ func RegisterUi(s *rpc.Server, ui packer.Ui) {
registerComponent(s, "Ui", &UiServer{ui}, false) registerComponent(s, "Ui", &UiServer{ui}, false)
} }
// registerComponent registers a single Packer RPC component onto
// the RPC server. If id is true, then a unique ID number will be appended
// onto the end of the endpoint.
//
// The endpoint name is returned.
func registerComponent(s *rpc.Server, name string, rcvr interface{}, id bool) string {
endpoint := name
s.RegisterName(endpoint, rcvr)
return endpoint
}
func serveSingleConn(s *rpc.Server) string { func serveSingleConn(s *rpc.Server) string {
l := netListenerInRange(portRangeMin, portRangeMax) l := netListenerInRange(portRangeMin, portRangeMax)
......
...@@ -9,31 +9,35 @@ import ( ...@@ -9,31 +9,35 @@ import (
"sync/atomic" "sync/atomic"
) )
var endpointId uint64
const (
DefaultArtifactEndpoint string = "Artifact"
)
// Server represents an RPC server for Packer. This must be paired on // Server represents an RPC server for Packer. This must be paired on
// the other side with a Client. // the other side with a Client.
type Server struct { type Server struct {
endpointId uint64 components map[string]interface{}
rpcServer *rpc.Server
} }
// NewServer returns a new Packer RPC server. // NewServer returns a new Packer RPC server.
func NewServer() *Server { func NewServer() *Server {
return &Server{ return &Server{
endpointId: 0, components: make(map[string]interface{}),
rpcServer: rpc.NewServer(),
} }
} }
func (s *Server) RegisterArtifact(a packer.Artifact) { func (s *Server) RegisterArtifact(a packer.Artifact) {
s.registerComponent("Artifact", &ArtifactServer{a}, false) s.components[DefaultArtifactEndpoint] = a
} }
func (s *Server) RegisterCache(c packer.Cache) { func (s *Server) RegisterCache(c packer.Cache) {
s.registerComponent("Cache", &CacheServer{c}, false) s.components["Cache"] = c
} }
func (s *Server) RegisterPostProcessor(p packer.PostProcessor) { func (s *Server) RegisterPostProcessor(p packer.PostProcessor) {
s.registerComponent("PostProcessor", &PostProcessorServer{p}, false) s.components["PostProcessor"] = p
} }
// ServeConn serves a single connection over the RPC server. It is up // ServeConn serves a single connection over the RPC server. It is up
...@@ -50,7 +54,42 @@ func (s *Server) ServeConn(conn io.ReadWriteCloser) { ...@@ -50,7 +54,42 @@ func (s *Server) ServeConn(conn io.ReadWriteCloser) {
return return
} }
s.rpcServer.ServeConn(stream) clientConn, err := mux.Dial(1)
if err != nil {
log.Printf("[ERR] Error connecting to client stream: %s", err)
return
}
client := rpc.NewClient(clientConn)
// Create the RPC server
server := rpc.NewServer()
for endpoint, iface := range s.components {
var endpointVal interface{}
switch v := iface.(type) {
case packer.Artifact:
endpointVal = &ArtifactServer{
artifact: v,
}
case packer.Cache:
endpointVal = &CacheServer{
cache: v,
}
case packer.PostProcessor:
endpointVal = &PostProcessorServer{
client: client,
server: server,
p: v,
}
default:
log.Printf("[ERR] Unknown component for endpoint: %s", endpoint)
return
}
registerComponent(server, endpoint, endpointVal, false)
}
server.ServeConn(stream)
} }
// registerComponent registers a single Packer RPC component onto // registerComponent registers a single Packer RPC component onto
...@@ -58,12 +97,12 @@ func (s *Server) ServeConn(conn io.ReadWriteCloser) { ...@@ -58,12 +97,12 @@ func (s *Server) ServeConn(conn io.ReadWriteCloser) {
// onto the end of the endpoint. // onto the end of the endpoint.
// //
// The endpoint name is returned. // The endpoint name is returned.
func (s *Server) registerComponent(name string, rcvr interface{}, id bool) string { func registerComponent(server *rpc.Server, name string, rcvr interface{}, id bool) string {
endpoint := name endpoint := name
if id { if id {
fmt.Sprintf("%s.%d", endpoint, atomic.AddUint64(&s.endpointId, 1)) fmt.Sprintf("%s.%d", endpoint, atomic.AddUint64(&endpointId, 1))
} }
s.rpcServer.RegisterName(endpoint, rcvr) server.RegisterName(endpoint, rcvr)
return endpoint return endpoint
} }
...@@ -9,7 +9,8 @@ import ( ...@@ -9,7 +9,8 @@ import (
// An implementation of packer.Ui where the Ui is actually executed // An implementation of packer.Ui where the Ui is actually executed
// over an RPC connection. // over an RPC connection.
type Ui struct { type Ui struct {
client *rpc.Client client *rpc.Client
endpoint string
} }
// UiServer wraps a packer.Ui implementation and makes it exportable // UiServer wraps a packer.Ui implementation and makes it exportable
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment