Conversation
0191f7e to
92fba59
Compare
There was a problem hiding this comment.
Pull request overview
This PR adds a Redis-based “recently seen tx hash” cache (5-minute TTL) to the collector so downstream components (e.g., Protect) can filter transactions that loop back through the system.
Changes:
- Add Redis endpoint configuration plumbed from CLI → CollectorOpts → TxProcessor and initialize a Redis client on startup.
- Write tx hashes to Redis with a 5-minute TTL during tx processing (before ClickHouse write).
- Add a Redis client wrapper plus unit tests using
miniredis.
Reviewed changes
Copilot reviewed 6 out of 7 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
go.mod |
Adds dependencies for Redis client and miniredis test server. |
go.sum |
Updates module checksums for newly added dependencies. |
collector/tx_processor.go |
Adds Redis configuration/client and writes tx hashes to Redis during processing. |
collector/redis.go |
New Redis helper with AddTx + TTL. |
collector/redis_test.go |
Tests Redis AddTx, TTL behavior, and endpoint failure using miniredis. |
collector/collector.go |
Wires RedisEndpoint through collector options into the processor. |
cmd/collect/main.go |
Adds --redis-endpoint / REDIS_ENDPOINT flag and updates required-output validation. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
892913c to
dffcd5d
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 6 out of 7 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| func (r *Redis) AddTx(hash string) { | ||
| select { | ||
| case r.queue <- hash: | ||
| default: | ||
| r.log.Warnw("redis queue full, dropping tx", "tx", hash) | ||
| } |
There was a problem hiding this comment.
When the Redis queue is full, this logs a warning per dropped tx. Under load this can generate extremely high log volume and become an operational issue (I/O + cost), especially if Redis is briefly slow/unreachable. Consider rate-limiting/sampling this log and/or tracking drops via a counter metric (and maybe logging periodic summaries).
There was a problem hiding this comment.
we might actually want to log each tx that gets dropped. just incase we need to search for some tx while debugging, like first time we saw the tx. if i put it as a metric we lose that.
| func (p *TxProcessor) stopTransactionReceiverLoop() { | ||
| close(p.txC) | ||
| <-p.txCDone | ||
| } |
There was a problem hiding this comment.
stopTransactionReceiverLoop closes p.txC. That channel is also written to by multiple long-lived goroutines (e.g., NodeConnection.connect() does nc.txC <- ... without a select/closed check). On shutdown, those goroutines will panic with send on closed channel as soon as they receive another tx. Consider not closing a shared producer channel; instead introduce a context/done signal that producers can observe, or encapsulate the channel behind a TxProcessor.SubmitTx method that becomes a no-op after shutdown, and only close internal channels that have no external senders.
There was a problem hiding this comment.
this is pointing out some real issues with the existing implementation. I'll make a new issue for this.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
📝 Summary
This adds writing txes to redis. Just like how clickhouse does it.
This just adds another call to redis before clickhouse to store the tx hash with a TTL of 5min. Protect will check this to filter txs coming back through.
⛱ Motivation and Context
📚 References
✅ I have run these commands
make lintmake testgo mod tidy