Sfoglia il codice sorgente

Add anomaly detection

Andrew Kane 8 anni fa
parent
commit
a5a7918853

+ 1 - 0
CHANGELOG.md

@@ -1,6 +1,7 @@
 ## 1.4.1 [unreleased]
 
 - Added new bar chart format
+- Added anomaly detection checks
 
 ## 1.4.0
 

+ 40 - 0
README.md

@@ -306,6 +306,25 @@ SELECT * FROM ratings WHERE user_id IS NULL /* all ratings should have a user */
 
 Then create a check, with optional emails if you want to be notified. Emails are sent when a check starts failing, and when it starts passing again.
 
+## Anomaly Detection [master]
+
+Anomaly detection is supported thanks to Twitter’s [AnomalyDetection](https://github.com/twitter/AnomalyDetection) library.
+
+First, [install R](https://cloud.r-project.org/). Then, run:
+
+```R
+install.packages("devtools")
+devtools::install_github("twitter/AnomalyDetection")
+```
+
+And add to `config/blazer.yml`:
+
+```yml
+anomaly_checks: true
+```
+
+If upgrading from version 1.4 or below, also follow the [upgrade instructions](#15).
+
 ## Data Sources
 
 Blazer supports multiple data sources :tada:
@@ -346,6 +365,27 @@ For an easy way to group by day, week, month, and more with correct time zones,
 
 ## Upgrading
 
+### 1.5 [unreleased]
+
+To take advantage of anomaly detection, create a migration:
+
+```sh
+rails g migration upgrade_blazer_to_1_5
+```
+
+with:
+
+```ruby
+add_column(:blazer_checks, :check_type, :string)
+add_column(:blazer_checks, :message, :text)
+commit_db_transaction
+
+BlazerCheck.reset_column_information
+
+BlazerCheck.where(invert: true).update_all(check_type: "missing_data")
+BlazerCheck.where(check_type: nil).update_all(check_type: "bad_data")
+```
+
 ### 1.3
 
 To take advantage of the latest features, create a migration

+ 1 - 1
app/controllers/blazer/checks_controller.rb

@@ -46,7 +46,7 @@ module Blazer
     private
 
     def check_params
-      params.require(:check).permit(:query_id, :emails, :invert, :schedule)
+      params.require(:check).permit(:query_id, :emails, :invert, :check_type, :schedule)
     end
 
     def set_check

+ 1 - 9
app/controllers/blazer/queries_controller.rb

@@ -152,15 +152,7 @@ module Blazer
       @filename = @query.name.parameterize if @query
       @min_width_types = @columns.each_with_index.select { |c, i| @first_row[i].is_a?(Time) || @first_row[i].is_a?(String) || @data_source.smart_columns[c] }
 
-      @boom = {}
-      @columns.each_with_index do |key, i|
-        query = @data_source.smart_columns[key]
-        if query
-          values = @rows.map { |r| r[i] }.compact.uniq
-          columns, rows, error, cached_at = @data_source.run_statement(ActiveRecord::Base.send(:sanitize_sql_array, [query.sub("{value}", "(?)"), values]))
-          @boom[key] = Hash[rows.map { |k, v| [k.to_s, v] }]
-        end
-      end
+      @boom = Blazer.boom(@columns, @rows, @data_source)
 
       @linked_columns = @data_source.linked_columns
 

+ 0 - 17
app/helpers/blazer/base_helper.rb

@@ -16,23 +16,6 @@ module Blazer
       end
     end
 
-    def blazer_column_types(columns, rows, boom)
-      columns.each_with_index.map do |k, i|
-        v = (rows.find { |r| r[i] } || {})[i]
-        if boom[k]
-          "string"
-        elsif v.is_a?(Numeric)
-          "numeric"
-        elsif v.is_a?(Time) || v.is_a?(Date)
-          "time"
-        elsif v.nil?
-          nil
-        else
-          "string"
-        end
-      end
-    end
-
     def blazer_maps?
       ENV["MAPBOX_ACCESS_TOKEN"].present?
     end

+ 25 - 5
app/models/blazer/check.rb

@@ -15,8 +15,18 @@ module Blazer
       emails.to_s.downcase.split(",").map(&:strip)
     end
 
-    def update_state(rows, error)
-      invert = respond_to?(:invert) && self.invert
+    def update_state(columns, rows, error, data_source)
+      check_type =
+        if respond_to?(:check_type)
+          self.check_type
+        elsif respond_to?(:invert)
+          invert ? "missing_data" : "bad_data"
+        else
+          "bad_data"
+        end
+
+      message = error
+
       self.state =
         if error
           if error == Blazer::TIMEOUT_MESSAGE
@@ -24,13 +34,23 @@ module Blazer
           else
             "error"
           end
+        elsif check_type == "anomaly"
+          anomaly, message = Blazer.detect_anomaly(columns, rows, data_source)
+          if anomaly.nil?
+            "error"
+          elsif anomaly
+            "failing"
+          else
+            "passing"
+          end
         elsif rows.any?
-          invert ? "passing" : "failing"
+          check_type == "missing_data" ? "passing" : "failing"
         else
-          invert ? "failing" : "passing"
+          check_type == "missing_data" ? "failing" : "passing"
         end
 
       self.last_run_at = Time.now if respond_to?(:last_run_at=)
+      self.message = message if respond_to?(:message=)
 
       if respond_to?(:timeouts=)
         if state == "timed out"
@@ -43,7 +63,7 @@ module Blazer
 
       # do not notify on creation, except when not passing
       if (state_was || state != "passing") && state != state_was && emails.present?
-        Blazer::CheckMailer.state_change(self, state, state_was, rows.size, error).deliver_later
+        Blazer::CheckMailer.state_change(self, state, state_was, rows.size, message).deliver_later
       end
       save! if changed?
     end

+ 13 - 1
app/views/blazer/checks/_form.html.erb

@@ -20,7 +20,19 @@
     </script>
   </div>
 
-  <% if @check.respond_to?(:invert) %>
+  <% if @check.respond_to?(:check_type) %>
+    <div class="form-group">
+      <%= f.label :check_type, "Alert if" %>
+      <div class="hide">
+        <% check_options = [["Any results (bad data)", "bad_data"], ["No results (missing data)", "missing_data"]] %>
+        <% check_options << ["Anomaly (most recent data point)", "anomaly"] if Blazer.anomaly_checks %>
+        <%= f.select :check_type, check_options %>
+      </div>
+      <script>
+        $("#check_check_type").selectize({}).parent().removeClass("hide");
+      </script>
+    </div>
+  <% elsif @check.respond_to?(:invert) %>
     <div class="form-group">
       <%= f.label :invert, "Fails if" %>
       <div class="hide">

+ 8 - 4
app/views/blazer/queries/run.html.erb

@@ -29,13 +29,17 @@
 
       <% @checks.select(&:state).each do |check| %>
         &middot; <small class="check-state <%= check.state.parameterize("_") %>"><%= link_to check.state.upcase, edit_check_path(check) %></small>
+        <% if check.try(:message) %>
+          &middot; <%= check.message %>
+        <% end %>
       <% end %>
     </p>
   <% end %>
   <% if @rows.any? %>
     <% values = @rows.first %>
     <% chart_id = SecureRandom.hex %>
-    <% column_types = blazer_column_types(@columns, @rows, @boom) %>
+    <% column_types = Blazer.column_types(@columns, @rows, @boom) %>
+    <% chart_type = Blazer.chart_type(column_types) %>
     <% chart_options = {id: chart_id, min: nil} %>
     <% series_library = {} %>
     <% target_index = @columns.index { |k| k.downcase == "target" } %>
@@ -71,11 +75,11 @@
         featureLayer.setGeoJSON(geojson);
         map.fitBounds(featureLayer.getBounds());
       </script>
-    <% elsif values.size >= 2 && column_types.compact == ["time"] + (column_types.compact.size - 1).times.map { "numeric" } %>
+    <% elsif chart_type == "line" %>
       <%= line_chart @columns[1..-1].each_with_index.map{ |k, i| {name: k, data: @rows.map{ |r| [r[0], r[i + 1]] }, library: series_library[i]} }, chart_options %>
-    <% elsif values.size == 3 && column_types == ["time", "string", "numeric"] %>
+    <% elsif chart_type == "line2" %>
       <%= line_chart @rows.group_by { |r| v = r[1]; (@boom[@columns[1]] || {})[v.to_s] || v }.each_with_index.map { |(name, v), i| {name: name, data: v.map { |v2| [v2[0], v2[2]] }, library: series_library[i]} }, chart_options %>
-    <% elsif values.size >= 2 && column_types == ["string"] + (values.size - 1).times.map { "numeric" } %>
+    <% elsif chart_type == "bar" %>
       <%= column_chart (values.size - 1).times.map { |i| name = @columns[i + 1]; {name: name, data: @rows.first(20).map { |r| [(@boom[@columns[0]] || {})[r[0].to_s] || r[0], r[i + 1]] } } }, id: chart_id %>
     <% elsif values.size == 3 && column_types == ["string", "string", "numeric"] %>
       <% first_20 = @rows.group_by { |r| r[0] }.values.first(20).flatten(1) %>

+ 123 - 2
lib/blazer.rb

@@ -21,10 +21,12 @@ module Blazer
     attr_accessor :cache
     attr_accessor :transform_statement
     attr_accessor :check_schedules
+    attr_accessor :anomaly_checks
   end
   self.audit = true
   self.user_name = :name
   self.check_schedules = ["5 minutes", "1 hour", "1 day"]
+  self.anomaly_checks = false
 
   TIMEOUT_MESSAGE = "Query timed out :("
   TIMEOUT_ERRORS = [
@@ -92,7 +94,7 @@ module Blazer
           tries += 1
           sleep(10)
         elsif error.to_s.start_with?("PG::ConnectionBad")
-          data_sources[check.query.data_source].reconnect
+          data_source.reconnect
           Rails.logger.info "[blazer reconnect] query=#{check.query.name}"
           tries += 1
           sleep(10)
@@ -100,7 +102,7 @@ module Blazer
           break
         end
       end
-      check.update_state(rows, error)
+      check.update_state(columns, rows, error, data_source)
       # TODO use proper logfmt
       Rails.logger.info "[blazer check] query=#{check.query.name} state=#{check.state} rows=#{rows.try(:size)} error=#{error}"
 
@@ -125,4 +127,123 @@ module Blazer
       Blazer::CheckMailer.failing_checks(email, checks).deliver_later
     end
   end
+
+  def self.column_types(columns, rows, boom = {})
+    columns.each_with_index.map do |k, i|
+      v = (rows.find { |r| r[i] } || {})[i]
+      if boom[k]
+        "string"
+      elsif v.is_a?(Numeric)
+        "numeric"
+      elsif v.is_a?(Time) || v.is_a?(Date)
+        "time"
+      elsif v.nil?
+        nil
+      else
+        "string"
+      end
+    end
+  end
+
+  def self.chart_type(column_types)
+    if column_types.compact.size >= 2 && column_types.compact == ["time"] + (column_types.compact.size - 1).times.map { "numeric" }
+      "line"
+    elsif column_types == ["time", "string", "numeric"]
+      "line2"
+    elsif column_types.compact.size >= 2 && column_types == ["string"] + (column_types.compact.size - 1).times.map { "numeric" }
+      "bar"
+    end
+  end
+
+  def self.detect_anomaly(columns, rows, data_source)
+    anomaly = nil
+    message = nil
+
+    if rows.empty?
+      message = "No data"
+    else
+      boom = self.boom(columns, rows, data_source)
+      chart_type = self.chart_type(column_types(columns, rows, boom))
+      if chart_type == "line" || chart_type == "line2"
+        series = []
+
+        if chart_type == "line"
+          columns[1..-1].each_with_index.each do |k, i|
+            series << {name: k, data: rows.map{ |r| [r[0], r[i + 1]] }}
+          end
+        else
+          rows.group_by { |r| v = r[1]; (boom[columns[1]] || {})[v.to_s] || v }.each_with_index.map do |(name, v), i|
+            series << {name: name, data: v.map { |v2| [v2[0], v2[2]] }}
+          end
+        end
+
+        current_series = nil
+        begin
+          anomalies = []
+          series.each do |s|
+            current_series = s[:name]
+            anomalies << s[:name] if anomaly?(s[:data])
+          end
+          anomaly = anomalies.any?
+          if anomaly
+            if anomalies.size == 1
+              message = "Anomaly detected in #{anomalies.first}"
+            else
+              message = "Anomalies detected in #{anomalies.to_sentence}"
+            end
+          else
+            message = "No anomalies detected"
+          end
+        rescue => e
+          message = "#{current_series}: #{e.message}"
+        end
+      else
+        message = "Bad format"
+      end
+    end
+
+    [anomaly, message]
+  end
+
+  def self.anomaly?(series)
+    series = series.reject { |v| v[0].nil? }.sort_by { |v| v[0] }
+
+    csv_str =
+      CSV.generate do |csv|
+        csv << ["timestamp", "count"]
+        series.each do |row|
+          csv << row
+        end
+      end
+
+    timestamps = []
+    r_script = %x[which Rscript].chomp
+    raise "R not found" if r_script.empty?
+    output = %x[#{r_script} --vanilla #{File.expand_path("../blazer/detect_anomalies.R", __FILE__)} #{Shellwords.escape(csv_str)}]
+    if output.empty?
+      raise "Unknown R error"
+    end
+
+    rows = CSV.parse(output, headers: true)
+    error = rows.first && rows.first["x"]
+    raise error if error
+
+    rows.each do |row|
+      timestamps << Time.parse(row["timestamp"])
+    end
+    timestamps.include?(series.last[0].to_time)
+  end
+
+  def self.boom(columns, rows, data_source)
+    boom = {}
+    columns.each_with_index do |key, i|
+      query = data_source.smart_columns[key]
+      if query
+        values = rows.map { |r| r[i] }.compact.uniq
+        columns, rows2, error, cached_at = data_source.run_statement(ActiveRecord::Base.send(:sanitize_sql_array, [query.sub("{value}", "(?)"), values]))
+        boom[key] = Hash[rows2.map { |k, v| [k.to_s, v] }]
+      end
+    end
+    boom
+  end
 end

+ 1 - 1
lib/blazer/data_source.rb

@@ -126,7 +126,7 @@ module Blazer
 
       if query && error != Blazer::TIMEOUT_MESSAGE
         query.checks.each do |check|
-          check.update_state(rows, error)
+          check.update_state(columns, rows, error, self)
         end
       end
 

+ 14 - 0
lib/blazer/detect_anomalies.R

@@ -0,0 +1,14 @@
+# Reads a CSV time series from the first command-line argument, runs
+# AnomalyDetectionTs on it, and writes the detected anomalies as CSV.
+# If anything inside the block fails, the error message is written as CSV
+# instead.
+tryCatch({
+  library(AnomalyDetection)
+
+  # trailing arguments only
+  args <- commandArgs(trailingOnly = TRUE)
+
+  # the argument holds the CSV text itself, so it is read through a text
+  # connection
+  con <- textConnection(args[1])
+  data <- read.csv(con, stringsAsFactors = FALSE)
+  data$timestamp <- as.POSIXct(data$timestamp)
+
+  # look for anomalies in both directions
+  res = AnomalyDetectionTs(data, direction = "both", alpha = 0.05)
+  write.csv(res$anoms)
+}, error = function (e) {
+  write.csv(geterrmessage())
+})

+ 2 - 0
lib/blazer/engine.rb

@@ -29,6 +29,8 @@ module Blazer
       end
 
       Blazer.cache ||= Rails.cache
+
+      Blazer.anomaly_checks = Blazer.settings["anomaly_checks"] || false
     end
   end
 end

+ 2 - 1
lib/generators/blazer/templates/install.rb

@@ -36,7 +36,8 @@ class <%= migration_class_name %> < ActiveRecord::Migration
       t.string :state
       t.string :schedule
       t.text :emails
-      t.boolean :invert
+      t.string :check_type
+      t.text :message
       t.timestamp :last_run_at
       t.timestamps
     end