Commit a036bec9 authored by Mitchell Hashimoto's avatar Mitchell Hashimoto

packer/rpc: Hook

parent db06fc75
...@@ -58,6 +58,13 @@ func (c *Client) Communicator() packer.Communicator { ...@@ -58,6 +58,13 @@ func (c *Client) Communicator() packer.Communicator {
} }
} }
func (c *Client) Hook() packer.Hook {
return &hook{
client: c.client,
mux: c.mux,
}
}
func (c *Client) PostProcessor() packer.PostProcessor { func (c *Client) PostProcessor() packer.PostProcessor {
return &postProcessor{ return &postProcessor{
client: c.client, client: c.client,
......
...@@ -10,32 +10,40 @@ import ( ...@@ -10,32 +10,40 @@ import (
// over an RPC connection. // over an RPC connection.
type hook struct { type hook struct {
client *rpc.Client client *rpc.Client
mux *MuxConn
} }
// HookServer wraps a packer.Hook implementation and makes it exportable // HookServer wraps a packer.Hook implementation and makes it exportable
// as part of a Golang RPC server. // as part of a Golang RPC server.
type HookServer struct { type HookServer struct {
hook packer.Hook hook packer.Hook
mux *MuxConn
} }
type HookRunArgs struct { type HookRunArgs struct {
Name string Name string
Data interface{} Data interface{}
RPCAddress string StreamId uint32
} }
func Hook(client *rpc.Client) *hook { func Hook(client *rpc.Client) *hook {
return &hook{client} return &hook{client: client}
} }
func (h *hook) Run(name string, ui packer.Ui, comm packer.Communicator, data interface{}) error { func (h *hook) Run(name string, ui packer.Ui, comm packer.Communicator, data interface{}) error {
server := rpc.NewServer() nextId := h.mux.NextId()
RegisterCommunicator(server, comm) server := NewServerWithMux(h.mux, nextId)
RegisterUi(server, ui) server.RegisterCommunicator(comm)
address := serveSingleConn(server) server.RegisterUi(ui)
go server.Serve()
args := &HookRunArgs{name, data, address} args := HookRunArgs{
return h.client.Call("Hook.Run", args, new(interface{})) Name: name,
Data: data,
StreamId: nextId,
}
return h.client.Call("Hook.Run", &args, new(interface{}))
} }
func (h *hook) Cancel() { func (h *hook) Cancel() {
...@@ -46,12 +54,13 @@ func (h *hook) Cancel() { ...@@ -46,12 +54,13 @@ func (h *hook) Cancel() {
} }
func (h *HookServer) Run(args *HookRunArgs, reply *interface{}) error { func (h *HookServer) Run(args *HookRunArgs, reply *interface{}) error {
client, err := rpcDial(args.RPCAddress) client, err := NewClientWithMux(h.mux, args.StreamId)
if err != nil { if err != nil {
return err return NewBasicError(err)
} }
defer client.Close()
if err := h.hook.Run(args.Name, &Ui{client: client}, Communicator(client), args.Data); err != nil { if err := h.hook.Run(args.Name, client.Ui(), client.Communicator(), args.Data); err != nil {
return NewBasicError(err) return NewBasicError(err)
} }
......
...@@ -2,7 +2,6 @@ package rpc ...@@ -2,7 +2,6 @@ package rpc
import ( import (
"github.com/mitchellh/packer/packer" "github.com/mitchellh/packer/packer"
"net/rpc"
"reflect" "reflect"
"sync" "sync"
"testing" "testing"
...@@ -14,17 +13,11 @@ func TestHookRPC(t *testing.T) { ...@@ -14,17 +13,11 @@ func TestHookRPC(t *testing.T) {
h := new(packer.MockHook) h := new(packer.MockHook)
// Serve // Serve
server := rpc.NewServer() client, server := testClientServer(t)
RegisterHook(server, h) defer client.Close()
address := serveSingleConn(server) defer server.Close()
server.RegisterHook(h)
// Create the client over RPC and run some methods to verify it works hClient := client.Hook()
client, err := rpc.Dial("tcp", address)
if err != nil {
t.Fatalf("err: %s", err)
}
hClient := Hook(client)
// Test Run // Test Run
ui := &testUi{} ui := &testUi{}
...@@ -60,17 +53,11 @@ func TestHook_cancelWhileRun(t *testing.T) { ...@@ -60,17 +53,11 @@ func TestHook_cancelWhileRun(t *testing.T) {
} }
// Serve // Serve
server := rpc.NewServer() client, server := testClientServer(t)
RegisterHook(server, h) defer client.Close()
address := serveSingleConn(server) defer server.Close()
server.RegisterHook(h)
// Create the client over RPC and run some methods to verify it works hClient := client.Hook()
client, err := rpc.Dial("tcp", address)
if err != nil {
t.Fatalf("err: %s", err)
}
hClient := Hook(client)
// Start the run // Start the run
finished := make(chan struct{}) finished := make(chan struct{})
......
...@@ -50,7 +50,7 @@ func RegisterEnvironment(s *rpc.Server, e packer.Environment) { ...@@ -50,7 +50,7 @@ func RegisterEnvironment(s *rpc.Server, e packer.Environment) {
// Registers the appropriate endpoint on an RPC server to serve a // Registers the appropriate endpoint on an RPC server to serve a
// Hook. // Hook.
func RegisterHook(s *rpc.Server, h packer.Hook) { func RegisterHook(s *rpc.Server, h packer.Hook) {
registerComponent(s, "Hook", &HookServer{h}, false) registerComponent(s, "Hook", &HookServer{hook: h}, false)
} }
// Registers the appropriate endpoing on an RPC server to serve a // Registers the appropriate endpoing on an RPC server to serve a
......
...@@ -15,6 +15,7 @@ const ( ...@@ -15,6 +15,7 @@ const (
DefaultArtifactEndpoint string = "Artifact" DefaultArtifactEndpoint string = "Artifact"
DefaultCacheEndpoint = "Cache" DefaultCacheEndpoint = "Cache"
DefaultCommunicatorEndpoint = "Communicator" DefaultCommunicatorEndpoint = "Communicator"
DefaultHookEndpoint = "Hook"
DefaultPostProcessorEndpoint = "PostProcessor" DefaultPostProcessorEndpoint = "PostProcessor"
DefaultUiEndpoint = "Ui" DefaultUiEndpoint = "Ui"
) )
...@@ -63,6 +64,13 @@ func (s *Server) RegisterCommunicator(c packer.Communicator) { ...@@ -63,6 +64,13 @@ func (s *Server) RegisterCommunicator(c packer.Communicator) {
}) })
} }
func (s *Server) RegisterHook(h packer.Hook) {
s.server.RegisterName(DefaultHookEndpoint, &HookServer{
hook: h,
mux: s.mux,
})
}
func (s *Server) RegisterPostProcessor(p packer.PostProcessor) { func (s *Server) RegisterPostProcessor(p packer.PostProcessor) {
s.server.RegisterName(DefaultPostProcessorEndpoint, &PostProcessorServer{ s.server.RegisterName(DefaultPostProcessorEndpoint, &PostProcessorServer{
mux: s.mux, mux: s.mux,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment