# frozen_string_literal: true

# bigquery_adapter.rb
  1. module Blazer
  2. module Adapters
  3. class BigQueryAdapter < BaseAdapter
  4. def run_statement(statement, comment)
  5. columns = []
  6. rows = []
  7. error = nil
  8. begin
  9. options = {}
  10. options[:timeout] = data_source.timeout.to_i * 1000 if data_source.timeout
  11. results = bigquery.query(statement, options) # ms
  12. if results.complete?
  13. columns = results.first.keys.map(&:to_s) if results.size > 0
  14. rows = results.map(&:values)
  15. else
  16. error = Blazer::TIMEOUT_MESSAGE
  17. end
  18. rescue => e
  19. error = e.message
  20. end
  21. [columns, rows, error]
  22. end
  23. def tables
  24. table_refs.map { |t| "#{t.project_id}.#{t.dataset_id}.#{t.table_id}" }
  25. end
  26. def schema
  27. table_refs.map do |table_ref|
  28. {
  29. schema: table_ref.dataset_id,
  30. table: table_ref.table_id,
  31. columns: table_columns(table_ref)
  32. }
  33. end
  34. end
  35. def preview_statement
  36. "SELECT * FROM `{table}` LIMIT 10"
  37. end
  38. private
  39. def bigquery
  40. @bigquery ||= begin
  41. require "google/cloud/bigquery"
  42. Google::Cloud::Bigquery.new(
  43. project: settings["project"],
  44. keyfile: settings["keyfile"]
  45. )
  46. end
  47. end
  48. def table_refs
  49. bigquery.datasets.map(&:tables).flat_map { |table_list| table_list.map(&:table_ref) }
  50. end
  51. def table_columns(table_ref)
  52. schema = bigquery.service.get_table(table_ref.dataset_id, table_ref.table_id).schema
  53. return [] if schema.nil?
  54. schema.fields.map { |field| {name: field.name, data_type: field.type} }
  55. end
  56. end
  57. end
  58. end