Pre-release patches #937

Merged · 7 commits · Apr 18, 2024
@@ -86,7 +86,7 @@ public List<HttpController> standardHttpControllers () {
new GtfsController(gtfsCache),
new BundleController(this),
new OpportunityDatasetController(fileStorage, taskScheduler, censusExtractor, database),
new RegionalAnalysisController(broker, fileStorage),
new RegionalAnalysisController(broker, fileStorage, taskScheduler),
new AggregationAreaController(fileStorage, database, taskScheduler),
// This broker controller registers at least one handler at URL paths beginning with /internal, which
// is exempted from authentication and authorization, but should be hidden from the world
12 changes: 10 additions & 2 deletions src/main/java/com/conveyal/analysis/components/broker/Broker.java
@@ -110,10 +110,18 @@ public interface Config {
* is too high, all remaining tasks in a job could be distributed to a single worker leaving none for the other
* workers, creating a slow-joiner problem especially if the tasks are complicated and slow to complete.
*
* The value should eventually be tuned. The current value of 16 is just the value used by the previous sporadic
* The value should eventually be tuned. The value of 16 is the value used by the previous sporadic
* polling system (WorkerStatus.LEGACY_WORKER_MAX_TASKS) which may not be ideal but is known to work.
*
* NOTE that as a side effect this limits the total throughput of each worker to:
* MAX_TASKS_PER_WORKER / AnalysisWorker#POLL_INTERVAL_MIN_SECONDS tasks per second.
* It is entirely plausible for half or more of the origins in a job to be unconnected to any roadways (water,
* deserts etc.) In this case the system may need to burn through millions of origins, only checking that they
* aren't attached to anything in the selected scenario. Not doing so could double the run time of an analysis.
* It may be beneficial to assign origins to workers more randomly, or to introduce a mechanism to pre-scan for
* disconnected origins or at least concisely signal large blocks of them in worker responses.
*/
public static final int MAX_TASKS_PER_WORKER = 16;
public static final int MAX_TASKS_PER_WORKER = 40;

/**
* Used when auto-starting spot instances. Set to a smaller value to increase the number of
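A quick back-of-the-envelope illustration of the throughput bound mentioned in the comment above (not part of this PR; the poll interval used here is an assumption, since AnalysisWorker#POLL_INTERVAL_MIN_SECONDS is not shown in this diff):

// Hypothetical arithmetic only: assumes a minimum poll interval of 2 seconds.
int maxTasksPerWorker = 40;          // value introduced by this PR (was 16)
int pollIntervalMinSeconds = 2;      // assumed; defined in AnalysisWorker, not shown here
double maxTasksPerSecond = (double) maxTasksPerWorker / pollIntervalMinSeconds;
// Under this assumed interval each worker tops out at 20 tasks per second, up from 8 with the old limit.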
@@ -3,6 +3,7 @@
import com.conveyal.analysis.AnalysisServerException;
import com.conveyal.analysis.SelectingGridReducer;
import com.conveyal.analysis.UserPermissions;
import com.conveyal.analysis.components.TaskScheduler;
import com.conveyal.analysis.components.broker.Broker;
import com.conveyal.analysis.components.broker.JobStatus;
import com.conveyal.analysis.models.AnalysisRequest;
@@ -11,6 +12,7 @@
import com.conveyal.analysis.models.RegionalAnalysis;
import com.conveyal.analysis.persistence.Persistence;
import com.conveyal.analysis.results.CsvResultType;
import com.conveyal.analysis.util.HttpStatus;
import com.conveyal.analysis.util.JsonUtil;
import com.conveyal.file.FileStorage;
import com.conveyal.file.FileStorageFormat;
@@ -22,6 +24,7 @@
import com.conveyal.r5.analyst.PointSet;
import com.conveyal.r5.analyst.PointSetCache;
import com.conveyal.r5.analyst.cluster.RegionalTask;
import com.conveyal.r5.analyst.progress.Task;
import com.google.common.primitives.Ints;
import com.mongodb.QueryBuilder;
import gnu.trove.list.array.TIntArrayList;
@@ -36,6 +39,7 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
@@ -45,9 +49,12 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.zip.GZIPOutputStream;

import static com.conveyal.analysis.util.JsonUtil.toJson;
@@ -60,6 +67,7 @@
import static com.google.common.base.Preconditions.checkState;
import static org.eclipse.jetty.http.MimeTypes.Type.APPLICATION_JSON;
import static org.eclipse.jetty.http.MimeTypes.Type.TEXT_HTML;
import static org.eclipse.jetty.http.MimeTypes.Type.TEXT_PLAIN;

/**
* Spark HTTP handler methods that allow launching new regional analyses, as well as deleting them and fetching
@@ -80,10 +88,12 @@ public class RegionalAnalysisController implements HttpController {

private final Broker broker;
private final FileStorage fileStorage;
private final TaskScheduler taskScheduler;

public RegionalAnalysisController (Broker broker, FileStorage fileStorage) {
public RegionalAnalysisController (Broker broker, FileStorage fileStorage, TaskScheduler taskScheduler) {
this.broker = broker;
this.fileStorage = fileStorage;
this.taskScheduler = taskScheduler;
}

private Collection<RegionalAnalysis> getRegionalAnalysesForRegion(String regionId, UserPermissions userPermissions) {
@@ -254,8 +264,9 @@ private HumanKey getSingleCutoffGrid (
grid.writeGeotiff(fos);
break;
}

LOG.debug("Finished deriving single-cutoff grid {}. Transferring to storage.", singleCutoffKey);
fileStorage.moveIntoStorage(singleCutoffFileStorageKey, localFile);
LOG.debug("Finished transferring single-cutoff grid {} to storage.", singleCutoffKey);
}
String analysisHumanName = humanNameForEntity(analysis);
String destinationHumanName = humanNameForEntity(destinations);
@@ -266,6 +277,10 @@ private HumanKey getSingleCutoffGrid (
return new HumanKey(singleCutoffFileStorageKey, resultHumanFilename);
}

// Prevent multiple requests from creating the same files in parallel.
// This could potentially be integrated into FileStorage with enum return values or an additional boolean method.
private Set<String> filesBeingPrepared = Collections.synchronizedSet(new HashSet<>());

private Object getAllRegionalResults (Request req, Response res) throws IOException {
final String regionalAnalysisId = req.params("_id");
final UserPermissions userPermissions = UserPermissions.from(req);
@@ -277,39 +292,61 @@ private Object getAllRegionalResults (Request req, Response res) throws IOException
throw AnalysisServerException.badRequest("Batch result download only available for gridded origins.");
}
FileStorageKey zippedResultsKey = new FileStorageKey(RESULTS, analysis._id + "_ALL.zip");
if (!fileStorage.exists(zippedResultsKey)) {
// Iterate over all dest, cutoff, percentile combinations and generate one geotiff grid output for each one.
List<HumanKey> humanKeys = new ArrayList<>();
for (String destinationPointSetId : analysis.destinationPointSetIds) {
OpportunityDataset destinations = getDestinations(destinationPointSetId, userPermissions);
for (int cutoffMinutes : analysis.cutoffsMinutes) {
for (int percentile : analysis.travelTimePercentiles) {
HumanKey gridKey = getSingleCutoffGrid(
analysis, destinations, cutoffMinutes, percentile, FileStorageFormat.GEOTIFF
);
humanKeys.add(gridKey);
if (fileStorage.exists(zippedResultsKey)) {
res.type(APPLICATION_JSON.asString());
String analysisHumanName = humanNameForEntity(analysis);
return fileStorage.getJsonUrl(zippedResultsKey, analysisHumanName, "zip");
}
if (filesBeingPrepared.contains(zippedResultsKey.path)) {
res.type(TEXT_PLAIN.asString());
res.status(HttpStatus.ACCEPTED_202);
return "Geotiff zip is already being prepared in the background.";
}
// File did not exist. Create it in the background and ask caller to request it later.
filesBeingPrepared.add(zippedResultsKey.path);
Task task = Task.create("Zip all geotiffs for regional analysis " + analysis.name)
.forUser(userPermissions)
.withAction(progressListener -> {
int nSteps = analysis.destinationPointSetIds.length * analysis.cutoffsMinutes.length *
analysis.travelTimePercentiles.length * 2 + 1;
progressListener.beginTask("Creating and archiving geotiffs...", nSteps);
// Iterate over all dest, cutoff, percentile combinations and generate one geotiff for each combination.
List<HumanKey> humanKeys = new ArrayList<>();
for (String destinationPointSetId : analysis.destinationPointSetIds) {
OpportunityDataset destinations = getDestinations(destinationPointSetId, userPermissions);
for (int cutoffMinutes : analysis.cutoffsMinutes) {
for (int percentile : analysis.travelTimePercentiles) {
HumanKey gridKey = getSingleCutoffGrid(
analysis, destinations, cutoffMinutes, percentile, FileStorageFormat.GEOTIFF
);
humanKeys.add(gridKey);
progressListener.increment();
}
}
}
}
File tempZipFile = File.createTempFile("regional", ".zip");
// Zipfs can't open existing empty files, the file has to not exist. FIXME: Non-dangerous race condition
// Examining ZipFileSystemProvider reveals a "useTempFile" env parameter, but this is for the individual entries.
// May be better to just use zipOutputStream which would also allow gzip - zip CSV conversion.
tempZipFile.delete();
Map<String, String> env = Map.of("create", "true");
URI uri = URI.create("jar:file:" + tempZipFile.getAbsolutePath());
try (FileSystem zipFilesystem = FileSystems.newFileSystem(uri, env)) {
for (HumanKey key : humanKeys) {
Path storagePath = fileStorage.getFile(key.storageKey).toPath();
Path zipPath = zipFilesystem.getPath(key.humanName);
Files.copy(storagePath, zipPath, StandardCopyOption.REPLACE_EXISTING);
File tempZipFile = File.createTempFile("regional", ".zip");
// Zipfs can't open existing empty files, the file has to not exist. FIXME: Non-dangerous race condition
// Examining ZipFileSystemProvider reveals a "useTempFile" env parameter, but this is for the individual
// entries. May be better to just use zipOutputStream which would also allow gzip - zip CSV conversion.
tempZipFile.delete();
Map<String, String> env = Map.of("create", "true");
URI uri = URI.create("jar:file:" + tempZipFile.getAbsolutePath());
try (FileSystem zipFilesystem = FileSystems.newFileSystem(uri, env)) {
for (HumanKey key : humanKeys) {
Path storagePath = fileStorage.getFile(key.storageKey).toPath();
Path zipPath = zipFilesystem.getPath(key.humanName);
Files.copy(storagePath, zipPath, StandardCopyOption.REPLACE_EXISTING);
progressListener.increment();
}
}
}
fileStorage.moveIntoStorage(zippedResultsKey, tempZipFile);
}
res.type(APPLICATION_JSON.asString());
String analysisHumanName = humanNameForEntity(analysis);
return fileStorage.getJsonUrl(zippedResultsKey, analysisHumanName, "zip");
fileStorage.moveIntoStorage(zippedResultsKey, tempZipFile);
progressListener.increment();
filesBeingPrepared.remove(zippedResultsKey.path);
});
taskScheduler.enqueue(task);
res.type(TEXT_PLAIN.asString());
res.status(HttpStatus.ACCEPTED_202);
return "Building geotiff zip in background.";
}
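The FIXME earlier in this method suggests ZipOutputStream as an alternative to mounting a jar: FileSystem on the temp file. A minimal sketch of that alternative, reusing the same local variables (illustrative only, not part of this PR):

// Sketch: stream each stored geotiff straight into a zip entry (java.util.zip),
// avoiding the need to delete the temp file before opening it as a FileSystem.
try (ZipOutputStream zos = new ZipOutputStream(new FileOutputStream(tempZipFile))) {
    for (HumanKey key : humanKeys) {
        zos.putNextEntry(new ZipEntry(key.humanName));
        Files.copy(fileStorage.getFile(key.storageKey).toPath(), zos);
        zos.closeEntry();
    }
}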

/**
@@ -666,7 +703,7 @@ public void registerEndpoints (spark.Service sparkService) {
sparkService.get("/:_id", this::getRegionalAnalysis);
sparkService.get("/:_id/all", this::getAllRegionalResults, toJson);
sparkService.get("/:_id/grid/:format", this::getRegionalResults, toJson);
sparkService.get("/:_id/csv/:resultType", this::getCsvResults, toJson);
sparkService.get("/:_id/csv/:resultType", this::getCsvResults);
sparkService.get("/:_id/scenarioJsonUrl", this::getScenarioJsonUrl, toJson);
sparkService.delete("/:_id", this::deleteRegionalAnalysis, toJson);
sparkService.post("", this::createRegionalAnalysis, toJson);
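With this change, GET .../:_id/all returns 202 Accepted while the zip is still being built in the background, and a JSON download URL once it exists, so callers are expected to poll. A hedged sketch of that client-side loop (not part of this PR; the base URL, mount path, and poll delay are assumptions):

// Hypothetical polling client using java.net.http (Java 11+).
static String awaitAllRegionalResults (HttpClient client, String analysisId)
        throws IOException, InterruptedException {
    HttpRequest request = HttpRequest.newBuilder(
            URI.create("https://analysis.example.com/api/regional/" + analysisId + "/all"))
        .GET().build();
    while (true) {
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 202) return response.body(); // JSON containing the zip URL
        Thread.sleep(5_000); // 202: zip is still being prepared in the background
    }
}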
@@ -100,6 +100,8 @@ public void writeOneWorkResult (RegionalWorkResult workResult) throws Exception
// CsvWriter is not threadsafe and multiple threads may call this, so after values are generated,
// the actual writing is synchronized (TODO confirm)
// Is result row generation slow enough to bother synchronizing only the following block?
// This first dimension check is specific to each subclass. The check in the loop below is more general,
// applying to all subclasses (after the subclass-specific rowValues method may have added some columns).
checkDimension(workResult);
Iterable<String[]> rows = rowValues(workResult);
synchronized (this) {
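The comment above describes generating the row values outside the lock and synchronizing only the writer calls, because the CSV writer is not threadsafe. A rough sketch of that shape (the writer field and its writeRecord method are assumptions about the surrounding class, which is not fully shown in this diff):

// Rows are computed concurrently; only the shared, non-threadsafe writer is guarded.
Iterable<String[]> rows = rowValues(workResult);   // may be slow, runs outside the lock
synchronized (this) {
    for (String[] row : rows) {
        csvWriter.writeRecord(row);                // assumed writer API, one CSV row per call
    }
}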
@@ -42,6 +42,21 @@ public Iterable<String[]> rowValues (RegionalWorkResult workResult) {
return rows;
}

// Around 2024-04 we wanted to expand the number of CSV columns and needed to update the dimension checks below.
// The number of columns is checked twice, once in this specific CsvResultWriter implementation and once in the
// abstract superclass.
// We don't want to introduce a column count check with tolerance that is applied separately to each row, because
// this will not catch a whole class of problems where the worker instances are not producing a consistent number
// of columns across origins.
// We do ideally want to allow experimental workers that add an unknown number of columns, but they should add those
// columns to every row. This requires some kind of negotiated, flexible protocol between the backend and workers.
// Or some system where the first worker response received sets expectations and all other responses must match.
// We thought this through and decided it was too big a change to introduce immediately.
// So we only accept one specific quantity of CSV columns, but fail with a very specific message when we see a
// number of CSV columns that we recognize as coming from an obsolete worker version. Breaking backward
// compatibility is acceptable here because CSV paths are still considered an experimental feature.
// Ideally this very case-specific check and error message will be removed when some more general system is added.

@Override
protected void checkDimension (RegionalWorkResult workResult) {
// Path CSV output only supports a single freeform pointset for now.
@@ -53,6 +68,11 @@ protected void checkDimension (RegionalWorkResult workResult) {
for (ArrayList<String[]> oneDestination : workResult.pathResult) {
// Number of distinct paths per destination is variable, don't validate it.
for (String[] iterationDetails : oneDestination) {
if (iterationDetails.length == 10) {
throw new IllegalArgumentException(
"Please use worker version newer than v7.1. CSV columns in path results have changed."
);
}
checkDimension(workResult, "columns", iterationDetails.length, PathResult.DATA_COLUMNS.length);
}
}
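One alternative raised in the long comment above is letting the first worker response set the expected column count, with every later response required to match it. A rough sketch of that idea (purely hypothetical, not part of this PR):

// Hypothetical: record the column count from the first response, then enforce it.
private int expectedColumns = -1;

private void checkColumnCount (String[] iterationDetails) {
    if (expectedColumns < 0) {
        expectedColumns = iterationDetails.length;   // first response sets expectations
    } else if (iterationDetails.length != expectedColumns) {
        throw new IllegalArgumentException(String.format(
            "Expected %d path CSV columns but received %d; worker output is inconsistent.",
            expectedColumns, iterationDetails.length));
    }
}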
@@ -74,7 +74,7 @@ public Type getType() {
*/
@Override
public WebMercatorExtents getWebMercatorExtents() {
if (makeTauiSite) {
if (makeTauiSite || this.hasFlag("CROP_DESTINATIONS")) {
return WebMercatorExtents.forTask(this);
} else {
return WebMercatorExtents.forPointsets(this.destinationPointSets);
@@ -112,4 +112,8 @@ public int nTargetsPerOrigin () {
}
}

public boolean hasFlag (String flag) {
return this.flags != null && this.flags.contains(flag);
}

}
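A stand-alone sketch of the null-safe flag check added above; FlaggedTask is a hypothetical stand-in for the task class in this diff, used only to show the behavior:

import java.util.Set;

class FlaggedTask {
    Set<String> flags;   // may be null when the request supplied no flags

    boolean hasFlag (String flag) {
        return flags != null && flags.contains(flag);
    }

    public static void main (String[] args) {
        FlaggedTask task = new FlaggedTask();
        System.out.println(task.hasFlag("CROP_DESTINATIONS"));   // false: flags is null
        task.flags = Set.of("CROP_DESTINATIONS");
        System.out.println(task.hasFlag("CROP_DESTINATIONS"));   // true
    }
}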
2 changes: 1 addition & 1 deletion src/main/java/com/conveyal/r5/analyst/progress/Task.java
@@ -162,7 +162,7 @@ protected void bubbleUpProgress() {
}

/**
* Check that all necesary fields have been set before enqueueing for execution, and check any invariants.
* Check that all necessary fields have been set before enqueueing for execution, and check any invariants.
*/
public void validate () {
if (this.user == null) {
@@ -62,8 +62,8 @@ public String[] detailsWithGtfsIds (TransitLayer transitLayer, CsvResultOptions
routeJoiner.toString(),
boardStopJoiner.toString(),
alightStopJoiner.toString(),
feedJoiner.toString(),
rideTimeJoiner.toString(),
feedJoiner.toString(),
accessTime,
egressTime
};