forked from Nukesor/ultimate-poll-bot
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
bff35a5
commit 9babb70
Showing
4 changed files
with
194 additions
and
132 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
class PollOption: | ||
Yes = "Да" | ||
No = "Нет" | ||
Acknowledge = "Воздержаться" | ||
|
||
ALL_CASES = [Yes, No, Acknowledge] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,152 +1,53 @@ | ||
import traceback | ||
from typing import Optional | ||
|
||
import telegram | ||
from dateutil import parser | ||
from flask import request | ||
from flask_restful import Resource, marshal_with, fields, abort | ||
from sqlalchemy import select | ||
from sqlalchemy.orm import Session | ||
from flask_restful import Resource, abort, marshal_with, fields | ||
|
||
from pollbot.config import config | ||
from pollbot.db import get_session | ||
from pollbot.display.poll.compilation import get_poll_text_and_vote_keyboard | ||
from pollbot.enums import ReferenceType, PollType | ||
from pollbot.models import Poll, User, Reference | ||
from pollbot.poll.option import add_option | ||
from pollbot.models import Poll | ||
from resources.helpers.option import PollOption | ||
|
||
|
||
vote_model = { | ||
'yes': fields.Integer, | ||
'no': fields.Integer, | ||
'acknowledge': fields.Integer | ||
} | ||
|
||
poll_model = { | ||
'id': fields.Integer, | ||
'chat_id': fields.Integer, | ||
'poll_message_id': fields.Integer, | ||
'discussion_message_id': fields.Integer, | ||
'votes': fields.Nested(vote_model) | ||
} | ||
|
||
|
||
class PollApi(Resource): | ||
@marshal_with(poll_model) | ||
def post(self): | ||
def get(self, poll_id: int): | ||
session = get_session() | ||
request_body = request.get_json() | ||
api_config = config['api'] | ||
|
||
try: | ||
stmt = select(User).where(User.username == api_config['admin']) | ||
|
||
user = session.scalar(stmt) | ||
user.expected_input = None | ||
user.current_poll = None | ||
|
||
poll = self.create_poll( | ||
user=user, | ||
poll_name=request_body['name'], | ||
poll_description=request_body['description'] if 'description' in request_body else None, | ||
due_date_string=request_body['due_date'], | ||
session=session, | ||
) | ||
stmt = select(Poll).where(Poll.id == poll_id) | ||
poll = session.scalar(stmt) | ||
|
||
reference = Reference(poll, ReferenceType.api.name, user=user, chat_id=api_config['seeders_channel_id']) | ||
session.add(reference) | ||
session.commit() | ||
if poll is None: | ||
abort(404, message='Not found') | ||
|
||
poll_message_id, discussion_message_id = self.send_message_to_channel(seeders_channel_id=api_config['seeders_channel_id'], reference=reference, session=session) | ||
except: | ||
traceback.print_exc() | ||
yesOptionsCount = 0 | ||
noOptionsCount = 0 | ||
acknowledgeOptionsCount = 0 | ||
|
||
session.delete(poll) | ||
session.commit() | ||
|
||
abort(404, message='Something went wrong...') | ||
for vote in poll.votes: | ||
match vote.option.name: | ||
case PollOption.Yes: | ||
yesOptionsCount += 1 | ||
case PollOption.No: | ||
noOptionsCount += 1 | ||
case PollOption.Acknowledge: | ||
acknowledgeOptionsCount += 1 | ||
|
||
return { | ||
'id': poll.id, | ||
'chat_id': reference.chat_id, | ||
'poll_message_id': poll_message_id, | ||
'discussion_message_id': discussion_message_id, | ||
'votes': { | ||
'yes': yesOptionsCount, | ||
'no': noOptionsCount, | ||
'acknowledge': acknowledgeOptionsCount, | ||
} | ||
}, 200 | ||
|
||
def create_poll(self, user: User, poll_name: str, poll_description: Optional[str], due_date_string: str, session: Session) -> Poll: | ||
poll = Poll(user) | ||
poll.name = poll_name | ||
poll.description = poll_description | ||
poll.locale = user.locale | ||
poll.poll_type = PollType.single_vote.name | ||
poll.number_of_votes = 0 | ||
poll.anonymous = True | ||
poll.results_visible = True | ||
poll.set_due_date(parser.parse(due_date_string)) | ||
poll.allow_new_options = False | ||
poll.allow_sharing = False | ||
poll.show_percentage = True | ||
poll.show_option_votes = True | ||
poll.european_date_format = user.european_date_format | ||
poll.permanently_summarized = False | ||
poll.compact_buttons = False | ||
poll.summarize = False | ||
poll.created = True | ||
|
||
session.add(poll) | ||
|
||
for option_to_add in ['Да', 'Нет', 'Видел']: | ||
option = add_option(poll, option_to_add, [], False) | ||
if option is None: | ||
continue | ||
|
||
session.add(option) | ||
|
||
return poll | ||
|
||
def send_message_to_channel(self, seeders_channel_id: int, reference: Reference, session: Session) -> tuple[int, int]: | ||
poll = reference.poll | ||
text, keyboard = get_poll_text_and_vote_keyboard(session, poll, user=poll.user) | ||
|
||
bot = telegram.Bot(token=config['telegram']['api_key']) | ||
|
||
poll_message = bot.send_message( | ||
text=text, | ||
chat_id=seeders_channel_id, | ||
reply_markup=keyboard, | ||
parse_mode='markdown', | ||
disable_web_page_preview=True, | ||
disable_notification=True, | ||
) | ||
reference.message_id = poll_message.message_id | ||
poll_message_url = self.create_message_url(poll_message) | ||
|
||
text = f'Тред с обсуждением этого [предложения]({poll_message_url})' | ||
discussion_message = bot.send_message( | ||
text=text, | ||
chat_id=seeders_channel_id, | ||
parse_mode='markdown', | ||
disable_web_page_preview=True, | ||
disable_notification=True, | ||
reply_to_message_id=poll_message.message_id, | ||
) | ||
discussion_message_url = self.create_message_url(discussion_message) | ||
|
||
description = f'Тред с обсуждением этого [предложения]({discussion_message_url})' | ||
if poll.description and len(poll.description) > 0: | ||
poll.description += f'\n\n{description}' | ||
else: | ||
poll.description = description | ||
|
||
session.commit() | ||
|
||
text, keyboard = get_poll_text_and_vote_keyboard(session, poll, user=poll.user) | ||
bot.edit_message_text( | ||
text=text, | ||
chat_id=seeders_channel_id, | ||
message_id=poll_message.message_id, | ||
reply_markup=keyboard, | ||
parse_mode='markdown', | ||
disable_web_page_preview=True, | ||
) | ||
|
||
return poll_message.message_id, discussion_message.message_id | ||
|
||
def create_message_url(self, message: telegram.Message): | ||
chat_id = str(message.chat_id) | ||
chat_id = chat_id.removeprefix('-100') | ||
|
||
message_id = message.message_id | ||
return f'https://t.me/c/{chat_id}/{message_id}' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,153 @@ | ||
import traceback | ||
from typing import Optional | ||
|
||
import telegram | ||
from dateutil import parser | ||
from flask import request | ||
from flask_restful import Resource, marshal_with, fields, abort | ||
from sqlalchemy import select | ||
from sqlalchemy.orm import Session | ||
|
||
from pollbot.config import config | ||
from pollbot.db import get_session | ||
from pollbot.display.poll.compilation import get_poll_text_and_vote_keyboard | ||
from pollbot.enums import ReferenceType, PollType | ||
from pollbot.models import Poll, User, Reference | ||
from pollbot.poll.option import add_option | ||
from resources.helpers.option import PollOption | ||
|
||
poll_list_model = { | ||
'id': fields.Integer, | ||
'chat_id': fields.Integer, | ||
'poll_message_id': fields.Integer, | ||
'discussion_message_id': fields.Integer, | ||
} | ||
|
||
|
||
class PollListApi(Resource): | ||
@marshal_with(poll_list_model) | ||
def post(self): | ||
session = get_session() | ||
request_body = request.get_json() | ||
api_config = config['api'] | ||
|
||
try: | ||
stmt = select(User).where(User.username == api_config['admin']) | ||
|
||
user = session.scalar(stmt) | ||
user.expected_input = None | ||
user.current_poll = None | ||
|
||
poll = self.create_poll( | ||
user=user, | ||
poll_name=request_body['name'], | ||
poll_description=request_body['description'] if 'description' in request_body else None, | ||
due_date_string=request_body['due_date'], | ||
session=session, | ||
) | ||
|
||
reference = Reference(poll, ReferenceType.api.name, user=user, chat_id=api_config['seeders_channel_id']) | ||
session.add(reference) | ||
session.commit() | ||
|
||
poll_message_id, discussion_message_id = self.send_message_to_channel(seeders_channel_id=api_config['seeders_channel_id'], reference=reference, session=session) | ||
except: | ||
traceback.print_exc() | ||
|
||
session.delete(poll) | ||
session.commit() | ||
|
||
abort(404, message='Something went wrong...') | ||
|
||
return { | ||
'id': poll.id, | ||
'chat_id': reference.chat_id, | ||
'poll_message_id': poll_message_id, | ||
'discussion_message_id': discussion_message_id, | ||
}, 200 | ||
|
||
def create_poll(self, user: User, poll_name: str, poll_description: Optional[str], due_date_string: str, session: Session) -> Poll: | ||
poll = Poll(user) | ||
poll.name = poll_name | ||
poll.description = poll_description | ||
poll.locale = user.locale | ||
poll.poll_type = PollType.single_vote.name | ||
poll.number_of_votes = 0 | ||
poll.anonymous = True | ||
poll.results_visible = True | ||
poll.set_due_date(parser.parse(due_date_string)) | ||
poll.allow_new_options = False | ||
poll.allow_sharing = False | ||
poll.show_percentage = True | ||
poll.show_option_votes = True | ||
poll.european_date_format = user.european_date_format | ||
poll.permanently_summarized = False | ||
poll.compact_buttons = False | ||
poll.summarize = False | ||
poll.created = True | ||
|
||
session.add(poll) | ||
|
||
for option_to_add in PollOption.ALL_CASES: | ||
option = add_option(poll, option_to_add, [], False) | ||
if option is None: | ||
continue | ||
|
||
session.add(option) | ||
|
||
return poll | ||
|
||
def send_message_to_channel(self, seeders_channel_id: int, reference: Reference, session: Session) -> tuple[int, int]: | ||
poll = reference.poll | ||
text, keyboard = get_poll_text_and_vote_keyboard(session, poll, user=poll.user) | ||
|
||
bot = telegram.Bot(token=config['telegram']['api_key']) | ||
|
||
poll_message = bot.send_message( | ||
text=text, | ||
chat_id=seeders_channel_id, | ||
reply_markup=keyboard, | ||
parse_mode='markdown', | ||
disable_web_page_preview=True, | ||
disable_notification=True, | ||
) | ||
reference.message_id = poll_message.message_id | ||
poll_message_url = self.create_message_url(poll_message) | ||
|
||
text = f'Тред с обсуждением этого [предложения]({poll_message_url})' | ||
discussion_message = bot.send_message( | ||
text=text, | ||
chat_id=seeders_channel_id, | ||
parse_mode='markdown', | ||
disable_web_page_preview=True, | ||
disable_notification=True, | ||
reply_to_message_id=poll_message.message_id, | ||
) | ||
discussion_message_url = self.create_message_url(discussion_message) | ||
|
||
description = f'Тред с обсуждением этого [предложения]({discussion_message_url})' | ||
if poll.description and len(poll.description) > 0: | ||
poll.description += f'\n\n{description}' | ||
else: | ||
poll.description = description | ||
|
||
session.commit() | ||
|
||
text, keyboard = get_poll_text_and_vote_keyboard(session, poll, user=poll.user) | ||
bot.edit_message_text( | ||
text=text, | ||
chat_id=seeders_channel_id, | ||
message_id=poll_message.message_id, | ||
reply_markup=keyboard, | ||
parse_mode='markdown', | ||
disable_web_page_preview=True, | ||
) | ||
|
||
return poll_message.message_id, discussion_message.message_id | ||
|
||
def create_message_url(self, message: telegram.Message): | ||
chat_id = str(message.chat_id) | ||
chat_id = chat_id.removeprefix('-100') | ||
|
||
message_id = message.message_id | ||
return f'https://t.me/c/{chat_id}/{message_id}' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,11 @@ | ||
from flask_restful import Api | ||
from resources.health_check import HealthCheckApi | ||
from resources.poll_list import PollListApi | ||
from resources.poll import PollApi | ||
|
||
|
||
def initialize_routes(app): | ||
api = Api(app) | ||
api.add_resource(HealthCheckApi, '/healthcheck') | ||
api.add_resource(PollApi, '/poll') | ||
api.add_resource(PollListApi, '/poll') | ||
api.add_resource(PollApi, '/poll/<int:poll_id>') |