-- | GossipSub Switch integration handler (Phase 10b).
--
-- Bridges the GossipSub Router with the Switch by:
-- 1. Registering a StreamHandler for inbound /meshsub/1.1.0 streams
-- 2. Providing a sendRPC callback that opens/reuses outbound streams
-- 3. Managing lifecycle (heartbeat start/stop)
--
-- GossipSub maintains persistent bidirectional RPC streams, unlike
-- Identify/Ping which are one-shot. Each peer has at most one cached
-- outbound stream.
module Network.LibP2P.Protocol.GossipSub.Handler
  ( -- * Types
    GossipSubNode (..)
    -- * Construction
  , newGossipSubNode
    -- * Stream handling
  , handleGossipSubStream
    -- * Lifecycle
  , startGossipSub
  , stopGossipSub
    -- * Convenience API
  , gossipJoin
  , gossipLeave
  , gossipPublish
    -- * Constants
  , gossipSubProtocolId
  ) where

import Control.Concurrent.Async (Async, async, cancel)
import Control.Concurrent.STM
  ( TVar
  , atomically
  , newTVarIO
  , readTVar
  , writeTVar
  , modifyTVar'
  )
import Control.Exception (SomeException, catch)
import Data.ByteString (ByteString)
import qualified Data.Map.Strict as Map
import Data.Time.Clock (getCurrentTime)
import Network.LibP2P.Crypto.PeerId (PeerId)
import Network.LibP2P.MultistreamSelect.Negotiation
  ( NegotiationResult (..)
  , ProtocolId
  , StreamIO (..)
  , negotiateInitiator
  )
import Network.LibP2P.Protocol.GossipSub.Heartbeat (runHeartbeat)
import Network.LibP2P.Protocol.GossipSub.Message (readRPCMessage, writeRPCMessage)
import Network.LibP2P.Protocol.GossipSub.Router
  ( addPeer
  , handleRPC
  , join
  , leave
  , newRouter
  , publish
  , removePeer
  )
import qualified Data.Set as Set
import Network.LibP2P.Protocol.GossipSub.Types
  ( GossipSubParams
  , GossipSubRouter (..)
  , PeerProtocol (..)
  , RPC (..)
  , SubOpts (..)
  , Topic
  , emptyRPC
  , maxRPCSize
  )
import Network.LibP2P.Switch.ConnPool (lookupConn)
import Network.LibP2P.Switch.Switch (removeStreamHandler, setStreamHandler)
import Network.LibP2P.Switch.Types
  ( Connection (..)
  , MuxerSession (..)
  , Switch (..)
  )

-- | GossipSub protocol ID.
gossipSubProtocolId :: ProtocolId
gossipSubProtocolId :: ProtocolId
gossipSubProtocolId = ProtocolId
"/meshsub/1.1.0"

-- | A GossipSub node: Router + Switch integration.
data GossipSubNode = GossipSubNode
  { GossipSubNode -> GossipSubRouter
gsnRouter    :: !GossipSubRouter
  , GossipSubNode -> Switch
gsnSwitch    :: !Switch
  , GossipSubNode -> TVar (Maybe (Async ()))
gsnHeartbeat :: !(TVar (Maybe (Async ())))
  , GossipSubNode -> TVar (Map PeerId StreamIO)
gsnStreams   :: !(TVar (Map.Map PeerId StreamIO))  -- ^ Cached outbound streams per peer
  }

-- | Create a new GossipSub node with a Router wired to the Switch.
--
-- The Router's gsSendRPC callback opens/reuses outbound streams to peers
-- via the Switch's connection pool.
newGossipSubNode :: Switch -> GossipSubParams -> IO GossipSubNode
newGossipSubNode :: Switch -> GossipSubParams -> IO GossipSubNode
newGossipSubNode Switch
sw GossipSubParams
params = do
  streamsVar <- Map PeerId StreamIO -> IO (TVar (Map PeerId StreamIO))
forall a. a -> IO (TVar a)
newTVarIO Map PeerId StreamIO
forall k a. Map k a
Map.empty
  hbVar <- newTVarIO Nothing
  -- Create router with real sendRPC that uses the Switch
  let localPid = Switch -> PeerId
swLocalPeerId Switch
sw
  router <- newRouter params localPid (sendRPCviaSwitch sw streamsVar) getCurrentTime
  pure GossipSubNode
    { gsnRouter    = router
    , gsnSwitch    = sw
    , gsnHeartbeat = hbVar
    , gsnStreams   = streamsVar
    }

