Merge pull request #210 from conveyal/fill-missing-stop-times
Normalize stop times travel times for set of pattern stops
Landon Reed authored Apr 1, 2019
2 parents 2cd68f0 + 18d5bbe commit b8ed46b
Showing 5 changed files with 247 additions and 10 deletions.
8 changes: 6 additions & 2 deletions src/main/java/com/conveyal/gtfs/loader/BatchTracker.java
@@ -34,15 +34,19 @@ public void addBatch() throws SQLException {
}
}

public void executeRemaining() throws SQLException {
/**
* Execute any remaining statements and return the total records processed.
*/
public int executeRemaining() throws SQLException {
if (currentBatchSize > 0) {
totalRecordsProcessed += currentBatchSize;
preparedStatement.executeBatch();
currentBatchSize = 0;
}
// Avoid reuse, signal that this was cleanly closed.
preparedStatement = null;
LOG.info(String.format("Inserted %d %s records", totalRecordsProcessed, recordType));
LOG.info(String.format("Processed %d %s records", totalRecordsProcessed, recordType));
return totalRecordsProcessed;
}

public void finalize () {
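For orientation, here is a minimal sketch of how a caller might drive BatchTracker now that executeRemaining reports a count; the schema name, SQL, and the arrivalsByTripId map are illustrative, not part of this diff.

    // Illustrative sketch: assumes an open Connection and a map of trip_id -> new arrival_time.
    static int bulkUpdateArrivals(Connection connection, String namespace, Map<String, Integer> arrivalsByTripId) throws SQLException {
        PreparedStatement statement = connection.prepareStatement(
            String.format("update %s.stop_times set arrival_time = ? where trip_id = ?", namespace)
        );
        BatchTracker tracker = new BatchTracker("stop_times", statement);
        for (Map.Entry<String, Integer> entry : arrivalsByTripId.entrySet()) {
            statement.setInt(1, entry.getValue());
            statement.setString(2, entry.getKey());
            // addBatch queues the update and flushes full batches as needed.
            tracker.addBatch();
        }
        // New in this commit: executeRemaining flushes the final partial batch and returns
        // the total number of records processed.
        return tracker.executeRemaining();
    }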
122 changes: 117 additions & 5 deletions src/main/java/com/conveyal/gtfs/loader/JdbcTableWriter.java
@@ -31,8 +31,10 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
@@ -252,6 +254,39 @@ public String update(Integer id, String json, boolean autoCommit) throws SQLExce
}
}

/**
* For a given pattern id and starting stop sequence (inclusive), normalize all stop times to match the pattern
* stops' travel times.
* @return number of stop times updated
*/
public int normalizeStopTimesForPattern (int id, int beginWithSequence) throws SQLException {
try {
JDBCTableReader<PatternStop> patternStops = new JDBCTableReader(
Table.PATTERN_STOP,
dataSource,
tablePrefix + ".",
EntityPopulator.PATTERN_STOP
);
String patternId = getValueForId(id, "pattern_id", tablePrefix, Table.PATTERNS, connection);
List<PatternStop> patternStopsToNormalize = new ArrayList<>();
for (PatternStop patternStop : patternStops.getOrdered(patternId)) {
// Collect pattern stops whose stop_sequence is at or after beginWithSequence; their stop times
// are normalized below.
if (patternStop.stop_sequence >= beginWithSequence) {
patternStopsToNormalize.add(patternStop);
}
}
int stopTimesUpdated = updateStopTimesForPatternStops(patternStopsToNormalize);
connection.commit();
return stopTimesUpdated;
} catch (Exception e) {
e.printStackTrace();
throw e;
} finally {
DbUtils.closeQuietly(connection);
}
}

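A hedged sketch of how an editor component might call the new method; the helper name and id values are placeholders, and only normalizeStopTimesForPattern itself comes from this diff. The canNormalizePatternStopTimes test below exercises the same call with a beginWithSequence of 0.

    // Illustrative sketch: recompute arrivals/departures for every stop time at or after
    // stop_sequence 2 of the given pattern, using the pattern stops' default times.
    static int renormalizePatternStopTimes(JdbcTableWriter tripsWriter, int patternRecordId) throws SQLException {
        return tripsWriter.normalizeStopTimesForPattern(patternRecordId, 2);
    }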
/**
* Updates linked fields with values from entity being updated. This is used to update identical fields in related
* tables (for now just fields in trips and stop_times) where the reference table's value should take precedence over
@@ -661,6 +696,8 @@ private int updateStopTimesForPatternStop(ObjectNode patternStop, int previousTr
statement.setInt(oneBasedIndex++, arrivalTime + dwellTime);
// Set "where clause" with value for pattern_id and stop_sequence
statement.setString(oneBasedIndex++, patternStop.get("pattern_id").asText());
// In the editor, we can depend on stop_times#stop_sequence matching pattern_stops#stop_sequence because we
// normalize stop sequence values for stop times during snapshotting for the editor.
statement.setInt(oneBasedIndex++, patternStop.get("stop_sequence").asInt());
// Log query, execute statement, and log result.
LOG.debug(statement.toString());
@@ -669,6 +706,68 @@
return travelTime + dwellTime;
}

/**
* Normalizes all stop times' arrivals and departures for an ordered set of pattern stops. This set can be the full
set of stops for a pattern or just a subset. Typical usage would be to overwrite the arrival and departure
times of existing trips after a pattern stop has been added to or inserted into a pattern, or after a pattern
stop's default travel or dwell time has been updated and the stop times need to reflect the change.
* @param patternStops list of pattern stops for which to update stop times (ordered by increasing stop_sequence)
* @throws SQLException
*
* TODO? add param Set<String> serviceIdFilters service_id values to filter trips on
*/
private int updateStopTimesForPatternStops(List<PatternStop> patternStops) throws SQLException {
PatternStop firstPatternStop = patternStops.iterator().next();
int firstStopSequence = firstPatternStop.stop_sequence;
// Prepare SQL query to determine the time that should form the basis for adding the travel time values.
int previousStopSequence = firstStopSequence > 0 ? firstStopSequence - 1 : 0;
String timeField = firstStopSequence > 0 ? "departure_time" : "arrival_time";
String getFirstTravelTimeSql = String.format(
"select t.trip_id, %s from %s.stop_times st, %s.trips t where stop_sequence = ? " +
"and t.pattern_id = ? " +
"and t.trip_id = st.trip_id",
timeField,
tablePrefix,
tablePrefix
);
PreparedStatement statement = connection.prepareStatement(getFirstTravelTimeSql);
statement.setInt(1, previousStopSequence);
statement.setString(2, firstPatternStop.pattern_id);
LOG.info(statement.toString());
ResultSet resultSet = statement.executeQuery();
Map<String, Integer> timesForTripIds = new HashMap<>();
while (resultSet.next()) {
timesForTripIds.put(resultSet.getString(1), resultSet.getInt(2));
}
// Update stop times for individual trips with normalized travel times.
String updateTravelTimeSql = String.format(
"update %s.stop_times set arrival_time = ?, departure_time = ? where trip_id = ? and stop_sequence = ?",
tablePrefix
);
PreparedStatement updateStopTimeStatement = connection.prepareStatement(updateTravelTimeSql);
LOG.info(updateStopTimeStatement.toString());
final BatchTracker stopTimesTracker = new BatchTracker("stop_times", updateStopTimeStatement);
for (String tripId : timesForTripIds.keySet()) {
// Initialize travel time with previous stop time value.
int cumulativeTravelTime = timesForTripIds.get(tripId);
for (PatternStop patternStop : patternStops) {
// Gather travel/dwell time for pattern stop (being sure to check for missing values).
int travelTime = patternStop.default_travel_time == Entity.INT_MISSING ? 0 : patternStop.default_travel_time;
int dwellTime = patternStop.default_dwell_time == Entity.INT_MISSING ? 0 : patternStop.default_dwell_time;
int oneBasedIndex = 1;
// Increase travel time by current pattern stop's travel and dwell times (and set values for update).
cumulativeTravelTime += travelTime;
updateStopTimeStatement.setInt(oneBasedIndex++, cumulativeTravelTime);
cumulativeTravelTime += dwellTime;
updateStopTimeStatement.setInt(oneBasedIndex++, cumulativeTravelTime);
updateStopTimeStatement.setString(oneBasedIndex++, tripId);
updateStopTimeStatement.setInt(oneBasedIndex++, patternStop.stop_sequence);
stopTimesTracker.addBatch();
}
}
return stopTimesTracker.executeRemaining();
}

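To make the cumulative arithmetic above concrete, here is a standalone sketch of the same computation, assuming the base time has already been read from the stop preceding the first normalized pattern stop; the method name and printed output are illustrative.

    // Illustrative sketch: baseTime is the previous stop's departure_time (or the first stop's
    // arrival_time when normalizing from stop_sequence 0); all times are seconds since midnight.
    static void printNormalizedTimes(int baseTime, int[] travelTimes, int[] dwellTimes) {
        int cumulative = baseTime;
        for (int i = 0; i < travelTimes.length; i++) {
            cumulative += travelTimes[i];  // arrival = previous time + default_travel_time
            int arrival = cumulative;
            cumulative += dwellTimes[i];   // departure = arrival + default_dwell_time
            int departure = cumulative;
            System.out.printf("stop %d: arrival=%d departure=%d%n", i, arrival, departure);
        }
    }

For example, printNormalizedTimes(21600, new int[]{0, 3600}, new int[]{0, 0}) yields 21600 (6:00) at the first stop and 25200 (7:00) at the second, which matches the expectations of the canNormalizePatternStopTimes test below.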
/**
* Checks that a set of string references to a set of reference tables are all valid. For each set of references
* mapped to a reference table, the method queries for all of the references. If there are any references that were
@@ -1037,8 +1136,8 @@ private void insertBlankStopTimes(

/**
* For a given condition (fieldName = 'value'), delete all entities that match the condition. Because this uses the
* primary delete method, it also will delete any "child" entities that reference any entities matching the original
* query.
* {@link #delete(Integer, boolean)} method, it also will delete any "child" entities that reference any entities
* matching the original query.
*/
@Override
public int deleteWhere(String fieldName, String value, boolean autoCommit) throws SQLException {
@@ -1063,14 +1162,14 @@ public int deleteWhere(String fieldName, String value, boolean autoCommit) throw
LOG.info("Deleted {} {} entities", results.size(), specTable.name);
return results.size();
} catch (Exception e) {
// Rollback changes on failure.
connection.rollback();
LOG.error("Could not delete {} entity where {}={}", specTable.name, fieldName, value);
e.printStackTrace();
throw e;
} finally {
if (autoCommit) {
// Always rollback and close if auto-committing.
connection.rollback();
// Always close connection if auto-committing.
connection.close();
}
}
@@ -1118,6 +1217,14 @@ public void commit() throws SQLException {
connection.close();
}

/**
* Ensure that the database connection is closed. This should be called once the table writer is no longer needed.
*/
@Override
public void close() {
DbUtils.closeQuietly(connection);
}

/**
* Delete entities from any referencing tables (if required). This method is defined for convenience and clarity, but
* essentially just runs updateReferencingTables with a null value for newKeyValue param.
@@ -1202,7 +1309,12 @@ private void ensureReferentialIntegrity(
// There was one match found.
if (isCreating) {
// Under no circumstance should a new entity have a conflict with existing key field.
throw new SQLException("New entity's key field must not match existing value.");
throw new SQLException(
String.format("New %s's %s value (%s) conflicts with an existing record in table.",
table.entityClass.getSimpleName(),
keyField,
keyValue)
);
}
if (!uniqueIds.contains(id)) {
// There are two circumstances we could encounter here.
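With example values plugged in, the SQLException message introduced above in ensureReferentialIntegrity renders roughly as follows; the entity name, key field, and key value are illustrative, not from this diff.

    // Illustrative sketch of the message format used above.
    String message = String.format(
        "New %s's %s value (%s) conflicts with an existing record in table.",
        "Route",     // table.entityClass.getSimpleName()
        "route_id",  // keyField
        "route-1"    // keyValue
    );
    // message: "New Route's route_id value (route-1) conflicts with an existing record in table."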
9 changes: 7 additions & 2 deletions src/main/java/com/conveyal/gtfs/loader/Table.java
@@ -478,10 +478,15 @@ public String generateInsertSql (String namespace, boolean setDefaultId) {
String tableName = namespace == null
? name
: String.join(".", namespace, name);
String questionMarks = String.join(", ", Collections.nCopies(editorFields().size(), "?"));
String joinedFieldNames = commaSeparatedNames(editorFields());
String idValue = setDefaultId ? "DEFAULT" : "?";
return String.format("insert into %s (id, %s) values (%s, %s)", tableName, joinedFieldNames, idValue, questionMarks);
return String.format(
"insert into %s (id, %s) values (%s, %s)",
tableName,
joinedFieldNames,
idValue,
String.join(", ", Collections.nCopies(editorFields().size(), "?"))
);
}

/**
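For reference, a hedged sketch of what the restructured generateInsertSql produces; the namespace and the two-field editor field list are hypothetical (real tables declare more editor fields).

    // Illustrative sketch: namespace "abcd_1234" and editor fields route_id, route_short_name.
    // With setDefaultId = true the generated statement looks roughly like:
    //   insert into abcd_1234.routes (id, route_id, route_short_name) values (DEFAULT, ?, ?)
    // and with setDefaultId = false:
    //   insert into abcd_1234.routes (id, route_id, route_short_name) values (?, ?, ?)
    String insertSql = Table.ROUTES.generateInsertSql("abcd_1234", true);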
2 changes: 2 additions & 0 deletions src/main/java/com/conveyal/gtfs/loader/TableWriter.java
@@ -21,4 +21,6 @@ public interface TableWriter <T extends Entity> {
int deleteWhere (String fieldName, String value, boolean autoCommit) throws SQLException;

void commit () throws SQLException;

void close ();
}
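A hedged sketch of the lifecycle the new close() method supports for a writer that defers its commit; the id and JSON payload are placeholders, and only update, commit, and close are taken from the classes changed in this commit.

    // Illustrative sketch: writer construction, id, and JSON payload are placeholders.
    static void updateWithoutAutoCommit(JdbcTableWriter writer, Integer id, String json) throws SQLException {
        try {
            writer.update(id, json, false); // defer the commit
            writer.commit();                // flush changes and close the connection on success
        } finally {
            // close() quietly releases the connection if an earlier call threw before it was closed.
            writer.close();
        }
    }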
116 changes: 115 additions & 1 deletion src/test/java/com/conveyal/gtfs/loader/JDBCTableWriterTest.java
@@ -6,14 +6,15 @@
import com.conveyal.gtfs.dto.FareRuleDTO;
import com.conveyal.gtfs.dto.FeedInfoDTO;
import com.conveyal.gtfs.dto.FrequencyDTO;
import com.conveyal.gtfs.model.StopTime;
import com.conveyal.gtfs.util.InvalidNamespaceException;
import com.conveyal.gtfs.dto.PatternDTO;
import com.conveyal.gtfs.dto.PatternStopDTO;
import com.conveyal.gtfs.dto.RouteDTO;
import com.conveyal.gtfs.dto.ShapePointDTO;
import com.conveyal.gtfs.dto.StopDTO;
import com.conveyal.gtfs.dto.StopTimeDTO;
import com.conveyal.gtfs.dto.TripDTO;
import com.conveyal.gtfs.util.InvalidNamespaceException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -24,8 +25,10 @@
import javax.sql.DataSource;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.UUID;

import static com.conveyal.gtfs.GTFS.createDataSource;
import static com.conveyal.gtfs.GTFS.load;
@@ -332,6 +335,60 @@ public void canCreateUpdateAndDeleteRoutes() throws IOException, SQLException, I
));
}

/**
* This test verifies that stop_times#shape_dist_traveled (and other "linked fields") are updated when a pattern
* is updated.
*/
@Test
public void shouldUpdateStopTimeShapeDistTraveledOnPatternStopUpdate() throws IOException, SQLException, InvalidNamespaceException {
String routeId = newUUID();
String patternId = newUUID();
int startTime = 6 * 60 * 60; // 6 AM
PatternDTO pattern = createRouteAndPattern(
routeId,
patternId,
"pattern name",
null,
new ShapePointDTO[]{},
new PatternStopDTO[]{
new PatternStopDTO(patternId, firstStopId, 0),
new PatternStopDTO(patternId, lastStopId, 1)
},
0
);
// Make sure saved data matches expected data.
assertThat(pattern.route_id, equalTo(routeId));
// Create a trip so we can check that the stop_time values are updated after the pattern update.
TripDTO tripInput = constructTimetableTrip(pattern.pattern_id, pattern.route_id, startTime, 60);
JdbcTableWriter createTripWriter = createTestTableWriter(Table.TRIPS);
String createdTripOutput = createTripWriter.create(mapper.writeValueAsString(tripInput), true);
TripDTO createdTrip = mapper.readValue(createdTripOutput, TripDTO.class);
// Check the stop_time's initial shape_dist_traveled value. TODO test that other linked fields are updated?
PreparedStatement statement = testDataSource.getConnection().prepareStatement(
String.format(
"select shape_dist_traveled from %s.stop_times where stop_sequence=1 and trip_id='%s'",
testNamespace,
createdTrip.trip_id
)
);
ResultSet resultSet = statement.executeQuery();
while (resultSet.next()) {
// First stop_time shape_dist_traveled should be zero.
assertThat(resultSet.getInt(1), equalTo(0));
}
// Update pattern_stop#shape_dist_traveled and check that the stop_time's shape_dist value is updated.
final double updatedShapeDistTraveled = 45.5;
pattern.pattern_stops[1].shape_dist_traveled = updatedShapeDistTraveled;
JdbcTableWriter patternUpdater = createTestTableWriter(Table.PATTERNS);
String updatedPatternOutput = patternUpdater.update(pattern.id, mapper.writeValueAsString(pattern), true);
LOG.info("Updated pattern: {}", updatedPatternOutput);
ResultSet resultSet2 = statement.executeQuery();
while (resultSet2.next()) {
// First stop_time shape_dist_traveled should be updated.
assertThat(resultSet2.getDouble(1), equalTo(updatedShapeDistTraveled));
}
}

@Test
public void shouldDeleteReferencingTripsAndStopTimesOnPatternDelete() throws IOException, SQLException, InvalidNamespaceException {
String routeId = "9834914";
@@ -496,6 +553,59 @@ public void canCreateUpdateAndDeleteFrequencyTripForFrequencyPattern() throws IO
));
}

/**
* Checks that {@link JdbcTableWriter#normalizeStopTimesForPattern(int, int)} can normalize stop times to a pattern's
* default travel times.
*/
@Test
public void canNormalizePatternStopTimes() throws IOException, SQLException, InvalidNamespaceException {
// Store Table and Class values for use in test.
final Table tripsTable = Table.TRIPS;
int initialTravelTime = 60; // one minute
int startTime = 6 * 60 * 60; // 6AM
String patternId = "123456";
PatternStopDTO[] patternStops = new PatternStopDTO[]{
new PatternStopDTO(patternId, firstStopId, 0),
new PatternStopDTO(patternId, lastStopId, 1)
};
patternStops[1].default_travel_time = initialTravelTime;
PatternDTO pattern = createRouteAndPattern(newUUID(),
patternId,
"Pattern A",
null,
new ShapePointDTO[]{},
patternStops,
0);
// Create trip with travel times that match pattern stops.
TripDTO tripInput = constructTimetableTrip(pattern.pattern_id, pattern.route_id, startTime, initialTravelTime);
JdbcTableWriter createTripWriter = createTestTableWriter(tripsTable);
String createTripOutput = createTripWriter.create(mapper.writeValueAsString(tripInput), true);
LOG.info(createTripOutput);
TripDTO createdTrip = mapper.readValue(createTripOutput, TripDTO.class);
// Update pattern stop with new travel time.
JdbcTableWriter patternUpdater = createTestTableWriter(Table.PATTERNS);
int updatedTravelTime = 3600; // one hour
pattern.pattern_stops[1].default_travel_time = updatedTravelTime;
String updatedPatternOutput = patternUpdater.update(pattern.id, mapper.writeValueAsString(pattern), true);
LOG.info("Updated pattern output: {}", updatedPatternOutput);
// Normalize stop times.
JdbcTableWriter updateTripWriter = createTestTableWriter(tripsTable);
updateTripWriter.normalizeStopTimesForPattern(pattern.id, 0);
// Read pattern stops from database and check that the arrivals/departures have been updated.
JDBCTableReader<StopTime> stopTimesTable = new JDBCTableReader(Table.STOP_TIMES,
testDataSource,
testNamespace + ".",
EntityPopulator.STOP_TIME);
int index = 0;
for (StopTime stopTime : stopTimesTable.getOrdered(createdTrip.trip_id)) {
LOG.info("stop times i={} arrival={} departure={}", index, stopTime.arrival_time, stopTime.departure_time);
assertThat(stopTime.arrival_time, equalTo(startTime + index * updatedTravelTime));
index++;
}
// Ensure that the number of stop times updated equals the number of pattern stops.
assertThat(index, equalTo(patternStops.length));
}

/**
* This test makes sure that updating the service_id will properly update affected referenced entities.
* This test case was initially developed to prove that https://github.com/conveyal/gtfs-lib/issues/203 is
@@ -526,6 +636,10 @@ public void canUpdateServiceId() throws InvalidNamespaceException, IOException,
* End tests, begin helpers
****************************************************************************************************************/

private static String newUUID() {
return UUID.randomUUID().toString();
}

private void assertThatSqlQueryYieldsRowCount(String sql, int expectedRowCount) throws SQLException {
LOG.info(sql);
int recordCount = 0;
