heap memory leak because the ConcurrentLinkedQueue in Batcher is unbounded and filled #181

@masterOcean

Describe the bug

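As the attached screenshots show, the batcher_call_sched metric looks normal, but the batcher_call_time metric is zero. The dist requests are pushed into callTaskBuffers, but batchAndEmit() never polls them.

I suspect the gRPC server did not respond for a long time, so pipelineDepth.getAndDecrement() was never executed. The condition pipelineDepth.get() < maxPipelineDepth.get() in trigger() can then never be satisfied, so dist requests keep being pushed but are never polled, and the heap fills up with them. The back-pressure check in submit() does not appear to help either, because avgLatencyNanos is only updated inside the whenComplete callback, which never runs while the call is stuck.

Relevant code from Batcher:
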
    public final CompletableFuture<CallResultT> submit(BatcherKeyT batcherKey, CallT request) {
        if (avgLatencyNanos.estimate() < burstLatencyNanos) {
            ICallTask<CallT, CallResultT, BatcherKeyT> callTask = new CallTask<>(batcherKey, request);
            boolean offered = callTaskBuffers.offer(callTask);
            assert offered;
            trigger();
            return callTask.resultPromise();
        } else {
            dropCounter.increment();
            return CompletableFuture.failedFuture(new BackPressureException("Too high average latency"));
        }
    }

    private void trigger() {
        if (triggering.compareAndSet(false, true)) {
            try {
                if (!callTaskBuffers.isEmpty() && pipelineDepth.get() < maxPipelineDepth.get()) {
                    batchAndEmit();
                }
            } catch (Throwable e) {
                log.error("Unexpected exception", e);
            } finally {
                triggering.set(false);
                if (!callTaskBuffers.isEmpty() && pipelineDepth.get() < maxPipelineDepth.get()) {
                    trigger();
                }
            }
        }
    }

    private void batchAndEmit() {
        pipelineDepth.incrementAndGet();
        long buildStart = System.nanoTime();
        IBatchCall<CallT, CallResultT, BatcherKeyT> batchCall = batchPool.poll();
        assert batchCall != null;
        int batchSize = 0;
        LinkedList<ICallTask<CallT, CallResultT, BatcherKeyT>> batchedTasks = new LinkedList<>();
        ICallTask<CallT, CallResultT, BatcherKeyT> callTask;
        while (batchSize < maxBatchSize && (callTask = callTaskBuffers.poll()) != null) {
            batchCall.add(callTask);
            batchedTasks.add(callTask);
            batchSize++;
            queueingTimeSummary.record(System.nanoTime() - callTask.ts());
        }
        batchSizeSummary.record(batchSize);
        long execStart = System.nanoTime();
        batchBuildTimeSummary.record((execStart - buildStart));
        final int finalBatchSize = batchSize;
        batchCall.execute()
            .whenComplete((v, e) -> {
                long execEnd = System.nanoTime();
                if (e != null) {
                    log.error("Unexpected exception during handling batchcall result", e);
                    // reset max batch size
                    maxBatchSize = 1;
                } else {
                    long thisLatency = execEnd - execStart;
                    if (thisLatency > 0) {
                        updateMaxBatchSize(finalBatchSize, thisLatency);
                    }
                    batchExecTimer.record(thisLatency, TimeUnit.NANOSECONDS);
                }
                batchedTasks.forEach(t -> {
                    long callLatency = execEnd - t.ts();
                    avgLatencyNanos.observe(callLatency);
                    batchCallTimer.record(callLatency, TimeUnit.NANOSECONDS);
                });
                batchCall.reset();
                batchPool.offer(batchCall);
                pipelineDepth.getAndDecrement();
                if (!callTaskBuffers.isEmpty()) {
                    trigger();
                }
            });
    }
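
To make the suspected failure mode concrete, here is a minimal single-threaded sketch, not the project's actual Batcher. It assumes a pipeline depth of 1 and a downstream call that never completes. Every name in it (StalledBatcherSketch, maxQueued, completeInFlight) is hypothetical and chosen only for illustration, and the queue cap is one possible mitigation, not a setting that exists in the real code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of the submit/trigger/batchAndEmit loop.
public class StalledBatcherSketch {
    private final ConcurrentLinkedQueue<CompletableFuture<String>> buffer = new ConcurrentLinkedQueue<>();
    // Separate counter because ConcurrentLinkedQueue.size() is an O(n) traversal.
    private final AtomicInteger queued = new AtomicInteger();
    private final AtomicInteger pipelineDepth = new AtomicInteger();
    private final int maxPipelineDepth;
    // Hypothetical cap: values <= 0 mean "unbounded", as in the reported code.
    private final int maxQueued;
    // One future per emitted batch; completing it simulates the server responding.
    private final List<CompletableFuture<Void>> inFlight = new ArrayList<>();

    public StalledBatcherSketch(int maxPipelineDepth, int maxQueued) {
        this.maxPipelineDepth = maxPipelineDepth;
        this.maxQueued = maxQueued;
    }

    public CompletableFuture<String> submit() {
        // Approximate bound: under concurrency this check-then-act can overshoot slightly.
        if (maxQueued > 0 && queued.get() >= maxQueued) {
            return CompletableFuture.failedFuture(new IllegalStateException("Too many queued calls"));
        }
        CompletableFuture<String> promise = new CompletableFuture<>();
        buffer.offer(promise);
        queued.incrementAndGet();
        trigger();
        return promise;
    }

    private void trigger() {
        // Same guard as the reported trigger(): nothing is polled while the pipeline is full.
        while (!buffer.isEmpty() && pipelineDepth.get() < maxPipelineDepth) {
            batchAndEmit();
        }
    }

    private void batchAndEmit() {
        pipelineDepth.incrementAndGet();
        List<CompletableFuture<String>> batch = new ArrayList<>();
        CompletableFuture<String> p;
        while ((p = buffer.poll()) != null) {
            queued.decrementAndGet();
            batch.add(p);
        }
        CompletableFuture<Void> response = new CompletableFuture<>();
        inFlight.add(response);
        // The depth is released only here, so a response that never arrives blocks all polling.
        response.whenComplete((v, e) -> {
            batch.forEach(f -> f.complete("ok"));
            pipelineDepth.decrementAndGet();
            trigger();
        });
    }

    public int queuedCalls() {
        return queued.get();
    }

    // Simulates the server finally answering the i-th emitted batch.
    public void completeInFlight(int i) {
        inFlight.get(i).complete(null);
    }

    public static void main(String[] args) {
        StalledBatcherSketch unbounded = new StalledBatcherSketch(1, 0);
        for (int i = 0; i < 10_000; i++) {
            unbounded.submit();
        }
        // The first call was emitted and never answered; the rest pile up.
        System.out.println("unbounded: queued = " + unbounded.queuedCalls()); // 9999

        StalledBatcherSketch bounded = new StalledBatcherSketch(1, 100);
        for (int i = 0; i < 10_000; i++) {
            bounded.submit();
        }
        System.out.println("bounded: queued = " + bounded.queuedCalls()); // 100
    }
}
```

In this model the unbounded variant retains every submitted call for as long as the first batch stays unanswered, which matches the heap growth described above. Rejecting at submit time once the counter reaches a cap keeps retained memory bounded. Putting a timeout on the batch call (for example with CompletableFuture.orTimeout) so that the depth is always released eventually would be another option, though whether either fits the real Batcher is for the maintainers to judge.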

Environment

  • Version: 3.2.1
  • JVM Version: OpenJDK 17
  • Hardware Spec: 32c64g
  • OS: CentOS 7
