module Network.LibP2P.Switch.Dial
(
dial
, checkBackoff
, recordBackoff
, clearBackoff
, initialBackoffSeconds
, maxBackoffSeconds
, staggerDelayUs
) where
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (Async, async, cancel, waitAnyCatch)
import Control.Concurrent.STM
( STM
, TMVar
, TVar
, atomically
, newEmptyTMVar
, putTMVar
, readTMVar
, readTVar
, writeTVar
)
import Control.Exception (SomeException)
import Control.Monad (forM, when)
import Data.List (find)
import qualified Data.Map.Strict as Map
import Data.Time.Clock (NominalDiffTime, addUTCTime, getCurrentTime)
import Network.LibP2P.Crypto.PeerId (PeerId)
import Network.LibP2P.Multiaddr.Multiaddr (Multiaddr)
import Network.LibP2P.Switch.ConnPool (addConn, lookupConn)
import Network.LibP2P.Switch.Listen (streamAcceptLoop)
import Network.LibP2P.Switch.ResourceManager (Direction (..), releaseConnection, reserveConnection)
import Network.LibP2P.Switch.Types
( BackoffEntry (..)
, Connection (..)
, DialError (..)
, MuxerSession (..)
, Switch (..)
)
import Network.LibP2P.Switch.Upgrade (upgradeOutbound)
import Network.LibP2P.Transport.Transport (Transport (..))
-- | Base backoff duration, in seconds. 'recordBackoff' multiplies this by a
-- power of two derived from the number of recorded attempts.
initialBackoffSeconds :: NominalDiffTime
initialBackoffSeconds = 5
-- | Upper bound, in seconds, on the backoff duration computed by
-- 'recordBackoff'.
maxBackoffSeconds :: NominalDiffTime
maxBackoffSeconds = 300
-- | Delay unit, in microseconds, between staggered dial attempts.
-- 'staggeredDial' passes @i * staggerDelayUs@ to 'threadDelay' for the
-- worker at index @i@.
staggerDelayUs :: Int
staggerDelayUs = 250000
-- | Decide whether a dial to @pid@ is currently allowed.
--
-- Returns @Right ()@ when the peer has no backoff entry, or when its entry's
-- expiry is at or before the current time (the expired entry is removed from
-- the map in the same transaction). Returns @Left DialBackoff@ while the
-- entry's expiry is still in the future.
--
-- The clock is sampled once, before the STM transaction starts.
checkBackoff :: TVar (Map.Map PeerId BackoffEntry) -> PeerId -> IO (Either DialError ())
checkBackoff backoffsVar pid = do
  now <- getCurrentTime
  atomically $ do
    table <- readTVar backoffsVar
    case Map.lookup pid table of
      Just entry
        -- Still inside the backoff window: refuse the dial.
        | beExpiry entry > now -> pure (Left DialBackoff)
        -- Window elapsed: drop the stale entry and allow the dial.
        | otherwise -> Right () <$ writeTVar backoffsVar (Map.delete pid table)
      Nothing -> pure (Right ())
-- | Record a failed dial to @pid@ and (re)arm its backoff window.
--
-- The attempt counter becomes one more than the stored counter (or 1 when the
-- peer has no entry). The new expiry is the current time plus
-- @initialBackoffSeconds * 2 ^ (attempts - 1)@, capped at
-- 'maxBackoffSeconds'.
--
-- The exponent is clamped before the power is taken. Computing
-- @2 ^ (attempts - 1)@ directly in 'Int' wraps around once the counter gets
-- large enough (negative, then zero, on 64-bit), which would make the
-- duration non-positive and silently disable backoff for a peer that keeps
-- failing.
--
-- The clock is sampled once, before the STM transaction starts.
recordBackoff :: TVar (Map.Map PeerId BackoffEntry) -> PeerId -> IO ()
recordBackoff backoffsVar pid = do
  now <- getCurrentTime
  atomically $ do
    boffs <- readTVar backoffsVar
    let attempts = maybe 1 ((+ 1) . beAttempts) (Map.lookup pid boffs)
        entry =
          BackoffEntry
            { beExpiry = addUTCTime (backoffDuration attempts) now
            , beAttempts = attempts
            }
    writeTVar backoffsVar (Map.insert pid entry boffs)
  where
    -- Exponential backoff for the given attempt count, capped at
    -- 'maxBackoffSeconds'.
    backoffDuration :: Int -> NominalDiffTime
    backoffDuration attempts =
      min maxBackoffSeconds (initialBackoffSeconds * fromIntegral (2 ^ doublings :: Int))
      where
        -- The lower clamp keeps '(^)' from throwing on a negative exponent if
        -- a stored counter is ever non-positive; the upper clamp keeps the
        -- power inside 'Int'.
        doublings = max 0 (min maxDoublings (attempts - 1))

    -- 2 ^ 30 fits in 'Int' on every supported width, and
    -- @initialBackoffSeconds * 2 ^ 30@ is far above 'maxBackoffSeconds', so
    -- clamping here never changes the capped result.
    maxDoublings :: Int
    maxDoublings = 30