-- | Send an RPC to a peer via cached or newly opened stream.
sendRPCviaSwitch :: Switch -> TVar (Map.Map PeerId StreamIO) -> PeerId -> RPC -> IO ()
sendRPCviaSwitch :: Switch -> TVar (Map PeerId StreamIO) -> PeerId -> RPC -> IO ()
sendRPCviaSwitch Switch
sw TVar (Map PeerId StreamIO)
streamsVar PeerId
pid RPC
rpc = do
  -- Try to use cached stream
  mCached <- STM (Maybe StreamIO) -> IO (Maybe StreamIO)
forall a. STM a -> IO a
atomically (STM (Maybe StreamIO) -> IO (Maybe StreamIO))
-> STM (Maybe StreamIO) -> IO (Maybe StreamIO)
forall a b. (a -> b) -> a -> b
$ PeerId -> Map PeerId StreamIO -> Maybe StreamIO
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup PeerId
pid (Map PeerId StreamIO -> Maybe StreamIO)
-> STM (Map PeerId StreamIO) -> STM (Maybe StreamIO)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TVar (Map PeerId StreamIO) -> STM (Map PeerId StreamIO)
forall a. TVar a -> STM a
readTVar TVar (Map PeerId StreamIO)
streamsVar
  case mCached of
    Just StreamIO
stream -> do
      -- Try sending on cached stream; reopen on failure
      sendResult <- StreamIO -> RPC -> IO (Either () ())
trySend StreamIO
stream RPC
rpc
      case sendResult of
        Right () -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
        Left ()
_ -> do
          STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar (Map PeerId StreamIO)
-> (Map PeerId StreamIO -> Map PeerId StreamIO) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (Map PeerId StreamIO)
streamsVar (PeerId -> Map PeerId StreamIO -> Map PeerId StreamIO
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete PeerId
pid)
          Switch -> TVar (Map PeerId StreamIO) -> PeerId -> RPC -> IO ()
openAndSend Switch
sw TVar (Map PeerId StreamIO)
streamsVar PeerId
pid RPC
rpc
    Maybe StreamIO
Nothing -> Switch -> TVar (Map PeerId StreamIO) -> PeerId -> RPC -> IO ()
openAndSend Switch
sw TVar (Map PeerId StreamIO)
streamsVar PeerId
pid RPC
rpc

-- | Open a new outbound stream to a peer and send an RPC.
openAndSend :: Switch -> TVar (Map.Map PeerId StreamIO) -> PeerId -> RPC -> IO ()
openAndSend :: Switch -> TVar (Map PeerId StreamIO) -> PeerId -> RPC -> IO ()
openAndSend Switch
sw TVar (Map PeerId StreamIO)
streamsVar PeerId
pid RPC
rpc = do
  mStream <- Switch -> PeerId -> IO (Maybe StreamIO)
openStreamToPeer Switch
sw PeerId
pid
  case mStream of
    Maybe StreamIO
Nothing -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()  -- No connection to peer; fire-and-forget
    Just StreamIO
stream -> do
      STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar (Map PeerId StreamIO)
-> (Map PeerId StreamIO -> Map PeerId StreamIO) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar (Map PeerId StreamIO)
streamsVar (PeerId -> StreamIO -> Map PeerId StreamIO -> Map PeerId StreamIO
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert PeerId
pid StreamIO
stream)
      _ <- StreamIO -> RPC -> IO (Either () ())
trySend StreamIO
stream RPC
rpc
      pure ()

-- | Open a new mux stream to a peer and negotiate GossipSub protocol.
openStreamToPeer :: Switch -> PeerId -> IO (Maybe StreamIO)
openStreamToPeer :: Switch -> PeerId -> IO (Maybe StreamIO)
openStreamToPeer Switch
sw PeerId
pid = do
  mConn <- STM (Maybe Connection) -> IO (Maybe Connection)
forall a. STM a -> IO a
atomically (STM (Maybe Connection) -> IO (Maybe Connection))
-> STM (Maybe Connection) -> IO (Maybe Connection)
forall a b. (a -> b) -> a -> b
$ TVar (Map PeerId [Connection]) -> PeerId -> STM (Maybe Connection)
lookupConn (Switch -> TVar (Map PeerId [Connection])
swConnPool Switch
sw) PeerId
pid
  case mConn of
    Maybe Connection
