diff --git a/README.md b/README.md index 7e77019..b0b70f0 100644 --- a/README.md +++ b/README.md @@ -59,6 +59,11 @@ configured TAC bech32 prefixes, run: tacchaind debug prefixes ``` +The repository also includes a small JavaScript/TypeScript address conversion +utility in [`contrib/tac-address-converter`](contrib/tac-address-converter). +It can be used directly from JS projects or as a reference implementation for +the same deterministic EVM hex <-> TAC bech32 conversion. + ### Learn more - [Cosmos SDK docs](https://docs.cosmos.network) diff --git a/app/app.go b/app/app.go index 6447b9d..515e6ae 100644 --- a/app/app.go +++ b/app/app.go @@ -138,6 +138,7 @@ import ( evmfeemarkettypes "github.com/cosmos/evm/x/feemarket/types" "github.com/cosmos/evm/x/vm" ethcmn "github.com/ethereum/go-ethereum/common" + ethtypes "github.com/ethereum/go-ethereum/core/types" evmcorevm "github.com/ethereum/go-ethereum/core/vm" sdkmempool "github.com/cosmos/cosmos-sdk/types/mempool" @@ -987,6 +988,16 @@ func (app *TacChainApp) configureEVMMempool(appOpts servertypes.AppOptions, logg LegacyPoolConfig: evmconfig.GetLegacyPoolConfig(appOpts, logger), BlockGasLimit: evmconfig.GetBlockGasLimit(appOpts, logger), MinTip: evmconfig.GetMinTip(appOpts, logger), + BroadCastTxFn: func(txs []*ethtypes.Transaction) error { + logger.Debug("broadcasting EVM transactions", "tx_count", len(txs)) + txs = append([]*ethtypes.Transaction(nil), txs...) + go func() { + if err := app.broadcastEVMTransactions(txs); err != nil { + logger.Error("failed to broadcast EVM transactions", "err", err, "tx_count", len(txs)) + } + }() + return nil + }, } evmMp := evmmempool.NewExperimentalEVMMempool( @@ -1015,6 +1026,32 @@ func (app *TacChainApp) configureEVMMempool(appOpts servertypes.AppOptions, logg return nil } +func (app *TacChainApp) broadcastEVMTransactions(ethTxs []*ethtypes.Transaction) error { + for _, ethTx := range ethTxs { + msg := &evmvmtypes.MsgEthereumTx{} + msg.FromEthereumTx(ethTx) + + txBuilder := app.txConfig.NewTxBuilder() + if err := txBuilder.SetMsgs(msg); err != nil { + return fmt.Errorf("failed to set msg in tx builder: %w", err) + } + + txBytes, err := app.txConfig.TxEncoder()(txBuilder.GetTx()) + if err != nil { + return fmt.Errorf("failed to encode transaction: %w", err) + } + + res, err := app.clientCtx.BroadcastTxSync(txBytes) + if err != nil { + return fmt.Errorf("failed to broadcast transaction %s: %w", ethTx.Hash().Hex(), err) + } + if res.Code != 0 { + return fmt.Errorf("transaction %s rejected by mempool: code=%d, log=%s", ethTx.Hash().Hex(), res.Code, res.RawLog) + } + } + return nil +} + func (app *TacChainApp) setPostHandler() { postHandler, err := posthandler.NewPostHandler( posthandler.HandlerOptions{}, @@ -1215,6 +1252,7 @@ func (app *TacChainApp) RegisterAPIRoutes(apiSvr *api.Server, apiConfig config.A // RegisterTxService implements the Application.RegisterTxService method. func (app *TacChainApp) RegisterTxService(clientCtx client.Context) { + app.SetClientCtx(clientCtx) authtx.RegisterTxService(app.BaseApp.GRPCQueryRouter(), clientCtx, app.BaseApp.Simulate, app.interfaceRegistry) } diff --git a/app/evm_mempool_broadcast_test.go b/app/evm_mempool_broadcast_test.go new file mode 100644 index 0000000..1d87c5e --- /dev/null +++ b/app/evm_mempool_broadcast_test.go @@ -0,0 +1,180 @@ +package app + +import ( + "context" + "math/big" + "sync" + "testing" + "time" + + dbm "github.com/cosmos/cosmos-db" + "github.com/stretchr/testify/require" + + "cosmossdk.io/log" + + rpcmock "github.com/cometbft/cometbft/rpc/client/mock" + coretypes "github.com/cometbft/cometbft/rpc/core/types" + cmttypes "github.com/cometbft/cometbft/types" + "github.com/cosmos/cosmos-sdk/client" + simtestutil "github.com/cosmos/cosmos-sdk/testutil/sims" + "github.com/cosmos/evm/mempool/txpool/legacypool" + evmtypes "github.com/cosmos/evm/x/vm/types" + ethcmn "github.com/ethereum/go-ethereum/common" + ethtypes "github.com/ethereum/go-ethereum/core/types" +) + +type broadcastRecorder struct { + rpcmock.Client + + mu sync.Mutex + calls int + tx cmttypes.Tx +} + +func (r *broadcastRecorder) BroadcastTxSync(_ context.Context, tx cmttypes.Tx) (*coretypes.ResultBroadcastTx, error) { + r.mu.Lock() + defer r.mu.Unlock() + + r.calls++ + r.tx = append(r.tx[:0], tx...) + return &coretypes.ResultBroadcastTx{}, nil +} + +func (r *broadcastRecorder) callCount() int { + r.mu.Lock() + defer r.mu.Unlock() + return r.calls +} + +func (r *broadcastRecorder) txBytes() cmttypes.Tx { + r.mu.Lock() + defer r.mu.Unlock() + return append(cmttypes.Tx(nil), r.tx...) +} + +type blockingBroadcastClient struct { + rpcmock.Client + + started chan struct{} + release chan struct{} + done chan struct{} +} + +func (c *blockingBroadcastClient) BroadcastTxSync(context.Context, cmttypes.Tx) (*coretypes.ResultBroadcastTx, error) { + close(c.started) + <-c.release + close(c.done) + return &coretypes.ResultBroadcastTx{}, nil +} + +func TestEVMMempoolBroadcastTxFnUsesUpdatedClientCtx(t *testing.T) { + tacApp := NewTacChainAppWithCustomOptions(t, true, SetupOptions{ + Logger: log.NewTestLogger(t), + DB: dbm.NewMemDB(), + AppOpts: simtestutil.NewAppOptionsWithFlagHome(t.TempDir()), + }) + + txPool := tacApp.EVMMempool.GetTxPool() + require.Len(t, txPool.Subpools, 1) + + legacyPool, ok := txPool.Subpools[0].(*legacypool.LegacyPool) + require.True(t, ok) + require.NotNil(t, legacyPool.BroadcastTxFn) + + rpcClient := &broadcastRecorder{} + tacApp.RegisterTxService(client.Context{}. + WithTxConfig(tacApp.txConfig). + WithClient(rpcClient), + ) + + to := ethcmn.Address{} + ethTx := ethtypes.NewTx(ðtypes.LegacyTx{ + Nonce: 1, + To: &to, + Value: big.NewInt(0), + Gas: 21_000, + GasPrice: big.NewInt(1), + }) + + // Before the explicit BroadCastTxFn override, this callback captured the + // empty client.Context from app construction and returned "no RPC client is + // defined in offline mode". RegisterTxService is where evmserver passes the + // real clientCtx after local.New(bftNode), so it must refresh app.clientCtx. + require.NoError(t, legacyPool.BroadcastTxFn([]*ethtypes.Transaction{ethTx})) + require.Eventually(t, func() bool { + return rpcClient.callCount() == 1 + }, time.Second, 10*time.Millisecond) + + txBytes := rpcClient.txBytes() + require.NotEmpty(t, txBytes) + + decodedTx, err := tacApp.txConfig.TxDecoder()(txBytes) + require.NoError(t, err) + + msgs := decodedTx.GetMsgs() + require.Len(t, msgs, 1) + + msg, ok := msgs[0].(*evmtypes.MsgEthereumTx) + require.True(t, ok) + require.Equal(t, ethTx.Hash(), msg.Hash()) +} + +func TestEVMMempoolBroadcastTxFnDoesNotBlockOnBroadcast(t *testing.T) { + tacApp := NewTacChainAppWithCustomOptions(t, true, SetupOptions{ + Logger: log.NewTestLogger(t), + DB: dbm.NewMemDB(), + AppOpts: simtestutil.NewAppOptionsWithFlagHome(t.TempDir()), + }) + + txPool := tacApp.EVMMempool.GetTxPool() + require.Len(t, txPool.Subpools, 1) + + legacyPool, ok := txPool.Subpools[0].(*legacypool.LegacyPool) + require.True(t, ok) + require.NotNil(t, legacyPool.BroadcastTxFn) + + rpcClient := &blockingBroadcastClient{ + started: make(chan struct{}), + release: make(chan struct{}), + done: make(chan struct{}), + } + + tacApp.RegisterTxService(client.Context{}. + WithTxConfig(tacApp.txConfig). + WithClient(rpcClient), + ) + + to := ethcmn.Address{} + ethTx := ethtypes.NewTx(ðtypes.LegacyTx{ + Nonce: 1, + To: &to, + Value: big.NewInt(0), + Gas: 21_000, + GasPrice: big.NewInt(1), + }) + + done := make(chan error, 1) + go func() { + done <- legacyPool.BroadcastTxFn([]*ethtypes.Transaction{ethTx}) + }() + + select { + case err := <-done: + require.NoError(t, err) + case <-time.After(100 * time.Millisecond): + t.Fatal("BroadcastTxFn blocked on BroadcastTxSync") + } + + select { + case <-rpcClient.started: + case <-time.After(time.Second): + t.Fatal("BroadcastTxFn did not start background broadcast") + } + + close(rpcClient.release) + select { + case <-rpcClient.done: + case <-time.After(time.Second): + t.Fatal("background broadcast goroutine did not exit") + } +}