Merge pull request #937 from conveyal/patches-pre-7.2
Pre-release patches
abyrd authored Apr 18, 2024
2 parents 953757c + b016f1e commit a96f49a
Showing 8 changed files with 110 additions and 39 deletions.
@@ -86,7 +86,7 @@ public List<HttpController> standardHttpControllers () {
new GtfsController(gtfsCache),
new BundleController(this),
new OpportunityDatasetController(fileStorage, taskScheduler, censusExtractor, database),
new RegionalAnalysisController(broker, fileStorage),
new RegionalAnalysisController(broker, fileStorage, taskScheduler),
new AggregationAreaController(fileStorage, database, taskScheduler),
// This broker controller registers at least one handler at URL paths beginning with /internal, which
// is exempted from authentication and authorization, but should be hidden from the world
12 changes: 10 additions & 2 deletions src/main/java/com/conveyal/analysis/components/broker/Broker.java
@@ -110,10 +110,18 @@ public interface Config {
* is too high, all remaining tasks in a job could be distributed to a single worker leaving none for the other
* workers, creating a slow-joiner problem especially if the tasks are complicated and slow to complete.
*
* The value should eventually be tuned. The current value of 16 is just the value used by the previous sporadic
* The value should eventually be tuned. The value of 16 is the value used by the previous sporadic
* polling system (WorkerStatus.LEGACY_WORKER_MAX_TASKS) which may not be ideal but is known to work.
*
* NOTE that as a side effect this limits the total throughput of each worker to:
* MAX_TASKS_PER_WORKER / AnalysisWorker#POLL_INTERVAL_MIN_SECONDS tasks per second.
It is entirely plausible for half or more of the origins in a job to be unconnected to any roadways (water,
deserts, etc.). In that case the system may need to burn through millions of origins, only checking that they
aren't attached to anything in the selected scenario. Failing to get through them quickly could double the run
time of an analysis.
* It may be beneficial to assign origins to workers more randomly, or to introduce a mechanism to pre-scan for
* disconnected origins or at least concisely signal large blocks of them in worker responses.
*/
public static final int MAX_TASKS_PER_WORKER = 16;
public static final int MAX_TASKS_PER_WORKER = 40;

/**
* Used when auto-starting spot instances. Set to a smaller value to increase the number of
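
As a rough illustration of the throughput ceiling described in the comment above, here is a minimal standalone sketch. The poll interval of 1 second is a hypothetical value chosen only for the arithmetic; the real constant is AnalysisWorker#POLL_INTERVAL_MIN_SECONDS and is not shown in this diff.

// Back-of-the-envelope check of the per-worker throughput ceiling discussed above.
public class WorkerThroughputSketch {
    static final int MAX_TASKS_PER_WORKER = 40;          // new value set by this commit
    static final int ASSUMED_POLL_INTERVAL_SECONDS = 1;  // hypothetical, for illustration only

    public static void main (String[] args) {
        double ceiling = (double) MAX_TASKS_PER_WORKER / ASSUMED_POLL_INTERVAL_SECONDS;
        double oldCeiling = 16.0 / ASSUMED_POLL_INTERVAL_SECONDS;
        System.out.printf("Per-worker ceiling: %.0f tasks/s (previously %.0f tasks/s at 16 tasks)%n",
                ceiling, oldCeiling);
    }
}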
@@ -3,6 +3,7 @@
import com.conveyal.analysis.AnalysisServerException;
import com.conveyal.analysis.SelectingGridReducer;
import com.conveyal.analysis.UserPermissions;
import com.conveyal.analysis.components.TaskScheduler;
import com.conveyal.analysis.components.broker.Broker;
import com.conveyal.analysis.components.broker.JobStatus;
import com.conveyal.analysis.models.AnalysisRequest;
@@ -11,6 +12,7 @@
import com.conveyal.analysis.models.RegionalAnalysis;
import com.conveyal.analysis.persistence.Persistence;
import com.conveyal.analysis.results.CsvResultType;
import com.conveyal.analysis.util.HttpStatus;
import com.conveyal.analysis.util.JsonUtil;
import com.conveyal.file.FileStorage;
import com.conveyal.file.FileStorageFormat;
@@ -22,6 +24,7 @@
import com.conveyal.r5.analyst.PointSet;
import com.conveyal.r5.analyst.PointSetCache;
import com.conveyal.r5.analyst.cluster.RegionalTask;
import com.conveyal.r5.analyst.progress.Task;
import com.google.common.primitives.Ints;
import com.mongodb.QueryBuilder;
import gnu.trove.list.array.TIntArrayList;
@@ -36,6 +39,7 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
@@ -45,9 +49,12 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.zip.GZIPOutputStream;

import static com.conveyal.analysis.util.JsonUtil.toJson;
@@ -60,6 +67,7 @@
import static com.google.common.base.Preconditions.checkState;
import static org.eclipse.jetty.http.MimeTypes.Type.APPLICATION_JSON;
import static org.eclipse.jetty.http.MimeTypes.Type.TEXT_HTML;
import static org.eclipse.jetty.http.MimeTypes.Type.TEXT_PLAIN;

/**
* Spark HTTP handler methods that allow launching new regional analyses, as well as deleting them and fetching
@@ -80,10 +88,12 @@ public class RegionalAnalysisController implements HttpController {

private final Broker broker;
private final FileStorage fileStorage;
private final TaskScheduler taskScheduler;

public RegionalAnalysisController (Broker broker, FileStorage fileStorage) {
public RegionalAnalysisController (Broker broker, FileStorage fileStorage, TaskScheduler taskScheduler) {
this.broker = broker;
this.fileStorage = fileStorage;
this.taskScheduler = taskScheduler;
}

private Collection<RegionalAnalysis> getRegionalAnalysesForRegion(String regionId, UserPermissions userPermissions) {
@@ -254,8 +264,9 @@ private HumanKey getSingleCutoffGrid (
grid.writeGeotiff(fos);
break;
}

LOG.debug("Finished deriving single-cutoff grid {}. Transferring to storage.", singleCutoffKey);
fileStorage.moveIntoStorage(singleCutoffFileStorageKey, localFile);
LOG.debug("Finished transferring single-cutoff grid {} to storage.", singleCutoffKey);
}
String analysisHumanName = humanNameForEntity(analysis);
String destinationHumanName = humanNameForEntity(destinations);
@@ -266,6 +277,10 @@ private HumanKey getSingleCutoffGrid (
return new HumanKey(singleCutoffFileStorageKey, resultHumanFilename);
}

// Prevent multiple requests from creating the same files in parallel.
// This could potentially be integrated into FileStorage with enum return values or an additional boolean method.
private Set<String> filesBeingPrepared = Collections.synchronizedSet(new HashSet<>());

private Object getAllRegionalResults (Request req, Response res) throws IOException {
final String regionalAnalysisId = req.params("_id");
final UserPermissions userPermissions = UserPermissions.from(req);
@@ -277,39 +292,61 @@ private Object getAllRegionalResults (Request req, Response res) throws IOException {
throw AnalysisServerException.badRequest("Batch result download only available for gridded origins.");
}
FileStorageKey zippedResultsKey = new FileStorageKey(RESULTS, analysis._id + "_ALL.zip");
if (!fileStorage.exists(zippedResultsKey)) {
// Iterate over all dest, cutoff, percentile combinations and generate one geotiff grid output for each one.
List<HumanKey> humanKeys = new ArrayList<>();
for (String destinationPointSetId : analysis.destinationPointSetIds) {
OpportunityDataset destinations = getDestinations(destinationPointSetId, userPermissions);
for (int cutoffMinutes : analysis.cutoffsMinutes) {
for (int percentile : analysis.travelTimePercentiles) {
HumanKey gridKey = getSingleCutoffGrid(
analysis, destinations, cutoffMinutes, percentile, FileStorageFormat.GEOTIFF
);
humanKeys.add(gridKey);
if (fileStorage.exists(zippedResultsKey)) {
res.type(APPLICATION_JSON.asString());
String analysisHumanName = humanNameForEntity(analysis);
return fileStorage.getJsonUrl(zippedResultsKey, analysisHumanName, "zip");
}
if (filesBeingPrepared.contains(zippedResultsKey.path)) {
res.type(TEXT_PLAIN.asString());
res.status(HttpStatus.ACCEPTED_202);
return "Geotiff zip is already being prepared in the background.";
}
// File did not exist. Create it in the background and ask caller to request it later.
filesBeingPrepared.add(zippedResultsKey.path);
Task task = Task.create("Zip all geotiffs for regional analysis " + analysis.name)
.forUser(userPermissions)
.withAction(progressListener -> {
int nSteps = analysis.destinationPointSetIds.length * analysis.cutoffsMinutes.length *
analysis.travelTimePercentiles.length * 2 + 1;
progressListener.beginTask("Creating and archiving geotiffs...", nSteps);
// Iterate over all dest, cutoff, percentile combinations and generate one geotiff for each combination.
List<HumanKey> humanKeys = new ArrayList<>();
for (String destinationPointSetId : analysis.destinationPointSetIds) {
OpportunityDataset destinations = getDestinations(destinationPointSetId, userPermissions);
for (int cutoffMinutes : analysis.cutoffsMinutes) {
for (int percentile : analysis.travelTimePercentiles) {
HumanKey gridKey = getSingleCutoffGrid(
analysis, destinations, cutoffMinutes, percentile, FileStorageFormat.GEOTIFF
);
humanKeys.add(gridKey);
progressListener.increment();
}
}
}
}
File tempZipFile = File.createTempFile("regional", ".zip");
// Zipfs can't open existing empty files, the file has to not exist. FIXME: Non-dangerous race condition
// Examining ZipFileSystemProvider reveals a "useTempFile" env parameter, but this is for the individual entries.
// May be better to just use zipOutputStream which would also allow gzip - zip CSV conversion.
tempZipFile.delete();
Map<String, String> env = Map.of("create", "true");
URI uri = URI.create("jar:file:" + tempZipFile.getAbsolutePath());
try (FileSystem zipFilesystem = FileSystems.newFileSystem(uri, env)) {
for (HumanKey key : humanKeys) {
Path storagePath = fileStorage.getFile(key.storageKey).toPath();
Path zipPath = zipFilesystem.getPath(key.humanName);
Files.copy(storagePath, zipPath, StandardCopyOption.REPLACE_EXISTING);
File tempZipFile = File.createTempFile("regional", ".zip");
// Zipfs can't open existing empty files, the file has to not exist. FIXME: Non-dangerous race condition
// Examining ZipFileSystemProvider reveals a "useTempFile" env parameter, but this is for the individual
// entries. May be better to just use zipOutputStream which would also allow gzip - zip CSV conversion.
tempZipFile.delete();
Map<String, String> env = Map.of("create", "true");
URI uri = URI.create("jar:file:" + tempZipFile.getAbsolutePath());
try (FileSystem zipFilesystem = FileSystems.newFileSystem(uri, env)) {
for (HumanKey key : humanKeys) {
Path storagePath = fileStorage.getFile(key.storageKey).toPath();
Path zipPath = zipFilesystem.getPath(key.humanName);
Files.copy(storagePath, zipPath, StandardCopyOption.REPLACE_EXISTING);
progressListener.increment();
}
}
}
fileStorage.moveIntoStorage(zippedResultsKey, tempZipFile);
}
res.type(APPLICATION_JSON.asString());
String analysisHumanName = humanNameForEntity(analysis);
return fileStorage.getJsonUrl(zippedResultsKey, analysisHumanName, "zip");
fileStorage.moveIntoStorage(zippedResultsKey, tempZipFile);
progressListener.increment();
filesBeingPrepared.remove(zippedResultsKey.path);
});
taskScheduler.enqueue(task);
res.type(TEXT_PLAIN.asString());
res.status(HttpStatus.ACCEPTED_202);
return "Building geotiff zip in background.";
}
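
The reworked handler above never blocks the HTTP request thread on zip creation: it returns the stored URL when the archive already exists, answers 202 Accepted when another request is already building it, and otherwise enqueues a background Task and answers 202. Below is a standalone sketch of that pattern, with a plain ExecutorService and string return values standing in for TaskScheduler, FileStorage, and the Spark Response; one simplification is that it relies on Set.add()'s return value rather than a separate contains() check.

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch of "build once in the background, answer 202 until the file is ready".
public class BackgroundPreparationSketch {
    private final Set<String> beingPrepared = Collections.synchronizedSet(new HashSet<>());
    private final Set<String> completed = Collections.synchronizedSet(new HashSet<>());
    private final ExecutorService background = Executors.newSingleThreadExecutor();

    public String request (String key) {
        if (completed.contains(key)) return "200 OK: file URL for " + key;
        // add() returns false when another request already claimed this key, avoiding duplicate work.
        if (!beingPrepared.add(key)) return "202 Accepted: already being prepared.";
        background.submit(() -> {
            // ... the expensive geotiff derivation and zipping would happen here ...
            completed.add(key);
            beingPrepared.remove(key);
        });
        return "202 Accepted: building in background.";
    }

    public static void main (String[] args) {
        BackgroundPreparationSketch sketch = new BackgroundPreparationSketch();
        System.out.println(sketch.request("analysis_ALL.zip"));
        System.out.println(sketch.request("analysis_ALL.zip")); // usually 202 while the first build is in flight
        sketch.background.shutdown();
    }
}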

/**
Expand Down Expand Up @@ -666,7 +703,7 @@ public void registerEndpoints (spark.Service sparkService) {
sparkService.get("/:_id", this::getRegionalAnalysis);
sparkService.get("/:_id/all", this::getAllRegionalResults, toJson);
sparkService.get("/:_id/grid/:format", this::getRegionalResults, toJson);
sparkService.get("/:_id/csv/:resultType", this::getCsvResults, toJson);
sparkService.get("/:_id/csv/:resultType", this::getCsvResults);
sparkService.get("/:_id/scenarioJsonUrl", this::getScenarioJsonUrl, toJson);
sparkService.delete("/:_id", this::deleteRegionalAnalysis, toJson);
sparkService.post("", this::createRegionalAnalysis, toJson);
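
The all-results archive in getAllRegionalResults above is assembled with the JDK's zip filesystem provider rather than a ZipOutputStream. A minimal standalone illustration of that approach follows; the entry name and file contents are placeholders, not taken from the project.

import java.io.File;
import java.net.URI;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Map;

// Create a zip as a FileSystem and copy files into it as Paths.
public class ZipFsSketch {
    public static void main (String[] args) throws Exception {
        File tempZip = File.createTempFile("regional", ".zip");
        // The zip filesystem provider will not create an archive over an existing empty file,
        // so the temp file is deleted first (the same workaround used in the controller above).
        tempZip.delete();
        URI uri = URI.create("jar:file:" + tempZip.getAbsolutePath());
        Path source = Files.writeString(Files.createTempFile("grid", ".tif"), "placeholder grid bytes");
        try (FileSystem zipFs = FileSystems.newFileSystem(uri, Map.of("create", "true"))) {
            // Copy the source file into the archive under a human-readable name (made up here).
            Files.copy(source, zipFs.getPath("Accessibility_cutoff45_p50.tif"), StandardCopyOption.REPLACE_EXISTING);
        }
        System.out.println("Wrote " + tempZip + " (" + tempZip.length() + " bytes)");
    }
}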
@@ -100,6 +100,8 @@ public void writeOneWorkResult (RegionalWorkResult workResult) throws Exception
// CsvWriter is not threadsafe and multiple threads may call this, so after values are generated,
// the actual writing is synchronized (TODO confirm)
// Is result row generation slow enough to bother synchronizing only the following block?
// This first dimension check is specific to each subclass. The check in the loop below is more general,
// applying to all subclasses (after the subclass-specific rowValues method may have added some columns).
checkDimension(workResult);
Iterable<String[]> rows = rowValues(workResult);
synchronized (this) {
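
The comment above notes that CsvWriter is not threadsafe, so row values are generated first and only the write itself is synchronized. A standalone sketch of that split is shown below, with a BufferedWriter standing in for the real CsvWriter.

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.StringWriter;

// Generate rows without a lock; serialize only the non-threadsafe write.
public class SynchronizedCsvWriteSketch {
    private final StringWriter out = new StringWriter();
    private final BufferedWriter writer = new BufferedWriter(out);

    public void writeOneResult (int originIndex, int[] values) throws IOException {
        // Row generation touches no shared mutable state, so it can run concurrently on many threads.
        String[] row = new String[values.length + 1];
        row[0] = Integer.toString(originIndex);
        for (int i = 0; i < values.length; i++) row[i + 1] = Integer.toString(values[i]);
        // Only the actual write is synchronized, keeping the critical section small.
        synchronized (this) {
            writer.write(String.join(",", row));
            writer.newLine();
        }
    }

    public static void main (String[] args) throws IOException {
        SynchronizedCsvWriteSketch sketch = new SynchronizedCsvWriteSketch();
        sketch.writeOneResult(0, new int[] {12, 34});
        sketch.writeOneResult(1, new int[] {56, 78});
        sketch.writer.flush();
        System.out.print(sketch.out);
    }
}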
@@ -42,6 +42,21 @@ public Iterable<String[]> rowValues (RegionalWorkResult workResult) {
return rows;
}

// Around 2024-04 we wanted to expand the number of CSV columns and needed to update the dimension checks below.
// The number of columns is checked twice, once in this specific CsvResultWriter implementation and once in the
// abstract superclass.
// We don't want to introduce a column count check with tolerance that is applied separately to each row, because
// this will not catch a whole class of problems where the worker instances are not producing a consistent number
// of columns across origins.
// We do ideally want to allow experimental workers that add an unknown number of columns, but they should add those
// columns to every row. This requires some kind of negotiated, flexible protocol between the backend and workers.
// Or some system where the first worker response received sets expectations and all other responses must match.
// We thought this through and decided it was too big a change to introduce immediately.
// So we only accept one specific quantity of CSV columns, but fail with a very specific message when we see a
// number of CSV columns that we recognize as coming from an obsolete worker version. Breaking backward
// compatibility is acceptable here because CSV paths are still considered an experimental feature.
// Ideally this very case-specific check and error message will be removed when some more general system is added.

@Override
protected void checkDimension (RegionalWorkResult workResult) {
// Path CSV output only supports a single freeform pointset for now.
@@ -53,6 +68,11 @@ protected void checkDimension (RegionalWorkResult workResult) {
for (ArrayList<String[]> oneDestination : workResult.pathResult) {
// Number of distinct paths per destination is variable, don't validate it.
for (String[] iterationDetails : oneDestination) {
if (iterationDetails.length == 10) {
throw new IllegalArgumentException(
"Please use worker version newer than v7.1. CSV columns in path results have changed."
);
}
checkDimension(workResult, "columns", iterationDetails.length, PathResult.DATA_COLUMNS.length);
}
}
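
The long comment above weighs the fixed column count against a scheme where the first worker response sets expectations and all later responses must match. That scheme was deliberately not implemented in this commit; purely to illustrate the idea being weighed, a minimal sketch might look like this.

import java.util.concurrent.atomic.AtomicInteger;

// Sketch of "first response sets expectations" column-count checking (not part of this commit).
public class NegotiatedColumnCountSketch {
    private static final int UNSET = -1;
    private final AtomicInteger expectedColumns = new AtomicInteger(UNSET);

    public void checkRow (String[] row) {
        // The first row seen fixes the expected width; every later row must match it exactly.
        int previous = expectedColumns.compareAndExchange(UNSET, row.length);
        if (previous != UNSET && previous != row.length) {
            throw new IllegalArgumentException(
                    "Expected " + previous + " CSV columns but a worker sent " + row.length + ".");
        }
    }

    public static void main (String[] args) {
        NegotiatedColumnCountSketch sketch = new NegotiatedColumnCountSketch();
        sketch.checkRow(new String[11]);  // first response seen: 11 columns becomes the expectation
        sketch.checkRow(new String[11]);  // matches, accepted
        try {
            sketch.checkRow(new String[10]);  // width drifted, e.g. an older worker version
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}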
@@ -74,7 +74,7 @@ public Type getType() {
*/
@Override
public WebMercatorExtents getWebMercatorExtents() {
if (makeTauiSite) {
if (makeTauiSite || this.hasFlag("CROP_DESTINATIONS")) {
return WebMercatorExtents.forTask(this);
} else {
return WebMercatorExtents.forPointsets(this.destinationPointSets);
@@ -112,4 +112,8 @@ public int nTargetsPerOrigin () {
}
}

public boolean hasFlag (String flag) {
return this.flags != null && this.flags.contains(flag);
}

}
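
A standalone rendering of the null-safe flag check added to RegionalTask above, for the case where a request sets no flags at all. The Set<String> field type is an assumption here; the diff does not show the field's declaration.

import java.util.Set;

// Minimal flag-check sketch mirroring the hasFlag method added above.
public class FlagCheckSketch {
    Set<String> flags;   // may legitimately be null when a request sets no flags

    boolean hasFlag (String flag) {
        return flags != null && flags.contains(flag);
    }

    public static void main (String[] args) {
        FlagCheckSketch task = new FlagCheckSketch();
        System.out.println(task.hasFlag("CROP_DESTINATIONS")); // false, no NullPointerException
        task.flags = Set.of("CROP_DESTINATIONS");
        System.out.println(task.hasFlag("CROP_DESTINATIONS")); // true
    }
}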
2 changes: 1 addition & 1 deletion src/main/java/com/conveyal/r5/analyst/progress/Task.java
@@ -162,7 +162,7 @@ protected void bubbleUpProgress() {
}

/**
* Check that all necesary fields have been set before enqueueing for execution, and check any invariants.
* Check that all necessary fields have been set before enqueueing for execution, and check any invariants.
*/
public void validate () {
if (this.user == null) {
@@ -62,8 +62,8 @@ public String[] detailsWithGtfsIds (TransitLayer transitLayer, CsvResultOptions
routeJoiner.toString(),
boardStopJoiner.toString(),
alightStopJoiner.toString(),
feedJoiner.toString(),
rideTimeJoiner.toString(),
feedJoiner.toString(),
accessTime,
egressTime
};