Nothing -> Maybe StreamIO -> IO (Maybe StreamIO)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe StreamIO
forall a. Maybe a
Nothing
    Just Connection
conn -> do
      result <- (Maybe StreamIO -> Either () (Maybe StreamIO)
forall a b. b -> Either a b
Right (Maybe StreamIO -> Either () (Maybe StreamIO))
-> IO (Maybe StreamIO) -> IO (Either () (Maybe StreamIO))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Connection -> IO (Maybe StreamIO)
openAndNegotiate Connection
conn) IO (Either () (Maybe StreamIO))
-> (SomeException -> IO (Either () (Maybe StreamIO)))
-> IO (Either () (Maybe StreamIO))
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
`catch`
                  (\(SomeException
_ :: SomeException) -> Either () (Maybe StreamIO) -> IO (Either () (Maybe StreamIO))
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (() -> Either () (Maybe StreamIO)
forall a b. a -> Either a b
Left ()))
      case result of
        Left () -> Maybe StreamIO -> IO (Maybe StreamIO)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe StreamIO
forall a. Maybe a
Nothing
        Right Maybe StreamIO
mStream -> Maybe StreamIO -> IO (Maybe StreamIO)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe StreamIO
mStream

-- | Open a mux stream and negotiate /meshsub/1.1.0.
openAndNegotiate :: Connection -> IO (Maybe StreamIO)
openAndNegotiate :: Connection -> IO (Maybe StreamIO)
openAndNegotiate Connection
conn = do
  stream <- MuxerSession -> IO StreamIO
muxOpenStream (Connection -> MuxerSession
connSession Connection
conn)
  negResult <- negotiateInitiator stream [gossipSubProtocolId]
  case negResult of
    Accepted ProtocolId
_ -> Maybe StreamIO -> IO (Maybe StreamIO)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (StreamIO -> Maybe StreamIO
forall a. a -> Maybe a
Just StreamIO
stream)
    NegotiationResult
NoProtocol -> Maybe StreamIO -> IO (Maybe StreamIO)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe StreamIO
forall a. Maybe a
Nothing

-- | Try to send an RPC on a stream, catching exceptions.
trySend :: StreamIO -> RPC -> IO (Either () ())
trySend :: StreamIO -> RPC -> IO (Either () ())
trySend StreamIO
stream RPC
rpc =
  (StreamIO -> RPC -> IO ()
writeRPCMessage StreamIO
stream RPC
rpc IO () -> IO (Either () ()) -> IO (Either () ())
forall a b. IO a -> IO b -> IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Either () () -> IO (Either () ())
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (() -> Either () ()
forall a b. b -> Either a b
Right ()))
    IO (Either () ())
-> (SomeException -> IO (Either () ())) -> IO (Either () ())
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
`catch` (\(SomeException
_ :: SomeException) -> Either () () -> IO (Either () ())
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (() -> Either () ()
forall a b. a -> Either a b
Left ()))

-- | Handle an inbound GossipSub stream.
--
-- Reads framed RPCs in a loop and dispatches each to the Router's handleRPC.
-- On error or EOF, cleans up the peer's cached stream and removes the peer.
handleGossipSubStream :: GossipSubNode -> StreamIO -> PeerId -> IO ()
handleGossipSubStream :: GossipSubNode -> StreamIO -> PeerId -> IO ()
handleGossipSubStream GossipSubNode
node StreamIO
stream PeerId
pid = do
  -- Register peer with router
  now <- IO UTCTime
getCurrentTime
  addPeer (gsnRouter node) pid GossipSubPeer False now
  -- Read loop
  readLoop
  -- Cleanup on disconnect
  removePeer (gsnRouter node) pid
  atomically $ modifyTVar' (gsnStreams node) (Map.delete pid)
  where
    readLoop :: IO ()
readLoop = do
      result <- StreamIO -> Int -> IO (Either String RPC)
readRPCMessage StreamIO
stream Int
maxRPCSize
      case result of
        Left String
_ -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()  -- Error/EOF: stop loop
        Right RPC
rpc -> do
          GossipSubRouter -> PeerId -> RPC -> IO ()
handleRPC (GossipSubNode -> GossipSubRouter
gsnRouter GossipSubNode
node) PeerId
pid RPC
rpc
          IO ()
readLoop

-- | Start the GossipSub node: register stream handler, notifier, and start heartbeat.
startGossipSub :: GossipSubNode -> IO ()
startGossipSub :: GossipSubNode -> IO ()
startGossipSub GossipSubNode
node = do
  -- Register inbound stream handler on Switch
  Switch -> ProtocolId -> (StreamIO -> PeerId -> IO ()) -> IO ()
