Browse Source

Added Druid adapter

Andrew Kane 6 years ago
parent
commit
e4bfa6b42e

+ 1 - 0
CHANGELOG.md

@@ -1,6 +1,7 @@
 ## 1.8.1 [unreleased]
 
 - Added support for Amazon Athena
+- Added support for Druid
 - Fixed query cancellation
 
 ## 1.8.0

+ 14 - 0
README.md

@@ -404,6 +404,7 @@ data_sources:
 - [Apache Drill](#apache-drill)
 - [Google BigQuery](#google-bigquery)
 - [MongoDB](#mongodb-1)
+- [Druid](#druid) [master]
 - [Elasticsearch](#elasticsearch-beta) [beta]
 
 You can also [create an adapter](#creating-an-adapter) for any other data store.
@@ -529,6 +530,19 @@ data_sources:
     url: mongodb://user:password@hostname:27017/database
 ```
 
+### Druid
+
+First, [enable SQL support](http://druid.io/docs/latest/querying/sql.html) on the broker.
+
+Set:
+
+```yml
+data_sources:
+  my_source:
+    adapter: druid
+    url: http://hostname:8082
+```
+
 ### Elasticsearch [beta]
 
 Add [elasticsearch](https://github.com/elastic/elasticsearch-ruby) to your Gemfile and set:

+ 2 - 0
app/views/blazer/queries/run.html.erb

@@ -111,6 +111,8 @@
       <div class="results-container">
         <% if @columns == ["QUERY PLAN"] %>
           <pre><code><%= @rows.map { |r| r[0] }.join("\n") %></code></pre>
+        <% elsif @columns == ["PLAN"] && @data_source.adapter == "druid" %>
+          <pre><code><%= @rows[0][0] %></code></pre>
         <% else %>
           <table class="table results-table" style="margin-bottom: 0;">
             <thead>

+ 1 - 1
app/views/blazer/queries/show.html.erb

@@ -62,7 +62,7 @@
   </script>
 <% end %>
 
-<% if %w[sql presto drill bigquery athena].include?(Blazer.data_sources[@query.data_source].adapter) %>
+<% unless %w(mongodb elasticsearch).include?(Blazer.data_sources[@query.data_source].adapter) %>
   <script>
     // do not highlight really long queries
     // this can lead to performance issues

+ 5 - 3
lib/blazer.rb

@@ -10,6 +10,7 @@ require "blazer/adapters/base_adapter"
 require "blazer/adapters/athena_adapter"
 require "blazer/adapters/bigquery_adapter"
 require "blazer/adapters/drill_adapter"
+require "blazer/adapters/druid_adapter"
 require "blazer/adapters/elasticsearch_adapter"
 require "blazer/adapters/mongodb_adapter"
 require "blazer/adapters/presto_adapter"
@@ -174,10 +175,11 @@ module Blazer
   end
 end
 
-Blazer.register_adapter "drill", Blazer::Adapters::DrillAdapter
-Blazer.register_adapter "bigquery", Blazer::Adapters::BigQueryAdapter
 Blazer.register_adapter "athena", Blazer::Adapters::AthenaAdapter
+Blazer.register_adapter "bigquery", Blazer::Adapters::BigQueryAdapter
+Blazer.register_adapter "drill", Blazer::Adapters::DrillAdapter
+Blazer.register_adapter "druid", Blazer::Adapters::DruidAdapter
 Blazer.register_adapter "elasticsearch", Blazer::Adapters::ElasticsearchAdapter
-Blazer.register_adapter "mongodb", Blazer::Adapters::MongodbAdapter
 Blazer.register_adapter "presto", Blazer::Adapters::PrestoAdapter
+Blazer.register_adapter "mongodb", Blazer::Adapters::MongodbAdapter
 Blazer.register_adapter "sql", Blazer::Adapters::SqlAdapter

+ 55 - 0
lib/blazer/adapters/druid_adapter.rb

@@ -0,0 +1,55 @@
+module Blazer
+  module Adapters
+    class DruidAdapter < BaseAdapter
+      def run_statement(statement, comment)
+        columns = []
+        rows = []
+        error = nil
+
+        header = {"Content-Type" => "application/json", "Accept" => "application/json"}
+        context = {}
+        if data_source.timeout
+          context = data_source.timeout.to_i * 1000
+        end
+        data = {
+          query: statement,
+          context: context
+        }
+
+        uri = URI.parse("#{settings["url"]}/druid/v2/sql/")
+        http = Net::HTTP.new(uri.host, uri.port)
+
+        begin
+          response = JSON.parse(http.post(uri.request_uri, data.to_json, header).body)
+          if response.is_a?(Hash)
+            error = response["errorMessage"]
+            if error.include?("timed out")
+              error = Blazer::TIMEOUT_MESSAGE
+            end
+          else
+            columns = response.first.keys || []
+            rows = response.map { |r| r.values }
+          end
+         rescue => e
+           error = e.message
+         end
+
+        [columns, rows, error]
+      end
+
+      def tables
+        result = data_source.run_statement("SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA NOT IN ('INFORMATION_SCHEMA') ORDER BY TABLE_NAME")
+        result.rows.map(&:first)
+      end
+
+      def schema
+        result = data_source.run_statement("SELECT TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, DATA_TYPE, ORDINAL_POSITION FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA NOT IN ('INFORMATION_SCHEMA') ORDER BY 1, 2")
+        result.rows.group_by { |r| [r[0], r[1]] }.map { |k, vs| {schema: k[0], table: k[1], columns: vs.sort_by { |v| v[2] }.map { |v| {name: v[2], data_type: v[3]} }} }
+      end
+
+      def preview_statement
+        "SELECT * FROM {table} LIMIT 10"
+      end
+    end
+  end
+end