Last active
September 6, 2021 13:18
-
-
Save vinodjayachandran/213f790c8a2a789310892a2b81542b23 to your computer and use it in GitHub Desktop.
Publish to and subscribe on a GCP Pub/Sub topic
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os
# NOTE: intentionally shadows the builtin TimeoutError; this is the exception
# type raised by Future.result(timeout=...) on Python versions before 3.11.
from concurrent.futures import TimeoutError

from google.cloud import pubsub

# --- Publish a single message to a Pub/Sub topic ---

project_id = "your-project-id"
topic_id = "topic-id-from-gcp-console"

# Point the Google client libraries at the service-account key file. This is
# set before any client is created so the clients can pick it up.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "path-to-your-credential-json-file"

publisher = pubsub.PublisherClient()
# Fully qualified topic name built from the project id and the topic id.
topic_path = publisher.topic_path(project_id, topic_id)

# Construct any sample data; the payload must be a bytestring.
data = "{'name':'vinod'}".encode("utf-8")

# publish() is asynchronous and returns a future. Only the raw payload is
# sent here; no message attributes are attached.
future = publisher.publish(topic_path, data)
# Block until the publish has completed so that failures are raised here
# instead of being silently dropped before the success message is printed.
future.result()
print("Published messages.")
def callback(message) -> None:
    """Print a received Pub/Sub message and acknowledge it.

    Intended to be passed as the ``callback`` argument of
    ``subscriber.subscribe``; it is invoked once per delivered message.

    Args:
        message: The received message object. Only its string form and its
            ``ack()`` method are used here.
    """
    print(f"Received message: {message}")
    # Acknowledge after handling so the message is marked as processed.
    message.ack()
# --- Consume messages from Pub/Sub via a subscriber ---

subscription_id = "subscriber-id-from-gcp-console"
# Number of seconds the subscriber should listen for messages
timeout = 5.0

subscriber = pubsub.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

# subscribe() returns a future for the streaming pull; each received message
# is handed to `callback`.
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment