-- | GossipSub heartbeat procedure (docs/11-pubsub.md).
--
-- The heartbeat runs periodically and performs:
-- 1. Mesh maintenance: prune negative-score, fill undersubscribed, trim oversubscribed
-- 2. Fanout maintenance: expire old, fill undersubscribed
-- 3. Gossip emission: send IHAVE to non-mesh peers, rotate cache
-- 4. Score decay: decay all counters for all peers
-- 5. Seen cache cleanup: remove expired entries
-- 6. Heartbeat counter increment (for opportunistic graft timing)
module Network.LibP2P.Protocol.GossipSub.Heartbeat
  ( heartbeatOnce
  , runHeartbeat
  ) where

import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (Async, async)
import Control.Concurrent.STM
import Control.Monad (forM_, unless, when)
import qualified Data.Map.Strict as Map
import qualified Data.Set as Set
import Data.Time (UTCTime, addUTCTime, diffUTCTime)
import Data.Word (Word64)
import List.Shuffle (sampleIO)
import Network.LibP2P.Crypto.PeerId (PeerId)
import Network.LibP2P.Protocol.GossipSub.Types
import Network.LibP2P.Protocol.GossipSub.MessageCache (cacheGetGossipIds, cacheShift)
import Network.LibP2P.Protocol.GossipSub.Score (computeScore, decayPeerCounters)

-- | Run a single heartbeat cycle. Exported for testing.
--
-- The phases run sequentially in this order: mesh maintenance, fanout
-- maintenance, gossip emission, score decay and seen-cache cleanup.
-- Afterwards the counter held in 'gsHeartbeatCount' is incremented by one.
heartbeatOnce :: GossipSubRouter -> IO ()
heartbeatOnce router = do
  meshMaintenance router
  fanoutMaintenance router
  emitGossip router
  decayAllScores router
  cleanSeenCache router
  -- Increment heartbeat counter
  atomically $ modifyTVar' (gsHeartbeatCount router) (+ 1)

-- | Start the heartbeat background thread.
--
-- Spawns 'heartbeatLoop' with 'async' and returns the resulting handle.
-- The loop itself never returns normally, so the handle only completes if
-- the thread is cancelled or an exception escapes the loop.
runHeartbeat :: GossipSubRouter -> IO (Async ())
runHeartbeat router = async (heartbeatLoop router)

-- | Sleep for one heartbeat interval, run 'heartbeatOnce', and repeat forever.
--
-- The delay is applied before every cycle, so the first heartbeat fires one
-- interval after the loop starts. The time spent inside 'heartbeatOnce' is
-- not subtracted from the following delay.
heartbeatLoop :: GossipSubRouter -> IO ()
heartbeatLoop router = loop
  where
    -- 'paramHeartbeatInterval' scaled by 10^6 to get the microsecond count
    -- that 'threadDelay' expects. It depends only on the router's parameter
    -- record, so it is computed once rather than on every iteration.
    intervalUs :: Int
    intervalUs = round (paramHeartbeatInterval (gsParams router) * 1000000)

    loop :: IO ()
    loop = do
      threadDelay intervalUs
      heartbeatOnce router
      loop

-- Mesh maintenance

-- | Run the three mesh maintenance steps for every topic currently in the mesh.
--
-- The mesh is read once up front; each topic's peer set from that snapshot
-- is threaded through prune -> fill -> trim, each step receiving the set
-- produced by the previous one. A single timestamp is shared by all topics.
meshMaintenance :: GossipSubRouter -> IO ()
meshMaintenance router = do
  now <- gsGetTime router
  meshMap <- readTVarIO (gsMesh router)
  forM_ (Map.toList meshMap) $ \(topic, meshPeers) -> do
    -- Step 1: Remove negative-score peers
    remaining <- pruneNegativeScore router topic meshPeers now
    -- Step 2: Fill if undersubscribed (< D_lo)
    filled <- fillUndersubscribed router topic remaining now
    -- Step 3: Trim if oversubscribed (> D_hi)
    trimOversubscribed router topic filled

-- | Remove peers with negative score from mesh, send PRUNE.
--
-- For every peer in @meshPeers@ whose 'computeScore' is below zero this
-- sends a PRUNE for @topic@ (carrying the prune backoff in whole seconds)
-- and records a backoff entry expiring at @now + paramPruneBackoff@.
-- The topic's mesh entry is then overwritten with the surviving peers,
-- which are also returned.
--
-- Note that the mesh entry is written from the @meshPeers@ snapshot passed
-- in by the caller, not from the current contents of 'gsMesh'.
pruneNegativeScore :: GossipSubRouter -> Topic -> Set.Set PeerId -> UTCTime -> IO (Set.Set PeerId)
pruneNegativeScore router topic meshPeers now = do
  ipMap <- readTVarIO (gsIPPeerCount router)
  peers <- readTVarIO (gsPeers router)
  let scoreParams = gsScoreParams router
      backoff = paramPruneBackoff (gsParams router)
      backoffSecs = round backoff :: Word64
      -- A mesh peer with no entry in the peer table is never counted as
      -- negative, so it stays in the mesh.
      isNegative pid =
        case Map.lookup pid peers of
          Nothing -> False
          Just ps -> computeScore scoreParams ps ipMap now < 0
      negatives = Set.filter isNegative meshPeers
      remaining = Set.difference meshPeers negatives
      -- The PRUNE message is identical for every pruned peer of this topic.
      pruneRPC =
        emptyRPC
          { rpcControl =
              Just emptyControlMessage
                { ctrlPrune = [Prune topic [] (Just backoffSecs)] }
          }
  -- Send PRUNE to negative-score peers
  forM_ (Set.toList negatives) $ \pid -> do
    gsSendRPC router pid pruneRPC
    -- Start backoff
    atomically $ modifyTVar' (gsBackoff router) $
      Map.insert (pid, topic) (addUTCTime backoff now)
  -- Update mesh
  atomically $ modifyTVar' (gsMesh router) $
    Map.insert topic remaining
  pure remaining

