Skip to content

Commit

Permalink
Merge pull request #79 from nyaruka/bcast_delete_optimzation
Browse files Browse the repository at this point in the history
Skip counting messages per broadcast
  • Loading branch information
rowanseymour authored Jan 6, 2023
2 parents bae126e + 908e189 commit 1d13ba4
Showing 1 changed file with 6 additions and 20 deletions.
26 changes: 6 additions & 20 deletions archives/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,13 +232,12 @@ func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3
}

const sqlSelectOldOrgBroadcasts = `
SELECT id
FROM msgs_broadcast
WHERE org_id = $1 AND created_on < $2 AND schedule_id IS NULL
ORDER BY created_on ASC, id ASC
LIMIT 1000000;`
SELECT id
FROM msgs_broadcast b
WHERE b.org_id = $1 AND b.created_on < $2 AND b.schedule_id IS NULL AND NOT EXISTS (SELECT 1 FROM msgs_msg WHERE broadcast_id = b.id)
LIMIT 1000000;`

// DeleteBroadcasts deletes all broadcasts older than 90 days for the passed in org which have no active messages on them
// DeleteBroadcasts deletes all broadcasts older than 90 days for the passed in org which have no associated messages
func DeleteBroadcasts(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, org Org) error {
start := dates.Now()
threshhold := now.AddDate(0, 0, -org.RetentionPeriod)
Expand All @@ -261,23 +260,10 @@ func DeleteBroadcasts(ctx context.Context, now time.Time, config *Config, db *sq
}

var broadcastID int64
err := rows.Scan(&broadcastID)
if err != nil {
if err := rows.Scan(&broadcastID); err != nil {
return errors.Wrap(err, "unable to get broadcast id")
}

// make sure we have no active messages
var msgCount int64
err = db.Get(&msgCount, `SELECT count(*) FROM msgs_msg WHERE broadcast_id = $1`, broadcastID)
if err != nil {
return errors.Wrapf(err, "unable to select number of msgs for broadcast: %d", broadcastID)
}

if msgCount != 0 {
logrus.WithField("broadcast_id", broadcastID).WithField("org_id", org.ID).WithField("msg_count", msgCount).Warn("unable to delete broadcast, has messages still")
continue
}

// we delete broadcasts in a transaction per broadcast
tx, err := db.BeginTx(ctx, nil)
if err != nil {
Expand Down

0 comments on commit 1d13ba4

Please sign in to comment.