Merge branch 'dev' into chicago-rta-fares
ansoncfit committed Apr 29, 2024
2 parents ff2d39d + a96f49a commit 8fcb8ea
Showing 32 changed files with 653 additions and 251 deletions.
@@ -86,7 +86,7 @@ public List<HttpController> standardHttpControllers () {
new GtfsController(gtfsCache),
new BundleController(this),
new OpportunityDatasetController(fileStorage, taskScheduler, censusExtractor, database),
- new RegionalAnalysisController(broker, fileStorage),
+ new RegionalAnalysisController(broker, fileStorage, taskScheduler),
new AggregationAreaController(fileStorage, database, taskScheduler),
// This broker controller registers at least one handler at URL paths beginning with /internal, which
// is exempted from authentication and authorization, but should be hidden from the world
@@ -179,7 +179,7 @@ private void respondToException(Exception e, Request request, Response response,
// Include a stack trace except when the error is known to be about unauthenticated or unauthorized access,
// in which case we don't want to leak information about the server to people scanning it for weaknesses.
if (type != UNAUTHORIZED && type != FORBIDDEN) {
- body.put("stackTrace", errorEvent.stackTrace);
+ body.put("stackTrace", errorEvent.filteredStackTrace);
}
response.status(code);
response.type("application/json");
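The change above swaps the full stack trace for a filtered one in the HTTP error body. A minimal sketch of the surrounding assembly this hunk implies; the "message" and "httpPath" field names are assumptions, and only the stackTrace line is confirmed by the diff:

    // Hedged sketch; only the stackTrace line below appears in this commit.
    ObjectNode body = JsonUtil.objectNode()
            .put("message", errorEvent.summary)     // assumed field name
            .put("httpPath", errorEvent.httpPath);  // assumed field name
    if (type != UNAUTHORIZED && type != FORBIDDEN) {
        // Auth failures get no trace, so scanners learn nothing about server internals.
        body.put("stackTrace", errorEvent.filteredStackTrace);
    }
    response.status(code);
    response.type("application/json");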
71 changes: 43 additions & 28 deletions src/main/java/com/conveyal/analysis/components/broker/Broker.java
@@ -95,59 +95,62 @@ public interface Config {
boolean testTaskRedelivery ();
}

- private Config config;
+ private final Config config;

// Component Dependencies
private final FileStorage fileStorage;
private final EventBus eventBus;
private final WorkerLauncher workerLauncher;

- private final ListMultimap<WorkerCategory, Job> jobs =
-         MultimapBuilder.hashKeys().arrayListValues().build();
+ private final ListMultimap<WorkerCategory, Job> jobs = MultimapBuilder.hashKeys().arrayListValues().build();

/**
* The most tasks to deliver to a worker at a time. Workers may request less tasks than this, and the broker should
* never send more than the minimum of the two values. 50 tasks gives response bodies of about 65kB. If this value
* is too high, all remaining tasks in a job could be distributed to a single worker leaving none for the other
* workers, creating a slow-joiner problem especially if the tasks are complicated and slow to complete.
*
- * The value should eventually be tuned. The current value of 16 is just the value used by the previous sporadic
+ * The value should eventually be tuned. The value of 16 is the value used by the previous sporadic
* polling system (WorkerStatus.LEGACY_WORKER_MAX_TASKS) which may not be ideal but is known to work.
*
* NOTE that as a side effect this limits the total throughput of each worker to:
* MAX_TASKS_PER_WORKER / AnalysisWorker#POLL_INTERVAL_MIN_SECONDS tasks per second.
* It is entirely plausible for half or more of the origins in a job to be unconnected to any roadways (water,
* deserts etc.) In this case the system may need to burn through millions of origins, only checking that they
* aren't attached to anything in the selected scenario. Not doing so could double the run time of an analysis.
* It may be beneficial to assign origins to workers more randomly, or to introduce a mechanism to pre-scan for
* disconnected origins or at least concisely signal large blocks of them in worker responses.
*/
- public final int MAX_TASKS_PER_WORKER = 16;
+ public static final int MAX_TASKS_PER_WORKER = 40;
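To make the throughput note above concrete: a worker that polls every POLL_INTERVAL_MIN_SECONDS can receive at most MAX_TASKS_PER_WORKER tasks per poll. A back-of-envelope check, assuming a 2-second minimum poll interval purely for illustration (the real constant is defined in AnalysisWorker and is not shown in this diff):

    // Worked example of the per-worker throughput ceiling; the poll interval is assumed.
    public class ThroughputCheck {
        public static void main (String[] args) {
            int maxTasksPerWorker = 40;       // MAX_TASKS_PER_WORKER after this change
            int pollIntervalMinSeconds = 2;   // hypothetical value, for illustration only
            double ceiling = (double) maxTasksPerWorker / pollIntervalMinSeconds;
            System.out.println(ceiling + " tasks/second per worker"); // prints "20.0 tasks/second per worker"
        }
    }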

/**
* Used when auto-starting spot instances. Set to a smaller value to increase the number of
* workers requested automatically
*/
- public final int TARGET_TASKS_PER_WORKER_TRANSIT = 800;
- public final int TARGET_TASKS_PER_WORKER_NONTRANSIT = 4_000;
+ public static final int TARGET_TASKS_PER_WORKER_TRANSIT = 800;
+ public static final int TARGET_TASKS_PER_WORKER_NONTRANSIT = 4_000;

/**
* We want to request spot instances to "boost" regional analyses after a few regional task
* results are received for a given workerCategory. Do so after receiving results for an
* arbitrary task toward the beginning of the job
*/
- public final int AUTO_START_SPOT_INSTANCES_AT_TASK = 42;
+ public static final int AUTO_START_SPOT_INSTANCES_AT_TASK = 42;

/** The maximum number of spot instances allowable in an automatic request */
- public final int MAX_WORKERS_PER_CATEGORY = 250;
+ public static final int MAX_WORKERS_PER_CATEGORY = 250;

/**
* How long to give workers to start up (in ms) before assuming that they have started (and
* starting more on a given graph if they haven't).
*/
public static final long WORKER_STARTUP_TIME = 60 * 60 * 1000;


/** Keeps track of all the workers that have contacted this broker recently asking for work. */
private WorkerCatalog workerCatalog = new WorkerCatalog();

- /**
-  * These objects piece together results received from workers into one regional analysis result
-  * file per job.
-  */
- private static Map<String, MultiOriginAssembler> resultAssemblers = new HashMap<>();
+ /** These objects piece together results received from workers into one regional analysis result file per job. */
+ private Map<String, MultiOriginAssembler> resultAssemblers = new HashMap<>();

/**
* keep track of which graphs we have launched workers on and how long ago we launched them, so
@@ -177,19 +180,23 @@ public synchronized void enqueueTasksForRegionalJob (RegionalAnalysis regionalAn
LOG.error("Someone tried to enqueue job {} but it already exists.", templateTask.jobId);
throw new RuntimeException("Enqueued duplicate job " + templateTask.jobId);
}
+ // Create the Job object to share with the MultiOriginAssembler, but defer adding this job to the Multimap of
+ // active jobs until we're sure the result assembler was constructed without any errors. Always add and remove
+ // the Job and corresponding MultiOriginAssembler as a unit in the same synchronized block of code (see #887).
WorkerTags workerTags = WorkerTags.fromRegionalAnalysis(regionalAnalysis);
Job job = new Job(templateTask, workerTags);
- jobs.put(job.workerCategory, job);

// Register the regional job so results received from multiple workers can be assembled into one file.
// If any parameters fail checks here, an exception may cause this method to exit early.
// TODO encapsulate MultiOriginAssemblers in a new Component
- // Note: if this fails with an exception we'll have a job enqueued, possibly being processed, with no assembler.
- // That is not catastrophic, but the user may need to recognize and delete the stalled regional job.
MultiOriginAssembler assembler = new MultiOriginAssembler(regionalAnalysis, job, fileStorage);
resultAssemblers.put(templateTask.jobId, assembler);

+ // A MultiOriginAssembler was successfully put in place. It's now safe to register and start the Job.
+ jobs.put(job.workerCategory, job);

+ // If this is a fake job for testing, don't confuse the worker startup code below with its null graph ID.
if (config.testTaskRedelivery()) {
- // This is a fake job for testing, don't confuse the worker startup code below with null graph ID.
return;
}
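The reordering above is the substance of the fix referenced in the new comments (see #887): construct the MultiOriginAssembler first, and register the Job only after that construction succeeds. A condensed sketch of the invariant, not the project's exact code:

    // Condensed sketch of the enqueue ordering enforced by this hunk.
    synchronized (this) {
        Job job = new Job(templateTask, workerTags);
        // Constructing the assembler validates parameters and may throw. Nothing is
        // registered yet, so a failure leaves no orphaned Job visible to workers.
        MultiOriginAssembler assembler = new MultiOriginAssembler(regionalAnalysis, job, fileStorage);
        resultAssemblers.put(templateTask.jobId, assembler);
        // Add the Job and its assembler as a unit, inside the same synchronized block.
        jobs.put(job.workerCategory, job);
    }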

@@ -385,14 +392,20 @@ public synchronized void markTaskCompleted (Job job, int taskId) {
}

/**
- * When job.errors is non-empty, job.isErrored() becomes true and job.isActive() becomes false.
+ * Record an error that happened while a worker was processing a task on the given job. This method is tolerant
+ * of job being null, because it's called on a code path where any number of things could be wrong or missing.
+ * This method also ensures synchronization of writes to Jobs from any non-synchronized sections of an HTTP handler.
+ * Once job.errors is non-empty, job.isErrored() becomes true and job.isActive() becomes false.
* The Job will stop delivering tasks, allowing workers to shut down, but will continue to exist allowing the user
* to see the error message. User will then need to manually delete it, which will remove the result assembler.
- * This method ensures synchronization of writes to Jobs from the unsynchronized worker poll HTTP handler.
*/
private synchronized void recordJobError (Job job, String error) {
if (job != null) {
- job.errors.add(error);
+ // Limit the number of errors recorded to one.
+ // Still using a Set<String> instead of just String since the set of errors is exposed in a UI-facing API.
+ if (job.errors.isEmpty()) {
+     job.errors.add(error);
+ }
}
}
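Job.isErrored() and Job.isActive() are referenced in the javadoc above but not shown in this diff. A plausible minimal sketch of the relationship described, with isComplete() assumed for the normal end-of-job case:

    // Hypothetical sketch consistent with the javadoc; the real Job methods are not in this diff.
    public boolean isErrored () {
        return !errors.isEmpty();
    }

    public boolean isActive () {
        // An errored job stops delivering tasks but continues to exist (and remains visible
        // in the API) until the user deletes it, which also removes its result assembler.
        return !isComplete() && !isErrored();  // isComplete() is assumed here
    }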

@@ -488,21 +501,23 @@ public void handleRegionalWorkResult(RegionalWorkResult workResult) {
// Once the job is retrieved, it can be used below to requestExtraWorkersIfAppropriate without synchronization,
// because that method only uses final fields of the job.
Job job = null;
- MultiOriginAssembler assembler;
try {
+ MultiOriginAssembler assembler;
synchronized (this) {
job = findJob(workResult.jobId);
+ // Record any error reported by the worker and don't pass bad results on to regional result assembly.
+ // This will mark the job as errored and not-active, stopping distribution of tasks to workers.
+ // To ensure that happens, record errors before any other conditional that could exit this method.
+ if (workResult.error != null) {
+     recordJobError(job, workResult.error);
+     return;
+ }
assembler = resultAssemblers.get(workResult.jobId);
if (job == null || assembler == null || !job.isActive()) {
// This will happen naturally for all delivered tasks after a job is deleted or it errors out.
LOG.debug("Ignoring result for unrecognized, deleted, or inactive job ID {}.", workResult.jobId);
return;
}
- if (workResult.error != null) {
-     // Record any error reported by the worker and don't pass bad results on to regional result assembly.
-     recordJobError(job, workResult.error);
-     return;
- }
// Mark tasks completed first before passing results to the assembler. On the final result received,
// this will minimize the risk of race conditions by quickly making the job invisible to incoming stray
// results from spurious redeliveries, before the assembler is busy finalizing and uploading results.
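The code following this comment is cut off by the hunk boundary. Under stated assumptions (handleMessage is a guessed method name, not confirmed by this diff) the described ordering might look roughly like:

    // Hedged sketch of the ordering the comment describes; names are assumptions.
    synchronized (this) {
        markTaskCompleted(job, workResult.taskId);  // job becomes invisible to stray redeliveries first
    }
    // Assembly can involve slow file or network I/O, so it happens outside the lock.
    assembler.handleMessage(workResult);            // assumed assembler entry point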
@@ -123,8 +123,11 @@ private RegionalTask makeOneTask (int taskNumber) {
public int deliveryPass = 0;

/**
- * If any error compromises the usabilty or quality of results from any origin, it is recorded here.
+ * If any error compromises the usability or quality of results from any origin, it is recorded here.
* This is a Set because identical errors are likely to be reported from many workers or individual tasks.
+ * The presence of an error here causes the job to be considered "errored" and "inactive" and stop delivering tasks.
+ * There is some risk here of accumulating unbounded amounts of large error messages (see #919).
+ * The field type could be changed to a single String instead of Set, but it's exposed on a UI-facing API as a Set.
*/
public final Set<String> errors = new HashSet();

@@ -2,18 +2,20 @@

import com.conveyal.r5.util.ExceptionUtils;

+ import static com.conveyal.r5.util.ExceptionUtils.filterStackTrace;

/**
* This Event is fired each time a Throwable (usually an Exception or Error) occurs on the backend. It can then be
* recorded or tracked in various places - the console logs, Slack, etc. This could eventually be used for errors on
* the workers as well, but we'd have to be careful not to generate hundreds of messages at once.
*/
public class ErrorEvent extends Event {

- // We may serialize this object, so we convert the Throwable to two strings to control its representation.
+ // All Events are intended to be eligible for serialization into a log or database, so we convert the Throwable to
+ // some Strings to determine its representation in a simple way.
// For flexibility in event handlers, it is tempting to hold on to the original Throwable instead of derived
// Strings. Exceptions are famously slow, but it's the initial creation and filling in the stack trace that are
- // slow. Once the instace exists, repeatedly examining its stack trace should not be prohibitively costly. Still,
- // we do probably gain some efficiency by converting the stack trace to a String once and reusing that.
+ // slow. Once the instance exists, repeatedly examining its stack trace should not be prohibitively costly.

public final String summary;

@@ -23,11 +25,16 @@ public class ErrorEvent extends Event {
*/
public final String httpPath;

+ /** The full stack trace of the exception that occurred. */
public final String stackTrace;

+ /** A minimal stack trace showing the immediate cause within Conveyal code. */
+ public final String filteredStackTrace;

public ErrorEvent (Throwable throwable, String httpPath) {
this.summary = ExceptionUtils.shortCauseString(throwable);
this.stackTrace = ExceptionUtils.stackTraceString(throwable);
+ this.filteredStackTrace = ExceptionUtils.filterStackTrace(throwable);
this.httpPath = httpPath;
}

@@ -54,25 +61,9 @@ public String traceWithContext (boolean verbose) {
if (verbose) {
builder.append(stackTrace);
} else {
- builder.append(filterStackTrace(stackTrace));
+ builder.append(filteredStackTrace);
}
return builder.toString();
}

- private static String filterStackTrace (String stackTrace) {
-     if (stackTrace == null) return null;
-     final String unknownFrame = "Unknown stack frame, probably optimized out by JVM.";
-     String error = stackTrace.lines().findFirst().get();
-     String frame = stackTrace.lines()
-             .map(String::strip)
-             .filter(s -> s.startsWith("at "))
-             .findFirst().orElse(unknownFrame);
-     String conveyalFrame = stackTrace.lines()
-             .map(String::strip)
-             .filter(s -> s.startsWith("at com.conveyal."))
-             .filter(s -> !frame.equals(s))
-             .findFirst().orElse("");
-     return String.join("\n", error, frame, conveyalFrame);
- }

}
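With these changes, ErrorEvent exposes both a full and a filtered trace. A minimal usage sketch; the logger, event bus, and failing operation are assumptions rather than code taken from this diff:

    // Hedged usage sketch; LOG, eventBus, and riskyBackendOperation are assumptions.
    try {
        riskyBackendOperation();
    } catch (Exception e) {
        ErrorEvent event = new ErrorEvent(e, "/api/regional"); // httpPath of the failing request
        LOG.error(event.traceWithContext(false)); // false selects the compact filtered trace
        eventBus.send(event);                     // assumed EventBus method
    }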
@@ -10,6 +10,7 @@
import com.conveyal.analysis.persistence.AnalysisDB;
import com.conveyal.analysis.util.JsonUtil;
import com.conveyal.file.FileStorage;
+ import com.conveyal.file.UrlWithHumanName;
import com.conveyal.r5.analyst.progress.Task;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.bson.conversions.Bson;
@@ -27,6 +28,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static com.mongodb.client.model.Filters.and;
import static com.mongodb.client.model.Filters.eq;
+ import static org.eclipse.jetty.http.MimeTypes.Type.APPLICATION_JSON;

/**
* Stores vector aggregationAreas (used to define the region of a weighted average accessibility metric).
@@ -98,10 +100,10 @@ private Collection<AggregationArea> getAggregationAreas (Request req, Response r
}

/** Returns a JSON-wrapped URL for the mask grid of the aggregation area whose id matches the path parameter. */
- private ObjectNode getAggregationAreaGridUrl (Request req, Response res) {
+ private UrlWithHumanName getAggregationAreaGridUrl (Request req, Response res) {
AggregationArea aggregationArea = aggregationAreaCollection.findPermittedByRequestParamId(req);
- String url = fileStorage.getURL(aggregationArea.getStorageKey());
- return JsonUtil.objectNode().put("url", url);
+ res.type(APPLICATION_JSON.asString());
+ return fileStorage.getJsonUrl(aggregationArea.getStorageKey(), aggregationArea.name, "grid");
}
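UrlWithHumanName and FileStorage.getJsonUrl are imported and called above but not defined in this diff. A hypothetical reconstruction of the shape those call sites imply; the field names and filename convention are guesses:

    // Hypothetical sketch inferred from the call sites; the real class is com.conveyal.file.UrlWithHumanName.
    public class UrlWithHumanName {
        public final String url;        // assumed: URL where the stored file can be fetched
        public final String humanName;  // assumed: e.g. "<area name>.grid" built from (name, extension)

        public UrlWithHumanName (String url, String humanName) {
            this.url = url;
            this.humanName = humanName;
        }
    }

Returning this object (with the response type set to APPLICATION_JSON) lets the controller's JSON serialization produce the wrapped URL that the removed JsonUtil.objectNode().put("url", url) built by hand.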

@Override
@@ -33,7 +33,9 @@ private Object getFile (Request req, Response res) throws Exception {
FileStorageKey key = new FileStorageKey(category, filename);
File file = fileStorage.getFile(key);
FileStorageFormat format = FileStorageFormat.fromFilename(filename);
- res.type(format.mimeType);
+ if (format != null) {
+     res.type(format.mimeType);
+ }

// If the content-encoding is set to gzip, Spark automatically gzips the response. This double-gzips anything
// that was already gzipped. Some of our files are already gzipped, and we rely on the client browser to
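The guard added above implies that FileStorageFormat.fromFilename returns null, rather than throwing, for unrecognized extensions. A small illustration of the behavior the fix assumes (the filename is hypothetical):

    // Illustration of the null guard; assumes fromFilename yields null for unknown extensions.
    FileStorageFormat format = FileStorageFormat.fromFilename("output.unknownext");
    if (format != null) {
        res.type(format.mimeType);  // set Content-Type only for recognized extensions
    }
    // Otherwise no Content-Type is set and the client must infer or default it.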