-- | Iterative lookup algorithms for the Kademlia DHT.
--
-- Implements FIND_NODE, GET_VALUE, and GET_PROVIDERS iterative lookups
-- per docs/09-dht.md. Uses STM for shared state and async for concurrent
-- queries (alpha=10 parallelism).
--
-- Candidates are maintained as a list sorted by XOR distance to the
-- target key, ensuring the closest peers are always queried first.
--
-- Bootstrap performs a self-lookup followed by a simplified refresh: one
-- FIND_NODE lookup for the key of each of up to 10 peers in the routing table.
module Network.LibP2P.DHT.Lookup
  ( -- * Lookup results
    LookupResult (..)
    -- * Iterative lookups
  , iterativeFindNode
  , iterativeGetValue
  , iterativeGetProviders
    -- * Bootstrap
  , bootstrap
  ) where

import Control.Concurrent.Async (mapConcurrently)
import Control.Concurrent.STM
import Control.Exception (SomeException, catch)
import Data.ByteString (ByteString)
import Data.Set (Set)
import qualified Data.Set as Set
import Data.Time (UTCTime (..), fromGregorian, getCurrentTime)
import Network.LibP2P.Crypto.PeerId (PeerId (..))
import Network.LibP2P.DHT.DHT (DHTNode (..), ProviderEntry (..), Validator (..))
import Network.LibP2P.DHT.Distance (peerIdToKey, sortByDistance)
import Network.LibP2P.DHT.Message
import Network.LibP2P.DHT.RoutingTable (closestPeers, insertPeer, allPeers)
import Network.LibP2P.DHT.Types

-- | Outcome of an iterative lookup.
--
-- Every constructor carries a list of 'BucketEntry' values; all fields are
-- strict.
data LookupResult
  = -- | Only peers were found.
    FoundPeers ![BucketEntry]
  | -- | A record was found, together with a list of peers.
    FoundValue !DHTRecord ![BucketEntry]
  | -- | Provider entries were found, together with a list of peers.
    FoundProviders ![ProviderEntry] ![BucketEntry]
  deriving (Show)

-- | Iterative FIND_NODE: locate the @kValue@ closest peers to a target key.
--
-- Steps:
--
-- 1. Seed the candidate list with the @kValue@ closest peers from the local
--    routing table, ordered with 'sortByDistance'.
-- 2. Hand the shared state (candidates, queried set, known set) to
--    'lookupLoop' with a 'FindNode' query type.
--
-- The wall-clock time read here is passed to 'lookupLoop'; it does not
-- influence which peers are returned.
iterativeFindNode :: DHTNode -> DHTKey -> IO [BucketEntry]
iterativeFindNode node targetKey = do
  seeds <- closestPeers targetKey kValue <$> readTVarIO (dhtRoutingTable node)
  startedAt <- getCurrentTime

  -- Lookup state: distance-ordered candidates, peers already queried, and
  -- every peer id seen so far (used for de-duplication).
  candidatesVar <- newTVarIO (sortByDistance targetKey seeds)
  queriedVar <- newTVarIO Set.empty
  knownVar <- newTVarIO (Set.fromList (entryPeerId <$> seeds))

  lookupLoop node targetKey candidatesVar queriedVar knownVar startedAt FindNode

-- | Core iterative lookup loop.
--
-- The loop is generic over the query type; in this module it is driven by
-- 'iterativeFindNode'.
--
-- Each round:
--
-- 1. atomically picks up to @alphaValue@ not-yet-queried candidates (in
--    candidate-list order) and marks them as queried;
-- 2. queries them concurrently via 'queryPeer';
-- 3. merges the reported closer peers into the candidate list, skipping peers
--    that are already known /and/ peers reported more than once within the
--    same round, then re-sorts with 'sortByDistance';
-- 4. stops when the first @kValue@ candidates have all been queried, or when
--    no unqueried candidate is left.
--
-- A failed query contributes no new peers; the failed peer itself stays in
-- the candidate list.
--
-- Returns the first @kValue@ entries of the candidate list.
lookupLoop
  :: DHTNode
  -> DHTKey
  -> TVar [BucketEntry]   -- ^ Candidates sorted by XOR distance to target
  -> TVar (Set PeerId)    -- ^ Already queried peers
  -> TVar (Set PeerId)    -- ^ Known peers (all candidates ever seen, for dedup)
  -> a                    -- ^ Opaque value forwarded to 'dhtPeerToEntry'
  -> MessageType          -- ^ Query type
  -> IO [BucketEntry]
lookupLoop node targetKey candidatesVar queriedVar knownVar seenAt queryType = go
  where
    go :: IO [BucketEntry]
    go = do
      batch <- atomically pickBatch
      if null batch
        then closestK
        else do
          replies <- mapConcurrently (queryPeer node targetKey queryType) batch
          atomically (mergeReplies replies)
          done <- atomically topKQueried
          if done then closestK else go

    -- Final answer: the head of the candidate list.
    closestK :: IO [BucketEntry]
    closestK = take kValue <$> readTVarIO candidatesVar

    -- Select the next batch and mark it queried in the same transaction.
    pickBatch :: STM [BucketEntry]
    pickBatch = do
      candidates <- readTVar candidatesVar
      queried <- readTVar queriedVar
      let pending = filter (\e -> not (Set.member (entryPeerId e) queried)) candidates
          batch = take alphaValue pending
      writeTVar queriedVar (Set.union queried (Set.fromList (map entryPeerId batch)))
      pure batch

    -- Fold the closer peers of all successful replies into the candidates.
    mergeReplies :: [Either String [DHTPeer]] -> STM ()
    mergeReplies replies = do
      known <- readTVar knownVar
      candidates <- readTVar candidatesVar
      let reported = map (dhtPeerToEntry seenAt) (concatMap (either (const []) id) replies)
          (fresh, known') = keepUnseen known reported
      writeTVar candidatesVar (sortByDistance targetKey (candidates ++ fresh))
      writeTVar knownVar known'

    -- True once every one of the first kValue candidates has been queried.
    topKQueried :: STM Bool
    topKQueried = do
      candidates <- readTVar candidatesVar
      queried <- readTVar queriedVar
      pure (all (\e -> Set.member (entryPeerId e) queried) (take kValue candidates))

    -- Keep the first occurrence of each peer id that is not in the given set.
    -- The set is threaded through the traversal so that a peer reported by
    -- several responders in one round is added only once.
    keepUnseen :: Set PeerId -> [BucketEntry] -> ([BucketEntry], Set PeerId)
    keepUnseen seen [] = ([], seen)
    keepUnseen seen (e : rest)
      | Set.member pid seen = keepUnseen seen rest
      | otherwise =
          let (kept, seen') = keepUnseen (Set.insert pid seen) rest
           in (e : kept, seen')
      where
        pid = entryPeerId e

-- | Send one request of the given type for the target key to a single peer.
--
-- Returns @Right closerPeers@ taken from the response, or @Left message@ when
-- the request returns an error or throws.
--
-- NOTE(review): the handler catches 'SomeException', which also covers
-- asynchronous exceptions; confirm that is intended.
queryPeer :: DHTNode -> DHTKey -> MessageType -> BucketEntry -> IO (Either String [DHTPeer])
queryPeer node targetKey queryType entry =
  fmap msgCloserPeers <$> (send `catch` asLeft)
  where
    DHTKey keyBytes = targetKey

    request :: DHTMessage
    request =
      emptyDHTMessage
        { msgType = queryType
        , msgKey = keyBytes
        }

    send :: IO (Either String DHTMessage)
    send = dhtSendRequest node (entryPeerId entry) request

    -- Any exception becomes a Left carrying its 'show' rendering.
    asLeft :: SomeException -> IO (Either String DHTMessage)
    asLeft = pure . Left . show

-- | Iterative GET_VALUE: look up a record by key, with convergence repair.
--
-- Seeds the lookup with the @kValue@ closest peers from the local routing
-- table and delegates to 'valueLoop', which tracks the best record seen, the
-- peers that returned it, and the peers that returned an outdated one. When
-- the lookup ends, 'valueLoop' sends PUT_VALUE to the outdated peers.
--
-- Returns @Left "value not found"@ when no peer returned a record.
iterativeGetValue :: DHTNode -> Validator -> ByteString -> IO (Either String DHTRecord)
iterativeGetValue node validator key = do
  seeds <- closestPeers targetKey kValue <$> readTVarIO (dhtRoutingTable node)
  startedAt <- getCurrentTime

  candidatesVar <- newTVarIO (sortByDistance targetKey seeds)
  queriedVar <- newTVarIO Set.empty
  knownVar <- newTVarIO (Set.fromList (entryPeerId <$> seeds))
  bestVar <- newTVarIO Nothing
  bestPeersVar <- newTVarIO Set.empty
  outdatedVar <- newTVarIO Set.empty

  valueLoop
    node
    targetKey
    candidatesVar
    queriedVar
    knownVar
    bestVar
    bestPeersVar
    outdatedVar
    validator
    startedAt
  where
    targetKey :: DHTKey
    targetKey = DHTKey key

-- | Value lookup loop with best/outdated tracking.
--
-- Each round:
--
-- 1. atomically picks up to @alphaValue@ not-yet-queried candidates and marks
--    them as queried;
-- 2. queries them concurrently via 'queryPeerForValue';
-- 3. feeds every reply to 'processValueResult' (updates best record, best
--    peers and outdated peers);
-- 4. merges the reported closer peers into the candidate list, skipping peers
--    already known and peers reported more than once within the same round;
-- 5. stops when the first @kValue@ candidates have all been queried, or when
--    no unqueried candidate is left.
--
-- On termination the best record (if any) is sent as PUT_VALUE to every peer
-- in the outdated set; failures of those repair requests are ignored.
--
-- Returns @Right record@ for the best record seen, or
-- @Left "value not found"@ if no peer returned one.
valueLoop
  :: DHTNode
  -> DHTKey
  -> TVar [BucketEntry]      -- ^ Candidates sorted by distance to the target
  -> TVar (Set PeerId)       -- ^ Already queried peers
  -> TVar (Set PeerId)       -- ^ Known peers (dedup)
  -> TVar (Maybe DHTRecord)  -- ^ Best record seen so far
  -> TVar (Set PeerId)       -- ^ Peers that returned best value
  -> TVar (Set PeerId)       -- ^ Peers with outdated values
  -> Validator
  -> a                       -- ^ Opaque value forwarded to the helpers
  -> IO (Either String DHTRecord)
valueLoop node targetKey candidatesVar queriedVar knownVar bestVar bestPeersVar outdatedVar validator seenAt = go
  where
    go :: IO (Either String DHTRecord)
    go = do
      batch <- atomically pickBatch
      if null batch
        then finalize
        else do
          replies <- mapConcurrently (queryPeerForValue node targetKey) batch
          mapM_
            (processValueResult node targetKey bestVar bestPeersVar outdatedVar validator seenAt)
            replies
          atomically (mergeCloser replies)
          done <- atomically topKQueried
          if done then finalize else go

    -- Select the next batch and mark it queried in the same transaction.
    pickBatch :: STM [BucketEntry]
    pickBatch = do
      candidates <- readTVar candidatesVar
      queried <- readTVar queriedVar
      let pending = filter (\e -> not (Set.member (entryPeerId e) queried)) candidates
          batch = take alphaValue pending
      writeTVar queriedVar (Set.union queried (Set.fromList (map entryPeerId batch)))
      pure batch

    -- Fold the closer peers of all successful replies into the candidates.
    mergeCloser :: [(PeerId, Either String [DHTPeer], Maybe DHTRecord)] -> STM ()
    mergeCloser replies = do
      known <- readTVar knownVar
      candidates <- readTVar candidatesVar
      let closer = concatMap (\(_, peers, _) -> either (const []) id peers) replies
          (fresh, known') = keepUnseen known (map (dhtPeerToEntry seenAt) closer)
      writeTVar candidatesVar (sortByDistance targetKey (candidates ++ fresh))
      writeTVar knownVar known'

    -- True once every one of the first kValue candidates has been queried.
    topKQueried :: STM Bool
    topKQueried = do
      candidates <- readTVar candidatesVar
      queried <- readTVar queriedVar
      pure (all (\e -> Set.member (entryPeerId e) queried) (take kValue candidates))

    -- Keep the first occurrence of each peer id that is not in the given set.
    -- The set is threaded through the traversal so that a peer reported by
    -- several responders in one round is added only once.
    keepUnseen :: Set PeerId -> [BucketEntry] -> ([BucketEntry], Set PeerId)
    keepUnseen seen [] = ([], seen)
    keepUnseen seen (e : rest)
      | Set.member pid seen = keepUnseen seen rest
      | otherwise =
          let (kept, seen') = keepUnseen (Set.insert pid seen) rest
           in (e : kept, seen')
      where
        pid = entryPeerId e

    -- Convergence repair: PUT_VALUE the best record to the outdated peers,
    -- then return it.
    finalize :: IO (Either String DHTRecord)
    finalize = do
      best <- readTVarIO bestVar
      outdated <- readTVarIO outdatedVar
      case best of
        Nothing -> pure (Left "value not found")
        Just record -> do
          let putMsg :: DHTMessage
              putMsg =
                emptyDHTMessage
                  { msgType = PutValue
                  , msgKey = recKey record
                  , msgRecord = Just record
                  }
          -- Best effort: the result of each repair request is discarded.
          mapM_
            (\pid -> dhtSendRequest node pid putMsg `catch` repairFailed)
            (Set.toList outdated)
          pure (Right record)

    repairFailed :: SomeException -> IO (Either String DHTMessage)
    repairFailed _ = pure (Left "repair failed")

-- | Send a GET_VALUE request for the target key to a single peer.
--
-- The result is a triple of the queried peer's id, the closer peers from the
-- response (or @Left message@ if the request returned an error or threw), and
-- the record carried by the response, if any. On failure the record is
-- 'Nothing'.
--
-- NOTE(review): the handler catches 'SomeException', which also covers
-- asynchronous exceptions; confirm that is intended.
queryPeerForValue :: DHTNode -> DHTKey -> BucketEntry
                  -> IO (PeerId, Either String [DHTPeer], Maybe DHTRecord)
queryPeerForValue node targetKey entry =
  summarize <$> (send `catch` asLeft)
  where
    peer :: PeerId
    peer = entryPeerId entry

    DHTKey keyBytes = targetKey

    request :: DHTMessage
    request = emptyDHTMessage { msgType = GetValue, msgKey = keyBytes }

    send :: IO (Either String DHTMessage)
    send = dhtSendRequest node peer request

    -- Any exception becomes a Left carrying its 'show' rendering.
    asLeft :: SomeException -> IO (Either String DHTMessage)
    asLeft = pure . Left . show

    summarize :: Either String DHTMessage -> (PeerId, Either String [DHTPeer], Maybe DHTRecord)
    summarize (Left err) = (peer, Left err, Nothing)
    summarize (Right resp) = (peer, Right (msgCloserPeers resp), msgRecord resp)

-- | Fold one GET_VALUE reply into the best/bestPeers/outdated state.
--
-- Replies without a record are ignored. For a reply carrying a record, in a
-- single STM transaction:
--
-- * no best record yet: the record becomes the best and its sender the only
--   best peer (the record is accepted as-is here);
-- * same value as the current best: the sender joins the best peers. This is
--   checked before consulting the validator, so that a peer holding the
--   current best value is not marked outdated when 'valSelect' answers @0@
--   for a tie;
-- * 'valSelect' picks index 0 (current best wins): the sender is outdated;
-- * 'valSelect' picks index 1 (new record wins): all previous best peers
--   become outdated, the record becomes the best, the sender the only best
--   peer;
-- * any other 'valSelect' outcome (including @Left@): the sender is added to
--   the best peers.
--
-- The 'DHTNode', 'DHTKey' and the polymorphic argument are unused.
processValueResult
  :: DHTNode -> DHTKey
  -> TVar (Maybe DHTRecord)  -- ^ Best record seen so far
  -> TVar (Set PeerId)       -- ^ Peers that returned the best record
  -> TVar (Set PeerId)       -- ^ Peers that returned an outdated record
  -> Validator
  -> a
  -> (PeerId, Either String [DHTPeer], Maybe DHTRecord)
  -> IO ()
processValueResult _ _ _ _ _ _ _ (_, _, Nothing) = pure ()
processValueResult _ _ bestVar bestPeersVar outdatedVar validator _ (pid, _, Just record) =
  atomically $ do
    best <- readTVar bestVar
    case best of
      Nothing -> promote
      Just currentBest
        | recValue record == recValue currentBest ->
            modifyTVar' bestPeersVar (Set.insert pid)
        | otherwise ->
            case valSelect validator (recKey record) [recValue currentBest, recValue record] of
              Right 0 ->
                -- Current best is still best; this peer has an outdated value.
                modifyTVar' outdatedVar (Set.insert pid)
              Right 1 -> do
                -- New value is better: everyone holding the old best is outdated.
                oldBestPeers <- readTVar bestPeersVar
                modifyTVar' outdatedVar (Set.union oldBestPeers)
                promote
              _ ->
                -- NOTE(review): on a validator error the sender is still
                -- counted as a best peer, so it never receives a repair.
                modifyTVar' bestPeersVar (Set.insert pid)
  where
    -- Make this record the best one, with its sender as the only best peer.
    promote :: STM ()
    promote = do
      writeTVar bestVar (Just record)
      writeTVar bestPeersVar (Set.singleton pid)

-- | Iterative GET_PROVIDERS: collect provider entries for a content key.
--
-- Seeds the lookup with the @kValue@ closest peers from the local routing
-- table and delegates to 'providerLoop', which returns the providers
-- accumulated over all rounds.
iterativeGetProviders :: DHTNode -> ByteString -> IO [ProviderEntry]
iterativeGetProviders node key = do
  seeds <- closestPeers targetKey kValue <$> readTVarIO (dhtRoutingTable node)
  startedAt <- getCurrentTime

  candidatesVar <- newTVarIO (sortByDistance targetKey seeds)
  queriedVar <- newTVarIO Set.empty
  knownVar <- newTVarIO (Set.fromList (entryPeerId <$> seeds))
  providersVar <- newTVarIO []

  providerLoop node targetKey candidatesVar queriedVar knownVar providersVar startedAt
  where
    targetKey :: DHTKey
    targetKey = DHTKey key

-- | Provider lookup loop.
--
-- Each round:
--
-- 1. atomically picks up to @alphaValue@ not-yet-queried candidates and marks
--    them as queried;
-- 2. queries them concurrently via 'queryPeerForProviders';
-- 3. merges the reported closer peers into the candidate list, skipping peers
--    already known and peers reported more than once within the same round;
-- 4. appends the reported providers to the accumulated list, keeping only the
--    first entry per provider peer id;
-- 5. stops when the first @kValue@ candidates have all been queried, or when
--    no unqueried candidate is left.
--
-- Returns the accumulated providers in first-seen order.
providerLoop
  :: DHTNode
  -> DHTKey
  -> TVar [BucketEntry]    -- ^ Candidates sorted by distance to the target
  -> TVar (Set PeerId)     -- ^ Already queried peers
  -> TVar (Set PeerId)     -- ^ Known peers (dedup)
  -> TVar [ProviderEntry]  -- ^ Providers collected so far
  -> a                     -- ^ Opaque value forwarded to 'dhtPeerToEntry'
  -> IO [ProviderEntry]
providerLoop node targetKey candidatesVar queriedVar knownVar providersVar seenAt = go
  where
    go :: IO [ProviderEntry]
    go = do
      batch <- atomically pickBatch
      if null batch
        then readTVarIO providersVar
        else do
          replies <- mapConcurrently (queryPeerForProviders node targetKey) batch
          atomically (mergeReplies replies)
          done <- atomically topKQueried
          if done then readTVarIO providersVar else go

    -- Select the next batch and mark it queried in the same transaction.
    pickBatch :: STM [BucketEntry]
    pickBatch = do
      candidates <- readTVar candidatesVar
      queried <- readTVar queriedVar
      let pending = filter (\e -> not (Set.member (entryPeerId e) queried)) candidates
          batch = take alphaValue pending
      writeTVar queriedVar (Set.union queried (Set.fromList (map entryPeerId batch)))
      pure batch

    -- Record the closer peers and providers of all replies.
    mergeReplies :: [(PeerId, Either String [DHTPeer], [DHTPeer])] -> STM ()
    mergeReplies replies = do
      known <- readTVar knownVar
      candidates <- readTVar candidatesVar
      collected <- readTVar providersVar
      let closer = concatMap (\(_, peers, _) -> either (const []) id peers) replies
          reportedProviders = concatMap (\(_, _, provs) -> map dhtPeerToProvider provs) replies
          (fresh, known') = keepUnseen known (map (dhtPeerToEntry seenAt) closer)
      writeTVar candidatesVar (sortByDistance targetKey (candidates ++ fresh))
      writeTVar knownVar known'
      writeTVar providersVar (addProviders collected reportedProviders)

    -- True once every one of the first kValue candidates has been queried.
    topKQueried :: STM Bool
    topKQueried = do
      candidates <- readTVar candidatesVar
      queried <- readTVar queriedVar
      pure (all (\e -> Set.member (entryPeerId e) queried) (take kValue candidates))

    -- Keep the first occurrence of each peer id that is not in the given set.
    -- The set is threaded through the traversal so that a peer reported by
    -- several responders in one round is added only once.
    keepUnseen :: Set PeerId -> [BucketEntry] -> ([BucketEntry], Set PeerId)
    keepUnseen seen [] = ([], seen)
    keepUnseen seen (e : rest)
      | Set.member pid seen = keepUnseen seen rest
      | otherwise =
          let (kept, seen') = keepUnseen (Set.insert pid seen) rest
           in (e : kept, seen')
      where
        pid = entryPeerId e

    -- Append incoming providers whose peer id is not already present, so a
    -- provider reported by several peers appears once in the result.
    addProviders :: [ProviderEntry] -> [ProviderEntry] -> [ProviderEntry]
    addProviders existing incoming =
      existing ++ unseen (Set.fromList (map peProvider existing)) incoming
      where
        unseen _ [] = []
        unseen seen (p : ps)
          | Set.member (peProvider p) seen = unseen seen ps
          | otherwise = p : unseen (Set.insert (peProvider p) seen) ps

-- | Send a GET_PROVIDERS request for the target key to a single peer.
--
-- The result is a triple of the queried peer's id, the closer peers from the
-- response (or @Left message@ if the request returned an error or threw), and
-- the provider peers from the response. On failure the provider list is
-- empty.
--
-- NOTE(review): the handler catches 'SomeException', which also covers
-- asynchronous exceptions; confirm that is intended.
queryPeerForProviders :: DHTNode -> DHTKey -> BucketEntry
                      -> IO (PeerId, Either String [DHTPeer], [DHTPeer])
queryPeerForProviders node targetKey entry =
  summarize <$> (send `catch` asLeft)
  where
    peer :: PeerId
    peer = entryPeerId entry

    DHTKey keyBytes = targetKey

    request :: DHTMessage
    request = emptyDHTMessage { msgType = GetProviders, msgKey = keyBytes }

    send :: IO (Either String DHTMessage)
    send = dhtSendRequest node peer request

    -- Any exception becomes a Left carrying its 'show' rendering.
    asLeft :: SomeException -> IO (Either String DHTMessage)
    asLeft = pure . Left . show

    summarize :: Either String DHTMessage -> (PeerId, Either String [DHTPeer], [DHTPeer])
    summarize (Left err) = (peer, Left err, [])
    summarize (Right resp) = (peer, Right (msgCloserPeers resp), msgProviderPeers resp)

-- | Bootstrap the DHT from a list of seed peers.
--
-- 1. Insert every seed into the routing table as a 'NotConnected' entry with
--    no addresses and the current time as last-seen. All insertions happen in
--    one STM transaction, so updates made concurrently by other threads are
--    not overwritten.
-- 2. Run a self-lookup ('iterativeFindNode' for the node's own key). An
--    exception from this lookup propagates to the caller.
-- 3. Simplified refresh: run 'iterativeFindNode' for the key of each of the
--    first 10 peers returned by 'allPeers'. Exceptions from these lookups are
--    ignored.
--
-- The results of all lookups are discarded.
bootstrap :: DHTNode -> [PeerId] -> IO ()
bootstrap node seeds = do
  now <- getCurrentTime

  -- Step 1: add seed peers to the routing table, atomically.
  let seedEntry pid = BucketEntry pid (peerIdToKey pid) [] now NotConnected
  atomically $
    mapM_ (\pid -> modifyTVar' table (fst . insertPeer (seedEntry pid))) seeds

  -- Step 2: self-lookup (FIND_NODE for our own key).
  _ <- iterativeFindNode node (dhtLocalKey node)

  -- Step 3: refresh, capped at 10 lookups to keep bootstrap cheap.
  refreshTargets <- take 10 . allPeers <$> readTVarIO table
  mapM_ refresh refreshTargets
  where
    table :: TVar RoutingTable
    table = dhtRoutingTable node

    refresh :: BucketEntry -> IO [BucketEntry]
    refresh entry = iterativeFindNode node (entryKey entry) `catch` ignoreFailure

    -- Refresh lookups are best effort.
    ignoreFailure :: SomeException -> IO [BucketEntry]
    ignoreFailure _ = pure []

-- Helpers

-- | Convert a 'DHTPeer' into a 'BucketEntry'.
--
-- The first argument is ignored. The entry gets no addresses (multiaddr
-- decoding is not done here) and 'epochTime' as its last-seen time; the peer
-- id and connection type come from the 'DHTPeer'.
dhtPeerToEntry :: a -> DHTPeer -> BucketEntry
dhtPeerToEntry _ peer =
  BucketEntry
    { entryPeerId = pid
    , entryKey = peerIdToKey pid
    , entryAddrs = []
    , entryLastSeen = epochTime
    , entryConnType = dhtPeerConnType peer
    }
  where
    pid :: PeerId
    pid = PeerId (dhtPeerId peer)

-- | Convert a 'DHTPeer' into a 'ProviderEntry'.
--
-- Only the peer id is carried over: the address list is left empty and the
-- timestamp is the 'epochTime' placeholder.
dhtPeerToProvider :: DHTPeer -> ProviderEntry
dhtPeerToProvider peer =
  ProviderEntry
    { peProvider = providerId
    , peAddrs = []
    , peTimestamp = epochTime
    }
  where
    providerId :: PeerId
    providerId = PeerId (dhtPeerId peer)

-- | Fixed placeholder timestamp: 2000-01-01 00:00:00 UTC.
--
-- Built directly from its calendar day and a zero time-of-day rather than
-- parsed from a string, so the definition is total.
epochTime :: UTCTime
epochTime = UTCTime (fromGregorian 2000 1 1) 0