Skip to content

Commit

Permalink
Merge pull request #11 from lyft/tfeng_add_relation
Browse files Browse the repository at this point in the history
[Amundsen People] Add API to build relationship between user and table
  • Loading branch information
Tao Feng authored Mar 15, 2019
2 parents 69b6634 + ef6c5ba commit 7ab2116
Show file tree
Hide file tree
Showing 7 changed files with 430 additions and 8 deletions.
12 changes: 10 additions & 2 deletions metadata_service/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from metadata_service.api.table \
import TableDetailAPI, TableOwnerAPI, TableTagAPI, TableDescriptionAPI
from metadata_service.api.tag import TagAPI
from metadata_service.api.user import UserDetailAPI
from metadata_service.api.user import UserDetailAPI, UserFollowAPI, UserOwnAPI, UserReadAPI

# For customized flask use below arguments to override.
FLASK_APP_MODULE_NAME = os.getenv('FLASK_APP_MODULE_NAME')
Expand Down Expand Up @@ -86,7 +86,15 @@ def create_app(*, config_module_class: str) -> Flask:
'/tags/')
api.add_resource(UserDetailAPI,
'/user/<path:user_id>')

api.add_resource(UserFollowAPI,
'/user/<path:user_id>/follow/',
'/user/<path:user_id>/follow/<resource_type>/<path:table_uri>')
api.add_resource(UserOwnAPI,
'/user/<path:user_id>/own/',
'/user/<path:user_id>/own/<resource_type>/<path:table_uri>')
api.add_resource(UserReadAPI,
'/user/<path:user_id>/read/',
'/user/<path:user_id>/read/<resource_type>/<path:table_uri>')
app.register_blueprint(api_bp)

return app
171 changes: 169 additions & 2 deletions metadata_service/api/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@

from flask_restful import Resource, fields, marshal

from metadata_service.api.table import table_detail_fields
from metadata_service.exception import NotFoundException
from metadata_service.proxy import neo4j_proxy
from metadata_service.util import UserResourceRel


user_detail_fields = {
Expand All @@ -20,6 +22,10 @@
'manager_fullname': fields.String, # Optional
}

table_list_fields = {
'table': fields.List(fields.Nested(table_detail_fields))
}


class UserDetailAPI(Resource):
"""
Expand All @@ -31,8 +37,169 @@ def __init__(self) -> None:

def get(self, user_id: str) -> Iterable[Union[Mapping, int, None]]:
try:
table = self.neo4j.get_user_detail(user_id=user_id)
return marshal(table, user_detail_fields), HTTPStatus.OK
user = self.neo4j.get_user_detail(user_id=user_id)
return marshal(user, user_detail_fields), HTTPStatus.OK

except NotFoundException:
return {'message': 'User id {} does not exist'.format(user_id)}, HTTPStatus.NOT_FOUND


class UserFollowAPI(Resource):
"""
Build get / put API to support user follow resource features.
It will create a relationship(follow / followed_by) between user and resources(table, dashboard etc)
"""

def __init__(self) -> None:
self.neo4j = neo4j_proxy.get_neo4j()

def get(self, user_id: str) -> Iterable[Union[Mapping, int, None]]:
"""
Return a list of resources that user has followed
:param user_id:
:return:
"""
try:
resources = self.neo4j.get_table_by_user_relation(user_email=user_id,
relation_type=UserResourceRel.follow)
return marshal(resources, table_list_fields), HTTPStatus.OK

except NotFoundException:
return {'message': 'user_id {} does not exist'.format(user_id)}, HTTPStatus.NOT_FOUND

except Exception:
return {'message': 'Internal server error!'}, HTTPStatus.INTERNAL_SERVER_ERROR

def put(self, user_id: str, resource_type: str, table_uri: str) -> Iterable[Union[Mapping, int, None]]:
"""
Create the follow relationship between user and resources.
todo: It will need to refactor all neo4j proxy api to take a type argument.
:param user_id:
:param table_uri:
:return:
"""
try:
self.neo4j.add_table_relation_by_user(table_uri=table_uri,
user_email=user_id,
relation_type=UserResourceRel.follow)
return {'message': 'The user {} for table_uri {} '
'is added successfully'.format(user_id,
table_uri)}, HTTPStatus.OK
except Exception as e:
return {'message': 'The user {} for table_uri {} '
'is not added successfully'.format(user_id,
table_uri)}, \
HTTPStatus.INTERNAL_SERVER_ERROR

def delete(self, user_id: str, resource_type: str, table_uri: str) -> Iterable[Union[Mapping, int, None]]:
"""
Delete the follow relationship between user and resources.
todo: It will need to refactor all neo4j proxy api to take a type argument.
:param user_id:
:param table_uri:
:return:
"""
try:
self.neo4j.delete_table_relation_by_user(table_uri=table_uri,
user_email=user_id,
relation_type=UserResourceRel.follow)
return {'message': 'The user {} for table_uri {} '
'is added successfully'.format(user_id,
table_uri)}, HTTPStatus.OK
except Exception as e:
return {'message': 'The user {} for table_uri {} '
'is not added successfully'.format(user_id,
table_uri)}, \
HTTPStatus.INTERNAL_SERVER_ERROR


class UserOwnAPI(Resource):
"""
Build get / put API to support user own resource features.
It will create a relationship(owner / owner_of) between user and resources(table, dashboard etc)
todo: Deprecate TableOwner API
"""

def __init__(self) -> None:
self.neo4j = neo4j_proxy.get_neo4j()

def get(self, user_id: str) -> Iterable[Union[Mapping, int, None]]:
"""
Return a list of resources that user has owned
:param user_id:
:return:
"""
try:
resources = self.neo4j.get_table_by_user_relation(user_email=user_id,
relation_type=UserResourceRel.own)
return marshal(resources, table_list_fields), HTTPStatus.OK

except NotFoundException:
return {'message': 'user_id {} does not exist'.format(user_id)}, HTTPStatus.NOT_FOUND

except Exception:
return {'message': 'Internal server error!'}, HTTPStatus.INTERNAL_SERVER_ERROR

def put(self, user_id: str, resource_type: str, table_uri: str) -> Iterable[Union[Mapping, int, None]]:
"""
Create the follow relationship between user and resources.
:param user_id:
:param resource_type:
:param table_uri:
:return:
"""
try:
self.neo4j.add_owner(table_uri=table_uri,
owner=user_id)
return {'message': 'The owner {} for table_uri {} '
'is added successfully'.format(user_id,
table_uri)}, HTTPStatus.OK
except Exception as e:
return {'message': 'The owner {} for table_uri {} '
'is not added successfully'.format(user_id,
table_uri)}, HTTPStatus.INTERNAL_SERVER_ERROR

def delete(self, user_id: str, resource_type: str, table_uri: str) -> Iterable[Union[Mapping, int, None]]:
try:
self.neo4j.delete_owner(table_uri=table_uri,
owner=user_id)
return {'message': 'The owner {} for table_uri {} '
'is deleted successfully'.format(user_id,
table_uri)}, HTTPStatus.OK
except Exception:
return {'message': 'The owner {} for table_uri {} '
'is not deleted successfully'.format(user_id,
table_uri)}, HTTPStatus.INTERNAL_SERVER_ERROR


class UserReadAPI(Resource):
"""
Build get / put API to support user read resource features.
It will create a relationship(read / read_by) between user and resources(table, dashboard etc)
"""

def __init__(self) -> None:
self.neo4j = neo4j_proxy.get_neo4j()

def get(self, user_id: str) -> Iterable[Union[Mapping, int, None]]:
"""
Return a list of resources that user has read
:param user_id:
:return:
"""
try:
resources = self.neo4j.get_table_by_user_relation(user_email=user_id,
relation_type=UserResourceRel.read)
return marshal(resources, table_list_fields), HTTPStatus.OK

except NotFoundException:
return {'message': 'user_id {} does not exist'.format(user_id)}, HTTPStatus.NOT_FOUND

except Exception:
return {'message': 'Internal server error!'}, HTTPStatus.INTERNAL_SERVER_ERROR
147 changes: 146 additions & 1 deletion metadata_service/proxy/neo4j_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from metadata_service.entity.user_detail import User as UserEntity
from metadata_service.exception import NotFoundException
from metadata_service.proxy.statsd_utilities import timer_with_counter
from metadata_service.util import UserResourceRel

_CACHE = CacheManager(**parse_cache_config_options({'cache.type': 'memory'}))

Expand Down Expand Up @@ -706,6 +707,10 @@ def get_user_detail(self, *, user_id: str) -> Union[UserEntity, None]:
single_result = record.single()
record = single_result.get('user_record', {})
manager_record = single_result.get('manager_record', {})
if manager_record:
manager_name = manager_record.get('full_name', '')
else:
manager_name = ''
result = UserEntity(email=record['email'],
first_name=record.get('first_name'),
last_name=record.get('last_name'),
Expand All @@ -715,9 +720,149 @@ def get_user_detail(self, *, user_id: str) -> Union[UserEntity, None]:
team_name=record.get('team_name'),
slack_id=record.get('slack_id'),
employee_type=record.get('employee_type'),
manager_fullname=manager_record.get('full_name', ''))
manager_fullname=manager_name)
return result

@staticmethod
def _get_relation_by_type(relation_type: UserResourceRel) -> Tuple:

if relation_type == UserResourceRel.follow:
relation, reverse_relation = 'FOLLOW', 'FOLLOWED_BY'
elif relation_type == UserResourceRel.own:
relation, reverse_relation = 'OWNER_OF', 'OWNER'
elif relation_type == UserResourceRel.read:
relation, reverse_relation = 'READ', 'READ_BY'
else:
raise NotImplementedError('The relation type {} is not defined!'.format(relation_type))
return relation, reverse_relation

@timer_with_counter
def get_table_by_user_relation(self, *, user_email: str, relation_type: UserResourceRel) -> Dict[str, Any]:
"""
Retrive all follow the resources per user based on the relation.
We start with table resources only, then add dashboard.
:param user_email: the email of the user
:param relation_type: the relation between the user and the resource
:return:
"""
relation, _ = self._get_relation_by_type(relation_type)
# relationship can't be parameterized
query_key = 'key: "{user_id}"'.format(user_id=user_email)

query = textwrap.dedent("""
MATCH (user:User {{{key}}})-[:{relation}]->(tbl:Table)
RETURN COLLECT(DISTINCT tbl) as table_records
""").format(key=query_key,
relation=relation)

record = self._execute_cypher_query(statement=query,
param_dict={})

if not record:
raise NotFoundException('User {user_id} does not {relation} '
'any resources'.format(user_id=user_email,
relation=relation))
results = []
table_records = record.single().get('table_records', [])

for record in table_records:
# todo: decide whether we want to return a list of table entities or just table_uri
results.append(self.get_table(table_uri=record['key']))
return {'table': results}

@timer_with_counter
def add_table_relation_by_user(self, *,
table_uri: str,
user_email: str,
relation_type: UserResourceRel) -> None:
"""
Update table user informations.
1. Do a upsert of the user node.
2. Do a upsert of the relation/reverse-relation edge.
:param table_uri:
:param user_email:
:param relation_type:
:return:
"""
relation, reverse_relation = self._get_relation_by_type(relation_type)

upsert_user_query = textwrap.dedent("""
MERGE (u:User {key: $user_email})
on CREATE SET u={email: $user_email, key: $user_email}
on MATCH SET u={email: $user_email, key: $user_email}
""")

user_email = 'key: "{user_email}"'.format(user_email=user_email)
tbl_key = 'key: "{tbl_key}"'.format(tbl_key=table_uri)

upsert_user_relation_query = textwrap.dedent("""
MATCH (n1:User {{{user_email}}}), (n2:Table {{{tbl_key}}})
MERGE (n1)-[r1:{relation}]->(n2)-[r2:{reverse_relation}]->(n1)
RETURN n1.key, n2.key
""").format(user_email=user_email,
tbl_key=tbl_key,
relation=relation,
reverse_relation=reverse_relation)

try:
tx = self._driver.session().begin_transaction()
# upsert the node
tx.run(upsert_user_query, {'user_email': user_email})
result = tx.run(upsert_user_relation_query, {})

if not result.single():
raise RuntimeError('Failed to create relation between '
'user {user} and table {tbl}'.format(user=user_email,
tbl=table_uri))
tx.commit()
except Exception as e:
if not tx.closed():
tx.rollback()
# propagate the exception back to api
raise e
finally:
tx.close()

@timer_with_counter
def delete_table_relation_by_user(self, *,
table_uri: str,
user_email: str,
relation_type: UserResourceRel) -> None:
"""
Delete the relationship between user and resources.
:param table_uri:
:param user_email:
:param relation_type:
:return:
"""
relation, reverse_relation = self._get_relation_by_type(relation_type)

user_email = 'key: "{user_email}"'.format(user_email=user_email)
tbl_key = 'key: "{tbl_key}"'.format(tbl_key=table_uri)

delete_query = textwrap.dedent("""
MATCH (n1:User {{{user_email}}})-[r1:{relation}]->
(n2:Table {{{tbl_key}}})-[r2:{reverse_relation}]->(n1) DELETE r1,r2
""").format(user_email=user_email,
tbl_key=tbl_key,
relation=relation,
reverse_relation=reverse_relation)

try:
tx = self._driver.session().begin_transaction()
tx.run(delete_query, {})
tx.commit()
except Exception as e:
# propagate the exception back to api
if not tx.closed():
tx.rollback()
raise e
finally:
tx.close()


_neo4j_proxy = None
_neo4j_lock = Lock()
Expand Down
4 changes: 4 additions & 0 deletions metadata_service/util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from collections import namedtuple


UserResourceRel = namedtuple('UserResourceRel', 'follow, own, read')
Loading

0 comments on commit 7ab2116

Please sign in to comment.