Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for mesos maintenance primitives #836

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package org.apache.mesos.chronos.scheduler.mesos

import java.util.logging.Logger

import org.apache.mesos.Protos

/**
* Helper for checking availability using mesos primitives
*/
object AvailabilityChecker {

private[this] val log = Logger.getLogger(getClass.getName)

def checkAvailability(offer: Protos.Offer): Boolean = {
val now = System.nanoTime()
if (offer.hasUnavailability && offer.getUnavailability.hasStart) {
val start = offer.getUnavailability.getStart.getNanoseconds
if (now.>=(start)) {
if (offer.getUnavailability.hasDuration) {
return start.+(offer.getUnavailability.getDuration.getNanoseconds).<(now)
} else {
return false;
}

}
}
return true
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ class MesosJobFramework @Inject()(
case None =>
val neededResources = new Resources(job)
offerResources.toIterator.find { ors =>
ors._2.canSatisfy(neededResources) && ConstraintChecker.checkConstraints(ors._1, job.constraints)
ors._2.canSatisfy(neededResources) && ConstraintChecker.checkConstraints(ors._1, job.constraints) && AvailabilityChecker.checkAvailability(ors._1)
} match {
case Some((offer, resources)) =>
// Subtract this job's resource requirements from the remaining available resources in this offer.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package org.apache.mesos.chronos.scheduler.mesos

import java.util.concurrent.TimeUnit

import mesosphere.mesos.protos._
import mesosphere.mesos.util.FrameworkIdUtil
import org.apache.mesos.Protos.Offer
import org.apache.mesos.Protos.{DurationInfo, Offer, TimeInfo, Unavailability}
import org.apache.mesos.chronos.ChronosTestHelper._
import org.apache.mesos.chronos.scheduler.jobs.{BaseJob, JobScheduler, MockJobUtils, TaskManager}
import org.apache.mesos.{Protos, SchedulerDriver}
Expand Down Expand Up @@ -76,6 +78,33 @@ class MesosJobFrameworkSpec extends SpecificationWithJUnit with Mockito {
there was one(mockSchedulerDriver).declineOffer(OfferID("1"), Protos.Filters.getDefaultInstance)
}

"Reject unavailable offer" in {
import mesosphere.mesos.protos.Implicits._

import scala.collection.JavaConverters._

val mockDriverFactory = MockJobUtils.mockDriverFactory
val mockSchedulerDriver = mockDriverFactory.get

val mesosJobFramework = spy(
new MesosJobFramework(
mockDriverFactory,
mock[JobScheduler],
mock[TaskManager],
makeConfig(),
mock[FrameworkIdUtil],
mock[MesosTaskBuilder],
mock[MesosOfferReviver]))

val tasks = mutable.Buffer[(String, BaseJob, Offer)]()
doReturn(tasks).when(mesosJobFramework).generateLaunchableTasks(any)

val offer: Offer = makeUnavailableOffer
mesosJobFramework.resourceOffers(mockSchedulerDriver, Seq[Protos.Offer](offer).asJava)

there was one(mockSchedulerDriver).declineOffer(OfferID("1"), Protos.Filters.getDefaultInstance)
}

"Reject unused offers with default RefuseSeconds if --decline_offer_duration is not set" in {
import mesosphere.mesos.protos.Implicits._

Expand Down Expand Up @@ -176,6 +205,22 @@ class MesosJobFrameworkSpec extends SpecificationWithJUnit with Mockito {
}

private[this] def makeBasicOffer: Offer = {

makeBasicOfferBuilder
.build()
}

private[this] def makeUnavailableOffer: Offer = {

makeBasicOfferBuilder.setUnavailability(
Unavailability.newBuilder()
.setStart(TimeInfo.newBuilder().setNanoseconds(System.nanoTime()))
.setDuration(DurationInfo.newBuilder().setNanoseconds(TimeUnit.DAYS.toNanos(1)))
.build())
.build()
}

private[this] def makeBasicOfferBuilder: Offer.Builder = {
import mesosphere.mesos.protos.Implicits._

Protos.Offer.newBuilder()
Expand All @@ -186,7 +231,6 @@ class MesosJobFrameworkSpec extends SpecificationWithJUnit with Mockito {
.addResources(ScalarResource(Resource.CPUS, 1, "*"))
.addResources(ScalarResource(Resource.MEM, 100, "*"))
.addResources(ScalarResource(Resource.DISK, 100, "*"))
.build()
}


Expand Down