How to calculate the Relative Strength Index (RSI) through record iterations in a pandas dataframe
I have created a pandas dataframe as follows:
import pandas as pd
import numpy as np
ds = {"trend": [1,1,1,1,2,2,3,3,3,3,3,3,4,4,4,4,4], "price": [23,43,56,21,43,55,54,32,9,12,11,12,23,3,2,1,1]}
df = pd.DataFrame(data=ds)
The dataframe looks as follows:
display(df)
trend price
0 1 23
1 1 43
2 1 56
3 1 21
4 2 43
5 2 55
6 3 54
7 3 32
8 3 9
9 3 12
10 3 11
11 3 12
12 4 23
13 4 3
14 4 2
15 4 1
16 4 1
I have saved the dataframe to a .csv file called df.csv:
df.to_csv("df.csv", index = False)
I have then created a function that calculates the Relative Strength Index (RSI - see: https://www.investopedia.com/terms/r/rsi.asp):
def get_RSI(df, column, time_window):
    """Return the RSI indicator for the specified time window."""
    diff = df[column].diff(1)
    # This preserves the dimensions of the diff values.
    up_chg = 0 * diff
    down_chg = 0 * diff
    # Up change is equal to the positive difference, otherwise zero.
    up_chg[diff > 0] = diff[diff > 0]
    # Down change is equal to the negative difference, otherwise zero.
    down_chg[diff < 0] = diff[diff < 0]
    # We set com=time_window-1 so we get decay alpha=1/time_window.
    up_chg_avg = up_chg.ewm(com=time_window - 1,
                            min_periods=time_window).mean()
    down_chg_avg = down_chg.ewm(com=time_window - 1,
                                min_periods=time_window).mean()
    RS = abs(up_chg_avg / down_chg_avg)
    df["RSI"] = 100 - 100 / (1 + RS)
    df = df[["RSI"]]
    return df
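As a quick sanity check, the function can be run directly on the sample dataframe (a minimal sketch; note that get_RSI mutates the dataframe it receives, so a copy is passed here):
# RSI of the raw price column; the first rows are NaN because of
# min_periods=time_window in the ewm calls.
print(get_RSI(df.copy(), "price", 3))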
I need to create a new field called RSI which:
iterates through each and every record of the dataframe, and
calculates the RSI by considering the price observed at each iteration and the last prices (the RSI length is 3 in this example) observed in the previous trends.
For example:
At record 0 the RSI is NaN (missing).
At record 1 the RSI is still NaN (missing).
At record 12 the RSI is 47.667343 (it considers the price at record 3, the price at record 5 and the price at record 12).
At record 13 the RSI is 28.631579 (it considers the price at record 3, the price at record 5 and the price at record 13).
At record 15 the RSI is 27.586207 (it considers the price at record 3, the price at record 5 and the price at record 15).
and so on ...
I have then written this code:
rsi = []
for i in range(len(df)):
    ds = pd.read_csv("df.csv", nrows=i+1)
    print(ds.info())
    d = ds.groupby(["trend"], as_index=False).agg({"price": "last"})
    get_RSI(d, "price", 3)
    rsi.append(d["RSI"].iloc[-1])

df["RSI"] = rsi
The dataset looks correct:
display(df)
trend price RSI
0 1 23 NaN
1 1 43 NaN
2 1 56 NaN
3 1 21 NaN
4 2 43 NaN
5 2 55 NaN
6 3 54 NaN
7 3 32 NaN
8 3 9 NaN
9 3 12 NaN
10 3 11 NaN
11 3 12 NaN
12 4 23 47.667343
13 4 3 28.631579
14 4 2 28.099174
15 4 1 27.586207
16 4 1 27.586207
The problem is that I need to process about 4 million records and it would take approximately 60 hours.
Does anyone know how to get the same results in a quick, efficient way, please?
Answers
It wasn't completely clear in the question what the analysis is trying to achieve; there is a high likelihood that a more efficient method exists if this were understood.
Using the approach below I managed to get the time down from 5 hours to 35 minutes on the simulated dataset, whilst still using the same methodology.
I simulated your 4m dataset as follows:
import random

import pandas as pd
from tqdm import tqdm

random.seed(1337)
trends = [x for x in range(1000)]
ds = {
    # Sorted, as I assume the trend is the time indicator.
    "trend": sorted(
        [trends[int(random.random() * 1000)] for _ in tqdm(range(int(4e6)))]
    ),
    "price": [random.random() * 100 for _ in tqdm(range(int(4e6)))],
}
df = pd.DataFrame(data=ds)
df.to_csv(
    "fake_data.csv", index=True
)  # recommend keeping the index to keep track of the time series
Before iterating over the df, I created a list of index combinations (a list of lists), where each inner list holds the row indexes of the slice of the original df over which one RSI value is calculated. We can then iterate over this list to retrieve each required df slice.
import pandas as pd
from copy import deepcopy
from tqdm import tqdm

df = pd.read_csv("./fake_data.csv")
time_window = 3

def get_trend_ranges(df: pd.DataFrame, trends: list) -> dict[int, tuple]:
    """
    trends - list of unique trend values
    returns - start and end index of each trend (assumes df is sorted by trend)
    """
    trends_ranges = {}
    for trend in trends:
        df_trend = df[df["trend"] == trend]
        start_idx = df_trend.iloc[0].name
        end_idx = df_trend.iloc[-1].name
        trends_ranges[trend] = (start_idx, end_idx)
    return trends_ranges

def _get_idx_slices(trends: list, trends_ranges: dict, time_window) -> list[list]:
    """
    trends - list of unique trend values
    trends_ranges - {trend: (start_idx, end_idx)}
    returns [[idx1, idx2, ..., idxt+1], [idx11, idx22, ..., idx2t+1], ...]
    where t is time_window
    """
    if time_window >= len(trends) or time_window <= 0:
        raise Exception("Time window not within bounds")
    else:
        unique_trend_analyses = len(trends) - time_window
        idx_lst = []
        for i in tqdm(range(unique_trend_analyses)):
            # The last row of each of the previous time_window trends...
            fixed_t = trends[i : i + time_window]
            fixed_idx = [trends_ranges[trend][1] for trend in fixed_t]
            # ...combined with every row of the current trend in turn.
            var_t = trends[i + time_window]
            var_t_start_idx, var_t_stop_idx = (
                trends_ranges[var_t][0],
                trends_ranges[var_t][1] + 1,
            )
            for j in range(var_t_start_idx, var_t_stop_idx):
                full_idx = deepcopy(fixed_idx)
                full_idx.append(j)
                idx_lst.append(full_idx)
        return idx_lst

# In order to store the ranges of each trend, we assume the trend is sorted.
def get_idx_slices(df, time_window):
    trends = df["trend"].unique().tolist()
    trends_ranges = get_trend_ranges(df, trends)
    idx_slices = _get_idx_slices(trends, trends_ranges, time_window)
    return idx_slices
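To illustrate the structure, running this on the small 17-row dataframe from the question (rather than the simulated data) with time_window = 3 yields one slice per row of the newest trend, each holding the last-row index of the three previous trends plus one row of the current trend:
idx_slices = get_idx_slices(df, time_window=3)
print(idx_slices)
# [[3, 5, 11, 12], [3, 5, 11, 13], [3, 5, 11, 14], [3, 5, 11, 15], [3, 5, 11, 16]]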
Now we can iterate over the idx list to use the df slices that we require.
def get_RSI(df, time_window):
    """Return the RSI indicator for the specified time window.
    Here `df` is the diffed price series of a single slice."""
    up_chg = df.apply(lambda x: x if x > 0 else 0)
    down_chg = df.apply(lambda x: x if x < 0 else 0)
    # We set com=time_window-1 so we get decay alpha=1/time_window.
    up_chg_avg = up_chg.ewm(com=time_window - 1, min_periods=time_window).mean()
    down_chg_avg = down_chg.ewm(com=time_window - 1, min_periods=time_window).mean()
    RS = abs(up_chg_avg / down_chg_avg)
    RSI = 100 - 100 / (1 + RS)
    return RSI.iloc[-1]

df["RSI"] = ""
idx_lst = get_idx_slices(df, time_window)
for idx in tqdm(idx_lst):
    df_slice = df.iloc[idx]
    df_slice_diff = df_slice["price"].diff()
    RSI = get_RSI(df_slice_diff, time_window)
    last_idx = idx[len(idx) - 1]
    df.loc[last_idx, "RSI"] = RSI
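One small caveat with the loop above: because df["RSI"] is initialised with empty strings, the column ends up with object dtype. A final conversion keeps it numeric (a small addition on top of the code above):
# Coerce the mixed ""/float column to numeric; rows that never
# received an RSI value become NaN.
df["RSI"] = pd.to_numeric(df["RSI"], errors="coerce")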
Instead of re-reading the file on each iteration of the loop, it is better to slice the dataframe that is already in memory:
ds = df.loc[:i]
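Applied to the loop from the question, that change looks like this (a minimal sketch; get_RSI is the question's function and df is already in memory):
rsi = []
for i in range(len(df)):
    ds = df.loc[:i]  # slice the in-memory dataframe instead of re-reading the csv
    d = ds.groupby(["trend"], as_index=False).agg({"price": "last"})
    get_RSI(d, "price", 3)
    rsi.append(d["RSI"].iloc[-1])
df["RSI"] = rsi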
Now what I did: an ind column was created to feed the row indexes into map. An empty array arr is generated, which is filled row by row in func_numpy. The rest of your operations are converted to vectorised ones, so they are no longer processed row by row but executed in one go.
The hardest thing here is the grouping in map on each row: I couldn't solve the problem of getting the last value of each group at every row without accessing the dataframe row by row. Tested 5-10 times faster with a dataframe of up to 100,000 rows (if you also count the original's re-reading of the file on each iteration, you can add another factor of 5 to this figure). I also checked that the calculated RSI values coincide with your algorithm's. If you use a simple numpy average instead of ewm (computing from the last values instead of an exponentially weighted window), it will be a little faster, but not by much, and the values will be different.
# Dataframe generation
n = 15
time_window = 3  # RSI length, as in the question
trend = np.random.randint(low=1, high=7, size=(n))
price = np.random.uniform(low=1, high=100, size=(n))
df = pd.DataFrame({"trend": trend, "price": price})

# The ind column receives the row indexes for map.
df["ind"] = df.index
size_trend = df["trend"].unique().size
arr = np.full((len(df), size_trend), np.nan)

def func_numpy(x):
    # Last price of each trend seen up to row x, right-aligned into arr.
    row_last = df.loc[:x].groupby("trend")["price"].last().values
    arr[x, -row_last.size:] = row_last

df["ind"].map(func_numpy)

# Vectorised RSI over the per-row "last price of each trend" matrix.
diff = np.diff(arr, axis=1)
up_chg = 0 * diff
down_chg = 0 * diff
up_chg[diff > 0] = diff[diff > 0]
down_chg[diff < 0] = diff[diff < 0]
up_chg_avg = pd.DataFrame(up_chg).T.ewm(
    com=time_window - 1, min_periods=time_window).mean().iloc[-1].values
down_chg_avg = pd.DataFrame(down_chg).T.ewm(
    com=time_window - 1, min_periods=time_window).mean().iloc[-1].values
RS = np.abs(up_chg_avg / down_chg_avg)
df["newRSI"] = 100 - 100 / (1 + RS)
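To verify that the vectorised newRSI column coincides with the original row-by-row algorithm (as claimed above), a comparison along these lines can be run; it assumes the get_RSI(df, column, time_window) function from the question is in scope:
# Recompute RSI with the original per-row approach and compare.
rsi_loop = []
for i in range(len(df)):
    d = df.loc[:i].groupby(["trend"], as_index=False).agg({"price": "last"})
    get_RSI(d, "price", time_window)
    rsi_loop.append(d["RSI"].iloc[-1])
# Warm-up NaNs are compared as equal; the rest should match closely.
print(np.allclose(df["newRSI"], rsi_loop, equal_nan=True))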