-- | Fill mesh if below D_lo with eligible peers (non-negative score, no backoff).
--
-- When @meshPeers@ already has at least 'paramDlo' members it is returned
-- unchanged and nothing is sent or written. Otherwise up to
-- @paramD - size meshPeers@ peers are sampled at random from the eligible
-- candidates, each is sent a GRAFT for @topic@, the topic's mesh entry is
-- overwritten with the enlarged set, and that set is returned.
--
-- The mesh entry is written from the @meshPeers@ argument, not from the
-- current contents of 'gsMesh'.
fillUndersubscribed :: GossipSubRouter -> Topic -> Set.Set PeerId -> UTCTime -> IO (Set.Set PeerId)
fillUndersubscribed router topic meshPeers now
  | Set.size meshPeers >= paramDlo params = pure meshPeers
  | otherwise = do
      -- Find eligible peers: subscribed to topic, not in mesh, not in backoff, score >= 0
      peersMap <- readTVarIO (gsPeers router)
      backoffMap <- readTVarIO (gsBackoff router)
      ipMap <- readTVarIO (gsIPPeerCount router)
      let eligible =
            [ pid
            | (pid, ps) <- Map.toList peersMap
            , Set.member topic (psTopics ps)
            , not (Set.member pid meshPeers)
            , not (isInBackoff backoffMap pid topic now)
            , computeScore (gsScoreParams router) ps ipMap now >= 0
            ]
          needed = paramD params - Set.size meshPeers
          -- Clamped at zero: 'needed' is only negative if D is configured
          -- below D_lo, and a negative sample size must not reach 'sampleIO'.
          sampleSize = max 0 (min needed (length eligible))
      selected <- sampleIO sampleSize eligible
      -- Send GRAFT and add to mesh
      forM_ selected $ \pid ->
        gsSendRPC router pid emptyRPC
          { rpcControl = Just emptyControlMessage { ctrlGraft = [Graft topic] } }
      let newMesh = Set.union meshPeers (Set.fromList selected)
      atomically $ modifyTVar' (gsMesh router) $
        Map.insert topic newMesh
      pure newMesh
  where
    params = gsParams router

-- | Trim mesh if above D_hi by randomly removing excess peers, send PRUNE.
--
-- Does nothing unless @meshPeers@ has more than 'paramDhi' members. When it
-- does, 'paramD' peers are sampled at random to keep; every other peer is
-- sent a PRUNE for @topic@ and given a backoff entry expiring
-- 'paramPruneBackoff' after the current time. The topic's mesh entry is
-- then overwritten with the kept peers.
--
-- The selection is purely random; peer scores are not consulted here.
trimOversubscribed :: GossipSubRouter -> Topic -> Set.Set PeerId -> IO ()
trimOversubscribed router topic meshPeers =
  when (Set.size meshPeers > paramDhi params) $ do
    now <- gsGetTime router
    -- Keep D peers: select D random peers to keep
    kept <- sampleIO (paramD params) (Set.toList meshPeers)
    let keptSet = Set.fromList kept
        toRemove = Set.difference meshPeers keptSet
        backoff = paramPruneBackoff params
        backoffSecs = round backoff :: Word64
        -- The PRUNE message is identical for every removed peer.
        pruneRPC =
          emptyRPC
            { rpcControl =
                Just emptyControlMessage
                  { ctrlPrune = [Prune topic [] (Just backoffSecs)] }
            }
    -- Send PRUNE to removed peers
    forM_ (Set.toList toRemove) $ \pid -> do
      gsSendRPC router pid pruneRPC
      atomically $ modifyTVar' (gsBackoff router) $
        Map.insert (pid, topic) (addUTCTime backoff now)
    -- Update mesh
    atomically $ modifyTVar' (gsMesh router) $
      Map.insert topic keptSet
  where
    params = gsParams router

-- Fanout maintenance

