Эффективных потоков в Haskell


В постоянном стремлении эффективно манипулировать битами (например, см. Этоттак вопрос ) новейшей проблемой является эффективное потоковое использование и потребление битов.

В качестве первой простой задачи я выбираю поиск самой длинной последовательности идентичных битов в битовом потоке, генерируемом /dev/urandom. Типичным заклинанием было бы head -c 1000000 </dev/urandom | my-exe. Фактическая цель состоит в том, чтобы передавать биты и декодировать код Elias gamma , например, т. е. коды, которые не являются кусками байтов или кратными из этого.

Для таких кодов переменной длины хорошо иметь take, takeWhile, group, и т.д. язык для работы со списком. Поскольку BitStream.take фактически поглотил бы часть Бистрема, в игру, вероятно, вступила бы какая-то монада.

Очевидной отправной точкой является ленивый байтестринг от Data.ByteString.Lazy.

A. подсчет байтов

Эта очень простая программа Haskell работает наравне с программой C, как и должно быть ожидаемый.

import qualified Data.ByteString.Lazy as BSL

main :: IO ()
main = do
    bs <- BSL.getContents
    print $ BSL.length bs

B. добавление байтов

Как только я начну использовать unpack, все должно стать еще хуже.

main = do
    bs <- BSL.getContents
    print $ sum $ BSL.unpack bs
Удивительно, но Хаскелл и Си показывают почти одинаковую производительность.

C. Самая длинная последовательность идентичных битов

В качестве первой нетривиальной задачи самая длинная последовательность идентичных битов может быть найдена следующим образом:

module Main where

import           Data.Bits            (shiftR, (.&.))
import qualified Data.ByteString.Lazy as BSL
import           Data.List            (group)
import           Data.Word8           (Word8)

splitByte :: Word8 -> [Bool]
splitByte w = Prelude.map (i-> (w `shiftR` i) .&. 1 == 1) [0..7]

bitStream :: BSL.ByteString -> [Bool]
bitStream bs = concat $ map splitByte (BSL.unpack bs)

main :: IO ()
main = do
    bs <- BSL.getContents
    print $ maximum $ length <$> (group $ bitStream bs)

Ленивый байтестринг преобразуется в список [Word8] , а затем, используя сдвиги, каждый Word разбивается на биты, в результате получается список [Bool]. Этот список списков затем выравнивается с помощью concat. Получив (ленивый) список Bool, используйте group для разбиения списка на последовательности идентичных битов и затем сопоставьте length над ним. Наконец maximum дает желаемый результат. Довольно просто, но не очень быстро:

# C
real    0m0.606s

# Haskell
real    0m6.062s
Эта наивная реализация ровно на порядок медленнее. Профилирование показывает, что выделяется довольно много памяти (около 3 ГБ для разбора 1 МБ входных данных). Здесь нет однако следует отметить массовую утечку информации из космоса.

Отсюда я начинаю копаться:

  • существует bitstream пакет , который обещает " быстрые, упакованные, строгие битовые потоки (т. е. список Булов) с полуавтоматическим слиянием потоков.". К сожалению, он не соответствует текущему пакету vector, см. здесь для получения подробной информации.
  • далее я исследую streaming. Я не совсем понимаю, зачем мне нужна "эффектная" трансляция, которая приносит некоторые монада в игре - по крайней мере, пока я не начну с обратной поставленной задачи, то есть кодирования и записи битовых потоков в файл.
  • как насчет того, чтобы просто fold - над ByteString? Мне пришлось бы ввести штат, чтобы отслеживать потребленные биты. Это не совсем приятно.take, takeWhile, group, и т.д. язык, который является желательным.
А теперь я не совсем понимаю, куда идти.

Обновление :

Я понял, как это сделать с помощью streaming и еще streaming-bytestring. Вероятно, я делаю это неправильно, потому что результат катастрофически плох.

import           Data.Bits                 (shiftR, (.&.))
import qualified Data.ByteString.Streaming as BSS
import           Data.Word8                (Word8)
import qualified Streaming                 as S
import           Streaming.Prelude         (Of, Stream)
import qualified Streaming.Prelude         as S

splitByte :: Word8 -> [Bool]
splitByte w = (i-> (w `shiftR` i) .&. 1 == 1) <$> [0..7]

bitStream :: Monad m => Stream (Of Word8) m () -> Stream (Of Bool) m ()
bitStream s = S.concat $ S.map splitByte s

