You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session. You switched accounts on another tab or window. Reload to refresh your session. Dismiss alert
Hi, I have updated the code and created a Jupyter notebook to work on Python 3 (the original code was in Python 2).
You can use them to update your repo.
#!/usr/bin/env python
# On 20121129
# Critical Line Algorithm
# by MLdP <[email protected]>
import numpy as np


# ---------------------------------------------------------------
# ---------------------------------------------------------------
class CLA:
    """Critical Line Algorithm for bounded mean-variance portfolios.

    The solver walks from the seed portfolio built by ``initAlgo`` through
    successive "turning points" (portfolios at which one asset enters or
    leaves the set of free, i.e. not-at-a-bound, weights) and finishes with
    the lambda = 0 (minimum variance) solution.

    The code indexes ``mean``, ``lB`` and ``uB`` as ``(n, 1)`` column vectors
    and ``covar`` as an ``(n, n)`` matrix.

    After ``solve()``:

    * ``w`` -- list of ``(n, 1)`` weight vectors, one per turning point plus
      the final lambda = 0 solution.
    * ``l`` -- list of lambdas, aligned one-to-one with ``w``.
    * ``g`` / ``f`` -- gammas and free sets.  These two lists still contain
      the entry recorded for the seed portfolio, so they are one element
      longer than ``w`` and ``l``.

    Assets for which the lambda denominator is exactly zero (for example,
    assets with identical expected returns) are skipped as candidates; the
    algorithm does not resolve that degenerate case.
    """

    def __init__(self, mean, covar, lB, uB):
        # Initialize the class
        self.mean = mean
        self.covar = covar
        self.lB = lB
        self.uB = uB
        self.w = []  # solution
        self.l = []  # lambdas
        self.g = []  # gammas
        self.f = []  # free weights

    # ---------------------------------------------------------------
    def solve(self):
        """Compute the turning points, free sets and weights.

        Results are accumulated in ``self.w``, ``self.l``, ``self.g`` and
        ``self.f``; nothing is returned.

        Raises:
            ValueError: propagated from ``initAlgo`` when the bounds do not
                admit a seed portfolio.
        """
        f, w = self.initAlgo()
        self.w.append(np.copy(w))  # store solution
        self.l.append(-np.inf)  # sentinel: no lambda known yet
        self.g.append(None)
        self.f.append(f[:])
        n_assets = self.mean.shape[0]
        while True:
            # 1) case a): Bound one free weight
            l_in = -np.inf
            if len(f) > 1:
                covarF, covarFB, meanF, wB = self.getMatrices(f)
                covarF_inv = np.linalg.inv(covarF)
                for j, i in enumerate(f):
                    l, bi = self.computeLambda(
                        covarF_inv, covarFB, meanF, wB, j, [self.lB[i], self.uB[i]]
                    )
                    if l > l_in:
                        l_in, i_in, bi_in = l, i, bi
            # 2) case b): Free one bounded weight
            l_out = -np.inf
            if len(f) < n_assets:
                for i in self.getB(f):
                    covarF, covarFB, meanF, wB = self.getMatrices(f + [i])
                    covarF_inv = np.linalg.inv(covarF)
                    l, bi = self.computeLambda(
                        covarF_inv,
                        covarFB,
                        meanF,
                        wB,
                        meanF.shape[0] - 1,
                        self.w[-1][i],
                    )
                    if (self.l[-1] == -np.inf or l < self.l[-1]) and l > l_out:
                        l_out, i_out = l, i
            # 3) decide lambda (-inf, the "no candidate" marker, is also < 0)
            if l_in < 0 and l_out < 0:
                break
            if l_in > l_out:
                new_lambda = l_in
                f.remove(i_in)
                w[i_in] = bi_in  # set value at the correct boundary
            else:
                new_lambda = l_out
                f.append(i_out)
            # The first real lambda overwrites the -inf sentinel.
            if len(self.l) == 1 and self.l[0] == -np.inf:
                self.l[0] = new_lambda
            else:
                self.l.append(new_lambda)
            # 4) compute solution vector
            self._storeSolution(f, w)
        # 5) minimum variance solution: the lambda = 0 point of the last
        # critical line.  It is computed once, after the path has ended, for
        # whatever free set remains; computeW reads self.l[-1], so the 0.0
        # must be recorded first.
        if len(self.l) == 1 and self.l[0] == -np.inf:
            self.l[0] = 0.0  # no turning point was found at all
        else:
            self.l.append(0.0)
        self._storeSolution(f, w, zeroMean=True)
        # Remove first element from weights (the seed duplicates the first
        # turning point, or the lambda = 0 solution when there is none).
        self.w = self.w[1:]

    # ---------------------------------------------------------------
    def _storeSolution(self, f, w, zeroMean=False):
        """Solve for the free weights at ``self.l[-1]`` and record the result.

        ``w`` is updated in place.  The bounded weights are read from ``w``
        (the working vector) rather than from the last stored solution: when
        an asset has just been pinned to a bound, only ``w`` holds its new
        value.
        """
        covarF, covarFB, meanF, _ = self.getMatrices(f)
        wB = self.reduceMatrix(w, self.getB(f), [0])
        if zeroMean:
            meanF = np.zeros(meanF.shape)
        wF, g = self.computeW(np.linalg.inv(covarF), covarFB, meanF, wB)
        for pos, asset in enumerate(f):
            w[asset] = wF[pos]
        self.w.append(np.copy(w))  # store solution
        self.g.append(g)
        self.f.append(f[:])

    # ---------------------------------------------------------------
    @staticmethod
    def _scalar(value):
        """Return the single element of ``value`` as a Python float."""
        # float() on an array with ndim > 0 is deprecated in recent NumPy.
        return float(np.asarray(value).item())

    # ---------------------------------------------------------------
    def initAlgo(self):
        """Build the seed portfolio.

        Starting from the lower bounds, assets are raised to their upper
        bound in order of decreasing mean until the weights reach 1; the
        last asset raised absorbs the excess and becomes the first free
        asset.

        Returns:
            ``([index_of_first_free_asset], w)``.

        Raises:
            ValueError: if the lower bounds already sum to 1 or more, or the
                upper bounds sum to less than 1.
        """
        n_assets = self.mean.shape[0]
        # Ascending by mean; a stable sort keeps ties in index order.
        order = np.argsort(np.asarray(self.mean)[:, 0], kind="stable")
        i, w = n_assets, np.copy(self.lB)
        while sum(w) < 1:
            i -= 1
            if i < 0:
                raise ValueError(
                    "upper bounds sum to less than 1: no fully invested "
                    "portfolio exists"
                )
            w[order[i]] = self.uB[order[i]]
        if i == n_assets:
            raise ValueError(
                "lower bounds already sum to 1 or more: there is no weight "
                "left to allocate"
            )
        w[order[i]] += 1 - sum(w)
        return [order[i]], w

    # ---------------------------------------------------------------
    def computeBi(self, c, bi):
        """Select a bound from ``bi = [lower, upper]`` by the sign of ``c``.

        Returns ``bi[1]`` for ``c > 0``, ``bi[0]`` for ``c < 0`` and ``bi``
        unchanged otherwise.
        """
        if c > 0:
            bi = bi[1]
        if c < 0:
            bi = bi[0]
        return bi

    # ---------------------------------------------------------------
    def computeW(self, covarF_inv, covarFB, meanF, wB):
        """Compute the free weights and gamma at ``self.l[-1]``.

        ``covarFB`` and ``wB`` are ``None`` when no asset is bounded.

        Returns:
            ``(wF, g)``: the free-weight column vector and gamma as a float.
        """
        # 1) compute gamma
        onesF = np.ones(meanF.shape)
        g1 = np.dot(np.dot(onesF.T, covarF_inv), meanF)
        g2 = np.dot(np.dot(onesF.T, covarF_inv), onesF)
        if wB is None:
            g, w1 = self._scalar(-self.l[-1] * g1 / g2 + 1 / g2), 0
        else:
            onesB = np.ones(wB.shape)
            g3 = np.dot(onesB.T, wB)
            g4 = np.dot(covarF_inv, covarFB)
            w1 = np.dot(g4, wB)
            g4 = np.dot(onesF.T, w1)
            g = self._scalar(-self.l[-1] * g1 / g2 + (1 - g3 + g4) / g2)
        # 2) compute weights
        w2 = np.dot(covarF_inv, onesF)
        w3 = np.dot(covarF_inv, meanF)
        return -w1 + g * w2 + self.l[-1] * w3, g

    # ---------------------------------------------------------------
    def computeLambda(self, covarF_inv, covarFB, meanF, wB, i, bi):
        """Compute the lambda at which free asset ``i`` takes the value ``bi``.

        ``i`` is the position of the asset inside the free set.  ``bi`` is
        either the target value or a ``[lower, upper]`` list, in which case
        ``computeBi`` picks the bound.

        Returns:
            ``(lambda, bi)``.  When the denominator ``c`` is exactly zero no
            lambda exists and ``(-inf, None)`` is returned, which both
            callers in ``solve`` treat as "not a candidate".
        """
        # 1) C
        onesF = np.ones(meanF.shape)
        c1 = np.dot(np.dot(onesF.T, covarF_inv), onesF)
        c2 = np.dot(covarF_inv, meanF)
        c3 = np.dot(np.dot(onesF.T, covarF_inv), meanF)
        c4 = np.dot(covarF_inv, onesF)
        c = -c1 * c2[i] + c3 * c4[i]
        if c == 0:
            return -np.inf, None
        # 2) bi
        if isinstance(bi, list):
            bi = self.computeBi(c, bi)
        # 3) Lambda
        if wB is None:
            # All free assets
            return self._scalar((c4[i] - c1 * bi) / c), bi
        onesB = np.ones(wB.shape)
        l1 = np.dot(onesB.T, wB)
        l2 = np.dot(covarF_inv, covarFB)
        l3 = np.dot(l2, wB)
        l2 = np.dot(onesF.T, l3)
        return (
            self._scalar(((1 - l1 + l2) * c4[i] - c1 * (bi + l3[i])) / c),
            bi,
        )

    # ---------------------------------------------------------------
    def getMatrices(self, f):
        """Slice ``covarF``, ``covarFB``, ``meanF`` and ``wB`` for free set ``f``.

        ``wB`` is taken from the last stored solution ``self.w[-1]``;
        ``covarFB`` and ``wB`` are ``None`` when no asset is bounded.
        """
        covarF = self.reduceMatrix(self.covar, f, f)
        meanF = self.reduceMatrix(self.mean, f, [0])
        b = self.getB(f)
        covarFB = self.reduceMatrix(self.covar, f, b)
        wB = self.reduceMatrix(self.w[-1], b, [0])
        return covarF, covarFB, meanF, wB

    # ---------------------------------------------------------------
    def getB(self, f):
        """Return the indices of the assets that are not in ``f``."""
        return self.diffLists(np.arange(self.mean.shape[0]), f)

    # ---------------------------------------------------------------
    def diffLists(self, list1, list2):
        """Return the elements of ``list1`` absent from ``list2``, sorted."""
        # Sorting makes the order independent of set iteration order.
        return sorted(set(list1) - set(list2))

    # ---------------------------------------------------------------
    def reduceMatrix(self, matrix, listX, listY):
        """Reduce a matrix to the provided list of rows and columns.

        Returns ``None`` when either list is empty.
        """
        if len(listX) == 0 or len(listY) == 0:
            return None
        return matrix[np.ix_(listX, listY)]

    # ---------------------------------------------------------------
    def getMinVar(self):
        """Get the minimum variance solution among the stored portfolios.

        Returns:
            ``(volatility, weights)``; the volatility keeps the array shape
            produced by ``w.T @ covar @ w``.
        """
        var = [np.dot(np.dot(w.T, self.covar), w) for w in self.w]
        best = int(np.argmin([self._scalar(v) for v in var]))
        return var[best] ** 0.5, self.w[best]

    # ---------------------------------------------------------------
    def getMaxSR(self):
        """Get the max Sharpe ratio portfolio.

        Returns:
            ``(sharpe_ratio, weights)``.
        """
        # 1) Compute the local max SR portfolio between any two neighbor
        # turning points
        w_sr, sr = [], []
        for i in range(len(self.w) - 1):
            w0 = np.copy(self.w[i])
            w1 = np.copy(self.w[i + 1])
            kargs = {"minimum": False, "args": (w0, w1)}
            a, b = self.goldenSection(self.evalSR, 0, 1, **kargs)
            w_sr.append(a * w0 + (1 - a) * w1)
            sr.append(b)
        return max(sr), w_sr[sr.index(max(sr))]

    # ---------------------------------------------------------------
    def evalSR(self, a, w0, w1):
        """Evaluate SR of the convex combination ``a * w0 + (1 - a) * w1``."""
        w = a * w0 + (1 - a) * w1
        b = np.dot(w.ravel(), self.mean)[0]
        c = np.sqrt(np.dot(np.dot(w.ravel(), self.covar), w.ravel()))
        return b / c

    # ---------------------------------------------------------------
    def goldenSection(self, obj, a, b, **kargs):
        """Golden section search of ``obj`` on ``[a, b]``.

        Keyword arguments:
            minimum: pass ``False`` to search for a maximum (default: minimum).
            args: tuple of extra positional arguments for ``obj``.

        Returns:
            ``(x, obj(x))`` at the located optimum.
        """
        tol, sign = 1.0e-9, 1
        if not kargs.get("minimum", True):
            sign = -1
        # An empty tuple (not None) so obj can be called without extra args.
        args = kargs.get("args", ())
        numIter = int(np.ceil(-2.078087 * np.log(tol / np.abs(b - a))))
        r = 0.618033989
        c = 1.0 - r
        # Initialize
        x1 = r * a + c * b
        x2 = c * a + r * b
        f1 = sign * obj(x1, *args)
        f2 = sign * obj(x2, *args)
        # Loop
        for _ in range(numIter):
            if f1 > f2:
                a = x1
                x1 = x2
                f1 = f2
                x2 = c * a + r * b
                f2 = sign * obj(x2, *args)
            else:
                b = x2
                x2 = x1
                f2 = f1
                x1 = r * a + c * b
                f1 = sign * obj(x1, *args)
        if f1 < f2:
            return x1, sign * f1
        return x2, sign * f2

    # ---------------------------------------------------------------
    def efFrontier(self, points):
        """Get the efficient frontier.

        Each pair of consecutive stored portfolios is interpolated with
        ``points // len(self.w)`` steps.

        Returns:
            ``(mu, sigma, weights)`` as three parallel lists.
        """
        mu, sigma, weights = [], [], []
        # remove the 1, to avoid duplications
        a = np.linspace(0, 1, points // len(self.w))[:-1]
        b = np.arange(len(self.w) - 1)
        for i in b:
            w0, w1 = self.w[i], self.w[i + 1]
            if i == b[-1]:
                # include the 1 in the last iteration
                a = np.linspace(0, 1, points // len(self.w))
            for j in a:
                w = w1 * j + (1 - j) * w0
                weights.append(np.copy(w))
                mu.append(np.dot(w.T, self.mean)[0, 0])
                sigma.append(np.dot(np.dot(w.T, self.covar), w)[0, 0] ** 0.5)
        return mu, sigma, weights


# ---------------------------------------------------------------
# ---------------------------------------------------------------
importCLAimportmatplotlib.pyplotaspltimportnumpyasnpimportpandasaspdfromIPython.displayimportMarkdown, display# Python >= 3.9 or you might have issues with type-hints
Helper functions
def compute_return_risk(
    df: pd.DataFrame, mean: np.ndarray, covar: np.ndarray
) -> tuple[pd.Series, pd.Series]:
    """Compute Return and Risk (volatility) series for the CLA weights

    Every row of ``df`` is treated as one portfolio.

    Args:
        df (pd.DataFrame): Dataframe with the weights, one portfolio per row
        mean (np.ndarray): Array of mean returns, shape ``(n,)`` or ``(n, 1)``
        covar (np.ndarray): Array of covariances, shape ``(n, n)``

    Returns:
        tuple[pd.Series, pd.Series]: series of returns and risk, both indexed
            like ``df`` and named "Return" and "Risk"
    """
    weights = df.to_numpy(dtype=float)
    # Flatten so a column vector and a flat vector are handled alike.
    mu = np.asarray(mean, dtype=float).reshape(-1)
    sigma = np.asarray(covar, dtype=float)
    p_ret = weights @ mu
    # Row-wise quadratic form w' * covar * w for all portfolios at once.
    risk = np.sqrt(np.einsum("ij,jk,ik->i", weights, sigma, weights))
    return pd.Series(p_ret, index=df.index, name="Return"), pd.Series(
        risk, index=df.index, name="Risk"
    )
def create_weight_table(
    cla: "CLA.CLA", mean: np.ndarray, covar: np.ndarray, var_names: list
) -> pd.DataFrame:
    """Create the table with Return, Risk, Lambda and Weights

    Rows are numbered from 1, one per entry of ``cla.w``.

    Args:
        cla (CLA.CLA): The CLA object with the solution; ``cla.w`` and
            ``cla.l`` are read
        mean (np.ndarray): Array with the return means
        covar (np.ndarray): Array of covariances
        var_names (list): List of names for the weight columns

    Returns:
        pd.DataFrame: The weights table
    """
    # NOTE(review): the annotation is a string because the notebook does
    # ``import CLA`` (a module); presumably the class is ``CLA.CLA`` -- confirm.
    row_labels = np.arange(1, len(cla.w) + 1)
    # Each solution is stacked as a column, then transposed to one row each.
    weights = pd.DataFrame(np.hstack(cla.w).T, columns=var_names, index=row_labels)
    port_return, port_risk = compute_return_risk(weights, mean, covar)
    port_lambda = pd.Series(cla.l, index=weights.index, name="Lambda")
    return pd.concat([port_return, port_risk, port_lambda, weights], axis=1)
def display_results(ret_mean: np.ndarray, cla: "CLA.CLA") -> None:
    """Plot the CLA results and display the maximum Sharpe portfolio and the
    minimum variance one

    Args:
        ret_mean (np.ndarray): Array of return means
        cla (CLA.CLA): CLA object with the solution
    """
    # NOTE(review): plot_results is not defined in the visible code and is
    # called without arguments; presumably a notebook-level helper -- confirm.
    plot_results()
    print()
    # 5) Get Maximum Sharpe ratio portfolio
    sr, w_sr = cla.getMaxSR()
    display(
        Markdown(
            f"### Portfolio volatility: {np.sqrt(np.dot(np.dot(w_sr.ravel(),cla.covar),w_sr.ravel())):.2%}, Sharpe ratio: {sr:.2f}"
        )
    )
    display(Markdown("### Weights (rounded to 4 decimal places):"))
    display(
        pd.Series(
            np.round(w_sr.ravel(), 4),
            name="sr_weights",
            index=np.arange(1, len(w_sr) + 1),
        )
    )
    print()
    # 6) Get Minimum Variance portfolio
    mv, w_mv = cla.getMinVar()
    mu_v = np.dot(w_mv.ravel(), ret_mean)
    sr_v = np.round(mu_v / mv, 2).ravel()[0]
    display(
        Markdown(
            f"### Portfolio minimum volatility: {mv.ravel()[0]:.2%}, Sharpe ratio: {sr_v:.2f}"
        )
    )
    display(Markdown("### Weights (rounded to 4 decimal places):"))
    display(
        pd.Series(
            np.round(w_mv.ravel(), 4),
            name="mv_weights",
            # Sized from the minimum-variance weights themselves.
            index=np.arange(1, len(w_mv) + 1),
        )
    )
    print()
Hi, I have updated the code and created a Jupyter notebook to work on Python 3 (the original code was in Python 2).
You can use them to update your repo.
And the notebook:
CLA in Python 3
Original code in Python 2, together with the example dataset, is here: https://github.com/mdengler/cla
Helper functions
Import dataset
Create variables
var_names
Compute CLA
Display results
Portfolio volatility: 22.74%, Sharpe ratio: 4.45
Weights (rounded to 4 decimal places):
Portfolio minimum volatility: 20.52%, Sharpe ratio: 3.91
Weights (rounded to 4 decimal places):
Thank you very much for your repo.
Kind regards,
Andrea Dalseno
The text was updated successfully, but these errors were encountered: