Skip to content

Commit

Permalink
Bookmarks implementation for Atlas proxy (#88)
Browse files Browse the repository at this point in the history
* Bookmarks implementation for Atlas proxy

* UserFollowAPI will update atlas for table bookmark.

* Add, Delete bookmarks and Get all bookmarks per user.

* Added testcase - test_get_table_by_user_relation

* Create reader entity if it does not exist upon POST bookmark.

* Added tableUri in reader entity for fetching the table type.
  • Loading branch information
morpheyesh authored and feng-tao committed Nov 5, 2019
1 parent 920efef commit 7baf7d6
Show file tree
Hide file tree
Showing 3 changed files with 213 additions and 5 deletions.
141 changes: 136 additions & 5 deletions metadata_service/proxy/atlas_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,14 @@ class AtlasProxy(BaseProxy):
"""
TABLE_ENTITY = app.config['ATLAS_TABLE_ENTITY']
DB_ATTRIBUTE = app.config['ATLAS_DB_ATTRIBUTE']
READER_TYPE = 'Reader'
QN_KEY = 'qualifiedName'
BKMARKS_KEY = 'isFollowing'
METADATA_KEY = 'metadata'
GUID_KEY = 'guid'
ATTRS_KEY = 'attributes'
REL_ATTRS_KEY = 'relationshipAttributes'
ENTITY_URI_KEY = 'entityUri'
_CACHE = CacheManager(**parse_cache_config_options({'cache.regions': 'atlas_proxy',
'cache.atlas_proxy.type': 'memory',
'cache.atlas_proxy.expire': _ATLAS_PROXY_CACHE_EXPIRY_SEC}))
Expand Down Expand Up @@ -100,6 +105,28 @@ def _extract_info_from_uri(self, *, table_uri: str) -> Dict:
result = pattern.match(table_uri)
return result.groupdict() if result else dict()

def _parse_reader_qn(self, reader_qn: str) -> Dict:
"""
Parse reader qualifiedName and extract the info
:param reader_qn:
:return: Dictionary object containing following information:
cluster: cluster information
db: Database name
name: Table name
"""
pattern = re.compile(r"""
^(?P<db>[^.]*)
\.
(?P<table>[^.]*)\.metadata
\.
(?P<user_id>[^.]*)\.reader
\@
(?P<cluster>.*)
$
""", re.X)
result = pattern.match(reader_qn)
return result.groupdict() if result else dict()

def _get_table_entity(self, *, table_uri: str) -> Tuple[EntityUniqueAttribute, Dict]:
"""
Fetch information from table_uri and then find the appropriate entity
Expand All @@ -124,6 +151,72 @@ def _get_table_entity(self, *, table_uri: str) -> Tuple[EntityUniqueAttribute, D
raise NotFoundException('Table URI( {table_uri} ) does not exist'
.format(table_uri=table_uri))

def _get_user_entity(self, user_id: str) -> EntityUniqueAttribute:
"""
Fetches an user entity from an id
:param user_id:
:return:
"""
try:
return self._driver.entity_unique_attribute("User",
qualifiedName=user_id)
except Exception as ex:
raise NotFoundException('(User {user_id}) does not exist'
.format(user_id=user_id))

def _create_reader(self, metadata_guid: str, user_guid: str, reader_qn: str, table_uri: str) -> None:
"""
Creates a reader entity for a specific user and table uri.
:param metadata_guid: Table's metadata guid
:param user_guid: User's guid
:param reader_qn: Reader qualifiedName
:return:
"""
reader_entity = {
'typeName': self.READER_TYPE,
'attributes': {'qualifiedName': reader_qn,
'isFollowing': True,
'count': 0,
'entityMetadata': {'guid': metadata_guid},
'user': {'guid': user_guid},
'entityUri': table_uri}
}
self._driver.entity_bulk.create(data={'entities': [reader_entity]})

def _get_reader_entity(self, table_uri: str, user_id: str) -> EntityUniqueAttribute:
"""
Fetch a Reader entity from parsing table uri and user id.
If Reader is not present, create one for the user.
:param table_uri:
:param user_id: Qualified Name of a user
:return:
"""
table_info = self._extract_info_from_uri(table_uri=table_uri)
reader_qn = '{}.{}.metadata.{}.reader@{}'.format(table_info.get('db'),
table_info.get('name'),
user_id,
table_info.get('cluster'))

try:
reader_entity = self._driver.entity_unique_attribute(
self.READER_TYPE, qualifiedName=reader_qn)
if not reader_entity.entity:
# Fetch the table entity from the uri for obtaining metadata guid.
table_entity, table_info = self._get_table_entity(table_uri=table_uri)
# Fetch user entity from user_id for relation
user_entity = self._get_user_entity(user_id)
# Create reader entity with the metadata and user relation.
self._create_reader(table_entity.entity[self.ATTRS_KEY][self.METADATA_KEY][self.GUID_KEY],
user_entity.entity[self.GUID_KEY], reader_qn, table_uri)
# Fetch reader entity after creating it.
reader_entity = self._driver.entity_unique_attribute(self.READER_TYPE, qualifiedName=reader_qn)
return reader_entity

except Exception as ex:
LOGGER.exception(f'Reader not found. {str(ex)}')
raise NotFoundException('Reader( {reader_qn} ) does not exist'
.format(reader_qn=reader_qn))

def _get_column(self, *, table_uri: str, column_name: str) -> Dict:
"""
Fetch the column information from referredEntities of the table entity
Expand Down Expand Up @@ -416,9 +509,42 @@ def get_tags(self) -> List:
)
return tags

def get_table_by_user_relation(self, *, user_email: str,
relation_type: UserResourceRel) -> Dict[str, Any]:
pass
def get_table_by_user_relation(self, *, user_email: str, relation_type: UserResourceRel) -> Dict[str, Any]:
params = {
'typeName': self.READER_TYPE,
'offset': '0',
'limit': '1000',
'entityFilters': {
'condition': 'AND',
'criterion': [
{
'attributeName': self.QN_KEY,
'operator': 'contains',
'attributeValue': user_email
},
{
'attributeName': self.BKMARKS_KEY,
'operator': 'eq',
'attributeValue': 'true'
}
]
},
'attributes': ['count', self.QN_KEY, self.ENTITY_URI_KEY]
}
# Fetches the reader entities based on filters
search_results = self._driver.search_basic.create(data=params)

results = []
for record in search_results.entities:
table_info = self._extract_info_from_uri(table_uri=record.attributes[self.ENTITY_URI_KEY])
res = self._parse_reader_qn(record.attributes[self.QN_KEY])
results.append(PopularTable(
database=table_info['entity'],
cluster=res['cluster'],
schema=res['db'],
name=res['table']))

return {'table': results}

def get_frequently_used_tables(self, *, user_email: str) -> Dict[str, Any]:
pass
Expand All @@ -427,10 +553,15 @@ def add_table_relation_by_user(self, *,
table_uri: str,
user_email: str,
relation_type: UserResourceRel) -> None:
pass

entity = self._get_reader_entity(table_uri=table_uri, user_id=user_email)
entity.entity[self.ATTRS_KEY][self.BKMARKS_KEY] = True
entity.update()

def delete_table_relation_by_user(self, *,
table_uri: str,
user_email: str,
relation_type: UserResourceRel) -> None:
pass
entity = self._get_reader_entity(table_uri=table_uri, user_id=user_email)
entity.entity[self.ATTRS_KEY][self.BKMARKS_KEY] = False
entity.update()
38 changes: 38 additions & 0 deletions tests/unit/proxy/fixtures/atlas_test_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,41 @@ class Data:
entity2,
]
}

reader_entity1 = {
"typeName": "Reader",
"attributes": {
"isFollowing": True,
"qualifiedName": '{}.{}.metadata.{}.reader@{}'.format(db, name, 'test_user_id', cluster),
"count": 96,
"entityUri": table_uri,
},
"guid": "0fa40fd5-016c-472e-a72f-25a5013cc818",
"status": "ACTIVE",
"displayText": '{}.{}.metadata.{}.reader@{}'.format(db, name, 'test_user_id', cluster),
"classificationNames": [],
"meaningNames": [],
"meanings": []
}

reader_entity2 = {
"typeName": "Reader",
"attributes": {
"isFollowing": True,
"qualifiedName": '{}.{}.metadata.{}.reader@{}'.format(db, 'Table2', 'test_user_id', cluster),
"count": 96
},
"guid": "0fa40fd5-016c-472e-a72f-a72ffa40fd5",
"status": "ACTIVE",
"displayText": '{}.{}.metadata.{}.reader@{}'.format(db, 'Table2', 'test_user_id', cluster),
"classificationNames": [],
"meaningNames": [],
"meanings": []
}

reader_entities = {
'entities': [
reader_entity1,
reader_entity2,
]
}
39 changes: 39 additions & 0 deletions tests/unit/proxy/test_atlas_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@ def _mock_get_table_entity(self, entity=None):
}))
return mocked_entity

def _mock_get_reader_entity(self, entity=None):
entity = entity or self.entity1
mocked_entity = MagicMock()
mocked_entity.entity = entity
self.proxy._get_reader_entity = MagicMock(return_value=mocked_entity)
return mocked_entity

def test_extract_table_uri_info(self):
table_info = self.proxy._extract_info_from_uri(table_uri=self.table_uri)
self.assertDictEqual(table_info, {
Expand Down Expand Up @@ -333,6 +340,38 @@ def test_put_column_description(self):
column_name=self.test_column['attributes']['name'],
description='DOESNT_MATTER')

def test_get_table_by_user_relation(self):

reader1 = copy.deepcopy(self.reader_entity1)
reader1 = self.to_class(reader1)
reader_collection = MagicMock()
reader_collection.entities = [reader1]

self.proxy._driver.search_basic.create = MagicMock(return_value=reader_collection)
res = self.proxy.get_table_by_user_relation(user_email='test_user_id',
relation_type='follow')

expected = [PopularTable(database=Data.entity_type, cluster=Data.cluster, schema=Data.db,
name=Data.name, description=None)]

self.assertEqual(res, {'table': expected})

def test_add_resource_relation_by_user(self):
reader_entity = self._mock_get_reader_entity()
with patch.object(reader_entity, 'update') as mock_execute:
self.proxy.add_table_relation_by_user(table_uri=self.table_uri,
user_email="test_user_id",
relation_type='follow')
mock_execute.assert_called_with()

def test_delete_resource_relation_by_user(self):
reader_entity = self._mock_get_reader_entity()
with patch.object(reader_entity, 'update') as mock_execute:
self.proxy.delete_table_relation_by_user(table_uri=self.table_uri,
user_email="test_user_id",
relation_type='follow')
mock_execute.assert_called_with()


if __name__ == '__main__':
unittest.main()

0 comments on commit 7baf7d6

Please sign in to comment.