nexedi / olapy
Commit 12be070e, authored May 12, 2017 by mouadh
split execute into db, csv and config_file execute
Parent: 1bf75ab7
Showing 4 changed files with 246 additions and 227 deletions:

    olapy/core/mdx/executor/execute.py               +15  -227
    olapy/core/mdx/executor/execute_config_file.py  +132    -0
    olapy/core/mdx/executor/execute_csv_files.py     +47    -0
    olapy/core/mdx/executor/execute_db.py            +52    -0
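The shape of the refactor: each former MdxEngine method becomes a module-level function that takes the engine instance as an explicit first argument, and the class now delegates to it (tables = _load_tables_db(self), and so on). A minimal sketch of that delegation pattern, with hypothetical names; Engine and _load_tables_backend below are illustrations, not olapy's API:

    # A former method moved to module level; it now receives the instance explicitly.
    def _load_tables_backend(engine_instance):
        return {'facts': [1, 2, 3]}  # stand-in for the real table loading

    class Engine:
        def load_tables(self):
            # the class delegates, passing itself as the first argument
            return _load_tables_backend(self)

    print(Engine().load_tables())  # {'facts': [1, 2, 3]}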
olapy/core/mdx/executor/execute.py
@@ -10,7 +10,11 @@ from os.path import expanduser

 import numpy as np
 import pandas as pd
 import pandas.io.sql as psql
+from .execute_csv_files import _load_tables_csv_files, _construct_star_schema_csv_files
+from .execute_db import _load_tables_db, _construct_star_schema_db
+from .execute_config_file import _load_table_config_file
+from .execute_config_file import _construct_web_star_schema_config_file, _construct_star_schema_config_file
 from ..tools.config_file_parser import ConfigParser
 from ..tools.connection import MyDB
@@ -120,75 +124,6 @@ class MdxEngine:
         """
         return self.tables_loaded.keys()

-    def _load_table_config_file(self, cube_obj):
-        """
-        Load tables from config file.
-
-        :param cube_obj: cubes object
-        :return: tables dict with table name as key and DataFrame as value
-        """
-        tables = {}
-        # just one facts table right now
-        self.facts = cube_obj.facts[0].table_name
-        db = MyDB(db=self.cube)
-        for table in cube_obj.dimensions:
-            value = psql.read_sql_query(
-                "SELECT * FROM {0}".format(table.name), db.connection)
-            tables[table.name] = value[[
-                col for col in value.columns if col.lower()[-3:] != '_id']]
-        # update table display name
-        for dimension in cube_obj.dimensions:
-            if dimension.displayName and dimension.name and dimension.displayName != dimension.name:
-                tables[dimension.displayName] = tables[dimension.name][dimension.columns]
-                MdxEngine.dimension_display_name.append(dimension.name)
-        return tables
-
-    def _load_tables_csv_files(self):
-        """
-        Load tables from csv files.
-
-        :return: tables dict with table name as key and dataframe as value
-        """
-        tables = {}
-        cube = self.get_cube()
-        for file in os.listdir(cube):
-            # to remove file extension ".csv"
-            table_name = os.path.splitext(file)[0]
-            value = pd.read_csv(os.path.join(cube, file), sep=self.sep)
-            tables[table_name] = value[[
-                col for col in value.columns if col.lower()[-3:] != '_id']]
-        return tables
-
-    def _load_tables_db(self):
-        """
-        Load tables from database.
-
-        :return: tables dict with table name as key and dataframe as value
-        """
-        tables = {}
-        db = MyDB(db=self.cube)
-        cursor = db.connection.cursor()
-        cursor.execute("""SELECT table_name FROM information_schema.tables
-                          WHERE table_schema = 'public'""")
-        for table_name in cursor.fetchall():
-            value = psql.read_sql_query(
-                'SELECT * FROM "{0}" '.format(table_name[0]), db.connection)
-            tables[table_name[0]] = value[[
-                col for col in value.columns if col.lower()[-3:] != '_id']]
-        return tables
-
     def load_tables(self):
         """
         Load all tables { Table name : DataFrame } of the current cube instance.
@@ -203,13 +138,13 @@ class MdxEngine:
         # TODO working with cubes.source == 'csv'
         if cubes.source == 'postgres':
-            tables = self._load_table_config_file(cubes)
+            tables = _load_table_config_file(self, cubes)
         elif self.cube in self.csv_files_cubes:
-            tables = self._load_tables_csv_files()
+            tables = _load_tables_csv_files(self)
         elif self.cube in self.postgres_db_cubes:
-            tables = self._load_tables_db()
+            tables = _load_tables_db(self)
         return tables
@@ -222,152 +157,6 @@ class MdxEngine:
             include=[np.number]).columns if col.lower()[-2:] != 'id'
         ]

-    def _construct_star_schema_config_file(self, cube_name, cubes_obj):
-        """
-        Construct star schema DataFrame from configuration file.
-
-        :param cube_name: cube name (or database name)
-        :param cubes_obj: cubes object
-        :return: star schema DataFrame
-        """
-        self.facts = cubes_obj.facts[0].table_name
-        db = MyDB(db=cube_name)
-        # load facts table
-        fusion = psql.read_sql_query(
-            "SELECT * FROM {0}".format(self.facts), db.connection)
-        for fact_key, dimension_and_key in cubes_obj.facts[0].keys.items():
-            df = psql.read_sql_query(
-                "SELECT * FROM {0}".format(dimension_and_key.split('.')[0]),
-                db.connection)
-            fusion = fusion.merge(
-                df, left_on=fact_key, right_on=dimension_and_key.split('.')[1])
-            # TODO CHOSE BETWEEN THOSES DF
-            # if separated dimensions
-            # fusion = fusion.merge(df, left_on=fact_key,right_on=dimension_and_key.split('.')[1])
-            # TODO CHOSE BETWEEN THOSES DF
-            # if facts contains all dimensions
-            # fusion = facts
-        # measures in config-file only
-        if cubes_obj.facts[0].measures:
-            self.measures = cubes_obj.facts[0].measures
-        return fusion
-
-    def _construct_web_star_schema_config_file(self, cube_name, cubes_obj):
-        """
-        Construct star schema DataFrame from configuration file.
-
-        :param cube_name: cube name (or database name)
-        :param cubes_obj: cubes object
-        :return: star schema DataFrame
-        """
-        all_columns = []
-        self.facts = cubes_obj.facts[0].table_name
-        db = MyDB(db=cube_name)
-        # load facts table
-        # measures in config-file only
-        if cubes_obj.facts[0].measures:
-            self.measures = cubes_obj.facts[0].measures
-            all_columns += cubes_obj.facts[0].measures
-        fusion = psql.read_sql_query(
-            "SELECT * FROM {0}".format(self.facts), db.connection)
-        tables = {}
-        for table in cubes_obj.tables:
-            tab = psql.read_sql_query(
-                "SELECT * FROM {0}".format(table.name), db.connection)
-            try:
-                if table.columns:
-                    tab = tab[table.columns]
-            except:
-                print("table columns doesn't exist")
-                print('pass with all columns')
-            try:
-                if table.new_names:
-                    tab = tab.rename(columns=table.new_names)
-            except:
-                print("verify your old and new columns names")
-                print('pass with no change')
-            all_columns += list(tab.columns)
-            tables.update({table.name: tab})
-        for fact_key, dimension_and_key in cubes_obj.facts[0].keys.items():
-            dimension_name = dimension_and_key.split('.')[0]
-            if dimension_name in tables.keys():
-                df = tables[dimension_name]
-            else:
-                df = psql.read_sql_query(
-                    "SELECT * FROM {0}".format(dimension_and_key.split('.')[0]),
-                    db.connection)
-            fusion = fusion.merge(
-                df, left_on=fact_key, right_on=dimension_and_key.split('.')[1])
-        return fusion[[column for column in all_columns if 'id' != column[-2:]]]
-
-    def _construct_star_schema_csv_files(self, cube_name):
-        """
-        Construct star schema DataFrame from csv files.
-
-        :param cube_name: cube name (folder name)
-        :return: star schema DataFrame
-        """
-        cube = self.get_cube()
-        # loading facts table
-        fusion = pd.read_csv(
-            os.path.join(cube, self.facts + '.csv'), sep=self.sep)
-        for file_name in os.listdir(cube):
-            try:
-                fusion = fusion.merge(
-                    pd.read_csv(os.path.join(cube, file_name), sep=self.sep))
-            except:
-                print('No common column')
-                pass
-        return fusion
-
-    def _construct_star_schema_db(self, cube_name):
-        """
-        Construct star schema DataFrame from database.
-
-        :param cube_name: cube name (database name)
-        :return: star schema DataFrame
-        """
-        db = MyDB(db=cube_name)
-        # load facts table
-        fusion = psql.read_sql_query(
-            'SELECT * FROM "{0}" '.format(self.facts), db.connection)
-        cursor = db.connection.cursor()
-        cursor.execute("""SELECT table_name FROM information_schema.tables
-                          WHERE table_schema = 'public'""")
-        for db_table_name in cursor.fetchall():
-            try:
-                fusion = fusion.merge(
-                    psql.read_sql_query(
-                        "SELECT * FROM {0}".format(db_table_name[0]),
-                        db.connection))
-            except:
-                print('No common column')
-                pass
-        return fusion
-
     def get_star_schema_dataframe(self, client_type='excel'):
         """
         Merge all DataFrames as star schema.
@@ -378,25 +167,24 @@ class MdxEngine:
         fusion = None
         config_file_parser = ConfigParser(self.cube_path)
         if config_file_parser.config_file_exist(client_type) and self.cube in config_file_parser.get_cubes_names():
             for cubes in config_file_parser.construct_cubes(client_type):
                 # TODO cubes.source == 'csv'
                 if cubes.source == 'postgres':
                     # TODO one config file (I will try to merge dimensions between them in web part)
                     if client_type == 'web':
-                        fusion = self._construct_web_star_schema_config_file(self.cube, cubes)
+                        fusion = _construct_web_star_schema_config_file(self, cubes)
                     else:
-                        fusion = self._construct_star_schema_config_file(self.cube, cubes)
+                        fusion = _construct_star_schema_config_file(self, cubes)
         elif self.cube in self.csv_files_cubes:
-            fusion = self._construct_star_schema_csv_files(self.cube)
+            fusion = _construct_star_schema_csv_files(self)
         elif self.cube in self.postgres_db_cubes:
-            fusion = self._construct_star_schema_db(self.cube)
+            fusion = _construct_star_schema_db(self)

         return fusion[[col for col in fusion.columns if col.lower()[-3:] != '_id'
olapy/core/mdx/executor/execute_config_file.py
new file, mode 100644
from ..tools.connection import MyDB
import pandas.io.sql as psql


def _load_table_config_file(executer_instance, cube_obj):
    """
    Load tables from config file.

    :param cube_obj: cubes object
    :return: tables dict with table name as key and DataFrame as value
    """
    tables = {}
    # just one facts table right now
    executer_instance.facts = cube_obj.facts[0].table_name
    db = MyDB(db=executer_instance.cube)
    for table in cube_obj.dimensions:
        value = psql.read_sql_query(
            "SELECT * FROM {0}".format(table.name), db.connection)
        tables[table.name] = value[[
            col for col in value.columns if col.lower()[-3:] != '_id']]
    # update table display name
    for dimension in cube_obj.dimensions:
        if dimension.displayName and dimension.name and dimension.displayName != dimension.name:
            tables[dimension.displayName] = tables[dimension.name][dimension.columns]
            executer_instance.dimension_display_name.append(dimension.name)
    return tables


def _construct_star_schema_config_file(executer_instance, cubes_obj):
    """
    Construct star schema DataFrame from configuration file.

    :param cube_name: cube name (or database name)
    :param cubes_obj: cubes object
    :return: star schema DataFrame
    """
    executer_instance.facts = cubes_obj.facts[0].table_name
    db = MyDB(db=executer_instance.cube)
    # load facts table
    fusion = psql.read_sql_query(
        "SELECT * FROM {0}".format(executer_instance.facts), db.connection)
    for fact_key, dimension_and_key in cubes_obj.facts[0].keys.items():
        df = psql.read_sql_query(
            "SELECT * FROM {0}".format(dimension_and_key.split('.')[0]),
            db.connection)
        fusion = fusion.merge(
            df, left_on=fact_key, right_on=dimension_and_key.split('.')[1])
        # TODO CHOSE BETWEEN THOSES DF
        # if separated dimensions
        # fusion = fusion.merge(df, left_on=fact_key,right_on=dimension_and_key.split('.')[1])
        # TODO CHOSE BETWEEN THOSES DF
        # if facts contains all dimensions
        # fusion = facts
    # measures in config-file only
    if cubes_obj.facts[0].measures:
        executer_instance.measures = cubes_obj.facts[0].measures
    return fusion


def _construct_web_star_schema_config_file(executer_instance, cubes_obj):
    """
    Construct star schema DataFrame from configuration file.

    :param cube_name: cube name (or database name)
    :param cubes_obj: cubes object
    :return: star schema DataFrame
    """
    all_columns = []
    executer_instance.facts = cubes_obj.facts[0].table_name
    db = MyDB(db=executer_instance.cube)
    # load facts table
    # measures in config-file only
    if cubes_obj.facts[0].measures:
        executer_instance.measures = cubes_obj.facts[0].measures
        all_columns += cubes_obj.facts[0].measures
    fusion = psql.read_sql_query(
        "SELECT * FROM {0}".format(executer_instance.facts), db.connection)
    tables = {}
    for table in cubes_obj.tables:
        tab = psql.read_sql_query(
            "SELECT * FROM {0}".format(table.name), db.connection)
        try:
            if table.columns:
                tab = tab[table.columns]
        except:
            print("table columns doesn't exist")
            print('pass with all columns')
        try:
            if table.new_names:
                tab = tab.rename(columns=table.new_names)
        except:
            print("verify your old and new columns names")
            print('pass with no change')
        all_columns += list(tab.columns)
        tables.update({table.name: tab})
    for fact_key, dimension_and_key in cubes_obj.facts[0].keys.items():
        dimension_name = dimension_and_key.split('.')[0]
        if dimension_name in tables.keys():
            df = tables[dimension_name]
        else:
            df = psql.read_sql_query(
                "SELECT * FROM {0}".format(dimension_and_key.split('.')[0]),
                db.connection)
        fusion = fusion.merge(
            df, left_on=fact_key, right_on=dimension_and_key.split('.')[1])
    return fusion[[column for column in all_columns if 'id' != column[-2:]]]
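What the merge loop in _construct_star_schema_config_file does, isolated from the database: every entry of cubes_obj.facts[0].keys maps a facts column to 'dimension_table.column', and the facts frame is joined to each dimension on that pair. A self-contained pandas sketch with made-up frames standing in for the psql.read_sql_query results:

    import pandas as pd

    # Made-up facts and dimension frames standing in for database tables.
    facts = pd.DataFrame({'product_id': [1, 2], 'amount': [10.0, 20.0]})
    tables = {'product': pd.DataFrame({'id': [1, 2], 'name': ['pen', 'book']})}

    # keys maps a facts column to 'dimension_table.column', as in the cube config file.
    keys = {'product_id': 'product.id'}

    fusion = facts
    for fact_key, dimension_and_key in keys.items():
        df = tables[dimension_and_key.split('.')[0]]
        fusion = fusion.merge(df, left_on=fact_key,
                              right_on=dimension_and_key.split('.')[1])

    print(fusion)  # each facts row now carries its product attributes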
olapy/core/mdx/executor/execute_csv_files.py
new file, mode 100644
import os

import pandas as pd


def _load_tables_csv_files(executer_instance):
    """
    Load tables from csv files.

    :return: tables dict with table name as key and dataframe as value
    """
    tables = {}
    cube = executer_instance.get_cube()
    for file in os.listdir(cube):
        # to remove file extension ".csv"
        table_name = os.path.splitext(file)[0]
        value = pd.read_csv(os.path.join(cube, file), sep=executer_instance.sep)
        tables[table_name] = value[[
            col for col in value.columns if col.lower()[-3:] != '_id']]
    return tables


def _construct_star_schema_csv_files(executer_instance):
    """
    Construct star schema DataFrame from csv files.

    :param cube_name: cube name (folder name)
    :return: star schema DataFrame
    """
    cube = executer_instance.get_cube()
    # loading facts table
    fusion = pd.read_csv(
        os.path.join(cube, executer_instance.facts + '.csv'),
        sep=executer_instance.sep)
    for file_name in os.listdir(cube):
        try:
            fusion = fusion.merge(
                pd.read_csv(os.path.join(cube, file_name),
                            sep=executer_instance.sep))
        except:
            print('No common column')
            pass
    return fusion
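Two idioms above are worth isolating: the column filter that drops '*_id' key columns after loading each csv, and the bare merge() call that joins on whatever columns two frames share and raises when they share none, which is what the except branch's 'No common column' message covers. A self-contained pandas sketch, using in-memory frames instead of csv files:

    import pandas as pd

    table = pd.DataFrame({'geography_id': [1], 'continent': ['Europe'], 'amount': [5]})

    # Same filter as _load_tables_csv_files: keep only columns not ending in '_id'.
    kept = table[[col for col in table.columns if col.lower()[-3:] != '_id']]
    print(list(kept.columns))  # ['continent', 'amount']

    # merge() with no 'on=' joins on all shared columns; with none it raises.
    facts = pd.DataFrame({'continent': ['Europe'], 'count': [3]})
    print(facts.merge(kept))  # joins on the shared 'continent' column
    try:
        facts.merge(pd.DataFrame({'unrelated': [0]}))
    except Exception:
        print('No common column')  # mirrors the executor's fallback message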
olapy/core/mdx/executor/execute_db.py
new file, mode 100644
from ..tools.connection import MyDB
import pandas.io.sql as psql


def _load_tables_db(executer_instance):
    """
    Load tables from database.

    :return: tables dict with table name as key and dataframe as value
    """
    tables = {}
    db = MyDB(db=executer_instance.cube)
    cursor = db.connection.cursor()
    cursor.execute("""SELECT table_name FROM information_schema.tables
                      WHERE table_schema = 'public'""")
    for table_name in cursor.fetchall():
        value = psql.read_sql_query(
            'SELECT * FROM "{0}" '.format(table_name[0]), db.connection)
        tables[table_name[0]] = value[[
            col for col in value.columns if col.lower()[-3:] != '_id']]
    return tables


def _construct_star_schema_db(executer_instance):
    """
    Construct star schema DataFrame from database.

    :param cube_name: cube name (database name)
    :return: star schema DataFrame
    """
    db = MyDB(db=executer_instance.cube)
    # load facts table
    fusion = psql.read_sql_query(
        'SELECT * FROM "{0}" '.format(executer_instance.facts), db.connection)
    cursor = db.connection.cursor()
    cursor.execute("""SELECT table_name FROM information_schema.tables
                      WHERE table_schema = 'public'""")
    for db_table_name in cursor.fetchall():
        try:
            fusion = fusion.merge(
                psql.read_sql_query(
                    "SELECT * FROM {0}".format(db_table_name[0]),
                    db.connection))
        except:
            print('No common column')
            pass
    return fusion
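In both functions above, cursor.fetchall() returns one tuple per row, which is why the code indexes table_name[0] and db_table_name[0]. A runnable analogue of the catalog query, using sqlite3's sqlite_master in place of PostgreSQL's information_schema (MyDB and a live Postgres connection are assumptions of the real code, not reproduced here):

    import sqlite3

    # sqlite_master plays the role of information_schema.tables; the committed
    # code runs the same pattern against PostgreSQL through MyDB's connection.
    connection = sqlite3.connect(':memory:')
    connection.execute('CREATE TABLE facts (product_id INTEGER, amount REAL)')
    connection.execute('CREATE TABLE product (id INTEGER, name TEXT)')

    cursor = connection.cursor()
    cursor.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
    for table_name in cursor.fetchall():
        # each row is a one-element tuple, so the name is table_name[0]
        print(table_name[0])  # facts, product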