Skip to content

Commit

Permalink
Update testdb.sql to reflect schema changes and cleanup sql variables
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Jan 6, 2023
1 parent 700d5ec commit 91ec410
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 108 deletions.
72 changes: 26 additions & 46 deletions archives/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const (
visibilityDeletedBySender = "X"
)

const lookupMsgs = `
const sqlLookupMsgs = `
SELECT rec.visibility, row_to_json(rec) FROM (
SELECT
mm.id,
Expand Down Expand Up @@ -58,8 +58,7 @@ SELECT rec.visibility, row_to_json(rec) FROM (
LEFT JOIN LATERAL (select coalesce(jsonb_agg(label_row), '[]'::jsonb) as data from (select uuid, name from msgs_label ml INNER JOIN msgs_msg_labels mml ON ml.id = mml.label_id AND mml.msg_id = mm.id) as label_row) as labels_agg ON True
WHERE mm.org_id = $1 AND mm.created_on >= $2 AND mm.created_on < $3
ORDER BY created_on ASC, id ASC) rec;
`
ORDER BY created_on ASC, id ASC) rec;`

// writeMessageRecords writes the messages in the archive's date range to the passed in writer
func writeMessageRecords(ctx context.Context, db *sqlx.DB, archive *Archive, writer *bufio.Writer) (int, error) {
Expand All @@ -69,7 +68,7 @@ func writeMessageRecords(ctx context.Context, db *sqlx.DB, archive *Archive, wri
// first write our normal records
var record, visibility string

rows, err := db.QueryxContext(ctx, lookupMsgs, archive.Org.ID, archive.StartDate, archive.endDate())
rows, err := db.QueryxContext(ctx, sqlLookupMsgs, archive.Org.ID, archive.StartDate, archive.endDate())
if err != nil {
return 0, errors.Wrapf(err, "error querying messages for org: %d", archive.Org.ID)
}
Expand All @@ -93,28 +92,21 @@ func writeMessageRecords(ctx context.Context, db *sqlx.DB, archive *Archive, wri
return recordCount, nil
}

const selectOrgMessagesInRange = `
SELECT mm.id, mm.visibility
FROM msgs_msg mm
const sqlSelectOrgMessagesInRange = `
SELECT mm.id, mm.visibility
FROM msgs_msg mm
LEFT JOIN contacts_contact cc ON cc.id = mm.contact_id
WHERE mm.org_id = $1 AND mm.created_on >= $2 AND mm.created_on < $3
ORDER BY mm.created_on ASC, mm.id ASC
`
WHERE mm.org_id = $1 AND mm.created_on >= $2 AND mm.created_on < $3
ORDER BY mm.created_on ASC, mm.id ASC`

const deleteMessageLogs = `
DELETE FROM channels_channellog
WHERE msg_id IN(?)
`
const sqlDeleteChannelLogs = `
DELETE FROM channels_channellog WHERE msg_id IN(?)`

const deleteMessageLabels = `
DELETE FROM msgs_msg_labels
WHERE msg_id IN(?)
`
const sqlDeleteMessageLabels = `
DELETE FROM msgs_msg_labels WHERE msg_id IN(?)`

const deleteMessages = `
DELETE FROM msgs_msg
WHERE id IN(?)
`
const sqlDeleteMessages = `
DELETE FROM msgs_msg WHERE id IN(?)`

// DeleteArchivedMessages takes the passed in archive, verifies the S3 file is still present (and correct), then selects
// all the messages in the archive date range, and if equal or fewer than the number archived, deletes them 100 at a time
Expand Down Expand Up @@ -147,7 +139,7 @@ func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3
}

// ok, archive file looks good, let's build up our list of message ids, this may be big but we are int64s so shouldn't be too big
rows, err := db.QueryxContext(outer, selectOrgMessagesInRange, archive.OrgID, archive.StartDate, archive.endDate())
rows, err := db.QueryxContext(outer, sqlSelectOrgMessagesInRange, archive.OrgID, archive.StartDate, archive.endDate())
if err != nil {
return err
}
Expand Down Expand Up @@ -193,19 +185,19 @@ func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3
}

// first delete any channel logs
err = executeInQuery(ctx, tx, deleteMessageLogs, idBatch)
err = executeInQuery(ctx, tx, sqlDeleteChannelLogs, idBatch)
if err != nil {
return errors.Wrap(err, "error removing channel logs")
}

// then any labels
err = executeInQuery(ctx, tx, deleteMessageLabels, idBatch)
err = executeInQuery(ctx, tx, sqlDeleteMessageLabels, idBatch)
if err != nil {
return errors.Wrap(err, "error removing message labels")
}

// finally, delete our messages
err = executeInQuery(ctx, tx, deleteMessages, idBatch)
err = executeInQuery(ctx, tx, sqlDeleteMessages, idBatch)
if err != nil {
return errors.Wrap(err, "error deleting messages")
}
Expand Down Expand Up @@ -239,27 +231,19 @@ func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3
return nil
}

