211 changes: 155 additions & 56 deletions channeldb/graph.go
@@ -216,22 +216,101 @@ func NewChannelGraph(db kvdb.Backend, rejectCacheSize, chanCacheSize int,
startTime := time.Now()
log.Debugf("Populating in-memory channel graph, this might " +
"take a while...")

err := g.ForEachNodeCacheable(
func(tx kvdb.RTx, node GraphCacheNode) error {
return g.graphCache.AddNode(tx, node)
g.graphCache.AddNodeFeatures(node)

return nil
},
)
if err != nil {
return nil, err
}

err = g.ForEachChannel(func(info *ChannelEdgeInfo,
Member:

Related to my comment elsewhere: perhaps we just want to have a new private forEachChannelX method here that skips the intermediate map and inserts directly into the cache?

policy1, policy2 *ChannelEdgePolicy) error {

g.graphCache.AddChannel(info, policy1, policy2)

return nil
})
if err != nil {
return nil, err
}

log.Debugf("Finished populating in-memory channel graph (took "+
"%v, %s)", time.Since(startTime), g.graphCache.Stats())
}

return g, nil
}

// channelMapKey is the key structure used for storing channel edge policies.
type channelMapKey struct {
nodeKey route.Vertex
chanID [8]byte
}

// getChannelMap loads all channel edge policies from the database and stores
// them in a map.
func (c *ChannelGraph) getChannelMap(edges kvdb.RBucket) (
map[channelMapKey]*ChannelEdgePolicy, error) {

// Create a map to store all channel edge policies.
channelMap := make(map[channelMapKey]*ChannelEdgePolicy)

err := kvdb.ForAll(edges, func(k, edgeBytes []byte) error {
// Skip embedded buckets.
if bytes.Equal(k, edgeIndexBucket) ||
Collaborator:

I'm thinking a more future-proof way to avoid iterating the embedded buckets might be to check whether the value is nil. Not perfect either, but AFAIK no key in this bucket carries a nil value other than the sub-buckets. The unknown policy is an empty slice ([]byte{}), so that case would still be handled.

Contributor Author:

I made it explicit as a sanity check that there is nothing unexpected in this bucket. But you could also consider it less future proof to not skip over unexpected data.
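A minimal, self-contained sketch of the nil-value check suggested in this thread (not code from this PR); the database path and bucket name are illustrative. The point is that bbolt reports nested buckets with a nil value, while an "unknown policy" stored as an empty, non-nil slice still passes through:

```go
package main

import (
	"fmt"
	"log"

	bolt "go.etcd.io/bbolt"
)

func main() {
	db, err := bolt.Open("graph.db", 0600, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	err = db.View(func(tx *bolt.Tx) error {
		edges := tx.Bucket([]byte("graph-edge"))
		if edges == nil {
			return nil
		}

		return edges.ForEach(func(k, v []byte) error {
			// Nested buckets show up with a nil value, so this
			// single check skips all embedded buckets without
			// listing each one by name. An empty (non-nil) value,
			// such as an unknown policy, is not skipped.
			if v == nil {
				return nil
			}

			fmt.Printf("policy key %x: %d bytes\n", k, len(v))
			return nil
		})
	})
	if err != nil {
		log.Fatal(err)
	}
}
```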

bytes.Equal(k, edgeUpdateIndexBucket) ||
bytes.Equal(k, zombieBucket) ||
bytes.Equal(k, disabledEdgePolicyBucket) ||
bytes.Equal(k, channelPointBucket) {

return nil
}

// Validate key length.
if len(k) != 33+8 {
return fmt.Errorf("invalid edge key %x encountered", k)
}

var key channelMapKey
copy(key.nodeKey[:], k[:33])
copy(key.chanID[:], k[33:])

// No need to deserialize unknown policy.
if bytes.Equal(edgeBytes, unknownPolicy) {
return nil
}

edgeReader := bytes.NewReader(edgeBytes)
edge, err := deserializeChanEdgePolicyRaw(
edgeReader,
)

switch {
// If the db policy was missing an expected optional field, we
// return nil as if the policy was unknown.
case err == ErrEdgePolicyOptionalFieldNotFound:
return nil

case err != nil:
return err
}

channelMap[key] = edge

return nil
})
if err != nil {
return nil, err
}

return channelMap, nil
}

var graphTopLevelBuckets = [][]byte{
nodeBucket,
edgeBucket,
@@ -332,50 +411,47 @@ func (c *ChannelGraph) NewPathFindTx() (kvdb.RTx, error) {
func (c *ChannelGraph) ForEachChannel(cb func(*ChannelEdgeInfo,
*ChannelEdgePolicy, *ChannelEdgePolicy) error) error {

// TODO(roasbeef): ptr map to reduce # of allocs? no duplicates

return kvdb.View(c.db, func(tx kvdb.RTx) error {
// First, grab the node bucket. This will be used to populate
// the Node pointers in each edge read from disk.
nodes := tx.ReadBucket(nodeBucket)
if nodes == nil {
return ErrGraphNotFound
}

// Next, grab the edge bucket which stores the edges, and also
// the index itself so we can group the directed edges together
// logically.
return c.db.View(func(tx kvdb.RTx) error {
edges := tx.ReadBucket(edgeBucket)
if edges == nil {
return ErrGraphNoEdgesFound
}

// First, load all edges in memory indexed by node and channel
// id.
channelMap, err := c.getChannelMap(edges)
Member:

This impacts all other callers of this method (which ideally should just be hitting the main graph cache instead?) to optimize for only the start up case.

Contributor Author (@joostjager, Jan 14, 2022):

There are no other callers besides graph cache population and DescribeGraph, which needs this optimization as well.

It can't hit the main graph cache, because that only contains a subset of the graph data needed for pathfinding.

Member:

> and DescribeGraph which needs this optimization as well.

FWIW we now have in-memory caching of the proto serialization here.

if err != nil {
return err
}

edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
if edgeIndex == nil {
return ErrGraphNoEdgesFound
}

// For each edge pair within the edge index, we fetch each edge
// itself and also the node information in order to fully
// populated the object.
return edgeIndex.ForEach(func(chanID, edgeInfoBytes []byte) error {
infoReader := bytes.NewReader(edgeInfoBytes)
edgeInfo, err := deserializeChanEdgeInfo(infoReader)
if err != nil {
return err
}
edgeInfo.db = c.db
// Load edge index, recombine each channel with the policies
// loaded above and invoke the callback.
return kvdb.ForAll(edgeIndex, func(k, edgeInfoBytes []byte) error {
var chanID [8]byte
copy(chanID[:], k)

edge1, edge2, err := fetchChanEdgePolicies(
edgeIndex, edges, nodes, chanID, c.db,
)
edgeInfoReader := bytes.NewReader(edgeInfoBytes)
info, err := deserializeChanEdgeInfo(edgeInfoReader)
if err != nil {
return err
}

// With both edges read, execute the call back. IF this
// function returns an error then the transaction will
// be aborted.
return cb(&edgeInfo, edge1, edge2)
policy1 := channelMap[channelMapKey{
nodeKey: info.NodeKey1Bytes,
chanID: chanID,
}]

policy2 := channelMap[channelMapKey{
nodeKey: info.NodeKey2Bytes,
chanID: chanID,
}]

return cb(&info, policy1, policy2)
})
}, func() {})
}
@@ -628,7 +704,6 @@ func (c *ChannelGraph) ForEachNodeCacheable(cb func(kvdb.RTx,
return ErrGraphNotFound
}

cacheableNode := newGraphCacheNode(route.Vertex{}, nil)
return nodes.ForEach(func(pubKey, nodeBytes []byte) error {
// If this is the source key, then we skip this
// iteration as the value for this key is a pubKey
@@ -638,8 +713,8 @@ }
}

nodeReader := bytes.NewReader(nodeBytes)
err := deserializeLightningNodeCacheable(
nodeReader, cacheableNode,
cacheableNode, err := deserializeLightningNodeCacheable(
nodeReader,
)
if err != nil {
return err
@@ -2740,8 +2815,6 @@ func (c *ChannelGraph) FetchLightningNode(nodePub route.Vertex) (
type graphCacheNode struct {
pubKeyBytes route.Vertex
features *lnwire.FeatureVector

nodeScratch [8]byte
}

// newGraphCacheNode returns a new cache optimized node.
@@ -4090,51 +4163,60 @@ func fetchLightningNode(nodeBucket kvdb.RBucket,
return deserializeLightningNode(nodeReader)
}

func deserializeLightningNodeCacheable(r io.Reader, node *graphCacheNode) error {
func deserializeLightningNodeCacheable(r io.Reader) (*graphCacheNode, error) {
// Always populate a feature vector, even if we don't have a node
// announcement and short circuit below.
node.features = lnwire.EmptyFeatureVector()
node := newGraphCacheNode(
route.Vertex{},
lnwire.EmptyFeatureVector(),
)

var nodeScratch [8]byte

// Skip ahead:
// - LastUpdate (8 bytes)
if _, err := r.Read(node.nodeScratch[:]); err != nil {
return err
if _, err := r.Read(nodeScratch[:]); err != nil {
Collaborator:

I dug into this a bit and I think many of the allocations might just be due to us using the io.Reader interface here, since both Read and ReadFull allocate. We may in the future want to switch to just passing around the byte slices to these deserialize functions and copying out the important parts.

Member:

Seems related to: #4884
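A rough illustration of the byte-slice idea from this thread, using a simplified, hypothetical record layout (8-byte last-update, 33-byte pubkey, 2-byte announcement flag) rather than lnd's actual node encoding; slicing and copying out of one buffer avoids the per-call allocations of the io.Reader path:

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// parseNodeHeader pulls the fields needed by the cache straight out of a
// byte slice instead of going through io.Reader. The layout here is a
// simplified stand-in, not the real channeldb node serialization.
func parseNodeHeader(b []byte) (pubKey [33]byte, hasAnn bool, err error) {
	const headerLen = 8 + 33 + 2
	if len(b) < headerLen {
		return pubKey, false, errors.New("short node record")
	}

	// Skip the 8-byte last-update timestamp, copy out the pubkey and
	// decode the announcement flag, all without intermediate reads.
	copy(pubKey[:], b[8:41])
	hasAnn = binary.BigEndian.Uint16(b[41:43]) != 0

	return pubKey, hasAnn, nil
}

func main() {
	record := make([]byte, 43)
	record[8] = 0x02 // first byte of a fake pubkey
	binary.BigEndian.PutUint16(record[41:], 1)

	pk, hasAnn, err := parseNodeHeader(record)
	if err != nil {
		panic(err)
	}
	fmt.Printf("pubkey[0]=%#x hasAnn=%v\n", pk[0], hasAnn)
}
```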

return nil, err
}

if _, err := io.ReadFull(r, node.pubKeyBytes[:]); err != nil {
return err
return nil, err
}

// Read the node announcement flag.
if _, err := r.Read(node.nodeScratch[:2]); err != nil {
return err
if _, err := r.Read(nodeScratch[:2]); err != nil {
return nil, err
}
hasNodeAnn := byteOrder.Uint16(node.nodeScratch[:2])
hasNodeAnn := byteOrder.Uint16(nodeScratch[:2])

// The rest of the data is optional, and will only be there if we got a
// node announcement for this node.
if hasNodeAnn == 0 {
return nil
return node, nil
}

// We did get a node announcement for this node, so we'll have the rest
// of the data available.
var rgb uint8
if err := binary.Read(r, byteOrder, &rgb); err != nil {
return err
return nil, err
}
if err := binary.Read(r, byteOrder, &rgb); err != nil {
return err
return nil, err
}
if err := binary.Read(r, byteOrder, &rgb); err != nil {
return err
return nil, err
}

if _, err := wire.ReadVarString(r, 0); err != nil {
return err
return nil, err
}

if err := node.features.Decode(r); err != nil {
return nil, err
}

return node.features.Decode(r)
return node, nil
}

func deserializeLightningNode(r io.Reader) (LightningNode, error) {
@@ -4652,6 +4734,27 @@ func serializeChanEdgePolicy(w io.Writer, edge *ChannelEdgePolicy,
func deserializeChanEdgePolicy(r io.Reader,
nodes kvdb.RBucket) (*ChannelEdgePolicy, error) {

// Deserialize the policy. Note that in case an optional field is not
// found, both an error and a populated policy object are returned.
edge, deserializeErr := deserializeChanEdgePolicyRaw(r)
if deserializeErr != nil &&
deserializeErr != ErrEdgePolicyOptionalFieldNotFound {

return nil, deserializeErr
}

// Populate full LightningNode struct.
pub := edge.Node.PubKeyBytes[:]
node, err := fetchLightningNode(nodes, pub)
if err != nil {
return nil, fmt.Errorf("unable to fetch node: %x, %v", pub, err)
}
edge.Node = &node

return edge, deserializeErr
}

func deserializeChanEdgePolicyRaw(r io.Reader) (*ChannelEdgePolicy, error) {
edge := &ChannelEdgePolicy{}

var err error
@@ -4701,13 +4804,9 @@ func deserializeChanEdgePolicy(r io.Reader,
if _, err := r.Read(pub[:]); err != nil {
return nil, err
}

node, err := fetchLightningNode(nodes, pub[:])
if err != nil {
return nil, fmt.Errorf("unable to fetch node: %x, %v",
pub[:], err)
edge.Node = &LightningNode{
PubKeyBytes: pub,
}
edge.Node = &node

// We'll try and see if there are any opaque bytes left, if not, then
// we'll ignore the EOF error and return the edge as is.
11 changes: 8 additions & 3 deletions channeldb/graph_cache.go
@@ -205,9 +205,8 @@ func (c *GraphCache) Stats() string {
numChannels)
}

// AddNode adds a graph node, including all the (directed) channels of that
// node.
func (c *GraphCache) AddNode(tx kvdb.RTx, node GraphCacheNode) error {
// AddNodeFeatures adds a graph node and its features to the cache.
func (c *GraphCache) AddNodeFeatures(node GraphCacheNode) {
nodePubKey := node.PubKey()

// Only hold the lock for a short time. The `ForEachChannel()` below is
@@ -217,6 +216,12 @@ func (c *GraphCache) AddNode(tx kvdb.RTx, node GraphCacheNode) error {
c.mtx.Lock()
c.nodeFeatures[nodePubKey] = node.Features()
c.mtx.Unlock()
}

// AddNode adds a graph node, including all the (directed) channels of that
// node.
func (c *GraphCache) AddNode(tx kvdb.RTx, node GraphCacheNode) error {
c.AddNodeFeatures(node)

return node.ForEachChannel(
tx, func(tx kvdb.RTx, info *ChannelEdgeInfo,