Commit c25427ce authored by Yufei Gu's avatar Yufei Gu Committed by Andrew Wang
Browse files

YARN-7290. Method canContainerBePreempted can return true when it shouldn't....

YARN-7290. Method canContainerBePreempted can return true when it shouldn't. (Contributed by Steven Rand)

(cherry picked from commit 2bde3aed)
(cherry picked from commit f335d509)
No related merge requests found
Showing with 93 additions and 35 deletions
+93 -35
......@@ -588,7 +588,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
}
}
boolean canContainerBePreempted(RMContainer container) {
boolean canContainerBePreempted(RMContainer container,
Resource alreadyConsideringForPreemption) {
if (!isPreemptable()) {
return false;
}
......@@ -610,6 +611,15 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
// Check if the app's allocation will be over its fairshare even
// after preempting this container
Resource usageAfterPreemption = getUsageAfterPreemptingContainer(
container.getAllocatedResource(),
alreadyConsideringForPreemption);
return !isUsageBelowShare(usageAfterPreemption, getFairShare());
}
private Resource getUsageAfterPreemptingContainer(Resource containerResources,
Resource alreadyConsideringForPreemption) {
Resource usageAfterPreemption = Resources.clone(getResourceUsage());
// Subtract resources of containers already queued for preemption
......@@ -617,10 +627,13 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
Resources.subtractFrom(usageAfterPreemption, resourcesToBePreempted);
}
// Subtract this container's allocation to compute usage after preemption
Resources.subtractFrom(
usageAfterPreemption, container.getAllocatedResource());
return !isUsageBelowShare(usageAfterPreemption, getFairShare());
// Subtract resources of this container and other containers of this app
// that the FSPreemptionThread is already considering for preemption.
Resources.subtractFrom(usageAfterPreemption, containerResources);
Resources.subtractFrom(usageAfterPreemption,
alreadyConsideringForPreemption);
return usageAfterPreemption;
}
/**
......
......@@ -19,7 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
......@@ -29,7 +29,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.locks.Lock;
......@@ -130,10 +132,21 @@ class FSPreemptionThread extends Thread {
}
} // End of iteration through nodes for one RR
if (bestContainers != null && bestContainers.containers.size() > 0) {
containersToPreempt.addAll(bestContainers.containers);
// Reserve the containers for the starved app
trackPreemptionsAgainstNode(bestContainers.containers, starvedApp);
if (bestContainers != null) {
List<RMContainer> containers = bestContainers.getAllContainers();
if (containers.size() > 0) {
containersToPreempt.addAll(containers);
// Reserve the containers for the starved app
trackPreemptionsAgainstNode(containers, starvedApp);
// Warn application about containers to be killed
for (RMContainer container : containers) {
FSAppAttempt app = scheduler.getSchedulerApp(
container.getApplicationAttemptId());
LOG.info("Preempting container " + container +
" from queue " + app.getQueueName());
app.trackContainerForPreemption(container);
}
}
}
}
} // End of iteration over RRs
......@@ -170,10 +183,12 @@ class FSPreemptionThread extends Thread {
for (RMContainer container : containersToCheck) {
FSAppAttempt app =
scheduler.getSchedulerApp(container.getApplicationAttemptId());
ApplicationId appId = app.getApplicationId();
if (app.canContainerBePreempted(container)) {
if (app.canContainerBePreempted(container,
preemptableContainers.getResourcesToPreemptForApp(appId))) {
// Flag container for preemption
if (!preemptableContainers.addContainer(container)) {
if (!preemptableContainers.addContainer(container, appId)) {
return null;
}
......@@ -199,15 +214,6 @@ class FSPreemptionThread extends Thread {
}
private void preemptContainers(List<RMContainer> containers) {
// Warn application about containers to be killed
for (RMContainer container : containers) {
ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId);
LOG.info("Preempting container " + container +
" from queue " + app.getQueueName());
app.trackContainerForPreemption(container);
}
// Schedule timer task to kill containers
preemptionTimer.schedule(
new PreemptContainersTask(containers), warnTimeBeforeKill);
......@@ -237,14 +243,14 @@ class FSPreemptionThread extends Thread {
* A class to track preemptable containers.
*/
private static class PreemptableContainers {
List<RMContainer> containers;
Map<ApplicationId, List<RMContainer>> containersByApp;
int numAMContainers;
int maxAMContainers;
PreemptableContainers(int maxAMContainers) {
containers = new ArrayList<>();
numAMContainers = 0;
this.maxAMContainers = maxAMContainers;
this.containersByApp = new HashMap<>();
}
/**
......@@ -254,7 +260,7 @@ class FSPreemptionThread extends Thread {
* @param container the container to add
* @return true if success; false otherwise
*/
private boolean addContainer(RMContainer container) {
private boolean addContainer(RMContainer container, ApplicationId appId) {
if (container.isAMContainer()) {
numAMContainers++;
if (numAMContainers >= maxAMContainers) {
......@@ -262,8 +268,30 @@ class FSPreemptionThread extends Thread {
}
}
containers.add(container);
if (!containersByApp.containsKey(appId)) {
containersByApp.put(appId, new ArrayList<>());
}
containersByApp.get(appId).add(container);
return true;
}
private List<RMContainer> getAllContainers() {
List<RMContainer> allContainers = new ArrayList<>();
for (List<RMContainer> containersForApp : containersByApp.values()) {
allContainers.addAll(containersForApp);
}
return allContainers;
}
private Resource getResourcesToPreemptForApp(ApplicationId appId) {
Resource resourcesToPreempt = Resources.createResource(0, 0);
if (containersByApp.containsKey(appId)) {
for (RMContainer container : containersByApp.get(appId)) {
Resources.addTo(resourcesToPreempt, container.getAllocatedResource());
}
}
return resourcesToPreempt;
}
}
}
......@@ -278,11 +278,12 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
preemptHalfResources(queue2);
}
private void verifyPreemption(int numStarvedAppContainers)
private void verifyPreemption(int numStarvedAppContainers,
int numGreedyAppContainers)
throws InterruptedException {
// Sleep long enough for four containers to be preempted.
for (int i = 0; i < 1000; i++) {
if (greedyApp.getLiveContainers().size() == 2 * numStarvedAppContainers) {
if (greedyApp.getLiveContainers().size() == numGreedyAppContainers) {
break;
}
Thread.sleep(10);
......@@ -290,12 +291,12 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
// Post preemption, verify the greedyApp has the correct # of containers.
assertEquals("Incorrect # of containers on the greedy app",
2 * numStarvedAppContainers, greedyApp.getLiveContainers().size());
numGreedyAppContainers, greedyApp.getLiveContainers().size());
// Verify the queue metrics are set appropriately. The greedyApp started
// with 8 1GB, 1vcore containers.
assertEquals("Incorrect # of preempted containers in QueueMetrics",
8 - 2 * numStarvedAppContainers,
8 - numGreedyAppContainers,
greedyApp.getQueue().getMetrics().getAggregatePreemptedContainers());
// Verify the node is reserved for the starvingApp
......@@ -340,7 +341,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
String queue = "root.preemptable.child-1";
submitApps(queue, queue);
if (fairsharePreemption) {
verifyPreemption(2);
verifyPreemption(2, 4);
} else {
verifyNoPreemption();
}
......@@ -349,13 +350,13 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
@Test
public void testPreemptionBetweenTwoSiblingLeafQueues() throws Exception {
submitApps("root.preemptable.child-1", "root.preemptable.child-2");
verifyPreemption(2);
verifyPreemption(2, 4);
}
@Test
public void testPreemptionBetweenNonSiblingQueues() throws Exception {
submitApps("root.preemptable.child-1", "root.nonpreemptable.child-1");
verifyPreemption(2);
verifyPreemption(2, 4);
}
@Test
......@@ -389,7 +390,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
setNumAMContainersPerNode(2);
preemptHalfResources("root.preemptable.child-2");
verifyPreemption(2);
verifyPreemption(2, 4);
ArrayList<RMContainer> containers =
(ArrayList<RMContainer>) starvingApp.getLiveContainers();
......@@ -401,6 +402,22 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
+ "nodes.", !host0.equals(host1));
}
@Test
public void testAppNotPreemptedBelowFairShare() throws Exception {
takeAllResources("root.preemptable.child-1");
tryPreemptMoreThanFairShare("root.preemptable.child-2");
}
private void tryPreemptMoreThanFairShare(String queueName)
throws InterruptedException {
ApplicationAttemptId appAttemptId
= createSchedulingRequest(3 * GB, 3, queueName, "default",
NODE_CAPACITY_MULTIPLE * rmNodes.size() / 2);
starvingApp = scheduler.getSchedulerApp(appAttemptId);
verifyPreemption(1, 5);
}
@Test
public void testPreemptionBetweenSiblingQueuesWithParentAtFairShare()
throws InterruptedException {
......@@ -414,10 +431,10 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
// Submit a job so half the resources go to parent's sibling
preemptHalfResources("root.preemptable-sibling");
verifyPreemption(2);
verifyPreemption(2, 4);
// Submit a job to the child's sibling to force preemption from the child
preemptHalfResources("root.preemptable.child-2");
verifyPreemption(1);
verifyPreemption(1, 2);
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment