Explorar o código

Added async option for polling

Andrew Kane %!s(int64=8) %!d(string=hai) anos
pai
achega
eb565523e8

+ 1 - 0
CHANGELOG.md

@@ -2,6 +2,7 @@
 
 - Added new bar chart format
 - Added anomaly detection checks
+- Added `async` option for polling
 
 ## 1.4.0
 

+ 11 - 1
app/assets/javascripts/blazer/application.js

@@ -31,7 +31,17 @@ function runQuery(data, success, error) {
     method: "POST",
     data: data,
     dataType: "html"
-  }).done(success).fail( function(jqXHR, textStatus, errorThrown) {
+  }).done( function (d) {
+    if (d[0] == "{") {
+      var response = $.parseJSON(d);
+      data.blazer = response;
+      setTimeout( function () {
+        runQuery(data, success, error);
+      }, 1000);
+    } else {
+      success(d);
+    }
+  }).fail( function(jqXHR, textStatus, errorThrown) {
     var message = (typeof errorThrown === "string") ? errorThrown : errorThrown.message;
     error(message);
   });

+ 1 - 1
app/controllers/blazer/base_controller.rb

@@ -51,7 +51,7 @@ module Blazer
     helper_method :extract_vars
 
     def variable_params
-      params.except(:controller, :action, :id, :host, :query, :dashboard, :query_id, :query_ids, :table_names, :authenticity_token, :utf8, :_method, :commit, :statement, :data_source, :name, :fork_query_id).permit!
+      params.except(:controller, :action, :id, :host, :query, :dashboard, :query_id, :query_ids, :table_names, :authenticity_token, :utf8, :_method, :commit, :statement, :data_source, :name, :fork_query_id, :blazer).permit!
     end
     helper_method :variable_params
 

+ 67 - 17
app/controllers/blazer/queries_controller.rb

@@ -71,25 +71,58 @@ module Blazer
       data_source = params[:data_source]
       process_vars(@statement, data_source)
       @only_chart = params[:only_chart]
-
-      if @success
-        @query = Query.find_by(id: params[:query_id]) if params[:query_id]
-
-        data_source = @query.data_source if @query && @query.data_source
-        @data_source = Blazer.data_sources[data_source]
-
-        @columns, @rows, @error, @cached_at, @just_cached = @data_source.run_main_statement(@statement, user: blazer_user, query: @query, refresh_cache: params[:check])
-
-        render_run
-      end
-
-      respond_to do |format|
-        format.html do
-          render layout: false
+      @run_id = blazer_params[:run_id]
+      @query = Query.find_by(id: params[:query_id]) if params[:query_id]
+      data_source = @query.data_source if @query && @query.data_source
+      @data_source = Blazer.data_sources[data_source]
+
+      if @run_id
+        @timestamp = blazer_params[:timestamp].to_i
+
+        @columns, @rows, @error, @cached_at = @data_source.run_results(@run_id)
+        @success = !@rows.nil?
+
+        if @success
+          @data_source.delete_results(@run_id)
+          @just_cached = !@error && @cached_at.present?
+          @cached_at = nil
+          params[:data_source] = nil
+          render_run
+        elsif Time.now > Time.at(@timestamp + (@data_source.timeout || 120).to_i)
+          # timed out
+          @error = Blazer::TIMEOUT_MESSAGE
+          @rows = []
+          @columns = []
+          render_run
+        else
+          continue_run
         end
-        format.csv do
-          send_data csv_data(@columns, @rows, @data_source), type: "text/csv; charset=utf-8; header=present", disposition: "attachment; filename=\"#{@query.try(:name).try(:parameterize).presence || 'query'}.csv\""
+      elsif @success
+        @run_id = Blazer.async ? SecureRandom.uuid : nil
+
+        options = {user: blazer_user, query: @query, refresh_cache: params[:check], run_id: @run_id}
+        result = []
+        if Blazer.async && request.format.symbol != :csv
+          Blazer::RunStatementJob.perform_async(result, @data_source, @statement, options)
+          wait_start = Time.now
+          loop do
+            sleep(0.02)
+            break if result.any? || Time.now - wait_start > 3
+          end
+        else
+          result = @data_source.run_main_statement(@statement, options)
+        end
+
+        if result.any?
+          @columns, @rows, @error, @cached_at, @just_cached = result
+          @data_source.delete_results(@run_id) if @run_id
+          render_run
+        else
+          @timestamp = Time.now.to_i
+          continue_run
         end
+      else
+        render layout: false
       end
     end
 
@@ -129,6 +162,10 @@ module Blazer
 
     private
 
+    def continue_run
+      render json: {run_id: @run_id, timestamp: @timestamp}, status: :accepted
+    end
+
     def render_run
       @checks = @query ? @query.checks : []
 
@@ -173,6 +210,15 @@ module Blazer
             end
         end
       end
+
+      respond_to do |format|
+        format.html do
+          render layout: false
+        end
+        format.csv do
+          send_data csv_data(@columns, @rows, @data_source), type: "text/csv; charset=utf-8; header=present", disposition: "attachment; filename=\"#{@query.try(:name).try(:parameterize).presence || 'query'}.csv\""
+        end
+      end
     end
 
     def set_queries(limit = nil)
@@ -212,6 +258,10 @@ module Blazer
       params.require(:query).permit(:name, :description, :statement, :data_source)
     end
 
