from __future__ import annotations
import logging
import os
import time
from ev.client import Client
from ev.models import (
CreateRunRequest,
CreateRunResponse,
FileEntrypoint,
FunctionEntrypoint,
GitSource,
RunEntrypoint,
RunEnvironment,
RunSource,
RunStatus,
)
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
)
logger = logging.getLogger(__name__)
def main() -> int:
# Authenticate with your access token
access_token = os.environ["DAFT_CLOUD_ACCESS_TOKEN"]
# Create a client pointing to Daft Cloud
client = Client("https://api.daft.ai", access_token)
# Get your workspace and project IDs from the Daft Cloud UI.
# To get your Workspace ID, go to https://cloud.daft.ai/settings
# To get your Project ID, select your project from https://cloud.daft.ai/projects,
# then go to Settings.
workspace_id = "your-workspace-id"
project_id = "your-project-id"
# Create a run request
response: CreateRunResponse = client.create_run(
workspace_id,
project_id,
CreateRunRequest(
# Specify what to run
entrypoint=RunEntrypoint({
"function": FunctionEntrypoint(
module="pipeline.py",
symbol="my_function",
args=[], # Positional arguments
kwargs={}, # Keyword arguments
)
}),
# Specify where the code lives
source=RunSource({
"git": GitSource(
remote="https://github.com/your-org/your-repo",
hash="HEAD", # or specific commit hash
private=False,
)
}),
# Configure the runtime environment
environment=RunEnvironment(
python_version="3.12",
dependencies=[], # e.g., ["pandas", "numpy>=1.24.0"]
environment_variables={}, # Additional env vars
),
secrets={}, # Reference project secrets by name
),
)
# Monitor the run
run_id = response.run.id
run_url = f"{client.endpoint.replace('.api.', '.cloud.')}/~/{run_id}"
logger.info("Created run: %s", run_url)
logger.info("Status: %s", response.run.status.value)
# Poll for completion
status = response.run.status
while not status.is_complete():
time.sleep(1)
status = client.get_run_status(workspace_id, project_id, run_id)
logger.debug("Status: %s", status.value)
logger.info("Run completed with status: %s", status.value)
# Return exit code based on result
if status == RunStatus.SUCCEEDED:
logger.info("Run succeeded!")
return 0
else:
logger.error("Run failed with status: %s", status.value)
return 1
if __name__ == "__main__":
exit(main())