Commit bac54de9 authored by Abiola Ibrahim's avatar Abiola Ibrahim Committed by Matt Holt

Fastcgi persistent fix (#1129)

* Support for configurable connection pool.

* ensure positive integer pool size config
parent 3f83eccf
package fastcgi
import "sync"
type dialer interface {
Dial() (*FCGIClient, error)
Close(*FCGIClient) error
}
// basicDialer is a basic dialer that wraps default fcgi functions.
type basicDialer struct {
network, address string
}
func (b basicDialer) Dial() (*FCGIClient, error) { return Dial(b.network, b.address) }
func (b basicDialer) Close(c *FCGIClient) error { return c.Close() }
// persistentDialer keeps a pool of fcgi connections.
// connections are not closed after use, rather added back to the pool for reuse.
type persistentDialer struct {
size int
network string
address string
pool []*FCGIClient
sync.Mutex
}
func (p *persistentDialer) Dial() (*FCGIClient, error) {
p.Lock()
// connection is available, return first one.
if len(p.pool) > 0 {
client := p.pool[0]
p.pool = p.pool[1:]
p.Unlock()
return client, nil
}
p.Unlock()
// no connection available, create new one
return Dial(p.network, p.address)
}
func (p *persistentDialer) Close(client *FCGIClient) error {
p.Lock()
if len(p.pool) < p.size {
// pool is not full yet, add connection for reuse
p.pool = append(p.pool, client)
p.Unlock()
return nil
}
p.Unlock()
// otherwise, close the connection.
return client.Close()
}
...@@ -12,29 +12,10 @@ import ( ...@@ -12,29 +12,10 @@ import (
"path/filepath" "path/filepath"
"strconv" "strconv"
"strings" "strings"
"sync"
"github.com/mholt/caddy/caddyhttp/httpserver" "github.com/mholt/caddy/caddyhttp/httpserver"
) )
// persistent fastcgi connections
type serialClient struct {
// for read/write serialisation
sync.Mutex
backend *FCGIClient
}
type concurrentPersistentConnectionsMap struct {
// for thread safe acces to the map
sync.Mutex
clientMap map[string]*serialClient
}
var persistentConnections = &(concurrentPersistentConnectionsMap{clientMap: make(map[string]*serialClient)})
// UsePersistentFcgiConnections TODO: add an option in Caddyfile and pass it down to here
var UsePersistentFcgiConnections = true
// Handler is a middleware type that can handle requests as a FastCGI client. // Handler is a middleware type that can handle requests as a FastCGI client.
type Handler struct { type Handler struct {
Next httpserver.Handler Next httpserver.Handler
...@@ -91,27 +72,10 @@ func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) (int, error) ...@@ -91,27 +72,10 @@ func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) (int, error)
} }
// Connect to FastCGI gateway // Connect to FastCGI gateway
network, address := rule.parseAddress() fcgiBackend, err := rule.dialer.Dial()
var fcgiBackend *FCGIClient
var client *serialClient
reuse := false
if UsePersistentFcgiConnections {
persistentConnections.Lock()
client, reuse = persistentConnections.clientMap[network+address]
persistentConnections.Unlock()
}
if reuse {
client.Lock()
defer client.Unlock()
fcgiBackend = client.backend
} else {
fcgiBackend, err = Dial(network, address)
if err != nil { if err != nil {
return http.StatusBadGateway, err return http.StatusBadGateway, err
} }
}
var resp *http.Response var resp *http.Response
contentLength, _ := strconv.Atoi(r.Header.Get("Content-Length")) contentLength, _ := strconv.Atoi(r.Header.Get("Content-Length"))
...@@ -126,18 +90,12 @@ func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) (int, error) ...@@ -126,18 +90,12 @@ func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) (int, error)
resp, err = fcgiBackend.Post(env, r.Method, r.Header.Get("Content-Type"), r.Body, contentLength) resp, err = fcgiBackend.Post(env, r.Method, r.Header.Get("Content-Type"), r.Body, contentLength)
} }
defer fcgiBackend.Close()
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
return http.StatusBadGateway, err return http.StatusBadGateway, err
} }
if UsePersistentFcgiConnections {
persistentConnections.Lock()
persistentConnections.clientMap[network+address] = &(serialClient{backend: fcgiBackend})
persistentConnections.Unlock()
} else {
defer fcgiBackend.Close()
}
// Write response header // Write response header
writeHeader(w, resp) writeHeader(w, resp)
...@@ -151,10 +109,6 @@ func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) (int, error) ...@@ -151,10 +109,6 @@ func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) (int, error)
if fcgiBackend.stderr.Len() != 0 { if fcgiBackend.stderr.Len() != 0 {
// Remove trailing newline, error logger already does this. // Remove trailing newline, error logger already does this.
err = LogError(strings.TrimSuffix(fcgiBackend.stderr.String(), "\n")) err = LogError(strings.TrimSuffix(fcgiBackend.stderr.String(), "\n"))
persistentConnections.Lock()
delete(persistentConnections.clientMap, network+address)
persistentConnections.Unlock()
fcgiBackend.Close()
} }
// Normally we would return the status code if it is an error status (>= 400), // Normally we would return the status code if it is an error status (>= 400),
...@@ -170,28 +124,28 @@ func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) (int, error) ...@@ -170,28 +124,28 @@ func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) (int, error)
return h.Next.ServeHTTP(w, r) return h.Next.ServeHTTP(w, r)
} }
// parseAddress returns the network and address of r. // parseAddress returns the network and address of fcgiAddress.
// The first string is the network, "tcp" or "unix", implied from the scheme and address. // The first string is the network, "tcp" or "unix", implied from the scheme and address.
// The second string is r.Address, with scheme prefixes removed. // The second string is fcgiAddress, with scheme prefixes removed.
// The two returned strings can be used as parameters to the Dial() function. // The two returned strings can be used as parameters to the Dial() function.
func (r Rule) parseAddress() (string, string) { func parseAddress(fcgiAddress string) (string, string) {
// check if address has tcp scheme explicitly set // check if address has tcp scheme explicitly set
if strings.HasPrefix(r.Address, "tcp://") { if strings.HasPrefix(fcgiAddress, "tcp://") {
return "tcp", r.Address[len("tcp://"):] return "tcp", fcgiAddress[len("tcp://"):]
} }
// check if address has fastcgi scheme explicitly set // check if address has fastcgi scheme explicitly set
if strings.HasPrefix(r.Address, "fastcgi://") { if strings.HasPrefix(fcgiAddress, "fastcgi://") {
return "tcp", r.Address[len("fastcgi://"):] return "tcp", fcgiAddress[len("fastcgi://"):]
} }
// check if unix socket // check if unix socket
if trim := strings.HasPrefix(r.Address, "unix"); strings.HasPrefix(r.Address, "/") || trim { if trim := strings.HasPrefix(fcgiAddress, "unix"); strings.HasPrefix(fcgiAddress, "/") || trim {
if trim { if trim {
return "unix", r.Address[len("unix:"):] return "unix", fcgiAddress[len("unix:"):]
} }
return "unix", r.Address return "unix", fcgiAddress
} }
// default case, a plain tcp address with no scheme // default case, a plain tcp address with no scheme
return "tcp", r.Address return "tcp", fcgiAddress
} }
func writeHeader(w http.ResponseWriter, r *http.Response) { func writeHeader(w http.ResponseWriter, r *http.Response) {
...@@ -340,6 +294,9 @@ type Rule struct { ...@@ -340,6 +294,9 @@ type Rule struct {
// Ignored paths // Ignored paths
IgnoredSubPaths []string IgnoredSubPaths []string
// FCGI dialer
dialer dialer
} }
// canSplit checks if path can split into two based on rule.SplitPath. // canSplit checks if path can split into two based on rule.SplitPath.
......
...@@ -24,9 +24,10 @@ func TestServeHTTP(t *testing.T) { ...@@ -24,9 +24,10 @@ func TestServeHTTP(t *testing.T) {
w.Write([]byte(body)) w.Write([]byte(body))
})) }))
network, address := parseAddress(listener.Addr().String())
handler := Handler{ handler := Handler{
Next: nil, Next: nil,
Rules: []Rule{{Path: "/", Address: listener.Addr().String()}}, Rules: []Rule{{Path: "/", Address: listener.Addr().String(), dialer: basicDialer{network, address}}},
} }
r, err := http.NewRequest("GET", "/", nil) r, err := http.NewRequest("GET", "/", nil)
if err != nil { if err != nil {
...@@ -64,10 +65,10 @@ func TestRuleParseAddress(t *testing.T) { ...@@ -64,10 +65,10 @@ func TestRuleParseAddress(t *testing.T) {
} }
for _, entry := range getClientTestTable { for _, entry := range getClientTestTable {
if actualnetwork, _ := entry.rule.parseAddress(); actualnetwork != entry.expectednetwork { if actualnetwork, _ := parseAddress(entry.rule.Address); actualnetwork != entry.expectednetwork {
t.Errorf("Unexpected network for address string %v. Got %v, expected %v", entry.rule.Address, actualnetwork, entry.expectednetwork) t.Errorf("Unexpected network for address string %v. Got %v, expected %v", entry.rule.Address, actualnetwork, entry.expectednetwork)
} }
if _, actualaddress := entry.rule.parseAddress(); actualaddress != entry.expectedaddress { if _, actualaddress := parseAddress(entry.rule.Address); actualaddress != entry.expectedaddress {
t.Errorf("Unexpected parsed address for address string %v. Got %v, expected %v", entry.rule.Address, actualaddress, entry.expectedaddress) t.Errorf("Unexpected parsed address for address string %v. Got %v, expected %v", entry.rule.Address, actualaddress, entry.expectedaddress)
} }
} }
......
...@@ -193,9 +193,9 @@ func Dial(network, address string) (fcgi *FCGIClient, err error) { ...@@ -193,9 +193,9 @@ func Dial(network, address string) (fcgi *FCGIClient, err error) {
return DialWithDialer(network, address, net.Dialer{}) return DialWithDialer(network, address, net.Dialer{})
} }
// Close closes fcgi connnection // Close closes fcgi connnection.
func (c *FCGIClient) Close() { func (c *FCGIClient) Close() error {
c.rwc.Close() return c.rwc.Close()
} }
func (c *FCGIClient) writeRecord(recType uint8, content []byte) (err error) { func (c *FCGIClient) writeRecord(recType uint8, content []byte) (err error) {
...@@ -408,11 +408,11 @@ func (c *FCGIClient) Do(p map[string]string, req io.Reader) (r io.Reader, err er ...@@ -408,11 +408,11 @@ func (c *FCGIClient) Do(p map[string]string, req io.Reader) (r io.Reader, err er
// clientCloser is a io.ReadCloser. It wraps a io.Reader with a Closer // clientCloser is a io.ReadCloser. It wraps a io.Reader with a Closer
// that closes FCGIClient connection. // that closes FCGIClient connection.
type clientCloser struct { type clientCloser struct {
*FCGIClient f *FCGIClient
io.Reader io.Reader
} }
func (f clientCloser) Close() error { return f.rwc.Close() } func (c clientCloser) Close() error { return c.f.Close() }
// Request returns a HTTP Response with Header and Body // Request returns a HTTP Response with Header and Body
// from fcgi responder // from fcgi responder
......
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"errors" "errors"
"net/http" "net/http"
"path/filepath" "path/filepath"
"strconv"
"github.com/mholt/caddy" "github.com/mholt/caddy"
"github.com/mholt/caddy/caddyhttp/httpserver" "github.com/mholt/caddy/caddyhttp/httpserver"
...@@ -72,6 +73,9 @@ func fastcgiParse(c *caddy.Controller) ([]Rule, error) { ...@@ -72,6 +73,9 @@ func fastcgiParse(c *caddy.Controller) ([]Rule, error) {
} }
} }
network, address := parseAddress(rule.Address)
rule.dialer = basicDialer{network: network, address: address}
for c.NextBlock() { for c.NextBlock() {
switch c.Val() { switch c.Val() {
case "ext": case "ext":
...@@ -102,6 +106,19 @@ func fastcgiParse(c *caddy.Controller) ([]Rule, error) { ...@@ -102,6 +106,19 @@ func fastcgiParse(c *caddy.Controller) ([]Rule, error) {
return rules, c.ArgErr() return rules, c.ArgErr()
} }
rule.IgnoredSubPaths = ignoredPaths rule.IgnoredSubPaths = ignoredPaths
case "pool":
if !c.NextArg() {
return rules, c.ArgErr()
}
pool, err := strconv.Atoi(c.Val())
if err != nil {
return rules, err
}
if pool >= 0 {
rule.dialer = &persistentDialer{size: pool, network: network, address: address}
} else {
return rules, c.Errf("positive integer expected, found %d", pool)
}
} }
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment