Using Pub/Sub to Build a Serverless Async Processing Pipeline on GCP

Lomanu4

Google Cloud Run functions seem like the godsend of serverless computing until you hit the many different limitations, all ultimately related to runtime. I'm talking about the max timeout in the functions themselves, or in other products such as API Gateway.

This is the problem I faced at work, where we had a long-running computation process that fires off as a result of an API call from a user. Sure, we could set the max deadline to 1 hour and have the user wait 60 minutes just to get a response... not. Plus, API Gateway caps requests at 5 minutes anyway.

This is an obvious use case for asynchronous processing with the ability to start a process, return a response, and keep it running...

... which is not inherently supported in a serverless function, since it stops running once it returns a response.

Pub/Sub to the rescue




My solution: pub/sub messaging

Constrained to our serverless setup, one thing I've learnt about Google Cloud is that the limitations of one GCP product can generally be solved by combining it with another.

Thus, my multi-step solution to a fully serverless, async processing pipeline with status updates.



The flow is as follows:

1. Client calls an API to start the job

  • This API generates a unique ID
  • A Pub/Sub message is published with this unique ID and any other attributes
  • The unique ID is returned to the client

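import json

from google.cloud import pubsub_v1


def publish_to_pubsub(data, attributes=None):
    publisher = pubsub_v1.PublisherClient()
    topic_name = 'projects/myproject/topics/async-messages'
    # Convert the payload to bytes
    data_bytes = json.dumps(data).encode('utf-8')
    # Publish to the topic; attributes are passed as string keyword arguments
    # (fall back to an empty dict so attributes=None does not raise)
    future = publisher.publish(topic_name, data=data_bytes, **(attributes or {}))
    future.result()  # Wait for the message to be published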
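
To tie the three bullets of step 1 together, here is a minimal sketch of what the start endpoint might look like. The names `start_job` and `job_id`, and the injected `publish` callable, are assumptions for illustration and not from the original setup; in practice `publish` could be the `publish_to_pubsub` function above.

```python
import uuid


def start_job(payload, publish, workflow="Example Workflow"):
    """Generate a job ID, publish it, and return it to the client.

    `publish` is any callable taking (data, attributes). It is injected
    (an assumption of this sketch) so the code runs without GCP credentials.
    """
    job_id = str(uuid.uuid4())
    # The workflow attribute lets push subscriptions filter on it.
    publish({"job_id": job_id, **payload}, {"workflow": workflow})
    # Respond straight away; the real work happens later.
    return {"job_id": job_id}
```

The client keeps the returned `job_id` and uses it later to ask for the status.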

2. The Pub/Sub topic has push subscriptions, optionally with attribute filters. I use these to re-use the same topic for different async processes.

  • The message is received and a subscriber cloud function runs
  • Check whether the long-running process has already started, based on the provided ID
  • If not started, make an HTTPS request to start the long-running cloud function, but don't wait for a response
  • Update the database with our ID to mark that the function has started
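
The subscriber steps above can be sketched roughly as follows. Pub/Sub push deliveries arrive as a JSON envelope with the payload base64-encoded under `message.data`; everything else here is an assumption for illustration: the `handle_push` name, the `job_id` field, the `jobs` dict standing in for the database, and the `start_long_running` callable standing in for the fire-and-forget HTTPS request.

```python
import base64
import json


def handle_push(envelope, jobs, start_long_running):
    """Handle one Pub/Sub push delivery; return True if the job was started."""
    message = envelope["message"]
    # Push deliveries carry the published payload base64-encoded in `data`.
    payload = json.loads(base64.b64decode(message["data"]).decode("utf-8"))
    job_id = payload["job_id"]

    # Duplicate delivery: the job was already started, so do nothing.
    if job_id in jobs:
        return False

    # Kick off the long-running function without waiting for its response.
    start_long_running(payload)
    # Record that the job has started so later deliveries are ignored.
    jobs[job_id] = {"status": "started"}
    return True
```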

gcloud pubsub subscriptions create EXAMPLE_WORKFLOW \
  --topic=async-messages \
  --push-endpoint=... \
  --ack-deadline=10 \
  --push-auth-service-account=pubsub-push-sa@myproject.iam.gserviceaccount.com \
  --message-filter="attributes.workflow=\"Example Workflow\"" \
  --dead-letter-topic=dead-letter-topic \
  --max-delivery-attempts=5


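from multiprocessing import Process


def send_request(url, body):
    # authenticated_post_request is our own helper (not shown here)
    authenticated_post_request(url=url, json_payload=body)


# Send the request from a separate process so we never block on the response
process = Process(target=send_request, args=(url, body))
process.start()

# Give the request up to 5 seconds to go out, then abandon the process
process.join(5)
if process.is_alive():
    process.terminate()
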
At this moment, our long-running process has started, essentially on its own in the wide galactic space of Google's servers, with no client to receive its response.

3. The long-running function does our actual processing

  • This is really just another cloud function now, but our only time constraint is the timeout of cloud functions themselves, as there is no client listening for a response
  • If for whatever reason you need a longer timeout (maybe assess why you're using serverless), you could chain the process of starting and abandoning cloud functions, so long as you continue to pass the ID and any required data
  • Periodically update the status of the process in the database to provide incremental updates
    • At this point, you could build more robust retry handling by adding deadlines for certain status changes, plus resumability
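
As a rough illustration of the periodic status updates, the sketch below runs a list of steps and records progress after each one. The `run_job` name, the `jobs` dict standing in for the database, and the field names (`status`, `completed_steps`, `total_steps`, `error`) are all assumptions, not the original implementation.

```python
def run_job(job_id, steps, jobs):
    """Run `steps` in order, recording progress in `jobs` after each one."""
    jobs[job_id] = {
        "status": "running",
        "completed_steps": 0,
        "total_steps": len(steps),
    }
    try:
        for index, step in enumerate(steps, start=1):
            step()
            # Incremental update that the status endpoint can report.
            jobs[job_id]["completed_steps"] = index
    except Exception as exc:
        # Record the failure so the client does not poll forever.
        jobs[job_id].update(status="failed", error=str(exc))
        return jobs[job_id]
    jobs[job_id]["status"] = "done"
    return jobs[job_id]
```

Because `completed_steps` survives a failure, a retry could in principle resume from that step instead of starting over.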

4. We have another API endpoint to GET the current status.
This is just a database query based on the ID we returned to the client.
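
A minimal sketch of that lookup, assuming a `jobs` dict in place of the real database and a hypothetical `get_status` handler that returns an HTTP status code alongside the body:

```python
def get_status(job_id, jobs):
    """Return an (http_status, body) pair for a status request."""
    record = jobs.get(job_id)
    if record is None:
        # Unknown ID: never issued, or not yet picked up by the subscriber.
        return 404, {"error": "unknown job", "job_id": job_id}
    return 200, {"job_id": job_id, **record}
```

The client can poll this endpoint with the ID it received until the status stops changing.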



Idempotency


When using Pub/Sub, a key requirement is to ensure idempotency, as messages can be delivered more than once. This is why the subscriber function must update the database to indicate the job has started.
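
One way to make that check safe under concurrent duplicate deliveries is to let the database enforce uniqueness. The sketch below is a variation on the flow described here, not the original implementation: it claims the ID before starting the job, and it uses an in-memory SQLite table purely so it can run anywhere. The `jobs` table, `make_store`, and `claim_job` are hypothetical names.

```python
import sqlite3


def make_store():
    """Create an in-memory table with the job ID as primary key."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE jobs (job_id TEXT PRIMARY KEY, status TEXT NOT NULL)"
    )
    return conn


def claim_job(conn, job_id):
    """Return True only for the first caller to claim `job_id`."""
    # The primary key turns the insert into a no-op for duplicates.
    cursor = conn.execute(
        "INSERT OR IGNORE INTO jobs (job_id, status) VALUES (?, 'started')",
        (job_id,),
    )
    conn.commit()
    # rowcount is 1 when the row was inserted, 0 when it already existed.
    return cursor.rowcount == 1
```

Only the delivery for which `claim_job` returns `True` would go on to start the long-running function.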

Retries


Pub/Sub also has retry with backoff. A downside of this setup is that our long-running function does not benefit from it, as it is only called once from the subscriber. However, any logic we put into the subscriber function can throw an error, which results in an unacked message that Pub/Sub will redeliver.
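
For a push subscription, acknowledging comes down to the HTTP status the subscriber returns: a success code acks the message, anything else leaves it unacked. A minimal sketch, where `push_endpoint` and the injected `handler` are hypothetical names:

```python
def push_endpoint(envelope, handler):
    """Run `handler` and translate the outcome into an HTTP status code."""
    try:
        handler(envelope)
    except Exception:
        # A non-success status leaves the message unacknowledged, so
        # Pub/Sub redelivers it according to the subscription's retry policy.
        return 500
    # A success status such as 204 acknowledges the message.
    return 204
```

With the subscription settings shown earlier, a message that keeps failing would be moved to the dead-letter topic after 5 delivery attempts.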

So in summary,


Could we not have just started the long-running function from the initial API call?

Yes of course.

The reason I opted to use Pub/Sub is primarily to allow multiple subscriber functions to fire from messages. That way, you can split what would have been a long-running process into several smaller ones that run in parallel.

After all, with serverless, we want to scale horizontally first wherever possible :)
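
As a rough sketch of that horizontal split, the function below publishes one message per chunk of work so that each chunk can be handled by its own subscriber invocation. The `fan_out` name, the chunking scheme, and the `chunk_index` and `chunk_count` fields are assumptions for illustration; `publish` is any callable taking `(data, attributes)`.

```python
def fan_out(job_id, items, chunk_size, publish):
    """Publish one message per chunk of `items`; return the number of chunks."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    chunks = [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]
    for index, chunk in enumerate(chunks):
        publish(
            {"job_id": job_id, "chunk_index": index, "items": chunk},
            # Pub/Sub attribute values must be strings.
            {"workflow": "Example Workflow", "chunk_count": str(len(chunks))},
        )
    return len(chunks)
```

Each worker would then record its own chunk as done, and the job counts as finished once all `chunk_count` chunks have reported in.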

