kvdb+channeldb: speed up graph cache #6111
Changes from all commits
99566b7
102a1cb
b8408a1
aff9685
54324d5
cb47036
2e2229a
352008a
@@ -216,22 +216,101 @@ func NewChannelGraph(db kvdb.Backend, rejectCacheSize, chanCacheSize int,

 		startTime := time.Now()
 		log.Debugf("Populating in-memory channel graph, this might " +
 			"take a while...")

 		err := g.ForEachNodeCacheable(
 			func(tx kvdb.RTx, node GraphCacheNode) error {
-				return g.graphCache.AddNode(tx, node)
+				g.graphCache.AddNodeFeatures(node)
+
+				return nil
 			},
 		)
 		if err != nil {
 			return nil, err
 		}

+		err = g.ForEachChannel(func(info *ChannelEdgeInfo,
+			policy1, policy2 *ChannelEdgePolicy) error {
+
+			g.graphCache.AddChannel(info, policy1, policy2)
+
+			return nil
+		})
+		if err != nil {
+			return nil, err
+		}
+
 		log.Debugf("Finished populating in-memory channel graph (took "+
 			"%v, %s)", time.Since(startTime), g.graphCache.Stats())
 	}

 	return g, nil
 }

+// channelMapKey is the key structure used for storing channel edge policies.
+type channelMapKey struct {
+	nodeKey route.Vertex
+	chanID  [8]byte
+}
+
+// getChannelMap loads all channel edge policies from the database and stores
+// them in a map.
+func (c *ChannelGraph) getChannelMap(edges kvdb.RBucket) (
+	map[channelMapKey]*ChannelEdgePolicy, error) {
+
+	// Create a map to store all channel edge policies.
+	channelMap := make(map[channelMapKey]*ChannelEdgePolicy)
+
+	err := kvdb.ForAll(edges, func(k, edgeBytes []byte) error {
+		// Skip embedded buckets.
+		if bytes.Equal(k, edgeIndexBucket) ||
+			bytes.Equal(k, edgeUpdateIndexBucket) ||
+			bytes.Equal(k, zombieBucket) ||
+			bytes.Equal(k, disabledEdgePolicyBucket) ||
+			bytes.Equal(k, channelPointBucket) {
+
+			return nil
+		}
+

Review comment: I'm thinking maybe a more future proof way to avoid iterating the embedded buckets is to check if the value is …

Reply: I made it explicit as a sanity check that there is nothing unexpected in this bucket. But you could also consider it less future proof to not skip over unexpected data.
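As a sketch of the reviewer's suggestion: bbolt, which backs kvdb here, reports nested buckets through ForEach with a nil value, so the skip could key off the value instead of an explicit name list. Illustrative only, assuming those nil-value semantics hold for the backend in use:

	// Value-based skip (sketch): bbolt's ForEach yields a nil value for
	// nested buckets, so anything with a nil value cannot be a policy.
	return kvdb.ForAll(edges, func(k, edgeBytes []byte) error {
		if edgeBytes == nil {
			// A nested bucket such as edgeIndexBucket; skip it.
			return nil
		}

		// ... deserialize the policy exactly as getChannelMap does.
		return nil
	})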
+		// Validate key length.
+		if len(k) != 33+8 {
+			return fmt.Errorf("invalid edge key %x encountered", k)
+		}
+
+		var key channelMapKey
+		copy(key.nodeKey[:], k[:33])
+		copy(key.chanID[:], k[33:])
+
+		// No need to deserialize unknown policy.
+		if bytes.Equal(edgeBytes, unknownPolicy) {
+			return nil
+		}
+
+		edgeReader := bytes.NewReader(edgeBytes)
+		edge, err := deserializeChanEdgePolicyRaw(
+			edgeReader,
+		)
+
+		switch {
+		// If the db policy was missing an expected optional field, we
+		// return nil as if the policy was unknown.
+		case err == ErrEdgePolicyOptionalFieldNotFound:
+			return nil
+
+		case err != nil:
+			return err
+		}
+
+		channelMap[key] = edge
+
+		return nil
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	return channelMap, nil
+}
+
 var graphTopLevelBuckets = [][]byte{
 	nodeBucket,
 	edgeBucket,
@@ -332,50 +411,47 @@ func (c *ChannelGraph) NewPathFindTx() (kvdb.RTx, error) {

 func (c *ChannelGraph) ForEachChannel(cb func(*ChannelEdgeInfo,
 	*ChannelEdgePolicy, *ChannelEdgePolicy) error) error {

-	// TODO(roasbeef): ptr map to reduce # of allocs? no duplicates
-
-	return kvdb.View(c.db, func(tx kvdb.RTx) error {
-		// First, grab the node bucket. This will be used to populate
-		// the Node pointers in each edge read from disk.
-		nodes := tx.ReadBucket(nodeBucket)
-		if nodes == nil {
-			return ErrGraphNotFound
-		}
-
-		// Next, grab the edge bucket which stores the edges, and also
-		// the index itself so we can group the directed edges together
-		// logically.
+	return c.db.View(func(tx kvdb.RTx) error {
 		edges := tx.ReadBucket(edgeBucket)
 		if edges == nil {
 			return ErrGraphNoEdgesFound
 		}

+		// First, load all edges in memory indexed by node and channel
+		// id.
+		channelMap, err := c.getChannelMap(edges)
+		if err != nil {
+			return err
+		}

Review comment: This impacts all other callers of this method (which ideally should just be hitting the main graph cache instead?) to optimize for only the start up case.

Reply: There are no other callers besides graph cache population and … It can't hit the main graph cache, because that only contains a subset of the graph data needed for pathfinding.
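To illustrate the reply with simplified, assumed shapes (the real definitions live in the graph cache code and may differ): the in-memory cache keeps a pathfinding-oriented subset per channel, not the full ChannelEdgeInfo and ChannelEdgePolicy that the ForEachChannel callback hands out:

	// Illustrative only: roughly the per-channel data the graph cache
	// retains. Signatures, proofs and opaque extra data are dropped, so
	// callers that need the full records must still read from disk.
	type cachedChannel struct {
		ChannelID uint64            // Short channel ID.
		Capacity  btcutil.Amount    // Total channel capacity.
		OtherNode route.Vertex      // Far end, seen from this node.
		InPolicy  *CachedEdgePolicy // Reduced pathfinding policy.
	}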
 		edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
 		if edgeIndex == nil {
 			return ErrGraphNoEdgesFound
 		}

-		// For each edge pair within the edge index, we fetch each edge
-		// itself and also the node information in order to fully
-		// populated the object.
-		return edgeIndex.ForEach(func(chanID, edgeInfoBytes []byte) error {
-			infoReader := bytes.NewReader(edgeInfoBytes)
-			edgeInfo, err := deserializeChanEdgeInfo(infoReader)
-			if err != nil {
-				return err
-			}
-			edgeInfo.db = c.db
-
-			edge1, edge2, err := fetchChanEdgePolicies(
-				edgeIndex, edges, nodes, chanID, c.db,
-			)
-			if err != nil {
-				return err
-			}
-
-			// With both edges read, execute the call back. IF this
-			// function returns an error then the transaction will
-			// be aborted.
-			return cb(&edgeInfo, edge1, edge2)
+		// Load edge index, recombine each channel with the policies
+		// loaded above and invoke the callback.
+		return kvdb.ForAll(edgeIndex, func(k, edgeInfoBytes []byte) error {
+			var chanID [8]byte
+			copy(chanID[:], k)
+
+			edgeInfoReader := bytes.NewReader(edgeInfoBytes)
+			info, err := deserializeChanEdgeInfo(edgeInfoReader)
+			if err != nil {
+				return err
+			}
+
+			policy1 := channelMap[channelMapKey{
+				nodeKey: info.NodeKey1Bytes,
+				chanID:  chanID,
+			}]
+
+			policy2 := channelMap[channelMapKey{
+				nodeKey: info.NodeKey2Bytes,
+				chanID:  chanID,
+			}]
+
+			return cb(&info, policy1, policy2)
 		})
 	}, func() {})
 }
@@ -628,7 +704,6 @@ func (c *ChannelGraph) ForEachNodeCacheable(cb func(kvdb.RTx,

 			return ErrGraphNotFound
 		}

-		cacheableNode := newGraphCacheNode(route.Vertex{}, nil)
 		return nodes.ForEach(func(pubKey, nodeBytes []byte) error {
 			// If this is the source key, then we skip this
 			// iteration as the value for this key is a pubKey
@@ -638,8 +713,8 @@ func (c *ChannelGraph) ForEachNodeCacheable(cb func(kvdb.RTx,

 			}

 			nodeReader := bytes.NewReader(nodeBytes)
-			err := deserializeLightningNodeCacheable(
-				nodeReader, cacheableNode,
+			cacheableNode, err := deserializeLightningNodeCacheable(
+				nodeReader,
 			)
 			if err != nil {
 				return err
@@ -2740,8 +2815,6 @@ func (c *ChannelGraph) FetchLightningNode(nodePub route.Vertex) (

 type graphCacheNode struct {
 	pubKeyBytes route.Vertex
 	features    *lnwire.FeatureVector
-
-	nodeScratch [8]byte
 }

 // newGraphCacheNode returns a new cache optimized node.
@@ -4090,51 +4163,60 @@ func fetchLightningNode(nodeBucket kvdb.RBucket,

 	return deserializeLightningNode(nodeReader)
 }

-func deserializeLightningNodeCacheable(r io.Reader, node *graphCacheNode) error {
+func deserializeLightningNodeCacheable(r io.Reader) (*graphCacheNode, error) {
 	// Always populate a feature vector, even if we don't have a node
 	// announcement and short circuit below.
-	node.features = lnwire.EmptyFeatureVector()
+	node := newGraphCacheNode(
+		route.Vertex{},
+		lnwire.EmptyFeatureVector(),
+	)
+
+	var nodeScratch [8]byte

 	// Skip ahead:
 	// - LastUpdate (8 bytes)
-	if _, err := r.Read(node.nodeScratch[:]); err != nil {
-		return err
+	if _, err := r.Read(nodeScratch[:]); err != nil {
+		return nil, err
 	}

Review comment: I dug into this a bit and I think many of the allocations might be just due to us using the …

Reply: Seems related to: #4884
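One way to trim those reader allocations, sketched under the assumption that the raw value bytes are in hand (as they are in nodes.ForEach): parse fixed-size fields straight from the slice instead of going through io.Reader. readNodeHeader is a hypothetical helper, not part of this PR; byteOrder and route.Vertex are the names this file already uses:

	// Hypothetical helper: read the fixed-size node header directly from
	// the value bytes, avoiding io.Reader scratch buffers entirely.
	func readNodeHeader(nodeBytes []byte) (uint64, route.Vertex, error) {
		var pub route.Vertex
		if len(nodeBytes) < 8+33 {
			return 0, pub, fmt.Errorf("short node record: %d bytes",
				len(nodeBytes))
		}

		// LastUpdate (8 bytes), then the 33-byte node pubkey.
		lastUpdate := byteOrder.Uint64(nodeBytes[:8])
		copy(pub[:], nodeBytes[8:8+33])

		return lastUpdate, pub, nil
	}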
 	if _, err := io.ReadFull(r, node.pubKeyBytes[:]); err != nil {
-		return err
+		return nil, err
 	}

 	// Read the node announcement flag.
-	if _, err := r.Read(node.nodeScratch[:2]); err != nil {
-		return err
+	if _, err := r.Read(nodeScratch[:2]); err != nil {
+		return nil, err
 	}
-	hasNodeAnn := byteOrder.Uint16(node.nodeScratch[:2])
+	hasNodeAnn := byteOrder.Uint16(nodeScratch[:2])

 	// The rest of the data is optional, and will only be there if we got a
 	// node announcement for this node.
 	if hasNodeAnn == 0 {
-		return nil
+		return node, nil
 	}

 	// We did get a node announcement for this node, so we'll have the rest
 	// of the data available.
 	var rgb uint8
 	if err := binary.Read(r, byteOrder, &rgb); err != nil {
-		return err
+		return nil, err
 	}
 	if err := binary.Read(r, byteOrder, &rgb); err != nil {
-		return err
+		return nil, err
 	}
 	if err := binary.Read(r, byteOrder, &rgb); err != nil {
-		return err
+		return nil, err
 	}

 	if _, err := wire.ReadVarString(r, 0); err != nil {
-		return err
+		return nil, err
 	}

-	return node.features.Decode(r)
+	if err := node.features.Decode(r); err != nil {
+		return nil, err
+	}
+
+	return node, nil
 }

 func deserializeLightningNode(r io.Reader) (LightningNode, error) {
@@ -4652,6 +4734,27 @@ func serializeChanEdgePolicy(w io.Writer, edge *ChannelEdgePolicy,

 func deserializeChanEdgePolicy(r io.Reader,
 	nodes kvdb.RBucket) (*ChannelEdgePolicy, error) {

+	// Deserialize the policy. Note that in case an optional field is not
+	// found, both an error and a populated policy object are returned.
+	edge, deserializeErr := deserializeChanEdgePolicyRaw(r)
+	if deserializeErr != nil &&
+		deserializeErr != ErrEdgePolicyOptionalFieldNotFound {
+
+		return nil, deserializeErr
+	}
+
+	// Populate full LightningNode struct.
+	pub := edge.Node.PubKeyBytes[:]
+	node, err := fetchLightningNode(nodes, pub)
+	if err != nil {
+		return nil, fmt.Errorf("unable to fetch node: %x, %v", pub, err)
+	}
+	edge.Node = &node
+
+	return edge, deserializeErr
+}
+
+func deserializeChanEdgePolicyRaw(r io.Reader) (*ChannelEdgePolicy, error) {
 	edge := &ChannelEdgePolicy{}

 	var err error
@@ -4701,13 +4804,9 @@ func deserializeChanEdgePolicy(r io.Reader,

 	if _, err := r.Read(pub[:]); err != nil {
 		return nil, err
 	}
-
-	node, err := fetchLightningNode(nodes, pub[:])
-	if err != nil {
-		return nil, fmt.Errorf("unable to fetch node: %x, %v",
-			pub[:], err)
+	edge.Node = &LightningNode{
+		PubKeyBytes: pub,
 	}
-	edge.Node = &node

 	// We'll try and see if there are any opaque bytes left, if not, then
 	// we'll ignore the EOF error and return the edge as is.

Review comment: Related to my comment elsewhere: perhaps we just want to have a new private forEachChannelX method here that skips the intermediate map and inserts directly into the cache?
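A rough sketch of what that could look like, purely hypothetical since the PR keeps the map-based approach. It reuses names from this diff (kvdb.ForAll, unknownPolicy, deserializeChanEdgePolicyRaw); the cache hook addPolicyToCache is an assumed placeholder:

	// Hypothetical sketch only: stream policies straight into the graph
	// cache, skipping the intermediate channelMap from getChannelMap.
	func (c *ChannelGraph) forEachChannelX(edges kvdb.RBucket) error {
		return kvdb.ForAll(edges, func(k, edgeBytes []byte) error {
			// The same embedded-bucket and key-length checks as in
			// getChannelMap would be needed here; elided for
			// brevity.
			if len(k) != 33+8 ||
				bytes.Equal(edgeBytes, unknownPolicy) {

				return nil
			}

			edge, err := deserializeChanEdgePolicyRaw(
				bytes.NewReader(edgeBytes),
			)
			switch {
			case err == ErrEdgePolicyOptionalFieldNotFound:
				return nil
			case err != nil:
				return err
			}

			// Assumed cache hook: insert one policy at a time
			// instead of collecting everything in a map first.
			c.addPolicyToCache(edge)

			return nil
		})
	}

One trade-off: the exported ForEachChannel hands its callback both directions of a channel at once, which is exactly what the map join provides, so a direct-insert variant only helps callers (like cache population) that can consume one policy at a time.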