-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
executable file
·131 lines (108 loc) · 3.41 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
#!/usr/bin/env python3
import json
import os
import asana
from asana.session import AsanaOAuth2Session
import urwid
import secrets
from ui.auth import AuthPrompt
from ui.constants import palette
from ui.ui import Ui, loading
from asana_service import AsanaService
class NotAuthedException(Exception):
    """Raised when the user has not authorized the app.

    Args:
        message: Human-readable description; defaults to a generic
            "not authorized" message so existing no-argument call sites
            keep working.
    """

    def __init__(self, message='The user has not authorized the app'):
        # Actually invoke the base initializer so that str(exc) and
        # exc.args carry the message.
        super(NotAuthedException, self).__init__(message)
class CmdAsana(object):
    """Terminal Asana client: obtains an OAuth client, then drives the UI.

    Call order used by ``main``: ``get_client`` (falling back to
    ``authorize`` when no stored token is usable), ``get_asana_service``,
    ``get_ui`` and finally ``run``.
    """

    # File (relative to the current working directory) holding the token.
    TOKEN_PATH = '.oauth'

    # Class-level defaults so these attributes can be inspected safely
    # before the corresponding setup method has run.
    loop = None
    client = None
    asana_service = None
    ui = None
    nav_stack = []

    def __init__(self):
        # Shadow the class-level list so instances never share one stack.
        self.nav_stack = []

    def _oauth_client(self, **extra):
        """Build an auto-refreshing OAuth client.

        Args:
            **extra: Additional keyword arguments forwarded to
                ``asana.Client.oauth`` (e.g. ``token`` or ``redirect_uri``).

        Returns:
            The object returned by ``asana.Client.oauth``.
        """
        return asana.Client.oauth(
            client_id=secrets.CLIENT_ID,
            client_secret=secrets.CLIENT_SECRET,
            token_updater=self.save_token,
            auto_refresh_url=AsanaOAuth2Session.token_url,
            auto_refresh_kwargs={
                'client_id': secrets.CLIENT_ID,
                'client_secret': secrets.CLIENT_SECRET,
            },
            **extra
        )

    def get_client(self):
        """Try to get an Asana client using stored tokens.

        Raises:
            NotAuthedException: the token file is missing, unreadable, or
                does not contain valid JSON, i.e. the user has not
                (usably) authorized the app.
        """
        # Only the file read/parse is guarded; a corrupt token file is
        # treated the same as a missing one so the caller can re-authorize.
        try:
            with open(self.TOKEN_PATH, 'r') as token_file:
                token = json.load(token_file)
        except (IOError, ValueError) as err:
            raise NotAuthedException() from err
        self.client = self._oauth_client(token=token)

    def authorize(self):
        """Run the interactive OAuth flow in its own urwid main loop.

        Builds a client using the out-of-band redirect URI, tries to open
        the authorization URL in a browser, and shows an ``AuthPrompt``
        that is given ``auth_callback``. Blocks until that loop exits.
        """
        self.client = self._oauth_client(
            redirect_uri='urn:ietf:wg:oauth:2.0:oob'
        )
        (url, _) = self.client.session.authorization_url()
        auth = AuthPrompt(url, self.auth_callback)
        # Opening a browser is a convenience only: the URL is also handed
        # to the prompt, so any failure here is deliberately ignored.
        try:
            import webbrowser
            webbrowser.open(url)
        except Exception:
            pass
        self.loop = urwid.MainLoop(
            auth.component(),
            unhandled_input=self.exit_handler,
            palette=palette
        )
        self.loop.run()

    def auth_callback(self, code):
        """Exchange an authorization code for a token and persist it.

        Args:
            code: Authorization code passed to ``fetch_token``.

        Raises:
            urwid.ExitMainLoop: always, to leave the authorization loop.
        """
        self.save_token(
            self.client.session.fetch_token(code=code))
        raise urwid.ExitMainLoop()

    def exit_handler(self, key):
        """Handle key presses that no widget consumed.

        Args:
            key: Key name as delivered by urwid's ``unhandled_input``.

        Raises:
            urwid.ExitMainLoop: when ``key`` is ``q``, ``Q`` or ``esc``.
        """
        if key in ('q', 'Q', 'esc'):
            raise urwid.ExitMainLoop()
        # This handler also serves the authorization loop, where no UI has
        # been created yet; there is nothing to go back to in that case.
        if key == 'backspace' and self.ui is not None:
            self.ui.go_back()

    def get_asana_service(self):
        """Create the ``AsanaService`` wrapper around ``self.client``."""
        self.asana_service = AsanaService(self.client)

    def get_ui(self):
        """Create the ``Ui``, giving it the service and ``update``."""
        self.ui = Ui(self.asana_service, self.update)

    def run(self):
        """Set up the main loop on a loading placeholder and show tasks.

        The loop is created but not started here; see ``update``.
        """
        self.placeholder = urwid.WidgetPlaceholder(loading())
        self.loop = urwid.MainLoop(
            self.placeholder,
            unhandled_input=self.exit_handler,
            palette=palette
        )
        self.ui.my_tasks()

    def update(self, widget):
        """Swap the displayed widget and redraw.

        Args:
            widget: Widget to place inside the loop's placeholder.
        """
        self.loop.widget.original_widget = widget
        # NOTE(review): presumably draw_screen() fails while the loop has
        # not been started yet, and this fallback is what first starts it.
        # Confirm before narrowing the exception type.
        try:
            self.loop.draw_screen()
        except Exception:
            self.loop.run()

    def save_token(self, token):
        """Persist the OAuth token as JSON, readable only by the owner.

        Args:
            token: JSON-serializable token mapping.
        """
        # Serialize first so a failure cannot truncate an existing token.
        payload = json.dumps(token)
        # Create the file with restrictive permissions from the start
        # rather than writing the secret and tightening the mode afterwards.
        flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
        fd = os.open(self.TOKEN_PATH, flags, 0o600)
        with os.fdopen(fd, 'w') as token_file:
            # A pre-existing file keeps its old mode on open, so tighten
            # it explicitly before the secret is written.
            os.chmod(self.TOKEN_PATH, 0o600)
            token_file.write(payload)
def main():
    """Start the application.

    A client is built from the stored token when possible; if the user
    has not authorized the app yet, the interactive authorization flow
    runs instead. The service, the UI and the main screen are then set
    up, in that order.
    """
    app = CmdAsana()
    try:
        app.get_client()
    except NotAuthedException:
        app.authorize()

    # Remaining start-up steps; each depends on the one before it.
    for step in (app.get_asana_service, app.get_ui, app.run):
        step()
# Start the client only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()