const selectOldOrgBroadcasts = `
SELECT
id
FROM
msgs_broadcast
WHERE
org_id = $1 AND
created_on < $2 AND
schedule_id IS NULL
ORDER BY
created_on ASC,
id ASC
LIMIT 1000000;
`
const sqlSelectOldOrgBroadcasts = `
SELECT id
FROM msgs_broadcast
WHERE org_id = $1 AND created_on < $2 AND schedule_id IS NULL
ORDER BY created_on ASC, id ASC
LIMIT 1000000;`

// DeleteBroadcasts deletes all broadcasts older than 90 days for the passed in org which have no active messages on them
func DeleteBroadcasts(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, org Org) error {
start := dates.Now()
threshhold := now.AddDate(0, 0, -org.RetentionPeriod)

rows, err := db.QueryxContext(ctx, selectOldOrgBroadcasts, org.ID, threshhold)
rows, err := db.QueryxContext(ctx, sqlSelectOldOrgBroadcasts, org.ID, threshhold)
if err != nil {
return err
}
Expand Down Expand Up @@ -344,11 +328,7 @@ func DeleteBroadcasts(ctx context.Context, now time.Time, config *Config, db *sq
}

if count > 0 {
logrus.WithFields(logrus.Fields{
"elapsed": dates.Since(start),
"count": count,
"org_id": org.ID,
}).Info("completed deleting broadcasts")
logrus.WithFields(logrus.Fields{"elapsed": dates.Since(start), "count": count, "org_id": org.ID}).Info("completed deleting broadcasts")
}

return nil
Expand Down
28 changes: 12 additions & 16 deletions archives/runs.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const (
RunStatusFailed = "F"
)

const lookupFlowRuns = `
const sqlLookupRuns = `
SELECT rec.uuid, rec.exited_on, row_to_json(rec)
FROM (
SELECT
Expand Down Expand Up @@ -58,13 +58,12 @@ FROM (
WHERE fr.org_id = $1 AND fr.modified_on >= $2 AND fr.modified_on < $3
ORDER BY fr.modified_on ASC, id ASC
) as rec;
`
) as rec;`

// writeRunRecords writes the runs in the archive's date range to the passed in writer
func writeRunRecords(ctx context.Context, db *sqlx.DB, archive *Archive, writer *bufio.Writer) (int, error) {
var rows *sqlx.Rows
rows, err := db.QueryxContext(ctx, lookupFlowRuns, archive.Org.ID, archive.StartDate, archive.endDate())
rows, err := db.QueryxContext(ctx, sqlLookupRuns, archive.Org.ID, archive.StartDate, archive.endDate())
if err != nil {
return 0, errors.Wrapf(err, "error querying run records for org: %d", archive.Org.ID)
}
Expand Down Expand Up @@ -96,18 +95,15 @@ func writeRunRecords(ctx context.Context, db *sqlx.DB, archive *Archive, writer
return recordCount, nil
}

const selectOrgRunsInRange = `
SELECT fr.id, fr.status
FROM flows_flowrun fr
const sqlSelectOrgRunsInRange = `
SELECT fr.id, fr.status
FROM flows_flowrun fr
LEFT JOIN contacts_contact cc ON cc.id = fr.contact_id
WHERE fr.org_id = $1 AND fr.modified_on >= $2 AND fr.modified_on < $3
ORDER BY fr.modified_on ASC, fr.id ASC
`
WHERE fr.org_id = $1 AND fr.modified_on >= $2 AND fr.modified_on < $3
ORDER BY fr.modified_on ASC, fr.id ASC`

const deleteRuns = `
DELETE FROM flows_flowrun
WHERE id IN(?)
`
const sqlDeleteRuns = `
DELETE FROM flows_flowrun WHERE id IN(?)`

// DeleteArchivedRuns takes the passed in archive, verifies the S3 file is still present (and correct), then selects
// all the runs in the archive date range, and if equal or fewer than the number archived, deletes them 100 at a time
Expand Down Expand Up @@ -140,7 +136,7 @@ func DeleteArchivedRuns(ctx context.Context, config *Config, db *sqlx.DB, s3Clie
}

// ok, archive file looks good, let's build up our list of run ids, this may be big but we are int64s so shouldn't be too big
rows, err := db.QueryxContext(outer, selectOrgRunsInRange, archive.OrgID, archive.StartDate, archive.endDate())
rows, err := db.QueryxContext(outer, sqlSelectOrgRunsInRange, archive.OrgID, archive.StartDate, archive.endDate())
if err != nil {
return err
}
Expand Down Expand Up @@ -189,7 +185,7 @@ func DeleteArchivedRuns(ctx context.Context, config *Config, db *sqlx.DB, s3Clie
}

// delete our runs
err = executeInQuery(ctx, tx, deleteRuns, idBatch)
err = executeInQuery(ctx, tx, sqlDeleteRuns, idBatch)
if err != nil {
return errors.Wrap(err, "error deleting runs")
}
Expand Down
Loading

0 comments on commit 91ec410

Please sign in to comment.