diff --git a/config.go b/config.go index 05a81575..a529392e 100644 --- a/config.go +++ b/config.go @@ -36,6 +36,12 @@ type BackoffStrategy interface { Calculate(attempt int) time.Duration } +// A Dialer is a means to establish a connection. +type Dialer interface { + // Dial connects to the given address via the proxy. + Dial(network, addr string) (c net.Conn, err error) +} + // ExponentialStrategy implements an exponential backoff strategy (default) type ExponentialStrategy struct { cfg *Config @@ -101,6 +107,10 @@ type Config struct { // LocalAddr is the local address to use when dialing an nsqd. // If empty, a local address is automatically chosen. LocalAddr net.Addr `opt:"local_addr"` + // Dialer affect connection when dialing an nsqd. Overwrite this to connect over proxy. + // + // Conflict with options LocalAddr and DialTimeout. + Dialer Dialer `opt:"dialer"` // Duration between polling lookupd for new producers, and fractional jitter to add to // the lookupd pool loop. this helps evenly distribute requests even if multiple consumers diff --git a/conn.go b/conn.go index f4180a4d..29c316a6 100644 --- a/conn.go +++ b/conn.go @@ -140,9 +140,12 @@ func (c *Conn) getLogger() (logger, LogLevel, string) { // Connect dials and bootstraps the nsqd connection // (including IDENTIFY) and returns the IdentifyResponse func (c *Conn) Connect() (*IdentifyResponse, error) { - dialer := &net.Dialer{ - LocalAddr: c.config.LocalAddr, - Timeout: c.config.DialTimeout, + dialer := c.config.Dialer + if dialer == nil { + dialer = &net.Dialer{ + LocalAddr: c.config.LocalAddr, + Timeout: c.config.DialTimeout, + } } conn, err := dialer.Dial("tcp", c.addr)