Class: CleanSweep::PurgeRunner
- Inherits: Object
- Includes: Logging
- Defined in: lib/clean_sweep/purge_runner.rb
Overview
This is a utility built to mimic some of the features of the pt_archive script, so that very large purges run faster with minimal production impact.
It works by descending an index, querying for purgeable ids, and then deleting them in batches.
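As a rough illustration of that strategy, here is a plain-Ruby sketch with no database involved; purge_in_chunks and the in-memory id list are illustrative, not part of the gem:

```ruby
# Minimal sketch of the chunk-by-chunk purge strategy: read a batch of ids
# in index order, delete them, then continue from the last id seen.
def purge_in_chunks(ids, chunk_size:)
  deleted = 0
  last_id = nil
  loop do
    # Stand-in for a range scan down a btree index: fetch the next chunk of
    # purgeable ids strictly after the last one seen.
    chunk = ids.select { |id| last_id.nil? || id > last_id }.first(chunk_size)
    break if chunk.empty?
    # In the real runner this is a single DELETE statement for the chunk.
    deleted += chunk.size
    last_id = chunk.last
  end
  deleted
end

purge_in_chunks((1..1234).to_a, chunk_size: 500)  # => 1234
```

Because each chunk is bounded by the last key seen rather than an OFFSET, the query cost per chunk stays flat no matter how deep into the table the purge has progressed.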
Required Options
- :model
  Required: the Active Record model for the table being purged or copied from.
Optional Options
- :chunk_size
  The number of rows to copy in each block. Defaults to 500.
- :index
  The index to traverse in ascending order doing the purge. Rows are read in the order of the index, which must be a btree index. If not specified, PRIMARY is assumed.
- :reverse
  Traverse the index in reverse order. For example, if your index is on account_id, timestamp, this option will move through the rows starting at the highest account number, then move through timestamps starting with the most recent.
- :first_only
  Traverse only the first column of the index, and do so inclusively using the >= operator instead of the strict > operator. This is important if the index is not unique and there are a lot of duplicates; otherwise the delete could miss rows. Not allowed in copy mode because you'd be inserting duplicate rows.
- :dry_run
  Print out the queries that are going to be used. You should run EXPLAIN on these.
- :stop_after
  The operation will end after copying this many rows.
- :report
  The interval in seconds between status messages being printed out.
- :logger
  The log instance to use. Defaults to ActiveRecord::Base.logger if not nil, otherwise it uses $stdout.
- :dest_model
  Specifies the model for the delete operation, or the copy operation if in copy mode. When this option is present, nothing is deleted in the model table; instead, rows are either inserted into this table or deleted from this table. The columns in this model must include the primary key columns found in the source model. If they have different names, you need to specify them with the dest_columns option.
- :copy_only
  Specifies copy mode, where rows are inserted into the destination table instead of deleted from the model table. By default, only columns in the named index and primary key are copied, but these can be augmented with columns in the copy_columns option.
- :dest_columns
  A map of column names in the model to column names in the dest model when the corresponding columns differ. Only column names that are different need to be specified. For instance, your table of account ids might have account_id as the primary key column, but you want to delete rows in the accounts table where the account id is in the column named id.
- :copy_columns
  Extra columns to add when copying to a dest model.
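For illustration, here is one way the options above might be combined for a copy-mode run. The model names and index name are hypothetical, and this sketch only builds the options hash; in real use, :model and :dest_model would be ActiveRecord model classes rather than strings:

```ruby
# Hypothetical option set: copy keys of old rows into a backup table in
# chunks of 1000, reporting every 30 seconds and stopping after 1,000,000 rows.
options = {
  model:      'Comment',        # source model (placeholder string here)
  dest_model: 'CommentBackup',  # presence of :dest_model redirects the operation
  copy_only:  true,             # insert into the dest table instead of deleting
  index:      'index_comments_on_created_at',  # btree index to traverse
  chunk_size: 1000,
  stop_after: 1_000_000,
  report:     30
}
```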
Safety thresholds
- :sleep
  Time in seconds to sleep between each chunk.
- :max_history
  The history list size (if available) is checked every 5 minutes; if it exceeds this value, the purge pauses until the history list is below 90% of this value.
- :max_repl_lag
  The maximum allowed replication lag. Checked every 5 minutes; if exceeded, the purge pauses until the replication lag is below 90% of this value.
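The pause-until-caught-up behavior behind both thresholds can be sketched in plain Ruby; wait_until_below and the sample readings are illustrative, not part of the gem:

```ruby
# Sketch of the safety-threshold behavior: when a monitored value (history
# list size or replication lag) exceeds its maximum, the purge waits until
# the value drops below 90% of that maximum before resuming.
def wait_until_below(max, check_interval: 0)
  loop do
    value = yield
    break if value < max * 0.9
    sleep check_interval  # the real runner re-checks periodically
  end
end

readings = [120, 110, 95, 80]  # say, successive history-list sizes
wait_until_below(100, check_interval: 0) { readings.shift }
# resumes only once a reading (80) falls below 90% of the max (100);
# note 95 is under the max but not under the 90% resume threshold
```

Resuming at 90% rather than at the threshold itself adds hysteresis, so a purge hovering right at the limit does not flap between paused and running.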
Defined Under Namespace
Modules: Logging
Classes: MysqlStatus
Instance Attribute Summary (collapse)
- (Object) mysql_status (readonly)
  This helps us track the state of replication and the history list, and pause if necessary.
Instance Method Summary (collapse)
- (Boolean) copy_mode?
- (Object) execute_in_batches
  Execute the purge in chunks according to the parameters given on instance creation.
- (PurgeRunner) initialize(options = {}) (constructor)
  A new instance of PurgeRunner.
- (Object) print_queries(io)
- (Object) sleep(duration)
Methods included from Logging
Constructor Details
- (PurgeRunner) initialize(options = {})
Returns a new instance of PurgeRunner
# File 'lib/clean_sweep/purge_runner.rb', line 80

def initialize(options = {})
  @model = options[:model] or raise "source model class required"
  @limit = options[:chunk_size] || 500
  @target_model = options[:dest_model]
  @stop_after = options[:stop_after]
  @report_interval = options[:report] || 10.seconds
  @logger = options[:logger] || ActiveRecord::Base.logger || Logger.new($stdout)
  @dry_run = options[:dry_run]
  @sleep = options[:sleep]
  @max_history = options[:max_history]
  @max_repl_lag = options[:max_repl_lag]

  @copy_mode = @target_model && options[:copy_only]

  @table_schema = CleanSweep::TableSchema.new @model,
                                              key_name: options[:index],
                                              ascending: !options[:reverse],
                                              extra_columns: options[:copy_columns],
                                              first_only: options[:first_only],
                                              dest_model: @target_model,
                                              dest_columns: options[:dest_columns]

  if @max_history || @max_repl_lag
    @mysql_status = CleanSweep::PurgeRunner::MysqlStatus.new model: @model,
                                                            max_history: @max_history,
                                                            max_repl_lag: @max_repl_lag,
                                                            check_period: options[:check_period],
                                                            logger: @logger
  end

  raise "You can't copy rows from a table into itself" if copy_mode? && @model == @target_model
  raise "An index is required in copy mode" if copy_mode? && @table_schema.traversing_key.nil?
  raise "first_only option not allowed in copy mode" if copy_mode? && @table_schema.first_only?

  @report_interval_start = Time.now

  @query = @table_schema.initial_scope.limit(@limit)

  @query = yield(@query) if block_given?
end
Instance Attribute Details
- (Object) mysql_status (readonly)
This helps us track the state of replication and history list and pause if necessary
# File 'lib/clean_sweep/purge_runner.rb', line 78

def mysql_status
  @mysql_status
end
Instance Method Details
- (Boolean) copy_mode?
# File 'lib/clean_sweep/purge_runner.rb', line 124

def copy_mode?
  @copy_mode
end
- (Object) execute_in_batches
Execute the purge in chunks according to the parameters given on instance creation. Will raise CleanSweep::PurgeStopped if a stop_after option was provided and that limit is hit.
Returns the number of rows copied or deleted.
# File 'lib/clean_sweep/purge_runner.rb', line 134

def execute_in_batches

  if @dry_run
    print_queries($stdout)
    return 0
  end

  @start = Time.now

  verb = copy_mode? ? "copying" : "purging"

  msg = "starting: #{verb} #{@table_schema.name} records in batches of #@limit"
  msg << " to #{@target_model.table_name}" if copy_mode?

  log :info, "sleeping #{@sleep} seconds between purging" if @sleep && !copy_mode?

  @total_deleted = 0

  # Iterate through the rows in limit chunks
  log :debug, "find rows: #{@query.to_sql}" if @logger.level == Logger::DEBUG

  @mysql_status.check! if @mysql_status

  rows = NewRelic::Agent.with_database_metric_name(@model.name, 'SELECT') do
    @model.connection.select_rows @query.to_sql
  end
  while rows.any? && (!@stop_after || @total_deleted < @stop_after) do
    # index_entrypoint_args = Hash[*@source_keys.zip(rows.last).flatten]
    log :debug, "#{verb} #{rows.size} records between #{rows.first.inspect} and #{rows.last.inspect}" if @logger.level == Logger::DEBUG
    stopped = @stop_after && rows.size + @total_deleted > @stop_after
    rows = rows.first(@stop_after - @total_deleted) if stopped
    last_row = rows.last
    if copy_mode?
      metric_op_name = 'INSERT'
      statement = @table_schema.insert_statement(rows)
    else
      metric_op_name = 'DELETE'
      statement = @table_schema.delete_statement(rows)
    end
    log :debug, statement if @logger.level == Logger::DEBUG
    chunk_deleted = NewRelic::Agent.with_database_metric_name((@target_model || @model), metric_op_name) do
      (@target_model || @model).connection.update statement
    end

    @total_deleted += chunk_deleted
    raise CleanSweep::PurgeStopped.new("stopped after #{verb} #{@total_deleted} #{@model} records", @total_deleted) if stopped
    q = @table_schema.scope_to_next_chunk(@query, last_row).to_sql
    log :debug, "find rows: #{q}" if @logger.level == Logger::DEBUG

    sleep @sleep if @sleep && !copy_mode?
    @mysql_status.check! if @mysql_status

    rows = NewRelic::Agent.with_database_metric_name(@model, 'SELECT') do
      @model.connection.select_rows(q)
    end
    report
  end
  report(true)
  if copy_mode?
    log :info, "completed after #{verb} #{@total_deleted} #{@table_schema.name} records to #{@target_model.table_name}"
  else
    log :info, "completed after #{verb} #{@total_deleted} #{@table_schema.name} records"
  end

  return @total_deleted
end
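The stop_after handling inside the loop above can be isolated in a small sketch; process and PurgeStopped here are simplified stand-ins for the gem's chunk loop and CleanSweep::PurgeStopped:

```ruby
# Simplified stand-in for CleanSweep::PurgeStopped.
class PurgeStopped < StandardError; end

# Mirrors the chunk loop's stop_after logic: when the next chunk would push
# the running total past the limit, trim the chunk, process the remainder,
# and then raise to stop the purge.
def process(chunks, stop_after:)
  total = 0
  chunks.each do |chunk|
    stopped = total + chunk.size > stop_after
    chunk = chunk.first(stop_after - total) if stopped
    total += chunk.size  # stands in for executing the DELETE/INSERT
    raise PurgeStopped, "stopped after #{total}" if stopped
  end
  total
end

begin
  process([[1] * 500, [1] * 500, [1] * 500], stop_after: 1200)
rescue PurgeStopped => e
  e.message  # => "stopped after 1200"
end
```

Trimming before raising means the purge lands exactly on the stop_after count rather than overshooting by up to a chunk.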
- (Object) print_queries(io)
# File 'lib/clean_sweep/purge_runner.rb', line 208

def print_queries(io)
  io.puts 'Initial Query:'
  io.puts format_query('    ', @query.to_sql)
  rows = @model.connection.select_rows @query.limit(1).to_sql
  if rows.empty?
    # Don't have any sample data to use for the sample queries, so use NULL values just
    # so the query will print out.
    rows << [nil] * 100
  end
  io.puts "Chunk Query:"
  io.puts format_query('    ', @table_schema.scope_to_next_chunk(@query, rows.first).to_sql)
  if copy_mode?
    io.puts "Insert Statement:"
    io.puts format_query('    ', @table_schema.insert_statement(rows))
  else
    io.puts "Delete Statement:"
    io.puts format_query('    ', @table_schema.delete_statement(rows))
  end
end
- (Object) sleep(duration)
# File 'lib/clean_sweep/purge_runner.rb', line 201

def sleep(duration)
  Kernel.sleep duration
end