Commit 2a84720d authored by Jacob Vosmaer's avatar Jacob Vosmaer

Use reader/writer from gitaly streamio

parent 45ce9636
...@@ -2,6 +2,7 @@ package git ...@@ -2,6 +2,7 @@ package git
import ( import (
"fmt" "fmt"
"io"
"net/http" "net/http"
"gitlab.com/gitlab-org/gitlab-workhorse/internal/api" "gitlab.com/gitlab-org/gitlab-workhorse/internal/api"
...@@ -71,12 +72,12 @@ func handleGetInfoRefsWithGitaly(w http.ResponseWriter, a *api.Response, rpc str ...@@ -71,12 +72,12 @@ func handleGetInfoRefsWithGitaly(w http.ResponseWriter, a *api.Response, rpc str
ctx, cancelFunc := context.WithCancel(context.Background()) ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc() defer cancelFunc()
infoRefsResponseWriter, err := smarthttp.InfoRefsResponseWriterTo(ctx, &a.Repository, rpc) infoRefsResponseReader, err := smarthttp.InfoRefsResponseReader(ctx, &a.Repository, rpc)
if err != nil { if err != nil {
return fmt.Errorf("GetInfoRefsHandler: %v", err) return fmt.Errorf("GetInfoRefsHandler: %v", err)
} }
if _, err = infoRefsResponseWriter.WriteTo(w); err != nil { if _, err = io.Copy(w, infoRefsResponseReader); err != nil {
return fmt.Errorf("GetInfoRefsHandler: copy Gitaly response: %v", err) return fmt.Errorf("GetInfoRefsHandler: copy Gitaly response: %v", err)
} }
......
...@@ -7,7 +7,7 @@ import ( ...@@ -7,7 +7,7 @@ import (
"strconv" "strconv"
pb "gitlab.com/gitlab-org/gitaly-proto/go" pb "gitlab.com/gitlab-org/gitaly-proto/go"
pbhelper "gitlab.com/gitlab-org/gitaly-proto/go/helper" "gitlab.com/gitlab-org/gitaly/streamio"
"golang.org/x/net/context" "golang.org/x/net/context"
) )
...@@ -26,7 +26,7 @@ func (client *CommitClient) SendBlob(w http.ResponseWriter, request *pb.TreeEntr ...@@ -26,7 +26,7 @@ func (client *CommitClient) SendBlob(w http.ResponseWriter, request *pb.TreeEntr
} }
firstResponseReceived := false firstResponseReceived := false
rr := pbhelper.NewReceiveReader(func() ([]byte, error) { rr := streamio.NewReader(func() ([]byte, error) {
resp, err := c.Recv() resp, err := c.Recv()
if !firstResponseReceived && err == nil { if !firstResponseReceived && err == nil {
......
...@@ -5,7 +5,7 @@ import ( ...@@ -5,7 +5,7 @@ import (
"io" "io"
pb "gitlab.com/gitlab-org/gitaly-proto/go" pb "gitlab.com/gitlab-org/gitaly-proto/go"
pbhelper "gitlab.com/gitlab-org/gitaly-proto/go/helper" "gitlab.com/gitlab-org/gitaly/streamio"
"golang.org/x/net/context" "golang.org/x/net/context"
) )
...@@ -14,25 +14,30 @@ type SmartHTTPClient struct { ...@@ -14,25 +14,30 @@ type SmartHTTPClient struct {
pb.SmartHTTPClient pb.SmartHTTPClient
} }
func (client *SmartHTTPClient) InfoRefsResponseWriterTo(ctx context.Context, repo *pb.Repository, rpc string) (io.WriterTo, error) { func (client *SmartHTTPClient) InfoRefsResponseReader(ctx context.Context, repo *pb.Repository, rpc string) (io.Reader, error) {
rpcRequest := &pb.InfoRefsRequest{Repository: repo} rpcRequest := &pb.InfoRefsRequest{Repository: repo}
var c pbhelper.InfoRefsClient
var err error
switch rpc { switch rpc {
case "git-upload-pack": case "git-upload-pack":
c, err = client.InfoRefsUploadPack(ctx, rpcRequest) stream, err := client.InfoRefsUploadPack(ctx, rpcRequest)
return infoRefsReader(stream), err
case "git-receive-pack": case "git-receive-pack":
c, err = client.InfoRefsReceivePack(ctx, rpcRequest) stream, err := client.InfoRefsReceivePack(ctx, rpcRequest)
return infoRefsReader(stream), err
default: default:
return nil, fmt.Errorf("InfoRefsResponseWriterTo: Unsupported RPC: %q", rpc) return nil, fmt.Errorf("InfoRefsResponseWriterTo: Unsupported RPC: %q", rpc)
} }
}
if err != nil { type infoRefsClient interface {
return nil, fmt.Errorf("InfoRefsResponseWriterTo: RPC call failed: %v", err) Recv() (*pb.InfoRefsResponse, error)
} }
return &pbhelper.InfoRefsClientWriterTo{c}, nil func infoRefsReader(stream infoRefsClient) io.Reader {
return streamio.NewReader(func() ([]byte, error) {
resp, err := stream.Recv()
return resp.GetData(), err
})
} }
func (client *SmartHTTPClient) ReceivePack(repo *pb.Repository, glId string, glRepository string, clientRequest io.Reader, clientResponse io.Writer) error { func (client *SmartHTTPClient) ReceivePack(repo *pb.Repository, glId string, glRepository string, clientRequest io.Reader, clientResponse io.Writer) error {
...@@ -58,7 +63,7 @@ func (client *SmartHTTPClient) ReceivePack(repo *pb.Repository, glId string, glR ...@@ -58,7 +63,7 @@ func (client *SmartHTTPClient) ReceivePack(repo *pb.Repository, glId string, glR
errC := make(chan error, numStreams) errC := make(chan error, numStreams)
go func() { go func() {
rr := pbhelper.NewReceiveReader(func() ([]byte, error) { rr := streamio.NewReader(func() ([]byte, error) {
response, err := stream.Recv() response, err := stream.Recv()
return response.GetData(), err return response.GetData(), err
}) })
...@@ -67,7 +72,7 @@ func (client *SmartHTTPClient) ReceivePack(repo *pb.Repository, glId string, glR ...@@ -67,7 +72,7 @@ func (client *SmartHTTPClient) ReceivePack(repo *pb.Repository, glId string, glR
}() }()
go func() { go func() {
sw := pbhelper.NewSendWriter(func(data []byte) error { sw := streamio.NewWriter(func(data []byte) error {
return stream.Send(&pb.PostReceivePackRequest{Data: data}) return stream.Send(&pb.PostReceivePackRequest{Data: data})
}) })
_, err := io.Copy(sw, clientRequest) _, err := io.Copy(sw, clientRequest)
...@@ -105,7 +110,7 @@ func (client *SmartHTTPClient) UploadPack(repo *pb.Repository, clientRequest io. ...@@ -105,7 +110,7 @@ func (client *SmartHTTPClient) UploadPack(repo *pb.Repository, clientRequest io.
errC := make(chan error, numStreams) errC := make(chan error, numStreams)
go func() { go func() {
rr := pbhelper.NewReceiveReader(func() ([]byte, error) { rr := streamio.NewReader(func() ([]byte, error) {
response, err := stream.Recv() response, err := stream.Recv()
return response.GetData(), err return response.GetData(), err
}) })
...@@ -114,7 +119,7 @@ func (client *SmartHTTPClient) UploadPack(repo *pb.Repository, clientRequest io. ...@@ -114,7 +119,7 @@ func (client *SmartHTTPClient) UploadPack(repo *pb.Repository, clientRequest io.
}() }()
go func() { go func() {
sw := pbhelper.NewSendWriter(func(data []byte) error { sw := streamio.NewWriter(func(data []byte) error {
return stream.Send(&pb.PostUploadPackRequest{Data: data}) return stream.Send(&pb.PostUploadPackRequest{Data: data})
}) })
_, err := io.Copy(sw, clientRequest) _, err := io.Copy(sw, clientRequest)
......
package helper
import (
"io"
pb "gitlab.com/gitlab-org/gitaly-proto/go"
)
type InfoRefsClient interface {
Recv() (*pb.InfoRefsResponse, error)
}
type InfoRefsClientWriterTo struct {
InfoRefsClient
}
func (clientReader *InfoRefsClientWriterTo) WriteTo(w io.Writer) (total int64, err error) {
for {
response, err := clientReader.Recv()
if err == io.EOF {
return total, nil
} else if err != nil {
return total, err
}
n, err := w.Write(response.GetData())
total += int64(n)
if err != nil {
return total, err
}
}
}
...@@ -77,7 +77,7 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, ...@@ -77,7 +77,7 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE. SOFTWARE.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
LICENSE - gitlab.com/gitlab-org/gitaly/internal/service/middleware/panichandler LICENSE - gitlab.com/gitlab-org/gitaly/internal/middleware/panichandler
Copyright (c) 2016 Masahiro Sano Copyright (c) 2016 Masahiro Sano
MIT License MIT License
......
package helper package streamio
import ( import (
"io" "io"
) )
// NewReceiveReader turns receiver into an io.Reader. Errors from the // NewReader turns receiver into an io.Reader. Errors from the receiver
// receiver function are passed on unmodified. This means receiver should // function are passed on unmodified. This means receiver should emit
// emit io.EOF when done. // io.EOF when done.
func NewReceiveReader(receiver func() ([]byte, error)) io.Reader { func NewReader(receiver func() ([]byte, error)) io.Reader {
return &receiveReader{receiver: receiver} return &receiveReader{receiver: receiver}
} }
...@@ -29,9 +29,9 @@ func (rr *receiveReader) Read(p []byte) (int, error) { ...@@ -29,9 +29,9 @@ func (rr *receiveReader) Read(p []byte) (int, error) {
return n, nil return n, nil
} }
// NewSendWriter turns sender into an io.Writer. The number of 'bytes // NewWriter turns sender into an io.Writer. The number of 'bytes
// written' reported back is always len(p). // written' reported back is always len(p).
func NewSendWriter(sender func(p []byte) error) io.Writer { func NewWriter(sender func(p []byte) error) io.Writer {
return &sendWriter{sender: sender} return &sendWriter{sender: sender}
} }
......
...@@ -135,21 +135,21 @@ ...@@ -135,21 +135,21 @@
"version": "v0.9.0", "version": "v0.9.0",
"versionExact": "v0.9.0" "versionExact": "v0.9.0"
}, },
{
"checksumSHA1": "GkeSZfXVbtAkBZOrswot19GJZqQ=",
"path": "gitlab.com/gitlab-org/gitaly-proto/go/helper",
"revision": "e302a46f7ccd889bfaa683d2ba06ffe06ca5875b",
"revisionTime": "2017-06-12T18:49:50Z",
"version": "v0.9.0",
"versionExact": "v0.9.0"
},
{ {
"checksumSHA1": "dUHJbKas746n5fLzlwxHb6FOCxs=", "checksumSHA1": "dUHJbKas746n5fLzlwxHb6FOCxs=",
"path": "gitlab.com/gitlab-org/gitaly/auth", "path": "gitlab.com/gitlab-org/gitaly/auth",
"revision": "b933e5ce4843ec6c332a0184afb8e69820cc9050", "revision": "e4f8d3d14cc3fe673cb511fb4d0189b68a158ccd",
"revisionTime": "2017-06-22T09:36:09Z", "revisionTime": "2017-06-30T12:58:40Z",
"version": "v0.13.0", "version": "v0.14.0",
"versionExact": "v0.13.0" "versionExact": "v0.14.0"
},
{
"checksumSHA1": "sdUF3j5MaQ9Tjc2dGHqc/toQxyk=",
"path": "gitlab.com/gitlab-org/gitaly/streamio",
"revision": "e4f8d3d14cc3fe673cb511fb4d0189b68a158ccd",
"revisionTime": "2017-06-30T12:58:40Z",
"version": "v0.14.0",
"versionExact": "v0.14.0"
}, },
{ {
"checksumSHA1": "9jjO5GjLa0XF/nfWihF02RoH4qc=", "checksumSHA1": "9jjO5GjLa0XF/nfWihF02RoH4qc=",
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment