module Network.LibP2P.DHT.Lookup
(
LookupResult (..)
, iterativeFindNode
, iterativeGetValue
, iterativeGetProviders
, bootstrap
) where
import Control.Concurrent.Async (mapConcurrently)
import Control.Concurrent.STM
import Control.Exception (SomeException, catch)
import Data.ByteString (ByteString)
import Data.List (foldl')
import Data.Set (Set)
import qualified Data.Set as Set
import Data.Time (UTCTime (..), fromGregorian, getCurrentTime)
import Network.LibP2P.Crypto.PeerId (PeerId (..))
import Network.LibP2P.DHT.DHT (DHTNode (..), ProviderEntry (..), Validator (..))
import Network.LibP2P.DHT.Distance (peerIdToKey, sortByDistance)
import Network.LibP2P.DHT.Message
import Network.LibP2P.DHT.RoutingTable (closestPeers, insertPeer, allPeers)
import Network.LibP2P.DHT.Types
-- | Result shapes for the three kinds of DHT lookup.
--
-- Every constructor carries a list of 'BucketEntry' values; the value and
-- provider variants additionally carry the payload that was found.  All
-- fields are strict.
--
-- NOTE(review): none of the lookup functions visible in this module
-- construct a 'LookupResult'; it is only exported.
data LookupResult
  = FoundPeers ![BucketEntry]
    -- ^ Peers only.
  | FoundValue !DHTRecord ![BucketEntry]
    -- ^ A record together with a list of peers.
  | FoundProviders ![ProviderEntry] ![BucketEntry]
    -- ^ Provider entries together with a list of peers.
  deriving (Show)
-- | Run an iterative 'FindNode' lookup towards @targetKey@.
--
-- The lookup is seeded with the 'kValue' closest peers taken from the
-- node's routing table; the actual query rounds are driven by
-- 'lookupLoop', whose result is returned unchanged.
iterativeFindNode :: DHTNode -> DHTKey -> IO [BucketEntry]
iterativeFindNode node targetKey = do
  table <- readTVarIO (dhtRoutingTable node)
  startedAt <- getCurrentTime
  let initialPeers = closestPeers targetKey kValue table
      initialIds = Set.fromList (entryPeerId <$> initialPeers)
  -- Lookup-local state: candidate list, queried ids, known ids.
  frontier <- newTVarIO (sortByDistance targetKey initialPeers)
  contacted <- newTVarIO Set.empty
  discovered <- newTVarIO initialIds
  lookupLoop node targetKey frontier contacted discovered startedAt FindNode
-- | Iterative lookup loop used by 'iterativeFindNode'.
--
-- Each round:
--
-- 1. atomically takes up to 'alphaValue' candidates that are not yet in
--    @queriedVar@ (in the order held by @candidatesVar@) and marks them
--    as queried;
-- 2. sends them a @queryType@ request concurrently through 'queryPeer';
--    failed queries contribute no peers;
-- 3. merges the peers that were reported and are not yet in @knownVar@
--    into the candidate list and re-sorts it with 'sortByDistance'.
--
-- The loop ends when there is nothing left to query or when the first
-- 'kValue' candidates have all been queried; it returns those first
-- 'kValue' candidates.
--
-- Reported peers are de-duplicated by peer id against @knownVar@ /and/
-- against each other, so a peer that is returned by several queries of the
-- same round enters the candidate list only once.
--
-- The sixth argument is the value handed to 'dhtPeerToEntry' as the
-- timestamp of newly created entries.
-- NOTE(review): 'dhtPeerToEntry' currently ignores that argument.
lookupLoop
  :: DHTNode
  -> DHTKey
  -> TVar [BucketEntry]
  -> TVar (Set PeerId)
  -> TVar (Set PeerId)
  -> a
  -> MessageType
  -> IO [BucketEntry]
lookupLoop node targetKey candidatesVar queriedVar knownVar now queryType = go
  where
    go :: IO [BucketEntry]
    go = do
      toQuery <- atomically $ do
        candidates <- readTVar candidatesVar
        queried <- readTVar queriedVar
        let unqueried = filter (\e -> not (Set.member (entryPeerId e) queried)) candidates
            batch = take alphaValue unqueried
        writeTVar queriedVar (Set.union queried (Set.fromList (map entryPeerId batch)))
        pure batch
      if null toQuery
        then closest
        else do
          results <- mapConcurrently (queryPeer node targetKey queryType) toQuery
          shouldContinue <- atomically $ do
            known <- readTVar knownVar
            candidates <- readTVar candidatesVar
            queried <- readTVar queriedVar
            let reported = concatMap (either (const []) id) results
                (newKnown, newEntries) =
                  freshEntries known (map (dhtPeerToEntry now) reported)
                merged = sortByDistance targetKey (candidates ++ newEntries)
            writeTVar candidatesVar merged
            writeTVar knownVar newKnown
            -- Keep going while one of the k closest candidates is unqueried.
            pure (not (all (\e -> Set.member (entryPeerId e) queried) (take kValue merged)))
          if shouldContinue then go else closest

    -- The k best candidates currently held.
    closest :: IO [BucketEntry]
    closest = take kValue <$> readTVarIO candidatesVar

    -- Drop entries whose peer id is already in the given set, threading the
    -- set through the list so duplicates inside the list are dropped too.
    -- Returns the extended set and the surviving entries in original order.
    freshEntries :: Set PeerId -> [BucketEntry] -> (Set PeerId, [BucketEntry])
    freshEntries seen [] = (seen, [])
    freshEntries seen (e : rest)
      | Set.member (entryPeerId e) seen = freshEntries seen rest
      | otherwise =
          let (seen', kept) = freshEntries (Set.insert (entryPeerId e) seen) rest
           in (seen', e : kept)
-- | Send one request of type @queryType@ for @targetKey@ to the peer
-- described by @entry@.
--
-- Returns the closer peers listed in the response, or 'Left' with the
-- error reported by 'dhtSendRequest'.  Any exception raised while sending
-- is caught and returned as 'Left' carrying its 'show' text.
queryPeer :: DHTNode -> DHTKey -> MessageType -> BucketEntry -> IO (Either String [DHTPeer])
queryPeer node targetKey queryType entry =
  fmap msgCloserPeers <$> (send `catch` onFailure)
  where
    DHTKey keyBytes = targetKey

    request :: DHTMessage
    request = emptyDHTMessage {msgType = queryType, msgKey = keyBytes}

    send :: IO (Either String DHTMessage)
    send = dhtSendRequest node (entryPeerId entry) request

    onFailure :: SomeException -> IO (Either String DHTMessage)
    onFailure = pure . Left . show
-- | Look up the record stored under @key@ by iteratively querying peers.
--
-- The lookup is seeded with the 'kValue' closest peers from the node's
-- routing table and then driven by 'valueLoop', which uses @validator@ to
-- choose between competing records.  The result is whatever 'valueLoop'
-- returns.
iterativeGetValue :: DHTNode -> Validator -> ByteString -> IO (Either String DHTRecord)
iterativeGetValue node validator key = do
  table <- readTVarIO (dhtRoutingTable node)
  startedAt <- getCurrentTime
  let initialPeers = closestPeers target kValue table
  -- Lookup-local state shared by all rounds of the value loop.
  frontier <- newTVarIO (sortByDistance target initialPeers)
  contacted <- newTVarIO Set.empty
  discovered <- newTVarIO (Set.fromList (entryPeerId <$> initialPeers))
  bestRecord <- newTVarIO Nothing
  bestHolders <- newTVarIO Set.empty
  staleHolders <- newTVarIO Set.empty
  valueLoop
    node
    target
    frontier
    contacted
    discovered
    bestRecord
    bestHolders
    staleHolders
    validator
    startedAt
  where
    target :: DHTKey
    target = DHTKey key
-- | Iterative lookup loop used by 'iterativeGetValue'.
--
-- Each round atomically takes up to 'alphaValue' unqueried candidates,
-- marks them as queried, and asks them for the value concurrently via
-- 'queryPeerForValue'.  Every response is fed to 'processValueResult',
-- which maintains the best record seen (@bestVar@), the peers holding it
-- (@bestPeersVar@) and the peers to be repaired (@outdatedVar@).  Closer
-- peers reported in the responses that are not yet in @knownVar@ are merged
-- into the candidate list, which is re-sorted with 'sortByDistance'.
--
-- The loop ends when there is nothing left to query or when the first
-- 'kValue' candidates have all been queried.  It then returns the best
-- record, or @Left "value not found"@ if none was seen.  Before returning a
-- record, a 'PutValue' message carrying it is sent to every peer in
-- @outdatedVar@; failures of these repair messages are ignored.
--
-- Reported peers are de-duplicated by peer id against @knownVar@ /and/
-- against each other, so a peer returned by several queries of the same
-- round enters the candidate list only once.
--
-- The last argument is passed on to 'processValueResult' and to
-- 'dhtPeerToEntry' as the timestamp.
-- NOTE(review): both currently ignore it.
valueLoop
  :: DHTNode
  -> DHTKey
  -> TVar [BucketEntry]
  -> TVar (Set PeerId)
  -> TVar (Set PeerId)
  -> TVar (Maybe DHTRecord)
  -> TVar (Set PeerId)
  -> TVar (Set PeerId)
  -> Validator
  -> a
  -> IO (Either String DHTRecord)
valueLoop node targetKey candidatesVar queriedVar knownVar bestVar bestPeersVar outdatedVar validator now = go
  where
    go :: IO (Either String DHTRecord)
    go = do
      toQuery <- atomically $ do
        candidates <- readTVar candidatesVar
        queried <- readTVar queriedVar
        let unqueried = filter (\e -> not (Set.member (entryPeerId e) queried)) candidates
            batch = take alphaValue unqueried
        writeTVar queriedVar (Set.union queried (Set.fromList (map entryPeerId batch)))
        pure batch
      if null toQuery
        then finalize
        else do
          results <- mapConcurrently (queryPeerForValue node targetKey) toQuery
          mapM_ (processValueResult node targetKey bestVar bestPeersVar outdatedVar validator now) results
          shouldContinue <- atomically $ do
            known <- readTVar knownVar
            candidates <- readTVar candidatesVar
            queried <- readTVar queriedVar
            let reported = concatMap (\(_, peers, _) -> either (const []) id peers) results
                (newKnown, newEntries) =
                  freshEntries known (map (dhtPeerToEntry now) reported)
                merged = sortByDistance targetKey (candidates ++ newEntries)
            writeTVar candidatesVar merged
            writeTVar knownVar newKnown
            -- Keep going while one of the k closest candidates is unqueried.
            pure (not (all (\e -> Set.member (entryPeerId e) queried) (take kValue merged)))
          if shouldContinue then go else finalize

    -- Produce the final answer and push the best record to outdated peers.
    finalize :: IO (Either String DHTRecord)
    finalize = do
      best <- readTVarIO bestVar
      outdated <- readTVarIO outdatedVar
      case best of
        Nothing -> pure (Left "value not found")
        Just record -> do
          let putMsg :: DHTMessage
              putMsg =
                emptyDHTMessage
                  { msgType = PutValue
                  , msgKey = recKey record
                  , msgRecord = Just record
                  }
          mapM_ (repair putMsg) (Set.toList outdated)
          pure (Right record)

    -- Best-effort repair of one peer; the outcome is discarded by the caller.
    repair :: DHTMessage -> PeerId -> IO (Either String DHTMessage)
    repair putMsg pid = dhtSendRequest node pid putMsg `catch` repairFailed

    repairFailed :: SomeException -> IO (Either String DHTMessage)
    repairFailed _ = pure (Left "repair failed")

    -- Drop entries whose peer id is already in the given set, threading the
    -- set through the list so duplicates inside the list are dropped too.
    -- Returns the extended set and the surviving entries in original order.
    freshEntries :: Set PeerId -> [BucketEntry] -> (Set PeerId, [BucketEntry])
    freshEntries seen [] = (seen, [])
    freshEntries seen (e : rest)
      | Set.member (entryPeerId e) seen = freshEntries seen rest
      | otherwise =
          let (seen', kept) = freshEntries (Set.insert (entryPeerId e) seen) rest
           in (seen', e : kept)
-- | Send a 'GetValue' request for @targetKey@ to the peer described by
-- @entry@.
--
-- The result is a triple of the queried peer's id, the closer peers from
-- the response (or 'Left' with the error), and the record from the
-- response if it contained one.  Any exception raised while sending is
-- caught and reported as 'Left' carrying its 'show' text, with no record.
queryPeerForValue :: DHTNode -> DHTKey -> BucketEntry
                  -> IO (PeerId, Either String [DHTPeer], Maybe DHTRecord)
queryPeerForValue node targetKey entry =
  either failed answered <$> (send `catch` onFailure)
  where
    DHTKey keyBytes = targetKey

    peer :: PeerId
    peer = entryPeerId entry

    send :: IO (Either String DHTMessage)
    send = dhtSendRequest node peer emptyDHTMessage {msgType = GetValue, msgKey = keyBytes}

    onFailure :: SomeException -> IO (Either String DHTMessage)
    onFailure = pure . Left . show

    failed :: String -> (PeerId, Either String [DHTPeer], Maybe DHTRecord)
    failed err = (peer, Left err, Nothing)

    answered :: DHTMessage -> (PeerId, Either String [DHTPeer], Maybe DHTRecord)
    answered resp = (peer, Right (msgCloserPeers resp), msgRecord resp)
-- | Fold one 'queryPeerForValue' result into the value-lookup state.
--
-- Results without a record are ignored.  For a result carrying a record,
-- the following happens in a single STM transaction:
--
-- * no best record yet: the record becomes the best and its sender the
--   only best peer;
-- * the record's value equals the current best's value: the sender is
--   added to the best peers (it needs no repair);
-- * otherwise 'valSelect' is asked to choose between the current best
--   (index 0) and the new record (index 1):
--
--     * @Right 0@: the sender is added to the outdated peers;
--     * @Right 1@: all current best peers become outdated, the new record
--       becomes the best and its sender the only best peer;
--     * anything else (including @Left@): the sender is added to the best
--       peers.
--       NOTE(review): treating a selection error like "holds the best
--       record" is kept from the original; confirm this is intended.
--
-- The first, second and seventh arguments are not used.
processValueResult
  :: DHTNode -> DHTKey
  -> TVar (Maybe DHTRecord)
  -> TVar (Set PeerId)
  -> TVar (Set PeerId)
  -> Validator
  -> a
  -> (PeerId, Either String [DHTPeer], Maybe DHTRecord)
  -> IO ()
processValueResult _ _ bestVar bestPeersVar outdatedVar validator _ (pid, _, Just record) =
  atomically $ do
    best <- readTVar bestVar
    case best of
      Nothing -> do
        writeTVar bestVar (Just record)
        writeTVar bestPeersVar (Set.singleton pid)
      Just currentBest
        -- Identical value: the peer already holds the best record, so it
        -- must not be scheduled for repair.
        | recValue currentBest == recValue record ->
            modifyTVar' bestPeersVar (Set.insert pid)
        | otherwise ->
            case valSelect validator (recKey record) [recValue currentBest, recValue record] of
              Right 0 ->
                modifyTVar' outdatedVar (Set.insert pid)
              Right 1 -> do
                oldBestPeers <- readTVar bestPeersVar
                modifyTVar' outdatedVar (Set.union oldBestPeers)
                writeTVar bestVar (Just record)
                writeTVar bestPeersVar (Set.singleton pid)
              _ ->
                modifyTVar' bestPeersVar (Set.insert pid)
processValueResult _ _ _ _ _ _ _ (_, _, Nothing) = pure ()
-- | Collect provider entries for @key@ by iteratively querying peers.
--
-- The lookup is seeded with the 'kValue' closest peers from the node's
-- routing table and then driven by 'providerLoop', whose result is
-- returned unchanged.
iterativeGetProviders :: DHTNode -> ByteString -> IO [ProviderEntry]
iterativeGetProviders node key = do
  table <- readTVarIO (dhtRoutingTable node)
  startedAt <- getCurrentTime
  let initialPeers = closestPeers target kValue table
  -- Lookup-local state shared by all rounds of the provider loop.
  frontier <- newTVarIO (sortByDistance target initialPeers)
  contacted <- newTVarIO Set.empty
  discovered <- newTVarIO (Set.fromList (entryPeerId <$> initialPeers))
  collected <- newTVarIO []
  providerLoop node target frontier contacted discovered collected startedAt
  where
    target :: DHTKey
    target = DHTKey key
-- | Iterative lookup loop used by 'iterativeGetProviders'.
--
-- Each round atomically takes up to 'alphaValue' unqueried candidates,
-- marks them as queried, and queries them concurrently via
-- 'queryPeerForProviders'.  Closer peers from the responses that are not
-- yet in @knownVar@ are merged into the candidate list (re-sorted with
-- 'sortByDistance'); provider peers from the responses are converted with
-- 'dhtPeerToProvider' and appended to @providersVar@.
--
-- The loop ends when there is nothing left to query or when the first
-- 'kValue' candidates have all been queried, and returns the contents of
-- @providersVar@.
--
-- Both closer peers and providers are de-duplicated by peer id: a peer or
-- provider reported by several queries is recorded only once.
--
-- The last argument is handed to 'dhtPeerToEntry' as the timestamp.
-- NOTE(review): 'dhtPeerToEntry' currently ignores that argument.
providerLoop
  :: DHTNode
  -> DHTKey
  -> TVar [BucketEntry]
  -> TVar (Set PeerId)
  -> TVar (Set PeerId)
  -> TVar [ProviderEntry]
  -> a
  -> IO [ProviderEntry]
providerLoop node targetKey candidatesVar queriedVar knownVar providersVar now = go
  where
    go :: IO [ProviderEntry]
    go = do
      toQuery <- atomically $ do
        candidates <- readTVar candidatesVar
        queried <- readTVar queriedVar
        let unqueried = filter (\e -> not (Set.member (entryPeerId e) queried)) candidates
            batch = take alphaValue unqueried
        writeTVar queriedVar (Set.union queried (Set.fromList (map entryPeerId batch)))
        pure batch
      if null toQuery
        then readTVarIO providersVar
        else do
          results <- mapConcurrently (queryPeerForProviders node targetKey) toQuery
          shouldContinue <- atomically $ do
            known <- readTVar knownVar
            candidates <- readTVar candidatesVar
            queried <- readTVar queriedVar
            currentProviders <- readTVar providersVar
            let allCloser = concatMap (\(_, closer, _) -> either (const []) id closer) results
                allProviders = concatMap (\(_, _, provs) -> provs) results
                (newKnown, newEntries) =
                  keepUnseen entryPeerId known (map (dhtPeerToEntry now) allCloser)
                merged = sortByDistance targetKey (candidates ++ newEntries)
                alreadyProviding = Set.fromList (map peProvider currentProviders)
                (_, newProviderEntries) =
                  keepUnseen peProvider alreadyProviding (map dhtPeerToProvider allProviders)
            writeTVar candidatesVar merged
            writeTVar knownVar newKnown
            writeTVar providersVar (currentProviders ++ newProviderEntries)
            -- Keep going while one of the k closest candidates is unqueried.
            pure (not (all (\e -> Set.member (entryPeerId e) queried) (take kValue merged)))
          if shouldContinue then go else readTVarIO providersVar

    -- Drop items whose peer id is already in the given set, threading the
    -- set through the list so duplicates inside the list are dropped too.
    -- Returns the extended set and the surviving items in original order.
    keepUnseen :: (x -> PeerId) -> Set PeerId -> [x] -> (Set PeerId, [x])
    keepUnseen _ seen [] = (seen, [])
    keepUnseen peerOf seen (x : rest)
      | Set.member (peerOf x) seen = keepUnseen peerOf seen rest
      | otherwise =
          let (seen', kept) = keepUnseen peerOf (Set.insert (peerOf x) seen) rest
           in (seen', x : kept)
-- | Send a 'GetProviders' request for @targetKey@ to the peer described by
-- @entry@.
--
-- The result is a triple of the queried peer's id, the closer peers from
-- the response (or 'Left' with the error), and the provider peers from the
-- response.  Any exception raised while sending is caught and reported as
-- 'Left' carrying its 'show' text, with an empty provider list.
queryPeerForProviders :: DHTNode -> DHTKey -> BucketEntry
                      -> IO (PeerId, Either String [DHTPeer], [DHTPeer])
queryPeerForProviders node targetKey entry =
  either failed answered <$> (send `catch` onFailure)
  where
    DHTKey keyBytes = targetKey

    peer :: PeerId
    peer = entryPeerId entry

    send :: IO (Either String DHTMessage)
    send = dhtSendRequest node peer emptyDHTMessage {msgType = GetProviders, msgKey = keyBytes}

    onFailure :: SomeException -> IO (Either String DHTMessage)
    onFailure = pure . Left . show

    failed :: String -> (PeerId, Either String [DHTPeer], [DHTPeer])
    failed err = (peer, Left err, [])

    answered :: DHTMessage -> (PeerId, Either String [DHTPeer], [DHTPeer])
    answered resp = (peer, Right (msgCloserPeers resp), msgProviderPeers resp)
-- | Bootstrap the node from a list of seed peers.
--
-- Steps:
--
-- 1. Insert every seed into the routing table as a 'NotConnected' entry
--    with no addresses and the current time as last-seen.  The insertion
--    is a single atomic read-modify-write, so routing-table updates made
--    concurrently by other threads are not overwritten.
-- 2. Run 'iterativeFindNode' for the node's own key ('dhtLocalKey').
--    Exceptions from this lookup propagate to the caller.
-- 3. Run 'iterativeFindNode' for the keys of up to ten peers taken from
--    'allPeers' of the routing table; exceptions from these lookups are
--    swallowed.
--
-- All lookup results are discarded.
-- NOTE(review): the lookup code visible in this module never inserts
-- discovered peers into the routing table; unless 'dhtSendRequest' does so,
-- steps 2 and 3 do not grow the table — confirm.
bootstrap :: DHTNode -> [PeerId] -> IO ()
bootstrap node seeds = do
  now <- getCurrentTime
  let seedEntry :: PeerId -> BucketEntry
      seedEntry pid = BucketEntry pid (peerIdToKey pid) [] now NotConnected
      insertSeed :: RoutingTable -> PeerId -> RoutingTable
      insertSeed table pid = fst (insertPeer (seedEntry pid) table)
  atomically $
    modifyTVar' (dhtRoutingTable node) (\table -> foldl' insertSeed table seeds)
  _ <- iterativeFindNode node (dhtLocalKey node)
  peers <- allPeers <$> readTVarIO (dhtRoutingTable node)
  mapM_ refresh (take refreshLimit peers)
  where
    -- Maximum number of routing-table peers whose keys are looked up.
    refreshLimit :: Int
    refreshLimit = 10

    -- Best-effort lookup towards one peer's key.
    refresh :: BucketEntry -> IO [BucketEntry]
    refresh entry = iterativeFindNode node (entryKey entry) `catch` lookupFailed

    lookupFailed :: SomeException -> IO [BucketEntry]
    lookupFailed _ = pure []
-- | Convert a wire-level 'DHTPeer' into a 'BucketEntry'.
--
-- The peer id and connection type are taken from the peer, the key is
-- derived from the peer id with 'peerIdToKey', and the address list is
-- left empty.
--
-- NOTE(review): the first argument is ignored and the last-seen time is
-- always 'epochTime'.  Callers pass it in the timestamp position, so this
-- looks like it was meant to become 'entryLastSeen' — confirm.
dhtPeerToEntry :: a -> DHTPeer -> BucketEntry
dhtPeerToEntry _ peer =
  BucketEntry
    { entryPeerId = pid
    , entryKey = peerIdToKey pid
    , entryAddrs = []
    , entryLastSeen = epochTime
    , entryConnType = dhtPeerConnType peer
    }
  where
    pid :: PeerId
    pid = PeerId (dhtPeerId peer)
-- | Convert a wire-level 'DHTPeer' into a 'ProviderEntry'.
--
-- Only the peer id is carried over; the address list is left empty and
-- the timestamp is always 'epochTime'.
dhtPeerToProvider :: DHTPeer -> ProviderEntry
dhtPeerToProvider peer =
  let provider = PeerId (dhtPeerId peer)
   in ProviderEntry
        { peProvider = provider
        , peAddrs = []
        , peTimestamp = epochTime
        }
-- | Placeholder timestamp used for entries built from wire-level peers:
-- midnight UTC on 2000-01-01 (note: not the Unix epoch).
--
-- Constructed directly instead of parsing a string with the partial
-- 'read', so it cannot fail at runtime.
epochTime :: UTCTime
epochTime = UTCTime (fromGregorian 2000 1 1) 0