refactor(deploy): shuffle deploy job code for clarity
landonreed committed Nov 30, 2018
1 parent 7c92c1d commit aa0553b
Showing 1 changed file with 71 additions and 64 deletions.
135 changes: 71 additions & 64 deletions src/main/java/com/conveyal/datatools/manager/jobs/DeployJob.java
@@ -73,6 +73,14 @@ public class DeployJob extends MonitorableJob {

private static final Logger LOG = LoggerFactory.getLogger(DeployJob.class);
private static final String bundlePrefix = "bundles";
/**
* S3 bucket to upload the deployment to. If {@link OtpServer#s3Bucket} is non-null, that bucket is used;
* otherwise, this defaults to {@link DataManager#feedBucket}.
*/
private final String s3Bucket;
private final int targetCount;
private int tasksCompleted = 0;
private int totalTasks;

private AmazonEC2 ec2;
private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss z");
@@ -89,6 +97,7 @@ public class DeployJob extends MonitorableJob {
/** This hides the status field on the parent class, providing additional fields. */
public DeployStatus status;

private String statusMessage;
private int serverCounter = 0;
// private String imageId;
private String dateString = DATE_FORMAT.format(new Date());
@@ -105,8 +114,11 @@ public DeployJob(Deployment deployment, String owner, OtpServer otpServer) {
super(owner, "Deploying " + deployment.name, JobType.DEPLOY_TO_OTP);
this.deployment = deployment;
this.otpServer = otpServer;
this.s3Bucket = otpServer.s3Bucket != null ? otpServer.s3Bucket : DataManager.feedBucket;
// Use a special subclass of status here that has additional fields
this.status = new DeployStatus();
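// Base task count: one for creating the deployment bundle, plus one for each internal OTP target.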
this.targetCount = otpServer.internalUrl != null ? otpServer.internalUrl.size() : 0;
this.totalTasks = 1 + targetCount;
status.message = "Initializing...";
status.built = false;
status.numServersCompleted = 0;
@@ -118,14 +130,9 @@ public DeployJob(Deployment deployment, String owner, OtpServer otpServer) {
}

public void jobLogic () {
int targetCount = otpServer.internalUrl != null ? otpServer.internalUrl.size() : 0;
int totalTasks = 1 + targetCount;
if (otpServer.s3Bucket != null) totalTasks++;
// FIXME
if (otpServer.targetGroupArn != null) totalTasks++;
int tasksCompleted = 0;
String statusMessage;

try {
deploymentTempFile = File.createTempFile("deployment", ".zip");
} catch (IOException e) {
@@ -155,49 +162,22 @@ public void jobLogic () {
LOG.info("Deployment pctComplete = {}", status.percentComplete);
status.built = true;

// Upload to S3, if applicable
if(otpServer.s3Bucket != null) {
status.message = "Uploading to S3";
status.uploadingS3 = true;
String key = getS3BundleKey();
LOG.info("Uploading deployment {} to s3://{}/{}", deployment.name, otpServer.s3Bucket, key);
// Upload to S3, if specifically required by the OTPServer or needed for servers in the target group to fetch.
if (otpServer.s3Bucket != null || otpServer.targetGroupArn != null) {
try {
// PutObjectRequest putObjectRequest = new PutObjectRequest(otpServer.s3Bucket, key, deploymentTempFile).;
TransferManager tx = TransferManagerBuilder.standard().withS3Client(FeedStore.s3Client).build();
final Upload upload = tx.upload(otpServer.s3Bucket, key, deploymentTempFile);

upload.addProgressListener((ProgressListener) progressEvent -> {
status.percentUploaded = upload.getProgress().getPercentTransferred();
});

upload.waitForCompletion();

// Shutdown the Transfer Manager, but don't shut down the underlying S3 client.
// The default behavior for shutdownNow shuts down the underlying S3 client,
// which will cause any subsequent S3 operations to fail.
tx.shutdownNow(false);

// copy to [name]-latest.zip
String copyKey = getLatestS3BundleKey();
CopyObjectRequest copyObjRequest = new CopyObjectRequest(
otpServer.s3Bucket, key, otpServer.s3Bucket, copyKey);
FeedStore.s3Client.copyObject(copyObjRequest);
LOG.info("Copied to s3://{}/{}", otpServer.s3Bucket, copyKey);
} catch (AmazonClientException|InterruptedException e) {
statusMessage = String.format("Error uploading (or copying) deployment bundle to s3://%s/%s", otpServer.s3Bucket, key);
LOG.error(statusMessage);
e.printStackTrace();
uploadBundleToS3();
} catch (AmazonClientException | InterruptedException e) {
statusMessage = String.format("Error uploading (or copying) deployment bundle to s3://%s", s3Bucket);
LOG.error(statusMessage, e);
status.fail(statusMessage);
return;
}
LOG.info("Uploaded to s3://{}/{}", otpServer.s3Bucket, getS3BundleKey());
status.update("Upload to S3 complete.", status.percentComplete + 10);
status.uploadingS3 = false;

}

// Handle spinning up new EC2 servers for the load balancer's target group.
if (otpServer.targetGroupArn != null) {
if ("true".equals(DataManager.getConfigPropertyAsText("modules.deployment.ec2.enabled"))) {
createServer();
replaceEC2Servers();
// If creating a new server, there is no need to deploy to an existing one.
return;
} else {
@@ -209,11 +189,46 @@ public void jobLogic () {
}

// If there are no OTP targets (i.e. we're only deploying to S3), we're done.
if(otpServer.internalUrl == null) {
status.completed = true;
return;
if(otpServer.internalUrl != null) {
// If we reach this point, there are internal URLs to deploy to (i.e., build the graph over the wire).
boolean sendOverWireSuccessful = buildGraphOverWire();
if (!sendOverWireSuccessful) return;
// Set baseUrl after success.
status.baseUrl = otpServer.publicUrl;
}
status.completed = true;
}

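/**
* Upload the deployment bundle to S3, then copy it to the [name]-latest.zip key.
*/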
private void uploadBundleToS3() throws InterruptedException, AmazonClientException {
status.message = "Uploading to S3";
status.uploadingS3 = true;
String key = getS3BundleKey();
LOG.info("Uploading deployment {} to s3://{}/{}", deployment.name, s3Bucket, key);
TransferManager tx = TransferManagerBuilder.standard().withS3Client(FeedStore.s3Client).build();
final Upload upload = tx.upload(s3Bucket, key, deploymentTempFile);

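// Update the job status as bytes are transferred so upload progress can be monitored.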
upload.addProgressListener(
(ProgressListener) progressEvent -> status.percentUploaded = upload.getProgress().getPercentTransferred()
);

upload.waitForCompletion();

// Shutdown the Transfer Manager, but don't shut down the underlying S3 client.
// The default behavior for shutdownNow shuts down the underlying S3 client,
// which will cause any subsequent S3 operations to fail.
tx.shutdownNow(false);

// copy to [name]-latest.zip
String copyKey = getLatestS3BundleKey();
CopyObjectRequest copyObjRequest = new CopyObjectRequest(s3Bucket, key, s3Bucket, copyKey);
FeedStore.s3Client.copyObject(copyObjRequest);
LOG.info("Copied to s3://{}/{}", s3Bucket, copyKey);
LOG.info("Uploaded to s3://{}/{}", s3Bucket, getS3BundleKey());
status.update("Upload to S3 complete.", status.percentComplete + 10);
status.uploadingS3 = false;
}

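/**
* Deploy the bundle to each of the OTP server's internal URLs (i.e., build the graph over the wire).
* Returns true if all targets succeed, false as soon as any one of them fails.
*/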
private boolean buildGraphOverWire() {
// Figure out which router we're using.
String router = deployment.routerId != null ? deployment.routerId : "default";

@@ -267,7 +282,7 @@ public void jobLogic () {
LOG.error(statusMessage);
e.printStackTrace();
status.fail(statusMessage);
return;
return false;
}

// Retrieve the input file.
@@ -277,7 +292,7 @@ public void jobLogic () {
} catch (FileNotFoundException e) {
LOG.error("Internal error: could not read dumped deployment!");
status.fail("Internal error: could not read dumped deployment!");
return;
return false;
}

try {
@@ -286,7 +301,7 @@ public void jobLogic () {
statusMessage = String.format("Unable to open connection to OTP server %s", url);
LOG.error(statusMessage);
status.fail(statusMessage);
return;
return false;
}

// Copy the deployment bundle over the open connection to the OTP server.
@@ -297,7 +312,7 @@ public void jobLogic () {
LOG.error(statusMessage);
e.printStackTrace();
status.fail(statusMessage);
return;
return false;
}

try {
@@ -307,7 +322,7 @@ public void jobLogic () {
LOG.error(message);
e.printStackTrace();
status.fail(message);
return;
return false;
}

try {
@@ -326,8 +341,8 @@ public void jobLogic () {
if (code != HttpURLConnection.HTTP_CREATED) {
// Get input/error stream from connection response.
InputStream stream = code < HttpURLConnection.HTTP_BAD_REQUEST
? conn.getInputStream()
: conn.getErrorStream();
String response;
try (Scanner scanner = new Scanner(stream)) {
scanner.useDelimiter("\\Z");
Expand All @@ -338,7 +353,7 @@ public void jobLogic () {
status.fail(statusMessage);
// Skip deploying to any other servers.
// There is no reason to take out the rest of the servers, it's going to have the same result.
return;
return false;
}
} catch (IOException e) {
statusMessage = String.format("Could not finish request to server %s", url);
@@ -350,9 +365,7 @@ public void jobLogic () {
tasksCompleted++;
status.percentComplete = 100.0 * (double) tasksCompleted / totalTasks;
}

status.completed = true;
status.baseUrl = otpServer.publicUrl;
return true;
}

private String getS3BundleKey() {
@@ -388,7 +401,7 @@ public void jobFinished () {
NotifyUsersForSubscriptionJob.createNotification("deployment-updated", deployment.id, message);
}

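/**
* Replace the EC2 servers behind the load balancer's target group. A single graph-building instance is started
* first, and the job waits for the graph to build successfully before proceeding.
*/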
public void createServer () {
public void replaceEC2Servers() {
try {
// First start graph-building instance and wait for graph to successfully build.
List<Instance> instances = startEC2Instances(1);
@@ -460,7 +473,7 @@ private List<Instance> startEC2Instances(int count) {
// 2. Time to live until shutdown/termination (for test servers)
// 3. Hosting / nginx
// FIXME: Allow for r5 servers to be created.
String userData = constructUserData(otpServer.s3Bucket, deployment.r5);
String userData = constructUserData(deployment.r5);
// The subnet ID should only change if starting up a server in some other AWS account. This is not
// likely to be a requirement.
// Define network interface so that a public IP can be associated with server.
@@ -538,7 +551,7 @@ private List<String> getIds (List<Instance> instances) {
return instances.stream().map(Instance::getInstanceId).collect(Collectors.toList());
}

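/**
* Construct the user data shell script passed to new EC2 instances, which sets up and launches the trip planner
* (OTP or R5).
*/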
private String constructUserData(String s3Bucket, boolean r5) {
private String constructUserData(boolean r5) {
// Prefix/name of JAR file (WITHOUT .jar) FIXME: make this configurable.
String jarName = r5 ? deployment.r5Version : deployment.otpVersion;
if (jarName == null) {
Expand All @@ -563,12 +576,6 @@ private String constructUserData(String s3Bucket, boolean r5) {
lines.add(String.format("LOGFILE=/var/log/%s.log", tripPlanner));
// Log user data setup to /var/log/user-data.log
lines.add("exec > >(tee /var/log/user-data.log|logger -t user-data -s 2>/dev/console) 2>&1");
// Install the necessary files to run the trip planner. FIXME Remove. This should be captured by the AMI.
// lines.add("apt update -y");
// lines.add("apt install -y openjdk-8-jre");
// lines.add("apt install -y openjfx");
// lines.add("apt install -y awscli");
// lines.add("apt install -y unzip");
// Create the directory for the graph inputs.
lines.add(String.format("mkdir -p %s", routerDir));
lines.add(String.format("chown ubuntu %s", routerDir));