Execute on Demand

There are many cases where you want a job to execute only when a certain condition is met, such as a file being present or a particular time of day being reached. In these cases you can use the rigor loader function to check the condition first and only then allow a runner to be executed.

This works because the loader function can return an empty array for runner_inputs, in which case the runner is not executed. If you do not need the runner job to be parallelised, you can also return the jobs key explicitly with a value of 1. This ensures that only 1 runner is spun up, even if the runner_inputs array contains multiple values.
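As a rough illustration of the two conditions mentioned above (a file being present and a time of day), a loader could be gated as in the sketch below. The file path, the time window and the should_run helper are hypothetical names chosen for this example and are not part of rigor; only the loader function and the runner_inputs key come from the description above.

```python
from datetime import datetime, time
from pathlib import Path

# Hypothetical values, chosen only for illustration.
TRIGGER_FILE = Path('/data/incoming/reports.csv')
WINDOW_START = time(1, 0)   # 01:00
WINDOW_END = time(5, 0)     # 05:00


def should_run(trigger_file, now):
    """Return True only if the file exists and `now` falls inside the window."""
    in_window = WINDOW_START <= now.time() < WINDOW_END
    return trigger_file.is_file() and in_window


def loader():
    if not should_run(TRIGGER_FILE, datetime.now()):
        # Condition not met: an empty runner_inputs means no runner is executed.
        return {'runner_inputs': []}

    # Condition met: pass the file path on as the only runner input.
    return {'runner_inputs': [str(TRIGGER_FILE)]}
```

Keeping the condition in a small helper such as should_run makes it possible to check the gating logic on its own, without touching the real file or waiting for the time window.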

On Demand Parallelised

In this example, we query the API for any reports that are ready to be processed. If there are none, we return an empty array, which stops the overall execution.

prepared_reports_fetch_parallel.py
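import os

import requests


def loader():
    # Ask the API which reports are ready to be processed.
    response = requests.get('https://mydomain.com/api/v1/reports', timeout=30)
    response.raise_for_status()
    data = response.json()

    if len(data['reports_ready']) == 0:
        # Nothing to do: an empty runner_inputs means no runner is executed.
        return {
            'runner_inputs': []
        }
    else:
        # One entry per report that is ready to be processed.
        return {
            'runner_inputs': data['reports_ready'],
        }


def runner(args):
    print('Reports processor started')

    # TASK_INDEX selects this runner's entry from the inputs.
    report_to_process = args[int(os.environ['TASK_INDEX'])]

    print('I am processing report: ' + str(report_to_process))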

On Demand Single Node

This example differs by explicitly setting the jobs key to 1, so the number of runners is not dynamically matched to the length of the runner_inputs array.

prepared_reports_fetch_sequential.py
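import os

import requests


def loader():
    # Ask the API which reports are ready to be processed.
    response = requests.get('https://mydomain.com/api/v1/reports', timeout=30)
    response.raise_for_status()
    data = response.json()

    if len(data['reports_ready']) == 0:
        # Nothing to do: an empty runner_inputs means no runner is executed.
        return {
            'runner_inputs': []
        }
    else:
        # Pass every ready report, but ask for a single runner.
        return {
            'runner_inputs': data['reports_ready'],
            'jobs': 1
        }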
 
def runner(args):
    print('Reports processor started')

    # Only one runner is started, so work through every report in turn
    # rather than picking a single entry by TASK_INDEX.
    for report_to_process in args:
        print('I am processing report: ' + str(report_to_process))
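To compare the return shapes used in the two examples, the toy function below models how many runners each loader result would lead to. It is not rigor's scheduling code: planned_runner_count is a hypothetical helper, and it assumes that when the jobs key is omitted, one runner is started per entry in runner_inputs, as the description of the single node example suggests.

```python
def planned_runner_count(loader_result):
    """Toy model of how many runners a loader result would start."""
    inputs = loader_result.get('runner_inputs', [])
    if not inputs:
        # Empty runner_inputs: the runner is not executed at all.
        return 0
    # Assumption: without an explicit jobs key, one runner per input.
    return loader_result.get('jobs', len(inputs))


print(planned_runner_count({'runner_inputs': []}))                         # 0
print(planned_runner_count({'runner_inputs': ['a', 'b', 'c']}))            # 3
print(planned_runner_count({'runner_inputs': ['a', 'b', 'c'], 'jobs': 1})) # 1
```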