Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions .github/workflows/build-and-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ jobs:
compiler:
- gcc
- clang
dqlite-next:
- yes
- no
runs-on: ${{ matrix.os }}

steps:
Expand All @@ -37,7 +34,7 @@ jobs:
run: |
autoreconf -i
./configure --enable-debug --enable-code-coverage --enable-sanitize \
--enable-build-raft --enable-dqlite-next=${{ matrix.dqlite-next }}
--enable-build-raft
make -j$(nproc) check-norun

- name: Test
Expand Down
4 changes: 0 additions & 4 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@ endif
AM_LDFLAGS = $(static)
AM_LDFLAGS += $(UV_LIBS) $(PTHREAD_LIBS)

if DQLITE_NEXT_ENABLED
AM_CFLAGS += -DDQLITE_NEXT
endif

if DEBUG_ENABLED
AM_CFLAGS += -O0
else
Expand Down
3 changes: 0 additions & 3 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@ AS_IF([test "x$enable_build_raft" != "xyes"],
AC_MSG_ERROR([linking to a separately-built libraft is no longer supported]),
[])

AC_ARG_ENABLE(dqlite-next, AS_HELP_STRING([--enable-dqlite-next[=ARG]], [build with the experimental dqlite backend [default=no]]))
AM_CONDITIONAL(DQLITE_NEXT_ENABLED, test "x$enable_dqlite_next" = "xyes")

AC_ARG_WITH(static-deps,
AS_HELP_STRING([--with-static-deps[=ARG]],
[skip building a shared library and link test binaries statically]))
Expand Down
24 changes: 0 additions & 24 deletions src/gateway.c
Original file line number Diff line number Diff line change
Expand Up @@ -565,39 +565,15 @@ static void query_batch_async(struct handle *req, enum pool_half half)
}
}

#ifdef DQLITE_NEXT

static void qb_top(pool_work_t *w)
{
struct handle *req = CONTAINER_OF(w, struct handle, work);
query_batch_async(req, POOL_TOP_HALF);
}

static void qb_bottom(pool_work_t *w)
{
struct handle *req = CONTAINER_OF(w, struct handle, work);
query_batch_async(req, POOL_BOTTOM_HALF);
}

#endif

static void query_batch(struct gateway *g)
{
struct handle *req = g->req;
assert(req != NULL);
g->req = NULL;
req->gw = g;

#ifdef DQLITE_NEXT
struct dqlite_node *node = g->raft->data;
pool_t *pool = !!(pool_ut_fallback()->flags & POOL_FOR_UT)
? pool_ut_fallback() : &node->pool;
pool_queue_work(pool, &req->work, g->leader->db->cookie,
WT_UNORD, qb_top, qb_bottom);
#else
query_batch_async(req, POOL_TOP_HALF);
query_batch_async(req, POOL_BOTTOM_HALF);
#endif
}

