Эффективных потоков в Haskell
В постоянном стремлении эффективно манипулировать битами (например, см. Этоттак вопрос ) новейшей проблемой является эффективное потоковое использование и потребление битов.
В качестве первой простой задачи я выбираю поиск самой длинной последовательности идентичных битов в битовом потоке, генерируемом/dev/urandom
. Типичным заклинанием было бы head -c 1000000 </dev/urandom | my-exe
. Фактическая цель состоит в том, чтобы передавать биты и декодировать код Elias gamma , например, т. е. коды, которые не являются кусками байтов или кратными из этого.
Для таких кодов переменной длины хорошо иметь take
, takeWhile
, group
, и т.д. язык для работы со списком. Поскольку BitStream.take
фактически поглотил бы часть Бистрема, в игру, вероятно, вступила бы какая-то монада.
Очевидной отправной точкой является ленивый байтестринг от Data.ByteString.Lazy
.
A. подсчет байтов
Эта очень простая программа Haskell работает наравне с программой C, как и должно быть ожидаемый.
import qualified Data.ByteString.Lazy as BSL
main :: IO ()
main = do
bs <- BSL.getContents
print $ BSL.length bs
B. добавление байтов
Как только я начну использовать unpack
, все должно стать еще хуже.
main = do
bs <- BSL.getContents
print $ sum $ BSL.unpack bs
Удивительно, но Хаскелл и Си показывают почти одинаковую производительность.
C. Самая длинная последовательность идентичных битов
В качестве первой нетривиальной задачи самая длинная последовательность идентичных битов может быть найдена следующим образом:
module Main where
import Data.Bits (shiftR, (.&.))
import qualified Data.ByteString.Lazy as BSL
import Data.List (group)
import Data.Word8 (Word8)
splitByte :: Word8 -> [Bool]
splitByte w = Prelude.map (i-> (w `shiftR` i) .&. 1 == 1) [0..7]
bitStream :: BSL.ByteString -> [Bool]
bitStream bs = concat $ map splitByte (BSL.unpack bs)
main :: IO ()
main = do
bs <- BSL.getContents
print $ maximum $ length <$> (group $ bitStream bs)
Ленивый байтестринг преобразуется в список [Word8]
, а затем, используя сдвиги, каждый Word
разбивается на биты, в результате получается список [Bool]
. Этот список списков затем выравнивается с помощью concat
. Получив (ленивый) список Bool
, используйте group
для разбиения списка на последовательности идентичных битов и затем сопоставьте length
над ним. Наконец maximum
дает желаемый результат. Довольно просто, но не очень быстро:
# C
real 0m0.606s
# Haskell
real 0m6.062s
Эта наивная реализация ровно на порядок медленнее.
Профилирование показывает, что выделяется довольно много памяти (около 3 ГБ для разбора 1 МБ входных данных). Здесь нет однако следует отметить массовую утечку информации из космоса.
Отсюда я начинаю копаться:
- существует
bitstream
пакет , который обещает " быстрые, упакованные, строгие битовые потоки (т. е. список Булов) с полуавтоматическим слиянием потоков.". К сожалению, он не соответствует текущему пакетуvector
, см. здесь для получения подробной информации. - далее я исследую
streaming
. Я не совсем понимаю, зачем мне нужна "эффектная" трансляция, которая приносит некоторые монада в игре - по крайней мере, пока я не начну с обратной поставленной задачи, то есть кодирования и записи битовых потоков в файл. - как насчет того, чтобы просто
fold
- надByteString
? Мне пришлось бы ввести штат, чтобы отслеживать потребленные биты. Это не совсем приятно.take
,takeWhile
,group
, и т.д. язык, который является желательным.
Обновление :
Я понял, как это сделать с помощью streaming
и еще streaming-bytestring
. Вероятно, я делаю это неправильно, потому что результат катастрофически плох.
import Data.Bits (shiftR, (.&.))
import qualified Data.ByteString.Streaming as BSS
import Data.Word8 (Word8)
import qualified Streaming as S
import Streaming.Prelude (Of, Stream)
import qualified Streaming.Prelude as S
splitByte :: Word8 -> [Bool]
splitByte w = (i-> (w `shiftR` i) .&. 1 == 1) <$> [0..7]
bitStream :: Monad m => Stream (Of Word8) m () -> Stream (Of Bool) m ()
bitStream s = S.concat $ S.map splitByte s
main :: IO ()
main = do
let bs = BSS.unpack BSS.getContents :: Stream (Of Word8) IO ()
gs = S.group $ bitStream bs :: Stream (Stream (Of Bool) IO) IO ()
maxLen <- S.maximum $ S.mapped S.length gs
print $ S.fst' maxLen
Это проверит ваше терпение с чем-либо, кроме нескольких тысяч байт ввода от stdin. Профилировщик говорит, что он тратит безумное количество времени (квадратичное по размеру входного сигнала) в Streaming.Internal.>>=.loop
и Data.Functor.Of.fmap
. Я не совсем уверен, что первое, но fmap
указывает (?) что жонглирование этими Of a b
не приносит нам никакой пользы, и поскольку мы находимся в монаде ИО, этого не может быть оптимизирован в сторону.
У меня также есть потоковый эквивалент байтового сумматора здесь: SumBytesStream.hs
, что немного медленнее, чем простая ленивая реализация ByteString
, но все же прилично. Поскольку streaming-bytestring
являетсяпровозглашенным "bytestring io сделано правильно ", я ожидал лучшего. Тогда, наверное, я делаю это неправильно.
BSS.getContents
заставляет меня войти в монаду ИО, потому что getContents :: MonadIO m => ByteString m ()
и нет никакого выхода.
Обновление 2
Следуя совету @dfeuer, я использовал streaming
пакет в master@HEAD. И вот результат.
longest-seq-c 0m0.747s (C)
longest-seq 0m8.190s (Haskell ByteString)
longest-seq-stream 0m13.946s (Haskell streaming-bytestring)
Проблема O(n^2) Streaming.concat
решена, но мы все еще не приближаемся к эталону C.
Обновление 3
Решение Cirdec производит производительность наравне с C. используемая конструкция называется "церковные закодированные списки", см. Это так ответьте или Haskell Wiki натипах ранга N .
Исходные файлы:
Все исходные файлы можно найти на github. У Makefile
есть все различные цели для выполнения экспериментов и профилирования. По умолчанию make
просто соберет все (сначала создайте каталог bin/
!) и затем make time
будет выполнять синхронизацию исполняемых файлов longest-seq
. Исполняемые файлы C получают a -c
, добавленное для их различения.
2 ответа:
Промежуточные распределения и соответствующие им накладные расходы могут быть удалены, когда операции над потоками сливаются вместе. Прелюдия GHC обеспечивает слияние foldr / build для ленивых потоков в видеПравил перезаписи . Общая идея заключается в том, что если одна функция производит результат, который выглядит как foldr (он имеет тип
(a -> b -> b) -> b -> b
, примененный к(:)
и[]
), а другая функция потребляет список, который выглядит как foldr, построение промежуточного списка может быть удалено.Для вашего задача я собираюсь построить нечто подобное, но с использованием строгих левых складок (
foldl'
) вместо foldr. Вместо того чтобы использовать правила перезаписи, которые пытаются определить, когда что-то выглядит какfoldl
, я буду использовать тип данных, который заставляет списки выглядеть как левые сгибы.-- A list encoded as a strict left fold. newtype ListS a = ListS {build :: forall b. (b -> a -> b) -> b -> b}
Поскольку я начал с отказа от списков, мы будем повторно реализовывать часть прелюдии для списков.
Строгие левые сгибы могут быть созданы из функцийfoldl'
Как списков, так и байтестрингов.{-# INLINE fromList #-} fromList :: [a] -> ListS a fromList l = ListS (\c z -> foldl' c z l) {-# INLINE fromBS #-} fromBS :: BSL.ByteString -> ListS Word8 fromBS l = ListS (\c z -> BSL.foldl' c z l)
В самый простой пример использования одного из них-найти длину списка.
{-# INLINE length' #-} length' :: ListS a -> Int length' l = build l (\z a -> z+1) 0
Мы также можем сопоставлять и соединять левые складки.
{-# INLINE map' #-} -- fmap renamed so it can be inlined map' f l = ListS (\c z -> build l (\z a -> c z (f a)) z) {-# INLINE concat' #-} concat' :: ListS (ListS a) -> ListS a concat' ll = ListS (\c z -> build ll (\z l -> build l c z) z)
Для вашей задачи нам нужно уметь разбивать слово на биты.
{-# INLINE splitByte #-} splitByte :: Word8 -> [Bool] splitByte w = Prelude.map (\i-> (w `shiftR` i) .&. 1 == 1) [0..7] {-# INLINE splitByte' #-} splitByte' :: Word8 -> ListS Bool splitByte' = fromList . splitByte
И a
ByteString
в биты{-# INLINE bitStream' #-} bitStream' :: BSL.ByteString -> ListS Bool bitStream' = concat' . map' splitByte' . fromBS
Чтобы найти самый длинный пробег, мы будем отслеживать Предыдущее значение, длину текущего пробега и длину самого длинного пробега. Мы делаем поля строгими так, чтобы строгость сгиба не позволяла цепочкам Громовых звуков падать на землю. накапливается в памяти. Создание строгого типа данных для состояния-это простой способ получить контроль как над его представлением в памяти, так и над вычислением его полей.
data LongestRun = LongestRun !Bool !Int !Int {-# INLINE extendRun #-} extendRun (LongestRun previous run longest) x = LongestRun x current (max current longest) where current = if x == previous then run + 1 else 1 {-# INLINE longestRun #-} longestRun :: ListS Bool -> Int longestRun l = longest where (LongestRun _ _ longest) = build l extendRun (LongestRun False 0 0)
И мы закончили
main :: IO () main = do bs <- BSL.getContents print $ longestRun $ bitStream' bs
Это намного быстрее, но не совсем производительность c.
longest-seq-c 0m00.12s (C) longest-seq 0m08.65s (Haskell ByteString) longest-seq-fuse 0m00.81s (Haskell ByteString fused)
Программа выделяет около 1 Мб для чтения 1000000 байт из входных данных.
total alloc = 1,173,104 bytes (excludes profiling overheads)
Обновленный код github
Я нашел другое решение, которое находится на одном уровне с C.
Data.Vector.Fusion.Stream.Monadic
имеет потоковую реализацию, основанную на этой работеCoutts, Leschinskiy, Stewart 2007 . Идея заключается в том, чтобы использовать слияние потока destroy/unfoldr.Напомним, что список развернут.
Центральной частью решения является функция:: (b -> Maybe (a, b)) -> b -> [a]
создает список путем многократного применения (разворачивания) пошаговой функции, начиная с начального значения. AStream
- это просто функцияunfoldr
с начальным состоянием. (БиблиотекаData.Vector.Fusion.Stream.Monadic
использует GADTs для создания конструкторы дляStep
это может быть удобно согласовано с образцом. Думаю, это можно было бы сделать и без Гадтов.)mkBitstream :: BSL.ByteString -> Stream Bool
, которая превращаетBytesString
в потокBool
. В основном, мы отслеживаем текущийByteString
, текущий байт и сколько из текущего байта все еще не используется. Всякий раз, когда один байт используется, другой байт отсекаетсяByteString
. КогдаNothing
остается, поток являетсяDone
.The
longestRun
функция взята прямо из решения @Cirdec.Вот этюд:
{-# LANGUAGE CPP #-} #define PHASE_FUSED [1] #define PHASE_INNER [0] #define INLINE_FUSED INLINE PHASE_FUSED #define INLINE_INNER INLINE PHASE_INNER module Main where import Control.Monad.Identity (Identity) import Data.Bits (shiftR, (.&.)) import qualified Data.ByteString.Lazy as BSL import Data.Functor.Identity (runIdentity) import qualified Data.Vector.Fusion.Stream.Monadic as S import Data.Word8 (Word8) type Stream a = S.Stream Identity a -- no need for any monad, really data Step = Step BSL.ByteString !Word8 !Word8 -- could use tuples, but this is faster mkBitstream :: BSL.ByteString -> Stream Bool mkBitstream bs' = S.Stream step (Step bs' 0 0) where {-# INLINE_INNER step #-} step (Step bs w n) | n==0 = case (BSL.uncons bs) of Nothing -> return S.Done Just (w', bs') -> return $ S.Yield (w' .&. 1 == 1) (Step bs' (w' `shiftR` 1) 7) | otherwise = return $ S.Yield (w .&. 1 == 1) (Step bs (w `shiftR` 1) (n-1)) data LongestRun = LongestRun !Bool !Int !Int {-# INLINE extendRun #-} extendRun :: LongestRun -> Bool -> LongestRun extendRun (LongestRun previous run longest) x = LongestRun x current (max current longest) where current = if x == previous then run + 1 else 1 {-# INLINE longestRun #-} longestRun :: Stream Bool -> Int longestRun s = runIdentity $ do (LongestRun _ _ longest) <- S.foldl' extendRun (LongestRun False 0 0) s return longest main :: IO () main = do bs <- BSL.getContents print $ longestRun (mkBitstream bs)