module Network.LibP2P.Protocol.GossipSub.Heartbeat
( heartbeatOnce
, runHeartbeat
) where
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (Async, async)
import Control.Concurrent.STM
import Control.Monad (forM_, unless, when)
import qualified Data.Map.Strict as Map
import qualified Data.Set as Set
import Data.Time (UTCTime, addUTCTime, diffUTCTime)
import Data.Word (Word64)
import List.Shuffle (sampleIO)
import Network.LibP2P.Crypto.PeerId (PeerId)
import Network.LibP2P.Protocol.GossipSub.Types
import Network.LibP2P.Protocol.GossipSub.MessageCache (cacheGetGossipIds, cacheShift)
import Network.LibP2P.Protocol.GossipSub.Score (computeScore, decayPeerCounters)
-- | Run a single heartbeat pass over the router state.
--
-- The steps run in a fixed order: mesh maintenance, fanout maintenance,
-- gossip emission, score decay and seen-cache cleanup. Once they have all
-- completed, the router's heartbeat counter ('gsHeartbeatCount') is
-- incremented by one.
heartbeatOnce :: GossipSubRouter -> IO ()
heartbeatOnce router = do
  meshMaintenance router
  fanoutMaintenance router
  emitGossip router
  decayAllScores router
  cleanSeenCache router
  -- Strict modify so the counter does not accumulate thunks across beats.
  atomically $ modifyTVar' (gsHeartbeatCount router) (+ 1)
-- | Start the heartbeat loop on a background thread.
--
-- Returns the 'Async' handle of the spawned loop. The loop recurses
-- unconditionally, so it only stops if it is cancelled through this handle
-- or if one of its steps throws.
runHeartbeat :: GossipSubRouter -> IO (Async ())
runHeartbeat router = async (heartbeatLoop router)
-- | Endless heartbeat loop: sleep for one heartbeat interval, run one
-- heartbeat pass, repeat.
--
-- The interval is taken from 'paramHeartbeatInterval' (seconds) and converted
-- to the microseconds expected by 'threadDelay'. The delay happens /before/
-- each pass, so the first heartbeat fires one full interval after start.
heartbeatLoop :: GossipSubRouter -> IO ()
heartbeatLoop router = do
  threadDelay intervalUs
  heartbeatOnce router
  heartbeatLoop router
  where
    -- Seconds -> microseconds, rounded to the nearest whole microsecond.
    intervalUs = round (paramHeartbeatInterval (gsParams router) * 1000000) :: Int
-- | Maintain the mesh of every topic currently present in 'gsMesh'.
--
-- Works on a snapshot of the mesh map taken at the start of the pass and a
-- single timestamp from 'gsGetTime'. For each topic the peer set is threaded
-- through three stages, each receiving the previous stage's result:
--
--   1. 'pruneNegativeScore'  – drop peers whose score is below zero,
--   2. 'fillUndersubscribed' – graft new peers if the mesh is too small,
--   3. 'trimOversubscribed'  – prune peers if the mesh is too large.
meshMaintenance :: GossipSubRouter -> IO ()
meshMaintenance router = do
  now <- gsGetTime router
  meshSnapshot <- readTVarIO (gsMesh router)
  forM_ (Map.toList meshSnapshot) $ \(topic, meshPeers) -> do
    afterPrune <- pruneNegativeScore router topic meshPeers now
    afterFill <- fillUndersubscribed router topic afterPrune now
    trimOversubscribed router topic afterFill
-- | Remove mesh peers whose score is negative.
--
-- A peer in @meshPeers@ is considered negative when it has an entry in
-- 'gsPeers' and 'computeScore' for it is strictly below zero; peers without
-- an entry are left in the mesh. Every negative peer is sent a PRUNE for
-- @topic@ carrying the prune backoff in whole seconds, and a backoff expiry
-- of @now + paramPruneBackoff@ is recorded under @(peer, topic)@ in
-- 'gsBackoff'.
--
-- The topic's entry in 'gsMesh' is then replaced with the surviving peers,
-- which are also returned.
--
-- NOTE(review): the mesh entry is overwritten with a value derived from the
-- caller's @meshPeers@ argument, not from the current TVar contents; any
-- update another thread made to this topic's mesh in between would be
-- replaced — confirm the heartbeat is the only writer during a pass.
pruneNegativeScore :: GossipSubRouter -> Topic -> Set.Set PeerId -> UTCTime -> IO (Set.Set PeerId)
pruneNegativeScore router topic meshPeers now = do
  ipMap <- readTVarIO (gsIPPeerCount router)
  peers <- readTVarIO (gsPeers router)
  let hasNegativeScore pid =
        case Map.lookup pid peers of
          Nothing -> False
          Just ps -> computeScore scoreParams ps ipMap now < 0
      negatives = Set.filter hasNegativeScore meshPeers
  forM_ (Set.toList negatives) $ \pid -> do
    -- Send first, then record the backoff, one peer at a time.
    gsSendRPC router pid pruneRPC
    atomically $ modifyTVar' (gsBackoff router) $
      Map.insert (pid, topic) backoffExpiry
  let remaining = Set.difference meshPeers negatives
  atomically $ modifyTVar' (gsMesh router) $
    Map.insert topic remaining
  pure remaining
  where
    scoreParams = gsScoreParams router
    backoff = paramPruneBackoff (gsParams router)
    -- The wire format carries the backoff as whole seconds.
    backoffSecs = round backoff :: Word64
    backoffExpiry = addUTCTime backoff now
    -- Identical for every pruned peer, so it is built once.
    pruneRPC = emptyRPC
      { rpcControl = Just emptyControlMessage
          { ctrlPrune = [Prune topic [] (Just backoffSecs)] }
      }
