Скользящая средняя или работает значит
есть ли функция scipy или функция numpy или модуль для python, который вычисляет текущее среднее значение массива 1D с учетом конкретного окна?
22 ответа:
для короткого, быстрого решения, которое делает все это в одном цикле, без зависимостей, приведенный ниже код отлично работает.
mylist = [1, 2, 3, 4, 5, 6, 7] N = 3 cumsum, moving_aves = [0], [] for i, x in enumerate(mylist, 1): cumsum.append(cumsum[i-1] + x) if i>=N: moving_ave = (cumsum[i] - cumsum[i-N])/N #can do stuff with moving_ave here moving_aves.append(moving_ave)
UPD: более эффективные решения были предложены Alleo и jasaarim.
можно использовать
np.convolve
для этого:np.convolve(x, np.ones((N,))/N, mode='valid')
объяснение
текущее среднее-это случай математической операции свертка. Для текущего среднего значения вы перемещаете окно по входу и вычисляете среднее значение содержимого окна. Для дискретных 1D сигналов, свертка-это то же самое, за исключением того, что вместо среднего вы вычисляете произвольную линейную комбинацию, т. е. умножаете каждый элемент на соответствующий коэффициент и складываете результаты. Эти коэффициенты, по одному для каждой позиции в окне, иногда называют сверткой ядро. Теперь, среднее арифметическое N значений
(x_1 + x_2 + ... + x_N) / N
, Так что соответствующее ядро(1/N, 1/N, ..., 1/N)
, а это именно то, что мы получаем с помощьюnp.ones((N,))/N
.края
в
эффективным решением
свертка намного лучше, чем простой подход, но (я думаю) он использует БПФ и, следовательно, довольно медленно. Однако специально для вычисления текущего значения следующий подход отлично работает
def running_mean(x, N): cumsum = numpy.cumsum(numpy.insert(x, 0, 0)) return (cumsum[N:] - cumsum[:-N]) / float(N)
код для проверки
In[3]: x = numpy.random.random(100000) In[4]: N = 1000 In[5]: %timeit result1 = numpy.convolve(x, numpy.ones((N,))/N, mode='valid') 10 loops, best of 3: 41.4 ms per loop In[6]: %timeit result2 = running_mean(x, N) 1000 loops, best of 3: 1.04 ms per loop
отметим, что
numpy.allclose(result1, result2)
иTrue
два метода эквивалентны. Чем больше N, тем больше разница во времени.
панды более подходит для этого, чем NumPy или SciPy. Его функция rolling_mean делает работу удобнее. Он также возвращает массив NumPy, когда вход является массивом.
трудно победить
rolling_mean
в производительности с любой пользовательской реализацией Pure Python. Вот пример производительности против двух предложенных решений:In [1]: import numpy as np In [2]: import pandas as pd In [3]: def running_mean(x, N): ...: cumsum = np.cumsum(np.insert(x, 0, 0)) ...: return (cumsum[N:] - cumsum[:-N]) / N ...: In [4]: x = np.random.random(100000) In [5]: N = 1000 In [6]: %timeit np.convolve(x, np.ones((N,))/N, mode='valid') 10 loops, best of 3: 172 ms per loop In [7]: %timeit running_mean(x, N) 100 loops, best of 3: 6.72 ms per loop In [8]: %timeit pd.rolling_mean(x, N)[N-1:] 100 loops, best of 3: 4.74 ms per loop In [9]: np.allclose(pd.rolling_mean(x, N)[N-1:], running_mean(x, N)) Out[9]: True
есть также хорошие варианты, как иметь дело с краевыми значениями.
вы можете вычислить текущее среднее значение с помощью:
import numpy as np def runningMean(x, N): y = np.zeros((len(x),)) for ctr in range(len(x)): y[ctr] = np.sum(x[ctr:(ctr+N)]) return y/N
но это медленно.
К счастью, numpy включает в себя свертка функция, которую мы можем использовать, чтобы ускорить процесс. Текущее среднее значение эквивалентно свертке
x
с вектором, которыйN
долго, со всеми членами равна1/N
. Реализация numpy свертки включает в себя начальный переходный процесс, поэтому вам нужно удалить первые N-1 точки:def runningMeanFast(x, N): return np.convolve(x, np.ones((N,))/N)[(N-1):]
на моей машине, быстрая версия в 20-30 раз быстрее, в зависимости от длины входного вектора и размер окна усреднения.
обратите внимание, что свертка включает
'same'
режим, который, похоже, должен решать начальную переходную проблему, но он разделяет ее между началом и концом.
или модуль для python, который вычисляет
в моих тестах на Tradewave.net та-Либ всегда побеждает:
import talib as ta import numpy as np import pandas as pd import scipy from scipy import signal import time as t PAIR = info.primary_pair PERIOD = 30 def initialize(): storage.reset() storage.elapsed = storage.get('elapsed', [0,0,0,0,0,0]) def cumsum_sma(array, period): ret = np.cumsum(array, dtype=float) ret[period:] = ret[period:] - ret[:-period] return ret[period - 1:] / period def pandas_sma(array, period): return pd.rolling_mean(array, period) def api_sma(array, period): # this method is native to Tradewave and does NOT return an array return (data[PAIR].ma(PERIOD)) def talib_sma(array, period): return ta.MA(array, period) def convolve_sma(array, period): return np.convolve(array, np.ones((period,))/period, mode='valid') def fftconvolve_sma(array, period): return scipy.signal.fftconvolve( array, np.ones((period,))/period, mode='valid') def tick(): close = data[PAIR].warmup_period('close') t1 = t.time() sma_api = api_sma(close, PERIOD) t2 = t.time() sma_cumsum = cumsum_sma(close, PERIOD) t3 = t.time() sma_pandas = pandas_sma(close, PERIOD) t4 = t.time() sma_talib = talib_sma(close, PERIOD) t5 = t.time() sma_convolve = convolve_sma(close, PERIOD) t6 = t.time() sma_fftconvolve = fftconvolve_sma(close, PERIOD) t7 = t.time() storage.elapsed[-1] = storage.elapsed[-1] + t2-t1 storage.elapsed[-2] = storage.elapsed[-2] + t3-t2 storage.elapsed[-3] = storage.elapsed[-3] + t4-t3 storage.elapsed[-4] = storage.elapsed[-4] + t5-t4 storage.elapsed[-5] = storage.elapsed[-5] + t6-t5 storage.elapsed[-6] = storage.elapsed[-6] + t7-t6 plot('sma_api', sma_api) plot('sma_cumsum', sma_cumsum[-5]) plot('sma_pandas', sma_pandas[-10]) plot('sma_talib', sma_talib[-15]) plot('sma_convolve', sma_convolve[-20]) plot('sma_fftconvolve', sma_fftconvolve[-25]) def stop(): log('ticks....: %s' % info.max_ticks) log('api......: %.5f' % storage.elapsed[-1]) log('cumsum...: %.5f' % storage.elapsed[-2]) log('pandas...: %.5f' % storage.elapsed[-3]) log('talib....: %.5f' % storage.elapsed[-4]) log('convolve.: %.5f' % storage.elapsed[-5]) log('fft......: %.5f' % storage.elapsed[-6])
результаты:
[2015-01-31 23:00:00] ticks....: 744 [2015-01-31 23:00:00] api......: 0.16445 [2015-01-31 23:00:00] cumsum...: 0.03189 [2015-01-31 23:00:00] pandas...: 0.03677 [2015-01-31 23:00:00] talib....: 0.00700 # <<< Winner! [2015-01-31 23:00:00] convolve.: 0.04871 [2015-01-31 23:00:00] fft......: 0.22306
готовое к использованию решение см. В разделе http://www.scipy.org/Cookbook/SignalSmooth. Он обеспечивает среднюю работу с
flat
тип окна. Обратите внимание, что это немного сложнее, чем простой метод свертки do-it-yourself, поскольку он пытается обрабатывать проблемы в начале и конце данных, отражая их (что может или не может работать в вашем случае...).для начала, вы можете попробовать:
a = np.random.random(100) plt.plot(a) b = smooth(a, window='flat') plt.plot(b)
Я знаю, это старый вопрос, но вот решение, которое не использует никаких дополнительных структур данных или библиотек. Это линейное количество элементов входного списка и я не могу придумать никакой другой способ сделать его более эффективным (на самом деле, если кто знает лучший способ выделить результат, пожалуйста, дайте мне знать).
Примечание: Это было бы намного быстрее, используя массив numpy вместо списка, но я хотел устранить все зависимости. Это также было бы возможно повышение производительности за счет многопоточного выполнения
функция предполагает, что входной список является одномерным, поэтому будьте осторожны.
### Running mean/Moving average def running_mean(l, N): sum = 0 result = list( 0 for x in l) for i in range( 0, N ): sum = sum + l[i] result[i] = sum / (i+1) for i in range( N, len(l) ): sum = sum - l[i-N] + l[i] result[i] = sum / N return result
Я еще не проверил, насколько это быстро, но вы можете попробовать:
from collections import deque cache = deque() # keep track of seen values n = 10 # window size A = xrange(100) # some dummy iterable cum_sum = 0 # initialize cumulative sum for t, val in enumerate(A, 1): cache.append(val) cum_sum += val if t < n: avg = cum_sum / float(t) else: # if window is saturated, cum_sum -= cache.popleft() # subtract oldest value avg = cum_sum / float(n)
немного поздно на вечеринку, но я сделал свою собственную маленькую функцию, которая не обертывает концы или колодки с нулями, которые затем используются для поиска среднего значения. В качестве дальнейшего лечения является то, что он также повторно отсчитывает сигнал в линейно разнесенных точках. Настройка кода по желанию, чтобы получить другие функции.
метод представляет собой простое матричное умножение с нормализованным гауссовым ядром.
def running_mean(y_in, x_in, N_out=101, sigma=1): ''' Returns running mean as a Bell-curve weighted average at evenly spaced points. Does NOT wrap signal around, or pad with zeros. Arguments: y_in -- y values, the values to be smoothed and re-sampled x_in -- x values for array Keyword arguments: N_out -- NoOf elements in resampled array. sigma -- 'Width' of Bell-curve in units of param x . ''' N_in = size(y_in) # Gaussian kernel x_out = np.linspace(np.min(x_in), np.max(x_in), N_out) x_in_mesh, x_out_mesh = np.meshgrid(x_in, x_out) gauss_kernel = np.exp(-np.square(x_in_mesh - x_out_mesh) / (2 * sigma**2)) # Normalize kernel, such that the sum is one along axis 1 normalization = np.tile(np.reshape(sum(gauss_kernel, axis=1), (N_out, 1)), (1, N_in)) gauss_kernel_normalized = gauss_kernel / normalization # Perform running average as a linear operation y_out = gauss_kernel_normalized @ y_in return y_out, x_out
простое использование на синусоидальном сигнале с добавленным нормальным распределенный шум:
если важно сохранить размеры входного сигнала (вместо ограничения выхода на
'valid'
площадь свертки), вы можете использовать scipy.ndimage.фильтры.uniform_filter1d:import numpy as np from scipy.ndimage.filters import uniform_filter1d N = 1000 x = np.random.random(100000) y = uniform_filter1d(x, size=N) y.shape == x.shape >>> True
uniform_filter1d
позволяет несколько способов обработки границы, где'reflect'
по умолчанию, но в моем случае, я скорее хотел'nearest'
.это также довольно быстро (почти в 50 раз быстрее, чем
np.convolve
):%timeit y1 = np.convolve(x, np.ones((N,))/N, mode='same') 100 loops, best of 3: 9.28 ms per loop %timeit y2 = uniform_filter1d(x, size=N) 10000 loops, best of 3: 191 µs per loop
другое подход к скользящей средней без используя numpy, panda
import itertools sample = [2, 6, 10, 8, 11, 10] list(itertools.starmap(lambda a,b: b/a, enumerate(itertools.accumulate(sample), 1)))
печать [2.0, 4.0, 6.0, 6.5, 7.4, 7.833333333333333]
этот вопрос сейчас даже старше чем когда NeXuS писал об этом в прошлом месяце, но мне нравится, как его код имеет дело с крайними случаями. Однако, так как это "простая скользящая средняя", ее результаты отстают от данных, к которым они применяются. Я думал, что работа с краевыми случаями более удовлетворительна, чем режимы NumPy
valid
,same
иfull
может быть достигнуто путем применения аналогичного подхода кconvolution()
метод, основанный.мой вклад использует центральный запуск среднее значение, чтобы выровнять его результаты с их данными. Когда для полноразмерного окна доступно слишком мало точек, текущие средние значения вычисляются из последовательно меньших окон по краям массива. [На самом деле, из последовательно больших окон, но это деталь реализации.]
import numpy as np def running_mean(l, N): # Also works for the(strictly invalid) cases when N is even. if (N//2)*2 == N: N = N - 1 front = np.zeros(N//2) back = np.zeros(N//2) for i in range(1, (N//2)*2, 2): front[i//2] = np.convolve(l[:i], np.ones((i,))/i, mode = 'valid') for i in range(1, (N//2)*2, 2): back[i//2] = np.convolve(l[-i:], np.ones((i,))/i, mode = 'valid') return np.concatenate([front, np.convolve(l, np.ones((N,))/N, mode = 'valid'), back[::-1]])
это относительно медленно, потому что он использует
convolve()
, и, вероятно, может быть приукрашено довольно много истинным Питонистом, однако, я считаю, что идея стоит.
вместо numpy или scipy, я бы рекомендовал панд сделать это более быстро:
df['data'].rolling(3).mean()
это принимает скользящую среднюю (MA) из 3 периодов столбца "данные". Вы также можете вычислить сдвинутые версии, например, тот, который исключает текущую ячейку (сдвинутую назад), можно легко вычислить как:
df['data'].shift(periods=1).rolling(3).mean()
хотя здесь есть решения для этого вопроса, пожалуйста, взгляните на мое решение. Это очень просто и хорошо работает.
import numpy as np dataset = np.asarray([1, 2, 3, 4, 5, 6, 7]) ma = list() window = 3 for t in range(0, len(dataset)): if t+window <= len(dataset): indices = range(t, t+window) ma.append(np.average(np.take(dataset, indices))) else: ma = np.asarray(ma)
есть много ответов выше о вычислении текущего среднего. Мой ответ добавляет две дополнительные функции:
- игнорирует значения nan
- вычисляет среднее значение для n соседних значений, не включая значение самого интереса
Я использую numpy.cumsum так как это наиболее эффективный способ (см. ответ Алло выше).
N=10 # number of points to test on each side of point of interest, best if even padded_x = np.insert(np.insert( np.insert(x, len(x), np.empty(int(N/2))*np.nan), 0, np.empty(int(N/2))*np.nan ),0,0) n_nan = np.cumsum(np.isnan(padded_x)) cumsum = np.nancumsum(padded_x) window_sum = cumsum[N+1:] - cumsum[:-(N+1)] - x # subtract value of interest from sum of all values within window window_n_nan = n_nan[N+1:] - n_nan[:-(N+1)] - np.isnan(x) window_n_values = (N - window_n_nan) movavg = (window_sum) / (window_n_values)
этот код работает только для четных Ns. Его можно настроить для нечетных чисел, изменив np.вставка padded_x и n_nan.
пример вывода (raw в черном, movavg в синем):
этот код может быть легко адаптирован для удаления всех скользящих средних значений, вычисленных из менее чем cutoff = 3 значений, отличных от nan.
window_n_values = (N - window_n_nan).astype(float) # dtype must be float to set some values to nan cutoff = 3 window_n_values[window_n_values<cutoff] = np.nan movavg = (window_sum) / (window_n_values)
Используйте Только Библиотеку Python Stadnard (Эффективная Память)
просто дайте другую версию использования стандартной библиотеки
deque
только. Это довольно удивительно для меня, что большинство ответов используютpandas
илиnumpy
.def moving_average(iterable, n=3): d = deque(maxlen=n) for i in iterable: d.append(i) if len(d) == n: yield sum(d)/n r = moving_average([40, 30, 50, 46, 39, 44]) assert list(r) == [40.0, 42.0, 45.0, 43.0]
Actaully я нашел другой реализация в python docs
def moving_average(iterable, n=3): # moving_average([40, 30, 50, 46, 39, 44]) --> 40.0 42.0 45.0 43.0 # http://en.wikipedia.org/wiki/Moving_average it = iter(iterable) d = deque(itertools.islice(it, n-1)) d.appendleft(0) s = sum(d) for elem in it: s += elem - d.popleft() d.append(elem) yield s / n
однако реализация кажется мне немного сложнее, чем это должно быть. Но это должно быть в стандартных документах python по какой-то причине, может ли кто-то прокомментировать реализацию моего и стандартного документа?
из чтения других ответов я не думаю, что это то, о чем спрашивал вопрос, но я попал сюда с необходимостью держать среднее значение списка значений, которое росло в размере.
поэтому, если вы хотите сохранить список значений, которые вы получаете откуда-то (сайт, измерительное устройство и т. д.) и средний из последних
n
значения обновлены, вы можете использовать код ниже, что сводит к минимуму усилия по добавлению новых элементов:class Running_Average(object): def __init__(self, buffer_size=10): """ Create a new Running_Average object. This object allows the efficient calculation of the average of the last `buffer_size` numbers added to it. Examples -------- >>> a = Running_Average(2) >>> a.add(1) >>> a.get() 1.0 >>> a.add(1) # there are two 1 in buffer >>> a.get() 1.0 >>> a.add(2) # there's a 1 and a 2 in the buffer >>> a.get() 1.5 >>> a.add(2) >>> a.get() # now there's only two 2 in the buffer 2.0 """ self._buffer_size = int(buffer_size) # make sure it's an int self.reset() def add(self, new): """ Add a new number to the buffer, or replaces the oldest one there. """ new = float(new) # make sure it's a float n = len(self._buffer) if n < self.buffer_size: # still have to had numbers to the buffer. self._buffer.append(new) if self._average != self._average: # ~ if isNaN(). self._average = new # no previous numbers, so it's new. else: self._average *= n # so it's only the sum of numbers. self._average += new # add new number. self._average /= (n+1) # divide by new number of numbers. else: # buffer full, replace oldest value. old = self._buffer[self._index] # the previous oldest number. self._buffer[self._index] = new # replace with new one. self._index += 1 # update the index and make sure it's... self._index %= self.buffer_size # ... smaller than buffer_size. self._average -= old/self.buffer_size # remove old one... self._average += new/self.buffer_size # ...and add new one... # ... weighted by the number of elements. def __call__(self): """ Return the moving average value, for the lazy ones who don't want to write .get . """ return self._average def get(self): """ Return the moving average value. """ return self() def reset(self): """ Reset the moving average. If for some reason you don't want to just create a new one. """ self._buffer = [] # could use np.empty(self.buffer_size)... self._index = 0 # and use this to keep track of how many numbers. self._average = float('nan') # could use np.NaN . def get_buffer_size(self): """ Return current buffer_size. """ return self._buffer_size def set_buffer_size(self, buffer_size): """ >>> a = Running_Average(10) >>> for i in range(15): ... a.add(i) ... >>> a() 9.5 >>> a._buffer # should not access this!! [10.0, 11.0, 12.0, 13.0, 14.0, 5.0, 6.0, 7.0, 8.0, 9.0] Decreasing buffer size: >>> a.buffer_size = 6 >>> a._buffer # should not access this!! [9.0, 10.0, 11.0, 12.0, 13.0, 14.0] >>> a.buffer_size = 2 >>> a._buffer [13.0, 14.0] Increasing buffer size: >>> a.buffer_size = 5 Warning: no older data available! >>> a._buffer [13.0, 14.0] Keeping buffer size: >>> a = Running_Average(10) >>> for i in range(15): ... a.add(i) ... >>> a() 9.5 >>> a._buffer # should not access this!! [10.0, 11.0, 12.0, 13.0, 14.0, 5.0, 6.0, 7.0, 8.0, 9.0] >>> a.buffer_size = 10 # reorders buffer! >>> a._buffer [5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0] """ buffer_size = int(buffer_size) # order the buffer so index is zero again: new_buffer = self._buffer[self._index:] new_buffer.extend(self._buffer[:self._index]) self._index = 0 if self._buffer_size < buffer_size: print('Warning: no older data available!') # should use Warnings! else: diff = self._buffer_size - buffer_size print(diff) new_buffer = new_buffer[diff:] self._buffer_size = buffer_size self._buffer = new_buffer buffer_size = property(get_buffer_size, set_buffer_size)
и вы можете проверьте его, например:
def graph_test(N=200): import matplotlib.pyplot as plt values = list(range(N)) values_average_calculator = Running_Average(N/2) values_averages = [] for value in values: values_average_calculator.add(value) values_averages.append(values_average_calculator()) fig, ax = plt.subplots(1, 1) ax.plot(values, label='values') ax.plot(values_averages, label='averages') ax.grid() ax.set_xlim(0, N) ax.set_ylim(0, N) fig.show()
что дает:
как о фильтр скользящей средней? Он также является однострочным и имеет то преимущество, что вы можете легко манипулировать типом окна, если вам нужно что-то другое, чем прямоугольник, т. е. N-длинная простая скользящая средняя массива a:
lfilter(np.ones(N)/N, [1], a)[N:]
и с треугольным окном применены:
lfilter(np.ones(N)*scipy.signal.triang(N)/N, [1], a)[N:]
есть комментарий mab похоронен в одном из ответы выше которого есть этот метод.
bottleneck
иmove_mean
это простая скользящая средняя:import numpy as np import bottleneck as bn a = np.arange(10) + np.random.random(10) mva = bn.move_mean(a, window=2, min_count=1)
min_count
это удобный параметр, который в основном будет принимать скользящую среднюю до этого момента в вашем массиве. Если вы не установитеmin_count
она будет равнаwindow
, и все очки будутnan
.
другое решение, просто используя стандартную библиотеку и deque:
from collections import deque import itertools def moving_average(iterable, n=3): # http://en.wikipedia.org/wiki/Moving_average it = iter(iterable) # create an iterable object from input argument d = deque(itertools.islice(it, n-1)) # create deque object by slicing iterable d.appendleft(0) s = sum(d) for elem in it: s += elem - d.popleft() d.append(elem) yield s / n # example on how to use it for i in moving_average([40, 30, 50, 46, 39, 44]): print(i) # 40.0 # 42.0 # 45.0 # 43.0
Если вы решили свернуть свой собственный, а не использовать существующую библиотеку, пожалуйста, имейте в виду ошибку с плавающей запятой и постарайтесь свести к минимуму ее последствия:
class SumAccumulator: def __init__(self): self.values = [0] self.count = 0 def add( self, val ): self.values.append( val ) self.count = self.count + 1 i = self.count while i & 0x01: i = i >> 1 v0 = self.values.pop() v1 = self.values.pop() self.values.append( v0 + v1 ) def get_total(self): return sum( reversed(self.values) ) def get_size( self ): return self.count
Если все ваши значения примерно одинакового порядка, то это поможет сохранить точность, всегда добавляя значения примерно одинаковых величин.