From aa0553bdb84127af8533313a691b351e4fa7757a Mon Sep 17 00:00:00 2001 From: Landon Reed Date: Fri, 30 Nov 2018 14:23:56 -0500 Subject: [PATCH] refactor(deploy): shuffle deploy job code for clarity --- .../datatools/manager/jobs/DeployJob.java | 135 +++++++++--------- 1 file changed, 71 insertions(+), 64 deletions(-) diff --git a/src/main/java/com/conveyal/datatools/manager/jobs/DeployJob.java b/src/main/java/com/conveyal/datatools/manager/jobs/DeployJob.java index 021e32b97..6017b1035 100644 --- a/src/main/java/com/conveyal/datatools/manager/jobs/DeployJob.java +++ b/src/main/java/com/conveyal/datatools/manager/jobs/DeployJob.java @@ -73,6 +73,14 @@ public class DeployJob extends MonitorableJob { private static final Logger LOG = LoggerFactory.getLogger(DeployJob.class); private static final String bundlePrefix = "bundles"; + /** + * S3 bucket to upload deployment to. If not null, uses {@link OtpServer#s3Bucket}. Otherwise, defaults to + * {@link DataManager#feedBucket} + * */ + private final String s3Bucket; + private final int targetCount; + private int tasksCompleted = 0; + private int totalTasks; private AmazonEC2 ec2; private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss z"); @@ -89,6 +97,7 @@ public class DeployJob extends MonitorableJob { /** This hides the status field on the parent class, providing additional fields. */ public DeployStatus status; + private String statusMessage; private int serverCounter = 0; // private String imageId; private String dateString = DATE_FORMAT.format(new Date()); @@ -105,8 +114,11 @@ public DeployJob(Deployment deployment, String owner, OtpServer otpServer) { super(owner, "Deploying " + deployment.name, JobType.DEPLOY_TO_OTP); this.deployment = deployment; this.otpServer = otpServer; + this.s3Bucket = otpServer.s3Bucket != null ? otpServer.s3Bucket : DataManager.feedBucket; // Use a special subclass of status here that has additional fields this.status = new DeployStatus(); + this.targetCount = otpServer.internalUrl != null ? otpServer.internalUrl.size() : 0; + this.totalTasks = 1 + targetCount; status.message = "Initializing..."; status.built = false; status.numServersCompleted = 0; @@ -118,14 +130,9 @@ public DeployJob(Deployment deployment, String owner, OtpServer otpServer) { } public void jobLogic () { - int targetCount = otpServer.internalUrl != null ? otpServer.internalUrl.size() : 0; - int totalTasks = 1 + targetCount; if (otpServer.s3Bucket != null) totalTasks++; // FIXME if (otpServer.targetGroupArn != null) totalTasks++; - int tasksCompleted = 0; - String statusMessage; - try { deploymentTempFile = File.createTempFile("deployment", ".zip"); } catch (IOException e) { @@ -155,49 +162,22 @@ public void jobLogic () { LOG.info("Deployment pctComplete = {}", status.percentComplete); status.built = true; - // Upload to S3, if applicable - if(otpServer.s3Bucket != null) { - status.message = "Uploading to S3"; - status.uploadingS3 = true; - String key = getS3BundleKey(); - LOG.info("Uploading deployment {} to s3://{}/{}", deployment.name, otpServer.s3Bucket, key); + // Upload to S3, if specifically required by the OTPServer or needed for servers in the target group to fetch. + if (otpServer.s3Bucket != null || otpServer.targetGroupArn != null) { try { -// PutObjectRequest putObjectRequest = new PutObjectRequest(otpServer.s3Bucket, key, deploymentTempFile).; - TransferManager tx = TransferManagerBuilder.standard().withS3Client(FeedStore.s3Client).build(); - final Upload upload = tx.upload(otpServer.s3Bucket, key, deploymentTempFile); - - upload.addProgressListener((ProgressListener) progressEvent -> { - status.percentUploaded = upload.getProgress().getPercentTransferred(); - }); - - upload.waitForCompletion(); - - // Shutdown the Transfer Manager, but don't shut down the underlying S3 client. - // The default behavior for shutdownNow shut's down the underlying s3 client - // which will cause any following s3 operations to fail. - tx.shutdownNow(false); - - // copy to [name]-latest.zip - String copyKey = getLatestS3BundleKey(); - CopyObjectRequest copyObjRequest = new CopyObjectRequest( - otpServer.s3Bucket, key, otpServer.s3Bucket, copyKey); - FeedStore.s3Client.copyObject(copyObjRequest); - LOG.info("Copied to s3://{}/{}", otpServer.s3Bucket, copyKey); - } catch (AmazonClientException|InterruptedException e) { - statusMessage = String.format("Error uploading (or copying) deployment bundle to s3://%s/%s", otpServer.s3Bucket, key); - LOG.error(statusMessage); - e.printStackTrace(); + uploadBundleToS3(); + } catch (AmazonClientException | InterruptedException e) { + statusMessage = String.format("Error uploading (or copying) deployment bundle to s3://%s", s3Bucket); + LOG.error(statusMessage, e); status.fail(statusMessage); - return; } - LOG.info("Uploaded to s3://{}/{}", otpServer.s3Bucket, getS3BundleKey()); - status.update("Upload to S3 complete.", status.percentComplete + 10); - status.uploadingS3 = false; + } + // Handle spinning up new EC2 servers for the load balancer's target group. if (otpServer.targetGroupArn != null) { if ("true".equals(DataManager.getConfigPropertyAsText("modules.deployment.ec2.enabled"))) { - createServer(); + replaceEC2Servers(); // If creating a new server, there is no need to deploy to an existing one. return; } else { @@ -209,11 +189,46 @@ public void jobLogic () { } // If there are no OTP targets (i.e. we're only deploying to S3), we're done. - if(otpServer.internalUrl == null) { - status.completed = true; - return; + if(otpServer.internalUrl != null) { + // If we come to this point, there are internal URLs we need to deploy to (i.e., build graph over the wire). + boolean sendOverWireSuccessful = buildGraphOverWire(); + if (!sendOverWireSuccessful) return; + // Set baseUrl after success. + status.baseUrl = otpServer.publicUrl; } + status.completed = true; + } + private void uploadBundleToS3() throws InterruptedException, AmazonClientException { + status.message = "Uploading to S3"; + status.uploadingS3 = true; + String key = getS3BundleKey(); + LOG.info("Uploading deployment {} to s3://{}/{}", deployment.name, s3Bucket, key); + TransferManager tx = TransferManagerBuilder.standard().withS3Client(FeedStore.s3Client).build(); + final Upload upload = tx.upload(s3Bucket, key, deploymentTempFile); + + upload.addProgressListener( + (ProgressListener) progressEvent -> status.percentUploaded = upload.getProgress().getPercentTransferred() + ); + + upload.waitForCompletion(); + + // Shutdown the Transfer Manager, but don't shut down the underlying S3 client. + // The default behavior for shutdownNow shut's down the underlying s3 client + // which will cause any following s3 operations to fail. + tx.shutdownNow(false); + + // copy to [name]-latest.zip + String copyKey = getLatestS3BundleKey(); + CopyObjectRequest copyObjRequest = new CopyObjectRequest(s3Bucket, key, s3Bucket, copyKey); + FeedStore.s3Client.copyObject(copyObjRequest); + LOG.info("Copied to s3://{}/{}", s3Bucket, copyKey); + LOG.info("Uploaded to s3://{}/{}", s3Bucket, getS3BundleKey()); + status.update("Upload to S3 complete.", status.percentComplete + 10); + status.uploadingS3 = false; + } + + private boolean buildGraphOverWire() { // figure out what router we're using String router = deployment.routerId != null ? deployment.routerId : "default"; @@ -267,7 +282,7 @@ public void jobLogic () { LOG.error(statusMessage); e.printStackTrace(); status.fail(statusMessage); - return; + return false; } // retrieveById the input file @@ -277,7 +292,7 @@ public void jobLogic () { } catch (FileNotFoundException e) { LOG.error("Internal error: could not read dumped deployment!"); status.fail("Internal error: could not read dumped deployment!"); - return; + return false; } try { @@ -286,7 +301,7 @@ public void jobLogic () { statusMessage = String.format("Unable to open connection to OTP server %s", url); LOG.error(statusMessage); status.fail(statusMessage); - return; + return false; } // copy @@ -297,7 +312,7 @@ public void jobLogic () { LOG.error(statusMessage); e.printStackTrace(); status.fail(statusMessage); - return; + return false; } try { @@ -307,7 +322,7 @@ public void jobLogic () { LOG.error(message); e.printStackTrace(); status.fail(message); - return; + return false; } try { @@ -326,8 +341,8 @@ public void jobLogic () { if (code != HttpURLConnection.HTTP_CREATED) { // Get input/error stream from connection response. InputStream stream = code < HttpURLConnection.HTTP_BAD_REQUEST - ? conn.getInputStream() - : conn.getErrorStream(); + ? conn.getInputStream() + : conn.getErrorStream(); String response; try (Scanner scanner = new Scanner(stream)) { scanner.useDelimiter("\\Z"); @@ -338,7 +353,7 @@ public void jobLogic () { status.fail(statusMessage); // Skip deploying to any other servers. // There is no reason to take out the rest of the servers, it's going to have the same result. - return; + return false; } } catch (IOException e) { statusMessage = String.format("Could not finish request to server %s", url); @@ -350,9 +365,7 @@ public void jobLogic () { tasksCompleted++; status.percentComplete = 100.0 * (double) tasksCompleted / totalTasks; } - - status.completed = true; - status.baseUrl = otpServer.publicUrl; + return true; } private String getS3BundleKey() { @@ -388,7 +401,7 @@ public void jobFinished () { NotifyUsersForSubscriptionJob.createNotification("deployment-updated", deployment.id, message); } - public void createServer () { + public void replaceEC2Servers() { try { // First start graph-building instance and wait for graph to successfully build. List instances = startEC2Instances(1); @@ -460,7 +473,7 @@ private List startEC2Instances(int count) { // 2. Time to live until shutdown/termination (for test servers) // 3. Hosting / nginx // FIXME: Allow for r5 servers to be created. - String userData = constructUserData(otpServer.s3Bucket, deployment.r5); + String userData = constructUserData(deployment.r5); // The subnet ID should only change if starting up a server in some other AWS account. This is not // likely to be a requirement. // Define network interface so that a public IP can be associated with server. @@ -538,7 +551,7 @@ private List getIds (List instances) { return instances.stream().map(Instance::getInstanceId).collect(Collectors.toList()); } - private String constructUserData(String s3Bucket, boolean r5) { + private String constructUserData(boolean r5) { // Prefix/name of JAR file (WITHOUT .jar) FIXME: make this configurable. String jarName = r5 ? deployment.r5Version : deployment.otpVersion; if (jarName == null) { @@ -563,12 +576,6 @@ private String constructUserData(String s3Bucket, boolean r5) { lines.add(String.format("LOGFILE=/var/log/%s.log", tripPlanner)); // Log user data setup to /var/log/user-data.log lines.add("exec > >(tee /var/log/user-data.log|logger -t user-data -s 2>/dev/console) 2>&1"); - // Install the necessary files to run the trip planner. FIXME Remove. This should be captured by the AMI. -// lines.add("apt update -y"); -// lines.add("apt install -y openjdk-8-jre"); -// lines.add("apt install -y openjfx"); -// lines.add("apt install -y awscli"); -// lines.add("apt install -y unzip"); // Create the directory for the graph inputs. lines.add(String.format("mkdir -p %s", routerDir)); lines.add(String.format("chown ubuntu %s", routerDir));