main :: IO ()
main = do
    let bs = BSS.unpack BSS.getContents :: Stream (Of Word8) IO ()
        gs = S.group $ bitStream bs ::  Stream (Stream (Of Bool) IO) IO ()
    maxLen <- S.maximum $ S.mapped S.length gs
    print $ S.fst' maxLen

Это проверит ваше терпение с чем-либо, кроме нескольких тысяч байт ввода от stdin. Профилировщик говорит, что он тратит безумное количество времени (квадратичное по размеру входного сигнала) в Streaming.Internal.>>=.loop и Data.Functor.Of.fmap. Я не совсем уверен, что первое, но fmap указывает (?) что жонглирование этими Of a b не приносит нам никакой пользы, и поскольку мы находимся в монаде ИО, этого не может быть оптимизирован в сторону.

У меня также есть потоковый эквивалент байтового сумматора здесь: SumBytesStream.hs, что немного медленнее, чем простая ленивая реализация ByteString, но все же прилично. Поскольку streaming-bytestring являетсяпровозглашенным "bytestring io сделано правильно ", я ожидал лучшего. Тогда, наверное, я делаю это неправильно.

В любом случае, все эти битовые вычисления не должны происходить в монаде IO. Но BSS.getContents заставляет меня войти в монаду ИО, потому что getContents :: MonadIO m => ByteString m () и нет никакого выхода.

Обновление 2

Следуя совету @dfeuer, я использовал streaming пакет в master@HEAD. И вот результат.

longest-seq-c       0m0.747s    (C)
longest-seq         0m8.190s    (Haskell ByteString)
longest-seq-stream  0m13.946s   (Haskell streaming-bytestring)

Проблема O(n^2) Streaming.concat решена, но мы все еще не приближаемся к эталону C.

Обновление 3

Решение Cirdec производит производительность наравне с C. используемая конструкция называется "церковные закодированные списки", см. Это так ответьте или Haskell Wiki натипах ранга N .

Исходные файлы:

Все исходные файлы можно найти на github. У Makefile есть все различные цели для выполнения экспериментов и профилирования. По умолчанию make просто соберет все (сначала создайте каталог bin/!) и затем make time будет выполнять синхронизацию исполняемых файлов longest-seq. Исполняемые файлы C получают a -c, добавленное для их различения.

2 12

2 ответа:

Промежуточные распределения и соответствующие им накладные расходы могут быть удалены, когда операции над потоками сливаются вместе. Прелюдия GHC обеспечивает слияние foldr / build для ленивых потоков в видеПравил перезаписи . Общая идея заключается в том, что если одна функция производит результат, который выглядит как foldr (он имеет тип (a -> b -> b) -> b -> b, примененный к (:) и []), а другая функция потребляет список, который выглядит как foldr, построение промежуточного списка может быть удалено.

