OAUTHBEARER compatibility with GCN #203

Open
2 tasks
joshuarwood opened this issue Jul 13, 2023 · 1 comment
@joshuarwood

Description

Some collaborators of mine tried to use a hop consumer client to consume messages from the new GCN Kafka servers. In theory this should work, because both the hop-client and gcn_kafka clients are built on confluent_kafka. However, they were unable to establish a connection using the standard stream example:

from hop import stream

with stream.open("kafka://kafka.gcn.nasa.gov:9092/igwn.gwalert", "r") as s:
    for message in s:
        print(message.content)

After some digging, it looks like the Stream class doesn't let users set up the OAUTHBEARER authorization method with the client_id and client_secret needed to connect to the GCN servers. This should be possible, because I am able to create a hop Consumer as long as I manually pass the appropriate configuration settings:

from adc import auth, kafka
from hop.auth import Auth
from hop.io import Consumer

from authlib.integrations.requests_client import OAuth2Session

# credentials
client_id = "client_id_hash"
client_secret = "client_secret_hash"

# broker addresses and topic from url
domain = "gcn.nasa.gov"
topic = "igwn.gwalert"
url = f"kafka://kafka.{domain}:9092/{topic}"
_, broker_addresses, topics = kafka.parse_kafka_url(url)

# create authorization used to establish connection with GCN servers
a = Auth(None, None, method=auth.SASLMethod.OAUTHBEARER)
# remove config fields not used by OAUTHBEARER authorization
a._config.pop("sasl.username")
a._config.pop("sasl.password")
# add config fields used by GCN kafka clients
a._config["sasl.oauthbearer.method"] = "oidc"
a._config["sasl.oauthbearer.client.id"] = client_id
a._config["sasl.oauthbearer.client.secret"] = client_secret
a._config["sasl.oauthbearer.token.endpoint.url"] = f"https://auth.{domain}/oauth2/token"

session = OAuth2Session(client_id, client_secret, scope=None)
token_endpoint = a._config["sasl.oauthbearer.token.endpoint.url"]

# function to perform the GCN session authorization
def oauth_cb(*_, **__):
    token = session.fetch_token(token_endpoint, grant_type="client_credentials")
    return token["access_token"], token["expires_at"]

a._config["oauth_cb"] = oauth_cb

# create a consumer instance
c = Consumer(
    group_id="test",
    broker_addresses=broker_addresses,
    topics=topics,
    auth=a)

# read messages
for message in c:
    print(message.content)

I used the igwn.gwalert topic here because it is also available from the SCIMMA servers, so it can be used to check both systems. My collaborators are reading igwn.gwalert from the SCIMMA servers and need to compare those alerts to the notices sent through the GCN servers. They would like to use hop clients for both to streamline their code.
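
As a side note on the oauth_cb function in the script above: confluent_kafka expects that callback to return a (token string, expiry time) tuple, with the expiry given in seconds since the epoch. A minimal sketch of a reusable wrapper follows. The name make_oauth_cb is hypothetical and not part of hop or confluent_kafka, and the fallback to an expires_in field assumes the token endpoint returns a standard OAuth 2.0 response.

```python
import time
from typing import Callable, Mapping, Tuple


def make_oauth_cb(fetch_token: Callable[[], Mapping]) -> Callable[..., Tuple[str, float]]:
    """Wrap a token-fetching function in the (token, expiry) shape used by oauth_cb.

    make_oauth_cb is a hypothetical helper for illustration only.
    """

    def oauth_cb(*_args, **_kwargs) -> Tuple[str, float]:
        token = fetch_token()
        # Prefer an absolute expiry; otherwise derive one from the relative
        # lifetime (assumes the response carries "expires_in").
        if "expires_at" in token:
            expiry = float(token["expires_at"])
        else:
            expiry = time.time() + float(token["expires_in"])
        return token["access_token"], expiry

    return oauth_cb
```

With the session and token_endpoint from the script above, this could be used as `a._config["oauth_cb"] = make_oauth_cb(lambda: session.fetch_token(token_endpoint, grant_type="client_credentials"))`.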

Definition of Done

Have a more straightforward way to connect to GCN servers using a stream instance. Users should not need to pop the username and password fields to suppress warnings in the authorization config. There should be a high-level method that sets up the full GCN server config without requiring everything to be entered manually.

Tasks/Items:

  • Update Stream class to allow for OAUTHBEARER authorizations with client_id/client_secret instead of username/password
  • Have a way to provide default config settings that are matched to GCN servers
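
To make the second task concrete, here is a rough sketch of what "default config settings matched to GCN servers" might look like as a plain dictionary of librdkafka properties. The function name gcn_oauth_config is hypothetical and not an existing hop API. The sasl.oauthbearer.* keys and the endpoint pattern are taken from the script above, while the security.protocol value is an assumption that is not stated in this issue.

```python
def gcn_oauth_config(client_id: str, client_secret: str,
                     domain: str = "gcn.nasa.gov") -> dict:
    """Build OAUTHBEARER client settings for a GCN-style Kafka deployment.

    Hypothetical helper for illustration; not part of hop or gcn_kafka.
    """
    return {
        # Assumption: the brokers use SASL over TLS.
        "security.protocol": "sasl_ssl",
        "sasl.mechanisms": "OAUTHBEARER",
        # The fields below mirror the ones set by hand in the script above.
        "sasl.oauthbearer.method": "oidc",
        "sasl.oauthbearer.client.id": client_id,
        "sasl.oauthbearer.client.secret": client_secret,
        "sasl.oauthbearer.token.endpoint.url": f"https://auth.{domain}/oauth2/token",
    }
```

A helper along these lines would let a caller switch between deployments by changing only the domain argument.
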
@cnweaver
Contributor

cnweaver commented Jul 13, 2023

This is supposed to work out of the box, and it seems to for me. Setting up my credential and subscribing from the shell:

$ hop version
hop-client==0.8.0
adc_streaming==2.3.1
confluent_kafka==1.9.2
librdkafka==1.9.2
$ hop auth add
2023-07-13 01:04:04,091 | hop : INFO : Generating configuration with user-specified username + password
Username: <client id>
Password: <client secret>
Hostname (may be empty): kafka.gcn.nasa.gov
Token endpoint (empty if not applicable): https://auth.gcn.nasa.gov/oauth2/token
2023-07-13 01:04:41,833 | hop : INFO : Wrote configuration to: /Users/christopher/.config/hop/auth.toml
$ hop subscribe -s EARLIEST kafka://kafka.gcn.nasa.gov/igwn.gwalert
2023-07-13 01:06:36,475 | hop : INFO : group ID not specified, generating a random group ID: 3l6ihk9icb23b1lqq7uhe955fu-2IDFM2ERI6
2023-07-13 01:06:36,475 | hop : INFO : connecting to kafka://kafka.gcn.nasa.gov
2023-07-13 01:06:36,815 | hop : INFO : subscribing to topics: ['igwn.gwalert']
2023-07-13 01:06:37,288 | hop : INFO : processing messages from stream
{'alert_type': 'PRELIMINARY', 'time_created': '2023-06-29T19:00:49Z', 'superevent_id': 'MS230629s', 'urls': {'gracedb': 'https://gracedb.ligo.org/superevents/MS230629s/view/'}, 'event': {'significant': True, 'time': '2023-06-29T18:53:22.304Z', 'far': 9.110699364861297e-14, 'instruments': ['H1', 'L1'], 'group': 'CBC', 'pipeline': 'gstlal', 'search': 'MDC', 'properties': {'HasNS': 1.0, 'HasRemnant': 1.0, 'HasMassGap': 0.0}, 'classification': {'BNS': 0.9999968446698687, 'NSBH': 0.0, 'BBH': 0.0, 'Terrestrial': 3.1553301313647424e-06}, 'duration': None, 'central_frequency': None, 'skymap':. . . 

Equivalents as python scripts:

import hop

# With the previous credential pulled from the standard credential store:
with hop.stream.open("kafka://kafka.gcn.nasa.gov/igwn.gwalert", "r") as s:
    for message in s:
        print(message.content)

# With a directly constructed credential:
my_auth = hop.auth.Auth("<client id>",
                        "<client secret>",
                        method=hop.auth.SASLMethod.OAUTHBEARER,
                        token_endpoint="https://auth.gcn.nasa.gov/oauth2/token")
with hop.Stream(auth=my_auth).open("kafka://kafka.gcn.nasa.gov/igwn.gwalert", "r") as s:
    for message in s:
        print(message.content)

Do one or more of these methods not work for you, and if so, what output do you get from hop version?

@cnweaver cnweaver changed the title User Story issue OAUTHBEARER compatibility with GCN Jul 13, 2023
@cnweaver cnweaver self-assigned this Jul 13, 2023