Geneva automatically packages and deploys your Python execution environment to its worker nodes. This ensures that distributed execution runs with the same environment and dependencies as your prototype. We currently support one processing backend: Ray. There are three ways to connect to a Ray cluster:
  1. Local Ray: auto-create a cluster on your machine for prototyping.
  2. KubeRay: create a cluster on demand in your Kubernetes cluster.
  3. External Ray cluster: connect to a Ray cluster you already run.

Ray Clusters

Local Ray

To execute jobs without an external Ray cluster, you can simply trigger the Table.backfill method. This auto-creates a Ray cluster on your machine. Because it runs on your laptop or desktop, it is only suitable for prototyping on small datasets, but it is the easiest way to get started. Define the UDF, add a column, and trigger the job:
import geneva
from geneva import udf

db = geneva.connect("s3://my-bucket/my-db")
tbl = db.get_table("my_table")

@udf
def filename_len(filename: str) -> int:
    return len(filename)

tbl.add_columns({"filename_len": filename_len})
tbl.backfill("filename_len")
Geneva will package up your local environment and send it to each worker node, so they’ll have access to all the same dependencies as if you ran a simple Python script yourself.

KubeRay

If you have a Kubernetes cluster with the kuberay-operator installed, you can use Geneva to automatically provision RayClusters. To do so, define a Geneva cluster representing the resource needs, Docker images, and other Ray configurations necessary to run your job. Make sure your Kubernetes cluster has adequate compute resources to provision the RayCluster. Here is an example Geneva cluster definition:
import sys
import ray
import geneva
from geneva.cluster.builder import GenevaClusterBuilder
from geneva.cluster import K8sConfigMethod
from geneva.runners.ray.raycluster import get_ray_image

db = geneva.connect("s3://my-bucket/my-db")

ray_version = ray.__version__
python_version = f"{sys.version_info.major}.{sys.version_info.minor}"
cluster_name = "my-geneva-cluster" # lowercase, numbers, hyphens only
service_account = "my_k8s_service_account" # k8s service account bound geneva runs as
k8s_namespace = "geneva"  # k8s namespace

cluster = (
    GenevaClusterBuilder()
        .name(cluster_name)
        .namespace(k8s_namespace)
        .portforwarding(True)  # required for KubeRay to expose Ray ports
        .aws_config(region="us-east-1")  # only required if using AWS
        .config_method(K8sConfigMethod.LOCAL)  # load k8s config from `~/.kube/config`
        # (other options include EKS_AUTH to load from AWS EKS, or IN_CLUSTER to load the
        # config when running inside a pod in the cluster)
        .head_group(
            service_account=service_account,
            cpus=2,
            node_selector={"geneva.lancedb.com/ray-head":""}, # k8s label required for head in your cluster
        )
        .add_cpu_worker_group(
            cpus=4,
            memory="8Gi",
            service_account=service_account,
            node_selector={"geneva.lancedb.com/ray-worker-cpu":""}, # k8s label for cpu worker in your cluster
        )
        .add_gpu_worker_group(
            cpus=2,
            memory="8Gi",
            gpus=1,
            service_account=service_account,
            image=get_ray_image(ray_version, python_version, gpu=True), # Note the GPU image here
            node_selector={"geneva.lancedb.com/ray-worker-gpu":""}, # k8s label for gpu worker in your cluster
        )
        .build()
)

db.define_cluster(cluster_name, cluster)
# define_cluster stores the cluster metadata in persistent storage. The cluster can then
# be referenced by name and provisioned when creating an execution context.

table = db.get_table("my_table")
with db.context(cluster=cluster_name):
    table.backfill("my_udf")

External Ray cluster

If you already have a Ray cluster, Geneva can execute jobs against it as well. To do so, define a Geneva cluster that holds the address of the existing cluster. Here's an example:
import geneva
from geneva.cluster.builder import GenevaClusterBuilder
from geneva.cluster import GenevaClusterType

db = geneva.connect(my_db_uri)
cluster_name = "my-geneva-external-cluster"

cluster = (
    GenevaClusterBuilder()
    .name(cluster_name)
    .cluster_type(GenevaClusterType.EXTERNAL_RAY)
    .ray_address("ray://my_ip:my_port")
    .portforwarding(False) # This must be False when using an external Ray cluster
    .build()
)
db.define_cluster(cluster_name, cluster)
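As with KubeRay, jobs then reference the cluster by name through an execution context (described under "Putting it all together" below). A minimal sketch, assuming a table my_table with a registered UDF column my_udf:

table = db.get_table("my_table")
with db.context(cluster=cluster_name):
    table.backfill("my_udf")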

Dependencies

Most UDFs require some dependencies: helper libraries like pillow for image processing, pre-trained models like open-clip to calculate embeddings, or even small config files. We have two ways to get them to workers:
  1. Use defaults
  2. Define a manifest

Use Defaults

By default, LanceDB packages your local environment and sends it to the Ray workers. This includes your local Python site-packages (as reported by site.getsitepackages()) and either the current workspace root (if you're in a Python repo) or the current working directory (if you're not). If you don't explicitly define a manifest, this is what happens.
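If you want to check which paths these defaults would cover before running a job, you can print them yourself; this is plain standard-library Python, not a Geneva API:

import pathlib
import site

# local site-packages that Geneva packages by default
print(site.getsitepackages())

# the working directory that is sent when you are not inside a Python repo
print(pathlib.Path.cwd())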

Define a Manifest

Sometimes you need more control over what the workers get access to. For example:
  • you might need to include files from another directory, or from another Python package
  • you might not want to send all your local dependencies (your repo may have many dependencies while your UDF only needs a few)
  • you might need packages built separately for the worker's architecture (for example, you can't build pyarrow on a Mac and run it on a Linux Ray worker)
  • you might want to reuse dependencies between two backfill jobs, so you know they run in the same environment (see the sketch after the example below)
For these use cases, you can define a Manifest. Calling define_manifest() packages files from the local environment and stores the Manifest metadata and files in persistent storage. The Manifest can then be referenced by name, shared, and reused.
import geneva
from geneva.manifest.builder import GenevaManifestBuilder

db = geneva.connect(my_db_uri)

manifest_name = "dev-manifest"
manifest = (
    GenevaManifestBuilder()
        .name(manifest_name)
        .skip_site_packages(False)
        .pip(["lancedb", "numpy"])
        .py_modules(["my_module"])
        .build()
)

db.define_manifest(manifest_name, manifest)
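Because the manifest is stored by name, separate backfill jobs can reference it and are guaranteed the same environment. A sketch using the execution-context API described below (the column names are placeholders):

tbl = db.get_table("my_table")

with db.context(cluster=cluster_name, manifest=manifest_name):
    tbl.backfill("embedding")  # placeholder column name

# later, possibly from another session: same manifest name, same environment
with db.context(cluster=cluster_name, manifest=manifest_name):
    tbl.backfill("caption")  # placeholder column name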
What's in a manifest, and how can you define it? (All methods are on GenevaManifestBuilder.)

| Contents | How you can define it |
| --- | --- |
| Local Python packages | Uploaded automatically, unless you set .skip_site_packages(True). |
| Local working directory (or workspace root, if in a Python repo) | Uploaded automatically. |
| Python packages to be installed with pip | Use .pip(packages: list[str]) or .add_pip(package: str). See Ray's RuntimeEnv docs for details. |
| Local Python packages outside of site-packages | Use .py_modules(modules: list[str]) or .add_py_module(module: str). See Ray's RuntimeEnv docs for details. |
| Container image for the head node | Use .head_image(head_image: str), or default_head_image() for the default. If an image is also defined in the GenevaCluster, the image set here in the Manifest takes priority. |
| Container image for worker nodes | Use .worker_image(worker_image: str), or default_worker_image() for the current platform's default. As with the head image, this takes priority over any image set in the Cluster. |
If you want to see exactly what is being uploaded to the cluster, set .delete_local_zips(False) and .local_zip_output_dir(path), then examine the zip files in path.
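For example, a manifest that pins head and worker images and keeps the zips on disk for inspection might look like this; the image tags and output path are placeholders:

manifest = (
    GenevaManifestBuilder()
        .name("pinned-images")
        .head_image("my-registry/geneva-head:stable")  # placeholder image tag
        .worker_image("my-registry/geneva-worker:stable")  # placeholder image tag
        .delete_local_zips(False)  # keep the uploaded zips on disk
        .local_zip_output_dir("/tmp/geneva-zips")  # placeholder path; inspect the zips here
        .build()
)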

Putting it all together: Execution Contexts

An execution context represents the concrete execution environment (Cluster and Manifest) used to run a distributed job. Calling context enters a context manager that provisions an execution cluster and executes the job using the Cluster and Manifest definitions provided. Because you've already defined the cluster and manifest, you can reference them by name; providing a manifest is optional. When the block completes, the context manager automatically de-provisions the cluster.
db = geneva.connect(my_db_uri)
tbl = db.get_table("my_table")

# Providing a manifest is optional; if not provided, it will work as described in "Use defaults" above.
with db.context(cluster=cluster_name, manifest=manifest_name):
    tbl.backfill("embedding")
In a notebook environment, you can manually enter and exit the context manager in separate steps like so:
ctx = db.context(cluster=cluster_name, manifest=manifest_name)
ctx.__enter__()

# ... do stuff

ctx.__exit__(None, None, None)
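If you want the cluster de-provisioned even when an intermediate cell raises, you can wrap the manual calls in try/finally; this is the plain Python context-manager protocol, not a Geneva-specific API:

ctx = db.context(cluster=cluster_name, manifest=manifest_name)
ctx.__enter__()
try:
    tbl.backfill("embedding")
finally:
    # mirrors what the with-statement does on exit: de-provision the cluster
    ctx.__exit__(None, None, None)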