2 changes: 1 addition & 1 deletion CMakeLists.txt
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.15)

project(Kuzu VERSION 0.10.0 LANGUAGES CXX C)
project(Kuzu VERSION 0.10.0.1 LANGUAGES CXX C)

option(SINGLE_THREADED "Single-threaded mode" FALSE)
if(SINGLE_THREADED)
28 changes: 0 additions & 28 deletions src/include/storage/db_file_id.h

This file was deleted.

14 changes: 8 additions & 6 deletions src/include/storage/index/hash_index.h
@@ -67,7 +67,7 @@ class OnDiskHashIndex {
template<typename T>
class HashIndex final : public OnDiskHashIndex {
public:
HashIndex(MemoryManager& memoryManager, DBFileIDAndName dbFileIDAndName, FileHandle* fileHandle,
HashIndex(MemoryManager& memoryManager, FileHandle* fileHandle,
OverflowFileHandle* overflowFileHandle, DiskArrayCollection& diskArrays, uint64_t indexPos,
ShadowFile* shadowFile, const HashIndexHeader& indexHeaderForReadTrx,
HashIndexHeader& indexHeaderForWriteTrx);
@@ -278,7 +278,6 @@ class HashIndex final : public OnDiskHashIndex {
const transaction::Transaction* transaction, slot_id_t pSlotId);

private:
DBFileIDAndName dbFileIDAndName;
ShadowFile* shadowFile;
uint64_t headerPageIdx;
FileHandle* fileHandle;
@@ -307,9 +306,9 @@ inline bool HashIndex<common::ku_string_t>::equals(const transaction::Transactio

class PrimaryKeyIndex {
public:
PrimaryKeyIndex(const DBFileIDAndName& dbFileIDAndName, bool readOnly, bool inMemMode,
common::PhysicalTypeID keyDataType, MemoryManager& memoryManager, ShadowFile* shadowFile,
common::VirtualFileSystem* vfs, main::ClientContext* context);
PrimaryKeyIndex(FileHandle* dataFH, bool inMemMode, common::PhysicalTypeID keyDataType,
MemoryManager& memoryManager, ShadowFile* shadowFile, common::page_idx_t firstHeaderPage,
common::page_idx_t overflowHeaderPage);

~PrimaryKeyIndex();

@@ -392,17 +391,20 @@ class PrimaryKeyIndex {

void writeHeaders();

void serialize(common::Serializer& serializer) const;

private:
common::PhysicalTypeID keyDataTypeID;
FileHandle* fileHandle;
std::unique_ptr<OverflowFile> overflowFile;
std::vector<std::unique_ptr<OnDiskHashIndex>> hashIndices;
std::vector<HashIndexHeader> hashIndexHeadersForReadTrx;
std::vector<HashIndexHeader> hashIndexHeadersForWriteTrx;
DBFileIDAndName dbFileIDAndName;
ShadowFile& shadowFile;
// Stores both primary and overflow slots
std::unique_ptr<DiskArrayCollection> hashIndexDiskArrays;
common::page_idx_t firstHeaderPage;
common::page_idx_t overflowHeaderPage;
};

} // namespace storage
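The PrimaryKeyIndex constructor no longer receives a DBFileIDAndName and no longer opens its own files; it is handed the shared data FileHandle plus the page indices where its headers live. Below is a minimal sketch of what a call site might look like under the new signature; the helper name, the INT64 key type, and all argument values are assumptions for illustration, not taken from this PR.

```cpp
#include <memory>

#include "storage/index/hash_index.h"

using namespace kuzu;

// Hypothetical helper: open an existing primary key index that lives inside the
// shared data file. Only the constructor signature comes from this PR.
std::unique_ptr<storage::PrimaryKeyIndex> openPKIndex(storage::FileHandle* dataFH,
    storage::MemoryManager& mm, storage::ShadowFile* shadowFile,
    common::page_idx_t firstHeaderPage, common::page_idx_t overflowHeaderPage) {
    // The index locates its headers via the two page indices instead of a DBFileID.
    return std::make_unique<storage::PrimaryKeyIndex>(dataFH, /*inMemMode=*/false,
        common::PhysicalTypeID::INT64, mm, shadowFile, firstHeaderPage, overflowHeaderPage);
}
```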
8 changes: 4 additions & 4 deletions src/include/storage/shadow_utils.h
@@ -36,7 +36,7 @@ class ShadowUtils {
// Where possible, updatePage/insertNewPage should be used instead
static ShadowPageAndFrame createShadowVersionIfNecessaryAndPinPage(
common::page_idx_t originalPage, bool insertingNewPage, FileHandle& fileHandle,
DBFileID dbFileID, ShadowFile& shadowFile);
ShadowFile& shadowFile);

static std::pair<FileHandle*, common::page_idx_t> getFileHandleAndPhysicalPageIdxToPin(
FileHandle& fileHandle, common::page_idx_t pageIdx, const ShadowFile& shadowFile,
@@ -47,16 +47,16 @@ class ShadowUtils {
const std::function<void(uint8_t*)>& readOp);

static common::page_idx_t insertNewPage(
FileHandle& fileHandle, DBFileID dbFileID, ShadowFile& shadowFile,
FileHandle& fileHandle, ShadowFile& shadowFile,
const std::function<void(uint8_t*)>& insertOp = [](uint8_t*) -> void {
// DO NOTHING.
});

// Note: This function updates a page "transactionally", i.e., creates the WAL version of the
// page if it doesn't exist. For the original page to be updated, the current WRITE trx needs to
// commit and checkpoint.
static void updatePage(FileHandle& fileHandle, DBFileID dbFileID,
common::page_idx_t originalPageIdx, bool isInsertingNewPage, ShadowFile& shadowFile,
static void updatePage(FileHandle& fileHandle, common::page_idx_t originalPageIdx,
bool isInsertingNewPage, ShadowFile& shadowFile,
const std::function<void(uint8_t*)>& updateOp);
};
} // namespace storage
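With DBFileID removed from ShadowUtils, a shadowed page update is keyed only by the file handle and the original page index. A hedged sketch of how a caller might now issue an update under the new signature; the function name, the counter payload, and the surrounding variables are illustrative assumptions.

```cpp
#include <cstdint>
#include <cstring>

#include "storage/shadow_utils.h"

// Illustrative only: stamp a counter into the shadow version of an existing page.
// fileHandle, shadowFile and pageIdx are assumed to be provided by the caller; only
// the updatePage signature is taken from this PR.
void stampPageShadowed(kuzu::storage::FileHandle& fileHandle,
    kuzu::storage::ShadowFile& shadowFile, kuzu::common::page_idx_t pageIdx,
    uint64_t counter) {
    kuzu::storage::ShadowUtils::updatePage(fileHandle, pageIdx,
        false /* isInsertingNewPage */, shadowFile,
        [&](uint8_t* frame) { std::memcpy(frame, &counter, sizeof(counter)); });
}
```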
9 changes: 5 additions & 4 deletions src/include/storage/storage_manager.h
@@ -29,11 +29,11 @@ class KUZU_API StorageManager {

static void recover(main::ClientContext& clientContext);

void createTable(catalog::CatalogEntry* entry, main::ClientContext* context);
void createTable(catalog::CatalogEntry* entry, const main::ClientContext* context);

void checkpoint(main::ClientContext& clientContext);
void finalizeCheckpoint(main::ClientContext& clientContext);
void rollbackCheckpoint(main::ClientContext& clientContext);
void rollbackCheckpoint(const main::ClientContext& clientContext);

Table* getTable(common::table_id_t tableID) {
std::lock_guard lck{mtx};
@@ -54,9 +54,10 @@

void loadTables(const catalog::Catalog& catalog, common::VirtualFileSystem* vfs,
main::ClientContext* context);
void createNodeTable(catalog::NodeTableCatalogEntry* entry, main::ClientContext* context);
void createNodeTable(catalog::NodeTableCatalogEntry* entry);
void createRelTable(catalog::RelTableCatalogEntry* entry);
void createRelTableGroup(catalog::RelGroupCatalogEntry* entry, main::ClientContext* context);
void createRelTableGroup(const catalog::RelGroupCatalogEntry* entry,
const main::ClientContext* context);

void reclaimDroppedTables(const main::ClientContext& clientContext);

37 changes: 16 additions & 21 deletions src/include/storage/storage_structure/disk_array.h
@@ -68,7 +68,7 @@ struct PIPUpdates {
std::optional<PIPWrapper> updatedLastPIP;
std::vector<PIPWrapper> newPIPs;

inline void clear() {
void clear() {
updatedLastPIP.reset();
newPIPs.clear();
}
@@ -101,11 +101,9 @@ struct PIPUpdates {
class DiskArrayInternal {
public:
// Used when loading from file
DiskArrayInternal(FileHandle& fileHandle, DBFileID dbFileID,
const DiskArrayHeader& headerForReadTrx, DiskArrayHeader& headerForWriteTrx,
ShadowFile* shadowFile, uint64_t elementSize, bool bypassShadowing = false);

virtual ~DiskArrayInternal() = default;
DiskArrayInternal(FileHandle& fileHandle, const DiskArrayHeader& headerForReadTrx,
DiskArrayHeader& headerForWriteTrx, ShadowFile* shadowFile, uint64_t elementSize,
bool bypassShadowing = false);

uint64_t getNumElements(
transaction::TransactionType trxType = transaction::TransactionType::READ_ONLY);
@@ -124,16 +122,16 @@ class DiskArrayInternal {
uint64_t resize(const transaction::Transaction* transaction, uint64_t newNumElements,
std::span<std::byte> defaultVal);

virtual inline void checkpointInMemoryIfNecessary() {
void checkpointInMemoryIfNecessary() {
std::unique_lock xlock{this->diskArraySharedMtx};
checkpointOrRollbackInMemoryIfNecessaryNoLock(true /* is checkpoint */);
}
virtual inline void rollbackInMemoryIfNecessary() {
void rollbackInMemoryIfNecessary() {
std::unique_lock xlock{this->diskArraySharedMtx};
checkpointOrRollbackInMemoryIfNecessaryNoLock(false /* is rollback */);
}

virtual void checkpoint();
void checkpoint();

// Write WriteIterator for making fast bulk changes to the disk array
// The pages are cached while the elements are stored on the same page
@@ -179,7 +177,7 @@
return std::span(shadowPageAndFrame.frame + apCursor.elemPosInPage, valueSize);
}

inline uint64_t size() const { return diskArray.headerForWriteTrx.numElements; }
uint64_t size() const { return diskArray.headerForWriteTrx.numElements; }

private:
void unpin();
@@ -196,13 +194,11 @@

void updateLastPageOnDisk();

uint64_t pushBackNoLock(std::span<std::byte> val);

inline uint64_t getNumElementsNoLock(transaction::TransactionType trxType) const {
uint64_t getNumElementsNoLock(transaction::TransactionType trxType) const {
return getDiskArrayHeader(trxType).numElements;
}

inline uint64_t getNumAPs(const DiskArrayHeader& header) const {
uint64_t getNumAPs(const DiskArrayHeader& header) const {
return (header.numElements + storageInfo.numElementsPerPage - 1) /
storageInfo.numElementsPerPage;
}
@@ -220,13 +216,13 @@

void clearWALPageVersionAndRemovePageFromFrameIfNecessary(common::page_idx_t pageIdx);

virtual void checkpointOrRollbackInMemoryIfNecessaryNoLock(bool isCheckpoint);
void checkpointOrRollbackInMemoryIfNecessaryNoLock(bool isCheckpoint);

private:
bool checkOutOfBoundAccess(transaction::TransactionType trxType, uint64_t idx) const;
bool hasPIPUpdatesNoLock(uint64_t pipIdx);
bool hasPIPUpdatesNoLock(uint64_t pipIdx) const;

inline const DiskArrayHeader& getDiskArrayHeader(transaction::TransactionType trxType) const {
const DiskArrayHeader& getDiskArrayHeader(transaction::TransactionType trxType) const {
if (trxType == transaction::TransactionType::CHECKPOINT) {
return headerForWriteTrx;
}
@@ -241,7 +237,6 @@
protected:
PageStorageInfo storageInfo;
FileHandle& fileHandle;
DBFileID dbFileID;
const DiskArrayHeader& header;
DiskArrayHeader& headerForWriteTrx;
bool hasTransactionalUpdates;
@@ -267,10 +262,10 @@ class DiskArray {
// If bypassWAL is set, the buffer manager is used to write pages new to this transaction to the
// original file, but does not handle flushing them. BufferManager::flushAllDirtyPagesInFrames
// should be called on this file handle exactly once during prepare commit.
DiskArray(FileHandle& fileHandle, DBFileID dbFileID, const DiskArrayHeader& headerForReadTrx,
DiskArray(FileHandle& fileHandle, const DiskArrayHeader& headerForReadTrx,
DiskArrayHeader& headerForWriteTrx, ShadowFile* shadowFile, bool bypassWAL = false)
: diskArray(fileHandle, dbFileID, headerForReadTrx, headerForWriteTrx, shadowFile,
sizeof(U), bypassWAL) {}
: diskArray(fileHandle, headerForReadTrx, headerForWriteTrx, shadowFile, sizeof(U),
bypassWAL) {}

// Note: This function is to be used only by the WRITE trx.
// The return value is the idx of val in array.
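DiskArrayInternal drops both the DBFileID member and its virtual checkpoint/rollback hooks, so a DiskArray is now built from just the file handle, the read/write headers, and the shadow file. A minimal construction sketch under the new signature; the function and variable names are assumed for illustration.

```cpp
#include "storage/storage_structure/disk_array.h"

using namespace kuzu;

// Sketch only: construct a disk array of page indices over an existing data file.
// fileHandle, readHeader, writeHeader, and shadowFile are assumed to be provided by
// the surrounding storage structure; only the constructor signature is from this PR.
void buildPageArray(storage::FileHandle& fileHandle,
    const storage::DiskArrayHeader& readHeader, storage::DiskArrayHeader& writeHeader,
    storage::ShadowFile& shadowFile) {
    // No DBFileID any more: the array is identified purely by the file handle it uses.
    storage::DiskArray<common::page_idx_t> pageArray{fileHandle, readHeader, writeHeader,
        &shadowFile, false /* bypassWAL */};
    (void)pageArray;
}
```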
src/include/storage/storage_structure/disk_array_collection.h
@@ -27,7 +27,7 @@ class DiskArrayCollection {
static_assert(std::has_unique_object_representations_v<HeaderPage>);

public:
DiskArrayCollection(FileHandle& fileHandle, DBFileID dbFileID, ShadowFile& shadowFile,
DiskArrayCollection(FileHandle& fileHandle, ShadowFile& shadowFile,
common::page_idx_t firstHeaderPage = 0, bool bypassShadowing = false);

void checkpoint();
@@ -52,15 +52,14 @@
->headers[idx % HeaderPage::NUM_HEADERS_PER_PAGE];
auto& writeHeader = headersForWriteTrx[idx / HeaderPage::NUM_HEADERS_PER_PAGE]
->headers[idx % HeaderPage::NUM_HEADERS_PER_PAGE];
return std::make_unique<DiskArray<T>>(fileHandle, dbFileID, readHeader, writeHeader,
&shadowFile, bypassShadowing);
return std::make_unique<DiskArray<T>>(fileHandle, readHeader, writeHeader, &shadowFile,
bypassShadowing);
}

size_t addDiskArray();

private:
FileHandle& fileHandle;
DBFileID dbFileID;
ShadowFile& shadowFile;
bool bypassShadowing;
common::page_idx_t headerPagesOnDisk;
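DiskArrayCollection follows the same pattern: it binds to a file handle, a shadow file, and the page index of its header pages rather than a DBFileID. A hedged usage sketch; the helper name, the include path, and the variable names are assumptions.

```cpp
#include <memory>

#include "storage/storage_structure/disk_array_collection.h"

// Sketch only: create a collection over the shared data file and reserve a slot for a
// new disk array. dataFH, shadowFile, and firstHeaderPage are assumed to be supplied
// by the owning index; the constructor and addDiskArray() are as declared in this PR.
size_t reserveDiskArraySlot(kuzu::storage::FileHandle& dataFH,
    kuzu::storage::ShadowFile& shadowFile, kuzu::common::page_idx_t firstHeaderPage) {
    auto collection = std::make_unique<kuzu::storage::DiskArrayCollection>(dataFH,
        shadowFile, firstHeaderPage, false /* bypassShadowing */);
    return collection->addDiskArray();
}
```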
30 changes: 13 additions & 17 deletions src/include/storage/storage_structure/overflow_file.h
@@ -10,9 +10,6 @@
#include "storage//file_handle.h"
#include "storage/index/hash_index_utils.h"
#include "storage/storage_utils.h"
#include "storage/wal/shadow_file.h"
#include "storage/wal/wal.h"
#include "transaction/transaction.h"

namespace kuzu {
namespace storage {
@@ -28,7 +25,8 @@ class OverflowFileHandle {
// Moving the handle would invalidate those pointers
OverflowFileHandle(OverflowFileHandle&& other) = delete;

std::string readString(transaction::TransactionType trxType, const common::ku_string_t& str);
std::string readString(transaction::TransactionType trxType,
const common::ku_string_t& str) const;

bool equals(transaction::TransactionType trxType, std::string_view keyToLookup,
const common::ku_string_t& keyInEntry) const;
@@ -81,16 +79,11 @@ class OverflowFile {

public:
// For reading an existing overflow file
OverflowFile(const DBFileIDAndName& dbFileIdAndName, MemoryManager& memoryManager,
ShadowFile* shadowFile, bool readOnly, common::VirtualFileSystem* vfs,
main::ClientContext* context);
OverflowFile(FileHandle* dataFH, MemoryManager& memoryManager, ShadowFile* shadowFile,
common::page_idx_t headerPageIdx);

virtual ~OverflowFile() = default;

// For creating an overflow file from scratch
static void createEmptyFiles(const std::string& fName, common::VirtualFileSystem* vfs,
main::ClientContext* context);

// Handles contain a reference to the overflow file
OverflowFile(OverflowFile&& other) = delete;

@@ -111,13 +104,16 @@
}

protected:
explicit OverflowFile(const DBFileIDAndName& dbFileIdAndName, MemoryManager& memoryManager);
explicit OverflowFile(FileHandle* dataFH, MemoryManager& memoryManager);

common::page_idx_t getNewPageIdx() {
// If this isn't the first call reserving the page header, then the header flag must be set
// prior to this
KU_ASSERT(pageCounter == HEADER_PAGE_IDX || headerChanged);
return pageCounter.fetch_add(1);
if (fileHandle) {
return fileHandle->addNewPage();
} else {
return pageCounter.fetch_add(1);
}
}

private:
@@ -133,18 +129,18 @@
std::vector<std::unique_ptr<OverflowFileHandle>> handles;
StringOverflowFileHeader header;
common::page_idx_t numPagesOnDisk;
DBFileID dbFileID;
FileHandle* fileHandle;
ShadowFile* shadowFile;
MemoryManager& memoryManager;
std::atomic<common::page_idx_t> pageCounter;
std::atomic<bool> headerChanged;
common::page_idx_t headerPageIdx;
};

class InMemOverflowFile final : public OverflowFile {
public:
explicit InMemOverflowFile(const DBFileIDAndName& dbFileIDAndName, MemoryManager& memoryManager)
: OverflowFile{dbFileIDAndName, memoryManager} {}
explicit InMemOverflowFile(MemoryManager& memoryManager)
: OverflowFile{nullptr, memoryManager} {}
};

} // namespace storage
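The behavioural change in OverflowFile is in getNewPageIdx(): when a FileHandle is present, overflow pages are allocated through fileHandle->addNewPage(), and only the in-memory variant (constructed with a null handle) still relies on the atomic page counter. A sketch of the two construction paths under the new constructors; the function name and argument values are assumptions.

```cpp
#include <memory>

#include "storage/storage_structure/overflow_file.h"

using namespace kuzu;

// Sketch only: the two ways an overflow file can now be created. dataFH, mm,
// shadowFile, and overflowHeaderPage are assumed inputs; the constructors are as
// declared in this PR.
void createOverflowFiles(storage::FileHandle* dataFH, storage::MemoryManager& mm,
    storage::ShadowFile* shadowFile, common::page_idx_t overflowHeaderPage) {
    // On-disk: new pages come from dataFH->addNewPage() inside getNewPageIdx().
    auto onDisk =
        std::make_unique<storage::OverflowFile>(dataFH, mm, shadowFile, overflowHeaderPage);
    // In-memory: no file handle, so getNewPageIdx() falls back to the atomic pageCounter.
    auto inMem = std::make_unique<storage::InMemOverflowFile>(mm);
}
```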