module Network.LibP2P.Protocol.GossipSub.Handler
(
GossipSubNode (..)
, newGossipSubNode
, handleGossipSubStream
, startGossipSub
, stopGossipSub
, gossipJoin
, gossipLeave
, gossipPublish
, gossipSubProtocolId
) where
import Control.Concurrent.Async (Async, async, cancel)
import Control.Concurrent.STM
( TVar
, atomically
, newTVarIO
, readTVar
, writeTVar
, modifyTVar'
)
import Control.Exception (SomeException, catch)
import Data.ByteString (ByteString)
import qualified Data.Map.Strict as Map
import Data.Time.Clock (getCurrentTime)
import Network.LibP2P.Crypto.PeerId (PeerId)
import Network.LibP2P.MultistreamSelect.Negotiation
( NegotiationResult (..)
, ProtocolId
, StreamIO (..)
, negotiateInitiator
)
import Network.LibP2P.Protocol.GossipSub.Heartbeat (runHeartbeat)
import Network.LibP2P.Protocol.GossipSub.Message (readRPCMessage, writeRPCMessage)
import Network.LibP2P.Protocol.GossipSub.Router
( addPeer
, handleRPC
, join
, leave
, newRouter
, publish
, removePeer
)
import qualified Data.Set as Set
import Network.LibP2P.Protocol.GossipSub.Types
( GossipSubParams
, GossipSubRouter (..)
, PeerProtocol (..)
, RPC (..)
, SubOpts (..)
, Topic
, emptyRPC
, maxRPCSize
)
import Network.LibP2P.Switch.ConnPool (lookupConn)
import Network.LibP2P.Switch.Switch (removeStreamHandler, setStreamHandler)
import Network.LibP2P.Switch.Types
( Connection (..)
, MuxerSession (..)
, Switch (..)
)
gossipSubProtocolId :: ProtocolId
gossipSubProtocolId :: ProtocolId
gossipSubProtocolId = ProtocolId
"/meshsub/1.1.0"
data GossipSubNode = GossipSubNode
{ GossipSubNode -> GossipSubRouter
gsnRouter :: !GossipSubRouter
, GossipSubNode -> Switch
gsnSwitch :: !Switch
, GossipSubNode -> TVar (Maybe (Async ()))
gsnHeartbeat :: !(TVar (Maybe (Async ())))
, GossipSubNode -> TVar (Map PeerId StreamIO)
gsnStreams :: !(TVar (Map.Map PeerId StreamIO))
}
newGossipSubNode :: Switch -> GossipSubParams -> IO GossipSubNode
newGossipSubNode :: Switch -> GossipSubParams -> IO GossipSubNode
newGossipSubNode Switch
sw GossipSubParams
params = do
streamsVar <- Map PeerId StreamIO -> IO (TVar (Map PeerId StreamIO))
forall a. a -> IO (TVar a)
newTVarIO Map PeerId StreamIO
forall k a. Map k a
Map.empty
hbVar <- newTVarIO Nothing
let localPid = Switch -> PeerId
swLocalPeerId Switch
sw
router <- newRouter params localPid (sendRPCviaSwitch sw streamsVar) getCurrentTime
pure GossipSubNode
{ gsnRouter = router
, gsnSwitch = sw
, gsnHeartbeat = hbVar
, gsnStreams = streamsVar
}
sendRPCviaSwitch :: Switch -> TVar (Map.Map PeerId StreamIO) -> PeerId -> RPC -> IO ()
sendRPCviaSwitch :: Switch -> TVar (Map PeerId StreamIO) -> PeerId -> RPC -> IO ()
sendRPCviaSwitch Switch
sw TVar (Map PeerId StreamIO)
streamsVar PeerId
pid RPC
rpc = do
mCached <- STM (Maybe StreamIO) -> IO (Maybe StreamIO)
forall a. STM a -> IO a
atomically (STM (Maybe StreamIO) -> IO (Maybe StreamIO))
-> STM (Maybe StreamIO) -> IO (Maybe StreamIO)
forall a b. (a -> b) -> a -> b
$ PeerId -> Map PeerId StreamIO -> Maybe StreamIO
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup PeerId
pid (Map PeerId StreamIO -> Maybe StreamIO)
-> STM (Map PeerId StreamIO) -> STM (Maybe StreamIO)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TVar (Map PeerId StreamIO) -> STM (Map PeerId StreamIO)
forall a. TVar a -> STM a
readTVar TVar (Map PeerId StreamIO)
streamsVar
case mCached of
Just StreamIO
stream -> do
sendResult <- StreamIO -> RPC -> IO (Either () ())
trySend StreamIO
stream RPC
rpc
case sendResult of
Right () -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
Left ()
_ -> do
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar (Map PeerId StreamIO)
-> (Map PeerId StreamIO -> Map PeerId StreamIO) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (Map PeerId StreamIO)
streamsVar (PeerId -> Map PeerId StreamIO -> Map PeerId StreamIO
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete PeerId
pid)
Switch -> TVar (Map PeerId StreamIO) -> PeerId -> RPC -> IO ()
openAndSend Switch
sw TVar (Map PeerId StreamIO)
streamsVar PeerId
pid RPC
rpc
Maybe StreamIO
Nothing -> Switch -> TVar (Map PeerId StreamIO) -> PeerId -> RPC -> IO ()
openAndSend Switch
sw TVar (Map PeerId StreamIO)
streamsVar PeerId
pid RPC
rpc
openAndSend :: Switch -> TVar (Map.Map PeerId StreamIO) -> PeerId -> RPC -> IO ()
openAndSend :: Switch -> TVar (Map PeerId StreamIO) -> PeerId -> RPC -> IO ()
openAndSend Switch
sw TVar (Map PeerId StreamIO)
streamsVar PeerId
pid RPC
rpc = do
mStream <- Switch -> PeerId -> IO (Maybe StreamIO)
openStreamToPeer Switch
sw PeerId
pid
case mStream of
Maybe StreamIO
Nothing -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
Just StreamIO
stream -> do
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar (Map PeerId StreamIO)
-> (Map PeerId StreamIO -> Map PeerId StreamIO) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (Map PeerId StreamIO)
streamsVar (PeerId -> StreamIO -> Map PeerId StreamIO -> Map PeerId StreamIO
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert PeerId
pid StreamIO
stream)
_ <- StreamIO -> RPC -> IO (Either () ())
trySend StreamIO
stream RPC
rpc
pure ()
openStreamToPeer :: Switch -> PeerId -> IO (Maybe StreamIO)
openStreamToPeer :: Switch -> PeerId -> IO (Maybe StreamIO)
openStreamToPeer Switch
sw PeerId
pid = do
mConn <- STM (Maybe Connection) -> IO (Maybe Connection)
forall a. STM a -> IO a
atomically (STM (Maybe Connection) -> IO (Maybe Connection))
-> STM (Maybe Connection) -> IO (Maybe Connection)
forall a b. (a -> b) -> a -> b
$ TVar (Map PeerId [Connection]) -> PeerId -> STM (Maybe Connection)
lookupConn (Switch -> TVar (Map PeerId [Connection])
swConnPool Switch
sw) PeerId
pid
case mConn of
Maybe Connection
Nothing -> Maybe StreamIO -> IO (Maybe StreamIO)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe StreamIO
forall a. Maybe a
Nothing
Just Connection
conn -> do
result <- (Maybe StreamIO -> Either () (Maybe StreamIO)
forall a b. b -> Either a b
Right (Maybe StreamIO -> Either () (Maybe StreamIO))
-> IO (Maybe StreamIO) -> IO (Either () (Maybe StreamIO))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Connection -> IO (Maybe StreamIO)
openAndNegotiate Connection
conn) IO (Either () (Maybe StreamIO))
-> (SomeException -> IO (Either () (Maybe StreamIO)))
-> IO (Either () (Maybe StreamIO))
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
`catch`
(\(SomeException
_ :: SomeException) -> Either () (Maybe StreamIO) -> IO (Either () (Maybe StreamIO))
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (() -> Either () (Maybe StreamIO)
forall a b. a -> Either a b
Left ()))
case result of
Left () -> Maybe StreamIO -> IO (Maybe StreamIO)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe StreamIO
forall a. Maybe a
Nothing
Right Maybe StreamIO
mStream -> Maybe StreamIO -> IO (Maybe StreamIO)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe StreamIO
mStream
openAndNegotiate :: Connection -> IO (Maybe StreamIO)
openAndNegotiate :: Connection -> IO (Maybe StreamIO)
openAndNegotiate Connection
conn = do
stream <- MuxerSession -> IO StreamIO
muxOpenStream (Connection -> MuxerSession
connSession Connection
conn)
negResult <- negotiateInitiator stream [gossipSubProtocolId]
case negResult of
Accepted ProtocolId
_ -> Maybe StreamIO -> IO (Maybe StreamIO)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (StreamIO -> Maybe StreamIO
forall a. a -> Maybe a
Just StreamIO
stream)
NegotiationResult
NoProtocol -> Maybe StreamIO -> IO (Maybe StreamIO)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe StreamIO
forall a. Maybe a
Nothing
trySend :: StreamIO -> RPC -> IO (Either () ())
trySend :: StreamIO -> RPC -> IO (Either () ())
trySend StreamIO
stream RPC
rpc =
(StreamIO -> RPC -> IO ()
writeRPCMessage StreamIO
stream RPC
rpc IO () -> IO (Either () ()) -> IO (Either () ())
forall a b. IO a -> IO b -> IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Either () () -> IO (Either () ())
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (() -> Either () ()
forall a b. b -> Either a b
Right ()))
IO (Either () ())
-> (SomeException -> IO (Either () ())) -> IO (Either () ())
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
`catch` (\(SomeException
_ :: SomeException) -> Either () () -> IO (Either () ())
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (() -> Either () ()
forall a b. a -> Either a b
Left ()))
handleGossipSubStream :: GossipSubNode -> StreamIO -> PeerId -> IO ()
handleGossipSubStream :: GossipSubNode -> StreamIO -> PeerId -> IO ()
handleGossipSubStream GossipSubNode
node StreamIO
stream PeerId
pid = do
now <- IO UTCTime
getCurrentTime
addPeer (gsnRouter node) pid GossipSubPeer False now
readLoop
removePeer (gsnRouter node) pid
atomically $ modifyTVar' (gsnStreams node) (Map.delete pid)
where
readLoop :: IO ()
readLoop = do
result <- StreamIO -> Int -> IO (Either String RPC)
readRPCMessage StreamIO
stream Int
maxRPCSize
case result of
Left String
_ -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
Right RPC
rpc -> do
GossipSubRouter -> PeerId -> RPC -> IO ()
handleRPC (GossipSubNode -> GossipSubRouter
gsnRouter GossipSubNode
node) PeerId
pid RPC
rpc
IO ()
readLoop
startGossipSub :: GossipSubNode -> IO ()
startGossipSub :: GossipSubNode -> IO ()
startGossipSub GossipSubNode
node = do
Switch -> ProtocolId -> (StreamIO -> PeerId -> IO ()) -> IO ()
setStreamHandler (GossipSubNode -> Switch
gsnSwitch GossipSubNode
node) ProtocolId
gossipSubProtocolId
(GossipSubNode -> StreamIO -> PeerId -> IO ()
handleGossipSubStream GossipSubNode
node)
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar [Connection -> IO ()]
-> ([Connection -> IO ()] -> [Connection -> IO ()]) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' (Switch -> TVar [Connection -> IO ()]
swNotifiers (GossipSubNode -> Switch
gsnSwitch GossipSubNode
node))
(GossipSubNode -> Connection -> IO ()
onNewConnection GossipSubNode
node (Connection -> IO ())
-> [Connection -> IO ()] -> [Connection -> IO ()]
forall a. a -> [a] -> [a]
:)
hbAsync <- GossipSubRouter -> IO (Async ())
runHeartbeat (GossipSubNode -> GossipSubRouter
gsnRouter GossipSubNode
node)
atomically $ writeTVar (gsnHeartbeat node) (Just hbAsync)
onNewConnection :: GossipSubNode -> Connection -> IO ()
onNewConnection :: GossipSubNode -> Connection -> IO ()
onNewConnection GossipSubNode
node Connection
conn = do
let pid :: PeerId
pid = Connection -> PeerId
connPeerId Connection
conn
mStream <- Connection -> IO (Maybe StreamIO)
openAndNegotiate Connection
conn
case mStream of
Maybe StreamIO
Nothing -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
Just StreamIO
stream -> do
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar (Map PeerId StreamIO)
-> (Map PeerId StreamIO -> Map PeerId StreamIO) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' (GossipSubNode -> TVar (Map PeerId StreamIO)
gsnStreams GossipSubNode
node) (PeerId -> StreamIO -> Map PeerId StreamIO -> Map PeerId StreamIO
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert PeerId
pid StreamIO
stream)
now <- IO UTCTime
getCurrentTime
addPeer (gsnRouter node) pid GossipSubPeer True now
sendCurrentSubscriptions node stream
_ <- async $ outboundReadLoop node stream pid
pure ()
sendCurrentSubscriptions :: GossipSubNode -> StreamIO -> IO ()
sendCurrentSubscriptions :: GossipSubNode -> StreamIO -> IO ()
sendCurrentSubscriptions GossipSubNode
node StreamIO
stream = do
let router :: GossipSubRouter
router = GossipSubNode -> GossipSubRouter
gsnRouter GossipSubNode
node
meshTopics <- STM (Set ProtocolId) -> IO (Set ProtocolId)
forall a. STM a -> IO a
atomically (STM (Set ProtocolId) -> IO (Set ProtocolId))
-> STM (Set ProtocolId) -> IO (Set ProtocolId)
forall a b. (a -> b) -> a -> b
$ Map ProtocolId (Set PeerId) -> Set ProtocolId
forall k a. Map k a -> Set k
Map.keysSet (Map ProtocolId (Set PeerId) -> Set ProtocolId)
-> STM (Map ProtocolId (Set PeerId)) -> STM (Set ProtocolId)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TVar (Map ProtocolId (Set PeerId))
-> STM (Map ProtocolId (Set PeerId))
forall a. TVar a -> STM a
readTVar (GossipSubRouter -> TVar (Map ProtocolId (Set PeerId))
gsMesh GossipSubRouter
router)
let topics = Set ProtocolId -> [ProtocolId]
forall a. Set a -> [a]
Set.toList Set ProtocolId
meshTopics
if null topics
then pure ()
else do
let subRPC = RPC
emptyRPC
{ rpcSubscriptions = map (\ProtocolId
t -> Bool -> ProtocolId -> SubOpts
SubOpts Bool
True ProtocolId
t) topics }
_ <- trySend stream subRPC
pure ()
outboundReadLoop :: GossipSubNode -> StreamIO -> PeerId -> IO ()
outboundReadLoop :: GossipSubNode -> StreamIO -> PeerId -> IO ()
outboundReadLoop GossipSubNode
node StreamIO
stream PeerId
pid = IO ()
loop
where
loop :: IO ()
loop = do
result <- StreamIO -> Int -> IO (Either String RPC)
readRPCMessage StreamIO
stream Int
maxRPCSize
case result of
Left String
_ -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
Right RPC
rpc -> do
GossipSubRouter -> PeerId -> RPC -> IO ()
handleRPC (GossipSubNode -> GossipSubRouter
gsnRouter GossipSubNode
node) PeerId
pid RPC
rpc
IO ()
loop
stopGossipSub :: GossipSubNode -> IO ()
stopGossipSub :: GossipSubNode -> IO ()
stopGossipSub GossipSubNode
node = do
mHb <- STM (Maybe (Async ())) -> IO (Maybe (Async ()))
forall a. STM a -> IO a
atomically (STM (Maybe (Async ())) -> IO (Maybe (Async ())))
-> STM (Maybe (Async ())) -> IO (Maybe (Async ()))
forall a b. (a -> b) -> a -> b
$ do
hb <- TVar (Maybe (Async ())) -> STM (Maybe (Async ()))
forall a. TVar a -> STM a
readTVar (GossipSubNode -> TVar (Maybe (Async ()))
gsnHeartbeat GossipSubNode
node)
writeTVar (gsnHeartbeat node) Nothing
pure hb
case mHb of
Just Async ()
hbAsync -> Async () -> IO ()
forall a. Async a -> IO ()
cancel Async ()
hbAsync IO () -> (SomeException -> IO ()) -> IO ()
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
`catch` (\(SomeException
_ :: SomeException) -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ())
Maybe (Async ())
Nothing -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
removeStreamHandler (gsnSwitch node) gossipSubProtocolId
gossipJoin :: GossipSubNode -> Topic -> IO ()
gossipJoin :: GossipSubNode -> ProtocolId -> IO ()
gossipJoin GossipSubNode
node ProtocolId
topic = GossipSubRouter -> ProtocolId -> IO ()
join (GossipSubNode -> GossipSubRouter
gsnRouter GossipSubNode
node) ProtocolId
topic
gossipLeave :: GossipSubNode -> Topic -> IO ()
gossipLeave :: GossipSubNode -> ProtocolId -> IO ()
gossipLeave GossipSubNode
node ProtocolId
topic = GossipSubRouter -> ProtocolId -> IO ()
leave (GossipSubNode -> GossipSubRouter
gsnRouter GossipSubNode
node) ProtocolId
topic
gossipPublish :: GossipSubNode -> Topic -> ByteString -> IO ()
gossipPublish :: GossipSubNode -> ProtocolId -> ByteString -> IO ()
gossipPublish GossipSubNode
node ProtocolId
topic ByteString
payload =
GossipSubRouter
-> ProtocolId -> ByteString -> Maybe KeyPair -> IO ()
publish (GossipSubNode -> GossipSubRouter
gsnRouter GossipSubNode
node) ProtocolId
topic ByteString
payload (KeyPair -> Maybe KeyPair
forall a. a -> Maybe a
Just (Switch -> KeyPair
swIdentityKey (GossipSubNode -> Switch
gsnSwitch GossipSubNode
node)))