Для вашего задача я собираюсь построить нечто подобное, но с использованием строгих левых складок (foldl') вместо foldr. Вместо того чтобы использовать правила перезаписи, которые пытаются определить, когда что-то выглядит как foldl, я буду использовать тип данных, который заставляет списки выглядеть как левые сгибы.

-- A list encoded as a strict left fold.
newtype ListS a = ListS {build :: forall b. (b -> a -> b) -> b -> b}

Поскольку я начал с отказа от списков, мы будем повторно реализовывать часть прелюдии для списков.

Строгие левые сгибы могут быть созданы из функций foldl' Как списков, так и байтестрингов.
{-# INLINE fromList #-}
fromList :: [a] -> ListS a
fromList l = ListS (\c z -> foldl' c z l)

{-# INLINE fromBS #-}
fromBS :: BSL.ByteString -> ListS Word8
fromBS l = ListS (\c z -> BSL.foldl' c z l)

В самый простой пример использования одного из них-найти длину списка.

{-# INLINE length' #-}
length' :: ListS a -> Int
length' l = build l (\z a -> z+1) 0

Мы также можем сопоставлять и соединять левые складки.

{-# INLINE map' #-}
-- fmap renamed so it can be inlined
map' f l = ListS (\c z -> build l (\z a -> c z (f a)) z)

{-# INLINE concat' #-}
concat' :: ListS (ListS a) -> ListS a
concat' ll = ListS (\c z -> build ll (\z l -> build l c z) z)

Для вашей задачи нам нужно уметь разбивать слово на биты.

{-# INLINE splitByte #-}
splitByte :: Word8 -> [Bool]
splitByte w = Prelude.map (\i-> (w `shiftR` i) .&. 1 == 1) [0..7]

{-# INLINE splitByte' #-}
splitByte' :: Word8 -> ListS Bool
splitByte' = fromList . splitByte

И a ByteString в биты

{-# INLINE bitStream' #-}
bitStream' :: BSL.ByteString -> ListS Bool
bitStream' = concat' . map' splitByte' . fromBS

Чтобы найти самый длинный пробег, мы будем отслеживать Предыдущее значение, длину текущего пробега и длину самого длинного пробега. Мы делаем поля строгими так, чтобы строгость сгиба не позволяла цепочкам Громовых звуков падать на землю. накапливается в памяти. Создание строгого типа данных для состояния-это простой способ получить контроль как над его представлением в памяти, так и над вычислением его полей.

data LongestRun = LongestRun !Bool !Int !Int

{-# INLINE extendRun #-}
extendRun (LongestRun previous run longest) x = LongestRun x current (max current longest)
  where
    current = if x == previous then run + 1 else 1

{-# INLINE longestRun #-}
longestRun :: ListS Bool -> Int
longestRun l = longest
 where
   (LongestRun _ _ longest) = build l extendRun (LongestRun False 0 0)

И мы закончили

main :: IO ()
main = do
    bs <- BSL.getContents
    print $ longestRun $ bitStream' bs

Это намного быстрее, но не совсем производительность c.

longest-seq-c       0m00.12s    (C)
longest-seq         0m08.65s    (Haskell ByteString)
longest-seq-fuse    0m00.81s    (Haskell ByteString fused)

Программа выделяет около 1 Мб для чтения 1000000 байт из входных данных.

total alloc =   1,173,104 bytes  (excludes profiling overheads)

Обновленный код github

Я нашел другое решение, которое находится на одном уровне с C. Data.Vector.Fusion.Stream.Monadic имеет потоковую реализацию, основанную на этой работеCoutts, Leschinskiy, Stewart 2007 . Идея заключается в том, чтобы использовать слияние потока destroy/unfoldr.

Напомним, что список развернут. :: (b -> Maybe (a, b)) -> b -> [a] создает список путем многократного применения (разворачивания) пошаговой функции, начиная с начального значения. A Stream - это просто функция unfoldr с начальным состоянием. (Библиотека Data.Vector.Fusion.Stream.Monadic использует GADTs для создания конструкторы для Step это может быть удобно согласовано с образцом. Думаю, это можно было бы сделать и без Гадтов.)

Центральной частью решения является функция mkBitstream :: BSL.ByteString -> Stream Bool, которая превращает BytesString в поток Bool. В основном, мы отслеживаем текущий ByteString, текущий байт и сколько из текущего байта все еще не используется. Всякий раз, когда один байт используется, другой байт отсекается ByteString. Когда Nothing остается, поток является Done.

The longestRun функция взята прямо из решения @Cirdec.

Вот этюд:

{-# LANGUAGE CPP #-}
#define PHASE_FUSED [1]
#define PHASE_INNER [0]
#define INLINE_FUSED INLINE PHASE_FUSED
#define INLINE_INNER INLINE PHASE_INNER
module Main where

import           Control.Monad.Identity            (Identity)
import           Data.Bits                         (shiftR, (.&.))
import qualified Data.ByteString.Lazy              as BSL
import           Data.Functor.Identity             (runIdentity)
import qualified Data.Vector.Fusion.Stream.Monadic as S
import           Data.Word8                        (Word8)

type Stream a = S.Stream Identity a   -- no need for any monad, really

data Step = Step BSL.ByteString !Word8 !Word8   -- could use tuples, but this is faster

mkBitstream :: BSL.ByteString -> Stream Bool
mkBitstream bs' = S.Stream step (Step bs' 0 0) where
    {-# INLINE_INNER step #-}
    step (Step bs w n) | n==0 = case (BSL.uncons bs) of
                            Nothing        -> return S.Done
                            Just (w', bs') -> return $ 
                                S.Yield (w' .&. 1 == 1) (Step bs' (w' `shiftR` 1) 7)
                       | otherwise = return $ 
                                S.Yield (w .&. 1 == 1) (Step bs (w `shiftR` 1) (n-1))


data LongestRun = LongestRun !Bool !Int !Int

{-# INLINE extendRun #-}
extendRun :: LongestRun -> Bool -> LongestRun
extendRun (LongestRun previous run longest) x  = LongestRun x current (max current longest)
    where current = if x == previous then run + 1 else 1

{-# INLINE longestRun #-}
longestRun :: Stream Bool -> Int
longestRun s = runIdentity $ do
    (LongestRun _ _ longest) <- S.foldl' extendRun (LongestRun False 0 0) s
    return longest

main :: IO ()
main = do
    bs <- BSL.getContents
    print $ longestRun (mkBitstream bs)