-- | Remove any backoff entry for @pid@. A no-op on the map's contents when
-- the peer has no entry.
clearBackoff :: TVar (Map.Map PeerId BackoffEntry) -> PeerId -> IO ()
clearBackoff backoffsVar pid =
  atomically (readTVar backoffsVar >>= writeTVar backoffsVar . Map.delete pid)
-- | Result of consulting the pending-dial table for a peer
-- (see 'checkPendingDial').
data PendingCheck
  = JoinExisting !(TMVar (Either DialError Connection))
    -- ^ A dial to the peer is already registered; the caller reads the
    -- result from this 'TMVar'.
  | StartNew !(TMVar (Either DialError Connection))
    -- ^ No dial was registered; this freshly created, empty 'TMVar' has been
    -- inserted into the table and is handed to 'dialNewAndBroadcast'.
-- | Obtain a connection to @remotePeerId@, dialing @addrs@ if necessary.
--
-- Steps, in order:
--
-- 1. If the switch is closed, return @Left DialSwitchClosed@.
-- 2. If the connection pool already has a connection for the peer, return it.
-- 3. If 'checkBackoff' refuses, return its error.
-- 4. Otherwise consult the pending-dial table: either read the result of the
--    dial already registered for this peer, or run a new dial via
--    'dialNewAndBroadcast'.
dial :: Switch -> PeerId -> [Multiaddr] -> IO (Either DialError Connection)
dial sw remotePeerId addrs = do
  closed <- atomically (readTVar (swClosed sw))
  if closed
    then pure (Left DialSwitchClosed)
    else reuseOrDial
  where
    -- Step 2: prefer a pooled connection.
    reuseOrDial =
      atomically (lookupConn (swConnPool sw) remotePeerId)
        >>= maybe dialUnlessBackedOff (pure . Right)

    -- Step 3: honour any active backoff window.
    dialUnlessBackedOff =
      checkBackoff (swDialBackoffs sw) remotePeerId
        >>= either (pure . Left) (const joinOrStart)

    -- Step 4: join the registered dial or start a new one.
    joinOrStart = do
      pendingCheck <- atomically (checkPendingDial sw remotePeerId)
      case pendingCheck of
        JoinExisting slot -> atomically (readTMVar slot)
        StartNew slot -> dialNewAndBroadcast sw remotePeerId addrs slot
-- | Look up @pid@ in the switch's pending-dial table.
--
-- When an entry exists, return it wrapped in 'JoinExisting'. Otherwise create
-- an empty 'TMVar', insert it into the table under @pid@, and return it
-- wrapped in 'StartNew'. Lookup and insert happen in the same STM action.
checkPendingDial :: Switch -> PeerId -> STM PendingCheck
checkPendingDial sw pid = do
  inFlight <- readTVar pendingVar
  case Map.lookup pid inFlight of
    Nothing -> do
      slot <- newEmptyTMVar
      StartNew slot <$ writeTVar pendingVar (Map.insert pid slot inFlight)
    Just slot -> pure (JoinExisting slot)
  where
    pendingVar = swPendingDials sw
-- | Run a fresh dial to @remotePeerId@ and publish its outcome through
-- @tmvar@.
--
-- Sequence:
--
-- 1. Reserve an outbound connection slot with the resource manager. On
--    refusal, publish and return @Left (DialResourceLimit err)@.
-- 2. Dial via 'dialNewInner', then check that the connection's peer id equals
--    @remotePeerId@; a mismatch becomes @Left (DialPeerIdMismatch ..)@.
-- 3. Publish the verified outcome into @tmvar@ and remove the peer from the
--    pending-dial table.
-- 4. On success: clear the peer's backoff, add the connection to the pool,
--    start 'streamAcceptLoop' in an 'async', and run each notifier in its
--    own 'async'.
-- 5. On failure: close the muxer session if a connection was established but
--    failed verification, release the reserved slot, and record a backoff.
dialNewAndBroadcast
  :: Switch
  -> PeerId
  -> [Multiaddr]
  -> TMVar (Either DialError Connection)
  -> IO (Either DialError Connection)
