|
@@ -1,10 +1,11 @@
|
|
require "csv"
|
|
require "csv"
|
|
require "yaml"
|
|
require "yaml"
|
|
require "chartkick"
|
|
require "chartkick"
|
|
|
|
+require "safely/core"
|
|
require "blazer/version"
|
|
require "blazer/version"
|
|
require "blazer/data_source"
|
|
require "blazer/data_source"
|
|
|
|
+require "blazer/result"
|
|
require "blazer/engine"
|
|
require "blazer/engine"
|
|
-require "safely/core"
|
|
|
|
|
|
|
|
module Blazer
|
|
module Blazer
|
|
class Error < StandardError; end
|
|
class Error < StandardError; end
|
|
@@ -90,12 +91,12 @@ module Blazer
|
|
Blazer.transform_statement.call(data_source, statement) if Blazer.transform_statement
|
|
Blazer.transform_statement.call(data_source, statement) if Blazer.transform_statement
|
|
|
|
|
|
while tries <= 3
|
|
while tries <= 3
|
|
- columns, rows, error, cached_at = data_source.run_statement(statement, refresh_cache: true, check: check, query: check.query)
|
|
|
|
- if error == Blazer::TIMEOUT_MESSAGE
|
|
|
|
|
|
+ result = data_source.run_statement(statement, refresh_cache: true, check: check, query: check.query)
|
|
|
|
+ if result.timed_out?
|
|
Rails.logger.info "[blazer timeout] query=#{check.query.name}"
|
|
Rails.logger.info "[blazer timeout] query=#{check.query.name}"
|
|
tries += 1
|
|
tries += 1
|
|
sleep(10)
|
|
sleep(10)
|
|
- elsif error.to_s.start_with?("PG::ConnectionBad")
|
|
|
|
|
|
+ elsif result.error.to_s.start_with?("PG::ConnectionBad")
|
|
data_source.reconnect
|
|
data_source.reconnect
|
|
Rails.logger.info "[blazer reconnect] query=#{check.query.name}"
|
|
Rails.logger.info "[blazer reconnect] query=#{check.query.name}"
|
|
tries += 1
|
|
tries += 1
|
|
@@ -104,15 +105,15 @@ module Blazer
|
|
break
|
|
break
|
|
end
|
|
end
|
|
end
|
|
end
|
|
- check.update_state(columns, rows, error, data_source)
|
|
|
|
|
|
+ check.update_state(result)
|
|
# TODO use proper logfmt
|
|
# TODO use proper logfmt
|
|
- Rails.logger.info "[blazer check] query=#{check.query.name} state=#{check.state} rows=#{rows.try(:size)} error=#{error}"
|
|
|
|
|
|
+ Rails.logger.info "[blazer check] query=#{check.query.name} state=#{check.state} rows=#{result.rows.try(:size)} error=#{result.error}"
|
|
|
|
|
|
instrument[:statement] = statement
|
|
instrument[:statement] = statement
|
|
instrument[:data_source] = data_source
|
|
instrument[:data_source] = data_source
|
|
instrument[:state] = check.state
|
|
instrument[:state] = check.state
|
|
- instrument[:rows] = rows.try(:size)
|
|
|
|
- instrument[:error] = error
|
|
|
|
|
|
+ instrument[:rows] = result.rows.try(:size)
|
|
|
|
+ instrument[:error] = result.error
|
|
instrument[:tries] = tries
|
|
instrument[:tries] = tries
|
|
end
|
|
end
|
|
end
|
|
end
|
|
@@ -129,123 +130,4 @@ module Blazer
|
|
Blazer::CheckMailer.failing_checks(email, checks).deliver_later
|
|
Blazer::CheckMailer.failing_checks(email, checks).deliver_later
|
|
end
|
|
end
|
|
end
|
|
end
|
|
-
|
|
|
|
- def self.column_types(columns, rows, boom = {})
|
|
|
|
- columns.each_with_index.map do |k, i|
|
|
|
|
- v = (rows.find { |r| r[i] } || {})[i]
|
|
|
|
- if boom[k]
|
|
|
|
- "string"
|
|
|
|
- elsif v.is_a?(Numeric)
|
|
|
|
- "numeric"
|
|
|
|
- elsif v.is_a?(Time) || v.is_a?(Date)
|
|
|
|
- "time"
|
|
|
|
- elsif v.nil?
|
|
|
|
- nil
|
|
|
|
- else
|
|
|
|
- "string"
|
|
|
|
- end
|
|
|
|
- end
|
|
|
|
- end
|
|
|
|
-
|
|
|
|
- def self.chart_type(column_types)
|
|
|
|
- if column_types.compact.size >= 2 && column_types.compact == ["time"] + (column_types.compact.size - 1).times.map { "numeric" }
|
|
|
|
- "line"
|
|
|
|
- elsif column_types == ["time", "string", "numeric"]
|
|
|
|
- "line2"
|
|
|
|
- elsif column_types.compact.size >= 2 && column_types == ["string"] + (column_types.compact.size - 1).times.map { "numeric" }
|
|
|
|
- "bar"
|
|
|
|
- end
|
|
|
|
- end
|
|
|
|
-
|
|
|
|
- def self.detect_anomaly(columns, rows, data_source)
|
|
|
|
- anomaly = nil
|
|
|
|
- message = nil
|
|
|
|
-
|
|
|
|
- if rows.empty?
|
|
|
|
- message = "No data"
|
|
|
|
- else
|
|
|
|
- boom = self.boom(columns, rows, data_source)
|
|
|
|
- chart_type = self.chart_type(column_types(columns, rows, boom))
|
|
|
|
- if chart_type == "line" || chart_type == "line2"
|
|
|
|
- series = []
|
|
|
|
-
|
|
|
|
- if chart_type == "line"
|
|
|
|
- columns[1..-1].each_with_index.each do |k, i|
|
|
|
|
- series << {name: k, data: rows.map{ |r| [r[0], r[i + 1]] }}
|
|
|
|
- end
|
|
|
|
- else
|
|
|
|
- rows.group_by { |r| v = r[1]; (boom[columns[1]] || {})[v.to_s] || v }.each_with_index.map do |(name, v), i|
|
|
|
|
- series << {name: name, data: v.map { |v2| [v2[0], v2[2]] }}
|
|
|
|
- end
|
|
|
|
- end
|
|
|
|
-
|
|
|
|
- current_series = nil
|
|
|
|
- begin
|
|
|
|
- anomalies = []
|
|
|
|
- series.each do |s|
|
|
|
|
- current_series = s[:name]
|
|
|
|
- anomalies << s[:name] if anomaly?(s[:data])
|
|
|
|
- end
|
|
|
|
- anomaly = anomalies.any?
|
|
|
|
- if anomaly
|
|
|
|
- if anomalies.size == 1
|
|
|
|
- message = "Anomaly detected in #{anomalies.first}"
|
|
|
|
- else
|
|
|
|
- message = "Anomalies detected in #{anomalies.to_sentence}"
|
|
|
|
- end
|
|
|
|
- else
|
|
|
|
- message = "No anomalies detected"
|
|
|
|
- end
|
|
|
|
- rescue => e
|
|
|
|
- message = "#{current_series}: #{e.message}"
|
|
|
|
- end
|
|
|
|
- else
|
|
|
|
- message = "Bad format"
|
|
|
|
- end
|
|
|
|
- end
|
|
|
|
-
|
|
|
|
- [anomaly, message]
|
|
|
|
- end
|
|
|
|
-
|
|
|
|
- def self.anomaly?(series)
|
|
|
|
- series = series.reject { |v| v[0].nil? }.sort_by { |v| v[0] }
|
|
|
|
-
|
|
|
|
- csv_str =
|
|
|
|
- CSV.generate do |csv|
|
|
|
|
- csv << ["timestamp", "count"]
|
|
|
|
- series.each do |row|
|
|
|
|
- csv << row
|
|
|
|
- end
|
|
|
|
- end
|
|
|
|
-
|
|
|
|
- timestamps = []
|
|
|
|
- r_script = %x[which Rscript].chomp
|
|
|
|
- raise "R not found" if r_script.empty?
|
|
|
|
- output = %x[#{r_script} --vanilla #{File.expand_path("../blazer/detect_anomalies.R", __FILE__)} #{Shellwords.escape(csv_str)}]
|
|
|
|
- if output.empty?
|
|
|
|
- raise "Unknown R error"
|
|
|
|
- end
|
|
|
|
-
|
|
|
|
- rows = CSV.parse(output, headers: true)
|
|
|
|
- error = rows.first && rows.first["x"]
|
|
|
|
- raise error if error
|
|
|
|
-
|
|
|
|
- rows.each do |row|
|
|
|
|
- timestamps << Time.parse(row["timestamp"])
|
|
|
|
- end
|
|
|
|
- timestamps.include?(series.last[0].to_time)
|
|
|
|
- end
|
|
|
|
-
|
|
|
|
- def self.boom(columns, rows, data_source)
|
|
|
|
- boom = {}
|
|
|
|
- columns.each_with_index do |key, i|
|
|
|
|
- query = data_source.smart_columns[key]
|
|
|
|
- if query
|
|
|
|
- values = rows.map { |r| r[i] }.compact.uniq
|
|
|
|
- columns, rows2, error, cached_at = data_source.run_statement(ActiveRecord::Base.send(:sanitize_sql_array, [query.sub("{value}", "(?)"), values]))
|
|
|
|
- boom[key] = Hash[rows2.map { |k, v| [k.to_s, v] }]
|
|
|
|
- end
|
|
|
|
- end
|
|
|
|
- boom
|
|
|
|
- end
|
|
|
|
end
|
|
end
|