diff --git a/httpcore5/src/main/java/org/apache/hc/core5/pool/RouteSegmentedConnPool.java b/httpcore5/src/main/java/org/apache/hc/core5/pool/RouteSegmentedConnPool.java index fb331e221..88f516053 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/pool/RouteSegmentedConnPool.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/pool/RouteSegmentedConnPool.java @@ -172,10 +172,11 @@ public RouteSegmentedConnPool( this.timeouts = new ScheduledThreadPoolExecutor(1, tf); this.timeouts.setRemoveOnCancelPolicy(true); - // Asynchronous disposer for slow GRACEFUL closes. + // Asynchronous disposer for slow GRACEFUL closes. Capped at 32 so very + // many-core boxes do not over-provision; threads time out when idle. final int cores = Math.max(2, Runtime.getRuntime().availableProcessors()); - final int nThreads = Math.min(8, cores); - final int qsize = 1024; + final int nThreads = Math.min(32, Math.max(8, cores * 2)); + final int qsize = 4096; final ThreadFactory df = r -> { final Thread t = new Thread(r, "seg-pool-disposer"); @@ -184,10 +185,11 @@ public RouteSegmentedConnPool( }; this.disposer = new ThreadPoolExecutor( nThreads, nThreads, - 0L, TimeUnit.MILLISECONDS, + 30_000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(qsize), df, new ThreadPoolExecutor.AbortPolicy()); + this.disposer.allowCoreThreadTimeOut(true); } final class Segment { @@ -293,19 +295,21 @@ public Future> lease( // Late hit after enqueuing final PoolEntry late = pollAvailable(seg, state); if (late != null) { - if (seg.waiters.remove(w)) { + if (w.complete(late)) { cancelTimeout(w); fireOnLease(route); if (callback != null) { callback.completed(late); } - w.complete(late); dequeueIfDrained(seg); return w; } else { boolean handedOff = false; for (Waiter other; (other = seg.waiters.pollFirst()) != null; ) { - if (!other.cancelled && compatible(other.state, late.getState())) { + if (other.cancelled || other.isDone()) { + continue; + } + if (compatible(other.state, late.getState())) { cancelTimeout(other); handedOff = other.complete(late); if (handedOff) { @@ -361,8 +365,10 @@ public void release(final PoolEntry entry, final boolean reusable) { if (stillValid) { if (!handOffToCompatibleWaiter(entry, seg)) { offerAvailable(seg, entry); - enqueueIfNeeded(route, seg); - triggerDrainIfMany(); + if (!seg.waiters.isEmpty()) { + enqueueIfNeeded(route, seg); + triggerDrainIfMany(); + } } } else { discardAndDecr(entry, CloseMode.GRACEFUL); @@ -612,7 +618,7 @@ private boolean compatible(final Object needed, final Object have) { } private boolean handOffToCompatibleWaiter(final PoolEntry entry, final Segment seg) { - final Deque skipped = new ArrayDeque<>(); + Deque skipped = null; boolean handedOff = false; for (; ; ) { @@ -632,12 +638,17 @@ private boolean handOffToCompatibleWaiter(final PoolEntry entry, final Seg break; } } else { + if (skipped == null) { + skipped = new ArrayDeque<>(); + } skipped.addLast(w); } } - while (!skipped.isEmpty()) { - seg.waiters.addFirst(skipped.pollLast()); + if (skipped != null) { + while (!skipped.isEmpty()) { + seg.waiters.addFirst(skipped.pollLast()); + } } return handedOff; }