diff --git a/src/main/java/com/conveyal/analysis/components/BackendComponents.java b/src/main/java/com/conveyal/analysis/components/BackendComponents.java index 9a7270a1b..dafb1b57e 100644 --- a/src/main/java/com/conveyal/analysis/components/BackendComponents.java +++ b/src/main/java/com/conveyal/analysis/components/BackendComponents.java @@ -86,7 +86,7 @@ public List standardHttpControllers () { new GtfsController(gtfsCache), new BundleController(this), new OpportunityDatasetController(fileStorage, taskScheduler, censusExtractor, database), - new RegionalAnalysisController(broker, fileStorage), + new RegionalAnalysisController(broker, fileStorage, taskScheduler), new AggregationAreaController(fileStorage, database, taskScheduler), // This broker controller registers at least one handler at URL paths beginning with /internal, which // is exempted from authentication and authorization, but should be hidden from the world diff --git a/src/main/java/com/conveyal/analysis/components/broker/Broker.java b/src/main/java/com/conveyal/analysis/components/broker/Broker.java index 7e613de43..5a45edf70 100644 --- a/src/main/java/com/conveyal/analysis/components/broker/Broker.java +++ b/src/main/java/com/conveyal/analysis/components/broker/Broker.java @@ -110,10 +110,18 @@ public interface Config { * is too high, all remaining tasks in a job could be distributed to a single worker leaving none for the other * workers, creating a slow-joiner problem especially if the tasks are complicated and slow to complete. * - * The value should eventually be tuned. The current value of 16 is just the value used by the previous sporadic + * The value should eventually be tuned. The value of 16 is the value used by the previous sporadic * polling system (WorkerStatus.LEGACY_WORKER_MAX_TASKS) which may not be ideal but is known to work. 
+ * + * NOTE that as a side effect this limits the total throughput of each worker to: + * MAX_TASKS_PER_WORKER / AnalysisWorker#POLL_INTERVAL_MIN_SECONDS tasks per second. + * It is entirely plausible for half or more of the origins in a job to be unconnected to any roadways (water, + * deserts etc.) In this case the system may need to burn through millions of origins, only checking that they + * aren't attached to anything in the selected scenario. Not doing so could double the run time of an analysis. + * It may be beneficial to assign origins to workers more randomly, or to introduce a mechanism to pre-scan for + * disconnected origins or at least concisely signal large blocks of them in worker responses. */ - public static final int MAX_TASKS_PER_WORKER = 16; + public static final int MAX_TASKS_PER_WORKER = 40; /** * Used when auto-starting spot instances. Set to a smaller value to increase the number of diff --git a/src/main/java/com/conveyal/analysis/controllers/RegionalAnalysisController.java b/src/main/java/com/conveyal/analysis/controllers/RegionalAnalysisController.java index a60eccd2b..82806ad66 100644 --- a/src/main/java/com/conveyal/analysis/controllers/RegionalAnalysisController.java +++ b/src/main/java/com/conveyal/analysis/controllers/RegionalAnalysisController.java @@ -3,6 +3,7 @@ import com.conveyal.analysis.AnalysisServerException; import com.conveyal.analysis.SelectingGridReducer; import com.conveyal.analysis.UserPermissions; +import com.conveyal.analysis.components.TaskScheduler; import com.conveyal.analysis.components.broker.Broker; import com.conveyal.analysis.components.broker.JobStatus; import com.conveyal.analysis.models.AnalysisRequest; @@ -11,6 +12,7 @@ import com.conveyal.analysis.models.RegionalAnalysis; import com.conveyal.analysis.persistence.Persistence; import com.conveyal.analysis.results.CsvResultType; +import com.conveyal.analysis.util.HttpStatus; import com.conveyal.analysis.util.JsonUtil; import com.conveyal.file.FileStorage; 
import com.conveyal.file.FileStorageFormat; @@ -22,6 +24,7 @@ import com.conveyal.r5.analyst.PointSet; import com.conveyal.r5.analyst.PointSetCache; import com.conveyal.r5.analyst.cluster.RegionalTask; +import com.conveyal.r5.analyst.progress.Task; import com.google.common.primitives.Ints; import com.mongodb.QueryBuilder; import gnu.trove.list.array.TIntArrayList; @@ -36,6 +39,7 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.net.URI; import java.nio.file.FileSystem; import java.nio.file.FileSystems; @@ -45,9 +49,12 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Set; import java.util.zip.GZIPOutputStream; import static com.conveyal.analysis.util.JsonUtil.toJson; @@ -60,6 +67,7 @@ import static com.google.common.base.Preconditions.checkState; import static org.eclipse.jetty.http.MimeTypes.Type.APPLICATION_JSON; import static org.eclipse.jetty.http.MimeTypes.Type.TEXT_HTML; +import static org.eclipse.jetty.http.MimeTypes.Type.TEXT_PLAIN; /** * Spark HTTP handler methods that allow launching new regional analyses, as well as deleting them and fetching @@ -80,10 +88,12 @@ public class RegionalAnalysisController implements HttpController { private final Broker broker; private final FileStorage fileStorage; + private final TaskScheduler taskScheduler; - public RegionalAnalysisController (Broker broker, FileStorage fileStorage) { + public RegionalAnalysisController (Broker broker, FileStorage fileStorage, TaskScheduler taskScheduler) { this.broker = broker; this.fileStorage = fileStorage; + this.taskScheduler = taskScheduler; } private Collection getRegionalAnalysesForRegion(String regionId, UserPermissions userPermissions) { @@ -254,8 +264,9 @@ private HumanKey getSingleCutoffGrid ( grid.writeGeotiff(fos); 
break; } - + LOG.debug("Finished deriving single-cutoff grid {}. Transferring to storage.", singleCutoffKey); fileStorage.moveIntoStorage(singleCutoffFileStorageKey, localFile); + LOG.debug("Finished transferring single-cutoff grid {} to storage.", singleCutoffKey); } String analysisHumanName = humanNameForEntity(analysis); String destinationHumanName = humanNameForEntity(destinations); @@ -266,6 +277,10 @@ private HumanKey getSingleCutoffGrid ( return new HumanKey(singleCutoffFileStorageKey, resultHumanFilename); } + // Prevent multiple requests from creating the same files in parallel. + // This could potentially be integrated into FileStorage with enum return values or an additional boolean method. + private final Set<String> filesBeingPrepared = Collections.synchronizedSet(new HashSet<>()); + private Object getAllRegionalResults (Request req, Response res) throws IOException { final String regionalAnalysisId = req.params("_id"); final UserPermissions userPermissions = UserPermissions.from(req); @@ -277,39 +292,66 @@ private Object getAllRegionalResults (Request req, Response res) throws IOExcept throw AnalysisServerException.badRequest("Batch result download only available for gridded origins."); } FileStorageKey zippedResultsKey = new FileStorageKey(RESULTS, analysis._id + "_ALL.zip"); - if (!fileStorage.exists(zippedResultsKey)) { - // Iterate over all dest, cutoff, percentile combinations and generate one geotiff grid output for each one.
- List<HumanKey> humanKeys = new ArrayList<>(); - for (String destinationPointSetId : analysis.destinationPointSetIds) { - OpportunityDataset destinations = getDestinations(destinationPointSetId, userPermissions); - for (int cutoffMinutes : analysis.cutoffsMinutes) { - for (int percentile : analysis.travelTimePercentiles) { - HumanKey gridKey = getSingleCutoffGrid( - analysis, destinations, cutoffMinutes, percentile, FileStorageFormat.GEOTIFF - ); - humanKeys.add(gridKey); + if (fileStorage.exists(zippedResultsKey)) { + res.type(APPLICATION_JSON.asString()); + String analysisHumanName = humanNameForEntity(analysis); + return fileStorage.getJsonUrl(zippedResultsKey, analysisHumanName, "zip"); + } + // Claim the key with a single add() call: it returns false when another request already holds the key, so + // there is no window between a separate contains() check and add() in which two requests both start a build. + if (!filesBeingPrepared.add(zippedResultsKey.path)) { + res.type(TEXT_PLAIN.asString()); + res.status(HttpStatus.ACCEPTED_202); + return "Geotiff zip is already being prepared in the background."; + } + // File did not exist. Create it in the background and ask caller to request it later. + Task task = Task.create("Zip all geotiffs for regional analysis " + analysis.name) + .forUser(userPermissions) + .withAction(progressListener -> { + try { + int nSteps = analysis.destinationPointSetIds.length * analysis.cutoffsMinutes.length * + analysis.travelTimePercentiles.length * 2 + 1; + progressListener.beginTask("Creating and archiving geotiffs...", nSteps); + // Iterate over all dest, cutoff, percentile combinations and generate one geotiff for each combination.
+ List<HumanKey> humanKeys = new ArrayList<>(); + for (String destinationPointSetId : analysis.destinationPointSetIds) { + OpportunityDataset destinations = getDestinations(destinationPointSetId, userPermissions); + for (int cutoffMinutes : analysis.cutoffsMinutes) { + for (int percentile : analysis.travelTimePercentiles) { + HumanKey gridKey = getSingleCutoffGrid( + analysis, destinations, cutoffMinutes, percentile, FileStorageFormat.GEOTIFF + ); + humanKeys.add(gridKey); + progressListener.increment(); + } } } - } - File tempZipFile = File.createTempFile("regional", ".zip"); - // Zipfs can't open existing empty files, the file has to not exist. FIXME: Non-dangerous race condition - // Examining ZipFileSystemProvider reveals a "useTempFile" env parameter, but this is for the individual entries. - // May be better to just use zipOutputStream which would also allow gzip - zip CSV conversion. - tempZipFile.delete(); - Map<String, String> env = Map.of("create", "true"); - URI uri = URI.create("jar:file:" + tempZipFile.getAbsolutePath()); - try (FileSystem zipFilesystem = FileSystems.newFileSystem(uri, env)) { - for (HumanKey key : humanKeys) { - Path storagePath = fileStorage.getFile(key.storageKey).toPath(); - Path zipPath = zipFilesystem.getPath(key.humanName); - Files.copy(storagePath, zipPath, StandardCopyOption.REPLACE_EXISTING); + File tempZipFile = File.createTempFile("regional", ".zip"); + // Zipfs can't open existing empty files, the file has to not exist. FIXME: Non-dangerous race condition + // Examining ZipFileSystemProvider reveals a "useTempFile" env parameter, but this is for the individual + // entries. May be better to just use zipOutputStream which would also allow gzip - zip CSV conversion.
+ tempZipFile.delete(); + Map<String, String> env = Map.of("create", "true"); + URI uri = URI.create("jar:file:" + tempZipFile.getAbsolutePath()); + try (FileSystem zipFilesystem = FileSystems.newFileSystem(uri, env)) { + for (HumanKey key : humanKeys) { + Path storagePath = fileStorage.getFile(key.storageKey).toPath(); + Path zipPath = zipFilesystem.getPath(key.humanName); + Files.copy(storagePath, zipPath, StandardCopyOption.REPLACE_EXISTING); + progressListener.increment(); + } } - } - fileStorage.moveIntoStorage(zippedResultsKey, tempZipFile); - } - res.type(APPLICATION_JSON.asString()); - String analysisHumanName = humanNameForEntity(analysis); - return fileStorage.getJsonUrl(zippedResultsKey, analysisHumanName, "zip"); + fileStorage.moveIntoStorage(zippedResultsKey, tempZipFile); + progressListener.increment(); + } finally { + // Always release the key, even when the build throws, so that a later request can retry the build + // instead of being told forever that the zip is already being prepared. + filesBeingPrepared.remove(zippedResultsKey.path); + } + }); + taskScheduler.enqueue(task); + res.type(TEXT_PLAIN.asString()); + res.status(HttpStatus.ACCEPTED_202); + return "Building geotiff zip in background."; } /** @@ -666,7 +708,7 @@ public void registerEndpoints (spark.Service sparkService) { sparkService.get("/:_id", this::getRegionalAnalysis); sparkService.get("/:_id/all", this::getAllRegionalResults, toJson); sparkService.get("/:_id/grid/:format", this::getRegionalResults, toJson); - sparkService.get("/:_id/csv/:resultType", this::getCsvResults, toJson); + sparkService.get("/:_id/csv/:resultType", this::getCsvResults); sparkService.get("/:_id/scenarioJsonUrl", this::getScenarioJsonUrl, toJson); sparkService.delete("/:_id", this::deleteRegionalAnalysis, toJson); sparkService.post("", this::createRegionalAnalysis, toJson); diff --git a/src/main/java/com/conveyal/analysis/results/CsvResultWriter.java b/src/main/java/com/conveyal/analysis/results/CsvResultWriter.java index ca0cf09c5..1bd9ff9ac 100644 --- a/src/main/java/com/conveyal/analysis/results/CsvResultWriter.java +++ b/src/main/java/com/conveyal/analysis/results/CsvResultWriter.java @@ -100,6 +100,8 @@
public void writeOneWorkResult (RegionalWorkResult workResult) throws Exception // CsvWriter is not threadsafe and multiple threads may call this, so after values are generated, // the actual writing is synchronized (TODO confirm) // Is result row generation slow enough to bother synchronizing only the following block? + // This first dimension check is specific to each subclass. The check in the loop below is more general, + // applying to all subclasses (after the subclass-specific rowValues method may have added some columns). checkDimension(workResult); Iterable rows = rowValues(workResult); synchronized (this) { diff --git a/src/main/java/com/conveyal/analysis/results/PathCsvResultWriter.java b/src/main/java/com/conveyal/analysis/results/PathCsvResultWriter.java index 0dadb4337..6a7c9ffc7 100644 --- a/src/main/java/com/conveyal/analysis/results/PathCsvResultWriter.java +++ b/src/main/java/com/conveyal/analysis/results/PathCsvResultWriter.java @@ -42,6 +42,21 @@ public Iterable rowValues (RegionalWorkResult workResult) { return rows; } + // Around 2024-04 we wanted to expand the number of CSV columns and needed to update the dimension checks below. + // The number of columns is checked twice, once in this specific CsvResultWriter implementation and once in the + // abstract superclass. + // We don't want to introduce a column count check with tolerance that is applied separately to each row, because + // this will not catch a whole class of problems where the worker instances are not producing a consistent number + // of columns across origins. + // We do ideally want to allow experimental workers that add an unknown number of columns, but they should add those + // columns to every row. This requires some kind of negotiated, flexible protocol between the backend and workers. + // Or some system where the first worker response received sets expectations and all other responses must match. 
+ // We thought this through and decided it was too big a change to introduce immediately. + // So we only accept one specific quantity of CSV columns, but fail with a very specific message when we see a + // number of CSV columns that we recognize as coming from an obsolete worker version. Breaking backward + // compatibility is acceptable here because CSV paths are still considered an experimental feature. + // Ideally this very case-specific check and error message will be removed when some more general system is added. + @Override protected void checkDimension (RegionalWorkResult workResult) { // Path CSV output only supports a single freeform pointset for now. @@ -53,6 +68,11 @@ protected void checkDimension (RegionalWorkResult workResult) { for (ArrayList oneDestination : workResult.pathResult) { // Number of distinct paths per destination is variable, don't validate it. for (String[] iterationDetails : oneDestination) { + if (iterationDetails.length == 10) { + throw new IllegalArgumentException( + "Please use worker version newer than v7.1. CSV columns in path results have changed." 
+ ); + } checkDimension(workResult, "columns", iterationDetails.length, PathResult.DATA_COLUMNS.length); } } diff --git a/src/main/java/com/conveyal/r5/analyst/cluster/RegionalTask.java b/src/main/java/com/conveyal/r5/analyst/cluster/RegionalTask.java index 0a444f9aa..839bdde91 100644 --- a/src/main/java/com/conveyal/r5/analyst/cluster/RegionalTask.java +++ b/src/main/java/com/conveyal/r5/analyst/cluster/RegionalTask.java @@ -74,7 +74,7 @@ public Type getType() { */ @Override public WebMercatorExtents getWebMercatorExtents() { - if (makeTauiSite) { + if (makeTauiSite || this.hasFlag("CROP_DESTINATIONS")) { return WebMercatorExtents.forTask(this); } else { return WebMercatorExtents.forPointsets(this.destinationPointSets); @@ -112,4 +112,8 @@ public int nTargetsPerOrigin () { } } + public boolean hasFlag (String flag) { + return this.flags != null && this.flags.contains(flag); + } + } diff --git a/src/main/java/com/conveyal/r5/analyst/progress/Task.java b/src/main/java/com/conveyal/r5/analyst/progress/Task.java index 49990af7a..a072d234d 100644 --- a/src/main/java/com/conveyal/r5/analyst/progress/Task.java +++ b/src/main/java/com/conveyal/r5/analyst/progress/Task.java @@ -162,7 +162,7 @@ protected void bubbleUpProgress() { } /** - * Check that all necesary fields have been set before enqueueing for execution, and check any invariants. + * Check that all necessary fields have been set before enqueueing for execution, and check any invariants. 
*/ public void validate () { if (this.user == null) { diff --git a/src/main/java/com/conveyal/r5/transit/path/RouteSequence.java b/src/main/java/com/conveyal/r5/transit/path/RouteSequence.java index 62e6cfd34..455e1fab1 100644 --- a/src/main/java/com/conveyal/r5/transit/path/RouteSequence.java +++ b/src/main/java/com/conveyal/r5/transit/path/RouteSequence.java @@ -62,8 +62,8 @@ public String[] detailsWithGtfsIds (TransitLayer transitLayer, CsvResultOptions routeJoiner.toString(), boardStopJoiner.toString(), alightStopJoiner.toString(), + feedJoiner.toString(), rideTimeJoiner.toString(), - feedJoiner.toString(), accessTime, egressTime };