Commit 2ba713d7 authored by Mitchell Hashimoto's avatar Mitchell Hashimoto

packer/rpc: Builder

parent e6939938
package rpc package rpc
import ( import (
"encoding/gob"
"fmt"
"github.com/mitchellh/packer/packer" "github.com/mitchellh/packer/packer"
"log" "log"
"net/rpc" "net/rpc"
...@@ -12,35 +10,27 @@ import ( ...@@ -12,35 +10,27 @@ import (
// over an RPC connection. // over an RPC connection.
type builder struct { type builder struct {
client *rpc.Client client *rpc.Client
mux *MuxConn
} }
// BuilderServer wraps a packer.Builder implementation and makes it exportable // BuilderServer wraps a packer.Builder implementation and makes it exportable
// as part of a Golang RPC server. // as part of a Golang RPC server.
type BuilderServer struct { type BuilderServer struct {
builder packer.Builder builder packer.Builder
mux *MuxConn
} }
type BuilderPrepareArgs struct { type BuilderPrepareArgs struct {
Configs []interface{} Configs []interface{}
} }
type BuilderRunArgs struct {
RPCAddress string
ResponseAddress string
}
type BuilderPrepareResponse struct { type BuilderPrepareResponse struct {
Warnings []string Warnings []string
Error error Error error
} }
type BuilderRunResponse struct {
Err error
RPCAddress string
}
func Builder(client *rpc.Client) *builder { func Builder(client *rpc.Client) *builder {
return &builder{client} return &builder{client: client}
} }
func (b *builder) Prepare(config ...interface{}) ([]string, error) { func (b *builder) Prepare(config ...interface{}) ([]string, error) {
...@@ -54,58 +44,28 @@ func (b *builder) Prepare(config ...interface{}) ([]string, error) { ...@@ -54,58 +44,28 @@ func (b *builder) Prepare(config ...interface{}) ([]string, error) {
} }
func (b *builder) Run(ui packer.Ui, hook packer.Hook, cache packer.Cache) (packer.Artifact, error) { func (b *builder) Run(ui packer.Ui, hook packer.Hook, cache packer.Cache) (packer.Artifact, error) {
// Create and start the server for the Build and UI nextId := b.mux.NextId()
server := rpc.NewServer() server := NewServerWithMux(b.mux, nextId)
RegisterCache(server, cache) server.RegisterCache(cache)
RegisterHook(server, hook) server.RegisterHook(hook)
RegisterUi(server, ui) server.RegisterUi(ui)
go server.Serve()
// Create a server for the response
responseL := netListenerInRange(portRangeMin, portRangeMax) var responseId uint32
runResponseCh := make(chan *BuilderRunResponse) if err := b.client.Call("Builder.Run", nextId, &responseId); err != nil {
go func() {
defer responseL.Close()
var response BuilderRunResponse
defer func() { runResponseCh <- &response }()
conn, err := responseL.Accept()
if err != nil {
response.Err = err
return
}
defer conn.Close()
decoder := gob.NewDecoder(conn)
if err := decoder.Decode(&response); err != nil {
response.Err = fmt.Errorf("Error waiting for Run: %s", err)
}
}()
args := &BuilderRunArgs{
serveSingleConn(server),
responseL.Addr().String(),
}
if err := b.client.Call("Builder.Run", args, new(interface{})); err != nil {
return nil, err return nil, err
} }
response := <-runResponseCh if responseId == 0 {
if response.Err != nil {
return nil, response.Err
}
if response.RPCAddress == "" {
return nil, nil return nil, nil
} }
client, err := rpcDial(response.RPCAddress) client, err := NewClientWithMux(b.mux, responseId)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return Artifact(client), nil return client.Artifact(), nil
} }
func (b *builder) Cancel() { func (b *builder) Cancel() {
...@@ -127,45 +87,26 @@ func (b *BuilderServer) Prepare(args *BuilderPrepareArgs, reply *BuilderPrepareR ...@@ -127,45 +87,26 @@ func (b *BuilderServer) Prepare(args *BuilderPrepareArgs, reply *BuilderPrepareR
return nil return nil
} }
func (b *BuilderServer) Run(args *BuilderRunArgs, reply *interface{}) error { func (b *BuilderServer) Run(streamId uint32, reply *uint32) error {
client, err := rpcDial(args.RPCAddress) client, err := NewClientWithMux(b.mux, streamId)
if err != nil { if err != nil {
return err return NewBasicError(err)
} }
defer client.Close()
responseC, err := tcpDial(args.ResponseAddress) artifact, err := b.builder.Run(client.Ui(), client.Hook(), client.Cache())
if err != nil { if err != nil {
return err return NewBasicError(err)
} }
responseWriter := gob.NewEncoder(responseC) *reply = 0
if artifact != nil {
// Run the build in a goroutine so we don't block the RPC connection streamId = b.mux.NextId()
go func() { server := NewServerWithMux(b.mux, streamId)
defer responseC.Close() server.RegisterArtifact(artifact)
go server.Serve()
cache := Cache(client) *reply = streamId
hook := Hook(client) }
ui := &Ui{client: client}
artifact, responseErr := b.builder.Run(ui, hook, cache)
responseAddress := ""
if responseErr == nil && artifact != nil {
// Wrap the artifact
server := rpc.NewServer()
RegisterArtifact(server, artifact)
responseAddress = serveSingleConn(server)
}
if responseErr != nil {
responseErr = NewBasicError(responseErr)
}
err := responseWriter.Encode(&BuilderRunResponse{responseErr, responseAddress})
if err != nil {
log.Printf("BuildServer.Run error: %s", err)
}
}()
return nil return nil
} }
......
...@@ -2,31 +2,19 @@ package rpc ...@@ -2,31 +2,19 @@ package rpc
import ( import (
"github.com/mitchellh/packer/packer" "github.com/mitchellh/packer/packer"
"net/rpc"
"reflect" "reflect"
"testing" "testing"
) )
var testBuilderArtifact = &packer.MockArtifact{} var testBuilderArtifact = &packer.MockArtifact{}
func builderRPCClient(t *testing.T) (*packer.MockBuilder, packer.Builder) {
b := new(packer.MockBuilder)
// Start the server
server := rpc.NewServer()
RegisterBuilder(server, b)
address := serveSingleConn(server)
// Create the client over RPC and run some methods to verify it works
client, err := rpc.Dial("tcp", address)
if err != nil {
t.Fatalf("err: %s", err)
}
return b, Builder(client)
}
func TestBuilderPrepare(t *testing.T) { func TestBuilderPrepare(t *testing.T) {
b, bClient := builderRPCClient(t) b := new(packer.MockBuilder)
client, server := testClientServer(t)
defer client.Close()
defer server.Close()
server.RegisterBuilder(b)
bClient := client.Builder()
// Test Prepare // Test Prepare
config := 42 config := 42
...@@ -48,7 +36,12 @@ func TestBuilderPrepare(t *testing.T) { ...@@ -48,7 +36,12 @@ func TestBuilderPrepare(t *testing.T) {
} }
func TestBuilderPrepare_Warnings(t *testing.T) { func TestBuilderPrepare_Warnings(t *testing.T) {
b, bClient := builderRPCClient(t) b := new(packer.MockBuilder)
client, server := testClientServer(t)
defer client.Close()
defer server.Close()
server.RegisterBuilder(b)
bClient := client.Builder()
expected := []string{"foo"} expected := []string{"foo"}
b.PrepareWarnings = expected b.PrepareWarnings = expected
...@@ -64,7 +57,12 @@ func TestBuilderPrepare_Warnings(t *testing.T) { ...@@ -64,7 +57,12 @@ func TestBuilderPrepare_Warnings(t *testing.T) {
} }
func TestBuilderRun(t *testing.T) { func TestBuilderRun(t *testing.T) {
b, bClient := builderRPCClient(t) b := new(packer.MockBuilder)
client, server := testClientServer(t)
defer client.Close()
defer server.Close()
server.RegisterBuilder(b)
bClient := client.Builder()
// Test Run // Test Run
cache := new(testCache) cache := new(testCache)
...@@ -79,34 +77,21 @@ func TestBuilderRun(t *testing.T) { ...@@ -79,34 +77,21 @@ func TestBuilderRun(t *testing.T) {
t.Fatal("run should be called") t.Fatal("run should be called")
} }
b.RunCache.Lock("foo")
if !cache.lockCalled {
t.Fatal("should be called")
}
b.RunHook.Run("foo", nil, nil, nil)
if !hook.RunCalled {
t.Fatal("should be called")
}
b.RunUi.Say("format")
if !ui.sayCalled {
t.Fatal("say should be called")
}
if ui.sayMessage != "format" {
t.Fatalf("bad: %s", ui.sayMessage)
}
if artifact.Id() != testBuilderArtifact.Id() { if artifact.Id() != testBuilderArtifact.Id() {
t.Fatalf("bad: %s", artifact.Id()) t.Fatalf("bad: %s", artifact.Id())
} }
} }
func TestBuilderRun_nilResult(t *testing.T) { func TestBuilderRun_nilResult(t *testing.T) {
b, bClient := builderRPCClient(t) b := new(packer.MockBuilder)
b.RunNilResult = true b.RunNilResult = true
client, server := testClientServer(t)
defer client.Close()
defer server.Close()
server.RegisterBuilder(b)
bClient := client.Builder()
cache := new(testCache) cache := new(testCache)
hook := &packer.MockHook{} hook := &packer.MockHook{}
ui := &testUi{} ui := &testUi{}
...@@ -120,7 +105,13 @@ func TestBuilderRun_nilResult(t *testing.T) { ...@@ -120,7 +105,13 @@ func TestBuilderRun_nilResult(t *testing.T) {
} }
func TestBuilderRun_ErrResult(t *testing.T) { func TestBuilderRun_ErrResult(t *testing.T) {
b, bClient := builderRPCClient(t) b := new(packer.MockBuilder)
client, server := testClientServer(t)
defer client.Close()
defer server.Close()
server.RegisterBuilder(b)
bClient := client.Builder()
b.RunErrResult = true b.RunErrResult = true
cache := new(testCache) cache := new(testCache)
...@@ -136,7 +127,12 @@ func TestBuilderRun_ErrResult(t *testing.T) { ...@@ -136,7 +127,12 @@ func TestBuilderRun_ErrResult(t *testing.T) {
} }
func TestBuilderCancel(t *testing.T) { func TestBuilderCancel(t *testing.T) {
b, bClient := builderRPCClient(t) b := new(packer.MockBuilder)
client, server := testClientServer(t)
defer client.Close()
defer server.Close()
server.RegisterBuilder(b)
bClient := client.Builder()
bClient.Cancel() bClient.Cancel()
if !b.CancelCalled { if !b.CancelCalled {
......
...@@ -45,6 +45,13 @@ func (c *Client) Artifact() packer.Artifact { ...@@ -45,6 +45,13 @@ func (c *Client) Artifact() packer.Artifact {
} }
} }
func (c *Client) Builder() packer.Builder {
return &builder{
client: c.client,
mux: c.mux,
}
}
func (c *Client) Cache() packer.Cache { func (c *Client) Cache() packer.Cache {
return &cache{ return &cache{
client: c.client, client: c.client,
......
...@@ -20,7 +20,7 @@ func RegisterBuild(s *rpc.Server, b packer.Build) { ...@@ -20,7 +20,7 @@ func RegisterBuild(s *rpc.Server, b packer.Build) {
// Registers the appropriate endpoint on an RPC server to serve a // Registers the appropriate endpoint on an RPC server to serve a
// Packer Builder. // Packer Builder.
func RegisterBuilder(s *rpc.Server, b packer.Builder) { func RegisterBuilder(s *rpc.Server, b packer.Builder) {
registerComponent(s, "Builder", &BuilderServer{b}, false) registerComponent(s, "Builder", &BuilderServer{builder: b}, false)
} }
// Registers the appropriate endpoint on an RPC server to serve a // Registers the appropriate endpoint on an RPC server to serve a
......
...@@ -13,6 +13,7 @@ var endpointId uint64 ...@@ -13,6 +13,7 @@ var endpointId uint64
const ( const (
DefaultArtifactEndpoint string = "Artifact" DefaultArtifactEndpoint string = "Artifact"
DefaultBuilderEndpoint = "Builder"
DefaultCacheEndpoint = "Cache" DefaultCacheEndpoint = "Cache"
DefaultCommandEndpoint = "Command" DefaultCommandEndpoint = "Command"
DefaultCommunicatorEndpoint = "Communicator" DefaultCommunicatorEndpoint = "Communicator"
...@@ -53,6 +54,13 @@ func (s *Server) RegisterArtifact(a packer.Artifact) { ...@@ -53,6 +54,13 @@ func (s *Server) RegisterArtifact(a packer.Artifact) {
}) })
} }
func (s *Server) RegisterBuilder(b packer.Builder) {
s.server.RegisterName(DefaultBuilderEndpoint, &BuilderServer{
builder: b,
mux: s.mux,
})
}
func (s *Server) RegisterCache(c packer.Cache) { func (s *Server) RegisterCache(c packer.Cache) {
s.server.RegisterName(DefaultCacheEndpoint, &CacheServer{ s.server.RegisterName(DefaultCacheEndpoint, &CacheServer{
cache: c, cache: c,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment