Скользящая средняя или работает значит


есть ли функция scipy или функция numpy или модуль для python, который вычисляет текущее среднее значение массива 1D с учетом конкретного окна?

22 107

22 ответа:

для короткого, быстрого решения, которое делает все это в одном цикле, без зависимостей, приведенный ниже код отлично работает.

mylist = [1, 2, 3, 4, 5, 6, 7]
N = 3
cumsum, moving_aves = [0], []

for i, x in enumerate(mylist, 1):
    cumsum.append(cumsum[i-1] + x)
    if i>=N:
        moving_ave = (cumsum[i] - cumsum[i-N])/N
        #can do stuff with moving_ave here
        moving_aves.append(moving_ave)

UPD: более эффективные решения были предложены Alleo и jasaarim.


можно использовать np.convolve для этого:

np.convolve(x, np.ones((N,))/N, mode='valid')

объяснение

текущее среднее-это случай математической операции свертка. Для текущего среднего значения вы перемещаете окно по входу и вычисляете среднее значение содержимого окна. Для дискретных 1D сигналов, свертка-это то же самое, за исключением того, что вместо среднего вы вычисляете произвольную линейную комбинацию, т. е. умножаете каждый элемент на соответствующий коэффициент и складываете результаты. Эти коэффициенты, по одному для каждой позиции в окне, иногда называют сверткой ядро. Теперь, среднее арифметическое N значений (x_1 + x_2 + ... + x_N) / N, Так что соответствующее ядро (1/N, 1/N, ..., 1/N), а это именно то, что мы получаем с помощью np.ones((N,))/N.

края

в

эффективным решением

свертка намного лучше, чем простой подход, но (я думаю) он использует БПФ и, следовательно, довольно медленно. Однако специально для вычисления текущего значения следующий подход отлично работает

def running_mean(x, N):
    cumsum = numpy.cumsum(numpy.insert(x, 0, 0)) 
    return (cumsum[N:] - cumsum[:-N]) / float(N)

код для проверки

In[3]: x = numpy.random.random(100000)
In[4]: N = 1000
In[5]: %timeit result1 = numpy.convolve(x, numpy.ones((N,))/N, mode='valid')
10 loops, best of 3: 41.4 ms per loop
In[6]: %timeit result2 = running_mean(x, N)
1000 loops, best of 3: 1.04 ms per loop

отметим, что numpy.allclose(result1, result2) и True два метода эквивалентны. Чем больше N, тем больше разница во времени.

панды более подходит для этого, чем NumPy или SciPy. Его функция rolling_mean делает работу удобнее. Он также возвращает массив NumPy, когда вход является массивом.

трудно победить rolling_mean в производительности с любой пользовательской реализацией Pure Python. Вот пример производительности против двух предложенных решений:

In [1]: import numpy as np

In [2]: import pandas as pd

In [3]: def running_mean(x, N):
   ...:     cumsum = np.cumsum(np.insert(x, 0, 0)) 
   ...:     return (cumsum[N:] - cumsum[:-N]) / N
   ...:

In [4]: x = np.random.random(100000)

In [5]: N = 1000

In [6]: %timeit np.convolve(x, np.ones((N,))/N, mode='valid')
10 loops, best of 3: 172 ms per loop

In [7]: %timeit running_mean(x, N)
100 loops, best of 3: 6.72 ms per loop

In [8]: %timeit pd.rolling_mean(x, N)[N-1:]
100 loops, best of 3: 4.74 ms per loop

In [9]: np.allclose(pd.rolling_mean(x, N)[N-1:], running_mean(x, N))
Out[9]: True

есть также хорошие варианты, как иметь дело с краевыми значениями.

вы можете вычислить текущее среднее значение с помощью:

import numpy as np

def runningMean(x, N):
    y = np.zeros((len(x),))
    for ctr in range(len(x)):
         y[ctr] = np.sum(x[ctr:(ctr+N)])
    return y/N

но это медленно.

К счастью, numpy включает в себя свертка функция, которую мы можем использовать, чтобы ускорить процесс. Текущее среднее значение эквивалентно свертке x с вектором, который N долго, со всеми членами равна 1/N. Реализация numpy свертки включает в себя начальный переходный процесс, поэтому вам нужно удалить первые N-1 точки:

def runningMeanFast(x, N):
    return np.convolve(x, np.ones((N,))/N)[(N-1):]

на моей машине, быстрая версия в 20-30 раз быстрее, в зависимости от длины входного вектора и размер окна усреднения.

обратите внимание, что свертка включает 'same' режим, который, похоже, должен решать начальную переходную проблему, но он разделяет ее между началом и концом.

или модуль для python, который вычисляет

в моих тестах на Tradewave.net та-Либ всегда побеждает:

import talib as ta
import numpy as np
import pandas as pd
import scipy
from scipy import signal
import time as t

PAIR = info.primary_pair
PERIOD = 30

def initialize():
    storage.reset()
    storage.elapsed = storage.get('elapsed', [0,0,0,0,0,0])

def cumsum_sma(array, period):
    ret = np.cumsum(array, dtype=float)
    ret[period:] = ret[period:] - ret[:-period]
    return ret[period - 1:] / period

def pandas_sma(array, period):
    return pd.rolling_mean(array, period)

def api_sma(array, period):
    # this method is native to Tradewave and does NOT return an array
    return (data[PAIR].ma(PERIOD))

def talib_sma(array, period):
    return ta.MA(array, period)

def convolve_sma(array, period):
    return np.convolve(array, np.ones((period,))/period, mode='valid')

def fftconvolve_sma(array, period):    
    return scipy.signal.fftconvolve(
        array, np.ones((period,))/period, mode='valid')    

def tick():

    close = data[PAIR].warmup_period('close')

    t1 = t.time()
    sma_api = api_sma(close, PERIOD)
    t2 = t.time()
    sma_cumsum = cumsum_sma(close, PERIOD)
    t3 = t.time()
    sma_pandas = pandas_sma(close, PERIOD)
    t4 = t.time()
    sma_talib = talib_sma(close, PERIOD)
    t5 = t.time()
    sma_convolve = convolve_sma(close, PERIOD)
    t6 = t.time()
    sma_fftconvolve = fftconvolve_sma(close, PERIOD)
    t7 = t.time()

    storage.elapsed[-1] = storage.elapsed[-1] + t2-t1
    storage.elapsed[-2] = storage.elapsed[-2] + t3-t2
    storage.elapsed[-3] = storage.elapsed[-3] + t4-t3
    storage.elapsed[-4] = storage.elapsed[-4] + t5-t4
    storage.elapsed[-5] = storage.elapsed[-5] + t6-t5    
    storage.elapsed[-6] = storage.elapsed[-6] + t7-t6        

    plot('sma_api', sma_api)  
    plot('sma_cumsum', sma_cumsum[-5])
    plot('sma_pandas', sma_pandas[-10])
    plot('sma_talib', sma_talib[-15])
    plot('sma_convolve', sma_convolve[-20])    
    plot('sma_fftconvolve', sma_fftconvolve[-25])

def stop():

    log('ticks....: %s' % info.max_ticks)

    log('api......: %.5f' % storage.elapsed[-1])
    log('cumsum...: %.5f' % storage.elapsed[-2])
    log('pandas...: %.5f' % storage.elapsed[-3])
    log('talib....: %.5f' % storage.elapsed[-4])
    log('convolve.: %.5f' % storage.elapsed[-5])    
    log('fft......: %.5f' % storage.elapsed[-6])

результаты:

[2015-01-31 23:00:00] ticks....: 744
[2015-01-31 23:00:00] api......: 0.16445
[2015-01-31 23:00:00] cumsum...: 0.03189
[2015-01-31 23:00:00] pandas...: 0.03677
[2015-01-31 23:00:00] talib....: 0.00700  # <<< Winner!
[2015-01-31 23:00:00] convolve.: 0.04871
[2015-01-31 23:00:00] fft......: 0.22306

enter image description here

готовое к использованию решение см. В разделе http://www.scipy.org/Cookbook/SignalSmooth. Он обеспечивает среднюю работу с flat тип окна. Обратите внимание, что это немного сложнее, чем простой метод свертки do-it-yourself, поскольку он пытается обрабатывать проблемы в начале и конце данных, отражая их (что может или не может работать в вашем случае...).

для начала, вы можете попробовать:

