Skip to content

Commit

Permalink
Add initial CRSI indicator
Browse files Browse the repository at this point in the history
Rely on Pandas-TA built-in Wilder's RSI indicator and TA-lib
counterparts, and implement percent_rank and streak calculation
functions using NumPy. This was tested against large time series with 1
million entries and average of 20 runs returned a 4s time for the CRSI
indicator function via *timeit*.
Other approaches, specially ones relying on Pandas native functionality
became very expensive as the timeseries increased in size.
TA-lib native RSI is slightly faster, around 3.4s average of 20 runs on
a 1 million entries OHLCVA timeseries.
  • Loading branch information
luisbarrancos committed Nov 24, 2023
1 parent 39e51af commit ced358c
Show file tree
Hide file tree
Showing 7 changed files with 247 additions and 4 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -856,6 +856,7 @@ Back to [Contents](#contents)
* _Center of Gravity_: **cg**
* _Chande Momentum Oscillator_: **cmo**
* _Coppock Curve_: **coppock**
* _Connors Relative Strenght Index_: **crsi**
* _Correlation Trend Indicator_: **cti**
* A wrapper for ```ta.linreg(series, r=True)```
* _Directional Movement_: **dm**
Expand Down
6 changes: 6 additions & 0 deletions pandas_ta/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -941,6 +941,12 @@ def coppock(self, length=None, fast=None, slow=None, offset=None, **kwargs: Dict
result = coppock(close=close, length=length, fast=fast, slow=slow, offset=offset, **kwargs)
return self._post_process(result, **kwargs)

def crsi(self, length_rsi=None, length_streak=None, length_rank=None,
drift=None, offset=None, **kwargs: DictLike):
close = self._get_column(kwargs.pop("close", "close"))
result = crsi(close=close, length_rsi=length_rsi, length_streak=length_streak, length_rank=length_rank, drift=drift, offset=offset, **kwargs)
return self._post_process(result, **kwargs)

def cti(self, length=None, offset=None, **kwargs: DictLike):
close = self._get_column(kwargs.pop("close", "close"))
result = cti(close=close, length=length, offset=offset, **kwargs)
Expand Down
2 changes: 1 addition & 1 deletion pandas_ta/maps.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
# Momentum
"momentum": [
"ao", "apo", "bias", "bop", "brar", "cci", "cfo", "cg", "cmo",
"coppock", "cti", "er", "eri", "fisher", "inertia", "kdj", "kst",
"coppock", "crsi", "cti", "er", "eri", "fisher", "inertia", "kdj", "kst",
"macd", "mom", "pgo", "ppo", "psl", "qqe", "roc", "rsi", "rsx",
"rvgi", "slope", "smi", "squeeze", "squeeze_pro", "stc", "stoch",
"stochf", "stochrsi", "td_seq", "tmo", "trix", "tsi", "uo", "willr"
Expand Down
2 changes: 2 additions & 0 deletions pandas_ta/momentum/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from .cg import cg
from .cmo import cmo
from .coppock import coppock
from .crsi import crsi
from .cti import cti
from .dm import dm
from .er import er
Expand Down Expand Up @@ -53,6 +54,7 @@
"cg",
"cmo",
"coppock",
"crsi",
"cti",
"dm",
"er",
Expand Down
218 changes: 218 additions & 0 deletions pandas_ta/momentum/crsi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
# -*- coding: utf-8 -*-
import numpy as np
from pandas import Series

from pandas_ta._typing import Array1d, DictLike, Int, IntFloat
from pandas_ta.maps import Imports
from pandas_ta.momentum.rsi import rsi
from pandas_ta.utils import (
v_drift,
v_offset,
v_pos_default,
v_scalar,
v_series,
v_talib,
)


def calculate_streak_conv(prices: Array1d) -> Array1d:
"""Calculate the streak of consecutive price increases or decreases.
This function computes the streak of consecutive daily price increases
or decreases. A positive streak indicates consecutive days of price
increases, while a negative streak indicates consecutive days of price
decreases. The streak is reset to zero when the direction of the price
change reverses.
Parameters:
prices (np.array): An array of prices.
Returns:
np.array: An array representing the streak of price changes.
The function first calculates the difference between consecutive prices.
It then assigns a +1 for each positive change, -1 for each negative change,
and 0 for no change. The result is an array where each element represents
the streak value for that day.
Example:
>>> prices = np.array([100, 101, 102, 100, 100, 101, 102, 103])
>>> calculate_streak_conv(prices)
array([ 0, 1, 1, -1, 0, 1, 1, 1])
"""
diff = np.diff(prices)
streaks = np.sign(diff)
return np.concatenate(([0], streaks))


def calculate_percent_rank(close: Series, lookback: Int) -> Series:
"""Calculate the Percent Rank of daily returns over given period.
The Percent Rank is computed by comparing the daily returns of a financial
instrument to its previous returns within a given lookback window. It
measures the percentage of values in the lookback period that are less than
the current value.
Args:
close (pd.Series): Series of 'close's
lookback (int): The lookback period for calculating the Percent Rank.
Returns:
pd.Series: A Pandas Series containing the Percent Rank values.
The function first calculates the daily returns of the 'close' prices. It
then creates a rolling window of these returns and compares each value in
the window to the current value (the last value in each window). The
Percent Rank is calculated as the percentage of values in each window that
are less than the current value.
The result is a Series where the initial part (up to 'lookback - 1') is
padded with NaNs, and the rest contains the Percent Rank values.
Example:
>>> close = pd.Series([100, 80, 75, 123, 140, 80, 70, 40, 100, 120])
>>> calculate_percent_rank(close, 3)
0 NaN
1 NaN
2 50.0
3 100.0
4 50.0
5 0.0
6 50.0
7 0.0
8 100.0
9 50.0
dtype: float64
"""

daily_returns_np = (close.pct_change()).to_numpy()

# Iterate over daily returns with fixed window size for given loopback
# period, then compare the previous periods against the last (current)
# period in each window row
rolling_windows = np.lib.stride_tricks.sliding_window_view(
daily_returns_np, window_shape=lookback
)

comparison_matrix = rolling_windows[:, :-1] < rolling_windows[:, -1:]
percent_ranks = np.mean(comparison_matrix, axis=1) * 100

padded_percent_ranks = np.full(len(close), np.nan)
padded_percent_ranks[lookback - 1 :] = percent_ranks

return Series(padded_percent_ranks, index=close.index)


def crsi(
close: Series,
length_rsi: Int = None,
length_streak: Int = None,
length_rank: Int = None,
scalar: IntFloat = None,
talib: bool = None,
drift: Int = None,
offset: Int = None,
**kwargs: DictLike,
) -> Series:
"""Connors Relative Strength Index (RSI)
Connors RSI (CRSI) integrates Relative Strength Index (RSI), UpDown Length,
and Rate of Change (ROC) of RSI components to evaluate overbought and
oversold conditions in financial markets, providing insights into price
momentum and potential reversals.
Sources:
Connors, L., Alvarez, C., & Radtke, M. (2012). An Introduction to
ConnorsRSI. Connors Research Trading Strategy Series.
ISBN 978-0-9853072-9-5.
Retrieved from https://alvarezquanttrading.com/blog/connorsrsi-analysis/
https://www.tradingview.com/support/solutions/43000502017-connors-rsi-crsi/
Args:
close (pd.Series): Series of 'close's
length_rsi (int): It's period. Default: 3
length_streak (int): It's period. Default: 2
length_rank (int): It's period. Default: 100
scalar (float): How much to magnify. Default: 100
talib (bool): Use TAlib for RSI if available. Default: True
drift (int): The difference period. Default: 1
offset (int): How many periods to offset the result. Default: 0
Kwargs:
fillna (value, optional): pd.DataFrame.fillna(value)
fill_method (value, optional): Type of fill method
Returns:
pd.Series: New feature generated
"""

# Validate
length_rsi = v_pos_default(length_rsi, 3)
length_streak = v_pos_default(length_streak, 2)
length_rank = v_pos_default(length_rank, 100)
_length = max(length_rsi, length_streak, length_rank)
close = v_series(close, _length)

if "length" in kwargs:
kwargs.pop("length")

if close is None:
return None

scalar = v_scalar(scalar, 100)
mode_tal = v_talib(talib)
drift = v_drift(drift)
offset = v_offset(offset)

# Initial streaks calculation
streak = Series(calculate_streak_conv(close.to_numpy()), index=close.index)

if Imports["talib"] and mode_tal:
from talib import RSI

close_rsi = RSI(close, length_rsi)
streak_rsi = RSI(streak, length_streak)

# Both TA-lib and Pandas-TA use the Wilder's RSI and its smoothing function.
else:
close_rsi = rsi(
close,
length=length_rsi,
scalar=scalar,
talib=talib,
drift=drift,
offset=offset,
**kwargs,
)

streak_rsi = rsi(
streak,
length=length_streak,
scalar=scalar,
talib=talib,
drift=drift,
offset=offset,
**kwargs,
)

# Percent rank and final arithmetic mean
percent_rank = calculate_percent_rank(close, length_rank)
crsi = (close_rsi + streak_rsi + percent_rank) / 3.0

# Offset
if offset != 0:
crsi = crsi.shift(offset)

# Fill
if "fillna" in kwargs:
crsi.fillna(kwargs["fillna"], inplace=True)
if "fill_method" in kwargs:
crsi.fillna(method=kwargs["fill_method"], inplace=True)

# Name and Category
crsi.name = f"CRSI_{length_rsi}_{length_streak}_{length_rank}"
crsi.category = "momentum"

return crsi
16 changes: 16 additions & 0 deletions tests/test_indicator_momentum.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,12 @@ def test_cti(df):
assert result.name == "CTI_12"


def test_crsi(df):
result = ta.crsi(df.close)
assert isinstance(result, Series)
assert result.name == "CRSI_3_2_100"


def test_er(df):
result = ta.er(df.close)
assert isinstance(result, Series)
Expand Down Expand Up @@ -637,11 +643,21 @@ def test_ext_coppock(df):
assert df.columns[-1] == "COPC_11_14_10"


def test_ext_crsi(df):
df.ta.crsi(append=True)
assert df.columns[-1] == "CRSI_3_2_100"


def test_ext_cti(df):
df.ta.cti(append=True)
assert df.columns[-1] == "CTI_12"


def test_ext_crsi(df):
df.ta.crsi(append=True)
assert df.columns[-1] == "CRSI_3_2_100"


def test_ext_er(df):
df.ta.er(append=True)
assert df.columns[-1] == "ER_10"
Expand Down
6 changes: 3 additions & 3 deletions tests/test_studies.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
[pytest.param(ta.CommonStudy, id="common"), pytest.param(ta.AllStudy, id="all")]

# +/- when adding/removing indicators
ALL_COLUMNS = 321
ALL_COLUMNS = 322


def test_all_study_props(all_study):
Expand All @@ -31,7 +31,7 @@ def test_common_study_props(common_study):


@pytest.mark.parametrize("category,columns", [
("candles", 70), ("cycles", 2), ("momentum", 77), ("overlap", 56),
("candles", 70), ("cycles", 2), ("momentum", 78), ("overlap", 56),
("performance", 2), ("statistics", 16), ("transform", 5), ("trend", 29),
("volatility", 36), ("volume", 28),
pytest.param(ta.AllStudy, ALL_COLUMNS, id=f"all-{ALL_COLUMNS}"),
Expand Down Expand Up @@ -89,7 +89,7 @@ def test_study_custom_e(df, custom_study_e, talib):

@pytest.mark.parametrize("talib", [False, True])
def test_study_all_multirun(df, all_study, talib):
all_columns = 608 # +/- when adding/removing indicators
all_columns = 609 # +/- when adding/removing indicators
initial_columns = df.shape[1]
df.ta.study(all_study, length=10, cores=0, talib=talib)
df.ta.study(all_study, length=50, cores=0, talib=talib)
Expand Down

0 comments on commit ced358c

Please sign in to comment.