static void query_barrier_cb(struct barrier *barrier, int status)
Expand Down
30 changes: 2 additions & 28 deletions src/leader.c
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ static void leaderMaybeCheckpointLegacy(struct leader *l)
tracef("raft_malloc - no mem");
goto err_after_buf_alloc;
}
rv = raft_apply(l->raft, apply, &buf, NULL, 1, leaderCheckpointApplyCb);
rv = raft_apply(l->raft, apply, &buf, 1, leaderCheckpointApplyCb);
if (rv != 0) {
tracef("raft_apply failed %d", rv);
raft_free(apply);
Expand Down Expand Up @@ -332,9 +332,7 @@ static int leaderApplyFrames(struct exec *req,
apply->type = COMMAND_FRAMES;
idSet(apply->req.req_id, req->id);

/* TODO actual WAL slice goes here */
struct raft_entry_local_data local_data = {};
rv = raft_apply(l->raft, &apply->req, &buf, &local_data, 1, leaderApplyFramesCb);
rv = raft_apply(l->raft, &apply->req, &buf, 1, leaderApplyFramesCb);
if (rv != 0) {
tracef("raft apply failed %d", rv);
goto err_after_command_encode;
Expand Down Expand Up @@ -409,22 +407,6 @@ static void leaderExecV2(struct exec *req, enum pool_half half)
leaderExecDone(l->exec);
}

#ifdef DQLITE_NEXT

static void exec_top(pool_work_t *w)
{
struct exec *req = CONTAINER_OF(w, struct exec, work);
leaderExecV2(req, POOL_TOP_HALF);
}

static void exec_bottom(pool_work_t *w)
{
struct exec *req = CONTAINER_OF(w, struct exec, work);
leaderExecV2(req, POOL_BOTTOM_HALF);
}

#endif

static void execBarrierCb(struct barrier *barrier, int status)
{
tracef("exec barrier cb status %d", status);
Expand All @@ -437,16 +419,8 @@ static void execBarrierCb(struct barrier *barrier, int status)
return;
}

#ifdef DQLITE_NEXT
struct dqlite_node *node = l->raft->data;
pool_t *pool = !!(pool_ut_fallback()->flags & POOL_FOR_UT)
? pool_ut_fallback() : &node->pool;
pool_queue_work(pool, &req->work, l->db->cookie,
WT_UNORD, exec_top, exec_bottom);
#else
leaderExecV2(req, POOL_TOP_HALF);
leaderExecV2(req, POOL_BOTTOM_HALF);
#endif
}

int leader__exec(struct leader *l,
Expand Down
30 changes: 0 additions & 30 deletions src/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,28 +191,6 @@ enum {
RAFT_CHANGE /* Raft configuration change. */
};

/**
* A small fixed-size inline buffer that stores extra data for a raft_entry
* that is different for each node in the cluster.
*
* A leader initializes the local data for an entry before passing it into
* raft_apply. This local data is stored in the volatile raft log and also
* in the persistent raft log on the leader. AppendEntries messages sent by
* the leader never contain the local data for entries.
*
* When a follower accepts an AppendEntries request, it invokes a callback
* provided by the FSM to fill out the local data for each new entry before
* appending the entries to its log (volatile and persistent). This local
* data doesn't have to be the same as the local data that the leader computed.
*
* When starting up, a raft node reads the local data for each entry for its
* persistent log as part of populating the volatile log.
*/
struct raft_entry_local_data {
/* Must be the only member of this struct. */
uint8_t buf[16];
};

/**
* A single entry in the raft log.
*
Expand Down Expand Up @@ -243,20 +221,13 @@ struct raft_entry_local_data {
* message or in the persistent log. This field can be used by the FSM's `apply`
* callback to handle a COMMAND entry differently depending on whether it
* originated locally.
*
* Note: The @local_data and @is_local fields do not exist when we use an external
* libraft, because the last separate release of libraft predates their addition.
* The ifdef at the very top of this file ensures that we use the system raft headers
* when we build against an external libraft, so there will be no ABI mismatch as
* a result of incompatible struct layouts.
*/
struct raft_entry
{
raft_term term; /* Term in which the entry was created. */
unsigned short type; /* Type (FSM command, barrier, config change). */
bool is_local; /* Placed here so it goes in the padding after @type. */
struct raft_buffer buf; /* Entry data. */
struct raft_entry_local_data local_data;
void *batch; /* Batch that buf's memory points to, if any. */
};

Expand Down Expand Up @@ -1237,7 +1208,6 @@ struct raft_apply
RAFT_API int raft_apply(struct raft *r,
struct raft_apply *req,
const struct raft_buffer bufs[],
const struct raft_entry_local_data local_data[],
const unsigned n,
raft_apply_cb cb);

Expand Down
5 changes: 2 additions & 3 deletions src/raft/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
int raft_apply(struct raft *r,
struct raft_apply *req,
const struct raft_buffer bufs[],
const struct raft_entry_local_data local_data[],
const unsigned n,
raft_apply_cb cb)
{
Expand Down Expand Up @@ -42,7 +41,7 @@ int raft_apply(struct raft *r,
req->cb = cb;

/* Append the new entries to the log. */
rv = logAppendCommands(r->log, r->current_term, bufs, local_data, n);
rv = logAppendCommands(r->log, r->current_term, bufs, n);
if (rv != 0) {
goto err;
}
Expand Down Expand Up @@ -91,7 +90,7 @@ int raft_barrier(struct raft *r, struct raft_barrier *req, raft_barrier_cb cb)
req->index = index;
req->cb = cb;

rv = logAppend(r->log, r->current_term, RAFT_BARRIER, buf, (struct raft_entry_local_data){}, true, NULL);
rv = logAppend(r->log, r->current_term, RAFT_BARRIER, buf, true, NULL);
if (rv != 0) {
goto err_after_buf_alloc;
}
Expand Down
2 changes: 1 addition & 1 deletion src/raft/fixture.c
Original file line number Diff line number Diff line change
Expand Up @@ -1317,7 +1317,7 @@ static void copyLeaderLog(struct raft_fixture *f)
assert(buf.base != NULL);
memcpy(buf.base, entry->buf.base, buf.len);
/* FIXME(cole) what to do here for is_local? */
rv = logAppend(f->log, entry->term, entry->type, buf, (struct raft_entry_local_data){}, false, NULL);
rv = logAppend(f->log, entry->term, entry->type, buf, false, NULL);
assert(rv == 0);
}
logRelease(raft->log, 1, entries, n);
Expand Down
8 changes: 2 additions & 6 deletions src/raft/log.c
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,6 @@ int logAppend(struct raft_log *l,
const raft_term term,
const unsigned short type,
struct raft_buffer buf,
struct raft_entry_local_data local_data,
bool is_local,
void *batch)
{
Expand Down Expand Up @@ -576,7 +575,6 @@ int logAppend(struct raft_log *l,
entry->type = type;
entry->buf = buf;
entry->batch = batch;
entry->local_data = local_data;
entry->is_local = is_local;

l->back += 1;
Expand All @@ -588,7 +586,6 @@ int logAppend(struct raft_log *l,
int logAppendCommands(struct raft_log *l,
const raft_term term,
const struct raft_buffer bufs[],
const struct raft_entry_local_data local_data[],
const unsigned n)
{
unsigned i;
Expand All @@ -600,8 +597,7 @@ int logAppendCommands(struct raft_log *l,
assert(n > 0);

for (i = 0; i < n; i++) {
struct raft_entry_local_data loc = (local_data != NULL) ? local_data[i] : (struct raft_entry_local_data){};
rv = logAppend(l, term, RAFT_COMMAND, bufs[i], loc, true, NULL);
rv = logAppend(l, term, RAFT_COMMAND, bufs[i], true, NULL);
if (rv != 0) {
return rv;
}
Expand All @@ -628,7 +624,7 @@ int logAppendConfiguration(struct raft_log *l,
}

/* Append the new entry to the log. */
rv = logAppend(l, term, RAFT_CHANGE, buf, (struct raft_entry_local_data){}, true, NULL);
rv = logAppend(l, term, RAFT_CHANGE, buf, true, NULL);
if (rv != 0) {
goto err_after_encode;
}
Expand Down
2 changes: 0 additions & 2 deletions src/raft/log.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,15 +114,13 @@ int logAppend(struct raft_log *l,
raft_term term,
unsigned short type,
struct raft_buffer buf,
struct raft_entry_local_data local_data,
bool is_local,
void *batch);

/* Convenience to append a series of #RAFT_COMMAND entries. */
int logAppendCommands(struct raft_log *l,
const raft_term term,
const struct raft_buffer bufs[],
const struct raft_entry_local_data local_data[],
const unsigned n);

/* Convenience to encode and append a single #RAFT_CHANGE entry. */
Expand Down
2 changes: 1 addition & 1 deletion src/raft/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -1235,7 +1235,7 @@ int replicationAppend(struct raft *r,
goto err_after_request_alloc;
}

rv = logAppend(r->log, copy.term, copy.type, copy.buf, (struct raft_entry_local_data){}, false, NULL);
rv = logAppend(r->log, copy.term, copy.type, copy.buf, false, NULL);
if (rv != 0) {
goto err_after_request_alloc;
}
Expand Down
2 changes: 1 addition & 1 deletion src/raft/start.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ static int restoreEntries(struct raft *r,
for (i = 0; i < n; i++) {
struct raft_entry *entry = &entries[i];
rv = logAppend(r->log, entry->term, entry->type, entry->buf,
entry->local_data, entry->is_local, entry->batch);
entry->is_local, entry->batch);
if (rv != 0) {
goto err;
}
Expand Down
2 changes: 1 addition & 1 deletion src/raft/uv_append.c
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ static size_t uvAppendSize(struct uvAppend *a)
{
size_t size = sizeof(uint32_t) * 2; /* CRC checksums */
unsigned i;
size += uvSizeofBatchHeader(a->n, true); /* Batch header */
size += uvSizeofBatchHeader(a->n); /* Batch header */
for (i = 0; i < a->n; i++) { /* Entries data */
size += bytePad64(a->entries[i].buf.len);
}
Expand Down
Loading
Loading