Merge pull request #12 from chaordic/last-change-variation
Last change variation
gmendonca authored Dec 2, 2016
2 parents 24a6b71 + 31006d0 commit 3ed61d0
Showing 3 changed files with 56 additions and 34 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,7 @@
## 1.0.2 (December 2, 2016)
- Added cool down and bid threshold times in the configuration file
- Refresh state doesn't affect tiopatinhas' actions anymore

## 1.0.1 (November 29, 2016)
- Added support to get user data from Launch Configuration Group if not provided

2 changes: 2 additions & 0 deletions tp/tp.conf.template
@@ -13,6 +13,8 @@
"placement": "us-east-1a",
"instance_profile_name": null,
"monitoring_enabled": false,
"cool_down_threshold": 360,
"bid_threshold": 300,
"tags": {},
"user_data_file": null
}
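Both new keys are optional: TPManager reads them with matching fallbacks, as the tp.py hunk below shows. A minimal standalone sketch of that lookup, assuming conf.json holds a configuration with the same shape as this template (the filename is illustrative):

import json

# Parse a config file shaped like tp.conf.template.
with open("conf.json") as f:
    conf = json.load(f)

# Absent keys fall back to the template defaults (both in seconds).
cool_down_threshold = conf.get("cool_down_threshold", 360)
bid_threshold = conf.get("bid_threshold", 300)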
84 changes: 50 additions & 34 deletions tp/tp.py
@@ -1,10 +1,8 @@
#!/usr/bin/env python

import boto
from urllib import urlopen
import time
import os
import traceback
import logging
import sys
import simplejson as json
@@ -83,6 +81,8 @@ def __init__(self, side_group, weight_factor=1.0, debug=False,
self.region = region or self.conf.get("region", "us-east-1") # parameter has precedence over config file
self.subnet_id = self.conf.get("subnet_id", None)
self.monitoring_enabled = self.conf.get("monitoring_enabled", False)
self.cool_down_threshold = self.conf.get('cool_down_threshold', 360)
self.bid_threshold = self.conf.get('bid_threshold', 300)

if self.subnet_id is not None:
self.placement = None
@@ -95,6 +95,7 @@ def __init__(self, side_group, weight_factor=1.0, debug=False,

self.started = False
self.target = None
self.last_bid = 0
self.last_change = 0
self.previous_as_count = None

@@ -133,21 +134,20 @@ def refresh(self):
if self.previous_as_count != self.managed_by_autoscale():
self.logger.info(">> refresh(): autoscale instance count changed from %s to %s",
self.previous_as_count, self.managed_by_autoscale())
if self.previous_as_count != None:
self.last_change = time.time()
self.previous_as_count = self.managed_by_autoscale()

def guess_target(self):
if not self.started:
self.target = min(self.managed_instances(), self.managed_by_autoscale()) # follow autoscale if stopped :)
return

if self.target == None:
if not self.target:
self.target = self.managed_instances()
previous = self.target

# How many instances we should keep running
if time.time() - self.last_change > 360:
elapsed_time = time.time() - self.last_change
if elapsed_time > self.cool_down_threshold:
candidate = round(self.weight_factor * self.tapping_group.desired_capacity)

# Never less than one
@@ -161,6 +161,9 @@ def guess_target(self):
if candidate != previous:
self.logger.debug(">> guess_target(): changed target from %s to %s", previous, candidate)
self.target = candidate
else:
self.logger.info("guess_target(): not updating target for instances, waiting for cool down! \
Remaining time to next change %s", self.cool_down_threshold - elapsed_time)

def managed_by_autoscale(self):
return int(self.tapping_group.desired_capacity)
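This hunk is the heart of the change: refresh() now touches last_change only when the autoscale count actually moved, and not on the first observation, while guess_target() recomputes the target only after cool_down_threshold seconds have elapsed since that change. The elapsed-time gate it applies, as a standalone sketch (the function and argument names are illustrative, not from the codebase):

import time

def cooled_down(last_change_ts, threshold_seconds=360):
    # True once at least threshold_seconds have elapsed since the last change.
    return time.time() - last_change_ts > threshold_seconds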
@@ -195,23 +198,30 @@ def buy(self, amount=1):
subnet_id=self.subnet_id,
user_data=self.user_data,
monitoring_enabled=self.monitoring_enabled)

self.logger.info(">> buy(): purchased 1 on-demand instance")
time.sleep(3)

# get instance since count is 1
instance = r.instances[0]

while 1:
try:
instance.add_tag("tp:group", tapping_group.name)
break
except Exception, e:
traceback.print_exc()
time.sleep(3)
# get the status
status = instance.update()

# check for the status while it isn't running
while status != 'running':
time.sleep(5)
status = instance.update()

# when it's finally running, update the tag
self.logger.info("Instance status = " + status)
instance.add_tag('tp:group', tapping_group.name)

def bid(self, force=False):
elapsed_time = time.time() - self.last_change
if not force and elapsed_time < 300:
self.logger.info("bid(): last change was too recent, skipping bid")
self.logger.debug("bid(): remaining time to next change %s", 300 - elapsed_time)
elapsed_time = time.time() - self.last_bid
if not force and elapsed_time < self.bid_threshold:
self.logger.info("bid(): last change was too recent, skipping bid! Remaining time to next change %s",
self.bid_threshold - elapsed_time)
time.sleep(10)
return

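Both purchase paths drop their bare tag-and-retry loops in favor of waiting for a terminal status first: buy() above polls instance.update() until the instance reports 'running' and then tags it once, and the spot hunk below applies the same treatment to the request status until it reads 'fulfilled'. That shared wait-then-act shape as a standalone sketch (get_status and target are illustrative stand-ins for the boto calls):

import time

def wait_for(get_status, target, poll_interval=5):
    # Poll a status-returning callable until it reports the target value.
    status = get_status()
    while status != target:
        time.sleep(poll_interval)
        status = get_status()
    return status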
@@ -229,17 +239,25 @@ def bid(self, force=False):
instance_type=self.spot_type,
instance_profile_name=self.instance_profile_name,
monitoring_enabled=self.monitoring_enabled)
# TODO really?
while 1:
try:
request[0].add_tag('tp:tag', self.side_group)
break
except Exception, e:
traceback.print_exc()
time.sleep(3)

# get the first reservation since count is 1
reservation = request[0]

# get the status
status = reservation.status

# check for the status while it isn't running
while status.code != 'fulfilled':
self.logger.info("Reservation status = %s", status.code)
time.sleep(5)
status = self.ec2.get_all_spot_instance_requests(request_ids=[reservation.id])[0].status

# when it's finally fulfilled, update the tag
self.logger.info("Reservation status = %s", status.code)
reservation.add_tag('tp:tag', self.side_group)

self.logger.info(">> bid(): created 1 bid of %s for %s", self.spot_type, self.max_price[self.spot_type])
self.last_change = time.time()
self.last_bid = time.time()
self.bids.append(request)

def check_alive(self, instance_id):
@@ -294,9 +312,7 @@ def maybe_promote(self, spot_request):
def maybe_replace(self):
for instance in self.emergency:
self.logger.debug("proximity(%s): %s", instance.id, str(self.proximity(instance)))
if (self.proximity(instance) < 10
and self.proximity(instance) > 2
and self.managed_instances() <= self.target):
if (2 < self.proximity(instance) < 10) and self.managed_instances() <= self.target:
self.logger.info(">> maybe_replace(): attempting to replace %s", instance.id)
self.bid(force=True)

@@ -326,7 +342,7 @@ def maybe_demote(self):
# server
if self.emergency:
for instance in self.emergency:
if self.proximity(instance) < 10 and self.proximity(instance) > 3 and not self.valid_bids():
if (3 < self.proximity(instance) < 10) and not self.valid_bids():
self.logger.info(">> maybe_demote(): removing emergency instance %s", instance.id)
self.dettach_instance(instance.id)
self.ec2.terminate_instances([instance.id])
@@ -409,20 +425,20 @@ def is_ec2_state_running(instance_id):
all_instances = [r.instances for r in self.ec2.get_all_instances()]
instances = chain.from_iterable(all_instances)
for instance in instances:
if (instance.tags.get('tp:group', None) == self.tapping_group.name and
instance.state not in ('terminated', 'shutting-down')):
if instance.tags.get('tp:group', None) == self.tapping_group.name and \
instance.state not in ('terminated', 'shutting-down'):
self.emergency.append(instance)
if instance.id not in running_in_lb:
self.logger.info(">> load_state: Attaching new emergency instance %s to LB." % instance.id)
self.attach_instance(instance.id, "OD")

def stop(self):
''' Prepares this TPManager to stop by not launching new machines
""" Prepares this TPManager to stop by not launching new machines
and gradually remove old machines.
This manager loop will only stop when both the autoscaling group
and the TP manager has zero instances running.
'''
"""
self.started = False

def start(self):
