From 8b583f5de4557500d675c01e707f7ef34e096942 Mon Sep 17 00:00:00 2001 From: Kleonikos Kyriakis Date: Mon, 29 Apr 2024 14:04:44 +0300 Subject: [PATCH] Add otel tracing (#9) * Add otel tracing * Bump mautrix to v0.17.0 --------- Co-authored-by: Kleonikos Kyriakis Co-authored-by: evlekht --- README.MD | 9 ++++- camino-matrix-go | 2 +- camino-messenger-bot.yaml.example | 8 +++- config/config.go | 10 +++++ config/flag_keys.go | 6 +++ config/flags.go | 9 +++++ docker-compose.yml | 10 ++++- go.mod | 31 ++++++++------- go.sum | 34 ++++++++-------- internal/app/app.go | 33 +++++++++++++++- internal/matrix/matrix_messenger.go | 46 +++++++++++++++------- internal/matrix/room_handler.go | 33 ++++++++-------- internal/messaging/processor.go | 21 +++++++++- internal/rpc/server/server.go | 32 +++++++-------- internal/tracing/exporter.go | 43 +++++++++++++++++++++ internal/tracing/nooptracer.go | 27 +++++++++++++ internal/tracing/tracer.go | 60 +++++++++++++++++++++++++++++ utils/constants/app.go | 8 ++++ 18 files changed, 336 insertions(+), 86 deletions(-) create mode 100644 internal/tracing/exporter.go create mode 100644 internal/tracing/nooptracer.go create mode 100644 internal/tracing/tracer.go create mode 100644 utils/constants/app.go diff --git a/README.MD b/README.MD index d23849f0..903ad36e 100644 --- a/README.MD +++ b/README.MD @@ -88,4 +88,11 @@ Create a configuration YAML file based on the provided example "camino-messenger ./bot --config camino-messenger-bot.yaml ``` -Feel free to explore and customize the application based on your specific requirements. If you encounter any issues or have questions, please refer to the documentation or open an issue in the repository. \ No newline at end of file +Feel free to explore and customize the application based on your specific requirements. If you encounter any issues or have questions, please refer to the documentation or open an issue in the repository. + +## Tracing +The application supports tracing by providing an otel (OpenTelemetry) sdk implementation adding traces to all requests and responses. +Traces cross the boundaries of the application by using traceIDs and context propagation. +The tracing configuration can be set in the configuration file. +By default, tracing is disabled. For the moment the application uses a Jaeger exporter to send the traces to a Jaeger instance. +A future improvement could include an otel collector as the middle stop for traces and feed to them different tracing systems. \ No newline at end of file diff --git a/camino-matrix-go b/camino-matrix-go index 7d4a1899..887ca8e7 160000 --- a/camino-matrix-go +++ b/camino-matrix-go @@ -1 +1 @@ -Subproject commit 7d4a1899b35f1b52768d91a063d2a49d083c4055 +Subproject commit 887ca8e76a5996e23b52c5bba9a26963da734a9d diff --git a/camino-messenger-bot.yaml.example b/camino-messenger-bot.yaml.example index f0cd6736..8af70d04 100644 --- a/camino-messenger-bot.yaml.example +++ b/camino-messenger-bot.yaml.example @@ -28,4 +28,10 @@ tvm_node_uri: https://kopernikus.demo.camino.network # camino node uri tvm_private_key: PrivateKey-YOUR_PRIVATE_KEY # used for signing transactions on the t-chain / can be the same as the matrix_key tvm_network_id: 1002 # 1002 for devnet, 1001 for testnet, 1000 for mainnet tvm_chain_id: BLOCKCHAIN_ID # blockchain id of the t-chain -tvm_await_tx_confirmation_timeout: 6000 # timeout in milliseconds for awaiting transaction confirmation \ No newline at end of file +tvm_await_tx_confirmation_timeout: 6000 # timeout in milliseconds for awaiting transaction confirmation +tracing_enabled: true +tracing_host: localhost +tracing_port: 4317 +tracing_insecure: true +tracing_cert_file: server-cert.pem +tracing_key_file: server-key.pem \ No newline at end of file diff --git a/config/config.go b/config/config.go index 7db3b778..9824b8ed 100644 --- a/config/config.go +++ b/config/config.go @@ -49,6 +49,14 @@ type TvmConfig struct { AwaitTxConfirmationTimeout uint `mapstructure:"tvm_await_tx_confirmation_timeout"` // in milliseconds" } +type TracingConfig struct { + Enabled bool `mapstructure:"tracing_enabled"` + Host string `mapstructure:"tracing_host"` + Port int `mapstructure:"tracing_port"` + Insecure bool `mapstructure:"tracing_insecure"` + CertFile string `mapstructure:"tracing_cert_file"` + KeyFile string `mapstructure:"tracing_key_file"` +} type Config struct { AppConfig `mapstructure:",squash"` //TODO use nested yaml structure MatrixConfig `mapstructure:",squash"` @@ -56,6 +64,7 @@ type Config struct { PartnerPluginConfig `mapstructure:",squash"` ProcessorConfig `mapstructure:",squash"` TvmConfig `mapstructure:",squash"` + TracingConfig `mapstructure:",squash"` } func ReadConfig() (*Config, error) { @@ -80,6 +89,7 @@ func ReadConfig() (*Config, error) { readPartnerRpcServerConfig(cfg.PartnerPluginConfig, fs) readMessengerConfig(cfg.ProcessorConfig, fs) readTvmConfig(cfg.TvmConfig, fs) + readTracingConfig(cfg.TracingConfig, fs) // Parse command-line flags pfs := pflag.NewFlagSet(fs.Name(), pflag.ContinueOnError) diff --git a/config/flag_keys.go b/config/flag_keys.go index a42ce193..b0ee65ac 100644 --- a/config/flag_keys.go +++ b/config/flag_keys.go @@ -20,4 +20,10 @@ const ( TvmNetworkIDKey = "tvm_network_id" TvmChainIDKey = "tvm_chain_id" TvmAwaitTxConfirmationTimeout = "tvm_await_tx_confirmation_timeout" + TracingEnabledKey = "tracing_enabled" + TracingHostKey = "tracing_host" + TracingPortKey = "tracing_port" + TracingInsecureKey = "tracing_insecure" + TracingCertFileKey = "tracing_cert_file" + TracingKeyFileKey = "tracing_key_file" ) diff --git a/config/flags.go b/config/flags.go index e3360f7e..0fffa34a 100644 --- a/config/flags.go +++ b/config/flags.go @@ -42,3 +42,12 @@ func readTvmConfig(cfg TvmConfig, fs *flag.FlagSet) { fs.StringVar(&cfg.ChainID, TvmChainIDKey, "", "The TVM chain ID") fs.UintVar(&cfg.AwaitTxConfirmationTimeout, TvmAwaitTxConfirmationTimeout, 3000, "The TVM await transaction confirmation timeout (in milliseconds)") } + +func readTracingConfig(cfg TracingConfig, fs *flag.FlagSet) { + fs.BoolVar(&cfg.Enabled, TracingEnabledKey, false, "Whether tracing is enabled") + fs.StringVar(&cfg.Host, TracingHostKey, "localhost", "The tracing host") + fs.IntVar(&cfg.Port, TracingPortKey, 4317, "The tracing port") + fs.BoolVar(&cfg.Insecure, TracingInsecureKey, true, "Whether the tracing connection should be insecure") + fs.StringVar(&cfg.CertFile, TracingCertFileKey, "", "The tracing certificate file") + fs.StringVar(&cfg.KeyFile, TracingKeyFileKey, "", "The tracing key file") +} diff --git a/docker-compose.yml b/docker-compose.yml index 30d7339d..8104fefb 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -46,4 +46,12 @@ services: args: port: 50052 ports: - - "50052:50052" \ No newline at end of file + - "50052:50052" + jaeger: + image: jaegertracing/all-in-one:${JAEGER_VERSION:-latest} + ports: + - "16686:16686" + - "4317:4317" + - "4318:4318" + environment: + - LOG_LEVEL=debug \ No newline at end of file diff --git a/go.mod b/go.mod index 8a1e4e87..12fd49ab 100644 --- a/go.mod +++ b/go.mod @@ -8,15 +8,16 @@ require ( github.com/ava-labs/avalanchego v1.10.18 github.com/ava-labs/hypersdk v0.0.16 github.com/chain4travel/caminotravelvm v0.0.0-20240419161941-a32dadd85f51 - github.com/chzyer/readline v1.5.1 - github.com/google/uuid v1.4.0 github.com/klauspost/compress v1.17.3 - github.com/mattn/go-sqlite3 v1.14.18 - github.com/pierrec/lz4 v2.6.1+incompatible - github.com/rs/zerolog v1.31.0 + github.com/mattn/go-sqlite3 v1.14.19 github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.16.0 github.com/stretchr/testify v1.8.4 + go.opentelemetry.io/otel v1.11.2 + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.11.2 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.11.2 + go.opentelemetry.io/otel/sdk v1.11.2 + go.opentelemetry.io/otel/trace v1.11.2 go.uber.org/zap v1.26.0 golang.org/x/sync v0.5.0 google.golang.org/grpc v1.59.0 @@ -30,6 +31,7 @@ require ( github.com/btcsuite/btcd/btcutil v1.1.3 // indirect github.com/cenkalti/backoff/v4 v4.2.0 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/chzyer/readline v1.5.1 // indirect github.com/cockroachdb/errors v1.9.1 // indirect github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect github.com/cockroachdb/pebble v0.0.0-20230224221607-fccb83b60d5c // indirect @@ -67,6 +69,7 @@ require ( github.com/prometheus/common v0.42.0 // indirect github.com/prometheus/procfs v0.10.1 // indirect github.com/rogpeppe/go-internal v1.10.0 // indirect + github.com/rs/zerolog v1.31.0 // indirect github.com/spf13/cobra v1.7.0 // indirect github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/supranational/blst v0.3.11 // indirect @@ -74,18 +77,15 @@ require ( github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.0 // indirect github.com/tidwall/sjson v1.2.5 // indirect - go.opentelemetry.io/otel v1.11.2 // indirect + go.mau.fi/util v0.3.0 // indirect go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.11.2 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.11.2 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.11.2 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.11.2 // indirect go.opentelemetry.io/otel/exporters/zipkin v1.11.2 // indirect - go.opentelemetry.io/otel/sdk v1.11.2 // indirect - go.opentelemetry.io/otel/trace v1.11.2 // indirect go.opentelemetry.io/proto/otlp v0.19.0 // indirect go.uber.org/mock v0.4.0 // indirect - golang.org/x/crypto v0.17.0 // indirect - golang.org/x/term v0.15.0 // indirect + golang.org/x/crypto v0.18.0 // indirect + golang.org/x/term v0.16.0 // indirect + golang.org/x/text v0.14.0 // indirect gonum.org/v1/gonum v0.11.0 // indirect google.golang.org/genproto v0.0.0-20230913181813-007df8e322eb // indirect google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d // indirect @@ -105,10 +105,9 @@ require ( github.com/spf13/cast v1.5.1 // indirect github.com/subosito/gotenv v1.6.0 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/exp v0.0.0-20231127185646-65229373498e - golang.org/x/net v0.19.0 // indirect - golang.org/x/sys v0.15.0 // indirect - golang.org/x/text v0.14.0 // indirect + golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3 + golang.org/x/net v0.20.0 // indirect + golang.org/x/sys v0.16.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230920204549-e6e6cdab5c13 // indirect google.golang.org/protobuf v1.33.0 gopkg.in/ini.v1 v1.67.0 // indirect diff --git a/go.sum b/go.sum index 66e9aa58..96423d8c 100644 --- a/go.sum +++ b/go.sum @@ -397,7 +397,7 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/CloudyKit/fastprinter v0.0.0-20200109182630-33d98a066a53/go.mod h1:+3IMCy2vIlbG1XG/0ggNQv0SvxCAIpPM5b1nCz56Xno= github.com/CloudyKit/jet/v3 v3.0.0/go.mod h1:HKQPgSJmdK8hdoAbKUUWajkHyHo4RaU5rMdUywE7VMo= -github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60= +github.com/DATA-DOG/go-sqlmock v1.5.1 h1:FK6RCIUSfmbnI/imIICmboyQBkOckutaa6R5YYlLZyo= github.com/DataDog/zstd v1.5.2 h1:vUG4lAyuPCXO0TLbXvPv7EB7cNK1QV/luu55UHLrrn8= github.com/DataDog/zstd v1.5.2/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= github.com/Joker/hpp v1.0.0/go.mod h1:8x5n+M1Hp5hC0g8okX3sR3vFQwynaX/UgSOM9MeBKzY= @@ -644,8 +644,6 @@ github.com/google/renameio/v2 v2.0.0 h1:UifI23ZTGY8Tt29JbYFiuyIU3eX+RNFtUwefq9qA github.com/google/renameio/v2 v2.0.0/go.mod h1:BtmJXm5YlszgC+TD4HOEEUFgkJP3nLxehU6hfe7jRt4= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= -github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/enterprise-certificate-proxy v0.0.0-20220520183353-fd19c99a87aa/go.mod h1:17drOmN3MwGY7t0e+Ei9b45FFGA3fBs3x36SsCg1hq8= github.com/googleapis/enterprise-certificate-proxy v0.1.0/go.mod h1:17drOmN3MwGY7t0e+Ei9b45FFGA3fBs3x36SsCg1hq8= github.com/googleapis/enterprise-certificate-proxy v0.2.0/go.mod h1:8C0jb7/mgJe/9KK8Lm7X9ctZC2t60YyIpYEI16jx0Qg= @@ -747,8 +745,8 @@ github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27k github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= -github.com/mattn/go-sqlite3 v1.14.18 h1:JL0eqdCOq6DJVNPSvArO/bIV9/P7fbGrV00LZHc+5aI= -github.com/mattn/go-sqlite3 v1.14.18/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= +github.com/mattn/go-sqlite3 v1.14.19 h1:fhGleo2h1p8tVChob4I9HpmVFIAkKGpiukdrgQbWfGI= +github.com/mattn/go-sqlite3 v1.14.19/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= github.com/mattn/goveralls v0.0.2/go.mod h1:8d1ZMHsd7fW6IRPKQh46F2WRpyib5/X4FOpevwGNQEw= github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= @@ -791,8 +789,6 @@ github.com/openzipkin/zipkin-go v0.4.1/go.mod h1:qY0VqDSN1pOBN94dBc6w2GJlWLiovAy github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= -github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM= -github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 h1:KoWmjvw+nsYOo29YJK9vDA65RGE3NrOnUtO7a+RF9HU= @@ -910,6 +906,8 @@ github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.mau.fi/util v0.3.0 h1:Lt3lbRXP6ZBqTINK0EieRWor3zEwwwrDT14Z5N8RUCs= +go.mau.fi/util v0.3.0/go.mod h1:9dGsBCCbZJstx16YgnVMVi3O2bOizELoKpugLD4FoGs= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= @@ -960,8 +958,8 @@ golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= -golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= -golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= +golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc= +golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -972,8 +970,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= -golang.org/x/exp v0.0.0-20231127185646-65229373498e h1:Gvh4YaCaXNs6dKTlfgismwWZKyjVZXwOPfIyUaqU3No= -golang.org/x/exp v0.0.0-20231127185646-65229373498e/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI= +golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3 h1:hNQpMuAJe5CtcUqCXaWga3FHu+kQvCqcsoVaQgSV60o= +golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3/go.mod h1:idGWGoKP1toJGkd5/ig9ZLuPcZBC3ewk7SzmH0uou08= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= @@ -1062,8 +1060,8 @@ golang.org/x/net v0.0.0-20221012135044-0b7e1fb9d458/go.mod h1:YDH+HFinaLZZlnHAfS golang.org/x/net v0.0.0-20221014081412-f15817d10f9b/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY= golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws= -golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c= -golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U= +golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo= +golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -1195,14 +1193,14 @@ golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= -golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= +golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc= golang.org/x/term v0.4.0/go.mod h1:9P2UbLfCdcvo3p/nzKvsmas4TnlujnuoV9hGgYzW1lQ= -golang.org/x/term v0.15.0 h1:y/Oo/a/q3IXu26lQgl04j/gjuBDOBlx7X6Om1j2CPW4= -golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0= +golang.org/x/term v0.16.0 h1:m+B6fahuftsE9qjo0VWp2FW0mB3MTJvR0BaMQrq0pmE= +golang.org/x/term v0.16.0/go.mod h1:yn7UURbUtPyrVJPGPq404EukNFxcm/foM+bV/bfcDsY= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -1282,7 +1280,7 @@ golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.3.0/go.mod h1:/rWhSS2+zyEVwoJf8YAX6L2f0ntZ7Kn/mGgAWcipA5k= -golang.org/x/tools v0.16.0 h1:GO788SKMRunPIBCXiQyo2AaexLstOrVhuAL5YwsckQM= +golang.org/x/tools v0.17.0 h1:FvmRgNOcs3kOa+T20R1uhfP9F6HgG2mfxDv1vrx1Htc= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/internal/app/app.go b/internal/app/app.go index 22dc512d..6f44666e 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -2,13 +2,16 @@ package app import ( "context" - "github.com/chain4travel/camino-messenger-bot/internal/tvm" + "fmt" "github.com/chain4travel/camino-messenger-bot/config" "github.com/chain4travel/camino-messenger-bot/internal/matrix" "github.com/chain4travel/camino-messenger-bot/internal/messaging" "github.com/chain4travel/camino-messenger-bot/internal/rpc/client" "github.com/chain4travel/camino-messenger-bot/internal/rpc/server" + "github.com/chain4travel/camino-messenger-bot/internal/tracing" + "github.com/chain4travel/camino-messenger-bot/internal/tvm" + "github.com/chain4travel/camino-messenger-bot/utils/constants" "go.uber.org/zap" "golang.org/x/sync/errgroup" ) @@ -16,6 +19,7 @@ import ( type App struct { cfg *config.Config logger *zap.SugaredLogger + tracer tracing.Tracer } func NewApp(cfg *config.Config) (*App, error) { @@ -62,6 +66,14 @@ func (a *App) Run(ctx context.Context) error { // start msg processor msgProcessor := a.startMessageProcessor(ctx, messenger, serviceRegistry, responseHandler, g, userIDUpdatedChan) + // init tracer + tracer := a.initTracer() + defer func() { + if err := tracer.Shutdown(); err != nil { + a.logger.Fatal("failed to shutdown tracer: %w", err) + } + }() + // start rpc server a.startRPCServer(msgProcessor, serviceRegistry, g, gCtx) @@ -71,6 +83,23 @@ func (a *App) Run(ctx context.Context) error { return nil } +func (a *App) initTracer() tracing.Tracer { + var ( + tracer tracing.Tracer + err error + ) + if a.cfg.TracingConfig.Enabled { + tracer, err = tracing.NewTracer(&a.cfg.TracingConfig, fmt.Sprintf("%s:%d", constants.AppName, a.cfg.RPCServerConfig.Port)) + } else { + tracer, err = tracing.NewNoOpTracer() + } + if err != nil { + a.logger.Fatal("failed to initialize tracer: %w", err) + } + a.tracer = tracer + return tracer +} + func (a *App) initTVMClient() messaging.ResponseHandler { var responseHandler messaging.ResponseHandler // TODO make client init conditional based on provided config @@ -122,7 +151,7 @@ func (a *App) startMessenger(g *errgroup.Group, gCtx context.Context) (messaging } func (a *App) startRPCServer(msgProcessor messaging.Processor, serviceRegistry *messaging.ServiceRegistry, g *errgroup.Group, gCtx context.Context) { - rpcServer := server.NewServer(&a.cfg.RPCServerConfig, a.logger, msgProcessor, serviceRegistry) + rpcServer := server.NewServer(&a.cfg.RPCServerConfig, a.logger, a.tracer, msgProcessor, serviceRegistry) g.Go(func() error { a.logger.Info("Starting gRPC server...") rpcServer.Start() diff --git a/internal/matrix/matrix_messenger.go b/internal/matrix/matrix_messenger.go index 0b702060..ef558e53 100644 --- a/internal/matrix/matrix_messenger.go +++ b/internal/matrix/matrix_messenger.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "github.com/chain4travel/camino-messenger-bot/internal/compression" "reflect" "sync" "time" @@ -12,7 +11,11 @@ import ( "github.com/ava-labs/avalanchego/utils/crypto/secp256k1" "github.com/ava-labs/avalanchego/utils/formatting" "github.com/chain4travel/camino-messenger-bot/config" + "github.com/chain4travel/camino-messenger-bot/internal/compression" "github.com/chain4travel/camino-messenger-bot/internal/messaging" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "maunium.net/go/mautrix" "maunium.net/go/mautrix/crypto/cryptohelper" @@ -34,9 +37,12 @@ type client struct { cryptoHelper *cryptohelper.CryptoHelper } type messenger struct { - msgChannel chan messaging.Message - cfg *config.MatrixConfig - logger *zap.SugaredLogger + msgChannel chan messaging.Message + + cfg *config.MatrixConfig + logger *zap.SugaredLogger + tracer trace.Tracer + client client roomHandler RoomHandler msgAssembler MessageAssembler @@ -52,6 +58,7 @@ func NewMessenger(cfg *config.MatrixConfig, logger *zap.SugaredLogger) *messenge msgChannel: make(chan messaging.Message), cfg: cfg, logger: logger, + tracer: otel.GetTracerProvider().Tracer(""), client: client{Client: c}, roomHandler: NewRoomHandler(c, logger), msgAssembler: NewMessageAssembler(logger), @@ -66,8 +73,15 @@ func (m *messenger) StartReceiver() (string, error) { syncer := m.client.Syncer.(*mautrix.DefaultSyncer) event.TypeMap[C4TMessage] = reflect.TypeOf(CaminoMatrixMessage{}) // custom message event types have to be registered properly - syncer.OnEventType(C4TMessage, func(source mautrix.EventSource, evt *event.Event) { + syncer.OnEventType(C4TMessage, func(ctx context.Context, evt *event.Event) { msg := evt.Content.Parsed.(*CaminoMatrixMessage) + traceID, err := trace.TraceIDFromHex(msg.Metadata.RequestID) + if err != nil { + m.logger.Warnf("failed to parse traceID from hex [requestID:%s]: %v", msg.Metadata.RequestID, err) + } + ctx = trace.ContextWithRemoteSpanContext(ctx, trace.NewSpanContext(trace.SpanContextConfig{TraceID: traceID})) + _, span := m.tracer.Start(ctx, "messenger.OnC4TMessageReceive", trace.WithSpanKind(trace.SpanKindConsumer), trace.WithAttributes(attribute.String("type", evt.Type.Type))) + defer span.End() t := time.Now() completeMsg, err, completed := m.msgAssembler.AssembleMessage(*msg) if err != nil { @@ -85,9 +99,9 @@ func (m *messenger) StartReceiver() (string, error) { Type: messaging.MessageType(msg.MsgType), } }) - syncer.OnEventType(event.StateMember, func(source mautrix.EventSource, evt *event.Event) { + syncer.OnEventType(event.StateMember, func(ctx context.Context, evt *event.Event) { if evt.GetStateKey() == m.client.UserID.String() && evt.Content.AsMember().Membership == event.MembershipInvite { - _, err := m.client.JoinRoomByID(evt.RoomID) + _, err := m.client.JoinRoomByID(ctx, evt.RoomID) if err == nil { m.logger.Info("Joined room after invite", zap.String("room_id", evt.RoomID.String()), @@ -121,7 +135,7 @@ func (m *messenger) StartReceiver() (string, error) { Signature: signature[2:], // removing 0x prefix } - err = cryptoHelper.Init() + err = cryptoHelper.Init(context.TODO()) if err != nil { return "", err } @@ -154,26 +168,32 @@ func (m *messenger) StopReceiver() error { return m.client.cryptoHelper.Close() } -func (m *messenger) SendAsync(_ context.Context, msg messaging.Message) error { +func (m *messenger) SendAsync(ctx context.Context, msg messaging.Message) error { m.logger.Info("Sending async message", zap.String("msg", msg.Metadata.RequestID)) + ctx, span := m.tracer.Start(ctx, "messenger.SendAsync", trace.WithSpanKind(trace.SpanKindProducer), trace.WithAttributes(attribute.String("type", string(msg.Type)))) + defer span.End() - roomID, err := m.roomHandler.GetOrCreateRoomForRecipient(id.UserID(msg.Metadata.Recipient)) + ctx, roomSpan := m.tracer.Start(ctx, "roomHandler.GetOrCreateRoom", trace.WithAttributes(attribute.String("type", string(msg.Type)))) + roomID, err := m.roomHandler.GetOrCreateRoomForRecipient(ctx, id.UserID(msg.Metadata.Recipient)) if err != nil { return err } + roomSpan.End() + ctx, compressSpan := m.tracer.Start(ctx, "messenger.Compress", trace.WithAttributes(attribute.String("type", string(msg.Type)))) messages, err := m.compressor.Compress(msg) if err != nil { return err } + compressSpan.End() - return m.sendMessageEvents(roomID, C4TMessage, messages) + return m.sendMessageEvents(ctx, roomID, C4TMessage, messages) } -func (m *messenger) sendMessageEvents(roomID id.RoomID, eventType event.Type, messages []CaminoMatrixMessage) error { +func (m *messenger) sendMessageEvents(ctx context.Context, roomID id.RoomID, eventType event.Type, messages []CaminoMatrixMessage) error { //TODO add retry logic? for _, msg := range messages { - _, err := m.client.SendMessageEvent(roomID, eventType, msg) + _, err := m.client.SendMessageEvent(ctx, roomID, eventType, msg) if err != nil { return err } diff --git a/internal/matrix/room_handler.go b/internal/matrix/room_handler.go index 5c231e07..7dd9093a 100644 --- a/internal/matrix/room_handler.go +++ b/internal/matrix/room_handler.go @@ -1,6 +1,7 @@ package matrix import ( + "context" "sync" "go.uber.org/zap" @@ -10,10 +11,10 @@ import ( ) type RoomHandler interface { - GetOrCreateRoomForRecipient(recipient id.UserID) (id.RoomID, error) - CreateRoomAndInviteUser(userID id.UserID) (id.RoomID, error) - EnableEncryptionForRoom(roomID id.RoomID) error - GetEncryptedRoomForRecipient(recipient id.UserID) (id.RoomID, bool) + GetOrCreateRoomForRecipient(context.Context, id.UserID) (id.RoomID, error) + CreateRoomAndInviteUser(context.Context, id.UserID) (id.RoomID, error) + EnableEncryptionForRoom(context.Context, id.RoomID) error + GetEncryptedRoomForRecipient(context.Context, id.UserID) (id.RoomID, bool) } type roomHandler struct { client *mautrix.Client @@ -26,20 +27,20 @@ func NewRoomHandler(client *mautrix.Client, logger *zap.SugaredLogger) RoomHandl return &roomHandler{client: client, logger: logger, rooms: make(map[id.UserID]id.RoomID)} } -func (r *roomHandler) GetOrCreateRoomForRecipient(recipient id.UserID) (id.RoomID, error) { +func (r *roomHandler) GetOrCreateRoomForRecipient(ctx context.Context, recipient id.UserID) (id.RoomID, error) { // check if room already established with recipient - roomID, found := r.GetEncryptedRoomForRecipient(recipient) + roomID, found := r.GetEncryptedRoomForRecipient(ctx, recipient) var err error // if not create room and invite recipient if !found { - roomID, err = r.CreateRoomAndInviteUser(recipient) + roomID, err = r.CreateRoomAndInviteUser(ctx, recipient) if err != nil { return "", err } // enable encryption for room - err = r.EnableEncryptionForRoom(roomID) + err = r.EnableEncryptionForRoom(ctx, roomID) if err != nil { return "", err } @@ -49,14 +50,14 @@ func (r *roomHandler) GetOrCreateRoomForRecipient(recipient id.UserID) (id.RoomI return roomID, nil } -func (r *roomHandler) CreateRoomAndInviteUser(userID id.UserID) (id.RoomID, error) { +func (r *roomHandler) CreateRoomAndInviteUser(ctx context.Context, userID id.UserID) (id.RoomID, error) { r.logger.Debugf("Creating room and inviting user %v", userID) req := mautrix.ReqCreateRoom{ Visibility: "private", Preset: "private_chat", Invite: []id.UserID{userID}, } - resp, err := r.client.CreateRoom(&req) + resp, err := r.client.CreateRoom(ctx, &req) if err != nil { return "", err } @@ -64,28 +65,28 @@ func (r *roomHandler) CreateRoomAndInviteUser(userID id.UserID) (id.RoomID, erro return resp.RoomID, nil } -func (r *roomHandler) EnableEncryptionForRoom(roomID id.RoomID) error { +func (r *roomHandler) EnableEncryptionForRoom(ctx context.Context, roomID id.RoomID) error { r.logger.Debugf("Enabling encryption for room %s", roomID) - _, err := r.client.SendStateEvent(roomID, event.StateEncryption, "", + _, err := r.client.SendStateEvent(ctx, roomID, event.StateEncryption, "", event.EncryptionEventContent{Algorithm: id.AlgorithmMegolmV1}) return err } -func (r *roomHandler) GetEncryptedRoomForRecipient(recipient id.UserID) (id.RoomID, bool) { +func (r *roomHandler) GetEncryptedRoomForRecipient(ctx context.Context, recipient id.UserID) (id.RoomID, bool) { roomID := r.fetchCachedRoom(recipient) if roomID != "" { return roomID, true } // if not found query joined rooms - rooms, err := r.client.JoinedRooms() + rooms, err := r.client.JoinedRooms(ctx) if err != nil { return "", false } for _, roomID := range rooms.JoinedRooms { - if !r.client.StateStore.IsEncrypted(roomID) { + if encrypted, err := r.client.StateStore.IsEncrypted(ctx, roomID); err != nil || !encrypted { continue } - members, err := r.client.JoinedMembers(roomID) + members, err := r.client.JoinedMembers(ctx, roomID) if err != nil { return "", false } diff --git a/internal/messaging/processor.go b/internal/messaging/processor.go index 88f824a9..da180dae 100644 --- a/internal/messaging/processor.go +++ b/internal/messaging/processor.go @@ -4,6 +4,9 @@ import ( "context" "errors" "fmt" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "sync" "time" @@ -47,6 +50,7 @@ type processor struct { messenger Messenger userID string logger *zap.SugaredLogger + tracer trace.Tracer timeout time.Duration // timeout after which a request is considered failed mu sync.Mutex @@ -68,6 +72,7 @@ func NewProcessor(messenger Messenger, logger *zap.SugaredLogger, cfg config.Pro cfg: cfg, messenger: messenger, logger: logger, + tracer: otel.GetTracerProvider().Tracer(""), timeout: time.Duration(cfg.Timeout) * time.Millisecond, // for now applies to all request types responseChannels: make(map[string]chan Message), serviceRegistry: registry, @@ -141,11 +146,14 @@ func (p *processor) Request(ctx context.Context, msg Message) (Message, error) { return Message{}, ErrMissingRecipient } msg.Metadata.Cheques = nil //TODO issue and attach cheques + ctx, span := p.tracer.Start(ctx, "processor.Request", trace.WithAttributes(attribute.String("type", string(msg.Type)))) + defer span.End() err := p.messenger.SendAsync(ctx, msg) if err != nil { return Message{}, err } - + ctx, responseSpan := p.tracer.Start(ctx, "processor.AwaitResponse", trace.WithSpanKind(trace.SpanKindConsumer), trace.WithAttributes(attribute.String("type", string(msg.Type)))) + defer responseSpan.End() for { select { case response := <-responseChan: @@ -161,6 +169,13 @@ func (p *processor) Request(ctx context.Context, msg Message) (Message, error) { } func (p *processor) Respond(msg Message) error { + traceID, err := trace.TraceIDFromHex(msg.Metadata.RequestID) + if err != nil { + p.logger.Warnf("failed to parse traceID from hex [requestID:%s]: %v", msg.Metadata.RequestID, err) + } + ctx := trace.ContextWithRemoteSpanContext(context.Background(), trace.NewSpanContext(trace.SpanContextConfig{TraceID: traceID})) + ctx, span := p.tracer.Start(ctx, "processor-response", trace.WithAttributes(attribute.String("type", string(msg.Type)))) + defer span.End() var service Service var supported bool if service, supported = p.serviceRegistry.GetService(msg.Type); !supported { @@ -173,9 +188,11 @@ func (p *processor) Respond(msg Message) error { md.Sender = p.userID md.Stamp(fmt.Sprintf("%s-%s", p.Checkpoint(), "request")) - ctx := grpc_metadata.NewOutgoingContext(context.Background(), msg.Metadata.ToGrpcMD()) + ctx = grpc_metadata.NewOutgoingContext(ctx, msg.Metadata.ToGrpcMD()) var header grpc_metadata.MD + ctx, cspan := p.tracer.Start(ctx, "service.Call", trace.WithSpanKind(trace.SpanKindClient), trace.WithAttributes(attribute.String("type", string(msg.Type)))) response, msgType, err := service.Call(ctx, &msg.Content.RequestContent, grpc.Header(&header)) + cspan.End() if err != nil { return err //TODO handle error and return a response message } diff --git a/internal/rpc/server/server.go b/internal/rpc/server/server.go index b672a551..e9d722c5 100644 --- a/internal/rpc/server/server.go +++ b/internal/rpc/server/server.go @@ -1,15 +1,16 @@ package server import ( - "buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/activity/v1alpha/activityv1alphagrpc" - "buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/book/v1alpha/bookv1alphagrpc" - bookv1alpha "buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go/cmp/services/book/v1alpha" "context" "errors" "fmt" "log" "net" + "buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/activity/v1alpha/activityv1alphagrpc" + "buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/book/v1alpha/bookv1alphagrpc" + bookv1alpha "buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go/cmp/services/book/v1alpha" + "buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/network/v1alpha/networkv1alphagrpc" "buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/partner/v1alpha/partnerv1alphagrpc" "buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/ping/v1alpha/pingv1alphagrpc" @@ -26,7 +27,7 @@ import ( "github.com/chain4travel/camino-messenger-bot/internal/messaging" "github.com/chain4travel/camino-messenger-bot/internal/metadata" utils "github.com/chain4travel/camino-messenger-bot/utils/tls" - "github.com/google/uuid" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "google.golang.org/grpc" ) @@ -58,6 +59,7 @@ type server struct { grpcServer *grpc.Server cfg *config.RPCServerConfig logger *zap.SugaredLogger + tracer trace.Tracer processor messaging.Processor serviceRegistry *messaging.ServiceRegistry } @@ -66,7 +68,7 @@ func (s *server) Checkpoint() string { return "request-gateway" } -func NewServer(cfg *config.RPCServerConfig, logger *zap.SugaredLogger, processor messaging.Processor, serviceRegistry *messaging.ServiceRegistry) *server { +func NewServer(cfg *config.RPCServerConfig, logger *zap.SugaredLogger, tracer trace.Tracer, processor messaging.Processor, serviceRegistry *messaging.ServiceRegistry) *server { var opts []grpc.ServerOption if cfg.Unencrypted { logger.Warn("Running gRPC server without TLS!") @@ -77,7 +79,7 @@ func NewServer(cfg *config.RPCServerConfig, logger *zap.SugaredLogger, processor } opts = []grpc.ServerOption{grpc.Creds(creds)} } - server := &server{cfg: cfg, logger: logger, processor: processor, serviceRegistry: serviceRegistry} + server := &server{cfg: cfg, logger: logger, tracer: tracer, processor: processor, serviceRegistry: serviceRegistry} server.grpcServer = createGrpcServerAndRegisterServices(server, opts...) return server } @@ -127,7 +129,7 @@ func (s *server) AccommodationSearch(ctx context.Context, request *accommodation } func (s *server) Ping(ctx context.Context, request *pingv1alpha.PingRequest) (*pingv1alpha.PingResponse, error) { - response, err := s.processInternalRequest(ctx, messaging.PingRequest, &messaging.RequestContent{PingRequest: *request}) + response, err := s.processExternalRequest(ctx, messaging.PingRequest, &messaging.RequestContent{PingRequest: *request}) return &response.PingResponse, err } @@ -167,6 +169,8 @@ func (s *server) TransportSearch(ctx context.Context, request *transportv1alpha. } func (s *server) processInternalRequest(ctx context.Context, requestType messaging.MessageType, request *messaging.RequestContent) (messaging.ResponseContent, error) { + ctx, span := s.tracer.Start(ctx, "server.processInternalRequest", trace.WithSpanKind(trace.SpanKindServer)) + defer span.End() service, registered := s.serviceRegistry.GetService(requestType) if !registered { return messaging.ResponseContent{}, fmt.Errorf("%v: %s", messaging.ErrUnsupportedRequestType, requestType) @@ -176,7 +180,9 @@ func (s *server) processInternalRequest(ctx context.Context, requestType messagi } func (s *server) processExternalRequest(ctx context.Context, requestType messaging.MessageType, request *messaging.RequestContent) (messaging.ResponseContent, error) { - err, md := s.processMetadata(ctx) + ctx, span := s.tracer.Start(ctx, "server.processExternalRequest", trace.WithSpanKind(trace.SpanKindServer)) + defer span.End() + err, md := s.processMetadata(ctx, span.SpanContext().TraceID()) if err != nil { return messaging.ResponseContent{}, fmt.Errorf("error processing metadata: %v", err) } @@ -193,16 +199,12 @@ func (s *server) processExternalRequest(ctx context.Context, requestType messagi grpc.SendHeader(ctx, response.Metadata.ToGrpcMD()) return response.Content.ResponseContent, err //TODO set specific errors according to https://grpc.github.io/grpc/core/md_doc_statuscodes.html ? } -func (s *server) processMetadata(ctx context.Context) (error, metadata.Metadata) { - requestID, err := uuid.NewRandom() - if err != nil { - return nil, metadata.Metadata{} - } +func (s *server) processMetadata(ctx context.Context, id trace.TraceID) (error, metadata.Metadata) { md := metadata.Metadata{ - RequestID: requestID.String(), + RequestID: id.String(), } md.Stamp(fmt.Sprintf("%s-%s", s.Checkpoint(), "received")) - err = md.ExtractMetadata(ctx) + err := md.ExtractMetadata(ctx) return err, md } diff --git a/internal/tracing/exporter.go b/internal/tracing/exporter.go new file mode 100644 index 00000000..5d22a06c --- /dev/null +++ b/internal/tracing/exporter.go @@ -0,0 +1,43 @@ +/* + * Copyright (C) 2022-2023, Chain4Travel AG. All rights reserved. + * See the file LICENSE for licensing terms. + */ + +package tracing + +import ( + "context" + "fmt" + "github.com/chain4travel/camino-messenger-bot/config" + utils "github.com/chain4travel/camino-messenger-bot/utils/tls" + "time" + + "go.opentelemetry.io/otel/exporters/otlp/otlptrace" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/sdk/trace" +) + +const exportTimeout = 10 * time.Second +const exporterInstantiationTimeout = 5 * time.Second + +func newExporter(cfg *config.TracingConfig) (trace.SpanExporter, error) { + + var client otlptrace.Client + opts := []otlptracegrpc.Option{ + otlptracegrpc.WithEndpoint(fmt.Sprintf("%s:%d", cfg.Host, cfg.Port)), + otlptracegrpc.WithTimeout(exportTimeout), + } + if cfg.Insecure { + opts = append(opts, otlptracegrpc.WithInsecure()) + } else { + creds, err := utils.LoadTLSCredentials(cfg.CertFile, cfg.KeyFile) + if err != nil { + return nil, fmt.Errorf("could not load TLS keys: %s", err) + } + opts = append(opts, otlptracegrpc.WithTLSCredentials(creds)) + } + client = otlptracegrpc.NewClient(opts...) + ctx, cancel := context.WithTimeout(context.Background(), exporterInstantiationTimeout) + defer cancel() + return otlptrace.New(ctx, client) +} diff --git a/internal/tracing/nooptracer.go b/internal/tracing/nooptracer.go new file mode 100644 index 00000000..265c77c6 --- /dev/null +++ b/internal/tracing/nooptracer.go @@ -0,0 +1,27 @@ +/* + * Copyright (C) 2022-2023, Chain4Travel AG. All rights reserved. + * See the file LICENSE for licensing terms. + */ + +package tracing + +import ( + "context" + "go.opentelemetry.io/otel/trace" +) + +var _ Tracer = (*noopTracer)(nil) + +type noopTracer struct { + tp trace.TracerProvider +} + +func NewNoOpTracer() (Tracer, error) { + return &noopTracer{tp: trace.NewNoopTracerProvider()}, nil +} +func (n *noopTracer) Start(ctx context.Context, spanName string, opts ...trace.SpanStartOption) (context.Context, trace.Span) { + return n.tp.Tracer("").Start(ctx, spanName, opts...) +} +func (n *noopTracer) Shutdown() error { + return nil // nothing to do here +} diff --git a/internal/tracing/tracer.go b/internal/tracing/tracer.go new file mode 100644 index 00000000..247913e9 --- /dev/null +++ b/internal/tracing/tracer.go @@ -0,0 +1,60 @@ +/* + * Copyright (C) 2022-2023, Chain4Travel AG. All rights reserved. + * See the file LICENSE for licensing terms. + */ + +package tracing + +import ( + "context" + "github.com/chain4travel/camino-messenger-bot/config" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.12.0" + "go.opentelemetry.io/otel/trace" + "time" +) + +const tracerProviderShutdownTimeout = exportTimeout + 5*time.Second + +var _ Tracer = (*tracer)(nil) + +type Tracer interface { + trace.Tracer + Shutdown() error +} + +type tracer struct { + tp *sdktrace.TracerProvider +} + +func (t *tracer) Start(ctx context.Context, spanName string, opts ...trace.SpanStartOption) (context.Context, trace.Span) { + return t.tp.Tracer("").Start(ctx, spanName, opts...) +} +func (t *tracer) Shutdown() error { + ctx, cancel := context.WithTimeout(context.Background(), tracerProviderShutdownTimeout) + defer cancel() + return t.tp.Shutdown(ctx) +} + +func NewTracer(tracingConfig *config.TracingConfig, name string) (Tracer, error) { + exporter, err := newExporter(tracingConfig) + if err != nil { + return nil, err + } + + batchSpanProcessor := sdktrace.NewBatchSpanProcessor(exporter) + tp := sdktrace.NewTracerProvider( + sdktrace.WithSpanProcessor(batchSpanProcessor), + sdktrace.WithResource(resource.NewWithAttributes( + semconv.SchemaURL, + semconv.ServiceNameKey.String(name), + )), + ) + otel.SetTracerProvider(tp) + + return &tracer{ + tp: tp, + }, nil +} diff --git a/utils/constants/app.go b/utils/constants/app.go new file mode 100644 index 00000000..92cac624 --- /dev/null +++ b/utils/constants/app.go @@ -0,0 +1,8 @@ +/* + * Copyright (C) 2022-2023, Chain4Travel AG. All rights reserved. + * See the file LICENSE for licensing terms. + */ + +package constants + +const AppName = "camino-messenger-bot"