Browse Source

Added ability to cancel queries

Andrew Kane 7 years ago
parent
commit
a3d21b87f6

+ 29 - 0
app/assets/javascripts/blazer/application.js

@@ -25,15 +25,41 @@ $( function () {
   });
 });
 
+function uuid() {
+  return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) {
+    var r = Math.random()*16|0, v = c == 'x' ? r : (r&0x3|0x8);
+    return v.toString(16);
+  });
+}
+
 function cancelQuery(runningQuery) {
   runningQuery.canceled = true;
   var xhr = runningQuery.xhr;
   if (xhr) {
     xhr.abort();
   }
+  remoteCancelQuery(runningQuery);
   queryComplete();
 }
 
+function csrfProtect(payload) {
+  var param = $("meta[name=csrf-param]").attr("content");
+  var token = $("meta[name=csrf-token]").attr("content");
+  if (param && token) payload[param] = token;
+  return new Blob([JSON.stringify(payload)], {type : "application/json; charset=utf-8"});
+}
+
+function remoteCancelQuery(runningQuery) {
+  var path = window.cancelQueriesPath;
+  var data = {run_id: runningQuery.run_id, data_source: runningQuery.data_source};
+  if (navigator.sendBeacon) {
+    navigator.sendBeacon(path, csrfProtect(data));
+  } else {
+    // TODO make sync
+    $.post(path, data);
+  }
+}
+
 var queriesQueue = [];
 var runningQueries = 0;
 var maxQueries = 3;
@@ -61,6 +87,9 @@ function queryComplete() {
 
 function runQuery(data, success, error, runningQuery) {
   queueQuery( function () {
+    runningQuery = runningQuery || {};
+    runningQuery.run_id = data.run_id = uuid();
+    runningQuery.data_source = data.data_source;
     return runQueryHelper(data, success, error, runningQuery);
   });
 }

+ 11 - 2
app/controllers/blazer/queries_controller.rb

@@ -103,9 +103,9 @@ module Blazer
           continue_run
         end
       elsif @success
-        @run_id = Blazer.async ? SecureRandom.uuid : nil
+        @run_id = blazer_run_id
 
-        options = {user: blazer_user, query: @query, refresh_cache: params[:check], run_id: @run_id}
+        options = {user: blazer_user, query: @query, refresh_cache: params[:check], run_id: @run_id, async: Blazer.async}
         if Blazer.async && request.format.symbol != :csv
           result = []
           Blazer::RunStatementJob.perform_async(result, @data_source, @statement, options)
@@ -176,6 +176,11 @@ module Blazer
       @schema = Blazer.data_sources[params[:data_source]].schema
     end
 
+    def cancel
+      Blazer.data_sources[params[:data_source]].cancel(blazer_run_id)
+      render json: {}
+    end
+
     private
 
     def continue_run
@@ -307,5 +312,9 @@ module Blazer
       data_source.local_time_suffix.any? { |s| k.ends_with?(s) } ? v.to_s.sub(" UTC", "") : v.in_time_zone(Blazer.time_zone)
     end
     helper_method :blazer_time_value
+
+    def blazer_run_id
+      params[:run_id].to_s.gsub(/[^a-z0-9\-]/i, "")
+    end
   end
 end

+ 7 - 0
app/views/blazer/queries/_form.html.erb

@@ -131,6 +131,7 @@
 
 
   function queryDone() {
+    runningQuery = null
     $("#run").removeClass("hide")
     $("#cancel").addClass("hide")
   }
@@ -144,6 +145,12 @@
     $("#results").html("")
   })
 
+  $(window).unload(function() {
+    if (runningQuery) {
+      remoteCancelQuery(runningQuery)
+    }
+  })
+
   $("#run").click( function (e) {
     e.preventDefault();
 

+ 1 - 0
app/views/layouts/blazer/application.html.erb

@@ -9,6 +9,7 @@
     <%= javascript_include_tag "blazer/application" %>
     <script>
       var runQueriesPath = <%= run_queries_path.to_json.html_safe %>;
+      var cancelQueriesPath = <%= cancel_queries_path.to_json.html_safe %>;
     </script>
     <% if blazer_maps? %>
       <%= stylesheet_link_tag "https://api.mapbox.com/mapbox.js/v2.4.0/mapbox.css" %>

+ 1 - 0
config/routes.rb

@@ -1,6 +1,7 @@
 Blazer::Engine.routes.draw do
   resources :queries do
     post :run, on: :collection # err on the side of caution
+    post :cancel, on: :collection
     post :refresh, on: :member
     get :tables, on: :collection
     get :schema, on: :collection

+ 4 - 0
lib/blazer/adapters/base_adapter.rb

@@ -35,6 +35,10 @@ module Blazer
         # optional
       end
 
+      def cancel(run_id)
+        # optional
+      end
+
       protected
 
       def settings

+ 15 - 0
lib/blazer/adapters/sql_adapter.rb

@@ -72,8 +72,23 @@ module Blazer
         nil
       end
 
+      def cancel(run_id)
+        if postgresql?
+          execute("SELECT pg_cancel_backend(pid) FROM pg_stat_activity WHERE pid <> pg_backend_pid() AND query LIKE '%,run_id:#{run_id}%'")
+        elsif redshift?
+          first_row = connection_model.connection.select_all("SELECT pid FROM stv_recents WHERE status = 'Running' AND query LIKE '%,run_id:#{run_id}%'").first
+          if first_row
+            execute("CANCEL #{first_row["pid"].to_i}")
+          end
+        end
+      end
+
       protected
 
+      def execute(statement)
+        connection_model.connection.execute(statement)
+      end
+
       def postgresql?
         ["PostgreSQL", "PostGIS"].include?(adapter_name)
       end

+ 6 - 2
lib/blazer/data_source.rb

@@ -6,7 +6,7 @@ module Blazer
 
     attr_reader :id, :settings, :adapter, :adapter_instance
 
-    def_delegators :adapter_instance, :schema, :tables, :preview_statement, :reconnect, :cost, :explain
+    def_delegators :adapter_instance, :schema, :tables, :preview_statement, :reconnect, :cost, :explain, :cancel
 
     def initialize(id, settings)
       @id = id
@@ -109,6 +109,7 @@ module Blazer
 
     def run_statement(statement, options = {})
       run_id = options[:run_id]
+      async = options[:async]
       result = nil
       if cache_mode != "off" && !options[:refresh_cache]
         result = read_cache(statement_cache_key(statement))
@@ -129,7 +130,10 @@ module Blazer
         if options[:check]
           comment << ",check_id:#{options[:check].id},check_emails:#{options[:check].emails}"
         end
-        result = run_statement_helper(statement, comment, options[:run_id])
+        if options[:run_id]
+          comment << ",run_id:#{options[:run_id]}"
+        end
+        result = run_statement_helper(statement, comment, async ? options[:run_id] : nil)
       end
 
       result