-- | Graft additional peers when a topic's mesh has fewer than 'paramDlo'
-- members.
--
-- If the mesh already has at least 'paramDlo' peers it is returned unchanged
-- and nothing is written. Otherwise candidates are drawn from 'gsPeers': a
-- candidate must list @topic@ in 'psTopics', not already be in the mesh, not
-- be in backoff for @(peer, topic)@ at @now@, and have a non-negative
-- 'computeScore'. Up to @paramD - size mesh@ of them are picked with
-- 'sampleIO', each is sent a GRAFT for @topic@, and the enlarged mesh is
-- stored in 'gsMesh' and returned.
fillUndersubscribed :: GossipSubRouter -> Topic -> Set.Set PeerId -> UTCTime -> IO (Set.Set PeerId)
fillUndersubscribed router topic meshPeers now
  | Set.size meshPeers >= paramDlo params = pure meshPeers
  | otherwise = do
      peersMap <- readTVarIO (gsPeers router)
      backoffMap <- readTVarIO (gsBackoff router)
      ipMap <- readTVarIO (gsIPPeerCount router)
      let eligible =
            [ pid
            | (pid, ps) <- Map.toList peersMap
            , Set.member topic (psTopics ps)
            , not (Set.member pid meshPeers)
            , not (isInBackoff backoffMap pid topic now)
            , computeScore (gsScoreParams router) ps ipMap now >= 0
            ]
          -- Top up towards the target degree D, not merely to Dlo.
          needed = paramD params - Set.size meshPeers
      selected <- sampleIO (min needed (length eligible)) eligible
      forM_ selected $ \pid ->
        gsSendRPC router pid graftRPC
      let newMesh = Set.union meshPeers (Set.fromList selected)
      atomically $ modifyTVar' (gsMesh router) $
        Map.insert topic newMesh
      pure newMesh
  where
    params = gsParams router
    graftRPC = emptyRPC
      { rpcControl = Just emptyControlMessage { ctrlGraft = [Graft topic] } }
-- | Prune peers when a topic's mesh has more than 'paramDhi' members.
--
-- Does nothing unless @size meshPeers > paramDhi@. Otherwise 'paramD' peers
-- are picked from the mesh with 'sampleIO' and kept; every other peer is sent
-- a PRUNE for @topic@ carrying the prune backoff in whole seconds, and a
-- backoff expiry of @now + paramPruneBackoff@ is recorded under
-- @(peer, topic)@ in 'gsBackoff'. Finally the topic's entry in 'gsMesh' is
-- replaced by the kept set.
--
-- The timestamp is fetched from 'gsGetTime' here, only on the trimming path.
trimOversubscribed :: GossipSubRouter -> Topic -> Set.Set PeerId -> IO ()
trimOversubscribed router topic meshPeers =
  when (Set.size meshPeers > paramDhi params) $ do
    now <- gsGetTime router
    kept <- sampleIO (paramD params) (Set.toList meshPeers)
    let keptSet = Set.fromList kept
        toRemove = Set.difference meshPeers keptSet
        backoffExpiry = addUTCTime backoff now
    forM_ (Set.toList toRemove) $ \pid -> do
      -- Send first, then record the backoff, one peer at a time.
      gsSendRPC router pid pruneRPC
      atomically $ modifyTVar' (gsBackoff router) $
        Map.insert (pid, topic) backoffExpiry
    atomically $ modifyTVar' (gsMesh router) $
      Map.insert topic keptSet
  where
    params = gsParams router
    backoff = paramPruneBackoff params
    -- The wire format carries the backoff as whole seconds.
    backoffSecs = round backoff :: Word64
    -- Identical for every pruned peer, so it is built once.
    pruneRPC = emptyRPC
      { rpcControl = Just emptyControlMessage
          { ctrlPrune = [Prune topic [] (Just backoffSecs)] }
      }
