Browse Source

Added support for Cassandra

Andrew Kane 6 years ago
parent
commit
1b1c7dc8c6
5 changed files with 74 additions and 1 deletions
  1. 1 0
      CHANGELOG.md
  2. 11 0
      README.md
  3. 2 0
      lib/blazer.rb
  4. 59 0
      lib/blazer/adapters/cassandra_adapter.rb
  5. 1 1
      lib/blazer/data_source.rb

+ 1 - 0
CHANGELOG.md

@@ -1,5 +1,6 @@
 ## 1.8.2 [unreleased]
 
+- Added support for Cassandra
 - Fixes for Druid
 
 ## 1.8.1

+ 11 - 0
README.md

@@ -404,6 +404,7 @@ data_sources:
 - [Apache Drill](#apache-drill)
 - [Google BigQuery](#google-bigquery)
 - [MongoDB](#mongodb-1)
+- [Cassandra](#cassandra) [master]
 - [Druid](#druid)
 - [Elasticsearch](#elasticsearch-beta) [beta]
 
@@ -530,6 +531,16 @@ data_sources:
     url: mongodb://user:password@hostname:27017/database
 ```
 
+### Cassandra
+
+Add [cassandra-driver](https://github.com/datastax/ruby-driver) to your Gemfile and set:
+
+```yml
+data_sources:
+  my_source:
+    url: cassandra://user:password@hostname:9042/keyspace
+```
+
 ### Druid
 
 First, [enable SQL support](http://druid.io/docs/latest/querying/sql.html#configuration) on the broker.

+ 2 - 0
lib/blazer.rb

@@ -9,6 +9,7 @@ require "blazer/run_statement"
 require "blazer/adapters/base_adapter"
 require "blazer/adapters/athena_adapter"
 require "blazer/adapters/bigquery_adapter"
+require "blazer/adapters/cassandra_adapter"
 require "blazer/adapters/drill_adapter"
 require "blazer/adapters/druid_adapter"
 require "blazer/adapters/elasticsearch_adapter"
@@ -177,6 +178,7 @@ end
 
 Blazer.register_adapter "athena", Blazer::Adapters::AthenaAdapter
 Blazer.register_adapter "bigquery", Blazer::Adapters::BigQueryAdapter
+Blazer.register_adapter "cassandra", Blazer::Adapters::CassandraAdapter
 Blazer.register_adapter "drill", Blazer::Adapters::DrillAdapter
 Blazer.register_adapter "druid", Blazer::Adapters::DruidAdapter
 Blazer.register_adapter "elasticsearch", Blazer::Adapters::ElasticsearchAdapter

+ 59 - 0
lib/blazer/adapters/cassandra_adapter.rb

@@ -0,0 +1,59 @@
+module Blazer
+  module Adapters
+    class CassandraAdapter < BaseAdapter
+      def run_statement(statement, comment)
+        columns = []
+        rows = []
+        error = nil
+
+        begin
+          response = session.execute("#{statement} /*#{comment}*/")
+          rows = response.map { |r| r.values }
+          columns = rows.any? ? response.first.keys : []
+        rescue => e
+          error = e.message
+        end
+
+        [columns, rows, error]
+      end
+
+      def tables
+        session.execute("SELECT table_name FROM system_schema.tables WHERE keyspace_name = '#{keyspace}'").map { |r| r["table_name"] }
+      end
+
+      def schema
+        result = session.execute("SELECT keyspace_name, table_name, column_name, type, position FROM system_schema.columns WHERE keyspace_name = '#{keyspace}'")
+        result.map(&:values).group_by { |r| [r[0], r[1]] }.map { |k, vs| {schema: k[0], table: k[1], columns: vs.sort_by { |v| v[2] }.map { |v| {name: v[2], data_type: v[3]} }} }
+      end
+
+      def preview_statement
+        "SELECT * FROM {table} LIMIT 10"
+      end
+
+      private
+
+      def cluster
+        @cluster ||= begin
+          require "cassandra"
+          options = {hosts: [uri.host]}
+          options[:port] = uri.port if uri.port
+          options[:username] = uri.user if uri.user
+          options[:password] = uri.password if uri.password
+          ::Cassandra.cluster(options)
+        end
+      end
+
+      def session
+        @session ||= cluster.connect(keyspace)
+      end
+
+      def uri
+        @uri ||= URI.parse(data_source.settings["url"])
+      end
+
+      def keyspace
+        @keyspace ||= uri.path[1..-1]
+      end
+    end
+  end
+end

+ 1 - 1
lib/blazer/data_source.rb

@@ -186,7 +186,7 @@ module Blazer
     def detect_adapter
       schema = settings["url"].to_s.split("://").first
       case schema
-      when "mongodb", "presto"
+      when "mongodb", "presto", "cassandra"
         schema
       else
         "sql"