a = np.random.random(100)
plt.plot(a)
b = smooth(a, window='flat')
plt.plot(b)

Я знаю, это старый вопрос, но вот решение, которое не использует никаких дополнительных структур данных или библиотек. Это линейное количество элементов входного списка и я не могу придумать никакой другой способ сделать его более эффективным (на самом деле, если кто знает лучший способ выделить результат, пожалуйста, дайте мне знать).

Примечание: Это было бы намного быстрее, используя массив numpy вместо списка, но я хотел устранить все зависимости. Это также было бы возможно повышение производительности за счет многопоточного выполнения

функция предполагает, что входной список является одномерным, поэтому будьте осторожны.

### Running mean/Moving average
def running_mean(l, N):
    sum = 0
    result = list( 0 for x in l)

    for i in range( 0, N ):
        sum = sum + l[i]
        result[i] = sum / (i+1)

    for i in range( N, len(l) ):
        sum = sum - l[i-N] + l[i]
        result[i] = sum / N

    return result

Я еще не проверил, насколько это быстро, но вы можете попробовать:

from collections import deque

cache = deque() # keep track of seen values
n = 10          # window size
A = xrange(100) # some dummy iterable
cum_sum = 0     # initialize cumulative sum

for t, val in enumerate(A, 1):
    cache.append(val)
    cum_sum += val
    if t < n:
        avg = cum_sum / float(t)
    else:                           # if window is saturated,
        cum_sum -= cache.popleft()  # subtract oldest value
        avg = cum_sum / float(n)

немного поздно на вечеринку, но я сделал свою собственную маленькую функцию, которая не обертывает концы или колодки с нулями, которые затем используются для поиска среднего значения. В качестве дальнейшего лечения является то, что он также повторно отсчитывает сигнал в линейно разнесенных точках. Настройка кода по желанию, чтобы получить другие функции.

метод представляет собой простое матричное умножение с нормализованным гауссовым ядром.

def running_mean(y_in, x_in, N_out=101, sigma=1):
    '''
    Returns running mean as a Bell-curve weighted average at evenly spaced
    points. Does NOT wrap signal around, or pad with zeros.

    Arguments:
    y_in -- y values, the values to be smoothed and re-sampled
    x_in -- x values for array

    Keyword arguments:
    N_out -- NoOf elements in resampled array.
    sigma -- 'Width' of Bell-curve in units of param x .
    '''
    N_in = size(y_in)

    # Gaussian kernel
    x_out = np.linspace(np.min(x_in), np.max(x_in), N_out)
    x_in_mesh, x_out_mesh = np.meshgrid(x_in, x_out)
    gauss_kernel = np.exp(-np.square(x_in_mesh - x_out_mesh) / (2 * sigma**2))
    # Normalize kernel, such that the sum is one along axis 1
    normalization = np.tile(np.reshape(sum(gauss_kernel, axis=1), (N_out, 1)), (1, N_in))
    gauss_kernel_normalized = gauss_kernel / normalization
    # Perform running average as a linear operation
    y_out = gauss_kernel_normalized @ y_in

    return y_out, x_out

простое использование на синусоидальном сигнале с добавленным нормальным распределенный шум: enter image description here

если важно сохранить размеры входного сигнала (вместо ограничения выхода на 'valid' площадь свертки), вы можете использовать scipy.ndimage.фильтры.uniform_filter1d:

import numpy as np
from scipy.ndimage.filters import uniform_filter1d
N = 1000
x = np.random.random(100000)
y = uniform_filter1d(x, size=N)

y.shape == x.shape
>>> True

uniform_filter1d позволяет несколько способов обработки границы, где 'reflect' по умолчанию, но в моем случае, я скорее хотел 'nearest'.

это также довольно быстро (почти в 50 раз быстрее, чем np.convolve):

%timeit y1 = np.convolve(x, np.ones((N,))/N, mode='same')
100 loops, best of 3: 9.28 ms per loop

%timeit y2 = uniform_filter1d(x, size=N)
10000 loops, best of 3: 191 µs per loop

другое подход к скользящей средней без используя numpy, panda

import itertools
sample = [2, 6, 10, 8, 11, 10]
list(itertools.starmap(lambda a,b: b/a, 
               enumerate(itertools.accumulate(sample), 1)))