-- | Expire stale fanout topics and top up the remaining ones.
--
-- For each topic in 'gsFanout':
--
-- * if more than 'paramFanoutTTL' has elapsed since the topic's timestamp in
--   'gsFanoutPub', the topic is deleted from both 'gsFanout' and
--   'gsFanoutPub' in one transaction;
-- * otherwise, if it has fewer than 'paramD' peers, random peers that are
--   subscribed to the topic and not already in its fanout set are added
--   until it reaches D or the candidates run out.
--
-- A topic with no entry in 'gsFanoutPub' is treated as published "now", so
-- it is never expired by this pass.
fanoutMaintenance :: GossipSubRouter -> IO ()
fanoutMaintenance router = do
  now <- gsGetTime router
  let ttl = paramFanoutTTL (gsParams router)
      d = paramD (gsParams router)
  fanoutMap <- readTVarIO (gsFanout router)
  fanoutPubMap <- readTVarIO (gsFanoutPub router)
  forM_ (Map.toList fanoutMap) $ \(topic, fanoutPeers) -> do
    let lastPub = Map.findWithDefault now topic fanoutPubMap
    if diffUTCTime now lastPub > ttl
      then
        -- Expire fanout entry
        atomically $ do
          modifyTVar' (gsFanout router) (Map.delete topic)
          modifyTVar' (gsFanoutPub router) (Map.delete topic)
      else
        -- Fill if below D
        when (Set.size fanoutPeers < d) $ do
          peersMap <- readTVarIO (gsPeers router)
          let eligible =
                [ pid
                | (pid, ps) <- Map.toList peersMap
                , Set.member topic (psTopics ps)
                , not (Set.member pid fanoutPeers)
                ]
              needed = d - Set.size fanoutPeers
          selected <- sampleIO (min needed (length eligible)) eligible
          let newFanout = Set.union fanoutPeers (Set.fromList selected)
          atomically $ modifyTVar' (gsFanout router) $
            Map.insert topic newFanout

-- Gossip emission

-- | Send IHAVE gossip for every mesh topic, then rotate the message cache.
--
-- For each topic in 'gsMesh' that has gossip message ids in the cache, a
-- random subset of peers that are subscribed to the topic but not in its
-- mesh receives an IHAVE listing those ids. The subset size is
-- @max D_lazy (ceiling (gossipFactor * candidates))@, capped at the number
-- of candidates. Only topics present in the mesh are gossiped here.
--
-- The cache is shifted once at the end, regardless of whether any gossip
-- was sent.
emitGossip :: GossipSubRouter -> IO ()
emitGossip router = do
  meshMap <- readTVarIO (gsMesh router)
  cache <- readTVarIO (gsMessageCache router)
  peersMap <- readTVarIO (gsPeers router)
  let params = gsParams router
      dlazy = paramDlazy params
      factor = paramGossipFactor params

  -- For each topic in mesh, send IHAVE to non-mesh peers
  forM_ (Map.toList meshMap) $ \(topic, meshPeers) -> do
    let gossipIds = cacheGetGossipIds topic cache
    unless (null gossipIds) $ do
      let -- Eligible: subscribed to topic, not in mesh
          nonMeshPeers =
            [ pid
            | (pid, ps) <- Map.toList peersMap
            , Set.member topic (psTopics ps)
            , not (Set.member pid meshPeers)
            ]
          candidateCount = length nonMeshPeers
          -- Select max(D_lazy, |eligible| * gossipFactor) targets
          targetCount :: Int
          targetCount = max dlazy (ceiling (factor * fromIntegral candidateCount))
      targets <- sampleIO (min targetCount candidateCount) nonMeshPeers
      forM_ targets $ \pid ->
        gsSendRPC router pid emptyRPC
          { rpcControl = Just emptyControlMessage
              { ctrlIHave = [IHave topic gossipIds] }
          }

  -- Rotate cache
  atomically $ modifyTVar' (gsMessageCache router) cacheShift

-- Score decay

-- | Apply 'decayPeerCounters' to every peer in 'gsPeers'.
--
-- The whole peer table is updated in a single STM transaction using the
-- router's 'gsScoreParams'.
decayAllScores :: GossipSubRouter -> IO ()
decayAllScores router =
  atomically $ modifyTVar' (gsPeers router) $
    Map.map (decayPeerCounters (gsScoreParams router))

-- Seen cache cleanup

-- | Drop expired entries from the seen cache.
--
-- An entry in 'gsSeen' is kept only while the time elapsed since its
-- timestamp is at most 'paramSeenTTL'; anything older is removed.
cleanSeenCache :: GossipSubRouter -> IO ()
cleanSeenCache router = do
  now <- gsGetTime router
  let ttl = paramSeenTTL (gsParams router)
      isFresh ts = diffUTCTime now ts <= ttl
  atomically $ modifyTVar' (gsSeen router) (Map.filter isFresh)

-- Helpers

-- | Whether @(pid, topic)@ is still inside its backoff window at @now@.
--
-- Returns 'False' when there is no backoff entry for the pair, and
-- otherwise 'True' exactly when @now@ is strictly before the recorded
-- expiry time (an entry expiring at @now@ counts as no longer in backoff).
isInBackoff :: Map.Map (PeerId, Topic) UTCTime -> PeerId -> Topic -> UTCTime -> Bool
isInBackoff backoffMap pid topic now =
  maybe False (now <) (Map.lookup (pid, topic) backoffMap)