-
Notifications
You must be signed in to change notification settings - Fork 1
/
carbon-aware-service.py
85 lines (74 loc) · 2.74 KB
/
carbon-aware-service.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
from datetime import datetime
from enum import Enum
from flask import Flask,request,jsonify
from os import environ
from math import floor
# ------ STRATEGIES ------
# Import and enum carbon-aware strategies (aka. flavours)
from flavours.interface import CarbonAwareStrategy
from flavours.low_power import LowPowerStrategy
from flavours.medium_power import MediumPowerStrategy
from flavours.high_power import HighPowerStrategy
class CarbonAwareStrategies(Enum):
LowPower = LowPowerStrategy
MediumPower = MediumPowerStrategy
HighPower = HighPowerStrategy
# ------ CONTEXT ------
# Carbon-aware context
class Context:
# constructor
def __new__(cls, *args, **kwargs):
return super().__new__(cls)
# initializer
def __init__(self):
self.assignment = {}
assignment = open(environ["ASSIGNMENT"])
for a_line in list(assignment)[1:]:
a = a_line.replace("\n","").split(",")
timestamp = datetime.strptime(a[0],"%Y-%m-%dT%H:%MZ")
strategy = a[1]
self.assignment[timestamp.hour + 0.5*floor(timestamp.minute/30)] = CarbonAwareStrategies[strategy].value
assignment.close()
# getter for carbon-aware strategy
def getCarbonAwareStrategy(self,force_strategy) -> CarbonAwareStrategy:
if force_strategy is not None:
return CarbonAwareStrategies[force_strategy].value
now = datetime.now()
return self.assignment[now.hour + 0.5*floor(now.minute/30)]
# ------ SERVICE ------
app = Flask(__name__)
# app data
with open("data/numbers.txt","r") as numbers:
values = numbers.read().split(",")
app.data = []
for val in values:
app.data.append(int(val))
# set service's context
app.context = Context()
@app.route("/")
def nop():
# Parse params and check if forced to run a given strategy
force_strategy = request.args.get("force")
# Get carbon-aware strategy
strategy = app.context.getCarbonAwareStrategy(force_strategy)
# Invoke strategy with dynamic typing
answer = strategy.nop() + "\n"
return answer
@app.route("/avg")
def avg():
# Parse params and check if forced to run a given strategy
force_strategy = request.args.get("force")
# Get carbon-aware strategy
strategy = app.context.getCarbonAwareStrategy(force_strategy)
# Invoke strategy with dynamic typing (and measure running time)
start = datetime.now()
average = strategy.avg(app.data)
end = datetime.now()
elapsed = round((end.timestamp() - start.timestamp())*1000,2) # in milliseconds
# Return result and elapsed time
result = {}
result["value"] = average
result["elapsed"] = elapsed
result["strategy"] = strategy.nop()
return jsonify(result)
app.run(host='0.0.0.0',port=50000)