diff --git a/changelog/unreleased/solr-18011-locking-update.yml b/changelog/unreleased/solr-18011-locking-update.yml new file mode 100644 index 000000000000..7d2ad4cb4722 --- /dev/null +++ b/changelog/unreleased/solr-18011-locking-update.yml @@ -0,0 +1,9 @@ +# See https://github.com/apache/solr/blob/main/dev-docs/changelog.adoc +title: Allow locked Admin APIs to call other locked AdminAPIs. These locked Admin APIs can only call other APIs on the same resource tree (Collection > Shard > Replica) to protect against deadlocks. +type: changed # added, changed, fixed, deprecated, removed, dependency_update, security, other +authors: + - name: Houston Putman + nick: HoustonPutman +links: + - name: SOLR-18011 + url: https://issues.apache.org/jira/browse/SOLR-18011 diff --git a/solr/core/src/java/org/apache/solr/api/V2HttpCall.java b/solr/core/src/java/org/apache/solr/api/V2HttpCall.java index ba2b6fa7af86..df88f992e8e5 100644 --- a/solr/core/src/java/org/apache/solr/api/V2HttpCall.java +++ b/solr/core/src/java/org/apache/solr/api/V2HttpCall.java @@ -18,6 +18,7 @@ package org.apache.solr.api; import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP; +import static org.apache.solr.common.params.CollectionAdminParams.CALLING_LOCK_ID_HEADER; import static org.apache.solr.servlet.HttpSolrCall.Action.ADMIN; import static org.apache.solr.servlet.HttpSolrCall.Action.ADMIN_OR_REMOTEPROXY; import static org.apache.solr.servlet.HttpSolrCall.Action.PROCESS; @@ -212,6 +213,11 @@ private void initAdminRequest(String path) throws Exception { solrReq.getContext().put(CoreContainer.class.getName(), cores); requestType = AuthorizationContext.RequestType.ADMIN; action = ADMIN; + + String callingLockId = req.getHeader(CALLING_LOCK_ID_HEADER); + if (callingLockId != null && !callingLockId.isBlank()) { + solrReq.getContext().put(CALLING_LOCK_ID_HEADER, callingLockId); + } } protected void parseRequest() throws Exception { diff --git a/solr/core/src/java/org/apache/solr/cloud/DistributedCollectionLockFactory.java b/solr/core/src/java/org/apache/solr/cloud/DistributedCollectionLockFactory.java index e49aba6c3f51..f2ab771232ed 100644 --- a/solr/core/src/java/org/apache/solr/cloud/DistributedCollectionLockFactory.java +++ b/solr/core/src/java/org/apache/solr/cloud/DistributedCollectionLockFactory.java @@ -54,6 +54,7 @@ public interface DistributedCollectionLockFactory { * @param replicaName is ignored and can be {@code null} if {@code level} is {@link * org.apache.solr.common.params.CollectionParams.LockLevel#COLLECTION} or {@link * org.apache.solr.common.params.CollectionParams.LockLevel#SHARD} + * @param callingLockId the lockId from the caller that should be mirrored by this lock * @return a lock instance that must be {@link DistributedLock#release()}'ed in a {@code finally}, * regardless of the lock having been acquired or not. */ @@ -62,5 +63,6 @@ DistributedLock createLock( CollectionParams.LockLevel level, String collName, String shardId, - String replicaName); + String replicaName, + String callingLockId); } diff --git a/solr/core/src/java/org/apache/solr/cloud/DistributedLock.java b/solr/core/src/java/org/apache/solr/cloud/DistributedLock.java index 1929766e86ef..c26c5d499f03 100644 --- a/solr/core/src/java/org/apache/solr/cloud/DistributedLock.java +++ b/solr/core/src/java/org/apache/solr/cloud/DistributedLock.java @@ -24,4 +24,8 @@ public interface DistributedLock { void release(); boolean isAcquired(); + + String getLockId(); + + boolean isMirroringLock(); } diff --git a/solr/core/src/java/org/apache/solr/cloud/DistributedMultiLock.java b/solr/core/src/java/org/apache/solr/cloud/DistributedMultiLock.java index 9979c144e84b..97be2293399c 100644 --- a/solr/core/src/java/org/apache/solr/cloud/DistributedMultiLock.java +++ b/solr/core/src/java/org/apache/solr/cloud/DistributedMultiLock.java @@ -20,7 +20,9 @@ import com.google.common.annotations.VisibleForTesting; import java.lang.invoke.MethodHandles; import java.util.List; +import java.util.stream.Collectors; import org.apache.solr.common.SolrException; +import org.apache.solr.common.util.StrUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,7 +48,12 @@ public void waitUntilAcquired() { for (DistributedLock lock : locks) { log.debug("DistributedMultiLock.waitUntilAcquired. About to wait on lock {}", lock); lock.waitUntilAcquired(); - log.debug("DistributedMultiLock.waitUntilAcquired. Acquired lock {}", lock); + if (lock.isMirroringLock()) { + log.debug( + "DistributedMultiLock.waitUntilAcquired. Mirroring already-acquired lock {}", lock); + } else { + log.debug("DistributedMultiLock.waitUntilAcquired. Acquired lock {}", lock); + } } } @@ -70,6 +77,17 @@ public boolean isAcquired() { return true; } + public String getLockId() { + return locks.stream().map(DistributedLock::getLockId).collect(Collectors.joining(",")); + } + + public static List splitLockIds(String lockIds) { + if (StrUtils.isBlank(lockIds)) { + return List.of(); + } + return List.of(lockIds.split(",")); + } + @VisibleForTesting public int getCountInternalLocks() { return locks.size(); diff --git a/solr/core/src/java/org/apache/solr/cloud/LockTree.java b/solr/core/src/java/org/apache/solr/cloud/LockTree.java index e8d96d4f2cd5..6bb432abd8e8 100644 --- a/solr/core/src/java/org/apache/solr/cloud/LockTree.java +++ b/solr/core/src/java/org/apache/solr/cloud/LockTree.java @@ -21,8 +21,11 @@ import java.util.ArrayDeque; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; +import java.util.UUID; import org.apache.solr.cloud.OverseerMessageHandler.Lock; +import org.apache.solr.common.SolrException; import org.apache.solr.common.params.CollectionParams; import org.apache.solr.common.params.CollectionParams.LockLevel; import org.apache.solr.common.util.StrUtils; @@ -38,20 +41,36 @@ public class LockTree { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private final Node root = new Node(null, LockLevel.CLUSTER, null); + public final Map allLocks = new HashMap<>(); + private class LockImpl implements Lock { final Node node; + final String id; LockImpl(Node node) { this.node = node; + this.id = UUID.randomUUID().toString(); } @Override public void unlock() { synchronized (LockTree.this) { - node.unlock(this); + if (node.unlock(this)) { + allLocks.remove(id); + } } } + @Override + public String id() { + return id; + } + + @Override + public boolean validateSubpath(int lockLevel, List path) { + return node.validateSubpath(lockLevel, path); + } + @Override public String toString() { return StrUtils.join(node.constructPath(new ArrayDeque<>()), '/'); @@ -71,12 +90,43 @@ public String toString() { public class Session { private SessionNode root = new SessionNode(LockLevel.CLUSTER); - public Lock lock(CollectionParams.CollectionAction action, List path) { + public Lock lock( + CollectionParams.CollectionAction action, List path, String callingLockId) { if (action.lockLevel == LockLevel.NONE) return FREELOCK; + Node startingNode = LockTree.this.root; + SessionNode startingSession = root; + + // If a callingLockId was passed in, validate it with the current lock path, and only start + // locking below the calling lock + Lock callingLock = StrUtils.isBlank(callingLockId) ? null : allLocks.get(callingLockId); + log.debug("Calling lock id: {}, level: {}", callingLockId, callingLock); + boolean reuseCurrentLock = false; + if (callingLock != null) { + if (callingLock.validateSubpath(action.lockLevel.getHeight(), path)) { + startingNode = ((LockImpl) callingLock).node; + startingSession = startingSession.find(startingNode.level.getHeight(), path); + if (startingSession == null) { + startingSession = root; + } + reuseCurrentLock = true; + } else { + throw new SolrException( + SolrException.ErrorCode.SERVER_ERROR, + String.format( + Locale.ROOT, + "Cannot lock an action under a different path than the calling action. Calling action lock path: %s, Requested action lock path: %s", + callingLock, + String.join("/", path))); + } + } synchronized (LockTree.this) { - if (root.isBusy(action.lockLevel, path)) return null; - Lock lockObject = LockTree.this.root.lock(action.lockLevel, path); - if (lockObject == null) root.markBusy(action.lockLevel, path); + if (startingSession.isBusy(action.lockLevel, path)) return null; + Lock lockObject = startingNode.lock(action.lockLevel, path, reuseCurrentLock); + if (lockObject == null) { + startingSession.markBusy(action.lockLevel, path); + } else { + allLocks.put(lockObject.id(), lockObject); + } return lockObject; } } @@ -125,6 +175,18 @@ boolean isBusy(LockLevel lockLevel, List path) { return false; } } + + SessionNode find(int lockLevel, List path) { + if (level.getHeight() == lockLevel) { + return this; + } else if (level.getHeight() < lockLevel + && kids != null + && kids.containsKey(path.get(level.getHeight()))) { + return kids.get(path.get(level.getHeight())).find(lockLevel, path); + } else { + return null; + } + } } public Session getSession() { @@ -135,6 +197,7 @@ private class Node { final String name; final Node mom; final LockLevel level; + int refCount = 0; HashMap children = new HashMap<>(); LockImpl myLock; @@ -151,30 +214,49 @@ boolean isLocked() { return false; } - void unlock(LockImpl lockObject) { + boolean unlock(LockImpl lockObject) { + if (--refCount > 0) { + return false; + } if (myLock == lockObject) myLock = null; else { log.info("Unlocked multiple times : {}", lockObject); } + return true; } - Lock lock(LockLevel lockLevel, List path) { - if (myLock != null) return null; // I'm already locked. no need to go any further + Lock lock(LockLevel lockLevel, List path, boolean reuseCurrentLock) { + if (myLock != null && !reuseCurrentLock) { + // I'm already locked. no need to go any further + return null; + } if (lockLevel == level) { // lock is supposed to be acquired at this level + if (myLock != null && reuseCurrentLock) { + // I am already locked, and I want to be re-used + refCount++; + return myLock; + } // If I am locked or any of my children or grandchildren are locked // it is not possible to acquire a lock if (isLocked()) return null; + refCount++; return myLock = new LockImpl(this); } else { String childName = path.get(level.getHeight()); Node child = children.get(childName); if (child == null) children.put(childName, child = new Node(childName, level.getChild(), this)); - return child.lock(lockLevel, path); + return child.lock(lockLevel, path, false); } } + boolean validateSubpath(int lockLevel, List path) { + return level.getHeight() <= lockLevel + && (level.getHeight() == 0 || name.equals(path.get(level.getHeight() - 1))) + && (mom == null || mom.validateSubpath(lockLevel, path)); + } + ArrayDeque constructPath(ArrayDeque collect) { if (name != null) collect.addFirst(name); if (mom != null) mom.constructPath(collect); @@ -182,5 +264,20 @@ ArrayDeque constructPath(ArrayDeque collect) { } } - static final Lock FREELOCK = () -> {}; + static final String FREELOCK_ID = "-1"; + static final Lock FREELOCK = + new Lock() { + @Override + public void unlock() {} + + @Override + public String id() { + return FREELOCK_ID; + } + + @Override + public boolean validateSubpath(int lockLevel, List path) { + return false; + } + }; } diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerConfigSetMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/OverseerConfigSetMessageHandler.java index 0bf454a06421..750fc62effa1 100644 --- a/solr/core/src/java/org/apache/solr/cloud/OverseerConfigSetMessageHandler.java +++ b/solr/core/src/java/org/apache/solr/cloud/OverseerConfigSetMessageHandler.java @@ -21,6 +21,7 @@ import java.lang.invoke.MethodHandles; import java.util.HashSet; +import java.util.List; import java.util.Set; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; @@ -61,7 +62,7 @@ public OverseerConfigSetMessageHandler(ZkStateReader zkStateReader, CoreContaine } @Override - public OverseerSolrResponse processMessage(ZkNodeProps message, String operation) { + public OverseerSolrResponse processMessage(ZkNodeProps message, String operation, Lock lock) { NamedList results = new NamedList<>(); try { if (!operation.startsWith(CONFIGSETS_ACTION_PREFIX)) { @@ -113,11 +114,26 @@ public String getTimerName(String operation) { } @Override - public Lock lockTask(ZkNodeProps message, long ignored) { + public Lock lockTask(ZkNodeProps message, long ignored, String callingLockId) { String configSetName = getTaskKey(message); if (canExecute(configSetName, message)) { markExclusiveTask(configSetName, message); - return () -> unmarkExclusiveTask(configSetName, message); + return new Lock() { + @Override + public void unlock() { + unmarkExclusiveTask(configSetName, message); + } + + @Override + public String id() { + return configSetName; + } + + @Override + public boolean validateSubpath(int lockLevel, List path) { + return false; + } + }; } return null; } diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/OverseerMessageHandler.java index 3e369b907316..c01f365a3d76 100644 --- a/solr/core/src/java/org/apache/solr/cloud/OverseerMessageHandler.java +++ b/solr/core/src/java/org/apache/solr/cloud/OverseerMessageHandler.java @@ -16,6 +16,7 @@ */ package org.apache.solr.cloud; +import java.util.List; import org.apache.solr.common.cloud.ZkNodeProps; /** Interface for processing messages received by an {@link OverseerTaskProcessor} */ @@ -26,7 +27,7 @@ public interface OverseerMessageHandler { * @param operation the operation to process * @return response */ - OverseerSolrResponse processMessage(ZkNodeProps message, String operation); + OverseerSolrResponse processMessage(ZkNodeProps message, String operation, Lock lock); /** * @return the name of the OverseerMessageHandler @@ -41,6 +42,10 @@ public interface OverseerMessageHandler { interface Lock { void unlock(); + + String id(); + + boolean validateSubpath(int lockLevel, List path); } /** @@ -48,7 +53,7 @@ interface Lock { * * @return null if locking is not possible. */ - Lock lockTask(ZkNodeProps message, long batchSessionId); + Lock lockTask(ZkNodeProps message, long batchSessionId, String callingLockId); /** * @param message the message being processed diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java index 14bc88b583f4..30ff9a31996e 100644 --- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java +++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java @@ -16,6 +16,7 @@ */ package org.apache.solr.cloud; +import static org.apache.solr.common.params.CollectionAdminParams.CALLING_LOCK_ID_HEADER; import static org.apache.solr.common.params.CommonAdminParams.ASYNC; import static org.apache.solr.common.params.CommonParams.ID; @@ -37,11 +38,13 @@ import java.util.function.Predicate; import org.apache.solr.cloud.Overseer.LeaderStatus; import org.apache.solr.cloud.OverseerTaskQueue.QueueEvent; +import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.common.util.IOUtils; +import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.SolrNamedThreadFactory; import org.apache.solr.common.util.StrUtils; import org.apache.solr.common.util.Utils; @@ -327,8 +330,32 @@ public void run() { workQueue.remove(head, asyncId == null); continue; } + if (operation == null) { + log.error("Msg does not have required {} : {}", Overseer.QUEUE_OPERATION, message); + workQueue.remove(head, asyncId == null); + continue; + } + String callingLockId = message.getStr(CALLING_LOCK_ID_HEADER); OverseerMessageHandler messageHandler = selector.selectOverseerMessageHandler(message); - OverseerMessageHandler.Lock lock = messageHandler.lockTask(message, batchSessionId); + OverseerMessageHandler.Lock lock; + try { + lock = messageHandler.lockTask(message, batchSessionId, callingLockId); + } catch (SolrException e) { + // Lock acquisition can throw if e.g. callingLockId references an unrelated + // action. In that case, fail the task immediately rather than retrying. + log.error( + "Error occurred while trying to acquire lock for task [{}]", head.getId(), e); + NamedList errResp = new NamedList<>(); + errResp.add("exception", e.getMessage()); + OverseerSolrResponse response = new OverseerSolrResponse(errResp); + if (asyncId != null) { + failureMap.put(asyncId, OverseerSolrResponseSerializer.serialize(response)); + } else { + head.setBytes(OverseerSolrResponseSerializer.serialize(response)); + } + workQueue.remove(head, asyncId == null); + continue; + } if (lock == null) { if (log.isDebugEnabled()) { log.debug("Exclusivity check failed for [{}]", message); @@ -561,7 +588,7 @@ public void run() { if (log.isDebugEnabled()) { log.debug("Runner processing {}", head.getId()); } - response = messageHandler.processMessage(message, operation); + response = messageHandler.processMessage(message, operation, lock); } finally { timerContext.stop(); updateStats(statsName); diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedCollectionLockFactory.java b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedCollectionLockFactory.java index fe77b18c3f2b..098545b6afcf 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedCollectionLockFactory.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedCollectionLockFactory.java @@ -44,7 +44,8 @@ public DistributedLock createLock( CollectionParams.LockLevel level, String collName, String shardId, - String replicaName) { + String replicaName, + String lockIdToMirror) { Objects.requireNonNull(collName, "collName can't be null"); if (level != CollectionParams.LockLevel.COLLECTION) { Objects.requireNonNull( @@ -56,7 +57,8 @@ public DistributedLock createLock( } String lockPath = getLockPath(level, collName, shardId, replicaName); - return doCreateLock(isWriteLock, lockPath); + + return doCreateLock(isWriteLock, lockPath, lockIdToMirror); } /** diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedConfigSetLockFactory.java b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedConfigSetLockFactory.java index 884703a8829b..3458ea322153 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedConfigSetLockFactory.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedConfigSetLockFactory.java @@ -40,7 +40,7 @@ public DistributedLock createLock(boolean isWriteLock, String configSetName) { Objects.requireNonNull(configSetName, "configSetName can't be null"); String lockPath = getLockPath(configSetName); - return doCreateLock(isWriteLock, lockPath); + return doCreateLock(isWriteLock, lockPath, null); } /** diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedLock.java b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedLock.java index 4b594508d2ac..30b70f66b82b 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedLock.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedLock.java @@ -24,6 +24,7 @@ import org.apache.solr.cloud.api.collections.DistributedCollectionConfigSetCommandRunner; import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.SolrZkClient; +import org.apache.solr.common.util.StrUtils; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; @@ -41,16 +42,20 @@ abstract class ZkDistributedLock implements DistributedLock { static final char LOCK_PREFIX_SUFFIX = '_'; /** Prefix of EPHEMERAL read lock node names */ - static final String READ_LOCK_PREFIX = "R" + LOCK_PREFIX_SUFFIX; + static final char READ_LOCK_PREFIX_CHAR = 'R'; + + static final String READ_LOCK_PREFIX = "" + READ_LOCK_PREFIX_CHAR + LOCK_PREFIX_SUFFIX; /** Prefix of EPHEMERAL write lock node names */ - static final String WRITE_LOCK_PREFIX = "W" + LOCK_PREFIX_SUFFIX; + static final char WRITE_LOCK_PREFIX_CHAR = 'W'; + + static final String WRITE_LOCK_PREFIX = "" + WRITE_LOCK_PREFIX_CHAR + LOCK_PREFIX_SUFFIX; /** Read lock. */ static class Read extends ZkDistributedLock { - protected Read(SolrZkClient zkClient, String lockPath) + protected Read(SolrZkClient zkClient, String lockPath, String lockIdToMirror) throws KeeperException, InterruptedException { - super(zkClient, lockPath, READ_LOCK_PREFIX); + super(zkClient, lockPath, READ_LOCK_PREFIX, lockIdToMirror); } @Override @@ -59,13 +64,18 @@ boolean isBlockedByNodeType(String otherLockName) { // Lower numbered read locks are ok, they can coexist. return otherLockName.startsWith(WRITE_LOCK_PREFIX); } + + @Override + boolean canMirrorLock(String lockId) { + return true; + } } /** Write lock. */ static class Write extends ZkDistributedLock { - protected Write(SolrZkClient zkClient, String lockPath) + protected Write(SolrZkClient zkClient, String lockPath, String lockIdToMirror) throws KeeperException, InterruptedException { - super(zkClient, lockPath, WRITE_LOCK_PREFIX); + super(zkClient, lockPath, WRITE_LOCK_PREFIX, lockIdToMirror); } @Override @@ -73,6 +83,17 @@ boolean isBlockedByNodeType(String otherLockName) { // A write lock is blocked by another read or write lock with a lower sequence number return true; } + + @Override + boolean canMirrorLock(String lockId) { + // Only another Write lock can be mirrored + int lockTypeSuffixIndex = lockId.lastIndexOf(LOCK_PREFIX_SUFFIX) - 1; + if (lockTypeSuffixIndex < 0) { + return false; + } else { + return lockId.charAt(lockTypeSuffixIndex) == WRITE_LOCK_PREFIX_CHAR; + } + } } private final SolrZkClient zkClient; @@ -80,22 +101,43 @@ boolean isBlockedByNodeType(String otherLockName) { private final String lockNode; protected final long sequence; protected volatile boolean released = false; + protected final boolean mirrored; - protected ZkDistributedLock(SolrZkClient zkClient, String lockDir, String lockNodePrefix) + protected ZkDistributedLock( + SolrZkClient zkClient, String lockDir, String lockNodePrefix, String lockIdToMirror) throws KeeperException, InterruptedException { this.zkClient = zkClient; this.lockDir = lockDir; // Create the SEQUENTIAL EPHEMERAL node. We enter the locking rat race here. We MUST eventually // call release() or we block others. - lockNode = - zkClient.create( - lockDir - + DistributedCollectionConfigSetCommandRunner.ZK_PATH_SEPARATOR - + lockNodePrefix, - null, - CreateMode.EPHEMERAL_SEQUENTIAL); + if (StrUtils.isBlank(lockIdToMirror)) { + lockNode = + zkClient.create( + lockDir + + DistributedCollectionConfigSetCommandRunner.ZK_PATH_SEPARATOR + + lockNodePrefix, + null, + CreateMode.EPHEMERAL_SEQUENTIAL); + mirrored = false; + } else { + if (!lockIdToMirror.startsWith(lockDir)) { + throw new SolrException( + SolrException.ErrorCode.SERVER_ERROR, + "Requested lock with path: " + + lockDir + + " cannot mirror the callingLock with id: " + + lockIdToMirror); + } + if (!canMirrorLock(lockIdToMirror)) { + throw new SolrException( + SolrException.ErrorCode.SERVER_ERROR, + "Cannot mirror lock " + lockIdToMirror + " with given lockPrefix: " + lockNodePrefix); + } + lockNode = lockIdToMirror; + mirrored = true; + } sequence = getSequenceFromNodename(lockNode); } @@ -158,8 +200,10 @@ public void waitUntilAcquired() { @Override public void release() { try { - zkClient.delete(lockNode, -1); - released = true; + if (!mirrored) { + zkClient.delete(lockNode, -1); + released = true; + } } catch (KeeperException e) { throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e); } catch (InterruptedException e) { @@ -212,7 +256,11 @@ String nodeToWatch() throws KeeperException, InterruptedException { if (!foundSelf) { // If this basic assumption doesn't hold with Zookeeper, we're in deep trouble. And not only // here. - throw new SolrException(SERVER_ERROR, "Missing lock node " + lockNode); + if (mirrored) { + throw new SolrException(SERVER_ERROR, "Missing mirrored lock node " + lockNode); + } else { + throw new SolrException(SERVER_ERROR, "Missing lock node " + lockNode); + } } // Didn't return early on any other blocking lock, means we own it @@ -240,6 +288,18 @@ static long getSequenceFromNodename(String lockNode) { return Long.parseLong(lockNode.substring(lockNode.length() - SEQUENCE_LENGTH)); } + @Override + public String getLockId() { + return lockNode; + } + + @Override + public boolean isMirroringLock() { + return mirrored; + } + + abstract boolean canMirrorLock(String lockId); + @Override public String toString() { return lockNode; diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedLockFactory.java b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedLockFactory.java index 76696dc3942e..969e65a6aaa3 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedLockFactory.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedLockFactory.java @@ -20,6 +20,7 @@ import org.apache.solr.cloud.api.collections.DistributedCollectionConfigSetCommandRunner; import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.SolrZkClient; +import org.apache.solr.common.util.StrUtils; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -33,16 +34,19 @@ abstract class ZkDistributedLockFactory { this.rootPath = rootPath; } - protected DistributedLock doCreateLock(boolean isWriteLock, String lockPath) { + protected DistributedLock doCreateLock( + boolean isWriteLock, String lockPath, String lockIdToMirror) { try { - // TODO optimize by first attempting to create the ZkDistributedLock without calling - // makeLockPath() and only call it if the lock creation fails. This will be less costly on - // high contention (and slightly more on low contention) - makeLockPath(lockPath); + if (StrUtils.isBlank(lockIdToMirror)) { + // TODO optimize by first attempting to create the ZkDistributedLock without calling + // makeLockPath() and only call it if the lock creation fails. This will be less costly on + // high contention (and slightly more on low contention) + makeLockPath(lockPath); + } return isWriteLock - ? new ZkDistributedLock.Write(zkClient, lockPath) - : new ZkDistributedLock.Read(zkClient, lockPath); + ? new ZkDistributedLock.Write(zkClient, lockPath, lockIdToMirror) + : new ZkDistributedLock.Read(zkClient, lockPath, lockIdToMirror); } catch (KeeperException e) { throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e); } catch (InterruptedException e) { diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/AdminCmdContext.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/AdminCmdContext.java index 48b7c475c74b..ee5f56deca1f 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/AdminCmdContext.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/AdminCmdContext.java @@ -17,12 +17,17 @@ package org.apache.solr.cloud.api.collections; +import static org.apache.solr.common.params.CollectionAdminParams.CALLING_LOCK_ID_HEADER; + import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.params.CollectionParams; +import org.apache.solr.request.SolrQueryRequest; public class AdminCmdContext { private final CollectionParams.CollectionAction action; private final String asyncId; + private String lockId; + private String callingLockId; private ClusterState clusterState; public AdminCmdContext(CollectionParams.CollectionAction action) { @@ -34,6 +39,13 @@ public AdminCmdContext(CollectionParams.CollectionAction action, String asyncId) this.asyncId = asyncId; } + public AdminCmdContext( + CollectionParams.CollectionAction action, String asyncId, SolrQueryRequest req) { + this.action = action; + this.asyncId = asyncId; + this.withCallingLockId((String) req.getContext().get(CALLING_LOCK_ID_HEADER)); + } + public CollectionParams.CollectionAction getAction() { return action; } @@ -42,6 +54,24 @@ public String getAsyncId() { return asyncId; } + public AdminCmdContext withLockId(String lockId) { + this.lockId = lockId; + return this; + } + + public String getLockId() { + return lockId; + } + + public AdminCmdContext withCallingLockId(String callingLockId) { + this.callingLockId = callingLockId; + return this; + } + + public String getCallingLockId() { + return callingLockId; + } + public ClusterState getClusterState() { return clusterState; } @@ -57,7 +87,9 @@ public AdminCmdContext subRequestContext(CollectionParams.CollectionAction actio public AdminCmdContext subRequestContext( CollectionParams.CollectionAction action, String asyncId) { - AdminCmdContext nextContext = new AdminCmdContext(action, asyncId); - return nextContext.withClusterState(clusterState); + return new AdminCmdContext(action, asyncId) + .withCallingLockId(callingLockId) + .withLockId(lockId) + .withClusterState(clusterState); } } diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionApiLockFactory.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionApiLockFactory.java index 2c5342c008dd..62cf0dd65af1 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionApiLockFactory.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionApiLockFactory.java @@ -107,22 +107,37 @@ DistributedMultiLock createCollectionApiLock( // CollectionParams.LockLevel.COLLECTION; } - // The first requested lock is a write one (on the target object for the action, depending on - // lock level), then requesting read locks on "higher" levels (collection > shard > replica here - // for the level. Note LockLevel "height" is other way around). - boolean requestWriteLock = true; + List callingLockIdList = + DistributedMultiLock.splitLockIds(adminCmdContext.getCallingLockId()); + final CollectionParams.LockLevel[] iterationOrder = { - CollectionParams.LockLevel.REPLICA, + CollectionParams.LockLevel.COLLECTION, CollectionParams.LockLevel.SHARD, - CollectionParams.LockLevel.COLLECTION + CollectionParams.LockLevel.REPLICA }; List locks = new ArrayList<>(iterationOrder.length); + int lockLevelCount = 0; + for (CollectionParams.LockLevel level : iterationOrder) { // This comparison is based on the LockLevel height value that classifies replica > shard > // collection. if (lockLevel.isHigherOrEqual(level)) { - locks.add(lockFactory.createLock(requestWriteLock, level, collName, shardId, replicaName)); - requestWriteLock = false; + // The last requested lock is either a write or read one (on the target object for the + // action, depending on lock level) depending on what the action is. All "higher" levels of + // locks are reads (collection > shard > replica here for the level. Note LockLevel "height" + // is other way around). + boolean requestWriteLock = lockLevel.isEqual(level) && adminCmdContext.getAction().isWrite; + // Find the matching callingLockId for this level, if it was provided. All levels must be + // provided in order by the caller, so when we run out of callingLockIds, we are done + // mirroring and should start getting new locks. + String callingLockIdMatch = null; + if (lockLevelCount < callingLockIdList.size()) { + callingLockIdMatch = callingLockIdList.get(lockLevelCount++); + } + DistributedLock lock = + lockFactory.createLock( + requestWriteLock, level, collName, shardId, replicaName, callingLockIdMatch); + locks.add(lock); } } diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionCommandContext.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionCommandContext.java index 20c3e935e29d..94dae81bc727 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionCommandContext.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionCommandContext.java @@ -58,6 +58,7 @@ public interface CollectionCommandContext { default ShardRequestTracker asyncRequestTracker(AdminCmdContext adminCmdContext) { return new ShardRequestTracker( adminCmdContext.getAsyncId(), + adminCmdContext.getLockId(), getAdminPath(), getZkStateReader(), newShardHandler().getShardHandlerFactory()); diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionHandlingUtils.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionHandlingUtils.java index f5e030d81e54..6ba0bc39a6fe 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionHandlingUtils.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionHandlingUtils.java @@ -17,6 +17,7 @@ package org.apache.solr.cloud.api.collections; +import static org.apache.solr.common.params.CollectionAdminParams.CALLING_LOCK_ID_HEADER; import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE; import static org.apache.solr.common.params.CommonAdminParams.ASYNC; import static org.apache.solr.common.params.CommonParams.NAME; @@ -629,7 +630,7 @@ public static ShardRequestTracker syncRequestTracker( public static ShardRequestTracker syncRequestTracker( AdminCmdContext adminCmdContext, String adminPath, CollectionCommandContext ccc) { - return requestTracker(null, adminPath, ccc); + return requestTracker(null, adminCmdContext.getLockId(), adminPath, ccc); } public static ShardRequestTracker asyncRequestTracker( @@ -639,17 +640,23 @@ public static ShardRequestTracker asyncRequestTracker( public static ShardRequestTracker asyncRequestTracker( AdminCmdContext adminCmdContext, String adminPath, CollectionCommandContext ccc) { - return requestTracker(adminCmdContext.getAsyncId(), adminPath, ccc); + return requestTracker( + adminCmdContext.getAsyncId(), adminCmdContext.getLockId(), adminPath, ccc); } protected static ShardRequestTracker requestTracker( - String asyncId, String adminPath, CollectionCommandContext ccc) { + String asyncId, String lockId, String adminPath, CollectionCommandContext ccc) { return new ShardRequestTracker( - asyncId, adminPath, ccc.getZkStateReader(), ccc.newShardHandler().getShardHandlerFactory()); + asyncId, + lockId, + adminPath, + ccc.getZkStateReader(), + ccc.newShardHandler().getShardHandlerFactory()); } public static class ShardRequestTracker { private final String asyncId; + private final String lockId; private final String adminPath; private final ZkStateReader zkStateReader; private final ShardHandlerFactory shardHandlerFactory; @@ -657,10 +664,12 @@ public static class ShardRequestTracker { public ShardRequestTracker( String asyncId, + String lockId, String adminPath, ZkStateReader zkStateReader, ShardHandlerFactory shardHandlerFactory) { this.asyncId = asyncId; + this.lockId = lockId; this.adminPath = adminPath; this.zkStateReader = zkStateReader; this.shardHandlerFactory = shardHandlerFactory; @@ -733,6 +742,9 @@ public void sendShardRequest( sreq.nodeName = nodeName; sreq.coreNodeName = coreNodeName; sreq.params = params; + if (StrUtils.isNotBlank(lockId)) { + sreq.headers = Map.of(CALLING_LOCK_ID_HEADER, lockId); + } shardHandler.submit(sreq, replica, sreq.params); } diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DistributedCollectionConfigSetCommandRunner.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DistributedCollectionConfigSetCommandRunner.java index 4cc2c2ca1bf8..a626af3c45c6 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DistributedCollectionConfigSetCommandRunner.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DistributedCollectionConfigSetCommandRunner.java @@ -411,6 +411,7 @@ public OverseerSolrResponse call() { // Block this thread until all required locks are acquired. lock.waitUntilAcquired(); + adminCmdContext.withLockId(lock.getLockId()); // Got the lock so moving from submitted to running if we run for an async task (if // asyncId is null the asyncTaskTracker calls do nothing). diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java index 793195c40f35..20d06bd4890c 100644 --- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java +++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java @@ -21,6 +21,7 @@ import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP; import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP; import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP; +import static org.apache.solr.common.params.CollectionAdminParams.CALLING_LOCK_ID_HEADER; import static org.apache.solr.common.params.CommonAdminParams.ASYNC; import static org.apache.solr.common.params.CommonParams.NAME; @@ -113,7 +114,7 @@ public OverseerCollectionMessageHandler( } @Override - public OverseerSolrResponse processMessage(ZkNodeProps message, String operation) { + public OverseerSolrResponse processMessage(ZkNodeProps message, String operation, Lock lock) { // sometimes overseer messages have the collection name in 'name' field, not 'collection' MDCLoggingContext.setCollection( message.getStr(COLLECTION_PROP) != null @@ -128,8 +129,11 @@ public OverseerSolrResponse processMessage(ZkNodeProps message, String operation CollectionAction action = getCollectionAction(operation); CollApiCmds.CollectionApiCommand command = commandMapper.getActionCommand(action); if (command != null) { - AdminCmdContext adminCmdContext = new AdminCmdContext(action, message.getStr(ASYNC)); - adminCmdContext.withClusterState(cloudManager.getClusterState()); + AdminCmdContext adminCmdContext = + new AdminCmdContext(action, message.getStr(ASYNC)) + .withLockId(lock.id()) + .withCallingLockId(message.getStr(CALLING_LOCK_ID_HEADER)) + .withClusterState(cloudManager.getClusterState()); command.call(adminCmdContext, message, results); } else { throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:" + operation); @@ -182,7 +186,7 @@ public String getTaskKey(ZkNodeProps message) { * because it happens that a lock got released). */ @Override - public Lock lockTask(ZkNodeProps message, long batchSessionId) { + public Lock lockTask(ZkNodeProps message, long batchSessionId, String callingLockId) { if (sessionId != batchSessionId) { // this is always called in the same thread. // Each batch is supposed to have a new taskBatch @@ -196,7 +200,8 @@ public Lock lockTask(ZkNodeProps message, long batchSessionId) { Arrays.asList( getTaskKey(message), message.getStr(ZkStateReader.SHARD_ID_PROP), - message.getStr(ZkStateReader.REPLICA_PROP))); + message.getStr(ZkStateReader.REPLICA_PROP)), + callingLockId); } @Override diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java index 0fa337f63622..be9f394d26c5 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java @@ -34,6 +34,7 @@ import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR; import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP; import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP; +import static org.apache.solr.common.params.CollectionAdminParams.CALLING_LOCK_ID_HEADER; import static org.apache.solr.common.params.CollectionAdminParams.COLLECTION; import static org.apache.solr.common.params.CollectionAdminParams.CREATE_NODE_SET_PARAM; import static org.apache.solr.common.params.CollectionAdminParams.FOLLOW_ALIASES; @@ -162,6 +163,7 @@ import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.Pair; import org.apache.solr.common.util.SimpleOrderedMap; +import org.apache.solr.common.util.StrUtils; import org.apache.solr.common.util.Utils; import org.apache.solr.core.CloudConfig; import org.apache.solr.core.CoreContainer; @@ -319,7 +321,7 @@ void invokeAction( } AdminCmdContext adminCmdContext = - new AdminCmdContext(operation.action, req.getParams().get(ASYNC)); + new AdminCmdContext(operation.action, req.getParams().get(ASYNC), req); ZkNodeProps zkProps = new ZkNodeProps(props); final SolrResponse overseerResponse; @@ -366,6 +368,9 @@ public static SolrResponse submitCollectionApiCommand( if (adminCmdContext.getAsyncId() != null && !adminCmdContext.getAsyncId().isBlank()) { additionalProps.put(ASYNC, adminCmdContext.getAsyncId()); } + if (StrUtils.isNotBlank(adminCmdContext.getCallingLockId())) { + additionalProps.put(CALLING_LOCK_ID_HEADER, adminCmdContext.getCallingLockId()); + } m = m.plus(additionalProps); if (adminCmdContext.getAsyncId() != null) { String asyncId = adminCmdContext.getAsyncId(); diff --git a/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java b/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java index 26afe771d5fc..bff0be76ad3e 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java @@ -458,9 +458,9 @@ private void rejoinElectionQueue( String asyncId = REBALANCELEADERS.toLower() + "_" + core + "_" + Math.abs(System.nanoTime()); asyncRequests.add(asyncId); + // ignore response; we construct our own collectionsHandler.submitCollectionApiCommand( - new AdminCmdContext(REBALANCELEADERS, asyncId), - new ZkNodeProps(propMap)); // ignore response; we construct our own + new AdminCmdContext(REBALANCELEADERS, asyncId, req), new ZkNodeProps(propMap)); } // maxWaitSecs - How long are we going to wait? Defaults to 30 seconds. diff --git a/solr/core/src/java/org/apache/solr/handler/admin/api/AdminAPIBase.java b/solr/core/src/java/org/apache/solr/handler/admin/api/AdminAPIBase.java index 3d627c4cfed4..faee1e61d72e 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/api/AdminAPIBase.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/api/AdminAPIBase.java @@ -138,7 +138,7 @@ protected SolrResponse submitRemoteMessageAndHandleException( ZkNodeProps remoteMessage) throws Exception { return submitRemoteMessageAndHandleException( - response, new AdminCmdContext(action, null), remoteMessage); + response, new AdminCmdContext(action, null, solrQueryRequest), remoteMessage); } protected SolrResponse submitRemoteMessageAndHandleAsync( @@ -149,7 +149,7 @@ protected SolrResponse submitRemoteMessageAndHandleAsync( throws Exception { var remoteResponse = submitRemoteMessageAndHandleException( - response, new AdminCmdContext(action, asyncId), remoteMessage); + response, new AdminCmdContext(action, asyncId, solrQueryRequest), remoteMessage); if (asyncId != null) { response.requestId = asyncId; diff --git a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java index df0f39ccea5c..431652720277 100644 --- a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java @@ -188,6 +188,9 @@ private LBSolrClient.Req prepareLBRequest( params.remove(CommonParams.WT); // use default (currently javabin) QueryRequest req = createQueryRequest(sreq, params, shard); req.setMethod(SolrRequest.METHOD.POST); + if (sreq.headers != null) { + req.addHeaders(sreq.headers); + } SolrRequestInfo requestInfo = SolrRequestInfo.getRequestInfo(); if (requestInfo != null) { req.setUserPrincipal(requestInfo.getUserPrincipal()); diff --git a/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java b/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java index cfcc58907e82..3e638f56de90 100644 --- a/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java +++ b/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java @@ -19,6 +19,7 @@ import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP; import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP; import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP; +import static org.apache.solr.common.params.CollectionAdminParams.CALLING_LOCK_ID_HEADER; import static org.apache.solr.security.AuditEvent.EventType.COMPLETED; import static org.apache.solr.security.AuditEvent.EventType.ERROR; import static org.apache.solr.servlet.HttpSolrCall.Action.ADMIN; @@ -738,6 +739,10 @@ protected QueryResponseWriter getResponseWriter() { protected void handleAdmin(SolrQueryResponse solrResp) { SolrCore.preDecorateResponse(solrReq, solrResp); + String callingLockId = req.getHeader(CALLING_LOCK_ID_HEADER); + if (callingLockId != null && !callingLockId.isBlank()) { + solrReq.getContext().put(CALLING_LOCK_ID_HEADER, callingLockId); + } handler.handleRequest(solrReq, solrResp); SolrCore.postDecorateResponse(handler, solrReq, solrResp); } diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java index 371568db8697..2608a4e86350 100644 --- a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java @@ -22,6 +22,7 @@ import static org.mockito.Mockito.anyLong; import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.eq; import static org.mockito.Mockito.isNull; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; @@ -57,6 +58,7 @@ import org.apache.solr.cloud.Overseer.LeaderStatus; import org.apache.solr.cloud.OverseerTaskQueue.QueueEvent; import org.apache.solr.cloud.api.collections.CollectionHandlingUtils; +import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler; import org.apache.solr.cluster.placement.PlacementPluginFactory; import org.apache.solr.cluster.placement.plugins.SimplePlacementFactory; import org.apache.solr.common.MapWriter; @@ -74,6 +76,7 @@ import org.apache.solr.common.params.CoreAdminParams; import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction; import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.util.IOUtils; import org.apache.solr.common.util.ObjectCache; import org.apache.solr.common.util.StrUtils; import org.apache.solr.common.util.TimeSource; @@ -1431,4 +1434,109 @@ public void testFloodQueue() throws Exception { waitForEmptyQueue(); stopProcessor(); } + + /** + * Verify that when lockTask throws a SolrException due to a callingLockId from an unrelated + * collection, the async task is properly marked as failed rather than silently swallowed or + * retried forever. This test exercises the real LockTree validation code path. + */ + @Test + public void testLockTaskExceptionFailsAsyncTask() throws Exception { + commonMocks(2, false); + + String asyncId = "lock-fail-test-async-123"; + + // Create a real OverseerCollectionMessageHandler so we exercise the real LockTree locking + OverseerCollectionMessageHandler collHandler = + new OverseerCollectionMessageHandler( + zkStateReaderMock, + "1234", + shardHandlerFactoryMock, + ADMIN_PATH, + new Stats(), + overseerMock, + new OverseerNodePrioritizer( + zkStateReaderMock, overseerMock, ADMIN_PATH, shardHandlerFactoryMock)); + + // Acquire a lock on collA by calling lockTask directly. + // This puts a real lock into the LockTree's allLocks map. + ZkNodeProps collAMessage = + new ZkNodeProps( + Map.of( + Overseer.QUEUE_OPERATION, + CollectionParams.CollectionAction.MOCK_COLL_TASK.toLower(), + "name", + "collA")); + OverseerMessageHandler.Lock collALock = collHandler.lockTask(collAMessage, 1, null); + assertNotNull("Should have acquired lock on collA", collALock); + String collALockId = collALock.id(); + + // Build a selector that always returns our real handler + OverseerTaskProcessor.OverseerMessageHandlerSelector selector = + new OverseerTaskProcessor.OverseerMessageHandlerSelector() { + @Override + public void close() { + IOUtils.closeQuietly(collHandler); + } + + @Override + public OverseerMessageHandler selectOverseerMessageHandler(ZkNodeProps message) { + return collHandler; + } + }; + + // Create a processor using the real handler + OverseerTaskProcessor processor = + new OverseerTaskProcessor( + zkStateReaderMock, + "1234", + new Stats(), + selector, + mock(OverseerNodePrioritizer.class), + workQueueMock, + runningMapMock, + completedMapMock, + failureMapMock, + solrMetricsContextMock) { + @Override + protected LeaderStatus amILeader() { + return LeaderStatus.YES; + } + }; + + Thread processorThread = new Thread(processor); + processorThread.start(); + + try { + // Submit an async task for collB, but with collA's lock ID as the callingLockId. + // The real LockTree will find collA's lock, call validateSubpath, and throw SolrException + // because collB != collA. + Map propMap = + Map.of( + Overseer.QUEUE_OPERATION, + CollectionParams.CollectionAction.MOCK_COLL_TASK.toLower(), + "name", + "collB", + CollectionAdminParams.CALLING_LOCK_ID_HEADER, + collALockId, + "async", + asyncId); + ZkNodeProps props = new ZkNodeProps(propMap); + QueueEvent qe = new QueueEvent("lockFailTask", Utils.toJSON(props), null); + queue.add(qe); + + waitForEmptyQueue(); + + // Verify the task was put in the failure map + verify(failureMapMock, times(1)).put(eq(asyncId), any(byte[].class)); + + // Verify the task was NOT put in the running map (it should fail before reaching that point) + verify(runningMapMock, times(0)).put(eq(asyncId), any()); + } finally { + collALock.unlock(); + processor.close(); + processorThread.interrupt(); + processorThread.join(MAX_WAIT_MS); + } + } } diff --git a/solr/core/src/test/org/apache/solr/cloud/TestLockTree.java b/solr/core/src/test/org/apache/solr/cloud/TestLockTree.java index 2604feadd65f..f56da34f780f 100644 --- a/solr/core/src/test/org/apache/solr/cloud/TestLockTree.java +++ b/solr/core/src/test/org/apache/solr/cloud/TestLockTree.java @@ -31,6 +31,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import org.apache.solr.SolrTestCaseJ4; import org.apache.solr.cloud.OverseerMessageHandler.Lock; +import org.apache.solr.common.SolrException; import org.apache.solr.common.params.CollectionParams.CollectionAction; import org.apache.solr.common.util.Pair; import org.slf4j.Logger; @@ -41,23 +42,26 @@ public class TestLockTree extends SolrTestCaseJ4 { public void testLocks() throws Exception { LockTree lockTree = new LockTree(); - Lock coll1Lock = lockTree.getSession().lock(CollectionAction.CREATE, Arrays.asList("coll1")); + Lock coll1Lock = + lockTree.getSession().lock(CollectionAction.CREATE, Arrays.asList("coll1"), null); assertNotNull(coll1Lock); assertNull( "Should not be able to lock coll1/shard1", lockTree .getSession() - .lock(CollectionAction.BALANCESHARDUNIQUE, Arrays.asList("coll1", "shard1"))); + .lock(CollectionAction.BALANCESHARDUNIQUE, Arrays.asList("coll1", "shard1"), null)); coll1Lock.unlock(); Lock shard1Lock = lockTree .getSession() - .lock(CollectionAction.BALANCESHARDUNIQUE, Arrays.asList("coll1", "shard1")); + .lock(CollectionAction.BALANCESHARDUNIQUE, Arrays.asList("coll1", "shard1"), null); assertNotNull(shard1Lock); shard1Lock.unlock(); Lock replica1Lock = - lockTree.getSession().lock(ADDREPLICAPROP, Arrays.asList("coll1", "shard1", "core_node2")); + lockTree + .getSession() + .lock(ADDREPLICAPROP, Arrays.asList("coll1", "shard1", "core_node2"), null); assertNotNull(replica1Lock); List>> operations = new ArrayList<>(); @@ -80,7 +84,7 @@ public void testLocks() throws Exception { List locks = new CopyOnWriteArrayList<>(); List threads = new ArrayList<>(); for (Pair> operation : operations) { - final Lock lock = session.lock(operation.first(), operation.second()); + final Lock lock = session.lock(operation.first(), operation.second(), null); if (lock != null) { Thread thread = new Thread(getRunnable(completedOps, operation, locks, lock)); threads.add(thread); @@ -106,6 +110,123 @@ public void testLocks() throws Exception { } } + public void testCallingLockIdSubLocks() throws Exception { + LockTree lockTree = new LockTree(); + Lock coll1Lock = lockTree.getSession().lock(CollectionAction.CREATE, List.of("coll1"), null); + assertNotNull(coll1Lock); + + // Test sub-locks at the same level + assertNull( + "Should not be able to lock coll1 without using a callingLockId", + lockTree.getSession().lock(CollectionAction.RELOAD, List.of("coll1"), null)); + Lock coll1Lock2 = + lockTree.getSession().lock(CollectionAction.RELOAD, List.of("coll1"), coll1Lock.id()); + assertNotNull(coll1Lock2); + coll1Lock2.unlock(); + + // Test locks underneath + Lock shard1Lock = + lockTree + .getSession() + .lock(CollectionAction.ADDREPLICA, List.of("coll1", "shard1"), coll1Lock.id()); + assertNotNull(shard1Lock); + assertNull( + "Should not be able to lock coll1/shard1 since our callingLockId is only coll1, not shard1", + lockTree + .getSession() + .lock(CollectionAction.ADDREPLICA, List.of("coll1", "shard1"), coll1Lock.id())); + Lock shard2Lock = + lockTree + .getSession() + .lock(CollectionAction.ADDREPLICA, List.of("coll1", "shard2"), coll1Lock.id()); + assertNotNull(shard2Lock); + shard2Lock.unlock(); + shard1Lock.unlock(); + + // Test locks 2 underneath + Lock replica1Lock = + lockTree + .getSession() + .lock(MOCK_REPLICA_TASK, List.of("coll1", "shard1", "replica1"), coll1Lock.id()); + assertNull( + "Should not be able to lock coll1/shard1/replica1 since our callingLockId is only coll1, not replica1, which is already locked", + lockTree + .getSession() + .lock( + CollectionAction.MOCK_REPLICA_TASK, + List.of("coll1", "shard1", "replica1"), + coll1Lock.id())); + assertNull( + "Should not be able to lock coll1/shard1 since our callingLockId is only coll1, not shard1, which is locked because of a replica task", + lockTree + .getSession() + .lock(CollectionAction.ADDREPLICA, List.of("coll1", "shard1"), coll1Lock.id())); + assertNotNull(replica1Lock); + Lock replica2Lock = + lockTree + .getSession() + .lock(MOCK_REPLICA_TASK, List.of("coll1", "shard1", "replica2"), coll1Lock.id()); + assertNotNull(replica2Lock); + replica2Lock.unlock(); + replica1Lock.unlock(); + coll1Lock.unlock(); + + // Test difference at a higher level + Lock shard1Lock1 = + lockTree + .getSession() + .lock(CollectionAction.INSTALLSHARDDATA, List.of("coll1", "shard1"), null); + assertNotNull(shard1Lock1); + Lock shard1Lock2 = + lockTree + .getSession() + .lock(CollectionAction.INSTALLSHARDDATA, List.of("coll2", "shard1"), null); + assertNotNull(shard1Lock2); + assertThrows( + "Should not be able to lock coll1/shard1 since our callingLockId is coll2", + SolrException.class, + () -> + lockTree + .getSession() + .lock(CollectionAction.SYNCSHARD, List.of("coll1", "shard1"), shard1Lock2.id())); + Lock shard1Lock3 = + lockTree + .getSession() + .lock(CollectionAction.SYNCSHARD, List.of("coll1", "shard1"), shard1Lock1.id()); + assertNotNull(shard1Lock3); + shard1Lock2.unlock(); + shard1Lock3.unlock(); + + // Test difference at a higher level + assertNull( + "Should not be able to lock coll1 since we have no callingLockId and shard1 is already locked. Cannot move up", + lockTree.getSession().lock(CollectionAction.RELOAD, List.of("coll1"), null)); + assertThrows( + "Should not be able to lock coll1 since our callingLockId is coll1/shard1. Cannot move up", + SolrException.class, + () -> + lockTree + .getSession() + .lock(CollectionAction.RELOAD, List.of("coll1"), shard1Lock1.id())); + + // Test an unrelated lock + assertThrows( + "Should not be able to lock coll2 since our callingLockId is coll1/shard1. Cannot lock an unrelated resource", + SolrException.class, + () -> + lockTree + .getSession() + .lock(CollectionAction.CREATE, List.of("coll2"), shard1Lock1.id())); + assertThrows( + "Should not be able to lock coll1/shard2 since our callingLockId is coll1/shard1. Cannot lock an unrelated resource", + SolrException.class, + () -> + lockTree + .getSession() + .lock(CollectionAction.SYNCSHARD, List.of("coll1", "shard2"), shard1Lock1.id())); + shard1Lock1.unlock(); + } + private Runnable getRunnable( List>> completedOps, Pair> operation, diff --git a/solr/core/src/test/org/apache/solr/cloud/ZkDistributedLockTest.java b/solr/core/src/test/org/apache/solr/cloud/ZkDistributedLockTest.java index 2c767b91fef3..e31f042bd090 100644 --- a/solr/core/src/test/org/apache/solr/cloud/ZkDistributedLockTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/ZkDistributedLockTest.java @@ -75,17 +75,17 @@ private void monothreadedCollectionTests(DistributedCollectionLockFactory factor // Collection level locks DistributedLock collRL1 = factory.createLock( - false, CollectionParams.LockLevel.COLLECTION, COLLECTION_NAME, null, null); + false, CollectionParams.LockLevel.COLLECTION, COLLECTION_NAME, null, null, null); assertTrue("collRL1 should have been acquired", collRL1.isAcquired()); DistributedLock collRL2 = factory.createLock( - false, CollectionParams.LockLevel.COLLECTION, COLLECTION_NAME, null, null); + false, CollectionParams.LockLevel.COLLECTION, COLLECTION_NAME, null, null, null); assertTrue("collRL1 should have been acquired", collRL2.isAcquired()); DistributedLock collWL3 = factory.createLock( - true, CollectionParams.LockLevel.COLLECTION, COLLECTION_NAME, null, null); + true, CollectionParams.LockLevel.COLLECTION, COLLECTION_NAME, null, null, null); assertFalse( "collWL3 should not have been acquired, due to collRL1 and collRL2", collWL3.isAcquired()); @@ -100,7 +100,7 @@ private void monothreadedCollectionTests(DistributedCollectionLockFactory factor DistributedLock collRL4 = factory.createLock( - false, CollectionParams.LockLevel.COLLECTION, COLLECTION_NAME, null, null); + false, CollectionParams.LockLevel.COLLECTION, COLLECTION_NAME, null, null, null); assertFalse( "collRL4 should not have been acquired, due to collWL3 locking the collection", collRL4.isAcquired()); @@ -110,14 +110,14 @@ private void monothreadedCollectionTests(DistributedCollectionLockFactory factor // should see no impact. DistributedLock shardWL5 = factory.createLock( - true, CollectionParams.LockLevel.SHARD, COLLECTION_NAME, SHARD_NAME, null); + true, CollectionParams.LockLevel.SHARD, COLLECTION_NAME, SHARD_NAME, null, null); assertTrue( "shardWL5 should have been acquired, there is no lock on that shard", shardWL5.isAcquired()); DistributedLock shardWL6 = factory.createLock( - true, CollectionParams.LockLevel.SHARD, COLLECTION_NAME, SHARD_NAME, null); + true, CollectionParams.LockLevel.SHARD, COLLECTION_NAME, SHARD_NAME, null, null); assertFalse( "shardWL6 should not have been acquired, shardWL5 is locking that shard", shardWL6.isAcquired()); @@ -125,12 +125,22 @@ private void monothreadedCollectionTests(DistributedCollectionLockFactory factor // Get a lock on a Replica. Again this is independent of collection or shard level DistributedLock replicaRL7 = factory.createLock( - false, CollectionParams.LockLevel.REPLICA, COLLECTION_NAME, SHARD_NAME, REPLICA_NAME); + false, + CollectionParams.LockLevel.REPLICA, + COLLECTION_NAME, + SHARD_NAME, + REPLICA_NAME, + null); assertTrue("replicaRL7 should have been acquired", replicaRL7.isAcquired()); DistributedLock replicaWL8 = factory.createLock( - true, CollectionParams.LockLevel.REPLICA, COLLECTION_NAME, SHARD_NAME, REPLICA_NAME); + true, + CollectionParams.LockLevel.REPLICA, + COLLECTION_NAME, + SHARD_NAME, + REPLICA_NAME, + null); assertFalse( "replicaWL8 should not have been acquired, replicaRL7 is read locking that replica", replicaWL8.isAcquired()); @@ -164,13 +174,13 @@ private void multithreadedCollectionTests(DistributedCollectionLockFactory facto // Acquiring right away a read lock DistributedLock readLock = factory.createLock( - false, CollectionParams.LockLevel.COLLECTION, COLLECTION_NAME, null, null); + false, CollectionParams.LockLevel.COLLECTION, COLLECTION_NAME, null, null, null); assertTrue("readLock should have been acquired", readLock.isAcquired()); // And now creating a write lock, that can't be acquired just yet, because of the read lock DistributedLock writeLock = factory.createLock( - true, CollectionParams.LockLevel.COLLECTION, COLLECTION_NAME, null, null); + true, CollectionParams.LockLevel.COLLECTION, COLLECTION_NAME, null, null, null); assertFalse("writeLock should not have been acquired", writeLock.isAcquired()); // Wait for acquisition of the write lock on another thread (and be notified via a latch) diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionApiLockingTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionApiLockingTest.java index 7f6f7e7daf42..ee0da8853b82 100644 --- a/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionApiLockingTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionApiLockingTest.java @@ -17,12 +17,14 @@ package org.apache.solr.cloud.api.collections; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.solr.SolrTestCaseJ4; import org.apache.solr.cloud.DistributedMultiLock; import org.apache.solr.cloud.ZkDistributedCollectionLockFactory; import org.apache.solr.cloud.ZkTestServer; +import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.params.CollectionParams; import org.junit.Test; @@ -54,6 +56,7 @@ public void monothreadedApiLockTests() throws Exception { monothreadedTests(apiLockFactory); multithreadedTests(apiLockFactory); + testCallingLockIdSubLocks(apiLockFactory); } } finally { server.shutdown(); @@ -229,4 +232,201 @@ private void multithreadedTests(CollectionApiLockFactory apiLockingHelper) throw assertEquals( "we should have been notified that replica lock was acquired", 0, latch.getCount()); } + + public void testCallingLockIdSubLocks(CollectionApiLockFactory apiLockingHelper) + throws Exception { + DistributedMultiLock coll1Lock = + apiLockingHelper.createCollectionApiLock( + new AdminCmdContext(CollectionParams.CollectionAction.CREATE), "coll1", null, null); + assertEquals("Wrong number of internalLocks", 1, coll1Lock.getCountInternalLocks()); + assertTrue("Lock should be acquired", coll1Lock.isAcquired()); + + // Test sub-locks at the same level + DistributedMultiLock coll1Lock2 = + apiLockingHelper.createCollectionApiLock( + new AdminCmdContext(CollectionParams.CollectionAction.RELOAD), "coll1", null, null); + assertEquals("Wrong number of internalLocks", 1, coll1Lock2.getCountInternalLocks()); + assertFalse( + "Should not be able to lock coll1 without using a callingLockId", coll1Lock2.isAcquired()); + coll1Lock2.release(); + coll1Lock2 = + apiLockingHelper.createCollectionApiLock( + new AdminCmdContext(CollectionParams.CollectionAction.RELOAD) + .withCallingLockId(coll1Lock.getLockId()), + "coll1", + null, + null); + assertEquals("Wrong number of internalLocks", 1, coll1Lock2.getCountInternalLocks()); + assertTrue("Should be able to lock coll1 when using a callingLockId", coll1Lock2.isAcquired()); + coll1Lock2.release(); + + // Test locks underneath + DistributedMultiLock shard1Lock = + apiLockingHelper.createCollectionApiLock( + new AdminCmdContext(CollectionParams.CollectionAction.ADDREPLICA) + .withCallingLockId(coll1Lock.getLockId()), + "coll1", + "shard1", + null); + assertEquals("Wrong number of internalLocks", 2, shard1Lock.getCountInternalLocks()); + assertTrue( + "Should be able to lock coll1/shard1 when using a callingLockId on coll1", + shard1Lock.isAcquired()); + DistributedMultiLock shard1Lock2 = + apiLockingHelper.createCollectionApiLock( + new AdminCmdContext(CollectionParams.CollectionAction.ADDREPLICA) + .withCallingLockId(coll1Lock.getLockId()), + "coll1", + "shard1", + null); + assertEquals("Wrong number of internalLocks", 2, shard1Lock2.getCountInternalLocks()); + assertFalse( + "Should not be able to lock coll1/shard1 since our callingLockId is only coll1, not shard1, since shard1 is already locked", + shard1Lock2.isAcquired()); + shard1Lock2.release(); + DistributedMultiLock shard2Lock = + apiLockingHelper.createCollectionApiLock( + new AdminCmdContext(CollectionParams.CollectionAction.ADDREPLICA) + .withCallingLockId(coll1Lock.getLockId()), + "coll1", + "shard2", + null); + assertEquals("Wrong number of internalLocks", 2, shard2Lock.getCountInternalLocks()); + assertTrue( + "Should be able to lock coll1/shard2 when using a callingLockId on coll1, since shard2 has not been locked yet", + shard2Lock.isAcquired()); + shard2Lock.release(); + shard1Lock.release(); + + // Test locks 2 underneath + DistributedMultiLock replica1Lock = + apiLockingHelper.createCollectionApiLock( + new AdminCmdContext(CollectionParams.CollectionAction.MOCK_REPLICA_TASK) + .withCallingLockId(coll1Lock.getLockId()), + "coll1", + "shard1", + "replica1"); + assertEquals("Wrong number of internalLocks", 3, replica1Lock.getCountInternalLocks()); + assertTrue( + "Should be able to lock shard1/replica1 when using a callingLockId on coll1", + replica1Lock.isAcquired()); + DistributedMultiLock replica2Lock = + apiLockingHelper.createCollectionApiLock( + new AdminCmdContext(CollectionParams.CollectionAction.MOCK_REPLICA_TASK) + .withCallingLockId(coll1Lock.getLockId()), + "coll1", + "shard1", + "replica1"); + assertEquals("Wrong number of internalLocks", 3, replica2Lock.getCountInternalLocks()); + assertFalse( + "Should not be able to lock coll1/shard1/replica1 since our callingLockId is only coll1, not replica1, which is already locked", + replica2Lock.isAcquired()); + replica2Lock.release(); + shard1Lock = + apiLockingHelper.createCollectionApiLock( + new AdminCmdContext(CollectionParams.CollectionAction.ADDREPLICA) + .withCallingLockId(coll1Lock.getLockId()), + "coll1", + "shard1", + null); + assertEquals("Wrong number of internalLocks", 2, shard1Lock.getCountInternalLocks()); + assertFalse( + "Should not be able to lock coll1/shard1 since our callingLockId is only coll1, not shard1, which is locked because of a replica task", + shard1Lock.isAcquired()); + shard1Lock.release(); + replica2Lock = + apiLockingHelper.createCollectionApiLock( + new AdminCmdContext(CollectionParams.CollectionAction.MOCK_REPLICA_TASK) + .withCallingLockId(coll1Lock.getLockId()), + "coll1", + "shard1", + "replica2"); + assertEquals("Wrong number of internalLocks", 3, replica2Lock.getCountInternalLocks()); + assertTrue( + "Should be able to lock shard2/replica2 when using a callingLockId on coll1, since shard2 has not been locked yet", + replica2Lock.isAcquired()); + replica2Lock.release(); + replica1Lock.release(); + coll1Lock.release(); + + // Test difference at a higher level + DistributedMultiLock shard1Lock1 = + apiLockingHelper.createCollectionApiLock( + new AdminCmdContext(CollectionParams.CollectionAction.INSTALLSHARDDATA), + "coll1", + "shard1", + null); + assertEquals("Wrong number of internalLocks", 2, shard1Lock1.getCountInternalLocks()); + assertTrue( + "Should be able to lock coll1/shard1 when not using a callingLockId since shard1 has not been locked yet", + shard1Lock1.isAcquired()); + shard1Lock2 = + apiLockingHelper.createCollectionApiLock( + new AdminCmdContext(CollectionParams.CollectionAction.INSTALLSHARDDATA), + "coll2", + "shard1", + null); + assertEquals("Wrong number of internalLocks", 2, shard1Lock2.getCountInternalLocks()); + assertTrue( + "Should be able to lock coll2/shard1 when not using a callingLockId since shard1 has not been locked yet for coll2", + shard1Lock2.isAcquired()); + String badLockId = shard1Lock2.getLockId(); + assertThrows( + "Should not be able to lock coll1/shard1 since our callingLockId is coll2", + SolrException.class, + () -> + apiLockingHelper.createCollectionApiLock( + new AdminCmdContext(CollectionParams.CollectionAction.INSTALLSHARDDATA) + .withCallingLockId(badLockId), + "coll1", + "shard1", + null)); + DistributedMultiLock shard1Lock3 = + apiLockingHelper.createCollectionApiLock( + new AdminCmdContext(CollectionParams.CollectionAction.INSTALLSHARDDATA) + .withCallingLockId(shard1Lock1.getLockId()), + "coll1", + "shard1", + null); + assertEquals("Wrong number of internalLocks", 2, shard1Lock3.getCountInternalLocks()); + assertTrue( + "Should be able to lock coll1/shard1 since our callingLockId is coll1/shard1", + shard1Lock3.isAcquired()); + shard1Lock2.release(); + shard1Lock3.release(); + + // Test difference at a higher level + coll1Lock2 = + apiLockingHelper.createCollectionApiLock( + new AdminCmdContext(CollectionParams.CollectionAction.RELOAD), "coll1", null, null); + assertEquals("Wrong number of internalLocks", 1, coll1Lock2.getCountInternalLocks()); + assertFalse( + "Should not be able to lock coll1 since we have no callingLockId and shard1 is already locked. Cannot move up", + coll1Lock2.isAcquired()); + coll1Lock2.release(); + assertExceptionThrownWithMessageContaining( + SolrException.class, + List.of("Cannot mirror lock"), + () -> + apiLockingHelper.createCollectionApiLock( + new AdminCmdContext(CollectionParams.CollectionAction.RELOAD) + .withCallingLockId(shard1Lock1.getLockId()), + "coll1", + null, + null)); + shard1Lock3.release(); + + // Test an unrelated lock + assertThrows( + "Should not be able to lock coll2even since callingLockId is coll1 and unrelated", + SolrException.class, + () -> + apiLockingHelper.createCollectionApiLock( + new AdminCmdContext(CollectionParams.CollectionAction.CREATE) + .withCallingLockId(shard1Lock1.getLockId()), + "coll2", + null, + null)); + shard1Lock1.release(); + } } diff --git a/solr/solrj/src/java/org/apache/solr/common/params/CollectionAdminParams.java b/solr/solrj/src/java/org/apache/solr/common/params/CollectionAdminParams.java index 74a8e5dd869c..050e32bb977a 100644 --- a/solr/solrj/src/java/org/apache/solr/common/params/CollectionAdminParams.java +++ b/solr/solrj/src/java/org/apache/solr/common/params/CollectionAdminParams.java @@ -124,4 +124,6 @@ public interface CollectionAdminParams { String PROPERTY_PREFIX = "property."; String PER_REPLICA_STATE = CollectionStateProps.PER_REPLICA_STATE; + + String CALLING_LOCK_ID_HEADER = "callingLockId"; } diff --git a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java index 6ae82508df4b..a066ff1e89b3 100644 --- a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java +++ b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java @@ -69,6 +69,10 @@ public int getHeight() { public boolean isHigherOrEqual(LockLevel that) { return height >= that.height; } + + public boolean isEqual(LockLevel that) { + return height == that.height; + } } /** @@ -136,7 +140,7 @@ enum CollectionAction { // TODO when we have a node level lock use it here BALANCE_REPLICAS(true, LockLevel.NONE), DELETENODE(true, LockLevel.NONE), - MOCK_REPLICA_TASK(false, LockLevel.REPLICA), + MOCK_REPLICA_TASK(true, LockLevel.REPLICA), NONE(false, LockLevel.NONE), // TODO: not implemented yet MERGESHARDS(true, LockLevel.SHARD),