setStreamHandler (GossipSubNode -> Switch
gsnSwitch GossipSubNode
node) ProtocolId
gossipSubProtocolId
    (GossipSubNode -> StreamIO -> PeerId -> IO ()
handleGossipSubStream GossipSubNode
node)
  -- Register connection notifier to auto-open GossipSub streams to new peers
  STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar [Connection -> IO ()]
-> ([Connection -> IO ()] -> [Connection -> IO ()]) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' (Switch -> TVar [Connection -> IO ()]
swNotifiers (GossipSubNode -> Switch
gsnSwitch GossipSubNode
node))
    (GossipSubNode -> Connection -> IO ()
onNewConnection GossipSubNode
node (Connection -> IO ())
-> [Connection -> IO ()] -> [Connection -> IO ()]
forall a. a -> [a] -> [a]
:)
  -- Start heartbeat background thread
  hbAsync <- GossipSubRouter -> IO (Async ())
runHeartbeat (GossipSubNode -> GossipSubRouter
gsnRouter GossipSubNode
node)
  atomically $ writeTVar (gsnHeartbeat node) (Just hbAsync)

-- | Called on new connection: open a GossipSub stream to the peer.
-- Caches the stream for outbound writes and starts a read loop
-- on it to receive RPCs sent back by the remote peer (e.g. subscriptions).
onNewConnection :: GossipSubNode -> Connection -> IO ()
onNewConnection :: GossipSubNode -> Connection -> IO ()
onNewConnection GossipSubNode
node Connection
conn = do
  let pid :: PeerId
pid = Connection -> PeerId
connPeerId Connection
conn
  -- Open a mux stream and negotiate GossipSub protocol
  mStream <- Connection -> IO (Maybe StreamIO)
openAndNegotiate Connection
conn
  case mStream of
    Maybe StreamIO
Nothing -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()  -- Peer doesn't support GossipSub
    Just StreamIO
stream -> do
      -- Cache the outbound stream
      STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar (Map PeerId StreamIO)
-> (Map PeerId StreamIO -> Map PeerId StreamIO) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' (GossipSubNode -> TVar (Map PeerId StreamIO)
gsnStreams GossipSubNode
node) (PeerId -> StreamIO -> Map PeerId StreamIO -> Map PeerId StreamIO
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert PeerId
pid StreamIO
stream)
      -- Register peer
      now <- IO UTCTime
getCurrentTime
      addPeer (gsnRouter node) pid GossipSubPeer True now
      -- Send current subscriptions to the new peer
      sendCurrentSubscriptions node stream
      -- Start read loop on this stream to receive RPCs from the peer
      -- (e.g. subscription announcements sent back on the same yamux stream)
      _ <- async $ outboundReadLoop node stream pid
      pure ()

-- | Send current topic subscriptions to a newly connected peer.
-- This ensures peers joining after we've already subscribed still learn
-- about our subscriptions (standard GossipSub behavior).
-- Writes directly to the stream to avoid any routing issues.
sendCurrentSubscriptions :: GossipSubNode -> StreamIO -> IO ()
sendCurrentSubscriptions :: GossipSubNode -> StreamIO -> IO ()
sendCurrentSubscriptions GossipSubNode
node StreamIO
stream = do
  let router :: GossipSubRouter
router = GossipSubNode -> GossipSubRouter
gsnRouter GossipSubNode
node
  meshTopics <- STM (Set ProtocolId) -> IO (Set ProtocolId)
forall a. STM a -> IO a
atomically (STM (Set ProtocolId) -> IO (Set ProtocolId))
-> STM (Set ProtocolId) -> IO (Set ProtocolId)
forall a b. (a -> b) -> a -> b
$ Map ProtocolId (Set PeerId) -> Set ProtocolId
forall k a. Map k a -> Set k
Map.keysSet (Map ProtocolId (Set PeerId) -> Set ProtocolId)
-> STM (Map ProtocolId (Set PeerId)) -> STM (Set ProtocolId)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TVar (Map ProtocolId (Set PeerId))
-> STM (Map ProtocolId (Set PeerId))
forall a. TVar a -> STM a
readTVar (GossipSubRouter -> TVar (Map ProtocolId (Set PeerId))
gsMesh GossipSubRouter
router)
  let topics = Set ProtocolId -> [ProtocolId]
