bigquery_adapter.rb 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. module Blazer
  2. module Adapters
  3. class BigQueryAdapter < BaseAdapter
  4. def run_statement(statement, comment)
  5. columns = []
  6. rows = []
  7. error = nil
  8. begin
  9. results = bigquery.query(statement, timeout: 30000) # ms
  10. columns = results.first.keys.map(&:to_s) if results.size > 0
  11. rows = results.map(&:values)
  12. rescue => e
  13. error = e.message
  14. end
  15. [columns, rows, error]
  16. end
  17. def tables
  18. table_refs.map { |t| "#{t.project_id}.#{t.dataset_id}.#{t.table_id}" }
  19. end
  20. def schema
  21. table_refs.map do |table_ref|
  22. {
  23. schema: table_ref.dataset_id,
  24. table: table_ref.table_id,
  25. columns: table_columns(table_ref)
  26. }
  27. end
  28. end
  29. def preview_statement
  30. "SELECT * FROM `{table}` LIMIT 10"
  31. end
  32. private
  33. def bigquery
  34. @bigquery ||= begin
  35. require "google/cloud/bigquery"
  36. Google::Cloud::Bigquery.new(
  37. project: settings["project"],
  38. keyfile: settings["keyfile"]
  39. )
  40. end
  41. end
  42. def table_refs
  43. bigquery.datasets.map(&:tables).flat_map { |table_list| table_list.map(&:table_ref) }
  44. end
  45. def table_columns(table_ref)
  46. schema = bigquery.service.get_table(table_ref.dataset_id, table_ref.table_id).schema
  47. return [] if schema.nil?
  48. schema.fields.map { |field| {name: field.name, data_type: field.type} }
  49. end
  50. end
  51. end
  52. end