add bigquery adapter (#138)

* add bigquery adapter

* return empty headers when there are no results
Pedro Carmona, 7 years ago
parent commit b72b74b303
3 changed files with 90 additions and 0 deletions
  1. README.md (+14 -0)
  2. lib/blazer.rb (+2 -0)
  3. lib/blazer/adapters/bigquery_adapter.rb (+74 -0)

README.md (+14 -0)

@@ -392,6 +392,7 @@ data_sources:
 - [Redshift](#redshift)
 - [Presto](#presto)
 - [Apache Drill](#apache-drill)
+- [Google BigQuery](#google-bigquery)
 - [MongoDB](#mongodb-1)
 - [Elasticsearch](#elasticsearch) [beta]
 
@@ -484,6 +485,19 @@ data_sources:
     url: http://hostname:8047
 ```
 
+### Google BigQuery
+
+Add [google-cloud-bigquery](https://github.com/GoogleCloudPlatform/google-cloud-ruby/tree/master/google-cloud-bigquery) to your Gemfile and set:
+
+```yml
+data_sources:
+  my_source:
+    adapter: bigquery
+    url: "" # required for Blazer
+    project: "your_project"
+    keyfile: "json credentials file"
+```
+
 ### MongoDB
 
 Add [mongo](https://github.com/mongodb/mongo-ruby-driver) to your Gemfile and set:
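
The `project` and `keyfile` settings map directly onto the constructor parameters the adapter passes to the google-cloud-bigquery gem (see `connect!` in the adapter below). To verify credentials outside Blazer, you can open the same connection by hand; a minimal sketch, with placeholder project and keyfile values:

```ruby
require "google/cloud/bigquery"

# Same parameters the adapter forwards in connect!; replace the
# placeholders with your own project ID and service-account JSON file.
bigquery = Google::Cloud::Bigquery.new(
  project: "your_project",
  keyfile: "credentials.json"
)

# query returns row hashes keyed by column name, which is what the
# adapter relies on when it builds columns and rows.
results = bigquery.query("SELECT 1 AS one")
results.each { |row| p row } # => {:one=>1}
```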

lib/blazer.rb (+2 -0)

@@ -7,6 +7,7 @@ require "blazer/data_source"
 require "blazer/result"
 require "blazer/run_statement"
 require "blazer/adapters/base_adapter"
+require "blazer/adapters/bigquery_adapter"
 require "blazer/adapters/drill_adapter"
 require "blazer/adapters/elasticsearch_adapter"
 require "blazer/adapters/mongodb_adapter"
@@ -160,6 +161,7 @@ module Blazer
 end
 
 Blazer.register_adapter "drill", Blazer::Adapters::DrillAdapter
+Blazer.register_adapter "bigquery", Blazer::Adapters::BigQueryAdapter
 Blazer.register_adapter "elasticsearch", Blazer::Adapters::ElasticsearchAdapter
 Blazer.register_adapter "mongodb", Blazer::Adapters::MongodbAdapter
 Blazer.register_adapter "presto", Blazer::Adapters::PrestoAdapter
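
`register_adapter` is what connects the `adapter: bigquery` key in `blazer.yml` to the class added in this commit. The registry itself is not part of this diff; a plain hash lookup is a reasonable mental model, sketched here under that assumption:

```ruby
module Blazer
  # Hypothetical sketch of the registry; the real implementation lives
  # elsewhere in lib/blazer.rb and is not shown in this diff.
  def self.adapters
    @adapters ||= {}
  end

  def self.register_adapter(name, adapter)
    adapters[name] = adapter
  end
end

Blazer.adapters["bigquery"] # => Blazer::Adapters::BigQueryAdapter
```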

lib/blazer/adapters/bigquery_adapter.rb (+74 -0)

@@ -0,0 +1,74 @@
+module Blazer
+  module Adapters
+    class BigQueryAdapter < BaseAdapter
+      def run_statement(statement, comment)
+        columns = []
+        rows = []
+        error = nil
+        begin
+          results = bigquery.query(statement, timeout: timeout_ms)
+          columns = results.first.keys.map(&:to_s) if results.size > 0
+          rows = results.map(&:values)
+        rescue StandardError => e
+          error = e.message
+        end
+        [columns, rows, error]
+      end
+
+      def tables
+        table_refs.map { |t| "#{t.project_id}.#{t.dataset_id}.#{t.table_id}" }
+      end
+
+      def schema
+        table_refs.map { |table_ref|
+          {
+            schema: table_ref.dataset_id,
+            table: table_ref.table_id,
+            columns: table_columns(table_ref)
+          }
+        }
+      end
+
+      def preview_statement
+        "SELECT * FROM `{table}` LIMIT 10"
+      end
+
+      private
+
+      def bigquery
+        @bigquery ||= connect!
+      end
+
+      def connect!
+        require "google/cloud/bigquery"
+        params = { project: settings["project"], keyfile: settings["keyfile"] }
+        @bigquery = Google::Cloud::Bigquery.new(params)
+        ::Google::Apis.logger.level = Logger::INFO
+        @bigquery
+      end
+
+      def table_refs
+        bigquery
+          .datasets
+          .map(&:tables)
+          .flat_map { |table_list| table_list.map(&:table_ref) }
+      end
+
+      def table_columns(table_ref)
+        schema =
+          bigquery
+          .service
+          .get_table(table_ref.dataset_id, table_ref.table_id)
+          .schema
+        return [] if schema.nil?
+        schema
+          .fields
+          .map { |field| { name: field.name, data_type: field.type } }
+      end
+
+      def timeout_ms
+        30 * 1000 # 30 seconds
+      end
+    end
+  end
+end
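
To smoke-test the adapter in isolation (for example from a Rails console where Blazer is loaded), all it needs from its data source is a `settings` hash. A hypothetical check, assuming `BaseAdapter#initialize` takes the data source and exposes its `settings`, as the `settings["project"]` call in `connect!` suggests:

```ruby
# Stand-in for Blazer's data source object; the adapter only reads
# settings["project"] and settings["keyfile"] from it.
StubSource = Struct.new(:settings)
source = StubSource.new(
  "project" => "your_project",    # placeholder project ID
  "keyfile" => "credentials.json" # placeholder service-account file
)

adapter = Blazer::Adapters::BigQueryAdapter.new(source)

# run_statement returns [columns, rows, error]: on success error is nil;
# on failure columns and rows stay empty and error holds the message.
columns, rows, error = adapter.run_statement("SELECT 1 AS one", nil)
# columns => ["one"], rows => [[1]]
```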