| 
					
				 | 
			
			
				@@ -67,42 +67,46 @@ module Blazer 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     checks = Blazer::Check.includes(:query) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     checks = checks.where(schedule: schedule) if schedule 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     checks.find_each do |check| 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-      rows = nil 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-      error = nil 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-      tries = 1 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+      run_check(check) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    end 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  end 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-      ActiveSupport::Notifications.instrument("run_check.blazer", check_id: check.id, query_id: check.query.id, state_was: check.state) do |instrument| 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        # try 3 times on timeout errors 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        data_source = data_sources[check.query.data_source] 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        statement = check.query.statement 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        Blazer.transform_statement.call(data_source, statement) if Blazer.transform_statement 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+  def self.run_check(check) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    rows = nil 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    error = nil 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    tries = 1 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        while tries <= 3 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-          columns, rows, error, cached_at = data_source.run_statement(statement, refresh_cache: true) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-          if error == Blazer::TIMEOUT_MESSAGE 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            Rails.logger.info "[blazer timeout] query=#{check.query.name}" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            tries += 1 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            sleep(10) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-          elsif error.to_s.start_with?("PG::ConnectionBad") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            data_sources[check.query.data_source].reconnect 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            Rails.logger.info "[blazer reconnect] query=#{check.query.name}" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            tries += 1 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            sleep(10) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-          else 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            break 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-          end 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        end 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        check.update_state(rows, error) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        # TODO use proper logfmt 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        Rails.logger.info "[blazer check] query=#{check.query.name} state=#{check.state} rows=#{rows.try(:size)} error=#{error}" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    ActiveSupport::Notifications.instrument("run_check.blazer", check_id: check.id, query_id: check.query.id, state_was: check.state) do |instrument| 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+      # try 3 times on timeout errors 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+      data_source = data_sources[check.query.data_source] 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+      statement = check.query.statement 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+      Blazer.transform_statement.call(data_source, statement) if Blazer.transform_statement 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        instrument[:statement] = statement 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        instrument[:data_source] = data_source 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        instrument[:state] = check.state 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        instrument[:rows] = rows.try(:size) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        instrument[:error] = error 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        instrument[:tries] = tries 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+      while tries <= 3 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        columns, rows, error, cached_at = data_source.run_statement(statement, refresh_cache: true) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        if error == Blazer::TIMEOUT_MESSAGE 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+          Rails.logger.info "[blazer timeout] query=#{check.query.name}" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+          tries += 1 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+          sleep(10) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        elsif error.to_s.start_with?("PG::ConnectionBad") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+          data_sources[check.query.data_source].reconnect 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+          Rails.logger.info "[blazer reconnect] query=#{check.query.name}" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+          tries += 1 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+          sleep(10) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        else 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+          break 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        end 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				       end 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+      check.update_state(rows, error) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+      # TODO use proper logfmt 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+      Rails.logger.info "[blazer check] query=#{check.query.name} state=#{check.state} rows=#{rows.try(:size)} error=#{error}" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+      instrument[:statement] = statement 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+      instrument[:data_source] = data_source 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+      instrument[:state] = check.state 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+      instrument[:rows] = rows.try(:size) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+      instrument[:error] = error 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+      instrument[:tries] = tries 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     end 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				   end 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 |