Skip to content

Commit

Permalink
apply mesos#836
Browse files Browse the repository at this point in the history
  • Loading branch information
vixns committed Aug 31, 2017
1 parent 89d6f42 commit 0d5d96c
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package org.apache.mesos.chronos.scheduler.mesos

import java.util.logging.Logger

import org.apache.mesos.Protos

/**
* Helper for checking availability using mesos primitives
*/
object AvailabilityChecker {

private[this] val log = Logger.getLogger(getClass.getName)

def checkAvailability(offer: Protos.Offer): Boolean = {
val now = System.nanoTime()
if (offer.hasUnavailability && offer.getUnavailability.hasStart) {
val start = offer.getUnavailability.getStart.getNanoseconds
if (now.>=(start)) {
if (offer.getUnavailability.hasDuration) {
return start.+(offer.getUnavailability.getDuration.getNanoseconds).<(now)
} else {
return false;
}

}
}
return true
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ class MesosJobFramework @Inject()(
case None =>
val neededResources = new Resources(job)
offerResources.toIterator.find { ors =>
ors._2.canSatisfy(neededResources) && ConstraintChecker.checkConstraints(ors._1, job.constraints)
ors._2.canSatisfy(neededResources) && ConstraintChecker.checkConstraints(ors._1, job.constraints) && AvailabilityChecker.checkAvailability(ors._1)
} match {
case Some((offer, resources)) =>
// Subtract this job's resource requirements from the remaining available resources in this offer.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package org.apache.mesos.chronos.scheduler.mesos

import java.util.concurrent.TimeUnit

import mesosphere.mesos.protos._
import mesosphere.mesos.util.FrameworkIdUtil
import org.apache.mesos.Protos.Offer
import org.apache.mesos.Protos.{DurationInfo, Offer, TimeInfo, Unavailability}
import org.apache.mesos.chronos.ChronosTestHelper._
import org.apache.mesos.chronos.scheduler.jobs.{BaseJob, JobScheduler, MockJobUtils, TaskManager}
import org.apache.mesos.{Protos, SchedulerDriver}
Expand Down Expand Up @@ -76,6 +78,33 @@ class MesosJobFrameworkSpec extends SpecificationWithJUnit with Mockito {
there was one(mockSchedulerDriver).declineOffer(OfferID("1"), Protos.Filters.getDefaultInstance)
}

"Reject unavailable offer" in {
import mesosphere.mesos.protos.Implicits._

import scala.collection.JavaConverters._

val mockDriverFactory = MockJobUtils.mockDriverFactory
val mockSchedulerDriver = mockDriverFactory.get

val mesosJobFramework = spy(
new MesosJobFramework(
mockDriverFactory,
mock[JobScheduler],
mock[TaskManager],
makeConfig(),
mock[FrameworkIdUtil],
mock[MesosTaskBuilder],
mock[MesosOfferReviver]))

val tasks = mutable.Buffer[(String, BaseJob, Offer)]()
doReturn(tasks).when(mesosJobFramework).generateLaunchableTasks(any)

val offer: Offer = makeUnavailableOffer
mesosJobFramework.resourceOffers(mockSchedulerDriver, Seq[Protos.Offer](offer).asJava)

there was one(mockSchedulerDriver).declineOffer(OfferID("1"), Protos.Filters.getDefaultInstance)
}

"Reject unused offers with default RefuseSeconds if --decline_offer_duration is not set" in {
import mesosphere.mesos.protos.Implicits._

Expand Down Expand Up @@ -176,6 +205,22 @@ class MesosJobFrameworkSpec extends SpecificationWithJUnit with Mockito {
}

private[this] def makeBasicOffer: Offer = {

makeBasicOfferBuilder
.build()
}

private[this] def makeUnavailableOffer: Offer = {

makeBasicOfferBuilder.setUnavailability(
Unavailability.newBuilder()
.setStart(TimeInfo.newBuilder().setNanoseconds(System.nanoTime()))
.setDuration(DurationInfo.newBuilder().setNanoseconds(TimeUnit.DAYS.toNanos(1)))
.build())
.build()
}

private[this] def makeBasicOfferBuilder: Offer.Builder = {
import mesosphere.mesos.protos.Implicits._

Protos.Offer.newBuilder()
Expand All @@ -186,7 +231,6 @@ class MesosJobFrameworkSpec extends SpecificationWithJUnit with Mockito {
.addResources(ScalarResource(Resource.CPUS, 1, "*"))
.addResources(ScalarResource(Resource.MEM, 100, "*"))
.addResources(ScalarResource(Resource.DISK, 100, "*"))
.build()
}


Expand Down

0 comments on commit 0d5d96c

Please sign in to comment.