-- | Expire stale fanout topics and top up the remaining ones.
--
-- Works on snapshots of 'gsFanout' and 'gsFanoutPub' taken at the start.
-- For each fanout topic:
--
--   * if more than 'paramFanoutTTL' has elapsed since the topic's last
--     publish time, the topic is removed from both 'gsFanout' and
--     'gsFanoutPub' in one transaction;
--   * otherwise, if it has fewer than 'paramD' peers, additional peers that
--     list the topic in 'psTopics' and are not yet fanout peers are picked
--     with 'sampleIO' and added to the topic's 'gsFanout' entry.
--
-- NOTE(review): a topic with no entry in 'gsFanoutPub' defaults its last
-- publish time to @now@, so its elapsed time is zero and it is never expired
-- by this pass — confirm that is intended.
fanoutMaintenance :: GossipSubRouter -> IO ()
fanoutMaintenance router = do
  now <- gsGetTime router
  fanoutMap <- readTVarIO (gsFanout router)
  fanoutPubMap <- readTVarIO (gsFanoutPub router)
  forM_ (Map.toList fanoutMap) $ \(topic, fanoutPeers) -> do
    let lastPub = Map.findWithDefault now topic fanoutPubMap
    if diffUTCTime now lastPub > ttl
      then expireTopic topic
      else when (Set.size fanoutPeers < d) $ do
        peersMap <- readTVarIO (gsPeers router)
        let eligible =
              [ pid
              | (pid, ps) <- Map.toList peersMap
              , Set.member topic (psTopics ps)
              , not (Set.member pid fanoutPeers)
              ]
            needed = d - Set.size fanoutPeers
        selected <- sampleIO (min needed (length eligible)) eligible
        let newFanout = Set.union fanoutPeers (Set.fromList selected)
        atomically $ modifyTVar' (gsFanout router) $
          Map.insert topic newFanout
  where
    params = gsParams router
    ttl = paramFanoutTTL params
    d = paramD params
    -- Drop the topic's peer set and its publish timestamp together.
    expireTopic topic = atomically $ do
      modifyTVar' (gsFanout router) (Map.delete topic)
      modifyTVar' (gsFanoutPub router) (Map.delete topic)
-- | Send IHAVE gossip for every mesh topic, then shift the message cache.
--
-- Works on snapshots of 'gsMesh', 'gsMessageCache' and 'gsPeers'. For each
-- topic whose 'cacheGetGossipIds' result is non-empty, the candidates are the
-- known peers that list the topic in 'psTopics' and are not in its mesh. The
-- number of targets is
-- @max paramDlazy (ceiling (paramGossipFactor * candidates))@, capped at the
-- number of candidates; targets are picked with 'sampleIO' and each receives
-- one IHAVE carrying all gossip ids for the topic.
--
-- 'cacheShift' is applied to the message cache once at the end, whether or
-- not any gossip was sent.
emitGossip :: GossipSubRouter -> IO ()
emitGossip router = do
  meshMap <- readTVarIO (gsMesh router)
  cache <- readTVarIO (gsMessageCache router)
  peersMap <- readTVarIO (gsPeers router)
  forM_ (Map.toList meshMap) $ \(topic, meshPeers) -> do
    let gossipIds = cacheGetGossipIds topic cache
    unless (null gossipIds) $ do
      let nonMeshPeers =
            [ pid
            | (pid, ps) <- Map.toList peersMap
            , Set.member topic (psTopics ps)
            , not (Set.member pid meshPeers)
            ]
          candidateCount = length nonMeshPeers
          scaled = ceiling (factor * fromIntegral candidateCount) :: Int
          targetCount = max dlazy scaled
          ihaveRPC = emptyRPC
            { rpcControl = Just emptyControlMessage
                { ctrlIHave = [IHave topic gossipIds] }
            }
      targets <- sampleIO (min targetCount candidateCount) nonMeshPeers
      forM_ targets $ \pid ->
        gsSendRPC router pid ihaveRPC
  atomically $ modifyTVar' (gsMessageCache router) cacheShift
  where
    params = gsParams router
    dlazy = paramDlazy params
    factor = paramGossipFactor params
-- | Apply 'decayPeerCounters' (with the router's score parameters) to the
-- state of every peer in 'gsPeers', in a single transaction.
decayAllScores :: GossipSubRouter -> IO ()
decayAllScores router =
  atomically $ modifyTVar' (gsPeers router) (Map.map decayOne)
  where
    decayOne = decayPeerCounters (gsScoreParams router)
-- | Drop expired entries from the seen cache ('gsSeen').
--
-- An entry is kept when the time elapsed between its timestamp and the
-- current time from 'gsGetTime' is at most 'paramSeenTTL'; an entry exactly
-- at the TTL is still retained.
cleanSeenCache :: GossipSubRouter -> IO ()
cleanSeenCache router = do
  now <- gsGetTime router
  let ttl = paramSeenTTL (gsParams router)
      isFresh ts = diffUTCTime now ts <= ttl
  atomically $ modifyTVar' (gsSeen router) (Map.filter isFresh)
-- | Whether @(pid, topic)@ is still in backoff at time @now@.
--
-- 'True' only when the backoff map holds an expiry for the pair and @now@ is
-- strictly before it; a missing entry, or an expiry at or before @now@,
-- yields 'False'.
isInBackoff :: Map.Map (PeerId, Topic) UTCTime -> PeerId -> Topic -> UTCTime -> Bool
isInBackoff backoffMap pid topic now =
  maybe False (now <) (Map.lookup (pid, topic) backoffMap)