Skip to content
This repository was archived by the owner on Nov 25, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,14 @@ import (
"context"
"database/sql"
"fmt"

"github.com/lib/pq"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/util"
)

func UpDropEventReferenceSHAEvents(ctx context.Context, tx *sql.Tx) error {
var count int
err := tx.QueryRowContext(ctx, `SELECT count(*) FROM roomserver_events GROUP BY event_id HAVING count(event_id) > 1`).
Scan(&count)
if err != nil && err != sql.ErrNoRows {
return fmt.Errorf("failed to query duplicate event ids")
}
if count > 0 {
return fmt.Errorf("unable to drop column, as there are duplicate event ids")
}
_, err = tx.ExecContext(ctx, `ALTER TABLE roomserver_events DROP COLUMN IF EXISTS reference_sha256;`)
_, err := tx.ExecContext(ctx, `ALTER TABLE roomserver_events DROP COLUMN IF EXISTS reference_sha256;`)
if err != nil {
return fmt.Errorf("failed to execute upgrade: %w", err)
}
Expand All @@ -46,9 +41,80 @@ func UpDropEventReferenceSHAPrevEvents(ctx context.Context, tx *sql.Tx) error {
if err != nil {
return fmt.Errorf("failed to execute upgrade: %w", err)
}

// figure out if there are duplicates
dupeRows, err := tx.QueryContext(ctx, `SELECT previous_event_id FROM roomserver_previous_events GROUP BY previous_event_id HAVING count(previous_event_id) > 1`)
if err != nil {
return fmt.Errorf("failed to query duplicate event ids")
}
defer internal.CloseAndLogIfError(ctx, dupeRows, "failed to close rows")

var prevEvents []string
var prevEventID string
for dupeRows.Next() {
if err = dupeRows.Scan(&prevEventID); err != nil {
return err
}
prevEvents = append(prevEvents, prevEventID)
}
if dupeRows.Err() != nil {
return dupeRows.Err()
}

// if we found duplicates, check if we can combine them, e.g. they are in the same room
for _, dupeID := range prevEvents {
var dupeNIDsRows *sql.Rows
dupeNIDsRows, err = tx.QueryContext(ctx, `SELECT event_nids FROM roomserver_previous_events WHERE previous_event_id = $1`, dupeID)
if err != nil {
return fmt.Errorf("failed to query duplicate event ids")
}
defer internal.CloseAndLogIfError(ctx, dupeNIDsRows, "failed to close rows")
var dupeNIDs []int64
for dupeNIDsRows.Next() {
var nids pq.Int64Array
if err = dupeNIDsRows.Scan(&nids); err != nil {
return err
}
dupeNIDs = append(dupeNIDs, nids...)
}

if dupeNIDsRows.Err() != nil {
return dupeNIDsRows.Err()
}
// dedupe NIDs
dupeNIDs = dupeNIDs[:util.SortAndUnique(nids(dupeNIDs))]
// now that we have all NIDs, check which room they belong to
var roomCount int
err = tx.QueryRowContext(ctx, `SELECT count(distinct room_nid) FROM roomserver_events WHERE event_nid = ANY($1)`, pq.Array(dupeNIDs)).Scan(&roomCount)
if err != nil {
return err
}
// if the events are from different rooms, that's bad and we can't continue
if roomCount > 1 {
return fmt.Errorf("detected events (%v) referenced for different rooms (%v)", dupeNIDs, roomCount)
}
// otherwise delete the dupes
_, err = tx.ExecContext(ctx, "DELETE FROM roomserver_previous_events WHERE previous_event_id = $1", dupeID)
if err != nil {
return fmt.Errorf("unable to delete duplicates: %w", err)
}

// insert combined values
_, err = tx.ExecContext(ctx, "INSERT INTO roomserver_previous_events (previous_event_id, event_nids) VALUES ($1, $2)", dupeID, pq.Array(dupeNIDs))
if err != nil {
return fmt.Errorf("unable to insert new event NIDs: %w", err)
}
}

_, err = tx.ExecContext(ctx, `ALTER TABLE roomserver_previous_events ADD CONSTRAINT roomserver_previous_event_id_unique UNIQUE (previous_event_id);`)
if err != nil {
return fmt.Errorf("failed to execute upgrade: %w", err)
}
return nil
}

type nids []int64

func (s nids) Len() int { return len(s) }
func (s nids) Less(i, j int) bool { return s[i] < s[j] }
func (s nids) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package deltas

import (
"testing"

"github.com/lib/pq"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/test"
"github.com/matrix-org/dendrite/test/testrig"
"github.com/stretchr/testify/assert"
)

func TestUpDropEventReferenceSHAPrevEvents(t *testing.T) {

cfg, ctx, close := testrig.CreateConfig(t, test.DBTypePostgres)
defer close()

db, err := sqlutil.Open(&cfg.Global.DatabaseOptions, sqlutil.NewDummyWriter())
assert.Nil(t, err)
assert.NotNil(t, db)
defer db.Close()

// create the table in the old layout
_, err = db.ExecContext(ctx.Context(), `
CREATE TABLE IF NOT EXISTS roomserver_previous_events (
previous_event_id TEXT NOT NULL,
event_nids BIGINT[] NOT NULL,
previous_reference_sha256 BYTEA NOT NULL,
CONSTRAINT roomserver_previous_event_id_unique UNIQUE (previous_event_id, previous_reference_sha256)
);`)
assert.Nil(t, err)

// create the events table as well, slimmed down with one eventNID
_, err = db.ExecContext(ctx.Context(), `
CREATE SEQUENCE IF NOT EXISTS roomserver_event_nid_seq;
CREATE TABLE IF NOT EXISTS roomserver_events (
event_nid BIGINT PRIMARY KEY DEFAULT nextval('roomserver_event_nid_seq'),
room_nid BIGINT NOT NULL
);

INSERT INTO roomserver_events (event_nid, room_nid) VALUES (1, 1)
`)
assert.Nil(t, err)

// insert duplicate prev events with different event_nids
stmt, err := db.PrepareContext(ctx.Context(), `INSERT INTO roomserver_previous_events (previous_event_id, event_nids, previous_reference_sha256) VALUES ($1, $2, $3)`)
assert.Nil(t, err)
assert.NotNil(t, stmt)
_, err = stmt.ExecContext(ctx.Context(), "1", pq.Array([]int64{1, 2}), "a")
assert.Nil(t, err)
_, err = stmt.ExecContext(ctx.Context(), "1", pq.Array([]int64{1, 2, 3}), "b")
assert.Nil(t, err)
// execute the migration
txn, err := db.Begin()
assert.Nil(t, err)
assert.NotNil(t, txn)
defer txn.Rollback()
err = UpDropEventReferenceSHAPrevEvents(ctx.Context(), txn)
assert.NoError(t, err)
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ import (
"context"
"database/sql"
"fmt"

"github.com/lib/pq"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/util"
)

func UpDropEventReferenceSHA(ctx context.Context, tx *sql.Tx) error {
Expand Down Expand Up @@ -52,8 +56,72 @@ func UpDropEventReferenceSHAPrevEvents(ctx context.Context, tx *sql.Tx) error {
return fmt.Errorf("tx.ExecContext: %w", err)
}

// figure out if there are duplicates
dupeRows, err := tx.QueryContext(ctx, `SELECT previous_event_id FROM _roomserver_previous_events GROUP BY previous_event_id HAVING count(previous_event_id) > 1`)
if err != nil {
return fmt.Errorf("failed to query duplicate event ids")
}
defer internal.CloseAndLogIfError(ctx, dupeRows, "failed to close rows")

var prevEvents []string
var prevEventID string
for dupeRows.Next() {
if err = dupeRows.Scan(&prevEventID); err != nil {
return err
}
prevEvents = append(prevEvents, prevEventID)
}
if dupeRows.Err() != nil {
return dupeRows.Err()
}

// if we found duplicates, check if we can combine them, e.g. they are in the same room
for _, dupeID := range prevEvents {
var dupeNIDsRows *sql.Rows
dupeNIDsRows, err = tx.QueryContext(ctx, `SELECT event_nids FROM _roomserver_previous_events WHERE previous_event_id = $1`, dupeID)
if err != nil {
return fmt.Errorf("failed to query duplicate event ids")
}
defer internal.CloseAndLogIfError(ctx, dupeNIDsRows, "failed to close rows")
var dupeNIDs []int64
for dupeNIDsRows.Next() {
var nids pq.Int64Array
if err = dupeNIDsRows.Scan(&nids); err != nil {
return err
}
dupeNIDs = append(dupeNIDs, nids...)
}

if dupeNIDsRows.Err() != nil {
return dupeNIDsRows.Err()
}
// dedupe NIDs
dupeNIDs = dupeNIDs[:util.SortAndUnique(nids(dupeNIDs))]
// now that we have all NIDs, check which room they belong to
var roomCount int
err = tx.QueryRowContext(ctx, `SELECT count(distinct room_nid) FROM roomserver_events WHERE event_nid IN ($1)`, pq.Array(dupeNIDs)).Scan(&roomCount)
if err != nil {
return err
}
// if the events are from different rooms, that's bad and we can't continue
if roomCount > 1 {
return fmt.Errorf("detected events (%v) referenced for different rooms (%v)", dupeNIDs, roomCount)
}
// otherwise delete the dupes
_, err = tx.ExecContext(ctx, "DELETE FROM _roomserver_previous_events WHERE previous_event_id = $1", dupeID)
if err != nil {
return fmt.Errorf("unable to delete duplicates: %w", err)
}

// insert combined values
_, err = tx.ExecContext(ctx, "INSERT INTO _roomserver_previous_events (previous_event_id, event_nids) VALUES ($1, $2)", dupeID, pq.Array(dupeNIDs))
if err != nil {
return fmt.Errorf("unable to insert new event NIDs: %w", err)
}
}

// move data
if _, err := tx.ExecContext(ctx, `
if _, err = tx.ExecContext(ctx, `
INSERT
INTO roomserver_previous_events (
previous_event_id, event_nids
Expand All @@ -64,9 +132,15 @@ INSERT
return fmt.Errorf("tx.ExecContext: %w", err)
}
// drop old table
_, err := tx.ExecContext(ctx, `DROP TABLE _roomserver_previous_events;`)
_, err = tx.ExecContext(ctx, `DROP TABLE _roomserver_previous_events;`)
if err != nil {
return fmt.Errorf("failed to execute upgrade: %w", err)
}
return nil
}

type nids []int64

func (s nids) Len() int { return len(s) }
func (s nids) Less(i, j int) bool { return s[i] < s[j] }
func (s nids) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package deltas

import (
"testing"

"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/test"
"github.com/matrix-org/dendrite/test/testrig"
"github.com/stretchr/testify/assert"
)

func TestUpDropEventReferenceSHAPrevEvents(t *testing.T) {

cfg, ctx, close := testrig.CreateConfig(t, test.DBTypeSQLite)
defer close()

db, err := sqlutil.Open(&cfg.RoomServer.Database, sqlutil.NewExclusiveWriter())
assert.Nil(t, err)
assert.NotNil(t, db)
defer db.Close()

// create the table in the old layout
_, err = db.ExecContext(ctx.Context(), `
CREATE TABLE IF NOT EXISTS roomserver_previous_events (
previous_event_id TEXT NOT NULL,
previous_reference_sha256 BLOB,
event_nids TEXT NOT NULL,
UNIQUE (previous_event_id, previous_reference_sha256)
);`)
assert.Nil(t, err)

// create the events table as well, slimmed down with one eventNID
_, err = db.ExecContext(ctx.Context(), `
CREATE TABLE IF NOT EXISTS roomserver_events (
event_nid INTEGER PRIMARY KEY AUTOINCREMENT,
room_nid INTEGER NOT NULL
);

INSERT INTO roomserver_events (event_nid, room_nid) VALUES (1, 1)
`)
assert.Nil(t, err)

// insert duplicate prev events with different event_nids
stmt, err := db.PrepareContext(ctx.Context(), `INSERT INTO roomserver_previous_events (previous_event_id, event_nids, previous_reference_sha256) VALUES ($1, $2, $3)`)
assert.Nil(t, err)
assert.NotNil(t, stmt)
_, err = stmt.ExecContext(ctx.Context(), "1", "{1,2}", "a")
assert.Nil(t, err)
_, err = stmt.ExecContext(ctx.Context(), "1", "{1,2,3}", "b")
assert.Nil(t, err)

// execute the migration
txn, err := db.Begin()
assert.Nil(t, err)
assert.NotNil(t, txn)
err = UpDropEventReferenceSHAPrevEvents(ctx.Context(), txn)
defer txn.Rollback()
assert.NoError(t, err)
}