One Page Quickstart

Overview

Rigor provides a simple platform for quickly and easily deploying custom data pipelines. It integrates with your codebase to automatically build and deploy pipelines, whether they are being pushed to production or tested during development.

This quickstart is aimed primarily at intermediate developers and does not hold your hand through every minute step along the way. Ask an AI how to complete a step if it doesn't make 100% sense to you. 😉

Quickstart Example

Ready Repo

Create a GitHub repo, or have an existing one ready.

Connect Repo and Create Project

From the Rigor dashboard, create a new project, authorise the application to access your GitHub repositories, and select the repo you want to connect. Head through to the Project Overview page; you won't see much just yet! Let's change that.

Create Pipeline Test

Create both a pipeline_test.py and a pipeline_test.config.json file in the root of your repo. As an example, copy-paste the following into each.

pipeline_test.py
def runner():
    print("Hello World! 🙂")
    return True
pipeline_test.config.json
{
    "name": "First Rigor Pipeline",
    "runFile": "pipeline_test.py",
    "tasks": 1,
    "timeout": 30,
    "cpus": 1,
    "memory": 2,
    "retries": 3
}
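
A couple of things worth noting: in both files the entry point is a function called runner, which appears to be what Rigor invokes (runFile names the file, not the function). The remaining config fields aren't documented on this page, but a reasonable reading is that tasks sets the number of parallel workers, timeout caps the runtime, cpus and memory size each worker, and retries controls how many times a failed task is reattempted.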

Commit and Push Changes

Commit and push those changes to your default branch (typically main or master). This will trigger a production build of your pipeline on the Rigor platform.
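
If you're working from the command line, this is all it takes (assuming your default branch is main):

git add pipeline_test.py pipeline_test.config.json
git commit -m "Add first Rigor pipeline"
git push origin main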

Check Pipeline and Run

Go back to the Rigor app and refresh your current project (it can take a minute or two for the new build to complete!). The new pipeline job will be listed once the build has finished. Click on the pipeline, find the "execute" button, and click it to watch the pipeline run! 🚀

Check Execution

The execution can be observed and the logs inspected. If all goes to plan, you should see "Hello World! 🙂" in the logs 🎉

Scheduling

From the pipeline job page, clicking the schedule button allows you to set a schedule. This runs the pipeline on a schedule of your choosing and persists across new deployments of that job (so long as the job name does not change!). A schedule is defined using CRON syntax.
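
For example, the CRON expression 0 6 * * * would run the pipeline daily at 06:00, and */15 * * * * would run it every 15 minutes (this page doesn't say which timezone schedules are interpreted in, so it's worth checking in the dashboard).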

Need to Knows: Rapidfire

  • The maximum runtime of pipeline jobs is 60 minutes
  • The minimum billable increment is 15 seconds

Parallelising a Task

As a pipeline task can only run for 60 minutes, it is best to fan the job out horizontally so that multiple workers can complete it in parallel. Including a loader function lets you achieve this. The loader function runs on a single CPU and can be used to fetch, for example, an array of IDs; this array is then passed down to the runner function, and as many workers are spawned as there are entries in the array (up to 100).

The loader function is required to return an object/dict with the key runner_inputs, whose value is an array that the runner function accesses. When parallelising, each runner function has access to an environment variable denoting its operational index, which can be used to pick out the correct value from the inputs array.

Working Loader Example

Using the Pokémon API (pokeapi.co) as a simple example, we can fetch a list of Pokémon and then spawn a runner for each one in the list.

loader_func_example.py
import os

import requests


def loader():
    # Runs once, on a single CPU, before any workers are spawned.
    response = requests.get("https://pokeapi.co/api/v2/pokemon?limit=10")
    data = response.json()

    pokemon_names = [result['name'] for result in data['results']]

    # The data returned from the API contains 10 elements, so this will map to 10 workers being spun up.
    return { "runner_inputs": pokemon_names }


def runner(args):
    # TASK_INDEX arrives as a string, so cast it before using it as a list index.
    task_index = int(os.getenv('TASK_INDEX'))
    data_to_process = args["runner_inputs"][task_index]

    print(f'Pipeline runner of index: {task_index} working on this data! {data_to_process}')

    return True
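
Before pushing, you can sanity-check the pair locally by simulating Rigor's fan-out in a loop. This is just a sketch: it assumes the file above is importable as loader_func_example and that TASK_INDEX is the only part of the worker environment you need to reproduce.

local_test.py
import os

from loader_func_example import loader, runner

args = loader()
for i in range(len(args["runner_inputs"])):
    # In production Rigor sets TASK_INDEX for each worker; here we set it by hand.
    os.environ["TASK_INDEX"] = str(i)
    runner(args)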