Add AllocatedCapacityWeigher
AllocatedCapacityWeigher is a weigher that weighs hosts by their
allocated capacity. Its main purpose is to simulate the
SimpleScheduler's behavior, which sorts hosts by the total size of all
volumes on them. 'Allocated capacity' therefore equals the sum of the
sizes of all volumes on the target host; for example, a host carrying
10 GB, 20 GB and 30 GB volumes has an allocated capacity of 60 GB.

In order to keep track of 'allocated' capacity, host state is updated
with an 'allocated_capacity_gb' attribute to record the value, which
means each back-end must report one extra stat to the scheduler.
Fortunately, the 'allocated' capacity we are interested in here is
purely Cinder-level capacity, so the volume manager can take on the
whole burden of calculating this value without having to query
back-ends. The volume manager does the initial calculation in
init_host(), where it already has to query all existing volumes from
the DB for ensure_export(). After the initial calculation, the volume
manager and scheduler keep track of every new request that changes
'allocated_capacity' and make sure this value stays up to date.
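
A hedged sketch of that initial calculation (illustrative, not the
verbatim manager code; it assumes the existing volume_get_all_by_host
DB API):

```python
# Inside VolumeManager.init_host(): the volumes are already being
# fetched for ensure_export(), so summing their sizes adds no extra
# DB round trip.
volumes = self.db.volume_get_all_by_host(ctxt, self.host)
self.stats['allocated_capacity_gb'] = sum(vol['size'] for vol in volumes)
```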

!DriverImpact! Cinder driver developers, please read on:

This patch contains a change that might IMPACT volume drivers: the
volume manager now uses its 'stats' attribute to save
'allocated_capacity_gb', and this information is merged with the stats
drivers provide into a single report for the scheduler to consume. If
you plan to report any form of allocated space other than the apparent
Cinder-level value (e.g. actual capacity allocated on the back-end),
please choose a key name other than 'allocated_capacity_gb'; otherwise
it will *OVERWRITE* the value the volume manager has calculated and
confuse the scheduler.
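
A minimal sketch of the merge described above (assumed shape; the
exact manager code may differ):

```python
# The manager's Cinder-level figures form the base; driver-reported
# stats are merged on top before being published to the scheduler.
stats = dict(self.stats)  # includes 'allocated_capacity_gb'
stats.update(self.driver.get_volume_stats(refresh=True))
self.update_service_capabilities(stats)
# A driver that also reports 'allocated_capacity_gb' silently replaces
# the manager's value here; a distinct key (e.g. a hypothetical
# 'backend_allocated_gb') keeps both figures intact.
```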

Partially implements bp: deprecate-chance-and-simple-schedulers

Change-Id: I306230b8973c2d1ad77bcab14ccde68e997ea816
Zhiteng Huang committed Dec 30, 2013
1 parent 2d6a903 commit 254e37a
Showing 12 changed files with 176 additions and 23 deletions.
6 changes: 6 additions & 0 deletions cinder/scheduler/host_manager.py
@@ -99,6 +99,9 @@ def __init__(self, host, capabilities=None, service=None):
# Mutable available resources.
# These will change as resources are virtually "consumed".
self.total_capacity_gb = 0
# Capacity that has been allocated from Cinder's point of view; this
# should equal sum(vol['size'] for vol in vols_on_hosts)
self.allocated_capacity_gb = 0
self.free_capacity_gb = None
self.reserved_percentage = 0

@@ -128,13 +131,16 @@ def update_from_volume_capability(self, capability):

self.total_capacity_gb = capability['total_capacity_gb']
self.free_capacity_gb = capability['free_capacity_gb']
self.allocated_capacity_gb = capability.get(
'allocated_capacity_gb', 0)
self.reserved_percentage = capability['reserved_percentage']

self.updated = capability['timestamp']

def consume_from_volume(self, volume):
"""Incrementally update host state from an volume."""
volume_gb = volume['size']
self.allocated_capacity_gb += volume_gb
if self.free_capacity_gb == 'infinite':
# There's virtually infinite space on back-end
pass
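For context, a hedged example of how the scheduler's cached host state
absorbs a new request through the method above (illustrative values
only):

```python
# After the scheduler picks a host, the new volume is charged against
# the in-memory HostState so back-to-back requests see up-to-date
# numbers without waiting for the next capability report.
before = host_state.allocated_capacity_gb
host_state.consume_from_volume({'size': 10})
assert host_state.allocated_capacity_gb == before + 10
```
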
30 changes: 29 additions & 1 deletion cinder/scheduler/weights/capacity.py
@@ -1,3 +1,4 @@
# Copyright (c) 2013 eBay Inc.
# Copyright (c) 2012 OpenStack Foundation
# All Rights Reserved.
#
@@ -13,11 +14,22 @@
# License for the specific language governing permissions and limitations
# under the License.
"""
Capacity Weigher. Weigh hosts by their available capacity.
Weighers that weigh hosts by their capacity, including the following
two weighers:
1. Capacity Weigher.  Weigh hosts by their available capacity.
The default is to spread volumes across all hosts evenly.  If you prefer
stacking, you can set the 'capacity_weight_multiplier' option to a negative
number and the weighing has the opposite effect of the default.
2. Allocated Capacity Weigher.  Weigh hosts by their allocated capacity.
The default behavior is to place a new volume on the host with the least
allocated space.  This weigher is intended to simulate the behavior of
SimpleScheduler.  If you prefer to place volumes on the host with the most
allocated space, you can set the 'allocated_capacity_weight_multiplier'
option to a positive number and the weighing has the opposite effect of
the default.
"""


@@ -33,6 +45,10 @@
default=1.0,
help='Multiplier used for weighing volume capacity. '
'Negative numbers mean to stack vs spread.'),
cfg.FloatOpt('allocated_capacity_weight_multiplier',
default=-1.0,
help='Multiplier used for weighing allocated capacity. '
'Positive numbers mean to stack vs spread.'),
]

CONF = cfg.CONF
@@ -55,3 +71,15 @@ def _weigh_object(self, host_state, weight_properties):
else:
free = math.floor(host_state.free_capacity_gb * (1 - reserved))
return free


class AllocatedCapacityWeigher(weights.BaseHostWeigher):
def _weight_multiplier(self):
"""Override the weight multiplier."""
return CONF.allocated_capacity_weight_multiplier

def _weigh_object(self, host_state, weight_properties):
# Higher weights win. We want spreading (choose host with lowest
# allocated_capacity first) to be the default.
allocated_space = host_state.allocated_capacity_gb
return allocated_space
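
The new test module added in this commit exercises exactly this path;
as a quick usage sketch (host_states as returned by
HostManager.get_all_host_states()):

```python
from cinder.openstack.common.scheduler.weights import HostWeightHandler
from cinder.scheduler.weights.capacity import AllocatedCapacityWeigher

handler = HostWeightHandler('cinder.scheduler.weights')
# With the default multiplier of -1.0 the raw weight is
# -allocated_capacity_gb, so the least-allocated host sorts first.
weighed = handler.get_weighed_objects([AllocatedCapacityWeigher],
                                      host_states, {})
best = weighed[0].obj
```
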
36 changes: 20 additions & 16 deletions cinder/tests/api/contrib/test_admin_actions.py
@@ -76,7 +76,7 @@ def test_reset_status_as_admin(self):
def test_reset_status_as_non_admin(self):
# current status is 'error'
volume = db.volume_create(context.get_admin_context(),
{'status': 'error'})
{'status': 'error', 'size': 1})
req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id'])
req.method = 'POST'
req.headers['content-type'] = 'application/json'
@@ -96,7 +96,7 @@ def test_malformed_reset_status_body(self):
# admin context
ctx = context.RequestContext('admin', 'fake', True)
# current status is available
volume = db.volume_create(ctx, {'status': 'available'})
volume = db.volume_create(ctx, {'status': 'available', 'size': 1})
req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id'])
req.method = 'POST'
req.headers['content-type'] = 'application/json'
@@ -115,7 +115,7 @@ def test_invalid_status_for_volume(self):
# admin context
ctx = context.RequestContext('admin', 'fake', True)
# current status is available
volume = db.volume_create(ctx, {'status': 'available'})
volume = db.volume_create(ctx, {'status': 'available', 'size': 1})
req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id'])
req.method = 'POST'
req.headers['content-type'] = 'application/json'
@@ -153,7 +153,8 @@ def test_reset_attached_status(self):
# admin context
ctx = context.RequestContext('admin', 'fake', True)
# current status is available
volume = db.volume_create(ctx, {'status': 'available',
volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',
'provider_location': '', 'size': 1,
'attach_status': 'attached'})
req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id'])
req.method = 'POST'
@@ -177,7 +178,8 @@ def test_invalid_reset_attached_status(self):
# admin context
ctx = context.RequestContext('admin', 'fake', True)
# current status is available
volume = db.volume_create(ctx, {'status': 'available',
volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',
'provider_location': '', 'size': 1,
'attach_status': 'detached'})
req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id'])
req.method = 'POST'
@@ -200,7 +202,8 @@ def test_snapshot_reset_status(self):
# admin context
ctx = context.RequestContext('admin', 'fake', True)
# snapshot in 'error_deleting'
volume = db.volume_create(ctx, {})
volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',
'provider_location': '', 'size': 1})
snapshot = db.snapshot_create(ctx, {'status': 'error_deleting',
'volume_id': volume['id']})
req = webob.Request.blank('/v2/fake/snapshots/%s/action' %
@@ -222,7 +225,8 @@ def test_invalid_status_for_snapshot(self):
# admin context
ctx = context.RequestContext('admin', 'fake', True)
# snapshot in 'available'
volume = db.volume_create(ctx, {})
volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',
'provider_location': '', 'size': 1})
snapshot = db.snapshot_create(ctx, {'status': 'available',
'volume_id': volume['id']})
req = webob.Request.blank('/v2/fake/snapshots/%s/action' %
@@ -245,7 +249,7 @@ def test_force_delete(self):
# admin context
ctx = context.RequestContext('admin', 'fake', True)
# current status is creating
volume = db.volume_create(ctx, {'status': 'creating'})
volume = db.volume_create(ctx, {'size': 1})
req = webob.Request.blank('/v2/fake/volumes/%s/action' % volume['id'])
req.method = 'POST'
req.headers['content-type'] = 'application/json'
@@ -263,7 +267,7 @@ def test_force_delete_snapshot(self):
# admin context
ctx = context.RequestContext('admin', 'fake', True)
# current status is creating
volume = db.volume_create(ctx, {'host': 'test'})
volume = db.volume_create(ctx, {'host': 'test', 'size': 1})
snapshot = db.snapshot_create(ctx, {'status': 'creating',
'volume_size': 1,
'volume_id': volume['id']})
@@ -291,7 +295,7 @@ def test_force_detach_instance_attached_volume(self):
ctx = context.RequestContext('admin', 'fake', True)
# current status is available
volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',
'provider_location': ''})
'provider_location': '', 'size': 1})
connector = {'initiator': 'iqn.2012-07.org.fake:01'}
# start service to handle rpc messages for attach requests
svc = self.start_service('volume', host='test')
@@ -346,7 +350,7 @@ def test_force_detach_host_attached_volume(self):
ctx = context.RequestContext('admin', 'fake', True)
# current status is available
volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',
'provider_location': ''})
'provider_location': '', 'size': 1})
connector = {'initiator': 'iqn.2012-07.org.fake:01'}
# start service to handle rpc messages for attach requests
svc = self.start_service('volume', host='test')
@@ -402,7 +406,7 @@ def test_attach_in_used_volume_by_instance(self):
ctx = context.RequestContext('admin', 'fake', True)
# current status is available
volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',
'provider_location': ''})
'provider_location': '', 'size': 1})
connector = {'initiator': 'iqn.2012-07.org.fake:01'}
# start service to handle rpc messages for attach requests
svc = self.start_service('volume', host='test')
@@ -438,7 +442,7 @@ def test_attach_in_used_volume_by_host(self):
ctx = context.RequestContext('admin', 'fake', True)
# current status is available
volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',
'provider_location': ''})
'provider_location': '', 'size': 1})
connector = {'initiator': 'iqn.2012-07.org.fake:01'}
# start service to handle rpc messages for attach requests
svc = self.start_service('volume', host='test')
@@ -474,7 +478,7 @@ def test_invalid_iscsi_connector(self):
ctx = context.RequestContext('admin', 'fake', True)
# current status is available
volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',
'provider_location': ''})
'provider_location': '', 'size': 1})
connector = {}
# start service to handle rpc messages for attach requests
svc = self.start_service('volume', host='test')
@@ -489,7 +493,7 @@ def test_attach_attaching_volume_with_different_instance(self):
ctx = context.RequestContext('admin', 'fake', True)
# current status is available
volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',
'provider_location': ''})
'provider_location': '', 'size': 1})
# start service to handle rpc messages for attach requests
svc = self.start_service('volume', host='test')
values = {'status': 'attaching',
@@ -513,7 +517,7 @@ def test_attach_attaching_volume_with_different_mode(self):
ctx = context.RequestContext('admin', 'fake', True)
# current status is available
volume = db.volume_create(ctx, {'status': 'available', 'host': 'test',
'provider_location': ''})
'provider_location': '', 'size': 1})
# start service to handle rpc messages for attach requests
svc = self.start_service('volume', host='test')
values = {'status': 'attaching',
6 changes: 5 additions & 1 deletion cinder/tests/scheduler/fakes.py
@@ -34,18 +34,22 @@ def __init__(self):
self.service_states = {
'host1': {'total_capacity_gb': 1024,
'free_capacity_gb': 1024,
'allocated_capacity_gb': 0,
'reserved_percentage': 10,
'timestamp': None},
'host2': {'total_capacity_gb': 2048,
'free_capacity_gb': 300,
'allocated_capacity_gb': 1748,
'reserved_percentage': 10,
'timestamp': None},
'host3': {'total_capacity_gb': 512,
'free_capacity_gb': 512,
'free_capacity_gb': 256,
'allocated_capacity_gb': 256,
'reserved_percentage': 0,
'timestamp': None},
'host4': {'total_capacity_gb': 2048,
'free_capacity_gb': 200,
'allocated_capacity_gb': 1848,
'reserved_percentage': 5,
'timestamp': None},
}
92 changes: 92 additions & 0 deletions cinder/tests/scheduler/test_allocated_capacity_weigher.py
@@ -0,0 +1,92 @@
# Copyright 2013 eBay Inc.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Tests For Allocated Capacity Weigher.
"""

import mock
from oslo.config import cfg

from cinder import context
from cinder.openstack.common.scheduler.weights import HostWeightHandler
from cinder.scheduler.weights.capacity import AllocatedCapacityWeigher as ACW
from cinder import test
from cinder.tests.scheduler import fakes

CONF = cfg.CONF


class AllocatedCapacityWeigherTestCase(test.TestCase):
def setUp(self):
super(AllocatedCapacityWeigherTestCase, self).setUp()
self.host_manager = fakes.FakeHostManager()
self.weight_handler = HostWeightHandler('cinder.scheduler.weights')

def _get_weighed_host(self, hosts, weight_properties=None):
if weight_properties is None:
weight_properties = {}
return self.weight_handler.get_weighed_objects([ACW], hosts,
weight_properties)[0]

@mock.patch('cinder.db.sqlalchemy.api.service_get_all_by_topic')
def _get_all_hosts(self, _mock_service_get_all_by_topic):
ctxt = context.get_admin_context()
fakes.mock_host_manager_db_calls(_mock_service_get_all_by_topic)
host_states = self.host_manager.get_all_host_states(ctxt)
_mock_service_get_all_by_topic.assert_called_once_with(
ctxt, CONF.volume_topic)
return host_states

def test_default_of_spreading_first(self):
hostinfo_list = self._get_all_hosts()

# host1: allocated_capacity_gb=0, weight=0
# host2: allocated_capacity_gb=1748, weight=-1748
# host3: allocated_capacity_gb=256, weight=-256
# host4: allocated_capacity_gb=1848, weight=-1848

# so, host1 should win:
weighed_host = self._get_weighed_host(hostinfo_list)
self.assertEqual(weighed_host.weight, 0)
self.assertEqual(weighed_host.obj.host, 'host1')

def test_capacity_weight_multiplier1(self):
self.flags(allocated_capacity_weight_multiplier=1.0)
hostinfo_list = self._get_all_hosts()

# host1: allocated_capacity_gb=0, weight=0
# host2: allocated_capacity_gb=1748, weight=1748
# host3: allocated_capacity_gb=256, weight=256
# host4: allocated_capacity_gb=1848, weight=1848

# so, host4 should win:
weighed_host = self._get_weighed_host(hostinfo_list)
self.assertEqual(weighed_host.weight, 1848.0)
self.assertEqual(weighed_host.obj.host, 'host4')

def test_capacity_weight_multiplier2(self):
self.flags(allocated_capacity_weight_multiplier=-2.0)
hostinfo_list = self._get_all_hosts()

# host1: allocated_capacity_gb=0, weight=0
# host2: allocated_capacity_gb=1748, weight=-3496
# host3: allocated_capacity_gb=256, weight=-512
# host4: allocated_capacity_gb=1848, weight=-3696

# so, host1 should win:
weighed_host = self._get_weighed_host(hostinfo_list)
self.assertEqual(weighed_host.weight, 0)
self.assertEqual(weighed_host.obj.host, 'host1')
6 changes: 3 additions & 3 deletions cinder/tests/scheduler/test_capacity_weigher.py
@@ -56,7 +56,7 @@ def test_default_of_spreading_first(self):

# host1: free_capacity_gb=1024, free=1024*(1-0.1)
# host2: free_capacity_gb=300, free=300*(1-0.1)
# host3: free_capacity_gb=512, free=512
# host3: free_capacity_gb=256, free=256
# host4: free_capacity_gb=200, free=200*(1-0.05)

# so, host1 should win:
@@ -70,7 +70,7 @@ def test_capacity_weight_multiplier1(self):

# host1: free_capacity_gb=1024, free=-1024*(1-0.1)
# host2: free_capacity_gb=300, free=-300*(1-0.1)
# host3: free_capacity_gb=512, free=-512
# host3: free_capacity_gb=256, free=-256
# host4: free_capacity_gb=200, free=-200*(1-0.05)

# so, host4 should win:
@@ -84,7 +84,7 @@ def test_capacity_weight_multiplier2(self):

# host1: free_capacity_gb=1024, free=1024*(1-0.1)*2
# host2: free_capacity_gb=300, free=300*(1-0.1)*2
# host3: free_capacity_gb=512, free=512*2
# host3: free_capacity_gb=256, free=256*2
# host4: free_capacity_gb=200, free=200*(1-0.05)*2

# so, host1 should win:
1 change: 1 addition & 0 deletions cinder/tests/test_gpfs.py
@@ -90,6 +90,7 @@ def setUp(self):
self.volume = importutils.import_object(CONF.volume_manager)
self.volume.driver.set_execute(self._execute_wrapper)
self.volume.driver.set_initialized()
self.volume.stats = dict(allocated_capacity_gb=0)

self.stubs.Set(GPFSDriver, '_create_gpfs_snap',
self._fake_gpfs_snap)
1 change: 1 addition & 0 deletions cinder/tests/test_rbd.py
@@ -702,6 +702,7 @@ def setUp(self):
super(ManagedRBDTestCase, self).setUp()
fake_image.stub_out_image_service(self.stubs)
self.volume.driver.set_initialized()
self.volume.stats = {'allocated_capacity_gb': 0}
self.called = []

def _create_volume_from_image(self, expected_status, raw=False,
1 change: 1 addition & 0 deletions cinder/tests/test_volume.py
@@ -113,6 +113,7 @@ def setUp(self):
self.stubs.Set(brick_lvm.LVM, '_vg_exists', lambda x: True)
self.stubs.Set(os.path, 'exists', lambda x: True)
self.volume.driver.set_initialized()
self.volume.stats = {'allocated_capacity_gb': 0}
# keep ordered record of what we execute
self.called = []
