module Network.LibP2P.Protocol.GossipSub.Router
(
newRouter
, addPeer
, removePeer
, join
, leave
, publish
, handleRPC
, handleGraft
, handlePrune
, handleIHave
, handleIWant
, handleSubscriptions
, forwardMessage
, peerScore
) where
import Prelude
import Control.Monad (unless)
import Control.Concurrent.STM
import Data.ByteString (ByteString)
import qualified Data.Map.Strict as Map
import qualified Data.Set as Set
import Data.Time (UTCTime, addUTCTime)
import Data.Word (Word64)
import Crypto.Random (getRandomBytes)
import List.Shuffle (sampleIO)
import Network.LibP2P.Crypto.PeerId (PeerId, peerIdBytes)
import Network.LibP2P.Crypto.Key (KeyPair (..), sign)
import Network.LibP2P.Crypto.Protobuf (encodePublicKey)
import Network.LibP2P.Protocol.GossipSub.Types
import Network.LibP2P.Protocol.GossipSub.Message (encodePubSubMessageBS)
import Network.LibP2P.Protocol.GossipSub.MessageCache (newMessageCache, cachePut, cacheGet)
import Network.LibP2P.Protocol.GossipSub.Score (computeScore, addP7Penalty, recordMeshFailure)
newRouter :: GossipSubParams
-> PeerId
-> (PeerId -> RPC -> IO ())
-> IO UTCTime
-> IO GossipSubRouter
newRouter :: GossipSubParams
-> PeerId
-> (PeerId -> RPC -> IO ())
-> IO UTCTime
-> IO GossipSubRouter
newRouter GossipSubParams
params PeerId
localPid PeerId -> RPC -> IO ()
sendRPC IO UTCTime
getTime = do
mesh <- Map Topic (Set PeerId) -> IO (TVar (Map Topic (Set PeerId)))
forall a. a -> IO (TVar a)
newTVarIO Map Topic (Set PeerId)
forall k a. Map k a
Map.empty
fanout <- newTVarIO Map.empty
fanoutPub <- newTVarIO Map.empty
peers <- newTVarIO Map.empty
seen <- newTVarIO Map.empty
backoff <- newTVarIO Map.empty
ipCount <- newTVarIO Map.empty
mcache <- newTVarIO (newMessageCache (paramMcacheLen params) (paramMcacheGossip params))
hbCount <- newTVarIO 0
onMsg <- newTVarIO (\Topic
_ PubSubMessage
_ -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ())
pure GossipSubRouter
{ gsParams = params
, gsLocalPeerId = localPid
, gsMesh = mesh
, gsFanout = fanout
, gsFanoutPub = fanoutPub
, gsPeers = peers
, gsSeen = seen
, gsBackoff = backoff
, gsScoreParams = defaultPeerScoreParams
, gsThresholds = defaultScoreThresholds
, gsIPPeerCount = ipCount
, gsMessageCache = mcache
, gsHeartbeatCount = hbCount
, gsSendRPC = sendRPC
, gsGetTime = getTime
, gsOnMessage = onMsg
}
addPeer :: GossipSubRouter -> PeerId -> PeerProtocol -> Bool -> UTCTime -> IO ()
addPeer :: GossipSubRouter
-> PeerId -> PeerProtocol -> Bool -> UTCTime -> IO ()
addPeer GossipSubRouter
router PeerId
pid PeerProtocol
proto Bool
isOutbound UTCTime
now = STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$
TVar (Map PeerId PeerState)
-> (Map PeerId PeerState -> Map PeerId PeerState) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' (GossipSubRouter -> TVar (Map PeerId PeerState)
gsPeers GossipSubRouter
router) ((Map PeerId PeerState -> Map PeerId PeerState) -> STM ())
-> (Map PeerId PeerState -> Map PeerId PeerState) -> STM ()
forall a b. (a -> b) -> a -> b
$ \Map PeerId PeerState
m ->
case PeerId -> Map PeerId PeerState -> Maybe PeerState
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup PeerId
pid Map PeerId PeerState
m of
Just PeerState
_existing -> Map PeerId PeerState
m
Maybe PeerState
Nothing -> PeerId -> PeerState -> Map PeerId PeerState -> Map PeerId PeerState
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert PeerId
pid PeerState
{ psProtocol :: PeerProtocol
psProtocol = PeerProtocol
proto
, psTopics :: Set Topic
psTopics = Set Topic
forall a. Set a
Set.empty
, psIsOutbound :: Bool
psIsOutbound = Bool
isOutbound
, psConnectedAt :: UTCTime
psConnectedAt = UTCTime
now
, psTopicState :: Map Topic TopicPeerState
psTopicState = Map Topic TopicPeerState
forall k a. Map k a
Map.empty
, psBehaviorPenalty :: Double
psBehaviorPenalty = Double
0
, psIPAddress :: Maybe ByteString
psIPAddress = Maybe ByteString
forall a. Maybe a
Nothing
, psCachedScore :: Double
psCachedScore = Double
0
} Map PeerId PeerState
m
removePeer :: GossipSubRouter -> PeerId -> IO ()
removePeer :: GossipSubRouter -> PeerId -> IO ()
removePeer GossipSubRouter
router PeerId
pid = STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
TVar (Map PeerId PeerState)
-> (Map PeerId PeerState -> Map PeerId PeerState) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' (GossipSubRouter -> TVar (Map PeerId PeerState)
gsPeers GossipSubRouter
router) (PeerId -> Map PeerId PeerState -> Map PeerId PeerState
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete PeerId
pid)
TVar (Map Topic (Set PeerId))
-> (Map Topic (Set PeerId) -> Map Topic (Set PeerId)) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' (GossipSubRouter -> TVar (Map Topic (Set PeerId))
gsMesh GossipSubRouter
router) ((Set PeerId -> Set PeerId)
-> Map Topic (Set PeerId) -> Map Topic (Set PeerId)
forall a b k. (a -> b) -> Map k a -> Map k b
Map.map (PeerId -> Set PeerId -> Set PeerId
forall a. Ord a => a -> Set a -> Set a
Set.delete PeerId
pid))
TVar (Map Topic (Set PeerId))
-> (Map Topic (Set PeerId) -> Map Topic (Set PeerId)) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' (GossipSubRouter -> TVar (Map Topic (Set PeerId))
gsFanout GossipSubRouter
router) ((Set PeerId -> Set PeerId)
-> Map Topic (Set PeerId) -> Map Topic (Set PeerId)
forall a b k. (a -> b) -> Map k a -> Map k b
Map.map (PeerId -> Set PeerId -> Set PeerId
forall a. Ord a => a -> Set a -> Set a
Set.delete PeerId
pid))
join :: GossipSubRouter -> Topic -> IO ()
join :: GossipSubRouter -> Topic -> IO ()
join GossipSubRouter
router Topic
topic = do
peers <- TVar (Map PeerId PeerState) -> IO (Map PeerId PeerState)
forall a. TVar a -> IO a
readTVarIO (GossipSubRouter -> TVar (Map PeerId PeerState)
gsPeers GossipSubRouter
router)
let allPeerIds = Map PeerId PeerState -> [PeerId]
forall k a. Map k a -> [k]
Map.keys Map PeerId PeerState
peers
subRPC = RPC
emptyRPC { rpcSubscriptions = [SubOpts True topic] }
mapM_ (\PeerId
pid -> GossipSubRouter -> PeerId -> RPC -> IO ()
gsSendRPC GossipSubRouter
router PeerId
pid RPC
subRPC) allPeerIds
(fanoutPeers, topicPeers) <- atomically $ do
fo <- readTVar (gsFanout router)
let foPeers = Set PeerId -> Topic -> Map Topic (Set PeerId) -> Set PeerId
forall k a. Ord k => a -> k -> Map k a -> a
Map.findWithDefault Set PeerId
forall a. Set a
Set.empty Topic
topic Map Topic (Set PeerId)
fo
unless (Set.null foPeers) $ do
modifyTVar' (gsMesh router) (Map.insert topic foPeers)
modifyTVar' (gsFanout router) (Map.delete topic)
modifyTVar' (gsFanoutPub router) (Map.delete topic)
meshNow <- readTVar (gsMesh router)
let currentMesh = Set PeerId -> Topic -> Map Topic (Set PeerId) -> Set PeerId
forall k a. Ord k => a -> k -> Map k a -> a
Map.findWithDefault Set PeerId
forall a. Set a
Set.empty Topic
topic Map Topic (Set PeerId)
meshNow
peerMap <- readTVar (gsPeers router)
let eligible = (Set PeerId -> PeerId -> PeerState -> Set PeerId)
-> Set PeerId -> Map PeerId PeerState -> Set PeerId
forall a k b. (a -> k -> b -> a) -> a -> Map k b -> a
Map.foldlWithKey' (\Set PeerId
acc PeerId
pid PeerState
ps ->
if Topic -> Set Topic -> Bool
forall a. Ord a => a -> Set a -> Bool
Set.member Topic
topic (PeerState -> Set Topic
psTopics PeerState
ps)
Bool -> Bool -> Bool
&& Bool -> Bool
not (PeerId -> Set PeerId -> Bool
forall a. Ord a => a -> Set a -> Bool
Set.member PeerId
pid Set PeerId
currentMesh)
Bool -> Bool -> Bool
&& PeerId
pid PeerId -> PeerId -> Bool
forall a. Eq a => a -> a -> Bool
/= GossipSubRouter -> PeerId
gsLocalPeerId GossipSubRouter
router
then PeerId -> Set PeerId -> Set PeerId
forall a. Ord a => a -> Set a -> Set a
Set.insert PeerId
pid Set PeerId
acc
else Set PeerId
acc) Set PeerId
forall a. Set a
Set.empty Map PeerId PeerState
peerMap
pure (foPeers, eligible)
currentMesh <- atomically $ do
m <- readTVar (gsMesh router)
pure (Map.findWithDefault Set.empty topic m)
let needed = GossipSubParams -> Int
paramD (GossipSubRouter -> GossipSubParams
gsParams GossipSubRouter
router) Int -> Int -> Int
forall a. Num a => a -> a -> a
- Set PeerId -> Int
forall a. Set a -> Int
Set.size Set PeerId
currentMesh
newPeers <- if needed > 0 && not (Set.null topicPeers)
then do
selected <- sampleIO (min needed (Set.size topicPeers)) (Set.toList topicPeers)
let newSet = [PeerId] -> Set PeerId
forall a. Ord a => [a] -> Set a
Set.fromList [PeerId]
selected
atomically $ modifyTVar' (gsMesh router) $
Map.insertWith Set.union topic newSet
pure newSet
else pure Set.empty
let allNewMeshPeers = Set PeerId -> Set PeerId -> Set PeerId
forall a. Ord a => Set a -> Set a -> Set a
Set.union (Set PeerId -> Set PeerId -> Set PeerId
forall a. Ord a => Set a -> Set a -> Set a
Set.difference Set PeerId
fanoutPeers Set PeerId
currentMesh) Set PeerId
newPeers
mapM_ (\PeerId
pid -> GossipSubRouter -> PeerId -> RPC -> IO ()
gsSendRPC GossipSubRouter
router PeerId
pid (Topic -> RPC
graftRPC Topic
topic)) (Set.toList allNewMeshPeers)
leave :: GossipSubRouter -> Topic -> IO ()
leave :: GossipSubRouter -> Topic -> IO ()
leave GossipSubRouter
router Topic
topic = do
peers <- TVar (Map PeerId PeerState) -> IO (Map PeerId PeerState)
forall a. TVar a -> IO a
readTVarIO (GossipSubRouter -> TVar (Map PeerId PeerState)
gsPeers GossipSubRouter
router)
let allPeerIds = Map PeerId PeerState -> [PeerId]
forall k a. Map k a -> [k]
Map.keys Map PeerId PeerState
peers
unsubRPC = RPC
emptyRPC { rpcSubscriptions = [SubOpts False topic] }
mapM_ (\PeerId
pid -> GossipSubRouter -> PeerId -> RPC -> IO ()
gsSendRPC GossipSubRouter
router PeerId
pid RPC
unsubRPC) allPeerIds
meshPeers <- atomically $ do
m <- readTVar (gsMesh router)
let mp = Set PeerId -> Topic -> Map Topic (Set PeerId) -> Set PeerId
forall k a. Ord k => a -> k -> Map k a -> a
Map.findWithDefault Set PeerId
forall a. Set a
Set.empty Topic
topic Map Topic (Set PeerId)
m
modifyTVar' (gsMesh router) (Map.delete topic)
pure mp
let backoffSecs = NominalDiffTime -> Word64
forall b. Integral b => NominalDiffTime -> b
forall a b. (RealFrac a, Integral b) => a -> b
round (GossipSubParams -> NominalDiffTime
paramUnsubBackoff (GossipSubRouter -> GossipSubParams
gsParams GossipSubRouter
router)) :: Word64
mapM_ (\PeerId
pid -> GossipSubRouter -> PeerId -> RPC -> IO ()
gsSendRPC GossipSubRouter
router PeerId
pid (Topic -> [PeerExchangeInfo] -> Maybe Word64 -> RPC
pruneRPC Topic
topic [] (Word64 -> Maybe Word64
forall a. a -> Maybe a
Just Word64
backoffSecs))) (Set.toList meshPeers)
publish :: GossipSubRouter -> Topic -> ByteString -> Maybe KeyPair -> IO ()
publish :: GossipSubRouter -> Topic -> ByteString -> Maybe KeyPair -> IO ()
publish GossipSubRouter
router Topic
topic ByteString
payload Maybe KeyPair
mKeyPair = do
now <- GossipSubRouter -> IO UTCTime
gsGetTime GossipSubRouter
router
msg <- case paramSignaturePolicy (gsParams router) of
SignaturePolicy
StrictSign -> case Maybe KeyPair
mKeyPair of
Maybe KeyPair
Nothing -> PubSubMessage -> IO PubSubMessage
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (PubSubMessage -> IO PubSubMessage)
-> PubSubMessage -> IO PubSubMessage
forall a b. (a -> b) -> a -> b
$ Topic -> ByteString -> PubSubMessage
mkUnsignedMessage Topic
topic ByteString
payload
Just KeyPair
kp -> GossipSubRouter
-> Topic -> ByteString -> KeyPair -> IO PubSubMessage
mkSignedMessage GossipSubRouter
router Topic
topic ByteString
payload KeyPair
kp
SignaturePolicy
StrictNoSign -> PubSubMessage -> IO PubSubMessage
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (PubSubMessage -> IO PubSubMessage)
-> PubSubMessage -> IO PubSubMessage
forall a b. (a -> b) -> a -> b
$ Topic -> ByteString -> PubSubMessage
mkUnsignedMessage Topic
topic ByteString
payload
let msgId = GossipSubParams -> PubSubMessage -> ByteString
paramMessageIdFn (GossipSubRouter -> GossipSubParams
gsParams GossipSubRouter
router) PubSubMessage
msg
atomically $ modifyTVar' (gsSeen router) (Map.insert msgId now)
let pubRPC = RPC
emptyRPC { rpcPublish = [msg] }
if paramFloodPublish (gsParams router)
then do
peers <- readTVarIO (gsPeers router)
let targets = ([PeerId] -> PeerId -> PeerState -> [PeerId])
-> [PeerId] -> Map PeerId PeerState -> [PeerId]
forall a k b. (a -> k -> b -> a) -> a -> Map k b -> a
Map.foldlWithKey' (\[PeerId]
acc PeerId
pid PeerState
ps ->
if Topic -> Set Topic -> Bool
forall a. Ord a => a -> Set a -> Bool
Set.member Topic
topic (PeerState -> Set Topic
psTopics PeerState
ps) Bool -> Bool -> Bool
&& PeerId
pid PeerId -> PeerId -> Bool
forall a. Eq a => a -> a -> Bool
/= GossipSubRouter -> PeerId
gsLocalPeerId GossipSubRouter
router
then PeerId
pid PeerId -> [PeerId] -> [PeerId]
forall a. a -> [a] -> [a]
: [PeerId]
acc
else [PeerId]
acc) [] Map PeerId PeerState
peers
mapM_ (\PeerId
pid -> GossipSubRouter -> PeerId -> RPC -> IO ()
gsSendRPC GossipSubRouter
router PeerId
pid RPC
pubRPC) targets
else do
meshPeers <- atomically $ do
m <- readTVar (gsMesh router)
pure (Map.findWithDefault Set.empty topic m)
if not (Set.null meshPeers)
then mapM_ (\PeerId
pid -> GossipSubRouter -> PeerId -> RPC -> IO ()
gsSendRPC GossipSubRouter
router PeerId
pid RPC
pubRPC) (Set.toList meshPeers)
else do
foPeers <- atomically $ do
fo <- readTVar (gsFanout router)
pure (Map.findWithDefault Set.empty topic fo)
targets <- if Set.null foPeers
then do
peers <- readTVarIO (gsPeers router)
let eligible = ([PeerId] -> PeerId -> PeerState -> [PeerId])
-> [PeerId] -> Map PeerId PeerState -> [PeerId]
forall a k b. (a -> k -> b -> a) -> a -> Map k b -> a
Map.foldlWithKey' (\[PeerId]
acc PeerId
pid PeerState
ps ->
if Topic -> Set Topic -> Bool
forall a. Ord a => a -> Set a -> Bool
Set.member Topic
topic (PeerState -> Set Topic
psTopics PeerState
ps) Bool -> Bool -> Bool
&& PeerId
pid PeerId -> PeerId -> Bool
forall a. Eq a => a -> a -> Bool
/= GossipSubRouter -> PeerId
gsLocalPeerId GossipSubRouter
router
then PeerId
pid PeerId -> [PeerId] -> [PeerId]
forall a. a -> [a] -> [a]
: [PeerId]
acc
else [PeerId]
acc) [] Map PeerId PeerState
peers
selected <- sampleIO (min (paramD (gsParams router)) (length eligible)) eligible
let selectedSet = [PeerId] -> Set PeerId
forall a. Ord a => [a] -> Set a
Set.fromList [PeerId]
selected
atomically $ do
modifyTVar' (gsFanout router) (Map.insert topic selectedSet)
modifyTVar' (gsFanoutPub router) (Map.insert topic now)
pure selectedSet
else do
atomically $ modifyTVar' (gsFanoutPub router) (Map.insert topic now)
pure foPeers
mapM_ (\PeerId
pid -> GossipSubRouter -> PeerId -> RPC -> IO ()
gsSendRPC GossipSubRouter
router PeerId
pid RPC
pubRPC) (Set.toList targets)
onMsg <- readTVarIO (gsOnMessage router)
onMsg topic msg
handleRPC :: GossipSubRouter -> PeerId -> RPC -> IO ()
handleRPC :: GossipSubRouter -> PeerId -> RPC -> IO ()
handleRPC GossipSubRouter
router PeerId
sender RPC
rpc = do
GossipSubRouter -> PeerId -> [SubOpts] -> IO ()
handleSubscriptions GossipSubRouter
router PeerId
sender (RPC -> [SubOpts]
rpcSubscriptions RPC
rpc)
(PubSubMessage -> IO ()) -> [PubSubMessage] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (GossipSubRouter -> PeerId -> PubSubMessage -> IO ()
handlePublishedMessage GossipSubRouter
router PeerId
sender) (RPC -> [PubSubMessage]
rpcPublish RPC
rpc)
case RPC -> Maybe ControlMessage
rpcControl RPC
rpc of
Maybe ControlMessage
Nothing -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
Just ControlMessage
ctrl -> do
GossipSubRouter -> PeerId -> [IHave] -> IO ()
handleIHave GossipSubRouter
router PeerId
sender (ControlMessage -> [IHave]
ctrlIHave ControlMessage
ctrl)
GossipSubRouter -> PeerId -> [IWant] -> IO ()
handleIWant GossipSubRouter
router PeerId
sender (ControlMessage -> [IWant]
ctrlIWant ControlMessage
ctrl)
GossipSubRouter -> PeerId -> [Graft] -> IO ()
handleGraft GossipSubRouter
router PeerId
sender (ControlMessage -> [Graft]
ctrlGraft ControlMessage
ctrl)
GossipSubRouter -> PeerId -> [Prune] -> IO ()
handlePrune GossipSubRouter
router PeerId
sender (ControlMessage -> [Prune]
ctrlPrune ControlMessage
ctrl)
handlePublishedMessage :: GossipSubRouter -> PeerId -> PubSubMessage -> IO ()
handlePublishedMessage :: GossipSubRouter -> PeerId -> PubSubMessage -> IO ()
handlePublishedMessage GossipSubRouter
router PeerId
sender PubSubMessage
msg = do
let msgId :: ByteString
msgId = GossipSubParams -> PubSubMessage -> ByteString
paramMessageIdFn (GossipSubRouter -> GossipSubParams
gsParams GossipSubRouter
router) PubSubMessage
msg
now <- GossipSubRouter -> IO UTCTime
gsGetTime GossipSubRouter
router
alreadySeen <- atomically $ do
s <- readTVar (gsSeen router)
if Map.member msgId s
then pure True
else do
writeTVar (gsSeen router) (Map.insert msgId now s)
pure False
if alreadySeen
then pure ()
else do
atomically $ modifyTVar' (gsMessageCache router) $
cachePut msgId msg
forwardMessage router sender msg
onMsg <- readTVarIO (gsOnMessage router)
onMsg (msgTopic msg) msg
handleGraft :: GossipSubRouter -> PeerId -> [Graft] -> IO ()
handleGraft :: GossipSubRouter -> PeerId -> [Graft] -> IO ()
handleGraft GossipSubRouter
router PeerId
sender [Graft]
grafts = do
now <- GossipSubRouter -> IO UTCTime
gsGetTime GossipSubRouter
router
pruneResponses <- mapM (handleOneGraft router sender now) grafts
let prunes = [[Prune]] -> [Prune]
forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat [[Prune]]
pruneResponses
unless (null prunes) $
gsSendRPC router sender emptyRPC
{ rpcControl = Just emptyControlMessage { ctrlPrune = prunes } }
handleOneGraft :: GossipSubRouter -> PeerId -> UTCTime -> Graft -> IO [Prune]
handleOneGraft :: GossipSubRouter -> PeerId -> UTCTime -> Graft -> IO [Prune]
handleOneGraft GossipSubRouter
router PeerId
sender UTCTime
now (Graft Topic
topic) = do
meshMap <- TVar (Map Topic (Set PeerId)) -> IO (Map Topic (Set PeerId))
forall a. TVar a -> IO a
readTVarIO (GossipSubRouter -> TVar (Map Topic (Set PeerId))
gsMesh GossipSubRouter
router)
let subscribed = Topic -> Map Topic (Set PeerId) -> Bool
forall k a. Ord k => k -> Map k a -> Bool
Map.member Topic
topic Map Topic (Set PeerId)
meshMap
if not subscribed
then pure [Prune topic [] Nothing]
else do
backoffMap <- readTVarIO (gsBackoff router)
let inBackoff = case (PeerId, Topic) -> Map (PeerId, Topic) UTCTime -> Maybe UTCTime
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup (PeerId
sender, Topic
topic) Map (PeerId, Topic) UTCTime
backoffMap of
Maybe UTCTime
Nothing -> Bool
False
Just UTCTime
expires -> UTCTime
now UTCTime -> UTCTime -> Bool
forall a. Ord a => a -> a -> Bool
< UTCTime
expires
score <- peerScore router sender
if inBackoff
then do
atomically $ modifyTVar' (gsPeers router) $
Map.adjust addP7Penalty sender
let backoffSecs = NominalDiffTime -> Word64
forall b. Integral b => NominalDiffTime -> b
forall a b. (RealFrac a, Integral b) => a -> b
round (GossipSubParams -> NominalDiffTime
paramPruneBackoff (GossipSubRouter -> GossipSubParams
gsParams GossipSubRouter
router)) :: Word64
pure [Prune topic [] (Just backoffSecs)]
else if score < 0
then pure [Prune topic [] Nothing]
else do
atomically $ modifyTVar' (gsMesh router) $
Map.insertWith Set.union topic (Set.singleton sender)
pure []
handlePrune :: GossipSubRouter -> PeerId -> [Prune] -> IO ()
handlePrune :: GossipSubRouter -> PeerId -> [Prune] -> IO ()
handlePrune GossipSubRouter
router PeerId
sender [Prune]
prunes = do
now <- GossipSubRouter -> IO UTCTime
gsGetTime GossipSubRouter
router
mapM_ (handleOnePrune router sender now) prunes
handleOnePrune :: GossipSubRouter -> PeerId -> UTCTime -> Prune -> IO ()
handleOnePrune :: GossipSubRouter -> PeerId -> UTCTime -> Prune -> IO ()
handleOnePrune GossipSubRouter
router PeerId
sender UTCTime
now Prune
prune = do
let topic :: Topic
topic = Prune -> Topic
pruneTopic Prune
prune
let scoreParams :: PeerScoreParams
scoreParams = GossipSubRouter -> PeerScoreParams
gsScoreParams GossipSubRouter
router
case Topic -> Map Topic TopicScoreParams -> Maybe TopicScoreParams
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup Topic
topic (PeerScoreParams -> Map Topic TopicScoreParams
pspTopicParams PeerScoreParams
scoreParams) of
Just TopicScoreParams
tsp -> STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar (Map PeerId PeerState)
-> (Map PeerId PeerState -> Map PeerId PeerState) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' (GossipSubRouter -> TVar (Map PeerId PeerState)
gsPeers GossipSubRouter
router) ((Map PeerId PeerState -> Map PeerId PeerState) -> STM ())
-> (Map PeerId PeerState -> Map PeerId PeerState) -> STM ()
forall a b. (a -> b) -> a -> b
$
(PeerState -> PeerState)
-> PeerId -> Map PeerId PeerState -> Map PeerId PeerState
forall k a. Ord k => (a -> a) -> k -> Map k a -> Map k a
Map.adjust (\PeerState
ps ->
let topicSt :: TopicPeerState
topicSt = TopicPeerState
-> Topic -> Map Topic TopicPeerState -> TopicPeerState
forall k a. Ord k => a -> k -> Map k a -> a
Map.findWithDefault TopicPeerState
defaultTopicPeerState Topic
topic (PeerState -> Map Topic TopicPeerState
psTopicState PeerState
ps)
topicSt' :: TopicPeerState
topicSt' = TopicScoreParams -> TopicPeerState -> TopicPeerState
recordMeshFailure TopicScoreParams
tsp TopicPeerState
topicSt
in PeerState
ps { psTopicState = Map.insert topic topicSt' (psTopicState ps) }
) PeerId
sender
Maybe TopicScoreParams
Nothing -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar (Map Topic (Set PeerId))
-> (Map Topic (Set PeerId) -> Map Topic (Set PeerId)) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' (GossipSubRouter -> TVar (Map Topic (Set PeerId))
gsMesh GossipSubRouter
router) ((Map Topic (Set PeerId) -> Map Topic (Set PeerId)) -> STM ())
-> (Map Topic (Set PeerId) -> Map Topic (Set PeerId)) -> STM ()
forall a b. (a -> b) -> a -> b
$
(Set PeerId -> Set PeerId)
-> Topic -> Map Topic (Set PeerId) -> Map Topic (Set PeerId)
forall k a. Ord k => (a -> a) -> k -> Map k a -> Map k a
Map.adjust (PeerId -> Set PeerId -> Set PeerId
forall a. Ord a => a -> Set a -> Set a
Set.delete PeerId
sender) Topic
topic
let backoffDuration :: NominalDiffTime
backoffDuration = case Prune -> Maybe Word64
pruneBackoff Prune
prune of
Just Word64
secs -> Word64 -> NominalDiffTime
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word64
secs
Maybe Word64
Nothing -> GossipSubParams -> NominalDiffTime
paramPruneBackoff (GossipSubRouter -> GossipSubParams
gsParams GossipSubRouter
router)
expires :: UTCTime
expires = NominalDiffTime -> UTCTime -> UTCTime
addUTCTime NominalDiffTime
backoffDuration UTCTime
now
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar (Map (PeerId, Topic) UTCTime)
-> (Map (PeerId, Topic) UTCTime -> Map (PeerId, Topic) UTCTime)
-> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' (GossipSubRouter -> TVar (Map (PeerId, Topic) UTCTime)
gsBackoff GossipSubRouter
router) ((Map (PeerId, Topic) UTCTime -> Map (PeerId, Topic) UTCTime)
-> STM ())
-> (Map (PeerId, Topic) UTCTime -> Map (PeerId, Topic) UTCTime)
-> STM ()
forall a b. (a -> b) -> a -> b
$
(PeerId, Topic)
-> UTCTime
-> Map (PeerId, Topic) UTCTime
-> Map (PeerId, Topic) UTCTime
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert (PeerId
sender, Topic
topic) UTCTime
expires
handleIHave :: GossipSubRouter -> PeerId -> [IHave] -> IO ()
handleIHave :: GossipSubRouter -> PeerId -> [IHave] -> IO ()
handleIHave GossipSubRouter
router PeerId
sender [IHave]
ihaves = do
seenMap <- TVar (Map ByteString UTCTime) -> IO (Map ByteString UTCTime)
forall a. TVar a -> IO a
readTVarIO (GossipSubRouter -> TVar (Map ByteString UTCTime)
gsSeen GossipSubRouter
router)
let unseen = (IHave -> [ByteString]) -> [IHave] -> [ByteString]
forall (t :: * -> *) a b. Foldable t => (a -> [b]) -> t a -> [b]
concatMap (\(IHave Topic
_ [ByteString]
mids) ->
(ByteString -> Bool) -> [ByteString] -> [ByteString]
forall a. (a -> Bool) -> [a] -> [a]
filter (\ByteString
mid -> Bool -> Bool
not (ByteString -> Map ByteString UTCTime -> Bool
forall k a. Ord k => k -> Map k a -> Bool
Map.member ByteString
mid Map ByteString UTCTime
seenMap)) [ByteString]
mids) [IHave]
ihaves
unless (null unseen) $
gsSendRPC router sender emptyRPC
{ rpcControl = Just emptyControlMessage { ctrlIWant = [IWant unseen] } }
handleIWant :: GossipSubRouter -> PeerId -> [IWant] -> IO ()
handleIWant :: GossipSubRouter -> PeerId -> [IWant] -> IO ()
handleIWant GossipSubRouter
router PeerId
sender [IWant]
iwants = do
cache <- TVar MessageCache -> IO MessageCache
forall a. TVar a -> IO a
readTVarIO (GossipSubRouter -> TVar MessageCache
gsMessageCache GossipSubRouter
router)
let requestedIds = (IWant -> [ByteString]) -> [IWant] -> [ByteString]
forall (t :: * -> *) a b. Foldable t => (a -> [b]) -> t a -> [b]
concatMap IWant -> [ByteString]
iwantMessageIds [IWant]
iwants
found = [ PubSubMessage
msg | ByteString
mid <- [ByteString]
requestedIds
, Just PubSubMessage
msg <- [ByteString -> MessageCache -> Maybe PubSubMessage
cacheGet ByteString
mid MessageCache
cache] ]
unless (null found) $
gsSendRPC router sender emptyRPC { rpcPublish = found }
handleSubscriptions :: GossipSubRouter -> PeerId -> [SubOpts] -> IO ()
handleSubscriptions :: GossipSubRouter -> PeerId -> [SubOpts] -> IO ()
handleSubscriptions GossipSubRouter
router PeerId
sender [SubOpts]
subs = STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$
TVar (Map PeerId PeerState)
-> (Map PeerId PeerState -> Map PeerId PeerState) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' (GossipSubRouter -> TVar (Map PeerId PeerState)
gsPeers GossipSubRouter
router) ((Map PeerId PeerState -> Map PeerId PeerState) -> STM ())
-> (Map PeerId PeerState -> Map PeerId PeerState) -> STM ()
forall a b. (a -> b) -> a -> b
$ \Map PeerId PeerState
peerMap ->
case PeerId -> Map PeerId PeerState -> Maybe PeerState
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup PeerId
sender Map PeerId PeerState
peerMap of
Maybe PeerState
Nothing -> Map PeerId PeerState
peerMap
Just PeerState
ps ->
let topics' :: Set Topic
topics' = (Set Topic -> SubOpts -> Set Topic)
-> Set Topic -> [SubOpts] -> Set Topic
forall b a. (b -> a -> b) -> b -> [a] -> b
forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
foldl (\Set Topic
ts SubOpts
sub ->
if SubOpts -> Bool
subSubscribe SubOpts
sub
then Topic -> Set Topic -> Set Topic
forall a. Ord a => a -> Set a -> Set a
Set.insert (SubOpts -> Topic
subTopicId SubOpts
sub) Set Topic
ts
else Topic -> Set Topic -> Set Topic
forall a. Ord a => a -> Set a -> Set a
Set.delete (SubOpts -> Topic
subTopicId SubOpts
sub) Set Topic
ts
) (PeerState -> Set Topic
psTopics PeerState
ps) [SubOpts]
subs
in PeerId -> PeerState -> Map PeerId PeerState -> Map PeerId PeerState
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert PeerId
sender PeerState
ps { psTopics = topics' } Map PeerId PeerState
peerMap
forwardMessage :: GossipSubRouter -> PeerId -> PubSubMessage -> IO ()
forwardMessage :: GossipSubRouter -> PeerId -> PubSubMessage -> IO ()
forwardMessage GossipSubRouter
router PeerId
sender PubSubMessage
msg = do
let topic :: Topic
topic = PubSubMessage -> Topic
msgTopic PubSubMessage
msg
meshPeers <- STM (Set PeerId) -> IO (Set PeerId)
forall a. STM a -> IO a
atomically (STM (Set PeerId) -> IO (Set PeerId))
-> STM (Set PeerId) -> IO (Set PeerId)
forall a b. (a -> b) -> a -> b
$ do
m <- TVar (Map Topic (Set PeerId)) -> STM (Map Topic (Set PeerId))
forall a. TVar a -> STM a
readTVar (GossipSubRouter -> TVar (Map Topic (Set PeerId))
gsMesh GossipSubRouter
router)
pure (Map.findWithDefault Set.empty topic m)
let targets = PeerId -> Set PeerId -> Set PeerId
forall a. Ord a => a -> Set a -> Set a
Set.delete PeerId
sender Set PeerId
meshPeers
fwdRPC = RPC
emptyRPC { rpcPublish = [msg] }
mapM_ (\PeerId
pid -> GossipSubRouter -> PeerId -> RPC -> IO ()
gsSendRPC GossipSubRouter
router PeerId
pid RPC
fwdRPC) (Set.toList targets)
peerScore :: GossipSubRouter -> PeerId -> IO Double
peerScore :: GossipSubRouter -> PeerId -> IO Double
peerScore GossipSubRouter
router PeerId
pid = do
peers <- TVar (Map PeerId PeerState) -> IO (Map PeerId PeerState)
forall a. TVar a -> IO a
readTVarIO (GossipSubRouter -> TVar (Map PeerId PeerState)
gsPeers GossipSubRouter
router)
now <- gsGetTime router
ipMap <- readTVarIO (gsIPPeerCount router)
case Map.lookup pid peers of
Maybe PeerState
Nothing -> Double -> IO Double
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Double
0
Just PeerState
ps -> Double -> IO Double
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Double -> IO Double) -> Double -> IO Double
forall a b. (a -> b) -> a -> b
$ PeerScoreParams
-> PeerState -> Map ByteString (Set PeerId) -> UTCTime -> Double
computeScore (GossipSubRouter -> PeerScoreParams
gsScoreParams GossipSubRouter
router) PeerState
ps Map ByteString (Set PeerId)
ipMap UTCTime
now
graftRPC :: Topic -> RPC
graftRPC :: Topic -> RPC
graftRPC Topic
topic = RPC
emptyRPC
{ rpcControl = Just emptyControlMessage { ctrlGraft = [Graft topic] } }
pruneRPC :: Topic -> [PeerExchangeInfo] -> Maybe Word64 -> RPC
pruneRPC :: Topic -> [PeerExchangeInfo] -> Maybe Word64 -> RPC
pruneRPC Topic
topic [PeerExchangeInfo]
peers Maybe Word64
backoff = RPC
emptyRPC
{ rpcControl = Just emptyControlMessage { ctrlPrune = [Prune topic peers backoff] } }
mkUnsignedMessage :: Topic -> ByteString -> PubSubMessage
mkUnsignedMessage :: Topic -> ByteString -> PubSubMessage
mkUnsignedMessage Topic
topic ByteString
payload = PubSubMessage
{ msgFrom :: Maybe ByteString
msgFrom = Maybe ByteString
forall a. Maybe a
Nothing
, msgData :: ByteString
msgData = ByteString
payload
, msgSeqNo :: Maybe ByteString
msgSeqNo = Maybe ByteString
forall a. Maybe a
Nothing
, msgTopic :: Topic
msgTopic = Topic
topic
, msgSignature :: Maybe ByteString
msgSignature = Maybe ByteString
forall a. Maybe a
Nothing
, msgKey :: Maybe ByteString
msgKey = Maybe ByteString
forall a. Maybe a
Nothing
}
mkSignedMessage :: GossipSubRouter -> Topic -> ByteString -> KeyPair -> IO PubSubMessage
mkSignedMessage :: GossipSubRouter
-> Topic -> ByteString -> KeyPair -> IO PubSubMessage
mkSignedMessage GossipSubRouter
router Topic
topic ByteString
payload KeyPair
kp = do
seqno <- Int -> IO ByteString
forall byteArray. ByteArray byteArray => Int -> IO byteArray
forall (m :: * -> *) byteArray.
(MonadRandom m, ByteArray byteArray) =>
Int -> m byteArray
getRandomBytes Int
8 :: IO ByteString
let from = PeerId -> ByteString
peerIdBytes (GossipSubRouter -> PeerId
gsLocalPeerId GossipSubRouter
router)
pubKeyBytes = PublicKey -> ByteString
encodePublicKey (KeyPair -> PublicKey
kpPublic KeyPair
kp)
unsigned = PubSubMessage
{ msgFrom :: Maybe ByteString
msgFrom = ByteString -> Maybe ByteString
forall a. a -> Maybe a
Just ByteString
from
, msgData :: ByteString
msgData = ByteString
payload
, msgSeqNo :: Maybe ByteString
msgSeqNo = ByteString -> Maybe ByteString
forall a. a -> Maybe a
Just ByteString
seqno
, msgTopic :: Topic
msgTopic = Topic
topic
, msgSignature :: Maybe ByteString
msgSignature = Maybe ByteString
forall a. Maybe a
Nothing
, msgKey :: Maybe ByteString
msgKey = ByteString -> Maybe ByteString
forall a. a -> Maybe a
Just ByteString
pubKeyBytes
}
signData = ByteString
"libp2p-pubsub:" ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> PubSubMessage -> ByteString
marshalForSigning PubSubMessage
unsigned
case sign (kpPrivate kp) signData of
Left String
_err -> PubSubMessage -> IO PubSubMessage
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure PubSubMessage
unsigned
Right ByteString
sig -> PubSubMessage -> IO PubSubMessage
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure PubSubMessage
unsigned { msgSignature = Just sig }
marshalForSigning :: PubSubMessage -> ByteString
marshalForSigning :: PubSubMessage -> ByteString
marshalForSigning PubSubMessage
msg = PubSubMessage -> ByteString
encodePubSubMessageBS
(PubSubMessage
msg { msgSignature = Nothing, msgKey = Nothing })