One Page Quickstart
Overview
Rigor provides a simple platform for quickly and easily deploying custom data pipelines. It integrates with your codebase to automatically build and deploy pipelines, whether they are being pushed to production or tested during development.
This quickstart is primarily aimed at intermediate developers and does not hold your hand on every minute step along the way. Ask an AI how to complete a step if it doesn't make 100% sense to you. 😉
Quickstart Example
Ready Repo
Create or have ready a GitHub repo.
Connect Repo and Create Project
From the Rigor dashboard, create a new project, authorise the application to access your GitHub repositories, and select the repo you want to connect. Clicking through to the Project Overview page, you won't see much just yet! Let's change that.
Create Pipeline Test
Create both a pipeline_test.py and a pipeline_test.config.json file in the root of your repo. As an example, copy-paste the following into each.
def runner():
    print("Hello World! 🙂")
    return True
{
    "name": "First Rigor Pipeline",
    "runFile": "pipeline_test.py",
    "tasks": 1,
    "timeout": 30,
    "cpus": 1,
    "memory": 2,
    "retries": 3
}
Commit and Push Changes
Commit and push those changes to your default branch (typically main or master); this will trigger a production build of your pipeline on the Rigor platform.
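If you're working from the command line, a standard Git workflow is all that's needed (this sketch assumes your default branch is named main; the commit message is just an example):

git add pipeline_test.py pipeline_test.config.json
git commit -m "Add first Rigor pipeline"
git push origin main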
Check Pipeline and Run
Go back to the Rigor app and refresh your current project (it could take a minute or two for the new build to complete!). The new pipeline job will be listed once the build completes. Click on the pipeline, find the "execute" button, and click it to watch the pipeline run! 🚀
Check Execution
You can observe the execution and inspect its logs. If all goes to plan, you should see your "Hello World!" message in the logs 🎉
Scheduling
From the pipeline job page, clicking the schedule button will allow you to set a schedule. This will run the pipeline on a cadence of your choosing and will persist between new deployments of that job (so long as the job name does not change!). A schedule is defined using CRON syntax.
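If CRON is new to you, a schedule is five space-separated fields: minute, hour, day of month, month, and day of week (this assumes Rigor follows the standard five-field syntax). A few illustrative expressions, annotated with what they mean:

0 6 * * *       # every day at 06:00
*/15 * * * *    # every 15 minutes
0 9 * * 1       # every Monday at 09:00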
Need to Knows: Rapidfire
- The maximum runtime of pipeline jobs is 60 minutes
- The minimum billable increment is 15 seconds
Parallelising a Task
As a pipeline task can only run for 60 minutes, it is best to horizontally fan out the job at hand so that multiple workers can complete it in parallel. Including a loader function allows you to achieve this.

The loader function runs on a single CPU and can be used, for example, to fetch an array of IDs. This array is then passed down to the runner function, which can spawn as many workers as there are entries in the array (up to 100).

The loader function is required to return an object/dict with the key runner_inputs. The runner_inputs value must be an array, which the runner function accesses. When parallelising, the runner function has access to an environment variable denoting its operational index; this can be used to access the correct value of the inputs array.
Working Loader Example
Using the PokéAPI as a simple example, we can fetch a list of Pokémon and then spawn a runner for each one in the list.
import requests
import os


def loader():
    response = requests.get("https://pokeapi.co/api/v2/pokemon?limit=10")
    data = response.json()
    results = data['results']

    pokemon_names = []
    for x in results:
        pokemon_names.append(x['name'])

    # The API response contains 10 elements, which maps to 10 workers being spun up.
    return { "runner_inputs": pokemon_names }


def runner(args):
    # TASK_INDEX is provided as a string, so cast it before using it as a list index.
    task_index = int(os.getenv('TASK_INDEX'))
    data_to_process = args["runner_inputs"][task_index]
    print(f'Pipeline runner of index: {task_index} working on this data! {data_to_process}')
    return True
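To sanity-check the fan-out logic before pushing, you could simulate the dispatch locally. This is only a sketch built on the behaviour described above (the loader's runner_inputs being handed to each runner, and each worker reading TASK_INDEX); it is not how the Rigor platform itself invokes your code:

if __name__ == "__main__":
    # Local simulation only: on the platform, Rigor runs the loader and
    # spawns the workers for you, each with its own TASK_INDEX.
    args = loader()
    for i in range(len(args["runner_inputs"])):
        os.environ["TASK_INDEX"] = str(i)  # mimic the per-worker index
        runner(args)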