How to calculate the Relative Strength Index (RSI) through record iterations in pandas dataframe
I have created a pandas dataframe as follows:

    import pandas as pd
    import numpy as np

    ds = {
        "trend": [1, 1, 1, 1, 2, 2, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4],
        "price": [23, 43, 56, 21, 43, 55, 54, 32, 9, 12, 11, 12, 23, 3, 2, 1, 1],
    }
    df = pd.DataFrame(data=ds)

The dataframe looks as follows:

    display(df)

        trend  price
    0       1     23
    1       1     43
    2       1     56
    3       1     21
    4       2     43
    5       2     55
    6       3     54
    7       3     32
    8       3      9
    9       3     12
    10      3     11
    11      3     12
    12      4     23
    13      4      3
    14      4      2
    15      4      1
    16      4      1

I have saved the dataframe to a .csv file called df.csv:

    df.to_csv("df.csv", index=False)

I have then created a function that calculates the Relative Strength Index (RSI - see: https://www.investopedia.com/terms/r/rsi.asp):

    def get_RSI(df, column, time_window):
        """Return the RSI indicator for the specified time window."""
        diff = df[column].diff(1)

        # This preserves the dimensions of the diff values.
        up_chg = 0 * diff
        down_chg = 0 * diff

        # Up change is equal to the positive difference, otherwise equal to zero.
        up_chg[diff > 0] = diff[diff > 0]

        # Down change is equal to the negative difference, otherwise equal to zero.
        down_chg[diff < 0] = diff[diff < 0]

        # We set com = time_window - 1 so we get decay alpha = 1 / time_window.
        up_chg_avg = up_chg.ewm(com=time_window - 1, min_periods=time_window).mean()
        down_chg_avg = down_chg.ewm(com=time_window - 1, min_periods=time_window).mean()

        RS = abs(up_chg_avg / down_chg_avg)
        df["RSI"] = 100 - 100 / (1 + RS)
        df = df[["RSI"]]
        return df

I need to create a new field called RSI which:

- iterates through each and every record of the dataframe
- calculates the RSI by considering the price observed at each iteration and the last prices (RSI length is 3 in this example) observed in the previous trends.

For example:

- I iterate at record 0 and the RSI is NaN (missing).
- I iterate at record 1 and the RSI is still NaN (missing).
- I iterate at record 12 and the RSI is 47.667343 (it considers price at record 3, price at record 5 and price at record 12).
- I iterate at record 13 and the RSI is 28.631579 (it considers price at record 3, price at record 5 and price at record 13).
- I iterate at record 15 and the RSI is 27.586207 (it considers price at record 3, price at record 5 and price at record 15).
- and so on.

I have then written this code:

    rsi = []

    for i in range(len(df)):
        # Re-read the first i + 1 rows from disk on every iteration.
        ds = pd.read_csv("df.csv", nrows=i + 1)
        print(ds.info())
        # Keep the last price seen so far in each trend.
        d = ds.groupby(["trend"], as_index=False).agg({"price": "last"})
        get_RSI(d, "price", 3)
        rsi.append(d["RSI"].iloc[-1])

    df["RSI"] = rsi

The dataset looks correct:

    display(df)

        trend  price        RSI
    0       1     23        NaN
    1       1     43        NaN
    2       1     56        NaN
    3       1     21        NaN
    4       2     43        NaN
    5       2     55        NaN
    6       3     54        NaN
    7       3     32        NaN
    8       3      9        NaN
    9       3     12        NaN
    10      3     11        NaN
    11      3     12        NaN
    12      4     23  47.667343
    13      4      3  28.631579
    14      4      2  28.099174
    15      4      1  27.586207
    16      4      1  27.586207

The problem is that I need to process about 4 million records, and that would take approximately 60 hours. Does anyone know how to get the same results in a quick, efficient way, please?
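To make the target concrete, here is a minimal sketch of what the loop computes at a single record, without touching the .csv file. The helper name `rsi_of_series` is an assumption introduced for illustration; it uses `Series.clip` in place of the masked assignments in `get_RSI`, which should give the same up and down changes. At record 12 the grouped "last price per trend" series is built from the prices at records 3, 5, 11 and 12.

```python
import pandas as pd


def rsi_of_series(prices: pd.Series, time_window: int) -> pd.Series:
    """RSI of a price series (hypothetical helper mirroring get_RSI)."""
    diff = prices.diff()
    up_chg = diff.clip(lower=0)    # positive moves, zero otherwise, NaN kept
    down_chg = diff.clip(upper=0)  # negative moves, zero otherwise, NaN kept
    up_avg = up_chg.ewm(com=time_window - 1, min_periods=time_window).mean()
    down_avg = down_chg.ewm(com=time_window - 1, min_periods=time_window).mean()
    rs = (up_avg / down_avg).abs()
    return 100 - 100 / (1 + rs)


df = pd.DataFrame({
    "trend": [1, 1, 1, 1, 2, 2, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4],
    "price": [23, 43, 56, 21, 43, 55, 54, 32, 9, 12, 11, 12, 23, 3, 2, 1, 1],
})

# Rows seen so far when iterating at record 12 (label-based, inclusive).
seen = df.loc[:12]
# Last price observed in each trend: records 3, 5, 11 and 12.
last_prices = seen.groupby("trend")["price"].last()
rsi_at_12 = rsi_of_series(last_prices, time_window=3).iloc[-1]

print(last_prices.tolist())
print(round(rsi_at_12, 6))
```

If the sketch matches the loop above, the printed RSI should agree with the 47.667343 shown for record 12 in the question's output.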
Answers
It wasn't completely clear from the question what the analysis is trying to achieve. If that were understood, there is a high likelihood that a more efficient method exists. Using the approach below I got the time down from 5 hours to 35 minutes on the simulated dataset, whilst still using the same methodology.

I simulated your 4m dataset as follows:

    import random

    import pandas as pd
    from tqdm import tqdm

    random.seed(1337)

    trends = [x for x in range(1000)]
    ds = {
        # sorted as I assume the trend is the time indicator.
        "trend": sorted(
            [trends[int((random.random() * 1000))] for _ in tqdm(range(int(4e6)))]
        ),
        "price": [random.random() * 100 for _ in tqdm(range(int(4e6)))],
    }
    df = pd.DataFrame(data=ds)
    df.to_csv(
        "fake_data.csv", index=True
    )  # recommend keeping index to keep track of the time series

Before iterating over the df, I created a list of index lists, where each inner list selects the slice of the original df for which you are calculating the RSI. We can then iterate over this list to retrieve each required df slice.

    import pandas as pd
    from copy import deepcopy
    from tqdm import tqdm

    df = pd.read_csv("./fake_data.csv")
    time_window = 3


    def get_trend_ranges(df: pd.DataFrame, trends: list) -> dict[int, tuple]:
        """
        trends - list of unique trend values
        returns - start and end index of each range (assume df sorted by trend)
        """
        trends_ranges = {}
        for trend in trends:
            df_trend = df[df["trend"] == trend]
            start_idx = df_trend.iloc[0].name
            end_idx = df_trend.iloc[-1].name
            trends_ranges[trend] = (start_idx, end_idx)
        return trends_ranges


    def _get_idx_slices(trends: list, trends_ranges: dict, time_window) -> list[list]:
        """
        trends - list of unique trend values
        trends_ranges - {trend: (start_index, end idx)}
        returns [[idx1, idx2, ..,idxt+1], [idx11, idx22, ..., idx2t+1],..]
        where t is time_window
        """
        if time_window >= len(trends) or time_window <= 0:
            raise Exception("Time Window not within bounds")
        else:
            unique_trend_analyses = len(trends) - time_window
            idx_lst = []
            for i in tqdm(range(unique_trend_analyses)):
                # Last row of each of the time_window preceding trends.
                fixed_t = trends[i : i + time_window]
                fixed_idx = [trends_ranges[trend][1] for trend in fixed_t]
                # Every row of the trend that follows them.
                var_t = trends[i + time_window]
                var_t_start_idx, var_t_stop_idx = (
                    trends_ranges[var_t][0],
                    trends_ranges[var_t][1] + 1,
                )
                for j in range(var_t_start_idx, var_t_stop_idx):
                    full_idx = deepcopy(fixed_idx)
                    full_idx.append(j)
                    idx_lst.append(full_idx)
            return idx_lst


    # in order to store ranges of each trend, we assume trend is sorted.
    def get_idx_slices(df, time_window):
        trends = df["trend"].unique().tolist()
        trends_ranges = get_trend_ranges(df, trends)
        idx_slices = _get_idx_slices(trends, trends_ranges, time_window)
        return idx_slices

Now we can iterate over the idx list to use the df slices that we require.

    def get_RSI(df, time_window):
        """Return the RSI indicator for the specified time window."""
        # df here is the Series of price differences for one slice.
        up_chg = df.apply(lambda x: x if x > 0 else 0)
        down_chg = df.apply(lambda x: x if x < 0 else 0)

        # We set com = time_window - 1 so we get decay alpha = 1 / time_window.
        up_chg_avg = up_chg.ewm(com=time_window - 1, min_periods=time_window).mean()
        down_chg_avg = down_chg.ewm(com=time_window - 1, min_periods=time_window).mean()

        RS = abs(up_chg_avg / down_chg_avg)
        RSI = 100 - 100 / (1 + RS)
        return RSI.iloc[-1]


    # Start from NaN so the column stays numeric.
    df["RSI"] = float("nan")
    idx_lst = get_idx_slices(df, time_window)
    for idx in tqdm(idx_lst):
        df_slice = df.iloc[idx]
        df_slice_diff = df_slice["price"].diff()
        RSI = get_RSI(df_slice_diff, time_window)
        last_idx = idx[len(idx) - 1]
        df.loc[last_idx, "RSI"] = RSI
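To see what the index lists in this answer look like, here is a small sketch on the 17-row dataframe from the question rather than the simulated data. It is not the answer's code: it takes a shortcut through `groupby(...).head(1)` and `tail(1)` to find the first and last row of each trend, and the names `first_idx`, `last_idx` and `slices` are assumptions made for illustration. Like the answer, it assumes the rows are sorted by trend and carry the default 0..n-1 index.

```python
import pandas as pd

df = pd.DataFrame({
    "trend": [1, 1, 1, 1, 2, 2, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4],
    "price": [23, 43, 56, 21, 43, 55, 54, 32, 9, 12, 11, 12, 23, 3, 2, 1, 1],
})
time_window = 3

# First and last row position of each trend, in order of appearance.
first_idx = df.groupby("trend").head(1).index.tolist()
last_idx = df.groupby("trend").tail(1).index.tolist()

slices = []
for i in range(len(last_idx) - time_window):
    # Last rows of the time_window trends that precede the current one.
    fixed = last_idx[i : i + time_window]
    # Every row of the current trend is appended to that fixed prefix.
    start, stop = first_idx[i + time_window], last_idx[i + time_window]
    for j in range(start, stop + 1):
        slices.append(fixed + [j])

for s in slices:
    print(s, df["price"].iloc[s].tolist())
```

Each printed list holds the row positions of one slice, followed by the prices that would be handed to the RSI calculation for the final row in that slice.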
Instead of reading the file on each row in a loop, it is better to do this:

    ds = df.loc[:i]

Now what I did:

- The ind column was created to pass the row indexes to map.
- An empty array arr is generated, which is filled row by row in func_numpy.
- The rest of your operations are converted to vectorized ones, so they are no longer processed row by row and are executed in one go.

The hardest part here is the grouping inside map on each row. I couldn't solve the problem of getting the last group values at each row without accessing the dataframe row by row.

In my tests this was 5-10 times faster with a dataframe of up to 100,000 rows (if you also count reading the file on each row, you can add 5 to this figure). I also checked that the calculated RSI values coincide with your algorithm.

If you use a simple numpy average instead of ewm (averaging over the last values instead of a sliding window), it will be a little faster, but not by much, and the values will be different.

    import numpy as np
    import pandas as pd

    # dataframe generation
    n = 15
    time_window = 3  # RSI length, as in the question
    trend = np.random.randint(low=1, high=7, size=(n))
    price = np.random.uniform(low=1, high=100, size=(n))
    df = pd.DataFrame({"trend": trend, "price": price})

    df["ind"] = df.index
    size_trend = df["trend"].unique().size
    # One row per record, one column per trend; unused leading columns stay NaN.
    arr = np.full((len(df), size_trend), np.nan)


    def func_numpy(x):
        # Last price of every trend seen up to and including row x.
        row_last = df.loc[:x].groupby("trend")["price"].last().values
        arr[x, -row_last.size:] = row_last


    df["ind"].map(func_numpy)

    diff = np.diff(arr, axis=1)

    up_chg = 0 * diff
    down_chg = 0 * diff
    up_chg[diff > 0] = diff[diff > 0]
    down_chg[diff < 0] = diff[diff < 0]

    up_chg_avg = pd.DataFrame(up_chg).T.ewm(
        com=time_window - 1, min_periods=time_window).mean().iloc[-1].values
    down_chg_avg = pd.DataFrame(down_chg).T.ewm(
        com=time_window - 1, min_periods=time_window).mean().iloc[-1].values

    RS = np.abs(up_chg_avg / down_chg_avg)
    df["newRSI"] = 100 - 100 / (1 + RS)
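One possible way to drop the per-row grouping entirely is sketched below. It is not taken from either answer and rests on two assumptions: the rows are sorted so that each trend forms one contiguous block, and the RSI is built with `ewm` at its default settings (`adjust=True`), as in the question. Under those defaults the up and down averages share the same weight normalisation, so it cancels in their ratio and only two running weighted sums per trend are needed. The function name `rsi_by_trend` and all internal names are hypothetical.

```python
import numpy as np
import pandas as pd


def rsi_by_trend(df: pd.DataFrame, time_window: int = 3) -> pd.Series:
    """Per-row RSI over 'last price of each earlier trend + current price'."""
    decay = 1.0 - 1.0 / time_window  # ewm(com=time_window - 1) decay factor

    # Last price of every trend, in order of appearance.
    last = df.groupby("trend", sort=False)["price"].last().to_numpy(dtype=float)
    d = np.diff(last, prepend=last[0])  # d[0] == 0, d[k] = last[k] - last[k-1]

    # Running exponentially weighted sums of up and down moves between trends.
    up_sum = np.zeros(len(last))
    down_sum = np.zeros(len(last))
    for k in range(1, len(last)):  # loops over trends, not over rows
        up_sum[k] = decay * up_sum[k - 1] + max(d[k], 0.0)
        down_sum[k] = decay * down_sum[k - 1] + max(-d[k], 0.0)

    # Ordinal (0-based) of the trend each row belongs to.
    k_row = pd.factorize(df["trend"])[0]
    prev = np.maximum(k_row - 1, 0)

    # Final move: current price versus the last price of the previous trend.
    cur = df["price"].to_numpy(dtype=float) - last[prev]
    up = decay * up_sum[prev] + np.clip(cur, 0.0, None)
    down = decay * down_sum[prev] + np.clip(-cur, 0.0, None)

    with np.errstate(divide="ignore", invalid="ignore"):
        rsi = 100 - 100 / (1 + up / down)

    # Fewer than time_window differences available: undefined, as min_periods does.
    rsi[k_row < time_window] = np.nan
    return pd.Series(rsi, index=df.index, name="RSI")


df = pd.DataFrame({
    "trend": [1, 1, 1, 1, 2, 2, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4],
    "price": [23, 43, 56, 21, 43, 55, 54, 32, 9, 12, 11, 12, 23, 3, 2, 1, 1],
})
df["RSI"] = rsi_by_trend(df, time_window=3)
print(df)
```

Like the loop in the question, this feeds every earlier trend into the exponentially weighted sums rather than only the previous three, so it is worth comparing against the original loop on a sample of the real data before relying on it. The only Python-level loop runs once per trend, so the cost per row is a handful of array operations.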



