From 96c5befe81879bcd958167107a54ee1dc02c8f95 Mon Sep 17 00:00:00 2001 From: willjgould <47950262+willjgould@users.noreply.github.com> Date: Thu, 11 Jan 2024 10:08:48 +0000 Subject: [PATCH] PLT-9070 - Marconi components; further generalisation of index running machinery (#285) * PLT-9070 - Allow monadic preprocessing * PLT-9070 - Loosen runEmitterAndConsumer arg reqs * PLT-9070 - Generalise and expose stream emitter * PLT-9070 - Loosen streamEmitter a bit more * PLT-9070 - More experimental changes * PLT-9070 - More experimentation with stateful streaming * PLT-9070 - Removed the transformer wackiness * PLT-9070 - Remove specialised config dependency from streamEmitter * PLT-9070 - More messy experimenting * PLT-9070 - Use optparse-applicative-fork * PLT-9070 - Try to dedupe the streaming changes * PLT-9070 - Added accidentally removed comment back in * PLT-9070 - Tiny comments * PLT-9070 - optparse-applicative-fork in marconi-chain-index-legacy * PLT-9070 - Updated golden files * PLT-9070 - Missed some golden files * PLT-9070 - Dummy file for missing required folder * PLT-9070 - Added golden files back in * PLT-9070 - Moved the golden files into the right folder * Revert "PLT-9070 - Moved the golden files into the right folder" This reverts commit c3460fedbfc4761579c31a4b0e91cfc8d9315ee5. * Revert "PLT-9070 - Added golden files back in" This reverts commit 644a50b402c6e96eca9d497d9f93e39a5dd46350. * Revert "PLT-9070 - Dummy file for missing required folder" This reverts commit 97d9c733b5b67aac36166eae219ed38d8f897e84. * Revert "PLT-9070 - Missed some golden files" This reverts commit adc4c042738e6098b6e68e73bdba05c5c89a01ae. * Revert "PLT-9070 - Updated golden files" This reverts commit 4cea62dead46bc57a141ac51d6237399e0144c0d. * Revert "PLT-9070 - optparse-applicative-fork in marconi-chain-index-legacy" This reverts commit 3aa082bad0a020e3744d49d0f7ab4403ec28f885. * PLT-9070 - Remove dependency on optparse-applicative-fork * PLT-9070 - Removed unneeded language ext --- .../src/Marconi/Cardano/Core/Runner.hs | 66 ++++++++++++------- .../test-lib/Test/Integration.hs | 1 - 2 files changed, 42 insertions(+), 25 deletions(-) diff --git a/marconi-cardano-core/src/Marconi/Cardano/Core/Runner.hs b/marconi-cardano-core/src/Marconi/Cardano/Core/Runner.hs index 876ab74dd5..3d7182a3c3 100644 --- a/marconi-cardano-core/src/Marconi/Cardano/Core/Runner.hs +++ b/marconi-cardano-core/src/Marconi/Cardano/Core/Runner.hs @@ -12,6 +12,10 @@ module Marconi.Cardano.Core.Runner ( -- * Runner runIndexerOnChainSync, runIndexerOnSnapshot, + runEmitterAndConsumer, + + -- * Emitters + streamEmitter, -- ** Runner Config RunIndexerConfig (RunIndexerConfig), @@ -133,8 +137,9 @@ runIndexerOnChainSync runIndexerOnChainSync config indexer = do void $ runEmitterAndConsumer + (eventPreprocessing ^. runIndexerExtractTipDistance) + (eventPreprocessing ^. runIndexerExtractBlockNo) securityParam - eventPreprocessing (chainSyncEventEmitter config indexer) Core.CloseOn where @@ -155,9 +160,10 @@ runIndexerOnSnapshot -> IO (Concurrent.MVar (indexer event)) runIndexerOnSnapshot config indexer stream = runEmitterAndConsumer + (eventPreprocessing ^. runIndexerExtractTipDistance) + (eventPreprocessing ^. runIndexerExtractBlockNo) securityParam - eventPreprocessing - (streamBlockEventEmitter config indexer stream) + (streamEmitter (eventPreprocessing ^. runIndexerPreprocessEvent) securityParam indexer stream) Core.CloseOff where securityParam = Lens.view runIndexerOnSnapshotConfigSecurityParam config @@ -179,14 +185,18 @@ runEmitterAndConsumer , Core.IsIndex (ExceptT Core.IndexerError IO) event indexer , Core.Closeable IO indexer ) - => SecurityParam - -> RunIndexerEventPreprocessing rawEvent event + => (event -> Maybe Word) + -- ^ A tip extraction function + -> (event -> Maybe C.BlockNo) + -- ^ A block extraction function + -> SecurityParam -> IO (EventEmitter indexer event a) -> Core.CloseSwitch -> IO (Concurrent.MVar (indexer event)) runEmitterAndConsumer + tipExtractor + blockExtractor securityParam - eventPreprocessing eventEmitter closeSwitch = do @@ -197,7 +207,11 @@ runEmitterAndConsumer where consumer queue indexerMVar = do Core.processQueue - (stablePointComputation securityParam eventPreprocessing) + ( stablePointComputation + tipExtractor + blockExtractor + securityParam + ) Map.empty queue indexerMVar @@ -245,30 +259,31 @@ chainSyncEventEmitter instead of consuming the stream a caller will consume the created queue. The reason behind this is to provide the same interface as 'chainSyncEventEmitter'. -} -streamBlockEventEmitter +streamEmitter :: (Core.Point event ~ C.ChainPoint) - => RunIndexerOnSnapshotConfig BlockEvent event + => (pre -> [Core.ProcessedInput C.ChainPoint event]) + -- ^ A preprocessing function + -> SecurityParam -> indexer event - -> S.Stream (S.Of BlockEvent) IO () + -> S.Stream (S.Of pre) IO () -> IO (EventEmitter indexer event ()) -streamBlockEventEmitter config indexer stream = do +streamEmitter processEvent securityParam indexer stream = do queue <- STM.newTBQueueIO $ fromIntegral securityParam indexerMVar <- Concurrent.newMVar indexer - let processEvent = eventProcessing ^. runIndexerPreprocessEvent - emitEvents = mkEventStream processEvent queue stream + let emitEvents = mkEventStream processEvent queue stream pure EventEmitter{queue, indexerMVar, emitEvents} - where - securityParam = Lens.view runIndexerOnSnapshotConfigSecurityParam config - eventProcessing = Lens.view runIndexerOnSnapshotConfigEventProcessing config stablePointComputation - :: SecurityParam - -> RunIndexerEventPreprocessing rawEvent event + :: (event -> Maybe Word) + -- ^ A tip extraction function + -> (event -> Maybe C.BlockNo) + -- ^ A block extraction function + -> SecurityParam -> Core.Timed C.ChainPoint (Maybe event) -> State (Map C.BlockNo C.ChainPoint) (Maybe C.ChainPoint) -stablePointComputation securityParam preprocessing (Core.Timed point event) = do - let distanceM = preprocessing ^. runIndexerExtractTipDistance =<< event - blockNoM = preprocessing ^. runIndexerExtractBlockNo =<< event +stablePointComputation tipExtractor blockExtractor securityParam (Core.Timed point event) = do + let distanceM = tipExtractor =<< event + blockNoM = blockExtractor =<< event case (distanceM, blockNoM) of (Just distance, Just blockNo) -> if distance > fromIntegral securityParam @@ -290,8 +305,8 @@ getBlockNo (C.BlockInMode block _eraInMode) = -- | Event preprocessing, to ease the coordinator work mkEventStream - :: (inputEvent -> [Core.ProcessedInput (Core.Point inputEvent) outputEvent]) - -> STM.TBQueue (Core.ProcessedInput (Core.Point inputEvent) outputEvent) + :: (inputEvent -> [Core.ProcessedInput (Core.Point outputEvent) outputEvent]) + -> STM.TBQueue (Core.ProcessedInput (Core.Point outputEvent) outputEvent) -> S.Stream (S.Of inputEvent) IO r -> IO r mkEventStream processEvent q = @@ -321,7 +336,10 @@ withDistanceAndTipPreprocessor = getDistance _ = Nothing blockNoFromBlockEvent (TipAndBlock _ (Just event)) = Just . getBlockNo . blockInMode $ getEvent event blockNoFromBlockEvent _ = Nothing - in RunIndexerEventPreprocessing extractChainTipAndAddDistance blockNoFromBlockEvent getDistance + in RunIndexerEventPreprocessing + extractChainTipAndAddDistance + blockNoFromBlockEvent + getDistance withNoPreprocessor :: RunIndexerEventPreprocessing (ChainSyncEvent BlockEvent) BlockEvent withNoPreprocessor = diff --git a/marconi-cardano-indexers/test-lib/Test/Integration.hs b/marconi-cardano-indexers/test-lib/Test/Integration.hs index e3f0660ff7..63342a7bc7 100644 --- a/marconi-cardano-indexers/test-lib/Test/Integration.hs +++ b/marconi-cardano-indexers/test-lib/Test/Integration.hs @@ -93,7 +93,6 @@ unboundedValidityRange :: (C.TxValidityLowerBound C.BabbageEra, C.TxValidityUppe unboundedValidityRange = (C.TxValidityNoLowerBound, C.TxValidityNoUpperBound C.ValidityNoUpperBoundInBabbageEra) {- Transaction operations -} - validateAndSubmitTx :: (MonadIO m) => C.LocalNodeConnectInfo C.CardanoMode