Class: CleanSweep::PurgeRunner

Inherits: Object
Includes: Logging
Defined in: lib/clean_sweep/purge_runner.rb

Overview

This is a utility built to mimic some of the features of the pt_archive script so that very large purges can run quickly without impacting production.

It uses a strategy of descending an index, querying for purgeable ids and then deleting them in batches.

Required Options

:model

Required: the Active Record model for the table being purged or copied from

Optional Options

:chunk_size

The number of rows to process in each batch, whether deleting or copying. Defaults to 500.

:index

The index to traverse, in ascending order, while purging. Rows are read in index order, and the index must be a btree index. If not specified, PRIMARY is assumed.

:reverse

Traverse the index in reverse order. For example, if your index is on (account_id, timestamp), this option moves through the rows starting at the highest account_id, then through timestamps starting with the most recent.

:first_only

Traverse only the first column of the index, and do so inclusively using the >= operator instead of the strict > operator. This is important if the index is not unique and there are a lot of duplicates. Otherwise the delete could miss rows. Not allowed in copy mode because you'd be inserting duplicate rows.

:dry_run

Print out the queries that would be used instead of executing them. You should run EXPLAIN on these.

:stop_after

The operation will end after processing this many rows.

:report

Specify an interval in seconds between status messages being printed out.

:logger

The log instance to use. Defaults to ActiveRecord::Base.logger if it is not nil; otherwise a logger writing to $stdout is used.

:dest_model

Specifies the model for the delete operation, or the copy operation if in copy mode. When this option is present nothing is deleted in the model table. Instead, rows are either inserted into this table or deleted from this table. The columns in this model must include the primary key columns found in the source model. If they have different names you need to specify them with the dest_columns option.

:copy_only

Specifies copy mode, where rows are inserted into the destination table instead of deleted from the model table. By default, only columns in the named index and primary key are copied but these can be augmented with columns in the copy_columns option.

:dest_columns

This is a map of column names in the model to column names in the dest model, for when the corresponding columns have different names. Only column names that differ need to be specified. For instance, your source table might use account_id as its primary key column, while the destination accounts table names that column id.

:copy_columns

Extra columns to add when copying to a dest model.

Safety thresholds

:sleep

Time in seconds to sleep between each chunk.

:max_history

The history list size (if available) is checked every 5 minutes; if it exceeds this value, the purge pauses until the history list is below 90% of this value.

:max_repl_lag

The maximum allowed replication lag, checked every 5 minutes. If exceeded, the purge pauses until the lag is below 90% of this value.
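Taken together, a call mixing these options might look like the sketch below. The model and index names are hypothetical illustrations, not part of the gem; only :model is required, and the construction itself is shown commented out since it needs a real ActiveRecord model and database:

```ruby
# Hypothetical option set: purge old rows from a comments table in batches,
# newest account_id / newest timestamp first, pausing when replicas lag.
options = {
  # model:      Comment,          # required: the ActiveRecord model class
  chunk_size:   1000,             # rows per batch (default 500)
  index:        'index_comments_on_account_id_and_timestamp',  # hypothetical
  reverse:      true,             # traverse the index in descending order
  stop_after:   1_000_000,        # raises CleanSweep::PurgeStopped at this count
  sleep:        0.5,              # seconds to pause between chunks
  max_repl_lag: 10                # pause while replication lag exceeds this
}

# runner = CleanSweep::PurgeRunner.new(options.merge(model: Comment))
# runner.execute_in_batches
```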

Defined Under Namespace

Modules: Logging
Classes: MysqlStatus

Instance Attribute Summary

Instance Method Summary

Methods included from Logging

#format, #log, #report

Constructor Details

- (PurgeRunner) initialize(options = {})

Returns a new instance of PurgeRunner



# File 'lib/clean_sweep/purge_runner.rb', line 80

def initialize(options={})
  @model     = options[:model] or raise "source model class required"
  @limit            = options[:chunk_size] || 500

  @target_model     = options[:dest_model]
  @stop_after       = options[:stop_after]
  @report_interval  = options[:report] || 10.seconds
  @logger           = options[:logger] || ActiveRecord::Base.logger || Logger.new($stdout)
  @dry_run          = options[:dry_run]
  @sleep            = options[:sleep]

  @max_history      = options[:max_history]
  @max_repl_lag     = options[:max_repl_lag]

  @copy_mode        = @target_model && options[:copy_only]

  @table_schema     = CleanSweep::TableSchema.new @model,
                                                  key_name: options[:index],
                                                  ascending: !options[:reverse],
                                                  extra_columns: options[:copy_columns],
                                                  first_only: options[:first_only],
                                                  dest_model: @target_model,
                                                  dest_columns: options[:dest_columns]

  if (@max_history || @max_repl_lag)
    @mysql_status = CleanSweep::PurgeRunner::MysqlStatus.new model: @model,
                                                             max_history: @max_history,
                                                             max_repl_lag: @max_repl_lag,
                                                             check_period: options[:check_period],
                                                             logger: @logger
  end

  raise "You can't copy rows from a table into itself" if copy_mode? && @model == @target_model
  raise "An index is required in copy mode" if copy_mode? && @table_schema.traversing_key.nil?
  raise "first_only option not allowed in copy mode" if copy_mode? && @table_schema.first_only?

  @report_interval_start = Time.now

  @query                 = @table_schema.initial_scope.limit(@limit)

  @query = yield(@query) if block_given?
end
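The trailing `@query = yield(@query) if block_given?` lets callers narrow the initial scope by passing a block that receives the query and returns a refined one. A plain-Ruby stand-in for that pattern (FakeScope and build_query are illustrations, not the gem's API; the object the real runner yields is an ActiveRecord::Relation):

```ruby
# Minimal stand-in for the "yield the query, keep the result" pattern
# at the end of initialize. FakeScope records conditions instead of
# building SQL; the real object is an ActiveRecord::Relation.
class FakeScope
  attr_reader :conditions

  def initialize
    @conditions = []
  end

  def where(cond)
    @conditions << cond
    self
  end
end

def build_query(scope)
  query = scope
  query = yield(query) if block_given?   # mirrors the constructor's last line
  query
end

q = build_query(FakeScope.new) { |scope| scope.where("created_at < NOW() - INTERVAL 30 DAY") }
```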

Instance Attribute Details

- (Object) mysql_status (readonly)

This helps us track the state of replication and the history list, and pause if necessary.



# File 'lib/clean_sweep/purge_runner.rb', line 78

def mysql_status
  @mysql_status
end

Instance Method Details

- (Boolean) copy_mode?

Returns:

  • (Boolean)


# File 'lib/clean_sweep/purge_runner.rb', line 124

def copy_mode?
  @copy_mode
end

- (Object) execute_in_batches

Execute the purge in chunks according to the parameters given on instance creation. Will raise CleanSweep::PurgeStopped if a stop_after option was provided and that limit is hit.

Returns the number of rows copied or deleted.



# File 'lib/clean_sweep/purge_runner.rb', line 134

def execute_in_batches

  if @dry_run
    print_queries($stdout)
    return 0
  end

  @start = Time.now
  verb = copy_mode? ? "copying" : "purging"

  msg = "starting: #{verb} #{@table_schema.name} records in batches of #@limit"
  msg << " to #{@target_model.table_name}" if copy_mode?


  log :info,  "sleeping #{@sleep} seconds between purging" if @sleep && !copy_mode?
  @total_deleted = 0

  # Iterate through the rows in limit chunks
  log :debug, "find rows: #{@query.to_sql}" if @logger.level == Logger::DEBUG

  @mysql_status.check! if @mysql_status

  rows = NewRelic::Agent.with_database_metric_name(@model.name, 'SELECT') do
    @model.connection.select_rows @query.to_sql
  end
  while rows.any? && (!@stop_after || @total_deleted < @stop_after) do
#      index_entrypoint_args = Hash[*@source_keys.zip(rows.last).flatten]
    log :debug, "#{verb} #{rows.size} records between #{rows.first.inspect} and #{rows.last.inspect}" if @logger.level == Logger::DEBUG
    stopped = @stop_after && rows.size + @total_deleted > @stop_after

    rows = rows.first(@stop_after - @total_deleted) if stopped
    last_row = rows.last
    if copy_mode?
      metric_op_name = 'INSERT'
      statement = @table_schema.insert_statement(rows)
    else
      metric_op_name = 'DELETE'
      statement = @table_schema.delete_statement(rows)
    end
    log :debug, statement if @logger.level == Logger::DEBUG
    chunk_deleted = NewRelic::Agent.with_database_metric_name((@target_model||@model), metric_op_name) do
      (@target_model||@model).connection.update statement
    end

    @total_deleted += chunk_deleted
    raise CleanSweep::PurgeStopped.new("stopped after #{verb} #{@total_deleted} #{@model} records", @total_deleted) if stopped
    q = @table_schema.scope_to_next_chunk(@query, last_row).to_sql
    log :debug, "find rows: #{q}" if @logger.level == Logger::DEBUG

    sleep @sleep if @sleep && !copy_mode?
    @mysql_status.check! if @mysql_status

    rows = NewRelic::Agent.with_database_metric_name(@model, 'SELECT') do
      @model.connection.select_rows(q)
    end
    report
  end
  report(true)
  if copy_mode?
    log :info,  "completed after #{verb} #{@total_deleted} #{@table_schema.name} records to #{@target_model.table_name}"
  else
    log :info,  "completed after #{verb} #{@total_deleted} #{@table_schema.name} records"
  end

  return @total_deleted
end
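Stripped of the SQL, instrumentation, and throttling, the loop above has a simple shape: fetch a chunk, trim it if it would overshoot stop_after, process it, then fetch the next chunk starting past the last row seen. A plain-Ruby sketch of that control flow (purge_in_batches and the array-backed chunking are stand-ins, not the gem's API):

```ruby
# Plain-Ruby sketch of the chunked loop in execute_in_batches.
# The array slice stands in for the chunk SELECT.
def purge_in_batches(ids, chunk_size:, stop_after: nil)
  total  = 0
  offset = 0
  rows   = ids[offset, chunk_size] || []
  while rows.any? && (!stop_after || total < stop_after)
    # Trim the final chunk if it would overshoot stop_after, as the
    # runner does before issuing the DELETE (it then raises PurgeStopped;
    # this sketch simply lets the loop condition end the iteration).
    rows = rows.first(stop_after - total) if stop_after && total + rows.size > stop_after
    total  += rows.size                   # "delete" the chunk
    offset += rows.size                   # advance past the last row seen
    rows = ids[offset, chunk_size] || []
  end
  total
end
```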


- (Object) print_queries(io)

Print the initial query, the chunk query, and the insert or delete statement to the given io.

# File 'lib/clean_sweep/purge_runner.rb', line 208

def print_queries(io)
  io.puts 'Initial Query:'
  io.puts format_query('    ', @query.to_sql)
  rows = @model.connection.select_rows @query.limit(1).to_sql
  if rows.empty?
    # Don't have any sample data to use for the sample queries, so use NULL values just
    # so the query will print out.
    rows << [nil] * 100
  end
  io.puts "Chunk Query:"
  io.puts format_query('    ', @table_schema.scope_to_next_chunk(@query, rows.first).to_sql)
  if copy_mode?
    io.puts "Insert Statement:"
    io.puts format_query('    ', @table_schema.insert_statement(rows))
  else
    io.puts "Delete Statement:"
    io.puts format_query('    ', @table_schema.delete_statement(rows))
  end
end

- (Object) sleep(duration)



# File 'lib/clean_sweep/purge_runner.rb', line 201

def sleep duration
  Kernel.sleep duration
end