forall a. Set a -> [a]
Set.toList Set ProtocolId
meshTopics
  if null topics
    then pure ()
    else do
      let subRPC = RPC
emptyRPC
            { rpcSubscriptions = map (\ProtocolId
t -> Bool -> ProtocolId -> SubOpts
SubOpts Bool
True ProtocolId
t) topics }
      _ <- trySend stream subRPC
      pure ()

-- | Read loop on the outbound stream.
-- Handles RPCs sent back by the remote peer on the same yamux stream
-- (e.g. subscription announcements). Does NOT remove the peer on
-- EOF since the inbound handler or another mechanism manages peer lifecycle.
outboundReadLoop :: GossipSubNode -> StreamIO -> PeerId -> IO ()
outboundReadLoop :: GossipSubNode -> StreamIO -> PeerId -> IO ()
outboundReadLoop GossipSubNode
node StreamIO
stream PeerId
pid = IO ()
loop
  where
    loop :: IO ()
loop = do
      result <- StreamIO -> Int -> IO (Either String RPC)
readRPCMessage StreamIO
stream Int
maxRPCSize
      case result of
        Left String
_ -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()  -- EOF or error: stop
        Right RPC
rpc -> do
          GossipSubRouter -> PeerId -> RPC -> IO ()
handleRPC (GossipSubNode -> GossipSubRouter
gsnRouter GossipSubNode
node) PeerId
pid RPC
rpc
          IO ()
loop

-- | Stop the GossipSub node: cancel heartbeat and unregister handler.
stopGossipSub :: GossipSubNode -> IO ()
stopGossipSub :: GossipSubNode -> IO ()
stopGossipSub GossipSubNode
node = do
  -- Cancel heartbeat
  mHb <- STM (Maybe (Async ())) -> IO (Maybe (Async ()))
forall a. STM a -> IO a
atomically (STM (Maybe (Async ())) -> IO (Maybe (Async ())))
-> STM (Maybe (Async ())) -> IO (Maybe (Async ()))
forall a b. (a -> b) -> a -> b
$ do
    hb <- TVar (Maybe (Async ())) -> STM (Maybe (Async ()))
forall a. TVar a -> STM a
readTVar (GossipSubNode -> TVar (Maybe (Async ()))
gsnHeartbeat GossipSubNode
node)
    writeTVar (gsnHeartbeat node) Nothing
    pure hb
  case mHb of
    Just Async ()
hbAsync -> Async () -> IO ()
forall a. Async a -> IO ()
cancel Async ()
hbAsync IO () -> (SomeException -> IO ()) -> IO ()
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
`catch` (\(SomeException
_ :: SomeException) -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ())
    Maybe (Async ())
Nothing -> () -> IO ()
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
  -- Unregister stream handler
  removeStreamHandler (gsnSwitch node) gossipSubProtocolId

-- | Subscribe to a topic.
gossipJoin :: GossipSubNode -> Topic -> IO ()
gossipJoin :: GossipSubNode -> ProtocolId -> IO ()
gossipJoin GossipSubNode
node ProtocolId
topic = GossipSubRouter -> ProtocolId -> IO ()
join (GossipSubNode -> GossipSubRouter
gsnRouter GossipSubNode
node) ProtocolId
topic

-- | Unsubscribe from a topic.
gossipLeave :: GossipSubNode -> Topic -> IO ()
gossipLeave :: GossipSubNode -> ProtocolId -> IO ()
gossipLeave GossipSubNode
node ProtocolId
topic = GossipSubRouter -> ProtocolId -> IO ()
leave (GossipSubNode -> GossipSubRouter
gsnRouter GossipSubNode
node) ProtocolId
topic

-- | Publish a message to a topic (signed with the Switch's identity key).
gossipPublish :: GossipSubNode -> Topic -> ByteString -> IO ()
gossipPublish :: GossipSubNode -> ProtocolId -> ByteString -> IO ()
gossipPublish GossipSubNode
node ProtocolId
topic ByteString
payload =
  GossipSubRouter
-> ProtocolId -> ByteString -> Maybe KeyPair -> IO ()
publish (GossipSubNode -> GossipSubRouter
gsnRouter GossipSubNode
node) ProtocolId
topic ByteString
payload (KeyPair -> Maybe KeyPair
forall a. a -> Maybe a
Just (Switch -> KeyPair
swIdentityKey (GossipSubNode -> Switch
gsnSwitch GossipSubNode
node)))