fluent-logger-haskell作った

なんかイマサラだけど,少し前にfluent-loggerのHaskell版と,そのconduitインターフェース版を作った.

Haskellからfluentdにポンポンとイベントログ吐ける.

以下,process-conduitとcsv-conduitでdstatの出力を吐き出す*1例,

{-# LANGUAGE OverloadedStrings #-}
import Prelude hiding ( null )
import qualified Data.HashMap.Strict as HM
import Data.Conduit
import Data.Conduit.Process
import qualified Data.Conduit.List as CL
import Data.CSV.Conduit
import Network.Fluent.Logger
import Network.Fluent.Conduit
import Data.ByteString ( ByteString, null )
import Data.ByteString.Char8 ( unpack )
import Data.List ( groupBy )
import Data.Function

type Formatter a = [ByteString] -> [ByteString] -> [ByteString] -> a

settings :: FluentSettings
settings = defaultFluentSettings { fluentSettingsTag = "dstat"
                                 , fluentSettingsHost = "127.0.0.1"
                                 }

main :: IO ()
main = runResourceT $
       sourceDstat "/usr/bin/dstat" ["-cm"] =$=
       awaitForever (mapM_ yield) $$
       sinkFluent settings

sourceDstat :: MonadResource m =>
               FilePath
            -> [String]
            -> GSource m [ ( ByteString, HM.HashMap ByteString Double) ]
sourceDstat dstat opts = run cmd >+> toCSV >+> convert toDstatMap
    where
      run = sourceCmd . unwords
      cmd = dstat : opts ++ [ "--output", "/dev/fd/3"
                            , "3>&1", ">/dev/null", "2>/dev/null" ]

toCSV :: MonadResource m =>
         GInfConduit ByteString m [ByteString]
toCSV = injectLeftovers $ intoCSV defCSVSettings

convert :: Monad m =>
           Formatter o
        -> Pipe l [ByteString] o u m ()
convert format = CL.drop 5 >> withAwait (withAwait . convert' . fill)
    where
      convert' headers subs = withAwait $ \xs -> do
                                yield $ format headers subs xs
                                convert' headers subs

withAwait :: Monad m =>
             (i -> Pipe l i o u m ())
          -> Pipe l i o u m ()
withAwait f = await >>= maybe (return ()) f

toDstatMap :: Formatter [ (ByteString, HM.HashMap ByteString Double) ]
toDstatMap headers subs xs =
    [ ( fst $ head zs
      , HM.fromList $ map snd zs )
    | let ys = zip headers $ zip subs $ map (read . unpack) xs
    , zs <- groupBy ((==) `on` fst) ys
    ]

fill :: [ByteString] -> [ByteString]
fill = scanl1 (\x y -> if null y then x else y)

fluentd側で標準出力に出してみるとこんなカンジ,

...
2012-12-29 17:33:49 +0900: adding match pattern="dstat.**" type="stdout"
2012-12-29 17:33:49 +0900: listening fluent socket on 0.0.0.0:24224
2012-12-29 17:33:49 +0900: listening dRuby uri="druby://0.0.0.0:24230" object="Engine"
2012-12-29 17:34:00 +0900 dstat.total cpu usage: {"hiq":0.0,"wai":0.005,"siq":0.003,"sys":0.099,"usr":0.169,"idl":99.723}
2012-12-29 17:34:00 +0900 dstat.memory usage: {"buff":349925376.0,"cach":3611164672.0,"used":707342336.0,"free":3712630784.0}
2012-12-29 17:34:01 +0900 dstat.total cpu usage: {"hiq":0.0,"wai":0.0,"siq":0.0,"sys":0.0,"usr":0.125,"idl":99.875}
2012-12-29 17:34:01 +0900 dstat.memory usage: {"buff":349925376.0,"cach":3611164672.0,"used":707350528.0,"free":3712622592.0}
2012-12-29 17:34:02 +0900 dstat.total cpu usage: {"hiq":0.0,"wai":0.0,"siq":0.0,"sys":0.0,"usr":0.0,"idl":100.0}
2012-12-29 17:34:02 +0900 dstat.memory usage: {"buff":349925376.0,"cach":3611164672.0,"used":707604480.0,"free":3712368640.0}
2012-12-29 17:34:03 +0900 dstat.total cpu usage: {"hiq":0.0,"wai":0.0,"siq":0.0,"sys":0.124,"usr":0.0,"idl":99.876}
2012-12-29 17:34:03 +0900 dstat.memory usage: {"buff":349925376.0,"cach":3611164672.0,"used":707604480.0,"free":3712368640.0}
2012-12-29 17:34:04 +0900 dstat.total cpu usage: {"hiq":0.0,"wai":0.126,"siq":0.126,"sys":0.0,"usr":0.0,"idl":99.748}
2012-12-29 17:34:04 +0900 dstat.memory usage: {"buff":349929472.0,"cach":3611160576.0,"used":707604480.0,"free":3712368640.0}
2012-12-29 17:34:05 +0900 dstat.total cpu usage: {"hiq":0.0,"wai":0.0,"siq":0.0,"sys":0.0,"usr":0.125,"idl":99.875}
2012-12-29 17:34:05 +0900 dstat.memory usage: {"buff":349929472.0,"cach":3611160576.0,"used":707604480.0,"free":3712368640.0}

動確取ってるのは現在のhaskell-platformに積んであるghc-7.4だが,今現在hackageはghc-7.6でビルドしようとするので,依存してるmsgpackがTHガラミか何かでBuild failureになってしまう.そういうときドキュメントが生成してくれないのがhackageのちょっと悲しいトコロ.

*1:それfluent-plugin-dstatで(ry