Эффективных потоков в Haskell
В постоянном стремлении эффективно манипулировать битами (например, см. Этоттак вопрос ) новейшей проблемой является эффективное потоковое использование и потребление битов.
В качестве первой простой задачи я выбираю поиск самой длинной последовательности идентичных битов в битовом потоке, генерируемом/dev/urandom. Типичным заклинанием было бы head -c 1000000 </dev/urandom | my-exe. Фактическая цель состоит в том, чтобы передавать биты и декодировать код Elias gamma , например, т. е. коды, которые не являются кусками байтов или кратными из этого.
Для таких кодов переменной длины хорошо иметь take, takeWhile, group, и т.д. язык для работы со списком. Поскольку BitStream.take фактически поглотил бы часть Бистрема, в игру, вероятно, вступила бы какая-то монада.
Очевидной отправной точкой является ленивый байтестринг от Data.ByteString.Lazy.
A. подсчет байтов
Эта очень простая программа Haskell работает наравне с программой C, как и должно быть ожидаемый.
import qualified Data.ByteString.Lazy as BSL
main :: IO ()
main = do
bs <- BSL.getContents
print $ BSL.length bs
B. добавление байтов
Как только я начну использовать unpack, все должно стать еще хуже.
main = do
bs <- BSL.getContents
print $ sum $ BSL.unpack bs
Удивительно, но Хаскелл и Си показывают почти одинаковую производительность.
C. Самая длинная последовательность идентичных битов
В качестве первой нетривиальной задачи самая длинная последовательность идентичных битов может быть найдена следующим образом:
module Main where
import Data.Bits (shiftR, (.&.))
import qualified Data.ByteString.Lazy as BSL
import Data.List (group)
import Data.Word8 (Word8)
splitByte :: Word8 -> [Bool]
splitByte w = Prelude.map (i-> (w `shiftR` i) .&. 1 == 1) [0..7]
bitStream :: BSL.ByteString -> [Bool]
bitStream bs = concat $ map splitByte (BSL.unpack bs)
main :: IO ()
main = do
bs <- BSL.getContents
print $ maximum $ length <$> (group $ bitStream bs)
Ленивый байтестринг преобразуется в список [Word8] , а затем, используя сдвиги, каждый Word разбивается на биты, в результате получается список [Bool]. Этот список списков затем выравнивается с помощью concat. Получив (ленивый) список Bool, используйте group для разбиения списка на последовательности идентичных битов и затем сопоставьте length над ним. Наконец maximum дает желаемый результат. Довольно просто, но не очень быстро:
# C
real 0m0.606s
# Haskell
real 0m6.062s
Эта наивная реализация ровно на порядок медленнее.
Профилирование показывает, что выделяется довольно много памяти (около 3 ГБ для разбора 1 МБ входных данных). Здесь нет однако следует отметить массовую утечку информации из космоса.
Отсюда я начинаю копаться:
- существует
bitstreamпакет , который обещает " быстрые, упакованные, строгие битовые потоки (т. е. список Булов) с полуавтоматическим слиянием потоков.". К сожалению, он не соответствует текущему пакетуvector, см. здесь для получения подробной информации. - далее я исследую
streaming. Я не совсем понимаю, зачем мне нужна "эффектная" трансляция, которая приносит некоторые монада в игре - по крайней мере, пока я не начну с обратной поставленной задачи, то есть кодирования и записи битовых потоков в файл. - как насчет того, чтобы просто
fold- надByteString? Мне пришлось бы ввести штат, чтобы отслеживать потребленные биты. Это не совсем приятно.take,takeWhile,group, и т.д. язык, который является желательным.
Обновление :
Я понял, как это сделать с помощью streaming и еще streaming-bytestring. Вероятно, я делаю это неправильно, потому что результат катастрофически плох.
import Data.Bits (shiftR, (.&.))
import qualified Data.ByteString.Streaming as BSS
import Data.Word8 (Word8)
import qualified Streaming as S
import Streaming.Prelude (Of, Stream)
import qualified Streaming.Prelude as S
splitByte :: Word8 -> [Bool]
splitByte w = (i-> (w `shiftR` i) .&. 1 == 1) <$> [0..7]
bitStream :: Monad m => Stream (Of Word8) m () -> Stream (Of Bool) m ()
bitStream s = S.concat $ S.map splitByte s
main :: IO ()
main = do
let bs = BSS.unpack BSS.getContents :: Stream (Of Word8) IO ()
gs = S.group $ bitStream bs :: Stream (Stream (Of Bool) IO) IO ()
maxLen <- S.maximum $ S.mapped S.length gs
print $ S.fst' maxLen
Это проверит ваше терпение с чем-либо, кроме нескольких тысяч байт ввода от stdin. Профилировщик говорит, что он тратит безумное количество времени (квадратичное по размеру входного сигнала) в Streaming.Internal.>>=.loop и Data.Functor.Of.fmap. Я не совсем уверен, что первое, но fmap указывает (?) что жонглирование этими Of a b не приносит нам никакой пользы, и поскольку мы находимся в монаде ИО, этого не может быть оптимизирован в сторону.
У меня также есть потоковый эквивалент байтового сумматора здесь: SumBytesStream.hs, что немного медленнее, чем простая ленивая реализация ByteString, но все же прилично. Поскольку streaming-bytestring являетсяпровозглашенным "bytestring io сделано правильно ", я ожидал лучшего. Тогда, наверное, я делаю это неправильно.
BSS.getContents заставляет меня войти в монаду ИО, потому что getContents :: MonadIO m => ByteString m () и нет никакого выхода.
Обновление 2
Следуя совету @dfeuer, я использовал streaming пакет в master@HEAD. И вот результат.
longest-seq-c 0m0.747s (C)
longest-seq 0m8.190s (Haskell ByteString)
longest-seq-stream 0m13.946s (Haskell streaming-bytestring)
Проблема O(n^2) Streaming.concat решена, но мы все еще не приближаемся к эталону C.
Обновление 3
Решение Cirdec производит производительность наравне с C. используемая конструкция называется "церковные закодированные списки", см. Это так ответьте или Haskell Wiki натипах ранга N .
Исходные файлы:
Все исходные файлы можно найти на github. У Makefile есть все различные цели для выполнения экспериментов и профилирования. По умолчанию make просто соберет все (сначала создайте каталог bin/!) и затем make time будет выполнять синхронизацию исполняемых файлов longest-seq. Исполняемые файлы C получают a -c, добавленное для их различения.
2 ответа:
Промежуточные распределения и соответствующие им накладные расходы могут быть удалены, когда операции над потоками сливаются вместе. Прелюдия GHC обеспечивает слияние foldr / build для ленивых потоков в видеПравил перезаписи . Общая идея заключается в том, что если одна функция производит результат, который выглядит как foldr (он имеет тип
(a -> b -> b) -> b -> b, примененный к(:)и[]), а другая функция потребляет список, который выглядит как foldr, построение промежуточного списка может быть удалено.Для вашего задача я собираюсь построить нечто подобное, но с использованием строгих левых складок (
foldl') вместо foldr. Вместо того чтобы использовать правила перезаписи, которые пытаются определить, когда что-то выглядит какfoldl, я буду использовать тип данных, который заставляет списки выглядеть как левые сгибы.-- A list encoded as a strict left fold. newtype ListS a = ListS {build :: forall b. (b -> a -> b) -> b -> b}Поскольку я начал с отказа от списков, мы будем повторно реализовывать часть прелюдии для списков.
Строгие левые сгибы могут быть созданы из функцийfoldl'Как списков, так и байтестрингов.{-# INLINE fromList #-} fromList :: [a] -> ListS a fromList l = ListS (\c z -> foldl' c z l) {-# INLINE fromBS #-} fromBS :: BSL.ByteString -> ListS Word8 fromBS l = ListS (\c z -> BSL.foldl' c z l)В самый простой пример использования одного из них-найти длину списка.
{-# INLINE length' #-} length' :: ListS a -> Int length' l = build l (\z a -> z+1) 0Мы также можем сопоставлять и соединять левые складки.
{-# INLINE map' #-} -- fmap renamed so it can be inlined map' f l = ListS (\c z -> build l (\z a -> c z (f a)) z) {-# INLINE concat' #-} concat' :: ListS (ListS a) -> ListS a concat' ll = ListS (\c z -> build ll (\z l -> build l c z) z)Для вашей задачи нам нужно уметь разбивать слово на биты.
{-# INLINE splitByte #-} splitByte :: Word8 -> [Bool] splitByte w = Prelude.map (\i-> (w `shiftR` i) .&. 1 == 1) [0..7] {-# INLINE splitByte' #-} splitByte' :: Word8 -> ListS Bool splitByte' = fromList . splitByteИ a
ByteStringв биты{-# INLINE bitStream' #-} bitStream' :: BSL.ByteString -> ListS Bool bitStream' = concat' . map' splitByte' . fromBSЧтобы найти самый длинный пробег, мы будем отслеживать Предыдущее значение, длину текущего пробега и длину самого длинного пробега. Мы делаем поля строгими так, чтобы строгость сгиба не позволяла цепочкам Громовых звуков падать на землю. накапливается в памяти. Создание строгого типа данных для состояния-это простой способ получить контроль как над его представлением в памяти, так и над вычислением его полей.
data LongestRun = LongestRun !Bool !Int !Int {-# INLINE extendRun #-} extendRun (LongestRun previous run longest) x = LongestRun x current (max current longest) where current = if x == previous then run + 1 else 1 {-# INLINE longestRun #-} longestRun :: ListS Bool -> Int longestRun l = longest where (LongestRun _ _ longest) = build l extendRun (LongestRun False 0 0)И мы закончили
main :: IO () main = do bs <- BSL.getContents print $ longestRun $ bitStream' bsЭто намного быстрее, но не совсем производительность c.
longest-seq-c 0m00.12s (C) longest-seq 0m08.65s (Haskell ByteString) longest-seq-fuse 0m00.81s (Haskell ByteString fused)Программа выделяет около 1 Мб для чтения 1000000 байт из входных данных.
total alloc = 1,173,104 bytes (excludes profiling overheads)Обновленный код github
Я нашел другое решение, которое находится на одном уровне с C.
Data.Vector.Fusion.Stream.Monadicимеет потоковую реализацию, основанную на этой работеCoutts, Leschinskiy, Stewart 2007 . Идея заключается в том, чтобы использовать слияние потока destroy/unfoldr.Напомним, что список развернут.
Центральной частью решения является функция:: (b -> Maybe (a, b)) -> b -> [a]создает список путем многократного применения (разворачивания) пошаговой функции, начиная с начального значения. AStream- это просто функцияunfoldrс начальным состоянием. (БиблиотекаData.Vector.Fusion.Stream.Monadicиспользует GADTs для создания конструкторы дляStepэто может быть удобно согласовано с образцом. Думаю, это можно было бы сделать и без Гадтов.)mkBitstream :: BSL.ByteString -> Stream Bool, которая превращаетBytesStringв потокBool. В основном, мы отслеживаем текущийByteString, текущий байт и сколько из текущего байта все еще не используется. Всякий раз, когда один байт используется, другой байт отсекаетсяByteString. КогдаNothingостается, поток являетсяDone.The
longestRunфункция взята прямо из решения @Cirdec.Вот этюд:
{-# LANGUAGE CPP #-} #define PHASE_FUSED [1] #define PHASE_INNER [0] #define INLINE_FUSED INLINE PHASE_FUSED #define INLINE_INNER INLINE PHASE_INNER module Main where import Control.Monad.Identity (Identity) import Data.Bits (shiftR, (.&.)) import qualified Data.ByteString.Lazy as BSL import Data.Functor.Identity (runIdentity) import qualified Data.Vector.Fusion.Stream.Monadic as S import Data.Word8 (Word8) type Stream a = S.Stream Identity a -- no need for any monad, really data Step = Step BSL.ByteString !Word8 !Word8 -- could use tuples, but this is faster mkBitstream :: BSL.ByteString -> Stream Bool mkBitstream bs' = S.Stream step (Step bs' 0 0) where {-# INLINE_INNER step #-} step (Step bs w n) | n==0 = case (BSL.uncons bs) of Nothing -> return S.Done Just (w', bs') -> return $ S.Yield (w' .&. 1 == 1) (Step bs' (w' `shiftR` 1) 7) | otherwise = return $ S.Yield (w .&. 1 == 1) (Step bs (w `shiftR` 1) (n-1)) data LongestRun = LongestRun !Bool !Int !Int {-# INLINE extendRun #-} extendRun :: LongestRun -> Bool -> LongestRun extendRun (LongestRun previous run longest) x = LongestRun x current (max current longest) where current = if x == previous then run + 1 else 1 {-# INLINE longestRun #-} longestRun :: Stream Bool -> Int longestRun s = runIdentity $ do (LongestRun _ _ longest) <- S.foldl' extendRun (LongestRun False 0 0) s return longest main :: IO () main = do bs <- BSL.getContents print $ longestRun (mkBitstream bs)