-
Notifications
You must be signed in to change notification settings - Fork 6
/
manageHostTopic.py
142 lines (118 loc) · 4.99 KB
/
manageHostTopic.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import argparse
import sys
from google.cloud import pubsub_v1
def list_vm_topic_indices(publisher, args):
def parse_topic(topic):
prefix = f"projects/{args.project_id}/topics/"
if not topic.name.startswith(prefix):
print("invalid topic name")
return None
return topic.name[len(prefix) :]
indices = set()
for topic in publisher.list_topics(
request={"project": f"projects/{args.project_id}"}
):
topic_name = parse_topic(topic)
if topic_name and topic_name.startswith(args.vm_topic_prefix):
index = int(topic_name[len(args.vm_topic_prefix) :])
indices.add(index)
return indices
def list_vm_subscription_indices(subscriber, args):
def parse_subscription(subscription):
prefix = f"projects/{args.project_id}/subscriptions/"
if not subscription.name.startswith(prefix):
print("invalid subscription name")
return None
return subscription.name[len(prefix) :]
indices = set()
for subscription in subscriber.list_subscriptions(
request={"project": f"projects/{args.project_id}"}
):
sub_name = parse_subscription(subscription)
if sub_name and sub_name.startswith(args.vm_subscriber_prefix):
index = int(sub_name[len(args.vm_subscriber_prefix) :])
indices.add(index)
return indices
def smallest_empty_index(publisher, subscriber, args):
vm_topic_indices = list_vm_topic_indices(publisher, args)
vm_subscription_indices = list_vm_subscription_indices(subscriber, args)
indices = vm_topic_indices.union(vm_subscription_indices)
cur = 0
while True:
if cur not in indices:
return cur
cur += 1
def create_vm_topic(publisher, topic_id, args):
topic_path = publisher.topic_path(args.project_id, topic_id)
print("Creating topic...")
topic = publisher.create_topic(request={"name": topic_path})
print(f"Created topic: {topic}")
return topic_path
def delete_vm_topic(publisher, topic_id, args):
topic_path = publisher.topic_path(args.project_id, topic_id)
print("Deleting topic...")
publisher.delete_topic(request={"topic": topic_path})
print(f"Deleted topic: {topic_path}")
def create_subscriber(subscriber, subscription_id, topic_path, args):
subscription_path = subscriber.subscription_path(args.project_id, subscription_id)
with subscriber:
print("Creating subscription...")
subscription = subscriber.create_subscription(
request={"name": subscription_path, "topic": topic_path}
)
print(f"Created subscription: {subscription}")
def delete_subscriber(subscriber, subscription_id, args):
subscription_path = subscriber.subscription_path(args.project_id, subscription_id)
with subscriber:
print("Deleting subscription...")
subscriber.delete_subscription(request={"subscription": subscription_path})
print(f"Deleted subscription: {subscription_path}")
def check_args(args):
if not args.create and not args.delete:
print("Specify either --create or --delete flag to indicate mode.")
sys.exit(1)
if args.create and args.delete:
print("Specify either --create or --delete flag to indicate mode.")
sys.exit(1)
if args.delete and args.index is None:
print("Specify --index value to indicate which VM topic & subscriber to delete")
sys.exit(1)
if args.create and args.index is not None:
print("Do not specify --index value when creating VM topic & subscriber")
sys.exit(1)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Creates VM topic and subscriber.")
parser.add_argument(
"--create", action="store_true", help="Create VM topic and subscriber"
)
parser.add_argument(
"--delete", action="store_true", help="Create VM topic and subscriber"
)
parser.add_argument("--project_id", type=str, required=True, help="GCP project id")
parser.add_argument(
"--vm_topic_prefix", type=str, default="vmTopic", help="Topic prefix"
)
parser.add_argument(
"--vm_subscriber_prefix",
type=str,
default="vmSubscriber",
help="Subscriber prefix",
)
parser.add_argument(
"--index",
type=int,
help='"index" of VM topic/subscriber to delete. ex) 3 if VM topic is vmTopic3.',
)
args = parser.parse_args()
check_args(args)
publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
if args.create:
index = smallest_empty_index(publisher, subscriber, args)
topic_path = create_vm_topic(publisher, f"{args.vm_topic_prefix}{index}", args)
create_subscriber(
subscriber, f"{args.vm_subscriber_prefix}{index}", topic_path, args
)
elif args.delete:
delete_vm_topic(publisher, f"{args.vm_topic_prefix}{args.index}", args)
delete_subscriber(subscriber, f"{args.vm_subscriber_prefix}{args.index}", args)