diff --git a/cmd/simulator/config/flags.go b/cmd/simulator/config/flags.go index 70a227f61d..d1db9f1b68 100644 --- a/cmd/simulator/config/flags.go +++ b/cmd/simulator/config/flags.go @@ -13,7 +13,7 @@ import ( "github.com/spf13/viper" ) -const Version = "v0.1.0" +const Version = "v0.1.1" const ( ConfigFilePathKey = "config-file" @@ -27,6 +27,7 @@ const ( VersionKey = "version" TimeoutKey = "timeout" BatchSizeKey = "batch-size" + MetricsPortKey = "metrics-port" ) var ( @@ -44,6 +45,7 @@ type Config struct { KeyDir string `json:"key-dir"` Timeout time.Duration `json:"timeout"` BatchSize uint64 `json:"batch-size"` + MetricsPort uint64 `json:"metrics-port"` } func BuildConfig(v *viper.Viper) (Config, error) { @@ -56,6 +58,7 @@ func BuildConfig(v *viper.Viper) (Config, error) { KeyDir: v.GetString(KeyDirKey), Timeout: v.GetDuration(TimeoutKey), BatchSize: v.GetUint64(BatchSizeKey), + MetricsPort: v.GetUint64(MetricsPortKey), } if len(c.Endpoints) == 0 { return c, ErrNoEndpoints @@ -118,4 +121,5 @@ func addSimulatorFlags(fs *pflag.FlagSet) { fs.Duration(TimeoutKey, 5*time.Minute, "Specify the timeout for the simulator to complete (0 indicates no timeout)") fs.String(LogLevelKey, "info", "Specify the log level to use in the simulator") fs.Uint64(BatchSizeKey, 100, "Specify the batchsize for the worker to issue and confirm txs") + fs.Uint64(MetricsPortKey, 8082, "Specify the port to use for the metrics server") } diff --git a/cmd/simulator/load/funder.go b/cmd/simulator/load/funder.go index 995cae9203..4e7cca00cc 100644 --- a/cmd/simulator/load/funder.go +++ b/cmd/simulator/load/funder.go @@ -10,6 +10,7 @@ import ( "math/big" "github.com/ava-labs/subnet-evm/cmd/simulator/key" + "github.com/ava-labs/subnet-evm/cmd/simulator/metrics" "github.com/ava-labs/subnet-evm/cmd/simulator/txs" "github.com/ava-labs/subnet-evm/core/types" "github.com/ava-labs/subnet-evm/ethclient" @@ -21,7 +22,7 @@ import ( // DistributeFunds ensures that each address in keys has at least 
[minFundsPerAddr] by sending funds // from the key with the highest starting balance. // This function returns a set of at least [numKeys] keys, each having a minimum balance [minFundsPerAddr]. -func DistributeFunds(ctx context.Context, client ethclient.Client, keys []*key.Key, numKeys int, minFundsPerAddr *big.Int) ([]*key.Key, error) { +func DistributeFunds(ctx context.Context, client ethclient.Client, keys []*key.Key, numKeys int, minFundsPerAddr *big.Int, m *metrics.Metrics) ([]*key.Key, error) { if len(keys) < numKeys { return nil, fmt.Errorf("insufficient number of keys %d < %d", len(keys), numKeys) } @@ -107,7 +108,7 @@ func DistributeFunds(ctx context.Context, client ethclient.Client, keys []*key.K return nil, fmt.Errorf("failed to generate fund distribution sequence from %s of length %d", maxFundsKey.Address, len(needFundsAddrs)) } worker := NewSingleAddressTxWorker(ctx, client, maxFundsKey.Address) - txFunderAgent := txs.NewIssueNAgent[*types.Transaction](txSequence, worker, numTxs) + txFunderAgent := txs.NewIssueNAgent[*types.Transaction](txSequence, worker, numTxs, m) if err := txFunderAgent.Execute(ctx); err != nil { return nil, err diff --git a/cmd/simulator/load/loader.go b/cmd/simulator/load/loader.go index 779128e986..2c8458f035 100644 --- a/cmd/simulator/load/loader.go +++ b/cmd/simulator/load/loader.go @@ -6,11 +6,20 @@ package load import ( "context" "crypto/ecdsa" + "errors" "fmt" + "io/ioutil" "math/big" + "net/http" + "os" + "os/signal" + "strconv" + "strings" + "syscall" "github.com/ava-labs/subnet-evm/cmd/simulator/config" "github.com/ava-labs/subnet-evm/cmd/simulator/key" + "github.com/ava-labs/subnet-evm/cmd/simulator/metrics" "github.com/ava-labs/subnet-evm/cmd/simulator/txs" "github.com/ava-labs/subnet-evm/core/types" "github.com/ava-labs/subnet-evm/ethclient" @@ -18,9 +27,15 @@ import ( "github.com/ethereum/go-ethereum/common" ethcrypto "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" + 
"github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" "golang.org/x/sync/errgroup" ) +const ( + MetricsEndpoint = "/metrics" // Endpoint for the Prometheus Metrics Server +) + // ExecuteLoader creates txSequences from [config] and has txAgents execute the specified simulation. func ExecuteLoader(ctx context.Context, config config.Config) error { if config.Timeout > 0 { @@ -29,6 +44,24 @@ func ExecuteLoader(ctx context.Context, config config.Config) error { defer cancel() } + // Create buffered sigChan to receive SIGINT notifications + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT) + + // Create context with cancel + ctx, cancel := context.WithCancel(ctx) + + go func() { + // Blocks until we receive a SIGINT notification or if parent context is done + select { + case <-sigChan: + case <-ctx.Done(): + } + + // Cancel the child context and end all processes + cancel() + }() + // Construct the arguments for the load simulator clients := make([]ethclient.Client, 0, len(config.Endpoints)) for i := 0; i < config.Workers; i++ { @@ -62,8 +95,14 @@ func ExecuteLoader(ctx context.Context, config config.Config) error { // to fund gas for all of their transactions. 
maxFeeCap := new(big.Int).Mul(big.NewInt(params.GWei), big.NewInt(config.MaxFeeCap)) minFundsPerAddr := new(big.Int).Mul(maxFeeCap, big.NewInt(int64(config.TxsPerWorker*params.TxGas))) + + // Create metrics + reg := prometheus.NewRegistry() + m := metrics.NewMetrics(reg) + metricsPort := strconv.Itoa(int(config.MetricsPort)) + log.Info("Distributing funds", "numTxsPerWorker", config.TxsPerWorker, "minFunds", minFundsPerAddr) - keys, err = DistributeFunds(ctx, clients[0], keys, config.Workers, minFundsPerAddr) + keys, err = DistributeFunds(ctx, clients[0], keys, config.Workers, minFundsPerAddr, m) if err != nil { return err } @@ -112,7 +151,7 @@ func ExecuteLoader(ctx context.Context, config config.Config) error { log.Info("Constructing tx agents...", "numAgents", config.Workers) agents := make([]txs.Agent[*types.Transaction], 0, config.Workers) for i := 0; i < config.Workers; i++ { - agents = append(agents, txs.NewIssueNAgent[*types.Transaction](txSequences[i], NewSingleAddressTxWorker(ctx, clients[i], senders[i]), config.BatchSize)) + agents = append(agents, txs.NewIssueNAgent[*types.Transaction](txSequences[i], NewSingleAddressTxWorker(ctx, clients[i], senders[i]), config.BatchSize, m)) } log.Info("Starting tx agents...") @@ -124,10 +163,59 @@ func ExecuteLoader(ctx context.Context, config config.Config) error { }) } + go startMetricsServer(ctx, metricsPort, reg) + log.Info("Waiting for tx agents...") if err := eg.Wait(); err != nil { return err } log.Info("Tx agents completed successfully.") + + printOutputFromMetricsServer(metricsPort) return nil } + +func startMetricsServer(ctx context.Context, metricsPort string, reg *prometheus.Registry) { + // Create a prometheus server to expose individual tx metrics + server := &http.Server{ + Addr: fmt.Sprintf(":%s", metricsPort), + } + + // Start up go routine to listen for SIGINT notifications to gracefully shut down server + go func() { + // Blocks until signal is received + <-ctx.Done() + + if err := 
server.Shutdown(context.Background()); err != nil { + log.Error("Metrics server error", "err", err) + } + log.Info("Received a SIGINT signal: Gracefully shutting down metrics server") + }() + + // Start metrics server + http.Handle(MetricsEndpoint, promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg})) + log.Info(fmt.Sprintf("Metrics Server: localhost:%s%s", metricsPort, MetricsEndpoint)) + if err := server.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) { + log.Error("Metrics server error", "err", err) + } +} + +func printOutputFromMetricsServer(metricsPort string) { + // Get response from server + resp, err := http.Get(fmt.Sprintf("http://localhost:%s%s", metricsPort, MetricsEndpoint)) + if err != nil { + log.Error("cannot get response from metrics servers", "err", err) + return + } + defer resp.Body.Close() + respBody, err := ioutil.ReadAll(resp.Body) + if err != nil { + log.Error("cannot read response body", "err", err) + return + } + // Print out formatted individual metrics + parts := strings.Split(string(respBody), "\n") + for _, s := range parts { + fmt.Printf(" \t\t\t%s\n", s) + } +} diff --git a/cmd/simulator/metrics/metrics.go b/cmd/simulator/metrics/metrics.go new file mode 100644 index 0000000000..8462e2f2e3 --- /dev/null +++ b/cmd/simulator/metrics/metrics.go @@ -0,0 +1,42 @@ +// (c) 2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms.
+ +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +type Metrics struct { + // Summary of the quantiles of Individual Issuance Tx Times + IssuanceTxTimes prometheus.Summary + // Summary of the quantiles of Individual Confirmation Tx Times + ConfirmationTxTimes prometheus.Summary + // Summary of the quantiles of Individual Issuance To Confirmation Tx Times + IssuanceToConfirmationTxTimes prometheus.Summary +} + +// NewMetrics creates and returns a Metrics and registers it with a Collector +func NewMetrics(reg prometheus.Registerer) *Metrics { + m := &Metrics{ + IssuanceTxTimes: prometheus.NewSummary(prometheus.SummaryOpts{ + Name: "tx_issuance_time", + Help: "Individual Tx Issuance Times for a Load Test", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, + }), + ConfirmationTxTimes: prometheus.NewSummary(prometheus.SummaryOpts{ + Name: "tx_confirmation_time", + Help: "Individual Tx Confirmation Times for a Load Test", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, + }), + IssuanceToConfirmationTxTimes: prometheus.NewSummary(prometheus.SummaryOpts{ + Name: "tx_issuance_to_confirmation_time", + Help: "Individual Tx Issuance To Confirmation Times for a Load Test", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, + }), + } + reg.MustRegister(m.IssuanceTxTimes) + reg.MustRegister(m.ConfirmationTxTimes) + reg.MustRegister(m.IssuanceToConfirmationTxTimes) + return m +} diff --git a/cmd/simulator/txs/agent.go b/cmd/simulator/txs/agent.go index c452765db7..09c3027f07 100644 --- a/cmd/simulator/txs/agent.go +++ b/cmd/simulator/txs/agent.go @@ -7,42 +7,53 @@ import ( "context" "errors" "fmt" + "time" + + "github.com/ava-labs/subnet-evm/cmd/simulator/metrics" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" ) +type THash interface { + Hash() common.Hash +} + // TxSequence provides an interface to return a channel of transactions. 
// The sequence is responsible for closing the channel when there are no further // transactions. -type TxSequence[T any] interface { +type TxSequence[T THash] interface { Chan() <-chan T } // Worker defines the interface for issuance and confirmation of transactions. // The caller is responsible for calling Close to cleanup resources used by the // worker at the end of the simulation. -type Worker[T any] interface { +type Worker[T THash] interface { IssueTx(ctx context.Context, tx T) error ConfirmTx(ctx context.Context, tx T) error Close(ctx context.Context) error } // Execute the work of the given agent. -type Agent[T any] interface { +type Agent[T THash] interface { Execute(ctx context.Context) error } // issueNAgent issues and confirms a batch of N transactions at a time. -type issueNAgent[T any] struct { +type issueNAgent[T THash] struct { sequence TxSequence[T] worker Worker[T] n uint64 + metrics *metrics.Metrics } // NewIssueNAgent creates a new issueNAgent -func NewIssueNAgent[T any](sequence TxSequence[T], worker Worker[T], n uint64) Agent[T] { +func NewIssueNAgent[T THash](sequence TxSequence[T], worker Worker[T], n uint64, metrics *metrics.Metrics) Agent[T] { return &issueNAgent[T]{ sequence: sequence, worker: worker, n: n, + metrics: metrics, } } @@ -53,33 +64,85 @@ func (a issueNAgent[T]) Execute(ctx context.Context) error { } txChan := a.sequence.Chan() + confirmedCount := 0 + batchI := 0 + m := a.metrics + txMap := make(map[common.Hash]time.Time) + + // Tracks the total amount of time waiting for issuing and confirming txs + var ( + totalIssuedTime time.Duration + totalConfirmedTime time.Duration + ) + defer func() { + if err := a.worker.Close(ctx); err != nil { + log.Error("error trying to close worker", "err", err) + } + }() + + // Start time for execution + start := time.Now() for { var ( - txs = make([]T, 0, a.n) - tx T - ok bool + txs = make([]T, 0, a.n) + tx T + moreTxs bool ) + // Start issuance batch + issuedStart := time.Now() + L: for i 
:= uint64(0); i < a.n; i++ { select { - case tx, ok = <-txChan: - if !ok { - return a.worker.Close(ctx) - } case <-ctx.Done(): return ctx.Err() + case tx, moreTxs = <-txChan: + if !moreTxs { + break L + } + issuanceIndividualStart := time.Now() + txMap[tx.Hash()] = issuanceIndividualStart + if err := a.worker.IssueTx(ctx, tx); err != nil { + return fmt.Errorf("failed to issue transaction %d: %w", len(txs), err) + } + issuanceIndividualDuration := time.Since(issuanceIndividualStart) + m.IssuanceTxTimes.Observe(issuanceIndividualDuration.Seconds()) + txs = append(txs, tx) } - - if err := a.worker.IssueTx(ctx, tx); err != nil { - return fmt.Errorf("failed to issue transaction %d: %w", len(txs), err) - } - txs = append(txs, tx) } + // Get the batch's issuance time and add it to totalIssuedTime + issuedDuration := time.Since(issuedStart) + log.Info("Issuance Batch Done", "batch", batchI, "time", issuedDuration.Seconds()) + totalIssuedTime += issuedDuration + // Wait for txs in this batch to confirm + confirmedStart := time.Now() for i, tx := range txs { + confirmedIndividualStart := time.Now() if err := a.worker.ConfirmTx(ctx, tx); err != nil { return fmt.Errorf("failed to await transaction %d: %w", i, err) } + confirmationIndividualDuration := time.Since(confirmedIndividualStart) + issuanceToConfirmationIndividualDuration := time.Since(txMap[tx.Hash()]) + m.ConfirmationTxTimes.Observe(confirmationIndividualDuration.Seconds()) + m.IssuanceToConfirmationTxTimes.Observe(issuanceToConfirmationIndividualDuration.Seconds()) + delete(txMap, tx.Hash()) + confirmedCount++ } + // Get the batch's confirmation time and add it to totalConfirmedTime + confirmedDuration := time.Since(confirmedStart) + log.Info("Confirmed Batch Done", "batch", batchI, "time", confirmedDuration.Seconds()) + totalConfirmedTime += confirmedDuration + + // Check if this is the last batch, if so write the final log and return + if !moreTxs { + totalTime := time.Since(start).Seconds() + log.Info("Execution 
complete", "totalTxs", confirmedCount, "totalTime", totalTime, "TPS", float64(confirmedCount)/totalTime, + "issuanceTime", totalIssuedTime.Seconds(), "confirmedTime", totalConfirmedTime.Seconds()) + + return nil + } + + batchI++ } } diff --git a/ethclient/ethclient.go b/ethclient/ethclient.go index 5c5928cbfe..1e907b04fd 100644 --- a/ethclient/ethclient.go +++ b/ethclient/ethclient.go @@ -388,7 +388,7 @@ func (ec *client) SubscribeNewAcceptedTransactions(ctx context.Context, ch chan< return ec.c.EthSubscribe(ctx, ch, "newAcceptedTransactions") } -// SubscribeNewAcceptedTransactions subscribes to notifications about the accepted transaction hashes on the given channel. +// SubscribeNewPendingTransactions subscribes to notifications about the pending transaction hashes on the given channel. func (ec *client) SubscribeNewPendingTransactions(ctx context.Context, ch chan<- *common.Hash) (interfaces.Subscription, error) { return ec.c.EthSubscribe(ctx, ch, "newPendingTransactions") } diff --git a/tests/load/load_test.go b/tests/load/load_test.go index 19ddc3789e..eafe8b69a3 100644 --- a/tests/load/load_test.go +++ b/tests/load/load_test.go @@ -41,7 +41,7 @@ var _ = ginkgo.Describe("[Load Simulator]", ginkgo.Ordered, func() { err := os.Setenv("RPC_ENDPOINTS", commaSeparatedRPCEndpoints) gomega.Expect(err).Should(gomega.BeNil()) - log.Info("Sleeping with network running", "rpcEndpoints", commaSeparatedRPCEndpoints) + log.Info("Running load simulator...", "rpcEndpoints", commaSeparatedRPCEndpoints) cmd := exec.Command("./scripts/run_simulator.sh") log.Info("Running load simulator script", "cmd", cmd.String())