dialNewAndBroadcast sw remotePeerId addrs tmvar = do
  reservation <- atomically (reserveConnection resourceMgr remotePeerId Outbound)
  case reservation of
    Left resErr -> do
      let refused = Left (DialResourceLimit resErr)
      broadcast refused
      pure refused
    Right () -> do
      dialed <- dialNewInner sw addrs
      let verified = verifyPeer dialed
      broadcast verified
      case verified of
        Right conn -> do
          clearBackoff backoffs remotePeerId
          atomically (addConn (swConnPool sw) conn)
          _ <- async (streamAcceptLoop sw conn)
          notifiers <- atomically (readTVar (swNotifiers sw))
          mapM_ (\notify -> async (notify conn)) notifiers
          pure (Right conn)
        Left _ -> do
          -- A dial that succeeded but failed verification still holds an
          -- open session; close it before giving up.
          either (const (pure ())) (muxClose . connSession) dialed
          atomically (releaseConnection resourceMgr remotePeerId Outbound)
          recordBackoff backoffs remotePeerId
          pure verified
  where
    resourceMgr = swResourceMgr sw
    backoffs = swDialBackoffs sw
    pendingVar = swPendingDials sw

    -- Publish the outcome, then unregister the pending dial. These are two
    -- separate transactions, in this order.
    broadcast outcome = do
      atomically (putTMVar tmvar outcome)
      atomically $ do
        pending <- readTVar pendingVar
        writeTVar pendingVar (Map.delete remotePeerId pending)

    -- Reject a connection whose peer id differs from the one requested.
    verifyPeer (Right conn)
      | connPeerId conn /= remotePeerId =
          Left (DialPeerIdMismatch remotePeerId (connPeerId conn))
    verifyPeer other = other
-- | Dial the given addresses using the switch's registered transports.
--
-- * An empty address list yields @Left DialNoAddresses@.
-- * Each address is paired with the first transport whose 'transportCanDial'
--   accepts it; addresses with no such transport are dropped.
-- * If no address is dialable, the result is @Left (DialNoTransport a)@
--   where @a@ is the first address supplied.
-- * Otherwise the dialable pairs are handed to 'staggeredDial'.
dialNewInner :: Switch -> [Multiaddr] -> IO (Either DialError Connection)
dialNewInner _ [] = pure (Left DialNoAddresses)
dialNewInner sw addrs@(firstAddr : _) = do
  transports <- atomically (readTVar (swTransports sw))
  let pairWithTransport addr =
        fmap (\t -> (addr, t)) (find (`transportCanDial` addr) transports)
  case filterMap pairWithTransport addrs of
    [] -> pure (Left (DialNoTransport firstAddr))
    candidates -> staggeredDial sw candidates
-- | Apply @f@ to every element and keep the payloads of the 'Just' results,
-- in their original order.
filterMap :: (a -> Maybe b) -> [a] -> [b]
filterMap f xs = [y | Just y <- map f xs]
-- | Dial every (address, transport) pair concurrently with staggered starts.
--
-- One 'async' worker is spawned per pair. The worker at index @i > 0@ first
-- sleeps @i * staggerDelayUs@ microseconds; each worker then dials its
-- address with its transport and passes the raw connection to
-- 'upgradeOutbound'. The workers are handed to 'collectResults' with an
-- empty error list.
staggeredDial :: Switch -> [(Multiaddr, Transport)] -> IO (Either DialError Connection)
staggeredDial sw pairs = do
  workers <- forM (zip [0 :: Int ..] pairs) spawnWorker
  collectResults workers []
  where
    spawnWorker (slot, (addr, transport)) = async $ do
      when (slot > 0) $ threadDelay (slot * staggerDelayUs)
      transportDial transport addr >>= upgradeOutbound (swIdentityKey sw)
-- | Wait for dial workers until one succeeds or all have failed.
--
-- With no workers left, return @Left (DialAllFailed errs)@, listing the
-- collected error strings in the order they were recorded. Otherwise wait
-- for any worker to finish: on success, 'cancel' the other workers and
-- return the connection; on an exception, record its 'show' text and keep
-- waiting on the rest.
collectResults :: [Async Connection] -> [String] -> IO (Either DialError Connection)
collectResults [] errs = pure (Left (DialAllFailed (reverse errs)))
collectResults workers errs = do
  (finished, outcome) <- waitAnyCatch workers
  let others = [w | w <- workers, w /= finished]
  either (keepWaiting others) (finishWith others) outcome
  where
    -- Errors are consed on, so the accumulator is newest-first until the
    -- final 'reverse'.
    keepWaiting others ex = collectResults others (describe ex : errs)

    finishWith others conn = Right conn <$ mapM_ cancel others

    describe :: SomeException -> String
    describe = show