печать [2.0, 4.0, 6.0, 6.5, 7.4, 7.833333333333333]

этот вопрос сейчас даже старше чем когда NeXuS писал об этом в прошлом месяце, но мне нравится, как его код имеет дело с крайними случаями. Однако, так как это "простая скользящая средняя", ее результаты отстают от данных, к которым они применяются. Я думал, что работа с краевыми случаями более удовлетворительна, чем режимы NumPy valid,same и full может быть достигнуто путем применения аналогичного подхода к convolution() метод, основанный.

мой вклад использует центральный запуск среднее значение, чтобы выровнять его результаты с их данными. Когда для полноразмерного окна доступно слишком мало точек, текущие средние значения вычисляются из последовательно меньших окон по краям массива. [На самом деле, из последовательно больших окон, но это деталь реализации.]

import numpy as np

def running_mean(l, N):
    # Also works for the(strictly invalid) cases when N is even.
    if (N//2)*2 == N:
        N = N - 1
    front = np.zeros(N//2)
    back = np.zeros(N//2)

    for i in range(1, (N//2)*2, 2):
        front[i//2] = np.convolve(l[:i], np.ones((i,))/i, mode = 'valid')
    for i in range(1, (N//2)*2, 2):
        back[i//2] = np.convolve(l[-i:], np.ones((i,))/i, mode = 'valid')
    return np.concatenate([front, np.convolve(l, np.ones((N,))/N, mode = 'valid'), back[::-1]])

это относительно медленно, потому что он использует convolve(), и, вероятно, может быть приукрашено довольно много истинным Питонистом, однако, я считаю, что идея стоит.

вместо numpy или scipy, я бы рекомендовал панд сделать это более быстро:

df['data'].rolling(3).mean()

это принимает скользящую среднюю (MA) из 3 периодов столбца "данные". Вы также можете вычислить сдвинутые версии, например, тот, который исключает текущую ячейку (сдвинутую назад), можно легко вычислить как:

df['data'].shift(periods=1).rolling(3).mean()

хотя здесь есть решения для этого вопроса, пожалуйста, взгляните на мое решение. Это очень просто и хорошо работает.

import numpy as np
dataset = np.asarray([1, 2, 3, 4, 5, 6, 7])
ma = list()
window = 3
for t in range(0, len(dataset)):
    if t+window <= len(dataset):
        indices = range(t, t+window)
        ma.append(np.average(np.take(dataset, indices)))
else:
    ma = np.asarray(ma)

есть много ответов выше о вычислении текущего среднего. Мой ответ добавляет две дополнительные функции:

  1. игнорирует значения nan
  2. вычисляет среднее значение для n соседних значений, не включая значение самого интереса

Я использую numpy.cumsum так как это наиболее эффективный способ (см. ответ Алло выше).

N=10 # number of points to test on each side of point of interest, best if even
padded_x = np.insert(np.insert( np.insert(x, len(x), np.empty(int(N/2))*np.nan), 0, np.empty(int(N/2))*np.nan ),0,0)
n_nan = np.cumsum(np.isnan(padded_x))
cumsum = np.nancumsum(padded_x) 
window_sum = cumsum[N+1:] - cumsum[:-(N+1)] - x # subtract value of interest from sum of all values within window
window_n_nan = n_nan[N+1:] - n_nan[:-(N+1)] - np.isnan(x)
window_n_values = (N - window_n_nan)
movavg = (window_sum) / (window_n_values)

этот код работает только для четных Ns. Его можно настроить для нечетных чисел, изменив np.вставка padded_x и n_nan.

пример вывода (raw в черном, movavg в синем): raw data (black) and moving average (blue) of 10 points around each value, not including that value. nan values are ignored.

этот код может быть легко адаптирован для удаления всех скользящих средних значений, вычисленных из менее чем cutoff = 3 значений, отличных от nan.

window_n_values = (N - window_n_nan).astype(float) # dtype must be float to set some values to nan
cutoff = 3
window_n_values[window_n_values<cutoff] = np.nan
movavg = (window_sum) / (window_n_values)

raw data (black) and moving average (blue) while ignoring any window with fewer than 3 non-nan values

Используйте Только Библиотеку Python Stadnard (Эффективная Память)

просто дайте другую версию использования стандартной библиотеки deque только. Это довольно удивительно для меня, что большинство ответов используют pandas или numpy.

def moving_average(iterable, n=3):
    d = deque(maxlen=n)
    for i in iterable:
        d.append(i)
        if len(d) == n:
            yield sum(d)/n

r = moving_average([40, 30, 50, 46, 39, 44])
assert list(r) == [40.0, 42.0, 45.0, 43.0]

Actaully я нашел другой реализация в python docs

def moving_average(iterable, n=3):
    # moving_average([40, 30, 50, 46, 39, 44]) --> 40.0 42.0 45.0 43.0
    # http://en.wikipedia.org/wiki/Moving_average
    it = iter(iterable)
    d = deque(itertools.islice(it, n-1))
    d.appendleft(0)
    s = sum(d)
    for elem in it:
        s += elem - d.popleft()
        d.append(elem)
        yield s / n

однако реализация кажется мне немного сложнее, чем это должно быть. Но это должно быть в стандартных документах python по какой-то причине, может ли кто-то прокомментировать реализацию моего и стандартного документа?

из чтения других ответов я не думаю, что это то, о чем спрашивал вопрос, но я попал сюда с необходимостью держать среднее значение списка значений, которое росло в размере.

поэтому, если вы хотите сохранить список значений, которые вы получаете откуда-то (сайт, измерительное устройство и т. д.) и средний из последних n значения обновлены, вы можете использовать код ниже, что сводит к минимуму усилия по добавлению новых элементов:

class Running_Average(object):
    def __init__(self, buffer_size=10):
        """
        Create a new Running_Average object.

        This object allows the efficient calculation of the average of the last
        `buffer_size` numbers added to it.

        Examples
        --------
        >>> a = Running_Average(2)
        >>> a.add(1)
        >>> a.get()
        1.0
        >>> a.add(1)  # there are two 1 in buffer
        >>> a.get()
        1.0
        >>> a.add(2)  # there's a 1 and a 2 in the buffer
        >>> a.get()
        1.5
        >>> a.add(2)
        >>> a.get()  # now there's only two 2 in the buffer
        2.0
        """
        self._buffer_size = int(buffer_size)  # make sure it's an int
        self.reset()

    def add(self, new):
        """
        Add a new number to the buffer, or replaces the oldest one there.
        """
        new = float(new)  # make sure it's a float
        n = len(self._buffer)
        if n < self.buffer_size:  # still have to had numbers to the buffer.
            self._buffer.append(new)
            if self._average != self._average:  # ~ if isNaN().
                self._average = new  # no previous numbers, so it's new.
            else:
                self._average *= n  # so it's only the sum of numbers.
                self._average += new  # add new number.
                self._average /= (n+1)  # divide by new number of numbers.
        else:  # buffer full, replace oldest value.
            old = self._buffer[self._index]  # the previous oldest number.
            self._buffer[self._index] = new  # replace with new one.
            self._index += 1  # update the index and make sure it's...
            self._index %= self.buffer_size  # ... smaller than buffer_size.
            self._average -= old/self.buffer_size  # remove old one...
            self._average += new/self.buffer_size  # ...and add new one...
            # ... weighted by the number of elements.

    def __call__(self):
        """
        Return the moving average value, for the lazy ones who don't want
        to write .get .
        """
        return self._average

    def get(self):
        """
        Return the moving average value.
        """
        return self()

    def reset(self):
        """
        Reset the moving average.

        If for some reason you don't want to just create a new one.
        """
        self._buffer = []  # could use np.empty(self.buffer_size)...
        self._index = 0  # and use this to keep track of how many numbers.
        self._average = float('nan')  # could use np.NaN .

    def get_buffer_size(self):
        """
        Return current buffer_size.
        """
        return self._buffer_size

    def set_buffer_size(self, buffer_size):
        """
        >>> a = Running_Average(10)
        >>> for i in range(15):
        ...     a.add(i)
        ...
        >>> a()
        9.5
        >>> a._buffer  # should not access this!!
        [10.0, 11.0, 12.0, 13.0, 14.0, 5.0, 6.0, 7.0, 8.0, 9.0]

        Decreasing buffer size:
        >>> a.buffer_size = 6
        >>> a._buffer  # should not access this!!
        [9.0, 10.0, 11.0, 12.0, 13.0, 14.0]
        >>> a.buffer_size = 2
        >>> a._buffer
        [13.0, 14.0]

        Increasing buffer size:
        >>> a.buffer_size = 5
        Warning: no older data available!
        >>> a._buffer
        [13.0, 14.0]

        Keeping buffer size:
        >>> a = Running_Average(10)
        >>> for i in range(15):
        ...     a.add(i)
        ...
        >>> a()
        9.5
        >>> a._buffer  # should not access this!!
        [10.0, 11.0, 12.0, 13.0, 14.0, 5.0, 6.0, 7.0, 8.0, 9.0]
        >>> a.buffer_size = 10  # reorders buffer!
        >>> a._buffer
        [5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0]
        """
        buffer_size = int(buffer_size)
        # order the buffer so index is zero again:
        new_buffer = self._buffer[self._index:]
        new_buffer.extend(self._buffer[:self._index])
        self._index = 0
        if self._buffer_size < buffer_size:
            print('Warning: no older data available!')  # should use Warnings!
        else:
            diff = self._buffer_size - buffer_size
            print(diff)
            new_buffer = new_buffer[diff:]
        self._buffer_size = buffer_size
        self._buffer = new_buffer

    buffer_size = property(get_buffer_size, set_buffer_size)

и вы можете проверьте его, например:

def graph_test(N=200):
    import matplotlib.pyplot as plt
    values = list(range(N))
    values_average_calculator = Running_Average(N/2)
    values_averages = []
    for value in values:
        values_average_calculator.add(value)
        values_averages.append(values_average_calculator())
    fig, ax = plt.subplots(1, 1)
    ax.plot(values, label='values')
    ax.plot(values_averages, label='averages')
    ax.grid()
    ax.set_xlim(0, N)
    ax.set_ylim(0, N)
    fig.show()

что дает:

Values and their average as a function of values #

как о фильтр скользящей средней? Он также является однострочным и имеет то преимущество, что вы можете легко манипулировать типом окна, если вам нужно что-то другое, чем прямоугольник, т. е. N-длинная простая скользящая средняя массива a:

lfilter(np.ones(N)/N, [1], a)[N:]

и с треугольным окном применены:

lfilter(np.ones(N)*scipy.signal.triang(N)/N, [1], a)[N:]

есть комментарий mab похоронен в одном из ответы выше которого есть этот метод. bottleneck и move_mean это простая скользящая средняя:

import numpy as np
import bottleneck as bn

a = np.arange(10) + np.random.random(10)

mva = bn.move_mean(a, window=2, min_count=1)

min_count это удобный параметр, который в основном будет принимать скользящую среднюю до этого момента в вашем массиве. Если вы не установите min_count она будет равна window, и все очки будут nan.

другое решение, просто используя стандартную библиотеку и deque:

from collections import deque
import itertools

def moving_average(iterable, n=3):
    # http://en.wikipedia.org/wiki/Moving_average
    it = iter(iterable) 
    # create an iterable object from input argument
    d = deque(itertools.islice(it, n-1))  
    # create deque object by slicing iterable
    d.appendleft(0)
    s = sum(d)
    for elem in it:
        s += elem - d.popleft()
        d.append(elem)
        yield s / n

# example on how to use it
for i in  moving_average([40, 30, 50, 46, 39, 44]):
    print(i)

# 40.0
# 42.0
# 45.0
# 43.0

Если вы решили свернуть свой собственный, а не использовать существующую библиотеку, пожалуйста, имейте в виду ошибку с плавающей запятой и постарайтесь свести к минимуму ее последствия:

class SumAccumulator:
    def __init__(self):
        self.values = [0]
        self.count = 0

    def add( self, val ):
        self.values.append( val )
        self.count = self.count + 1
        i = self.count
        while i & 0x01:
            i = i >> 1
            v0 = self.values.pop()
            v1 = self.values.pop()
            self.values.append( v0 + v1 )

    def get_total(self):
        return sum( reversed(self.values) )

    def get_size( self ):
        return self.count

Если все ваши значения примерно одинакового порядка, то это поможет сохранить точность, всегда добавляя значения примерно одинаковых величин.