Skip to content

Commit

Permalink
x-pack/filebeat/input/httpjson: reorganise code to improve code local…
Browse files Browse the repository at this point in the history
…ity (#36439)

* chain.go => config_chain.go
* simplify encoding registry
* simplify transform registry
    This removes the registration code and makes the use of the registry
    effectively local rather than a mutated global. It also simplifies the
    structure of the registry type.
* improve code locality of chain helpers
* improve code locality of request
    Use conventional ordering: type, constructor, methods and caller before
    callee in source order.
* improve code locality of response
  • Loading branch information
efd6 authored Aug 30, 2023
1 parent 73f8c42 commit eef9598
Show file tree
Hide file tree
Showing 19 changed files with 783 additions and 940 deletions.
File renamed without changes.
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/httpjson/config_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (c *requestConfig) Validate() error {
return fmt.Errorf("unsupported method %q", c.Method)
}

if _, err := newBasicTransformsFromConfig(c.Transforms, requestNamespace, nil); err != nil {
if _, err := newBasicTransformsFromConfig(registeredTransforms, c.Transforms, requestNamespace, nil); err != nil {
return err
}

Expand Down
6 changes: 3 additions & 3 deletions x-pack/filebeat/input/httpjson/config_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ type splitConfig struct {
}

func (c *responseConfig) Validate() error {
if _, err := newBasicTransformsFromConfig(c.Transforms, responseNamespace, nil); err != nil {
if _, err := newBasicTransformsFromConfig(registeredTransforms, c.Transforms, responseNamespace, nil); err != nil {
return err
}
if _, err := newBasicTransformsFromConfig(c.Pagination, paginationNamespace, nil); err != nil {
if _, err := newBasicTransformsFromConfig(registeredTransforms, c.Pagination, paginationNamespace, nil); err != nil {
return err
}
if c.DecodeAs != "" {
Expand All @@ -52,7 +52,7 @@ func (c *responseConfig) Validate() error {
}

func (c *splitConfig) Validate() error {
if _, err := newBasicTransformsFromConfig(c.Transforms, responseNamespace, nil); err != nil {
if _, err := newBasicTransformsFromConfig(registeredTransforms, c.Transforms, responseNamespace, nil); err != nil {
return err
}

Expand Down
109 changes: 33 additions & 76 deletions x-pack/filebeat/input/httpjson/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,57 +14,8 @@ import (
"net/http"

"github.com/elastic/mito/lib/xml"

"github.com/elastic/elastic-agent-libs/logp"
)

type encoderFunc func(trReq transformable) ([]byte, error)

type decoderFunc func(p []byte, dst *response) error

var (
registeredEncoders = map[string]encoderFunc{}
registeredDecoders = map[string]decoderFunc{}
defaultEncoder encoderFunc = encodeAsJSON
defaultDecoder decoderFunc = decodeAsJSON
)

func registerEncoder(contentType string, enc encoderFunc) error {
if contentType == "" {
return errors.New("content-type can't be empty")
}

if enc == nil {
return errors.New("encoder can't be nil")
}

if _, found := registeredEncoders[contentType]; found {
return errors.New("already registered")
}

registeredEncoders[contentType] = enc

return nil
}

func registerDecoder(contentType string, dec decoderFunc) error {
if contentType == "" {
return errors.New("content-type can't be empty")
}

if dec == nil {
return errors.New("decoder can't be nil")
}

if _, found := registeredDecoders[contentType]; found {
return errors.New("already registered")
}

registeredDecoders[contentType] = dec

return nil
}

func encode(contentType string, trReq transformable) ([]byte, error) {
enc, found := registeredEncoders[contentType]
if !found {
Expand All @@ -81,35 +32,34 @@ func decode(contentType string, p []byte, dst *response) error {
return dec(p, dst)
}

func registerEncoders() {
log := logp.L().Named(logName)
log.Debugf("registering encoder 'application/json': returned error: %#v",
registerEncoder("application/json", encodeAsJSON))

log.Debugf("registering encoder 'application/x-www-form-urlencoded': returned error: %#v",
registerEncoder("application/x-www-form-urlencoded", encodeAsForm))
}

func registerDecoders() {
log := logp.L().Named(logName)
log.Debugf("registering decoder 'application/json': returned error: %#v",
registerDecoder("application/json", decodeAsJSON))

log.Debugf("registering decoder 'application/x-ndjson': returned error: %#v",
registerDecoder("application/x-ndjson", decodeAsNdjson))

log.Debugf("registering decoder 'text/csv': returned error: %#v",
registerDecoder("text/csv", decodeAsCSV))

log.Debugf("registering decoder 'application/zip': returned error: %#v",
registerDecoder("application/zip", decodeAsZip))
var (
// registeredEncoders is the set of available encoders.
registeredEncoders = map[string]encoderFunc{
"application/json": encodeAsJSON,
"application/x-www-form-urlencoded": encodeAsForm,
}
// defaultEncoder is the decoder used when no registers
// encoder is available.
defaultEncoder = encodeAsJSON

// registeredDecoders is the set of available decoders.
registeredDecoders = map[string]decoderFunc{
"application/json": decodeAsJSON,
"application/x-ndjson": decodeAsNdjson,
"text/csv": decodeAsCSV,
"application/zip": decodeAsZip,
"application/xml": decodeAsXML,
"text/xml; charset=utf-8": decodeAsXML,
}
// defaultDecoder is the decoder used when no registers
// decoder is available.
defaultDecoder = decodeAsJSON
)

log.Debugf("registering decoder 'application/xml': returned error: %#v",
registerDecoder("application/xml", decodeAsXML))
log.Debugf("registering decoder 'text/xml': returned error: %#v",
registerDecoder("text/xml; charset=utf-8", decodeAsXML))
}
type encoderFunc func(trReq transformable) ([]byte, error)
type decoderFunc func(p []byte, dst *response) error

// encodeAsJSON encodes trReq as a JSON message.
func encodeAsJSON(trReq transformable) ([]byte, error) {
if len(trReq.body()) == 0 {
return nil, nil
Expand All @@ -120,10 +70,12 @@ func encodeAsJSON(trReq transformable) ([]byte, error) {
return json.Marshal(trReq.body())
}

// decodeAsJSON decodes the JSON message in p into dst.
func decodeAsJSON(p []byte, dst *response) error {
return json.Unmarshal(p, &dst.body)
}

// encodeAsForm encodes trReq as a URL encoded form.
func encodeAsForm(trReq transformable) ([]byte, error) {
url := trReq.url()
body := []byte(url.RawQuery)
Expand All @@ -135,6 +87,8 @@ func encodeAsForm(trReq transformable) ([]byte, error) {
return body, nil
}

// decodeAsNdjson decodes the message in p as a JSON object stream
// It is more relaxed than NDJSON.
func decodeAsNdjson(p []byte, dst *response) error {
var results []interface{}
dec := json.NewDecoder(bytes.NewReader(p))
Expand All @@ -149,6 +103,7 @@ func decodeAsNdjson(p []byte, dst *response) error {
return nil
}

// decodeAsCSV decodes p as a headed CSV document into dst.
func decodeAsCSV(p []byte, dst *response) error {
var results []interface{}

Expand Down Expand Up @@ -189,6 +144,7 @@ func decodeAsCSV(p []byte, dst *response) error {
return nil
}

// decodeAsZip decodes p as a ZIP archive into dst.
func decodeAsZip(p []byte, dst *response) error {
var results []interface{}
r, err := zip.NewReader(bytes.NewReader(p), int64(len(p)))
Expand Down Expand Up @@ -225,6 +181,7 @@ func decodeAsZip(p []byte, dst *response) error {
return nil
}

// decodeAsXML decodes p as an XML document into dst.
func decodeAsXML(p []byte, dst *response) error {
cdata, body, err := xml.Unmarshal(bytes.NewReader(p), dst.xmlDetails)
if err != nil {
Expand Down
37 changes: 37 additions & 0 deletions x-pack/filebeat/input/httpjson/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,43 @@ func newNetHTTPClient(ctx context.Context, cfg *requestConfig, log *logp.Logger,
return netHTTPClient, nil
}

func newChainHTTPClient(ctx context.Context, authCfg *authConfig, requestCfg *requestConfig, log *logp.Logger, reg *monitoring.Registry, p ...*Policy) (*httpClient, error) {
// Make retryable HTTP client
netHTTPClient, err := newNetHTTPClient(ctx, requestCfg, log, reg)
if err != nil {
return nil, err
}

var retryPolicyFunc retryablehttp.CheckRetry
if len(p) != 0 {
retryPolicyFunc = p[0].CustomRetryPolicy
} else {
retryPolicyFunc = retryablehttp.DefaultRetryPolicy
}

client := &retryablehttp.Client{
HTTPClient: netHTTPClient,
Logger: newRetryLogger(log),
RetryWaitMin: requestCfg.Retry.getWaitMin(),
RetryWaitMax: requestCfg.Retry.getWaitMax(),
RetryMax: requestCfg.Retry.getMaxAttempts(),
CheckRetry: retryPolicyFunc,
Backoff: retryablehttp.DefaultBackoff,
}

limiter := newRateLimiterFromConfig(requestCfg.RateLimit, log)

if authCfg != nil && authCfg.OAuth2.isEnabled() {
authClient, err := authCfg.OAuth2.client(ctx, client.StandardClient())
if err != nil {
return nil, err
}
return &httpClient{client: authClient, limiter: limiter}, nil
}

return &httpClient{client: client.StandardClient(), limiter: limiter}, nil
}

// clientOption returns constructed client configuration options, including
// setting up http+unix and http+npipe transports if requested.
func clientOptions(u *url.URL, keepalive httpcommon.WithKeepaliveSettings) []httpcommon.TransportOption {
Expand Down
5 changes: 0 additions & 5 deletions x-pack/filebeat/input/httpjson/input_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,6 @@ func NewInputManager(log *logp.Logger, store inputcursor.StateStore) InputManage

// Init initializes both wrapped input managers.
func (m InputManager) Init(grp unison.Group, mode v2.Mode) error {
registerRequestTransforms()
registerResponseTransforms()
registerPaginationTransforms()
registerEncoders()
registerDecoders()
return multierr.Append(
m.stateless.Init(grp, mode),
m.cursor.Init(grp, mode),
Expand Down
Loading

0 comments on commit eef9598

Please sign in to comment.