blazer.rb 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. require "csv"
  2. require "yaml"
  3. require "chartkick"
  4. require "blazer/version"
  5. require "blazer/data_source"
  6. require "blazer/engine"
  7. require "safely/core"
  8. module Blazer
  # Base class for Blazer's own errors.
  class Error < StandardError; end

  # Error subclass declared here; nothing in this file raises it.
  class TimeoutNotSupported < Error; end

  # Module-level configuration. time_zone only gets a reader here because
  # its writer is defined separately below.
  class << self
    attr_reader :time_zone
    attr_accessor :audit, :user_name, :user_class, :user_method,
      :before_action, :from_email, :cache, :transform_statement,
      :check_schedules, :anomaly_checks, :async
  end

  # Defaults for the settings above.
  self.audit = true
  self.user_name = :name
  self.check_schedules = ["5 minutes", "1 hour", "1 day"]
  self.anomaly_checks = false
  self.async = false

  # Error string that run_check compares against to decide whether to retry.
  TIMEOUT_MESSAGE = "Query timed out :("

  # Database error messages, annotated with the adapter they come from.
  TIMEOUT_ERRORS = [
    "canceling statement due to statement timeout", # postgres
    "canceling statement due to conflict with recovery", # postgres
    "cancelled on user's request", # redshift
    "canceled on user's request", # redshift
    "system requested abort", # redshift
    "maximum statement execution time exceeded" # mysql
  ]

  # {optional: true} on Rails 5 and later, an empty hash on older versions.
  BELONGS_TO_OPTIONAL = Rails::VERSION::MAJOR >= 5 ? {optional: true} : {}
  # Stores the time zone. An ActiveSupport::TimeZone is kept as is; any other
  # value is converted with to_s and looked up via ActiveSupport::TimeZone[].
  def self.time_zone=(time_zone)
    @time_zone =
      if time_zone.is_a?(ActiveSupport::TimeZone)
        time_zone
      else
        ActiveSupport::TimeZone[time_zone.to_s]
      end
  end
  # Loads and memoizes the settings from config/blazer.yml under Rails.root.
  #
  # The file is rendered through ERB and then parsed as YAML. A missing file
  # yields an empty hash. A file that parses to nil/false (blank or
  # comment-only) also yields an empty hash, so the result is memoized and
  # callers such as data_sources can index it.
  #
  # NOTE(review): YAML.load is kept as in the original; the file is local
  # application configuration, not external input.
  #
  # @return [Object] the parsed settings (a Hash for a mapping document),
  #   or {} when there is nothing to load
  def self.settings
    @settings ||= begin
      path = Rails.root.join("config", "blazer.yml").to_s
      if File.exist?(path)
        # An empty YAML document parses to a falsy value; fall back to {}.
        YAML.load(ERB.new(File.read(path)).result) || {}
      else
        {}
      end
    end
  end
  # Builds and memoizes one Blazer::DataSource per entry under the
  # "data_sources" settings key, keyed by id. The first data source is the
  # hash's default, so looking up an unknown id returns it.
  def self.data_sources
    @data_sources ||= begin
      sources = {}
      settings["data_sources"].each do |id, source_settings|
        sources[id] = Blazer::DataSource.new(id, source_settings)
      end
      sources.default = sources.values.first
      sources
    end
  end
  # Runs every check (only those on the given schedule when one is passed),
  # skipping checks whose state is "disabled". Each run is wrapped in
  # Safely.safely.
  def self.run_checks(schedule: nil)
    all_checks = Blazer::Check.includes(:query)
    scope = schedule ? all_checks.where(schedule: schedule) : all_checks
    scope.find_each do |check|
      Safely.safely { run_check(check) } unless check.state == "disabled"
    end
  end
  # Runs one check's query and records the outcome on the check.
  #
  # The statement is run up to three times. It is retried when the error
  # equals Blazer::TIMEOUT_MESSAGE, or when the error starts with
  # "PG::ConnectionBad" (after reconnecting the data source). There is a
  # 10 second pause between attempts, but not after the final one. The whole
  # run is wrapped in a "run_check.blazer" notification whose payload is
  # filled with the statement, data source, resulting state, row count,
  # error and the number of attempts actually made.
  #
  # @param check [Object] the check to run; must respond to id, state, query
  #   and update_state
  # @return [Object] whatever ActiveSupport::Notifications.instrument returns
  def self.run_check(check)
    max_tries = 3
    rows = nil
    error = nil
    tries = 0

    ActiveSupport::Notifications.instrument("run_check.blazer", check_id: check.id, query_id: check.query.id, state_was: check.state) do |instrument|
      data_source = data_sources[check.query.data_source]
      statement = check.query.statement
      # The return value is ignored; presumably the callback changes the
      # statement in place - TODO confirm against the callback's contract.
      Blazer.transform_statement.call(data_source, statement) if Blazer.transform_statement

      columns = nil
      loop do
        tries += 1
        columns, rows, error, _cached_at = data_source.run_statement(statement, refresh_cache: true, check: check, query: check.query)

        if error == Blazer::TIMEOUT_MESSAGE
          Rails.logger.info "[blazer timeout] query=#{check.query.name}"
        elsif error.to_s.start_with?("PG::ConnectionBad")
          # Reconnect even on the final attempt so the data source is left
          # with a fresh connection.
          data_source.reconnect
          Rails.logger.info "[blazer reconnect] query=#{check.query.name}"
        else
          break
        end

        # Out of attempts: stop without a pointless sleep.
        break if tries >= max_tries
        sleep(10)
      end

      check.update_state(columns, rows, error, data_source)
      # TODO use proper logfmt
      Rails.logger.info "[blazer check] query=#{check.query.name} state=#{check.state} rows=#{rows.try(:size)} error=#{error}"

      instrument[:statement] = statement
      instrument[:data_source] = data_source
      instrument[:state] = check.state
      instrument[:rows] = rows.try(:size)
      instrument[:error] = error
      instrument[:tries] = tries
    end
  end
  # Groups every check whose state is "failing", "error", "timed out" or
  # "disabled" by recipient email and queues one failing-checks mail per
  # recipient.
  def self.send_failing_checks
    checks_by_email = {}
    states = ["failing", "error", "timed out", "disabled"]

    Blazer::Check.includes(:query).where(state: states).find_each do |check|
      check.split_emails.each do |address|
        checks_by_email[address] = [] unless checks_by_email.key?(address)
        checks_by_email[address] << check
      end
    end

    checks_by_email.each do |address, failing|
      Blazer::CheckMailer.failing_checks(address, failing).deliver_later
    end
  end
  # Infers a display type for each column from the first non-nil value found
  # in that column.
  #
  # @param columns [Array] column names
  # @param rows [Array<Array>] result rows, indexed by column position
  # @param boom [Hash] per-column lookups; a column with a truthy entry is
  #   always typed "string"
  # @return [Array<String, nil>] "string", "numeric" or "time" per column,
  #   or nil when the column has no non-nil value
  def self.column_types(columns, rows, boom = {})
    columns.each_with_index.map do |k, i|
      if boom[k]
        "string"
      else
        # Test for nil explicitly so a false value still counts as data.
        sample_row = rows.find { |r| !r[i].nil? }
        v = sample_row && sample_row[i]

        if v.nil?
          nil
        elsif v.is_a?(Numeric)
          "numeric"
        elsif v.is_a?(Time) || v.is_a?(Date)
          "time"
        else
          "string"
        end
      end
    end
  end
  # Picks a chart type from the column types:
  #   "line"  - ignoring nil entries, "time" followed by only "numeric" columns
  #   "line2" - exactly ["time", "string", "numeric"]
  #   "bar"   - "string" followed by only "numeric" columns, with no nil entries
  # Returns nil when none of these shapes match.
  def self.chart_type(column_types)
    known = column_types.compact

    if known.size >= 2
      numerics = Array.new(known.size - 1) { "numeric" }
      return "line" if known == ["time", *numerics]
    end

    return "line2" if column_types == ["time", "string", "numeric"]

    # Compared against the uncompacted list, so any nil entry rules out "bar".
    return "bar" if known.size >= 2 && column_types == ["string", *numerics]

    nil
  end
  # Checks a query result for anomalies, one series at a time.
  #
  # The result must chart as "line" (first column is the x value, every other
  # column is its own series) or "line2" (first column is the x value, second
  # column names the series, third column is the value). Anything else is
  # reported as "Bad format"; an empty result is reported as "No data".
  #
  # An exception raised while checking a series is turned into a message
  # prefixed with that series' name, and the anomaly flag stays nil.
  #
  # @param columns [Array] column names
  # @param rows [Array<Array>] result rows
  # @param data_source [Object] passed through to boom to resolve lookups
  # @return [Array] [anomaly, message]; anomaly is true, false, or nil when
  #   no verdict could be reached
  def self.detect_anomaly(columns, rows, data_source)
    return [nil, "No data"] if rows.empty?

    lookups = boom(columns, rows, data_source)
    kind = chart_type(column_types(columns, rows, lookups))
    return [nil, "Bad format"] unless kind == "line" || kind == "line2"

    series =
      if kind == "line"
        columns[1..-1].each_with_index.map do |column, offset|
          {name: column, data: rows.map { |r| [r[0], r[offset + 1]] }}
        end
      else
        # Prefer the looked-up label for the grouping column, else the raw value.
        labels = lookups[columns[1]] || {}
        rows.group_by { |r| labels[r[1].to_s] || r[1] }.map do |name, group|
          {name: name, data: group.map { |r| [r[0], r[2]] }}
        end
      end

    anomaly = nil
    message = nil
    current_series = nil
    begin
      anomalies = []
      series.each do |s|
        current_series = s[:name]
        anomalies << s[:name] if anomaly?(s[:data])
      end

      # Check emptiness rather than any?: a series name can be nil (for
      # example a NULL group), and any? would ignore it.
      anomaly = !anomalies.empty?
      message =
        if !anomaly
          "No anomalies detected"
        elsif anomalies.size == 1
          "Anomaly detected in #{anomalies.first}"
        else
          "Anomalies detected in #{anomalies.to_sentence}"
        end
    rescue => e
      message = "#{current_series}: #{e.message}"
    end

    [anomaly, message]
  end
  # Reports whether the most recent point of a series is flagged by the
  # detect_anomalies.R script.
  #
  # Points with a nil timestamp are dropped and the rest sorted by timestamp.
  # The series is passed to Rscript as a CSV string ("timestamp,count"
  # header) on the command line. The output is parsed as CSV with headers:
  # a value in an "x" column of the first row is raised as an error,
  # otherwise each row's "timestamp" is parsed and compared with the
  # timestamp of the last point.
  #
  # @param series [Array<Array>] [timestamp, value] pairs
  # @return [Boolean] true when the last point's timestamp is in the output
  # @raise [RuntimeError] "R not found" when Rscript is not on the PATH,
  #   "Unknown R error" when the script prints nothing, or the message the
  #   script reported
  def self.anomaly?(series)
    series = series.reject { |v| v[0].nil? }.sort_by { |v| v[0] }

    csv_str =
      CSV.generate do |csv|
        csv << ["timestamp", "count"]
        series.each { |row| csv << row }
      end

    r_script = %x[which Rscript].chomp
    raise "R not found" if r_script.empty?

    script_path = File.expand_path("../blazer/detect_anomalies.R", __FILE__)
    # Escape every part, not only the CSV payload: either path may contain
    # spaces or shell metacharacters.
    command = [r_script, "--vanilla", script_path, csv_str].map { |part| Shellwords.escape(part) }.join(" ")
    output = %x[#{command}]
    raise "Unknown R error" if output.empty?

    rows = CSV.parse(output, headers: true)
    error = rows.first && rows.first["x"]
    raise error if error

    timestamps = rows.map { |row| Time.parse(row["timestamp"]) }
    timestamps.include?(series.last[0].to_time)
  end
  # Resolves lookup values for every column that has an entry in the data
  # source's smart_columns.
  #
  # For each such column, the distinct non-nil values found in the rows are
  # bound into that entry's query (its "{value}" placeholder becomes "(?)"),
  # the query is run, and each result row's first two values are stored as
  # first.to_s => second.
  #
  # Returns a hash of column name => lookup hash, holding only the columns
  # that have a smart_columns entry.
  def self.boom(columns, rows, data_source)
    columns.each_with_index.each_with_object({}) do |(key, index), lookups|
      template = data_source.smart_columns[key]
      next unless template

      distinct = rows.map { |row| row[index] }.compact.uniq
      statement = ActiveRecord::Base.send(:sanitize_sql_array, [template.sub("{value}", "(?)"), distinct])
      _, pairs = data_source.run_statement(statement)
      lookups[key] = pairs.each_with_object({}) { |(id, label), map| map[id.to_s] = label }
    end
  end
  225. end