+    def blazer_params
+      params[:blazer] || {}
+    end
+
     def csv_data(columns, rows, data_source)
       CSV.generate do |csv|
         csv << columns

+ 2 - 0
lib/blazer.rb

@@ -22,11 +22,13 @@ module Blazer
     attr_accessor :transform_statement
     attr_accessor :check_schedules
     attr_accessor :anomaly_checks
+    attr_accessor :async
   end
   self.audit = true
   self.user_name = :name
   self.check_schedules = ["5 minutes", "1 hour", "1 day"]
   self.anomaly_checks = false
+  self.async = false
 
   TIMEOUT_MESSAGE = "Query timed out :("
   TIMEOUT_ERRORS = [

+ 44 - 13
lib/blazer/data_source.rb

@@ -133,16 +133,29 @@ module Blazer
       [columns, rows, error, cached_at, just_cached]
     end
 
+    def read_cache(cache_key)
+      value = Blazer.cache.read(cache_key)
+      Marshal.load(value) if value
+    end
+
+    def run_results(run_id)
+      read_cache(run_cache_key(run_id))
+    end
+
+    def delete_results(run_id)
+      Blazer.cache.delete(run_cache_key(run_id))
+    end
+
     def run_statement(statement, options = {})
       columns = nil
       rows = nil
       error = nil
       cached_at = nil
       just_cached = false
-      cache_key = self.cache_key(statement) if cache_mode != "off"
+      run_id = options[:run_id]
+      cache_key = statement_cache_key(statement) if cache_mode != "off"
       if cache_mode != "off" && !options[:refresh_cache]
-        value = Blazer.cache.read(cache_key)
-        columns, rows, cached_at = Marshal.load(value) if value
+        columns, rows, error, cached_at = read_cache(cache_key)
       end
 
       unless rows
@@ -160,7 +173,7 @@ module Blazer
         if options[:check]
           comment << ",check_id:#{options[:check].id},check_emails:#{options[:check].emails}"
         end
-        columns, rows, error, just_cached = run_statement_helper(statement, comment)
+        columns, rows, error, just_cached = run_statement_helper(statement, comment, options[:run_id])
       end
 
       output = [columns, rows, error, cached_at]
@@ -169,11 +182,19 @@ module Blazer
     end
 
     def clear_cache(statement)
-      Blazer.cache.delete(cache_key(statement))
+      Blazer.cache.delete(statement_cache_key(statement))
+    end
+
+    def cache_key(key)
+      (["blazer", "v4"] + key).join("/")
+    end
+
+    def statement_cache_key(statement)
+      cache_key(["statement", id, Digest::MD5.hexdigest(statement)])
     end
 
-    def cache_key(statement)
-      ["blazer", "v3", id, Digest::MD5.hexdigest(statement)].join("/")
+    def run_cache_key(run_id)
+      cache_key(["run", run_id])
     end
 
     def schemas
@@ -204,7 +225,7 @@ module Blazer
 
     protected
 
-    def run_statement_helper(statement, comment)
+    def run_statement_helper(statement, comment, run_id)
       columns = []
       rows = []
       error = nil
@@ -241,14 +262,24 @@ module Blazer
       end
 
       cache_data = nil
-      if !error && (cache_mode == "all" || (cache_mode == "slow" && duration >= cache_slow_threshold))
-        cache_data = Marshal.dump([columns, rows, Time.now]) rescue nil
-        if cache_data
-          Blazer.cache.write(cache_key(statement), cache_data, expires_in: cache_expires_in.to_f * 60)
+      cache = !error && (cache_mode == "all" || (cache_mode == "slow" && duration >= cache_slow_threshold))
+      if cache || run_id
+        cache_data = Marshal.dump([columns, rows, error, cache ? Time.now : nil]) rescue nil
+      end
+
+      if cache && cache_data
+        Blazer.cache.write(statement_cache_key(statement), cache_data, expires_in: cache_expires_in.to_f * 60)
+      end
+
+      if run_id
+        unless cache_data
+          error = "Error storing the results of this query :("
+          cache_data = Marshal.dump([[], [], error, nil])
         end
+        Blazer.cache.write(run_cache_key(run_id), cache_data, expires_in: 30.seconds)
       end
 
-      [columns, rows, error, !cache_data.nil?]
+      [columns, rows, error, cache && !cache_data.nil?]
     end
 
     def adapter_name

+ 4 - 0
lib/blazer/engine.rb

@@ -31,6 +31,10 @@ module Blazer
       Blazer.cache ||= Rails.cache
 
       Blazer.anomaly_checks = Blazer.settings["anomaly_checks"] || false
+      Blazer.async = Blazer.settings["async"] || false
+      if Blazer.async
+        require "blazer/run_statement_job"
+      end
     end
   end
 end

+ 16 - 0
lib/blazer/run_statement_job.rb

@@ -0,0 +1,16 @@
+require "sucker_punch"
+
+module Blazer
+  class RunStatementJob
+    include SuckerPunch::Job
+    workers 4
+
+    def perform(result, data_source, statement, options)
+      ActiveRecord::Base.connection_pool.with_connection do
+        data_source.connection_model.connection_pool.with_connection do
+          result.concat(data_source.run_main_statement(statement, options))
+        end
+      end
+    end
+  end
+end